Skip to content

Commit

Permalink
Introduce DeserializedMessage for carrying schema information into th…
Browse files Browse the repository at this point in the history
…e writers

The DeserializedMessage carries optional inferred schema information
along with the message itself. This is useful for understanding whether
schema evolution should happen "later" in the message processing
pipeline.

The downside of this behavior is that there will be a performance impact
as arrow_json does schema inference.

Sponsored-by: Raft LLC
  • Loading branch information
rtyler committed Jan 9, 2024
1 parent 93e3d73 commit f49b4f7
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 62 deletions.
13 changes: 9 additions & 4 deletions src/coercions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use serde_json::Value;
use std::collections::HashMap;
use std::str::FromStr;

use crate::serialization::DeserializedMessage;

#[derive(Debug, Clone, PartialEq)]
#[allow(unused)]
enum CoercionNode {
Expand Down Expand Up @@ -72,7 +74,7 @@ fn build_coercion_node(data_type: &DataType) -> Option<CoercionNode> {

/// Applies all data coercions specified by the [`CoercionTree`] to the [`Value`].
/// Though it does not currently, this function should approximate or improve on the coercions applied by [Spark's `from_json`](https://spark.apache.org/docs/latest/api/sql/index.html#from_json)
pub(crate) fn coerce(value: &mut Value, coercion_tree: &CoercionTree) {
pub(crate) fn coerce(value: &mut DeserializedMessage, coercion_tree: &CoercionTree) {
if let Some(context) = value.as_object_mut() {
for (field_name, coercion) in coercion_tree.root.iter() {
if let Some(value) = context.get_mut(field_name) {
Expand Down Expand Up @@ -322,7 +324,7 @@ mod tests {

let coercion_tree = create_coercion_tree(&delta_schema);

let mut messages = vec![
let mut messages: Vec<DeserializedMessage> = vec![
json!({
"level1_string": "a",
"level1_integer": 0,
Expand Down Expand Up @@ -380,7 +382,10 @@ mod tests {
// This is valid epoch micros, but typed as a string on the way in. We WON'T coerce it.
"level1_timestamp": "1636668718000000",
}),
];
]
.into_iter()
.map(|f| f.into())
.collect();

for message in messages.iter_mut() {
coerce(message, &coercion_tree);
Expand Down Expand Up @@ -447,7 +452,7 @@ mod tests {
];

for i in 0..messages.len() {
assert_eq!(messages[i], expected[i]);
assert_eq!(messages[i].clone().message(), expected[i]);
}
}
}
15 changes: 10 additions & 5 deletions src/dead_letters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;

use crate::serialization::DeserializedMessage;
use crate::{transforms::TransformError, writer::*};

#[cfg(feature = "s3")]
Expand Down Expand Up @@ -55,11 +56,11 @@ impl DeadLetter {

/// Creates a dead letter from a failed transform.
/// `base64_bytes` will always be `None`.
pub(crate) fn from_failed_transform(value: &Value, err: TransformError) -> Self {
pub(crate) fn from_failed_transform(value: &DeserializedMessage, err: TransformError) -> Self {
let timestamp = Utc::now();
Self {
base64_bytes: None,
json_string: Some(value.to_string()),
json_string: Some(value.clone().message().to_string()),
error: Some(err.to_string()),
timestamp: timestamp
.timestamp_nanos_opt()
Expand Down Expand Up @@ -286,9 +287,10 @@ impl DeadLetterQueue for DeltaSinkDeadLetterQueue {
.map(|dl| {
serde_json::to_value(dl)
.map_err(|e| DeadLetterQueueError::SerdeJson { source: e })
.and_then(|mut v| {
.and_then(|v| {
self.transformer
.transform(&mut v, None as Option<&BorrowedMessage>)?;
// TODO: this can't be right, shouldn't this function take DeserializedMessage?
.transform(&mut v.clone().into(), None as Option<&BorrowedMessage>)?;
Ok(v)
})
})
Expand All @@ -297,7 +299,10 @@ impl DeadLetterQueue for DeltaSinkDeadLetterQueue {

let version = self
.delta_writer
.insert_all(&mut self.table, values)
.insert_all(
&mut self.table,
values.into_iter().map(|v| v.into()).collect(),
)
.await?;

if self.write_checkpoints {
Expand Down
11 changes: 7 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ mod dead_letters;
mod delta_helpers;
mod metrics;
mod offsets;
mod serialization;
#[allow(missing_docs)]
pub mod serialization;
mod transforms;
mod value_buffers;
/// Doc
Expand All @@ -56,6 +57,7 @@ use crate::value_buffers::{ConsumedBuffers, ValueBuffers};
use crate::{
dead_letters::*,
metrics::*,
serialization::*,
transforms::*,
writer::{DataWriter, DataWriterError},
};
Expand Down Expand Up @@ -207,8 +209,9 @@ pub enum IngestError {
}

/// Formats for message parsing
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Default)]
pub enum MessageFormat {
#[default]
/// Parses messages as json and uses the inferred schema
DefaultJson,

Expand Down Expand Up @@ -733,7 +736,7 @@ struct IngestProcessor {
coercion_tree: CoercionTree,
table: DeltaTable,
delta_writer: DataWriter,
value_buffers: ValueBuffers,
value_buffers: ValueBuffers<DeserializedMessage>,
delta_partition_offsets: HashMap<DataTypePartition, Option<DataTypeOffset>>,
latency_timer: Instant,
dlq: Box<dyn DeadLetterQueue>,
Expand Down Expand Up @@ -864,7 +867,7 @@ impl IngestProcessor {
async fn deserialize_message<M>(
&mut self,
msg: &M,
) -> Result<Value, MessageDeserializationError>
) -> Result<DeserializedMessage, MessageDeserializationError>
where
M: Message + Send + Sync,
{
Expand Down
97 changes: 88 additions & 9 deletions src/serialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,53 @@ use serde_json::Value;

use crate::{dead_letters::DeadLetter, MessageDeserializationError, MessageFormat};

use deltalake_core::arrow::datatypes::Schema as ArrowSchema;

/// Structure which contains the [serde_json::Value] and the inferred schema of the message
///
/// The [ArrowSchema] helps with schema evolution
#[derive(Clone, Debug, Default, PartialEq)]
pub struct DeserializedMessage {
/// The deserialized message payload as a JSON value
message: Value,
/// The Arrow schema associated with `message`; `None` when no schema is available
schema: Option<ArrowSchema>,
}

impl DeserializedMessage {
pub fn schema(&self) -> &Option<ArrowSchema> {
&self.schema
}
pub fn message(self) -> Value {
self.message
}
pub fn get(&self, key: &str) -> Option<&Value> {
self.message.get(key)
}
pub fn as_object_mut(&mut self) -> Option<&mut serde_json::Map<String, Value>> {
self.message.as_object_mut()
}
}

/// Allow for `.into()` on [Value] for ease of use
///
/// The Arrow schema is inferred from this single message. If inference fails the
/// conversion still succeeds and the schema is left as `None`.
impl From<Value> for DeserializedMessage {
    fn from(message: Value) -> Self {
        // XXX: This seems wasteful, this function should go away, and the deserializers should
        // infer straight from the buffer stream
        //
        // Schema inference only requires items that implement `Borrow<Value>`, so lend
        // it a reference rather than deep-cloning the message into a temporary `Vec`.
        let schema = deltalake_core::arrow::json::reader::infer_json_schema_from_iterator(
            std::iter::once(Ok(&message)),
        )
        .ok();
        Self { message, schema }
    }
}

#[async_trait]
pub(crate) trait MessageDeserializer {
async fn deserialize(
&mut self,
message_bytes: &[u8],
) -> Result<Value, MessageDeserializationError>;
) -> Result<DeserializedMessage, MessageDeserializationError>;
}

pub(crate) struct MessageDeserializerFactory {}
Expand Down Expand Up @@ -80,11 +121,15 @@ impl MessageDeserializerFactory {
}
}

/// Stateless deserializer whose [MessageDeserializer] implementation parses the raw
/// payload bytes as JSON via `serde_json::from_slice`.
#[derive(Clone, Debug, Default)]
struct DefaultDeserializer {}

#[async_trait]
impl MessageDeserializer for DefaultDeserializer {
async fn deserialize(&mut self, payload: &[u8]) -> Result<Value, MessageDeserializationError> {
async fn deserialize(
&mut self,
payload: &[u8],
) -> Result<DeserializedMessage, MessageDeserializationError> {
let value: Value = match serde_json::from_slice(payload) {
Ok(v) => v,
Err(e) => {
Expand All @@ -94,7 +139,41 @@ impl MessageDeserializer for DefaultDeserializer {
}
};

Ok(value)
Ok(value.into())
}
}

#[cfg(test)]
mod default_tests {
    use super::*;

    /// A trivial JSON object must come back with a schema attached.
    #[tokio::test]
    async fn deserialize_with_schema() {
        let payload = r#"{"hello" : "world"}"#.as_bytes();
        let mut deserializer = DefaultDeserializer::default();

        let deserialized = deserializer
            .deserialize(payload)
            .await
            .expect("Failed to deserialize trivial JSON");

        let has_schema = deserialized.schema().is_some();
        assert!(
            has_schema,
            "The DeserializedMessage doesn't have a schema!"
        );
    }

    /// The wrapped JSON value must round-trip into a typed struct.
    #[tokio::test]
    async fn deserialize_simple_json() {
        #[derive(serde::Deserialize)]
        struct HW {
            hello: String,
        }

        let payload = r#"{"hello" : "world"}"#.as_bytes();
        let mut deserializer = DefaultDeserializer::default();

        let deserialized = deserializer
            .deserialize(payload)
            .await
            .expect("Failed to deserialize trivial JSON");

        // Child modules may read the parent's private field directly.
        let parsed: HW = serde_json::from_value(deserialized.message).expect("Failed to coerce");
        assert_eq!("world", parsed.hello);
    }
}

Expand All @@ -116,11 +195,11 @@ impl MessageDeserializer for AvroDeserializer {
async fn deserialize(
&mut self,
message_bytes: &[u8],
) -> Result<Value, MessageDeserializationError> {
) -> Result<DeserializedMessage, MessageDeserializationError> {
match self.decoder.decode_with_schema(Some(message_bytes)).await {
Ok(drs) => match drs {
Some(v) => match Value::try_from(v.value) {
Ok(v) => Ok(v),
Ok(v) => Ok(v.into()),
Err(e) => Err(MessageDeserializationError::AvroDeserialization {
dead_letter: DeadLetter::from_failed_deserialization(
message_bytes,
Expand All @@ -147,7 +226,7 @@ impl MessageDeserializer for AvroSchemaDeserializer {
async fn deserialize(
&mut self,
message_bytes: &[u8],
) -> Result<Value, MessageDeserializationError> {
) -> Result<DeserializedMessage, MessageDeserializationError> {
let reader_result = match &self.schema {
None => apache_avro::Reader::new(Cursor::new(message_bytes)),
Some(schema) => apache_avro::Reader::with_schema(schema, Cursor::new(message_bytes)),
Expand All @@ -162,7 +241,7 @@ impl MessageDeserializer for AvroSchemaDeserializer {
};

return match v {
Ok(value) => Ok(value),
Ok(value) => Ok(value.into()),
Err(e) => Err(MessageDeserializationError::AvroDeserialization {
dead_letter: DeadLetter::from_failed_deserialization(
message_bytes,
Expand Down Expand Up @@ -221,11 +300,11 @@ impl MessageDeserializer for JsonDeserializer {
async fn deserialize(
&mut self,
message_bytes: &[u8],
) -> Result<Value, MessageDeserializationError> {
) -> Result<DeserializedMessage, MessageDeserializationError> {
let decoder = self.decoder.borrow_mut();
match decoder.decode(Some(message_bytes)).await {
Ok(drs) => match drs {
Some(v) => Ok(v.value),
Some(v) => Ok(v.value.into()),
None => return Err(MessageDeserializationError::EmptyPayload),
},
Err(e) => {
Expand Down
17 changes: 11 additions & 6 deletions src/transforms.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::serialization::DeserializedMessage;
use chrono::prelude::*;
use jmespatch::{
functions::{ArgumentType, CustomFunction, Signature},
Expand Down Expand Up @@ -348,13 +349,13 @@ impl Transformer {
/// The optional `kafka_message` must be provided to include well known Kafka properties in the value.
pub(crate) fn transform<M>(
&self,
value: &mut Value,
value: &mut DeserializedMessage,
kafka_message: Option<&M>,
) -> Result<(), TransformError>
where
M: Message,
{
let data = Variable::try_from(value.clone())?;
let data = Variable::try_from(value.clone().message())?;

match value.as_object_mut() {
Some(map) => {
Expand All @@ -378,7 +379,7 @@ impl Transformer {
Ok(())
}
_ => Err(TransformError::ValueNotAnObject {
value: value.to_owned(),
value: value.clone().message(),
}),
}
}
Expand Down Expand Up @@ -510,7 +511,7 @@ mod tests {

#[test]
fn transforms_with_substr() {
let mut test_value = json!({
let test_value = json!({
"name": "A",
"modified": "2021-03-16T14:38:58Z",
});
Expand All @@ -524,6 +525,7 @@ mod tests {
0,
None,
);
let mut test_value: DeserializedMessage = test_value.into();

let mut transforms = HashMap::new();

Expand All @@ -540,6 +542,7 @@ mod tests {

let name = test_value.get("name").unwrap().as_str().unwrap();
let modified = test_value.get("modified").unwrap().as_str().unwrap();
println!("TEST: {test_value:?}");
let modified_date = test_value.get("modified_date").unwrap().as_str().unwrap();

assert_eq!("A", name);
Expand Down Expand Up @@ -567,7 +570,7 @@ mod tests {
fn test_transforms_with_epoch_seconds_to_iso8601() {
let expected_iso = "2021-07-20T23:18:18Z";

let mut test_value = json!({
let test_value = json!({
"name": "A",
"epoch_seconds_float": 1626823098.51995,
"epoch_seconds_int": 1626823098,
Expand All @@ -584,6 +587,7 @@ mod tests {
0,
None,
);
let mut test_value: DeserializedMessage = test_value.into();

let mut transforms = HashMap::new();
transforms.insert(
Expand Down Expand Up @@ -640,7 +644,7 @@ mod tests {

#[test]
fn test_transforms_with_kafka_meta() {
let mut test_value = json!({
let test_value = json!({
"name": "A",
"modified": "2021-03-16T14:38:58Z",
});
Expand All @@ -655,6 +659,7 @@ mod tests {
None,
);

let mut test_value: DeserializedMessage = test_value.into();
let mut transforms = HashMap::new();

transforms.insert("_kafka_offset".to_string(), "kafka.offset".to_string());
Expand Down
Loading

0 comments on commit f49b4f7

Please sign in to comment.