13#include "cosim.grpc.pb.h"
16#include <grpcpp/security/server_credentials.h>
17#include <grpcpp/server.h>
18#include <grpcpp/server_builder.h>
19#include <grpcpp/server_context.h>
28using grpc::CallbackServerContext;
30using grpc::ServerUnaryReactor;
31using grpc::ServerWriteReactor;
33using grpc::StatusCode;
40 FILE *fd = fopen(
"cosim.cfg",
"w");
41 fprintf(fd,
"port: %u\n",
static_cast<unsigned int>(port));
46class RpcServerReadPort;
47class RpcServerWritePort;
69 const std::string &type);
71 const std::string &type);
84 ServerUnaryReactor *
ListChannels(CallbackServerContext *,
const VoidMessage *,
85 ListOfChannels *channelsOut)
override;
86 ServerWriteReactor<esi::cosim::Message> *
88 const ChannelDesc *request)
override;
90 const esi::cosim::AddressedMessage *request,
91 esi::cosim::VoidMessage *response)
override;
97 std::map<std::string, std::unique_ptr<RpcServerReadPort>>
readPorts;
98 std::map<std::string, std::unique_ptr<RpcServerWritePort>>
writePorts;
120 std::this_thread::sleep_for(std::chrono::milliseconds(1));
138 writeQueue.
push(data);
150 grpc::ServerBuilder builder;
151 std::string server_address(
"127.0.0.1:" + std::to_string(
port));
154 builder.AddListeningPort(server_address, grpc::InsecureServerCredentials(),
156 builder.RegisterService(
this);
157 server = builder.BuildAndStart();
159 throw std::runtime_error(
"Failed to start server on " + server_address);
163 std::to_string(
port));
185 const std::string &type) {
186 auto port =
new RpcServerReadPort(
new Type(type));
191 const std::string &type) {
192 auto port =
new RpcServerWritePort(
new Type(type));
198 const VoidMessage *,
Manifest *response) {
202 ServerUnaryReactor *reactor =
context->DefaultReactor();
203 reactor->Finish(Status::OK);
210 ListOfChannels *channelsOut) {
212 auto *channel = channelsOut->add_channels();
213 channel->set_name(name);
214 channel->set_type(
port->getType()->getID());
215 channel->set_dir(ChannelDesc::Direction::ChannelDesc_Direction_TO_SERVER);
218 auto *channel = channelsOut->add_channels();
219 channel->set_name(name);
220 channel->set_type(
port->getType()->getID());
221 channel->set_dir(ChannelDesc::Direction::ChannelDesc_Direction_TO_CLIENT);
226 auto reactor =
context->DefaultReactor();
227 reactor->Finish(Status::OK);
235class RpcServerWriteReactor :
public ServerWriteReactor<esi::cosim::Message> {
237 RpcServerWriteReactor(RpcServerWritePort *
writePort)
240 myThread = std::thread(&RpcServerWriteReactor::threadLoop,
this);
242 ~RpcServerWriteReactor() {
245 sentSuccessfullyCV.notify_one();
259 void OnDone()
override {
delete this; }
260 void OnWriteDone(
bool ok)
override {
261 std::scoped_lock<std::mutex> lock(sentMutex);
262 sentSuccessfully = ok ? SendStatus::Success : SendStatus::Failure;
263 sentSuccessfullyCV.notify_one();
265 void OnCancel()
override {
266 std::scoped_lock<std::mutex> lock(sentMutex);
267 sentSuccessfully = SendStatus::Disconnect;
268 sentSuccessfullyCV.notify_one();
275 std::thread myThread;
281 std::mutex sentMutex;
282 enum SendStatus { UnknownStatus, Success, Failure, Disconnect };
283 volatile SendStatus sentSuccessfully;
284 std::condition_variable sentSuccessfullyCV;
286 std::atomic<bool> shutdown;
291void RpcServerWriteReactor::threadLoop() {
292 while (!shutdown && sentSuccessfully != SendStatus::Disconnect) {
295 std::this_thread::sleep_for(std::chrono::microseconds(100));
303 esi::cosim::Message msg;
304 msg.set_data(
reinterpret_cast<const char *
>(
data.getBytes()),
309 std::unique_lock<std::mutex> lock(sentMutex);
310 sentSuccessfully = SendStatus::UnknownStatus;
312 sentSuccessfullyCV.wait(lock, [&]() {
313 return shutdown || sentSuccessfully != SendStatus::UnknownStatus;
315 bool ret = sentSuccessfully == SendStatus::Success;
325ServerWriteReactor<esi::cosim::Message> *
327 const ChannelDesc *request) {
331 auto reactor =
new RpcServerWriteReactor(
nullptr);
332 reactor->Finish(Status(StatusCode::NOT_FOUND,
"Unknown channel"));
335 return new RpcServerWriteReactor(it->second.get());
342 const esi::cosim::AddressedMessage *request,
343 esi::cosim::VoidMessage *response) {
344 auto reactor =
context->DefaultReactor();
345 auto it =
readPorts.find(request->channel_name());
347 reactor->Finish(Status(StatusCode::NOT_FOUND,
"Unknown channel"));
351 std::string msgDataString = request->message().data();
352 MessageData data(
reinterpret_cast<const uint8_t *
>(msgDataString.data()),
353 msgDataString.size());
354 it->second->push(data);
355 reactor->Finish(Status::OK);
366 const std::vector<uint8_t> &compressedManifest) {
368 throw std::runtime_error(
"Server not running");
370 impl->setManifest(esiVersion, compressedManifest);
374 const std::string &type) {
376 throw std::runtime_error(
"Server not running");
377 return impl->registerReadPort(name, type);
381 const std::string &type) {
382 return impl->registerWritePort(name, type);
386 throw std::runtime_error(
"Server already running");
387 impl = std::make_unique<Impl>(
ctxt, port);
391 throw std::runtime_error(
"Server not running");
397 throw std::runtime_error(
"Server not running");
398 return impl->getPort();
static std::unique_ptr< Context > context
static void writePort(uint16_t port)
Write the port number to a file.
AcceleratorConnections, Accelerators, and Manifests must all share a context.
virtual void info(const std::string &subsystem, const std::string &msg, const std::map< std::string, std::any > *details=nullptr)
Report an informational message.
void debug(const std::string &subsystem, const std::string &msg, const std::map< std::string, std::any > *details=nullptr)
Report a debug message.
Class to parse a manifest.
A logical chunk of data representing serialized data.
A ChannelPort which reads data from the accelerator.
std::function< bool(MessageData)> callback
Backends call this callback when new data is available.
Root class of the ESI type system.
A ChannelPort which sends data to the accelerator.
virtual bool tryWriteImpl(const MessageData &data)=0
Implementation for tryWrite(). Subclasses must implement this.
virtual void writeImpl(const MessageData &)=0
Implementation for write(). Subclasses must implement this.
ServerUnaryReactor * ListChannels(CallbackServerContext *, const VoidMessage *, ListOfChannels *channelsOut) override
Load the list of channels into the response and fire it off.
ReadChannelPort & registerReadPort(const std::string &name, const std::string &type)
std::unique_ptr< Server > server
ServerWriteReactor< esi::cosim::Message > * ConnectToClientChannel(CallbackServerContext *context, const ChannelDesc *request) override
When a client sends a message to a read port (write port on this end), start streaming messages until the client cancels the stream or the reactor is shut down.
std::vector< uint8_t > compressedManifest
void setManifest(int esiVersion, const std::vector< uint8_t > &compressedManifest)
Impl(Context &ctxt, int port)
Start a server on the given port. -1 means to let the OS pick a port.
ServerUnaryReactor * GetManifest(CallbackServerContext *context, const VoidMessage *, Manifest *response) override
std::map< std::string, std::unique_ptr< RpcServerWritePort > > writePorts
ServerUnaryReactor * SendToServer(CallbackServerContext *context, const esi::cosim::AddressedMessage *request, esi::cosim::VoidMessage *response) override
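When a client sends a message to a write port (a read port on this end), simply locate the associated port by channel name and push the message data to it; an unknown channel name finishes the call with NOT_FOUND.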
WriteChannelPort & registerWritePort(const std::string &name, const std::string &type)
std::map< std::string, std::unique_ptr< RpcServerReadPort > > readPorts
ReadChannelPort & registerReadPort(const std::string &name, const std::string &type)
Register a read or write port which communicates over RPC.
void setManifest(int esiVersion, const std::vector< uint8_t > &compressedManifest)
Set the manifest and version.
std::unique_ptr< Impl > impl
WriteChannelPort & registerWritePort(const std::string &name, const std::string &type)
void push(E... t)
Push onto the queue.