From 07ec6e9818b2ca571923f88fd5a5709ec9f0cfa0 Mon Sep 17 00:00:00 2001 From: ravi-databricks <37003292+ravi-databricks@users.noreply.github.com> Date: Tue, 23 Jul 2024 11:07:21 -0700 Subject: [PATCH] Added unit tests for silver layer fanout --- src/onboard_dataflowspec.py | 2 +- tests/resources/onboarding_silverfanout.json | 36 +++++++++++++++++++ .../silver_transformations_fanout.json | 19 ++++++++++ tests/test_onboard_dataflowspec.py | 17 +++++++++ tests/utils.py | 1 + 5 files changed, 74 insertions(+), 1 deletion(-) create mode 100644 tests/resources/onboarding_silverfanout.json create mode 100644 tests/resources/silver_transformations_fanout.json diff --git a/src/onboard_dataflowspec.py b/src/onboard_dataflowspec.py index d03df02..221d805 100644 --- a/src/onboard_dataflowspec.py +++ b/src/onboard_dataflowspec.py @@ -733,7 +733,7 @@ def __get_silver_dataflow_spec_dataframe(self, onboarding_df, env): data = [] onboarding_rows = onboarding_df.collect() - mandatory_fields = ["data_flow_id", "data_flow_group", "source_details", f"silver_database_{env}", + mandatory_fields = ["data_flow_id", "data_flow_group", f"silver_database_{env}", "silver_table", f"silver_transformation_json_{env}"] # f"silver_table_path_{env}", for onboarding_row in onboarding_rows: diff --git a/tests/resources/onboarding_silverfanout.json b/tests/resources/onboarding_silverfanout.json new file mode 100644 index 0000000..9eafc12 --- /dev/null +++ b/tests/resources/onboarding_silverfanout.json @@ -0,0 +1,36 @@ +[ + { + "data_flow_id": "104", + "data_flow_group": "A1", + "bronze_database_dev": "bronze", + "bronze_database_staging": "bronze", + "bronze_database_prd": "bronze", + "bronze_table": "customers", + "bronze_table_path_dev": "tests/resources/delta/customers", + "silver_database_dev": "silver", + "silver_database_staging": "silver", + "silver_database_prd": "silver", + "silver_table": "customers_clean", + "silver_cdc_apply_changes": { + "keys": [ + "id" + ], + "sequence_by": "operation_date", + "scd_type": "1", + "apply_as_deletes": "operation = 'DELETE'", + "except_column_list": [ + "operation", + "operation_date", + "_rescued_data" + ] + }, + "silver_table_path_dev": "tests/resources/data/silver/customers", + "silver_table_properties": { + "pipelines.autoOptimize.managed": "false", + "pipelines.reset.allowed": "false", + "pipelines.autoOptimize.zOrderCols": "id,email" + }, + "silver_transformation_json_dev": "tests/resources/silver_transformations_fanout.json", + "silver_data_quality_expectations_json_dev": "tests/resources/dqe/customers/silver_data_quality_expectations.json" + } +] \ No newline at end of file diff --git a/tests/resources/silver_transformations_fanout.json b/tests/resources/silver_transformations_fanout.json new file mode 100644 index 0000000..d000ff3 --- /dev/null +++ b/tests/resources/silver_transformations_fanout.json @@ -0,0 +1,19 @@ +[ + { + "target_table": "customers_clean", + "select_exp": [ + "address", + "email", + "firstname", + "id", + "lastname", + "operation_date", + "operation", + "_rescued_data" + ], + "where_clause": [ + "id IS NOT NULL", + "email is not NULL" + ] + } +] \ No newline at end of file diff --git a/tests/test_onboard_dataflowspec.py b/tests/test_onboard_dataflowspec.py index 0988a37..34bde58 100644 --- a/tests/test_onboard_dataflowspec.py +++ b/tests/test_onboard_dataflowspec.py @@ -310,3 +310,20 @@ def test_bronze_dataflow_spec_append_flow(self): silver_dataflowSpec_df.show(truncate=False) self.assertEqual(bronze_dataflowSpec_df.count(), 3) self.assertEqual(silver_dataflowSpec_df.count(), 3) + + def test_silver_fanout_dataflow_spec_dataframe(self): + """Test for onboardDataflowspec with fanout scenario.""" + local_params = copy.deepcopy(self.onboarding_bronze_silver_params_map) + onboardDataFlowSpecs = OnboardDataflowspec(self.spark, local_params) + onboardDataFlowSpecs.onboard_dataflow_specs() + local_params["onboarding_file_path"] = self.onboarding_silver_fanout_json_file + del local_params["bronze_dataflowspec_table"] + del local_params["bronze_dataflowspec_path"] + local_params["overwrite"] = "False" + onboardDataFlowSpecs = OnboardDataflowspec(self.spark, local_params) + onboardDataFlowSpecs.onboard_silver_dataflow_spec() + silver_dataflowSpec_df = self.read_dataflowspec( + self.onboarding_bronze_silver_params_map['database'], + self.onboarding_bronze_silver_params_map['silver_dataflowspec_table']) + silver_dataflowSpec_df.show(truncate=False) + self.assertEqual(silver_dataflowSpec_df.count(), 4) diff --git a/tests/utils.py b/tests/utils.py index 03e9268..9a6d5e0 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -39,6 +39,7 @@ def setUp(self): self.onboarding_type2_json_file = "tests/resources/onboarding_ac_type2.json" self.onboarding_bronze_type2_json_file = "tests/resources/onboarding_ac_bronze_type2.json" self.onboarding_append_flow_json_file = "tests/resources/onboarding_append_flow.json" + self.onboarding_silver_fanout_json_file = "tests/resources/onboarding_silverfanout.json" self.deltaPipelinesMetaStoreOps.drop_database("ravi_dlt_demo") self.deltaPipelinesMetaStoreOps.create_database("ravi_dlt_demo", "Unittest") self.onboarding_bronze_silver_params_map = {