-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathEventLoop.cpp
128 lines (113 loc) · 3.14 KB
/
EventLoop.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
/*************************************************************************
> File Name: EventLoop.cpp
> Author: zhangfeng
> Mail: [email protected]
> Created Time: Mon 07 Oct 2019 03:18:39 PM CST
> Target:
************************************************************************/
#include "EventLoop.h"
#include "base/Logging.h"
#include "Util.h"
#include "sys/eventfd.h"
#include "sys/epoll.h"
#include<iostream>
using namespace std;
// Per-thread slot holding the EventLoop that registered itself on this thread
// (set in the EventLoop constructor, cleared in its destructor); null when no
// loop has registered.
__thread EventLoop *t_loopInThisThread = nullptr;
// Creates a non-blocking, close-on-exec eventfd with an initial counter of 0.
// Returns the new descriptor; on failure logs "eventfd failed" and aborts the
// process, so callers never observe a negative descriptor.
int createEventfd() {
    const int fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (fd >= 0) {
        return fd;
    }
    LOG << "eventfd failed";
    abort();
}
// Builds an event loop bound to the constructing thread.
//
// - Creates the poller and the wakeup eventfd, and wraps the eventfd in a
//   Channel owned by this loop.
// - Records the constructing thread's id (CurrentThread::tid()) in threadId_.
// - Registers this loop in the thread-local t_loopInThisThread slot only if
//   the slot is empty; otherwise it just logs and leaves the slot untouched.
// - Arms the wakeup channel for edge-triggered reads and adds it to the poller.
EventLoop::EventLoop()
    : looping_(false),
      poller_(new Epoll()),
      wakeupFd_(createEventfd()),
      quit_(false),
      eventHandling_(false),
      callingPendingFunctors_(false),
      threadId_(CurrentThread::tid()),
      pwakeupChannel_(new Channel(this, wakeupFd_)) {
    if (!t_loopInThisThread) {
        t_loopInThisThread = this;
    } else {
        LOG << "Has Exist other thread run in this thread";
    }

    // Wakeup channel: readable + edge-triggered, dispatching back into this loop.
    pwakeupChannel_->setEvents(EPOLLIN | EPOLLET);
    pwakeupChannel_->setReadHandler(std::bind(&EventLoop::handleRead, this));
    pwakeupChannel_->setConnHandler(std::bind(&EventLoop::handleConn, this));

    // NOTE(review): the second argument is presumably a timeout (0 = none) --
    // confirm against Epoll::epoll_add.
    poller_->epoll_add(pwakeupChannel_, 0);
}
void EventLoop::handleConn() {
updatePoller(pwakeupChannel_, 0);
}
// Closes the wakeup eventfd and, if this loop is the one registered for the
// current thread, clears the thread-local registration.
EventLoop::~EventLoop() {
    close(wakeupFd_);
    // The constructor only stores `this` in t_loopInThisThread when the slot
    // was empty. A loop that never registered itself must not wipe out the
    // registration of a different loop that is still alive on this thread.
    if (t_loopInThisThread == this) {
        t_loopInThisThread = nullptr;
    }
}
void EventLoop::wakeup() {
uint64_t one = 1;
ssize_t n = writen(wakeupFd_, (char*)(&one), sizeof one);
if(n != sizeof one) {
LOG << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
}
}
// Read handler for the wakeup channel: consumes the 8-byte eventfd counter
// written by wakeup(), logging if the read was short or failed, and then
// resets the channel's event mask to edge-triggered readable.
void EventLoop::handleRead() {
    uint64_t counter = 1;
    const ssize_t got = readn(wakeupFd_, &counter, sizeof counter);
    if (!(got == sizeof counter)) {
        LOG << "EventLoop::handleRead() reads " << got << " bytes instead of 8";
    }
    // Restore the interest set; it is applied to the poller separately
    // (see handleConn(), which calls updatePoller on this channel).
    pwakeupChannel_->setEvents(EPOLLIN | EPOLLET);
}
// Runs `cb` on the loop thread: immediately when called from that thread,
// otherwise by handing it to queueInLoop() for later execution.
void EventLoop::runInLoop(Functor &&cb) {
    if (!isInLoopThread()) {
        queueInLoop(std::move(cb));
        return;
    }
    cb();
}
// Appends `cb` to the pending-functor queue (under mutex_) and wakes the loop
// when needed so the callback is picked up promptly.
void EventLoop::queueInLoop(Functor&& cb) {
    {
        MutexLockGuard guard(mutex_);
        pendingFunctors_.emplace_back(std::move(cb));
    }

    // No wakeup is needed only when we are on the loop thread and it is not
    // currently draining the queue: doPendingFunctors() will run later in this
    // same iteration. In every other case (foreign thread, or queued from
    // inside a pending functor) the loop must be woken.
    if (isInLoopThread() && !callingPendingFunctors_) {
        return;
    }
    wakeup();
}
void EventLoop::loop() {
assert(!looping_);
assert(isInLoopThread());
looping_ = true;
quit_ = false;
std::vector<SP_Channel> ret;
while(!quit_) {
ret.clear();
ret = poller_->poll();
eventHandling_ = true;
for(auto &it : ret)
it->handleEvents();
eventHandling_ = false;
doPendingFunctors();
poller_->handleExpired();
}
looping_ = false;
}
void EventLoop::doPendingFunctors() {
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
MutexLockGuard lock(mutex_);
functors.swap(pendingFunctors_);
}
for(size_t i = 0; i < functors.size(); i++)
functors[i]();
callingPendingFunctors_ = false;
}
// Requests loop termination by setting quit_. When called from another
// thread, also wakes the loop so it does not stay blocked in poll and can
// observe the flag.
void EventLoop::quit() {
    quit_ = true;
    if (isInLoopThread()) {
        return;
    }
    wakeup();
}