20#include <ixwebsocket/IXBase64.h>
21#include <ixwebsocket/IXNetSystem.h>
22#include <ixwebsocket/IXWebSocket.h>
23#include <ixwebsocket/IXWebSocketCloseConstants.h>
24#include <nlohmann/json.hpp>
28#include <condition_variable>
38#include <unordered_map>
43using json = nlohmann::json;
50class ReadChannelConnectionImpl;
65 std::vector<RpcClient::ChannelDesc>
listChannels()
const;
69 std::unique_ptr<RpcClient::ReadChannelConnection>
86 void onMessage(
const ix::WebSocketMessagePtr &msg);
110 std::unordered_map<uint64_t, std::shared_ptr<std::promise<json>>>
pending;
139 : channelId(channelId), queue(std::move(notifier)) {}
142 std::unordered_map<uint64_t, std::shared_ptr<ReadCallbackEntry>>
185 :
impl(impl), channelId(channelId) {}
186 ~ReadChannelConnectionImpl()
override {
disconnect(); }
189 if (disconnected.exchange(
true))
194 }
catch (
const std::exception &e) {
200 std::string(
"unsubscribe during disconnect failed: ") + e.what());
207 std::atomic<bool> disconnected{
false};
217 : host(hostname), logger(logger) {
221 if (!ix::initNetSystem())
222 throw std::runtime_error(
223 "RpcClient: ix::initNetSystem() failed (WSAStartup)");
225 ws.setUrl(
"ws://" +
host +
":" + std::to_string(port) +
"/esi/cosim/v3");
226 ws.disablePerMessageDeflate();
227 ws.disableAutomaticReconnection();
229 ws.setOnMessageCallback(
230 [
this](
const ix::WebSocketMessagePtr &msg) {
onMessage(msg); });
236 std::unique_lock<std::mutex> lock(
openMutex);
237 openCV.wait(lock, [&] {
return openState != OpenState::Pending; });
238 if (openState == OpenState::Failed)
239 throw std::runtime_error(
"RpcClient: failed to connect to " +
244 json result =
call(
"hello", {{
"client_protocol_version", 3}});
247 if (
auto it = result.find(
"esi_version"); it != result.end())
249 if (
auto it = result.find(
"compressed_manifest_b64"); it != result.end()) {
253 std::string err = macaron::Base64::Decode(it->get<std::string>(), raw);
255 throw std::runtime_error(
"RpcClient: invalid base64 in manifest blob: " +
257 manifest.assign(raw.begin(), raw.end());
261 auto channelsIt = result.find(
"channels");
262 if (channelsIt == result.end() || !channelsIt->is_array())
263 throw std::runtime_error(
"RpcClient: hello response missing channels");
264 for (
const json &c : *channelsIt) {
266 meta.
id = c.at(
"channel_id").get<uint64_t>();
267 meta.
name = c.at(
"name").get<std::string>();
268 meta.
type = c.at(
"type").get<std::string>();
269 std::string dir = c.at(
"direction").get<std::string>();
283 disconnecting =
true;
284 failAllPending(
"client shutdown");
289 std::lock_guard<std::mutex> lock(readCallbacksMutex);
290 for (
auto &[
id, entry] : readCallbacks)
291 entry->canceled.store(
true);
298 if (transportThread.joinable()) {
299 readyIds.requestShutdown();
300 transportThread.join();
310 case ix::WebSocketMessageType::Open: {
311 std::lock_guard<std::mutex> lock(openMutex);
312 openState = OpenState::Open;
316 case ix::WebSocketMessageType::Error: {
317 std::string reason = msg->errorInfo.reason;
319 std::lock_guard<std::mutex> lock(lastServerErrorMutex);
320 if (!lastServerError.empty())
321 reason = lastServerError;
324 std::lock_guard<std::mutex> lock(openMutex);
325 if (openState == OpenState::Pending) {
326 openState = OpenState::Failed;
331 failAllPending(
"websocket error: " + reason);
334 case ix::WebSocketMessageType::Close: {
335 std::string reason = msg->closeInfo.reason;
337 std::lock_guard<std::mutex> lock(lastServerErrorMutex);
338 if (!lastServerError.empty())
339 reason = lastServerError;
345 std::lock_guard<std::mutex> lock(openMutex);
346 if (openState == OpenState::Pending) {
347 openState = OpenState::Failed;
352 failAllPending(
"websocket closed: " + reason);
355 case ix::WebSocketMessageType::Message:
361 handleBinaryFrame(msg->str);
363 handleControlFrame(msg->str);
365 faultStash.record(std::current_exception());
376 resp = json::parse(text);
377 }
catch (
const std::exception &e) {
381 auto typeIt = resp.find(
"type");
382 if (typeIt == resp.end() || !typeIt->is_string())
384 std::string type = typeIt->get<std::string>();
390 if (type ==
"error") {
391 if (
auto errIt = resp.find(
"error"); errIt != resp.end()) {
392 std::string code = errIt->value(
"code", std::string(
"internal"));
393 std::string message = errIt->value(
"message", std::string());
394 std::lock_guard<std::mutex> lock(lastServerErrorMutex);
395 lastServerError =
"server error [" + code +
"]: " + message;
400 if (type !=
"response") {
405 logger.
debug(
"cosim",
406 "Ignoring control frame with unknown type \"" + type +
"\"");
409 auto idIt = resp.find(
"request_id");
410 if (idIt == resp.end())
412 uint64_t requestId = idIt->get<uint64_t>();
414 std::shared_ptr<std::promise<json>> promise;
416 std::lock_guard<std::mutex> lock(pendingMutex);
417 auto it = pending.find(requestId);
418 if (it == pending.end())
420 promise = it->second;
427 if (
auto errIt = resp.find(
"error"); errIt != resp.end()) {
428 std::string code = errIt->value(
"code", std::string(
"internal"));
429 std::string message = errIt->value(
"message", std::string());
430 promise->set_exception(std::make_exception_ptr(
431 std::runtime_error(
"Server error [" + code +
"]: " + message)));
432 }
else if (
auto resultIt = resp.find(
"result"); resultIt != resp.end()) {
433 promise->set_value(*resultIt);
438 promise->set_exception(std::make_exception_ptr(std::runtime_error(
439 "Server response missing both \"result\" and \"error\" fields "
440 "(protocol violation) for request_id=" +
441 std::to_string(requestId))));
443 }
catch (
const std::exception &e) {
447 logger.
debug(
"cosim",
448 std::string(
"failed to fulfill promise for request_id=") +
449 std::to_string(requestId) +
": " + e.what());
455 const uint8_t *payloadBytes;
460 std::shared_ptr<ReadCallbackEntry> entry;
462 std::lock_guard<std::mutex> lock(readCallbacksMutex);
463 auto it = readCallbacks.find(channelId);
464 if (it == readCallbacks.end()) {
469 logger.
warning(
"cosim",
"Dropping inbound frame for channel id " +
470 std::to_string(channelId) +
471 ": no active subscriber");
481 entry->queue.push(
MessageData(payloadBytes, payloadSize));
485 std::lock_guard<std::mutex> lock(pendingMutex);
486 for (
auto &[
id, pr] : pending) {
488 pr->set_exception(std::make_exception_ptr(std::runtime_error(reason)));
489 }
catch (
const std::exception &e) {
492 logger.
debug(
"cosim", std::string(
"failAllPending: request_id=") +
494 " already satisfied: " + e.what());
506 uint64_t requestId = nextRequestId.fetch_add(1);
507 std::future<json> future;
509 std::lock_guard<std::mutex> lock(pendingMutex);
510 auto promise = std::make_shared<std::promise<json>>();
511 future = promise->get_future();
512 pending.emplace(requestId, std::move(promise));
516 req[
"type"] =
"request";
517 req[
"request_id"] = requestId;
518 req[
"method"] = method;
519 req[
"params"] = std::move(params);
520 std::string text = req.dump();
525 auto info = ws.sendUtf8Text(text);
527 std::lock_guard<std::mutex> plock(pendingMutex);
528 pending.erase(requestId);
529 throw std::runtime_error(
"RpcClient: failed to send " + method);
536 constexpr auto kCallTimeout = std::chrono::seconds(30);
537 if (future.wait_for(kCallTimeout) != std::future_status::ready) {
541 std::lock_guard<std::mutex> plock(pendingMutex);
542 pending.erase(requestId);
543 throw std::runtime_error(
"RpcClient: timed out waiting for response to " +
552 auto it = channelsByName.find(channelName);
553 if (it == channelsByName.end())
554 throw std::runtime_error(
"Unknown channel '" + channelName +
"'");
556 throw std::runtime_error(
"Channel '" + channelName +
557 "' is not a to-server channel");
562 auto info = ws.sendBinary(frame);
564 throw std::runtime_error(
"Failed to send data on channel '" + channelName +
574 auto it = channelsByName.find(name);
575 if (it == channelsByName.end())
577 desc.
name = it->second.name;
578 desc.
type = it->second.type;
579 desc.
dir = it->second.direction;
584 std::vector<RpcClient::ChannelDesc> out;
585 out.reserve(channelsByName.size());
586 for (
const auto &[name, meta] : channelsByName) {
590 d.dir = meta.direction;
591 out.push_back(std::move(d));
600std::unique_ptr<RpcClient::ReadChannelConnection>
604 auto it = channelsByName.find(channelName);
605 if (it == channelsByName.end())
606 throw std::runtime_error(
"Unknown channel '" + channelName +
"'");
608 throw std::runtime_error(
"Channel '" + channelName +
609 "' is not a to-client channel");
610 uint64_t channelId = it->second.id;
615 std::lock_guard<std::mutex> lock(readCallbacksMutex);
616 auto entry = std::make_shared<ReadCallbackEntry>(
617 channelId, [
this, channelId] { readyIds.markReady(channelId); });
618 entry->callback = std::move(callback);
619 readCallbacks[channelId] = std::move(entry);
623 call(
"subscribe", {{
"channel_id", channelId}});
625 std::lock_guard<std::mutex> lock(readCallbacksMutex);
626 readCallbacks.erase(channelId);
630 return std::make_unique<ReadChannelConnectionImpl>(
this, channelId);
640 if (ws.getReadyState() != ix::ReadyState::Open)
642 call(
"unsubscribe", {{
"channel_id", channelId}});
646 std::shared_ptr<ReadCallbackEntry> entry;
648 std::lock_guard<std::mutex> lock(readCallbacksMutex);
649 auto it = readCallbacks.find(channelId);
650 if (it == readCallbacks.end())
653 readCallbacks.erase(it);
658 entry->canceled.store(
true);
661 readyIds.markReady(channelId);
673 std::unordered_set<uint64_t> retry;
676 std::unordered_set<uint64_t> ids;
677 if (!readyIds.waitDrain(
678 ids, retry.empty() ? std::optional<std::chrono::milliseconds>{}
679 : std::chrono::milliseconds(1)))
681 ids.insert(retry.begin(), retry.end());
684 for (uint64_t
id : ids) {
685 std::shared_ptr<ReadCallbackEntry> entry;
687 std::lock_guard<std::mutex> lock(readCallbacksMutex);
688 auto it = readCallbacks.find(
id);
689 if (it == readCallbacks.end())
694 if (entry->canceled.load() || disconnecting)
701 while (!stuck && !entry->canceled.load() && !disconnecting) {
705 std::unique_ptr<SegmentedMessageData> msg =
706 std::make_unique<MessageData>(data);
707 if (entry->callback(msg))
726 :
impl(std::make_unique<
Impl>(logger, hostname, port)) {}
733 return impl->getCompressedManifest();
738 return impl->getChannelDesc(channelName, desc);
742 return impl->listChannels();
747 impl->writeToServer(channelName, data);
750std::unique_ptr<RpcClient::ReadChannelConnection>
753 return impl->connectClientReceiver(channelName, std::move(callback));
std::vector< uint8_t > getCompressedManifest() const
void handleControlFrame(const std::string &text)
std::atomic< bool > disconnecting
void unsubscribe(uint64_t channelId)
uint32_t getEsiVersion() const
void failAllPending(const std::string &reason)
enum RpcClient::Impl::OpenState Pending
void unregisterCallback(uint64_t channelId)
std::unordered_map< uint64_t, std::shared_ptr< std::promise< json > > > pending
std::unordered_map< uint64_t, std::shared_ptr< ReadCallbackEntry > > readCallbacks
void onMessage(const ix::WebSocketMessagePtr &msg)
std::unordered_map< uint64_t, ChannelMeta > channelsById
void handleBinaryFrame(const std::string &data)
std::string lastServerError
std::condition_variable openCV
std::atomic< uint64_t > nextRequestId
std::unordered_map< std::string, ChannelMeta > channelsByName
std::thread transportThread
bool getChannelDesc(const std::string &name, RpcClient::ChannelDesc &desc) const
utils::ReadyIdSet< uint64_t > readyIds
std::mutex readCallbacksMutex
void writeToServer(const std::string &channelName, const MessageData &data)
std::vector< RpcClient::ChannelDesc > listChannels() const
::esi::cosim::FaultStash faultStash
std::unique_ptr< RpcClient::ReadChannelConnection > connectClientReceiver(const std::string &channelName, RpcClient::ReadCallback callback)
std::mutex lastServerErrorMutex
json call(const std::string &method, json params)
Issue a JSON-RPC style request and synchronously await the response.
std::vector< uint8_t > manifest
virtual void warning(const std::string &subsystem, const std::string &msg, const std::map< std::string, std::any > *details=nullptr)
Report a warning.
void debug(const std::string &subsystem, const std::string &msg, const std::map< std::string, std::any > *details=nullptr)
Report a debug message.
A concrete flat message backed by a single vector of bytes.
Abstract handle for a read channel connection.
virtual void disconnect()=0
std::function< bool(std::unique_ptr< SegmentedMessageData > &)> ReadCallback
Callback type for receiving messages from a client-bound channel.
ChannelDirection
Channel direction as reported by the server.
std::unique_ptr< ReadChannelConnection > connectClientReceiver(const std::string &channelName, ReadCallback callback)
Connect to a client-bound channel and receive messages via callback.
RpcClient(Logger &logger, const std::string &hostname, uint16_t port)
std::vector< uint8_t > getCompressedManifest() const
Get the compressed manifest from the server.
uint32_t getEsiVersion() const
Get the ESI version from the manifest.
void writeToServer(const std::string &channelName, const MessageData &data)
Send a message to a server-bound channel.
bool getChannelDesc(const std::string &channelName, ChannelDesc &desc) const
Get the channel description for a channel name.
std::vector< ChannelDesc > listChannels() const
List all channels available on the server.
std::unique_ptr< Impl > impl
Cross-thread error channel for the IXWebSocket network thread.
void handleControlFrame(ix::WebSocket &ws, const std::string &text)
Impl(Context &ctxt, int port)
void handleBinaryFrame(const std::string &data)
Multi-producer / single-consumer dirty-set of channel ids, with CV-style blocking drain semantics.
std::string buildDataFrame(uint64_t channelId, const uint8_t *bytes, size_t size)
Pack a cosim binary data frame: [u64 LE channel_id][payload].
bool parseDataFrame(const std::string &data, uint64_t &channelId, const uint8_t *&payload, size_t &payloadSize)
Parse a cosim binary data frame.
std::atomic< bool > canceled
ReadCallbackEntry(uint64_t channelId, std::function< void()> notifier)
utils::TSQueue< MessageData > queue
RpcClient::ReadCallback callback
Description of a channel from the server.