refactor(ingest/biz-glossary): simplify business glossary source (#7912)

This commit is contained in:
Harshal Sheth 2023-05-04 05:31:58 +05:30 committed by GitHub
parent a9e0038199
commit ca5dffa54d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 689 additions and 133 deletions

View File

@ -98,8 +98,8 @@ nodes:
source_ref: FIBO source_ref: FIBO
source_url: "https://spec.edmcouncil.org/fibo/ontology/FBC/ProductsAndServices/ClientsAndAccounts/Account" source_url: "https://spec.edmcouncil.org/fibo/ontology/FBC/ProductsAndServices/ClientsAndAccounts/Account"
values: values:
- House.Colors.Red - Housing.Colors.Red
- House.Colors.Pink - Housing.Colors.Pink
- name: Kitchen - name: Kitchen
description: a room or area where food is prepared and cooked. description: a room or area where food is prepared and cooked.
@ -113,7 +113,7 @@ nodes:
source_ref: FIBO source_ref: FIBO
source_url: "https://spec.edmcouncil.org/fibo/ontology/FBC/ProductsAndServices/ClientsAndAccounts/Account" source_url: "https://spec.edmcouncil.org/fibo/ontology/FBC/ProductsAndServices/ClientsAndAccounts/Account"
related_terms: related_terms:
- House.Kitchen - Housing.Kitchen
knowledge_links: knowledge_links:
- url: "https://en.wikipedia.org/wiki/Spoon" - url: "https://en.wikipedia.org/wiki/Spoon"
label: Wiki link label: Wiki link

View File

@ -4,12 +4,12 @@ from dataclasses import dataclass
from hashlib import md5 from hashlib import md5
from typing import Any, List, Optional, Set, Tuple from typing import Any, List, Optional, Set, Tuple
import confluent_kafka
import jsonref import jsonref
from confluent_kafka.schema_registry.schema_registry_client import ( from confluent_kafka.schema_registry.schema_registry_client import (
RegisteredSchema, RegisteredSchema,
Schema, Schema,
SchemaReference, SchemaReference,
SchemaRegistryClient,
) )
from datahub.ingestion.extractor import protobuf_util, schema_util from datahub.ingestion.extractor import protobuf_util, schema_util
@ -45,15 +45,12 @@ class ConfluentSchemaRegistry(KafkaSchemaRegistryBase):
) -> None: ) -> None:
self.source_config: KafkaSourceConfig = source_config self.source_config: KafkaSourceConfig = source_config
self.report: KafkaSourceReport = report self.report: KafkaSourceReport = report
# Use the fully qualified name for SchemaRegistryClient to make it mock patchable for testing. self.schema_registry_client = SchemaRegistryClient(
self.schema_registry_client = (
confluent_kafka.schema_registry.schema_registry_client.SchemaRegistryClient(
{ {
"url": source_config.connection.schema_registry_url, "url": source_config.connection.schema_registry_url,
**source_config.connection.schema_registry_config, **source_config.connection.schema_registry_config,
} }
) )
)
self.known_schema_registry_subjects: List[str] = [] self.known_schema_registry_subjects: List[str] = []
try: try:
self.known_schema_registry_subjects.extend( self.known_schema_registry_subjects.extend(

View File

@ -1165,6 +1165,9 @@ class LookerUserRegistry:
self.looker_api_wrapper = looker_api self.looker_api_wrapper = looker_api
def get_by_id(self, id_: str) -> Optional[LookerUser]: def get_by_id(self, id_: str) -> Optional[LookerUser]:
if not id_:
return None
logger.debug(f"Will get user {id_}") logger.debug(f"Will get user {id_}")
raw_user: Optional[User] = self.looker_api_wrapper.get_user( raw_user: Optional[User] = self.looker_api_wrapper.get_user(

View File

@ -2,7 +2,7 @@ import logging
import pathlib import pathlib
import time import time
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, List, Optional, Union from typing import Any, Dict, Iterable, List, Optional, TypeVar, Union
from pydantic import validator from pydantic import validator
from pydantic.fields import Field from pydantic.fields import Field
@ -13,25 +13,28 @@ from datahub.configuration.config_loader import load_config_file
from datahub.emitter.mce_builder import datahub_guid, make_group_urn, make_user_urn from datahub.emitter.mce_builder import datahub_guid, make_group_urn, make_user_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import ( # SourceCapability,; capability, from datahub.ingestion.api.decorators import (
SupportStatus, SupportStatus,
config_class, config_class,
platform_name, platform_name,
support_status, support_status,
) )
from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit, UsageStatsWorkUnit from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.graph.client import DataHubGraph from datahub.ingestion.graph.client import DataHubGraph
from datahub.utilities.registries.domain_registry import DomainRegistry from datahub.utilities.registries.domain_registry import DomainRegistry
from datahub.utilities.source_helpers import auto_workunit_reporter from datahub.utilities.source_helpers import (
auto_status_aspect,
auto_workunit,
auto_workunit_reporter,
)
from datahub.utilities.urn_encoder import UrnEncoder from datahub.utilities.urn_encoder import UrnEncoder
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
valid_status: models.StatusClass = models.StatusClass(removed=False) GlossaryNodeInterface = TypeVar(
"GlossaryNodeInterface", "GlossaryNodeConfig", "BusinessGlossaryConfig"
# This needed to map path presents in inherits, contains, values, and related_terms to terms' optional id )
path_vs_id: Dict[str, Optional[str]] = {}
class Owners(ConfigModel): class Owners(ConfigModel):
@ -60,27 +63,30 @@ class GlossaryTermConfig(ConfigModel):
knowledge_links: Optional[List[KnowledgeCard]] knowledge_links: Optional[List[KnowledgeCard]]
domain: Optional[str] domain: Optional[str]
# Private fields.
_urn: str
class GlossaryNodeConfig(ConfigModel): class GlossaryNodeConfig(ConfigModel):
id: Optional[str] id: Optional[str]
name: str name: str
description: str description: str
owners: Optional[Owners] owners: Optional[Owners]
terms: Optional[List[GlossaryTermConfig]] terms: Optional[List["GlossaryTermConfig"]]
nodes: Optional[List["GlossaryNodeConfig"]] nodes: Optional[List["GlossaryNodeConfig"]]
knowledge_links: Optional[List[KnowledgeCard]] knowledge_links: Optional[List[KnowledgeCard]]
# Private fields.
GlossaryNodeConfig.update_forward_refs() _urn: str
class DefaultConfig(ConfigModel): class DefaultConfig(ConfigModel):
"""Holds defaults for populating fields in glossary terms""" """Holds defaults for populating fields in glossary terms"""
source: str source: Optional[str]
owners: Owners owners: Owners
url: Optional[str] = None url: Optional[str] = None
source_type: Optional[str] = "INTERNAL" source_type: str = "INTERNAL"
class BusinessGlossarySourceConfig(ConfigModel): class BusinessGlossarySourceConfig(ConfigModel):
@ -88,15 +94,15 @@ class BusinessGlossarySourceConfig(ConfigModel):
description="File path or URL to business glossary file to ingest." description="File path or URL to business glossary file to ingest."
) )
enable_auto_id: bool = Field( enable_auto_id: bool = Field(
description="Generate id field from GlossaryNode and GlossaryTerm's name field", description="Generate guid urns instead of a plaintext path urn with the node/term's hierarchy.",
default=False, default=False,
) )
class BusinessGlossaryConfig(DefaultConfig): class BusinessGlossaryConfig(DefaultConfig):
version: str version: str
nodes: Optional[List[GlossaryNodeConfig]] terms: Optional[List["GlossaryTermConfig"]]
terms: Optional[List[GlossaryTermConfig]] nodes: Optional[List["GlossaryNodeConfig"]]
@validator("version") @validator("version")
def version_must_be_1(cls, v): def version_must_be_1(cls, v):
@ -166,17 +172,17 @@ def get_owners(owners: Owners) -> models.OwnershipClass:
def get_mces( def get_mces(
glossary: BusinessGlossaryConfig, glossary: BusinessGlossaryConfig,
path_vs_id: Dict[str, str],
ingestion_config: BusinessGlossarySourceConfig, ingestion_config: BusinessGlossarySourceConfig,
ctx: PipelineContext, ctx: PipelineContext,
) -> Iterable[Union[MetadataChangeProposalWrapper, models.MetadataChangeEventClass]]: ) -> Iterable[Union[MetadataChangeProposalWrapper, models.MetadataChangeEventClass]]:
path: List[str] = []
root_owners = get_owners(glossary.owners) root_owners = get_owners(glossary.owners)
if glossary.nodes: if glossary.nodes:
for node in glossary.nodes: for node in glossary.nodes:
yield from get_mces_from_node( yield from get_mces_from_node(
node, node,
path + [node.name], path_vs_id=path_vs_id,
parentNode=None, parentNode=None,
parentOwners=root_owners, parentOwners=root_owners,
defaults=glossary, defaults=glossary,
@ -188,7 +194,7 @@ def get_mces(
for term in glossary.terms: for term in glossary.terms:
yield from get_mces_from_term( yield from get_mces_from_term(
term, term,
path + [term.name], path_vs_id=path_vs_id,
parentNode=None, parentNode=None,
parentOwnership=root_owners, parentOwnership=root_owners,
defaults=glossary, defaults=glossary,
@ -237,16 +243,15 @@ def make_domain_mcp(
def get_mces_from_node( def get_mces_from_node(
glossaryNode: GlossaryNodeConfig, glossaryNode: GlossaryNodeConfig,
path: List[str], path_vs_id: Dict[str, str],
parentNode: Optional[str], parentNode: Optional[str],
parentOwners: models.OwnershipClass, parentOwners: models.OwnershipClass,
defaults: DefaultConfig, defaults: DefaultConfig,
ingestion_config: BusinessGlossarySourceConfig, ingestion_config: BusinessGlossarySourceConfig,
ctx: PipelineContext, ctx: PipelineContext,
) -> Iterable[Union[MetadataChangeProposalWrapper, models.MetadataChangeEventClass]]: ) -> Iterable[Union[MetadataChangeProposalWrapper, models.MetadataChangeEventClass]]:
node_urn = make_glossary_node_urn( node_urn = glossaryNode._urn
path, glossaryNode.id, ingestion_config.enable_auto_id
)
node_info = models.GlossaryNodeInfoClass( node_info = models.GlossaryNodeInfoClass(
definition=glossaryNode.description, definition=glossaryNode.description,
parentNode=parentNode, parentNode=parentNode,
@ -259,7 +264,7 @@ def get_mces_from_node(
node_snapshot = models.GlossaryNodeSnapshotClass( node_snapshot = models.GlossaryNodeSnapshotClass(
urn=node_urn, urn=node_urn,
aspects=[node_info, node_owners, valid_status], aspects=[node_info, node_owners],
) )
yield get_mce_from_snapshot(node_snapshot) yield get_mce_from_snapshot(node_snapshot)
@ -274,7 +279,7 @@ def get_mces_from_node(
for node in glossaryNode.nodes: for node in glossaryNode.nodes:
yield from get_mces_from_node( yield from get_mces_from_node(
node, node,
path + [node.name], path_vs_id=path_vs_id,
parentNode=node_urn, parentNode=node_urn,
parentOwners=node_owners, parentOwners=node_owners,
defaults=defaults, defaults=defaults,
@ -286,7 +291,7 @@ def get_mces_from_node(
for term in glossaryNode.terms: for term in glossaryNode.terms:
yield from get_mces_from_term( yield from get_mces_from_term(
glossaryTerm=term, glossaryTerm=term,
path=path + [term.name], path_vs_id=path_vs_id,
parentNode=node_urn, parentNode=node_urn,
parentOwnership=node_owners, parentOwnership=node_owners,
defaults=defaults, defaults=defaults,
@ -313,29 +318,28 @@ def get_domain_class(
def get_mces_from_term( def get_mces_from_term(
glossaryTerm: GlossaryTermConfig, glossaryTerm: GlossaryTermConfig,
path: List[str], path_vs_id: Dict[str, str],
parentNode: Optional[str], parentNode: Optional[str],
parentOwnership: models.OwnershipClass, parentOwnership: models.OwnershipClass,
defaults: DefaultConfig, defaults: DefaultConfig,
ingestion_config: BusinessGlossarySourceConfig, ingestion_config: BusinessGlossarySourceConfig,
ctx: PipelineContext, ctx: PipelineContext,
) -> Iterable[Union[models.MetadataChangeEventClass, MetadataChangeProposalWrapper]]: ) -> Iterable[Union[models.MetadataChangeEventClass, MetadataChangeProposalWrapper]]:
term_urn = make_glossary_term_urn( term_urn = glossaryTerm._urn
path, glossaryTerm.id, ingestion_config.enable_auto_id
)
aspects: List[ aspects: List[
Union[ Union[
models.GlossaryTermInfoClass, models.GlossaryTermInfoClass,
models.GlossaryRelatedTermsClass, models.GlossaryRelatedTermsClass,
models.OwnershipClass, models.OwnershipClass,
models.StatusClass,
models.GlossaryTermKeyClass, models.GlossaryTermKeyClass,
models.StatusClass,
models.BrowsePathsClass, models.BrowsePathsClass,
] ]
] = [] ] = []
term_info = models.GlossaryTermInfoClass( term_info = models.GlossaryTermInfoClass(
definition=glossaryTerm.description, definition=glossaryTerm.description,
termSource=glossaryTerm.term_source # type: ignore termSource=glossaryTerm.term_source
if glossaryTerm.term_source is not None if glossaryTerm.term_source is not None
else defaults.source_type, else defaults.source_type,
sourceRef=glossaryTerm.source_ref sourceRef=glossaryTerm.source_ref
@ -432,27 +436,46 @@ def get_mces_from_term(
yield mcp yield mcp
def populate_path_vs_id(glossary: BusinessGlossaryConfig) -> None: def materialize_all_node_urns(
path: List[str] = [] glossary: BusinessGlossaryConfig, enable_auto_id: bool
) -> None:
"""After this runs, all nodes will have an id value that is a valid urn."""
def _process_child_terms(parent_node: GlossaryNodeConfig, path: List[str]) -> None: def _process_child_terms(
path_vs_id[".".join(path + [parent_node.name])] = parent_node.id parent_node: GlossaryNodeInterface, path: List[str]
) -> None:
for term in parent_node.terms or []:
term._urn = make_glossary_term_urn(
path + [term.name], term.id, enable_auto_id
)
if parent_node.terms: for node in parent_node.nodes or []:
for term in parent_node.terms: node._urn = make_glossary_node_urn(
path_vs_id[".".join(path + [parent_node.name] + [term.name])] = term.id path + [node.name], node.id, enable_auto_id
)
_process_child_terms(node, path + [node.name])
if parent_node.nodes: _process_child_terms(glossary, [])
for node in parent_node.nodes:
_process_child_terms(node, path + [parent_node.name])
if glossary.nodes:
for node in glossary.nodes:
_process_child_terms(node, path)
if glossary.terms: def populate_path_vs_id(glossary: BusinessGlossaryConfig) -> Dict[str, str]:
for term in glossary.terms: # This needed to map paths present in inherits, contains, values, and related_terms to term's
path_vs_id[".".join(path + [term.name])] = term.id # urn, if one was manually specified.
path_vs_id: Dict[str, str] = {}
def _process_child_terms(
parent_node: GlossaryNodeInterface, path: List[str]
) -> None:
for term in parent_node.terms or []:
path_vs_id[".".join(path + [term.name])] = term._urn
for node in parent_node.nodes or []:
path_vs_id[".".join(path + [node.name])] = node._urn
_process_child_terms(node, path + [node.name])
_process_child_terms(glossary, [])
return path_vs_id
@platform_name("Business Glossary") @platform_name("Business Glossary")
@ -472,28 +495,36 @@ class BusinessGlossaryFileSource(Source):
config = BusinessGlossarySourceConfig.parse_obj(config_dict) config = BusinessGlossarySourceConfig.parse_obj(config_dict)
return cls(ctx, config) return cls(ctx, config)
@classmethod
def load_glossary_config( def load_glossary_config(
self, file_name: Union[str, pathlib.Path] cls, file_name: Union[str, pathlib.Path]
) -> BusinessGlossaryConfig: ) -> BusinessGlossaryConfig:
config = load_config_file(file_name) config = load_config_file(file_name)
glossary_cfg = BusinessGlossaryConfig.parse_obj(config) glossary_cfg = BusinessGlossaryConfig.parse_obj(config)
return glossary_cfg return glossary_cfg
def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, UsageStatsWorkUnit]]: def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return auto_workunit_reporter(self.report, self.get_workunits_internal()) return auto_workunit_reporter(
self.report,
auto_status_aspect(
self.get_workunits_internal(),
),
)
def get_workunits_internal( def get_workunits_internal(
self, self,
) -> Iterable[Union[MetadataWorkUnit, UsageStatsWorkUnit]]: ) -> Iterable[MetadataWorkUnit]:
glossary_config = self.load_glossary_config(self.config.file) glossary_config = self.load_glossary_config(self.config.file)
populate_path_vs_id(glossary_config)
for event in get_mces( materialize_all_node_urns(glossary_config, self.config.enable_auto_id)
glossary_config, ingestion_config=self.config, ctx=self.ctx path_vs_id = populate_path_vs_id(glossary_config)
for event in auto_workunit(
get_mces(
glossary_config, path_vs_id, ingestion_config=self.config, ctx=self.ctx
)
): ):
if isinstance(event, models.MetadataChangeEventClass): yield event
yield MetadataWorkUnit(f"{event.proposedSnapshot.urn}", mce=event)
elif isinstance(event, MetadataChangeProposalWrapper):
yield event.as_workunit()
def get_report(self): def get_report(self):
return self.report return self.report

View File

@ -23,11 +23,6 @@
"actor": "urn:li:corpuser:unknown" "actor": "urn:li:corpuser:unknown"
} }
} }
},
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
} }
] ]
} }
@ -248,11 +243,6 @@
"actor": "urn:li:corpuser:unknown" "actor": "urn:li:corpuser:unknown"
} }
} }
},
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
} }
] ]
} }
@ -414,11 +404,6 @@
"actor": "urn:li:corpuser:unknown" "actor": "urn:li:corpuser:unknown"
} }
} }
},
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
} }
] ]
} }
@ -538,11 +523,6 @@
"actor": "urn:li:corpuser:unknown" "actor": "urn:li:corpuser:unknown"
} }
} }
},
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
} }
] ]
} }
@ -589,5 +569,200 @@
"lastObserved": 1586847600000, "lastObserved": 1586847600000,
"runId": "datahub-business-glossary-2020_04_14-07_00_00" "runId": "datahub-business-glossary-2020_04_14-07_00_00"
} }
},
{
"entityType": "glossaryNode",
"entityUrn": "urn:li:glossaryNode:47caec536f4dd5b56de2b94b3897f136",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "datahub-business-glossary-2020_04_14-07_00_00"
}
},
{
"entityType": "glossaryNode",
"entityUrn": "urn:li:glossaryNode:510f2c45a4622cb5ae7d4616c2aeafa2",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "datahub-business-glossary-2020_04_14-07_00_00"
}
},
{
"entityType": "glossaryNode",
"entityUrn": "urn:li:glossaryNode:67ea696a55826c17399d05e1299c330a",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "datahub-business-glossary-2020_04_14-07_00_00"
}
},
{
"entityType": "glossaryNode",
"entityUrn": "urn:li:glossaryNode:a52cf29fb9afc324fbc4976e43dbaa92",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "datahub-business-glossary-2020_04_14-07_00_00"
}
},
{
"entityType": "glossaryTerm",
"entityUrn": "urn:li:glossaryTerm:23f52095a9a4d310bc3a7851b483a462",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "datahub-business-glossary-2020_04_14-07_00_00"
}
},
{
"entityType": "glossaryTerm",
"entityUrn": "urn:li:glossaryTerm:26a6cb647cc4796632eecab6db746d92",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "datahub-business-glossary-2020_04_14-07_00_00"
}
},
{
"entityType": "glossaryTerm",
"entityUrn": "urn:li:glossaryTerm:4faf1eed790370f65942f2998a7993d6",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "datahub-business-glossary-2020_04_14-07_00_00"
}
},
{
"entityType": "glossaryTerm",
"entityUrn": "urn:li:glossaryTerm:9a589ce0a808216a95511b3ed2e223b1",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "datahub-business-glossary-2020_04_14-07_00_00"
}
},
{
"entityType": "glossaryTerm",
"entityUrn": "urn:li:glossaryTerm:aa0c9a43967840932fe68b1ffc0815ec",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "datahub-business-glossary-2020_04_14-07_00_00"
}
},
{
"entityType": "glossaryTerm",
"entityUrn": "urn:li:glossaryTerm:ad2c819afb36131c86398364213ad233",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "datahub-business-glossary-2020_04_14-07_00_00"
}
},
{
"entityType": "glossaryTerm",
"entityUrn": "urn:li:glossaryTerm:cab86764c6cdaab05d69306cf6e0ba94",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "datahub-business-glossary-2020_04_14-07_00_00"
}
},
{
"entityType": "glossaryTerm",
"entityUrn": "urn:li:glossaryTerm:d47e082e7d78362d2d1bc34c318ede70",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "datahub-business-glossary-2020_04_14-07_00_00"
}
},
{
"entityType": "glossaryTerm",
"entityUrn": "urn:li:glossaryTerm:dda3991a819112a7bc4c097cc0b16a09",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "datahub-business-glossary-2020_04_14-07_00_00"
}
} }
] ]

