29using PortMap = std::map<std::string, ChannelPort &>;
234 "Cannot call write() with pending translated messages");
248 virtual void write(std::unique_ptr<SegmentedMessageData> msg) {
250 throw std::runtime_error(
251 "WriteChannelPort::write: null SegmentedMessageData");
252 write(msg->toMessageData());
267 "Translation buffer should be empty after successful flush");
323 throw std::runtime_error(
errmsg);
328 throw std::runtime_error(
errmsg);
331 throw std::runtime_error(
errmsg);
363 const ConnectOptions &options = {});
377 virtual std::future<MessageData>
readAsync();
382 std::future<MessageData> f =
readAsync();
384 outData = std::move(f.get());
440 throw std::runtime_error(
errmsg);
443 throw std::runtime_error(
errmsg);
446 throw std::runtime_error(
errmsg);
483 template <
typename T>
485 return const_cast<T *
>(
dynamic_cast<const T *
>(
this));
493 result |= channel.second.poll();
assert(baseType &&"element must be base type")
Services provide connections to 'bundles' – collections of named, unidirectional communication channe...
virtual ~BundlePort()=default
T * getAs() const
Cast this Bundle port to a subclass which is actually useful.
ReadChannelPort & getRawRead(const std::string &name) const
WriteChannelPort & getRawWrite(const std::string &name) const
Get access to the raw byte streams of a channel.
const PortMap & getChannels() const
bool poll()
Calls poll on all channels in the bundle and returns true if any of them returned true.
static bool isWrite(BundleType::Direction bundleDir)
Compute the direction of a channel given the bundle direction and the bundle port's direction.
AppID getID() const
Get the ID of the port.
Bundles represent a collection of channels.
Unidirectional channels are the basic communication primitive between the host and accelerator.
const Type * getType() const
virtual void connect(const ConnectOptions &options=ConnectOptions())=0
Set up a connection to the accelerator.
virtual void disconnect()=0
virtual bool pollImpl()
Method called by poll() to actually poll the channel if the channel is connected.
size_t getFrameSizeBytes() const
Get the size of each frame in bytes.
virtual void connectImpl(const ConnectOptions &options)
Called by all connect methods to let backends initiate the underlying connections.
ChannelPort(const Type *type)
std::unique_ptr< TranslationInfo > translationInfo
bool poll()
Poll for incoming data.
virtual bool isConnected() const =0
A logical chunk of data representing serialized data.
A ChannelPort which reads data from the accelerator.
virtual std::future< MessageData > readAsync()
Asynchronous read.
size_t nextFrameIndex
Index of the next expected frame (for multi-frame windows).
virtual bool isConnected() const override
virtual void connect(std::function< bool(MessageData)> callback, const ConnectOptions &options={})
std::vector< uint8_t > translationBuffer
Window translation support.
std::mutex pollingM
Mutex to protect the two queues used for polling.
bool accumulatingListData
Flag to track whether we're in the middle of accumulating list data.
void resetTranslationState()
Reset translation state buffers and indices.
std::function< bool(MessageData)> callback
Backends call this callback when new data is available.
Mode
Indicates the current mode of the channel.
std::queue< MessageData > dataQueue
Store incoming data here if there are no outstanding promises to be fulfilled.
static constexpr uint64_t DefaultMaxDataQueueMsgs
Default max data queue size set at connect time.
bool translateIncoming(MessageData &data)
Translate incoming data if the port type is a window type.
std::queue< std::promise< MessageData > > promiseQueue
Promises to be fulfilled when data is available.
void setMaxDataQueueMsgs(uint64_t maxMsgs)
Set maximum number of messages to store in the dataQueue.
uint64_t maxDataQueueMsgs
Maximum number of messages to store in dataQueue. 0 means no limit.
virtual void disconnect() override
std::vector< uint8_t > listDataBuffer
For list fields: accumulated list data across frames.
virtual void read(MessageData &outData)
Specify a buffer to read into.
ReadChannelPort(const Type *type)
Root class of the ESI type system.
virtual std::ptrdiff_t getBitWidth() const
Instantiated when a backend does not know how to create a read channel.
void connect(std::function< bool(MessageData)> callback, const ConnectOptions &options=ConnectOptions()) override
void connect(const ConnectOptions &options=ConnectOptions()) override
Connect to the channel in polling mode.
std::future< MessageData > readAsync() override
Asynchronous read.
UnknownReadChannelPort(const Type *type, std::string errmsg)
Instantiated when a backend does not know how to create a write channel.
void connect(const ConnectOptions &options={}) override
Set up a connection to the accelerator.
void writeImpl(const MessageData &) override
Implementation for write(). Subclasses must implement this.
UnknownWriteChannelPort(const Type *type, std::string errmsg)
bool tryWriteImpl(const MessageData &) override
Implementation for tryWrite(). Subclasses must implement this.
Windows represent a fixed-size sliding window over a stream of data.
A ChannelPort which sends data to the accelerator.
virtual void write(std::unique_ptr< SegmentedMessageData > msg)
Write a multi-segment message.
virtual bool isConnected() const override
virtual void disconnect() override
size_t translationBufferIdx
Index of the next message to write in translationBuffer.
virtual bool tryWriteImpl(const MessageData &data)=0
Implementation for tryWrite(). Subclasses must implement this.
bool translateMessages
Whether to translate outgoing data if the port type is a window type.
void write(const MessageData &data)
A very basic blocking write API.
bool flush()
Flush any buffered data.
bool tryWrite(const MessageData &data)
A basic non-blocking write API.
virtual void connect(const ConnectOptions &options={}) override
Set up a connection to the accelerator.
void translateOutgoing(const MessageData &data)
Translate outgoing data if the port type is a window type.
std::vector< MessageData > getMessageFrames(const MessageData &data)
Break a message into its frames.
std::vector< MessageData > translationBuffer
If tryWrite cannot write all the messages of a windowed type at once, it stores them here and writes ...
virtual void writeImpl(const MessageData &)=0
Implementation for write(). Subclasses must implement this.
uint64_t bitsToBytes(uint64_t bits)
Compute ceil(bits/8).
std::map< std::string, ChannelPort & > PortMap
ConnectOptions(std::optional< unsigned > bufferSize=std::nullopt, bool translateMessage=true)
std::optional< unsigned > bufferSize
The buffer size is optional and should be considered merely a hint.
bool translateMessage
If the type of this port is a window, translate the incoming/outgoing data into its underlying ('into...
A copy operation for translating between frame data and the translation.
size_t bufferOffset
Offset in the translation buffer.
size_t frameOffset
Offset in the incoming/outgoing frame data.
size_t size
Number of bytes to copy.
Information about each frame in the windowed type.
std::vector< CopyOp > copyOps
Precomputed copy operations for translating this frame (non-list fields).
std::optional< ListFieldInfo > listField
Information about list fields in this frame (parallel encoding).
size_t expectedSize
The total size of a frame in bytes.
Information about a list field within a frame (for parallel encoding).
size_t listLengthBufferOffset
Offset in the translation buffer where list length is stored.
size_t dataOffset
Offset of the list data array in the frame.
std::string fieldName
Name of the list field.
size_t lastFieldOffset
Offset of the 'last' field in the frame.
size_t elementSize
Size of each list element in bytes.
size_t listDataBufferOffset
Offset in the translation buffer where list data starts.
Instructions for translating windowed types.
void precomputeFrameInfo()
Precompute and optimize the copy operations for translating frames.
TranslationInfo(const WindowType *windowType)
bool hasListField
True if the window contains a list field (variable-size message).
const WindowType * windowType
The window type being translated.
size_t intoTypeBytes
Size of the 'into' type in bytes (for fixed-size types).
size_t frameBytes
Number of bytes per wire frame (from the lowered type's bit width).
std::vector< FrameInfo > frames
Precomputed information about each frame.