Skip to content

Commit f1abd15

Browse files
committed
Order asynchronous client operations at method call but not future evaluation
Resolves #6.
1 parent abfcde5 commit f1abd15

File tree

9 files changed

+548
-310
lines changed

9 files changed

+548
-310
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,4 @@ pretty_assertions = "1.1.0"
3434
test-case = "1.2.3"
3535
testcontainers = { git = "https://github.com/kezhuw/testcontainers-rs.git", branch = "zookeeper-client" }
3636
futures = "0.3.21"
37+
speculoos = "0.9.0"

src/client/mod.rs

+314-167
Large diffs are not rendered by default.

src/proto/consts.rs

+9
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,16 @@ use num_enum::{IntoPrimitive, TryFromPrimitive};
44
#[derive(Copy, Clone, Debug, PartialEq, Eq, IntoPrimitive)]
55
pub enum PredefinedXid {
66
Notification = -1,
7+
/// ZooKeeper server [hard-code -2 as ping response xid][ping-xid], so we have to use this and make sure
8+
/// at most one ping in wire.
9+
///
10+
/// ping-xid: https://github.com/apache/zookeeper/blob/de7c5869d372e46af43979134d0e30b49d2319b1/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java#L215
711
Ping = -2,
12+
13+
/// Fortunately, ZooKeeper server [use xid from header](auth-xid) to reply auth request, so we can have
14+
/// multiple auth requets in network.
15+
///
16+
/// auth-xid: https://github.com/apache/zookeeper/blob/de7c5869d372e46af43979134d0e30b49d2319b1/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java#L1621
817
Auth = -4,
918
SetWatches = -8,
1019
}

src/proto/request_header.rs