View File

@ -23,11 +23,6 @@
"actor": "urn:li:corpuser:unknown" "actor": "urn:li:corpuser:unknown"
} }
} }
},
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
} }
] ]
} }
@ -248,11 +243,6 @@
"actor": "urn:li:corpuser:unknown" "actor": "urn:li:corpuser:unknown"
} }
} }
},
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
} }
] ]
} }
@ -414,11 +404,6 @@
"actor": "urn:li:corpuser:unknown" "actor": "urn:li:corpuser:unknown"
} }
} }
},
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
} }
] ]
} }
@ -538,11 +523,6 @@
"actor": "urn:li:corpuser:unknown" "actor": "urn:li:corpuser:unknown"
} }
} }
},
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
} }
] ]
} }
@ -589,5 +569,200 @@
"lastObserved": 1586847600000, "lastObserved": 1586847600000,
"runId": "datahub-business-glossary-2020_04_14-07_00_00" "runId": "datahub-business-glossary-2020_04_14-07_00_00"
} }
},
{
"entityType": "glossaryNode",
"entityUrn": "urn:li:glossaryNode:Classification",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "datahub-business-glossary-2020_04_14-07_00_00"
}
},
{
"entityType": "glossaryNode",
"entityUrn": "urn:li:glossaryNode:Clients And Accounts",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "datahub-business-glossary-2020_04_14-07_00_00"
}
},
{
"entityType": "glossaryNode",
"entityUrn": "urn:li:glossaryNode:KPIs",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "datahub-business-glossary-2020_04_14-07_00_00"
}
},
{
"entityType": "glossaryNode",
"entityUrn": "urn:li:glossaryNode:Personal Information",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "datahub-business-glossary-2020_04_14-07_00_00"
}
},
{
"entityType": "glossaryTerm",
"entityUrn": "urn:li:glossaryTerm:4faf1eed790370f65942f2998a7993d6",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "datahub-business-glossary-2020_04_14-07_00_00"
}
},
{
"entityType": "glossaryTerm",
"entityUrn": "urn:li:glossaryTerm:Classification.Confidential",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "datahub-business-glossary-2020_04_14-07_00_00"
}
},
{
"entityType": "glossaryTerm",
"entityUrn": "urn:li:glossaryTerm:Classification.Highly Confidential",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "datahub-business-glossary-2020_04_14-07_00_00"
}
},
{
"entityType": "glossaryTerm",
"entityUrn": "urn:li:glossaryTerm:Classification.Sensitive",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "datahub-business-glossary-2020_04_14-07_00_00"
}
},
{
"entityType": "glossaryTerm",
"entityUrn": "urn:li:glossaryTerm:Clients And Accounts.Account",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "datahub-business-glossary-2020_04_14-07_00_00"
}
},
{
"entityType": "glossaryTerm",
"entityUrn": "urn:li:glossaryTerm:Clients And Accounts.Balance",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "datahub-business-glossary-2020_04_14-07_00_00"
}
},
{
"entityType": "glossaryTerm",
"entityUrn": "urn:li:glossaryTerm:Personal Information.Address",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "datahub-business-glossary-2020_04_14-07_00_00"
}
},
{
"entityType": "glossaryTerm",
"entityUrn": "urn:li:glossaryTerm:Personal Information.Email",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "datahub-business-glossary-2020_04_14-07_00_00"
}
},
{
"entityType": "glossaryTerm",
"entityUrn": "urn:li:glossaryTerm:Personal Information.Gender",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "datahub-business-glossary-2020_04_14-07_00_00"
}
} }
] ]

