fix(ingest): bigquery - Graceful bq partition id date parsing failure (#5386)

This commit is contained in:
Tamas Nemeth 2022-07-13 13:21:45 +02:00 committed by GitHub
parent 4c6d427800
commit 9ec4fbae86
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 22 additions and 8 deletions

View File

@ -868,17 +868,29 @@ class BigQuerySource(SQLAlchemySource):
partitioned table.
See more about partitioned tables at https://cloud.google.com/bigquery/docs/partitioned-tables
"""
logger.debug(
f"generate partition profiler query for schema: {schema} and table {table}, partition_datetime: {partition_datetime}"
)
partition = self.get_latest_partition(schema, table)
if partition:
partition_where_clause: str
logger.debug(f"{table} is partitioned and partition column is {partition}")
(
partition_datetime,
upper_bound_partition_datetime,
) = get_partition_range_from_partition_id(
partition.partition_id, partition_datetime
)
try:
(
partition_datetime,
upper_bound_partition_datetime,
) = get_partition_range_from_partition_id(
partition.partition_id, partition_datetime
)
except ValueError as e:
logger.error(
f"Unable to get partition range for partition id: {partition.partition_id} it failed with exception {e}"
)
self.report.invalid_partition_ids[
f"{schema}.{table}"
] = partition.partition_id
return None, None
if partition.data_type in ("TIMESTAMP", "DATETIME"):
partition_where_clause = "{column_name} BETWEEN '{partition_id}' AND '{upper_bound_partition_id}'".format(
column_name=partition.column_name,

View File

@ -1442,7 +1442,8 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
database=None, schema=schema, table=table
):
self.report.report_warning(
"profile skipped as partitioned table empty", dataset_name
"profile skipped as partitioned table is empty or partition id was invalid",
dataset_name,
)
continue

View File

@ -36,3 +36,4 @@ class BigQueryReport(SQLSourceReport):
table_metadata: Dict[str, List[str]] = field(default_factory=dict)
profile_table_selection_criteria: Dict[str, str] = field(default_factory=dict)
selected_profile_tables: Dict[str, List[str]] = field(default_factory=dict)
invalid_partition_ids: Dict[str, str] = field(default_factory=dict)