CIRCT 21.0.0git
Loading...
Searching...
No Matches
RpcServer.cpp
Go to the documentation of this file.
1//===- RpcServer.cpp - Run a cosim server ---------------------------------===//
2//
3// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4// See https://llvm.org/LICENSE.txt for license information.
5// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6//
7//===----------------------------------------------------------------------===//
8
10#include "esi/Utils.h"
11
12#include "cosim.grpc.pb.h"
13
14#include <grpc/grpc.h>
15#include <grpcpp/security/server_credentials.h>
16#include <grpcpp/server.h>
17#include <grpcpp/server_builder.h>
18#include <grpcpp/server_context.h>
19
20#include <algorithm>
21#include <cassert>
22#include <cstdlib>
23
24using namespace esi;
25using namespace esi::cosim;
26
27using grpc::CallbackServerContext;
28using grpc::Server;
29using grpc::ServerUnaryReactor;
30using grpc::ServerWriteReactor;
31using grpc::Status;
32using grpc::StatusCode;
33
/// Write the port number to a file. Necessary when we are allowed to select our
/// own port. We can't use stdout/stderr because the flushing semantics are
/// undefined (as in `flush()` doesn't work on all simulators).
///
/// The file is "cosim.cfg" in the current working directory and receives a
/// single line of the form `port: <number>`.
///
/// Throws std::runtime_error if the file cannot be opened, written or closed,
/// instead of handing a null `FILE *` to `fprintf`/`fclose`.
static void writePort(uint16_t port) {
  // "cosim.cfg" since we may want to include other info in the future.
  FILE *fd = fopen("cosim.cfg", "w");
  if (fd == nullptr)
    throw std::runtime_error("Could not open cosim.cfg for writing");

  // Always close the handle, even when the write failed, so it cannot leak.
  const bool wrote =
      fprintf(fd, "port: %u\n", static_cast<unsigned int>(port)) >= 0;
  const bool closed = fclose(fd) == 0;
  if (!wrote || !closed)
    throw std::runtime_error("Could not write port number to cosim.cfg");
}
43
44namespace {
45class RpcServerReadPort;
46class RpcServerWritePort;
47} // namespace
48
51public:
52 Impl(int port);
53 ~Impl();
54
55 //===--------------------------------------------------------------------===//
56 // Internal API
57 //===--------------------------------------------------------------------===//
58
60 const std::vector<uint8_t> &compressedManifest) {
61 this->compressedManifest = compressedManifest;
62 this->esiVersion = esiVersion;
63 }
64
65 ReadChannelPort &registerReadPort(const std::string &name,
66 const std::string &type);
67 WriteChannelPort &registerWritePort(const std::string &name,
68 const std::string &type);
69
70 void stop();
71
72 //===--------------------------------------------------------------------===//
73 // RPC API implementations. See the .proto file for the API documentation.
74 //===--------------------------------------------------------------------===//
75
76 ServerUnaryReactor *GetManifest(CallbackServerContext *context,
77 const VoidMessage *,
78 Manifest *response) override;
79 ServerUnaryReactor *ListChannels(CallbackServerContext *, const VoidMessage *,
80 ListOfChannels *channelsOut) override;
81 ServerWriteReactor<esi::cosim::Message> *
82 ConnectToClientChannel(CallbackServerContext *context,
83 const ChannelDesc *request) override;
84 ServerUnaryReactor *SendToServer(CallbackServerContext *context,
85 const esi::cosim::AddressedMessage *request,
86 esi::cosim::VoidMessage *response) override;
87
88private:
90 std::vector<uint8_t> compressedManifest;
91 std::map<std::string, std::unique_ptr<RpcServerReadPort>> readPorts;
92 std::map<std::string, std::unique_ptr<RpcServerWritePort>> writePorts;
93
94 std::unique_ptr<Server> server;
95};
97
98//===----------------------------------------------------------------------===//
99// Read and write ports
100//
101// Implemented as simple queues which the RPC server writes to and reads from.
102//===----------------------------------------------------------------------===//
103
104namespace {
105/// Implements a simple read queue. The RPC server will push messages into this
106/// as appropriate.
107class RpcServerReadPort : public ReadChannelPort {
108public:
109 RpcServerReadPort(Type *type) : ReadChannelPort(type) {}
110
111 /// Internal call. Push a message FROM the RPC client to the read port.
112 void push(MessageData &data) {
113 while (!callback(data))
114 std::this_thread::sleep_for(std::chrono::milliseconds(1));
115 }
116};
117
118/// Implements a simple write queue. The RPC server will pull messages from this
119/// as appropriate. Note that this could be more performant if a callback is
120/// used. This would have more complexity as when a client disconnects the
121/// outstanding messages will need somewhere to be held until the next client
122/// connects. For now, it's simpler to just have the server poll the queue.
// Write-side channel port: both write entry points simply push the message
// onto `writeQueue`.
// NOTE(review): the declaration of `writeQueue` is not visible in this listing
// (the numbering skips the line just before the closing brace) -- presumably
// the thread-safe queue from "esi/Utils.h"; confirm against the real file.
123class RpcServerWritePort : public WriteChannelPort {
124public:
// Passes `type` through to the WriteChannelPort base.
125 RpcServerWritePort(Type *type) : WriteChannelPort(type) {}
// Blocking-write API: pushes `data` onto `writeQueue`.
126 void write(const MessageData &data) override { writeQueue.push(data); }
// Non-blocking-write API: pushes `data` onto `writeQueue` and unconditionally
// reports success.
127 bool tryWrite(const MessageData &data) override {
128 writeQueue.push(data);
129 return true;
130 }
131
133};
134} // namespace
135
136//===----------------------------------------------------------------------===//
137// RPC server implementations
138//===----------------------------------------------------------------------===//
139
140/// Start a server on the given port. -1 means to let the OS pick a port.
141Impl::Impl(int port) : esiVersion(-1) {
142 grpc::ServerBuilder builder;
143 std::string server_address("127.0.0.1:" + std::to_string(port));
144 // TODO: use secure credentials. Not so bad for now since we only accept
145 // connections on localhost.
146 builder.AddListeningPort(server_address, grpc::InsecureServerCredentials(),
147 &port);
148 builder.RegisterService(this);
149 server = builder.BuildAndStart();
150 if (!server)
151 throw std::runtime_error("Failed to start server on " + server_address);
152 writePort(port);
153 std::cout << "Server listening on 127.0.0.1:" << port << std::endl;
154}
155
157 // Disconnect all the ports.
158 for (auto &[name, port] : readPorts)
159 port->disconnect();
160 for (auto &[name, port] : writePorts)
161 port->disconnect();
162
163 // Shutdown the server and wait for it to finish.
164 server->Shutdown();
165 server->Wait();
166 server = nullptr;
167}
168
170 if (server)
171 stop();
172}
173
175 const std::string &type) {
176 auto port = new RpcServerReadPort(new Type(type));
177 readPorts.emplace(name, port);
178 port->connect();
179 return *port;
180}
182 const std::string &type) {
183 auto port = new RpcServerWritePort(new Type(type));
184 writePorts.emplace(name, port);
185 port->connect();
186 return *port;
187}
188
189ServerUnaryReactor *Impl::GetManifest(CallbackServerContext *context,
190 const VoidMessage *, Manifest *response) {
191 response->set_esi_version(esiVersion);
192 response->set_compressed_manifest(compressedManifest.data(),
193 compressedManifest.size());
194 ServerUnaryReactor *reactor = context->DefaultReactor();
195 reactor->Finish(Status::OK);
196 return reactor;
197}
198
199/// Load the list of channels into the response and fire it off.
200ServerUnaryReactor *Impl::ListChannels(CallbackServerContext *context,
201 const VoidMessage *,
202 ListOfChannels *channelsOut) {
203 for (auto &[name, port] : readPorts) {
204 auto *channel = channelsOut->add_channels();
205 channel->set_name(name);
206 channel->set_type(port->getType()->getID());
207 channel->set_dir(ChannelDesc::Direction::ChannelDesc_Direction_TO_SERVER);
208 }
209 for (auto &[name, port] : writePorts) {
210 auto *channel = channelsOut->add_channels();
211 channel->set_name(name);
212 channel->set_type(port->getType()->getID());
213 channel->set_dir(ChannelDesc::Direction::ChannelDesc_Direction_TO_CLIENT);
214 }
215
216 // The default reactor is basically to just finish the RPC call as if we're
217 // implementing the RPC function as a blocking call.
218 auto reactor = context->DefaultReactor();
219 reactor->Finish(Status::OK);
220 return reactor;
221}
222
223namespace {
224/// When a client connects to a read port (on its end, a write port on this
225/// end), construct one of these to poll the corresponding write port on this
226/// side and forward the messages.
227class RpcServerWriteReactor : public ServerWriteReactor<esi::cosim::Message> {
228public:
229 RpcServerWriteReactor(RpcServerWritePort *writePort)
230 : writePort(writePort), sentSuccessfully(SendStatus::UnknownStatus),
231 shutdown(false) {
232 myThread = std::thread(&RpcServerWriteReactor::threadLoop, this);
233 }
234 ~RpcServerWriteReactor() {
235 shutdown = true;
236 // Wake up the potentially sleeping thread.
237 sentSuccessfullyCV.notify_one();
238 myThread.join();
239 }
240
241 // Deleting 'this' from within a callback is safe since this is how gRPC tells
242 // us that it's released the reference. This pattern lets gRPC manage this
243 // object. (Though a shared pointer would be better.) It was actually copied
244 // from one of the gRPC examples:
245 // https://github.com/grpc/grpc/blob/4795c5e69b25e8c767b498bea784da0ef8c96fd5/examples/cpp/route_guide/route_guide_callback_server.cc#L120
246 // The alternative is to have something else (e.g. Impl) manage this object
247 // and have this method tell it that gRPC is done with it and it should be
248 // deleted. As of now, there's no specific need for that and it adds
249 // additional complexity. If there is at some point in the future, change
250 // this.
251 void OnDone() override { delete this; }
252 void OnWriteDone(bool ok) override {
253 std::scoped_lock<std::mutex> lock(sentMutex);
254 sentSuccessfully = ok ? SendStatus::Success : SendStatus::Failure;
255 sentSuccessfullyCV.notify_one();
256 }
257 void OnCancel() override {
258 std::scoped_lock<std::mutex> lock(sentMutex);
259 sentSuccessfully = SendStatus::Disconnect;
260 sentSuccessfullyCV.notify_one();
261 }
262
263private:
264 /// The polling loop.
265 void threadLoop();
266 /// The polling thread.
267 std::thread myThread;
268
269 /// Assoicated write port on this side. (Read port on the client side.)
270 RpcServerWritePort *writePort;
271
272 /// Mutex to protect the sentSuccessfully flag.
273 std::mutex sentMutex;
274 enum SendStatus { UnknownStatus, Success, Failure, Disconnect };
275 volatile SendStatus sentSuccessfully;
276 std::condition_variable sentSuccessfullyCV;
277
278 std::atomic<bool> shutdown;
279};
280
281} // namespace
282
283void RpcServerWriteReactor::threadLoop() {
284 while (!shutdown && sentSuccessfully != SendStatus::Disconnect) {
285 // TODO: adapt this to a new notification mechanism which is forthcoming.
286 if (writePort->writeQueue.empty())
287 std::this_thread::sleep_for(std::chrono::microseconds(100));
288
289 // This lambda will get called with the message at the front of the queue.
290 // If the send is successful, return true to pop it. We don't know, however,
291 // if the message was sent successfully in this thread. It's only when the
292 // `OnWriteDone` method is called by gRPC that we know. Use locking and
293 // condition variables to orchestrate this confirmation.
294 writePort->writeQueue.pop([this](const MessageData &data) -> bool {
295 esi::cosim::Message msg;
296 msg.set_data(reinterpret_cast<const char *>(data.getBytes()),
297 data.getSize());
298
299 // Get a lock, reset the flag, start sending the message, and wait for the
300 // write to complete or fail. Be mindful of the shutdown flag.
301 std::unique_lock<std::mutex> lock(sentMutex);
302 sentSuccessfully = SendStatus::UnknownStatus;
303 StartWrite(&msg);
304 sentSuccessfullyCV.wait(lock, [&]() {
305 return shutdown || sentSuccessfully != SendStatus::UnknownStatus;
306 });
307 bool ret = sentSuccessfully == SendStatus::Success;
308 lock.unlock();
309 return ret;
310 });
311 }
312 Finish(Status::OK);
313}
314
315/// When a client sends a message to a read port (write port on this end), start
316/// streaming messages until the client calls uncle and requests a cancellation.
317ServerWriteReactor<esi::cosim::Message> *
318Impl::ConnectToClientChannel(CallbackServerContext *context,
319 const ChannelDesc *request) {
320 printf("connect to client channel\n");
321 auto it = writePorts.find(request->name());
322 if (it == writePorts.end()) {
323 auto reactor = new RpcServerWriteReactor(nullptr);
324 reactor->Finish(Status(StatusCode::NOT_FOUND, "Unknown channel"));
325 return reactor;
326 }
327 return new RpcServerWriteReactor(it->second.get());
328}
329
330/// When a client sends a message to a write port (a read port on this end),
331/// simply locate the associated port, and write that message into its queue.
332ServerUnaryReactor *
333Impl::SendToServer(CallbackServerContext *context,
334 const esi::cosim::AddressedMessage *request,
335 esi::cosim::VoidMessage *response) {
336 auto reactor = context->DefaultReactor();
337 auto it = readPorts.find(request->channel_name());
338 if (it == readPorts.end()) {
339 reactor->Finish(Status(StatusCode::NOT_FOUND, "Unknown channel"));
340 return reactor;
341 }
342
343 std::string msgDataString = request->message().data();
344 MessageData data(reinterpret_cast<const uint8_t *>(msgDataString.data()),
345 msgDataString.size());
346 it->second->push(data);
347 reactor->Finish(Status::OK);
348 return reactor;
349}
350
351//===----------------------------------------------------------------------===//
352// RpcServer pass throughs to the actual implementations above.
353//===----------------------------------------------------------------------===//
355 if (impl)
356 delete impl;
357}
358void RpcServer::setManifest(int esiVersion,
359 const std::vector<uint8_t> &compressedManifest) {
360 impl->setManifest(esiVersion, compressedManifest);
361}
363 const std::string &type) {
364 return impl->registerReadPort(name, type);
365}
367 const std::string &type) {
368 return impl->registerWritePort(name, type);
369}
370void RpcServer::run(int port) { impl = new Impl(port); }
372 assert(impl && "Server not running");
373 impl->stop();
374}
assert(baseType &&"element must be base type")
static void writePort(uint16_t port)
Write the port number to a file.
Definition RpcServer.cpp:37
Class to parse a manifest.
Definition Manifest.h:39
A logical chunk of data representing serialized data.
Definition Common.h:103
A ChannelPort which reads data from the accelerator.
Definition Ports.h:124
std::function< bool(MessageData)> callback
Backends call this callback when new data is available.
Definition Ports.h:185
Root class of the ESI type system.
Definition Types.h:27
A ChannelPort which sends data to the accelerator.
Definition Ports.h:77
virtual void write(const MessageData &)=0
A very basic blocking write API.
virtual bool tryWrite(const MessageData &data)=0
A basic non-blocking write API.
ServerUnaryReactor * ListChannels(CallbackServerContext *, const VoidMessage *, ListOfChannels *channelsOut) override
Load the list of channels into the response and fire it off.
ReadChannelPort & registerReadPort(const std::string &name, const std::string &type)
std::unique_ptr< Server > server
Definition RpcServer.cpp:94
ServerWriteReactor< esi::cosim::Message > * ConnectToClientChannel(CallbackServerContext *context, const ChannelDesc *request) override
When a client sends a message to a read port (write port on this end), start streaming messages until...
std::vector< uint8_t > compressedManifest
Definition RpcServer.cpp:90
void setManifest(int esiVersion, const std::vector< uint8_t > &compressedManifest)
Definition RpcServer.cpp:59
Impl(int port)
Start a server on the given port. -1 means to let the OS pick a port.
ServerUnaryReactor * GetManifest(CallbackServerContext *context, const VoidMessage *, Manifest *response) override
std::map< std::string, std::unique_ptr< RpcServerWritePort > > writePorts
Definition RpcServer.cpp:92
ServerUnaryReactor * SendToServer(CallbackServerContext *context, const esi::cosim::AddressedMessage *request, esi::cosim::VoidMessage *response) override
When a client sends a message to a write port (a read port on this end), simply locate the associated...
WriteChannelPort & registerWritePort(const std::string &name, const std::string &type)
std::map< std::string, std::unique_ptr< RpcServerReadPort > > readPorts
Definition RpcServer.cpp:91
ReadChannelPort & registerReadPort(const std::string &name, const std::string &type)
Register a read or write port which communicates over RPC.
void run(int port)
void setManifest(int esiVersion, const std::vector< uint8_t > &compressedManifest)
Set the manifest and version.
WriteChannelPort & registerWritePort(const std::string &name, const std::string &type)
Thread safe queue.
Definition Utils.h:39
Definition esi.py:1