diff --git a/crates/polars-io/src/json/mod.rs b/crates/polars-io/src/json/mod.rs index 208b1ba8befa..432efca38360 100644 --- a/crates/polars-io/src/json/mod.rs +++ b/crates/polars-io/src/json/mod.rs @@ -287,6 +287,8 @@ where } } + let allow_extra_fields_in_struct = self.schema.is_some(); + // struct type let dtype = if let Some(mut schema) = self.schema { if let Some(overwrite) = self.schema_overwrite { @@ -338,7 +340,11 @@ where dtype }; - let arr = polars_json::json::deserialize(&json_value, dtype)?; + let arr = polars_json::json::deserialize( + &json_value, + dtype, + allow_extra_fields_in_struct, + )?; let arr = arr.as_any().downcast_ref::().ok_or_else( || polars_err!(ComputeError: "can only deserialize json objects"), )?; diff --git a/crates/polars-json/src/json/deserialize.rs b/crates/polars-json/src/json/deserialize.rs index 600bd6dbea53..5e6977eff3e2 100644 --- a/crates/polars-json/src/json/deserialize.rs +++ b/crates/polars-json/src/json/deserialize.rs @@ -17,169 +17,245 @@ const JSON_NULL_VALUE: BorrowedValue = BorrowedValue::Static(StaticNode::Null); fn deserialize_boolean_into<'a, A: Borrow>>( target: &mut MutableBooleanArray, rows: &[A], -) { - let iter = rows.iter().map(|row| match row.borrow() { +) -> PolarsResult<()> { + let mut err_idx = rows.len(); + let iter = rows.iter().enumerate().map(|(i, row)| match row.borrow() { BorrowedValue::Static(StaticNode::Bool(v)) => Some(v), - _ => None, + BorrowedValue::Static(StaticNode::Null) => None, + _ => { + err_idx = if err_idx == rows.len() { i } else { err_idx }; + None + }, }); target.extend_trusted_len(iter); + check_err_idx(rows, err_idx, "boolean") } fn deserialize_primitive_into<'a, T: NativeType + NumCast, A: Borrow>>( target: &mut MutablePrimitiveArray, rows: &[A], -) { - let iter = rows.iter().map(|row| match row.borrow() { +) -> PolarsResult<()> { + let mut err_idx = rows.len(); + let iter = rows.iter().enumerate().map(|(i, row)| match row.borrow() { BorrowedValue::Static(StaticNode::I64(v)) => T::from(*v), BorrowedValue::Static(StaticNode::U64(v)) => T::from(*v), BorrowedValue::Static(StaticNode::F64(v)) => T::from(*v), BorrowedValue::Static(StaticNode::Bool(v)) => T::from(*v as u8), - _ => None, + BorrowedValue::Static(StaticNode::Null) => None, + _ => { + err_idx = if err_idx == rows.len() { i } else { err_idx }; + None + }, }); target.extend_trusted_len(iter); + check_err_idx(rows, err_idx, "numeric") } -fn deserialize_binary<'a, A: Borrow>>(rows: &[A]) -> BinaryArray { - let iter = rows.iter().map(|row| match row.borrow() { +fn deserialize_binary<'a, A: Borrow>>( + rows: &[A], +) -> PolarsResult> { + let mut err_idx = rows.len(); + let iter = rows.iter().enumerate().map(|(i, row)| match row.borrow() { BorrowedValue::String(v) => Some(v.as_bytes()), - _ => None, + BorrowedValue::Static(StaticNode::Null) => None, + _ => { + err_idx = if err_idx == rows.len() { i } else { err_idx }; + None + }, }); - BinaryArray::from_trusted_len_iter(iter) + let out = BinaryArray::from_trusted_len_iter(iter); + check_err_idx(rows, err_idx, "binary")?; + Ok(out) } fn deserialize_utf8_into<'a, O: Offset, A: Borrow>>( target: &mut MutableUtf8Array, rows: &[A], -) { +) -> PolarsResult<()> { + let mut err_idx = rows.len(); let mut scratch = String::new(); - for row in rows { + for (i, row) in rows.iter().enumerate() { match row.borrow() { BorrowedValue::String(v) => target.push(Some(v.as_ref())), BorrowedValue::Static(StaticNode::Bool(v)) => { target.push(Some(if *v { "true" } else { "false" })) }, - BorrowedValue::Static(node) if !matches!(node, StaticNode::Null) => { + BorrowedValue::Static(StaticNode::Null) => target.push_null(), + BorrowedValue::Static(node) => { write!(scratch, "{node}").unwrap(); target.push(Some(scratch.as_str())); scratch.clear(); }, - _ => target.push_null(), + _ => { + err_idx = if err_idx == rows.len() { i } else { err_idx }; + }, } } + check_err_idx(rows, err_idx, "string") } fn deserialize_utf8view_into<'a, A: Borrow>>( target: &mut MutableBinaryViewArray, rows: &[A], -) { +) -> PolarsResult<()> { + let mut err_idx = rows.len(); let mut scratch = String::new(); - for row in rows { + for (i, row) in rows.iter().enumerate() { match row.borrow() { BorrowedValue::String(v) => target.push_value(v.as_ref()), BorrowedValue::Static(StaticNode::Bool(v)) => { target.push_value(if *v { "true" } else { "false" }) }, - BorrowedValue::Static(node) if !matches!(node, StaticNode::Null) => { + BorrowedValue::Static(StaticNode::Null) => target.push_null(), + BorrowedValue::Static(node) => { write!(scratch, "{node}").unwrap(); target.push_value(scratch.as_str()); scratch.clear(); }, - _ => target.push_null(), + _ => { + err_idx = if err_idx == rows.len() { i } else { err_idx }; + }, } } + check_err_idx(rows, err_idx, "string") } fn deserialize_list<'a, A: Borrow>>( rows: &[A], dtype: ArrowDataType, -) -> ListArray { + allow_extra_fields_in_struct: bool, +) -> PolarsResult> { + let mut err_idx = rows.len(); let child = ListArray::::get_child_type(&dtype); let mut validity = MutableBitmap::with_capacity(rows.len()); let mut offsets = Offsets::::with_capacity(rows.len()); let mut inner = vec![]; - rows.iter().for_each(|row| match row.borrow() { - BorrowedValue::Array(value) => { - inner.extend(value.iter()); - validity.push(true); - offsets - .try_push(value.len()) - .expect("List offset is too large :/"); - }, - BorrowedValue::Static(StaticNode::Null) => { - validity.push(false); - offsets.extend_constant(1) - }, - value @ (BorrowedValue::Static(_) | BorrowedValue::String(_)) => { - inner.push(value); - validity.push(true); - offsets.try_push(1).expect("List offset is too large :/"); - }, - _ => { - validity.push(false); - offsets.extend_constant(1); - }, - }); + rows.iter() + .enumerate() + .for_each(|(i, row)| match row.borrow() { + BorrowedValue::Array(value) => { + inner.extend(value.iter()); + validity.push(true); + offsets + .try_push(value.len()) + .expect("List offset is too large :/"); + }, + BorrowedValue::Static(StaticNode::Null) => { + validity.push(false); + offsets.extend_constant(1) + }, + value @ (BorrowedValue::Static(_) | BorrowedValue::String(_)) => { + inner.push(value); + validity.push(true); + offsets.try_push(1).expect("List offset is too large :/"); + }, + _ => { + err_idx = if err_idx == rows.len() { i } else { err_idx }; + }, + }); + + check_err_idx(rows, err_idx, "list")?; - let values = _deserialize(&inner, child.clone()); + let values = _deserialize(&inner, child.clone(), allow_extra_fields_in_struct)?; - ListArray::::new(dtype, offsets.into(), values, validity.into()) + Ok(ListArray::::new( + dtype, + offsets.into(), + values, + validity.into(), + )) } fn deserialize_struct<'a, A: Borrow>>( rows: &[A], dtype: ArrowDataType, -) -> StructArray { + allow_extra_fields_in_struct: bool, +) -> PolarsResult { + let mut err_idx = rows.len(); let fields = StructArray::get_fields(&dtype); - let mut values = fields + let mut out_values = fields .iter() .map(|f| (f.name.as_str(), (f.dtype(), vec![]))) .collect::>(); let mut validity = MutableBitmap::with_capacity(rows.len()); + // Custom error tracker + let mut extra_field = None; - rows.iter().for_each(|row| { + rows.iter().enumerate().for_each(|(i, row)| { match row.borrow() { - BorrowedValue::Object(value) => { - values.iter_mut().for_each(|(s, (_, inner))| { - inner.push(value.get(*s).unwrap_or(&JSON_NULL_VALUE)) - }); + BorrowedValue::Object(values) => { + let mut n_matched = 0usize; + for (&key, &mut (_, ref mut inner)) in out_values.iter_mut() { + if let Some(v) = values.get(key) { + n_matched += 1; + inner.push(v) + } else { + inner.push(&JSON_NULL_VALUE) + } + } + validity.push(true); + + if n_matched < values.len() && extra_field.is_none() { + for k in values.keys() { + if !out_values.contains_key(k.as_ref()) { + extra_field = Some(k.as_ref()) + } + } + } }, - _ => { - values + BorrowedValue::Static(StaticNode::Null) => { + out_values .iter_mut() .for_each(|(_, (_, inner))| inner.push(&JSON_NULL_VALUE)); validity.push(false); }, + _ => { + err_idx = if err_idx == rows.len() { i } else { err_idx }; + }, }; }); + if let Some(v) = extra_field { + if !allow_extra_fields_in_struct { + polars_bail!(ComputeError: "extra key in struct data: {}", v) + } + } + + check_err_idx(rows, err_idx, "struct")?; + // ensure we collect in the proper order let values = fields .iter() .map(|fld| { - let (dtype, vals) = values.get(fld.name.as_str()).unwrap(); - _deserialize(vals, (*dtype).clone()) + let (dtype, vals) = out_values.get(fld.name.as_str()).unwrap(); + _deserialize(vals, (*dtype).clone(), allow_extra_fields_in_struct) }) - .collect::>(); + .collect::>>()?; - StructArray::new(dtype.clone(), rows.len(), values, validity.into()) + Ok(StructArray::new( + dtype.clone(), + rows.len(), + values, + validity.into(), + )) } fn fill_array_from( - f: fn(&mut MutablePrimitiveArray, &[B]), + f: fn(&mut MutablePrimitiveArray, &[B]) -> PolarsResult<()>, dtype: ArrowDataType, rows: &[B], -) -> Box +) -> PolarsResult> where T: NativeType, A: From> + Array, { let mut array = MutablePrimitiveArray::::with_capacity(rows.len()).to(dtype); - f(&mut array, rows); - Box::new(A::from(array)) + f(&mut array, rows)?; + Ok(Box::new(A::from(array))) } /// A trait describing an array with a backing store that can be preallocated to @@ -236,22 +312,34 @@ impl Container for MutableUtf8Array { } } -fn fill_generic_array_from(f: fn(&mut M, &[B]), rows: &[B]) -> Box +fn fill_generic_array_from( + f: fn(&mut M, &[B]) -> PolarsResult<()>, + rows: &[B], +) -> PolarsResult> where M: Container, A: From + Array, { let mut array = M::with_capacity(rows.len()); - f(&mut array, rows); - Box::new(A::from(array)) + f(&mut array, rows)?; + Ok(Box::new(A::from(array))) } pub(crate) fn _deserialize<'a, A: Borrow>>( rows: &[A], dtype: ArrowDataType, -) -> Box { + allow_extra_fields_in_struct: bool, +) -> PolarsResult> { match &dtype { - ArrowDataType::Null => Box::new(NullArray::new(dtype, rows.len())), + ArrowDataType::Null => { + if let Some(err_idx) = (0..rows.len()) + .find(|i| !matches!(rows[*i].borrow(), BorrowedValue::Static(StaticNode::Null))) + { + check_err_idx(rows, err_idx, "null")?; + } + + Ok(Box::new(NullArray::new(dtype, rows.len()))) + }, ArrowDataType::Boolean => { fill_generic_array_from::<_, _, BooleanArray>(deserialize_boolean_into, rows) }, @@ -277,7 +365,8 @@ pub(crate) fn _deserialize<'a, A: Borrow>>( fill_array_from::<_, _, PrimitiveArray>(deserialize_primitive_into, dtype, rows) }, ArrowDataType::Timestamp(tu, tz) => { - let iter = rows.iter().map(|row| match row.borrow() { + let mut err_idx = rows.len(); + let iter = rows.iter().enumerate().map(|(i, row)| match row.borrow() { BorrowedValue::Static(StaticNode::I64(v)) => Some(*v), BorrowedValue::String(v) => match (tu, tz) { (_, None) => temporal_conversions::utf8_to_naive_timestamp_scalar(v, "%+", tu), @@ -286,9 +375,15 @@ pub(crate) fn _deserialize<'a, A: Borrow>>( temporal_conversions::utf8_to_timestamp_scalar(v, "%+", &tz, tu) }, }, - _ => None, + BorrowedValue::Static(StaticNode::Null) => None, + _ => { + err_idx = if err_idx == rows.len() { i } else { err_idx }; + None + }, }); - Box::new(Int64Array::from_iter(iter).to(dtype)) + let out = Box::new(Int64Array::from_iter(iter).to(dtype)); + check_err_idx(rows, err_idx, "timestamp")?; + Ok(out) }, ArrowDataType::UInt8 => { fill_array_from::<_, _, PrimitiveArray>(deserialize_primitive_into, dtype, rows) @@ -315,19 +410,51 @@ pub(crate) fn _deserialize<'a, A: Borrow>>( ArrowDataType::Utf8View => { fill_generic_array_from::<_, _, Utf8ViewArray>(deserialize_utf8view_into, rows) }, - ArrowDataType::LargeList(_) => Box::new(deserialize_list(rows, dtype)), - ArrowDataType::LargeBinary => Box::new(deserialize_binary(rows)), - ArrowDataType::Struct(_) => Box::new(deserialize_struct(rows, dtype)), + ArrowDataType::LargeList(_) => Ok(Box::new(deserialize_list( + rows, + dtype, + allow_extra_fields_in_struct, + )?)), + ArrowDataType::LargeBinary => Ok(Box::new(deserialize_binary(rows)?)), + ArrowDataType::Struct(_) => Ok(Box::new(deserialize_struct( + rows, + dtype, + allow_extra_fields_in_struct, + )?)), _ => todo!(), } } -pub fn deserialize(json: &BorrowedValue, dtype: ArrowDataType) -> PolarsResult> { +pub fn deserialize( + json: &BorrowedValue, + dtype: ArrowDataType, + allow_extra_fields_in_struct: bool, +) -> PolarsResult> { match json { BorrowedValue::Array(rows) => match dtype { - ArrowDataType::LargeList(inner) => Ok(_deserialize(rows, inner.dtype)), + ArrowDataType::LargeList(inner) => { + _deserialize(rows, inner.dtype, allow_extra_fields_in_struct) + }, _ => todo!("read an Array from a non-Array data type"), }, - _ => Ok(_deserialize(&[json], dtype)), + _ => _deserialize(&[json], dtype, allow_extra_fields_in_struct), } } + +fn check_err_idx<'a>( + rows: &[impl Borrow>], + err_idx: usize, + type_name: &'static str, +) -> PolarsResult<()> { + if err_idx != rows.len() { + polars_bail!( + ComputeError: + r#"error deserializing value "{:?}" as {}. \ + Try increasing `infer_schema_length` or specifying a schema. + "#, + rows[err_idx].borrow(), type_name, + ) + } + + Ok(()) +} diff --git a/crates/polars-json/src/ndjson/deserialize.rs b/crates/polars-json/src/ndjson/deserialize.rs index 94a482b7b275..4441691cf034 100644 --- a/crates/polars-json/src/ndjson/deserialize.rs +++ b/crates/polars-json/src/ndjson/deserialize.rs @@ -18,19 +18,28 @@ pub fn deserialize_iter<'a>( dtype: ArrowDataType, buf_size: usize, count: usize, + allow_extra_fields_in_struct: bool, ) -> PolarsResult { let mut arr: Vec> = Vec::new(); let mut buf = Vec::with_capacity(std::cmp::min(buf_size + count + 2, u32::MAX as usize)); buf.push(b'['); - fn _deserializer(s: &mut [u8], dtype: ArrowDataType) -> PolarsResult> { + fn _deserializer( + s: &mut [u8], + dtype: ArrowDataType, + allow_extra_fields_in_struct: bool, + ) -> PolarsResult> { let out = simd_json::to_borrowed_value(s) .map_err(|e| PolarsError::ComputeError(format!("json parsing error: '{e}'").into()))?; - Ok(if let BorrowedValue::Array(rows) = out { - super::super::json::deserialize::_deserialize(&rows, dtype.clone()) + if let BorrowedValue::Array(rows) = out { + super::super::json::deserialize::_deserialize( + &rows, + dtype.clone(), + allow_extra_fields_in_struct, + ) } else { unreachable!() - }) + } } let mut row_iter = rows.peekable(); @@ -42,7 +51,11 @@ pub fn deserialize_iter<'a>( if buf.len() + next_row_length >= u32::MAX as usize { let _ = buf.pop(); buf.push(b']'); - arr.push(_deserializer(&mut buf, dtype.clone())?); + arr.push(_deserializer( + &mut buf, + dtype.clone(), + allow_extra_fields_in_struct, + )?); buf.clear(); buf.push(b'['); } @@ -53,9 +66,13 @@ pub fn deserialize_iter<'a>( buf.push(b']'); if arr.is_empty() { - _deserializer(&mut buf, dtype.clone()) + _deserializer(&mut buf, dtype.clone(), allow_extra_fields_in_struct) } else { - arr.push(_deserializer(&mut buf, dtype.clone())?); + arr.push(_deserializer( + &mut buf, + dtype.clone(), + allow_extra_fields_in_struct, + )?); concatenate_owned_unchecked(&arr) } } diff --git a/crates/polars-ops/src/chunked_array/strings/json_path.rs b/crates/polars-ops/src/chunked_array/strings/json_path.rs index 02b3c076efd7..7ef813d63027 100644 --- a/crates/polars-ops/src/chunked_array/strings/json_path.rs +++ b/crates/polars-ops/src/chunked_array/strings/json_path.rs @@ -98,6 +98,8 @@ pub trait Utf8JsonPathImpl: AsString { infer_schema_len: Option, ) -> PolarsResult { let ca = self.as_string(); + // Ignore extra fields instead of erroring if the dtype was explicitly given. + let allow_extra_fields_in_struct = dtype.is_some(); let dtype = match dtype { Some(dt) => dt, None => ca.json_infer(infer_schema_len)?, @@ -110,6 +112,7 @@ pub trait Utf8JsonPathImpl: AsString { dtype.to_arrow(CompatLevel::newest()), buf_size, ca.len(), + allow_extra_fields_in_struct, ) .map_err(|e| polars_err!(ComputeError: "error deserializing JSON: {}", e))?; Series::try_from((PlSmallStr::EMPTY, array)) diff --git a/py-polars/tests/unit/io/test_json.py b/py-polars/tests/unit/io/test_json.py index a4afd57e73e4..f664b9f760f3 100644 --- a/py-polars/tests/unit/io/test_json.py +++ b/py-polars/tests/unit/io/test_json.py @@ -68,9 +68,10 @@ def test_write_json_decimal() -> None: def test_json_infer_schema_length_11148() -> None: response = [{"col1": 1}] * 2 + [{"col1": 1, "col2": 2}] * 1 - result = pl.read_json(json.dumps(response).encode(), infer_schema_length=2) - with pytest.raises(AssertionError): - assert set(result.columns) == {"col1", "col2"} + with pytest.raises( + pl.exceptions.ComputeError, match="extra key in struct data: col2" + ): + pl.read_json(json.dumps(response).encode(), infer_schema_length=2) response = [{"col1": 1}] * 2 + [{"col1": 1, "col2": 2}] * 1 result = pl.read_json(json.dumps(response).encode(), infer_schema_length=3) @@ -433,7 +434,12 @@ def test_empty_list_json() -> None: def test_json_infer_3_dtypes() -> None: # would SO before df = pl.DataFrame({"a": ["{}", "1", "[1, 2]"]}) - out = df.select(pl.col("a").str.json_decode()) + + with pytest.raises(pl.exceptions.ComputeError): + df.select(pl.col("a").str.json_decode()) + + df = pl.DataFrame({"a": [None, "1", "[1, 2]"]}) + out = df.select(pl.col("a").str.json_decode(dtype=pl.List(pl.String))) assert out["a"].to_list() == [None, ["1"], ["1", "2"]] assert out.dtypes[0] == pl.List(pl.String) diff --git a/py-polars/tests/unit/operations/namespaces/string/test_string.py b/py-polars/tests/unit/operations/namespaces/string/test_string.py index 6b221c1706e8..ab44b4e9603a 100644 --- a/py-polars/tests/unit/operations/namespaces/string/test_string.py +++ b/py-polars/tests/unit/operations/namespaces/string/test_string.py @@ -1755,3 +1755,41 @@ def test_extract_many() -> None: assert df.select(pl.col("values").str.extract_many("patterns")).to_dict( as_series=False ) == {"values": [["disco"], ["rhap", "ody"]]} + + +def test_json_decode_raise_on_data_type_mismatch_13061() -> None: + assert_series_equal( + pl.Series(["null", "null"]).str.json_decode(infer_schema_length=1), + pl.Series([None, None]), + ) + + with pytest.raises(ComputeError): + pl.Series(["null", "1"]).str.json_decode(infer_schema_length=1) + + assert_series_equal( + pl.Series(["null", "1"]).str.json_decode(infer_schema_length=2), + pl.Series([None, 1]), + ) + + +def test_json_decode_struct_schema() -> None: + with pytest.raises(ComputeError, match="extra key in struct data: b"): + pl.Series([r'{"a": 1}', r'{"a": 2, "b": 2}']).str.json_decode( + infer_schema_length=1 + ) + + assert_series_equal( + pl.Series([r'{"a": 1}', r'{"a": 2, "b": 2}']).str.json_decode( + infer_schema_length=2 + ), + pl.Series([{"a": 1, "b": None}, {"a": 2, "b": 2}]), + ) + + # If the schema was explicitly given, then we ignore extra fields. + # TODO: There should be a `columns=` parameter to this. + assert_series_equal( + pl.Series([r'{"a": 1}', r'{"a": 2, "b": 2}']).str.json_decode( + dtype=pl.Struct({"a": pl.Int64}) + ), + pl.Series([{"a": 1}, {"a": 2}]), + )