Skip to content

Commit

Permalink
Merge pull request databendlabs#998 from getong/fix-example-code
Browse files Browse the repository at this point in the history
Refactor: delete async-trait, fix typo, use tokio to replace async_std
  • Loading branch information
drmingdrmer authored Jan 29, 2024
2 parents 06b431b + abdc33c commit 3ed4228
Show file tree
Hide file tree
Showing 9 changed files with 29 additions and 24 deletions.
1 change: 0 additions & 1 deletion examples/raft-kv-memstore-singlethreaded/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ repository = "https://github.com/datafuselabs/openraft"
[dependencies]
openraft = { path = "../../openraft", features = ["serde", "storage-v2", "singlethreaded"] }

async-trait = "0.1.36"
clap = { version = "4.1.11", features = ["derive", "env"] }
reqwest = { version = "0.11.9", features = ["json"] }
serde = { version = "1.0.114", features = ["derive"] }
Expand Down
1 change: 0 additions & 1 deletion examples/raft-kv-memstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ path = "src/bin/main.rs"
openraft = { path = "../../openraft", features = ["serde", "storage-v2"] }

actix-web = "4.0.0-rc.2"
async-trait = "0.1.36"
clap = { version = "4.1.11", features = ["derive", "env"] }
reqwest = { version = "0.11.9", features = ["json"] }
serde = { version = "1.0.114", features = ["derive"] }
Expand Down
10 changes: 7 additions & 3 deletions examples/raft-kv-rocksdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ path = "src/bin/main.rs"
[dependencies]
openraft = { path = "../../openraft", features = ["serde", "storage-v2"] }

