feat(dataproduct): output ports (#15000)

This commit is contained in:
Sergio Gómez Villamor 2025-10-14 15:49:04 +02:00 committed by GitHub
parent 4ce4718729
commit c4e104eba2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 405 additions and 1 deletions

View File

@ -10,6 +10,9 @@ assets:
- urn:li:dashboard:(looker,dashboards.19)
- urn:li:dataFlow:(airflow,snowflake_load,prod)
output_ports:
- urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.analytics.pet_details,PROD)
owners:
- id: urn:li:corpuser:jdoe
type: BUSINESS_OWNER

View File

@ -113,6 +113,7 @@ class DataProduct(ConfigModel):
terms: Optional[List[str]] = None
properties: Optional[Dict[str, LaxStr]] = None
external_url: Optional[str] = None
output_ports: Optional[List[str]] = None
_original_yaml_dict: Optional[dict] = None
@pydantic.validator("assets", each_item=True)
@ -124,6 +125,22 @@ class DataProduct(ConfigModel):
return v
@pydantic.validator("output_ports", each_item=True)
def output_ports_must_be_urns(cls, v: str) -> str:
try:
Urn.create_from_string(v)
except Exception as e:
raise ValueError(f"Output port {v} is not an urn: {e}") from e
return v
@pydantic.validator("output_ports", each_item=True)
def output_ports_must_be_from_asset_list(cls, v: str, values: dict) -> str:
assets = values.get("assets", [])
if v not in assets:
raise ValueError(f"Output port {v} is not in asset list")
return v
@property
def urn(self) -> str:
if self.id.startswith("urn:li:dataProduct:"):
@ -181,6 +198,7 @@ class DataProduct(ConfigModel):
DataProductAssociationClass(
destinationUrn=asset,
created=self._mint_auditstamp("yaml"),
outputPort=asset in (self.output_ports or []),
)
for asset in self.assets
]
@ -204,6 +222,7 @@ class DataProduct(ConfigModel):
DataProductAssociationClass(
destinationUrn=asset,
created=self._mint_auditstamp("yaml"),
outputPort=asset in (self.output_ports or []),
)
for asset in self.assets or []
],
@ -369,6 +388,13 @@ class DataProduct(ConfigModel):
external_url=(
data_product_properties.externalUrl if data_product_properties else None
),
output_ports=[
e.destinationUrn
for e in (data_product_properties.assets or [])
if e.outputPort
]
if data_product_properties
else None,
)
def _patch_ownership(

View File

@ -0,0 +1,31 @@
id: pet_of_the_week
domain: Marketing
display_name: Pet of the Week Campaign
description: |-
This campaign includes Pet of the Week data.
assets:
- urn:li:container:DATABASE
- urn:li:container:SCHEMA
- urn:li:mlFeatureTable:(urn:li:dataPlatform:feast,test_feature_table_all_feature_dtypes)
output_ports:
- urn:li:container:SCHEMA
- urn:li:mlFeatureTable:(urn:li:dataPlatform:feast,test_feature_table_all_feature_dtypes)
owners:
- id: urn:li:corpuser:jdoe
type: BUSINESS_OWNER
properties:
version: 2.0
classification: pii
tags:
- urn:li:tag:awesome
terms:
- urn:li:glossaryTerm:ClientsAndAccounts.AccountBalance
external_url: https://github.com/datahub-project/datahub

View File

@ -0,0 +1,31 @@
id: pet_of_the_week
domain: Marketing
display_name: Pet of the Week Campaign
description: |-
This campaign includes Pet of the Week data.
assets:
- urn:li:container:DATABASE
- urn:li:container:SCHEMA
- urn:li:mlFeatureTable:(urn:li:dataPlatform:feast,test_feature_table_all_feature_dtypes)
output_ports:
- urn:li:container:SCHEMA
- urn:li:mlFeatureTable:(urn:li:dataPlatform:feast,test_feature_table_all_feature_dtypes)
owners:
- id: urn:li:corpuser:jdoe
type: BUSINESS_OWNER
properties:
version: 2.0
classification: pii
tags:
- urn:li:tag:awesome
terms:
- urn:li:glossaryTerm:ClientsAndAccounts.AccountBalance
external_url: https://github.com/datahub-project/datahub

View File

@ -0,0 +1,128 @@
[
{
"entityType": "dataProduct",
"entityUrn": "urn:li:dataProduct:pet_of_the_week",
"changeType": "UPSERT",
"aspectName": "dataProductProperties",
"aspect": {
"json": {
"customProperties": {
"version": "2.0",
"classification": "pii"
},
"externalUrl": "https://github.com/datahub-project/datahub",
"name": "Pet of the Week Campaign",
"description": "This campaign includes Pet of the Week data.",
"assets": [
{
"destinationUrn": "urn:li:container:DATABASE",
"created": {
"time": 1681455600000,
"actor": "urn:li:corpuser:datahub",
"message": "yaml"
},
"outputPort": false
},
{
"destinationUrn": "urn:li:container:SCHEMA",
"created": {
"time": 1681455600000,
"actor": "urn:li:corpuser:datahub",
"message": "yaml"
},
"outputPort": true
},
{
"destinationUrn": "urn:li:mlFeatureTable:(urn:li:dataPlatform:feast,test_feature_table_all_feature_dtypes)",
"created": {
"time": 1681455600000,
"actor": "urn:li:corpuser:datahub",
"message": "yaml"
},
"outputPort": true
}
]
}
}
},
{
"entityType": "dataProduct",
"entityUrn": "urn:li:dataProduct:pet_of_the_week",
"changeType": "UPSERT",
"aspectName": "domains",
"aspect": {
"json": {
"domains": [
"urn:li:domain:12345"
]
}
}
},
{
"entityType": "dataProduct",
"entityUrn": "urn:li:dataProduct:pet_of_the_week",
"changeType": "UPSERT",
"aspectName": "globalTags",
"aspect": {
"json": {
"tags": [
{
"tag": "urn:li:tag:awesome"
}
]
}
}
},
{
"entityType": "dataProduct",
"entityUrn": "urn:li:dataProduct:pet_of_the_week",
"changeType": "UPSERT",
"aspectName": "glossaryTerms",
"aspect": {
"json": {
"terms": [
{
"urn": "urn:li:glossaryTerm:ClientsAndAccounts.AccountBalance"
}
],
"auditStamp": {
"time": 1681455600000,
"actor": "urn:li:corpuser:datahub",
"message": "yaml"
}
}
}
},
{
"entityType": "dataProduct",
"entityUrn": "urn:li:dataProduct:pet_of_the_week",
"changeType": "UPSERT",
"aspectName": "ownership",
"aspect": {
"json": {
"owners": [
{
"owner": "urn:li:corpuser:jdoe",
"type": "BUSINESS_OWNER"
}
],
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
}
}
}
},
{
"entityType": "dataProduct",
"entityUrn": "urn:li:dataProduct:pet_of_the_week",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
}
]

