Issue #7291 - Implements Table Rows Inserted to be Between test (#9813)

* staging commit

* staging commit

* refactor: partitioning logic

* refactor (tests): move to parametrized tests for test validations

* refactor: local variables into global

* (feat): Added logic for table row inserted test

* (feat): fix python checkstyle

* feature: extracted get_query_filter logic into its own function
Teddy 2023-01-31 15:57:51 +01:00 committed by GitHub
parent 2c71c2c6be
commit ba08302ea1
19 changed files with 1617 additions and 1279 deletions

View File

@ -15,12 +15,11 @@ supporting sqlalchemy abstraction layer
"""
from typing import Dict, Optional
from typing import Optional
from sqlalchemy import Column, MetaData, inspect
from sqlalchemy.orm import DeclarativeMeta
from metadata.generated.schema.entity.data.table import PartitionProfilerConfig
from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import (
SnowflakeType,
)
@ -86,27 +85,6 @@ class SQAInterfaceMixin:
)
)
def get_partition_details(
self, partition_config: Optional[PartitionProfilerConfig]
) -> Optional[Dict]:
"""From partition config, get the partition table for a table entity
Args:
partition_config: PartitionProfilerConfig object with some partition details
Returns:
dict or None: dictionary with all the elements constituting a partition
"""
if not partition_config:
return None
return {
"partition_field": partition_config.partitionColumnName,
"partition_values": partition_config.partitionValues,
"partition_interval_unit": partition_config.partitionIntervalUnit.value,
"partition_interval": partition_config.partitionInterval,
}
def close(self):
"""close session"""
self.session.close()

View File

@ -80,7 +80,7 @@ class SQAProfilerInterface(SQAInterfaceMixin, ProfilerProtocol):
self.profile_sample_config = profiler_interface_args.profile_sample_config
self.profile_query = profiler_interface_args.table_sample_query
self.partition_details = (
self.get_partition_details(profiler_interface_args.table_partition_config)
profiler_interface_args.table_partition_config
if not self.profile_query
else None
)

View File

@ -21,7 +21,7 @@ from sqlalchemy import MetaData
from sqlalchemy.orm import DeclarativeMeta
from sqlalchemy.orm.util import AliasedClass
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.data.table import PartitionProfilerConfig, Table
from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
from metadata.generated.schema.tests.basic import TestCaseResult
from metadata.generated.schema.tests.testCase import TestCase
@ -56,7 +56,7 @@ class SQATestSuiteInterface(SQAInterfaceMixin, TestSuiteProtocol):
sqa_metadata_obj: Optional[MetaData] = None,
profile_sample_config: Optional[ProfileSampleConfig] = None,
table_sample_query: str = None,
table_partition_config: dict = None,
table_partition_config: Optional[PartitionProfilerConfig] = None,
table_entity: Table = None,
):
self.ometa_client = ometa_client
@ -72,9 +72,7 @@ class SQATestSuiteInterface(SQAInterfaceMixin, TestSuiteProtocol):
self.profile_sample_config = profile_sample_config
self.table_sample_query = table_sample_query
self.table_partition_config = (
self.get_partition_details(table_partition_config)
if not self.table_sample_query
else None
table_partition_config if not self.table_sample_query else None
)
self._sampler = self._create_sampler()

View File

@ -22,6 +22,7 @@ from sqlalchemy import text
from sqlalchemy.orm import DeclarativeMeta, Session
from sqlparse.sql import Identifier
from metadata.generated.schema.entity.data.table import DmlOperationType
from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
BigQueryConnection,
)
@ -258,6 +259,8 @@ def _(
token.value
for token in query_text.tokens
if token.ttype is sqlparse.tokens.DML
and token.value.upper()
in DmlOperationType._member_names_ # pylint: disable=protected-access
),
None,
)

View File

@ -90,6 +90,13 @@ def _(elements, compiler, **kwargs):
return f"DATEADD({interval_unit}, -{interval}, {func.current_date()})"
@compiles(DateAddFn, Dialects.SQLite)
def _(elements, compiler, **kwargs): # pylint: disable=unused-argument
interval = elements.clauses.clauses[0].value
interval_unit = elements.clauses.clauses[1].text
return f"DATE({func.current_date()}, '-{interval} {interval_unit}')"
class DatetimeAddFn(FunctionElement):
inherit_cache = CACHE
@ -151,3 +158,10 @@ def _(elements, compiler, **kwargs):
compiler.process(element, **kwargs) for element in elements.clauses
]
return f"DATEADD({interval_unit}, -{interval}, {func.current_timestamp()})"
@compiles(DatetimeAddFn, Dialects.SQLite)
def _(elements, compiler, **kwargs): # pylint: disable=unused-argument
interval = elements.clauses.clauses[0].value
interval_unit = elements.clauses.clauses[1].text
return f"DATE({func.current_timestamp()}, '-{interval} {interval_unit}')"

View File

@ -14,45 +14,29 @@ Helper submodule for partitioned tables
from __future__ import annotations
import sqlalchemy
from sqlalchemy import Column, text
from sqlalchemy.sql import and_
from typing import List
from metadata.orm_profiler.orm.functions.datetime import DateAddFn, DatetimeAddFn
from sqlalchemy import Column, text
from metadata.generated.schema.entity.data.table import PartitionProfilerConfig
from metadata.orm_profiler.orm.functions.modulo import ModuloFn
from metadata.orm_profiler.orm.functions.random_num import RandomNumFn
from metadata.utils.logger import profiler_logger
from metadata.utils.sqa_utils import (
build_query_filter,
dispatch_to_date_or_datetime,
get_partition_col_type,
)
RANDOM_LABEL = "random"
logger = profiler_logger()
def format_partition_datetime(
partition_field: str,
partition_interval: int,
partition_interval_unit: str,
col_type,
def build_partition_predicate(
partition_details: PartitionProfilerConfig,
columns: List[Column],
):
"""format partition predicate
Args:
partition_field (_type_): _description_
partition_interval (_type_): _description_
partition_interval_unit (_type_): _description_
"""
if isinstance(col_type, (sqlalchemy.DATE)):
return and_(
Column(partition_field)
>= DateAddFn(partition_interval, text(partition_interval_unit))
)
return and_(
Column(partition_field)
>= DatetimeAddFn(partition_interval, text(partition_interval_unit))
)
def build_partition_predicate(partition_details: dict, col: Column):
"""_summary_
Args:
@ -62,20 +46,27 @@ def build_partition_predicate(partition_details: dict, col: Column):
Returns:
BinaryExpression: SQLAlchemy filter predicate for the partition
"""
if partition_details["partition_values"]:
return col.in_(partition_details["partition_values"])
partition_field = partition_details.partitionColumnName
if partition_details.partitionValues:
return build_query_filter(
[(Column(partition_field), "in", partition_details.partitionValues)],
False,
)
col_type = None
if col is not None:
col_type = col.type
if partition_details["partition_field"] == "_PARTITIONDATE":
col_type = sqlalchemy.DATE
type_ = get_partition_col_type(
partition_field,
columns,
)
return format_partition_datetime(
partition_details["partition_field"],
partition_details["partition_interval"],
partition_details["partition_interval_unit"],
col_type,
date_or_datetime_fn = dispatch_to_date_or_datetime(
partition_details.partitionInterval,
text(partition_details.partitionIntervalUnit.value),
type_,
)
return build_query_filter(
[(Column(partition_field), "ge", date_or_datetime_fn)],
False,
)
@ -103,12 +94,9 @@ class partition_filter_handler:
def handle_and_execute(_self, *args, **kwargs):
"""Handle partitioned queries"""
if _self._partition_details:
partition_field = _self._partition_details[
"partition_field"
].lower() # normalizing field name as we'll look it up by key
partition_filter = build_partition_predicate(
_self._partition_details,
_self.table.__table__.c.get(partition_field),
_self.table.__table__.c,
)
if self.build_sample:
return (

View File

@ -23,6 +23,10 @@ from sqlalchemy.orm import DeclarativeMeta, Query, Session
from sqlalchemy.orm.util import AliasedClass
from metadata.orm_profiler.profiler.handle_partition import partition_filter_handler
from metadata.utils.logger import query_runner_logger
from metadata.utils.sqa_utils import get_query_filter_for_runner
logger = query_runner_logger()
class QueryRunner:
@ -55,25 +59,60 @@ class QueryRunner:
return self._session.query(*entities, **kwargs)
def _select_from_sample(self, *entities, **kwargs):
return self._build_query(*entities, **kwargs).select_from(self._sample)
"""Run select statement against sample data"""
filter_ = get_query_filter_for_runner(kwargs)
query = self._build_query(*entities, **kwargs).select_from(self._sample)
if filter_ is not None:
return query.filter(filter_)
return query
def _select_from_user_query(self, *entities, **kwargs):
"""Run select statement against user defined query"""
filter_ = get_query_filter_for_runner(kwargs)
user_query = self._session.query(self.table).from_statement(
text(f"{self._profile_sample_query}")
)
return self._build_query(*entities, **kwargs).select_from(user_query)
query = self._build_query(*entities, **kwargs).select_from(user_query)
if filter_ is not None:
return query.filter(filter_)
return query
@partition_filter_handler()
def select_first_from_table(self, *entities, **kwargs):
"""Select first row from the table"""
filter_ = get_query_filter_for_runner(kwargs)
if self._profile_sample_query:
return self._select_from_user_query(*entities, **kwargs).first()
return self._build_query(*entities, **kwargs).select_from(self.table).first()
query = self._build_query(*entities, **kwargs).select_from(self.table)
if filter_ is not None:
return query.filter(filter_).first()
return query.first()
@partition_filter_handler(first=False)
def select_all_from_table(self, *entities, **kwargs):
"""Select all rows from the table"""
filter_ = get_query_filter_for_runner(kwargs)
if self._profile_sample_query:
return self._select_from_user_query(*entities, **kwargs).all()
return self._build_query(*entities, **kwargs).select_from(self.table).all()
query = self._build_query(*entities, **kwargs).select_from(self.table)
if filter_ is not None:
return query.filter(filter_).all()
return query.all()
@partition_filter_handler(sampled=True)
def select_first_from_sample(self, *entities, **kwargs):

View File

@ -12,21 +12,26 @@
Helper module to handle data sampling
for the profiler
"""
from typing import Dict, Optional, Union
from typing import Dict, Optional, Union, cast
import sqlalchemy
from sqlalchemy import column, inspect, text
from sqlalchemy.orm import DeclarativeMeta, Query, Session, aliased
from sqlalchemy.orm.util import AliasedClass
from metadata.generated.schema.entity.data.table import ProfileSampleType, TableData
from metadata.generated.schema.entity.data.table import (
PartitionProfilerConfig,
ProfileSampleType,
TableData,
)
from metadata.orm_profiler.api.models import ProfileSampleConfig
from metadata.orm_profiler.orm.functions.modulo import ModuloFn
from metadata.orm_profiler.orm.functions.random_num import RandomNumFn
from metadata.orm_profiler.orm.registry import Dialects
from metadata.orm_profiler.profiler.handle_partition import (
format_partition_datetime,
partition_filter_handler,
from metadata.orm_profiler.profiler.handle_partition import partition_filter_handler
from metadata.utils.sqa_utils import (
build_query_filter,
dispatch_to_date_or_datetime,
get_partition_col_type,
)
RANDOM_LABEL = "random"
@ -92,7 +97,7 @@ class Sampler:
if not self.profile_sample:
if self._partition_details:
return self._random_sample_for_partitioned_tables()
return self._partitioned_table()
return self.table
@ -148,36 +153,56 @@ class Sampler:
text(f"{self._profile_sample_query}")
)
def _random_sample_for_partitioned_tables(self) -> Query:
def _partitioned_table(self) -> Query:
"""Return the Query object for partitioned tables"""
partition_field = self._partition_details["partition_field"]
col = self.table.__table__.c.get(partition_field.lower())
col_type = None
if col is not None:
col_type = col.type
if partition_field == "_PARTITIONDATE":
col_type = sqlalchemy.DATE
if partition_field == "_PARTITIONTIME":
col_type = sqlalchemy.DATETIME()
self._partition_details = cast(
PartitionProfilerConfig, self._partition_details
) # satisfying type checker
partition_field = self._partition_details.partitionColumnName
if not self._partition_details.get("partition_values"):
type_ = get_partition_col_type(
partition_field,
self.table.__table__.c,
)
if not self._partition_details.partitionValues:
sample = (
self.session.query(self.table)
.filter(
format_partition_datetime(
partition_field,
self._partition_details["partition_interval"],
self._partition_details["partition_interval_unit"],
col_type,
build_query_filter(
[
(
column(partition_field),
"ge",
dispatch_to_date_or_datetime(
self._partition_details.partitionInterval,
text(
self._partition_details.partitionIntervalUnit.value
),
type_,
),
)
],
False,
)
)
.subquery()
)
return aliased(self.table, sample)
sample = (
self.session.query(self.table)
.filter(
column(partition_field).in_(self._partition_details["partition_values"])
build_query_filter(
[
(
column(partition_field),
"in",
self._partition_details.partitionValues,
)
],
False,
)
)
.subquery()
)
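The filter-then-alias pattern in _partitioned_table above (restrict the table to the partition window, wrap it in a subquery, alias it back to the ORM entity) can be sketched with plain SQLAlchemy as follows. The Event model and the SQLite date() call are illustrative assumptions, not code from this PR.

from sqlalchemy import Column, Date, Integer, create_engine, func
from sqlalchemy.orm import Session, aliased, declarative_base

Base = declarative_base()


class Event(Base):
    __tablename__ = "events"
    id = Column(Integer, primary_key=True)
    ds = Column(Date)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    # keep only rows in the partition window, then alias the subquery so
    # downstream queries can keep selecting from the ORM entity transparently
    partition = (
        session.query(Event)
        .filter(Event.ds >= func.date("now", "-1 day"))
        .subquery()
    )
    sampled = aliased(Event, partition)
    print(session.query(func.count()).select_from(sampled).scalar())  # 0 on an empty table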

View File

@ -87,6 +87,9 @@ from metadata.test_suite.validations.table.table_row_count_to_be_between import
from metadata.test_suite.validations.table.table_row_count_to_equal import (
table_row_count_to_equal,
)
from metadata.test_suite.validations.table.table_row_inserted_count_to_be_between import (
table_row_inserted_count_to_be_between,
)
from metadata.utils.dispatch import enum_register
from metadata.utils.logger import profiler_logger
@ -104,6 +107,9 @@ validation_enum_registry.add("tableColumnCountToBeBetween")(
validation_enum_registry.add("tableColumnToMatchSet")(table_column_to_match_set)
validation_enum_registry.add("tableColumnNameToExist")(table_column_name_to_exist)
validation_enum_registry.add("tableCustomSQLQuery")(table_custom_sql_query)
validation_enum_registry.add("tableRowInsertedCountToBeBetween")(
table_row_inserted_count_to_be_between
)
# # # Column Tests
validation_enum_registry.add("columnValuesToBeBetween")(column_values_to_be_between)

View File

@ -0,0 +1,170 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
TableRowInsertedCountToBeBetween validation implementation
"""
import traceback
from datetime import datetime
from functools import singledispatch
from typing import List, Optional, Union, cast
from pandas import DataFrame
from sqlalchemy import Column, text
from metadata.generated.schema.tests.basic import (
TestCaseResult,
TestCaseStatus,
TestResultValue,
)
from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue
from metadata.orm_profiler.metrics.registry import Metrics
from metadata.orm_profiler.profiler.runner import QueryRunner
from metadata.utils.logger import test_suite_logger
from metadata.utils.sqa_utils import (
dispatch_to_date_or_datetime,
get_partition_col_type,
)
from metadata.utils.test_suite import build_test_case_result, get_test_case_param_value
logger = test_suite_logger()
@singledispatch
def table_row_inserted_count_to_be_between(
runner,
test_case: TestCase,
execution_date: Union[datetime, float],
):
raise NotImplementedError
@table_row_inserted_count_to_be_between.register
def _(
runner: QueryRunner,
test_case: TestCase,
execution_date: Union[datetime, float],
) -> TestCaseResult:
"""_summary_
Args:
runner (QueryRunner): _description_
test_case (TestCase): _description_
execution_date (Union[datetime, float]): _description_
Returns:
TestCaseResult: _description_
"""
parameter_values = test_case.parameterValues
parameter_values = cast(List[TestCaseParameterValue], parameter_values)
column_name: Optional[Column] = get_test_case_param_value(
parameter_values,
"columnName",
Column,
)
range_type: Optional[str] = get_test_case_param_value(
parameter_values,
"rangeType",
str,
)
range_interval: Optional[int] = get_test_case_param_value(
parameter_values,
"rangeInterval",
int,
)
if any(var is None for var in [column_name, range_type, range_interval]):
result = (
f"Error computing {test_case.name} for {runner.table.__tablename__}:"
" No value found for columnName, rangeType or rangeInterval"
)
logger.debug(traceback.format_exc())
logger.warning(result)
return build_test_case_result(
execution_date,
TestCaseStatus.Aborted,
result,
[TestResultValue(name="rowCount", value=None)],
sample_data=None,
)
try:
date_or_datetime_fn = dispatch_to_date_or_datetime(
range_interval,
text(range_type),
get_partition_col_type(column_name.name, runner.table.__table__.c),
)
row_count_dict = dict(
runner.dispatch_query_select_first(
Metrics.ROW_COUNT.value().fn(),
query_filter_={
"filters": [(column_name, "ge", date_or_datetime_fn)],
"or_filter": False,
},
)
)
row_count_value = row_count_dict.get(Metrics.ROW_COUNT.name)
except Exception as exc:
result = (
f"Error computing {test_case.name} for {runner.table.__tablename__}: {exc}"
)
logger.debug(traceback.format_exc())
logger.warning(result)
return build_test_case_result(
execution_date,
TestCaseStatus.Aborted,
result,
[TestResultValue(name="rowCount", value=None)],
sample_data=None,
)
min_: Union[int, float] = get_test_case_param_value(
parameter_values,
"min",
int,
float("-inf"),
)
max_: Union[int, float] = get_test_case_param_value(
parameter_values,
"max",
int,
float("inf"),
)
status = (
TestCaseStatus.Success
if min_ <= row_count_value <= max_
else TestCaseStatus.Failed
)
result = f"Found {row_count_value} rows vs. the expected range [{min_}, {max_}]."
return build_test_case_result(
execution_date,
status,
result,
[TestResultValue(name="rowCount", value=str(row_count_value))],
sample_data=None,
)
@table_row_inserted_count_to_be_between.register
def _(
runner: DataFrame, test_case: TestCase, execution_date: Union[datetime, float]
): # pylint: disable=unused-argument
result = "Test is currently not supported for datalake sources."
return build_test_case_result(
execution_date,
TestCaseStatus.Aborted,
result,
[TestResultValue(name="rowCount", value=None)],
sample_data=None,
)
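As a rough illustration of the filter this validator builds on the SQA side, assuming the ingestion package with this PR is importable and a DATE column named inserted_date; the expected rendering below matches this PR's unit tests for DateAddFn.

import sqlalchemy
from sqlalchemy import Column, text

from metadata.utils.sqa_utils import build_query_filter, dispatch_to_date_or_datetime

# a DATE column dispatches to DateAddFn; a DATETIME column would use DatetimeAddFn
lower_bound = dispatch_to_date_or_datetime(1, text("DAY"), sqlalchemy.DATE())
filter_ = build_query_filter([(Column("inserted_date"), "ge", lower_bound)], False)
print(filter_.compile(compile_kwargs={"literal_binds": True}))
# inserted_date >= CAST(CURRENT_DATE - interval '1' DAY AS DATE)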

View File

@ -40,6 +40,7 @@ class Loggers(Enum):
PROFILER_INTERFACE = "ProfilerInterface"
TEST_SUITE = "TestSuite"
DATA_INSIGHT = "DataInsight"
QUERY_RUNNER = "QueryRunner"
@DynamicClassAttribute
def value(self):
@ -131,6 +132,14 @@ def great_expectations_logger():
return logging.getLogger(Loggers.GREAT_EXPECTATIONS.value)
def query_runner_logger():
"""
Method to get the QUERY_RUNNER logger
"""
return logging.getLogger(Loggers.QUERY_RUNNER.value)
def set_loggers_level(level: Union[int, str] = logging.INFO):
"""
Set all loggers levels

View File

@ -0,0 +1,127 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
sqlalchemy utility functions
"""
import traceback
from typing import Any, Dict, List, Optional, Tuple
import sqlalchemy
from sqlalchemy import Column, and_, or_
from sqlalchemy.sql.elements import BinaryExpression
from metadata.orm_profiler.orm.functions.datetime import DateAddFn, DatetimeAddFn
from metadata.utils.logger import query_runner_logger
logger = query_runner_logger()
# pylint: disable=cell-var-from-loop
def build_query_filter(
filters: List[Tuple[Column, str, Any]], or_filter: bool = False
) -> Optional[BinaryExpression]:
"""Dynamically build query filter
Args:
filters (List[Tuple[Column, str, Any]]): list of tuples representing filters.
The first value is the column, the second the comparison operators (e.g. "ge", "lt", "eq", "in", etc.) and
the last value the comparison value. e.g. (Column("foo"), "ge", 1) will produce "foo >= 1".
or_filter (bool, optional): whether to perform an OR or AND condition. Defaults to False (i.e. AND).
Returns:
BinaryExpression: a filter pattern
"""
list_of_filters = []
for filter_ in filters:
column, operator, value = filter_
try:
filter_attr = (
next(
filter(
lambda x: hasattr(column, x % operator), ["%s", "%s_", "__%s__"]
),
None,
)
% operator
) # type: ignore
except TypeError as err:
logger.debug(traceback.format_exc())
logger.error(f"Error when looking for operator {operator} - {err}")
else:
list_of_filters.append(getattr(column, filter_attr)(value))
if not list_of_filters:
logger.debug("No filters found.")
return None
if or_filter:
return or_(*list_of_filters)
return and_(*list_of_filters)
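A short usage sketch, assuming the ingestion package with this PR is on the path: operator strings are resolved to the column's comparison methods ("ge" resolves to __ge__, "in" to in_) and the resulting clauses are combined with AND unless or_filter is True.

from sqlalchemy import Column

from metadata.utils.sqa_utils import build_query_filter

filter_ = build_query_filter(
    [(Column("age"), "ge", 18), (Column("name"), "in", ["John", "Jane"])],
    or_filter=False,  # AND the two conditions together
)
print(filter_.compile(compile_kwargs={"literal_binds": True}))
# age >= 18 AND name IN ('John', 'Jane')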
def dispatch_to_date_or_datetime(
partition_interval: int,
partition_interval_unit: str,
type_,
):
"""Dispatch to date or datetime function based on the type
Args:
partition_field (_type_): _description_
partition_interval (_type_): _description_
partition_interval_unit (_type_): _description_
"""
if isinstance(type_, (sqlalchemy.DATE)):
return DateAddFn(partition_interval, partition_interval_unit)
return DatetimeAddFn(partition_interval, partition_interval_unit)
def get_partition_col_type(partition_column_name: str, columns: List[Column]):
"""From partition field, get the type
Args:
partition_column_name (str): column name
columns (List[Column]): list of table columns
Returns:
_type_: type
"""
partition_field = (
partition_column_name.lower()
) # normalize field name as we'll be looking by key
col = columns.get(partition_field)
if (
col is not None
): # if col is None, this means we have BQ pseudo columns _PARTITIONDATE or _PARTITIONTIME
return col.type
if partition_field == "_partitiondate":
return sqlalchemy.DATE()
if partition_field == "_partitiontime":
return sqlalchemy.DATETIME()
return None
def get_query_filter_for_runner(kwargs: Dict) -> Optional[BinaryExpression]:
"""Get query filters from kwargs. IMPORTANT, this will update the original dictionary
passed in the function argument.
Args:
kwargs (Dict): kwargs
"""
if kwargs.get("query_filter_"):
query_filter = kwargs.pop("query_filter_")
filter_ = build_query_filter(**query_filter)
else:
filter_ = None
return filter_
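And a small sketch of this helper's contract, under the same assumption: the query_filter_ key is popped from the caller's kwargs and turned into a filter, or None when the key is absent.

from sqlalchemy import Column

from metadata.utils.sqa_utils import get_query_filter_for_runner

kwargs = {"query_filter_": {"filters": [(Column("age"), "ge", 18)], "or_filter": False}}
filter_ = get_query_filter_for_runner(kwargs)
print(filter_ is not None)        # True, ready to pass to Query.filter()
print("query_filter_" in kwargs)  # False, the key was popped from the caller's kwargs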

View File

@ -15,8 +15,14 @@ Helper module for test suite functions
from __future__ import annotations
from typing import Callable, Optional
from datetime import datetime
from typing import Callable, List, Optional
from metadata.generated.schema.tests.basic import (
TestCaseResult,
TestCaseStatus,
TestResultValue,
)
from metadata.generated.schema.tests.testCase import TestCaseParameterValue
@ -49,3 +55,30 @@ def get_test_case_param_value(
pre_processed_value = pre_processor(value)
return type_(pre_processed_value)
def build_test_case_result(
execution_datetime: datetime,
status: TestCaseStatus,
result: str,
test_result_value: List[TestResultValue],
sample_data: Optional[str] = None,
) -> TestCaseResult:
"""create a test case result object
Args:
execution_datetime (datetime): execution datetime of the test
status (TestCaseStatus): failed, succeed, aborted
result (str): message to display
testResultValue (List[TestResultValue]): values for the test result
Returns:
TestCaseResult:
"""
return TestCaseResult(
timestamp=execution_datetime,
testCaseStatus=status,
result=result,
testResultValue=test_result_value,
sampleData=sample_data,
)

View File

@ -0,0 +1,589 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
shared test cases
"""
import os
from datetime import datetime, timedelta
from unittest.mock import patch
from uuid import uuid4
import pytest
import sqlalchemy as sqa
from sqlalchemy.orm import declarative_base
from metadata.generated.schema.entity.data.table import Column, DataType, Table
from metadata.generated.schema.entity.services.connections.database.sqliteConnection import (
SQLiteConnection,
SQLiteScheme,
)
from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.interfaces.sqalchemy.sqa_test_suite_interface import SQATestSuiteInterface
Base = declarative_base()
TEST_CASE_NAME = "my_test_case"
ENTITY_LINK_NICKNAME = "<#E::table::service.db.users::columns::nickname>"
ENTITY_LINK_FNAME = "<#E::table::service.db.users::columns::first name>"
ENTITY_LINK_AGE = "<#E::table::service.db.users::columns::age>"
ENTITY_LINK_NAME = "<#E::table::service.db.users::columns::name>"
ENTITY_LINK_USER = "<#E::table::service.db.users>"
TABLE = Table(
id=uuid4(),
name="users",
fullyQualifiedName="service.db.users",
columns=[
Column(name="id", dataType=DataType.INT), # type: ignore
Column(name="name", dataType=DataType.STRING), # type: ignore
Column(name="first name", dataType=DataType.STRING), # type: ignore
Column(name="fullname", dataType=DataType.STRING), # type: ignore
Column(name="nickname", dataType=DataType.STRING), # type: ignore
Column(name="age", dataType=DataType.INT), # type: ignore
Column(name="inserted_date", dataType=DataType.DATE), # type: ignore
],
database=EntityReference(id=uuid4(), name="db", type="database"), # type: ignore
) # type: ignore
class User(Base):
__tablename__ = "users"
id = sqa.Column(sqa.Integer, primary_key=True)
name = sqa.Column(sqa.String(256))
first_name = sqa.Column("first name", sqa.String(256))
fullname = sqa.Column(sqa.String(256))
nickname = sqa.Column(sqa.String(256))
age = sqa.Column(sqa.Integer)
inserted_date = sqa.Column(sqa.DATE)
@pytest.fixture
def create_sqlite_table():
"""create and delete sqlite table"""
db_path = os.path.join(
os.path.dirname(__file__), f"{os.path.splitext(__file__)[0]}.db"
)
sqlite_conn = SQLiteConnection(
scheme=SQLiteScheme.sqlite_pysqlite,
databaseMode=db_path + "?check_same_thread=False",
) # type: ignore
with patch.object(
SQATestSuiteInterface, "_convert_table_to_orm_object", return_value=User
):
sqa_profiler_interface = SQATestSuiteInterface(
sqlite_conn, # type: ignore
table_entity=TABLE,
ometa_client=None, # type: ignore
)
runner = sqa_profiler_interface.runner
engine = sqa_profiler_interface.session.get_bind()
session = sqa_profiler_interface.session
User.__table__.create(bind=engine)
for i in range(10):
data = [
User(
name="John",
first_name="Jo",
fullname="John Doe",
nickname="johnny b goode",
age=30,
inserted_date=datetime.today() - timedelta(days=i),
),
User(
name="Jane",
first_name="Ja",
fullname="Jone Doe",
nickname="Johnny d",
age=31,
inserted_date=datetime.today() - timedelta(days=i),
),
User(
name="John",
first_name="Joh",
fullname="John Doe",
nickname=None,
age=None,
inserted_date=datetime.today() - timedelta(days=i),
),
]
session.add_all(data)
session.commit()
yield runner
# clean up
User.__table__.drop(bind=engine)
os.remove(db_path)
@pytest.fixture
def test_case_column_value_length_to_be_between():
"""Test case for test column_value_length_to_be_between"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_NICKNAME,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
parameterValues=[
TestCaseParameterValue(name="minLength", value="1"),
TestCaseParameterValue(name="maxLength", value="10"),
],
) # type: ignore
@pytest.fixture
def test_case_column_value_length_to_be_between_col_space():
"""Test case for test column_value_length_to_be_between"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_FNAME,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
parameterValues=[
TestCaseParameterValue(name="minLength", value="1"),
TestCaseParameterValue(name="maxLength", value="10"),
],
) # type: ignore
@pytest.fixture
def test_case_column_value_length_to_be_between_no_min():
"""Test case for test column_value_length_to_be_between"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_FNAME,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
parameterValues=[
TestCaseParameterValue(name="maxLength", value="10"),
],
) # type: ignore
@pytest.fixture
def test_case_column_value_max_to_be_between():
"""Test case for test column_value_max_to_be_between"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_AGE,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
parameterValues=[
TestCaseParameterValue(name="minValueForMaxInCol", value="1"),
TestCaseParameterValue(name="maxValueForMaxInCol", value="10"),
],
) # type: ignore
@pytest.fixture
def test_case_column_value_max_to_be_between_no_min():
"""Test case for test column_value_max_to_be_between_no_min"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_AGE,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
parameterValues=[
TestCaseParameterValue(name="maxValueForMaxInCol", value="10"),
],
) # type: ignore
@pytest.fixture
def test_case_column_value_mean_to_be_between():
"""Test case for test column_value_mean_to_be_between"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_AGE,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
parameterValues=[
TestCaseParameterValue(name="minValueForMeanInCol", value="1"),
TestCaseParameterValue(name="maxValueForMeanInCol", value="10"),
],
) # type: ignore
@pytest.fixture
def test_case_column_value_mean_to_be_between_no_max():
"""Test case for test column_value_mean_to_be_between"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_AGE,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
parameterValues=[
TestCaseParameterValue(name="minValueForMeanInCol", value="1"),
],
) # type: ignore
@pytest.fixture
def test_case_column_value_median_to_be_between():
"""Test case for test column_value_median_to_be_between"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_AGE,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
parameterValues=[
TestCaseParameterValue(name="minValueForMedianInCol", value="1"),
TestCaseParameterValue(name="maxValueForMedianInCol", value="10"),
],
) # type: ignore
@pytest.fixture
def test_case_column_value_min_to_be_between():
"""Test case for test column_value_median_to_be_between"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_AGE,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
parameterValues=[
TestCaseParameterValue(name="minValueForMinInCol", value="25"),
TestCaseParameterValue(name="maxValueForMinInCol", value="40"),
],
) # type: ignore
@pytest.fixture
def test_case_column_value_min_to_be_between_no_min():
"""Test case for test column_value_median_to_be_between"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_AGE,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
parameterValues=[
TestCaseParameterValue(name="maxValueForMinInCol", value="40"),
],
) # type: ignore
@pytest.fixture
def test_case_column_value_stddev_to_be_between():
"""Test case for test column_value_median_to_be_between"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_AGE,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
parameterValues=[
TestCaseParameterValue(name="minValueForStdDevInCol", value="20"),
TestCaseParameterValue(name="maxValueForStdDevInCol", value="40"),
],
) # type: ignore
@pytest.fixture
def test_case_column_value_stddev_to_be_between_no_min():
"""Test case for test column_value_median_to_be_between"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_AGE,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
parameterValues=[
TestCaseParameterValue(name="maxValueForStdDevInCol", value="40"),
],
) # type: ignore
@pytest.fixture
def test_case_column_value_in_set():
"""Test case for test column_value_median_to_be_between"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_NAME,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
parameterValues=[
TestCaseParameterValue(name="allowedValues", value="['John']"),
],
) # type: ignore
@pytest.fixture
def test_case_column_values_missing_count_to_be_equal():
"""Test case for test column_value_median_to_be_between"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_NICKNAME,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
parameterValues=[
TestCaseParameterValue(name="missingCountValue", value="10"),
],
) # type: ignore
@pytest.fixture
def test_case_column_values_missing_count_to_be_equal_missing_valuesl():
"""Test case for test column_value_median_to_be_between"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_NICKNAME,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
parameterValues=[
TestCaseParameterValue(name="missingCountValue", value="10"),
TestCaseParameterValue(name="missingValueMatch", value="['Johnny d']"),
],
) # type: ignore
@pytest.fixture
def test_case_column_values_not_in_set():
"""Test case for test column_value_median_to_be_between"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_NAME,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
parameterValues=[
TestCaseParameterValue(name="forbiddenValues", value="['John']"),
],
) # type: ignore
@pytest.fixture
def test_case_column_sum_to_be_between():
"""Test case for test column_value_median_to_be_between"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_AGE,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
parameterValues=[
TestCaseParameterValue(name="minValueForColSum", value="10"),
TestCaseParameterValue(name="maxValueForColSum", value="100"),
],
) # type: ignore
@pytest.fixture
def test_case_column_values_to_be_between():
"""Test case for test column_value_median_to_be_between"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_AGE,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
parameterValues=[
TestCaseParameterValue(name="minValue", value="29"),
TestCaseParameterValue(name="maxValue", value="33"),
],
) # type: ignore
@pytest.fixture
def test_case_column_values_to_be_not_null():
"""Test case for test column_value_median_to_be_between"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_NICKNAME,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
) # type: ignore
@pytest.fixture
def test_case_column_values_to_be_unique():
"""Test case for test column_value_median_to_be_between"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_NICKNAME,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
) # type: ignore
@pytest.fixture
def test_case_column_values_to_match_regex():
"""Test case for test column_value_median_to_be_between"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_NAME,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
parameterValues=[
TestCaseParameterValue(name="regex", value="J.*"),
],
) # type: ignore
@pytest.fixture
def test_case_column_values_to_not_match_regex():
"""Test case for test column_value_median_to_be_between"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_NAME,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
parameterValues=[
TestCaseParameterValue(name="forbiddenRegex", value="X%"),
],
) # type: ignore
@pytest.fixture
def test_case_table_column_count_to_be_between():
"""Test case for test column_value_median_to_be_between"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_USER,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
parameterValues=[
TestCaseParameterValue(name="minColValue", value="2"),
TestCaseParameterValue(name="maxColValue", value="10"),
],
) # type: ignore
@pytest.fixture
def test_case_table_column_count_to_equal():
"""Test case for test column_value_median_to_be_between"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_USER,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
parameterValues=[TestCaseParameterValue(name="columnCount", value="8")],
) # type: ignore
@pytest.fixture
def test_case_table_column_name_to_exist():
"""Test case for test column_value_median_to_be_between"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_USER,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
parameterValues=[TestCaseParameterValue(name="columnName", value="id")],
) # type: ignore
@pytest.fixture
def test_case_column_to_match_set():
"""Test case for test column_value_median_to_be_between"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_USER,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
parameterValues=[
TestCaseParameterValue(name="columnNames", value="id,name,nickname")
],
) # type: ignore
@pytest.fixture
def test_case_column_to_match_set_ordered():
"""Test case for test column_value_median_to_be_between"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_USER,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
parameterValues=[
TestCaseParameterValue(
name="columnNames", value="id,name,nickname,fullname,age"
),
TestCaseParameterValue(name="ordered", value="True"),
],
) # type: ignore
@pytest.fixture
def test_case_table_custom_sql_query():
"""Test case for test column_value_median_to_be_between"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_NAME,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
parameterValues=[
TestCaseParameterValue(
name="sqlExpression", value="SELECT * FROM users WHERE age > 20"
),
],
) # type: ignore
@pytest.fixture
def test_case_table_custom_sql_query_success():
"""Test case for test column_value_median_to_be_between"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_USER,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
parameterValues=[
TestCaseParameterValue(
name="sqlExpression", value="SELECT * FROM users WHERE age < 0"
),
],
) # type: ignore
@pytest.fixture
def test_case_table_row_count_to_be_between():
"""Test case for test column_value_median_to_be_between"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_USER,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
parameterValues=[
TestCaseParameterValue(name="minValue", value="10"),
TestCaseParameterValue(name="maxValue", value="35"),
],
) # type: ignore
@pytest.fixture
def test_case_table_row_count_to_be_equal():
"""Test case for test column_value_median_to_be_between"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_USER,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
parameterValues=[
TestCaseParameterValue(name="value", value="10"),
],
) # type: ignore
@pytest.fixture
def test_case_table_row_inserted_count_to_be_between():
"""Test case for test column_value_median_to_be_between"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_USER,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
parameterValues=[
TestCaseParameterValue(name="min", value="3"),
TestCaseParameterValue(name="columnName", value="inserted_date"),
TestCaseParameterValue(name="rangeType", value="DAY"),
TestCaseParameterValue(name="rangeInterval", value="1"),
],
) # type: ignore

File diff suppressed because it is too large

View File

@ -0,0 +1,219 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test Table and Column Tests' validate implementations.
Each test should validate the Success, Failure and Aborted statuses
"""
from datetime import datetime
import pytest
from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus
from metadata.test_suite.validations.core import validation_enum_registry
EXECUTION_DATE = datetime.strptime("2021-07-03", "%Y-%m-%d")
# pylint: disable=line-too-long
@pytest.mark.parametrize(
"test_case_name,test_case_type,expected",
[
(
"test_case_column_value_length_to_be_between",
"columnValueLengthsToBeBetween",
(TestCaseResult, "8", "14", TestCaseStatus.Failed),
),
(
"test_case_column_value_length_to_be_between_col_space",
"columnValueLengthsToBeBetween",
(TestCaseResult, "2", "3", TestCaseStatus.Success),
),
(
"test_case_column_value_length_to_be_between_no_min",
"columnValueLengthsToBeBetween",
(TestCaseResult, None, None, TestCaseStatus.Success),
),
(
"test_case_column_value_max_to_be_between",
"columnValueMaxToBeBetween",
(TestCaseResult, "31", None, TestCaseStatus.Failed),
),
(
"test_case_column_value_max_to_be_between_no_min",
"columnValueMaxToBeBetween",
(TestCaseResult, None, None, TestCaseStatus.Failed),
),
(
"test_case_column_value_mean_to_be_between",
"columnValueMeanToBeBetween",
(TestCaseResult, "30.5", None, TestCaseStatus.Failed),
),
(
"test_case_column_value_mean_to_be_between_no_max",
"columnValueMeanToBeBetween",
(TestCaseResult, None, None, TestCaseStatus.Success),
),
(
"test_case_column_value_median_to_be_between",
"columnValueMedianToBeBetween",
(TestCaseResult, "30.0", None, TestCaseStatus.Failed),
),
(
"test_case_column_value_min_to_be_between",
"columnValueMinToBeBetween",
(TestCaseResult, "30", None, TestCaseStatus.Success),
),
(
"test_case_column_value_min_to_be_between_no_min",
"columnValueMinToBeBetween",
(TestCaseResult, None, None, TestCaseStatus.Success),
),
(
"test_case_column_value_stddev_to_be_between",
"columnValueStdDevToBeBetween",
(TestCaseResult, "0.25", None, TestCaseStatus.Failed),
),
(
"test_case_column_value_stddev_to_be_between_no_min",
"columnValueStdDevToBeBetween",
(TestCaseResult, None, None, TestCaseStatus.Success),
),
(
"test_case_column_value_in_set",
"columnValuesToBeInSet",
(TestCaseResult, "20", None, TestCaseStatus.Success),
),
(
"test_case_column_values_missing_count_to_be_equal",
"columnValuesMissingCount",
(TestCaseResult, "10", None, TestCaseStatus.Success),
),
(
"test_case_column_values_missing_count_to_be_equal_missing_valuesl",
"columnValuesMissingCount",
(TestCaseResult, "20", None, TestCaseStatus.Failed),
),
(
"test_case_column_values_not_in_set",
"columnValuesToBeNotInSet",
(TestCaseResult, "20", None, TestCaseStatus.Failed),
),
(
"test_case_column_sum_to_be_between",
"columnValuesSumToBeBetween",
(TestCaseResult, "610", None, TestCaseStatus.Failed),
),
(
"test_case_column_values_to_be_between",
"columnValuesToBeBetween",
(TestCaseResult, "30", None, TestCaseStatus.Success),
),
(
"test_case_column_values_to_be_not_null",
"columnValuesToBeNotNull",
(TestCaseResult, "10", None, TestCaseStatus.Failed),
),
(
"test_case_column_values_to_be_unique",
"columnValuesToBeUnique",
(TestCaseResult, "20", "0", TestCaseStatus.Failed),
),
(
"test_case_column_values_to_match_regex",
"columnValuesToMatchRegex",
(TestCaseResult, "30", None, TestCaseStatus.Success),
),
(
"test_case_column_values_to_not_match_regex",
"columnValuesToNotMatchRegex",
(TestCaseResult, "0", None, TestCaseStatus.Success),
),
(
"test_case_table_column_count_to_be_between",
"tableColumnCountToBeBetween",
(TestCaseResult, "7", None, TestCaseStatus.Success),
),
(
"test_case_table_column_count_to_equal",
"tableColumnCountToEqual",
(TestCaseResult, "7", None, TestCaseStatus.Failed),
),
(
"test_case_table_column_name_to_exist",
"tableColumnNameToExist",
(TestCaseResult, "True", None, TestCaseStatus.Success),
),
(
"test_case_column_to_match_set",
"tableColumnToMatchSet",
(
TestCaseResult,
"['first name', 'id', 'name', 'fullname', 'nickname', 'age', 'inserted_date']",
None,
TestCaseStatus.Failed,
),
),
(
"test_case_column_to_match_set_ordered",
"tableColumnToMatchSet",
(TestCaseResult, None, None, TestCaseStatus.Failed),
),
(
"test_case_table_custom_sql_query",
"tableCustomSQLQuery",
(TestCaseResult, "20", None, TestCaseStatus.Failed),
),
(
"test_case_table_custom_sql_query_success",
"tableCustomSQLQuery",
(TestCaseResult, "0", None, TestCaseStatus.Success),
),
(
"test_case_table_row_count_to_be_between",
"tableRowCountToBeBetween",
(TestCaseResult, "30", None, TestCaseStatus.Success),
),
(
"test_case_table_row_count_to_be_equal",
"tableRowCountToEqual",
(TestCaseResult, "30", None, TestCaseStatus.Failed),
),
(
"test_case_table_row_inserted_count_to_be_between",
"tableRowInsertedCountToBeBetween",
(TestCaseResult, "6", None, TestCaseStatus.Success),
),
],
)
def test_suite_validation_database(
test_case_name,
test_case_type,
expected,
request,
create_sqlite_table,
):
"""Generic test runner for test validations"""
test_case = request.getfixturevalue(test_case_name)
type_, val_1, val_2, status = expected
res = validation_enum_registry.registry[test_case_type](
create_sqlite_table,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
)
assert isinstance(res, type_)
if val_1:
assert res.testResultValue[0].value == val_1
if val_2:
assert res.testResultValue[1].value == val_2
assert res.testCaseStatus == status

View File

@ -0,0 +1,210 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test Table and Column Tests' validate implementations.
Each test should validate the Success, Failure and Aborted statuses
"""
from datetime import datetime
import pytest
from pandas import DataFrame
from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus
from metadata.test_suite.validations.core import validation_enum_registry
EXECUTION_DATE = datetime.strptime("2021-07-03", "%Y-%m-%d")
DL_DATA = (
["1", "John", "Jo", "John Doe", "johnny b goode", 30],
["2", "Jane", "Ja", "Jone Doe", "Johnny d", 31],
["3", "John", "Joh", "John Doe", None, None],
) * 10
DATALAKE_DATA_FRAME = DataFrame(
DL_DATA, columns=["id", "name", "first name", "fullname", "nickname", "age"]
)
# pylint: disable=line-too-long
@pytest.mark.parametrize(
"test_case_name,test_case_type,expected",
[
(
"test_case_column_value_length_to_be_between",
"columnValueLengthsToBeBetween",
(TestCaseResult, "4", "14", TestCaseStatus.Failed),
),
(
"test_case_column_value_length_to_be_between_col_space",
"columnValueLengthsToBeBetween",
(TestCaseResult, "2", "3", TestCaseStatus.Success),
),
(
"test_case_column_value_length_to_be_between_no_min",
"columnValueLengthsToBeBetween",
(TestCaseResult, None, None, TestCaseStatus.Success),
),
(
"test_case_column_value_max_to_be_between",
"columnValueMaxToBeBetween",
(TestCaseResult, "31.0", None, TestCaseStatus.Failed),
),
(
"test_case_column_value_max_to_be_between_no_min",
"columnValueMaxToBeBetween",
(TestCaseResult, None, None, TestCaseStatus.Failed),
),
(
"test_case_column_value_mean_to_be_between",
"columnValueMeanToBeBetween",
(TestCaseResult, "30.5", None, TestCaseStatus.Failed),
),
(
"test_case_column_value_mean_to_be_between_no_max",
"columnValueMeanToBeBetween",
(TestCaseResult, None, None, TestCaseStatus.Success),
),
(
"test_case_column_value_median_to_be_between",
"columnValueMedianToBeBetween",
(TestCaseResult, "30.5", None, TestCaseStatus.Failed),
),
(
"test_case_column_value_min_to_be_between",
"columnValueMinToBeBetween",
(TestCaseResult, "30.0", None, TestCaseStatus.Success),
),
(
"test_case_column_value_min_to_be_between_no_min",
"columnValueMinToBeBetween",
(TestCaseResult, None, None, TestCaseStatus.Success),
),
(
"test_case_column_value_stddev_to_be_between",
"columnValueStdDevToBeBetween",
(TestCaseResult, "0.512989176042577", None, TestCaseStatus.Failed),
),
(
"test_case_column_value_stddev_to_be_between_no_min",
"columnValueStdDevToBeBetween",
(TestCaseResult, None, None, TestCaseStatus.Success),
),
(
"test_case_column_value_in_set",
"columnValuesToBeInSet",
(TestCaseResult, "20", None, TestCaseStatus.Success),
),
(
"test_case_column_values_missing_count_to_be_equal",
"columnValuesMissingCount",
(TestCaseResult, "10", None, TestCaseStatus.Success),
),
(
"test_case_column_values_missing_count_to_be_equal_missing_valuesl",
"columnValuesMissingCount",
(TestCaseResult, "20", None, TestCaseStatus.Failed),
),
(
"test_case_column_values_not_in_set",
"columnValuesToBeNotInSet",
(TestCaseResult, "20", None, TestCaseStatus.Failed),
),
(
"test_case_column_sum_to_be_between",
"columnValuesSumToBeBetween",
(TestCaseResult, "610.0", None, TestCaseStatus.Failed),
),
(
"test_case_column_values_to_be_between",
"columnValuesToBeBetween",
(TestCaseResult, "30.0", None, TestCaseStatus.Success),
),
(
"test_case_column_values_to_be_not_null",
"columnValuesToBeNotNull",
(TestCaseResult, "10", None, TestCaseStatus.Failed),
),
(
"test_case_column_values_to_be_unique",
"columnValuesToBeUnique",
(TestCaseResult, "30", "2", TestCaseStatus.Failed),
),
(
"test_case_table_column_count_to_be_between",
"tableColumnCountToBeBetween",
(TestCaseResult, "6", None, TestCaseStatus.Success),
),
(
"test_case_table_column_count_to_equal",
"tableColumnCountToEqual",
(TestCaseResult, "6", None, TestCaseStatus.Failed),
),
(
"test_case_table_column_name_to_exist",
"tableColumnNameToExist",
(TestCaseResult, "True", None, TestCaseStatus.Success),
),
(
"test_case_column_to_match_set",
"tableColumnToMatchSet",
(
TestCaseResult,
"['id', 'name', 'first name', 'fullname', 'nickname', 'age']",
None,
TestCaseStatus.Failed,
),
),
(
"test_case_column_to_match_set_ordered",
"tableColumnToMatchSet",
(TestCaseResult, None, None, TestCaseStatus.Failed),
),
(
"test_case_table_row_count_to_be_between",
"tableRowCountToBeBetween",
(TestCaseResult, "30", None, TestCaseStatus.Success),
),
(
"test_case_table_row_count_to_be_equal",
"tableRowCountToEqual",
(TestCaseResult, "30", None, TestCaseStatus.Failed),
),
(
"test_case_table_row_inserted_count_to_be_between",
"tableRowInsertedCountToBeBetween",
(TestCaseResult, None, None, TestCaseStatus.Aborted),
),
],
)
def test_suite_validation_datalake(
test_case_name,
test_case_type,
expected,
request,
):
"""Generic test runner for test validations"""
test_case = request.getfixturevalue(test_case_name)
type_, val_1, val_2, status = expected
res = validation_enum_registry.registry[test_case_type](
DATALAKE_DATA_FRAME,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
)
assert isinstance(res, type_)
if val_1:
assert res.testResultValue[0].value == val_1
if val_2:
assert res.testResultValue[1].value == val_2
assert res.testCaseStatus == status

View File

@ -0,0 +1,61 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
sqlalchemy utility tests
"""
import pytest
import sqlalchemy
from sqlalchemy import Column, text
from metadata.utils.sqa_utils import build_query_filter, dispatch_to_date_or_datetime
FILTER_TEST_DATA = [
(
[(Column("foo"), "eq", "foo"), (Column("bar"), "ne", "bar")],
"foo = 'foo' AND bar != 'bar'",
),
([(Column("foo"), "gt", 1), (Column("bar"), "lt", 2)], "foo > 1 AND bar < 2"),
([(Column("foo"), "in", [1, 2, 3, 4])], "foo IN (1, 2, 3, 4)"),
([(Column("foo"), "not_in", [1, 2, 3, 4])], "(foo NOT IN (1, 2, 3, 4))"),
]
DISPATCH_TEST_DATA = [
(1, "DAY", sqlalchemy.DATE(), "CAST(CURRENT_DATE - interval '1' DAY AS DATE)"),
(
1,
"HOUR",
sqlalchemy.DATETIME(),
"CAST(CURRENT_TIMESTAMP - interval '1' HOUR AS TIMESTAMP)",
),
]
@pytest.mark.parametrize(
"filters,expected", FILTER_TEST_DATA, ids=["eq", "gt_lt", "in", "not_in"]
)
def test_build_query_filter(filters, expected):
"""Test SQA query filter builder"""
filter_ = build_query_filter(filters, False)
assert filter_.compile(compile_kwargs={"literal_binds": True}).string == expected
@pytest.mark.parametrize(
"interval,interval_type,type_,expected",
DISPATCH_TEST_DATA,
ids=["date", "datetime"],
)
def test_dispatch_to_date_or_datetime(interval, interval_type, type_, expected):
"""Test dispatch function"""
fn = dispatch_to_date_or_datetime(interval, text(interval_type), type_)
assert fn.compile(compile_kwargs={"literal_binds": True}).string == expected

View File

@ -0,0 +1,46 @@
{
"name": "tableRowInsertedCountToBeBetween",
"fullyQualifiedName": "tableRowInsertedCountToBeBetween",
"displayName": "Table Row Inserted Count To be Between",
"description": "This schema defines the test tableRowInsertedCountToBeBetween. Test the number of rows inserted is between x and y.",
"entityType": "TABLE",
"testPlatforms": ["OpenMetadata"],
"parameterDefinition": [
{
"name": "min",
"displayName": "Min Row Count",
"description": "Lower Bound of the Count",
"dataType": "INT",
"required": false
},
{
"name": "max",
"displayName": "Max Row Count",
"description": "Upper Bound of the Count",
"dataType": "INT",
"required": false
},
{
"name": "columnName",
"displayName": "Column Name",
"description": "Name of the Column. It should be a timestamp, date or datetime field.",
"dataType": "STRING",
"required": true
},
{
"name": "rangeType",
"displayName": "Range Type",
"description": "One of 'HOUR', 'DAY', 'MONTH', 'YEAR'",
"dataType": "STRING",
"required": true
},
{
"name": "rangeInterval",
"displayName": "Interval",
"description": "Interval Range. E.g. if rangeInterval=1 and rangeType=DAY, we'll check the numbers of rows inserted where columnName=-1 DAY",
"dataType": "INT",
"required": true
}
]
}