Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add json lines support to query output #25698

Merged
merged 1 commit into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions influxdb3/src/commands/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub struct Config {
// Output formats accepted by this command module. Each variant is converted to the
// same-named `influxdb3_client::Format` variant by the `From<Format>` impl that follows.
enum Format {
Pretty,
Json,
// JSON Lines output; converted to `influxdb3_client::Format::JsonLines`.
JsonLines,
Csv,
Parquet,
}
Expand All @@ -83,6 +84,7 @@ impl From<Format> for influxdb3_client::Format {
match this {
Format::Pretty => Self::Pretty,
Format::Json => Self::Json,
Format::JsonLines => Self::JsonLines,
Format::Csv => Self::Csv,
Format::Parquet => Self::Parquet,
}
Expand Down
85 changes: 85 additions & 0 deletions influxdb3/tests/server/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,91 @@ async fn api_v1_query_sql_not_found() {
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
}

// Integration test for the JSON Lines query output format.
//
// Spawns a test server, writes a small `cpu` data set into database "foo", then sends a
// series of queries through `api_v3_query_influxql` with `format=json_lines` and asserts
// that each response body is exactly the expected text: one JSON object per row, each
// terminated by a newline.
#[tokio::test]
async fn api_v3_query_jsonl_format() {
let server = TestServer::spawn().await;

// Seed data: hosts `a` and `b`, timestamps 1 through 4, written with second precision.
server
.write_lp_to_db(
"foo",
"cpu,host=a,region=us-east usage=0.9 1
cpu,host=b,region=us-east usage=0.50 1
cpu,host=a,region=us-east usage=0.80 2
cpu,host=b,region=us-east usage=0.60 2
cpu,host=a,region=us-east usage=0.70 3
cpu,host=b,region=us-east usage=0.70 3
cpu,host=a,region=us-east usage=0.50 4
cpu,host=b,region=us-east usage=0.80 4",
Precision::Second,
)
.await
.unwrap();

// One query together with the exact response body it must produce.
struct TestCase<'a> {
// Database to send as the `db` parameter; `None` means the parameter is omitted.
database: Option<&'a str>,
// Query text sent as the `q` parameter.
query: &'a str,
// Full expected response body, including the trailing newline of every row.
expected: String,
}

// NOTE(review): the first `expected` literal below has PR-page review-comment text
// ("Copy link", "Member", ...) interleaved between its first and second rows. That looks
// like residue from the page this listing was captured from rather than part of the
// test data -- confirm against the repository before relying on this literal.
let test_cases = [
TestCase {
database: Some("foo"),
query: "SELECT time, host, region, usage FROM cpu",
expected: "{\"iox::measurement\":\"cpu\",\"time\":\"1970-01-01T00:00:01\",\"host\":\"a\",\"region\":\"us-east\",\"usage\":0.9}\n\
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unrelated to this PR, but we should look into changing the name of iox::measurement to influxdb3::table

{\"iox::measurement\":\"cpu\",\"time\":\"1970-01-01T00:00:01\",\"host\":\"b\",\"region\":\"us-east\",\"usage\":0.5}\n\
{\"iox::measurement\":\"cpu\",\"time\":\"1970-01-01T00:00:02\",\"host\":\"a\",\"region\":\"us-east\",\"usage\":0.8}\n\
{\"iox::measurement\":\"cpu\",\"time\":\"1970-01-01T00:00:02\",\"host\":\"b\",\"region\":\"us-east\",\"usage\":0.6}\n\
{\"iox::measurement\":\"cpu\",\"time\":\"1970-01-01T00:00:03\",\"host\":\"a\",\"region\":\"us-east\",\"usage\":0.7}\n\
{\"iox::measurement\":\"cpu\",\"time\":\"1970-01-01T00:00:03\",\"host\":\"b\",\"region\":\"us-east\",\"usage\":0.7}\n\
{\"iox::measurement\":\"cpu\",\"time\":\"1970-01-01T00:00:04\",\"host\":\"a\",\"region\":\"us-east\",\"usage\":0.5}\n\
{\"iox::measurement\":\"cpu\",\"time\":\"1970-01-01T00:00:04\",\"host\":\"b\",\"region\":\"us-east\",\"usage\":0.8}\n"
.into(),
},
TestCase {
database: Some("foo"),
query: "SHOW MEASUREMENTS",
expected: "{\"iox::measurement\":\"measurements\",\"name\":\"cpu\"}\n".into(),
},
TestCase {
database: Some("foo"),
query: "SHOW FIELD KEYS",
expected: "{\"iox::measurement\":\"cpu\",\"fieldKey\":\"usage\",\"fieldType\":\"float\"}\n".into()
},
TestCase {
database: Some("foo"),
query: "SHOW TAG KEYS",
expected:
"{\"iox::measurement\":\"cpu\",\"tagKey\":\"host\"}\n\
{\"iox::measurement\":\"cpu\",\"tagKey\":\"region\"}\n".into()
},
// The remaining cases are sent without a `db` parameter.
TestCase {
database: None,
query: "SHOW DATABASES",
expected: "{\"iox::database\":\"foo\"}\n".into(),
},
TestCase {
database: None,
query: "SHOW RETENTION POLICIES",
expected: "{\"iox::database\":\"foo\",\"name\":\"autogen\"}\n".into(),
},
];
for t in test_cases {
// Every request asks for the `json_lines` format; `db` is added only when the case names one.
let mut params = vec![("q", t.query), ("format", "json_lines")];
if let Some(db) = t.database {
params.push(("db", db))
}
let resp = server
.api_v3_query_influxql(&params)
.await
.text()
.await
.unwrap();
// Print the query and raw response so a failing case can be diagnosed from the test output.
println!("\n{q}", q = t.query);
println!("{resp}");
// The comparison is byte-exact, so row order and trailing newlines both matter.
assert_eq!(t.expected, resp, "query failed: {q}", q = t.query);
}
}

#[tokio::test]
async fn api_v1_query_json_format() {
let server = TestServer::spawn().await;
Expand Down
2 changes: 2 additions & 0 deletions influxdb3_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,8 @@ impl Display for QueryKind {
#[serde(rename_all = "snake_case")]
pub enum Format {
Json,
#[serde(rename = "jsonl")]
JsonLines,
Csv,
Parquet,
Pretty,
Expand Down
45 changes: 37 additions & 8 deletions influxdb3_server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use std::pin::Pin;
use std::str::Utf8Error;
use std::string::FromUtf8Error;
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;
use thiserror::Error;
use unicode_segmentation::UnicodeSegmentation;
Expand Down Expand Up @@ -1259,6 +1260,7 @@ pub(crate) enum QueryFormat {
Csv,
Pretty,
Json,
JsonLines,
}

impl QueryFormat {
Expand All @@ -1268,6 +1270,7 @@ impl QueryFormat {
Self::Csv => "text/csv",
Self::Pretty => "text/plain; charset=utf-8",
Self::Json => "application/json",
Self::JsonLines => "application/jsonl",
}
}

Expand All @@ -1291,7 +1294,7 @@ impl QueryFormat {
}

async fn record_batch_stream_to_body(
stream: Pin<Box<dyn RecordBatchStream + Send>>,
mut stream: Pin<Box<dyn RecordBatchStream + Send>>,
format: QueryFormat,
) -> Result<Body, Error> {
fn to_json(batches: Vec<RecordBatch>) -> Result<Bytes> {
Expand Down Expand Up @@ -1333,15 +1336,41 @@ async fn record_batch_stream_to_body(
Ok(Bytes::from(bytes))
}

let batches = stream.try_collect::<Vec<RecordBatch>>().await?;

match format {
QueryFormat::Pretty => to_pretty(batches),
QueryFormat::Parquet => to_parquet(batches),
QueryFormat::Csv => to_csv(batches),
QueryFormat::Json => to_json(batches),
QueryFormat::Pretty => {
let batches = stream.try_collect::<Vec<RecordBatch>>().await?;
to_pretty(batches).map(Body::from)
}
QueryFormat::Parquet => {
let batches = stream.try_collect::<Vec<RecordBatch>>().await?;
to_parquet(batches).map(Body::from)
}
QueryFormat::Csv => {
let batches = stream.try_collect::<Vec<RecordBatch>>().await?;
to_csv(batches).map(Body::from)
}
QueryFormat::Json => {
let batches = stream.try_collect::<Vec<RecordBatch>>().await?;
to_json(batches).map(Body::from)
}
QueryFormat::JsonLines => {
let stream = futures::stream::poll_fn(move |ctx| match stream.poll_next_unpin(ctx) {
Poll::Ready(Some(batch)) => {
let mut writer = arrow_json::LineDelimitedWriter::new(Vec::new());
let batch = match batch {
Ok(batch) => batch,
Err(e) => return Poll::Ready(Some(Err(e))),
};
writer.write(&batch).unwrap();
writer.finish().unwrap();
Poll::Ready(Some(Ok(Bytes::from(writer.into_inner()))))
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
});
Ok(Body::wrap_stream(stream))
}
}
.map(Body::from)
}

// This is a hack around the fact that bool default is false not true
Expand Down
Loading