CIRCT  19.0.0git
Server.cpp
Go to the documentation of this file.
1 //===- Server.cpp - Cosim RPC server ----------------------------*- C++ -*-===//
2 //
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6 //
7 //===----------------------------------------------------------------------===//
8 //
9 // Definitions for the RPC server class. Capnp C++ RPC servers are based on
10 // 'libkj' and its asynchrony model plus the capnp C++ API, both of which feel
11 // very foreign. In general, both RPC arguments and returns are passed as a C++
12 // object. In order to return data, the capnp message must be constructed inside
13 // that object.
14 //
15 // A [capnp encoded message](https://capnproto.org/encoding.html) can have
16 // multiple 'segments', which is a pain to deal with. (See comments below.)
17 //
18 //===----------------------------------------------------------------------===//
19 
20 #include "cosim/Server.h"
21 #include "CosimDpi.capnp.h"
22 #include <capnp/ez-rpc.h>
23 #include <thread>
24 #ifdef _WIN32
25 #include <io.h>
26 #else
27 #include <unistd.h>
28 #endif
29 
30 using namespace capnp;
31 using namespace esi::cosim;
32 
33 namespace {
34 /// Implements the `EsiDpiEndpoint` interface from the RPC schema. Mostly a
35 /// wrapper around an `Endpoint` object. Whereas the `Endpoint`s are long-lived
36 /// (associated with the HW endpoint), this class is constructed/destructed
37 /// when the client open()s it.
38 class EndpointServer final : public EsiDpiEndpoint::Server {
39  /// The wrapped endpoint.
40  Endpoint &endpoint;
41  /// Signals that this endpoint has been opened by a client and hasn't been
42  /// closed by said client.
43  bool open;
44 
45 public:
46  EndpointServer(Endpoint &ep);
47  /// Release the Endpoint should the client disconnect without properly closing
48  /// it.
49  ~EndpointServer();
50  /// Disallow copying as the 'open' variable needs to track the endpoint.
51  EndpointServer(const EndpointServer &) = delete;
52 
53  /// Implement the EsiDpiEndpoint RPC interface.
54  kj::Promise<void> sendFromHost(SendFromHostContext) override;
55  kj::Promise<void> recvToHost(RecvToHostContext) override;
56  kj::Promise<void> close(CloseContext) override;
57 };
58 
59 /// Implement the low level cosim RPC protocol.
60 class LowLevelServer final : public EsiLowLevel::Server {
61  // Queues to and from the simulation.
62  LowLevel &bridge;
63 
64  // Functions which poll for responses without blocking the main loop. Polling
65  // ain't great, but it's the only way (AFAICT) to do inter-thread
66  // communication between a libkj concurrent thread and other threads. There is
67  // a non-polling way to do it by setting up a queue over a OS-level pipe
68  // (since the libkj event loop uses 'select').
69  kj::Promise<void> pollReadResp(ReadMMIOContext context);
70  kj::Promise<void> pollWriteResp(WriteMMIOContext context);
71 
72 public:
73  LowLevelServer(LowLevel &bridge);
74  /// Release the Endpoint should the client disconnect without properly closing
75  /// it.
76  ~LowLevelServer();
77  /// Disallow copying as the 'open' variable needs to track the endpoint.
78  LowLevelServer(const LowLevelServer &) = delete;
79 
80  // Implement the protocol methods.
81  kj::Promise<void> readMMIO(ReadMMIOContext) override;
82  kj::Promise<void> writeMMIO(WriteMMIOContext) override;
83 };
84 
85 /// Implements the `CosimDpiServer` interface from the RPC schema.
86 class CosimServer final : public CosimDpiServer::Server {
87  /// The registry of endpoints. The RpcServer class owns this.
89  LowLevel &lowLevelBridge;
90  const unsigned int &esiVersion;
91  const std::vector<uint8_t> &compressedManifest;
92 
93 public:
94  CosimServer(EndpointRegistry &reg, LowLevel &lowLevelBridge,
95  const unsigned int &esiVersion,
96  const std::vector<uint8_t> &compressedManifest);
97 
98  /// List all the registered interfaces.
99  kj::Promise<void> list(ListContext ctxt) override;
100  /// Open a specific interface, locking it in the process.
101  kj::Promise<void> open(OpenContext ctxt) override;
102 
103  kj::Promise<void>
104  getCompressedManifest(GetCompressedManifestContext) override;
105 
106  kj::Promise<void> openLowLevel(OpenLowLevelContext ctxt) override;
107 };
108 } // anonymous namespace
109 
110 /// ------ EndpointServer definitions.
111 
112 EndpointServer::EndpointServer(Endpoint &ep) : endpoint(ep), open(true) {}
113 EndpointServer::~EndpointServer() {
114  if (open)
115  endpoint.returnForUse();
116 }
117 
118 /// This is the client polling for a message. If one is available, send it.
119 /// TODO: implement a blocking call with a timeout.
120 kj::Promise<void> EndpointServer::recvToHost(RecvToHostContext context) {
121  KJ_REQUIRE(open, "EndPoint closed already");
122 
123  // Try to pop a message.
124  Endpoint::BlobPtr blob;
125  auto msgPresent = endpoint.getMessageToClient(blob);
126  context.getResults().setHasData(msgPresent);
127  if (msgPresent) {
128  Data::Builder data(blob->data(), blob->size());
129  context.getResults().setResp(data.asReader());
130  }
131  return kj::READY_NOW;
132 }
133 
134 /// 'Send' is from the client perspective, so this is a message we are
135 /// recieving. The only way I could figure out to copy the raw message is a
136 /// double copy. I was have issues getting libkj's arrays to play nice with
137 /// others.
138 kj::Promise<void> EndpointServer::sendFromHost(SendFromHostContext context) {
139  KJ_REQUIRE(open, "EndPoint closed already");
140  KJ_REQUIRE(context.getParams().hasMsg(), "Send request must have a message.");
141  kj::ArrayPtr<const kj::byte> data = context.getParams().getMsg().asBytes();
142  Endpoint::BlobPtr blob =
143  std::make_unique<Endpoint::Blob>(data.begin(), data.end());
144  endpoint.pushMessageToSim(std::move(blob));
145  return kj::READY_NOW;
146 }
147 
148 kj::Promise<void> EndpointServer::close(CloseContext context) {
149  KJ_REQUIRE(open, "EndPoint closed already");
150  open = false;
151  endpoint.returnForUse();
152  return kj::READY_NOW;
153 }
154 
155 /// ------ LowLevelServer definitions.
156 
157 LowLevelServer::LowLevelServer(LowLevel &bridge) : bridge(bridge) {}
158 LowLevelServer::~LowLevelServer() {}
159 
160 kj::Promise<void> LowLevelServer::pollReadResp(ReadMMIOContext context) {
161  auto respMaybe = bridge.readResps.pop();
162  if (!respMaybe.has_value()) {
163  return kj::evalLast(
164  [this, KJ_CPCAP(context)]() mutable { return pollReadResp(context); });
165  }
166  auto resp = respMaybe.value();
167  KJ_REQUIRE(resp.second == 0, "Read MMIO register encountered an error");
168  context.getResults().setData(resp.first);
169  return kj::READY_NOW;
170 }
171 
172 kj::Promise<void> LowLevelServer::readMMIO(ReadMMIOContext context) {
173  bridge.readReqs.push(context.getParams().getAddress());
174  return kj::evalLast(
175  [this, KJ_CPCAP(context)]() mutable { return pollReadResp(context); });
176 }
177 
178 kj::Promise<void> LowLevelServer::pollWriteResp(WriteMMIOContext context) {
179  auto respMaybe = bridge.writeResps.pop();
180  if (!respMaybe.has_value()) {
181  return kj::evalLast(
182  [this, KJ_CPCAP(context)]() mutable { return pollWriteResp(context); });
183  }
184  auto resp = respMaybe.value();
185  KJ_REQUIRE(resp == 0, "write MMIO register encountered an error");
186  return kj::READY_NOW;
187 }
188 
189 kj::Promise<void> LowLevelServer::writeMMIO(WriteMMIOContext context) {
190  bridge.writeReqs.push(context.getParams().getAddress(),
191  context.getParams().getData());
192  return kj::evalLast(
193  [this, KJ_CPCAP(context)]() mutable { return pollWriteResp(context); });
194 }
195 
196 /// ----- CosimServer definitions.
197 
198 CosimServer::CosimServer(EndpointRegistry &reg, LowLevel &lowLevelBridge,
199  const unsigned int &esiVersion,
200  const std::vector<uint8_t> &compressedManifest)
201  : reg(reg), lowLevelBridge(lowLevelBridge), esiVersion(esiVersion),
202  compressedManifest(compressedManifest) {
203  printf("version: %d\n", esiVersion);
204 }
205 
206 kj::Promise<void> CosimServer::list(ListContext context) {
207  auto ifaces = context.getResults().initIfaces((unsigned int)reg.size());
208  unsigned int ctr = 0u;
209  reg.iterateEndpoints([&](std::string id, const Endpoint &ep) {
210  ifaces[ctr].setEndpointID(id);
211  ifaces[ctr].setFromHostType(ep.getSendTypeId());
212  ifaces[ctr].setToHostType(ep.getRecvTypeId());
213  ++ctr;
214  });
215  return kj::READY_NOW;
216 }
217 
218 kj::Promise<void> CosimServer::open(OpenContext ctxt) {
219  Endpoint *ep = reg[ctxt.getParams().getIface().getEndpointID()];
220  KJ_REQUIRE(ep != nullptr, "Could not find endpoint");
221 
222  auto gotLock = ep->setInUse();
223  KJ_REQUIRE(gotLock, "Endpoint in use");
224 
225  ctxt.getResults().setEndpoint(
226  EsiDpiEndpoint::Client(kj::heap<EndpointServer>(*ep)));
227  return kj::READY_NOW;
228 }
229 
230 kj::Promise<void>
231 CosimServer::getCompressedManifest(GetCompressedManifestContext ctxt) {
232  ctxt.getResults().setVersion(esiVersion);
233  ctxt.getResults().setCompressedManifest(
234  Data::Reader(compressedManifest.data(), compressedManifest.size()));
235  return kj::READY_NOW;
236 }
237 
238 kj::Promise<void> CosimServer::openLowLevel(OpenLowLevelContext ctxt) {
239  ctxt.getResults().setLowLevel(kj::heap<LowLevelServer>(lowLevelBridge));
240  return kj::READY_NOW;
241 }
242 
243 /// ----- RpcServer definitions.
244 
245 RpcServer::RpcServer() : mainThread(nullptr), stopSig(false) {}
247 
/// Write the port number to a file. Necessary when we allow 'EzRpcServer' to
/// select its own port. We can't use stdout/stderr because the flushing
/// semantics are undefined (as in `flush()` doesn't work on all simulators).
///
/// The file is "cosim.cfg" in the current working directory and contains a
/// single line of the form "port: <number>". If the file cannot be opened, a
/// diagnostic is printed to stderr and nothing is written.
static void writePort(uint16_t port) {
  // "cosim.cfg" since we may want to include other info in the future.
  FILE *fd = fopen("cosim.cfg", "w");
  if (fd == nullptr) {
    // fopen fails on e.g. a read-only directory; passing a null stream to
    // fprintf/fclose would be undefined behavior.
    fprintf(stderr, "[COSIM] Could not open cosim.cfg to write port %u\n",
            (unsigned int)port);
    return;
  }
  fprintf(fd, "port: %u\n", (unsigned int)port);
  fclose(fd);
}
257 
258 void RpcServer::mainLoop(uint16_t port) {
259  capnp::EzRpcServer rpcServer(kj::heap<CosimServer>(endpoints, lowLevelBridge,
260  esiVersion,
262  /* bindAddress */ "*", port);
263  auto &waitScope = rpcServer.getWaitScope();
264  // If port is 0, ExRpcSever selects one and we have to wait to get the port.
265  if (port == 0) {
266  auto portPromise = rpcServer.getPort();
267  port = portPromise.wait(waitScope);
268  }
269  writePort(port);
270  printf("[COSIM] Listening on port: %u\n", (unsigned int)port);
271 
272  // OK, this is uber hacky, but it unblocks me and isn't _too_ inefficient. The
273  // problem is that I can't figure out how read the stop signal from libkj
274  // asyncrony land.
275  //
276  // IIRC the main libkj wait loop uses `select()` (or something similar on
277  // Windows) on its FDs. As a result, any code which checks the stop variable
278  // doesn't run until there is some I/O. Probably the right way is to set up a
279  // pipe to deliver a shutdown signal.
280  //
281  // TODO: Figure out how to do this properly, if possible.
282  while (!stopSig) {
283  waitScope.poll();
284  std::this_thread::sleep_for(std::chrono::milliseconds(10));
285  }
286 }
287 
288 /// Start the server if not already started.
289 void RpcServer::run(uint16_t port) {
290  Lock g(m);
291  if (mainThread == nullptr) {
292  mainThread = new std::thread(&RpcServer::mainLoop, this, port);
293  } else {
294  fprintf(stderr, "Warning: cannot Run() RPC server more than once!");
295  }
296 }
297 
298 /// Signal the RPC server thread to stop. Wait for it to exit.
300  Lock g(m);
301  if (mainThread == nullptr) {
302  fprintf(stderr, "RpcServer not Run()\n");
303  } else if (!stopSig) {
304  stopSig = true;
305  mainThread->join();
306  }
307 }
static void writePort(uint16_t port)
Write the port number to a file.
Definition: Server.cpp:251
The Endpoint registry is where Endpoints report their existence (register) and they are looked up by ...
Definition: Endpoint.h:110
Implements a bi-directional, thread-safe bridge between the RPC server and DPI functions.
Definition: Endpoint.h:33
std::string getSendTypeId() const
Definition: Endpoint.h:49
bool setInUse()
These two are used to set and unset the inUse flag, to ensure that an open endpoint is not opened aga...
Definition: Endpoint.cpp:23
std::string getRecvTypeId() const
Definition: Endpoint.h:50
std::unique_ptr< Blob > BlobPtr
Definition: Endpoint.h:39
TSQueue< std::pair< uint64_t, uint8_t > > readResps
Definition: LowLevel.h:30
void mainLoop(uint16_t port)
The thread's main loop function. Exits on shutdown.
Definition: Server.cpp:258
unsigned int esiVersion
Definition: Server.h:58
std::thread * mainThread
Definition: Server.h:54
EndpointRegistry endpoints
Definition: Server.h:32
std::vector< uint8_t > compressedManifest
Definition: Server.h:59
std::mutex m
Definition: Server.h:56
void stop()
Signal the RPC server thread to stop. Wait for it to exit.
Definition: Server.cpp:299
std::lock_guard< std::mutex > Lock
Definition: Server.h:49
void run(uint16_t port)
Start and stop the server thread.
Definition: Server.cpp:289
LowLevel lowLevelBridge
Definition: Server.h:33
volatile bool stopSig
Definition: Server.h:55
std::optional< T > pop()
Pop something off the queue but return nullopt if the queue is empty.
Definition: Utils.h:37
def reg(value, clock, reset=None, reset_value=None, name=None, sym_name=None)
Definition: seq.py:20