Skip to content

Commit

Permalink
Change: remove deprecated RaftNetwork methods without option argument
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Feb 24, 2024
1 parent b73f906 commit 22cd3bb
Show file tree
Hide file tree
Showing 10 changed files with 78 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ impl RaftNetworkFactory<TypeConfig> for Router {
}

impl RaftNetwork<TypeConfig> for Connection {
async fn send_append_entries(
async fn append_entries(
&mut self,
req: AppendEntriesRequest<TypeConfig>,
_option: RPCOption,
) -> Result<AppendEntriesResponse<NodeId>, typ::RPCError> {
let resp = self
.router
Expand All @@ -65,7 +66,11 @@ impl RaftNetwork<TypeConfig> for Connection {
Ok(resp)
}

async fn send_vote(&mut self, req: VoteRequest<NodeId>) -> Result<VoteResponse<NodeId>, typ::RPCError> {
async fn vote(
&mut self,
req: VoteRequest<NodeId>,
_option: RPCOption,
) -> Result<VoteResponse<NodeId>, typ::RPCError> {
let resp = self
.router
.send(self.target, "/raft/vote", req)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ impl RaftNetworkFactory<TypeConfig> for Router {
}

impl RaftNetwork<TypeConfig> for Connection {
async fn send_append_entries(
async fn append_entries(
&mut self,
req: AppendEntriesRequest<TypeConfig>,
_option: RPCOption,
) -> Result<AppendEntriesResponse<NodeId>, typ::RPCError> {
let resp = self
.router
Expand All @@ -65,7 +66,11 @@ impl RaftNetwork<TypeConfig> for Connection {
Ok(resp)
}

async fn send_vote(&mut self, req: VoteRequest<NodeId>) -> Result<VoteResponse<NodeId>, typ::RPCError> {
async fn vote(
&mut self,
req: VoteRequest<NodeId>,
_option: RPCOption,
) -> Result<VoteResponse<NodeId>, typ::RPCError> {
let resp = self
.router
.send(self.target, "/raft/vote", req)
Expand Down
13 changes: 10 additions & 3 deletions examples/raft-kv-memstore-singlethreaded/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use openraft::error::InstallSnapshotError;
use openraft::error::RemoteError;
use openraft::network::RPCOption;
use openraft::raft::AppendEntriesRequest;
use openraft::raft::AppendEntriesResponse;
use openraft::raft::InstallSnapshotRequest;
Expand Down Expand Up @@ -32,9 +33,10 @@ impl RaftNetworkFactory<TypeConfig> for Router {
}

impl RaftNetwork<TypeConfig> for Connection {
async fn send_append_entries(
async fn append_entries(
&mut self,
req: AppendEntriesRequest<TypeConfig>,
_option: RPCOption,
) -> Result<AppendEntriesResponse<NodeId>, typ::RPCError> {
let resp = self
.router
Expand All @@ -44,9 +46,10 @@ impl RaftNetwork<TypeConfig> for Connection {
Ok(resp)
}

async fn send_install_snapshot(
async fn install_snapshot(
&mut self,
req: InstallSnapshotRequest<TypeConfig>,
_option: RPCOption,
) -> Result<InstallSnapshotResponse<NodeId>, typ::RPCError<InstallSnapshotError>> {
let resp = self
.router
Expand All @@ -56,7 +59,11 @@ impl RaftNetwork<TypeConfig> for Connection {
Ok(resp)
}

async fn send_vote(&mut self, req: VoteRequest<NodeId>) -> Result<VoteResponse<NodeId>, typ::RPCError> {
async fn vote(
&mut self,
req: VoteRequest<NodeId>,
_option: RPCOption,
) -> Result<VoteResponse<NodeId>, typ::RPCError> {
let resp = self
.router
.send(self.target, "/raft/vote", req)
Expand Down
13 changes: 10 additions & 3 deletions examples/raft-kv-memstore/src/network/raft_network_impl.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use openraft::error::InstallSnapshotError;
use openraft::error::NetworkError;
use openraft::error::RemoteError;
use openraft::network::RPCOption;
use openraft::network::RaftNetwork;
use openraft::network::RaftNetworkFactory;
use openraft::raft::AppendEntriesRequest;
Expand Down Expand Up @@ -77,21 +78,27 @@ pub struct NetworkConnection {
}

impl RaftNetwork<TypeConfig> for NetworkConnection {
async fn send_append_entries(
async fn append_entries(
&mut self,
req: AppendEntriesRequest<TypeConfig>,
_option: RPCOption,
) -> Result<AppendEntriesResponse<NodeId>, typ::RPCError> {
self.owner.send_rpc(self.target, &self.target_node, "raft-append", req).await
}

async fn send_install_snapshot(
async fn install_snapshot(
&mut self,
req: InstallSnapshotRequest<TypeConfig>,
_option: RPCOption,
) -> Result<InstallSnapshotResponse<NodeId>, typ::RPCError<InstallSnapshotError>> {
self.owner.send_rpc(self.target, &self.target_node, "raft-snapshot", req).await
}

async fn send_vote(&mut self, req: VoteRequest<NodeId>) -> Result<VoteResponse<NodeId>, typ::RPCError> {
async fn vote(
&mut self,
req: VoteRequest<NodeId>,
_option: RPCOption,
) -> Result<VoteResponse<NodeId>, typ::RPCError> {
self.owner.send_rpc(self.target, &self.target_node, "raft-vote", req).await
}
}
18 changes: 11 additions & 7 deletions examples/raft-kv-rocksdb/src/network/raft_network_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use openraft::error::NetworkError;
use openraft::error::RPCError;
use openraft::error::RaftError;
use openraft::error::RemoteError;
use openraft::network::RPCOption;
use openraft::network::RaftNetwork;
use openraft::network::RaftNetworkFactory;
use openraft::raft::AppendEntriesRequest;
Expand Down Expand Up @@ -99,7 +100,7 @@ fn to_error<E: std::error::Error + 'static + Clone>(e: toy_rpc::Error, target: N
// 99 | ) -> Result<AppendEntriesResponse<NodeId>, RPCError<NodeId, Node, RaftError<NodeId>>>
// {
// | ___________________________________________________________________________________________^
// 100 | | tracing::debug!(req = debug(&req), "send_append_entries");
// 100 | | tracing::debug!(req = debug(&req), "append_entries");
// 101 | |
// 102 | | let c = self.c().await?;
// ... |
Expand All @@ -112,11 +113,12 @@ fn to_error<E: std::error::Error + 'static + Clone>(e: toy_rpc::Error, target: N
#[allow(clippy::blocks_in_conditions)]
impl RaftNetwork<TypeConfig> for NetworkConnection {
#[tracing::instrument(level = "debug", skip_all, err(Debug))]
async fn send_append_entries(
async fn append_entries(
&mut self,
req: AppendEntriesRequest<TypeConfig>,
_option: RPCOption,
) -> Result<AppendEntriesResponse<NodeId>, RPCError<NodeId, Node, RaftError<NodeId>>> {
tracing::debug!(req = debug(&req), "send_append_entries");
tracing::debug!(req = debug(&req), "append_entries");

let c = self.c().await?;
tracing::debug!("got connection");
Expand All @@ -128,20 +130,22 @@ impl RaftNetwork<TypeConfig> for NetworkConnection {
}

#[tracing::instrument(level = "debug", skip_all, err(Debug))]
async fn send_install_snapshot(
async fn install_snapshot(
&mut self,
req: InstallSnapshotRequest<TypeConfig>,
_option: RPCOption,
) -> Result<InstallSnapshotResponse<NodeId>, RPCError<NodeId, Node, RaftError<NodeId, InstallSnapshotError>>> {
tracing::debug!(req = debug(&req), "send_install_snapshot");
tracing::debug!(req = debug(&req), "install_snapshot");
self.c().await?.raft().snapshot(req).await.map_err(|e| to_error(e, self.target))
}

#[tracing::instrument(level = "debug", skip_all, err(Debug))]
async fn send_vote(
async fn vote(
&mut self,
req: VoteRequest<NodeId>,
_option: RPCOption,
) -> Result<VoteResponse<NodeId>, RPCError<NodeId, Node, RaftError<NodeId>>> {
tracing::debug!(req = debug(&req), "send_vote");
tracing::debug!(req = debug(&req), "vote");
self.c().await?.raft().vote(req).await.map_err(|e| to_error(e, self.target))
}
}
2 changes: 1 addition & 1 deletion openraft/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ where
Closed(#[from] ReplicationClosed),

// TODO(xp): two sub type: StorageError / TransportError
// TODO(xp): a sub error for just send_append_entries()
// TODO(xp): a sub error for just append_entries()
#[error(transparent)]
StorageError(#[from] StorageError<NID>),

Expand Down
100 changes: 19 additions & 81 deletions openraft/src/network/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,6 @@ use crate::Vote;
///
/// A single network instance is used to connect to a single target node. The network instance is
/// constructed by the [`RaftNetworkFactory`](`crate::network::RaftNetworkFactory`).
///
/// ### 2023-05-03: New API with options
///
/// - This trait introduced 3 new API `append_entries`, `install_snapshot` and `vote` which accept
/// an additional argument [`RPCOption`], and deprecated the old API `send_append_entries`,
/// `send_install_snapshot` and `send_vote`.
///
/// - The old API will be **removed** in `0.9`. An application can still implement the old API
/// without any changes. Openraft calls only the new API and the default implementation will
/// delegate to the old API.
///
/// - Implementing the new APIs will disable the old APIs.
#[add_async_trait]
pub trait RaftNetwork<C>: OptionalSend + OptionalSync + 'static
where C: RaftTypeConfig
Expand All @@ -52,37 +40,26 @@ where C: RaftTypeConfig
&mut self,
rpc: AppendEntriesRequest<C>,
option: RPCOption,
) -> Result<AppendEntriesResponse<C::NodeId>, RPCError<C::NodeId, C::Node, RaftError<C::NodeId>>> {
let _ = option;
#[allow(deprecated)]
self.send_append_entries(rpc).await
}
) -> Result<AppendEntriesResponse<C::NodeId>, RPCError<C::NodeId, C::Node, RaftError<C::NodeId>>>;

/// Send an InstallSnapshot RPC to the target.
#[cfg(not(feature = "generic-snapshot-data"))]
#[deprecated(since = "0.9.0", note = "use `snapshot()` instead for sending a complete snapshot")]
async fn install_snapshot(
&mut self,
rpc: InstallSnapshotRequest<C>,
option: RPCOption,
_rpc: InstallSnapshotRequest<C>,
_option: RPCOption,
) -> Result<
InstallSnapshotResponse<C::NodeId>,
RPCError<C::NodeId, C::Node, RaftError<C::NodeId, InstallSnapshotError>>,
> {
let _ = option;
#[allow(deprecated)]
self.send_install_snapshot(rpc).await
}
>;

/// Send a RequestVote RPC to the target.
async fn vote(
&mut self,
rpc: VoteRequest<C::NodeId>,
option: RPCOption,
) -> Result<VoteResponse<C::NodeId>, RPCError<C::NodeId, C::Node, RaftError<C::NodeId>>> {
let _ = option;
#[allow(deprecated)]
self.send_vote(rpc).await
}
) -> Result<VoteResponse<C::NodeId>, RPCError<C::NodeId, C::Node, RaftError<C::NodeId>>>;

/// Send a complete Snapshot to the target.
///
Expand All @@ -99,67 +76,28 @@ where C: RaftTypeConfig
/// with this vote.
///
/// `cancel` get `Ready` when the caller decides to cancel this snapshot transmission.
#[cfg(feature = "generic-snapshot-data")]
async fn snapshot(
&mut self,
vote: Vote<C::NodeId>,
snapshot: Snapshot<C>,
cancel: impl Future<Output = ReplicationClosed> + OptionalSend,
option: RPCOption,
) -> Result<SnapshotResponse<C::NodeId>, StreamingError<C, Fatal<C::NodeId>>> {
#[cfg(not(feature = "generic-snapshot-data"))]
{
use crate::network::stream_snapshot;
use crate::network::stream_snapshot::SnapshotTransport;

let resp = stream_snapshot::Chunked::send_snapshot(self, vote, snapshot, cancel, option).await?;
Ok(resp)
}
#[cfg(feature = "generic-snapshot-data")]
{
let _ = (vote, snapshot, cancel, option);
unimplemented!(
"no default implementation for RaftNetwork::snapshot() if `generic-snapshot-data` feature is enabled"
)
}
}
) -> Result<SnapshotResponse<C::NodeId>, StreamingError<C, Fatal<C::NodeId>>>;

/// Send an AppendEntries RPC to the target Raft node (§5).
#[deprecated(
since = "0.8.4",
note = "use `append_entries` instead. This method will be removed in 0.9"
)]
async fn send_append_entries(
&mut self,
rpc: AppendEntriesRequest<C>,
) -> Result<AppendEntriesResponse<C::NodeId>, RPCError<C::NodeId, C::Node, RaftError<C::NodeId>>> {
let _ = rpc;
unimplemented!("send_append_entries is deprecated")
}

/// Send an InstallSnapshot RPC to the target Raft node (§7).
#[deprecated(
since = "0.8.4",
note = "use `install_snapshot` instead. This method will be removed in 0.9"
)]
async fn send_install_snapshot(
#[cfg(not(feature = "generic-snapshot-data"))]
async fn snapshot(
&mut self,
rpc: InstallSnapshotRequest<C>,
) -> Result<
InstallSnapshotResponse<C::NodeId>,
RPCError<C::NodeId, C::Node, RaftError<C::NodeId, InstallSnapshotError>>,
> {
let _ = rpc;
unimplemented!("send_install_snapshot is deprecated")
}
vote: Vote<C::NodeId>,
snapshot: Snapshot<C>,
cancel: impl Future<Output = ReplicationClosed> + OptionalSend,
option: RPCOption,
) -> Result<SnapshotResponse<C::NodeId>, StreamingError<C, Fatal<C::NodeId>>> {
use crate::network::stream_snapshot;
use crate::network::stream_snapshot::SnapshotTransport;

/// Send a RequestVote RPC to the target Raft node (§5).
#[deprecated(since = "0.8.4", note = "use `vote` instead. This method will be removed in 0.9")]
async fn send_vote(
&mut self,
rpc: VoteRequest<C::NodeId>,
) -> Result<VoteResponse<C::NodeId>, RPCError<C::NodeId, C::Node, RaftError<C::NodeId>>> {
let _ = rpc;
unimplemented!("send_vote is deprecated")
let resp = stream_snapshot::Chunked::send_snapshot(self, vote, snapshot, cancel, option).await?;
Ok(resp)
}

/// Build a backoff instance if the target node is temporarily(or permanently) unreachable.
Expand Down
10 changes: 5 additions & 5 deletions openraft/src/raft/message/append_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ impl<C: RaftTypeConfig> MessageSummary<AppendEntriesRequest<C>> for AppendEntrie

/// The response to an `AppendEntriesRequest`.
///
/// [`RaftNetwork::send_append_entries`] returns this type only when received an RPC reply.
/// [`RaftNetwork::append_entries`] returns this type only when received an RPC reply.
/// Otherwise it should return [`RPCError`].
///
/// [`RPCError`]: crate::error::RPCError
/// [`RaftNetwork::send_append_entries`]: crate::network::RaftNetwork::send_append_entries
/// [`RaftNetwork::append_entries`]: crate::network::RaftNetwork::append_entries
#[derive(Debug)]
#[derive(PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
Expand All @@ -67,20 +67,20 @@ pub enum AppendEntriesResponse<NID: NodeId> {

/// Successfully sent the first portion of log entries.
///
/// [`RaftNetwork::send_append_entries`] can return a partial success.
/// [`RaftNetwork::append_entries`] can return a partial success.
/// For example, it tries to send log entries `[1-2..3-10]`, the application is allowed to send
/// just `[1-2..1-3]` and return `PartialSuccess(1-3)`,
///
/// ### Caution
///
/// The returned matching log id must be **greater than or equal to** the first log
/// id([`AppendEntriesRequest::prev_log_id`]) of the entries to send. If no RPC reply is
/// received, [`RaftNetwork::send_append_entries`] must return an [`RPCError`] to inform
/// received, [`RaftNetwork::append_entries`] must return an [`RPCError`] to inform
/// Openraft that the first log id([`AppendEntriesRequest::prev_log_id`]) may not match on
/// the remote target node.
///
/// [`RPCError`]: crate::error::RPCError
/// [`RaftNetwork::send_append_entries`]: crate::network::RaftNetwork::send_append_entries
/// [`RaftNetwork::append_entries`]: crate::network::RaftNetwork::append_entries
PartialSuccess(Option<LogId<NID>>),

/// The first log id([`AppendEntriesRequest::prev_log_id`]) of the entries to send does not
Expand Down
Loading

0 comments on commit 22cd3bb

Please sign in to comment.