CIRCT  20.0.0git
RpcServer.cpp
Go to the documentation of this file.
1 //===- RpcServer.cpp - Run a cosim server ---------------------------------===//
2 //
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6 //
7 //===----------------------------------------------------------------------===//
8 
10 #include "esi/Utils.h"
11 
12 #include "cosim.grpc.pb.h"
13 
14 #include <grpc/grpc.h>
15 #include <grpcpp/security/server_credentials.h>
16 #include <grpcpp/server.h>
17 #include <grpcpp/server_builder.h>
18 #include <grpcpp/server_context.h>
19 
20 #include <algorithm>
21 #include <cassert>
22 #include <cstdlib>
23 
24 using namespace esi;
25 using namespace esi::cosim;
26 
27 using grpc::CallbackServerContext;
28 using grpc::Server;
29 using grpc::ServerUnaryReactor;
30 using grpc::ServerWriteReactor;
31 using grpc::Status;
32 using grpc::StatusCode;
33 
34 /// Write the port number to a file. Necessary when we are allowed to select our
35 /// own port. We can't use stdout/stderr because the flushing semantics are
36 /// undefined (as in `flush()` doesn't work on all simulators).
37 static void writePort(uint16_t port) {
38  // "cosim.cfg" since we may want to include other info in the future.
39  FILE *fd = fopen("cosim.cfg", "w");
40  fprintf(fd, "port: %u\n", static_cast<unsigned int>(port));
41  fclose(fd);
42 }
43 
44 namespace {
45 class RpcServerReadPort;
46 class RpcServerWritePort;
47 } // namespace
48 
51 public:
52  Impl(int port);
53  ~Impl();
54 
55  //===--------------------------------------------------------------------===//
56  // Internal API
57  //===--------------------------------------------------------------------===//
58 
60  const std::vector<uint8_t> &compressedManifest) {
61  this->compressedManifest = compressedManifest;
62  this->esiVersion = esiVersion;
63  }
64 
65  ReadChannelPort &registerReadPort(const std::string &name,
66  const std::string &type);
67  WriteChannelPort &registerWritePort(const std::string &name,
68  const std::string &type);
69 
70  void stop();
71 
72  //===--------------------------------------------------------------------===//
73  // RPC API implementations. See the .proto file for the API documentation.
74  //===--------------------------------------------------------------------===//
75 
76  ServerUnaryReactor *GetManifest(CallbackServerContext *context,
77  const VoidMessage *,
78  Manifest *response) override;
79  ServerUnaryReactor *ListChannels(CallbackServerContext *, const VoidMessage *,
80  ListOfChannels *channelsOut) override;
81  ServerWriteReactor<esi::cosim::Message> *
82  ConnectToClientChannel(CallbackServerContext *context,
83  const ChannelDesc *request) override;
84  ServerUnaryReactor *SendToServer(CallbackServerContext *context,
85  const esi::cosim::AddressedMessage *request,
86  esi::cosim::VoidMessage *response) override;
87 
88 private:
90  std::vector<uint8_t> compressedManifest;
91  std::map<std::string, std::unique_ptr<RpcServerReadPort>> readPorts;
92  std::map<std::string, std::unique_ptr<RpcServerWritePort>> writePorts;
93 
94  std::unique_ptr<Server> server;
95 };
97 
98 //===----------------------------------------------------------------------===//
99 // Read and write ports
100 //
101 // Implemented as simple queues which the RPC server writes to and reads from.
102 //===----------------------------------------------------------------------===//
103 
104 namespace {
105 /// Implements a simple read queue. The RPC server will push messages into this
106 /// as appropriate.
107 class RpcServerReadPort : public ReadChannelPort {
108 public:
109  RpcServerReadPort(Type *type) : ReadChannelPort(type) {}
110 
111  /// Internal call. Push a message FROM the RPC client to the read port.
112  void push(MessageData &data) {
113  while (!callback(data))
114  std::this_thread::sleep_for(std::chrono::milliseconds(1));
115  }
116 };
117 
118 /// Implements a simple write queue. The RPC server will pull messages from this
119 /// as appropriate. Note that this could be more performant if a callback is
120 /// used. This would have more complexity as when a client disconnects the
121 /// outstanding messages will need somewhere to be held until the next client
122 /// connects. For now, it's simpler to just have the server poll the queue.
123 class RpcServerWritePort : public WriteChannelPort {
124 public:
125  RpcServerWritePort(Type *type) : WriteChannelPort(type) {}
126  void write(const MessageData &data) override { writeQueue.push(data); }
127  bool tryWrite(const MessageData &data) override {
128  writeQueue.push(data);
129  return true;
130  }
131 
132  utils::TSQueue<MessageData> writeQueue;
133 };
134 } // namespace
135 
136 //===----------------------------------------------------------------------===//
137 // RPC server implementations
138 //===----------------------------------------------------------------------===//
139 
140 /// Start a server on the given port. -1 means to let the OS pick a port.
141 Impl::Impl(int port) : esiVersion(-1) {
142  grpc::ServerBuilder builder;
143  std::string server_address("127.0.0.1:" + std::to_string(port));
144  // TODO: use secure credentials. Not so bad for now since we only accept
145  // connections on localhost.
146  builder.AddListeningPort(server_address, grpc::InsecureServerCredentials(),
147  &port);
148  builder.RegisterService(this);
149  server = builder.BuildAndStart();
150  if (!server)
151  throw std::runtime_error("Failed to start server on " + server_address);
152  writePort(port);
153  std::cout << "Server listening on 127.0.0.1:" << port << std::endl;
154 }
155 
156 void Impl::stop() {
157  // Disconnect all the ports.
158  for (auto &[name, port] : readPorts)
159  port->disconnect();
160  for (auto &[name, port] : writePorts)
161  port->disconnect();
162 
163  // Shutdown the server and wait for it to finish.
164  server->Shutdown();
165  server->Wait();
166  server = nullptr;
167 }
168 
170  if (server)
171  stop();
172 }
173 
174 ReadChannelPort &Impl::registerReadPort(const std::string &name,
175  const std::string &type) {
176  auto port = new RpcServerReadPort(new Type(type));
177  readPorts.emplace(name, port);
178  port->connect();
179  return *port;
180 }
181 WriteChannelPort &Impl::registerWritePort(const std::string &name,
182  const std::string &type) {
183  auto port = new RpcServerWritePort(new Type(type));
184  writePorts.emplace(name, port);
185  port->connect();
186  return *port;
187 }
188 
189 ServerUnaryReactor *Impl::GetManifest(CallbackServerContext *context,
190  const VoidMessage *, Manifest *response) {
191  response->set_esi_version(esiVersion);
192  response->set_compressed_manifest(compressedManifest.data(),
193  compressedManifest.size());
194  ServerUnaryReactor *reactor = context->DefaultReactor();
195  reactor->Finish(Status::OK);
196  return reactor;
197 }
198 
199 /// Load the list of channels into the response and fire it off.
200 ServerUnaryReactor *Impl::ListChannels(CallbackServerContext *context,
201  const VoidMessage *,
202  ListOfChannels *channelsOut) {
203  for (auto &[name, port] : readPorts) {
204  auto *channel = channelsOut->add_channels();
205  channel->set_name(name);
206  channel->set_type(port->getType()->getID());
207  channel->set_dir(ChannelDesc::Direction::ChannelDesc_Direction_TO_SERVER);
208  }
209  for (auto &[name, port] : writePorts) {
210  auto *channel = channelsOut->add_channels();
211  channel->set_name(name);
212  channel->set_type(port->getType()->getID());
213  channel->set_dir(ChannelDesc::Direction::ChannelDesc_Direction_TO_CLIENT);
214  }
215 
216  // The default reactor is basically to just finish the RPC call as if we're
217  // implementing the RPC function as a blocking call.
218  auto reactor = context->DefaultReactor();
219  reactor->Finish(Status::OK);
220  return reactor;
221 }
222 
223 namespace {
224 /// When a client connects to a read port (on its end, a write port on this
225 /// end), construct one of these to poll the corresponding write port on this
226 /// side and forward the messages.
227 class RpcServerWriteReactor : public ServerWriteReactor<esi::cosim::Message> {
228 public:
229  RpcServerWriteReactor(RpcServerWritePort *writePort)
230  : writePort(writePort), sentSuccessfully(SendStatus::UnknownStatus),
231  shutdown(false) {
232  myThread = std::thread(&RpcServerWriteReactor::threadLoop, this);
233  }
234  ~RpcServerWriteReactor() {
235  shutdown = true;
236  // Wake up the potentially sleeping thread.
237  sentSuccessfullyCV.notify_one();
238  myThread.join();
239  }
240 
241  // Deleting 'this' from within a callback is safe since this is how gRPC tells
242  // us that it's released the reference. This pattern lets gRPC manage this
243  // object. (Though a shared pointer would be better.) It was actually copied
244  // from one of the gRPC examples:
245  // https://github.com/grpc/grpc/blob/4795c5e69b25e8c767b498bea784da0ef8c96fd5/examples/cpp/route_guide/route_guide_callback_server.cc#L120
246  // The alternative is to have something else (e.g. Impl) manage this object
247  // and have this method tell it that gRPC is done with it and it should be
248  // deleted. As of now, there's no specific need for that and it adds
249  // additional complexity. If there is at some point in the future, change
250  // this.
251  void OnDone() override { delete this; }
252  void OnWriteDone(bool ok) override {
253  std::scoped_lock<std::mutex> lock(sentMutex);
254  sentSuccessfully = ok ? SendStatus::Success : SendStatus::Failure;
255  sentSuccessfullyCV.notify_one();
256  }
257  void OnCancel() override {
258  std::scoped_lock<std::mutex> lock(sentMutex);
259  sentSuccessfully = SendStatus::Disconnect;
260  sentSuccessfullyCV.notify_one();
261  }
262 
263 private:
264  /// The polling loop.
265  void threadLoop();
266  /// The polling thread.
267  std::thread myThread;
268 
269  /// Assoicated write port on this side. (Read port on the client side.)
270  RpcServerWritePort *writePort;
271 
272  /// Mutex to protect the sentSuccessfully flag.
273  std::mutex sentMutex;
274  enum SendStatus { UnknownStatus, Success, Failure, Disconnect };
275  volatile SendStatus sentSuccessfully;
276  std::condition_variable sentSuccessfullyCV;
277 
278  std::atomic<bool> shutdown;
279 };
280 
281 } // namespace
282 
283 void RpcServerWriteReactor::threadLoop() {
284  while (!shutdown && sentSuccessfully != SendStatus::Disconnect) {
285  // TODO: adapt this to a new notification mechanism which is forthcoming.
286  if (writePort->writeQueue.empty())
287  std::this_thread::sleep_for(std::chrono::microseconds(100));
288 
289  // This lambda will get called with the message at the front of the queue.
290  // If the send is successful, return true to pop it. We don't know, however,
291  // if the message was sent successfully in this thread. It's only when the
292  // `OnWriteDone` method is called by gRPC that we know. Use locking and
293  // condition variables to orchestrate this confirmation.
294  writePort->writeQueue.pop([this](const MessageData &data) -> bool {
295  esi::cosim::Message msg;
296  msg.set_data(reinterpret_cast<const char *>(data.getBytes()),
297  data.getSize());
298 
299  // Get a lock, reset the flag, start sending the message, and wait for the
300  // write to complete or fail. Be mindful of the shutdown flag.
301  std::unique_lock<std::mutex> lock(sentMutex);
302  sentSuccessfully = SendStatus::UnknownStatus;
303  StartWrite(&msg);
304  sentSuccessfullyCV.wait(lock, [&]() {
305  return shutdown || sentSuccessfully != SendStatus::UnknownStatus;
306  });
307  bool ret = sentSuccessfully == SendStatus::Success;
308  lock.unlock();
309  return ret;
310  });
311  }
312  Finish(Status::OK);
313 }
314 
315 /// When a client sends a message to a read port (write port on this end), start
316 /// streaming messages until the client calls uncle and requests a cancellation.
317 ServerWriteReactor<esi::cosim::Message> *
318 Impl::ConnectToClientChannel(CallbackServerContext *context,
319  const ChannelDesc *request) {
320  printf("connect to client channel\n");
321  auto it = writePorts.find(request->name());
322  if (it == writePorts.end()) {
323  auto reactor = new RpcServerWriteReactor(nullptr);
324  reactor->Finish(Status(StatusCode::NOT_FOUND, "Unknown channel"));
325  return reactor;
326  }
327  return new RpcServerWriteReactor(it->second.get());
328 }
329 
330 /// When a client sends a message to a write port (a read port on this end),
331 /// simply locate the associated port, and write that message into its queue.
332 ServerUnaryReactor *
333 Impl::SendToServer(CallbackServerContext *context,
334  const esi::cosim::AddressedMessage *request,
335  esi::cosim::VoidMessage *response) {
336  auto reactor = context->DefaultReactor();
337  auto it = readPorts.find(request->channel_name());
338  if (it == readPorts.end()) {
339  reactor->Finish(Status(StatusCode::NOT_FOUND, "Unknown channel"));
340  return reactor;
341  }
342 
343  std::string msgDataString = request->message().data();
344  MessageData data(reinterpret_cast<const uint8_t *>(msgDataString.data()),
345  msgDataString.size());
346  it->second->push(data);
347  reactor->Finish(Status::OK);
348  return reactor;
349 }
350 
351 //===----------------------------------------------------------------------===//
352 // RpcServer pass throughs to the actual implementations above.
353 //===----------------------------------------------------------------------===//
355  if (impl)
356  delete impl;
357 }
358 void RpcServer::setManifest(int esiVersion,
359  const std::vector<uint8_t> &compressedManifest) {
360  impl->setManifest(esiVersion, compressedManifest);
361 }
363  const std::string &type) {
364  return impl->registerReadPort(name, type);
365 }
367  const std::string &type) {
368  return impl->registerWritePort(name, type);
369 }
370 void RpcServer::run(int port) { impl = new Impl(port); }
372  assert(impl && "Server not running");
373  impl->stop();
374 }
assert(baseType &&"element must be base type")
static void writePort(uint16_t port)
Write the port number to a file.
Definition: RpcServer.cpp:37
esi::cosim::RpcServer::Impl Impl
Definition: RpcServer.cpp:96
Class to parse a manifest.
Definition: Manifest.h:39
A logical chunk of data representing serialized data.
Definition: Common.h:92
A ChannelPort which reads data from the accelerator.
Definition: Ports.h:103
Root class of the ESI type system.
Definition: Types.h:27
A ChannelPort which sends data to the accelerator.
Definition: Ports.h:74
ServerUnaryReactor * ListChannels(CallbackServerContext *, const VoidMessage *, ListOfChannels *channelsOut) override
Load the list of channels into the response and fire it off.
Definition: RpcServer.cpp:200
ReadChannelPort & registerReadPort(const std::string &name, const std::string &type)
Definition: RpcServer.cpp:174
std::unique_ptr< Server > server
Definition: RpcServer.cpp:94
ServerWriteReactor< esi::cosim::Message > * ConnectToClientChannel(CallbackServerContext *context, const ChannelDesc *request) override
When a client sends a message to a read port (write port on this end), start streaming messages until...
Definition: RpcServer.cpp:318
std::vector< uint8_t > compressedManifest
Definition: RpcServer.cpp:90
void setManifest(int esiVersion, const std::vector< uint8_t > &compressedManifest)
Definition: RpcServer.cpp:59
Impl(int port)
Start a server on the given port. -1 means to let the OS pick a port.
Definition: RpcServer.cpp:141
ServerUnaryReactor * GetManifest(CallbackServerContext *context, const VoidMessage *, Manifest *response) override
Definition: RpcServer.cpp:189
std::map< std::string, std::unique_ptr< RpcServerWritePort > > writePorts
Definition: RpcServer.cpp:92
ServerUnaryReactor * SendToServer(CallbackServerContext *context, const esi::cosim::AddressedMessage *request, esi::cosim::VoidMessage *response) override
When a client sends a message to a write port (a read port on this end), simply locate the associated...
Definition: RpcServer.cpp:333
WriteChannelPort & registerWritePort(const std::string &name, const std::string &type)
Definition: RpcServer.cpp:181
std::map< std::string, std::unique_ptr< RpcServerReadPort > > readPorts
Definition: RpcServer.cpp:91
ReadChannelPort & registerReadPort(const std::string &name, const std::string &type)
Register a read or write port which communicates over RPC.
Definition: RpcServer.cpp:362
void run(int port)
Definition: RpcServer.cpp:370
void setManifest(int esiVersion, const std::vector< uint8_t > &compressedManifest)
Set the manifest and version.
Definition: RpcServer.cpp:358
WriteChannelPort & registerWritePort(const std::string &name, const std::string &type)
Definition: RpcServer.cpp:366
Thread safe queue.
Definition: Utils.h:39
Definition: esi.py:1