10#include "gtest/gtest.h"
26TEST(TypedPortsTest, VoidTypeCompatibility) {
28 EXPECT_NO_THROW(verifyTypeCompatibility<void>(&voidType));
35 EXPECT_THROW(verifyTypeCompatibility<void>(&sint32),
39TEST(TypedPortsTest, BoolTypeCompatibility) {
41 EXPECT_NO_THROW(verifyTypeCompatibility<bool>(&bits1));
52TEST(TypedPortsTest, SignedIntTypeCompatibility) {
55 EXPECT_NO_THROW(verifyTypeCompatibility<int32_t>(&sint17));
59 EXPECT_NO_THROW(verifyTypeCompatibility<int32_t>(&sint32));
63 EXPECT_THROW(verifyTypeCompatibility<int32_t>(&sint33),
68 EXPECT_THROW(verifyTypeCompatibility<int32_t>(&sint16),
73 EXPECT_THROW(verifyTypeCompatibility<int32_t>(&sint8),
77 EXPECT_NO_THROW(verifyTypeCompatibility<int8_t>(&sint8));
81 EXPECT_THROW(verifyTypeCompatibility<int32_t>(&uint31),
86 EXPECT_NO_THROW(verifyTypeCompatibility<int64_t>(&sint33b));
90 EXPECT_NO_THROW(verifyTypeCompatibility<int64_t>(&sint64));
94 EXPECT_THROW(verifyTypeCompatibility<int64_t>(&sint65),
98 EXPECT_THROW(verifyTypeCompatibility<int64_t>(&sint32),
102TEST(TypedPortsTest, UnsignedIntTypeCompatibility) {
105 EXPECT_NO_THROW(verifyTypeCompatibility<uint32_t>(&uint17));
109 EXPECT_NO_THROW(verifyTypeCompatibility<uint32_t>(&uint32_t_));
113 EXPECT_THROW(verifyTypeCompatibility<uint32_t>(&uint33),
118 EXPECT_THROW(verifyTypeCompatibility<uint32_t>(&uint16_),
122 EXPECT_NO_THROW(verifyTypeCompatibility<uint16_t>(&uint16_));
126 EXPECT_NO_THROW(verifyTypeCompatibility<uint32_t>(&bits17));
130 EXPECT_NO_THROW(verifyTypeCompatibility<uint32_t>(&bits32));
134 EXPECT_THROW(verifyTypeCompatibility<uint32_t>(&bits33),
139 EXPECT_THROW(verifyTypeCompatibility<uint32_t>(&bits8),
141 EXPECT_NO_THROW(verifyTypeCompatibility<uint8_t>(&bits8));
145 EXPECT_NO_THROW(verifyTypeCompatibility<uint64_t>(&uint33b));
149 EXPECT_NO_THROW(verifyTypeCompatibility<uint64_t>(&uint64_t_));
153 EXPECT_THROW(verifyTypeCompatibility<uint64_t>(&uint65),
157 EXPECT_THROW(verifyTypeCompatibility<uint64_t>(&uint32_t_),
162 EXPECT_THROW(verifyTypeCompatibility<uint32_t>(&sint31),
/// ESI type ID string (enclosing struct header is outside this excerpt;
/// presumably TestStruct). The ESIIDTypeCompatibility test expects
/// verifyTypeCompatibility<TestStruct> to accept a port type built with this
/// exact ID and to throw for one built with a different ID.
168 static constexpr std::string_view _ESI_ID =
"MyModule.TestStruct";
173struct DeserializerWithESIID {
174 static constexpr std::string_view _ESI_ID =
"MyModule.DeserializedStruct";
176 class TypeDeserializer
180 using OutputCallback = Base::OutputCallback;
181 using DecodedOutputs = Base::DecodedOutputs;
/// Hands the typed output callback straight to Base; the constructor body is
/// empty, so no extra state is set up here.
183 explicit TypeDeserializer(OutputCallback output)
184 : Base(std::move(output)) {}
187 DecodedOutputs decode(std::unique_ptr<SegmentedMessageData> &msg)
override {
194TEST(TypedPortsTest, ESIIDTypeCompatibility) {
196 StructType matchType(
"MyModule.TestStruct", {});
197 EXPECT_NO_THROW(verifyTypeCompatibility<TestStruct>(&matchType));
200 StructType mismatchType(
"OtherModule.OtherStruct", {});
201 EXPECT_THROW(verifyTypeCompatibility<TestStruct>(&mismatchType),
205 UIntType uintWithMatchingID(
"MyModule.TestStruct", 32);
206 EXPECT_NO_THROW(verifyTypeCompatibility<TestStruct>(&uintWithMatchingID));
209TEST(TypedPortsTest, NullPortTypeThrows) {
211 verifyTypeCompatibility<int32_t>(
static_cast<const Type *
>(
nullptr)),
214 verifyTypeCompatibility<void>(
static_cast<const Type *
>(
nullptr)),
219struct UnknownCppType {
223TEST(TypedPortsTest, FallbackThrows) {
225 EXPECT_THROW(verifyTypeCompatibility<UnknownCppType>(&uint32),
239 void connect(
const ConnectOptions &opts = {})
override {
/// Override that only clears the local `connected` flag; nothing else is
/// torn down here.
243 void disconnect()
override { connected =
false; }
256 bool connected =
false;
259TEST(TypedPortsTest, TypedWritePortConnectThrowsOnMismatch) {
261 MockWritePort mock(&uint32);
266TEST(TypedPortsTest, TypedWritePortConnectSucceeds) {
268 MockWritePort mock(&sint31);
270 EXPECT_NO_THROW(typed.connect());
271 EXPECT_TRUE(typed.isConnected());
274TEST(TypedPortsTest, TypedWritePortRoundTrip) {
276 MockWritePort mock(&sint15);
284 ASSERT_EQ(mock.lastWritten.getSize(), 2u);
287TEST(TypedPortsTest, SignExtensionNonByteAligned) {
293 EXPECT_EQ(wi.
bytes, 1u);
297 int32_t val = fromMessageData<int32_t>(msg, wi);
306 int32_t val = fromMessageData<int32_t>(msg, wi);
314 EXPECT_EQ(wi.
bytes, 3u);
315 uint8_t wire[3] = {0xFF, 0xFF, 0x3F};
317 int32_t val = fromMessageData<int32_t>(msg, wi);
324 uint8_t wire[3] = {0xFF, 0xFF, 0x1F};
326 int32_t val = fromMessageData<int32_t>(msg, wi);
327 EXPECT_EQ(val, 0x1FFFFF);
331TEST(TypedPortsTest, TypedWritePortVoid) {
333 MockWritePort mock(&voidType);
335 EXPECT_NO_THROW(typed.connect());
338 ASSERT_EQ(mock.lastWritten.getSize(), 1u);
339 EXPECT_EQ(mock.lastWritten.getData()[0], 0);
352 const ConnectOptions & = {})
override {
358 std::future<MessageData>
readAsync()
override {
359 std::promise<MessageData> p;
360 p.set_value(nextResponse);
361 return p.get_future();
367TEST(TypedPortsTest, TypedReadPortCustomDeserializerVerifiesESIID) {
368 StructType matchType(
"MyModule.DeserializedStruct", {});
369 MockReadPort matching(&matchType);
371 EXPECT_NO_THROW(ok.connect());
373 StructType mismatchType(
"OtherModule.OtherStruct", {});
374 MockReadPort mismatch(&mismatchType);
383 bool deliver(std::unique_ptr<SegmentedMessageData> msg) {
385 pending = std::move(msg);
386 return retryPending();
389 bool retryPending() {
391 throw std::runtime_error(
392 "CallbackDrivenMockReadPort::retryPending with no message");
/// Returns true while the `pending` unique_ptr is non-null, i.e. a message
/// is still being held.
399 bool hasPending()
const {
return static_cast<bool>(pending); }
402 size_t deliveryCount = 0;
405 std::unique_ptr<SegmentedMessageData> pending;
408class ThrowOnCopyReadCallback {
/// Keeps a shared reference to the caller's flag: the by-value shared_ptr
/// parameter is moved into the `shouldThrow` member, so the caller can flip
/// the flag after construction.
410 explicit ThrowOnCopyReadCallback(std::shared_ptr<bool> shouldThrow)
411 : shouldThrow(std::move(shouldThrow)) {}
413 ThrowOnCopyReadCallback(
const ThrowOnCopyReadCallback &other)
414 : shouldThrow(other.shouldThrow) {
416 throw std::runtime_error(
"ThrowOnCopyReadCallback copy failure");
/// Move construction and both assignment operators stay compiler-generated;
/// only the copy constructor is hand-written (it is the one that can throw).
419 ThrowOnCopyReadCallback(ThrowOnCopyReadCallback &&) =
default;
420 ThrowOnCopyReadCallback &operator=(
const ThrowOnCopyReadCallback &) =
default;
421 ThrowOnCopyReadCallback &operator=(ThrowOnCopyReadCallback &&) =
default;
423 bool operator()(std::unique_ptr<SegmentedMessageData> &)
const {
428 std::shared_ptr<bool> shouldThrow;
431static MessageData packUint32Words(std::initializer_list<uint32_t> values) {
432 std::vector<uint8_t> bytes(values.size() *
sizeof(uint32_t));
434 for (uint32_t value : values) {
435 std::memcpy(bytes.data() + offset, &value,
sizeof(value));
436 offset +=
sizeof(value);
441struct BufferedSequence {
442 std::vector<uint32_t> values;
444 class TypeDeserializer
448 using OutputCallback = Base::OutputCallback;
449 using DecodedOutputs = Base::DecodedOutputs;
/// Hands the typed output callback straight to Base; the constructor body is
/// empty, so no extra state is set up here.
451 explicit TypeDeserializer(OutputCallback output)
452 : Base(std::move(output)) {}
455 DecodedOutputs decode(std::unique_ptr<SegmentedMessageData> &msg)
override {
458 detail::getMessageDataRef<BufferedSequence>(*msg, scratch);
459 if (flat.
getSize() %
sizeof(uint32_t) != 0)
460 throw std::runtime_error(
461 "BufferedSequence::TypeDeserializer: truncated word payload");
463 DecodedOutputs decoded;
464 for (
size_t offset = 0; offset < flat.
getSize();
465 offset +=
sizeof(uint32_t)) {
467 std::memcpy(&value, flat.
getBytes() + offset,
sizeof(value));
468 auto sequence = std::make_unique<BufferedSequence>();
469 sequence->values.push_back(value);
470 decoded.push_back(std::move(sequence));
478TEST(TypedPortsTest, TypedReadPortPODBackpressuresAfterOneBufferedOutput) {
480 CallbackDrivenMockReadPort mock(&uint32);
484 typed.setMaxDataQueueMsgs(1);
487 mock.deliver(std::make_unique<MessageData>(packUint32Words({11}))));
492 mock.deliver(std::make_unique<MessageData>(packUint32Words({22}))));
493 EXPECT_FALSE(mock.hasPending());
495 mock.deliver(std::make_unique<MessageData>(packUint32Words({33}))));
496 EXPECT_TRUE(mock.hasPending());
498 std::unique_ptr<uint32_t> first = typed.read();
500 EXPECT_EQ(*first, 11u);
501 EXPECT_TRUE(mock.retryPending());
502 EXPECT_FALSE(mock.hasPending());
503 std::unique_ptr<uint32_t> second = typed.read();
505 EXPECT_EQ(*second, 22u);
506 std::unique_ptr<uint32_t> third = typed.read();
508 EXPECT_EQ(*third, 33u);
511TEST(TypedPortsTest, TypedReadPortPODRetriesSameOwnedObjectOnLaterPush) {
513 CallbackDrivenMockReadPort mock(&uint32);
516 const uint32_t *firstObject =
nullptr;
517 size_t callbackAttempts = 0;
519 typed.connect([&](std::unique_ptr<uint32_t> &value) {
522 if (callbackAttempts == 1) {
523 EXPECT_EQ(*value, 11u);
524 firstObject = value.get();
527 if (callbackAttempts == 2) {
528 EXPECT_EQ(*value, 11u);
529 EXPECT_EQ(value.get(), firstObject);
532 EXPECT_EQ(*value, 22u);
537 mock.deliver(std::make_unique<MessageData>(packUint32Words({11}))));
538 EXPECT_FALSE(mock.hasPending());
542 mock.deliver(std::make_unique<MessageData>(packUint32Words({22}))));
543 EXPECT_FALSE(mock.hasPending());
544 EXPECT_EQ(callbackAttempts, 3u);
547TEST(TypedPortsTest, TypedReadPortCustomDeserializerPokesBlockedOutput) {
549 CallbackDrivenMockReadPort mock(&uint32);
553 typed.setMaxDataQueueMsgs(1);
556 mock.deliver(std::make_unique<MessageData>(packUint32Words({10, 20}))));
557 EXPECT_EQ(mock.deliveryCount, 1u);
559 std::unique_ptr<BufferedSequence> first = typed.read();
561 ASSERT_EQ(first->values.size(), 1u);
562 EXPECT_EQ(first->values[0], 10u);
564 std::unique_ptr<BufferedSequence> second = typed.read();
566 ASSERT_EQ(second->values.size(), 1u);
567 EXPECT_EQ(second->values[0], 20u);
568 EXPECT_EQ(mock.deliveryCount, 1u);
572 TypedReadPortCustomDeserializerConsumesMultipleFramesPerRawMessage) {
574 CallbackDrivenMockReadPort mock(&uint32);
579 std::future<std::unique_ptr<BufferedSequence>> first = typed.readAsync();
580 std::future<std::unique_ptr<BufferedSequence>> second = typed.readAsync();
581 std::future<std::unique_ptr<BufferedSequence>> third = typed.readAsync();
584 mock.deliver(std::make_unique<MessageData>(packUint32Words({1, 2, 3}))));
586 std::unique_ptr<BufferedSequence> firstValue = first.get();
587 ASSERT_TRUE(firstValue);
588 EXPECT_EQ(firstValue->values[0], 1u);
590 std::unique_ptr<BufferedSequence> secondValue = second.get();
591 ASSERT_TRUE(secondValue);
592 EXPECT_EQ(secondValue->values[0], 2u);
594 std::unique_ptr<BufferedSequence> thirdValue = third.get();
595 ASSERT_TRUE(thirdValue);
596 EXPECT_EQ(thirdValue->values[0], 3u);
600 TypedReadPortCustomDeserializerQueuesMultiplePendingOutputs) {
602 CallbackDrivenMockReadPort mock(&uint32);
606 typed.setMaxDataQueueMsgs(1);
609 mock.deliver(std::make_unique<MessageData>(packUint32Words({7, 8, 9}))));
610 EXPECT_EQ(mock.deliveryCount, 1u);
612 std::unique_ptr<BufferedSequence> first = typed.read();
614 EXPECT_EQ(first->values[0], 7u);
616 std::unique_ptr<BufferedSequence> second = typed.read();
618 EXPECT_EQ(second->values[0], 8u);
620 std::unique_ptr<BufferedSequence> third = typed.read();
622 EXPECT_EQ(third->values[0], 9u);
623 EXPECT_EQ(mock.deliveryCount, 1u);
626struct FragmentedCoord {
// The tests memcpy whole FragmentedCoord objects to and from byte buffers
// sized by sizeof(FragmentedCoord), so the struct size is pinned at 8 bytes.
630static_assert(
sizeof(FragmentedCoord) == 8,
"Size mismatch");
632static std::array<uint8_t,
sizeof(FragmentedCoord)> packCoordBytes(uint32_t y,
634 FragmentedCoord coord{y, x};
635 std::array<uint8_t,
sizeof(FragmentedCoord)> bytes{};
636 std::memcpy(bytes.data(), &coord,
sizeof(coord));
640struct FragmentedCoordBatch {
641 std::vector<FragmentedCoord> coords;
643 class TypeDeserializer
647 using OutputCallback = Base::OutputCallback;
648 using DecodedOutputs = Base::DecodedOutputs;
/// Hands the typed output callback straight to Base; the constructor body is
/// empty, so `partialFrameBytes` starts out default-constructed.
650 explicit TypeDeserializer(OutputCallback output)
651 : Base(std::move(output)) {}
654 DecodedOutputs decode(std::unique_ptr<SegmentedMessageData> &msg)
override {
657 detail::getMessageDataRef<FragmentedCoordBatch>(*msg, scratch);
659 DecodedOutputs decoded;
660 const uint8_t *bytes = flat.
getBytes();
662 while (offset < flat.
getSize()) {
663 size_t needed =
sizeof(FragmentedCoord) - partialFrameBytes.size();
664 size_t chunkSize = std::min(needed, flat.
getSize() - offset);
665 partialFrameBytes.insert(partialFrameBytes.end(), bytes + offset,
666 bytes + offset + chunkSize);
669 if (partialFrameBytes.size() !=
sizeof(FragmentedCoord))
672 FragmentedCoord coord;
673 std::memcpy(&coord, partialFrameBytes.data(),
sizeof(coord));
674 partialFrameBytes.clear();
676 auto batch = std::make_unique<FragmentedCoordBatch>();
677 batch->coords.push_back(coord);
678 decoded.push_back(std::move(batch));
685 std::vector<uint8_t> partialFrameBytes;
690 TypedReadPortCustomDeserializerConsumesSplitFramesAcrossRawMessages) {
692 CallbackDrivenMockReadPort mock(&uint32);
697 std::future<std::unique_ptr<FragmentedCoordBatch>> first = typed.readAsync();
698 std::future<std::unique_ptr<FragmentedCoordBatch>> second = typed.readAsync();
700 std::array<uint8_t,
sizeof(FragmentedCoord)> coordA = packCoordBytes(10, 20);
701 std::array<uint8_t,
sizeof(FragmentedCoord)> coordB = packCoordBytes(30, 40);
703 std::vector<uint8_t> firstChunk(coordA.begin(), coordA.begin() + 6);
704 EXPECT_TRUE(mock.deliver(
705 std::make_unique<MessageData>(
MessageData(std::move(firstChunk)))));
707 std::vector<uint8_t> secondChunk;
708 secondChunk.insert(secondChunk.end(), coordA.begin() + 6, coordA.end());
709 secondChunk.insert(secondChunk.end(), coordB.begin(), coordB.end());
710 EXPECT_TRUE(mock.deliver(
711 std::make_unique<MessageData>(
MessageData(std::move(secondChunk)))));
713 std::unique_ptr<FragmentedCoordBatch> firstBatch = first.get();
714 ASSERT_TRUE(firstBatch);
715 ASSERT_EQ(firstBatch->coords.size(), 1u);
716 EXPECT_EQ(firstBatch->coords[0].y, 10u);
717 EXPECT_EQ(firstBatch->coords[0].x, 20u);
719 std::unique_ptr<FragmentedCoordBatch> secondBatch = second.get();
720 ASSERT_TRUE(secondBatch);
721 ASSERT_EQ(secondBatch->coords.size(), 1u);
722 EXPECT_EQ(secondBatch->coords[0].y, 30u);
723 EXPECT_EQ(secondBatch->coords[0].x, 40u);
729TEST(TypedPortsTest, TypedFunctionNullThrowsAtConnect) {
735TEST(TypedPortsTest, TypedFunctionCallBeforeConnectThrows) {
738 EXPECT_THROW(typed.call(0u).get(), std::runtime_error);
741TEST(TypedPortsTest, TypedFunctionDoubleConnectThrows) {
745 ChannelType argChanType(
"channel<si24>", &argInner);
747 ChannelType resultChanType(
"channel<ui15>", &resultInner);
750 {
"arg", BundleType::Direction::To, &argChanType},
751 {
"result", BundleType::Direction::From, &resultChanType},
753 BundleType bundleType(
"func_bundle", channels);
755 MockWritePort mockWrite(&argInner);
756 CallbackDrivenMockReadPort mockRead(&resultInner);
759 mockWrite, mockRead);
763 EXPECT_THROW(typed.connect(), std::runtime_error);
767TEST(TypedPortsTest, TypedFunctionConnectVerifiesTypes) {
770 ChannelType argChanType(
"channel<si24>", &argInner);
772 ChannelType resultChanType(
"channel<ui15>", &resultInner);
775 {
"arg", BundleType::Direction::To, &argChanType},
776 {
"result", BundleType::Direction::From, &resultChanType},
778 BundleType bundleType(
"func_bundle", channels);
780 MockWritePort mockWrite(&argInner);
781 MockReadPort mockRead(&resultInner);
784 mockWrite, mockRead);
789 EXPECT_NO_THROW(typed.connect());
793TEST(TypedPortsTest, TypedFunctionConnectRejectsArgMismatch) {
795 ChannelType argChanType(
"channel<ui24>", &argInner);
797 ChannelType resultChanType(
"channel<ui15>", &resultInner);
800 {
"arg", BundleType::Direction::To, &argChanType},
801 {
"result", BundleType::Direction::From, &resultChanType},
803 BundleType bundleType(
"func_bundle", channels);
805 MockWritePort mockWrite(&argInner);
806 MockReadPort mockRead(&resultInner);
809 mockWrite, mockRead);
817TEST(TypedPortsTest, TypedFunctionCallRoundTrip) {
819 ChannelType argChanType(
"channel<si24>", &argInner);
821 ChannelType resultChanType(
"channel<ui15>", &resultInner);
824 {
"arg", BundleType::Direction::To, &argChanType},
825 {
"result", BundleType::Direction::From, &resultChanType},
827 BundleType bundleType(
"func_bundle", channels);
829 MockWritePort mockWrite(&argInner);
830 CallbackDrivenMockReadPort mockRead(&resultInner);
833 mockWrite, mockRead);
839 std::future<uint16_t> f = typed.call(arg);
843 EXPECT_TRUE(mockRead.deliver(std::make_unique<MessageData>(
MessageData(
844 reinterpret_cast<const uint8_t *
>(&expected),
sizeof(expected)))));
845 uint16_t result = f.get();
846 EXPECT_EQ(result, 42);
849 ASSERT_EQ(mockWrite.lastWritten.getSize(), 3u);
867 std::vector<uint32_t> items;
872 return {
reinterpret_cast<const uint8_t *
>(&header),
sizeof(Header)};
873 return {
reinterpret_cast<const uint8_t *
>(items.data()),
874 items.
size() *
sizeof(uint32_t)};
878TEST(TypedPortsTest, ReadChannelPortSegmentedCallbackRetriesSameMessageObject) {
880 CallbackDrivenMockReadPort mock(&uint32);
883 const uint8_t *firstBytes =
nullptr;
884 size_t callbackCalls = 0;
886 mock.connect([&](std::unique_ptr<SegmentedMessageData> &msg) ->
bool {
888 EXPECT_EQ(msg->numSegments(), 1u);
890 auto *flat =
dynamic_cast<MessageData *
>(msg.get());
891 EXPECT_NE(flat,
nullptr);
894 EXPECT_EQ(*flat->
as<uint32_t>(), expected);
896 if (callbackCalls == 1)
899 EXPECT_EQ(flat->
getBytes(), firstBytes);
901 return callbackCalls == 2;
906 EXPECT_TRUE(mock.hasPending());
907 EXPECT_TRUE(mock.retryPending());
908 EXPECT_FALSE(mock.hasPending());
909 EXPECT_EQ(callbackCalls, 2u);
912TEST(TypedPortsTest, ReadChannelPortFlatCallbackFlattensSegmentedMessageRetry) {
914 CallbackDrivenMockReadPort mock(&uint32);
917 input.header.a = 0x12345678;
918 input.header.b = 0xABCD;
919 input.items = {1, 2, 3};
922 size_t callbackCalls = 0;
926 return callbackCalls == 2;
929 EXPECT_FALSE(mock.deliver(std::make_unique<TestSegmented>(input)));
930 EXPECT_TRUE(mock.hasPending());
931 EXPECT_TRUE(mock.retryPending());
932 EXPECT_FALSE(mock.hasPending());
933 EXPECT_EQ(callbackCalls, 2u);
936TEST(TypedPortsTest, ReadChannelPortPollingRetriesFlattenedSegmentedMessage) {
938 CallbackDrivenMockReadPort mock(&uint32);
940 mock.setMaxDataQueueMsgs(1);
942 TestSegmented firstInput;
943 firstInput.header.a = 0xAAAA5555;
944 firstInput.header.b = 0x1357;
945 firstInput.items = {10};
946 MessageData firstExpected = firstInput.toMessageData();
948 TestSegmented secondInput;
949 secondInput.header.a = 0xDEADBEEF;
950 secondInput.header.b = 0x2468;
951 secondInput.items = {20, 30};
952 MessageData secondExpected = secondInput.toMessageData();
954 EXPECT_TRUE(mock.deliver(std::make_unique<TestSegmented>(firstInput)));
955 EXPECT_FALSE(mock.deliver(std::make_unique<TestSegmented>(secondInput)));
956 EXPECT_TRUE(mock.hasPending());
962 EXPECT_TRUE(mock.retryPending());
963 EXPECT_FALSE(mock.hasPending());
966 mock.read(secondOut);
970TEST(TypedPortsTest, ReadChannelPortPollingReadAsyncThrowsWhenDisconnected) {
972 CallbackDrivenMockReadPort mock(&uint32);
974 EXPECT_THROW(mock.readAsync(), std::runtime_error);
977 std::future<MessageData> pending = mock.readAsync();
980 EXPECT_EQ(pending.wait_for(std::chrono::milliseconds(0)),
981 std::future_status::ready);
982 EXPECT_THROW(pending.get(), std::future_error);
983 EXPECT_THROW(mock.readAsync(), std::runtime_error);
987 mock.deliver(std::make_unique<MessageData>(packUint32Words({11}))));
990 EXPECT_EQ(*out.
as<uint32_t>(), 11u);
993TEST(TypedPortsTest, ReadChannelPortPollingConnectRejectsReconnect) {
995 CallbackDrivenMockReadPort mock(&uint32);
998 EXPECT_THROW(mock.connect(), std::runtime_error);
1001TEST(TypedPortsTest, ReadChannelPortDisconnectRevokesCallback) {
1003 CallbackDrivenMockReadPort mock(&uint32);
1009 mock.deliver(std::make_unique<MessageData>(packUint32Words({11}))));
1010 EXPECT_TRUE(mock.hasPending());
1013 EXPECT_TRUE(mock.retryPending());
1014 EXPECT_FALSE(mock.hasPending());
1018 EXPECT_EQ(*out.
as<uint32_t>(), 11u);
1021TEST(TypedPortsTest, TypedReadPortDestructorDisconnectsRawPort) {
1023 CallbackDrivenMockReadPort mock(&uint32);
1028 EXPECT_TRUE(mock.isConnected());
1031 EXPECT_FALSE(mock.isConnected());
1033 mock.deliver(std::make_unique<MessageData>(packUint32Words({11}))));
1034 EXPECT_TRUE(mock.hasPending());
1037 reconnected.connect();
1038 EXPECT_TRUE(mock.retryPending());
1039 EXPECT_FALSE(mock.hasPending());
1041 std::unique_ptr<uint32_t> out = reconnected.read();
1043 EXPECT_EQ(*out, 11u);
1047 ReadChannelPortInvokeCallbackMaintainsCountOnCallbackCopyFailure) {
1049 CallbackDrivenMockReadPort mock(&uint32);
1050 auto shouldThrow = std::make_shared<bool>(
false);
1052 mock.connect(ThrowOnCopyReadCallback(shouldThrow));
1053 *shouldThrow =
true;
1056 mock.deliver(std::make_unique<MessageData>(packUint32Words({11}))),
1057 std::runtime_error);
1058 EXPECT_EQ(mock.numActiveCallbacks(), 0u);
1061TEST(TypedPortsTest, TypedWritePortSegmentedMessageData) {
1064 MockWritePort mock(&uint32);
1070 msg.header.a = 0x12345678;
1071 msg.header.b = 0xABCD;
1072 msg.items = {1, 2, 3};
1078 EXPECT_EQ(mock.lastWritten.getSize(), 18u);
1081 const uint8_t *bytes = mock.lastWritten.getBytes();
1084 std::memcpy(&gotA, bytes, 4);
1085 std::memcpy(&gotB, bytes + 4, 2);
1086 EXPECT_EQ(gotA, 0x12345678u);
1087 EXPECT_EQ(gotB, 0xABCDu);
1090 uint32_t gotItems[3];
1091 std::memcpy(gotItems, bytes + 6, 12);
1092 EXPECT_EQ(gotItems[0], 1u);
1093 EXPECT_EQ(gotItems[1], 2u);
1094 EXPECT_EQ(gotItems[2], 3u);
1097TEST(TypedPortsTest, TypedWritePortSegmentedNoTypeCheck) {
1101 MockWritePort mock(&sint8);
1104 EXPECT_NO_THROW(typed.connect());
1113 EXPECT_EQ(mock.lastWritten.getSize(), 10u);
1116TEST(TypedPortsTest, TypedFunctionSegmentedArg) {
1120 ChannelType argChanType(
"channel<ui32>", &argInner);
1122 ChannelType resultChanType(
"channel<ui16>", &resultInner);
1125 {
"arg", BundleType::Direction::To, &argChanType},
1126 {
"result", BundleType::Direction::From, &resultChanType},
1128 BundleType bundleType(
"func_bundle", channels);
1130 MockWritePort mockWrite(&argInner);
1131 CallbackDrivenMockReadPort mockRead(&resultInner);
1134 mockWrite, mockRead);
1140 arg.header.a = 0xDEAD;
1141 arg.header.b = 0xBE;
1142 arg.items = {10, 20};
1144 std::future<uint16_t> f = typed.call(arg);
1147 EXPECT_TRUE(mockRead.deliver(std::make_unique<MessageData>(
MessageData(
1148 reinterpret_cast<const uint8_t *
>(&expected),
sizeof(expected)))));
1149 uint16_t result = f.get();
1150 EXPECT_EQ(result, 99u);
1153 EXPECT_EQ(mockWrite.lastWritten.getSize(), 14u);
1155 const uint8_t *bytes = mockWrite.lastWritten.getBytes();
1157 std::memcpy(&gotA, bytes, 4);
1158 EXPECT_EQ(gotA, 0xDEADu);
1160 uint32_t gotItem0, gotItem1;
1161 std::memcpy(&gotItem0, bytes + 6, 4);
1162 std::memcpy(&gotItem1, bytes + 10, 4);
1163 EXPECT_EQ(gotItem0, 10u);
1164 EXPECT_EQ(gotItem1, 20u);
1170struct UnrecognizedCppType {
1175TEST(TypedPortsTest, VerifyTypeCompatibilityThrowsForUnsupportedType) {
1177 EXPECT_THROW(verifyTypeCompatibility<UnrecognizedCppType>(&uint32),
1181 EXPECT_THROW(verifyTypeCompatibility<UnrecognizedCppType>(&sint16),
1201 class TypeDeserializer {
/// Stores the callback in the `output` member; push() later invokes it with
/// the decoded `pending` value.
1205 explicit TypeDeserializer(OutputCallback output)
1206 : output(std::move(output)) {}
1208 bool push(std::unique_ptr<SegmentedMessageData> &msg) {
1211 if (!output(pending))
1218 detail::getMessageDataRef<OneWord>(*msg, scratch);
1219 if (flat.
getSize() !=
sizeof(uint32_t))
1220 throw std::runtime_error(
"OneWord: bad size");
1221 pending = std::make_unique<OneWord>();
1222 std::memcpy(&pending->value, flat.
getBytes(),
sizeof(uint32_t));
1226 if (output(pending))
1232 if (pending && output(pending)) {
1240 OutputCallback output;
1241 std::unique_ptr<OneWord> pending;
1245TEST(TypedPortsTest, TypedFunctionCustomDeserializerSingleFrame) {
1247 ChannelType argChanType(
"channel<ui32>", &argInner);
1249 ChannelType resultChanType(
"channel<ui32>", &resultInner);
1252 {
"arg", BundleType::Direction::To, &argChanType},
1253 {
"result", BundleType::Direction::From, &resultChanType},
1255 BundleType bundleType(
"func_bundle", channels);
1257 MockWritePort mockWrite(&argInner);
1258 CallbackDrivenMockReadPort mockRead(&resultInner);
1261 mockWrite, mockRead);
1266 std::future<OneWord> f = typed.call(0u);
1270 EXPECT_TRUE(mockRead.deliver(std::make_unique<MessageData>(
MessageData(
1271 reinterpret_cast<const uint8_t *
>(&expected),
sizeof(expected)))));
1273 OneWord result = f.get();
1274 EXPECT_EQ(result.value, expected);
1279TEST(TypedPortsTest, TypedFunctionCustomDeserializerReadsMultipleFrames) {
1285 ChannelType argChanType(
"channel<ui32>", &argInner);
1287 ChannelType resultChanType(
"channel<ui32>", &resultInner);
1290 {
"arg", BundleType::Direction::To, &argChanType},
1291 {
"result", BundleType::Direction::From, &resultChanType},
1293 BundleType bundleType(
"func_bundle", channels);
1295 MockWritePort mockWrite(&argInner);
1296 CallbackDrivenMockReadPort mockRead(&resultInner);
1299 mockWrite, mockRead);
1304 std::future<FragmentedCoordBatch> f = typed.call(0u);
1307 std::array<uint8_t,
sizeof(FragmentedCoord)> coordBytes =
1308 packCoordBytes(55, 77);
1309 std::vector<uint8_t> firstChunk(coordBytes.begin(), coordBytes.begin() + 6);
1310 std::vector<uint8_t> secondChunk(coordBytes.begin() + 6, coordBytes.end());
1312 mockRead.deliver(std::make_unique<MessageData>(std::move(firstChunk))));
1314 mockRead.deliver(std::make_unique<MessageData>(std::move(secondChunk))));
1316 FragmentedCoordBatch batch = f.get();
1317 ASSERT_EQ(batch.coords.size(), 1u);
1318 EXPECT_EQ(batch.coords[0].y, 55u);
1319 EXPECT_EQ(batch.coords[0].x, 77u);
1324TEST(TypedPortsTest, TypedFunctionCustomDeserializerSkipsResultTypeCheck) {
1329 ChannelType argChanType(
"channel<ui32>", &argInner);
1332 ChannelType resultChanType(
"channel<Anything>", &resultInner);
1335 {
"arg", BundleType::Direction::To, &argChanType},
1336 {
"result", BundleType::Direction::From, &resultChanType},
1338 BundleType bundleType(
"func_bundle", channels);
1340 MockWritePort mockWrite(&argInner);
1341 CallbackDrivenMockReadPort mockRead(&resultInner);
1344 mockWrite, mockRead);
1347 EXPECT_NO_THROW(typed.connect());
1352TEST(TypedPortsTest, TypedFunctionPipelinedCallsGetOutOfOrder) {
1359 ChannelType argChanType(
"channel<ui32>", &argInner);
1361 ChannelType resultChanType(
"channel<ui32>", &resultInner);
1364 {
"arg", BundleType::Direction::To, &argChanType},
1365 {
"result", BundleType::Direction::From, &resultChanType},
1367 BundleType bundleType(
"func_bundle", channels);
1369 MockWritePort mockWrite(&argInner);
1370 CallbackDrivenMockReadPort mockRead(&resultInner);
1373 mockWrite, mockRead);
1380 std::future<FragmentedCoordBatch> f1 = typed.call(0u);
1381 std::future<FragmentedCoordBatch> f2 = typed.call(0u);
1383 auto deliverCoord = [&](uint32_t y, uint32_t x) {
1384 auto bytes = packCoordBytes(y, x);
1385 std::vector<uint8_t> first(bytes.begin(), bytes.begin() + 6);
1386 std::vector<uint8_t> second(bytes.begin() + 6, bytes.end());
1388 mockRead.deliver(std::make_unique<MessageData>(std::move(first))));
1390 mockRead.deliver(std::make_unique<MessageData>(std::move(second))));
1392 deliverCoord(11, 22);
1393 deliverCoord(33, 44);
1396 FragmentedCoordBatch r2 = f2.get();
1397 ASSERT_EQ(r2.coords.size(), 1u);
1398 EXPECT_EQ(r2.coords[0].y, 33u);
1399 EXPECT_EQ(r2.coords[0].x, 44u);
1401 FragmentedCoordBatch r1 = f1.get();
1402 ASSERT_EQ(r1.coords.size(), 1u);
1403 EXPECT_EQ(r1.coords[0].y, 11u);
1404 EXPECT_EQ(r1.coords[0].x, 22u);
Bits are just an array of bits.
Bundles represent a collection of channels.
std::vector< std::tuple< std::string, Direction, const Type * > > ChannelVector
virtual void connectImpl(const ConnectOptions &options)
Called by all connect methods to let backends initiate the underlying connections.
Channels are the basic communication primitives.
A concrete flat message backed by a single vector of bytes.
const std::vector< uint8_t > & getData() const
Get the data as a vector of bytes.
const uint8_t * getBytes() const
const T * as() const
Cast to a type.
size_t getSize() const
Get the size of the data in bytes.
static MessageData from(T &t)
Cast from a type to its raw bytes.
Helper base class for stateful deserializers which may emit zero, one, or many typed outputs for each...
A ChannelPort which reads data from the accelerator.
virtual void connect(ReadCallback callback, const ConnectOptions &options={})
virtual std::future< MessageData > readAsync()
Asynchronous polling read.
bool invokeCallback(std::unique_ptr< SegmentedMessageData > &msg)
Invoke the currently registered callback.
virtual void read(MessageData &outData)
Specify a buffer to read into.
Abstract multi-segment message.
virtual Segment segment(size_t idx) const =0
Get a segment by index.
virtual size_t numSegments() const =0
Number of segments in the message.
Structs are an ordered collection of fields, each with a name and a type.
Root class of the ESI type system.
Strongly typed wrapper around a raw read channel.
The "void" type is a special type which can be used to represent no type.
A ChannelPort which sends data to the accelerator.
virtual bool isConnected() const override
virtual void disconnect() override
virtual bool tryWriteImpl(const MessageData &data)=0
Implementation for tryWrite(). Subclasses must implement this.
virtual void connect(const ConnectOptions &options={}) override
Set up a connection to the accelerator.
virtual void writeImpl(const MessageData &)=0
Implementation for write(). Subclasses must implement this.
static Function * get(AppID id, BundleType *type, WriteChannelPort &arg, ReadChannelPort &result)
std::function< bool(std::unique_ptr< T > &)> TypedReadOwnedCallback
Owning callback used by typed read deserializers.
WireInfo getWireInfo(const Type *portType)
A contiguous, non-owning view of bytes within a SegmentedMessageData.
Compute the wire byte count for a port type.