feat(ingest): user configurable dataset profiling. (#3453)

This commit is contained in:
Ravindra Lanka 2021-10-27 19:49:40 -07:00 committed by GitHub
parent 4b6860efd7
commit a1bf95307b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 557 additions and 92 deletions

View File

@ -58,6 +58,7 @@ kafka_common = {
sql_common = { sql_common = {
# Required for all SQL sources. # Required for all SQL sources.
"sqlalchemy==1.3.24", "sqlalchemy==1.3.24",
"great-expectations",
} }
aws_common = { aws_common = {
@ -122,7 +123,6 @@ plugins: Dict[str, Set[str]] = {
"snowflake": sql_common | {"snowflake-sqlalchemy<=1.2.4"}, "snowflake": sql_common | {"snowflake-sqlalchemy<=1.2.4"},
"snowflake-usage": sql_common | {"snowflake-sqlalchemy<=1.2.4"}, "snowflake-usage": sql_common | {"snowflake-sqlalchemy<=1.2.4"},
"sqlalchemy": sql_common, "sqlalchemy": sql_common,
"sql-profiles": sql_common | {"great-expectations"},
"superset": {"requests"}, "superset": {"requests"},
"trino": sql_common "trino": sql_common
| { | {
@ -233,7 +233,6 @@ full_test_dev_requirements = {
"mysql", "mysql",
"mariadb", "mariadb",
"snowflake", "snowflake",
"sql-profiles",
"redash", "redash",
] ]
for dependency in plugins[plugin] for dependency in plugins[plugin]

View File

@ -4,7 +4,8 @@ For context on getting started with ingestion, check out our [metadata ingestion
## Setup ## Setup
To install this plugin, run `pip install 'acryl-datahub[sql-profiles]'`. To install this plugin, run `pip install 'acryl-datahub[sql-profiles]'` (prior to datahub version `0.8.16.0`).
In the versions after `0.8.16.0`, this gets installed along with the SQL-based source itself.
The SQL-based profiler does not run alone, but rather can be enabled for other SQL-based sources. The SQL-based profiler does not run alone, but rather can be enabled for other SQL-based sources.
Enabling profiling will slow down ingestion runs. Enabling profiling will slow down ingestion runs.
@ -68,16 +69,28 @@ sink:
Note that a `.` is used to denote nested fields in the YAML recipe. Note that a `.` is used to denote nested fields in the YAML recipe.
| Field | Required | Default | Description | | Field | Required | Default | Description |
| ---------------------------- | -------- | ------------------ | ----------------------------------------------------------------------- | | --------------------------------------------------- | -------- | --------------------------- | ------------------------------------------------------------------------------------ |
| `profiling.enabled` | | `False` | Whether profiling should be done. | | `profiling.enabled` | | `False` | Whether profiling should be done. |
| `profiling.limit` | | | Max number of documents to profile. By default, profiles all documents. | | `profiling.limit` | | | Max number of documents to profile. By default, profiles all documents. |
| `profiling.offset` | | | Offset in documents to profile. By default, uses no offset. | | `profiling.offset` | | | Offset in documents to profile. By default, uses no offset. |
| `profiling.max_workers` | | `5*os.cpu_count()` | Number of worker threads to use for profiling. Set to 1 to disable. | | `profiling.max_workers` | | `5 * (os.cpu_count() or 4)` | Number of worker threads to use for profiling. Set to 1 to disable. |
| `profile_pattern.allow` | | | List of regex patterns for tables to profile. | | `profile_pattern.allow` | | `*` | List of regex patterns for tables or table columns to profile. Defaults to all. |
| `profile_pattern.deny` | | | List of regex patterns for tables to not profile. | | `profile_pattern.deny` | | | List of regex patterns for tables or table columns to not profile. Defaults to none. |
| `profile_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. | | `profile_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. |
| `profile.send_sample_values` | | `True` | Whether to send sample values or not. | | `profiling.turn_off_expensive_profiling_metrics` | | `False` | Whether to turn off expensive profiling or not. This turns off profiling for quantiles, distinct_value_frequencies, histogram & sample_values. This also limits the maximum number of fields being profiled to 10. |
| `profiling.max_number_of_fields_to_profile` | | `None` | A positive integer that specifies the maximum number of columns to profile for any table. `None` implies all columns. The cost of profiling goes up significantly as the number of columns to profile goes up. |
| `profiling.profile_table_level_only` | | `False` | Whether to perform profiling at table-level only, or include column-level profiling as well. |
| `profiling.include_field_null_count` | | `True` | Whether to profile for the number of nulls for each column. |
| `profiling.include_field_min_value` | | `True` | Whether to profile for the min value of numeric columns. |
| `profiling.include_field_max_value` | | `True` | Whether to profile for the max value of numeric columns. |
| `profiling.include_field_mean_value` | | `True` | Whether to profile for the mean value of numeric columns. |
| `profiling.include_field_median_value` | | `True` | Whether to profile for the median value of numeric columns. |
| `profiling.include_field_stddev_value` | | `True` | Whether to profile for the standard deviation of numeric columns. |
| `profiling.include_field_quantiles` | | `True` | Whether to profile for the quantiles of numeric columns. |
| `profiling.include_field_distinct_value_frequencies` | | `True` | Whether to profile for distinct value frequencies. |
| `profiling.include_field_histogram` | | `True` | Whether to profile for the histogram for numeric fields. |
| `profiling.include_field_sample_values` | | `True` | Whether to profile for the sample values for all columns. |
## Compatibility ## Compatibility

View File

@ -0,0 +1,278 @@
import logging
from typing import Any, Dict
from great_expectations.core.expectation_suite import ExpectationSuite
from great_expectations.dataset.dataset import Dataset
from great_expectations.profile.base import ProfilerCardinality, ProfilerDataType
from great_expectations.profile.basic_dataset_profiler import BasicDatasetProfilerBase
logger = logging.getLogger(__name__)
class DatahubGECustomProfiler(BasicDatasetProfilerBase):
    """Customizable variant of the Great Expectations BasicDatasetProfiler.

    UserConfigurableProfiler is deliberately not used because it does not
    compute all of the expectations that BasicDatasetProfiler does, such as
    the sample values.

    The ``configuration`` dict handed to :meth:`_profile` must provide:

    * ``columns_to_profile``: the columns to build column-level expectations for.
    * ``excluded_expectations``: names of expectations that must not be added.
    """

    # flake8: noqa: C901

    @staticmethod
    def _expect_distinct_values_if_few(
        df: Dataset,
        column: str,
        cardinality: Any,
        excluded_expectations: Any,
    ) -> bool:
        """Add the distinct-values expectation for low-cardinality columns.

        Returns True when the expectation was added, i.e. the cardinality is
        ONE/TWO/VERY_FEW/FEW and the expectation is not excluded.
        """
        if cardinality in [
            ProfilerCardinality.ONE,
            ProfilerCardinality.TWO,
            ProfilerCardinality.VERY_FEW,
            ProfilerCardinality.FEW,
        ] and (
            "expect_column_distinct_values_to_be_in_set" not in excluded_expectations
        ):
            df.expect_column_distinct_values_to_be_in_set(
                column, value_set=None, result_format="SUMMARY"
            )
            return True
        return False

    @classmethod
    def _expect_unique_or_distinct(
        cls,
        df: Dataset,
        column: str,
        cardinality: Any,
        excluded_expectations: Any,
    ) -> bool:
        """Add the uniqueness or the distinct-values expectation, if applicable.

        Returns True when one of the two expectations was added.
        """
        if cardinality == ProfilerCardinality.UNIQUE:
            # The uniqueness expectation is not subject to the exclusion list.
            df.expect_column_values_to_be_unique(column)
            return True
        return cls._expect_distinct_values_if_few(
            df, column, cardinality, excluded_expectations
        )

    @staticmethod
    def _expect_min_max(
        df: Dataset, column: str, excluded_expectations: Any
    ) -> None:
        """Add the min and max expectations unless they are excluded."""
        if "expect_column_min_to_be_between" not in excluded_expectations:
            df.expect_column_min_to_be_between(column, min_value=None, max_value=None)
        if "expect_column_max_to_be_between" not in excluded_expectations:
            df.expect_column_max_to_be_between(column, min_value=None, max_value=None)

    @classmethod
    def _expect_numeric(
        cls,
        df: Dataset,
        column: str,
        cardinality: Any,
        excluded_expectations: Any,
        include_stdev: bool,
    ) -> None:
        """Add the expectations for an INT or FLOAT column.

        ``include_stdev`` is True for INT columns and False for FLOAT columns;
        apart from the standard deviation both types get the same expectations.
        """
        if cls._expect_unique_or_distinct(
            df, column, cardinality, excluded_expectations
        ):
            return

        # NOTE(review): UNIQUE is listed here but can never reach this point,
        # because unique columns are fully handled above. Unique numeric columns
        # therefore get no min/max/mean/... statistics - confirm this is intended.
        if cardinality not in [
            ProfilerCardinality.MANY,
            ProfilerCardinality.VERY_MANY,
            ProfilerCardinality.UNIQUE,
        ]:
            # unknown cardinality - skip
            return

        cls._expect_min_max(df, column, excluded_expectations)
        if "expect_column_mean_to_be_between" not in excluded_expectations:
            df.expect_column_mean_to_be_between(
                column, min_value=None, max_value=None
            )
        if "expect_column_median_to_be_between" not in excluded_expectations:
            df.expect_column_median_to_be_between(
                column, min_value=None, max_value=None
            )
        if (
            include_stdev
            and "expect_column_stdev_to_be_between" not in excluded_expectations
        ):
            df.expect_column_stdev_to_be_between(
                column, min_value=None, max_value=None
            )
        if "expect_column_quantile_values_to_be_between" not in excluded_expectations:
            df.expect_column_quantile_values_to_be_between(
                column,
                quantile_ranges={
                    "quantiles": [0.05, 0.25, 0.5, 0.75, 0.95],
                    "value_ranges": [
                        [None, None],
                        [None, None],
                        [None, None],
                        [None, None],
                        [None, None],
                    ],
                },
            )
        if "expect_column_kl_divergence_to_be_less_than" not in excluded_expectations:
            df.expect_column_kl_divergence_to_be_less_than(
                column,
                partition_object=None,
                threshold=None,
                result_format="COMPLETE",
            )

    @classmethod
    def _profile(
        cls,
        dataset: Dataset,
        configuration: Dict[str, Any],
    ) -> ExpectationSuite:
        """Build the expectation suite for ``dataset``.

        Table-level expectations are always added. Column-level expectations
        are added only for ``configuration["columns_to_profile"]``, skipping
        any expectation named in ``configuration["excluded_expectations"]``.
        """
        columns_to_profile = configuration["columns_to_profile"]
        excluded_expectations = configuration["excluded_expectations"]

        df = dataset
        df.set_default_expectation_argument("catch_exceptions", True)
        df.expect_table_row_count_to_be_between(min_value=0, max_value=None)
        df.expect_table_columns_to_match_ordered_list(None)
        # Only register the column expectations inside the loop; interactive
        # evaluation is switched back on once all of them have been added.
        df.set_config_value("interactive_evaluation", False)

        meta_columns = {column: {"description": ""} for column in columns_to_profile}

        number_of_columns = len(columns_to_profile)
        for i, column in enumerate(columns_to_profile):
            logger.info(
                " Preparing column {} of {}: {}".format(
                    i + 1, number_of_columns, column
                )
            )

            type_ = cls._get_column_type(df, column)
            cardinality = cls._get_column_cardinality(df, column)

            if "expect_column_values_to_not_be_null" not in excluded_expectations:
                df.expect_column_values_to_not_be_null(column, mostly=0.5)
            if "expect_column_values_to_be_in_set" not in excluded_expectations:
                df.expect_column_values_to_be_in_set(
                    column, [], result_format="SUMMARY"
                )

            if type_ == ProfilerDataType.INT:
                cls._expect_numeric(
                    df, column, cardinality, excluded_expectations, include_stdev=True
                )
            elif type_ == ProfilerDataType.FLOAT:
                cls._expect_numeric(
                    df, column, cardinality, excluded_expectations, include_stdev=False
                )
            elif type_ == ProfilerDataType.STRING:
                # Check for leading and trailing whitespace.
                df.expect_column_values_to_not_match_regex(column, r"^\s+|\s+$")
                cls._expect_unique_or_distinct(
                    df, column, cardinality, excluded_expectations
                )
            elif type_ == ProfilerDataType.DATETIME:
                cls._expect_min_max(df, column, excluded_expectations)
                # TODO: re-add expect_column_kl_divergence_to_be_less_than once
                # kl_divergence has been modified to support datetimes.
                cls._expect_distinct_values_if_few(
                    df, column, cardinality, excluded_expectations
                )
            else:
                cls._expect_unique_or_distinct(
                    df, column, cardinality, excluded_expectations
                )

        df.set_config_value("interactive_evaluation", True)
        expectation_suite = df.get_expectation_suite(
            suppress_warnings=True, discard_failed_expectations=False
        )
        expectation_suite.meta["columns"] = meta_columns

        return expectation_suite

View File

@ -1,13 +1,17 @@
import concurrent.futures import concurrent.futures
import contextlib import contextlib
import dataclasses import dataclasses
import itertools
import logging import logging
import os
import threading import threading
import time import time
import unittest.mock import unittest.mock
import uuid import uuid
from typing import Any, Iterable, Iterator, List, Optional, Tuple, Union from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, Union
import pydantic
from great_expectations.core import ExpectationSuite
from great_expectations.core.expectation_validation_result import ( from great_expectations.core.expectation_validation_result import (
ExpectationSuiteValidationResult, ExpectationSuiteValidationResult,
ExpectationValidationResult, ExpectationValidationResult,
@ -19,11 +23,15 @@ from great_expectations.data_context.types.base import (
InMemoryStoreBackendDefaults, InMemoryStoreBackendDefaults,
datasourceConfigSchema, datasourceConfigSchema,
) )
from great_expectations.dataset.dataset import Dataset
from great_expectations.datasource.sqlalchemy_datasource import SqlAlchemyDatasource from great_expectations.datasource.sqlalchemy_datasource import SqlAlchemyDatasource
from great_expectations.profile.base import DatasetProfiler
from sqlalchemy.engine import Connection, Engine from sqlalchemy.engine import Connection, Engine
from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.emitter.mce_builder import get_sys_time from datahub.emitter.mce_builder import get_sys_time
from datahub.ingestion.api.source import SourceReport from datahub.ingestion.source.datahub_custom_ge_profiler import DatahubGECustomProfiler
from datahub.ingestion.source.sql.sql_common import SQLSourceReport
from datahub.metadata.schema_classes import ( from datahub.metadata.schema_classes import (
DatasetFieldProfileClass, DatasetFieldProfileClass,
DatasetProfileClass, DatasetProfileClass,
@ -77,7 +85,186 @@ class GEProfilerRequest:
pretty_name: str pretty_name: str
batch_kwargs: dict batch_kwargs: dict
send_sample_values: bool
class GEProfilingConfig(ConfigModel):
    """Configuration of the Great Expectations based dataset profiler."""

    # Master switch; profiling is off by default.
    enabled: bool = False
    # Passed to the profiler batch as "limit" / "offset".
    limit: Optional[int] = None
    offset: Optional[int] = None

    # Coarse-grained switches; they are expanded into the fine-grained
    # include_field_* flags by the root validator below.
    turn_off_expensive_profiling_metrics: bool = False
    profile_table_level_only: bool = False

    # Fine-grained, per-metric switches for column-level profiling.
    include_field_null_count: bool = True
    include_field_min_value: bool = True
    include_field_max_value: bool = True
    include_field_mean_value: bool = True
    include_field_median_value: bool = True
    include_field_stddev_value: bool = True
    include_field_quantiles: bool = True
    include_field_distinct_value_frequencies: bool = True
    include_field_histogram: bool = True
    include_field_sample_values: bool = True

    # Matched against "<dataset_name>.<column>" to select the columns to profile.
    allow_deny_patterns: AllowDenyPattern = AllowDenyPattern.allow_all()
    # Upper bound on the number of columns profiled per table; None means no limit.
    max_number_of_fields_to_profile: Optional[pydantic.PositiveInt] = None

    # The default of (5 * cpu_count) is adopted from the default max_workers
    # parameter of ThreadPoolExecutor. Given that profiling is often an I/O-bound
    # task, it may make sense to increase this default value in the future.
    # https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
    max_workers: int = 5 * (os.cpu_count() or 4)

    @pydantic.root_validator()
    def ensure_field_level_settings_are_normalized(
        cls, values: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Expand the coarse-grained switches into the per-metric flags.

        * ``profile_table_level_only`` turns every ``include_field_*`` flag off
          and requires ``max_number_of_fields_to_profile`` to be unset.
        * ``turn_off_expensive_profiling_metrics`` turns the expensive
          field-level metrics off and, unless a limit was given explicitly,
          caps the number of profiled columns at 10.

        Raises:
            ValueError: if ``profile_table_level_only`` is combined with a
                ``max_number_of_fields_to_profile`` value. Pydantic reports it
                as a validation error.
        """
        max_num_fields_to_profile_key = "max_number_of_fields_to_profile"
        table_level_profiling_only_key = "profile_table_level_only"
        max_num_fields_to_profile = values.get(max_num_fields_to_profile_key)
        table_level_only = values.get(table_level_profiling_only_key)

        if table_level_only:
            all_field_level_metrics: List[str] = [
                "include_field_null_count",
                "include_field_min_value",
                "include_field_max_value",
                "include_field_mean_value",
                "include_field_median_value",
                "include_field_stddev_value",
                "include_field_quantiles",
                "include_field_distinct_value_frequencies",
                "include_field_histogram",
                "include_field_sample_values",
            ]
            # Suppress all field-level metrics
            for field_level_metric in all_field_level_metrics:
                values[field_level_metric] = False
            # Raise explicitly instead of using `assert`, so that the check is
            # still enforced when Python runs with -O.
            if max_num_fields_to_profile is not None:
                raise ValueError(
                    f"{max_num_fields_to_profile_key} should be set to None"
                )

        if values.get("turn_off_expensive_profiling_metrics"):
            if not table_level_only:
                expensive_field_level_metrics: List[str] = [
                    "include_field_quantiles",
                    "include_field_distinct_value_frequencies",
                    "include_field_histogram",
                    "include_field_sample_values",
                ]
                for expensive_field_metric in expensive_field_level_metrics:
                    values[expensive_field_metric] = False
            if max_num_fields_to_profile is None:
                # We currently profile up to 10 non-filtered columns in this mode by default.
                values[max_num_fields_to_profile_key] = 10

        return values
class DatahubConfigurableProfiler(DatasetProfiler):
    """
    Adapter between GEProfilingConfig and DatahubGECustomProfiler.

    It translates the DataHub profiling config into the configuration dict that
    DatahubGECustomProfiler expects and then delegates the actual profiling to it.
    The _profile call is intercepted here because applying the Allow/Deny patterns
    requires the dataset in order to compute the list of columns to profile.
    """

    @staticmethod
    def _get_excluded_expectations(config: GEProfilingConfig) -> List[str]:
        """Return the expectations whose include_field_* flag is switched off."""
        # (config flag, expectation excluded when the flag is False), in the
        # order in which the exclusions are reported.
        flag_to_expectation = (
            ("include_field_null_count", "expect_column_values_to_not_be_null"),
            ("include_field_min_value", "expect_column_min_to_be_between"),
            ("include_field_max_value", "expect_column_max_to_be_between"),
            ("include_field_mean_value", "expect_column_mean_to_be_between"),
            ("include_field_median_value", "expect_column_median_to_be_between"),
            ("include_field_stddev_value", "expect_column_stdev_to_be_between"),
            (
                "include_field_quantiles",
                "expect_column_quantile_values_to_be_between",
            ),
            (
                "include_field_distinct_value_frequencies",
                "expect_column_distinct_values_to_be_in_set",
            ),
            (
                "include_field_histogram",
                "expect_column_kl_divergence_to_be_less_than",
            ),
            ("include_field_sample_values", "expect_column_values_to_be_in_set"),
        )
        return [
            expectation
            for flag, expectation in flag_to_expectation
            if not getattr(config, flag)
        ]

    @staticmethod
    def _get_columns_to_profile(
        dataset: Dataset,
        dataset_name: str,
        config: GEProfilingConfig,
        report: SQLSourceReport,
    ) -> List[str]:
        """Select the columns of ``dataset`` that should be profiled.

        Columns rejected by the allow/deny patterns, and columns beyond
        max_number_of_fields_to_profile, are recorded on ``report`` as dropped.
        """
        if config.profile_table_level_only:
            return []

        selected: List[str] = []
        rejected: List[str] = []
        for column_name in dataset.get_table_columns():
            # We expect the allow/deny patterns to specify '<table_pattern>.<column_pattern>'
            is_allowed = config.allow_deny_patterns.allowed(
                f"{dataset_name}.{column_name}"
            )
            (selected if is_allowed else rejected).append(column_name)

        if rejected:
            report.report_dropped(
                f"The profile of columns by pattern {dataset_name}({', '.join(sorted(rejected))})"
            )

        if config.max_number_of_fields_to_profile is None:
            return selected

        # Split at the limit: the head is profiled, the tail is reported.
        overflow: List[str] = list(
            itertools.islice(selected, config.max_number_of_fields_to_profile, None)
        )
        kept: List[str] = list(
            itertools.islice(selected, config.max_number_of_fields_to_profile)
        )
        if overflow:
            report.report_dropped(
                f"The max_number_of_fields_to_profile={config.max_number_of_fields_to_profile} reached. Profile of columns {dataset_name}({', '.join(sorted(overflow))})"
            )
        return kept

    @staticmethod
    def datahub_config_to_ge_config(
        dataset: Dataset,
        dataset_name: str,
        config: GEProfilingConfig,
        report: SQLSourceReport,
    ) -> Dict[str, Any]:
        """Build the configuration dict consumed by DatahubGECustomProfiler."""
        exclusions = DatahubConfigurableProfiler._get_excluded_expectations(config)
        columns = DatahubConfigurableProfiler._get_columns_to_profile(
            dataset, dataset_name, config, report
        )
        return {
            "excluded_expectations": exclusions,
            "columns_to_profile": columns,
        }

    @classmethod
    def _profile(
        cls, dataset: Dataset, configuration: Dict[str, Any]
    ) -> ExpectationSuite:
        """
        Override method, which returns the expectation suite produced by DatahubGECustomProfiler.

        ``configuration`` must carry the keys "dataset_name", "config" and "report".
        """
        ge_configuration = cls.datahub_config_to_ge_config(
            dataset,
            configuration["dataset_name"],
            configuration["config"],
            configuration["report"],
        )
        return DatahubGECustomProfiler._profile(dataset, ge_configuration)
@dataclasses.dataclass @dataclasses.dataclass
@ -88,14 +275,21 @@ class GEContext:
@dataclasses.dataclass @dataclasses.dataclass
class DatahubGEProfiler: class DatahubGEProfiler:
report: SourceReport report: SQLSourceReport
config: GEProfilingConfig
# The actual value doesn't matter, it just matters that we use it consistently throughout. # The actual value doesn't matter, it just matters that we use it consistently throughout.
_datasource_name_base: str = "my_sqlalchemy_datasource" _datasource_name_base: str = "my_sqlalchemy_datasource"
def __init__(self, conn: Union[Engine, Connection], report: SourceReport): def __init__(
self,
conn: Union[Engine, Connection],
report: SQLSourceReport,
config: GEProfilingConfig,
):
self.base_engine = conn self.base_engine = conn
self.report = report self.report = report
self.config = config
@contextlib.contextmanager @contextlib.contextmanager
def _ge_context(self) -> Iterator[GEContext]: def _ge_context(self) -> Iterator[GEContext]:
@ -140,7 +334,7 @@ class DatahubGEProfiler:
def generate_profiles( def generate_profiles(
self, requests: List[GEProfilerRequest], max_workers: int self, requests: List[GEProfilerRequest], max_workers: int
) -> Iterable[Tuple[GEProfilerRequest, DatasetProfileClass]]: ) -> Iterable[Tuple[GEProfilerRequest, Optional[DatasetProfileClass]]]:
start_time = time.perf_counter() start_time = time.perf_counter()
max_workers = min(max_workers, len(requests)) max_workers = min(max_workers, len(requests))
@ -168,11 +362,10 @@ class DatahubGEProfiler:
def generate_profile_from_request( def generate_profile_from_request(
self, request: GEProfilerRequest self, request: GEProfilerRequest
) -> Tuple[GEProfilerRequest, DatasetProfileClass]: ) -> Tuple[GEProfilerRequest, Optional[DatasetProfileClass]]:
return request, self.generate_profile( return request, self.generate_profile(
request.pretty_name, request.pretty_name,
**request.batch_kwargs, **request.batch_kwargs,
send_sample_values=request.send_sample_values,
) )
def generate_profile( def generate_profile(
@ -180,11 +373,8 @@ class DatahubGEProfiler:
pretty_name: str, pretty_name: str,
schema: str = None, schema: str = None,
table: str = None, table: str = None,
limit: int = None,
offset: int = None,
send_sample_values: bool = True,
**kwargs: Any, **kwargs: Any,
) -> DatasetProfileClass: ) -> Optional[DatasetProfileClass]:
with self._ge_context() as ge_context: with self._ge_context() as ge_context:
logger.info(f"Profiling {pretty_name} (this may take a while)") logger.info(f"Profiling {pretty_name} (this may take a while)")
@ -193,15 +383,17 @@ class DatahubGEProfiler:
{ {
"schema": schema, "schema": schema,
"table": table, "table": table,
"limit": limit, "limit": self.config.limit,
"offset": offset, "offset": self.config.offset,
**kwargs, **kwargs,
}, },
pretty_name=pretty_name, pretty_name=pretty_name,
) )
profile = self._convert_evrs_to_profile( profile = (
evrs, pretty_name=pretty_name, send_sample_values=send_sample_values self._convert_evrs_to_profile(evrs, pretty_name=pretty_name)
if evrs is not None
else None
) )
logger.debug(f"Finished profiling {pretty_name}") logger.debug(f"Finished profiling {pretty_name}")
@ -213,21 +405,36 @@ class DatahubGEProfiler:
batch_kwargs: dict, batch_kwargs: dict,
pretty_name: str, pretty_name: str,
) -> ExpectationSuiteValidationResult: ) -> ExpectationSuiteValidationResult:
# Internally, this uses the GE dataset profiler: try:
# great_expectations.profile.basic_dataset_profiler.BasicDatasetProfiler start_time = time.perf_counter()
profile_results = ge_context.data_context.profile_data_asset(
ge_context.datasource_name,
profiler=DatahubConfigurableProfiler,
profiler_configuration={
"config": self.config,
"dataset_name": pretty_name,
"report": self.report,
},
batch_kwargs={
"datasource": ge_context.datasource_name,
**batch_kwargs,
},
)
end_time = time.perf_counter()
logger.info(
f"Profiling for {pretty_name} took {end_time - start_time} seconds."
)
profile_results = ge_context.data_context.profile_data_asset( assert profile_results["success"]
ge_context.datasource_name, assert len(profile_results["results"]) == 1
batch_kwargs={ _suite, evrs = profile_results["results"][0]
"datasource": ge_context.datasource_name, return evrs
**batch_kwargs, except Exception as e:
}, logger.warning(
) f"Encountered exception {e}\nwhile profiling {pretty_name}, {batch_kwargs}"
assert profile_results["success"] )
# The f-prefix is required; without it the report stores the literal text "{e}".
self.report.report_warning(pretty_name, f"Exception {e}")
assert len(profile_results["results"]) == 1 return None
_suite, evrs = profile_results["results"][0]
return evrs
@staticmethod @staticmethod
def _get_column_from_evr(evr: ExpectationValidationResult) -> Optional[str]: def _get_column_from_evr(evr: ExpectationValidationResult) -> Optional[str]:
@ -242,7 +449,6 @@ class DatahubGEProfiler:
self, self,
evrs: ExpectationSuiteValidationResult, evrs: ExpectationSuiteValidationResult,
pretty_name: str, pretty_name: str,
send_sample_values: bool,
) -> DatasetProfileClass: ) -> DatasetProfileClass:
profile = DatasetProfileClass(timestampMillis=get_sys_time()) profile = DatasetProfileClass(timestampMillis=get_sys_time())
@ -259,7 +465,6 @@ class DatahubGEProfiler:
col, col,
evrs_for_col, evrs_for_col,
pretty_name=pretty_name, pretty_name=pretty_name,
send_sample_values=send_sample_values,
) )
return profile return profile
@ -291,7 +496,6 @@ class DatahubGEProfiler:
column: str, column: str,
col_evrs: Iterable[ExpectationValidationResult], col_evrs: Iterable[ExpectationValidationResult],
pretty_name: str, pretty_name: str,
send_sample_values: bool,
) -> None: ) -> None:
# TRICKY: This method mutates the profile directly. # TRICKY: This method mutates the profile directly.
@ -346,8 +550,6 @@ class DatahubGEProfiler:
column_profile.sampleValues = [ column_profile.sampleValues = [
str(v) for v in res["partial_unexpected_list"] str(v) for v in res["partial_unexpected_list"]
] ]
if not send_sample_values:
column_profile.sampleValues = []
elif exp == "expect_column_kl_divergence_to_be_less_than": elif exp == "expect_column_kl_divergence_to_be_less_than":
if "details" in res and "observed_partition" in res["details"]: if "details" in res and "observed_partition" in res["details"]:
partition = res["details"]["observed_partition"] partition = res["details"]["observed_partition"]
@ -368,8 +570,6 @@ class DatahubGEProfiler:
ValueFrequencyClass(value=str(value), frequency=count) ValueFrequencyClass(value=str(value), frequency=count)
for value, count in res["details"]["value_counts"].items() for value, count in res["details"]["value_counts"].items()
] ]
if not send_sample_values:
column_profile.distinctValueFrequencies = []
elif exp == "expect_column_values_to_be_in_type_list": elif exp == "expect_column_values_to_be_in_type_list":
# ignore; we already know the types for each column via ingestion # ignore; we already know the types for each column via ingestion
pass pass

View File

@ -273,8 +273,6 @@ class BigQuerySource(SQLAlchemySource):
return dict( return dict(
schema=self.config.project_id, schema=self.config.project_id,
table=f"{schema}.{table}", table=f"{schema}.{table}",
limit=self.config.profiling.limit,
offset=self.config.profiling.offset,
) )
@staticmethod @staticmethod

View File

@ -1,5 +1,4 @@
import logging import logging
import os
from abc import abstractmethod from abc import abstractmethod
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import ( from typing import (
@ -21,12 +20,7 @@ from sqlalchemy import create_engine, inspect
from sqlalchemy.engine.reflection import Inspector from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.sql import sqltypes as types from sqlalchemy.sql import sqltypes as types
from datahub import __package_name__ from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.common import (
AllowDenyPattern,
ConfigModel,
ConfigurationError,
)
from datahub.emitter.mce_builder import DEFAULT_ENV from datahub.emitter.mce_builder import DEFAULT_ENV
from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.common import PipelineContext
@ -144,19 +138,6 @@ class SQLSourceReport(SourceReport):
self.filtered.append(ent_name) self.filtered.append(ent_name)
class GEProfilingConfig(ConfigModel):
enabled: bool = False
limit: Optional[int] = None
offset: Optional[int] = None
send_sample_values: bool = True
# The default of (5 * cpu_count) is adopted from the default max_workers
# parameter of ThreadPoolExecutor. Given that profiling is often an I/O-bound
# task, it may make sense to increase this default value in the future.
# https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
max_workers: int = 5 * (os.cpu_count() or 4)
class SQLAlchemyConfig(ConfigModel): class SQLAlchemyConfig(ConfigModel):
env: str = DEFAULT_ENV env: str = DEFAULT_ENV
options: dict = {} options: dict = {}
@ -172,8 +153,19 @@ class SQLAlchemyConfig(ConfigModel):
include_views: Optional[bool] = True include_views: Optional[bool] = True
include_tables: Optional[bool] = True include_tables: Optional[bool] = True
from datahub.ingestion.source.ge_data_profiler import GEProfilingConfig
profiling: GEProfilingConfig = GEProfilingConfig() profiling: GEProfilingConfig = GEProfilingConfig()
@pydantic.root_validator()
def ensure_profiling_pattern_is_passed_to_profiling(
    cls, values: Dict[str, Any]
) -> Dict[str, Any]:
    """Copy the source-level ``profile_pattern`` into the profiling config.

    The copy is made only when profiling is enabled, so that the profiler can
    apply the same allow/deny patterns.
    """
    profiling = values.get("profiling")
    # A field that failed its own validation is absent from `values`. Look it up
    # with .get() so pydantic reports that error, not a KeyError raised here.
    profile_pattern = values.get("profile_pattern")
    if (
        profiling is not None
        and profiling.enabled
        and profile_pattern is not None
    ):
        profiling.allow_deny_patterns = profile_pattern
    return values
@abstractmethod @abstractmethod
def get_sql_alchemy_url(self): def get_sql_alchemy_url(self):
pass pass
@ -316,12 +308,6 @@ class SQLAlchemySource(Source):
self.platform = platform self.platform = platform
self.report = SQLSourceReport() self.report = SQLSourceReport()
if self.config.profiling.enabled and not self._can_run_profiler():
raise ConfigurationError(
"Table profiles requested but profiler plugin is not enabled. "
f"Try running: pip install '{__package_name__}[sql-profiles]'"
)
def get_inspectors(self) -> Iterable[Inspector]: def get_inspectors(self) -> Iterable[Inspector]:
# This method can be overridden in the case that you want to dynamically # This method can be overridden in the case that you want to dynamically
# run on multiple databases. # run on multiple databases.
@ -608,20 +594,12 @@ class SQLAlchemySource(Source):
self.report.report_workunit(wu) self.report.report_workunit(wu)
yield wu yield wu
def _can_run_profiler(self) -> bool:
try:
from datahub.ingestion.source.ge_data_profiler import ( # noqa: F401
DatahubGEProfiler,
)
return True
except Exception:
return False
def _get_profiler_instance(self, inspector: Inspector) -> "DatahubGEProfiler": def _get_profiler_instance(self, inspector: Inspector) -> "DatahubGEProfiler":
from datahub.ingestion.source.ge_data_profiler import DatahubGEProfiler from datahub.ingestion.source.ge_data_profiler import DatahubGEProfiler
return DatahubGEProfiler(conn=inspector.bind, report=self.report) return DatahubGEProfiler(
conn=inspector.bind, report=self.report, config=self.config.profiling
)
def loop_profiler_requests( def loop_profiler_requests(
self, self,
@ -647,7 +625,6 @@ class SQLAlchemySource(Source):
yield GEProfilerRequest( yield GEProfilerRequest(
pretty_name=dataset_name, pretty_name=dataset_name,
batch_kwargs=self.prepare_profiler_args(schema=schema, table=table), batch_kwargs=self.prepare_profiler_args(schema=schema, table=table),
send_sample_values=self.config.profiling.send_sample_values,
) )
def loop_profiler( def loop_profiler(
@ -656,6 +633,8 @@ class SQLAlchemySource(Source):
for request, profile in profiler.generate_profiles( for request, profile in profiler.generate_profiles(
profile_requests, self.config.profiling.max_workers profile_requests, self.config.profiling.max_workers
): ):
if profile is None:
continue
dataset_name = request.pretty_name dataset_name = request.pretty_name
mcp = MetadataChangeProposalWrapper( mcp = MetadataChangeProposalWrapper(
entityType="dataset", entityType="dataset",
@ -672,8 +651,6 @@ class SQLAlchemySource(Source):
return dict( return dict(
schema=schema, schema=schema,
table=table, table=table,
limit=self.config.profiling.limit,
offset=self.config.profiling.offset,
) )
def get_report(self): def get_report(self):