Skip to content

Commit

Permalink
chore(deps): Bump simd-json from 0.10.6 to 0.11.1 & fix error message (
Browse files Browse the repository at this point in the history
…#12550)

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: xxchan <[email protected]>
  • Loading branch information
dependabot[bot] and xxchan authored Oct 7, 2023
1 parent 4fe8294 commit b0fc8ca
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 100 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ serde = { version = "1", features = ["derive", "rc"] }
serde_derive = "1"
serde_json = "1"
serde_with = { version = "3", features = ["json"] }
simd-json = "0.10.6"
simd-json = "0.11.1"
strum = "0.25"
strum_macros = "0.25"
tempfile = "3"
Expand Down
106 changes: 35 additions & 71 deletions src/connector/src/parser/debezium/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,78 +460,42 @@ mod tests {
let mut parser = build_parser(columns.clone(), Default::default()).await;

let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 2);
// i64 overflow
let data0 = br#"{"payload":{"before":null,"after":{"O_KEY":9223372036854775808,"O_BOOL":1,"O_TINY":33,"O_INT":444,"O_REAL":555.0,"O_DOUBLE":666.0},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678158055464,"transaction":null}}"#;
if let Err(e) = parser
.parse_inner(None, Some(data0.to_vec()), builder.row_writer())
.await
{
println!("{:?}", e.to_string());
} else {
panic!("the test case is expected fail");
}
// bool incorrect value
let data1 = br#"{"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":2,"O_TINY":33,"O_INT":444,"O_REAL":555.0,"O_DOUBLE":666.0},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678158055464,"transaction":null}}"#;
if let Err(e) = parser
.parse_inner(None, Some(data1.to_vec()), builder.row_writer())
.await
{
println!("{:?}", e.to_string());
} else {
panic!("the test case is expected failed");
}
// i16 overflow
let data2 = br#"{"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":32768,"O_INT":444,"O_REAL":555.0,"O_DOUBLE":666.0},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678158055464,"transaction":null}}"#;
if let Err(e) = parser
.parse_inner(None, Some(data2.to_vec()), builder.row_writer())
.await
{
println!("{:?}", e.to_string());
} else {
panic!("the test case is expected to fail");
}
// i32 overflow
let data3 = br#"{"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":33,"O_INT":2147483648,"O_REAL":555.0,"O_DOUBLE":666.0},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678158055464,"transaction":null}}"#;
if let Err(e) = parser
.parse_inner(None, Some(data3.to_vec()), builder.row_writer())
.await
{
println!("{:?}", e.to_string());
} else {
panic!("the test case is expected to fail");
}
// float32 overflow
let data4 = br#"{"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":33,"O_INT":444,"O_REAL":3.80282347E38,"O_DOUBLE":666.0},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678158055464,"transaction":null}}"#;
if let Err(e) = parser
.parse_inner(None, Some(data4.to_vec()), builder.row_writer())
.await
{
println!("{:?}", e.to_string());
} else {
panic!("the test case is expected to fail");
}
// float64 will cause debezium simd_json_parser to panic, therefore included in the next
// test case below
}

