feat(datajob/flow): add environment filter using info aspects (#10814)

This commit is contained in:
Aseem Bansal 2024-07-22 16:03:48 +05:30 committed by GitHub
parent aa97cba3e8
commit 9f570a7521
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 172 additions and 4 deletions

View File

@ -22,6 +22,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
- Protobuf CLI will no longer create binary encoded protoc custom properties. Flag added `-protocProp` in case this
behavior is required.
- #10814 Data flow info and data job info aspect will produce an additional field that will require a corresponding upgrade of server. Otherwise server can reject the aspects.
- #10868 - OpenAPI V3 - Creation of aspects will need to be wrapped within a `value` key and the API is now symmetric with respect to input and outputs.
Example Global Tags Aspect:

View File

@ -3,6 +3,7 @@ from dataclasses import dataclass, field
from typing import Callable, Dict, Iterable, List, Optional, Set, cast
import datahub.emitter.mce_builder as builder
from datahub.configuration.source_common import ALL_ENV_TYPES
from datahub.emitter.generic_emitter import Emitter
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import (
@ -110,7 +111,20 @@ class DataFlow:
)
return [tags]
def _get_env(self) -> Optional[str]:
env: Optional[str] = None
if self.cluster in ALL_ENV_TYPES:
env = self.cluster
elif self.env in ALL_ENV_TYPES:
env = self.env
else:
logger.warning(
f"cluster {self.cluster} and {self.env} is not a valid environment type so Environment filter won't work."
)
return env
def generate_mce(self) -> MetadataChangeEventClass:
env = self._get_env()
flow_mce = MetadataChangeEventClass(
proposedSnapshot=DataFlowSnapshotClass(
urn=str(self.urn),
@ -120,6 +134,7 @@ class DataFlow:
description=self.description,
customProperties=self.properties,
externalUrl=self.url,
env=env,
),
*self.generate_ownership_aspect(),
*self.generate_tags_aspect(),
@ -130,6 +145,7 @@ class DataFlow:
return flow_mce
def generate_mcp(self) -> Iterable[MetadataChangeProposalWrapper]:
env = self._get_env()
mcp = MetadataChangeProposalWrapper(
entityUrn=str(self.urn),
aspect=DataFlowInfoClass(
@ -137,6 +153,7 @@ class DataFlow:
description=self.description,
customProperties=self.properties,
externalUrl=self.url,
env=env,
),
)
yield mcp

View File

@ -1,7 +1,9 @@
import logging
from dataclasses import dataclass, field
from typing import Callable, Dict, Iterable, List, Optional, Set
import datahub.emitter.mce_builder as builder
from datahub.configuration.source_common import ALL_ENV_TYPES
from datahub.emitter.generic_emitter import Emitter
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import (
@ -22,6 +24,8 @@ from datahub.utilities.urns.data_flow_urn import DataFlowUrn
from datahub.utilities.urns.data_job_urn import DataJobUrn
from datahub.utilities.urns.dataset_urn import DatasetUrn
logger = logging.getLogger(__name__)
@dataclass
class DataJob:
@ -103,6 +107,13 @@ class DataJob:
def generate_mcp(
self, materialize_iolets: bool = True
) -> Iterable[MetadataChangeProposalWrapper]:
env: Optional[str] = None
if self.flow_urn.cluster in ALL_ENV_TYPES:
env = self.flow_urn.cluster
else:
logger.warning(
f"cluster {self.flow_urn.cluster} is not a valid environment type so Environment filter won't work."
)
mcp = MetadataChangeProposalWrapper(
entityUrn=str(self.urn),
aspect=DataJobInfoClass(
@ -111,6 +122,7 @@ class DataJob:
description=self.description,
customProperties=self.properties,
externalUrl=self.url,
env=env,
),
)
yield mcp

View File

@ -7,7 +7,8 @@
"aspect": {
"json": {
"customProperties": {},
"name": "postgres"
"name": "postgres",
"env": "PROD"
}
},
"systemMetadata": {
@ -68,7 +69,8 @@
"name": "postgres",
"type": {
"string": "COMMAND"
}
},
"env": "PROD"
}
},
"systemMetadata": {

View File

@ -7,7 +7,8 @@
"aspect": {
"json": {
"customProperties": {},
"name": "postgres"
"name": "postgres",
"env": "PROD"
}
},
"systemMetadata": {
@ -68,7 +69,8 @@
"name": "postgres",
"type": {
"string": "COMMAND"
}
},
"env": "PROD"
}
},
"systemMetadata": {

View File

@ -4,6 +4,7 @@ import com.linkedin.common.CustomProperties
import com.linkedin.common.ExternalReference
import com.linkedin.common.Urn
import com.linkedin.common.TimeStamp
import com.linkedin.common.FabricType
/**
* Information about a Data processing flow
@ -63,4 +64,15 @@ record DataFlowInfo includes CustomProperties, ExternalReference {
}
}
lastModified: optional TimeStamp
/**
* Environment for this flow
*/
@Searchable = {
"fieldType": "KEYWORD",
"addToFilters": true,
"filterNameOverride": "Environment",
"queryByDefault": false
}
env: optional FabricType
}

View File

