diff --git a/metadata-ingestion/examples/data_product/dataproduct.yaml b/metadata-ingestion/examples/data_product/dataproduct.yaml index c8158aae9d..343bd94f69 100644 --- a/metadata-ingestion/examples/data_product/dataproduct.yaml +++ b/metadata-ingestion/examples/data_product/dataproduct.yaml @@ -10,6 +10,9 @@ assets: - urn:li:dashboard:(looker,dashboards.19) - urn:li:dataFlow:(airflow,snowflake_load,prod) +output_ports: + - urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.analytics.pet_details,PROD) + owners: - id: urn:li:corpuser:jdoe type: BUSINESS_OWNER diff --git a/metadata-ingestion/src/datahub/api/entities/dataproduct/dataproduct.py b/metadata-ingestion/src/datahub/api/entities/dataproduct/dataproduct.py index cd087b8315..571e01a1f4 100644 --- a/metadata-ingestion/src/datahub/api/entities/dataproduct/dataproduct.py +++ b/metadata-ingestion/src/datahub/api/entities/dataproduct/dataproduct.py @@ -113,6 +113,7 @@ class DataProduct(ConfigModel): terms: Optional[List[str]] = None properties: Optional[Dict[str, LaxStr]] = None external_url: Optional[str] = None + output_ports: Optional[List[str]] = None _original_yaml_dict: Optional[dict] = None @pydantic.validator("assets", each_item=True) @@ -124,6 +125,22 @@ class DataProduct(ConfigModel): return v + @pydantic.validator("output_ports", each_item=True) + def output_ports_must_be_urns(cls, v: str) -> str: + try: + Urn.create_from_string(v) + except Exception as e: + raise ValueError(f"Output port {v} is not an urn: {e}") from e + + return v + + @pydantic.validator("output_ports", each_item=True) + def output_ports_must_be_from_asset_list(cls, v: str, values: dict) -> str: + assets = values.get("assets", []) + if v not in assets: + raise ValueError(f"Output port {v} is not in asset list") + return v + @property def urn(self) -> str: if self.id.startswith("urn:li:dataProduct:"): @@ -181,6 +198,7 @@ class DataProduct(ConfigModel): DataProductAssociationClass( destinationUrn=asset, created=self._mint_auditstamp("yaml"), + outputPort=asset in (self.output_ports or []), ) for asset in self.assets ] @@ -204,6 +222,7 @@ class DataProduct(ConfigModel): DataProductAssociationClass( destinationUrn=asset, created=self._mint_auditstamp("yaml"), + outputPort=asset in (self.output_ports or []), ) for asset in self.assets or [] ], @@ -369,6 +388,13 @@ class DataProduct(ConfigModel): external_url=( data_product_properties.externalUrl if data_product_properties else None ), + output_ports=[ + e.destinationUrn + for e in (data_product_properties.assets or []) + if e.outputPort + ] + if data_product_properties + else None, ) def _patch_ownership( diff --git a/metadata-ingestion/tests/unit/api/entities/dataproducts/dataproduct_output_ports.yaml b/metadata-ingestion/tests/unit/api/entities/dataproducts/dataproduct_output_ports.yaml new file mode 100644 index 0000000000..d6c494cef9 --- /dev/null +++ b/metadata-ingestion/tests/unit/api/entities/dataproducts/dataproduct_output_ports.yaml @@ -0,0 +1,31 @@ +id: pet_of_the_week +domain: Marketing +display_name: Pet of the Week Campaign +description: |- + This campaign includes Pet of the Week data. + +assets: + - urn:li:container:DATABASE + - urn:li:container:SCHEMA + - urn:li:mlFeatureTable:(urn:li:dataPlatform:feast,test_feature_table_all_feature_dtypes) + +output_ports: + - urn:li:container:SCHEMA + - urn:li:mlFeatureTable:(urn:li:dataPlatform:feast,test_feature_table_all_feature_dtypes) + +owners: + - id: urn:li:corpuser:jdoe + type: BUSINESS_OWNER + +properties: + version: 2.0 + classification: pii + +tags: + - urn:li:tag:awesome + +terms: + - urn:li:glossaryTerm:ClientsAndAccounts.AccountBalance + +external_url: https://github.com/datahub-project/datahub + diff --git a/metadata-ingestion/tests/unit/api/entities/dataproducts/dataproduct_output_ports_upsert.yaml b/metadata-ingestion/tests/unit/api/entities/dataproducts/dataproduct_output_ports_upsert.yaml new file mode 100644 index 0000000000..d6c494cef9 --- /dev/null +++ b/metadata-ingestion/tests/unit/api/entities/dataproducts/dataproduct_output_ports_upsert.yaml @@ -0,0 +1,31 @@ +id: pet_of_the_week +domain: Marketing +display_name: Pet of the Week Campaign +description: |- + This campaign includes Pet of the Week data. + +assets: + - urn:li:container:DATABASE + - urn:li:container:SCHEMA + - urn:li:mlFeatureTable:(urn:li:dataPlatform:feast,test_feature_table_all_feature_dtypes) + +output_ports: + - urn:li:container:SCHEMA + - urn:li:mlFeatureTable:(urn:li:dataPlatform:feast,test_feature_table_all_feature_dtypes) + +owners: + - id: urn:li:corpuser:jdoe + type: BUSINESS_OWNER + +properties: + version: 2.0 + classification: pii + +tags: + - urn:li:tag:awesome + +terms: + - urn:li:glossaryTerm:ClientsAndAccounts.AccountBalance + +external_url: https://github.com/datahub-project/datahub + diff --git a/metadata-ingestion/tests/unit/api/entities/dataproducts/golden_dataproduct_output_ports.json b/metadata-ingestion/tests/unit/api/entities/dataproducts/golden_dataproduct_output_ports.json new file mode 100644 index 0000000000..4da9d53c4e --- /dev/null +++ b/metadata-ingestion/tests/unit/api/entities/dataproducts/golden_dataproduct_output_ports.json @@ -0,0 +1,128 @@ +[ +{ + "entityType": "dataProduct", + "entityUrn": "urn:li:dataProduct:pet_of_the_week", + "changeType": "UPSERT", + "aspectName": "dataProductProperties", + "aspect": { + "json": { + "customProperties": { + "version": "2.0", + "classification": "pii" + }, + "externalUrl": "https://github.com/datahub-project/datahub", + "name": "Pet of the Week Campaign", + "description": "This campaign includes Pet of the Week data.", + "assets": [ + { + "destinationUrn": "urn:li:container:DATABASE", + "created": { + "time": 1681455600000, + "actor": "urn:li:corpuser:datahub", + "message": "yaml" + }, + "outputPort": false + }, + { + "destinationUrn": "urn:li:container:SCHEMA", + "created": { + "time": 1681455600000, + "actor": "urn:li:corpuser:datahub", + "message": "yaml" + }, + "outputPort": true + }, + { + "destinationUrn": "urn:li:mlFeatureTable:(urn:li:dataPlatform:feast,test_feature_table_all_feature_dtypes)", + "created": { + "time": 1681455600000, + "actor": "urn:li:corpuser:datahub", + "message": "yaml" + }, + "outputPort": true + } + ] + } + } +}, +{ + "entityType": "dataProduct", + "entityUrn": "urn:li:dataProduct:pet_of_the_week", + "changeType": "UPSERT", + "aspectName": "domains", + "aspect": { + "json": { + "domains": [ + "urn:li:domain:12345" + ] + } + } +}, +{ + "entityType": "dataProduct", + "entityUrn": "urn:li:dataProduct:pet_of_the_week", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [ + { + "tag": "urn:li:tag:awesome" + } + ] + } + } +}, +{ + "entityType": "dataProduct", + "entityUrn": "urn:li:dataProduct:pet_of_the_week", + "changeType": "UPSERT", + "aspectName": "glossaryTerms", + "aspect": { + "json": { + "terms": [ + { + "urn": "urn:li:glossaryTerm:ClientsAndAccounts.AccountBalance" + } + ], + "auditStamp": { + "time": 1681455600000, + "actor": "urn:li:corpuser:datahub", + "message": "yaml" + } + } + } +}, +{ + "entityType": "dataProduct", + "entityUrn": "urn:li:dataProduct:pet_of_the_week", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [ + { + "owner": "urn:li:corpuser:jdoe", + "type": "BUSINESS_OWNER" + } + ], + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + } + } + } +}, +{ + "entityType": "dataProduct", + "entityUrn": "urn:li:dataProduct:pet_of_the_week", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +} +] + diff --git a/metadata-ingestion/tests/unit/api/entities/dataproducts/golden_dataproduct_output_ports_upsert.json b/metadata-ingestion/tests/unit/api/entities/dataproducts/golden_dataproduct_output_ports_upsert.json new file mode 100644 index 0000000000..76e3103011 --- /dev/null +++ b/metadata-ingestion/tests/unit/api/entities/dataproducts/golden_dataproduct_output_ports_upsert.json @@ -0,0 +1,149 @@ +[ +{ + "entityType": "dataProduct", + "entityUrn": "urn:li:dataProduct:pet_of_the_week", + "changeType": "PATCH", + "aspectName": "dataProductProperties", + "aspect": { + "json": [ + { + "op": "add", + "path": "/description", + "value": "This campaign includes Pet of the Week data." + }, + { + "op": "add", + "path": "/name", + "value": "Pet of the Week Campaign" + }, + { + "op": "add", + "path": "/assets", + "value": [ + { + "destinationUrn": "urn:li:container:DATABASE", + "created": { + "time": 1681455600000, + "actor": "urn:li:corpuser:datahub", + "message": "yaml" + }, + "outputPort": false + }, + { + "destinationUrn": "urn:li:container:SCHEMA", + "created": { + "time": 1681455600000, + "actor": "urn:li:corpuser:datahub", + "message": "yaml" + }, + "outputPort": true + }, + { + "destinationUrn": "urn:li:mlFeatureTable:(urn:li:dataPlatform:feast,test_feature_table_all_feature_dtypes)", + "created": { + "time": 1681455600000, + "actor": "urn:li:corpuser:datahub", + "message": "yaml" + }, + "outputPort": true + } + ] + }, + { + "op": "add", + "path": "/customProperties", + "value": { + "version": "2.0", + "classification": "pii" + } + }, + { + "op": "add", + "path": "/externalUrl", + "value": "https://github.com/datahub-project/datahub" + } + ] + } +}, +{ + "entityType": "dataProduct", + "entityUrn": "urn:li:dataProduct:pet_of_the_week", + "changeType": "UPSERT", + "aspectName": "domains", + "aspect": { + "json": { + "domains": [ + "urn:li:domain:12345" + ] + } + } +}, +{ + "entityType": "dataProduct", + "entityUrn": "urn:li:dataProduct:pet_of_the_week", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [ + { + "tag": "urn:li:tag:awesome" + } + ] + } + } +}, +{ + "entityType": "dataProduct", + "entityUrn": "urn:li:dataProduct:pet_of_the_week", + "changeType": "UPSERT", + "aspectName": "glossaryTerms", + "aspect": { + "json": { + "terms": [ + { + "urn": "urn:li:glossaryTerm:ClientsAndAccounts.AccountBalance" + } + ], + "auditStamp": { + "time": 1681455600000, + "actor": "urn:li:corpuser:datahub", + "message": "yaml" + } + } + } +}, +{ + "entityType": "dataProduct", + "entityUrn": "urn:li:dataProduct:pet_of_the_week", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [ + { + "owner": "urn:li:corpuser:jdoe", + "type": "BUSINESS_OWNER" + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + } + } + } +}, +{ + "entityType": "dataProduct", + "entityUrn": "urn:li:dataProduct:pet_of_the_week", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +} +] + diff --git a/metadata-ingestion/tests/unit/api/entities/dataproducts/test_dataproduct.py b/metadata-ingestion/tests/unit/api/entities/dataproducts/test_dataproduct.py index 9d3a50eca3..c9d50e6880 100644 --- a/metadata-ingestion/tests/unit/api/entities/dataproducts/test_dataproduct.py +++ b/metadata-ingestion/tests/unit/api/entities/dataproducts/test_dataproduct.py @@ -59,8 +59,18 @@ def check_yaml_golden_file(input_file: str, golden_file: str) -> bool: [ ("dataproduct.yaml", False, "golden_dataproduct_out.json"), ("dataproduct_upsert.yaml", True, "golden_dataproduct_out_upsert.json"), + ( + "dataproduct_output_ports.yaml", + False, + "golden_dataproduct_output_ports.json", + ), + ( + "dataproduct_output_ports_upsert.yaml", + True, + "golden_dataproduct_output_ports_upsert.json", + ), ], - ids=["update", "upsert"], + ids=["update", "upsert", "update_with_output_ports", "upsert_with_output_ports"], ) def test_dataproduct_from_yaml( pytestconfig: pytest.Config, @@ -203,3 +213,29 @@ def test_dataproduct_ownership_type_urn_patch_yaml( str(dataproduct_output_file), str(test_resources_dir / "dataproduct_ownership_type_urn.yaml"), ) + + +def test_dataproduct_output_ports_validation_not_urn( + base_mock_graph: MockDataHubGraph, +) -> None: + """Test that output_ports must be valid URNs""" + with pytest.raises(ValueError, match="Output port .* is not an urn"): + DataProduct( + id="test_product", + domain="urn:li:domain:12345", + assets=["urn:li:container:DATABASE", "not-a-urn"], + output_ports=["not-a-urn"], + ) + + +def test_dataproduct_output_ports_validation_not_in_assets( + base_mock_graph: MockDataHubGraph, +) -> None: + """Test that output_ports must be in the assets list""" + with pytest.raises(ValueError, match="Output port .* is not in asset list"): + DataProduct( + id="test_product", + domain="urn:li:domain:12345", + assets=["urn:li:container:DATABASE"], + output_ports=["urn:li:container:SCHEMA"], + )