diff --git a/metadata-ingestion/examples/demo_data/csv_enricher_demo_data.csv b/metadata-ingestion/examples/demo_data/csv_enricher_demo_data.csv
index 9023699f38..d92dcbf717 100644
--- a/metadata-ingestion/examples/demo_data/csv_enricher_demo_data.csv
+++ b/metadata-ingestion/examples/demo_data/csv_enricher_demo_data.csv
@@ -1,4 +1,4 @@
-resource,subresource,glossary_terms,tags,owners,ownership_type,description
-"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description
+resource,subresource,glossary_terms,tags,owners,ownership_type,description,domain
+"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description,urn:li:domain:Engineering
 "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_foo,[urn:li:glossaryTerm:AccountBalance],,,,field_foo!
 "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_bar,,[urn:li:tag:Legacy],,,field_bar?
\ No newline at end of file
diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py
index 9b069f148f..f79e792b54 100644
--- a/metadata-ingestion/src/datahub/ingestion/graph/client.py
+++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py
@@ -15,6 +15,7 @@ from datahub.emitter.rest_emitter import DatahubRestEmitter
 from datahub.emitter.serialization_helper import post_json_transform
 from datahub.metadata.schema_classes import (
     DatasetUsageStatisticsClass,
+    DomainsClass,
     GlobalTagsClass,
     GlossaryTermsClass,
     OwnershipClass,
@@ -196,6 +197,14 @@ class DataHubGraph(DatahubRestEmitter):
             aspect_type=GlossaryTermsClass,
         )
 
+    def get_domain(self, entity_urn: str) -> Optional[DomainsClass]:
+        """Fetch the `domains` aspect of the given entity, or None if absent."""
+        return self.get_aspect_v2(
+            entity_urn=entity_urn,
+            aspect="domains",
+            aspect_type=DomainsClass,
+        )
+
     def get_usage_aspects_from_urn(
         self, entity_urn: str, start_timestamp: int, end_timestamp: int
     ) -> Optional[List[DatasetUsageStatisticsClass]]:
