Adds support for Domains in CSV source (#5372)

This commit is contained in:
Pedro Silva 2022-07-15 09:50:41 +01:00 committed by GitHub
parent ef02bf4b04
commit b2edd44b6a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 114 additions and 9 deletions

View File

@ -1,4 +1,4 @@
resource,subresource,glossary_terms,tags,owners,ownership_type,description resource,subresource,glossary_terms,tags,owners,ownership_type,description,domain
"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description,urn:li:domain:Engineering
"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_foo,[urn:li:glossaryTerm:AccountBalance],,,,field_foo! "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_foo,[urn:li:glossaryTerm:AccountBalance],,,,field_foo!
"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_bar,,[urn:li:tag:Legacy],,,field_bar? "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_bar,,[urn:li:tag:Legacy],,,field_bar?
Can't render this file because it has a wrong number of fields in line 2.

View File

@ -15,6 +15,7 @@ from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.emitter.serialization_helper import post_json_transform from datahub.emitter.serialization_helper import post_json_transform
from datahub.metadata.schema_classes import ( from datahub.metadata.schema_classes import (
DatasetUsageStatisticsClass, DatasetUsageStatisticsClass,
DomainsClass,
GlobalTagsClass, GlobalTagsClass,
GlossaryTermsClass, GlossaryTermsClass,
OwnershipClass, OwnershipClass,
@ -196,6 +197,13 @@ class DataHubGraph(DatahubRestEmitter):
aspect_type=GlossaryTermsClass, aspect_type=GlossaryTermsClass,
) )
def get_domain(self, entity_urn: str) -> Optional[DomainsClass]:
    """Look up the ``domains`` aspect attached to an entity.

    :param entity_urn: URN of the entity whose domains aspect is requested.
    :return: Whatever ``get_aspect_v2`` yields for the ``domains`` aspect,
        typed as ``DomainsClass``; the signature allows ``None``.
    """
    domains_aspect: Optional[DomainsClass] = self.get_aspect_v2(
        entity_urn=entity_urn,
        aspect="domains",
        aspect_type=DomainsClass,
    )
    return domains_aspect
def get_usage_aspects_from_urn( def get_usage_aspects_from_urn(
self, entity_urn: str, start_timestamp: int, end_timestamp: int self, entity_urn: str, start_timestamp: int, end_timestamp: int
) -> Optional[List[DatasetUsageStatisticsClass]]: ) -> Optional[List[DatasetUsageStatisticsClass]]:

View File

@ -18,6 +18,7 @@ from datahub.ingestion.source_config.csv_enricher import CSVEnricherConfig
from datahub.metadata.schema_classes import ( from datahub.metadata.schema_classes import (
AuditStampClass, AuditStampClass,
ChangeTypeClass, ChangeTypeClass,
DomainsClass,
EditableDatasetPropertiesClass, EditableDatasetPropertiesClass,
EditableSchemaFieldInfoClass, EditableSchemaFieldInfoClass,
EditableSchemaMetadataClass, EditableSchemaMetadataClass,
@ -39,6 +40,7 @@ TAGS_ASPECT_NAME = "globalTags"
OWNERSHIP_ASPECT_NAME = "ownership" OWNERSHIP_ASPECT_NAME = "ownership"
EDITABLE_DATASET_PROPERTIES_ASPECT_NAME = "editableDatasetProperties" EDITABLE_DATASET_PROPERTIES_ASPECT_NAME = "editableDatasetProperties"
ACTOR = "urn:li:corpuser:ingestion" ACTOR = "urn:li:corpuser:ingestion"
DOMAIN_ASPECT_NAME = "domains"
def get_audit_stamp() -> AuditStampClass: def get_audit_stamp() -> AuditStampClass:
@ -69,6 +71,7 @@ class SubResourceRow:
term_associations: List[GlossaryTermAssociationClass] term_associations: List[GlossaryTermAssociationClass]
tag_associations: List[TagAssociationClass] tag_associations: List[TagAssociationClass]
description: Optional[str] description: Optional[str]
domain: Optional[str]
@dataclass @dataclass
@ -78,6 +81,7 @@ class CSVEnricherReport(SourceReport):
num_owners_workunits_produced: int = 0 num_owners_workunits_produced: int = 0
num_description_workunits_produced: int = 0 num_description_workunits_produced: int = 0
num_editable_schema_metadata_workunits_produced: int = 0 num_editable_schema_metadata_workunits_produced: int = 0
num_domain_workunits_produced: int = 0
@platform_name("CSV") @platform_name("CSV")
@ -85,17 +89,17 @@ class CSVEnricherReport(SourceReport):
@support_status(SupportStatus.INCUBATING) @support_status(SupportStatus.INCUBATING)
class CSVEnricherSource(Source): class CSVEnricherSource(Source):
""" """
This plugin is used to apply glossary terms, tags and owners at the entity level. It can also be used to apply tags This plugin is used to apply glossary terms, tags, owners and domain at the entity level. It can also be used to apply tags
and glossary terms at the column level. These values are read from a CSV file and can be used to either overwrite and glossary terms at the column level. These values are read from a CSV file and can be used to either overwrite
or append the above aspects to entities. or append the above aspects to entities.
The format of the CSV must be like so, with a few example rows. The format of the CSV must be like so, with a few example rows.
|resource |subresource|glossary_terms |tags |owners |ownership_type |description | |resource |subresource|glossary_terms |tags |owners |ownership_type |description |domain |
|----------------------------------------------------------------|-----------|------------------------------------|-------------------|---------------------------------------------------|---------------|---------------| |----------------------------------------------------------------|-----------|------------------------------------|-------------------|---------------------------------------------------|---------------|---------------|---------------------------|
|urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)| |[urn:li:glossaryTerm:AccountBalance]|[urn:li:tag:Legacy]|[urn:li:corpuser:datahub|urn:li:corpuser:jdoe]|TECHNICAL_OWNER|new description| |urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)| |[urn:li:glossaryTerm:AccountBalance]|[urn:li:tag:Legacy]|[urn:li:corpuser:datahub|urn:li:corpuser:jdoe]|TECHNICAL_OWNER|new description|urn:li:domain:Engineering |
|urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)|field_foo |[urn:li:glossaryTerm:AccountBalance]| | | |field_foo! | |urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)|field_foo |[urn:li:glossaryTerm:AccountBalance]| | | |field_foo! | |
|urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)|field_bar | |[urn:li:tag:Legacy]| | |field_bar? | |urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)|field_bar | |[urn:li:tag:Legacy]| | |field_bar? | |
Note that the first row does not have a subresource populated. That means any glossary terms, tags, and owners will Note that the first row does not have a subresource populated. That means any glossary terms, tags, and owners will
be applied at the entity field. If a subresource IS populated (as it is for the second and third rows), glossary be applied at the entity field. If a subresource IS populated (as it is for the second and third rows), glossary
@ -254,6 +258,38 @@ class CSVEnricherSource(Source):
) )
return owners_wu return owners_wu
def get_resource_domain_work_unit(
    self,
    entity_urn: str,
    entity_type: str,
    domain: Optional[str],
) -> Optional[MetadataWorkUnit]:
    """Build the work unit that upserts the ``domains`` aspect of an entity.

    :param entity_urn: URN of the entity the aspect is written to.
    :param entity_type: Entity type placed on the change proposal.
    :param domain: Domain value read from the CSV row; falsy means "no domain".
    :return: ``None`` when ``domain`` is falsy, otherwise a work unit wrapping
        an UPSERT proposal for the domains aspect.
    """
    # Nothing to emit for rows that carry no domain.
    if not domain:
        return None

    # Only consult the graph when one is configured and we are not overwriting.
    existing: Optional[DomainsClass] = (
        self.ctx.graph.get_domain(entity_urn=entity_urn)
        if self.ctx.graph and not self.should_overwrite
        else None
    )

    # NOTE(review): when an existing aspect is found it is emitted unchanged;
    # the CSV ``domain`` is not merged into it. Presumably deliberate
    # ("keep the current domain unless overwriting") - confirm.
    aspect: DomainsClass = existing if existing else DomainsClass([domain])

    return MetadataWorkUnit(
        id=f"{entity_urn}-{DOMAIN_ASPECT_NAME}",
        mcp=MetadataChangeProposalWrapper(
            entityType=entity_type,
            entityUrn=entity_urn,
            changeType=ChangeTypeClass.UPSERT,
            aspectName=DOMAIN_ASPECT_NAME,
            aspect=aspect,
        ),
    )
