Skip to content

Commit

Permalink
feat: Add json lines support to query output
Browse files Browse the repository at this point in the history
This commit adds support for JSON Lines which also lets us stream the
body back to the consumer, rather than needing to buffer the entirety
of the query in memory before sending it back to the user.

Closes #24654
  • Loading branch information
mgattozzi committed Dec 20, 2024
1 parent 2a132f1 commit 353a151
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 8 deletions.
2 changes: 2 additions & 0 deletions influxdb3/src/commands/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub struct Config {
enum Format {
Pretty,
Json,
JsonLines,
Csv,
Parquet,
}
Expand All @@ -83,6 +84,7 @@ impl From<Format> for influxdb3_client::Format {
match this {
Format::Pretty => Self::Pretty,
Format::Json => Self::Json,
Format::JsonLines => Self::JsonLines,
Format::Csv => Self::Csv,
Format::Parquet => Self::Parquet,
}
Expand Down
86 changes: 86 additions & 0 deletions influxdb3/tests/server/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,92 @@ async fn api_v1_query_sql_not_found() {
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
}

#[tokio::test]
async fn api_v3_query_jsonl_format() {
let server = TestServer::spawn().await;

server
.write_lp_to_db(
"foo",
"cpu,host=a,region=us-east usage=0.9 1
cpu,host=b,region=us-east usage=0.50 1
cpu,host=a,region=us-east usage=0.80 2
cpu,host=b,region=us-east usage=0.60 2
cpu,host=a,region=us-east usage=0.70 3
cpu,host=b,region=us-east usage=0.70 3
cpu,host=a,region=us-east usage=0.50 4
cpu,host=b,region=us-east usage=0.80 4",
Precision::Second,
)
.await
.unwrap();

struct TestCase<'a> {
database: Option<&'a str>,
query: &'a str,
expected: String,
}

let test_cases = [
TestCase {
database: Some("foo"),
query: "SELECT time, host, region, usage FROM cpu",
expected:
"{\"iox::measurement\":\"cpu\",\"time\":\"1970-01-01T00:00:01\",\"host\":\"a\",\"region\":\"us-east\",\"usage\":0.9}\n\
{\"iox::measurement\":\"cpu\",\"time\":\"1970-01-01T00:00:01\",\"host\":\"b\",\"region\":\"us-east\",\"usage\":0.5}\n\
{\"iox::measurement\":\"cpu\",\"time\":\"1970-01-01T00:00:02\",\"host\":\"a\",\"region\":\"us-east\",\"usage\":0.8}\n\
{\"iox::measurement\":\"cpu\",\"time\":\"1970-01-01T00:00:02\",\"host\":\"b\",\"region\":\"us-east\",\"usage\":0.6}\n\
{\"iox::measurement\":\"cpu\",\"time\":\"1970-01-01T00:00:03\",\"host\":\"a\",\"region\":\"us-east\",\"usage\":0.7}\n\
{\"iox::measurement\":\"cpu\",\"time\":\"1970-01-01T00:00:03\",\"host\":\"b\",\"region\":\"us-east\",\"usage\":0.7}\n\
{\"iox::measurement\":\"cpu\",\"time\":\"1970-01-01T00:00:04\",\"host\":\"a\",\"region\":\"us-east\",\"usage\":0.5}\n\
{\"iox::measurement\":\"cpu\",\"time\":\"1970-01-01T00:00:04\",\"host\":\"b\",\"region\":\"us-east\",\"usage\":0.8}\n"
.into(),
},
TestCase {
database: Some("foo"),
query: "SHOW MEASUREMENTS",
expected: "{\"iox::measurement\":\"measurements\",\"name\":\"cpu\"}\n".into(),
},
TestCase {
database: Some("foo"),
query: "SHOW FIELD KEYS",
expected: "{\"iox::measurement\":\"cpu\",\"fieldKey\":\"usage\",\"fieldType\":\"float\"}\n".into()
},
TestCase {
database: Some("foo"),
query: "SHOW TAG KEYS",
expected:
"{\"iox::measurement\":\"cpu\",\"tagKey\":\"host\"}\n\
{\"iox::measurement\":\"cpu\",\"tagKey\":\"region\"}\n".into()
},
TestCase {
database: None,
query: "SHOW DATABASES",
expected: "{\"iox::database\":\"foo\"}\n".into(),
},
TestCase {
database: None,
query: "SHOW RETENTION POLICIES",
expected: "{\"iox::database\":\"foo\",\"name\":\"autogen\"}\n".into(),
},
];
for t in test_cases {
let mut params = vec![("q", t.query), ("format", "json_lines")];
if let Some(db) = t.database {
params.push(("db", db))
}
let resp = server
.api_v3_query_influxql(&params)
.await
.text()
.await
.unwrap();
println!("\n{q}", q = t.query);
println!("{resp}");
assert_eq!(t.expected, resp, "query failed: {q}", q = t.query);
}
}

#[tokio::test]
async fn api_v1_query_json_format() {
let server = TestServer::spawn().await;
Expand Down
2 changes: 2 additions & 0 deletions influxdb3_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,8 @@ impl Display for QueryKind {
#[serde(rename_all = "snake_case")]
pub enum Format {
Json,
#[serde(rename = "jsonl")]
JsonLines,
Csv,
Parquet,
Pretty,
Expand Down
45 changes: 37 additions & 8 deletions influxdb3_server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use std::pin::Pin;
use std::str::Utf8Error;
use std::string::FromUtf8Error;
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;
use thiserror::Error;
use unicode_segmentation::UnicodeSegmentation;
Expand Down Expand Up @@ -1259,6 +1260,7 @@ pub(crate) enum QueryFormat {
Csv,
Pretty,
Json,
JsonLines,
}

impl QueryFormat {
Expand All @@ -1268,6 +1270,7 @@ impl QueryFormat {
Self::Csv => "text/csv",
Self::Pretty => "text/plain; charset=utf-8",
Self::Json => "application/json",
Self::JsonLines => "application/jsonl",
}
}

Expand All @@ -1291,7 +1294,7 @@ impl QueryFormat {
}

async fn record_batch_stream_to_body(
stream: Pin<Box<dyn RecordBatchStream + Send>>,
mut stream: Pin<Box<dyn RecordBatchStream + Send>>,
format: QueryFormat,
) -> Result<Body, Error> {
fn to_json(batches: Vec<RecordBatch>) -> Result<Bytes> {
Expand Down Expand Up @@ -1333,15 +1336,41 @@ async fn record_batch_stream_to_body(
Ok(Bytes::from(bytes))
}

let batches = stream.try_collect::<Vec<RecordBatch>>().await?;

match format {
QueryFormat::Pretty => to_pretty(batches),
QueryFormat::Parquet => to_parquet(batches),
QueryFormat::Csv => to_csv(batches),
QueryFormat::Json => to_json(batches),
QueryFormat::Pretty => {
let batches = stream.try_collect::<Vec<RecordBatch>>().await?;
to_pretty(batches).map(Body::from)
}
QueryFormat::Parquet => {
let batches = stream.try_collect::<Vec<RecordBatch>>().await?;
to_parquet(batches).map(Body::from)
}
QueryFormat::Csv => {
let batches = stream.try_collect::<Vec<RecordBatch>>().await?;
to_csv(batches).map(Body::from)
}
QueryFormat::Json => {
let batches = stream.try_collect::<Vec<RecordBatch>>().await?;
to_json(batches).map(Body::from)
}
QueryFormat::JsonLines => {
let stream = futures::stream::poll_fn(move |ctx| match stream.poll_next_unpin(ctx) {
Poll::Ready(Some(batch)) => {
let mut writer = arrow_json::LineDelimitedWriter::new(Vec::new());
let batch = match batch {
Ok(batch) => batch,
Err(e) => return Poll::Ready(Some(Err(e))),
};
writer.write(&batch).unwrap();
writer.finish().unwrap();
Poll::Ready(Some(Ok(Bytes::from(writer.into_inner()))))
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
});
Ok(Body::wrap_stream(stream))
}
}
.map(Body::from)
}

// This is a hack around the fact that bool default is false not true
Expand Down

0 comments on commit 353a151

Please sign in to comment.