View File

@ -23,11 +23,6 @@
"actor": "urn:li:corpuser:unknown" "actor": "urn:li:corpuser:unknown"
} }
} }
},
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
} }
] ]
} }
@ -248,11 +243,6 @@
"actor": "urn:li:corpuser:unknown" "actor": "urn:li:corpuser:unknown"
} }
} }
},
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
} }
] ]
} }
@ -414,11 +404,6 @@
"actor": "urn:li:corpuser:unknown" "actor": "urn:li:corpuser:unknown"
} }
} }
},
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
} }
] ]
} }
@ -538,11 +523,6 @@
"actor": "urn:li:corpuser:unknown" "actor": "urn:li:corpuser:unknown"
} }
} }
},
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
} }
] ]
} }
@ -589,5 +569,200 @@
"lastObserved": 1629795600000, "lastObserved": 1629795600000,
"runId": "remote-4" "runId": "remote-4"
} }
},
{
"entityType": "glossaryNode",
"entityUrn": "urn:li:glossaryNode:Classification",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "remote-4"
}
},
{
"entityType": "glossaryNode",
"entityUrn": "urn:li:glossaryNode:Clients And Accounts",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "remote-4"
}
},
{
"entityType": "glossaryNode",
"entityUrn": "urn:li:glossaryNode:KPIs",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "remote-4"
}
},
{
"entityType": "glossaryNode",
"entityUrn": "urn:li:glossaryNode:Personal Information",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "remote-4"
}
},
{
"entityType": "glossaryTerm",
"entityUrn": "urn:li:glossaryTerm:4faf1eed790370f65942f2998a7993d6",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "remote-4"
}
},
{
"entityType": "glossaryTerm",
"entityUrn": "urn:li:glossaryTerm:Classification.Confidential",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "remote-4"
}
},
{
"entityType": "glossaryTerm",
"entityUrn": "urn:li:glossaryTerm:Classification.Highly Confidential",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "remote-4"
}
},
{
"entityType": "glossaryTerm",
"entityUrn": "urn:li:glossaryTerm:Classification.Sensitive",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "remote-4"
}
},
{
"entityType": "glossaryTerm",
"entityUrn": "urn:li:glossaryTerm:Clients And Accounts.Account",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "remote-4"
}
},
{
"entityType": "glossaryTerm",
"entityUrn": "urn:li:glossaryTerm:Clients And Accounts.Balance",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "remote-4"
}
},
{
"entityType": "glossaryTerm",
"entityUrn": "urn:li:glossaryTerm:Personal Information.Address",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "remote-4"
}
},
{
"entityType": "glossaryTerm",
"entityUrn": "urn:li:glossaryTerm:Personal Information.Email",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "remote-4"
}
},
{
"entityType": "glossaryTerm",
"entityUrn": "urn:li:glossaryTerm:Personal Information.Gender",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "remote-4"
}
} }
] ]

View File

@ -205,7 +205,7 @@ def test_close(mock_kafka, mock_admin_client):
@patch( @patch(
"datahub.ingestion.source.kafka.confluent_kafka.schema_registry.schema_registry_client.SchemaRegistryClient", "datahub.ingestion.source.confluent_schema_registry.SchemaRegistryClient",
autospec=True, autospec=True,
) )
@patch("datahub.ingestion.source.kafka.confluent_kafka.Consumer", autospec=True) @patch("datahub.ingestion.source.kafka.confluent_kafka.Consumer", autospec=True)
@ -372,7 +372,7 @@ def test_kafka_source_workunits_schema_registry_subject_name_strategies(
], ],
) )
@patch( @patch(
"datahub.ingestion.source.kafka.confluent_kafka.schema_registry.schema_registry_client.SchemaRegistryClient", "datahub.ingestion.source.confluent_schema_registry.SchemaRegistryClient",
autospec=True, autospec=True,
) )
@patch("datahub.ingestion.source.kafka.confluent_kafka.Consumer", autospec=True) @patch("datahub.ingestion.source.kafka.confluent_kafka.Consumer", autospec=True)