32class UnknownEngine :
public Engine {
38 throw std::runtime_error(
"Unknown engine '" + engineName +
"'");
42 const std::string &channelName,
44 const Type *type)
override {
45 if (BundlePort::isWrite(dir))
46 return std::make_unique<UnknownWriteChannelPort>(
48 "Unknown engine '" + engineName +
"': cannot create write port");
50 return std::make_unique<UnknownReadChannelPort>(
51 type,
"Unknown engine '" + engineName +
"': cannot create read port");
55 std::string engineName;
74 static constexpr size_t BufferPtrOffset = 8;
76 OneItemBuffersToHostReadPort(
const Type *type, OneItemBuffersToHost *engine,
77 AppIDPath idPath,
const std::string &channelName)
79 channelName(channelName) {
90 void writeBufferPtr();
96 std::string identifier()
const {
return idPath.
toStr() +
"." + channelName; }
104 std::unique_ptr<services::HostMem::HostMemRegion> buffer;
106 std::unique_ptr<SegmentedMessageData> pendingMessage;
108 uint64_t pollCount = 0;
112 std::string channelName;
116 friend class OneItemBuffersToHostReadPort;
123 auto mmioIDIter = details.find(
"mmio");
124 if (mmioIDIter != details.end())
125 mmioID = std::any_cast<AppID>(mmioIDIter->second);
132 return std::make_unique<OneItemBuffersToHost>(conn, idPath, details);
141 const std::string &channelName,
143 const Type *type)
override {
144 if (BundlePort::isWrite(dir))
145 return std::make_unique<UnknownWriteChannelPort>(
146 type, idPath.
toStr() +
"." + channelName +
147 " OneItemBuffersToHost: cannot create write port");
148 return std::make_unique<OneItemBuffersToHostReadPort>(type,
this, idPath,
154 std::optional<AppID> mmioID;
160void OneItemBuffersToHost::connect() {
165 throw std::runtime_error(
"OneItemBuffersToHost: no mmio path specified");
168 throw std::runtime_error(
"OneItemBuffersToHost: no host memory service");
175 mmioPath.push_back(*mmioID);
179 throw std::runtime_error(
181 " OneItemBuffersToHost: could not find MMIO port at " +
185 throw std::runtime_error(
187 " OneItemBuffersToHost: MMIO port is not an MMIO port");
193void OneItemBuffersToHostReadPort::writeBufferPtr() {
194 uint8_t *bufferData =
reinterpret_cast<uint8_t *
>(buffer->getPtr());
195 bufferData[bufferSize - 1] = 0;
197 engine->mmio->write(BufferPtrOffset,
198 reinterpret_cast<uint64_t
>(buffer->getDevicePtr()));
201void OneItemBuffersToHostReadPort::connectImpl(
204 buffer = engine->hostMem->allocate(bufferSize, {
true,
false});
208bool OneItemBuffersToHostReadPort::pollImpl() {
209 Logger &logger = engine->conn.getLogger();
211 auto tryDeliverPending = [&]() {
213 [&](std::string &subsystem, std::string &msg,
214 std::unique_ptr<std::map<std::string, std::any>> &details) {
215 subsystem =
"OneItemBuffersToHost";
216 msg =
"received message";
217 details = std::make_unique<std::map<std::string, std::any>>();
219 (*details)[
"channel"] = identifier();
220 (*details)[
"data_size"] = flat.getSize();
221 (*details)[
"message_data"] = flat.toHex();
224 if (!invokeCallback(pendingMessage)) {
226 [&](std::string &subsystem, std::string &msg,
227 std::unique_ptr<std::map<std::string, std::any>> &details) {
228 subsystem =
"OneItemBuffersToHost";
229 msg =
"callback rejected data";
230 details = std::make_unique<std::map<std::string, std::any>>();
231 (*details)[
"channel"] = identifier();
236 pendingMessage.reset();
239 [
this](std::string &subsystem, std::string &msg,
240 std::unique_ptr<std::map<std::string, std::any>> &details) {
241 subsystem =
"OneItemBuffersToHost";
242 msg =
"callback accepted data";
243 details = std::make_unique<std::map<std::string, std::any>>();
244 (*details)[
"channel"] = identifier();
250 return tryDeliverPending();
253 uint8_t *bufferData =
reinterpret_cast<uint8_t *
>(buffer->getPtr());
254 if (bufferData[bufferSize - 1] == 0)
258 pendingMessage = std::make_unique<MessageData>(bufferData, bufferSize - 1);
259 return tryDeliverPending();
285class OneItemBuffersFromHost;
289 static constexpr size_t BufferPtrOffset = 8;
290 static constexpr size_t CompletionPtrOffset = 16;
292 OneItemBuffersFromHostWritePort(
const Type *type,
293 OneItemBuffersFromHost *engine,
295 const std::string &channelName)
297 channelName(channelName) {}
302 std::string identifier()
const {
return idPath.
toStr() +
"." + channelName; }
306 bool tryWriteImpl(
const MessageData &data)
override;
312 bool pollImpl()
override;
317 std::unique_ptr<services::HostMem::HostMemRegion> data_buffer;
318 std::unique_ptr<services::HostMem::HostMemRegion> completion_buffer;
321 std::mutex bufferMutex;
326 std::queue<MessageData> pendingFrames;
330 std::string channelName;
334 friend class OneItemBuffersFromHostWritePort;
341 auto mmioIDIter = details.find(
"mmio");
342 if (mmioIDIter != details.end())
343 mmioID = std::any_cast<AppID>(mmioIDIter->second);
350 return std::make_unique<OneItemBuffersFromHost>(conn, idPath, details);
358 std::unique_ptr<ChannelPort> createPort(
AppIDPath idPath,
359 const std::string &channelName,
361 const Type *type)
override {
362 if (!BundlePort::isWrite(dir))
363 return std::make_unique<UnknownReadChannelPort>(
364 type, idPath.
toStr() +
"." + channelName +
365 " OneItemBuffersFromHost: cannot create read port");
366 return std::make_unique<OneItemBuffersFromHostWritePort>(type,
this, idPath,
372 std::optional<AppID> mmioID;
379void OneItemBuffersFromHost::connect() {
384 throw std::runtime_error(
"OneItemBuffersFromHost: no mmio path specified");
387 throw std::runtime_error(
"OneItemBuffersFromHost: no host memory service");
394 mmioPath.push_back(*mmioID);
398 throw std::runtime_error(
400 " OneItemBuffersFromHost: could not find MMIO port at " +
404 throw std::runtime_error(
406 " OneItemBuffersFromHost: MMIO port is not an MMIO port");
412void OneItemBuffersFromHostWritePort::connectImpl(
415 data_buffer = engine->hostMem->allocate(
416 std::max(getFrameSizeBytes(), (
size_t)512), {
false,
false});
417 completion_buffer = engine->hostMem->allocate(512, {
true,
false});
420 *
static_cast<uint8_t *
>(completion_buffer->getPtr()) = 1;
423void OneItemBuffersFromHostWritePort::enqueueFrames(
const MessageData &data) {
424 auto frames = getMessageFrames(data);
425 std::lock_guard<std::mutex> lock(bufferMutex);
426 for (
const auto &frame : frames)
427 pendingFrames.push(frame);
430void OneItemBuffersFromHostWritePort::writeImpl(
const MessageData &data) {
431 while (!tryWriteImpl(data))
433 std::this_thread::yield();
436bool OneItemBuffersFromHostWritePort::tryWriteImpl(
const MessageData &data) {
439 std::lock_guard<std::mutex> lock(bufferMutex);
440 if (!pendingFrames.empty())
448bool OneItemBuffersFromHostWritePort::pollImpl() {
449 Logger &logger = engine->conn.getLogger();
452 completion_buffer->flush();
453 uint8_t *completion =
454 reinterpret_cast<uint8_t *
>(completion_buffer->getPtr());
455 if (*completion == 0)
458 std::lock_guard<std::mutex> lock(bufferMutex);
463 if (pendingFrames.empty())
466 MessageData frame = std::move(pendingFrames.front());
471 void *bufferData = data_buffer->getPtr();
472 std::memcpy(bufferData, src, sendSize);
473 data_buffer->flush();
475 completion_buffer->flush();
476 engine->mmio->write(BufferPtrOffset,
477 reinterpret_cast<uint64_t
>(data_buffer->getDevicePtr()));
480 reinterpret_cast<uint64_t
>(completion_buffer->getDevicePtr()));
482 logger.
trace([
this, sendSize](
483 std::string &subsystem, std::string &msg,
484 std::unique_ptr<std::map<std::string, std::any>> &details) {
485 subsystem =
"OneItemBuffersFromHost";
486 msg =
"initiated transfer";
487 details = std::make_unique<std::map<std::string, std::any>>();
488 (*details)[
"channel"] = identifier();
489 (*details)[
"data_size"] = sendSize;
504 const std::string &channelName,
506 auto portIter =
ownedPorts.find(std::make_pair(idPath, channelName));
508 return *portIter->second;
509 std::unique_ptr<ChannelPort> port =
512 ownedPorts.emplace(std::make_pair(idPath, channelName), std::move(port));
519 for (
auto [channelName, dir, type] : bundleType->
getChannels()) {
524 ports.emplace(channelName, engineIter->second->requestPort(
525 idPath, channelName, dir, type));
532 auto [it, inserted] =
bundleEngineMap.try_emplace(channelName, engine);
534 throw std::runtime_error(
"Channel already exists in engine map");
542class EngineRegistry {
544 static std::map<std::string, registry::internal::EngineCreate> &get() {
545 static EngineRegistry instance;
546 return instance.engineRegistry;
550 std::map<std::string, registry::internal::EngineCreate> engineRegistry;
554std::unique_ptr<Engine>
556 const std::string &dmaEngineName,
AppIDPath idPath,
559 auto &reg = EngineRegistry::get();
560 auto it = reg.find(dmaEngineName);
562 return std::make_unique<UnknownEngine>(conn, dmaEngineName);
563 return it->second(conn, idPath, details, clients);
568 auto tried = EngineRegistry::get().try_emplace(name, create);
570 throw std::runtime_error(
"Engine already exists in registry");
#define REGISTER_ENGINE(Name, TEngine)
Abstract class representing a connection to an accelerator.
Top level accelerator class.
std::string toStr() const
PortMap requestPorts(const AppIDPath &idPath, const BundleType *bundleType) const
Request ports for all the channels in a bundle.
std::map< std::string, Engine * > bundleEngineMap
void setEngine(const std::string &channelName, Engine *engine)
Set a particular engine for a particular channel.
Services provide connections to 'bundles' – collections of named, unidirectional communication channe...
Bundles represent a collection of channels.
const ChannelVector & getChannels() const
Unidirectional channels are the basic communication primitive between the host and accelerator.
virtual bool pollImpl()
Method called by poll() to actually poll the channel if the channel is connected.
virtual void connectImpl(const ConnectOptions &options)
Called by all connect methods to let backends initiate the underlying connections.
Engines implement the actual channel communication between the host and the accelerator.
virtual void connect()
Start the engine, if applicable.
virtual ChannelPort & requestPort(AppIDPath idPath, const std::string &channelName, BundleType::Direction dir, const Type *type)
Get a port for a channel, from the cache if it exists or create it.
virtual std::unique_ptr< ChannelPort > createPort(AppIDPath idPath, const std::string &channelName, BundleType::Direction dir, const Type *type)=0
Each engine needs to know how to create ports.
std::map< std::pair< AppIDPath, std::string >, std::unique_ptr< ChannelPort > > ownedPorts
void trace(const std::string &subsystem, const std::string &msg, const std::map< std::string, std::any > *details=nullptr)
Log a trace message.
A concrete flat message backed by a single vector of bytes.
const uint8_t * getBytes() const
size_t getSize() const
Get the size of the data in bytes.
MessageData toMessageData() const override
Flatten all segments into a standard MessageData.
A ChannelPort which reads data from the accelerator.
Root class of the ESI type system.
virtual std::ptrdiff_t getBitWidth() const
A ChannelPort which sends data to the accelerator.
A "slice" of some parent MMIO space.
connect(destination, source)
std::function< std::unique_ptr< Engine >(AcceleratorConnection &conn, AppIDPath idPath, const ServiceImplDetails &details, const HWClientDetails &clients)> EngineCreate
Engines can register themselves for pluggable functionality.
void registerEngine(const std::string &name, EngineCreate create)
std::unique_ptr< Engine > createEngine(AcceleratorConnection &conn, const std::string &dmaEngineName, AppIDPath idPath, const ServiceImplDetails &details, const HWClientDetails &clients)
Create an engine by name.
uint64_t bitsToBytes(uint64_t bits)
Compute ceil(bits/8).
std::map< std::string, std::any > ServiceImplDetails
std::map< std::string, ChannelPort & > PortMap
std::vector< HWClientDetail > HWClientDetails
OneItemBuffersFromHost(Type client_type)
OneItemBuffersToHost(Type client_type)