From 01fb64c531f6801909d1f4704c44afb43f5ea05f Mon Sep 17 00:00:00 2001 From: Tamas Nemeth Date: Tue, 26 Nov 2024 15:32:03 +0100 Subject: [PATCH] fix(ingest/bigquery): Fix performance issue with column profiling ignore (#11807) --- .../source/bigquery_v2/bigquery_schema.py | 1 - .../source/bigquery_v2/bigquery_schema_gen.py | 21 ------------- .../ingestion/source/bigquery_v2/profiler.py | 6 ---- .../ingestion/source/ge_data_profiler.py | 31 ++++++++++++++----- .../ingestion/source/ge_profiling_config.py | 5 +++ 5 files changed, 28 insertions(+), 36 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py index be85d037af..3ce34be8dc 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py @@ -118,7 +118,6 @@ class BigqueryTable(BaseTable): active_billable_bytes: Optional[int] = None long_term_billable_bytes: Optional[int] = None partition_info: Optional[PartitionInfo] = None - columns_ignore_from_profiling: List[str] = field(default_factory=list) external: bool = False constraints: List[BigqueryTableConstraint] = field(default_factory=list) table_type: Optional[str] = None diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py index 788016103d..4a3b47f6b5 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py @@ -598,18 +598,6 @@ class BigQuerySchemaGenerator: dataset_name=dataset_name, ) - # This method is used to generate the ignore list for datatypes the profiler doesn't support we have to do it here - # because the profiler doesn't have access to columns - def 
generate_profile_ignore_list(self, columns: List[BigqueryColumn]) -> List[str]: - ignore_list: List[str] = [] - for column in columns: - if not column.data_type or any( - word in column.data_type.lower() - for word in ["array", "struct", "geography", "json"] - ): - ignore_list.append(column.field_path) - return ignore_list - def _process_table( self, table: BigqueryTable, @@ -631,15 +619,6 @@ class BigQuerySchemaGenerator: ) table.column_count = len(columns) - # We only collect profile ignore list if profiling is enabled and profile_table_level_only is false - if ( - self.config.is_profiling_enabled() - and not self.config.profiling.profile_table_level_only - ): - table.columns_ignore_from_profiling = self.generate_profile_ignore_list( - columns - ) - if not table.column_count: logger.warning( f"Table doesn't have any column or unable to get columns for table: {table_identifier}" diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py index 6af8166fbf..182ae2265c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py @@ -166,12 +166,6 @@ WHERE normalized_table_name = BigqueryTableIdentifier( project_id=project_id, dataset=dataset, table=table.name ).get_table_name() - for column in table.columns_ignore_from_profiling: - # Profiler has issues with complex types (array, struct, geography, json), so we deny those types from profiling - # We also filter columns without data type as it means that column is part of a complex type. 
- self.config.profile_pattern.deny.append( - f"^{normalized_table_name}.{column}$" - ) if table.external and not self.config.profiling.profile_external_tables: self.report.profiling_skipped_other[f"{project_id}.{dataset}"] += 1 diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py index 0d9dadaf6a..f7d783cd3d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py @@ -7,6 +7,7 @@ import dataclasses import functools import json import logging +import re import threading import traceback import unittest.mock @@ -123,6 +124,8 @@ ProfilerTypeMapping.BINARY_TYPE_NAMES.append("LargeBinary") _datasource_connection_injection_lock = threading.Lock() +NORMALIZE_TYPE_PATTERN = re.compile(r"^(.*?)(?:[\[<(].*)?$") + @contextlib.contextmanager def _inject_connection_into_datasource(conn: Connection) -> Iterator[None]: @@ -165,11 +168,9 @@ def get_column_unique_count_dh_patch(self: SqlAlchemyDataset, column: str) -> in return convert_to_json_serializable(element_values.fetchone()[0]) elif self.engine.dialect.name.lower() == BIGQUERY: element_values = self.engine.execute( - sa.select( - [ - sa.func.coalesce(sa.text(f"APPROX_COUNT_DISTINCT(`{column}`)")), - ] - ).select_from(self._table) + sa.select(sa.func.APPROX_COUNT_DISTINCT(sa.column(column))).select_from( + self._table + ) ) return convert_to_json_serializable(element_values.fetchone()[0]) elif self.engine.dialect.name.lower() == SNOWFLAKE: @@ -378,6 +379,9 @@ class _SingleDatasetProfiler(BasicDatasetProfilerBase): f"{self.dataset_name}.{col}" ): ignored_columns_by_pattern.append(col) + # We try to ignore nested columns as well + elif not self.config.profile_nested_fields and "." 
in col: + ignored_columns_by_pattern.append(col) elif col_dict.get("type") and self._should_ignore_column(col_dict["type"]): ignored_columns_by_type.append(col) else: @@ -407,9 +411,18 @@ class _SingleDatasetProfiler(BasicDatasetProfilerBase): return columns_to_profile def _should_ignore_column(self, sqlalchemy_type: sa.types.TypeEngine) -> bool: - return str(sqlalchemy_type) in _get_column_types_to_ignore( - self.dataset.engine.dialect.name - ) + # We don't profile columns with None types + if str(sqlalchemy_type) == "NULL": + return True + + sql_type = str(sqlalchemy_type) + + match = re.match(NORMALIZE_TYPE_PATTERN, sql_type) + + if match: + sql_type = match.group(1) + + return sql_type in _get_column_types_to_ignore(self.dataset.engine.dialect.name) @_run_with_query_combiner def _get_column_type(self, column_spec: _SingleColumnSpec, column: str) -> None: @@ -1397,6 +1410,8 @@ class DatahubGEProfiler: def _get_column_types_to_ignore(dialect_name: str) -> List[str]: if dialect_name.lower() == POSTGRESQL: return ["JSON"] + elif dialect_name.lower() == BIGQUERY: + return ["ARRAY", "STRUCT", "GEOGRAPHY", "JSON"] return [] diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py b/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py index 9bf4451a18..42d0def0a4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py @@ -188,6 +188,11 @@ class GEProfilingConfig(GEProfilingBaseConfig): ), ) + profile_nested_fields: bool = Field( + default=False, + description="Whether to profile complex types like structs, arrays and maps. ", + ) + @pydantic.root_validator(pre=True) def deprecate_bigquery_temp_table_schema(cls, values): # TODO: Update docs to remove mention of this field.