diff --git a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py index 0a5ada28a4..a093ce4596 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py @@ -1340,11 +1340,25 @@ class SqlParsingAggregator(Closeable): upstreams.setdefault(upstream, query.query_id) for lineage_info in query.column_lineage: - for upstream_ref in lineage_info.upstreams: - cll[lineage_info.downstream.column].setdefault( - SchemaFieldUrn(upstream_ref.table, upstream_ref.column), - query.query_id, + if ( + not lineage_info.downstream.column + or not lineage_info.downstream.column.strip() + ): + logger.debug( + f"Skipping lineage entry with empty downstream column in query {query.query_id}" ) + continue + + for upstream_ref in lineage_info.upstreams: + if upstream_ref.column and upstream_ref.column.strip(): + cll[lineage_info.downstream.column].setdefault( + SchemaFieldUrn(upstream_ref.table, upstream_ref.column), + query.query_id, + ) + else: + logger.debug( + f"Skipping empty column reference in lineage for query {query.query_id}" + ) # Finally, we can build our lineage edge. required_queries = OrderedSet[QueryId]() diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_empty_column_in_snowflake_lineage_golden.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_empty_column_in_snowflake_lineage_golden.json new file mode 100644 index 0000000000..51acb74067 --- /dev/null +++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_empty_column_in_snowflake_lineage_golden.json @@ -0,0 +1,53 @@ +[ +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.target_table,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + }, + "created": { + "time": 20000, + "actor": "urn:li:corpuser:_ingestion" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.source_table,PROD)", + "type": "TRANSFORMED", + "query": "urn:li:query:e0ac0a087b1c788497e6ae3ed1abf6538c66f80a0b1e6fad83219813d6c95b27" + } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.source_table,PROD),col_a)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.target_table,PROD),col_a)" + ], + "confidenceScore": 1.0, + "query": "urn:li:query:e0ac0a087b1c788497e6ae3ed1abf6538c66f80a0b1e6fad83219813d6c95b27" + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.source_table,PROD),col_b)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.target_table,PROD),col_b)" + ], + "confidenceScore": 1.0, + "query": "urn:li:query:e0ac0a087b1c788497e6ae3ed1abf6538c66f80a0b1e6fad83219813d6c95b27" + } + ] + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_empty_downstream_column_in_snowflake_lineage_golden.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_empty_downstream_column_in_snowflake_lineage_golden.json new file mode 100644 index 0000000000..5bab6d0ea8 --- /dev/null +++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_empty_downstream_column_in_snowflake_lineage_golden.json @@ -0,0 +1,27 @@ +[ +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.target_table,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + }, + "created": { + "time": 20000, + "actor": "urn:li:corpuser:_ingestion" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.source_table,PROD)", + "type": "TRANSFORMED", + "query": "urn:li:query:7bfef29f2b8442ffa6e240bfeb06b78bbf971b25dff80a9d2ba31cf5aef55e2e" + } + ] + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_partial_empty_downstream_column_in_snowflake_lineage_golden.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_partial_empty_downstream_column_in_snowflake_lineage_golden.json new file mode 100644 index 0000000000..8e16ce654b --- /dev/null +++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_partial_empty_downstream_column_in_snowflake_lineage_golden.json @@ -0,0 +1,41 @@ +[ +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.empty_downstream,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + }, + "created": { + "time": 20000, + "actor": "urn:li:corpuser:_ingestion" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.empty_upstream,PROD)", + "type": "TRANSFORMED", + "query": "urn:li:query:9138bff85215113e471f5869fabf99797c37181899f5fb145ed05846437426d2" + } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.empty_upstream,PROD),title)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.empty_downstream,PROD),TITLE_DOWNSTREAM)" + ], + "confidenceScore": 1.0, + "query": "urn:li:query:9138bff85215113e471f5869fabf99797c37181899f5fb145ed05846437426d2" + } + ] + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py b/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py index 9882f6aec4..98ed52c6c3 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py +++ b/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py @@ -1132,3 +1132,168 @@ def test_diamond_problem(pytestconfig: pytest.Config, tmp_path: pathlib.Path) -> pytestconfig.rootpath / "tests/unit/sql_parsing/aggregator_goldens/test_diamond_problem_golden.json", ) + + +@freeze_time(FROZEN_TIME) +def test_empty_column_in_snowflake_lineage( + pytestconfig: pytest.Config, tmp_path: pathlib.Path +) -> None: + """Test that column lineage with empty string column names doesn't cause errors. + + Note: Uses KnownQueryLineageInfo instead of ObservedQuery since empty column names from + external systems would require mocking _run_sql_parser(). + """ + aggregator = SqlParsingAggregator( + platform="snowflake", + generate_lineage=True, + generate_usage_statistics=False, + generate_operations=False, + ) + + downstream_urn = DatasetUrn("snowflake", "dev.public.target_table").urn() + upstream_urn = DatasetUrn("snowflake", "dev.public.source_table").urn() + + known_query_lineage = KnownQueryLineageInfo( + query_text="insert into target_table (col_a, col_b, col_c) select col_a, col_b, col_c from source_table", + downstream=downstream_urn, + upstreams=[upstream_urn], + column_lineage=[ + ColumnLineageInfo( + downstream=DownstreamColumnRef(table=downstream_urn, column="col_a"), + upstreams=[ColumnRef(table=upstream_urn, column="col_a")], + ), + ColumnLineageInfo( + downstream=DownstreamColumnRef(table=downstream_urn, column="col_b"), + upstreams=[ + ColumnRef(table=upstream_urn, column="col_b"), + ColumnRef(table=upstream_urn, column=""), + ], + ), + ColumnLineageInfo( + downstream=DownstreamColumnRef(table=downstream_urn, column="col_c"), + upstreams=[ + ColumnRef(table=upstream_urn, column=""), + ], + ), + ], + timestamp=_ts(20), + query_type=QueryType.INSERT, + ) + + aggregator.add_known_query_lineage(known_query_lineage) + + mcpws = [mcp for mcp in aggregator.gen_metadata()] + lineage_mcpws = [mcpw for mcpw in mcpws if mcpw.aspectName == "upstreamLineage"] + out_path = tmp_path / "mcpw.json" + write_metadata_file(out_path, lineage_mcpws) + + mce_helpers.check_golden_file( + pytestconfig, + out_path, + RESOURCE_DIR / "test_empty_column_in_snowflake_lineage_golden.json", + ) + + +@freeze_time(FROZEN_TIME) +def test_empty_downstream_column_in_snowflake_lineage( + pytestconfig: pytest.Config, tmp_path: pathlib.Path +) -> None: + """Test that column lineage with empty downstream column names doesn't cause errors. + + Note: Uses KnownQueryLineageInfo instead of ObservedQuery since empty column names from + external systems would require mocking _run_sql_parser(). + """ + aggregator = SqlParsingAggregator( + platform="snowflake", + generate_lineage=True, + generate_usage_statistics=False, + generate_operations=False, + ) + + downstream_urn = DatasetUrn("snowflake", "dev.public.target_table").urn() + upstream_urn = DatasetUrn("snowflake", "dev.public.source_table").urn() + + known_query_lineage = KnownQueryLineageInfo( + query_text='create table target_table as select $1 as "", $2 as " " from source_table', + downstream=downstream_urn, + upstreams=[upstream_urn], + column_lineage=[ + ColumnLineageInfo( + downstream=DownstreamColumnRef(table=downstream_urn, column=""), + upstreams=[ColumnRef(table=upstream_urn, column="col_a")], + ), + ColumnLineageInfo( + downstream=DownstreamColumnRef(table=downstream_urn, column=" "), + upstreams=[ColumnRef(table=upstream_urn, column="col_b")], + ), + ], + timestamp=_ts(20), + query_type=QueryType.CREATE_TABLE_AS_SELECT, + ) + + aggregator.add_known_query_lineage(known_query_lineage) + + mcpws = [mcp for mcp in aggregator.gen_metadata()] + lineage_mcpws = [mcpw for mcpw in mcpws if mcpw.aspectName == "upstreamLineage"] + out_path = tmp_path / "mcpw.json" + write_metadata_file(out_path, lineage_mcpws) + + mce_helpers.check_golden_file( + pytestconfig, + out_path, + RESOURCE_DIR / "test_empty_downstream_column_in_snowflake_lineage_golden.json", + ) + + +@freeze_time(FROZEN_TIME) +def test_partial_empty_downstream_column_in_snowflake_lineage( + pytestconfig: pytest.Config, tmp_path: pathlib.Path +) -> None: + """Test that column lineage with mix of empty and valid downstream columns works correctly. + + Note: Uses KnownQueryLineageInfo instead of ObservedQuery since empty column names from + external systems would require mocking _run_sql_parser(). + """ + aggregator = SqlParsingAggregator( + platform="snowflake", + generate_lineage=True, + generate_usage_statistics=False, + generate_operations=False, + ) + + downstream_urn = DatasetUrn("snowflake", "dev.public.empty_downstream").urn() + upstream_urn = DatasetUrn("snowflake", "dev.public.empty_upstream").urn() + + known_query_lineage = KnownQueryLineageInfo( + query_text='create table empty_downstream as select $1 as "", $2 as "TITLE_DOWNSTREAM" from empty_upstream', + downstream=downstream_urn, + upstreams=[upstream_urn], + column_lineage=[ + ColumnLineageInfo( + downstream=DownstreamColumnRef(table=downstream_urn, column=""), + upstreams=[ColumnRef(table=upstream_urn, column="name")], + ), + ColumnLineageInfo( + downstream=DownstreamColumnRef( + table=downstream_urn, column="TITLE_DOWNSTREAM" + ), + upstreams=[ColumnRef(table=upstream_urn, column="title")], + ), + ], + timestamp=_ts(20), + query_type=QueryType.CREATE_TABLE_AS_SELECT, + ) + + aggregator.add_known_query_lineage(known_query_lineage) + + mcpws = [mcp for mcp in aggregator.gen_metadata()] + lineage_mcpws = [mcpw for mcpw in mcpws if mcpw.aspectName == "upstreamLineage"] + out_path = tmp_path / "mcpw.json" + write_metadata_file(out_path, lineage_mcpws) + + mce_helpers.check_golden_file( + pytestconfig, + out_path, + RESOURCE_DIR + / "test_partial_empty_downstream_column_in_snowflake_lineage_golden.json", + )