datahub/metadata-ingestion/tests/unit/bigquery/test_bigquery_profiler.py
2025-02-28 17:49:52 +05:30

213 lines
6.5 KiB
Python

from datetime import datetime, timezone
from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report
from datahub.ingestion.source.bigquery_v2.bigquery_schema import (
BigqueryColumn,
BigqueryTable,
PartitionInfo,
)
from datahub.ingestion.source.bigquery_v2.profiler import BigqueryProfiler
def test_not_generate_partition_profiler_query_if_not_partitioned_sharded_table():
    """A table without partition info or a shard id must yield ``(None, None)``."""
    config = BigQueryV2Config()
    report = BigQueryV2Report()
    profiler = BigqueryProfiler(config=config, report=report)

    altered_at = datetime.now(timezone.utc)
    created_at = datetime.now(timezone.utc)
    plain_table = BigqueryTable(
        name="test_table",
        comment="test_comment",
        rows_count=1,
        size_in_bytes=1,
        last_altered=altered_at,
        created=created_at,
    )

    result = profiler.generate_partition_profiler_query(
        project="test_project",
        schema="test_dataset",
        table=plain_table,
        partition_datetime=None,
    )

    # Neither a partition id nor a custom query is expected for a plain table.
    assert result == (None, None)
def test_generate_day_partitioned_partition_profiler_query():
    """A DAY-partitioned table is queried over the day named by its max partition id."""
    partition_column = BigqueryColumn(
        name="date",
        field_path="date",
        ordinal_position=1,
        data_type="TIMESTAMP",
        is_partition_column=True,
        cluster_column_position=None,
        comment=None,
        is_nullable=False,
    )
    day_partitioning = PartitionInfo(type="DAY", field="date", column=partition_column)

    config = BigQueryV2Config()
    report = BigQueryV2Report()
    profiler = BigqueryProfiler(config=config, report=report)

    altered_at = datetime.now(timezone.utc)
    created_at = datetime.now(timezone.utc)
    partitioned_table = BigqueryTable(
        name="test_table",
        comment="test_comment",
        rows_count=1,
        size_in_bytes=1,
        last_altered=altered_at,
        created=created_at,
        partition_info=day_partitioning,
        max_partition_id="20200101",
    )

    partition_id, profiler_sql = profiler.generate_partition_profiler_query(
        project="test_project",
        schema="test_dataset",
        table=partitioned_table,
    )

    expected_sql = """
SELECT
*
FROM
`test_project.test_dataset.test_table`
WHERE
`date` BETWEEN TIMESTAMP('2020-01-01 00:00:00') AND TIMESTAMP('2020-01-02 00:00:00')
""".strip()

    assert partition_id == "20200101"
    assert profiler_sql
    assert profiler_sql.strip() == expected_sql
# If a partition time is passed in, it is used instead of the time derived from the max partition id.
def test_generate_day_partitioned_partition_profiler_query_with_set_partition_time():
    """An explicit ``partition_datetime`` must drive the profiled date window.

    The table's ``max_partition_id`` (2020-01-01) and the requested
    ``partition_datetime`` (2020-01-05) deliberately differ, so the assertion on
    the WHERE clause shows which of the two was used to build the query.
    """
    column = BigqueryColumn(
        name="date",
        field_path="date",
        ordinal_position=1,
        data_type="TIMESTAMP",
        is_partition_column=True,
        cluster_column_position=None,
        comment=None,
        is_nullable=False,
    )
    partition_info = PartitionInfo(type="DAY", field="date", column=column)
    profiler = BigqueryProfiler(config=BigQueryV2Config(), report=BigQueryV2Report())
    test_table = BigqueryTable(
        name="test_table",
        comment="test_comment",
        rows_count=1,
        size_in_bytes=1,
        last_altered=datetime.now(timezone.utc),
        created=datetime.now(timezone.utc),
        partition_info=partition_info,
        max_partition_id="20200101",
    )

    # Naive midnight value, in line with the 'YYYY-MM-DD HH:MM:SS' literals below.
    requested_partition_time = datetime(2020, 1, 5)

    query = profiler.generate_partition_profiler_query(
        project="test_project",
        schema="test_dataset",
        table=test_table,
        partition_datetime=requested_partition_time,
    )

    # NOTE(review): these expectations assume the profiler builds the window from
    # partition_datetime while still returning max_partition_id as the partition
    # id -- confirm against BigqueryProfiler.generate_partition_profiler_query.
    expected_query = """
SELECT
*
FROM
`test_project.test_dataset.test_table`
WHERE
`date` BETWEEN TIMESTAMP('2020-01-05 00:00:00') AND TIMESTAMP('2020-01-06 00:00:00')
""".strip()

    assert query[0] == "20200101"
    assert query[1]
    assert expected_query == query[1].strip()
