28 type = chanType->getInner();
30 if (translationType) {
31 this->type = translationType->getIntoType();
32 this->
translationInfo = std::make_unique<TranslationInfo>(translationType);
40 : id(id), type(type), channels(channels) {}
45 throw std::runtime_error(
"Channel '" + name +
"' not found");
48 throw std::runtime_error(
"Channel '" + name +
"' is not a write channel");
55 throw std::runtime_error(
"Channel '" + name +
"' not found");
58 throw std::runtime_error(
"Channel '" + name +
"' is not a read channel");
72 throw std::runtime_error(
"Channel already connected");
105 std::scoped_lock<std::mutex> lock(
pollingM);
107 "Both queues are in use.");
111 std::promise<MessageData> p = std::move(
promiseQueue.front());
113 p.set_value(std::move(data));
128 throw std::runtime_error(
129 "Cannot read from a callback channel. `connect()` without a callback "
130 "specified to use polling mode.");
132 std::scoped_lock<std::mutex> lock(
pollingM);
134 "Both queues are in use.");
138 std::promise<MessageData> p;
139 std::future<MessageData> f = p.get_future();
140 p.set_value(std::move(
dataQueue.front()));
159 throw std::runtime_error(
160 "Window intoType must be a struct for translation");
162 const auto &intoFields = intoStruct->
getFields();
171 const Type *listElementType;
172 size_t listElementSize;
174 std::map<std::string, FieldInfo> fieldMap;
175 size_t currentOffset = 0;
178 auto processField = [&](
const std::string &name,
const Type *fieldType) {
179 auto *listType =
dynamic_cast<const ListType *
>(fieldType);
185 const Type *elemType = listType->getElementType();
188 throw std::runtime_error(
189 "Cannot translate list with dynamically-sized element type: " +
191 if (elemBits % 8 != 0)
192 throw std::runtime_error(
193 "Cannot translate list element with non-byte-aligned size: " +
195 size_t elemBytes = (
static_cast<size_t>(elemBits) + 7) / 8;
196 fieldMap[name] = {currentOffset, fieldType,
true, elemType, elemBytes};
200 currentOffset +=
sizeof(size_t);
203 std::ptrdiff_t fieldBits = fieldType->
getBitWidth();
205 throw std::runtime_error(
"Cannot translate field with dynamic size: " +
207 if (fieldBits % 8 != 0)
208 throw std::runtime_error(
209 "Cannot translate field with non-byte-aligned size: " + name);
210 size_t fieldBytes = (
static_cast<size_t>(fieldBits) + 7) / 8;
211 fieldMap[name] = {currentOffset, fieldType,
false,
nullptr, 0};
212 currentOffset += fieldBytes;
217 for (
auto it = intoFields.rbegin(); it != intoFields.rend(); ++it)
218 processField(it->first, it->second);
220 for (
const auto &[name, fieldType] : intoFields)
221 processField(name, fieldType);
230 frames.reserve(windowFrames.size());
232 for (
const auto &frame : windowFrames) {
234 size_t frameOffset = 0;
251 struct FrameFieldLayout {
257 size_t listElementSize;
259 const Type *listElementType;
261 std::vector<FrameFieldLayout> fieldLayouts;
263 for (
auto fieldIt = frame.fields.rbegin(); fieldIt != frame.fields.rend();
266 auto it = fieldMap.find(field.
name);
267 if (it == fieldMap.end())
268 throw std::runtime_error(
"Frame field '" + field.
name +
269 "' not found in intoType");
271 const FieldInfo &fieldInfo = it->second;
272 FrameFieldLayout layout;
273 layout.name = field.
name;
274 layout.bufferOffset = fieldInfo.offset;
275 layout.isList = fieldInfo.isList;
277 layout.listElementType = fieldInfo.listElementType;
278 layout.listElementSize = fieldInfo.listElementSize;
280 if (fieldInfo.isList) {
287 throw std::runtime_error(
288 "List translation with numItems > 1 is not yet supported. "
290 field.
name +
"' has numItems=" + std::to_string(numItems));
293 size_t lastOffset = frameOffset;
297 layout.frameOffset = frameOffset;
298 layout.size = fieldInfo.listElementSize;
299 fieldLayouts.push_back(layout);
300 frameOffset += layout.size;
313 std::ptrdiff_t fieldBits = fieldInfo.type->getBitWidth();
314 layout.frameOffset = frameOffset;
315 layout.size = (
static_cast<size_t>(fieldBits) + 7) / 8;
316 fieldLayouts.push_back(layout);
317 frameOffset += layout.size;
324 for (
const auto &layout : fieldLayouts) {
325 if (!layout.isList) {
328 {layout.frameOffset, layout.bufferOffset, layout.size});
337 return a.frameOffset < b.frameOffset;
341 if (!frameInfo.
copyOps.empty()) {
342 std::vector<CopyOp> mergedOps;
343 mergedOps.push_back(frameInfo.
copyOps[0]);
345 for (
size_t i = 1; i < frameInfo.
copyOps.size(); ++i) {
346 CopyOp &last = mergedOps.back();
353 mergedOps.push_back(current);
357 frameInfo.
copyOps = std::move(mergedOps);
360 frames.push_back(std::move(frameInfo));
367 if (loweredBits > 0) {
371 for (
const auto &f :
frames)
378 "Translation type must be set for window translation.");
381 const uint8_t *frameData = data.getBytes();
382 size_t frameDataSize = data.size();
388 if (frameDataSize < frameInfo.expectedSize)
389 throw std::runtime_error(
"Frame data too small: expected at least " +
390 std::to_string(frameInfo.expectedSize) +
391 " bytes, got " + std::to_string(frameDataSize) +
397 bool isFirstFrame = (nextFrameIndex == 0) && !accumulatingListData;
404 listDataBuffer.clear();
408 for (
const auto &op : frameInfo.copyOps)
409 std::memcpy(translationBuffer.data() + op.bufferOffset,
410 frameData + op.frameOffset, op.size);
413 if (frameInfo.listField.has_value()) {
414 const auto &listInfo = frameInfo.listField.value();
417 size_t bytesToCopy = listInfo.elementSize;
419 if (listInfo.dataOffset > frameDataSize)
420 throw std::runtime_error(
"List data offset is beyond frame bounds");
422 if (listInfo.dataOffset + bytesToCopy > frameDataSize)
423 throw std::runtime_error(
"List data extends beyond frame bounds");
424 size_t oldSize = listDataBuffer.size();
425 listDataBuffer.resize(oldSize + bytesToCopy);
426 std::memcpy(listDataBuffer.data() + oldSize,
427 frameData + listInfo.dataOffset, bytesToCopy);
430 uint8_t lastFlag = frameData[listInfo.lastFieldOffset];
435 size_t listLength = listDataBuffer.size() / listInfo.elementSize;
436 size_t *listLengthPtr =
reinterpret_cast<size_t *
>(
437 translationBuffer.data() + listInfo.listLengthBufferOffset);
438 *listLengthPtr = listLength;
441 size_t headerSize = translationBuffer.size();
442 translationBuffer.resize(headerSize + listDataBuffer.size());
443 std::memcpy(translationBuffer.data() + headerSize, listDataBuffer.data(),
444 listDataBuffer.size());
448 listDataBuffer.clear();
449 accumulatingListData =
false;
455 accumulatingListData =
true;
464 if (nextFrameIndex >= numFrames) {
475 "Translation type must be set for window translation.");
477 const uint8_t *srcData = data.getBytes();
478 size_t srcDataSize = data.size();
480 if (srcDataSize < translationInfo->intoTypeBytes)
481 throw std::runtime_error(
"Source data too small: expected at least " +
483 " bytes, got " + std::to_string(srcDataSize) +
490 std::vector<uint8_t> frameBuffer(frameInfo.expectedSize, 0);
491 for (
const auto &op : frameInfo.copyOps)
492 std::memcpy(frameBuffer.data() + op.frameOffset,
493 srcData + op.bufferOffset, op.size);
494 translationBuffer.emplace_back(std::move(frameBuffer));
501 if (!frameInfo.listField.has_value()) {
503 std::vector<uint8_t> frameBuffer(frameInfo.expectedSize, 0);
504 for (
const auto &op : frameInfo.copyOps)
505 std::memcpy(frameBuffer.data() + op.frameOffset,
506 srcData + op.bufferOffset, op.size);
507 translationBuffer.emplace_back(std::move(frameBuffer));
510 const auto &listInfo = frameInfo.listField.value();
513 size_t listLength = 0;
514 std::memcpy(&listLength, srcData + listInfo.listLengthBufferOffset,
518 if (
translationInfo->intoTypeBytes + (listLength * listInfo.elementSize) >
520 throw std::runtime_error(
521 "Source buffer too small for list data: possible corrupted or "
522 "inconsistent list length field");
528 size_t itemsRemaining = listLength;
529 size_t listDataOffset = 0;
533 throw std::runtime_error(
534 "Cannot send empty lists - parallel ESI list encoding requires at "
535 "least one frame to be sent, and each frame must contain at least "
538 while (itemsRemaining > 0) {
539 std::vector<uint8_t> frameBuffer(frameInfo.expectedSize, 0);
542 for (
const auto &op : frameInfo.copyOps)
543 std::memcpy(frameBuffer.data() + op.frameOffset,
544 srcData + op.bufferOffset, op.size);
547 size_t bytesInThisFrame = listInfo.elementSize;
550 std::memcpy(frameBuffer.data() + listInfo.dataOffset,
551 listData + listDataOffset, bytesInThisFrame);
555 listDataOffset += bytesInThisFrame;
558 frameBuffer[listInfo.lastFieldOffset] = (itemsRemaining == 0) ? 1 : 0;
560 translationBuffer.emplace_back(std::move(frameBuffer));
566std::vector<MessageData>
569 assert(frameBytes > 0 &&
"Frame size must be greater than 0");
570 if (data.getSize() % frameBytes != 0)
571 throw std::runtime_error(
572 "write message size (" + std::to_string(data.getSize()) +
573 ") is not a multiple of frame size (" + std::to_string(frameBytes) +
575 std::vector<MessageData> frames;
576 size_t numFrames = data.getSize() / frameBytes;
577 const uint8_t *ptr = data.getBytes();
578 for (
size_t i = 0; i < numFrames; ++i) {
579 frames.emplace_back(ptr, frameBytes);
assert(baseType &&"element must be base type")
ReadChannelPort & getRawRead(const std::string &name) const
WriteChannelPort & getRawWrite(const std::string &name) const
Get access to the raw byte streams of a channel.
BundlePort(AppID id, const BundleType *type, PortMap channels)
Construct a port.
Bundles represent a collection of channels.
size_t getFrameSizeBytes() const
Get the size of each frame in bytes.
virtual void connectImpl(const ConnectOptions &options)
Called by all connect methods to let backends initiate the underlying connections.
ChannelPort(const Type *type)
std::unique_ptr< TranslationInfo > translationInfo
Channels are the basic communication primitives.
Lists represent variable-length sequences of elements of a single type.
A logical chunk of data representing serialized data.
A ChannelPort which reads data from the accelerator.
virtual std::future< MessageData > readAsync()
Asynchronous read.
size_t nextFrameIndex
Index of the next expected frame (for multi-frame windows).
virtual void connect(std::function< bool(MessageData)> callback, const ConnectOptions &options={})
std::vector< uint8_t > translationBuffer
Window translation support.
std::mutex pollingM
Mutex to protect the two queues used for polling.
bool accumulatingListData
Flag to track whether we're in the middle of accumulating list data.
void resetTranslationState()
Reset translation state buffers and indices.
std::function< bool(MessageData)> callback
Backends call this callback when new data is available.
std::queue< MessageData > dataQueue
Store incoming data here if there are no outstanding promises to be fulfilled.
static constexpr uint64_t DefaultMaxDataQueueMsgs
Default max data queue size set at connect time.
bool translateIncoming(MessageData &data)
Translate incoming data if the port type is a window type.
std::queue< std::promise< MessageData > > promiseQueue
Promises to be fulfilled when data is available.
uint64_t maxDataQueueMsgs
Maximum number of messages to store in dataQueue. 0 means no limit.
std::vector< uint8_t > listDataBuffer
For list fields: accumulated list data across frames.
Structs are an ordered collection of fields, each with a name and a type.
const FieldVector & getFields() const
Root class of the ESI type system.
std::string toString(bool oneLine=false) const
virtual std::ptrdiff_t getBitWidth() const
Windows represent a fixed-size sliding window over a stream of data.
const std::vector< Frame > & getFrames() const
const Type * getLoweredType() const
const Type * getIntoType() const
A ChannelPort which sends data to the accelerator.
void translateOutgoing(const MessageData &data)
Translate outgoing data if the port type is a window type.
std::vector< MessageData > getMessageFrames(const MessageData &data)
Break a message into its frames.
std::map< std::string, ChannelPort & > PortMap
bool translateMessage
If the type of this port is a window, translate the incoming/outgoing data into its underlying ('into') type.
A copy operation for translating between frame data and the translation.
size_t bufferOffset
Offset in the translation buffer.
size_t frameOffset
Offset in the incoming/outgoing frame data.
size_t size
Number of bytes to copy.
Information about each frame in the windowed type.
std::vector< CopyOp > copyOps
Precomputed copy operations for translating this frame (non-list fields).
std::optional< ListFieldInfo > listField
Information about list fields in this frame (parallel encoding).
size_t expectedSize
The total size of a frame in bytes.
Information about a list field within a frame (for parallel encoding).
size_t listLengthBufferOffset
Offset in the translation buffer where list length is stored.
size_t dataOffset
Offset of the list data array in the frame.
std::string fieldName
Name of the list field.
size_t lastFieldOffset
Offset of the 'last' field in the frame.
size_t elementSize
Size of each list element in bytes.
size_t listDataBufferOffset
Offset in the translation buffer where list data starts.
void precomputeFrameInfo()
Precompute and optimize the copy operations for translating frames.
bool hasListField
True if the window contains a list field (variable-size message).
const WindowType * windowType
The window type being translated.
size_t intoTypeBytes
Size of the 'into' type in bytes (for fixed-size types).
size_t frameBytes
Number of bytes per wire frame (from the lowered type's bit width).
std::vector< FrameInfo > frames
Precomputed information about each frame.
Field information describing a field within a frame.