23 : id(id), channels(channels) {}
28 throw std::runtime_error(
"Channel '" + name +
"' not found");
31 throw std::runtime_error(
"Channel '" + name +
"' is not a write channel");
38 throw std::runtime_error(
"Channel '" + name +
"' not found");
41 throw std::runtime_error(
"Channel '" + name +
"' is not a read channel");
45 std::optional<unsigned> bufferSize) {
46 if (
mode != Mode::Disconnected)
47 throw std::runtime_error(
"Channel already connected");
48 mode = Mode::Callback;
57 std::scoped_lock<std::mutex> lock(
pollingM);
59 "Both queues are in use.");
63 std::promise<MessageData> p = std::move(
promiseQueue.front());
65 p.set_value(std::move(data));
78 if (
mode == Mode::Callback)
79 throw std::runtime_error(
80 "Cannot read from a callback channel. `connect()` without a callback "
81 "specified to use polling mode.");
83 std::scoped_lock<std::mutex> lock(
pollingM);
85 "Both queues are in use.");
89 std::promise<MessageData> p;
90 std::future<MessageData> f = p.get_future();
91 p.set_value(std::move(
dataQueue.front()));
assert(baseType &&"element must be base type")
std::map< std::string, ChannelPort & > channels
ReadChannelPort & getRawRead(const std::string &name) const
WriteChannelPort & getRawWrite(const std::string &name) const
Get access to the raw byte streams of a channel.
BundlePort(AppID id, std::map< std::string, ChannelPort & > channels)
Construct a port.
virtual void connectImpl(std::optional< unsigned > bufferSize)
Called by all connect methods to let backends initiate the underlying connections.
A logical chunk of data representing serialized data.
A ChannelPort which reads data from the accelerator.
virtual std::future< MessageData > readAsync()
Asynchronous read.
std::mutex pollingM
Mutex to protect the two queues used for polling.
virtual void connect(std::function< bool(MessageData)> callback, std::optional< unsigned > bufferSize=std::nullopt)
std::function< bool(MessageData)> callback
Backends call this callback when new data is available.
std::queue< MessageData > dataQueue
Store incoming data here if there are no outstanding promises to be fulfilled.
static constexpr uint64_t DefaultMaxDataQueueMsgs
Default max data queue size set at connect time.
std::queue< std::promise< MessageData > > promiseQueue
Promises to be fulfilled when data is available.
uint64_t maxDataQueueMsgs
Maximum number of messages to store in dataQueue. 0 means no limit.
A ChannelPort which sends data to the accelerator.