17def run(conn: AcceleratorConnection, platform: str =
"cosim") ->
None:
18 mmio = conn.get_service_mmio()
20 assert data == 0x207D98E5E5100E51
22 assert conn.sysinfo().esi_version() == 0
24 assert m.api_version == 0
28 sysinfo = conn.sysinfo()
29 cycle_count = sysinfo.cycle_count()
30 if cycle_count
is not None:
31 print(f
"Cycle count: {cycle_count}")
32 assert cycle_count > 0, f
"Cycle count should be positive, got {cycle_count}"
36 cycle_count2 = sysinfo.cycle_count()
37 print(f
"Cycle count after delay: {cycle_count2}")
38 assert cycle_count2 > cycle_count, \
39 f
"Cycle count should be monotonically increasing: {cycle_count2} <= {cycle_count}"
43 cycle_count3 = sysinfo.cycle_count()
44 print(f
"Cycle count after second delay: {cycle_count3}")
45 assert cycle_count3 > cycle_count2, \
46 f
"Cycle count should be monotonically increasing: {cycle_count3} <= {cycle_count2}"
48 print(
"Cycle count: not available")
50 clock_freq = sysinfo.core_clock_frequency()
51 print(f
"Clock frequency: {clock_freq} Hz")
52 if platform ==
"cosim":
53 assert clock_freq == 20_000_000, \
54 f
"Expected clock frequency 20_000_000 Hz for cosim, got {clock_freq}"
56 if clock_freq
is not None:
57 print(f
"Core clock frequency: {clock_freq} Hz")
58 assert clock_freq > 0, \
59 f
"Clock frequency should be positive, got {clock_freq}"
61 print(
"Core clock frequency: not available")
63 d = conn.build_accelerator()
65 mmio_svc: esi.accelerator.MMIO
66 for svc
in d.services:
67 if isinstance(svc, esi.accelerator.MMIO):
71 for id, region
in mmio_svc.regions.items():
72 print(f
"Region {id}: {region.base} - {region.base + region.size}")
74 assert len(mmio_svc.regions) == 5
80 def read_offset(mmio_x: MMIORegion, offset: int, add_amt: int):
81 data = mmio_x.read(offset)
82 if data == add_amt + offset:
83 print(f
"PASS: read_offset({offset}, {add_amt}) -> {data}")
85 assert False, f
"read_offset({offset}, {add_amt}) -> {data}"
87 mmio9 = d.ports[
esi.AppID(
"mmio_client", 9)]
88 read_offset(mmio9, 0, 9)
89 read_offset(mmio9, 13, 9)
91 mmio4 = d.ports[
esi.AppID(
"mmio_client", 4)]
92 read_offset(mmio4, 0, 4)
93 read_offset(mmio4, 13, 4)
95 mmio14 = d.ports[
esi.AppID(
"mmio_client", 14)]
96 read_offset(mmio14, 0, 14)
97 read_offset(mmio14, 13, 14)
103 mmio_rw = d.ports[
esi.AppID(
"mmio_rw_client")]
105 def read_offset_check(i: int, add_amt: int):
108 print(f
"PASS: read_offset_check({i}): {d}")
110 assert False, f
": read_offset_check({i}): {d}"
113 mmio_rw.write(8, add_amt)
114 read_offset_check(0, add_amt)
115 read_offset_check(12, add_amt)
116 read_offset_check(0x140, add_amt)
122 loopback = d.children[
esi.AppID(
"loopback")]
123 recv = loopback.ports[
esi.AppID(
"add")].read_port(
"result")
126 send = loopback.ports[
esi.AppID(
"add")].write_port(
"arg")
130 for mod_info
in m.module_infos:
131 if mod_info.name ==
"LoopbackInOutAdd":
132 loopback_info = mod_info
134 assert loopback_info
is not None
135 add_amt = mod_info.constants[
"add_amt"].value
141 callback = d.children[
esi.AppID(
"callback")]
142 cb_port = callback.ports[
esi.AppID(
"cb")]
143 cb_mmio = callback.ports[
esi.AppID(
"cmd")]
145 recv_data: Optional[int] =
None
147 def my_callback(data: int) -> int:
150 print(f
"Callback received data: {data}")
153 cb_port.connect(my_callback)
154 cb_mmio.write(0x10, 5)
155 while recv_data
is None:
157 assert recv_data == 5
168 print(f
"data: {data}")
169 print(f
"resp: {resp}")
170 assert resp == data + add_amt
174 nb_wr_start = time.time()
177 nb_timeout = nb_wr_start + 5
178 write_succeeded =
False
179 while time.time() < nb_timeout:
180 write_succeeded = send.try_write(data)
184 assert write_succeeded,
"Non-blocking write failed"
186 print(f
"data: {data}")
187 print(f
"resp: {resp}")
188 assert resp == data + add_amt
196 producer_bundle = d.ports[
esi.AppID(
"const_producer")]
197 producer = producer_bundle.read_port(
"data")
199 data = producer.read()
200 producer.disconnect()
201 print(f
"data: {data}")
228 print(
"Testing StructToWindowFunc...")
229 struct_to_window_bundle = d.ports[
esi.AppID(
"struct_to_window")]
232 struct_send = struct_to_window_bundle.write_port(
"arg")
233 struct_send.connect()
236 window_recv = struct_to_window_bundle.read_port(
"result")
237 window_recv.connect()
242 "a": bytearray([0x11, 0x11, 0x11, 0x11]),
243 "b": bytearray([0x22, 0x22, 0x22, 0x22]),
244 "c": bytearray([0x33, 0x33, 0x33, 0x33]),
245 "d": bytearray([0x44, 0x44, 0x44, 0x44])
249 struct_send.write(test_struct)
255 result = window_recv.read()
257 print(f
"Sent struct: {test_struct}")
258 print(f
"Received result: {result}")
261 assert result[
"a"] == test_struct[
262 "a"], f
"Field 'a' mismatch: {result['a']} != {test_struct['a']}"
263 assert result[
"b"] == test_struct[
264 "b"], f
"Field 'b' mismatch: {result['b']} != {test_struct['b']}"
265 assert result[
"c"] == test_struct[
266 "c"], f
"Field 'c' mismatch: {result['c']} != {test_struct['c']}"
267 assert result[
"d"] == test_struct[
268 "d"], f
"Field 'd' mismatch: {result['d']} != {test_struct['d']}"
270 print(
"PASS: StructToWindowFunc test passed")
274 "a": bytearray([0xEF, 0xBE, 0xAD, 0xDE]),
275 "b": bytearray([0xBE, 0xBA, 0xFE, 0xCA]),
276 "c": bytearray([0x78, 0x56, 0x34, 0x12]),
278 bytearray([0x21, 0x43, 0x65, 0x87])
280 struct_send.write(test_struct2)
281 result2 = window_recv.read()
283 print(f
"Sent struct: {test_struct2}")
284 print(f
"Received result: {result2}")
286 assert result2[
"a"] == test_struct2[
287 "a"], f
"Field 'a' mismatch: {result2['a']} != {test_struct2['a']}"
288 assert result2[
"b"] == test_struct2[
289 "b"], f
"Field 'b' mismatch: {result2['b']} != {test_struct2['b']}"
290 assert result2[
"c"] == test_struct2[
291 "c"], f
"Field 'c' mismatch: {result2['c']} != {test_struct2['c']}"
292 assert result2[
"d"] == test_struct2[
293 "d"], f
"Field 'd' mismatch: {result2['d']} != {test_struct2['d']}"
295 print(
"PASS: StructToWindowFunc test 2 passed")
297 struct_send.disconnect()
298 window_recv.disconnect()
304 print(
"Testing WindowToStructFunc...")
305 window_to_struct_bundle = d.ports[
esi.AppID(
"struct_from_window")]
308 window_send = window_to_struct_bundle.write_port(
"arg")
309 window_send.connect()
312 struct_recv = window_to_struct_bundle.read_port(
"result")
313 struct_recv.connect()
318 test_window_struct = {
319 "a": bytearray([0xAA, 0xAA, 0xAA, 0xAA]),
320 "b": bytearray([0xBB, 0xBB, 0xBB, 0xBB]),
321 "c": bytearray([0xCC, 0xCC, 0xCC, 0xCC]),
322 "d": bytearray([0xDD, 0xDD, 0xDD, 0xDD])
326 window_send.write(test_window_struct)
329 result = struct_recv.read()
331 print(f
"Sent windowed struct: {test_window_struct}")
332 print(f
"Received complete struct: {result}")
335 assert result[
"a"] == test_window_struct[
336 "a"], f
"Field 'a' mismatch: {result['a']} != {test_window_struct['a']}"
337 assert result[
"b"] == test_window_struct[
338 "b"], f
"Field 'b' mismatch: {result['b']} != {test_window_struct['b']}"
339 assert result[
"c"] == test_window_struct[
340 "c"], f
"Field 'c' mismatch: {result['c']} != {test_window_struct['c']}"
341 assert result[
"d"] == test_window_struct[
342 "d"], f
"Field 'd' mismatch: {result['d']} != {test_window_struct['d']}"
344 print(
"PASS: WindowToStructFunc test passed")
347 test_window_struct2 = {
348 "a": bytearray([0x01, 0x02, 0x03, 0x04]),
349 "b": bytearray([0x05, 0x06, 0x07, 0x08]),
350 "c": bytearray([0x09, 0x0A, 0x0B, 0x0C]),
351 "d": bytearray([0x0D, 0x0E, 0x0F, 0x10])
353 window_send.write(test_window_struct2)
354 result2 = struct_recv.read()
356 print(f
"Sent windowed struct: {test_window_struct2}")
357 print(f
"Received complete struct: {result2}")
359 assert result2[
"a"] == test_window_struct2[
360 "a"], f
"Field 'a' mismatch: {result2['a']} != {test_window_struct2['a']}"
361 assert result2[
"b"] == test_window_struct2[
362 "b"], f
"Field 'b' mismatch: {result2['b']} != {test_window_struct2['b']}"
363 assert result2[
"c"] == test_window_struct2[
364 "c"], f
"Field 'c' mismatch: {result2['c']} != {test_window_struct2['c']}"
365 assert result2[
"d"] == test_window_struct2[
366 "d"], f
"Field 'd' mismatch: {result2['d']} != {test_window_struct2['d']}"
368 print(
"PASS: WindowToStructFunc test 2 passed")
370 window_send.disconnect()
371 struct_recv.disconnect()
374@cosim_test(HW_DIR / "esi_test.py")
AcceleratorConnections, Accelerators, and Manifests must all share a context.