mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-27 01:48:24 +00:00
feat(ingest): allow specific profiler config fields to override profile_table_level_only (#6366)
This commit is contained in:
parent
a3aec72c3d
commit
8c322ede35
@ -245,7 +245,7 @@ class _SingleDatasetProfiler(BasicDatasetProfilerBase):
|
||||
query_combiner: SQLAlchemyQueryCombiner
|
||||
|
||||
def _get_columns_to_profile(self) -> List[str]:
|
||||
if self.config.profile_table_level_only:
|
||||
if not self.config.any_field_level_metrics_enabled():
|
||||
return []
|
||||
|
||||
# Compute columns to profile
|
||||
|
||||
@ -7,6 +7,13 @@ from pydantic.fields import Field
|
||||
|
||||
from datahub.configuration.common import AllowDenyPattern, ConfigModel
|
||||
|
||||
_PROFILING_FLAGS_TO_REPORT = {
|
||||
"turn_off_expensive_profiling_metrics",
|
||||
"profile_table_level_only",
|
||||
"query_combiner_enabled",
|
||||
# all include_field_ flags are reported.
|
||||
}
|
||||
|
||||
|
||||
class GEProfilingConfig(ConfigModel):
|
||||
enabled: bool = Field(
|
||||
@ -25,7 +32,6 @@ class GEProfilingConfig(ConfigModel):
|
||||
description="Whether to report datasets or dataset columns which were not profiled. Set to `True` for debugging purposes.",
|
||||
)
|
||||
|
||||
# These settings will override the ones below.
|
||||
turn_off_expensive_profiling_metrics: bool = Field(
|
||||
default=False,
|
||||
description="Whether to turn off expensive profiling or not. This turns off profiling for quantiles, distinct_value_frequencies, histogram & sample_values. This also limits maximum number of fields being profiled to 10.",
|
||||
@ -136,41 +142,46 @@ class GEProfilingConfig(ConfigModel):
|
||||
cls: "GEProfilingConfig", values: Dict[str, Any]
|
||||
) -> Dict[str, Any]:
|
||||
max_num_fields_to_profile_key = "max_number_of_fields_to_profile"
|
||||
table_level_profiling_only_key = "profile_table_level_only"
|
||||
max_num_fields_to_profile = values.get(max_num_fields_to_profile_key)
|
||||
if values.get(table_level_profiling_only_key):
|
||||
all_field_level_metrics: List[str] = [
|
||||
"include_field_null_count",
|
||||
"include_field_distinct_count",
|
||||
"include_field_min_value",
|
||||
"include_field_max_value",
|
||||
"include_field_mean_value",
|
||||
"include_field_median_value",
|
||||
"include_field_stddev_value",
|
||||
|
||||
# Disable all field-level metrics.
|
||||
if values.get("profile_table_level_only"):
|
||||
for field_level_metric in cls.__fields__:
|
||||
if field_level_metric.startswith("include_field_"):
|
||||
values.setdefault(field_level_metric, False)
|
||||
|
||||
assert (
|
||||
max_num_fields_to_profile is None
|
||||
), f"{max_num_fields_to_profile_key} should be set to None"
|
||||
|
||||
# Disable expensive queries.
|
||||
if values.get("turn_off_expensive_profiling_metrics"):
|
||||
expensive_field_level_metrics: List[str] = [
|
||||
"include_field_quantiles",
|
||||
"include_field_distinct_value_frequencies",
|
||||
"include_field_histogram",
|
||||
"include_field_sample_values",
|
||||
]
|
||||
# Suppress all field-level metrics
|
||||
for field_level_metric in all_field_level_metrics:
|
||||
values[field_level_metric] = False
|
||||
assert (
|
||||
max_num_fields_to_profile is None
|
||||
), f"{max_num_fields_to_profile_key} should be set to None"
|
||||
for expensive_field_metric in expensive_field_level_metrics:
|
||||
values.setdefault(expensive_field_metric, False)
|
||||
|
||||
if values.get("turn_off_expensive_profiling_metrics"):
|
||||
if not values.get(table_level_profiling_only_key):
|
||||
expensive_field_level_metrics: List[str] = [
|
||||
"include_field_quantiles",
|
||||
"include_field_distinct_value_frequencies",
|
||||
"include_field_histogram",
|
||||
"include_field_sample_values",
|
||||
]
|
||||
for expensive_field_metric in expensive_field_level_metrics:
|
||||
values[expensive_field_metric] = False
|
||||
if max_num_fields_to_profile is None:
|
||||
# We currently profile up to 10 non-filtered columns in this mode by default.
|
||||
values[max_num_fields_to_profile_key] = 10
|
||||
# By default, we profile at most 10 non-filtered columns in this mode.
|
||||
values.setdefault(max_num_fields_to_profile_key, 10)
|
||||
|
||||
return values
|
||||
|
||||
def any_field_level_metrics_enabled(self) -> bool:
|
||||
return any(
|
||||
getattr(self, field_name)
|
||||
for field_name in self.__fields__
|
||||
if field_name.startswith("include_field_")
|
||||
)
|
||||
|
||||
def config_for_telemetry(self) -> Dict[str, Any]:
|
||||
config_dict = self.dict()
|
||||
|
||||
return {
|
||||
flag: config_dict[flag]
|
||||
for flag in config_dict
|
||||
if flag in _PROFILING_FLAGS_TO_REPORT or flag.startswith("include_field_")
|
||||
}
|
||||
|
||||
@ -124,24 +124,14 @@ class DataLakeProfilerConfig(ConfigModel):
|
||||
cls: "DataLakeProfilerConfig", values: Dict[str, Any]
|
||||
) -> Dict[str, Any]:
|
||||
max_num_fields_to_profile_key = "max_number_of_fields_to_profile"
|
||||
table_level_profiling_only_key = "profile_table_level_only"
|
||||
max_num_fields_to_profile = values.get(max_num_fields_to_profile_key)
|
||||
if values.get(table_level_profiling_only_key):
|
||||
all_field_level_metrics: List[str] = [
|
||||
"include_field_null_count",
|
||||
"include_field_min_value",
|
||||
"include_field_max_value",
|
||||
"include_field_mean_value",
|
||||
"include_field_median_value",
|
||||
"include_field_stddev_value",
|
||||
"include_field_quantiles",
|
||||
"include_field_distinct_value_frequencies",
|
||||
"include_field_histogram",
|
||||
"include_field_sample_values",
|
||||
]
|
||||
# Suppress all field-level metrics
|
||||
for field_level_metric in all_field_level_metrics:
|
||||
values[field_level_metric] = False
|
||||
|
||||
# Disable all field-level metrics.
|
||||
if values.get("profile_table_level_only"):
|
||||
for field_level_metric in cls.__fields__:
|
||||
if field_level_metric.startswith("include_field_"):
|
||||
values.setdefault(field_level_metric, False)
|
||||
|
||||
assert (
|
||||
max_num_fields_to_profile is None
|
||||
), f"{max_num_fields_to_profile_key} should be set to None"
|
||||
|
||||
@ -455,23 +455,6 @@ config_options_to_report = [
|
||||
"include_tables",
|
||||
]
|
||||
|
||||
# flags to emit telemetry for
|
||||
profiling_flags_to_report = [
|
||||
"turn_off_expensive_profiling_metrics",
|
||||
"profile_table_level_only",
|
||||
"include_field_null_count",
|
||||
"include_field_min_value",
|
||||
"include_field_max_value",
|
||||
"include_field_mean_value",
|
||||
"include_field_median_value",
|
||||
"include_field_stddev_value",
|
||||
"include_field_quantiles",
|
||||
"include_field_distinct_value_frequencies",
|
||||
"include_field_histogram",
|
||||
"include_field_sample_values",
|
||||
"query_combiner_enabled",
|
||||
]
|
||||
|
||||
|
||||
class SQLAlchemySource(StatefulIngestionSourceBase):
|
||||
"""A Base class for all SQL Sources that use SQLAlchemy to extend"""
|
||||
@ -508,13 +491,9 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
|
||||
)
|
||||
|
||||
if config.profiling.enabled:
|
||||
|
||||
telemetry.telemetry_instance.ping(
|
||||
"sql_profiling_config",
|
||||
{
|
||||
config_flag: config.profiling.dict().get(config_flag)
|
||||
for config_flag in profiling_flags_to_report
|
||||
},
|
||||
config.profiling.config_for_telemetry(),
|
||||
)
|
||||
if self.config.domain:
|
||||
self.domain_registry = DomainRegistry(
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user