Skip to content

Commit

Permalink
LibCore: Port SharedCircularQueue to Windows
Browse files Browse the repository at this point in the history
  • Loading branch information
stasoid committed Nov 30, 2024
1 parent ecba40b commit a386195
Showing 1 changed file with 19 additions and 40 deletions.
59 changes: 19 additions & 40 deletions Libraries/LibCore/SharedCircularQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,11 @@

#pragma once

#include <AK/Assertions.h>
#include <AK/Atomic.h>
#include <AK/BuiltinWrappers.h>
#include <AK/ByteString.h>
#include <AK/Debug.h>
#include <AK/Error.h>
#include <AK/Format.h>
#include <AK/Function.h>
#include <AK/NonnullRefPtr.h>
#include <AK/NumericLimits.h>
#include <AK/Platform.h>
#include <AK/RefCounted.h>
#include <AK/RefPtr.h>
#include <AK/Types.h>
#include <AK/Variant.h>
#include <AK/Weakable.h>
#include <LibCore/AnonymousBuffer.h>
#include <LibCore/System.h>
#include <errno.h>
#include <fcntl.h>
#include <sched.h>
#include <sys/mman.h>

namespace Core {

Expand Down Expand Up @@ -64,14 +47,17 @@ class SharedSingleProducerCircularQueue final {
// Allocates a new circular queue in shared memory.
static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> create()
{
auto fd = TRY(System::anon_create(sizeof(SharedMemorySPCQ), O_CLOEXEC));
return create_internal(fd, true);
auto anon_buf = TRY(AnonymousBuffer::create_with_size(sizeof(SharedMemorySPCQ)));
auto shared_queue = new (anon_buf.data<void>()) SharedMemorySPCQ;
return create_internal(anon_buf, shared_queue);
}

// Uses an existing circular queue from given shared memory.
static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> create(int fd)
{
return create_internal(fd, false);
auto anon_buf = TRY(AnonymousBuffer::create_from_anon_fd(fd, sizeof(SharedMemorySPCQ)));
auto shared_queue = reinterpret_cast<SharedMemorySPCQ*>(anon_buf.data<void>());
return create_internal(anon_buf, shared_queue);
}

constexpr size_t size() const { return Size; }
Expand All @@ -84,7 +70,7 @@ class SharedSingleProducerCircularQueue final {
return head - tail;
}

ALWAYS_INLINE constexpr int fd() const { return m_queue->m_fd; }
ALWAYS_INLINE constexpr int fd() const { return m_queue->fd(); }
ALWAYS_INLINE constexpr bool is_valid() const { return !m_queue.is_null(); }

ALWAYS_INLINE constexpr size_t weak_head() const { return m_queue->m_queue->m_head.load(AK::MemoryOrder::memory_order_relaxed); }
Expand Down Expand Up @@ -174,42 +160,35 @@ class SharedSingleProducerCircularQueue final {
alignas(ValueType) Array<ValueType, Size> m_data;
};

class RefCountedSharedMemorySPCQ : public RefCounted<RefCountedSharedMemorySPCQ> {
class RefCountedSharedMemorySPCQ
: public RefCounted<RefCountedSharedMemorySPCQ>
, public AnonymousBuffer {
friend class SharedSingleProducerCircularQueue;

public:
SharedMemorySPCQ* m_queue;
void* m_raw;
int m_fd;

~RefCountedSharedMemorySPCQ()
{
MUST(System::close(m_fd));
MUST(System::munmap(m_raw, sizeof(SharedMemorySPCQ)));
dbgln_if(SHARED_QUEUE_DEBUG, "destructed SSPCQ at {:p}, shared mem: {:p}", this, this->m_raw);
dbgln_if(SHARED_QUEUE_DEBUG, "destructed SSPCQ at {:p}, shared mem: {:p}", this, m_queue);
}

private:
RefCountedSharedMemorySPCQ(SharedMemorySPCQ* queue, int fd)
: m_queue(queue)
, m_raw(reinterpret_cast<void*>(queue))
, m_fd(fd)
RefCountedSharedMemorySPCQ(AnonymousBuffer anon_buf, SharedMemorySPCQ* shared_queue)
: AnonymousBuffer(anon_buf)
, m_queue(shared_queue)
{
}
};

static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> create_internal(int fd, bool is_new)
static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> create_internal(AnonymousBuffer anon_buf, SharedMemorySPCQ* shared_queue)
{
auto name = ByteString::formatted("SharedSingleProducerCircularQueue@{:x}", fd);
auto* raw_mapping = TRY(System::mmap(nullptr, sizeof(SharedMemorySPCQ), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0, 0, name));
dbgln_if(SHARED_QUEUE_DEBUG, "successfully mmapped {} at {:p}", name, raw_mapping);

SharedMemorySPCQ* shared_queue = is_new ? new (raw_mapping) SharedMemorySPCQ() : reinterpret_cast<SharedMemorySPCQ*>(raw_mapping);

if (!shared_queue)
return Error::from_string_literal("Unexpected error when creating shared queue from raw memory");

return SharedSingleProducerCircularQueue<T, Size> { move(name), adopt_ref(*new (nothrow) RefCountedSharedMemorySPCQ(shared_queue, fd)) };
auto name = ByteString::formatted("SharedSingleProducerCircularQueue@{:x}", anon_buf.fd());
dbgln_if(SHARED_QUEUE_DEBUG, "successfully mmapped {} at {:p}", name, shared_queue);
auto ref_counted = new (nothrow) RefCountedSharedMemorySPCQ(anon_buf, shared_queue);
return SharedSingleProducerCircularQueue<T, Size> { move(name), adopt_ref(*ref_counted) };
}

SharedSingleProducerCircularQueue(ByteString name, RefPtr<RefCountedSharedMemorySPCQ> queue)
Expand Down

0 comments on commit a386195

Please sign in to comment.