feat(ingest): support incremental column-level lineage (#10090)

Harshal Sheth 2024-03-21 01:18:12 -07:00 committed by GitHub
parent 6c3834b38c
commit 8c21b178df
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 114 additions and 471 deletions
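Summary: column-level (fine-grained) lineage is now emitted incrementally through the same upstreamLineage PATCH mechanism already used for table-level lineage. The server-side read-modify-write path (`_merge_upstream_lineage` / `_lineage_wu_via_read_modify_write`) is removed, so `auto_incremental_lineage` no longer needs a `DataHubGraph` and the sources bind only the config flag via `functools.partial`. BigQuery drops the validator that made `extract_column_lineage` and `incremental_lineage` mutually exclusive, and Redshift's `use_lineage_v2` now defaults to True.

A minimal sketch (not part of the diff) of the new call shape, with illustrative urn/aspect values; it assumes the post-commit signature shown in the test changes below:

    import datahub.metadata.schema_classes as models
    from datahub.emitter.mce_builder import make_dataset_urn, make_schema_field_urn
    from datahub.emitter.mcp import MetadataChangeProposalWrapper
    from datahub.ingestion.api.incremental_lineage_helper import auto_incremental_lineage

    urn = make_dataset_urn("platform", "dataset1")
    aspect = models.UpstreamLineageClass(
        upstreams=[
            models.UpstreamClass(
                dataset=make_dataset_urn("platform", "upstream1"),
                type=models.DatasetLineageTypeClass.TRANSFORMED,
            )
        ],
        fineGrainedLineages=[
            models.FineGrainedLineageClass(
                upstreamType=models.FineGrainedLineageUpstreamTypeClass.FIELD_SET,
                upstreams=[
                    make_schema_field_urn(make_dataset_urn("platform", "upstream1"), "col_a")
                ],
                downstreamType=models.FineGrainedLineageDownstreamTypeClass.FIELD,
                downstreams=[make_schema_field_urn(urn, "col_a")],
            )
        ],
    )

    # No graph argument anymore: both table- and column-level lineage come out
    # as a single changeType=PATCH workunit.
    wus = list(
        auto_incremental_lineage(
            incremental_lineage=True,
            stream=[
                MetadataChangeProposalWrapper(entityUrn=urn, aspect=aspect).as_workunit()
            ],
        )
    )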

View File

@@ -1,18 +1,14 @@
-import copy
-from typing import Dict, Iterable, Optional
+from typing import Iterable, Optional

 from pydantic.fields import Field

 from datahub.configuration.common import ConfigModel
 from datahub.emitter.mce_builder import datahub_guid, set_aspect
-from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.api.workunit import MetadataWorkUnit
-from datahub.ingestion.graph.client import DataHubGraph
 from datahub.metadata.schema_classes import (
     FineGrainedLineageClass,
     MetadataChangeEventClass,
     SystemMetadataClass,
-    UpstreamClass,
     UpstreamLineageClass,
 )
 from datahub.specific.dataset import DatasetPatchBuilder
@@ -29,7 +25,7 @@ def convert_upstream_lineage_to_patch(
     for fine_upstream in aspect.fineGrainedLineages or []:
         patch_builder.add_fine_grained_upstream_lineage(fine_upstream)
     mcp = next(iter(patch_builder.build()))
-    return MetadataWorkUnit(id=f"{urn}-upstreamLineage", mcp_raw=mcp)
+    return MetadataWorkUnit(id=MetadataWorkUnit.generate_workunit_id(mcp), mcp_raw=mcp)


 def get_fine_grained_lineage_key(fine_upstream: FineGrainedLineageClass) -> str:
@@ -42,73 +38,7 @@ def get_fine_grained_lineage_key(fine_upstream: FineGrainedLineageClass) -> str:
     )


-def _merge_upstream_lineage(
-    new_aspect: UpstreamLineageClass, gms_aspect: UpstreamLineageClass
-) -> UpstreamLineageClass:
-    merged_aspect = copy.deepcopy(gms_aspect)
-
-    upstreams_map: Dict[str, UpstreamClass] = {
-        upstream.dataset: upstream for upstream in merged_aspect.upstreams
-    }
-
-    upstreams_updated = False
-    fine_upstreams_updated = False
-
-    for table_upstream in new_aspect.upstreams:
-        if table_upstream.dataset not in upstreams_map or (
-            table_upstream.auditStamp.time
-            > upstreams_map[table_upstream.dataset].auditStamp.time
-        ):
-            upstreams_map[table_upstream.dataset] = table_upstream
-            upstreams_updated = True
-
-    if upstreams_updated:
-        merged_aspect.upstreams = list(upstreams_map.values())
-
-    if new_aspect.fineGrainedLineages and merged_aspect.fineGrainedLineages:
-        fine_upstreams_map: Dict[str, FineGrainedLineageClass] = {
-            get_fine_grained_lineage_key(fine_upstream): fine_upstream
-            for fine_upstream in merged_aspect.fineGrainedLineages
-        }
-        for column_upstream in new_aspect.fineGrainedLineages:
-            column_upstream_key = get_fine_grained_lineage_key(column_upstream)
-
-            if column_upstream_key not in fine_upstreams_map or (
-                column_upstream.confidenceScore
-                > fine_upstreams_map[column_upstream_key].confidenceScore
-            ):
-                fine_upstreams_map[column_upstream_key] = column_upstream
-                fine_upstreams_updated = True
-
-        if fine_upstreams_updated:
-            merged_aspect.fineGrainedLineages = list(fine_upstreams_map.values())
-    else:
-        merged_aspect.fineGrainedLineages = (
-            new_aspect.fineGrainedLineages or gms_aspect.fineGrainedLineages
-        )
-
-    return merged_aspect
-
-
-def _lineage_wu_via_read_modify_write(
-    graph: DataHubGraph,
-    urn: str,
-    aspect: UpstreamLineageClass,
-    system_metadata: Optional[SystemMetadataClass],
-) -> MetadataWorkUnit:
-    gms_aspect = graph.get_aspect(urn, UpstreamLineageClass)
-    if gms_aspect:
-        new_aspect = _merge_upstream_lineage(aspect, gms_aspect)
-    else:
-        new_aspect = aspect
-
-    return MetadataChangeProposalWrapper(
-        entityUrn=urn, aspect=new_aspect, systemMetadata=system_metadata
-    ).as_workunit()
-
-
 def auto_incremental_lineage(
-    graph: Optional[DataHubGraph],
     incremental_lineage: bool,
     stream: Iterable[MetadataWorkUnit],
 ) -> Iterable[MetadataWorkUnit]:
@@ -117,35 +47,23 @@ def auto_incremental_lineage(
         return  # early exit

     for wu in stream:
+        urn = wu.get_urn()
         lineage_aspect: Optional[UpstreamLineageClass] = wu.get_aspect_of_type(
             UpstreamLineageClass
         )
-        urn = wu.get_urn()
+        if isinstance(wu.metadata, MetadataChangeEventClass):
+            set_aspect(
+                wu.metadata, None, UpstreamLineageClass
+            )  # we'll handle upstreamLineage separately below
+            if len(wu.metadata.proposedSnapshot.aspects) > 0:
+                yield wu

         if lineage_aspect:
-            if isinstance(wu.metadata, MetadataChangeEventClass):
-                set_aspect(
-                    wu.metadata, None, UpstreamLineageClass
-                )  # we'll emit upstreamLineage separately below
-                if len(wu.metadata.proposedSnapshot.aspects) > 0:
-                    yield wu
-
-            # TODO: Replace with CLL patch now that we have support for it.
-            if lineage_aspect.fineGrainedLineages:
-                if graph is None:
-                    raise ValueError(
-                        "Failed to handle incremental lineage, DataHubGraph is missing. "
-                        "Use `datahub-rest` sink OR provide `datahub-api` config in recipe. "
-                    )
-                yield _lineage_wu_via_read_modify_write(
-                    graph, urn, lineage_aspect, wu.metadata.systemMetadata
-                )
-            elif lineage_aspect.upstreams:
+            if lineage_aspect.upstreams:
                 yield convert_upstream_lineage_to_patch(
                     urn, lineage_aspect, wu.metadata.systemMetadata
                 )
         else:
             yield wu


 class IncrementalLineageConfigMixin(ConfigModel):
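For context, the surviving patch path: `convert_upstream_lineage_to_patch` routes both table-level and column-level edges through `DatasetPatchBuilder`. Roughly, as a sketch (the table-level loop is assumed from the unchanged part of the function, which this hunk does not show):

    from datahub.metadata.schema_classes import UpstreamLineageClass
    from datahub.specific.dataset import DatasetPatchBuilder

    def to_patch_mcp(urn: str, aspect: UpstreamLineageClass):
        # Each upstream dataset and each fine-grained (column) edge becomes its
        # own JSON Patch "add" op, so repeated runs merge instead of overwrite.
        patch_builder = DatasetPatchBuilder(urn)
        for upstream in aspect.upstreams:
            patch_builder.add_upstream_lineage(upstream)
        for fine_upstream in aspect.fineGrainedLineages or []:
            patch_builder.add_fine_grained_upstream_lineage(fine_upstream)
        # build() yields MetadataChangeProposal objects with changeType=PATCH
        return next(iter(patch_builder.build()))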

View File

@@ -1,4 +1,5 @@
 import atexit
+import functools
 import logging
 import os
 import re
@@ -27,6 +28,7 @@ from datahub.ingestion.api.decorators import (
     platform_name,
     support_status,
 )
+from datahub.ingestion.api.incremental_lineage_helper import auto_incremental_lineage
 from datahub.ingestion.api.source import (
     CapabilityReport,
     MetadataWorkUnitProcessor,
@@ -167,11 +169,6 @@ def cleanup(config: BigQueryV2Config) -> None:
     SourceCapability.USAGE_STATS,
     "Enabled by default, can be disabled via configuration `include_usage_statistics`",
 )
-@capability(
-    SourceCapability.DELETION_DETECTION,
-    "Optionally enabled via `stateful_ingestion.remove_stale_metadata`",
-    supported=True,
-)
 @capability(
     SourceCapability.CLASSIFICATION,
     "Optionally enabled via `classification.enabled`",
@@ -574,6 +571,9 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
     def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
         return [
             *super().get_workunit_processors(),
+            functools.partial(
+                auto_incremental_lineage, self.config.incremental_lineage
+            ),
             StaleEntityRemovalHandler.create(
                 self, self.config, self.ctx
             ).workunit_processor,
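The same wiring is repeated in the Redshift, Snowflake, and SQLAlchemy sources below: with the `graph` parameter gone, `functools.partial` binds only the config flag, leaving a plain workunit-stream transformer. A sketch of the resulting shape (the alias here is hypothetical, for illustration only):

    import functools
    from typing import Callable, Iterable

    from datahub.ingestion.api.incremental_lineage_helper import auto_incremental_lineage
    from datahub.ingestion.api.workunit import MetadataWorkUnit

    # Hypothetical alias; the sources type this slot as MetadataWorkUnitProcessor.
    WorkUnitProcessor = Callable[[Iterable[MetadataWorkUnit]], Iterable[MetadataWorkUnit]]

    # Pre-binding the flag turns the two-argument helper into a one-argument
    # processor that takes and returns a stream of workunits.
    processor: WorkUnitProcessor = functools.partial(auto_incremental_lineage, True)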

View File

@@ -3,7 +3,6 @@ import os
 from datetime import timedelta
 from typing import Any, Dict, List, Optional

-import pydantic
 from google.cloud import bigquery
 from google.cloud.logging_v2.client import Client as GCPLoggingClient
 from pydantic import Field, PositiveInt, PrivateAttr, root_validator, validator
@@ -212,21 +211,11 @@ class BigQueryV2Config(
     )

     extract_column_lineage: bool = Field(
-        # TODO: Flip this default to True once we support patching column-level lineage.
         default=False,
         description="If enabled, generate column level lineage. "
-        "Requires lineage_use_sql_parser to be enabled. "
-        "This and `incremental_lineage` cannot both be enabled.",
+        "Requires lineage_use_sql_parser to be enabled.",
     )

-    @pydantic.validator("extract_column_lineage")
-    def validate_column_lineage(cls, v: bool, values: Dict[str, Any]) -> bool:
-        if v and values.get("incremental_lineage"):
-            raise ValueError(
-                "Cannot enable `extract_column_lineage` and `incremental_lineage` at the same time."
-            )
-        return v
-
     extract_lineage_from_catalog: bool = Field(
         default=False,
         description="This flag enables the data lineage extraction from Data Lineage API exposed by Google Data Catalog. NOTE: This extractor can't build views lineage. It's recommended to enable the view's DDL parsing. Read the docs to have more information about: https://cloud.google.com/data-catalog/docs/concepts/about-data-lineage",

View File

@@ -108,7 +108,7 @@ class RedshiftConfig(
     )
     use_lineage_v2: bool = Field(
-        default=False,
+        default=True,
         description="Whether to use the new SQL-based lineage collector.",
     )
     lineage_v2_generate_queries: bool = Field(

View File

@ -1,7 +1,7 @@
import functools
import itertools import itertools
import logging import logging
from collections import defaultdict from collections import defaultdict
from functools import partial
from typing import Dict, Iterable, List, Optional, Type, Union from typing import Dict, Iterable, List, Optional, Type, Union
import humanfriendly import humanfriendly
@@ -402,10 +402,8 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
     def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
         return [
             *super().get_workunit_processors(),
-            partial(
-                auto_incremental_lineage,
-                self.ctx.graph,
-                self.config.incremental_lineage,
+            functools.partial(
+                auto_incremental_lineage, self.config.incremental_lineage
             ),
             StaleEntityRemovalHandler.create(
                 self, self.config, self.ctx

View File

@ -1,10 +1,10 @@
import functools
import json import json
import logging import logging
import os import os
import os.path import os.path
import platform import platform
from dataclasses import dataclass from dataclasses import dataclass
from functools import partial
from typing import Callable, Dict, Iterable, List, Optional, Union from typing import Callable, Dict, Iterable, List, Optional, Union
from snowflake.connector import SnowflakeConnection from snowflake.connector import SnowflakeConnection
@@ -510,10 +510,8 @@ class SnowflakeV2Source(
     def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
         return [
             *super().get_workunit_processors(),
-            partial(
-                auto_incremental_lineage,
-                self.ctx.graph,
-                self.config.incremental_lineage,
+            functools.partial(
+                auto_incremental_lineage, self.config.incremental_lineage
             ),
             StaleEntityRemovalHandler.create(
                 self, self.config, self.ctx

View File

@ -1,5 +1,6 @@
import contextlib import contextlib
import datetime import datetime
import functools
import logging import logging
import traceback import traceback
from dataclasses import dataclass, field from dataclasses import dataclass, field
@@ -514,10 +515,8 @@ class SQLAlchemySource(StatefulIngestionSourceBase, TestableSource):
     def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
         return [
             *super().get_workunit_processors(),
-            partial(
-                auto_incremental_lineage,
-                self.ctx.graph,
-                self.config.incremental_lineage,
+            functools.partial(
+                auto_incremental_lineage, self.config.incremental_lineage
             ),
             StaleEntityRemovalHandler.create(
                 self, self.config, self.ctx

View File

@@ -1,106 +0,0 @@ (file deleted)
[
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"json": {
"upstreams": [
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD)",
"type": "TRANSFORMED"
},
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:platform,upstream2,PROD)",
"type": "TRANSFORMED"
}
],
"fineGrainedLineages": [
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD),col_a)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_a)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD),col_b)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_b)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD),col_c)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_c)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD),col_a)",
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream2,PROD),col_a)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_a)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD),col_b)",
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream2,PROD),col_b)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_b)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD),col_c)",
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream2,PROD),col_c)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_c)"
],
"confidenceScore": 1.0
}
]
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "run-id",
"lastRunId": "no-run-id-provided"
}
}
]

View File

@@ -1,120 +0,0 @@ (file deleted)
[
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"json": {
"upstreams": [
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD)",
"type": "TRANSFORMED"
},
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:platform,upstream2,PROD)",
"type": "TRANSFORMED"
},
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:platform,upstream3,PROD)",
"type": "TRANSFORMED"
}
],
"fineGrainedLineages": [
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD),col_a)",
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream2,PROD),col_a)",
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream3,PROD),col_a)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_a)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD),col_b)",
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream2,PROD),col_b)",
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream3,PROD),col_b)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_b)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD),col_c)",
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream2,PROD),col_c)",
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream3,PROD),col_c)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_c)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD),col_a)",
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream2,PROD),col_a)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_a)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD),col_b)",
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream2,PROD),col_b)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_b)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD),col_c)",
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream2,PROD),col_c)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_c)"
],
"confidenceScore": 1.0
}
]
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "run-id",
"lastRunId": "no-run-id-provided"
}
}
]

View File

@@ -0,0 +1,83 @@ (new file)
[
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD)",
"changeType": "PATCH",
"aspectName": "upstreamLineage",
"aspect": {
"json": [
{
"op": "add",
"path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD)",
"value": {
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD)",
"type": "TRANSFORMED"
}
},
{
"op": "add",
"path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:platform,upstream2,PROD)",
"value": {
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:platform,upstream2,PROD)",
"type": "TRANSFORMED"
}
},
{
"op": "add",
"path": "/fineGrainedLineages/NONE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_a)/NONE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD),col_a)",
"value": {
"confidenceScore": 1.0
}
},
{
"op": "add",
"path": "/fineGrainedLineages/NONE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_a)/NONE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream2,PROD),col_a)",
"value": {
"confidenceScore": 1.0
}
},
{
"op": "add",
"path": "/fineGrainedLineages/NONE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_b)/NONE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD),col_b)",
"value": {
"confidenceScore": 1.0
}
},
{
"op": "add",
"path": "/fineGrainedLineages/NONE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_b)/NONE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream2,PROD),col_b)",
"value": {
"confidenceScore": 1.0
}
},
{
"op": "add",
"path": "/fineGrainedLineages/NONE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_c)/NONE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD),col_c)",
"value": {
"confidenceScore": 1.0
}
},
{
"op": "add",
"path": "/fineGrainedLineages/NONE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_c)/NONE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream2,PROD),col_c)",
"value": {
"confidenceScore": 1.0
}
}
]
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "run-id",
"lastRunId": "no-run-id-provided"
}
}
]
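This new golden file shows the wire format that replaces the merged aspects above: each column edge is an independent JSON Patch `add` op whose path appears to encode `/fineGrainedLineages/<transformOperation>/<downstream field urn>/<query id>/<upstream field urn>`, with `NONE` filling the unset slots, so individual edges can be upserted without touching the rest of the aspect. A hypothetical sketch of the fan-out, one op per (downstream, upstream) column pair:

    from datahub.metadata.schema_classes import FineGrainedLineageClass

    def fan_out_patch_ops(fgl: FineGrainedLineageClass):
        # Illustrative only: mirrors the op/path/value triples in the golden file.
        for downstream in fgl.downstreams or []:
            for upstream in fgl.upstreams or []:
                yield {
                    "op": "add",
                    "path": f"/fineGrainedLineages/NONE/{downstream}/NONE/{upstream}",
                    "value": {"confidenceScore": fgl.confidenceScore},
                }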

View File

@@ -1,13 +1,9 @@
-from typing import List, Optional
-from unittest.mock import MagicMock
-
-import pytest
+from typing import List

 import datahub.metadata.schema_classes as models
 from datahub.emitter.mce_builder import make_dataset_urn, make_schema_field_urn
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.api.incremental_lineage_helper import auto_incremental_lineage
-from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.sink.file import write_metadata_file
 from tests.test_helpers import mce_helpers
@@ -86,7 +82,6 @@ def test_incremental_table_lineage(tmp_path, pytestconfig):
     aspect = base_table_lineage_aspect()

     processed_wus = auto_incremental_lineage(
-        graph=None,
         incremental_lineage=True,
         stream=[
             MetadataChangeProposalWrapper(
@@ -113,7 +108,6 @@ def test_incremental_table_lineage_empty_upstreams(tmp_path, pytestconfig):
     )

     processed_wus = auto_incremental_lineage(
-        graph=None,
         incremental_lineage=True,
         stream=[
             MetadataChangeProposalWrapper(
@@ -125,125 +119,15 @@ def test_incremental_table_lineage_empty_upstreams(tmp_path, pytestconfig):
     assert [wu.metadata for wu in processed_wus] == []


-@pytest.mark.parametrize(
-    "gms_aspect,current_aspect,output_aspect",
-    [
-        # emitting CLL upstreamLineage over table level upstreamLineage
-        [
-            base_table_lineage_aspect(),
-            base_cll_aspect(),
-            base_cll_aspect(),
-        ],
-        # emitting upstreamLineage for the first time
-        [
-            None,
-            base_cll_aspect(),
-            base_cll_aspect(),
-        ],
-        # emitting CLL upstreamLineage over same CLL upstreamLineage
-        [
-            base_cll_aspect(),
-            base_cll_aspect(),
-            base_cll_aspect(),
-        ],
-        # emitting CLL upstreamLineage over same CLL upstreamLineage but with earlier timestamp
-        [
-            base_cll_aspect(),  # default timestamp is 0
-            base_cll_aspect(timestamp=1643871600000),
-            base_cll_aspect(timestamp=1643871600000),
-        ],
-    ],
-)
-def test_incremental_column_level_lineage(
-    gms_aspect: Optional[models.UpstreamLineageClass],
-    current_aspect: models.UpstreamLineageClass,
-    output_aspect: models.UpstreamLineageClass,
-) -> None:
-    mock_graph = MagicMock()
-    mock_graph.get_aspect.return_value = gms_aspect
-    dataset_urn = make_dataset_urn(platform, "dataset1")
-
-    processed_wus = auto_incremental_lineage(
-        graph=mock_graph,
-        incremental_lineage=True,
-        stream=[
-            MetadataChangeProposalWrapper(
-                entityUrn=dataset_urn,
-                aspect=current_aspect,
-                systemMetadata=system_metadata,
-            ).as_workunit()
-        ],
-    )
-
-    wu: MetadataWorkUnit = next(iter(processed_wus))
-    aspect = wu.get_aspect_of_type(models.UpstreamLineageClass)
-    assert aspect == output_aspect
-
-
-def test_incremental_column_lineage_less_upstreams_in_gms_aspect(
-    tmp_path, pytestconfig
-):
+def test_incremental_column_lineage(tmp_path, pytestconfig):
     test_resources_dir = pytestconfig.rootpath / "tests/unit/api/source_helpers"
-    test_file = tmp_path / "incremental_cll_less_upstreams_in_gms_aspect.json"
-    golden_file = (
-        test_resources_dir / "incremental_cll_less_upstreams_in_gms_aspect_golden.json"
-    )
+    test_file = tmp_path / "incremental_column_lineage.json"
+    golden_file = test_resources_dir / "incremental_column_lineage_golden.json"

     urn = make_dataset_urn(platform, "dataset1")
     aspect = base_cll_aspect()

-    mock_graph = MagicMock()
-    mock_graph.get_aspect.return_value = make_lineage_aspect(
-        "dataset1",
-        upstreams=[make_dataset_urn(platform, name) for name in ["upstream1"]],
-        columns=["col_a", "col_b", "col_c"],
-        include_cll=True,
-    )
-
     processed_wus = auto_incremental_lineage(
-        graph=mock_graph,
-        incremental_lineage=True,
-        stream=[
-            MetadataChangeProposalWrapper(
-                entityUrn=urn, aspect=aspect, systemMetadata=system_metadata
-            ).as_workunit()
-        ],
-    )
-
-    write_metadata_file(
-        test_file,
-        [wu.metadata for wu in processed_wus],
-    )
-    mce_helpers.check_golden_file(
-        pytestconfig=pytestconfig, output_path=test_file, golden_path=golden_file
-    )
-
-
-def test_incremental_column_lineage_more_upstreams_in_gms_aspect(
-    tmp_path, pytestconfig
-):
-    test_resources_dir = pytestconfig.rootpath / "tests/unit/api/source_helpers"
-    test_file = tmp_path / "incremental_cll_more_upstreams_in_gms_aspect.json"
-    golden_file = (
-        test_resources_dir / "incremental_cll_more_upstreams_in_gms_aspect_golden.json"
-    )
-
-    urn = make_dataset_urn(platform, "dataset1")
-    aspect = base_cll_aspect()
-
-    mock_graph = MagicMock()
-    mock_graph.get_aspect.return_value = make_lineage_aspect(
-        "dataset1",
-        upstreams=[
-            make_dataset_urn(platform, name)
-            for name in ["upstream1", "upstream2", "upstream3"]
-        ],
-        columns=["col_a", "col_b", "col_c"],
-        include_cll=True,
-    )
-
-    processed_wus = auto_incremental_lineage(
-        graph=mock_graph,
         incremental_lineage=True,
         stream=[
             MetadataChangeProposalWrapper(