diff --git a/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py b/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py
index 8a18279260..1ac8096873 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py
@@ -18,6 +18,7 @@ from datahub.ingestion.source_config.csv_enricher import CSVEnricherConfig
 from datahub.metadata.schema_classes import (
     AuditStampClass,
     ChangeTypeClass,
+    DomainsClass,
     EditableDatasetPropertiesClass,
     EditableSchemaFieldInfoClass,
     EditableSchemaMetadataClass,
@@ -39,6 +40,7 @@ TAGS_ASPECT_NAME = "globalTags"
 OWNERSHIP_ASPECT_NAME = "ownership"
 EDITABLE_DATASET_PROPERTIES_ASPECT_NAME = "editableDatasetProperties"
 ACTOR = "urn:li:corpuser:ingestion"
+DOMAIN_ASPECT_NAME = "domains"
 
 
 def get_audit_stamp() -> AuditStampClass:
@@ -69,6 +71,7 @@ class SubResourceRow:
     term_associations: List[GlossaryTermAssociationClass]
     tag_associations: List[TagAssociationClass]
     description: Optional[str]
+    domain: Optional[str]
 
 
 @dataclass
@@ -78,6 +81,7 @@ class CSVEnricherReport(SourceReport):
     num_owners_workunits_produced: int = 0
     num_description_workunits_produced: int = 0
     num_editable_schema_metadata_workunits_produced: int = 0
+    num_domain_workunits_produced: int = 0
 
 
 @platform_name("CSV")
@@ -85,17 +89,17 @@ class CSVEnricherReport(SourceReport):
 @support_status(SupportStatus.INCUBATING)
 class CSVEnricherSource(Source):
     """
-    This plugin is used to apply glossary terms, tags and owners at the entity level. It can also be used to apply tags
+    This plugin is used to apply glossary terms, tags, owners and domain at the entity level. It can also be used to apply tags
     and glossary terms at the column level. These values are read from a CSV file and can be used to either overwrite
     or append the above aspects to entities.
 
     The format of the CSV must be like so, with a few example rows.
 
-    |resource                                                        |subresource|glossary_terms                      |tags               |owners                                             |ownership_type |description    |
-    |----------------------------------------------------------------|-----------|------------------------------------|-------------------|---------------------------------------------------|---------------|---------------|
-    |urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)|           |[urn:li:glossaryTerm:AccountBalance]|[urn:li:tag:Legacy]|[urn:li:corpuser:datahub|urn:li:corpuser:jdoe]|TECHNICAL_OWNER|new description|
-    |urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)|field_foo  |[urn:li:glossaryTerm:AccountBalance]|                   |                                                   |               |field_foo!     |
-    |urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)|field_bar  |                                    |[urn:li:tag:Legacy]|                                                   |               |field_bar?     |
+    |resource                                                        |subresource|glossary_terms                      |tags               |owners                                             |ownership_type |description    |domain                     |
+    |----------------------------------------------------------------|-----------|------------------------------------|-------------------|---------------------------------------------------|---------------|---------------|---------------------------|
+    |urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)|           |[urn:li:glossaryTerm:AccountBalance]|[urn:li:tag:Legacy]|[urn:li:corpuser:datahub|urn:li:corpuser:jdoe]|TECHNICAL_OWNER|new description|urn:li:domain:Engineering  |
+    |urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)|field_foo  |[urn:li:glossaryTerm:AccountBalance]|                   |                                                   |               |field_foo!     |                           |
+    |urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)|field_bar  |                                    |[urn:li:tag:Legacy]|                                                   |               |field_bar?     |                           |
 
     Note that the first row does not have a subresource populated. That means any glossary terms, tags, and owners will
     be applied at the entity field. If a subresource IS populated (as it is for the second and third rows), glossary
@@ -254,6 +258,50 @@ class CSVEnricherSource(Source):
         )
         return owners_wu
 
+    def get_resource_domain_work_unit(
+        self,
+        entity_urn: str,
+        entity_type: str,
+        domain: Optional[str],
+    ) -> Optional[MetadataWorkUnit]:
+        """
+        Build the work unit that assigns `domain` to the entity.
+
+        Returns None when no domain is given, or when the entity already has
+        that domain and we are not overwriting.
+        """
+        # Check if there is a domain to add. If not, return None.
+        if not domain:
+            return None
+
+        current_domain: Optional[DomainsClass] = None
+        if self.ctx.graph and not self.should_overwrite:
+            # Get the existing domain for the entity from the DataHub graph
+            current_domain = self.ctx.graph.get_domain(entity_urn=entity_urn)
+
+        if not current_domain:
+            # If we want to overwrite or there is no existing domain, create a new object
+            current_domain = DomainsClass([domain])
+        elif domain in current_domain.domains:
+            # The entity already carries this domain, so there is nothing to emit
+            return None
+        else:
+            # Append mode: keep the existing domains and add the one from the CSV
+            current_domain.domains.append(domain)
+
+        domain_mcpw: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
+            entityType=entity_type,
+            entityUrn=entity_urn,
+            changeType=ChangeTypeClass.UPSERT,
+            aspectName=DOMAIN_ASPECT_NAME,
+            aspect=current_domain,
+        )
+        domain_wu: MetadataWorkUnit = MetadataWorkUnit(
+            id=f"{entity_urn}-{DOMAIN_ASPECT_NAME}",
+            mcp=domain_mcpw,
+        )
+        return domain_wu
+
     def get_resource_description_work_unit(
         self,
         entity_urn: str,
@@ -308,6 +356,7 @@ class CSVEnricherSource(Source):
         term_associations: List[GlossaryTermAssociationClass],
         tag_associations: List[TagAssociationClass],
         owners: List[OwnerClass],
+        domain: Optional[str],
         description: Optional[str],
     ) -> Iterable[MetadataWorkUnit]:
         maybe_terms_wu: Optional[
@@ -344,6 +393,18 @@ class CSVEnricherSource(Source):
             self.report.report_workunit(maybe_owners_wu)
             yield maybe_owners_wu
 
+        maybe_domain_wu: Optional[
+            MetadataWorkUnit
+        ] = self.get_resource_domain_work_unit(
+            entity_urn=entity_urn,
+            entity_type=entity_type,
+            domain=domain,
+        )
+        if maybe_domain_wu:
+            self.report.num_domain_workunits_produced += 1
+            self.report.report_workunit(maybe_domain_wu)
+            yield maybe_domain_wu
+
         maybe_description_wu: Optional[
             MetadataWorkUnit
         ] = self.get_resource_description_work_unit(
@@ -512,6 +573,7 @@ class CSVEnricherSource(Source):
         # Sanitizing the terms string to just get the list of term urns
         terms_array_string = sanitize_array_string(row["glossary_terms"])
         term_urns: List[str] = terms_array_string.split(self.config.array_delimiter)
+
         term_associations: List[GlossaryTermAssociationClass] = [
             GlossaryTermAssociationClass(term) for term in term_urns
         ]
@@ -524,6 +586,7 @@ class CSVEnricherSource(Source):
         # Sanitizing the tags string to just get the list of tag urns
         tags_array_string = sanitize_array_string(row["tags"])
         tag_urns: List[str] = tags_array_string.split(self.config.array_delimiter)
+
         tag_associations: List[TagAssociationClass] = [
             TagAssociationClass(tag) for tag in tag_urns
         ]
@@ -546,6 +609,7 @@ class CSVEnricherSource(Source):
         # Sanitizing the owners string to just get the list of owner urns
         owners_array_string: str = sanitize_array_string(row["owners"])
         owner_urns: List[str] = owners_array_string.split(self.config.array_delimiter)
+
         owners: List[OwnerClass] = [
             OwnerClass(owner_urn, type=ownership_type) for owner_urn in owner_urns
         ]
@@ -576,6 +640,12 @@ class CSVEnricherSource(Source):
                     row, is_resource_row
                 )
 
+                domain: Optional[str] = (
+                    row.get("domain")
+                    if row.get("domain") and entity_type == DATASET_ENTITY_TYPE
+                    else None
+                )
+
                 description: Optional[str] = (
                     row["description"]
                     if row["description"] and entity_type == DATASET_ENTITY_TYPE
@@ -589,6 +659,7 @@ class CSVEnricherSource(Source):
                         term_associations=term_associations,
                         tag_associations=tag_associations,
                         owners=owners,
+                        domain=domain,
                         description=description,
                     ):
                         yield wu
@@ -611,6 +682,7 @@ class CSVEnricherSource(Source):
                             term_associations=term_associations,
                             tag_associations=tag_associations,
                             description=description,
+                            domain=domain,
                         )
                     )
 
diff --git a/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_golden.json b/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_golden.json
index 610e083e65..bd3f390f2b 100644
--- a/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_golden.json
+++ b/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_golden.json
@@ -56,6 +56,25 @@
         "properties": null
     }
 },
+{
+    "auditHeader": null,
+    "entityType": "dataset",
+    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",
+    "entityKeyAspect": null,
+    "changeType": "UPSERT",
+    "aspectName": "domains",
+    "aspect": {
+        "value": "{\"domains\":[\"urn:li:domain:Engineering\"]}",
+        "contentType": "application/json"
+    },
+    "systemMetadata": {
+        "lastObserved": 1643871600000,
+        "runId": "test-csv-enricher",
+        "registryName": null,
+        "registryVersion": null,
+        "properties": null
+    }
+},
 {
     "auditHeader": null,
     "entityType": "dataset",
diff --git a/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_test_data.csv b/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_test_data.csv
index 9023699f38..d92dcbf717 100644
--- a/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_test_data.csv
+++ b/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_test_data.csv
@@ -1,4 +1,4 @@
-resource,subresource,glossary_terms,tags,owners,ownership_type,description
-"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description
+resource,subresource,glossary_terms,tags,owners,ownership_type,description,domain
+"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description,urn:li:domain:Engineering
 "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_foo,[urn:li:glossaryTerm:AccountBalance],,,,field_foo!
 "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_bar,,[urn:li:tag:Legacy],,,field_bar?
\ No newline at end of file
diff --git a/metadata-ingestion/tests/unit/test_csv_enricher_source.py b/metadata-ingestion/tests/unit/test_csv_enricher_source.py
index d331f127c0..c294605f72 100644
--- a/metadata-ingestion/tests/unit/test_csv_enricher_source.py
+++ b/metadata-ingestion/tests/unit/test_csv_enricher_source.py
@@ -185,3 +185,21 @@ def test_get_resource_description_work_unit_produced():
         DATASET_URN, DATASET_ENTITY_TYPE, new_description
     )
     assert maybe_description_wu
+
+
+def test_get_resource_domain_no_domain():
+    source = create_mocked_csv_enricher_source()
+    new_domain = None
+    maybe_domain_wu = source.get_resource_domain_work_unit(
+        DATASET_URN, DATASET_ENTITY_TYPE, new_domain
+    )
+    assert not maybe_domain_wu
+
+
+def test_get_resource_domain_work_unit_produced():
+    source = create_mocked_csv_enricher_source()
+    new_domain = "urn:li:domain:Engineering"
+    maybe_domain_wu = source.get_resource_domain_work_unit(
+        DATASET_URN, DATASET_ENTITY_TYPE, new_domain
+    )
+    assert maybe_domain_wu