Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add binary support #172

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 67 additions & 7 deletions redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::{
cluster_topology::SLOT_SIZE,
cmd,
commands::cluster_scan::{cluster_scan, ClusterScanArgs, ObjectType, ScanStateRC},
FromRedisValue, InfoDict,
FromRedisValue, InfoDict, ToRedisArgs,
};
#[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))]
use async_std::task::{spawn, JoinHandle};
Expand Down Expand Up @@ -139,16 +139,16 @@ where
})
}

// Special handling for `SCAN` command, using cluster_scan
/// Special handling for `SCAN` command, using `cluster_scan`.
/// If you wish to use a match pattern, use [`cluster_scan_with_pattern`].
/// Perform a `SCAN` command on a Redis cluster, using scan state object in order to handle changes in topology
/// and make sure that all keys that were in the cluster from start to end of the scan are scanned.
/// In order to make sure all keys in the cluster scanned, topology refresh occurs more frequently and may affect performance.
///
/// # Arguments
///
/// * `scan_state_rc` - A reference to the scan state, For initiating new scan send [`ScanStateRC::new()`],
/// for each subsequent iteration use the returned [`ScanStateRC`].
/// * `match_pattern` - An optional match pattern of requested keys.
/// for each subsequent iteration use the returned [`ScanStateRC`].
/// * `count` - An optional count of keys requested,
/// the amount returned can vary and not obligated to return exactly count.
/// * `object_type` - An optional [`ObjectType`] enum of requested key redis type.
Expand All @@ -174,7 +174,7 @@ where
/// let mut keys: Vec<String> = vec![];
/// loop {
/// let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) =
/// connection.cluster_scan(scan_state_rc, None, None, None).await.unwrap();
/// connection.cluster_scan(scan_state_rc, None, None).await.unwrap();
/// scan_state_rc = next_cursor;
/// let mut scan_keys = scan_keys
/// .into_iter()
Expand All @@ -191,13 +191,73 @@ where
pub async fn cluster_scan(
&mut self,
scan_state_rc: ScanStateRC,
match_pattern: Option<&str>,
count: Option<usize>,
object_type: Option<ObjectType>,
) -> RedisResult<(ScanStateRC, Vec<Value>)> {
let cluster_scan_args = ClusterScanArgs::new(scan_state_rc, None, count, object_type);
self.route_cluster_scan(cluster_scan_args).await
}

