CIRCT 23.0.0git
Loading...
Searching...
No Matches
TypedPorts.h
Go to the documentation of this file.
1//===- TypedPorts.h - Strongly-typed ESI port wrappers ----------*- C++ -*-===//
2//
3// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4// See https://llvm.org/LICENSE.txt for license information.
5// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6//
7//===----------------------------------------------------------------------===//
8//
9// DO NOT EDIT!
10// This file is distributed as part of an ESI package. The source for this file
11// should always be modified within CIRCT.
12//
13//===----------------------------------------------------------------------===//
14//
15// Thin, non-owning wrappers around WriteChannelPort / ReadChannelPort that
16// verify type compatibility at connect() time and provide strongly-typed
17// write/read APIs. Purely additive — no changes to the untyped port classes.
18//
19//===----------------------------------------------------------------------===//
20
21// NOLINTNEXTLINE(llvm-header-guard)
22#ifndef ESI_TYPED_PORTS_H
23#define ESI_TYPED_PORTS_H
24
25#include "esi/Design.h"
26#include "esi/Ports.h"
27#include "esi/Services.h"
28#include "esi/Types.h"
29
30#include <algorithm>
31#include <array>
32#include <cstdint>
33#include <cstring>
34#include <functional>
35#include <future>
36#include <map>
37#include <memory>
38#include <mutex>
39#include <optional>
40#include <queue>
41#include <stdexcept>
42#include <string>
43#include <string_view>
44#include <type_traits>
45#include <typeinfo>
46
47namespace esi {
48
49//===----------------------------------------------------------------------===//
50// AcceleratorMismatchError — thrown for type mismatches and port-not-found.
51//===----------------------------------------------------------------------===//
52
53class AcceleratorMismatchError : public std::runtime_error {
54public:
55 using std::runtime_error::runtime_error;
56};
57
58//===----------------------------------------------------------------------===//
59// Helpers: unwrap TypeAliasType and width-aware serialization.
60//===----------------------------------------------------------------------===//
61
62/// Unwrap TypeAliasType (possibly recursively) to get the underlying type.
63inline const Type *unwrapTypeAlias(const Type *t) {
64 while (auto *alias = dynamic_cast<const TypeAliasType *>(t))
65 t = alias->getInnerType();
66 return t;
67}
68
69/// Compute the wire byte count for a port type. Returns 0 if not a
70/// BitVectorType (meaning sizeof(T) should be used instead).
71/// Wire format info for a port type, cached at connect() time.
72struct WireInfo {
73 size_t bytes = 0; // (bitWidth+7)/8, or 0 if not a BitVectorType.
74 size_t bitWidth = 0;
75};
76
77inline WireInfo getWireInfo(const Type *portType) {
78 const Type *inner = unwrapTypeAlias(portType);
79 if (auto *bv = dynamic_cast<const BitVectorType *>(inner))
80 return {(bv->getWidth() + 7) / 8, bv->getWidth()};
81 return {};
82}
83
84/// Pack a C++ integral value into a MessageData with the given wire byte count.
85/// If wi.bytes is 0, falls back to sizeof(T).
86template <typename T>
88 if constexpr (std::is_integral_v<T> && !std::is_same_v<T, bool>) {
89 if (wi.bytes > 0 && wi.bytes != sizeof(T)) {
90 std::vector<uint8_t> buf(wi.bytes, 0);
91 std::memcpy(buf.data(), &data, std::min(wi.bytes, sizeof(T)));
92 return MessageData(std::move(buf));
93 }
94 } else if constexpr (std::is_base_of_v<SegmentedMessageData, T>) {
95 return data.toMessageData();
96 }
97 return MessageData(reinterpret_cast<const uint8_t *>(&data), sizeof(T));
98}
99
100template <typename T>
101MessageData toMessageData(const T &data, const Type *portType) {
102 return toMessageData(data, getWireInfo(portType));
103}
104
105/// Unpack a MessageData into a C++ integral value with the given wire info.
106/// If the wire size differs from sizeof(T), copies available bytes into
107/// a zero-initialized value and sign-extends for signed types using the
108/// actual bit width to locate the sign bit.
109template <typename T>
111 if constexpr (std::is_integral_v<T> && !std::is_same_v<T, bool>) {
112 if (wi.bytes > 0 && msg.getSize() == wi.bytes &&
113 wi.bitWidth < sizeof(T) * 8) {
114 // Copy wire bytes into a zero-initialized value.
115 T val = 0;
116 std::memcpy(&val, msg.getBytes(), std::min(wi.bytes, sizeof(T)));
117 // Sign-extend for signed types if the sign bit is set.
118 if constexpr (std::is_signed_v<T>) {
119 size_t signBit = wi.bitWidth - 1;
120 size_t signByte = signBit / 8;
121 uint8_t signMask = uint8_t(1) << (signBit % 8);
122 if (signByte < wi.bytes && (msg.getBytes()[signByte] & signMask)) {
123 // Set all bits above the sign bit to 1.
124 if (wi.bitWidth < sizeof(T) * 8)
125 val |= static_cast<T>(~T(0)) << wi.bitWidth;
126 }
127 }
128 return val;
129 }
130 }
131 return *msg.as<T>();
132}
133
134template <typename T>
135T fromMessageData(const MessageData &msg, const Type *portType) {
136 return fromMessageData<T>(msg, getWireInfo(portType));
137}
138
139namespace detail {
140
141/// Owning callback used by typed read deserializers.
142///
143/// Returning `false` means the callee did not accept the object and wants the
144/// exact same owned value retried later.
145template <typename T>
146using TypedReadOwnedCallback = std::function<bool(std::unique_ptr<T> &)>;
147
148template <typename T, typename = void>
149struct has_type_deserializer : std::false_type {};
150
151template <typename T>
152struct has_type_deserializer<T, std::void_t<typename T::TypeDeserializer>>
153 : std::true_type {};
154
155template <typename T>
157
158template <typename T>
160 MessageData &scratch) {
161 if (auto *flat = dynamic_cast<const MessageData *>(&msg))
162 return *flat;
163 scratch = msg.toMessageData();
164 return scratch;
165}
166
167/// Default deserializer for simple 1:1 typed reads.
168///
169/// This path converts one raw message into one typed value and forwards it to
170/// the typed callback. If the callback rejects the decoded object, preserve it
171/// until the raw message is retried so the same owned object is presented
172/// again.
173template <typename T>
175public:
177
180
181 bool push(std::unique_ptr<SegmentedMessageData> &msg) {
182 std::scoped_lock<std::mutex> lock(mutex);
183 if (!msg)
184 throw std::runtime_error("PODTypeDeserializer::push: null message");
185
186 // Flush the buffer, bail if still full.
187 pokeLocked();
188 if (pendingOutput)
189 return false;
190
191 // Translate the raw message into a typed object. This always consumes the
192 // raw message.
193 MessageData scratch;
194 const MessageData &flat = getMessageDataRef<T>(*msg, scratch);
195 pendingOutput = std::make_unique<T>(fromMessageData<T>(flat, wireInfo));
196 msg.reset();
197 pokeLocked();
198 return true;
199 }
200
201 bool poke() {
202 std::scoped_lock<std::mutex> lock(mutex);
203 return pokeLocked();
204 }
205
206private:
207 bool pokeLocked() {
209 pendingOutput.reset();
210 return true;
211 }
212 return false;
213 }
214
217 std::mutex mutex;
218 std::unique_ptr<T> pendingOutput;
219};
220
221template <typename T, typename = void>
225
226template <typename T>
227struct DeserializerSelector<T, std::void_t<typename T::TypeDeserializer>> {
228 using type = typename T::TypeDeserializer;
229};
230
231template <typename T>
233
234template <typename T>
236
237} // namespace detail
238
239/// Helper base class for stateful deserializers which may emit zero, one, or
240/// many typed outputs for each raw input message.
241///
242/// Derived classes implement `decode()` and must consume the raw input message
243/// before returning. This base class handles retrying blocked typed outputs and
244/// preserving them in FIFO order until the client accepts them.
245template <typename T>
247public:
249 using DecodedOutputs = std::vector<std::unique_ptr<T>>;
250
253
254 virtual ~QueuedDecodeTypeDeserializer() = default;
255
256 /// Push one raw message into the deserializer.
257 ///
258 /// Returns `false` only when previously decoded typed outputs are still
259 /// blocked on delivery. In that case `msg` is left untouched so the caller
260 /// can retry it later.
261 bool push(std::unique_ptr<SegmentedMessageData> &msg) {
262 std::scoped_lock<std::mutex> lock(mutex);
263 if (!msg)
264 throw std::runtime_error(
265 "QueuedDecodeTypeDeserializer::push: null message");
266
267 if (!pokeLocked())
268 return false;
269
270 DecodedOutputs decoded = decode(msg);
271 if (msg)
272 throw std::runtime_error(
273 "QueuedDecodeTypeDeserializer::push: decode must consume the "
274 "message");
275 for (std::unique_ptr<T> &value : decoded) {
276 if (!value)
277 throw std::runtime_error(
278 "QueuedDecodeTypeDeserializer::push: null decoded output");
279 pendingOutputs.push(std::move(value));
280 }
281
282 // Once decode() has consumed the raw message, keep any blocked typed
283 // outputs in the pending queue and report success. pokeLocked() still
284 // opportunistically drains what it can before returning.
285 pokeLocked();
286 return true;
287 }
288
289 /// Retry delivery of any typed outputs which were previously blocked by the
290 /// client callback.
291 bool poke() {
292 std::scoped_lock<std::mutex> lock(mutex);
293 return pokeLocked();
294 }
295
296protected:
297 /// Decode one raw message into zero or more typed outputs.
298 ///
299 /// Implementations must consume `msg` before returning, even when zero
300 /// outputs are produced.
301 virtual DecodedOutputs decode(std::unique_ptr<SegmentedMessageData> &msg) = 0;
302
303private:
304 bool pokeLocked() {
305 while (!pendingOutputs.empty()) {
306 std::unique_ptr<T> &value = pendingOutputs.front();
307 if (!value)
308 throw std::runtime_error(
309 "QueuedDecodeTypeDeserializer::poke: null pending output");
310 if (!output(value))
311 return false;
312 pendingOutputs.pop();
313 }
314 return true;
315 }
316
318 std::queue<std::unique_ptr<T>> pendingOutputs;
319 std::mutex mutex;
320};
321
322/// Reusable serial-list window deserializer.
323///
324/// Walks the serial-list multi-burst protocol used by codegen'd window
325/// helpers -- zero or more `header(N>0) + N data` bursts terminated by a
326/// `header(0)` footer -- and emits a fully-formed `T` instance built from the
327/// accumulated header fields and data frames.
328///
329/// `T` must be a generated window helper that exposes:
330/// - nested `header_frame` / `data_frame` types of identical size,
331/// - `count_type`,
332/// - `static count_type T::_headerCount(const header_frame &)`,
333/// - `static std::unique_ptr<T> T::_fromFrames(const header_frame &,
334/// std::vector<data_frame> &&)`.
335///
336/// `T` typically declares `SerialListTypeDeserializer<T>` as a friend so the
337/// template can reach the (private) `header_frame` definition; the helpers
338/// themselves can stay private as well.
339template <typename T>
341public:
343 using OutputCallback = typename Base::OutputCallback;
344 using DecodedOutputs = typename Base::DecodedOutputs;
345
348
349private:
350 using header_frame = typename T::header_frame;
351 using data_frame = typename T::data_frame;
352 using count_type = typename T::count_type;
353
354 static_assert(sizeof(header_frame) == sizeof(data_frame),
355 "header and data frames must be the same width");
356 static constexpr size_t kFrameSize = sizeof(data_frame);
357
358 DecodedOutputs decode(std::unique_ptr<SegmentedMessageData> &msg) override {
359 DecodedOutputs out;
360 MessageData scratch;
361 const MessageData &flat = detail::getMessageDataRef<T>(*msg, scratch);
362 const uint8_t *bytes = flat.getBytes();
363 size_t size = flat.getSize();
364
365 size_t offset = 0;
366 while (offset < size) {
367 size_t needed = kFrameSize - partial_.size();
368 size_t chunk = std::min(needed, size - offset);
369 partial_.insert(partial_.end(), bytes + offset, bytes + offset + chunk);
370 offset += chunk;
371 if (partial_.size() != kFrameSize)
372 break;
373
374 if (remaining_ == 0) {
375 // Header or footer frame. Decode into a local frame first so we can
376 // inspect the count without committing. Only the first header of a
377 // transaction is guaranteed to carry valid static fields; the static
378 // slots of continuation and footer headers may be garbage.
379 header_frame frame{};
380 std::memcpy(&frame, partial_.data(), kFrameSize);
381 partial_.clear();
382 count_type batchCount = T::_headerCount(frame);
383 if (batchCount == 0) {
384 // Footer: emit the accumulated value using the first header's
385 // static fields.
386 if (!pending_header_)
387 throw std::runtime_error(
388 "SerialListTypeDeserializer: footer received before any "
389 "header");
390 out.push_back(
391 T::_fromFrames(*pending_header_, std::move(pending_frames_)));
392 pending_frames_.clear();
393 pending_header_.reset();
394 continue;
395 }
396
397 if (!pending_header_)
398 pending_header_ = frame;
399 remaining_ = batchCount;
400 continue;
401 }
402
403 // Data frame.
404 auto &frame = pending_frames_.emplace_back();
405 std::memcpy(&frame, partial_.data(), kFrameSize);
406 partial_.clear();
407 --remaining_;
408 }
409
410 msg.reset();
411 return out;
412 }
413
414 std::vector<uint8_t> partial_;
415 std::optional<header_frame> pending_header_;
416 std::vector<data_frame> pending_frames_;
418};
419
420//===----------------------------------------------------------------------===//
421// Type-trait: detect T::_ESI_ID (a static constexpr std::string_view).
422//===----------------------------------------------------------------------===//
423
424template <typename T, typename = void>
425struct has_esi_id : std::false_type {};
426
427template <typename T>
428struct has_esi_id<T, std::void_t<decltype(T::_ESI_ID)>>
429 : std::is_convertible<decltype(T::_ESI_ID), std::string_view> {};
430
431template <typename T>
432inline constexpr bool has_esi_id_v = has_esi_id<T>::value;
433
434// Detect T::_ESI_WINDOW_ID (a static constexpr std::string_view) for
435// generated SegmentedMessageData subclasses bound to a specific WindowType.
436template <typename T, typename = void>
437struct has_esi_window_id : std::false_type {};
438
439template <typename T>
440struct has_esi_window_id<T, std::void_t<decltype(T::_ESI_WINDOW_ID)>>
441 : std::is_convertible<decltype(T::_ESI_WINDOW_ID), std::string_view> {};
442
443template <typename T>
445
446//===----------------------------------------------------------------------===//
447// verifyTypeCompatibility<T>(const Type *portType)
448//
449// Checks that the ESI runtime Type is compatible with the C++ type T.
450// Dispatch order: _ESI_ID → void → bool → std::array → signed int →
451// unsigned int → error.
452//===----------------------------------------------------------------------===//
453
454// Detect ``std::array<T, N>``.
455template <typename T>
456struct is_std_array : std::false_type {};
457template <typename T, std::size_t N>
458struct is_std_array<std::array<T, N>> : std::true_type {};
459template <typename T>
461
462template <typename T>
463void verifyTypeCompatibility(const Type *portType) {
464 if (!portType)
465 throw AcceleratorMismatchError("Port type is null");
466
467 // Unwrap TypeAliasType to get the inner type for verification.
468 portType = unwrapTypeAlias(portType);
469
470 // If the port is a windowed type, verify the window id (if T declares one)
471 // and then continue checking the inner ('into') type.
472 if (auto *windowType = dynamic_cast<const WindowType *>(portType)) {
473 if constexpr (has_esi_window_id_v<T>) {
474 if (std::string_view(windowType->getID()) != T::_ESI_WINDOW_ID)
476 "ESI window mismatch: C++ type has _ESI_WINDOW_ID '" +
477 std::string(T::_ESI_WINDOW_ID) + "' but port window type is '" +
478 windowType->toString(/*oneLine=*/true) + "'");
479 } else {
481 "ESI type mismatch: port is a window type ('" +
482 windowType->toString(/*oneLine=*/true) +
483 "') but C++ type has no _ESI_WINDOW_ID");
484 }
485 portType = unwrapTypeAlias(windowType->getIntoType());
486 } else if constexpr (has_esi_window_id_v<T>) {
488 std::string("ESI type mismatch: C++ type has _ESI_WINDOW_ID '") +
489 std::string(T::_ESI_WINDOW_ID) + "' but port type is not a window: '" +
490 portType->toString(/*oneLine=*/true) + "'");
491 }
492
493 if constexpr (has_esi_id_v<T>) {
494 // Highest priority: user-defined ESI ID string comparison.
495 if (std::string_view(portType->getID()) != T::_ESI_ID)
497 "ESI type mismatch: C++ type has _ESI_ID '" +
498 std::string(T::_ESI_ID) + "' but port type is '" +
499 portType->toString(/*oneLine=*/true) + "'");
500 } else if constexpr (std::is_void_v<T>) {
501 if (!dynamic_cast<const VoidType *>(portType))
502 throw AcceleratorMismatchError("ESI type mismatch: expected VoidType for "
503 "void, but port type is '" +
504 portType->toString(/*oneLine=*/true) +
505 "'");
506 } else if constexpr (std::is_same_v<T, bool>) {
507 // bool maps to signless i1, which is BitsType with width <= 1.
508 auto *bits = dynamic_cast<const BitsType *>(portType);
509 if (!bits || bits->getWidth() > 1)
511 "ESI type mismatch: expected BitsType with width <= 1 for "
512 "bool, but port type is '" +
513 portType->toString(/*oneLine=*/true) + "'");
514 } else if constexpr (is_std_array_v<T>) {
515 // std::array<U, N> maps to ESI ArrayType with matching size and element
516 // type. The ArrayType's `reverse` flag controls only wire-byte ordering
517 // during serialization (handled in toMessageData/fromMessageData) and
518 // does not affect compatibility.
519 using Element = typename T::value_type;
520 constexpr size_t N = std::tuple_size<T>::value;
521 auto *arr = dynamic_cast<const ArrayType *>(portType);
522 if (!arr)
524 "ESI type mismatch: expected ArrayType for std::array, but port "
525 "type is '" +
526 portType->toString(/*oneLine=*/true) + "'");
527 if (arr->getSize() != N)
528 throw AcceleratorMismatchError("ESI type mismatch: ArrayType size " +
529 std::to_string(arr->getSize()) +
530 " does not match std::array size " +
531 std::to_string(N));
532 verifyTypeCompatibility<Element>(arr->getElementType());
533 } else if constexpr (std::is_integral_v<T> && std::is_signed_v<T>) {
534 auto *sint = dynamic_cast<const SIntType *>(portType);
535 if (!sint)
537 "ESI type mismatch: expected SIntType for signed integer, "
538 "but port type is '" +
539 portType->toString(/*oneLine=*/true) + "'");
540 if (sint->getWidth() > sizeof(T) * 8)
542 "ESI type mismatch: SIntType width " +
543 std::to_string(sint->getWidth()) + " does not fit in " +
544 std::to_string(sizeof(T) * 8) + "-bit signed integer");
545 // Require closest-size match: reject if a smaller C++ type would suffice.
546 if (sizeof(T) > 1 && sint->getWidth() <= (sizeof(T) / 2) * 8)
547 throw AcceleratorMismatchError("ESI type mismatch: SIntType width " +
548 std::to_string(sint->getWidth()) +
549 " should use a smaller C++ type than " +
550 std::to_string(sizeof(T) * 8) + "-bit");
551 } else if constexpr (std::is_integral_v<T> && std::is_unsigned_v<T>) {
552 // Accept UIntType (uiM) or BitsType (iM, signless).
553 auto *uintPort = dynamic_cast<const UIntType *>(portType);
554 auto *bits = dynamic_cast<const BitsType *>(portType);
555 if (!uintPort && !bits)
557 "ESI type mismatch: expected UIntType or BitsType for unsigned "
558 "integer, but port type is '" +
559 portType->toString(/*oneLine=*/true) + "'");
560 uint64_t width = uintPort ? uintPort->getWidth() : bits->getWidth();
561 if (width > sizeof(T) * 8)
563 "ESI type mismatch: bit width " + std::to_string(width) +
564 " does not fit in " + std::to_string(sizeof(T) * 8) +
565 "-bit unsigned integer");
566 // Require closest-size match: reject if a smaller C++ type would suffice.
567 if (sizeof(T) > 1 && width <= (sizeof(T) / 2) * 8)
568 throw AcceleratorMismatchError("ESI type mismatch: bit width " +
569 std::to_string(width) +
570 " should use a smaller C++ type than " +
571 std::to_string(sizeof(T) * 8) + "-bit");
572 } else {
574 std::string("Cannot verify type compatibility for C++ type '") +
575 typeid(T).name() + "' against ESI port type '" +
576 portType->toString(/*oneLine=*/true) + "'");
577 }
578}
579
580// Port-aware overload: when checking against a ChannelPort, also reconcile
581// the windowed wrapper. ChannelPort::getType() returns the unwrapped 'into'
582// type for windowed ports, so we have to consult getWindowType() directly to
583// detect the window and forward the original WindowType into the Type-based
584// overload above.
585template <typename T>
587 if (!port)
588 throw AcceleratorMismatchError("Port is null");
589 const WindowType *windowType = port->getWindowType();
590 if constexpr (has_esi_window_id_v<T>) {
591 if (!windowType)
593 std::string("ESI type mismatch: C++ type has _ESI_WINDOW_ID '") +
594 std::string(T::_ESI_WINDOW_ID) +
595 "' but port is not a window type ('" +
596 (port->getType() ? port->getType()->toString(/*oneLine=*/true)
597 : std::string("<null>")) +
598 "')");
599 } else {
600 if (windowType)
602 "ESI type mismatch: port is a window type ('" +
603 windowType->toString(/*oneLine=*/true) +
604 "') but C++ type has no _ESI_WINDOW_ID");
605 }
606 const Type *forwardType =
607 windowType ? static_cast<const Type *>(windowType) : port->getType();
608 verifyTypeCompatibility<T>(forwardType);
609}
610
611//===----------------------------------------------------------------------===//
612// TypedWritePort<T, SkipTypeCheck = false>
613//
614// When SkipTypeCheck is false, the `connect` method runs the type check via
615// `verifyTypeCompatibility<T>()`. When SkipTypeCheck is true, `connect`
616// skips that verification.
617//===----------------------------------------------------------------------===//
618
619template <typename T, bool SkipTypeCheck = false>
621public:
622 explicit TypedWritePort(WriteChannelPort &port) : inner(&port) {}
623 // NOLINTNEXTLINE(google-explicit-constructor)
625
626 void connect(const ChannelPort::ConnectOptions &opts = {std::nullopt,
627 false}) {
628 if (!inner)
629 throw AcceleratorMismatchError("TypedWritePort: null port pointer");
630 if (!SkipTypeCheck)
631 verifyTypeCompatibility<T>(inner);
633 inner->connect(opts);
634 }
635
636 void write(const T &data) { inner->write(toMessageData(data, wireInfo_)); }
637
638 /// Write by taking ownership. If T is a SegmentedMessageData, this hands
639 /// the message directly to the port's segmented write path.
640 void write(std::unique_ptr<T> &data) {
641 if (!data)
642 throw std::runtime_error("TypedWritePort::write: null unique_ptr");
643 if constexpr (std::is_base_of_v<SegmentedMessageData, T>) {
644 inner->write(std::move(data));
645 } else {
646 write(*data);
647 data.reset();
648 }
649 }
650
651 bool tryWrite(const T &data) {
652 return inner->tryWrite(toMessageData(data, wireInfo_));
653 }
654
655 bool flush() { return inner->flush(); }
657 bool isConnected() const { return inner && inner->isConnected(); }
658
659 WriteChannelPort &raw() { return *inner; }
660 const WriteChannelPort &raw() const { return *inner; }
661
662private:
665};
666
667/// Specialization for void — write takes no data argument.
668template <>
669class TypedWritePort<void> {
670public:
671 explicit TypedWritePort(WriteChannelPort &port) : inner(&port) {}
672 // NOLINTNEXTLINE(google-explicit-constructor)
674
675 void connect(const ChannelPort::ConnectOptions &opts = {std::nullopt,
676 false}) {
677 if (!inner)
678 throw AcceleratorMismatchError("TypedWritePort: null port pointer");
679 verifyTypeCompatibility<void>(inner->getType());
680 inner->connect(opts);
681 }
682
683 void write() {
684 uint8_t zero = 0;
685 inner->write(MessageData(&zero, 1));
686 }
687
688 bool tryWrite() {
689 uint8_t zero = 0;
690 return inner->tryWrite(MessageData(&zero, 1));
691 }
692
693 bool flush() { return inner->flush(); }
695 bool isConnected() const { return inner && inner->isConnected(); }
696
697 WriteChannelPort &raw() { return *inner; }
698 const WriteChannelPort &raw() const { return *inner; }
699
700private:
702};
703
704//===----------------------------------------------------------------------===//
705// TypedReadPort<T>
706//===----------------------------------------------------------------------===//
707
708/// Strongly typed wrapper around a raw read channel.
709///
710/// For scalar/POD-like `T`, this performs a 1:1 conversion from raw messages.
711/// If `T` defines a nested `TypeDeserializer`, one instance is created per
712/// connection and drives both callback and polling reads through that
713/// deserializer.
714///
715/// Polling reads return `std::unique_ptr<T>` so complex decoded values can be
716/// delivered without an extra copy.
717template <typename T, bool SkipTypeCheck = false>
719public:
720 explicit TypedReadPort(ReadChannelPort &port) : inner(&port) {}
721 // NOLINTNEXTLINE(google-explicit-constructor)
723 TypedReadPort(const TypedReadPort &) = delete;
727
729 if (inner && mode != Mode::Disconnected)
730 disconnect();
731 }
732
733 /// Connect in polling mode.
734 ///
735 /// The port installs an internal typed output queue. `read()` and
736 /// `readAsync()` consume from that queue.
737 void connect(const ChannelPort::ConnectOptions &opts = {std::nullopt,
738 false}) {
739 if (!inner)
740 throw AcceleratorMismatchError("TypedReadPort: null port pointer");
743 emplaceDeserializer([this](std::unique_ptr<T> &value) -> bool {
744 return pollingState->enqueue(value);
745 });
746 try {
747 inner->connect(
748 [this](std::unique_ptr<SegmentedMessageData> &msg) -> bool {
749 assert(deserializer && "Deserializer should be connected");
750 return deserializer->push(msg);
751 },
752 opts);
753 } catch (...) {
755 throw;
756 }
758 }
759
760 /// Connect a non-owning typed callback.
761 ///
762 /// The callback sees the decoded value by reference. Returning `false`
763 /// requests that the same decoded value be retried later.
764 void connect(std::function<bool(const T &)> callback,
765 const ChannelPort::ConnectOptions &opts = {std::nullopt,
766 false}) {
767 connect([cb = std::move(callback)](
768 std::unique_ptr<T> &value) -> bool { return cb(*value); },
769 opts);
770 }
771
772 /// Connect an owning typed callback.
773 ///
774 /// This is the typed analogue of `ReadChannelPort::ReadCallback`: the
775 /// callback may take ownership of the decoded value or return `false` to
776 /// retry delivery later with the same object.
778 const ChannelPort::ConnectOptions &opts = {std::nullopt,
779 false}) {
780 if (!inner)
781 throw AcceleratorMismatchError("TypedReadPort: null port pointer");
783 emplaceDeserializer(std::move(callback));
784 try {
785 inner->connect(
786 [this](std::unique_ptr<SegmentedMessageData> &msg) -> bool {
787 assert(deserializer && "Deserializer should be connected");
788 return deserializer->push(msg);
789 },
790 opts);
791 } catch (...) {
793 throw;
794 }
795 // TODO: Hook callback-mode custom-deserializer poke() retries into the
796 // existing periodic poll/background-worker path.
798 }
799
800 /// Blocking typed read in polling mode.
801 std::unique_ptr<T> read() {
802 std::future<std::unique_ptr<T>> f = readAsync();
803 f.wait();
804 return f.get();
805 }
806
807 /// Asynchronous typed read in polling mode.
808 ///
809 /// The returned future yields ownership of the next decoded value.
810 std::future<std::unique_ptr<T>> readAsync() {
811 if (mode == Mode::Callback)
812 throw std::runtime_error(
813 "Cannot read from a callback channel. `connect()` without a "
814 "callback specified to use polling mode.");
816 throw std::runtime_error(
817 "Cannot read from a disconnected channel. `connect()` first.");
818
819 if (!pollingState)
820 throw std::runtime_error(
821 "Cannot read from a disconnected channel. `connect()` first.");
822
823 std::future<std::unique_ptr<T>> future = pollingState->readAsync();
824
825 if (deserializer)
826 deserializer->poke();
827 return future;
828 }
829
830 /// Set the maximum number of decoded typed values buffered in polling mode.
831 /// `0` means unbounded.
832 void setMaxDataQueueMsgs(uint64_t maxMsgs) {
833 maxDataQueueMsgs = maxMsgs;
834 if (pollingState)
835 pollingState->setMaxQueued(maxMsgs);
836 }
837
838 /// Disconnect the typed port and abandon any pending polling reads.
844 bool isConnected() const { return inner && inner->isConnected(); }
845
846 ReadChannelPort &raw() { return *inner; }
847 const ReadChannelPort &raw() const { return *inner; }
848
849private:
853
855 deserializer.reset();
856 pollingState.reset();
857 }
858
860 if constexpr (detail::has_type_deserializer_v<T>) {
861 deserializer.emplace(std::move(callback));
862 } else {
863 deserializer.emplace(std::move(callback), wireInfo_);
864 }
865 }
866
869 throw std::runtime_error("Channel already connected");
870 if constexpr (SkipTypeCheck) {
871 // Skip verification, but still compute wireInfo for the POD path so
872 // small / non-byte-aligned wire widths still encode correctly.
873 if constexpr (!detail::has_type_deserializer_v<T>)
875 } else if constexpr (has_esi_id_v<T>) {
876 verifyTypeCompatibility<T>(inner);
877 } else if constexpr (!detail::has_type_deserializer_v<T>) {
878 verifyTypeCompatibility<T>(inner);
880 }
881 }
882
887 std::optional<Deserializer> deserializer;
888 std::optional<PollingState> pollingState;
889};
890
891/// Specialization for void — read discards data and returns nothing.
892template <>
893class TypedReadPort<void> {
894public:
895 explicit TypedReadPort(ReadChannelPort &port) : inner(&port) {}
896 // NOLINTNEXTLINE(google-explicit-constructor)
898 TypedReadPort(const TypedReadPort &) = delete;
902
904 if (inner && inner->isConnected())
905 disconnect();
906 }
907
908 void connect(const ChannelPort::ConnectOptions &opts = {std::nullopt,
909 false}) {
910 if (!inner)
911 throw AcceleratorMismatchError("TypedReadPort: null port pointer");
912 verifyTypeCompatibility<void>(inner->getType());
913 inner->connect(opts);
914 }
915
916 void connect(std::function<bool()> callback,
917 const ChannelPort::ConnectOptions &opts = {std::nullopt,
918 false}) {
919 if (!inner)
920 throw AcceleratorMismatchError("TypedReadPort: null port pointer");
921 verifyTypeCompatibility<void>(inner->getType());
922 inner->connect(
923 [cb = std::move(callback)](MessageData) -> bool { return cb(); }, opts);
924 }
925
926 void read() {
927 MessageData outData;
928 inner->read(outData);
929 }
930
931 std::future<void> readAsync() {
932 auto innerFuture = inner->readAsync();
933 return std::async(
934 std::launch::deferred,
935 [f = std::move(innerFuture)]() mutable -> void { f.get(); });
936 }
937
939 bool isConnected() const { return inner && inner->isConnected(); }
940
941 ReadChannelPort &raw() { return *inner; }
942 const ReadChannelPort &raw() const { return *inner; }
943
944private:
946};
947
948//===----------------------------------------------------------------------===//
949// TypedFunction<ArgT, ResultT>
950//
951// Strongly-typed function-call wrapper built on top of TypedWritePort and
952// TypedReadPort. Implicitly constructible from `FuncService::Function *` (the
953// return type of `BundlePort::getAs<FuncService::Function>()`); construction
954// resolves the underlying raw "arg" / "result" channels but does not connect
955// them until `connect()` is called.
956//
957// We intentionally bypass `FuncService::Function::call`. That helper returns
958// the future from a single `result->readAsync()`, which only sees one raw
959// frame and would race when the response is multi-frame or when calls are
960// pipelined. Driving a `TypedReadPort<ResultT>` instead reuses its persistent
961// per-port deserializer (so partial frames / pending outputs survive between
962// calls) and its FIFO polling buffer (so pipelined `readAsync()` futures
963// hand back per-call decoded values in call order, even when consumers
964// `.get()` them out of order).
965//===----------------------------------------------------------------------===//
966
967namespace detail {
968
969/// Standard ConnectOptions for typed function ports: untranslated frames so
970/// the deserializer can see raw frame boundaries.
972 return ChannelPort::ConnectOptions(/*bufferSize=*/std::nullopt,
973 /*translateMessage=*/false);
974}
975
976/// Convert a `std::future<std::unique_ptr<T>>` (as returned by
977/// `TypedReadPort::readAsync()`) into a `std::future<T>`. Uses a deferred
978/// async so `.get()` blocks only when the caller actually waits for the
979/// value, preserving the per-call FIFO ordering of the underlying polling
980/// buffer. A null `unique_ptr` from a misbehaving deserializer is reported
981/// as a runtime error rather than dereferenced.
982template <typename T>
983std::future<T> awaitDecoded(std::future<std::unique_ptr<T>> inner) {
984 return std::async(std::launch::deferred,
985 [fut = std::move(inner)]() mutable -> T {
986 std::unique_ptr<T> v = fut.get();
987 if (!v)
988 throw std::runtime_error(
989 "TypedFunction: deserializer produced a null "
990 "value");
991 return std::move(*v);
992 });
993}
994
995/// Throw the standard "null Function pointer" error used by every
996/// TypedFunction specialization.
997[[noreturn]] inline void throwNullFunction() {
999 "TypedFunction: null Function pointer (getAs failed or wrong type)");
1000}
1001
1002/// Throw a clear "not connected" error from `call()` paths.
1003[[noreturn]] inline void throwNotConnected() {
1004 throw std::runtime_error("TypedFunction: must be 'connect'ed before "
1005 "calling");
1006}
1007
1008/// Throw a clear "already connected" error from `connect()` paths.
1009[[noreturn]] inline void throwAlreadyConnected() {
1010 throw std::runtime_error("TypedFunction is already connected");
1011}
1012
1013} // namespace detail
1014
1015template <typename ArgT, typename ResultT, bool SkipTypeCheck = false>
1017public:
1018 /// Implicit conversion from Function* (returned by getAs<>()).
1019 // NOLINTNEXTLINE(google-explicit-constructor)
1021 TypedFunction(const TypedFunction &) = delete;
1023
1024 void connect() {
1025 if (!inner)
1027 if (argPort)
1029 argPort.emplace(&inner->getRawWrite("arg"));
1030 resultPort.emplace(&inner->getRawRead("result"));
1032 argPort->connect(opts);
1033 resultPort->connect(opts);
1034 }
1035
1036 std::future<ResultT> call(const ArgT &arg) {
1037 if (!argPort)
1039 // Serialize the per-call write+readAsync pair so the polling buffer's
1040 // FIFO matches the call FIFO. Pipelined calls are still allowed -- the
1041 // shared deserializer drains responses in wire order and the FIFO
1042 // ensures call N's future resolves to call N's result, regardless of
1043 // the order in which callers `.get()` the futures.
1044 std::scoped_lock<std::mutex> lock(callMutex);
1045 argPort->write(arg);
1046 return detail::awaitDecoded<ResultT>(resultPort->readAsync());
1047 }
1048
1049 /// Emplace-style call: constructs `ArgT` in-place from the forwarded
1050 /// arguments and forwards to `call(const ArgT &)`. SFINAE-disabled for the
1051 /// single-`ArgT`-argument case so it does not shadow the lvalue overload.
1052 template <
1053 typename First, typename... Rest,
1054 typename = std::enable_if_t<
1055 std::is_constructible_v<ArgT, First, Rest...> &&
1056 (!std::is_same_v<std::decay_t<First>, ArgT> || sizeof...(Rest) != 0)>>
1057 std::future<ResultT> call(First &&first, Rest &&...rest) {
1058 return call(ArgT(std::forward<First>(first), std::forward<Rest>(rest)...));
1059 }
1060
1061 /// Function-call operator overloads: forward to `call()`.
1062 template <typename... Args>
1063 auto operator()(Args &&...args)
1064 -> decltype(this->call(std::forward<Args>(args)...)) {
1065 return call(std::forward<Args>(args)...);
1066 }
1067
1069 const services::FuncService::Function &raw() const { return *inner; }
1070
1071private:
1073 std::optional<TypedWritePort<ArgT, SkipTypeCheck>> argPort;
1074 std::optional<TypedReadPort<ResultT, SkipTypeCheck>> resultPort;
1075 std::mutex callMutex;
1076};
1077
1078/// Partial specialization: void argument, typed result.
1079template <typename ResultT, bool SkipTypeCheck>
1080class TypedFunction<void, ResultT, SkipTypeCheck> {
1081public:
1082 // NOLINTNEXTLINE(google-explicit-constructor)
1084 TypedFunction(const TypedFunction &) = delete;
1086
1087 void connect() {
1088 if (!inner)
1090 if (argPort)
1092 argPort.emplace(&inner->getRawWrite("arg"));
1093 resultPort.emplace(&inner->getRawRead("result"));
1095 argPort->connect(opts);
1096 resultPort->connect(opts);
1097 }
1098
1099 std::future<ResultT> call() {
1100 if (!argPort)
1102 std::scoped_lock<std::mutex> lock(callMutex);
1103 argPort->write();
1104 return detail::awaitDecoded<ResultT>(resultPort->readAsync());
1105 }
1106
1107 /// Function-call operator overload: forwards to `call()`.
1108 std::future<ResultT> operator()() { return call(); }
1109
1111 const services::FuncService::Function &raw() const { return *inner; }
1112
1113private:
1115 std::optional<TypedWritePort<void>> argPort;
1116 std::optional<TypedReadPort<ResultT, SkipTypeCheck>> resultPort;
1117 std::mutex callMutex;
1118};
1119
1120/// Partial specialization: typed argument, void result.
1121template <typename ArgT, bool SkipTypeCheck>
1122class TypedFunction<ArgT, void, SkipTypeCheck> {
1123public:
1124 // NOLINTNEXTLINE(google-explicit-constructor)
1126 TypedFunction(const TypedFunction &) = delete;
1128
1129 void connect() {
1130 if (!inner)
1132 if (argPort)
1134 argPort.emplace(&inner->getRawWrite("arg"));
1135 resultPort.emplace(&inner->getRawRead("result"));
1137 argPort->connect(opts);
1138 resultPort->connect(opts);
1139 }
1140
1141 std::future<void> call(const ArgT &arg) {
1142 if (!argPort)
1144 std::scoped_lock<std::mutex> lock(callMutex);
1145 argPort->write(arg);
1146 return resultPort->readAsync();
1147 }
1148
1149 /// Emplace-style call: constructs `ArgT` in-place from the forwarded
1150 /// arguments and forwards to `call(const ArgT &)`. SFINAE-disabled for the
1151 /// single-`ArgT`-argument case so it does not shadow the lvalue overload.
1152 template <
1153 typename First, typename... Rest,
1154 typename = std::enable_if_t<
1155 std::is_constructible_v<ArgT, First, Rest...> &&
1156 (!std::is_same_v<std::decay_t<First>, ArgT> || sizeof...(Rest) != 0)>>
1157 std::future<void> call(First &&first, Rest &&...rest) {
1158 return call(ArgT(std::forward<First>(first), std::forward<Rest>(rest)...));
1159 }
1160
1161 /// Function-call operator overloads: forward to `call()`.
1162 template <typename... Args>
1163 auto operator()(Args &&...args)
1164 -> decltype(this->call(std::forward<Args>(args)...)) {
1165 return call(std::forward<Args>(args)...);
1166 }
1167
1169 const services::FuncService::Function &raw() const { return *inner; }
1170
1171private:
1173 std::optional<TypedWritePort<ArgT, SkipTypeCheck>> argPort;
1174 std::optional<TypedReadPort<void>> resultPort;
1175 std::mutex callMutex;
1176};
1177
1178/// Full specialization: void argument, void result.
1179template <bool SkipTypeCheck>
1180class TypedFunction<void, void, SkipTypeCheck> {
1181public:
1182 // NOLINTNEXTLINE(google-explicit-constructor)
1184 TypedFunction(const TypedFunction &) = delete;
1186
1187 void connect() {
1188 if (!inner)
1190 if (argPort)
1192 argPort.emplace(&inner->getRawWrite("arg"));
1193 resultPort.emplace(&inner->getRawRead("result"));
1195 argPort->connect(opts);
1196 resultPort->connect(opts);
1197 }
1198
1199 std::future<void> call() {
1200 if (!argPort)
1202 std::scoped_lock<std::mutex> lock(callMutex);
1203 argPort->write();
1204 return resultPort->readAsync();
1205 }
1206
1207 /// Function-call operator overload: forwards to `call()`.
1208 std::future<void> operator()() { return call(); }
1209
1211 const services::FuncService::Function &raw() const { return *inner; }
1212
1213private:
1215 std::optional<TypedWritePort<void>> argPort;
1216 std::optional<TypedReadPort<void>> resultPort;
1217 std::mutex callMutex;
1218};
1219
1220//===----------------------------------------------------------------------===//
1221// TypedCallback<ArgT, ResultT>
1222//
1223// Strongly-typed accelerator-callback wrapper built on top of TypedReadPort
1224// and TypedWritePort. Implicitly constructible from `CallService::Callback *`
1225// (the return type of `BundlePort::getAs<CallService::Callback>()`);
1226// construction resolves the underlying raw "arg" / "result" channels but
1227// does not connect them until `connect()` is called.
1228//
1229// We bypass `CallService::Callback::connect` for the same reason we bypass
1230// `FuncService::Function::call`: typed args may span multiple raw frames and
1231// we need the persistent stateful deserializer that `TypedReadPort<ArgT>`
1232// already provides.
1233//
1234// Note: the `quick` parameter is kept for API compatibility but has no
1235// effect in this design. The typed user callback is always dispatched inline
1236// from the read-channel callback thread (matching the previous
1237// custom-deserializer behavior). If service-thread dispatch is needed in the
1238// future, that can be layered on top by using `TypedReadPort` polling mode
1239// plus a worker.
1240//
1241// Lifetime: the TypedReadPort callback installed on `connect()` captures
1242// `this`. The wrapper is non-copyable and non-movable, so its address is
1243// stable for its entire lifetime. Destruction of the embedded TypedReadPort
1244// synchronously revokes the callback and waits for any in-flight callback
1245// dispatch to complete (see ReadChannelPort::disconnect), so the captured
1246// `this` cannot dangle. Callers must ensure the TypedCallback outlives any
1247// expected callback dispatch -- using a temporary like
1248// `TypedCallback<...>(cb).connect(...)` is safe only because the destructor
1249// runs at end-of-full-expression and disconnects before returning.
1250//===----------------------------------------------------------------------===//
1251
1252namespace detail {
1253
1254/// Throw the standard "null Callback pointer" error used by every
1255/// TypedCallback specialization.
1256[[noreturn]] inline void throwNullCallback() {
1258 "TypedCallback: null Callback pointer (getAs failed or wrong type)");
1259}
1260
1261/// Throw a clear "already connected" error from TypedCallback `connect()`
1262/// paths.
1263[[noreturn]] inline void throwCallbackAlreadyConnected() {
1264 throw std::runtime_error("TypedCallback is already connected");
1265}
1266
1267} // namespace detail
1268
1269template <typename ArgT, typename ResultT, bool SkipTypeCheck = false>
1271public:
1272 // NOLINTNEXTLINE(google-explicit-constructor)
1274 TypedCallback(const TypedCallback &) = delete;
1276
1277 void connect(std::function<ResultT(const ArgT &)> callback,
1278 bool quick = false) {
1279 (void)quick; // See class header.
1280 if (!inner)
1282 if (argPort)
1284 argPort.emplace(&inner->getRawRead("arg"));
1285 resultPort.emplace(&inner->getRawWrite("result"));
1287 resultPort->connect(opts);
1288 userCallback = std::move(callback);
1289 // The TypedReadPort callback runs on the read-channel callback thread
1290 // for every decoded ArgT. We forward to the user's callback and then
1291 // serialize the result through the TypedWritePort.
1292 argPort->connect(
1293 [this](const ArgT &arg) -> bool {
1294 ResultT result = userCallback(arg);
1295 resultPort->write(result);
1296 return true;
1297 },
1298 opts);
1299 }
1300
1302 const services::CallService::Callback &raw() const { return *inner; }
1303
1304private:
1306 std::optional<TypedReadPort<ArgT, SkipTypeCheck>> argPort;
1307 std::optional<TypedWritePort<ResultT, SkipTypeCheck>> resultPort;
1308 std::function<ResultT(const ArgT &)> userCallback;
1309};
1310
1311/// Partial specialization: void argument, typed result.
1312template <typename ResultT, bool SkipTypeCheck>
1313class TypedCallback<void, ResultT, SkipTypeCheck> {
1314public:
1315 // NOLINTNEXTLINE(google-explicit-constructor)
1317 TypedCallback(const TypedCallback &) = delete;
1319
1320 void connect(std::function<ResultT()> callback, bool quick = false) {
1321 (void)quick;
1322 if (!inner)
1324 if (argPort)
1326 argPort.emplace(&inner->getRawRead("arg"));
1327 resultPort.emplace(&inner->getRawWrite("result"));
1329 resultPort->connect(opts);
1330 userCallback = std::move(callback);
1331 argPort->connect(
1332 [this]() -> bool {
1333 ResultT result = userCallback();
1334 resultPort->write(result);
1335 return true;
1336 },
1337 opts);
1338 }
1339
1341 const services::CallService::Callback &raw() const { return *inner; }
1342
1343private:
1345 std::optional<TypedReadPort<void>> argPort;
1346 std::optional<TypedWritePort<ResultT, SkipTypeCheck>> resultPort;
1347 std::function<ResultT()> userCallback;
1348};
1349
1350/// Partial specialization: typed argument, void result.
1351template <typename ArgT, bool SkipTypeCheck>
1352class TypedCallback<ArgT, void, SkipTypeCheck> {
1353public:
1354 // NOLINTNEXTLINE(google-explicit-constructor)
1356 TypedCallback(const TypedCallback &) = delete;
1358
1359 void connect(std::function<void(const ArgT &)> callback, bool quick = false) {
1360 (void)quick;
1361 if (!inner)
1363 if (argPort)
1365 argPort.emplace(&inner->getRawRead("arg"));
1366 resultPort.emplace(&inner->getRawWrite("result"));
1368 resultPort->connect(opts);
1369 userCallback = std::move(callback);
1370 argPort->connect(
1371 [this](const ArgT &arg) -> bool {
1372 userCallback(arg);
1373 resultPort->write();
1374 return true;
1375 },
1376 opts);
1377 }
1378
1380 const services::CallService::Callback &raw() const { return *inner; }
1381
1382private:
1384 std::optional<TypedReadPort<ArgT, SkipTypeCheck>> argPort;
1385 std::optional<TypedWritePort<void>> resultPort;
1386 std::function<void(const ArgT &)> userCallback;
1387};
1388
1389/// Full specialization: void argument, void result.
1390template <bool SkipTypeCheck>
1391class TypedCallback<void, void, SkipTypeCheck> {
1392public:
1393 // NOLINTNEXTLINE(google-explicit-constructor)
1395 TypedCallback(const TypedCallback &) = delete;
1397
1398 void connect(std::function<void()> callback, bool quick = false) {
1399 (void)quick;
1400 if (!inner)
1402 if (argPort)
1404 argPort.emplace(&inner->getRawRead("arg"));
1405 resultPort.emplace(&inner->getRawWrite("result"));
1407 resultPort->connect(opts);
1408 userCallback = std::move(callback);
1409 argPort->connect(
1410 [this]() -> bool {
1411 userCallback();
1412 resultPort->write();
1413 return true;
1414 },
1415 opts);
1416 }
1417
1419 const services::CallService::Callback &raw() const { return *inner; }
1420
1421private:
1423 std::optional<TypedReadPort<void>> argPort;
1424 std::optional<TypedWritePort<void>> resultPort;
1425 std::function<void()> userCallback;
1426};
1427
1428//===----------------------------------------------------------------------===//
1429// IndexedPorts<T> — wrapper over std::map<int, T>.
1430//
1431// Constructed once from a moved std::map (populated via `try_emplace` so T
1432// need not be movable or default-constructible). Supports both mutable and
1433// const iteration / indexing so callers can invoke non-const methods on the
1434// stored ports (e.g. `connect()` on a typed port wrapper).
1435//===----------------------------------------------------------------------===//
1436
1437template <typename T>
1439public:
1440 explicit IndexedPorts(std::map<int, T> &&ports) : ports_(std::move(ports)) {}
1443 IndexedPorts(const IndexedPorts &) = delete;
1445
1446 T &operator[](int idx) { return ports_.at(idx); }
1447 const T &operator[](int idx) const { return ports_.at(idx); }
1448 auto begin() { return ports_.begin(); }
1449 auto end() { return ports_.end(); }
1450 auto begin() const { return ports_.cbegin(); }
1451 auto end() const { return ports_.cend(); }
1452 size_t size() const { return ports_.size(); }
1453 bool contains(int idx) const { return ports_.count(idx) > 0; }
1454
1455private:
1456 std::map<int, T> ports_;
1457};
1458
1459//===----------------------------------------------------------------------===//
1460// Port-lookup helpers: findPortOrThrow, findPortAsOrThrow, findPortIndices.
1461//
1462// `findPortOrThrow` and `findPortAsOrThrow` are thin wrappers over
1463// `HWModule::resolvePort` that translate "not found" into a thrown
1464// `AcceleratorMismatchError` (instead of a nullptr return) and the typed
1465// variant additionally `dynamic_cast`s to a service-specific subclass. They
1466// exist so generated facade code does not have to repeat the same
1467// boilerplate (build a single-element AppIDPath, check for null, format an
1468// error string) for every port lookup it performs.
1469//===----------------------------------------------------------------------===//
1470
1471/// Look up a BundlePort by AppID in `module`. Throws AcceleratorMismatchError
1472/// if the port is not found. The returned reference is mutable: even though
1473/// `HWModule::resolvePort` is a const method, the underlying port map stores
1474/// `BundlePort&` references, so callers can invoke non-const methods on the
1475/// returned port (e.g. `getRawRead`, `connect`).
1476inline BundlePort &findPortOrThrow(HWModule *module, const AppID &id) {
1477 AppIDPath lastLookup;
1478 BundlePort *port = module->resolvePort(AppIDPath{id}, lastLookup);
1479 if (!port)
1480 throw AcceleratorMismatchError("Expected port '" + id.toString() +
1481 "' not found in module");
1482 return *port;
1483}
1484
1485/// Look up a BundlePort by AppID and cast it to `T`. Throws
1486/// AcceleratorMismatchError if the port is missing or has the wrong runtime
1487/// type. The returned pointer is mutable.
1488template <typename T>
1489T *findPortAsOrThrow(HWModule *module, const AppID &id) {
1490 BundlePort &port = findPortOrThrow(module, id);
1491 T *result = port.getAs<T>();
1492 if (!result)
1493 throw AcceleratorMismatchError("Port '" + id.toString() +
1494 "' has unexpected type (expected " +
1495 typeid(T).name() + ")");
1496 return result;
1497}
1498
1499/// Return a sorted vector of the `idx` values for every port whose AppID name
1500/// matches `name`. Ports without an index are ignored.
1501inline std::vector<uint32_t> findPortIndices(HWModule *module,
1502 const std::string &name) {
1503 std::vector<uint32_t> indices;
1504 for (const auto &[appid, port] : module->getPorts())
1505 if (appid.name == name && appid.idx.has_value())
1506 indices.push_back(appid.idx.value());
1507 std::sort(indices.begin(), indices.end());
1508 return indices;
1509}
1510
1511} // namespace esi
1512
1513#endif // ESI_TYPED_PORTS_H
assert(baseType &&"element must be base type")
Arrays have a compile time specified (static) size and an element type.
Definition Types.h:283
Bit vectors include signed, unsigned, and signless integers.
Definition Types.h:193
Bits are just an array of bits.
Definition Types.h:206
Services provide connections to 'bundles' – collections of named, unidirectional communication channe...
Definition Ports.h:611
T * getAs() const
Cast this Bundle port to a subclass which is actually useful.
Definition Ports.h:639
ReadChannelPort & getRawRead(const std::string &name) const
Definition Ports.cpp:52
WriteChannelPort & getRawWrite(const std::string &name) const
Get access to the raw byte streams of a channel.
Definition Ports.cpp:42
Unidirectional channels are the basic communication primitive between the host and accelerator.
Definition Ports.h:115
const Type * getType() const
Definition Ports.h:219
const WindowType * getWindowType() const
If this port carries a windowed type, return the original WindowType (whose intoType is what getType(...
Definition Ports.h:224
Represents either the top level or an instance of a hardware module.
Definition Design.h:47
const T & operator[](int idx) const
std::map< int, T > ports_
IndexedPorts(std::map< int, T > &&ports)
auto end() const
auto begin() const
IndexedPorts & operator=(const IndexedPorts &)=delete
IndexedPorts(IndexedPorts &&)=default
IndexedPorts & operator=(IndexedPorts &&)=default
bool contains(int idx) const
T & operator[](int idx)
size_t size() const
IndexedPorts(const IndexedPorts &)=delete
A concrete flat message backed by a single vector of bytes.
Definition Common.h:155
const uint8_t * getBytes() const
Definition Common.h:166
const T * as() const
Cast to a type.
Definition Common.h:190
size_t getSize() const
Get the size of the data in bytes.
Definition Common.h:180
Helper base class for stateful deserializers which may emit zero, one, or many typed outputs for each...
Definition TypedPorts.h:246
detail::TypedReadOwnedCallback< T > OutputCallback
Definition TypedPorts.h:248
QueuedDecodeTypeDeserializer(OutputCallback output)
Definition TypedPorts.h:251
virtual ~QueuedDecodeTypeDeserializer()=default
std::queue< std::unique_ptr< T > > pendingOutputs
Definition TypedPorts.h:318
std::vector< std::unique_ptr< T > > DecodedOutputs
Definition TypedPorts.h:249
bool poke()
Retry delivery of any typed outputs which were previously blocked by the client callback.
Definition TypedPorts.h:291
bool push(std::unique_ptr< SegmentedMessageData > &msg)
Push one raw message into the deserializer.
Definition TypedPorts.h:261
virtual DecodedOutputs decode(std::unique_ptr< SegmentedMessageData > &msg)=0
Decode one raw message into zero or more typed outputs.
A ChannelPort which reads data from the accelerator.
Definition Ports.h:453
virtual void connect(ReadCallback callback, const ConnectOptions &options={})
Definition Ports.cpp:140
virtual std::future< MessageData > readAsync()
Asynchronous polling read.
Definition Ports.cpp:225
virtual bool isConnected() const override
Definition Ports.h:475
static constexpr uint64_t DefaultMaxDataQueueMsgs
Default max data queue size set at connect time.
Definition Ports.h:504
virtual void disconnect() override
Disconnect the channel.
Definition Ports.cpp:70
virtual void read(MessageData &outData)
Specify a buffer to read into.
Definition Ports.h:517
Signed integer.
Definition Types.h:224
Abstract multi-segment message.
Definition Common.h:133
virtual MessageData toMessageData() const
Flatten all segments into a standard MessageData.
Definition Common.cpp:60
Reusable serial-list window deserializer.
Definition TypedPorts.h:340
std::vector< data_frame > pending_frames_
Definition TypedPorts.h:416
typename Base::DecodedOutputs DecodedOutputs
Definition TypedPorts.h:344
DecodedOutputs decode(std::unique_ptr< SegmentedMessageData > &msg) override
Decode one raw message into zero or more typed outputs.
Definition TypedPorts.h:358
typename Base::OutputCallback OutputCallback
Definition TypedPorts.h:343
std::optional< header_frame > pending_header_
Definition TypedPorts.h:415
SerialListTypeDeserializer(OutputCallback output)
Definition TypedPorts.h:346
QueuedDecodeTypeDeserializer< T > Base
Definition TypedPorts.h:342
typename T::header_frame header_frame
Definition TypedPorts.h:350
static constexpr size_t kFrameSize
Definition TypedPorts.h:356
typename T::count_type count_type
Definition TypedPorts.h:352
typename T::data_frame data_frame
Definition TypedPorts.h:351
std::vector< uint8_t > partial_
Definition TypedPorts.h:414
Type aliases provide a named type which forwards to an inner type.
Definition Types.h:166
Root class of the ESI type system.
Definition Types.h:36
std::string toString(bool oneLine=false) const
Definition Types.cpp:120
ID getID() const
Definition Types.h:42
services::CallService::Callback * inner
std::optional< TypedReadPort< ArgT, SkipTypeCheck > > argPort
std::optional< TypedWritePort< void > > resultPort
const services::CallService::Callback & raw() const
std::function< void(const ArgT &)> userCallback
void connect(std::function< void(const ArgT &)> callback, bool quick=false)
TypedCallback(const TypedCallback &)=delete
services::CallService::Callback & raw()
TypedCallback & operator=(const TypedCallback &)=delete
TypedCallback(services::CallService::Callback *cb)
std::optional< TypedWritePort< ResultT, SkipTypeCheck > > resultPort
services::CallService::Callback & raw()
std::optional< TypedReadPort< void > > argPort
void connect(std::function< ResultT()> callback, bool quick=false)
TypedCallback(services::CallService::Callback *cb)
const services::CallService::Callback & raw() const
services::CallService::Callback * inner
TypedCallback & operator=(const TypedCallback &)=delete
std::optional< TypedWritePort< void > > resultPort
std::optional< TypedReadPort< void > > argPort
TypedCallback(services::CallService::Callback *cb)
services::CallService::Callback & raw()
void connect(std::function< void()> callback, bool quick=false)
TypedCallback & operator=(const TypedCallback &)=delete
TypedCallback(const TypedCallback &)=delete
const services::CallService::Callback & raw() const
services::CallService::Callback * inner
services::CallService::Callback * inner
std::optional< TypedReadPort< ArgT, SkipTypeCheck > > argPort
services::CallService::Callback & raw()
void connect(std::function< ResultT(const ArgT &)> callback, bool quick=false)
TypedCallback(services::CallService::Callback *cb)
std::optional< TypedWritePort< ResultT, SkipTypeCheck > > resultPort
TypedCallback & operator=(const TypedCallback &)=delete
std::function< ResultT(const ArgT &)> userCallback
const services::CallService::Callback & raw() const
TypedCallback(const TypedCallback &)=delete
auto operator()(Args &&...args) -> decltype(this->call(std::forward< Args >(args)...))
Function-call operator overloads: forward to call().
TypedFunction & operator=(const TypedFunction &)=delete
std::optional< TypedWritePort< ArgT, SkipTypeCheck > > argPort
std::future< void > call(First &&first, Rest &&...rest)
Emplace-style call: constructs ArgT in-place from the forwarded arguments and forwards to call(const ...
TypedFunction(const TypedFunction &)=delete
std::optional< TypedReadPort< void > > resultPort
const services::FuncService::Function & raw() const
services::FuncService::Function & raw()
std::future< void > call(const ArgT &arg)
services::FuncService::Function * inner
TypedFunction(services::FuncService::Function *func)
std::optional< TypedReadPort< ResultT, SkipTypeCheck > > resultPort
TypedFunction & operator=(const TypedFunction &)=delete
std::optional< TypedWritePort< void > > argPort
services::FuncService::Function & raw()
services::FuncService::Function * inner
std::future< ResultT > operator()()
Function-call operator overload: forwards to call().
const services::FuncService::Function & raw() const
TypedFunction(services::FuncService::Function *func)
const services::FuncService::Function & raw() const
TypedFunction(const TypedFunction &)=delete
TypedFunction & operator=(const TypedFunction &)=delete
services::FuncService::Function * inner
TypedFunction(services::FuncService::Function *func)
std::optional< TypedWritePort< void > > argPort
std::optional< TypedReadPort< void > > resultPort
services::FuncService::Function & raw()
std::future< void > operator()()
Function-call operator overload: forwards to call().
std::optional< TypedWritePort< ArgT, SkipTypeCheck > > argPort
std::optional< TypedReadPort< ResultT, SkipTypeCheck > > resultPort
std::future< ResultT > call(First &&first, Rest &&...rest)
Emplace-style call: constructs ArgT in-place from the forwarded arguments and forwards to call(const ...
TypedFunction(const TypedFunction &)=delete
services::FuncService::Function & raw()
std::mutex callMutex
auto operator()(Args &&...args) -> decltype(this->call(std::forward< Args >(args)...))
Function-call operator overloads: forward to call().
std::future< ResultT > call(const ArgT &arg)
TypedFunction & operator=(const TypedFunction &)=delete
const services::FuncService::Function & raw() const
TypedFunction(services::FuncService::Function *func)
Implicit conversion from Function* (returned by getAs<>()).
services::FuncService::Function * inner
TypedReadPort(const TypedReadPort &)=delete
TypedReadPort & operator=(TypedReadPort &&)=delete
TypedReadPort & operator=(const TypedReadPort &)=delete
ReadChannelPort * inner
Definition TypedPorts.h:945
TypedReadPort(ReadChannelPort &port)
Definition TypedPorts.h:895
TypedReadPort(ReadChannelPort *port)
Definition TypedPorts.h:897
TypedReadPort(TypedReadPort &&)=delete
std::future< void > readAsync()
Definition TypedPorts.h:931
void connect(const ChannelPort::ConnectOptions &opts={std::nullopt, false})
Definition TypedPorts.h:908
ReadChannelPort & raw()
Definition TypedPorts.h:941
void connect(std::function< bool()> callback, const ChannelPort::ConnectOptions &opts={std::nullopt, false})
Definition TypedPorts.h:916
const ReadChannelPort & raw() const
Definition TypedPorts.h:942
Strongly typed wrapper around a raw read channel.
Definition TypedPorts.h:718
TypedReadPort(ReadChannelPort &port)
Definition TypedPorts.h:720
ReadChannelPort & raw()
Definition TypedPorts.h:846
detail::DeserializerFor< T > Deserializer
Definition TypedPorts.h:850
ReadChannelPort * inner
Definition TypedPorts.h:883
TypedReadPort & operator=(const TypedReadPort &)=delete
std::unique_ptr< T > read()
Blocking typed read in polling mode.
Definition TypedPorts.h:801
void connect(const ChannelPort::ConnectOptions &opts={std::nullopt, false})
Connect in polling mode.
Definition TypedPorts.h:737
uint64_t maxDataQueueMsgs
Definition TypedPorts.h:886
void disconnect()
Disconnect the typed port and abandon any pending polling reads.
Definition TypedPorts.h:839
void setMaxDataQueueMsgs(uint64_t maxMsgs)
Set the maximum number of decoded typed values buffered in polling mode.
Definition TypedPorts.h:832
void emplaceDeserializer(detail::DeserializerOutputCallback< T > callback)
Definition TypedPorts.h:859
void connect(std::function< bool(const T &)> callback, const ChannelPort::ConnectOptions &opts={std::nullopt, false})
Connect a non-owning typed callback.
Definition TypedPorts.h:764
const ReadChannelPort & raw() const
Definition TypedPorts.h:847
TypedReadPort & operator=(TypedReadPort &&)=delete
std::optional< PollingState > pollingState
Definition TypedPorts.h:888
TypedReadPort(const TypedReadPort &)=delete
std::optional< Deserializer > deserializer
Definition TypedPorts.h:887
void connect(detail::TypedReadOwnedCallback< T > callback, const ChannelPort::ConnectOptions &opts={std::nullopt, false})
Connect an owning typed callback.
Definition TypedPorts.h:777
std::future< std::unique_ptr< T > > readAsync()
Asynchronous typed read in polling mode.
Definition TypedPorts.h:810
TypedReadPort(ReadChannelPort *port)
Definition TypedPorts.h:722
TypedReadPort(TypedReadPort &&)=delete
bool isConnected() const
Definition TypedPorts.h:844
WriteChannelPort * inner
Definition TypedPorts.h:701
TypedWritePort(WriteChannelPort &port)
Definition TypedPorts.h:671
TypedWritePort(WriteChannelPort *port)
Definition TypedPorts.h:673
WriteChannelPort & raw()
Definition TypedPorts.h:697
void connect(const ChannelPort::ConnectOptions &opts={std::nullopt, false})
Definition TypedPorts.h:675
const WriteChannelPort & raw() const
Definition TypedPorts.h:698
const WriteChannelPort & raw() const
Definition TypedPorts.h:660
void connect(const ChannelPort::ConnectOptions &opts={std::nullopt, false})
Definition TypedPorts.h:626
WriteChannelPort & raw()
Definition TypedPorts.h:659
void write(std::unique_ptr< T > &data)
Write by taking ownership.
Definition TypedPorts.h:640
bool tryWrite(const T &data)
Definition TypedPorts.h:651
bool isConnected() const
Definition TypedPorts.h:657
TypedWritePort(WriteChannelPort &port)
Definition TypedPorts.h:622
void write(const T &data)
Definition TypedPorts.h:636
TypedWritePort(WriteChannelPort *port)
Definition TypedPorts.h:624
WriteChannelPort * inner
Definition TypedPorts.h:663
Unsigned integer.
Definition Types.h:235
The "void" type is a special type which can be used to represent no type.
Definition Types.h:141
Windows represent a fixed-size sliding window over a stream of data.
Definition Types.h:316
A ChannelPort which sends data to the accelerator.
Definition Ports.h:308
virtual bool isConnected() const override
Definition Ports.h:323
virtual void disconnect() override
Definition Ports.h:322
void write(const MessageData &data)
A very basic blocking write API.
Definition Ports.h:327
bool flush()
Flush any buffered data.
Definition Ports.h:375
bool tryWrite(const MessageData &data)
A basic non-blocking write API.
Definition Ports.h:357
virtual void connect(const ConnectOptions &options={}) override
Set up a connection to the accelerator.
Definition Ports.h:312
Default deserializer for simple 1:1 typed reads.
Definition TypedPorts.h:174
bool push(std::unique_ptr< SegmentedMessageData > &msg)
Definition TypedPorts.h:181
TypedReadOwnedCallback< T > OutputCallback
Definition TypedPorts.h:176
std::unique_ptr< T > pendingOutput
Definition TypedPorts.h:218
PODTypeDeserializer(OutputCallback output, WireInfo wireInfo)
Definition TypedPorts.h:178
Shared queue/promise helper for polling-style read APIs.
Definition Ports.h:43
A function call which gets attached to a service port.
Definition Services.h:405
A function call which gets attached to a service port.
Definition Services.h:353
constexpr bool has_type_deserializer_v
Definition TypedPorts.h:156
ChannelPort::ConnectOptions typedFunctionConnectOptions()
Standard ConnectOptions for typed function ports: untranslated frames so the deserializer can see raw...
Definition TypedPorts.h:971
void throwAlreadyConnected()
Throw a clear "already connected" error from connect() paths.
void throwNullFunction()
Throw the standard "null Function pointer" error used by every TypedFunction specialization.
Definition TypedPorts.h:997
std::function< bool(std::unique_ptr< T > &)> TypedReadOwnedCallback
Owning callback used by typed read deserializers.
Definition TypedPorts.h:146
void throwNotConnected()
Throw a clear "not connected" error from call() paths.
void throwCallbackAlreadyConnected()
Throw a clear "already connected" error from TypedCallback connect() paths.
typename DeserializerFor< T >::OutputCallback DeserializerOutputCallback
Definition TypedPorts.h:235
void throwNullCallback()
Throw the standard "null Callback pointer" error used by every TypedCallback specialization.
typename DeserializerSelector< T >::type DeserializerFor
Definition TypedPorts.h:232
std::future< T > awaitDecoded(std::future< std::unique_ptr< T > > inner)
Convert a std::future<std::unique_ptr<T>> (as returned by TypedReadPort::readAsync()) into a std::fut...
Definition TypedPorts.h:983
const MessageData & getMessageDataRef(const SegmentedMessageData &msg, MessageData &scratch)
Definition TypedPorts.h:159
Definition esi.py:1
std::string toString(const std::any &a)
'Stringify' a std::any. This is used to log std::any values by some loggers.
Definition Logging.cpp:132
void verifyTypeCompatibility(const Type *portType)
Definition TypedPorts.h:463
MessageData toMessageData(const T &data, WireInfo wi)
Pack a C++ integral value into a MessageData with the given wire byte count.
Definition TypedPorts.h:87
std::vector< uint32_t > findPortIndices(HWModule *module, const std::string &name)
Return a sorted vector of the idx values for every port whose AppID name matches name.
WireInfo getWireInfo(const Type *portType)
Definition TypedPorts.h:77
BundlePort & findPortOrThrow(HWModule *module, const AppID &id)
Look up a BundlePort by AppID in module.
constexpr bool has_esi_id_v
Definition TypedPorts.h:432
T * findPortAsOrThrow(HWModule *module, const AppID &id)
Look up a BundlePort by AppID and cast it to T.
constexpr bool has_esi_window_id_v
Definition TypedPorts.h:444
constexpr bool is_std_array_v
Definition TypedPorts.h:460
const Type * unwrapTypeAlias(const Type *t)
Unwrap TypeAliasType (possibly recursively) to get the underlying type.
Definition TypedPorts.h:63
T fromMessageData(const MessageData &msg, WireInfo wi)
Unpack a MessageData into a C++ integral value with the given wire info.
Definition TypedPorts.h:110
Compute the wire byte count for a port type.
Definition TypedPorts.h:72
size_t bitWidth
Definition TypedPorts.h:74
PODTypeDeserializer< T > type
Definition TypedPorts.h:223