Skip to content

Commit 53ecc35

Browse files
authored
Merge pull request #18 from Miaxos/feat-parse-proper-map-resp3
Feat: handle resp3 map
2 parents 4714808 + 836c177 commit 53ecc35

File tree

5 files changed

+279
-81
lines changed

5 files changed

+279
-81
lines changed

Cargo.lock

+13
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

app/roster/Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ default = []
2323

2424
[dependencies]
2525
anyhow.workspace = true
26+
async-recursion = "1"
2627
atoi_simd = "0.15"
2728
coarsetime = "0.1"
2829
crc = "3"
@@ -34,6 +35,7 @@ derive_builder.workspace = true
3435
dotenv.workspace = true
3536
futures = "0.3"
3637
futures-locks = "0.7"
38+
indexmap = "2"
3739
local-sync = "0.1"
3840
monoio = { workspace = true, features = ["bytes", "sync", "iouring"] }
3941
rustc-hash = "1.1.0"

app/roster/src/application/server/connection.rs

+4-80
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@ use std::fmt::Debug;
22
use std::io::{self, Cursor};
33

44
use bytes::BytesMut;
5-
use monoio::buf::IoBuf;
65
use monoio::io::{
7-
AsyncReadRent, AsyncWriteRent, BufReader, BufWriter, OwnedReadHalf,
8-
OwnedWriteHalf, Splitable,
6+
AsyncReadRent, BufReader, BufWriter, OwnedReadHalf, OwnedWriteHalf,
7+
Splitable,
98
};
109
use monoio::net::TcpStream;
1110

11+
use super::frame::write::write_frame;
1212
use super::frame::Frame;
1313

