From 328658ebeaa36e5f9de5d4d562a7adaf020bd25a Mon Sep 17 00:00:00 2001
From: Suresh Srinivas
Date: Sat, 28 Aug 2021 13:09:07 -0700
Subject: [PATCH] [WIP] profiler

---
 ingestion/examples/workflows/redshift.json    |   6 +-
 ingestion/pipelines/mysql.json                |   3 +-
 ingestion/requirements.txt                    |   3 +-
 ingestion/setup.py                            |   1 +
 .../ingestion/models/table_metadata.py        |  47 +++
 .../metadata/ingestion/source/sql_source.py   |  39 ++-
 ingestion/src/metadata/utils/dataprofiler.py  | 270 ++++++++++++++++++
 7 files changed, 360 insertions(+), 9 deletions(-)
 create mode 100644 ingestion/src/metadata/utils/dataprofiler.py

diff --git a/ingestion/examples/workflows/redshift.json b/ingestion/examples/workflows/redshift.json
index fd22db30e30..d71b831d36a 100644
--- a/ingestion/examples/workflows/redshift.json
+++ b/ingestion/examples/workflows/redshift.json
@@ -2,9 +2,9 @@
   "source": {
     "type": "redshift",
     "config": {
-      "host_port": "cluster.name.region.redshift.amazonaws.com:5439",
-      "username": "username",
-      "password": "strong_password",
+      "host_port": "cluster.name.region.redshift.amazonaws.com:5439",
+      "username": "username",
+      "password": "strong_password",
       "database": "warehouse",
       "service_name": "aws_redshift",
       "filter_pattern": {
diff --git a/ingestion/pipelines/mysql.json b/ingestion/pipelines/mysql.json
index 2b084c4cceb..0f1c54fa026 100644
--- a/ingestion/pipelines/mysql.json
+++ b/ingestion/pipelines/mysql.json
@@ -4,9 +4,10 @@
     "config": {
       "username": "openmetadata_user",
       "password": "openmetadata_password",
+      "database": "openmetadata_db",
       "service_name": "local_mysql",
       "filter_pattern": {
-        "excludes": ["mysql.*", "information_schema.*"]
+        "excludes": ["mysql.*", "information_schema.*", "performance_schema.*", "sys.*"]
       }
     }
   },
diff --git a/ingestion/requirements.txt b/ingestion/requirements.txt
index 07ad3a89aad..7cff288849a 100644
--- a/ingestion/requirements.txt
+++ b/ingestion/requirements.txt
@@ -16,4 +16,5 @@ confluent_kafka>=1.5.0
 fastavro>=1.2.0
 google~=3.0.0
 okta~=2.0.0
-PyMySQL~=1.0.2
\ No newline at end of file
+PyMySQL~=1.0.2
+great-expectations>=0.13.31
\ No newline at end of file
diff --git a/ingestion/setup.py b/ingestion/setup.py
index a30f0e3b667..0111d15a67a 100644
--- a/ingestion/setup.py
+++ b/ingestion/setup.py
@@ -64,6 +64,7 @@ base_requirements = {
     "sql-metadata~=2.0.0",
     "spacy==3.0.5",
     "requests~=2.25.1",
+    "great-expectations>=0.13.31",
    "en_core_web_sm@https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.0.0/en_core_web_sm-3.0.0.tar.gz#egg=en_core_web"
 }
 base_plugins = {
diff --git a/ingestion/src/metadata/ingestion/models/table_metadata.py b/ingestion/src/metadata/ingestion/models/table_metadata.py
index 6605471de76..eb01aa14842 100644
--- a/ingestion/src/metadata/ingestion/models/table_metadata.py
+++ b/ingestion/src/metadata/ingestion/models/table_metadata.py
@@ -258,3 +258,50 @@ class Dashboard(BaseModel):
     charts: List[str]
     service: EntityReference
     lastModified: int = None
+
+
+class ValueFrequency(BaseModel):
+    """Profiler ValueFrequency"""
+    value: str
+    frequency: int
+
+
+class Histogram(BaseModel):
+    """Histogram"""
+    boundaries: List[str]
+    heights: List[str]
+
+
+class Quantile(BaseModel):
+    """Quantile"""
+    quantile: str
+    value: str
+
+
+class DatasetColumnProfile(BaseModel):
+    """Dataset Column Profile stats"""
+    fqdn: str
+    unique_count: int = None
+    unique_proportion: float = None
+    null_count: int = None
+    null_proportion: float = None
+    min: str = None
+    max: str = None
+    mean: str = None
+    median: str = None
+    stddev: str = None
+    quantiles: List[Quantile] = None
+    distinct_value_frequencies: List[ValueFrequency] = None
+    histogram: List[Histogram] = None
+    sample_values: List[str] = None
+
+
+class DatasetProfile(BaseModel):
+    """Dataset(table) stats"""
+    timestamp: int
+    table_name: str
+    row_count: int = None
+    col_count: int = None
+    col_profiles: List[DatasetColumnProfile] = None
+
+
diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py
index b0f680a1f9b..d491dd36b50 100644
--- a/ingestion/src/metadata/ingestion/source/sql_source.py
+++ b/ingestion/src/metadata/ingestion/source/sql_source.py
@@ -29,7 +29,7 @@ from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.generated.schema.entity.data.database import Database
 from metadata.generated.schema.entity.data.table import Table, Column, ColumnConstraint, TableType, TableData
 
-from sqlalchemy import create_engine, inspect
+from sqlalchemy import create_engine
 from sqlalchemy.engine.reflection import Inspector
 from sqlalchemy.sql import sqltypes as types
 from sqlalchemy.inspection import inspect
@@ -37,8 +37,10 @@ from sqlalchemy.inspection import inspect
 from metadata.ingestion.api.common import IncludeFilterPattern, ConfigModel, Record
 from metadata.ingestion.api.common import WorkflowContext
 from metadata.ingestion.api.source import Source, SourceStatus
+from metadata.ingestion.models.table_metadata import DatasetProfile
 from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
 from metadata.utils.helpers import get_database_service_or_create
+from metadata.utils.dataprofiler import DataProfiler
 
 
 logger: logging.Logger = logging.getLogger(__name__)
@@ -72,6 +74,7 @@ class SQLConnectionConfig(ConfigModel):
     include_views: Optional[bool] = True
     include_tables: Optional[bool] = True
     generate_sample_data: Optional[bool] = True
+    data_profiler_enabled: Optional[bool] = True
     filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
 
     @abstractmethod
@@ -152,8 +155,10 @@ def get_column_type(status: SQLSourceStatus, dataset_name: str, column_type: Any
     return type_class
 
 
-class SQLSource(Source):
+
+
+class SQLSource(Source):
 
     def __init__(self, config: SQLConnectionConfig, metadata_config: MetadataServerConfig,
                  ctx: WorkflowContext):
         super().__init__(ctx)
@@ -172,6 +177,9 @@ class SQLSource(Source):
     def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext):
         pass
 
+    def _get_profiler_instance(self, inspector: Inspector) -> DataProfiler:
+        return DataProfiler(conn=inspector.bind, status=self.status)
+
     def standardize_schema_table_names(
         self, schema: str, table: str
     ) -> Tuple[str, str]:
@@ -211,7 +219,7 @@
                 schema, table_name = self.standardize_schema_table_names(schema, table_name)
                 if not self.sql_config.filter_pattern.included(table_name):
                     self.status.filter('{}.{}'.format(self.config.get_service_name(), table_name),
-                                       "Table pattern not allowed")
+                                   "Table pattern not allowed")
                     continue
                 self.status.scanned('{}.{}'.format(self.config.get_service_name(), table_name))
 
@@ -227,6 +235,11 @@
                     table_data = self.fetch_sample_data(schema, table_name)
                     table_entity.sampleData = table_data
+                if self.config.data_profiler_enabled:
+                    data_profiler = self._get_profiler_instance(inspector)
+                    profile = self.run_data_profiler(data_profiler, table_name, schema)
+                    logger.info(profile.json())
+
                 table_and_db = OMetaDatabaseAndTable(table=table_entity,
                                                     database=self._get_database(schema))
                 yield table_and_db
             except ValidationError as err:
@@ -241,7 +254,7 @@
             try:
                 if not self.sql_config.filter_pattern.included(view_name):
                     self.status.filter('{}.{}'.format(self.config.get_service_name(), view_name),
-                                       "View pattern not allowed")
+                                   "View pattern not allowed")
                     continue
                 try:
                     view_definition = inspector.get_view_definition(view_name, schema)
@@ -324,6 +337,24 @@
             description = table_info["text"]
         return description
 
+    def run_data_profiler(
+        self,
+        profiler: DataProfiler,
+        table: str,
+        schema: str
+    ) -> DatasetProfile:
+        dataset_name = f"{schema}.{table}"
+        self.status.scanned(f"profile of {dataset_name}")
+        logger.info(f"Profiling {dataset_name} (this may take a while)")
+        profile = profiler.generate_profile(
+            pretty_name=dataset_name,
+            schema=schema,
+            table=table,
+            limit=50000,
+            offset=0)
+        logger.debug(f"Finished profiling {dataset_name}")
+        return profile
+
     def close(self):
         if self.connection is not None:
             self.connection.close()
diff --git a/ingestion/src/metadata/utils/dataprofiler.py b/ingestion/src/metadata/utils/dataprofiler.py
new file mode 100644
index 00000000000..359598bd22c
--- /dev/null
+++ b/ingestion/src/metadata/utils/dataprofiler.py
@@ -0,0 +1,270 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import collections
+import contextlib
+import dataclasses
+import logging
+import unittest.mock
+import time
+from typing import Any, Iterable, Optional, Callable, Tuple, TypeVar
+
+from great_expectations.core.expectation_validation_result import (
+    ExpectationSuiteValidationResult,
+    ExpectationValidationResult,
+)
+from great_expectations.data_context import BaseDataContext
+from great_expectations.data_context.types.base import (
+    DataContextConfig,
+    DatasourceConfig,
+    InMemoryStoreBackendDefaults,
+)
+from great_expectations.datasource.sqlalchemy_datasource import SqlAlchemyDatasource
+
+from metadata.ingestion.api.source import SourceStatus
+from metadata.ingestion.models.table_metadata import DatasetProfile, DatasetColumnProfile, Quantile, \
+    Histogram, ValueFrequency
+
+logger: logging.Logger = logging.getLogger(__name__)
+
+T = TypeVar("T")
+K = TypeVar("K")
+
+
+def groupby_unsorted(
+    iterable: Iterable[T], key: Callable[[T], K]
+) -> Iterable[Tuple[K, Iterable[T]]]:
+    """The default itertools.groupby() requires that the iterable is already sorted by the key.
+    This method is similar to groupby() but without the pre-sorted requirement."""
+    values = collections.defaultdict(list)
+    for v in iterable:
+        values[key(v)].append(v)
+    return values.items()
+
+
+@contextlib.contextmanager
+def _properly_init_datasource(conn):
+    underlying_datasource_init = SqlAlchemyDatasource.__init__
+
+    def sqlalchemy_datasource_init(
+        self: SqlAlchemyDatasource, *args: Any, **kwargs: Any
+    ) -> None:
+        underlying_datasource_init(self, *args, **kwargs, engine=conn)
+        self.drivername = conn.dialect.name
+        del self._datasource_config["engine"]
+
+    with unittest.mock.patch(
+        "great_expectations.datasource.sqlalchemy_datasource.SqlAlchemyDatasource.__init__",
+        sqlalchemy_datasource_init,
+    ), unittest.mock.patch(
+        "great_expectations.data_context.store.validations_store.ValidationsStore.set"
+    ):
+        yield
+
+
+@dataclasses.dataclass
+class DataProfiler:
+    data_context: BaseDataContext
+    status: SourceStatus
+    datasource_name: str = "om_sqlalchemy_datasource"
+
+    def __init__(self, conn, status):
+        self.conn = conn
+        self.status = status
+
+        data_context_config = DataContextConfig(
+            datasources={
+                self.datasource_name: DatasourceConfig(
+                    class_name="SqlAlchemyDatasource",
+                    credentials={
+                        "url": self.conn.engine.url,
+                    },
+                )
+            },
+            store_backend_defaults=InMemoryStoreBackendDefaults(),
+            anonymous_usage_statistics={
+                "enabled": False,
+            },
+        )
+
+        with _properly_init_datasource(self.conn):
+            self.data_context = BaseDataContext(project_config=data_context_config)
+
+    def generate_profile(
+        self,
+        pretty_name: str,
+        schema: str = None,
+        table: str = None,
+        limit: int = None,
+        offset: int = None,
+        **kwargs: Any,
+    ) -> DatasetProfile:
+        with _properly_init_datasource(self.conn):
+            evrs = self._profile_data_asset(
+                {
+                    "schema": schema,
+                    "table": table,
+                    "limit": limit,
+                    "offset": offset,
+                    **kwargs,
+                },
+                pretty_name=pretty_name,
+            )
+            profile = self._convert_evrs_to_profile(evrs, pretty_name=pretty_name)
+            return profile
+
+    def _profile_data_asset(
+        self,
+        batch_kwargs: dict,
+        pretty_name: str,
+    ) -> ExpectationSuiteValidationResult:
+        # Internally, this uses the GE dataset profiler:
+        # great_expectations.profile.basic_dataset_profiler.BasicDatasetProfiler
+
+        profile_results = self.data_context.profile_data_asset(
+            self.datasource_name,
+            batch_kwargs={
+                "datasource": self.datasource_name,
+                **batch_kwargs,
+            },
+        )
+        assert profile_results["success"]
+
+        assert len(profile_results["results"]) == 1
+        _suite, evrs = profile_results["results"][0]
+        return evrs
+
+    @staticmethod
+    def _get_column_from_evr(evr: ExpectationValidationResult) -> Optional[str]:
+        return evr.expectation_config.kwargs.get("column")
+
+    def _convert_evrs_to_profile(
+        self, evrs: ExpectationSuiteValidationResult, pretty_name: str
+    ) -> DatasetProfile:
+        profile = None
+        column_profiles = []
+        for col, evrs_for_col in groupby_unsorted(
+            evrs.results, key=self._get_column_from_evr
+        ):
+            if col is None:
+                profile = self._handle_convert_table_evrs(evrs_for_col, pretty_name=pretty_name)
+            else:
+                column_profile = self._handle_convert_column_evrs(col, evrs_for_col, pretty_name=pretty_name)
+                column_profiles.append(column_profile)
+
+        if profile is not None:
+            profile.col_profiles = column_profiles
+        return profile
+
+    def _handle_convert_table_evrs(
+        self,
+        table_evrs: Iterable[ExpectationValidationResult],
+        pretty_name: str,
+    ) -> DatasetProfile:
+        logger.info("generating table stats")
+        profile = DatasetProfile(timestamp=round(time.time() * 1000),
+                                 table_name=pretty_name)
+        for evr in table_evrs:
+            exp: str = evr.expectation_config.expectation_type
+            res: dict = evr.result
+            if exp == "expect_table_row_count_to_be_between":
+                profile.row_count = res["observed_value"]
+            elif exp == "expect_table_columns_to_match_ordered_list":
+                profile.col_count = len(res["observed_value"])
+            else:
+                self.status.warning(
+                    f"profile of {pretty_name}", f"unknown table mapper {exp}"
+                )
+        return profile
+
+    def _handle_convert_column_evrs(
+        self,
+        column: str,
+        col_evrs: Iterable[ExpectationValidationResult],
+        pretty_name: str,
+    ) -> DatasetColumnProfile:
+        logger.info(f"Generating column stats for {column}")
+        column_profile = DatasetColumnProfile(fqdn=column)
+        for evr in col_evrs:
+            exp: str = evr.expectation_config.expectation_type
+            res: dict = evr.result
+            if not res:
+                self.status.warning(
+                    f"profile of {pretty_name}", f"{exp} did not yield any results"
+                )
+                continue
+
+            if exp == "expect_column_unique_value_count_to_be_between":
+                column_profile.unique_count = res["observed_value"]
+            elif exp == "expect_column_proportion_of_unique_values_to_be_between":
+                column_profile.unique_proportion = res["observed_value"]
+            elif exp == "expect_column_values_to_not_be_null":
+                column_profile.null_count = res["unexpected_count"]
+                if (
+                    "unexpected_percent" in res
+                    and res["unexpected_percent"] is not None
+                ):
+                    column_profile.null_proportion = res["unexpected_percent"] / 100
+            elif exp == "expect_column_values_to_not_match_regex":
+                pass
+            elif exp == "expect_column_mean_to_be_between":
+                column_profile.mean = str(res["observed_value"])
+            elif exp == "expect_column_min_to_be_between":
+                column_profile.min = str(res["observed_value"])
+            elif exp == "expect_column_max_to_be_between":
+                column_profile.max = str(res["observed_value"])
+            elif exp == "expect_column_median_to_be_between":
+                column_profile.median = str(res["observed_value"])
+            elif exp == "expect_column_stdev_to_be_between":
+                column_profile.stddev = str(res["observed_value"])
+            elif exp == "expect_column_quantile_values_to_be_between":
+                if "observed_value" in res:
+                    column_profile.quantiles = [
+                        Quantile(quantile=str(quantile), value=str(value))
+                        for quantile, value in zip(
+                            res["observed_value"]["quantiles"],
+                            res["observed_value"]["values"],
+                        )
+                    ]
+            elif exp == "expect_column_values_to_be_in_set":
+                column_profile.sample_values = [
+                    str(v) for v in res["partial_unexpected_list"]
+                ]
+            elif exp == "expect_column_kl_divergence_to_be_less_than":
+                if "details" in res and "observed_partition" in res["details"]:
+                    partition = res["details"]["observed_partition"]
+                    column_profile.histogram = [Histogram(
+                        boundaries=[str(v) for v in partition["bins"]],
+                        heights=[
+                            str(partition["tail_weights"][0]),
+                            *[str(w) for w in partition["weights"]],
+                            str(partition["tail_weights"][1]),
+                        ],
+                    )]
+            elif exp == "expect_column_distinct_values_to_be_in_set":
+                if "details" in res and "value_counts" in res["details"]:
+                    column_profile.distinct_value_frequencies = [
+                        ValueFrequency(value=str(value), frequency=count)
+                        for value, count in res["details"]["value_counts"].items()
+                    ]
+            elif exp == "expect_column_values_to_be_in_type_list":
+                pass
+            elif exp == "expect_column_values_to_be_unique":
+                pass
+            else:
+                self.status.warning(
+                    f"profile of {pretty_name}",
+                    f"unknown column mapper {exp} in col {column}",
+                )
+        return column_profile
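
Usage sketch (supplementary note, not applied by this patch): the profiler is wired into
SQLSource and runs by default, since data_profiler_enabled defaults to true in
SQLConnectionConfig; setting "data_profiler_enabled": false in a source config such as
ingestion/pipelines/mysql.json skips it. The snippet below shows one way DataProfiler
could be driven directly, outside of SQLSource. The connection URL, schema and table
names are placeholders, and SQLSourceStatus is assumed to be constructible with no
arguments and to expose the scanned()/warning() hooks the profiler calls on its status
object.

    from sqlalchemy import create_engine

    from metadata.ingestion.source.sql_source import SQLSourceStatus
    from metadata.utils.dataprofiler import DataProfiler

    # Placeholder connection string; reuses the local MySQL credentials from
    # ingestion/pipelines/mysql.json. Host and port are assumptions.
    engine = create_engine(
        "mysql+pymysql://openmetadata_user:openmetadata_password@localhost:3306/openmetadata_db"
    )

    # DataProfiler builds an in-memory great_expectations context around the engine,
    # the same way SQLSource passes inspector.bind.
    profiler = DataProfiler(conn=engine, status=SQLSourceStatus())

    # Profile a single (hypothetical) table, mirroring SQLSource.run_data_profiler().
    profile = profiler.generate_profile(
        pretty_name="openmetadata_db.my_table",
        schema="openmetadata_db",
        table="my_table",
        limit=50000,
        offset=0,
    )
    print(profile.json())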