CIRCT 23.0.0git
Loading...
Searching...
No Matches
types.py
Go to the documentation of this file.
1# ===-----------------------------------------------------------------------===#
2# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
3# See https://llvm.org/LICENSE.txt for license information.
4# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
5# ===-----------------------------------------------------------------------===#
6#
7# The structure of the Python classes and hierarchy roughly mirrors the C++
8# side, but wraps the C++ objects. The wrapper classes sometimes add convenience
9# functionality and serve to return wrapped versions of the returned objects.
10#
11# ===-----------------------------------------------------------------------===#
12
13from __future__ import annotations
14
15from . import esiCppAccel as cpp
16
17from typing import TYPE_CHECKING
18
19if TYPE_CHECKING:
20 from .accelerator import HWModule
21
22from concurrent.futures import Future
23from typing import Any, Callable, Dict, List, NamedTuple, Optional, Tuple, Type, Union
24import sys
25import traceback
26
27
def _get_esi_type(cpp_type: cpp.Type):
    """Return the Python wrapper object for a C++ ESI type.

    The registered (C++ class, wrapper class) pairs are scanned in
    registration order and the first `isinstance` match wins. A type with no
    registered wrapper is wrapped in the generic `ESIType`.
    """
    wrapper_cls = next(
        (wrapper for cpp_cls, wrapper in __esi_mapping.items()
         if isinstance(cpp_type, cpp_cls)),
        ESIType,
    )
    return wrapper_cls.wrap_cpp(cpp_type)
34
35
36# Mapping from C++ types to wrapper classes
37__esi_mapping: Dict[Type, Type] = {}
38
39
class ESIType:
    """Base Python wrapper around a C++ ESI type object.

    The base versions of `bit_width`, `is_valid`, `serialize` and
    `deserialize` are placeholders which fail with an "unimplemented"
    assertion; concrete type wrappers override them.
    """

    def __init__(self, id: str):
        """Create a new C++ type with the given stable id and wrap it."""
        self._init_from_cpp(cpp.Type(id))

    @classmethod
    def wrap_cpp(cls, cpp_type: cpp.Type):
        """Wrap a C++ ESI type with its corresponding Python ESI Type.

        `__init__` is bypassed (it would construct a new C++ object); the
        instance is initialized from the existing C++ object instead.
        """
        instance = cls.__new__(cls)
        instance._init_from_cpp(cpp_type)
        return instance

    def _init_from_cpp(self, cpp_type: cpp.Type):
        """Initialize instance attributes from a C++ type object."""
        self.cpp_type = cpp_type

    @property
    def id(self) -> str:
        """Get the stable id of this type."""
        return self.cpp_type.id

    @property
    def supports_host(self) -> Tuple[bool, Optional[str]]:
        """Does this type support host communication via Python? Returns either
        '(True, None)' if it is, or '(False, reason)' if it is not."""
        width = self.bit_width
        # Only bounded types whose width is not a whole number of bytes are
        # rejected here.
        if width >= 0 and width % 8 != 0:
            return (False, "runtime only supports types with multiple of 8 bits")
        return (True, None)

    def is_valid(self, obj) -> Tuple[bool, Optional[str]]:
        """Is a Python object compatible with HW type? Returns either '(True,
        None)' if it is, or '(False, reason)' if it is not."""
        assert False, "unimplemented"

    @property
    def bit_width(self) -> int:
        """Size of this type, in bits. Negative for unbounded types."""
        assert False, "unimplemented"

    @property
    def max_size(self) -> int:
        """Maximum size of a value of this type, in bytes.

        Negative (-1) when `bit_width` is negative, i.e. the type is
        unbounded.
        """
        width = self.bit_width
        if width < 0:
            # The previous float arithmetic rounded small negative widths to 0,
            # hiding the "unbounded" marker from callers.
            return -1
        # Round up to whole bytes using exact integer arithmetic.
        return (width + 7) // 8

    def serialize(self, obj) -> bytearray:
        """Convert a Python object to a bytearray."""
        assert False, "unimplemented"

    def deserialize(self, data: bytearray) -> Tuple[object, bytearray]:
        """Convert a bytearray to a Python object. Return the object and the
        leftover bytes."""
        assert False, "unimplemented"

    def __hash__(self) -> int:
        # Hash on the stable id so it agrees with __eq__.
        return hash(self.id)

    def __eq__(self, other) -> bool:
        # Two wrappers are equal when they wrap types with the same stable id.
        return isinstance(other, ESIType) and self.id == other.id

    def __str__(self) -> str:
        return str(self.cpp_type)
