From cebfab154a70119e843987a20bf9ed8a8cf8862b Mon Sep 17 00:00:00 2001 From: ravi-databricks <37003292+ravi-databricks@users.noreply.github.com> Date: Sat, 17 Aug 2024 13:36:18 -0700 Subject: [PATCH 1/2] Added no schema for cdc_apply_changes support --- src/dataflow_pipeline.py | 52 ++++++++++++++++++++++------------------ 1 file changed, 29 insertions(+), 23 deletions(-) diff --git a/src/dataflow_pipeline.py b/src/dataflow_pipeline.py index dfd9fb1..8e811e2 100644 --- a/src/dataflow_pipeline.py +++ b/src/dataflow_pipeline.py @@ -408,29 +408,9 @@ def cdc_apply_changes(self): if cdc_apply_changes is None: raise Exception("cdcApplychanges is None! ") - struct_schema = ( - StructType.fromJson(self.schema_json) - if isinstance(self.dataflowSpec, BronzeDataflowSpec) - else self.silver_schema - ) - - sequenced_by_data_type = None - - if cdc_apply_changes.except_column_list: - modified_schema = StructType([]) - if struct_schema: - for field in struct_schema.fields: - if field.name not in cdc_apply_changes.except_column_list: - modified_schema.add(field) - if field.name == cdc_apply_changes.sequence_by: - sequenced_by_data_type = field.dataType - struct_schema = modified_schema - else: - raise Exception(f"Schema is None for {self.dataflowSpec} for cdc_apply_changes! ") - - if struct_schema and cdc_apply_changes.scd_type == "2": - struct_schema.add(StructField("__START_AT", sequenced_by_data_type)) - struct_schema.add(StructField("__END_AT", sequenced_by_data_type)) + struct_schema = None + if self.schema_json: + struct_schema = self.modify_schema_for_cdc_changes(cdc_apply_changes) target_path = None if self.uc_enabled else self.dataflowSpec.targetDetails["path"] @@ -464,6 +444,32 @@ def cdc_apply_changes(self): ignore_null_updates_except_column_list=cdc_apply_changes.ignore_null_updates_except_column_list ) + def modify_schema_for_cdc_changes(self, cdc_apply_changes): + struct_schema = ( + StructType.fromJson(self.schema_json) + if isinstance(self.dataflowSpec, BronzeDataflowSpec) + else self.silver_schema + ) + + sequenced_by_data_type = None + + if cdc_apply_changes.except_column_list: + modified_schema = StructType([]) + if struct_schema: + for field in struct_schema.fields: + if field.name not in cdc_apply_changes.except_column_list: + modified_schema.add(field) + if field.name == cdc_apply_changes.sequence_by: + sequenced_by_data_type = field.dataType + struct_schema = modified_schema + else: + raise Exception(f"Schema is None for {self.dataflowSpec} for cdc_apply_changes! ") + + if struct_schema and cdc_apply_changes.scd_type == "2": + struct_schema.add(StructField("__START_AT", sequenced_by_data_type)) + struct_schema.add(StructField("__END_AT", sequenced_by_data_type)) + return struct_schema + def create_streaming_table(self, struct_schema, target_path=None): expect_all_dict, expect_all_or_drop_dict, expect_all_or_fail_dict = self.get_dq_expectations() dlt.create_streaming_table( From 8e2dc8596bdf1d5274d8f475f03803638256a65b Mon Sep 17 00:00:00 2001 From: ravi-databricks <37003292+ravi-databricks@users.noreply.github.com> Date: Mon, 19 Aug 2024 10:34:17 -0700 Subject: [PATCH 2/2] Added unit tests --- src/dataflow_pipeline.py | 4 ++++ tests/generate_delta_tables.py | 2 +- tests/test_dataflow_pipeline.py | 40 +++++++++++++++++++++++++++++++++ 3 files changed, 45 insertions(+), 1 deletion(-) diff --git a/src/dataflow_pipeline.py b/src/dataflow_pipeline.py index 8e811e2..bbf4582 100644 --- a/src/dataflow_pipeline.py +++ b/src/dataflow_pipeline.py @@ -445,6 +445,10 @@ def cdc_apply_changes(self): ) def modify_schema_for_cdc_changes(self, cdc_apply_changes): + if isinstance(self.dataflowSpec, BronzeDataflowSpec) and self.schema_json is None: + return None + if isinstance(self.dataflowSpec, SilverDataflowSpec) and self.silver_schema is None: + return None struct_schema = ( StructType.fromJson(self.schema_json) if isinstance(self.dataflowSpec, BronzeDataflowSpec) diff --git a/tests/generate_delta_tables.py b/tests/generate_delta_tables.py index 69cd8f0..52d623f 100644 --- a/tests/generate_delta_tables.py +++ b/tests/generate_delta_tables.py @@ -21,4 +21,4 @@ transactions_parquet_df = spark.read.options(**options).json("tests/resources/data/transactions") transactions_parquet_df.withColumn("_rescued_data", lit("Test")).write.format("delta").mode("overwrite").save( - "tests/resources/delta/transactions") + "tests/resources/delta/transactions") \ No newline at end of file diff --git a/tests/test_dataflow_pipeline.py b/tests/test_dataflow_pipeline.py index 835edc1..5cbdc05 100644 --- a/tests/test_dataflow_pipeline.py +++ b/tests/test_dataflow_pipeline.py @@ -1003,3 +1003,43 @@ def test_read_append_flows(self, mock_view): bronze_dataflowSpec_df.appendFlows = None with self.assertRaises(Exception): pipeline = DataflowPipeline(self.spark, bronze_dataflowSpec_df, view_name, None) + + @patch('dlt.table', new_callable=MagicMock) + def test_modify_schema_for_cdc_changes(self, mock_dlt_table): + mock_dlt_table.table.return_value = None + cdc_apply_changes_json = """{ + "keys": ["id"], + "sequence_by": "operation_date", + "scd_type": "2", + "except_column_list": ["operation", "operation_date", "_rescued_data"] + }""" + cdc_apply_changes = DataflowSpecUtils.get_cdc_apply_changes(cdc_apply_changes_json) + bmap = DataflowPipelineTests.bronze_dataflow_spec_map + ddlSchemaStr = ( + self.spark.read.text(paths="tests/resources/schema/customer_schema.ddl") + .select("value") + .collect()[0]["value"] + ) + schema = T._parse_datatype_string(ddlSchemaStr) + bronze_dataflow_spec = BronzeDataflowSpec( + **bmap + ) + bronze_dataflow_spec.schema = json.dumps(schema.jsonValue()) + bronze_dataflow_spec.cdcApplyChanges = json.dumps(self.silver_cdc_apply_changes_scd2) + bronze_dataflow_spec.dataQualityExpectations = None + view_name = f"{bronze_dataflow_spec.targetDetails['table']}_inputView" + pipeline = DataflowPipeline(self.spark, bronze_dataflow_spec, view_name, None) + expected_schema = T.StructType([ + T.StructField("address", T.StringType()), + T.StructField("email", T.StringType()), + T.StructField("firstname", T.StringType()), + T.StructField("id", T.StringType()), + T.StructField("lastname", T.StringType()), + T.StructField("__START_AT", T.StringType()), + T.StructField("__END_AT", T.StringType()) + ]) + modified_schema = pipeline.modify_schema_for_cdc_changes(cdc_apply_changes) + self.assertEqual(modified_schema, expected_schema) + pipeline.schema_json = None + modified_schema = pipeline.modify_schema_for_cdc_changes(cdc_apply_changes) + self.assertEqual(modified_schema, None)