Skip to content

Commit 8e69c44

Browse files
committed
add eventloop implementations for call_* and call_when_*_completed functions
1 parent 68fa9dc commit 8e69c44

File tree

4 files changed

+194
-1
lines changed

4 files changed

+194
-1
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
bin.SCons
22
*.pyc
33
*~
4-
\#*\#
4+
\#*\#

include/boost/python.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
# include <boost/python/docstring_options.hpp>
2626
# include <boost/python/enum.hpp>
2727
# include <boost/python/errors.hpp>
28+
# include <boost/python/eventloop.hpp>
2829
# include <boost/python/exception_translator.hpp>
2930
# include <boost/python/exec.hpp>
3031
# include <boost/python/extract.hpp>

include/boost/python/eventloop.hpp

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
// Copyright Pan Yue 2021.
2+
// Distributed under the Boost Software License, Version 1.0. (See
3+
// accompanying file LICENSE_1_0.txt or copy at
4+
// http://www.boost.org/LICENSE_1_0.txt)
5+
6+
// TODO:
7+
// 1. posix::stream_descriptor need windows version
8+
// 2. call_* need return async.Handle
9+
# ifndef EVENT_LOOP_PY2021_H_
10+
# define EVENT_LOOP_PY2021_H_
11+
12+
#include <mutex>
13+
#include <functional>
14+
#include <unordered_map>
15+
#include <boost/asio.hpp>
16+
#include <boost/bind.hpp>
17+
#include <boost/python.hpp>
18+
19+
namespace a = boost::asio;
20+
namespace c = std::chrono;
21+
namespace py = boost::python;
22+
23+
namespace boost { namespace python { namespace eventloop {
24+
25+
class EventLoop
26+
{
27+
private:
28+
int64_t _timer_id = 0;
29+
a::io_context::strand _strand;
30+
std::unordered_map<int, std::unique_ptr<a::steady_timer>> _id_to_timer_map;
31+
// read: key = fd * 2 + 0, write: key = fd * 2 + 1
32+
std::unordered_map<int, std::unique_ptr<a::posix::stream_descriptor>> _descriptor_map;
33+
std::chrono::steady_clock::time_point _created_time;
34+
35+
void _add_reader_or_writer(int fd, py::object f, int key)
36+
{
37+
// add descriptor
38+
if (_descriptor_map.find(key) == _descriptor_map.end())
39+
{
40+
_descriptor_map.emplace(key,
41+
std::move(std::make_unique<a::posix::stream_descriptor>(_strand.context(), fd))
42+
);
43+
}
44+
45+
_descriptor_map.find(key)->second->async_wait(a::posix::descriptor::wait_type::wait_read,
46+
a::bind_executor(_strand, [key, f, loop=this] (const boost::system::error_code& ec)
47+
{
48+
// move descriptor
49+
auto iter = loop->_descriptor_map.find(key);
50+
if (iter != loop->_descriptor_map.end())
51+
{
52+
iter->second->release();
53+
loop->_descriptor_map.erase(iter);
54+
}
55+
loop->call_soon(f);
56+
}));
57+
return;
58+
}
59+
60+
void _remove_reader_or_writer(int key)
61+
{
62+
auto iter = _descriptor_map.find(key);
63+
if (iter != _descriptor_map.end())
64+
{
65+
iter->second->release();
66+
_descriptor_map.erase(iter);
67+
}
68+
}
69+
70+
public:
71+
EventLoop(a::io_context& ctx):
72+
_strand{ctx}, _created_time{std::chrono::steady_clock::now()}
73+
{
74+
}
75+
76+
// TODO: An instance of asyncio.Handle is returned, which can be used later to cancel the callback.
77+
inline void call_soon(py::object f)
78+
{
79+
_strand.post([f, loop=this] {
80+
f(boost::ref(*loop));
81+
});
82+
return;
83+
}
84+
85+
// TODO: implement this
86+
void call_soon_thread_safe(py::object f)
87+
{
88+
return;
89+
}
90+
91+
// Schedule callback to be called after the given delay number of seconds
92+
// TODO: An instance of asyncio.Handle is returned, which can be used later to cancel the callback.
93+
void call_later(double delay, py::object f)
94+
{
95+
// add timer
96+
_id_to_timer_map.emplace(_timer_id,
97+
std::move(std::make_unique<a::steady_timer>(_strand.context(),
98+
std::chrono::steady_clock::now() + std::chrono::nanoseconds(int64_t(delay * 1e9))))
99+
);
100+
101+
_id_to_timer_map.find(_timer_id)->second->async_wait(
102+
// remove timer
103+
a::bind_executor(_strand, [id=_timer_id, f, loop=this] (const boost::system::error_code& ec)
104+
{
105+
loop->_id_to_timer_map.erase(id);
106+
loop->call_soon(f);
107+
}));
108+
_timer_id++;
109+
}
110+
111+
void call_at(double when, py::object f)
112+
{
113+
double diff = when - time();
114+
if (diff > 0)
115+
return call_later(diff, f);
116+
return call_soon(f);
117+
}
118+
119+
double time()
120+
{
121+
auto now = std::chrono::steady_clock::now();
122+
std::chrono::duration<double> diff = now - _created_time;
123+
return diff.count();
124+
}
125+
126+
// week 2 ......start......
127+
128+
void add_reader(int fd, py::object f)
129+
{
130+
_add_reader_or_writer(fd, f, fd * 2);
131+
}
132+
133+
void remove_reader(int fd)
134+
{
135+
_remove_reader_or_writer(fd * 2);
136+
}
137+
138+
void add_writer(int fd, py::object f)
139+
{
140+
_add_reader_or_writer(fd, f, fd * 2 + 1);
141+
}
142+
143+
void remove_writer(int fd)
144+
{
145+
_remove_reader_or_writer(fd * 2 + 1);
146+
}
147+
148+
149+
void sock_recv()
150+
{
151+
152+
}
153+
154+
void sock_recv_into()
155+
{
156+
157+
}
158+
159+
void sock_sendall()
160+
{
161+
162+
}
163+
164+
void sock_connect()
165+
{
166+
167+
}
168+
169+
void sock_accept()
170+
{
171+
172+
}
173+
174+
void sock_sendfile()
175+
{
176+
177+
}
178+
179+
// week 2 ......end......
180+
181+
void run()
182+
{
183+
_strand.context().run();
184+
}
185+
};
186+
187+
188+
}}}
189+
190+
191+
# endif

test/Jamfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ bpl-test crossmod_exception
8484
: crossmod_exception.py crossmod_exception_a.cpp crossmod_exception_b.cpp
8585
]
8686

87+
[ bpl-test eventloop ]
8788
[ bpl-test injected ]
8889
[ bpl-test properties ]
8990
[ bpl-test return_arg ]

0 commit comments

Comments
 (0)