fix(ingest): Handle empty column names in SQL parsing column lineage (#15013)

Co-authored-by: Kyungsoo Lee <kyungsl@Kyungsoos-MacBook-Pro.local>
kyungsoo-datahub authored 2025-10-22 09:09:32 -07:00, committed by GitHub
parent 396868391d
commit eecc2e922c
5 changed files with 304 additions and 4 deletions

src/datahub/sql_parsing/sql_parsing_aggregator.py

@@ -1340,11 +1340,25 @@ class SqlParsingAggregator(Closeable):
                 upstreams.setdefault(upstream, query.query_id)
             for lineage_info in query.column_lineage:
+                if (
+                    not lineage_info.downstream.column
+                    or not lineage_info.downstream.column.strip()
+                ):
+                    logger.debug(
+                        f"Skipping lineage entry with empty downstream column in query {query.query_id}"
+                    )
+                    continue
                 for upstream_ref in lineage_info.upstreams:
-                    cll[lineage_info.downstream.column].setdefault(
-                        SchemaFieldUrn(upstream_ref.table, upstream_ref.column),
-                        query.query_id,
-                    )
+                    if upstream_ref.column and upstream_ref.column.strip():
+                        cll[lineage_info.downstream.column].setdefault(
+                            SchemaFieldUrn(upstream_ref.table, upstream_ref.column),
+                            query.query_id,
+                        )
+                    else:
+                        logger.debug(
+                            f"Skipping empty column reference in lineage for query {query.query_id}"
+                        )
         # Finally, we can build our lineage edge.
         required_queries = OrderedSet[QueryId]()
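For quick reference, the hunk above boils down to two guards: skip any lineage entry whose downstream column name is blank, and skip any blank upstream column reference, logging both at debug level. Below is a minimal, standalone sketch of that same skip-empty behavior, not the aggregator's actual method: build_column_lineage_map is a hypothetical helper used only for illustration, and plain tuples stand in for ColumnLineageInfo, ColumnRef, and SchemaFieldUrn.

import logging
from collections import defaultdict

logger = logging.getLogger(__name__)


def build_column_lineage_map(
    column_lineage: list[tuple[str, list[tuple[str, str]]]],
    query_id: str,
) -> dict[str, dict[tuple[str, str], str]]:
    """Map downstream column -> {(upstream_table, upstream_column): query_id},
    skipping empty or whitespace-only column names instead of emitting them."""
    cll: dict[str, dict[tuple[str, str], str]] = defaultdict(dict)
    for downstream_column, upstream_refs in column_lineage:
        # Mirror of the first guard in the diff: drop the whole entry if the
        # downstream column name is blank.
        if not downstream_column or not downstream_column.strip():
            logger.debug(
                "Skipping lineage entry with empty downstream column in query %s", query_id
            )
            continue
        for upstream_table, upstream_column in upstream_refs:
            # Mirror of the second guard: keep only non-blank upstream columns.
            if upstream_column and upstream_column.strip():
                cll[downstream_column].setdefault((upstream_table, upstream_column), query_id)
            else:
                logger.debug("Skipping empty column reference in lineage for query %s", query_id)
    return cll


if __name__ == "__main__":
    lineage = [
        ("col_a", [("source_table", "col_a")]),
        ("col_b", [("source_table", "col_b"), ("source_table", "")]),
        ("", [("source_table", "col_c")]),
    ]
    # Prints {'col_a': {('source_table', 'col_a'): 'q1'}, 'col_b': {('source_table', 'col_b'): 'q1'}}
    print(dict(build_column_lineage_map(lineage, "q1")))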

tests/unit/sql_parsing/aggregator_goldens/test_empty_column_in_snowflake_lineage_golden.json

@@ -0,0 +1,53 @@
[
    {
        "entityType": "dataset",
        "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.target_table,PROD)",
        "changeType": "UPSERT",
        "aspectName": "upstreamLineage",
        "aspect": {
            "json": {
                "upstreams": [
                    {
                        "auditStamp": {
                            "time": 1707182625000,
                            "actor": "urn:li:corpuser:_ingestion"
                        },
                        "created": {
                            "time": 20000,
                            "actor": "urn:li:corpuser:_ingestion"
                        },
                        "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.source_table,PROD)",
                        "type": "TRANSFORMED",
                        "query": "urn:li:query:e0ac0a087b1c788497e6ae3ed1abf6538c66f80a0b1e6fad83219813d6c95b27"
                    }
                ],
                "fineGrainedLineages": [
                    {
                        "upstreamType": "FIELD_SET",
                        "upstreams": [
                            "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.source_table,PROD),col_a)"
                        ],
                        "downstreamType": "FIELD",
                        "downstreams": [
                            "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.target_table,PROD),col_a)"
                        ],
                        "confidenceScore": 1.0,
                        "query": "urn:li:query:e0ac0a087b1c788497e6ae3ed1abf6538c66f80a0b1e6fad83219813d6c95b27"
                    },
                    {
                        "upstreamType": "FIELD_SET",
                        "upstreams": [
                            "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.source_table,PROD),col_b)"
                        ],
                        "downstreamType": "FIELD",
                        "downstreams": [
                            "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.target_table,PROD),col_b)"
                        ],
                        "confidenceScore": 1.0,
                        "query": "urn:li:query:e0ac0a087b1c788497e6ae3ed1abf6538c66f80a0b1e6fad83219813d6c95b27"
                    }
                ]
            }
        }
    }
]

tests/unit/sql_parsing/aggregator_goldens/test_empty_downstream_column_in_snowflake_lineage_golden.json

@@ -0,0 +1,27 @@
[
    {
        "entityType": "dataset",
        "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.target_table,PROD)",
        "changeType": "UPSERT",
        "aspectName": "upstreamLineage",
        "aspect": {
            "json": {
                "upstreams": [
                    {
                        "auditStamp": {
                            "time": 1707182625000,
                            "actor": "urn:li:corpuser:_ingestion"
                        },
                        "created": {
                            "time": 20000,
                            "actor": "urn:li:corpuser:_ingestion"
                        },
                        "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.source_table,PROD)",
                        "type": "TRANSFORMED",
                        "query": "urn:li:query:7bfef29f2b8442ffa6e240bfeb06b78bbf971b25dff80a9d2ba31cf5aef55e2e"
                    }
                ]
            }
        }
    }
]

tests/unit/sql_parsing/aggregator_goldens/test_partial_empty_downstream_column_in_snowflake_lineage_golden.json

@@ -0,0 +1,41 @@
[
    {
        "entityType": "dataset",
        "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.empty_downstream,PROD)",
        "changeType": "UPSERT",
        "aspectName": "upstreamLineage",
        "aspect": {
            "json": {
                "upstreams": [
                    {
                        "auditStamp": {
                            "time": 1707182625000,
                            "actor": "urn:li:corpuser:_ingestion"
                        },
                        "created": {
                            "time": 20000,
                            "actor": "urn:li:corpuser:_ingestion"
                        },
                        "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.empty_upstream,PROD)",
                        "type": "TRANSFORMED",
                        "query": "urn:li:query:9138bff85215113e471f5869fabf99797c37181899f5fb145ed05846437426d2"
                    }
                ],
                "fineGrainedLineages": [
                    {
                        "upstreamType": "FIELD_SET",
                        "upstreams": [
                            "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.empty_upstream,PROD),title)"
                        ],
                        "downstreamType": "FIELD",
                        "downstreams": [
                            "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.empty_downstream,PROD),TITLE_DOWNSTREAM)"
                        ],
                        "confidenceScore": 1.0,
                        "query": "urn:li:query:9138bff85215113e471f5869fabf99797c37181899f5fb145ed05846437426d2"
                    }
                ]
            }
        }
    }
]

tests/unit/sql_parsing/test_sql_parsing_aggregator.py

@@ -1132,3 +1132,168 @@ def test_diamond_problem(pytestconfig: pytest.Config, tmp_path: pathlib.Path) ->
        pytestconfig.rootpath
        / "tests/unit/sql_parsing/aggregator_goldens/test_diamond_problem_golden.json",
    )


@freeze_time(FROZEN_TIME)
def test_empty_column_in_snowflake_lineage(
    pytestconfig: pytest.Config, tmp_path: pathlib.Path
) -> None:
    """Test that column lineage with empty string column names doesn't cause errors.

    Note: Uses KnownQueryLineageInfo instead of ObservedQuery since empty column names from
    external systems would require mocking _run_sql_parser().
    """
    aggregator = SqlParsingAggregator(
        platform="snowflake",
        generate_lineage=True,
        generate_usage_statistics=False,
        generate_operations=False,
    )

    downstream_urn = DatasetUrn("snowflake", "dev.public.target_table").urn()
    upstream_urn = DatasetUrn("snowflake", "dev.public.source_table").urn()

    known_query_lineage = KnownQueryLineageInfo(
        query_text="insert into target_table (col_a, col_b, col_c) select col_a, col_b, col_c from source_table",
        downstream=downstream_urn,
        upstreams=[upstream_urn],
        column_lineage=[
            ColumnLineageInfo(
                downstream=DownstreamColumnRef(table=downstream_urn, column="col_a"),
                upstreams=[ColumnRef(table=upstream_urn, column="col_a")],
            ),
            ColumnLineageInfo(
                downstream=DownstreamColumnRef(table=downstream_urn, column="col_b"),
                upstreams=[
                    ColumnRef(table=upstream_urn, column="col_b"),
                    ColumnRef(table=upstream_urn, column=""),
                ],
            ),
            ColumnLineageInfo(
                downstream=DownstreamColumnRef(table=downstream_urn, column="col_c"),
                upstreams=[
                    ColumnRef(table=upstream_urn, column=""),
                ],
            ),
        ],
        timestamp=_ts(20),
        query_type=QueryType.INSERT,
    )

    aggregator.add_known_query_lineage(known_query_lineage)

    mcpws = [mcp for mcp in aggregator.gen_metadata()]
    lineage_mcpws = [mcpw for mcpw in mcpws if mcpw.aspectName == "upstreamLineage"]

    out_path = tmp_path / "mcpw.json"
    write_metadata_file(out_path, lineage_mcpws)

    mce_helpers.check_golden_file(
        pytestconfig,
        out_path,
        RESOURCE_DIR / "test_empty_column_in_snowflake_lineage_golden.json",
    )


@freeze_time(FROZEN_TIME)
def test_empty_downstream_column_in_snowflake_lineage(
    pytestconfig: pytest.Config, tmp_path: pathlib.Path
) -> None:
    """Test that column lineage with empty downstream column names doesn't cause errors.

    Note: Uses KnownQueryLineageInfo instead of ObservedQuery since empty column names from
    external systems would require mocking _run_sql_parser().
    """
    aggregator = SqlParsingAggregator(
        platform="snowflake",
        generate_lineage=True,
        generate_usage_statistics=False,
        generate_operations=False,
    )

    downstream_urn = DatasetUrn("snowflake", "dev.public.target_table").urn()
    upstream_urn = DatasetUrn("snowflake", "dev.public.source_table").urn()

    known_query_lineage = KnownQueryLineageInfo(
        query_text='create table target_table as select $1 as "", $2 as " " from source_table',
        downstream=downstream_urn,
        upstreams=[upstream_urn],
        column_lineage=[
            ColumnLineageInfo(
                downstream=DownstreamColumnRef(table=downstream_urn, column=""),
                upstreams=[ColumnRef(table=upstream_urn, column="col_a")],
            ),
            ColumnLineageInfo(
                downstream=DownstreamColumnRef(table=downstream_urn, column=" "),
                upstreams=[ColumnRef(table=upstream_urn, column="col_b")],
            ),
        ],
        timestamp=_ts(20),
        query_type=QueryType.CREATE_TABLE_AS_SELECT,
    )

    aggregator.add_known_query_lineage(known_query_lineage)

    mcpws = [mcp for mcp in aggregator.gen_metadata()]
    lineage_mcpws = [mcpw for mcpw in mcpws if mcpw.aspectName == "upstreamLineage"]

    out_path = tmp_path / "mcpw.json"
    write_metadata_file(out_path, lineage_mcpws)

    mce_helpers.check_golden_file(
        pytestconfig,
        out_path,
        RESOURCE_DIR / "test_empty_downstream_column_in_snowflake_lineage_golden.json",
    )


@freeze_time(FROZEN_TIME)
def test_partial_empty_downstream_column_in_snowflake_lineage(
    pytestconfig: pytest.Config, tmp_path: pathlib.Path
) -> None:
    """Test that column lineage with mix of empty and valid downstream columns works correctly.

    Note: Uses KnownQueryLineageInfo instead of ObservedQuery since empty column names from
    external systems would require mocking _run_sql_parser().
    """
    aggregator = SqlParsingAggregator(
        platform="snowflake",
        generate_lineage=True,
        generate_usage_statistics=False,
        generate_operations=False,
    )

    downstream_urn = DatasetUrn("snowflake", "dev.public.empty_downstream").urn()
    upstream_urn = DatasetUrn("snowflake", "dev.public.empty_upstream").urn()

    known_query_lineage = KnownQueryLineageInfo(
        query_text='create table empty_downstream as select $1 as "", $2 as "TITLE_DOWNSTREAM" from empty_upstream',
        downstream=downstream_urn,
        upstreams=[upstream_urn],
        column_lineage=[
            ColumnLineageInfo(
                downstream=DownstreamColumnRef(table=downstream_urn, column=""),
                upstreams=[ColumnRef(table=upstream_urn, column="name")],
            ),
            ColumnLineageInfo(
                downstream=DownstreamColumnRef(
                    table=downstream_urn, column="TITLE_DOWNSTREAM"
                ),
                upstreams=[ColumnRef(table=upstream_urn, column="title")],
            ),
        ],
        timestamp=_ts(20),
        query_type=QueryType.CREATE_TABLE_AS_SELECT,
    )

    aggregator.add_known_query_lineage(known_query_lineage)

    mcpws = [mcp for mcp in aggregator.gen_metadata()]
    lineage_mcpws = [mcpw for mcpw in mcpws if mcpw.aspectName == "upstreamLineage"]

    out_path = tmp_path / "mcpw.json"
    write_metadata_file(out_path, lineage_mcpws)

    mce_helpers.check_golden_file(
        pytestconfig,
        out_path,
        RESOURCE_DIR
        / "test_partial_empty_downstream_column_in_snowflake_lineage_golden.json",
    )
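The three new tests follow the same golden-file pattern as the existing aggregator tests. Assuming they were added to metadata-ingestion's tests/unit/sql_parsing/test_sql_parsing_aggregator.py (the file name is inferred from the golden paths above and is not shown in this view), they can be run with something like:

cd metadata-ingestion
pytest tests/unit/sql_parsing/test_sql_parsing_aggregator.py -k "snowflake_lineage" -v

The -k filter is just one way to select the three tests by name. If the aggregator's output changes intentionally, the goldens can typically be regenerated by re-running with the suite's --update-golden-files option, assuming that flag is wired up in this test module's conftest.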