feat(ingestion/sqlglot): preserve CTEs when extracting SELECT from INSERT statements and add corresponding unit test (#14898)

This commit is contained in:
Anush Kumar 2025-10-01 04:52:35 -07:00 committed by GitHub
parent 8248999758
commit 00caa38adf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 93 additions and 1 deletions

View File

@ -1176,7 +1176,12 @@ def _try_extract_select(
statement = sqlglot.exp.Select().select("*").from_(statement)
elif isinstance(statement, sqlglot.exp.Insert):
# TODO Need to map column renames in the expressions part of the statement.
statement = statement.expression
# Preserve CTEs when extracting the SELECT expression from INSERT
original_ctes = statement.ctes
statement = statement.expression # Get the SELECT expression from the INSERT
if isinstance(statement, sqlglot.exp.Query) and original_ctes:
for cte in original_ctes:
statement = statement.with_(alias=cte.alias, as_=cte.this)
elif isinstance(statement, sqlglot.exp.Update):
# Assumption: the output table is already captured in the modified tables list.
statement = _extract_select_from_update(statement)

View File

@ -0,0 +1,72 @@
{
"query_type": "INSERT",
"query_type_props": {},
"query_fingerprint": "195448498ded7a1b4df767cf0a5ec53e2fa4c7b011234bafe0a60ff9d7d11c1d",
"in_tables": [
"urn:li:dataset:(urn:li:dataPlatform:tsql,db.schema.source_table,PROD)"
],
"out_tables": [
"urn:li:dataset:(urn:li:dataPlatform:tsql,db.schema.target_table,PROD)"
],
"column_lineage": [
{
"downstream": {
"table": "urn:li:dataset:(urn:li:dataPlatform:tsql,db.schema.target_table,PROD)",
"column": "id",
"column_type": null,
"native_column_type": null
},
"upstreams": [
{
"table": "urn:li:dataset:(urn:li:dataPlatform:tsql,db.schema.source_table,PROD)",
"column": "id"
}
],
"logic": {
"is_direct_copy": true,
"column_logic": "[source_table].[id] AS [id]"
}
},
{
"downstream": {
"table": "urn:li:dataset:(urn:li:dataPlatform:tsql,db.schema.target_table,PROD)",
"column": "name",
"column_type": null,
"native_column_type": null
},
"upstreams": [
{
"table": "urn:li:dataset:(urn:li:dataPlatform:tsql,db.schema.source_table,PROD)",
"column": "name"
}
],
"logic": {
"is_direct_copy": true,
"column_logic": "[source_table].[name] AS [name]"
}
},
{
"downstream": {
"table": "urn:li:dataset:(urn:li:dataPlatform:tsql,db.schema.target_table,PROD)",
"column": "value",
"column_type": null,
"native_column_type": null
},
"upstreams": [
{
"table": "urn:li:dataset:(urn:li:dataPlatform:tsql,db.schema.source_table,PROD)",
"column": "value"
}
],
"logic": {
"is_direct_copy": true,
"column_logic": "[source_table].[value] AS [value]"
}
}
],
"joins": [],
"debug_info": {
"confidence": 0.2,
"generalized_statement": "WITH temp_cte AS (SELECT id AS id, name AS name, value AS value FROM db.schema.source_table) INSERT INTO db.schema.target_table (id, name, value) SELECT id, name, value FROM temp_cte"
}
}

View File

@ -199,6 +199,21 @@ insert into downstream (a, c) select a, c from upstream2
)
def test_insert_with_cte() -> None:
    """Run the CTE-prefixed INSERT ... SELECT statement through assert_sql_result.

    The statement is checked with the ``tsql`` dialect against the
    ``test_insert_with_cte.json`` file under ``RESOURCE_DIR``.
    """
    # WITH clause precedes the INSERT; the INSERT's SELECT reads from the CTE.
    cte_insert_sql = """
WITH temp_cte AS (
SELECT id, name, value
FROM db.schema.source_table
)
INSERT INTO db.schema.target_table (id, name, value)
SELECT id, name, value FROM temp_cte
"""
    expected_path = RESOURCE_DIR / "test_insert_with_cte.json"

    assert_sql_result(
        cte_insert_sql,
        dialect="tsql",
        expected_file=expected_path,
    )
def test_select_with_full_col_name() -> None:
# In this case, `widget` is a struct column.
# This also tests the `default_db` functionality.