CIRCT 21.0.0git
Loading...
Searching...
No Matches
Ports.h
Go to the documentation of this file.
1//===- Ports.h - ESI communication channels ---------------------*- C++ -*-===//
2//
3// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4// See https://llvm.org/LICENSE.txt for license information.
5// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6//
7//===----------------------------------------------------------------------===//
8//
9// DO NOT EDIT!
10// This file is distributed as part of an ESI package. The source for this file
11// should always be modified within CIRCT.
12//
13//===----------------------------------------------------------------------===//
14
15// NOLINTNEXTLINE(llvm-header-guard)
16#ifndef ESI_PORTS_H
17#define ESI_PORTS_H
18
19#include "esi/Common.h"
20#include "esi/Types.h"
21#include "esi/Utils.h"
22
23#include <cassert>
24#include <future>
25
26namespace esi {
27
28class ChannelPort;
29using PortMap = std::map<std::string, ChannelPort &>;
30
31/// Unidirectional channels are the basic communication primitive between the
32/// host and accelerator. A 'ChannelPort' is the host side of a channel. It can
33/// be either read or write but not both. At this level, channels are untyped --
34/// just streams of bytes. They are not intended to be used directly by users
35/// but used by higher level APIs which add types.
37public:
39 virtual ~ChannelPort() {}
40
41 /// Set up a connection to the accelerator. The buffer size is optional and
42 /// should be considered merely a hint. Individual implementations use it
43 /// however they like. The unit is number of messages of the port type.
44 virtual void connect(std::optional<unsigned> bufferSize = std::nullopt) = 0;
45 virtual void disconnect() = 0;
46 virtual bool isConnected() const = 0;
47
48 /// Poll for incoming data. Returns true if data was read or written into a
49 /// buffer as a result of the poll. Calling the call back could (will) also
50 /// happen in that case. Some backends need this to be called periodically. In
51 /// the usual case, this will be called by a background thread, but the ESI
52 /// runtime does not want to assume that the host processes use standard
53 /// threads. If the user wants to provide their own threads, they need to call
54 /// this on each port occasionally. This is also called from the 'master' poll
55 /// method in the Accelerator class.
56 bool poll() {
57 if (isConnected())
58 return pollImpl();
59 return false;
60 }
61
62 const Type *getType() const { return type; }
63
64protected:
65 const Type *type;
66
67 /// Method called by poll() to actually poll the channel if the channel is
68 /// connected.
69 virtual bool pollImpl() { return false; }
70
71 /// Called by all connect methods to let backends initiate the underlying
72 /// connections.
73 virtual void connectImpl(std::optional<unsigned> bufferSize) {}
74};
75
76/// A ChannelPort which sends data to the accelerator.
78public:
80
81 virtual void
82 connect(std::optional<unsigned> bufferSize = std::nullopt) override {
83 connectImpl(bufferSize);
84 connected = true;
85 }
86 virtual void disconnect() override { connected = false; }
87 virtual bool isConnected() const override { return connected; }
88
89 /// A very basic blocking write API. Will likely change for performance
90 /// reasons.
91 virtual void write(const MessageData &) = 0;
92
93 /// A basic non-blocking write API. Returns true if the data was written.
94 /// It is invalid for backends to always return false (i.e. backends must
95 /// eventually ensure that writes may succeed).
96 virtual bool tryWrite(const MessageData &data) = 0;
97
98private:
99 volatile bool connected = false;
100};
101
102/// Instantiated when a backend does not know how to create a write channel.
104public:
107
108 void connect(std::optional<unsigned> bufferSize = std::nullopt) override {
109 throw std::runtime_error(errmsg);
110 }
111 void write(const MessageData &) override { throw std::runtime_error(errmsg); }
112 bool tryWrite(const MessageData &) override {
113 throw std::runtime_error(errmsg);
114 }
115
116protected:
117 std::string errmsg;
118};
119
120/// A ChannelPort which reads data from the accelerator. It has two modes:
121/// Callback and Polling which cannot be used at the same time. The mode is set
122/// at connect() time. To change the mode, disconnect() and then connect()
123/// again.
125
126public:
129 virtual void disconnect() override { mode = Mode::Disconnected; }
130 virtual bool isConnected() const override {
131 return mode != Mode::Disconnected;
132 }
133
134 //===--------------------------------------------------------------------===//
135 // Callback mode: To use a callback, connect with a callback function which
136 // will get called with incoming data. This function can be called from any
137 // thread. It shall return true to indicate that the data was consumed. False
138 // if it could not accept the data and should be tried again at some point in
139 // the future. Callback is not allowed to block and needs to execute quickly.
140 //
141 // TODO: Have the callback return something upon which the caller can check,
142 // wait, and be notified.
143 //===--------------------------------------------------------------------===//
144
145 virtual void connect(std::function<bool(MessageData)> callback,
146 std::optional<unsigned> bufferSize = std::nullopt);
147
148 //===--------------------------------------------------------------------===//
149 // Polling mode methods: To use futures or blocking reads, connect without any
150 // arguments. You will then be able to use readAsync() or read().
151 //===--------------------------------------------------------------------===//
152
153 /// Default max data queue size set at connect time.
154 static constexpr uint64_t DefaultMaxDataQueueMsgs = 32;
155
156 /// Connect to the channel in polling mode.
157 virtual void
158 connect(std::optional<unsigned> bufferSize = std::nullopt) override;
159
160 /// Asynchronous read.
161 virtual std::future<MessageData> readAsync();
162
163 /// Specify a buffer to read into. Blocking. Basic API, will likely change
164 /// for performance and functionality reasons.
165 virtual void read(MessageData &outData) {
166 std::future<MessageData> f = readAsync();
167 f.wait();
168 outData = std::move(f.get());
169 }
170
171 /// Set maximum number of messages to store in the dataQueue. 0 means no
172 /// limit. This is only used in polling mode and is set to default of 32 upon
173 /// connect. While it may seem redundant to have this and bufferSize, there
174 /// may be (and are) backends which have a very small amount of memory which
175 /// are accelerator accessible and want to move messages out as quickly as
176 /// possible.
177 void setMaxDataQueueMsgs(uint64_t maxMsgs) { maxDataQueueMsgs = maxMsgs; }
178
179protected:
180 /// Indicates the current mode of the channel.
182 volatile Mode mode;
183
184 /// Backends call this callback when new data is available.
185 std::function<bool(MessageData)> callback;
186
187 //===--------------------------------------------------------------------===//
188 // Polling mode members.
189 //===--------------------------------------------------------------------===//
190
191 /// Mutex to protect the two queues used for polling.
192 std::mutex pollingM;
193 /// Store incoming data here if there are no outstanding promises to be
194 /// fulfilled.
195 std::queue<MessageData> dataQueue;
196 /// Maximum number of messages to store in dataQueue. 0 means no limit.
198 /// Promises to be fulfilled when data is available.
199 std::queue<std::promise<MessageData>> promiseQueue;
200};
201
202/// Instantiated when a backend does not know how to create a read channel.
204public:
207
208 void connect(std::function<bool(MessageData)> callback,
209 std::optional<unsigned> bufferSize = std::nullopt) override {
210 throw std::runtime_error(errmsg);
211 }
212 void connect(std::optional<unsigned> bufferSize = std::nullopt) override {
213 throw std::runtime_error(errmsg);
214 }
215 std::future<MessageData> readAsync() override {
216 throw std::runtime_error(errmsg);
217 }
218
219protected:
220 std::string errmsg;
221};
222
223/// Services provide connections to 'bundles' -- collections of named,
224/// unidirectional communication channels. This class provides access to those
225/// ChannelPorts.
227public:
228 /// Compute the direction of a channel given the bundle direction and the
229 /// bundle port's direction.
230 static bool isWrite(BundleType::Direction bundleDir) {
231 return bundleDir == BundleType::Direction::To;
232 }
233
234 /// Construct a port.
236 virtual ~BundlePort() = default;
237
238 /// Get the ID of the port.
239 AppID getID() const { return id; }
240
241 /// Get access to the raw byte streams of a channel. Intended for internal
242 /// usage and binding to other languages (e.g. Python) which have their own
243 /// message serialization code. Exposed publicly as an escape hatch, but
244 /// ordinary users should not use. You have been warned.
245 WriteChannelPort &getRawWrite(const std::string &name) const;
246 ReadChannelPort &getRawRead(const std::string &name) const;
247 const PortMap &getChannels() const { return channels; }
248
249 /// Cast this Bundle port to a subclass which is actually useful. Returns
250 /// nullptr if the cast fails.
251 // TODO: this probably shouldn't be 'const', but bundle ports' user access are
252 // const. Change that.
253 template <typename T>
254 T *getAs() const {
255 return const_cast<T *>(dynamic_cast<const T *>(this));
256 }
257
258 /// Calls `poll` on all channels in the bundle and returns true if any of them
259 /// returned true.
260 bool poll() {
261 bool result = false;
262 for (auto &channel : channels)
263 result |= channel.second.poll();
264 return result;
265 }
266
267protected:
271};
272
273} // namespace esi
274
275#endif // ESI_PORTS_H
Services provide connections to 'bundles' – collections of named, unidirectional communication channels. This class provides access to those ChannelPorts.
Definition Ports.h:226
virtual ~BundlePort()=default
T * getAs() const
Cast this Bundle port to a subclass which is actually useful.
Definition Ports.h:254
PortMap channels
Definition Ports.h:270
ReadChannelPort & getRawRead(const std::string &name) const
Definition Ports.cpp:35
WriteChannelPort & getRawWrite(const std::string &name) const
Get access to the raw byte streams of a channel.
Definition Ports.cpp:25
const PortMap & getChannels() const
Definition Ports.h:247
bool poll()
Calls poll on all channels in the bundle and returns true if any of them returned true.
Definition Ports.h:260
const BundleType * type
Definition Ports.h:269
static bool isWrite(BundleType::Direction bundleDir)
Compute the direction of a channel given the bundle direction and the bundle port's direction.
Definition Ports.h:230
AppID getID() const
Get the ID of the port.
Definition Ports.h:239
Bundles represent a collection of channels.
Definition Types.h:44
Unidirectional channels are the basic communication primitive between the host and accelerator.
Definition Ports.h:36
const Type * getType() const
Definition Ports.h:62
ChannelPort(const Type *type)
Definition Ports.h:38
virtual void disconnect()=0
virtual void connect(std::optional< unsigned > bufferSize=std::nullopt)=0
Set up a connection to the accelerator.
virtual void connectImpl(std::optional< unsigned > bufferSize)
Called by all connect methods to let backends initiate the underlying connections.
Definition Ports.h:73
virtual bool pollImpl()
Method called by poll() to actually poll the channel if the channel is connected.
Definition Ports.h:69
const Type * type
Definition Ports.h:65
virtual ~ChannelPort()
Definition Ports.h:39
bool poll()
Poll for incoming data.
Definition Ports.h:56
virtual bool isConnected() const =0
A logical chunk of data representing serialized data.
Definition Common.h:103
A ChannelPort which reads data from the accelerator.
Definition Ports.h:124
volatile Mode mode
Definition Ports.h:182
virtual std::future< MessageData > readAsync()
Asynchronous read.
Definition Ports.cpp:77
virtual bool isConnected() const override
Definition Ports.h:130
std::mutex pollingM
Mutex to protect the two queues used for polling.
Definition Ports.h:192
virtual void connect(std::function< bool(MessageData)> callback, std::optional< unsigned > bufferSize=std::nullopt)
Definition Ports.cpp:44
std::function< bool(MessageData)> callback
Backends call this callback when new data is available.
Definition Ports.h:185
Mode
Indicates the current mode of the channel.
Definition Ports.h:181
std::queue< MessageData > dataQueue
Store incoming data here if there are no outstanding promises to be fulfilled.
Definition Ports.h:195
static constexpr uint64_t DefaultMaxDataQueueMsgs
Default max data queue size set at connect time.
Definition Ports.h:154
std::queue< std::promise< MessageData > > promiseQueue
Promises to be fulfilled when data is available.
Definition Ports.h:199
void setMaxDataQueueMsgs(uint64_t maxMsgs)
Set maximum number of messages to store in the dataQueue.
Definition Ports.h:177
uint64_t maxDataQueueMsgs
Maximum number of messages to store in dataQueue. 0 means no limit.
Definition Ports.h:197
virtual void disconnect() override
Definition Ports.h:129
virtual void read(MessageData &outData)
Specify a buffer to read into.
Definition Ports.h:165
ReadChannelPort(const Type *type)
Definition Ports.h:127
Root class of the ESI type system.
Definition Types.h:27
Instantiated when a backend does not know how to create a read channel.
Definition Ports.h:203
void connect(std::optional< unsigned > bufferSize=std::nullopt) override
Connect to the channel in polling mode.
Definition Ports.h:212
void connect(std::function< bool(MessageData)> callback, std::optional< unsigned > bufferSize=std::nullopt) override
Definition Ports.h:208
std::future< MessageData > readAsync() override
Asynchronous read.
Definition Ports.h:215
UnknownReadChannelPort(const Type *type, std::string errmsg)
Definition Ports.h:205
Instantiated when a backend does not know how to create a write channel.
Definition Ports.h:103
void connect(std::optional< unsigned > bufferSize=std::nullopt) override
Set up a connection to the accelerator.
Definition Ports.h:108
void write(const MessageData &) override
A very basic blocking write API.
Definition Ports.h:111
bool tryWrite(const MessageData &) override
A basic non-blocking write API.
Definition Ports.h:112
UnknownWriteChannelPort(const Type *type, std::string errmsg)
Definition Ports.h:105
A ChannelPort which sends data to the accelerator.
Definition Ports.h:77
virtual bool isConnected() const override
Definition Ports.h:87
virtual void disconnect() override
Definition Ports.h:86
volatile bool connected
Definition Ports.h:99
virtual void write(const MessageData &)=0
A very basic blocking write API.
virtual bool tryWrite(const MessageData &data)=0
A basic non-blocking write API.
virtual void connect(std::optional< unsigned > bufferSize=std::nullopt) override
Set up a connection to the accelerator.
Definition Ports.h:82
Definition esi.py:1
std::map< std::string, ChannelPort & > PortMap
Definition Ports.h:29