CIRCT 22.0.0git
Loading...
Searching...
No Matches
RpcClient.cpp
Go to the documentation of this file.
1//===- RpcClient.cpp - ESI Cosim RPC client implementation ----------------===//
2//
3// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4// See https://llvm.org/LICENSE.txt for license information.
5// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6//
7//===----------------------------------------------------------------------===//
8//
9// DO NOT EDIT!
10// This file is distributed as part of an ESI package. The source for this file
11// should always be modified within CIRCT
12// (lib/dialect/ESI/runtime/cpp/lib/backends/RpcClient.cpp).
13//
14//===----------------------------------------------------------------------===//
15
17#include "esi/Utils.h"
18
19#include "cosim.grpc.pb.h"
20
21#include <grpc/grpc.h>
22#include <grpcpp/channel.h>
23#include <grpcpp/client_context.h>
24#include <grpcpp/create_channel.h>
25#include <grpcpp/security/credentials.h>
26
27#include <thread>
28
29using namespace esi;
30using namespace esi::backends::cosim;
31
32using grpc::ClientContext;
33using grpc::Status;
34
35static void checkStatus(Status s, const std::string &msg) {
36 if (!s.ok())
37 throw std::runtime_error(msg + ". Code " + to_string(s.error_code()) +
38 ": " + s.error_message() + " (" +
39 s.error_details() + ")");
40}
41
42//===----------------------------------------------------------------------===//
43// ReadChannelConnectionImpl - gRPC streaming reader implementation
44//===----------------------------------------------------------------------===//
45
46namespace {
47class ReadChannelConnectionImpl
49 public grpc::ClientReadReactor<::esi::cosim::Message> {
50public:
51 ReadChannelConnectionImpl(::esi::cosim::ChannelServer::Stub *stub,
52 const ::esi::cosim::ChannelDesc &desc,
54 : stub(stub), grpcDesc(desc), callback(std::move(callback)),
55 context(nullptr) {}
56
57 ~ReadChannelConnectionImpl() override { disconnect(); }
58
59 void start() {
60 context = new ClientContext();
61 stub->async()->ConnectToClientChannel(context, &grpcDesc, this);
62 StartCall();
63 StartRead(&incomingMessage);
64 }
65
66 void OnReadDone(bool ok) override {
67 if (!ok)
68 // This happens when we are disconnecting since we are canceling the call.
69 return;
70
71 // Read the delivered message and push it onto the queue.
72 const std::string &messageString = incomingMessage.data();
73 MessageData data(reinterpret_cast<const uint8_t *>(messageString.data()),
74 messageString.size());
75
76 while (!callback(data))
77 // Blocking here could cause deadlocks in specific situations.
78 // TODO: Implement a way to handle this better.
79 std::this_thread::sleep_for(std::chrono::milliseconds(10));
80
81 // Initiate the next read.
82 StartRead(&incomingMessage);
83 }
84
85 void disconnect() override {
86 if (!context)
87 return;
88 context->TryCancel();
89 // Don't delete the context since gRPC still holds a reference to it.
90 // TODO: figure out how to delete it.
91 context = nullptr;
92 }
93
94private:
95 ::esi::cosim::ChannelServer::Stub *stub;
96 ::esi::cosim::ChannelDesc grpcDesc;
98 ClientContext *context;
99 ::esi::cosim::Message incomingMessage;
100};
101} // namespace
102
103//===----------------------------------------------------------------------===//
104// RpcClient::Impl - internal implementation class
105//===----------------------------------------------------------------------===//
106
108public:
109 Impl(const std::string &hostname, uint16_t port) {
110 auto channel = grpc::CreateChannel(hostname + ":" + std::to_string(port),
111 grpc::InsecureChannelCredentials());
112 stub = ::esi::cosim::ChannelServer::NewStub(channel);
113 }
114
115 ::esi::cosim::ChannelServer::Stub *getStub() const { return stub.get(); }
116
117 ::esi::cosim::Manifest getManifest() const {
118 ::esi::cosim::Manifest response;
119 // To get around the a race condition where the manifest may not be set yet,
120 // loop until it is. TODO: fix this with the DPI API change.
121 do {
122 ClientContext context;
123 ::esi::cosim::VoidMessage arg;
124 Status s = stub->GetManifest(&context, arg, &response);
125 checkStatus(s, "Failed to get manifest");
126 std::this_thread::sleep_for(std::chrono::milliseconds(10));
127 } while (response.esi_version() < 0);
128 return response;
129 }
130
131 bool getChannelDesc(const std::string &channelName,
132 ::esi::cosim::ChannelDesc &desc) const {
133 ClientContext context;
134 ::esi::cosim::VoidMessage arg;
135 ::esi::cosim::ListOfChannels response;
136 Status s = stub->ListChannels(&context, arg, &response);
137 checkStatus(s, "Failed to list channels");
138 for (const auto &channel : response.channels())
139 if (channel.name() == channelName) {
140 desc = channel;
141 return true;
142 }
143 return false;
144 }
145
146 std::vector<RpcClient::ChannelDesc> listChannels() const {
147 ClientContext context;
148 ::esi::cosim::VoidMessage arg;
149 ::esi::cosim::ListOfChannels response;
150 Status s = stub->ListChannels(&context, arg, &response);
151 checkStatus(s, "Failed to list channels");
152
153 std::vector<RpcClient::ChannelDesc> result;
154 result.reserve(response.channels_size());
155 for (const auto &grpcDesc : response.channels()) {
157 desc.name = grpcDesc.name();
158 desc.type = grpcDesc.type();
159 if (grpcDesc.dir() ==
160 ::esi::cosim::ChannelDesc::Direction::ChannelDesc_Direction_TO_SERVER)
162 else
164 result.push_back(std::move(desc));
165 }
166 return result;
167 }
168
169 void writeToServer(const std::string &channelName, const MessageData &data) {
170 ClientContext context;
171 ::esi::cosim::AddressedMessage grpcMsg;
172 grpcMsg.set_channel_name(channelName);
173 grpcMsg.mutable_message()->set_data(data.getBytes(), data.getSize());
174 ::esi::cosim::VoidMessage response;
175 grpc::Status sendStatus = stub->SendToServer(&context, grpcMsg, &response);
176 if (!sendStatus.ok())
177 throw std::runtime_error("Failed to write to channel '" + channelName +
178 "': " + std::to_string(sendStatus.error_code()) +
179 " " + sendStatus.error_message() +
180 ". Details: " + sendStatus.error_details());
181 }
182
183 std::unique_ptr<RpcClient::ReadChannelConnection>
184 connectClientReceiver(const std::string &channelName,
185 RpcClient::ReadCallback callback) {
186 ::esi::cosim::ChannelDesc grpcDesc;
187 if (!getChannelDesc(channelName, grpcDesc))
188 throw std::runtime_error("Could not find channel '" + channelName + "'");
189
190 auto connection = std::make_unique<ReadChannelConnectionImpl>(
191 stub.get(), grpcDesc, std::move(callback));
192 connection->start();
193 return connection;
194 }
195
196private:
197 std::unique_ptr<::esi::cosim::ChannelServer::Stub> stub;
198};
199
200//===----------------------------------------------------------------------===//
201// RpcClient
202//===----------------------------------------------------------------------===//
203
204RpcClient::RpcClient(const std::string &hostname, uint16_t port)
205 : impl(std::make_unique<Impl>(hostname, port)) {}
206
207RpcClient::~RpcClient() = default;
208
209uint32_t RpcClient::getEsiVersion() const {
210 return impl->getManifest().esi_version();
211}
212
213std::vector<uint8_t> RpcClient::getCompressedManifest() const {
214 ::esi::cosim::Manifest response = impl->getManifest();
215 std::string compressedManifestStr = response.compressed_manifest();
216 return std::vector<uint8_t>(compressedManifestStr.begin(),
217 compressedManifestStr.end());
218}
219
220bool RpcClient::getChannelDesc(const std::string &channelName,
221 ChannelDesc &desc) const {
222 ::esi::cosim::ChannelDesc grpcDesc;
223 if (!impl->getChannelDesc(channelName, grpcDesc))
224 return false;
225
226 desc.name = grpcDesc.name();
227 desc.type = grpcDesc.type();
228 if (grpcDesc.dir() ==
229 ::esi::cosim::ChannelDesc::Direction::ChannelDesc_Direction_TO_SERVER)
231 else
233 return true;
234}
235
236std::vector<RpcClient::ChannelDesc> RpcClient::listChannels() const {
237 return impl->listChannels();
238}
239
240void RpcClient::writeToServer(const std::string &channelName,
241 const MessageData &data) {
242 impl->writeToServer(channelName, data);
243}
244
245std::unique_ptr<RpcClient::ReadChannelConnection>
246RpcClient::connectClientReceiver(const std::string &channelName,
247 ReadCallback callback) {
248 return impl->connectClientReceiver(channelName, std::move(callback));
249}
static std::unique_ptr< Context > context
static void checkStatus(Status s, const std::string &msg)
Definition RpcClient.cpp:35
std::unique_ptr<::esi::cosim::ChannelServer::Stub > stub
Impl(const std::string &hostname, uint16_t port)
void writeToServer(const std::string &channelName, const MessageData &data)
::esi::cosim::Manifest getManifest() const
std::vector< RpcClient::ChannelDesc > listChannels() const
bool getChannelDesc(const std::string &channelName, ::esi::cosim::ChannelDesc &desc) const
::esi::cosim::ChannelServer::Stub * getStub() const
std::unique_ptr< RpcClient::ReadChannelConnection > connectClientReceiver(const std::string &channelName, RpcClient::ReadCallback callback)
A logical chunk of data representing serialized data.
Definition Common.h:113
Abstract handle for a read channel connection.
Definition RpcClient.h:77
std::unique_ptr< ReadChannelConnection > connectClientReceiver(const std::string &channelName, ReadCallback callback)
Connect to a client-bound channel and receive messages via callback.
RpcClient(const std::string &hostname, uint16_t port)
std::vector< uint8_t > getCompressedManifest() const
Get the compressed manifest from the server.
uint32_t getEsiVersion() const
Get the ESI version from the manifest.
void writeToServer(const std::string &channelName, const MessageData &data)
Send a message to a server-bound channel.
bool getChannelDesc(const std::string &channelName, ChannelDesc &desc) const
Get the channel description for a channel name.
std::function< bool(const MessageData &)> ReadCallback
Callback type for receiving messages from a client-bound channel.
Definition RpcClient.h:73
std::vector< ChannelDesc > listChannels() const
List all channels available on the server.
std::unique_ptr< Impl > impl
Definition RpcClient.h:90
Definition esi.py:1
Description of a channel from the server.
Definition RpcClient.h:55