feat(ingestion): Business Glossary# Add domain support in GlossaryTerm ingestion (#6829)

* lint fix

* domain in term

* domain in term

* review comments

* add todo

Co-authored-by: MohdSiddique Bagwan <mohdsiddique.bagwan@gslab.com>
Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
This commit is contained in:
mohdsiddique 2022-12-23 04:17:57 +05:30 committed by GitHub
parent 53fe13ebed
commit 9daa8ed56f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 104 additions and 25 deletions

View File

@ -36,6 +36,7 @@ The business glossary source file should be a `.yml` file with the following top
- **inherits**: (optional) List of **GlossaryTerm** that this term inherits from
- **contains**: (optional) List of **GlossaryTerm** that this term contains
- **custom_properties**: A map of key/value pairs of arbitrary custom properties
- **domain**: (optional) Name or URN of the domain to associate with this term
You can also view an example business glossary file checked in [here](../../../examples/bootstrap_data/business_glossary.yml)

View File

@ -23,6 +23,7 @@ nodes:
description: Highly Confidential Data
custom_properties:
is_confidential: true
domain: Marketing
- name: PersonalInformation
description: All terms related to personal information
owners:

View File

@ -18,6 +18,7 @@ from datahub.emitter.mce_builder import (
make_user_urn,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import ( # SourceCapability,; capability,
SupportStatus,
config_class,
@ -26,6 +27,8 @@ from datahub.ingestion.api.decorators import ( # SourceCapability,; capability,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit, UsageStatsWorkUnit
from datahub.ingestion.graph.client import DataHubGraph
from datahub.utilities.registries.domain_registry import DomainRegistry
from datahub.utilities.urn_encoder import UrnEncoder
logger = logging.getLogger(__name__)
@ -64,6 +67,7 @@ class GlossaryTermConfig(ConfigModel):
related_terms: Optional[List[str]]
custom_properties: Optional[Dict[str, str]]
knowledge_links: Optional[List[KnowledgeCard]]
domain: Optional[str]
class GlossaryNodeConfig(ConfigModel):
@ -170,7 +174,9 @@ def get_owners(owners: Owners) -> models.OwnershipClass:
def get_mces(
glossary: BusinessGlossaryConfig, ingestion_config: BusinessGlossarySourceConfig
glossary: BusinessGlossaryConfig,
ingestion_config: BusinessGlossarySourceConfig,
ctx: PipelineContext,
) -> Iterable[Union[MetadataChangeProposalWrapper, models.MetadataChangeEventClass]]:
path: List[str] = []
root_owners = get_owners(glossary.owners)
@ -184,6 +190,7 @@ def get_mces(
parentOwners=root_owners,
defaults=glossary,
ingestion_config=ingestion_config,
ctx=ctx,
)
if glossary.terms:
@ -195,6 +202,7 @@ def get_mces(
parentOwnership=root_owners,
defaults=glossary,
ingestion_config=ingestion_config,
ctx=ctx,
)
@ -230,6 +238,12 @@ def make_institutional_memory_mcp(
return None
def make_domain_mcp(
    term_urn: str, domain_aspect: models.DomainsClass
) -> MetadataChangeProposalWrapper:
    """Wrap a domains aspect in a change proposal addressed to a glossary term.

    Args:
        term_urn: URN of the glossary term the aspect is attached to.
        domain_aspect: The domains aspect to emit for that term.

    Returns:
        A ``MetadataChangeProposalWrapper`` whose ``entityUrn`` is ``term_urn``
        and whose ``aspect`` is ``domain_aspect``.
    """
    return MetadataChangeProposalWrapper(entityUrn=term_urn, aspect=domain_aspect)
def get_mces_from_node(
glossaryNode: GlossaryNodeConfig,
path: List[str],
@ -237,6 +251,7 @@ def get_mces_from_node(
parentOwners: models.OwnershipClass,
defaults: DefaultConfig,
ingestion_config: BusinessGlossarySourceConfig,
ctx: PipelineContext,
) -> Iterable[Union[MetadataChangeProposalWrapper, models.MetadataChangeEventClass]]:
node_urn = make_glossary_node_urn(
path, glossaryNode.id, ingestion_config.enable_auto_id
@ -273,6 +288,7 @@ def get_mces_from_node(
parentOwners=node_owners,
defaults=defaults,
ingestion_config=ingestion_config,
ctx=ctx,
)
if glossaryNode.terms:
@ -284,9 +300,26 @@ def get_mces_from_node(
parentOwnership=node_owners,
defaults=defaults,
ingestion_config=ingestion_config,
ctx=ctx,
)
def get_domain_class(
    graph: Optional[DataHubGraph], domains: List[str]
) -> models.DomainsClass:
    """Build a domains aspect from a list of domain identifiers.

    Each entry of ``domains`` is passed through
    ``DomainRegistry.get_domain_urn`` and the results are collected, in input
    order, into a ``DomainsClass``.

    Args:
        graph: Optional DataHub graph client handed to the ``DomainRegistry``.
            NOTE(review): presumably used to look up domains that are given by
            name rather than URN -- confirm against ``DomainRegistry``.
        domains: Domain identifiers to resolve.

    Returns:
        A ``DomainsClass`` holding one resolved value per entry of ``domains``.
    """
    # FIXME: In the ideal case, the domain registry would be an instance variable so that it
    # preserves its cache across calls to this function. However, the current implementation
    # requires the full list of domains to be passed in at instantiation time, so we can't
    # actually do that.
    domain_registry: DomainRegistry = DomainRegistry(
        # Hand the registry its own copy so it never aliases the caller's list.
        cached_domains=list(domains),
        graph=graph,
    )
    return models.DomainsClass(
        domains=[domain_registry.get_domain_urn(domain) for domain in domains]
    )
def get_mces_from_term(
glossaryTerm: GlossaryTermConfig,
path: List[str],
@ -294,6 +327,7 @@ def get_mces_from_term(
parentOwnership: models.OwnershipClass,
defaults: DefaultConfig,
ingestion_config: BusinessGlossarySourceConfig,
ctx: PipelineContext,
) -> Iterable[Union[models.MetadataChangeEventClass, MetadataChangeProposalWrapper]]:
term_urn = make_glossary_term_urn(
path, glossaryTerm.id, ingestion_config.enable_auto_id
@ -388,6 +422,11 @@ def get_mces_from_term(
ownership = get_owners(glossaryTerm.owners)
aspects.append(ownership)
if glossaryTerm.domain is not None:
yield make_domain_mcp(
term_urn, get_domain_class(ctx.graph, [glossaryTerm.domain])
)
term_snapshot: models.GlossaryTermSnapshotClass = models.GlossaryTermSnapshotClass(
urn=term_urn,
aspects=aspects,
@ -450,7 +489,9 @@ class BusinessGlossaryFileSource(Source):
def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, UsageStatsWorkUnit]]:
glossary_config = self.load_glossary_config(self.config.file)
populate_path_vs_id(glossary_config)
for event in get_mces(glossary_config, ingestion_config=self.config):
for event in get_mces(
glossary_config, ingestion_config=self.config, ctx=self.ctx
):
if isinstance(event, models.MetadataChangeEventClass):
wu = MetadataWorkUnit(f"{event.proposedSnapshot.urn}", mce=event)
self.report.report_workunit(wu)

View File

@ -26,6 +26,8 @@ nodes:
description: Highly Confidential Data
custom_properties:
is_confidential: true
domain: Marketing
- name: Personal Information
description: All terms related to personal information
owners:

View File

@ -145,6 +145,20 @@
"runId": "datahub-business-glossary-2020_04_14-07_00_00"
}
},
{
"entityType": "glossaryTerm",
"entityUrn": "urn:li:glossaryTerm:Classification.Highly Confidential",
"changeType": "UPSERT",
"aspectName": "domains",
"aspect": {
"value": "{\"domains\": [\"urn:li:domain:Marketing\"]}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "datahub-business-glossary-2020_04_14-07_00_00"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.GlossaryTermSnapshot": {

View File

@ -1,10 +0,0 @@
source:
type: datahub-business-glossary
config:
# Coordinates
file: ./business_glossary.yml
sink:
type: file
config:
filename: glossary_events.json

View File

@ -1,39 +1,69 @@
import shutil
from typing import List
from typing import Any, Dict, List
import pytest
from freezegun import freeze_time
from datahub.ingestion.graph.client import DatahubClientConfig
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.metadata import business_glossary
from tests.test_helpers import mce_helpers
from tests.test_helpers.click_helpers import run_datahub_cmd
FROZEN_TIME = "2020-04-14 07:00:00"
def get_default_recipe(
    glossary_yml_file_path: str, event_output_file_path: str
) -> Dict[str, Any]:
    """Return an ingestion recipe that reads a glossary file and writes events to a file.

    Args:
        glossary_yml_file_path: Path placed under ``source.config.file``.
        event_output_file_path: Path placed under ``sink.config.filename``.

    Returns:
        A fresh recipe dict with a ``datahub-business-glossary`` source and a
        ``file`` sink.
    """
    return {
        "source": {
            "type": "datahub-business-glossary",
            "config": {"file": glossary_yml_file_path},
        },
        "sink": {
            "type": "file",
            "config": {
                "filename": event_output_file_path,
            },
        },
    }
@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_glossary_ingest(docker_compose_runner, pytestconfig, tmp_path, mock_time):
def test_glossary_ingest(
mock_datahub_graph, docker_compose_runner, pytestconfig, tmp_path, mock_time
):
test_resources_dir = pytestconfig.rootpath / "tests/integration/business-glossary"
# Run the metadata ingestion pipeline.
config_file = (test_resources_dir / "glossary_to_file.yml").resolve()
shutil.copy(test_resources_dir / "business_glossary.yml", tmp_path)
run_datahub_cmd(
["ingest", "--strict-warnings", "-c", f"{config_file}"], tmp_path=tmp_path
)
# These paths change from one instance run of the clickhouse docker to the other, and the FROZEN_TIME does not apply to these.
# These paths change from one instance run of the clickhouse docker to the other,
# and the FROZEN_TIME does not apply to these.
ignore_paths: List[str] = [
r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['customProperties'\]\['metadata_modification_time'\]",
r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['customProperties'\]\['data_paths'\]",
r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['customProperties'\]\['metadata_path'\]",
]
output_mces_path: str = f"{tmp_path}/glossary_events.json"
golden_mces_path: str = f"{test_resources_dir}/glossary_events_golden.json"
pipeline = Pipeline.create(
get_default_recipe(
glossary_yml_file_path=f"{test_resources_dir}/business_glossary.yml",
event_output_file_path=output_mces_path,
)
)
pipeline.ctx.graph = mock_datahub_graph(
DatahubClientConfig()
) # Mock to resolve domain
pipeline.run()
pipeline.raise_from_status()
# Verify the output.
mce_helpers.check_golden_file(
pytestconfig,
ignore_paths=ignore_paths,
output_path=tmp_path / "glossary_events.json",
golden_path=test_resources_dir / "glossary_events_golden.json",
output_path=output_mces_path,
golden_path=golden_mces_path,
)