1
+ // Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
2
+ //
3
+ // Licensed under the BSD 3-Clause License (the "License"); you may not use this file
4
+ // except in compliance with the License. You may obtain a copy of the License at
5
+ //
6
+ // https://opensource.org/licenses/BSD-3-Clause
7
+ //
8
+ // Unless required by applicable law or agreed to in writing, software distributed
9
+ // under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
10
+ // CONDITIONS OF ANY KIND, either express or implied. See the License for the specific
11
+ // language governing permissions and limitations under the License.
12
+ //
13
+
14
+ #include < arpa/inet.h>
15
+ #include < errno.h>
16
+ #include < netinet/in.h>
17
+ #include < signal.h>
18
+ #include < stdlib.h>
19
+ #include < string.h>
20
+ #include < sys/socket.h>
21
+ #include < unistd.h>
22
+
23
+ #include < chrono>
24
+ #include < future>
25
+ #include < iostream>
26
+ #include < memory>
27
+ #include < string>
28
+ #include < thread>
29
+
30
+ #include " polaris/consumer.h"
31
+
32
+ class ConsumerServer {
33
+ public:
34
+ ConsumerServer (const std::string& host, int port, const polaris::ServiceKey& provider_service);
35
+
36
+ ~ConsumerServer ();
37
+
38
+ int Start ();
39
+
40
+ void Stop ();
41
+
42
+ private:
43
+ std::string Proccess (const std::string& message);
44
+
45
+ int Send (const std::string& host, int port, const std::string& request, std::string& response);
46
+
47
+ private:
48
+ std::string host_;
49
+ int port_;
50
+ polaris::ServiceKey provider_service_;
51
+
52
+ std::atomic<bool > stop_;
53
+ std::unique_ptr<std::thread> accept_thread_;
54
+
55
+ std::unique_ptr<polaris::ConsumerApi> consumer_;
56
+ };
57
+
58
+ bool signal_received = false ;
59
+ void SignalHandler (int signum) {
60
+ std::cout << " Interrupt signal (" << signum << " ) received." << std::endl;
61
+ signal_received = true ;
62
+ }
63
+
64
+ int main (int argc, char ** argv) {
65
+ if (argc < 5 ) {
66
+ std::cout << " usage: " << argv[0 ] << " host port service_namespace service_name" << std::endl;
67
+ return -1 ;
68
+ }
69
+ // register signal handler
70
+ signal (SIGINT, SignalHandler);
71
+
72
+ polaris::ServiceKey service_key = {argv[3 ], argv[4 ]};
73
+ ConsumerServer server (argv[1 ], atoi (argv[2 ]), service_key);
74
+
75
+ // 启动服务
76
+ if (server.Start () != 0 ) {
77
+ return -2 ;
78
+ }
79
+
80
+ // 循环等待退出信号
81
+ while (!signal_received) {
82
+ sleep (1 );
83
+ }
84
+
85
+ // 反注册完成以后再停止服务
86
+ server.Stop ();
87
+
88
+ return 0 ;
89
+ }
90
+
91
+ ConsumerServer::ConsumerServer (const std::string& host, int port,
92
+ const polaris::ServiceKey& provider_service)
93
+ : host_(host), port_(port), provider_service_(provider_service), stop_(false ) {
94
+ consumer_ = std::unique_ptr<polaris::ConsumerApi>(polaris::ConsumerApi::CreateWithDefaultFile ());
95
+ }
96
+
97
+ ConsumerServer::~ConsumerServer () {}
98
+
99
+ int ConsumerServer::Start () {
100
+ auto sock_listener = socket (AF_INET, SOCK_STREAM, 0 );
101
+ if (sock_listener < 0 ) {
102
+ std::cerr << " create socket with error: " << errno << std::endl;
103
+ return -1 ;
104
+ }
105
+
106
+ // address info to bind socket
107
+ sockaddr_in server_addr;
108
+ server_addr.sin_family = AF_INET;
109
+ inet_pton (AF_INET, host_.c_str (), &server_addr.sin_addr );
110
+ server_addr.sin_port = htons (port_);
111
+
112
+ // bind socket
113
+ if (bind (sock_listener, (sockaddr*)&server_addr, sizeof (server_addr)) < 0 ) {
114
+ std::cerr << " bind to " << host_ << " :" << port_ << " failed with error: " << errno
115
+ << std::endl;
116
+ close (sock_listener);
117
+ return -2 ;
118
+ }
119
+
120
+ // start listening
121
+ if (listen (sock_listener, SOMAXCONN) < 0 ) {
122
+ std::cerr << " listen to " << host_ << " :" << port_ << " failed with error: " << errno
123
+ << std::endl;
124
+ close (sock_listener);
125
+ return -3 ;
126
+ }
127
+ std::cout << " listen to " << host_ << " :" << port_ << " success" << std::endl;
128
+
129
+ // create accept thread
130
+ accept_thread_ = std::unique_ptr<std::thread>(new std::thread ([=] {
131
+ while (!stop_) {
132
+ fd_set set;
133
+ FD_ZERO (&set);
134
+ FD_SET (sock_listener, &set);
135
+ struct timeval timeout;
136
+ timeout.tv_sec = 2 ;
137
+ timeout.tv_usec = 0 ;
138
+ int ret = select (sock_listener + 1 , &set, NULL , NULL , &timeout);
139
+ if (ret <= 0 ) {
140
+ continue ;
141
+ }
142
+ sockaddr_in client_addr;
143
+ socklen_t client_addr_size = sizeof (client_addr);
144
+ int sock_client;
145
+ if ((sock_client = accept (sock_listener, (sockaddr*)&client_addr, &client_addr_size)) < 0 ) {
146
+ std::cerr << " accept connection failed with error:" << errno << std::endl;
147
+ continue ;
148
+ }
149
+
150
+ // 处理客户端连接
151
+ std::async (std::launch::async, [=] {
152
+ char buffer[1024 ];
153
+ auto bytes = recv (sock_client, buffer, sizeof (buffer), 0 );
154
+ if (bytes <= 0 ) {
155
+ std::cerr << " received message failed: " << errno << std::endl;
156
+ close (sock_client);
157
+ return ;
158
+ }
159
+ std::string response = Proccess (buffer);
160
+ bytes = send (sock_client, response.data (), response.size (), 0 );
161
+ close (sock_client);
162
+
163
+ if (bytes < 0 ) {
164
+ std::cerr << " send response failed: " << errno << std::endl;
165
+ }
166
+ });
167
+ }
168
+ close (sock_listener);
169
+ }));
170
+
171
+ return 0 ;
172
+ }
173
+
174
+ std::string ConsumerServer::Proccess (const std::string& message) {
175
+ // 获取provider服务实例
176
+ polaris::GetOneInstanceRequest instance_requst (provider_service_);
177
+ polaris::Instance instance;
178
+ auto ret_code = consumer_->GetOneInstance (instance_requst, instance);
179
+ if (ret_code != polaris::kReturnOk ) {
180
+ std::cout << " get one instance for service with error: "
181
+ << polaris::ReturnCodeToMsg (ret_code).c_str () << std::endl;
182
+ }
183
+
184
+ // 调用业务
185
+ std::string response;
186
+ auto begin_time = std::chrono::steady_clock::now ();
187
+ int send_ret = Send (instance.GetHost (), instance.GetPort (), message, response);
188
+ auto end_time = std::chrono::steady_clock::now ();
189
+
190
+ // 上报调用结果
191
+ polaris::ServiceCallResult result;
192
+ result.SetServiceNamespace (provider_service_.namespace_ );
193
+ result.SetServiceName (provider_service_.name_ );
194
+ result.SetInstanceId (instance.GetId ());
195
+ result.SetDelay (
196
+ std::chrono::duration_cast<std::chrono::milliseconds>(end_time - begin_time).count ());
197
+ result.SetRetCode (send_ret);
198
+ result.SetRetStatus (send_ret >= 0 ? polaris::kCallRetOk : polaris::kCallRetError );
199
+ if ((ret_code = consumer_->UpdateServiceCallResult (result)) != polaris::kReturnOk ) {
200
+ std::cout << " update call result for instance with error:"
201
+ << " msg:" << polaris::ReturnCodeToMsg (ret_code).c_str () << std::endl;
202
+ }
203
+
204
+ if (send_ret) {
205
+ response =
206
+ " send msg to " + instance.GetHost () + " :" + std::to_string (instance.GetPort ()) + " failed" ;
207
+ }
208
+ std::cout << response << std::endl;
209
+ return response;
210
+ }
211
+
212
+ int ConsumerServer::Send (const std::string& host, int port, const std::string& request,
213
+ std::string& response) {
214
+ // create a socket
215
+ int sock_fd = socket (AF_INET, SOCK_STREAM, 0 );
216
+ if (sock_fd < 0 ) {
217
+ std::cout << " create socket failed: " << errno << std::endl;
218
+ return -1 ;
219
+ }
220
+
221
+ sockaddr_in server_addr;
222
+ server_addr.sin_family = AF_INET;
223
+ inet_pton (AF_INET, host.c_str (), &server_addr.sin_addr );
224
+ server_addr.sin_port = htons (port);
225
+
226
+ if (connect (sock_fd, (sockaddr*)&server_addr, sizeof (server_addr)) < 0 ) {
227
+ std::cerr << " connection establish failed: " << errno << std::endl;
228
+ close (sock_fd);
229
+ return -2 ;
230
+ }
231
+
232
+ // send the message
233
+ int bytes_send = send (sock_fd, request.data (), request.length (), 0 );
234
+ if (bytes_send < 0 ) {
235
+ std::cerr << " send message failed: " << errno << std::endl;
236
+ close (sock_fd);
237
+ return -3 ;
238
+ }
239
+
240
+ char buffer[4096 ];
241
+ int bytes_recv = recv (sock_fd, &buffer, sizeof (buffer), 0 );
242
+ if (bytes_recv <= 0 ) {
243
+ std::cerr << " receive message failed: " << errno << std::endl;
244
+ close (sock_fd);
245
+ return -4 ;
246
+ }
247
+
248
+ close (sock_fd);
249
+ response = std::string (buffer);
250
+ return 0 ;
251
+ }
252
+
253
+ void ConsumerServer::Stop () {
254
+ stop_ = true ;
255
+ if (accept_thread_) {
256
+ accept_thread_->join ();
257
+ }
258
+ }
0 commit comments