Skip to content

Commit

Permalink
refactor: refactor lib to be header-only
Browse files Browse the repository at this point in the history
  • Loading branch information
g-tejas committed Jul 22, 2024
1 parent ac4eb9d commit 8c60775
Show file tree
Hide file tree
Showing 26 changed files with 803 additions and 900 deletions.
5 changes: 3 additions & 2 deletions .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ AccessModifierOffset: -2
AllowShortBlocksOnASingleLine: Never
AllowShortCaseLabelsOnASingleLine: true
AlwaysBreakTemplateDeclarations: Yes
SortIncludes: true
ColumnLimit: 90
IndentWidth: 2
TabWidth: 2
IndentWidth: 4
TabWidth: 4
UseTab: Never
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
cmake_minimum_required(VERSION 3.29)
project(flux)
project(loom)

set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
Expand All @@ -8,7 +8,7 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
option(BUILD_TESTS "Build tests" ON)
option(BUILD_EXAMPLES "Build examples" ON)

add_subdirectory(libflux)
add_subdirectory(lib)

if (BUILD_EXAMPLES)
message(STATUS "FLUX: Building examples")
Expand Down
15 changes: 11 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# eventloop
# loom

This abstraction serves the main purpose: To not think about IO readiness events, but just IO completion.
What do I mean by this? The `io_uring` and `epoll` interface are quite different. `io_uring` takes the SQE,
Expand All @@ -19,9 +19,10 @@ and the underlying implementation of the asynchronous interface.

## Todo

Need to decide whether we want a callback interface, or what.
Normally, the callbacks are function pointers casted to integers, and stored in the user_data field of the SQE.
Both kqueue and epoll have this. Once the IO is complete, the callback is invoked.
- Rename to `loom` ? More appropriate than `flux` since this is a fiber library.
Need to decide whether we want a callback interface, or what.
Normally, the callbacks are function pointers casted to integers, and stored in the user_data field of the SQE.
Both kqueue and epoll have this. Once the IO is complete, the callback is invoked.

https://webflow.com/made-in-webflow/website/Apple-Style-Grid Can make this as the front page LOL

Expand All @@ -35,6 +36,12 @@ https://webflow.com/made-in-webflow/website/Apple-Style-Grid Can make this as th