def test_generate_hour_partitioned_partition_profiler_query():
    """An hour-partitioned table is queried over the hour named by its max partition id.

    The fixture describes a real hourly partitioning: type ``HOUR``, a partition
    field that matches the column name, and a 10-digit ``YYYYMMDDHH`` partition id.
    """
    column = BigqueryColumn(
        name="partition_column",
        field_path="partition_column",
        ordinal_position=1,
        data_type="TIMESTAMP",
        is_partition_column=True,
        cluster_column_position=None,
        comment=None,
        is_nullable=False,
    )
    # NOTE(review): assumes the profiler renders HOUR partitions with the same
    # BETWEEN clause it uses for DAY partitions -- confirm against BigqueryProfiler.
    partition_info = PartitionInfo(
        type="HOUR", field="partition_column", column=column
    )
    profiler = BigqueryProfiler(config=BigQueryV2Config(), report=BigQueryV2Report())
    test_table = BigqueryTable(
        name="test_table",
        comment="test_comment",
        rows_count=1,
        size_in_bytes=1,
        last_altered=datetime.now(timezone.utc),
        created=datetime.now(timezone.utc),
        partition_info=partition_info,
        max_partition_id="2020010103",
    )

    query = profiler.generate_partition_profiler_query(
        project="test_project",
        schema="test_dataset",
        table=test_table,
        partition_datetime=None,
    )

    expected_query = """
SELECT
*
FROM
`test_project.test_dataset.test_table`
WHERE
`partition_column` BETWEEN TIMESTAMP('2020-01-01 03:00:00') AND TIMESTAMP('2020-01-01 04:00:00')
""".strip()

    assert query[0] == "2020010103"
    assert query[1]
    assert expected_query == query[1].strip()
# Ingestion-time partitioned tables have no partition column in the schema; they are partitioned on the pseudo column _PARTITIONTIME.
def test_generate_ingestion_partitioned_partition_profiler_query():
    """With no partition column configured, the query filters on ``_PARTITIONTIME``."""
    columnless_partitioning = PartitionInfo(type="DAY", field="date")

    config = BigQueryV2Config()
    report = BigQueryV2Report()
    profiler = BigqueryProfiler(config=config, report=report)

    altered_at = datetime.now(timezone.utc)
    created_at = datetime.now(timezone.utc)
    ingestion_table = BigqueryTable(
        name="test_table",
        comment="test_comment",
        rows_count=1,
        size_in_bytes=1,
        last_altered=altered_at,
        created=created_at,
        partition_info=columnless_partitioning,
        max_partition_id="20200101",
    )

    partition_id, profiler_sql = profiler.generate_partition_profiler_query(
        project="test_project",
        schema="test_dataset",
        table=ingestion_table,
    )

    expected_sql = """
SELECT
*
FROM
`test_project.test_dataset.test_table`
WHERE
`_PARTITIONTIME` BETWEEN TIMESTAMP('2020-01-01 00:00:00') AND TIMESTAMP('2020-01-02 00:00:00')
""".strip()

    assert partition_id == "20200101"
    assert profiler_sql
    assert profiler_sql.strip() == expected_sql
def test_generate_sharded_table_profiler_query():
    """A sharded table yields its max shard id and no custom query."""
    config = BigQueryV2Config()
    report = BigQueryV2Report()
    profiler = BigqueryProfiler(config=config, report=report)

    altered_at = datetime.now(timezone.utc)
    created_at = datetime.now(timezone.utc)
    sharded_table = BigqueryTable(
        name="my_sharded_table",
        max_shard_id="20200101",
        comment="test_comment",
        rows_count=1,
        size_in_bytes=1,
        last_altered=altered_at,
        created=created_at,
    )

    partition_id, profiler_sql = profiler.generate_partition_profiler_query(
        project="test_project",
        schema="test_dataset",
        table=sharded_table,
    )

    # The shard id comes back as the partition id; no SQL is generated.
    assert partition_id == "20200101"
    assert profiler_sql is None