diff --git a/ingestion/src/metadata/interfaces/sqalchemy/mixins/sqa_mixin.py b/ingestion/src/metadata/interfaces/sqalchemy/mixins/sqa_mixin.py index 2de629b3d85..e1fdb14ac49 100644 --- a/ingestion/src/metadata/interfaces/sqalchemy/mixins/sqa_mixin.py +++ b/ingestion/src/metadata/interfaces/sqalchemy/mixins/sqa_mixin.py @@ -15,12 +15,11 @@ supporting sqlalchemy abstraction layer """ -from typing import Dict, Optional +from typing import Optional from sqlalchemy import Column, MetaData, inspect from sqlalchemy.orm import DeclarativeMeta -from metadata.generated.schema.entity.data.table import PartitionProfilerConfig from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import ( SnowflakeType, ) @@ -86,27 +85,6 @@ class SQAInterfaceMixin: ) ) - def get_partition_details( - self, partition_config: Optional[PartitionProfilerConfig] - ) -> Optional[Dict]: - """From partition config, get the partition table for a table entity - - Args: - partition_config: PartitionProfilerConfig object with some partition details - - Returns: - dict or None: dictionary with all the elements constituting the a partition - """ - if not partition_config: - return None - - return { - "partition_field": partition_config.partitionColumnName, - "partition_values": partition_config.partitionValues, - "partition_interval_unit": partition_config.partitionIntervalUnit.value, - "partition_interval": partition_config.partitionInterval, - } - def close(self): """close session""" self.session.close() diff --git a/ingestion/src/metadata/interfaces/sqalchemy/sqa_profiler_interface.py b/ingestion/src/metadata/interfaces/sqalchemy/sqa_profiler_interface.py index a54588782d6..bfa205e90f6 100644 --- a/ingestion/src/metadata/interfaces/sqalchemy/sqa_profiler_interface.py +++ b/ingestion/src/metadata/interfaces/sqalchemy/sqa_profiler_interface.py @@ -80,7 +80,7 @@ class SQAProfilerInterface(SQAInterfaceMixin, ProfilerProtocol): self.profile_sample_config = 
profiler_interface_args.profile_sample_config self.profile_query = profiler_interface_args.table_sample_query self.partition_details = ( - self.get_partition_details(profiler_interface_args.table_partition_config) + profiler_interface_args.table_partition_config if not self.profile_query else None ) diff --git a/ingestion/src/metadata/interfaces/sqalchemy/sqa_test_suite_interface.py b/ingestion/src/metadata/interfaces/sqalchemy/sqa_test_suite_interface.py index 3c39b765dd5..97c0e5511b6 100644 --- a/ingestion/src/metadata/interfaces/sqalchemy/sqa_test_suite_interface.py +++ b/ingestion/src/metadata/interfaces/sqalchemy/sqa_test_suite_interface.py @@ -21,7 +21,7 @@ from sqlalchemy import MetaData from sqlalchemy.orm import DeclarativeMeta from sqlalchemy.orm.util import AliasedClass -from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.entity.data.table import PartitionProfilerConfig, Table from metadata.generated.schema.entity.services.databaseService import DatabaseConnection from metadata.generated.schema.tests.basic import TestCaseResult from metadata.generated.schema.tests.testCase import TestCase @@ -56,7 +56,7 @@ class SQATestSuiteInterface(SQAInterfaceMixin, TestSuiteProtocol): sqa_metadata_obj: Optional[MetaData] = None, profile_sample_config: Optional[ProfileSampleConfig] = None, table_sample_query: str = None, - table_partition_config: dict = None, + table_partition_config: Optional[PartitionProfilerConfig] = None, table_entity: Table = None, ): self.ometa_client = ometa_client @@ -72,9 +72,7 @@ class SQATestSuiteInterface(SQAInterfaceMixin, TestSuiteProtocol): self.profile_sample_config = profile_sample_config self.table_sample_query = table_sample_query self.table_partition_config = ( - self.get_partition_details(table_partition_config) - if not self.table_sample_query - else None + table_partition_config if not self.table_sample_query else None ) self._sampler = self._create_sampler() diff --git 
a/ingestion/src/metadata/orm_profiler/metrics/system/system.py b/ingestion/src/metadata/orm_profiler/metrics/system/system.py index 7428e4c8b22..fadbac87de5 100644 --- a/ingestion/src/metadata/orm_profiler/metrics/system/system.py +++ b/ingestion/src/metadata/orm_profiler/metrics/system/system.py @@ -22,6 +22,7 @@ from sqlalchemy import text from sqlalchemy.orm import DeclarativeMeta, Session from sqlparse.sql import Identifier +from metadata.generated.schema.entity.data.table import DmlOperationType from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import ( BigQueryConnection, ) @@ -258,6 +259,8 @@ def _( token.value for token in query_text.tokens if token.ttype is sqlparse.tokens.DML + and token.value.upper() + in DmlOperationType._member_names_ # pylint: disable=protected-access ), None, ) diff --git a/ingestion/src/metadata/orm_profiler/orm/functions/datetime.py b/ingestion/src/metadata/orm_profiler/orm/functions/datetime.py index aa2361725d1..00af8cafe17 100644 --- a/ingestion/src/metadata/orm_profiler/orm/functions/datetime.py +++ b/ingestion/src/metadata/orm_profiler/orm/functions/datetime.py @@ -90,6 +90,13 @@ def _(elements, compiler, **kwargs): return f"DATEADD({interval_unit}, -{interval}, {func.current_date()})" +@compiles(DateAddFn, Dialects.SQLite) +def _(elements, compiler, **kwargs): # pylint: disable=unused-argument + interval = elements.clauses.clauses[0].value + interval_unit = elements.clauses.clauses[1].text + return f"DATE({func.current_date()}, '-{interval} {interval_unit}')" + + class DatetimeAddFn(FunctionElement): inherit_cache = CACHE @@ -151,3 +158,10 @@ def _(elements, compiler, **kwargs): compiler.process(element, **kwargs) for element in elements.clauses ] return f"DATEADD({interval_unit}, -{interval}, {func.current_timestamp()})" + + +@compiles(DatetimeAddFn, Dialects.SQLite) +def _(elements, compiler, **kwargs): # pylint: disable=unused-argument + interval = elements.clauses.clauses[0].value + 
interval_unit = elements.clauses.clauses[1].text + return f"DATE({func.current_timestamp()}, '-{interval} {interval_unit}')" diff --git a/ingestion/src/metadata/orm_profiler/profiler/handle_partition.py b/ingestion/src/metadata/orm_profiler/profiler/handle_partition.py index 6251f2dd4f2..93a99a0415f 100644 --- a/ingestion/src/metadata/orm_profiler/profiler/handle_partition.py +++ b/ingestion/src/metadata/orm_profiler/profiler/handle_partition.py @@ -14,45 +14,29 @@ Helper submodule for partitioned tables from __future__ import annotations -import sqlalchemy -from sqlalchemy import Column, text -from sqlalchemy.sql import and_ +from typing import List -from metadata.orm_profiler.orm.functions.datetime import DateAddFn, DatetimeAddFn +from sqlalchemy import Column, text + +from metadata.generated.schema.entity.data.table import PartitionProfilerConfig from metadata.orm_profiler.orm.functions.modulo import ModuloFn from metadata.orm_profiler.orm.functions.random_num import RandomNumFn from metadata.utils.logger import profiler_logger +from metadata.utils.sqa_utils import ( + build_query_filter, + dispatch_to_date_or_datetime, + get_partition_col_type, +) RANDOM_LABEL = "random" logger = profiler_logger() -def format_partition_datetime( - partition_field: str, - partition_interval: int, - partition_interval_unit: str, - col_type, +def build_partition_predicate( + partition_details: PartitionProfilerConfig, + columns: List[Column], ): - """format partition predicate - - Args: - partition_field (_type_): _description_ - partition_interval (_type_): _description_ - partition_interval_unit (_type_): _description_ - """ - if isinstance(col_type, (sqlalchemy.DATE)): - return and_( - Column(partition_field) - >= DateAddFn(partition_interval, text(partition_interval_unit)) - ) - return and_( - Column(partition_field) - >= DatetimeAddFn(partition_interval, text(partition_interval_unit)) - ) - - -def build_partition_predicate(partition_details: dict, col: Column): """_summary_ 
Args: @@ -62,20 +46,27 @@ def build_partition_predicate(partition_details: dict, col: Column): Returns: _type_: _description_ """ - if partition_details["partition_values"]: - return col.in_(partition_details["partition_values"]) + partition_field = partition_details.partitionColumnName + if partition_details.partitionValues: + return build_query_filter( + [(Column(partition_field), "in", partition_details.partitionValues)], + False, + ) - col_type = None - if col is not None: - col_type = col.type - if partition_details["partition_field"] == "_PARTITIONDATE": - col_type = sqlalchemy.DATE + type_ = get_partition_col_type( + partition_field, + columns, + ) - return format_partition_datetime( - partition_details["partition_field"], - partition_details["partition_interval"], - partition_details["partition_interval_unit"], - col_type, + date_or_datetime_fn = dispatch_to_date_or_datetime( + partition_details.partitionInterval, + text(partition_details.partitionIntervalUnit.value), + type_, + ) + + return build_query_filter( + [(Column(partition_field), "ge", date_or_datetime_fn)], + False, ) @@ -103,12 +94,9 @@ class partition_filter_handler: def handle_and_execute(_self, *args, **kwargs): """Handle partitioned queries""" if _self._partition_details: - partition_field = _self._partition_details[ - "partition_field" - ].lower() # normnalizing field name as we'll lookup by key partition_filter = build_partition_predicate( _self._partition_details, - _self.table.__table__.c.get(partition_field), + _self.table.__table__.c, ) if self.build_sample: return ( diff --git a/ingestion/src/metadata/orm_profiler/profiler/runner.py b/ingestion/src/metadata/orm_profiler/profiler/runner.py index b8dbeca2ab0..f0818d47e7a 100644 --- a/ingestion/src/metadata/orm_profiler/profiler/runner.py +++ b/ingestion/src/metadata/orm_profiler/profiler/runner.py @@ -23,6 +23,10 @@ from sqlalchemy.orm import DeclarativeMeta, Query, Session from sqlalchemy.orm.util import AliasedClass from 
metadata.orm_profiler.profiler.handle_partition import partition_filter_handler +from metadata.utils.logger import query_runner_logger +from metadata.utils.sqa_utils import get_query_filter_for_runner + +logger = query_runner_logger() class QueryRunner: @@ -55,25 +59,60 @@ class QueryRunner: return self._session.query(*entities, **kwargs) def _select_from_sample(self, *entities, **kwargs): - return self._build_query(*entities, **kwargs).select_from(self._sample) + """Run select statement against sample data""" + filter_ = get_query_filter_for_runner(kwargs) + + query = self._build_query(*entities, **kwargs).select_from(self._sample) + + if filter_ is not None: + return query.filter(filter_) + + return query def _select_from_user_query(self, *entities, **kwargs): + """Run select statement against user defined query""" + filter_ = get_query_filter_for_runner(kwargs) + user_query = self._session.query(self.table).from_statement( text(f"{self._profile_sample_query}") ) - return self._build_query(*entities, **kwargs).select_from(user_query) + + query = self._build_query(*entities, **kwargs).select_from(user_query) + + if filter_ is not None: + return query.filter(filter_) + + return query @partition_filter_handler() def select_first_from_table(self, *entities, **kwargs): + """Select first row from the table""" + filter_ = get_query_filter_for_runner(kwargs) + if self._profile_sample_query: return self._select_from_user_query(*entities, **kwargs).first() - return self._build_query(*entities, **kwargs).select_from(self.table).first() + + query = self._build_query(*entities, **kwargs).select_from(self.table) + + if filter_ is not None: + return query.filter(filter_).first() + + return query.first() @partition_filter_handler(first=False) def select_all_from_table(self, *entities, **kwargs): + """Select all rows from the table""" + filter_ = get_query_filter_for_runner(kwargs) + if self._profile_sample_query: return self._select_from_user_query(*entities, **kwargs).all() - 
return self._build_query(*entities, **kwargs).select_from(self.table).all() + + query = self._build_query(*entities, **kwargs).select_from(self.table) + + if filter_ is not None: + return query.filter(filter_).all() + + return query.all() @partition_filter_handler(sampled=True) def select_first_from_sample(self, *entities, **kwargs): diff --git a/ingestion/src/metadata/orm_profiler/profiler/sampler.py b/ingestion/src/metadata/orm_profiler/profiler/sampler.py index ccf1572f1e7..4a5d8d658c1 100644 --- a/ingestion/src/metadata/orm_profiler/profiler/sampler.py +++ b/ingestion/src/metadata/orm_profiler/profiler/sampler.py @@ -12,21 +12,26 @@ Helper module to handle data sampling for the profiler """ -from typing import Dict, Optional, Union +from typing import Dict, Optional, Union, cast -import sqlalchemy from sqlalchemy import column, inspect, text from sqlalchemy.orm import DeclarativeMeta, Query, Session, aliased from sqlalchemy.orm.util import AliasedClass -from metadata.generated.schema.entity.data.table import ProfileSampleType, TableData +from metadata.generated.schema.entity.data.table import ( + PartitionProfilerConfig, + ProfileSampleType, + TableData, +) from metadata.orm_profiler.api.models import ProfileSampleConfig from metadata.orm_profiler.orm.functions.modulo import ModuloFn from metadata.orm_profiler.orm.functions.random_num import RandomNumFn from metadata.orm_profiler.orm.registry import Dialects -from metadata.orm_profiler.profiler.handle_partition import ( - format_partition_datetime, - partition_filter_handler, +from metadata.orm_profiler.profiler.handle_partition import partition_filter_handler +from metadata.utils.sqa_utils import ( + build_query_filter, + dispatch_to_date_or_datetime, + get_partition_col_type, ) RANDOM_LABEL = "random" @@ -92,7 +97,7 @@ class Sampler: if not self.profile_sample: if self._partition_details: - return self._random_sample_for_partitioned_tables() + return self._partitioned_table() return self.table @@ -148,36 
+153,56 @@ class Sampler: text(f"{self._profile_sample_query}") ) - def _random_sample_for_partitioned_tables(self) -> Query: + def _partitioned_table(self) -> Query: """Return the Query object for partitioned tables""" - partition_field = self._partition_details["partition_field"] - col = self.table.__table__.c.get(partition_field.lower()) - col_type = None - if col is not None: - col_type = col.type - if partition_field == "_PARTITIONDATE": - col_type = sqlalchemy.DATE - if partition_field == "_PARTITIONTIME": - col_type = sqlalchemy.DATETIME() + self._partition_details = cast( + PartitionProfilerConfig, self._partition_details + ) # satisfying type checker + partition_field = self._partition_details.partitionColumnName - if not self._partition_details.get("partition_values"): + type_ = get_partition_col_type( + partition_field, + self.table.__table__.c, + ) + + if not self._partition_details.partitionValues: sample = ( self.session.query(self.table) .filter( - format_partition_datetime( - partition_field, - self._partition_details["partition_interval"], - self._partition_details["partition_interval_unit"], - col_type, + build_query_filter( + [ + ( + column(partition_field), + "ge", + dispatch_to_date_or_datetime( + self._partition_details.partitionInterval, + text( + self._partition_details.partitionIntervalUnit.value + ), + type_, + ), + ) + ], + False, ) ) .subquery() ) return aliased(self.table, sample) + sample = ( self.session.query(self.table) .filter( - column(partition_field).in_(self._partition_details["partition_values"]) + build_query_filter( + [ + ( + column(partition_field), + "in", + self._partition_details.partitionValues, + ) + ], + False, + ) ) .subquery() ) diff --git a/ingestion/src/metadata/test_suite/validations/core.py b/ingestion/src/metadata/test_suite/validations/core.py index 0c9f6dcd138..d5c567c21b1 100644 --- a/ingestion/src/metadata/test_suite/validations/core.py +++ b/ingestion/src/metadata/test_suite/validations/core.py @@ -87,6 
+87,9 @@ from metadata.test_suite.validations.table.table_row_count_to_be_between import from metadata.test_suite.validations.table.table_row_count_to_equal import ( table_row_count_to_equal, ) +from metadata.test_suite.validations.table.table_row_inserted_count_to_be_between import ( + table_row_inserted_count_to_be_between, +) from metadata.utils.dispatch import enum_register from metadata.utils.logger import profiler_logger @@ -104,6 +107,9 @@ validation_enum_registry.add("tableColumnCountToBeBetween")( validation_enum_registry.add("tableColumnToMatchSet")(table_column_to_match_set) validation_enum_registry.add("tableColumnNameToExist")(table_column_name_to_exist) validation_enum_registry.add("tableCustomSQLQuery")(table_custom_sql_query) +validation_enum_registry.add("tableRowInsertedCountToBeBetween")( + table_row_inserted_count_to_be_between +) # # # Column Tests validation_enum_registry.add("columnValuesToBeBetween")(column_values_to_be_between) diff --git a/ingestion/src/metadata/test_suite/validations/table/table_row_inserted_count_to_be_between.py b/ingestion/src/metadata/test_suite/validations/table/table_row_inserted_count_to_be_between.py new file mode 100644 index 00000000000..0ba72d6360e --- /dev/null +++ b/ingestion/src/metadata/test_suite/validations/table/table_row_inserted_count_to_be_between.py @@ -0,0 +1,170 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +TableRowInsertedCountToBeBetween validation implementation +""" + +import traceback +from datetime import datetime +from functools import singledispatch +from typing import List, Optional, Union, cast + +from pandas import DataFrame +from sqlalchemy import Column, text + +from metadata.generated.schema.tests.basic import ( + TestCaseResult, + TestCaseStatus, + TestResultValue, +) +from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue +from metadata.orm_profiler.metrics.registry import Metrics +from metadata.orm_profiler.profiler.runner import QueryRunner +from metadata.utils.logger import test_suite_logger +from metadata.utils.sqa_utils import ( + dispatch_to_date_or_datetime, + get_partition_col_type, +) +from metadata.utils.test_suite import build_test_case_result, get_test_case_param_value + +logger = test_suite_logger() + + +@singledispatch +def table_row_inserted_count_to_be_between( + runner, + test_case: TestCase, + execution_date: Union[datetime, float], +): + raise NotImplementedError + + +@table_row_inserted_count_to_be_between.register +def _( + runner: QueryRunner, + test_case: TestCase, + execution_date: Union[datetime, float], +) -> TestCaseResult: + """_summary_ + + Args: + runner (QueryRunner): _description_ + test_case (TestCase): _description_ + execution_date (Union[datetime, float]): _description_ + + Returns: + TestCaseResult: _description_ + """ + parameter_values = test_case.parameterValues + parameter_values = cast(List[TestCaseParameterValue], parameter_values) + column_name: Optional[Column] = get_test_case_param_value( + parameter_values, + "columnName", + Column, + ) + range_type: Optional[str] = get_test_case_param_value( + parameter_values, + "rangeType", + str, + ) + range_interval: Optional[int] = get_test_case_param_value( + parameter_values, + "rangeInterval", + int, + ) + + if any(var is None for var in [column_name, range_type, range_interval]): + result = ( + f"Error computing 
{test_case.name} for {runner.table.__tablename__}:" + " No value found for columnName, rangeType or rangeInterval" + ) + logger.debug(traceback.format_exc()) + logger.warning(result) + return build_test_case_result( + execution_date, + TestCaseStatus.Aborted, + result, + [TestResultValue(name="rowCount", value=None)], + sample_data=None, + ) + + try: + date_or_datetime_fn = dispatch_to_date_or_datetime( + range_interval, + text(range_type), + get_partition_col_type(column_name.name, runner.table.__table__.c), + ) + + row_count_dict = dict( + runner.dispatch_query_select_first( + Metrics.ROW_COUNT.value().fn(), + query_filter_={ + "filters": [(column_name, "ge", date_or_datetime_fn)], + "or_filter": False, + }, + ) + ) + row_count_value = row_count_dict.get(Metrics.ROW_COUNT.name) + except Exception as exc: + result = ( + f"Error computing {test_case.name} for {runner.table.__tablename__}: {exc}" + ) + logger.debug(traceback.format_exc()) + logger.warning(result) + return build_test_case_result( + execution_date, + TestCaseStatus.Aborted, + result, + [TestResultValue(name="rowCount", value=None)], + sample_data=None, + ) + + min_: Union[int, float] = get_test_case_param_value( + parameter_values, + "min", + int, + float("-inf"), + ) + max_: Union[int, float] = get_test_case_param_value( + parameter_values, + "max", + int, + float("inf"), + ) + + status = ( + TestCaseStatus.Success + if min_ <= row_count_value <= max_ + else TestCaseStatus.Failed + ) + result = f"Found {row_count_value} rows vs. the expected range [{min_}, {max_}]." + return build_test_case_result( + execution_date, + status, + result, + [TestResultValue(name="rowCount", value=str(row_count_value))], + sample_data=None, + ) + + +@table_row_inserted_count_to_be_between.register +def _( + runner: DataFrame, test_case: TestCase, execution_date: Union[datetime, float] +): # pylint: disable=unused-argument + result = "Test is currently not supported for datalake sources." 
+ return build_test_case_result( + execution_date, + TestCaseStatus.Aborted, + result, + [TestResultValue(name="rowCount", value=None)], + sample_data=None, + ) diff --git a/ingestion/src/metadata/utils/logger.py b/ingestion/src/metadata/utils/logger.py index 32c37064a8b..48e09ef25f6 100644 --- a/ingestion/src/metadata/utils/logger.py +++ b/ingestion/src/metadata/utils/logger.py @@ -40,6 +40,7 @@ class Loggers(Enum): PROFILER_INTERFACE = "ProfilerInterface" TEST_SUITE = "TestSuite" DATA_INSIGHT = "DataInsight" + QUERY_RUNNER = "QueryRunner" @DynamicClassAttribute def value(self): @@ -131,6 +132,14 @@ def great_expectations_logger(): return logging.getLogger(Loggers.GREAT_EXPECTATIONS.value) +def query_runner_logger(): + """ + Method to get the QUERY_RUNNER logger + """ + + return logging.getLogger(Loggers.QUERY_RUNNER.value) + + def set_loggers_level(level: Union[int, str] = logging.INFO): """ Set all loggers levels diff --git a/ingestion/src/metadata/utils/sqa_utils.py b/ingestion/src/metadata/utils/sqa_utils.py new file mode 100644 index 00000000000..7f72c61b54a --- /dev/null +++ b/ingestion/src/metadata/utils/sqa_utils.py @@ -0,0 +1,127 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +sqlalchemy utility functions +""" + +import traceback +from typing import Any, Dict, List, Optional, Tuple + +import sqlalchemy +from sqlalchemy import Column, and_, or_ +from sqlalchemy.sql.elements import BinaryExpression + +from metadata.orm_profiler.orm.functions.datetime import DateAddFn, DatetimeAddFn +from metadata.utils.logger import query_runner_logger + +logger = query_runner_logger() + +# pylint: disable=cell-var-from-loop +def build_query_filter( + filters: List[Tuple[Column, str, Any]], or_filter: bool = False +) -> Optional[BinaryExpression]: + """Dynamically build query filter + + Args: + filters (List[Tuple[Column, str, Any]]): list of tuples representing filters. + The first value is the column, the second the comparison operators (e.g. "ge", "lt", "eq", "in", etc.) and + the last value the comparison value. e.g. (Column("foo"), "ge", 1) will produce "foo >= 1". + or_filter (bool, optional): whether to perform an OR or AND condition. Defaults to False (i.e. AND). 
+ + Returns: + BinaryExpression: a filter pattern + """ + list_of_filters = [] + for filter_ in filters: + column, operator, value = filter_ + try: + filter_attr = ( + next( + filter( + lambda x: hasattr(column, x % operator), ["%s", "%s_", "__%s__"] + ), + None, + ) + % operator + ) # type: ignore + except TypeError as err: + logger.debug(traceback.format_exc()) + logger.error(f"Error when looking for operator {operator} - {err}") + else: + list_of_filters.append(getattr(column, filter_attr)(value)) + + if not list_of_filters: + logger.debug("No filters found.") + return None + + if or_filter: + return or_(*list_of_filters) + return and_(*list_of_filters) + + +def dispatch_to_date_or_datetime( + partition_interval: int, + partition_interval_unit: str, + type_, +): + """Dispatch to date or datetime function based on the type + + Args: + partition_field (_type_): _description_ + partition_interval (_type_): _description_ + partition_interval_unit (_type_): _description_ + """ + if isinstance(type_, (sqlalchemy.DATE)): + return DateAddFn(partition_interval, partition_interval_unit) + return DatetimeAddFn(partition_interval, partition_interval_unit) + + +def get_partition_col_type(partition_column_name: str, columns: List[Column]): + """From partition field, get the type + + Args: + partition_column_name (str): column name + columns (List[Column]): list of table columns + + Returns: + _type_: type + """ + partition_field = ( + partition_column_name.lower() + ) # normalize field name as we'll be looking by key + + col = columns.get(partition_field) + if ( + col is not None + ): # if col is None, this means we have BQ pseudo columns _PARTITIONDATE or _PARTITIONTIME + return col.type + if partition_field == "_partitiondate": + return sqlalchemy.DATE() + if partition_field == "_partitiontime": + return sqlalchemy.DATETIME() + return None + + +def get_query_filter_for_runner(kwargs: Dict) -> Optional[BinaryExpression]: + """Get query filters from kwargs. 
IMPORTANT, this will update the original dictionary + passed in the function argument. + + Args: + kwargs (Dict): kwargs + """ + if kwargs.get("query_filter_"): + query_filter = kwargs.pop("query_filter_") + filter_ = build_query_filter(**query_filter) + else: + filter_ = None + + return filter_ diff --git a/ingestion/src/metadata/utils/test_suite.py b/ingestion/src/metadata/utils/test_suite.py index de239099275..fd4e90e1b88 100644 --- a/ingestion/src/metadata/utils/test_suite.py +++ b/ingestion/src/metadata/utils/test_suite.py @@ -15,8 +15,14 @@ Helper module for test suite functions from __future__ import annotations -from typing import Callable, Optional +from datetime import datetime +from typing import Callable, List, Optional +from metadata.generated.schema.tests.basic import ( + TestCaseResult, + TestCaseStatus, + TestResultValue, +) from metadata.generated.schema.tests.testCase import TestCaseParameterValue @@ -49,3 +55,30 @@ def get_test_case_param_value( pre_processed_value = pre_processor(value) return type_(pre_processed_value) + + +def build_test_case_result( + execution_datetime: datetime, + status: TestCaseStatus, + result: str, + test_result_value: List[TestResultValue], + sample_data: Optional[str] = None, +) -> TestCaseResult: + """create a test case result object + + Args: + execution_datetime (datetime): execution datetime of the test + status (TestCaseStatus): failed, succeed, aborted + result (str): message to display + testResultValue (List[TestResultValue]): values for the test result + + Returns: + TestCaseResult: + """ + return TestCaseResult( + timestamp=execution_datetime, + testCaseStatus=status, + result=result, + testResultValue=test_result_value, + sampleData=sample_data, + ) diff --git a/ingestion/tests/unit/test_suite/conftest.py b/ingestion/tests/unit/test_suite/conftest.py new file mode 100644 index 00000000000..22911b0aaf0 --- /dev/null +++ b/ingestion/tests/unit/test_suite/conftest.py @@ -0,0 +1,589 @@ +# Copyright 2021 Collate 
+# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +shared test cases +""" + +import os +from datetime import datetime, timedelta +from unittest.mock import patch +from uuid import uuid4 + +import pytest +import sqlalchemy as sqa +from sqlalchemy.orm import declarative_base + +from metadata.generated.schema.entity.data.table import Column, DataType, Table +from metadata.generated.schema.entity.services.connections.database.sqliteConnection import ( + SQLiteConnection, + SQLiteScheme, +) +from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.interfaces.sqalchemy.sqa_test_suite_interface import SQATestSuiteInterface + +Base = declarative_base() + +TEST_CASE_NAME = "my_test_case" +ENTITY_LINK_NICKNAME = "<#E::table::service.db.users::columns::nickname>" +ENTITY_LINK_FNAME = "<#E::table::service.db.users::columns::first name>" +ENTITY_LINK_AGE = "<#E::table::service.db.users::columns::age>" +ENTITY_LINK_NAME = "<#E::table::service.db.users::columns::name>" +ENTITY_LINK_USER = "<#E::table::service.db.users>" + +TABLE = Table( + id=uuid4(), + name="users", + fullyQualifiedName="service.db.users", + columns=[ + Column(name="id", dataType=DataType.INT), # type: ignore + Column(name="name", dataType=DataType.STRING), # type: ignore + Column(name="first name", dataType=DataType.STRING), # type: ignore + Column(name="fullname", dataType=DataType.STRING), # type: 
ignore + Column(name="nickname", dataType=DataType.STRING), # type: ignore + Column(name="age", dataType=DataType.INT), # type: ignore + Column(name="inserted_date", dataType=DataType.DATE), # type: ignore + ], + database=EntityReference(id=uuid4(), name="db", type="database"), # type: ignore +) # type: ignore + + +class User(Base): + __tablename__ = "users" + id = sqa.Column(sqa.Integer, primary_key=True) + name = sqa.Column(sqa.String(256)) + first_name = sqa.Column("first name", sqa.String(256)) + fullname = sqa.Column(sqa.String(256)) + nickname = sqa.Column(sqa.String(256)) + age = sqa.Column(sqa.Integer) + inserted_date = sqa.Column(sqa.DATE) + + +@pytest.fixture +def create_sqlite_table(): + """create and delete sqlite table""" + db_path = os.path.join( + os.path.dirname(__file__), f"{os.path.splitext(__file__)[0]}.db" + ) + sqlite_conn = SQLiteConnection( + scheme=SQLiteScheme.sqlite_pysqlite, + databaseMode=db_path + "?check_same_thread=False", + ) # type: ignore + + with patch.object( + SQATestSuiteInterface, "_convert_table_to_orm_object", return_value=User + ): + sqa_profiler_interface = SQATestSuiteInterface( + sqlite_conn, # type: ignore + table_entity=TABLE, + ometa_client=None, # type: ignore + ) + + runner = sqa_profiler_interface.runner + engine = sqa_profiler_interface.session.get_bind() + session = sqa_profiler_interface.session + + User.__table__.create(bind=engine) + for i in range(10): + data = [ + User( + name="John", + first_name="Jo", + fullname="John Doe", + nickname="johnny b goode", + age=30, + inserted_date=datetime.today() - timedelta(days=i), + ), + User( + name="Jane", + first_name="Ja", + fullname="Jone Doe", + nickname="Johnny d", + age=31, + inserted_date=datetime.today() - timedelta(days=i), + ), + User( + name="John", + first_name="Joh", + fullname="John Doe", + nickname=None, + age=None, + inserted_date=datetime.today() - timedelta(days=i), + ), + ] + session.add_all(data) + session.commit() + + yield runner + # clean up + 
User.__table__.drop(bind=engine) + os.remove(db_path) + + +@pytest.fixture +def test_case_column_value_length_to_be_between(): + """Test case for test column_value_length_to_be_between""" + return TestCase( + name=TEST_CASE_NAME, + entityLink=ENTITY_LINK_NICKNAME, + testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore + testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore + parameterValues=[ + TestCaseParameterValue(name="minLength", value="1"), + TestCaseParameterValue(name="maxLength", value="10"), + ], + ) # type: ignore + + +@pytest.fixture +def test_case_column_value_length_to_be_between_col_space(): + """Test case for test column_value_length_to_be_between""" + return TestCase( + name=TEST_CASE_NAME, + entityLink=ENTITY_LINK_FNAME, + testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore + testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore + parameterValues=[ + TestCaseParameterValue(name="minLength", value="1"), + TestCaseParameterValue(name="maxLength", value="10"), + ], + ) # type: ignore + + +@pytest.fixture +def test_case_column_value_length_to_be_between_no_min(): + """Test case for test column_value_length_to_be_between""" + return TestCase( + name=TEST_CASE_NAME, + entityLink=ENTITY_LINK_FNAME, + testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore + testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore + parameterValues=[ + TestCaseParameterValue(name="maxLength", value="10"), + ], + ) # type: ignore + + +@pytest.fixture +def test_case_column_value_max_to_be_between(): + """Test case for test column_value_max_to_be_between""" + return TestCase( + name=TEST_CASE_NAME, + entityLink=ENTITY_LINK_AGE, + testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore + testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore + parameterValues=[ + 
TestCaseParameterValue(name="minValueForMaxInCol", value="1"), + TestCaseParameterValue(name="maxValueForMaxInCol", value="10"), + ], + ) # type: ignore + + +@pytest.fixture +def test_case_column_value_max_to_be_between_no_min(): + """Test case for test column_value_max_to_be_between_no_min""" + return TestCase( + name=TEST_CASE_NAME, + entityLink=ENTITY_LINK_AGE, + testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore + testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore + parameterValues=[ + TestCaseParameterValue(name="maxValueForMaxInCol", value="10"), + ], + ) # type: ignore + + +@pytest.fixture +def test_case_column_value_mean_to_be_between(): + """Test case for test column_value_mean_to_be_between""" + return TestCase( + name=TEST_CASE_NAME, + entityLink=ENTITY_LINK_AGE, + testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore + testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore + parameterValues=[ + TestCaseParameterValue(name="minValueForMeanInCol", value="1"), + TestCaseParameterValue(name="maxValueForMeanInCol", value="10"), + ], + ) # type: ignore + + +@pytest.fixture +def test_case_column_value_mean_to_be_between_no_max(): + """Test case for test column_value_mean_to_be_between""" + return TestCase( + name=TEST_CASE_NAME, + entityLink=ENTITY_LINK_AGE, + testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore + testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore + parameterValues=[ + TestCaseParameterValue(name="minValueForMeanInCol", value="1"), + ], + ) # type: ignore + + +@pytest.fixture +def test_case_column_value_median_to_be_between(): + """Test case for test column_value_median_to_be_between""" + return TestCase( + name=TEST_CASE_NAME, + entityLink=ENTITY_LINK_AGE, + testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore + testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # 
type: ignore + parameterValues=[ + TestCaseParameterValue(name="minValueForMedianInCol", value="1"), + TestCaseParameterValue(name="maxValueForMedianInCol", value="10"), + ], + ) # type: ignore + + +@pytest.fixture +def test_case_column_value_min_to_be_between(): + """Test case for test column_value_median_to_be_between""" + return TestCase( + name=TEST_CASE_NAME, + entityLink=ENTITY_LINK_AGE, + testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore + testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore + parameterValues=[ + TestCaseParameterValue(name="minValueForMinInCol", value="25"), + TestCaseParameterValue(name="maxValueForMinInCol", value="40"), + ], + ) # type: ignore + + +@pytest.fixture +def test_case_column_value_min_to_be_between_no_min(): + """Test case for test column_value_median_to_be_between""" + return TestCase( + name=TEST_CASE_NAME, + entityLink=ENTITY_LINK_AGE, + testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore + testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore + parameterValues=[ + TestCaseParameterValue(name="maxValueForMinInCol", value="40"), + ], + ) # type: ignore + + +@pytest.fixture +def test_case_column_value_stddev_to_be_between(): + """Test case for test column_value_median_to_be_between""" + return TestCase( + name=TEST_CASE_NAME, + entityLink=ENTITY_LINK_AGE, + testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore + testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore + parameterValues=[ + TestCaseParameterValue(name="minValueForStdDevInCol", value="20"), + TestCaseParameterValue(name="maxValueForStdDevInCol", value="40"), + ], + ) # type: ignore + + +@pytest.fixture +def test_case_column_value_stddev_to_be_between_no_min(): + """Test case for test column_value_median_to_be_between""" + return TestCase( + name=TEST_CASE_NAME, + entityLink=ENTITY_LINK_AGE, + 
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore + testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore + parameterValues=[ + TestCaseParameterValue(name="maxValueForStdDevInCol", value="40"), + ], + ) # type: ignore + + +@pytest.fixture +def test_case_column_value_in_set(): + """Test case for test column_value_median_to_be_between""" + return TestCase( + name=TEST_CASE_NAME, + entityLink=ENTITY_LINK_NAME, + testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore + testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore + parameterValues=[ + TestCaseParameterValue(name="allowedValues", value="['John']"), + ], + ) # type: ignore + + +@pytest.fixture +def test_case_column_values_missing_count_to_be_equal(): + """Test case for test column_value_median_to_be_between""" + return TestCase( + name=TEST_CASE_NAME, + entityLink=ENTITY_LINK_NICKNAME, + testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore + testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore + parameterValues=[ + TestCaseParameterValue(name="missingCountValue", value="10"), + ], + ) # type: ignore + + +@pytest.fixture +def test_case_column_values_missing_count_to_be_equal_missing_valuesl(): + """Test case for test column_value_median_to_be_between""" + return TestCase( + name=TEST_CASE_NAME, + entityLink=ENTITY_LINK_NICKNAME, + testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore + testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore + parameterValues=[ + TestCaseParameterValue(name="missingCountValue", value="10"), + TestCaseParameterValue(name="missingValueMatch", value="['Johnny d']"), + ], + ) # type: ignore + + +@pytest.fixture +def test_case_column_values_not_in_set(): + """Test case for test column_value_median_to_be_between""" + return TestCase( + name=TEST_CASE_NAME, + entityLink=ENTITY_LINK_NAME, + 
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore + testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore + parameterValues=[ + TestCaseParameterValue(name="forbiddenValues", value="['John']"), + ], + ) # type: ignore + + +@pytest.fixture +def test_case_column_sum_to_be_between(): + """Test case for test column_value_median_to_be_between""" + return TestCase( + name=TEST_CASE_NAME, + entityLink=ENTITY_LINK_AGE, + testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore + testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore + parameterValues=[ + TestCaseParameterValue(name="minValueForColSum", value="10"), + TestCaseParameterValue(name="maxValueForColSum", value="100"), + ], + ) # type: ignore + + +@pytest.fixture +def test_case_column_values_to_be_between(): + """Test case for test column_value_median_to_be_between""" + return TestCase( + name=TEST_CASE_NAME, + entityLink=ENTITY_LINK_AGE, + testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore + testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore + parameterValues=[ + TestCaseParameterValue(name="minValue", value="29"), + TestCaseParameterValue(name="maxValue", value="33"), + ], + ) # type: ignore + + +@pytest.fixture +def test_case_column_values_to_be_not_null(): + """Test case for test column_value_median_to_be_between""" + return TestCase( + name=TEST_CASE_NAME, + entityLink=ENTITY_LINK_NICKNAME, + testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore + testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore + ) # type: ignore + + +@pytest.fixture +def test_case_column_values_to_be_unique(): + """Test case for test column_value_median_to_be_between""" + return TestCase( + name=TEST_CASE_NAME, + entityLink=ENTITY_LINK_NICKNAME, + testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore + 
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore + ) # type: ignore + + +@pytest.fixture +def test_case_column_values_to_match_regex(): + """Test case for test column_value_median_to_be_between""" + return TestCase( + name=TEST_CASE_NAME, + entityLink=ENTITY_LINK_NAME, + testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore + testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore + parameterValues=[ + TestCaseParameterValue(name="regex", value="J.*"), + ], + ) # type: ignore + + +@pytest.fixture +def test_case_column_values_to_not_match_regex(): + """Test case for test column_value_median_to_be_between""" + return TestCase( + name=TEST_CASE_NAME, + entityLink=ENTITY_LINK_NAME, + testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore + testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore + parameterValues=[ + TestCaseParameterValue(name="forbiddenRegex", value="X%"), + ], + ) # type: ignore + + +@pytest.fixture +def test_case_table_column_count_to_be_between(): + """Test case for test column_value_median_to_be_between""" + return TestCase( + name=TEST_CASE_NAME, + entityLink=ENTITY_LINK_USER, + testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore + testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore + parameterValues=[ + TestCaseParameterValue(name="minColValue", value="2"), + TestCaseParameterValue(name="maxColValue", value="10"), + ], + ) # type: ignore + + +@pytest.fixture +def test_case_table_column_count_to_equal(): + """Test case for test column_value_median_to_be_between""" + return TestCase( + name=TEST_CASE_NAME, + entityLink=ENTITY_LINK_USER, + testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore + testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore + parameterValues=[TestCaseParameterValue(name="columnCount", value="8")], + ) # type: 
ignore + + +@pytest.fixture +def test_case_table_column_name_to_exist(): + """Test case for test column_value_median_to_be_between""" + return TestCase( + name=TEST_CASE_NAME, + entityLink=ENTITY_LINK_USER, + testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore + testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore + parameterValues=[TestCaseParameterValue(name="columnName", value="id")], + ) # type: ignore + + +@pytest.fixture +def test_case_column_to_match_set(): + """Test case for test column_value_median_to_be_between""" + return TestCase( + name=TEST_CASE_NAME, + entityLink=ENTITY_LINK_USER, + testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore + testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore + parameterValues=[ + TestCaseParameterValue(name="columnNames", value="id,name,nickname") + ], + ) # type: ignore + + +@pytest.fixture +def test_case_column_to_match_set_ordered(): + """Test case for test column_value_median_to_be_between""" + return TestCase( + name=TEST_CASE_NAME, + entityLink=ENTITY_LINK_USER, + testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore + testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore + parameterValues=[ + TestCaseParameterValue( + name="columnNames", value="id,name,nickname,fullname,age" + ), + TestCaseParameterValue(name="ordered", value="True"), + ], + ) # type: ignore + + +@pytest.fixture +def test_case_table_custom_sql_query(): + """Test case for test column_value_median_to_be_between""" + return TestCase( + name=TEST_CASE_NAME, + entityLink=ENTITY_LINK_NAME, + testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore + testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore + parameterValues=[ + TestCaseParameterValue( + name="sqlExpression", value="SELECT * FROM users WHERE age > 20" + ), + ], + ) # type: ignore + + +@pytest.fixture +def 
test_case_table_custom_sql_query_success(): + """Test case for test column_value_median_to_be_between""" + return TestCase( + name=TEST_CASE_NAME, + entityLink=ENTITY_LINK_USER, + testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore + testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore + parameterValues=[ + TestCaseParameterValue( + name="sqlExpression", value="SELECT * FROM users WHERE age < 0" + ), + ], + ) # type: ignore + + +@pytest.fixture +def test_case_table_row_count_to_be_between(): + """Test case for test column_value_median_to_be_between""" + return TestCase( + name=TEST_CASE_NAME, + entityLink=ENTITY_LINK_USER, + testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore + testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore + parameterValues=[ + TestCaseParameterValue(name="minValue", value="10"), + TestCaseParameterValue(name="maxValue", value="35"), + ], + ) # type: ignore + + +@pytest.fixture +def test_case_table_row_count_to_be_equal(): + """Test case for test column_value_median_to_be_between""" + return TestCase( + name=TEST_CASE_NAME, + entityLink=ENTITY_LINK_USER, + testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore + testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore + parameterValues=[ + TestCaseParameterValue(name="value", value="10"), + ], + ) # type: ignore + + +@pytest.fixture +def test_case_table_row_inserted_count_to_be_between(): + """Test case for test column_value_median_to_be_between""" + return TestCase( + name=TEST_CASE_NAME, + entityLink=ENTITY_LINK_USER, + testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore + testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore + parameterValues=[ + TestCaseParameterValue(name="min", value="3"), + TestCaseParameterValue(name="columnName", value="inserted_date"), + TestCaseParameterValue(name="rangeType", 
value="DAY"), + TestCaseParameterValue(name="rangeInterval", value="1"), + ], + ) # type: ignore diff --git a/ingestion/tests/unit/test_suite/test_validations.py b/ingestion/tests/unit/test_suite/test_validations.py deleted file mode 100644 index 09d1a3447f5..00000000000 --- a/ingestion/tests/unit/test_suite/test_validations.py +++ /dev/null @@ -1,1177 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Test Table and Column Tests' validate implementations. - -Each test should validate the Success, Failure and Aborted statuses -""" -import os -import unittest -from datetime import datetime -from unittest.mock import patch -from uuid import uuid4 - -import sqlalchemy as sqa -from pandas import DataFrame -from sqlalchemy.orm import declarative_base - -from metadata.generated.schema.entity.data.table import Column, DataType, Table -from metadata.generated.schema.entity.services.connections.database.sqliteConnection import ( - SQLiteConnection, - SQLiteScheme, -) -from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus -from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue -from metadata.generated.schema.tests.testSuite import TestSuite -from metadata.generated.schema.type.entityReference import EntityReference -from metadata.interfaces.sqalchemy.sqa_test_suite_interface import SQATestSuiteInterface -from metadata.test_suite.validations.core import validation_enum_registry - 
-EXECUTION_DATE = datetime.strptime("2021-07-03", "%Y-%m-%d") - -Base = declarative_base() - -TABLE = Table( - id=uuid4(), - name="users", - fullyQualifiedName="service.db.users", - columns=[ - Column(name="id", dataType=DataType.INT), - Column(name="name", dataType=DataType.STRING), - Column(name="first name", dataType=DataType.STRING), - Column(name="fullname", dataType=DataType.STRING), - Column(name="nickname", dataType=DataType.STRING), - Column(name="age", dataType=DataType.INT), - ], - database=EntityReference(id=uuid4(), name="db", type="database"), -) - -TEST_SUITE = TestSuite(name="my_test_suite", description="description") -DL_DATA = ( - ["1", "John", "Jo", "John Doe", "johnny b goode", 30], - ["2", "Jane", "Ja", "Jone Doe", "Johnny d", 31], - ["3", "John", "Joh", "John Doe", None, None], -) * 10 - - -DATALAKE_DATA_FRAME = DataFrame( - DL_DATA, columns=["id", "name", "first name", "fullname", "nickname", "age"] -) - - -class User(Base): - __tablename__ = "users" - id = sqa.Column(sqa.Integer, primary_key=True) - name = sqa.Column(sqa.String(256)) - first_name = sqa.Column("first name", sqa.String(256)) - fullname = sqa.Column(sqa.String(256)) - nickname = sqa.Column(sqa.String(256)) - age = sqa.Column(sqa.Integer) - - -class testSuiteValidation(unittest.TestCase): - """test suite validation""" - - db_path = os.path.join( - os.path.dirname(__file__), f"{os.path.splitext(__file__)[0]}.db" - ) - sqlite_conn = SQLiteConnection( - scheme=SQLiteScheme.sqlite_pysqlite, - databaseMode=db_path + "?check_same_thread=False", - ) - - with patch.object( - SQATestSuiteInterface, "_convert_table_to_orm_object", return_value=User - ): - sqa_profiler_interface = SQATestSuiteInterface( - sqlite_conn, - table_entity=TABLE, - ometa_client=None, - ) - dl_runner = DATALAKE_DATA_FRAME - - runner = sqa_profiler_interface.runner - engine = sqa_profiler_interface.session.get_bind() - session = sqa_profiler_interface.session - - @classmethod - def setUpClass(cls) -> None: - """ - 
Prepare Ingredients - """ - User.__table__.create(bind=cls.engine) - - for i in range(10): - data = [ - User( - name="John", - first_name="Jo", - fullname="John Doe", - nickname="johnny b goode", - age=30, - ), - User( - name="Jane", - first_name="Ja", - fullname="Jone Doe", - nickname="Johnny d", - age=31, - ), - User( - name="John", - first_name="Joh", - fullname="John Doe", - nickname=None, - age=None, - ), - ] - cls.session.add_all(data) - cls.session.commit() - - def test_column_value_length_to_be_between(self): - """ - Check ColumnValueLengthsToBeBetween - """ - - test_case = TestCase( - name="my_test_case", - entityLink="<#E::table::service.db.users::columns::nickname>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - parameterValues=[ - TestCaseParameterValue(name="minLength", value="1"), - TestCaseParameterValue(name="maxLength", value="10"), - ], - ) - - res = validation_enum_registry.registry["columnValueLengthsToBeBetween"]( - self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - dl_res = validation_enum_registry.registry["columnValueLengthsToBeBetween"]( - self.dl_runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - assert isinstance(res, TestCaseResult) - assert isinstance(dl_res, TestCaseResult) - assert res.testCaseStatus == TestCaseStatus.Failed - assert dl_res.testCaseStatus == TestCaseStatus.Failed - assert res.testResultValue[0].value == "8" - assert res.testResultValue[1].value == "14" - assert dl_res.testResultValue[0].value == "4" - assert dl_res.testResultValue[1].value == "14" - - test_case = TestCase( - name="my_test_case", - entityLink="<#E::table::service.db.users::columns::first+name>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - parameterValues=[ - TestCaseParameterValue(name="minLength", value="1"), - 
TestCaseParameterValue(name="maxLength", value="10"), - ], - ) - - res = validation_enum_registry.registry["columnValueLengthsToBeBetween"]( - self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - dl_res = validation_enum_registry.registry["columnValueLengthsToBeBetween"]( - self.dl_runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - assert isinstance(res, TestCaseResult) - assert isinstance(dl_res, TestCaseResult) - assert res.testCaseStatus == TestCaseStatus.Success - assert res.testResultValue[0].value == "2" - assert res.testResultValue[1].value == "3" - assert dl_res.testCaseStatus == TestCaseStatus.Success - assert dl_res.testResultValue[0].value == "2" - assert dl_res.testResultValue[1].value == "3" - - test_case = TestCase( - name="my_test_case_two", - entityLink="<#E::table::service.db.users::columns::first+name>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - parameterValues=[ - TestCaseParameterValue(name="maxLength", value="10"), - ], - ) - - res = validation_enum_registry.registry["columnValueLengthsToBeBetween"]( - self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - dl_res = validation_enum_registry.registry["columnValueLengthsToBeBetween"]( - self.dl_runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - assert isinstance(res, TestCaseResult) - assert isinstance(dl_res, TestCaseResult) - assert res.testCaseStatus == TestCaseStatus.Success - assert dl_res.testCaseStatus == TestCaseStatus.Success - - def test_column_value_max_to_be_between(self): - """test column value max to be between""" - test_case = TestCase( - name="my_test_case", - entityLink="<#E::table::service.db.users::columns::age>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - parameterValues=[ - 
TestCaseParameterValue(name="minValueForMaxInCol", value="1"), - TestCaseParameterValue(name="maxValueForMaxInCol", value="10"), - ], - ) - - res = validation_enum_registry.registry["columnValueMaxToBeBetween"]( - self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - dl_res = validation_enum_registry.registry["columnValueMaxToBeBetween"]( - self.dl_runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - assert isinstance(res, TestCaseResult) - assert isinstance(dl_res, TestCaseResult) - assert res.testCaseStatus == TestCaseStatus.Failed - assert res.testResultValue[0].value == "31" - assert dl_res.testCaseStatus == TestCaseStatus.Failed - assert dl_res.testResultValue[0].value == "31.0" - - test_case = TestCase( - name="my_test_case_two", - entityLink="<#E::table::service.db.users::columns::age>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - parameterValues=[ - TestCaseParameterValue(name="maxValueForMaxInCol", value="10"), - ], - ) - - res = validation_enum_registry.registry["columnValueMaxToBeBetween"]( - self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - dl_res = validation_enum_registry.registry["columnValueMaxToBeBetween"]( - self.dl_runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - assert isinstance(res, TestCaseResult) - assert isinstance(dl_res, TestCaseResult) - assert res.testCaseStatus == TestCaseStatus.Failed - assert dl_res.testCaseStatus == TestCaseStatus.Failed - - def test_column_value_mean_to_be_between(self): - """test column value mean to be between""" - test_case = TestCase( - name="my_test_case", - entityLink="<#E::table::service.db.users::columns::age>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - parameterValues=[ - 
TestCaseParameterValue(name="minValueForMeanInCol", value="1"), - TestCaseParameterValue(name="maxValueForMeanInCol", value="10"), - ], - ) - - res = validation_enum_registry.registry["columnValueMeanToBeBetween"]( - self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - dl_res = validation_enum_registry.registry["columnValueMeanToBeBetween"]( - self.dl_runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - assert isinstance(res, TestCaseResult) - assert isinstance(dl_res, TestCaseResult) - assert res.testCaseStatus == TestCaseStatus.Failed - assert res.testResultValue[0].value == "30.5" - assert dl_res.testCaseStatus == TestCaseStatus.Failed - assert dl_res.testResultValue[0].value == "30.5" - - test_case = TestCase( - name="my_test_case_two", - entityLink="<#E::table::service.db.users::columns::age>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - parameterValues=[ - TestCaseParameterValue(name="minValueForMeanInCol", value="1"), - ], - ) - - res = validation_enum_registry.registry["columnValueMeanToBeBetween"]( - self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - dl_res = validation_enum_registry.registry["columnValueMeanToBeBetween"]( - self.dl_runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - assert isinstance(res, TestCaseResult) - assert isinstance(dl_res, TestCaseResult) - assert res.testCaseStatus == TestCaseStatus.Success - assert dl_res.testCaseStatus == TestCaseStatus.Success - - def test_column_value_median_to_be_between(self): - """test column value median to be between""" - test_case = TestCase( - name="my_test_case", - entityLink="<#E::table::service.db.users::columns::age>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - parameterValues=[ - 
TestCaseParameterValue(name="minValueForMedianInCol", value="1"), - TestCaseParameterValue(name="maxValueForMedianInCol", value="10"), - ], - ) - - res = validation_enum_registry.registry["columnValueMedianToBeBetween"]( - self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - dl_res = validation_enum_registry.registry["columnValueMedianToBeBetween"]( - self.dl_runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - assert isinstance(res, TestCaseResult) - assert isinstance(dl_res, TestCaseResult) - assert res.testCaseStatus == TestCaseStatus.Failed - assert res.testResultValue[0].value == "30.0" - assert dl_res.testCaseStatus == TestCaseStatus.Failed - assert dl_res.testResultValue[0].value == "30.5" - - def test_column_value_min_to_be_between(self): - """test column value min to be between""" - test_case = TestCase( - name="my_test_case", - entityLink="<#E::table::service.db.users::columns::age>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - parameterValues=[ - TestCaseParameterValue(name="minValueForMinInCol", value="25"), - TestCaseParameterValue(name="maxValueForMinInCol", value="40"), - ], - ) - - res = validation_enum_registry.registry["columnValueMinToBeBetween"]( - self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - dl_res = validation_enum_registry.registry["columnValueMinToBeBetween"]( - self.dl_runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - assert isinstance(res, TestCaseResult) - assert isinstance(dl_res, TestCaseResult) - assert res.testCaseStatus == TestCaseStatus.Success - assert res.testResultValue[0].value == "30" - assert dl_res.testCaseStatus == TestCaseStatus.Success - assert dl_res.testResultValue[0].value == "30.0" - - test_case = TestCase( - name="my_test_case_two", - entityLink="<#E::table::service.db.users::columns::age>", 
- testSuite=EntityReference(id=uuid4(), type="TestSuite"), - testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - parameterValues=[ - TestCaseParameterValue(name="maxValueForMinInCol", value="40"), - ], - ) - - res = validation_enum_registry.registry["columnValueMinToBeBetween"]( - self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - dl_res = validation_enum_registry.registry["columnValueMinToBeBetween"]( - self.dl_runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - assert isinstance(res, TestCaseResult) - assert isinstance(dl_res, TestCaseResult) - assert res.testCaseStatus == TestCaseStatus.Success - assert dl_res.testCaseStatus == TestCaseStatus.Success - - def test_column_value_stddev_to_be_between(self): - """test column value stddev to be between""" - test_case = TestCase( - name="my_test_case", - entityLink="<#E::table::service.db.users::columns::age>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - parameterValues=[ - TestCaseParameterValue(name="minValueForStdDevInCol", value="20"), - TestCaseParameterValue(name="maxValueForStdDevInCol", value="40"), - ], - ) - - res = validation_enum_registry.registry["columnValueStdDevToBeBetween"]( - self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - dl_res = validation_enum_registry.registry["columnValueStdDevToBeBetween"]( - self.dl_runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - assert isinstance(res, TestCaseResult) - assert isinstance(dl_res, TestCaseResult) - assert res.testCaseStatus == TestCaseStatus.Failed - assert res.testResultValue[0].value == "0.25" - assert dl_res.testCaseStatus == TestCaseStatus.Failed - assert dl_res.testResultValue[0].value == "0.512989176042577" - - test_case = TestCase( - name="my_test_case_two", - 
entityLink="<#E::table::service.db.users::columns::age>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - parameterValues=[ - TestCaseParameterValue(name="maxValueForStdDevInCol", value="40"), - ], - ) - - res = validation_enum_registry.registry["columnValueStdDevToBeBetween"]( - self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - dl_res = validation_enum_registry.registry["columnValueStdDevToBeBetween"]( - self.dl_runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - assert isinstance(res, TestCaseResult) - assert isinstance(dl_res, TestCaseResult) - assert res.testCaseStatus == TestCaseStatus.Success - assert dl_res.testCaseStatus == TestCaseStatus.Success - - def test_column_value_in_set(self): - """test column value in set""" - test_case = TestCase( - name="my_test_case", - entityLink="<#E::table::service.db.users::columns::name>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - parameterValues=[ - TestCaseParameterValue(name="allowedValues", value="['John']"), - ], - ) - - res = validation_enum_registry.registry["columnValuesToBeInSet"]( - self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - dl_res = validation_enum_registry.registry["columnValuesToBeInSet"]( - self.dl_runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - assert isinstance(res, TestCaseResult) - assert isinstance(dl_res, TestCaseResult) - assert res.testCaseStatus == TestCaseStatus.Success - assert res.testResultValue[0].value == "20" - assert dl_res.testCaseStatus == TestCaseStatus.Success - assert dl_res.testResultValue[0].value == "20" - - def test_column_values_missing_count_to_be_equal(self): - """test column value missing count to be equal""" - test_case = TestCase( - name="my_test_case", - 
entityLink="<#E::table::service.db.users::columns::nickname>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - parameterValues=[ - TestCaseParameterValue(name="missingCountValue", value="10"), - ], - ) - - res = validation_enum_registry.registry["columnValuesMissingCount"]( - self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - dl_res = validation_enum_registry.registry["columnValuesMissingCount"]( - self.dl_runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - assert isinstance(res, TestCaseResult) - assert isinstance(dl_res, TestCaseResult) - assert res.testCaseStatus == TestCaseStatus.Success - assert res.testResultValue[0].value == "10" - assert dl_res.testCaseStatus == TestCaseStatus.Success - assert dl_res.testResultValue[0].value == "10" - - test_case = TestCase( - name="my_test_case", - entityLink="<#E::table::service.db.users::columns::nickname>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - parameterValues=[ - TestCaseParameterValue(name="missingCountValue", value="10"), - TestCaseParameterValue(name="missingValueMatch", value="['Johnny d']"), - ], - ) - - res = validation_enum_registry.registry["columnValuesMissingCount"]( - self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - dl_res = validation_enum_registry.registry["columnValuesMissingCount"]( - self.dl_runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - assert res.testResultValue[0].value == "20" - assert dl_res.testResultValue[0].value == "20" - - def test_column_values_not_in_set(self): - """test column value not in set""" - test_case = TestCase( - name="my_test_case", - entityLink="<#E::table::service.db.users::columns::name>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - 
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - parameterValues=[ - TestCaseParameterValue(name="forbiddenValues", value="['John']"), - ], - ) - - res = validation_enum_registry.registry["columnValuesToBeNotInSet"]( - self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - dl_res = validation_enum_registry.registry["columnValuesToBeNotInSet"]( - self.dl_runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - assert isinstance(res, TestCaseResult) - assert isinstance(dl_res, TestCaseResult) - assert res.testCaseStatus == TestCaseStatus.Failed - assert res.testResultValue[0].value == "20" - assert dl_res.testCaseStatus == TestCaseStatus.Failed - assert dl_res.testResultValue[0].value == "20" - - def test_column_sum_to_be_between(self): - """test column value sum to be between""" - test_case = TestCase( - name="my_test_case", - entityLink="<#E::table::service.db.users::columns::age>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - parameterValues=[ - TestCaseParameterValue(name="minValueForColSum", value="10"), - TestCaseParameterValue(name="maxValueForColSum", value="100"), - ], - ) - - res = validation_enum_registry.registry["columnValuesSumToBeBetween"]( - self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - dl_res = validation_enum_registry.registry["columnValuesSumToBeBetween"]( - self.dl_runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - assert isinstance(res, TestCaseResult) - assert isinstance(dl_res, TestCaseResult) - assert res.testCaseStatus == TestCaseStatus.Failed - assert res.testResultValue[0].value == "610" - assert dl_res.testCaseStatus == TestCaseStatus.Failed - assert dl_res.testResultValue[0].value == "610.0" - - test_case = TestCase( - name="my_test_case_two", - 
entityLink="<#E::table::service.db.users::columns::age>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - parameterValues=[ - TestCaseParameterValue(name="minValueForColSum", value="10"), - ], - ) - - res = validation_enum_registry.registry["columnValuesSumToBeBetween"]( - self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - dl_res = validation_enum_registry.registry["columnValuesSumToBeBetween"]( - self.dl_runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - assert isinstance(res, TestCaseResult) - assert isinstance(dl_res, TestCaseResult) - assert res.testCaseStatus == TestCaseStatus.Success - assert dl_res.testCaseStatus == TestCaseStatus.Success - - def test_column_values_to_be_between(self): - """test column value to be between""" - test_case = TestCase( - name="my_test_case", - entityLink="<#E::table::service.db.users::columns::age>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - parameterValues=[ - TestCaseParameterValue(name="minValue", value="29"), - TestCaseParameterValue(name="maxValue", value="33"), - ], - ) - - res = validation_enum_registry.registry["columnValuesToBeBetween"]( - self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - dl_res = validation_enum_registry.registry["columnValuesToBeBetween"]( - self.dl_runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - assert isinstance(res, TestCaseResult) - assert isinstance(dl_res, TestCaseResult) - assert res.testCaseStatus == TestCaseStatus.Success - assert res.testResultValue[0].value == "30" - assert dl_res.testCaseStatus == TestCaseStatus.Success - assert dl_res.testResultValue[0].value == "30.0" - - test_case = TestCase( - name="my_test_case_two", - 
entityLink="<#E::table::service.db.users::columns::age>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - parameterValues=[ - TestCaseParameterValue(name="minValue", value="29"), - ], - ) - - res = validation_enum_registry.registry["columnValuesToBeBetween"]( - self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - dl_res = validation_enum_registry.registry["columnValuesToBeBetween"]( - self.dl_runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - assert isinstance(res, TestCaseResult) - assert isinstance(dl_res, TestCaseResult) - assert res.testCaseStatus == TestCaseStatus.Success - assert dl_res.testCaseStatus == TestCaseStatus.Success - - def test_column_values_to_be_not_null(self): - """test column value to be not null""" - test_case = TestCase( - name="my_test_case", - entityLink="<#E::table::service.db.users::columns::nickname>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - ) - - res = validation_enum_registry.registry["columnValuesToBeNotNull"]( - self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - dl_res = validation_enum_registry.registry["columnValuesToBeNotNull"]( - self.dl_runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - assert isinstance(res, TestCaseResult) - assert isinstance(dl_res, TestCaseResult) - assert res.testCaseStatus == TestCaseStatus.Failed - assert res.testResultValue[0].value == "10" - assert dl_res.testCaseStatus == TestCaseStatus.Failed - assert dl_res.testResultValue[0].value == "10" - - def test_column_values_to_be_unique(self): - """test column value to be unique""" - test_case = TestCase( - name="my_test_case", - entityLink="<#E::table::service.db.users::columns::nickname>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - 
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - ) - - res = validation_enum_registry.registry["columnValuesToBeUnique"]( - self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - dl_res = validation_enum_registry.registry["columnValuesToBeUnique"]( - self.dl_runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - assert isinstance(res, TestCaseResult) - assert isinstance(dl_res, TestCaseResult) - assert res.testCaseStatus == TestCaseStatus.Failed - assert res.testResultValue[0].value == "20" - assert res.testResultValue[1].value == "0" - assert dl_res.testCaseStatus == TestCaseStatus.Failed - assert dl_res.testResultValue[0].value == "30" - assert dl_res.testResultValue[1].value == "2" - - def test_column_values_to_match_regex(self): - """test column value to match regex""" - test_case = TestCase( - name="my_test_case", - entityLink="<#E::table::service.db.users::columns::name>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - parameterValues=[ - TestCaseParameterValue(name="regex", value="J.*"), - ], - ) - - res = validation_enum_registry.registry["columnValuesToMatchRegex"]( - self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - assert isinstance(res, TestCaseResult) - assert res.testCaseStatus == TestCaseStatus.Success - assert res.testResultValue[0].value == "30" - - def test_column_values_to_not_match_regex(self): - """test column value to not match regex""" - test_case = TestCase( - name="my_test_case", - entityLink="<#E::table::service.db.users::columns::name>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - parameterValues=[ - TestCaseParameterValue(name="forbiddenRegex", value="X%"), - ], - ) - - res = validation_enum_registry.registry["columnValuesToNotMatchRegex"]( - 
self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - assert isinstance(res, TestCaseResult) - assert res.testCaseStatus == TestCaseStatus.Success - assert res.testResultValue[0].value == "0" - - def test_table_column_count_to_be_between(self): - """test column value count to be between""" - test_case = TestCase( - name="my_test_case", - entityLink="<#E::table::service.db.users>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - parameterValues=[ - TestCaseParameterValue(name="minColValue", value="2"), - TestCaseParameterValue(name="maxColValue", value="10"), - ], - ) - - res = validation_enum_registry.registry["tableColumnCountToBeBetween"]( - self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - dl_res = validation_enum_registry.registry["tableColumnCountToBeBetween"]( - self.dl_runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - assert isinstance(res, TestCaseResult) - assert isinstance(dl_res, TestCaseResult) - assert res.testCaseStatus == TestCaseStatus.Success - assert res.testResultValue[0].value == "6" - assert dl_res.testCaseStatus == TestCaseStatus.Success - assert dl_res.testResultValue[0].value == "6" - - test_case = TestCase( - name="my_test_case_two", - entityLink="<#E::table::service.db.users>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - parameterValues=[ - TestCaseParameterValue(name="maxColValue", value="10"), - ], - ) - - res = validation_enum_registry.registry["tableColumnCountToBeBetween"]( - self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - dl_res = validation_enum_registry.registry["tableColumnCountToBeBetween"]( - self.dl_runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - assert isinstance(res, 
TestCaseResult) - assert isinstance(dl_res, TestCaseResult) - assert res.testCaseStatus == TestCaseStatus.Success - assert dl_res.testCaseStatus == TestCaseStatus.Success - - def test_table_column_count_to_equal(self): - """test column value to be equal""" - test_case = TestCase( - name="my_test_case", - entityLink="<#E::table::service.db.users>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - parameterValues=[TestCaseParameterValue(name="columnCount", value="7")], - ) - - res = validation_enum_registry.registry["tableColumnCountToEqual"]( - self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - dl_res = validation_enum_registry.registry["tableColumnCountToEqual"]( - self.dl_runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - assert isinstance(res, TestCaseResult) - assert isinstance(dl_res, TestCaseResult) - assert res.testCaseStatus == TestCaseStatus.Failed - assert res.testResultValue[0].value == "6" - assert dl_res.testCaseStatus == TestCaseStatus.Failed - assert dl_res.testResultValue[0].value == "6" - - def test_table_column_name_to_exist(self): - """test column name to exist""" - test_case = TestCase( - name="my_test_case", - entityLink="<#E::table::service.db.users>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - parameterValues=[TestCaseParameterValue(name="columnName", value="id")], - ) - - res = validation_enum_registry.registry["tableColumnNameToExist"]( - self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - dl_res = validation_enum_registry.registry["tableColumnNameToExist"]( - self.dl_runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - assert isinstance(res, TestCaseResult) - assert isinstance(dl_res, TestCaseResult) - assert res.testCaseStatus == 
TestCaseStatus.Success - assert res.testResultValue[0].value == "True" - assert dl_res.testCaseStatus == TestCaseStatus.Success - assert dl_res.testResultValue[0].value == "True" - - def test_column_to_match_set(self): - """test column names to match set""" - test_case = TestCase( - name="my_test_case", - entityLink="<#E::table::service.db.users>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - parameterValues=[ - TestCaseParameterValue(name="columnNames", value="id,name,nickname") - ], - ) - - res = validation_enum_registry.registry["tableColumnToMatchSet"]( - self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - dl_res = validation_enum_registry.registry["tableColumnToMatchSet"]( - self.dl_runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - assert isinstance(res, TestCaseResult) - assert isinstance(dl_res, TestCaseResult) - assert res.testCaseStatus == TestCaseStatus.Failed - assert ( - res.testResultValue[0].value - == "['first name', 'id', 'name', 'fullname', 'nickname', 'age']" - ) - assert dl_res.testCaseStatus == TestCaseStatus.Failed - assert ( - dl_res.testResultValue[0].value - == "['id', 'name', 'first name', 'fullname', 'nickname', 'age']" - ) - - test_case = TestCase( - name="my_test_case", - entityLink="<#E::table::service.db.users>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - parameterValues=[ - TestCaseParameterValue( - name="columnNames", value="id,name,nickname,fullname,age" - ) - ], - ) - - res = validation_enum_registry.registry["tableColumnToMatchSet"]( - self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - dl_res = validation_enum_registry.registry["tableColumnToMatchSet"]( - self.dl_runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - assert 
res.testCaseStatus == TestCaseStatus.Failed - assert dl_res.testCaseStatus == TestCaseStatus.Failed - - test_case = TestCase( - name="my_test_case", - entityLink="<#E::table::service.db.users>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - parameterValues=[ - TestCaseParameterValue( - name="columnNames", value="id,name,nickname,fullname,age" - ), - TestCaseParameterValue(name="ordered", value="True"), - ], - ) - - res = validation_enum_registry.registry["tableColumnToMatchSet"]( - self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - dl_res = validation_enum_registry.registry["tableColumnToMatchSet"]( - self.dl_runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - assert res.testCaseStatus == TestCaseStatus.Failed - assert dl_res.testCaseStatus == TestCaseStatus.Failed - - def test_table_custom_sql_query(self): - """test custom sql""" - test_case = TestCase( - name="my_test_case", - entityLink="<#E::table::service.db.users::columns::name>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - parameterValues=[ - TestCaseParameterValue( - name="sqlExpression", value="SELECT * FROM users WHERE age > 20" - ), - ], - ) - - res = validation_enum_registry.registry["tableCustomSQLQuery"]( - self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - assert isinstance(res, TestCaseResult) - assert res.testCaseStatus == TestCaseStatus.Failed - assert res.testResultValue[0].value == "20" - - test_case = TestCase( - name="my_test_case", - entityLink="<#E::table::service.db.users>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - parameterValues=[ - TestCaseParameterValue( - name="sqlExpression", value="SELECT * FROM users WHERE age < 0" - ), - 
], - ) - - res = validation_enum_registry.registry["tableCustomSQLQuery"]( - self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - assert res.testCaseStatus == TestCaseStatus.Success - - def test_table_row_count_to_be_between(self): - """test row count to be between""" - test_case = TestCase( - name="my_test_case", - entityLink="<#E::table::service.db.users>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - parameterValues=[ - TestCaseParameterValue(name="minValue", value="10"), - TestCaseParameterValue(name="maxValue", value="35"), - ], - ) - - res = validation_enum_registry.registry["tableRowCountToBeBetween"]( - self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - dl_res = validation_enum_registry.registry["tableRowCountToBeBetween"]( - self.dl_runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - assert isinstance(res, TestCaseResult) - assert isinstance(dl_res, TestCaseResult) - assert res.testCaseStatus == TestCaseStatus.Success - assert res.testResultValue[0].value == "30" - assert dl_res.testCaseStatus == TestCaseStatus.Success - assert dl_res.testResultValue[0].value == "30" - - test_case = TestCase( - name="my_test_case_two", - entityLink="<#E::table::service.db.users>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - parameterValues=[ - TestCaseParameterValue(name="minValue", value="10"), - ], - ) - - assert isinstance(res, TestCaseResult) - assert isinstance(dl_res, TestCaseResult) - assert res.testCaseStatus == TestCaseStatus.Success - assert res.testResultValue[0].value == "30" - assert dl_res.testCaseStatus == TestCaseStatus.Success - assert dl_res.testResultValue[0].value == "30" - - def test_table_row_count_to_be_equal(self): - """test row count to be equal""" - test_case = TestCase( - 
name="my_test_case", - entityLink="<#E::table::service.db.users>", - testSuite=EntityReference(id=uuid4(), type="TestSuite"), - testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), - parameterValues=[ - TestCaseParameterValue(name="value", value="10"), - ], - ) - - res = validation_enum_registry.registry["tableRowCountToEqual"]( - self.runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - dl_res = validation_enum_registry.registry["tableRowCountToEqual"]( - self.dl_runner, - test_case=test_case, - execution_date=EXECUTION_DATE.timestamp(), - ) - - assert isinstance(res, TestCaseResult) - assert isinstance(dl_res, TestCaseResult) - assert res.testCaseStatus == TestCaseStatus.Failed - assert res.testResultValue[0].value == "30" - assert dl_res.testCaseStatus == TestCaseStatus.Failed - assert dl_res.testResultValue[0].value == "30" - - @classmethod - def tearDownClass(cls) -> None: - os.remove(cls.db_path) - return super().tearDownClass() diff --git a/ingestion/tests/unit/test_suite/test_validations_databases.py b/ingestion/tests/unit/test_suite/test_validations_databases.py new file mode 100644 index 00000000000..51c12438886 --- /dev/null +++ b/ingestion/tests/unit/test_suite/test_validations_databases.py @@ -0,0 +1,219 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Test Table and Column Tests' validate implementations. 
+ +Each test should validate the Success, Failure and Aborted statuses +""" +from datetime import datetime + +import pytest + +from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus +from metadata.test_suite.validations.core import validation_enum_registry + +EXECUTION_DATE = datetime.strptime("2021-07-03", "%Y-%m-%d") + +# pylint: disable=line-too-long +@pytest.mark.parametrize( + "test_case_name,test_case_type,expected", + [ + ( + "test_case_column_value_length_to_be_between", + "columnValueLengthsToBeBetween", + (TestCaseResult, "8", "14", TestCaseStatus.Failed), + ), + ( + "test_case_column_value_length_to_be_between_col_space", + "columnValueLengthsToBeBetween", + (TestCaseResult, "2", "3", TestCaseStatus.Success), + ), + ( + "test_case_column_value_length_to_be_between_no_min", + "columnValueLengthsToBeBetween", + (TestCaseResult, None, None, TestCaseStatus.Success), + ), + ( + "test_case_column_value_max_to_be_between", + "columnValueMaxToBeBetween", + (TestCaseResult, "31", None, TestCaseStatus.Failed), + ), + ( + "test_case_column_value_max_to_be_between_no_min", + "columnValueMaxToBeBetween", + (TestCaseResult, None, None, TestCaseStatus.Failed), + ), + ( + "test_case_column_value_mean_to_be_between", + "columnValueMeanToBeBetween", + (TestCaseResult, "30.5", None, TestCaseStatus.Failed), + ), + ( + "test_case_column_value_mean_to_be_between_no_max", + "columnValueMeanToBeBetween", + (TestCaseResult, None, None, TestCaseStatus.Success), + ), + ( + "test_case_column_value_median_to_be_between", + "columnValueMedianToBeBetween", + (TestCaseResult, "30.0", None, TestCaseStatus.Failed), + ), + ( + "test_case_column_value_min_to_be_between", + "columnValueMinToBeBetween", + (TestCaseResult, "30", None, TestCaseStatus.Success), + ), + ( + "test_case_column_value_min_to_be_between_no_min", + "columnValueMinToBeBetween", + (TestCaseResult, None, None, TestCaseStatus.Success), + ), + ( + "test_case_column_value_stddev_to_be_between", + 
"columnValueStdDevToBeBetween", + (TestCaseResult, "0.25", None, TestCaseStatus.Failed), + ), + ( + "test_case_column_value_stddev_to_be_between_no_min", + "columnValueStdDevToBeBetween", + (TestCaseResult, None, None, TestCaseStatus.Success), + ), + ( + "test_case_column_value_in_set", + "columnValuesToBeInSet", + (TestCaseResult, "20", None, TestCaseStatus.Success), + ), + ( + "test_case_column_values_missing_count_to_be_equal", + "columnValuesMissingCount", + (TestCaseResult, "10", None, TestCaseStatus.Success), + ), + ( + "test_case_column_values_missing_count_to_be_equal_missing_valuesl", + "columnValuesMissingCount", + (TestCaseResult, "20", None, TestCaseStatus.Failed), + ), + ( + "test_case_column_values_not_in_set", + "columnValuesToBeNotInSet", + (TestCaseResult, "20", None, TestCaseStatus.Failed), + ), + ( + "test_case_column_sum_to_be_between", + "columnValuesSumToBeBetween", + (TestCaseResult, "610", None, TestCaseStatus.Failed), + ), + ( + "test_case_column_values_to_be_between", + "columnValuesToBeBetween", + (TestCaseResult, "30", None, TestCaseStatus.Success), + ), + ( + "test_case_column_values_to_be_not_null", + "columnValuesToBeNotNull", + (TestCaseResult, "10", None, TestCaseStatus.Failed), + ), + ( + "test_case_column_values_to_be_unique", + "columnValuesToBeUnique", + (TestCaseResult, "20", "0", TestCaseStatus.Failed), + ), + ( + "test_case_column_values_to_match_regex", + "columnValuesToMatchRegex", + (TestCaseResult, "30", None, TestCaseStatus.Success), + ), + ( + "test_case_column_values_to_not_match_regex", + "columnValuesToNotMatchRegex", + (TestCaseResult, "0", None, TestCaseStatus.Success), + ), + ( + "test_case_table_column_count_to_be_between", + "tableColumnCountToBeBetween", + (TestCaseResult, "7", None, TestCaseStatus.Success), + ), + ( + "test_case_table_column_count_to_equal", + "tableColumnCountToEqual", + (TestCaseResult, "7", None, TestCaseStatus.Failed), + ), + ( + "test_case_table_column_name_to_exist", + 
"tableColumnNameToExist", + (TestCaseResult, "True", None, TestCaseStatus.Success), + ), + ( + "test_case_column_to_match_set", + "tableColumnToMatchSet", + ( + TestCaseResult, + "['first name', 'id', 'name', 'fullname', 'nickname', 'age', 'inserted_date']", + None, + TestCaseStatus.Failed, + ), + ), + ( + "test_case_column_to_match_set_ordered", + "tableColumnToMatchSet", + (TestCaseResult, None, None, TestCaseStatus.Failed), + ), + ( + "test_case_table_custom_sql_query", + "tableCustomSQLQuery", + (TestCaseResult, "20", None, TestCaseStatus.Failed), + ), + ( + "test_case_table_custom_sql_query_success", + "tableCustomSQLQuery", + (TestCaseResult, "0", None, TestCaseStatus.Success), + ), + ( + "test_case_table_row_count_to_be_between", + "tableRowCountToBeBetween", + (TestCaseResult, "30", None, TestCaseStatus.Success), + ), + ( + "test_case_table_row_count_to_be_equal", + "tableRowCountToEqual", + (TestCaseResult, "30", None, TestCaseStatus.Failed), + ), + ( + "test_case_table_row_inserted_count_to_be_between", + "tableRowInsertedCountToBeBetween", + (TestCaseResult, "6", None, TestCaseStatus.Success), + ), + ], +) +def test_suite_validation_database( + test_case_name, + test_case_type, + expected, + request, + create_sqlite_table, +): + """Generic test runner for test validations""" + test_case = request.getfixturevalue(test_case_name) + type_, val_1, val_2, status = expected + + res = validation_enum_registry.registry[test_case_type]( + create_sqlite_table, + test_case=test_case, + execution_date=EXECUTION_DATE.timestamp(), + ) + + assert isinstance(res, type_) + if val_1: + assert res.testResultValue[0].value == val_1 + if val_2: + assert res.testResultValue[1].value == val_2 + assert res.testCaseStatus == status diff --git a/ingestion/tests/unit/test_suite/test_validations_datalake.py b/ingestion/tests/unit/test_suite/test_validations_datalake.py new file mode 100644 index 00000000000..74dc29ee0fa --- /dev/null +++ 
b/ingestion/tests/unit/test_suite/test_validations_datalake.py @@ -0,0 +1,210 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Test Table and Column Tests' validate implementations. + +Each test should validate the Success, Failure and Aborted statuses +""" + +from datetime import datetime + +import pytest +from pandas import DataFrame + +from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus +from metadata.test_suite.validations.core import validation_enum_registry + +EXECUTION_DATE = datetime.strptime("2021-07-03", "%Y-%m-%d") +DL_DATA = ( + ["1", "John", "Jo", "John Doe", "johnny b goode", 30], + ["2", "Jane", "Ja", "Jone Doe", "Johnny d", 31], + ["3", "John", "Joh", "John Doe", None, None], +) * 10 + + +DATALAKE_DATA_FRAME = DataFrame( + DL_DATA, columns=["id", "name", "first name", "fullname", "nickname", "age"] +) + +# pylint: disable=line-too-long +@pytest.mark.parametrize( + "test_case_name,test_case_type,expected", + [ + ( + "test_case_column_value_length_to_be_between", + "columnValueLengthsToBeBetween", + (TestCaseResult, "4", "14", TestCaseStatus.Failed), + ), + ( + "test_case_column_value_length_to_be_between_col_space", + "columnValueLengthsToBeBetween", + (TestCaseResult, "2", "3", TestCaseStatus.Success), + ), + ( + "test_case_column_value_length_to_be_between_no_min", + "columnValueLengthsToBeBetween", + (TestCaseResult, None, None, TestCaseStatus.Success), + ), + ( + 
"test_case_column_value_max_to_be_between", + "columnValueMaxToBeBetween", + (TestCaseResult, "31.0", None, TestCaseStatus.Failed), + ), + ( + "test_case_column_value_max_to_be_between_no_min", + "columnValueMaxToBeBetween", + (TestCaseResult, None, None, TestCaseStatus.Failed), + ), + ( + "test_case_column_value_mean_to_be_between", + "columnValueMeanToBeBetween", + (TestCaseResult, "30.5", None, TestCaseStatus.Failed), + ), + ( + "test_case_column_value_mean_to_be_between_no_max", + "columnValueMeanToBeBetween", + (TestCaseResult, None, None, TestCaseStatus.Success), + ), + ( + "test_case_column_value_median_to_be_between", + "columnValueMedianToBeBetween", + (TestCaseResult, "30.5", None, TestCaseStatus.Failed), + ), + ( + "test_case_column_value_min_to_be_between", + "columnValueMinToBeBetween", + (TestCaseResult, "30.0", None, TestCaseStatus.Success), + ), + ( + "test_case_column_value_min_to_be_between_no_min", + "columnValueMinToBeBetween", + (TestCaseResult, None, None, TestCaseStatus.Success), + ), + ( + "test_case_column_value_stddev_to_be_between", + "columnValueStdDevToBeBetween", + (TestCaseResult, "0.512989176042577", None, TestCaseStatus.Failed), + ), + ( + "test_case_column_value_stddev_to_be_between_no_min", + "columnValueStdDevToBeBetween", + (TestCaseResult, None, None, TestCaseStatus.Success), + ), + ( + "test_case_column_value_in_set", + "columnValuesToBeInSet", + (TestCaseResult, "20", None, TestCaseStatus.Success), + ), + ( + "test_case_column_values_missing_count_to_be_equal", + "columnValuesMissingCount", + (TestCaseResult, "10", None, TestCaseStatus.Success), + ), + ( + "test_case_column_values_missing_count_to_be_equal_missing_valuesl", + "columnValuesMissingCount", + (TestCaseResult, "20", None, TestCaseStatus.Failed), + ), + ( + "test_case_column_values_not_in_set", + "columnValuesToBeNotInSet", + (TestCaseResult, "20", None, TestCaseStatus.Failed), + ), + ( + "test_case_column_sum_to_be_between", + "columnValuesSumToBeBetween", + 
(TestCaseResult, "610.0", None, TestCaseStatus.Failed), + ), + ( + "test_case_column_values_to_be_between", + "columnValuesToBeBetween", + (TestCaseResult, "30.0", None, TestCaseStatus.Success), + ), + ( + "test_case_column_values_to_be_not_null", + "columnValuesToBeNotNull", + (TestCaseResult, "10", None, TestCaseStatus.Failed), + ), + ( + "test_case_column_values_to_be_unique", + "columnValuesToBeUnique", + (TestCaseResult, "30", "2", TestCaseStatus.Failed), + ), + ( + "test_case_table_column_count_to_be_between", + "tableColumnCountToBeBetween", + (TestCaseResult, "6", None, TestCaseStatus.Success), + ), + ( + "test_case_table_column_count_to_equal", + "tableColumnCountToEqual", + (TestCaseResult, "6", None, TestCaseStatus.Failed), + ), + ( + "test_case_table_column_name_to_exist", + "tableColumnNameToExist", + (TestCaseResult, "True", None, TestCaseStatus.Success), + ), + ( + "test_case_column_to_match_set", + "tableColumnToMatchSet", + ( + TestCaseResult, + "['id', 'name', 'first name', 'fullname', 'nickname', 'age']", + None, + TestCaseStatus.Failed, + ), + ), + ( + "test_case_column_to_match_set_ordered", + "tableColumnToMatchSet", + (TestCaseResult, None, None, TestCaseStatus.Failed), + ), + ( + "test_case_table_row_count_to_be_between", + "tableRowCountToBeBetween", + (TestCaseResult, "30", None, TestCaseStatus.Success), + ), + ( + "test_case_table_row_count_to_be_equal", + "tableRowCountToEqual", + (TestCaseResult, "30", None, TestCaseStatus.Failed), + ), + ( + "test_case_table_row_inserted_count_to_be_between", + "tableRowInsertedCountToBeBetween", + (TestCaseResult, None, None, TestCaseStatus.Aborted), + ), + ], +) +def test_suite_validation_datalake( + test_case_name, + test_case_type, + expected, + request, +): + """Generic test runner for test validations""" + test_case = request.getfixturevalue(test_case_name) + type_, val_1, val_2, status = expected + + res = validation_enum_registry.registry[test_case_type]( + DATALAKE_DATA_FRAME, + 
test_case=test_case, + execution_date=EXECUTION_DATE.timestamp(), + ) + + assert isinstance(res, type_) + if val_1: + assert res.testResultValue[0].value == val_1 + if val_2: + assert res.testResultValue[1].value == val_2 + assert res.testCaseStatus == status diff --git a/ingestion/tests/unit/topology/test_sqa_utils.py b/ingestion/tests/unit/topology/test_sqa_utils.py new file mode 100644 index 00000000000..92493c40f80 --- /dev/null +++ b/ingestion/tests/unit/topology/test_sqa_utils.py @@ -0,0 +1,61 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +Unit tests for the sqlalchemy helpers in metadata.utils.sqa_utils +""" + +import pytest +import sqlalchemy +from sqlalchemy import Column, text + +from metadata.utils.sqa_utils import build_query_filter, dispatch_to_date_or_datetime + +FILTER_TEST_DATA = [ + ( + [(Column("foo"), "eq", "foo"), (Column("bar"), "ne", "bar")], + "foo = 'foo' AND bar != 'bar'", + ), + ([(Column("foo"), "gt", 1), (Column("bar"), "lt", 2)], "foo > 1 AND bar < 2"), + ([(Column("foo"), "in", [1, 2, 3, 4])], "foo IN (1, 2, 3, 4)"), + ([(Column("foo"), "not_in", [1, 2, 3, 4])], "(foo NOT IN (1, 2, 3, 4))"), +] + +DISPATCH_TEST_DATA = [ + (1, "DAY", sqlalchemy.DATE(), "CAST(CURRENT_DATE - interval '1' DAY AS DATE)"), + ( + 1, + "HOUR", + sqlalchemy.DATETIME(), + "CAST(CURRENT_TIMESTAMP - interval '1' HOUR AS TIMESTAMP)", + ), +] + + +@pytest.mark.parametrize( + "filters,expected", FILTER_TEST_DATA, ids=["eq", "gt_lt", "in", "not_in"] +) +def test_build_query_filter(filters, expected): + """Check that build_query_filter renders each parametrized filter list to the expected SQL string when compiled with literal binds""" + filter_ = build_query_filter(filters, False) + assert filter_.compile(compile_kwargs={"literal_binds": True}).string == expected + + +@pytest.mark.parametrize( + "interval,interval_type,type_,expected", + DISPATCH_TEST_DATA, + ids=["date", "datetime"], +) +def test_dispatch_to_date_or_datetime(interval, interval_type, type_, expected): + """Check that dispatch_to_date_or_datetime compiles to the expected CAST expression for the parametrized DATE and DATETIME cases""" + fn = dispatch_to_date_or_datetime(interval, text(interval_type), type_) + + assert fn.compile(compile_kwargs={"literal_binds": True}).string == expected diff --git a/openmetadata-service/src/main/resources/json/data/tests/tableRowInsertedCountToBeBetween.json b/openmetadata-service/src/main/resources/json/data/tests/tableRowInsertedCountToBeBetween.json new file mode 100644 index 00000000000..8db90f1dfc3 --- /dev/null +++ b/openmetadata-service/src/main/resources/json/data/tests/tableRowInsertedCountToBeBetween.json @@ -0,0 +1,46 @@ +{ + "name": "tableRowInsertedCountToBeBetween", + "fullyQualifiedName": "tableRowInsertedCountToBeBetween", +
"displayName": "Table Row Inserted Count To be Between", + "description": "This schema defines the test tableRowInsertedCountToBeBetween. Test the number of rows inserted is between x and y.", + "entityType": "TABLE", + "testPlatforms": ["OpenMetadata"], + "parameterDefinition": [ + { + "name": "min", + "displayName": "Min Row Count", + "description": "Lower Bound of the Count", + "dataType": "INT", + "required": false + }, + { + "name": "max", + "displayName": "Max Row Count", + "description": "Upper Bound of the Count", + "dataType": "INT", + "required": false + }, + { + "name": "columnName", + "displayName": "Column Name", + "description": "Name of the Column. It should be a timestamp, date or datetime field.", + "dataType": "STRING", + "required": true + }, + { + "name": "rangeType", + "displayName": "Range Type", + "description": "One of 'HOUR', 'DAY', 'MONTH', 'YEAR'", + "dataType": "STRING", + "required": true + }, + { + "name": "rangeInterval", + "displayName": "Interval", + "description": "Interval Range. E.g. if rangeInterval=1 and rangeType=DAY, we'll check the number of rows inserted where columnName=-1 DAY", + "dataType": "INT", + "required": true + } + ] + } + \ No newline at end of file