View File

@ -0,0 +1,149 @@
[
{
"entityType": "dataProduct",
"entityUrn": "urn:li:dataProduct:pet_of_the_week",
"changeType": "PATCH",
"aspectName": "dataProductProperties",
"aspect": {
"json": [
{
"op": "add",
"path": "/description",
"value": "This campaign includes Pet of the Week data."
},
{
"op": "add",
"path": "/name",
"value": "Pet of the Week Campaign"
},
{
"op": "add",
"path": "/assets",
"value": [
{
"destinationUrn": "urn:li:container:DATABASE",
"created": {
"time": 1681455600000,
"actor": "urn:li:corpuser:datahub",
"message": "yaml"
},
"outputPort": false
},
{
"destinationUrn": "urn:li:container:SCHEMA",
"created": {
"time": 1681455600000,
"actor": "urn:li:corpuser:datahub",
"message": "yaml"
},
"outputPort": true
},
{
"destinationUrn": "urn:li:mlFeatureTable:(urn:li:dataPlatform:feast,test_feature_table_all_feature_dtypes)",
"created": {
"time": 1681455600000,
"actor": "urn:li:corpuser:datahub",
"message": "yaml"
},
"outputPort": true
}
]
},
{
"op": "add",
"path": "/customProperties",
"value": {
"version": "2.0",
"classification": "pii"
}
},
{
"op": "add",
"path": "/externalUrl",
"value": "https://github.com/datahub-project/datahub"
}
]
}
},
{
"entityType": "dataProduct",
"entityUrn": "urn:li:dataProduct:pet_of_the_week",
"changeType": "UPSERT",
"aspectName": "domains",
"aspect": {
"json": {
"domains": [
"urn:li:domain:12345"
]
}
}
},
{
"entityType": "dataProduct",
"entityUrn": "urn:li:dataProduct:pet_of_the_week",
"changeType": "UPSERT",
"aspectName": "globalTags",
"aspect": {
"json": {
"tags": [
{
"tag": "urn:li:tag:awesome"
}
]
}
}
},
{
"entityType": "dataProduct",
"entityUrn": "urn:li:dataProduct:pet_of_the_week",
"changeType": "UPSERT",
"aspectName": "glossaryTerms",
"aspect": {
"json": {
"terms": [
{
"urn": "urn:li:glossaryTerm:ClientsAndAccounts.AccountBalance"
}
],
"auditStamp": {
"time": 1681455600000,
"actor": "urn:li:corpuser:datahub",
"message": "yaml"
}
}
}
},
{
"entityType": "dataProduct",
"entityUrn": "urn:li:dataProduct:pet_of_the_week",
"changeType": "UPSERT",
"aspectName": "ownership",
"aspect": {
"json": {
"owners": [
{
"owner": "urn:li:corpuser:jdoe",
"type": "BUSINESS_OWNER"
}
],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
}
}
}
},
{
"entityType": "dataProduct",
"entityUrn": "urn:li:dataProduct:pet_of_the_week",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
}
]

