Commit
Merge pull request #14 from msdotnetclr/issue-13
Introduced a new source detail option for the eventhub source: `eventhub.accessKeySecretName`
Details can be found in [issue #13](#13)
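
For orientation, here is a minimal sketch of where the new key sits in an eventhub `source_details` map. The key names match the onboarding examples changed in this commit; the values are placeholders, and only `eventhub.accessKeySecretName` is new.

```python
# Hedged sketch of an eventhub source_details map (values are placeholders).
source_details = {
    "source_schema_path": "dbfs:/path/to/eventhub_iot_schema.ddl",  # placeholder path
    "eventhub.namespace": "my-namespace",
    "eventhub.name": "iot",
    "eventhub.port": "9093",
    "eventhub.secretsScopeName": "eventhubs_creds",
    "eventhub.accessKeyName": "iotIngestionAccessKey",
    # New in this commit: the Databricks secret that stores the access key value.
    # If omitted, the reader falls back to eventhub.accessKeyName (see src/pipeline_readers.py below).
    "eventhub.accessKeySecretName": "iotIngestionAccessKeySecret",
}
```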
ravi-databricks authored Oct 12, 2023
2 parents 28ddde6 + ecbbe81 commit f5f6c34
Showing 13 changed files with 85 additions and 8 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

[Please read through the Keep a Changelog (~5min)](https://keepachangelog.com/en/1.0.0/).

## [v0.0.4] - 2023-10-09
### Added
- New `source_details` option `eventhub.accessKeySecretName` for Event Hub configuration, used to construct `eh_shared_key_value` correctly. Without this option, connections to the Event Hub service failed (see [issue-13 - java.lang.RuntimeException: non-nullable field authBytes was serialized as null #13](https://github.com/databrickslabs/dlt-meta/issues/13)). A sketch of the behavior change follows this changelog excerpt.

## [v0.0.3] - 2023-06-07
### Fixed
- infer datatypes from sequence_by to __START_AT, __END_AT for apply changes API
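As a rough illustration of the v0.0.4 entry above, the sketch below contrasts the old and new secret lookup. It is simplified from the `src/pipeline_readers.py` change further down; a plain dict stands in for `dbutils.secrets` so the snippet runs outside Databricks.

```python
# Simplified, hedged sketch of the v0.0.4 behavior change (not the exact library code).
secrets = {("eventhubs_creds", "iotIngestionAccessKeySecret"): "<shared-access-key-value>"}  # stand-in for dbutils.secrets

source_details = {
    "eventhub.accessKeyName": "iotIngestionAccessKey",
    "eventhub.accessKeySecretName": "iotIngestionAccessKeySecret",  # new option
    "eventhub.secretsScopeName": "eventhubs_creds",
}

eh_shared_key_name = source_details["eventhub.accessKeyName"]
secret_scope = source_details["eventhub.secretsScopeName"]

# Before v0.0.4 the secret was always fetched by the access-key name:
#   eh_shared_key_value = dbutils.secrets.get(secret_scope, eh_shared_key_name)
# which fails when the secret is stored under a different name.

# After v0.0.4 an explicit secret name may be supplied, with a fallback to the key name:
secret_name = source_details.get("eventhub.accessKeySecretName") or eh_shared_key_name
eh_shared_key_value = secrets[(secret_scope, secret_name)]
eh_shared_key_value = f"SharedAccessKeyName={eh_shared_key_name};SharedAccessKey={eh_shared_key_value}"
print(eh_shared_key_value)
```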
6 changes: 4 additions & 2 deletions docs/content/getting_started/additionals.md
@@ -23,7 +23,7 @@ export DATABRICKS_TOKEN=<DATABRICKS TOKEN> # Account needs permission to create
5. Run integration tests against cloudfiles, eventhub, or kafka using the options below:
5a. Run the command for cloudfiles ```python integration-tests/run-integration-test.py --cloud_provider_name=aws --dbr_version=11.3.x-scala2.12 --source=cloudfiles --dbfs_path=dbfs:/tmp/DLT-META/```

5b. Run the command for eventhub ```python integration-tests/run-integration-test.py --cloud_provider_name=azure --dbr_version=11.3.x-scala2.12 --source=eventhub --dbfs_path=dbfs:/tmp/DLT-META/ --eventhub_name=iot --eventhub_secrets_scope_name=eventhubs_creds --eventhub_namespace=int_test-standard --eventhub_port=9093 --eventhub_producer_accesskey_name=producer --eventhub_consumer_accesskey_name=consumer```
5b. Run the command for eventhub ```python integration-tests/run-integration-test.py --cloud_provider_name=azure --dbr_version=11.3.x-scala2.12 --source=eventhub --dbfs_path=dbfs:/tmp/DLT-META/ --eventhub_name=iot --eventhub_secrets_scope_name=eventhubs_creds --eventhub_namespace=int_test-standard --eventhub_port=9093 --eventhub_producer_accesskey_name=producer --eventhub_producer_accesskey_secret_name=producer --eventhub_consumer_accesskey_name=consumer --eventhub_consumer_accesskey_secret_name=consumer```

For eventhub integration tests, the following are the prerequisites:
1. Needs eventhub instance running
@@ -36,7 +36,9 @@ export DATABRICKS_TOKEN=<DATABRICKS TOKEN> # Account needs permission to create
3. Provide eventhub port : --eventhub_port
4. Provide databricks secret scope name : --eventhub_secrets_scope_name
5. Provide eventhub producer access key name : --eventhub_producer_accesskey_name
6. Provide eventhub access key name : --eventhub_consumer_accesskey_name
6. Provide eventhub consumer access key name : --eventhub_consumer_accesskey_name
7. Provide eventhub producer access key secret name : --eventhub_producer_accesskey_secret_name
8. Provide eventhub consumer access key secret name : --eventhub_consumer_accesskey_secret_name


5c. Run the command for kafka ```python3 integration-tests/run-integration-test.py --cloud_provider_name=aws --dbr_version=11.3.x-scala2.12 --source=kafka --dbfs_path=dbfs:/tmp/DLT-META/ --kafka_topic_name=dlt-meta-integration-test --kafka_broker=host:9092```
2 changes: 1 addition & 1 deletion docs/content/getting_started/metadatapreperation.md
@@ -17,7 +17,7 @@ draft: false
| data_flow_id | This is the unique identifier for the pipeline |
| data_flow_group | This is the group identifier for launching multiple pipelines under a single DLT |
| source_format | Source format e.g `cloudFiles`, `eventhub`, `kafka`, `delta` |
| source_details | This map Type captures all source details for cloudfiles = `source_schema_path`, `source_path_{env}`, `source_database` and for eventhub= `source_schema_path` , `eventhub.accessKeyName`, `eventhub.name` , `eventhub.secretsScopeName` , `kafka.sasl.mechanism`, `kafka.security.protocol`, `eventhub.namespace`, `eventhub.port`. For Source schema file spark DDL schema format parsing is supported <br> In case of custom schema format then write schema parsing function `bronze_schema_mapper(schema_file_path, spark):Schema` and provide to `OnboardDataflowspec` initialization <br> .e.g `onboardDataFlowSpecs = OnboardDataflowspec(spark, dict_obj,bronze_schema_mapper).onboardDataFlowSpecs()` |
| source_details | This map type captures all source details: for cloudfiles, `source_schema_path`, `source_path_{env}`, `source_database`; for eventhub, `source_schema_path`, `eventhub.accessKeyName`, `eventhub.accessKeySecretName`, `eventhub.name`, `eventhub.secretsScopeName`, `kafka.sasl.mechanism`, `kafka.security.protocol`, `eventhub.namespace`, `eventhub.port`. For the source schema file, Spark DDL schema format parsing is supported. <br> For a custom schema format, write a schema parsing function `bronze_schema_mapper(schema_file_path, spark):Schema` and provide it at `OnboardDataflowspec` initialization (a sketch follows this file's diff) <br> e.g. `onboardDataFlowSpecs = OnboardDataflowspec(spark, dict_obj, bronze_schema_mapper).onboardDataFlowSpecs()` |
| bronze_database_{env} | Delta lake bronze database name. |
| bronze_table | Delta lake bronze table name |
| bronze_reader_options | Reader options which can be provided to spark reader <br> e.g multiline=true,header=true in json format |
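The `source_details` row above also mentions a user-supplied schema mapper for custom schema formats. A minimal sketch of such a mapper, under the assumption that the custom file holds a JSON-serialized Spark schema, might look like this:

```python
import json

from pyspark.sql.types import StructType


def bronze_schema_mapper(schema_file_path, spark):
    """Illustrative mapper: read a JSON-serialized Spark schema and return a StructType.

    The signature follows the table above; the body is an assumption, not DLT-META code.
    """
    schema_text = "".join(row.value for row in spark.read.text(schema_file_path).collect())
    return StructType.fromJson(json.loads(schema_text))


# Passed at onboarding time, as described in the table:
# OnboardDataflowspec(spark, dict_obj, bronze_schema_mapper).onboardDataFlowSpecs()
```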
3 changes: 2 additions & 1 deletion examples/onboarding.json
@@ -88,6 +88,7 @@
"source_schema_path": "{dbfs_path}/integration-tests/resources/eventhub_iot_schema.ddl",
"eventhub.accessKeyName": "{eventhub_accesskey_name}",
"eventhub.name": "{eventhub_name}",
"eventhub.accessKeySecretName": "{eventhub_accesskey_secret_name}",
"eventhub.secretsScopeName": "{eventhub_secrets_scope_name}",
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
@@ -120,4 +121,4 @@
"silver_table_path_prd": "",
"silver_transformation_json_it": ""
}
]
]
3 changes: 2 additions & 1 deletion integration-tests/conf/dlt-meta/eventhub-onboarding.template
@@ -8,6 +8,7 @@
"source_schema_path": "{dbfs_path}/integration-tests/resources/eventhub_iot_schema.ddl",
"eventhub.accessKeyName": "{eventhub_accesskey_name}",
"eventhub.name": "{eventhub_name}",
"eventhub.accessKeySecretName": "{eventhub_accesskey_secret_name}",
"eventhub.secretsScopeName": "{eventhub_secrets_scope_name}",
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
@@ -16,7 +17,7 @@
},
"bronze_reader_options": {
"maxOffsetsPerTrigger": "50000",
"startingOffsets": "latest",
"startingOffsets": "earliest",
"failOnDataLoss": "false",
"kafka.request.timeout.ms": "60000",
"kafka.session.timeout.ms": "60000"
Binary file modified integration-tests/eventhub_runners.dbc
16 changes: 15 additions & 1 deletion integration-tests/run-integration-test.py
@@ -212,6 +212,7 @@ def create_eventhub_workflow_spec(args, job_spec_dict):
"eventhub_namespace": args.__getattribute__("eventhub_namespace"),
"eventhub_secrets_scope_name": args.__getattribute__("eventhub_secrets_scope_name"),
"eventhub_accesskey_name": args.__getattribute__("eventhub_producer_accesskey_name"),
"eventhub_accesskey_secret_name": args.__getattribute__("eventhub_producer_accesskey_secret_name"),
"eventhub_input_data": f"/{dbfs_file_path}/integration-tests/resources/data/iot/iot.json"
}
}
@@ -498,6 +499,7 @@ def create_eventhub_onboarding(args, eventhub_template, dbfs_tmp_path, run_id):
onboard_obj = json.load(f)
eventhub_name = args.__getattribute__("eventhub_name").lower()
eventhub_accesskey_name = args.__getattribute__("eventhub_consumer_accesskey_name").lower()
eventhub_accesskey_secret_name = args.__getattribute__("eventhub_consumer_accesskey_secret_name").lower()
eventhub_secrets_scope_name = args.__getattribute__("eventhub_secrets_scope_name").lower()
eventhub_namespace = args.__getattribute__("eventhub_namespace").lower()
eventhub_port = args.__getattribute__("eventhub_port").lower()
@@ -511,6 +513,8 @@ def create_eventhub_onboarding(args, eventhub_template, dbfs_tmp_path, run_id):
data_flow[key][source_key] = source_value.format(eventhub_name=eventhub_name)
if 'eventhub_accesskey_name' in source_value:
data_flow[key][source_key] = source_value.format(eventhub_accesskey_name=eventhub_accesskey_name)
if 'eventhub_accesskey_secret_name' in source_value:
data_flow[key][source_key] = source_value.format(eventhub_accesskey_secret_name=eventhub_accesskey_secret_name)
if 'eventhub_secrets_scope_name' in source_value:
data_flow[key][source_key] = source_value.format(eventhub_secrets_scope_name=eventhub_secrets_scope_name)
if 'eventhub_nmspace' in source_value:
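
The template substitution above works by plain `str.format` on placeholder strings. A small, hedged illustration of the new branch, with example values taken from the integration-test command earlier:

```python
# Hedged illustration of the placeholder substitution in create_eventhub_onboarding.
source_value = "{eventhub_accesskey_secret_name}"   # value from eventhub-onboarding.template
eventhub_accesskey_secret_name = "consumer"          # e.g. --eventhub_consumer_accesskey_secret_name=consumer

if "eventhub_accesskey_secret_name" in source_value:
    resolved = source_value.format(
        eventhub_accesskey_secret_name=eventhub_accesskey_secret_name
    )

print(resolved)  # -> consumer
```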
@@ -662,6 +666,8 @@ def process_arguments():
parser.add_argument("--eventhub_name", help="Provide eventhub_name e.g --eventhub_name=iot")
parser.add_argument("--eventhub_producer_accesskey_name", help="Provide access key that has write permission on the eventhub e.g --eventhub_producer_accesskey_name=iotProducerAccessKey")
parser.add_argument("--eventhub_consumer_accesskey_name", help="Provide access key that has read permission on the eventhub e.g --eventhub_consumer_accesskey_name=iotConsumerAccessKey")
parser.add_argument("--eventhub_producer_accesskey_secret_name", help="Provide name of the secret that stores access key with write permission on the eventhub. Optional if same as `eventhub_producer_accesskey_name` e.g --eventhub_producer_accesskey_secret_name=iotProducerAccessKey")
parser.add_argument("--eventhub_consumer_accesskey_secret_name", help="Provide name of the secret that stores access key with read permission on the eventhub. Optional if same as `eventhub_consumer_accesskey_name` e.g --eventhub_consumer_accesskey_secret_name=iotConsumerAccessKey")
parser.add_argument("--eventhub_secrets_scope_name",
help="Provide eventhub_secrets_scope_name e.g --eventhub_secrets_scope_name=eventhubs_creds")
parser.add_argument("--eventhub_namespace", help="Provide eventhub_namespace e.g --eventhub_namespace=topic-standard")
@@ -684,7 +690,15 @@ def process_arguments():
if source.lower() not in supported_sources:
raise Exception("Invalid value for --source! Supported values: --source=cloudfiles")
if source.lower() == "eventhub":
eventhub_madatory_args = ["eventhub_name", "eventhub_producer_accesskey_name", "eventhub_consumer_accesskey_name", "eventhub_secrets_scope_name", "eventhub_namespace", "eventhub_port"]
eventhub_madatory_args = ["eventhub_name",
"eventhub_producer_accesskey_name",
"eventhub_consumer_accesskey_name",
"eventhub_secrets_scope_name",
"eventhub_producer_accesskey_secret_name",
"eventhub_consumer_accesskey_secret_name",
"eventhub_namespace",
"eventhub_port"
]
check_mandatory_arg(args, eventhub_madatory_args)
if source.lower() == "kafka":
kafka_madatory_args = ["kafka_topic_name", "kafka_broker"]
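`check_mandatory_arg` itself is not shown in this diff; a plausible minimal sketch of such a validator (an assumption, not the repository's implementation) is:

```python
def check_mandatory_arg(args, mandatory_args):
    """Raise if any of the listed CLI arguments was not provided (illustrative sketch)."""
    for arg in mandatory_args:
        if getattr(args, arg, None) is None:
            raise Exception(f"Missing mandatory argument: --{arg}")
```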
2 changes: 1 addition & 1 deletion setup.py
@@ -19,7 +19,7 @@
"""
setup(
name="dlt_meta",
version="0.0.3",
version="0.0.4",
python_requires=">=3.8",
setup_requires=["wheel>=0.37.1,<=0.41.2"],
install_requires=INSTALL_REQUIRES,
6 changes: 5 additions & 1 deletion src/pipeline_readers.py
@@ -120,8 +120,12 @@ def get_eventhub_kafka_options(spark, bronze_dataflow_spec):
eh_port = bronze_dataflow_spec.sourceDetails.get("eventhub.port")
eh_name = bronze_dataflow_spec.sourceDetails.get("eventhub.name")
eh_shared_key_name = bronze_dataflow_spec.sourceDetails.get("eventhub.accessKeyName")
secret_name = bronze_dataflow_spec.sourceDetails.get("eventhub.accessKeySecretName")
if not secret_name:
# set default value if "eventhub.accessKeySecretName" is not specified
secret_name = eh_shared_key_name
secret_scope = bronze_dataflow_spec.sourceDetails.get("eventhub.secretsScopeName")
eh_shared_key_value = dbutils.secrets.get(secret_scope, eh_shared_key_name)
eh_shared_key_value = dbutils.secrets.get(secret_scope, secret_name)
eh_shared_key_value = f"SharedAccessKeyName={eh_shared_key_name};SharedAccessKey={eh_shared_key_value}"
eh_conn_str = f"Endpoint=sb://{eh_namespace}.servicebus.windows.net/;{eh_shared_key_value}"
eh_kafka_str = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule"
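The hunk above ends before the Kafka options are assembled. Below is a hedged sketch of how these pieces typically combine for Event Hubs' Kafka-compatible endpoint; option names follow Spark's Kafka source and the onboarding examples, and the exact dictionary returned by DLT-META may differ.

```python
# Placeholders standing in for the values computed in the hunk above.
eh_namespace, eh_port, eh_name = "my-namespace", "9093", "iot"
eh_kafka_str = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule"
eh_conn_str = "Endpoint=sb://my-namespace.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=..."

# Hedged sketch: Event Hubs exposes a Kafka-compatible endpoint, so the connection
# string is passed as the SASL PLAIN password with the literal "$ConnectionString" user.
kafka_options = {
    "kafka.bootstrap.servers": f"{eh_namespace}.servicebus.windows.net:{eh_port}",
    "subscribe": eh_name,
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": (
        f'{eh_kafka_str} required username="$ConnectionString" password="{eh_conn_str}";'
    ),
}
```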
1 change: 1 addition & 0 deletions tests/resources/onboarding.json
@@ -116,6 +116,7 @@
"source_schema_path": "tests/resources/schema/eventhub_iot_schema.ddl",
"eventhub.accessKeyName": "iotIngestionAccessKey",
"eventhub.name": "iot",
"eventhub.accessKeySecretName": "iotIngestionAccessKey",
"eventhub.secretsScopeName": "eventhubs_creds",
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
1 change: 1 addition & 0 deletions tests/resources/onboarding_eventhub.json
@@ -47,6 +47,7 @@
"source_schema_path": "tests/resources/schema/eventhub_iot_schema.ddl",
"eventhub.accessKeyName": "iotIngestionAccessKey",
"eventhub.name": "iot",
"eventhub.accessKeySecretName": "iotIngestionAccessKey",
"eventhub.secretsScopeName": "eventhubs_creds",
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
1 change: 1 addition & 0 deletions tests/resources/onboarding_v2.json
@@ -100,6 +100,7 @@
"source_schema_path": "tests/resources/schema/eventhub_iot_schema.ddl",
"eventhub.accessKeyName": "iotIngestionAccessKey",
"eventhub.name": "iot",
"eventhub.accessKeySecretName": "iotIngestionAccessKey",
"eventhub.secretsScopeName": "eventhubs_creds",
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
48 changes: 48 additions & 0 deletions tests/test_pipeline_readers.py
@@ -52,6 +52,7 @@ class PipelineReadersTests(DLTFrameworkTestCase):
"source_schema_path": "tests/resources/schema/eventhub_iot_schema.ddl",
"eventhub.accessKeyName": "iotIngestionAccessKey",
"eventhub.name": "iot",
"eventhub.accessKeySecretName": "iotIngestionAccessKey",
"eventhub.secretsScopeName": "eventhubs_creds",
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
@@ -80,6 +81,44 @@ class PipelineReadersTests(DLTFrameworkTestCase):
"updateDate": datetime.now,
"updatedBy": "dlt-meta-unittest"
}

bronze_eventhub_dataflow_spec_omit_secret_map = {
"dataFlowId": "1",
"dataFlowGroup": "A1",
"sourceFormat": "eventhub",
"sourceDetails": {
"source_schema_path": "tests/resources/schema/eventhub_iot_schema.ddl",
"eventhub.accessKeyName": "iotIngestionAccessKey",
"eventhub.name": "iot",
"eventhub.secretsScopeName": "eventhubs_creds",
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"eventhub.namespace": "ganesh-standard",
"eventhub.port": "9093"
},
"readerConfigOptions": {
"maxOffsetsPerTrigger": "50000",
"startingOffsets": "latest",
"failOnDataLoss": "false",
"kafka.request.timeout.ms": "60000",
"kafka.session.timeout.ms": "60000"
},
"targetFormat": "delta",
"targetDetails": {"database": "bronze", "table": "customer", "path": "tests/localtest/delta/customers"},
"tableProperties": {},
"schema": None,
"partitionColumns": [""],
"cdcApplyChanges": None,
"dataQualityExpectations": None,
"quarantineTargetDetails": None,
"quarantineTableProperties": None,
"version": "v1",
"createDate": datetime.now,
"createdBy": "dlt-meta-unittest",
"updateDate": datetime.now,
"updatedBy": "dlt-meta-unittest"
}

bronze_kafka_dataflow_spec_map = {
"dataFlowId": "1",
"dataFlowGroup": "A1",
@@ -174,6 +213,15 @@ def test_get_eventhub_kafka_options(self, get_db_utils, dbutils):
kafka_options = PipelineReaders.get_eventhub_kafka_options(self.spark, bronze_dataflow_spec)
self.assertIsNotNone(kafka_options)

@patch.object(PipelineReaders, "get_db_utils", return_value=dbutils)
@patch.object(dbutils, "secrets.get", return_value={"called"})
def test_get_eventhub_kafka_options_omit_secret(self, get_db_utils, dbutils):
"""Test Get kafka options."""
bronze_map = PipelineReadersTests.bronze_eventhub_dataflow_spec_omit_secret_map
bronze_dataflow_spec = BronzeDataflowSpec(**bronze_map)
kafka_options = PipelineReaders.get_eventhub_kafka_options(self.spark, bronze_dataflow_spec)
self.assertIsNotNone(kafka_options)

@patch.object(PipelineReaders, "get_db_utils", return_value=dbutils)
@patch.object(dbutils, "secrets.get", return_value={"called"})
def test_get_kafka_options_ssl_exception(self, get_db_utils, dbutils):