#[tokio::test]
#[should_panic]
async fn test2_debezium_json_parser_overflow_f64() {
let columns = vec![SourceColumnDesc::simple(
"O_DOUBLE",
DataType::Float64,
ColumnId::from(0),
)];
let mut parser = build_parser(columns.clone(), Default::default()).await;
let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 2);
let data = br#"{"payload":{"before":null,"after":{"O_DOUBLE":1.797695E308},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678174483000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":563,"row":0,"thread":3,"query":null},"op":"c","ts_ms":1678174483866,"transaction":null}}"#;
if let Err(e) = parser
.parse_inner(None, Some(data.to_vec()), builder.row_writer())
.await
{
println!("{:?}", e.to_string());
} else {
panic!("the test case is expected to fail");
let normal_values = ["111", "1", "33", "444", "555.0", "666.0"];
let overflow_values = [
"9223372036854775808",
"2",
"32768",
"2147483648",
"3.80282347E38",
"1.797695E308",
];

for i in 0..6 {
let mut values = normal_values;
values[i] = overflow_values[i];
let data = format!(
r#"{{"payload":{{"before":null,"after":{{"O_KEY":{},"O_BOOL":{},"O_TINY":{},"O_INT":{},"O_REAL":{},"O_DOUBLE":{}}},"source":{{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null}},"op":"c","ts_ms":1678158055464,"transaction":null}}}}"#,
values[0], values[1], values[2], values[3], values[4], values[5]
).as_bytes().to_vec();
let e = parser
.parse_inner(None, Some(data), builder.row_writer())
.await
.unwrap_err();
println!("{}", e);
if i < 5 {
// For other overflow, the parsing succeeds but the type conversion fails
assert!(
e.to_string().contains("AccessError: TypeError"),
"i={i}, actual error: {e}"
);
} else {
// For f64 overflow, the parsing fails
assert!(
e.to_string().contains("InvalidNumber"),
"i={i}, actual error: {e}"
);
}
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,12 +356,14 @@ impl SourceStreamChunkRowWriter<'_> {
}

/// Transaction control message. Currently only used by Debezium messages.
#[derive(Debug)]
pub enum TransactionControl {
Begin { id: Box<str> },
Commit { id: Box<str> },
}

/// The result of parsing a message.
#[derive(Debug)]
pub enum ParseResult {
/// Some rows are parsed and written to the [`SourceStreamChunkRowWriter`].
Rows,
Expand Down
51 changes: 25 additions & 26 deletions src/connector/src/parser/unified/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use risingwave_common::types::{
DataType, Date, Decimal, Int256, Interval, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
};
use risingwave_common::util::iter_util::ZipEqFast;
use simd_json::{BorrowedValue, TryTypeError, ValueAccess, ValueType};
use simd_json::{BorrowedValue, ValueAccess, ValueType};

use super::{Access, AccessError, AccessResult};
use crate::parser::common::json_object_get_case_insensitive;
Expand Down Expand Up @@ -157,6 +157,7 @@ impl JsonParseOptions {
got: value.value_type().to_string(),
value: value.to_string(),
};

let v: ScalarImpl = match (type_expected, value.value_type()) {
(_, ValueType::Null) => return Ok(None),
// ---- Boolean -----
Expand Down Expand Up @@ -205,7 +206,7 @@ impl JsonParseOptions {
(
Some(DataType::Int16),
ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
) => value.try_as_i16()?.into(),
) => value.try_as_i16().map_err(|_| create_error())?.into(),

(Some(DataType::Int16), ValueType::String)
if matches!(
Expand All @@ -226,7 +227,7 @@ impl JsonParseOptions {
(
Some(DataType::Int32),
ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
) => value.try_as_i32()?.into(),
) => value.try_as_i32().map_err(|_| create_error())?.into(),

(Some(DataType::Int32), ValueType::String)
if matches!(
Expand All @@ -244,11 +245,13 @@ impl JsonParseOptions {
.into()
}
// ---- Int64 -----
(None, ValueType::I64 | ValueType::U64) => value.try_as_i64()?.into(),
(None, ValueType::I64 | ValueType::U64) => {
value.try_as_i64().map_err(|_| create_error())?.into()
}
(
Some(DataType::Int64),
ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
) => value.try_as_i64()?.into(),
) => value.try_as_i64().map_err(|_| create_error())?.into(),

(Some(DataType::Int64), ValueType::String)
if matches!(
Expand All @@ -270,7 +273,7 @@ impl JsonParseOptions {
Some(DataType::Float32),
ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
) if matches!(self.numeric_handling, NumericHandling::Relax { .. }) => {
(value.try_as_i64()? as f32).into()
(value.try_as_i64().map_err(|_| create_error())? as f32).into()
}
(Some(DataType::Float32), ValueType::String)
if matches!(
Expand All @@ -287,13 +290,15 @@ impl JsonParseOptions {
.map_err(|_| create_error())?
.into()
}
(Some(DataType::Float32), ValueType::F64) => value.try_as_f32()?.into(),
(Some(DataType::Float32), ValueType::F64) => {
value.try_as_f32().map_err(|_| create_error())?.into()
}
// ---- Float64 -----
(
Some(DataType::Float64),
ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
) if matches!(self.numeric_handling, NumericHandling::Relax { .. }) => {
(value.try_as_i64()? as f64).into()
(value.try_as_i64().map_err(|_| create_error())? as f64).into()
}
(Some(DataType::Float64), ValueType::String)
if matches!(
Expand All @@ -310,20 +315,24 @@ impl JsonParseOptions {
.map_err(|_| create_error())?
.into()
}
(Some(DataType::Float64) | None, ValueType::F64) => value.try_as_f64()?.into(),
(Some(DataType::Float64) | None, ValueType::F64) => {
value.try_as_f64().map_err(|_| create_error())?.into()
}
// ---- Decimal -----
(Some(DataType::Decimal) | None, ValueType::I128 | ValueType::U128) => {
Decimal::from_str(&value.try_as_i128()?.to_string())
Decimal::from_str(&value.try_as_i128().map_err(|_| create_error())?.to_string())
.map_err(|_| create_error())?
.into()
}
(Some(DataType::Decimal), ValueType::I64 | ValueType::U64) => {
Decimal::from(value.try_as_i64()?).into()
Decimal::from(value.try_as_i64().map_err(|_| create_error())?).into()
}

(Some(DataType::Decimal), ValueType::F64) => Decimal::try_from(value.try_as_f64()?)
.map_err(|_| create_error())?
.into(),
(Some(DataType::Decimal), ValueType::F64) => {
Decimal::try_from(value.try_as_f64().map_err(|_| create_error())?)
.map_err(|_| create_error())?
.into()
}

(Some(DataType::Decimal), ValueType::String) => ScalarImpl::Decimal(
Decimal::from_str(value.as_str().unwrap()).map_err(|_err| create_error())?,
Expand Down Expand Up @@ -353,7 +362,7 @@ impl JsonParseOptions {
(
Some(DataType::Date),
ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
) => Date::with_days_since_unix_epoch(value.try_as_i32()?)
) => Date::with_days_since_unix_epoch(value.try_as_i32().map_err(|_| create_error())?)
.map_err(|_| create_error())?
.into(),
(Some(DataType::Date), ValueType::String) => value
Expand Down Expand Up @@ -526,7 +535,7 @@ impl JsonParseOptions {
(
Some(DataType::Int256),
ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
) => Int256::from(value.try_as_i64()?).into(),
) => Int256::from(value.try_as_i64().map_err(|_| create_error())?).into(),

(Some(DataType::Int256), ValueType::String) => {
Int256::from_str(value.as_str().unwrap())
Expand Down Expand Up @@ -579,13 +588,3 @@ where
self.options.parse(value, type_expected)
}
}

impl From<TryTypeError> for AccessError {
fn from(value: TryTypeError) -> Self {
AccessError::TypeError {
expected: value.expected.to_string(),
got: value.expected.to_string(),
value: Default::default(),
}
}
}

0 comments on commit b0fc8ca

Please sign in to comment.