diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 9cf67354fb3..6bba4792027 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -58,6 +58,7 @@ kafka_common = { sql_common = { # Required for all SQL sources. "sqlalchemy==1.3.24", + "great-expectations", } aws_common = { @@ -122,7 +123,6 @@ plugins: Dict[str, Set[str]] = { "snowflake": sql_common | {"snowflake-sqlalchemy<=1.2.4"}, "snowflake-usage": sql_common | {"snowflake-sqlalchemy<=1.2.4"}, "sqlalchemy": sql_common, - "sql-profiles": sql_common | {"great-expectations"}, "superset": {"requests"}, "trino": sql_common | { @@ -233,7 +233,6 @@ full_test_dev_requirements = { "mysql", "mariadb", "snowflake", - "sql-profiles", "redash", ] for dependency in plugins[plugin] diff --git a/metadata-ingestion/source_docs/sql_profiles.md b/metadata-ingestion/source_docs/sql_profiles.md index 2b13fb35786..785b0f883c4 100644 --- a/metadata-ingestion/source_docs/sql_profiles.md +++ b/metadata-ingestion/source_docs/sql_profiles.md @@ -4,7 +4,8 @@ For context on getting started with ingestion, check out our [metadata ingestion ## Setup -To install this plugin, run `pip install 'acryl-datahub[sql-profiles]'`. +To install this plugin, run `pip install 'acryl-datahub[sql-profiles]'` (prior to datahub version `0.8.16.0`). +In the versions after `0.8.16.0`, this gets installed along with the SQL-based source itself. The SQL-based profiler does not run alone, but rather can be enabled for other SQL-based sources. Enabling profiling will slow down ingestion runs. @@ -68,16 +69,28 @@ sink: Note that a `.` is used to denote nested fields in the YAML recipe. -| Field | Required | Default | Description | -| ---------------------------- | -------- | ------------------ | ----------------------------------------------------------------------- | -| `profiling.enabled` | | `False` | Whether profiling should be done. | -| `profiling.limit` | | | Max number of documents to profile. By default, profiles all documents. | -| `profiling.offset` | | | Offset in documents to profile. By default, uses no offset. | -| `profiling.max_workers` | | `5*os.cpu_count()` | Number of worker threads to use for profiling. Set to 1 to disable. | -| `profile_pattern.allow` | | | List of regex patterns for tables to profile. | -| `profile_pattern.deny` | | | List of regex patterns for tables to not profile. | -| `profile_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. | -| `profile.send_sample_values` | | `True` | Whether to send sample values or not. | +| Field | Required | Default | Description | +| --------------------------------------------------- | -------- | --------------------------- | ------------------------------------------------------------------------------------ | +| `profiling.enabled` | | `False` | Whether profiling should be done. | +| `profiling.limit` | | | Max number of documents to profile. By default, profiles all documents. | +| `profiling.offset` | | | Offset in documents to profile. By default, uses no offset. | +| `profiling.max_workers` | | `5 * (os.cpu_count() or 4)` | Number of worker threads to use for profiling. Set to 1 to disable. | +| `profile_pattern.allow` | | `*` | List of regex patterns for tables or table columns to profile. Defaults to all. | +| `profile_pattern.deny` | | | List of regex patterns for tables or table columns to not profile. Defaults to none. 
|
+| `profile_pattern.ignoreCase`                         |          | `True`                      | Whether to ignore case sensitivity during pattern matching.                           |
+| `profiling.turn_off_expensive_profiling_metrics`     |          | `False`                     | Whether to turn off expensive profiling metrics. This disables profiling for quantiles, distinct_value_frequencies, histogram & sample_values, and also limits the maximum number of profiled fields to 10. |
+| `profiling.max_number_of_fields_to_profile`          |          | `None`                      | A positive integer that specifies the maximum number of columns to profile for any table. `None` implies all columns. The cost of profiling goes up significantly as the number of columns to profile goes up. |
+| `profiling.profile_table_level_only`                 |          | `False`                     | Whether to perform profiling at table level only, or include column-level profiling as well. |
+| `profiling.include_field_null_count`                 |          | `True`                      | Whether to profile for the number of nulls in each column.                             |
+| `profiling.include_field_min_value`                  |          | `True`                      | Whether to profile for the min value of numeric columns.                               |
+| `profiling.include_field_max_value`                  |          | `True`                      | Whether to profile for the max value of numeric columns.                               |
+| `profiling.include_field_mean_value`                 |          | `True`                      | Whether to profile for the mean value of numeric columns.                              |
+| `profiling.include_field_median_value`               |          | `True`                      | Whether to profile for the median value of numeric columns.                            |
+| `profiling.include_field_stddev_value`               |          | `True`                      | Whether to profile for the standard deviation of numeric columns.                      |
+| `profiling.include_field_quantiles`                  |          | `True`                      | Whether to profile for the quantiles of numeric columns.                               |
+| `profiling.include_field_distinct_value_frequencies` |          | `True`                      | Whether to profile for distinct value frequencies.                                     |
+| `profiling.include_field_histogram`                  |          | `True`                      | Whether to profile for the histogram of numeric columns.                               |
+| `profiling.include_field_sample_values`              |          | `True`                      | Whether to profile for the sample values of all columns.                               |
 
 ## Compatibility
 
diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub_custom_ge_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/datahub_custom_ge_profiler.py
new file mode 100644
index 00000000000..14f99a0205b
--- /dev/null
+++ b/metadata-ingestion/src/datahub/ingestion/source/datahub_custom_ge_profiler.py
@@ -0,0 +1,278 @@
+import logging
+from typing import Any, Dict
+
+from great_expectations.core.expectation_suite import ExpectationSuite
+from great_expectations.dataset.dataset import Dataset
+from great_expectations.profile.base import ProfilerCardinality, ProfilerDataType
+from great_expectations.profile.basic_dataset_profiler import BasicDatasetProfilerBase
+
+logger = logging.getLogger(__name__)
+
+
+class DatahubGECustomProfiler(BasicDatasetProfilerBase):
+    """DatahubGECustomProfiler is the customizable version of the BasicDatasetProfiler.
+
+    The reason for going this route instead of using UserConfigurableProfiler is that UserConfigurableProfiler
+    does not compute all of the expectations that the BasicDatasetProfiler does, such as the sample values.
+ """ + + # flake8: noqa: C901 + @classmethod + def _profile( + cls, + dataset: Dataset, + configuration: Dict[str, Any], + ) -> ExpectationSuite: + columns_to_profile = configuration["columns_to_profile"] + excluded_expectations = configuration["excluded_expectations"] + + df = dataset + + df.set_default_expectation_argument("catch_exceptions", True) + + df.expect_table_row_count_to_be_between(min_value=0, max_value=None) + df.expect_table_columns_to_match_ordered_list(None) + df.set_config_value("interactive_evaluation", False) + + meta_columns = {} + for column in columns_to_profile: + meta_columns[column] = {"description": ""} + + number_of_columns = len(columns_to_profile) + for i, column in enumerate(columns_to_profile): + logger.info( + " Preparing column {} of {}: {}".format( + i + 1, number_of_columns, column + ) + ) + + type_ = cls._get_column_type(df, column) + cardinality = cls._get_column_cardinality(df, column) + if "expect_column_values_to_not_be_null" not in excluded_expectations: + df.expect_column_values_to_not_be_null(column, mostly=0.5) + + if "expect_column_values_to_be_in_set" not in excluded_expectations: + df.expect_column_values_to_be_in_set( + column, [], result_format="SUMMARY" + ) + + if type_ == ProfilerDataType.INT: + if cardinality == ProfilerCardinality.UNIQUE: + df.expect_column_values_to_be_unique(column) + elif cardinality in [ + ProfilerCardinality.ONE, + ProfilerCardinality.TWO, + ProfilerCardinality.VERY_FEW, + ProfilerCardinality.FEW, + ] and ( + "expect_column_distinct_values_to_be_in_set" + not in excluded_expectations + ): + df.expect_column_distinct_values_to_be_in_set( + column, value_set=None, result_format="SUMMARY" + ) + elif cardinality in [ + ProfilerCardinality.MANY, + ProfilerCardinality.VERY_MANY, + ProfilerCardinality.UNIQUE, + ]: + if "expect_column_min_to_be_between" not in excluded_expectations: + df.expect_column_min_to_be_between( + column, min_value=None, max_value=None + ) + if "expect_column_max_to_be_between" not in excluded_expectations: + df.expect_column_max_to_be_between( + column, min_value=None, max_value=None + ) + if "expect_column_mean_to_be_between" not in excluded_expectations: + df.expect_column_mean_to_be_between( + column, min_value=None, max_value=None + ) + if ( + "expect_column_median_to_be_between" + not in excluded_expectations + ): + df.expect_column_median_to_be_between( + column, min_value=None, max_value=None + ) + if "expect_column_stdev_to_be_between" not in excluded_expectations: + df.expect_column_stdev_to_be_between( + column, min_value=None, max_value=None + ) + if ( + "expect_column_quantile_values_to_be_between" + not in excluded_expectations + ): + df.expect_column_quantile_values_to_be_between( + column, + quantile_ranges={ + "quantiles": [0.05, 0.25, 0.5, 0.75, 0.95], + "value_ranges": [ + [None, None], + [None, None], + [None, None], + [None, None], + [None, None], + ], + }, + ) + if ( + "expect_column_kl_divergence_to_be_less_than" + not in excluded_expectations + ): + df.expect_column_kl_divergence_to_be_less_than( + column, + partition_object=None, + threshold=None, + result_format="COMPLETE", + ) + else: # unknown cardinality - skip + pass + elif type_ == ProfilerDataType.FLOAT: + if cardinality == ProfilerCardinality.UNIQUE: + df.expect_column_values_to_be_unique(column) + + elif cardinality in [ + ProfilerCardinality.ONE, + ProfilerCardinality.TWO, + ProfilerCardinality.VERY_FEW, + ProfilerCardinality.FEW, + ] and ( + "expect_column_distinct_values_to_be_in_set" + not in 
excluded_expectations + ): + df.expect_column_distinct_values_to_be_in_set( + column, value_set=None, result_format="SUMMARY" + ) + + elif cardinality in [ + ProfilerCardinality.MANY, + ProfilerCardinality.VERY_MANY, + ProfilerCardinality.UNIQUE, + ]: + if "expect_column_min_to_be_between" not in excluded_expectations: + df.expect_column_min_to_be_between( + column, min_value=None, max_value=None + ) + if "expect_column_max_to_be_between" not in excluded_expectations: + df.expect_column_max_to_be_between( + column, min_value=None, max_value=None + ) + if "expect_column_mean_to_be_between" not in excluded_expectations: + df.expect_column_mean_to_be_between( + column, min_value=None, max_value=None + ) + if ( + "expect_column_median_to_be_between" + not in excluded_expectations + ): + df.expect_column_median_to_be_between( + column, min_value=None, max_value=None + ) + if ( + "expect_column_quantile_values_to_be_between" + not in excluded_expectations + ): + df.expect_column_quantile_values_to_be_between( + column, + quantile_ranges={ + "quantiles": [0.05, 0.25, 0.5, 0.75, 0.95], + "value_ranges": [ + [None, None], + [None, None], + [None, None], + [None, None], + [None, None], + ], + }, + ) + if ( + "expect_column_kl_divergence_to_be_less_than" + not in excluded_expectations + ): + df.expect_column_kl_divergence_to_be_less_than( + column, + partition_object=None, + threshold=None, + result_format="COMPLETE", + ) + else: # unknown cardinality - skip + pass + + elif type_ == ProfilerDataType.STRING: + # Check for leading and trailing whitespace. + df.expect_column_values_to_not_match_regex(column, r"^\s+|\s+$") + + if cardinality == ProfilerCardinality.UNIQUE: + df.expect_column_values_to_be_unique(column) + + elif cardinality in [ + ProfilerCardinality.ONE, + ProfilerCardinality.TWO, + ProfilerCardinality.VERY_FEW, + ProfilerCardinality.FEW, + ] and ( + "expect_column_distinct_values_to_be_in_set" + not in excluded_expectations + ): + df.expect_column_distinct_values_to_be_in_set( + column, value_set=None, result_format="SUMMARY" + ) + else: + pass + + elif type_ == ProfilerDataType.DATETIME: + + if "expect_column_min_to_be_between" not in excluded_expectations: + df.expect_column_min_to_be_between( + column, min_value=None, max_value=None + ) + + if "expect_column_max_to_be_between" not in excluded_expectations: + df.expect_column_max_to_be_between( + column, min_value=None, max_value=None + ) + + # Re-add once kl_divergence has been modified to support datetimes + # df.expect_column_kl_divergence_to_be_less_than(column, partition_object=None, + # threshold=None, result_format='COMPLETE') + + if cardinality in [ + ProfilerCardinality.ONE, + ProfilerCardinality.TWO, + ProfilerCardinality.VERY_FEW, + ProfilerCardinality.FEW, + ] and ( + "expect_column_distinct_values_to_be_in_set" + not in excluded_expectations + ): + df.expect_column_distinct_values_to_be_in_set( + column, value_set=None, result_format="SUMMARY" + ) + + else: + if cardinality == ProfilerCardinality.UNIQUE: + df.expect_column_values_to_be_unique(column) + + elif cardinality in [ + ProfilerCardinality.ONE, + ProfilerCardinality.TWO, + ProfilerCardinality.VERY_FEW, + ProfilerCardinality.FEW, + ] and ( + "expect_column_distinct_values_to_be_in_set" + not in excluded_expectations + ): + df.expect_column_distinct_values_to_be_in_set( + column, value_set=None, result_format="SUMMARY" + ) + else: + pass + + df.set_config_value("interactive_evaluation", True) + expectation_suite = df.get_expectation_suite( + suppress_warnings=True, 
discard_failed_expectations=False + ) + expectation_suite.meta["columns"] = meta_columns + + return expectation_suite diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py index 9f87ba527f6..46275d11646 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py @@ -1,13 +1,17 @@ import concurrent.futures import contextlib import dataclasses +import itertools import logging +import os import threading import time import unittest.mock import uuid -from typing import Any, Iterable, Iterator, List, Optional, Tuple, Union +from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, Union +import pydantic +from great_expectations.core import ExpectationSuite from great_expectations.core.expectation_validation_result import ( ExpectationSuiteValidationResult, ExpectationValidationResult, @@ -19,11 +23,15 @@ from great_expectations.data_context.types.base import ( InMemoryStoreBackendDefaults, datasourceConfigSchema, ) +from great_expectations.dataset.dataset import Dataset from great_expectations.datasource.sqlalchemy_datasource import SqlAlchemyDatasource +from great_expectations.profile.base import DatasetProfiler from sqlalchemy.engine import Connection, Engine +from datahub.configuration.common import AllowDenyPattern, ConfigModel from datahub.emitter.mce_builder import get_sys_time -from datahub.ingestion.api.source import SourceReport +from datahub.ingestion.source.datahub_custom_ge_profiler import DatahubGECustomProfiler +from datahub.ingestion.source.sql.sql_common import SQLSourceReport from datahub.metadata.schema_classes import ( DatasetFieldProfileClass, DatasetProfileClass, @@ -77,7 +85,186 @@ class GEProfilerRequest: pretty_name: str batch_kwargs: dict - send_sample_values: bool + +class GEProfilingConfig(ConfigModel): + enabled: bool = False + limit: Optional[int] = None + offset: Optional[int] = None + turn_off_expensive_profiling_metrics: bool = False + profile_table_level_only: bool = False + include_field_null_count: bool = True + include_field_min_value: bool = True + include_field_max_value: bool = True + include_field_mean_value: bool = True + include_field_median_value: bool = True + include_field_stddev_value: bool = True + include_field_quantiles: bool = True + include_field_distinct_value_frequencies: bool = True + include_field_histogram: bool = True + include_field_sample_values: bool = True + allow_deny_patterns: AllowDenyPattern = AllowDenyPattern.allow_all() + max_number_of_fields_to_profile: Optional[pydantic.PositiveInt] = None + + # The default of (5 * cpu_count) is adopted from the default max_workers + # parameter of ThreadPoolExecutor. Given that profiling is often an I/O-bound + # task, it may make sense to increase this default value in the future. 
+    # https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
+    max_workers: int = 5 * (os.cpu_count() or 4)
+
+    @pydantic.root_validator()
+    def ensure_field_level_settings_are_normalized(
+        cls: "GEProfilingConfig", values: Dict[str, Any]
+    ) -> Dict[str, Any]:
+        max_num_fields_to_profile_key = "max_number_of_fields_to_profile"
+        table_level_profiling_only_key = "profile_table_level_only"
+        max_num_fields_to_profile = values.get(max_num_fields_to_profile_key)
+        if values.get(table_level_profiling_only_key):
+            all_field_level_metrics: List[str] = [
+                "include_field_null_count",
+                "include_field_min_value",
+                "include_field_max_value",
+                "include_field_mean_value",
+                "include_field_median_value",
+                "include_field_stddev_value",
+                "include_field_quantiles",
+                "include_field_distinct_value_frequencies",
+                "include_field_histogram",
+                "include_field_sample_values",
+            ]
+            # Suppress all field-level metrics when only table-level profiling is requested.
+            for field_level_metric in all_field_level_metrics:
+                values[field_level_metric] = False
+            assert (
+                max_num_fields_to_profile is None
+            ), f"{max_num_fields_to_profile_key} should be set to None"
+
+        if values.get("turn_off_expensive_profiling_metrics"):
+            if not values.get(table_level_profiling_only_key):
+                expensive_field_level_metrics: List[str] = [
+                    "include_field_quantiles",
+                    "include_field_distinct_value_frequencies",
+                    "include_field_histogram",
+                    "include_field_sample_values",
+                ]
+                for expensive_field_metric in expensive_field_level_metrics:
+                    values[expensive_field_metric] = False
+            if max_num_fields_to_profile is None:
+                # We currently profile up to 10 non-filtered columns in this mode by default.
+                values[max_num_fields_to_profile_key] = 10
+
+        return values
+
+
+class DatahubConfigurableProfiler(DatasetProfiler):
+    """
+    DatahubConfigurableProfiler is a thin wrapper on top of DatahubGECustomProfiler: it translates the
+    GEProfilingConfig into DatahubGECustomProfiler's configuration and delegates the actual profiling to it.
+    Column filtering based on our allow/deny patterns requires us to intercept the _profile call
+    and compute the list of columns to profile.
+    """
+
+    @staticmethod
+    def _get_excluded_expectations(config: GEProfilingConfig) -> List[str]:
+        # Compute excluded expectations
+        excluded_expectations: List[str] = []
+        if not config.include_field_null_count:
+            excluded_expectations.append("expect_column_values_to_not_be_null")
+        if not config.include_field_min_value:
+            excluded_expectations.append("expect_column_min_to_be_between")
+        if not config.include_field_max_value:
+            excluded_expectations.append("expect_column_max_to_be_between")
+        if not config.include_field_mean_value:
+            excluded_expectations.append("expect_column_mean_to_be_between")
+        if not config.include_field_median_value:
+            excluded_expectations.append("expect_column_median_to_be_between")
+        if not config.include_field_stddev_value:
+            excluded_expectations.append("expect_column_stdev_to_be_between")
+        if not config.include_field_quantiles:
+            excluded_expectations.append("expect_column_quantile_values_to_be_between")
+        if not config.include_field_distinct_value_frequencies:
+            excluded_expectations.append("expect_column_distinct_values_to_be_in_set")
+        if not config.include_field_histogram:
+            excluded_expectations.append("expect_column_kl_divergence_to_be_less_than")
+        if not config.include_field_sample_values:
+            excluded_expectations.append("expect_column_values_to_be_in_set")
+        return excluded_expectations
+
+    @staticmethod
+    def _get_columns_to_profile(
+        dataset: Dataset,
+        dataset_name: str,
+        config: GEProfilingConfig,
+        report: SQLSourceReport,
+    ) -> List[str]:
+        if config.profile_table_level_only:
+            return []
+
+        # Compute columns to profile
+        columns_to_profile: List[str] = []
+        # Compute ignored columns
+        ignored_columns: List[str] = []
+        for col in dataset.get_table_columns():
+            # We expect the allow/deny patterns to specify '<table_pattern>.<column_pattern>'.
+            if not config.allow_deny_patterns.allowed(f"{dataset_name}.{col}"):
+                ignored_columns.append(col)
+            else:
+                columns_to_profile.append(col)
+        if ignored_columns:
+            report.report_dropped(
+                f"The profile of columns by pattern {dataset_name}({', '.join(sorted(ignored_columns))})"
+            )
+
+        if config.max_number_of_fields_to_profile is not None:
+            columns_being_dropped: List[str] = list(
+                itertools.islice(
+                    columns_to_profile, config.max_number_of_fields_to_profile, None
+                )
+            )
+            columns_to_profile = list(
+                itertools.islice(
+                    columns_to_profile, config.max_number_of_fields_to_profile
+                )
+            )
+            if columns_being_dropped:
+                report.report_dropped(
+                    f"The max_number_of_fields_to_profile={config.max_number_of_fields_to_profile} reached. Profile of columns {dataset_name}({', '.join(sorted(columns_being_dropped))})"
+                )
+        return columns_to_profile
+
+    @staticmethod
+    def datahub_config_to_ge_config(
+        dataset: Dataset,
+        dataset_name: str,
+        config: GEProfilingConfig,
+        report: SQLSourceReport,
+    ) -> Dict[str, Any]:
+        excluded_expectations: List[
+            str
+        ] = DatahubConfigurableProfiler._get_excluded_expectations(config)
+        columns_to_profile: List[
+            str
+        ] = DatahubConfigurableProfiler._get_columns_to_profile(
+            dataset, dataset_name, config, report
+        )
+        return {
+            "excluded_expectations": excluded_expectations,
+            "columns_to_profile": columns_to_profile,
+        }
+
+    @classmethod
+    def _profile(
+        cls, dataset: Dataset, configuration: Dict[str, Any]
+    ) -> ExpectationSuite:
+        """
+        Override of DatasetProfiler._profile: builds the expectation suite by delegating to DatahubGECustomProfiler.
+ """ + profiler_configuration = cls.datahub_config_to_ge_config( + dataset, + configuration["dataset_name"], + configuration["config"], + configuration["report"], + ) + return DatahubGECustomProfiler._profile(dataset, profiler_configuration) @dataclasses.dataclass @@ -88,14 +275,21 @@ class GEContext: @dataclasses.dataclass class DatahubGEProfiler: - report: SourceReport + report: SQLSourceReport + config: GEProfilingConfig # The actual value doesn't matter, it just matters that we use it consistently throughout. _datasource_name_base: str = "my_sqlalchemy_datasource" - def __init__(self, conn: Union[Engine, Connection], report: SourceReport): + def __init__( + self, + conn: Union[Engine, Connection], + report: SQLSourceReport, + config: GEProfilingConfig, + ): self.base_engine = conn self.report = report + self.config = config @contextlib.contextmanager def _ge_context(self) -> Iterator[GEContext]: @@ -140,7 +334,7 @@ class DatahubGEProfiler: def generate_profiles( self, requests: List[GEProfilerRequest], max_workers: int - ) -> Iterable[Tuple[GEProfilerRequest, DatasetProfileClass]]: + ) -> Iterable[Tuple[GEProfilerRequest, Optional[DatasetProfileClass]]]: start_time = time.perf_counter() max_workers = min(max_workers, len(requests)) @@ -168,11 +362,10 @@ class DatahubGEProfiler: def generate_profile_from_request( self, request: GEProfilerRequest - ) -> Tuple[GEProfilerRequest, DatasetProfileClass]: + ) -> Tuple[GEProfilerRequest, Optional[DatasetProfileClass]]: return request, self.generate_profile( request.pretty_name, **request.batch_kwargs, - send_sample_values=request.send_sample_values, ) def generate_profile( @@ -180,11 +373,8 @@ class DatahubGEProfiler: pretty_name: str, schema: str = None, table: str = None, - limit: int = None, - offset: int = None, - send_sample_values: bool = True, **kwargs: Any, - ) -> DatasetProfileClass: + ) -> Optional[DatasetProfileClass]: with self._ge_context() as ge_context: logger.info(f"Profiling {pretty_name} (this may take a while)") @@ -193,15 +383,17 @@ class DatahubGEProfiler: { "schema": schema, "table": table, - "limit": limit, - "offset": offset, + "limit": self.config.limit, + "offset": self.config.offset, **kwargs, }, pretty_name=pretty_name, ) - profile = self._convert_evrs_to_profile( - evrs, pretty_name=pretty_name, send_sample_values=send_sample_values + profile = ( + self._convert_evrs_to_profile(evrs, pretty_name=pretty_name) + if evrs is not None + else None ) logger.debug(f"Finished profiling {pretty_name}") @@ -213,21 +405,36 @@ class DatahubGEProfiler: batch_kwargs: dict, pretty_name: str, ) -> ExpectationSuiteValidationResult: - # Internally, this uses the GE dataset profiler: - # great_expectations.profile.basic_dataset_profiler.BasicDatasetProfiler + try: + start_time = time.perf_counter() + profile_results = ge_context.data_context.profile_data_asset( + ge_context.datasource_name, + profiler=DatahubConfigurableProfiler, + profiler_configuration={ + "config": self.config, + "dataset_name": pretty_name, + "report": self.report, + }, + batch_kwargs={ + "datasource": ge_context.datasource_name, + **batch_kwargs, + }, + ) + end_time = time.perf_counter() + logger.info( + f"Profiling for {pretty_name} took {end_time - start_time} seconds." 
+            )
 
-        profile_results = ge_context.data_context.profile_data_asset(
-            ge_context.datasource_name,
-            batch_kwargs={
-                "datasource": ge_context.datasource_name,
-                **batch_kwargs,
-            },
-        )
-        assert profile_results["success"]
-
-        assert len(profile_results["results"]) == 1
-        _suite, evrs = profile_results["results"][0]
-        return evrs
+            assert profile_results["success"]
+            assert len(profile_results["results"]) == 1
+            _suite, evrs = profile_results["results"][0]
+            return evrs
+        except Exception as e:
+            logger.warning(
+                f"Encountered exception {e}\nwhile profiling {pretty_name}, {batch_kwargs}"
+            )
+            self.report.report_warning(pretty_name, f"Exception {e}")
+            return None
 
     @staticmethod
     def _get_column_from_evr(evr: ExpectationValidationResult) -> Optional[str]:
@@ -242,7 +449,6 @@ class DatahubGEProfiler:
         self,
         evrs: ExpectationSuiteValidationResult,
         pretty_name: str,
-        send_sample_values: bool,
     ) -> DatasetProfileClass:
         profile = DatasetProfileClass(timestampMillis=get_sys_time())
@@ -259,7 +465,6 @@ class DatahubGEProfiler:
                 col,
                 evrs_for_col,
                 pretty_name=pretty_name,
-                send_sample_values=send_sample_values,
             )
 
         return profile
@@ -291,7 +496,6 @@ class DatahubGEProfiler:
         column: str,
         col_evrs: Iterable[ExpectationValidationResult],
         pretty_name: str,
-        send_sample_values: bool,
     ) -> None:
         # TRICKY: This method mutates the profile directly.
@@ -346,8 +550,6 @@ class DatahubGEProfiler:
                 column_profile.sampleValues = [
                     str(v) for v in res["partial_unexpected_list"]
                 ]
-                if not send_sample_values:
-                    column_profile.sampleValues = []
             elif exp == "expect_column_kl_divergence_to_be_less_than":
                 if "details" in res and "observed_partition" in res["details"]:
                     partition = res["details"]["observed_partition"]
@@ -368,8 +570,6 @@ class DatahubGEProfiler:
                     ValueFrequencyClass(value=str(value), frequency=count)
                     for value, count in res["details"]["value_counts"].items()
                 ]
-                if not send_sample_values:
-                    column_profile.distinctValueFrequencies = []
             elif exp == "expect_column_values_to_be_in_type_list":
                 # ignore; we already know the types for each column via ingestion
                 pass
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py
index d4f896f59e3..c3adc58bb3a 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py
@@ -273,8 +273,6 @@ class BigQuerySource(SQLAlchemySource):
         return dict(
             schema=self.config.project_id,
             table=f"{schema}.{table}",
-            limit=self.config.profiling.limit,
-            offset=self.config.profiling.offset,
         )
 
     @staticmethod
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
index ccd6c6dcc1d..d01820d5be5 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
@@ -1,5 +1,4 @@
 import logging
-import os
 from abc import abstractmethod
 from dataclasses import dataclass, field
 from typing import (
@@ -21,12 +20,7 @@ from sqlalchemy import create_engine, inspect
 from sqlalchemy.engine.reflection import Inspector
 from sqlalchemy.sql import sqltypes as types
 
-from datahub import __package_name__
-from datahub.configuration.common import (
-    AllowDenyPattern,
-    ConfigModel,
-    ConfigurationError,
-)
+from datahub.configuration.common import AllowDenyPattern, ConfigModel
 from datahub.emitter.mce_builder import DEFAULT_ENV
 from datahub.emitter.mcp import 
MetadataChangeProposalWrapper from datahub.ingestion.api.common import PipelineContext @@ -144,19 +138,6 @@ class SQLSourceReport(SourceReport): self.filtered.append(ent_name) -class GEProfilingConfig(ConfigModel): - enabled: bool = False - limit: Optional[int] = None - offset: Optional[int] = None - send_sample_values: bool = True - - # The default of (5 * cpu_count) is adopted from the default max_workers - # parameter of ThreadPoolExecutor. Given that profiling is often an I/O-bound - # task, it may make sense to increase this default value in the future. - # https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor - max_workers: int = 5 * (os.cpu_count() or 4) - - class SQLAlchemyConfig(ConfigModel): env: str = DEFAULT_ENV options: dict = {} @@ -172,8 +153,19 @@ class SQLAlchemyConfig(ConfigModel): include_views: Optional[bool] = True include_tables: Optional[bool] = True + from datahub.ingestion.source.ge_data_profiler import GEProfilingConfig + profiling: GEProfilingConfig = GEProfilingConfig() + @pydantic.root_validator() + def ensure_profiling_pattern_is_passed_to_profiling( + cls, values: Dict[str, Any] + ) -> Dict[str, Any]: + profiling = values.get("profiling") + if profiling is not None and profiling.enabled: + profiling.allow_deny_patterns = values["profile_pattern"] + return values + @abstractmethod def get_sql_alchemy_url(self): pass @@ -316,12 +308,6 @@ class SQLAlchemySource(Source): self.platform = platform self.report = SQLSourceReport() - if self.config.profiling.enabled and not self._can_run_profiler(): - raise ConfigurationError( - "Table profiles requested but profiler plugin is not enabled. " - f"Try running: pip install '{__package_name__}[sql-profiles]'" - ) - def get_inspectors(self) -> Iterable[Inspector]: # This method can be overridden in the case that you want to dynamically # run on multiple databases. @@ -608,20 +594,12 @@ class SQLAlchemySource(Source): self.report.report_workunit(wu) yield wu - def _can_run_profiler(self) -> bool: - try: - from datahub.ingestion.source.ge_data_profiler import ( # noqa: F401 - DatahubGEProfiler, - ) - - return True - except Exception: - return False - def _get_profiler_instance(self, inspector: Inspector) -> "DatahubGEProfiler": from datahub.ingestion.source.ge_data_profiler import DatahubGEProfiler - return DatahubGEProfiler(conn=inspector.bind, report=self.report) + return DatahubGEProfiler( + conn=inspector.bind, report=self.report, config=self.config.profiling + ) def loop_profiler_requests( self, @@ -647,7 +625,6 @@ class SQLAlchemySource(Source): yield GEProfilerRequest( pretty_name=dataset_name, batch_kwargs=self.prepare_profiler_args(schema=schema, table=table), - send_sample_values=self.config.profiling.send_sample_values, ) def loop_profiler( @@ -656,6 +633,8 @@ class SQLAlchemySource(Source): for request, profile in profiler.generate_profiles( profile_requests, self.config.profiling.max_workers ): + if profile is None: + continue dataset_name = request.pretty_name mcp = MetadataChangeProposalWrapper( entityType="dataset", @@ -672,8 +651,6 @@ class SQLAlchemySource(Source): return dict( schema=schema, table=table, - limit=self.config.profiling.limit, - offset=self.config.profiling.offset, ) def get_report(self):
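
To sanity-check the normalization performed by `GEProfilingConfig.ensure_field_level_settings_are_normalized` (added in `ge_data_profiler.py` above), the model can be constructed directly and its flags inspected after the root validator runs. A minimal sketch, assuming the `acryl-datahub` package (with great-expectations) from this branch is installed; the asserted values follow from the validator logic in this diff:

```python
from datahub.ingestion.source.ge_data_profiler import GEProfilingConfig

# turn_off_expensive_profiling_metrics disables the four expensive field-level
# metrics and caps the number of profiled columns at 10.
cheap = GEProfilingConfig(enabled=True, turn_off_expensive_profiling_metrics=True)
assert cheap.include_field_quantiles is False
assert cheap.include_field_distinct_value_frequencies is False
assert cheap.include_field_histogram is False
assert cheap.include_field_sample_values is False
assert cheap.max_number_of_fields_to_profile == 10

# profile_table_level_only suppresses *all* field-level metrics.
table_only = GEProfilingConfig(enabled=True, profile_table_level_only=True)
assert table_only.include_field_null_count is False
assert table_only.include_field_mean_value is False
assert table_only.include_field_sample_values is False
```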
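
The column-level filtering in `DatahubConfigurableProfiler._get_columns_to_profile` matches the pattern against `f"{dataset_name}.{column}"`, so column filters need the column name as the last dot-separated component (at runtime, `SQLAlchemyConfig.ensure_profiling_pattern_is_passed_to_profiling` copies `profile_pattern` into `profiling.allow_deny_patterns` when profiling is enabled). A small illustration using `AllowDenyPattern` directly; the table and column names here are hypothetical:

```python
import itertools

from datahub.configuration.common import AllowDenyPattern

pattern = AllowDenyPattern(deny=[r".*\.ssn$", r".*\.credit_card.*"])

dataset_name = "public.customers"  # hypothetical schema.table
columns = ["id", "email", "ssn", "credit_card_number", "created_at"]

# Same filtering logic as _get_columns_to_profile:
columns_to_profile = [
    col for col in columns if pattern.allowed(f"{dataset_name}.{col}")
]
print(columns_to_profile)  # ['id', 'email', 'created_at']

# max_number_of_fields_to_profile then truncates the surviving list:
max_fields = 2
dropped = list(itertools.islice(columns_to_profile, max_fields, None))
columns_to_profile = list(itertools.islice(columns_to_profile, max_fields))
print(columns_to_profile, dropped)  # ['id', 'email'] ['created_at']
```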
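
The `include_field_*` flags translate into GE expectation names that `DatahubGECustomProfiler` skips. Calling the private helper directly shows the mapping; this sketch is purely illustrative and relies only on the names introduced in this diff:

```python
from datahub.ingestion.source.ge_data_profiler import (
    DatahubConfigurableProfiler,
    GEProfilingConfig,
)

config = GEProfilingConfig(
    enabled=True,
    include_field_quantiles=False,
    include_field_sample_values=False,
)
excluded = DatahubConfigurableProfiler._get_excluded_expectations(config)
print(excluded)
# ['expect_column_quantile_values_to_be_between', 'expect_column_values_to_be_in_set']
```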