async-std = { version = "1.12.0", features = ["attributes", "tokio1"] }
async-trait = "0.1.36"
tokio = { version = "1.35.1", features = ["full"] }
byteorder = "1.4.3"
clap = { version = "4.1.11", features = ["derive", "env"] }
reqwest = { version = "0.11.9", features = ["json"] }
Expand All @@ -34,7 +33,12 @@ serde_json = "1.0.57"
tide = { version = "0.16" }
# for toy-rpc, use `serde_json` instead of the default `serde_bincode`:
# bincode which enabled by default by toy-rpc, does not support `#[serde(flatten)]`: https://docs.rs/bincode/2.0.0-alpha.1/bincode/serde/index.html#known-issues
toy-rpc = { version = "0.8.6", default-features = false, features = [ "serde_json", "ws_async_std", "server", "client", "async_std_runtime", ] }
toy-rpc = { version = "0.8.6", features = [
"ws_tokio",
"server",
"client",
"tokio_runtime",
] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.0", features = ["env-filter"] }

Expand Down
4 changes: 2 additions & 2 deletions examples/raft-kv-rocksdb/src/app.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::collections::BTreeMap;
use std::sync::Arc;

use async_std::sync::RwLock;
use openraft::Config;
use tokio::sync::RwLock;

use crate::ExampleRaft;
use crate::NodeId;
Expand All @@ -12,7 +12,7 @@ use crate::NodeId;
pub struct App {
pub id: NodeId,
pub api_addr: String,
pub rcp_addr: String,
pub rpc_addr: String,
pub raft: ExampleRaft,
pub key_values: Arc<RwLock<BTreeMap<String, String>>>,
pub config: Arc<Config>,
Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-rocksdb/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub struct Opt {
pub rpc_addr: String,
}

#[async_std::main]
#[tokio::main]
async fn main() -> std::io::Result<()> {
// Setup the logger
tracing_subscriber::fmt()
Expand Down
12 changes: 6 additions & 6 deletions examples/raft-kv-rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use std::io::Cursor;
use std::path::Path;
use std::sync::Arc;

use async_std::net::TcpListener;
use async_std::task;
use openraft::Config;
use openraft::TokioRuntime;
use tokio::net::TcpListener;
use tokio::task;

use crate::app::App;
use crate::network::api;
Expand Down Expand Up @@ -79,7 +79,7 @@ pub async fn start_example_raft_node<P>(
node_id: NodeId,
dir: P,
http_addr: String,
rcp_addr: String,
rpc_addr: String,
) -> std::io::Result<()>
where
P: AsRef<Path>,
Expand Down Expand Up @@ -107,7 +107,7 @@ where
let app = Arc::new(App {
id: node_id,
api_addr: http_addr.clone(),
rcp_addr: rcp_addr.clone(),
rpc_addr: rpc_addr.clone(),
raft,
key_values: kvs,
config,
Expand All @@ -117,7 +117,7 @@ where

let server = toy_rpc::Server::builder().register(echo_service).build();

let listener = TcpListener::bind(rcp_addr).await.unwrap();
let listener = TcpListener::bind(rpc_addr).await.unwrap();
let handle = task::spawn(async move {
server.accept_websocket(listener).await.unwrap();
});
Expand All @@ -130,6 +130,6 @@ where
api::rest(&mut app);

app.listen(http_addr).await?;
handle.await;
_ = handle.await;
Ok(())
}
2 changes: 1 addition & 1 deletion examples/raft-kv-rocksdb/src/network/management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ async fn init(req: Request<Arc<App>>) -> tide::Result {
let mut nodes = BTreeMap::new();
let node = Node {
api_addr: req.state().api_addr.clone(),
rpc_addr: req.state().rcp_addr.clone(),
rpc_addr: req.state().rpc_addr.clone(),
};

nodes.insert(req.state().id, node);
Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-rocksdb/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::ops::RangeBounds;
use std::path::Path;
use std::sync::Arc;

use async_std::sync::RwLock;
use byteorder::BigEndian;
use byteorder::ReadBytesExt;
use byteorder::WriteBytesExt;
Expand Down Expand Up @@ -35,6 +34,7 @@ use rocksdb::Options;
use rocksdb::DB;
use serde::Deserialize;
use serde::Serialize;
use tokio::sync::RwLock;

use crate::typ;
use crate::Node;
Expand Down
19 changes: 11 additions & 8 deletions examples/raft-kv-rocksdb/tests/cluster/test_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ use std::panic::PanicInfo;
use std::thread;
use std::time::Duration;

use async_std::task::block_on;
use maplit::btreemap;
use maplit::btreeset;
use raft_kv_rocksdb::client::ExampleClient;
use raft_kv_rocksdb::start_example_raft_node;
use raft_kv_rocksdb::store::Request;
use raft_kv_rocksdb::Node;
use tokio::runtime::Handle;
use tracing_subscriber::EnvFilter;

pub fn log_panic(panic: &PanicInfo) {
Expand All @@ -36,7 +36,7 @@ pub fn log_panic(panic: &PanicInfo) {

/// Setup a cluster of 3 nodes.
/// Write to it and read from it.
#[async_std::test(flavor = "multi_thread", worker_threads = 8)]
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn test_cluster() -> Result<(), Box<dyn std::error::Error>> {
// --- The client itself does not store addresses for all nodes, but just node id.
// Thus we need a supporting component to provide mapping from node id to node address.
Expand Down Expand Up @@ -77,23 +77,26 @@ async fn test_cluster() -> Result<(), Box<dyn std::error::Error>> {
let d2 = tempfile::TempDir::new()?;
let d3 = tempfile::TempDir::new()?;

let handle = Handle::current();
let handle_clone = handle.clone();
let _h1 = thread::spawn(move || {
let x = block_on(start_example_raft_node(1, d1.path(), get_addr(1), get_rpc_addr(1)));
let x = handle_clone.block_on(start_example_raft_node(1, d1.path(), get_addr(1), get_rpc_addr(1)));
println!("x: {:?}", x);
});

let handle_clone = handle.clone();
let _h2 = thread::spawn(move || {
let x = block_on(start_example_raft_node(2, d2.path(), get_addr(2), get_rpc_addr(2)));
let x = handle_clone.block_on(start_example_raft_node(2, d2.path(), get_addr(2), get_rpc_addr(2)));
println!("x: {:?}", x);
});

let _h3 = thread::spawn(move || {
let x = block_on(start_example_raft_node(3, d3.path(), get_addr(3), get_rpc_addr(3)));
let x = handle.block_on(start_example_raft_node(3, d3.path(), get_addr(3), get_rpc_addr(3)));
println!("x: {:?}", x);
});

// Wait for server to start up.
async_std::task::sleep(Duration::from_millis(1_000)).await;
tokio::time::sleep(Duration::from_millis(1_000)).await;

// --- Create a client to the first node, as a control handle to the cluster.

Expand Down Expand Up @@ -172,7 +175,7 @@ async fn test_cluster() -> Result<(), Box<dyn std::error::Error>> {

// --- Wait for a while to let the replication get done.

async_std::task::sleep(Duration::from_millis(500)).await;
tokio::time::sleep(Duration::from_millis(500)).await;

// --- Read it on every node.

Expand Down Expand Up @@ -200,7 +203,7 @@ async fn test_cluster() -> Result<(), Box<dyn std::error::Error>> {
})
.await?;

async_std::task::sleep(Duration::from_millis(500)).await;
tokio::time::sleep(Duration::from_millis(500)).await;

// --- Read it on every node.

Expand Down

0 comments on commit 3ed4228

Please sign in to comment.