Skip to content

Commit

Permalink
fix decode array (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
estin authored Jan 9, 2024
1 parent cce08bb commit ecaa42d
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 4 deletions.
92 changes: 89 additions & 3 deletions src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,15 +603,64 @@ fn decode_bytes(buf: &mut BytesMut, idx: usize) -> DecodeResult {
}
}

fn is_array_ready_to_decode(
buf: &mut BytesMut,
idx: usize,
array_size: usize,
) -> Result<(bool, usize), Error> {
let mut items: usize = 0;
let mut pos = idx;

// counting beginning of array items in buffer by `\r\n<type>`
loop {
let Some(new_pos) = buf[pos..].windows(2).position(|w| w.starts_with(b"\r\n")) else {
break;
};

if pos + new_pos + 2 >= buf.len() {
break;
}
pos += new_pos + 2;

items += match &buf[pos] {
// check nested array and calc it as item
b'*' => match decode_length(buf, pos) {
Ok(Some((_, -1))) => 1,
Ok(Some((p, size))) if size >= 0 => {
let (ready, end_of_scan) = is_array_ready_to_decode(buf, p, size as usize)?;
// nested array isn't ready
if !ready {
return Ok((false, end_of_scan));
}
pos = end_of_scan;
1
}
Ok(Some((_, size))) => {
return Err(Error::Parse(format!("Invalid array size: {}", size)))
}
_ => 0,
},
// array item found
b'$' | b':' | b'+' | b'-' => 1,
_ => 0,
};

if array_size <= items {
return Ok((true, pos));
}
}

Ok((array_size <= items, pos))
}

fn decode_array(buf: &mut BytesMut, idx: usize) -> DecodeResult {
match decode_length(buf, idx)? {
Some((pos, -1)) => Ok(Some((pos, Response::Nil))),
Some((pos, size)) if size >= 0 => {
let size = size as usize;

// ensure all array items present in buffer
let items = buf[idx..].windows(2).filter(|w| w == b"\r\n").count() / 2;
if items < size {
let (is_ready, _) = is_array_ready_to_decode(buf, idx, size)?;
if !is_ready {
return Ok(None);
}

Expand Down Expand Up @@ -767,6 +816,43 @@ mod tests {

let deserialized = codec.decode(&mut bytes).unwrap().unwrap();
assert_eq!(deserialized, resp);

// $ echo -e "mget key1 key2\r\nquit" | curl -s telnet://localhost:6379 | python -c "import sys; print(repr(sys.stdin.read()))"
// '*2\r\n$-1\r\n$-1\r\n+OK\r\n'
let mut bytes = BytesMut::copy_from_slice(b"*2\r\n$-1\r\n$-1\r\n");
let result = codec.decode(&mut bytes).unwrap();

assert_eq!(
result,
Some(Response::Array(vec![Response::Nil, Response::Nil]))
);

// uncomplete nested array data
// [[['a']]]
// $ echo -e 'eval "return {{{\'a\'}}}" 0\r\nquit' | curl -s telnet://localhost:6379 | python -c "import sys; print(repr(sys.stdin.read()))"
// '*1\r\n*1\r\n*1\r\n$1\r\na\r\n+OK\r\n'
let mut bytes = BytesMut::copy_from_slice(b"*1\r\n*1\r\n*1\r\n");
let result = codec.decode(&mut bytes).unwrap();
assert_eq!(result, None);

// receiving remain parts
bytes.extend_from_slice(b"$1\r\na");
let result = codec.decode(&mut bytes).unwrap();
assert_eq!(result, None);

bytes.extend_from_slice(b"\r");
let result = codec.decode(&mut bytes).unwrap();
assert_eq!(result, None);

bytes.extend_from_slice(b"\n");
let result = codec.decode(&mut bytes).unwrap();

assert_eq!(
result,
Some(Response::Array(vec![Response::Array(vec![
Response::Array(vec![Response::Bytes(Bytes::from_static(b"a"))])
])]))
);
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
//! let key = gen_random_key();
//!
//! // create list with one value
//! redis.exec(cmd::LPush(&key, "value"));
//! redis.exec(cmd::LPush(&key, "value")).await?;
//!
//! // get value by index
//! let value = redis.exec(cmd::LIndex(&key, 0)).await?;
Expand Down

0 comments on commit ecaa42d

Please sign in to comment.