19#include "cosim.grpc.pb.h"
22#include <grpcpp/channel.h>
23#include <grpcpp/client_context.h>
24#include <grpcpp/create_channel.h>
25#include <grpcpp/security/credentials.h>
27#include <condition_variable>
34using grpc::ClientContext;
39 throw std::runtime_error(msg +
". Code " + to_string(s.error_code()) +
40 ": " + s.error_message() +
" (" +
41 s.error_details() +
")");
49class ReadChannelConnectionImpl
51 public grpc::ClientReadReactor<::esi::cosim::Message> {
53 ReadChannelConnectionImpl(::esi::cosim::ChannelServer::Stub *stub,
54 const ::esi::cosim::ChannelDesc &desc,
56 : stub(stub), grpcDesc(desc), callback(std::move(callback)),
57 context(nullptr), done(false) {}
59 ~ReadChannelConnectionImpl()
override {
disconnect(); }
62 context =
new ClientContext();
63 stub->async()->ConnectToClientChannel(context, &grpcDesc,
this);
65 StartRead(&incomingMessage);
68 void OnReadDone(
bool ok)
override {
74 const std::string &messageString = incomingMessage.data();
75 MessageData data(
reinterpret_cast<const uint8_t *
>(messageString.data()),
76 messageString.size());
78 while (!callback(data))
81 std::this_thread::sleep_for(std::chrono::milliseconds(10));
84 StartRead(&incomingMessage);
88 void OnDone(
const grpc::Status & )
override {
89 std::lock_guard<std::mutex> lock(doneMutex);
102 std::unique_lock<std::mutex> lock(doneMutex);
103 doneCV.wait(lock, [
this]() {
return done; });
111 ::esi::cosim::ChannelServer::Stub *stub;
112 ::esi::cosim::ChannelDesc grpcDesc;
115 ::esi::cosim::Message incomingMessage;
118 std::mutex doneMutex;
119 std::condition_variable doneCV;
130 Impl(
const std::string &hostname, uint16_t port) {
131 auto channel = grpc::CreateChannel(hostname +
":" + std::to_string(port),
132 grpc::InsecureChannelCredentials());
133 stub = ::esi::cosim::ChannelServer::NewStub(channel);
136 ::esi::cosim::ChannelServer::Stub *
getStub()
const {
return stub.get(); }
139 ::esi::cosim::Manifest response;
144 ::esi::cosim::VoidMessage arg;
145 Status s =
stub->GetManifest(&
context, arg, &response);
147 std::this_thread::sleep_for(std::chrono::milliseconds(10));
148 }
while (response.esi_version() < 0);
153 ::esi::cosim::ChannelDesc &desc)
const {
155 ::esi::cosim::VoidMessage arg;
156 ::esi::cosim::ListOfChannels response;
157 Status s =
stub->ListChannels(&
context, arg, &response);
159 for (
const auto &channel : response.channels())
160 if (channel.name() == channelName) {
169 ::esi::cosim::VoidMessage arg;
170 ::esi::cosim::ListOfChannels response;
171 Status s =
stub->ListChannels(&
context, arg, &response);
174 std::vector<RpcClient::ChannelDesc> result;
175 result.reserve(response.channels_size());
176 for (
const auto &grpcDesc : response.channels()) {
178 desc.
name = grpcDesc.name();
179 desc.
type = grpcDesc.type();
180 if (grpcDesc.dir() ==
181 ::esi::cosim::ChannelDesc::Direction::ChannelDesc_Direction_TO_SERVER)
185 result.push_back(std::move(desc));
192 ::esi::cosim::AddressedMessage grpcMsg;
193 grpcMsg.set_channel_name(channelName);
194 grpcMsg.mutable_message()->set_data(data.getBytes(), data.getSize());
195 ::esi::cosim::VoidMessage response;
196 grpc::Status sendStatus =
stub->SendToServer(&
context, grpcMsg, &response);
197 if (!sendStatus.ok())
198 throw std::runtime_error(
"Failed to write to channel '" + channelName +
199 "': " + std::to_string(sendStatus.error_code()) +
200 " " + sendStatus.error_message() +
201 ". Details: " + sendStatus.error_details());
204 std::unique_ptr<RpcClient::ReadChannelConnection>
207 ::esi::cosim::ChannelDesc grpcDesc;
209 throw std::runtime_error(
"Could not find channel '" + channelName +
"'");
211 auto connection = std::make_unique<ReadChannelConnectionImpl>(
212 stub.get(), grpcDesc, std::move(callback));
218 std::unique_ptr<::esi::cosim::ChannelServer::Stub>
stub;
226 : impl(std::make_unique<
Impl>(hostname, port)) {}
231 return impl->getManifest().esi_version();
235 ::esi::cosim::Manifest response =
impl->getManifest();
236 std::string compressedManifestStr = response.compressed_manifest();
237 return std::vector<uint8_t>(compressedManifestStr.begin(),
238 compressedManifestStr.end());
243 ::esi::cosim::ChannelDesc grpcDesc;
244 if (!
impl->getChannelDesc(channelName, grpcDesc))
247 desc.
name = grpcDesc.name();
248 desc.
type = grpcDesc.type();
249 if (grpcDesc.dir() ==
250 ::esi::cosim::ChannelDesc::Direction::ChannelDesc_Direction_TO_SERVER)
258 return impl->listChannels();
263 impl->writeToServer(channelName, data);
266std::unique_ptr<RpcClient::ReadChannelConnection>
269 return impl->connectClientReceiver(channelName, std::move(callback));
static std::unique_ptr< Context > context
static void checkStatus(Status s, const std::string &msg)
std::unique_ptr<::esi::cosim::ChannelServer::Stub > stub
Impl(const std::string &hostname, uint16_t port)
void writeToServer(const std::string &channelName, const MessageData &data)
::esi::cosim::Manifest getManifest() const
std::vector< RpcClient::ChannelDesc > listChannels() const
bool getChannelDesc(const std::string &channelName, ::esi::cosim::ChannelDesc &desc) const
::esi::cosim::ChannelServer::Stub * getStub() const
std::unique_ptr< RpcClient::ReadChannelConnection > connectClientReceiver(const std::string &channelName, RpcClient::ReadCallback callback)
A logical chunk of data representing serialized data.
Abstract handle for a read channel connection.
virtual void disconnect()=0
std::unique_ptr< ReadChannelConnection > connectClientReceiver(const std::string &channelName, ReadCallback callback)
Connect to a client-bound channel and receive messages via callback.
RpcClient(const std::string &hostname, uint16_t port)
std::vector< uint8_t > getCompressedManifest() const
Get the compressed manifest from the server.
uint32_t getEsiVersion() const
Get the ESI version from the manifest.
void writeToServer(const std::string &channelName, const MessageData &data)
Send a message to a server-bound channel.
bool getChannelDesc(const std::string &channelName, ChannelDesc &desc) const
Get the channel description for a channel name.
std::function< bool(const MessageData &)> ReadCallback
Callback type for receiving messages from a client-bound channel.
std::vector< ChannelDesc > listChannels() const
List all channels available on the server.
std::unique_ptr< Impl > impl
Description of a channel from the server.