-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathprotocol_driver.h
152 lines (125 loc) · 5.59 KB
/
protocol_driver.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef DISTBENCH_PROTOCOL_DRIVER_H_
#define DISTBENCH_PROTOCOL_DRIVER_H_
#include <cstdint>
#include <functional>
#include <string>
#include <string_view>
#include <vector>

#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/synchronization/notification.h"
#include "distbench.pb.h"
#include "simple_clock.h"
namespace distbench {
// Client-side bookkeeping for one RPC: the request and response messages, a
// set of timestamps, and an outcome flag.
struct ClientRpcState {
  GenericRequestResponse request;
  GenericRequestResponse response;

  // Both start timestamps begin at absl::InfinitePast().
  absl::Time prior_start_time = absl::InfinitePast();
  absl::Time start_time = absl::InfinitePast();

  // No explicit initializer: holds a default-constructed absl::Time.
  absl::Time end_time;

  // Outcome flag for the RPC. Explicitly defaulted to false so that a freshly
  // constructed state never carries an indeterminate value.
  bool success = false;
};
// Server-side state for one RPC: the incoming request, the response being
// built, and two optional callbacks (send-response and free-state).
//
// The member functions are only declared here. Going by their names, each
// Set*Function() stores a callback and the matching *IfSet() runs it when one
// has been stored -- confirm against the out-of-line definitions.
struct ServerRpcState {
  void SetSendResponseFunction(std::function<void()> send_response_function);
  void SendResponseIfSet();

  void SetFreeStateFunction(std::function<void()> free_state_function);
  void FreeStateIfSet();

  // Non-owning pointer to the request message. Starts out as nullptr so that
  // an unset pointer is detectable instead of being an indeterminate value.
  const GenericRequestResponse* request = nullptr;
  GenericRequestResponse response;
  bool have_dedicated_thread = false;

  // If an RPC protocol automatically frees the request message when a response
  // is sent, and the API defines the request as a const pointer (meaning we
  // cannot use std::move to extract its contents), then we must make a copy of
  // the request (or just the first 5 fields) in order to avoid a dangling
  // pointer. In that case the request pointer can be set to point to
  // request_copy, which has the proper lifetime. Otherwise this is unused.
  GenericRequestResponse request_copy;

 private:
  std::function<void()> send_response_function_;
  std::function<void()> free_state_function_;
};
// One named transport statistic: a name paired with a signed 64-bit value.
struct TransportStat {
  std::string name;
  // Zero by default so that a default-constructed stat is well defined.
  int64_t value = 0;
};
// Signed 64-bit integer alias; by its name, an identifier for an RPC. It is
// not referenced anywhere else in this header.
using RpcId = int64_t;
// Client half of a protocol driver: the abstract interface used to set up
// connections to peers and to issue RPCs over them.
class ProtocolDriverClient {
 public:
  virtual ~ProtocolDriverClient() = default;

  // The default implementation does no work and reports success.
  virtual absl::Status Initialize(const ProtocolDriverOptions& pd_opts) {
    return absl::OkStatus();
  }

  // Client interface =========================================================
  virtual void SetNumPeers(int num_peers) = 0;

  // Multi-server channel configuration. These two are declared without `= 0`,
  // so they are not pure virtual and their definitions live outside this
  // header.
  virtual void SetNumMultiServerChannels(int num_channels);
  virtual absl::Status SetupMultiServerChannel(
      const ::google::protobuf::RepeatedPtrField<NamedSetting>& settings,
      const std::vector<int>& peer_ids, int channel_id);

  // Allocates any local resources required before a connection can be
  // established (for example an unconnected RoCE QueuePair) and returns them
  // as opaque data. A no-op when no local resources are needed.
  virtual absl::StatusOr<std::string> Preconnect();

  // Actually establishes a connection, given the opaque data received from
  // the responder, e.g. connects the local and remote RoCE queue pairs.
  virtual absl::Status HandleConnect(std::string remote_connection_info,
                                     int peer) = 0;

  // Starts an RPC to the peer at `peer_index` using the caller-provided
  // `state`. `done_callback` is, by its name, run once the RPC has finished;
  // the exact timing is up to each implementation.
  virtual void InitiateRpc(int peer_index, ClientRpcState* state,
                           std::function<void()> done_callback) = 0;

  // Multi-server channel counterpart of InitiateRpc(); not pure virtual.
  virtual void InitiateRpcToMultiServerChannel(
      int channel_index, ClientRpcState* state,
      std::function<void()> done_callback);

  virtual void ChurnConnection(int peer) = 0;
  virtual void ShutdownClient() = 0;

  // Misc interface ===========================================================
  virtual std::vector<TransportStat> GetTransportStats() = 0;
};
// Server half of a protocol driver: the abstract interface used to accept
// connections and to dispatch incoming RPCs to a handler.
class ProtocolDriverServer {
 public:
  virtual ~ProtocolDriverServer() = default;

  // The default implementation reports success and never writes to `*port`.
  virtual absl::Status Initialize(const ProtocolDriverOptions& pd_opts,
                                  int* port) {
    return absl::OkStatus();
  }

  // Server interface =========================================================

  // Installs the RPC handler. The handler receives a ServerRpcState* and
  // returns a std::function<void()>.
  virtual void SetHandler(
      std::function<std::function<void()>(ServerRpcState* state)> handler) = 0;

  // Either returns the address of a running server that a client can connect
  // to, or actually establishes a single connection using the opaque data
  // supplied by the initiator. For example: allocate an unconnected RoCE queue
  // pair, connect it to the remote queue pair, and return the information
  // about the newly allocated queue pair so that the initiator can connect the
  // queue pairs on its end.
  virtual absl::StatusOr<std::string> HandlePreConnect(
      std::string_view remote_connection_info, int peer) = 0;

  virtual void ShutdownServer() = 0;

  // Handles the remote side responding with an RPC error by cleaning up the
  // local resources associated with the opaque data. Not pure virtual.
  virtual void HandleConnectFailure(std::string_view local_connection_info);

  // Misc interface ===========================================================
  virtual std::vector<TransportStat> GetTransportStats() = 0;
};
class ProtocolDriver : public ProtocolDriverClient,
public ProtocolDriverServer {
public:
virtual ~ProtocolDriver() {}
virtual absl::Status Initialize(const ProtocolDriverOptions& pd_opts,
int* port) = 0;
virtual std::vector<TransportStat> GetTransportStats() = 0;
// Misc interface ===========================================================
virtual SimpleClock& GetClock();
private:
// Hide the Initialize functions of the base classes:
using ProtocolDriverClient::Initialize;
using ProtocolDriverServer::Initialize;
};
} // namespace distbench
#endif // DISTBENCH_PROTOCOL_DRIVER_H_