105
106
108
109 def __init__(self, id: str, inner: "ESIType"):
110 self._init_from_cpp_init_from_cpp(cpp.ChannelType(id, inner.cpp_type))
111
112 def _init_from_cpp(self, cpp_type: cpp.ChannelType):
113 super()._init_from_cpp(cpp_type)
114 self.inner_type = _get_esi_type(cpp_type.inner)
115
116 @property
117 def bit_width(self) -> int:
118 return self.inner_type.bit_width
119
120 @property
121 def inner(self) -> "ESIType":
122 return self.inner_type
123
124 @property
125 def supports_host(self) -> Tuple[bool, Optional[str]]:
126 return self.inner_type.supports_host
127
128 def is_valid(self, obj) -> Tuple[bool, Optional[str]]:
129 return self.inner_type.is_valid(obj)
130
131 def serialize(self, obj) -> bytearray:
132 return self.inner_type.serialize(obj)
133
134 def deserialize(self, data: bytearray) -> Tuple[object, bytearray]:
135 return self.inner_type.deserialize(data)
136
137
138__esi_mapping[cpp.ChannelType] = ChannelType
139
140
142
    class Channel(NamedTuple):
        """One channel of a bundle: its name, direction and type."""
        # Name of the channel.
        name: str
        # Direction of the channel, a C++ `BundleType.Direction` value.
        direction: cpp.BundleType.Direction
        # Wrapped (Python) type of the channel.
        type: "ESIType"
