Skip to content

Commit

Permalink
rpc: Read byte-by-byte from stream and properly handle lines.
Browse files Browse the repository at this point in the history
This wraps the reader side into a BufReader and read_from_stream now
reads byte-by-byte from the stream using an intermediate buffer.
The BufReader doc[0] claims that using this for such reading operation
is advantageous and useful for small and repeated reads to a socket.

read_from_stream stops reading after reaching LF, and optionally pops
CR from the back of the buffer to handle CRLF endings.

[0] https://docs.rs/smol/latest/smol/io/struct.BufReader.html
  • Loading branch information
parazyd committed Nov 20, 2023
1 parent 26ee589 commit 9066e55
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 40 deletions.
5 changes: 3 additions & 2 deletions src/rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use std::sync::Arc;

use log::{debug, error};
use smol::{channel, Executor};
use smol::{channel, io::BufReader, Executor};
use tinyjson::JsonValue;
use url::Url;

Expand Down Expand Up @@ -92,7 +92,8 @@ impl RpcClient {
) -> Result<()> {
debug!(target: "rpc::client::reqrep_loop()", "Starting reqrep loop");

let (mut reader, mut writer) = smol::io::split(stream);
let (reader, mut writer) = smol::io::split(stream);
let mut reader = BufReader::new(reader);

loop {
let mut buf = Vec::with_capacity(INIT_BUF_SIZE);
Expand Down
58 changes: 31 additions & 27 deletions src/rpc/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

use std::time::Duration;

use smol::io::{AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf};
use smol::io::{AsyncReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf};

use super::jsonrpc::*;
use crate::{error::RpcError, net::transport::PtStream, system::io_timeout, Result};
Expand All @@ -28,71 +28,75 @@ pub(super) const MAX_BUF_SIZE: usize = 1024 * 8192; // 8M
pub(super) const READ_TIMEOUT: Duration = Duration::from_secs(30);

/// Internal read function that reads from the active stream into a buffer.
/// Reading stops upon reaching CRLF or LF, or when `MAX_BUF_SIZE` is reached.
pub(super) async fn read_from_stream(
reader: &mut ReadHalf<Box<dyn PtStream>>,
reader: &mut BufReader<ReadHalf<Box<dyn PtStream>>>,
buf: &mut Vec<u8>,
with_timeout: bool,
) -> Result<usize> {
let mut total_read = 0;

// Intermediate buffer we use to read byte-by-byte.
let mut tmpbuf = [0_u8];

while total_read < MAX_BUF_SIZE {
buf.resize(total_read + INIT_BUF_SIZE, 0);

// Lame we have to duplicate this code, but it is what it is.
if with_timeout {
match io_timeout(READ_TIMEOUT, reader.read(&mut buf[total_read..])).await {
match io_timeout(READ_TIMEOUT, reader.read(&mut tmpbuf)).await {
Ok(0) if total_read == 0 => {
return Err(
RpcError::ConnectionClosed("Connection closed cleanly".to_string()).into()
)
}
Ok(0) => break, // Finished reading
Ok(n) => {
total_read += n;
if buf[total_read - 1] == b'\n' || buf[total_read - 1] == b'\r' {
// Check for '\n' or '\r' character
break
}
if total_read >= 2 &&
buf[total_read - 2] == b'\r' &&
buf[total_read - 1] == b'\n'
{
// Handle '\r\n' sequence
Ok(_) => {
// When we reach '\n', pop a possible '\r' from the buffer and bail.
if tmpbuf[0] == b'\n' {
if buf[total_read - 1] == b'\r' {
buf.pop();
total_read -= 1;
}
break
}

// Copy the read byte to the destination buffer.
buf[total_read] = tmpbuf[0];
total_read += 1;
}

Err(e) => return Err(RpcError::IoError(e.kind()).into()),
}
} else {
match reader.read(&mut buf[total_read..]).await {
match reader.read(&mut tmpbuf).await {
Ok(0) if total_read == 0 => {
return Err(
RpcError::ConnectionClosed("Connection closed cleanly".to_string()).into()
)
}
Ok(0) => break, // Finished reading
Ok(n) => {
total_read += n;
if buf[total_read - 1] == b'\n' || buf[total_read - 1] == b'\r' {
// Check for '\n' or '\r' character
break
}
if total_read >= 2 &&
buf[total_read - 2] == b'\r' &&
buf[total_read - 1] == b'\n'
{
// Handle '\r\n' sequence
Ok(_) => {
// When we reach '\n', pop a possible '\r' from the buffer and bail.
if tmpbuf[0] == b'\n' {
if buf[total_read - 1] == b'\r' {
buf.pop();
total_read -= 1;
}
break
}

// Copy the read byte to the destination buffer.
buf[total_read] = tmpbuf[0];
total_read += 1;
}

Err(e) => return Err(RpcError::IoError(e.kind()).into()),
}
}
}

// Truncate buffer to actual data size
// Trunacate buffer to actual data size
buf.truncate(total_read);
Ok(total_read)
}
Expand Down
17 changes: 6 additions & 11 deletions src/rpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::{collections::HashSet, io::ErrorKind, sync::Arc};
use async_trait::async_trait;
use log::{debug, error, info};
use smol::{
io::{ReadHalf, WriteHalf},
io::{BufReader, ReadHalf, WriteHalf},
lock::{Mutex, MutexGuard},
};
use tinyjson::JsonValue;
Expand Down Expand Up @@ -75,8 +75,9 @@ pub trait RequestHandler: Sync + Send {

/// Accept function that should run inside a loop for accepting incoming
/// JSON-RPC requests and passing them to the [`RequestHandler`].
#[allow(clippy::type_complexity)]
pub async fn accept(
reader: Arc<Mutex<ReadHalf<Box<dyn PtStream>>>>,
reader: Arc<Mutex<BufReader<ReadHalf<Box<dyn PtStream>>>>>,
writer: Arc<Mutex<WriteHalf<Box<dyn PtStream>>>>,
addr: Url,
rh: Arc<impl RequestHandler + 'static>,
Expand Down Expand Up @@ -105,7 +106,7 @@ pub async fn accept(
let _ = read_from_stream(&mut reader_lock, &mut buf, false).await?;
drop(reader_lock);

let read_string = match String::from_utf8(buf) {
let line = match String::from_utf8(buf) {
Ok(v) => v,
Err(e) => {
error!(
Expand All @@ -116,14 +117,8 @@ pub async fn accept(
}
};

// Implementation note:
// When using this JSON-RPC server with XMRig, an issue arises.
// XMRig tends to send something we do not parse as a line in
// read_from_stream(), so as a stop-gap hack we do this:
let line = read_string.trim().lines().take(1).next().unwrap();

// Parse the line as JSON
let val: JsonValue = match line.parse() {
let val: JsonValue = match line.trim().parse() {
Ok(v) => v,
Err(e) => {
error!(
Expand Down Expand Up @@ -283,7 +278,7 @@ async fn run_accept_loop(
info!(target: "rpc::server", "[RPC] Server accepted conn from {}", url);

let (reader, writer) = smol::io::split(stream);
let reader = Arc::new(Mutex::new(reader));
let reader = Arc::new(Mutex::new(BufReader::new(reader)));
let writer = Arc::new(Mutex::new(writer));

let task = StoppableTask::new();
Expand Down

0 comments on commit 9066e55

Please sign in to comment.