diff --git a/docs/content/getting_started/additionals2.md b/docs/content/getting_started/additionals2.md index aec53d4..2fe18e5 100644 --- a/docs/content/getting_started/additionals2.md +++ b/docs/content/getting_started/additionals2.md @@ -18,9 +18,9 @@ draft: false 4. If you already have datatbricks CLI installed with profile as given [here](https://docs.databricks.com/en/dev-tools/cli/profiles.html), you can skip above export step and provide `--profile=` option while running command in step:4 5. Run integration test against cloudfile or eventhub or kafka using below options: - - 5a. Run the command for cloudfiles ```python integration-tests/run-integration-test.py --cloud_provider_name=aws --dbr_version=11.3.x-scala2.12 --source=cloudfiles --dbfs_path=dbfs:/tmp/DLT-META/``` + - 5a. Run the command for cloudfiles ```python integration-tests/run_integration_tests.py --cloud_provider_name=aws --dbr_version=11.3.x-scala2.12 --source=cloudfiles --dbfs_path=dbfs:/tmp/DLT-META/``` - - 5b. Run the command for eventhub ```python integration-tests/run-integration-test.py --cloud_provider_name=azure --dbr_version=11.3.x-scala2.12 --source=eventhub --dbfs_path=dbfs:/tmp/DLT-META/ --eventhub_name=iot --eventhub_secrets_scope_name=eventhubs_creds --eventhub_namespace=int_test-standard --eventhub_port=9093 --eventhub_producer_accesskey_name=producer --eventhub_consumer_accesskey_name=consumer``` + - 5b. Run the command for eventhub ```python integration-tests/run_integration_tests.py --cloud_provider_name=azure --dbr_version=11.3.x-scala2.12 --source=eventhub --dbfs_path=dbfs:/tmp/DLT-META/ --eventhub_name=iot --eventhub_secrets_scope_name=eventhubs_creds --eventhub_namespace=int_test-standard --eventhub_port=9093 --eventhub_producer_accesskey_name=producer --eventhub_consumer_accesskey_name=consumer``` - - For eventhub integration tests, the following are the prerequisites: 1. Needs eventhub instance running @@ -36,7 +36,7 @@ draft: false 6. Provide eventhub access key name : --eventhub_consumer_accesskey_name - - 5c. Run the command for kafka ```python3 integration-tests/run-integration-test.py --cloud_provider_name=aws --dbr_version=11.3.x-scala2.12 --source=kafka --dbfs_path=dbfs:/tmp/DLT-META/ --kafka_topic_name=dlt-meta-integration-test --kafka_broker=host:9092``` + - 5c. Run the command for kafka ```python3 integration-tests/run_integration_tests.py --cloud_provider_name=aws --dbr_version=11.3.x-scala2.12 --source=kafka --dbfs_path=dbfs:/tmp/DLT-META/ --kafka_topic_name=dlt-meta-integration-test --kafka_broker=host:9092``` - - For kafka integration tests, the following are the prerequisites: 1. 
Needs kafka instance running diff --git a/docs/content/getting_started/metadatapreperation.md b/docs/content/getting_started/metadatapreperation.md index b84df3e..27199dd 100644 --- a/docs/content/getting_started/metadatapreperation.md +++ b/docs/content/getting_started/metadatapreperation.md @@ -17,7 +17,7 @@ draft: false | data_flow_id | This is unique identifer for pipeline | | data_flow_group | This is group identifer for launching multiple pipelines under single DLT | | source_format | Source format e.g `cloudFiles`, `eventhub`, `kafka`, `delta` | -| source_details | This map Type captures all source details for cloudfiles = `source_schema_path`, `source_path_{env}`, `source_database` and for eventhub= `source_schema_path` , `eventhub.accessKeyName`, `eventhub.accessKeySecretName`, `eventhub.name` , `eventhub.secretsScopeName` , `kafka.sasl.mechanism`, `kafka.security.protocol`, `eventhub.namespace`, `eventhub.port`. For Source schema file spark DDL schema format parsing is supported
In case of custom schema format then write schema parsing function `bronze_schema_mapper(schema_file_path, spark):Schema` and provide to `OnboardDataflowspec` initialization
.e.g `onboardDataFlowSpecs = OnboardDataflowspec(spark, dict_obj,bronze_schema_mapper).onboardDataFlowSpecs()` | +| source_details | This map type captures all source details. For cloudFiles: `source_schema_path`, `source_path_{env}`, `source_database`, `source_metadata`. For eventhub: `source_schema_path`, `eventhub.accessKeyName`, `eventhub.accessKeySecretName`, `eventhub.name`, `eventhub.secretsScopeName`, `kafka.sasl.mechanism`, `kafka.security.protocol`, `eventhub.namespace`, `eventhub.port`. Spark DDL format is supported for parsing the source schema file.
If you use a custom schema format, write a schema parsing function `bronze_schema_mapper(schema_file_path, spark): Schema` and pass it to the `OnboardDataflowspec` initialization,
e.g. `onboardDataFlowSpecs = OnboardDataflowspec(spark, dict_obj, bronze_schema_mapper).onboardDataFlowSpecs()` | | bronze_database_{env} | Delta lake bronze database name. | | bronze_table | Delta lake bronze table name | | bronze_reader_options | Reader options which can be provided to spark reader
e.g multiline=true,header=true in json format | diff --git a/examples/onboarding.json b/examples/onboarding.json index cd6ca16..e21ffc6 100644 --- a/examples/onboarding.json +++ b/examples/onboarding.json @@ -7,7 +7,15 @@ "source_details": { "source_database": "APP", "source_table": "CUSTOMERS", - "source_path_it": "{dbfs_path}/cdc_raw/customers" + "source_path_it": "{dbfs_path}/cdc_raw/customers", + "source_metadata": { + "include_autoloader_metadata_column": "True", + "autoloader_metadata_col_name": "source_metadata", + "select_metadata_cols": { + "input_file_name": "_metadata.file_name", + "input_file_path": "_metadata.file_path" + } + } }, "bronze_database_it": "bronze_it_{run_id}", "bronze_table": "customers_cdc", diff --git a/integration_tests/conf/cloudfiles-onboarding.template b/integration_tests/conf/cloudfiles-onboarding.template index 9a735bc..b0960f2 100644 --- a/integration_tests/conf/cloudfiles-onboarding.template +++ b/integration_tests/conf/cloudfiles-onboarding.template @@ -8,6 +8,14 @@ "source_database": "APP", "source_table": "CUSTOMERS", "source_path_it": "{dbfs_path}/integration_tests/resources/data/customers", + "source_metadata": { + "include_autoloader_metadata_column": "True", + "autoloader_metadata_col_name": "source_metadata", + "select_metadata_cols": { + "input_file_name": "_metadata.file_name", + "input_file_path": "_metadata.file_path" + } + }, "source_schema_path": "{dbfs_path}/integration_tests/resources/customers.ddl" }, "bronze_database_it": "{uc_catalog_name}.{bronze_schema}", @@ -80,6 +88,9 @@ "source_database": "APP", "source_table": "TRANSACTIONS", "source_path_it": "{dbfs_path}/integration_tests/resources/data/transactions", + "source_metadata": { + "include_autoloader_metadata_column": "True" + }, "source_schema_path": "{dbfs_path}/integration_tests/resources/transactions.ddl" }, "bronze_database_it": "{uc_catalog_name}.{bronze_schema}", diff --git a/integration_tests/run_integration_tests.py b/integration_tests/run_integration_tests.py index 694d911..7ad4b27 100644 --- a/integration_tests/run_integration_tests.py +++ b/integration_tests/run_integration_tests.py @@ -724,7 +724,8 @@ def create_cluster(self, runner_conf: DLTMetaRunnerConf): spark_conf=spark_confs, autotermination_minutes=30, spark_env_vars={ - "PYSPARK_PYTHON": "/databricks/python3/bin/python3" + "PYSPARK_PYTHON": "/databricks/python3/bin/python3", + "WSFS_ENABLE": "false" }, data_security_mode=mode ).result() @@ -742,7 +743,7 @@ def run(self, runner_conf: DLTMetaRunnerConf): print(e) finally: print("Cleaning up...") - # self.clean_up(runner_conf) + self.clean_up(runner_conf) def download_test_results(self, runner_conf: DLTMetaRunnerConf): ws_output_file = self.ws.workspace.download(runner_conf.test_output_file_path) diff --git a/setup.py b/setup.py index 98e1219..6343251 100644 --- a/setup.py +++ b/setup.py @@ -19,7 +19,7 @@ """ setup( name="dlt_meta", - version="0.0.7", + version="0.0.8", python_requires=">=3.8", setup_requires=["wheel>=0.37.1,<=0.42.0"], install_requires=INSTALL_REQUIRES, diff --git a/src/__about__.py b/src/__about__.py index eead319..9123cf0 100644 --- a/src/__about__.py +++ b/src/__about__.py @@ -1 +1 @@ -__version__ = '0.0.5' +__version__ = '0.0.8' diff --git a/src/dataflow_pipeline.py b/src/dataflow_pipeline.py index 6fb95da..57cd2d0 100644 --- a/src/dataflow_pipeline.py +++ b/src/dataflow_pipeline.py @@ -54,7 +54,7 @@ class DataflowPipeline: [type]: [description] """ - def __init__(self, spark, dataflow_spec, view_name, view_name_quarantine=None): + def 
__init__(self, spark, dataflow_spec, view_name, view_name_quarantine=None, custom_tranform_func=None): """Initialize Constructor.""" logger.info( f"""dataflowSpec={dataflow_spec} , @@ -62,11 +62,15 @@ def __init__(self, spark, dataflow_spec, view_name, view_name_quarantine=None): view_name_quarantine={view_name_quarantine}""" ) if isinstance(dataflow_spec, BronzeDataflowSpec) or isinstance(dataflow_spec, SilverDataflowSpec): - self.__initialize_dataflow_pipeline(spark, dataflow_spec, view_name, view_name_quarantine) + self.__initialize_dataflow_pipeline( + spark, dataflow_spec, view_name, view_name_quarantine, custom_tranform_func + ) else: raise Exception("Dataflow not supported!") - def __initialize_dataflow_pipeline(self, spark, dataflow_spec, view_name, view_name_quarantine): + def __initialize_dataflow_pipeline( + self, spark, dataflow_spec, view_name, view_name_quarantine, custom_tranform_func + ): """Initialize dataflow pipeline state.""" self.spark = spark uc_enabled_str = spark.conf.get("spark.databricks.unityCatalog.enabled", "False") @@ -76,6 +80,7 @@ def __initialize_dataflow_pipeline(self, spark, dataflow_spec, view_name, view_n self.view_name = view_name if view_name_quarantine: self.view_name_quarantine = view_name_quarantine + self.custom_tranform_func = custom_tranform_func if dataflow_spec.cdcApplyChanges: self.cdcApplyChanges = DataflowSpecUtils.get_cdc_apply_changes(self.dataflowSpec.cdcApplyChanges) else: @@ -205,14 +210,21 @@ def read_bronze(self) -> DataFrame: self.schema_json ) bronze_dataflow_spec: BronzeDataflowSpec = self.dataflowSpec + input_df = None if bronze_dataflow_spec.sourceFormat == "cloudFiles": - return pipeline_reader.read_dlt_cloud_files() + input_df = pipeline_reader.read_dlt_cloud_files() elif bronze_dataflow_spec.sourceFormat == "delta": - return PipelineReaders.read_dlt_delta() + input_df = PipelineReaders.read_dlt_delta() elif bronze_dataflow_spec.sourceFormat == "eventhub" or bronze_dataflow_spec.sourceFormat == "kafka": - return PipelineReaders.read_kafka() + input_df = PipelineReaders.read_kafka() else: raise Exception(f"{bronze_dataflow_spec.sourceFormat} source format not supported") + return self.apply_custom_transform_fun(input_df) + + def apply_custom_transform_fun(self, input_df): + if self.custom_tranform_func: + input_df = self.custom_tranform_func(input_df) + return input_df def get_silver_schema(self): """Get Silver table Schema.""" @@ -266,7 +278,7 @@ def read_silver(self) -> DataFrame: if len(where_clause_str.strip()) > 0: for where_clause in where_clause: raw_delta_table_stream = raw_delta_table_stream.where(where_clause) - return raw_delta_table_stream + return self.apply_custom_transform_fun(raw_delta_table_stream) def write_to_delta(self): """Write to Delta.""" @@ -483,7 +495,7 @@ def run_dlt(self): self.write() @staticmethod - def invoke_dlt_pipeline(spark, layer): + def invoke_dlt_pipeline(spark, layer, custom_tranform_func=None): """Invoke dlt pipeline will launch dlt with given dataflowspec. 
Args: @@ -504,7 +516,8 @@ def invoke_dlt_pipeline(spark, layer): and dataflowSpec.quarantineTargetDetails != {}: quarantine_input_view_name = ( f"{dataflowSpec.quarantineTargetDetails['table']}" - f"_{layer}_quarantine_inputView" + f"_{layer}_quarantine_inputView", + custom_tranform_func ) else: logger.info("quarantine_input_view_name set to None") @@ -513,6 +526,7 @@ def invoke_dlt_pipeline(spark, layer): dataflowSpec, f"{dataflowSpec.targetDetails['table']}_{layer}_inputView", quarantine_input_view_name, + custom_tranform_func ) dlt_data_flow.run_dlt() diff --git a/src/onboard_dataflowspec.py b/src/onboard_dataflowspec.py index 808e715..156d110 100644 --- a/src/onboard_dataflowspec.py +++ b/src/onboard_dataflowspec.py @@ -458,7 +458,6 @@ def __get_bronze_dataflow_spec_dataframe(self, onboarding_df, env): raise Exception(f"Source format {source_format} not supported in DLT-META! row={onboarding_row}") source_details, bronze_reader_config_options, schema = self.get_bronze_source_details_reader_options_schema( onboarding_row, env) - bronze_target_format = "delta" bronze_target_details = { "database": onboarding_row["bronze_database_{}".format(env)], @@ -611,6 +610,14 @@ def get_bronze_source_details_reader_options_schema(self, onboarding_row, env): source_details["source_database"] = source_details_file["source_database"] if "source_table" in source_details_file: source_details["source_table"] = source_details_file["source_table"] + if "source_metadata" in source_details_file: + source_metadata_dict = self.__delete_none(source_details_file["source_metadata"].asDict()) + if "select_metadata_cols" in source_metadata_dict: + select_metadata_cols = self.__delete_none( + source_metadata_dict["select_metadata_cols"].asDict() + ) + source_metadata_dict["select_metadata_cols"] = select_metadata_cols + source_details["source_metadata"] = json.dumps(self.__delete_none(source_metadata_dict)) elif source_format.lower() == "eventhub" or source_format.lower() == "kafka": source_details = source_details_file if "source_schema_path" in source_details_file: diff --git a/src/pipeline_readers.py b/src/pipeline_readers.py index d9de7b6..902e143 100644 --- a/src/pipeline_readers.py +++ b/src/pipeline_readers.py @@ -1,5 +1,6 @@ """PipelineReaders providers DLT readers functionality.""" import logging +import json from pyspark.sql import DataFrame from pyspark.sql.types import StructType from pyspark.sql.functions import from_json, col @@ -29,22 +30,41 @@ def read_dlt_cloud_files(self) -> DataFrame: DataFrame: _description_ """ logger.info("In read_dlt_cloud_files func") + input_df = None source_path = self.source_details["path"] - if self.schema_json and self.source_format != "delta": schema = StructType.fromJson(self.schema_json) - return ( + input_df = ( self.spark.readStream.format(self.source_format) .options(**self.reader_config_options) .schema(schema) .load(source_path) ) else: - return ( + input_df = ( self.spark.readStream.format(self.source_format) .options(**self.reader_config_options) .load(source_path) ) + if self.source_details and "source_metadata" in self.source_details.keys(): + input_df = PipelineReaders.add_cloudfiles_metadata(self.source_details, input_df) + return input_df + + @staticmethod + def add_cloudfiles_metadata(sourceDetails, input_df): + source_metadata_json = json.loads(sourceDetails.get("source_metadata")) + keys = source_metadata_json.keys() + if "include_autoloader_metadata_column" in keys: + if "autoloader_metadata_col_name" in source_metadata_json: + source_metadata_col_name 
= source_metadata_json["autoloader_metadata_col_name"] + input_df = input_df.selectExpr("*", f"_metadata as {source_metadata_col_name}") + else: + input_df = input_df.selectExpr("*", "_metadata as source_metadata") + if "select_metadata_cols" in source_metadata_json: + select_metadata_cols = source_metadata_json["select_metadata_cols"] + for select_metadata_col in select_metadata_cols: + input_df = input_df.withColumn(select_metadata_col, col(select_metadata_cols[select_metadata_col])) + return input_df def read_dlt_delta(self) -> DataFrame: """Read dlt delta. diff --git a/tests/resources/onboarding.json b/tests/resources/onboarding.json index d0b9c7f..9ccab56 100644 --- a/tests/resources/onboarding.json +++ b/tests/resources/onboarding.json @@ -1 +1,168 @@ -[{"data_flow_id": "100", "data_flow_group": "A1", "source_system": "MYSQL", "source_format": "cloudFiles", "source_details": {"source_database": "APP", "source_table": "CUSTOMERS", "source_path_dev": "tests/resources/data/customers", "source_schema_path": "tests/resources/schema/customer_schema.ddl"}, "bronze_database_dev": "bronze", "bronze_database_staging": "bronze", "bronze_database_prd": "bronze", "bronze_table": "customers_cdc", "bronze_reader_options": {"cloudFiles.format": "json", "cloudFiles.inferColumnTypes": "true", "cloudFiles.rescuedDataColumn": "_rescued_data"}, "bronze_table_path_dev": "tests/resources/delta/customers", "bronze_table_properties": {"pipelines.autoOptimize.managed": "false", "pipelines.reset.allowed": "false"}, "bronze_data_quality_expectations_json_dev": "tests/resources/dqe/customers/bronze_data_quality_expectations.json", "silver_database_dev": "silver", "silver_database_staging": "silver", "silver_database_prd": "silver", "silver_table": "customers", "silver_cdc_apply_changes": {"keys": ["id"], "sequence_by": "operation_date", "scd_type": "1", "apply_as_deletes": "operation = 'DELETE'", "except_column_list": ["operation", "operation_date", "_rescued_data"]}, "silver_table_path_dev": "tests/resources/data/silver/customers", "silver_table_properties": {"pipelines.autoOptimize.managed": "false", "pipelines.reset.allowed": "false", "pipelines.autoOptimize.zOrderCols": "id,email"}, "silver_transformation_json_dev": "tests/resources/silver_transformations.json", "silver_data_quality_expectations_json_dev": "tests/resources/dqe/customers/silver_data_quality_expectations.json"}, {"data_flow_id": "101", "data_flow_group": "A1", "source_system": "MYSQL", "source_format": "cloudFiles", "source_details": {"source_database": "APP", "source_table": "TRANSACTIONS", "source_path_prd": "tests/resources/data/transactions", "source_path_dev": "tests/resources/data/transactions"}, "bronze_database_dev": "bronze", "bronze_database_staging": "bronze", "bronze_database_prd": "bronze", "bronze_table": "transactions_cdc", "bronze_reader_options": {"cloudFiles.format": "json", "cloudFiles.inferColumnTypes": "true", "cloudFiles.rescuedDataColumn": "_rescued_data"}, "bronze_table_path_dev": "tests/resources/delta/transactions", "bronze_table_path_staging": "s3://db-dlt-meta-staging/demo/data/bronze/transactions", "bronze_table_path_prd": "s3://db-dlt-meta-prod/demo/data/bronze/transactions", "bronze_table_properties": {"pipelines.reset.allowed": "false"}, "bronze_data_quality_expectations_json_dev": "tests/resources/dqe/transactions/bronze_data_quality_expectations.json", "bronze_database_quarantine_dev": "bronze", "bronze_database_quarantine_staging": "bronze", "bronze_database_quarantine_prd": "bronze", 
"bronze_quarantine_table": "transactions_cdc_quarantine", "bronze_quarantine_table_path_dev": "tests/resources/data/bronze/transactions_quarantine", "silver_database_dev": "silver", "silver_database_preprd": "silver", "silver_database_prd": "silver", "silver_table": "transactions", "silver_cdc_apply_changes": {"keys": ["id"], "sequence_by": "operation_date", "scd_type": "1", "apply_as_deletes": "operation = 'DELETE'", "except_column_list": ["operation", "operation_date", "_rescued_data"]}, "silver_partition_columns": "transaction_date", "silver_table_path_dev": "tests/resources/data/silver/transactions", "silver_transformation_json_dev": "tests/resources/silver_transformations.json", "silver_table_properties": {"pipelines.reset.allowed": "false", "pipelines.autoOptimize.zOrderCols": "id, customer_id"}, "silver_data_quality_expectations_json_dev": "tests/resources/dqe/transactions/silver_data_quality_expectations.json"}, {"data_flow_id": "103", "data_flow_group": "A2", "source_system": "MYSQL", "source_format": "eventhub", "source_details": {"source_schema_path": "tests/resources/schema/eventhub_iot_schema.ddl", "eventhub.accessKeyName": "iotIngestionAccessKey", "eventhub.name": "iot", "eventhub.accessKeySecretName": "iotIngestionAccessKey", "eventhub.secretsScopeName": "eventhubs_creds", "kafka.sasl.mechanism": "PLAIN", "kafka.security.protocol": "SASL_SSL", "kafka.bootstrap.servers": "standard.servicebus.windows.net:9093"}, "bronze_database_dev": "bronze", "bronze_database_staging": "bronze", "bronze_database_prd": "bronze", "bronze_table": "iot_cdc", "bronze_reader_options": {"maxOffsetsPerTrigger": "50000", "startingOffsets": "latest", "failOnDataLoss": "false", "kafka.request.timeout.ms": "60000", "kafka.session.timeout.ms": "60000"}, "bronze_table_path_dev": "tests/resources/delta/iot_cdc", "bronze_table_path_staging": "s3://db-dlt-meta-staging/demo/data/bronze/iot_cdc", "bronze_table_path_prd": "s3://db-dlt-meta-prod/demo/data/bronze/iot_cdc", "bronze_data_quality_expectations_json_dev": "tests/resources/dqe/iot_cdc/bronze_data_quality_expectations.json", "silver_database_dev": "silver", "silver_table": "iot_cdc", "silver_cdc_apply_changes": {"keys": ["device_id"], "sequence_by": "timestamp", "scd_type": "1", "apply_as_deletes": "operation = 'DELETE'", "except_column_list": []}, "silver_table_path_dev": "tests/resources/data/silver/iot_cdc", "silver_transformation_json_dev": "tests/resources/silver_transformations.json", "silver_data_quality_expectations_json_dev": "tests/resources/dqe/iot_cdc/silver_data_quality_expectations.json"}] \ No newline at end of file +[ + { + "data_flow_id": "100", + "data_flow_group": "A1", + "source_system": "MYSQL", + "source_format": "cloudFiles", + "source_details": { + "source_database": "APP", + "source_table": "CUSTOMERS", + "source_path_dev": "tests/resources/data/customers", + "source_schema_path": "tests/resources/schema/customer_schema.ddl", + "source_metadata": { + "include_autoloader_metadata_column": "True", + "autoloader_metadata_col_name": "source_metadata", + "select_metadata_cols": { + "input_file_name": "_metadata.file_name", + "input_file_path": "_metadata.file_path" + } + } + }, + "bronze_database_dev": "bronze", + "bronze_database_staging": "bronze", + "bronze_database_prd": "bronze", + "bronze_table": "customers_cdc", + "bronze_reader_options": { + "cloudFiles.format": "json", + "cloudFiles.inferColumnTypes": "true", + "cloudFiles.rescuedDataColumn": "_rescued_data" + }, + "bronze_table_path_dev": "tests/resources/delta/customers", 
+ "bronze_table_properties": { + "pipelines.autoOptimize.managed": "false", + "pipelines.reset.allowed": "false" + }, + "bronze_data_quality_expectations_json_dev": "tests/resources/dqe/customers/bronze_data_quality_expectations.json", + "silver_database_dev": "silver", + "silver_database_staging": "silver", + "silver_database_prd": "silver", + "silver_table": "customers", + "silver_cdc_apply_changes": { + "keys": [ + "id" + ], + "sequence_by": "operation_date", + "scd_type": "1", + "apply_as_deletes": "operation = 'DELETE'", + "except_column_list": [ + "operation", + "operation_date", + "_rescued_data" + ] + }, + "silver_table_path_dev": "tests/resources/data/silver/customers", + "silver_table_properties": { + "pipelines.autoOptimize.managed": "false", + "pipelines.reset.allowed": "false", + "pipelines.autoOptimize.zOrderCols": "id,email" + }, + "silver_transformation_json_dev": "tests/resources/silver_transformations.json", + "silver_data_quality_expectations_json_dev": "tests/resources/dqe/customers/silver_data_quality_expectations.json" + }, + { + "data_flow_id": "101", + "data_flow_group": "A1", + "source_system": "MYSQL", + "source_format": "cloudFiles", + "source_details": { + "source_database": "APP", + "source_table": "TRANSACTIONS", + "source_path_prd": "tests/resources/data/transactions", + "source_path_dev": "tests/resources/data/transactions", + "source_metadata": { + "include_autoloader_metadata_column": "True" + } + }, + "bronze_database_dev": "bronze", + "bronze_database_staging": "bronze", + "bronze_database_prd": "bronze", + "bronze_table": "transactions_cdc", + "bronze_reader_options": { + "cloudFiles.format": "json", + "cloudFiles.inferColumnTypes": "true", + "cloudFiles.rescuedDataColumn": "_rescued_data" + }, + "bronze_table_path_dev": "tests/resources/delta/transactions", + "bronze_table_path_staging": "s3://db-dlt-meta-staging/demo/data/bronze/transactions", + "bronze_table_path_prd": "s3://db-dlt-meta-prod/demo/data/bronze/transactions", + "bronze_table_properties": { + "pipelines.reset.allowed": "false" + }, + "bronze_data_quality_expectations_json_dev": "tests/resources/dqe/transactions/bronze_data_quality_expectations.json", + "bronze_database_quarantine_dev": "bronze", + "bronze_database_quarantine_staging": "bronze", + "bronze_database_quarantine_prd": "bronze", + "bronze_quarantine_table": "transactions_cdc_quarantine", + "bronze_quarantine_table_path_dev": "tests/resources/data/bronze/transactions_quarantine", + "silver_database_dev": "silver", + "silver_database_preprd": "silver", + "silver_database_prd": "silver", + "silver_table": "transactions", + "silver_cdc_apply_changes": { + "keys": [ + "id" + ], + "sequence_by": "operation_date", + "scd_type": "1", + "apply_as_deletes": "operation = 'DELETE'", + "except_column_list": [ + "operation", + "operation_date", + "_rescued_data" + ] + }, + "silver_partition_columns": "transaction_date", + "silver_table_path_dev": "tests/resources/data/silver/transactions", + "silver_transformation_json_dev": "tests/resources/silver_transformations.json", + "silver_table_properties": { + "pipelines.reset.allowed": "false", + "pipelines.autoOptimize.zOrderCols": "id, customer_id" + }, + "silver_data_quality_expectations_json_dev": "tests/resources/dqe/transactions/silver_data_quality_expectations.json" + }, + { + "data_flow_id": "103", + "data_flow_group": "A2", + "source_system": "MYSQL", + "source_format": "eventhub", + "source_details": { + "source_schema_path": "tests/resources/schema/eventhub_iot_schema.ddl", + 
"eventhub.accessKeyName": "iotIngestionAccessKey", + "eventhub.name": "iot", + "eventhub.accessKeySecretName": "iotIngestionAccessKey", + "eventhub.secretsScopeName": "eventhubs_creds", + "kafka.sasl.mechanism": "PLAIN", + "kafka.security.protocol": "SASL_SSL", + "kafka.bootstrap.servers": "standard.servicebus.windows.net:9093" + }, + "bronze_database_dev": "bronze", + "bronze_database_staging": "bronze", + "bronze_database_prd": "bronze", + "bronze_table": "iot_cdc", + "bronze_reader_options": { + "maxOffsetsPerTrigger": "50000", + "startingOffsets": "latest", + "failOnDataLoss": "false", + "kafka.request.timeout.ms": "60000", + "kafka.session.timeout.ms": "60000" + }, + "bronze_table_path_dev": "tests/resources/delta/iot_cdc", + "bronze_table_path_staging": "s3://db-dlt-meta-staging/demo/data/bronze/iot_cdc", + "bronze_table_path_prd": "s3://db-dlt-meta-prod/demo/data/bronze/iot_cdc", + "bronze_data_quality_expectations_json_dev": "tests/resources/dqe/iot_cdc/bronze_data_quality_expectations.json", + "silver_database_dev": "silver", + "silver_table": "iot_cdc", + "silver_cdc_apply_changes": { + "keys": [ + "device_id" + ], + "sequence_by": "timestamp", + "scd_type": "1", + "apply_as_deletes": "operation = 'DELETE'", + "except_column_list": [] + }, + "silver_table_path_dev": "tests/resources/data/silver/iot_cdc", + "silver_transformation_json_dev": "tests/resources/silver_transformations.json", + "silver_data_quality_expectations_json_dev": "tests/resources/dqe/iot_cdc/silver_data_quality_expectations.json" + } +] \ No newline at end of file diff --git a/tests/test_dataflow_pipeline.py b/tests/test_dataflow_pipeline.py index 1d0838d..e903297 100644 --- a/tests/test_dataflow_pipeline.py +++ b/tests/test_dataflow_pipeline.py @@ -6,6 +6,7 @@ import copy from pyspark.sql.functions import lit, expr import pyspark.sql.types as T +from pyspark.sql import DataFrame from tests.utils import DLTFrameworkTestCase from unittest.mock import MagicMock, patch from src.dataflow_spec import BronzeDataflowSpec, SilverDataflowSpec @@ -126,7 +127,11 @@ def test_invoke_dlt_pipeline_bronz_positive(self, run_dlt): "bronze.dataflowspecTable", f"{database}.{bronze_dataflow_table}", ) - DataflowPipeline.invoke_dlt_pipeline(self.spark, "bronze") + + def custom_tranform_func_test(input_df) -> DataFrame: + return input_df.withColumn('custom_col', lit('test_value')) + + DataflowPipeline.invoke_dlt_pipeline(self.spark, "bronze", custom_tranform_func_test) assert run_dlt.called @patch.object(DataflowPipeline, "run_dlt", return_value={"called"}) @@ -151,7 +156,9 @@ def test_invoke_dlt_pipeline_silver_positive(self, run_dlt): .mode("overwrite").saveAsTable("bronze.transactions_cdc") ) - DataflowPipeline.invoke_dlt_pipeline(self.spark, "silver") + def custom_tranform_func_test(input_df) -> DataFrame: + return input_df.withColumn('custom_col', lit('test_value')) + DataflowPipeline.invoke_dlt_pipeline(self.spark, "silver", custom_tranform_func_test) assert run_dlt.called @patch.object(DataflowPipeline, "read", return_value={"called"}) diff --git a/tests/test_pipeline_readers.py b/tests/test_pipeline_readers.py index 70bbcdc..5b75da8 100644 --- a/tests/test_pipeline_readers.py +++ b/tests/test_pipeline_readers.py @@ -2,6 +2,9 @@ from datetime import datetime import sys import os +import json +from pyspark.sql.functions import lit, struct +from pyspark.sql.types import StructType from src.dataflow_spec import BronzeDataflowSpec from src.pipeline_readers import PipelineReaders from tests.utils import DLTFrameworkTestCase @@ 
-27,7 +30,9 @@ class PipelineReadersTests(DLTFrameworkTestCase): "dataFlowId": "1", "dataFlowGroup": "A1", "sourceFormat": "json", - "sourceDetails": {"path": "tests/resources/data/customers"}, + "sourceDetails": { + "path": "tests/resources/data/customers", + }, "readerConfigOptions": { }, "targetFormat": "delta", @@ -171,8 +176,18 @@ def setUp(self): self.onboarding_bronze_silver_params_map["silver_dataflowspec_path"], ) - def test_read_cloud_files_positive(self): + @patch.object(PipelineReaders, "add_cloudfiles_metadata", return_value={"called"}) + @patch.object(SparkSession, "readStream") + def test_read_cloud_files_withmetadata_cols_positive(self, SparkSession, add_cloudfiles_metadata): """Test read_cloud_files positive.""" + mock_format = MagicMock() + mock_options = MagicMock() + mock_load = MagicMock() + mock_schema = MagicMock() + SparkSession.readStream.format.return_value = mock_format + mock_format.options.return_value = mock_options + mock_options.schema.return_value = mock_schema + mock_schema.load.return_value = mock_load bronze_map = PipelineReadersTests.bronze_dataflow_spec_map schema_ddl = "tests/resources/schema/customer_schema.ddl" ddlSchemaStr = self.spark.read.text(paths=schema_ddl, wholetext=True).collect()[0]["value"] @@ -180,18 +195,34 @@ def test_read_cloud_files_positive(self): schema = spark_schema.jsonValue() schema_map = {"schema": schema} bronze_map.update(schema_map) - source_format_map = {"sourceFormat": "json"} - bronze_map.update(source_format_map) + source_metdata_json = { + "include_autoloader_metadata_column": "True", + "autoloader_metadata_col_name": "source_metadata", + "select_metadata_cols": { + "input_file_name": "_metadata.file_name", + "input_file_path": "_metadata.file_path" + } + } + bronze_dataflow_spec = BronzeDataflowSpec(**bronze_map) + bronze_dataflow_spec.sourceDetails["source_metadata"] = json.dumps(source_metdata_json) bronze_dataflow_spec = BronzeDataflowSpec(**bronze_map) pipeline_readers = PipelineReaders( - self.spark, + SparkSession, bronze_dataflow_spec.sourceFormat, bronze_dataflow_spec.sourceDetails, bronze_dataflow_spec.readerConfigOptions, schema ) - customer_df = pipeline_readers.read_dlt_cloud_files() - self.assertIsNotNone(customer_df) + pipeline_readers.read_dlt_cloud_files() + SparkSession.readStream.format.assert_called_once_with("json") + SparkSession.readStream.format.return_value.options.assert_called_once_with( + **bronze_dataflow_spec.readerConfigOptions + ) + struct_schema = StructType.fromJson(schema) + SparkSession.readStream.format.return_value.options.return_value.schema.assert_called_once_with(struct_schema) + (SparkSession.readStream.format.return_value.options.return_value.schema + .return_value.load.assert_called_once_with(bronze_dataflow_spec.sourceDetails["path"])) + assert add_cloudfiles_metadata.called @patch.object(SparkSession, "readStream") def test_read_cloud_files_no_schema(self, SparkSession): @@ -392,3 +423,24 @@ def test_eventhub_positive(self, SparkSession): ) customer_df = pipeline_readers.read_kafka() self.assertIsNotNone(customer_df) + + def test_add_cloudfiles_metadata(self): + """Test add_cloudfiles_metadata.""" + bronze_map = PipelineReadersTests.bronze_dataflow_spec_map + source_format_map = {"sourceFormat": "json"} + bronze_map.update(source_format_map) + source_metdata_json = { + "include_autoloader_metadata_column": "True", + "autoloader_metadata_col_name": "source_metadata", + "select_metadata_cols": { + "input_file_name": "_metadata.file_name", + "input_file_path": 
"_metadata.file_path" + } + } + bronze_dataflow_spec = BronzeDataflowSpec(**bronze_map) + bronze_dataflow_spec.sourceDetails["source_metadata"] = json.dumps(source_metdata_json) + df = (self.spark.read.json("tests/resources/data/customers") + .withColumn('_metadata', struct(*[lit("filename").alias("file_name"), + lit("file_path").alias('file_path')]))) + df = PipelineReaders.add_cloudfiles_metadata(bronze_dataflow_spec.sourceDetails, df) + self.assertIsNotNone(df)