View File

@ -59,8 +59,18 @@ def check_yaml_golden_file(input_file: str, golden_file: str) -> bool:
[
("dataproduct.yaml", False, "golden_dataproduct_out.json"),
("dataproduct_upsert.yaml", True, "golden_dataproduct_out_upsert.json"),
(
"dataproduct_output_ports.yaml",
False,
"golden_dataproduct_output_ports.json",
),
(
"dataproduct_output_ports_upsert.yaml",
True,
"golden_dataproduct_output_ports_upsert.json",
),
],
ids=["update", "upsert"],
ids=["update", "upsert", "update_with_output_ports", "upsert_with_output_ports"],
)
def test_dataproduct_from_yaml(
pytestconfig: pytest.Config,
@ -203,3 +213,29 @@ def test_dataproduct_ownership_type_urn_patch_yaml(
str(dataproduct_output_file),
str(test_resources_dir / "dataproduct_ownership_type_urn.yaml"),
)
def test_dataproduct_output_ports_validation_not_urn(
base_mock_graph: MockDataHubGraph,
) -> None:
"""Test that output_ports must be valid URNs"""
with pytest.raises(ValueError, match="Output port .* is not an urn"):
DataProduct(
id="test_product",
domain="urn:li:domain:12345",
assets=["urn:li:container:DATABASE", "not-a-urn"],
output_ports=["not-a-urn"],
)
def test_dataproduct_output_ports_validation_not_in_assets(
base_mock_graph: MockDataHubGraph,
) -> None:
"""Test that output_ports must be in the assets list"""
with pytest.raises(ValueError, match="Output port .* is not in asset list"):
DataProduct(
id="test_product",
domain="urn:li:domain:12345",
assets=["urn:li:container:DATABASE"],
output_ports=["urn:li:container:SCHEMA"],
)