19#include "cosim.grpc.pb.h"
22#include <grpcpp/channel.h>
23#include <grpcpp/client_context.h>
24#include <grpcpp/create_channel.h>
25#include <grpcpp/security/credentials.h>
27#include <condition_variable>
34using grpc::ClientContext;
// Throws std::runtime_error. The message is `msg`, followed by ". Code " and
// the status error code, ": " and the status error message, and finally the
// status error details wrapped in parentheses.
// NOTE(review): the enclosing function header and any condition guarding this
// throw are outside this excerpt.
39 throw std::runtime_error(msg +
". Code " + to_string(s.error_code()) +
40 ": " + s.error_message() +
" (" +
41 s.error_details() +
")");
// Read-side channel connection. Derives from grpc::ClientReadReactor
// specialised on ::esi::cosim::Message, so the OnReadDone/OnDone overrides in
// this class receive the stream events.
49class ReadChannelConnectionImpl
51 public grpc::ClientReadReactor<::esi::cosim::Message> {
// Constructor: keeps the stub as a raw pointer, copies the channel descriptor
// into grpcDesc, moves the callback into place, and starts with a null client
// context and done == false.
// NOTE(review): the declaration of the `callback` parameter is not visible in
// this excerpt.
53 ReadChannelConnectionImpl(::esi::cosim::ChannelServer::Stub *stub,
54 const ::esi::cosim::ChannelDesc &desc,
56 : stub(stub), grpcDesc(desc), callback(std::move(callback)),
57 context(nullptr), done(false) {}
// Destructor: calls disconnect() before the object is destroyed.
59 ~ReadChannelConnectionImpl()
override {
disconnect(); }
// Starts the streaming call: allocates a new ClientContext, issues the
// asynchronous ConnectToClientChannel request for grpcDesc with `this` passed
// as the reactor argument, and then requests the first read into
// incomingMessage.
// NOTE(review): the enclosing method header is outside this excerpt, and
// `context` is allocated with a raw `new` -- where it is released is not
// visible here; confirm it is deleted.
62 context =
new ClientContext();
63 stub->async()->ConnectToClientChannel(context, &grpcDesc,
this);
65 StartRead(&incomingMessage);
// Acquires doneMutex for the scope of the check.
// NOTE(review): the returned expression is outside this excerpt; presumably
// it reads the `disconnecting` flag -- confirm.
69 bool isDisconnecting() {
70 std::lock_guard<std::mutex> lock(doneMutex);
// Reactor override called with the outcome (`ok`) of a read that was started
// with StartRead(&incomingMessage).
// NOTE(review): the statements controlled by the isDisconnecting() checks and
// any handling of `ok` are outside this excerpt.
79 void OnReadDone(
bool ok)
override {
// Check the disconnect flag before touching the received message.
85 if (isDisconnecting())
// Build a MessageData from the payload's byte pointer and length.
89 const std::string &messageString = incomingMessage.data();
90 MessageData data(
reinterpret_cast<const uint8_t *
>(messageString.data()),
91 messageString.size());
// Re-offer the same message until the callback returns true, sleeping 10 ms
// between attempts and re-checking the disconnect flag each time.
94 while (!callback(data)) {
95 if (isDisconnecting())
97 std::this_thread::sleep_for(std::chrono::milliseconds(10));
// Check once more after the callback has accepted the message.
101 if (isDisconnecting())
// Request the next message, reusing incomingMessage as the buffer.
105 StartRead(&incomingMessage);
// Reactor override receiving the final grpc::Status; the parameter is unnamed
// and therefore unused. Acquires doneMutex first.
// NOTE(review): the rest of the body is outside this excerpt; presumably it
// sets `done` and signals doneCV -- confirm.
109 void OnDone(
const grpc::Status & )
override {
110 std::lock_guard<std::mutex> lock(doneMutex);
// Shutdown sequence (enclosing method header is outside this excerpt;
// presumably this is disconnect() -- confirm).
// Acquire doneMutex.
117 std::lock_guard<std::mutex> lock(doneMutex);
// Record that a disconnect is in progress; isDisconnecting() callers observe it.
122 disconnecting =
true;
// Request cancellation of the call associated with this context.
136 context->TryCancel();
// Wait on doneCV until `done` is true, giving up after 1000 ms.
140 std::unique_lock<std::mutex> lock(doneMutex);
141 doneCV.wait_for(lock, std::chrono::milliseconds(1000),
142 [
this]() {
return done; });
// Stub used to issue the streaming call; held as a raw pointer.
149 ::esi::cosim::ChannelServer::Stub *stub;
// Copy of the channel descriptor passed to ConnectToClientChannel.
150 ::esi::cosim::ChannelDesc grpcDesc;
// Buffer that StartRead() fills with each incoming message.
153 ::esi::cosim::Message incomingMessage;
// Mutex locked around the done/disconnecting state, and the condition
// variable waited on (with predicate `done`) during shutdown.
156 std::mutex doneMutex;
157 std::condition_variable doneCV;
// Set to true once a disconnect has been requested.
159 bool disconnecting =
false;
// Constructor: opens a gRPC channel to "<hostname>:<port>" using insecure
// channel credentials and creates the ChannelServer stub on that channel.
169 Impl(
const std::string &hostname, uint16_t port) {
170 auto channel = grpc::CreateChannel(hostname +
":" + std::to_string(port),
171 grpc::InsecureChannelCredentials());
172 stub = ::esi::cosim::ChannelServer::NewStub(channel);
// Returns the raw pointer held by `stub`; does not release ownership.
175 ::esi::cosim::ChannelServer::Stub *
getStub()
const {
return stub.get(); }
// Fetches the manifest with the GetManifest RPC, storing the reply in
// `response`.
// NOTE(review): the loop head, the `context` declaration and any check of `s`
// are outside this excerpt.
178 ::esi::cosim::Manifest response;
183 ::esi::cosim::VoidMessage arg;
184 Status s =
stub->GetManifest(&
context, arg, &response);
// Pause 10 ms after each request.
186 std::this_thread::sleep_for(std::chrono::milliseconds(10));
187 }
// Repeat while the reported ESI version is negative.
while (response.esi_version() < 0);
// Looks up a channel by name: calls the ListChannels RPC and scans the
// returned channels for one whose name equals channelName.
// NOTE(review): the start of the signature, the `context` declaration, any
// check of `s` and the body of the match branch are outside this excerpt.
192 ::esi::cosim::ChannelDesc &desc)
const {
194 ::esi::cosim::VoidMessage arg;
195 ::esi::cosim::ListOfChannels response;
196 Status s =
stub->ListChannels(&
context, arg, &response);
198 for (
const auto &channel : response.channels())
199 if (channel.name() == channelName) {
// Calls the ListChannels RPC and converts each returned descriptor into an
// RpcClient::ChannelDesc appended to `result`.
// NOTE(review): the function header, the `context` and `desc` declarations,
// any check of `s`, and the direction-assignment branches are outside this
// excerpt.
208 ::esi::cosim::VoidMessage arg;
209 ::esi::cosim::ListOfChannels response;
210 Status s =
stub->ListChannels(&
context, arg, &response);
// Reserve space for every returned channel up front.
213 std::vector<RpcClient::ChannelDesc> result;
214 result.reserve(response.channels_size());
215 for (
const auto &grpcDesc : response.channels()) {
// Copy name and type across from the gRPC descriptor.
217 desc.
name = grpcDesc.name();
218 desc.
type = grpcDesc.type();
// Branch on whether the gRPC direction is TO_SERVER.
219 if (grpcDesc.dir() ==
220 ::esi::cosim::ChannelDesc::Direction::ChannelDesc_Direction_TO_SERVER)
224 result.push_back(std::move(desc));
// Sends one message with the SendToServer RPC: the request carries
// channelName and the bytes of `data` (getBytes()/getSize()).
// NOTE(review): the function header and the `context` declaration are outside
// this excerpt.
231 ::esi::cosim::AddressedMessage grpcMsg;
232 grpcMsg.set_channel_name(channelName);
233 grpcMsg.mutable_message()->set_data(data.getBytes(), data.getSize());
234 ::esi::cosim::VoidMessage response;
235 grpc::Status sendStatus =
stub->SendToServer(&
context, grpcMsg, &response);
// A non-OK status becomes a std::runtime_error naming the channel and
// carrying the status code, message and details.
236 if (!sendStatus.ok())
237 throw std::runtime_error(
"Failed to write to channel '" + channelName +
238 "': " + std::to_string(sendStatus.error_code()) +
239 " " + sendStatus.error_message() +
240 ". Details: " + sendStatus.error_details());
// Creates a ReadChannelConnectionImpl for the named channel from the stub,
// the channel's gRPC descriptor and the caller's callback (moved in).
// NOTE(review): the function name/parameters, the lookup that fills grpcDesc,
// the condition guarding the throw, and the return are outside this excerpt.
243 std::unique_ptr<RpcClient::ReadChannelConnection>
246 ::esi::cosim::ChannelDesc grpcDesc;
// Raised with the channel name when the channel cannot be found.
248 throw std::runtime_error(
"Could not find channel '" + channelName +
"'");
250 auto connection = std::make_unique<ReadChannelConnectionImpl>(
251 stub.get(), grpcDesc, std::move(callback));
// Owning handle to the ChannelServer stub; this member is the target of all
// RPCs issued through `stub->...`.
257 std::unique_ptr<::esi::cosim::ChannelServer::Stub>
stub;
// Constructor initializer: builds the Impl from hostname and port; the
// constructor body itself is empty.
265 : impl(std::make_unique<
Impl>(hostname, port)) {}
// Fetches the manifest through impl and returns its esi_version() field.
270 return impl->getManifest().esi_version();
// Fetches the manifest through impl and returns its compressed_manifest()
// string as a vector of bytes (one element per character).
274 ::esi::cosim::Manifest response =
impl->getManifest();
275 std::string compressedManifestStr = response.compressed_manifest();
276 return std::vector<uint8_t>(compressedManifestStr.begin(),
277 compressedManifestStr.end());
// Asks impl for the gRPC descriptor of channelName and copies its name and
// type into `desc`.
// NOTE(review): the function header, the statement taken when the lookup
// fails, the direction-assignment branches and the return are outside this
// excerpt.
282 ::esi::cosim::ChannelDesc grpcDesc;
283 if (!
impl->getChannelDesc(channelName, grpcDesc))
286 desc.
name = grpcDesc.name();
287 desc.
type = grpcDesc.type();
// Branch on whether the gRPC direction is TO_SERVER.
288 if (grpcDesc.dir() ==
289 ::esi::cosim::ChannelDesc::Direction::ChannelDesc_Direction_TO_SERVER)
// Forwards to Impl::listChannels() and returns its result unchanged.
297 return impl->listChannels();
// Forwards the channel name and message to Impl::writeToServer().
302 impl->writeToServer(channelName, data);
// Forwards to Impl::connectClientReceiver(), moving the callback into the call.
// NOTE(review): the function name and parameter list are outside this excerpt.
305std::unique_ptr<RpcClient::ReadChannelConnection>
308 return impl->connectClientReceiver(channelName, std::move(callback));
static std::unique_ptr< Context > context
static void checkStatus(Status s, const std::string &msg)
std::unique_ptr<::esi::cosim::ChannelServer::Stub > stub
Impl(const std::string &hostname, uint16_t port)
void writeToServer(const std::string &channelName, const MessageData &data)
::esi::cosim::Manifest getManifest() const
std::vector< RpcClient::ChannelDesc > listChannels() const
bool getChannelDesc(const std::string &channelName, ::esi::cosim::ChannelDesc &desc) const
::esi::cosim::ChannelServer::Stub * getStub() const
std::unique_ptr< RpcClient::ReadChannelConnection > connectClientReceiver(const std::string &channelName, RpcClient::ReadCallback callback)
A logical chunk of data representing serialized data.
Abstract handle for a read channel connection.
virtual void disconnect()=0
std::unique_ptr< ReadChannelConnection > connectClientReceiver(const std::string &channelName, ReadCallback callback)
Connect to a client-bound channel and receive messages via callback.
RpcClient(const std::string &hostname, uint16_t port)
std::vector< uint8_t > getCompressedManifest() const
Get the compressed manifest from the server.
uint32_t getEsiVersion() const
Get the ESI version from the manifest.
void writeToServer(const std::string &channelName, const MessageData &data)
Send a message to a server-bound channel.
bool getChannelDesc(const std::string &channelName, ChannelDesc &desc) const
Get the channel description for a channel name.
std::function< bool(const MessageData &)> ReadCallback
Callback type for receiving messages from a client-bound channel.
std::vector< ChannelDesc > listChannels() const
List all channels available on the server.
std::unique_ptr< Impl > impl
Description of a channel from the server.