Skip to content

Commit

Permalink
feat: reorganise and add darwin backend
Browse files Browse the repository at this point in the history
  • Loading branch information
g-tejas committed Jul 2, 2024
1 parent 8ecdbbc commit 891b1aa
Show file tree
Hide file tree
Showing 11 changed files with 180 additions and 115 deletions.
4 changes: 3 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ add_subdirectory(external)

set(SOURCES
src/main.cpp
src/backends/kqueue.cpp)
src/thread.cpp
src/reactor.cpp
src/backends/darwin.cpp)

if (BUILD_BINARY)
message(STATUS "Building binary")
Expand Down
9 changes: 9 additions & 0 deletions include/backends/darwin.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#pragma once

#include "reactor.hpp"

namespace flux {
class KqueueReactor : public Reactor {
public:
};
} /* namespace flux */
8 changes: 0 additions & 8 deletions include/io.hpp

This file was deleted.

26 changes: 26 additions & 0 deletions include/reactor.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#pragma once

using Thread = int;

namespace flux {
//! Base class for all Reactor backends [`epoll`, `kqueue`, `io_uring`]
class Reactor {
public:
Reactor() = default;
virtual ~Reactor();

//! Subscribe this thread to this file descriptor
void subscribe(int fd, Thread thread);

//! Remove all subscribers to a particular file descriptor
void unsubscribe(int fd);

//! Remove all subscribers to this thread
void unsubscribe(Thread *thread);

//! Returns true if any subscriptions are active
auto active() -> bool const;

private:
};
} /* namespace flux */
75 changes: 75 additions & 0 deletions include/thread.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#pragma once

#include <boost/context/detail/fcontext.hpp>
#include <boost/context/protected_fixedsize_stack.hpp>
#include <boost/context/stack_context.hpp>

#include <cstddef>

#define THREAD_STATUS_COMPLETE 0
#define THREAD_STATUS_ERROR 1

namespace flux {
using namespace boost::context::detail;
using namespace boost::context;
using StackAllocator = protected_fixedsize_stack;
using ThreadContext = fcontext_t;
using ReturnContext = transfer_t;

struct Event {
enum Type : uint8_t {
NA = 0,
SocketRead = 1,
SocketWriteable = 2, // this is not currently implemented or handled
SocketError = 3,
SocketHangup = 4
};
Type type;
int fd;
};

/// A stack-ful coroutine class. This is a base class and user needs to implement it
/// and implement the virtual `run()` method.
// TODO: Implement CRTP
// TODO: Implement boost intrusive pointer
// TODO: Implement pthread style creation instead of having to define an object
class Thread {
public:
explicit Thread(size_t _stack_size);

~Thread();

//! Called from within thread's context
//! Passes control back to the caller (e.g Reactor), and this thread will be resumed
//! when the events are ready.
auto wait() -> Event *;

//! Resumes the thread with the given event. Returns true if resumable
auto resume(Event *event) -> bool;

//! Where you place your business logic
virtual void run() = 0;

//! Allocates `stack_size` for thread stack and starts executing the `run()` method
void start(size_t stack_size);

private:
//! Entry point from the coroutine context, has the function type void* (*)(void*)
static void enter(ReturnContext ctx);

private:
/// Represents the thread's state. Contains hardware context, stack pointer, instruction
/// pointers, etc.
fcontext_t m_thread_context{};

/// Used for context switching. Flip flops between the reactor and the thread
ReturnContext m_return_context{};

/// Used for managing the stack of the thread
stack_context m_stack;

/// Size of the requested stack
size_t m_stack_size;
};

} /* namespace flux */
2 changes: 2 additions & 0 deletions include/utils.hpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#pragma once

#include "quill/LogMacros.h"
#include <format>
#include <source_location>
Expand Down
1 change: 1 addition & 0 deletions src/backends/darwin.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#include "backends/darwin.hpp"
1 change: 0 additions & 1 deletion src/backends/kqueue.cpp

This file was deleted.

107 changes: 2 additions & 105 deletions src/main.cpp
Original file line number Diff line number Diff line change
@@ -1,122 +1,19 @@
#include "fifo.hpp"
#include "utils.hpp"
#include <boost/context/detail/fcontext.hpp>
#include <boost/context/protected_fixedsize_stack.hpp>
#include <boost/context/stack_context.hpp>
#include <cassert>
#include <cstdio>
#include <fcntl.h>
#include <netinet/in.h>
#include <sys/event.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <thread>
#include <unistd.h>

#include "thread.hpp"

// we check if macos
#ifdef __APPLE__
#define SOCK_NONBLOCK O_NONBLOCK
#endif

#define THREAD_STATUS_COMPLETE 0
#define THREAD_STATUS_ERROR 1

#define DEFAULT_STACK_SIZE 4096

