Mirror of https://github.com/datahub-project/datahub.git
feat(ingest/bigquery): avoid creating/deleting tables for profiling (#6578)
This commit is contained in: parent f63c3e5222, commit 6fe9ad4fbb
@@ -78,24 +78,8 @@ Note: Since bigquery source also supports dataset level lineage, the auth client

### Profiling Details

Profiling supports normal, partitioned, and sharded tables, but for performance reasons we only profile the latest partition of partitioned tables and the latest shard of sharded tables.

If the limit/offset parameter is set, or when profiling a partitioned or sharded table, Great Expectations (the profiling framework we use) needs to create temporary
views. By default these views are created in the schema where the profiled table lives, but you can direct all of these
tables into a predefined schema by setting the `profiling.bigquery_temp_table_schema` property.
Temporary tables are removed after profiling.

```yaml
profiling:
  enabled: true
  bigquery_temp_table_schema: my-project-id.my-schema-where-views-can-be-created
```

:::note

Due to performance reasons, we only profile the latest partition for partitioned tables and the latest shard for sharded tables.
You can set the partition explicitly with the `partition.partition_datetime` property if you want (the partition setting will be applied to all partitioned tables).
:::
For performance reasons, we only profile the latest partition for partitioned tables and the latest shard for sharded tables.
You can set partition explicitly with `partition.partition_datetime` property if you want, though note that partition config will be applied to all partitioned tables.
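
For example, a recipe can pin the profiled partition to a fixed date. This is a sketch; note that the corresponding field in `GEProfilingConfig` (shown later in this diff) is `partition_datetime` under `profiling`, so check the exact key placement against your DataHub version:

```yaml
profiling:
  enabled: true
  # Profile only the partition matching this datetime instead of the latest one.
  partition_datetime: "2022-11-01T00:00:00Z"
```
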
### Working with multi-project GCP setups

@@ -21,8 +21,6 @@ There are two important concepts to understand and identify:

| `bigquery.jobs.list` | Manage the queries that the service account has sent. *This is only needed for the extractor project where the service account belongs.* | |
| `bigquery.readsessions.create` | Create a session for streaming large results. *This is only needed for the extractor project where the service account belongs.* | |
| `bigquery.readsessions.getData` | Get data from the read session. *This is only needed for the extractor project where the service account belongs.* |
| `bigquery.tables.create` | Create temporary tables when profiling tables. Tip: Use the `profiling.bigquery_temp_table_schema` to ensure that all temp tables (across multiple projects) are created in this project under a specific dataset. | Profiling | |
| `bigquery.tables.delete` | Delete temporary tables when profiling tables. Tip: Use the `profiling.bigquery_temp_table_schema` to ensure that all temp tables (across multiple projects) are created in this project under a specific dataset. | Profiling | |

2. Grant the following permissions to the Service Account on every project from which you would like to extract metadata

:::info

@@ -41,11 +39,6 @@ If you have multiple projects in your BigQuery setup, the role should be granted

| `logging.logEntries.list` | Fetch log entries for lineage/usage data. Not required if `use_exported_bigquery_audit_metadata` is enabled. | Lineage Extraction/Usage extraction | [roles/logging.privateLogViewer](https://cloud.google.com/logging/docs/access-control#logging.privateLogViewer) |
| `logging.privateLogEntries.list` | Fetch log entries for lineage/usage data. Not required if `use_exported_bigquery_audit_metadata` is enabled. | Lineage Extraction/Usage extraction | [roles/logging.privateLogViewer](https://cloud.google.com/logging/docs/access-control#logging.privateLogViewer) |
| `bigquery.tables.getData` | Access table data to extract storage size, last updated at, data profiles etc. | Profiling | |
| `bigquery.tables.create` | [Optional] Only needed if not using the `profiling.bigquery_temp_table_schema` config option. | Profiling | |
| `bigquery.tables.delete` | [Optional] Only needed if not using the `profiling.bigquery_temp_table_schema` config option. | Profiling | |

The profiler creates temporary tables to profile partitioned/sharded tables, which is why it needs table create/delete privileges.
Use `profiling.bigquery_temp_table_schema` to restrict the create/delete permission to one specific dataset, as in the sketch below.
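
For example (a sketch; the project and dataset names below are placeholders), all of the profiler's temporary tables can be pinned to a single dataset in the extractor project, so the create/delete grants only need to cover that dataset:

```yaml
profiling:
  enabled: true
  # All temp tables land here, regardless of which project the profiled table lives in.
  bigquery_temp_table_schema: extractor-project-id.datahub_profiling_temp
```
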
#### Create a service account in the Extractor Project

@@ -100,24 +93,8 @@ Note: Since bigquery source also supports dataset level lineage, the auth client

### Profiling Details

Profiling supports normal, partitioned, and sharded tables, but for performance reasons we only profile the latest partition of partitioned tables and the latest shard of sharded tables.

If the limit/offset parameter is set, or when profiling a partitioned or sharded table, Great Expectations (the profiling framework we use) needs to create temporary
views. By default, these views are created in the schema where the profiled table lives, but you can direct all of these
tables into a predefined schema by setting the `profiling.bigquery_temp_table_schema` property.
Temporary tables are removed after profiling.

```yaml
profiling:
  enabled: true
  bigquery_temp_table_schema: my-project-id.my-schema-where-views-can-be-created
```

:::note

Due to performance reasons, we only profile the latest partition for partitioned tables and the latest shard for sharded tables.
You can set the partition explicitly with the `partition.partition_datetime` property if you want (the partition setting will be applied to all partitioned tables).
:::
For performance reasons, we only profile the latest partition for partitioned tables and the latest shard for sharded tables.
You can set partition explicitly with `partition.partition_datetime` property if you want, though note that partition config will be applied to all partitioned tables.

### Caveats

@@ -20,7 +20,6 @@ source:

#end_time: 2023-12-15T20:08:23.091Z
#profiling:
# enabled: true
# bigquery_temp_table_schema: my-project-id.my-schema-where-views-can-be-created-for-profiling
# turn_off_expensive_profiling_metrics: false
# query_combiner_enabled: true
# max_number_of_fields_to_profile: 8

@@ -10,10 +10,20 @@ import threading
import traceback
import unittest.mock
import uuid
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Tuple, Union
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    Iterator,
    List,
    Optional,
    Tuple,
    Union,
    cast,
)

import sqlalchemy as sa
from great_expectations import __version__ as ge_version
from great_expectations.core.util import convert_to_json_serializable
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (

@@ -30,12 +40,11 @@ from sqlalchemy.engine import Connection, Engine
from sqlalchemy.exc import ProgrammingError
from typing_extensions import Concatenate, ParamSpec

from datahub.configuration.common import ConfigurationError
from datahub.emitter.mce_builder import get_sys_time
from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig
from datahub.ingestion.source.profiling.common import (
    Cardinality,
    _convert_to_cardinality,
    convert_to_cardinality,
)
from datahub.ingestion.source.sql.sql_common import SQLSourceReport
from datahub.metadata.schema_classes import (

@@ -315,7 +324,7 @@ class _SingleDatasetProfiler(BasicDatasetProfilerBase):

        column_spec.unique_count = unique_count

        column_spec.cardinality = _convert_to_cardinality(unique_count, pct_unique)
        column_spec.cardinality = convert_to_cardinality(unique_count, pct_unique)

    @_run_with_query_combiner
    def _get_dataset_rows(self, dataset_profile: DatasetProfileClass) -> None:

@@ -795,16 +804,6 @@ class DatahubGEProfiler:

        self.report.report_from_query_combiner(query_combiner.report)

    def _is_legacy_ge_temp_table_creation(self) -> bool:
        legacy_ge_bq_temp_table_creation: bool = False
        (major, minor, patch) = ge_version.split(".")
        if int(major) == 0 and (
            int(minor) < 15 or (int(minor) == 15 and int(patch) < 3)
        ):
            legacy_ge_bq_temp_table_creation = True

        return legacy_ge_bq_temp_table_creation

    def _generate_profile_from_request(
        self,
        query_combiner: SQLAlchemyQueryCombiner,

@@ -820,16 +819,6 @@ class DatahubGEProfiler:
            **request.batch_kwargs,
        )

    def _drop_bigquery_temp_table(self, bigquery_temp_table: str) -> None:
        try:
            with self.base_engine.connect() as connection:
                connection.execute(f"drop view if exists `{bigquery_temp_table}`")
                logger.debug(f"Temp table {bigquery_temp_table} was dropped.")
        except Exception:
            logger.warning(
                f"Unable to delete bigquery temporary table: {bigquery_temp_table}"
            )

    def _drop_trino_temp_table(self, temp_dataset: Dataset) -> None:
        schema = temp_dataset._table.schema
        table = temp_dataset._table.name

@@ -855,7 +844,6 @@ class DatahubGEProfiler:
        logger.debug(
            f"Received single profile request for {pretty_name} for {schema}, {table}, {custom_sql}"
        )
        bigquery_temp_table: Optional[str] = None

        ge_config = {
            "schema": schema,

@@ -865,46 +853,90 @@ class DatahubGEProfiler:
            **kwargs,
        }

        # We have to create temporary tables if offset or limit or custom sql is set on Bigquery
        if custom_sql or self.config.limit or self.config.offset:
            if profiler_args is not None:
                temp_table_db = profiler_args.get("temp_table_db", schema)
                if platform is not None and platform == "bigquery":
                    ge_config["schema"] = None
        bigquery_temp_table: Optional[str] = None
        if platform == "bigquery" and (
            custom_sql or self.config.limit or self.config.offset
        ):
            # On BigQuery, we need to bypass GE's mechanism for creating temporary tables because
            # it requires create/delete table permissions.
            import google.cloud.bigquery.job.query
            from google.cloud.bigquery.dbapi.cursor import Cursor as BigQueryCursor

            if self.config.bigquery_temp_table_schema:
                num_parts = self.config.bigquery_temp_table_schema.split(".")
                # If we only have 1 part that means the project_id is missing from the table name and we add it
                if len(num_parts) == 1:
                    bigquery_temp_table = f"{temp_table_db}.{self.config.bigquery_temp_table_schema}.ge-temp-{uuid.uuid4()}"
                elif len(num_parts) == 2:
                    bigquery_temp_table = f"{self.config.bigquery_temp_table_schema}.ge-temp-{uuid.uuid4()}"
            raw_connection = self.base_engine.raw_connection()
            try:
                cursor: "BigQueryCursor" = cast(
                    "BigQueryCursor", raw_connection.cursor()
                )
                if custom_sql is not None:
                    # Note that limit and offset are not supported for custom SQL.
                    bq_sql = custom_sql
                else:
                    raise ConfigurationError(
                        f"bigquery_temp_table_schema should be either project.dataset or dataset format but it was: {self.config.bigquery_temp_table_schema}"
                    )
            else:
                assert table
                table_parts = table.split(".")
                if len(table_parts) == 2:
                    bigquery_temp_table = (
                        f"{temp_table_db}.{table_parts[0]}.ge-temp-{uuid.uuid4()}"
                    )
                    bq_sql = f"SELECT * FROM `{table}`"
                    if self.config.limit:
                        bq_sql += f" LIMIT {self.config.limit}"
                    if self.config.offset:
                        bq_sql += f" OFFSET {self.config.offset}"

            # With this PR there is no option anymore to set the bigquery temp table:
            # https://github.com/great-expectations/great_expectations/pull/4925
            # This is a dirty hack to make it possible to control the temp table to use in Bigquery;
            # otherwise it will expect dataset_id in the connection url, which is not an option in our case
            # as we batch these queries.
            # Currently only with this option is it possible to control the temp table which is created:
            # https://github.com/great-expectations/great_expectations/blob/7e53b615c36a53f78418ce46d6bc91a7011163c0/great_expectations/datasource/sqlalchemy_datasource.py#L397
            if self._is_legacy_ge_temp_table_creation():
                ge_config["bigquery_temp_table"] = bigquery_temp_table
            else:
                ge_config["snowflake_transient_table"] = bigquery_temp_table
                cursor.execute(bq_sql)

        if custom_sql is not None:
            ge_config["query"] = custom_sql

                # Great Expectations batch v2 API, which is the one we're using, requires
                # a concrete table name against which profiling is executed. Normally, GE
                # creates a table with an expiry time of 24 hours. However, we don't want the
                # temporary tables to stick around that long, so we'd also have to delete them
                # ourselves. As such, the profiler required create and delete table permissions
                # on BigQuery.
                #
                # It turns out that we can (ab)use the BigQuery cached results feature
                # to avoid creating temporary tables ourselves. For almost all queries, BigQuery
                # will store the results in a temporary, cached results table when an explicit
                # destination table is not provided. These tables are pretty easy to identify
                # because they live in "anonymous datasets" and have a name that looks like
                # "project-id._d60e97aec7f471046a960419adb6d44e98300db7.anon10774d0ea85fd20fe9671456c5c53d5f1b85e1b17bedb232dfce91661a219ee3"
                # These tables are per-user and per-project, so there's no risk of permissions escalation.
                # As per the docs, the cached results tables typically have a lifetime of 24 hours,
                # which should be plenty for our purposes.
                # See https://cloud.google.com/bigquery/docs/cached-results for more details.
                #
                # The code below extracts the name of the cached results table from the query job
                # and points GE to that table for profiling.
                #
                # Risks:
                # 1. If the query results are larger than the maximum response size, BigQuery will
                #    not cache the results. According to the docs https://cloud.google.com/bigquery/quotas,
                #    the maximum response size is 10 GB compressed.
                # 2. The cache lifetime of 24 hours is "best-effort" and hence not guaranteed.
                # 3. Tables with column-level security may not be cached, and tables with row-level
                #    security will not be cached.
                # 4. BigQuery "discourages" using cached results directly, but notes that
                #    the current semantics do allow it.
                #
                # The better long-term solution would be to use a subquery to avoid this whole
                # temporary table dance. However, that would require either a) upgrading to
                # use GE's batch v3 API or b) bypassing GE altogether.

                query_job: Optional[
                    "google.cloud.bigquery.job.query.QueryJob"
                ] = cursor._query_job
                assert query_job
                temp_destination_table = query_job.destination
                bigquery_temp_table = f"{temp_destination_table.project}.{temp_destination_table.dataset_id}.{temp_destination_table.table_id}"
            finally:
                raw_connection.close()

        if platform == "bigquery":
            if bigquery_temp_table:
                ge_config["table"] = bigquery_temp_table
                ge_config["schema"] = None
                ge_config["limit"] = None
                ge_config["offset"] = None

                bigquery_temp_table = None

            assert not ge_config["limit"]
            assert not ge_config["offset"]
        else:
            if custom_sql is not None:
                ge_config["query"] = custom_sql

        with self._ge_context() as ge_context, PerfTimer() as timer:
            try:

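The cached-results trick above can be reproduced with the BigQuery client library directly. The snippet below is a minimal sketch, assuming application-default credentials; the table name is a placeholder. It shows how a query job's `destination` points at the anonymous cached-results table that the profiler hands to GE:

```python
from google.cloud import bigquery

client = bigquery.Client()  # assumes application-default credentials

# Run a query with no explicit destination; BigQuery stores the results
# in a temporary cached-results table in an "anonymous dataset".
job = client.query("SELECT * FROM `my-project.my_dataset.my_table` LIMIT 100")
job.result()  # wait for the query to finish

# The destination is the cached-results table, e.g.
# "my-project._d60e97....anon10774d0e..."
dest = job.destination
print(f"{dest.project}.{dest.dataset_id}.{dest.table_id}")
```
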
@@ -944,8 +976,6 @@ class DatahubGEProfiler:
            finally:
                if self.base_engine.engine.name == "trino":
                    self._drop_trino_temp_table(batch)
                elif bigquery_temp_table:
                    self._drop_bigquery_temp_table(bigquery_temp_table)

    def _get_ge_dataset(
        self,

@@ -1,4 +1,5 @@
import datetime
import logging
import os
from typing import Any, Dict, List, Optional

@@ -14,6 +15,8 @@ _PROFILING_FLAGS_TO_REPORT = {
    # all include_field_ flags are reported.
}

logger = logging.getLogger(__name__)


class GEProfilingConfig(ConfigModel):
    enabled: bool = Field(

@@ -128,15 +131,22 @@ class GEProfilingConfig(ConfigModel):
    catch_exceptions: bool = Field(default=True, description="")

    partition_profiling_enabled: bool = Field(default=True, description="")
    bigquery_temp_table_schema: Optional[str] = Field(
        default=None,
        description="On BigQuery, profiling partitioned tables requires creating temporary views. You have to define a dataset where these will be created. Views will be cleaned up after the profiler runs. (Great Expectations tech details: https://legacy.docs.greatexpectations.io/en/0.9.0/reference/integrations/bigquery.html#custom-queries-with-sql-datasource)",
    )
    partition_datetime: Optional[datetime.datetime] = Field(
        default=None,
        description="For partitioned datasets, profile only the partition which matches this datetime, or the latest one if not set. Only BigQuery supports this.",
    )

    @pydantic.root_validator(pre=True)
    def deprecate_bigquery_temp_table_schema(cls, values):
        # TODO: Update docs to remove mention of this field.
        if "bigquery_temp_table_schema" in values:
            logger.warning(
                "The bigquery_temp_table_schema config is no longer required. Please remove it from your config."
            )
            del values["bigquery_temp_table_schema"]
        return values

    @pydantic.root_validator()
    def ensure_field_level_settings_are_normalized(
        cls: "GEProfilingConfig", values: Dict[str, Any]

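As a quick illustration of the deprecation validator above (a sketch; the recipe dict is hypothetical and assumes the pydantic v1 `parse_obj` API used by DataHub's config models), a config that still carries the deprecated key parses cleanly, with the key logged and stripped before field validation:

```python
from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig

# The pre=True root validator logs a warning and deletes the deprecated key,
# so parsing succeeds even though the field was removed from the model.
config = GEProfilingConfig.parse_obj(
    {
        "enabled": True,
        "bigquery_temp_table_schema": "my-project.my_dataset",  # deprecated, dropped
    }
)
assert config.enabled
```
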
@@ -13,7 +13,7 @@ class Cardinality(Enum):
    UNIQUE = 7


def _convert_to_cardinality(
def convert_to_cardinality(
    unique_count: Optional[int], pct_unique: Optional[float]
) -> Optional[Cardinality]:
    """

@@ -37,7 +37,7 @@ from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.emitter.mce_builder import get_sys_time
from datahub.ingestion.source.profiling.common import (
    Cardinality,
    _convert_to_cardinality,
    convert_to_cardinality,
)
from datahub.ingestion.source.s3.report import DataLakeSourceReport
from datahub.metadata.schema_classes import (

@@ -303,7 +303,7 @@ class _SingleTableProfiler:
        )

        column_spec.type_ = column_types[column]
        column_spec.cardinality = _convert_to_cardinality(
        column_spec.cardinality = convert_to_cardinality(
            column_distinct_counts[column],
            column_null_fractions[column],
        )

@@ -961,7 +961,7 @@ WHERE
        )

    def get_profile_args(self) -> Dict:
        return {"temp_table_db": self.config.project_id}
        return {}

    def is_dataset_eligible_for_profiling(
        self,

@@ -97,16 +97,6 @@ class BigQueryConfig(BigQueryBaseConfig, BaseTimeWindowConfig, SQLAlchemyConfig)
    def validate_that_bigquery_audit_metadata_datasets_is_correctly_configured(
        cls, values: Dict[str, Any]
    ) -> Dict[str, Any]:
        profiling = values.get("profiling")
        if (
            values.get("storage_project_id")
            and profiling is not None
            and profiling.enabled
            and not profiling.bigquery_temp_table_schema
        ):
            raise ConfigurationError(
                "If storage project is being used with profiling then bigquery_temp_table_schema needs to be set to a dataset in the compute project"
            )
        if (
            values.get("use_exported_bigquery_audit_metadata")
            and not values.get("use_v2_audit_metadata")

@@ -115,7 +105,6 @@ class BigQueryConfig(BigQueryBaseConfig, BaseTimeWindowConfig, SQLAlchemyConfig)
            raise ConfigurationError(
                "bigquery_audit_metadata_datasets must be specified if using exported audit metadata. Otherwise set use_v2_audit_metadata to True."
            )
        pass
        return values

    @pydantic.validator("platform")

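For reference, a recipe fragment that satisfies the audit-metadata validator above might look like this (a sketch; the project and dataset names are placeholders, and the exact list-entry format should be checked against the BigQuery source docs):

```yaml
source:
  type: bigquery
  config:
    use_exported_bigquery_audit_metadata: true
    # Required when using exported audit metadata; otherwise set use_v2_audit_metadata: true.
    bigquery_audit_metadata_datasets:
      - my-project.audit_logs
```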