Skip to content

Commit

Permalink
feat(frt): add a shared memory queue
Browse files Browse the repository at this point in the history
This will be used to implement streaming cosim with SystemVerilog DPI.
  • Loading branch information
Blaok committed Oct 22, 2024
1 parent cd2fbb1 commit 81972bd
Show file tree
Hide file tree
Showing 4 changed files with 273 additions and 1 deletion.
1 change: 0 additions & 1 deletion fpga-runtime/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ cc_test(
name = "frt-test",
size = "small",
srcs = glob(["**/*_test.cpp"]),
visibility = ["//visibility:public"],
deps = [
":frt",
"@googletest//:gtest",
Expand Down
144 changes: 144 additions & 0 deletions fpga-runtime/src/frt/devices/shared_memory_queue.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Copyright (c) 2024 RapidStream Design Automation, Inc. and contributors.
// All rights reserved. The contributor(s) of this file has/have agreed to the
// RapidStream Contributor License Agreement.

#include "frt/devices/shared_memory_queue.h"

#include <cstdint>
#include <cstdlib>
#include <cstring>

#include <string>

#include <fcntl.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <unistd.h>

#include <glog/logging.h>

namespace fpga {
namespace internal {

namespace {

constexpr char kMagic[] = "tapa";
constexpr int32_t kVersion = 1;

} // namespace

void SharedMemoryQueue::Deleter::operator()(SharedMemoryQueue* ptr) {
if (munmap(ptr, ptr->mmap_len()) != 0) {
PLOG(ERROR) << "munmap";
}
}

SharedMemoryQueue::UniquePtr SharedMemoryQueue::New(int fd) {
auto* ptr = static_cast<SharedMemoryQueue*>(
mmap(nullptr, sizeof(SharedMemoryQueue), PROT_READ | PROT_WRITE,
MAP_SHARED, fd, /*offset=*/0));
if (ptr == MAP_FAILED) {
PLOG(ERROR) << "mmap";
return nullptr;
}

const std::string magic(ptr->magic_, sizeof(ptr->magic_));
if (magic != kMagic) {
LOG(ERROR) << "unexpected magic '" << magic << "'; want '" << kMagic << "'"
<< "; size: " << magic.size();
return nullptr;
}

if (ptr->version_ != kVersion) {
LOG(ERROR) << "unexpected version " << ptr->version_ << "; want "
<< kVersion;
return nullptr;
}

if (ptr->depth_ <= 0) {
LOG(ERROR) << "unexpected non-positive depth " << ptr->depth_;
return nullptr;
}

if (ptr->width_ <= 0) {
LOG(ERROR) << "unexpected non-positive width " << ptr->width_;
return nullptr;
}

ptr = static_cast<SharedMemoryQueue*>(
mremap(ptr, sizeof(SharedMemoryQueue), ptr->mmap_len(), MREMAP_MAYMOVE));
if (ptr == MAP_FAILED) {
PLOG(ERROR) << "mremap";
PLOG_IF(ERROR, munmap(ptr, sizeof(SharedMemoryQueue)) != 0) << "munmap";
return nullptr;
}

return UniquePtr(ptr);
}

int SharedMemoryQueue::CreateFile(std::string& path, int32_t depth,
int32_t width) {
int fd = mkostemp(&path[0], O_CLOEXEC);
if (fd < 0) {
PLOG(ERROR) << "mkostemp";
return fd;
}

SharedMemoryQueue queue;
static_assert(sizeof(queue.magic_) + 1 == sizeof(kMagic)); // +1 for '\0'
memcpy(queue.magic_, kMagic, sizeof(queue.magic_));
queue.version_ = kVersion;
queue.depth_ = depth;
queue.width_ = width;
int rc = ftruncate(fd, sizeof(queue) + depth * width);
if (rc == 0) {
rc = write(fd, &queue, sizeof(queue));
if (rc == sizeof(queue)) {
return fd;
}
if (rc >= 0) {
LOG(ERROR) << "partial write: wrote " << rc << " bytes, want "
<< sizeof(queue);
} else {
PLOG(ERROR) << "write";
}
} else {
PLOG(ERROR) << "ftruncate";
}
PLOG_IF(ERROR, close(fd)) << "close";
PLOG_IF(ERROR, unlink(path.c_str())) << "unlink";
return rc;
}

int64_t SharedMemoryQueue::size() const { return head_ - tail_; }

int64_t SharedMemoryQueue::capacity() const { return depth_; }

bool SharedMemoryQueue::empty() const { return size() <= 0; }

bool SharedMemoryQueue::full() const { return size() >= capacity(); }

std::string SharedMemoryQueue::front() const {
return std::string(&data_[(tail_ % depth_) * width_], width_);
}

std::string SharedMemoryQueue::pop() {
CHECK_GT(size(), 0) << "pop called on an empty queue";
std::string val = front();
++tail_;
return val;
}

void SharedMemoryQueue::push(const std::string& val) {
CHECK_LT(size(), capacity()) << "push called on a full queue";
CHECK_EQ(val.size(), width_) << "unexpected input: " << val;
memcpy(&data_[(head_ % depth_) * width_], val.data(), val.size());
++head_;
}

size_t SharedMemoryQueue::mmap_len() const {
return sizeof(*this) + depth_ * width_;
}

} // namespace internal
} // namespace fpga
67 changes: 67 additions & 0 deletions fpga-runtime/src/frt/devices/shared_memory_queue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) 2024 RapidStream Design Automation, Inc. and contributors.
// All rights reserved. The contributor(s) of this file has/have agreed to the
// RapidStream Contributor License Agreement.

#ifndef FPGA_RUNTIME_SHARED_MEMORY_QUEUE_H_
#define FPGA_RUNTIME_SHARED_MEMORY_QUEUE_H_

#include <cstdint>
#include <cstring>

#include <atomic>
#include <memory>
#include <string>

namespace fpga {
namespace internal {

// Shared-memory lock-free SPSC queue with fixed depth and width.
class SharedMemoryQueue {
struct Deleter {
void operator()(SharedMemoryQueue* ptr);
};

public:
using UniquePtr = std::unique_ptr<SharedMemoryQueue, Deleter>;

// Returns `nullptr` on failure with logging.
static UniquePtr New(int fd);

// Creates a file suitable for backing a `SharedMemoryQueue` and returns the
// corresponding file descriptor, with `path_template` modified by `mkostemp`.
// Returns a negative fd on failure with the corresponding errno and logging.
static int CreateFile(std::string& path_template, int32_t depth,
int32_t width);

// Not copyable or movable.
SharedMemoryQueue(const SharedMemoryQueue&) = delete;
SharedMemoryQueue* operator=(const SharedMemoryQueue&) = delete;

int64_t size() const;
int64_t capacity() const;
bool empty() const;
bool full() const;
std::string front() const;
std::string pop();
void push(const std::string& val);

private:
explicit SharedMemoryQueue() = default;

size_t mmap_len() const;

char magic_[4] = {};
int32_t version_ = 0;
int32_t depth_ = 0;
int32_t width_ = 0;
std::atomic<int64_t> tail_{};
std::atomic<int64_t> head_{};
char data_[];
};

static_assert(sizeof(SharedMemoryQueue) == 32);

} // namespace internal
} // namespace fpga

#endif // FPGA_RUNTIME_SHARED_MEMORY_QUEUE_H_
62 changes: 62 additions & 0 deletions fpga-runtime/src/frt/devices/shared_memory_queue_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) 2024 RapidStream Design Automation, Inc. and contributors.
// All rights reserved. The contributor(s) of this file has/have agreed to the
// RapidStream Contributor License Agreement.

