feat(ingest): bigquery - profile only the latest partition/shard (#3930)

Tamas Nemeth 2022-02-01 19:05:35 +01:00 committed by GitHub
parent 1afe8876b7
commit 928ab74f33
6 changed files with 354 additions and 51 deletions

View File

@@ -20,6 +20,7 @@ source:
#end_time: 2023-12-15T20:08:23.091Z
#profiling:
# enabled: true
# bigquery_temp_table_schema: my-project-id.my-schema-where-views-can-be-created-for-profiling
# turn_off_expensive_profiling_metrics: false
# query_combiner_enabled: true
# max_number_of_fields_to_profile: 8

View File

@@ -136,7 +136,6 @@ As a SQL-based service, the Athena integration is also supported by our SQL prof
| `bigquery_audit_metadata_datasets` | | None | A list of datasets that contain a table named `cloudaudit_googleapis_com_data_access` which contain BigQuery audit logs, specifically, those containing `BigQueryAuditMetadata`. It is recommended that the project of the dataset is also specified, for example, `projectA.datasetB`. |
The following parameters are only relevant if include_table_lineage is set to true:
- max_query_duration
@@ -150,9 +149,21 @@ Note: the bigquery_audit_metadata_datasets parameter receives a list of datasets
Note: Since the bigquery source also supports dataset-level lineage, the auth client will require additional permissions to access the Google audit logs. Refer to the permissions section in the bigquery-usage section below, which also accesses the audit logs.
## Compatibility
## Profiling
For profiling, you have to set a table schema where Great Expectations (the profiling framework we use) can create temporary
views, by setting the `profiling.bigquery_temp_table_schema` property.
Coming soon!
```yaml
profiling:
enabled: true
bigquery_temp_table_schema: my-project-id.my-schema-where-views-can-be-created
```
:::note
For performance reasons, we only profile the latest partition of partitioned tables and the latest shard of sharded tables.
:::
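As a rough illustration (not part of the recipe above), the snippet below shows how you can check which partition the connector would pick for a given partitioned table; the project, dataset, and table names are placeholders:
```python
# Minimal sketch, assuming the google-cloud-bigquery client library and application default
# credentials are available; all project/dataset/table names here are placeholders.
from google.cloud import bigquery

client = bigquery.Client(project="my-project-id")
query = """
SELECT c.column_name, MAX(p.partition_id) AS latest_partition
FROM `my-project-id.my_dataset.INFORMATION_SCHEMA.COLUMNS` AS c
JOIN `my-project-id.my_dataset.INFORMATION_SCHEMA.PARTITIONS` AS p
  ON c.table_catalog = p.table_catalog
  AND c.table_schema = p.table_schema
  AND c.table_name = p.table_name
WHERE c.table_name = 'my_table'
  AND c.is_partitioning_column = 'YES'
  AND p.partition_id NOT IN ('__NULL__', '__UNPARTITIONED__')
GROUP BY c.column_name
"""
for row in client.query(query).result():
    print(f"partition column: {row.column_name}, latest partition: {row.latest_partition}")
```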
# BigQuery Usage Stats

View File

@@ -69,29 +69,30 @@ sink:
Note that a `.` is used to denote nested fields in the YAML recipe.
| Field | Required | Default | Description |
| --------------------------------------------------- | -------- |----------------------| ------------------------------------------------------------------------------------ |
| `profiling.enabled` | | `False` | Whether profiling should be done. |
| `profiling.limit` | | | Max number of documents to profile. By default, profiles all documents. |
| `profiling.offset` | | | Offset in documents to profile. By default, uses no offset. |
| `profiling.max_workers` | | `5 * os.cpu_count()` | Number of worker threads to use for profiling. Set to 1 to disable. |
| `profiling.query_combiner_enabled` | | `True` | *This feature is still experimental and can be disabled if it causes issues.* Reduces the total number of queries issued and speeds up profiling by dynamically combining SQL queries where possible. |
| `profile_pattern.allow` | | `*` | List of regex patterns for tables or table columns to profile. Defaults to all. |
| `profile_pattern.deny` | | | List of regex patterns for tables or table columns to not profile. Defaults to none. |
| `profile_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. |
| `profiling.turn_off_expensive_profiling_metrics` | | False | Whether to turn off expensive profiling or not. This turns off profiling for quantiles, distinct_value_frequencies, histogram & sample_values. This also limits maximum number of fields being profiled to 10.|
| `profiling.max_number_of_fields_to_profile` | | `None` | A positive integer that specifies the maximum number of columns to profile for any table. `None` implies all columns. The cost of profiling goes up significantly as the number of columns to profile goes up.|
| `profiling.profile_table_level_only` | | False | Whether to perform profiling at table-level only, or include column-level profiling as well.|
| `profiling.include_field_null_count` | | `True` | Whether to profile for the number of nulls for each column. |
| `profiling.include_field_min_value` | | `True` | Whether to profile for the min value of numeric columns. |
| `profiling.include_field_max_value` | | `True` | Whether to profile for the max value of numeric columns. |
| `profiling.include_field_mean_value` | | `True` | Whether to profile for the mean value of numeric columns. |
| `profiling.include_field_median_value` | | `True` | Whether to profile for the median value of numeric columns. |
| `profiling.include_field_stddev_value` | | `True` | Whether to profile for the standard deviation of numeric columns. |
| `profiling.include_field_quantiles` | | `False` | Whether to profile for the quantiles of numeric columns. |
| `profiling.include_field_distinct_value_frequencies` | | `False` | Whether to profile for distinct value frequencies. |
| `profiling.include_field_histogram` | | `False` | Whether to profile for the histogram for numeric fields. |
| `profiling.include_field_sample_values` | | `True` | Whether to profile for the sample values for all columns. |
| Field | Required | Default | Description |
| -------------------------------------------------- | -------- |----------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `profiling.enabled` | | `False` | Whether profiling should be done. |
| `profiling.bigquery_temp_table_schema`               |          |                      | On BigQuery, profiling partitioned tables requires creating temporary views. You have to define a schema (i.e. a dataset) where these views will be created; they are cleaned up after the profiler runs. (See the Great Expectations technical details [here](https://legacy.docs.greatexpectations.io/en/0.9.0/reference/integrations/bigquery.html#custom-queries-with-sql-datasource).) |
| `profiling.limit` | | | Max number of documents to profile. By default, profiles all documents. |
| `profiling.offset` | | | Offset in documents to profile. By default, uses no offset. |
| `profiling.max_workers` | | `5 * os.cpu_count()` | Number of worker threads to use for profiling. Set to 1 to disable. |
| `profiling.query_combiner_enabled` | | `True` | *This feature is still experimental and can be disabled if it causes issues.* Reduces the total number of queries issued and speeds up profiling by dynamically combining SQL queries where possible. |
| `profile_pattern.allow` | | `*` | List of regex patterns for tables or table columns to profile. Defaults to all. |
| `profile_pattern.deny` | | | List of regex patterns for tables or table columns to not profile. Defaults to none. |
| `profile_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. |
| `profiling.turn_off_expensive_profiling_metrics` | | False | Whether to turn off expensive profiling or not. This turns off profiling for quantiles, distinct_value_frequencies, histogram & sample_values. This also limits maximum number of fields being profiled to 10. |
| `profiling.max_number_of_fields_to_profile` | | `None` | A positive integer that specifies the maximum number of columns to profile for any table. `None` implies all columns. The cost of profiling goes up significantly as the number of columns to profile goes up. |
| `profiling.profile_table_level_only` | | False | Whether to perform profiling at table-level only, or include column-level profiling as well. |
| `profiling.include_field_null_count` | | `True` | Whether to profile for the number of nulls for each column. |
| `profiling.include_field_min_value` | | `True` | Whether to profile for the min value of numeric columns. |
| `profiling.include_field_max_value` | | `True` | Whether to profile for the max value of numeric columns. |
| `profiling.include_field_mean_value` | | `True` | Whether to profile for the mean value of numeric columns. |
| `profiling.include_field_median_value` | | `True` | Whether to profile for the median value of numeric columns. |
| `profiling.include_field_stddev_value` | | `True` | Whether to profile for the standard deviation of numeric columns. |
| `profiling.include_field_quantiles` | | `False` | Whether to profile for the quantiles of numeric columns. |
| `profiling.include_field_distinct_value_frequencies` | | `False` | Whether to profile for distinct value frequencies. |
| `profiling.include_field_histogram` | | `False` | Whether to profile for the histogram for numeric fields. |
| `profiling.include_field_sample_values` | | `True` | Whether to profile for the sample values for all columns. |
## Compatibility
Coming soon!

View File

@@ -53,6 +53,7 @@ from datahub.metadata.schema_classes import (
DatasetFieldProfileClass,
DatasetProfileClass,
HistogramClass,
PartitionSpecClass,
QuantileClass,
ValueFrequencyClass,
)
@@ -196,6 +197,8 @@ class GEProfilingConfig(ConfigModel):
# Hidden option - used for debugging purposes.
catch_exceptions: bool = True
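# BigQuery only: schema (dataset) in which temporary views are created when profiling via
# custom SQL (e.g. latest-partition profiling); the views are dropped after profiling.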
bigquery_temp_table_schema: Optional[str] = None
@pydantic.root_validator()
def ensure_field_level_settings_are_normalized(
cls: "GEProfilingConfig", values: Dict[str, Any]
@@ -334,6 +337,7 @@ class _SingleColumnSpec:
class _SingleDatasetProfiler(BasicDatasetProfilerBase):
dataset: Dataset
dataset_name: str
partition: Optional[str]
config: GEProfilingConfig
report: SQLSourceReport
@@ -528,6 +532,8 @@ class _SingleDatasetProfiler(BasicDatasetProfilerBase):
)
profile = DatasetProfileClass(timestampMillis=get_sys_time())
if self.partition:
profile.partitionSpec = PartitionSpecClass(partition=self.partition)
profile.fieldProfiles = []
self._get_dataset_rows(profile)
@@ -799,47 +805,87 @@ class DatahubGEProfiler:
request: GEProfilerRequest,
) -> Tuple[GEProfilerRequest, Optional[DatasetProfileClass]]:
return request, self._generate_single_profile(
query_combiner,
request.pretty_name,
query_combiner=query_combiner,
pretty_name=request.pretty_name,
**request.batch_kwargs,
)
def _drop_bigquery_temp_table(self, ge_config: dict) -> None:
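# Best-effort cleanup of the temporary view created for custom-SQL profiling;
# failures are only logged and do not abort the profiling run.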
if "bigquery_temp_table" in ge_config:
try:
with self.base_engine.connect() as connection:
connection.execute(
f"drop view if exists `{ge_config.get('bigquery_temp_table')}`"
)
except Exception:
logger.warning(
f"Unable to delete bigquery temporary table: {ge_config.get('bigquery_temp_table')}"
)
def _generate_single_profile(
self,
query_combiner: SQLAlchemyQueryCombiner,
pretty_name: str,
schema: Optional[str] = None,
table: Optional[str] = None,
partition: Optional[str] = None,
custom_sql: Optional[str] = None,
**kwargs: Any,
) -> Optional[DatasetProfileClass]:
logger.info(f"Profiling {pretty_name}")
bigquery_temp_table: Optional[str] = None
ge_config = {
"schema": schema,
"table": table,
"limit": self.config.limit,
"offset": self.config.offset,
**kwargs,
}
if custom_sql:
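# custom_sql is used e.g. to profile only the latest partition of a BigQuery partitioned
# table. Great Expectations needs a temporary view to profile a custom SQL query, so
# generate a unique view name under the configured BigQuery schema (if one is set).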
if self.config.bigquery_temp_table_schema:
bigquery_temp_table = (
f"{self.config.bigquery_temp_table_schema}.ge-temp-{uuid.uuid4()}"
)
ge_config = {
"query": custom_sql,
"bigquery_temp_table": bigquery_temp_table,
**kwargs,
}
with self._ge_context() as ge_context, PerfTimer() as timer:
try:
logger.info(f"Profiling {pretty_name}")
batch = self._get_ge_dataset(
ge_context,
{
"schema": schema,
"table": table,
"limit": self.config.limit,
"offset": self.config.offset,
**kwargs,
},
ge_config,
pretty_name=pretty_name,
)
profile = _SingleDatasetProfiler(
batch, pretty_name, self.config, self.report, query_combiner
batch,
pretty_name,
partition,
self.config,
self.report,
query_combiner,
).generate_dataset_profile()
logger.info(
f"Finished profiling {pretty_name}; took {(timer.elapsed_seconds()):.3f} seconds"
)
self._drop_bigquery_temp_table(ge_config)
return profile
except Exception as e:
if not self.config.catch_exceptions:
raise e
logger.exception(f"Encountered exception while profiling {pretty_name}")
self.report.report_failure(pretty_name, f"Profiling exception {e}")
self._drop_bigquery_temp_table(ge_config)
return None
def _get_ge_dataset(

View File

@@ -1,10 +1,13 @@
import collections
import datetime
import functools
import json
import logging
import os
import re
import tempfile
import textwrap
from dataclasses import dataclass
from datetime import timedelta
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union
from unittest.mock import patch
@@ -13,8 +16,10 @@ from unittest.mock import patch
import pybigquery # noqa: F401
import pybigquery.sqlalchemy_bigquery
import pydantic
from dateutil import parser
from google.cloud.bigquery import Client as BigQueryClient
from google.cloud.logging_v2.client import Client as GCPLoggingClient
from sqlalchemy import create_engine, inspect
from sqlalchemy.engine.reflection import Inspector
from datahub.configuration import ConfigModel
@@ -70,6 +75,41 @@ AND
timestamp < "{end_time}"
""".strip()
BQ_GET_LATEST_PARTITION_TEMPLATE = """
SELECT
c.table_catalog,
c.table_schema,
c.table_name,
c.column_name,
c.data_type,
max(p.partition_id) as partition_id
FROM
`{project_id}.{schema}.INFORMATION_SCHEMA.COLUMNS` as c
join `{project_id}.{schema}.INFORMATION_SCHEMA.PARTITIONS` as p
on
c.table_catalog = p.table_catalog
and c.table_schema = p.table_schema
and c.table_name = p.table_name
where
is_partitioning_column = 'YES'
-- Filter out special partitions (https://cloud.google.com/bigquery/docs/partitioned-tables#date_timestamp_partitioned_tables)
and p.partition_id not in ('__NULL__', '__UNPARTITIONED__')
group by
c.table_catalog,
c.table_schema,
c.table_name,
c.column_name,
c.data_type
""".strip()
SHARDED_TABLE_REGEX = r"^(.+)[_](\d{4}|\d{6}|\d{8}|\d{10})$"
BQ_GET_LATEST_SHARD = """
SELECT SUBSTR(MAX(table_id), LENGTH('{table}_') + 1) as max_shard
FROM `{project_id}.{schema}.__TABLES_SUMMARY__`
WHERE table_id LIKE '{table}%'
""".strip()
# The existing implementation of this method can be found here:
# https://github.com/googleapis/python-bigquery-sqlalchemy/blob/e0f1496c99dd627e0ed04a0c4e89ca5b14611be2/pybigquery/sqlalchemy_bigquery.py#L967-L974.
# The existing implementation does not use the schema parameter and hence
@@ -165,6 +205,16 @@ class BigQueryCredential(ConfigModel):
)
@dataclass
class BigQueryPartitionColumn:
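"""A table's partitioning column plus its latest partition_id, as returned by BQ_GET_LATEST_PARTITION_TEMPLATE (one row per table)."""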
table_catalog: str
table_schema: str
table_name: str
column_name: str
data_type: str
partition_id: str
def create_credential_temp_file(credential: BigQueryCredential) -> str:
with tempfile.NamedTemporaryFile(delete=False) as fp:
cred_json = json.dumps(credential.dict(), indent=4, separators=(",", ": "))
@@ -219,11 +269,19 @@ class BigQueryConfig(BaseTimeWindowConfig, SQLAlchemyConfig):
class BigQuerySource(SQLAlchemySource):
config: BigQueryConfig
partiton_columns: Dict[str, Dict[str, BigQueryPartitionColumn]] = dict()
maximum_shard_ids: Dict[str, str] = dict()
lineage_metadata: Optional[Dict[str, Set[str]]] = None
def __init__(self, config, ctx):
super().__init__(config, ctx, "bigquery")
def get_db_name(self, inspector: Inspector = None) -> str:
if self.config.project_id:
return self.config.project_id
else:
return self._get_project_id(inspector)
def _compute_big_query_lineage(self) -> None:
if self.config.include_table_lineage:
if self.config.use_exported_bigquery_audit_metadata:
@@ -408,6 +466,133 @@ class BigQuerySource(SQLAlchemySource):
lineage_map[destination_table_str].add(ref_table_str)
return lineage_map
def get_latest_partitions_for_schema(self, schema: str) -> None:
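# Fetch the partitioning column and latest partition id for every partitioned table in the
# schema with a single INFORMATION_SCHEMA query, and cache the result per schema.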
url = self.config.get_sql_alchemy_url()
engine = create_engine(url, **self.config.options)
with engine.connect() as con:
inspector = inspect(con)
sql = BQ_GET_LATEST_PARTITION_TEMPLATE.format(
project_id=self.get_db_name(inspector),
schema=schema,
)
result = con.execute(sql)
partitions = {}
for row in result:
partition = BigQueryPartitionColumn(**row)
partitions[partition.table_name] = partition
self.partiton_columns[schema] = partitions
def get_latest_partition(
self, schema: str, table: str
) -> Optional[BigQueryPartitionColumn]:
if schema not in self.partiton_columns:
self.get_latest_partitions_for_schema(schema)
return self.partiton_columns[schema].get(table)
def get_shard_from_table(self, table: str) -> Tuple[str, Optional[str]]:
match = re.search(SHARDED_TABLE_REGEX, table, re.IGNORECASE)
if match:
table_name = match.group(1)
shard = match.group(2)
return table_name, shard
return table, None
def is_latest_shard(self, project_id: str, schema: str, table: str) -> bool:
# Getting latest shard from table names
# https://cloud.google.com/bigquery/docs/partitioned-tables#dt_partition_shard
table_name, shard = self.get_shard_from_table(table)
if shard:
logger.debug(f"{table_name} is sharded and shard id is: {shard}")
url = self.config.get_sql_alchemy_url()
engine = create_engine(url, **self.config.options)
if f"{project_id}.{schema}.{table_name}" not in self.maximum_shard_ids:
with engine.connect() as con:
sql = BQ_GET_LATEST_SHARD.format(
project_id=project_id,
schema=schema,
table=table_name,
)
result = con.execute(sql)
for row in result:
max_shard = row["max_shard"]
self.maximum_shard_ids[
f"{project_id}.{schema}.{table_name}"
] = max_shard
logger.debug(f"Max shard for table {table_name} is {max_shard}")
return (
self.maximum_shard_ids[f"{project_id}.{schema}.{table_name}"] == shard
)
else:
return True
def generate_partition_profiler_query(
self, schema: str, table: str
) -> Tuple[Optional[str], Optional[str]]:
"""
Return the latest partition id if the table is partitioned or sharded, and generate a
custom profiling query for partitioned tables.
See more about partitioned tables at https://cloud.google.com/bigquery/docs/partitioned-tables
"""
partition = self.get_latest_partition(schema, table)
if partition:
partition_ts: Union[datetime.datetime, datetime.date]
logger.debug(f"{table} is partitioned and partition column is {partition}")
if partition.data_type in ("TIMESTAMP", "DATETIME"):
partition_ts = parser.parse(partition.partition_id)
elif partition.data_type == "DATE":
partition_ts = parser.parse(partition.partition_id).date()
else:
logger.warning(f"Not supported partition type {partition.data_type}")
return None, None
custom_sql = """
SELECT
*
FROM
`{table_catalog}.{table_schema}.{table_name}`
WHERE
{column_name} = '{partition_id}'
""".format(
table_catalog=partition.table_catalog,
table_schema=partition.table_schema,
table_name=partition.table_name,
column_name=partition.column_name,
partition_id=partition_ts,
)
return (partition.partition_id, custom_sql)
else:
# For sharded tables we only need the shard id; no custom query is required.
table, shard = self.get_shard_from_table(table)
if shard:
return shard, None
return None, None
def is_dataset_eligable_profiling(
self, dataset_name: str, sql_config: SQLAlchemyConfig
) -> bool:
"""
Overrides the default profiling filter, which checks profiling eligibility based on the allow-deny pattern.
This override additionally skips sharded tables that are not the latest shard.
"""
if not super().is_dataset_eligable_profiling(dataset_name, sql_config):
return False
(project_id, schema, table) = dataset_name.split(".")
if not self.is_latest_shard(project_id=project_id, table=table, schema=schema):
logger.warning(
f"{dataset_name} is sharded but not the latest shard, skipping..."
)
return False
return True
@classmethod
def create(cls, config_dict, ctx):
config = BigQueryConfig.parse_obj(config_dict)
@@ -486,11 +671,19 @@ class BigQuerySource(SQLAlchemySource):
return mcp
return None
def prepare_profiler_args(self, schema: str, table: str) -> dict:
def prepare_profiler_args(
self,
schema: str,
table: str,
partition: Optional[str],
custom_sql: Optional[str] = None,
) -> dict:
self.config: BigQueryConfig
return dict(
schema=self.config.project_id,
table=f"{schema}.{table}",
partition=partition,
custom_sql=custom_sql,
)
@staticmethod
@@ -500,6 +693,18 @@ class BigQuerySource(SQLAlchemySource):
project_id = connection.connection._client.project
return project_id
def normalise_dataset_name(self, dataset_name: str) -> str:
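# Strip shard/partition specifiers from the table id via BigQueryTableRef.remove_extras()
# so that all variants of the same underlying table map to a single dataset name.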
(project_id, schema, table) = dataset_name.split(".")
trimmed_table_name = (
BigQueryTableRef.from_spec_obj(
{"projectId": project_id, "datasetId": schema, "tableId": table}
)
.remove_extras()
.table
)
return f"{project_id}.{schema}.{trimmed_table_name}"
def get_identifier(
self,
*,
@@ -510,14 +715,10 @@ class BigQuerySource(SQLAlchemySource):
) -> str:
assert inspector
project_id = self._get_project_id(inspector)
trimmed_table_name = (
BigQueryTableRef.from_spec_obj(
{"projectId": project_id, "datasetId": schema, "tableId": entity}
)
.remove_extras()
.table
)
return f"{project_id}.{schema}.{trimmed_table_name}"
table_name = BigQueryTableRef.from_spec_obj(
{"projectId": project_id, "datasetId": schema, "tableId": entity}
).table
return f"{project_id}.{schema}.{table_name}"
def standardize_schema_table_names(
self, schema: str, entity: str

View File

@@ -600,6 +600,9 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
fk_dict["name"], foreign_fields, source_fields, foreign_dataset
)
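# Override if dataset names need normalization, e.g. to collapse BigQuery sharded tables
# into a single dataset name.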
def normalise_dataset_name(self, dataset_name: str) -> str:
return dataset_name
def loop_tables( # noqa: C901
self,
inspector: Inspector,
@@ -614,6 +617,9 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
dataset_name = self.get_identifier(
schema=schema, entity=table, inspector=inspector
)
dataset_name = self.normalise_dataset_name(dataset_name)
if dataset_name not in tables_seen:
tables_seen.add(dataset_name)
else:
@@ -793,6 +799,8 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
dataset_name = self.get_identifier(
schema=schema, entity=view, inspector=inspector
)
dataset_name = self.normalise_dataset_name(dataset_name)
self.report.report_entity_scanned(dataset_name, ent_type="view")
if not sql_config.view_pattern.allowed(dataset_name):
@@ -891,6 +899,18 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
conn=inspector.bind, report=self.report, config=self.config.profiling
)
# Override if needed
def generate_partition_profiler_query(
self, schema: str, table: str
) -> Tuple[Optional[str], Optional[str]]:
return None, None
# Override if you want to do additional checks
def is_dataset_eligable_profiling(
self, dataset_name: str, sql_config: SQLAlchemyConfig
) -> bool:
return sql_config.profile_pattern.allowed(dataset_name)
def loop_profiler_requests(
self,
inspector: Inspector,
@@ -899,6 +919,8 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
) -> Iterable["GEProfilerRequest"]:
from datahub.ingestion.source.ge_data_profiler import GEProfilerRequest
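# Sharded tables normalise to the same dataset name below, so track what has already been
# requested and profile each dataset only once.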
tables_seen: Set[str] = set()
for table in inspector.get_table_names(schema):
schema, table = self.standardize_schema_table_names(
schema=schema, entity=table
@@ -906,15 +928,31 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
dataset_name = self.get_identifier(
schema=schema, entity=table, inspector=inspector
)
if not sql_config.profile_pattern.allowed(dataset_name):
if not self.is_dataset_eligable_profiling(dataset_name, sql_config):
self.report.report_dropped(f"profile of {dataset_name}")
continue
dataset_name = self.normalise_dataset_name(dataset_name)
if dataset_name not in tables_seen:
tables_seen.add(dataset_name)
else:
logger.debug(f"{dataset_name} has already been seen, skipping...")
continue
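# For BigQuery this returns the latest partition id and a custom SQL statement restricted
# to that partition; the default implementation returns (None, None).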
(partition, custom_sql) = self.generate_partition_profiler_query(
schema, table
)
self.report.report_entity_profiled(dataset_name)
yield GEProfilerRequest(
pretty_name=dataset_name,
batch_kwargs=self.prepare_profiler_args(schema=schema, table=table),
batch_kwargs=self.prepare_profiler_args(
schema=schema,
table=table,
partition=partition,
custom_sql=custom_sql,
),
)
def loop_profiler(
@@ -944,10 +982,15 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
yield wu
def prepare_profiler_args(self, schema: str, table: str) -> dict:
def prepare_profiler_args(
self,
schema: str,
table: str,
partition: Optional[str],
custom_sql: Optional[str] = None,
) -> dict:
return dict(
schema=schema,
table=table,
schema=schema, table=table, partition=partition, custom_sql=custom_sql
)
def get_report(self):