From 9f570a7521a0f18a6fa1a27faa3788f04c80dbe9 Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Mon, 22 Jul 2024 16:03:48 +0530 Subject: [PATCH] feat(datajob/flow): add environment filter using info aspects (#10814) --- docs/how/updating-datahub.md | 1 + .../datahub/api/entities/datajob/dataflow.py | 17 ++++++++++++++ .../datahub/api/entities/datajob/datajob.py | 12 ++++++++++ ...nowflake_empty_connection_user_golden.json | 6 +++-- .../fivetran/fivetran_snowflake_golden.json | 6 +++-- .../com/linkedin/datajob/DataFlowInfo.pdl | 12 ++++++++++ .../com/linkedin/datajob/DataJobInfo.pdl | 12 ++++++++++ .../com.linkedin.entity.aspects.snapshot.json | 22 +++++++++++++++++++ ...com.linkedin.entity.entities.snapshot.json | 22 +++++++++++++++++++ .../com.linkedin.entity.runs.snapshot.json | 22 +++++++++++++++++++ ...nkedin.operations.operations.snapshot.json | 22 +++++++++++++++++++ ...m.linkedin.platform.platform.snapshot.json | 22 +++++++++++++++++++ 12 files changed, 172 insertions(+), 4 deletions(-) diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index ef9990ca38..ffceb7a5d1 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -22,6 +22,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe - Protobuf CLI will no longer create binary encoded protoc custom properties. Flag added `-protocProp` in case this behavior is required. +- #10814 Data flow info and data job info aspect will produce an additional field that will require a corresponding upgrade of server. Otherwise server can reject the aspects. - #10868 - OpenAPI V3 - Creation of aspects will need to be wrapped within a `value` key and the API is now symmetric with respect to input and outputs. Example Global Tags Aspect: diff --git a/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py b/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py index cb2c536bba..3870e6978e 100644 --- a/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py +++ b/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py @@ -3,6 +3,7 @@ from dataclasses import dataclass, field from typing import Callable, Dict, Iterable, List, Optional, Set, cast import datahub.emitter.mce_builder as builder +from datahub.configuration.source_common import ALL_ENV_TYPES from datahub.emitter.generic_emitter import Emitter from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.metadata.schema_classes import ( @@ -110,7 +111,20 @@ class DataFlow: ) return [tags] + def _get_env(self) -> Optional[str]: + env: Optional[str] = None + if self.cluster in ALL_ENV_TYPES: + env = self.cluster + elif self.env in ALL_ENV_TYPES: + env = self.env + else: + logger.warning( + f"cluster {self.cluster} and {self.env} is not a valid environment type so Environment filter won't work." + ) + return env + def generate_mce(self) -> MetadataChangeEventClass: + env = self._get_env() flow_mce = MetadataChangeEventClass( proposedSnapshot=DataFlowSnapshotClass( urn=str(self.urn), @@ -120,6 +134,7 @@ class DataFlow: description=self.description, customProperties=self.properties, externalUrl=self.url, + env=env, ), *self.generate_ownership_aspect(), *self.generate_tags_aspect(), @@ -130,6 +145,7 @@ class DataFlow: return flow_mce def generate_mcp(self) -> Iterable[MetadataChangeProposalWrapper]: + env = self._get_env() mcp = MetadataChangeProposalWrapper( entityUrn=str(self.urn), aspect=DataFlowInfoClass( @@ -137,6 +153,7 @@ class DataFlow: description=self.description, customProperties=self.properties, externalUrl=self.url, + env=env, ), ) yield mcp diff --git a/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py b/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py index e56e9f059d..514f0a5093 100644 --- a/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py +++ b/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py @@ -1,7 +1,9 @@ +import logging from dataclasses import dataclass, field from typing import Callable, Dict, Iterable, List, Optional, Set import datahub.emitter.mce_builder as builder +from datahub.configuration.source_common import ALL_ENV_TYPES from datahub.emitter.generic_emitter import Emitter from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.metadata.schema_classes import ( @@ -22,6 +24,8 @@ from datahub.utilities.urns.data_flow_urn import DataFlowUrn from datahub.utilities.urns.data_job_urn import DataJobUrn from datahub.utilities.urns.dataset_urn import DatasetUrn +logger = logging.getLogger(__name__) + @dataclass class DataJob: @@ -103,6 +107,13 @@ class DataJob: def generate_mcp( self, materialize_iolets: bool = True ) -> Iterable[MetadataChangeProposalWrapper]: + env: Optional[str] = None + if self.flow_urn.cluster in ALL_ENV_TYPES: + env = self.flow_urn.cluster + else: + logger.warning( + f"cluster {self.flow_urn.cluster} is not a valid environment type so Environment filter won't work." + ) mcp = MetadataChangeProposalWrapper( entityUrn=str(self.urn), aspect=DataJobInfoClass( @@ -111,6 +122,7 @@ class DataJob: description=self.description, customProperties=self.properties, externalUrl=self.url, + env=env, ), ) yield mcp diff --git a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json index d2ae437605..29b186978a 100644 --- a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json +++ b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json @@ -7,7 +7,8 @@ "aspect": { "json": { "customProperties": {}, - "name": "postgres" + "name": "postgres", + "env": "PROD" } }, "systemMetadata": { @@ -68,7 +69,8 @@ "name": "postgres", "type": { "string": "COMMAND" - } + }, + "env": "PROD" } }, "systemMetadata": { diff --git a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json index 59e545183a..0cd3bb83f9 100644 --- a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json +++ b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json @@ -7,7 +7,8 @@ "aspect": { "json": { "customProperties": {}, - "name": "postgres" + "name": "postgres", + "env": "PROD" } }, "systemMetadata": { @@ -68,7 +69,8 @@ "name": "postgres", "type": { "string": "COMMAND" - } + }, + "env": "PROD" } }, "systemMetadata": { diff --git a/metadata-models/src/main/pegasus/com/linkedin/datajob/DataFlowInfo.pdl b/metadata-models/src/main/pegasus/com/linkedin/datajob/DataFlowInfo.pdl index 2ff3e8cd93..766181df01 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/datajob/DataFlowInfo.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/datajob/DataFlowInfo.pdl @@ -4,6 +4,7 @@ import com.linkedin.common.CustomProperties import com.linkedin.common.ExternalReference import com.linkedin.common.Urn import com.linkedin.common.TimeStamp +import com.linkedin.common.FabricType /** * Information about a Data processing flow @@ -63,4 +64,15 @@ record DataFlowInfo includes CustomProperties, ExternalReference { } } lastModified: optional TimeStamp + + /** + * Environment for this flow + */ + @Searchable = { + "fieldType": "KEYWORD", + "addToFilters": true, + "filterNameOverride": "Environment", + "queryByDefault": false + } + env: optional FabricType } diff --git a/metadata-models/src/main/pegasus/com/linkedin/datajob/DataJobInfo.pdl b/metadata-models/src/main/pegasus/com/linkedin/datajob/DataJobInfo.pdl index 250fb76003..46879e359e 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/datajob/DataJobInfo.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/datajob/DataJobInfo.pdl @@ -5,6 +5,7 @@ import com.linkedin.common.CustomProperties import com.linkedin.common.ExternalReference import com.linkedin.common.DataFlowUrn import com.linkedin.common.TimeStamp +import com.linkedin.common.FabricType /** * Information about a Data processing job @@ -72,4 +73,15 @@ record DataJobInfo includes CustomProperties, ExternalReference { */ @deprecated = "Use Data Process Instance model, instead" status: optional JobStatus + + /** + * Environment for this job + */ + @Searchable = { + "fieldType": "KEYWORD", + "addToFilters": true, + "filterNameOverride": "Environment", + "queryByDefault": false + } + env: optional FabricType } diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.aspects.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.aspects.snapshot.json index 72578be8c5..eb92cf75a4 100644 --- a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.aspects.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.aspects.snapshot.json @@ -1491,6 +1491,17 @@ "fieldType" : "DATETIME" } } + }, { + "name" : "env", + "type" : "com.linkedin.common.FabricType", + "doc" : "Environment for this flow", + "optional" : true, + "Searchable" : { + "addToFilters" : true, + "fieldType" : "KEYWORD", + "filterNameOverride" : "Environment", + "queryByDefault" : false + } } ], "Aspect" : { "name" : "dataFlowInfo" @@ -1587,6 +1598,17 @@ "doc" : "Status of the job - Deprecated for Data Process Instance model.", "optional" : true, "deprecated" : "Use Data Process Instance model, instead" + }, { + "name" : "env", + "type" : "com.linkedin.common.FabricType", + "doc" : "Environment for this job", + "optional" : true, + "Searchable" : { + "addToFilters" : true, + "fieldType" : "KEYWORD", + "filterNameOverride" : "Environment", + "queryByDefault" : false + } } ], "Aspect" : { "name" : "dataJobInfo" diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json index 9b93f1184c..0c983a021d 100644 --- a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json @@ -1541,6 +1541,17 @@ "fieldType" : "DATETIME" } } + }, { + "name" : "env", + "type" : "com.linkedin.common.FabricType", + "doc" : "Environment for this flow", + "optional" : true, + "Searchable" : { + "addToFilters" : true, + "fieldType" : "KEYWORD", + "filterNameOverride" : "Environment", + "queryByDefault" : false + } } ], "Aspect" : { "name" : "dataFlowInfo" @@ -1637,6 +1648,17 @@ "doc" : "Status of the job - Deprecated for Data Process Instance model.", "optional" : true, "deprecated" : "Use Data Process Instance model, instead" + }, { + "name" : "env", + "type" : "com.linkedin.common.FabricType", + "doc" : "Environment for this job", + "optional" : true, + "Searchable" : { + "addToFilters" : true, + "fieldType" : "KEYWORD", + "filterNameOverride" : "Environment", + "queryByDefault" : false + } } ], "Aspect" : { "name" : "dataJobInfo" diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.runs.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.runs.snapshot.json index 18ef55011e..4af65cdb48 100644 --- a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.runs.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.runs.snapshot.json @@ -1228,6 +1228,17 @@ "fieldType" : "DATETIME" } } + }, { + "name" : "env", + "type" : "com.linkedin.common.FabricType", + "doc" : "Environment for this flow", + "optional" : true, + "Searchable" : { + "addToFilters" : true, + "fieldType" : "KEYWORD", + "filterNameOverride" : "Environment", + "queryByDefault" : false + } } ], "Aspect" : { "name" : "dataFlowInfo" @@ -1324,6 +1335,17 @@ "doc" : "Status of the job - Deprecated for Data Process Instance model.", "optional" : true, "deprecated" : "Use Data Process Instance model, instead" + }, { + "name" : "env", + "type" : "com.linkedin.common.FabricType", + "doc" : "Environment for this job", + "optional" : true, + "Searchable" : { + "addToFilters" : true, + "fieldType" : "KEYWORD", + "filterNameOverride" : "Environment", + "queryByDefault" : false + } } ], "Aspect" : { "name" : "dataJobInfo" diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.operations.operations.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.operations.operations.snapshot.json index cf05978820..e788c5d28c 100644 --- a/metadata-service/restli-api/src/main/snapshot/com.linkedin.operations.operations.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.operations.operations.snapshot.json @@ -1228,6 +1228,17 @@ "fieldType" : "DATETIME" } } + }, { + "name" : "env", + "type" : "com.linkedin.common.FabricType", + "doc" : "Environment for this flow", + "optional" : true, + "Searchable" : { + "addToFilters" : true, + "fieldType" : "KEYWORD", + "filterNameOverride" : "Environment", + "queryByDefault" : false + } } ], "Aspect" : { "name" : "dataFlowInfo" @@ -1324,6 +1335,17 @@ "doc" : "Status of the job - Deprecated for Data Process Instance model.", "optional" : true, "deprecated" : "Use Data Process Instance model, instead" + }, { + "name" : "env", + "type" : "com.linkedin.common.FabricType", + "doc" : "Environment for this job", + "optional" : true, + "Searchable" : { + "addToFilters" : true, + "fieldType" : "KEYWORD", + "filterNameOverride" : "Environment", + "queryByDefault" : false + } } ], "Aspect" : { "name" : "dataJobInfo" diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.platform.platform.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.platform.platform.snapshot.json index 15f16dd2ea..dbdba0040d 100644 --- a/metadata-service/restli-api/src/main/snapshot/com.linkedin.platform.platform.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.platform.platform.snapshot.json @@ -1541,6 +1541,17 @@ "fieldType" : "DATETIME" } } + }, { + "name" : "env", + "type" : "com.linkedin.common.FabricType", + "doc" : "Environment for this flow", + "optional" : true, + "Searchable" : { + "addToFilters" : true, + "fieldType" : "KEYWORD", + "filterNameOverride" : "Environment", + "queryByDefault" : false + } } ], "Aspect" : { "name" : "dataFlowInfo" @@ -1637,6 +1648,17 @@ "doc" : "Status of the job - Deprecated for Data Process Instance model.", "optional" : true, "deprecated" : "Use Data Process Instance model, instead" + }, { + "name" : "env", + "type" : "com.linkedin.common.FabricType", + "doc" : "Environment for this job", + "optional" : true, + "Searchable" : { + "addToFilters" : true, + "fieldType" : "KEYWORD", + "filterNameOverride" : "Environment", + "queryByDefault" : false + } } ], "Aspect" : { "name" : "dataJobInfo"