@ -5,6 +5,7 @@ import com.linkedin.common.CustomProperties
import com.linkedin.common.ExternalReference
import com.linkedin.common.DataFlowUrn
import com.linkedin.common.TimeStamp
import com.linkedin.common.FabricType
/**
* Information about a Data processing job
@ -72,4 +73,15 @@ record DataJobInfo includes CustomProperties, ExternalReference {
*/
@deprecated = "Use Data Process Instance model, instead"
status: optional JobStatus
/**
* Environment for this job
*/
@Searchable = {
"fieldType": "KEYWORD",
"addToFilters": true,
"filterNameOverride": "Environment",
"queryByDefault": false
}
env: optional FabricType
}

View File

@ -1491,6 +1491,17 @@
"fieldType" : "DATETIME"
}
}
}, {
"name" : "env",
"type" : "com.linkedin.common.FabricType",
"doc" : "Environment for this flow",
"optional" : true,
"Searchable" : {
"addToFilters" : true,
"fieldType" : "KEYWORD",
"filterNameOverride" : "Environment",
"queryByDefault" : false
}
} ],
"Aspect" : {
"name" : "dataFlowInfo"
@ -1587,6 +1598,17 @@
"doc" : "Status of the job - Deprecated for Data Process Instance model.",
"optional" : true,
"deprecated" : "Use Data Process Instance model, instead"
}, {
"name" : "env",
"type" : "com.linkedin.common.FabricType",
"doc" : "Environment for this job",
"optional" : true,
"Searchable" : {
"addToFilters" : true,
"fieldType" : "KEYWORD",
"filterNameOverride" : "Environment",
"queryByDefault" : false
}
} ],
"Aspect" : {
"name" : "dataJobInfo"

View File

@ -1541,6 +1541,17 @@
"fieldType" : "DATETIME"
}
}
}, {
"name" : "env",
"type" : "com.linkedin.common.FabricType",
"doc" : "Environment for this flow",
"optional" : true,
"Searchable" : {
"addToFilters" : true,
"fieldType" : "KEYWORD",
"filterNameOverride" : "Environment",
"queryByDefault" : false
}
} ],
"Aspect" : {
"name" : "dataFlowInfo"
@ -1637,6 +1648,17 @@
"doc" : "Status of the job - Deprecated for Data Process Instance model.",
"optional" : true,
"deprecated" : "Use Data Process Instance model, instead"
}, {
"name" : "env",
"type" : "com.linkedin.common.FabricType",
"doc" : "Environment for this job",
"optional" : true,
"Searchable" : {
"addToFilters" : true,
"fieldType" : "KEYWORD",
"filterNameOverride" : "Environment",
"queryByDefault" : false
}
} ],
"Aspect" : {
"name" : "dataJobInfo"

View File

@ -1228,6 +1228,17 @@
"fieldType" : "DATETIME"
}
}
}, {
"name" : "env",
"type" : "com.linkedin.common.FabricType",
"doc" : "Environment for this flow",
"optional" : true,
"Searchable" : {
"addToFilters" : true,
"fieldType" : "KEYWORD",
"filterNameOverride" : "Environment",
"queryByDefault" : false
}
} ],
"Aspect" : {
"name" : "dataFlowInfo"
@ -1324,6 +1335,17 @@
"doc" : "Status of the job - Deprecated for Data Process Instance model.",
"optional" : true,
"deprecated" : "Use Data Process Instance model, instead"
}, {
"name" : "env",
"type" : "com.linkedin.common.FabricType",
"doc" : "Environment for this job",
"optional" : true,
"Searchable" : {
"addToFilters" : true,
"fieldType" : "KEYWORD",
"filterNameOverride" : "Environment",
"queryByDefault" : false
}
} ],
"Aspect" : {
"name" : "dataJobInfo"

View File

@ -1228,6 +1228,17 @@
"fieldType" : "DATETIME"
}
}
}, {
"name" : "env",
"type" : "com.linkedin.common.FabricType",
"doc" : "Environment for this flow",
"optional" : true,
"Searchable" : {
"addToFilters" : true,
"fieldType" : "KEYWORD",
"filterNameOverride" : "Environment",
"queryByDefault" : false
}
} ],
"Aspect" : {
"name" : "dataFlowInfo"
@ -1324,6 +1335,17 @@
"doc" : "Status of the job - Deprecated for Data Process Instance model.",
"optional" : true,
"deprecated" : "Use Data Process Instance model, instead"
}, {
"name" : "env",
"type" : "com.linkedin.common.FabricType",
"doc" : "Environment for this job",
"optional" : true,
"Searchable" : {
"addToFilters" : true,
"fieldType" : "KEYWORD",
"filterNameOverride" : "Environment",
"queryByDefault" : false
}
} ],
"Aspect" : {
"name" : "dataJobInfo"

View File

@ -1541,6 +1541,17 @@
"fieldType" : "DATETIME"
}
}
}, {
"name" : "env",
"type" : "com.linkedin.common.FabricType",
"doc" : "Environment for this flow",
"optional" : true,
"Searchable" : {
"addToFilters" : true,
"fieldType" : "KEYWORD",
"filterNameOverride" : "Environment",
"queryByDefault" : false
}
} ],
"Aspect" : {
"name" : "dataFlowInfo"
@ -1637,6 +1648,17 @@
"doc" : "Status of the job - Deprecated for Data Process Instance model.",
"optional" : true,
"deprecated" : "Use Data Process Instance model, instead"
}, {
"name" : "env",
"type" : "com.linkedin.common.FabricType",
"doc" : "Environment for this job",
"optional" : true,
"Searchable" : {
"addToFilters" : true,
"fieldType" : "KEYWORD",
"filterNameOverride" : "Environment",
"queryByDefault" : false
}
} ],
"Aspect" : {
"name" : "dataJobInfo"