namespace flux {
using namespace boost::context::detail;
using namespace boost::context;
using StackAllocator = protected_fixedsize_stack;
using ThreadContext = fcontext_t;
using ReturnContext = transfer_t;

struct Event {
enum Type : uint8_t {
NA = 0,
SocketRead = 1,
SocketWriteable = 2, // this is not currently implemented or handled
SocketError = 3,
SocketHangup = 4
};
Type type;
int fd;
};

/// A stack-ful coroutine class. This is a base class and user needs to implement it
/// and implement the virtual `run()` method.
// TODO: Implement CRTP
// TODO: Implement boost intrusive pointer
// TODO: Implement pthread style creation instead of having to define an object
class Thread {
public:
explicit Thread(size_t _stack_size) : m_stack_size(_stack_size) {
protected_fixedsize_stack stack_allocator(m_stack_size);
m_stack = stack_allocator.allocate();
}
~Thread() {
protected_fixedsize_stack stack_allocator(m_stack.size);
stack_allocator.deallocate(m_stack);
}

/// Called from within thread's context
/// Passes control back to the caller (e.g Reactor), and this thread will be resumed
/// when the events are ready.
auto wait() -> Event * {
m_return_context = jump_fcontext(m_return_context.fctx, this);
return reinterpret_cast<Event *>(m_return_context.data);
}

/// Resumes the thread with the given event. Returns true if resumable
auto resume(Event *event) -> bool {
m_return_context = jump_fcontext(m_thread_context, event);
return m_return_context.data != THREAD_STATUS_COMPLETE;
}

/// Where you place your business logic
virtual void run() = 0;

/// Allocates `stack_size` for thread stack and starts executing the `run()` method
void start(size_t stack_size) {
// We enter the coroutine from a static routine because method signature of a
// member function might be iffy, and we need to pass the `this` pointer.
m_thread_context = make_fcontext(m_stack.sp, m_stack.size, Thread::enter);

// Transfers control to this thread. The reason we pass `this` is that we want to
// set the m_return_context to the reactor's context.
jump_fcontext(m_thread_context, (void *)this);
}

private:
/// Entry point from the coroutine context, has the function type void* (*)(void*)
static void enter(ReturnContext ctx) {
auto *thread = reinterpret_cast<Thread *>(ctx.data);

thread->m_return_context = ctx;

thread->run();

while (true) {
// Transfer control back to the caller and pass zero to indicate that we are done
thread->m_return_context = jump_fcontext(thread->m_return_context.fctx, 0);
}
}

private:
/// Represents the thread's state. Contains hardware context, stack pointer, instruction
/// pointers, etc.
fcontext_t m_thread_context{};

/// Used for context switching. Flip flops between the reactor and the thread
ReturnContext m_return_context{};

/// Used for managing the stack of the thread
stack_context m_stack;

/// Size of the requested stack
size_t m_stack_size;
};
// I want the API to look like this somehow. Not a class that i have to inherit
// and then write the logic in the run code. Need to see how can modify this to run a
// function instead.
Expand Down
16 changes: 16 additions & 0 deletions src/reactor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#include "reactor.hpp"

using namespace flux;

Reactor::~Reactor() {}

//! Subscribe this thread to this file descriptor
void Reactor::subscribe(int fd, Thread thread) {}

//! Remove all subscribers to a particular file descriptor
void Reactor::unsubscribe(int fd) {}

//! Remove all subscribers to this thread
void Reactor::unsubscribe(Thread *thread) {}

auto Reactor::active() -> bool const {}
46 changes: 46 additions & 0 deletions src/thread.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#include "thread.hpp"

using namespace flux;

Thread::Thread(size_t _stack_size) : m_stack_size(_stack_size) {
protected_fixedsize_stack stack_allocator(m_stack_size);
m_stack = stack_allocator.allocate();
}

Thread::~Thread() {
protected_fixedsize_stack stack_allocator(m_stack.size);
stack_allocator.deallocate(m_stack);
}

auto Thread::wait() -> Event * {
m_return_context = jump_fcontext(m_return_context.fctx, this);
return reinterpret_cast<Event *>(m_return_context.data);
}

auto Thread::resume(Event *event) -> bool {
m_return_context = jump_fcontext(m_thread_context, event);
return m_return_context.data != THREAD_STATUS_COMPLETE;
}

void Thread::start(size_t stack_size) {
// We enter the coroutine from a static routine because method signature of a
// member function might be iffy, and we need to pass the `this` pointer.
m_thread_context = make_fcontext(m_stack.sp, m_stack.size, Thread::enter);

// Transfers control to this thread. The reason we pass `this` is that we want to
// set the m_return_context to the reactor's context.
jump_fcontext(m_thread_context, (void *)this);
}

void Thread::enter(ReturnContext ctx) {
auto *thread = reinterpret_cast<Thread *>(ctx.data);

thread->m_return_context = ctx;

thread->run();

while (true) {
// Transfer control back to the caller and pass zero to indicate that we are done
thread->m_return_context = jump_fcontext(thread->m_return_context.fctx, 0);
}
}

0 comments on commit 891b1aa

Please sign in to comment.