server.hpp
#pragma once
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <boost/asio.hpp>
#include <boost/noncopyable.hpp>
#include <rapidjson/document.h>
#include "connection.hpp"
#include "io_service_pool.hpp"
#include "router.hpp"
using boost::asio::ip::tcp;
class server : private boost::noncopyable
{
public:
    server(short port, size_t size, size_t timeout_milli = 0)
        : io_service_pool_(size),
          acceptor_(io_service_pool_.get_io_service(), tcp::endpoint(tcp::v4(), port)),
          timeout_milli_(timeout_milli)
    {
#ifdef PUB_SUB
        register_handler("sub_timax", &server::sub, this);
#endif
        router::get().set_callback(std::bind(&server::callback, this,
            std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4));

        // post the first accept; it will not execute until run() starts the io_service pool
        do_accept();
    }

    ~server()
    {
        io_service_pool_.stop();
        if (thd_ != nullptr && thd_->joinable())
            thd_->join();
    }

    void run()
    {
        thd_ = std::make_shared<std::thread>([this] { io_service_pool_.run(); });
    }
    template<typename Function>
    void register_handler(std::string const& name, const Function& f)
    {
        router::get().register_handler(name, f);
    }

    template<typename Function, typename Self>
    void register_handler(std::string const& name, const Function& f, Self* self)
    {
        router::get().register_handler(name, f, self);
    }

    void remove_handler(std::string const& name)
    {
        router::get().remove_handler(name);
    }
private:
    void do_accept()
    {
        conn_.reset(new connection(io_service_pool_.get_io_service(), timeout_milli_));
        acceptor_.async_accept(conn_->socket(), [this](boost::system::error_code ec)
        {
            if (ec)
            {
                //todo log
            }
            else
            {
                conn_->start();
            }

            // keep accepting, whether or not this accept succeeded
            do_accept();
        });
    }
private:
    // handler registered as "sub_timax": echoes the topic name back so that
    // callback() can record the connection as a subscriber of that topic
    std::string sub(const std::string& topic)
    {
        return topic;
    }
    void pub(const std::string& topic, const std::string& result)
    {
        std::unique_lock<std::mutex> lock(mtx_);
        auto range = conn_map_.equal_range(topic);
        if (range.first == range.second)
            return;

        // push the result to every live subscriber of this topic,
        // dropping connections that have already expired
        for (auto it = range.first; it != range.second;)
        {
            auto ptr = it->second.lock();
            if (!ptr)
            {
                it = conn_map_.erase(it);
            }
            else
            {
                ptr->response(result.c_str());
                ++it;
            }
        }

        // clear invalid connections left over under other topics
        for (auto it = conn_map_.begin(); it != conn_map_.end();)
        {
            if (it->second.expired())
                it = conn_map_.erase(it);
            else
                ++it;
        }
    }
    // callback invoked by the router: it tells the server which connection made the
    // call, which topic/handler was invoked, and the handler's result
    void callback(const std::string& topic, const char* result, std::shared_ptr<connection> conn, bool has_error = false)
    {
#ifdef PUB_SUB
        if (has_error)
        {
            //todo log
            return;
        }

        if (topic == "sub_timax")
        {
            // a subscription request: the handler's result carries the topic name
            rapidjson::Document doc;
            doc.Parse(result);
            auto handler_name = doc["result"].GetString();
            std::weak_ptr<connection> wp(conn);
            {
                std::lock_guard<std::mutex> lock(mtx_);
                conn_map_.emplace(handler_name, wp);
            }
            conn->response(result);
            return;
        }

        if (!conn_map_.empty())
        {
            pub(topic, result);
            conn->read_head();
        }
#else
        conn->response(result);
#endif
    }
    std::multimap<std::string, std::weak_ptr<connection>> conn_map_;

    io_service_pool io_service_pool_;
    tcp::acceptor acceptor_;
    std::shared_ptr<connection> conn_;
    std::shared_ptr<std::thread> thd_;
    std::size_t timeout_milli_;
    std::mutex mtx_;
};
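
/*
   Usage sketch (illustrative only, not part of the original file). The port,
   pool size, timeout, and the "add" handler below are assumptions; whether the
   router accepts a plain function with typed arguments depends on router.hpp.

   #include "server.hpp"

   int add(int a, int b)
   {
       return a + b;
   }

   int main()
   {
       // one io_service per pool slot; 5000 ms connection timeout (assumed values)
       server srv(9000, std::thread::hardware_concurrency(), 5000);

       srv.register_handler("add", add);   // exposed to clients under the name "add"
       srv.run();                          // spawns the thread that runs the io_service pool

       std::getchar();                     // keep the process alive until a key is pressed
       return 0;
   }
*/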