Mirror of https://github.com/datahub-project/datahub.git (synced 2025-09-14 03:31:40 +00:00)

feat(ingest/bigquery): support bigquery profiling with sampling (#8794)

parent f4da93988e
commit 99d7eb756c
@@ -616,6 +616,9 @@ class _SingleDatasetProfiler(BasicDatasetProfilerBase):
         logger.debug(f"profiling {self.dataset_name}: flushing stage 1 queries")
         self.query_combiner.flush()

+        if self.config.use_sampling and not self.config.limit:
+            self.update_dataset_batch_use_sampling(profile)
+
         columns_profiling_queue: List[_SingleColumnSpec] = []
         if columns_to_profile:
             for column in all_columns:
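The gate added above only reroutes profiling to a sample when sampling is enabled and no explicit row limit is configured; the row-count threshold itself is checked inside `update_dataset_batch_use_sampling` (next hunk). A minimal sketch of the combined decision logic, using an illustrative helper name that is not part of the patch:

    from typing import Optional


    def should_use_sampling(
        use_sampling: bool,
        limit: Optional[int],
        row_count: Optional[int],
        sample_size: int,
    ) -> bool:
        """Illustrative predicate mirroring the gating added in this commit."""
        # Skip sampling when it is disabled or an explicit LIMIT already bounds the scan.
        if not use_sampling or limit:
            return False
        # Sampling only pays off when the table is larger than the requested sample.
        return bool(row_count and row_count > sample_size)


    # should_use_sampling(True, None, 1_000_000, 1000) -> True
    # should_use_sampling(True, None, 500, 1000)       -> False (small tables are profiled in full)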
@@ -737,6 +740,61 @@ class _SingleDatasetProfiler(BasicDatasetProfilerBase):
         self.query_combiner.flush()
         return profile

+    def update_dataset_batch_use_sampling(self, profile: DatasetProfileClass) -> None:
+        if (
+            self.dataset.engine.dialect.name.lower() == BIGQUERY
+            and profile.rowCount
+            and profile.rowCount > self.config.sample_size
+        ):
+            """
+            According to the BigQuery sampling docs (https://cloud.google.com/bigquery/docs/table-sampling),
+            BigQuery does not cache the results of a query that includes a TABLESAMPLE clause, and the
+            query may return different results every time. Calculating column-level metrics on
+            different sampling results is possible; however, each query execution would incur the cost
+            of reading data from storage. Also, using different table samples may create a non-coherent
+            representation of column-level metrics: for example, the minimum value of a column in one sample
+            can be greater than the maximum value of the same column in another sample.
+
+            It is observed that for a simple SELECT * query with TABLESAMPLE, results are cached and
+            stored in a temporary table. This can be (ab)used, and all column-level profiling calculations
+            can be performed against it.
+
+            Risks:
+            1. All the risks mentioned in the notes of `create_bigquery_temp_table` are also
+               applicable here.
+            2. A TABLESAMPLE query may read the entire table for small tables that are written
+               as a single data block. This may incorrectly label the datasetProfile's partition as
+               "SAMPLE", although the profile covers the entire table.
+            3. Table sampling in BigQuery is a Pre-GA (Preview) feature.
+            """
+            sample_pc = 100 * self.config.sample_size / profile.rowCount
+            sql = (
+                f"SELECT * FROM {str(self.dataset._table)} "
+                + f"TABLESAMPLE SYSTEM ({sample_pc:.3f} percent)"
+            )
+            temp_table_name = create_bigquery_temp_table(
+                self,
+                sql,
+                self.dataset_name,
+                self.dataset.engine.engine.raw_connection(),
+            )
+            if temp_table_name:
+                self.dataset._table = sa.text(temp_table_name)
+                logger.debug(f"Setting table name to be {self.dataset._table}")
+
+            if (
+                profile.partitionSpec
+                and profile.partitionSpec.type == PartitionTypeClass.FULL_TABLE
+            ):
+                profile.partitionSpec = PartitionSpecClass(
+                    type=PartitionTypeClass.QUERY, partition="SAMPLE"
+                )
+            elif (
+                profile.partitionSpec
+                and profile.partitionSpec.type == PartitionTypeClass.PARTITION
+            ):
+                profile.partitionSpec.partition += " SAMPLE"
+

 @dataclasses.dataclass
 class GEContext:
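For reference, a quick check of the sampling arithmetic in `update_dataset_batch_use_sampling` above, with an illustrative table name and row count that are not taken from the patch:

    # Hypothetical numbers; sample_size=1000 is the config default introduced below.
    sample_size = 1000
    row_count = 2_500_000          # stands in for profile.rowCount

    sample_pc = 100 * sample_size / row_count   # 0.04
    sql = (
        f"SELECT * FROM `my-project.my_dataset.events` "
        f"TABLESAMPLE SYSTEM ({sample_pc:.3f} percent)"
    )
    print(sql)
    # SELECT * FROM `my-project.my_dataset.events` TABLESAMPLE SYSTEM (0.040 percent)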
@@ -961,84 +1019,18 @@ class DatahubGEProfiler:
         if platform == BIGQUERY and (
             custom_sql or self.config.limit or self.config.offset
         ):
-            # On BigQuery, we need to bypass GE's mechanism for creating temporary tables because
-            # it requires create/delete table permissions.
-            import google.cloud.bigquery.job.query
-            from google.cloud.bigquery.dbapi.cursor import Cursor as BigQueryCursor
-
-            raw_connection = self.base_engine.raw_connection()
-            try:
-                cursor: "BigQueryCursor" = cast(
-                    "BigQueryCursor", raw_connection.cursor()
-                )
-                if custom_sql is not None:
-                    # Note that limit and offset are not supported for custom SQL.
-                    # Presence of custom SQL represents that the bigquery table
-                    # is either partitioned or sharded
-                    bq_sql = custom_sql
-                else:
-                    bq_sql = f"SELECT * FROM `{table}`"
-                    if self.config.limit:
-                        bq_sql += f" LIMIT {self.config.limit}"
-                    if self.config.offset:
-                        bq_sql += f" OFFSET {self.config.offset}"
-                try:
-                    cursor.execute(bq_sql)
-                except Exception as e:
-                    if not self.config.catch_exceptions:
-                        raise e
-                    logger.exception(
-                        f"Encountered exception while profiling {pretty_name}"
-                    )
-                    self.report.report_warning(
-                        pretty_name,
-                        f"Profiling exception {e} when running custom sql {bq_sql}",
-                    )
-                    return None
-
-                # Great Expectations batch v2 API, which is the one we're using, requires
-                # a concrete table name against which profiling is executed. Normally, GE
-                # creates a table with an expiry time of 24 hours. However, we don't want the
-                # temporary tables to stick around that long, so we'd also have to delete them
-                # ourselves. As such, the profiler required create and delete table permissions
-                # on BigQuery.
-                #
-                # It turns out that we can (ab)use the BigQuery cached results feature
-                # to avoid creating temporary tables ourselves. For almost all queries, BigQuery
-                # will store the results in a temporary, cached results table when an explicit
-                # destination table is not provided. These tables are pretty easy to identify
-                # because they live in "anonymous datasets" and have a name that looks like
-                # "project-id._d60e97aec7f471046a960419adb6d44e98300db7.anon10774d0ea85fd20fe9671456c5c53d5f1b85e1b17bedb232dfce91661a219ee3"
-                # These tables are per-user and per-project, so there's no risk of permissions escalation.
-                # As per the docs, the cached results tables typically have a lifetime of 24 hours,
-                # which should be plenty for our purposes.
-                # See https://cloud.google.com/bigquery/docs/cached-results for more details.
-                #
-                # The code below extracts the name of the cached results table from the query job
-                # and points GE to that table for profiling.
-                #
-                # Risks:
-                # 1. If the query results are larger than the maximum response size, BigQuery will
-                #    not cache the results. According to the docs https://cloud.google.com/bigquery/quotas,
-                #    the maximum response size is 10 GB compressed.
-                # 2. The cache lifetime of 24 hours is "best-effort" and hence not guaranteed.
-                # 3. Tables with column-level security may not be cached, and tables with row-level
-                #    security will not be cached.
-                # 4. BigQuery "discourages" using cached results directly, but notes that
-                #    the current semantics do allow it.
-                #
-                # The better long-term solution would be to use a subquery avoid this whole
-                # temporary table dance. However, that would require either a) upgrading to
-                # use GE's batch v3 API or b) bypassing GE altogether.
-
-                query_job: Optional[
-                    "google.cloud.bigquery.job.query.QueryJob"
-                ] = cursor._query_job
-                assert query_job
-                temp_destination_table = query_job.destination
-                bigquery_temp_table = f"{temp_destination_table.project}.{temp_destination_table.dataset_id}.{temp_destination_table.table_id}"
-            finally:
-                raw_connection.close()
-
+            if custom_sql is not None:
+                # Note that limit and offset are not supported for custom SQL.
+                bq_sql = custom_sql
+            else:
+                bq_sql = f"SELECT * FROM `{table}`"
+                if self.config.limit:
+                    bq_sql += f" LIMIT {self.config.limit}"
+                if self.config.offset:
+                    bq_sql += f" OFFSET {self.config.offset}"
+            bigquery_temp_table = create_bigquery_temp_table(
+                self, bq_sql, pretty_name, self.base_engine.raw_connection()
+            )

         if platform == BIGQUERY:
             if bigquery_temp_table:
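With the temp-table logic factored out, the caller above only assembles the SQL to materialize and hands it to `create_bigquery_temp_table`. A minimal standalone sketch of that assembly as a pure function, with an illustrative name that is not part of the patch:

    from typing import Optional


    def build_profiling_sql(
        table: str,
        custom_sql: Optional[str] = None,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
    ) -> str:
        """Illustrative reconstruction of the SQL assembly shown in the hunk above."""
        if custom_sql is not None:
            # Limit and offset are not supported for custom SQL.
            return custom_sql
        bq_sql = f"SELECT * FROM `{table}`"
        if limit:
            bq_sql += f" LIMIT {limit}"
        if offset:
            bq_sql += f" OFFSET {offset}"
        return bq_sql


    # build_profiling_sql("my-project.my_dataset.events", limit=100000)
    # -> "SELECT * FROM `my-project.my_dataset.events` LIMIT 100000"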
@@ -1128,6 +1120,7 @@ class DatahubGEProfiler:
                 **batch_kwargs,
             },
         )
+
         if platform == BIGQUERY:
             # This is done as GE makes the name as DATASET.TABLE
             # but we want it to be PROJECT.DATASET.TABLE instead for multi-project setups
@@ -1153,3 +1146,76 @@ def _get_column_types_to_ignore(dialect_name: str) -> List[str]:
         return ["JSON"]

     return []
+
+
+def create_bigquery_temp_table(
+    instance: Union[DatahubGEProfiler, _SingleDatasetProfiler],
+    bq_sql: str,
+    table_pretty_name: str,
+    raw_connection: Any,
+) -> Optional[str]:
+    # On BigQuery, we need to bypass GE's mechanism for creating temporary tables because
+    # it requires create/delete table permissions.
+    import google.cloud.bigquery.job.query
+    from google.cloud.bigquery.dbapi.cursor import Cursor as BigQueryCursor
+
+    try:
+        cursor: "BigQueryCursor" = cast("BigQueryCursor", raw_connection.cursor())
+        try:
+            cursor.execute(bq_sql)
+        except Exception as e:
+            if not instance.config.catch_exceptions:
+                raise e
+            logger.exception(
+                f"Encountered exception while profiling {table_pretty_name}"
+            )
+            instance.report.report_warning(
+                table_pretty_name,
+                f"Profiling exception {e} when running custom sql {bq_sql}",
+            )
+            return None
+
+        # Great Expectations batch v2 API, which is the one we're using, requires
+        # a concrete table name against which profiling is executed. Normally, GE
+        # creates a table with an expiry time of 24 hours. However, we don't want the
+        # temporary tables to stick around that long, so we'd also have to delete them
+        # ourselves. As such, the profiler required create and delete table permissions
+        # on BigQuery.
+        #
+        # It turns out that we can (ab)use the BigQuery cached results feature
+        # to avoid creating temporary tables ourselves. For almost all queries, BigQuery
+        # will store the results in a temporary, cached results table when an explicit
+        # destination table is not provided. These tables are pretty easy to identify
+        # because they live in "anonymous datasets" and have a name that looks like
+        # "project-id._d60e97aec7f471046a960419adb6d44e98300db7.anon10774d0ea85fd20fe9671456c5c53d5f1b85e1b17bedb232dfce91661a219ee3"
+        # These tables are per-user and per-project, so there's no risk of permissions escalation.
+        # As per the docs, the cached results tables typically have a lifetime of 24 hours,
+        # which should be plenty for our purposes.
+        # See https://cloud.google.com/bigquery/docs/cached-results for more details.
+        #
+        # The code below extracts the name of the cached results table from the query job
+        # and points GE to that table for profiling.
+        #
+        # Risks:
+        # 1. If the query results are larger than the maximum response size, BigQuery will
+        #    not cache the results. According to the docs https://cloud.google.com/bigquery/quotas,
+        #    the maximum response size is 10 GB compressed.
+        # 2. The cache lifetime of 24 hours is "best-effort" and hence not guaranteed.
+        # 3. Tables with column-level security may not be cached, and tables with row-level
+        #    security will not be cached.
+        # 4. BigQuery "discourages" using cached results directly, but notes that
+        #    the current semantics do allow it.
+        #
+        # The better long-term solution would be to use a subquery to avoid this whole
+        # temporary table dance. However, that would require either a) upgrading to
+        # use GE's batch v3 API or b) bypassing GE altogether.
+
+        query_job: Optional[
+            "google.cloud.bigquery.job.query.QueryJob"
+        ] = cursor._query_job
+        assert query_job
+        temp_destination_table = query_job.destination
+        bigquery_temp_table = f"{temp_destination_table.project}.{temp_destination_table.dataset_id}.{temp_destination_table.table_id}"
+        return bigquery_temp_table
+    finally:
+        raw_connection.close()
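The notes above rely on BigQuery writing query results to an anonymous cached-results table when no destination is configured. A minimal sketch of the same idea using the google-cloud-bigquery client directly, outside GE and DataHub, assuming application-default credentials are available:

    from google.cloud import bigquery

    # Run a query with no explicit destination table and read back the
    # anonymous cached-results table that BigQuery stored the results in.
    client = bigquery.Client()

    query_job = client.query("SELECT 1 AS x")
    query_job.result()  # wait for the job to finish

    dest = query_job.destination  # TableReference into an "anonymous dataset"
    cached_table = f"{dest.project}.{dest.dataset_id}.{dest.table_id}"
    print(cached_table)
    # e.g. my-project._d60e97aec7f4....anon10774d0ea85f...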
@@ -145,10 +145,26 @@ class GEProfilingConfig(ConfigModel):
     # Hidden option - used for debugging purposes.
     catch_exceptions: bool = Field(default=True, description="")

-    partition_profiling_enabled: bool = Field(default=True, description="")
+    partition_profiling_enabled: bool = Field(
+        default=True,
+        description="Whether to profile partitioned tables. Only BigQuery supports this. "
+        "If enabled, the latest partition data is used for profiling.",
+    )
     partition_datetime: Optional[datetime.datetime] = Field(
         default=None,
-        description="For partitioned datasets profile only the partition which matches the datetime or profile the latest one if not set. Only Bigquery supports this.",
+        description="If specified, profile only the partition which matches this datetime. "
+        "If not specified, profile the latest partition. Only Bigquery supports this.",
+    )
+    use_sampling: bool = Field(
+        default=True,
+        description="Whether to profile column-level stats on a sample of the table. Only BigQuery supports this. "
+        "If enabled, profiling is done on rows sampled from the table. Sampling is not done for smaller tables.",
+    )
+
+    sample_size: int = Field(
+        default=1000,
+        description="Number of rows to be sampled from the table for column-level profiling. "
+        "Applicable only if `use_sampling` is set to True.",
     )

     @pydantic.root_validator(pre=True)
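A hedged sketch of how the new options could be set when constructing the profiling config directly in Python; the module path is assumed from the metadata-ingestion package layout and is not confirmed by this commit:

    # Illustrative only: import path assumed, all other fields keep their defaults.
    from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig

    config = GEProfilingConfig(
        use_sampling=True,   # sample BigQuery tables for column-level stats
        sample_size=5000,    # rows to sample; the default is 1000
    )
    print(config.use_sampling, config.sample_size)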