Issues/49 #56

Merged: 12 commits, Jul 6, 2024
6 changes: 3 additions & 3 deletions docs/content/getting_started/additionals2.md
@@ -18,9 +18,9 @@ draft: false
4. If you already have the Databricks CLI installed with a profile as described [here](https://docs.databricks.com/en/dev-tools/cli/profiles.html), you can skip the above export step and provide the `--profile=<your databricks profile name>` option while running the command in step 4

5. Run the integration tests against cloudfiles, eventhub, or kafka using one of the options below:
- 5a. Run the command for cloudfiles ```python integration-tests/run-integration-test.py --cloud_provider_name=aws --dbr_version=11.3.x-scala2.12 --source=cloudfiles --dbfs_path=dbfs:/tmp/DLT-META/```
- 5a. Run the command for cloudfiles ```python integration-tests/run_integration_tests.py --cloud_provider_name=aws --dbr_version=11.3.x-scala2.12 --source=cloudfiles --dbfs_path=dbfs:/tmp/DLT-META/```

- 5b. Run the command for eventhub ```python integration-tests/run-integration-test.py --cloud_provider_name=azure --dbr_version=11.3.x-scala2.12 --source=eventhub --dbfs_path=dbfs:/tmp/DLT-META/ --eventhub_name=iot --eventhub_secrets_scope_name=eventhubs_creds --eventhub_namespace=int_test-standard --eventhub_port=9093 --eventhub_producer_accesskey_name=producer --eventhub_consumer_accesskey_name=consumer```
- 5b. Run the command for eventhub ```python integration-tests/run_integration_tests.py --cloud_provider_name=azure --dbr_version=11.3.x-scala2.12 --source=eventhub --dbfs_path=dbfs:/tmp/DLT-META/ --eventhub_name=iot --eventhub_secrets_scope_name=eventhubs_creds --eventhub_namespace=int_test-standard --eventhub_port=9093 --eventhub_producer_accesskey_name=producer --eventhub_consumer_accesskey_name=consumer```

- - For eventhub integration tests, the following are the prerequisites:
1. Needs eventhub instance running
@@ -36,7 +36,7 @@ draft: false
6. Provide eventhub access key name: --eventhub_consumer_accesskey_name


- 5c. Run the command for kafka ```python3 integration-tests/run-integration-test.py --cloud_provider_name=aws --dbr_version=11.3.x-scala2.12 --source=kafka --dbfs_path=dbfs:/tmp/DLT-META/ --kafka_topic_name=dlt-meta-integration-test --kafka_broker=host:9092```
- 5c. Run the command for kafka ```python3 integration-tests/run_integration_tests.py --cloud_provider_name=aws --dbr_version=11.3.x-scala2.12 --source=kafka --dbfs_path=dbfs:/tmp/DLT-META/ --kafka_topic_name=dlt-meta-integration-test --kafka_broker=host:9092```

- - For kafka integration tests, the following are the prerequisites:
1. Needs kafka instance running
2 changes: 1 addition & 1 deletion docs/content/getting_started/metadatapreperation.md
@@ -17,7 +17,7 @@ draft: false
| data_flow_id | This is the unique identifier for the pipeline |
| data_flow_group | This is the group identifier for launching multiple pipelines under a single DLT pipeline |
| source_format | Source format, e.g. `cloudFiles`, `eventhub`, `kafka`, `delta` |
| source_details | This map Type captures all source details for cloudfiles = `source_schema_path`, `source_path_{env}`, `source_database` and for eventhub= `source_schema_path` , `eventhub.accessKeyName`, `eventhub.accessKeySecretName`, `eventhub.name` , `eventhub.secretsScopeName` , `kafka.sasl.mechanism`, `kafka.security.protocol`, `eventhub.namespace`, `eventhub.port`. For Source schema file spark DDL schema format parsing is supported <br> In case of custom schema format then write schema parsing function `bronze_schema_mapper(schema_file_path, spark):Schema` and provide to `OnboardDataflowspec` initialization <br> .e.g `onboardDataFlowSpecs = OnboardDataflowspec(spark, dict_obj,bronze_schema_mapper).onboardDataFlowSpecs()` |
| source_details | This map type captures all source details. For cloudfiles: `source_schema_path`, `source_path_{env}`, `source_database`, `source_metadata`. For eventhub: `source_schema_path`, `eventhub.accessKeyName`, `eventhub.accessKeySecretName`, `eventhub.name`, `eventhub.secretsScopeName`, `kafka.sasl.mechanism`, `kafka.security.protocol`, `eventhub.namespace`, `eventhub.port`. For the source schema file, Spark DDL schema format parsing is supported. <br> For a custom schema format, write a schema parsing function `bronze_schema_mapper(schema_file_path, spark):Schema` and provide it to the `OnboardDataflowspec` initialization, <br> e.g. `onboardDataFlowSpecs = OnboardDataflowspec(spark, dict_obj, bronze_schema_mapper).onboardDataFlowSpecs()` (a sketch of such a mapper follows this file's diff) |
Contributor:
Need a period before "For eventhub="

Contributor (Author):
done!

| bronze_database_{env} | Delta lake bronze database name. |
| bronze_table | Delta lake bronze table name |
| bronze_reader_options | Reader options which can be provided to the Spark reader, <br> e.g. `multiline=true,header=true` in json format |
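The `bronze_schema_mapper` hook named in the `source_details` row above is described but not shown in this PR. Below is a minimal sketch of such a mapper, assuming the schema file contains a Spark DDL string; only the function name and signature come from the docs, the body is illustrative.

```python
from pyspark.sql.types import StructType, _parse_datatype_string


def bronze_schema_mapper(schema_file_path: str, spark) -> StructType:
    """Illustrative custom schema mapper: read a schema file and return a StructType.

    Assumes the file at schema_file_path holds a Spark DDL string,
    e.g. "id INT, name STRING, amount DOUBLE".
    """
    ddl = " ".join(row.value for row in spark.read.text(schema_file_path).collect())
    return _parse_datatype_string(ddl)


# Wired into onboarding exactly as the table above describes:
# OnboardDataflowspec(spark, dict_obj, bronze_schema_mapper).onboardDataFlowSpecs()
```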
10 changes: 9 additions & 1 deletion examples/onboarding.json
@@ -7,7 +7,15 @@
"source_details": {
"source_database": "APP",
"source_table": "CUSTOMERS",
"source_path_it": "{dbfs_path}/cdc_raw/customers"
"source_path_it": "{dbfs_path}/cdc_raw/customers",
"source_metadata": {
"include_autoloader_metadata_column": "True",
"autoloader_metadata_col_name": "source_metadata",
"select_metadata_cols": {
"input_file_name": "_metadata.file_name",
"input_file_path": "_metadata.file_path"
}
}
},
"bronze_database_it": "bronze_it_{run_id}",
"bronze_table": "customers_cdc",
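For orientation, the `source_metadata` block added above asks the bronze cloudFiles read to keep Auto Loader's `_metadata` struct in a `source_metadata` column and to surface two of its fields as top-level columns. A hand-written sketch of that projection follows, assuming a Databricks notebook where `spark` is defined; the actual wiring lives in `PipelineReaders.add_cloudfiles_metadata` later in this diff, and the `cloudFiles.format` option and resolved path here are assumptions.

```python
from pyspark.sql.functions import col

# Sketch of the projection the source_metadata block above requests (not DLT-META's code).
bronze_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")              # assumption: JSON input files
    .load("dbfs:/tmp/DLT-META/cdc_raw/customers")     # assumption: resolved {dbfs_path}
    .selectExpr("*", "_metadata as source_metadata")  # include_autoloader_metadata_column + col name
    .withColumn("input_file_name", col("_metadata.file_name"))   # select_metadata_cols mapping
    .withColumn("input_file_path", col("_metadata.file_path"))
)
```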
11 changes: 11 additions & 0 deletions integration_tests/conf/cloudfiles-onboarding.template
@@ -8,6 +8,14 @@
"source_database": "APP",
"source_table": "CUSTOMERS",
"source_path_it": "{dbfs_path}/integration_tests/resources/data/customers",
"source_metadata": {
"include_autoloader_metadata_column": "True",
"autoloader_metadata_col_name": "source_metadata",
"select_metadata_cols": {
"input_file_name": "_metadata.file_name",
"input_file_path": "_metadata.file_path"
}
},
"source_schema_path": "{dbfs_path}/integration_tests/resources/customers.ddl"
},
"bronze_database_it": "{uc_catalog_name}.{bronze_schema}",
@@ -80,6 +88,9 @@
"source_database": "APP",
"source_table": "TRANSACTIONS",
"source_path_it": "{dbfs_path}/integration_tests/resources/data/transactions",
"source_metadata": {
"include_autoloader_metadata_column": "True"
},
"source_schema_path": "{dbfs_path}/integration_tests/resources/transactions.ddl"
},
"bronze_database_it": "{uc_catalog_name}.{bronze_schema}",
5 changes: 3 additions & 2 deletions integration_tests/run_integration_tests.py
@@ -724,7 +724,8 @@ def create_cluster(self, runner_conf: DLTMetaRunnerConf):
spark_conf=spark_confs,
autotermination_minutes=30,
spark_env_vars={
"PYSPARK_PYTHON": "/databricks/python3/bin/python3"
"PYSPARK_PYTHON": "/databricks/python3/bin/python3",
"WSFS_ENABLE": "false"
},
data_security_mode=mode
).result()
@@ -742,7 +743,7 @@ def run(self, runner_conf: DLTMetaRunnerConf):
print(e)
finally:
print("Cleaning up...")
# self.clean_up(runner_conf)
self.clean_up(runner_conf)

def download_test_results(self, runner_conf: DLTMetaRunnerConf):
ws_output_file = self.ws.workspace.download(runner_conf.test_output_file_path)
2 changes: 1 addition & 1 deletion setup.py
@@ -19,7 +19,7 @@
"""
setup(
name="dlt_meta",
version="0.0.7",
version="0.0.8",
python_requires=">=3.8",
setup_requires=["wheel>=0.37.1,<=0.42.0"],
install_requires=INSTALL_REQUIRES,
2 changes: 1 addition & 1 deletion src/__about__.py
@@ -1 +1 @@
__version__ = '0.0.5'
__version__ = '0.0.8'
32 changes: 23 additions & 9 deletions src/dataflow_pipeline.py
@@ -54,19 +54,23 @@ class DataflowPipeline:
[type]: [description]
"""

def __init__(self, spark, dataflow_spec, view_name, view_name_quarantine=None):
def __init__(self, spark, dataflow_spec, view_name, view_name_quarantine=None, custom_tranform_func=None):
"""Initialize Constructor."""
logger.info(
f"""dataflowSpec={dataflow_spec} ,
view_name={view_name},
view_name_quarantine={view_name_quarantine}"""
)
if isinstance(dataflow_spec, BronzeDataflowSpec) or isinstance(dataflow_spec, SilverDataflowSpec):
self.__initialize_dataflow_pipeline(spark, dataflow_spec, view_name, view_name_quarantine)
self.__initialize_dataflow_pipeline(
spark, dataflow_spec, view_name, view_name_quarantine, custom_tranform_func
)
else:
raise Exception("Dataflow not supported!")

def __initialize_dataflow_pipeline(self, spark, dataflow_spec, view_name, view_name_quarantine):
def __initialize_dataflow_pipeline(
self, spark, dataflow_spec, view_name, view_name_quarantine, custom_tranform_func
):
"""Initialize dataflow pipeline state."""
self.spark = spark
uc_enabled_str = spark.conf.get("spark.databricks.unityCatalog.enabled", "False")
@@ -76,6 +80,7 @@ def __initialize_dataflow_pipeline(self, spark, dataflow_spec, view_name, view_n
self.view_name = view_name
if view_name_quarantine:
self.view_name_quarantine = view_name_quarantine
self.custom_tranform_func = custom_tranform_func
if dataflow_spec.cdcApplyChanges:
self.cdcApplyChanges = DataflowSpecUtils.get_cdc_apply_changes(self.dataflowSpec.cdcApplyChanges)
else:
@@ -205,14 +210,21 @@ def read_bronze(self) -> DataFrame:
self.schema_json
)
bronze_dataflow_spec: BronzeDataflowSpec = self.dataflowSpec
input_df = None
if bronze_dataflow_spec.sourceFormat == "cloudFiles":
return pipeline_reader.read_dlt_cloud_files()
input_df = pipeline_reader.read_dlt_cloud_files()
elif bronze_dataflow_spec.sourceFormat == "delta":
return PipelineReaders.read_dlt_delta()
input_df = PipelineReaders.read_dlt_delta()
elif bronze_dataflow_spec.sourceFormat == "eventhub" or bronze_dataflow_spec.sourceFormat == "kafka":
return PipelineReaders.read_kafka()
input_df = PipelineReaders.read_kafka()
else:
raise Exception(f"{bronze_dataflow_spec.sourceFormat} source format not supported")
return self.apply_custom_transform_fun(input_df)

def apply_custom_transform_fun(self, input_df):
if self.custom_tranform_func:
input_df = self.custom_tranform_func(input_df)
return input_df

def get_silver_schema(self):
"""Get Silver table Schema."""
@@ -266,7 +278,7 @@ def read_silver(self) -> DataFrame:
if len(where_clause_str.strip()) > 0:
for where_clause in where_clause:
raw_delta_table_stream = raw_delta_table_stream.where(where_clause)
return raw_delta_table_stream
return self.apply_custom_transform_fun(raw_delta_table_stream)

def write_to_delta(self):
"""Write to Delta."""
@@ -483,7 +495,7 @@ def run_dlt(self):
self.write()

@staticmethod
def invoke_dlt_pipeline(spark, layer):
def invoke_dlt_pipeline(spark, layer, custom_tranform_func=None):
"""Invoke dlt pipeline will launch dlt with given dataflowspec.

Args:
@@ -504,7 +516,8 @@ def invoke_dlt_pipeline(spark, layer):
and dataflowSpec.quarantineTargetDetails != {}:
quarantine_input_view_name = (
f"{dataflowSpec.quarantineTargetDetails['table']}"
f"_{layer}_quarantine_inputView"
f"_{layer}_quarantine_inputView",
custom_tranform_func
)
else:
logger.info("quarantine_input_view_name set to None")
@@ -513,6 +526,7 @@
dataflowSpec,
f"{dataflowSpec.targetDetails['table']}_{layer}_inputView",
quarantine_input_view_name,
custom_tranform_func
)

dlt_data_flow.run_dlt()
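The new `custom_tranform_func` parameter (spelling as in the code) threads a user-supplied DataFrame-to-DataFrame function through `read_bronze` and `read_silver` via `apply_custom_transform_fun`. A minimal sketch of passing one in from a DLT pipeline notebook, assuming `spark` is available and the import path follows the project's `src` layout:

```python
from pyspark.sql import DataFrame
from pyspark.sql.functions import current_timestamp

from src.dataflow_pipeline import DataflowPipeline


def add_ingest_timestamp(df: DataFrame) -> DataFrame:
    """Illustrative custom transform: stamp every input row with an ingestion timestamp."""
    return df.withColumn("ingest_ts", current_timestamp())


# Launch the bronze layer; the transform is applied to every bronze input view.
DataflowPipeline.invoke_dlt_pipeline(spark, "bronze", custom_tranform_func=add_ingest_timestamp)
```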
9 changes: 8 additions & 1 deletion src/onboard_dataflowspec.py
@@ -458,7 +458,6 @@ def __get_bronze_dataflow_spec_dataframe(self, onboarding_df, env):
raise Exception(f"Source format {source_format} not supported in DLT-META! row={onboarding_row}")
source_details, bronze_reader_config_options, schema = self.get_bronze_source_details_reader_options_schema(
onboarding_row, env)

bronze_target_format = "delta"
bronze_target_details = {
"database": onboarding_row["bronze_database_{}".format(env)],
@@ -611,6 +610,14 @@ def get_bronze_source_details_reader_options_schema(self, onboarding_row, env):
source_details["source_database"] = source_details_file["source_database"]
if "source_table" in source_details_file:
source_details["source_table"] = source_details_file["source_table"]
if "source_metadata" in source_details_file:
source_metadata_dict = self.__delete_none(source_details_file["source_metadata"].asDict())
if "select_metadata_cols" in source_metadata_dict:
select_metadata_cols = self.__delete_none(
source_metadata_dict["select_metadata_cols"].asDict()
)
source_metadata_dict["select_metadata_cols"] = select_metadata_cols
source_details["source_metadata"] = json.dumps(self.__delete_none(source_metadata_dict))
elif source_format.lower() == "eventhub" or source_format.lower() == "kafka":
source_details = source_details_file
if "source_schema_path" in source_details_file:
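The new branch above flattens the nested `source_metadata` Row from the onboarding file into a JSON string on the bronze DataflowSpec, dropping empty entries with the class's private `__delete_none` helper. A plain-Python illustration of that normalization; the stand-in `delete_none` assumes the helper simply drops `None`-valued keys.

```python
import json


def delete_none(d: dict) -> dict:
    # Stand-in for OnboardDataflowspec.__delete_none (assumed behavior: drop None-valued keys).
    return {k: v for k, v in d.items() if v is not None}


# The onboarding row's source_metadata once Row.asDict() has been applied.
source_metadata_dict = {
    "include_autoloader_metadata_column": "True",
    "autoloader_metadata_col_name": "source_metadata",
    "select_metadata_cols": {
        "input_file_name": "_metadata.file_name",
        "input_file_path": "_metadata.file_path",
    },
}
source_metadata_dict["select_metadata_cols"] = delete_none(source_metadata_dict["select_metadata_cols"])
source_details = {"source_metadata": json.dumps(delete_none(source_metadata_dict))}
print(source_details["source_metadata"])  # JSON string stored on the DataflowSpec
```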
26 changes: 23 additions & 3 deletions src/pipeline_readers.py
@@ -1,5 +1,6 @@
"""PipelineReaders providers DLT readers functionality."""
import logging
import json
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType
from pyspark.sql.functions import from_json, col
@@ -29,22 +30,41 @@ def read_dlt_cloud_files(self) -> DataFrame:
DataFrame: _description_
"""
logger.info("In read_dlt_cloud_files func")
input_df = None
source_path = self.source_details["path"]

if self.schema_json and self.source_format != "delta":
schema = StructType.fromJson(self.schema_json)
return (
input_df = (
self.spark.readStream.format(self.source_format)
.options(**self.reader_config_options)
.schema(schema)
.load(source_path)
)
else:
return (
input_df = (
self.spark.readStream.format(self.source_format)
.options(**self.reader_config_options)
.load(source_path)
)
if self.source_details and "source_metadata" in self.source_details.keys():
input_df = PipelineReaders.add_cloudfiles_metadata(self.source_details, input_df)
return input_df

@staticmethod
def add_cloudfiles_metadata(sourceDetails, input_df):
source_metadata_json = json.loads(sourceDetails.get("source_metadata"))
keys = source_metadata_json.keys()
if "include_autoloader_metadata_column" in keys:
if "autoloader_metadata_col_name" in source_metadata_json:
source_metadata_col_name = source_metadata_json["autoloader_metadata_col_name"]
input_df = input_df.selectExpr("*", f"_metadata as {source_metadata_col_name}")
else:
input_df = input_df.selectExpr("*", "_metadata as source_metadata")
if "select_metadata_cols" in source_metadata_json:
select_metadata_cols = source_metadata_json["select_metadata_cols"]
for select_metadata_col in select_metadata_cols:
input_df = input_df.withColumn(select_metadata_col, col(select_metadata_cols[select_metadata_col]))
return input_df

def read_dlt_delta(self) -> DataFrame:
"""Read dlt delta.
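Because `add_cloudfiles_metadata` is a static method that only performs column projections, it can be exercised locally on a DataFrame carrying a hand-built `_metadata` struct. A test-style sketch under that assumption (the local SparkSession, sample data, and `src.pipeline_readers` import path are all illustrative):

```python
from pyspark.sql import SparkSession

from src.pipeline_readers import PipelineReaders

spark = SparkSession.builder.master("local[1]").appName("metadata-sketch").getOrCreate()

# Hand-built stand-in for Auto Loader's _metadata struct.
df = spark.createDataFrame(
    [(1, ("customers_1.json", "/raw/customers/customers_1.json"))],
    "id INT, _metadata STRUCT<file_name: STRING, file_path: STRING>",
)

source_details = {
    "source_metadata": (
        '{"include_autoloader_metadata_column": "True",'
        ' "autoloader_metadata_col_name": "source_metadata",'
        ' "select_metadata_cols": {"input_file_name": "_metadata.file_name",'
        ' "input_file_path": "_metadata.file_path"}}'
    )
}

out_df = PipelineReaders.add_cloudfiles_metadata(source_details, df)
out_df.show(truncate=False)  # adds source_metadata, input_file_name, input_file_path columns
```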