1414
/// Send and receive `Frame` values from a remote peer.
@@ -179,83 +179,7 @@ impl WriteConnection {
179179
/// *buffered* write stream. The data will be written to the buffer.
180180
/// Once the buffer is full, it is flushed to the underlying socket.
181181
pub async fn write_frame(&mut self, frame: &Frame) -> io::Result<()> {
182-
// Arrays are encoded by encoding each entry. All other frame types are
183-
// considered literals. For now, mini-redis is not able to encode
184-
// recursive frame structures. See below for more details.
185-
match frame {
186-
Frame::Array(val) => {
187-
// Encode the length of the array.
188-
self.stream_w.write(&[b'*']).await.0?;
189-
self.write_decimal(val.len() as u64).await?;
190-
191-
// Iterate and encode each entry in the array.
192-
for entry in &**val {
193-
self.write_value(entry).await?;
194-
}
195-
}
196-
// The frame type is a literal. Encode the value directly.
197-
_ => self.write_value(frame).await?,
198-
}
199-
200-
// Ensure the encoded frame is written to the socket. The calls above
201-
// are to the buffered stream and writes. Calling `flush` writes the
202-
// remaining contents of the buffer to the socket.
203-
self.stream_w.flush().await
204-
}
205-
206-
/// Write a frame literal to the stream
207-
async fn write_value(&mut self, frame: &Frame) -> io::Result<()> {
208-
match frame {
209-
Frame::Simple(val) => {
210-
self.stream_w.write(&[b'+']).await.0?;
211-
self.stream_w.write(val.as_bytes().slice(..)).await.0?;
212-
self.stream_w.write(&[b'\r', b'\n']).await.0?;
213-
}
214-
Frame::Error(val) => {
215-
self.stream_w.write(&[b'-']).await.0?;
216-
self.stream_w.write(val.as_bytes().slice(..)).await.0?;
217-
self.stream_w.write(&[b'\r', b'\n']).await.0?;
218-
}
219-
Frame::Integer(val) => {
220-
self.stream_w.write(&[b':']).await.0?;
221-
self.write_decimal(*val).await?;
222-
}
223-
Frame::Null => {
224-
self.stream_w.write(b"$-1\r\n").await.0?;
225-
}
226-
Frame::Bulk(val) => {
227-
let len = val.len();
228-
229-
self.stream_w.write([b'$'].as_slice()).await.0?;
230-
self.write_decimal(len as u64).await?;
231-
self.stream_w.write(val.slice(..)).await.0?;
232-
self.stream_w.write(&[b'\r', b'\n']).await.0?;
233-
}
234-
// Encoding an `Array` from within a value cannot be done using a
235-
// recursive strategy. In general, async fns do not support
236-
// recursion. Mini-redis has not needed to encode nested arrays yet,
237-
// so for now it is skipped.
238-
Frame::Array(_val) => unreachable!(),
239-
}
240-
241-
Ok(())
242-
}
243-
244-
/// Write a decimal frame to the stream_w
245-
async fn write_decimal(&mut self, val: u64) -> io::Result<()> {
246-
use std::io::Write;
247-
248-
// Convert the value to a string
249-
let buf = vec![0u8; 20];
250-
let mut buf = Cursor::new(buf);
251-
write!(&mut buf, "{}", val)?;
252-
253-
let pos = buf.position() as usize;
254-
255-
self.stream_w.write(buf.into_inner().slice(..pos)).await.0?;
256-
self.stream_w.write(b"\r\n").await.0?;
257-
258-
Ok(())
182+
write_frame(&mut self.stream_w, frame).await
259183
}
260184

261185
pub fn into_inner(self) -> OwnedWriteHalf<TcpStream> {

app/roster/src/application/server/frame.rs app/roster/src/application/server/frame/mod.rs

+68-1
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,49 @@ use std::string::FromUtf8Error;
88

99
use bytes::{Buf, Bytes, BytesMut};
1010
use bytestring::ByteString;
11+
use indexmap::IndexMap;
12+
13+
pub(crate) mod write;
1114

1215
/// A frame in the Redis protocol.
13-
#[derive(Clone, Debug)]
16+
#[derive(Clone, Debug, Eq, PartialEq)]
1417
pub enum Frame {
1518
Simple(ByteString),
1619
Error(ByteString),
1720
Integer(u64),
1821
Bulk(Bytes),
1922
Null,
2023
Array(Vec<Frame>),
24+
Map(IndexMap<Frame, Frame>),
25+
}
26+
27+
// TODO(@miaxos): hacky hash derivation for now, to test a little.
28+
impl core::hash::Hash for Frame {
29+
fn hash<H: core::hash::Hasher>(&self, ra_expand_state: &mut H) {
30+
core::mem::discriminant(self).hash(ra_expand_state);
31+
match self {
32+
Frame::Simple(f0) => {
33+
f0.hash(ra_expand_state);
34+
}
35+
Frame::Error(f0) => {
36+
f0.hash(ra_expand_state);
37+
}
38+
Frame::Integer(f0) => {
39+
f0.hash(ra_expand_state);
40+
}
41+
Frame::Bulk(f0) => {
42+
f0.hash(ra_expand_state);
43+
}
44+
Frame::Null => {}
45+
Frame::Array(f0) => {
46+
f0.hash(ra_expand_state);
47+
}
48+
Frame::Map(_) => {
49+
// TODO: Should test the behavior of redis in this case.
50+
unimplemented!("")
51+
}
52+
}
53+
}
2154
}
2255

2356
#[derive(thiserror::Error, Debug)]
@@ -68,6 +101,16 @@ impl Frame {
68101

69102
Ok(())
70103
}
104+
b'%' => {
105+
let len = get_decimal_mut(src)?;
106+
107+
// Key and value frames
108+
for _ in 0..(len * 2) {
109+
Frame::check(src)?;
110+
}
111+
112+
Ok(())
113+
}
71114
actual => Err(format!(
72115
"protocol error; invalid frame type byte `{}`",
73116
actual
@@ -137,6 +180,18 @@ impl Frame {
137180

138181
Ok(Frame::Array(out))
139182
}
183+
b'%' => {
184+
let len = get_decimal(src)?.try_into()?;
185+
let mut out = IndexMap::with_capacity(len);
186+
187+
for _ in (0..(len * 2)).step_by(2) {
188+
let key = Frame::parse(src)?;
189+
let value = Frame::parse(src)?;
190+
out.insert(key, value);
191+
}
192+
193+
Ok(Frame::Map(out))
194+
}
140195
_ => unimplemented!(),
141196
}
142197
}
@@ -321,4 +376,16 @@ mod tests {
321376
assert!(Frame::check(&mut cur).is_ok());
322377
}
323378
}
379+
380+
#[test]
381+
fn test_map_frame() {
382+
let test_case: Vec<&[u8]> =
383+
vec![b"%2\r\n+first\r\n:1\r\n+second\r\n:2\r\n"];
384+
385+
for t in test_case {
386+
let b = BytesMut::from(t);
387+
let mut cur = Cursor::new(&b);
388+
assert!(Frame::check(&mut cur).is_ok());
389+
}
390+
}
324391
}

0 commit comments

Comments
 (0)