13#include "cosim.grpc.pb.h"
16#include <grpcpp/security/server_credentials.h>
17#include <grpcpp/server.h>
18#include <grpcpp/server_builder.h>
19#include <grpcpp/server_context.h>
28using grpc::CallbackServerContext;
30using grpc::ServerUnaryReactor;
31using grpc::ServerWriteReactor;
33using grpc::StatusCode;
// Body fragment of the port-file writer: opens "cosim.cfg" for writing
// (mode "w" creates or truncates it) and records the port number in it.
// NOTE(review): in the lines visible here the fopen() result is handed
// straight to fprintf(); if the open fails `fd` is null and the fprintf()
// call is undefined behaviour. Confirm whether a null check and the matching
// fclose() exist in the lines not shown.
40 FILE *fd = fopen(
"cosim.cfg",
"w");
// The port is cast to unsigned int so the argument matches the %u conversion.
41 fprintf(fd,
"port: %u\n",
static_cast<unsigned int>(port));
46class RpcServerReadPort;
47class RpcServerWritePort;
69 const std::string &type);
71 const std::string &type);
73 void stop(uint32_t timeoutMS = 0);
84 ServerUnaryReactor *
ListChannels(CallbackServerContext *,
const VoidMessage *,
85 ListOfChannels *channelsOut)
override;
86 ServerWriteReactor<esi::cosim::Message> *
88 const ChannelDesc *request)
override;
90 const esi::cosim::AddressedMessage *request,
91 esi::cosim::VoidMessage *response)
override;
97 std::map<std::string, std::unique_ptr<RpcServerReadPort>>
readPorts;
98 std::map<std::string, std::unique_ptr<RpcServerWritePort>>
writePorts;
120 std::this_thread::sleep_for(std::chrono::milliseconds(1));
138 writeQueue.
push(data);
// Server start-up fragment: builds a gRPC server bound to the loopback
// address on `port`, registers this object as the service and starts it.
150 grpc::ServerBuilder builder;
// The listening address is always 127.0.0.1, so only local clients can reach
// the server.
151 std::string server_address(
"127.0.0.1:" + std::to_string(
port));
// Insecure credentials: the channel is neither encrypted nor authenticated.
// The remaining argument(s) of this call are not among the lines shown.
154 builder.AddListeningPort(server_address, grpc::InsecureServerCredentials(),
156 builder.RegisterService(
this);
157 server = builder.BuildAndStart();
// Raised when start-up fails; the guarding condition is not among the lines
// shown (presumably a null check on `server` -- TODO confirm).
159 throw std::runtime_error(
"Failed to start server on " + server_address);
// Tail of a statement whose beginning is not shown; it formats the port
// number as text.
163 std::to_string(
port));
// Shut the server down with a deadline of "now + timeoutMS milliseconds"
// measured on the realtime clock.
// NOTE(review): timeoutMS is declared as uint32_t but is narrowed to int
// here; values above INT_MAX would not survive the cast. gpr_time_from_millis
// is believed to take a 64-bit millisecond count, in which case the cast could
// be widened instead -- confirm against the gRPC headers in use.
175 server->Shutdown(gpr_time_add(
176 gpr_now(GPR_CLOCK_REALTIME),
177 gpr_time_from_millis(
static_cast<int>(timeoutMS), GPR_TIMESPAN)));
191 const std::string &type) {
// Allocate the read port together with a fresh Type built from the type
// string.
// NOTE(review): both objects are created with raw `new`. The readPorts map
// holds std::unique_ptr<RpcServerReadPort>, so the port is presumably adopted
// by it in the lines not shown; who owns the `new Type(type)` object cannot be
// determined from here -- verify it is not leaked.
192 auto port =
new RpcServerReadPort(
new Type(type));
197 const std::string &type) {
// Allocate the write port together with a fresh Type built from the type
// string.
// NOTE(review): both objects are created with raw `new`. The writePorts map
// holds std::unique_ptr<RpcServerWritePort>, so the port is presumably adopted
// by it in the lines not shown; who owns the `new Type(type)` object cannot be
// determined from here -- verify it is not leaked.
198 auto port =
new RpcServerWritePort(
new Type(type));
204 const VoidMessage *,
Manifest *response) {
// Unary RPC: the reply is completed through the context's default reactor and
// always finishes with Status::OK in the lines shown. The code that fills in
// `response` is not among the lines shown.
208 ServerUnaryReactor *reactor =
context->DefaultReactor();
209 reactor->Finish(Status::OK);
216 ListOfChannels *channelsOut) {
// First group: append one channel entry carrying a name, the ID of the port's
// type, and direction TO_SERVER. `name` and `port` come from an enclosing
// loop that is not shown (presumably over readPorts -- TODO confirm).
218 auto *channel = channelsOut->add_channels();
219 channel->set_name(name);
220 channel->set_type(
port->getType()->getID());
221 channel->set_dir(ChannelDesc::Direction::ChannelDesc_Direction_TO_SERVER);
// Second group: same shape, but the entry is marked TO_CLIENT (presumably
// iterating writePorts -- TODO confirm).
224 auto *channel = channelsOut->add_channels();
225 channel->set_name(name);
226 channel->set_type(
port->getType()->getID());
227 channel->set_dir(ChannelDesc::Direction::ChannelDesc_Direction_TO_CLIENT);
// Complete the unary call successfully via the default reactor.
232 auto reactor =
context->DefaultReactor();
233 reactor->Finish(Status::OK);
// Server-side streaming reactor for esi::cosim::Message. Construction starts
// a worker thread running threadLoop(); the gRPC callbacks below report write
// completion, cancellation and end-of-call back to that thread through
// sentSuccessfully and the condition variables.
241class RpcServerWriteReactor :
public ServerWriteReactor<esi::cosim::Message> {
243 RpcServerWriteReactor(RpcServerWritePort *
writePort)
// Part of the member-initialiser list; earlier initialisers are not shown.
245 shutdown(false), onDoneCalled(false) {
// The worker thread is started from the constructor, i.e. for every reactor
// instance regardless of the writePort value passed in.
246 myThread = std::thread(&RpcServerWriteReactor::threadLoop,
this);
// Callback invoked by gRPC when the call is finished. The lines shown take
// sentMutex, wake one waiter on each condition variable and then test whether
// the worker thread can be joined; what is done under the lock and after the
// joinable() test is not shown.
253 void OnDone()
override {
256 std::scoped_lock<std::mutex> lock(sentMutex);
260 sentSuccessfullyCV.notify_one();
261 onDoneCV.notify_one();
264 if (myThread.joinable())
// Callback invoked by gRPC when a write completes: records Success when `ok`
// is true and Failure otherwise, under sentMutex, and wakes one waiter.
270 void OnWriteDone(
bool ok)
override {
271 std::scoped_lock<std::mutex> lock(sentMutex);
272 sentSuccessfully = ok ? SendStatus::Success : SendStatus::Failure;
273 sentSuccessfullyCV.notify_one();
// Callback invoked by gRPC on cancellation: records Disconnect under
// sentMutex and wakes one waiter.
276 void OnCancel()
override {
277 std::scoped_lock<std::mutex> lock(sentMutex);
279 sentSuccessfully = SendStatus::Disconnect;
280 sentSuccessfullyCV.notify_one();
// Worker thread executing threadLoop().
287 std::thread myThread;
// Guards sentSuccessfully in the callbacks above.
293 std::mutex sentMutex;
294 enum SendStatus { UnknownStatus, Success, Failure, Disconnect };
// Outcome of the most recent write as reported by the callbacks.
// NOTE(review): `volatile` gives neither atomicity nor inter-thread ordering
// in C++. The callbacks write this under sentMutex, but threadLoop()'s loop
// condition reads it without the mutex; consider std::atomic<SendStatus> or
// reading it under the lock.
295 volatile SendStatus sentSuccessfully;
// Signalled whenever sentSuccessfully changes (and from OnDone).
296 std::condition_variable sentSuccessfullyCV;
// Set to false at construction; read by threadLoop() to decide when to stop.
298 std::atomic<bool> shutdown;
// Signalled from OnDone(); its waiter is not among the lines shown.
302 std::condition_variable onDoneCV;
// Worker-thread body: keeps running until `shutdown` is set or the status
// becomes Disconnect.
307void RpcServerWriteReactor::threadLoop() {
// NOTE(review): this condition reads sentSuccessfully without holding
// sentMutex, while OnWriteDone()/OnCancel() write it under that mutex. The
// member is only `volatile`, which does not make the access race-free.
308 while (!shutdown && sentSuccessfully != SendStatus::Disconnect) {
// 100 microsecond back-off; the condition that leads here is not shown
// (presumably "nothing to send yet" -- TODO confirm). This is a polling wait.
311 std::this_thread::sleep_for(std::chrono::microseconds(100));
// Build the outgoing protobuf message from the bytes of `data`; where `data`
// comes from, and the length argument of set_data, are not shown.
324 esi::cosim::Message msg;
325 msg.set_data(
reinterpret_cast<const char *
>(
data.getBytes()),
// Under sentMutex: reset the status to UnknownStatus, then block until either
// shutdown is requested or a callback has stored a definite status.
330 std::unique_lock<std::mutex> lock(sentMutex);
331 sentSuccessfully = SendStatus::UnknownStatus;
333 sentSuccessfullyCV.wait(lock, [&]() {
334 return shutdown || sentSuccessfully != SendStatus::UnknownStatus;
// True only when the write was reported as successful; Failure, Disconnect
// and a shutdown wake-up all yield false.
336 bool ret = sentSuccessfully == SendStatus::Success;
// Streaming RPC entry point: returns a write reactor for the requested
// channel, or a reactor that is immediately finished with NOT_FOUND.
348ServerWriteReactor<esi::cosim::Message> *
350 const ChannelDesc *request) {
// Failure path (the lookup and its guarding condition are not shown): a
// reactor is created with a null write port and finished with NOT_FOUND.
// NOTE(review): the RpcServerWriteReactor constructor starts its worker
// thread unconditionally, so this error reactor also spawns a thread that
// runs threadLoop() with a null port -- verify threadLoop() tolerates that.
354 auto reactor =
new RpcServerWriteReactor(
nullptr);
355 reactor->Finish(Status(StatusCode::NOT_FOUND,
"Unknown channel"));
// Success path: the reactor borrows the port owned by the map entry.
// NOTE(review): both reactors are allocated with raw `new`; their release is
// not visible here (presumably self-deletion in OnDone -- TODO confirm).
358 return new RpcServerWriteReactor(it->second.get());
365 const esi::cosim::AddressedMessage *request,
366 esi::cosim::VoidMessage *response) {
// Unary RPC completed through the context's default reactor.
367 auto reactor =
context->DefaultReactor();
// Locate the destination read port by the channel name in the request.
368 auto it =
readPorts.find(request->channel_name());
// Reply NOT_FOUND for an unknown channel; the guarding condition is not among
// the lines shown (presumably `it == readPorts.end()` -- TODO confirm).
370 reactor->Finish(Status(StatusCode::NOT_FOUND,
"Unknown channel"));
// NOTE(review): this copies the payload into a local std::string. If the
// protobuf accessor returns a const std::string& (the usual shape for a bytes
// field -- confirm), binding a const reference would avoid the copy.
374 std::string msgDataString = request->message().data();
// Wrap the payload bytes (pointer + length) in a MessageData.
375 MessageData data(
reinterpret_cast<const uint8_t *
>(msgDataString.data()),
376 msgDataString.size());
// Hand the message to the port and report success to the client.
377 it->second->push(data);
378 reactor->Finish(Status::OK);
389 const std::vector<uint8_t> &compressedManifest) {
391 throw std::runtime_error(
"Server not running");
393 impl->setManifest(esiVersion, compressedManifest);
397 const std::string &type) {
399 throw std::runtime_error(
"Server not running");
400 return impl->registerReadPort(name, type);
404 const std::string &type) {
// Forwards to the implementation object.
// NOTE(review): unlike the sibling wrappers (setManifest, registerReadPort,
// stop, getPort), which throw std::runtime_error("Server not running"), no
// such guard appears between the signature and this call. If `impl` is empty
// this dereferences a null std::unique_ptr. Consider adding the same check
// for consistency.
405 return impl->registerWritePort(name, type);
409 throw std::runtime_error(
"Server already running");
410 impl = std::make_unique<Impl>(
ctxt, port);
414 throw std::runtime_error(
"Server not running");
415 impl->stop(timeoutMS);
420 throw std::runtime_error(
"Server not running");
421 return impl->getPort();
static std::unique_ptr< Context > context
static void writePort(uint16_t port)
Write the port number to a file.
AcceleratorConnections, Accelerators, and Manifests must all share a context.
virtual void info(const std::string &subsystem, const std::string &msg, const std::map< std::string, std::any > *details=nullptr)
Report an informational message.
void debug(const std::string &subsystem, const std::string &msg, const std::map< std::string, std::any > *details=nullptr)
Report a debug message.
Class to parse a manifest.
A logical chunk of data representing serialized data.
A ChannelPort which reads data from the accelerator.
std::function< bool(MessageData)> callback
Backends call this callback when new data is available.
Root class of the ESI type system.
A ChannelPort which sends data to the accelerator.
virtual bool tryWriteImpl(const MessageData &data)=0
Implementation for tryWrite(). Subclasses must implement this.
virtual void writeImpl(const MessageData &)=0
Implementation for write(). Subclasses must implement this.
ServerUnaryReactor * ListChannels(CallbackServerContext *, const VoidMessage *, ListOfChannels *channelsOut) override
Load the list of channels into the response and fire it off.
ReadChannelPort & registerReadPort(const std::string &name, const std::string &type)
std::unique_ptr< Server > server
ServerWriteReactor< esi::cosim::Message > * ConnectToClientChannel(CallbackServerContext *context, const ChannelDesc *request) override
When a client sends a message to a read port (write port on this end), start streaming messages until...
std::vector< uint8_t > compressedManifest
void setManifest(int esiVersion, const std::vector< uint8_t > &compressedManifest)
Impl(Context &ctxt, int port)
Start a server on the given port. -1 means to let the OS pick a port.
ServerUnaryReactor * GetManifest(CallbackServerContext *context, const VoidMessage *, Manifest *response) override
std::map< std::string, std::unique_ptr< RpcServerWritePort > > writePorts
void stop(uint32_t timeoutMS=0)
ServerUnaryReactor * SendToServer(CallbackServerContext *context, const esi::cosim::AddressedMessage *request, esi::cosim::VoidMessage *response) override
When a client sends a message to a write port (a read port on this end), simply locate the associated...
WriteChannelPort & registerWritePort(const std::string &name, const std::string &type)
std::map< std::string, std::unique_ptr< RpcServerReadPort > > readPorts
ReadChannelPort & registerReadPort(const std::string &name, const std::string &type)
Register a read or write port which communicates over RPC.
void stop(uint32_t timeoutMS=0)
void setManifest(int esiVersion, const std::vector< uint8_t > &compressedManifest)
Set the manifest and version.
std::unique_ptr< Impl > impl
WriteChannelPort & registerWritePort(const std::string &name, const std::string &type)
void push(E... t)
Push onto the queue.