147
148 def __init__(self, id: str, channels: List[Channel]):
149 cpp_channels = [(name, direction, channel_type.cpp_type)
150 for name, direction, channel_type in channels]
151 self._init_from_cpp_init_from_cpp(cpp.BundleType(id, cpp_channels))
152
153 def _init_from_cpp(self, cpp_type: cpp.BundleType):
154 super()._init_from_cpp(cpp_type)
155 self._channels = [
156 BundleType.Channel(name, direction, _get_esi_type(channel_type))
157 for name, direction, channel_type in cpp_type.channels
158 ]
159
160 @property
161 def channels(self) -> List["BundleType.Channel"]:
162 return self._channels
163
164
165__esi_mapping[cpp.BundleType] = BundleType
166
167
169
170 def __init__(self, id: str):
171 self._init_from_cpp(cpp.VoidType(id))
172
173 def is_valid(self, obj) -> Tuple[bool, Optional[str]]:
174 if obj is not None:
175 return (False, f"void type cannot must represented by None, not {obj}")
176 return (True, None)
177
178 @property
179 def bit_width(self) -> int:
180 return 8
181
182 def serialize(self, obj) -> bytearray:
183 # By convention, void is represented by a single byte of value 0.
184 return bytearray([0])
185
186 def deserialize(self, data: bytearray) -> Tuple[object, bytearray]:
187 if len(data) == 0:
188 raise ValueError(f"void type cannot be represented by {data}")
189 return (None, data[1:])
190
191
192__esi_mapping[cpp.VoidType] = VoidType
193
194
196
197 def __init__(self, id: str):
198 self._init_from_cpp(cpp.AnyType(id))
199
200 def is_valid(self, obj) -> Tuple[bool, Optional[str]]:
201 return (False, "any type is not supported for host communication")
202
203 @property
204 def bit_width(self) -> int:
205 return -1
206
207 def serialize(self, obj) -> bytearray:
208 raise ValueError("any type cannot be serialized")
209
210 def deserialize(self, data: bytearray) -> Tuple[object, bytearray]:
211 raise ValueError("any type cannot be deserialized")
212
213
214__esi_mapping[cpp.AnyType] = AnyType
215
216
218
219 def __init__(self, id: str, width: int):
220 self._init_from_cpp(cpp.BitsType(id, width))
221
222 def is_valid(self, obj) -> Tuple[bool, Optional[str]]:
223 if not isinstance(obj, (bytearray, bytes, list)):
224 return (False, f"invalid type: {type(obj)}")
225 if isinstance(obj, list) and not all(
226 [isinstance(b, int) and b.bit_length() <= 8 for b in obj]):
227 return (False, f"list item too large: {obj}")
228 if len(obj) != self.max_size:
229 return (False, f"wrong size: {len(obj)}")
230 return (True, None)
231
232 @property
233 def bit_width(self) -> int:
234 return self.cpp_type.width
235
236 def serialize(self, obj: Union[bytearray, bytes, List[int]]) -> bytearray:
237 if isinstance(obj, bytearray):
238 return obj
239 if isinstance(obj, bytes) or isinstance(obj, list):
240 return bytearray(obj)
241 raise ValueError(f"cannot convert {obj} to bytearray")
242
243 def deserialize(self, data: bytearray) -> Tuple[bytearray, bytearray]:
244 return (data[0:self.max_size], data[self.max_size:])
245
246
247__esi_mapping[cpp.BitsType] = BitsType
248
249
251
252 def __init__(self, id: str, width: int):
253 self._init_from_cpp(cpp.IntegerType(id, width))
254
255 @property
256 def bit_width(self) -> int:
257 return self.cpp_type.width
258
259
261
262 def __init__(self, id: str, width: int):
263 self._init_from_cpp(cpp.UIntType(id, width))
264
265 def is_valid(self, obj) -> Tuple[bool, Optional[str]]:
266 if not isinstance(obj, int):
267 return (False, f"must be an int, not {type(obj)}")
268 if obj < 0 or obj.bit_length() > self.bit_widthbit_width:
269 return (False, f"out of range: {obj}")
270 return (True, None)
271
272 def __str__(self) -> str:
273 return f"uint{self.bit_width}"
274
275 def serialize(self, obj: int) -> bytearray:
276 return bytearray(int.to_bytes(obj, self.max_sizemax_size, "little"))
277
278 def deserialize(self, data: bytearray) -> Tuple[int, bytearray]:
279 return (int.from_bytes(data[0:self.max_sizemax_size],
280 "little"), data[self.max_sizemax_size:])
281
282
283__esi_mapping[cpp.UIntType] = UIntType
284
285
287
288 def __init__(self, id: str, width: int):
289 self._init_from_cpp(cpp.SIntType(id, width))
290
291 def is_valid(self, obj) -> Tuple[bool, Optional[str]]:
292 if not isinstance(obj, int):
293 return (False, f"must be an int, not {type(obj)}")
294 if obj < 0:
295 if (-1 * obj) > 2**(self.bit_widthbit_width - 1):
296 return (False, f"out of range: {obj}")
297 elif obj < 0:
298 if obj >= 2**(self.bit_widthbit_width - 1) - 1:
299 return (False, f"out of range: {obj}")
300 return (True, None)
301
302 def __str__(self) -> str:
303 return f"sint{self.bit_width}"
304
305 def serialize(self, obj: int) -> bytearray:
306 return bytearray(int.to_bytes(obj, self.max_sizemax_size, "little", signed=True))
307
308 def deserialize(self, data: bytearray) -> Tuple[int, bytearray]:
309 return (int.from_bytes(data[0:self.max_sizemax_size], "little",
310 signed=True), data[self.max_sizemax_size:])
311
312
313__esi_mapping[cpp.SIntType] = SIntType
314
315
317
318 def __init__(self, id: str, fields: List[Tuple[str, "ESIType"]]):
319 # Convert Python ESIType fields to cpp Type fields
320 cpp_fields = [(name, field_type.cpp_type) for name, field_type in fields]
321 self._init_from_cpp_init_from_cpp(cpp.StructType(id, cpp_fields))
322
323 def _init_from_cpp(self, cpp_type: cpp.StructType):
324 """Initialize instance attributes from a C++ type object."""
325 super()._init_from_cpp(cpp_type)
326 # For wrap_cpp path, we need to convert C++ fields back to Python
327 self.fields = [(name, _get_esi_type(ty)) for (name, ty) in cpp_type.fields]
328
329 @property
330 def bit_width(self) -> int:
331 widths = [ty.bit_width for (_, ty) in self.fields]
332 if any([w < 0 for w in widths]):
333 return -1
334 return sum(widths)
335
336 def is_valid(self, obj) -> Tuple[bool, Optional[str]]:
337 fields_count = 0
338 if not isinstance(obj, dict):
339 if not hasattr(obj, "__dict__"):
340 return (False, "must be a dict or have __dict__ attribute")
341 obj = obj.__dict__
342
343 for (fname, ftype) in self.fields:
344 if fname not in obj:
345 return (False, f"missing field '{fname}'")
346 fvalid, reason = ftype.is_valid(obj[fname])
347 if not fvalid:
348 return (False, f"invalid field '{fname}': {reason}")
349 fields_count += 1
350 if fields_count != len(obj):
351 return (False, "missing fields")
352 return (True, None)
353
354 def serialize(self, obj) -> bytearray:
355 ret = bytearray()
356 if not isinstance(obj, dict):
357 obj = obj.__dict__
358 ordered_fields = reversed(
359 self.fields) if self.cpp_type.reverse else self.fields
360 for (fname, ftype) in ordered_fields:
361 fval = obj[fname]
362 ret.extend(ftype.serialize(fval))
363 return ret
364
365 def deserialize(self, data: bytearray) -> Tuple[Dict[str, Any], bytearray]:
366 ret = {}
367 ordered_fields = reversed(
368 self.fields) if self.cpp_type.reverse else self.fields
369 for (fname, ftype) in ordered_fields:
370 (fval, data) = ftype.deserialize(data)
371 ret[fname] = fval
372 return (ret, data)
373
374
375__esi_mapping[cpp.StructType] = StructType
376
377
379
380 def __init__(self, id: str, element_type: "ESIType", size: int):
381 self._init_from_cpp_init_from_cpp(cpp.ArrayType(id, element_type.cpp_type, size))
382
383 def _init_from_cpp(self, cpp_type: cpp.ArrayType):
384 """Initialize instance attributes from a C++ type object."""
385 super()._init_from_cpp(cpp_type)
386 self.element_type = _get_esi_type(cpp_type.element)
387 self.size = cpp_type.size
388
389 @property
390 def bit_width(self) -> int:
391 return self.element_type.bit_width * self.size
392
393 def is_valid(self, obj) -> Tuple[bool, Optional[str]]:
394 if not isinstance(obj, list):
395 return (False, f"must be a list, not {type(obj)}")
396 if len(obj) != self.size:
397 return (False, f"wrong size: expected {self.size} not {len(obj)}")
398 for (idx, e) in enumerate(obj):
399 evalid, reason = self.element_type.is_valid(e)
400 if not evalid:
401 return (False, f"invalid element {idx}: {reason}")
402 return (True, None)
403
404 def serialize(self, lst: list) -> bytearray:
405 ret = bytearray()
406 for e in reversed(lst):
407 ret.extend(self.element_type.serialize(e))
408 return ret
409
410 def deserialize(self, data: bytearray) -> Tuple[List[Any], bytearray]:
411 ret = []
412 for _ in range(self.size):
413 (obj, data) = self.element_type.deserialize(data)
414 ret.append(obj)
415 ret.reverse()
416 return (ret, data)
417
418
419__esi_mapping[cpp.ArrayType] = ArrayType
420
421
423
424 def __init__(self, id: str, name: str, inner_type: "ESIType"):
425 self._init_from_cpp_init_from_cpp(cpp.TypeAliasType(id, name, inner_type.cpp_type))
426
427 def _init_from_cpp(self, cpp_type: cpp.TypeAliasType):
428 super()._init_from_cpp(cpp_type)
429 self.name = cpp_type.name
430 self.inner_type = _get_esi_type(cpp_type.inner)
431
432 @property
433 def bit_width(self) -> int:
434 return self.inner_type.bit_width
435
436 def is_valid(self, obj) -> Tuple[bool, Optional[str]]:
437 return self.inner_type.is_valid(obj)
438
439 def serialize(self, obj) -> bytearray:
440 return self.inner_type.serialize(obj)
441
442 def deserialize(self, data: bytearray) -> Tuple[object, bytearray]:
443 return self.inner_type.deserialize(data)
444
445 def __str__(self) -> str:
446 return self.name
447
448
449__esi_mapping[cpp.TypeAliasType] = TypeAlias
450
451
class Port:
    """A unidirectional communication channel. This is the basic communication
    method with an accelerator."""

    def __init__(self, owner: BundlePort, cpp_port: cpp.ChannelPort):
        """Wrap `cpp_port`, remembering its owning bundle and wrapped type."""
        self.owner = owner
        self.cpp_port = cpp_port
        self.type = _get_esi_type(cpp_port.type)

    def connect(self, buffer_size: Optional[int] = None):
        """Connect the underlying C++ port and return `self`.

        Raises TypeError when the port's type does not support host
        communication. `buffer_size` is passed through via the connect
        options.
        """
        host_ok, why_not = self.type.supports_host
        if not host_ok:
            raise TypeError(f"unsupported type: {why_not}")

        options = cpp.ConnectOptions()
        options.buffer_size = buffer_size
        self.cpp_port.connect(options)
        return self

    def disconnect(self):
        """Disconnect the underlying C++ port."""
        self.cpp_port.disconnect()
