CIRCT 21.0.0git
Loading...
Searching...
No Matches
Ports.cpp
Go to the documentation of this file.
1//===- Ports.cpp - ESI communication channels -------------------------===//
2//
3// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4// See https://llvm.org/LICENSE.txt for license information.
5// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6//
7//===----------------------------------------------------------------------===//
8//
9// DO NOT EDIT!
10// This file is distributed as part of an ESI package. The source for this file
11// should always be modified within CIRCT (lib/Dialect/ESI/runtime/cpp/).
12//
13//===----------------------------------------------------------------------===//
14
#include "esi/Ports.h"

#include <chrono>
#include <stdexcept>
#include <utility>
19
20using namespace esi;
21
22BundlePort::BundlePort(AppID id, const BundleType *type, PortMap channels)
23 : id(id), type(type), channels(channels) {}
24
25WriteChannelPort &BundlePort::getRawWrite(const std::string &name) const {
26 auto f = channels.find(name);
27 if (f == channels.end())
28 throw std::runtime_error("Channel '" + name + "' not found");
29 auto *write = dynamic_cast<WriteChannelPort *>(&f->second);
30 if (!write)
31 throw std::runtime_error("Channel '" + name + "' is not a write channel");
32 return *write;
33}
34
35ReadChannelPort &BundlePort::getRawRead(const std::string &name) const {
36 auto f = channels.find(name);
37 if (f == channels.end())
38 throw std::runtime_error("Channel '" + name + "' not found");
39 auto *read = dynamic_cast<ReadChannelPort *>(&f->second);
40 if (!read)
41 throw std::runtime_error("Channel '" + name + "' is not a read channel");
42 return *read;
43}
// Connect this read port using a caller-supplied callback: the callback is
// stored on the port and connectImpl(bufferSize) is then invoked so the
// backend can set up the underlying connection.
//
// Throws std::runtime_error("Channel already connected").
//
// NOTE(review): the embedded listing numbers jump 45 -> 47 and 49 -> 51, so
// two lines of this function are missing from this listing. Presumably the
// first is the condition guarding the throw and the second runs after
// connectImpl() -- confirm against the real source before relying on this.
44void ReadChannelPort::connect(std::function<bool(MessageData)> callback,
45 std::optional<unsigned> bufferSize) {
47 throw std::runtime_error("Channel already connected");
48 this->callback = callback;
49 connectImpl(bufferSize);
51}
52
// Connect this read port without a user callback. An internal callback is
// installed which, for each incoming message, either fulfills the oldest
// waiting promise or appends the message to dataQueue. It then calls
// connectImpl(bufferSize).
//
// The internal callback returns false (message not accepted) only when no
// promise is waiting, maxDataQueueMsgs is non-zero and dataQueue already holds
// that many messages; otherwise it returns true.
//
// NOTE(review): the embedded listing numbers jump 53 -> 55 and 73 -> 75, so
// two lines of this function are missing from this listing (one before the
// callback assignment, one after connectImpl()). Confirm against the real
// source.
53void ReadChannelPort::connect(std::optional<unsigned> bufferSize) {
55 this->callback = [this](MessageData data) {
   // pollingM protects both promiseQueue and dataQueue.
56 std::scoped_lock<std::mutex> lock(pollingM);
   // Invariant: at most one of the two queues is non-empty at any time.
57 assert(!(!promiseQueue.empty() && !dataQueue.empty()) &&
58 "Both queues are in use.");
59
60 if (!promiseQueue.empty()) {
61 // If there are promises waiting, fulfill the first one.
62 std::promise<MessageData> p = std::move(promiseQueue.front());
63 promiseQueue.pop();
64 p.set_value(std::move(data));
65 } else {
66 // If not, add it to the data queue, unless the queue is full.
   // A limit of 0 means "unbounded", hence the extra != 0 check.
67 if (dataQueue.size() >= maxDataQueueMsgs && maxDataQueueMsgs != 0)
68 return false;
69 dataQueue.push(std::move(data));
70 }
71 return true;
72 };
73 connectImpl(bufferSize);
75}
76
77std::future<MessageData> ReadChannelPort::readAsync() {
78 if (mode == Mode::Callback)
79 throw std::runtime_error(
80 "Cannot read from a callback channel. `connect()` without a callback "
81 "specified to use polling mode.");
82
83 std::scoped_lock<std::mutex> lock(pollingM);
84 assert(!(!promiseQueue.empty() && !dataQueue.empty()) &&
85 "Both queues are in use.");
86
87 if (!dataQueue.empty()) {
88 // If there's data available, fulfill the promise immediately.
89 std::promise<MessageData> p;
90 std::future<MessageData> f = p.get_future();
91 p.set_value(std::move(dataQueue.front()));
92 dataQueue.pop();
93 return f;
94 } else {
95 // Otherwise, add a promise to the queue and return the future.
96 promiseQueue.emplace();
97 return promiseQueue.back().get_future();
98 }
99}
assert(baseType &&"element must be base type")
PortMap channels
Definition Ports.h:270
ReadChannelPort & getRawRead(const std::string &name) const
Definition Ports.cpp:35
WriteChannelPort & getRawWrite(const std::string &name) const
Get access to the raw byte streams of a channel.
Definition Ports.cpp:25
Bundles represent a collection of channels.
Definition Types.h:44
virtual void connectImpl(std::optional< unsigned > bufferSize)
Called by all connect methods to let backends initiate the underlying connections.
Definition Ports.h:73
A logical chunk of data representing serialized data.
Definition Common.h:103
A ChannelPort which reads data from the accelerator.
Definition Ports.h:124
volatile Mode mode
Definition Ports.h:182
virtual std::future< MessageData > readAsync()
Asynchronous read.
Definition Ports.cpp:77
std::mutex pollingM
Mutex to protect the two queues used for polling.
Definition Ports.h:192
virtual void connect(std::function< bool(MessageData)> callback, std::optional< unsigned > bufferSize=std::nullopt)
Definition Ports.cpp:44
std::function< bool(MessageData)> callback
Backends call this callback when new data is available.
Definition Ports.h:185
std::queue< MessageData > dataQueue
Store incoming data here if there are no outstanding promises to be fulfilled.
Definition Ports.h:195
static constexpr uint64_t DefaultMaxDataQueueMsgs
Default max data queue size set at connect time.
Definition Ports.h:154
std::queue< std::promise< MessageData > > promiseQueue
Promises to be fulfilled when data is available.
Definition Ports.h:199
uint64_t maxDataQueueMsgs
Maximum number of messages to store in dataQueue. 0 means no limit.
Definition Ports.h:197
A ChannelPort which sends data to the accelerator.
Definition Ports.h:77
Definition esi.py:1
std::map< std::string, ChannelPort & > PortMap
Definition Ports.h:29