19#include "cosim.grpc.pb.h"
22#include <grpcpp/channel.h>
23#include <grpcpp/client_context.h>
24#include <grpcpp/create_channel.h>
25#include <grpcpp/security/credentials.h>
32using grpc::ClientContext;
37 throw std::runtime_error(msg +
". Code " + to_string(s.error_code()) +
38 ": " + s.error_message() +
" (" +
39 s.error_details() +
")");
47class ReadChannelConnectionImpl
49 public grpc::ClientReadReactor<::esi::cosim::Message> {
51 ReadChannelConnectionImpl(::esi::cosim::ChannelServer::Stub *stub,
52 const ::esi::cosim::ChannelDesc &desc,
54 : stub(stub), grpcDesc(desc), callback(std::move(callback)),
57 ~ReadChannelConnectionImpl()
override {
disconnect(); }
60 context =
new ClientContext();
61 stub->async()->ConnectToClientChannel(context, &grpcDesc,
this);
63 StartRead(&incomingMessage);
66 void OnReadDone(
bool ok)
override {
72 const std::string &messageString = incomingMessage.data();
73 MessageData data(
reinterpret_cast<const uint8_t *
>(messageString.data()),
74 messageString.size());
76 while (!callback(data))
79 std::this_thread::sleep_for(std::chrono::milliseconds(10));
82 StartRead(&incomingMessage);
95 ::esi::cosim::ChannelServer::Stub *stub;
96 ::esi::cosim::ChannelDesc grpcDesc;
99 ::esi::cosim::Message incomingMessage;
109 Impl(
const std::string &hostname, uint16_t port) {
110 auto channel = grpc::CreateChannel(hostname +
":" + std::to_string(port),
111 grpc::InsecureChannelCredentials());
112 stub = ::esi::cosim::ChannelServer::NewStub(channel);
115 ::esi::cosim::ChannelServer::Stub *
getStub()
const {
return stub.get(); }
118 ::esi::cosim::Manifest response;
123 ::esi::cosim::VoidMessage arg;
124 Status s =
stub->GetManifest(&
context, arg, &response);
126 std::this_thread::sleep_for(std::chrono::milliseconds(10));
127 }
while (response.esi_version() < 0);
132 ::esi::cosim::ChannelDesc &desc)
const {
134 ::esi::cosim::VoidMessage arg;
135 ::esi::cosim::ListOfChannels response;
136 Status s =
stub->ListChannels(&
context, arg, &response);
138 for (
const auto &channel : response.channels())
139 if (channel.name() == channelName) {
148 ::esi::cosim::VoidMessage arg;
149 ::esi::cosim::ListOfChannels response;
150 Status s =
stub->ListChannels(&
context, arg, &response);
153 std::vector<RpcClient::ChannelDesc> result;
154 result.reserve(response.channels_size());
155 for (
const auto &grpcDesc : response.channels()) {
157 desc.
name = grpcDesc.name();
158 desc.
type = grpcDesc.type();
159 if (grpcDesc.dir() ==
160 ::esi::cosim::ChannelDesc::Direction::ChannelDesc_Direction_TO_SERVER)
164 result.push_back(std::move(desc));
171 ::esi::cosim::AddressedMessage grpcMsg;
172 grpcMsg.set_channel_name(channelName);
173 grpcMsg.mutable_message()->set_data(data.getBytes(), data.getSize());
174 ::esi::cosim::VoidMessage response;
175 grpc::Status sendStatus =
stub->SendToServer(&
context, grpcMsg, &response);
176 if (!sendStatus.ok())
177 throw std::runtime_error(
"Failed to write to channel '" + channelName +
178 "': " + std::to_string(sendStatus.error_code()) +
179 " " + sendStatus.error_message() +
180 ". Details: " + sendStatus.error_details());
183 std::unique_ptr<RpcClient::ReadChannelConnection>
186 ::esi::cosim::ChannelDesc grpcDesc;
188 throw std::runtime_error(
"Could not find channel '" + channelName +
"'");
190 auto connection = std::make_unique<ReadChannelConnectionImpl>(
191 stub.get(), grpcDesc, std::move(callback));
197 std::unique_ptr<::esi::cosim::ChannelServer::Stub>
stub;
205 : impl(std::make_unique<
Impl>(hostname, port)) {}
210 return impl->getManifest().esi_version();
214 ::esi::cosim::Manifest response =
impl->getManifest();
215 std::string compressedManifestStr = response.compressed_manifest();
216 return std::vector<uint8_t>(compressedManifestStr.begin(),
217 compressedManifestStr.end());
222 ::esi::cosim::ChannelDesc grpcDesc;
223 if (!
impl->getChannelDesc(channelName, grpcDesc))
226 desc.
name = grpcDesc.name();
227 desc.
type = grpcDesc.type();
228 if (grpcDesc.dir() ==
229 ::esi::cosim::ChannelDesc::Direction::ChannelDesc_Direction_TO_SERVER)
237 return impl->listChannels();
242 impl->writeToServer(channelName, data);
245std::unique_ptr<RpcClient::ReadChannelConnection>
248 return impl->connectClientReceiver(channelName, std::move(callback));
static std::unique_ptr< Context > context
static void checkStatus(Status s, const std::string &msg)
std::unique_ptr<::esi::cosim::ChannelServer::Stub > stub
Impl(const std::string &hostname, uint16_t port)
void writeToServer(const std::string &channelName, const MessageData &data)
::esi::cosim::Manifest getManifest() const
std::vector< RpcClient::ChannelDesc > listChannels() const
bool getChannelDesc(const std::string &channelName, ::esi::cosim::ChannelDesc &desc) const
::esi::cosim::ChannelServer::Stub * getStub() const
std::unique_ptr< RpcClient::ReadChannelConnection > connectClientReceiver(const std::string &channelName, RpcClient::ReadCallback callback)
A logical chunk of data representing serialized data.
Abstract handle for a read channel connection.
virtual void disconnect()=0
std::unique_ptr< ReadChannelConnection > connectClientReceiver(const std::string &channelName, ReadCallback callback)
Connect to a client-bound channel and receive messages via callback.
RpcClient(const std::string &hostname, uint16_t port)
std::vector< uint8_t > getCompressedManifest() const
Get the compressed manifest from the server.
uint32_t getEsiVersion() const
Get the ESI version from the manifest.
void writeToServer(const std::string &channelName, const MessageData &data)
Send a message to a server-bound channel.
bool getChannelDesc(const std::string &channelName, ChannelDesc &desc) const
Get the channel description for a channel name.
std::function< bool(const MessageData &)> ReadCallback
Callback type for receiving messages from a client-bound channel.
std::vector< ChannelDesc > listChannels() const
List all channels available on the server.
std::unique_ptr< Impl > impl
Description of a channel from the server.