28 type = chanType->getInner();
30 if (translationType) {
31 this->type = translationType->getIntoType();
32 this->
translationInfo = std::make_unique<TranslationInfo>(translationType);
40 : id(id), type(type), channels(channels) {}
45 throw std::runtime_error(
"Channel '" + name +
"' not found");
48 throw std::runtime_error(
"Channel '" + name +
"' is not a write channel");
55 throw std::runtime_error(
"Channel '" + name +
"' not found");
58 throw std::runtime_error(
"Channel '" + name +
"' is not a read channel");
72 throw std::runtime_error(
"Channel already connected");
105 std::scoped_lock<std::mutex> lock(
pollingM);
107 "Both queues are in use.");
111 std::promise<MessageData> p = std::move(
promiseQueue.front());
113 p.set_value(std::move(data));
128 throw std::runtime_error(
129 "Cannot read from a callback channel. `connect()` without a callback "
130 "specified to use polling mode.");
132 std::scoped_lock<std::mutex> lock(
pollingM);
134 "Both queues are in use.");
138 std::promise<MessageData> p;
139 std::future<MessageData> f = p.get_future();
140 p.set_value(std::move(
dataQueue.front()));
159 throw std::runtime_error(
160 "Window intoType must be a struct for translation");
162 const auto &intoFields = intoStruct->
getFields();
171 const Type *listElementType;
172 size_t listElementSize;
174 std::map<std::string, FieldInfo> fieldMap;
175 size_t currentOffset = 0;
178 auto processField = [&](
const std::string &name,
const Type *fieldType) {
179 auto *listType =
dynamic_cast<const ListType *
>(fieldType);
185 const Type *elemType = listType->getElementType();
188 throw std::runtime_error(
189 "Cannot translate list with dynamically-sized element type: " +
191 if (elemBits % 8 != 0)
192 throw std::runtime_error(
193 "Cannot translate list element with non-byte-aligned size: " +
195 size_t elemBytes = (
static_cast<size_t>(elemBits) + 7) / 8;
196 fieldMap[name] = {currentOffset, fieldType,
true, elemType, elemBytes};
200 currentOffset +=
sizeof(size_t);
203 std::ptrdiff_t fieldBits = fieldType->
getBitWidth();
205 throw std::runtime_error(
"Cannot translate field with dynamic size: " +
207 if (fieldBits % 8 != 0)
208 throw std::runtime_error(
209 "Cannot translate field with non-byte-aligned size: " + name);
210 size_t fieldBytes = (
static_cast<size_t>(fieldBits) + 7) / 8;
211 fieldMap[name] = {currentOffset, fieldType,
false,
nullptr, 0};
212 currentOffset += fieldBytes;
217 for (
auto it = intoFields.rbegin(); it != intoFields.rend(); ++it)
218 processField(it->first, it->second);
220 for (
const auto &[name, fieldType] : intoFields)
221 processField(name, fieldType);
230 frames.reserve(windowFrames.size());
232 for (
const auto &frame : windowFrames) {
234 size_t frameOffset = 0;
251 struct FrameFieldLayout {
257 size_t listElementSize;
259 const Type *listElementType;
261 std::vector<FrameFieldLayout> fieldLayouts;
263 for (
auto fieldIt = frame.fields.rbegin(); fieldIt != frame.fields.rend();
266 auto it = fieldMap.find(field.
name);
267 if (it == fieldMap.end())
268 throw std::runtime_error(
"Frame field '" + field.
name +
269 "' not found in intoType");
271 const FieldInfo &fieldInfo = it->second;
272 FrameFieldLayout layout;
273 layout.name = field.
name;
274 layout.bufferOffset = fieldInfo.offset;
275 layout.isList = fieldInfo.isList;
277 layout.listElementType = fieldInfo.listElementType;
278 layout.listElementSize = fieldInfo.listElementSize;
280 if (fieldInfo.isList) {
287 throw std::runtime_error(
288 "List translation with numItems > 1 is not yet supported. "
290 field.
name +
"' has numItems=" + std::to_string(numItems));
293 size_t lastOffset = frameOffset;
297 layout.frameOffset = frameOffset;
298 layout.size = fieldInfo.listElementSize;
299 fieldLayouts.push_back(layout);
300 frameOffset += layout.size;
313 std::ptrdiff_t fieldBits = fieldInfo.type->getBitWidth();
314 layout.frameOffset = frameOffset;
315 layout.size = (
static_cast<size_t>(fieldBits) + 7) / 8;
316 fieldLayouts.push_back(layout);
317 frameOffset += layout.size;
324 for (
const auto &layout : fieldLayouts) {
325 if (!layout.isList) {
328 {layout.frameOffset, layout.bufferOffset, layout.size});
337 return a.frameOffset < b.frameOffset;
341 if (!frameInfo.
copyOps.empty()) {
342 std::vector<CopyOp> mergedOps;
343 mergedOps.push_back(frameInfo.
copyOps[0]);
345 for (
size_t i = 1; i < frameInfo.
copyOps.size(); ++i) {
346 CopyOp &last = mergedOps.back();
353 mergedOps.push_back(current);
357 frameInfo.
copyOps = std::move(mergedOps);
360 frames.push_back(std::move(frameInfo));
366 "Translation type must be set for window translation.");
369 const uint8_t *frameData = data.getBytes();
370 size_t frameDataSize = data.size();
376 if (frameDataSize < frameInfo.expectedSize)
377 throw std::runtime_error(
"Frame data too small: expected at least " +
378 std::to_string(frameInfo.expectedSize) +
379 " bytes, got " + std::to_string(frameDataSize) +
385 bool isFirstFrame = (nextFrameIndex == 0) && !accumulatingListData;
392 listDataBuffer.clear();
396 for (
const auto &op : frameInfo.copyOps)
397 std::memcpy(translationBuffer.data() + op.bufferOffset,
398 frameData + op.frameOffset, op.size);
401 if (frameInfo.listField.has_value()) {
402 const auto &listInfo = frameInfo.listField.value();
405 size_t bytesToCopy = listInfo.elementSize;
407 if (listInfo.dataOffset > frameDataSize)
408 throw std::runtime_error(
"List data offset is beyond frame bounds");
410 if (listInfo.dataOffset + bytesToCopy > frameDataSize)
411 throw std::runtime_error(
"List data extends beyond frame bounds");
412 size_t oldSize = listDataBuffer.size();
413 listDataBuffer.resize(oldSize + bytesToCopy);
414 std::memcpy(listDataBuffer.data() + oldSize,
415 frameData + listInfo.dataOffset, bytesToCopy);
418 uint8_t lastFlag = frameData[listInfo.lastFieldOffset];
423 size_t listLength = listDataBuffer.size() / listInfo.elementSize;
424 size_t *listLengthPtr =
reinterpret_cast<size_t *
>(
425 translationBuffer.data() + listInfo.listLengthBufferOffset);
426 *listLengthPtr = listLength;
429 size_t headerSize = translationBuffer.size();
430 translationBuffer.resize(headerSize + listDataBuffer.size());
431 std::memcpy(translationBuffer.data() + headerSize, listDataBuffer.data(),
432 listDataBuffer.size());
436 listDataBuffer.clear();
437 accumulatingListData =
false;
443 accumulatingListData =
true;
452 if (nextFrameIndex >= numFrames) {
463 "Translation type must be set for window translation.");
465 const uint8_t *srcData = data.getBytes();
466 size_t srcDataSize = data.size();
468 if (srcDataSize < translationInfo->intoTypeBytes)
469 throw std::runtime_error(
"Source data too small: expected at least " +
471 " bytes, got " + std::to_string(srcDataSize) +
478 std::vector<uint8_t> frameBuffer(frameInfo.expectedSize, 0);
479 for (
const auto &op : frameInfo.copyOps)
480 std::memcpy(frameBuffer.data() + op.frameOffset,
481 srcData + op.bufferOffset, op.size);
482 translationBuffer.emplace_back(std::move(frameBuffer));
489 if (!frameInfo.listField.has_value()) {
491 std::vector<uint8_t> frameBuffer(frameInfo.expectedSize, 0);
492 for (
const auto &op : frameInfo.copyOps)
493 std::memcpy(frameBuffer.data() + op.frameOffset,
494 srcData + op.bufferOffset, op.size);
495 translationBuffer.emplace_back(std::move(frameBuffer));
498 const auto &listInfo = frameInfo.listField.value();
501 size_t listLength = 0;
502 std::memcpy(&listLength, srcData + listInfo.listLengthBufferOffset,
506 if (
translationInfo->intoTypeBytes + (listLength * listInfo.elementSize) >
508 throw std::runtime_error(
509 "Source buffer too small for list data: possible corrupted or "
510 "inconsistent list length field");
516 size_t itemsRemaining = listLength;
517 size_t listDataOffset = 0;
521 throw std::runtime_error(
522 "Cannot send empty lists - parallel ESI list encoding requires at "
523 "least one frame to be sent, and each frame must contain at least "
526 while (itemsRemaining > 0) {
527 std::vector<uint8_t> frameBuffer(frameInfo.expectedSize, 0);
530 for (
const auto &op : frameInfo.copyOps)
531 std::memcpy(frameBuffer.data() + op.frameOffset,
532 srcData + op.bufferOffset, op.size);
535 size_t bytesInThisFrame = listInfo.elementSize;
538 std::memcpy(frameBuffer.data() + listInfo.dataOffset,
539 listData + listDataOffset, bytesInThisFrame);
543 listDataOffset += bytesInThisFrame;
546 frameBuffer[listInfo.lastFieldOffset] = (itemsRemaining == 0) ? 1 : 0;
548 translationBuffer.emplace_back(std::move(frameBuffer));
assert(baseType &&"element must be base type")
ReadChannelPort & getRawRead(const std::string &name) const
WriteChannelPort & getRawWrite(const std::string &name) const
Get access to the raw byte streams of a channel.
BundlePort(AppID id, const BundleType *type, PortMap channels)
Construct a port.
Bundles represent a collection of channels.
virtual void connectImpl(const ConnectOptions &options)
Called by all connect methods to let backends initiate the underlying connections.
ChannelPort(const Type *type)
std::unique_ptr< TranslationInfo > translationInfo
Channels are the basic communication primitives.
Lists represent variable-length sequences of elements of a single type.
A logical chunk of data representing serialized data.
A ChannelPort which reads data from the accelerator.
virtual std::future< MessageData > readAsync()
Asynchronous read.
size_t nextFrameIndex
Index of the next expected frame (for multi-frame windows).
virtual void connect(std::function< bool(MessageData)> callback, const ConnectOptions &options={})
std::vector< uint8_t > translationBuffer
Window translation support.
std::mutex pollingM
Mutex to protect the two queues used for polling.
bool accumulatingListData
Flag to track whether we're in the middle of accumulating list data.
void resetTranslationState()
Reset translation state buffers and indices.
std::function< bool(MessageData)> callback
Backends call this callback when new data is available.
std::queue< MessageData > dataQueue
Store incoming data here if there are no outstanding promises to be fulfilled.
static constexpr uint64_t DefaultMaxDataQueueMsgs
Default max data queue size set at connect time.
bool translateIncoming(MessageData &data)
Translate incoming data if the port type is a window type.
std::queue< std::promise< MessageData > > promiseQueue
Promises to be fulfilled when data is available.
uint64_t maxDataQueueMsgs
Maximum number of messages to store in dataQueue. 0 means no limit.
std::vector< uint8_t > listDataBuffer
For list fields: accumulated list data across frames.
Structs are an ordered collection of fields, each with a name and a type.
const FieldVector & getFields() const
Root class of the ESI type system.
virtual std::ptrdiff_t getBitWidth() const
Windows represent a fixed-size sliding window over a stream of data.
const std::vector< Frame > & getFrames() const
const Type * getIntoType() const
A ChannelPort which sends data to the accelerator.
void translateOutgoing(const MessageData &data)
Translate outgoing data if the port type is a window type.
std::map< std::string, ChannelPort & > PortMap
bool translateMessage
If the type of this port is a window, translate the incoming/outgoing data into its underlying ('into') type.
A copy operation for translating between frame data and the translation buffer.
size_t bufferOffset
Offset in the translation buffer.
size_t frameOffset
Offset in the incoming/outgoing frame data.
size_t size
Number of bytes to copy.
Information about each frame in the windowed type.
std::vector< CopyOp > copyOps
Precomputed copy operations for translating this frame (non-list fields).
std::optional< ListFieldInfo > listField
Information about list fields in this frame (parallel encoding).
size_t expectedSize
The total size of a frame in bytes.
Information about a list field within a frame (for parallel encoding).
size_t listLengthBufferOffset
Offset in the translation buffer where list length is stored.
size_t dataOffset
Offset of the list data array in the frame.
std::string fieldName
Name of the list field.
size_t lastFieldOffset
Offset of the 'last' field in the frame.
size_t elementSize
Size of each list element in bytes.
size_t listDataBufferOffset
Offset in the translation buffer where list data starts.
void precomputeFrameInfo()
Precompute and optimize the copy operations for translating frames.
bool hasListField
True if the window contains a list field (variable-size message).
const WindowType * windowType
The window type being translated.
size_t intoTypeBytes
Size of the 'into' type in bytes (for fixed-size types).
std::vector< FrameInfo > frames
Precomputed information about each frame.
Field information describing a field within a frame.