CIRCT  20.0.0git
Ports.cpp
Go to the documentation of this file.
1 //===- Ports.cpp - ESI communication channels -------------------------===//
2 //
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6 //
7 //===----------------------------------------------------------------------===//
8 //
9 // DO NOT EDIT!
10 // This file is distributed as part of an ESI package. The source for this file
11 // should always be modified within CIRCT (lib/dialect/ESI/runtime/cpp/).
12 //
13 //===----------------------------------------------------------------------===//
14 
#include "esi/Ports.h"

#include <cassert>
#include <chrono>
#include <stdexcept>
#include <utility>
19 
20 using namespace esi;
21 
22 BundlePort::BundlePort(AppID id, std::map<std::string, ChannelPort &> channels)
23  : id(id), channels(channels) {}
24 
25 WriteChannelPort &BundlePort::getRawWrite(const std::string &name) const {
26  auto f = channels.find(name);
27  if (f == channels.end())
28  throw std::runtime_error("Channel '" + name + "' not found");
29  auto *write = dynamic_cast<WriteChannelPort *>(&f->second);
30  if (!write)
31  throw std::runtime_error("Channel '" + name + "' is not a write channel");
32  return *write;
33 }
34 
35 ReadChannelPort &BundlePort::getRawRead(const std::string &name) const {
36  auto f = channels.find(name);
37  if (f == channels.end())
38  throw std::runtime_error("Channel '" + name + "' not found");
39  auto *read = dynamic_cast<ReadChannelPort *>(&f->second);
40  if (!read)
41  throw std::runtime_error("Channel '" + name + "' is not a read channel");
42  return *read;
43 }
44 void ReadChannelPort::connect(std::function<bool(MessageData)> callback,
45  std::optional<unsigned> bufferSize) {
46  if (mode != Mode::Disconnected)
47  throw std::runtime_error("Channel already connected");
48  mode = Mode::Callback;
49  this->callback = callback;
50  connectImpl(bufferSize);
51 }
52 
53 void ReadChannelPort::connect(std::optional<unsigned> bufferSize) {
54  mode = Mode::Polling;
56  this->callback = [this](MessageData data) {
57  std::scoped_lock<std::mutex> lock(pollingM);
58  assert(!(!promiseQueue.empty() && !dataQueue.empty()) &&
59  "Both queues are in use.");
60 
61  if (!promiseQueue.empty()) {
62  // If there are promises waiting, fulfill the first one.
63  std::promise<MessageData> p = std::move(promiseQueue.front());
64  promiseQueue.pop();
65  p.set_value(std::move(data));
66  } else {
67  // If not, add it to the data queue, unless the queue is full.
68  if (dataQueue.size() >= maxDataQueueMsgs && maxDataQueueMsgs != 0)
69  return false;
70  dataQueue.push(std::move(data));
71  }
72  return true;
73  };
74  connectImpl(bufferSize);
75 }
76 
77 std::future<MessageData> ReadChannelPort::readAsync() {
78  if (mode == Mode::Callback)
79  throw std::runtime_error(
80  "Cannot read from a callback channel. `connect()` without a callback "
81  "specified to use polling mode.");
82 
83  std::scoped_lock<std::mutex> lock(pollingM);
84  assert(!(!promiseQueue.empty() && !dataQueue.empty()) &&
85  "Both queues are in use.");
86 
87  if (!dataQueue.empty()) {
88  // If there's data available, fulfill the promise immediately.
89  std::promise<MessageData> p;
90  std::future<MessageData> f = p.get_future();
91  p.set_value(std::move(dataQueue.front()));
92  dataQueue.pop();
93  return f;
94  } else {
95  // Otherwise, add a promise to the queue and return the future.
96  promiseQueue.emplace();
97  return promiseQueue.back().get_future();
98  }
99 }
assert(baseType &&"element must be base type")
std::map< std::string, ChannelPort & > channels
Definition: Ports.h:229
ReadChannelPort & getRawRead(const std::string &name) const
Definition: Ports.cpp:35
WriteChannelPort & getRawWrite(const std::string &name) const
Get access to the raw byte streams of a channel.
Definition: Ports.cpp:25
BundlePort(AppID id, std::map< std::string, ChannelPort & > channels)
Construct a port.
Definition: Ports.cpp:22
virtual void connectImpl(std::optional< unsigned > bufferSize)
Called by all connect methods to let backends initiate the underlying connections.
Definition: Ports.h:70
A logical chunk of data representing serialized data.
Definition: Common.h:92
A ChannelPort which reads data from the accelerator.
Definition: Ports.h:103
volatile Mode mode
Definition: Ports.h:161
virtual std::future< MessageData > readAsync()
Asynchronous read.
Definition: Ports.cpp:77
std::mutex pollingM
Mutex to protect the two queues used for polling.
Definition: Ports.h:171
virtual void connect(std::function< bool(MessageData)> callback, std::optional< unsigned > bufferSize=std::nullopt)
Definition: Ports.cpp:44
std::function< bool(MessageData)> callback
Backends call this callback when new data is available.
Definition: Ports.h:164
std::queue< MessageData > dataQueue
Store incoming data here if there are no outstanding promises to be fulfilled.
Definition: Ports.h:174
static constexpr uint64_t DefaultMaxDataQueueMsgs
Default max data queue size set at connect time.
Definition: Ports.h:133
std::queue< std::promise< MessageData > > promiseQueue
Promises to be fulfilled when data is available.
Definition: Ports.h:178
uint64_t maxDataQueueMsgs
Maximum number of messages to store in dataQueue. 0 means no limit.
Definition: Ports.h:176
A ChannelPort which sends data to the accelerator.
Definition: Ports.h:74
Definition: esi.py:1