diff --git a/metadata-ingestion/src/datahub/api/circuit_breaker/assertion_circuit_breaker.py b/metadata-ingestion/src/datahub/api/circuit_breaker/assertion_circuit_breaker.py index 67a5b3630a455..9d2a65663ba37 100644 --- a/metadata-ingestion/src/datahub/api/circuit_breaker/assertion_circuit_breaker.py +++ b/metadata-ingestion/src/datahub/api/circuit_breaker/assertion_circuit_breaker.py @@ -31,6 +31,7 @@ class AssertionCircuitBreaker(AbstractCircuitBreaker): The circuit breaker checks if there are passing assertion on the Dataset. """ + config: AssertionCircuitBreakerConfig def __init__(self, config: AssertionCircuitBreakerConfig): diff --git a/metadata-ingestion/src/datahub/api/circuit_breaker/circuit_breaker.py b/metadata-ingestion/src/datahub/api/circuit_breaker/circuit_breaker.py index f8554334281d8..a3c54046faf68 100644 --- a/metadata-ingestion/src/datahub/api/circuit_breaker/circuit_breaker.py +++ b/metadata-ingestion/src/datahub/api/circuit_breaker/circuit_breaker.py @@ -34,9 +34,11 @@ def __init__( # Select your transport with a defined url endpoint self.transport = RequestsHTTPTransport( url=datahub_host + "/api/graphql", - headers={"Authorization": "Bearer " + datahub_token} - if datahub_token is not None - else None, + headers=( + {"Authorization": "Bearer " + datahub_token} + if datahub_token is not None + else None + ), method="POST", timeout=timeout, ) diff --git a/metadata-ingestion/src/datahub/api/entities/datacontract/datacontract.py b/metadata-ingestion/src/datahub/api/entities/datacontract/datacontract.py index e0ef85d5fd66c..4a68fa6c66ada 100644 --- a/metadata-ingestion/src/datahub/api/entities/datacontract/datacontract.py +++ b/metadata-ingestion/src/datahub/api/entities/datacontract/datacontract.py @@ -179,14 +179,16 @@ def generate_mcp( aspects=[ DataContractPropertiesClass( entity=self.entity, - schema=[SchemaContractClass(assertion=schema_assertion_urn)] - if schema_assertion_urn - else None, - freshness=[ - FreshnessContractClass(assertion=freshness_assertion_urn) - ] - if freshness_assertion_urn - else None, + schema=( + [SchemaContractClass(assertion=schema_assertion_urn)] + if schema_assertion_urn + else None + ), + freshness=( + [FreshnessContractClass(assertion=freshness_assertion_urn)] + if freshness_assertion_urn + else None + ), dataQuality=[ DataQualityContractClass(assertion=dq_assertion_urn) for dq_assertion_urn in dq_assertions @@ -195,9 +197,11 @@ def generate_mcp( # Also emit status. StatusClass(removed=False), # Emit the contract state as PENDING. 
- DataContractStatusClass(state=DataContractStateClass.PENDING) - if True - else None, + ( + DataContractStatusClass(state=DataContractStateClass.PENDING) + if True + else None + ), ], ) diff --git a/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py b/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py index bf521ded5dbf3..d406fa36e00db 100644 --- a/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py +++ b/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py @@ -190,14 +190,16 @@ def end_event_mcp( timestampMillis=end_timestamp_millis, result=DataProcessInstanceRunResultClass( type=result, - nativeResultType=result_type - if result_type is not None - else self.orchestrator, + nativeResultType=( + result_type if result_type is not None else self.orchestrator + ), ), attempt=attempt, - durationMillis=(end_timestamp_millis - start_timestamp_millis) - if start_timestamp_millis - else None, + durationMillis=( + (end_timestamp_millis - start_timestamp_millis) + if start_timestamp_millis + else None + ), ), ) yield mcp @@ -258,9 +260,11 @@ def generate_mcp( aspect=DataProcessInstanceRelationships( upstreamInstances=[str(urn) for urn in self.upstream_urns], parentTemplate=str(self.template_urn) if self.template_urn else None, - parentInstance=str(self.parent_instance) - if self.parent_instance is not None - else None, + parentInstance=( + str(self.parent_instance) + if self.parent_instance is not None + else None + ), ), ) yield mcp diff --git a/metadata-ingestion/src/datahub/api/entities/dataproduct/dataproduct.py b/metadata-ingestion/src/datahub/api/entities/dataproduct/dataproduct.py index 64b10d31487e0..8f58fa469a7d9 100644 --- a/metadata-ingestion/src/datahub/api/entities/dataproduct/dataproduct.py +++ b/metadata-ingestion/src/datahub/api/entities/dataproduct/dataproduct.py @@ -343,27 +343,31 @@ def from_datahub(cls, graph: DataHubGraph, id: str) -> DataProduct: tags: Optional[GlobalTagsClass] = graph.get_aspect(id, GlobalTagsClass) return DataProduct( id=id, - display_name=data_product_properties.name - if data_product_properties - else None, + display_name=( + data_product_properties.name if data_product_properties else None + ), domain=domains.domains[0], - description=data_product_properties.description - if data_product_properties - else None, - assets=[e.destinationUrn for e in data_product_properties.assets or []] - if data_product_properties - else None, + description=( + data_product_properties.description if data_product_properties else None + ), + assets=( + [e.destinationUrn for e in data_product_properties.assets or []] + if data_product_properties + else None + ), owners=yaml_owners, - terms=[term.urn for term in glossary_terms.terms] - if glossary_terms - else None, + terms=( + [term.urn for term in glossary_terms.terms] if glossary_terms else None + ), tags=[tag.tag for tag in tags.tags] if tags else None, - properties=data_product_properties.customProperties - if data_product_properties - else None, - external_url=data_product_properties.externalUrl - if data_product_properties - else None, + properties=( + data_product_properties.customProperties + if data_product_properties + else None + ), + external_url=( + data_product_properties.externalUrl if data_product_properties else None + ), ) def _patch_ownership( diff --git a/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py b/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py index 
f9a188c65feef..315f2249d2e5c 100644 --- a/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py +++ b/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py @@ -94,20 +94,24 @@ def from_schema_field( description=schema_field.description, label=schema_field.label, created=schema_field.created.__dict__ if schema_field.created else None, - lastModified=schema_field.lastModified.__dict__ - if schema_field.lastModified - else None, + lastModified=( + schema_field.lastModified.__dict__ + if schema_field.lastModified + else None + ), recursive=schema_field.recursive, - globalTags=schema_field.globalTags.__dict__ - if schema_field.globalTags - else None, - glossaryTerms=schema_field.glossaryTerms.__dict__ - if schema_field.glossaryTerms - else None, + globalTags=( + schema_field.globalTags.__dict__ if schema_field.globalTags else None + ), + glossaryTerms=( + schema_field.glossaryTerms.__dict__ + if schema_field.glossaryTerms + else None + ), isPartitioningKey=schema_field.isPartitioningKey, - jsonProps=json.loads(schema_field.jsonProps) - if schema_field.jsonProps - else None, + jsonProps=( + json.loads(schema_field.jsonProps) if schema_field.jsonProps else None + ), ) @validator("urn", pre=True, always=True) @@ -300,9 +304,11 @@ def generate_mcp( properties=[ StructuredPropertyValueAssignmentClass( propertyUrn=f"urn:li:structuredProperty:{prop_key}", - values=prop_value - if isinstance(prop_value, list) - else [prop_value], + values=( + prop_value + if isinstance(prop_value, list) + else [prop_value] + ), ) for prop_key, prop_value in field.structured_properties.items() ] @@ -359,9 +365,11 @@ def generate_mcp( properties=[ StructuredPropertyValueAssignmentClass( propertyUrn=f"urn:li:structuredProperty:{prop_key}", - values=prop_value - if isinstance(prop_value, list) - else [prop_value], + values=( + prop_value + if isinstance(prop_value, list) + else [prop_value] + ), ) for prop_key, prop_value in self.structured_properties.items() ] @@ -501,25 +509,29 @@ def from_datahub(cls, graph: DataHubGraph, urn: str) -> "Dataset": return Dataset( # type: ignore[call-arg] urn=urn, - description=dataset_properties.description - if dataset_properties and dataset_properties.description - else None, - name=dataset_properties.name - if dataset_properties and dataset_properties.name - else None, + description=( + dataset_properties.description + if dataset_properties and dataset_properties.description + else None + ), + name=( + dataset_properties.name + if dataset_properties and dataset_properties.name + else None + ), schema=Dataset._schema_from_schema_metadata(graph, urn), tags=[tag.tag for tag in tags.tags] if tags else None, - glossary_terms=[term.urn for term in glossary_terms.terms] - if glossary_terms - else None, + glossary_terms=( + [term.urn for term in glossary_terms.terms] if glossary_terms else None + ), owners=yaml_owners, - properties=dataset_properties.customProperties - if dataset_properties - else None, + properties=( + dataset_properties.customProperties if dataset_properties else None + ), subtypes=[subtype for subtype in subtypes.typeNames] if subtypes else None, - structured_properties=structured_properties_map - if structured_properties - else None, + structured_properties=( + structured_properties_map if structured_properties else None + ), ) def to_yaml( diff --git a/metadata-ingestion/src/datahub/api/entities/forms/forms.py b/metadata-ingestion/src/datahub/api/entities/forms/forms.py index 8fb7ea0bf11ed..9188ea33d6c68 100644 --- 
a/metadata-ingestion/src/datahub/api/entities/forms/forms.py +++ b/metadata-ingestion/src/datahub/api/entities/forms/forms.py @@ -197,11 +197,13 @@ def validate_prompts(self, emitter: DataHubGraph) -> List[FormPromptClass]: title=prompt.title, description=prompt.description, type=prompt.type, - structuredPropertyParams=StructuredPropertyParamsClass( - urn=prompt.structured_property_urn - ) - if prompt.structured_property_urn - else None, + structuredPropertyParams=( + StructuredPropertyParamsClass( + urn=prompt.structured_property_urn + ) + if prompt.structured_property_urn + else None + ), required=prompt.required, ) ) @@ -339,9 +341,11 @@ def from_datahub(graph: DataHubGraph, urn: str) -> "Forms": title=prompt_raw.title, description=prompt_raw.description, type=prompt_raw.type, - structured_property_urn=prompt_raw.structuredPropertyParams.urn - if prompt_raw.structuredPropertyParams - else None, + structured_property_urn=( + prompt_raw.structuredPropertyParams.urn + if prompt_raw.structuredPropertyParams + else None + ), ) ) return Forms( diff --git a/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py b/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py index ed97948de9034..44fd32d5a426b 100644 --- a/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py +++ b/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py @@ -126,19 +126,23 @@ def create(file: str) -> None: ], cardinality=structuredproperty.cardinality, immutable=structuredproperty.immutable, - allowedValues=[ - PropertyValueClass( - value=v.value, description=v.description - ) - for v in structuredproperty.allowed_values - ] - if structuredproperty.allowed_values - else None, - typeQualifier={ - "allowedTypes": structuredproperty.type_qualifier.allowed_types - } - if structuredproperty.type_qualifier - else None, + allowedValues=( + [ + PropertyValueClass( + value=v.value, description=v.description + ) + for v in structuredproperty.allowed_values + ] + if structuredproperty.allowed_values + else None + ), + typeQualifier=( + { + "allowedTypes": structuredproperty.type_qualifier.allowed_types + } + if structuredproperty.type_qualifier + else None + ), ), ) emitter.emit_mcp(mcp) @@ -160,20 +164,22 @@ def from_datahub(cls, graph: DataHubGraph, urn: str) -> "StructuredProperties": description=structured_property.description, entity_types=structured_property.entityTypes, cardinality=structured_property.cardinality, - allowed_values=[ - AllowedValue( - value=av.value, - description=av.description, - ) - for av in structured_property.allowedValues or [] - ] - if structured_property.allowedValues is not None - else None, - type_qualifier={ - "allowed_types": structured_property.typeQualifier.get("allowedTypes") - } - if structured_property.typeQualifier - else None, + allowed_values=( + [ + AllowedValue( + value=av.value, + description=av.description, + ) + for av in structured_property.allowedValues or [] + ] + if structured_property.allowedValues is not None + else None + ), + type_qualifier=( + {"allowed_types": structured_property.typeQualifier.get("allowedTypes")} + if structured_property.typeQualifier + else None + ), ) def to_yaml( diff --git a/metadata-ingestion/src/datahub/api/graphql/base.py b/metadata-ingestion/src/datahub/api/graphql/base.py index 3654bd3881699..c1ea6b71a6d14 100644 --- a/metadata-ingestion/src/datahub/api/graphql/base.py +++ 
b/metadata-ingestion/src/datahub/api/graphql/base.py @@ -23,9 +23,11 @@ def __init__( # Select your transport with a defined url endpoint self.transport = RequestsHTTPTransport( url=datahub_host + "/api/graphql", - headers={"Authorization": "Bearer " + datahub_token} - if datahub_token is not None - else None, + headers=( + {"Authorization": "Bearer " + datahub_token} + if datahub_token is not None + else None + ), method="POST", timeout=timeout, ) diff --git a/metadata-ingestion/src/datahub/cli/docker_cli.py b/metadata-ingestion/src/datahub/cli/docker_cli.py index 9189a881f9ce7..971d4e6e72aa1 100644 --- a/metadata-ingestion/src/datahub/cli/docker_cli.py +++ b/metadata-ingestion/src/datahub/cli/docker_cli.py @@ -240,9 +240,11 @@ def _attempt_stop(quickstart_compose_file: List[pathlib.Path]) -> None: compose_files_for_stopping = ( quickstart_compose_file if quickstart_compose_file - else [pathlib.Path(default_quickstart_compose_file)] - if default_quickstart_compose_file - else None + else ( + [pathlib.Path(default_quickstart_compose_file)] + if default_quickstart_compose_file + else None + ) ) if compose_files_for_stopping: # docker-compose stop @@ -868,10 +870,10 @@ def download_compose_files( # also allow local files request_session = requests.Session() request_session.mount("file://", FileAdapter()) - with open( - quickstart_compose_file_name, "wb" - ) if quickstart_compose_file_name else tempfile.NamedTemporaryFile( - suffix=".yml", delete=False + with ( + open(quickstart_compose_file_name, "wb") + if quickstart_compose_file_name + else tempfile.NamedTemporaryFile(suffix=".yml", delete=False) ) as tmp_file: path = pathlib.Path(tmp_file.name) quickstart_compose_file_list.append(path) @@ -892,10 +894,10 @@ def download_compose_files( default_consumer_compose_file = ( Path(DATAHUB_ROOT_FOLDER) / "quickstart/docker-compose.consumers.yml" ) - with open( - default_consumer_compose_file, "wb" - ) if default_consumer_compose_file else tempfile.NamedTemporaryFile( - suffix=".yml", delete=False + with ( + open(default_consumer_compose_file, "wb") + if default_consumer_compose_file + else tempfile.NamedTemporaryFile(suffix=".yml", delete=False) ) as tmp_file: path = pathlib.Path(tmp_file.name) quickstart_compose_file_list.append(path) @@ -914,10 +916,10 @@ def download_compose_files( default_kafka_compose_file = ( Path(DATAHUB_ROOT_FOLDER) / "quickstart/docker-compose.kafka-setup.yml" ) - with open( - default_kafka_compose_file, "wb" - ) if default_kafka_compose_file else tempfile.NamedTemporaryFile( - suffix=".yml", delete=False + with ( + open(default_kafka_compose_file, "wb") + if default_kafka_compose_file + else tempfile.NamedTemporaryFile(suffix=".yml", delete=False) ) as tmp_file: path = pathlib.Path(tmp_file.name) quickstart_compose_file_list.append(path) diff --git a/metadata-ingestion/src/datahub/cli/lite_cli.py b/metadata-ingestion/src/datahub/cli/lite_cli.py index 841c2f27528b7..957ee16245dd8 100644 --- a/metadata-ingestion/src/datahub/cli/lite_cli.py +++ b/metadata-ingestion/src/datahub/cli/lite_cli.py @@ -84,10 +84,14 @@ def shell_complete(self, ctx, param, incomplete): try: completions = lite.ls(path) return [ - CompletionItem(browseable.auto_complete.suggested_path, type="plain") - if browseable.auto_complete - else CompletionItem( - f"{incomplete}/{browseable.name}".replace("//", "/") + ( + CompletionItem( + browseable.auto_complete.suggested_path, type="plain" + ) + if browseable.auto_complete + else CompletionItem( + f"{incomplete}/{browseable.name}".replace("//", "/") + ) ) 
for browseable in completions if not browseable.leaf @@ -240,12 +244,16 @@ def ls(path: Optional[str]) -> None: for browseable in [b for b in browseables if b.auto_complete is None]: click.secho( browseable.name, - fg="white" - if browseable.leaf - else "green" - if browseable.id.startswith("urn:") - and not browseable.id.startswith("urn:li:systemNode") - else "cyan", + fg=( + "white" + if browseable.leaf + else ( + "green" + if browseable.id.startswith("urn:") + and not browseable.id.startswith("urn:li:systemNode") + else "cyan" + ) + ), ) except PathNotFoundException: click.echo(f"Path not found: {path}") diff --git a/metadata-ingestion/src/datahub/emitter/mcp_builder.py b/metadata-ingestion/src/datahub/emitter/mcp_builder.py index a7578e39374ac..c21361eb256c1 100644 --- a/metadata-ingestion/src/datahub/emitter/mcp_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mcp_builder.py @@ -204,9 +204,9 @@ def gen_containers( externalUrl=external_url, qualifiedName=qualified_name, created=TimeStamp(time=created) if created is not None else None, - lastModified=TimeStamp(time=last_modified) - if last_modified is not None - else None, + lastModified=( + TimeStamp(time=last_modified) if last_modified is not None else None + ), ), ).as_workunit() @@ -220,9 +220,11 @@ def gen_containers( entityUrn=f"{container_urn}", aspect=DataPlatformInstance( platform=f"{make_data_platform_urn(container_key.platform)}", - instance=f"{make_dataplatform_instance_urn(container_key.platform, container_key.instance)}" - if container_key.instance - else None, + instance=( + f"{make_dataplatform_instance_urn(container_key.platform, container_key.instance)}" + if container_key.instance + else None + ), ), ).as_workunit() diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py index 234d7e5e255d7..c783d9a35814b 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/client.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py @@ -1740,9 +1740,9 @@ def report_assertion_result( "type": type, "properties": properties, "externalUrl": external_url, - "error": {"type": error_type, "message": error_message} - if error_type - else None, + "error": ( + {"type": error_type, "message": error_message} if error_type else None + ), } res = self.execute_graphql( diff --git a/metadata-ingestion/src/datahub/ingestion/graph/filters.py b/metadata-ingestion/src/datahub/ingestion/graph/filters.py index edb45fa5c2dbc..588090ec56727 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/filters.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/filters.py @@ -113,7 +113,7 @@ def _get_env_filters(env: str) -> List[SearchFilterRule]: { "field": "env", "value": env, - } + }, # Note that not all entity types have an env (e.g. dashboards / charts). # If the env filter is specified, these will be excluded. 
] diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py index 58fb1cb1eb2c2..3b9b5dbf63e18 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py @@ -1351,11 +1351,13 @@ def _get_delta_schema_metadata() -> Optional[SchemaMetadata]: def get_data_platform_instance() -> DataPlatformInstanceClass: return DataPlatformInstanceClass( platform=make_data_platform_urn(self.platform), - instance=make_dataplatform_instance_urn( - self.platform, self.source_config.platform_instance - ) - if self.source_config.platform_instance - else None, + instance=( + make_dataplatform_instance_urn( + self.platform, self.source_config.platform_instance + ) + if self.source_config.platform_instance + else None + ), ) @lru_cache(maxsize=None) diff --git a/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py b/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py index f40e6504f1188..09ce8b5b05203 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py +++ b/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py @@ -260,9 +260,9 @@ def _get_schema_and_fields( self.report.warning( title="Failed to get subject schema from schema registry", message=f"Failed to get {kafka_entity} {schema_type_str or ''} schema from schema registry", - context=f"{topic}: {topic_subject}" - if not is_subject - else topic_subject, + context=( + f"{topic}: {topic_subject}" if not is_subject else topic_subject + ), exc=e, ) else: @@ -320,9 +320,11 @@ def _get_schema_fields( fields = schema_util.avro_schema_to_mce_fields( avro_schema, is_key_schema=is_key_schema, - meta_mapping_processor=self.field_meta_processor - if self.source_config.enable_meta_mapping - else None, + meta_mapping_processor=( + self.field_meta_processor + if self.source_config.enable_meta_mapping + else None + ), schema_tags_field=self.source_config.schema_tags_field, tag_prefix=self.source_config.tag_prefix, ) @@ -334,9 +336,11 @@ def _get_schema_fields( base_name: str = topic.replace(".", "_") fields = protobuf_util.protobuf_schema_to_mce_fields( ProtobufSchema( - f"{base_name}-key.proto" - if is_key_schema - else f"{base_name}-value.proto", + ( + f"{base_name}-key.proto" + if is_key_schema + else f"{base_name}-value.proto" + ), schema.schema_str, ), imported_schemas, diff --git a/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py b/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py index bc19940afdd1e..10dd9e9e7e029 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py +++ b/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py @@ -396,9 +396,11 @@ def get_partition_from_path(self, path: str) -> Optional[List[Tuple[str, str]]]: partition_keys.append( ( named_vars.named["partition_key"][key], - named_vars.named["partition_value"][key] - if "partition_value" in named_vars.named - else named_vars.named["partition"][key], + ( + named_vars.named["partition_value"][key] + if "partition_value" in named_vars.named + else named_vars.named["partition"][key] + ), ) ) return partition_keys diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_tests.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_tests.py index 5770ce712d628..dc4e5d426fe42 100644 --- 
a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_tests.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_tests.py @@ -176,20 +176,27 @@ def make_assertion_from_test( dataset=upstream_urn, scope=assertion_params.scope, operator=assertion_params.operator, - fields=[mce_builder.make_schema_field_urn(upstream_urn, column_name)] - if ( - assertion_params.scope == DatasetAssertionScopeClass.DATASET_COLUMN - and column_name - ) - else [], + fields=( + [mce_builder.make_schema_field_urn(upstream_urn, column_name)] + if ( + assertion_params.scope + == DatasetAssertionScopeClass.DATASET_COLUMN + and column_name + ) + else [] + ), nativeType=node.name, aggregation=assertion_params.aggregation, - parameters=assertion_params.parameters(kw_args) - if assertion_params.parameters - else None, - logic=assertion_params.logic_fn(kw_args) - if assertion_params.logic_fn - else None, + parameters=( + assertion_params.parameters(kw_args) + if assertion_params.parameters + else None + ), + logic=( + assertion_params.logic_fn(kw_args) + if assertion_params.logic_fn + else None + ), nativeParameters=_string_map(kw_args), ), ) @@ -244,10 +251,12 @@ def make_assertion_result_from_test( asserteeUrn=upstream_urn, runId=test_result.invocation_id, result=AssertionResultClass( - type=AssertionResultTypeClass.SUCCESS - if test_result.status == "pass" - or (not test_warnings_are_errors and test_result.status == "warn") - else AssertionResultTypeClass.FAILURE, + type=( + AssertionResultTypeClass.SUCCESS + if test_result.status == "pass" + or (not test_warnings_are_errors and test_result.status == "warn") + else AssertionResultTypeClass.FAILURE + ), nativeResults=test_result.native_results, ), status=AssertionRunStatusClass.COMPLETE, diff --git a/metadata-ingestion/src/datahub/ingestion/source/elastic_search.py b/metadata-ingestion/src/datahub/ingestion/source/elastic_search.py index 653b80c116adf..a5a195e05a635 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/elastic_search.py +++ b/metadata-ingestion/src/datahub/ingestion/source/elastic_search.py @@ -333,7 +333,6 @@ def http_auth(self) -> Optional[Tuple[str, str]]: @support_status(SupportStatus.CERTIFIED) @capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default") class ElasticsearchSource(Source): - """ This plugin extracts the following: @@ -479,11 +478,15 @@ def _extract_mcps( entityUrn=dataset_urn, aspect=SubTypesClass( typeNames=[ - DatasetSubTypes.ELASTIC_INDEX_TEMPLATE - if not is_index - else DatasetSubTypes.ELASTIC_INDEX - if not data_stream - else DatasetSubTypes.ELASTIC_DATASTREAM + ( + DatasetSubTypes.ELASTIC_INDEX_TEMPLATE + if not is_index + else ( + DatasetSubTypes.ELASTIC_INDEX + if not data_stream + else DatasetSubTypes.ELASTIC_DATASTREAM + ) + ) ] ), ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py index b459b47deb153..704a6f20a5c19 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py @@ -111,9 +111,11 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> None: for table_lineage in connector.table_lineage: input_dataset_urn = DatasetUrn.create_from_ids( platform_id=source_platform, - table_name=f"{source_database.lower()}.{table_lineage.source_table}" - if source_database - else table_lineage.source_table, + table_name=( + f"{source_database.lower()}.{table_lineage.source_table}" + if 
source_database + else table_lineage.source_table + ), env=source_platform_detail.env, platform_instance=source_platform_detail.platform_instance, ) @@ -132,23 +134,27 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> None: fine_grained_lineage.append( FineGrainedLineage( upstreamType=FineGrainedLineageUpstreamType.FIELD_SET, - upstreams=[ - builder.make_schema_field_urn( - str(input_dataset_urn), - column_lineage.source_column, - ) - ] - if input_dataset_urn - else [], + upstreams=( + [ + builder.make_schema_field_urn( + str(input_dataset_urn), + column_lineage.source_column, + ) + ] + if input_dataset_urn + else [] + ), downstreamType=FineGrainedLineageDownstreamType.FIELD, - downstreams=[ - builder.make_schema_field_urn( - str(output_dataset_urn), - column_lineage.destination_column, - ) - ] - if output_dataset_urn - else [], + downstreams=( + [ + builder.make_schema_field_urn( + str(output_dataset_urn), + column_lineage.destination_column, + ) + ] + if output_dataset_urn + else [] + ), ) ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/gcs/gcs_source.py b/metadata-ingestion/src/datahub/ingestion/source/gcs/gcs_source.py index be051a74ed9c7..18838af9bdf85 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/gcs/gcs_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/gcs/gcs_source.py @@ -118,9 +118,11 @@ def create_equivalent_s3_path_specs(self): s3_path_specs.append( PathSpec( include=path_spec.include.replace("gs://", "s3://"), - exclude=[exc.replace("gs://", "s3://") for exc in path_spec.exclude] - if path_spec.exclude - else None, + exclude=( + [exc.replace("gs://", "s3://") for exc in path_spec.exclude] + if path_spec.exclude + else None + ), file_types=path_spec.file_types, default_extension=path_spec.default_extension, table_name=path_spec.table_name, diff --git a/metadata-ingestion/src/datahub/ingestion/source/identity/okta.py b/metadata-ingestion/src/datahub/ingestion/source/identity/okta.py index 49b6422902299..dda81b0e34a8d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/identity/okta.py +++ b/metadata-ingestion/src/datahub/ingestion/source/identity/okta.py @@ -664,9 +664,9 @@ def _map_okta_user_profile(self, profile: UserProfile) -> CorpUserInfoClass: full_name = f"{profile.firstName} {profile.lastName}" return CorpUserInfoClass( active=True, - displayName=profile.displayName - if profile.displayName is not None - else full_name, + displayName=( + profile.displayName if profile.displayName is not None else full_name + ), firstName=profile.firstName, lastName=profile.lastName, fullName=full_name, diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py index 266f9f6db5762..0b201278142e3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py @@ -402,11 +402,13 @@ def get_table_names(self) -> List[Tuple]: # List of Tuple containing (schema, table) tables: List[Tuple] = [ ( - unquote( - table_id.split(sep)[-2], leading_quote_char, trailing_quote_char - ) - if len(table_id.split(sep)) > 1 - else "", + ( + unquote( + table_id.split(sep)[-2], leading_quote_char, trailing_quote_char + ) + if len(table_id.split(sep)) > 1 + else "" + ), unquote( table_id.split(sep)[-1], leading_quote_char, trailing_quote_char ), @@ -593,9 +595,11 @@ def get_parser( source_platform="mongodb", database_name=connector_manifest.config.get("database"), 
topic_prefix=connector_manifest.config.get("topic_prefix"), - transforms=connector_manifest.config["transforms"].split(",") - if "transforms" in connector_manifest.config - else [], + transforms=( + connector_manifest.config["transforms"].split(",") + if "transforms" in connector_manifest.config + else [] + ), ) return parser diff --git a/metadata-ingestion/src/datahub/ingestion/source/ldap.py b/metadata-ingestion/src/datahub/ingestion/source/ldap.py index 9c7fba68f263b..236e91a86700c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ldap.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ldap.py @@ -1,4 +1,5 @@ """LDAP Source""" + import contextlib import dataclasses from typing import Any, Dict, Iterable, List, Optional diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_query_model.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_query_model.py index 7ed46c8f7084c..72898921c3683 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_query_model.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_query_model.py @@ -68,7 +68,9 @@ def to_write_query(self) -> WriteQuery: model=cast(str, self.model.value), # the cast is jut to silent the lint view=cast(str, self.explore.value), fields=[cast(str, field.value) for field in self.fields], - filters={filter_.value: self.filters[filter_] for filter_ in self.filters} - if self.filters is not None - else {}, + filters=( + {filter_.value: self.filters[filter_] for filter_ in self.filters} + if self.filters is not None + else {} + ), ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_usage.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_usage.py index c97025d75229b..93af0effa9f1f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_usage.py @@ -74,16 +74,20 @@ def from_dashboard(cls, dashboard: Dashboard) -> "LookerDashboardForUsage": id=dashboard.id, view_count=dashboard.view_count, favorite_count=dashboard.favorite_count, - last_viewed_at=round(dashboard.last_viewed_at.timestamp() * 1000) - if dashboard.last_viewed_at - else None, - looks=[ - LookerChartForUsage.from_chart(e.look) - for e in dashboard.dashboard_elements - if e.look is not None - ] - if dashboard.dashboard_elements - else [], + last_viewed_at=( + round(dashboard.last_viewed_at.timestamp() * 1000) + if dashboard.last_viewed_at + else None + ), + looks=( + [ + LookerChartForUsage.from_chart(e.look) + for e in dashboard.dashboard_elements + if e.look is not None + ] + if dashboard.dashboard_elements + else [] + ), ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/str_functions.py b/metadata-ingestion/src/datahub/ingestion/source/looker/str_functions.py index 5426d2b8ab952..6f5c1248e3ef7 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/str_functions.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/str_functions.py @@ -1,6 +1,7 @@ """ Here write down functions which are operating on string. 
Like replacing some character and so on """ + import re diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py index 0917a9e9faafe..de1022b5482ce 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py @@ -328,9 +328,11 @@ def create_fields(self) -> List[ViewField]: ViewField( name=cll.downstream.column, label="", - type=cll.downstream.native_column_type - if cll.downstream.native_column_type is not None - else "unknown", + type=( + cll.downstream.native_column_type + if cll.downstream.native_column_type is not None + else "unknown" + ), description="", field_type=ViewFieldType.UNKNOWN, upstream_fields=_drop_hive_dot_from_upstream(cll.upstreams), diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_classes.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_classes.py index 6e8d939325d5b..5106b9817d351 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_classes.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_classes.py @@ -281,9 +281,11 @@ def new_powerbi_dataset(workspace_id: str, raw_instance: dict) -> PowerBIDataset id=raw_instance["id"], name=raw_instance.get("name"), description=raw_instance.get("description", ""), - webUrl="{}/details".format(raw_instance.get("webUrl")) - if raw_instance.get("webUrl") is not None - else None, + webUrl=( + "{}/details".format(raw_instance.get("webUrl")) + if raw_instance.get("webUrl") is not None + else None + ), workspace_id=workspace_id, parameters={}, tables=[], diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi_report_server/report_server.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi_report_server/report_server.py index d6c7076d49507..8854f9ff48348 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi_report_server/report_server.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi_report_server/report_server.py @@ -315,11 +315,11 @@ def custom_properties( "createdDate": str(report.created_date), "modifiedBy": report.modified_by or "", "modifiedDate": str(report.modified_date) or str(report.created_date), - "dataSource": str( - [report.connection_string for report in _report.data_sources] - ) - if _report.data_sources - else "", + "dataSource": ( + str([report.connection_string for report in _report.data_sources]) + if _report.data_sources + else "" + ), } # DashboardInfo mcp diff --git a/metadata-ingestion/src/datahub/ingestion/source/qlik_sense/qlik_sense.py b/metadata-ingestion/src/datahub/ingestion/source/qlik_sense/qlik_sense.py index b6c48dd3c488e..8d6eaa4bf1047 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/qlik_sense/qlik_sense.py +++ b/metadata-ingestion/src/datahub/ingestion/source/qlik_sense/qlik_sense.py @@ -195,9 +195,11 @@ def _gen_space_workunit(self, space: Space) -> Iterable[MetadataWorkUnit]: description=space.description, sub_types=[BIContainerSubTypes.QLIK_SPACE], extra_properties={Constant.TYPE: str(space.type)}, - owner_urn=builder.make_user_urn(owner_username) - if self.config.ingest_owner and owner_username - else None, + owner_urn=( + builder.make_user_urn(owner_username) + if self.config.ingest_owner and owner_username + else None + ), 
external_url=f"https://{self.config.tenant_hostname}/catalog?space_filter={space.id}", created=int(space.createdAt.timestamp() * 1000), last_modified=int(space.updatedAt.timestamp() * 1000), @@ -458,9 +460,11 @@ def _gen_app_workunit(self, app: App) -> Iterable[MetadataWorkUnit]: sub_types=[BIContainerSubTypes.QLIK_APP], parent_container_key=self._gen_space_key(app.spaceId), extra_properties={Constant.QRI: app.qri, Constant.USAGE: app.qUsage}, - owner_urn=builder.make_user_urn(owner_username) - if self.config.ingest_owner and owner_username - else None, + owner_urn=( + builder.make_user_urn(owner_username) + if self.config.ingest_owner and owner_username + else None + ), external_url=f"https://{self.config.tenant_hostname}/sense/app/{app.id}/overview", created=int(app.createdAt.timestamp() * 1000), last_modified=int(app.updatedAt.timestamp() * 1000), @@ -500,9 +504,11 @@ def _gen_schema_fields( schema_field = SchemaField( fieldPath=field.name, type=SchemaFieldDataTypeClass( - type=FIELD_TYPE_MAPPING.get(field.dataType, NullType)() - if field.dataType - else NullType() + type=( + FIELD_TYPE_MAPPING.get(field.dataType, NullType)() + if field.dataType + else NullType() + ) ), nativeDataType=field.dataType if field.dataType else "", nullable=field.nullable, diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/usage.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/usage.py index a37aec675cdfa..e0bf8b23dd0f7 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/usage.py @@ -272,18 +272,22 @@ def _gen_access_events_from_history_query( userid=row[field_names.index("userid")], username=row[field_names.index("username")], query=row[field_names.index("query")], - querytxt=row[field_names.index("querytxt")].strip() - if row[field_names.index("querytxt")] - else None, + querytxt=( + row[field_names.index("querytxt")].strip() + if row[field_names.index("querytxt")] + else None + ), tbl=row[field_names.index("tbl")], database=row[field_names.index("database")], schema=row[field_names.index("schema")], table=row[field_names.index("table")], starttime=row[field_names.index("starttime")], endtime=row[field_names.index("endtime")], - operation_type=row[field_names.index("operation_type")] - if "operation_type" in field_names - else None, + operation_type=( + row[field_names.index("operation_type")] + if "operation_type" in field_names + else None + ), ) except pydantic.error_wrappers.ValidationError as e: logging.warning( diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py index ef5ed3c6304c9..e8c70260ebc7c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py @@ -464,9 +464,11 @@ def add_partition_columns_to_schema( for partition_key in partition_keys: fields.append( SchemaField( - fieldPath=f"{partition_key[0]}" - if not is_fieldpath_v2 - else f"[version=2.0].[type=string].{partition_key[0]}", + fieldPath=( + f"{partition_key[0]}" + if not is_fieldpath_v2 + else f"[version=2.0].[type=string].{partition_key[0]}" + ), nativeDataType="string", type=SchemaFieldDataTypeClass(StringTypeClass()), isPartitioningKey=True, diff --git a/metadata-ingestion/src/datahub/ingestion/source/sac/sac.py b/metadata-ingestion/src/datahub/ingestion/source/sac/sac.py index 88cb1f821ff0d..8309c469f67c5 100644 --- 
a/metadata-ingestion/src/datahub/ingestion/source/sac/sac.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sac/sac.py @@ -333,15 +333,19 @@ def get_resource_workunits( lastModified=ChangeAuditStampsClass( created=AuditStampClass( time=round(resource.created_time.timestamp() * 1000), - actor=make_user_urn(resource.created_by) - if resource.created_by - else "urn:li:corpuser:unknown", + actor=( + make_user_urn(resource.created_by) + if resource.created_by + else "urn:li:corpuser:unknown" + ), ), lastModified=AuditStampClass( time=round(resource.modified_time.timestamp() * 1000), - actor=make_user_urn(resource.modified_by) - if resource.modified_by - else "urn:li:corpuser:unknown", + actor=( + make_user_urn(resource.modified_by) + if resource.modified_by + else "urn:li:corpuser:unknown" + ), ), ), customProperties={ diff --git a/metadata-ingestion/src/datahub/ingestion/source/sigma/sigma.py b/metadata-ingestion/src/datahub/ingestion/source/sigma/sigma.py index 5db5e543510db..dd4b65a2cbdf2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sigma/sigma.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sigma/sigma.py @@ -175,9 +175,11 @@ def _gen_workspace_workunit( container_key=self._gen_workspace_key(workspace.workspaceId), name=workspace.name, sub_types=[BIContainerSubTypes.SIGMA_WORKSPACE], - owner_urn=builder.make_user_urn(owner_username) - if self.config.ingest_owner and owner_username - else None, + owner_urn=( + builder.make_user_urn(owner_username) + if self.config.ingest_owner and owner_username + else None + ), created=int(workspace.createdAt.timestamp() * 1000), last_modified=int(workspace.updatedAt.timestamp() * 1000), ) @@ -534,16 +536,20 @@ def _gen_workbook_workunit(self, workbook: Workbook) -> Iterable[MetadataWorkUni container_key=workbook_key, name=workbook.name, sub_types=[BIContainerSubTypes.SIGMA_WORKBOOK], - parent_container_key=self._gen_workspace_key(workbook.workspaceId) - if workbook.workspaceId - else None, + parent_container_key=( + self._gen_workspace_key(workbook.workspaceId) + if workbook.workspaceId + else None + ), extra_properties={ "path": workbook.path, "latestVersion": str(workbook.latestVersion), }, - owner_urn=builder.make_user_urn(owner_username) - if self.config.ingest_owner and owner_username - else None, + owner_urn=( + builder.make_user_urn(owner_username) + if self.config.ingest_owner and owner_username + else None + ), external_url=workbook.url, tags=[workbook.badge] if workbook.badge else None, created=int(workbook.createdAt.timestamp() * 1000), diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_connection.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_connection.py index e981ed3e2e665..d39e95a884dbc 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_connection.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_connection.py @@ -237,9 +237,11 @@ def get_connect_args(self) -> dict: p_key = serialization.load_pem_private_key( pkey_bytes, - password=self.private_key_password.get_secret_value().encode() - if self.private_key_password is not None - else None, + password=( + self.private_key_password.get_secret_value().encode() + if self.private_key_password is not None + else None + ), backend=default_backend(), ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/clickhouse.py b/metadata-ingestion/src/datahub/ingestion/source/sql/clickhouse.py index b2c40f914bddc..aeb21e88d0443 100644 --- 
a/metadata-ingestion/src/datahub/ingestion/source/sql/clickhouse.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/clickhouse.py @@ -229,9 +229,11 @@ def _get_all_table_comments_and_properties(self, connection, **kw): for table in result: all_table_comments[(table.database, table.table_name)] = { "text": table.comment, - "properties": {k: str(v) for k, v in json.loads(table.properties).items()} - if table.properties - else {}, + "properties": ( + {k: str(v) for k, v in json.loads(table.properties).items()} + if table.properties + else {} + ), } return all_table_comments diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/job_models.py b/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/job_models.py index 8b517747307f8..21e7fad334331 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/job_models.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/job_models.py @@ -164,9 +164,11 @@ def urn(self) -> str: flow_id=self.entity.flow.formatted_name, job_id=self.entity.formatted_name, cluster=self.entity.flow.cluster, - platform_instance=self.entity.flow.platform_instance - if self.entity.flow.platform_instance - else None, + platform_instance=( + self.entity.flow.platform_instance + if self.entity.flow.platform_instance + else None + ), ) def add_property( @@ -223,9 +225,9 @@ def urn(self) -> str: orchestrator=self.entity.orchestrator, flow_id=self.entity.formatted_name, cluster=self.entity.cluster, - platform_instance=self.entity.platform_instance - if self.entity.platform_instance - else None, + platform_instance=( + self.entity.platform_instance if self.entity.platform_instance else None + ), ) @property diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py index 45c80ce1cdce2..4352ab2f987e9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py @@ -1068,11 +1068,13 @@ def get_schema_fields_for_column( field = SchemaField( fieldPath=column["name"], type=get_column_type(self.report, dataset_name, column["type"]), - nativeDataType=full_type - if full_type is not None - else get_native_data_type_for_sqlalchemy_type( - column["type"], - inspector=inspector, + nativeDataType=( + full_type + if full_type is not None + else get_native_data_type_for_sqlalchemy_type( + column["type"], + inspector=inspector, + ) ), description=column.get("comment", None), nullable=column["nullable"], diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/analyze_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/unity/analyze_profiler.py index 4c8b22f2399b2..995690be790c4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/analyze_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/analyze_profiler.py @@ -82,13 +82,15 @@ def gen_dataset_profile_workunit( rowCount=row_count, columnCount=table_profile.num_columns, sizeInBytes=table_profile.total_size, - fieldProfiles=[ - self._gen_dataset_field_profile(row_count, column_profile) - for column_profile in table_profile.column_profiles - if column_profile # Drop column profiles with no data - ] - if self.config.include_columns - else None, + fieldProfiles=( + [ + self._gen_dataset_field_profile(row_count, column_profile) + for column_profile in table_profile.column_profiles + if column_profile # Drop column profiles with no data + ] + if 
self.config.include_columns + else None + ), ) return MetadataChangeProposalWrapper( entityUrn=self.dataset_urn_builder(ref), diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_types.py b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_types.py index c66189d99f738..f84f6c1b0c08d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_types.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_types.py @@ -151,9 +151,11 @@ class TableReference: @classmethod def create(cls, table: "Table") -> "TableReference": return cls( - table.schema.catalog.metastore.id - if table.schema.catalog.metastore - else None, + ( + table.schema.catalog.metastore.id + if table.schema.catalog.metastore + else None + ), table.schema.catalog.name, table.schema.name, table.name, diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py b/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py index f07e7a92d8762..5eec2ca587ead 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py @@ -128,9 +128,9 @@ def _generate_operation_workunit( operation_aspect = OperationClass( timestampMillis=int(time.time() * 1000), lastUpdatedTimestamp=int(query.end_time.timestamp() * 1000), - actor=self.user_urn_builder(query.user_name) - if query.user_name - else None, + actor=( + self.user_urn_builder(query.user_name) if query.user_name else None + ), operationType=OPERATION_STATEMENT_TYPES[query.statement_type], affectedDatasets=[ self.table_urn_builder(table) for table in table_info.source_tables diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_dataproduct.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_dataproduct.py index c474e423030e0..4045917eb830e 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_dataproduct.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_dataproduct.py @@ -144,11 +144,11 @@ class PatternAddDatasetDataProduct(AddDatasetDataProduct): def __init__(self, config: PatternDatasetDataProductConfig, ctx: PipelineContext): dataset_to_data_product = config.dataset_to_data_product_urns_pattern generic_config = AddDatasetDataProductConfig( - get_data_product_to_add=lambda dataset_urn: dataset_to_data_product.value( - dataset_urn - )[0] - if dataset_to_data_product.value(dataset_urn) - else None, + get_data_product_to_add=lambda dataset_urn: ( + dataset_to_data_product.value(dataset_urn)[0] + if dataset_to_data_product.value(dataset_urn) + else None + ), is_container=config.is_container, ) super().__init__(generic_config, ctx) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_schema_terms.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_schema_terms.py index 047252a5eeff0..a7e92d4bd7edb 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_schema_terms.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_schema_terms.py @@ -82,10 +82,12 @@ def extend_field( new_glossary_term = GlossaryTermsClass( terms=[], - auditStamp=schema_field.glossaryTerms.auditStamp - if schema_field.glossaryTerms is not None - else AuditStampClass( - time=builder.get_sys_time(), actor="urn:li:corpUser:restEmitter" + auditStamp=( + schema_field.glossaryTerms.auditStamp + if schema_field.glossaryTerms is not None + else AuditStampClass( + time=builder.get_sys_time(), 
actor="urn:li:corpUser:restEmitter" + ) ), ) new_glossary_term.terms.extend(unique_gloseary_terms) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_terms.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_terms.py index f21e3ec319349..3daf52e32ed4b 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_terms.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_terms.py @@ -73,10 +73,12 @@ def transform_aspect( ) out_glossary_terms: GlossaryTermsClass = GlossaryTermsClass( terms=[], - auditStamp=in_glossary_terms.auditStamp - if in_glossary_terms is not None - else AuditStampClass( - time=builder.get_sys_time(), actor="urn:li:corpUser:restEmitter" + auditStamp=( + in_glossary_terms.auditStamp + if in_glossary_terms is not None + else AuditStampClass( + time=builder.get_sys_time(), actor="urn:li:corpUser:restEmitter" + ) ), ) # Check if user want to keep existing terms diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py b/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py index fb776ca8d2328..0a59380531ad3 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py @@ -257,10 +257,12 @@ def transform( transformed_aspect = self.transform_aspect( entity_urn=urn, aspect_name=self.aspect_name(), - aspect=last_seen_mcp.aspect - if last_seen_mcp - and last_seen_mcp.aspectName == self.aspect_name() - else None, + aspect=( + last_seen_mcp.aspect + if last_seen_mcp + and last_seen_mcp.aspectName == self.aspect_name() + else None + ), ) if transformed_aspect: structured_urn = Urn.from_string(urn) @@ -269,9 +271,11 @@ def transform( MetadataChangeProposalWrapper( entityUrn=urn, entityType=structured_urn.get_type(), - systemMetadata=last_seen_mcp.systemMetadata - if last_seen_mcp - else last_seen_mce_system_metadata, + systemMetadata=( + last_seen_mcp.systemMetadata + if last_seen_mcp + else last_seen_mce_system_metadata + ), aspectName=self.aspect_name(), aspect=transformed_aspect, ) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py b/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py index 27311ff998cbf..245a3aa3d9db1 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py @@ -90,9 +90,11 @@ def convert_tag_as_per_mapping(self, tag: str) -> str: tag = tag[:index] + new_char + tag[index + len(old_char) :] # Adjust indices for overlapping replacements indices = [ - each + (len(new_char) - len(old_char)) - if each > index - else each + ( + each + (len(new_char) - len(old_char)) + if each > index + else each + ) for each in indices ] indices.append(index) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/generic_aspect_transformer.py b/metadata-ingestion/src/datahub/ingestion/transformer/generic_aspect_transformer.py index 4dc5f12005e49..3c0bc00633e7c 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/generic_aspect_transformer.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/generic_aspect_transformer.py @@ -38,7 +38,8 @@ def transform_generic_aspect( self, entity_urn: str, aspect_name: str, aspect: Optional[GenericAspectClass] ) -> Optional[GenericAspectClass]: """Implement this method to transform the 
single custom aspect for an entity. - The purpose of this abstract method is to reinforce the use of GenericAspectClass.""" + The purpose of this abstract method is to reinforce the use of GenericAspectClass. + """ pass def _transform_or_record_mcpc( @@ -114,9 +115,11 @@ def transform( changeType="UPSERT", aspectName=self.aspect_name(), aspect=transformed_aspect, - systemMetadata=last_seen_mcp.systemMetadata - if last_seen_mcp - else last_seen_mce_system_metadata, + systemMetadata=( + last_seen_mcp.systemMetadata + if last_seen_mcp + else last_seen_mce_system_metadata + ), ), metadata=record_metadata, ) diff --git a/metadata-ingestion/src/datahub/upgrade/upgrade.py b/metadata-ingestion/src/datahub/upgrade/upgrade.py index 9e2639abdca41..d940dfd78a82e 100644 --- a/metadata-ingestion/src/datahub/upgrade/upgrade.py +++ b/metadata-ingestion/src/datahub/upgrade/upgrade.py @@ -196,11 +196,11 @@ async def retrieve_version_stats( current=VersionStats( version=current_server_version, release_date=current_server_release_date ), - latest=VersionStats( - version=last_server_version, release_date=last_server_date - ) - if last_server_version - else None, + latest=( + VersionStats(version=last_server_version, release_date=last_server_date) + if last_server_version + else None + ), current_server_type=current_server_type, ) diff --git a/metadata-ingestion/src/datahub/utilities/mapping.py b/metadata-ingestion/src/datahub/utilities/mapping.py index 6cc7d9c50b775..4ea42d568da63 100644 --- a/metadata-ingestion/src/datahub/utilities/mapping.py +++ b/metadata-ingestion/src/datahub/utilities/mapping.py @@ -292,9 +292,11 @@ def convert_to_aspects(self, operation_map: Dict[str, list]) -> Dict[str, Any]: owner=x.get("urn"), type=x.get("category"), typeUrn=x.get("categoryUrn"), - source=OwnershipSourceClass(type=self.owner_source_type) - if self.owner_source_type - else None, + source=( + OwnershipSourceClass(type=self.owner_source_type) + if self.owner_source_type + else None + ), ) for x in sorted( operation_map[Constants.ADD_OWNER_OPERATION], diff --git a/metadata-ingestion/src/datahub/utilities/ratelimiter.py b/metadata-ingestion/src/datahub/utilities/ratelimiter.py index 3d47d25e14c49..c32041a63417c 100644 --- a/metadata-ingestion/src/datahub/utilities/ratelimiter.py +++ b/metadata-ingestion/src/datahub/utilities/ratelimiter.py @@ -7,7 +7,6 @@ # Modified version of https://github.com/RazerM/ratelimiter/blob/master/ratelimiter/_sync.py class RateLimiter(AbstractContextManager): - """Provides rate limiting for an operation with a configurable number of requests for a time period. 
""" diff --git a/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py b/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py index d0e38de661dd1..5a8802c7a0a49 100644 --- a/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py +++ b/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py @@ -114,9 +114,11 @@ def get_tables(self) -> List[str]: table_normalized = re.sub( r"^.", "", - str(table) - if not self._use_raw_names - else f"{table.schema.raw_name}.{table.raw_name}", + ( + str(table) + if not self._use_raw_names + else f"{table.schema.raw_name}.{table.raw_name}" + ), ) result.append(str(table_normalized)) diff --git a/metadata-ingestion/tests/integration/s3/test_s3.py b/metadata-ingestion/tests/integration/s3/test_s3.py index b45f1f78fc55a..54156610c6872 100644 --- a/metadata-ingestion/tests/integration/s3/test_s3.py +++ b/metadata-ingestion/tests/integration/s3/test_s3.py @@ -112,9 +112,9 @@ def s3_populate(pytestconfig, s3_resource, s3_client, bucket_names): bkt.upload_file( full_path, rel_path, # Set content type for `no_extension/small` file to text/csv - ExtraArgs={"ContentType": "text/csv"} - if "." not in rel_path - else {}, + ExtraArgs=( + {"ContentType": "text/csv"} if "." not in rel_path else {} + ), ) s3_client.put_object_tagging( Bucket=bucket_name, diff --git a/metadata-ingestion/tests/performance/bigquery/bigquery_events.py b/metadata-ingestion/tests/performance/bigquery/bigquery_events.py index 0e0bfe78c260f..bf3d566da8d27 100644 --- a/metadata-ingestion/tests/performance/bigquery/bigquery_events.py +++ b/metadata-ingestion/tests/performance/bigquery/bigquery_events.py @@ -47,9 +47,11 @@ def generate_events( for query in queries: project = ( # Most queries are run in the project of the tables they access table_to_project[ - query.object_modified.name - if query.object_modified - else query.fields_accessed[0].table.name + ( + query.object_modified.name + if query.object_modified + else query.fields_accessed[0].table.name + ) ] if random.random() >= proabability_of_project_mismatch else random.choice(projects) @@ -71,9 +73,11 @@ def generate_events( query=query.text, statementType=random.choice(OPERATION_TYPE_MAP[query.type]), project_id=project, - destinationTable=ref_from_table(query.object_modified, table_to_project) - if query.object_modified - else None, + destinationTable=( + ref_from_table(query.object_modified, table_to_project) + if query.object_modified + else None + ), referencedTables=list( dict.fromkeys( # Preserve order ref_from_table(field.table, table_to_project) @@ -90,9 +94,11 @@ def generate_events( ) ), referencedViews=referencedViews, - payload=dataclasses.asdict(query) - if config.debug_include_full_payloads - else None, + payload=( + dataclasses.asdict(query) + if config.debug_include_full_payloads + else None + ), query_on_view=True if referencedViews else False, ) ) @@ -118,9 +124,11 @@ def generate_events( resource=ref, fieldsRead=list(columns), readReason=random.choice(READ_REASONS), - payload=dataclasses.asdict(query) - if config.debug_include_full_payloads - else None, + payload=( + dataclasses.asdict(query) + if config.debug_include_full_payloads + else None + ), ) ) diff --git a/metadata-ingestion/tests/performance/data_generation.py b/metadata-ingestion/tests/performance/data_generation.py index 9b80d6260d408..fcff13edf5936 100644 --- a/metadata-ingestion/tests/performance/data_generation.py +++ b/metadata-ingestion/tests/performance/data_generation.py @@ -7,6 +7,7 @@ This is a work in 
progress, built piecemeal as needed. """ + import random from abc import ABCMeta, abstractmethod from collections import OrderedDict diff --git a/metadata-ingestion/tests/performance/databricks/unity_proxy_mock.py b/metadata-ingestion/tests/performance/databricks/unity_proxy_mock.py index ee1caf6783ec1..cb3a1c165acdd 100644 --- a/metadata-ingestion/tests/performance/databricks/unity_proxy_mock.py +++ b/metadata-ingestion/tests/performance/databricks/unity_proxy_mock.py @@ -120,9 +120,9 @@ def tables(self, schema: Schema) -> Iterable[Table]: updated_at=None, updated_by=None, table_id="", - view_definition=table.definition - if isinstance(table, data_model.View) - else None, + view_definition=( + table.definition if isinstance(table, data_model.View) else None + ), properties={}, ) diff --git a/metadata-ingestion/tests/unit/api/source_helpers/test_incremental_lineage_helper.py b/metadata-ingestion/tests/unit/api/source_helpers/test_incremental_lineage_helper.py index 2d43b24e10763..cafca521ae014 100644 --- a/metadata-ingestion/tests/unit/api/source_helpers/test_incremental_lineage_helper.py +++ b/metadata-ingestion/tests/unit/api/source_helpers/test_incremental_lineage_helper.py @@ -37,20 +37,22 @@ def make_lineage_aspect( ) for upstream_urn in upstreams ], - fineGrainedLineages=[ - models.FineGrainedLineageClass( - upstreamType=models.FineGrainedLineageUpstreamTypeClass.FIELD_SET, - downstreamType=models.FineGrainedLineageDownstreamTypeClass.FIELD, - upstreams=[ - make_schema_field_urn(upstream_urn, col) - for upstream_urn in upstreams - ], - downstreams=[make_schema_field_urn(dataset_urn, col)], - ) - for col in columns - ] - if include_cll - else None, + fineGrainedLineages=( + [ + models.FineGrainedLineageClass( + upstreamType=models.FineGrainedLineageUpstreamTypeClass.FIELD_SET, + downstreamType=models.FineGrainedLineageDownstreamTypeClass.FIELD, + upstreams=[ + make_schema_field_urn(upstream_urn, col) + for upstream_urn in upstreams + ], + downstreams=[make_schema_field_urn(dataset_urn, col)], + ) + for col in columns + ] + if include_cll + else None + ), ) diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py index b8a1222125d10..506bfd9c12674 100644 --- a/metadata-ingestion/tests/unit/test_transform_dataset.py +++ b/metadata-ingestion/tests/unit/test_transform_dataset.py @@ -163,12 +163,14 @@ def create_and_run_test_pipeline( "tests.unit.test_source.FakeSource.get_workunits" ) as mock_getworkunits: mock_getworkunits.return_value = [ - workunit.MetadataWorkUnit( - id=f"test-workunit-mce-{e.proposedSnapshot.urn}", mce=e - ) - if isinstance(e, MetadataChangeEventClass) - else workunit.MetadataWorkUnit( - id=f"test-workunit-mcp-{e.entityUrn}-{e.aspectName}", mcp=e + ( + workunit.MetadataWorkUnit( + id=f"test-workunit-mce-{e.proposedSnapshot.urn}", mce=e + ) + if isinstance(e, MetadataChangeEventClass) + else workunit.MetadataWorkUnit( + id=f"test-workunit-mcp-{e.entityUrn}-{e.aspectName}", mcp=e + ) ) for e in events ]
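
The hunks above all apply one mechanical pattern, which appears consistent with the newer Black stable style: a multi-line conditional expression used as an argument or value is wrapped in its own parentheses instead of letting the if/else branches hang off the keyword. A minimal before/after sketch of that shape (the make_header helper and token variable are hypothetical, chosen only to mirror the headers= example in the diff):

# Before: the conditional expression hangs directly off the keyword argument,
# so the if/else branches align with unrelated arguments.
def make_header(token=None):
    return dict(
        headers={"Authorization": "Bearer " + token}
        if token is not None
        else None,
    )

# After: the same expression is wrapped in parentheses, so the branches are
# indented under the value they produce; behavior is unchanged.
def make_header(token=None):
    return dict(
        headers=(
            {"Authorization": "Bearer " + token}
            if token is not None
            else None
        ),
    )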