25#include <condition_variable>
33using PortMap = std::map<std::string, ChannelPort &>;
42template <
typename BufferedT>
49 std::scoped_lock<std::mutex> lock(
mutex);
55 std::scoped_lock<std::mutex> lock(
mutex);
63 std::scoped_lock<std::mutex> lock(
mutex);
65 "Both queues are in use.");
68 std::promise<BufferedT> promise = std::move(
promiseQueue.front());
70 promise.set_value(std::move(value));
85 std::scoped_lock<std::mutex> lock(
mutex);
87 "Both queues are in use.");
90 std::promise<BufferedT> promise;
91 std::future<BufferedT> future = promise.get_future();
92 promise.set_value(std::move(
dataQueue.front()));
217 return std::max<size_t>(1, bytes);
330 "Cannot call write() with pending translated messages");
344 virtual void write(std::unique_ptr<SegmentedMessageData> msg) {
346 throw std::runtime_error(
347 "WriteChannelPort::write: null SegmentedMessageData");
348 write(msg->toMessageData());
363 "Translation buffer should be empty after successful flush");
435 throw std::runtime_error(
errmsg);
440 throw std::runtime_error(
errmsg);
443 throw std::runtime_error(
errmsg);
462 std::function<bool(std::unique_ptr<SegmentedMessageData> &)>;
491 const ConnectOptions &options = {});
496 const ConnectOptions &options = {});
513 virtual std::future<MessageData>
readAsync();
518 std::future<MessageData> f =
readAsync();
520 outData = std::move(f.get());
591 throw std::runtime_error(
errmsg);
595 throw std::runtime_error(
errmsg);
598 throw std::runtime_error(
errmsg);
601 throw std::runtime_error(
errmsg);
638 template <
typename T>
640 return const_cast<T *
>(
dynamic_cast<const T *
>(
this));
648 result |= channel.second.poll();
assert(baseType &&"element must be base type")
Services provide connections to 'bundles' – collections of named, unidirectional communication channe...
virtual ~BundlePort()=default
T * getAs() const
Cast this Bundle port to a subclass which is actually useful.
ReadChannelPort & getRawRead(const std::string &name) const
WriteChannelPort & getRawWrite(const std::string &name) const
Get access to the raw byte streams of a channel.
const PortMap & getChannels() const
bool poll()
Calls poll on all channels in the bundle and returns true if any of them returned true.
static bool isWrite(BundleType::Direction bundleDir)
Compute the direction of a channel given the bundle direction and the bundle port's direction.
AppID getID() const
Get the ID of the port.
Bundles represent a collection of channels.
Unidirectional channels are the basic communication primitive between the host and accelerator.
const Type * getType() const
virtual void connect(const ConnectOptions &options=ConnectOptions())=0
Set up a connection to the accelerator.
virtual void disconnect()=0
const WindowType * getWindowType() const
If this port carries a windowed type, return the original WindowType (whose intoType is what getType(...
virtual bool pollImpl()
Method called by poll() to actually poll the channel if the channel is connected.
size_t getFrameSizeBytes() const
Get the size of each frame in bytes.
virtual void connectImpl(const ConnectOptions &options)
Called by all connect methods to let backends initiate the underlying connections.
ChannelPort(const Type *type)
std::unique_ptr< TranslationInfo > translationInfo
bool poll()
Poll for incoming data.
virtual bool isConnected() const =0
A concrete flat message backed by a single vector of bytes.
A ChannelPort which reads data from the accelerator.
std::mutex callbackMutex
Synchronizes callback revocation during disconnect.
virtual void connect(ReadCallback callback, const ConnectOptions &options={})
virtual std::future< MessageData > readAsync()
Asynchronous polling read.
size_t nextFrameIndex
Index of the next expected frame (for multi-frame windows).
virtual bool isConnected() const override
std::vector< uint8_t > translationBuffer
Window translation support.
bool accumulatingListData
Flag to track whether we're in the middle of accumulating list data.
void resetTranslationState()
Reset translation state buffers and indices.
Mode
Indicates the current mode of the channel.
std::optional< detail::PollingBuffer< MessageData > > pollingState
static constexpr uint64_t DefaultMaxDataQueueMsgs
Default max data queue size set at connect time.
bool translateIncoming(MessageData &data)
Translate incoming data if the port type is a window type.
virtual void disconnect() override
Disconnect the channel.
void setMaxDataQueueMsgs(uint64_t maxMsgs)
Set maximum number of messages to store in the dataQueue.
std::unique_ptr< SegmentedMessageData > translatedMessage
If a translated message has been assembled but not yet consumed, retain ownership here so retries pre...
uint64_t maxDataQueueMsgs
std::function< bool(MessageData)> FlatReadCallback
Compatibility callback API for callers which want flattened message bytes instead of the owning segme...
ReadCallback callback
Backends should not call this directly.
std::vector< uint8_t > listDataBuffer
For list fields: accumulated list data across frames.
bool invokeCallback(std::unique_ptr< SegmentedMessageData > &msg)
Invoke the currently registered callback.
std::function< bool(std::unique_ptr< SegmentedMessageData > &)> ReadCallback
Primary callback API for raw reads.
virtual void read(MessageData &outData)
Specify a buffer to read into.
std::condition_variable callbackCv
ReadChannelPort(const Type *type)
Root class of the ESI type system.
virtual std::ptrdiff_t getBitWidth() const
Instantiated when a backend does not know how to create a read channel.
void connect(ReadCallback callback, const ConnectOptions &options=ConnectOptions()) override
void connect(const ConnectOptions &options=ConnectOptions()) override
Connect to the channel in polling mode.
std::future< MessageData > readAsync() override
Asynchronous polling read.
UnknownReadChannelPort(const Type *type, std::string errmsg)
void connect(FlatReadCallback callback, const ConnectOptions &options=ConnectOptions()) override
Connect a compatibility callback which receives flattened MessageData objects.
Instantiated when a backend does not know how to create a write channel.
void connect(const ConnectOptions &options={}) override
Set up a connection to the accelerator.
void writeImpl(const MessageData &) override
Implementation for write(). Subclasses must implement this.
UnknownWriteChannelPort(const Type *type, std::string errmsg)
bool tryWriteImpl(const MessageData &) override
Implementation for tryWrite(). Subclasses must implement this.
Windows represent a fixed-size sliding window over a stream of data.
A ChannelPort which sends data to the accelerator.
virtual void write(std::unique_ptr< SegmentedMessageData > msg)
Write a multi-segment message.
virtual bool isConnected() const override
virtual void disconnect() override
size_t translationBufferIdx
Index of the next message to write in translationBuffer.
virtual bool tryWriteImpl(const MessageData &data)=0
Implementation for tryWrite(). Subclasses must implement this.
bool translateMessages
Whether to translate outgoing data if the port type is a window type.
void write(const MessageData &data)
A very basic blocking write API.
static const MessageData transportPad0Bytes
Backing storage for maybePadEmptyMessage return so that the returned reference outlives the call.
const MessageData & maybePadEmptyMessage(const MessageData &data)
Pad an empty payload with a single zero byte if the port type has a zero-width logical width (e....
bool flush()
Flush any buffered data.
bool tryWrite(const MessageData &data)
A basic non-blocking write API.
virtual void connect(const ConnectOptions &options={}) override
Set up a connection to the accelerator.
void translateOutgoing(const MessageData &data)
Translate outgoing data if the port type is a window type.
std::vector< MessageData > getMessageFrames(const MessageData &data)
Break a message into its frames.
std::vector< MessageData > translationBuffer
If tryWrite cannot write all the messages of a windowed type at once, it stores them here and writes ...
virtual void writeImpl(const MessageData &)=0
Implementation for write(). Subclasses must implement this.
Shared queue/promise helper for polling-style read APIs.
PollingBuffer(uint64_t maxQueued)
std::future< BufferedT > readAsync()
Read the next value asynchronously.
uint64_t getMaxQueued()
Return the current bounded queue size. 0 means unbounded.
bool enqueue(BufferedT &value)
Try to deliver or enqueue a produced value.
void setMaxQueued(uint64_t maxMsgs)
Update the bounded queue size. 0 means unbounded.
std::queue< BufferedT > dataQueue
std::queue< std::promise< BufferedT > > promiseQueue
uint64_t bitsToBytes(uint64_t bits)
Compute ceil(bits/8).
std::map< std::string, ChannelPort & > PortMap
ConnectOptions(std::optional< unsigned > bufferSize=std::nullopt, bool translateMessage=true)
std::optional< unsigned > bufferSize
The buffer size is optional and should be considered merely a hint.
bool translateMessage
If the type of this port is a window, translate the incoming/outgoing data into its underlying ('into...
A copy operation for translating between frame data and the translation.
size_t bufferOffset
Offset in the translation buffer.
size_t frameOffset
Offset in the incoming/outgoing frame data.
size_t size
Number of bytes to copy.
Information about each frame in the windowed type.
std::vector< CopyOp > copyOps
Precomputed copy operations for translating this frame (non-list fields).
std::optional< ListFieldInfo > listField
Information about list fields in this frame (parallel encoding).
size_t expectedSize
The total size of a frame in bytes.
Information about a list field within a frame (for parallel encoding).
size_t listLengthBufferOffset
Offset in the translation buffer where list length is stored.
size_t dataOffset
Offset of the list data array in the frame.
std::string fieldName
Name of the list field.
size_t lastFieldOffset
Offset of the 'last' field in the frame.
size_t elementSize
Size of each list element in bytes.
size_t listDataBufferOffset
Offset in the translation buffer where list data starts.
Instructions for translating windowed types.
void precomputeFrameInfo()
Precompute and optimize the copy operations for translating frames.
void requireTranslationSupported() const
Throw if this window cannot be translated by the runtime translator (e.g.
TranslationInfo(const WindowType *windowType)
bool hasListField
True if the window contains a list field (variable-size message).
const WindowType * windowType
The window type being translated.
size_t intoTypeBytes
Size of the 'into' type in bytes (for fixed-size types).
size_t frameBytes
Number of bytes per wire frame (from the lowered type's bit width).
std::vector< FrameInfo > frames
Precomputed information about each frame.