feat(ingestion): Add typeUrn handling to ownership transformers (#9370)

This commit is contained in:
skrydal 2023-12-13 20:34:20 +01:00 committed by GitHub
parent 3cde9549a2
commit a495d652e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 86 additions and 60 deletions

View File

@ -55,12 +55,12 @@ transformers:
```
## Simple Add Dataset ownership
### Config Details
| Field | Required | Type | Default | Description |
|-----------------------------|----------|--------------|---------------|------------------------------------------------------------------|
| `owner_urns` | ✅ | list[string] | | List of owner urns. |
| `ownership_type` | | string | `DATAOWNER` | ownership type of the owners. |
| `replace_existing` | | boolean | `false` | Whether to remove owners from entity sent by ingestion source. |
| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. |
| Field | Required | Type | Default | Description |
|--------------------|----------|--------------|-------------|---------------------------------------------------------------------|
| `owner_urns` | ✅ | list[string] | | List of owner urns. |
| `ownership_type` | | string | "DATAOWNER" | ownership type of the owners (either as enum or ownership type urn) |
| `replace_existing` | | boolean | `false` | Whether to remove owners from entity sent by ingestion source. |
| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. |
For transformer behaviour on `replace_existing` and `semantics`, please refer section [Relationship Between replace_existing And semantics](#relationship-between-replace_existing-and-semantics).
@ -95,7 +95,7 @@ transformers:
- "urn:li:corpuser:username1"
- "urn:li:corpuser:username2"
- "urn:li:corpGroup:groupname"
ownership_type: "PRODUCER"
ownership_type: "urn:li:ownershipType:__system__producer"
```
- Add owners, however overwrite the owners available for the dataset on DataHub GMS
```yaml
@ -107,7 +107,7 @@ transformers:
- "urn:li:corpuser:username1"
- "urn:li:corpuser:username2"
- "urn:li:corpGroup:groupname"
ownership_type: "PRODUCER"
ownership_type: "urn:li:ownershipType:__system__producer"
```
- Add owners, however keep the owners available for the dataset on DataHub GMS
```yaml
@ -124,12 +124,12 @@ transformers:
## Pattern Add Dataset ownership
### Config Details
| Field | Required | Type | Default | Description |
|-----------------------------|--------- |-----------------------|------------------|-----------------------------------------------------------------------------------------|
| `owner_pattern` | ✅ | map[regx, list[urn]] | | entity urn with regular expression and list of owners urn apply to matching entity urn. |
| `ownership_type` | | string | `DATAOWNER` | ownership type of the owners. |
| `replace_existing` | | boolean | `false` | Whether to remove owners from entity sent by ingestion source. |
| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. |
| Field | Required | Type | Default | Description |
|--------------------|----------|----------------------|-------------|-----------------------------------------------------------------------------------------|
| `owner_pattern` | ✅ | map[regx, list[urn]] | | entity urn with regular expression and list of owners urn apply to matching entity urn. |
| `ownership_type` | | string | "DATAOWNER" | ownership type of the owners (either as enum or ownership type urn) |
| `replace_existing` | | boolean | `false` | Whether to remove owners from entity sent by ingestion source. |
| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. |
lets suppose wed like to append a series of users who we know to own a different dataset from a data source but aren't detected during normal ingestion. To do so, we can use the `pattern_add_dataset_ownership` module thats included in the ingestion framework. This will match the pattern to `urn` of the dataset and assign the respective owners.
@ -158,7 +158,7 @@ The config, which wed append to our ingestion recipe YAML, would look like th
rules:
".*example1.*": ["urn:li:corpuser:username1"]
".*example2.*": ["urn:li:corpuser:username2"]
ownership_type: "PRODUCER"
ownership_type: "urn:li:ownershipType:__system__producer"
```
- Add owner, however overwrite the owners available for the dataset on DataHub GMS
```yaml
@ -170,7 +170,7 @@ The config, which wed append to our ingestion recipe YAML, would look like th
rules:
".*example1.*": ["urn:li:corpuser:username1"]
".*example2.*": ["urn:li:corpuser:username2"]
ownership_type: "PRODUCER"
ownership_type: "urn:li:ownershipType:__system__producer"
```
- Add owner, however keep the owners available for the dataset on DataHub GMS
```yaml

View File

@ -9,12 +9,13 @@ from enum import Enum
from typing import (
TYPE_CHECKING,
Any,
Iterable,
List,
Optional,
Tuple,
Type,
TypeVar,
Union,
cast,
get_type_hints,
)
@ -342,26 +343,20 @@ def make_ml_model_group_urn(platform: str, group_name: str, env: str) -> str:
)
def is_valid_ownership_type(ownership_type: Optional[str]) -> bool:
return ownership_type is not None and ownership_type in [
OwnershipTypeClass.TECHNICAL_OWNER,
OwnershipTypeClass.BUSINESS_OWNER,
OwnershipTypeClass.DATA_STEWARD,
OwnershipTypeClass.NONE,
OwnershipTypeClass.DEVELOPER,
OwnershipTypeClass.DATAOWNER,
OwnershipTypeClass.DELEGATE,
OwnershipTypeClass.PRODUCER,
OwnershipTypeClass.CONSUMER,
OwnershipTypeClass.STAKEHOLDER,
def get_class_fields(_class: Type[object]) -> Iterable[str]:
return [
f
for f in dir(_class)
if not callable(getattr(_class, f)) and not f.startswith("_")
]
def validate_ownership_type(ownership_type: Optional[str]) -> str:
if is_valid_ownership_type(ownership_type):
return cast(str, ownership_type)
else:
raise ValueError(f"Unexpected ownership type: {ownership_type}")
def validate_ownership_type(ownership_type: str) -> Tuple[str, Optional[str]]:
if ownership_type.startswith("urn:li:"):
return OwnershipTypeClass.CUSTOM, ownership_type
if ownership_type in get_class_fields(OwnershipTypeClass):
return ownership_type, None
raise ValueError(f"Unexpected ownership type: {ownership_type}")
def make_lineage_mce(

View File

@ -14,11 +14,8 @@ from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.transformer.dataset_transformer import (
DatasetOwnershipTransformer,
)
from datahub.metadata.schema_classes import (
OwnerClass,
OwnershipClass,
OwnershipTypeClass,
)
from datahub.metadata._schema_classes import OwnershipTypeClass
from datahub.metadata.schema_classes import OwnerClass, OwnershipClass
class AddDatasetOwnershipConfig(TransformerSemanticsConfigModel):
@ -102,7 +99,7 @@ class AddDatasetOwnership(DatasetOwnershipTransformer):
class DatasetOwnershipBaseConfig(TransformerSemanticsConfigModel):
ownership_type: Optional[str] = OwnershipTypeClass.DATAOWNER
ownership_type: str = OwnershipTypeClass.DATAOWNER
class SimpleDatasetOwnershipConfig(DatasetOwnershipBaseConfig):
@ -114,11 +111,14 @@ class SimpleAddDatasetOwnership(AddDatasetOwnership):
"""Transformer that adds a specified set of owners to each dataset."""
def __init__(self, config: SimpleDatasetOwnershipConfig, ctx: PipelineContext):
ownership_type = builder.validate_ownership_type(config.ownership_type)
ownership_type, ownership_type_urn = builder.validate_ownership_type(
config.ownership_type
)
owners = [
OwnerClass(
owner=owner,
type=ownership_type,
typeUrn=ownership_type_urn,
)
for owner in config.owner_urns
]
@ -147,29 +147,17 @@ class PatternDatasetOwnershipConfig(DatasetOwnershipBaseConfig):
class PatternAddDatasetOwnership(AddDatasetOwnership):
"""Transformer that adds a specified set of owners to each dataset."""
def getOwners(
self,
key: str,
owner_pattern: KeyValuePattern,
ownership_type: Optional[str] = None,
) -> List[OwnerClass]:
owners = [
OwnerClass(
owner=owner,
type=builder.validate_ownership_type(ownership_type),
)
for owner in owner_pattern.value(key)
]
return owners
def __init__(self, config: PatternDatasetOwnershipConfig, ctx: PipelineContext):
ownership_type = builder.validate_ownership_type(config.ownership_type)
owner_pattern = config.owner_pattern
ownership_type, ownership_type_urn = builder.validate_ownership_type(
config.ownership_type
)
generic_config = AddDatasetOwnershipConfig(
get_owners_to_add=lambda urn: [
OwnerClass(
owner=owner,
type=ownership_type,
typeUrn=ownership_type_urn,
)
for owner in owner_pattern.value(urn)
],

View File

@ -214,7 +214,10 @@ class TestPipeline(object):
"transformers": [
{
"type": "simple_add_dataset_ownership",
"config": {"owner_urns": ["urn:li:corpuser:foo"]},
"config": {
"owner_urns": ["urn:li:corpuser:foo"],
"ownership_type": "urn:li:ownershipType:__system__technical_owner",
},
}
],
"sink": {"type": "tests.test_helpers.sink_helpers.RecordingSink"},

View File

@ -234,7 +234,7 @@ def test_simple_dataset_ownership_transformation(mock_time):
assert last_event.entityUrn == outputs[0].record.proposedSnapshot.urn
assert all(
[
owner.type == models.OwnershipTypeClass.DATAOWNER
owner.type == models.OwnershipTypeClass.DATAOWNER and owner.typeUrn is None
for owner in last_event.aspect.owners
]
)
@ -247,7 +247,7 @@ def test_simple_dataset_ownership_transformation(mock_time):
assert len(second_ownership_aspect.owners) == 3
assert all(
[
owner.type == models.OwnershipTypeClass.DATAOWNER
owner.type == models.OwnershipTypeClass.DATAOWNER and owner.typeUrn is None
for owner in second_ownership_aspect.owners
]
)
@ -293,6 +293,44 @@ def test_simple_dataset_ownership_with_type_transformation(mock_time):
assert ownership_aspect.owners[0].type == models.OwnershipTypeClass.PRODUCER
def test_simple_dataset_ownership_with_type_urn_transformation(mock_time):
input = make_generic_dataset()
transformer = SimpleAddDatasetOwnership.create(
{
"owner_urns": [
builder.make_user_urn("person1"),
],
"ownership_type": "urn:li:ownershipType:__system__technical_owner",
},
PipelineContext(run_id="test"),
)
output = list(
transformer.transform(
[
RecordEnvelope(input, metadata={}),
RecordEnvelope(EndOfStream(), metadata={}),
]
)
)
assert len(output) == 3
# original MCE is unchanged
assert input == output[0].record
ownership_aspect = output[1].record.aspect
assert isinstance(ownership_aspect, OwnershipClass)
assert len(ownership_aspect.owners) == 1
assert ownership_aspect.owners[0].type == OwnershipTypeClass.CUSTOM
assert (
ownership_aspect.owners[0].typeUrn
== "urn:li:ownershipType:__system__technical_owner"
)
def _test_extract_tags(in_urn: str, regex_str: str, out_tag: str) -> None:
input = make_generic_dataset(entity_urn=in_urn)
transformer = ExtractDatasetTags.create(
@ -883,6 +921,7 @@ def test_pattern_dataset_ownership_transformation(mock_time):
".*example2.*": [builder.make_user_urn("person2")],
}
},
"ownership_type": "DATAOWNER",
},
PipelineContext(run_id="test"),
)
@ -2233,6 +2272,7 @@ def test_simple_dataset_ownership_transformer_semantics_patch(mock_datahub_graph
"replace_existing": False,
"semantics": TransformerSemantics.PATCH,
"owner_urns": [owner2],
"ownership_type": "DATAOWNER",
},
pipeline_context=pipeline_context,
)