1"""Hardware design for the codegen + port-kind coverage integration test.
3Where ``serialization_probes.py`` exercises wire-format invariants, this design
4exercises the *port-kind* surface area of the ESI runtime + facade codegen.
5Each probe module here is named for the codegen / runtime path it exercises
6so a regression in any single path lights up exactly one driver assertion.
11import pycde.esi
as esi
12from pycde
import AppID, Clock, Module, Reset, System, generator
14from pycde.common
import Constant
15from pycde.constructs
import ControlReg, Counter, Reg, Wire
16from pycde.esi
import ListWindowToParallel, ListWindowToSerial
17from pycde.signals
import Struct
18from pycde.types
import (Array, Bits, Bundle, BundledChannel, Channel,
19 ChannelDirection, ChannelSignaling, List, SInt,
20 StructType, TypeAlias, UInt, Window)
# Single-channel bundle types. Each bundle carries exactly one channel:
# ``send`` travels in the FROM direction, ``recv`` in the TO direction.
# The I8 variants carry ``Bits(8)``; the I0 variants carry zero-width
# ``Bits(0)``.
SendI8 = Bundle([
    BundledChannel("send", ChannelDirection.FROM, Bits(8)),
])
RecvI8 = Bundle([
    BundledChannel("recv", ChannelDirection.TO, Bits(8)),
])
SendI0 = Bundle([
    BundledChannel("send", ChannelDirection.FROM, Bits(0)),
])
RecvI0 = Bundle([
    BundledChannel("recv", ChannelDirection.TO, Bits(0)),
])
45 """Typed function with a multi-field argument struct.
47 Exercises ``TypedFunction``'s emplace-style ``call(...)`` overload, which
48 forwards its arguments into the generated arg struct's constructor so the
49 C++ driver can call ``connected->call(a, b)`` instead of building the
50 struct itself. The body computes ``a * b`` and returns it as ``ui32``.
58 result_wire = Wire(Channel(UInt(32)))
64 args = esi.FuncService.get_call_chans(AppID(
"call"),
68 arg, valid = args.unwrap(ready)
69 product = (arg[
"a"] * arg[
"b"]).as_uint(32)
70 out_chan, out_ready = Channel(UInt(32)).
wrap(product, valid)
71 ready.assign(out_ready)
72 result_wire.assign(out_chan)
76 """Typed function with a void argument (typed-result specialization).
78 The C++ driver invokes ``connected->call().get()`` and asserts the constant
79 token comes back. Hardware sends ``0xCAFEF00D`` on every call so a wrong-
80 byte-order bug in the result path fails distinguishably.
88 result_wire = Wire(Channel(UInt(32)))
89 args = esi.FuncService.get_call_chans(AppID(
"call"),
93 _, valid = args.unwrap(ready)
94 token = UInt(32)(0xCAFEF00D)
95 out_chan, out_ready = Channel(UInt(32)).
wrap(token, valid)
96 ready.assign(out_ready)
97 result_wire.assign(out_chan)
106 """Typed function with a void result (typed-arg specialization).
108 Hardware accepts the request and returns a one-byte zero (the void-result
109 wire encoding). No state is observable other than that the call completes;
110 the test asserts the future resolves without throwing.
118 result_wire = Wire(Channel(Bits(0)))
119 args = esi.FuncService.get_call_chans(AppID(
"call"),
122 ready = Wire(Bits(1))
123 _, valid = args.unwrap(ready)
124 out_chan, out_ready = Channel(Bits(0)).
wrap(Bits(0)(0), valid)
125 ready.assign(out_ready)
126 result_wire.assign(out_chan)
135 """Hardware-initiated call into the host via `CallService`.
137 The trigger is an MMIO write at offset ``0x10`` (whose write data forms
138 the ``payload``) so the driver can deterministically time when the
139 callback fires. The callback returns no payload (``Bits(0)`` is the void
140 encoding); the host-side handler increments a counter so the driver can
141 assert it actually ran.
152 mmio_bundle = esi.MMIO.read_write(appid=AppID(
"trigger"))
153 data_resp_chan = Wire(Channel(Bits(64)))
154 cmd_chan = mmio_bundle.unpack(data=data_resp_chan)[
"cmd"]
160 xact, cmd = cmd_chan.snoop_xact()
163 data_resp_chan.assign(cmd_chan.transform(
lambda c: Bits(64)(c.data)))
168 is_trigger = (cmd.offset == UInt(32)(0x10))
169 trigger_xact = xact & is_trigger
173 data_reg = cmd.data.as_uint(32).reg(clk, rst, ce=trigger_xact)
174 notify_args =
NotifyArgs(tag=UInt(8)(0xA5), payload=data_reg)
180 cb_consumed = Wire(Bits(1))
181 cb_valid = ControlReg(clk,
183 asserts=[trigger_xact],
184 resets=[cb_consumed])
185 cb_chan, cb_ready = Channel(NotifyArgs).
wrap(notify_args, cb_valid)
186 cb_consumed.assign(cb_valid & cb_ready)
187 esi.CallService.call(AppID(
"callback"), cb_chan, Bits(0))
196 """To-host channel of `EventStruct`: TypedReadPort polling of a struct.
198 Hardware pushes a small bounded sequence of distinct events on reset
199 release so the driver can read N items and check exact values. Each event
200 has ``ts = i+1`` and ``val = -(i+1)`` so an off-by-one or sign bug shows
207 num_events = Constant(UInt(8), 4)
217 ready_wire = Wire(Bits(1))
218 increment_wire = Wire(Bits(1))
219 counter = Counter(8)(clk=clk,
222 increment=increment_wire,
223 instance_name=
"event_counter")
224 valid = counter.out < TypedReadChannelStruct.num_events.value
225 increment_wire.assign(valid & ready_wire)
227 one_based = (counter.out + UInt(8)(1)).as_uint(8)
228 ts = one_based.as_uint(64)
231 one_based_u32 = one_based.as_uint(32)
232 neg = (UInt(32)(0) - one_based_u32).as_sint(32)
234 out_chan, out_ready = Channel(EventStruct).
wrap(event, valid)
235 ready_wire.assign(out_ready)
236 esi.ChannelService.to_host(AppID(
"data"), out_chan)
240 """From-host channel of ``ui8``: TypedWritePort + MMIO accumulator readback.
242 Hardware accepts every byte and XORs each one into a running register; the
243 latest XOR-accumulator value is exposed via the ``accumulator`` MMIO read
244 region so the driver can assert what it sent actually arrived. An always-
245 ready receiver is fine here because the test sends a small known sequence.
256 chan = esi.ChannelService.from_host(AppID(
"data"), Bits(8))
257 always_ready = Bits(1)(1)
258 data, valid = chan.unwrap(always_ready)
260 acc = Reg(Bits(8), clk=clk, rst=rst, rst_value=0, ce=valid, name=
"cmds_acc")
261 acc.assign(acc ^ data)
265 mmio_bundle = esi.MMIO.read(appid=AppID(
"accumulator"))
266 resp_chan = Wire(Channel(Bits(64)))
267 addr_chan = mmio_bundle.unpack(data=resp_chan)[
"offset"]
268 resp_chan.assign(addr_chan.transform(
lambda _: acc.as_bits(64)))
272 """MMIO read/write region that loops back the most recent write.
274 The test writes a value to offset 0x10 and reads it back; whatever was
275 last written at any 8-byte-aligned offset is what the read returns. Stores all
276 writes for simplicity.
287 mmio_bundle = esi.MMIO.read_write(appid=AppID(
"region"))
288 resp_chan = Wire(Channel(Bits(64)))
289 cmd_chan = mmio_bundle.unpack(data=resp_chan)[
"cmd"]
291 cmd_ready = Wire(Bits(1))
292 cmd, cmd_valid = cmd_chan.unwrap(cmd_ready)
294 write_handshake = cmd_valid & cmd.write
295 storage = Reg(Bits(64),
301 storage.assign(cmd.data)
304 response, resp_ready = Channel(Bits(64)).
wrap(storage, cmd_valid)
305 cmd_ready.assign(resp_ready)
306 resp_chan.assign(response)
310 """Free-running ``ui64`` cycle counter exposed as a telemetry metric.
312 Hardware increments the counter every clock; the host reads it twice and
313 asserts the second read is strictly greater than the first (cycle counts
314 are monotonic between any two host-visible reads).
322 cycle_cnt = Counter(64)(clk=ports.clk,
325 increment=Bits(1)(1),
326 instance_name=
"cycleCounter")
327 esi.Telemetry.report_signal(ports.clk, ports.rst, AppID(
"cycleCount"),
332 """Module exposing an indexed array of typed-function ports.
334 Instantiates ``num_entries`` ``FuncService`` ports under the same appid name
335 ``call`` with indices 0..N-1 -- the codegen groups them into a single
336 ``IndexedPorts<TypedFunction<...>>`` member, which the C++ driver iterates
337 over (``connected->call[i]``). Each entry returns ``arg + (i+1)``, so the
338 driver can verify it talked to the right index by sending the same arg to
339 every entry and comparing replies.
342 num_entries = Constant(UInt(8), 3)
349 for i
in range(IndexedFuncGroup.num_entries.value):
351 result_wire = Wire(Channel(UInt(16)))
352 args = esi.FuncService.get_call_chans(AppID(
"call", i),
355 ready = Wire(Bits(1))
356 arg, valid = args.unwrap(ready)
357 sum_ = (arg + UInt(16)(addend)).as_uint(16)
358 out_chan, out_ready = Channel(UInt(16)).
wrap(sum_, valid)
359 ready.assign(out_ready)
360 result_wire.assign(out_chan)
364 """Custom-`@esi.ServiceDecl` raw-channel loopback.
366 Connects ``HostComms.Recv`` -> ``HostComms.Send`` for an 8-bit byte stream
367 (host writes, HW echoes back) and ``VoidComms.Recv`` -> ``VoidComms.Send``
368 for a zero-bit "tick" stream. Both pairs are exposed via custom service
369 decls rather than the standard `ChannelService`. ``Top`` instantiates two
370 copies under indexed appids so the test also covers same-name multi-instance
371 hierarchy resolution.
378 data_in = HostComms.Recv(AppID(
"byte_in")).unpack()[
"recv"]
379 HostComms.Send(AppID(
"byte_out")).unpack(send=data_in)
381 void_in = VoidComms.Recv(AppID(
"void_in")).unpack()[
"recv"]
382 VoidComms.Send(AppID(
"void_out")).unpack(send=void_in)
396 """Typed function: small struct -> small struct.
398 Returns ``{x = b+1, y = b}`` so the host can verify the arithmetic and
399 the order of struct fields end-to-end.
407 result_wire = Wire(Channel(StructResult))
408 args = esi.FuncService.get_call_chans(AppID(
"call"),
411 ready = Wire(Bits(1))
412 arg, valid = args.unwrap(ready)
414 plus_one = (b + SInt(8)(1)).as_sint(8)
416 out_chan, out_ready = Channel(StructResult).
wrap(result, valid)
417 ready.assign(out_ready)
418 result_wire.assign(out_chan)
434 """Typed function: nested odd-bit-width struct round-trip with arithmetic
435 on every field. Each field gets a distinct addend so a swap of any two
436 fields fails distinguishably."""
443 result_wire = Wire(Channel(OddStruct))
444 args = esi.FuncService.get_call_chans(AppID(
"call"),
447 ready = Wire(Bits(1))
448 arg, valid = args.unwrap(ready)
449 a = (arg[
"a"] + UInt(12)(1)).as_uint(12)
450 b = (arg[
"b"] + SInt(7)(-3)).as_sint(7)
452 p = (inner[
"p"] + UInt(8)(5)).as_uint(8)
453 q = (inner[
"q"] + SInt(8)(2)).as_sint(8)
454 r0 = (inner[
"r"][0] + UInt(8)(1)).as_uint(8)
455 r1 = (inner[
"r"][1] + UInt(8)(2)).as_uint(8)
456 new_inner =
OddInner(p=p, q=q, r=[r0, r1])
457 result =
OddStruct(a=a, b=b, inner=new_inner)
458 out_chan, out_ready = Channel(OddStruct).
wrap(result, valid)
459 ready.assign(out_ready)
460 result_wire.assign(out_chan)
464 """Typed function: ``si4 -> si4`` identity.
466 The driver tests positive, negative, and the si4 boundary values to
467 exercise sign extension at a sub-byte width through the typed facade.
475 result_wire = Wire(Channel(SInt(4)))
476 args = esi.FuncService.get_call_chans(AppID(
"call"),
479 ready = Wire(Bits(1))
480 arg, valid = args.unwrap(ready)
481 out_chan, out_ready = Channel(SInt(4)).
wrap(arg, valid)
482 ready.assign(out_ready)
483 result_wire.assign(out_chan)
# Argument type: a one-element array of ``SInt(8)``.
ArrayArg = SInt(8) * 1
# Result type: a two-element array of ``SInt(8)``, aliased under the name
# "ArrayResult".
ArrayResult = TypeAlias(
    SInt(8) * 2,
    "ArrayResult",
)
491 """Typed function with an array result.
493 Receives a one-element array and returns a two-element array containing
494 the input element and ``input + 1``. Exercises the typed facade's
495 ``std::array`` path end-to-end.
503 result_wire = Wire(Channel(ArrayResult))
504 args = esi.FuncService.get_call_chans(AppID(
"call"),
507 ready = Wire(Bits(1))
508 arg, valid = args.unwrap(ready)
510 plus_one = (elem + SInt(8)(1)).as_sint(8)
513 out_chan, out_ready = Channel(ArrayResult).
wrap(result_array, valid)
514 ready.assign(out_ready)
515 result_wire.assign(out_chan)
# Window parameters for the list-transform probe; both are passed on to
# ``Window.serial_of`` and ``ListWindowToSerial`` elsewhere in this file.
_TRANSFORM_LIST_BULK_WIDTH = 16
_TRANSFORM_LIST_ITEMS_PER_FRAME = 1
# Payload struct: a single ``data`` field holding a list of
# ``TransformListItem``.
_TRANSFORM_LIST_STRUCT = StructType([
    ("data", List(TransformListItem)),
])
# Serial window over that struct, built from the bulk-width and
# items-per-frame constants.
_transform_list_window = Window.serial_of(
    _TRANSFORM_LIST_STRUCT,
    _TRANSFORM_LIST_BULK_WIDTH,
    _TRANSFORM_LIST_ITEMS_PER_FRAME,
)
536 """Typed function: ``window<list<si32>> -> window<list<si32>>``.
538 Doubles each element of the input list and emits the result as another
539 serial-burst windowed list. Driving the burst protocol is delegated to
540 `ListWindowToParallel` / `ListWindowToSerial` so this module only has to
541 describe the per-element transform.
549 result_chan = Wire(Channel(_transform_list_window))
550 args = esi.FuncService.get_call_chans(AppID(
"call"),
551 arg_type=_transform_list_window,
554 s2p = ListWindowToParallel(_transform_list_window)(clk=ports.clk,
557 parallel_in = s2p.parallel_out
559 par_ready = Wire(Bits(1))
560 par_window, par_valid = parallel_in.unwrap(par_ready)
561 par_struct = par_window.unwrap()
563 in_item = par_struct[
"data"]
564 last_bit = par_struct[
"last"]
565 in_v = in_item[
"v"].as_uint(32)
566 doubled = (in_v + in_v).as_bits(32)
569 parallel_result_window_type = Window.default_of(_TRANSFORM_LIST_STRUCT)
570 parallel_result_struct = parallel_result_window_type.lowered_type({
574 parallel_result_window = parallel_result_window_type.wrap(
575 parallel_result_struct)
576 parallel_result_chan, par_result_ready = Channel(
577 parallel_result_window_type).
wrap(parallel_result_window, par_valid)
578 par_ready.assign(par_result_ready)
580 p2s = ListWindowToSerial(parallel_result_window_type,
581 _TRANSFORM_LIST_BULK_WIDTH,
582 _TRANSFORM_LIST_ITEMS_PER_FRAME,
586 parallel_in=parallel_result_chan)
587 result_chan.assign(p2s.serial_out)
# Fixed payload shared by the windowed-list probes in this file: a 16-bit tag
# followed by a list of 32-bit items.
_WINDOW_PROBE_TAG = 0xCAFE
_WINDOW_PROBE_ITEMS = [10, 20, 30, 40]
# Parameters passed to ``Window.serial_of`` / ``ListWindowToSerial``.
_WINDOW_PROBE_BULK_WIDTH = 16
_WINDOW_PROBE_ITEMS_PER_FRAME = 1
_window_probe_struct = StructType([
    ("tag", Bits(16)),
    ("items", List(Bits(32))),
])
_window_probe_window = Window.serial_of(
    _window_probe_struct,
    _WINDOW_PROBE_BULK_WIDTH,
    _WINDOW_PROBE_ITEMS_PER_FRAME,
)
605 """HW-initiated callback whose argument is a windowed list with header.
607 Combines the callback pattern (``CallService.call``) with the serial-burst
608 windowed list payload. An MMIO write at offset ``0x10`` arms one burst;
609 the HW then sends the same ``{tag=0xCAFE, items=[10,20,30,40]}`` pattern
610 used by the channel probes into the host callback. The host handler
611 verifies the payload and the callback returns void (``Bits(0)``).
623 trigger_bundle = esi.MMIO.read_write(appid=AppID(
"trigger"))
624 resp_chan = Wire(Channel(Bits(64)))
625 cmd_chan = trigger_bundle.unpack(data=resp_chan)[
"cmd"]
626 cmd_xact, cmd = cmd_chan.snoop_xact()
627 resp_chan.assign(cmd_chan.transform(
lambda c: Bits(64)(c.data)))
628 trigger_xact = cmd_xact & (cmd.offset == UInt(32)(0x10))
630 n_items = len(_WINDOW_PROBE_ITEMS)
631 burst_end = Wire(Bits(1))
633 armed = ControlReg(clk, rst, asserts=[trigger_xact], resets=[burst_end])
635 par_ready = Wire(Bits(1))
636 handshake = armed & par_ready
637 idx_counter = Counter(2)(clk=clk,
641 instance_name=
"cb_window_idx")
642 idx = idx_counter.out
643 last_bit = (idx == UInt(2)(n_items - 1))
644 burst_end.assign(handshake & last_bit)
647 Bits(32), len(_WINDOW_PROBE_ITEMS))(_WINDOW_PROBE_ITEMS)[idx.as_bits()]
649 parallel_window_type = Window.default_of(_window_probe_struct)
650 par_struct = parallel_window_type.lowered_type({
651 "tag": Bits(16)(_WINDOW_PROBE_TAG),
655 par_window = parallel_window_type.wrap(par_struct)
656 parallel_chan, parallel_ready = Channel(parallel_window_type).
wrap(
658 par_ready.assign(parallel_ready)
660 p2s = ListWindowToSerial(parallel_window_type, _WINDOW_PROBE_BULK_WIDTH,
661 _WINDOW_PROBE_ITEMS_PER_FRAME,
662 4)(clk=clk, rst=rst, parallel_in=parallel_chan)
664 esi.CallService.call(AppID(
"callback"), p2s.serial_out, Bits(0))
668 """To-host channel of ``window<{tag, list<i32>}>``.
670 Exercises the typed read path for windowed-list-with-header on a raw
671 channel (no `TypedFunction` orchestrator on top). The driver writes any
672 value to offset ``0x10`` of the ``trigger`` MMIO region to arm one burst;
673 the HW then emits exactly one burst (``tag = 0xCAFE`` and the four-element
674 list ``[10, 20, 30, 40]``) and goes idle. Free-running emission would
675 unboundedly fill the host runtime's polling queue, so each burst is gated
676 on an explicit trigger.
690 trigger_bundle = esi.MMIO.read_write(appid=AppID(
"trigger"))
691 resp_chan = Wire(Channel(Bits(64)))
692 cmd_chan = trigger_bundle.unpack(data=resp_chan)[
"cmd"]
693 cmd_xact, cmd = cmd_chan.snoop_xact()
694 resp_chan.assign(cmd_chan.transform(
lambda c: Bits(64)(c.data)))
695 trigger_xact = cmd_xact & (cmd.offset == UInt(32)(0x10))
697 n_items = len(_WINDOW_PROBE_ITEMS)
698 burst_end = Wire(Bits(1))
702 armed = ControlReg(clk, rst, asserts=[trigger_xact], resets=[burst_end])
706 par_ready = Wire(Bits(1))
707 handshake = armed & par_ready
708 idx_counter = Counter(2)(clk=clk,
712 instance_name=
"window_read_idx")
713 idx = idx_counter.out
714 last_bit = (idx == UInt(2)(n_items - 1))
715 burst_end.assign(handshake & last_bit)
717 Bits(32), len(_WINDOW_PROBE_ITEMS))(_WINDOW_PROBE_ITEMS)[idx.as_bits()]
721 parallel_window_type = Window.default_of(_window_probe_struct)
722 par_struct = parallel_window_type.lowered_type({
723 "tag": Bits(16)(_WINDOW_PROBE_TAG),
727 par_window = parallel_window_type.wrap(par_struct)
728 parallel_chan, parallel_ready = Channel(parallel_window_type).
wrap(
730 par_ready.assign(parallel_ready)
732 p2s = ListWindowToSerial(parallel_window_type, _WINDOW_PROBE_BULK_WIDTH,
733 _WINDOW_PROBE_ITEMS_PER_FRAME,
734 4)(clk=clk, rst=rst, parallel_in=parallel_chan)
735 esi.ChannelService.to_host(AppID(
"data"), p2s.serial_out)
739 """From-host channel of ``window<{tag, list<i32>}>``.
741 Exercises the typed write path for windowed-list-with-header on a raw
742 channel. Hardware receives one burst, converts it to parallel, and
743 AND-reduces per-beat equality against the same constant pattern as
744 `ChannelWindowedListRead`. The latched match flag is exposed via the
745 ``match`` MMIO region so the driver can verify the burst landed
757 chan = esi.ChannelService.from_host(AppID(
"data"), _window_probe_window)
758 s2p = ListWindowToParallel(_window_probe_window)(clk=clk,
761 par_ready = Wire(Bits(1))
762 par_window, par_valid = s2p.parallel_out.unwrap(par_ready)
763 par_struct = par_window.unwrap()
764 par_ready.assign(Bits(1)(1))
766 handshake = par_valid
767 last_bit = par_struct[
"last"].as_bits(1)
770 n_items = len(_WINDOW_PROBE_ITEMS)
771 idx_clr = (handshake & last_bit).as_bits(1)
772 idx_counter = Counter(2)(clk=clk,
776 instance_name=
"window_write_idx")
777 idx = idx_counter.out
779 expected_bits = Array(
780 Bits(32), len(_WINDOW_PROBE_ITEMS))(_WINDOW_PROBE_ITEMS)[idx.as_bits()]
782 tag_ok = (par_struct[
"tag"].as_bits(16) == Bits(16)(_WINDOW_PROBE_TAG))
783 item_ok = (par_struct[
"items"].as_bits(32) == expected_bits)
784 beat_ok = (tag_ok & item_ok).as_bits(1)
788 running_match = Wire(Bits(1))
789 running_match_reg = Reg(Bits(1),
794 name=
"window_match_running")
795 running_match.assign((running_match_reg & beat_ok).as_bits(1))
796 running_match_reg.assign(running_match)
798 final_match = Reg(Bits(1),
803 name=
"window_match_final")
804 final_match.assign(running_match)
807 mmio_bundle = esi.MMIO.read(appid=AppID(
"match"))
808 resp_chan = Wire(Channel(Bits(64)))
809 addr_chan = mmio_bundle.unpack(data=resp_chan)[
"offset"]
812 lambda _: final_match.as_bits(1).pad_or_truncate(64).as_bits(64)))
823 appid=AppID(
"typed_func_multi_arg_inst"))
826 appid=AppID(
"typed_func_void_arg_inst"))
829 appid=AppID(
"typed_func_void_result_inst"))
832 appid=AppID(
"call_service_callback_inst"))
835 appid=AppID(
"typed_read_channel_struct_inst"))
838 appid=AppID(
"typed_write_channel_byte_inst"))
841 appid=AppID(
"mmio_read_write_inst"))
844 appid=AppID(
"telemetry_metric_inst"))
847 appid=AppID(
"indexed_func_group_inst"))
852 appid=AppID(
"custom_service_decl_channel", 0))
854 appid=AppID(
"custom_service_decl_channel", 1))
857 appid=AppID(
"typed_func_struct_inst"))
860 appid=AppID(
"typed_func_nested_struct_inst"))
863 appid=AppID(
"typed_func_subbyte_signed_inst"))
866 appid=AppID(
"typed_func_array_result_inst"))
869 appid=AppID(
"typed_func_windowed_list_inst"))
872 appid=AppID(
"channel_windowed_list_read_inst"))
875 appid=AppID(
"channel_windowed_list_write_inst"))
878 appid=AppID(
"callback_windowed_list_inst"))
881if __name__ ==
"__main__":
882 bsp = get_bsp(sys.argv[2]
if len(sys.argv) > 2
else None)
883 s = System(
bsp(Top), name=
"TestCodegen", output_directory=sys.argv[1])
return wrap(CMemoryType::get(unwrap(ctx), baseType, numElements))