mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-17 13:04:04 +00:00
fix(ingest/bigquery): Filter out fine grained lineage with no upstreams (#8758)
This commit is contained in:
parent
21b2851be7
commit
a4e726872b
@ -183,6 +183,7 @@ def make_lineage_edges_from_parsing_result(
|
|||||||
column_mapping=frozenset(
|
column_mapping=frozenset(
|
||||||
LineageEdgeColumnMapping(out_column=out_column, in_columns=in_columns)
|
LineageEdgeColumnMapping(out_column=out_column, in_columns=in_columns)
|
||||||
for out_column, in_columns in column_mapping.items()
|
for out_column, in_columns in column_mapping.items()
|
||||||
|
if in_columns
|
||||||
),
|
),
|
||||||
auditStamp=audit_stamp,
|
auditStamp=audit_stamp,
|
||||||
type=lineage_type,
|
type=lineage_type,
|
||||||
|
|||||||
@ -1,6 +1,8 @@
|
|||||||
import datetime
|
import datetime
|
||||||
from typing import Dict, List, Set
|
from typing import Dict, List, Set
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
|
from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
|
||||||
BigQueryTableRef,
|
BigQueryTableRef,
|
||||||
QueryEvent,
|
QueryEvent,
|
||||||
@ -14,15 +16,17 @@ from datahub.ingestion.source.bigquery_v2.lineage import (
|
|||||||
from datahub.utilities.sqlglot_lineage import SchemaResolver
|
from datahub.utilities.sqlglot_lineage import SchemaResolver
|
||||||
|
|
||||||
|
|
||||||
def test_lineage_with_timestamps():
|
@pytest.fixture
|
||||||
config = BigQueryV2Config()
|
def lineage_entries() -> List[QueryEvent]:
|
||||||
report = BigQueryV2Report()
|
return [
|
||||||
extractor: BigqueryLineageExtractor = BigqueryLineageExtractor(config, report)
|
|
||||||
lineage_entries: List[QueryEvent] = [
|
|
||||||
QueryEvent(
|
QueryEvent(
|
||||||
timestamp=datetime.datetime.now(tz=datetime.timezone.utc),
|
timestamp=datetime.datetime.now(tz=datetime.timezone.utc),
|
||||||
actor_email="bla@bla.com",
|
actor_email="bla@bla.com",
|
||||||
query="testQuery",
|
query="""
|
||||||
|
INSERT INTO `my_project.my_dataset.my_table`
|
||||||
|
SELECT first.a, second.b FROM `my_project.my_dataset.my_source_table1` first
|
||||||
|
LEFT JOIN `my_project.my_dataset.my_source_table2` second ON first.id = second.id
|
||||||
|
""",
|
||||||
statementType="SELECT",
|
statementType="SELECT",
|
||||||
project_id="proj_12344",
|
project_id="proj_12344",
|
||||||
end_time=None,
|
end_time=None,
|
||||||
@ -73,6 +77,12 @@ def test_lineage_with_timestamps():
|
|||||||
),
|
),
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def test_lineage_with_timestamps(lineage_entries: List[QueryEvent]) -> None:
|
||||||
|
config = BigQueryV2Config()
|
||||||
|
report = BigQueryV2Report()
|
||||||
|
extractor: BigqueryLineageExtractor = BigqueryLineageExtractor(config, report)
|
||||||
|
|
||||||
bq_table = BigQueryTableRef.from_string_name(
|
bq_table = BigQueryTableRef.from_string_name(
|
||||||
"projects/my_project/datasets/my_dataset/tables/my_table"
|
"projects/my_project/datasets/my_dataset/tables/my_table"
|
||||||
)
|
)
|
||||||
@ -90,3 +100,31 @@ def test_lineage_with_timestamps():
|
|||||||
)
|
)
|
||||||
assert upstream_lineage
|
assert upstream_lineage
|
||||||
assert len(upstream_lineage.upstreams) == 4
|
assert len(upstream_lineage.upstreams) == 4
|
||||||
|
|
||||||
|
|
||||||
|
def test_column_level_lineage(lineage_entries: List[QueryEvent]) -> None:
|
||||||
|
config = BigQueryV2Config(extract_column_lineage=True, incremental_lineage=False)
|
||||||
|
report = BigQueryV2Report()
|
||||||
|
extractor: BigqueryLineageExtractor = BigqueryLineageExtractor(config, report)
|
||||||
|
|
||||||
|
bq_table = BigQueryTableRef.from_string_name(
|
||||||
|
"projects/my_project/datasets/my_dataset/tables/my_table"
|
||||||
|
)
|
||||||
|
|
||||||
|
lineage_map: Dict[str, Set[LineageEdge]] = extractor._create_lineage_map(
|
||||||
|
lineage_entries[:1],
|
||||||
|
sql_parser_schema_resolver=SchemaResolver(platform="bigquery"),
|
||||||
|
)
|
||||||
|
|
||||||
|
upstream_lineage = extractor.get_lineage_for_table(
|
||||||
|
bq_table=bq_table,
|
||||||
|
bq_table_urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,my_project.my_dataset.my_table,PROD)",
|
||||||
|
lineage_metadata=lineage_map,
|
||||||
|
platform="bigquery",
|
||||||
|
)
|
||||||
|
assert upstream_lineage
|
||||||
|
assert len(upstream_lineage.upstreams) == 2
|
||||||
|
assert (
|
||||||
|
upstream_lineage.fineGrainedLineages
|
||||||
|
and len(upstream_lineage.fineGrainedLineages) == 2
|
||||||
|
)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user