Skip to content

Commit

Permalink
cancel on buffer full
Browse files Browse the repository at this point in the history
  • Loading branch information
blahgeek committed Jan 1, 2024
1 parent 4e7a7c4 commit 181ae5d
Show file tree
Hide file tree
Showing 6 changed files with 296 additions and 21 deletions.
129 changes: 129 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
serde = { version = "1.0", features = ["derive"]}
serde_json = "1.0"
anyhow = "1.0"
lazy_static = "1.4"
smallvec = "1.11"
tempfile = "3.9"
env_logger = "0.10"
log = "0.4.20"

[[example]]
name = "native-json-parser"
Expand Down
100 changes: 80 additions & 20 deletions src/app.rs
Original file line number Diff line number Diff line change
@@ -1,41 +1,85 @@
use anyhow::Result;
use std::sync::{mpsc, Arc, atomic::{AtomicI32, self}};

use log::{warn, info};
use anyhow::Result;
use serde_json as json;

use crate::{rpcio, bytecode};
use crate::lsp_message::{LspRequest, LspResponse, LspResponseError};

fn run_channel_to_writer(channel_sub: std::sync::mpsc::Receiver<String>,
writer: impl std::io::Write) -> Result<()> {
fn process_channel_to_writer(channel_sub: mpsc::Receiver<String>,
channel_counter: Option<Arc<AtomicI32>>,
writer: impl std::io::Write) -> Result<()> {
let mut bufwriter = std::io::BufWriter::new(writer);
for msg in channel_sub.iter() {
if let Some(ref channel_counter) = channel_counter {
channel_counter.fetch_add(-1, atomic::Ordering::AcqRel);
}
rpcio::rpc_write(&mut bufwriter, &msg)?;
}
Ok(())
}

fn run_reader_to_channel(reader: impl std::io::Read,
channel_pub: std::sync::mpsc::Sender<String>) -> Result<()> {
const MAX_PENDING_MSG_COUNT: i32 = 128;

// Read from client, write to server.
// Or, if server is blocked, reject the request and fake reply when possible
fn process_client_reader(reader: impl std::io::Read,
server_channel_pub: mpsc::Sender<String>,
server_channel_counter: Arc<AtomicI32>,
client_channel_pub: mpsc::Sender<String>) -> Result<()> {
let mut bufreader = std::io::BufReader::new(reader);
loop {
let msg = rpcio::rpc_read(&mut bufreader)?;
if msg.is_empty() {
break
}
channel_pub.send(msg)?;

if server_channel_counter.load(atomic::Ordering::Acquire) >= MAX_PENDING_MSG_COUNT {
let lsp_request: LspRequest = json::from_str(&msg)?;
// only cancel when it's not notification
if !lsp_request.is_notification() {
warn!("Buffer full, rejecting request: {} (id={:?})",
lsp_request.method, lsp_request.id);
let resp = LspResponse {
jsonrpc: lsp_request.jsonrpc,
id: lsp_request.id.unwrap(),
result: json::Value::Null,
error: Some(LspResponseError {
code: -32803,
message: "[emacs-lsp-booster] Server is busy".to_string(),
}),
};
client_channel_pub.send(json::to_string(&resp)?)?;
continue;
}
}

server_channel_pub.send(msg)?;
server_channel_counter.fetch_add(1, atomic::Ordering::AcqRel);
}

Ok(())
}

fn run_read_to_bytecode_to_channel(reader: impl std::io::Read,
channel_pub: std::sync::mpsc::Sender<String>) -> Result<()> {
fn process_server_reader(reader: impl std::io::Read,
channel_pub: mpsc::Sender<String>) -> Result<()> {
let mut bufreader = std::io::BufReader::new(reader);
loop {
let msg = rpcio::rpc_read(&mut bufreader)?;
if msg.is_empty() {
break
}
let json_val = json::from_str(&msg)?;
let bytecode_str = bytecode::generate_bytecode_repl(&json_val)?;
channel_pub.send(bytecode_str)?;
match bytecode::generate_bytecode_repl(&json_val) {
Ok(bytecode_str) => {
channel_pub.send(bytecode_str)?;
},
Err(e) => {
warn!("Failed to generate bytecode: {}; fallback to original json", e);
channel_pub.send(msg)?;
},
}
}
Ok(())
}
Expand All @@ -49,21 +93,37 @@ pub fn run_app_forever(client_reader: impl std::io::Read + Send + 'static,
.stderr(std::process::Stdio::inherit())
.spawn()?;

let (c2s_channel_pub, c2s_channel_sub) = std::sync::mpsc::channel::<String>();
let (s2c_channel_pub, s2c_channel_sub) = std::sync::mpsc::channel::<String>();
let (c2s_channel_pub, c2s_channel_sub) = mpsc::channel::<String>();
let c2s_channel_counter = Arc::new(AtomicI32::new(0));
let (s2c_channel_pub, s2c_channel_sub) = mpsc::channel::<String>();

let threads = vec![
{
let c2s_channel_counter = c2s_channel_counter.clone();
std::thread::spawn(move || {
info!("Started client->server write thread");
process_channel_to_writer(c2s_channel_sub, Some(c2s_channel_counter), proc.stdin.unwrap()).unwrap();
info!("Finished client->server write thread");
})
},
std::thread::spawn(move || {
run_channel_to_writer(c2s_channel_sub, proc.stdin.unwrap()).unwrap()
}),
std::thread::spawn(move || {
run_channel_to_writer(s2c_channel_sub, client_writer).unwrap()
}),
std::thread::spawn(move || {
run_read_to_bytecode_to_channel(proc.stdout.unwrap(), s2c_channel_pub).unwrap()
info!("Started server->client write thread");
process_channel_to_writer(s2c_channel_sub, None, client_writer).unwrap();
info!("Finished server->client write thread");
}),
{
let s2c_channel_pub = s2c_channel_pub.clone();
std::thread::spawn(move || {
info!("Started server->client read thread");
process_server_reader(proc.stdout.unwrap(), s2c_channel_pub).unwrap();
info!("Finished server->client read thread");
})
},
std::thread::spawn(move || {
run_reader_to_channel(client_reader, c2s_channel_pub).unwrap()
info!("Started client->server read thread");
process_client_reader(
client_reader, c2s_channel_pub, c2s_channel_counter, s2c_channel_pub).unwrap();
info!("Finished client->server read thread");
}),
];

Expand Down
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod bytecode;
pub mod bytecode;
mod rpcio;
mod lsp_message;
pub mod app;
Loading

0 comments on commit 181ae5d

Please sign in to comment.