CIRCT 22.0.0git
Loading...
Searching...
No Matches
RpcServer.cpp
Go to the documentation of this file.
1//===- RpcServer.cpp - Run a cosim server ---------------------------------===//
2//
3// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4// See https://llvm.org/LICENSE.txt for license information.
5// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6//
7//===----------------------------------------------------------------------===//
8
10#include "esi/Context.h"
11#include "esi/Utils.h"
12
13#include "cosim.grpc.pb.h"
14
15#include <grpc/grpc.h>
16#include <grpcpp/security/server_credentials.h>
17#include <grpcpp/server.h>
18#include <grpcpp/server_builder.h>
19#include <grpcpp/server_context.h>
20
21#include <algorithm>
22#include <cassert>
23#include <cstdlib>
24
25using namespace esi;
26using namespace esi::cosim;
27
28using grpc::CallbackServerContext;
29using grpc::Server;
30using grpc::ServerUnaryReactor;
31using grpc::ServerWriteReactor;
32using grpc::Status;
33using grpc::StatusCode;
34
/// Write the port number to a file. Necessary when we are allowed to select our
/// own port. We can't use stdout/stderr because the flushing semantics are
/// undefined (as in `flush()` doesn't work on all simulators).
///
/// The file is written to "cosim.cfg" in the current working directory and
/// contains a single line of the form `port: <number>`.
///
/// Throws std::runtime_error if the file cannot be opened, written, or closed.
/// Without these checks a failed `fopen` would hand a null `FILE *` to
/// `fprintf`/`fclose`.
static void writePort(uint16_t port) {
  // "cosim.cfg" since we may want to include other info in the future.
  FILE *fd = fopen("cosim.cfg", "w");
  if (!fd)
    throw std::runtime_error("Could not open cosim.cfg for writing");

  // Always close the file, even if the write failed, so the handle is not
  // leaked; report the failure afterwards.
  const bool wrote =
      fprintf(fd, "port: %u\n", static_cast<unsigned int>(port)) >= 0;
  const bool closed = fclose(fd) == 0;
  if (!wrote || !closed)
    throw std::runtime_error("Could not write port number to cosim.cfg");
}
44
45namespace {
46class RpcServerReadPort;
47class RpcServerWritePort;
48} // namespace
49
52public:
53 Impl(Context &ctxt, int port);
54 ~Impl();
55
56 Context &getContext() { return ctxt; }
57
58 //===--------------------------------------------------------------------===//
59 // Internal API
60 //===--------------------------------------------------------------------===//
61
63 const std::vector<uint8_t> &compressedManifest) {
64 this->compressedManifest = compressedManifest;
65 this->esiVersion = esiVersion;
66 }
67
68 ReadChannelPort &registerReadPort(const std::string &name,
69 const std::string &type);
70 WriteChannelPort &registerWritePort(const std::string &name,
71 const std::string &type);
72
73 void stop();
74
75 int getPort() { return port; }
76
77 //===--------------------------------------------------------------------===//
78 // RPC API implementations. See the .proto file for the API documentation.
79 //===--------------------------------------------------------------------===//
80
81 ServerUnaryReactor *GetManifest(CallbackServerContext *context,
82 const VoidMessage *,
83 Manifest *response) override;
84 ServerUnaryReactor *ListChannels(CallbackServerContext *, const VoidMessage *,
85 ListOfChannels *channelsOut) override;
86 ServerWriteReactor<esi::cosim::Message> *
87 ConnectToClientChannel(CallbackServerContext *context,
88 const ChannelDesc *request) override;
89 ServerUnaryReactor *SendToServer(CallbackServerContext *context,
90 const esi::cosim::AddressedMessage *request,
91 esi::cosim::VoidMessage *response) override;
92
93private:
96 std::vector<uint8_t> compressedManifest;
97 std::map<std::string, std::unique_ptr<RpcServerReadPort>> readPorts;
98 std::map<std::string, std::unique_ptr<RpcServerWritePort>> writePorts;
99 int port = -1;
100 std::unique_ptr<Server> server;
101};
103
104//===----------------------------------------------------------------------===//
105// Read and write ports
106//
107// Implemented as simple queues which the RPC server writes to and reads from.
108//===----------------------------------------------------------------------===//
109
110namespace {
111/// Implements a simple read queue. The RPC server will push messages into this
112/// as appropriate.
113class RpcServerReadPort : public ReadChannelPort {
114public:
115 RpcServerReadPort(Type *type) : ReadChannelPort(type) {}
116
117 /// Internal call. Push a message FROM the RPC client to the read port.
118 void push(MessageData &data) {
119 while (!callback(data))
120 std::this_thread::sleep_for(std::chrono::milliseconds(1));
121 }
122};
123
124/// Implements a simple write queue. The RPC server will pull messages from this
125/// as appropriate. Note that this could be more performant if a callback is
126/// used. This would have more complexity as when a client disconnects the
127/// outstanding messages will need somewhere to be held until the next client
128/// connects. For now, it's simpler to just have the server poll the queue.
129class RpcServerWritePort : public WriteChannelPort {
130public:
131 RpcServerWritePort(Type *type) : WriteChannelPort(type) {}
132
134
135protected:
136 void writeImpl(const MessageData &data) override { writeQueue.push(data); }
137 bool tryWriteImpl(const MessageData &data) override {
138 writeQueue.push(data);
139 return true;
140 }
141};
142} // namespace
143
144//===----------------------------------------------------------------------===//
145// RPC server implementations
146//===----------------------------------------------------------------------===//
147
148/// Start a server on the given port. -1 means to let the OS pick a port.
149Impl::Impl(Context &ctxt, int port) : ctxt(ctxt), esiVersion(-1) {
150 grpc::ServerBuilder builder;
151 std::string server_address("127.0.0.1:" + std::to_string(port));
152 // TODO: use secure credentials. Not so bad for now since we only accept
153 // connections on localhost.
154 builder.AddListeningPort(server_address, grpc::InsecureServerCredentials(),
155 &port);
156 builder.RegisterService(this);
157 server = builder.BuildAndStart();
158 if (!server)
159 throw std::runtime_error("Failed to start server on " + server_address);
161 this->port = port;
162 ctxt.getLogger().info("cosim", "Server listening on 127.0.0.1:" +
163 std::to_string(port));
164}
165
167 // Disconnect all the ports.
168 for (auto &[name, port] : readPorts)
169 port->disconnect();
170 for (auto &[name, port] : writePorts)
171 port->disconnect();
172
173 // Shutdown the server and wait for it to finish.
174 server->Shutdown();
175 server->Wait();
176 server = nullptr;
177}
178
180 if (server)
181 stop();
182}
183
185 const std::string &type) {
186 auto port = new RpcServerReadPort(new Type(type));
187 readPorts.emplace(name, port);
188 return *port;
189}
191 const std::string &type) {
192 auto port = new RpcServerWritePort(new Type(type));
193 writePorts.emplace(name, port);
194 return *port;
195}
196
197ServerUnaryReactor *Impl::GetManifest(CallbackServerContext *context,
198 const VoidMessage *, Manifest *response) {
199 response->set_esi_version(esiVersion);
200 response->set_compressed_manifest(compressedManifest.data(),
201 compressedManifest.size());
202 ServerUnaryReactor *reactor = context->DefaultReactor();
203 reactor->Finish(Status::OK);
204 return reactor;
205}
206
207/// Load the list of channels into the response and fire it off.
208ServerUnaryReactor *Impl::ListChannels(CallbackServerContext *context,
209 const VoidMessage *,
210 ListOfChannels *channelsOut) {
211 for (auto &[name, port] : readPorts) {
212 auto *channel = channelsOut->add_channels();
213 channel->set_name(name);
214 channel->set_type(port->getType()->getID());
215 channel->set_dir(ChannelDesc::Direction::ChannelDesc_Direction_TO_SERVER);
216 }
217 for (auto &[name, port] : writePorts) {
218 auto *channel = channelsOut->add_channels();
219 channel->set_name(name);
220 channel->set_type(port->getType()->getID());
221 channel->set_dir(ChannelDesc::Direction::ChannelDesc_Direction_TO_CLIENT);
222 }
223
224 // The default reactor is basically to just finish the RPC call as if we're
225 // implementing the RPC function as a blocking call.
226 auto reactor = context->DefaultReactor();
227 reactor->Finish(Status::OK);
228 return reactor;
229}
230
231namespace {
232/// When a client connects to a read port (on its end, a write port on this
233/// end), construct one of these to poll the corresponding write port on this
234/// side and forward the messages.
235class RpcServerWriteReactor : public ServerWriteReactor<esi::cosim::Message> {
236public:
237 RpcServerWriteReactor(RpcServerWritePort *writePort)
238 : writePort(writePort), sentSuccessfully(SendStatus::UnknownStatus),
239 shutdown(false) {
240 myThread = std::thread(&RpcServerWriteReactor::threadLoop, this);
241 }
242 ~RpcServerWriteReactor() {
243 shutdown = true;
244 // Wake up the potentially sleeping thread.
245 sentSuccessfullyCV.notify_one();
246 myThread.join();
247 }
248
249 // Deleting 'this' from within a callback is safe since this is how gRPC tells
250 // us that it's released the reference. This pattern lets gRPC manage this
251 // object. (Though a shared pointer would be better.) It was actually copied
252 // from one of the gRPC examples:
253 // https://github.com/grpc/grpc/blob/4795c5e69b25e8c767b498bea784da0ef8c96fd5/examples/cpp/route_guide/route_guide_callback_server.cc#L120
254 // The alternative is to have something else (e.g. Impl) manage this object
255 // and have this method tell it that gRPC is done with it and it should be
256 // deleted. As of now, there's no specific need for that and it adds
257 // additional complexity. If there is at some point in the future, change
258 // this.
259 void OnDone() override { delete this; }
260 void OnWriteDone(bool ok) override {
261 std::scoped_lock<std::mutex> lock(sentMutex);
262 sentSuccessfully = ok ? SendStatus::Success : SendStatus::Failure;
263 sentSuccessfullyCV.notify_one();
264 }
265 void OnCancel() override {
266 std::scoped_lock<std::mutex> lock(sentMutex);
267 sentSuccessfully = SendStatus::Disconnect;
268 sentSuccessfullyCV.notify_one();
269 }
270
271private:
272 /// The polling loop.
273 void threadLoop();
274 /// The polling thread.
275 std::thread myThread;
276
277 /// Assoicated write port on this side. (Read port on the client side.)
278 RpcServerWritePort *writePort;
279
280 /// Mutex to protect the sentSuccessfully flag.
281 std::mutex sentMutex;
282 enum SendStatus { UnknownStatus, Success, Failure, Disconnect };
283 volatile SendStatus sentSuccessfully;
284 std::condition_variable sentSuccessfullyCV;
285
286 std::atomic<bool> shutdown;
287};
288
289} // namespace
290
291void RpcServerWriteReactor::threadLoop() {
292 while (!shutdown && sentSuccessfully != SendStatus::Disconnect) {
293 // TODO: adapt this to a new notification mechanism which is forthcoming.
294 if (writePort->writeQueue.empty())
295 std::this_thread::sleep_for(std::chrono::microseconds(100));
296
297 // This lambda will get called with the message at the front of the queue.
298 // If the send is successful, return true to pop it. We don't know, however,
299 // if the message was sent successfully in this thread. It's only when the
300 // `OnWriteDone` method is called by gRPC that we know. Use locking and
301 // condition variables to orchestrate this confirmation.
302 writePort->writeQueue.pop([this](const MessageData &data) -> bool {
303 esi::cosim::Message msg;
304 msg.set_data(reinterpret_cast<const char *>(data.getBytes()),
305 data.getSize());
306
307 // Get a lock, reset the flag, start sending the message, and wait for the
308 // write to complete or fail. Be mindful of the shutdown flag.
309 std::unique_lock<std::mutex> lock(sentMutex);
310 sentSuccessfully = SendStatus::UnknownStatus;
311 StartWrite(&msg);
312 sentSuccessfullyCV.wait(lock, [&]() {
313 return shutdown || sentSuccessfully != SendStatus::UnknownStatus;
314 });
315 bool ret = sentSuccessfully == SendStatus::Success;
316 lock.unlock();
317 return ret;
318 });
319 }
320 Finish(Status::OK);
321}
322
323/// When a client sends a message to a read port (write port on this end), start
324/// streaming messages until the client calls uncle and requests a cancellation.
325ServerWriteReactor<esi::cosim::Message> *
327 const ChannelDesc *request) {
328 getContext().getLogger().debug("cosim", "connect to client channel");
329 auto it = writePorts.find(request->name());
330 if (it == writePorts.end()) {
331 auto reactor = new RpcServerWriteReactor(nullptr);
332 reactor->Finish(Status(StatusCode::NOT_FOUND, "Unknown channel"));
333 return reactor;
334 }
335 return new RpcServerWriteReactor(it->second.get());
336}
337
338/// When a client sends a message to a write port (a read port on this end),
339/// simply locate the associated port, and write that message into its queue.
340ServerUnaryReactor *
341Impl::SendToServer(CallbackServerContext *context,
342 const esi::cosim::AddressedMessage *request,
343 esi::cosim::VoidMessage *response) {
344 auto reactor = context->DefaultReactor();
345 auto it = readPorts.find(request->channel_name());
346 if (it == readPorts.end()) {
347 reactor->Finish(Status(StatusCode::NOT_FOUND, "Unknown channel"));
348 return reactor;
349 }
350
351 std::string msgDataString = request->message().data();
352 MessageData data(reinterpret_cast<const uint8_t *>(msgDataString.data()),
353 msgDataString.size());
354 it->second->push(data);
355 reactor->Finish(Status::OK);
356 return reactor;
357}
358
359//===----------------------------------------------------------------------===//
360// RpcServer pass throughs to the actual implementations above.
361//===----------------------------------------------------------------------===//
363RpcServer::~RpcServer() = default;
364
365void RpcServer::setManifest(int esiVersion,
366 const std::vector<uint8_t> &compressedManifest) {
367 if (!impl)
368 throw std::runtime_error("Server not running");
369
370 impl->setManifest(esiVersion, compressedManifest);
371}
372
374 const std::string &type) {
375 if (!impl)
376 throw std::runtime_error("Server not running");
377 return impl->registerReadPort(name, type);
378}
379
381 const std::string &type) {
382 return impl->registerWritePort(name, type);
383}
384void RpcServer::run(int port) {
385 if (impl)
386 throw std::runtime_error("Server already running");
387 impl = std::make_unique<Impl>(ctxt, port);
388}
390 if (!impl)
391 throw std::runtime_error("Server not running");
392 impl->stop();
393}
394
396 if (!impl)
397 throw std::runtime_error("Server not running");
398 return impl->getPort();
399}
static std::unique_ptr< Context > context
static void writePort(uint16_t port)
Write the port number to a file.
Definition RpcServer.cpp:38
AcceleratorConnections, Accelerators, and Manifests must all share a context.
Definition Context.h:34
Logger & getLogger()
Definition Context.h:69
virtual void info(const std::string &subsystem, const std::string &msg, const std::map< std::string, std::any > *details=nullptr)
Report an informational message.
Definition Logging.h:75
void debug(const std::string &subsystem, const std::string &msg, const std::map< std::string, std::any > *details=nullptr)
Report a debug message.
Definition Logging.h:83
Class to parse a manifest.
Definition Manifest.h:39
A logical chunk of data representing serialized data.
Definition Common.h:113
A ChannelPort which reads data from the accelerator.
Definition Ports.h:318
std::function< bool(MessageData)> callback
Backends call this callback when new data is available.
Definition Ports.h:378
Root class of the ESI type system.
Definition Types.h:34
A ChannelPort which sends data to the accelerator.
Definition Ports.h:206
virtual bool tryWriteImpl(const MessageData &data)=0
Implementation for tryWrite(). Subclasses must implement this.
virtual void writeImpl(const MessageData &)=0
Implementation for write(). Subclasses must implement this.
ServerUnaryReactor * ListChannels(CallbackServerContext *, const VoidMessage *, ListOfChannels *channelsOut) override
Load the list of channels into the response and fire it off.
ReadChannelPort & registerReadPort(const std::string &name, const std::string &type)
std::unique_ptr< Server > server
ServerWriteReactor< esi::cosim::Message > * ConnectToClientChannel(CallbackServerContext *context, const ChannelDesc *request) override
When a client sends a message to a read port (write port on this end), start streaming messages until...
std::vector< uint8_t > compressedManifest
Definition RpcServer.cpp:96
void setManifest(int esiVersion, const std::vector< uint8_t > &compressedManifest)
Definition RpcServer.cpp:62
Impl(Context &ctxt, int port)
Start a server on the given port. -1 means to let the OS pick a port.
ServerUnaryReactor * GetManifest(CallbackServerContext *context, const VoidMessage *, Manifest *response) override
std::map< std::string, std::unique_ptr< RpcServerWritePort > > writePorts
Definition RpcServer.cpp:98
ServerUnaryReactor * SendToServer(CallbackServerContext *context, const esi::cosim::AddressedMessage *request, esi::cosim::VoidMessage *response) override
When a client sends a message to a write port (a read port on this end), simply locate the associated...
WriteChannelPort & registerWritePort(const std::string &name, const std::string &type)
std::map< std::string, std::unique_ptr< RpcServerReadPort > > readPorts
Definition RpcServer.cpp:97
RpcServer(Context &ctxt)
ReadChannelPort & registerReadPort(const std::string &name, const std::string &type)
Register a read or write port which communicates over RPC.
void run(int port=-1)
void setManifest(int esiVersion, const std::vector< uint8_t > &compressedManifest)
Set the manifest and version.
std::unique_ptr< Impl > impl
Definition RpcServer.h:61
WriteChannelPort & registerWritePort(const std::string &name, const std::string &type)
Thread safe queue.
Definition Utils.h:39
void push(E... t)
Push onto the queue.
Definition Utils.h:55
Definition esi.py:1