Skip to content

Commit 3ab4ec9

Browse files
authored
feat(ingest/dbt): support a datahub section in meta mappings (datahub-project#10371)
1 parent 7e69247 commit 3ab4ec9

File tree

3 files changed

+136
-36
lines changed

3 files changed

+136
-36
lines changed

metadata-ingestion/src/datahub/emitter/mce_builder.py

+3 -2
Original file line number | Diff line number | Diff line change
@@ -367,7 +367,7 @@ def make_ml_model_group_urn(platform: str, group_name: str, env: str) -> str:
367367
)
368368

369369

370-
def get_class_fields(_class: Type[object]) -> Iterable[str]:
370+
def _get_enum_options(_class: Type[object]) -> Iterable[str]:
371371
return [
372372
f
373373
for f in dir(_class)
@@ -378,7 +378,8 @@ def get_class_fields(_class: Type[object]) -> Iterable[str]:
378378
def validate_ownership_type(ownership_type: str) -> Tuple[str, Optional[str]]:
379379
if ownership_type.startswith("urn:li:"):
380380
return OwnershipTypeClass.CUSTOM, ownership_type
381-
if ownership_type in get_class_fields(OwnershipTypeClass):
381+
ownership_type = ownership_type.upper()
382+
if ownership_type in _get_enum_options(OwnershipTypeClass):
382383
return ownership_type, None
383384
raise ValueError(f"Unexpected ownership type: {ownership_type}")
384385

metadata-ingestion/src/datahub/utilities/mapping.py

+83 -33
Original file line number | Diff line number | Diff line change
@@ -6,8 +6,13 @@
66
from functools import reduce
77
from typing import Any, Dict, List, Mapping, Match, Optional, Union, cast
88

9+
from datahub.configuration.common import ConfigModel
910
from datahub.emitter import mce_builder
10-
from datahub.emitter.mce_builder import OwnerType
11+
from datahub.emitter.mce_builder import (
12+
OwnerType,
13+
make_user_urn,
14+
validate_ownership_type,
15+
)
1116
from datahub.metadata.schema_classes import (
1217
AuditStampClass,
1318
InstitutionalMemoryClass,
@@ -83,6 +88,36 @@ class Constants:
8388
SEPARATOR = "separator"
8489

8590

91+
class _MappingOwner(ConfigModel):
    """One owner entry written as a mapping rather than as a bare string."""

    # Owner identifier; it is passed through make_user_urn when the owner
    # list is built.
    owner: str

    # Ownership type name or a "urn:li:..." ownership-type urn. Falls back to
    # DATAOWNER when the entry does not set it.
    owner_type: str = OwnershipTypeClass.DATAOWNER
94+
95+
96+
class _DatahubProps(ConfigModel):
    """Model for the ``owners`` list of a ``datahub`` property section.

    Each entry is either a plain string (the owner only) or a mapping that
    validates as ``_MappingOwner`` (owner plus ownership type).
    """

    owners: List[Union[str, _MappingOwner]]

    def make_owner_category_list(self) -> List[Dict]:
        """Convert ``self.owners`` into a list of owner dicts.

        Returns:
            One dict per entry, in input order, with the keys ``urn`` (the
            result of ``make_user_urn``), ``category`` and ``categoryUrn``
            (the pair returned by ``validate_ownership_type``).

        Exceptions raised by ``validate_ownership_type`` are not caught here.
        """
        owner_entries = []
        for entry in self.owners:
            # A bare string carries no ownership type, so DATAOWNER is used.
            if isinstance(entry, str):
                raw_owner, raw_type = entry, OwnershipTypeClass.DATAOWNER
            else:
                raw_owner, raw_type = entry.owner, entry.owner_type

            owner_urn = make_user_urn(raw_owner)
            category, category_urn = validate_ownership_type(raw_type)
            owner_entries.append(
                dict(urn=owner_urn, category=category, categoryUrn=category_urn)
            )
        return owner_entries
119+
120+
86121
class OperationProcessor:
87122
"""
88123
A general class that processes a dictionary of properties and operations defined on it.
@@ -128,7 +163,7 @@ def __init__(
128163
self.owner_source_type = owner_source_type
129164
self.match_nested_props = match_nested_props
130165

131-
def process(self, raw_props: Mapping[str, Any]) -> Dict[str, Any]:
166+
def process(self, raw_props: Mapping[str, Any]) -> Dict[str, Any]: # noqa: C901
132167
# Defining the following local variables -
133168
# operations_map - the final resulting map when operations are processed.
134169
# Against each operation the values to be applied are stored.
@@ -137,9 +172,35 @@ def process(self, raw_props: Mapping[str, Any]) -> Dict[str, Any]:
137172
# operation config: map which contains the parameters to carry out that operation.
138173
# For e.g for add_tag operation config will have the tag value.
139174
# operation_type: the type of operation (add_tag, add_term, etc.)
140-
aspect_map: Dict[str, Any] = {} # map of aspect name to aspect object
175+
176+
# Process the special "datahub" property, which supports tags, terms, and owners.
177+
operations_map: Dict[str, list] = {}
178+
try:
179+
datahub_prop = raw_props.get("datahub")
180+
if datahub_prop and isinstance(datahub_prop, dict):
181+
if datahub_prop.get("tags"):
182+
# Note that tags get converted to urns later because we need to support the tag prefix.
183+
tags = datahub_prop["tags"]
184+
operations_map.setdefault(Constants.ADD_TAG_OPERATION, []).extend(
185+
tags
186+
)
187+
188+
if datahub_prop.get("terms"):
189+
terms = datahub_prop["terms"]
190+
operations_map.setdefault(Constants.ADD_TERM_OPERATION, []).extend(
191+
mce_builder.make_term_urn(term) for term in terms
192+
)
193+
194+
if datahub_prop.get("owners"):
195+
owners = _DatahubProps.parse_obj_allow_extras(datahub_prop)
196+
operations_map.setdefault(Constants.ADD_OWNER_OPERATION, []).extend(
197+
owners.make_owner_category_list()
198+
)
199+
except Exception as e:
200+
logger.error(f"Error while processing datahub property: {e}")
201+
202+
# Process the actual directives.
141203
try:
142-
operations_map: Dict[str, Union[set, list]] = {}
143204
for operation_key in self.operation_defs:
144205
operation_type = self.operation_defs.get(operation_key, {}).get(
145206
Constants.OPERATION
@@ -177,42 +238,36 @@ def process(self, raw_props: Mapping[str, Any]) -> Dict[str, Any]:
177238
isinstance(operation, list)
178239
and operation_type == Constants.ADD_OWNER_OPERATION
179240
):
180-
operation_value_list = operations_map.get(
181-
operation_type, list()
182-
)
183-
cast(List, operation_value_list).extend(
241+
operations_map.setdefault(operation_type, []).extend(
184242
operation
185-
) # cast to silent the lint
186-
operations_map[operation_type] = operation_value_list
243+
)
187244

188245
elif isinstance(operation, (str, list)):
189-
operations_value_set = operations_map.get(
190-
operation_type, set()
246+
operations_map.setdefault(operation_type, []).extend(
247+
operation
248+
if isinstance(operation, list)
249+
else [operation]
191250
)
192-
if isinstance(operation, list):
193-
operations_value_set.update(operation) # type: ignore
194-
else:
195-
operations_value_set.add(operation) # type: ignore
196-
operations_map[operation_type] = operations_value_set
197251
else:
198-
operations_value_list = operations_map.get(
199-
operation_type, list()
252+
operations_map.setdefault(operation_type, []).append(
253+
operation
200254
)
201-
operations_value_list.append(operation) # type: ignore
202-
operations_map[operation_type] = operations_value_list
203-
aspect_map = self.convert_to_aspects(operations_map)
204255
except Exception as e:
205256
logger.error(f"Error while processing operation defs over raw_props: {e}")
257+
258+
aspect_map: Dict[str, Any] = {} # map of aspect name to aspect object
259+
try:
260+
aspect_map = self.convert_to_aspects(operations_map)
261+
except Exception as e:
262+
logger.error(f"Error while converting operations map to aspects: {e}")
206263
return aspect_map
207264

208-
def convert_to_aspects(
209-
self, operation_map: Dict[str, Union[set, list]]
210-
) -> Dict[str, Any]:
265+
def convert_to_aspects(self, operation_map: Dict[str, list]) -> Dict[str, Any]:
211266
aspect_map: Dict[str, Any] = {}
212267

213268
if Constants.ADD_TAG_OPERATION in operation_map:
214269
tag_aspect = mce_builder.make_global_tag_aspect_with_tag_list(
215-
sorted(operation_map[Constants.ADD_TAG_OPERATION])
270+
sorted(set(operation_map[Constants.ADD_TAG_OPERATION]))
216271
)
217272

218273
aspect_map[Constants.ADD_TAG_OPERATION] = tag_aspect
@@ -240,7 +295,7 @@ def convert_to_aspects(
240295

241296
if Constants.ADD_TERM_OPERATION in operation_map:
242297
term_aspect = mce_builder.make_glossary_terms_aspect_from_urn_list(
243-
sorted(operation_map[Constants.ADD_TERM_OPERATION])
298+
sorted(set(operation_map[Constants.ADD_TERM_OPERATION]))
244299
)
245300
aspect_map[Constants.ADD_TERM_OPERATION] = term_aspect
246301

@@ -319,12 +374,7 @@ def get_operation_value(
319374
operation_config.get(Constants.OWNER_CATEGORY)
320375
or OwnershipTypeClass.DATAOWNER
321376
)
322-
owner_category_urn: Optional[str] = None
323-
if owner_category.startswith("urn:li:"):
324-
owner_category_urn = owner_category
325-
owner_category = OwnershipTypeClass.DATAOWNER
326-
else:
327-
owner_category = owner_category.upper()
377+
owner_category, owner_category_urn = validate_ownership_type(owner_category)
328378

329379
if self.strip_owner_email_id:
330380
owner_ids = [

metadata-ingestion/tests/unit/test_mapping.py

+50 -1
Original file line number | Diff line number | Diff line change
@@ -235,7 +235,7 @@ def test_operation_processor_ownership_category():
235235
new_owner = ownership_aspect.owners[2]
236236
assert new_owner.owner == "urn:li:corpuser:bob"
237237
assert new_owner.source and new_owner.source.type == "SOURCE_CONTROL"
238-
assert new_owner.type == OwnershipTypeClass.DATAOWNER # dummy value
238+
assert new_owner.type == OwnershipTypeClass.CUSTOM
239239
assert new_owner.typeUrn == "urn:li:ownershipType:architect"
240240

241241

@@ -347,3 +347,52 @@ def test_operation_processor_matching_dot_props():
347347
tag_aspect: GlobalTagsClass = aspect_map["add_tag"]
348348
assert len(tag_aspect.tags) == 1
349349
assert tag_aspect.tags[0].tag == "urn:li:tag:pii"
350+
351+
352+
def test_operation_processor_datahub_props():
    """A ``datahub`` section alone (no operation defs) yields owner, tag and term aspects."""
    # Owners are given in both supported shapes: bare strings and
    # {"owner", "owner_type"} mappings, each with a plain id and a group urn.
    datahub_section = {
        "tags": ["tag1", "tag2"],
        "terms": ["term1", "term2"],
        "owners": [
            "owner1",
            "urn:li:corpGroup:group1",
            {
                "owner": "owner2",
                "owner_type": "urn:li:ownershipType:steward",
            },
            {
                "owner": "urn:li:corpGroup:group2",
                "owner_type": "urn:li:ownershipType:steward",
            },
        ],
    }

    processor = OperationProcessor(
        operation_defs={},
        owner_source_type="SOURCE_CONTROL",
    )
    aspects = processor.process({"datahub": datahub_section})

    ownership = aspects["add_owner"]
    assert isinstance(ownership, OwnershipClass)
    observed_owners = [
        (entry.owner, entry.type, entry.typeUrn) for entry in ownership.owners
    ]
    # Note the expected order follows the owner urn, not the input order.
    assert observed_owners == [
        ("urn:li:corpGroup:group1", "DATAOWNER", None),
        ("urn:li:corpGroup:group2", "CUSTOM", "urn:li:ownershipType:steward"),
        ("urn:li:corpuser:owner1", "DATAOWNER", None),
        ("urn:li:corpuser:owner2", "CUSTOM", "urn:li:ownershipType:steward"),
    ]

    global_tags = aspects["add_tag"]
    assert isinstance(global_tags, GlobalTagsClass)
    observed_tags = [association.tag for association in global_tags.tags]
    assert observed_tags == [
        "urn:li:tag:tag1",
        "urn:li:tag:tag2",
    ]

    glossary_terms = aspects["add_term"]
    assert isinstance(glossary_terms, GlossaryTermsClass)
    observed_terms = [association.urn for association in glossary_terms.terms]
    assert observed_terms == [
        "urn:li:glossaryTerm:term1",
        "urn:li:glossaryTerm:term2",
    ]

0 commit comments

Comments
 (0)