From e094b22a7c1d408d59d69c61af3aecae86128957 Mon Sep 17 00:00:00 2001 From: Shachar Date: Fri, 31 May 2024 13:38:09 +0300 Subject: [PATCH 1/9] Send retries in multi-node reconnect to new connection. This fixes a situtation where a multi-node request is sent after a disconnect, and is repeatedly being sent to the old, disconnected connection. This is caused by multi-node requests being routed with a specific connection, instead of looking for a new connection after a disconnect. https://github.com/amazon-contributing/redis-rs/issues/144 --- redis/src/cluster_async/mod.rs | 39 +++++---- redis/tests/support/cluster.rs | 62 +++++++++----- redis/tests/test_cluster_async.rs | 130 +++++++++++++++++++++++++++--- 3 files changed, 183 insertions(+), 48 deletions(-) diff --git a/redis/src/cluster_async/mod.rs b/redis/src/cluster_async/mod.rs index c2640fa73..f679a27b4 100644 --- a/redis/src/cluster_async/mod.rs +++ b/redis/src/cluster_async/mod.rs @@ -606,26 +606,31 @@ impl RequestInfo { } } - fn reset_redirect(&mut self) { + fn reset_routing(&mut self) { + let fix_route = |route: &mut InternalSingleNodeRouting| { + match route { + InternalSingleNodeRouting::Redirect { + previous_routing, .. + } => { + let previous_routing = std::mem::take(previous_routing.as_mut()); + *route = previous_routing; + } + // If a specific connection is specified, then reconnecting without resetting the routing + // will mean that the request is still routed to the old connection. + InternalSingleNodeRouting::Connection { address, .. } => { + *route = InternalSingleNodeRouting::ByAddress(address.to_string()); + } + _ => {} + } + }; match &mut self.cmd { CmdArg::Cmd { routing, .. } => { - if let InternalRoutingInfo::SingleNode(InternalSingleNodeRouting::Redirect { - previous_routing, - .. - }) = routing - { - let previous_routing = std::mem::take(previous_routing.as_mut()); - *routing = previous_routing.into(); + if let InternalRoutingInfo::SingleNode(route) = routing { + fix_route(route); } } CmdArg::Pipeline { route, .. } => { - if let InternalSingleNodeRouting::Redirect { - previous_routing, .. - } = route - { - let previous_routing = std::mem::take(previous_routing.as_mut()); - *route = previous_routing; - } + fix_route(route); } // cluster_scan is sent as a normal command internally so we will not reach that point. CmdArg::ClusterScan { .. } => { @@ -742,7 +747,7 @@ impl Future for Request { OperationTarget::NotFound => { // TODO - this is essentially a repeat of the retirable error. probably can remove duplication. let mut request = this.request.take().unwrap(); - request.info.reset_redirect(); + request.info.reset_routing(); return Next::RefreshSlots { request, sleep_duration: Some(sleep_duration), @@ -784,7 +789,7 @@ impl Future for Request { crate::types::RetryMethod::Reconnect => { let mut request = this.request.take().unwrap(); // TODO should we reset the redirect here? - request.info.reset_redirect(); + request.info.reset_routing(); warn!("disconnected from {:?}", address); Next::Reconnect { request, diff --git a/redis/tests/support/cluster.rs b/redis/tests/support/cluster.rs index 3a11d95a4..29c97e54c 100644 --- a/redis/tests/support/cluster.rs +++ b/redis/tests/support/cluster.rs @@ -24,6 +24,7 @@ use tempfile::TempDir; use crate::support::{build_keys_and_certs_for_tls, Module}; +use super::get_random_available_port; #[cfg(feature = "tls-rustls")] use super::{build_single_client, load_certs_from_file}; @@ -80,6 +81,36 @@ fn port_in_use(addr: &str) -> bool { socket.connect(&socket_addr.into()).is_ok() } +pub struct RedisClusterConfiguration { + pub nodes: u16, + pub replicas: u16, + pub modules: Vec, + pub mtls_enabled: bool, + pub ports: Vec, +} + +impl RedisClusterConfiguration { + pub fn single_replica_config() -> Self { + Self { + nodes: 6, + replicas: 1, + ..Default::default() + } + } +} + +impl Default for RedisClusterConfiguration { + fn default() -> Self { + Self { + nodes: 3, + replicas: 0, + modules: vec![], + mtls_enabled: false, + ports: vec![], + } + } +} + pub struct RedisCluster { pub servers: Vec, pub folders: Vec, @@ -98,26 +129,21 @@ impl RedisCluster { pub fn client_name() -> &'static str { "test_cluster_client" } + pub fn new(configuration: RedisClusterConfiguration) -> RedisCluster { + let RedisClusterConfiguration { + nodes, + replicas, + modules, + mtls_enabled, + mut ports, + } = configuration; - pub fn new(nodes: u16, replicas: u16) -> RedisCluster { - RedisCluster::with_modules(nodes, replicas, &[], false) - } - - #[cfg(feature = "tls-rustls")] - pub fn new_with_mtls(nodes: u16, replicas: u16) -> RedisCluster { - RedisCluster::with_modules(nodes, replicas, &[], true) - } - - pub fn with_modules( - nodes: u16, - replicas: u16, - modules: &[Module], - mtls_enabled: bool, - ) -> RedisCluster { + if ports.is_empty() { + ports = (0..nodes).map(|_| get_random_available_port()).collect(); + } let mut servers = vec![]; let mut folders = vec![]; let mut addrs = vec![]; - let start_port = 7000; let mut tls_paths = None; let mut is_tls = false; @@ -136,9 +162,7 @@ impl RedisCluster { let max_attempts = 5; - for node in 0..nodes { - let port = start_port + node; - + for port in ports { servers.push(RedisServer::new_with_addr_tls_modules_and_spawner( ClusterType::build_addr(port), None, diff --git a/redis/tests/test_cluster_async.rs b/redis/tests/test_cluster_async.rs index afb248890..ee3d4c828 100644 --- a/redis/tests/test_cluster_async.rs +++ b/redis/tests/test_cluster_async.rs @@ -3324,6 +3324,118 @@ mod cluster_async { assert!(res.is_ok()); } + #[test] + fn test_async_cluster_handle_complete_server_disconnect_without_panicking() { + let cluster = + TestClusterContext::new_with_cluster_client_builder(|builder| builder.retries(2)); + block_on_all(async move { + let mut connection = cluster.async_connection().await; + drop(cluster); + for _ in 0..5 { + let cmd = cmd("PING"); + let result = connection + .route_command(&cmd, RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) + .await; + // TODO - this should be a NoConnectionError, but ATM we get the errors from the failing + assert!(result.is_err()); + // This will route to all nodes - different path through the code. + let result = connection.req_packed_command(&cmd).await; + // TODO - this should be a NoConnectionError, but ATM we get the errors from the failing + assert!(result.is_err()); + } + Ok::<_, RedisError>(()) + }) + .unwrap(); + } + + #[test] + fn test_async_cluster_reconnect_after_complete_server_disconnect() { + let cluster = + TestClusterContext::new_with_cluster_client_builder(|builder| builder.retries(2)); + + block_on_all(async move { + let ports: Vec<_> = cluster + .nodes + .iter() + .map(|info| match info.addr { + redis::ConnectionAddr::Tcp(_, port) => port, + redis::ConnectionAddr::TcpTls { port, .. } => port, + redis::ConnectionAddr::Unix(_) => panic!("no unix sockets in cluster tests"), + }) + .collect(); + + let mut connection = cluster.async_connection().await; + drop(cluster); + + let cmd = cmd("PING"); + + let result = connection + .route_command(&cmd, RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) + .await; + // TODO - this should be a NoConnectionError, but ATM we get the errors from the failing + assert!(result.is_err()); + + // This will route to all nodes - different path through the code. + let result = connection.req_packed_command(&cmd).await; + // TODO - this should be a NoConnectionError, but ATM we get the errors from the failing + assert!(result.is_err()); + + let _cluster = RedisCluster::new(RedisClusterConfiguration { + ports: ports.clone(), + ..Default::default() + }); + + let result = connection.req_packed_command(&cmd).await.unwrap(); + assert_eq!(result, Value::SimpleString("PONG".to_string())); + + Ok::<_, RedisError>(()) + }) + .unwrap(); + } + + #[test] + fn test_async_cluster_reconnect_after_complete_server_disconnect_route_to_many() { + let cluster = + TestClusterContext::new_with_cluster_client_builder(|builder| builder.retries(3)); + + block_on_all(async move { + let ports: Vec<_> = cluster + .nodes + .iter() + .map(|info| match info.addr { + redis::ConnectionAddr::Tcp(_, port) => port, + redis::ConnectionAddr::TcpTls { port, .. } => port, + redis::ConnectionAddr::Unix(_) => panic!("no unix sockets in cluster tests"), + }) + .collect(); + + let mut connection = cluster.async_connection().await; + drop(cluster); + + // recreate cluster + let _cluster = RedisCluster::new(RedisClusterConfiguration { + ports: ports.clone(), + ..Default::default() + }); + + let cmd = cmd("PING"); + // explicitly route to all primaries and request all succeeded + let result = connection + .route_command( + &cmd, + RoutingInfo::MultiNode(( + MultipleNodeRoutingInfo::AllMasters, + Some(redis::cluster_routing::ResponsePolicy::AllSucceeded), + )), + ) + .await; + assert!(result.is_ok()); + + Ok::<_, RedisError>(()) + }) + .unwrap(); + } + #[test] fn test_async_cluster_saves_reconnected_connection() { let name = "test_async_cluster_saves_reconnected_connection"; @@ -3401,12 +3513,9 @@ mod cluster_async { #[test] fn test_async_cluster_periodic_checks_use_management_connection() { - let cluster = TestClusterContext::new_with_cluster_client_builder( - 3, - 0, - |builder| builder.periodic_topology_checks(Duration::from_millis(10)), - false, - ); + let cluster = TestClusterContext::new_with_cluster_client_builder(|builder| { + builder.periodic_topology_checks(Duration::from_millis(10)) + }); block_on_all(async move { let mut connection = cluster.async_connection(None).await; @@ -3498,12 +3607,9 @@ mod cluster_async { // This test will check two aspects: // 1. Ensuring that after a disconnection in the management connection, a new management connection is established. // 2. Confirming that a failure in the management connection does not impact the user connection, which should remain intact. - let cluster = TestClusterContext::new_with_cluster_client_builder( - 3, - 0, - |builder| builder.periodic_topology_checks(Duration::from_millis(10)), - false, - ); + let cluster = TestClusterContext::new_with_cluster_client_builder(|builder| { + builder.periodic_topology_checks(Duration::from_millis(10)) + }); block_on_all(async move { let mut connection = cluster.async_connection(None).await; let _client_list = "".to_string(); From 950798a1f77f4823baf77e5c08486bad2fe4d6ca Mon Sep 17 00:00:00 2001 From: Shachar Langbeheim Date: Wed, 3 Apr 2024 23:35:46 +0300 Subject: [PATCH 2/9] Add configuration for cluster tests. This makes it easy to add new variables to the tests without requiring large modifications. --- redis/benches/bench_cluster.rs | 9 +-- redis/benches/bench_cluster_async.rs | 3 +- redis/tests/support/cluster.rs | 37 ++++++--- redis/tests/test_cluster.rs | 69 +++++++---------- redis/tests/test_cluster_async.rs | 108 +++++++++++++-------------- 5 files changed, 109 insertions(+), 117 deletions(-) diff --git a/redis/benches/bench_cluster.rs b/redis/benches/bench_cluster.rs index da854474a..b0ea3c773 100644 --- a/redis/benches/bench_cluster.rs +++ b/redis/benches/bench_cluster.rs @@ -77,7 +77,8 @@ fn bench_pipeline(c: &mut Criterion, con: &mut redis::cluster::ClusterConnection } fn bench_cluster_setup(c: &mut Criterion) { - let cluster = TestClusterContext::new(6, 1); + let cluster = + TestClusterContext::new_with_config(RedisClusterConfiguration::single_replica_config()); cluster.wait_for_cluster_up(); let mut con = cluster.connection(); @@ -87,11 +88,9 @@ fn bench_cluster_setup(c: &mut Criterion) { #[allow(dead_code)] fn bench_cluster_read_from_replicas_setup(c: &mut Criterion) { - let cluster = TestClusterContext::new_with_cluster_client_builder( - 6, - 1, + let cluster = TestClusterContext::new_with_config_and_builder( + RedisClusterConfiguration::single_replica_config(), |builder| builder.read_from_replicas(), - false, ); cluster.wait_for_cluster_up(); diff --git a/redis/benches/bench_cluster_async.rs b/redis/benches/bench_cluster_async.rs index 347908f9e..215920ddd 100644 --- a/redis/benches/bench_cluster_async.rs +++ b/redis/benches/bench_cluster_async.rs @@ -76,7 +76,8 @@ fn bench_cluster_async( } fn bench_cluster_setup(c: &mut Criterion) { - let cluster = TestClusterContext::new(6, 1); + let cluster = + TestClusterContext::new_with_config(RedisClusterConfiguration::single_replica_config()); cluster.wait_for_cluster_up(); let runtime = current_thread_runtime(); let mut con = runtime.block_on(cluster.async_connection(None)); diff --git a/redis/tests/support/cluster.rs b/redis/tests/support/cluster.rs index 29c97e54c..22ff91bea 100644 --- a/redis/tests/support/cluster.rs +++ b/redis/tests/support/cluster.rs @@ -168,7 +168,7 @@ impl RedisCluster { None, tls_paths.clone(), mtls_enabled, - modules, + &modules, |cmd| { let tempdir = tempfile::Builder::new() .prefix("redis") @@ -381,25 +381,40 @@ pub struct TestClusterContext { } impl TestClusterContext { - pub fn new(nodes: u16, replicas: u16) -> TestClusterContext { - Self::new_with_cluster_client_builder(nodes, replicas, identity, false) + pub fn new() -> TestClusterContext { + Self::new_with_config(RedisClusterConfiguration::default()) } - #[cfg(feature = "tls-rustls")] - pub fn new_with_mtls(nodes: u16, replicas: u16) -> TestClusterContext { - Self::new_with_cluster_client_builder(nodes, replicas, identity, true) + pub fn new_with_mtls() -> TestClusterContext { + Self::new_with_config_and_builder( + RedisClusterConfiguration { + mtls_enabled: true, + ..Default::default() + }, + identity, + ) + } + + pub fn new_with_config(cluster_config: RedisClusterConfiguration) -> TestClusterContext { + Self::new_with_config_and_builder(cluster_config, identity) + } + + pub fn new_with_cluster_client_builder(initializer: F) -> TestClusterContext + where + F: FnOnce(redis::cluster::ClusterClientBuilder) -> redis::cluster::ClusterClientBuilder, + { + Self::new_with_config_and_builder(RedisClusterConfiguration::default(), initializer) } - pub fn new_with_cluster_client_builder( - nodes: u16, - replicas: u16, + pub fn new_with_config_and_builder( + cluster_config: RedisClusterConfiguration, initializer: F, - mtls_enabled: bool, ) -> TestClusterContext where F: FnOnce(redis::cluster::ClusterClientBuilder) -> redis::cluster::ClusterClientBuilder, { - let cluster = RedisCluster::new(nodes, replicas); + let mtls_enabled = cluster_config.mtls_enabled; + let cluster = RedisCluster::new(cluster_config); let initial_nodes: Vec = cluster .iter_servers() .map(RedisServer::connection_info) diff --git a/redis/tests/test_cluster.rs b/redis/tests/test_cluster.rs index cbeddd2fe..cdcec61dd 100644 --- a/redis/tests/test_cluster.rs +++ b/redis/tests/test_cluster.rs @@ -17,7 +17,7 @@ mod cluster { #[test] fn test_cluster_basics() { - let cluster = TestClusterContext::new(3, 0); + let cluster = TestClusterContext::new(); let mut con = cluster.connection(); redis::cmd("SET") @@ -36,16 +36,11 @@ mod cluster { #[test] fn test_cluster_with_username_and_password() { - let cluster = TestClusterContext::new_with_cluster_client_builder( - 3, - 0, - |builder| { - builder - .username(RedisCluster::username().to_string()) - .password(RedisCluster::password().to_string()) - }, - false, - ); + let cluster = TestClusterContext::new_with_cluster_client_builder(|builder| { + builder + .username(RedisCluster::username().to_string()) + .password(RedisCluster::password().to_string()) + }); cluster.disable_default_user(); let mut con = cluster.connection(); @@ -66,26 +61,19 @@ mod cluster { #[test] fn test_cluster_with_bad_password() { - let cluster = TestClusterContext::new_with_cluster_client_builder( - 3, - 0, - |builder| { - builder - .username(RedisCluster::username().to_string()) - .password("not the right password".to_string()) - }, - false, - ); + let cluster = TestClusterContext::new_with_cluster_client_builder(|builder| { + builder + .username(RedisCluster::username().to_string()) + .password("not the right password".to_string()) + }); assert!(cluster.client.get_connection(None).is_err()); } #[test] fn test_cluster_read_from_replicas() { - let cluster = TestClusterContext::new_with_cluster_client_builder( - 6, - 1, + let cluster = TestClusterContext::new_with_config_and_builder( + RedisClusterConfiguration::single_replica_config(), |builder| builder.read_from_replicas(), - false, ); let mut con = cluster.connection(); @@ -107,7 +95,7 @@ mod cluster { #[test] fn test_cluster_eval() { - let cluster = TestClusterContext::new(3, 0); + let cluster = TestClusterContext::new(); let mut con = cluster.connection(); let rv = redis::cmd("EVAL") @@ -131,7 +119,7 @@ mod cluster { if use_protocol() == ProtocolVersion::RESP2 { return; } - let cluster = TestClusterContext::new(3, 0); + let cluster = TestClusterContext::new(); let mut connection = cluster.connection(); @@ -160,7 +148,7 @@ mod cluster { #[test] fn test_cluster_multi_shard_commands() { - let cluster = TestClusterContext::new(3, 0); + let cluster = TestClusterContext::new(); let mut connection = cluster.connection(); @@ -175,7 +163,7 @@ mod cluster { #[test] #[cfg(feature = "script")] fn test_cluster_script() { - let cluster = TestClusterContext::new(3, 0); + let cluster = TestClusterContext::new(); let mut con = cluster.connection(); let script = redis::Script::new( @@ -192,7 +180,7 @@ mod cluster { #[test] fn test_cluster_pipeline() { - let cluster = TestClusterContext::new(3, 0); + let cluster = TestClusterContext::new(); cluster.wait_for_cluster_up(); let mut con = cluster.connection(); @@ -209,7 +197,7 @@ mod cluster { #[test] fn test_cluster_pipeline_multiple_keys() { use redis::FromRedisValue; - let cluster = TestClusterContext::new(3, 0); + let cluster = TestClusterContext::new(); cluster.wait_for_cluster_up(); let mut con = cluster.connection(); @@ -245,7 +233,7 @@ mod cluster { #[test] fn test_cluster_pipeline_invalid_command() { - let cluster = TestClusterContext::new(3, 0); + let cluster = TestClusterContext::new(); cluster.wait_for_cluster_up(); let mut con = cluster.connection(); @@ -359,7 +347,7 @@ mod cluster { #[test] fn test_cluster_pipeline_command_ordering() { - let cluster = TestClusterContext::new(3, 0); + let cluster = TestClusterContext::new(); cluster.wait_for_cluster_up(); let mut con = cluster.connection(); let mut pipe = cluster_pipe(); @@ -385,7 +373,7 @@ mod cluster { #[test] #[ignore] // Flaky fn test_cluster_pipeline_ordering_with_improper_command() { - let cluster = TestClusterContext::new(3, 0); + let cluster = TestClusterContext::new(); cluster.wait_for_cluster_up(); let mut con = cluster.connection(); let mut pipe = cluster_pipe(); @@ -975,12 +963,9 @@ mod cluster { #[test] fn test_cluster_with_client_name() { - let cluster = TestClusterContext::new_with_cluster_client_builder( - 3, - 0, - |builder| builder.client_name(RedisCluster::client_name().to_string()), - false, - ); + let cluster = TestClusterContext::new_with_cluster_client_builder(|builder| { + builder.client_name(RedisCluster::client_name().to_string()) + }); let mut con = cluster.connection(); let client_info: String = redis::cmd("CLIENT").arg("INFO").query(&mut con).unwrap(); @@ -1047,7 +1032,7 @@ mod cluster { #[test] fn test_cluster_basics_with_mtls() { - let cluster = TestClusterContext::new_with_mtls(3, 0); + let cluster = TestClusterContext::new_with_mtls(); let client = create_cluster_client_from_cluster(&cluster, true).unwrap(); let mut con = client.get_connection(None).unwrap(); @@ -1068,7 +1053,7 @@ mod cluster { #[test] fn test_cluster_should_not_connect_without_mtls() { - let cluster = TestClusterContext::new_with_mtls(3, 0); + let cluster = TestClusterContext::new_with_mtls(); let client = create_cluster_client_from_cluster(&cluster, false).unwrap(); let connection = client.get_connection(None); diff --git a/redis/tests/test_cluster_async.rs b/redis/tests/test_cluster_async.rs index ee3d4c828..9786cbb05 100644 --- a/redis/tests/test_cluster_async.rs +++ b/redis/tests/test_cluster_async.rs @@ -39,7 +39,7 @@ mod cluster_async { #[test] fn test_async_cluster_basic_cmd() { - let cluster = TestClusterContext::new(3, 0); + let cluster = TestClusterContext::new(); block_on_all(async move { let mut connection = cluster.async_connection(None).await; @@ -61,7 +61,7 @@ mod cluster_async { #[test] fn test_async_cluster_basic_eval() { - let cluster = TestClusterContext::new(3, 0); + let cluster = TestClusterContext::new(); block_on_all(async move { let mut connection = cluster.async_connection(None).await; @@ -80,7 +80,7 @@ mod cluster_async { #[test] fn test_async_cluster_basic_script() { - let cluster = TestClusterContext::new(3, 0); + let cluster = TestClusterContext::new(); block_on_all(async move { let mut connection = cluster.async_connection(None).await; @@ -99,7 +99,7 @@ mod cluster_async { #[test] fn test_async_cluster_route_flush_to_specific_node() { - let cluster = TestClusterContext::new(3, 0); + let cluster = TestClusterContext::new(); block_on_all(async move { let mut connection = cluster.async_connection(None).await; @@ -134,7 +134,7 @@ mod cluster_async { #[test] fn test_async_cluster_route_flush_to_node_by_address() { - let cluster = TestClusterContext::new(3, 0); + let cluster = TestClusterContext::new(); block_on_all(async move { let mut connection = cluster.async_connection(None).await; @@ -174,7 +174,11 @@ mod cluster_async { #[test] fn test_async_cluster_route_info_to_nodes() { - let cluster = TestClusterContext::new(12, 1); + let cluster = TestClusterContext::new_with_config(RedisClusterConfiguration { + nodes: 12, + replicas: 1, + ..Default::default() + }); let split_to_addresses_and_info = |res| -> (Vec, Vec) { if let Value::Map(values) = res { @@ -257,7 +261,7 @@ mod cluster_async { return; } block_on_all(async move { - let cluster = TestClusterContext::new(3, 0); + let cluster = TestClusterContext::new(); let mut connection = cluster.async_connection(None).await; @@ -292,7 +296,7 @@ mod cluster_async { #[test] fn test_async_cluster_basic_pipe() { - let cluster = TestClusterContext::new(3, 0); + let cluster = TestClusterContext::new(); block_on_all(async move { let mut connection = cluster.async_connection(None).await; @@ -311,7 +315,7 @@ mod cluster_async { #[test] fn test_async_cluster_multi_shard_commands() { - let cluster = TestClusterContext::new(3, 0); + let cluster = TestClusterContext::new(); block_on_all(async move { let mut connection = cluster.async_connection(None).await; @@ -330,7 +334,15 @@ mod cluster_async { #[test] fn test_async_cluster_basic_failover() { block_on_all(async move { - test_failover(&TestClusterContext::new(6, 1), 10, 123, false).await; + test_failover( + &TestClusterContext::new_with_config( + RedisClusterConfiguration::single_replica_config(), + ), + 10, + 123, + false, + ) + .await; Ok::<_, RedisError>(()) }) .unwrap() @@ -518,7 +530,7 @@ mod cluster_async { #[test] fn test_async_cluster_error_in_inner_connection() { - let cluster = TestClusterContext::new(3, 0); + let cluster = TestClusterContext::new(); block_on_all(async move { let mut con = cluster.async_generic_connection::().await; @@ -543,7 +555,7 @@ mod cluster_async { #[test] #[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))] fn test_async_cluster_async_std_basic_cmd() { - let cluster = TestClusterContext::new(3, 0); + let cluster = TestClusterContext::new(); block_on_all_using_async_std(async { let mut connection = cluster.async_connection(None).await; @@ -2026,16 +2038,11 @@ mod cluster_async { #[test] fn test_async_cluster_with_username_and_password() { - let cluster = TestClusterContext::new_with_cluster_client_builder( - 3, - 0, - |builder| { - builder - .username(RedisCluster::username().to_string()) - .password(RedisCluster::password().to_string()) - }, - false, - ); + let cluster = TestClusterContext::new_with_cluster_client_builder(|builder| { + builder + .username(RedisCluster::username().to_string()) + .password(RedisCluster::password().to_string()) + }); cluster.disable_default_user(); block_on_all(async move { @@ -2275,12 +2282,8 @@ mod cluster_async { #[test] fn test_async_cluster_handle_complete_server_disconnect_without_panicking() { - let cluster = TestClusterContext::new_with_cluster_client_builder( - 3, - 0, - |builder| builder.retries(2), - false, - ); + let cluster = + TestClusterContext::new_with_cluster_client_builder(|builder| builder.retries(2)); block_on_all(async move { let mut connection = cluster.async_connection(None).await; drop(cluster); @@ -2303,12 +2306,8 @@ mod cluster_async { #[test] fn test_async_cluster_reconnect_after_complete_server_disconnect() { - let cluster = TestClusterContext::new_with_cluster_client_builder( - 3, - 0, - |builder| builder.retries(2), - false, - ); + let cluster = + TestClusterContext::new_with_cluster_client_builder(|builder| builder.retries(2)); block_on_all(async move { let mut connection = cluster.async_connection(None).await; @@ -2327,12 +2326,9 @@ mod cluster_async { // TODO - this should be a NoConnectionError, but ATM we get the errors from the failing assert!(result.is_err()); - let _cluster = TestClusterContext::new_with_cluster_client_builder( - 3, - 0, - |builder| builder.retries(2), - false, - ); + let _cluster = TestClusterContext::new_with_cluster_client_builder(|builder| { + builder.retries(2) + }); let result = connection.req_packed_command(&cmd).await.unwrap(); assert_eq!(result, Value::SimpleString("PONG".to_string())); @@ -3034,11 +3030,13 @@ mod cluster_async { // Since we are not executing key-based commands, we won't encounter MOVED errors that trigger a slot refresh. // Consequently, we anticipate that only the periodic topology check will detect this change and trigger topology refresh. // If successful, the node to which we route the CLUSTER NODES command should be the newly promoted node with a different node ID. - let cluster = TestClusterContext::new_with_cluster_client_builder( - 6, - 1, + let cluster = TestClusterContext::new_with_config_and_builder( + RedisClusterConfiguration { + nodes: 6, + replicas: 1, + ..Default::default() + }, |builder| builder.periodic_topology_checks(Duration::from_millis(10)), - false, ); block_on_all(async move { @@ -3096,12 +3094,9 @@ mod cluster_async { // This test aims to verify that the management connections used for periodic checks are reconnected, in case that they get killed. // In order to test this, we choose a single node, kill all connections to it which aren't user connections, and then wait until new // connections are created. - let cluster = TestClusterContext::new_with_cluster_client_builder( - 3, - 0, - |builder| builder.periodic_topology_checks(Duration::from_millis(10)), - false, - ); + let cluster = TestClusterContext::new_with_cluster_client_builder(|builder| { + builder.periodic_topology_checks(Duration::from_millis(10)) + }); block_on_all(async move { let routing = RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route::new( @@ -3141,12 +3136,9 @@ mod cluster_async { #[test] fn test_async_cluster_with_client_name() { - let cluster = TestClusterContext::new_with_cluster_client_builder( - 3, - 0, - |builder| builder.client_name(RedisCluster::client_name().to_string()), - false, - ); + let cluster = TestClusterContext::new_with_cluster_client_builder(|builder| { + builder.client_name(RedisCluster::client_name().to_string()) + }); block_on_all(async move { let mut connection = cluster.async_connection(None).await; @@ -3677,7 +3669,7 @@ mod cluster_async { #[test] fn test_async_cluster_basic_cmd_with_mtls() { - let cluster = TestClusterContext::new_with_mtls(3, 0); + let cluster = TestClusterContext::new_with_mtls(); block_on_all(async move { let client = create_cluster_client_from_cluster(&cluster, true).unwrap(); let mut connection = client.get_async_connection(None).await.unwrap(); @@ -3699,7 +3691,7 @@ mod cluster_async { #[test] fn test_async_cluster_should_not_connect_without_mtls_enabled() { - let cluster = TestClusterContext::new_with_mtls(3, 0); + let cluster = TestClusterContext::new_with_mtls(); block_on_all(async move { let client = create_cluster_client_from_cluster(&cluster, false).unwrap(); let connection = client.get_async_connection(None).await; From b45ca56faac9d953cd0014d826fc8a7f77d2aee5 Mon Sep 17 00:00:00 2001 From: Shachar Langbeheim Date: Tue, 14 May 2024 10:40:38 +0300 Subject: [PATCH 3/9] Handle errors even when out of retries. Async cluster connections now can handle request errors even when the request shouldn't retry. Before this change, topology refreshes and reconnects only happened on retries. This change ensures that they will happen regardless of retries. --- redis/src/cluster_async/mod.rs | 90 ++++++---- redis/tests/test_cluster_async.rs | 271 +++++++++++++++++++++--------- 2 files changed, 248 insertions(+), 113 deletions(-) diff --git a/redis/src/cluster_async/mod.rs b/redis/src/cluster_async/mod.rs index f679a27b4..ede357411 100644 --- a/redis/src/cluster_async/mod.rs +++ b/redis/src/cluster_async/mod.rs @@ -680,15 +680,18 @@ enum Next { address: ArcStr, }, Reconnect { - request: PendingRequest, + // if not set, then a reconnect should happen without sending a request afterwards + request: Option>, target: ArcStr, }, RefreshSlots { - request: PendingRequest, + // if not set, then a slot refresh should happen without sending a request afterwards + request: Option>, sleep_duration: Option, }, ReconnectToInitialNodes { - request: PendingRequest, + // if not set, then a reconnect should happen without sending a request afterwards + request: Option>, }, Done, } @@ -719,16 +722,39 @@ impl Future for Request { } Err((target, err)) => { let request = this.request.as_mut().unwrap(); - + // TODO - would be nice if we didn't need to repeat this code twice, with & without retries. if request.retry >= this.retry_params.number_of_retries { + let next = if err.kind() == ErrorKind::ClusterConnectionNotFound { + Next::ReconnectToInitialNodes { request: None }.into() + } else if matches!(err.retry_method(), crate::types::RetryMethod::MovedRedirect) + || matches!(target, OperationTarget::NotFound) + { + Next::RefreshSlots { + request: None, + sleep_duration: None, + } + .into() + } else if matches!(err.retry_method(), crate::types::RetryMethod::Reconnect) { + if let OperationTarget::Node { address } = target { + Next::Reconnect { + request: None, + target: address, + } + .into() + } else { + Next::Done.into() + } + } else { + Next::Done.into() + }; self.respond(Err(err)); - return Next::Done.into(); + return next; } request.retry = request.retry.saturating_add(1); if err.kind() == ErrorKind::ClusterConnectionNotFound { return Next::ReconnectToInitialNodes { - request: this.request.take().unwrap(), + request: Some(this.request.take().unwrap()), } .into(); } @@ -749,7 +775,7 @@ impl Future for Request { let mut request = this.request.take().unwrap(); request.info.reset_routing(); return Next::RefreshSlots { - request, + request: Some(request), sleep_duration: Some(sleep_duration), } .into(); @@ -773,7 +799,7 @@ impl Future for Request { .map(|(node, _slot)| Redirect::Moved(node.to_string())), ); Next::RefreshSlots { - request, + request: Some(request), sleep_duration: None, } .into() @@ -792,7 +818,7 @@ impl Future for Request { request.info.reset_routing(); warn!("disconnected from {:?}", address); Next::Reconnect { - request, + request: Some(request), target: address, } .into() @@ -1848,36 +1874,42 @@ where } => { poll_flush_action = poll_flush_action.change_state(PollFlushAction::RebuildSlots); - let future: RequestState< - Pin + Send>>, - > = match sleep_duration { - Some(sleep_duration) => RequestState::Sleep { - sleep: boxed_sleep(sleep_duration), - }, - None => RequestState::Future { - future: Box::pin(Self::try_request( - request.info.clone(), - self.inner.clone(), - )), - }, - }; - self.in_flight_requests.push(Box::pin(Request { - retry_params: self.inner.cluster_params.retry_params.clone(), - request: Some(request), - future, - })); + if let Some(request) = request { + let future: RequestState< + Pin + Send>>, + > = match sleep_duration { + Some(sleep_duration) => RequestState::Sleep { + sleep: boxed_sleep(sleep_duration), + }, + None => RequestState::Future { + future: Box::pin(Self::try_request( + request.info.clone(), + self.inner.clone(), + )), + }, + }; + self.in_flight_requests.push(Box::pin(Request { + retry_params: self.inner.cluster_params.retry_params.clone(), + request: Some(request), + future, + })); + } } Next::Reconnect { request, target, .. } => { poll_flush_action = poll_flush_action.change_state(PollFlushAction::Reconnect(vec![target])); - self.inner.pending_requests.lock().unwrap().push(request); + if let Some(request) = request { + self.inner.pending_requests.lock().unwrap().push(request); + } } Next::ReconnectToInitialNodes { request } => { poll_flush_action = poll_flush_action .change_state(PollFlushAction::ReconnectFromInitialConnections); - self.inner.pending_requests.lock().unwrap().push(request); + if let Some(request) = request { + self.inner.pending_requests.lock().unwrap().push(request); + } } } } diff --git a/redis/tests/test_cluster_async.rs b/redis/tests/test_cluster_async.rs index 9786cbb05..c3a094690 100644 --- a/redis/tests/test_cluster_async.rs +++ b/redis/tests/test_cluster_async.rs @@ -36,6 +36,12 @@ mod cluster_async { use crate::support::*; use tokio::sync::mpsc; + fn broken_pipe_error() -> RedisError { + RedisError::from(std::io::Error::new( + std::io::ErrorKind::BrokenPipe, + "mock-io-error", + )) + } #[test] fn test_async_cluster_basic_cmd() { @@ -913,7 +919,7 @@ mod cluster_async { } started.store(true, atomic::Ordering::SeqCst); - if contains_slice(cmd, b"PING") { + if contains_slice(cmd, b"PING") || contains_slice(cmd, b"SETNAME") { return Err(Ok(Value::SimpleString("OK".into()))); } @@ -1112,6 +1118,175 @@ mod cluster_async { } } + #[test] + fn test_async_cluster_refresh_topology_even_with_zero_retries() { + let name = "test_async_cluster_refresh_topology_even_with_zero_retries"; + + let should_refresh = atomic::AtomicBool::new(false); + + let MockEnv { + runtime, + async_connection: mut connection, + handler: _handler, + .. + } = MockEnv::with_client_builder( + ClusterClient::builder(vec![&*format!("redis://{name}")]).retries(0), + name, + move |cmd: &[u8], port| { + if !should_refresh.load(atomic::Ordering::SeqCst) { + respond_startup(name, cmd)?; + } + + if contains_slice(cmd, b"PING") || contains_slice(cmd, b"SETNAME") { + return Err(Ok(Value::SimpleString("OK".into()))); + } + + if contains_slice(cmd, b"CLUSTER") && contains_slice(cmd, b"SLOTS") { + return Err(Ok(Value::Array(vec![ + Value::Array(vec![ + Value::Int(0), + Value::Int(1), + Value::Array(vec![ + Value::BulkString(name.as_bytes().to_vec()), + Value::Int(6379), + ]), + ]), + Value::Array(vec![ + Value::Int(2), + Value::Int(16383), + Value::Array(vec![ + Value::BulkString(name.as_bytes().to_vec()), + Value::Int(6380), + ]), + ]), + ]))); + } + + if contains_slice(cmd, b"GET") { + let get_response = Err(Ok(Value::BulkString(b"123".to_vec()))); + match port { + 6380 => get_response, + // Respond that the key exists on a node that does not yet have a connection: + _ => { + // Should not attempt to refresh slots more than once: + assert!(!should_refresh.swap(true, Ordering::SeqCst)); + Err(parse_redis_value( + format!("-MOVED 123 {name}:6380\r\n").as_bytes(), + )) + } + } + } else { + panic!("unexpected command {:?}", String::from_utf8(cmd.to_vec())) + } + }, + ); + + let value = runtime.block_on( + cmd("GET") + .arg("test") + .query_async::<_, Option>(&mut connection), + ); + + // The user should receive an initial error, because there are no retries and the first request failed. + assert_eq!( + value, + Err(RedisError::from(( + ErrorKind::Moved, + "An error was signalled by the server", + "test_async_cluster_refresh_topology_even_with_zero_retries:6380".to_string() + ))) + ); + + let value = runtime.block_on( + cmd("GET") + .arg("test") + .query_async::<_, Option>(&mut connection), + ); + + assert_eq!(value, Ok(Some(123))); + } + + #[test] + fn test_async_cluster_reconnect_even_with_zero_retries() { + let name = "test_async_cluster_reconnect_even_with_zero_retries"; + + let should_reconnect = atomic::AtomicBool::new(true); + let connection_count = Arc::new(atomic::AtomicU16::new(0)); + let connection_count_clone = connection_count.clone(); + + let MockEnv { + runtime, + async_connection: mut connection, + handler: _handler, + .. + } = MockEnv::with_client_builder( + ClusterClient::builder(vec![&*format!("redis://{name}")]).retries(0), + name, + move |cmd: &[u8], port| { + match respond_startup(name, cmd) { + Ok(_) => {} + Err(err) => { + connection_count.fetch_add(1, Ordering::Relaxed); + return Err(err); + } + } + + if contains_slice(cmd, b"ECHO") && port == 6379 { + // Should not attempt to refresh slots more than once: + if should_reconnect.swap(false, Ordering::SeqCst) { + Err(Err(broken_pipe_error())) + } else { + Err(Ok(Value::BulkString(b"PONG".to_vec()))) + } + } else { + panic!("unexpected command {:?}", String::from_utf8(cmd.to_vec())) + } + }, + ); + + // We expect 6 calls in total. MockEnv creates both synchronous and asynchronous connections, which make the following calls: + // - 1 call by the sync connection to `CLUSTER SLOTS` for initializing the client's topology map. + // - 3 calls by the async connection to `PING`: one for the user connection when creating the node from initial addresses, + // and two more for checking the user and management connections during client initialization in `refresh_slots`. + // - 1 call by the async connection to `CLIENT SETNAME` for setting up the management connection name. + // - 1 call by the async connection to `CLUSTER SLOTS` for initializing the client's topology map. + // Note: If additional nodes or setup calls are added, this number should increase. + let expected_init_calls = 6; + assert_eq!( + connection_count_clone.load(Ordering::Relaxed), + expected_init_calls + ); + + let value = runtime.block_on(connection.route_command( + &cmd("ECHO"), + RoutingInfo::SingleNode(SingleNodeRoutingInfo::ByAddress { + host: name.to_string(), + port: 6379, + }), + )); + + // The user should receive an initial error, because there are no retries and the first request failed. + assert_eq!( + value.unwrap_err().to_string(), + broken_pipe_error().to_string() + ); + + let value = runtime.block_on(connection.route_command( + &cmd("ECHO"), + RoutingInfo::SingleNode(SingleNodeRoutingInfo::ByAddress { + host: name.to_string(), + port: 6379, + }), + )); + + assert_eq!(value, Ok(Value::BulkString(b"PONG".to_vec()))); + // `expected_init_calls` plus another PING for a new user connection created from refresh_connections + assert_eq!( + connection_count_clone.load(Ordering::Relaxed), + expected_init_calls + 1 + ); + } + #[test] fn test_async_cluster_ask_redirect() { let name = "node"; @@ -1214,10 +1389,7 @@ mod cluster_async { .. } = MockEnv::new(name, move |cmd: &[u8], port| { if port != 6379 && port != 6380 { - return Err(Err(RedisError::from(std::io::Error::new( - std::io::ErrorKind::BrokenPipe, - "mock-io-error", - )))); + return Err(Err(broken_pipe_error())); } respond_startup_two_nodes(name, cmd)?; let count = completed.fetch_add(1, Ordering::SeqCst); @@ -2304,40 +2476,6 @@ mod cluster_async { .unwrap(); } - #[test] - fn test_async_cluster_reconnect_after_complete_server_disconnect() { - let cluster = - TestClusterContext::new_with_cluster_client_builder(|builder| builder.retries(2)); - - block_on_all(async move { - let mut connection = cluster.async_connection(None).await; - drop(cluster); - for _ in 0..5 { - let cmd = cmd("PING"); - - let result = connection - .route_command(&cmd, RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) - .await; - // TODO - this should be a NoConnectionError, but ATM we get the errors from the failing - assert!(result.is_err()); - - // This will route to all nodes - different path through the code. - let result = connection.req_packed_command(&cmd).await; - // TODO - this should be a NoConnectionError, but ATM we get the errors from the failing - assert!(result.is_err()); - - let _cluster = TestClusterContext::new_with_cluster_client_builder(|builder| { - builder.retries(2) - }); - - let result = connection.req_packed_command(&cmd).await.unwrap(); - assert_eq!(result, Value::SimpleString("PONG".to_string())); - } - Ok::<_, RedisError>(()) - }) - .unwrap(); - } - #[test] fn test_async_cluster_restore_resp3_pubsub_state_after_complete_server_disconnect() { // let cluster = TestClusterContext::new_with_cluster_client_builder( @@ -2905,17 +3043,12 @@ mod cluster_async { ); } - let cluster = TestClusterContext::new_with_cluster_client_builder( - 3, - 0, - |builder| { - builder - .retries(3) - .use_protocol(ProtocolVersion::RESP3) - .pubsub_subscriptions(client_subscriptions.clone()) - }, - false, - ); + let cluster = TestClusterContext::new_with_cluster_client_builder(|builder| { + builder + .retries(3) + .use_protocol(ProtocolVersion::RESP3) + .pubsub_subscriptions(client_subscriptions.clone()) + }); block_on_all(async move { let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::(); @@ -3316,30 +3449,6 @@ mod cluster_async { assert!(res.is_ok()); } - #[test] - fn test_async_cluster_handle_complete_server_disconnect_without_panicking() { - let cluster = - TestClusterContext::new_with_cluster_client_builder(|builder| builder.retries(2)); - block_on_all(async move { - let mut connection = cluster.async_connection().await; - drop(cluster); - for _ in 0..5 { - let cmd = cmd("PING"); - let result = connection - .route_command(&cmd, RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) - .await; - // TODO - this should be a NoConnectionError, but ATM we get the errors from the failing - assert!(result.is_err()); - // This will route to all nodes - different path through the code. - let result = connection.req_packed_command(&cmd).await; - // TODO - this should be a NoConnectionError, but ATM we get the errors from the failing - assert!(result.is_err()); - } - Ok::<_, RedisError>(()) - }) - .unwrap(); - } - #[test] fn test_async_cluster_reconnect_after_complete_server_disconnect() { let cluster = @@ -3356,7 +3465,7 @@ mod cluster_async { }) .collect(); - let mut connection = cluster.async_connection().await; + let mut connection = cluster.async_connection(None).await; drop(cluster); let cmd = cmd("PING"); @@ -3401,7 +3510,7 @@ mod cluster_async { }) .collect(); - let mut connection = cluster.async_connection().await; + let mut connection = cluster.async_connection(None).await; drop(cluster); // recreate cluster @@ -3467,10 +3576,7 @@ mod cluster_async { if connect_attempt > 5 { panic!("Too many pings!"); } - Err(Err(RedisError::from(std::io::Error::new( - std::io::ErrorKind::BrokenPipe, - "mock-io-error", - )))) + Err(Err(broken_pipe_error())) } else { respond_startup_two_nodes(name, cmd)?; let past_get_attempts = get_attempts.fetch_add(1, Ordering::Relaxed); @@ -3478,10 +3584,7 @@ mod cluster_async { if past_get_attempts == 0 { // Error once with io-error, ensure connection is reestablished w/out calling // other node (i.e., not doing a full slot rebuild) - Err(Err(RedisError::from(std::io::Error::new( - std::io::ErrorKind::BrokenPipe, - "mock-io-error", - )))) + Err(Err(broken_pipe_error())) } else { Err(Ok(Value::BulkString(b"123".to_vec()))) } From 0a47439b61155c36c1e39b50a637f3a959af1996 Mon Sep 17 00:00:00 2001 From: Shachar Langbeheim Date: Tue, 26 Mar 2024 18:55:33 +0200 Subject: [PATCH 4/9] Update rustls + tokio-rustls + futures-rustls --- Cargo.lock | 202 +++++++++++++++++++++++++++++++++++-- redis/Cargo.toml | 8 +- redis/tests/support/mod.rs | 7 ++ 3 files changed, 205 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ce94dfeaa..ff7322ddb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -276,6 +276,32 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "aws-lc-rs" +version = "1.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df33e4a55b03f8780ba55041bc7be91a2a8ec8c03517b0379d2d6c96d2c30d95" +dependencies = [ + "aws-lc-sys", + "mirai-annotations", + "paste", + "zeroize", +] + +[[package]] +name = "aws-lc-sys" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37ede3d6e360a48436fee127cb81710834407b1ec0c48a001cc29dec9005f73e" +dependencies = [ + "bindgen", + "cmake", + "dunce", + "fs_extra", + "libc", + "paste", +] + [[package]] name = "backoff" version = "0.3.0" @@ -338,6 +364,29 @@ dependencies = [ "num-traits", ] +[[package]] +name = "bindgen" +version = "0.69.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a00dc851838a2120612785d195287475a3ac45514741da670b735818822129a0" +dependencies = [ + "bitflags 2.4.2", + "cexpr", + "clang-sys", + "itertools", + "lazy_static", + "lazycell", + "log", + "prettyplease", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn 2.0.48", + "which", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -451,6 +500,15 @@ dependencies = [ "libc", ] +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom", +] + [[package]] name = "cfg-if" version = "1.0.0" @@ -490,6 +548,17 @@ dependencies = [ "half", ] +[[package]] +name = "clang-sys" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67523a3b4be3ce1989d607a828d036249522dd9c1c8de7f4dd2dae43a37369d1" +dependencies = [ + "glob", + "libc", + "libloading", +] + [[package]] name = "clap" version = "3.2.25" @@ -511,6 +580,15 @@ dependencies = [ "os_str_bytes", ] +[[package]] +name = "cmake" +version = "0.1.50" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a31c789563b815f77f4250caee12365734369f942439b7defd71e18a48197130" +dependencies = [ + "cc", +] + [[package]] name = "combine" version = "4.6.6" @@ -684,6 +762,12 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "dunce" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56ce8c6da7551ec6c462cbaf3bfbc75131ebbfa1c944aeaa9dab51ca1c5f0c3b" + [[package]] name = "either" version = "1.9.0" @@ -797,6 +881,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fs_extra" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" + [[package]] name = "funty" version = "2.0.0" @@ -892,9 +982,9 @@ dependencies = [ [[package]] name = "futures-rustls" -version = "0.25.1" +version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8d8a2499f0fecc0492eb3e47eab4e92da7875e1028ad2528f214ac3346ca04e" +checksum = "a8f2f12607f92c69b12ed746fabf9ca4f5c482cba46679c1a75b874ed7c26adb" dependencies = [ "futures-io", "rustls", @@ -960,6 +1050,12 @@ version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "gloo-timers" version = "0.2.6" @@ -1017,6 +1113,15 @@ version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d3d0e0f38255e7fa3cf31335b3a56f05febd18025f4db5ef7a0cfb4f8da651f" +[[package]] +name = "home" +version = "0.5.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5" +dependencies = [ + "windows-sys 0.52.0", +] + [[package]] name = "idna" version = "0.5.0" @@ -1112,12 +1217,28 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +[[package]] +name = "lazycell" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" + [[package]] name = "libc" version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" +[[package]] +name = "libloading" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19" +dependencies = [ + "cfg-if", + "windows-targets 0.52.0", +] + [[package]] name = "libm" version = "0.2.8" @@ -1161,6 +1282,12 @@ version = "2.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.7.1" @@ -1181,6 +1308,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "mirai-annotations" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9be0862c1b3f26a88803c4a49de6889c10e608b3ee9344e6ef5b45fb37ad3d1" + [[package]] name = "native-tls" version = "0.2.11" @@ -1199,6 +1332,16 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "num-bigint" version = "0.4.4" @@ -1350,6 +1493,12 @@ dependencies = [ "tokio", ] +[[package]] +name = "paste" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c" + [[package]] name = "percent-encoding" version = "2.3.1" @@ -1469,6 +1618,16 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "prettyplease" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d3928fb5db768cb86f891ff014f0144589297e3c6a1aba6ed7cecfdace270c7" +dependencies = [ + "proc-macro2", + "syn 2.0.48", +] + [[package]] name = "proc-macro-crate" version = "3.1.0" @@ -1817,6 +1976,12 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustix" version = "0.37.27" @@ -1846,11 +2011,13 @@ dependencies = [ [[package]] name = "rustls" -version = "0.22.3" +version = "0.23.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99008d7ad0bbbea527ec27bddbc0e432c5b87d8175178cee68d2eec9c4a1813c" +checksum = "8c4d6d8ad9f2492485e13453acbb291dd08f64441b6609c491f1c2cd2c6b4fe1" dependencies = [ + "aws-lc-rs", "log", + "once_cell", "ring", "rustls-pki-types", "rustls-webpki", @@ -1889,10 +2056,11 @@ checksum = "868e20fada228fefaf6b652e00cc73623d54f8171e7352c18bb281571f2d92da" [[package]] name = "rustls-webpki" -version = "0.102.1" +version = "0.102.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef4ca26037c909dedb327b48c3327d0ba91d3dd3c4e05dad328f210ffb68e95b" +checksum = "faaa0a62740bedb9b2ef5afa303da42764c012f743917351dc9a237ea1663610" dependencies = [ + "aws-lc-rs", "ring", "rustls-pki-types", "untrusted", @@ -2009,6 +2177,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "simdutf8" version = "0.1.4" @@ -2270,9 +2444,9 @@ dependencies = [ [[package]] name = "tokio-rustls" -version = "0.25.0" +version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" +checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" dependencies = [ "rustls", "rustls-pki-types", @@ -2528,6 +2702,18 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "which" +version = "4.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7" +dependencies = [ + "either", + "home", + "once_cell", + "rustix 0.38.30", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/redis/Cargo.toml b/redis/Cargo.toml index 0d836a698..96ad5484a 100644 --- a/redis/Cargo.toml +++ b/redis/Cargo.toml @@ -78,11 +78,11 @@ tokio-native-tls = { version = "0.3", optional = true } async-native-tls = { version = "0.4", optional = true } # Only needed for rustls -rustls = { version = "0.22", optional = true } +rustls = { version = "0.23", optional = true } webpki-roots = { version = "0.26", optional = true } rustls-native-certs = { version = "0.7", optional = true } -tokio-rustls = { version = "0.25", optional = true } -futures-rustls = { version = "0.25", optional = true } +tokio-rustls = { version = "0.26", optional = true } +futures-rustls = { version = "0.26", optional = true } rustls-pemfile = { version = "2", optional = true } rustls-pki-types = { version = "1", optional = true } @@ -114,7 +114,7 @@ cluster = ["crc16", "rand", "derivative"] script = ["sha1_smol"] tls-native-tls = ["native-tls"] tls-rustls = ["rustls", "rustls-native-certs", "rustls-pemfile", "rustls-pki-types"] -tls-rustls-insecure = ["tls-rustls"] +tls-rustls-insecure = ["tls-rustls", "rustls/ring"] tls-rustls-webpki-roots = ["tls-rustls", "webpki-roots"] async-std-comp = ["aio", "async-std", "backoff-std-async"] async-std-native-tls-comp = ["async-std-comp", "async-native-tls", "tls-native-tls"] diff --git a/redis/tests/support/mod.rs b/redis/tests/support/mod.rs index b55049e7c..5a127daa3 100644 --- a/redis/tests/support/mod.rs +++ b/redis/tests/support/mod.rs @@ -246,6 +246,13 @@ impl RedisServer { modules: &[Module], spawner: F, ) -> RedisServer { + #[cfg(feature = "rustls")] + if rustls::crypto::CryptoProvider::get_default().is_none() { + rustls::crypto::ring::default_provider() + .install_default() + .unwrap(); + } + let mut redis_cmd = process::Command::new("redis-server"); if let Some(config_path) = config_file { From 50991a188b04a7d971226792ae1d4449280093c5 Mon Sep 17 00:00:00 2001 From: Shachar Date: Wed, 29 May 2024 16:10:55 +0300 Subject: [PATCH 5/9] Log the server / cluster logfile on error. This allows us to find errors that aren't visible in stdout/stderr. --- redis/tests/support/cluster.rs | 20 ++++++++- redis/tests/support/mod.rs | 74 ++++++++++++++++++++++++---------- 2 files changed, 71 insertions(+), 23 deletions(-) diff --git a/redis/tests/support/cluster.rs b/redis/tests/support/cluster.rs index 22ff91bea..ba6d3b658 100644 --- a/redis/tests/support/cluster.rs +++ b/redis/tests/support/cluster.rs @@ -3,6 +3,7 @@ use std::convert::identity; use std::env; +use std::io::Read; use std::process; use std::thread::sleep; use std::time::Duration; @@ -207,10 +208,24 @@ impl RedisCluster { let mut process = cmd.spawn().unwrap(); sleep(Duration::from_millis(100)); + let log_file_index = cmd.get_args().position(|arg|arg == "--logfile").unwrap() + 1; + let log_file_path = cmd.get_args().nth(log_file_index).unwrap(); match process.try_wait() { Ok(Some(status)) => { + let stdout = process.stdout.map_or(String::new(), |mut out|{ + let mut str = String::new(); + out.read_to_string(&mut str).unwrap(); + str + }); + let stderr = process.stderr.map_or(String::new(), |mut out|{ + let mut str = String::new(); + out.read_to_string(&mut str).unwrap(); + str + }); + + let log_file_contents = std::fs::read_to_string(log_file_path).unwrap(); let err = - format!("redis server creation failed with status {status:?}"); + format!("redis server creation failed with status {status:?}.\nstdout: `{stdout}`.\nstderr: `{stderr}`\nlog file: {log_file_contents}"); if cur_attempts == max_attempts { panic!("{err}"); } @@ -222,7 +237,8 @@ impl RedisCluster { let mut cur_attempts = 0; loop { if cur_attempts == max_attempts { - panic!("redis server creation failed: Port {port} closed") + let log_file_contents = std::fs::read_to_string(log_file_path).unwrap(); + panic!("redis server creation failed: Port {port} closed. {log_file_contents}") } if port_in_use(&addr) { return process; diff --git a/redis/tests/support/mod.rs b/redis/tests/support/mod.rs index 5a127daa3..860cc4eac 100644 --- a/redis/tests/support/mod.rs +++ b/redis/tests/support/mod.rs @@ -143,7 +143,8 @@ pub enum Module { pub struct RedisServer { pub process: process::Child, - pub(crate) tempdir: tempfile::TempDir, + tempdir: tempfile::TempDir, + log_file: PathBuf, pub(crate) addr: redis::ConnectionAddr, pub(crate) tls_paths: Option, } @@ -176,6 +177,10 @@ impl RedisServer { RedisServer::with_modules(&[], true) } + pub fn log_file_contents(&self) -> String { + std::fs::read_to_string(self.log_file.clone()).unwrap() + } + pub fn get_addr(port: u16) -> ConnectionAddr { let server_type = ServerType::get_intended(); match server_type { @@ -279,7 +284,8 @@ impl RedisServer { .prefix("redis") .tempdir() .expect("failed to create tempdir"); - redis_cmd.arg("--logfile").arg(Self::log_file(&tempdir)); + let log_file = Self::log_file(&tempdir); + redis_cmd.arg("--logfile").arg(log_file.clone()); match addr { redis::ConnectionAddr::Tcp(ref bind, server_port) => { redis_cmd @@ -290,6 +296,7 @@ impl RedisServer { RedisServer { process: spawner(&mut redis_cmd), + log_file, tempdir, addr, tls_paths: None, @@ -329,6 +336,7 @@ impl RedisServer { RedisServer { process: spawner(&mut redis_cmd), + log_file, tempdir, addr, tls_paths: Some(tls_paths), @@ -342,6 +350,7 @@ impl RedisServer { .arg(path); RedisServer { process: spawner(&mut redis_cmd), + log_file, tempdir, addr, tls_paths: None, @@ -446,15 +455,27 @@ impl TestContext { } pub fn with_tls(tls_files: TlsFilePaths, mtls_enabled: bool) -> TestContext { + Self::with_modules_and_tls(&[], mtls_enabled, Some(tls_files)) + } + + pub fn with_modules(modules: &[Module], mtls_enabled: bool) -> TestContext { + Self::with_modules_and_tls(modules, mtls_enabled, None) + } + + fn with_modules_and_tls( + modules: &[Module], + mtls_enabled: bool, + tls_files: Option, + ) -> Self { let redis_port = get_random_available_port(); let addr: ConnectionAddr = RedisServer::get_addr(redis_port); let server = RedisServer::new_with_addr_tls_modules_and_spawner( addr, None, - Some(tls_files), + tls_files, mtls_enabled, - &[], + modules, |cmd| { cmd.spawn() .unwrap_or_else(|err| panic!("Failed to run {cmd:?}: {err}")) @@ -467,25 +488,36 @@ impl TestContext { #[cfg(not(feature = "tls-rustls"))] let client = redis::Client::open(server.connection_info()).unwrap(); - Self::connect_with_retries(&client); + let mut con; - TestContext { - server, - client, - protocol: use_protocol(), + let millisecond = Duration::from_millis(1); + let mut retries = 0; + loop { + match client.get_connection() { + Err(err) => { + if err.is_connection_refusal() { + sleep(millisecond); + retries += 1; + if retries > 100000 { + panic!( + "Tried to connect too many times, last error: {err}, logfile: {}", + server.log_file_contents() + ); + } + } else { + panic!( + "Could not connect: {err}, logfile: {}", + server.log_file_contents() + ); + } + } + Ok(x) => { + con = x; + break; + } + } } - } - - pub fn with_modules(modules: &[Module], mtls_enabled: bool) -> TestContext { - let server = RedisServer::with_modules(modules, mtls_enabled); - - #[cfg(feature = "tls-rustls")] - let client = - build_single_client(server.connection_info(), &server.tls_paths, mtls_enabled).unwrap(); - #[cfg(not(feature = "tls-rustls"))] - let client = redis::Client::open(server.connection_info()).unwrap(); - - Self::connect_with_retries(&client); + redis::cmd("FLUSHDB").execute(&mut con); TestContext { server, From bb3396578656ad016c5852b0af086415b4e6635b Mon Sep 17 00:00:00 2001 From: barshaul Date: Wed, 26 Jun 2024 13:49:57 +0000 Subject: [PATCH 6/9] Fix --- redis/tests/support/mod.rs | 2 +- redis/tests/test_cluster_scan.rs | 30 ++++++++++++++++++------------ 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/redis/tests/support/mod.rs b/redis/tests/support/mod.rs index 860cc4eac..fa4fb07b7 100644 --- a/redis/tests/support/mod.rs +++ b/redis/tests/support/mod.rs @@ -493,7 +493,7 @@ impl TestContext { let millisecond = Duration::from_millis(1); let mut retries = 0; loop { - match client.get_connection() { + match client.get_connection(None) { Err(err) => { if err.is_connection_refusal() { sleep(millisecond); diff --git a/redis/tests/test_cluster_scan.rs b/redis/tests/test_cluster_scan.rs index 6ada32cbf..4a69c5d92 100644 --- a/redis/tests/test_cluster_scan.rs +++ b/redis/tests/test_cluster_scan.rs @@ -47,7 +47,7 @@ mod test_cluster_scan_async { #[tokio::test] async fn test_async_cluster_scan() { - let cluster = TestClusterContext::new(3, 0); + let cluster = TestClusterContext::new(); let mut connection = cluster.async_connection(None).await; // Set some keys @@ -88,7 +88,7 @@ mod test_cluster_scan_async { #[tokio::test] // test cluster scan with slot migration in the middle async fn test_async_cluster_scan_with_migration() { - let cluster = TestClusterContext::new(3, 0); + let cluster = TestClusterContext::new(); let mut connection = cluster.async_connection(None).await; // Set some keys @@ -163,7 +163,7 @@ mod test_cluster_scan_async { #[tokio::test] // test cluster scan with node fail in the middle async fn test_async_cluster_scan_with_fail() { - let cluster = TestClusterContext::new(3, 0); + let cluster = TestClusterContext::new(); let mut connection = cluster.async_connection(None).await; // Set some keys for i in 0..1000 { @@ -221,8 +221,11 @@ mod test_cluster_scan_async { #[tokio::test] // Test cluster scan with killing all masters during scan async fn test_async_cluster_scan_with_all_masters_down() { - let cluster = TestClusterContext::new(6, 1); - + let cluster = TestClusterContext::new_with_config(RedisClusterConfiguration { + nodes: 6, + replicas: 1, + ..Default::default() + }); let mut connection = cluster.async_connection(None).await; let mut expected_keys: Vec = Vec::new(); @@ -367,8 +370,11 @@ mod test_cluster_scan_async { #[tokio::test] // Test cluster scan with killing all replicas during scan async fn test_async_cluster_scan_with_all_replicas_down() { - let cluster = TestClusterContext::new(6, 1); - + let cluster = TestClusterContext::new_with_config(RedisClusterConfiguration { + nodes: 6, + replicas: 1, + ..Default::default() + }); let mut connection = cluster.async_connection(None).await; let mut expected_keys: Vec = Vec::new(); @@ -463,7 +469,7 @@ mod test_cluster_scan_async { #[tokio::test] // Test cluster scan with setting keys for each iteration async fn test_async_cluster_scan_set_in_the_middle() { - let cluster = TestClusterContext::new(3, 0); + let cluster = TestClusterContext::new(); let mut connection = cluster.async_connection(None).await; let mut expected_keys: Vec = Vec::new(); let mut i = 0; @@ -522,7 +528,7 @@ mod test_cluster_scan_async { #[tokio::test] // Test cluster scan with deleting keys for each iteration async fn test_async_cluster_scan_dell_in_the_middle() { - let cluster = TestClusterContext::new(3, 0); + let cluster = TestClusterContext::new(); let mut connection = cluster.async_connection(None).await; let mut expected_keys: Vec = Vec::new(); @@ -584,7 +590,7 @@ mod test_cluster_scan_async { #[tokio::test] // Testing cluster scan with Pattern option async fn test_async_cluster_scan_with_pattern() { - let cluster = TestClusterContext::new(3, 0); + let cluster = TestClusterContext::new(); let mut connection = cluster.async_connection(None).await; let mut expected_keys: Vec = Vec::new(); let mut i = 0; @@ -642,7 +648,7 @@ mod test_cluster_scan_async { #[tokio::test] // Testing cluster scan with TYPE option async fn test_async_cluster_scan_with_type() { - let cluster = TestClusterContext::new(3, 0); + let cluster = TestClusterContext::new(); let mut connection = cluster.async_connection(None).await; let mut expected_keys: Vec = Vec::new(); let mut i = 0; @@ -700,7 +706,7 @@ mod test_cluster_scan_async { #[tokio::test] // Testing cluster scan with COUNT option async fn test_async_cluster_scan_with_count() { - let cluster = TestClusterContext::new(3, 0); + let cluster = TestClusterContext::new(); let mut connection = cluster.async_connection(None).await; let mut expected_keys: Vec = Vec::new(); let mut i = 0; From da4cbb0e8cad4fc8d6109a8097b8c6e476258ae9 Mon Sep 17 00:00:00 2001 From: barshaul Date: Wed, 26 Jun 2024 13:56:54 +0000 Subject: [PATCH 7/9] try to revoke cache --- redis/tests/test_cluster_async.rs | 45 ++++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/redis/tests/test_cluster_async.rs b/redis/tests/test_cluster_async.rs index c3a094690..399a8f841 100644 --- a/redis/tests/test_cluster_async.rs +++ b/redis/tests/test_cluster_async.rs @@ -2557,19 +2557,22 @@ mod cluster_async { // ) // ); // } - + // let ports: Vec<_> = cluster + // .nodes + // .iter() + // .map(|info| match info.addr { + // redis::ConnectionAddr::Tcp(_, port) => port, + // redis::ConnectionAddr::TcpTls { port, .. } => port, + // redis::ConnectionAddr::Unix(_) => panic!("no unix sockets in cluster tests"), + // }) + // .collect(); // // drop and recreate cluster and connections // drop(cluster); - // println!("*********** DROPPED **********"); - - // let cluster = TestClusterContext::new_with_cluster_client_builder( - // 3, - // 0, - // |builder| builder.retries(3).use_protocol(ProtocolVersion::RESP3), - // //|builder| builder.retries(3), - // false, - // ); - + // println!("*********** DROPPED **********");; + // let _cluster = RedisCluster::new(RedisClusterConfiguration { + // ports: ports.clone(), + // ..Default::default() + // }); // let result = connection // .route_command(&redis::Cmd::new().arg("PUBLISH").arg("test_channel").arg("test_message_from_node_0"), RoutingInfo::SingleNode(node_0_route.clone())) // .await; @@ -2776,7 +2779,15 @@ mod cluster_async { // ] // ) // ); - + // let ports: Vec<_> = cluster + // .nodes + // .iter() + // .map(|info| match info.addr { + // redis::ConnectionAddr::Tcp(_, port) => port, + // redis::ConnectionAddr::TcpTls { port, .. } => port, + // redis::ConnectionAddr::Unix(_) => panic!("no unix sockets in cluster tests"), + // }) + // .collect(); // // simulate scale in // drop(cluster); // println!("*********** DROPPED **********"); @@ -2920,7 +2931,15 @@ mod cluster_async { // ) // ); // } - + // let ports: Vec<_> = cluster + // .nodes + // .iter() + // .map(|info| match info.addr { + // redis::ConnectionAddr::Tcp(_, port) => port, + // redis::ConnectionAddr::TcpTls { port, .. } => port, + // redis::ConnectionAddr::Unix(_) => panic!("no unix sockets in cluster tests"), + // }) + // .collect(); // // drop and recreate cluster and connections // drop(cluster); // println!("*********** DROPPED **********"); From 4776cd710b6818d6cdf39d0169b5254786f27371 Mon Sep 17 00:00:00 2001 From: barshaul Date: Wed, 26 Jun 2024 14:00:01 +0000 Subject: [PATCH 8/9] Remove cache --- .github/workflows/rust.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index fc70af1f0..5c6170d8d 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -65,8 +65,6 @@ jobs: toolchain: ${{ matrix.rust }} components: rustfmt - - uses: Swatinem/rust-cache@v2 - - uses: actions/checkout@v4 - name: Run tests From 6e2a22e6f1b9792b8d254abf463615b23f64f2f9 Mon Sep 17 00:00:00 2001 From: barshaul Date: Wed, 26 Jun 2024 15:03:53 +0000 Subject: [PATCH 9/9] Fix tests --- redis/tests/test_cluster_scan.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/redis/tests/test_cluster_scan.rs b/redis/tests/test_cluster_scan.rs index 4a69c5d92..16a9a8e6b 100644 --- a/redis/tests/test_cluster_scan.rs +++ b/redis/tests/test_cluster_scan.rs @@ -770,7 +770,7 @@ mod test_cluster_scan_async { // Testing cluster scan when connection fails in the middle and we get an error // then cluster up again and scanning can continue without any problem async fn test_async_cluster_scan_failover() { - let mut cluster = TestClusterContext::new(3, 0); + let mut cluster = TestClusterContext::new(); let mut connection = cluster.async_connection(None).await; let mut i = 0; loop { @@ -811,7 +811,7 @@ mod test_cluster_scan_async { break; }; } - cluster = TestClusterContext::new(3, 0); + cluster = TestClusterContext::new(); connection = cluster.async_connection(None).await; loop { let scan_response: RedisResult<(ScanStateRC, Vec)> = connection