473
474
476 """A unidirectional communication channel from the host to the accelerator."""
477
478 def __init__(self, owner: BundlePort, cpp_port: cpp.WriteChannelPort):
479 super().__init__(owner, cpp_port)
480 self.cpp_port: cpp.WriteChannelPort = cpp_port
481
482 def __serialize_msg(self, msg=None) -> bytearray:
483 valid, reason = self.type.is_valid(msg)
484 if not valid:
485 raise ValueError(
486 f"'{msg}' cannot be converted to '{self.type}': {reason}")
487 msg_bytes: bytearray = self.type.serialize(msg)
488 return msg_bytes
489
490 def write(self, msg=None) -> bool:
491 """Write a typed message to the channel. Attempts to serialize 'msg' to what
492 the accelerator expects, but will fail if the object is not convertible to
493 the port type."""
494 self.cpp_port.write(self.__serialize_msg(msg))
495 return True
496
497 def try_write(self, msg=None) -> bool:
498 """Like 'write', but uses the non-blocking tryWrite method of the underlying
499 port. Returns True if the write was successful, False otherwise."""
500 return self.cpp_port.tryWrite(self.__serialize_msg(msg))
501
502
504 """A unidirectional communication channel from the accelerator to the host."""
505
506 def __init__(self, owner: BundlePort, cpp_port: cpp.ReadChannelPort):
507 super().__init__(owner, cpp_port)
508 self.cpp_port: cpp.ReadChannelPort = cpp_port
509
510 def read(self) -> object:
511 """Read a typed message from the channel. Returns a deserialized object of a
512 type defined by the port type."""
513
514 buffer = self.cpp_port.read()
515 (msg, leftover) = self.type.deserialize(buffer)
516 if len(leftover) != 0:
517 raise ValueError(f"leftover bytes: {leftover}")
518 return msg
519
520
522 """A collections of named, unidirectional communication channels."""
523
    # When creating a new port, we need to determine if it is a service port and
    # instantiate it correctly.
    def __new__(cls, owner: HWModule, cpp_port: cpp.BundlePort):
        """Allocate the port wrapper class that matches the C++ port's class.

        The checks run in the order written and the first `isinstance` match
        decides the class; `cls` itself is used when none match. Only
        allocation happens here; `owner` is not used.
        """
        # TODO: add a proper registration mechanism for service ports.
        if isinstance(cpp_port, cpp.Function):
            return super().__new__(FunctionPort)
        if isinstance(cpp_port, cpp.Callback):
            return super().__new__(CallbackPort)
        if isinstance(cpp_port, cpp.MMIORegion):
            return super().__new__(MMIORegion)
        if isinstance(cpp_port, cpp.Metric):
            return super().__new__(MetricPort)
        if isinstance(cpp_port, cpp.ToHostChannel):
            return super().__new__(ToHostPort)
        if isinstance(cpp_port, cpp.FromHostChannel):
            return super().__new__(FromHostPort)
        # Not a recognized service port: allocate the requested class.
        return super().__new__(cls)
