Skip to content

Commit

Permalink
fix: fix timer not working bug
Browse files Browse the repository at this point in the history
  • Loading branch information
g-tejas committed Jul 13, 2024
1 parent 89e1060 commit fbe945a
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 91 deletions.
2 changes: 1 addition & 1 deletion include/backends/darwin.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class KqueueReactor : public Reactor {
//! whichever comes first. Look at TigerBeetle's implementation
auto run_for_ns(uint64_t ns) const -> bool;

void set_timer(int timer_period);
void set_timer(int id, int timer_period, Thread *thread);

private:
//! Manages the socket operations. Basically a switch statement
Expand Down
5 changes: 1 addition & 4 deletions include/thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class Thread {
virtual void run() = 0;

//! Starts executing the `run()` method
void start(size_t stack_size);
void start();

//! Subscribe to a particular file descriptor
auto subscribe(int fd) -> bool;
Expand All @@ -77,9 +77,6 @@ class Thread {

/// Used for managing the stack of the thread
stack_context m_stack;

/// Size of the requested stack
size_t m_stack_size;
};

} /* namespace flux */
31 changes: 16 additions & 15 deletions src/backends/darwin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,45 +24,46 @@ bool KqueueReactor::work() {
if (n < 0 || errno == EINTR)
return false;

for (int i = 0; i < events.max_size(); ++i) {
if (events[i].filter & EV_ERROR) {
for (int i = 0; i < n; ++i) {
if (events[i].flags & EV_ERROR) {
printf("Error event\n");
// this->notify(Event{.type = Event::Type::SocketError,
// .fd = static_cast<int>(events[i].ident)});
}
if (events[i].flags & EVFILT_READ) {
if (events[i].filter == EVFILT_READ) {
printf("Read event\n");
this->notify(Event{.type = Event::Type::SocketRead,
.fd = static_cast<int>(events[i].ident)});
} else if (events[i].flags & EVFILT_WRITE) {
} else if (events[i].filter == EVFILT_WRITE) {
printf("Write event\n");
this->notify(Event{.type = Event::Type::SocketWriteable,
.fd = static_cast<int>(events[i].ident)});
} else if (events[i].flags & EVFILT_TIMER) {
} else if (events[i].filter == EVFILT_TIMER) {
printf("Timer event\n");
this->notify(Event{.type = Event::Type::Timer, .fd = -1});
this->notify(
Event{.type = Event::Type::Timer, .fd = static_cast<int>(events[i].ident)});
}
}
return true;
}

auto KqueueReactor::run_for_ns(uint64_t ns) const -> bool { return true; }

void KqueueReactor::set_timer(int timer_period) {
static int timer_id = 100;
struct kevent ev;

EV_SET(&ev, timer_id++, EVFILT_TIMER, EV_ADD | EV_ENABLE, 0, timer_period, nullptr);
void KqueueReactor::set_timer(int id, int timer_period, Thread *thread) {
FLUX_ASSERT(m_kqueue_fd >= 0, "Kqueue file descriptor is invalid");
// Have to manually subscribe the worker to the timer
m_subs_by_fd[id].push_back(thread);
m_subcount++;
thread->subscribe(id);

struct kevent ev {};
EV_SET(&ev, id, EVFILT_TIMER, EV_ADD | EV_ENABLE, 0, timer_period, nullptr);
int n = kevent(m_kqueue_fd, &ev, 1, nullptr, 0, nullptr);
FLUX_ASSERT(n >= 0, "Failed to set timer");
}

void KqueueReactor::handle_sock_op(int fd, Operation op) {
#ifndef NDEBUG
// #ifndef NDEBUG
FLUX_ASSERT(m_kqueue_fd >= 0, "Kqueue file descriptor is invalid");
#endif
// #endif

if (m_nchanges == m_changes.size()) [[unlikely]] {
// `changelist` is full, need to flush the changes to the kernel.
Expand Down
65 changes: 8 additions & 57 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@
#include "utils.hpp"
#include <cstdio>
#include <fcntl.h>
#include <iostream>

#include "backends/darwin.hpp"
#include "thread.hpp"

using namespace std;

// we check if macos
#ifdef __APPLE__
#define SOCK_NONBLOCK O_NONBLOCK
Expand All @@ -23,51 +26,6 @@ void thread_create(void *(*fn)(void *), void *arg,
uint64_t stack_size = DEFAULT_STACK_SIZE);
} /* namespace flux */

// struct Completion {
// Completion *next;
// // void (*callback)(IO *, Completion *);
// };
//
// class IO {
// private:
// Fifo<Completion> timeouts;
// Fifo<Completion> completed;
// Fifo<Completion> io_pending;
//
// public:
// IO(size_t entries, int flags) {}
//
// ~IO() = default;
//
// public:
// // Pass all queued submissions to the kernel and peek for completions
// void tick() {}
//
// // Pass all queued submissions to the kernel and run for `nanoseconds`
// void run_for_ns(uint64_t nanoseconds) {}
//
// void flush(bool wait_for_completions) {}
//
// public:
// void accept() {}
//
// void close() {}
//
// void connect() {}
//
// void read() {}
//
// void recv() {}
//
// void send() {}
//
// void write() {}
// void timeout() {}
//
// private:
// void submit() {}
// };

struct Worker : flux::Thread {
explicit Worker(size_t _stack_size) : Thread(_stack_size) {}
void run() override {
Expand All @@ -81,28 +39,21 @@ struct Worker : flux::Thread {
}
}
};

int main() {
// Create a new context and stack to execute foo. Pass the stack pointer to the end,
// stack grows downwards for most architecture, from highest mem address -> lowest
Worker worker(DEFAULT_STACK_SIZE); // need to heap allocate
worker.start(100);
worker.start();
flux::Event fake{flux::Event::Type::NA, 0};

flux::KqueueReactor reactor;
reactor.subscribe(100, &worker);
reactor.set_timer(5000);
reactor.set_timer(1010, 1500, &worker);
// reactor.subscribe(1010, &worker);

while (reactor.active()) {
printf("Working\n");
reactor.work();
}

flux::Event fake{flux::Event::Type::NA, 0};

while (worker.resume(&fake)) {
}

printf("Done\n");
return 0;

// CHECK_EX(false, "This is a test");
Expand All @@ -127,4 +78,4 @@ int main() {
// struct sockaddr_in addr = {};
// addr.sin_family = AF_INET;
// addr.sin_port = ntohs(64000);
}
}
7 changes: 0 additions & 7 deletions src/reactor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,24 +47,17 @@ void Reactor::remove_thread(Thread *thread) {
auto Reactor::active() const -> bool { return m_subcount > 0; }

void Reactor::notify(Event event) {
printf("Reactor::notify(%d)\n", event.fd);
std::vector<Thread *> cleanup;
printf("here\n");
for (const auto thread : m_subs_by_fd[event.fd]) {
printf("here2\n");
if (!thread->resume(&event)) {
cleanup.push_back(thread);
}
}

printf("finished resuming\n");

for (const auto thread : cleanup) {
this->remove_thread(thread);
}

printf("finished cleaning up\n");

if (event.type == Event::Type::SocketHangup || event.type == Event::Type::SocketError) {
this->remove_socket(event.fd);
}
Expand Down
14 changes: 7 additions & 7 deletions src/thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

using namespace flux;

Thread::Thread(size_t _stack_size) : m_stack_size(_stack_size) {
protected_fixedsize_stack stack_allocator(m_stack_size);
Thread::Thread(size_t _stack_size) {
StackAllocator stack_allocator(_stack_size);
m_stack = stack_allocator.allocate();
}

Thread::~Thread() {
protected_fixedsize_stack stack_allocator(m_stack.size);
StackAllocator stack_allocator(m_stack.size);
stack_allocator.deallocate(m_stack);
}

Expand All @@ -18,25 +18,25 @@ auto Thread::wait() -> Event * {
}

auto Thread::resume(Event *event) -> bool {
m_return_context = jump_fcontext(m_thread_context, event);
printf("in resume\n");
m_return_context = jump_fcontext(m_return_context.fctx, (void *)event);
return m_return_context.data != THREAD_STATUS_COMPLETE;
}

void Thread::start(size_t stack_size) {
void Thread::start() {
// We enter the coroutine from a static routine because method signature of a
// member function might be iffy, and we need to pass the `this` pointer.
m_thread_context = make_fcontext(m_stack.sp, m_stack.size, Thread::enter);

// Transfers control to this thread. The reason we pass `this` is that we want to
// set the m_return_context to the reactor's context.
jump_fcontext(m_thread_context, (void *)this);
m_return_context = jump_fcontext(m_thread_context, (void *)this);
}

void Thread::enter(ReturnContext ctx) {
auto *thread = reinterpret_cast<Thread *>(ctx.data);

thread->m_return_context = ctx;
printf("Thread::enter\n");

thread->run();

Expand Down

0 comments on commit fbe945a

Please sign in to comment.