CIRCT 23.0.0git
Loading...
Searching...
No Matches
test_esi.py
Go to the documentation of this file.
1from __future__ import annotations
2
3import os
4from pathlib import Path
5import sys
6import time
7from typing import Optional
8
9import esiaccel as esi
10from esiaccel.accelerator import AcceleratorConnection
11from esiaccel.cosim.pytest import cosim_test
12from esiaccel.types import MMIORegion
13
# Directory named "hw" that sits next to this file's parent directory. The
# cosim tests below load "esi_test.py" from it.
HW_DIR = Path(__file__).resolve().parent.parent / "hw"
15
16
def run(conn: AcceleratorConnection, platform: str = "cosim") -> None:
    """Exercise the ESI test accelerator reachable through ``conn``.

    The checks run in this order: raw MMIO read, ESI/manifest version,
    cycle counter and core clock frequency, MMIO region enumeration,
    ``mmio_client`` and ``mmio_rw_client`` ports, the ``loopback`` add
    channel, the ``callback`` port, the ``const_producer`` port, and the
    ``struct_to_window`` / ``struct_from_window`` round trips.

    Args:
        conn: Open connection to the accelerator under test.
        platform: Backend name. When it is ``"cosim"`` the core clock
            frequency must be exactly 20 MHz; for any other value the
            frequency only has to be positive when it is reported at all.

    Raises:
        AssertionError: If any check fails, if no MMIO service is present,
            or if the callback is not invoked before the wait deadline.
    """
    mmio = conn.get_service_mmio()
    data = mmio.read(8)
    assert data == 0x207D98E5E5100E51

    assert conn.sysinfo().esi_version() == 0
    manifest = conn.manifest()
    assert manifest.api_version == 0
    print(manifest.type_table)

    # Test the cycle count and clock frequency APIs
    sysinfo = conn.sysinfo()
    cycle_count = sysinfo.cycle_count()
    if cycle_count is not None:
        print(f"Cycle count: {cycle_count}")
        assert cycle_count > 0, (
            f"Cycle count should be positive, got {cycle_count}")

        # Test that cycle count is monotonically increasing
        time.sleep(0.01)  # Small delay to let simulation advance
        cycle_count2 = sysinfo.cycle_count()
        print(f"Cycle count after delay: {cycle_count2}")
        assert cycle_count2 > cycle_count, (
            "Cycle count should be monotonically increasing: "
            f"{cycle_count2} <= {cycle_count}")

        # Test again to ensure consistency
        time.sleep(0.01)
        cycle_count3 = sysinfo.cycle_count()
        print(f"Cycle count after second delay: {cycle_count3}")
        assert cycle_count3 > cycle_count2, (
            "Cycle count should be monotonically increasing: "
            f"{cycle_count3} <= {cycle_count2}")
    else:
        print("Cycle count: not available")

    clock_freq = sysinfo.core_clock_frequency()
    print(f"Clock frequency: {clock_freq} Hz")
    if platform == "cosim":
        assert clock_freq == 20_000_000, (
            "Expected clock frequency 20_000_000 Hz for cosim, "
            f"got {clock_freq}")
    elif clock_freq is not None:
        print(f"Core clock frequency: {clock_freq} Hz")
        assert clock_freq > 0, (
            f"Clock frequency should be positive, got {clock_freq}")
    else:
        print("Core clock frequency: not available")

    accel = conn.build_accelerator()

    # Pick the first MMIO service. Fail with a clear message instead of an
    # unbound-variable error when the accelerator exposes none.
    mmio_svc: Optional[esi.accelerator.MMIO] = None
    for svc in accel.services:
        if isinstance(svc, esi.accelerator.MMIO):
            mmio_svc = svc
            break
    assert mmio_svc is not None, "No MMIO service found in accelerator"

    for region_id, region in mmio_svc.regions.items():
        print(f"Region {region_id}: {region.base} - "
              f"{region.base + region.size}")

    assert len(mmio_svc.regions) == 5

    ##########################################################################
    # MMIOClient tests
    ##########################################################################

    def read_offset(mmio_x: MMIORegion, offset: int, add_amt: int) -> None:
        """Check that reading ``offset`` yields ``add_amt + offset``."""
        value = mmio_x.read(offset)
        assert value == add_amt + offset, (
            f"read_offset({offset}, {add_amt}) -> {value}")
        print(f"PASS: read_offset({offset}, {add_amt}) -> {value}")

    # Each client is expected to add its own AppID index to the offset.
    for client_idx in (9, 4, 14):
        client = accel.ports[esi.AppID("mmio_client", client_idx)]
        read_offset(client, 0, client_idx)
        read_offset(client, 13, client_idx)

    ##########################################################################
    # MMIOReadWriteClient tests
    ##########################################################################

    mmio_rw = accel.ports[esi.AppID("mmio_rw_client")]

    def read_offset_check(i: int, add_amt: int) -> None:
        """Check that reading offset ``i`` yields ``i + add_amt``."""
        value = mmio_rw.read(i)
        assert value == i + add_amt, f": read_offset_check({i}): {value}"
        print(f"PASS: read_offset_check({i}): {value}")

    # Write the addend at offset 8, then read it back through other offsets.
    add_amt = 137
    mmio_rw.write(8, add_amt)
    for offset in (0, 12, 0x140):
        read_offset_check(offset, add_amt)

    ##########################################################################
    # Manifest tests
    ##########################################################################

    loopback = accel.children[esi.AppID("loopback")]
    recv = loopback.ports[esi.AppID("add")].read_port("result")
    recv.connect()

    send = loopback.ports[esi.AppID("add")].write_port("arg")
    send.connect()

    loopback_info = next(
        (info for info in manifest.module_infos
         if info.name == "LoopbackInOutAdd"),
        None,
    )
    assert loopback_info is not None
    # Read the constant from the module info that was actually matched, not
    # from a loop variable left over after the search.
    add_amt = loopback_info.constants["add_amt"].value

    ##########################################################################
    # Callback tests
    ##########################################################################

    callback = accel.children[esi.AppID("callback")]
    cb_port = callback.ports[esi.AppID("cb")]
    cb_mmio = callback.ports[esi.AppID("cmd")]

    recv_data: Optional[int] = None

    def my_callback(data: int) -> int:
        nonlocal recv_data
        recv_data = data
        print(f"Callback received data: {data}")
        return data + 7

    cb_port.connect(my_callback)
    cb_mmio.write(0x10, 5)
    # Poll for the callback, but give up after a generous deadline so a
    # callback that never fires fails the test instead of hanging it.
    callback_deadline = time.monotonic() + 60
    while recv_data is None and time.monotonic() < callback_deadline:
        time.sleep(0.25)
    assert recv_data is not None, "Timed out waiting for callback"
    assert recv_data == 5

    ##########################################################################
    # Loopback add tests (addend comes from the manifest constant above)
    ##########################################################################

    data = 10234
    # Blocking write interface
    send.write(data)
    resp = recv.read()

    print(f"data: {data}")
    print(f"resp: {resp}")
    assert resp == data + add_amt

    # Non-blocking write interface
    data = 10235

    # Timeout of 5 seconds, measured on the monotonic clock so wall-clock
    # adjustments cannot shorten or extend it.
    nb_deadline = time.monotonic() + 5
    write_succeeded = False
    while time.monotonic() < nb_deadline:
        write_succeeded = send.try_write(data)
        if write_succeeded:
            break

    assert write_succeeded, "Non-blocking write failed"
    resp = recv.read()
    print(f"data: {data}")
    print(f"resp: {resp}")
    assert resp == data + add_amt

    print("PASS")

    ##########################################################################
    # Const producer tests
    ##########################################################################

    producer_bundle = accel.ports[esi.AppID("const_producer")]
    producer = producer_bundle.read_port("data")
    producer.connect()
    data = producer.read()
    producer.disconnect()
    print(f"data: {data}")
    assert data == 42

    ##########################################################################
    # Handshake JoinAddFunc tests
    ##########################################################################

    # Disabled test since the DC dialect flow is broken. Leaving the code here
    # in case someone fixes it.

    # a = d.ports[esi.AppID("join_a")].write_port("data")
    # a.connect()
    # b = d.ports[esi.AppID("join_b")].write_port("data")
    # b.connect()
    # x = d.ports[esi.AppID("join_x")].read_port("data")
    # x.connect()

    # a.write(15)
    # b.write(24)
    # xdata = x.read()
    # print(f"join: {xdata}")
    # assert xdata == 15 + 24

    def check_struct_fields(received, expected) -> None:
        """Assert that fields a-d of ``received`` equal those of ``expected``."""
        for field in ("a", "b", "c", "d"):
            assert received[field] == expected[field], (
                f"Field '{field}' mismatch: "
                f"{received[field]} != {expected[field]}")

    ##########################################################################
    # StructToWindowFunc tests
    ##########################################################################

    print("Testing StructToWindowFunc...")
    struct_to_window_bundle = accel.ports[esi.AppID("struct_to_window")]

    # Get the write port for sending the complete struct
    struct_send = struct_to_window_bundle.write_port("arg")
    struct_send.connect()

    # Get the read port for receiving the windowed result
    window_recv = struct_to_window_bundle.read_port("result")
    window_recv.connect()

    # Create test data - a struct with four 32-bit fields (as bytearrays,
    # little-endian)
    test_struct = {
        "a": bytearray([0x11, 0x11, 0x11, 0x11]),
        "b": bytearray([0x22, 0x22, 0x22, 0x22]),
        "c": bytearray([0x33, 0x33, 0x33, 0x33]),
        "d": bytearray([0x44, 0x44, 0x44, 0x44]),
    }

    # Send the complete struct
    struct_send.write(test_struct)

    # The windowed result should arrive as two frames
    # Frame 1 contains fields a and b
    # Frame 2 contains fields c and d
    # After translation, we should get back the complete struct
    result = window_recv.read()

    print(f"Sent struct: {test_struct}")
    print(f"Received result: {result}")

    # Verify the result matches the input
    check_struct_fields(result, test_struct)

    print("PASS: StructToWindowFunc test passed")

    # Test with different values
    test_struct2 = {
        "a": bytearray([0xEF, 0xBE, 0xAD, 0xDE]),  # 0xDEADBEEF little-endian
        "b": bytearray([0xBE, 0xBA, 0xFE, 0xCA]),  # 0xCAFEBABE little-endian
        "c": bytearray([0x78, 0x56, 0x34, 0x12]),  # 0x12345678 little-endian
        "d": bytearray([0x21, 0x43, 0x65, 0x87]),  # 0x87654321 little-endian
    }
    struct_send.write(test_struct2)
    result2 = window_recv.read()

    print(f"Sent struct: {test_struct2}")
    print(f"Received result: {result2}")

    check_struct_fields(result2, test_struct2)

    print("PASS: StructToWindowFunc test 2 passed")

    struct_send.disconnect()
    window_recv.disconnect()

    ##########################################################################
    # WindowToStructFunc tests
    ##########################################################################

    print("Testing WindowToStructFunc...")
    window_to_struct_bundle = accel.ports[esi.AppID("struct_from_window")]

    # Get the write port for sending the windowed struct (two frames)
    window_send = window_to_struct_bundle.write_port("arg")
    window_send.connect()

    # Get the read port for receiving the complete struct
    struct_recv = window_to_struct_bundle.read_port("result")
    struct_recv.connect()

    # Create test data - a struct with four 32-bit fields (as bytearrays,
    # little-endian). We'll send this as a windowed struct and expect to get
    # it back as a complete struct.
    test_window_struct = {
        "a": bytearray([0xAA, 0xAA, 0xAA, 0xAA]),
        "b": bytearray([0xBB, 0xBB, 0xBB, 0xBB]),
        "c": bytearray([0xCC, 0xCC, 0xCC, 0xCC]),
        "d": bytearray([0xDD, 0xDD, 0xDD, 0xDD]),
    }

    # Send the windowed struct (the runtime will split it into two frames)
    window_send.write(test_window_struct)

    # Read the complete struct result
    result = struct_recv.read()

    print(f"Sent windowed struct: {test_window_struct}")
    print(f"Received complete struct: {result}")

    # Verify the result matches the input
    check_struct_fields(result, test_window_struct)

    print("PASS: WindowToStructFunc test passed")

    # Test with different values
    test_window_struct2 = {
        "a": bytearray([0x01, 0x02, 0x03, 0x04]),
        "b": bytearray([0x05, 0x06, 0x07, 0x08]),
        "c": bytearray([0x09, 0x0A, 0x0B, 0x0C]),
        "d": bytearray([0x0D, 0x0E, 0x0F, 0x10]),
    }
    window_send.write(test_window_struct2)
    result2 = struct_recv.read()

    print(f"Sent windowed struct: {test_window_struct2}")
    print(f"Received complete struct: {result2}")

    check_struct_fields(result2, test_window_struct2)

    print("PASS: WindowToStructFunc test 2 passed")

    window_send.disconnect()
    struct_recv.disconnect()