def get_resource_description_work_unit( def get_resource_description_work_unit(
self, self,
entity_urn: str, entity_urn: str,
@ -308,6 +344,7 @@ class CSVEnricherSource(Source):
term_associations: List[GlossaryTermAssociationClass], term_associations: List[GlossaryTermAssociationClass],
tag_associations: List[TagAssociationClass], tag_associations: List[TagAssociationClass],
owners: List[OwnerClass], owners: List[OwnerClass],
domain: Optional[str],
description: Optional[str], description: Optional[str],
) -> Iterable[MetadataWorkUnit]: ) -> Iterable[MetadataWorkUnit]:
maybe_terms_wu: Optional[ maybe_terms_wu: Optional[
@ -344,6 +381,18 @@ class CSVEnricherSource(Source):
self.report.report_workunit(maybe_owners_wu) self.report.report_workunit(maybe_owners_wu)
yield maybe_owners_wu yield maybe_owners_wu
maybe_domain_wu: Optional[
MetadataWorkUnit
] = self.get_resource_domain_work_unit(
entity_urn=entity_urn,
entity_type=entity_type,
domain=domain,
)
if maybe_domain_wu:
self.report.num_domain_workunits_produced += 1
self.report.report_workunit(maybe_domain_wu)
yield maybe_domain_wu
maybe_description_wu: Optional[ maybe_description_wu: Optional[
MetadataWorkUnit MetadataWorkUnit
] = self.get_resource_description_work_unit( ] = self.get_resource_description_work_unit(
@ -512,6 +561,7 @@ class CSVEnricherSource(Source):
# Sanitizing the terms string to just get the list of term urns # Sanitizing the terms string to just get the list of term urns
terms_array_string = sanitize_array_string(row["glossary_terms"]) terms_array_string = sanitize_array_string(row["glossary_terms"])
term_urns: List[str] = terms_array_string.split(self.config.array_delimiter) term_urns: List[str] = terms_array_string.split(self.config.array_delimiter)
term_associations: List[GlossaryTermAssociationClass] = [ term_associations: List[GlossaryTermAssociationClass] = [
GlossaryTermAssociationClass(term) for term in term_urns GlossaryTermAssociationClass(term) for term in term_urns
] ]
@ -524,6 +574,7 @@ class CSVEnricherSource(Source):
# Sanitizing the tags string to just get the list of tag urns # Sanitizing the tags string to just get the list of tag urns
tags_array_string = sanitize_array_string(row["tags"]) tags_array_string = sanitize_array_string(row["tags"])
tag_urns: List[str] = tags_array_string.split(self.config.array_delimiter) tag_urns: List[str] = tags_array_string.split(self.config.array_delimiter)
tag_associations: List[TagAssociationClass] = [ tag_associations: List[TagAssociationClass] = [
TagAssociationClass(tag) for tag in tag_urns TagAssociationClass(tag) for tag in tag_urns
] ]
@ -546,6 +597,7 @@ class CSVEnricherSource(Source):
# Sanitizing the owners string to just get the list of owner urns # Sanitizing the owners string to just get the list of owner urns
owners_array_string: str = sanitize_array_string(row["owners"]) owners_array_string: str = sanitize_array_string(row["owners"])
owner_urns: List[str] = owners_array_string.split(self.config.array_delimiter) owner_urns: List[str] = owners_array_string.split(self.config.array_delimiter)
owners: List[OwnerClass] = [ owners: List[OwnerClass] = [
OwnerClass(owner_urn, type=ownership_type) for owner_urn in owner_urns OwnerClass(owner_urn, type=ownership_type) for owner_urn in owner_urns
] ]
@ -576,6 +628,12 @@ class CSVEnricherSource(Source):
row, is_resource_row row, is_resource_row
) )
domain: Optional[str] = (
row["domain"]
if row["domain"] and entity_type == DATASET_ENTITY_TYPE
else None
)
description: Optional[str] = ( description: Optional[str] = (
row["description"] row["description"]
if row["description"] and entity_type == DATASET_ENTITY_TYPE if row["description"] and entity_type == DATASET_ENTITY_TYPE
@ -589,6 +647,7 @@ class CSVEnricherSource(Source):
term_associations=term_associations, term_associations=term_associations,
tag_associations=tag_associations, tag_associations=tag_associations,
owners=owners, owners=owners,
domain=domain,
description=description, description=description,
): ):
yield wu yield wu
@ -611,6 +670,7 @@ class CSVEnricherSource(Source):
term_associations=term_associations, term_associations=term_associations,
tag_associations=tag_associations, tag_associations=tag_associations,
description=description, description=description,
domain=domain,
) )
) )

