fix(sql-parser): prevent bad urns from alter table lineage (#11092)

This commit is contained in:
Harshal Sheth 2024-08-08 14:05:55 -07:00 committed by GitHub
parent 3d9a9541f1
commit 840b15083a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 231 additions and 113 deletions

View File

@ -350,8 +350,8 @@
"json": {
"timestampMillis": 1717179743558,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "STARTED",
"attempt": 1
@ -367,8 +367,8 @@
"json": {
"timestampMillis": 1717179743932,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"actor": "urn:li:corpuser:airflow",
"operationType": "CREATE",
@ -552,8 +552,8 @@
"json": {
"timestampMillis": 1717179743960,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "COMPLETE",
"result": {
@ -742,8 +742,8 @@
"json": {
"timestampMillis": 1717179748679,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "STARTED",
"attempt": 1
@ -759,8 +759,8 @@
"json": {
"timestampMillis": 1717179749258,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"actor": "urn:li:corpuser:airflow",
"operationType": "CREATE",
@ -875,8 +875,8 @@
"json": {
"timestampMillis": 1717179749324,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "COMPLETE",
"result": {
@ -1161,8 +1161,8 @@
"json": {
"timestampMillis": 1717179757397,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "STARTED",
"attempt": 1
@ -1178,8 +1178,8 @@
"json": {
"timestampMillis": 1717179758424,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"actor": "urn:li:corpuser:airflow",
"operationType": "CREATE",
@ -1420,8 +1420,8 @@
"json": {
"timestampMillis": 1717179758496,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "COMPLETE",
"result": {
@ -1483,10 +1483,10 @@
"aspectName": "dataJobInputOutput",
"aspect": {
"json": {
"inputDatasets": [
"inputDatasets": [],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)"
],
"outputDatasets": [],
"inputDatajobs": [
"urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),transform_cost_table)"
],
@ -1555,6 +1555,19 @@
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:07285de22276959612189d51336cc21a",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceOutput",
"aspect": {
"json": {
"outputs": [
"urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)"
]
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)",
@ -1640,19 +1653,6 @@
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:07285de22276959612189d51336cc21a",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceInput",
"aspect": {
"json": {
"inputs": [
"urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)"
]
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:07285de22276959612189d51336cc21a",
@ -1662,8 +1662,8 @@
"json": {
"timestampMillis": 1718733767964,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "STARTED",
"attempt": 1
@ -1679,8 +1679,8 @@
"json": {
"timestampMillis": 1718733768638,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "COMPLETE",
"result": {
@ -1697,10 +1697,10 @@
"aspectName": "dataJobInputOutput",
"aspect": {
"json": {
"inputDatasets": [
"inputDatasets": [],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)"
],
"outputDatasets": [],
"inputDatajobs": [
"urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),transform_cost_table)"
],
@ -1809,19 +1809,6 @@
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:bab908abccf3cd6607b50fdaf3003372",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceInput",
"aspect": {
"json": {
"inputs": [
"urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)"
]
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:bab908abccf3cd6607b50fdaf3003372",
@ -1843,8 +1830,8 @@
"json": {
"timestampMillis": 1718733773354,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "STARTED",
"attempt": 1
@ -1860,8 +1847,8 @@
"json": {
"timestampMillis": 1718733774147,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "COMPLETE",
"result": {
@ -1870,5 +1857,18 @@
}
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:bab908abccf3cd6607b50fdaf3003372",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceOutput",
"aspect": {
"json": {
"outputs": [
"urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)"
]
}
}
}
]

View File

@ -336,8 +336,8 @@
"json": {
"timestampMillis": 1717180072004,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "STARTED",
"attempt": 1
@ -382,8 +382,8 @@
"json": {
"timestampMillis": 1719864194882,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"actor": "urn:li:corpuser:airflow",
"operationType": "CREATE",
@ -435,8 +435,8 @@
"json": {
"timestampMillis": 1717180072275,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "COMPLETE",
"result": {
@ -641,8 +641,8 @@
"json": {
"timestampMillis": 1717180078196,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "STARTED",
"attempt": 1
@ -722,8 +722,8 @@
"json": {
"timestampMillis": 1717180078619,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "COMPLETE",
"result": {
@ -1000,8 +1000,8 @@
"json": {
"timestampMillis": 1717180084642,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "STARTED",
"attempt": 1
@ -1081,8 +1081,8 @@
"json": {
"timestampMillis": 1717180085266,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "COMPLETE",
"result": {
@ -1186,10 +1186,10 @@
"aspectName": "dataJobInputOutput",
"aspect": {
"json": {
"inputDatasets": [
"inputDatasets": [],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)"
],
"outputDatasets": [],
"inputDatajobs": [
"urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),transform_cost_table)"
],
@ -1287,8 +1287,8 @@
"json": {
"timestampMillis": 1717180091148,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "STARTED",
"attempt": 1
@ -1368,8 +1368,8 @@
"json": {
"timestampMillis": 1717180091923,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "COMPLETE",
"result": {
@ -1499,10 +1499,10 @@
"aspectName": "dataJobInputOutput",
"aspect": {
"json": {
"inputDatasets": [
"inputDatasets": [],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)"
],
"outputDatasets": [],
"inputDatajobs": [
"urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),transform_cost_table)"
],
@ -1613,8 +1613,8 @@
"json": {
"timestampMillis": 1717180096108,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "STARTED",
"attempt": 1
@ -1630,8 +1630,8 @@
"json": {
"timestampMillis": 1719864203487,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"actor": "urn:li:corpuser:airflow",
"operationType": "CREATE",
@ -1712,8 +1712,8 @@
"json": {
"timestampMillis": 1717180096993,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "COMPLETE",
"result": {
@ -1727,10 +1727,10 @@
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:bab908abccf3cd6607b50fdaf3003372",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceInput",
"aspectName": "dataProcessInstanceOutput",
"aspect": {
"json": {
"inputs": [
"outputs": [
"urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)"
]
}
@ -1740,10 +1740,10 @@
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:07285de22276959612189d51336cc21a",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceInput",
"aspectName": "dataProcessInstanceOutput",
"aspect": {
"json": {
"inputs": [
"outputs": [
"urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)"
]
}

View File

@ -189,35 +189,49 @@ def _table_level_lineage(
statement: sqlglot.Expression, dialect: sqlglot.Dialect
) -> Tuple[Set[_TableName], Set[_TableName]]:
# Generate table-level lineage.
modified = {
_TableName.from_sqlglot_table(expr.this)
for expr in statement.find_all(
sqlglot.exp.Create,
sqlglot.exp.Insert,
sqlglot.exp.Update,
sqlglot.exp.Delete,
sqlglot.exp.Merge,
)
# In some cases like "MERGE ... then INSERT (col1, col2) VALUES (col1, col2)",
# the `this` on the INSERT part isn't a table.
if isinstance(expr.this, sqlglot.exp.Table)
} | {
# For statements that include a column list, like
# CREATE DDL statements and `INSERT INTO table (col1, col2) SELECT ...`
# the table name is nested inside a Schema object.
_TableName.from_sqlglot_table(expr.this.this)
for expr in statement.find_all(
sqlglot.exp.Create,
sqlglot.exp.Insert,
)
if isinstance(expr.this, sqlglot.exp.Schema)
and isinstance(expr.this.this, sqlglot.exp.Table)
}
modified = (
{
_TableName.from_sqlglot_table(expr.this)
for expr in statement.find_all(
sqlglot.exp.Create,
sqlglot.exp.Insert,
sqlglot.exp.Update,
sqlglot.exp.Delete,
sqlglot.exp.Merge,
sqlglot.exp.AlterTable,
)
# In some cases like "MERGE ... then INSERT (col1, col2) VALUES (col1, col2)",
# the `this` on the INSERT part isn't a table.
if isinstance(expr.this, sqlglot.exp.Table)
}
| {
# For statements that include a column list, like
# CREATE DDL statements and `INSERT INTO table (col1, col2) SELECT ...`
# the table name is nested inside a Schema object.
_TableName.from_sqlglot_table(expr.this.this)
for expr in statement.find_all(
sqlglot.exp.Create,
sqlglot.exp.Insert,
)
if isinstance(expr.this, sqlglot.exp.Schema)
and isinstance(expr.this.this, sqlglot.exp.Table)
}
| {
# For drop statements, we only want it if a table/view is being dropped.
# Other "kinds" will not have table.name populated.
_TableName.from_sqlglot_table(expr.this)
for expr in ([statement] if isinstance(statement, sqlglot.exp.Drop) else [])
if isinstance(expr.this, sqlglot.exp.Table)
and expr.this.this
and expr.this.name
}
)
tables = (
{
_TableName.from_sqlglot_table(table)
for table in statement.find_all(sqlglot.exp.Table)
if not isinstance(table.parent, sqlglot.exp.Drop)
}
# ignore references created in this query
- modified

View File

@ -15,7 +15,6 @@ from datahub.sql_parsing.sqlglot_lineage import (
logger = logging.getLogger(__name__)
# TODO: Hook this into the standard --update-golden-files mechanism.
UPDATE_FILES = os.environ.get("UPDATE_SQLPARSER_FILES", "false").lower() == "true"

View File

@ -0,0 +1,14 @@
{
"query_type": "UNKNOWN",
"query_type_props": {},
"query_fingerprint": "7d04253c3add0194c557942ef9b7485f38e68762d300dad364b9cec8656035b3",
"in_tables": [],
"out_tables": [
"urn:li:dataset:(urn:li:dataPlatform:bigquery,my-bq-project.covid_data.covid_deaths,PROD)"
],
"column_lineage": null,
"debug_info": {
"confidence": 0.2,
"generalized_statement": "ALTER TABLE `my-bq-project.covid_data.covid_deaths` DROP COLUMN patient_name"
}
}

View File

@ -0,0 +1,12 @@
{
"query_type": "UNKNOWN",
"query_type_props": {},
"query_fingerprint": "4eefab57619a812a94030acce0071857561265945e79d798563adb53bd0b9646",
"in_tables": [],
"out_tables": [],
"column_lineage": null,
"debug_info": {
"confidence": 0.9,
"generalized_statement": "DROP SCHEMA my_schema"
}
}

View File

@ -0,0 +1,14 @@
{
"query_type": "UNKNOWN",
"query_type_props": {},
"query_fingerprint": "d1c29ad73325b08bb66e62ec00ba1d5be4412ec72b4bbc9c094f1272b9da4f86",
"in_tables": [],
"out_tables": [
"urn:li:dataset:(urn:li:dataPlatform:sqlite,my_schema.my_table,PROD)"
],
"column_lineage": null,
"debug_info": {
"confidence": 0.2,
"generalized_statement": "DROP TABLE my_schema.my_table"
}
}

View File

@ -0,0 +1,14 @@
{
"query_type": "UNKNOWN",
"query_type_props": {},
"query_fingerprint": "35a3c60e7ed98884dde3f1f5fe9079f844832430589a3326b97d617b8303f191",
"in_tables": [],
"out_tables": [
"urn:li:dataset:(urn:li:dataPlatform:sqlite,my_schema.my_view,PROD)"
],
"column_lineage": null,
"debug_info": {
"confidence": 0.2,
"generalized_statement": "DROP VIEW my_schema.my_view"
}
}

View File

@ -2,11 +2,22 @@ import pathlib
import pytest
import datahub.testing.check_sql_parser_result as checker
from datahub.testing.check_sql_parser_result import assert_sql_result
RESOURCE_DIR = pathlib.Path(__file__).parent / "goldens"
@pytest.fixture(autouse=True)
def set_update_sql_parser(
pytestconfig: pytest.Config, monkeypatch: pytest.MonkeyPatch
) -> None:
update_golden = pytestconfig.getoption("--update-golden-files")
if update_golden:
monkeypatch.setattr(checker, "UPDATE_FILES", True)
def test_invalid_sql():
assert_sql_result(
"""
@ -1202,3 +1213,43 @@ ORDER BY
dialect="bigquery",
expected_file=RESOURCE_DIR / "test_bigquery_information_schema_query.json",
)
def test_bigquery_alter_table_column() -> None:
assert_sql_result(
"""\
ALTER TABLE `my-bq-project.covid_data.covid_deaths` drop COLUMN patient_name
""",
dialect="bigquery",
expected_file=RESOURCE_DIR / "test_bigquery_alter_table_column.json",
)
def test_sqlite_drop_table() -> None:
assert_sql_result(
"""\
DROP TABLE my_schema.my_table
""",
dialect="sqlite",
expected_file=RESOURCE_DIR / "test_sqlite_drop_table.json",
)
def test_sqlite_drop_view() -> None:
assert_sql_result(
"""\
DROP VIEW my_schema.my_view
""",
dialect="sqlite",
expected_file=RESOURCE_DIR / "test_sqlite_drop_view.json",
)
def test_snowflake_drop_schema() -> None:
assert_sql_result(
"""\
DROP SCHEMA my_schema
""",
dialect="snowflake",
expected_file=RESOURCE_DIR / "test_snowflake_drop_schema.json",
)