372
373
@cosim_test(HW_DIR / "esi_test.py")
def test_cosim_esi(conn: AcceleratorConnection) -> None:
    """Run the full `run` check sequence over the supplied cosim connection.

    Args:
        conn: Accelerator connection passed in by the ``cosim_test``
            decorator for the design at ``HW_DIR / "esi_test.py"``.
    """
    run(conn)
377
378
@cosim_test(HW_DIR / "esi_test.py")
def test_cosim_esi_manifest_mmio(host: str, port: int) -> None:
    """Run the `run` check sequence with ``ESI_COSIM_MANIFEST_MMIO=1`` set.

    The variable is set before connecting and restored afterwards so it does
    not leak into other tests that run in the same process.
    NOTE(review): presumably the variable makes the cosim backend fetch the
    manifest over MMIO -- confirm against the runtime.

    Args:
        host: Hostname of the running cosimulation server.
        port: Port of the running cosimulation server.
    """
    env_key = "ESI_COSIM_MANIFEST_MMIO"
    previous = os.environ.get(env_key)
    os.environ[env_key] = "1"
    try:
        conn = esi.connect("cosim", f"{host}:{port}")
        run(conn)
    finally:
        # Put the environment back exactly as it was found.
        if previous is None:
            os.environ.pop(env_key, None)
        else:
            os.environ[env_key] = previous
384
385
if __name__ == "__main__":
    # Standalone entry point: test_esi.py <platform> <connection_string>
    if len(sys.argv) < 3:
        # Exit with a usage message instead of an IndexError traceback.
        sys.exit(f"usage: {sys.argv[0]} <platform> <connection_string>")
    platform = sys.argv[1]
    conn_str = sys.argv[2]
    conn = esi.Context(esi.LogLevel.Debug).connect(platform, conn_str)
    run(conn, platform)
static void print(TypedAttr val, llvm::raw_ostream &os)
static mlir::Operation * resolve(Context &context, mlir::SymbolRefAttr sym)
AcceleratorConnections, Accelerators, and Manifests must all share a context.
Definition Context.h:34
None test_cosim_esi(AcceleratorConnection conn)
Definition test_esi.py:375
None run(AcceleratorConnection conn, str platform="cosim")
Definition test_esi.py:17
None test_cosim_esi_manifest_mmio(str host, int port)
Definition test_esi.py:380