
Commit

Merge branch 'master' into feature/fix_sql_common_get_db_name_method
sleeperdeep authored Sep 3, 2024
2 parents 574e980 + 1f3688a commit 90d5d4c
Showing 42 changed files with 1,734 additions and 259 deletions.
1 change: 1 addition & 0 deletions docs-website/sidebars.js
@@ -307,6 +307,7 @@ module.exports = {
},
{
"DataHub Cloud Release History": [
"docs/managed-datahub/release-notes/v_0_3_5",
"docs/managed-datahub/release-notes/v_0_3_4",
"docs/managed-datahub/release-notes/v_0_3_3",
"docs/managed-datahub/release-notes/v_0_3_2",
33 changes: 33 additions & 0 deletions docs/managed-datahub/release-notes/v_0_3_5.md
@@ -0,0 +1,33 @@
# v0.3.5
---

Release Availability Date
---
02-Sep-2024

Recommended CLI/SDK
---
- `v0.14.0.2` with release notes at https://github.com/acryldata/datahub/releases/tag/v0.14.0.2

If you are using an older CLI/SDK version, please upgrade (e.g., `pip install acryl-datahub==0.14.0.2`). This applies to every way you use the CLI/SDK: from your terminal, in GitHub Actions, in Airflow, via the Python SDK, via the Java SDK, and so on. We strongly recommend upgrading, as we continually ship fixes in the CLI, and staying current helps us support you better.

## Release Changelog
---

- All changes in https://github.com/datahub-project/datahub/releases/tag/v0.14.0.2
  - Note Breaking Changes: https://datahubproject.io/docs/how/updating-datahub/#0140

- Product changes
  - Misc fixes and improvements for the Snowflake Tag Propagation Automation (Beta)
  - Misc fixes and improvements for the Glossary Term Propagation Automation (Beta)
  - Misc fixes and improvements for the Column Docs Propagation Automation (Beta)
  - Minor UX improvements on the group profile page
  - Add a permanent 'Explore All' button to the search dropdown
  - Add a toggle to filter out transformations in the lineage visualization
  - Misc fixes and minor improvements to the subscriptions and Slack integration experience
    - The new Slack ingestion source enables one-click subscriptions for your users by automatically hydrating each user's Slack memberID. New users who sign up will also have their memberID hydrated automatically.
      - Please reach out to the Acryl team for assistance in setting this up.
    - Installing or re-installing the Slack bot will now enable the `/datahub` command and ensure your `botToken` is compatible with the new Slack ingestion source. You can reach out to the Acryl team for assistance with re-installation.

- Ingestion changes
  - New configuration for dbt lineage that "merges" dbt sources with their warehouse siblings in the lineage visualization: set `prefer_sql_parser_lineage` and `skip_sources_in_lineage` in the ingestion recipe, and set the flag `HIDE_DBT_SOURCE_IN_LINEAGE=true` in GMS. A sketch of such a recipe is shown below.
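
For illustration, here is a minimal sketch of a dbt recipe with the new flags enabled, driven through the DataHub Python SDK. The two lineage flags are the ones introduced above; the artifact paths, target platform, and sink address are placeholder assumptions, not values from this commit.

```python
# Hedged sketch: opting into the new dbt source-merging lineage behavior.
# Artifact paths, target platform, and the sink address are hypothetical.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "dbt",
            "config": {
                "manifest_path": "target/manifest.json",  # placeholder dbt artifact
                "catalog_path": "target/catalog.json",  # placeholder dbt artifact
                "target_platform": "snowflake",  # assumed warehouse platform
                # New in this release: merge dbt sources with their siblings.
                "prefer_sql_parser_lineage": True,
                "skip_sources_in_lineage": True,
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},  # placeholder GMS URL
        },
    }
)
pipeline.run()
pipeline.raise_from_status()
```

Note that the server-side half of the behavior is separate: `HIDE_DBT_SOURCE_IN_LINEAGE=true` must be set on GMS, e.g. as a container environment variable.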
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
@@ -301,6 +301,7 @@

databricks = {
# 0.1.11 appears to have authentication issues with azure databricks
# 0.22.0 has support for `include_browse` in metadata list apis
"databricks-sdk>=0.30.0",
"pyspark~=3.3.0",
"requests",

@@ -39,6 +39,10 @@
)
from datahub.ingestion.source.bigquery_v2.lineage import BigqueryLineageExtractor
from datahub.ingestion.source.bigquery_v2.profiler import BigqueryProfiler
from datahub.ingestion.source.bigquery_v2.queries_extractor import (
BigQueryQueriesExtractor,
BigQueryQueriesExtractorConfig,
)
from datahub.ingestion.source.bigquery_v2.usage import BigQueryUsageExtractor
from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler
from datahub.ingestion.source.state.redundant_run_skip_handler import (
@@ -51,6 +55,7 @@
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionSourceBase,
)
from datahub.ingestion.source_report.ingestion_stage import QUERIES_EXTRACTION
from datahub.sql_parsing.schema_resolver import SchemaResolver
from datahub.utilities.registries.domain_registry import DomainRegistry

@@ -139,6 +144,7 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
self.lineage_extractor = BigqueryLineageExtractor(
config,
self.report,
schema_resolver=self.sql_parser_schema_resolver,
identifiers=self.identifiers,
redundant_run_skip_handler=redundant_lineage_run_skip_handler,
)
@@ -196,7 +202,9 @@ def test_connection(config_dict: dict) -> TestConnectionReport:

def _init_schema_resolver(self) -> SchemaResolver:
schema_resolution_required = (
self.config.lineage_parse_view_ddl or self.config.lineage_use_sql_parser
self.config.use_queries_v2
or self.config.lineage_parse_view_ddl
or self.config.lineage_use_sql_parser
)
schema_ingestion_enabled = (
self.config.include_schema_metadata
@@ -244,22 +252,54 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
for project in projects:
yield from self.bq_schema_extractor.get_project_workunits(project)

if self.config.include_usage_statistics:
yield from self.usage_extractor.get_usage_workunits(
[p.id for p in projects], self.bq_schema_extractor.table_refs
)
if self.config.use_queries_v2:
self.report.set_ingestion_stage("*", "View and Snapshot Lineage")

if self.config.include_table_lineage:
yield from self.lineage_extractor.get_lineage_workunits(
yield from self.lineage_extractor.get_lineage_workunits_for_views_and_snapshots(
[p.id for p in projects],
self.sql_parser_schema_resolver,
self.bq_schema_extractor.view_refs_by_project,
self.bq_schema_extractor.view_definitions,
self.bq_schema_extractor.snapshot_refs_by_project,
self.bq_schema_extractor.snapshots_by_ref,
self.bq_schema_extractor.table_refs,
)

self.report.set_ingestion_stage("*", QUERIES_EXTRACTION)

queries_extractor = BigQueryQueriesExtractor(
connection=self.config.get_bigquery_client(),
schema_api=self.bq_schema_extractor.schema_api,
config=BigQueryQueriesExtractorConfig(
window=self.config,
user_email_pattern=self.config.usage.user_email_pattern,
include_lineage=self.config.include_table_lineage,
include_usage_statistics=self.config.include_usage_statistics,
include_operations=self.config.usage.include_operational_stats,
top_n_queries=self.config.usage.top_n_queries,
),
structured_report=self.report,
filters=self.filters,
identifiers=self.identifiers,
schema_resolver=self.sql_parser_schema_resolver,
discovered_tables=self.bq_schema_extractor.table_refs,
)
self.report.queries_extractor = queries_extractor.report
yield from queries_extractor.get_workunits_internal()
else:
if self.config.include_usage_statistics:
yield from self.usage_extractor.get_usage_workunits(
[p.id for p in projects], self.bq_schema_extractor.table_refs
)

if self.config.include_table_lineage:
yield from self.lineage_extractor.get_lineage_workunits(
[p.id for p in projects],
self.bq_schema_extractor.view_refs_by_project,
self.bq_schema_extractor.view_definitions,
self.bq_schema_extractor.snapshot_refs_by_project,
self.bq_schema_extractor.snapshots_by_ref,
self.bq_schema_extractor.table_refs,
)

def get_report(self) -> BigQueryV2Report:
return self.report

@@ -404,6 +404,11 @@ class BigQueryV2Config(
"enabled.",
)

use_queries_v2: bool = Field(
default=False,
description="If enabled, uses the new queries extractor to extract queries from bigquery.",
)

@property
def have_table_data_read_permission(self) -> bool:
return self.use_tables_list_query_v2 or self.is_profiling_enabled()
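
Since `use_queries_v2` defaults to `False`, the new extraction path is strictly opt-in. Below is a minimal, non-authoritative sketch of a BigQuery recipe that enables it via the Python SDK; the project ID and sink address are placeholders.

```python
# Hedged sketch: enabling the new queries extractor for the bigquery source.
# The project ID and sink address are placeholders, not values from this commit.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "bigquery",
            "config": {
                "project_ids": ["my-gcp-project"],  # placeholder project
                "include_table_lineage": True,
                "include_usage_statistics": True,
                # Routes lineage and usage through BigQueryQueriesExtractor,
                # per the get_workunits_internal change above.
                "use_queries_v2": True,
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},  # placeholder GMS URL
        },
    }
)
pipeline.run()
pipeline.raise_from_status()
```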

@@ -15,6 +15,7 @@
BigQueryIdentifierConfig,
)
from datahub.ingestion.source.bigquery_v2.bigquery_report import (
BigQueryQueriesExtractorReport,
BigQuerySchemaApiPerfReport,
)
from datahub.ingestion.source.bigquery_v2.bigquery_schema import BigQuerySchemaApi
@@ -25,7 +26,6 @@
from datahub.ingestion.source.bigquery_v2.queries_extractor import (
BigQueryQueriesExtractor,
BigQueryQueriesExtractorConfig,
BigQueryQueriesExtractorReport,
)

logger = logging.getLogger(__name__)

@@ -58,6 +58,18 @@ class BigQueryProcessingPerfReport(Report):
usage_state_size: Optional[str] = None


@dataclass
class BigQueryQueriesExtractorReport(Report):
query_log_fetch_timer: PerfTimer = field(default_factory=PerfTimer)
audit_log_preprocessing_timer: PerfTimer = field(default_factory=PerfTimer)
audit_log_load_timer: PerfTimer = field(default_factory=PerfTimer)
sql_aggregator: Optional[SqlAggregatorReport] = None
num_queries_by_project: TopKDict[str, int] = field(default_factory=int_top_k_dict)

num_total_queries: int = 0
num_unique_queries: int = 0


@dataclass
class BigQueryV2Report(
ProfilingSqlReport,
@@ -143,10 +155,8 @@ class BigQueryV2Report(

snapshots_scanned: int = 0

num_view_definitions_parsed: int = 0
num_view_definitions_failed_parsing: int = 0
num_view_definitions_failed_column_parsing: int = 0
view_definitions_parsing_failures: LossyList[str] = field(default_factory=LossyList)
# view lineage
sql_aggregator: Optional[SqlAggregatorReport] = None

read_reasons_stat: Counter[str] = field(default_factory=collections.Counter)
operation_types_stat: Counter[str] = field(default_factory=collections.Counter)
@@ -171,8 +181,7 @@
usage_end_time: Optional[datetime] = None
stateful_usage_ingestion_enabled: bool = False

# lineage/usage v2
sql_aggregator: Optional[SqlAggregatorReport] = None
queries_extractor: Optional[BigQueryQueriesExtractorReport] = None

def set_ingestion_stage(self, project_id: str, stage: str) -> None:
self.report_ingestion_stage_start(f"{project_id}: {stage}")
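
For orientation, a hedged sketch of how the new `BigQueryQueriesExtractorReport` fields are meant to be populated, assuming `PerfTimer` supports context-manager use (as elsewhere in the codebase) and that the `TopKDict` built by `int_top_k_dict` behaves like a `defaultdict(int)`. The query-log fetch is a stub, not a real API.

```python
# Illustrative only: exercising the new BigQueryQueriesExtractorReport fields.
from datahub.ingestion.source.bigquery_v2.bigquery_report import (
    BigQueryQueriesExtractorReport,
)


def fetch_query_log():
    # Stub standing in for the real audit-log fetch the extractor performs.
    return [
        {"project": "project-a", "query": "SELECT 1"},
        {"project": "project-a", "query": "SELECT 1"},
        {"project": "project-b", "query": "SELECT 2"},
    ]


report = BigQueryQueriesExtractorReport()
with report.query_log_fetch_timer:  # assumed context-manager usage
    query_log = fetch_query_log()

unique_queries = set()
for entry in query_log:
    report.num_total_queries += 1
    report.num_queries_by_project[entry["project"]] += 1  # assumed defaultdict(int)
    unique_queries.add(entry["query"])
report.num_unique_queries = len(unique_queries)
```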

@@ -137,7 +137,10 @@ def lineage_capability_test(
report: BigQueryV2Report,
) -> CapabilityReport:
lineage_extractor = BigqueryLineageExtractor(
connection_conf, report, BigQueryIdentifierBuilder(connection_conf, report)
connection_conf,
report,
schema_resolver=SchemaResolver(platform="bigquery"),
identifiers=BigQueryIdentifierBuilder(connection_conf, report),
)
for project_id in project_ids:
try: