From b3087d08b93e5a7d8ee68d43dc255d7097db2998 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Wed, 30 Mar 2022 08:54:27 +0200 Subject: [PATCH] Fix #3522 - Add timeout to profiler (#3707) Fix #3522 - Add timeout to profiler (#3707) --- .../metadata/ingestion/source/sql_source.py | 2 +- .../src/metadata/orm_profiler/api/models.py | 2 +- .../orm_profiler/processor/orm_profiler.py | 12 +- .../{profiles => profiler}/README.md | 0 .../{profiles => profiler}/__init__.py | 0 .../{profiles => profiler}/core.py | 96 ++++------- .../{profiles => profiler}/default.py | 55 ++++--- .../{profiles => profiler}/models.py | 11 +- .../metadata/orm_profiler/profiler/runner.py | 71 ++++++++ .../metadata/orm_profiler/profiler/sampler.py | 62 +++++++ .../orm_profiler/validations/utils.py | 2 +- ingestion/src/metadata/utils/constants.py | 1 + ingestion/src/metadata/utils/timeout.py | 73 +++++++++ .../orm_profiler/test_orm_profiler.py | 1 + ingestion/tests/unit/profiler/test_metrics.py | 2 +- .../tests/unit/profiler/test_profiler.py | 4 +- ingestion/tests/unit/profiler/test_runner.py | 154 ++++++++++++++++++ ingestion/tests/unit/profiler/test_sample.py | 13 +- .../tests/unit/profiler/test_workflow.py | 4 +- 19 files changed, 466 insertions(+), 99 deletions(-) rename ingestion/src/metadata/orm_profiler/{profiles => profiler}/README.md (100%) rename ingestion/src/metadata/orm_profiler/{profiles => profiler}/__init__.py (100%) rename ingestion/src/metadata/orm_profiler/{profiles => profiler}/core.py (85%) rename ingestion/src/metadata/orm_profiler/{profiles => profiler}/default.py (58%) rename ingestion/src/metadata/orm_profiler/{profiles => profiler}/models.py (82%) create mode 100644 ingestion/src/metadata/orm_profiler/profiler/runner.py create mode 100644 ingestion/src/metadata/orm_profiler/profiler/sampler.py create mode 100644 ingestion/src/metadata/utils/timeout.py create mode 100644 ingestion/tests/unit/profiler/test_runner.py diff --git 
a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py index 2feff7988a9..449527ea1d6 100644 --- a/ingestion/src/metadata/ingestion/source/sql_source.py +++ b/ingestion/src/metadata/ingestion/source/sql_source.py @@ -53,7 +53,7 @@ from metadata.ingestion.source.sql_source_common import ( SQLSourceStatus, ) from metadata.orm_profiler.orm.converter import ometa_to_orm -from metadata.orm_profiler.profiles.default import DefaultProfiler +from metadata.orm_profiler.profiler.default import DefaultProfiler from metadata.utils.column_type_parser import ColumnTypeParser from metadata.utils.engines import create_and_bind_session, get_engine from metadata.utils.helpers import get_database_service_or_create, ingest_lineage diff --git a/ingestion/src/metadata/orm_profiler/api/models.py b/ingestion/src/metadata/orm_profiler/api/models.py index 045156979fe..e36cf9a5159 100644 --- a/ingestion/src/metadata/orm_profiler/api/models.py +++ b/ingestion/src/metadata/orm_profiler/api/models.py @@ -19,7 +19,7 @@ from typing import Optional from metadata.config.common import ConfigModel from metadata.generated.schema.entity.data.table import Table, TableProfile -from metadata.orm_profiler.profiles.models import ProfilerDef +from metadata.orm_profiler.profiler.models import ProfilerDef from metadata.orm_profiler.validations.models import TestDef, TestSuite diff --git a/ingestion/src/metadata/orm_profiler/processor/orm_profiler.py b/ingestion/src/metadata/orm_profiler/processor/orm_profiler.py index 42ee5ed4c49..7d6522ff74b 100644 --- a/ingestion/src/metadata/orm_profiler/processor/orm_profiler.py +++ b/ingestion/src/metadata/orm_profiler/processor/orm_profiler.py @@ -36,8 +36,8 @@ from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig from metadata.orm_profiler.api.models import ProfilerProcessorConfig, ProfilerResponse from metadata.orm_profiler.metrics.registry import Metrics from 
metadata.orm_profiler.orm.converter import ometa_to_orm -from metadata.orm_profiler.profiles.core import Profiler -from metadata.orm_profiler.profiles.default import DefaultProfiler +from metadata.orm_profiler.profiler.core import Profiler +from metadata.orm_profiler.profiler.default import DefaultProfiler, get_default_metrics from metadata.orm_profiler.validations.core import validate from metadata.orm_profiler.validations.models import TestDef @@ -147,7 +147,12 @@ class OrmProfilerProcessor(Processor[Table]): ) # Here we will need to add the logic to pass kwargs to the metrics - metrics = [Metrics.get(name) for name in self.config.profiler.metrics] + # TODO: add_props when needed for incoming metrics + metrics = ( + [Metrics.get(name) for name in self.config.profiler.metrics] + if self.config.profiler.metrics + else get_default_metrics(orm) + ) return Profiler( *metrics, @@ -155,6 +160,7 @@ class OrmProfilerProcessor(Processor[Table]): table=orm, profile_date=self.execution_date, profile_sample=profile_sample, + timeout_seconds=self.config.profiler.timeout_seconds, ) def profile_entity(self, orm: DeclarativeMeta, table: Table) -> TableProfile: diff --git a/ingestion/src/metadata/orm_profiler/profiles/README.md b/ingestion/src/metadata/orm_profiler/profiler/README.md similarity index 100% rename from ingestion/src/metadata/orm_profiler/profiles/README.md rename to ingestion/src/metadata/orm_profiler/profiler/README.md diff --git a/ingestion/src/metadata/orm_profiler/profiles/__init__.py b/ingestion/src/metadata/orm_profiler/profiler/__init__.py similarity index 100% rename from ingestion/src/metadata/orm_profiler/profiles/__init__.py rename to ingestion/src/metadata/orm_profiler/profiler/__init__.py diff --git a/ingestion/src/metadata/orm_profiler/profiles/core.py b/ingestion/src/metadata/orm_profiler/profiler/core.py similarity index 85% rename from ingestion/src/metadata/orm_profiler/profiles/core.py rename to ingestion/src/metadata/orm_profiler/profiler/core.py 
index ea7f9ab1d3a..31d61b6d234 100644 --- a/ingestion/src/metadata/orm_profiler/profiles/core.py +++ b/ingestion/src/metadata/orm_profiler/profiler/core.py @@ -12,6 +12,7 @@ """ Main Profile definition and queries to execute """ +import traceback from datetime import datetime from typing import Any, Dict, Generic, List, Optional, Tuple, Type, Union @@ -30,9 +31,12 @@ from metadata.orm_profiler.metrics.core import ( StaticMetric, ) from metadata.orm_profiler.metrics.static.row_count import RowCount -from metadata.orm_profiler.orm.functions.random_num import RandomNumFn from metadata.orm_profiler.orm.registry import NOT_COMPUTE +from metadata.orm_profiler.profiler.runner import QueryRunner +from metadata.orm_profiler.profiler.sampler import Sampler from metadata.orm_profiler.utils import logger +from metadata.utils.constants import TEN_MIN +from metadata.utils.timeout import cls_timeout logger = logger() @@ -63,6 +67,7 @@ class Profiler(Generic[MetricType]): ignore_cols: Optional[List[str]] = None, use_cols: Optional[List[Column]] = None, profile_sample: Optional[float] = None, + timeout_seconds: Optional[int] = TEN_MIN, ): """ :param metrics: Metrics to run. 
We are receiving the uninitialized classes @@ -81,9 +86,10 @@ class Profiler(Generic[MetricType]): self._ignore_cols = ignore_cols self._use_cols = use_cols self._profile_sample = profile_sample - self._profile_date = profile_date + self.validate_composed_metric() + # Initialize profiler results self._table_results: Dict[str, Any] = {} self._column_results: Dict[str, Any] = {} @@ -92,9 +98,15 @@ class Profiler(Generic[MetricType]): self._columns: Optional[List[Column]] = None # We will compute the sample from the property + self._sampler = Sampler( + session=session, table=table, profile_sample=profile_sample + ) self._sample: Optional[Union[DeclarativeMeta, AliasedClass]] = None - self.validate_composed_metric() + # Prepare a timeout controlled query runner + self.runner: QueryRunner = cls_timeout(timeout_seconds)( + QueryRunner(session=session, table=table, sample=self.sample) + ) @property def session(self) -> Session: @@ -123,10 +135,6 @@ class Profiler(Generic[MetricType]): """ return self._use_cols - @property - def profile_sample(self) -> Optional[float]: - return self._profile_sample - @property def profile_date(self) -> datetime: return self._profile_date @@ -181,31 +189,8 @@ class Profiler(Generic[MetricType]): @property def sample(self): - """ - Either return a sampled CTE of table, or - the full table if no sampling is required. 
- """ if not self._sample: - - if not self.profile_sample: - # Use the full table - self._sample = self.table - - else: - # Add new RandomNumFn column - rnd = self.session.query( - self.table, (RandomNumFn() % 100).label("random") - ).cte(f"{self.table.__tablename__}_rnd") - - # Prepare sampled CTE - sampled = ( - self.session.query(rnd) - .where(rnd.c.random <= self.profile_sample) - .cte(f"{self.table.__tablename__}_sample") - ) - - # Assign as an alias - self._sample = aliased(self.table, sampled) + self._sample = self._sampler.random_sample() return self._sample @@ -222,7 +207,7 @@ class Profiler(Generic[MetricType]): f"We need {metric.required_metrics()} for {metric.name}, but only got {names} in the profiler" ) - def sql_col_run(self, col: Column): + def run_static_metrics(self, col: Column): """ Run the profiler and store its results @@ -238,19 +223,17 @@ class Profiler(Generic[MetricType]): return try: - query = self.session.query( + row = self.runner.select_first_from_sample( *[metric(col).fn() for metric in col_metrics] - ).select_from(self.sample) - - row = query.first() + ) self._column_results[col.name].update(dict(row)) - except Exception as err: + except (TimeoutError, Exception) as err: logger.warning( f"Error trying to compute column profile for {col.name} - {err}" ) self.session.rollback() - def sql_table_run(self): + def run_table_metrics(self): """ Run Table Static metrics @@ -266,15 +249,13 @@ class Profiler(Generic[MetricType]): if not table_metrics: return - query = self.session.query( + row = self.runner.select_first_from_table( *[metric().fn() for metric in table_metrics] - ).select_from(self.table) - - row = query.first() + ) if row: self._table_results.update(dict(row)) - def sql_col_query_run(self, col: Column) -> None: + def run_query_metrics(self, col: Column) -> None: """ Run QueryMetrics """ @@ -292,7 +273,7 @@ class Profiler(Generic[MetricType]): if not metric_query: continue if col_metric.metric_type == dict: - query_res = 
metric_query.all() + query_res = self.runner.select_all_from_query(metric_query) # query_res has the shape of List[Row], where each row is a dict, # e.g., [{colA: 1, colB: 2},...] # We are going to transform this into a Dict[List] by pivoting, so that @@ -303,16 +284,18 @@ class Profiler(Generic[MetricType]): self._column_results[col.name].update({metric.name(): data}) else: - row = metric_query.first() + row = self.runner.select_first_from_query(metric_query) self._column_results[col.name].update(dict(row)) - except Exception as err: # pylint: disable=broad-except + except (TimeoutError, Exception) as err: # pylint: disable=broad-except + print(err) + print(traceback.format_exc()) logger.error( - f"Exception encountered computing {metric.name()} for {self.table.__tablename__}.{col.name} - {err}" + f"Error computing query metric {metric.name()} for {self.table.__tablename__}.{col.name} - {err}" ) self.session.rollback() - def post_col_run(self, col: Column): + def run_composed_metrics(self, col: Column): """ Run this after the metrics have been computed @@ -336,15 +319,6 @@ class Profiler(Generic[MetricType]): current_col_results ) - def execute_table(self) -> None: - """ - Run table metrics - - So far we only support Static Metrics - for Table Metrics - """ - self.sql_table_run() - def execute_column(self, col: Column) -> None: """ Run the profiler on all the columns that @@ -353,9 +327,9 @@ class Profiler(Generic[MetricType]): We can assume from this point onwards that columns are of allowed types """ - self.sql_col_run(col) - self.sql_col_query_run(col) - self.post_col_run(col) + self.run_static_metrics(col) + self.run_query_metrics(col) + self.run_composed_metrics(col) def execute(self) -> "Profiler": """ @@ -364,7 +338,7 @@ class Profiler(Generic[MetricType]): logger.debug(f"Running profiler for {self.table.__tablename__}") - self.execute_table() + self.run_table_metrics() for col in self.columns: logger.debug( diff --git 
def get_default_metrics(table: DeclarativeMeta) -> List[Metric]:
    """
    Assemble the metrics that are profiled when no explicit
    metric list is supplied.

    :param table: ORM table handed to `add_props` for the
        COLUMN_COUNT metric
    :return: table-level metrics first, then column-level ones
    """
    # Table Metrics
    # NOTE(review): add_props(table=table) presumably binds the table onto
    # COLUMN_COUNT - confirm in metadata.orm_profiler.metrics.core.
    table_level = [
        Metrics.ROW_COUNT.value,
        add_props(table=table)(Metrics.COLUMN_COUNT.value),
    ]

    # Column Metrics, in the order they are reported
    column_level = [
        member.value
        for member in (
            Metrics.MEAN,
            Metrics.COUNT,
            Metrics.DISTINCT_COUNT,
            Metrics.DISTINCT_RATIO,
            Metrics.MIN,
            Metrics.MIN_LENGTH,
            Metrics.MAX,
            Metrics.MAX_LENGTH,
            Metrics.NULL_COUNT,
            Metrics.NULL_RATIO,
            Metrics.STDDEV,
            Metrics.SUM,
            Metrics.UNIQUE_COUNT,
            Metrics.UNIQUE_RATIO,
            Metrics.HISTOGRAM,
        )
    ]

    return table_level + column_level
Metrics.DISTINCT_RATIO.value, - Metrics.MIN.value, - Metrics.MIN_LENGTH.value, - Metrics.MAX.value, - Metrics.MAX_LENGTH.value, - Metrics.NULL_COUNT.value, - Metrics.NULL_RATIO.value, - Metrics.STDDEV.value, - Metrics.SUM.value, - Metrics.UNIQUE_COUNT.value, - Metrics.UNIQUE_RATIO.value, - Metrics.HISTOGRAM.value, - ] + + _metrics = get_default_metrics(table) + super().__init__( *_metrics, session=session, @@ -66,4 +74,5 @@ class DefaultProfiler(Profiler): ignore_cols=ignore_cols, profile_date=profile_date, profile_sample=profile_sample, + timeout_seconds=timeout_seconds, ) diff --git a/ingestion/src/metadata/orm_profiler/profiles/models.py b/ingestion/src/metadata/orm_profiler/profiler/models.py similarity index 82% rename from ingestion/src/metadata/orm_profiler/profiles/models.py rename to ingestion/src/metadata/orm_profiler/profiler/models.py index f48307a4e5d..028374ad8e9 100644 --- a/ingestion/src/metadata/orm_profiler/profiles/models.py +++ b/ingestion/src/metadata/orm_profiler/profiler/models.py @@ -10,10 +10,10 @@ # limitations under the License. 
class QueryRunner:
    """
    Single entry point for executing profiler queries
    and handing the results back to the caller.

    The goal of this class is to abstract how query
    results are fetched. Because every execution goes
    through one of these methods, the object can be
    wrapped with a timeout so that no call takes more
    than X seconds.
    """

    def __init__(
        self,
        session: Session,
        table: DeclarativeMeta,
        sample: Union[DeclarativeMeta, AliasedClass],
    ):
        """
        :param session: session used to build and run every query
        :param table: ORM table targeted by the `*_from_table` methods
        :param sample: table or aliased sample targeted by the
            `*_from_sample` methods
        """
        self._session = session
        self._table = table
        self._sample = sample

    def _build_query(self, *entities, **kwargs) -> Query:
        """Create a query on the stored session for the given entities."""
        query = self._session.query(*entities, **kwargs)
        return query

    def _select_from_sample(self, *entities, **kwargs):
        """Build a query for the entities that selects from the sample."""
        query = self._build_query(*entities, **kwargs)
        return query.select_from(self._sample)

    def select_first_from_table(self, *entities, **kwargs):
        """Return the first row of the entities selected from the table."""
        on_table = self._build_query(*entities, **kwargs).select_from(self._table)
        return on_table.first()

    def select_all_from_table(self, *entities, **kwargs):
        """Return every row of the entities selected from the table."""
        on_table = self._build_query(*entities, **kwargs).select_from(self._table)
        return on_table.all()

    def select_first_from_sample(self, *entities, **kwargs):
        """Return the first row of the entities selected from the sample."""
        on_sample = self._select_from_sample(*entities, **kwargs)
        return on_sample.first()

    def select_all_from_sample(self, *entities, **kwargs):
        """Return every row of the entities selected from the sample."""
        on_sample = self._select_from_sample(*entities, **kwargs)
        return on_sample.all()

    @staticmethod
    def select_first_from_query(query: Query):
        """Return the first row of an already built query."""
        first_row = query.first()
        return first_row

    @staticmethod
    def select_all_from_query(query: Query):
        """Return every row of an already built query."""
        rows = query.all()
        return rows
