Skip to content

Order asynchronous client operations at method call but not future evaluation #7

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ pretty_assertions = "1.1.0"
test-case = "1.2.3"
testcontainers = { git = "https://github.com/kezhuw/testcontainers-rs.git", branch = "zookeeper-client" }
futures = "0.3.21"
speculoos = "0.9.0"
481 changes: 314 additions & 167 deletions src/client/mod.rs

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions src/proto/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,16 @@ use num_enum::{IntoPrimitive, TryFromPrimitive};
#[derive(Copy, Clone, Debug, PartialEq, Eq, IntoPrimitive)]
pub enum PredefinedXid {
Notification = -1,
/// ZooKeeper server [hard-code -2 as ping response xid][ping-xid], so we have to use this and make sure
/// at most one ping in wire.
///
/// ping-xid: https://github.com/apache/zookeeper/blob/de7c5869d372e46af43979134d0e30b49d2319b1/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java#L215
Ping = -2,

/// Fortunately, ZooKeeper server [use xid from header](auth-xid) to reply auth request, so we can have
/// multiple auth requets in network.
///
/// auth-xid: https://github.com/apache/zookeeper/blob/de7c5869d372e46af43979134d0e30b49d2319b1/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java#L1621
Auth = -4,
SetWatches = -8,
}
Expand Down
1 change: 0 additions & 1 deletion src/proto/request_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ impl RequestHeader {
pub fn with_code(code: OpCode) -> RequestHeader {
let xid = match code {
OpCode::Ping => PredefinedXid::Ping.into(),
OpCode::Auth => PredefinedXid::Auth.into(),
OpCode::SetWatches | OpCode::SetWatches2 => PredefinedXid::SetWatches.into(),
_ => 0,
};
Expand Down
107 changes: 52 additions & 55 deletions src/session/depot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,23 @@ use std::io::{self, IoSlice};
use hashbrown::HashMap;
use strum::IntoEnumIterator;
use tokio::net::TcpStream;
use tokio::sync::oneshot;

use super::request::{self, MarshalledRequest, Operation, SessionOperation, StateResponser};
use super::request::{MarshalledRequest, Operation, SessionOperation, StateResponser};
use super::types::WatchMode;
use super::xid::Xid;
use super::SessionId;
use crate::error::Error;
use crate::proto::{AuthPacket, OpCode, RemoveWatchesRequest};

pub type AuthResponser = oneshot::Sender<Result<(), Error>>;
use crate::proto::{OpCode, PredefinedXid, RemoveWatchesRequest};

#[derive(Default)]
pub struct Depot {
xid: Xid,

pending_authes: Vec<SessionOperation>,

writing_slices: Vec<IoSlice<'static>>,
writing_operations: VecDeque<Operation>,
written_operations: VecDeque<SessionOperation>,
pending_auth: Option<(AuthPacket, AuthResponser)>,
written_operations: HashMap<i32, SessionOperation>,

watching_paths: HashMap<(&'static str, WatchMode), usize>,
unwatching_paths: HashMap<(&'static str, WatchMode), SessionOperation>,
Expand All @@ -33,10 +31,10 @@ impl Depot {
let writing_capacity = 128usize;
Depot {
xid: Default::default(),
pending_authes: Vec::with_capacity(5),
writing_slices: Vec::with_capacity(writing_capacity),
writing_operations: VecDeque::with_capacity(writing_capacity),
written_operations: VecDeque::with_capacity(128),
pending_auth: None,
written_operations: HashMap::with_capacity(128),
watching_paths: HashMap::with_capacity(32),
unwatching_paths: HashMap::with_capacity(32),
}
Expand All @@ -45,28 +43,40 @@ impl Depot {
pub fn for_connecting() -> Depot {
Depot {
xid: Default::default(),
pending_authes: Default::default(),
writing_slices: Vec::with_capacity(10),
writing_operations: VecDeque::with_capacity(10),
written_operations: VecDeque::with_capacity(10),
pending_auth: None,
written_operations: HashMap::with_capacity(10),
watching_paths: HashMap::new(),
unwatching_paths: HashMap::new(),
}
}

/// Clear all buffered operations from previous run.
pub fn clear(&mut self) {
self.pending_authes.clear();
self.writing_slices.clear();
self.watching_paths.clear();
self.unwatching_paths.clear();
self.writing_operations.clear();
self.written_operations.clear();
}

pub fn error(&mut self, err: Error) {
self.written_operations.drain(..).for_each(|operation| {
/// Error out ongoing operations except authes.
pub fn error(&mut self, err: &Error) {
self.written_operations.drain().for_each(|(_, operation)| {
if operation.request.get_code() == OpCode::Auth {
self.pending_authes.push(operation);
return;
}
operation.responser.send(Err(err.clone()));
});
self.writing_operations.drain(..).for_each(|operation| {
if let Operation::Session(operation) = operation {
if operation.request.get_code() == OpCode::Auth {
self.pending_authes.push(operation);
return;
}
operation.responser.send(Err(err.clone()));
}
});
Expand All @@ -77,42 +87,28 @@ impl Depot {
self.watching_paths.clear();
}

pub fn is_empty(&self) -> bool {
self.writing_operations.is_empty() && self.written_operations.is_empty()
}

pub fn pop_pending_auth(&mut self) -> Option<(AuthPacket, AuthResponser)> {
self.pending_auth.take()
/// Terminate all ongoing operations including authes.
pub fn terminate(&mut self, err: Error) {
self.error(&err);
for SessionOperation { responser, .. } in self.pending_authes.drain(..) {
responser.send(Err(err.clone()));
}
}

pub fn has_pending_auth(&self) -> bool {
self.pending_auth.is_some()
/// Check whether there is any ongoing operations.
pub fn is_empty(&self) -> bool {
self.writing_operations.is_empty() && self.written_operations.is_empty()
}

pub fn pop_reqeust(&mut self, xid: i32) -> Result<SessionOperation, Error> {
match self.written_operations.pop_front() {
pub fn pop_request(&mut self, xid: i32) -> Result<SessionOperation, Error> {
match self.written_operations.remove(&xid) {
None => Err(Error::UnexpectedError(format!("recv response with xid {} but no pending request", xid))),
Some(operation) => {
let request_xid = operation.request.get_xid();
if xid == request_xid {
return Ok(operation);
}
self.written_operations.push_front(operation);
Err(Error::UnexpectedError(format!("expect response xid {} but got {}", xid, request_xid)))
},
Some(operation) => Ok(operation),
}
}

pub fn pop_ping(&mut self) -> Result<(), Error> {
if let Some(operation) = self.written_operations.pop_front() {
let op_code = operation.request.get_code();
if op_code != OpCode::Ping {
self.written_operations.push_front(operation);
return Err(Error::UnexpectedError(format!("expect pending ping request, got {}", op_code)));
}
return Ok(());
}
Err(Error::UnexpectedError("expect pending ping request, got none".to_string()))
self.pop_request(PredefinedXid::Ping.into()).map(|_| ())
}

pub fn push_operation(&mut self, operation: Operation) {
Expand All @@ -126,9 +122,11 @@ impl Depot {
}

pub fn start(&mut self) {
if let Some((auth, responser)) = self.pending_auth.take() {
self.push_auth(auth, responser);
let mut pending_authes = std::mem::take(&mut self.pending_authes);
for operation in pending_authes.drain(..) {
self.push_session(operation);
}
self.pending_authes = pending_authes;
}

fn cancel_unwatch(&mut self, path: &'static str, mode: WatchMode) {
Expand Down Expand Up @@ -195,12 +193,6 @@ impl Depot {
.any(|mode| self.watching_paths.contains_key(&(path, mode)))
}

pub fn push_auth(&mut self, auth: AuthPacket, responser: AuthResponser) {
let operation = request::build_auth_operation(OpCode::Auth, &auth);
self.pending_auth = Some((auth, responser));
self.push_operation(Operation::Auth(operation));
}

pub fn write_operations(&mut self, sock: &TcpStream, session_id: SessionId) -> Result<(), Error> {
let result = sock.try_write_vectored(self.writing_slices.as_slice());
let mut written_bytes = match result {
Expand All @@ -226,13 +218,18 @@ impl Depot {
.unwrap_or(self.writing_slices.len());
if written_slices != 0 {
self.writing_slices.drain(..written_slices);
let written = self.writing_operations.drain(..written_slices).filter_map(|operation| {
if let Operation::Session(operation) = operation {
return Some(operation);
}
None
});
self.written_operations.extend(written);
self.writing_operations
.drain(..written_slices)
.filter_map(|operation| {
if let Operation::Session(operation) = operation {
return Some(operation);
}
None
})
.for_each(|operation| {
let xid = operation.request.get_xid();
self.written_operations.insert(xid, operation);
});
}
if written_bytes != 0 {
let (_, rest) = self.writing_slices[0].split_at(written_bytes);
Expand Down
Loading