diff --git a/metadata-ingestion/docs/sources/bigquery/bigquery-legacy_pre.md b/metadata-ingestion/docs/sources/bigquery/bigquery-legacy_pre.md index 39e8db0ee8..2d433b82e1 100644 --- a/metadata-ingestion/docs/sources/bigquery/bigquery-legacy_pre.md +++ b/metadata-ingestion/docs/sources/bigquery/bigquery-legacy_pre.md @@ -78,24 +78,8 @@ Note: Since bigquery source also supports dataset level lineage, the auth client ### Profiling Details -Profiling can profile normal/partitioned and sharded tables as well but due to performance reasons, we only profile the latest partition for Partitioned tables and the latest shard for sharded tables. - -If limit/offset parameter is set or partitioning partitioned or sharded table Great Expectation (the profiling framework we use) needs to create temporary -views. By default these views are created in the schema where the profiled table is but you can control to create all these -tables into a predefined schema by setting `profiling.bigquery_temp_table_schema` property. -Temporary tables are removed after profiling. - -```yaml - profiling: - enabled: true - bigquery_temp_table_schema: my-project-id.my-schema-where-views-can-be-created -``` - -:::note - -Due to performance reasons, we only profile the latest partition for Partitioned tables and the latest shard for sharded tables. -You can set partition explicitly with `partition.partition_datetime` property if you want. (partition will be applied to all partitioned tables) -::: +For performance reasons, we only profile the latest partition for partitioned tables and the latest shard for sharded tables. +You can set partition explicitly with `partition.partition_datetime` property if you want, though note that partition config will be applied to all partitioned tables. ### Working with multi-project GCP setups diff --git a/metadata-ingestion/docs/sources/bigquery/bigquery_pre.md b/metadata-ingestion/docs/sources/bigquery/bigquery_pre.md index e60016dade..c204fcb644 100644 --- a/metadata-ingestion/docs/sources/bigquery/bigquery_pre.md +++ b/metadata-ingestion/docs/sources/bigquery/bigquery_pre.md @@ -21,8 +21,6 @@ There are two important concepts to understand and identify: | `bigquery.jobs.list`             | Manage the queries that the service account has sent. *This only needs for the extractor project where the service account belongs* |                                                                                                               | | `bigquery.readsessions.create`   | Create a session for streaming large results. *This only needs for the extractor project where the service account belongs*         |                                                                                                               | | `bigquery.readsessions.getData` | Get data from the read session. *This only needs for the extractor project where the service account belongs*                       | -| `bigquery.tables.create`         | Create temporary tables when profiling tables. Tip: Use the `profiling.bigquery_temp_table_schema` to ensure that all temp tables (across multiple projects) are created in this project under a specific dataset.                 | Profiling                           |                                                                                                                 | -| `bigquery.tables.delete`         | Delete temporary tables when profiling tables. 
Tip: Use the `profiling.bigquery_temp_table_schema` to ensure that all temp tables (across multiple projects) are created in this project under a specific dataset.                   | Profiling                           |                                                                                                                 | 2. Grant the following permissions to the Service Account on every project where you would like to extract metadata from :::info @@ -41,11 +39,6 @@ If you have multiple projects in your BigQuery setup, the role should be granted | `logging.logEntries.list`       | Fetch log entries for lineage/usage data. Not required if `use_exported_bigquery_audit_metadata` is enabled. | Lineage Extraction/Usage extraction | [roles/logging.privateLogViewer](https://cloud.google.com/logging/docs/access-control#logging.privateLogViewer) | | `logging.privateLogEntries.list` | Fetch log entries for lineage/usage data. Not required if `use_exported_bigquery_audit_metadata` is enabled. | Lineage Extraction/Usage extraction | [roles/logging.privateLogViewer](https://cloud.google.com/logging/docs/access-control#logging.privateLogViewer) | | `bigquery.tables.getData`       | Access table data to extract storage size, last updated at, data profiles etc. | Profiling                           |                                                                                                                 | -| `bigquery.tables.create`         | [Optional] Only needed if not using the `profiling.bigquery_temp_table_schema` config option. | Profiling                           |                                                                                                                 | -| `bigquery.tables.delete`         | [Optional] Only needed if not using the `profiling.bigquery_temp_table_schema` config option. | Profiling                           |                                                                                                                 | - -The profiler creates temporary tables to profile partitioned/sharded tables and that is why it needs table create/delete privilege. -Use `profiling.bigquery_temp_table_schema` to restrict to one specific dataset the create/delete permission #### Create a service account in the Extractor Project @@ -100,24 +93,8 @@ Note: Since bigquery source also supports dataset level lineage, the auth client ### Profiling Details -Profiling can profile normal/partitioned and sharded tables as well but due to performance reasons, we only profile the latest partition for Partitioned tables and the latest shard for sharded tables. - -If limit/offset parameter is set or partitioning partitioned or sharded table Great Expectation (the profiling framework we use) needs to create temporary -views. By default, these views are created in the schema where the profiled table is but you can control to create all these -tables into a predefined schema by setting `profiling.bigquery_temp_table_schema` property. -Temporary tables are removed after profiling. - -```yaml -profiling: - enabled: true - bigquery_temp_table_schema: my-project-id.my-schema-where-views-can-be-created -``` - -:::note - -Due to performance reasons, we only profile the latest partition for Partitioned tables and the latest shard for sharded tables. -You can set partition explicitly with `partition.partition_datetime` property if you want. 
(partition will be applied to all partitioned tables) -::: +For performance reasons, we only profile the latest partition for partitioned tables and the latest shard for sharded tables. +You can set partition explicitly with `partition.partition_datetime` property if you want, though note that partition config will be applied to all partitioned tables. ### Caveats diff --git a/metadata-ingestion/examples/recipes/bigquery_to_datahub.dhub.yaml b/metadata-ingestion/examples/recipes/bigquery_to_datahub.dhub.yaml index 377af98598..84f098fa06 100644 --- a/metadata-ingestion/examples/recipes/bigquery_to_datahub.dhub.yaml +++ b/metadata-ingestion/examples/recipes/bigquery_to_datahub.dhub.yaml @@ -20,7 +20,6 @@ source: #end_time: 2023-12-15T20:08:23.091Z #profiling: # enabled: true - # bigquery_temp_table_schema: my-project-id.my-schema-where-views-can-be-created-for-profiling # turn_off_expensive_profiling_metrics: false # query_combiner_enabled: true # max_number_of_fields_to_profile: 8 diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py index 394b39765c..e6782a92e3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py @@ -10,10 +10,20 @@ import threading import traceback import unittest.mock import uuid -from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Tuple, Union +from typing import ( + Any, + Callable, + Dict, + Iterable, + Iterator, + List, + Optional, + Tuple, + Union, + cast, +) import sqlalchemy as sa -from great_expectations import __version__ as ge_version from great_expectations.core.util import convert_to_json_serializable from great_expectations.data_context import BaseDataContext from great_expectations.data_context.types.base import ( @@ -30,12 +40,11 @@ from sqlalchemy.engine import Connection, Engine from sqlalchemy.exc import ProgrammingError from typing_extensions import Concatenate, ParamSpec -from datahub.configuration.common import ConfigurationError from datahub.emitter.mce_builder import get_sys_time from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig from datahub.ingestion.source.profiling.common import ( Cardinality, - _convert_to_cardinality, + convert_to_cardinality, ) from datahub.ingestion.source.sql.sql_common import SQLSourceReport from datahub.metadata.schema_classes import ( @@ -315,7 +324,7 @@ class _SingleDatasetProfiler(BasicDatasetProfilerBase): column_spec.unique_count = unique_count - column_spec.cardinality = _convert_to_cardinality(unique_count, pct_unique) + column_spec.cardinality = convert_to_cardinality(unique_count, pct_unique) @_run_with_query_combiner def _get_dataset_rows(self, dataset_profile: DatasetProfileClass) -> None: @@ -795,16 +804,6 @@ class DatahubGEProfiler: self.report.report_from_query_combiner(query_combiner.report) - def _is_legacy_ge_temp_table_creation(self) -> bool: - legacy_ge_bq_temp_table_creation: bool = False - (major, minor, patch) = ge_version.split(".") - if int(major) == 0 and ( - int(minor) < 15 or (int(minor) == 15 and int(patch) < 3) - ): - legacy_ge_bq_temp_table_creation = True - - return legacy_ge_bq_temp_table_creation - def _generate_profile_from_request( self, query_combiner: SQLAlchemyQueryCombiner, @@ -820,16 +819,6 @@ class DatahubGEProfiler: **request.batch_kwargs, ) - def _drop_bigquery_temp_table(self, bigquery_temp_table: str) -> None: - try: - with 
self.base_engine.connect() as connection: - connection.execute(f"drop view if exists `{bigquery_temp_table}`") - logger.debug(f"Temp table {bigquery_temp_table} was dropped.") - except Exception: - logger.warning( - f"Unable to delete bigquery temporary table: {bigquery_temp_table}" - ) - def _drop_trino_temp_table(self, temp_dataset: Dataset) -> None: schema = temp_dataset._table.schema table = temp_dataset._table.name @@ -855,7 +844,6 @@ class DatahubGEProfiler: logger.debug( f"Received single profile request for {pretty_name} for {schema}, {table}, {custom_sql}" ) - bigquery_temp_table: Optional[str] = None ge_config = { "schema": schema, @@ -865,46 +853,90 @@ class DatahubGEProfiler: **kwargs, } - # We have to create temporary tables if offset or limit or custom sql is set on Bigquery - if custom_sql or self.config.limit or self.config.offset: - if profiler_args is not None: - temp_table_db = profiler_args.get("temp_table_db", schema) - if platform is not None and platform == "bigquery": - ge_config["schema"] = None + bigquery_temp_table: Optional[str] = None + if platform == "bigquery" and ( + custom_sql or self.config.limit or self.config.offset + ): + # On BigQuery, we need to bypass GE's mechanism for creating temporary tables because + # it requires create/delete table permissions. + import google.cloud.bigquery.job.query + from google.cloud.bigquery.dbapi.cursor import Cursor as BigQueryCursor - if self.config.bigquery_temp_table_schema: - num_parts = self.config.bigquery_temp_table_schema.split(".") - # If we only have 1 part that means the project_id is missing from the table name and we add it - if len(num_parts) == 1: - bigquery_temp_table = f"{temp_table_db}.{self.config.bigquery_temp_table_schema}.ge-temp-{uuid.uuid4()}" - elif len(num_parts) == 2: - bigquery_temp_table = f"{self.config.bigquery_temp_table_schema}.ge-temp-{uuid.uuid4()}" + raw_connection = self.base_engine.raw_connection() + try: + cursor: "BigQueryCursor" = cast( + "BigQueryCursor", raw_connection.cursor() + ) + if custom_sql is not None: + # Note that limit and offset are not supported for custom SQL. + bq_sql = custom_sql else: - raise ConfigurationError( - f"bigquery_temp_table_schema should be either project.dataset or dataset format but it was: {self.config.bigquery_temp_table_schema}" - ) - else: - assert table - table_parts = table.split(".") - if len(table_parts) == 2: - bigquery_temp_table = ( - f"{temp_table_db}.{table_parts[0]}.ge-temp-{uuid.uuid4()}" - ) + bq_sql = f"SELECT * FROM `{table}`" + if self.config.limit: + bq_sql += f" LIMIT {self.config.limit}" + if self.config.offset: + bq_sql += f" OFFSET {self.config.offset}" - # With this pr there is no option anymore to set the bigquery temp table: - # https://github.com/great-expectations/great_expectations/pull/4925 - # This dirty hack to make it possible to control the temp table to use in Bigquery - # otherwise it will expect dataset_id in the connection url which is not option in our case - # as we batch these queries. 
- # Currently only with this option is possible to control the temp table which is created: - # https://github.com/great-expectations/great_expectations/blob/7e53b615c36a53f78418ce46d6bc91a7011163c0/great_expectations/datasource/sqlalchemy_datasource.py#L397 - if self._is_legacy_ge_temp_table_creation(): - ge_config["bigquery_temp_table"] = bigquery_temp_table - else: - ge_config["snowflake_transient_table"] = bigquery_temp_table + cursor.execute(bq_sql) - if custom_sql is not None: - ge_config["query"] = custom_sql + # Great Expectations batch v2 API, which is the one we're using, requires + # a concrete table name against which profiling is executed. Normally, GE + # creates a table with an expiry time of 24 hours. However, we don't want the + # temporary tables to stick around that long, so we'd also have to delete them + # ourselves. As such, the profiler required create and delete table permissions + # on BigQuery. + # + # It turns out that we can (ab)use the BigQuery cached results feature + # to avoid creating temporary tables ourselves. For almost all queries, BigQuery + # will store the results in a temporary, cached results table when an explicit + # destination table is not provided. These tables are pretty easy to identify + # because they live in "anonymous datasets" and have a name that looks like + # "project-id._d60e97aec7f471046a960419adb6d44e98300db7.anon10774d0ea85fd20fe9671456c5c53d5f1b85e1b17bedb232dfce91661a219ee3" + # These tables are per-user and per-project, so there's no risk of permissions escalation. + # As per the docs, the cached results tables typically have a lifetime of 24 hours, + # which should be plenty for our purposes. + # See https://cloud.google.com/bigquery/docs/cached-results for more details. + # + # The code below extracts the name of the cached results table from the query job + # and points GE to that table for profiling. + # + # Risks: + # 1. If the query results are larger than the maximum response size, BigQuery will + # not cache the results. According to the docs https://cloud.google.com/bigquery/quotas, + # the maximum response size is 10 GB compressed. + # 2. The cache lifetime of 24 hours is "best-effort" and hence not guaranteed. + # 3. Tables with column-level security may not be cached, and tables with row-level + # security will not be cached. + # 4. BigQuery "discourages" using cached results directly, but notes that + # the current semantics do allow it. + # + # The better long-term solution would be to use a subquery avoid this whole + # temporary table dance. However, that would require either a) upgrading to + # use GE's batch v3 API or b) bypassing GE altogether. 
+ + query_job: Optional[ + "google.cloud.bigquery.job.query.QueryJob" + ] = cursor._query_job + assert query_job + temp_destination_table = query_job.destination + bigquery_temp_table = f"{temp_destination_table.project}.{temp_destination_table.dataset_id}.{temp_destination_table.table_id}" + finally: + raw_connection.close() + + if platform == "bigquery": + if bigquery_temp_table: + ge_config["table"] = bigquery_temp_table + ge_config["schema"] = None + ge_config["limit"] = None + ge_config["offset"] = None + + bigquery_temp_table = None + + assert not ge_config["limit"] + assert not ge_config["offset"] + else: + if custom_sql is not None: + ge_config["query"] = custom_sql with self._ge_context() as ge_context, PerfTimer() as timer: try: @@ -944,8 +976,6 @@ class DatahubGEProfiler: finally: if self.base_engine.engine.name == "trino": self._drop_trino_temp_table(batch) - elif bigquery_temp_table: - self._drop_bigquery_temp_table(bigquery_temp_table) def _get_ge_dataset( self, diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py b/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py index d06bb9d5d2..576d4de8c2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py @@ -1,4 +1,5 @@ import datetime +import logging import os from typing import Any, Dict, List, Optional @@ -14,6 +15,8 @@ _PROFILING_FLAGS_TO_REPORT = { # all include_field_ flags are reported. } +logger = logging.getLogger(__name__) + class GEProfilingConfig(ConfigModel): enabled: bool = Field( @@ -128,15 +131,22 @@ class GEProfilingConfig(ConfigModel): catch_exceptions: bool = Field(default=True, description="") partition_profiling_enabled: bool = Field(default=True, description="") - bigquery_temp_table_schema: Optional[str] = Field( - default=None, - description="On bigquery for profiling partitioned tables needs to create temporary views. You have to define a dataset where these will be created. Views will be cleaned up after profiler runs. (Great expectation tech details about this (https://legacy.docs.greatexpectations.io/en/0.9.0/reference/integrations/bigquery.html#custom-queries-with-sql-datasource).", - ) partition_datetime: Optional[datetime.datetime] = Field( default=None, description="For partitioned datasets profile only the partition which matches the datetime or profile the latest one if not set. Only Bigquery supports this.", ) + @pydantic.root_validator(pre=True) + def deprecate_bigquery_temp_table_schema(cls, values): + # TODO: Update docs to remove mention of this field. + if "bigquery_temp_table_schema" in values: + logger.warning( + "The bigquery_temp_table_schema config is no longer required. 
Please remove it from your config.", + DeprecationWarning, + ) + del values["bigquery_temp_table_schema"] + return values + @pydantic.root_validator() def ensure_field_level_settings_are_normalized( cls: "GEProfilingConfig", values: Dict[str, Any] diff --git a/metadata-ingestion/src/datahub/ingestion/source/profiling/common.py b/metadata-ingestion/src/datahub/ingestion/source/profiling/common.py index 463bfed0a7..b54f0e02fc 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/profiling/common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/profiling/common.py @@ -13,7 +13,7 @@ class Cardinality(Enum): UNIQUE = 7 -def _convert_to_cardinality( +def convert_to_cardinality( unique_count: Optional[int], pct_unique: Optional[float] ) -> Optional[Cardinality]: """ diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/profiling.py b/metadata-ingestion/src/datahub/ingestion/source/s3/profiling.py index 9f534d77c1..a8822fca58 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/profiling.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/profiling.py @@ -37,7 +37,7 @@ from datahub.configuration.common import AllowDenyPattern, ConfigModel from datahub.emitter.mce_builder import get_sys_time from datahub.ingestion.source.profiling.common import ( Cardinality, - _convert_to_cardinality, + convert_to_cardinality, ) from datahub.ingestion.source.s3.report import DataLakeSourceReport from datahub.metadata.schema_classes import ( @@ -303,7 +303,7 @@ class _SingleTableProfiler: ) column_spec.type_ = column_types[column] - column_spec.cardinality = _convert_to_cardinality( + column_spec.cardinality = convert_to_cardinality( column_distinct_counts[column], column_null_fractions[column], ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py index 0613fda459..e9a82b4182 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py @@ -961,7 +961,7 @@ WHERE ) def get_profile_args(self) -> Dict: - return {"temp_table_db": self.config.project_id} + return {} def is_dataset_eligible_for_profiling( self, diff --git a/metadata-ingestion/src/datahub/ingestion/source_config/sql/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source_config/sql/bigquery.py index 36e7f48b12..3782d26ea7 100644 --- a/metadata-ingestion/src/datahub/ingestion/source_config/sql/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source_config/sql/bigquery.py @@ -97,16 +97,6 @@ class BigQueryConfig(BigQueryBaseConfig, BaseTimeWindowConfig, SQLAlchemyConfig) def validate_that_bigquery_audit_metadata_datasets_is_correctly_configured( cls, values: Dict[str, Any] ) -> Dict[str, Any]: - profiling = values.get("profiling") - if ( - values.get("storage_project_id") - and profiling is not None - and profiling.enabled - and not profiling.bigquery_temp_table_schema - ): - raise ConfigurationError( - "If storage project is being used with profiling then bigquery_temp_table_schema needs to be set to a dataset in the compute project" - ) if ( values.get("use_exported_bigquery_audit_metadata") and not values.get("use_v2_audit_metadata") @@ -115,7 +105,6 @@ class BigQueryConfig(BigQueryBaseConfig, BaseTimeWindowConfig, SQLAlchemyConfig) raise ConfigurationError( "bigquery_audit_metadata_datasets must be specified if using exported audit metadata. Otherwise set use_v2_audit_metadata to True." 
) - pass return values @pydantic.validator("platform")
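
The core of this change is the switch from explicitly created temporary tables to BigQuery's cached query results. Below is a minimal standalone sketch of that technique, not the profiler code itself: it calls the `google-cloud-bigquery` client directly rather than going through the SQLAlchemy engine and the DBAPI cursor's private `_query_job` handle as `ge_data_profiler.py` does, and the project, dataset, table names, and limit/offset values are placeholders.

```python
# Sketch: reuse BigQuery's cached-results table as an implicit "temp table".
# Assumes google-cloud-bigquery is installed and application-default credentials
# are configured; all names below are placeholders.
from typing import Optional

from google.cloud import bigquery


def cached_results_table(
    client: bigquery.Client,
    table: str,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
) -> str:
    """Run a bounded SELECT and return the fully qualified name of the
    anonymous cached-results table that BigQuery writes the output to."""
    sql = f"SELECT * FROM `{table}`"
    if limit:
        sql += f" LIMIT {limit}"
    if offset:
        sql += f" OFFSET {offset}"

    query_job = client.query(sql)
    query_job.result()  # block until the job finishes so the destination is set

    # Without an explicit destination, BigQuery stores the results in a per-user
    # table inside an "anonymous dataset"; BigQuery cleans it up itself
    # (typically after ~24 hours), so no delete permission is needed.
    dest = query_job.destination
    return f"{dest.project}.{dest.dataset_id}.{dest.table_id}"


if __name__ == "__main__":
    client = bigquery.Client(project="my-project-id")  # placeholder project
    temp_table = cached_results_table(
        client, "my-project-id.my_dataset.my_table", limit=1000
    )
    # Point Great Expectations (or any follow-up query) at this table instead of
    # creating and dropping a temporary view.
    print(temp_table)
```

Because the anonymous dataset is managed entirely by BigQuery and scoped to the querying user and project, the profiler no longer needs the `bigquery.tables.create`/`bigquery.tables.delete` permissions or the `bigquery_temp_table_schema` setting, which is why both are removed in this patch.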