- [Tiger Style](https://github.com/tigerbeetle/tigerbeetle/blob/main/docs/TIGER_STYLE.md)
- [⭐Fibers](https://graphitemaster.github.io/fibers/)
- [Old article about how to write high performance servers](https://web.archive.org/web/20060306033511/https://pl.atyp.us/content/tech/servers.html)
Talks about the four horsemen of poor performance. Read if necessary
- Context switches
- Data copies
- Lock contention
- Memoy allocation

*`kqueue`*: https://developer.apple.com/library/archive/documentation/System/Conceptual/ManPages_iPhoneOS/man2/kqueue.2.html

Expand Down
29 changes: 29 additions & 0 deletions docs/ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,35 @@ be a state machine that is called every time a message is received. This functio

The coroutine runs until it yields cause it waits for an event

## Are stackful coroutines better than stack-less?

C++ Coroutines are stackless, and many other popular coroutine libraries, for good reason. Stackful coroutines come with
their own set of unique problems,

1. Pre-allocated stack size for every fiber being run.
In stackless coroutines, the stack that the coroutine uses is the stack of its executor. And the only data being
stored is local variables that span at least one suspension point. This means that if a function call stack requires
500Kb of stack space, then every fiber stack needs to accommodate for call stacks of this size unlike stackless
coroutines.
2. Thread local storage fuckery. Doesn't happen in stackless coroutines since the suspension points are known at compile
time and thread local access is well-defined. For stack-ful coroutines, however, can run into memory errors. For e.g,
suppose a function was inlined by the optimiser and writes to a cached TLS address and then gets migrated to another
OS thread, then it can corrupt the memory of the ex-thread.

## Baton passing

Like Folly, we can implement all of the fiber features on top of one synchronisation primitive, Batons.

```cpp
loom::baton b;
b.wait();
b.post();
```

The reactor will hold one side of the baton and the fiber itself holds the other. This way, fibers don't have to be used
with a reactor at all. To pass the data, copying is ok. Since we are only passing event notifications which are a couple
of bytes only.

## Cooperative multi tasking

- The whole idea behind fibers is that instead of having time quantums, we trust that the tasks will yield within a
Expand Down
20 changes: 20 additions & 0 deletions docs/NOTES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Reactor Backends

[Super comprehensive blog](https://habr.com/en/articles/600123/) post on several event notification
backends: `kqueue`,`epoll`, `IOCP`

## `kqueue` (Darwin)

`KV_CLEAR`: Prevents kq from signalling the same event over and over again. If a socket wasn't read fully,
Expand All @@ -26,3 +29,20 @@ struct kevent {
## `epoll` (Older Linux distributions)
# Boost.Context
[Docs](https://live.boost.org/doc/libs/1_53_0/libs/context/doc/html/context/context.html#context.context.executing_a_context)
- UB if a context function throws an exception. Wrap with `try-catch`
- Calling `jump_fcontext` to the same context you are within results in undefined behaviour.
- The size of the stack must be larger than `fcontext_t`
There are POSIX apis for `makecontext`/`swapcontext`/`getcontext`. Perhaps we can wrap the Boost.Context fns around
these. So that once time permits, we
can [implement context switches with hand written asm](https://graphitemaster.github.io/fibers/). Boost.Context was
written to be general and cross platform, so we could probably strip out a lot of the stuff to optimise.
Use cases of Boost.Context in the wild
- [Facebook's Folly](https://github.com/facebook/folly/blob/main/folly/fibers/Fiber.h) Actually really good, look at it
for fiber design. The entire fiber library is built around their `baton::post` and `baton::wait`
7 changes: 6 additions & 1 deletion examples/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
add_compile_definitions(USE_KQUEUE)

add_executable(fibers fibers.cpp)
target_link_libraries(fibers PRIVATE libflux)
target_link_libraries(fibers PRIVATE loom)

add_executable(monitor monitor.cpp)
target_link_libraries(monitor PRIVATE loom)
66 changes: 17 additions & 49 deletions examples/fibers.cpp
Original file line number Diff line number Diff line change
@@ -1,59 +1,27 @@
#include <cstdio>
#include <fcntl.h>
#include <iostream>

#include "tracy/Tracy.hpp"
#include "tracy/TracyC.h"

#include "flux/backends/darwin.hpp"
#include "flux/thread.hpp"
#include "loom/fiber.hpp"

using namespace std;

#ifdef __APPLE__
#define SOCK_NONBLOCK O_NONBLOCK
#endif

#define DEFAULT_STACK_SIZE 4096

struct Worker : flux::Thread {
explicit Worker(size_t _stack_size) : Thread(_stack_size) {}
void run() override {
int count = 0;
int thing = 0;
while (count < 100) {
count++;
thing = (thing * 7 + count) / 8;
printf("count: %d\n", count);
this->wait();
FrameMark;
struct Worker : loom::Fiber {
explicit Worker(size_t stack_size) : loom::Fiber(stack_size) {}
void run() {
int thing = 0;
for (int i = 0; i < 100; ++i) {
thing = (thing * 7 + i) / 8;
printf("count: %d\n", i);
this->wait();
}
}
}
};

void *print(void *args) {
int i = 0;
while (i++) {
std::cout << i << ": hello world\n";
}
}

int main() {
// // Create a new context and stack to execute foo. Pass the stack pointer to the end,
// // stack grows downwards for most architecture, from highest mem address -> lowest
// Worker worker(DEFAULT_STACK_SIZE); // need to heap allocate
// worker.start();
// flux::Event fake{flux::Event::Type::NA, 0};
//
// flux::KqueueReactor reactor;
// reactor.set_timer(1010, 1500, &worker);
// // reactor.subscribe(1010, &worker);
// tracy::SetThreadName("hello");
//
// while (reactor.active()) {
// reactor.work();
// FrameMark;
// }
Worker worker(4096);
worker.start();
loom::Event fake{loom::Event::Type::NA, 0};

flux::fiber_t f1;
flux::fiber_init(f1, nullptr, print, nullptr);
while (worker.resume(&fake)) {
cout << "Resuming worker" << endl;
}
}
6 changes: 6 additions & 0 deletions examples/monitor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#include <iostream>

int main() {
// Make this example open a file, and read for file changes and print it out!
// Maybe can do a diff also, and then grab the diffs, and print it out
}
65 changes: 65 additions & 0 deletions lib/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
cmake_minimum_required(VERSION 3.9)
project(loom)
include(FetchContent)

set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

# Boost
find_package(Boost COMPONENTS context REQUIRED)
message(STATUS "Boost include dir: ${Boost_INCLUDE_DIRS}")

# Tracy
option(TRACY_ENABLE "" ON)
option(TRACY_FIBERS "" ON) # pdf page 38

FetchContent_Declare(
tracy
GIT_REPOSITORY https://github.com/wolfpld/tracy.git
GIT_TAG master
GIT_SHALLOW TRUE
GIT_PROGRESS TRUE
)
FetchContent_MakeAvailable(tracy)

# Quill
FetchContent_Declare(
quill
GIT_REPOSITORY https://github.com/odygrd/quill.git
GIT_TAG master
GIT_SHALLOW TRUE
GIT_PROGRESS TRUE
)
FetchContent_MakeAvailable(quill)

# sockpp
FetchContent_Declare(
sockpp
GIT_REPOSITORY https://github.com/fpagliughi/sockpp
GIT_TAG master
GIT_SHALLOW TRUE
GIT_PROGRESS TRUE
)
FetchContent_MakeAvailable(sockpp)
message(STATUS "sockpp include dir: ${sockpp_SOURCE_DIR}/include")

option(USE_EPOLL "" OFF)
option(USE_KQUEUE "" ON)

if ((NOT USE_EPOLL AND NOT USE_KQUEUE) OR (USE_EPOLL AND USE_KQUEUE))
message(FATAL_ERROR "Please select one backend: epoll or kqueue")
endif ()


# add the library
add_library(${PROJECT_NAME} INTERFACE)
target_include_directories(${PROJECT_NAME} INTERFACE include ${Boost_INCLUDE_DIRS})
if (USE_EPOLL)
message(STATUS "LOOM: Using epoll backend")
target_compile_definitions(${PROJECT_NAME} INTERFACE LOOM_BACKEND_EPOLL)
elseif (USE_KQUEUE)
message(STATUS "LOOM: Using kqueue backend")
target_compile_definitions(${PROJECT_NAME} INTERFACE LOOM_BACKEND_KQUEUE)
endif ()

target_link_libraries(${PROJECT_NAME} INTERFACE ${Boost_LIBRARIES} Tracy::TracyClient quill::quill sockpp)
99 changes: 99 additions & 0 deletions lib/include/loom/backends/kqueue.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#pragma once

#include "loom/loom.hpp"
#include "loom/utils.hpp"

namespace loom {

Loom::Loom() {
m_kqueue_fd = kqueue();
FLUX_ASSERT(m_kqueue_fd != -1, "Failed to create kqueue");
}

Loom::~Loom() {
if (m_kqueue_fd >= 0) {
close(m_kqueue_fd);
}
}

void Loom::handle_sock_op(int fd, loom::Operation op) {
#ifndef NDEBUG
FLUX_ASSERT(m_kqueue_fd >= 0, "Kqueue file descriptor is invalid");
#endif

if (m_changes.size() == 16) [[unlikely]] {
// `changelist` is full, need to flush the changes to the kernel.
int n =
kevent(m_kqueue_fd, m_changes.data(), m_changes.size(), nullptr, 0, nullptr);
FLUX_ASSERT(n >= 0, "Failed to flush changes to the kernel");
m_changes.clear();
}
// EV_ADD attaches descriptor to kq
// EV_CLEAR prevents unnecessary signalling
struct kevent ev1 {};
struct kevent ev2 {};
switch (op) {
case Operation::Added: {
// Add RW flags to Operation as well
// Add a new kevent to the vector
EV_SET(&ev1, fd, EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, nullptr);
EV_SET(&ev2, fd, EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, nullptr);
m_changes.push_back(ev1);
m_changes.push_back(ev2);
break;
}
case Operation::Removed:
EV_SET(&ev1, fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
EV_SET(&ev2, fd, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);
break;
case Operation::Modified: [[fallthrough]];
case Operation::NIL: break;
}
}

//! This is one epoch of the event loop. Checks all the fds monitored by the event loop
//! and dispatches any events accordingly.
bool Loom::epoch() {
// TODO: 16 can be adjusted based on busy-ness of the event loop. Can even be dynamic
std::array<struct kevent, 16> events{};

// changelist: List of events to register with the kernel
// nchanges: Number of events in the changelist
// `m_timeout` must be non-empty if we want to also monitor SPSC queues for example
// `m_timeout` == 0 degrades the performance to a regular poll event loop
int n = kevent(m_kqueue_fd, m_changes.data(), m_changes.size(), events.data(),
events.size(), m_timeout);

// `kevent` can be interrupted by signals, so we check `errno` global variable.
if (n < 0 || errno == EINTR)
return false;

for (int i = 0; i < n; ++i) {
if (events[i].flags & EV_ERROR) {
printf("Error event\n");
}
if (events[i].filter == EVFILT_READ) {
printf("Read event\n");
this->notify(Event{.type = Event::Type::SocketRead,
.fd = static_cast<int>(events[i].ident)});
} else if (events[i].filter == EVFILT_WRITE) {
printf("Write event\n");
this->notify(Event{.type = Event::Type::SocketWriteable,
.fd = static_cast<int>(events[i].ident)});
} else if (events[i].filter == EVFILT_TIMER) {
printf("Timer event\n");
this->notify(Event{.type = Event::Type::Timer,
.fd = static_cast<int>(events[i].ident)});
}
}
return true;
}

void Loom::set_timer(int id, int timer_period) {
FLUX_ASSERT(m_kqueue_fd >= 0, "Kqueue file descriptor is invalid");
struct kevent ev {};
EV_SET(&ev, id, EVFILT_TIMER, EV_ADD | EV_ENABLE, 0, timer_period, nullptr);
int n = kevent(m_kqueue_fd, &ev, 1, nullptr, 0, nullptr);
FLUX_ASSERT(n >= 0, "Failed to set timer");
}
} /* namespace loom */
Loading

0 comments on commit 8c60775

Please sign in to comment.