class Sampler:
    """
    Produces the object the profiler queries against:
    either the table itself or a random sample of it,
    so queries do not have to scan the whole table.
    """

    def __init__(
        self,
        session: Session,
        table: DeclarativeMeta,
        profile_sample: Optional[float] = None,
    ):
        """
        :param session: session used to build the sampling CTEs
        :param table: ORM table to sample
        :param profile_sample: threshold compared against the generated
            `random` column; a falsy value disables sampling
        """
        self.profile_sample = profile_sample
        self.session = session
        self.table = table

    def random_sample(self) -> Union[DeclarativeMeta, AliasedClass]:
        """
        Return the table unchanged when no sampling is
        requested, otherwise an alias of the table backed
        by a sampled CTE.
        """
        if self.profile_sample:
            # First CTE: every row plus a `random` column.
            # NOTE(review): RandomNumFn presumably renders a random integer
            # for the dialect in use - confirm in orm.functions.random_num.
            random_column = (RandomNumFn() % 100).label("random")
            rows_with_random = self.session.query(self.table, random_column)
            rnd = rows_with_random.cte(f"{self.table.__tablename__}_rnd")

            # Second CTE: keep rows whose random value is within the threshold
            rows_in_sample = self.session.query(rnd).where(
                rnd.c.random <= self.profile_sample
            )
            sampled = rows_in_sample.cte(f"{self.table.__tablename__}_sample")

            # Expose the CTE through the table's own ORM interface
            return aliased(self.table, sampled)

        # Use the full table
        return self.table