#include "frt/devices/shared_memory_queue.h"

#include <glog/logging.h>
#include <gtest/gtest.h>

#include "frt/devices/filesystem.h"

namespace fpga::internal {
namespace {

constexpr int kDepth = 2;
constexpr int kWidth = 3;

class SharedMemoryQueueTest : public testing::Test {
protected:
~SharedMemoryQueueTest() override {
if (fd_ >= 0) {
PLOG_IF(WARNING, close(fd_) != 0) << "close";
fd_ = -1;
}
fs::remove_all(temp_file_);
}

const testing::TestInfo* const test_info_ =
testing::UnitTest::GetInstance()->current_test_info();
std::string temp_file_ = fs::temp_directory_path() /
(std::string(test_info_->test_suite_name()) + "." +
test_info_->name() + ".shared_memory_queue.XXXXXX");
int fd_ = SharedMemoryQueue::CreateFile(temp_file_, kDepth, kWidth);
SharedMemoryQueue::UniquePtr queue_ = SharedMemoryQueue::New(fd_);
};

TEST_F(SharedMemoryQueueTest, PushAndPopSucceeds) {
const std::string val = "val";

queue_->push(val);
EXPECT_EQ(queue_->pop(), val);
}

TEST_F(SharedMemoryQueueTest, PushFailsWithInvalidInput) {
EXPECT_DEATH(queue_->push("too long"), "unexpected input");
}

TEST_F(SharedMemoryQueueTest, PushFailsWhenFull) {
queue_->push("val");
queue_->push("val");

EXPECT_TRUE(queue_->full());
EXPECT_DEATH(queue_->push("val"), "full");
}

TEST_F(SharedMemoryQueueTest, PopFailsWhenEmpty) {
EXPECT_TRUE(queue_->empty());
EXPECT_DEATH(queue_->pop(), "empty");
}

} // namespace
} // namespace fpga::internal

0 comments on commit 81972bd

Please sign in to comment.