29using PortMap = std::map<std::string, ChannelPort &>;
225 "Cannot call write() with pending translated messages");
247 "Translation buffer should be empty after successful flush");
300 throw std::runtime_error(
errmsg);
305 throw std::runtime_error(
errmsg);
308 throw std::runtime_error(
errmsg);
340 const ConnectOptions &options = {});
354 virtual std::future<MessageData>
readAsync();
359 std::future<MessageData> f =
readAsync();
361 outData = std::move(f.get());
417 throw std::runtime_error(
errmsg);
420 throw std::runtime_error(
errmsg);
423 throw std::runtime_error(
errmsg);
460 template <
typename T>
462 return const_cast<T *
>(
dynamic_cast<const T *
>(
this));
470 result |= channel.second.poll();
assert(baseType &&"element must be base type")
Services provide connections to 'bundles' – collections of named, unidirectional communication channe...
virtual ~BundlePort()=default
T * getAs() const
Cast this Bundle port to a subclass which is actually useful.
ReadChannelPort & getRawRead(const std::string &name) const
WriteChannelPort & getRawWrite(const std::string &name) const
Get access to the raw byte streams of a channel.
const PortMap & getChannels() const
bool poll()
Calls poll on all channels in the bundle and returns true if any of them returned true.
static bool isWrite(BundleType::Direction bundleDir)
Compute the direction of a channel given the bundle direction and the bundle port's direction.
AppID getID() const
Get the ID of the port.
Bundles represent a collection of channels.
Unidirectional channels are the basic communication primitive between the host and accelerator.
const Type * getType() const
virtual void connect(const ConnectOptions &options=ConnectOptions())=0
Set up a connection to the accelerator.
virtual void disconnect()=0
virtual bool pollImpl()
Method called by poll() to actually poll the channel if the channel is connected.
virtual void connectImpl(const ConnectOptions &options)
Called by all connect methods to let backends initiate the underlying connections.
ChannelPort(const Type *type)
std::unique_ptr< TranslationInfo > translationInfo
bool poll()
Poll for incoming data.
virtual bool isConnected() const =0
A logical chunk of data representing serialized data.
A ChannelPort which reads data from the accelerator.
virtual std::future< MessageData > readAsync()
Asynchronous read.
size_t nextFrameIndex
Index of the next expected frame (for multi-frame windows).
virtual bool isConnected() const override
virtual void connect(std::function< bool(MessageData)> callback, const ConnectOptions &options={})
std::vector< uint8_t > translationBuffer
Window translation support.
std::mutex pollingM
Mutex to protect the two queues used for polling.
bool accumulatingListData
Flag to track whether we're in the middle of accumulating list data.
void resetTranslationState()
Reset translation state buffers and indices.
std::function< bool(MessageData)> callback
Backends call this callback when new data is available.
Mode
Indicates the current mode of the channel.
std::queue< MessageData > dataQueue
Store incoming data here if there are no outstanding promises to be fulfilled.
static constexpr uint64_t DefaultMaxDataQueueMsgs
Default max data queue size set at connect time.
bool translateIncoming(MessageData &data)
Translate incoming data if the port type is a window type.
std::queue< std::promise< MessageData > > promiseQueue
Promises to be fulfilled when data is available.
void setMaxDataQueueMsgs(uint64_t maxMsgs)
Set maximum number of messages to store in the dataQueue.
uint64_t maxDataQueueMsgs
Maximum number of messages to store in dataQueue. 0 means no limit.
virtual void disconnect() override
std::vector< uint8_t > listDataBuffer
For list fields: accumulated list data across frames.
virtual void read(MessageData &outData)
Specify a buffer to read into.
ReadChannelPort(const Type *type)
Root class of the ESI type system.
Instantiated when a backend does not know how to create a read channel.
void connect(std::function< bool(MessageData)> callback, const ConnectOptions &options=ConnectOptions()) override
void connect(const ConnectOptions &options=ConnectOptions()) override
Connect to the channel in polling mode.
std::future< MessageData > readAsync() override
Asynchronous read.
UnknownReadChannelPort(const Type *type, std::string errmsg)
Instantiated when a backend does not know how to create a write channel.
void connect(const ConnectOptions &options={}) override
Set up a connection to the accelerator.
void writeImpl(const MessageData &) override
Implementation for write(). Subclasses must implement this.
UnknownWriteChannelPort(const Type *type, std::string errmsg)
bool tryWriteImpl(const MessageData &) override
Implementation for tryWrite(). Subclasses must implement this.
Windows represent a fixed-size sliding window over a stream of data.
A ChannelPort which sends data to the accelerator.
virtual bool isConnected() const override
virtual void disconnect() override
size_t translationBufferIdx
Index of the next message to write in translationBuffer.
virtual bool tryWriteImpl(const MessageData &data)=0
Implementation for tryWrite(). Subclasses must implement this.
bool translateMessages
Whether to translate outgoing data if the port type is a window type.
void write(const MessageData &data)
A very basic blocking write API.
bool flush()
Flush any buffered data.
bool tryWrite(const MessageData &data)
A basic non-blocking write API.
virtual void connect(const ConnectOptions &options={}) override
Set up a connection to the accelerator.
void translateOutgoing(const MessageData &data)
Translate outgoing data if the port type is a window type.
std::vector< MessageData > translationBuffer
If tryWrite cannot write all the messages of a windowed type at once, it stores them here and writes ...
virtual void writeImpl(const MessageData &)=0
Implementation for write(). Subclasses must implement this.
std::map< std::string, ChannelPort & > PortMap
ConnectOptions(std::optional< unsigned > bufferSize=std::nullopt, bool translateMessage=true)
std::optional< unsigned > bufferSize
The buffer size is optional and should be considered merely a hint.
bool translateMessage
If the type of this port is a window, translate the incoming/outgoing data into its underlying ('into...
A copy operation for translating between frame data and the translation.
size_t bufferOffset
Offset in the translation buffer.
size_t frameOffset
Offset in the incoming/outgoing frame data.
size_t size
Number of bytes to copy.
Information about each frame in the windowed type.
std::vector< CopyOp > copyOps
Precomputed copy operations for translating this frame (non-list fields).
std::optional< ListFieldInfo > listField
Information about list fields in this frame (parallel encoding).
size_t expectedSize
The total size of a frame in bytes.
Information about a list field within a frame (for parallel encoding).
size_t listLengthBufferOffset
Offset in the translation buffer where list length is stored.
size_t dataOffset
Offset of the list data array in the frame.
std::string fieldName
Name of the list field.
size_t lastFieldOffset
Offset of the 'last' field in the frame.
size_t elementSize
Size of each list element in bytes.
size_t listDataBufferOffset
Offset in the translation buffer where list data starts.
Instructions for translating windowed types.
void precomputeFrameInfo()
Precompute and optimize the copy operations for translating frames.
TranslationInfo(const WindowType *windowType)
bool hasListField
True if the window contains a list field (variable-size message).
const WindowType * windowType
The window type being translated.
size_t intoTypeBytes
Size of the 'into' type in bytes (for fixed-size types).
std::vector< FrameInfo > frames
Precomputed information about each frame.