-
-
Notifications
You must be signed in to change notification settings - Fork 40
/
client.rs
169 lines (145 loc) · 5.05 KB
/
client.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
// LNP Node: node running lightning network protocol and generalized lightning
// channels.
// Written in 2020-2022 by
// Dr. Maxim Orlovsky <[email protected]>
//
// To the extent possible under law, the author(s) have dedicated all
// copyright and related and neighboring rights to this software to
// the public domain worldwide. This software is distributed without
// any warranty.
//
// You should have received a copy of the MIT License along with this software.
// If not, see <https://opensource.org/licenses/MIT>.
use std::thread::sleep;
use std::time::Duration;
use colored::Colorize;
use internet2::addr::ServiceAddr;
use internet2::ZmqSocketType;
use lnp::addr::LnpAddr;
use microservices::esb::{self, BusId, ClientId};
use microservices::util::OptionDetails;
use crate::{BusMsg, Error, RpcMsg, ServiceId};
// We have just a single service bus (RPC), so we can use any id
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug, Default, Display)]
#[display("LNPRPC")]
struct RpcBus;
impl BusId for RpcBus {
type Address = ServiceId;
}
type Bus = esb::EndpointList<RpcBus>;
#[repr(C)]
pub struct Client {
identity: ClientId,
response_queue: Vec<RpcMsg>,
esb: esb::Controller<RpcBus, BusMsg, Handler>,
}
impl Client {
pub fn with(connect: ServiceAddr) -> Result<Self, Error> {
use bitcoin::secp256k1::rand;
debug!("RPC socket {}", connect);
debug!("Setting up RPC client...");
let identity = rand::random();
let bus_config = esb::BusConfig::with_addr(
connect,
ZmqSocketType::RouterConnect,
Some(ServiceId::router()),
);
let esb = esb::Controller::with(
map! {
RpcBus => bus_config
},
Handler { identity: ServiceId::Client(identity) },
)?;
// We have to sleep in order for ZMQ to bootstrap
sleep(Duration::from_secs_f32(0.1));
Ok(Self { identity, response_queue: empty!(), esb })
}
pub fn identity(&self) -> ClientId { self.identity }
pub fn request(&mut self, daemon: ServiceId, req: RpcMsg) -> Result<(), Error> {
debug!("Executing {}", req);
self.esb.send_to(RpcBus, daemon, BusMsg::Rpc(req))?;
Ok(())
}
pub fn response(&mut self) -> Result<RpcMsg, Error> {
if self.response_queue.is_empty() {
for poll in self.esb.recv_poll()? {
match poll.request {
BusMsg::Rpc(msg) => self.response_queue.push(msg),
}
}
}
Ok(self.response_queue.pop().expect("We always have at least one element"))
}
pub fn report_failure(&mut self) -> Result<RpcMsg, Error> {
match self.response()? {
RpcMsg::Failure(fail) => {
eprintln!("{}: {}", "Request failure".bright_red(), fail.to_string().red());
Err(Error::Rpc(fail.into_microservice_failure().into()))
}
resp => Ok(resp),
}
}
pub fn report_response(&mut self) -> Result<(), Error> {
let resp = self.report_failure()?;
println!("{:#}", resp);
Ok(())
}
pub fn report_progress(&mut self) -> Result<usize, Error> {
let mut counter = 0;
let mut finished = false;
while !finished {
finished = true;
counter += 1;
match self.report_failure()? {
// Failure is already covered by `report_response()`
RpcMsg::Progress(info) => {
println!("{}", info);
finished = false;
}
RpcMsg::Success(OptionDetails(Some(info))) => {
println!("{}{}", "Success: ".bright_green(), info);
}
RpcMsg::Success(OptionDetails(None)) => {
println!("{}", "Success".bright_green());
}
other => {
eprintln!(
"{}: {}",
"Unexpected message".bright_yellow(),
other.to_string().yellow()
);
return Err(Error::Other(s!("Unexpected server response")));
}
}
}
Ok(counter)
}
}
impl Client {
pub fn connect(&mut self, remote_peer: LnpAddr) -> Result<(), Error> {
self.request(ServiceId::LnpBroker, RpcMsg::ConnectPeer(remote_peer))?;
self.report_response()
}
}
pub struct Handler {
identity: ServiceId,
}
impl esb::Handler<RpcBus> for Handler {
type Request = BusMsg;
type Error = Error;
fn identity(&self) -> ServiceId { self.identity.clone() }
fn handle(
&mut self,
_: &mut Bus,
_: RpcBus,
_: ServiceId,
_: BusMsg,
) -> Result<(), Self::Error> {
// Cli does not receive replies for now
Ok(())
}
fn handle_err(&mut self, _: &mut Bus, err: esb::Error<ServiceId>) -> Result<(), Self::Error> {
// We simply propagate the error since it already has been reported
Err(err.into())
}
}