From 82ca92f8f99fab713d974ece65d8826f08167c59 Mon Sep 17 00:00:00 2001 From: Aditya Radhakrishnan Date: Tue, 21 Jun 2022 23:55:39 -0700 Subject: [PATCH] feat(ingest): adds csv enricher ingestion source (#5221) --- .../demo_data/csv_enricher_demo_data.csv | 4 + .../csv_enricher_to_datahub_rest.dhub.yml | 16 + metadata-ingestion/setup.py | 1 + .../datahub/ingestion/source/csv_enricher.py | 547 ++++++++++++++++++ .../ingestion/source_config/csv_enricher.py | 18 + .../src/datahub/utilities/urns/dataset_urn.py | 17 + .../csv-enricher/csv_enricher_golden.json | 78 +++ .../csv-enricher/csv_enricher_test_data.csv | 4 + .../csv-enricher/test_csv_enricher.py | 57 ++ .../tests/unit/test_csv_enricher_source.py | 169 ++++++ 10 files changed, 911 insertions(+) create mode 100644 metadata-ingestion/examples/demo_data/csv_enricher_demo_data.csv create mode 100644 metadata-ingestion/examples/recipes/csv_enricher_to_datahub_rest.dhub.yml create mode 100644 metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py create mode 100644 metadata-ingestion/src/datahub/ingestion/source_config/csv_enricher.py create mode 100644 metadata-ingestion/tests/integration/csv-enricher/csv_enricher_golden.json create mode 100644 metadata-ingestion/tests/integration/csv-enricher/csv_enricher_test_data.csv create mode 100644 metadata-ingestion/tests/integration/csv-enricher/test_csv_enricher.py create mode 100644 metadata-ingestion/tests/unit/test_csv_enricher_source.py diff --git a/metadata-ingestion/examples/demo_data/csv_enricher_demo_data.csv b/metadata-ingestion/examples/demo_data/csv_enricher_demo_data.csv new file mode 100644 index 0000000000..0719ae1bfe --- /dev/null +++ b/metadata-ingestion/examples/demo_data/csv_enricher_demo_data.csv @@ -0,0 +1,4 @@ +resource,subresource,glossary_terms,tags,owners +"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",,[urn:li:glossaryTerm:SavingAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe] 
+"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_foo,[urn:li:glossaryTerm:AccountBalance],, +"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_bar,,[urn:li:tag:Legacy], \ No newline at end of file diff --git a/metadata-ingestion/examples/recipes/csv_enricher_to_datahub_rest.dhub.yml b/metadata-ingestion/examples/recipes/csv_enricher_to_datahub_rest.dhub.yml new file mode 100644 index 0000000000..1138cc4445 --- /dev/null +++ b/metadata-ingestion/examples/recipes/csv_enricher_to_datahub_rest.dhub.yml @@ -0,0 +1,16 @@ +--- +# see https://datahubproject.io/docs/metadata-ingestion/source_docs/csv for complete documentation +source: + type: "csv-enricher" + config: + filename: "/Users/adityaradhakrishnan/code/datahub-fork/metadata-ingestion/examples/demo_data/csv_enricher_demo_data.csv" + should_overwrite: false + delimiter: "," + array_delimiter: "|" + + +# see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation +sink: + type: "datahub-rest" + config: + server: "http://localhost:8080" \ No newline at end of file diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index caa16ca700..ed67ce3841 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -447,6 +447,7 @@ if is_py37_or_newer: entry_points = { "console_scripts": ["datahub = datahub.entrypoints:main"], "datahub.ingestion.source.plugins": [ + "csv-enricher = datahub.ingestion.source.csv_enricher:CSVEnricherSource", "file = datahub.ingestion.source.file:GenericFileSource", "sqlalchemy = datahub.ingestion.source.sql.sql_generic:SQLAlchemyGenericSource", "athena = datahub.ingestion.source.sql.athena:AthenaSource", diff --git a/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py b/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py new file mode 100644 index 0000000000..1cbe446d41 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py @@ 
# ---------------------------------------------------------------------------
# metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py
#
# Ingestion source that reads a CSV of
# (resource, subresource, glossary_terms, tags, owners) rows and applies
# those aspects to DataHub entities, either overwriting the stored aspects
# or appending to them via read-modify-write against the DataHub graph.
# ---------------------------------------------------------------------------
import csv
import time
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional, Set, Tuple

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
    SupportStatus,
    config_class,
    platform_name,
    support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source_config.csv_enricher import CSVEnricherConfig
from datahub.metadata.schema_classes import (
    AuditStampClass,
    ChangeTypeClass,
    EditableSchemaFieldInfoClass,
    EditableSchemaMetadataClass,
    GlobalTagsClass,
    GlossaryTermAssociationClass,
    GlossaryTermsClass,
    OwnerClass,
    OwnershipClass,
    OwnershipTypeClass,
    TagAssociationClass,
)
from datahub.utilities.urns.dataset_urn import DatasetUrn
from datahub.utilities.urns.urn import Urn

SCHEMA_ASPECT_NAME = "editableSchemaMetadata"
DATASET_ENTITY_TYPE = "dataset"
GLOSSARY_TERMS_ASPECT_NAME = "glossaryTerms"
TAGS_ASPECT_NAME = "globalTags"
OWNERSHIP_ASPECT_NAME = "ownership"
# Actor recorded on every audit stamp this source produces.
ACTOR = "urn:li:corpuser:ingestion"


def get_audit_stamp() -> AuditStampClass:
    """Return an audit stamp for the current time, attributed to ACTOR."""
    now = int(time.time() * 1000)  # DataHub audit stamps are epoch millis.
    return AuditStampClass(now, ACTOR)


def maybe_remove_prefix(s: str, prefix: str) -> str:
    """Remove ``prefix`` from ``s`` if present; otherwise return ``s`` unchanged."""
    if not s.startswith(prefix):
        return s
    return s[len(prefix) :]


def maybe_remove_suffix(s: str, suffix: str) -> str:
    """Remove ``suffix`` from ``s`` if present; otherwise return ``s`` unchanged."""
    if not s.endswith(suffix):
        return s
    return s[: -len(suffix)]


def sanitize_array_string(s: str) -> str:
    """Strip the surrounding ``[`` and ``]`` from a CSV array cell like ``[a|b]``."""
    return maybe_remove_suffix(maybe_remove_prefix(s, "["), "]")


@dataclass
class SubResourceRow:
    """A parsed CSV row that targets a dataset field (subresource)."""

    entity_urn: str
    field_path: str
    term_associations: List[GlossaryTermAssociationClass]
    tag_associations: List[TagAssociationClass]


@dataclass
class CSVEnricherReport(SourceReport):
    """Per-aspect work-unit counters on top of the standard SourceReport."""

    num_glossary_term_workunits_produced: int = 0
    num_tag_workunits_produced: int = 0
    num_owners_workunits_produced: int = 0
    num_editable_schema_metadata_workunits_produced: int = 0


@platform_name("CSV")
@config_class(CSVEnricherConfig)
@support_status(SupportStatus.INCUBATING)
class CSVEnricherSource(Source):
    """
    This plugin is used to apply glossary terms, tags and owners at the entity level. It can also be used to apply tags
    and glossary terms at the column level. These values are read from a CSV file and can be used to either overwrite
    or append the above aspects to entities.

    The format of the CSV must be like so, with a few example rows.

    | resource                                                                 | subresource | glossary_terms                        | tags                | owners                    |
    |--------------------------------------------------------------------------|-------------|---------------------------------------|---------------------|---------------------------|
    | urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)         |             | [urn:li:glossaryTerm:AccountBalance]  | [urn:li:tag:Legacy] | [urn:li:corpuser:datahub] |
    | urn:li:dataset:(urn:li:dataPlatform:bigquery,SampleBigqueryDataset,PROD) | field_foo   | [urn:li:glossaryTerm:CustomerAccount] |                     |                           |
    | urn:li:dataset:(urn:li:dataPlatform:redshift,SampleRedshiftDataset,PROD) | field_bar   |                                       | [urn:li:tag:Legacy] |                           |

    Note that the first row does not have a subresource populated. That means any glossary terms, tags, and owners will
    be applied at the entity field. If a subresource IS populated (as it is for the second and third rows), glossary
    terms and tags will be applied on the subresource. Every row MUST have a resource. Also note that owners can only
    be applied at the resource level and will be ignored if populated for a row with a subresource.
    """

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> Source:
        """Standard factory used by the ingestion framework to build this source.

        FIX: this classmethod was commented out; the plugin registry
        instantiates sources through ``Source.create``, so without it the
        registered ``csv-enricher`` entry point cannot be constructed.
        """
        config = CSVEnricherConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def __init__(self, config: CSVEnricherConfig, ctx: PipelineContext):
        super().__init__(ctx)
        self.config: CSVEnricherConfig = config
        self.ctx: PipelineContext = ctx
        self.report: CSVEnricherReport = CSVEnricherReport()
        # Map from entity urn to a list of SubResourceRow. Populated while
        # reading the CSV; flushed at the end so each dataset's
        # editableSchemaMetadata is read-modify-written exactly once.
        self.editable_schema_metadata_map: Dict[str, List[SubResourceRow]] = {}

    def get_resource_glossary_terms_work_unit(
        self,
        entity_urn: str,
        entity_type: str,
        term_associations: List[GlossaryTermAssociationClass],
    ) -> Optional[MetadataWorkUnit]:
        """Build a glossaryTerms work unit for an entity, or None if nothing new.

        In append mode (``should_overwrite=False``) the currently stored terms
        are fetched from the graph and only genuinely new terms are added; if
        the graph is unavailable or no new terms remain, None is returned.
        """
        if not term_associations:
            # Nothing to add.
            return None

        current_terms: Optional[GlossaryTermsClass] = None
        if not self.config.should_overwrite:
            if not self.ctx.graph:
                # Cannot read-modify-write without a graph connection.
                return None
            current_terms = self.ctx.graph.get_glossary_terms(entity_urn=entity_urn)

        if not current_terms:
            # Overwriting, or nothing stored yet: start a fresh aspect.
            current_terms = GlossaryTermsClass(term_associations, get_audit_stamp())
        else:
            existing_term_urns: Set[str] = {term.urn for term in current_terms.terms}
            new_associations: List[GlossaryTermAssociationClass] = [
                association
                for association in term_associations
                if association.urn not in existing_term_urns
            ]
            if not new_associations:
                # Everything in the CSV is already stored; no work unit needed.
                return None
            current_terms.terms.extend(new_associations)

        terms_mcpw = MetadataChangeProposalWrapper(
            entityType=entity_type,
            entityUrn=entity_urn,
            changeType=ChangeTypeClass.UPSERT,
            aspectName=GLOSSARY_TERMS_ASPECT_NAME,
            aspect=current_terms,
        )
        return MetadataWorkUnit(
            id=f"{entity_urn}-{GLOSSARY_TERMS_ASPECT_NAME}",
            mcp=terms_mcpw,
        )

    def get_resource_tags_work_unit(
        self,
        entity_urn: str,
        entity_type: str,
        tag_associations: List[TagAssociationClass],
    ) -> Optional[MetadataWorkUnit]:
        """Build a globalTags work unit for an entity, or None if nothing new.

        Mirrors :meth:`get_resource_glossary_terms_work_unit` for tags.
        """
        if not tag_associations:
            # Nothing to add.
            return None

        current_tags: Optional[GlobalTagsClass] = None
        if not self.config.should_overwrite:
            if not self.ctx.graph:
                # Cannot read-modify-write without a graph connection.
                return None
            current_tags = self.ctx.graph.get_tags(entity_urn=entity_urn)

        if not current_tags:
            # Overwriting, or nothing stored yet: start a fresh aspect.
            current_tags = GlobalTagsClass(tag_associations)
        else:
            existing_tag_urns: Set[str] = {tag.tag for tag in current_tags.tags}
            new_associations: List[TagAssociationClass] = [
                association
                for association in tag_associations
                if association.tag not in existing_tag_urns
            ]
            if not new_associations:
                # Everything in the CSV is already stored; no work unit needed.
                return None
            current_tags.tags.extend(new_associations)

        tags_mcpw = MetadataChangeProposalWrapper(
            entityType=entity_type,
            entityUrn=entity_urn,
            changeType=ChangeTypeClass.UPSERT,
            aspectName=TAGS_ASPECT_NAME,
            aspect=current_tags,
        )
        return MetadataWorkUnit(
            id=f"{entity_urn}-{TAGS_ASPECT_NAME}",
            mcp=tags_mcpw,
        )

    def get_resource_owners_work_unit(
        self,
        entity_urn: str,
        entity_type: str,
        owners: List[OwnerClass],
    ) -> Optional[MetadataWorkUnit]:
        """Build an ownership work unit for an entity, or None if nothing new.

        Owner de-duplication is by owner urn only; the ownership ``type`` of a
        pre-existing owner is left untouched.
        """
        if not owners:
            # Nothing to add.
            return None

        current_ownership: Optional[OwnershipClass] = None
        if not self.config.should_overwrite:
            if not self.ctx.graph:
                # Cannot read-modify-write without a graph connection.
                return None
            current_ownership = self.ctx.graph.get_ownership(entity_urn=entity_urn)

        if not current_ownership:
            # Overwriting, or nothing stored yet: start a fresh aspect.
            current_ownership = OwnershipClass(owners, get_audit_stamp())
        else:
            existing_owner_urns: Set[str] = {
                owner.owner for owner in current_ownership.owners
            }
            new_owners: List[OwnerClass] = [
                owner for owner in owners if owner.owner not in existing_owner_urns
            ]
            if not new_owners:
                # Everything in the CSV is already stored; no work unit needed.
                return None
            current_ownership.owners.extend(new_owners)

        owners_mcpw = MetadataChangeProposalWrapper(
            entityType=entity_type,
            entityUrn=entity_urn,
            changeType=ChangeTypeClass.UPSERT,
            aspectName=OWNERSHIP_ASPECT_NAME,
            aspect=current_ownership,
        )
        return MetadataWorkUnit(
            id=f"{entity_urn}-{OWNERSHIP_ASPECT_NAME}",
            mcp=owners_mcpw,
        )

    def process_sub_resource_row(
        self,
        sub_resource_row: SubResourceRow,
        current_editable_schema_metadata: EditableSchemaMetadataClass,
        needs_write: bool,
    ) -> Tuple[EditableSchemaMetadataClass, bool]:
        """Fold one subresource row into the in-flight editableSchemaMetadata.

        Returns the (possibly mutated) aspect and an updated ``needs_write``
        flag indicating whether anything actually changed.
        """
        field_path: str = sub_resource_row.field_path
        term_associations: List[
            GlossaryTermAssociationClass
        ] = sub_resource_row.term_associations
        tag_associations: List[TagAssociationClass] = sub_resource_row.tag_associations
        has_terms: bool = len(term_associations) > 0
        has_tags: bool = len(tag_associations) > 0

        # We can skip this row if there are no tags or terms to add.
        if not has_tags and not has_terms:
            return current_editable_schema_metadata, needs_write

        # Objects that may or may not be written, depending on which
        # conditions get triggered below.
        field_info_to_set = EditableSchemaFieldInfoClass(fieldPath=field_path)
        terms_aspect = (
            GlossaryTermsClass(term_associations, get_audit_stamp())
            if has_terms
            else None
        )
        if terms_aspect:
            field_info_to_set.glossaryTerms = terms_aspect
        tags_aspect = GlobalTagsClass(tag_associations) if has_tags else None
        if tags_aspect:
            field_info_to_set.globalTags = tags_aspect

        # Whether we found existing field info for this field path.
        field_match = False
        for field_info in current_editable_schema_metadata.editableSchemaFieldInfo:
            # Compare on the simple dotted path so v2 fieldPaths stored in the
            # aspect still match the plain subresource value from the CSV.
            if (
                DatasetUrn._get_simple_field_path_from_v2_field_path(
                    field_info.fieldPath
                )
                == field_path
            ):
                field_match = True
                if has_terms:
                    if field_info.glossaryTerms and not self.config.should_overwrite:
                        existing_term_urns = {
                            term.urn for term in field_info.glossaryTerms.terms
                        }
                        new_term_associations = [
                            association
                            for association in term_associations
                            if association.urn not in existing_term_urns
                        ]
                        if len(new_term_associations) > 0:
                            field_info.glossaryTerms.terms.extend(
                                new_term_associations
                            )
                            needs_write = True
                    else:
                        field_info.glossaryTerms = terms_aspect
                        needs_write = True

                if has_tags:
                    if field_info.globalTags and not self.config.should_overwrite:
                        existing_tag_urns = {
                            tag.tag for tag in field_info.globalTags.tags
                        }
                        new_tag_associations = [
                            association
                            for association in tag_associations
                            if association.tag not in existing_tag_urns
                        ]
                        if len(new_tag_associations) > 0:
                            field_info.globalTags.tags.extend(new_tag_associations)
                            needs_write = True
                    else:
                        field_info.globalTags = tags_aspect
                        needs_write = True

        if not field_match:
            # This field isn't present in the aspect yet; add it wholesale.
            current_editable_schema_metadata.editableSchemaFieldInfo.append(
                field_info_to_set
            )
            needs_write = True
        return current_editable_schema_metadata, needs_write

    def get_sub_resource_work_units(self) -> Iterable[MetadataWorkUnit]:
        """Yield one editableSchemaMetadata work unit per dataset that changed."""
        for entity_urn in self.editable_schema_metadata_map:
            # Tracks whether anything changed, i.e. whether an MCPW is needed.
            needs_write = False

            current_editable_schema_metadata: Optional[
                EditableSchemaMetadataClass
            ] = None
            if not self.config.should_overwrite:
                if not self.ctx.graph:
                    # Cannot append without a graph connection; skip entity.
                    continue

                # Fetch the currently stored editable schema metadata.
                current_editable_schema_metadata = self.ctx.graph.get_aspect_v2(
                    entity_urn=entity_urn,
                    aspect=SCHEMA_ASPECT_NAME,
                    aspect_type=EditableSchemaMetadataClass,
                )

            # Create a fresh aspect for the dataset if none exists yet.
            if not current_editable_schema_metadata:
                current_editable_schema_metadata = EditableSchemaMetadataClass(
                    editableSchemaFieldInfo=[],
                    created=get_audit_stamp(),
                )
                needs_write = True

            # Fold every subresource row for this entity into the aspect.
            for sub_resource_row in self.editable_schema_metadata_map[entity_urn]:
                (
                    current_editable_schema_metadata,
                    needs_write,
                ) = self.process_sub_resource_row(
                    sub_resource_row, current_editable_schema_metadata, needs_write
                )

            # Emit an MCPW only if something actually changed.
            if needs_write:
                editable_schema_metadata_mcpw = MetadataChangeProposalWrapper(
                    entityType=DATASET_ENTITY_TYPE,
                    changeType=ChangeTypeClass.UPSERT,
                    entityUrn=entity_urn,
                    aspectName=SCHEMA_ASPECT_NAME,
                    aspect=current_editable_schema_metadata,
                )
                yield MetadataWorkUnit(
                    id=f"{entity_urn}-{SCHEMA_ASPECT_NAME}",
                    mcp=editable_schema_metadata_mcpw,
                )

    def maybe_extract_glossary_terms(
        self, row: Dict[str, str]
    ) -> List[GlossaryTermAssociationClass]:
        """Parse the ``glossary_terms`` cell of a row into associations ([] if empty)."""
        if not row["glossary_terms"]:
            return []

        # Strip the surrounding brackets, then split on the array delimiter.
        terms_array_string = sanitize_array_string(row["glossary_terms"])
        term_urns: List[str] = terms_array_string.split(self.config.array_delimiter)
        return [GlossaryTermAssociationClass(term) for term in term_urns]

    def maybe_extract_tags(self, row: Dict[str, str]) -> List[TagAssociationClass]:
        """Parse the ``tags`` cell of a row into tag associations ([] if empty)."""
        if not row["tags"]:
            return []

        # Strip the surrounding brackets, then split on the array delimiter.
        tags_array_string = sanitize_array_string(row["tags"])
        tag_urns: List[str] = tags_array_string.split(self.config.array_delimiter)
        return [TagAssociationClass(tag) for tag in tag_urns]

    def maybe_extract_owners(
        self, row: Dict[str, str], is_resource_row: bool
    ) -> List[OwnerClass]:
        """Parse the ``owners`` cell into OwnerClass objects.

        Owners only apply at the resource (entity) level, so subresource rows
        always yield []. Ownership type defaults to NONE since the CSV carries
        no type information.
        """
        if not is_resource_row:
            return []

        if not row["owners"]:
            return []

        # Strip the surrounding brackets, then split on the array delimiter.
        owners_array_string = sanitize_array_string(row["owners"])
        owner_urns: List[str] = owners_array_string.split(self.config.array_delimiter)
        return [
            OwnerClass(owner_urn, type=OwnershipTypeClass.NONE)
            for owner_urn in owner_urns
        ]

    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
        """Read the CSV and yield work units for every aspect it enriches.

        Resource-level rows are emitted as they are read; subresource rows are
        accumulated and flushed at the end (see get_sub_resource_work_units),
        because a dataset's editableSchemaMetadata must be read-modify-written
        as a single aspect.
        """
        with open(self.config.filename, "r") as f:
            rows = csv.DictReader(f, delimiter=self.config.delimiter)
            for row in rows:
                # A resource urn is mandatory; skip rows without one.
                if not row["resource"]:
                    continue

                is_resource_row: bool = not row["subresource"]

                entity_urn = row["resource"]
                entity_type = Urn.create_from_string(row["resource"]).get_type()

                term_associations: List[
                    GlossaryTermAssociationClass
                ] = self.maybe_extract_glossary_terms(row)

                # Entity-level glossary terms are emitted immediately.
                if len(term_associations) > 0 and is_resource_row:
                    maybe_terms_wu: Optional[
                        MetadataWorkUnit
                    ] = self.get_resource_glossary_terms_work_unit(
                        entity_urn=entity_urn,
                        entity_type=entity_type,
                        term_associations=term_associations,
                    )
                    if maybe_terms_wu:
                        self.report.num_glossary_term_workunits_produced += 1
                        self.report.report_workunit(maybe_terms_wu)
                        yield maybe_terms_wu

                tag_associations: List[TagAssociationClass] = self.maybe_extract_tags(
                    row
                )

                # Entity-level tags are emitted immediately.
                if len(tag_associations) > 0 and is_resource_row:
                    maybe_tags_wu: Optional[
                        MetadataWorkUnit
                    ] = self.get_resource_tags_work_unit(
                        entity_urn=entity_urn,
                        entity_type=entity_type,
                        tag_associations=tag_associations,
                    )
                    if maybe_tags_wu:
                        self.report.num_tag_workunits_produced += 1
                        self.report.report_workunit(maybe_tags_wu)
                        yield maybe_tags_wu

                owners: List[OwnerClass] = self.maybe_extract_owners(
                    row, is_resource_row
                )
                if len(owners) > 0:
                    maybe_owners_wu: Optional[
                        MetadataWorkUnit
                    ] = self.get_resource_owners_work_unit(
                        entity_urn=entity_urn,
                        entity_type=entity_type,
                        owners=owners,
                    )
                    if maybe_owners_wu:
                        self.report.num_owners_workunits_produced += 1
                        self.report.report_workunit(maybe_owners_wu)
                        yield maybe_owners_wu

                # Subresource rows only map to EditableSchemaMetadata for now,
                # and only datasets have schema fields.
                if not is_resource_row:
                    if entity_type != DATASET_ENTITY_TYPE:
                        continue

                    field_path = row["subresource"]
                    if entity_urn not in self.editable_schema_metadata_map:
                        self.editable_schema_metadata_map[entity_urn] = []
                    # Buffer the row; EditableSchemaMetadata work units cannot
                    # be emitted until the whole CSV is parsed, due to
                    # read-modify-write on a single aspect per dataset.
                    self.editable_schema_metadata_map[entity_urn].append(
                        SubResourceRow(
                            entity_urn=entity_urn,
                            field_path=field_path,
                            term_associations=term_associations,
                            tag_associations=tag_associations,
                        )
                    )

        # Yield sub resource work units once the map has been fully populated.
        for wu in self.get_sub_resource_work_units():
            # FIX: do not also increment report.workunits_produced here;
            # report_workunit() already does, so the explicit increment
            # double-counted editable schema metadata work units.
            self.report.num_editable_schema_metadata_workunits_produced += 1
            self.report.report_workunit(wu)
            yield wu

    def get_report(self):
        """Return the accumulated CSVEnricherReport."""
        return self.report

    def close(self):
        """No resources are held open between calls; nothing to release."""
        pass
Otherwise, we will append data.", + ) + delimiter: str = pydantic.Field( + default=",", description="Delimiter to use when parsing CSV" + ) + array_delimiter: str = pydantic.Field( + default="|", + description="Delimiter to use when parsing array fields (tags, terms, owners)", + ) diff --git a/metadata-ingestion/src/datahub/utilities/urns/dataset_urn.py b/metadata-ingestion/src/datahub/utilities/urns/dataset_urn.py index fc10323e72..760361db21 100644 --- a/metadata-ingestion/src/datahub/utilities/urns/dataset_urn.py +++ b/metadata-ingestion/src/datahub/utilities/urns/dataset_urn.py @@ -99,3 +99,20 @@ class DatasetUrn(Urn): raise InvalidUrnError( f"Invalid env:{env}. Allowed evn are {DatasetUrn.VALID_FABRIC_SET}" ) + + """A helper function to extract simple . path notation from the v2 field path""" + + @staticmethod + def _get_simple_field_path_from_v2_field_path(field_path: str) -> str: + if field_path.startswith("[version=2.0]"): + # this is a v2 field path + tokens = [ + t + for t in field_path.split(".") + if not (t.startswith("[") or t.endswith("]")) + ] + path = ".".join(tokens) + return path + else: + # not a v2, we assume this is a simple path + return field_path diff --git a/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_golden.json b/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_golden.json new file mode 100644 index 0000000000..28ae94e595 --- /dev/null +++ b/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_golden.json @@ -0,0 +1,78 @@ +[ +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "glossaryTerms", + "aspect": { + "value": "{\"terms\": [{\"urn\": \"urn:li:glossaryTerm:AccountBalance\"}], \"auditStamp\": {\"time\": 1643871600000, \"actor\": \"urn:li:corpuser:ingestion\"}}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 
1643871600000, + "runId": "test-csv-enricher", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "value": "{\"tags\": [{\"tag\": \"urn:li:tag:Legacy\"}]}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "test-csv-enricher", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "value": "{\"owners\": [{\"owner\": \"urn:li:corpuser:datahub\", \"type\": \"NONE\"}, {\"owner\": \"urn:li:corpuser:jdoe\", \"type\": \"NONE\"}], \"lastModified\": {\"time\": 1643871600000, \"actor\": \"urn:li:corpuser:ingestion\"}}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "test-csv-enricher", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "editableSchemaMetadata", + "aspect": { + "value": "{\"created\": {\"time\": 1643871600000, \"actor\": \"urn:li:corpuser:ingestion\"}, \"lastModified\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"editableSchemaFieldInfo\": [{\"fieldPath\": \"field_foo\", \"glossaryTerms\": {\"terms\": [{\"urn\": \"urn:li:glossaryTerm:AccountBalance\"}], \"auditStamp\": {\"time\": 1643871600000, \"actor\": \"urn:li:corpuser:ingestion\"}}}, {\"fieldPath\": \"field_bar\", \"globalTags\": 
{\"tags\": [{\"tag\": \"urn:li:tag:Legacy\"}]}}]}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "test-csv-enricher", + "registryName": null, + "registryVersion": null, + "properties": null + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_test_data.csv b/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_test_data.csv new file mode 100644 index 0000000000..10e3e653d7 --- /dev/null +++ b/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_test_data.csv @@ -0,0 +1,4 @@ +resource,subresource,glossary_terms,tags,owners +"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",,[urn:li:glossaryTerm:AccountBalance],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe] +"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_foo,[urn:li:glossaryTerm:AccountBalance],, +"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_bar,,[urn:li:tag:Legacy], \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/csv-enricher/test_csv_enricher.py b/metadata-ingestion/tests/integration/csv-enricher/test_csv_enricher.py new file mode 100644 index 0000000000..dea2e2284d --- /dev/null +++ b/metadata-ingestion/tests/integration/csv-enricher/test_csv_enricher.py @@ -0,0 +1,57 @@ +import pathlib + +from freezegun import freeze_time + +from datahub.ingestion.run.pipeline import Pipeline +from datahub.ingestion.source.csv_enricher import CSVEnricherConfig +from tests.test_helpers import mce_helpers + +FROZEN_TIME = "2022-02-03 07:00:00" + + +def test_csv_enricher_config(): + config = CSVEnricherConfig.parse_obj( + dict( + filename="../integration/csv_enricher/csv_enricher_test_data.csv", + should_overwrite=True, + delimiter=",", + array_delimiter="|", + ) + ) + assert config + + +@freeze_time(FROZEN_TIME) +def test_csv_enricher_source(pytestconfig, tmp_path): + 
# ---------------------------------------------------------------------------
# metadata-ingestion/tests/unit/test_csv_enricher_source.py
#
# Unit tests for CSVEnricherSource work-unit generation. The DataHub graph
# is mocked to pre-populate "old" owners/terms/tags so the append path
# (should_overwrite=False) can be exercised without a live backend.
# ---------------------------------------------------------------------------
from typing import Dict, List, Union
from unittest import mock

from datahub.emitter import mce_builder
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.csv_enricher import CSVEnricherConfig, CSVEnricherSource
from datahub.metadata.schema_classes import (
    GlossaryTermAssociationClass,
    OwnerClass,
    OwnershipSourceClass,
    OwnershipTypeClass,
    TagAssociationClass,
)

DATASET_URN = (
    "urn:li:dataset:(urn:li:dataPlatform:bigquery,test_dataset.test.Test,PROD)"
)
DATASET_ENTITY_TYPE = "dataset"


def create_owners_list_from_urn_list(
    owner_urns: List[str], source_type: str
) -> List[OwnerClass]:
    """Build DATAOWNER OwnerClass objects, optionally tagged with a source type."""
    ownership_source_type: Union[None, OwnershipSourceClass] = None
    if source_type:
        ownership_source_type = OwnershipSourceClass(type=source_type)
    return [
        OwnerClass(
            owner=owner_urn,
            type=OwnershipTypeClass.DATAOWNER,
            source=ownership_source_type,
        )
        for owner_urn in owner_urns
    ]


def create_mocked_csv_enricher_source() -> CSVEnricherSource:
    """Return a CSVEnricherSource wired to a mocked graph with pre-existing aspects."""
    ctx = PipelineContext("test-run-id")
    graph = mock.MagicMock()
    # Pre-existing aspects on the entity: one owner, two terms, two tags.
    graph.get_ownership.return_value = mce_builder.make_ownership_aspect_from_urn_list(
        ["urn:li:corpuser:olduser1"], "AUDIT"
    )
    graph.get_glossary_terms.return_value = (
        mce_builder.make_glossary_terms_aspect_from_urn_list(
            ["urn:li:glossaryTerm:oldterm1", "urn:li:glossaryTerm:oldterm2"]
        )
    )
    graph.get_tags.return_value = mce_builder.make_global_tag_aspect_with_tag_list(
        ["oldtag1", "oldtag2"]
    )
    graph.get_aspect_v2.return_value = None
    ctx.graph = graph
    return CSVEnricherSource(CSVEnricherConfig(**create_base_csv_enricher_config()), ctx)


def create_base_csv_enricher_config() -> Dict:
    """Base config dict in append mode (should_overwrite=False)."""
    return dict(
        {
            "filename": "../integration/csv_enricher/csv_enricher_test_data.csv",
            "should_overwrite": False,
            "delimiter": ",",
            "array_delimiter": "|",
        },
    )


def test_get_resource_glossary_terms_work_unit_no_terms():
    source = create_mocked_csv_enricher_source()
    maybe_terms_wu = source.get_resource_glossary_terms_work_unit(
        DATASET_URN, DATASET_ENTITY_TYPE, []
    )
    assert not maybe_terms_wu


def test_get_resource_glossary_terms_no_new_glossary_terms():
    source = create_mocked_csv_enricher_source()
    # Both terms already exist on the entity (see mocked graph); appending
    # them must not produce a work unit.
    existing_glossary_terms = [
        "urn:li:glossaryTerm:oldterm1",
        "urn:li:glossaryTerm:oldterm2",
    ]
    term_associations: List[GlossaryTermAssociationClass] = [
        GlossaryTermAssociationClass(term) for term in existing_glossary_terms
    ]
    maybe_terms_wu = source.get_resource_glossary_terms_work_unit(
        DATASET_URN, DATASET_ENTITY_TYPE, term_associations
    )
    assert not maybe_terms_wu


def test_get_resource_glossary_terms_work_unit_produced():
    source = create_mocked_csv_enricher_source()
    new_glossary_terms = [
        "urn:li:glossaryTerm:newterm1",
        "urn:li:glossaryTerm:newterm2",
    ]
    term_associations: List[GlossaryTermAssociationClass] = [
        GlossaryTermAssociationClass(term) for term in new_glossary_terms
    ]
    maybe_terms_wu = source.get_resource_glossary_terms_work_unit(
        DATASET_URN, DATASET_ENTITY_TYPE, term_associations
    )
    assert maybe_terms_wu


def test_get_resource_tags_work_unit_no_tags():
    source = create_mocked_csv_enricher_source()
    maybe_tags_wu = source.get_resource_tags_work_unit(
        DATASET_URN, DATASET_ENTITY_TYPE, []
    )
    assert not maybe_tags_wu


def test_get_resource_tags_no_new_tags():
    source = create_mocked_csv_enricher_source()
    # Both tags already exist on the entity; appending must be a no-op.
    existing_tags = ["urn:li:tag:oldtag1", "urn:li:tag:oldtag2"]
    tag_associations: List[TagAssociationClass] = [
        TagAssociationClass(tag) for tag in existing_tags
    ]
    maybe_tags_wu = source.get_resource_tags_work_unit(
        DATASET_URN, DATASET_ENTITY_TYPE, tag_associations
    )
    assert not maybe_tags_wu


def test_get_resource_tags_work_unit_produced():
    source = create_mocked_csv_enricher_source()
    new_tags = ["urn:li:tag:newtag1", "urn:li:tag:newtag2"]
    tag_associations: List[TagAssociationClass] = [
        TagAssociationClass(tag) for tag in new_tags
    ]
    maybe_tags_wu = source.get_resource_tags_work_unit(
        DATASET_URN, DATASET_ENTITY_TYPE, tag_associations
    )
    assert maybe_tags_wu


# FIX: renamed from test_get_resource_owners_work_unit_no_terms (copy-paste
# misnomer; this test is about owners, not terms).
def test_get_resource_owners_work_unit_no_owners():
    source = create_mocked_csv_enricher_source()
    maybe_owners_wu = source.get_resource_owners_work_unit(
        DATASET_URN, DATASET_ENTITY_TYPE, []
    )
    assert not maybe_owners_wu


def test_get_resource_owners_no_new_owners():
    source = create_mocked_csv_enricher_source()
    # FIX: this test previously passed brand-new owner urns and asserted that
    # a work unit WAS produced, duplicating the "produced" test below and
    # never exercising the de-duplication path its name describes. Use the
    # owner the mocked graph already reports (de-duplication is by owner urn
    # only, so the differing ownership type does not matter) and assert that
    # no work unit is produced.
    existing_owners = ["urn:li:corpuser:olduser1"]
    owners: List[OwnerClass] = [
        OwnerClass(owner, type=OwnershipTypeClass.NONE) for owner in existing_owners
    ]
    maybe_owners_wu = source.get_resource_owners_work_unit(
        DATASET_URN, DATASET_ENTITY_TYPE, owners
    )
    assert not maybe_owners_wu


def test_get_resource_owners_work_unit_produced():
    source = create_mocked_csv_enricher_source()
    new_owners = ["urn:li:corpuser:owner1", "urn:li:corpuser:owner2"]
    owners: List[OwnerClass] = [
        OwnerClass(owner, type=OwnershipTypeClass.NONE) for owner in new_owners
    ]
    maybe_owners_wu = source.get_resource_owners_work_unit(
        DATASET_URN, DATASET_ENTITY_TYPE, owners
    )
    assert maybe_owners_wu