29 user_module: Type[Module],
30 dma_engine_pair: Optional[Tuple[Callable, Callable]] =
None,
32 """Wrap and return a cosimulation 'board support package' containing
35 class ESI_Cosim_UserTopWrapper(Module):
36 """Wrap the user module along with 'standard' service generators so that
37 those generators can issue their own service requests to be picked up by the
38 actual top-level catch-all cosim service generator."""
# NOTE(review): this listing omits some of the original lines (the embedded
# line numbers skip), so the comments below describe only what is visible.
# Input port typed as the MMIO read/write type.
43 mmio = Input(esi.MMIO.read_write.type)
# Host-memory module parameterized with the same width (HostMemWidth) for
# both the read and the write side. Its `read` and `write` attributes are
# reused as this wrapper's `hostmem_read` / `hostmem_write` declarations.
48 ChannelHostMemModule = ChannelHostMem(read_width=HostMemWidth,
49 write_width=HostMemWidth)
51 hostmem_read = ChannelHostMemModule.read
52 hostmem_write = ChannelHostMemModule.write
# Single-bit output; driven further down from `mmio.reset_request`.
59 reset_request = Output(Bits(1))
# Instantiate the wrapped user module on this wrapper's clock and reset.
63 user_module(clk=ports.clk, rst=ports.rst)
# Telemetry-over-MMIO service call; its remaining arguments are not visible
# in this excerpt.
64 esi.TelemetryMMIO(esi.Telemetry,
# Only when a DMA engine pair was supplied: build a ChannelEngineService from
# its two elements (element 0 first, element 1 second).
69 if dma_engine_pair
is not None:
70 ChannelEngineService(dma_engine_pair[0], dma_engine_pair[1])(
# Forward the reset request to the wrapper's output port.
# NOTE(review): `mmio` here is presumably a local instance created on an
# omitted line rather than the class-level port declaration -- confirm.
81 ports.reset_request = mmio.reset_request
# Instantiate the host-memory module declared above and expose its read and
# write sides on the wrapper's ports.
86 hostmem = ESI_Cosim_UserTopWrapper.ChannelHostMemModule(
91 ports.hostmem_read = hostmem.read
92 ports.hostmem_write = hostmem.write
94 class ESI_Cosim_Top(Module):
# NOTE(review): this listing omits some of the original lines (the embedded
# line numbers skip), so the comments below describe only what is visible.
# Mark the current System's platform as "cosim".
100 System.current().platform =
"cosim"
# Request the function service registered under the AppID
# "__cosim_mmio_read_write", typed as the MMIO read/write type.
102 mmio_read_write = esi.FuncService.get(
103 esi.AppID(
"__cosim_mmio_read_write"), esi.MMIO.read_write.type)
# Wires that are assigned later from the reset controller's outputs.
109 design_reset = Wire(Bits(1))
110 reset_pending = Wire(Bits(1))
# Reset seen by the design: the external reset OR the controller-generated one.
# NOTE(review): the consumer of `combined_rst` is not visible in this excerpt.
111 combined_rst = ports.rst | design_reset
# --- MMIO command gate ---
# Unpack the host-side bundle: `mmio_resp` is supplied as its 'data' channel
# and the 'cmd' channel is taken out of it.
123 mmio_resp = Wire(Channel(esi.MMIODataType))
124 host_cmd = mmio_read_write.unpack(data=mmio_resp)[
'cmd']
# Split the command channel into payload/valid, with `gate_ready` as the
# ready signal returned to the host.
125 gate_ready = Wire(Bits(1))
126 cmd_payload, cmd_valid = host_cmd.unwrap(gate_ready)
# A command is forwarded as valid only while no reset is pending.
127 fwd_valid = (cmd_valid & ~reset_pending).as_bits()
128 gated_cmd, fwd_ready = host_cmd.type.wrap(cmd_payload, fwd_valid)
# Ready is withheld from the host while a reset is pending, so commands are
# stalled (not consumed) during that time.
129 gate_ready.assign((fwd_ready & ~reset_pending).as_bits())
# Re-pack the gated command into an MMIO bundle and route the resulting
# 'data' channel back into `mmio_resp`.
130 gated_mmio, gated_froms = esi.MMIO.read_write.type.pack(cmd=gated_cmd)
131 mmio_resp.assign(gated_froms[
'data'])
# Instantiate the user-top wrapper; the remaining arguments are not visible
# in this excerpt.
133 wrapper = ESI_Cosim_UserTopWrapper(clk=ports.clk,
# Reset controller parameterized by ResetCycles. It is reset by the external
# `ports.rst` (not `combined_rst`) and triggered by the wrapper's
# `reset_request`; its outputs drive the two wires declared above.
136 reset_controller = DesignResetController(ResetCycles)(
137 clk=ports.clk, rst=ports.rst, reset_request=wrapper.reset_request)
138 design_reset.assign(reset_controller.design_reset)
139 reset_pending.assign(reset_controller.reset_pending)
# Helper that filters a host-to-design channel. While `reset_pending` is high
# the forwarded valid is forced low and the source is told "ready", so
# incoming messages are accepted and discarded instead of being delivered.
# NOTE(review): the helper's return statement is not visible in this excerpt;
# presumably it returns `gated` -- confirm.
149 def drop_while_resetting(chan: ChannelSignal) -> ChannelSignal:
150 src_ready = Wire(Bits(1))
151 payload, valid = chan.unwrap(src_ready)
152 fwd_valid = (valid & ~reset_pending).as_bits()
153 gated, dn_ready = chan.type.wrap(payload, fwd_valid)
154 src_ready.assign((dn_ready | reset_pending).as_bits())
# --- Host memory read path ---
# Response channel coming from the host; the visible part of its type has a
# "data" field that is HostMemWidth bits wide.
157 resp_channel = esi.ChannelService.from_host(
161 (
"data", Bits(ESI_Cosim_UserTopWrapper.HostMemWidth)),
# Feed the filtered responses into the wrapper's read bundle and send the
# resulting 'req' channel to the host under "__cosim_hostmem_read_req".
163 req = wrapper.hostmem_read.unpack(
164 resp=drop_while_resetting(resp_channel))[
'req']
165 esi.ChannelService.to_host(
esi.AppID(
"__cosim_hostmem_read_req"), req)
# --- Host memory write path ---
# `ack_wire` (a channel of UInt(8)) is supplied as the write bundle's
# 'ackTag'; the bundle's 'req' channel is taken out.
167 ack_wire = Wire(Channel(UInt(8)))
168 write_req = wrapper.hostmem_write.unpack(ackTag=ack_wire)[
'req']
# Call the host service "__cosim_hostmem_write" (remaining arguments are not
# visible here); the returned ack tags are filtered before driving `ack_wire`.
169 ack_tag = esi.CallService.call(
esi.AppID(
"__cosim_hostmem_write"),
171 ack_wire.assign(drop_while_resetting(ack_tag))
# Raw ESI service-instance op with appID "cosim" and impl_type "cosim",
# taking the top-level clock and reset values as inputs.
173 cosim_svc = raw_esi.ServiceInstanceOp(
175 appID=AppID(
"cosim")._appid,
177 impl_type=ir.StringAttr.get(
"cosim"),
178 inputs=[ports.clk.value, ports.rst.value],
# If the System declares a core frequency, attach it to the op as an unsigned
# 64-bit integer attribute named "esi.core_clock_frequency_hz".
180 core_freq = System.current().core_freq
181 if core_freq
is not None:
182 cosim_svc.operation.attributes[
183 "esi.core_clock_frequency_hz"] = ir.IntegerAttr.get(
184 ir.IntegerType.get_unsigned(64), core_freq)