CIRCT 23.0.0git
Loading...
Searching...
No Matches
Ports.cpp
Go to the documentation of this file.
1//===- Ports.cpp - ESI communication channels -------------------------===//
2//
3// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4// See https://llvm.org/LICENSE.txt for license information.
5// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6//
7//===----------------------------------------------------------------------===//
8//
9// DO NOT EDIT!
10// This file is distributed as part of an ESI package. The source for this file
11// should always be modified within CIRCT (lib/dialect/ESI/runtime/cpp/).
12//
13//===----------------------------------------------------------------------===//
14
15#include "esi/Ports.h"
16#include "esi/Types.h"
17
18#include <algorithm>
19#include <chrono>
20#include <cstring>
21#include <map>
22#include <stdexcept>
23
24using namespace esi;
25
27 if (auto chanType = dynamic_cast<const ChannelType *>(type))
28 type = chanType->getInner();
29 auto translationType = dynamic_cast<const WindowType *>(type);
30 if (translationType) {
31 this->type = translationType->getIntoType();
32 this->translationInfo = std::make_unique<TranslationInfo>(translationType);
33 } else {
34 this->type = type;
35 this->translationInfo = nullptr;
36 }
37}
38
40 : id(id), type(type), channels(channels) {}
41
42WriteChannelPort &BundlePort::getRawWrite(const std::string &name) const {
43 auto f = channels.find(name);
44 if (f == channels.end())
45 throw std::runtime_error("Channel '" + name + "' not found");
46 auto *write = dynamic_cast<WriteChannelPort *>(&f->second);
47 if (!write)
48 throw std::runtime_error("Channel '" + name + "' is not a write channel");
49 return *write;
50}
51
52ReadChannelPort &BundlePort::getRawRead(const std::string &name) const {
53 auto f = channels.find(name);
54 if (f == channels.end())
55 throw std::runtime_error("Channel '" + name + "' not found");
56 auto *read = dynamic_cast<ReadChannelPort *>(&f->second);
57 if (!read)
58 throw std::runtime_error("Channel '" + name + "' is not a read channel");
59 return *read;
60}
61
69
71 {
72 std::unique_lock<std::mutex> lock(callbackMutex);
73 // Revoke the callback under the same mutex used by invokeCallback(). New
74 // deliveries need to observe the disconnected state and the callback
75 // nullification at the same time.
76 callback = nullptr;
78 // Wait for any callback which already snapped the old function to finish
79 // before clearing per-connection state below.
80 callbackCv.wait(lock, [this]() { return activeCallbacks == 0; });
81 }
82
83 pollingState.reset();
85}
86
88 std::unique_ptr<SegmentedMessageData> &msg) {
89 ReadCallback activeCallback;
90 {
91 std::lock_guard<std::mutex> lock(callbackMutex);
92 // Check the disconnected state and snapshot the callback while holding the
93 // same mutex as disconnect(). That prevents disconnect() from clearing the
94 // callback after we decide to invoke it but before we've retained our own
95 // safe local copy.
97 return false;
98 assert(callback && "Callback should be set in non-disconnected mode");
99 activeCallback = callback;
100 // Count only callbacks which successfully captured a callable target.
101 // disconnect() waits on this count so it can safely tear down state which
102 // the callback path may still reference.
104 }
105
106 // Release the in-flight slot on every exit path, including exceptions from
107 // the callback itself.
108 auto finish = [this]() {
109 std::lock_guard<std::mutex> lock(callbackMutex);
110 assert(activeCallbacks > 0 && "Callback count underflow");
112 if (activeCallbacks == 0)
113 callbackCv.notify_all();
114 };
115
116 // For zero-width port types (e.g. `VoidType`), transports send a single
117 // placeholder byte to satisfy the "every message is at least one byte"
118 // invariant. Only strip the payload when it matches that encoding so
119 // malformed transport data is surfaced instead of silently discarded. The
120 // matching pad happens in `WriteChannelPort::maybePadEmptyMessage`.
121 if (msg && type && type->getBitWidth() == 0) {
122 if (msg->totalSize() == 1) {
123 msg = std::make_unique<MessageData>();
124 } else {
125 throw std::runtime_error(
126 "zero-width message must use a single-byte placeholder payload");
127 }
128 }
129
130 try {
131 bool consumed = activeCallback(msg);
132 finish();
133 return consumed;
134 } catch (...) {
135 finish();
136 throw;
137 }
138}
139
141 const ConnectOptions &options) {
143 throw std::runtime_error("Channel already connected");
144
146
147 if (translationInfo)
148 translationInfo->precomputeFrameInfo();
149
150 if (options.translateMessage && translationInfo) {
151 translationInfo->requireTranslationSupported();
152 this->callback = [this, cb = std::move(callback)](
153 std::unique_ptr<SegmentedMessageData> &data) {
154 if (!translatedMessage) {
155 MessageData flat = data->toMessageData();
156 if (!translateIncoming(flat))
157 return true;
159 std::make_unique<MessageData>(std::move(translationBuffer));
160 }
161
162 if (!cb(translatedMessage))
163 return false;
164
165 translatedMessage.reset();
166 return true;
167 };
168 } else {
169 this->callback = callback;
170 }
171 connectImpl(options);
173}
174
176 const ConnectOptions &options) {
177 connect(
178 [cb = std::move(callback)](std::unique_ptr<SegmentedMessageData> &data)
179 -> bool { return cb(data->toMessageData()); },
180 options);
181}
182
185 throw std::runtime_error("Channel already connected");
186
187 if (options.bufferSize.has_value())
188 maxDataQueueMsgs = options.bufferSize.value();
190
192
193 if (translationInfo)
194 translationInfo->precomputeFrameInfo();
195
196 bool translate = options.translateMessage && translationInfo;
197 if (translate)
198 translationInfo->requireTranslationSupported();
199 this->callback = [this,
200 translate](std::unique_ptr<SegmentedMessageData> &msg) {
201 MessageData data;
202 if (translate) {
203 if (!translatedMessage) {
204 MessageData flat = msg->toMessageData();
205 if (!translateIncoming(flat))
206 return true;
208 std::make_unique<MessageData>(std::move(translationBuffer));
209 }
210 data = translatedMessage->toMessageData();
211 } else {
212 data = msg->toMessageData();
213 }
214
215 if (!pollingState->enqueue(data))
216 return false;
217
218 translatedMessage.reset();
219 return true;
220 };
221 connectImpl(options);
223}
224
225std::future<MessageData> ReadChannelPort::readAsync() {
226 if (mode == Mode::Callback)
227 throw std::runtime_error(
228 "Cannot read from a callback channel. Call `connect()` without a "
229 "callback specified to use polling mode.");
231 throw std::runtime_error(
232 "Cannot read from a disconnected channel. `connect()` the channel "
233 "without a callback before calling `readAsync()`.");
234
235 assert(mode == Mode::Polling && "Channel must be in polling mode to read");
236 assert(pollingState && "Polling state should be initialized in polling mode");
237
238 return pollingState->readAsync();
239}
240
241//===----------------------------------------------------------------------===//
242// Window translation support
243//===----------------------------------------------------------------------===//
244
246 // Reject serial-encoded windows: any field with bulkCountWidth > 0 indicates
247 // serial (bulk) encoding, which the translator does not yet support. Surface
248 // this clearly at connect()-time rather than silently buffering forever in
249 // translateIncoming().
250 for (const auto &frame : windowType->getFrames()) {
251 for (const auto &field : frame.fields) {
252 if (field.bulkCountWidth > 0)
253 throw std::runtime_error(
254 "Window translation for serial (bulk) list encoding is not "
255 "implemented (window '" +
256 windowType->getName() + "', frame '" + frame.name + "', field '" +
257 field.name +
258 "'). Connect the port with ConnectOptions{translateMessage=false} "
259 "and decode the frames manually.");
260 }
261 }
262}
263
265 const Type *intoType = windowType->getIntoType();
266
267 const StructType *intoStruct = dynamic_cast<const StructType *>(intoType);
268 if (!intoStruct)
269 throw std::runtime_error(
270 "Window intoType must be a struct for translation");
271
272 const auto &intoFields = intoStruct->getFields();
273
274 // Build a map from field name to (offset, type, isListField).
275 // For list fields, the offset is where the list_length field will be stored,
276 // and we also track the element type.
277 struct FieldInfo {
278 size_t offset;
279 const Type *type;
280 bool isList;
281 const Type *listElementType; // Only valid if isList is true
282 size_t listElementSize; // Only valid if isList is true
283 };
284 std::map<std::string, FieldInfo> fieldMap;
285 size_t currentOffset = 0;
286 hasListField = false;
287
288 auto processField = [&](const std::string &name, const Type *fieldType) {
289 auto *listType = dynamic_cast<const ListType *>(fieldType);
290 if (listType) {
291 hasListField = true;
292 // For list fields in the intoType:
293 // - We store a list_length (size_t / 8 bytes) followed by the list data
294 // - The offset here is where the list_length goes
295 const Type *elemType = listType->getElementType();
296 std::ptrdiff_t elemBits = elemType->getBitWidth();
297 if (elemBits < 0)
298 throw std::runtime_error(
299 "Cannot translate list with dynamically-sized element type: " +
300 name);
301 if (elemBits == 0)
302 throw std::runtime_error(
303 "Cannot translate list with zero-width element type: " + name);
304 if (elemBits % 8 != 0)
305 throw std::runtime_error(
306 "Cannot translate list element with non-byte-aligned size: " +
307 name);
308 size_t elemBytes = (static_cast<size_t>(elemBits) + 7) / 8;
309 fieldMap[name] = {currentOffset, fieldType, true, elemType, elemBytes};
310 // Reserve space for list_length. We use size_t (8 bytes on 64-bit
311 // platforms) for consistency with standard C/C++ container sizes.
312 // Note: This means the translated message format is platform-dependent.
313 currentOffset += sizeof(size_t);
314 // List data will be appended dynamically after the fixed header
315 } else {
316 std::ptrdiff_t fieldBits = fieldType->getBitWidth();
317 if (fieldBits < 0)
318 throw std::runtime_error("Cannot translate field with dynamic size: " +
319 name);
320 if (fieldBits % 8 != 0)
321 throw std::runtime_error(
322 "Cannot translate field with non-byte-aligned size: " + name);
323 size_t fieldBytes = (static_cast<size_t>(fieldBits) + 7) / 8;
324 fieldMap[name] = {currentOffset, fieldType, false, nullptr, 0};
325 currentOffset += fieldBytes;
326 }
327 };
328
329 if (intoStruct->isReverse()) {
330 for (auto it = intoFields.rbegin(); it != intoFields.rend(); ++it)
331 processField(it->first, it->second);
332 } else {
333 for (const auto &[name, fieldType] : intoFields)
334 processField(name, fieldType);
335 }
336
337 // intoTypeBytes is now the size of the fixed header portion
338 // (for types with lists, this excludes the variable-length list data)
339 intoTypeBytes = currentOffset;
340
341 const auto &windowFrames = windowType->getFrames();
342 frames.clear();
343 frames.reserve(windowFrames.size());
344
345 for (const auto &frame : windowFrames) {
346 FrameInfo frameInfo;
347 size_t frameOffset = 0;
348
349 // Calculate frame layout in SV memory order.
350 // SV structs are laid out with the last declared field at the lowest
351 // address. So when we iterate fields in reverse order (rbegin to rend),
352 // we get the memory layout order.
353 //
354 // For list fields, the lowered struct in CIRCT adds fields in this order:
355 // 1. list_data[numItems] - array of list elements
356 // 2. list_size - (if numItems > 1) count of valid items
357 // 3. last - indicates end of list
358 //
359 // In SV memory layout (reversed), this becomes:
360 // offset 0: last (1 byte)
361 // offset 1: list_size (if present)
362 // offset after size: list_data[numItems]
363 // offset after list: header fields...
364 struct FrameFieldLayout {
365 std::string name;
366 size_t frameOffset;
367 size_t size;
368 bool isList;
369 size_t numItems; // From window spec
370 size_t listElementSize; // Size of each list element
371 size_t bufferOffset; // Offset in translation buffer
372 const Type *listElementType; // Element type for list fields
373 };
374 std::vector<FrameFieldLayout> fieldLayouts;
375
376 for (auto fieldIt = frame.fields.rbegin(); fieldIt != frame.fields.rend();
377 ++fieldIt) {
378 const WindowType::Field &field = *fieldIt;
379 auto it = fieldMap.find(field.name);
380 if (it == fieldMap.end())
381 throw std::runtime_error("Frame field '" + field.name +
382 "' not found in intoType");
383
384 const FieldInfo &fieldInfo = it->second;
385 FrameFieldLayout layout;
386 layout.name = field.name;
387 layout.bufferOffset = fieldInfo.offset;
388 layout.isList = fieldInfo.isList;
389 layout.numItems = field.numItems;
390 layout.listElementType = fieldInfo.listElementType;
391 layout.listElementSize = fieldInfo.listElementSize;
392
393 if (fieldInfo.isList) {
394 // For list fields, the frame layout is (in memory order):
395 // 1. 'last' field (1 byte)
396 // 2. list_data (one element per frame)
397 // Note: numItems > 1 is not yet supported.
398 size_t numItems = field.numItems > 0 ? field.numItems : 1;
399 if (numItems != 1)
400 throw std::runtime_error(
401 "List translation with numItems > 1 is not yet supported. "
402 "Field '" +
403 field.name + "' has numItems=" + std::to_string(numItems));
404
405 // 'last' field comes first in memory
406 size_t lastOffset = frameOffset;
407 frameOffset += 1;
408
409 // List data comes after 'last'
410 layout.frameOffset = frameOffset;
411 layout.size = fieldInfo.listElementSize;
412 fieldLayouts.push_back(layout);
413 frameOffset += layout.size;
414
415 // Create ListFieldInfo for this list field
416 ListFieldInfo listInfo;
417 listInfo.fieldName = layout.name;
418 listInfo.dataOffset = layout.frameOffset;
419 listInfo.elementSize = fieldInfo.listElementSize;
420 listInfo.listLengthBufferOffset = layout.bufferOffset;
421 listInfo.listDataBufferOffset = intoTypeBytes;
422 listInfo.lastFieldOffset = lastOffset;
423 frameInfo.listField = listInfo;
424 } else {
425 // Regular (non-list) field
426 std::ptrdiff_t fieldBits = fieldInfo.type->getBitWidth();
427 layout.frameOffset = frameOffset;
428 layout.size = (static_cast<size_t>(fieldBits) + 7) / 8;
429 fieldLayouts.push_back(layout);
430 frameOffset += layout.size;
431 }
432 }
433
434 frameInfo.expectedSize = frameOffset;
435
436 // Second pass: build copy ops for non-list fields
437 for (const auto &layout : fieldLayouts) {
438 if (!layout.isList) {
439 // Regular field - add copy op
440 frameInfo.copyOps.push_back(
441 {layout.frameOffset, layout.bufferOffset, layout.size});
442 }
443 // List fields were already handled in the first pass
444 }
445
446 // Sort copy ops by frameOffset to ensure processing order matches frame
447 // layout.
448 std::sort(frameInfo.copyOps.begin(), frameInfo.copyOps.end(),
449 [](const CopyOp &a, const CopyOp &b) {
450 return a.frameOffset < b.frameOffset;
451 });
452
453 // Merge adjacent copy ops
454 if (!frameInfo.copyOps.empty()) {
455 std::vector<CopyOp> mergedOps;
456 mergedOps.push_back(frameInfo.copyOps[0]);
457
458 for (size_t i = 1; i < frameInfo.copyOps.size(); ++i) {
459 CopyOp &last = mergedOps.back();
460 CopyOp current = frameInfo.copyOps[i];
461
462 if (last.frameOffset + last.size == current.frameOffset &&
463 last.bufferOffset + last.size == current.bufferOffset) {
464 last.size += current.size;
465 } else {
466 mergedOps.push_back(current);
467 }
468 }
469
470 frameInfo.copyOps = std::move(mergedOps);
471 }
472
473 frames.push_back(std::move(frameInfo));
474 }
475
476 // Set frameBytes from the lowered type's bit width. Fall back to the
477 // maximum computed frame expectedSize if the type system can't report width
478 // (e.g. opaque union types).
479 std::ptrdiff_t loweredBits = windowType->getLoweredType()->getBitWidth();
480 if (loweredBits > 0) {
481 frameBytes = (loweredBits + 7) / 8;
482 } else {
483 frameBytes = 0;
484 for (const auto &f : frames)
485 frameBytes = std::max(frameBytes, f.expectedSize);
486 }
487}
488
491 "Translation type must be set for window translation.");
492
493 // Get the frame data directly.
494 const uint8_t *frameData = data.getBytes();
495 size_t frameDataSize = data.size();
496
497 // Get the frame metadata for the current expected frame.
498 const auto &frameInfo = translationInfo->frames[nextFrameIndex];
499
500 // Check size
501 if (frameDataSize < frameInfo.expectedSize)
502 throw std::runtime_error("Frame data too small: expected at least " +
503 std::to_string(frameInfo.expectedSize) +
504 " bytes, got " + std::to_string(frameDataSize) +
505 " bytes");
506
507 // Check if this is the first frame of a new message.
508 // For list frames, we use accumulatingListData to track whether we're
509 // continuing to accumulate list items.
510 bool isFirstFrame = (nextFrameIndex == 0) && !accumulatingListData;
511
512 if (isFirstFrame) {
513 // Initialize the translation buffer to hold the fixed header portion.
514 translationBuffer.resize(translationInfo->intoTypeBytes, 0);
515 // Clear list data buffer for types with lists
516 if (translationInfo->hasListField)
517 listDataBuffer.clear();
518 }
519
520 // Execute copy ops for non-list fields
521 for (const auto &op : frameInfo.copyOps)
522 std::memcpy(translationBuffer.data() + op.bufferOffset,
523 frameData + op.frameOffset, op.size);
524
525 // Handle list field if present in this frame
526 if (frameInfo.listField.has_value()) {
527 const auto &listInfo = frameInfo.listField.value();
528
529 // With numItems == 1, each frame contains exactly one list element
530 size_t bytesToCopy = listInfo.elementSize;
531 // Additional check: dataOffset must not be beyond frameDataSize
532 if (listInfo.dataOffset > frameDataSize)
533 throw std::runtime_error("List data offset is beyond frame bounds");
534 // Bounds check to prevent buffer overflow from corrupted _size field
535 if (listInfo.dataOffset + bytesToCopy > frameDataSize)
536 throw std::runtime_error("List data extends beyond frame bounds");
537 size_t oldSize = listDataBuffer.size();
538 listDataBuffer.resize(oldSize + bytesToCopy);
539 std::memcpy(listDataBuffer.data() + oldSize,
540 frameData + listInfo.dataOffset, bytesToCopy);
541
542 // Check if this is the last frame of the list
543 uint8_t lastFlag = frameData[listInfo.lastFieldOffset];
544 if (lastFlag) {
545 // List is complete. Build the final message:
546 // [fixed header with list_length (size_t)][list data...]
547 // Write the actual length to the listLengthBufferOffset position.
548 size_t listLength = listDataBuffer.size() / listInfo.elementSize;
549 size_t *listLengthPtr = reinterpret_cast<size_t *>(
550 translationBuffer.data() + listInfo.listLengthBufferOffset);
551 *listLengthPtr = listLength;
552
553 // Append list data to translation buffer
554 size_t headerSize = translationBuffer.size();
555 translationBuffer.resize(headerSize + listDataBuffer.size());
556 std::memcpy(translationBuffer.data() + headerSize, listDataBuffer.data(),
557 listDataBuffer.size());
558
559 // Reset for next message
560 nextFrameIndex = 0;
561 listDataBuffer.clear();
562 accumulatingListData = false;
563 return true;
564 }
565
566 // Not the last frame - stay on this frame index for list accumulation
567 // (list frames repeat until last=true)
568 accumulatingListData = true;
569 return false;
570 }
571
572 // No list field in this frame - advance to next frame
573 nextFrameIndex++;
574 size_t numFrames = translationInfo->frames.size();
575
576 // Check if all frames have been received.
577 if (nextFrameIndex >= numFrames) {
578 // Reset for the next message.
579 nextFrameIndex = 0;
580 return true;
581 }
582
583 return false;
584}
585
588 "Translation type must be set for window translation.");
589
590 const uint8_t *srcData = data.getBytes();
591 size_t srcDataSize = data.size();
592
593 if (srcDataSize < translationInfo->intoTypeBytes)
594 throw std::runtime_error("Source data too small: expected at least " +
595 std::to_string(translationInfo->intoTypeBytes) +
596 " bytes, got " + std::to_string(srcDataSize) +
597 " bytes");
598
599 // Check if we have list fields
600 if (!translationInfo->hasListField) {
601 // No list fields - simple fixed-size translation
602 for (const auto &frameInfo : translationInfo->frames) {
603 std::vector<uint8_t> frameBuffer(frameInfo.expectedSize, 0);
604 for (const auto &op : frameInfo.copyOps)
605 std::memcpy(frameBuffer.data() + op.frameOffset,
606 srcData + op.bufferOffset, op.size);
607 translationBuffer.emplace_back(std::move(frameBuffer));
608 }
609 return;
610 }
611
612 // Handle list fields - need to split list data into multiple frames
613 for (const auto &frameInfo : translationInfo->frames) {
614 if (!frameInfo.listField.has_value()) {
615 // Non-list frame - copy as normal
616 std::vector<uint8_t> frameBuffer(frameInfo.expectedSize, 0);
617 for (const auto &op : frameInfo.copyOps)
618 std::memcpy(frameBuffer.data() + op.frameOffset,
619 srcData + op.bufferOffset, op.size);
620 translationBuffer.emplace_back(std::move(frameBuffer));
621 } else {
622 // List frame - need to generate multiple frames
623 const auto &listInfo = frameInfo.listField.value();
624
625 // Read the list length from the source data
626 size_t listLength = 0;
627 std::memcpy(&listLength, srcData + listInfo.listLengthBufferOffset,
628 sizeof(size_t));
629
630 // Check that the buffer is large enough for the list data
631 if (translationInfo->intoTypeBytes + (listLength * listInfo.elementSize) >
632 srcDataSize) {
633 throw std::runtime_error(
634 "Source buffer too small for list data: possible corrupted or "
635 "inconsistent list length field");
636 }
637 // Get pointer to list data (after the fixed header)
638 const uint8_t *listData = srcData + translationInfo->intoTypeBytes;
639
640 // Generate frames for the list
641 size_t itemsRemaining = listLength;
642 size_t listDataOffset = 0;
643
644 // Handle empty list case - still need to send one frame with last=true
645 if (listLength == 0)
646 throw std::runtime_error(
647 "Cannot send empty lists - parallel ESI list encoding requires at "
648 "least one frame to be sent, and each frame must contain at least "
649 "one element.");
650
651 while (itemsRemaining > 0) {
652 std::vector<uint8_t> frameBuffer(frameInfo.expectedSize, 0);
653
654 // Copy non-list fields (header data) to each frame
655 for (const auto &op : frameInfo.copyOps)
656 std::memcpy(frameBuffer.data() + op.frameOffset,
657 srcData + op.bufferOffset, op.size);
658
659 // With numItems == 1, each frame contains exactly one list element
660 size_t bytesInThisFrame = listInfo.elementSize;
661
662 // Copy list data
663 std::memcpy(frameBuffer.data() + listInfo.dataOffset,
664 listData + listDataOffset, bytesInThisFrame);
665
666 // Update remaining count
667 itemsRemaining -= 1;
668 listDataOffset += bytesInThisFrame;
669
670 // Set last field
671 frameBuffer[listInfo.lastFieldOffset] = (itemsRemaining == 0) ? 1 : 0;
672
673 translationBuffer.emplace_back(std::move(frameBuffer));
674 }
675 }
676 }
677}
678
679std::vector<MessageData>
681 size_t frameBytes = getFrameSizeBytes();
682 assert(frameBytes > 0 && "Frame size must be greater than 0");
683 if (data.getSize() % frameBytes != 0)
684 throw std::runtime_error(
685 "write message size (" + std::to_string(data.getSize()) +
686 ") is not a multiple of frame size (" + std::to_string(frameBytes) +
687 ") on type " + type->toString());
688 std::vector<MessageData> frames;
689 size_t numFrames = data.getSize() / frameBytes;
690 const uint8_t *ptr = data.getBytes();
691 for (size_t i = 0; i < numFrames; ++i) {
692 frames.emplace_back(ptr, frameBytes);
693 ptr += frameBytes;
694 }
695 return frames;
696}
assert(baseType &&"element must be base type")
PortMap channels
Definition Ports.h:655
ReadChannelPort & getRawRead(const std::string &name) const
Definition Ports.cpp:52
WriteChannelPort & getRawWrite(const std::string &name) const
Get access to the raw byte streams of a channel.
Definition Ports.cpp:42
BundlePort(AppID id, const BundleType *type, PortMap channels)
Construct a port.
Definition Ports.cpp:39
Bundles represent a collection of channels.
Definition Types.h:104
const Type * type
Definition Ports.h:229
size_t getFrameSizeBytes() const
Get the size of each frame in bytes.
Definition Ports.h:214
virtual void connectImpl(const ConnectOptions &options)
Called by all connect methods to let backends initiate the underlying connections.
Definition Ports.h:304
ChannelPort(const Type *type)
Definition Ports.cpp:26
std::unique_ptr< TranslationInfo > translationInfo
Definition Ports.h:296
Channels are the basic communication primitives.
Definition Types.h:125
Lists represent variable-length sequences of elements of a single type.
Definition Types.h:386
A concrete flat message backed by a single vector of bytes.
Definition Common.h:155
MessageData toMessageData() const override
Flatten all segments into a standard MessageData.
Definition Common.h:213
A ChannelPort which reads data from the accelerator.
Definition Ports.h:453
volatile Mode mode
Definition Ports.h:550
std::mutex callbackMutex
Synchronizes callback revocation during disconnect.
Definition Ports.h:578
virtual void connect(ReadCallback callback, const ConnectOptions &options={})
Definition Ports.cpp:140
virtual std::future< MessageData > readAsync()
Asynchronous polling read.
Definition Ports.cpp:225
size_t nextFrameIndex
Index of the next expected frame (for multi-frame windows).
Definition Ports.h:559
std::vector< uint8_t > translationBuffer
Window translation support.
Definition Ports.h:557
bool accumulatingListData
Flag to track whether we're in the middle of accumulating list data.
Definition Ports.h:563
void resetTranslationState()
Reset translation state buffers and indices.
Definition Ports.cpp:62
std::optional< detail::PollingBuffer< MessageData > > pollingState
Definition Ports.h:575
bool translateIncoming(MessageData &data)
Translate incoming data if the port type is a window type.
Definition Ports.cpp:489
virtual void disconnect() override
Disconnect the channel.
Definition Ports.cpp:70
size_t activeCallbacks
Definition Ports.h:580
std::unique_ptr< SegmentedMessageData > translatedMessage
If a translated message has been assembled but not yet consumed, retain ownership here so retries pre...
Definition Ports.h:554
uint64_t maxDataQueueMsgs
Definition Ports.h:574
std::function< bool(MessageData)> FlatReadCallback
Compatibility callback API for callers which want flattened message bytes instead of the owning segme...
Definition Ports.h:466
ReadCallback callback
Backends should not call this directly.
Definition Ports.h:545
std::vector< uint8_t > listDataBuffer
For list fields: accumulated list data across frames.
Definition Ports.h:561
bool invokeCallback(std::unique_ptr< SegmentedMessageData > &msg)
Invoke the currently registered callback.
Definition Ports.cpp:87
std::function< bool(std::unique_ptr< SegmentedMessageData > &)> ReadCallback
Primary callback API for raw reads.
Definition Ports.h:462
std::condition_variable callbackCv
Definition Ports.h:579
Structs are an ordered collection of fields, each with a name and a type.
Definition Types.h:246
bool isReverse() const
Definition Types.h:275
const FieldVector & getFields() const
Definition Types.h:254
Root class of the ESI type system.
Definition Types.h:36
std::string toString(bool oneLine=false) const
Definition Types.cpp:120
virtual std::ptrdiff_t getBitWidth() const
Definition Types.h:43
Windows represent a fixed-size sliding window over a stream of data.
Definition Types.h:316
const std::vector< Frame > & getFrames() const
Definition Types.h:340
const std::string & getName() const
Definition Types.h:337
A ChannelPort which sends data to the accelerator.
Definition Ports.h:308
void translateOutgoing(const MessageData &data)
Translate outgoing data if the port type is a window type.
Definition Ports.cpp:586
std::vector< MessageData > getMessageFrames(const MessageData &data)
Break a message into its frames.
Definition Ports.cpp:680
Definition esi.py:1
std::map< std::string, ChannelPort & > PortMap
Definition Ports.h:33
std::optional< unsigned > bufferSize
The buffer size is optional and should be considered merely a hint.
Definition Ports.h:124
bool translateMessage
If the type of this port is a window, translate the incoming/outgoing data into its underlying ('into...
Definition Ports.h:183
A copy operation for translating between frame data and the translation.
Definition Ports.h:248
size_t bufferOffset
Offset in the translation buffer.
Definition Ports.h:252
size_t frameOffset
Offset in the incoming/outgoing frame data.
Definition Ports.h:250
size_t size
Number of bytes to copy.
Definition Ports.h:254
Information about each frame in the windowed type.
Definition Ports.h:276
std::vector< CopyOp > copyOps
Precomputed copy operations for translating this frame (non-list fields).
Definition Ports.h:281
std::optional< ListFieldInfo > listField
Information about list fields in this frame (parallel encoding).
Definition Ports.h:284
size_t expectedSize
The total size of a frame in bytes.
Definition Ports.h:278
Information about a list field within a frame (for parallel encoding).
Definition Ports.h:260
size_t listLengthBufferOffset
Offset in the translation buffer where list length is stored.
Definition Ports.h:270
size_t dataOffset
Offset of the list data array in the frame.
Definition Ports.h:264
std::string fieldName
Name of the list field.
Definition Ports.h:262
size_t lastFieldOffset
Offset of the 'last' field in the frame.
Definition Ports.h:268
size_t elementSize
Size of each list element in bytes.
Definition Ports.h:266
size_t listDataBufferOffset
Offset in the translation buffer where list data starts.
Definition Ports.h:272
void precomputeFrameInfo()
Precompute and optimize the copy operations for translating frames.
Definition Ports.cpp:264
void requireTranslationSupported() const
Throw if this window cannot be translated by the runtime translator (e.g.
Definition Ports.cpp:245
const WindowType * windowType
The window type being translated.
Definition Ports.h:244
Field information describing a field within a frame.
Definition Types.h:319
std::string name
Definition Types.h:320