Skip to content

Commit 552828e

Browse files
committed
Raw JSON writer (#5314)
1 parent a0148ba commit 552828e

File tree

3 files changed

+478
-43
lines changed

3 files changed

+478
-43
lines changed

arrow-json/src/writer.rs

+58-41
Original file line numberDiff line numberDiff line change
@@ -20,28 +20,6 @@
2020
//! This JSON writer converts Arrow [`RecordBatch`]es into arrays of
2121
//! JSON objects or JSON formatted byte streams.
2222
//!
23-
//! ## Writing JSON Objects
24-
//!
25-
//! To serialize [`RecordBatch`]es into array of
26-
//! [JSON](https://docs.serde.rs/serde_json/) objects, use
27-
//! [`record_batches_to_json_rows`]:
28-
//!
29-
//! ```
30-
//! # use std::sync::Arc;
31-
//! # use arrow_array::{Int32Array, RecordBatch};
32-
//! # use arrow_schema::{DataType, Field, Schema};
33-
//!
34-
//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
35-
//! let a = Int32Array::from(vec![1, 2, 3]);
36-
//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
37-
//!
38-
//! let json_rows = arrow_json::writer::record_batches_to_json_rows(&[&batch]).unwrap();
39-
//! assert_eq!(
40-
//! serde_json::Value::Object(json_rows[1].clone()),
41-
//! serde_json::json!({"a": 2}),
42-
//! );
43-
//! ```
44-
//!
4523
//! ## Writing JSON formatted byte streams
4624
//!
4725
//! To serialize [`RecordBatch`]es into line-delimited JSON bytes, use
@@ -97,6 +75,8 @@
9775
//! In order to explicitly write null values for keys, configure a custom [`Writer`] by
9876
//! using a [`WriterBuilder`] to construct a [`Writer`].
9977
78+
mod encoder;
79+
10080
use std::iter;
10181
use std::{fmt::Debug, io::Write};
10282

@@ -109,7 +89,9 @@ use arrow_array::types::*;
10989
use arrow_array::*;
11090
use arrow_schema::*;
11191

92+
use crate::writer::encoder::EncoderOptions;
11293
use arrow_cast::display::{ArrayFormatter, FormatOptions};
94+
use encoder::make_encoder;
11395

11496
fn primitive_array_to_json<T>(array: &dyn Array) -> Result<Vec<Value>, ArrowError>
11597
where
@@ -481,6 +463,7 @@ fn set_column_for_json_rows(
481463

482464
/// Converts an arrow [`RecordBatch`] into a `Vec` of Serde JSON
483465
/// [`JsonMap`]s (objects)
466+
#[deprecated(note = "Use Writer")]
484467
pub fn record_batches_to_json_rows(
485468
batches: &[&RecordBatch],
486469
) -> Result<Vec<JsonMap<String, Value>>, ArrowError> {
@@ -597,11 +580,7 @@ pub type ArrayWriter<W> = Writer<W, JsonArray>;
597580

598581
/// JSON writer builder.
599582
#[derive(Debug, Clone, Default)]
600-
pub struct WriterBuilder {
601-
/// Controls whether null values should be written explicitly for keys
602-
/// in objects, or whether the key should be omitted entirely.
603-
explicit_nulls: bool,
604-
}
583+
pub struct WriterBuilder(EncoderOptions);
605584

606585
impl WriterBuilder {
607586
/// Create a new builder for configuring JSON writing options.
@@ -629,7 +608,7 @@ impl WriterBuilder {
629608

630609
/// Returns `true` if this writer is configured to keep keys with null values.
631610
pub fn explicit_nulls(&self) -> bool {
632-
self.explicit_nulls
611+
self.0.explicit_nulls
633612
}
634613

635614
/// Set whether to keep keys with null values, or to omit writing them.
@@ -654,7 +633,7 @@ impl WriterBuilder {
654633
///
655634
/// Default is to skip nulls (set to `false`).
656635
pub fn with_explicit_nulls(mut self, explicit_nulls: bool) -> Self {
657-
self.explicit_nulls = explicit_nulls;
636+
self.0.explicit_nulls = explicit_nulls;
658637
self
659638
}
660639

@@ -669,7 +648,7 @@ impl WriterBuilder {
669648
started: false,
670649
finished: false,
671650
format: F::default(),
672-
explicit_nulls: self.explicit_nulls,
651+
options: self.0,
673652
}
674653
}
675654
}
@@ -703,7 +682,7 @@ where
703682
format: F,
704683

705684
/// Whether keys with null values should be written or skipped
706-
explicit_nulls: bool,
685+
options: EncoderOptions,
707686
}
708687

709688
impl<W, F> Writer<W, F>
@@ -718,11 +697,12 @@ where
718697
started: false,
719698
finished: false,
720699
format: F::default(),
721-
explicit_nulls: false,
700+
options: EncoderOptions::default(),
722701
}
723702
}
724703

725704
/// Write a single JSON row to the output writer
705+
#[deprecated(note = "Use Writer::write")]
726706
pub fn write_row(&mut self, row: &Value) -> Result<(), ArrowError> {
727707
let is_first_row = !self.started;
728708
if !self.started {
@@ -738,18 +718,48 @@ where
738718
Ok(())
739719
}
740720

741-
/// Convert the `RecordBatch` into JSON rows, and write them to the output
721+
/// Serialize `batch` to JSON output
742722
pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
743-
for row in record_batches_to_json_rows_internal(&[batch], self.explicit_nulls)? {
744-
self.write_row(&Value::Object(row))?;
723+
if batch.num_rows() == 0 {
724+
return Ok(());
745725
}
726+
727+
// BufWriter uses a buffer size of 8KB
728+
// We therefore double this and flush once we have more than 8KB
729+
let mut buffer = Vec::with_capacity(16 * 1024);
730+
731+
let mut is_first_row = !self.started;
732+
if !self.started {
733+
self.format.start_stream(&mut buffer)?;
734+
self.started = true;
735+
}
736+
737+
let array = StructArray::from(batch.clone());
738+
let mut encoder = make_encoder(&array, &self.options)?;
739+
740+
for idx in 0..batch.num_rows() {
741+
self.format.start_row(&mut buffer, is_first_row)?;
742+
is_first_row = false;
743+
744+
encoder.encode(idx, &mut buffer);
745+
if buffer.len() > 8 * 1024 {
746+
self.writer.write_all(&buffer)?;
747+
buffer.clear();
748+
}
749+
self.format.end_row(&mut buffer)?;
750+
}
751+
752+
if !buffer.is_empty() {
753+
self.writer.write_all(&buffer)?;
754+
}
755+
746756
Ok(())
747757
}
748758

749-
/// Convert the [`RecordBatch`] into JSON rows, and write them to the output
759+
/// Serialize `batches` to JSON output
750760
pub fn write_batches(&mut self, batches: &[&RecordBatch]) -> Result<(), ArrowError> {
751-
for row in record_batches_to_json_rows_internal(batches, self.explicit_nulls)? {
752-
self.write_row(&Value::Object(row))?;
761+
for b in batches {
762+
self.write(b)?;
753763
}
754764
Ok(())
755765
}
@@ -803,6 +813,9 @@ mod tests {
803813

804814
/// Asserts that the NDJSON `input` is semantically identical to `expected`
805815
fn assert_json_eq(input: &[u8], expected: &str) {
816+
let s = std::str::from_utf8(input).unwrap();
817+
println!("{s}");
818+
806819
let expected: Vec<Option<Value>> = expected
807820
.split('\n')
808821
.map(|s| (!s.is_empty()).then(|| serde_json::from_str(s).unwrap()))
@@ -1453,6 +1466,7 @@ mod tests {
14531466
}
14541467

14551468
#[test]
1469+
#[allow(deprecated)]
14561470
fn json_writer_one_row() {
14571471
let mut writer = ArrayWriter::new(vec![] as Vec<u8>);
14581472
let v = json!({ "an": "object" });
@@ -1465,6 +1479,7 @@ mod tests {
14651479
}
14661480

14671481
#[test]
1482+
#[allow(deprecated)]
14681483
fn json_writer_two_rows() {
14691484
let mut writer = ArrayWriter::new(vec![] as Vec<u8>);
14701485
let v = json!({ "an": "object" });
@@ -1564,9 +1579,9 @@ mod tests {
15641579
r#"{"a":{"list":[1,2]},"b":{"list":[1,2]}}
15651580
{"a":{"list":[null]},"b":{"list":[null]}}
15661581
{"a":{"list":[]},"b":{"list":[]}}
1567-
{"a":null,"b":{"list":[3,null]}}
1582+
{"b":{"list":[3,null]}}
15681583
{"a":{"list":[4,5]},"b":{"list":[4,5]}}
1569-
{"a":null,"b":{}}
1584+
{"b":{}}
15701585
{"a":{},"b":{}}
15711586
"#,
15721587
);
@@ -1621,7 +1636,7 @@ mod tests {
16211636
assert_json_eq(
16221637
&buf,
16231638
r#"{"map":{"foo":10}}
1624-
{"map":null}
1639+
{}
16251640
{"map":{}}
16261641
{"map":{"bar":20,"baz":30,"qux":40}}
16271642
{"map":{"quux":50}}
@@ -1918,6 +1933,8 @@ mod tests {
19181933
writer.finish()?;
19191934
}
19201935

1936+
println!("{}", std::str::from_utf8(&buf).unwrap());
1937+
19211938
let actual = serde_json::from_slice::<Vec<Value>>(&buf).unwrap();
19221939
let expected = serde_json::from_value::<Vec<Value>>(json!([
19231940
{

0 commit comments

Comments
 (0)