mirror of
https://github.com/datahub-project/datahub.git
synced 2025-08-25 17:47:49 +00:00
fix(ingest/bigquery): Fix performance issue with column profiling ignore (#11807)
This commit is contained in:
parent
b357f87f94
commit
01fb64c531
@ -118,7 +118,6 @@ class BigqueryTable(BaseTable):
|
|||||||
active_billable_bytes: Optional[int] = None
|
active_billable_bytes: Optional[int] = None
|
||||||
long_term_billable_bytes: Optional[int] = None
|
long_term_billable_bytes: Optional[int] = None
|
||||||
partition_info: Optional[PartitionInfo] = None
|
partition_info: Optional[PartitionInfo] = None
|
||||||
columns_ignore_from_profiling: List[str] = field(default_factory=list)
|
|
||||||
external: bool = False
|
external: bool = False
|
||||||
constraints: List[BigqueryTableConstraint] = field(default_factory=list)
|
constraints: List[BigqueryTableConstraint] = field(default_factory=list)
|
||||||
table_type: Optional[str] = None
|
table_type: Optional[str] = None
|
||||||
|
@ -598,18 +598,6 @@ class BigQuerySchemaGenerator:
|
|||||||
dataset_name=dataset_name,
|
dataset_name=dataset_name,
|
||||||
)
|
)
|
||||||
|
|
||||||
# This method is used to generate the ignore list for datatypes the profiler doesn't support we have to do it here
|
|
||||||
# because the profiler doesn't have access to columns
|
|
||||||
def generate_profile_ignore_list(self, columns: List[BigqueryColumn]) -> List[str]:
|
|
||||||
ignore_list: List[str] = []
|
|
||||||
for column in columns:
|
|
||||||
if not column.data_type or any(
|
|
||||||
word in column.data_type.lower()
|
|
||||||
for word in ["array", "struct", "geography", "json"]
|
|
||||||
):
|
|
||||||
ignore_list.append(column.field_path)
|
|
||||||
return ignore_list
|
|
||||||
|
|
||||||
def _process_table(
|
def _process_table(
|
||||||
self,
|
self,
|
||||||
table: BigqueryTable,
|
table: BigqueryTable,
|
||||||
@ -631,15 +619,6 @@ class BigQuerySchemaGenerator:
|
|||||||
)
|
)
|
||||||
table.column_count = len(columns)
|
table.column_count = len(columns)
|
||||||
|
|
||||||
# We only collect profile ignore list if profiling is enabled and profile_table_level_only is false
|
|
||||||
if (
|
|
||||||
self.config.is_profiling_enabled()
|
|
||||||
and not self.config.profiling.profile_table_level_only
|
|
||||||
):
|
|
||||||
table.columns_ignore_from_profiling = self.generate_profile_ignore_list(
|
|
||||||
columns
|
|
||||||
)
|
|
||||||
|
|
||||||
if not table.column_count:
|
if not table.column_count:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
f"Table doesn't have any column or unable to get columns for table: {table_identifier}"
|
f"Table doesn't have any column or unable to get columns for table: {table_identifier}"
|
||||||
|
@ -166,12 +166,6 @@ WHERE
|
|||||||
normalized_table_name = BigqueryTableIdentifier(
|
normalized_table_name = BigqueryTableIdentifier(
|
||||||
project_id=project_id, dataset=dataset, table=table.name
|
project_id=project_id, dataset=dataset, table=table.name
|
||||||
).get_table_name()
|
).get_table_name()
|
||||||
for column in table.columns_ignore_from_profiling:
|
|
||||||
# Profiler has issues with complex types (array, struct, geography, json), so we deny those types from profiling
|
|
||||||
# We also filter columns without data type as it means that column is part of a complex type.
|
|
||||||
self.config.profile_pattern.deny.append(
|
|
||||||
f"^{normalized_table_name}.{column}$"
|
|
||||||
)
|
|
||||||
|
|
||||||
if table.external and not self.config.profiling.profile_external_tables:
|
if table.external and not self.config.profiling.profile_external_tables:
|
||||||
self.report.profiling_skipped_other[f"{project_id}.{dataset}"] += 1
|
self.report.profiling_skipped_other[f"{project_id}.{dataset}"] += 1
|
||||||
|
@ -7,6 +7,7 @@ import dataclasses
|
|||||||
import functools
|
import functools
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
|
import re
|
||||||
import threading
|
import threading
|
||||||
import traceback
|
import traceback
|
||||||
import unittest.mock
|
import unittest.mock
|
||||||
@ -123,6 +124,8 @@ ProfilerTypeMapping.BINARY_TYPE_NAMES.append("LargeBinary")
|
|||||||
|
|
||||||
_datasource_connection_injection_lock = threading.Lock()
|
_datasource_connection_injection_lock = threading.Lock()
|
||||||
|
|
||||||
|
NORMALIZE_TYPE_PATTERN = re.compile(r"^(.*?)(?:[\[<(].*)?$")
|
||||||
|
|
||||||
|
|
||||||
@contextlib.contextmanager
|
@contextlib.contextmanager
|
||||||
def _inject_connection_into_datasource(conn: Connection) -> Iterator[None]:
|
def _inject_connection_into_datasource(conn: Connection) -> Iterator[None]:
|
||||||
@ -165,11 +168,9 @@ def get_column_unique_count_dh_patch(self: SqlAlchemyDataset, column: str) -> in
|
|||||||
return convert_to_json_serializable(element_values.fetchone()[0])
|
return convert_to_json_serializable(element_values.fetchone()[0])
|
||||||
elif self.engine.dialect.name.lower() == BIGQUERY:
|
elif self.engine.dialect.name.lower() == BIGQUERY:
|
||||||
element_values = self.engine.execute(
|
element_values = self.engine.execute(
|
||||||
sa.select(
|
sa.select(sa.func.APPROX_COUNT_DISTINCT(sa.column(column))).select_from(
|
||||||
[
|
self._table
|
||||||
sa.func.coalesce(sa.text(f"APPROX_COUNT_DISTINCT(`{column}`)")),
|
)
|
||||||
]
|
|
||||||
).select_from(self._table)
|
|
||||||
)
|
)
|
||||||
return convert_to_json_serializable(element_values.fetchone()[0])
|
return convert_to_json_serializable(element_values.fetchone()[0])
|
||||||
elif self.engine.dialect.name.lower() == SNOWFLAKE:
|
elif self.engine.dialect.name.lower() == SNOWFLAKE:
|
||||||
@ -378,6 +379,9 @@ class _SingleDatasetProfiler(BasicDatasetProfilerBase):
|
|||||||
f"{self.dataset_name}.{col}"
|
f"{self.dataset_name}.{col}"
|
||||||
):
|
):
|
||||||
ignored_columns_by_pattern.append(col)
|
ignored_columns_by_pattern.append(col)
|
||||||
|
# We try to ignore nested columns as well
|
||||||
|
elif not self.config.profile_nested_fields and "." in col:
|
||||||
|
ignored_columns_by_pattern.append(col)
|
||||||
elif col_dict.get("type") and self._should_ignore_column(col_dict["type"]):
|
elif col_dict.get("type") and self._should_ignore_column(col_dict["type"]):
|
||||||
ignored_columns_by_type.append(col)
|
ignored_columns_by_type.append(col)
|
||||||
else:
|
else:
|
||||||
@ -407,9 +411,18 @@ class _SingleDatasetProfiler(BasicDatasetProfilerBase):
|
|||||||
return columns_to_profile
|
return columns_to_profile
|
||||||
|
|
||||||
def _should_ignore_column(self, sqlalchemy_type: sa.types.TypeEngine) -> bool:
|
def _should_ignore_column(self, sqlalchemy_type: sa.types.TypeEngine) -> bool:
|
||||||
return str(sqlalchemy_type) in _get_column_types_to_ignore(
|
# We don't profiles columns with None types
|
||||||
self.dataset.engine.dialect.name
|
if str(sqlalchemy_type) == "NULL":
|
||||||
)
|
return True
|
||||||
|
|
||||||
|
sql_type = str(sqlalchemy_type)
|
||||||
|
|
||||||
|
match = re.match(NORMALIZE_TYPE_PATTERN, sql_type)
|
||||||
|
|
||||||
|
if match:
|
||||||
|
sql_type = match.group(1)
|
||||||
|
|
||||||
|
return sql_type in _get_column_types_to_ignore(self.dataset.engine.dialect.name)
|
||||||
|
|
||||||
@_run_with_query_combiner
|
@_run_with_query_combiner
|
||||||
def _get_column_type(self, column_spec: _SingleColumnSpec, column: str) -> None:
|
def _get_column_type(self, column_spec: _SingleColumnSpec, column: str) -> None:
|
||||||
@ -1397,6 +1410,8 @@ class DatahubGEProfiler:
|
|||||||
def _get_column_types_to_ignore(dialect_name: str) -> List[str]:
|
def _get_column_types_to_ignore(dialect_name: str) -> List[str]:
|
||||||
if dialect_name.lower() == POSTGRESQL:
|
if dialect_name.lower() == POSTGRESQL:
|
||||||
return ["JSON"]
|
return ["JSON"]
|
||||||
|
elif dialect_name.lower() == BIGQUERY:
|
||||||
|
return ["ARRAY", "STRUCT", "GEOGRAPHY", "JSON"]
|
||||||
|
|
||||||
return []
|
return []
|
||||||
|
|
||||||
|
@ -188,6 +188,11 @@ class GEProfilingConfig(GEProfilingBaseConfig):
|
|||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
profile_nested_fields: bool = Field(
|
||||||
|
default=False,
|
||||||
|
description="Whether to profile complex types like structs, arrays and maps. ",
|
||||||
|
)
|
||||||
|
|
||||||
@pydantic.root_validator(pre=True)
|
@pydantic.root_validator(pre=True)
|
||||||
def deprecate_bigquery_temp_table_schema(cls, values):
|
def deprecate_bigquery_temp_table_schema(cls, values):
|
||||||
# TODO: Update docs to remove mention of this field.
|
# TODO: Update docs to remove mention of this field.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user