13from __future__
import annotations
15from .
import esiCppAccel
as cpp
17from typing
import TYPE_CHECKING
20 from .accelerator
import HWModule
22from concurrent.futures
import Future
23from typing
import Any, Callable, Dict, List, Optional, Tuple, Type, Union
29 """Get the wrapper class for a C++ type."""
30 if isinstance(cpp_type, cpp.ChannelType):
33 for cpp_type_cls, wrapper_cls
in __esi_mapping.items():
34 if isinstance(cpp_type, cpp_type_cls):
35 return wrapper_cls.wrap_cpp(cpp_type)
36 return ESIType.wrap_cpp(cpp_type)
40__esi_mapping: Dict[Type, Type] = {}
50 """Wrap a C++ ESI type with its corresponding Python ESI Type."""
51 instance = cls.__new__(cls)
52 instance._init_from_cpp(cpp_type)
56 """Initialize instance attributes from a C++ type object."""
61 """Does this type support host communication via Python? Returns either
62 '(True, None)' if it does, or '(False, reason)' if it does not."""
65 return (
False,
"runtime only supports types with multiple of 8 bits")
def is_valid(self, obj) -> Tuple[bool, Optional[str]]:
    """Is a Python object compatible with HW type? Returns either '(True,
    None)' if it is, or '(False, reason)' if it is not.

    This implementation is a placeholder: it never returns and always fails.

    Raises:
        AssertionError: always, with the message "unimplemented".
    """
    # Raise explicitly instead of `assert False, ...`: assert statements are
    # removed when Python runs with -O, which would make this method silently
    # return None instead of failing. Exception type and message are kept.
    raise AssertionError("unimplemented")
75 """Size of this type, in bits. Negative for unbounded types."""
76 assert False,
"unimplemented"
80 """Maximum size of a value of this type, in bytes."""
87 """Convert a Python object to a bytearray."""
88 assert False,
"unimplemented"
def deserialize(self, data: bytearray) -> Tuple[object, bytearray]:
    """Convert a bytearray to a Python object.

    Declared to return a tuple of the decoded object and a bytearray
    (presumably the bytes left over after decoding -- TODO confirm, part of
    the original docstring is not visible here).

    This implementation is a placeholder: it never returns and always fails.

    Raises:
        AssertionError: always, with the message "unimplemented".
    """
    # Raise explicitly instead of `assert False, ...`: assert statements are
    # removed when Python runs with -O, which would make this method silently
    # return None and break callers that unpack the result tuple.
    raise AssertionError("unimplemented")
