forked from trpc-group/trpc-cpp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathforward_service.cc
113 lines (82 loc) · 3.61 KB
/
forward_service.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
//
//
// Tencent is pleased to support the open source community by making tRPC available.
//
// Copyright (C) 2023 THL A29 Limited, a Tencent company.
// All rights reserved.
//
// If you have downloaded a copy of the tRPC source code from Tencent,
// please note that tRPC source code is licensed under the Apache 2.0 License,
// A copy of the Apache 2.0 License is included in this file.
//
//
#include "examples/features/fiber_forward/proxy/forward_service.h"
#include <memory>
#include <string>
#include <utility>
#include "trpc/client/client_context.h"
#include "trpc/client/make_client_context.h"
#include "trpc/client/trpc_client.h"
#include "trpc/coroutine/fiber.h"
#include "trpc/coroutine/fiber_latch.h"
#include "trpc/log/trpc_log.h"
namespace examples::forward {
ForwardServiceImpl::ForwardServiceImpl() {
greeter_proxy_ =
::trpc::GetTrpcClient()->GetProxy<::trpc::test::helloworld::GreeterServiceProxy>("trpc.test.helloworld.Greeter");
}
::trpc::Status ForwardServiceImpl::Route(::trpc::ServerContextPtr context,
const ::trpc::test::helloworld::HelloRequest* request,
::trpc::test::helloworld::HelloReply* reply) {
TRPC_FMT_INFO("Forward request:{}, req id:{}", request->msg(), context->GetRequestId());
auto client_context = ::trpc::MakeClientContext(context, greeter_proxy_);
::trpc::test::helloworld::HelloRequest route_request;
route_request.set_msg(request->msg());
::trpc::test::helloworld::HelloReply route_reply;
// block current fiber, not block current fiber worker thread
::trpc::Status status = greeter_proxy_->SayHello(client_context, route_request, &route_reply);
TRPC_FMT_INFO("Forward status:{}, route_reply:{}", status.ToString(), route_reply.msg());
reply->set_msg(route_reply.msg());
return status;
}
::trpc::Status ForwardServiceImpl::ParallelRoute(::trpc::ServerContextPtr context,
const ::trpc::test::helloworld::HelloRequest* request,
::trpc::test::helloworld::HelloReply* reply) {
TRPC_FMT_INFO("Forward request:{}, req id:{}", request->msg(), context->GetRequestId());
// send two requests in parallel to helloworldserver
int exe_count = 2;
::trpc::FiberLatch l(exe_count);
std::vector<::trpc::test::helloworld::HelloReply> vec_final_reply;
vec_final_reply.resize(exe_count);
int i = 0;
while (i < exe_count) {
bool ret = ::trpc::StartFiberDetached([this, &l, &context, &request, i, &vec_final_reply] {
std::string msg = request->msg();
msg += ", index";
msg += std::to_string(i);
trpc::test::helloworld::HelloRequest request;
request.set_msg(msg);
auto client_context = ::trpc::MakeClientContext(context, greeter_proxy_);
::trpc::Status status = greeter_proxy_->SayHello(client_context, request, &vec_final_reply[i]);
TRPC_FMT_INFO("Forward i: {}, status:{}, route_reply:{}", i, status.ToString(), vec_final_reply[i].msg());
// when reduced to 0, the FiberLatch `Wait` operation will be notified
l.CountDown();
});
if (!ret) {
std::string msg("failed, index:");
msg += std::to_string(i);
vec_final_reply[i].set_msg("failed.");
l.CountDown();
}
i += 1;
}
// wait for two requests to return
// block current fiber, not block current fiber worker thread
l.Wait();
reply->set_msg("parallel result: ");
for (size_t i = 0; i < vec_final_reply.size(); i++) {
reply->set_msg(reply->msg() + vec_final_reply[i].msg());
}
return ::trpc::kSuccStatus;
}
} // namespace examples::forward