logger = logging.getLogger(__name__)


def _handle_timeout(signum, frame):
    """
    Handler for signal timeout.

    Logs the interrupted stack at debug level and raises
    from within the frame that was running when the alarm fired.

    :param signum: number of the signal that was received
    :param frame: stack frame that was executing when the signal arrived
    :raises TimeoutError: always
    """
    # format_stack returns the text. print_stack writes to stderr and
    # returns None, so the log record would only ever contain "None".
    logger.debug("".join(traceback.format_stack(frame)))
    raise TimeoutError(f"[SIGNUM {signum}] {os.strerror(errno.ETIME)}")


def timeout(seconds: int = TEN_MIN):
    """
    Decorator factory to handle timeouts in functions. Defaults
    to 10 min.

    The decorated function runs under a SIGALRM alarm; if it has not
    returned when the alarm fires, `_handle_timeout` raises TimeoutError
    inside it.

    NOTE: per the `signal` module docs, handlers can only be installed
    from the main thread and SIGALRM is not available on Windows.
    Only one alarm can be pending at a time, so a decorated function
    that calls another decorated function loses its own alarm when the
    inner call disarms on exit.

    :param seconds: seconds to wait until raising the timeout.
        `None` or `0` runs the function without a time limit.
    :return: decorator that wraps a callable with the alarm
    """

    def decorator(fn):
        @functools.wraps(fn)
        def inner(*args, **kwargs):
            if not seconds:
                # No limit requested. signal.alarm(None) raises TypeError
                # and signal.alarm(0) schedules nothing, so skip the
                # signal machinery entirely.
                return fn(*args, **kwargs)

            previous_handler = signal.signal(signal.SIGALRM, _handle_timeout)
            signal.alarm(seconds)
            try:
                return fn(*args, **kwargs)
            finally:
                # Disarm first so the alarm cannot fire while the
                # handler is being swapped back.
                signal.alarm(0)
                # signal.signal returns None when the previous handler
                # was not installed from Python; that cannot be restored.
                if previous_handler is not None:
                    signal.signal(signal.SIGALRM, previous_handler)

        return inner

    return decorator


