fix(ingestion): profiling - Fixing partitioned table profiling in BQ (#5283)

This commit is contained in:
Tamas Nemeth 2022-06-29 14:00:52 +02:00 committed by GitHub
parent b73477f31e
commit fb1b1db7f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 36 additions and 11 deletions

View File

@ -11,6 +11,7 @@ from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Tupl
from great_expectations import __version__ as ge_version
from datahub.configuration.common import ConfigurationError
from datahub.telemetry import stats, telemetry
# Fun compatibility hack! GE version 0.13.44 broke compatibility with SQLAlchemy 1.3.24.
@ -872,7 +873,16 @@ class DatahubGEProfiler:
ge_config["schema"] = temp_table_db
if self.config.bigquery_temp_table_schema:
num_parts = self.config.bigquery_temp_table_schema.split(".")
# If we only have 1 part that means the project_id is missing from the table name and we add it
if len(num_parts) == 1:
bigquery_temp_table = f"{temp_table_db}.{self.config.bigquery_temp_table_schema}.ge-temp-{uuid.uuid4()}"
elif len(num_parts) == 2:
bigquery_temp_table = f"{self.config.bigquery_temp_table_schema}.ge-temp-{uuid.uuid4()}"
else:
raise ConfigurationError(
f"bigquery_temp_table_schema should be either project.dataset or dataset format but it was: {self.config.bigquery_temp_table_schema}"
)
else:
assert table
table_parts = table.split(".")
@ -970,12 +980,15 @@ class DatahubGEProfiler:
if platform is not None and platform == "bigquery":
# This is done as GE makes the name as DATASET.TABLE
# but we want it to be PROJECT.DATASET.TABLE instead for multi-project setups
logger.debug(f"Setting table name to be {pretty_name}")
batch._table = sa.text(pretty_name)
name_parts = pretty_name.split(".")
if len(name_parts) != 3:
logger.error(
f"Unexpected {pretty_name} while profiling. Should have 3 parts but has {len(name_parts)} parts."
)
# If we only have two parts that means the project_id is missing from the table name and we add it
# Temp tables has 3 parts while normal tables only has 2 parts
if len(str(batch._table).split(".")) == 2:
batch._table = sa.text(f"{name_parts[0]}.{str(batch._table)}")
logger.debug(f"Setting table name to be {batch._table}")
return batch return batch

View File

@ -160,6 +160,11 @@ FROM `{project_id}.{schema}.__TABLES_SUMMARY__`
WHERE table_id LIKE '{table}%'
""".strip()
BQ_GET_LATEST_DATE_TABLE = """
SELECT MAX(table_name) as max_shard
FROM `{project_id}.{schema}.INFORMATION_SCHEMA.TABLES`
where REGEXP_CONTAINS(table_name, r'^\\d{{{date_length}}}$')
""".strip()
# The existing implementation of this method can be found here:
# https://github.com/googleapis/python-bigquery-sqlalchemy/blob/main/sqlalchemy_bigquery/base.py#L1018-L1025.
@ -707,11 +712,16 @@ class BigQuerySource(SQLAlchemySource):
engine = self._get_engine(for_run_sql=True)
if f"{project_id}.{schema}.{table_name}" not in self.maximum_shard_ids:
with engine.connect() as con:
if table_name is not None:
sql = BQ_GET_LATEST_SHARD.format(
project_id=project_id,
schema=schema,
table=table_name,
)
else:
sql = BQ_GET_LATEST_DATE_TABLE.format(
project_id=project_id, schema=schema, date_length=len(shard)
)
result = con.execute(sql)
for row in result:

View File

@ -281,14 +281,16 @@ class BigQueryTableRef:
if matches:
table_name = matches.group(1)
if matches:
logger.debug(
f"Found sharded table {self.table}. Using {table_name} as the table name."
)
if not table_name:
logger.debug(
f"Using dataset id {self.dataset} as table name because table only contains date value {self.table}"
)
table_name = self.dataset
logger.debug(
f"Found sharded table {self.table}. Using {table_name} as the table name."
)
return BigQueryTableRef(self.project, self.dataset, table_name)
# Handle table snapshots.