From 6ff551cbcdb07a47cd7003cdb0648b450cfdb20f Mon Sep 17 00:00:00 2001 From: Edward Vaisman Date: Thu, 24 Feb 2022 20:02:38 -0500 Subject: [PATCH] feat(ingest): lineage-file - add ability to provide lineage manually through a file (#4116) --- docs/cli.md | 5 +- .../examples/bootstrap_data/file_lineage.yml | 31 +++ metadata-ingestion/setup.py | 2 + .../source_docs/file_lineage.md | 76 ++++++ .../src/datahub/configuration/common.py | 4 + .../ingestion/source/metadata/lineage.py | 192 +++++++++++++++ .../tests/unit/test_file_lineage_source.py | 222 ++++++++++++++++++ 7 files changed, 530 insertions(+), 2 deletions(-) create mode 100644 metadata-ingestion/examples/bootstrap_data/file_lineage.yml create mode 100644 metadata-ingestion/source_docs/file_lineage.md create mode 100644 metadata-ingestion/src/datahub/ingestion/source/metadata/lineage.py create mode 100644 metadata-ingestion/tests/unit/test_file_lineage_source.py diff --git a/docs/cli.md b/docs/cli.md index ff7704c499..3566631850 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -57,12 +57,13 @@ We use a plugin architecture so that you can install only the dependencies you a ### Sources -| Plugin Name | Install Command | Provides | -|-----------------------------------------------------------------|------------------------------------------------------------| ----------------------------------- | +| Plugin Name | Install Command | Provides | +|-------------------------------------------------------------------------------------|------------------------------------------------------------| ----------------------------------- | | [file](../metadata-ingestion/source_docs/file.md) | _included by default_ | File source and sink | | [athena](../metadata-ingestion/source_docs/athena.md) | `pip install 'acryl-datahub[athena]'` | AWS Athena source | | [bigquery](../metadata-ingestion/source_docs/bigquery.md) | `pip install 'acryl-datahub[bigquery]'` | BigQuery source | | [bigquery-usage](../metadata-ingestion/source_docs/bigquery.md) | `pip install 'acryl-datahub[bigquery-usage]'` | BigQuery usage statistics source | +| [datahub-lineage-file](../metadata-ingestion/source_docs/file_lineage.md) | _no additional dependencies_ | Lineage File source | | [datahub-business-glossary](../metadata-ingestion/source_docs/business_glossary.md) | _no additional dependencies_ | Business Glossary File source | | [dbt](../metadata-ingestion/source_docs/dbt.md) | _no additional dependencies_ | dbt source | | [druid](../metadata-ingestion/source_docs/druid.md) | `pip install 'acryl-datahub[druid]'` | Druid Source | diff --git a/metadata-ingestion/examples/bootstrap_data/file_lineage.yml b/metadata-ingestion/examples/bootstrap_data/file_lineage.yml new file mode 100644 index 0000000000..6ef8699546 --- /dev/null +++ b/metadata-ingestion/examples/bootstrap_data/file_lineage.yml @@ -0,0 +1,31 @@ +--- +version: 1 +lineage: + - entity: + name: topic3 + type: dataset + env: DEV + platform: kafka + upstream: + - entity: + name: topic2 + type: dataset + env: DEV + platform: kafka + - entity: + name: topic1 + type: dataset + env: DEV + platform: kafka + - entity: + name: topic2 + type: dataset + env: DEV + platform: kafka + upstream: + - entity: + name: kafka.topic2 + env: PROD + platform: snowflake + platform_instance: test + type: dataset \ No newline at end of file diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 943840279f..06efece69c 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -114,6 +114,7 @@ plugins: 
Dict[str, Set[str]] = {
     "bigquery-usage": bigquery_common | {"cachetools"},
     "clickhouse": sql_common | {"clickhouse-sqlalchemy==0.1.8"},
     "clickhouse-usage": sql_common | {"clickhouse-sqlalchemy==0.1.8"},
+    "datahub-lineage-file": set(),
     "datahub-business-glossary": set(),
     "data-lake": {*aws_common, "pydeequ==1.0.1", "pyspark==3.0.3", "parse==1.19.0"},
     "dbt": {"requests"},
@@ -314,6 +315,7 @@ entry_points = {
         "ldap = datahub.ingestion.source.ldap:LDAPSource",
         "looker = datahub.ingestion.source.looker:LookerDashboardSource",
         "lookml = datahub.ingestion.source.lookml:LookMLSource",
+        "datahub-lineage-file = datahub.ingestion.source.metadata.lineage:LineageFileSource",
         "datahub-business-glossary = datahub.ingestion.source.metadata.business_glossary:BusinessGlossaryFileSource",
         "mode = datahub.ingestion.source.mode:ModeSource",
         "mongodb = datahub.ingestion.source.mongodb:MongoDBSource",
diff --git a/metadata-ingestion/source_docs/file_lineage.md b/metadata-ingestion/source_docs/file_lineage.md
new file mode 100644
index 0000000000..b0c1eba18a
--- /dev/null
+++ b/metadata-ingestion/source_docs/file_lineage.md
@@ -0,0 +1,76 @@
+# File Based Lineage
+
+For context on getting started with ingestion, check out our [metadata ingestion guide](../README.md).
+
+## Setup
+
+Works with `acryl-datahub` out of the box.
+
+## Capabilities
+
+This plugin pulls lineage metadata from a yaml-formatted file. An example of one such file is located in the examples
+directory [here](../examples/bootstrap_data/file_lineage.yml).
+
+## Quickstart recipe
+
+Check out the following recipe to get started with ingestion! See [below](#config-details) for full configuration
+options.
+
+For general pointers on writing and running a recipe, see our [main recipe guide](../README.md#recipes).
+
+```yml
+source:
+  type: datahub-lineage-file
+  config:
+    # Coordinates
+    file: /path/to/file_lineage.yml
+    # Whether we want to query datahub-gms for upstream data
+    preserve_upstream: False
+
+sink:
+  # sink configs
+```
+
+## Config details
+
+Note that a `.` is used to denote nested fields in the YAML recipe.
+
+| Field               | Required | Default | Description                                                                                                                                                                                                                                  |
+|---------------------|----------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `file`              | ✅       |         | Path to the lineage file to ingest.                                                                                                                                                                                                          |
+| `preserve_upstream` |          | `True`  | Whether to query datahub-gms for upstream data. `False` means the ingestion run will hard-replace the upstream data for a given entity. `True` means it will query the backend for existing upstreams and include them in the ingestion run. |
+
+### Lineage File Format
+
+The lineage source file should be a `.yml` file with the following top-level keys:
+
+**version**: the version of the lineage file format that the file conforms to. Currently, the only released version
+is `1`.
+
+**lineage**: the top-level key of the lineage file, containing a list of **EntityNodeConfig** objects
+
+**EntityNodeConfig**:
+
+- **entity**: **EntityConfig** object
+- **upstream**: (optional) list of child **EntityNodeConfig** objects
+
+**EntityConfig**:
+
+- **name**: name of the entity
+- **type**: type of the entity (only `dataset` is supported as of now)
+- **env**: the environment of this entity. Should match the values in the
+  table [here](https://datahubproject.io/docs/graphql/enums/#fabrictype)
+- **platform**: a valid platform like kafka, snowflake, etc.
+- **platform_instance**: optional string specifying the platform instance of this entity
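+
+For illustration, here is a minimal sketch (not part of the plugin itself) of how an **EntityConfig** resolves to a
+dataset URN. It uses `make_dataset_urn_with_platform_instance`, the same helper this source calls internally, with
+values borrowed from the example entries above:
+
+```python
+from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
+
+# "topic3" from the example lineage file; it declares no platform_instance.
+urn = make_dataset_urn_with_platform_instance(
+    platform="kafka", name="topic3", env="DEV", platform_instance=None
+)
+assert urn == "urn:li:dataset:(urn:li:dataPlatform:kafka,topic3,DEV)"
+```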
+
+You can also view an example lineage file checked in [here](../examples/bootstrap_data/file_lineage.yml)
+
+## Compatibility
+
+Compatible with version 1 of the lineage format. The source will be evolved as we publish newer versions of this
+format.
+
+## Questions
+
+If you've got any questions on configuring this source, feel free to ping us
+on [our Slack](https://slack.datahubproject.io/)!
\ No newline at end of file
diff --git a/metadata-ingestion/src/datahub/configuration/common.py b/metadata-ingestion/src/datahub/configuration/common.py
index 5e00712160..02710928a1 100644
--- a/metadata-ingestion/src/datahub/configuration/common.py
+++ b/metadata-ingestion/src/datahub/configuration/common.py
@@ -165,3 +165,7 @@ class KeyValuePattern(ConfigModel):
         """Return the list of allowed strings as a list, after taking into account deny patterns, if possible"""
         assert self.is_fully_specified_key()
         return self.rules
+
+
+class VersionedConfig(ConfigModel):
+    version: str = "1"
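+
+
+# Usage sketch (illustrative; `MyVersionedFormat` is a hypothetical subclass):
+# a config describing a versioned file format can extend VersionedConfig and
+# picks up the default version "1" unless the parsed file overrides it.
+#
+#   class MyVersionedFormat(VersionedConfig):
+#       payload: str
+#
+#   assert MyVersionedFormat.parse_obj({"payload": "x"}).version == "1"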
+ ) + return v + + +class EntityNodeConfig(ConfigModel): + entity: EntityConfig + upstream: Optional[List["EntityNodeConfig"]] + + +# https://pydantic-docs.helpmanual.io/usage/postponed_annotations/ required for when you reference a model within itself +EntityNodeConfig.update_forward_refs() + + +class LineageFileSourceConfig(ConfigModel): + file: str + preserve_upstream: bool = True + + +class LineageConfig(VersionedConfig): + lineage: List[EntityNodeConfig] + + @validator("version") + def version_must_be_1(cls, v): + if v != "1": + raise ValueError("Only version 1 is supported") + + +@dataclass +class LineageFileSource(Source): + config: LineageFileSourceConfig + report: SourceReport = field(default_factory=SourceReport) + + @classmethod + def create( + cls, config_dict: Dict[str, Any], ctx: PipelineContext + ) -> "LineageFileSource": + config = LineageFileSourceConfig.parse_obj(config_dict) + return cls(ctx, config) + + @staticmethod + def load_lineage_config(file_name: str) -> LineageConfig: + config = load_config_file(file_name) + lineage_config = LineageConfig.parse_obj(config) + return lineage_config + + @staticmethod + def get_lineage_metadata_change_event_proposal( + entities: List[EntityNodeConfig], preserve_upstream: bool + ) -> Iterable[MetadataChangeProposalWrapper]: + """ + Builds a list of events to be emitted to datahub by going through each entity and its upstream nodes + :param preserve_upstream: This field determines if we want to query the datahub backend to extract + the existing upstream lineages for each entity and preserve it + :param entities: A list of entities we want to build a proposal on + :return: Returns a list of metadata change event proposals to be emitted to datahub + """ + + def _get_entity_urn(entity_config: EntityConfig) -> Optional[str]: + """Helper inner function to extract a given entity_urn + A return value of None represents an unsupported entity type + """ + if entity_config.type == "dataset": + return make_dataset_urn_with_platform_instance( + platform=entity_config.platform, + name=entity_config.name, + env=entity_config.env, + platform_instance=entity_config.platform_instance, + ) + logger.warning(f"Entity type: {entity_config.type} is not supported!") + return None + + # loop through all the entities + for entity_node in entities: + new_upstreams: List[models.UpstreamClass] = [] + # if this entity has upstream nodes defined, we'll want to do some work. + # if no upstream nodes are present, we don't emit an MCP for it. + if entity_node.upstream: + entity = entity_node.entity + logger.info(f"Upstream detected for {entity}. 
diff --git a/metadata-ingestion/tests/unit/test_file_lineage_source.py b/metadata-ingestion/tests/unit/test_file_lineage_source.py
new file mode 100644
index 0000000000..703f17cd77
--- /dev/null
+++ b/metadata-ingestion/tests/unit/test_file_lineage_source.py
@@ -0,0 +1,222 @@
+import logging
+from typing import List
+
+import pytest
+import yaml
+
+from datahub.configuration.common import ConfigurationError
+from datahub.ingestion.source.metadata.lineage import LineageConfig, LineageFileSource
+from datahub.metadata.schema_classes import UpstreamClass
+
+logger = logging.getLogger(__name__)
+
+
+@pytest.fixture
+def basic_mcp():
+    """
+    The below mcp should represent a lineage that looks like this:
+    topic1 -
+            ->topic3
+    topic2 -
+    :return: the MCPs generated from this lineage with preserve_upstream=False
+    """
+    sample_lineage = """
+    lineage:
+      - entity:
+          name: topic3
+          type: dataset
+          env: DEV
+          platform: kafka
+        upstream:
+          - entity:
+              name: topic1
+              type: dataset
+              env: DEV
+              platform: kafka
+          - entity:
+              name: topic2
+              type: dataset
+              env: DEV
+              platform: kafka
+    """
+    config = yaml.safe_load(sample_lineage)
+    lineage_config: LineageConfig = LineageConfig.parse_obj(config)
+    mcp = list(
+        LineageFileSource.get_lineage_metadata_change_event_proposal(
+            entities=lineage_config.lineage, preserve_upstream=False
+        )
+    )
+ return mcp + + +def unsupported_entity_type_mcp(): + sample_lineage = """ + lineage: + - entity: + name: topic3 + type: NotSupported! + env: DEV + platform: kafka + upstream: + - entity: + name: topic1 + type: dataset + env: DEV + platform: kafka + - entity: + name: topic2 + type: dataset + env: DEV + platform: kafka + - entity: + name: topic6 + type: dataset + env: DEV + platform: kafka + upstream: + - entity: + name: topic4 + type: dataset + env: DEV + platform: kafka + - entity: + name: topic5 + type: dataset + env: DEV + platform: kafka + """ + config = yaml.safe_load(sample_lineage) + lineage_config: LineageConfig = LineageConfig.parse_obj(config) + mcp = list( + LineageFileSource.get_lineage_metadata_change_event_proposal( + entities=lineage_config.lineage, preserve_upstream=False + ) + ) + return mcp + + +def unsupported_upstream_entity_type_mcp(): + sample_lineage = """ + lineage: + - entity: + name: topic3 + type: dataset + env: DEV + platform: kafka + upstream: + - entity: + name: topic1 + type: NotSupported + env: DEV + platform: kafka + - entity: + name: topic2 + type: dataset + env: DEV + platform: kafka + """ + config = yaml.safe_load(sample_lineage) + lineage_config: LineageConfig = LineageConfig.parse_obj(config) + mcp = list( + LineageFileSource.get_lineage_metadata_change_event_proposal( + entities=lineage_config.lineage, preserve_upstream=False + ) + ) + return mcp + + +def unsupported_entity_env_mcp(): + sample_lineage = """ + lineage: + - entity: + name: topic3 + type: dataset + env: NotSupported! + platform: kafka + upstream: + - entity: + name: topic1 + type: dataset + env: DEV + platform: kafka + - entity: + name: topic2 + type: dataset + env: DEV + platform: kafka + - entity: + name: topic6 + type: dataset + env: DEV + platform: kafka + upstream: + - entity: + name: topic4 + type: dataset + env: DEV + platform: kafka + - entity: + name: topic5 + type: dataset + env: DEV + platform: kafka + """ + config = yaml.safe_load(sample_lineage) + lineage_config: LineageConfig = LineageConfig.parse_obj(config) + mcp = list( + LineageFileSource.get_lineage_metadata_change_event_proposal( + entities=lineage_config.lineage, preserve_upstream=False + ) + ) + return mcp + + +def test_basic_lineage_entity_root_node_urn(basic_mcp): + """ + Checks to see if the entityUrn extracted is correct for the root entity node + """ + + assert ( + basic_mcp[0].entityUrn + == "urn:li:dataset:(urn:li:dataPlatform:kafka,topic3,DEV)" + ) + + +def test_basic_lineage_upstream_urns(basic_mcp): + """ + Checks to see if the upstream urns are correct for a basic_mcp example + """ + basic_mcp_upstreams: List[UpstreamClass] = basic_mcp[0].aspect.upstreams + assert ( + basic_mcp_upstreams[0].dataset + == "urn:li:dataset:(urn:li:dataPlatform:kafka,topic1,DEV)" + and basic_mcp_upstreams[1].dataset + == "urn:li:dataset:(urn:li:dataPlatform:kafka,topic2,DEV)" + ) + + +def test_unsupported_entity_type(): + """ + Checks to see how we handle the case of unsupported entity types. + If validation is working correctly, it should raise a ConfigurationError + """ + with pytest.raises(ConfigurationError): + unsupported_entity_type_mcp() + + +def test_unsupported_upstream_entity_type(): + """ + Checks to see how invalid types work in the upstream node. + If validation is working correctly, it should raise a ConfigurationError + """ + with pytest.raises(ConfigurationError): + unsupported_upstream_entity_type_mcp() + + +def test_unsupported_entity_env(): + """ + Checks to see how invalid envs work. 
+ If validation is working correctly, it should raise a ConfigurationError + """ + with pytest.raises(ConfigurationError): + unsupported_entity_env_mcp()
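+
+
+def test_unsupported_version():
+    """
+    A minimal sketch of the version guard: versions other than "1" should be
+    rejected by LineageConfig's version validator. This assumes pydantic
+    surfaces the validator's ValueError as its own ValidationError, hence the
+    broad Exception below.
+    """
+    with pytest.raises(Exception):
+        LineageConfig.parse_obj({"version": "2", "lineage": []})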