diff --git a/include/backends/darwin.hpp b/include/backends/darwin.hpp index 5eebdde..4401d1e 100644 --- a/include/backends/darwin.hpp +++ b/include/backends/darwin.hpp @@ -22,7 +22,7 @@ class KqueueReactor : public Reactor { //! whichever comes first. Look at TigerBeetle's implementation auto run_for_ns(uint64_t ns) const -> bool; - void set_timer(int timer_period); + void set_timer(int id, int timer_period, Thread *thread); private: //! Manages the socket operations. Basically a switch statement diff --git a/include/thread.hpp b/include/thread.hpp index 541fae8..21ab6f8 100644 --- a/include/thread.hpp +++ b/include/thread.hpp @@ -53,7 +53,7 @@ class Thread { virtual void run() = 0; //! Starts executing the `run()` method - void start(size_t stack_size); + void start(); //! Subscribe to a particular file descriptor auto subscribe(int fd) -> bool; @@ -77,9 +77,6 @@ class Thread { /// Used for managing the stack of the thread stack_context m_stack; - - /// Size of the requested stack - size_t m_stack_size; }; } /* namespace flux */ diff --git a/src/backends/darwin.cpp b/src/backends/darwin.cpp index 3e24bf3..b27ca66 100644 --- a/src/backends/darwin.cpp +++ b/src/backends/darwin.cpp @@ -24,23 +24,22 @@ bool KqueueReactor::work() { if (n < 0 || errno == EINTR) return false; - for (int i = 0; i < events.max_size(); ++i) { - if (events[i].filter & EV_ERROR) { + for (int i = 0; i < n; ++i) { + if (events[i].flags & EV_ERROR) { printf("Error event\n"); - // this->notify(Event{.type = Event::Type::SocketError, - // .fd = static_cast(events[i].ident)}); } - if (events[i].flags & EVFILT_READ) { + if (events[i].filter == EVFILT_READ) { printf("Read event\n"); this->notify(Event{.type = Event::Type::SocketRead, .fd = static_cast(events[i].ident)}); - } else if (events[i].flags & EVFILT_WRITE) { + } else if (events[i].filter == EVFILT_WRITE) { printf("Write event\n"); this->notify(Event{.type = Event::Type::SocketWriteable, .fd = static_cast(events[i].ident)}); - } else if (events[i].flags & EVFILT_TIMER) { + } else if (events[i].filter == EVFILT_TIMER) { printf("Timer event\n"); - this->notify(Event{.type = Event::Type::Timer, .fd = -1}); + this->notify( + Event{.type = Event::Type::Timer, .fd = static_cast(events[i].ident)}); } } return true; @@ -48,21 +47,23 @@ bool KqueueReactor::work() { auto KqueueReactor::run_for_ns(uint64_t ns) const -> bool { return true; } -void KqueueReactor::set_timer(int timer_period) { - static int timer_id = 100; - struct kevent ev; - - EV_SET(&ev, timer_id++, EVFILT_TIMER, EV_ADD | EV_ENABLE, 0, timer_period, nullptr); +void KqueueReactor::set_timer(int id, int timer_period, Thread *thread) { + FLUX_ASSERT(m_kqueue_fd >= 0, "Kqueue file descriptor is invalid"); + // Have to manually subscribe the worker to the timer + m_subs_by_fd[id].push_back(thread); m_subcount++; + thread->subscribe(id); + struct kevent ev {}; + EV_SET(&ev, id, EVFILT_TIMER, EV_ADD | EV_ENABLE, 0, timer_period, nullptr); int n = kevent(m_kqueue_fd, &ev, 1, nullptr, 0, nullptr); FLUX_ASSERT(n >= 0, "Failed to set timer"); } void KqueueReactor::handle_sock_op(int fd, Operation op) { -#ifndef NDEBUG + // #ifndef NDEBUG FLUX_ASSERT(m_kqueue_fd >= 0, "Kqueue file descriptor is invalid"); -#endif + // #endif if (m_nchanges == m_changes.size()) [[unlikely]] { // `changelist` is full, need to flush the changes to the kernel. diff --git a/src/main.cpp b/src/main.cpp index e382934..7f7bc7a 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -2,10 +2,13 @@ #include "utils.hpp" #include #include +#include #include "backends/darwin.hpp" #include "thread.hpp" +using namespace std; + // we check if macos #ifdef __APPLE__ #define SOCK_NONBLOCK O_NONBLOCK @@ -23,51 +26,6 @@ void thread_create(void *(*fn)(void *), void *arg, uint64_t stack_size = DEFAULT_STACK_SIZE); } /* namespace flux */ -// struct Completion { -// Completion *next; -// // void (*callback)(IO *, Completion *); -// }; -// -// class IO { -// private: -// Fifo timeouts; -// Fifo completed; -// Fifo io_pending; -// -// public: -// IO(size_t entries, int flags) {} -// -// ~IO() = default; -// -// public: -// // Pass all queued submissions to the kernel and peek for completions -// void tick() {} -// -// // Pass all queued submissions to the kernel and run for `nanoseconds` -// void run_for_ns(uint64_t nanoseconds) {} -// -// void flush(bool wait_for_completions) {} -// -// public: -// void accept() {} -// -// void close() {} -// -// void connect() {} -// -// void read() {} -// -// void recv() {} -// -// void send() {} -// -// void write() {} -// void timeout() {} -// -// private: -// void submit() {} -// }; - struct Worker : flux::Thread { explicit Worker(size_t _stack_size) : Thread(_stack_size) {} void run() override { @@ -81,28 +39,21 @@ struct Worker : flux::Thread { } } }; - int main() { // Create a new context and stack to execute foo. Pass the stack pointer to the end, // stack grows downwards for most architecture, from highest mem address -> lowest Worker worker(DEFAULT_STACK_SIZE); // need to heap allocate - worker.start(100); + worker.start(); + flux::Event fake{flux::Event::Type::NA, 0}; flux::KqueueReactor reactor; - reactor.subscribe(100, &worker); - reactor.set_timer(5000); + reactor.set_timer(1010, 1500, &worker); + // reactor.subscribe(1010, &worker); while (reactor.active()) { - printf("Working\n"); reactor.work(); } - flux::Event fake{flux::Event::Type::NA, 0}; - - while (worker.resume(&fake)) { - } - - printf("Done\n"); return 0; // CHECK_EX(false, "This is a test"); @@ -127,4 +78,4 @@ int main() { // struct sockaddr_in addr = {}; // addr.sin_family = AF_INET; // addr.sin_port = ntohs(64000); -} \ No newline at end of file +} diff --git a/src/reactor.cpp b/src/reactor.cpp index 21c2b98..a15da12 100644 --- a/src/reactor.cpp +++ b/src/reactor.cpp @@ -47,24 +47,17 @@ void Reactor::remove_thread(Thread *thread) { auto Reactor::active() const -> bool { return m_subcount > 0; } void Reactor::notify(Event event) { - printf("Reactor::notify(%d)\n", event.fd); std::vector cleanup; - printf("here\n"); for (const auto thread : m_subs_by_fd[event.fd]) { - printf("here2\n"); if (!thread->resume(&event)) { cleanup.push_back(thread); } } - printf("finished resuming\n"); - for (const auto thread : cleanup) { this->remove_thread(thread); } - printf("finished cleaning up\n"); - if (event.type == Event::Type::SocketHangup || event.type == Event::Type::SocketError) { this->remove_socket(event.fd); } diff --git a/src/thread.cpp b/src/thread.cpp index 9e341eb..c3251dc 100644 --- a/src/thread.cpp +++ b/src/thread.cpp @@ -2,13 +2,13 @@ using namespace flux; -Thread::Thread(size_t _stack_size) : m_stack_size(_stack_size) { - protected_fixedsize_stack stack_allocator(m_stack_size); +Thread::Thread(size_t _stack_size) { + StackAllocator stack_allocator(_stack_size); m_stack = stack_allocator.allocate(); } Thread::~Thread() { - protected_fixedsize_stack stack_allocator(m_stack.size); + StackAllocator stack_allocator(m_stack.size); stack_allocator.deallocate(m_stack); } @@ -18,25 +18,25 @@ auto Thread::wait() -> Event * { } auto Thread::resume(Event *event) -> bool { - m_return_context = jump_fcontext(m_thread_context, event); + printf("in resume\n"); + m_return_context = jump_fcontext(m_return_context.fctx, (void *)event); return m_return_context.data != THREAD_STATUS_COMPLETE; } -void Thread::start(size_t stack_size) { +void Thread::start() { // We enter the coroutine from a static routine because method signature of a // member function might be iffy, and we need to pass the `this` pointer. m_thread_context = make_fcontext(m_stack.sp, m_stack.size, Thread::enter); // Transfers control to this thread. The reason we pass `this` is that we want to // set the m_return_context to the reactor's context. - jump_fcontext(m_thread_context, (void *)this); + m_return_context = jump_fcontext(m_thread_context, (void *)this); } void Thread::enter(ReturnContext ctx) { auto *thread = reinterpret_cast(ctx.data); thread->m_return_context = ctx; - printf("Thread::enter\n"); thread->run();