diff --git a/docs-website/sidebars.js b/docs-website/sidebars.js index 20bed6099cdae3..0382f821a31ca3 100644 --- a/docs-website/sidebars.js +++ b/docs-website/sidebars.js @@ -307,6 +307,7 @@ module.exports = { }, { "DataHub Cloud Release History": [ + "docs/managed-datahub/release-notes/v_0_3_5", "docs/managed-datahub/release-notes/v_0_3_4", "docs/managed-datahub/release-notes/v_0_3_3", "docs/managed-datahub/release-notes/v_0_3_2", diff --git a/docs/managed-datahub/release-notes/v_0_3_5.md b/docs/managed-datahub/release-notes/v_0_3_5.md new file mode 100644 index 00000000000000..468f2bd59a918b --- /dev/null +++ b/docs/managed-datahub/release-notes/v_0_3_5.md @@ -0,0 +1,33 @@ +# v0.3.5 +--- + +Release Availability Date +--- +02-Sep-2024 + +Recommended CLI/SDK +--- +- `v0.14.0.2` with release notes at https://github.com/acryldata/datahub/releases/tag/v0.14.0.2 + +If you are using an older CLI/SDK version, please upgrade. This applies to all CLI/SDK usage, whether through your terminal, GitHub Actions, Airflow, the Python SDK, or the Java SDK. We strongly recommend upgrading, as we keep pushing fixes to the CLI, and it helps us support you better. + +## Release Changelog +--- + +- All changes in https://github.com/datahub-project/datahub/releases/tag/v0.14.0.2 + - Note Breaking Changes: https://datahubproject.io/docs/how/updating-datahub/#0140 + +- Product changes + - Misc fixes and improvements for the Snowflake Tag Propagation Automation (Beta) + - Misc fixes and improvements for the Glossary Term Tag Propagation Automation (Beta) + - Misc fixes and improvements for the Column Docs Propagation Automation (Beta) + - Minor UX improvements on the group profile page + - Show the 'Explore All' button in the search dropdown permanently + - Add a toggle to filter out transformations in the lineage visualization + - Misc fixes and minor improvements around the subscriptions and Slack integrations experience + - The new Slack ingestion source enables one-click subscriptions for your users by automatically hydrating their Slack member IDs. New users who sign up will also have their member IDs hydrated automatically. + - Please reach out to the Acryl team to get assistance in setting this up. + - Installing or re-installing the Slack bot will now enable the `/datahub` command and ensure your `botToken` is compatible with the new Slack ingestion source. You can reach out to the Acryl team for assistance with re-installation.
+ +- Ingestion changes + - New configuration for dbt lineage, "merges" sources with their sibling in viz: set `prefer_sql_parser_lineage` and `skip_sources_in_lineage` in ingestion; set flag `HIDE_DBT_SOURCE_IN_LINEAGE=true` in gms diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 22ff8025aa0a06..cbe3a6c250c1e7 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -301,6 +301,7 @@ databricks = { # 0.1.11 appears to have authentication issues with azure databricks + # 0.22.0 has support for `include_browse` in metadata list apis "databricks-sdk>=0.30.0", "pyspark~=3.3.0", "requests", diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index f37f5358f9e17d..cbd22b689e0d8b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -39,6 +39,10 @@ ) from datahub.ingestion.source.bigquery_v2.lineage import BigqueryLineageExtractor from datahub.ingestion.source.bigquery_v2.profiler import BigqueryProfiler +from datahub.ingestion.source.bigquery_v2.queries_extractor import ( + BigQueryQueriesExtractor, + BigQueryQueriesExtractorConfig, +) from datahub.ingestion.source.bigquery_v2.usage import BigQueryUsageExtractor from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler from datahub.ingestion.source.state.redundant_run_skip_handler import ( @@ -51,6 +55,7 @@ from datahub.ingestion.source.state.stateful_ingestion_base import ( StatefulIngestionSourceBase, ) +from datahub.ingestion.source_report.ingestion_stage import QUERIES_EXTRACTION from datahub.sql_parsing.schema_resolver import SchemaResolver from datahub.utilities.registries.domain_registry import DomainRegistry @@ -139,6 +144,7 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config): self.lineage_extractor = BigqueryLineageExtractor( config, self.report, + schema_resolver=self.sql_parser_schema_resolver, identifiers=self.identifiers, redundant_run_skip_handler=redundant_lineage_run_skip_handler, ) @@ -196,7 +202,9 @@ def test_connection(config_dict: dict) -> TestConnectionReport: def _init_schema_resolver(self) -> SchemaResolver: schema_resolution_required = ( - self.config.lineage_parse_view_ddl or self.config.lineage_use_sql_parser + self.config.use_queries_v2 + or self.config.lineage_parse_view_ddl + or self.config.lineage_use_sql_parser ) schema_ingestion_enabled = ( self.config.include_schema_metadata @@ -244,22 +252,54 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: for project in projects: yield from self.bq_schema_extractor.get_project_workunits(project) - if self.config.include_usage_statistics: - yield from self.usage_extractor.get_usage_workunits( - [p.id for p in projects], self.bq_schema_extractor.table_refs - ) + if self.config.use_queries_v2: + self.report.set_ingestion_stage("*", "View and Snapshot Lineage") - if self.config.include_table_lineage: - yield from self.lineage_extractor.get_lineage_workunits( + yield from self.lineage_extractor.get_lineage_workunits_for_views_and_snapshots( [p.id for p in projects], - self.sql_parser_schema_resolver, self.bq_schema_extractor.view_refs_by_project, self.bq_schema_extractor.view_definitions, self.bq_schema_extractor.snapshot_refs_by_project, self.bq_schema_extractor.snapshots_by_ref, - self.bq_schema_extractor.table_refs, ) + self.report.set_ingestion_stage("*", 
QUERIES_EXTRACTION) + + queries_extractor = BigQueryQueriesExtractor( + connection=self.config.get_bigquery_client(), + schema_api=self.bq_schema_extractor.schema_api, + config=BigQueryQueriesExtractorConfig( + window=self.config, + user_email_pattern=self.config.usage.user_email_pattern, + include_lineage=self.config.include_table_lineage, + include_usage_statistics=self.config.include_usage_statistics, + include_operations=self.config.usage.include_operational_stats, + top_n_queries=self.config.usage.top_n_queries, + ), + structured_report=self.report, + filters=self.filters, + identifiers=self.identifiers, + schema_resolver=self.sql_parser_schema_resolver, + discovered_tables=self.bq_schema_extractor.table_refs, + ) + self.report.queries_extractor = queries_extractor.report + yield from queries_extractor.get_workunits_internal() + else: + if self.config.include_usage_statistics: + yield from self.usage_extractor.get_usage_workunits( + [p.id for p in projects], self.bq_schema_extractor.table_refs + ) + + if self.config.include_table_lineage: + yield from self.lineage_extractor.get_lineage_workunits( + [p.id for p in projects], + self.bq_schema_extractor.view_refs_by_project, + self.bq_schema_extractor.view_definitions, + self.bq_schema_extractor.snapshot_refs_by_project, + self.bq_schema_extractor.snapshots_by_ref, + self.bq_schema_extractor.table_refs, + ) + def get_report(self) -> BigQueryV2Report: return self.report diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py index c5a8b2ab7fbe33..cfbefa5bff65ce 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py @@ -404,6 +404,11 @@ class BigQueryV2Config( "enabled.", ) + use_queries_v2: bool = Field( + default=False, + description="If enabled, uses the new queries extractor to extract queries from bigquery.", + ) + @property def have_table_data_read_permission(self) -> bool: return self.use_tables_list_query_v2 or self.is_profiling_enabled() diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_queries.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_queries.py index fffb5cfc8abfdf..ed27aae19ce963 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_queries.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_queries.py @@ -15,6 +15,7 @@ BigQueryIdentifierConfig, ) from datahub.ingestion.source.bigquery_v2.bigquery_report import ( + BigQueryQueriesExtractorReport, BigQuerySchemaApiPerfReport, ) from datahub.ingestion.source.bigquery_v2.bigquery_schema import BigQuerySchemaApi @@ -25,7 +26,6 @@ from datahub.ingestion.source.bigquery_v2.queries_extractor import ( BigQueryQueriesExtractor, BigQueryQueriesExtractorConfig, - BigQueryQueriesExtractorReport, ) logger = logging.getLogger(__name__) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py index d68468fd56c9bc..b333bcf695a464 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py @@ -58,6 +58,18 @@ class BigQueryProcessingPerfReport(Report): usage_state_size: Optional[str] = None +@dataclass +class 
BigQueryQueriesExtractorReport(Report): + query_log_fetch_timer: PerfTimer = field(default_factory=PerfTimer) + audit_log_preprocessing_timer: PerfTimer = field(default_factory=PerfTimer) + audit_log_load_timer: PerfTimer = field(default_factory=PerfTimer) + sql_aggregator: Optional[SqlAggregatorReport] = None + num_queries_by_project: TopKDict[str, int] = field(default_factory=int_top_k_dict) + + num_total_queries: int = 0 + num_unique_queries: int = 0 + + @dataclass class BigQueryV2Report( ProfilingSqlReport, @@ -143,10 +155,8 @@ class BigQueryV2Report( snapshots_scanned: int = 0 - num_view_definitions_parsed: int = 0 - num_view_definitions_failed_parsing: int = 0 - num_view_definitions_failed_column_parsing: int = 0 - view_definitions_parsing_failures: LossyList[str] = field(default_factory=LossyList) + # view lineage + sql_aggregator: Optional[SqlAggregatorReport] = None read_reasons_stat: Counter[str] = field(default_factory=collections.Counter) operation_types_stat: Counter[str] = field(default_factory=collections.Counter) @@ -171,8 +181,7 @@ class BigQueryV2Report( usage_end_time: Optional[datetime] = None stateful_usage_ingestion_enabled: bool = False - # lineage/usage v2 - sql_aggregator: Optional[SqlAggregatorReport] = None + queries_extractor: Optional[BigQueryQueriesExtractorReport] = None def set_ingestion_stage(self, project_id: str, stage: str) -> None: self.report_ingestion_stage_start(f"{project_id}: {stage}") diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_test_connection.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_test_connection.py index d0f111f451c0e1..27beb7b0254c41 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_test_connection.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_test_connection.py @@ -137,7 +137,10 @@ def lineage_capability_test( report: BigQueryV2Report, ) -> CapabilityReport: lineage_extractor = BigqueryLineageExtractor( - connection_conf, report, BigQueryIdentifierBuilder(connection_conf, report) + connection_conf, + report, + schema_resolver=SchemaResolver(platform="bigquery"), + identifiers=BigQueryIdentifierBuilder(connection_conf, report), ) for project_id in project_ids: try: diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py index 16d472d4dedd2a..c9d0738bea7dca 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py @@ -24,6 +24,7 @@ from datahub.configuration.pattern_utils import is_schema_allowed from datahub.emitter import mce_builder from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.api.source_helpers import auto_workunit from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.bigquery_v2.bigquery_audit import ( AuditLogEntry, @@ -53,6 +54,7 @@ from datahub.ingestion.source.state.redundant_run_skip_handler import ( RedundantLineageRunSkipHandler, ) +from datahub.ingestion.source_report.ingestion_stage import LINEAGE_EXTRACTION from datahub.metadata.schema_classes import ( AuditStampClass, DatasetLineageTypeClass, @@ -63,6 +65,7 @@ UpstreamLineageClass, ) from datahub.sql_parsing.schema_resolver import SchemaResolver +from datahub.sql_parsing.sql_parsing_aggregator import SqlParsingAggregator from datahub.sql_parsing.sqlglot_lineage import 
SqlParsingResult, sqlglot_lineage from datahub.utilities import memory_footprint from datahub.utilities.file_backed_collections import FileBackedDict @@ -201,38 +204,20 @@ def make_lineage_edges_from_parsing_result( return list(table_edges.values()) -def make_lineage_edge_for_snapshot( - snapshot: BigqueryTableSnapshot, -) -> Optional[LineageEdge]: - if snapshot.base_table_identifier: - base_table_name = str( - BigQueryTableRef.from_bigquery_table(snapshot.base_table_identifier) - ) - return LineageEdge( - table=base_table_name, - column_mapping=frozenset( - LineageEdgeColumnMapping( - out_column=column.field_path, - in_columns=frozenset([column.field_path]), - ) - for column in snapshot.columns - ), - auditStamp=datetime.now(timezone.utc), - type=DatasetLineageTypeClass.TRANSFORMED, - ) - return None - - class BigqueryLineageExtractor: def __init__( self, config: BigQueryV2Config, report: BigQueryV2Report, + *, + schema_resolver: SchemaResolver, identifiers: BigQueryIdentifierBuilder, redundant_run_skip_handler: Optional[RedundantLineageRunSkipHandler] = None, ): self.config = config self.report = report + self.schema_resolver = schema_resolver + self.identifiers = identifiers self.audit_log_api = BigQueryAuditLogApi( report.audit_log_api_perf, @@ -246,6 +231,23 @@ def __init__( self.report.lineage_end_time, ) = self.get_time_window() + self.datasets_skip_audit_log_lineage: Set[str] = set() + + self.aggregator = SqlParsingAggregator( + platform=self.identifiers.platform, + platform_instance=self.identifiers.identifier_config.platform_instance, + env=self.identifiers.identifier_config.env, + schema_resolver=self.schema_resolver, + eager_graph_load=False, + generate_lineage=True, + generate_queries=True, + generate_usage_statistics=False, + generate_query_usage_statistics=False, + generate_operations=False, + format_queries=True, + ) + self.report.sql_aggregator = self.aggregator.report + def get_time_window(self) -> Tuple[datetime, datetime]: if self.redundant_run_skip_handler: return self.redundant_run_skip_handler.suggest_run_time_window( @@ -271,10 +273,46 @@ def _should_ingest_lineage(self) -> bool: return True + def get_lineage_workunits_for_views_and_snapshots( + self, + projects: List[str], + view_refs_by_project: Dict[str, Set[str]], + view_definitions: FileBackedDict[str], + snapshot_refs_by_project: Dict[str, Set[str]], + snapshots_by_ref: FileBackedDict[BigqueryTableSnapshot], + ) -> Iterable[MetadataWorkUnit]: + for project in projects: + if self.config.lineage_parse_view_ddl: + for view in view_refs_by_project[project]: + self.datasets_skip_audit_log_lineage.add(view) + self.aggregator.add_view_definition( + view_urn=self.identifiers.gen_dataset_urn_from_raw_ref( + BigQueryTableRef.from_string_name(view) + ), + view_definition=view_definitions[view], + default_db=project, + ) + + for snapshot_ref in snapshot_refs_by_project[project]: + snapshot = snapshots_by_ref[snapshot_ref] + if not snapshot.base_table_identifier: + continue + self.datasets_skip_audit_log_lineage.add(snapshot_ref) + snapshot_urn = self.identifiers.gen_dataset_urn_from_raw_ref( + BigQueryTableRef.from_string_name(snapshot_ref) + ) + base_table_urn = self.identifiers.gen_dataset_urn_from_raw_ref( + BigQueryTableRef(snapshot.base_table_identifier) + ) + self.aggregator.add_known_lineage_mapping( + upstream_urn=base_table_urn, downstream_urn=snapshot_urn + ) + + yield from auto_workunit(self.aggregator.gen_metadata()) + def get_lineage_workunits( self, projects: List[str], - sql_parser_schema_resolver: 
SchemaResolver, view_refs_by_project: Dict[str, Set[str]], view_definitions: FileBackedDict[str], snapshot_refs_by_project: Dict[str, Set[str]], @@ -283,39 +321,22 @@ def get_lineage_workunits( ) -> Iterable[MetadataWorkUnit]: if not self._should_ingest_lineage(): return - datasets_skip_audit_log_lineage: Set[str] = set() - dataset_lineage: Dict[str, Set[LineageEdge]] = {} - for project in projects: - self.populate_snapshot_lineage( - dataset_lineage, - snapshot_refs_by_project[project], - snapshots_by_ref, - ) - - if self.config.lineage_parse_view_ddl: - self.populate_view_lineage_with_sql_parsing( - dataset_lineage, - view_refs_by_project[project], - view_definitions, - sql_parser_schema_resolver, - project, - ) - datasets_skip_audit_log_lineage.update(dataset_lineage.keys()) - for lineage_key in dataset_lineage.keys(): - yield from self.gen_lineage_workunits_for_table( - dataset_lineage, BigQueryTableRef.from_string_name(lineage_key) - ) + yield from self.get_lineage_workunits_for_views_and_snapshots( + projects, + view_refs_by_project, + view_definitions, + snapshot_refs_by_project, + snapshots_by_ref, + ) if self.config.use_exported_bigquery_audit_metadata: projects = ["*"] # project_id not used when using exported metadata for project in projects: - self.report.set_ingestion_stage(project, "Lineage Extraction") + self.report.set_ingestion_stage(project, LINEAGE_EXTRACTION) yield from self.generate_lineage( project, - sql_parser_schema_resolver, - datasets_skip_audit_log_lineage, table_refs, ) @@ -328,8 +349,6 @@ def get_lineage_workunits( def generate_lineage( self, project_id: str, - sql_parser_schema_resolver: SchemaResolver, - datasets_skip_audit_log_lineage: Set[str], table_refs: Set[str], ) -> Iterable[MetadataWorkUnit]: logger.info(f"Generate lineage for {project_id}") @@ -339,9 +358,7 @@ def generate_lineage( lineage = self.lineage_via_catalog_lineage_api(project_id) else: events = self._get_parsed_audit_log_events(project_id) - lineage = self._create_lineage_map( - events, sql_parser_schema_resolver - ) + lineage = self._create_lineage_map(events) except Exception as e: self.report.lineage_failed_extraction.append(project_id) self.report.warning( @@ -367,7 +384,7 @@ def generate_lineage( # as they may contain indirectly referenced tables. 
if ( lineage_key not in table_refs - or lineage_key in datasets_skip_audit_log_lineage + or lineage_key in self.datasets_skip_audit_log_lineage ): continue @@ -375,58 +392,6 @@ def generate_lineage( lineage, BigQueryTableRef.from_string_name(lineage_key) ) - def populate_view_lineage_with_sql_parsing( - self, - view_lineage: Dict[str, Set[LineageEdge]], - view_refs: Set[str], - view_definitions: FileBackedDict[str], - sql_parser_schema_resolver: SchemaResolver, - default_project: str, - ) -> None: - for view in view_refs: - view_definition = view_definitions[view] - raw_view_lineage = sqlglot_lineage( - view_definition, - schema_resolver=sql_parser_schema_resolver, - default_db=default_project, - ) - if raw_view_lineage.debug_info.table_error: - logger.debug( - f"Failed to parse lineage for view {view}: {raw_view_lineage.debug_info.table_error}" - ) - self.report.num_view_definitions_failed_parsing += 1 - self.report.view_definitions_parsing_failures.append( - f"Table-level sql parsing error for view {view}: {raw_view_lineage.debug_info.table_error}" - ) - continue - elif raw_view_lineage.debug_info.column_error: - self.report.num_view_definitions_failed_column_parsing += 1 - self.report.view_definitions_parsing_failures.append( - f"Column-level sql parsing error for view {view}: {raw_view_lineage.debug_info.column_error}" - ) - else: - self.report.num_view_definitions_parsed += 1 - - ts = datetime.now(timezone.utc) - view_lineage[view] = set( - make_lineage_edges_from_parsing_result( - raw_view_lineage, - audit_stamp=ts, - lineage_type=DatasetLineageTypeClass.VIEW, - ) - ) - - def populate_snapshot_lineage( - self, - snapshot_lineage: Dict[str, Set[LineageEdge]], - snapshot_refs: Set[str], - snapshots_by_ref: FileBackedDict[BigqueryTableSnapshot], - ) -> None: - for snapshot in snapshot_refs: - lineage_edge = make_lineage_edge_for_snapshot(snapshots_by_ref[snapshot]) - if lineage_edge: - snapshot_lineage[snapshot] = {lineage_edge} - def gen_lineage_workunits_for_table( self, lineage: Dict[str, Set[LineageEdge]], table_ref: BigQueryTableRef ) -> Iterable[MetadataWorkUnit]: @@ -687,7 +652,6 @@ def _parse_exported_bigquery_audit_metadata( def _create_lineage_map( self, entries: Iterable[QueryEvent], - sql_parser_schema_resolver: SchemaResolver, ) -> Dict[str, Set[LineageEdge]]: logger.info("Entering create lineage map function") lineage_map: Dict[str, Set[LineageEdge]] = collections.defaultdict(set) @@ -751,7 +715,7 @@ def _create_lineage_map( query = e.query raw_lineage = sqlglot_lineage( query, - schema_resolver=sql_parser_schema_resolver, + schema_resolver=self.schema_resolver, default_db=e.project_id, ) logger.debug( diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py index 8457f4e37b3d26..8e1d27847f2b68 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py @@ -2,25 +2,29 @@ import logging import pathlib import tempfile -from dataclasses import dataclass, field from datetime import datetime, timezone -from typing import Dict, Iterable, List, Optional, TypedDict +from typing import Collection, Dict, Iterable, List, Optional, TypedDict from google.cloud.bigquery import Client -from pydantic import Field +from pydantic import Field, PositiveInt from datahub.configuration.common import AllowDenyPattern from 
datahub.configuration.time_window_config import ( BaseTimeWindowConfig, get_time_bucket, ) -from datahub.ingestion.api.report import Report from datahub.ingestion.api.source import SourceReport from datahub.ingestion.api.source_helpers import auto_workunit from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.graph.client import DataHubGraph -from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier +from datahub.ingestion.source.bigquery_v2.bigquery_audit import ( + BigqueryTableIdentifier, + BigQueryTableRef, +) from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryBaseConfig +from datahub.ingestion.source.bigquery_v2.bigquery_report import ( + BigQueryQueriesExtractorReport, +) from datahub.ingestion.source.bigquery_v2.bigquery_schema import ( BigqueryProject, BigQuerySchemaApi, @@ -35,7 +39,6 @@ from datahub.sql_parsing.schema_resolver import SchemaResolver from datahub.sql_parsing.sql_parsing_aggregator import ( ObservedQuery, - SqlAggregatorReport, SqlParsingAggregator, ) from datahub.sql_parsing.sqlglot_utils import get_query_fingerprint @@ -44,8 +47,6 @@ FileBackedDict, FileBackedList, ) -from datahub.utilities.perf_timer import PerfTimer -from datahub.utilities.stats_collections import TopKDict, int_top_k_dict from datahub.utilities.time import datetime_to_ts_millis logger = logging.getLogger(__name__) @@ -95,6 +96,10 @@ class BigQueryQueriesExtractorConfig(BigQueryBaseConfig): description="regex patterns for user emails to filter in usage.", ) + top_n_queries: PositiveInt = Field( + default=10, description="Number of top queries to save to each table." + ) + include_lineage: bool = True include_queries: bool = True include_usage_statistics: bool = True @@ -108,18 +113,6 @@ class BigQueryQueriesExtractorConfig(BigQueryBaseConfig): ) -@dataclass -class BigQueryQueriesExtractorReport(Report): - query_log_fetch_timer: PerfTimer = field(default_factory=PerfTimer) - audit_log_preprocessing_timer: PerfTimer = field(default_factory=PerfTimer) - audit_log_load_timer: PerfTimer = field(default_factory=PerfTimer) - sql_aggregator: Optional[SqlAggregatorReport] = None - num_queries_by_project: TopKDict[str, int] = field(default_factory=int_top_k_dict) - - num_total_queries: int = 0 - num_unique_queries: int = 0 - - class BigQueryQueriesExtractor: """ Extracts query audit log and generates usage/lineage/operation workunits. @@ -128,6 +121,7 @@ class BigQueryQueriesExtractor: 1. For every lineage/operation workunit, corresponding query id is also present 2. Operation aspect for a particular query is emitted at max once(last occurence) for a day 3. "DROP" operation accounts for usage here + 4. 
userEmail is not populated in datasetUsageStatistics aspect, only user urn """ @@ -141,7 +135,7 @@ def __init__( identifiers: BigQueryIdentifierBuilder, graph: Optional[DataHubGraph] = None, schema_resolver: Optional[SchemaResolver] = None, - discovered_tables: Optional[List[str]] = None, + discovered_tables: Optional[Collection[str]] = None, ): self.connection = connection @@ -150,8 +144,7 @@ def __init__( self.identifiers = identifiers self.schema_api = schema_api self.report = BigQueryQueriesExtractorReport() - # self.filters = filters - self.discovered_tables = discovered_tables + self.discovered_tables = set(discovered_tables) if discovered_tables else None self.structured_report = structured_report @@ -171,6 +164,7 @@ def __init__( start_time=self.config.window.start_time, end_time=self.config.window.end_time, user_email_pattern=self.config.user_email_pattern, + top_n_queries=self.config.top_n_queries, ), generate_operations=self.config.include_operations, is_temp_table=self.is_temp_table, @@ -192,19 +186,35 @@ def local_temp_path(self) -> pathlib.Path: def is_temp_table(self, name: str) -> bool: try: - return BigqueryTableIdentifier.from_string_name(name).dataset.startswith( - self.config.temp_table_dataset_prefix - ) + table = BigqueryTableIdentifier.from_string_name(name) + + if table.dataset.startswith(self.config.temp_table_dataset_prefix): + return True + + # This is also a temp table if + # 1. this name would be allowed by the dataset patterns, and + # 2. we have a list of discovered tables, and + # 3. it's not in the discovered tables list + if ( + self.filters.is_allowed(table) + and self.discovered_tables + and str(BigQueryTableRef(table)) not in self.discovered_tables + ): + return True + except Exception: logger.warning(f"Error parsing table name {name} ") - return False + return False def is_allowed_table(self, name: str) -> bool: try: - table_id = BigqueryTableIdentifier.from_string_name(name) - if self.discovered_tables and str(table_id) not in self.discovered_tables: + table = BigqueryTableIdentifier.from_string_name(name) + if ( + self.discovered_tables + and str(BigQueryTableRef(table)) not in self.discovered_tables + ): return False - return self.filters.is_allowed(table_id) + return self.filters.is_allowed(table) except Exception: logger.warning(f"Error parsing table name {name} ") return False diff --git a/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py b/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py index 4c2b0c276b9e7e..bc19940afdd1e3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py +++ b/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py @@ -493,7 +493,7 @@ def validate_path_spec(cls, values: Dict) -> Dict[str, Any]: if ( include_ext not in values["file_types"] - and include_ext != "*" + and include_ext not in ["*", ""] and not values["default_extension"] and include_ext not in SUPPORTED_COMPRESSIONS ): diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py index f8b1c6dd93d6d9..02eb096b240f52 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py @@ -161,6 +161,10 @@ class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin default=AllowDenyPattern.allow_all(), description="Regex patterns for connectors to filter in 
ingestion.", ) + destination_patterns: AllowDenyPattern = Field( + default=AllowDenyPattern.allow_all(), + description="Regex patterns for destinations to filter in ingestion.", + ) include_column_lineage: bool = Field( default=True, description="Populates table->table column lineage.", diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py index c899fe04d2c48e..b459b47deb153a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py @@ -283,6 +283,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: logger.info("Fivetran plugin execution is started") connectors = self.audit_log.get_allowed_connectors_list( self.config.connector_patterns, + self.config.destination_patterns, self.report, self.config.history_sync_lookback_period, ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py index d8ce68e8345ec7..31c16139066e43 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py @@ -251,6 +251,7 @@ def _fill_connectors_jobs( def get_allowed_connectors_list( self, connector_patterns: AllowDenyPattern, + destination_patterns: AllowDenyPattern, report: FivetranSourceReport, syncs_interval: int, ) -> List[Connector]: @@ -261,6 +262,9 @@ def get_allowed_connectors_list( if not connector_patterns.allowed(connector[Constant.CONNECTOR_NAME]): report.report_connectors_dropped(connector[Constant.CONNECTOR_NAME]) continue + if not destination_patterns.allowed(connector[Constant.DESTINATION_ID]): + report.report_connectors_dropped(connector[Constant.CONNECTOR_NAME]) + continue connectors.append( Connector( connector_id=connector[Constant.CONNECTOR_ID], diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/config.py b/metadata-ingestion/src/datahub/ingestion/source/s3/config.py index 8f1b79251c466f..3069c62e3a240f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/config.py @@ -47,6 +47,13 @@ class DataLakeSourceConfig( None, description="Whether or not to create tags in datahub from the s3 object", ) + use_s3_content_type: bool = Field( + default=False, + description=( + "If enabled, use S3 Object metadata to determine content type over file extension, if set." + " Warning: this requires a separate query to S3 for each object, which can be slow for large datasets." + ), + ) # Whether to update the table schema when schema in files within the partitions are updated _update_schema_on_partition_file_updates_deprecation = pydantic_field_deprecated( @@ -145,13 +152,27 @@ def check_path_specs_and_infer_platform( return path_specs @pydantic.validator("platform", always=True) - def platform_not_empty(cls, platform: str, values: dict) -> str: + def platform_valid(cls, platform: str, values: dict) -> str: inferred_platform = values.get( "platform", None ) # we may have inferred it above platform = platform or inferred_platform if not platform: raise ValueError("platform must not be empty") + + if platform != "s3" and values.get("use_s3_bucket_tags"): + raise ValueError( + "Cannot grab s3 bucket tags when platform is not s3. Remove the flag or ingest from s3." 
+ ) + if platform != "s3" and values.get("use_s3_object_tags"): + raise ValueError( + "Cannot grab s3 object tags when platform is not s3. Remove the flag or ingest from s3." + ) + if platform != "s3" and values.get("use_s3_content_type"): + raise ValueError( + "Cannot grab s3 object content type when platform is not s3. Remove the flag or ingest from s3." + ) + return platform @pydantic.root_validator(skip_on_failure=True) diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py index 55e25ebe88d125..ef5ed3c6304c92 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py @@ -47,6 +47,7 @@ from datahub.ingestion.source.s3.config import DataLakeSourceConfig, PathSpec from datahub.ingestion.source.s3.report import DataLakeSourceReport from datahub.ingestion.source.schema_inference import avro, csv_tsv, json, parquet +from datahub.ingestion.source.schema_inference.base import SchemaInferenceBase from datahub.ingestion.source.state.stale_entity_removal_handler import ( StaleEntityRemovalHandler, ) @@ -82,7 +83,6 @@ # Hack to support the .gzip extension with smart_open. so_compression.register_compressor(".gzip", so_compression._COMPRESSOR_REGISTRY[".gz"]) - # config flags to emit telemetry for config_options_to_report = [ "platform", @@ -161,6 +161,7 @@ class BrowsePath: timestamp: datetime size: int partitions: List[Folder] + content_type: Optional[str] = None @dataclasses.dataclass @@ -175,6 +176,7 @@ class TableData: partitions: Optional[List[Folder]] = None max_partition: Optional[Folder] = None min_partition: Optional[Folder] = None + content_type: Optional[str] = None @platform_name("S3 / Local Files", id="s3") @@ -378,8 +380,6 @@ def get_fields(self, table_data: TableData, path_spec: PathSpec) -> List: # capabilities of smart_open. 
file = smart_open(table_data.full_path, "rb") - fields = [] - extension = pathlib.Path(table_data.full_path).suffix from datahub.ingestion.source.data_lake_common.path_spec import ( SUPPORTED_COMPRESSIONS, @@ -391,38 +391,24 @@ def get_fields(self, table_data: TableData, path_spec: PathSpec) -> List: if extension == "" and path_spec.default_extension: extension = f".{path_spec.default_extension}" - try: - if extension == ".parquet": - fields = parquet.ParquetInferrer().infer_schema(file) - elif extension == ".csv": - fields = csv_tsv.CsvInferrer( - max_rows=self.source_config.max_rows - ).infer_schema(file) - elif extension == ".tsv": - fields = csv_tsv.TsvInferrer( - max_rows=self.source_config.max_rows - ).infer_schema(file) - elif extension == ".jsonl": - fields = json.JsonInferrer( - max_rows=self.source_config.max_rows, format="jsonl" - ).infer_schema(file) - elif extension == ".json": - fields = json.JsonInferrer().infer_schema(file) - elif extension == ".avro": - fields = avro.AvroInferrer().infer_schema(file) - else: + fields = [] + inferrer = self._get_inferrer(extension, table_data.content_type) + if inferrer: + try: + fields = inferrer.infer_schema(file) + logger.debug(f"Extracted fields in schema: {fields}") + except Exception as e: self.report.report_warning( table_data.full_path, - f"file {table_data.full_path} has unsupported extension", + f"could not infer schema for file {table_data.full_path}: {e}", ) - file.close() - except Exception as e: + else: self.report.report_warning( table_data.full_path, - f"could not infer schema for file {table_data.full_path}: {e}", + f"file {table_data.full_path} has unsupported extension", ) - file.close() - logger.debug(f"Extracted fields in schema: {fields}") + file.close() + if self.source_config.sort_schema_fields: fields = sorted(fields, key=lambda f: f.fieldPath) @@ -433,6 +419,36 @@ def get_fields(self, table_data: TableData, path_spec: PathSpec) -> List: return fields + def _get_inferrer( + self, extension: str, content_type: Optional[str] + ) -> Optional[SchemaInferenceBase]: + if content_type == "application/vnd.apache.parquet": + return parquet.ParquetInferrer() + elif content_type == "text/csv": + return csv_tsv.CsvInferrer(max_rows=self.source_config.max_rows) + elif content_type == "text/tab-separated-values": + return csv_tsv.TsvInferrer(max_rows=self.source_config.max_rows) + elif content_type == "application/json": + return json.JsonInferrer() + elif content_type == "application/avro": + return avro.AvroInferrer() + elif extension == ".parquet": + return parquet.ParquetInferrer() + elif extension == ".csv": + return csv_tsv.CsvInferrer(max_rows=self.source_config.max_rows) + elif extension == ".tsv": + return csv_tsv.TsvInferrer(max_rows=self.source_config.max_rows) + elif extension == ".jsonl": + return json.JsonInferrer( + max_rows=self.source_config.max_rows, format="jsonl" + ) + elif extension == ".json": + return json.JsonInferrer() + elif extension == ".avro": + return avro.AvroInferrer() + else: + return None + def add_partition_columns_to_schema( self, path_spec: PathSpec, full_path: str, fields: List[SchemaField] ) -> None: @@ -734,26 +750,25 @@ def extract_table_name(self, path_spec: PathSpec, named_vars: dict) -> str: def extract_table_data( self, path_spec: PathSpec, - path: str, - timestamp: datetime, - size: int, - partitions: List[Folder], + browse_path: BrowsePath, ) -> TableData: + path = browse_path.file + partitions = browse_path.partitions logger.debug(f"Getting table data for path: {path}") table_name, 
table_path = path_spec.extract_table_name_and_path(path) - table_data = TableData( + return TableData( display_name=table_name, is_s3=self.is_s3_platform(), full_path=path, partitions=partitions, max_partition=partitions[-1] if partitions else None, min_partition=partitions[0] if partitions else None, - timestamp=timestamp, + timestamp=browse_path.timestamp, table_path=table_path, number_of_files=1, size_in_bytes=( - size - if size + browse_path.size + if browse_path.size else sum( [ partition.size if partition.size else 0 @@ -761,8 +776,8 @@ def extract_table_data( ] ) ), + content_type=browse_path.content_type, ) - return table_data def resolve_templated_folders(self, bucket_name: str, prefix: str) -> Iterable[str]: folder_split: List[str] = prefix.split("*", 1) @@ -1001,6 +1016,7 @@ def s3_browser(self, path_spec: PathSpec, sample_size: int) -> Iterable[BrowsePa timestamp=max_folder.modification_time, size=max_folder.size, partitions=partitions, + # TODO: Support content type inference for partitions ) except Exception as e: # This odd check if being done because boto does not have a proper exception to catch @@ -1021,11 +1037,17 @@ def s3_browser(self, path_spec: PathSpec, sample_size: int) -> Iterable[BrowsePa for obj in bucket.objects.filter(Prefix=prefix).page_size(PAGE_SIZE): s3_path = self.create_s3_path(obj.bucket_name, obj.key) logger.debug(f"Path: {s3_path}") + + content_type = None + if self.source_config.use_s3_content_type: + content_type = s3.Object(obj.bucket_name, obj.key).content_type + yield BrowsePath( file=s3_path, timestamp=obj.last_modified, size=obj.size, partitions=[], + content_type=content_type, ) def create_s3_path(self, bucket_name: str, key: str) -> str: @@ -1078,15 +1100,13 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: ) table_dict: Dict[str, TableData] = {} for browse_path in file_browser: - if not path_spec.allowed(browse_path.file): - continue - table_data = self.extract_table_data( - path_spec, + if not path_spec.allowed( browse_path.file, - browse_path.timestamp, - browse_path.size, - browse_path.partitions, - ) + ignore_ext=self.is_s3_platform() + and self.source_config.use_s3_content_type, + ): + continue + table_data = self.extract_table_data(path_spec, browse_path) if table_data.table_path not in table_dict: table_dict[table_data.table_path] = table_data else: diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py index 0c861b1334d9fa..4da232518cde2d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py @@ -166,7 +166,7 @@ def __init__(self, ctx: PipelineContext, config: SnowflakeV2Config): # If we're ingestion schema metadata for tables/views, then we will populate # schemas into the resolver as we go. We only need to do a bulk fetch # if we're not ingesting schema metadata as part of ingestion. 
- ( + not ( self.config.include_technical_schema and self.config.include_tables and self.config.include_views diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py b/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py index c99fe3b09c5bb5..eea10d940bd1c8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py @@ -135,8 +135,8 @@ def get_table_names(self, schema_name: str) -> List[str]: def get_view_names(self, schema_name: str) -> List[str]: try: rows = self._execute_sql(f"SHOW VIEWS FROM `{schema_name}`") - # 3 columns - database, tableName, isTemporary - return [row.tableName for row in rows] + # 4 columns - namespace, viewName, isTemporary, isMaterialized + return [row.viewName for row in rows] except Exception as e: self.report.report_warning("Failed to get views for schema", schema_name) logger.warning( diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py index 112acd8101297f..bd987c2da7c764 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py @@ -109,7 +109,7 @@ def __init__( self.hive_metastore_proxy = hive_metastore_proxy def check_basic_connectivity(self) -> bool: - return bool(self._workspace_client.catalogs.list()) + return bool(self._workspace_client.catalogs.list(include_browse=True)) def assigned_metastore(self) -> Optional[Metastore]: response = self._workspace_client.metastores.summary() @@ -119,7 +119,7 @@ def catalogs(self, metastore: Optional[Metastore]) -> Iterable[Catalog]: if self.hive_metastore_proxy: yield self.hive_metastore_proxy.hive_metastore_catalog(metastore) - response = self._workspace_client.catalogs.list() + response = self._workspace_client.catalogs.list(include_browse=True) if not response: logger.info("Catalogs not found") return @@ -131,7 +131,9 @@ def catalogs(self, metastore: Optional[Metastore]) -> Iterable[Catalog]: def catalog( self, catalog_name: str, metastore: Optional[Metastore] ) -> Optional[Catalog]: - response = self._workspace_client.catalogs.get(catalog_name) + response = self._workspace_client.catalogs.get( + catalog_name, include_browse=True + ) if not response: logger.info(f"Catalog {catalog_name} not found") return None @@ -148,7 +150,9 @@ def schemas(self, catalog: Catalog) -> Iterable[Schema]: ): yield from self.hive_metastore_proxy.hive_metastore_schemas(catalog) return - response = self._workspace_client.schemas.list(catalog_name=catalog.name) + response = self._workspace_client.schemas.list( + catalog_name=catalog.name, include_browse=True + ) if not response: logger.info(f"Schemas not found for catalog {catalog.id}") return @@ -166,7 +170,9 @@ def tables(self, schema: Schema) -> Iterable[Table]: return with patch("databricks.sdk.service.catalog.TableInfo", TableInfoWithGeneration): response = self._workspace_client.tables.list( - catalog_name=schema.catalog.name, schema_name=schema.name + catalog_name=schema.catalog.name, + schema_name=schema.name, + include_browse=True, ) if not response: logger.info(f"Tables not found for schema {schema.id}") diff --git a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py index 764c2b42537bb4..29204c58fa4b9a 100644 --- 
a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py @@ -37,6 +37,7 @@ from datahub.sql_parsing.sqlglot_lineage import ( ColumnLineageInfo, ColumnRef, + DownstreamColumnRef, SqlParsingResult, infer_output_schema, sqlglot_lineage, @@ -72,6 +73,8 @@ class QueryLogSetting(enum.Enum): _DEFAULT_QUERY_LOG_SETTING = QueryLogSetting[ os.getenv("DATAHUB_SQL_AGG_QUERY_LOG") or QueryLogSetting.DISABLED.name ] +MAX_UPSTREAM_TABLES_COUNT = 300 +MAX_FINEGRAINEDLINEAGE_COUNT = 2000 @dataclasses.dataclass @@ -229,6 +232,8 @@ class SqlAggregatorReport(Report): num_unique_query_fingerprints: Optional[int] = None num_urns_with_lineage: Optional[int] = None num_lineage_skipped_due_to_filters: int = 0 + num_table_lineage_trimmed_due_to_large_size: int = 0 + num_column_lineage_trimmed_due_to_large_size: int = 0 # Queries. num_queries_entities_generated: int = 0 @@ -570,9 +575,6 @@ def add_known_lineage_mapping( Because this method takes in urns, it does not require that the urns are part of the platform that the aggregator is configured for. - TODO: In the future, this method will also generate CLL if we have - schemas for either the upstream or downstream. - The known lineage mapping does not contribute to usage statistics or operations. Args: @@ -585,6 +587,21 @@ def add_known_lineage_mapping( # We generate a fake "query" object to hold the lineage. query_id = self._known_lineage_query_id() + # Generate CLL if schema of downstream is known + column_lineage: List[ColumnLineageInfo] = [] + if self._schema_resolver.has_urn(downstream_urn): + schema = self._schema_resolver._resolve_schema_info(downstream_urn) + if schema: + column_lineage = [ + ColumnLineageInfo( + downstream=DownstreamColumnRef( + table=downstream_urn, column=field_path + ), + upstreams=[ColumnRef(table=upstream_urn, column=field_path)], + ) + for field_path in schema + ] + # Register the query. 
self._add_to_query_map( QueryMetadata( @@ -596,7 +613,7 @@ def add_known_lineage_mapping( latest_timestamp=None, actor=None, upstreams=[upstream_urn], - column_lineage=[], + column_lineage=column_lineage, column_usage={}, confidence_score=1.0, ) @@ -1154,6 +1171,26 @@ def _gen_lineage_for_downstream( confidenceScore=queries_map[query_id].confidence_score, ) ) + + if len(upstream_aspect.upstreams) > MAX_UPSTREAM_TABLES_COUNT: + logger.warning( + f"Too many upstream tables for {downstream_urn}: {len(upstream_aspect.upstreams)}" + f"Keeping only {MAX_UPSTREAM_TABLES_COUNT} table level upstreams/" + ) + upstream_aspect.upstreams = upstream_aspect.upstreams[ + :MAX_UPSTREAM_TABLES_COUNT + ] + self.report.num_table_lineage_trimmed_due_to_large_size += 1 + if len(upstream_aspect.fineGrainedLineages) > MAX_FINEGRAINEDLINEAGE_COUNT: + logger.warning( + f"Too many upstream columns for {downstream_urn}: {len(upstream_aspect.fineGrainedLineages)}" + f"Keeping only {MAX_FINEGRAINEDLINEAGE_COUNT} column level upstreams/" + ) + upstream_aspect.fineGrainedLineages = upstream_aspect.fineGrainedLineages[ + :MAX_FINEGRAINEDLINEAGE_COUNT + ] + self.report.num_column_lineage_trimmed_due_to_large_size += 1 + upstream_aspect.fineGrainedLineages = ( upstream_aspect.fineGrainedLineages or None ) diff --git a/metadata-ingestion/tests/integration/bigquery_v2/bigquery_mcp_golden.json b/metadata-ingestion/tests/integration/bigquery_v2/bigquery_mcp_golden.json index e7b2a7c4a9f4bb..bcbdd02506f734 100644 --- a/metadata-ingestion/tests/integration/bigquery_v2/bigquery_mcp_golden.json +++ b/metadata-ingestion/tests/integration/bigquery_v2/bigquery_mcp_golden.json @@ -401,6 +401,22 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.snapshot-table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "glossaryTerm", "entityUrn": "urn:li:glossaryTerm:Age", @@ -417,6 +433,97 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.snapshot-table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "schemaMetadata", + "aspect": { + "json": { + "schemaName": "project-id-1.bigquery-dataset-1.snapshot-table-1", + "platform": "urn:li:dataPlatform:bigquery", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.schema.MySqlDDL": { + "tableSchema": "" + } + }, + "fields": [ + { + "fieldPath": "age", + "nullable": false, + "description": "comment", + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "INT", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Test Policy Tag" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "email", + "nullable": false, + "description": "comment", + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "STRING", + "recursive": false, + "globalTags": { + "tags": [] + }, + "isPartOfKey": false + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", 
+ "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.snapshot-table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": {}, + "externalUrl": "https://console.cloud.google.com/bigquery?project=project-id-1&ws=!1m5!1m4!4m3!1sproject-id-1!2sbigquery-dataset-1!3ssnapshot-table-1", + "name": "snapshot-table-1", + "qualifiedName": "project-id-1.bigquery-dataset-1.snapshot-table-1", + "description": "", + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "glossaryTerm", "entityUrn": "urn:li:glossaryTerm:Email_Address", @@ -433,6 +540,57 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.snapshot-table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Bigquery Table Snapshot" + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.snapshot-table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:8df46c5e3ded05a3122b0015822c0ef0" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.snapshot-table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:bigquery", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:bigquery,project-id-1)" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "tag", "entityUrn": "urn:li:tag:Test Policy Tag", @@ -448,5 +606,83 @@ "runId": "bigquery-2022_02_03-07_00_00", "lastRunId": "no-run-id-provided" } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.snapshot-table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:068bd9323110994a40019fcf6cfc60d3", + "urn": "urn:li:container:068bd9323110994a40019fcf6cfc60d3" + }, + { + "id": "urn:li:container:8df46c5e3ded05a3122b0015822c0ef0", + "urn": "urn:li:container:8df46c5e3ded05a3122b0015822c0ef0" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.snapshot-table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 1643871600000, + "actor": "urn:li:corpuser:_ingestion" + }, + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "dataset": 
"urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)", + "type": "COPY" + } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD),age)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.snapshot-table-1,PROD),age)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD),email)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.snapshot-table-1,PROD),email)" + ], + "confidenceScore": 1.0 + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py index dff7f18db6135c..36199ee0e26000 100644 --- a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py +++ b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py @@ -11,6 +11,7 @@ DynamicTypedClassifierConfig, ) from datahub.ingestion.glossary.datahub_classifier import DataHubClassifierConfig +from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier from datahub.ingestion.source.bigquery_v2.bigquery_data_reader import BigQueryDataReader from datahub.ingestion.source.bigquery_v2.bigquery_schema import ( BigqueryColumn, @@ -18,6 +19,7 @@ BigqueryProject, BigQuerySchemaApi, BigqueryTable, + BigqueryTableSnapshot, ) from datahub.ingestion.source.bigquery_v2.bigquery_schema_gen import ( BigQuerySchemaGenerator, @@ -47,7 +49,7 @@ def recipe(mcp_output_path: str, override: dict = {}) -> dict: "config": { "project_ids": ["project-id-1"], "include_usage_statistics": False, - "include_table_lineage": False, + "include_table_lineage": True, "include_data_platform_instance": True, "classification": ClassificationConfig( enabled=True, @@ -68,6 +70,7 @@ def recipe(mcp_output_path: str, override: dict = {}) -> dict: @freeze_time(FROZEN_TIME) +@patch.object(BigQuerySchemaApi, "get_snapshots_for_dataset") @patch.object(BigQuerySchemaApi, "get_tables_for_dataset") @patch.object(BigQuerySchemaGenerator, "get_core_table_details") @patch.object(BigQuerySchemaApi, "get_datasets_for_project_id") @@ -85,6 +88,7 @@ def test_bigquery_v2_ingest( get_datasets_for_project_id, get_core_table_details, get_tables_for_dataset, + get_snapshots_for_dataset, pytestconfig, tmp_path, ): @@ -100,31 +104,35 @@ def test_bigquery_v2_ingest( {"tableReference": {"projectId": "", "datasetId": "", "tableId": ""}} ) table_name = "table-1" + snapshot_table_name = "snapshot-table-1" get_core_table_details.return_value = {table_name: table_list_item} + columns = [ + BigqueryColumn( + name="age", + ordinal_position=1, + is_nullable=False, + field_path="col_1", + data_type="INT", + comment="comment", + is_partition_column=False, + cluster_column_position=None, + policy_tags=["Test Policy Tag"], + ), + BigqueryColumn( + name="email", + ordinal_position=1, + is_nullable=False, + field_path="col_2", + data_type="STRING", + comment="comment", + 
is_partition_column=False, + cluster_column_position=None, + ), + ] + get_columns_for_dataset.return_value = { - table_name: [ - BigqueryColumn( - name="age", - ordinal_position=1, - is_nullable=False, - field_path="col_1", - data_type="INT", - comment="comment", - is_partition_column=False, - cluster_column_position=None, - policy_tags=["Test Policy Tag"], - ), - BigqueryColumn( - name="email", - ordinal_position=1, - is_nullable=False, - field_path="col_2", - data_type="STRING", - comment="comment", - is_partition_column=False, - cluster_column_position=None, - ), - ] + table_name: columns, + snapshot_table_name: columns, } get_sample_data_for_table.return_value = { "age": [random.randint(1, 80) for i in range(20)], @@ -140,6 +148,20 @@ def test_bigquery_v2_ingest( rows_count=None, ) get_tables_for_dataset.return_value = iter([bigquery_table]) + snapshot_table = BigqueryTableSnapshot( + name=snapshot_table_name, + comment=None, + created=None, + last_altered=None, + size_in_bytes=None, + rows_count=None, + base_table_identifier=BigqueryTableIdentifier( + project_id="project-id-1", + dataset="bigquery-dataset-1", + table="table-1", + ), + ) + get_snapshots_for_dataset.return_value = iter([snapshot_table]) pipeline_config_dict: Dict[str, Any] = recipe(mcp_output_path=mcp_output_path) diff --git a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery_queries.py b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery_queries.py index fb51aac9fa246d..d9effd33f5d278 100644 --- a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery_queries.py +++ b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery_queries.py @@ -6,7 +6,6 @@ import pytest from freezegun import freeze_time -from datahub.ingestion.source.usage.usage_common import BaseUsageConfig from datahub.sql_parsing.sql_parsing_aggregator import ObservedQuery from datahub.utilities.file_backed_collections import ConnectionWrapper, FileBackedList from tests.test_helpers import mce_helpers @@ -58,15 +57,12 @@ def test_queries_ingestion(project_client, client, pytestconfig, monkeypatch, tm "config": { "project_ids": ["gcp-staging", "gcp-staging-2"], "local_temp_path": tmp_path, + "top_n_queries": 20, }, }, "sink": {"type": "file", "config": {"filename": mcp_output_path}}, } - # This is hacky to pick all queries instead of any 10. 
- # Should be easy to remove once top_n_queries is supported in queries config - monkeypatch.setattr(BaseUsageConfig.__fields__["top_n_queries"], "default", 20) - pipeline = run_and_get_pipeline(pipeline_config_dict) pipeline.pretty_print_summary() diff --git a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py index 5e0e20234cc992..0f5d098ee39c4a 100644 --- a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py +++ b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py @@ -205,6 +205,11 @@ def test_fivetran_with_snowflake_dest(pytestconfig, tmp_path): "postgres", ] }, + "destination_patterns": { + "allow": [ + "interval_unconstitutional", + ] + }, "sources_to_database": { "calendar_elected": "postgres_db", }, @@ -291,6 +296,11 @@ def test_fivetran_with_snowflake_dest_and_null_connector_user(pytestconfig, tmp_ "postgres", ] }, + "destination_patterns": { + "allow": [ + "interval_unconstitutional", + ] + }, "sources_to_database": { "calendar_elected": "postgres_db", }, diff --git a/metadata-ingestion/tests/integration/s3/golden-files/s3/golden_mces_file_inference_without_extension.json b/metadata-ingestion/tests/integration/s3/golden-files/s3/golden_mces_file_inference_without_extension.json new file mode 100644 index 00000000000000..d50f00efacaa06 --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/golden-files/s3/golden_mces_file_inference_without_extension.json @@ -0,0 +1,769 @@ +[ +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,test-platform-instance.my-test-bucket/folder_a/folder_aa/folder_aaa/no_extension/small,DEV)", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:s3", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,test-platform-instance.my-test-bucket/folder_a/folder_aa/folder_aaa/no_extension/small,DEV)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": { + "schema_inferred_from": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/no_extension/small", + "number_of_files": "1", + "size_in_bytes": "172" + }, + "name": "small", + "description": "", + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,test-platform-instance.my-test-bucket/folder_a/folder_aa/folder_aaa/no_extension/small,DEV)", + "changeType": "UPSERT", + "aspectName": "schemaMetadata", + "aspect": { + "json": { + "schemaName": "small", + "platform": "urn:li:dataPlatform:s3", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.schema.OtherSchema": { + "rawSchema": "" + } + }, + "fields": [ + { + "fieldPath": "1st chord", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "integer", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "2nd chord", + "nullable": false, + "type": { + "type": { + 
"com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "integer", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "3rd chord", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "integer", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "4th chord", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "integer", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "Progression Quality", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,test-platform-instance.my-test-bucket/folder_a/folder_aa/folder_aaa/no_extension/small,DEV)", + "changeType": "UPSERT", + "aspectName": "operation", + "aspect": { + "json": { + "timestampMillis": 1615443388097, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "operationType": "UPDATE", + "lastUpdatedTimestamp": 1586848010000 + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:647eefb4dfda8695baf1aa0775d78689", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "s3", + "instance": "test-platform-instance", + "env": "DEV", + "bucket_name": "my-test-bucket" + }, + "name": "my-test-bucket" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:647eefb4dfda8695baf1aa0775d78689", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:647eefb4dfda8695baf1aa0775d78689", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:s3", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:647eefb4dfda8695baf1aa0775d78689", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "S3 bucket" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:647eefb4dfda8695baf1aa0775d78689", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": 
"urn:li:container:c8d940d2010edd365619411b385b11e4", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "s3", + "instance": "test-platform-instance", + "env": "DEV", + "folder_abs_path": "my-test-bucket/folder_a" + }, + "name": "folder_a" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:c8d940d2010edd365619411b385b11e4", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:c8d940d2010edd365619411b385b11e4", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:s3", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:c8d940d2010edd365619411b385b11e4", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Folder" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:c8d940d2010edd365619411b385b11e4", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:647eefb4dfda8695baf1aa0775d78689" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:c8d940d2010edd365619411b385b11e4", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + }, + { + "id": "urn:li:container:647eefb4dfda8695baf1aa0775d78689", + "urn": "urn:li:container:647eefb4dfda8695baf1aa0775d78689" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:b0037296cdd497e3137aa0628b8687bc", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "s3", + "instance": "test-platform-instance", + "env": "DEV", + "folder_abs_path": "my-test-bucket/folder_a/folder_aa" + }, + "name": "folder_aa" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:b0037296cdd497e3137aa0628b8687bc", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:b0037296cdd497e3137aa0628b8687bc", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:s3", + "instance": 
"urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:b0037296cdd497e3137aa0628b8687bc", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Folder" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:b0037296cdd497e3137aa0628b8687bc", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:c8d940d2010edd365619411b385b11e4" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:b0037296cdd497e3137aa0628b8687bc", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + }, + { + "id": "urn:li:container:647eefb4dfda8695baf1aa0775d78689", + "urn": "urn:li:container:647eefb4dfda8695baf1aa0775d78689" + }, + { + "id": "urn:li:container:c8d940d2010edd365619411b385b11e4", + "urn": "urn:li:container:c8d940d2010edd365619411b385b11e4" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:de5780654849d6a18b66df2f9cb8e8d9", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "s3", + "instance": "test-platform-instance", + "env": "DEV", + "folder_abs_path": "my-test-bucket/folder_a/folder_aa/folder_aaa" + }, + "name": "folder_aaa" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:de5780654849d6a18b66df2f9cb8e8d9", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:de5780654849d6a18b66df2f9cb8e8d9", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:s3", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:de5780654849d6a18b66df2f9cb8e8d9", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Folder" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:de5780654849d6a18b66df2f9cb8e8d9", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:b0037296cdd497e3137aa0628b8687bc" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": 
"file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:de5780654849d6a18b66df2f9cb8e8d9", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + }, + { + "id": "urn:li:container:647eefb4dfda8695baf1aa0775d78689", + "urn": "urn:li:container:647eefb4dfda8695baf1aa0775d78689" + }, + { + "id": "urn:li:container:c8d940d2010edd365619411b385b11e4", + "urn": "urn:li:container:c8d940d2010edd365619411b385b11e4" + }, + { + "id": "urn:li:container:b0037296cdd497e3137aa0628b8687bc", + "urn": "urn:li:container:b0037296cdd497e3137aa0628b8687bc" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:9b4624d58669059c9e62afb3d7341944", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "s3", + "instance": "test-platform-instance", + "env": "DEV", + "folder_abs_path": "my-test-bucket/folder_a/folder_aa/folder_aaa/no_extension" + }, + "name": "no_extension" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:9b4624d58669059c9e62afb3d7341944", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:9b4624d58669059c9e62afb3d7341944", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:s3", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:9b4624d58669059c9e62afb3d7341944", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Folder" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:9b4624d58669059c9e62afb3d7341944", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:de5780654849d6a18b66df2f9cb8e8d9" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:9b4624d58669059c9e62afb3d7341944", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + }, + { + "id": "urn:li:container:647eefb4dfda8695baf1aa0775d78689", + "urn": "urn:li:container:647eefb4dfda8695baf1aa0775d78689" + }, + { + "id": "urn:li:container:c8d940d2010edd365619411b385b11e4", + "urn": "urn:li:container:c8d940d2010edd365619411b385b11e4" + }, + { + "id": 
"urn:li:container:b0037296cdd497e3137aa0628b8687bc", + "urn": "urn:li:container:b0037296cdd497e3137aa0628b8687bc" + }, + { + "id": "urn:li:container:de5780654849d6a18b66df2f9cb8e8d9", + "urn": "urn:li:container:de5780654849d6a18b66df2f9cb8e8d9" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,test-platform-instance.my-test-bucket/folder_a/folder_aa/folder_aaa/no_extension/small,DEV)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:9b4624d58669059c9e62afb3d7341944" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,test-platform-instance.my-test-bucket/folder_a/folder_aa/folder_aaa/no_extension/small,DEV)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,test-platform-instance.my-test-bucket/folder_a/folder_aa/folder_aaa/no_extension/small,DEV)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + }, + { + "id": "urn:li:container:647eefb4dfda8695baf1aa0775d78689", + "urn": "urn:li:container:647eefb4dfda8695baf1aa0775d78689" + }, + { + "id": "urn:li:container:c8d940d2010edd365619411b385b11e4", + "urn": "urn:li:container:c8d940d2010edd365619411b385b11e4" + }, + { + "id": "urn:li:container:b0037296cdd497e3137aa0628b8687bc", + "urn": "urn:li:container:b0037296cdd497e3137aa0628b8687bc" + }, + { + "id": "urn:li:container:de5780654849d6a18b66df2f9cb8e8d9", + "urn": "urn:li:container:de5780654849d6a18b66df2f9cb8e8d9" + }, + { + "id": "urn:li:container:9b4624d58669059c9e62afb3d7341944", + "urn": "urn:li:container:9b4624d58669059c9e62afb3d7341944" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/s3/sources/s3/file_inference_without_extension.json b/metadata-ingestion/tests/integration/s3/sources/s3/file_inference_without_extension.json new file mode 100644 index 00000000000000..87f7950946780a --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/s3/file_inference_without_extension.json @@ -0,0 +1,19 @@ +{ + "type": "s3", + "config": { + "platform_instance": "test-platform-instance", + "env": "DEV", + "use_s3_content_type": true, + "path_specs": [{ + "include": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/no_extension/*" + }], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + } + } +} diff --git a/metadata-ingestion/tests/integration/s3/sources/shared/file_without_extension.json b/metadata-ingestion/tests/integration/s3/sources/shared/file_without_extension.json new file mode 100644 index 00000000000000..235b588e5719bf --- /dev/null +++ 
b/metadata-ingestion/tests/integration/s3/sources/shared/file_without_extension.json @@ -0,0 +1,19 @@ +{ + "type": "s3", + "config": { + "platform_instance": "test-platform-instance", + "env": "DEV", + "path_specs": [{ + "include": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/no_extension/*", + "default_extension": "csv" + }], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + } + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/s3/sources/shared/folder_no_partition.json b/metadata-ingestion/tests/integration/s3/sources/shared/folder_no_partition.json new file mode 100644 index 00000000000000..c06e411005399e --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/shared/folder_no_partition.json @@ -0,0 +1,17 @@ +{ + "type": "s3", + "config": { + "env": "UAT", + "path_specs": [{ + "include": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/{table}/*.*" + }], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + } + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/s3/sources/shared/folder_no_partition_exclude.json b/metadata-ingestion/tests/integration/s3/sources/shared/folder_no_partition_exclude.json new file mode 100644 index 00000000000000..8d05bfcc60a0c7 --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/shared/folder_no_partition_exclude.json @@ -0,0 +1,20 @@ +{ + "type": "s3", + "config": { + "env": "UAT", + "path_specs": [{ + "include": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/{table}/*.*", + "exclude": [ + "**/food_csv/**" + ] + }], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + } + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/s3/sources/shared/folder_no_partition_filename.json b/metadata-ingestion/tests/integration/s3/sources/shared/folder_no_partition_filename.json new file mode 100644 index 00000000000000..421483c74a8681 --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/shared/folder_no_partition_filename.json @@ -0,0 +1,19 @@ +{ + "type": "s3", + "config": { + "env": "UAT", + "path_specs": [{ + "include": "s3://my-test-bucket/folder_a/folder_aa/{dept}/{table}/*.*", + "file_types": ["csv"], + "table_name": "{dept}.{table}" + }], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + } + } +} diff --git a/metadata-ingestion/tests/integration/s3/sources/shared/folder_no_partition_glob.json b/metadata-ingestion/tests/integration/s3/sources/shared/folder_no_partition_glob.json new file mode 100644 index 00000000000000..bf3f832f9c7b4a --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/shared/folder_no_partition_glob.json @@ -0,0 +1,20 @@ +{ + "type": "s3", + "config": { + "env": "UAT", + "path_specs": [{ + "include": "s3://my-test-bucket/*/folder_aa/*/{table}/*.*", + "exclude": [ + "**/food_csv/**" + ] + }], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + } + } +} \ No newline at end of file diff --git 
a/metadata-ingestion/tests/integration/s3/sources/shared/folder_partition_basic.json b/metadata-ingestion/tests/integration/s3/sources/shared/folder_partition_basic.json new file mode 100644 index 00000000000000..c6afe024f6a2c8 --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/shared/folder_partition_basic.json @@ -0,0 +1,21 @@ +{ + "type": "s3", + "config": { + "env": "UAT", + "path_specs": [{ + "include": "s3://my-test-bucket/folder_a/folder_aa/{dept}/{table}/{partition[0]}/{partition[1]}/*.*", + "table_name": "{dept}.{table}", + "exclude":[ + "**/folder_aaaa/**" + ] + }], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + } + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/s3/sources/shared/folder_partition_keyval.json b/metadata-ingestion/tests/integration/s3/sources/shared/folder_partition_keyval.json new file mode 100644 index 00000000000000..9bb6d129858ab0 --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/shared/folder_partition_keyval.json @@ -0,0 +1,18 @@ +{ + "type": "s3", + "config": { + "env": "UAT", + "path_specs": [{ + "include": "s3://my-test-bucket/folder_a/folder_aa/{dept}/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.*", + "table_name": "{dept}.{table}" + }], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + } + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/s3/sources/shared/folder_partition_update_schema.json b/metadata-ingestion/tests/integration/s3/sources/shared/folder_partition_update_schema.json new file mode 100644 index 00000000000000..b542f60a6a8aa6 --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/shared/folder_partition_update_schema.json @@ -0,0 +1,22 @@ +{ + "type": "s3", + "config": { + "env": "UAT", + "path_specs": [{ + "include": "s3://my-test-bucket/folder_a/folder_aa/{dept}/{table}/{partition[0]}/{partition[1]}/*.*", + "sample_files": true, + "table_name": "{dept}.{table}", + "exclude":[ + "**/folder_aaaa/**" + ] + }], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + } + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/s3/sources/shared/multiple_files.json b/metadata-ingestion/tests/integration/s3/sources/shared/multiple_files.json new file mode 100644 index 00000000000000..77be022895cfca --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/shared/multiple_files.json @@ -0,0 +1,18 @@ +{ + "type": "s3", + "config": { + "platform_instance": "test-platform-instance", + "env": "DEV", + "path_specs": [{ + "include": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/*.*" + }], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + } + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/s3/sources/shared/multiple_spec_for_files.json b/metadata-ingestion/tests/integration/s3/sources/shared/multiple_spec_for_files.json new file mode 100644 index 00000000000000..d051421e154e72 --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/shared/multiple_spec_for_files.json @@ -0,0 +1,23 @@ +{ 
+ "type": "s3", + "config": { + "path_specs": [ + { + "include": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/chord_progressions_avro.avro" + }, + { + "include": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/chord_progressions_csv.csv" + } + ], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + }, + "use_s3_bucket_tags": true, + "use_s3_object_tags": true + } +} diff --git a/metadata-ingestion/tests/integration/s3/sources/shared/multiple_specs_of_different_buckets.json b/metadata-ingestion/tests/integration/s3/sources/shared/multiple_specs_of_different_buckets.json new file mode 100644 index 00000000000000..d6aec502e11954 --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/shared/multiple_specs_of_different_buckets.json @@ -0,0 +1,23 @@ +{ + "type": "s3", + "config": { + "path_specs": [ + { + "include": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/chord_progressions_avro.avro" + }, + { + "include": "s3://my-test-bucket-2/folder_a/folder_aa/folder_aaa/chord_progressions_csv.csv" + } + ], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + }, + "use_s3_bucket_tags": true, + "use_s3_object_tags": true + } +} diff --git a/metadata-ingestion/tests/integration/s3/sources/shared/single_file.json b/metadata-ingestion/tests/integration/s3/sources/shared/single_file.json new file mode 100644 index 00000000000000..96e5eafa562dd3 --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/shared/single_file.json @@ -0,0 +1,20 @@ +{ + "type": "s3", + "config": { + "path_specs": [ + { + "include": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/chord_progressions_avro.avro" + } + ], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + }, + "use_s3_bucket_tags": true, + "use_s3_object_tags": true + } +} diff --git a/metadata-ingestion/tests/integration/s3/test_s3.py b/metadata-ingestion/tests/integration/s3/test_s3.py index 4137c6c5c399ea..b45f1f78fc55a8 100644 --- a/metadata-ingestion/tests/integration/s3/test_s3.py +++ b/metadata-ingestion/tests/integration/s3/test_s3.py @@ -109,7 +109,13 @@ def s3_populate(pytestconfig, s3_resource, s3_client, bucket_names): full_path = os.path.join(root, file) rel_path = os.path.relpath(full_path, test_resources_dir) file_list.append(rel_path) - bkt.upload_file(full_path, rel_path) + bkt.upload_file( + full_path, + rel_path, # Set content type for `no_extension/small` file to text/csv + ExtraArgs={"ContentType": "text/csv"} + if "." 
not in rel_path + else {}, + ) s3_client.put_object_tagging( Bucket=bucket_name, Key=rel_path, @@ -143,18 +149,24 @@ def touch_local_files(pytestconfig): os.utime(full_path, times=(current_time_sec, current_time_sec)) -SOURCE_FILES_PATH = "./tests/integration/s3/sources/s3" -source_files = os.listdir(SOURCE_FILES_PATH) +SHARED_SOURCE_FILES_PATH = "./tests/integration/s3/sources/shared" +shared_source_files = [ + (SHARED_SOURCE_FILES_PATH, p) for p in os.listdir(SHARED_SOURCE_FILES_PATH) +] + +S3_SOURCE_FILES_PATH = "./tests/integration/s3/sources/s3" +s3_source_files = [(S3_SOURCE_FILES_PATH, p) for p in os.listdir(S3_SOURCE_FILES_PATH)] @pytest.mark.integration -@pytest.mark.parametrize("source_file", source_files) +@pytest.mark.parametrize("source_file_tuple", shared_source_files + s3_source_files) def test_data_lake_s3_ingest( - pytestconfig, s3_populate, source_file, tmp_path, mock_time + pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time ): + source_dir, source_file = source_file_tuple test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/" - f = open(os.path.join(SOURCE_FILES_PATH, source_file)) + f = open(os.path.join(source_dir, source_file)) source = json.load(f) config_dict = {} @@ -184,12 +196,13 @@ def test_data_lake_s3_ingest( @pytest.mark.integration -@pytest.mark.parametrize("source_file", source_files) +@pytest.mark.parametrize("source_file_tuple", shared_source_files) def test_data_lake_local_ingest( - pytestconfig, touch_local_files, source_file, tmp_path, mock_time + pytestconfig, touch_local_files, source_file_tuple, tmp_path, mock_time ): + source_dir, source_file = source_file_tuple test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/" - f = open(os.path.join(SOURCE_FILES_PATH, source_file)) + f = open(os.path.join(source_dir, source_file)) source = json.load(f) config_dict = {} diff --git a/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py b/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py index 22a48efdec41d7..c078f1b77fd1be 100644 --- a/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py +++ b/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py @@ -276,6 +276,9 @@ def register_mock_data(workspace_client): TableEntry = namedtuple("TableEntry", ["database", "tableName", "isTemporary"]) +ViewEntry = namedtuple( + "ViewEntry", ["namespace", "viewName", "isTemporary", "isMaterialized"] +) def mock_hive_sql(query): @@ -418,7 +421,7 @@ def mock_hive_sql(query): TableEntry("bronze_kambi", "view1", False), ] elif query == "SHOW VIEWS FROM `bronze_kambi`": - return [TableEntry("bronze_kambi", "view1", False)] + return [ViewEntry("bronze_kambi", "view1", False, False)] return [] diff --git a/metadata-ingestion/tests/unit/test_bigquery_lineage.py b/metadata-ingestion/tests/unit/test_bigquery_lineage.py index 6bd5cc4d3226e2..7456f2fd1d91c2 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_lineage.py +++ b/metadata-ingestion/tests/unit/test_bigquery_lineage.py @@ -83,7 +83,10 @@ def test_lineage_with_timestamps(lineage_entries: List[QueryEvent]) -> None: config = BigQueryV2Config() report = BigQueryV2Report() extractor: BigqueryLineageExtractor = BigqueryLineageExtractor( - config, report, BigQueryIdentifierBuilder(config, report) + config, + report, + schema_resolver=SchemaResolver(platform="bigquery"), + identifiers=BigQueryIdentifierBuilder(config, report), ) bq_table = BigQueryTableRef.from_string_name( @@ -91,8 +94,7 @@ def 
test_lineage_with_timestamps(lineage_entries: List[QueryEvent]) -> None: ) lineage_map: Dict[str, Set[LineageEdge]] = extractor._create_lineage_map( - iter(lineage_entries), - sql_parser_schema_resolver=SchemaResolver(platform="bigquery"), + iter(lineage_entries) ) upstream_lineage = extractor.get_lineage_for_table( @@ -108,7 +110,10 @@ def test_column_level_lineage(lineage_entries: List[QueryEvent]) -> None: config = BigQueryV2Config(extract_column_lineage=True, incremental_lineage=False) report = BigQueryV2Report() extractor: BigqueryLineageExtractor = BigqueryLineageExtractor( - config, report, BigQueryIdentifierBuilder(config, report) + config, + report, + schema_resolver=SchemaResolver(platform="bigquery"), + identifiers=BigQueryIdentifierBuilder(config, report), ) bq_table = BigQueryTableRef.from_string_name( @@ -117,7 +122,6 @@ def test_column_level_lineage(lineage_entries: List[QueryEvent]) -> None: lineage_map: Dict[str, Set[LineageEdge]] = extractor._create_lineage_map( lineage_entries[:1], - sql_parser_schema_resolver=SchemaResolver(platform="bigquery"), ) upstream_lineage = extractor.get_lineage_for_table( diff --git a/metadata-integration/java/openlineage-converter/build.gradle b/metadata-integration/java/openlineage-converter/build.gradle index 6783d300a5fe44..2e04881ab5ccda 100644 --- a/metadata-integration/java/openlineage-converter/build.gradle +++ b/metadata-integration/java/openlineage-converter/build.gradle @@ -47,6 +47,10 @@ shadowJar { exclude('log4j2.*', 'log4j.*') } +jar { + archiveClassifier = 'lib' +} + //task sourcesJar(type: Jar) { // classifier 'sources' // from sourceSets.main.allJava