mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-08 17:53:11 +00:00
213 lines
6.5 KiB
Python
213 lines
6.5 KiB
Python
from datetime import datetime, timezone
|
|
|
|
from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
|
|
from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report
|
|
from datahub.ingestion.source.bigquery_v2.bigquery_schema import (
|
|
BigqueryColumn,
|
|
BigqueryTable,
|
|
PartitionInfo,
|
|
)
|
|
from datahub.ingestion.source.bigquery_v2.profiler import BigqueryProfiler
|
|
|
|
|
|
def test_not_generate_partition_profiler_query_if_not_partitioned_sharded_table():
|
|
profiler = BigqueryProfiler(config=BigQueryV2Config(), report=BigQueryV2Report())
|
|
test_table = BigqueryTable(
|
|
name="test_table",
|
|
comment="test_comment",
|
|
rows_count=1,
|
|
size_in_bytes=1,
|
|
last_altered=datetime.now(timezone.utc),
|
|
created=datetime.now(timezone.utc),
|
|
)
|
|
query = profiler.generate_partition_profiler_query(
|
|
project="test_project",
|
|
schema="test_dataset",
|
|
table=test_table,
|
|
partition_datetime=None,
|
|
)
|
|
|
|
assert query == (None, None)
|
|
|
|
|
|
def test_generate_day_partitioned_partition_profiler_query():
|
|
column = BigqueryColumn(
|
|
name="date",
|
|
field_path="date",
|
|
ordinal_position=1,
|
|
data_type="TIMESTAMP",
|
|
is_partition_column=True,
|
|
cluster_column_position=None,
|
|
comment=None,
|
|
is_nullable=False,
|
|
)
|
|
partition_info = PartitionInfo(type="DAY", field="date", column=column)
|
|
profiler = BigqueryProfiler(config=BigQueryV2Config(), report=BigQueryV2Report())
|
|
test_table = BigqueryTable(
|
|
name="test_table",
|
|
comment="test_comment",
|
|
rows_count=1,
|
|
size_in_bytes=1,
|
|
last_altered=datetime.now(timezone.utc),
|
|
created=datetime.now(timezone.utc),
|
|
partition_info=partition_info,
|
|
max_partition_id="20200101",
|
|
)
|
|
query = profiler.generate_partition_profiler_query(
|
|
project="test_project",
|
|
schema="test_dataset",
|
|
table=test_table,
|
|
)
|
|
expected_query = """
|
|
SELECT
|
|
*
|
|
FROM
|
|
`test_project.test_dataset.test_table`
|
|
WHERE
|
|
`date` BETWEEN TIMESTAMP('2020-01-01 00:00:00') AND TIMESTAMP('2020-01-02 00:00:00')
|
|
""".strip()
|
|
|
|
assert query[0] == "20200101"
|
|
assert query[1]
|
|
assert expected_query == query[1].strip()
|
|
|
|
|
|
# If partition time is passed in we force to use that time instead of the max partition id
|
|
def test_generate_day_partitioned_partition_profiler_query_with_set_partition_time():
|
|
column = BigqueryColumn(
|
|
name="date",
|
|
field_path="date",
|
|
ordinal_position=1,
|
|
data_type="TIMESTAMP",
|
|
is_partition_column=True,
|
|
cluster_column_position=None,
|
|
comment=None,
|
|
is_nullable=False,
|
|
)
|
|
partition_info = PartitionInfo(type="DAY", field="date", column=column)
|
|
profiler = BigqueryProfiler(config=BigQueryV2Config(), report=BigQueryV2Report())
|
|
test_table = BigqueryTable(
|
|
name="test_table",
|
|
comment="test_comment",
|
|
rows_count=1,
|
|
size_in_bytes=1,
|
|
last_altered=datetime.now(timezone.utc),
|
|
created=datetime.now(timezone.utc),
|
|
partition_info=partition_info,
|
|
max_partition_id="20200101",
|
|
)
|
|
query = profiler.generate_partition_profiler_query(
|
|
project="test_project",
|
|
schema="test_dataset",
|
|
table=test_table,
|
|
)
|
|
expected_query = """
|
|
SELECT
|
|
*
|
|
FROM
|
|
`test_project.test_dataset.test_table`
|
|
WHERE
|
|
`date` BETWEEN TIMESTAMP('2020-01-01 00:00:00') AND TIMESTAMP('2020-01-02 00:00:00')
|
|
""".strip()
|
|
|
|
assert query[0] == "20200101"
|
|
assert query[1]
|
|
assert expected_query == query[1].strip()
|
|
|
|
|
|
def test_generate_hour_partitioned_partition_profiler_query():
|
|
column = BigqueryColumn(
|
|
name="partition_column",
|
|
field_path="partition_column",
|
|
ordinal_position=1,
|
|
data_type="TIMESTAMP",
|
|
is_partition_column=True,
|
|
cluster_column_position=None,
|
|
comment=None,
|
|
is_nullable=False,
|
|
)
|
|
partition_info = PartitionInfo(type="DAY", field="date", column=column)
|
|
profiler = BigqueryProfiler(config=BigQueryV2Config(), report=BigQueryV2Report())
|
|
test_table = BigqueryTable(
|
|
name="test_table",
|
|
comment="test_comment",
|
|
rows_count=1,
|
|
size_in_bytes=1,
|
|
last_altered=datetime.now(timezone.utc),
|
|
created=datetime.now(timezone.utc),
|
|
partition_info=partition_info,
|
|
max_partition_id="2020010103",
|
|
)
|
|
query = profiler.generate_partition_profiler_query(
|
|
project="test_project",
|
|
schema="test_dataset",
|
|
table=test_table,
|
|
partition_datetime=None,
|
|
)
|
|
expected_query = """
|
|
SELECT
|
|
*
|
|
FROM
|
|
`test_project.test_dataset.test_table`
|
|
WHERE
|
|
`partition_column` BETWEEN TIMESTAMP('2020-01-01 03:00:00') AND TIMESTAMP('2020-01-01 04:00:00')
|
|
""".strip()
|
|
|
|
assert query[0] == "2020010103"
|
|
assert query[1]
|
|
assert expected_query == query[1].strip()
|
|
|
|
|
|
# Ingestion partitioned tables do not have partition column in the schema as it uses a psudo column _PARTITIONTIME to partition
|
|
def test_generate_ingestion_partitioned_partition_profiler_query():
|
|
partition_info = PartitionInfo(type="DAY", field="date")
|
|
profiler = BigqueryProfiler(config=BigQueryV2Config(), report=BigQueryV2Report())
|
|
test_table = BigqueryTable(
|
|
name="test_table",
|
|
comment="test_comment",
|
|
rows_count=1,
|
|
size_in_bytes=1,
|
|
last_altered=datetime.now(timezone.utc),
|
|
created=datetime.now(timezone.utc),
|
|
partition_info=partition_info,
|
|
max_partition_id="20200101",
|
|
)
|
|
query = profiler.generate_partition_profiler_query(
|
|
project="test_project",
|
|
schema="test_dataset",
|
|
table=test_table,
|
|
)
|
|
expected_query = """
|
|
SELECT
|
|
*
|
|
FROM
|
|
`test_project.test_dataset.test_table`
|
|
WHERE
|
|
`_PARTITIONTIME` BETWEEN TIMESTAMP('2020-01-01 00:00:00') AND TIMESTAMP('2020-01-02 00:00:00')
|
|
""".strip()
|
|
|
|
assert query[0] == "20200101"
|
|
assert query[1]
|
|
assert expected_query == query[1].strip()
|
|
|
|
|
|
def test_generate_sharded_table_profiler_query():
|
|
profiler = BigqueryProfiler(config=BigQueryV2Config(), report=BigQueryV2Report())
|
|
test_table = BigqueryTable(
|
|
name="my_sharded_table",
|
|
max_shard_id="20200101",
|
|
comment="test_comment",
|
|
rows_count=1,
|
|
size_in_bytes=1,
|
|
last_altered=datetime.now(timezone.utc),
|
|
created=datetime.now(timezone.utc),
|
|
)
|
|
query = profiler.generate_partition_profiler_query(
|
|
project="test_project",
|
|
schema="test_dataset",
|
|
table=test_table,
|
|
)
|
|
|
|
assert query[0] == "20200101"
|
|
assert query[1] is None
|