feat(ingest): lineage-file - add ability to provide lineage manually through a file (#4116)

Edward Vaisman 2022-02-24 20:02:38 -05:00 committed by GitHub
parent f17c033d51
commit 6ff551cbcd
7 changed files with 530 additions and 2 deletions

@@ -57,12 +57,13 @@ We use a plugin architecture so that you can install only the dependencies you actually need.
### Sources
| Plugin Name | Install Command | Provides |
|-------------------------------------------------------------------------------------|------------------------------------------------------------| ----------------------------------- |
| [file](../metadata-ingestion/source_docs/file.md) | _included by default_ | File source and sink |
| [athena](../metadata-ingestion/source_docs/athena.md) | `pip install 'acryl-datahub[athena]'` | AWS Athena source |
| [bigquery](../metadata-ingestion/source_docs/bigquery.md) | `pip install 'acryl-datahub[bigquery]'` | BigQuery source |
| [bigquery-usage](../metadata-ingestion/source_docs/bigquery.md) | `pip install 'acryl-datahub[bigquery-usage]'` | BigQuery usage statistics source |
| [datahub-lineage-file](../metadata-ingestion/source_docs/file_lineage.md) | _no additional dependencies_ | Lineage File source |
| [datahub-business-glossary](../metadata-ingestion/source_docs/business_glossary.md) | _no additional dependencies_ | Business Glossary File source |
| [dbt](../metadata-ingestion/source_docs/dbt.md) | _no additional dependencies_ | dbt source |
| [druid](../metadata-ingestion/source_docs/druid.md) | `pip install 'acryl-datahub[druid]'` | Druid Source |

@@ -0,0 +1,31 @@
---
version: 1
lineage:
  - entity:
      name: topic3
      type: dataset
      env: DEV
      platform: kafka
    upstream:
      - entity:
          name: topic2
          type: dataset
          env: DEV
          platform: kafka
      - entity:
          name: topic1
          type: dataset
          env: DEV
          platform: kafka
  - entity:
      name: topic2
      type: dataset
      env: DEV
      platform: kafka
    upstream:
      - entity:
          name: kafka.topic2
          env: PROD
          platform: snowflake
          platform_instance: test
          type: dataset

@@ -114,6 +114,7 @@ plugins: Dict[str, Set[str]] = {
"bigquery-usage": bigquery_common | {"cachetools"},
"clickhouse": sql_common | {"clickhouse-sqlalchemy==0.1.8"},
"clickhouse-usage": sql_common | {"clickhouse-sqlalchemy==0.1.8"},
"datahub-lineage-file": set(),
"datahub-business-glossary": set(),
"data-lake": {*aws_common, "pydeequ==1.0.1", "pyspark==3.0.3", "parse==1.19.0"},
"dbt": {"requests"},
@@ -314,6 +315,7 @@ entry_points = {
"ldap = datahub.ingestion.source.ldap:LDAPSource",
"looker = datahub.ingestion.source.looker:LookerDashboardSource",
"lookml = datahub.ingestion.source.lookml:LookMLSource",
"datahub-lineage-file = datahub.ingestion.source.metadata.lineage:LineageFileSource",
"datahub-business-glossary = datahub.ingestion.source.metadata.business_glossary:BusinessGlossaryFileSource",
"mode = datahub.ingestion.source.mode:ModeSource",
"mongodb = datahub.ingestion.source.mongodb:MongoDBSource",

@@ -0,0 +1,76 @@
# File Based Lineage

For context on getting started with ingestion, check out our [metadata ingestion guide](../README.md).

## Setup

Works with `acryl-datahub` out of the box.

## Capabilities

This plugin pulls lineage metadata from a YAML-formatted file. An example of such a file is located in the examples
directory [here](../examples/bootstrap_data/file_lineage.yml).

## Quickstart recipe

Check out the following recipe to get started with ingestion! See [below](#config-details) for full configuration
options.

For general pointers on writing and running a recipe, see our [main recipe guide](../README.md#recipes).
```yml
source:
  type: datahub-lineage-file
  config:
    # Coordinates
    file: /path/to/file_lineage.yml
    # Whether we want to query datahub-gms for upstream data
    preserve_upstream: False

sink:
  # sink configs
```
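If you prefer running the recipe from Python rather than the CLI, here is a minimal sketch using the ingestion `Pipeline` API; the `datahub-rest` sink and server address below are placeholders:

```python
from datahub.ingestion.run.pipeline import Pipeline

# Mirrors the YAML recipe above; sink/server are placeholders.
pipeline = Pipeline.create(
    {
        "source": {
            "type": "datahub-lineage-file",
            "config": {
                "file": "/path/to/file_lineage.yml",
                "preserve_upstream": False,
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()
```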
## Config details

Note that a `.` is used to denote nested fields (e.g. `source.config.file`) in the YAML recipe.

| Field               | Required | Default | Description                                                                                                                                                                                                                        |
|---------------------|----------|---------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `file`              | ✅       |         | Path to the lineage file to ingest.                                                                                                                                                                                                |
| `preserve_upstream` |          | `True`  | Whether to query datahub-gms for existing upstream data. `False` means the run will hard-replace the upstream data for a given entity; `True` means it will query the backend for existing upstreams and include them in the run. |
### Lineage File Format

The lineage source file should be a `.yml` file with the following top-level keys:

**version**: the version of the lineage file format that the file conforms to. Currently, the only released
version is `1`.

**lineage**: the top-level key of the lineage file, containing a list of **EntityNodeConfig** objects.

**EntityNodeConfig**:
- **entity**: an **EntityConfig** object
- **upstream**: (optional) a list of child **EntityNodeConfig** objects

**EntityConfig**:
- **name**: name of the entity
- **type**: type of the entity (only `dataset` is supported as of now)
- **env**: the environment of this entity. Should match the values in the
  table [here](https://datahubproject.io/docs/graphql/enums/#fabrictype)
- **platform**: a valid platform like kafka, snowflake, etc.
- **platform_instance**: optional string specifying the platform instance of this entity
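As a quick sanity check of this format, the sketch below parses a minimal lineage document with the `LineageConfig` model that backs this source (mirroring this commit's unit tests; the topic names are illustrative):

```python
import yaml

from datahub.ingestion.source.metadata.lineage import LineageConfig

# Minimal lineage document in version 1 of the format; names are illustrative.
sample = """
version: 1
lineage:
  - entity:
      name: topic3
      type: dataset
      env: DEV
      platform: kafka
    upstream:
      - entity:
          name: topic1
          type: dataset
          env: DEV
          platform: kafka
"""

config = LineageConfig.parse_obj(yaml.safe_load(sample))
assert config.lineage[0].entity.name == "topic3"
assert config.lineage[0].upstream[0].entity.name == "topic1"
```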
You can also view an example lineage file checked in [here](../examples/bootstrap_data/file_lineage.yml).

## Compatibility

Compatible with version 1 of the lineage format. The source will evolve as we publish newer versions of this
format.

## Questions

If you've got any questions on configuring this source, feel free to ping us
on [our Slack](https://slack.datahubproject.io/)!

@@ -165,3 +165,7 @@ class KeyValuePattern(ConfigModel):
        """Return the list of allowed strings as a list, after taking into account deny patterns, if possible"""
        assert self.is_fully_specified_key()
        return self.rules


class VersionedConfig(ConfigModel):
    version: str = "1"

@@ -0,0 +1,192 @@
import logging
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, List, Optional, Union

from pydantic import validator

import datahub.metadata.schema_classes as models
from datahub.cli.cli_utils import get_aspects_for_entity
from datahub.configuration.common import (
    ConfigModel,
    ConfigurationError,
    VersionedConfig,
)
from datahub.configuration.config_loader import load_config_file
from datahub.configuration.source_common import EnvBasedSourceConfigBase
from datahub.emitter.mce_builder import (
    get_sys_time,
    make_dataset_urn_with_platform_instance,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit, UsageStatsWorkUnit

logger = logging.getLogger(__name__)

auditStamp = models.AuditStampClass(
    time=get_sys_time(), actor="urn:li:corpUser:pythonEmitter"
)


class EntityConfig(EnvBasedSourceConfigBase):
    name: str
    type: str
    platform: str
    platform_instance: Optional[str]

    @validator("type")
    def type_must_be_supported(cls, v: str) -> str:
        allowed_types = ["dataset"]
        if v not in allowed_types:
            raise ConfigurationError(
                f"Type must be one of {allowed_types}, {v} is not yet supported."
            )
        return v


class EntityNodeConfig(ConfigModel):
    entity: EntityConfig
    upstream: Optional[List["EntityNodeConfig"]]


# https://pydantic-docs.helpmanual.io/usage/postponed_annotations/
# Required when a model references itself.
EntityNodeConfig.update_forward_refs()


class LineageFileSourceConfig(ConfigModel):
    file: str
    preserve_upstream: bool = True


class LineageConfig(VersionedConfig):
    lineage: List[EntityNodeConfig]

    @validator("version")
    def version_must_be_1(cls, v):
        if v != "1":
            raise ValueError("Only version 1 is supported")
        return v  # pydantic validators must return the value


@dataclass
class LineageFileSource(Source):
    config: LineageFileSourceConfig
    report: SourceReport = field(default_factory=SourceReport)

    @classmethod
    def create(
        cls, config_dict: Dict[str, Any], ctx: PipelineContext
    ) -> "LineageFileSource":
        config = LineageFileSourceConfig.parse_obj(config_dict)
        return cls(ctx, config)

    @staticmethod
    def load_lineage_config(file_name: str) -> LineageConfig:
        config = load_config_file(file_name)
        lineage_config = LineageConfig.parse_obj(config)
        return lineage_config

    @staticmethod
    def get_lineage_metadata_change_event_proposal(
        entities: List[EntityNodeConfig], preserve_upstream: bool
    ) -> Iterable[MetadataChangeProposalWrapper]:
        """
        Builds the events to be emitted to datahub by going through each entity and its upstream nodes

        :param preserve_upstream: This field determines if we want to query the datahub backend to extract
        the existing upstream lineages for each entity and preserve them
        :param entities: A list of entities we want to build a proposal for
        :return: A generator of metadata change event proposals to be emitted to datahub
        """

        def _get_entity_urn(entity_config: EntityConfig) -> Optional[str]:
            """Helper inner function to extract a given entity's urn.

            A return value of None represents an unsupported entity type.
            """
            if entity_config.type == "dataset":
                return make_dataset_urn_with_platform_instance(
                    platform=entity_config.platform,
                    name=entity_config.name,
                    env=entity_config.env,
                    platform_instance=entity_config.platform_instance,
                )
            logger.warning(f"Entity type: {entity_config.type} is not supported!")
            return None

        # loop through all the entities
        for entity_node in entities:
            new_upstreams: List[models.UpstreamClass] = []
            # if this entity has upstream nodes defined, we'll want to do some work.
            # if no upstream nodes are present, we don't emit an MCP for it.
            if entity_node.upstream:
                entity = entity_node.entity
                logger.info(f"Upstream detected for {entity}. Extracting urn...")
                entity_urn = _get_entity_urn(entity)
                if entity_urn:
                    # extract the old lineage and save it for the new mcp
                    if preserve_upstream:
                        old_upstream_lineage = get_aspects_for_entity(
                            entity_urn=entity_urn,
                            aspects=["upstreamLineage"],
                            typed=True,
                        ).get("upstreamLineage")
                        if old_upstream_lineage:
                            # Can't seem to get mypy to be happy about
                            # `Argument 1 to "list" has incompatible type "Optional[Any]";
                            # expected "Iterable[UpstreamClass]"`
                            new_upstreams.extend(
                                old_upstream_lineage.get("upstreams")  # type: ignore
                            )
                    for upstream_entity_node in entity_node.upstream:
                        upstream_entity = upstream_entity_node.entity
                        upstream_entity_urn = _get_entity_urn(upstream_entity)
                        if upstream_entity_urn:
                            new_upstream = models.UpstreamClass(
                                dataset=upstream_entity_urn,
                                type=models.DatasetLineageTypeClass.TRANSFORMED,
                                auditStamp=auditStamp,
                            )
                            new_upstreams.append(new_upstream)
                        else:
                            logger.warning(
                                f"Entity type: {upstream_entity.type} is unsupported. Upstream lineage will be skipped "
                                f"for {upstream_entity.name}->{entity.name}"
                            )
                    new_upstream_lineage = models.UpstreamLineageClass(
                        upstreams=new_upstreams
                    )
                    yield MetadataChangeProposalWrapper(
                        entityType=entity.type,
                        changeType=models.ChangeTypeClass.UPSERT,
                        entityUrn=entity_urn,
                        aspectName="upstreamLineage",
                        aspect=new_upstream_lineage,
                    )
                else:
                    logger.warning(
                        f"Entity type: {entity.type} is unsupported. Entity node {entity.name} and its "
                        f"upstream lineages will be skipped"
                    )

    def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, UsageStatsWorkUnit]]:
        lineage_config = self.load_lineage_config(self.config.file)
        lineage = lineage_config.lineage
        preserve_upstream = self.config.preserve_upstream
        logger.debug(lineage_config)
        logger.info(f"preserve_upstream is set to {self.config.preserve_upstream}")
        for (
            metadata_change_event_proposal
        ) in self.get_lineage_metadata_change_event_proposal(
            lineage, preserve_upstream
        ):
            work_unit = MetadataWorkUnit(
                f"lineage-{metadata_change_event_proposal.entityUrn}",
                mcp=metadata_change_event_proposal,
            )
            self.report.report_workunit(work_unit)
            yield work_unit

    def get_report(self):
        return self.report

    def close(self):
        pass
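Not part of the commit, but for illustration: a minimal sketch of driving this source directly, assuming a local lineage file and `preserve_upstream: False` so no datahub-gms round trip is needed (the path and run id are placeholders):

```python
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.metadata.lineage import LineageFileSource

# Build the source from a plain config dict, exactly as a recipe would.
source = LineageFileSource.create(
    {"file": "examples/bootstrap_data/file_lineage.yml", "preserve_upstream": False},
    PipelineContext(run_id="lineage-file-demo"),
)
for wu in source.get_workunits():
    print(wu.id)  # e.g. lineage-urn:li:dataset:(...)
source.close()
```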

@@ -0,0 +1,222 @@
import logging
from typing import List

import pytest
import yaml

from datahub.configuration.common import ConfigurationError
from datahub.ingestion.source.metadata.lineage import LineageConfig, LineageFileSource
from datahub.metadata.schema_classes import UpstreamClass

logger = logging.getLogger(__name__)


@pytest.fixture
def basic_mcp():
    """
    The below mcp should represent a lineage that looks like this
    topic1 -
             ->topic3
    topic2 -
    :return:
    """
    sample_lineage = """
    lineage:
      - entity:
          name: topic3
          type: dataset
          env: DEV
          platform: kafka
        upstream:
          - entity:
              name: topic1
              type: dataset
              env: DEV
              platform: kafka
          - entity:
              name: topic2
              type: dataset
              env: DEV
              platform: kafka
    """
    config = yaml.safe_load(sample_lineage)
    lineage_config: LineageConfig = LineageConfig.parse_obj(config)
    mcp = list(
        LineageFileSource.get_lineage_metadata_change_event_proposal(
            entities=lineage_config.lineage, preserve_upstream=False
        )
    )
    return mcp


def unsupported_entity_type_mcp():
    sample_lineage = """
    lineage:
      - entity:
          name: topic3
          type: NotSupported!
          env: DEV
          platform: kafka
        upstream:
          - entity:
              name: topic1
              type: dataset
              env: DEV
              platform: kafka
          - entity:
              name: topic2
              type: dataset
              env: DEV
              platform: kafka
      - entity:
          name: topic6
          type: dataset
          env: DEV
          platform: kafka
        upstream:
          - entity:
              name: topic4
              type: dataset
              env: DEV
              platform: kafka
          - entity:
              name: topic5
              type: dataset
              env: DEV
              platform: kafka
    """
    config = yaml.safe_load(sample_lineage)
    lineage_config: LineageConfig = LineageConfig.parse_obj(config)
    mcp = list(
        LineageFileSource.get_lineage_metadata_change_event_proposal(
            entities=lineage_config.lineage, preserve_upstream=False
        )
    )
    return mcp


def unsupported_upstream_entity_type_mcp():
    sample_lineage = """
    lineage:
      - entity:
          name: topic3
          type: dataset
          env: DEV
          platform: kafka
        upstream:
          - entity:
              name: topic1
              type: NotSupported
              env: DEV
              platform: kafka
          - entity:
              name: topic2
              type: dataset
              env: DEV
              platform: kafka
    """
    config = yaml.safe_load(sample_lineage)
    lineage_config: LineageConfig = LineageConfig.parse_obj(config)
    mcp = list(
        LineageFileSource.get_lineage_metadata_change_event_proposal(
            entities=lineage_config.lineage, preserve_upstream=False
        )
    )
    return mcp


def unsupported_entity_env_mcp():
    sample_lineage = """
    lineage:
      - entity:
          name: topic3
          type: dataset
          env: NotSupported!
          platform: kafka
        upstream:
          - entity:
              name: topic1
              type: dataset
              env: DEV
              platform: kafka
          - entity:
              name: topic2
              type: dataset
              env: DEV
              platform: kafka
      - entity:
          name: topic6
          type: dataset
          env: DEV
          platform: kafka
        upstream:
          - entity:
              name: topic4
              type: dataset
              env: DEV
              platform: kafka
          - entity:
              name: topic5
              type: dataset
              env: DEV
              platform: kafka
    """
    config = yaml.safe_load(sample_lineage)
    lineage_config: LineageConfig = LineageConfig.parse_obj(config)
    mcp = list(
        LineageFileSource.get_lineage_metadata_change_event_proposal(
            entities=lineage_config.lineage, preserve_upstream=False
        )
    )
    return mcp


def test_basic_lineage_entity_root_node_urn(basic_mcp):
    """
    Checks to see if the entityUrn extracted is correct for the root entity node
    """
    assert (
        basic_mcp[0].entityUrn
        == "urn:li:dataset:(urn:li:dataPlatform:kafka,topic3,DEV)"
    )


def test_basic_lineage_upstream_urns(basic_mcp):
    """
    Checks to see if the upstream urns are correct for a basic_mcp example
    """
    basic_mcp_upstreams: List[UpstreamClass] = basic_mcp[0].aspect.upstreams
    assert (
        basic_mcp_upstreams[0].dataset
        == "urn:li:dataset:(urn:li:dataPlatform:kafka,topic1,DEV)"
        and basic_mcp_upstreams[1].dataset
        == "urn:li:dataset:(urn:li:dataPlatform:kafka,topic2,DEV)"
    )


def test_unsupported_entity_type():
    """
    Checks to see how we handle the case of unsupported entity types.
    If validation is working correctly, it should raise a ConfigurationError
    """
    with pytest.raises(ConfigurationError):
        unsupported_entity_type_mcp()


def test_unsupported_upstream_entity_type():
    """
    Checks to see how invalid types work in the upstream node.
    If validation is working correctly, it should raise a ConfigurationError
    """
    with pytest.raises(ConfigurationError):
        unsupported_upstream_entity_type_mcp()


def test_unsupported_entity_env():
    """
    Checks to see how invalid envs work.
    If validation is working correctly, it should raise a ConfigurationError
    """
    with pytest.raises(ConfigurationError):
        unsupported_entity_env_mcp()