CIRCT 23.0.0git
Loading...
Searching...
No Matches
Ports.h
Go to the documentation of this file.
1//===- Ports.h - ESI communication channels ---------------------*- C++ -*-===//
2//
3// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4// See https://llvm.org/LICENSE.txt for license information.
5// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6//
7//===----------------------------------------------------------------------===//
8//
9// DO NOT EDIT!
10// This file is distributed as part of an ESI package. The source for this file
11// should always be modified within CIRCT.
12//
13//===----------------------------------------------------------------------===//
14
15// NOLINTNEXTLINE(llvm-header-guard)
16#ifndef ESI_PORTS_H
17#define ESI_PORTS_H
18
19#include "esi/Common.h"
20#include "esi/Types.h"
21#include "esi/Utils.h"
22
23#include <algorithm>
24#include <cassert>
25#include <condition_variable>
26#include <future>
27#include <mutex>
28#include <queue>
29
30namespace esi {
31
32class ChannelPort;
33using PortMap = std::map<std::string, ChannelPort &>;
34
35namespace detail {
36
37/// Shared queue/promise helper for polling-style read APIs.
38///
39/// Producers either fulfill the oldest waiting reader immediately or enqueue
40/// the value for a later `readAsync()`. Consumers either receive buffered data
41/// immediately or install a promise which is fulfilled by the next enqueue.
42template <typename BufferedT>
44public:
45 explicit PollingBuffer(uint64_t maxQueued) : maxQueued(maxQueued) {}
46
47 /// Return the current bounded queue size. `0` means unbounded.
48 uint64_t getMaxQueued() {
49 std::scoped_lock<std::mutex> lock(mutex);
50 return maxQueued;
51 }
52
53 /// Update the bounded queue size. `0` means unbounded.
54 void setMaxQueued(uint64_t maxMsgs) {
55 std::scoped_lock<std::mutex> lock(mutex);
56 maxQueued = maxMsgs;
57 }
58
59 /// Try to deliver or enqueue a produced value.
60 ///
61 /// Returns `false` only when the bounded queue is full.
62 bool enqueue(BufferedT &value) {
63 std::scoped_lock<std::mutex> lock(mutex);
64 assert(!(!promiseQueue.empty() && !dataQueue.empty()) &&
65 "Both queues are in use.");
66
67 if (!promiseQueue.empty()) {
68 std::promise<BufferedT> promise = std::move(promiseQueue.front());
69 promiseQueue.pop();
70 promise.set_value(std::move(value));
71 } else {
72 if (dataQueue.size() >= maxQueued && maxQueued != 0)
73 return false;
74 dataQueue.push(std::move(value));
75 }
76 return true;
77 }
78
79 /// Read the next value asynchronously.
80 ///
81 /// If a value is already buffered, the returned future is ready
82 /// immediately. Otherwise the future is backed by an internal promise which
83 /// is fulfilled by a later `enqueue()`.
84 std::future<BufferedT> readAsync() {
85 std::scoped_lock<std::mutex> lock(mutex);
86 assert(!(!promiseQueue.empty() && !dataQueue.empty()) &&
87 "Both queues are in use.");
88
89 if (!dataQueue.empty()) {
90 std::promise<BufferedT> promise;
91 std::future<BufferedT> future = promise.get_future();
92 promise.set_value(std::move(dataQueue.front()));
93 dataQueue.pop();
94 return future;
95 }
96
97 promiseQueue.emplace();
98 return promiseQueue.back().get_future();
99 }
100
101private:
102 std::mutex mutex;
103 std::queue<BufferedT> dataQueue;
104 uint64_t maxQueued;
105 std::queue<std::promise<BufferedT>> promiseQueue;
106};
107
108} // namespace detail
109
110/// Unidirectional channels are the basic communication primitive between the
111/// host and accelerator. A 'ChannelPort' is the host side of a channel. It can
112/// be either read or write but not both. At this level, channels are untyped --
113/// just streams of bytes. They are not intended to be used directly by users
114/// but used by higher level APIs which add types.
116public:
117 ChannelPort(const Type *type);
118 virtual ~ChannelPort() {}
119
121 /// The buffer size is optional and should be considered merely a hint.
122 /// Individual implementations use it however they like. The unit is number
123 /// of messages of the port type.
124 std::optional<unsigned> bufferSize = std::nullopt;
125
126 /// If the type of this port is a window, translate the incoming/outgoing
127 /// data into its underlying ('into') type. For 'into' types without lists,
128 /// just re-arranges the data fields from the lowered type to the 'into'
129 /// type.
130 ///
131 /// If this option is false, no translation is done and the data is
132 /// passed through as-is. Same is true for non-windowed types.
133 ///
134 /// For messages with lists, only two types are supported:
135 /// 1) Parallel encoding includes any 'header' data with each frame. Said
136 /// header data is the same across all frames, so this encoding is
137 /// inefficient but is commonly used for on-chip streaming interfaces.
138 /// Each frame contains a 'last' field to indicate the end of the list.
139 /// In cases where 'numItems' is greater than 1, a field named
140 /// '<listField>_size' indicates the number of valid items in that
141 /// frame.
142 /// 2) Serial (bulk transfer) encoding, where a 'header' frame precedes
143 /// the list data frame. Said header frame contains a 'count' field
144 /// indicating the number of items in the list. Importantly, the
145 /// header frame is always re-transmitted after the specified number of
146 /// list items have been sent. If the 'count' field is zero, the end of
147 /// the list has been reached. If it is non-zero, the message has not
148 /// been completely transmitted and reading should continue until a
149 /// 'count' of zero is received.
150 ///
151 /// In both cases, the host-side MessageData contains the complete header
152 /// followed by the list data. In other words, header data is not duplicated
153 /// in the returned message. So for a windowed type with header fields and
154 /// a list of (x,y) coordinates, the host memory layout would be:
155 /// ```
156 /// struct ExampleList {
157 /// uint32_t headerField2; // SystemVerilog ordering
158 /// uint32_t headerField1;
159 /// size_t list_length; // Number list items
160 /// struct { uint16_t y, x; } list_data[];
161 /// }
162 /// ```
163 ///
164 /// In a parallel encoding, each frame's wire format (from hardware) is:
165 /// ```
166 /// struct ExampleListFrame {
167 /// uint8_t list_last; // Non-zero indicates last item in list
168 /// struct { uint16_t y, x; } list_data[numItems]; // SV field ordering
169 /// uint32_t headerField2; // SV struct ordering (reversed)
170 /// uint32_t headerField1;
171 /// }
172 /// ```
173 ///
174 /// Important note: for consistency, preserves SystemVerilog struct field
175 /// ordering! So it's the opposite of C struct ordering.
176 ///
177 /// Implementation status:
178 /// - Only parallel list encoding is supported.
179 /// - Fields must be byte-aligned.
180 ///
181 /// See the CIRCT documentation (or td files) for more details on windowed
182 /// messages.
183 bool translateMessage = true;
184
185 ConnectOptions(std::optional<unsigned> bufferSize = std::nullopt,
186 bool translateMessage = true)
188 };
189
190 /// Set up a connection to the accelerator.
191 virtual void connect(const ConnectOptions &options = ConnectOptions()) = 0;
192 virtual void disconnect() = 0;
193 virtual bool isConnected() const = 0;
194
195 /// Poll for incoming data. Returns true if data was read or written into a
196 /// buffer as a result of the poll. Calling the call back could (will) also
197 /// happen in that case. Some backends need this to be called periodically. In
198 /// the usual case, this will be called by a background thread, but the ESI
199 /// runtime does not want to assume that the host processes use standard
200 /// threads. If the user wants to provide their own threads, they need to call
201 /// this on each port occasionally. This is also called from the 'master' poll
202 /// method in the Accelerator class.
203 bool poll() {
204 if (isConnected())
205 return pollImpl();
206 return false;
207 }
208
209 /// Get the size of each frame in bytes. For windowed types, this is the
210 /// lowered type's width; otherwise, the port type's width. Transports
211 /// (DMA engines, cosim, etc.) require every message to be at least one
212 /// byte, so void / zero-width port types are reported as 1 byte here; the
213 /// transport pads or strips that placeholder byte transparently.
214 size_t getFrameSizeBytes() const {
215 size_t bytes = translationInfo ? translationInfo->frameBytes
217 return std::max<size_t>(1, bytes);
218 }
219 const Type *getType() const { return type; }
220
221 /// If this port carries a windowed type, return the original WindowType
222 /// (whose `intoType` is what `getType()` returns). Returns nullptr for
223 /// non-windowed ports.
224 const WindowType *getWindowType() const {
225 return translationInfo ? translationInfo->windowType : nullptr;
226 }
227
228protected:
229 const Type *type;
230
231 /// Instructions for translating windowed types. Precomputes and optimizes a
232 /// list of copy operations.
235
236 /// Precompute and optimize the copy operations for translating frames.
237 void precomputeFrameInfo();
238
239 /// Throw if this window cannot be translated by the runtime translator
240 /// (e.g. uses serial/bulk list encoding which is not yet supported).
241 void requireTranslationSupported() const;
242
243 /// The window type being translated.
245
246 /// A copy operation for translating between frame data and the translation.
247 /// Run this during the translation.
248 struct CopyOp {
249 /// Offset in the incoming/outgoing frame data.
251 /// Offset in the translation buffer.
253 /// Number of bytes to copy.
254 size_t size;
255 };
256
257 /// Information about a list field within a frame (for parallel encoding).
258 /// Note: Currently only numItems == 1 is supported (one list element per
259 /// frame).
261 /// Name of the list field.
262 std::string fieldName;
263 /// Offset of the list data array in the frame.
265 /// Size of each list element in bytes.
267 /// Offset of the 'last' field in the frame.
269 /// Offset in the translation buffer where list length is stored.
271 /// Offset in the translation buffer where list data starts.
273 };
274
275 /// Information about each frame in the windowed type.
276 struct FrameInfo {
277 /// The total size of a frame in bytes.
279 /// Precomputed copy operations for translating this frame (non-list
280 /// fields).
281 std::vector<CopyOp> copyOps;
282 /// Information about list fields in this frame (parallel encoding).
283 /// Currently only one list field per frame is supported.
284 std::optional<ListFieldInfo> listField;
285 };
286 /// Precomputed information about each frame.
287 std::vector<FrameInfo> frames;
288 /// Size of the 'into' type in bytes (for fixed-size types).
289 /// For types with lists, this is the size of the fixed header portion.
290 size_t intoTypeBytes = 0;
291 /// Number of bytes per wire frame (from the lowered type's bit width).
292 size_t frameBytes = 0;
293 /// True if the window contains a list field (variable-size message).
294 bool hasListField = false;
295 };
296 std::unique_ptr<TranslationInfo> translationInfo;
297
298 /// Method called by poll() to actually poll the channel if the channel is
299 /// connected.
300 virtual bool pollImpl() { return false; }
301
302 /// Called by all connect methods to let backends initiate the underlying
303 /// connections.
304 virtual void connectImpl(const ConnectOptions &options) {}
305};
306
307/// A ChannelPort which sends data to the accelerator.
309public:
311
312 virtual void connect(const ConnectOptions &options = {}) override {
313 translateMessages = options.translateMessage && translationInfo;
314 if (translationInfo) {
315 translationInfo->precomputeFrameInfo();
317 translationInfo->requireTranslationSupported();
318 }
319 connectImpl(options);
320 connected = true;
321 }
322 virtual void disconnect() override { connected = false; }
323 virtual bool isConnected() const override { return connected; }
324
325 /// A very basic blocking write API. Will likely change for performance
326 /// reasons.
327 void write(const MessageData &data) {
328 if (translateMessages) {
329 assert(translationBuffer.empty() &&
330 "Cannot call write() with pending translated messages");
331 translateOutgoing(data);
332 for (auto &msg : translationBuffer)
334 translationBuffer.clear();
335 } else {
337 }
338 }
339
340 /// Write a multi-segment message. Takes ownership so the backend can hold
341 /// the message across partial writes / async completions. Default flattens
342 /// and calls the regular write(). Backends override for scatter-gather /
343 /// chunked-DMA support.
344 virtual void write(std::unique_ptr<SegmentedMessageData> msg) {
345 if (!msg)
346 throw std::runtime_error(
347 "WriteChannelPort::write: null SegmentedMessageData");
348 write(msg->toMessageData());
349 }
350
351 /// A basic non-blocking write API. Returns true if any of the data was queued
352 /// and/or sent. If the data type is a window a 'true' return does not
353 /// indicate that the message has been completely written. The 'flush' method
354 /// can be used to check that the entire buffer has been written. It is
355 /// invalid for backends to always return false (i.e. backends must eventually
356 /// ensure that writes may succeed).
357 bool tryWrite(const MessageData &data) {
358 if (translateMessages) {
359 // Do not accept a new message if there are pending messages to flush.
360 if (!flush())
361 return false;
362 assert(translationBuffer.empty() &&
363 "Translation buffer should be empty after successful flush");
364 translateOutgoing(data);
365 flush();
366 return true;
367 } else {
369 }
370 }
371 /// Flush any buffered data. Returns true if all data was flushed.
372 ///
373 /// If `translateMessages` is false, calling `flush()` will immediately return
374 /// true and perform no action, as there is no buffered data to flush.
375 bool flush() {
376 while (translationBufferIdx < translationBuffer.size()) {
377 if (!tryWriteImpl(
379 return false;
381 }
382 translationBuffer.clear();
384 return true;
385 }
386
387protected:
388 /// Pad an empty payload with a single zero byte if the port type has a
389 /// zero-width logical width (e.g. `VoidType`). Transports (DMA engines,
390 /// cosim, etc.) require every message to carry at least one byte.
392 if (!data.getSize() && type && type->getBitWidth() == 0)
393 return transportPad0Bytes;
394 return data;
395 }
396
397 /// Backing storage for `maybePadEmptyMessage` return so that the returned
398 /// reference outlives the call.
399 inline static const MessageData transportPad0Bytes = MessageData({0});
400
401public:
402protected:
403 /// Implementation for write(). Subclasses must implement this.
404 virtual void writeImpl(const MessageData &) = 0;
405
406 /// Implementation for tryWrite(). Subclasses must implement this.
407 virtual bool tryWriteImpl(const MessageData &data) = 0;
408
409 /// Break a message into its frames.
410 std::vector<MessageData> getMessageFrames(const MessageData &data);
411
412 /// Whether to translate outgoing data if the port type is a window type. Set
413 /// by the connect() method.
414 bool translateMessages = false;
415 /// If tryWrite cannot write all the messages of a windowed type at once, it
416 /// stores them here and writes them out one by one on subsequent calls.
417 std::vector<MessageData> translationBuffer;
418 /// Index of the next message to write in translationBuffer.
420 /// Translate outgoing data if the port type is a window type. Append the new
421 /// message 'chunks' to translationBuffer.
422 void translateOutgoing(const MessageData &data);
423
424private:
425 volatile bool connected = false;
426};
427
428/// Instantiated when a backend does not know how to create a write channel.
430public:
433
434 void connect(const ConnectOptions &options = {}) override {
435 throw std::runtime_error(errmsg);
436 }
437
438protected:
439 void writeImpl(const MessageData &) override {
440 throw std::runtime_error(errmsg);
441 }
442 bool tryWriteImpl(const MessageData &) override {
443 throw std::runtime_error(errmsg);
444 }
445
446 std::string errmsg;
447};
448
449/// A ChannelPort which reads data from the accelerator. It has two modes:
450/// Callback and Polling which cannot be used at the same time. The mode is set
451/// at connect() time. To change the mode, disconnect() and then connect()
452/// again. When the port is disconnected, it will backpressure hardware.
454
455public:
456 /// Primary callback API for raw reads.
457 ///
458 /// The message is passed by owning reference so the callee can consume it,
459 /// move storage out of it, or leave it intact when returning `false` to
460 /// request a retry with the same message object.
462 std::function<bool(std::unique_ptr<SegmentedMessageData> &)>;
463
464 /// Compatibility callback API for callers which want flattened message
465 /// bytes instead of the owning segmented message object.
466 using FlatReadCallback = std::function<bool(MessageData)>;
467
470
471 /// Disconnect the channel. Warning: this method may block until all callbacks
472 /// have returned. Do not call it anywhere which could be in a callback
473 /// control path otherwise deadlock will occur.
474 virtual void disconnect() override;
475 virtual bool isConnected() const override {
476 return mode != Mode::Disconnected;
477 }
478
479 //===--------------------------------------------------------------------===//
480 // Callback mode: To use a callback, connect with a callback function which
481 // will get called with incoming data. This function can be called from any
482 // thread. It shall return true to indicate that the data was consumed. False
483 // if it could not accept the data and should be tried again at some point in
484 // the future. Callback is not allowed to block and needs to execute quickly.
485 //
486 // TODO: Have the callback return something upon which the caller can check,
487 // wait, and be notified.
488 //===--------------------------------------------------------------------===//
489
490 virtual void connect(ReadCallback callback,
491 const ConnectOptions &options = {});
492
493 /// Connect a compatibility callback which receives flattened `MessageData`
494 /// objects. This adapts the primary segmented callback path.
495 virtual void connect(FlatReadCallback callback,
496 const ConnectOptions &options = {});
497
498 //===--------------------------------------------------------------------===//
499 // Polling mode methods: To use futures or blocking reads, connect without any
500 // arguments. You will then be able to use readAsync() or read().
501 //===--------------------------------------------------------------------===//
502
503 /// Default max data queue size set at connect time.
504 static constexpr uint64_t DefaultMaxDataQueueMsgs = 32;
505
506 /// Connect to the channel in polling mode.
507 virtual void connect(const ConnectOptions &options = {}) override;
508
509 /// Asynchronous polling read.
510 ///
511 /// Throws if the port is disconnected or currently connected in callback
512 /// mode.
513 virtual std::future<MessageData> readAsync();
514
515 /// Specify a buffer to read into. Blocking. Basic API, will likely change
516 /// for performance and functionality reasons.
517 virtual void read(MessageData &outData) {
518 std::future<MessageData> f = readAsync();
519 f.wait();
520 outData = std::move(f.get());
521 }
522
523 /// Set maximum number of messages to store in the dataQueue. 0 means no
524 /// limit. This is only used in polling mode and is set to default of 32 upon
525 /// connect. While it may seem redundant to have this and bufferSize, there
526 /// may be (and are) backends which have a very small amount of memory which
527 /// are accelerator accessible and want to move messages out as quickly as
528 /// possible.
529 void setMaxDataQueueMsgs(uint64_t maxMsgs) {
530 maxDataQueueMsgs = maxMsgs;
531 if (pollingState)
532 pollingState->setMaxQueued(maxMsgs);
533 }
534
535protected:
536 /// Invoke the currently registered callback.
537 ///
538 /// Handles synchronization race conditions with disconnects. Backends should
539 /// use this helper instead of calling `callback` directly.
540 bool invokeCallback(std::unique_ptr<SegmentedMessageData> &msg);
541
542private:
543 /// Backends should not call this directly. Use `invokeCallback()` instead,
544 /// which handles synchronization with disconnects.
546
547protected:
548 /// Indicates the current mode of the channel.
550 volatile Mode mode;
551
552 /// If a translated message has been assembled but not yet consumed, retain
553 /// ownership here so retries present the same message object.
554 std::unique_ptr<SegmentedMessageData> translatedMessage;
555
556 /// Window translation support.
557 std::vector<uint8_t> translationBuffer;
558 /// Index of the next expected frame (for multi-frame windows).
559 size_t nextFrameIndex = 0;
560 /// For list fields: accumulated list data across frames.
561 std::vector<uint8_t> listDataBuffer;
562 /// Flag to track whether we're in the middle of accumulating list data.
564 /// Reset translation state buffers and indices.
566 /// Translate incoming data if the port type is a window type. Returns true if
567 /// the message has been completely received.
568 bool translateIncoming(MessageData &data);
569
570 //===--------------------------------------------------------------------===//
571 // Polling mode members.
572 //===--------------------------------------------------------------------===//
573
575 std::optional<detail::PollingBuffer<MessageData>> pollingState;
576
577 /// Synchronizes callback revocation during disconnect.
578 std::mutex callbackMutex;
579 std::condition_variable callbackCv;
580 size_t activeCallbacks = 0;
581};
582
583/// Instantiated when a backend does not know how to create a read channel.
585public:
588
590 const ConnectOptions &options = ConnectOptions()) override {
591 throw std::runtime_error(errmsg);
592 }
594 const ConnectOptions &options = ConnectOptions()) override {
595 throw std::runtime_error(errmsg);
596 }
597 void connect(const ConnectOptions &options = ConnectOptions()) override {
598 throw std::runtime_error(errmsg);
599 }
600 std::future<MessageData> readAsync() override {
601 throw std::runtime_error(errmsg);
602 }
603
604protected:
605 std::string errmsg;
606};
607
608/// Services provide connections to 'bundles' -- collections of named,
609/// unidirectional communication channels. This class provides access to those
610/// ChannelPorts.
612public:
613 /// Compute the direction of a channel given the bundle direction and the
614 /// bundle port's direction.
615 static bool isWrite(BundleType::Direction bundleDir) {
616 return bundleDir == BundleType::Direction::To;
617 }
618
619 /// Construct a port.
621 virtual ~BundlePort() = default;
622
623 /// Get the ID of the port.
624 AppID getID() const { return id; }
625
626 /// Get access to the raw byte streams of a channel. Intended for internal
627 /// usage and binding to other languages (e.g. Python) which have their own
628 /// message serialization code. Exposed publicly as an escape hatch, but
629 /// ordinary users should not use. You have been warned.
630 WriteChannelPort &getRawWrite(const std::string &name) const;
631 ReadChannelPort &getRawRead(const std::string &name) const;
632 const PortMap &getChannels() const { return channels; }
633
634 /// Cast this Bundle port to a subclass which is actually useful. Returns
635 /// nullptr if the cast fails.
636 // TODO: this probably shouldn't be 'const', but bundle ports' user access are
637 // const. Change that.
638 template <typename T>
639 T *getAs() const {
640 return const_cast<T *>(dynamic_cast<const T *>(this));
641 }
642
643 /// Calls `poll` on all channels in the bundle and returns true if any of them
644 /// returned true.
645 bool poll() {
646 bool result = false;
647 for (auto &channel : channels)
648 result |= channel.second.poll();
649 return result;
650 }
651
652protected:
656};
657
658} // namespace esi
659
660#endif // ESI_PORTS_H
assert(baseType &&"element must be base type")
Services provide connections to 'bundles' – collections of named, unidirectional communication channe...
Definition Ports.h:611
virtual ~BundlePort()=default
T * getAs() const
Cast this Bundle port to a subclass which is actually useful.
Definition Ports.h:639
PortMap channels
Definition Ports.h:655
ReadChannelPort & getRawRead(const std::string &name) const
Definition Ports.cpp:52
WriteChannelPort & getRawWrite(const std::string &name) const
Get access to the raw byte streams of a channel.
Definition Ports.cpp:42
const PortMap & getChannels() const
Definition Ports.h:632
bool poll()
Calls poll on all channels in the bundle and returns true if any of them returned true.
Definition Ports.h:645
const BundleType * type
Definition Ports.h:654
static bool isWrite(BundleType::Direction bundleDir)
Compute the direction of a channel given the bundle direction and the bundle port's direction.
Definition Ports.h:615
AppID getID() const
Get the ID of the port.
Definition Ports.h:624
Bundles represent a collection of channels.
Definition Types.h:104
Unidirectional channels are the basic communication primitive between the host and accelerator.
Definition Ports.h:115
const Type * getType() const
Definition Ports.h:219
virtual void connect(const ConnectOptions &options=ConnectOptions())=0
Set up a connection to the accelerator.
virtual void disconnect()=0
const WindowType * getWindowType() const
If this port carries a windowed type, return the original WindowType (whose intoType is what getType(...
Definition Ports.h:224
virtual bool pollImpl()
Method called by poll() to actually poll the channel if the channel is connected.
Definition Ports.h:300
const Type * type
Definition Ports.h:229
size_t getFrameSizeBytes() const
Get the size of each frame in bytes.
Definition Ports.h:214
virtual void connectImpl(const ConnectOptions &options)
Called by all connect methods to let backends initiate the underlying connections.
Definition Ports.h:304
ChannelPort(const Type *type)
Definition Ports.cpp:26
virtual ~ChannelPort()
Definition Ports.h:118
std::unique_ptr< TranslationInfo > translationInfo
Definition Ports.h:296
bool poll()
Poll for incoming data.
Definition Ports.h:203
virtual bool isConnected() const =0
A concrete flat message backed by a single vector of bytes.
Definition Common.h:155
A ChannelPort which reads data from the accelerator.
Definition Ports.h:453
volatile Mode mode
Definition Ports.h:550
std::mutex callbackMutex
Synchronizes callback revocation during disconnect.
Definition Ports.h:578
virtual void connect(ReadCallback callback, const ConnectOptions &options={})
Definition Ports.cpp:140
virtual std::future< MessageData > readAsync()
Asynchronous polling read.
Definition Ports.cpp:225
size_t nextFrameIndex
Index of the next expected frame (for multi-frame windows).
Definition Ports.h:559
virtual bool isConnected() const override
Definition Ports.h:475
std::vector< uint8_t > translationBuffer
Window translation support.
Definition Ports.h:557
bool accumulatingListData
Flag to track whether we're in the middle of accumulating list data.
Definition Ports.h:563
void resetTranslationState()
Reset translation state buffers and indices.
Definition Ports.cpp:62
Mode
Indicates the current mode of the channel.
Definition Ports.h:549
std::optional< detail::PollingBuffer< MessageData > > pollingState
Definition Ports.h:575
static constexpr uint64_t DefaultMaxDataQueueMsgs
Default max data queue size set at connect time.
Definition Ports.h:504
bool translateIncoming(MessageData &data)
Translate incoming data if the port type is a window type.
Definition Ports.cpp:489
virtual void disconnect() override
Disconnect the channel.
Definition Ports.cpp:70
void setMaxDataQueueMsgs(uint64_t maxMsgs)
Set maximum number of messages to store in the dataQueue.
Definition Ports.h:529
size_t activeCallbacks
Definition Ports.h:580
std::unique_ptr< SegmentedMessageData > translatedMessage
If a translated message has been assembled but not yet consumed, retain ownership here so retries pre...
Definition Ports.h:554
uint64_t maxDataQueueMsgs
Definition Ports.h:574
std::function< bool(MessageData)> FlatReadCallback
Compatibility callback API for callers which want flattened message bytes instead of the owning segme...
Definition Ports.h:466
ReadCallback callback
Backends should not call this directly.
Definition Ports.h:545
std::vector< uint8_t > listDataBuffer
For list fields: accumulated list data across frames.
Definition Ports.h:561
bool invokeCallback(std::unique_ptr< SegmentedMessageData > &msg)
Invoke the currently registered callback.
Definition Ports.cpp:87
std::function< bool(std::unique_ptr< SegmentedMessageData > &)> ReadCallback
Primary callback API for raw reads.
Definition Ports.h:462
virtual void read(MessageData &outData)
Specify a buffer to read into.
Definition Ports.h:517
std::condition_variable callbackCv
Definition Ports.h:579
ReadChannelPort(const Type *type)
Definition Ports.h:468
Root class of the ESI type system.
Definition Types.h:36
virtual std::ptrdiff_t getBitWidth() const
Definition Types.h:43
Instantiated when a backend does not know how to create a read channel.
Definition Ports.h:584
void connect(ReadCallback callback, const ConnectOptions &options=ConnectOptions()) override
Definition Ports.h:589
void connect(const ConnectOptions &options=ConnectOptions()) override
Connect to the channel in polling mode.
Definition Ports.h:597
std::future< MessageData > readAsync() override
Asynchronous polling read.
Definition Ports.h:600
UnknownReadChannelPort(const Type *type, std::string errmsg)
Definition Ports.h:586
void connect(FlatReadCallback callback, const ConnectOptions &options=ConnectOptions()) override
Connect a compatibility callback which receives flattened MessageData objects.
Definition Ports.h:593
Instantiated when a backend does not know how to create a write channel.
Definition Ports.h:429
void connect(const ConnectOptions &options={}) override
Set up a connection to the accelerator.
Definition Ports.h:434
void writeImpl(const MessageData &) override
Implementation for write(). Subclasses must implement this.
Definition Ports.h:439
UnknownWriteChannelPort(const Type *type, std::string errmsg)
Definition Ports.h:431
bool tryWriteImpl(const MessageData &) override
Implementation for tryWrite(). Subclasses must implement this.
Definition Ports.h:442
Windows represent a fixed-size sliding window over a stream of data.
Definition Types.h:316
A ChannelPort which sends data to the accelerator.
Definition Ports.h:308
virtual void write(std::unique_ptr< SegmentedMessageData > msg)
Write a multi-segment message.
Definition Ports.h:344
virtual bool isConnected() const override
Definition Ports.h:323
virtual void disconnect() override
Definition Ports.h:322
size_t translationBufferIdx
Index of the next message to write in translationBuffer.
Definition Ports.h:419
virtual bool tryWriteImpl(const MessageData &data)=0
Implementation for tryWrite(). Subclasses must implement this.
volatile bool connected
Definition Ports.h:425
bool translateMessages
Whether to translate outgoing data if the port type is a window type.
Definition Ports.h:414
void write(const MessageData &data)
A very basic blocking write API.
Definition Ports.h:327
static const MessageData transportPad0Bytes
Backing storage for maybePadEmptyMessage return so that the returned reference outlives the call.
Definition Ports.h:399
const MessageData & maybePadEmptyMessage(const MessageData &data)
Pad an empty payload with a single zero byte if the port type has a zero-width logical width (e....
Definition Ports.h:391
bool flush()
Flush any buffered data.
Definition Ports.h:375
bool tryWrite(const MessageData &data)
A basic non-blocking write API.
Definition Ports.h:357
virtual void connect(const ConnectOptions &options={}) override
Set up a connection to the accelerator.
Definition Ports.h:312
void translateOutgoing(const MessageData &data)
Translate outgoing data if the port type is a window type.
Definition Ports.cpp:586
std::vector< MessageData > getMessageFrames(const MessageData &data)
Break a message into its frames.
Definition Ports.cpp:680
std::vector< MessageData > translationBuffer
If tryWrite cannot write all the messages of a windowed type at once, it stores them here and writes ...
Definition Ports.h:417
virtual void writeImpl(const MessageData &)=0
Implementation for write(). Subclasses must implement this.
Shared queue/promise helper for polling-style read APIs.
Definition Ports.h:43
PollingBuffer(uint64_t maxQueued)
Definition Ports.h:45
std::future< BufferedT > readAsync()
Read the next value asynchronously.
Definition Ports.h:84
uint64_t getMaxQueued()
Return the current bounded queue size. 0 means unbounded.
Definition Ports.h:48
bool enqueue(BufferedT &value)
Try to deliver or enqueue a produced value.
Definition Ports.h:62
void setMaxQueued(uint64_t maxMsgs)
Update the bounded queue size. 0 means unbounded.
Definition Ports.h:54
std::queue< BufferedT > dataQueue
Definition Ports.h:103
std::queue< std::promise< BufferedT > > promiseQueue
Definition Ports.h:105
uint64_t bitsToBytes(uint64_t bits)
Compute ceil(bits/8).
Definition Utils.h:232
Definition esi.py:1
std::map< std::string, ChannelPort & > PortMap
Definition Ports.h:33
ConnectOptions(std::optional< unsigned > bufferSize=std::nullopt, bool translateMessage=true)
Definition Ports.h:185
std::optional< unsigned > bufferSize
The buffer size is optional and should be considered merely a hint.
Definition Ports.h:124
bool translateMessage
If the type of this port is a window, translate the incoming/outgoing data into its underlying ('into...
Definition Ports.h:183
A copy operation for translating between frame data and the translation.
Definition Ports.h:248
size_t bufferOffset
Offset in the translation buffer.
Definition Ports.h:252
size_t frameOffset
Offset in the incoming/outgoing frame data.
Definition Ports.h:250
size_t size
Number of bytes to copy.
Definition Ports.h:254
Information about each frame in the windowed type.
Definition Ports.h:276
std::vector< CopyOp > copyOps
Precomputed copy operations for translating this frame (non-list fields).
Definition Ports.h:281
std::optional< ListFieldInfo > listField
Information about list fields in this frame (parallel encoding).
Definition Ports.h:284
size_t expectedSize
The total size of a frame in bytes.
Definition Ports.h:278
Information about a list field within a frame (for parallel encoding).
Definition Ports.h:260
size_t listLengthBufferOffset
Offset in the translation buffer where list length is stored.
Definition Ports.h:270
size_t dataOffset
Offset of the list data array in the frame.
Definition Ports.h:264
std::string fieldName
Name of the list field.
Definition Ports.h:262
size_t lastFieldOffset
Offset of the 'last' field in the frame.
Definition Ports.h:268
size_t elementSize
Size of each list element in bytes.
Definition Ports.h:266
size_t listDataBufferOffset
Offset in the translation buffer where list data starts.
Definition Ports.h:272
Instructions for translating windowed types.
Definition Ports.h:233
void precomputeFrameInfo()
Precompute and optimize the copy operations for translating frames.
Definition Ports.cpp:264
void requireTranslationSupported() const
Throw if this window cannot be translated by the runtime translator (e.g.
Definition Ports.cpp:245
TranslationInfo(const WindowType *windowType)
Definition Ports.h:234
bool hasListField
True if the window contains a list field (variable-size message).
Definition Ports.h:294
const WindowType * windowType
The window type being translated.
Definition Ports.h:244
size_t intoTypeBytes
Size of the 'into' type in bytes (for fixed-size types).
Definition Ports.h:290
size_t frameBytes
Number of bytes per wire frame (from the lowered type's bit width).
Definition Ports.h:292
std::vector< FrameInfo > frames
Precomputed information about each frame.
Definition Ports.h:287