def _is_function_or_method(member) -> bool:
    """Tell whether member is a plain function or a bound method."""
    return inspect.isfunction(member) or inspect.ismethod(member)


def cls_timeout(seconds: int = TEN_MIN):
    """
    Decorates with `timeout` all methods
    of a class cls, or the public methods of an instance.

    When an instance is passed, its regular methods are bound methods,
    which `inspect.isfunction` rejects; they are matched explicitly so
    they get a timeout too. Underscore-prefixed members of an instance
    are left alone: they are helpers called from the public methods,
    and wrapping them would cancel the caller's alarm (see `timeout`).
    Classes are handled as before: every function member is wrapped.

    :param seconds: timeout to use
    :return: class or instance with decorated methods
    """

    def inner(cls):
        if inspect.isclass(cls):
            members = inspect.getmembers(cls, inspect.isfunction)
        else:
            members = [
                (attr_name, attr)
                for attr_name, attr in inspect.getmembers(
                    cls, _is_function_or_method
                )
                if not attr_name.startswith("_")
            ]

        for attr_name, attr in members:
            setattr(cls, attr_name, timeout(seconds)(attr))

        return cls

    return inner
@@ class ProfilerWorkflowTest(TestCase): "config": { "profiler": { "name": "my_profiler", + "timeout_seconds": 60, "metrics": ["row_count", "min", "max", "COUNT", "null_count"], }, "test_suite": { diff --git a/ingestion/tests/unit/profiler/test_metrics.py b/ingestion/tests/unit/profiler/test_metrics.py index c96ec0b3263..6f6e67b1e19 100644 --- a/ingestion/tests/unit/profiler/test_metrics.py +++ b/ingestion/tests/unit/profiler/test_metrics.py @@ -19,7 +19,7 @@ from sqlalchemy.orm import declarative_base from metadata.orm_profiler.metrics.core import add_props from metadata.orm_profiler.metrics.registry import Metrics -from metadata.orm_profiler.profiles.core import Profiler +from metadata.orm_profiler.profiler.core import Profiler from metadata.utils.engines import create_and_bind_session Base = declarative_base() diff --git a/ingestion/tests/unit/profiler/test_profiler.py b/ingestion/tests/unit/profiler/test_profiler.py index 950fdd66fcf..ac3ee43c751 100644 --- a/ingestion/tests/unit/profiler/test_profiler.py +++ b/ingestion/tests/unit/profiler/test_profiler.py @@ -21,8 +21,8 @@ from sqlalchemy.orm import declarative_base from metadata.generated.schema.entity.data.table import ColumnProfile, Histogram from metadata.orm_profiler.metrics.core import add_props from metadata.orm_profiler.metrics.registry import Metrics -from metadata.orm_profiler.profiles.core import MissingMetricException, Profiler -from metadata.orm_profiler.profiles.default import DefaultProfiler +from metadata.orm_profiler.profiler.core import MissingMetricException, Profiler +from metadata.orm_profiler.profiler.default import DefaultProfiler from metadata.utils.engines import create_and_bind_session Base = declarative_base() diff --git a/ingestion/tests/unit/profiler/test_runner.py b/ingestion/tests/unit/profiler/test_runner.py new file mode 100644 index 00000000000..f0ea6ae42f6 --- /dev/null +++ b/ingestion/tests/unit/profiler/test_runner.py @@ -0,0 +1,154 @@ +# Copyright 2021 Collate +# Licensed 
Base = declarative_base()


class User(Base):
    """
    ORM table the runner is exercised against
    """

    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String(256))
    fullname = Column(String(256))
    nickname = Column(String(256))
    comments = Column(TEXT)
    age = Column(Integer)


class Timer:
    """
    Helper to test timeouts
    """

    @staticmethod
    def slow():
        # Sleeps well past the 1 second limit used in the test
        time.sleep(10)
        return 1

    @staticmethod
    def fast():
        return 1


class RunnerTest(TestCase):
    """
    Check that the QueryRunner returns results from the table,
    the sample and pre-built queries, and that `cls_timeout`
    interrupts slow calls.
    """

    engine = create_engine("sqlite+pysqlite:///:memory:", echo=False, future=True)
    session = create_and_bind_session(engine)

    sampler = Sampler(session=session, table=User, profile_sample=50.0)
    sample = sampler.random_sample()

    raw_runner = QueryRunner(session=session, table=User, sample=sample)

    timeout_runner: Timer = cls_timeout(1)(Timer())

    @classmethod
    def setUpClass(cls) -> None:
        """
        Prepare Ingredients
        """
        User.__table__.create(bind=cls.engine)

        # Insert 30 rows: 10 batches of 3 users
        for _ in range(10):
            data = [
                User(
                    name="John",
                    fullname="John Doe",
                    nickname="johnny b goode",
                    comments="no comments",
                    age=30,
                ),
                User(
                    name="Jane",
                    fullname="Jone Doe",
                    nickname=None,
                    comments="maybe some comments",
                    age=31,
                ),
                User(
                    name="John",
                    fullname="John Doe",
                    nickname=None,
                    comments=None,
                    age=None,
                ),
            ]
            cls.session.add_all(data)
            cls.session.commit()

    def test_select_from_table(self):
        """
        We can run queries against the table
        """
        res = self.raw_runner.select_first_from_table(func.count())
        assert res[0] == 30

        res = self.raw_runner.select_all_from_table(Column(User.name.name))
        assert len(res) == 30

    def test_select_from_sample(self):
        """
        We can run queries against the sample
        """
        res = self.raw_runner.select_first_from_sample(func.count())
        assert res[0] < 30

        # Note how we need to pass the column by name, not from the table
        # object, or it will run a cartesian product.
        res = self.raw_runner.select_all_from_sample(Column(User.name.name))
        assert len(res) < 30

    def test_select_from_query(self):
        """
        We can pick up results from a given query
        """
        query = self.session.query(func.count()).select_from(User)
        res = self.raw_runner.select_first_from_query(query)
        assert res[0] == 30

        query = self.session.query(func.count()).select_from(self.sample)
        res = self.raw_runner.select_first_from_query(query)
        assert res[0] < 30

        query = self.session.query(Column(User.name.name)).select_from(User)
        res = self.raw_runner.select_all_from_query(query)
        assert len(res) == 30

        # Select the rows themselves, by column name as in
        # test_select_from_sample. A COUNT query always returns a single
        # row, which would make the `< 30` check pass regardless of the
        # sample size.
        query = self.session.query(Column(User.name.name)).select_from(self.sample)
        res = self.raw_runner.select_all_from_query(query)
        assert len(res) < 30

    def test_timeout_runner(self):
        """
        Check that timeout alarms get executed
        """
        assert self.timeout_runner.fast() == 1

        with pytest.raises(TimeoutError):
            self.timeout_runner.slow()
a/ingestion/tests/unit/profiler/test_sample.py b/ingestion/tests/unit/profiler/test_sample.py index 75a8133db40..3730c664d87 100644 --- a/ingestion/tests/unit/profiler/test_sample.py +++ b/ingestion/tests/unit/profiler/test_sample.py @@ -18,7 +18,8 @@ from sqlalchemy import TEXT, Column, Integer, String, create_engine, func from sqlalchemy.orm import DeclarativeMeta, declarative_base from metadata.orm_profiler.metrics.registry import Metrics -from metadata.orm_profiler.profiles.core import Profiler +from metadata.orm_profiler.profiler.core import Profiler +from metadata.orm_profiler.profiler.sampler import Sampler from metadata.utils.engines import create_and_bind_session Base = declarative_base() @@ -77,6 +78,16 @@ class SampleTest(TestCase): cls.session.add_all(data) cls.session.commit() + def test_random_sampler(self): + """ + The random sampler should be able to + generate a random subset of data + """ + sampler = Sampler(session=self.session, table=User, profile_sample=50.0) + random_sample = sampler.random_sample() + res = self.session.query(func.count()).select_from(random_sample).first() + assert res[0] < 30 + def test_sample_property(self): """ Sample property should be properly generated diff --git a/ingestion/tests/unit/profiler/test_workflow.py b/ingestion/tests/unit/profiler/test_workflow.py index 810be299302..1533db77b1e 100644 --- a/ingestion/tests/unit/profiler/test_workflow.py +++ b/ingestion/tests/unit/profiler/test_workflow.py @@ -34,8 +34,8 @@ from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig from metadata.ingestion.source.sqlite import SQLiteConfig from metadata.orm_profiler.api.workflow import ProfilerWorkflow from metadata.orm_profiler.processor.orm_profiler import OrmProfilerProcessor -from metadata.orm_profiler.profiles.default import DefaultProfiler -from metadata.orm_profiler.profiles.models import ProfilerDef +from metadata.orm_profiler.profiler.default import DefaultProfiler +from 
metadata.orm_profiler.profiler.models import ProfilerDef from metadata.orm_profiler.validations.models import TestDef, TestSuite config = {