12 #include "cosim.grpc.pb.h"
14 #include <grpc/grpc.h>
15 #include <grpcpp/security/server_credentials.h>
16 #include <grpcpp/server.h>
17 #include <grpcpp/server_builder.h>
18 #include <grpcpp/server_context.h>
27 using grpc::CallbackServerContext;
29 using grpc::ServerUnaryReactor;
30 using grpc::ServerWriteReactor;
32 using grpc::StatusCode;
39 FILE *fd = fopen(
"cosim.cfg",
"w");
40 fprintf(fd,
"port: %u\n",
static_cast<unsigned int>(port));
45 class RpcServerReadPort;
46 class RpcServerWritePort;
66 const std::string &type);
68 const std::string &type);
76 ServerUnaryReactor *
GetManifest(CallbackServerContext *context,
79 ServerUnaryReactor *
ListChannels(CallbackServerContext *,
const VoidMessage *,
80 ListOfChannels *channelsOut)
override;
81 ServerWriteReactor<esi::cosim::Message> *
83 const ChannelDesc *request)
override;
84 ServerUnaryReactor *
SendToServer(CallbackServerContext *context,
85 const esi::cosim::AddressedMessage *request,
86 esi::cosim::VoidMessage *response)
override;
91 std::map<std::string, std::unique_ptr<RpcServerReadPort>>
readPorts;
92 std::map<std::string, std::unique_ptr<RpcServerWritePort>>
writePorts;
113 while (!callback(data))
114 std::this_thread::sleep_for(std::chrono::milliseconds(1));
126 void write(
const MessageData &data)
override { writeQueue.push(data); }
128 writeQueue.push(data);
142 grpc::ServerBuilder builder;
143 std::string server_address(
"127.0.0.1:" + std::to_string(port));
146 builder.AddListeningPort(server_address, grpc::InsecureServerCredentials(),
148 builder.RegisterService(
this);
149 server = builder.BuildAndStart();
151 throw std::runtime_error(
"Failed to start server on " + server_address);
153 std::cout <<
"Server listening on 127.0.0.1:" << port << std::endl;
175 const std::string &type) {
176 auto port =
new RpcServerReadPort(
new Type(type));
182 const std::string &type) {
183 auto port =
new RpcServerWritePort(
new Type(type));
190 const VoidMessage *,
Manifest *response) {
194 ServerUnaryReactor *reactor = context->DefaultReactor();
195 reactor->Finish(Status::OK);
202 ListOfChannels *channelsOut) {
204 auto *channel = channelsOut->add_channels();
205 channel->set_name(name);
206 channel->set_type(port->getType()->getID());
207 channel->set_dir(ChannelDesc::Direction::ChannelDesc_Direction_TO_SERVER);
210 auto *channel = channelsOut->add_channels();
211 channel->set_name(name);
212 channel->set_type(port->getType()->getID());
213 channel->set_dir(ChannelDesc::Direction::ChannelDesc_Direction_TO_CLIENT);
218 auto reactor = context->DefaultReactor();
219 reactor->Finish(Status::OK);
227 class RpcServerWriteReactor :
public ServerWriteReactor<esi::cosim::Message> {
229 RpcServerWriteReactor(RpcServerWritePort *
writePort)
232 myThread = std::thread(&RpcServerWriteReactor::threadLoop,
this);
234 ~RpcServerWriteReactor() {
237 sentSuccessfullyCV.notify_one();
251 void OnDone()
override {
delete this; }
252 void OnWriteDone(
bool ok)
override {
253 std::scoped_lock<std::mutex> lock(sentMutex);
254 sentSuccessfully = ok ? SendStatus::Success : SendStatus::Failure;
255 sentSuccessfullyCV.notify_one();
257 void OnCancel()
override {
258 std::scoped_lock<std::mutex> lock(sentMutex);
259 sentSuccessfully = SendStatus::Disconnect;
260 sentSuccessfullyCV.notify_one();
267 std::thread myThread;
273 std::mutex sentMutex;
274 enum SendStatus { UnknownStatus, Success, Failure, Disconnect };
275 volatile SendStatus sentSuccessfully;
276 std::condition_variable sentSuccessfullyCV;
278 std::atomic<bool> shutdown;
283 void RpcServerWriteReactor::threadLoop() {
284 while (!shutdown && sentSuccessfully != SendStatus::Disconnect) {
287 std::this_thread::sleep_for(std::chrono::microseconds(100));
295 esi::cosim::Message msg;
296 msg.set_data(
reinterpret_cast<const char *
>(
data.getBytes()),
301 std::unique_lock<std::mutex> lock(sentMutex);
302 sentSuccessfully = SendStatus::UnknownStatus;
304 sentSuccessfullyCV.wait(lock, [&]() {
305 return shutdown || sentSuccessfully != SendStatus::UnknownStatus;
307 bool ret = sentSuccessfully == SendStatus::Success;
317 ServerWriteReactor<esi::cosim::Message> *
319 const ChannelDesc *request) {
320 printf(
"connect to client channel\n");
323 auto reactor =
new RpcServerWriteReactor(
nullptr);
324 reactor->Finish(Status(StatusCode::NOT_FOUND,
"Unknown channel"));
327 return new RpcServerWriteReactor(it->second.get());
334 const esi::cosim::AddressedMessage *request,
335 esi::cosim::VoidMessage *response) {
336 auto reactor = context->DefaultReactor();
337 auto it =
readPorts.find(request->channel_name());
339 reactor->Finish(Status(StatusCode::NOT_FOUND,
"Unknown channel"));
343 std::string msgDataString = request->message().data();
344 MessageData data(
reinterpret_cast<const uint8_t *
>(msgDataString.data()),
345 msgDataString.size());
346 it->second->push(data);
347 reactor->Finish(Status::OK);
359 const std::vector<uint8_t> &compressedManifest) {
363 const std::string &type) {
367 const std::string &type) {
assert(baseType &&"element must be base type")
static void writePort(uint16_t port)
Write the port number to a file.
esi::cosim::RpcServer::Impl Impl
Class to parse a manifest.
A logical chunk of data representing serialized data.
A ChannelPort which reads data from the accelerator.
Root class of the ESI type system.
A ChannelPort which sends data to the accelerator.
ServerUnaryReactor * ListChannels(CallbackServerContext *, const VoidMessage *, ListOfChannels *channelsOut) override
Load the list of channels into the response and fire it off.
ReadChannelPort & registerReadPort(const std::string &name, const std::string &type)
std::unique_ptr< Server > server
ServerWriteReactor< esi::cosim::Message > * ConnectToClientChannel(CallbackServerContext *context, const ChannelDesc *request) override
When a client sends a message to a read port (write port on this end), start streaming messages until...
std::vector< uint8_t > compressedManifest
void setManifest(int esiVersion, const std::vector< uint8_t > &compressedManifest)
Impl(int port)
Start a server on the given port. -1 means to let the OS pick a port.
ServerUnaryReactor * GetManifest(CallbackServerContext *context, const VoidMessage *, Manifest *response) override
std::map< std::string, std::unique_ptr< RpcServerWritePort > > writePorts
ServerUnaryReactor * SendToServer(CallbackServerContext *context, const esi::cosim::AddressedMessage *request, esi::cosim::VoidMessage *response) override
When a client sends a message to a write port (a read port on this end), simply locate the associated...
WriteChannelPort & registerWritePort(const std::string &name, const std::string &type)
std::map< std::string, std::unique_ptr< RpcServerReadPort > > readPorts
ReadChannelPort & registerReadPort(const std::string &name, const std::string &type)
Register a read or write port which communicates over RPC.
void setManifest(int esiVersion, const std::vector< uint8_t > &compressedManifest)
Set the manifest and version.
WriteChannelPort & registerWritePort(const std::string &name, const std::string &type)