104 def is_valid(self, obj) -> Tuple[bool, Optional[str]]:
106 return (
False, f
"void type cannot must represented by None, not {obj}")
115 return bytearray([0])
117 def deserialize(self, data: bytearray) -> Tuple[object, bytearray]:
119 raise ValueError(f
"void type cannot be represented by {data}")
120 return (
None, data[1:])
123__esi_mapping[cpp.VoidType] = VoidType
131 def is_valid(self, obj) -> Tuple[bool, Optional[str]]:
132 if not isinstance(obj, (bytearray, bytes, list)):
133 return (
False, f
"invalid type: {type(obj)}")
134 if isinstance(obj, list)
and not all(
135 [isinstance(b, int)
and b.bit_length() <= 8
for b
in obj]):
136 return (
False, f
"list item too large: {obj}")
138 return (
False, f
"wrong size: {len(obj)}")
145 def serialize(self, obj: Union[bytearray, bytes, List[int]]) -> bytearray:
146 if isinstance(obj, bytearray):
148 if isinstance(obj, bytes)
or isinstance(obj, list):
149 return bytearray(obj)
150 raise ValueError(f
"cannot convert {obj} to bytearray")
152 def deserialize(self, data: bytearray) -> Tuple[bytearray, bytearray]:
156__esi_mapping[cpp.BitsType] = BitsType
174 def is_valid(self, obj) -> Tuple[bool, Optional[str]]:
175 if not isinstance(obj, int):
176 return (
False, f
"must be an int, not {type(obj)}")
178 return (
False, f
"out of range: {obj}")
182 return f
"uint{self.bit_width}"
192__esi_mapping[cpp.UIntType] = UIntType
200 def is_valid(self, obj) -> Tuple[bool, Optional[str]]:
201 if not isinstance(obj, int):
202 return (
False, f
"must be an int, not {type(obj)}")
205 return (
False, f
"out of range: {obj}")
208 return (
False, f
"out of range: {obj}")
212 return f
"sint{self.bit_width}"
222__esi_mapping[cpp.SIntType] = SIntType
227 def __init__(self, id: str, fields: List[Tuple[str,
"ESIType"]]):
229 cpp_fields = [(name, field_type.cpp_type)
for name, field_type
in fields]
233 """Initialize instance attributes from a C++ type object."""
240 widths = [ty.bit_width
for (_, ty)
in self.
fields]
241 if any([w < 0
for w
in widths]):
245 def is_valid(self, obj) -> Tuple[bool, Optional[str]]:
247 if not isinstance(obj, dict):
248 if not hasattr(obj,
"__dict__"):
249 return (
False,
"must be a dict or have __dict__ attribute")
252 for (fname, ftype)
in self.
fields:
254 return (
False, f
"missing field '{fname}'")
255 fvalid, reason = ftype.is_valid(obj[fname])
257 return (
False, f
"invalid field '{fname}': {reason}")
259 if fields_count != len(obj):
260 return (
False,
"missing fields")
265 if not isinstance(obj, dict):
267 ordered_fields = reversed(
269 for (fname, ftype)
in ordered_fields:
271 ret.extend(ftype.serialize(fval))
274 def deserialize(self, data: bytearray) -> Tuple[Dict[str, Any], bytearray]:
276 ordered_fields = reversed(
278 for (fname, ftype)
in ordered_fields:
279 (fval, data) = ftype.deserialize(data)
284__esi_mapping[cpp.StructType] = StructType
289 def __init__(self, id: str, element_type:
"ESIType", size: int):
293 """Initialize instance attributes from a C++ type object."""
302 def is_valid(self, obj) -> Tuple[bool, Optional[str]]:
303 if not isinstance(obj, list):
304 return (
False, f
"must be a list, not {type(obj)}")
305 if len(obj) != self.
size:
306 return (
False, f
"wrong size: expected {self.size} not {len(obj)}")
307 for (idx, e)
in enumerate(obj):
310 return (
False, f
"invalid element {idx}: {reason}")
315 for e
in reversed(lst):
319 def deserialize(self, data: bytearray) -> Tuple[List[Any], bytearray]:
321 for _
in range(self.
size):
328__esi_mapping[cpp.ArrayType] = ArrayType
332 """A unidirectional communication channel. This is the basic communication
333 method with an accelerator."""
335 def __init__(self, owner: BundlePort, cpp_port: cpp.ChannelPort):
340 def connect(self, buffer_size: Optional[int] =
None):
341 (supports_host, reason) = self.
type.supports_host
342 if not supports_host:
343 raise TypeError(f
"unsupported type: {reason}")
345 opts = cpp.ConnectOptions()
346 opts.buffer_size = buffer_size
355 """A unidirectional communication channel from the host to the accelerator."""
357 def __init__(self, owner: BundlePort, cpp_port: cpp.WriteChannelPort):
359 self.
cpp_port: cpp.WriteChannelPort = cpp_port
362 valid, reason = self.
type.is_valid(msg)
365 f
"'{msg}' cannot be converted to '{self.type}': {reason}")
366 msg_bytes: bytearray = self.
type.serialize(msg)
370 """Write a typed message to the channel. Attempts to serialize 'msg' to what
371 the accelerator expects, but will fail if the object is not convertible to
377 """Like 'write', but uses the non-blocking tryWrite method of the underlying
378 port. Returns True if the write was successful, False otherwise."""
383 """A unidirectional communication channel from the accelerator to the host."""
385 def __init__(self, owner: BundlePort, cpp_port: cpp.ReadChannelPort):
387 self.
cpp_port: cpp.ReadChannelPort = cpp_port
390 """Read a typed message from the channel. Returns a deserialized object of a
391 type defined by the port type."""
394 (msg, leftover) = self.
type.deserialize(buffer)
395 if len(leftover) != 0:
396 raise ValueError(f
"leftover bytes: {leftover}")
401 """A collection of named, unidirectional communication channels."""
405 def __new__(cls, owner: HWModule, cpp_port: cpp.BundlePort):
407 if isinstance(cpp_port, cpp.Function):
408 return super().
__new__(FunctionPort)
409 if isinstance(cpp_port, cpp.Callback):
410 return super().
__new__(CallbackPort)
411 if isinstance(cpp_port, cpp.MMIORegion):
412 return super().
__new__(MMIORegion)
413 if isinstance(cpp_port, cpp.Metric):
414 return super().
__new__(MetricPort)
417 def __init__(self, owner: HWModule, cpp_port: cpp.BundlePort):
429 """A specialization of `Future` for ESI messages. Wraps the cpp object and
430 deserializes the result. Hopefully overrides all the methods necessary for
431 proper operation, which is assumed to be not all of them."""
433 def __init__(self, result_type: Type, cpp_future: cpp.MessageDataFuture):
443 def result(self, timeout: Optional[Union[int, float]] =
None) -> Any:
447 (msg, leftover) = self.
result_type.deserialize(result_bytes)
448 if len(leftover) != 0:
449 raise ValueError(f
"leftover bytes: {leftover}")
453 raise NotImplementedError(
"add_done_callback is not implemented")
457 """A region of memory-mapped I/O space. This is a collection of named
458 channels, which are either read or read-write. The channels are accessed
459 by name, and can be connected to the host."""
461 def __init__(self, owner: HWModule, cpp_port: cpp.MMIORegion):
467 return self.
region.descriptor
469 def read(self, offset: int) -> bytearray:
470 """Read a value from the MMIO region at the given offset."""
473 def write(self, offset: int, data: bytearray) ->
None:
474 """Write a value to the MMIO region at the given offset."""
479 """A pair of channels which carry the input and output of a function."""
481 def __init__(self, owner: HWModule, cpp_port: cpp.BundlePort):
491 def call(self, *args: Any, **kwargs: Any) -> Future:
492 """Call the function with the given argument and return a future of the
496 if len(args) > 0
and len(kwargs) > 0:
497 raise ValueError(
"cannot use both positional and keyword arguments")
507 valid, reason = self.
arg_type.is_valid(selected)
510 f
"'{selected}' cannot be converted to '{self.arg_type}': {reason}")
511 arg_bytes: bytearray = self.
arg_type.serialize(selected)
def __call__(self, *args: Any, **kwds: Any) -> Future:
    """Make the object callable by forwarding to `call`.

    All positional and keyword arguments are passed through unchanged, and
    whatever `call` returns is returned to the caller.
    """
    pending = self.call(*args, **kwds)
    return pending