/// Special handling for `SCAN` command, using `cluster_scan_with_pattern`.
/// It is a special case of [`cluster_scan`], with an additional match pattern.
/// Perform a `SCAN` command on a Redis cluster, using scan state object in order to handle changes in topology
/// and make sure that all keys that were in the cluster from start to end of the scan are scanned.
/// In order to make sure all keys in the cluster scanned, topology refresh occurs more frequently and may affect performance.
///
/// # Arguments
///
/// * `scan_state_rc` - A reference to the scan state, For initiating new scan send [`ScanStateRC::new()`],
/// for each subsequent iteration use the returned [`ScanStateRC`].
/// * `match_pattern` - A match pattern of requested keys.
/// * `count` - An optional count of keys requested,
/// the amount returned can vary and not obligated to return exactly count.
/// * `object_type` - An optional [`ObjectType`] enum of requested key redis type.
///
/// # Returns
///
/// A [`ScanStateRC`] for the updated state of the scan and the vector of keys that were found in the scan.
/// structure of returned value:
/// `Ok((ScanStateRC, Vec<Value>))`
///
/// When the scan is finished [`ScanStateRC`] will be None, and can be checked by calling `scan_state_wrapper.is_finished()`.
///
/// # Example
/// ```rust,no_run
/// use redis::cluster::ClusterClient;
/// use redis::{ScanStateRC, FromRedisValue, from_redis_value, Value, ObjectType};
///
/// async fn scan_all_cluster() -> Vec<String> {
/// let nodes = vec!["redis://127.0.0.1/"];
/// let client = ClusterClient::new(nodes).unwrap();
/// let mut connection = client.get_async_connection(None).await.unwrap();
/// let mut scan_state_rc = ScanStateRC::new();
/// let mut keys: Vec<String> = vec![];
/// loop {
/// let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) =
/// connection.cluster_scan_with_pattern(scan_state_rc, b"my_key", None, None).await.unwrap();
/// scan_state_rc = next_cursor;
/// let mut scan_keys = scan_keys
/// .into_iter()
/// .map(|v| from_redis_value(&v).unwrap())
/// .collect::<Vec<String>>(); // Change the type of `keys` to `Vec<String>`
/// keys.append(&mut scan_keys);
/// if scan_state_rc.is_finished() {
/// break;
/// }
/// }
/// keys
/// }
/// ```
pub async fn cluster_scan_with_pattern<K: ToRedisArgs>(
&mut self,
scan_state_rc: ScanStateRC,
match_pattern: K,
count: Option<usize>,
object_type: Option<ObjectType>,
) -> RedisResult<(ScanStateRC, Vec<Value>)> {
let cluster_scan_args = ClusterScanArgs::new(
scan_state_rc,
match_pattern.map(|s| s.to_string()),
Some(match_pattern.to_redis_args().concat()),
count,
object_type,
);
Expand Down
10 changes: 5 additions & 5 deletions redis/src/commands/cluster_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ const BITS_ARRAY_SIZE: usize = NUM_OF_SLOTS / BITS_PER_U64;
const END_OF_SCAN: u16 = NUM_OF_SLOTS as u16 + 1;
type SlotsBitsArray = [u64; BITS_ARRAY_SIZE];

#[derive(Debug, Clone)]
#[derive(Clone)]
pub(crate) struct ClusterScanArgs {
pub(crate) scan_state_cursor: ScanStateRC,
match_pattern: Option<String>,
match_pattern: Option<Vec<u8>>,
count: Option<usize>,
object_type: Option<ObjectType>,
}
Expand All @@ -59,7 +59,7 @@ pub enum ObjectType {
impl ClusterScanArgs {
pub(crate) fn new(
scan_state_cursor: ScanStateRC,
match_pattern: Option<String>,
match_pattern: Option<Vec<u8>>,
count: Option<usize>,
object_type: Option<ObjectType>,
) -> Self {
Expand Down Expand Up @@ -487,7 +487,7 @@ where
async fn send_scan<C>(
scan_state: &ScanState,
core: &C,
match_pattern: Option<String>,
match_pattern: Option<Vec<u8>>,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BytesMut is more efficient, any special reason why you choose Vec<u8> ? (BytesMut is similar to Vec<u8> but with less copies and allocations)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The built in function of the library is to_redis_args which return Vec

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be better to refactor but not at this point

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense not to make a drastic change at this point

count: Option<usize>,
object_type: Option<ObjectType>,
) -> RedisResult<Value>
Expand Down Expand Up @@ -518,7 +518,7 @@ where
async fn retry_scan<C>(
scan_state: &ScanState,
core: &C,
match_pattern: Option<String>,
match_pattern: Option<Vec<u8>>,
count: Option<usize>,
object_type: Option<ObjectType>,
) -> RedisResult<(RedisResult<Value>, ScanState)>
Expand Down
43 changes: 19 additions & 24 deletions redis/tests/test_cluster_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ mod test_cluster_scan_async {
let mut keys: Vec<String> = vec![];
loop {
let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) = connection
.cluster_scan(scan_state_rc, None, None, None)
.cluster_scan(scan_state_rc, None, None)
.await
.unwrap();
scan_state_rc = next_cursor;
Expand Down Expand Up @@ -112,7 +112,7 @@ mod test_cluster_scan_async {
loop {
count += 1;
let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) = connection
.cluster_scan(scan_state_rc, None, None, None)
.cluster_scan(scan_state_rc, None, None)
.await
.unwrap();
scan_state_rc = next_cursor;
Expand Down Expand Up @@ -183,9 +183,8 @@ mod test_cluster_scan_async {
let mut result: RedisResult<Value> = Ok(Value::Nil);
loop {
count += 1;
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> = connection
.cluster_scan(scan_state_rc, None, None, None)
.await;
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> =
connection.cluster_scan(scan_state_rc, None, None).await;
let (next_cursor, scan_keys) = match scan_response {
Ok((cursor, keys)) => (cursor, keys),
Err(e) => {
Expand Down Expand Up @@ -256,9 +255,8 @@ mod test_cluster_scan_async {
let mut count = 0;
loop {
count += 1;
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> = connection
.cluster_scan(scan_state_rc, None, None, None)
.await;
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> =
connection.cluster_scan(scan_state_rc, None, None).await;
if scan_response.is_err() {
println!("error: {:?}", scan_response);
}
Expand Down Expand Up @@ -427,9 +425,8 @@ mod test_cluster_scan_async {
let mut count = 0;
loop {
count += 1;
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> = connection
.cluster_scan(scan_state_rc, None, None, None)
.await;
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> =
connection.cluster_scan(scan_state_rc, None, None).await;
if scan_response.is_err() {
println!("error: {:?}", scan_response);
}
Expand Down Expand Up @@ -497,7 +494,7 @@ mod test_cluster_scan_async {
let mut keys: Vec<String> = vec![];
loop {
let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) = connection
.cluster_scan(scan_state_rc, None, None, None)
.cluster_scan(scan_state_rc, None, None)
.await
.unwrap();
scan_state_rc = next_cursor;
Expand Down Expand Up @@ -557,7 +554,7 @@ mod test_cluster_scan_async {
let mut keys: Vec<String> = vec![];
loop {
let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) = connection
.cluster_scan(scan_state_rc, None, None, None)
.cluster_scan(scan_state_rc, None, None)
.await
.unwrap();
scan_state_rc = next_cursor;
Expand Down Expand Up @@ -625,7 +622,7 @@ mod test_cluster_scan_async {
let mut keys: Vec<String> = vec![];
loop {
let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) = connection
.cluster_scan(scan_state_rc, Some("key:pattern:*"), None, None)
.cluster_scan_with_pattern(scan_state_rc, "key:pattern:*", None, None)
.await
.unwrap();
scan_state_rc = next_cursor;
Expand Down Expand Up @@ -683,7 +680,7 @@ mod test_cluster_scan_async {
let mut keys: Vec<String> = vec![];
loop {
let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) = connection
.cluster_scan(scan_state_rc, None, None, Some(ObjectType::Set))
.cluster_scan(scan_state_rc, None, Some(ObjectType::Set))
.await
.unwrap();
scan_state_rc = next_cursor;
Expand Down Expand Up @@ -736,11 +733,11 @@ mod test_cluster_scan_async {
let mut comparing_times = 0;
loop {
let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) = connection
.cluster_scan(scan_state_rc.clone(), None, Some(100), None)
.cluster_scan(scan_state_rc.clone(), Some(100), None)
.await
.unwrap();
let (_, scan_without_count_keys): (ScanStateRC, Vec<Value>) = connection
.cluster_scan(scan_state_rc, None, Some(100), None)
.cluster_scan(scan_state_rc, Some(100), None)
.await
.unwrap();
if !scan_keys.is_empty() && !scan_without_count_keys.is_empty() {
Expand Down Expand Up @@ -795,9 +792,8 @@ mod test_cluster_scan_async {
let mut count = 0;
loop {
count += 1;
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> = connection
.cluster_scan(scan_state_rc, None, None, None)
.await;
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> =
connection.cluster_scan(scan_state_rc, None, None).await;
if scan_response.is_err() {
println!("error: {:?}", scan_response);
}
Expand All @@ -810,7 +806,7 @@ mod test_cluster_scan_async {
if count == 5 {
drop(cluster);
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> = connection
.cluster_scan(scan_state_rc.clone(), None, None, None)
.cluster_scan(scan_state_rc.clone(), None, None)
.await;
assert!(scan_response.is_err());
break;
Expand All @@ -819,9 +815,8 @@ mod test_cluster_scan_async {
cluster = TestClusterContext::new(3, 0);
connection = cluster.async_connection(None).await;
loop {
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> = connection
.cluster_scan(scan_state_rc, None, None, None)
.await;
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> =
connection.cluster_scan(scan_state_rc, None, None).await;
if scan_response.is_err() {
println!("error: {:?}", scan_response);
}
Expand Down