541
542 def __init__(self, owner: HWModule, cpp_port: cpp.BundlePort):
543 self.owner = owner
544 self.cpp_port = cpp_port
545
546 def write_port(self, channel_name: str) -> WritePort:
547 return WritePort(self, self.cpp_port.getWrite(channel_name))
548
549 def read_port(self, channel_name: str) -> ReadPort:
550 return ReadPort(self, self.cpp_port.getRead(channel_name))
551
552
class MessageFuture(Future):
    """A specialization of `Future` for ESI messages. Wraps the cpp object and
    deserializes the result. Hopefully overrides all the methods necessary for
    proper operation, which is assumed to be not all of them."""

    def __init__(self, result_type: Type, cpp_future: cpp.MessageDataFuture):
        """Store the type used to decode the result and the C++ future."""
        # NOTE(review): Future.__init__ is not called here, so inherited
        # Future methods which are not overridden below may not be usable.
        self.result_type, self.cpp_future = result_type, cpp_future

    def running(self) -> bool:
        """Always reports True."""
        return True

    def done(self) -> bool:
        """Report the C++ future's `valid()` state."""
        return self.cpp_future.valid()

    def result(self, timeout: Optional[Union[int, float]] = None) -> Any:
        """Wait for the C++ future and return its deserialized message.

        Raises ValueError if bytes remain after deserialization. `timeout` is
        currently ignored.
        """
        # TODO: respect timeout
        pending = self.cpp_future
        pending.wait()
        raw = pending.get()
        message, remainder = self.result_type.deserialize(raw)
        if len(remainder) == 0:
            return message
        raise ValueError(f"leftover bytes: {remainder}")

    def add_done_callback(self, fn: Callable[[Future], object]) -> None:
        """Not supported; always raises NotImplementedError."""
        raise NotImplementedError("add_done_callback is not implemented")