520 """Callback ports are the inverse of function ports -- instead of calls to the
521 accelerator, they get called from the accelerator. Specify the function which
522 you'd like the accelerator to call when you call `connect`."""
524 def __init__(self, owner: HWModule, cpp_port: cpp.BundlePort):
532 def type_convert_wrapper(cb: Callable[[Any], Any],
533 msg: bytearray) -> Optional[bytearray]:
535 (obj, leftover) = self.
arg_type.deserialize(msg)
536 if len(leftover) != 0:
537 raise ValueError(f
"leftover bytes: {leftover}")
542 except Exception
as e:
543 traceback.print_exception(e)
551 """Telemetry ports report an individual piece of information from the
552 accelerator. The method of accessing telemetry will likely change in the
555 def __init__(self, owner: HWModule, cpp_port: cpp.BundlePort):
Tuple[bool, Optional[str]] is_valid(self, obj)
_init_from_cpp(self, cpp.ArrayType cpp_type)
Tuple[List[Any], bytearray] deserialize(self, bytearray data)
__init__(self, str id, "ESIType" element_type, int size)
bytearray serialize(self, list lst)
bytearray serialize(self, Union[bytearray, bytes, List[int]] obj)
Tuple[bytearray, bytearray] deserialize(self, bytearray data)
Tuple[bool, Optional[str]] is_valid(self, obj)
__init__(self, str id, int width)
__new__(cls, HWModule owner, cpp.BundlePort cpp_port)
WritePort write_port(self, str channel_name)
ReadPort read_port(self, str channel_name)
__init__(self, HWModule owner, cpp.BundlePort cpp_port)
__init__(self, HWModule owner, cpp.BundlePort cpp_port)
connect(self, Callable[[Any], Any] cb)
Tuple[bool, Optional[str]] supports_host(self)
Tuple[object, bytearray] deserialize(self, bytearray data)
_init_from_cpp(self, cpp.Type cpp_type)
Tuple[bool, Optional[str]] is_valid(self, obj)
wrap_cpp(cls, cpp.Type cpp_type)
bytearray serialize(self, obj)
Future call(self, *Any args, **Any kwargs)
__init__(self, HWModule owner, cpp.BundlePort cpp_port)
Future __call__(self, *Any args, **Any kwds)
__init__(self, str id, int width)
None write(self, int offset, bytearray data)
bytearray read(self, int offset)
__init__(self, HWModule owner, cpp.MMIORegion cpp_port)
cpp.MMIORegionDesc descriptor(self)
Any result(self, Optional[Union[int, float]] timeout=None)
__init__(self, Type result_type, cpp.MessageDataFuture cpp_future)
None add_done_callback(self, Callable[[Future], object] fn)
__init__(self, HWModule owner, cpp.BundlePort cpp_port)
__init__(self, BundlePort owner, cpp.ChannelPort cpp_port)
connect(self, Optional[int] buffer_size=None)
__init__(self, BundlePort owner, cpp.ReadChannelPort cpp_port)
Tuple[int, bytearray] deserialize(self, bytearray data)
Tuple[bool, Optional[str]] is_valid(self, obj)
bytearray serialize(self, int obj)
__init__(self, str id, int width)
__init__(self, str id, List[Tuple[str, "ESIType"]] fields)
bytearray serialize(self, obj)
Tuple[Dict[str, Any], bytearray] deserialize(self, bytearray data)
Tuple[bool, Optional[str]] is_valid(self, obj)
_init_from_cpp(self, cpp.StructType cpp_type)
__init__(self, str id, int width)
Tuple[int, bytearray] deserialize(self, bytearray data)
Tuple[bool, Optional[str]] is_valid(self, obj)
bytearray serialize(self, int obj)
Tuple[object, bytearray] deserialize(self, bytearray data)
Tuple[bool, Optional[str]] is_valid(self, obj)
bytearray serialize(self, obj)
bool try_write(self, msg=None)
bytearray __serialize_msg(self, msg=None)
bool write(self, msg=None)
__init__(self, BundlePort owner, cpp.WriteChannelPort cpp_port)
_get_esi_type(cpp.Type cpp_type)