CIRCT  20.0.0git
Ports.h
Go to the documentation of this file.
1 //===- Ports.h - ESI communication channels ---------------------*- C++ -*-===//
2 //
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6 //
7 //===----------------------------------------------------------------------===//
8 //
9 // DO NOT EDIT!
10 // This file is distributed as part of an ESI package. The source for this file
11 // should always be modified within CIRCT.
12 //
13 //===----------------------------------------------------------------------===//
14 
15 // NOLINTNEXTLINE(llvm-header-guard)
16 #ifndef ESI_PORTS_H
17 #define ESI_PORTS_H
18 
19 #include "esi/Common.h"
20 #include "esi/Types.h"
21 #include "esi/Utils.h"
22 
23 #include <cassert>
24 #include <future>
25 
26 namespace esi {
27 
28 /// Unidirectional channels are the basic communication primitive between the
29 /// host and accelerator. A 'ChannelPort' is the host side of a channel. It can
30 /// be either read or write but not both. At this level, channels are untyped --
31 /// just streams of bytes. They are not intended to be used directly by users
32 /// but used by higher level APIs which add types.
33 class ChannelPort {
34 public:
35  ChannelPort(const Type *type) : type(type) {}
36  virtual ~ChannelPort() {}
37 
38  /// Set up a connection to the accelerator. The buffer size is optional and
39  /// should be considered merely a hint. Individual implementations use it
40  /// however they like. The unit is number of messages of the port type.
41  virtual void connect(std::optional<unsigned> bufferSize = std::nullopt) = 0;
42  virtual void disconnect() = 0;
43  virtual bool isConnected() const = 0;
44 
45  /// Poll for incoming data. Returns true if data was read or written into a
46  /// buffer as a result of the poll. Calling the call back could (will) also
47  /// happen in that case. Some backends need this to be called periodically. In
48  /// the usual case, this will be called by a background thread, but the ESI
49  /// runtime does not want to assume that the host processes use standard
50  /// threads. If the user wants to provide their own threads, they need to call
51  /// this on each port occasionally. This is also called from the 'master' poll
52  /// method in the Accelerator class.
53  bool poll() {
54  if (isConnected())
55  return pollImpl();
56  return false;
57  }
58 
59  const Type *getType() const { return type; }
60 
61 protected:
62  const Type *type;
63 
64  /// Method called by poll() to actually poll the channel if the channel is
65  /// connected.
66  virtual bool pollImpl() { return false; }
67 
68  /// Called by all connect methods to let backends initiate the underlying
69  /// connections.
70  virtual void connectImpl(std::optional<unsigned> bufferSize) {}
71 };
72 
73 /// A ChannelPort which sends data to the accelerator.
74 class WriteChannelPort : public ChannelPort {
75 public:
77 
78  virtual void
79  connect(std::optional<unsigned> bufferSize = std::nullopt) override {
80  connectImpl(bufferSize);
81  connected = true;
82  }
83  virtual void disconnect() override { connected = false; }
84  virtual bool isConnected() const override { return connected; }
85 
86  /// A very basic blocking write API. Will likely change for performance
87  /// reasons.
88  virtual void write(const MessageData &) = 0;
89 
90  /// A basic non-blocking write API. Returns true if the data was written.
91  /// It is invalid for backends to always return false (i.e. backends must
92  /// eventually ensure that writes may succeed).
93  virtual bool tryWrite(const MessageData &data) = 0;
94 
95 private:
96  volatile bool connected = false;
97 };
98 
99 /// A ChannelPort which reads data from the accelerator. It has two modes:
100 /// Callback and Polling which cannot be used at the same time. The mode is set
101 /// at connect() time. To change the mode, disconnect() and then connect()
102 /// again.
103 class ReadChannelPort : public ChannelPort {
104 
105 public:
108  virtual void disconnect() override { mode = Mode::Disconnected; }
109  virtual bool isConnected() const override {
110  return mode != Mode::Disconnected;
111  }
112 
113  //===--------------------------------------------------------------------===//
114  // Callback mode: To use a callback, connect with a callback function which
115  // will get called with incoming data. This function can be called from any
116  // thread. It shall return true to indicate that the data was consumed. False
117  // if it could not accept the data and should be tried again at some point in
118  // the future. Callback is not allowed to block and needs to execute quickly.
119  //
120  // TODO: Have the callback return something upon which the caller can check,
121  // wait, and be notified.
122  //===--------------------------------------------------------------------===//
123 
124  virtual void connect(std::function<bool(MessageData)> callback,
125  std::optional<unsigned> bufferSize = std::nullopt);
126 
127  //===--------------------------------------------------------------------===//
128  // Polling mode methods: To use futures or blocking reads, connect without any
129  // arguments. You will then be able to use readAsync() or read().
130  //===--------------------------------------------------------------------===//
131 
132  /// Default max data queue size set at connect time.
133  static constexpr uint64_t DefaultMaxDataQueueMsgs = 32;
134 
135  /// Connect to the channel in polling mode.
136  virtual void
137  connect(std::optional<unsigned> bufferSize = std::nullopt) override;
138 
139  /// Asynchronous read.
140  virtual std::future<MessageData> readAsync();
141 
142  /// Specify a buffer to read into. Blocking. Basic API, will likely change
143  /// for performance and functionality reasons.
144  virtual void read(MessageData &outData) {
145  std::future<MessageData> f = readAsync();
146  f.wait();
147  outData = std::move(f.get());
148  }
149 
150  /// Set maximum number of messages to store in the dataQueue. 0 means no
151  /// limit. This is only used in polling mode and is set to default of 32 upon
152  /// connect. While it may seem redundant to have this and bufferSize, there
153  /// may be (and are) backends which have a very small amount of memory which
154  /// are accelerator accessible and want to move messages out as quickly as
155  /// possible.
156  void setMaxDataQueueMsgs(uint64_t maxMsgs) { maxDataQueueMsgs = maxMsgs; }
157 
158 protected:
159  /// Indicates the current mode of the channel.
161  volatile Mode mode;
162 
163  /// Backends call this callback when new data is available.
164  std::function<bool(MessageData)> callback;
165 
166  //===--------------------------------------------------------------------===//
167  // Polling mode members.
168  //===--------------------------------------------------------------------===//
169 
170  /// Mutex to protect the two queues used for polling.
171  std::mutex pollingM;
172  /// Store incoming data here if there are no outstanding promises to be
173  /// fulfilled.
174  std::queue<MessageData> dataQueue;
175  /// Maximum number of messages to store in dataQueue. 0 means no limit.
177  /// Promises to be fulfilled when data is available.
178  std::queue<std::promise<MessageData>> promiseQueue;
179 };
180 
181 /// Services provide connections to 'bundles' -- collections of named,
182 /// unidirectional communication channels. This class provides access to those
183 /// ChannelPorts.
184 class BundlePort {
185 public:
186  /// Compute the direction of a channel given the bundle direction and the
187  /// bundle port's direction.
188  static bool isWrite(BundleType::Direction bundleDir) {
189  return bundleDir == BundleType::Direction::To;
190  }
191 
192  /// Construct a port.
193  BundlePort(AppID id, std::map<std::string, ChannelPort &> channels);
194  virtual ~BundlePort() = default;
195 
196  /// Get the ID of the port.
197  AppID getID() const { return id; }
198 
199  /// Get access to the raw byte streams of a channel. Intended for internal
200  /// usage and binding to other languages (e.g. Python) which have their own
201  /// message serialization code. Exposed publicly as an escape hatch, but
202  /// ordinary users should not use. You have been warned.
203  WriteChannelPort &getRawWrite(const std::string &name) const;
204  ReadChannelPort &getRawRead(const std::string &name) const;
205  const std::map<std::string, ChannelPort &> &getChannels() const {
206  return channels;
207  }
208 
209  /// Cast this Bundle port to a subclass which is actually useful. Returns
210  /// nullptr if the cast fails.
211  // TODO: this probably shouldn't be 'const', but bundle ports' user access are
212  // const. Change that.
213  template <typename T>
214  T *getAs() const {
215  return const_cast<T *>(dynamic_cast<const T *>(this));
216  }
217 
218  /// Calls `poll` on all channels in the bundle and returns true if any of them
219  /// returned true.
220  bool poll() {
221  bool result = false;
222  for (auto &channel : channels)
223  result |= channel.second.poll();
224  return result;
225  }
226 
227 private:
229  std::map<std::string, ChannelPort &> channels;
230 };
231 
232 } // namespace esi
233 
234 #endif // ESI_PORTS_H
Services provide connections to 'bundles' – collections of named, unidirectional communication channe...
Definition: Ports.h:184
virtual ~BundlePort()=default
std::map< std::string, ChannelPort & > channels
Definition: Ports.h:229
ReadChannelPort & getRawRead(const std::string &name) const
Definition: Ports.cpp:35
WriteChannelPort & getRawWrite(const std::string &name) const
Get access to the raw byte streams of a channel.
Definition: Ports.cpp:25
bool poll()
Calls poll on all channels in the bundle and returns true if any of them returned true.
Definition: Ports.h:220
AppID id
Definition: Ports.h:228
const std::map< std::string, ChannelPort & > & getChannels() const
Definition: Ports.h:205
T * getAs() const
Cast this Bundle port to a subclass which is actually useful.
Definition: Ports.h:214
static bool isWrite(BundleType::Direction bundleDir)
Compute the direction of a channel given the bundle direction and the bundle port's direction.
Definition: Ports.h:188
BundlePort(AppID id, std::map< std::string, ChannelPort & > channels)
Construct a port.
Definition: Ports.cpp:22
AppID getID() const
Get the ID of the port.
Definition: Ports.h:197
Unidirectional channels are the basic communication primitive between the host and accelerator.
Definition: Ports.h:33
ChannelPort(const Type *type)
Definition: Ports.h:35
virtual void disconnect()=0
virtual void connect(std::optional< unsigned > bufferSize=std::nullopt)=0
Set up a connection to the accelerator.
virtual void connectImpl(std::optional< unsigned > bufferSize)
Called by all connect methods to let backends initiate the underlying connections.
Definition: Ports.h:70
virtual bool pollImpl()
Method called by poll() to actually poll the channel if the channel is connected.
Definition: Ports.h:66
const Type * type
Definition: Ports.h:62
const Type * getType() const
Definition: Ports.h:59
virtual ~ChannelPort()
Definition: Ports.h:36
bool poll()
Poll for incoming data.
Definition: Ports.h:53
virtual bool isConnected() const =0
A logical chunk of data representing serialized data.
Definition: Common.h:92
A ChannelPort which reads data from the accelerator.
Definition: Ports.h:103
volatile Mode mode
Definition: Ports.h:161
virtual std::future< MessageData > readAsync()
Asynchronous read.
Definition: Ports.cpp:77
virtual bool isConnected() const override
Definition: Ports.h:109
std::mutex pollingM
Mutex to protect the two queues used for polling.
Definition: Ports.h:171
virtual void connect(std::function< bool(MessageData)> callback, std::optional< unsigned > bufferSize=std::nullopt)
Definition: Ports.cpp:44
std::function< bool(MessageData)> callback
Backends call this callback when new data is available.
Definition: Ports.h:164
Mode
Indicates the current mode of the channel.
Definition: Ports.h:160
std::queue< MessageData > dataQueue
Store incoming data here if there are no outstanding promises to be fulfilled.
Definition: Ports.h:174
static constexpr uint64_t DefaultMaxDataQueueMsgs
Default max data queue size set at connect time.
Definition: Ports.h:133
std::queue< std::promise< MessageData > > promiseQueue
Promises to be fulfilled when data is available.
Definition: Ports.h:178
void setMaxDataQueueMsgs(uint64_t maxMsgs)
Set maximum number of messages to store in the dataQueue.
Definition: Ports.h:156
uint64_t maxDataQueueMsgs
Maximum number of messages to store in dataQueue. 0 means no limit.
Definition: Ports.h:176
virtual void disconnect() override
Definition: Ports.h:108
virtual void read(MessageData &outData)
Specify a buffer to read into.
Definition: Ports.h:144
ReadChannelPort(const Type *type)
Definition: Ports.h:106
Root class of the ESI type system.
Definition: Types.h:27
A ChannelPort which sends data to the accelerator.
Definition: Ports.h:74
virtual bool isConnected() const override
Definition: Ports.h:84
virtual void disconnect() override
Definition: Ports.h:83
volatile bool connected
Definition: Ports.h:96
virtual void write(const MessageData &)=0
A very basic blocking write API.
virtual bool tryWrite(const MessageData &data)=0
A basic non-blocking write API.
virtual void connect(std::optional< unsigned > bufferSize=std::nullopt) override
Set up a connection to the accelerator.
Definition: Ports.h:79
Definition: esi.py:1