View File

@ -56,6 +56,25 @@
"properties": null "properties": null
} }
}, },
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "domains",
"aspect": {
"value": "{\"domains\":[\"urn:li:domain:Engineering\"]}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "test-csv-enricher",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{ {
"auditHeader": null, "auditHeader": null,
"entityType": "dataset", "entityType": "dataset",

View File

@ -1,4 +1,4 @@
resource,subresource,glossary_terms,tags,owners,ownership_type,description resource,subresource,glossary_terms,tags,owners,ownership_type,description,domain
"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description,urn:li:domain:Engineering
"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_foo,[urn:li:glossaryTerm:AccountBalance],,,,field_foo! "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_foo,[urn:li:glossaryTerm:AccountBalance],,,,field_foo!
"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_bar,,[urn:li:tag:Legacy],,,field_bar? "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_bar,,[urn:li:tag:Legacy],,,field_bar?
Can't render this file because it has a wrong number of fields in line 3.

View File

@ -185,3 +185,21 @@ def test_get_resource_description_work_unit_produced():
DATASET_URN, DATASET_ENTITY_TYPE, new_description DATASET_URN, DATASET_ENTITY_TYPE, new_description
) )
assert maybe_description_wu assert maybe_description_wu
def test_get_resource_domain_no_domain():
    """A missing domain (``None``) must not yield a domain work unit."""
    enricher = create_mocked_csv_enricher_source()
    missing_domain = None
    assert not enricher.get_resource_domain_work_unit(
        DATASET_URN, DATASET_ENTITY_TYPE, missing_domain
    )
def test_get_resource_domain_work_unit_produced():
    """Supplying a non-empty domain string must yield a truthy work unit."""
    enricher = create_mocked_csv_enricher_source()
    result = enricher.get_resource_domain_work_unit(
        DATASET_URN, DATASET_ENTITY_TYPE, "domain"
    )
    assert result