mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-07 17:23:11 +00:00
49 lines
1.5 KiB
Python
49 lines
1.5 KiB
Python
from datahub.sql_parsing._sqlglot_patch import SQLGLOT_PATCHED
|
|
|
|
import time
|
|
|
|
import pytest
|
|
import sqlglot
|
|
import sqlglot.errors
|
|
import sqlglot.lineage
|
|
import sqlglot.optimizer
|
|
|
|
from datahub.utilities.cooperative_timeout import (
|
|
CooperativeTimeoutError,
|
|
cooperative_timeout,
|
|
)
|
|
from datahub.utilities.perf_timer import PerfTimer
|
|
|
|
assert SQLGLOT_PATCHED
|
|
|
|
|
|
def test_cooperative_timeout_sql() -> None:
    """Verify that repeated SQL generation is aborted by ``cooperative_timeout``.

    A parsed statement is rendered in an endless loop under a 0.6 second
    cooperative timeout; the loop can only end by ``CooperativeTimeoutError``
    being raised. Afterwards the time reported by the ``PerfTimer`` must lie
    between 0.6 and 1.0 seconds.
    """
    parsed = sqlglot.parse_one("SELECT pg_sleep(3)", dialect="postgres")

    # Context managers are entered in the same order as a single combined
    # ``with`` statement: raises-check outermost, then timer, then timeout.
    with pytest.raises(CooperativeTimeoutError):
        with PerfTimer() as timer:
            with cooperative_timeout(timeout=0.6):
                while True:
                    # sql() implicitly calls copy(), which is where we check for the timeout.
                    rendered = parsed.sql()
                    assert rendered is not None
                    time.sleep(0.0001)

    elapsed = timer.elapsed_seconds()
    assert 0.6 <= elapsed
    assert elapsed <= 1.0
|
|
|
|
|
|
def test_scope_circular_dependency() -> None:
    """Verify that traversing a self-referencing scope raises ``OptimizeError``.

    A scope tree is built for a simple CTE query, then the CTE's scope is
    appended to its own ``cte_scopes`` list to create a cycle. Exhausting
    ``traverse()`` on the root scope is expected to fail with a message
    matching "circular scope dependency".
    """
    query = sqlglot.parse_one("WITH w AS (SELECT * FROM q) SELECT * FROM w")
    root_scope = sqlglot.optimizer.build_scope(query)
    assert root_scope is not None

    # Make the first CTE scope list itself as one of its own CTE scopes.
    first_cte = root_scope.cte_scopes[0]
    first_cte.cte_scopes.append(first_cte)

    expected_error = pytest.raises(
        sqlglot.errors.OptimizeError, match="circular scope dependency"
    )
    with expected_error:
        # Materialize the traversal so the generator is fully consumed.
        list(root_scope.traverse())
|
|
|
|
|
|
def test_lineage_node_subfield() -> None:
    """Verify that a lineage ``Node`` accepts and exposes a ``subfield`` value.

    NOTE(review): ``subfield`` is presumably added by the datahub sqlglot
    patch (hence the ``type: ignore`` markers) -- confirm against the patch.
    """
    parsed = sqlglot.parse_one("SELECT 1 AS test")
    # The same expression is passed for both the second and third positional arguments.
    lineage_node = sqlglot.lineage.Node("test", parsed, parsed, subfield="subfield")  # type: ignore
    assert lineage_node.subfield == "subfield"  # type: ignore
|