Skip to content

Commit

Permalink
fix(ingestion): groupby_unsorted (#12403)
Browse files Browse the repository at this point in the history
  • Loading branch information
sgomezvillamor authored Jan 21, 2025
1 parent a20f660 commit 4b79e75
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import re
from base64 import b32decode
from collections import defaultdict
from itertools import groupby
from typing import Dict, Iterable, List, Optional, Set, Type, Union, cast

from google.cloud.bigquery.table import TableListItem
Expand Down Expand Up @@ -101,6 +100,7 @@
from datahub.metadata.urns import TagUrn
from datahub.sql_parsing.schema_resolver import SchemaResolver
from datahub.utilities.file_backed_collections import FileBackedDict
from datahub.utilities.groupby import groupby_unsorted
from datahub.utilities.hive_schema_to_avro import (
HiveColumnToAvroConverter,
get_schema_fields_for_hive_column,
Expand Down Expand Up @@ -730,7 +730,7 @@ def gen_foreign_keys(
foreign_keys: List[BigqueryTableConstraint] = list(
filter(lambda x: x.type == "FOREIGN KEY", table.constraints)
)
for key, group in groupby(
for key, group in groupby_unsorted(
foreign_keys,
lambda x: f"{x.referenced_project_id}.{x.referenced_dataset}.{x.referenced_table_name}",
):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import itertools
import logging
import re
from abc import abstractmethod
Expand Down Expand Up @@ -111,6 +110,7 @@
parse_statements_and_pick,
try_format_query,
)
from datahub.utilities.groupby import groupby_unsorted
from datahub.utilities.lossy_collections import LossyList
from datahub.utilities.mapping import Constants, OperationProcessor
from datahub.utilities.time import datetime_to_ts_millis
Expand Down Expand Up @@ -1929,7 +1929,7 @@ def _translate_dbt_name_to_upstream_urn(dbt_name: str) -> str:
else None
),
)
for downstream, upstreams in itertools.groupby(
for downstream, upstreams in groupby_unsorted(
node.upstream_cll, lambda x: x.downstream_col
)
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import json
import logging
from collections import namedtuple
from itertools import groupby
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union

from pydantic.dataclasses import dataclass
Expand Down Expand Up @@ -58,6 +57,7 @@
SubTypesClass,
ViewPropertiesClass,
)
from datahub.utilities.groupby import groupby_unsorted
from datahub.utilities.hive_schema_to_avro import get_schema_fields_for_hive_column
from datahub.utilities.str_enum import StrEnum

Expand Down Expand Up @@ -490,7 +490,7 @@ def loop_tables(

iter_res = self._alchemy_client.execute_query(statement)

for key, group in groupby(iter_res, self._get_table_key):
for key, group in groupby_unsorted(iter_res, self._get_table_key):
schema_name = (
f"{db_name}.{key.schema}"
if self.config.include_catalog_name_in_ids
Expand Down Expand Up @@ -647,7 +647,7 @@ def get_hive_view_columns(self, inspector: Inspector) -> Iterable[ViewDataset]:
)

iter_res = self._alchemy_client.execute_query(statement)
for key, group in groupby(iter_res, self._get_table_key):
for key, group in groupby_unsorted(iter_res, self._get_table_key):
db_name = self.get_db_name(inspector)

schema_name = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from dataclasses import dataclass
from datetime import datetime
from functools import lru_cache
from itertools import groupby
from typing import (
Any,
Dict,
Expand Down Expand Up @@ -59,6 +58,7 @@
from datahub.metadata.schema_classes import SchemaMetadataClass
from datahub.sql_parsing.schema_resolver import SchemaResolver
from datahub.sql_parsing.sqlglot_lineage import sqlglot_lineage
from datahub.utilities.groupby import groupby_unsorted

logger: logging.Logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -286,7 +286,7 @@ def grouper(fk_row):

# TODO: Check if there's a better way
fk_dicts = list()
for constraint_info, constraint_cols in groupby(res, grouper):
for constraint_info, constraint_cols in groupby_unsorted(res, grouper):
fk_dict = {
"name": str(constraint_info["name"]),
"constrained_columns": list(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import dataclasses
import enum
import functools
import itertools
import json
import logging
import os
Expand Down Expand Up @@ -63,6 +62,7 @@
FileBackedDict,
FileBackedList,
)
from datahub.utilities.groupby import groupby_unsorted
from datahub.utilities.lossy_collections import LossyDict, LossyList
from datahub.utilities.ordered_set import OrderedSet
from datahub.utilities.perf_timer import PerfTimer
Expand Down Expand Up @@ -1314,8 +1314,8 @@ def _gen_lineage_for_downstream(
upstream_aspect.fineGrainedLineages = []
for downstream_column, all_upstream_columns in cll.items():
# Group by query ID.
for query_id, upstream_columns_for_query in itertools.groupby(
sorted(all_upstream_columns.items(), key=lambda x: x[1]),
for query_id, upstream_columns_for_query in groupby_unsorted(
all_upstream_columns.items(),
key=lambda x: x[1],
):
upstream_columns = [x[0] for x in upstream_columns_for_query]
Expand Down
17 changes: 17 additions & 0 deletions metadata-ingestion/src/datahub/utilities/groupby.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import collections
from typing import Callable, Iterable, Tuple, TypeVar

T = TypeVar("T")
K = TypeVar("K")


def groupby_unsorted(
iterable: Iterable[T], key: Callable[[T], K]
) -> Iterable[Tuple[K, Iterable[T]]]:
"""The default itertools.groupby() requires that the iterable is already sorted by the key.
This method is similar to groupby() but without the pre-sorted requirement."""

values = collections.defaultdict(list)
for v in iterable:
values[key(v)].append(v)
return values.items()
Original file line number Diff line number Diff line change
Expand Up @@ -38,26 +38,26 @@
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.upstream1,PROD),a)"
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.upstream2,PROD),a)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.downstream,PROD),a)"
],
"confidenceScore": 0.2,
"query": "urn:li:query:377a73bbf094c8b176b15157c24242cdfc7a0f407d78e52e63ded08c913468f1"
"query": "urn:li:query:c4b3a21ef8c262ebbe99a5bdb6c29cb0be646392bb4af10b6f4a758af881470e"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.upstream2,PROD),a)"
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.upstream1,PROD),a)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.downstream,PROD),a)"
],
"confidenceScore": 0.2,
"query": "urn:li:query:c4b3a21ef8c262ebbe99a5bdb6c29cb0be646392bb4af10b6f4a758af881470e"
"query": "urn:li:query:377a73bbf094c8b176b15157c24242cdfc7a0f407d78e52e63ded08c913468f1"
},
{
"upstreamType": "FIELD_SET",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,26 +64,26 @@
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.in_person_returns,PROD),customer_id)"
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.online_returns,PROD),customer_id)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.all_returns,PROD),customer_id)"
],
"confidenceScore": 0.2,
"query": "urn:li:query:composite_6b5a11e96e3d2b742e4e4ec3310bb538d0f5c0c6496b84e4bfe0e8014d5f5b45"
"query": "urn:li:query:composite_adc1c41c0ad37c643776d9d93d524e6c435a7e70633da1ce7e3222dda4bb9fb8"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.online_returns,PROD),customer_id)"
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.in_person_returns,PROD),customer_id)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.all_returns,PROD),customer_id)"
],
"confidenceScore": 0.2,
"query": "urn:li:query:composite_adc1c41c0ad37c643776d9d93d524e6c435a7e70633da1ce7e3222dda4bb9fb8"
"query": "urn:li:query:composite_6b5a11e96e3d2b742e4e4ec3310bb538d0f5c0c6496b84e4bfe0e8014d5f5b45"
},
{
"upstreamType": "FIELD_SET",
Expand All @@ -100,26 +100,26 @@
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.in_person_returns,PROD),return_date)"
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.online_returns,PROD),return_date)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.all_returns,PROD),return_date)"
],
"confidenceScore": 0.2,
"query": "urn:li:query:composite_6b5a11e96e3d2b742e4e4ec3310bb538d0f5c0c6496b84e4bfe0e8014d5f5b45"
"query": "urn:li:query:composite_adc1c41c0ad37c643776d9d93d524e6c435a7e70633da1ce7e3222dda4bb9fb8"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.online_returns,PROD),return_date)"
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.in_person_returns,PROD),return_date)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.all_returns,PROD),return_date)"
],
"confidenceScore": 0.2,
"query": "urn:li:query:composite_adc1c41c0ad37c643776d9d93d524e6c435a7e70633da1ce7e3222dda4bb9fb8"
"query": "urn:li:query:composite_6b5a11e96e3d2b742e4e4ec3310bb538d0f5c0c6496b84e4bfe0e8014d5f5b45"
},
{
"upstreamType": "FIELD_SET",
Expand Down
11 changes: 11 additions & 0 deletions metadata-ingestion/tests/unit/utilities/test_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from datahub.sql_parsing.sqlglot_lineage import sqlglot_lineage
from datahub.testing.doctest import assert_doctest
from datahub.utilities.delayed_iter import delayed_iter
from datahub.utilities.groupby import groupby_unsorted
from datahub.utilities.is_pytest import is_pytest_running
from datahub.utilities.urns.dataset_urn import DatasetUrn

Expand Down Expand Up @@ -335,3 +336,13 @@ def test_logging_name_extraction() -> None:

def test_is_pytest_running() -> None:
assert is_pytest_running()


def test_groupby_unsorted():
grouped = groupby_unsorted("ABCAC", key=lambda x: x)

assert list(grouped) == [
("A", ["A", "A"]),
("B", ["B"]),
("C", ["C", "C"]),
]

0 comments on commit 4b79e75

Please sign in to comment.