28 type = chanType->getInner();
30 if (translationType) {
31 this->type = translationType->getIntoType();
32 this->
translationInfo = std::make_unique<TranslationInfo>(translationType);
40 : id(id), type(type), channels(channels) {}
45 throw std::runtime_error(
"Channel '" + name +
"' not found");
48 throw std::runtime_error(
"Channel '" + name +
"' is not a write channel");
55 throw std::runtime_error(
"Channel '" + name +
"' not found");
58 throw std::runtime_error(
"Channel '" + name +
"' is not a read channel");
88 std::unique_ptr<SegmentedMessageData> &msg) {
98 assert(
callback &&
"Callback should be set in non-disconnected mode");
108 auto finish = [
this]() {
122 if (msg->totalSize() == 1) {
123 msg = std::make_unique<MessageData>();
125 throw std::runtime_error(
126 "zero-width message must use a single-byte placeholder payload");
131 bool consumed = activeCallback(msg);
143 throw std::runtime_error(
"Channel already connected");
152 this->callback = [
this, cb = std::move(
callback)](
153 std::unique_ptr<SegmentedMessageData> &data) {
178 [cb = std::move(
callback)](std::unique_ptr<SegmentedMessageData> &data)
179 ->
bool {
return cb(data->toMessageData()); },
185 throw std::runtime_error(
"Channel already connected");
200 translate](std::unique_ptr<SegmentedMessageData> &msg) {
212 data = msg->toMessageData();
227 throw std::runtime_error(
228 "Cannot read from a callback channel. Call `connect()` without a "
229 "callback specified to use polling mode.");
231 throw std::runtime_error(
232 "Cannot read from a disconnected channel. `connect()` the channel "
233 "without a callback before calling `readAsync()`.");
251 for (
const auto &field : frame.fields) {
252 if (field.bulkCountWidth > 0)
253 throw std::runtime_error(
254 "Window translation for serial (bulk) list encoding is not "
255 "implemented (window '" +
258 "'). Connect the port with ConnectOptions{translateMessage=false} "
259 "and decode the frames manually.");
265 const Type *intoType = windowType->getIntoType();
269 throw std::runtime_error(
270 "Window intoType must be a struct for translation");
272 const auto &intoFields = intoStruct->
getFields();
281 const Type *listElementType;
282 size_t listElementSize;
284 std::map<std::string, FieldInfo> fieldMap;
285 size_t currentOffset = 0;
286 hasListField =
false;
288 auto processField = [&](
const std::string &name,
const Type *fieldType) {
289 auto *listType =
dynamic_cast<const ListType *
>(fieldType);
295 const Type *elemType = listType->getElementType();
298 throw std::runtime_error(
299 "Cannot translate list with dynamically-sized element type: " +
302 throw std::runtime_error(
303 "Cannot translate list with zero-width element type: " + name);
304 if (elemBits % 8 != 0)
305 throw std::runtime_error(
306 "Cannot translate list element with non-byte-aligned size: " +
308 size_t elemBytes = (
static_cast<size_t>(elemBits) + 7) / 8;
309 fieldMap[name] = {currentOffset, fieldType,
true, elemType, elemBytes};
313 currentOffset +=
sizeof(size_t);
316 std::ptrdiff_t fieldBits = fieldType->
getBitWidth();
318 throw std::runtime_error(
"Cannot translate field with dynamic size: " +
320 if (fieldBits % 8 != 0)
321 throw std::runtime_error(
322 "Cannot translate field with non-byte-aligned size: " + name);
323 size_t fieldBytes = (
static_cast<size_t>(fieldBits) + 7) / 8;
324 fieldMap[name] = {currentOffset, fieldType,
false,
nullptr, 0};
325 currentOffset += fieldBytes;
330 for (
auto it = intoFields.rbegin(); it != intoFields.rend(); ++it)
331 processField(it->first, it->second);
333 for (
const auto &[name, fieldType] : intoFields)
334 processField(name, fieldType);
339 intoTypeBytes = currentOffset;
341 const auto &windowFrames = windowType->getFrames();
343 frames.reserve(windowFrames.size());
345 for (
const auto &frame : windowFrames) {
347 size_t frameOffset = 0;
364 struct FrameFieldLayout {
370 size_t listElementSize;
372 const Type *listElementType;
374 std::vector<FrameFieldLayout> fieldLayouts;
376 for (
auto fieldIt = frame.fields.rbegin(); fieldIt != frame.fields.rend();
379 auto it = fieldMap.find(field.
name);
380 if (it == fieldMap.end())
381 throw std::runtime_error(
"Frame field '" + field.
name +
382 "' not found in intoType");
384 const FieldInfo &fieldInfo = it->second;
385 FrameFieldLayout layout;
386 layout.name = field.
name;
387 layout.bufferOffset = fieldInfo.offset;
388 layout.isList = fieldInfo.isList;
390 layout.listElementType = fieldInfo.listElementType;
391 layout.listElementSize = fieldInfo.listElementSize;
393 if (fieldInfo.isList) {
400 throw std::runtime_error(
401 "List translation with numItems > 1 is not yet supported. "
403 field.
name +
"' has numItems=" + std::to_string(numItems));
406 size_t lastOffset = frameOffset;
410 layout.frameOffset = frameOffset;
411 layout.size = fieldInfo.listElementSize;
412 fieldLayouts.push_back(layout);
413 frameOffset += layout.size;
426 std::ptrdiff_t fieldBits = fieldInfo.type->getBitWidth();
427 layout.frameOffset = frameOffset;
428 layout.size = (
static_cast<size_t>(fieldBits) + 7) / 8;
429 fieldLayouts.push_back(layout);
430 frameOffset += layout.size;
437 for (
const auto &layout : fieldLayouts) {
438 if (!layout.isList) {
441 {layout.frameOffset, layout.bufferOffset, layout.size});
450 return a.frameOffset < b.frameOffset;
454 if (!frameInfo.
copyOps.empty()) {
455 std::vector<CopyOp> mergedOps;
456 mergedOps.push_back(frameInfo.
copyOps[0]);
458 for (
size_t i = 1; i < frameInfo.
copyOps.size(); ++i) {
459 CopyOp &last = mergedOps.back();
466 mergedOps.push_back(current);
470 frameInfo.
copyOps = std::move(mergedOps);
473 frames.push_back(std::move(frameInfo));
479 std::ptrdiff_t loweredBits = windowType->getLoweredType()->getBitWidth();
480 if (loweredBits > 0) {
481 frameBytes = (loweredBits + 7) / 8;
484 for (
const auto &f : frames)
485 frameBytes = std::max(frameBytes, f.expectedSize);
491 "Translation type must be set for window translation.");
494 const uint8_t *frameData = data.getBytes();
495 size_t frameDataSize = data.size();
501 if (frameDataSize < frameInfo.expectedSize)
502 throw std::runtime_error(
"Frame data too small: expected at least " +
503 std::to_string(frameInfo.expectedSize) +
504 " bytes, got " + std::to_string(frameDataSize) +
510 bool isFirstFrame = (nextFrameIndex == 0) && !accumulatingListData;
517 listDataBuffer.clear();
521 for (
const auto &op : frameInfo.copyOps)
522 std::memcpy(translationBuffer.data() + op.bufferOffset,
523 frameData + op.frameOffset, op.size);
526 if (frameInfo.listField.has_value()) {
527 const auto &listInfo = frameInfo.listField.value();
530 size_t bytesToCopy = listInfo.elementSize;
532 if (listInfo.dataOffset > frameDataSize)
533 throw std::runtime_error(
"List data offset is beyond frame bounds");
535 if (listInfo.dataOffset + bytesToCopy > frameDataSize)
536 throw std::runtime_error(
"List data extends beyond frame bounds");
537 size_t oldSize = listDataBuffer.size();
538 listDataBuffer.resize(oldSize + bytesToCopy);
539 std::memcpy(listDataBuffer.data() + oldSize,
540 frameData + listInfo.dataOffset, bytesToCopy);
543 uint8_t lastFlag = frameData[listInfo.lastFieldOffset];
548 size_t listLength = listDataBuffer.size() / listInfo.elementSize;
549 size_t *listLengthPtr =
reinterpret_cast<size_t *
>(
550 translationBuffer.data() + listInfo.listLengthBufferOffset);
551 *listLengthPtr = listLength;
554 size_t headerSize = translationBuffer.size();
555 translationBuffer.resize(headerSize + listDataBuffer.size());
556 std::memcpy(translationBuffer.data() + headerSize, listDataBuffer.data(),
557 listDataBuffer.size());
561 listDataBuffer.clear();
562 accumulatingListData =
false;
568 accumulatingListData =
true;
577 if (nextFrameIndex >= numFrames) {
588 "Translation type must be set for window translation.");
590 const uint8_t *srcData = data.getBytes();
591 size_t srcDataSize = data.size();
593 if (srcDataSize < translationInfo->intoTypeBytes)
594 throw std::runtime_error(
"Source data too small: expected at least " +
596 " bytes, got " + std::to_string(srcDataSize) +
603 std::vector<uint8_t> frameBuffer(frameInfo.expectedSize, 0);
604 for (
const auto &op : frameInfo.copyOps)
605 std::memcpy(frameBuffer.data() + op.frameOffset,
606 srcData + op.bufferOffset, op.size);
607 translationBuffer.emplace_back(std::move(frameBuffer));
614 if (!frameInfo.listField.has_value()) {
616 std::vector<uint8_t> frameBuffer(frameInfo.expectedSize, 0);
617 for (
const auto &op : frameInfo.copyOps)
618 std::memcpy(frameBuffer.data() + op.frameOffset,
619 srcData + op.bufferOffset, op.size);
620 translationBuffer.emplace_back(std::move(frameBuffer));
623 const auto &listInfo = frameInfo.listField.value();
626 size_t listLength = 0;
627 std::memcpy(&listLength, srcData + listInfo.listLengthBufferOffset,
631 if (
translationInfo->intoTypeBytes + (listLength * listInfo.elementSize) >
633 throw std::runtime_error(
634 "Source buffer too small for list data: possible corrupted or "
635 "inconsistent list length field");
641 size_t itemsRemaining = listLength;
642 size_t listDataOffset = 0;
646 throw std::runtime_error(
647 "Cannot send empty lists - parallel ESI list encoding requires at "
648 "least one frame to be sent, and each frame must contain at least "
651 while (itemsRemaining > 0) {
652 std::vector<uint8_t> frameBuffer(frameInfo.expectedSize, 0);
655 for (
const auto &op : frameInfo.copyOps)
656 std::memcpy(frameBuffer.data() + op.frameOffset,
657 srcData + op.bufferOffset, op.size);
660 size_t bytesInThisFrame = listInfo.elementSize;
663 std::memcpy(frameBuffer.data() + listInfo.dataOffset,
664 listData + listDataOffset, bytesInThisFrame);
668 listDataOffset += bytesInThisFrame;
671 frameBuffer[listInfo.lastFieldOffset] = (itemsRemaining == 0) ? 1 : 0;
673 translationBuffer.emplace_back(std::move(frameBuffer));
679std::vector<MessageData>
682 assert(frameBytes > 0 &&
"Frame size must be greater than 0");
683 if (data.getSize() % frameBytes != 0)
684 throw std::runtime_error(
685 "write message size (" + std::to_string(data.getSize()) +
686 ") is not a multiple of frame size (" + std::to_string(frameBytes) +
688 std::vector<MessageData> frames;
689 size_t numFrames = data.getSize() / frameBytes;
690 const uint8_t *ptr = data.getBytes();
691 for (
size_t i = 0; i < numFrames; ++i) {
692 frames.emplace_back(ptr, frameBytes);
assert(baseType &&"element must be base type")
ReadChannelPort & getRawRead(const std::string &name) const
WriteChannelPort & getRawWrite(const std::string &name) const
Get access to the raw byte streams of a channel.
BundlePort(AppID id, const BundleType *type, PortMap channels)
Construct a port.
Bundles represent a collection of channels.
size_t getFrameSizeBytes() const
Get the size of each frame in bytes.
virtual void connectImpl(const ConnectOptions &options)
Called by all connect methods to let backends initiate the underlying connections.
ChannelPort(const Type *type)
std::unique_ptr< TranslationInfo > translationInfo
Channels are the basic communication primitives.
Lists represent variable-length sequences of elements of a single type.
A concrete flat message backed by a single vector of bytes.
MessageData toMessageData() const override
Flatten all segments into a standard MessageData.
A ChannelPort which reads data from the accelerator.
std::mutex callbackMutex
Synchronizes callback revocation during disconnect.
virtual void connect(ReadCallback callback, const ConnectOptions &options={})
virtual std::future< MessageData > readAsync()
Asynchronous polling read.
size_t nextFrameIndex
Index of the next expected frame (for multi-frame windows).
std::vector< uint8_t > translationBuffer
Window translation support.
bool accumulatingListData
Flag to track whether we're in the middle of accumulating list data.
void resetTranslationState()
Reset translation state buffers and indices.
std::optional< detail::PollingBuffer< MessageData > > pollingState
bool translateIncoming(MessageData &data)
Translate incoming data if the port type is a window type.
virtual void disconnect() override
Disconnect the channel.
std::unique_ptr< SegmentedMessageData > translatedMessage
If a translated message has been assembled but not yet consumed, retain ownership here so retries pre...
uint64_t maxDataQueueMsgs
std::function< bool(MessageData)> FlatReadCallback
Compatibility callback API for callers which want flattened message bytes instead of the owning segme...
ReadCallback callback
Backends should not call this directly.
std::vector< uint8_t > listDataBuffer
For list fields: accumulated list data across frames.
bool invokeCallback(std::unique_ptr< SegmentedMessageData > &msg)
Invoke the currently registered callback.
std::function< bool(std::unique_ptr< SegmentedMessageData > &)> ReadCallback
Primary callback API for raw reads.
std::condition_variable callbackCv
Structs are an ordered collection of fields, each with a name and a type.
const FieldVector & getFields() const
Root class of the ESI type system.
std::string toString(bool oneLine=false) const
virtual std::ptrdiff_t getBitWidth() const
Windows represent a fixed-size sliding window over a stream of data.
const std::vector< Frame > & getFrames() const
const std::string & getName() const
A ChannelPort which sends data to the accelerator.
void translateOutgoing(const MessageData &data)
Translate outgoing data if the port type is a window type.
std::vector< MessageData > getMessageFrames(const MessageData &data)
Break a message into its frames.
std::map< std::string, ChannelPort & > PortMap
std::optional< unsigned > bufferSize
The buffer size is optional and should be considered merely a hint.
bool translateMessage
If the type of this port is a window, translate the incoming/outgoing data into its underlying ('into...
A copy operation for translating between frame data and the translation.
size_t bufferOffset
Offset in the translation buffer.
size_t frameOffset
Offset in the incoming/outgoing frame data.
size_t size
Number of bytes to copy.
Information about each frame in the windowed type.
std::vector< CopyOp > copyOps
Precomputed copy operations for translating this frame (non-list fields).
std::optional< ListFieldInfo > listField
Information about list fields in this frame (parallel encoding).
size_t expectedSize
The total size of a frame in bytes.
Information about a list field within a frame (for parallel encoding).
size_t listLengthBufferOffset
Offset in the translation buffer where list length is stored.
size_t dataOffset
Offset of the list data array in the frame.
std::string fieldName
Name of the list field.
size_t lastFieldOffset
Offset of the 'last' field in the frame.
size_t elementSize
Size of each list element in bytes.
size_t listDataBufferOffset
Offset in the translation buffer where list data starts.
void precomputeFrameInfo()
Precompute and optimize the copy operations for translating frames.
void requireTranslationSupported() const
Throw if this window cannot be translated by the runtime translator (e.g.
const WindowType * windowType
The window type being translated.
Field information describing a field within a frame.