feat(ingest): dbt - improving dbt_meta mapping (#5237)

Shirshanka Das 2022-06-24 04:43:12 -07:00 committed by GitHub
parent f66a6b41ef
commit 4b3135a0f8
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 1740 additions and 223 deletions

@ -89,6 +89,38 @@ dbt allows authors to define meta properties for datasets. Checkout this link to
actions such as adding a tag, term or owner. For example, if a dbt model has a meta config ```"has_pii": True```, we can define an action
that evaluates whether the property is set to true and adds, let's say, a ```pii``` tag.
To leverage this feature we require users to define mappings as part of the recipe. Following is how mappings can be defined -
<Tabs>
<TabItem value="yaml" label="YAML" default>
```yaml
meta_mapping:
  business_owner:
    match: ".*"
    operation: "add_owner"
    config:
      owner_type: user
  has_pii:
    match: True
    operation: "add_tag"
    config:
      tag: "has_pii_test"
  int_property:
    match: 1
    operation: "add_tag"
    config:
      tag: "int_meta_property"
  double_property:
    match: 2.5
    operation: "add_term"
    config:
      term: "double_meta_property"
  data_governance.team_owner:
    match: "Finance"
    operation: "add_term"
    config:
      term: "Finance_test"
```
</TabItem>
<TabItem value="json" label="JSON">
```json
"meta_mapping": {
"business_owner": {
@ -118,6 +150,8 @@ To leverage this feature we require users to define mappings as part of the reci
},
}
```
</TabItem>
</Tabs>
We support the below actions -
1. add_tag - Requires ```tag``` property in config.
2. add_term - Requires ```term``` property in config.
@ -135,7 +169,7 @@ This works similarly as the dbt meta mapping but for the query tags
We support the below actions -
1. add_tag - Requires ```tag``` property in config.
The below example set as global tag the query tag `tag` key's value.
The below example set as global tag the query tag `tag` key's value.
```json
"query_tag_mapping":
{

@ -2,13 +2,49 @@
dbt allows authors to define meta properties for datasets. See [dbt meta](https://docs.getdbt.com/reference/resource-configs/meta) to learn more. Our dbt source allows users to define
actions such as adding a tag, term or owner. For example, if a dbt model has a meta config ```"has_pii": True```, we can define an action
that evaluates whether the property is set to true and adds, let's say, a ```pii``` tag.
To leverage this feature we require users to define mappings as part of the recipe. Following is how mappings can be defined -
To leverage this feature, we require users to define mappings as part of the recipe. The following section describes how you can build these mappings. Listed below is a meta_mapping section that, among other things, looks for keys like `business_owner` and adds the owners that are listed there.
<Tabs>
<TabItem value="yaml" label="YAML" default>
```yaml
meta_mapping:
  business_owner:
    match: ".*"
    operation: "add_owner"
    config:
      owner_type: user
      owner_category: BUSINESS_OWNER
  has_pii:
    match: True
    operation: "add_tag"
    config:
      tag: "has_pii_test"
  int_property:
    match: 1
    operation: "add_tag"
    config:
      tag: "int_meta_property"
  double_property:
    match: 2.5
    operation: "add_term"
    config:
      term: "double_meta_property"
  data_governance.team_owner:
    match: "Finance"
    operation: "add_term"
    config:
      term: "Finance_test"
```
</TabItem>
<TabItem value="json" label="JSON">
```json
"meta_mapping": {
"business_owner": {
"match": ".*",
"operation": "add_owner",
"config": {"owner_type": "user"},
"config": {"owner_type": "user", "owner_category": "BUSINESS_OWNER"},
},
"has_pii": {
"match": True,
@ -32,14 +68,97 @@ To leverage this feature we require users to define mappings as part of the reci
},
}
```
We support the below actions -
</TabItem>
</Tabs>
We support the following operations:
1. add_tag - Requires ```tag``` property in config.
2. add_term - Requires ```term``` property in config.
3. add_owner - Requires ```owner_type``` property in config which can be either user or group.
3. add_owner - Requires ```owner_type``` property in config, which can be either user or group. Optionally accepts the ```owner_category``` config property, which you can set to one of ```['TECHNICAL_OWNER', 'BUSINESS_OWNER', 'DATA_STEWARD', 'DATAOWNER']``` (defaults to `DATAOWNER`).
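The `owner_category` check can be sketched roughly as follows. This is a minimal, self-contained illustration and not the connector's actual code: `OwnershipTypes` is a hypothetical stand-in for DataHub's `OwnershipTypeClass` that lists only the four categories named above, and `validate_owner_category` is an illustrative helper name.

```python
from typing import Any, Dict, List


class OwnershipTypes:
    """Hypothetical stand-in for OwnershipTypeClass (only the categories listed above)."""

    TECHNICAL_OWNER = "TECHNICAL_OWNER"
    BUSINESS_OWNER = "BUSINESS_OWNER"
    DATA_STEWARD = "DATA_STEWARD"
    DATAOWNER = "DATAOWNER"


def allowed_categories() -> List[str]:
    # Collect the public class attributes, skipping private and dunder names.
    return [
        value
        for name, value in vars(OwnershipTypes).items()
        if not name.startswith("_")
    ]


def validate_owner_category(config: Dict[str, Any]) -> str:
    """Return the upper-cased category, defaulting to DATAOWNER when unset."""
    category = (config.get("owner_category") or OwnershipTypes.DATAOWNER).upper()
    if category not in allowed_categories():
        raise ValueError(
            f"Owner category {category} is not one of {allowed_categories()}"
        )
    return category
```

Because the value is upper-cased before the comparison, `business_owner` and `BUSINESS_OWNER` are treated the same in this sketch.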
Note:
1. Currently, dbt meta mapping is only supported for meta configs defined at the top most level or a node in manifest file. If that is not preset we will look for meta in the config section of the node.
2. For string based meta properties we support regex matching.
1. Currently, dbt meta mapping is only supported for meta elements defined at the model level (not supported for columns).
2. For string meta properties we support regex matching.
With regex matching, you can also use the matched value to customize how you populate the tag, term or owner fields. Here are a few advanced examples:
#### Data Tier - Bronze, Silver, Gold
If your meta section looks like this:
```yaml
meta:
  data_tier: Bronze # chosen from [Bronze,Gold,Silver]
```
and you want to attach a glossary term like `urn:li:glossaryTerm:Bronze` to all the models that have this value in their meta section, the following meta_mapping section would achieve that outcome:
```yaml
meta_mapping:
  data_tier:
    match: "Bronze|Silver|Gold"
    operation: "add_term"
    config:
      term: "{{ $match }}"
```
This matches any data_tier of Bronze, Silver or Gold and maps it to a glossary term with the same name.
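As a rough illustration of how the `{{ $match }}` placeholder gets filled in, here is a minimal Python sketch. It reuses the placeholder pattern from the mapping code in this commit; `render_term` is a hypothetical helper name, and the real processor additionally turns the result into a glossary term urn.

```python
import re
from typing import Optional

# Placeholder pattern as it appears in the mapping code of this commit.
MATCH_PLACEHOLDER = r"{{\s*\$match\s*}}"


def render_term(match_clause: str, template: str, meta_value: str) -> Optional[str]:
    """Return the rendered template, or None when the value does not match."""
    m = re.match(match_clause, meta_value)
    if m is None:
        return None
    # The clause has no capture group here, so the whole match is substituted.
    # A function replacement avoids backslash processing in the matched text.
    return re.sub(MATCH_PLACEHOLDER, lambda _: m.group(0), template)


print(render_term("Bronze|Silver|Gold", "{{ $match }}", "Bronze"))    # Bronze
print(render_term("Bronze|Silver|Gold", "{{ $match }}", "Platinum"))  # None
```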
#### Case Numbers - create tags
If your meta section looks like this:
```yaml
meta:
  case: PLT-4678 # internal Case Number
```
and you want to generate tags that look like `case_4678` from this, you can use the following meta_mapping section:
```yaml
meta_mapping:
  case:
    match: "PLT-(.*)"
    operation: "add_tag"
    config:
      tag: "case_{{ $match }}"
```
#### Stripping out leading @ sign
You can also match specific groups within the value to extract subsets of the matched value. For example, if you have a meta section that looks like this:
```yaml
meta:
  owner: "@finance-team"
  business_owner: "@janet"
```
and you want to mark the finance-team as a group that owns the dataset (skipping the leading @ sign), while marking janet as an individual user (again, skipping the leading @ sign) that owns the dataset, you can use the following meta_mapping section:
```yaml
meta_mapping:
  owner:
    match: "^@(.*)"
    operation: "add_owner"
    config:
      owner_type: group
  business_owner:
    match: "^@(?P<owner>(.*))"
    operation: "add_owner"
    config:
      owner_type: user
      owner_category: BUSINESS_OWNER
```
In the examples above, we show two ways of writing the matching regexes. In the first one, `^@(.*)`, the first matching group (a.k.a. `match.group(1)`) is automatically inferred. In the second one, `^@(?P<owner>(.*))`, we use a named matching group (called `owner`, since we are matching an owner) to capture the string we want to provide to the ownership urn.
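The group selection described above can be sketched in plain Python. This mirrors the fallback order used by the `_get_best_match` helper in this commit (named group, then group 1, then the whole match); the function name `best_match` is only illustrative.

```python
import re


def best_match(m: re.Match, group_name: str) -> str:
    """Prefer a named group, then group 1, then the whole match."""
    try:
        return m.group(group_name)
    except IndexError:
        pass  # the pattern has no group with that name
    try:
        return m.group(1)
    except IndexError:
        pass  # the pattern has no capture groups at all
    return m.group(0)


# First capture group is inferred.
print(best_match(re.match(r"^@(.*)", "@finance-team"), "owner"))      # finance-team
# Named group "owner" is used.
print(best_match(re.match(r"^@(?P<owner>(.*))", "@janet"), "owner"))  # janet
# No groups: fall back to the whole match.
print(best_match(re.match(r".*", "janet"), "owner"))                  # janet
```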
### dbt query_tag automated mappings
This works similarly to the dbt meta mapping, but for query tags.
We support the below actions -
1. add_tag - Requires ```tag``` property in config.
The example below sets the value of the query tag's `tag` key as a global tag.
```json
"query_tag_mapping": {
  "tag": {
    "match": ".*",
    "operation": "add_tag",
    "config": {
      "tag": "{{ $match }}"
    }
  }
}
```
### Integrating with dbt test

@ -368,7 +368,9 @@ def make_global_tag_aspect_with_tag_list(tags: List[str]) -> GlobalTagsClass:
def make_ownership_aspect_from_urn_list(
owner_urns: List[str], source_type: Optional[Union[str, OwnershipSourceTypeClass]]
owner_urns: List[str],
source_type: Optional[Union[str, OwnershipSourceTypeClass]],
owner_type: Union[str, OwnershipTypeClass] = OwnershipTypeClass.DATAOWNER,
) -> OwnershipClass:
for owner_urn in owner_urns:
assert owner_urn.startswith("urn:li:corpuser:") or owner_urn.startswith(
@ -381,7 +383,7 @@ def make_ownership_aspect_from_urn_list(
owners_list = [
OwnerClass(
owner=owner_urn,
type=OwnershipTypeClass.DATAOWNER,
type=owner_type,
source=ownership_source_type,
)
for owner_urn in owner_urns

@ -263,6 +263,33 @@ class DBTConfig(StatefulIngestionConfigBase):
)
return aws_connection
@validator("meta_mapping")
def meta_mapping_validator(
cls, meta_mapping: Dict[str, Any], values: Dict, **kwargs: Any
) -> Dict[str, Any]:
for k, v in meta_mapping.items():
if "match" not in v:
raise ValueError(
f"meta_mapping section {k} doesn't have a match clause."
)
if "operation" not in v:
raise ValueError(
f"meta_mapping section {k} doesn't have an operation clause."
)
if v["operation"] == "add_owner":
owner_category = v["config"].get("owner_category")
if owner_category:
allowed_categories = [
value
for name, value in vars(OwnershipTypeClass).items()
if not name.startswith("_")
]
if (owner_category.upper()) not in allowed_categories:
raise ValueError(
f"Owner category {owner_category} is not one of {allowed_categories}"
)
return meta_mapping
@dataclass
class DBTColumn:
@ -1098,14 +1125,6 @@ class DBTSource(StatefulIngestionSourceBase):
def string_map(input_map: Dict[str, Any]) -> Dict[str, str]:
return {k: str(v) for k, v in input_map.items()}
if self.config.test_results_path:
yield from DBTTest.load_test_results(
self.config,
self.load_file_as_json(self.config.test_results_path),
test_nodes,
manifest_nodes,
)
for node in test_nodes:
node_datahub_urn = mce_builder.make_assertion_urn(
mce_builder.datahub_guid(
@ -1276,6 +1295,14 @@ class DBTSource(StatefulIngestionSourceBase):
self.report.report_workunit(soft_delete_wu)
yield soft_delete_wu
if self.config.test_results_path:
yield from DBTTest.load_test_results(
self.config,
self.load_file_as_json(self.config.test_results_path),
test_nodes,
manifest_nodes,
)
# create workunits from dbt nodes
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
if self.config.write_semantics == "PATCH" and not self.ctx.graph:
@ -1399,7 +1426,6 @@ class DBTSource(StatefulIngestionSourceBase):
"SOURCE_CONTROL",
self.config.strip_user_ids_from_email,
)
for node in dbt_nodes:
node_datahub_urn = get_urn_from_dbtNode(
node.database,
@ -1650,7 +1676,10 @@ class DBTSource(StatefulIngestionSourceBase):
self, node: DBTNode, meta_owner_aspects: Any
) -> List[OwnerClass]:
owner_list: List[OwnerClass] = []
if node.owner:
if meta_owner_aspects and self.config.enable_meta_mapping:
# we disregard owners generated from node.owner because that is also coming from the meta section
owner_list = meta_owner_aspects.owners
elif node.owner:
owner: str = node.owner
if self.compiled_owner_extraction_pattern:
match: Optional[Any] = re.match(
@ -1672,9 +1701,6 @@ class DBTSource(StatefulIngestionSourceBase):
)
)
if meta_owner_aspects and self.config.enable_meta_mapping:
owner_list.extend(meta_owner_aspects.owners)
owner_list = sorted(owner_list, key=lambda x: x.owner)
return owner_list
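
The owner-precedence change in the hunk above can be summarised with a small sketch. It is a simplification under stated assumptions: `resolve_owners` is a hypothetical helper, owners are represented as plain urn strings instead of `OwnerClass` objects, and the owner extraction pattern and email stripping options are left out.

```python
from typing import List, Optional


def resolve_owners(
    meta_owners: Optional[List[str]],
    node_owner: Optional[str],
    enable_meta_mapping: bool,
) -> List[str]:
    """Hypothetical helper: pick owner urns the way the updated branch does."""
    if meta_owners and enable_meta_mapping:
        # Owners derived from meta_mapping win; node.owner is disregarded
        # because it originates from the same meta section.
        owners = list(meta_owners)
    elif node_owner:
        owners = [f"urn:li:corpuser:{node_owner}"]
    else:
        owners = []
    return sorted(owners)
```

Before this change the two sources were merged, so an `owner` key handled by a meta_mapping rule could show up twice: once raw and once transformed.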

@ -1,9 +1,15 @@
import logging
import re
from typing import Any, Dict, Optional
from typing import Any, Dict, Match, Optional, Union
from datahub.emitter import mce_builder
from datahub.emitter.mce_builder import OwnerType
from datahub.metadata.schema_classes import (
OwnerClass,
OwnershipClass,
OwnershipSourceClass,
OwnershipTypeClass,
)
class Constants:
@ -15,6 +21,7 @@ class Constants:
TAG = "tag"
TERM = "term"
OWNER_TYPE = "owner_type"
OWNER_CATEGORY = "owner_category"
MATCH = "match"
USER_OWNER = "user"
GROUP_OWNER = "group"
@ -77,7 +84,7 @@ class OperationProcessor:
# operation_type: the type of operation (add_tag, add_term, etc.)
aspect_map: Dict[str, Any] = {} # map of aspect name to aspect object
try:
operations_map: Dict[str, set] = {}
operations_map: Dict[str, Union[set, list]] = {}
for operation_key in self.operation_defs:
operation_type = self.operation_defs.get(operation_key, {}).get(
Constants.OPERATION
@ -87,25 +94,36 @@ class OperationProcessor:
)
if not operation_type or not operation_config:
continue
if self.is_match(
maybe_match = self.get_match(
self.operation_defs[operation_key][Constants.MATCH],
raw_props.get(operation_key),
):
)
if maybe_match is not None:
operation = self.get_operation_value(
operation_key, operation_type, operation_config, raw_props
operation_key, operation_type, operation_config, maybe_match
)
if operation:
operations_value_set: set = operations_map.get(
operation_type, set()
)
operations_value_set.add(operation)
operations_map[operation_type] = operations_value_set
if isinstance(operation, str):
operations_value_set = operations_map.get(
operation_type, set()
)
operations_value_set.add(operation) # type: ignore
operations_map[operation_type] = operations_value_set
else:
operations_value_list = operations_map.get(
operation_type, list()
)
operations_value_list.append(operation) # type: ignore
operations_map[operation_type] = operations_value_list
aspect_map = self.convert_to_aspects(operations_map)
except Exception as e:
self.logger.error("Error while processing operation defs over raw_props", e)
return aspect_map
def convert_to_aspects(self, operation_map: Dict[str, set]) -> Dict[str, Any]:
def convert_to_aspects(
self, operation_map: Dict[str, Union[set, list]]
) -> Dict[str, Any]:
aspect_map: Dict[str, Any] = {}
if Constants.ADD_TAG_OPERATION in operation_map:
tag_aspect = mce_builder.make_global_tag_aspect_with_tag_list(
@ -113,9 +131,20 @@ class OperationProcessor:
)
aspect_map[Constants.ADD_TAG_OPERATION] = tag_aspect
if Constants.ADD_OWNER_OPERATION in operation_map:
owner_aspect = mce_builder.make_ownership_aspect_from_urn_list(
sorted(operation_map[Constants.ADD_OWNER_OPERATION]),
self.owner_source_type,
owner_aspect = OwnershipClass(
owners=[
OwnerClass(
owner=x.get("urn"),
type=x.get("category"),
source=OwnershipSourceClass(type=self.owner_source_type)
if self.owner_source_type
else None,
)
for x in sorted(
operation_map[Constants.ADD_OWNER_OPERATION],
key=lambda x: x["urn"],
)
]
)
aspect_map[Constants.ADD_OWNER_OPERATION] = owner_aspect
if Constants.ADD_TERM_OPERATION in operation_map:
@ -130,8 +159,22 @@ class OperationProcessor:
operation_key: str,
operation_type: str,
operation_config: Dict,
raw_props: Dict,
) -> Optional[str]:
match: Match,
) -> Optional[Union[str, Dict]]:
def _get_best_match(the_match: Match, group_name: str) -> str:
result = the_match.group(0)
try:
result = the_match.group(group_name)
return result
except IndexError:
pass
try:
result = the_match.group(1)
return result
except IndexError:
pass
return result
match_regexp = r"{{\s*\$match\s*}}"
if (
@ -139,10 +182,9 @@ class OperationProcessor:
and operation_config[Constants.TAG]
):
tag = operation_config[Constants.TAG]
if isinstance(raw_props[operation_key], str):
tag = re.sub(
match_regexp, raw_props[operation_key], tag, 0, re.MULTILINE
)
tag_id = _get_best_match(match, "tag")
if isinstance(tag_id, str):
tag = re.sub(match_regexp, tag_id, tag, 0, re.MULTILINE)
if self.tag_prefix:
tag = self.tag_prefix + tag
@ -151,22 +193,32 @@ class OperationProcessor:
operation_type == Constants.ADD_OWNER_OPERATION
and operation_config[Constants.OWNER_TYPE]
):
owner_id = raw_props[operation_key]
owner_id = _get_best_match(match, "owner")
owner_category = (
operation_config.get(Constants.OWNER_CATEGORY)
or OwnershipTypeClass.DATAOWNER
)
owner_category = owner_category.upper()
if self.strip_owner_email_id:
owner_id = self.sanitize_owner_ids(owner_id)
if operation_config[Constants.OWNER_TYPE] == Constants.USER_OWNER:
return mce_builder.make_owner_urn(owner_id, OwnerType.USER)
return {
"urn": mce_builder.make_owner_urn(owner_id, OwnerType.USER),
"category": owner_category,
}
elif operation_config[Constants.OWNER_TYPE] == Constants.GROUP_OWNER:
return mce_builder.make_owner_urn(owner_id, OwnerType.GROUP)
return {
"urn": mce_builder.make_owner_urn(owner_id, OwnerType.GROUP),
"category": owner_category,
}
elif (
operation_type == Constants.ADD_TERM_OPERATION
and operation_config[Constants.TERM]
):
term = operation_config[Constants.TERM]
if isinstance(raw_props[operation_key], str):
term = re.sub(
match_regexp, raw_props[operation_key], term, 0, re.MULTILINE
)
captured_term_id = _get_best_match(match, "term")
if isinstance(captured_term_id, str):
term = re.sub(match_regexp, captured_term_id, term, 0, re.MULTILINE)
return mce_builder.make_term_urn(term)
return None
@ -175,15 +227,13 @@ class OperationProcessor:
owner_id = owner_id[0 : owner_id.index("@")]
return owner_id
def is_match(self, match_clause: Any, raw_props_value: Any) -> bool:
def get_match(self, match_clause: Any, raw_props_value: Any) -> Optional[Match]:
# function to check if a match clause is satisfied to a value.
is_matching: bool
if type(raw_props_value) not in Constants.OPERAND_DATATYPE_SUPPORTED or type(
raw_props_value
) != type(match_clause):
is_matching = False
return None
elif type(raw_props_value) == str:
is_matching = True if re.match(match_clause, raw_props_value) else False
return re.match(match_clause, raw_props_value)
else:
is_matching = match_clause == raw_props_value
return is_matching
return re.match(str(match_clause), str(raw_props_value))
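
A standalone sketch of the new `get_match` behaviour follows. `SUPPORTED_TYPES` is an assumption standing in for `Constants.OPERAND_DATATYPE_SUPPORTED`, whose contents are not shown in this diff; everything else follows the hunk above.

```python
import re
from typing import Any, Optional

# Assumed set of supported operand types; the real list lives in Constants.
SUPPORTED_TYPES = (int, bool, str, float)


def get_match(match_clause: Any, value: Any) -> Optional[re.Match]:
    """Return a Match object when the clause is satisfied, else None."""
    if type(value) not in SUPPORTED_TYPES or type(value) is not type(match_clause):
        return None
    # Non-string operands are compared through their string form.
    return re.match(str(match_clause), str(value))


print(get_match("^@(.*)", "@alice2").group(1))  # alice2
print(get_match(True, True) is not None)        # True
print(get_match(1, True))                       # None (int clause, bool value)
```

One consequence worth keeping in mind: `re.match` only anchors at the start of the string and treats the clause as a pattern, so in this sketch a numeric clause of `1` also matches a value of `10`. Anchoring or escaping the stringified clause would be needed for exact comparison of non-string values.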

@ -2983,6 +2983,7 @@
"name": "sha256"
},
"meta": {
"owner": "@alice2",
"business_owner": "jdoe.last@gmail.com",
"data_governance.team_owner": "Finance",
"has_pii": true,
@ -3008,10 +3009,7 @@
"quoting": {},
"schema": null,
"tags": [],
"vars": {},
"meta": {
"owner": "@alice2"
}
"vars": {}
},
"database": "pagila",
"deferred": false,
@ -3059,7 +3057,9 @@
"city"
]
],
"tags": [ "test_tag" ],
"tags": [
"test_tag"
],
"unique_id": "model.sample_dbt.customer_details",
"unrendered_config": {
"materialized": "ephemeral"
@ -3360,7 +3360,9 @@
"meta": {},
"name": "first_name",
"quote": null,
"tags": ["column_tag"]
"tags": [
"column_tag"
]
},
"last_name": {
"data_type": null,

@ -27,6 +27,7 @@
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"owner": "@alice2",
"business_owner": "jdoe.last@gmail.com",
"data_governance.team_owner": "Finance",
"has_pii": "True",
@ -67,8 +68,7 @@
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
}
}
},
@ -76,7 +76,8 @@
"com.linkedin.pegasus2avro.common.GlobalTags": {
"tags": [
{
"tag": "urn:li:tag:dbt:test_tag"
"tag": "urn:li:tag:dbt:test_tag",
"context": null
}
]
}
@ -89,14 +90,12 @@
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"deleted": null,
"dataset": null,
@ -120,8 +119,7 @@
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.customer,PROD)",
"type": "TRANSFORMED"
@ -130,8 +128,7 @@
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.address,PROD)",
"type": "TRANSFORMED"
@ -140,8 +137,7 @@
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.city,PROD)",
"type": "TRANSFORMED"
@ -232,14 +228,12 @@
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"deleted": null,
"dataset": null,
@ -344,8 +338,7 @@
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.dbt_postgres.payments_by_customer_by_month,PROD)",
"type": "TRANSFORMED"
@ -354,8 +347,7 @@
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,dbt-instance-1.pagila.dbt_postgres.customer_details,PROD)",
"type": "TRANSFORMED"
@ -441,14 +433,12 @@
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"deleted": null,
"dataset": null,
@ -593,8 +583,7 @@
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.payment_p2020_01,PROD)",
"type": "TRANSFORMED"
@ -603,8 +592,7 @@
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.payment_p2020_02,PROD)",
"type": "TRANSFORMED"
@ -613,8 +601,7 @@
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.payment_p2020_02,PROD)",
"type": "TRANSFORMED"
@ -623,8 +610,7 @@
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.payment_p2020_03,PROD)",
"type": "TRANSFORMED"
@ -633,8 +619,7 @@
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.payment_p2020_04,PROD)",
"type": "TRANSFORMED"
@ -643,8 +628,7 @@
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.payment_p2020_05,PROD)",
"type": "TRANSFORMED"
@ -653,8 +637,7 @@
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.payment_p2020_06,PROD)",
"type": "TRANSFORMED"
@ -740,14 +723,12 @@
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"deleted": null,
"dataset": null,
@ -832,8 +813,7 @@
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.dbt_postgres.an-aliased-view-for-payments,PROD)",
"type": "TRANSFORMED"
@ -925,8 +905,7 @@
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
}
}
},
@ -938,14 +917,12 @@
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"lastModified": {
"time": 1581759273000,
"actor": "urn:li:corpuser:dbt_executor",
"impersonator": null,
"message": null
"impersonator": null
},
"deleted": null,
"dataset": null,
@ -994,7 +971,8 @@
"globalTags": {
"tags": [
{
"tag": "urn:li:tag:dbt:column_tag"
"tag": "urn:li:tag:dbt:column_tag",
"context": null
}
]
},
@ -1056,8 +1034,7 @@
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.actor,PROD)",
"type": "TRANSFORMED"
@ -1135,14 +1112,12 @@
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"lastModified": {
"time": 1581759930000,
"actor": "urn:li:corpuser:dbt_executor",
"impersonator": null,
"message": null
"impersonator": null
},
"deleted": null,
"dataset": null,
@ -1327,8 +1302,7 @@
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.address,PROD)",
"type": "TRANSFORMED"
@ -1406,14 +1380,12 @@
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"lastModified": {
"time": 1581759987000,
"actor": "urn:li:corpuser:dbt_executor",
"impersonator": null,
"message": null
"impersonator": null
},
"deleted": null,
"dataset": null,
@ -1498,8 +1470,7 @@
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.category,PROD)",
"type": "TRANSFORMED"
@ -1577,14 +1548,12 @@
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"lastModified": {
"time": 1581759925000,
"actor": "urn:li:corpuser:dbt_executor",
"impersonator": null,
"message": null
"impersonator": null
},
"deleted": null,
"dataset": null,
@ -1689,8 +1658,7 @@
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.city,PROD)",
"type": "TRANSFORMED"
@ -1775,8 +1743,7 @@
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
}
}
},
@ -1788,14 +1755,12 @@
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"lastModified": {
"time": 1581759840000,
"actor": "urn:li:corpuser:dbt_executor",
"impersonator": null,
"message": null
"impersonator": null
},
"deleted": null,
"dataset": null,
@ -1880,8 +1845,7 @@
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.country,PROD)",
"type": "TRANSFORMED"
@ -1959,14 +1923,12 @@
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"lastModified": {
"time": 1581760640000,
"actor": "urn:li:corpuser:dbt_executor",
"impersonator": null,
"message": null
"impersonator": null
},
"deleted": null,
"dataset": null,
@ -2191,8 +2153,7 @@
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.customer,PROD)",
"type": "TRANSFORMED"
@ -2270,14 +2231,12 @@
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"lastModified": {
"time": 1580505371996,
"actor": "urn:li:corpuser:dbt_executor",
"impersonator": null,
"message": null
"impersonator": null
},
"deleted": null,
"dataset": null,
@ -2422,8 +2381,7 @@
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.payment_p2020_01,PROD)",
"type": "TRANSFORMED"
@ -2509,8 +2467,7 @@
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
}
}
},
@ -2522,14 +2479,12 @@
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"lastModified": {
"time": 1582319845996,
"actor": "urn:li:corpuser:dbt_executor",
"impersonator": null,
"message": null
"impersonator": null
},
"deleted": null,
"dataset": null,
@ -2674,8 +2629,7 @@
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.payment_p2020_02,PROD)",
"type": "TRANSFORMED"
@ -2753,14 +2707,12 @@
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"lastModified": {
"time": 1584998318996,
"actor": "urn:li:corpuser:dbt_executor",
"impersonator": null,
"message": null
"impersonator": null
},
"deleted": null,
"dataset": null,
@ -2905,8 +2857,7 @@
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.payment_p2020_03,PROD)",
"type": "TRANSFORMED"
@ -2984,14 +2935,12 @@
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"lastModified": {
"time": 1588287228996,
"actor": "urn:li:corpuser:dbt_executor",
"impersonator": null,
"message": null
"impersonator": null
},
"deleted": null,
"dataset": null,
@ -3136,8 +3085,7 @@
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.payment_p2020_04,PROD)",
"type": "TRANSFORMED"
@ -3215,14 +3163,12 @@
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"lastModified": {
"time": 1589460269996,
"actor": "urn:li:corpuser:dbt_executor",
"impersonator": null,
"message": null
"impersonator": null
},
"deleted": null,
"dataset": null,
@ -3367,8 +3313,7 @@
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.payment_p2020_05,PROD)",
"type": "TRANSFORMED"
@ -3446,14 +3391,12 @@
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"lastModified": {
"time": -62135596800000,
"actor": "urn:li:corpuser:dbt_executor",
"impersonator": null,
"message": null
"impersonator": null
},
"deleted": null,
"dataset": null,
@ -3598,8 +3541,7 @@
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.payment_p2020_06,PROD)",
"type": "TRANSFORMED"
@ -3633,8 +3575,7 @@
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,dbt-instance-1.pagila.dbt_postgres.an-aliased-view-for-monthly-billing,PROD)",
"type": "TRANSFORMED"
@ -3668,8 +3609,7 @@
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,dbt-instance-1.pagila.dbt_postgres.an-aliased-view-for-payments,PROD)",
"type": "TRANSFORMED"
@ -3703,8 +3643,7 @@
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,dbt-instance-1.pagila.dbt_postgres.payments_by_customer_by_month,PROD)",
"type": "TRANSFORMED"

@ -27,6 +27,7 @@
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"owner": "@alice2",
"business_owner": "jdoe.last@gmail.com",
"data_governance.team_owner": "Finance",
"has_pii": "True",
@ -75,7 +76,8 @@
"com.linkedin.pegasus2avro.common.GlobalTags": {
"tags": [
{
"tag": "urn:li:tag:dbt:test_tag"
"tag": "urn:li:tag:dbt:test_tag",
"context": null
}
]
}
@ -969,7 +971,8 @@
"globalTags": {
"tags": [
{
"tag": "urn:li:tag:dbt:column_tag"
"tag": "urn:li:tag:dbt:column_tag",
"context": null
}
]
},

@ -8,6 +8,7 @@
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"owner": "@alice2",
"business_owner": "jdoe.last@gmail.com",
"data_governance.team_owner": "Finance",
"has_pii": "True",
@ -56,7 +57,8 @@
"com.linkedin.pegasus2avro.common.GlobalTags": {
"tags": [
{
"tag": "urn:li:tag:dbt:test_tag"
"tag": "urn:li:tag:dbt:test_tag",
"context": null
}
]
}

@ -8,6 +8,7 @@
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"owner": "@alice2",
"business_owner": "jdoe.last@gmail.com",
"data_governance.team_owner": "Finance",
"has_pii": "True",
@ -56,7 +57,8 @@
"com.linkedin.pegasus2avro.common.GlobalTags": {
"tags": [
{
"tag": "urn:li:tag:dbt:test_tag"
"tag": "urn:li:tag:dbt:test_tag",
"context": null
}
]
}

@ -62,6 +62,11 @@ class DbtTestConfig:
"enable_meta_mapping": False,
"write_semantics": "OVERRIDE",
"meta_mapping": {
"owner": {
"match": "^@(.*)",
"operation": "add_owner",
"config": {"owner_type": "user"},
},
"business_owner": {
"match": ".*",
"operation": "add_owner",

@ -2,10 +2,12 @@ from typing import Any, Dict
from datahub.metadata.com.linkedin.pegasus2avro.common import GlobalTags
from datahub.metadata.schema_classes import (
GlobalTagsClass,
GlossaryTermsClass,
OwnerClass,
OwnershipClass,
OwnershipSourceTypeClass,
OwnershipTypeClass,
)
from datahub.utilities.mapping import OperationProcessor
@ -144,3 +146,88 @@ def test_operation_processor_no_email_strip_source_type_not_null():
new_owner: OwnerClass = ownership_aspect.owners[0]
assert new_owner.owner == "urn:li:corpuser:test_user@abc.com"
assert new_owner.source and new_owner.source.type == "SERVICE"
def test_operation_processor_advanced_matching_owners():
raw_props = {
"user_owner": "@test_user@abc.com",
}
processor = OperationProcessor(
operation_defs={
"user_owner": {
"match": "^@(.*)",
"operation": "add_owner",
"config": {"owner_type": "group"},
},
},
owner_source_type="SOURCE_CONTROL",
)
aspect_map = processor.process(raw_props)
assert "add_owner" in aspect_map
ownership_aspect: OwnershipClass = aspect_map["add_owner"]
assert len(ownership_aspect.owners) == 1
new_owner: OwnerClass = ownership_aspect.owners[0]
assert new_owner.owner == "urn:li:corpGroup:test_user@abc.com"
assert new_owner.source and new_owner.source.type == "SOURCE_CONTROL"
def test_operation_processor_ownership_category():
raw_props = {"user_owner": "@test_user", "business_owner": "alice"}
processor = OperationProcessor(
operation_defs={
"user_owner": {
"match": "^@(.*)",
"operation": "add_owner",
"config": {
"owner_type": "group",
"owner_category": OwnershipTypeClass.DATA_STEWARD,
},
},
"business_owner": {
"match": ".*",
"operation": "add_owner",
"config": {
"owner_type": "user",
"owner_category": OwnershipTypeClass.BUSINESS_OWNER,
},
},
},
owner_source_type="SOURCE_CONTROL",
)
aspect_map = processor.process(raw_props)
assert "add_owner" in aspect_map
ownership_aspect: OwnershipClass = aspect_map["add_owner"]
assert len(ownership_aspect.owners) == 2
new_owner: OwnerClass = ownership_aspect.owners[0]
assert new_owner.owner == "urn:li:corpGroup:test_user"
assert new_owner.source and new_owner.source.type == "SOURCE_CONTROL"
assert new_owner.type and new_owner.type == OwnershipTypeClass.DATA_STEWARD
new_owner = ownership_aspect.owners[1]
assert new_owner.owner == "urn:li:corpuser:alice"
assert new_owner.source and new_owner.source.type == "SOURCE_CONTROL"
assert new_owner.type and new_owner.type == OwnershipTypeClass.BUSINESS_OWNER
def test_operation_processor_advanced_matching_tags():
raw_props = {
"case": "PLT-4567",
}
processor = OperationProcessor(
operation_defs={
"case": {
"match": "^PLT-(.*)",
"operation": "add_tag",
"config": {"tag": "case_{{ $match }}"},
},
},
owner_source_type="SOURCE_CONTROL",
)
aspect_map = processor.process(raw_props)
assert "add_tag" in aspect_map
tag_aspect: GlobalTagsClass = aspect_map["add_tag"]
assert len(tag_aspect.tags) == 1
assert tag_aspect.tags[0].tag == "urn:li:tag:case_4567"