Sergio Gómez Villamor 7a27e8e7fd
fix(ingest/sql-queries): handle string timestamps and empty table names (#14205)
Co-authored-by: Claude <noreply@anthropic.com>
2025-07-25 22:33:49 +02:00

429 lines
17 KiB
Python

from datetime import datetime, timezone
import pytest
from datahub.ingestion.source.sql_queries import QueryEntry, SqlQueriesSourceConfig
from datahub.metadata.urns import CorpUserUrn, DatasetUrn
class TestQueryEntry:
@pytest.mark.parametrize(
"entry_dict,entry_config,expected_query_entry,should_raise",
[
# Timestamp format variations
pytest.param(
{
"query": "SELECT * FROM table",
"timestamp": 1609459200,
"user": "test_user",
"upstream_tables": ["table1"],
},
SqlQueriesSourceConfig(platform="athena", query_file="dummy.json"),
QueryEntry(
query="SELECT * FROM table",
timestamp=datetime(2021, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
user=CorpUserUrn("test_user"),
operation_type=None,
downstream_tables=[],
upstream_tables=[
DatasetUrn.from_string(
"urn:li:dataset:(urn:li:dataPlatform:athena,table1,PROD)"
)
],
session_id=None,
),
False,
id="numeric_unix_timestamp",
),
pytest.param(
{
"query": "SELECT * FROM table",
"timestamp": 1609459200.5,
"user": "test_user",
"upstream_tables": ["table1"],
},
SqlQueriesSourceConfig(platform="athena", query_file="dummy.json"),
QueryEntry(
query="SELECT * FROM table",
timestamp=datetime(
2021, 1, 1, 0, 0, 0, 500000, tzinfo=timezone.utc
),
user=CorpUserUrn("test_user"),
operation_type=None,
downstream_tables=[],
upstream_tables=[
DatasetUrn.from_string(
"urn:li:dataset:(urn:li:dataPlatform:athena,table1,PROD)"
)
],
session_id=None,
),
False,
id="float_unix_timestamp",
),
pytest.param(
{
"query": "SELECT * FROM table",
"timestamp": "1609459200",
"user": "test_user",
"upstream_tables": ["table1"],
},
SqlQueriesSourceConfig(platform="athena", query_file="dummy.json"),
QueryEntry(
query="SELECT * FROM table",
timestamp=datetime(2021, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
user=CorpUserUrn("test_user"),
operation_type=None,
downstream_tables=[],
upstream_tables=[
DatasetUrn.from_string(
"urn:li:dataset:(urn:li:dataPlatform:athena,table1,PROD)"
)
],
session_id=None,
),
False,
id="string_unix_timestamp",
),
pytest.param(
{
"query": "SELECT * FROM table",
"timestamp": "2021-01-01T00:00:00Z",
"user": "test_user",
"upstream_tables": ["table1"],
},
SqlQueriesSourceConfig(platform="athena", query_file="dummy.json"),
QueryEntry(
query="SELECT * FROM table",
timestamp=datetime(2021, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
user=CorpUserUrn("test_user"),
operation_type=None,
downstream_tables=[],
upstream_tables=[
DatasetUrn.from_string(
"urn:li:dataset:(urn:li:dataPlatform:athena,table1,PROD)"
)
],
session_id=None,
),
False,
id="iso_format_with_z",
),
pytest.param(
{
"query": "SELECT * FROM table",
"timestamp": "2025-07-19 15:08:07.000000",
"user": "test_user",
"upstream_tables": ["table1"],
},
SqlQueriesSourceConfig(platform="athena", query_file="dummy.json"),
QueryEntry(
query="SELECT * FROM table",
timestamp=datetime(2025, 7, 19, 15, 8, 7, tzinfo=timezone.utc),
user=CorpUserUrn("test_user"),
operation_type=None,
downstream_tables=[],
upstream_tables=[
DatasetUrn.from_string(
"urn:li:dataset:(urn:li:dataPlatform:athena,table1,PROD)"
)
],
session_id=None,
),
False,
id="datetime_string_with_microseconds",
),
pytest.param(
{
"query": "SELECT * FROM table",
"timestamp": "2025-07-19 15:08:07",
"user": "test_user",
"upstream_tables": ["table1"],
},
SqlQueriesSourceConfig(platform="athena", query_file="dummy.json"),
QueryEntry(
query="SELECT * FROM table",
timestamp=datetime(2025, 7, 19, 15, 8, 7, tzinfo=timezone.utc),
user=CorpUserUrn("test_user"),
operation_type=None,
downstream_tables=[],
upstream_tables=[
DatasetUrn.from_string(
"urn:li:dataset:(urn:li:dataPlatform:athena,table1,PROD)"
)
],
session_id=None,
),
False,
id="datetime_string_without_microseconds",
),
# Table filtering - empty strings removed
pytest.param(
{
"query": "SELECT * FROM table",
"timestamp": 1609459200,
"user": "test_user",
"upstream_tables": ["table1", "", "table2"],
"downstream_tables": ["output_table"],
},
SqlQueriesSourceConfig(platform="athena", query_file="dummy.json"),
QueryEntry(
query="SELECT * FROM table",
timestamp=datetime(2021, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
user=CorpUserUrn("test_user"),
operation_type=None,
downstream_tables=[
DatasetUrn.from_string(
"urn:li:dataset:(urn:li:dataPlatform:athena,output_table,PROD)"
)
],
upstream_tables=[
DatasetUrn.from_string(
"urn:li:dataset:(urn:li:dataPlatform:athena,table1,PROD)"
),
DatasetUrn.from_string(
"urn:li:dataset:(urn:li:dataPlatform:athena,table2,PROD)"
),
],
session_id=None,
),
False,
id="filter_empty_upstream_tables",
),
pytest.param(
{
"query": "SELECT * FROM table",
"timestamp": 1609459200,
"user": "test_user",
"upstream_tables": [""],
"downstream_tables": [""],
},
SqlQueriesSourceConfig(platform="athena", query_file="dummy.json"),
QueryEntry(
query="SELECT * FROM table",
timestamp=datetime(2021, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
user=CorpUserUrn("test_user"),
operation_type=None,
downstream_tables=[],
upstream_tables=[],
session_id=None,
),
False,
id="all_empty_tables",
),
pytest.param(
{
"query": "SELECT * FROM table",
"timestamp": 1609459200,
"user": "test_user",
"upstream_tables": [" ", "\t"],
"downstream_tables": [" "],
},
SqlQueriesSourceConfig(platform="athena", query_file="dummy.json"),
QueryEntry(
query="SELECT * FROM table",
timestamp=datetime(2021, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
user=CorpUserUrn("test_user"),
operation_type=None,
downstream_tables=[],
upstream_tables=[],
session_id=None,
),
False,
id="whitespace_only_tables",
),
# Missing fields
pytest.param(
{
"query": "SELECT * FROM table",
"user": "test_user",
"upstream_tables": ["table1"],
},
SqlQueriesSourceConfig(platform="athena", query_file="dummy.json"),
QueryEntry(
query="SELECT * FROM table",
timestamp=None,
user=CorpUserUrn("test_user"),
operation_type=None,
downstream_tables=[],
upstream_tables=[
DatasetUrn.from_string(
"urn:li:dataset:(urn:li:dataPlatform:athena,table1,PROD)"
)
],
session_id=None,
),
False,
id="no_timestamp",
),
pytest.param(
{
"query": "SELECT * FROM table",
"timestamp": 1609459200,
"upstream_tables": ["table1"],
},
SqlQueriesSourceConfig(platform="athena", query_file="dummy.json"),
QueryEntry(
query="SELECT * FROM table",
timestamp=datetime(2021, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
user=None,
operation_type=None,
downstream_tables=[],
upstream_tables=[
DatasetUrn.from_string(
"urn:li:dataset:(urn:li:dataPlatform:athena,table1,PROD)"
)
],
session_id=None,
),
False,
id="no_user",
),
# Both upstream and downstream tables
pytest.param(
{
"query": "CREATE TABLE out AS SELECT * FROM table",
"timestamp": 1609459200,
"user": "test_user",
"upstream_tables": ["table1"],
"downstream_tables": ["output_table"],
},
SqlQueriesSourceConfig(platform="athena", query_file="dummy.json"),
QueryEntry(
query="CREATE TABLE out AS SELECT * FROM table",
timestamp=datetime(2021, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
user=CorpUserUrn("test_user"),
operation_type=None,
downstream_tables=[
DatasetUrn.from_string(
"urn:li:dataset:(urn:li:dataPlatform:athena,output_table,PROD)"
)
],
upstream_tables=[
DatasetUrn.from_string(
"urn:li:dataset:(urn:li:dataPlatform:athena,table1,PROD)"
)
],
session_id=None,
),
False,
id="upstream_and_downstream_tables",
),
# Config variations - platform, platform_instance, env
pytest.param(
{
"query": "SELECT * FROM table",
"timestamp": 1609459200,
"user": "test_user",
"upstream_tables": ["table1"],
},
SqlQueriesSourceConfig(
platform="snowflake",
query_file="dummy.json",
platform_instance="prod_instance",
env="DEV",
),
QueryEntry(
query="SELECT * FROM table",
timestamp=datetime(2021, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
user=CorpUserUrn("test_user"),
operation_type=None,
downstream_tables=[],
upstream_tables=[
DatasetUrn.from_string(
"urn:li:dataset:(urn:li:dataPlatform:snowflake,prod_instance.table1,DEV)"
)
],
session_id=None,
),
False,
id="config_with_platform_instance_and_env",
),
pytest.param(
{
"query": "SELECT * FROM table",
"timestamp": 1609459200,
"user": "test_user",
"upstream_tables": ["table1"],
},
SqlQueriesSourceConfig(
platform="bigquery", query_file="dummy.json", env="PRE"
),
QueryEntry(
query="SELECT * FROM table",
timestamp=datetime(2021, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
user=CorpUserUrn("test_user"),
operation_type=None,
downstream_tables=[],
upstream_tables=[
DatasetUrn.from_string(
"urn:li:dataset:(urn:li:dataPlatform:bigquery,table1,PRE)"
)
],
session_id=None,
),
False,
id="config_with_different_env",
),
pytest.param(
{
"query": "SELECT * FROM table",
"timestamp": 1609459200,
"user": "test_user",
"upstream_tables": ["table1"],
},
SqlQueriesSourceConfig(
platform="postgres",
query_file="dummy.json",
platform_instance="dev_cluster",
),
QueryEntry(
query="SELECT * FROM table",
timestamp=datetime(2021, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
user=CorpUserUrn("test_user"),
operation_type=None,
downstream_tables=[],
upstream_tables=[
DatasetUrn.from_string(
"urn:li:dataset:(urn:li:dataPlatform:postgres,dev_cluster.table1,PROD)"
)
],
session_id=None,
),
False,
id="config_with_platform_instance_only",
),
# Error cases
pytest.param(
{
"query": "SELECT * FROM table",
"timestamp": "invalid-timestamp",
"user": "test_user",
"upstream_tables": ["table1"],
},
SqlQueriesSourceConfig(platform="athena", query_file="dummy.json"),
None,
True,
id="invalid_timestamp_format",
),
],
)
def test_create(self, entry_dict, entry_config, expected_query_entry, should_raise):
"""Test QueryEntry creation with various input formats and edge cases."""
if should_raise:
with pytest.raises(ValueError):
QueryEntry.create(entry_dict, config=entry_config)
return
query_entry = QueryEntry.create(entry_dict, config=entry_config)
# Check all fields match expected
assert query_entry.query == expected_query_entry.query
assert query_entry.timestamp == expected_query_entry.timestamp
assert query_entry.user == expected_query_entry.user
assert query_entry.operation_type == expected_query_entry.operation_type
assert query_entry.downstream_tables == expected_query_entry.downstream_tables
assert query_entry.upstream_tables == expected_query_entry.upstream_tables
assert query_entry.session_id == expected_query_entry.session_id