diff --git a/metadata-ingestion/examples/bootstrap_data/business_glossary.yml b/metadata-ingestion/examples/bootstrap_data/business_glossary.yml index b3fc700258..d35430d6d0 100644 --- a/metadata-ingestion/examples/bootstrap_data/business_glossary.yml +++ b/metadata-ingestion/examples/bootstrap_data/business_glossary.yml @@ -98,8 +98,8 @@ nodes: source_ref: FIBO source_url: "https://spec.edmcouncil.org/fibo/ontology/FBC/ProductsAndServices/ClientsAndAccounts/Account" values: - - House.Colors.Red - - House.Colors.Pink + - Housing.Colors.Red + - Housing.Colors.Pink - name: Kitchen description: a room or area where food is prepared and cooked. @@ -113,7 +113,7 @@ nodes: source_ref: FIBO source_url: "https://spec.edmcouncil.org/fibo/ontology/FBC/ProductsAndServices/ClientsAndAccounts/Account" related_terms: - - House.Kitchen + - Housing.Kitchen knowledge_links: - url: "https://en.wikipedia.org/wiki/Spoon" label: Wiki link diff --git a/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py b/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py index 15d1aac621..83ff337e2a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py +++ b/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py @@ -4,12 +4,12 @@ from dataclasses import dataclass from hashlib import md5 from typing import Any, List, Optional, Set, Tuple -import confluent_kafka import jsonref from confluent_kafka.schema_registry.schema_registry_client import ( RegisteredSchema, Schema, SchemaReference, + SchemaRegistryClient, ) from datahub.ingestion.extractor import protobuf_util, schema_util @@ -45,14 +45,11 @@ class ConfluentSchemaRegistry(KafkaSchemaRegistryBase): ) -> None: self.source_config: KafkaSourceConfig = source_config self.report: KafkaSourceReport = report - # Use the fully qualified name for SchemaRegistryClient to make it mock patchable for testing. - self.schema_registry_client = ( - confluent_kafka.schema_registry.schema_registry_client.SchemaRegistryClient( - { - "url": source_config.connection.schema_registry_url, - **source_config.connection.schema_registry_config, - } - ) + self.schema_registry_client = SchemaRegistryClient( + { + "url": source_config.connection.schema_registry_url, + **source_config.connection.schema_registry_config, + } ) self.known_schema_registry_subjects: List[str] = [] try: diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py index ebbd49f658..25bc6f4359 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py @@ -1165,6 +1165,9 @@ class LookerUserRegistry: self.looker_api_wrapper = looker_api def get_by_id(self, id_: str) -> Optional[LookerUser]: + if not id_: + return None + logger.debug(f"Will get user {id_}") raw_user: Optional[User] = self.looker_api_wrapper.get_user( diff --git a/metadata-ingestion/src/datahub/ingestion/source/metadata/business_glossary.py b/metadata-ingestion/src/datahub/ingestion/source/metadata/business_glossary.py index ecb0bab91c..352fd689e6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/metadata/business_glossary.py +++ b/metadata-ingestion/src/datahub/ingestion/source/metadata/business_glossary.py @@ -2,7 +2,7 @@ import logging import pathlib import time from dataclasses import dataclass, field -from typing import Any, Dict, Iterable, List, Optional, Union +from typing import Any, Dict, Iterable, List, Optional, TypeVar, Union from pydantic import validator from pydantic.fields import Field @@ -13,25 +13,28 @@ from datahub.configuration.config_loader import load_config_file from datahub.emitter.mce_builder import datahub_guid, make_group_urn, make_user_urn from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.common import PipelineContext -from datahub.ingestion.api.decorators import ( # SourceCapability,; capability, +from datahub.ingestion.api.decorators import ( SupportStatus, config_class, platform_name, support_status, ) from datahub.ingestion.api.source import Source, SourceReport -from datahub.ingestion.api.workunit import MetadataWorkUnit, UsageStatsWorkUnit +from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.graph.client import DataHubGraph from datahub.utilities.registries.domain_registry import DomainRegistry -from datahub.utilities.source_helpers import auto_workunit_reporter +from datahub.utilities.source_helpers import ( + auto_status_aspect, + auto_workunit, + auto_workunit_reporter, +) from datahub.utilities.urn_encoder import UrnEncoder logger = logging.getLogger(__name__) -valid_status: models.StatusClass = models.StatusClass(removed=False) - -# This needed to map path presents in inherits, contains, values, and related_terms to terms' optional id -path_vs_id: Dict[str, Optional[str]] = {} +GlossaryNodeInterface = TypeVar( + "GlossaryNodeInterface", "GlossaryNodeConfig", "BusinessGlossaryConfig" +) class Owners(ConfigModel): @@ -60,27 +63,30 @@ class GlossaryTermConfig(ConfigModel): knowledge_links: Optional[List[KnowledgeCard]] domain: Optional[str] + # Private fields. + _urn: str + class GlossaryNodeConfig(ConfigModel): id: Optional[str] name: str description: str owners: Optional[Owners] - terms: Optional[List[GlossaryTermConfig]] + terms: Optional[List["GlossaryTermConfig"]] nodes: Optional[List["GlossaryNodeConfig"]] knowledge_links: Optional[List[KnowledgeCard]] - -GlossaryNodeConfig.update_forward_refs() + # Private fields. + _urn: str class DefaultConfig(ConfigModel): """Holds defaults for populating fields in glossary terms""" - source: str + source: Optional[str] owners: Owners url: Optional[str] = None - source_type: Optional[str] = "INTERNAL" + source_type: str = "INTERNAL" class BusinessGlossarySourceConfig(ConfigModel): @@ -88,15 +94,15 @@ class BusinessGlossarySourceConfig(ConfigModel): description="File path or URL to business glossary file to ingest." ) enable_auto_id: bool = Field( - description="Generate id field from GlossaryNode and GlossaryTerm's name field", + description="Generate guid urns instead of a plaintext path urn with the node/term's hierarchy.", default=False, ) class BusinessGlossaryConfig(DefaultConfig): version: str - nodes: Optional[List[GlossaryNodeConfig]] - terms: Optional[List[GlossaryTermConfig]] + terms: Optional[List["GlossaryTermConfig"]] + nodes: Optional[List["GlossaryNodeConfig"]] @validator("version") def version_must_be_1(cls, v): @@ -166,17 +172,17 @@ def get_owners(owners: Owners) -> models.OwnershipClass: def get_mces( glossary: BusinessGlossaryConfig, + path_vs_id: Dict[str, str], ingestion_config: BusinessGlossarySourceConfig, ctx: PipelineContext, ) -> Iterable[Union[MetadataChangeProposalWrapper, models.MetadataChangeEventClass]]: - path: List[str] = [] root_owners = get_owners(glossary.owners) if glossary.nodes: for node in glossary.nodes: yield from get_mces_from_node( node, - path + [node.name], + path_vs_id=path_vs_id, parentNode=None, parentOwners=root_owners, defaults=glossary, @@ -188,7 +194,7 @@ def get_mces( for term in glossary.terms: yield from get_mces_from_term( term, - path + [term.name], + path_vs_id=path_vs_id, parentNode=None, parentOwnership=root_owners, defaults=glossary, @@ -237,16 +243,15 @@ def make_domain_mcp( def get_mces_from_node( glossaryNode: GlossaryNodeConfig, - path: List[str], + path_vs_id: Dict[str, str], parentNode: Optional[str], parentOwners: models.OwnershipClass, defaults: DefaultConfig, ingestion_config: BusinessGlossarySourceConfig, ctx: PipelineContext, ) -> Iterable[Union[MetadataChangeProposalWrapper, models.MetadataChangeEventClass]]: - node_urn = make_glossary_node_urn( - path, glossaryNode.id, ingestion_config.enable_auto_id - ) + node_urn = glossaryNode._urn + node_info = models.GlossaryNodeInfoClass( definition=glossaryNode.description, parentNode=parentNode, @@ -259,7 +264,7 @@ def get_mces_from_node( node_snapshot = models.GlossaryNodeSnapshotClass( urn=node_urn, - aspects=[node_info, node_owners, valid_status], + aspects=[node_info, node_owners], ) yield get_mce_from_snapshot(node_snapshot) @@ -274,7 +279,7 @@ def get_mces_from_node( for node in glossaryNode.nodes: yield from get_mces_from_node( node, - path + [node.name], + path_vs_id=path_vs_id, parentNode=node_urn, parentOwners=node_owners, defaults=defaults, @@ -286,7 +291,7 @@ def get_mces_from_node( for term in glossaryNode.terms: yield from get_mces_from_term( glossaryTerm=term, - path=path + [term.name], + path_vs_id=path_vs_id, parentNode=node_urn, parentOwnership=node_owners, defaults=defaults, @@ -313,29 +318,28 @@ def get_domain_class( def get_mces_from_term( glossaryTerm: GlossaryTermConfig, - path: List[str], + path_vs_id: Dict[str, str], parentNode: Optional[str], parentOwnership: models.OwnershipClass, defaults: DefaultConfig, ingestion_config: BusinessGlossarySourceConfig, ctx: PipelineContext, ) -> Iterable[Union[models.MetadataChangeEventClass, MetadataChangeProposalWrapper]]: - term_urn = make_glossary_term_urn( - path, glossaryTerm.id, ingestion_config.enable_auto_id - ) + term_urn = glossaryTerm._urn + aspects: List[ Union[ models.GlossaryTermInfoClass, models.GlossaryRelatedTermsClass, models.OwnershipClass, - models.StatusClass, models.GlossaryTermKeyClass, + models.StatusClass, models.BrowsePathsClass, ] ] = [] term_info = models.GlossaryTermInfoClass( definition=glossaryTerm.description, - termSource=glossaryTerm.term_source # type: ignore + termSource=glossaryTerm.term_source if glossaryTerm.term_source is not None else defaults.source_type, sourceRef=glossaryTerm.source_ref @@ -432,27 +436,46 @@ def get_mces_from_term( yield mcp -def populate_path_vs_id(glossary: BusinessGlossaryConfig) -> None: - path: List[str] = [] +def materialize_all_node_urns( + glossary: BusinessGlossaryConfig, enable_auto_id: bool +) -> None: + """After this runs, all nodes will have an id value that is a valid urn.""" - def _process_child_terms(parent_node: GlossaryNodeConfig, path: List[str]) -> None: - path_vs_id[".".join(path + [parent_node.name])] = parent_node.id + def _process_child_terms( + parent_node: GlossaryNodeInterface, path: List[str] + ) -> None: + for term in parent_node.terms or []: + term._urn = make_glossary_term_urn( + path + [term.name], term.id, enable_auto_id + ) - if parent_node.terms: - for term in parent_node.terms: - path_vs_id[".".join(path + [parent_node.name] + [term.name])] = term.id + for node in parent_node.nodes or []: + node._urn = make_glossary_node_urn( + path + [node.name], node.id, enable_auto_id + ) + _process_child_terms(node, path + [node.name]) - if parent_node.nodes: - for node in parent_node.nodes: - _process_child_terms(node, path + [parent_node.name]) + _process_child_terms(glossary, []) - if glossary.nodes: - for node in glossary.nodes: - _process_child_terms(node, path) - if glossary.terms: - for term in glossary.terms: - path_vs_id[".".join(path + [term.name])] = term.id +def populate_path_vs_id(glossary: BusinessGlossaryConfig) -> Dict[str, str]: + # This needed to map paths present in inherits, contains, values, and related_terms to term's + # urn, if one was manually specified. + path_vs_id: Dict[str, str] = {} + + def _process_child_terms( + parent_node: GlossaryNodeInterface, path: List[str] + ) -> None: + for term in parent_node.terms or []: + path_vs_id[".".join(path + [term.name])] = term._urn + + for node in parent_node.nodes or []: + path_vs_id[".".join(path + [node.name])] = node._urn + _process_child_terms(node, path + [node.name]) + + _process_child_terms(glossary, []) + + return path_vs_id @platform_name("Business Glossary") @@ -472,28 +495,36 @@ class BusinessGlossaryFileSource(Source): config = BusinessGlossarySourceConfig.parse_obj(config_dict) return cls(ctx, config) + @classmethod def load_glossary_config( - self, file_name: Union[str, pathlib.Path] + cls, file_name: Union[str, pathlib.Path] ) -> BusinessGlossaryConfig: config = load_config_file(file_name) glossary_cfg = BusinessGlossaryConfig.parse_obj(config) return glossary_cfg - def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, UsageStatsWorkUnit]]: - return auto_workunit_reporter(self.report, self.get_workunits_internal()) + def get_workunits(self) -> Iterable[MetadataWorkUnit]: + return auto_workunit_reporter( + self.report, + auto_status_aspect( + self.get_workunits_internal(), + ), + ) def get_workunits_internal( self, - ) -> Iterable[Union[MetadataWorkUnit, UsageStatsWorkUnit]]: + ) -> Iterable[MetadataWorkUnit]: glossary_config = self.load_glossary_config(self.config.file) - populate_path_vs_id(glossary_config) - for event in get_mces( - glossary_config, ingestion_config=self.config, ctx=self.ctx + + materialize_all_node_urns(glossary_config, self.config.enable_auto_id) + path_vs_id = populate_path_vs_id(glossary_config) + + for event in auto_workunit( + get_mces( + glossary_config, path_vs_id, ingestion_config=self.config, ctx=self.ctx + ) ): - if isinstance(event, models.MetadataChangeEventClass): - yield MetadataWorkUnit(f"{event.proposedSnapshot.urn}", mce=event) - elif isinstance(event, MetadataChangeProposalWrapper): - yield event.as_workunit() + yield event def get_report(self): return self.report diff --git a/metadata-ingestion/tests/integration/business-glossary/glossary_events_auto_id_golden.json b/metadata-ingestion/tests/integration/business-glossary/glossary_events_auto_id_golden.json index 4b4f4d247e..b8cc922f0c 100644 --- a/metadata-ingestion/tests/integration/business-glossary/glossary_events_auto_id_golden.json +++ b/metadata-ingestion/tests/integration/business-glossary/glossary_events_auto_id_golden.json @@ -23,11 +23,6 @@ "actor": "urn:li:corpuser:unknown" } } - }, - { - "com.linkedin.pegasus2avro.common.Status": { - "removed": false - } } ] } @@ -248,11 +243,6 @@ "actor": "urn:li:corpuser:unknown" } } - }, - { - "com.linkedin.pegasus2avro.common.Status": { - "removed": false - } } ] } @@ -414,11 +404,6 @@ "actor": "urn:li:corpuser:unknown" } } - }, - { - "com.linkedin.pegasus2avro.common.Status": { - "removed": false - } } ] } @@ -538,11 +523,6 @@ "actor": "urn:li:corpuser:unknown" } } - }, - { - "com.linkedin.pegasus2avro.common.Status": { - "removed": false - } } ] } @@ -589,5 +569,200 @@ "lastObserved": 1586847600000, "runId": "datahub-business-glossary-2020_04_14-07_00_00" } +}, +{ + "entityType": "glossaryNode", + "entityUrn": "urn:li:glossaryNode:47caec536f4dd5b56de2b94b3897f136", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "datahub-business-glossary-2020_04_14-07_00_00" + } +}, +{ + "entityType": "glossaryNode", + "entityUrn": "urn:li:glossaryNode:510f2c45a4622cb5ae7d4616c2aeafa2", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "datahub-business-glossary-2020_04_14-07_00_00" + } +}, +{ + "entityType": "glossaryNode", + "entityUrn": "urn:li:glossaryNode:67ea696a55826c17399d05e1299c330a", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "datahub-business-glossary-2020_04_14-07_00_00" + } +}, +{ + "entityType": "glossaryNode", + "entityUrn": "urn:li:glossaryNode:a52cf29fb9afc324fbc4976e43dbaa92", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "datahub-business-glossary-2020_04_14-07_00_00" + } +}, +{ + "entityType": "glossaryTerm", + "entityUrn": "urn:li:glossaryTerm:23f52095a9a4d310bc3a7851b483a462", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "datahub-business-glossary-2020_04_14-07_00_00" + } +}, +{ + "entityType": "glossaryTerm", + "entityUrn": "urn:li:glossaryTerm:26a6cb647cc4796632eecab6db746d92", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "datahub-business-glossary-2020_04_14-07_00_00" + } +}, +{ + "entityType": "glossaryTerm", + "entityUrn": "urn:li:glossaryTerm:4faf1eed790370f65942f2998a7993d6", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "datahub-business-glossary-2020_04_14-07_00_00" + } +}, +{ + "entityType": "glossaryTerm", + "entityUrn": "urn:li:glossaryTerm:9a589ce0a808216a95511b3ed2e223b1", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "datahub-business-glossary-2020_04_14-07_00_00" + } +}, +{ + "entityType": "glossaryTerm", + "entityUrn": "urn:li:glossaryTerm:aa0c9a43967840932fe68b1ffc0815ec", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "datahub-business-glossary-2020_04_14-07_00_00" + } +}, +{ + "entityType": "glossaryTerm", + "entityUrn": "urn:li:glossaryTerm:ad2c819afb36131c86398364213ad233", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "datahub-business-glossary-2020_04_14-07_00_00" + } +}, +{ + "entityType": "glossaryTerm", + "entityUrn": "urn:li:glossaryTerm:cab86764c6cdaab05d69306cf6e0ba94", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "datahub-business-glossary-2020_04_14-07_00_00" + } +}, +{ + "entityType": "glossaryTerm", + "entityUrn": "urn:li:glossaryTerm:d47e082e7d78362d2d1bc34c318ede70", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "datahub-business-glossary-2020_04_14-07_00_00" + } +}, +{ + "entityType": "glossaryTerm", + "entityUrn": "urn:li:glossaryTerm:dda3991a819112a7bc4c097cc0b16a09", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "datahub-business-glossary-2020_04_14-07_00_00" + } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/business-glossary/glossary_events_golden.json b/metadata-ingestion/tests/integration/business-glossary/glossary_events_golden.json index e615eefc19..e2b525658e 100644 --- a/metadata-ingestion/tests/integration/business-glossary/glossary_events_golden.json +++ b/metadata-ingestion/tests/integration/business-glossary/glossary_events_golden.json @@ -23,11 +23,6 @@ "actor": "urn:li:corpuser:unknown" } } - }, - { - "com.linkedin.pegasus2avro.common.Status": { - "removed": false - } } ] } @@ -248,11 +243,6 @@ "actor": "urn:li:corpuser:unknown" } } - }, - { - "com.linkedin.pegasus2avro.common.Status": { - "removed": false - } } ] } @@ -414,11 +404,6 @@ "actor": "urn:li:corpuser:unknown" } } - }, - { - "com.linkedin.pegasus2avro.common.Status": { - "removed": false - } } ] } @@ -538,11 +523,6 @@ "actor": "urn:li:corpuser:unknown" } } - }, - { - "com.linkedin.pegasus2avro.common.Status": { - "removed": false - } } ] } @@ -589,5 +569,200 @@ "lastObserved": 1586847600000, "runId": "datahub-business-glossary-2020_04_14-07_00_00" } +}, +{ + "entityType": "glossaryNode", + "entityUrn": "urn:li:glossaryNode:Classification", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "datahub-business-glossary-2020_04_14-07_00_00" + } +}, +{ + "entityType": "glossaryNode", + "entityUrn": "urn:li:glossaryNode:Clients And Accounts", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "datahub-business-glossary-2020_04_14-07_00_00" + } +}, +{ + "entityType": "glossaryNode", + "entityUrn": "urn:li:glossaryNode:KPIs", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "datahub-business-glossary-2020_04_14-07_00_00" + } +}, +{ + "entityType": "glossaryNode", + "entityUrn": "urn:li:glossaryNode:Personal Information", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "datahub-business-glossary-2020_04_14-07_00_00" + } +}, +{ + "entityType": "glossaryTerm", + "entityUrn": "urn:li:glossaryTerm:4faf1eed790370f65942f2998a7993d6", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "datahub-business-glossary-2020_04_14-07_00_00" + } +}, +{ + "entityType": "glossaryTerm", + "entityUrn": "urn:li:glossaryTerm:Classification.Confidential", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "datahub-business-glossary-2020_04_14-07_00_00" + } +}, +{ + "entityType": "glossaryTerm", + "entityUrn": "urn:li:glossaryTerm:Classification.Highly Confidential", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "datahub-business-glossary-2020_04_14-07_00_00" + } +}, +{ + "entityType": "glossaryTerm", + "entityUrn": "urn:li:glossaryTerm:Classification.Sensitive", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "datahub-business-glossary-2020_04_14-07_00_00" + } +}, +{ + "entityType": "glossaryTerm", + "entityUrn": "urn:li:glossaryTerm:Clients And Accounts.Account", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "datahub-business-glossary-2020_04_14-07_00_00" + } +}, +{ + "entityType": "glossaryTerm", + "entityUrn": "urn:li:glossaryTerm:Clients And Accounts.Balance", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "datahub-business-glossary-2020_04_14-07_00_00" + } +}, +{ + "entityType": "glossaryTerm", + "entityUrn": "urn:li:glossaryTerm:Personal Information.Address", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "datahub-business-glossary-2020_04_14-07_00_00" + } +}, +{ + "entityType": "glossaryTerm", + "entityUrn": "urn:li:glossaryTerm:Personal Information.Email", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "datahub-business-glossary-2020_04_14-07_00_00" + } +}, +{ + "entityType": "glossaryTerm", + "entityUrn": "urn:li:glossaryTerm:Personal Information.Gender", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "datahub-business-glossary-2020_04_14-07_00_00" + } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/remote/golden/remote_glossary_golden.json b/metadata-ingestion/tests/integration/remote/golden/remote_glossary_golden.json index 62119140f2..1e1932822a 100644 --- a/metadata-ingestion/tests/integration/remote/golden/remote_glossary_golden.json +++ b/metadata-ingestion/tests/integration/remote/golden/remote_glossary_golden.json @@ -23,11 +23,6 @@ "actor": "urn:li:corpuser:unknown" } } - }, - { - "com.linkedin.pegasus2avro.common.Status": { - "removed": false - } } ] } @@ -248,11 +243,6 @@ "actor": "urn:li:corpuser:unknown" } } - }, - { - "com.linkedin.pegasus2avro.common.Status": { - "removed": false - } } ] } @@ -414,11 +404,6 @@ "actor": "urn:li:corpuser:unknown" } } - }, - { - "com.linkedin.pegasus2avro.common.Status": { - "removed": false - } } ] } @@ -538,11 +523,6 @@ "actor": "urn:li:corpuser:unknown" } } - }, - { - "com.linkedin.pegasus2avro.common.Status": { - "removed": false - } } ] } @@ -589,5 +569,200 @@ "lastObserved": 1629795600000, "runId": "remote-4" } +}, +{ + "entityType": "glossaryNode", + "entityUrn": "urn:li:glossaryNode:Classification", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1629795600000, + "runId": "remote-4" + } +}, +{ + "entityType": "glossaryNode", + "entityUrn": "urn:li:glossaryNode:Clients And Accounts", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1629795600000, + "runId": "remote-4" + } +}, +{ + "entityType": "glossaryNode", + "entityUrn": "urn:li:glossaryNode:KPIs", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1629795600000, + "runId": "remote-4" + } +}, +{ + "entityType": "glossaryNode", + "entityUrn": "urn:li:glossaryNode:Personal Information", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1629795600000, + "runId": "remote-4" + } +}, +{ + "entityType": "glossaryTerm", + "entityUrn": "urn:li:glossaryTerm:4faf1eed790370f65942f2998a7993d6", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1629795600000, + "runId": "remote-4" + } +}, +{ + "entityType": "glossaryTerm", + "entityUrn": "urn:li:glossaryTerm:Classification.Confidential", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1629795600000, + "runId": "remote-4" + } +}, +{ + "entityType": "glossaryTerm", + "entityUrn": "urn:li:glossaryTerm:Classification.Highly Confidential", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1629795600000, + "runId": "remote-4" + } +}, +{ + "entityType": "glossaryTerm", + "entityUrn": "urn:li:glossaryTerm:Classification.Sensitive", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1629795600000, + "runId": "remote-4" + } +}, +{ + "entityType": "glossaryTerm", + "entityUrn": "urn:li:glossaryTerm:Clients And Accounts.Account", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1629795600000, + "runId": "remote-4" + } +}, +{ + "entityType": "glossaryTerm", + "entityUrn": "urn:li:glossaryTerm:Clients And Accounts.Balance", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1629795600000, + "runId": "remote-4" + } +}, +{ + "entityType": "glossaryTerm", + "entityUrn": "urn:li:glossaryTerm:Personal Information.Address", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1629795600000, + "runId": "remote-4" + } +}, +{ + "entityType": "glossaryTerm", + "entityUrn": "urn:li:glossaryTerm:Personal Information.Email", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1629795600000, + "runId": "remote-4" + } +}, +{ + "entityType": "glossaryTerm", + "entityUrn": "urn:li:glossaryTerm:Personal Information.Gender", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1629795600000, + "runId": "remote-4" + } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/test_kafka_source.py b/metadata-ingestion/tests/unit/test_kafka_source.py index fa0769b64d..1c9b95686a 100644 --- a/metadata-ingestion/tests/unit/test_kafka_source.py +++ b/metadata-ingestion/tests/unit/test_kafka_source.py @@ -205,7 +205,7 @@ def test_close(mock_kafka, mock_admin_client): @patch( - "datahub.ingestion.source.kafka.confluent_kafka.schema_registry.schema_registry_client.SchemaRegistryClient", + "datahub.ingestion.source.confluent_schema_registry.SchemaRegistryClient", autospec=True, ) @patch("datahub.ingestion.source.kafka.confluent_kafka.Consumer", autospec=True) @@ -372,7 +372,7 @@ def test_kafka_source_workunits_schema_registry_subject_name_strategies( ], ) @patch( - "datahub.ingestion.source.kafka.confluent_kafka.schema_registry.schema_registry_client.SchemaRegistryClient", + "datahub.ingestion.source.confluent_schema_registry.SchemaRegistryClient", autospec=True, ) @patch("datahub.ingestion.source.kafka.confluent_kafka.Consumer", autospec=True)