CIRCT 20.0.0git
Loading...
Searching...
No Matches
Ports.cpp
Go to the documentation of this file.
1//===- Ports.cpp - ESI communication channels -------------------------===//
2//
3// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4// See https://llvm.org/LICENSE.txt for license information.
5// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6//
7//===----------------------------------------------------------------------===//
8//
9// DO NOT EDIT!
10// This file is distributed as part of an ESI package. The source for this file
11// should always be modified within CIRCT (lib/dialect/ESI/runtime/cpp/).
12//
13//===----------------------------------------------------------------------===//
14
15#include "esi/Ports.h"
16
17#include <chrono>
18#include <stdexcept>
19
20using namespace esi;
21
22BundlePort::BundlePort(AppID id, std::map<std::string, ChannelPort &> channels)
23 : id(id), channels(channels) {}
24
25WriteChannelPort &BundlePort::getRawWrite(const std::string &name) const {
26 auto f = channels.find(name);
27 if (f == channels.end())
28 throw std::runtime_error("Channel '" + name + "' not found");
29 auto *write = dynamic_cast<WriteChannelPort *>(&f->second);
30 if (!write)
31 throw std::runtime_error("Channel '" + name + "' is not a write channel");
32 return *write;
33}
34
35ReadChannelPort &BundlePort::getRawRead(const std::string &name) const {
36 auto f = channels.find(name);
37 if (f == channels.end())
38 throw std::runtime_error("Channel '" + name + "' not found");
39 auto *read = dynamic_cast<ReadChannelPort *>(&f->second);
40 if (!read)
41 throw std::runtime_error("Channel '" + name + "' is not a read channel");
42 return *read;
43}
44void ReadChannelPort::connect(std::function<bool(MessageData)> callback,
45 std::optional<unsigned> bufferSize) {
47 throw std::runtime_error("Channel already connected");
49 this->callback = callback;
50 connectImpl(bufferSize);
51}
52
53void ReadChannelPort::connect(std::optional<unsigned> bufferSize) {
56 this->callback = [this](MessageData data) {
57 std::scoped_lock<std::mutex> lock(pollingM);
58 assert(!(!promiseQueue.empty() && !dataQueue.empty()) &&
59 "Both queues are in use.");
60
61 if (!promiseQueue.empty()) {
62 // If there are promises waiting, fulfill the first one.
63 std::promise<MessageData> p = std::move(promiseQueue.front());
64 promiseQueue.pop();
65 p.set_value(std::move(data));
66 } else {
67 // If not, add it to the data queue, unless the queue is full.
68 if (dataQueue.size() >= maxDataQueueMsgs && maxDataQueueMsgs != 0)
69 return false;
70 dataQueue.push(std::move(data));
71 }
72 return true;
73 };
74 connectImpl(bufferSize);
75}
76
77std::future<MessageData> ReadChannelPort::readAsync() {
78 if (mode == Mode::Callback)
79 throw std::runtime_error(
80 "Cannot read from a callback channel. `connect()` without a callback "
81 "specified to use polling mode.");
82
83 std::scoped_lock<std::mutex> lock(pollingM);
84 assert(!(!promiseQueue.empty() && !dataQueue.empty()) &&
85 "Both queues are in use.");
86
87 if (!dataQueue.empty()) {
88 // If there's data available, fulfill the promise immediately.
89 std::promise<MessageData> p;
90 std::future<MessageData> f = p.get_future();
91 p.set_value(std::move(dataQueue.front()));
92 dataQueue.pop();
93 return f;
94 } else {
95 // Otherwise, add a promise to the queue and return the future.
96 promiseQueue.emplace();
97 return promiseQueue.back().get_future();
98 }
99}
assert(baseType &&"element must be base type")
std::map< std::string, ChannelPort & > channels
Definition Ports.h:229
ReadChannelPort & getRawRead(const std::string &name) const
Definition Ports.cpp:35
WriteChannelPort & getRawWrite(const std::string &name) const
Get access to the raw byte streams of a channel.
Definition Ports.cpp:25
virtual void connectImpl(std::optional< unsigned > bufferSize)
Called by all connect methods to let backends initiate the underlying connections.
Definition Ports.h:70
A logical chunk of data representing serialized data.
Definition Common.h:103
A ChannelPort which reads data from the accelerator.
Definition Ports.h:103
volatile Mode mode
Definition Ports.h:161
virtual std::future< MessageData > readAsync()
Asynchronous read.
Definition Ports.cpp:77
std::mutex pollingM
Mutex to protect the two queues used for polling.
Definition Ports.h:171
virtual void connect(std::function< bool(MessageData)> callback, std::optional< unsigned > bufferSize=std::nullopt)
Definition Ports.cpp:44
std::function< bool(MessageData)> callback
Backends call this callback when new data is available.
Definition Ports.h:164
std::queue< MessageData > dataQueue
Store incoming data here if there are no outstanding promises to be fulfilled.
Definition Ports.h:174
static constexpr uint64_t DefaultMaxDataQueueMsgs
Default max data queue size set at connect time.
Definition Ports.h:133
std::queue< std::promise< MessageData > > promiseQueue
Promises to be fulfilled when data is available.
Definition Ports.h:178
uint64_t maxDataQueueMsgs
Maximum number of messages to store in dataQueue. 0 means no limit.
Definition Ports.h:176
A ChannelPort which sends data to the accelerator.
Definition Ports.h:74
Definition esi.py:1