579
580
582 """A region of memory-mapped I/O space. This is a collection of named
583 channels, which are either read or read-write. The channels are accessed
584 by name, and can be connected to the host."""
585
586 def __init__(self, owner: HWModule, cpp_port: cpp.MMIORegion):
587 super().__init__(owner, cpp_port)
588 self.region = cpp_port
589
590 @property
591 def descriptor(self) -> cpp.MMIORegionDesc:
592 return self.region.descriptor
593
594 def read(self, offset: int) -> bytearray:
595 """Read a value from the MMIO region at the given offset."""
596 return self.region.read(offset)
597
598 def write(self, offset: int, data: bytearray) -> None:
599 """Write a value to the MMIO region at the given offset."""
600 self.region.write(offset, data)
601
602
604 """A pair of channels which carry the input and output of a function."""
605
606 def __init__(self, owner: HWModule, cpp_port: cpp.BundlePort):
607 super().__init__(owner, cpp_port)
608 self.arg_type = self.write_port("arg").type
609 self.result_type = self.read_port("result").type
610 self.connected = False
611
612 def connect(self):
613 self.cpp_port.connect()
614 self.connected = True
615
616 def call(self, *args: Any, **kwargs: Any) -> Future:
617 """Call the function with the given argument and returns a future of the
618 result."""
619
620 # Accept either positional or keyword arguments, but not both.
621 if len(args) > 0 and len(kwargs) > 0:
622 raise ValueError("cannot use both positional and keyword arguments")
623
624 # Handle arguments: for single positional arg, unwrap it from tuple
625 if len(args) == 1:
626 selected = args[0]
627 elif len(args) > 1:
628 selected = args
629 else:
630 selected = kwargs
631
632 valid, reason = self.arg_type.is_valid(selected)
633 if not valid:
634 raise ValueError(
635 f"'{selected}' cannot be converted to '{self.arg_type}': {reason}")
636 arg_bytes: bytearray = self.arg_type.serialize(selected)
637 cpp_future = self.cpp_port.call(arg_bytes)
638 return MessageFuture(self.result_type, cpp_future)
639
640 def __call__(self, *args: Any, **kwds: Any) -> Future:
641 return self.call(*args, **kwds)
642
643
645 """Callback ports are the inverse of function ports -- instead of calls to the
646 accelerator, they get called from the accelerator. Specify the function which
647 you'd like the accelerator to call when you call `connect`."""
648
649 def __init__(self, owner: HWModule, cpp_port: cpp.BundlePort):
650 super().__init__(owner, cpp_port)
651 self.arg_type = self.read_port("arg").type
652 self.result_type = self.write_port("result").type
653 self.connected = False
654
655 def connect(self, cb: Callable[[Any], Any]):
656
657 def type_convert_wrapper(cb: Callable[[Any], Any],
658 msg: bytearray) -> Optional[bytearray]:
659 try:
660 (obj, leftover) = self.arg_type.deserialize(msg)
661 if len(leftover) != 0:
662 raise ValueError(f"leftover bytes: {leftover}")
663 result = cb(obj)
664 if result is None:
665 return None
666 return self.result_type.serialize(result)
667 except Exception as e:
668 traceback.print_exception(e)
669 return None
670
671 self.cpp_port.connect(lambda x: type_convert_wrapper(cb=cb, msg=x))
672 self.connected = True
673
674
676 """Telemetry ports report an individual piece of information from the
677 acceelerator. The method of accessing telemetry will likely change in the
678 future."""
679
680 def __init__(self, owner: HWModule, cpp_port: cpp.BundlePort):
681 super().__init__(owner, cpp_port)
682 self.connected = False
683
684 def connect(self):
685 self.cpp_port.connect()
686 self.connected = True
687
688 def read(self) -> Future:
689 cpp_future = self.cpp_port.read()
690 return MessageFuture(self.cpp_port.type, cpp_future)
691
692
694 """A channel which reads data from the accelerator (to_host)."""
695
696 def __init__(self, owner: HWModule, cpp_port: cpp.BundlePort):
697 super().__init__(owner, cpp_port)
698 self.data_type = self.read_port("data").type
699 self.connected = False
700
701 def connect(self):
702 self.cpp_port.connect()
703 self.connected = True
704
705 def read(self) -> Future:
706 """Read a value from the channel. Returns a future."""
707 cpp_future = self.cpp_port.read()
708 return MessageFuture(self.data_type, cpp_future)
709
710
712 """A channel which writes data to the accelerator (from_host)."""
713
714 def __init__(self, owner: HWModule, cpp_port: cpp.BundlePort):
715 super().__init__(owner, cpp_port)
716 self.data_type = self.write_port("data").type
717 self.connected = False
718
719 def connect(self):
720 self.cpp_port.connect()
721 self.connected = True
722
723 def write(self, data: Any) -> None:
724 """Write a value to the channel."""
725 valid, reason = self.data_type.is_valid(data)
726 if not valid:
727 raise ValueError(
728 f"'{data}' cannot be converted to '{self.data_type}': {reason}")
729 self.cpp_port.write(self.data_type.serialize(data))
__init__(self, str id)
Definition types.py:197
int bit_width(self)
Definition types.py:204
Tuple[object, bytearray] deserialize(self, bytearray data)
Definition types.py:210
bytearray serialize(self, obj)
Definition types.py:207
Tuple[bool, Optional[str]] is_valid(self, obj)
Definition types.py:200
Tuple[bool, Optional[str]] is_valid(self, obj)
Definition types.py:393
_init_from_cpp(self, cpp.ArrayType cpp_type)
Definition types.py:383
Tuple[List[Any], bytearray] deserialize(self, bytearray data)
Definition types.py:410
__init__(self, str id, "ESIType" element_type, int size)
Definition types.py:380
bytearray serialize(self, list lst)
Definition types.py:404
bytearray serialize(self, Union[bytearray, bytes, List[int]] obj)
Definition types.py:236
int bit_width(self)
Definition types.py:233
Tuple[bytearray, bytearray] deserialize(self, bytearray data)
Definition types.py:243
Tuple[bool, Optional[str]] is_valid(self, obj)
Definition types.py:222
__init__(self, str id, int width)
Definition types.py:219
__new__(cls, HWModule owner, cpp.BundlePort cpp_port)
Definition types.py:526
WritePort write_port(self, str channel_name)
Definition types.py:546
ReadPort read_port(self, str channel_name)
Definition types.py:549
__init__(self, HWModule owner, cpp.BundlePort cpp_port)
Definition types.py:542
List["BundleType.Channel"] channels(self)
Definition types.py:161
_init_from_cpp(self, cpp.BundleType cpp_type)
Definition types.py:153
__init__(self, str id, List[Channel] channels)
Definition types.py:148
__init__(self, HWModule owner, cpp.BundlePort cpp_port)
Definition types.py:649
connect(self, Callable[[Any], Any] cb)
Definition types.py:655
bytearray serialize(self, obj)
Definition types.py:131
_init_from_cpp(self, cpp.ChannelType cpp_type)
Definition types.py:112
Tuple[bool, Optional[str]] is_valid(self, obj)
Definition types.py:128
Tuple[object, bytearray] deserialize(self, bytearray data)
Definition types.py:134
__init__(self, str id, "ESIType" inner)
Definition types.py:109
Tuple[bool, Optional[str]] supports_host(self)
Definition types.py:125
"ESIType" inner(self)
Definition types.py:121
int bit_width(self)
Definition types.py:76
Tuple[bool, Optional[str]] supports_host(self)
Definition types.py:62
int __hash__(self)
Definition types.py:97
Tuple[object, bytearray] deserialize(self, bytearray data)
Definition types.py:92
_init_from_cpp(self, cpp.Type cpp_type)
Definition types.py:52
__init__(self, str id)
Definition types.py:42
Tuple[bool, Optional[str]] is_valid(self, obj)
Definition types.py:70
bool __eq__(self, other)
Definition types.py:100
int max_size(self)
Definition types.py:81
wrap_cpp(cls, cpp.Type cpp_type)
Definition types.py:46
str __str__(self)
Definition types.py:103
bytearray serialize(self, obj)
Definition types.py:88
__init__(self, HWModule owner, cpp.BundlePort cpp_port)
Definition types.py:714
None write(self, Any data)
Definition types.py:723
Future call(self, *Any args, **Any kwargs)
Definition types.py:616
__init__(self, HWModule owner, cpp.BundlePort cpp_port)
Definition types.py:606
Future __call__(self, *Any args, **Any kwds)
Definition types.py:640
int bit_width(self)
Definition types.py:256
__init__(self, str id, int width)
Definition types.py:252
None write(self, int offset, bytearray data)
Definition types.py:598
bytearray read(self, int offset)
Definition types.py:594
__init__(self, HWModule owner, cpp.MMIORegion cpp_port)
Definition types.py:586
cpp.MMIORegionDesc descriptor(self)
Definition types.py:591
Any result(self, Optional[Union[int, float]] timeout=None)
Definition types.py:568
__init__(self, Type result_type, cpp.MessageDataFuture cpp_future)
Definition types.py:558
None add_done_callback(self, Callable[[Future], object] fn)
Definition types.py:577
Future read(self)
Definition types.py:688
__init__(self, HWModule owner, cpp.BundlePort cpp_port)
Definition types.py:680
__init__(self, BundlePort owner, cpp.ChannelPort cpp_port)
Definition types.py:456
connect(self, Optional[int] buffer_size=None)
Definition types.py:461
__init__(self, BundlePort owner, cpp.ReadChannelPort cpp_port)
Definition types.py:506
object read(self)
Definition types.py:510
Tuple[int, bytearray] deserialize(self, bytearray data)
Definition types.py:308
Tuple[bool, Optional[str]] is_valid(self, obj)
Definition types.py:291
bytearray serialize(self, int obj)
Definition types.py:305
__init__(self, str id, int width)
Definition types.py:288
__init__(self, str id, List[Tuple[str, "ESIType"]] fields)
Definition types.py:318
bytearray serialize(self, obj)
Definition types.py:354
Tuple[Dict[str, Any], bytearray] deserialize(self, bytearray data)
Definition types.py:365
Tuple[bool, Optional[str]] is_valid(self, obj)
Definition types.py:336
_init_from_cpp(self, cpp.StructType cpp_type)
Definition types.py:323
Future read(self)
Definition types.py:705
__init__(self, HWModule owner, cpp.BundlePort cpp_port)
Definition types.py:696
Tuple[object, bytearray] deserialize(self, bytearray data)
Definition types.py:442
_init_from_cpp(self, cpp.TypeAliasType cpp_type)
Definition types.py:427
Tuple[bool, Optional[str]] is_valid(self, obj)
Definition types.py:436
bytearray serialize(self, obj)
Definition types.py:439
__init__(self, str id, str name, "ESIType" inner_type)
Definition types.py:424
__init__(self, str id, int width)
Definition types.py:262
Tuple[int, bytearray] deserialize(self, bytearray data)
Definition types.py:278
Tuple[bool, Optional[str]] is_valid(self, obj)
Definition types.py:265
bytearray serialize(self, int obj)
Definition types.py:275
int bit_width(self)
Definition types.py:179
Tuple[object, bytearray] deserialize(self, bytearray data)
Definition types.py:186
Tuple[bool, Optional[str]] is_valid(self, obj)
Definition types.py:173
__init__(self, str id)
Definition types.py:170
bytearray serialize(self, obj)
Definition types.py:182
bool try_write(self, msg=None)
Definition types.py:497
bytearray __serialize_msg(self, msg=None)
Definition types.py:482
bool write(self, msg=None)
Definition types.py:490
__init__(self, BundlePort owner, cpp.WriteChannelPort cpp_port)
Definition types.py:478
_get_esi_type(cpp.Type cpp_type)
Definition types.py:28