CIRCT 23.0.0git
Loading...
Searching...
No Matches
RpcClient.cpp
Go to the documentation of this file.
1//===- RpcClient.cpp - ESI Cosim RPC client implementation ----------------===//
2//
3// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4// See https://llvm.org/LICENSE.txt for license information.
5// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6//
7//===----------------------------------------------------------------------===//
8//
9// Client implementation of the cosim WebSocket + JSON protocol. The wire
10// protocol is documented in cosim-protocol.md.
11//
12//===----------------------------------------------------------------------===//
13
15#include "esi/Logging.h"
16#include "esi/Utils.h"
17
18#include "RpcWire.h"
19
20#include <ixwebsocket/IXBase64.h>
21#include <ixwebsocket/IXNetSystem.h>
22#include <ixwebsocket/IXWebSocket.h>
23#include <ixwebsocket/IXWebSocketCloseConstants.h>
24#include <nlohmann/json.hpp>
25
26#include <atomic>
27#include <chrono>
28#include <condition_variable>
29#include <cstdint>
30#include <cstring>
31#include <future>
32#include <map>
33#include <memory>
34#include <mutex>
35#include <stdexcept>
36#include <string>
37#include <thread>
38#include <unordered_map>
39#include <vector>
40
41using namespace esi;
42using namespace esi::backends::cosim;
43using json = nlohmann::json;
44
45//===----------------------------------------------------------------------===//
46// RpcClient::Impl - hides IXWebSocket + JSON behind the public header
47//===----------------------------------------------------------------------===//
48
49namespace {
50class ReadChannelConnectionImpl;
51} // namespace
52
54public:
55 Impl(Logger &logger, const std::string &hostname, uint16_t port);
56 ~Impl();
57
58 Logger &getLogger() { return logger; }
59
60 uint32_t getEsiVersion() const { return esiVersion; }
61 std::vector<uint8_t> getCompressedManifest() const { return manifest; }
62
63 bool getChannelDesc(const std::string &name,
64 RpcClient::ChannelDesc &desc) const;
65 std::vector<RpcClient::ChannelDesc> listChannels() const;
66
67 void writeToServer(const std::string &channelName, const MessageData &data);
68
69 std::unique_ptr<RpcClient::ReadChannelConnection>
70 connectClientReceiver(const std::string &channelName,
72
73 // Helpers used by ReadChannelConnectionImpl.
74 void unsubscribe(uint64_t channelId);
75 void unregisterCallback(uint64_t channelId);
76
77private:
78 struct ChannelMeta {
79 uint64_t id;
80 std::string name;
81 std::string type;
83 };
84
85 // ---- websocket lifecycle ----
86 void onMessage(const ix::WebSocketMessagePtr &msg);
87 void handleControlFrame(const std::string &text);
88 void handleBinaryFrame(const std::string &data);
89 void failAllPending(const std::string &reason);
90
91 // ---- request/response plumbing ----
92
93 /// Issue a JSON-RPC style request and synchronously await the response.
94 /// Throws on error (transport or server-reported).
95 json call(const std::string &method, json params);
96
97 std::string host;
99 ix::WebSocket ws;
100
101 // Manifest + channel table cached from the `hello` response.
102 uint32_t esiVersion = 0;
103 std::vector<uint8_t> manifest;
104 std::unordered_map<std::string, ChannelMeta> channelsByName;
105 std::unordered_map<uint64_t, ChannelMeta> channelsById;
106
107 // Pending control-plane requests, keyed by id.
108 std::mutex pendingMutex;
109 std::atomic<uint64_t> nextRequestId{1};
110 std::unordered_map<uint64_t, std::shared_ptr<std::promise<json>>> pending;
111
112 // Per-channel read state. Connection-time `subscribe` registers; the
113 // ReadChannelConnection destructor unregisters and sends `unsubscribe`.
114 //
115 // Each entry owns its own non-blocking TSQueue: `handleBinaryFrame` only
116 // pushes into the queue (on IX's single per-connection network thread),
117 // and a dedicated transport thread is what actually invokes the
118 // user-supplied callback. That decoupling is what makes the client
119 // cross-channel-deadlock-safe regardless of what the user's callback
120 // does: a callback that returns `false` (rejects the message) only
121 // delays its own channel -- the IX thread is free to keep delivering on
122 // other channels.
123 //
124 // The queue's `notifier` rings the client-wide doorbell
125 // (`readyIds.markReady`) so the transport thread wakes up only when there's
126 // work for a specific channel.
127 //
128 // Held by `shared_ptr` so the transport thread can copy the entry under
129 // the lock and outlive a concurrent `unregisterCallback` that erases the
130 // map entry: the cancel flag flips, the transport thread sees it on its
131 // next iteration, and drops anything still in the queue.
134 std::atomic<bool> canceled{false};
135 uint64_t channelId;
137
138 ReadCallbackEntry(uint64_t channelId, std::function<void()> notifier)
139 : channelId(channelId), queue(std::move(notifier)) {}
140 };
142 std::unordered_map<uint64_t, std::shared_ptr<ReadCallbackEntry>>
144
145 // Dirty-set doorbell for the transport thread. Each entry's TSQueue
146 // notifier inserts the channel id into `readyIds` and wakes the consumer.
148 std::thread transportThread;
149
150 void transportLoop();
151
152 // Connection-open synchronization. The WS callback fires on IX's background
153 // thread, so the constructor blocks here until either Open or Error fires.
154 std::mutex openMutex;
155 std::condition_variable openCV;
156 enum class OpenState { Pending, Open, Failed } openState{OpenState::Pending};
157 std::string openError;
158
159 // Most recent unsolicited server-initiated error (`type="error"`), if any.
160 // The server sends one of these before closing the WS to explain why (e.g.
161 // `server_busy`). Recorded so a subsequent Close/Error path can surface it
162 // instead of the much-less-actionable raw WebSocket close reason.
164 std::string lastServerError;
165
166 std::atomic<bool> disconnecting{false};
167
168 // Cross-thread fault propagation out of the IX network thread. See
169 // FaultStash docs in RpcWire.h.
171};
172
173//===----------------------------------------------------------------------===//
174// ReadChannelConnectionImpl
175//
176// Held by the consumer of `connectClientReceiver`. Destructor unregisters the
177// callback and sends `unsubscribe`. The actual incoming bytes are delivered
178// by the shared websocket callback in RpcClient::Impl.
179//===----------------------------------------------------------------------===//
180
181namespace {
182class ReadChannelConnectionImpl : public RpcClient::ReadChannelConnection {
183public:
184 ReadChannelConnectionImpl(RpcClient::Impl *impl, uint64_t channelId)
185 : impl(impl), channelId(channelId) {}
186 ~ReadChannelConnectionImpl() override { disconnect(); }
187
188 void disconnect() override {
189 if (disconnected.exchange(true))
190 return;
191 impl->unregisterCallback(channelId);
192 try {
193 impl->unsubscribe(channelId);
194 } catch (const std::exception &e) {
195 // We're tearing down a per-channel reader and can't propagate the
196 // exception (this runs in a destructor). Log it so the failure isn't
197 // silently lost.
198 impl->getLogger().warning(
199 "cosim",
200 std::string("unsubscribe during disconnect failed: ") + e.what());
201 }
202 }
203
204private:
206 uint64_t channelId;
207 std::atomic<bool> disconnected{false};
208};
209} // namespace
210
211//===----------------------------------------------------------------------===//
212// RpcClient::Impl - construction / `hello` handshake
213//===----------------------------------------------------------------------===//
214
215RpcClient::Impl::Impl(Logger &logger, const std::string &hostname,
216 uint16_t port)
217 : host(hostname), logger(logger) {
218 // On Windows, `ix::initNetSystem()` calls `WSAStartup`; bail out cleanly if
219 // it fails rather than letting the WS start path fail with a less
220 // actionable `WSANOTINITIALISED`. No-op on other platforms.
221 if (!ix::initNetSystem())
222 throw std::runtime_error(
223 "RpcClient: ix::initNetSystem() failed (WSAStartup)");
224
225 ws.setUrl("ws://" + host + ":" + std::to_string(port) + "/esi/cosim/v3");
226 ws.disablePerMessageDeflate();
227 ws.disableAutomaticReconnection();
228
229 ws.setOnMessageCallback(
230 [this](const ix::WebSocketMessagePtr &msg) { onMessage(msg); });
231
232 ws.start();
233
234 // Wait for the WS to reach Open (or fail).
235 {
236 std::unique_lock<std::mutex> lock(openMutex);
237 openCV.wait(lock, [&] { return openState != OpenState::Pending; });
238 if (openState == OpenState::Failed)
239 throw std::runtime_error("RpcClient: failed to connect to " +
240 ws.getUrl() + ": " + openError);
241 }
242
243 // Send `hello`; the server blocks the response until its manifest is ready.
244 json result = call("hello", {{"client_protocol_version", 3}});
245
246 // Cache manifest blob.
247 if (auto it = result.find("esi_version"); it != result.end())
248 esiVersion = it->get<uint32_t>();
249 if (auto it = result.find("compressed_manifest_b64"); it != result.end()) {
250 // macaron::Base64::Decode returns an empty error string on success and
251 // writes raw bytes into the out string; reinterpret as a byte vector.
252 std::string raw;
253 std::string err = macaron::Base64::Decode(it->get<std::string>(), raw);
254 if (!err.empty())
255 throw std::runtime_error("RpcClient: invalid base64 in manifest blob: " +
256 err);
257 manifest.assign(raw.begin(), raw.end());
258 }
259
260 // Cache channel table.
261 auto channelsIt = result.find("channels");
262 if (channelsIt == result.end() || !channelsIt->is_array())
263 throw std::runtime_error("RpcClient: hello response missing channels");
264 for (const json &c : *channelsIt) {
265 ChannelMeta meta;
266 meta.id = c.at("channel_id").get<uint64_t>();
267 meta.name = c.at("name").get<std::string>();
268 meta.type = c.at("type").get<std::string>();
269 std::string dir = c.at("direction").get<std::string>();
270 meta.direction = dir == "to_server" ? RpcClient::ChannelDirection::ToServer
272 channelsByName.emplace(meta.name, meta);
273 channelsById.emplace(meta.id, std::move(meta));
274 }
275
276 // Start the transport thread now that initialisation is complete. It
277 // sleeps on `readyIds`'s internal CV until a per-channel TSQueue push
278 // wakes it.
279 transportThread = std::thread([this] { transportLoop(); });
280}
281
283 disconnecting = true;
284 failAllPending("client shutdown");
285 // Mark every per-channel entry canceled so the transport loop drops any
286 // remaining queued frames promptly instead of spinning on the user
287 // callback.
288 {
289 std::lock_guard<std::mutex> lock(readCallbacksMutex);
290 for (auto &[id, entry] : readCallbacks)
291 entry->canceled.store(true);
292 }
293 // Stop IX *first* so no more `handleBinaryFrame` calls can fire and push
294 // into the per-channel queues. ws.stop() joins IX's internal threads
295 // synchronously.
296 ws.stop();
297 // Now retire the transport thread; nothing else is producing for it.
298 if (transportThread.joinable()) {
299 readyIds.requestShutdown();
300 transportThread.join();
301 }
302}
303
304//===----------------------------------------------------------------------===//
305// RpcClient::Impl - WebSocket message dispatch
306//===----------------------------------------------------------------------===//
307
308void RpcClient::Impl::onMessage(const ix::WebSocketMessagePtr &msg) {
309 switch (msg->type) {
310 case ix::WebSocketMessageType::Open: {
311 std::lock_guard<std::mutex> lock(openMutex);
312 openState = OpenState::Open;
313 openCV.notify_all();
314 return;
315 }
316 case ix::WebSocketMessageType::Error: {
317 std::string reason = msg->errorInfo.reason;
318 {
319 std::lock_guard<std::mutex> lock(lastServerErrorMutex);
320 if (!lastServerError.empty())
321 reason = lastServerError;
322 }
323 {
324 std::lock_guard<std::mutex> lock(openMutex);
325 if (openState == OpenState::Pending) {
326 openState = OpenState::Failed;
327 openError = reason;
328 openCV.notify_all();
329 }
330 }
331 failAllPending("websocket error: " + reason);
332 return;
333 }
334 case ix::WebSocketMessageType::Close: {
335 std::string reason = msg->closeInfo.reason;
336 {
337 std::lock_guard<std::mutex> lock(lastServerErrorMutex);
338 if (!lastServerError.empty())
339 reason = lastServerError;
340 }
341 // If the server closed us before `Open` settled (e.g. immediate
342 // `server_busy` reject), surface that as an Open failure so the
343 // constructor's wait unblocks with a useful error.
344 {
345 std::lock_guard<std::mutex> lock(openMutex);
346 if (openState == OpenState::Pending) {
347 openState = OpenState::Failed;
348 openError = reason;
349 openCV.notify_all();
350 }
351 }
352 failAllPending("websocket closed: " + reason);
353 return;
354 }
355 case ix::WebSocketMessageType::Message:
356 // An exception escaping this callback would kill IX's network thread.
357 // Stash the first one so the next public RpcClient method rethrows it
358 // on the user's thread.
359 try {
360 if (msg->binary)
361 handleBinaryFrame(msg->str);
362 else
363 handleControlFrame(msg->str);
364 } catch (...) {
365 faultStash.record(std::current_exception());
366 }
367 return;
368 default:
369 return;
370 }
371}
372
373void RpcClient::Impl::handleControlFrame(const std::string &text) {
374 json resp;
375 try {
376 resp = json::parse(text);
377 } catch (const std::exception &e) {
378 // Unparseable text frame: nothing useful we can do.
379 return;
380 }
381 auto typeIt = resp.find("type");
382 if (typeIt == resp.end() || !typeIt->is_string())
383 return;
384 std::string type = typeIt->get<std::string>();
385
386 // Unsolicited server-initiated error. The server sends one of these before
387 // closing the WS (e.g. `server_busy`). Stash the message so the imminent
388 // Close event surfaces this human-readable reason instead of just the raw
389 // WebSocket close reason.
390 if (type == "error") {
391 if (auto errIt = resp.find("error"); errIt != resp.end()) {
392 std::string code = errIt->value("code", std::string("internal"));
393 std::string message = errIt->value("message", std::string());
394 std::lock_guard<std::mutex> lock(lastServerErrorMutex);
395 lastServerError = "server error [" + code + "]: " + message;
396 }
397 return;
398 }
399
400 if (type != "response") {
401 // Unknown control-frame type. Per the protocol, receivers MUST ignore
402 // unrecognised fields/types so the wire format can evolve additively
403 // under the same `protocol_version`, but warn so genuine misbehavior
404 // doesn't go silently dropped.
405 logger.debug("cosim",
406 "Ignoring control frame with unknown type \"" + type + "\"");
407 return;
408 }
409 auto idIt = resp.find("request_id");
410 if (idIt == resp.end())
411 return;
412 uint64_t requestId = idIt->get<uint64_t>();
413
414 std::shared_ptr<std::promise<json>> promise;
415 {
416 std::lock_guard<std::mutex> lock(pendingMutex);
417 auto it = pending.find(requestId);
418 if (it == pending.end())
419 return;
420 promise = it->second;
421 // Erase under the lock so a concurrent `failAllPending` won't also
422 // try to fulfill the same promise.
423 pending.erase(it);
424 }
425
426 try {
427 if (auto errIt = resp.find("error"); errIt != resp.end()) {
428 std::string code = errIt->value("code", std::string("internal"));
429 std::string message = errIt->value("message", std::string());
430 promise->set_exception(std::make_exception_ptr(
431 std::runtime_error("Server error [" + code + "]: " + message)));
432 } else if (auto resultIt = resp.find("result"); resultIt != resp.end()) {
433 promise->set_value(*resultIt);
434 } else {
435 // Per the wire spec, a `type=response` frame MUST carry either
436 // `result` (success) or `error` (failure). Surface the protocol
437 // violation to the caller instead of pretending it succeeded.
438 promise->set_exception(std::make_exception_ptr(std::runtime_error(
439 "Server response missing both \"result\" and \"error\" fields "
440 "(protocol violation) for request_id=" +
441 std::to_string(requestId))));
442 }
443 } catch (const std::exception &e) {
444 // Promise might be already satisfied if we raced with a shutdown that
445 // already fulfilled it via failAllPending. Benign, but log so unexpected
446 // duplicate responses from the server don't go entirely unnoticed.
447 logger.debug("cosim",
448 std::string("failed to fulfill promise for request_id=") +
449 std::to_string(requestId) + ": " + e.what());
450 }
451}
452
453void RpcClient::Impl::handleBinaryFrame(const std::string &data) {
454 uint64_t channelId;
455 const uint8_t *payloadBytes;
456 size_t payloadSize;
457 if (!::esi::cosim::parseDataFrame(data, channelId, payloadBytes, payloadSize))
458 return;
459
460 std::shared_ptr<ReadCallbackEntry> entry;
461 {
462 std::lock_guard<std::mutex> lock(readCallbacksMutex);
463 auto it = readCallbacks.find(channelId);
464 if (it == readCallbacks.end()) {
465 // Per the protocol, the server MUST stop sending frames for a channel
466 // after the unsubscribe-ack. A frame for an unknown channel is
467 // therefore either a server bug or a misrouted message: warn rather
468 // than silently dropping so the issue is visible.
469 logger.warning("cosim", "Dropping inbound frame for channel id " +
470 std::to_string(channelId) +
471 ": no active subscriber");
472 return;
473 }
474 entry = it->second;
475 }
476
477 // Push the payload into the per-channel queue and return immediately;
478 // never block IX's network thread on the user-supplied callback. The
479 // queue's notifier rings the transport doorbell, and the dedicated
480 // transport thread is what actually invokes the callback.
481 entry->queue.push(MessageData(payloadBytes, payloadSize));
482}
483
484void RpcClient::Impl::failAllPending(const std::string &reason) {
485 std::lock_guard<std::mutex> lock(pendingMutex);
486 for (auto &[id, pr] : pending) {
487 try {
488 pr->set_exception(std::make_exception_ptr(std::runtime_error(reason)));
489 } catch (const std::exception &e) {
490 // Already-satisfied promise; benign but log it in case a real bug is
491 // racing with shutdown.
492 logger.debug("cosim", std::string("failAllPending: request_id=") +
493 std::to_string(id) +
494 " already satisfied: " + e.what());
495 }
496 }
497 pending.clear();
498}
499
500//===----------------------------------------------------------------------===//
501// RpcClient::Impl - request/response, sends
502//===----------------------------------------------------------------------===//
503
504json RpcClient::Impl::call(const std::string &method, json params) {
505 faultStash.check();
506 uint64_t requestId = nextRequestId.fetch_add(1);
507 std::future<json> future;
508 {
509 std::lock_guard<std::mutex> lock(pendingMutex);
510 auto promise = std::make_shared<std::promise<json>>();
511 future = promise->get_future();
512 pending.emplace(requestId, std::move(promise));
513 }
514
515 json req;
516 req["type"] = "request";
517 req["request_id"] = requestId;
518 req["method"] = method;
519 req["params"] = std::move(params);
520 std::string text = req.dump();
521
522 // No send-side mutex needed: IXWebSocket's `sendUtf8Text` is internally
523 // serialized with `sendBinary`, and every cosim frame is a single send
524 // call (no header+body splits).
525 auto info = ws.sendUtf8Text(text);
526 if (!info.success) {
527 std::lock_guard<std::mutex> plock(pendingMutex);
528 pending.erase(requestId);
529 throw std::runtime_error("RpcClient: failed to send " + method);
530 }
531
532 // Bounded wait so callers cannot hang forever on a missing/lost response.
533 // On WS close or error, `failAllPending()` fires and the future becomes
534 // ready immediately. The timeout only matters if the server stays
535 // connected but never replies (server bug, lost control frame).
536 constexpr auto kCallTimeout = std::chrono::seconds(30);
537 if (future.wait_for(kCallTimeout) != std::future_status::ready) {
538 // Drop the pending entry so a late response doesn't fulfill a
539 // destroyed promise (with shared_ptr it's still safe, but it would
540 // otherwise leak the slot forever).
541 std::lock_guard<std::mutex> plock(pendingMutex);
542 pending.erase(requestId);
543 throw std::runtime_error("RpcClient: timed out waiting for response to " +
544 method);
545 }
546 return future.get();
547}
548
549void RpcClient::Impl::writeToServer(const std::string &channelName,
550 const MessageData &data) {
551 faultStash.check();
552 auto it = channelsByName.find(channelName);
553 if (it == channelsByName.end())
554 throw std::runtime_error("Unknown channel '" + channelName + "'");
555 if (it->second.direction != RpcClient::ChannelDirection::ToServer)
556 throw std::runtime_error("Channel '" + channelName +
557 "' is not a to-server channel");
558
559 std::string frame = esi::cosim::buildDataFrame(it->second.id, data.getBytes(),
560 data.getSize());
561 // IXWebSocket serializes sends internally; no extra mutex needed.
562 auto info = ws.sendBinary(frame);
563 if (!info.success)
564 throw std::runtime_error("Failed to send data on channel '" + channelName +
565 "'");
566}
567
568//===----------------------------------------------------------------------===//
569// RpcClient::Impl - channel descriptor lookup
570//===----------------------------------------------------------------------===//
571
572bool RpcClient::Impl::getChannelDesc(const std::string &name,
573 RpcClient::ChannelDesc &desc) const {
574 auto it = channelsByName.find(name);
575 if (it == channelsByName.end())
576 return false;
577 desc.name = it->second.name;
578 desc.type = it->second.type;
579 desc.dir = it->second.direction;
580 return true;
581}
582
583std::vector<RpcClient::ChannelDesc> RpcClient::Impl::listChannels() const {
584 std::vector<RpcClient::ChannelDesc> out;
585 out.reserve(channelsByName.size());
586 for (const auto &[name, meta] : channelsByName) {
588 d.name = meta.name;
589 d.type = meta.type;
590 d.dir = meta.direction;
591 out.push_back(std::move(d));
592 }
593 return out;
594}
595
596//===----------------------------------------------------------------------===//
597// RpcClient::Impl - subscribe / unsubscribe
598//===----------------------------------------------------------------------===//
599
600std::unique_ptr<RpcClient::ReadChannelConnection>
601RpcClient::Impl::connectClientReceiver(const std::string &channelName,
602 RpcClient::ReadCallback callback) {
603 faultStash.check();
604 auto it = channelsByName.find(channelName);
605 if (it == channelsByName.end())
606 throw std::runtime_error("Unknown channel '" + channelName + "'");
607 if (it->second.direction != RpcClient::ChannelDirection::ToClient)
608 throw std::runtime_error("Channel '" + channelName +
609 "' is not a to-client channel");
610 uint64_t channelId = it->second.id;
611
612 // Register the callback first so any racing inbound binary frame after the
613 // server's subscribe-ack has somewhere to land.
614 {
615 std::lock_guard<std::mutex> lock(readCallbacksMutex);
616 auto entry = std::make_shared<ReadCallbackEntry>(
617 channelId, [this, channelId] { readyIds.markReady(channelId); });
618 entry->callback = std::move(callback);
619 readCallbacks[channelId] = std::move(entry);
620 }
621
622 try {
623 call("subscribe", {{"channel_id", channelId}});
624 } catch (...) {
625 std::lock_guard<std::mutex> lock(readCallbacksMutex);
626 readCallbacks.erase(channelId);
627 throw;
628 }
629
630 return std::make_unique<ReadChannelConnectionImpl>(this, channelId);
631}
632
633void RpcClient::Impl::unsubscribe(uint64_t channelId) {
634 if (disconnecting)
635 return;
636 // Skip the RPC if the WS is already closed (server tore down first, network
637 // drop, etc.). The unsubscribe is moot anyway, and otherwise every
638 // per-channel cleanup during teardown would generate a noisy "failed to
639 // send unsubscribe" warning from ReadChannelConnectionImpl::disconnect.
640 if (ws.getReadyState() != ix::ReadyState::Open)
641 return;
642 call("unsubscribe", {{"channel_id", channelId}});
643}
644
645void RpcClient::Impl::unregisterCallback(uint64_t channelId) {
646 std::shared_ptr<ReadCallbackEntry> entry;
647 {
648 std::lock_guard<std::mutex> lock(readCallbacksMutex);
649 auto it = readCallbacks.find(channelId);
650 if (it == readCallbacks.end())
651 return;
652 entry = it->second;
653 readCallbacks.erase(it);
654 }
655 // Flip the cancel flag on the entry that any in-flight transport-loop
656 // iteration already copied out, so it bails out promptly and drops any
657 // remaining queued frames.
658 entry->canceled.store(true);
659 // Nudge the transport thread so it observes the cancelation promptly and
660 // drains+drops whatever's still queued.
661 readyIds.markReady(channelId);
662}
663
664//===----------------------------------------------------------------------===//
665// RpcClient::Impl - transport thread (per-channel deadlock isolation)
666//===----------------------------------------------------------------------===//
667
669 // Channels whose last delivery attempt left a message at the head of the
670 // queue (user callback returned `false`). We retry them after a short
671 // backoff so other channels still get a chance every wake, and so a
672 // permanently-busy channel doesn't pin the CPU.
673 std::unordered_set<uint64_t> retry;
674
675 while (true) {
676 std::unordered_set<uint64_t> ids;
677 if (!readyIds.waitDrain(
678 ids, retry.empty() ? std::optional<std::chrono::milliseconds>{}
679 : std::chrono::milliseconds(1)))
680 return;
681 ids.insert(retry.begin(), retry.end());
682 retry.clear();
683
684 for (uint64_t id : ids) {
685 std::shared_ptr<ReadCallbackEntry> entry;
686 {
687 std::lock_guard<std::mutex> lock(readCallbacksMutex);
688 auto it = readCallbacks.find(id);
689 if (it == readCallbacks.end())
690 continue;
691 entry = it->second;
692 }
693
694 if (entry->canceled.load() || disconnecting)
695 continue;
696
697 // Best-effort drain. `TSQueue::pop(callback)` peeks at the front and
698 // only pops if our callback returns true; on `false` we leave the
699 // message at the head and mark the channel for retry.
700 bool stuck = false;
701 while (!stuck && !entry->canceled.load() && !disconnecting) {
702 bool gotOne = false;
703 entry->queue.pop([&](const MessageData &data) {
704 gotOne = true;
705 std::unique_ptr<SegmentedMessageData> msg =
706 std::make_unique<MessageData>(data);
707 if (entry->callback(msg))
708 return true;
709 stuck = true;
710 return false;
711 });
712 if (!gotOne)
713 break; // queue empty
714 if (stuck)
715 retry.insert(id);
716 }
717 }
718 }
719}
720
721//===----------------------------------------------------------------------===//
722// RpcClient - public pass-throughs (unchanged API)
723//===----------------------------------------------------------------------===//
724
725RpcClient::RpcClient(Logger &logger, const std::string &hostname, uint16_t port)
726 : impl(std::make_unique<Impl>(logger, hostname, port)) {}
727
728RpcClient::~RpcClient() = default;
729
730uint32_t RpcClient::getEsiVersion() const { return impl->getEsiVersion(); }
731
732std::vector<uint8_t> RpcClient::getCompressedManifest() const {
733 return impl->getCompressedManifest();
734}
735
736bool RpcClient::getChannelDesc(const std::string &channelName,
737 ChannelDesc &desc) const {
738 return impl->getChannelDesc(channelName, desc);
739}
740
741std::vector<RpcClient::ChannelDesc> RpcClient::listChannels() const {
742 return impl->listChannels();
743}
744
745void RpcClient::writeToServer(const std::string &channelName,
746 const MessageData &data) {
747 impl->writeToServer(channelName, data);
748}
749
750std::unique_ptr<RpcClient::ReadChannelConnection>
751RpcClient::connectClientReceiver(const std::string &channelName,
752 ReadCallback callback) {
753 return impl->connectClientReceiver(channelName, std::move(callback));
754}
std::vector< uint8_t > getCompressedManifest() const
Definition RpcClient.cpp:61
void handleControlFrame(const std::string &text)
void unsubscribe(uint64_t channelId)
void failAllPending(const std::string &reason)
enum RpcClient::Impl::OpenState Pending
void unregisterCallback(uint64_t channelId)
std::unordered_map< uint64_t, std::shared_ptr< std::promise< json > > > pending
std::unordered_map< uint64_t, std::shared_ptr< ReadCallbackEntry > > readCallbacks
void onMessage(const ix::WebSocketMessagePtr &msg)
std::unordered_map< uint64_t, ChannelMeta > channelsById
void handleBinaryFrame(const std::string &data)
std::atomic< uint64_t > nextRequestId
std::unordered_map< std::string, ChannelMeta > channelsByName
bool getChannelDesc(const std::string &name, RpcClient::ChannelDesc &desc) const
utils::ReadyIdSet< uint64_t > readyIds
void writeToServer(const std::string &channelName, const MessageData &data)
std::vector< RpcClient::ChannelDesc > listChannels() const
::esi::cosim::FaultStash faultStash
std::unique_ptr< RpcClient::ReadChannelConnection > connectClientReceiver(const std::string &channelName, RpcClient::ReadCallback callback)
json call(const std::string &method, json params)
Issue a JSON-RPC style request and synchronously await the response.
virtual void warning(const std::string &subsystem, const std::string &msg, const std::map< std::string, std::any > *details=nullptr)
Report a warning.
Definition Logging.h:70
void debug(const std::string &subsystem, const std::string &msg, const std::map< std::string, std::any > *details=nullptr)
Report a debug message.
Definition Logging.h:83
A concrete flat message backed by a single vector of bytes.
Definition Common.h:155
Abstract handle for a read channel connection.
Definition RpcClient.h:77
std::function< bool(std::unique_ptr< SegmentedMessageData > &)> ReadCallback
Callback type for receiving messages from a client-bound channel.
Definition RpcClient.h:73
ChannelDirection
Channel direction as reported by the server.
Definition RpcClient.h:50
std::unique_ptr< ReadChannelConnection > connectClientReceiver(const std::string &channelName, ReadCallback callback)
Connect to a client-bound channel and receive messages via callback.
RpcClient(Logger &logger, const std::string &hostname, uint16_t port)
std::vector< uint8_t > getCompressedManifest() const
Get the compressed manifest from the server.
uint32_t getEsiVersion() const
Get the ESI version from the manifest.
void writeToServer(const std::string &channelName, const MessageData &data)
Send a message to a server-bound channel.
bool getChannelDesc(const std::string &channelName, ChannelDesc &desc) const
Get the channel description for a channel name.
std::vector< ChannelDesc > listChannels() const
List all channels available on the server.
std::unique_ptr< Impl > impl
Definition RpcClient.h:92
Cross-thread error channel for the IXWebSocket network thread.
Definition RpcWire.h:69
void handleControlFrame(ix::WebSocket &ws, const std::string &text)
Impl(Context &ctxt, int port)
void handleBinaryFrame(const std::string &data)
Multi-producer / single-consumer dirty-set of channel ids, with CV-style blocking drain semantics.
Definition Utils.h:177
Thread safe queue.
Definition Utils.h:48
std::string buildDataFrame(uint64_t channelId, const uint8_t *bytes, size_t size)
Pack a cosim binary data frame: [u64 LE channel_id][payload].
Definition RpcWire.h:29
bool parseDataFrame(const std::string &data, uint64_t &channelId, const uint8_t *&payload, size_t &payloadSize)
Parse a cosim binary data frame.
Definition RpcWire.h:48
Definition esi.py:1
ReadCallbackEntry(uint64_t channelId, std::function< void()> notifier)
Description of a channel from the server.
Definition RpcClient.h:53