feat: Add json lines support to query output (#25698)
mgattozzi authored Dec 20, 2024
1 parent 048e45f · commit c764d37
Showing 4 changed files with 126 additions and 8 deletions.
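This commit adds a line-delimited JSON ("JSON Lines") output format to the query path: the CLI and client gain a JsonLines variant, the HTTP layer advertises application/jsonl and streams each record batch as it is encoded, and an end-to-end test pins down the expected output. As a rough usage sketch (the localhost:8181 address and the /api/v3/query_influxql path are assumptions inferred from the test helper, not shown in this diff):

curl "http://localhost:8181/api/v3/query_influxql?db=foo&format=json_lines&q=SELECT+time,host,region,usage+FROM+cpu"

Each row comes back as a single JSON object on its own line, for example {"iox::measurement":"cpu","time":"1970-01-01T00:00:01","host":"a","region":"us-east","usage":0.9}.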
influxdb3/src/commands/query.rs (2 additions, 0 deletions)
@@ -68,6 +68,7 @@ pub struct Config {
enum Format {
Pretty,
Json,
JsonLines,
Csv,
Parquet,
}
@@ -83,6 +84,7 @@ impl From<Format> for influxdb3_client::Format {
match this {
Format::Pretty => Self::Pretty,
Format::Json => Self::Json,
Format::JsonLines => Self::JsonLines,
Format::Csv => Self::Csv,
Format::Parquet => Self::Parquet,
}
influxdb3/tests/server/query.rs (85 additions, 0 deletions)
@@ -704,6 +704,91 @@ async fn api_v1_query_sql_not_found() {
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
}

#[tokio::test]
async fn api_v3_query_jsonl_format() {
let server = TestServer::spawn().await;

server
.write_lp_to_db(
"foo",
"cpu,host=a,region=us-east usage=0.9 1
cpu,host=b,region=us-east usage=0.50 1
cpu,host=a,region=us-east usage=0.80 2
cpu,host=b,region=us-east usage=0.60 2
cpu,host=a,region=us-east usage=0.70 3
cpu,host=b,region=us-east usage=0.70 3
cpu,host=a,region=us-east usage=0.50 4
cpu,host=b,region=us-east usage=0.80 4",
Precision::Second,
)
.await
.unwrap();

struct TestCase<'a> {
database: Option<&'a str>,
query: &'a str,
expected: String,
}

let test_cases = [
TestCase {
database: Some("foo"),
query: "SELECT time, host, region, usage FROM cpu",
expected: "{\"iox::measurement\":\"cpu\",\"time\":\"1970-01-01T00:00:01\",\"host\":\"a\",\"region\":\"us-east\",\"usage\":0.9}\n\
{\"iox::measurement\":\"cpu\",\"time\":\"1970-01-01T00:00:01\",\"host\":\"b\",\"region\":\"us-east\",\"usage\":0.5}\n\
{\"iox::measurement\":\"cpu\",\"time\":\"1970-01-01T00:00:02\",\"host\":\"a\",\"region\":\"us-east\",\"usage\":0.8}\n\
{\"iox::measurement\":\"cpu\",\"time\":\"1970-01-01T00:00:02\",\"host\":\"b\",\"region\":\"us-east\",\"usage\":0.6}\n\
{\"iox::measurement\":\"cpu\",\"time\":\"1970-01-01T00:00:03\",\"host\":\"a\",\"region\":\"us-east\",\"usage\":0.7}\n\
{\"iox::measurement\":\"cpu\",\"time\":\"1970-01-01T00:00:03\",\"host\":\"b\",\"region\":\"us-east\",\"usage\":0.7}\n\
{\"iox::measurement\":\"cpu\",\"time\":\"1970-01-01T00:00:04\",\"host\":\"a\",\"region\":\"us-east\",\"usage\":0.5}\n\
{\"iox::measurement\":\"cpu\",\"time\":\"1970-01-01T00:00:04\",\"host\":\"b\",\"region\":\"us-east\",\"usage\":0.8}\n"
.into(),
},
TestCase {
database: Some("foo"),
query: "SHOW MEASUREMENTS",
expected: "{\"iox::measurement\":\"measurements\",\"name\":\"cpu\"}\n".into(),
},
TestCase {
database: Some("foo"),
query: "SHOW FIELD KEYS",
expected: "{\"iox::measurement\":\"cpu\",\"fieldKey\":\"usage\",\"fieldType\":\"float\"}\n".into()
},
TestCase {
database: Some("foo"),
query: "SHOW TAG KEYS",
expected:
"{\"iox::measurement\":\"cpu\",\"tagKey\":\"host\"}\n\
{\"iox::measurement\":\"cpu\",\"tagKey\":\"region\"}\n".into()
},
TestCase {
database: None,
query: "SHOW DATABASES",
expected: "{\"iox::database\":\"foo\"}\n".into(),
},
TestCase {
database: None,
query: "SHOW RETENTION POLICIES",
expected: "{\"iox::database\":\"foo\",\"name\":\"autogen\"}\n".into(),
},
];
for t in test_cases {
let mut params = vec![("q", t.query), ("format", "json_lines")];
if let Some(db) = t.database {
params.push(("db", db))
}
let resp = server
.api_v3_query_influxql(&params)
.await
.text()
.await
.unwrap();
println!("\n{q}", q = t.query);
println!("{resp}");
assert_eq!(t.expected, resp, "query failed: {q}", q = t.query);
}
}

#[tokio::test]
async fn api_v1_query_json_format() {
let server = TestServer::spawn().await;
influxdb3_client/src/lib.rs (2 additions, 0 deletions)
@@ -815,6 +815,8 @@ impl Display for QueryKind {
#[serde(rename_all = "snake_case")]
pub enum Format {
Json,
#[serde(rename = "jsonl")]
JsonLines,
Csv,
Parquet,
Pretty,
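
The explicit #[serde(rename = "jsonl")] matters because the enum otherwise uses rename_all = "snake_case", which would serialize the new variant as "json_lines"; the integration test above passes json_lines as the query-string value, while the client sends jsonl. A minimal standalone sketch of the resulting wire values (the enum is re-declared here purely for illustration; serde with the derive feature and serde_json are assumed):

use serde::Serialize;

// Illustrative copy of the client-side Format enum, not the real definition.
#[allow(dead_code)]
#[derive(Serialize)]
#[serde(rename_all = "snake_case")]
enum Format {
    Json,
    #[serde(rename = "jsonl")]
    JsonLines,
    Csv,
    Parquet,
    Pretty,
}

fn main() {
    // The rename overrides snake_case for this one variant: "jsonl", not "json_lines".
    assert_eq!(serde_json::to_string(&Format::JsonLines).unwrap(), "\"jsonl\"");
    // The other variants still follow rename_all.
    assert_eq!(serde_json::to_string(&Format::Csv).unwrap(), "\"csv\"");
}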
influxdb3_server/src/http.rs (37 additions, 8 deletions)
@@ -46,6 +46,7 @@ use std::pin::Pin;
use std::str::Utf8Error;
use std::string::FromUtf8Error;
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;
use thiserror::Error;
use unicode_segmentation::UnicodeSegmentation;
@@ -1262,6 +1263,7 @@ pub(crate) enum QueryFormat {
Csv,
Pretty,
Json,
JsonLines,
}

impl QueryFormat {
@@ -1271,6 +1273,7 @@ impl QueryFormat {
Self::Csv => "text/csv",
Self::Pretty => "text/plain; charset=utf-8",
Self::Json => "application/json",
Self::JsonLines => "application/jsonl",
}
}

@@ -1294,7 +1297,7 @@ impl QueryFormat {
}

async fn record_batch_stream_to_body(
-    stream: Pin<Box<dyn RecordBatchStream + Send>>,
mut stream: Pin<Box<dyn RecordBatchStream + Send>>,
format: QueryFormat,
) -> Result<Body, Error> {
fn to_json(batches: Vec<RecordBatch>) -> Result<Bytes> {
Expand Down Expand Up @@ -1336,15 +1339,41 @@ async fn record_batch_stream_to_body(
Ok(Bytes::from(bytes))
}

-    let batches = stream.try_collect::<Vec<RecordBatch>>().await?;
-
match format {
-        QueryFormat::Pretty => to_pretty(batches),
-        QueryFormat::Parquet => to_parquet(batches),
-        QueryFormat::Csv => to_csv(batches),
-        QueryFormat::Json => to_json(batches),
QueryFormat::Pretty => {
let batches = stream.try_collect::<Vec<RecordBatch>>().await?;
to_pretty(batches).map(Body::from)
}
QueryFormat::Parquet => {
let batches = stream.try_collect::<Vec<RecordBatch>>().await?;
to_parquet(batches).map(Body::from)
}
QueryFormat::Csv => {
let batches = stream.try_collect::<Vec<RecordBatch>>().await?;
to_csv(batches).map(Body::from)
}
QueryFormat::Json => {
let batches = stream.try_collect::<Vec<RecordBatch>>().await?;
to_json(batches).map(Body::from)
}
QueryFormat::JsonLines => {
let stream = futures::stream::poll_fn(move |ctx| match stream.poll_next_unpin(ctx) {
Poll::Ready(Some(batch)) => {
let mut writer = arrow_json::LineDelimitedWriter::new(Vec::new());
let batch = match batch {
Ok(batch) => batch,
Err(e) => return Poll::Ready(Some(Err(e))),
};
writer.write(&batch).unwrap();
writer.finish().unwrap();
Poll::Ready(Some(Ok(Bytes::from(writer.into_inner()))))
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
});
Ok(Body::wrap_stream(stream))
}
}
-    .map(Body::from)
}

// This is a hack around the fact that bool default is false not true
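
Unlike the other arms, which collect the entire RecordBatchStream into a Vec<RecordBatch> before encoding, the JsonLines arm wraps the stream in futures::stream::poll_fn and encodes each batch as it arrives with arrow_json::LineDelimitedWriter, so response bytes can start flowing before the full result set is buffered. A standalone sketch of that per-batch encoding step (it assumes the arrow_array, arrow_schema, and arrow_json crates as published by Apache Arrow; the server reaches the same types through its own dependency tree):

use std::sync::Arc;

use arrow_array::{ArrayRef, Float64Array, RecordBatch, StringArray};
use arrow_json::LineDelimitedWriter;
use arrow_schema::{DataType, Field, Schema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Build a tiny batch resembling one chunk of the cpu query results.
    let schema = Arc::new(Schema::new(vec![
        Field::new("host", DataType::Utf8, false),
        Field::new("usage", DataType::Float64, false),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
            Arc::new(Float64Array::from(vec![0.9, 0.5])) as ArrayRef,
        ],
    )?;

    // Mirror the body of the poll_fn closure: one writer per batch,
    // producing newline-delimited JSON for that batch only.
    let mut writer = LineDelimitedWriter::new(Vec::new());
    writer.write(&batch)?;
    writer.finish()?;
    let bytes = writer.into_inner();

    // Prints two lines: {"host":"a","usage":0.9} and {"host":"b","usage":0.5}
    print!("{}", String::from_utf8(bytes)?);
    Ok(())
}

Because a fresh writer is created for every batch, each chunk handed to Body::wrap_stream is itself valid JSON Lines, and concatenating chunks preserves the one-object-per-line invariant.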
