Query regarding Arrow Flight Server #5706
Replies: 4 comments 8 replies
-
My guess is that I need to configure the tonic server to accept a larger maximum message size, but I could not find any resources on how to do that.
-
Yes, I did it there. I even tried enabling compression:
let svc = FlightServiceServer::new(service)
    .max_encoding_message_size(usize::MAX)
    .max_decoding_message_size(usize::MAX)
    .send_compressed(CompressionEncoding::Gzip);
But the result is the same. I found this issue in tonic, but it seems to be closed.
-
I don't think the load testing tool is the issue, because ingestion takes place over HTTP, and if I ingest a relatively small amount of data, for example
[
    {
        "id": "8080",
        "datetime": "24/Jun/2022:14:12:15 +0000",
        "host": "153.10.110.81"
    }
]
there is no problem with the do_get request.
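Since small payloads go through, the failure looks size-dependent. One possible workaround, sketched here under assumptions, is to have the server send several smaller batches instead of one large message. The sketch below uses only the standard library: `split_by_budget` and the per-row sizes are hypothetical, and a real server would have to estimate the encoded size of its rows some other way.

```rust
use std::ops::Range;

/// Group consecutive rows into ranges whose summed size stays within `budget`.
/// `row_sizes` holds hypothetical per-row encoded sizes in bytes.
/// A single row larger than `budget` still gets a range of its own.
fn split_by_budget(row_sizes: &[usize], budget: usize) -> Vec<Range<usize>> {
    let mut ranges = Vec::new();
    let mut start = 0;
    let mut used = 0;
    for (i, &size) in row_sizes.iter().enumerate() {
        // Close the current range if adding this row would exceed the budget.
        if used + size > budget && i > start {
            ranges.push(start..i);
            start = i;
            used = 0;
        }
        used += size;
    }
    // Emit whatever is left as the final range.
    if start < row_sizes.len() {
        ranges.push(start..row_sizes.len());
    }
    ranges
}

fn main() {
    // Five rows of 300 bytes each with a 700-byte budget (illustrative numbers).
    let sizes = [300, 300, 300, 300, 300];
    for range in split_by_budget(&sizes, 700) {
        println!("rows {}..{}", range.start, range.end);
    }
}
```

Each range could then be sliced out of the original batch and sent as its own message, so that no single message approaches the limit.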
-
I am invoking the request with Postman and with a simple client I wrote:
// Imports assumed from the crates this client uses (arrow-array, arrow-flight,
// arrow-json, futures, serde_json, tokio, and tonic with its gzip feature).
use std::env::args;
use std::error::Error;

use arrow_array::RecordBatch;
use arrow_flight::{FlightClient, Ticket};
use futures::TryStreamExt;
use serde_json::{Map, Value};
use tonic::transport::Channel;

/// Serialize record batches to a JSON array, then parse it back into row maps.
pub fn record_batches_to_json(records: &[&RecordBatch]) -> Vec<Map<String, Value>> {
    let buf = vec![];
    let mut writer = arrow_json::ArrayWriter::new(buf);
    writer.write_batches(records).unwrap();
    writer.finish().unwrap();
    let buf = writer.into_inner();
    // Fall back to an empty result if the buffer does not parse as JSON rows.
    serde_json::from_reader(buf.as_slice()).unwrap_or_default()
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Arguments are read from the end: end time, then start time, then query.
    let mut args = args().collect::<Vec<_>>();
    args.remove(0);
    let end_time = args.pop().unwrap_or_else(|| "now".to_owned());
    let start_time = args.pop().unwrap_or_else(|| "1min".to_owned());
    let query = args
        .pop()
        .unwrap_or_else(|| "select * from teststream limit 1".to_owned());
    println!("{}:{}:{}", query, start_time, end_time);

    let channel = Channel::from_static("http://localhost:8002")
        .connect()
        .await?;

    // Rebuild the client around the inner tonic client so that the message
    // size limits and gzip decompression can be configured.
    let client = FlightClient::new(channel);
    let inn = client
        .into_inner()
        .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
        .max_decoding_message_size(usize::MAX)
        .max_encoding_message_size(4 * 1024 * 1024);
    let mut client = FlightClient::new_from_inner(inn);
    client.add_header("authorization", "Basic YWRtaW46YWRtaW4=")?;

    // Build the ticket as JSON; json! escapes quotes inside the query string.
    let ticket_data = serde_json::json!({
        "query": query,
        "startTime": start_time,
        "endTime": end_time,
    });
    let ticket = serde_json::to_vec(&ticket_data)?;

    let response = client
        .do_get(Ticket {
            ticket: ticket.into(),
        })
        .await?;
    let batches: Vec<RecordBatch> = response.try_collect().await?;
    dbg!(&batches);

    // Print the result rows as pretty-printed JSON.
    let q = batches.iter().collect::<Vec<&RecordBatch>>();
    let s = record_batches_to_json(&q);
    let s = s.into_iter().map(Value::Object).collect::<Vec<_>>();
    let q = serde_json::to_string_pretty(&s)?;
    println!("{}", q);
    Ok(())
}
-
Hi, I am trying to implement a basic Arrow Flight server.
I am using a load testing tool to ingest data, and I query it through the Arrow Flight server.
When I run a query, I get this error:
Error, message length too large: found 5605265 bytes, the limit is: 4194304 bytes
Any help would be much appreciated.
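For reference, the limit in the error is exactly 4 MiB (4 * 1024 * 1024 bytes), which matches tonic's default maximum decoding message size. Below is a small sketch of the arithmetic using only the two numbers from the error. The helper names `overshoot` and `min_messages` are made up for illustration and are not part of tonic or arrow-flight.

```rust
/// The limit reported in the error: 4 MiB.
const LIMIT_BYTES: usize = 4 * 1024 * 1024;

/// How many bytes a message of `found` bytes exceeds `limit` by (0 if it fits).
fn overshoot(found: usize, limit: usize) -> usize {
    found.saturating_sub(limit)
}

/// Smallest number of messages needed so that each one fits within `limit`,
/// assuming the payload could be split evenly (an idealization).
fn min_messages(found: usize, limit: usize) -> usize {
    (found + limit - 1) / limit
}

fn main() {
    let found = 5_605_265; // size reported in the error
    println!("limit: {} bytes", LIMIT_BYTES);
    println!("over the limit by: {} bytes", overshoot(found, LIMIT_BYTES));
    println!("messages needed: {}", min_messages(found, LIMIT_BYTES));
}
```

So the response is roughly 1.35 MiB over the limit, and it would fit in two messages if it were split.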