-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ impl RequestHeader {
1313
pub fn with_code(code: OpCode) -> RequestHeader {
1414
let xid = match code {
1515
OpCode::Ping => PredefinedXid::Ping.into(),
16-
OpCode::Auth => PredefinedXid::Auth.into(),
1716
OpCode::SetWatches | OpCode::SetWatches2 => PredefinedXid::SetWatches.into(),
1817
_ => 0,
1918
};

src/session/depot.rs

+52-55
Original file line numberDiff line numberDiff line change
@@ -4,25 +4,23 @@ use std::io::{self, IoSlice};
44
use hashbrown::HashMap;
55
use strum::IntoEnumIterator;
66
use tokio::net::TcpStream;
7-
use tokio::sync::oneshot;
87

9-
use super::request::{self, MarshalledRequest, Operation, SessionOperation, StateResponser};
8+
use super::request::{MarshalledRequest, Operation, SessionOperation, StateResponser};
109
use super::types::WatchMode;
1110
use super::xid::Xid;
1211
use super::SessionId;
1312
use crate::error::Error;
14-
use crate::proto::{AuthPacket, OpCode, RemoveWatchesRequest};
15-
16-
pub type AuthResponser = oneshot::Sender<Result<(), Error>>;
13+
use crate::proto::{OpCode, PredefinedXid, RemoveWatchesRequest};
1714

1815
#[derive(Default)]
1916
pub struct Depot {
2017
xid: Xid,
2118

19+
pending_authes: Vec<SessionOperation>,
20+
2221
writing_slices: Vec<IoSlice<'static>>,
2322
writing_operations: VecDeque<Operation>,
24-
written_operations: VecDeque<SessionOperation>,
25-
pending_auth: Option<(AuthPacket, AuthResponser)>,
23+
written_operations: HashMap<i32, SessionOperation>,
2624

2725
watching_paths: HashMap<(&'static str, WatchMode), usize>,
2826
unwatching_paths: HashMap<(&'static str, WatchMode), SessionOperation>,
@@ -33,10 +31,10 @@ impl Depot {
3331
let writing_capacity = 128usize;
3432
Depot {
3533
xid: Default::default(),
34+
pending_authes: Vec::with_capacity(5),
3635
writing_slices: Vec::with_capacity(writing_capacity),
3736
writing_operations: VecDeque::with_capacity(writing_capacity),
38-
written_operations: VecDeque::with_capacity(128),
39-
pending_auth: None,
37+
written_operations: HashMap::with_capacity(128),
4038
watching_paths: HashMap::with_capacity(32),
4139
unwatching_paths: HashMap::with_capacity(32),
4240
}
@@ -45,28 +43,40 @@ impl Depot {
4543
pub fn for_connecting() -> Depot {
4644
Depot {
4745
xid: Default::default(),
46+
pending_authes: Default::default(),
4847
writing_slices: Vec::with_capacity(10),
4948
writing_operations: VecDeque::with_capacity(10),
50-
written_operations: VecDeque::with_capacity(10),
51-
pending_auth: None,
49+
written_operations: HashMap::with_capacity(10),
5250
watching_paths: HashMap::new(),
5351
unwatching_paths: HashMap::new(),
5452
}
5553
}
5654

55+
/// Clear all buffered operations from previous run.
5756
pub fn clear(&mut self) {
57+
self.pending_authes.clear();
5858
self.writing_slices.clear();
5959
self.watching_paths.clear();
60+
self.unwatching_paths.clear();
6061
self.writing_operations.clear();
6162
self.written_operations.clear();
6263
}
6364

64-
pub fn error(&mut self, err: Error) {
65-
self.written_operations.drain(..).for_each(|operation| {
65+
/// Error out ongoing operations except authes.
66+
pub fn error(&mut self, err: &Error) {
67+
self.written_operations.drain().for_each(|(_, operation)| {
68+
if operation.request.get_code() == OpCode::Auth {
69+
self.pending_authes.push(operation);
70+
return;
71+
}
6672
operation.responser.send(Err(err.clone()));
6773
});
6874
self.writing_operations.drain(..).for_each(|operation| {
6975
if let Operation::Session(operation) = operation {
76+
if operation.request.get_code() == OpCode::Auth {
77+
self.pending_authes.push(operation);
78+
return;
79+
}
7080
operation.responser.send(Err(err.clone()));
7181
}
7282
});
@@ -77,42 +87,28 @@ impl Depot {
7787
self.watching_paths.clear();
7888
}
7989

80-
pub fn is_empty(&self) -> bool {
81-
self.writing_operations.is_empty() && self.written_operations.is_empty()
82-
}
83-
84-
pub fn pop_pending_auth(&mut self) -> Option<(AuthPacket, AuthResponser)> {
85-
self.pending_auth.take()
90+
/// Terminate all ongoing operations including authes.
91+
pub fn terminate(&mut self, err: Error) {
92+
self.error(&err);
93+
for SessionOperation { responser, .. } in self.pending_authes.drain(..) {
94+
responser.send(Err(err.clone()));
95+
}
8696
}
8797

88-
pub fn has_pending_auth(&self) -> bool {
89-
self.pending_auth.is_some()
98+
/// Check whether there is any ongoing operations.
99+
pub fn is_empty(&self) -> bool {
100+
self.writing_operations.is_empty() && self.written_operations.is_empty()
90101
}
91102

92-
pub fn pop_reqeust(&mut self, xid: i32) -> Result<SessionOperation, Error> {
93-
match self.written_operations.pop_front() {
103+
pub fn pop_request(&mut self, xid: i32) -> Result<SessionOperation, Error> {
104+
match self.written_operations.remove(&xid) {
94105
None => Err(Error::UnexpectedError(format!("recv response with xid {} but no pending request", xid))),
95-
Some(operation) => {
96-
let request_xid = operation.request.get_xid();
97-
if xid == request_xid {
98-
return Ok(operation);
99-
}
100-
self.written_operations.push_front(operation);
101-
Err(Error::UnexpectedError(format!("expect response xid {} but got {}", xid, request_xid)))
102-
},
106+
Some(operation) => Ok(operation),
103107
}
104108
}
105109

106110
pub fn pop_ping(&mut self) -> Result<(), Error> {
107-
if let Some(operation) = self.written_operations.pop_front() {
108-
let op_code = operation.request.get_code();
109-
if op_code != OpCode::Ping {
110-
self.written_operations.push_front(operation);
111-
return Err(Error::UnexpectedError(format!("expect pending ping request, got {}", op_code)));
112-
}
113-
return Ok(());
114-
}
115-
Err(Error::UnexpectedError("expect pending ping request, got none".to_string()))
111+
self.pop_request(PredefinedXid::Ping.into()).map(|_| ())
116112
}
117113

118114
pub fn push_operation(&mut self, operation: Operation) {
@@ -126,9 +122,11 @@ impl Depot {
126122
}
127123

128124
pub fn start(&mut self) {
129-
if let Some((auth, responser)) = self.pending_auth.take() {
130-
self.push_auth(auth, responser);
125+
let mut pending_authes = std::mem::take(&mut self.pending_authes);
126+
for operation in pending_authes.drain(..) {
127+
self.push_session(operation);
131128
}
129+
self.pending_authes = pending_authes;
132130
}
133131

134132
fn cancel_unwatch(&mut self, path: &'static str, mode: WatchMode) {
@@ -195,12 +193,6 @@ impl Depot {
195193
.any(|mode| self.watching_paths.contains_key(&(path, mode)))
196194
}
197195

198-
pub fn push_auth(&mut self, auth: AuthPacket, responser: AuthResponser) {
199-
let operation = request::build_auth_operation(OpCode::Auth, &auth);
200-
self.pending_auth = Some((auth, responser));
201-
self.push_operation(Operation::Auth(operation));
202-
}
203-
204196
pub fn write_operations(&mut self, sock: &TcpStream, session_id: SessionId) -> Result<(), Error> {
205197
let result = sock.try_write_vectored(self.writing_slices.as_slice());
206198
let mut written_bytes = match result {
@@ -226,13 +218,18 @@ impl Depot {
226218
.unwrap_or(self.writing_slices.len());
227219
if written_slices != 0 {
228220
self.writing_slices.drain(..written_slices);
229-
let written = self.writing_operations.drain(..written_slices).filter_map(|operation| {
230-
if let Operation::Session(operation) = operation {
231-
return Some(operation);
232-
}
233-
None
234-
});
235-
self.written_operations.extend(written);
221+
self.writing_operations
222+
.drain(..written_slices)
223+
.filter_map(|operation| {
224+
if let Operation::Session(operation) = operation {
225+
return Some(operation);
226+
}
227+
None
228+
})
229+
.for_each(|operation| {
230+
let xid = operation.request.get_xid();
231+
self.written_operations.insert(xid, operation);
232+
});
236233
}
237234
if written_bytes != 0 {
238235
let (_, rest) = self.writing_slices[0].split_at(written_bytes);

0 commit comments

Comments
 (0)