13#include "cosim.grpc.pb.h"
16#include <grpcpp/security/server_credentials.h>
17#include <grpcpp/server.h>
18#include <grpcpp/server_builder.h>
19#include <grpcpp/server_context.h>
29using grpc::CallbackServerContext;
31using grpc::ServerUnaryReactor;
32using grpc::ServerWriteReactor;
34using grpc::StatusCode;
41 FILE *fd = fopen(
"cosim.cfg",
"w");
42 fprintf(fd,
"port: %u\n",
static_cast<unsigned int>(port));
47class RpcServerReadPort;
48class RpcServerWritePort;
70 const std::string &type);
72 const std::string &type);
74 void stop(uint32_t timeoutMS = 0);
85 ServerUnaryReactor *
ListChannels(CallbackServerContext *,
const VoidMessage *,
86 ListOfChannels *channelsOut)
override;
87 ServerWriteReactor<esi::cosim::Message> *
89 const ChannelDesc *request)
override;
91 const esi::cosim::AddressedMessage *request,
92 esi::cosim::VoidMessage *response)
override;
98 std::map<std::string, std::unique_ptr<RpcServerReadPort>>
readPorts;
99 std::map<std::string, std::unique_ptr<RpcServerWritePort>>
writePorts;
121 std::this_thread::sleep_for(std::chrono::milliseconds(1));
139 writeQueue.
push(data);
151 grpc::ServerBuilder builder;
152 std::string server_address(
"127.0.0.1:" + std::to_string(
port));
155 builder.AddListeningPort(server_address, grpc::InsecureServerCredentials(),
157 builder.RegisterService(
this);
158 server = builder.BuildAndStart();
160 throw std::runtime_error(
"Failed to start server on " + server_address);
164 std::to_string(
port));
176 server->Shutdown(gpr_time_add(
177 gpr_now(GPR_CLOCK_REALTIME),
178 gpr_time_from_millis(
static_cast<int>(timeoutMS), GPR_TIMESPAN)));
192 const std::string &type) {
193 auto port =
new RpcServerReadPort(
new Type(type));
198 const std::string &type) {
199 auto port =
new RpcServerWritePort(
new Type(type));
205 const VoidMessage *,
Manifest *response) {
209 ServerUnaryReactor *reactor =
context->DefaultReactor();
210 reactor->Finish(Status::OK);
217 ListOfChannels *channelsOut) {
219 auto *channel = channelsOut->add_channels();
220 channel->set_name(name);
221 channel->set_type(
port->getType()->getID());
222 channel->set_dir(ChannelDesc::Direction::ChannelDesc_Direction_TO_SERVER);
225 auto *channel = channelsOut->add_channels();
226 channel->set_name(name);
227 channel->set_type(
port->getType()->getID());
228 channel->set_dir(ChannelDesc::Direction::ChannelDesc_Direction_TO_CLIENT);
233 auto reactor =
context->DefaultReactor();
234 reactor->Finish(Status::OK);
242class RpcServerWriteReactor :
public ServerWriteReactor<esi::cosim::Message> {
244 RpcServerWriteReactor(RpcServerWritePort *
writePort)
246 shutdown(false), onDoneCalled(false) {
247 myThread = std::thread(&RpcServerWriteReactor::threadLoop,
this);
254 void OnDone()
override {
257 std::scoped_lock<std::mutex> lock(sentMutex);
261 sentSuccessfullyCV.notify_one();
262 onDoneCV.notify_one();
265 if (myThread.joinable())
271 void OnWriteDone(
bool ok)
override {
272 std::scoped_lock<std::mutex> lock(sentMutex);
273 sentSuccessfully = ok ? SendStatus::Success : SendStatus::Failure;
274 sentSuccessfullyCV.notify_one();
277 void OnCancel()
override {
278 std::scoped_lock<std::mutex> lock(sentMutex);
280 sentSuccessfully = SendStatus::Disconnect;
281 sentSuccessfullyCV.notify_one();
288 std::thread myThread;
294 std::mutex sentMutex;
295 enum SendStatus { UnknownStatus, Success, Failure, Disconnect };
296 volatile SendStatus sentSuccessfully;
297 std::condition_variable sentSuccessfullyCV;
299 std::atomic<bool> shutdown;
303 std::condition_variable onDoneCV;
308void RpcServerWriteReactor::threadLoop() {
309 while (!shutdown && sentSuccessfully != SendStatus::Disconnect) {
312 std::this_thread::sleep_for(std::chrono::microseconds(100));
325 esi::cosim::Message msg;
326 msg.set_data(
reinterpret_cast<const char *
>(
data.getBytes()),
331 std::unique_lock<std::mutex> lock(sentMutex);
332 sentSuccessfully = SendStatus::UnknownStatus;
334 sentSuccessfullyCV.wait(lock, [&]() {
335 return shutdown || sentSuccessfully != SendStatus::UnknownStatus;
337 bool ret = sentSuccessfully == SendStatus::Success;
349ServerWriteReactor<esi::cosim::Message> *
351 const ChannelDesc *request) {
355 auto reactor =
new RpcServerWriteReactor(
nullptr);
356 reactor->Finish(Status(StatusCode::NOT_FOUND,
"Unknown channel"));
359 return new RpcServerWriteReactor(it->second.get());
366 const esi::cosim::AddressedMessage *request,
367 esi::cosim::VoidMessage *response) {
368 auto reactor =
context->DefaultReactor();
369 auto it =
readPorts.find(request->channel_name());
371 reactor->Finish(Status(StatusCode::NOT_FOUND,
"Unknown channel"));
375 std::string msgDataString = request->message().data();
376 MessageData data(
reinterpret_cast<const uint8_t *
>(msgDataString.data()),
377 msgDataString.size());
381 std::format(
"Channel '{}': Received message; pushing data to read port",
382 request->channel_name()));
383 it->second->push(data);
384 }
catch (
const std::exception &e) {
387 std::format(
"Channel '{}': Error pushing message to read port: {}",
388 request->channel_name(), e.what()));
390 Status(StatusCode::INTERNAL,
"Error pushing message to port"));
394 reactor->Finish(Status::OK);
405 const std::vector<uint8_t> &compressedManifest) {
407 throw std::runtime_error(
"Server not running");
409 impl->setManifest(esiVersion, compressedManifest);
413 const std::string &type) {
415 throw std::runtime_error(
"Server not running");
416 return impl->registerReadPort(name, type);
420 const std::string &type) {
421 return impl->registerWritePort(name, type);
425 throw std::runtime_error(
"Server already running");
426 impl = std::make_unique<Impl>(
ctxt, port);
430 throw std::runtime_error(
"Server not running");
431 impl->stop(timeoutMS);
436 throw std::runtime_error(
"Server not running");
437 return impl->getPort();
static std::unique_ptr< Context > context
static void writePort(uint16_t port)
Write the port number to a file.
AcceleratorConnections, Accelerators, and Manifests must all share a context.
virtual void error(const std::string &subsystem, const std::string &msg, const std::map< std::string, std::any > *details=nullptr)
Report an error.
virtual void info(const std::string &subsystem, const std::string &msg, const std::map< std::string, std::any > *details=nullptr)
Report an informational message.
void debug(const std::string &subsystem, const std::string &msg, const std::map< std::string, std::any > *details=nullptr)
Report a debug message.
Class to parse a manifest.
A logical chunk of data representing serialized data.
A ChannelPort which reads data from the accelerator.
std::function< bool(MessageData)> callback
Backends call this callback when new data is available.
Root class of the ESI type system.
A ChannelPort which sends data to the accelerator.
virtual bool tryWriteImpl(const MessageData &data)=0
Implementation for tryWrite(). Subclasses must implement this.
virtual void writeImpl(const MessageData &)=0
Implementation for write(). Subclasses must implement this.
ServerUnaryReactor * ListChannels(CallbackServerContext *, const VoidMessage *, ListOfChannels *channelsOut) override
Load the list of channels into the response and fire it off.
ReadChannelPort & registerReadPort(const std::string &name, const std::string &type)
std::unique_ptr< Server > server
ServerWriteReactor< esi::cosim::Message > * ConnectToClientChannel(CallbackServerContext *context, const ChannelDesc *request) override
When a client sends a message to a read port (write port on this end), start streaming messages until the client cancels the call or the reactor is shut down.
std::vector< uint8_t > compressedManifest
void setManifest(int esiVersion, const std::vector< uint8_t > &compressedManifest)
Impl(Context &ctxt, int port)
Start a server on the given port. -1 means to let the OS pick a port.
ServerUnaryReactor * GetManifest(CallbackServerContext *context, const VoidMessage *, Manifest *response) override
std::map< std::string, std::unique_ptr< RpcServerWritePort > > writePorts
void stop(uint32_t timeoutMS=0)
ServerUnaryReactor * SendToServer(CallbackServerContext *context, const esi::cosim::AddressedMessage *request, esi::cosim::VoidMessage *response) override
When a client sends a message to a write port (a read port on this end), simply locate the associated read port by channel name and push the message data to it.
WriteChannelPort & registerWritePort(const std::string &name, const std::string &type)
std::map< std::string, std::unique_ptr< RpcServerReadPort > > readPorts
ReadChannelPort & registerReadPort(const std::string &name, const std::string &type)
Register a read or write port which communicates over RPC.
void stop(uint32_t timeoutMS=0)
void setManifest(int esiVersion, const std::vector< uint8_t > &compressedManifest)
Set the manifest and version.
std::unique_ptr< Impl > impl
WriteChannelPort & registerWritePort(const std::string &name, const std::string &type)
void push(E... t)
Push onto the queue.