CIRCT 22.0.0git
Loading...
Searching...
No Matches
RpcClient.cpp
Go to the documentation of this file.
1//===- RpcClient.cpp - ESI Cosim RPC client implementation ----------------===//
2//
3// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4// See https://llvm.org/LICENSE.txt for license information.
5// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6//
7//===----------------------------------------------------------------------===//
8//
9// DO NOT EDIT!
10// This file is distributed as part of an ESI package. The source for this file
11// should always be modified within CIRCT
12// (lib/dialect/ESI/runtime/cpp/lib/backends/RpcClient.cpp).
13//
14//===----------------------------------------------------------------------===//
15
17#include "esi/Utils.h"
18
19#include "cosim.grpc.pb.h"
20
21#include <grpc/grpc.h>
22#include <grpcpp/channel.h>
23#include <grpcpp/client_context.h>
24#include <grpcpp/create_channel.h>
25#include <grpcpp/security/credentials.h>
26
27#include <condition_variable>
28#include <mutex>
29#include <thread>
30
31using namespace esi;
32using namespace esi::backends::cosim;
33
34using grpc::ClientContext;
35using grpc::Status;
36
37static void checkStatus(Status s, const std::string &msg) {
38 if (!s.ok())
39 throw std::runtime_error(msg + ". Code " + to_string(s.error_code()) +
40 ": " + s.error_message() + " (" +
41 s.error_details() + ")");
42}
43
44//===----------------------------------------------------------------------===//
45// ReadChannelConnectionImpl - gRPC streaming reader implementation
46//===----------------------------------------------------------------------===//
47
48namespace {
49class ReadChannelConnectionImpl
51 public grpc::ClientReadReactor<::esi::cosim::Message> {
52public:
// Construct a reader bound to `stub` for the channel described by `desc`.
// The descriptor is copied into `grpcDesc` and the callback is moved into the
// `callback` member. The body is empty: `context` starts out null and `done`
// false, and no RPC is issued here (see start()).
// NOTE(review): this listing jumps from embedded line 54 to 56, so the
// parameter that declares `callback` is not visible — confirm against the real
// source before editing the signature.
53 ReadChannelConnectionImpl(::esi::cosim::ChannelServer::Stub *stub,
54 const ::esi::cosim::ChannelDesc &desc,
56 : stub(stub), grpcDesc(desc), callback(std::move(callback)),
57 context(nullptr), done(false) {}
58
// Destruction goes through disconnect(), which returns immediately when no
// call was started and otherwise cancels the call and waits for OnDone()
// before the ClientContext is deleted.
59 ~ReadChannelConnectionImpl() override { disconnect(); }
60
// Begin streaming: allocate a ClientContext, register this object as the
// reactor for the ConnectToClientChannel RPC on `grpcDesc` through the stub's
// async interface, start the call, and queue the first read into
// `incomingMessage`. Subsequent reads are re-armed from OnReadDone().
// NOTE(review): `context` is a raw owning pointer that is only released in
// disconnect(); calling start() again without an intervening disconnect()
// would overwrite it. Presumably start() is called exactly once per
// connection — confirm against callers.
61 void start() {
62 context = new ClientContext();
63 stub->async()->ConnectToClientChannel(context, &grpcDesc, this);
64 StartCall();
65 StartRead(&incomingMessage);
66 }
67
68 void OnReadDone(bool ok) override {
69 if (!ok)
70 // This happens when we are disconnecting since we are canceling the call.
71 return;
72
73 // Read the delivered message and push it onto the queue.
74 const std::string &messageString = incomingMessage.data();
75 MessageData data(reinterpret_cast<const uint8_t *>(messageString.data()),
76 messageString.size());
77
78 while (!callback(data))
79 // Blocking here could cause deadlocks in specific situations.
80 // TODO: Implement a way to handle this better.
81 std::this_thread::sleep_for(std::chrono::milliseconds(10));
82
83 // Initiate the next read.
84 StartRead(&incomingMessage);
85 }
86
87 // Called by gRPC when the RPC is fully complete (after cancel or error).
// Sets `done` under `doneMutex` and wakes every thread waiting on `doneCV`,
// i.e. the wait inside disconnect(). The status argument is ignored.
// NOTE(review): the notify is issued while `doneMutex` is still held. That
// looks deliberate — the destructor calls disconnect(), so the waiting thread
// may destroy this object once it can observe `done` — confirm before moving
// the notify outside the lock.
88 void OnDone(const grpc::Status & /*status*/) override {
89 std::lock_guard<std::mutex> lock(doneMutex);
90 done = true;
91 doneCV.notify_all();
92 }
93
94 void disconnect() override {
95 if (!context)
96 return;
97
98 // Initiate cancellation.
99 context->TryCancel();
100
101 // Wait for gRPC to signal completion via OnDone().
102 std::unique_lock<std::mutex> lock(doneMutex);
103 doneCV.wait(lock, [this]() { return done; });
104
105 // Now it's safe to clean up.
106 delete context;
107 context = nullptr;
108 }
109
110private:
111 ::esi::cosim::ChannelServer::Stub *stub;
112 ::esi::cosim::ChannelDesc grpcDesc;
114 ClientContext *context;
115 ::esi::cosim::Message incomingMessage;
116
117 // Synchronization for waiting on gRPC completion.
118 std::mutex doneMutex;
119 std::condition_variable doneCV;
120 bool done;
121};
122} // namespace
123
124//===----------------------------------------------------------------------===//
125// RpcClient::Impl - internal implementation class
126//===----------------------------------------------------------------------===//
127
129public:
130 Impl(const std::string &hostname, uint16_t port) {
131 auto channel = grpc::CreateChannel(hostname + ":" + std::to_string(port),
132 grpc::InsecureChannelCredentials());
133 stub = ::esi::cosim::ChannelServer::NewStub(channel);
134 }
135
// Returns a non-owning pointer to the stub; ownership stays with the `stub`
// unique_ptr member of this object.
136 ::esi::cosim::ChannelServer::Stub *getStub() const { return stub.get(); }
137
138 ::esi::cosim::Manifest getManifest() const {
139 ::esi::cosim::Manifest response;
140 // To get around the a race condition where the manifest may not be set yet,
141 // loop until it is. TODO: fix this with the DPI API change.
142 do {
143 ClientContext context;
144 ::esi::cosim::VoidMessage arg;
145 Status s = stub->GetManifest(&context, arg, &response);
146 checkStatus(s, "Failed to get manifest");
147 std::this_thread::sleep_for(std::chrono::milliseconds(10));
148 } while (response.esi_version() < 0);
149 return response;
150 }
151
152 bool getChannelDesc(const std::string &channelName,
153 ::esi::cosim::ChannelDesc &desc) const {
154 ClientContext context;
155 ::esi::cosim::VoidMessage arg;
156 ::esi::cosim::ListOfChannels response;
157 Status s = stub->ListChannels(&context, arg, &response);
158 checkStatus(s, "Failed to list channels");
159 for (const auto &channel : response.channels())
160 if (channel.name() == channelName) {
161 desc = channel;
162 return true;
163 }
164 return false;
165 }
166
// List every channel the server reports.
//
// Issues a ListChannels RPC (checkStatus throws std::runtime_error on
// failure) and converts each returned gRPC descriptor into an
// RpcClient::ChannelDesc, copying its name and type and branching on whether
// its direction is TO_SERVER.
// NOTE(review): this listing skips embedded lines 177, 182 and 184 (the
// declaration of `desc` and both branches of the direction if/else) — consult
// the real source before changing the loop body.
167 std::vector<RpcClient::ChannelDesc> listChannels() const {
168 ClientContext context;
169 ::esi::cosim::VoidMessage arg;
170 ::esi::cosim::ListOfChannels response;
171 Status s = stub->ListChannels(&context, arg, &response);
172 checkStatus(s, "Failed to list channels");
173
174 std::vector<RpcClient::ChannelDesc> result;
// One output entry per reported channel, so size the vector up front.
175 result.reserve(response.channels_size());
176 for (const auto &grpcDesc : response.channels()) {
178 desc.name = grpcDesc.name();
179 desc.type = grpcDesc.type();
180 if (grpcDesc.dir() ==
181 ::esi::cosim::ChannelDesc::Direction::ChannelDesc_Direction_TO_SERVER)
183 else
185 result.push_back(std::move(desc));
186 }
187 return result;
188 }
189
190 void writeToServer(const std::string &channelName, const MessageData &data) {
191 ClientContext context;
192 ::esi::cosim::AddressedMessage grpcMsg;
193 grpcMsg.set_channel_name(channelName);
194 grpcMsg.mutable_message()->set_data(data.getBytes(), data.getSize());
195 ::esi::cosim::VoidMessage response;
196 grpc::Status sendStatus = stub->SendToServer(&context, grpcMsg, &response);
197 if (!sendStatus.ok())
198 throw std::runtime_error("Failed to write to channel '" + channelName +
199 "': " + std::to_string(sendStatus.error_code()) +
200 " " + sendStatus.error_message() +
201 ". Details: " + sendStatus.error_details());
202 }
203
204 std::unique_ptr<RpcClient::ReadChannelConnection>
205 connectClientReceiver(const std::string &channelName,
206 RpcClient::ReadCallback callback) {
207 ::esi::cosim::ChannelDesc grpcDesc;
208 if (!getChannelDesc(channelName, grpcDesc))
209 throw std::runtime_error("Could not find channel '" + channelName + "'");
210
211 auto connection = std::make_unique<ReadChannelConnectionImpl>(
212 stub.get(), grpcDesc, std::move(callback));
213 connection->start();
214 return connection;
215 }
216
217private:
218 std::unique_ptr<::esi::cosim::ChannelServer::Stub> stub;
219};
220
221//===----------------------------------------------------------------------===//
222// RpcClient
223//===----------------------------------------------------------------------===//
224
225RpcClient::RpcClient(const std::string &hostname, uint16_t port)
226 : impl(std::make_unique<Impl>(hostname, port)) {}
227
// Defaulted here, in the translation unit where Impl is defined.
// NOTE(review): presumably out-of-line so the `impl` unique_ptr can destroy a
// type that is incomplete in the header — confirm against RpcClient.h.
228RpcClient::~RpcClient() = default;
229
230uint32_t RpcClient::getEsiVersion() const {
231 return impl->getManifest().esi_version();
232}
233
234std::vector<uint8_t> RpcClient::getCompressedManifest() const {
235 ::esi::cosim::Manifest response = impl->getManifest();
236 std::string compressedManifestStr = response.compressed_manifest();
237 return std::vector<uint8_t>(compressedManifestStr.begin(),
238 compressedManifestStr.end());
239}
240
// Get the channel description for a channel name.
//
// Delegates the lookup to Impl::getChannelDesc; returns false without
// touching `desc` when the channel is not found. Otherwise copies the name
// and type into `desc`, branches on whether the direction is TO_SERVER, and
// returns true.
// NOTE(review): this listing skips embedded lines 251 and 253 (both branches
// of the direction if/else) — consult the real source before editing.
241bool RpcClient::getChannelDesc(const std::string &channelName,
242 ChannelDesc &desc) const {
243 ::esi::cosim::ChannelDesc grpcDesc;
244 if (!impl->getChannelDesc(channelName, grpcDesc))
245 return false;
246
247 desc.name = grpcDesc.name();
248 desc.type = grpcDesc.type();
249 if (grpcDesc.dir() ==
250 ::esi::cosim::ChannelDesc::Direction::ChannelDesc_Direction_TO_SERVER)
252 else
254 return true;
255}
256
257std::vector<RpcClient::ChannelDesc> RpcClient::listChannels() const {
258 return impl->listChannels();
259}
260
261void RpcClient::writeToServer(const std::string &channelName,
262 const MessageData &data) {
263 impl->writeToServer(channelName, data);
264}
265
266std::unique_ptr<RpcClient::ReadChannelConnection>
267RpcClient::connectClientReceiver(const std::string &channelName,
268 ReadCallback callback) {
269 return impl->connectClientReceiver(channelName, std::move(callback));
270}
static std::unique_ptr< Context > context
static void checkStatus(Status s, const std::string &msg)
Definition RpcClient.cpp:37
std::unique_ptr<::esi::cosim::ChannelServer::Stub > stub
Impl(const std::string &hostname, uint16_t port)
void writeToServer(const std::string &channelName, const MessageData &data)
::esi::cosim::Manifest getManifest() const
std::vector< RpcClient::ChannelDesc > listChannels() const
bool getChannelDesc(const std::string &channelName, ::esi::cosim::ChannelDesc &desc) const
::esi::cosim::ChannelServer::Stub * getStub() const
std::unique_ptr< RpcClient::ReadChannelConnection > connectClientReceiver(const std::string &channelName, RpcClient::ReadCallback callback)
A logical chunk of data representing serialized data.
Definition Common.h:113
Abstract handle for a read channel connection.
Definition RpcClient.h:77
std::unique_ptr< ReadChannelConnection > connectClientReceiver(const std::string &channelName, ReadCallback callback)
Connect to a client-bound channel and receive messages via callback.
RpcClient(const std::string &hostname, uint16_t port)
std::vector< uint8_t > getCompressedManifest() const
Get the compressed manifest from the server.
uint32_t getEsiVersion() const
Get the ESI version from the manifest.
void writeToServer(const std::string &channelName, const MessageData &data)
Send a message to a server-bound channel.
bool getChannelDesc(const std::string &channelName, ChannelDesc &desc) const
Get the channel description for a channel name.
std::function< bool(const MessageData &)> ReadCallback
Callback type for receiving messages from a client-bound channel.
Definition RpcClient.h:73
std::vector< ChannelDesc > listChannels() const
List all channels available on the server.
std::unique_ptr< Impl > impl
Definition RpcClient.h:90
Definition esi.py:1
Description of a channel from the server.
Definition RpcClient.h:55