
Issues/49 #56

Merged: 12 commits merged into feature/v0.0.8 from issues/49 on Jul 6, 2024

Conversation

@ravi-databricks (Contributor) commented Jun 25, 2024

Need to provide the ability to add file metadata to the dataframe, e.g.:

import dlt

@dlt.table
def bronze():
  return (spark.readStream.format("cloudFiles")
    # Define the schema for the ~6 common columns across files. All other input
    # fields will be "rescued" into a JSON string column that can be queried via
    # dot notation. _file_path is a hidden auto-field that would otherwise show up
    # in the rescued-data column JSON; spoofing it as an input column here lets us
    # drop it later.
    .schema("Common1 string, Common2 string, _file_path string")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    # Override the default _rescued_data column name with whatever you want to call it.
    .option("cloudFiles.rescuedDataColumn", "extraFields")
    .option("header", "true")
    .load("/Volumes/vol/data/*.txt")
    .select("*", "_metadata")  # add file metadata information to the output
    .drop("_file_path")  # discard the dummy input column to keep _file_path out of extraFields
  )

Introduced `select_metadata_cols` inside `source_details` in the onboarding file:

      "source_metadata": {
            "include_autoloader_metadata_column": "True",
            "autoloader_metadata_col_name": "source_metadata",
            "select_metadata_cols": {
               "input_file_name": "_metadata.file_name",
               "input_file_path": "_metadata.file_path"
            }

This will be utilized to add metadata columns to target tables.
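For orientation, here is a minimal sketch of how these options could be applied to a streaming DataFrame. The helper name add_source_metadata and its logic are illustrative assumptions, not the actual dlt-meta reader code:

from pyspark.sql import DataFrame
from pyspark.sql.functions import col, expr

def add_source_metadata(df: DataFrame, source_metadata: dict) -> DataFrame:
    # Hypothetical helper, not the dlt-meta API: shows how the onboarding
    # options above could translate into DataFrame operations.
    if source_metadata.get("include_autoloader_metadata_column") == "True":
        # Surface Auto Loader's hidden _metadata struct under the configured name.
        metadata_col = source_metadata.get("autoloader_metadata_col_name", "_metadata")
        df = df.withColumn(metadata_col, col("_metadata"))
        # Promote each selected metadata field to a top-level target column.
        for target_col, source_field in source_metadata.get("select_metadata_cols", {}).items():
            df = df.withColumn(target_col, expr(source_field))
    return df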

Introducing custom_transform_func so that customers can bring their own transformations.
e.g.
You have a custom transformation defined as Python code, as shown below, which takes a DataFrame as input and returns the transformed DataFrame:

from pyspark.sql import DataFrame
from pyspark.sql.functions import lit

def custom_transform_func_test(input_df: DataFrame) -> DataFrame:
    # Example transformation: append a literal column to the incoming DataFrame.
    return input_df.withColumn('custom_col', lit('test'))

You should then be able to pass this function to the dlt-meta invoke function as below:

layer = spark.conf.get("layer", None)

from src.dataflow_pipeline import DataflowPipeline
DataflowPipeline.invoke_dlt_pipeline(spark, layer, custom_transform_func=custom_transform_func_test)
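Conceptually, the pipeline only needs to thread that callable through to the bronze read. A rough sketch of the idea (illustrative, not the actual DataflowPipeline internals):

from typing import Callable, Optional
from pyspark.sql import DataFrame

def apply_custom_transform(df: DataFrame,
                           custom_transform_func: Optional[Callable[[DataFrame], DataFrame]] = None) -> DataFrame:
    # Illustrative sketch: apply the user-supplied transform (if any) to the
    # bronze DataFrame before the target table is written; otherwise pass through.
    return custom_transform_func(df) if custom_transform_func else df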

codecov bot commented Jun 25, 2024

Codecov Report

Attention: Patch coverage is 56.25000% with 14 lines in your changes missing coverage. Please review.

Please upload report for BASE (feature/v0.0.8@3555aaa). Learn more about missing BASE report.

Files                     Patch %   Lines
src/pipeline_readers.py   33.33%    13 Missing and 1 partial ⚠️
Additional details and impacted files
@@                Coverage Diff                @@
##             feature/v0.0.8      #56   +/-   ##
=================================================
  Coverage                  ?   88.57%           
=================================================
  Files                     ?        8           
  Lines                     ?      858           
  Branches                  ?      168           
=================================================
  Hits                      ?      760           
  Misses                    ?       46           
  Partials                  ?       52           
Flag        Coverage Δ
unittests   88.57% <56.25%> (?)

Flags with carried forward coverage won't be shown.


1. Custom transformations for bronze layer
2. Unit tests
3. Increased version to v0.0.8
@ravi-databricks added this to the v0.0.8 milestone on Jul 3, 2024
@ravi-databricks added the enhancement (New feature or request) label on Jul 3, 2024
@@ -17,7 +17,7 @@ draft: false
 | data_flow_id | This is unique identifer for pipeline |
 | data_flow_group | This is group identifer for launching multiple pipelines under single DLT |
 | source_format | Source format e.g `cloudFiles`, `eventhub`, `kafka`, `delta` |
-| source_details | This map Type captures all source details for cloudfiles = `source_schema_path`, `source_path_{env}`, `source_database` and for eventhub= `source_schema_path` , `eventhub.accessKeyName`, `eventhub.accessKeySecretName`, `eventhub.name` , `eventhub.secretsScopeName` , `kafka.sasl.mechanism`, `kafka.security.protocol`, `eventhub.namespace`, `eventhub.port`. For Source schema file spark DDL schema format parsing is supported <br> In case of custom schema format then write schema parsing function `bronze_schema_mapper(schema_file_path, spark):Schema` and provide to `OnboardDataflowspec` initialization <br> .e.g `onboardDataFlowSpecs = OnboardDataflowspec(spark, dict_obj,bronze_schema_mapper).onboardDataFlowSpecs()` |
+| source_details | This map Type captures all source details for cloudfiles = `source_schema_path`, `source_path_{env}`, `source_database`, `source_metadata` For eventhub= `source_schema_path` , `eventhub.accessKeyName`, `eventhub.accessKeySecretName`, `eventhub.name` , `eventhub.secretsScopeName` , `kafka.sasl.mechanism`, `kafka.security.protocol`, `eventhub.namespace`, `eventhub.port`. For Source schema file spark DDL schema format parsing is supported <br> In case of custom schema format then write schema parsing function `bronze_schema_mapper(schema_file_path, spark):Schema` and provide to `OnboardDataflowspec` initialization <br> .e.g `onboardDataFlowSpecs = OnboardDataflowspec(spark, dict_obj,bronze_schema_mapper).onboardDataFlowSpecs()` |
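(As an aside, the bronze_schema_mapper hook referenced in this table could look like the following sketch. This is a hypothetical implementation assuming the schema file contains a Spark DDL string; the docs only require the (schema_file_path, spark) -> Schema shape.)

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

def bronze_schema_mapper(schema_file_path: str, spark: SparkSession) -> StructType:
    # Hypothetical parser: assumes the file holds a Spark DDL string such as
    # "id INT, name STRING". StructType.fromDDL requires PySpark 3.5+.
    with open(schema_file_path, "r") as f:
        ddl = f.read()
    return StructType.fromDDL(ddl)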
Contributor commented:

Need a period before "For eventhub="

@ravi-databricks (Author) replied:

done!

@ravi-databricks merged commit f82e08b into feature/v0.0.8 on Jul 6, 2024
1 check passed
@ravi-databricks mentioned this pull request on Jul 29, 2024
@ravi-databricks deleted the issues/49 branch on August 9, 2024 17:58