Add Test Suite Implementation for Datalake (#9235)

This commit is contained in:
Ayush Shah 2022-12-14 21:14:51 +05:30 committed by GitHub
parent c36544640f
commit a6ae9fd11a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
47 changed files with 1778 additions and 351 deletions

View File

@ -17,11 +17,7 @@ import traceback
from typing import TYPE_CHECKING, Dict
from airflow_provider_openmetadata.lineage.config.loader import get_lineage_config
from airflow_provider_openmetadata.lineage.utils import (
add_status,
get_xlets,
parse_lineage,
)
from airflow_provider_openmetadata.lineage.utils import add_status
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.services.pipelineService import PipelineService
from metadata.ingestion.ometa.ometa_api import OpenMetadata

View File

@ -43,7 +43,6 @@ from metadata.generated.schema.entity.services.pipelineService import (
PipelineService,
PipelineServiceType,
)
from metadata.generated.schema.type.basic import Uuid
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.ometa_api import OpenMetadata

View File

@ -0,0 +1,29 @@
source:
type: datalake
serviceName: local_datalake
serviceConnection:
config:
type: Datalake
configSource:
securityConfig:
awsAccessKeyId: aws access key id
awsSecretAccessKey: aws secret access key
awsRegion: aws region
bucketName: bucket name
prefix: prefix
sourceConfig:
config:
type: Profiler
processor:
type: "orm-profiler"
config: {}
sink:
type: metadata-rest
config: {}
workflowConfig:
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: openmetadata
securityConfig:
jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"

View File

@ -63,6 +63,22 @@ DATALAKE_INT_TYPES = {"int64", "INT", "int32"}
DATALAKE_SUPPORTED_FILE_TYPES = (".csv", ".tsv", ".json", ".parquet", ".json.gz")
def ometa_to_dataframe(config_source, client, table):
    """Fetch a table's underlying files from a datalake source as dataframes.

    Dispatches on the type of ``config_source``: GCS and S3 backends are
    supported; any other configuration yields None.
    """
    fetcher = None
    if isinstance(config_source, GCSConfig):
        fetcher = DatalakeSource.get_gcs_files
    elif isinstance(config_source, S3Config):
        fetcher = DatalakeSource.get_s3_files
    if fetcher is None:
        return None
    return fetcher(
        client=client,
        key=table.name.__root__,
        bucket_name=table.databaseSchema.name,
    )
class DatalakeSource(DatabaseServiceSource): # pylint: disable=too-many-public-methods
"""
Implements the necessary methods to extract

View File

@ -17,18 +17,13 @@ supporting sqlalchemy abstraction layer
import traceback
from collections import defaultdict
from datetime import datetime, timezone
from typing import Dict, List, Union
from typing import Dict, List
from pydantic import BaseModel
from sqlalchemy import Column
from metadata.generated.schema.entity.data.table import DataType, TableData
from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
GCSConfig,
S3Config,
)
from metadata.generated.schema.entity.data.table import TableData
from metadata.ingestion.api.processor import ProfilerProcessorStatus
from metadata.ingestion.source.database.datalake import DatalakeSource
from metadata.ingestion.source.database.datalake import ometa_to_dataframe
from metadata.interfaces.profiler_protocol import (
ProfilerInterfaceArgs,
ProfilerProtocol,
@ -36,6 +31,7 @@ from metadata.interfaces.profiler_protocol import (
from metadata.orm_profiler.metrics.core import MetricTypes
from metadata.orm_profiler.metrics.registry import Metrics
from metadata.orm_profiler.profiler.datalake_sampler import DatalakeSampler
from metadata.utils.column_base_model import ColumnBaseModel
from metadata.utils.connections import get_connection
from metadata.utils.dispatch import valuedispatch
from metadata.utils.logger import profiler_interface_registry_logger
@ -69,8 +65,10 @@ class DataLakeProfilerInterface(ProfilerProtocol):
self.profile_query = profiler_interface_args.table_sample_query
self.partition_details = None
self._table = profiler_interface_args.table_entity
self.data_frame_list = self.ometa_to_dataframe(
self.service_connection_config.configSource
self.data_frame_list = ometa_to_dataframe(
config_source=self.service_connection_config.configSource,
client=self.client,
table=self.table,
)
@valuedispatch
@ -220,21 +218,6 @@ class DataLakeProfilerInterface(ProfilerProtocol):
"""
return None # to be implemented
def ometa_to_dataframe(self, config_source):
if isinstance(config_source, GCSConfig):
return DatalakeSource.get_gcs_files(
client=self.client,
key=self.table.name.__root__,
bucket_name=self.table.databaseSchema.name,
)
if isinstance(config_source, S3Config):
return DatalakeSource.get_s3_files(
client=self.client,
key=self.table.name.__root__,
bucket_name=self.table.databaseSchema.name,
)
return None
def compute_metrics(
self,
metrics,
@ -336,17 +319,7 @@ class DataLakeProfilerInterface(ProfilerProtocol):
return self._table
def get_columns(self):
return [
ColumnBaseModel(
name=column, datatype=self.data_frame_list[0][column].dtype.name
)
for column in self.data_frame_list[0].columns
]
return ColumnBaseModel.col_base_model_list(self.data_frame_list)
def close(self):
pass
class ColumnBaseModel(BaseModel):
name: str
datatype: Union[DataType, str]

View File

@ -0,0 +1,79 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Interfaces with database for all database engine
supporting sqlalchemy abstraction layer
"""
from datetime import datetime, timezone
from typing import Optional
from pandas import DataFrame
from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
DatalakeConnection,
)
from metadata.generated.schema.tests.basic import TestCaseResult
from metadata.generated.schema.tests.testCase import TestCase
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.interfaces.test_suite_protocol import TestSuiteProtocol
from metadata.test_suite.validations.core import validation_enum_registry
from metadata.utils.logger import test_suite_logger
logger = test_suite_logger()
class DataLakeTestSuiteInterface(TestSuiteProtocol):
    """
    Sequential interface protocol for testSuite and Profiler. This class
    implements specific operations needed to run profiler and test suite workflow
    against a Datalake source.
    """

    def __init__(
        self,
        ometa_client: OpenMetadata = None,
        service_connection_config: DatalakeConnection = None,
        table_entity=None,
        data_frame: DataFrame = None,
    ):
        # Table entity the test cases are registered against
        self.table_entity = table_entity
        # In-memory dataframe holding the table data to validate
        self.data_frame = data_frame
        self.ometa_client = ometa_client
        self.service_connection_config = service_connection_config

    def run_test_case(
        self,
        test_case: TestCase,
    ) -> Optional[TestCaseResult]:
        """Run table tests where platformsTest=OpenMetadata

        Args:
            test_case: test case object to execute
        Returns:
            TestCaseResult object, or None when no validation is registered
            for the test definition
        """
        try:
            # Dispatch to the dataframe-based validation registered for this
            # test definition (singledispatch keyed on the runner type).
            return validation_enum_registry.registry[
                test_case.testDefinition.fullyQualifiedName
            ](
                self.data_frame,
                test_case=test_case,
                execution_date=datetime.now(tz=timezone.utc).timestamp(),
            )
        except KeyError as err:
            # Fixed typo in the log message: "TestDefintion" -> "TestDefinition"
            logger.warning(
                f"Test definition {test_case.testDefinition.fullyQualifiedName} not registered in OpenMetadata "
                f"TestDefinition registry. Skipping test case {test_case.name.__root__} - {err}"
            )
            return None

View File

@ -150,9 +150,9 @@ class SQATestSuiteInterface(SQAInterfaceMixin, TestSuiteProtocol):
return validation_enum_registry.registry[
test_case.testDefinition.fullyQualifiedName
](
test_case,
self.runner,
test_case=test_case,
execution_date=datetime.now(tz=timezone.utc).timestamp(),
runner=self.runner,
)
except KeyError as err:
logger.warning(

View File

@ -16,6 +16,7 @@ CountInSet Metric definition
import traceback
from typing import List
from pandas import DataFrame
from sqlalchemy import case, column
from metadata.orm_profiler.metrics.core import StaticMetric, _label
@ -63,5 +64,15 @@ class CountInSet(StaticMetric):
return None
@_label
def dl_fn(self):
return self.fn()
def dl_fn(self, data_frame: DataFrame):
    """Compute the countInSet metric against a dataframe.

    Returns the total number of rows whose column value belongs to
    ``self.values``, or None when the computation fails.
    """
    try:
        rows = list(data_frame[self.col.name])
        # Bug fix: the previous implementation overwrote ``count`` on each
        # iteration (``count = rows.count(value)``), so only the count of the
        # *last* matching value was returned. Sum the occurrences of every
        # value in the set instead; ``list.count`` returns 0 for absent values.
        return sum(rows.count(value) for value in self.values)
    except Exception as exc:  # pylint: disable=broad-except
        logger.debug(traceback.format_exc())
        logger.warning(f"Error trying to run countInSet for {self.col.name}: {exc}")
        return None

View File

@ -14,6 +14,7 @@ Unique Count Metric definition
"""
from typing import Optional
from pandas import DataFrame
from sqlalchemy import column, func
from sqlalchemy.orm import DeclarativeMeta, Session
@ -65,7 +66,7 @@ class UniqueCount(QueryMetric):
only_once_cte = only_once.cte("only_once")
return session.query(func.count().label(self.name())).select_from(only_once_cte)
def dl_query(self, data_frame):
def dl_query(self, data_frame: DataFrame = None):
"""
Build the Unique Count metric
"""

View File

@ -32,7 +32,6 @@ from metadata.generated.schema.entity.data.table import (
SystemProfile,
TableProfile,
)
from metadata.interfaces.datalake.datalake_profiler_interface import ColumnBaseModel
from metadata.interfaces.profiler_protocol import ProfilerProtocol
from metadata.orm_profiler.api.models import ProfilerResponse
from metadata.orm_profiler.metrics.core import (
@ -47,6 +46,7 @@ from metadata.orm_profiler.metrics.core import (
from metadata.orm_profiler.metrics.registry import Metrics
from metadata.orm_profiler.metrics.static.row_count import RowCount
from metadata.orm_profiler.orm.registry import NOT_COMPUTE, NOT_COMPUTE_OM
from metadata.utils.column_base_model import ColumnBaseModel
from metadata.utils.logger import profiler_logger
logger = profiler_logger()

View File

@ -30,7 +30,7 @@ class TestCaseDefinition(ConfigModel):
description: Optional[str] = "Default suite description"
testDefinitionName: str
entityLink: EntityLink
parameterValues: List[TestCaseParameterValue]
parameterValues: Optional[List[TestCaseParameterValue]]
class TestSuiteDefinition(ConfigModel):

View File

@ -29,6 +29,9 @@ from metadata.config.workflow import get_sink
from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest
from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest
from metadata.generated.schema.entity.data.table import PartitionProfilerConfig, Table
from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
DatalakeConnection,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
@ -49,10 +52,15 @@ from metadata.ingestion.api.parser import parse_workflow_config_gracefully
from metadata.ingestion.api.processor import ProcessorStatus
from metadata.ingestion.ometa.client_utils import create_ometa_client
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.datalake import ometa_to_dataframe
from metadata.interfaces.datalake.datalake_test_suite_interface import (
DataLakeTestSuiteInterface,
)
from metadata.interfaces.sqalchemy.sqa_test_suite_interface import SQATestSuiteInterface
from metadata.test_suite.api.models import TestCaseDefinition, TestSuiteProcessorConfig
from metadata.test_suite.runner.core import DataTestsRunner
from metadata.utils import entity_link
from metadata.utils.connections import get_connection
from metadata.utils.logger import test_suite_logger
from metadata.utils.partition import get_partition_details
from metadata.utils.workflow_output_handler import print_test_suite_status
@ -87,6 +95,7 @@ class TestSuiteWorkflow(WorkflowStatusMixin):
self.metadata_config: OpenMetadataConnection = (
self.config.workflowConfig.openMetadataServerConfig
)
self.client = create_ometa_client(self.metadata_config)
self.metadata = OpenMetadata(self.metadata_config)
self.set_ingestion_pipeline_status(state=PipelineState.running)
@ -218,25 +227,42 @@ class TestSuiteWorkflow(WorkflowStatusMixin):
"""
return get_partition_details(entity)
def _create_runner_interface(self, entity_fqn: str, sqa_metadata_obj: MetaData):
"""create the interface to execute test against SQA sources"""
def _create_runner_interface(self, entity_fqn: str):
"""create the interface to execute test against sources"""
table_entity = self._get_table_entity_from_test_case(entity_fqn)
return SQATestSuiteInterface(
service_connection_config=self._get_service_connection_from_test_case(
entity_fqn
),
ometa_client=create_ometa_client(self.metadata_config),
sqa_metadata_obj=sqa_metadata_obj,
table_entity=table_entity,
table_sample_precentage=self._get_profile_sample(table_entity)
if not self._get_profile_query(table_entity)
else None,
table_sample_query=self._get_profile_query(table_entity)
service_connection_config = self._get_service_connection_from_test_case(
entity_fqn
)
table_partition_config = None
table_sample_precentage = None
table_sample_query = (
self._get_profile_query(table_entity)
if not self._get_profile_sample(table_entity)
else None,
table_partition_config=self._get_partition_details(table_entity)
if not self._get_profile_query(table_entity)
else None,
else None
)
if not table_sample_query:
table_sample_precentage = self._get_profile_sample(table_entity)
table_partition_config = self._get_partition_details(table_entity)
if not isinstance(service_connection_config, DatalakeConnection):
sqa_metadata_obj = MetaData()
return SQATestSuiteInterface(
service_connection_config=service_connection_config,
ometa_client=self.client,
sqa_metadata_obj=sqa_metadata_obj,
table_entity=table_entity,
table_sample_precentage=table_sample_precentage,
table_sample_query=table_sample_query,
table_partition_config=table_partition_config,
)
self.client = get_connection(service_connection_config).client
return DataLakeTestSuiteInterface(
service_connection_config=service_connection_config,
ometa_client=self.client,
data_frame=ometa_to_dataframe(
service_connection_config.configSource, self.client, table_entity
)[0],
table_entity=table_entity,
)
def _create_data_tests_runner(self, sqa_interface):
@ -357,7 +383,9 @@ class TestSuiteWorkflow(WorkflowStatusMixin):
testSuite=self.metadata.get_entity_reference(
entity=TestSuite, fqn=test_suite.name
),
parameterValues=list(test_case_to_create.parameterValues),
parameterValues=list(test_case_to_create.parameterValues)
if test_case_to_create.parameterValues
else None,
)
)
)
@ -400,11 +428,8 @@ class TestSuiteWorkflow(WorkflowStatusMixin):
unique_entity_fqns = self._get_unique_entities_from_test_cases(test_cases)
for entity_fqn in unique_entity_fqns:
sqa_metadata_obj = MetaData()
try:
runner_interface = self._create_runner_interface(
entity_fqn, sqa_metadata_obj
)
runner_interface = self._create_runner_interface(entity_fqn)
data_test_runner = self._create_data_tests_runner(runner_interface)
for test_case in self._filter_test_cases_for_entity(

View File

@ -16,7 +16,10 @@ ColumnValuesToBeBetween validation implementation
import traceback
from datetime import datetime
from functools import singledispatch
from typing import Union
from pandas import DataFrame
from sqlalchemy import inspect
from metadata.generated.schema.tests.basic import (
@ -27,6 +30,7 @@ from metadata.generated.schema.tests.basic import (
from metadata.generated.schema.tests.testCase import TestCase
from metadata.orm_profiler.metrics.registry import Metrics
from metadata.orm_profiler.profiler.runner import QueryRunner
from metadata.utils.column_base_model import fetch_column_obj
from metadata.utils.entity_link import get_decoded_column
from metadata.utils.logger import test_suite_logger
from metadata.utils.test_suite import get_test_case_param_value
@ -34,14 +38,34 @@ from metadata.utils.test_suite import get_test_case_param_value
logger = test_suite_logger()
def test_case_status_result(min_bound, max_bound, max_value_res):
    """Return the (status, message) pair for the columnValueMaxToBeBetween test."""
    within_bounds = min_bound <= max_value_res <= max_bound
    status = TestCaseStatus.Success if within_bounds else TestCaseStatus.Failed
    message = (
        f"Found max={max_value_res} vs."
        f" the expected min={min_bound}, max={max_bound}."
    )
    return status, message
@singledispatch
def column_value_max_to_be_between(
runner,
test_case: TestCase,
execution_date: datetime,
execution_date: Union[datetime, float],
):
raise NotImplementedError
@column_value_max_to_be_between.register
def _(
runner: QueryRunner,
test_case: TestCase,
execution_date: Union[datetime, float],
) -> TestCaseResult:
"""
Validate Column Values metric
:param test_case: ColumnValuesToBeBetween
:param test_case: columnValueMaxToBeBetween
:param col_profile: should contain MIN & MAX metrics
:param execution_date: Datetime when the tests ran
:return: TestCaseResult with status and results
@ -94,15 +118,7 @@ def column_value_max_to_be_between(
default=float("inf"),
)
status = (
TestCaseStatus.Success
if min_bound <= max_value_res <= max_bound
else TestCaseStatus.Failed
)
result = (
f"Found max={max_value_res} vs."
+ f" the expected min={min_bound}, max={max_bound}."
)
status, result = test_case_status_result(min_bound, max_bound, max_value_res)
return TestCaseResult(
timestamp=execution_date,
@ -110,3 +126,42 @@ def column_value_max_to_be_between(
result=result,
testResultValue=[TestResultValue(name="max", value=str(max_value_res))],
)
@column_value_max_to_be_between.register
def _(
    runner: DataFrame,
    test_case: TestCase,
    execution_date: Union[datetime, float],
):
    """
    Validate Column Values metric
    :param test_case: columnValueMaxToBeBetween
    :param col_profile: should contain MIN & MAX metrics
    :param execution_date: Datetime when the tests ran
    :return: TestCaseResult with status and results
    """
    column_obj = fetch_column_obj(test_case.entityLink.__root__, runner)

    # Expected interval, unbounded on any side not provided as a parameter
    lower_limit = get_test_case_param_value(
        test_case.parameterValues,  # type: ignore
        "minValueForMaxInCol",
        float,
        default=float("-inf"),
    )
    upper_limit = get_test_case_param_value(
        test_case.parameterValues,  # type: ignore
        "maxValueForMaxInCol",
        float,
        default=float("inf"),
    )

    # Run the MAX metric directly against the dataframe
    observed_max = Metrics.MAX.value(column_obj).dl_fn(runner)

    status, result = test_case_status_result(lower_limit, upper_limit, observed_max)
    return TestCaseResult(
        timestamp=execution_date,
        testCaseStatus=status,
        result=result,
        testResultValue=[TestResultValue(name="max", value=str(observed_max))],
    )

View File

@ -16,7 +16,10 @@ ColumnValuesToBeBetween validation implementation
import traceback
from datetime import datetime
from functools import singledispatch
from typing import Union
from pandas import DataFrame
from sqlalchemy import inspect
from metadata.generated.schema.tests.basic import (
@ -27,6 +30,7 @@ from metadata.generated.schema.tests.basic import (
from metadata.generated.schema.tests.testCase import TestCase
from metadata.orm_profiler.metrics.registry import Metrics
from metadata.orm_profiler.profiler.runner import QueryRunner
from metadata.utils.column_base_model import fetch_column_obj
from metadata.utils.entity_link import get_decoded_column
from metadata.utils.logger import test_suite_logger
from metadata.utils.test_suite import get_test_case_param_value
@ -34,14 +38,38 @@ from metadata.utils.test_suite import get_test_case_param_value
logger = test_suite_logger()
def test_case_status_result(
    min_bound,
    max_bound,
    mean_value_res,
):
    """Return the (status, message) pair for the columnValueMeanToBeBetween test."""
    if min_bound <= mean_value_res <= max_bound:
        status = TestCaseStatus.Success
    else:
        status = TestCaseStatus.Failed
    message = (
        f"Found mean={mean_value_res:.2f} vs."
        f" the expected min={min_bound}, max={max_bound}."
    )
    return status, message
@singledispatch
def column_value_mean_to_be_between(
runner,
test_case: TestCase,
execution_date: datetime,
execution_date: Union[datetime, float],
):
raise NotImplementedError
@column_value_mean_to_be_between.register
def _(
runner: QueryRunner,
test_case: TestCase,
execution_date: Union[datetime, float],
) -> TestCaseResult:
"""
Validate Column Values metric
:param test_case: ColumnValuesToBeBetween
:param test_case: columnValueMeanToBeBetween
:param col_profile: should contain MIN & MAX metrics
:param execution_date: Datetime when the tests ran
:return: TestCaseResult with status and results
@ -94,14 +122,10 @@ def column_value_mean_to_be_between(
default=float("inf"),
)
status = (
TestCaseStatus.Success
if min_bound <= mean_value_res <= max_bound
else TestCaseStatus.Failed
)
result = (
f"Found mean={mean_value_res:.2f} vs."
+ f" the expected min={min_bound}, max={max_bound}."
status, result = test_case_status_result(
min_bound,
max_bound,
mean_value_res,
)
return TestCaseResult(
@ -110,3 +134,46 @@ def column_value_mean_to_be_between(
result=result,
testResultValue=[TestResultValue(name="mean", value=str(mean_value_res))],
)
@column_value_mean_to_be_between.register
def _(
    runner: DataFrame,
    test_case: TestCase,
    execution_date: Union[datetime, float],
):
    """
    Validate Column Values metric
    :param test_case: columnValueMeanToBeBetween
    :param col_profile: should contain MIN & MAX metrics
    :param execution_date: Datetime when the tests ran
    :return: TestCaseResult with status and results
    """
    column_obj = fetch_column_obj(test_case.entityLink.__root__, runner)

    # Expected interval, unbounded on any side not provided as a parameter
    lower_limit = get_test_case_param_value(
        test_case.parameterValues,  # type: ignore
        "minValueForMeanInCol",
        float,
        default=float("-inf"),
    )
    upper_limit = get_test_case_param_value(
        test_case.parameterValues,  # type: ignore
        "maxValueForMeanInCol",
        float,
        default=float("inf"),
    )

    # Run the MEAN metric directly against the dataframe
    observed_mean = Metrics.MEAN.value(column_obj).dl_fn(runner)

    status, result = test_case_status_result(
        lower_limit,
        upper_limit,
        observed_mean,
    )
    return TestCaseResult(
        timestamp=execution_date,
        testCaseStatus=status,
        result=result,
        testResultValue=[TestResultValue(name="mean", value=str(observed_mean))],
    )

View File

@ -16,7 +16,10 @@ ColumnValuesToBeBetween validation implementation
import traceback
from datetime import datetime
from functools import singledispatch
from typing import Union
from pandas import DataFrame
from sqlalchemy import inspect
from metadata.generated.schema.tests.basic import (
@ -27,6 +30,7 @@ from metadata.generated.schema.tests.basic import (
from metadata.generated.schema.tests.testCase import TestCase
from metadata.orm_profiler.metrics.registry import Metrics
from metadata.orm_profiler.profiler.runner import QueryRunner
from metadata.utils.column_base_model import fetch_column_obj
from metadata.utils.entity_link import get_decoded_column
from metadata.utils.logger import test_suite_logger
from metadata.utils.test_suite import get_test_case_param_value
@ -34,14 +38,34 @@ from metadata.utils.test_suite import get_test_case_param_value
logger = test_suite_logger()
def test_case_status_result(min_bound, max_bound, median_value_res):
    """Return the (status, message) pair for the columnValueMedianToBeBetween test.

    Fixed the result message: it previously labelled the lower bound as
    "median=" instead of "min=", unlike the sibling min/max/mean/stddev
    validators.
    """
    return (
        TestCaseStatus.Success
        if min_bound <= median_value_res <= max_bound
        else TestCaseStatus.Failed,
        f"Found median={median_value_res} vs."
        + f" the expected min={min_bound}, max={max_bound}.",
    )
@singledispatch
def column_value_median_to_be_between(
runner,
test_case: TestCase,
execution_date: datetime,
execution_date: Union[datetime, float],
):
raise NotImplementedError
@column_value_median_to_be_between.register
def _(
runner: QueryRunner,
test_case: TestCase,
execution_date: Union[datetime, float],
) -> TestCaseResult:
"""
Validate Column Values metric
:param test_case: ColumnValuesToBeBetween
:param test_case: columnValueMedianToBeBetween
:param col_profile: should contain MIN & MAX metrics
:param execution_date: Datetime when the tests ran
:return: TestCaseResult with status and results
@ -89,21 +113,50 @@ def column_value_median_to_be_between(
max_bound = get_test_case_param_value(
test_case.parameterValues, # type: ignore
"maxColValue",
"maxValueForMedianInCol",
float,
default=float("inf"),
)
status = (
TestCaseStatus.Success
if min_bound <= median_value_res <= max_bound
else TestCaseStatus.Failed
)
result = (
f"Found median={median_value_res} vs."
+ f" the expected median={min_bound}, max={max_bound}."
)
status, result = test_case_status_result(min_bound, max_bound, median_value_res)
return TestCaseResult(
timestamp=execution_date,
testCaseStatus=status,
result=result,
testResultValue=[TestResultValue(name="median", value=str(median_value_res))],
)
@column_value_median_to_be_between.register
def _(
runner: DataFrame,
test_case: TestCase,
execution_date: Union[datetime, float],
):
"""
Validate Column Values metric
:param test_case: columnValueMedianToBeBetween
:param col_profile: should contain MIN & MAX metrics
:param execution_date: Datetime when the tests ran
:return: TestCaseResult with status and results
"""
column_obj = fetch_column_obj(test_case.entityLink.__root__, runner)
median_value_res = Metrics.MEDIAN.value(column_obj).dl_fn(runner)
min_bound = get_test_case_param_value(
test_case.parameterValues, # type: ignore
"minValueForMedianInCol",
float,
default=float("-inf"),
)
max_bound = get_test_case_param_value(
test_case.parameterValues, # type: ignore
"maxValueForMedianInCol",
float,
default=float("inf"),
)
status, result = test_case_status_result(min_bound, max_bound, median_value_res)
return TestCaseResult(
timestamp=execution_date,
testCaseStatus=status,

View File

@ -16,7 +16,10 @@ ColumnValuesToBeBetween validation implementation
import traceback
from datetime import datetime
from functools import singledispatch
from typing import Union
from pandas import DataFrame
from sqlalchemy import inspect
from metadata.generated.schema.tests.basic import (
@ -27,6 +30,7 @@ from metadata.generated.schema.tests.basic import (
from metadata.generated.schema.tests.testCase import TestCase
from metadata.orm_profiler.metrics.registry import Metrics
from metadata.orm_profiler.profiler.runner import QueryRunner
from metadata.utils.column_base_model import fetch_column_obj
from metadata.utils.entity_link import get_decoded_column
from metadata.utils.logger import test_suite_logger
from metadata.utils.test_suite import get_test_case_param_value
@ -34,14 +38,34 @@ from metadata.utils.test_suite import get_test_case_param_value
logger = test_suite_logger()
def test_case_status_result(min_bound, max_bound, min_value_res):
    """Return the (status, message) pair for the columnValueMinToBeBetween test."""
    passed = min_bound <= min_value_res <= max_bound
    return (
        TestCaseStatus.Success if passed else TestCaseStatus.Failed,
        f"Found min={min_value_res} vs."
        f" the expected min={min_bound}, max={max_bound}.",
    )
@singledispatch
def column_value_min_to_be_between(
runner,
test_case: TestCase,
execution_date: datetime,
execution_date: Union[datetime, float],
):
raise NotImplementedError
@column_value_min_to_be_between.register
def _(
runner: QueryRunner,
test_case: TestCase,
execution_date: Union[datetime, float],
) -> TestCaseResult:
"""
Validate Column Values metric
:param test_case: ColumnValuesToBeBetween
:param test_case: columnValueMinToBeBetween
:param col_profile: should contain MIN & MAX metrics
:param execution_date: Datetime when the tests ran
:return: TestCaseResult with status and results
@ -94,15 +118,7 @@ def column_value_min_to_be_between(
default=float("inf"),
)
status = (
TestCaseStatus.Success
if min_bound <= min_value_res <= max_bound
else TestCaseStatus.Failed
)
result = (
f"Found min={min_value_res} vs."
+ f" the expected min={min_bound}, max={max_bound}."
)
status, result = test_case_status_result(min_bound, max_bound, min_value_res)
return TestCaseResult(
timestamp=execution_date,
@ -110,3 +126,41 @@ def column_value_min_to_be_between(
result=result,
testResultValue=[TestResultValue(name="min", value=str(min_value_res))],
)
@column_value_min_to_be_between.register
def _(
    runner: DataFrame,
    test_case: TestCase,
    execution_date: Union[datetime, float],
):
    """
    Validate Column Values metric
    :param test_case: columnValueMinToBeBetween
    :param col_profile: should contain MIN & MAX metrics
    :param execution_date: Datetime when the tests ran
    :return: TestCaseResult with status and results
    """
    column_obj = fetch_column_obj(test_case.entityLink.__root__, runner)

    # Expected interval, unbounded on any side not provided as a parameter
    lower_limit = get_test_case_param_value(
        test_case.parameterValues,  # type: ignore
        "minValueForMinInCol",
        float,
        default=float("-inf"),
    )
    upper_limit = get_test_case_param_value(
        test_case.parameterValues,  # type: ignore
        "maxValueForMinInCol",
        float,
        default=float("inf"),
    )

    # Run the MIN metric directly against the dataframe
    observed_min = Metrics.MIN.value(column_obj).dl_fn(runner)

    status, result = test_case_status_result(lower_limit, upper_limit, observed_min)
    return TestCaseResult(
        timestamp=execution_date,
        testCaseStatus=status,
        result=result,
        testResultValue=[TestResultValue(name="min", value=str(observed_min))],
    )

View File

@ -16,7 +16,10 @@ ColumnValuesToBeBetween validation implementation
import traceback
from datetime import datetime
from functools import singledispatch
from typing import Union
from pandas import DataFrame
from sqlalchemy import inspect
from metadata.generated.schema.tests.basic import (
@ -27,6 +30,7 @@ from metadata.generated.schema.tests.basic import (
from metadata.generated.schema.tests.testCase import TestCase
from metadata.orm_profiler.metrics.registry import Metrics
from metadata.orm_profiler.profiler.runner import QueryRunner
from metadata.utils.column_base_model import fetch_column_obj
from metadata.utils.entity_link import get_decoded_column
from metadata.utils.logger import test_suite_logger
from metadata.utils.test_suite import get_test_case_param_value
@ -34,14 +38,34 @@ from metadata.utils.test_suite import get_test_case_param_value
logger = test_suite_logger()
def test_case_status_result(min_bound, max_bound, stddev_value_res):
    """Return the (status, message) pair for the columnValueStdDevToBeBetween test."""
    status = (
        TestCaseStatus.Success
        if min_bound <= stddev_value_res <= max_bound
        else TestCaseStatus.Failed
    )
    message = (
        f"Found stddev={stddev_value_res:.2f} vs."
        f" the expected min={min_bound}, max={max_bound}."
    )
    return status, message
@singledispatch
def column_value_stddev_to_be_between(
runner,
test_case: TestCase,
execution_date: datetime,
execution_date: Union[datetime, float],
):
raise NotImplementedError
@column_value_stddev_to_be_between.register
def _(
runner: QueryRunner,
test_case: TestCase,
execution_date: Union[datetime, float],
) -> TestCaseResult:
"""
Validate Column Values metric
:param test_case: ColumnValuesToBeBetween
:param test_case: columnValueStdDevToBeBetween
:param col_profile: should contain MIN & MAX metrics
:param execution_date: Datetime when the tests ran
:return: TestCaseResult with status and results
@ -94,15 +118,7 @@ def column_value_stddev_to_be_between(
default=float("inf"),
)
status = (
TestCaseStatus.Success
if min_bound <= stddev_value_res <= max_bound
else TestCaseStatus.Failed
)
result = (
f"Found stddev={stddev_value_res:.2f} vs."
+ f" the expected min={min_bound}, max={max_bound}."
)
status, result = test_case_status_result(min_bound, max_bound, stddev_value_res)
return TestCaseResult(
timestamp=execution_date,
@ -110,3 +126,41 @@ def column_value_stddev_to_be_between(
result=result,
testResultValue=[TestResultValue(name="min", value=str(stddev_value_res))],
)
@column_value_stddev_to_be_between.register
def _(
    runner: DataFrame,
    test_case: TestCase,
    execution_date: Union[datetime, float],
):
    """
    Validate Column Values metric
    :param test_case: columnValueStdDevToBeBetween
    :param col_profile: should contain MIN & MAX metrics
    :param execution_date: Datetime when the tests ran
    :return: TestCaseResult with status and results
    """
    column_obj = fetch_column_obj(test_case.entityLink.__root__, runner)

    # Expected interval, unbounded on any side not provided as a parameter
    lower_limit = get_test_case_param_value(
        test_case.parameterValues,  # type: ignore
        "minValueForStdDevInCol",
        float,
        default=float("-inf"),
    )
    upper_limit = get_test_case_param_value(
        test_case.parameterValues,  # type: ignore
        "maxValueForStdDevInCol",
        float,
        default=float("inf"),
    )

    # Run the STDDEV metric directly against the dataframe
    observed_stddev = Metrics.STDDEV.value(column_obj).dl_fn(runner)

    status, result = test_case_status_result(lower_limit, upper_limit, observed_stddev)
    # NOTE(review): testResultValue is named "min" although the metric is the
    # standard deviation — looks like a copy-paste from the min validator.
    # Kept as-is to preserve behavior; confirm against the test definition
    # schema before renaming.
    return TestCaseResult(
        timestamp=execution_date,
        testCaseStatus=status,
        result=result,
        testResultValue=[TestResultValue(name="min", value=str(observed_stddev))],
    )

View File

@ -16,7 +16,10 @@ ColumnValuesToBeNotNull validation implementation
import traceback
from ast import literal_eval
from datetime import datetime
from functools import singledispatch
from typing import Union
from pandas import DataFrame
from sqlalchemy import inspect
from metadata.generated.schema.tests.basic import (
@ -28,17 +31,35 @@ from metadata.generated.schema.tests.testCase import TestCase
from metadata.orm_profiler.metrics.core import add_props
from metadata.orm_profiler.metrics.registry import Metrics
from metadata.orm_profiler.profiler.runner import QueryRunner
from metadata.utils.column_base_model import fetch_column_obj
from metadata.utils.entity_link import get_decoded_column
from metadata.utils.logger import test_suite_logger
logger = test_suite_logger()
def test_case_status_result(set_count_res):
return (
TestCaseStatus.Success if set_count_res >= 1 else TestCaseStatus.Failed,
f"Found countInSet={set_count_res}",
)
# pylint: disable=abstract-class-instantiated
@singledispatch
def column_values_in_set(
    runner,
    test_case: TestCase,
    execution_date: Union[datetime, float],
):
    """Dispatch entry point for the columnValuesToBeInSet test.

    Concrete implementations are registered per runner type (QueryRunner for
    SQL sources, DataFrame for datalake sources).

    :param runner: object the metric is computed against; drives dispatch
    :param test_case: test case definition holding the parameter values
    :param execution_date: moment the test ran
    :raises NotImplementedError: always, for unregistered runner types
    """
    # Stale duplicate `execution_date: datetime` parameter removed — Python
    # rejects duplicate argument names at compile time.
    raise NotImplementedError
@column_values_in_set.register
def _(
runner: QueryRunner,
test_case: TestCase,
execution_date: Union[datetime, float],
) -> TestCaseResult:
"""
Validate Column Values metric
@ -87,8 +108,40 @@ def column_values_in_set(
testResultValue=[TestResultValue(name="allowedValueCount", value=None)],
)
status = TestCaseStatus.Success if set_count_res >= 1 else TestCaseStatus.Failed
result = f"Found countInSet={set_count_res}"
status, result = test_case_status_result(set_count_res)
return TestCaseResult(
timestamp=execution_date,
testCaseStatus=status,
result=result,
testResultValue=[
TestResultValue(
name="allowedValueCount",
value=str(set_count_res),
)
],
)
# pylint: disable=no-member,abstract-class-instantiated
@column_values_in_set.register
def _(
runner: DataFrame,
test_case: TestCase,
execution_date: Union[datetime, float],
):
allowed_value = next(
(
literal_eval(param.value)
for param in test_case.parameterValues
if param.name == "allowedValues"
)
)
column_obj = fetch_column_obj(test_case.entityLink.__root__, runner)
set_count_res = add_props(values=allowed_value)(Metrics.COUNT_IN_SET.value)(
column_obj
).dl_fn(runner)
status, result = test_case_status_result(set_count_res)
return TestCaseResult(
timestamp=execution_date,

View File

@ -16,7 +16,10 @@ ColumnValueLengthsToBeBetween validation implementation
import traceback
from datetime import datetime
from functools import singledispatch
from typing import Union
from pandas import DataFrame
from sqlalchemy import inspect
from metadata.generated.schema.tests.basic import (
@ -27,6 +30,7 @@ from metadata.generated.schema.tests.basic import (
from metadata.generated.schema.tests.testCase import TestCase
from metadata.orm_profiler.metrics.registry import Metrics
from metadata.orm_profiler.profiler.runner import QueryRunner
from metadata.utils.column_base_model import fetch_column_obj
from metadata.utils.entity_link import get_decoded_column
from metadata.utils.logger import test_suite_logger
from metadata.utils.test_suite import get_test_case_param_value
@ -34,14 +38,73 @@ from metadata.utils.test_suite import get_test_case_param_value
logger = test_suite_logger()
def _return_test_case(
    min_value_length_value_res, max_value_length_value_res, execution_date, test_case
):
    """Assemble the TestCaseResult for columnValueLengthsToBeBetween.

    Reads the expected minLength/maxLength bounds from the test case
    parameters and compares them against the observed value lengths.

    :param min_value_length_value_res: shortest value length observed
    :param max_value_length_value_res: longest value length observed
    :param execution_date: moment the test ran
    :param test_case: test case definition with the length bounds
    :return: TestCaseResult with status and results
    """
    expected_min = get_test_case_param_value(
        test_case.parameterValues,  # type: ignore
        "minLength",
        float,
        default=float("-inf"),
    )
    expected_max = get_test_case_param_value(
        test_case.parameterValues,  # type: ignore
        "maxLength",
        float,
        default=float("inf"),
    )
    status, result = test_case_status_result(
        expected_min,
        expected_max,
        min_value_length_value_res,
        max_value_length_value_res,
    )
    observed_values = [
        TestResultValue(name="minValueLength", value=str(min_value_length_value_res)),
        TestResultValue(name="maxValueLength", value=str(max_value_length_value_res)),
    ]
    return TestCaseResult(
        timestamp=execution_date,
        testCaseStatus=status,
        result=result,
        testResultValue=observed_values,
    )
def test_case_status_result(
min_bound, max_bound, min_value_length_value_res, max_value_length_value_res
):
return (
TestCaseStatus.Success
if min_bound <= min_value_length_value_res
and max_bound >= max_value_length_value_res
else TestCaseStatus.Failed,
f"Found minLength={min_value_length_value_res}, maxLength={max_value_length_value_res} vs."
+ f" the expected minLength={min_bound}, maxLength={max_bound}.",
)
@singledispatch
def column_value_length_to_be_between(
    runner,
    test_case: TestCase,
    execution_date: Union[datetime, float],
):
    """Dispatch entry point for the columnValueLengthsToBeBetween test.

    Concrete implementations are registered per runner type (QueryRunner for
    SQL sources, DataFrame for datalake sources).

    :param runner: object the metric is computed against; drives dispatch
    :param test_case: test case definition holding the parameter values
    :param execution_date: moment the test ran
    :raises NotImplementedError: always, for unregistered runner types
    """
    # Stale duplicate `execution_date: datetime` parameter removed — Python
    # rejects duplicate argument names at compile time.
    raise NotImplementedError
@column_value_length_to_be_between.register
def _(
runner: QueryRunner,
test_case: TestCase,
execution_date: Union[datetime, float],
) -> TestCaseResult:
"""
Validate Column Values metric
:param test_case: ColumnValueLengthsToBeBetween
:param test_case: columnValueLengthsToBeBetween
:param col_profile: should contain minLength & maxLength metrics
:param execution_date: Datetime when the tests ran
:return: TestCaseResult with status and results
@ -105,42 +168,34 @@ def column_value_length_to_be_between(
TestResultValue(name="maxValueLength", value=None),
],
)
min_bound = get_test_case_param_value(
test_case.parameterValues, # type: ignore
"minLength",
float,
default=float("-inf"),
return _return_test_case(
min_value_length_value_res,
max_value_length_value_res,
execution_date,
test_case,
)
max_bound = get_test_case_param_value(
test_case.parameterValues, # type: ignore
"maxLength",
float,
default=float("inf"),
)
status = (
TestCaseStatus.Success
if min_bound <= min_value_length_value_res
and max_bound >= max_value_length_value_res
else TestCaseStatus.Failed
)
result = (
f"Found minLength={min_value_length_value_res}, maxLength={max_value_length_value_res} vs."
+ f" the expected minLength={min_bound}, maxLength={max_bound}."
)
@column_value_length_to_be_between.register
def _(
    runner: DataFrame,
    test_case: TestCase,
    execution_date: Union[datetime, float],
):
    """
    Validate Column Values metric
    :param test_case: columnValueLengthsToBeBetween
    :param col_profile: should contain minLength & maxLength metrics
    :param execution_date: Datetime when the tests ran
    :return: TestCaseResult with status and results
    """
    # Resolve the column referenced by the test case entity link.
    column_obj = fetch_column_obj(test_case.entityLink.__root__, runner)
    # Removed interleaved dead code (a premature `return TestCaseResult(...)`
    # referencing the then-undefined `status`/`result`) left over from a
    # bad merge; compute the observed length extremes first, then build the
    # result via the shared helper.
    min_value_length_value_res = Metrics.MIN_LENGTH.value(column_obj).dl_fn(runner)
    max_value_length_value_res = Metrics.MAX_LENGTH.value(column_obj).dl_fn(runner)
    return _return_test_case(
        min_value_length_value_res,
        max_value_length_value_res,
        execution_date,
        test_case,
    )

View File

@ -16,7 +16,10 @@ ColumnValuesMissingCount validation implementation
import traceback
from ast import literal_eval
from datetime import datetime
from functools import singledispatch
from typing import Union
from pandas import DataFrame
from sqlalchemy import inspect
from metadata.generated.schema.tests.basic import (
@ -28,16 +31,36 @@ from metadata.generated.schema.tests.testCase import TestCase
from metadata.orm_profiler.metrics.core import add_props
from metadata.orm_profiler.metrics.registry import Metrics
from metadata.orm_profiler.profiler.runner import QueryRunner
from metadata.utils.column_base_model import fetch_column_obj
from metadata.utils.entity_link import get_decoded_column
from metadata.utils.logger import profiler_logger
logger = profiler_logger()
def test_case_status_result(null_count_value_res, missing_count_values):
return (
TestCaseStatus.Success
if null_count_value_res == missing_count_values
else TestCaseStatus.Failed,
f"Found missingCount={null_count_value_res}. It should be {missing_count_values}.",
)
@singledispatch
def column_values_missing_count_to_be_equal(
    runner,
    test_case: TestCase,
    execution_date: Union[datetime, float],
):
    """Dispatch entry point for the columnValuesMissingCount test.

    Concrete implementations are registered per runner type (QueryRunner for
    SQL sources, DataFrame for datalake sources).

    :param runner: object the metric is computed against; drives dispatch
    :param test_case: test case definition holding the parameter values
    :param execution_date: moment the test ran
    :raises NotImplementedError: always, for unregistered runner types
    """
    # Stale duplicate `execution_date: datetime` parameter removed — Python
    # rejects duplicate argument names at compile time.
    raise NotImplementedError
@column_values_missing_count_to_be_equal.register
def _(
runner: QueryRunner,
test_case: TestCase,
execution_date: Union[datetime, float],
) -> TestCaseResult:
"""
Validate Column Values metric
@ -117,13 +140,60 @@ def column_values_missing_count_to_be_equal(
result=msg,
testResultValue=[TestResultValue(name="nullCount", value=None)],
)
status = (
TestCaseStatus.Success
if null_count_value_res == missing_count_values
else TestCaseStatus.Failed
)
result = f"Found missingCount={null_count_value_res}. It should be {missing_count_values}."
status, result = test_case_status_result(null_count_value_res, missing_count_values)
return TestCaseResult(
timestamp=execution_date,
testCaseStatus=status,
result=result,
testResultValue=[
TestResultValue(name="missingCount", value=str(null_count_value_res))
],
)
# pylint: disable=abstract-class-instantiated,no-member
@column_values_missing_count_to_be_equal.register
def _(
runner: DataFrame,
test_case: TestCase,
execution_date: Union[datetime, float],
):
"""
Validate Column Values metric
:param test_case: ColumnValuesMissingCount. Just used to trigger singledispatch
:param col_profile: should contain count and distinct count metrics
:param execution_date: Datetime when the tests ran
:param session: SQLAlchemy Session, for tests that need to compute new metrics
:param table: SQLAlchemy Table, for tests that need to compute new metrics
:param profile_sample: % of the data to run the profiler on
:return: TestCaseResult with status and results
"""
column_obj = fetch_column_obj(test_case.entityLink.__root__, runner)
null_count_value_res = Metrics.NULL_COUNT.value(column_obj).dl_fn(runner)
missing_values = next(
(
literal_eval(param.value)
for param in test_case.parameterValues
if param.name == "missingValueMatch"
),
None,
)
if missing_values:
set_count = add_props(values=missing_values)(Metrics.COUNT_IN_SET.value)
null_count_value_res += set_count(column_obj).dl_fn(runner)
missing_count_values = next(
(
literal_eval(param.value)
for param in test_case.parameterValues
if param.name == "missingCountValue"
)
)
status, result = test_case_status_result(null_count_value_res, missing_count_values)
return TestCaseResult(
timestamp=execution_date,

View File

@ -16,7 +16,10 @@ ColumnValuesToBeNotNull validation implementation
import traceback
from ast import literal_eval
from datetime import datetime
from functools import singledispatch
from typing import Union
from pandas import DataFrame
from sqlalchemy import inspect
from metadata.generated.schema.tests.basic import (
@ -28,16 +31,35 @@ from metadata.generated.schema.tests.testCase import TestCase
from metadata.orm_profiler.metrics.core import add_props
from metadata.orm_profiler.metrics.registry import Metrics
from metadata.orm_profiler.profiler.runner import QueryRunner
from metadata.utils.column_base_model import fetch_column_obj
from metadata.utils.entity_link import get_decoded_column
from metadata.utils.logger import test_suite_logger
logger = test_suite_logger()
def test_case_status_result(set_count_res):
return (
TestCaseStatus.Success if set_count_res == 0 else TestCaseStatus.Failed,
f"Found countInSet={set_count_res}. It should be 0.",
)
# pylint: disable=abstract-class-instantiated
@singledispatch
def column_values_not_in_set(
    runner,
    test_case: TestCase,
    execution_date: Union[datetime, float],
):
    """Dispatch entry point for the columnValuesToBeNotInSet test.

    Concrete implementations are registered per runner type (QueryRunner for
    SQL sources, DataFrame for datalake sources).

    :param runner: object the metric is computed against; drives dispatch
    :param test_case: test case definition holding the parameter values
    :param execution_date: moment the test ran
    :raises NotImplementedError: always, for unregistered runner types
    """
    # Stale duplicate `execution_date: datetime` parameter removed — Python
    # rejects duplicate argument names at compile time.
    raise NotImplementedError
@column_values_not_in_set.register
def _(
runner: QueryRunner,
test_case: TestCase,
execution_date: Union[datetime, float],
) -> TestCaseResult:
"""
Validate Column Values metric
@ -85,8 +107,7 @@ def column_values_not_in_set(
testResultValue=[TestResultValue(name="countForbiddenValues", value=None)],
)
status = TestCaseStatus.Success if set_count_res == 0 else TestCaseStatus.Failed
result = f"Found countInSet={set_count_res}. It should be 0."
status, result = test_case_status_result(set_count_res)
return TestCaseResult(
timestamp=execution_date,
@ -96,3 +117,30 @@ def column_values_not_in_set(
TestResultValue(name="countForbiddenValues", value=str(set_count_res))
],
)
@column_values_not_in_set.register
def _(
    runner: DataFrame,
    test_case: TestCase,
    execution_date: Union[datetime, float],
):
    """Datalake (pandas) implementation of columnValuesToBeNotInSet.

    :param runner: dataframe holding the sampled data
    :param test_case: test case definition with the forbidden values
    :param execution_date: moment the test ran
    :return: TestCaseResult with status and results
    """
    target_column = fetch_column_obj(test_case.entityLink.__root__, runner)
    # Literal-eval the serialized list of forbidden values from the params.
    forbidden_values = next(
        literal_eval(param.value)
        for param in test_case.parameterValues
        if param.name == "forbiddenValues"
    )
    count_metric = add_props(values=forbidden_values)(Metrics.COUNT_IN_SET.value)
    forbidden_count = count_metric(target_column).dl_fn(  # pylint: disable=no-member
        runner
    )
    status, result = test_case_status_result(forbidden_count)
    return TestCaseResult(
        timestamp=execution_date,
        testCaseStatus=status,
        result=result,
        testResultValue=[
            TestResultValue(name="countForbiddenValues", value=str(forbidden_count))
        ],
    )

View File

@ -16,7 +16,10 @@ ColumnValuesToBeBetween validation implementation
import traceback
from datetime import datetime
from functools import singledispatch
from typing import Union
from pandas import DataFrame
from sqlalchemy import inspect
from metadata.generated.schema.tests.basic import (
@ -27,6 +30,7 @@ from metadata.generated.schema.tests.basic import (
from metadata.generated.schema.tests.testCase import TestCase
from metadata.orm_profiler.metrics.registry import Metrics
from metadata.orm_profiler.profiler.runner import QueryRunner
from metadata.utils.column_base_model import fetch_column_obj
from metadata.utils.entity_link import get_decoded_column
from metadata.utils.logger import test_suite_logger
from metadata.utils.test_suite import get_test_case_param_value
@ -34,14 +38,34 @@ from metadata.utils.test_suite import get_test_case_param_value
logger = test_suite_logger()
def test_case_status_result(min_bound, max_bound, sum_value_res):
return (
TestCaseStatus.Success
if min_bound <= sum_value_res <= max_bound
else TestCaseStatus.Failed,
f"Found sum={sum_value_res} vs."
+ f" the expected min={min_bound}, max={max_bound}.",
)
@singledispatch
def column_values_sum_to_be_between(
    runner,
    test_case: TestCase,
    execution_date: Union[datetime, float],
):
    """Dispatch entry point for the columnValuesSumToBeBetween test.

    Concrete implementations are registered per runner type (QueryRunner for
    SQL sources, DataFrame for datalake sources).

    :param runner: object the metric is computed against; drives dispatch
    :param test_case: test case definition holding the parameter values
    :param execution_date: moment the test ran
    :raises NotImplementedError: always, for unregistered runner types
    """
    # Stale duplicate `execution_date: datetime` parameter removed — Python
    # rejects duplicate argument names at compile time.
    raise NotImplementedError
@column_values_sum_to_be_between.register
def _(
runner: QueryRunner,
test_case: TestCase,
execution_date: Union[datetime, float],
) -> TestCaseResult:
"""
Validate Column Values metric
:param test_case: ColumnValuesToBeBetween
:param test_case: columnValuesSumToBeBetween
:param col_profile: should contain MIN & MAX metrics
:param execution_date: Datetime when the tests ran
:return: TestCaseResult with status and results
@ -82,7 +106,7 @@ def column_values_sum_to_be_between(
min_bound = get_test_case_param_value(
test_case.parameterValues, # type: ignore
"minValueForMeanInCol",
"minValueForColSum",
float,
default=float("-inf"),
)
@ -94,15 +118,7 @@ def column_values_sum_to_be_between(
default=float("inf"),
)
status = (
TestCaseStatus.Success
if min_bound <= sum_value_res <= max_bound
else TestCaseStatus.Failed
)
result = (
f"Found sum={sum_value_res} vs."
+ f" the expected min={min_bound}, max={max_bound}."
)
status, result = test_case_status_result(min_bound, max_bound, sum_value_res)
return TestCaseResult(
timestamp=execution_date,
@ -110,3 +126,42 @@ def column_values_sum_to_be_between(
result=result,
testResultValue=[TestResultValue(name="sum", value=str(sum_value_res))],
)
@column_values_sum_to_be_between.register
def _(
    runner: DataFrame,
    test_case: TestCase,
    execution_date: Union[datetime, float],
):
    """Datalake (pandas) implementation of columnValuesSumToBeBetween.

    :param runner: dataframe holding the sampled data
    :param test_case: test case definition with the sum bounds
    :param execution_date: moment the test ran
    :return: TestCaseResult with status and results
    """
    target_column = fetch_column_obj(test_case.entityLink.__root__, runner)
    lower_bound = get_test_case_param_value(
        test_case.parameterValues,  # type: ignore
        "minValueForColSum",
        float,
        default=float("-inf"),
    )
    upper_bound = get_test_case_param_value(
        test_case.parameterValues,  # type: ignore
        "maxValueForColSum",
        float,
        default=float("inf"),
    )
    observed_sum = Metrics.SUM.value(target_column).dl_fn(runner)
    status, result = test_case_status_result(lower_bound, upper_bound, observed_sum)
    return TestCaseResult(
        timestamp=execution_date,
        testCaseStatus=status,
        result=result,
        testResultValue=[TestResultValue(name="sum", value=str(observed_sum))],
    )

View File

@ -16,7 +16,10 @@ ColumnValuesToBeBetween validation implementation
import traceback
from datetime import datetime
from functools import singledispatch
from typing import Union
from pandas import DataFrame
from sqlalchemy import inspect
from metadata.generated.schema.tests.basic import (
@ -27,6 +30,7 @@ from metadata.generated.schema.tests.basic import (
from metadata.generated.schema.tests.testCase import TestCase
from metadata.orm_profiler.metrics.registry import Metrics
from metadata.orm_profiler.profiler.runner import QueryRunner
from metadata.utils.column_base_model import fetch_column_obj
from metadata.utils.entity_link import get_decoded_column
from metadata.utils.logger import test_suite_logger
from metadata.utils.test_suite import get_test_case_param_value
@ -34,10 +38,30 @@ from metadata.utils.test_suite import get_test_case_param_value
logger = test_suite_logger()
def test_case_status_result(min_value_res, max_value_res, min_bound, max_bound):
return (
TestCaseStatus.Success
if min_value_res >= min_bound and max_value_res <= max_bound
else TestCaseStatus.Failed,
f"Found min={min_value_res}, max={max_value_res} vs."
+ f" the expected min={min_bound}, max={max_bound}.",
)
@singledispatch
def column_values_to_be_between(
    runner,
    test_case: TestCase,
    execution_date: Union[datetime, float],
):
    """Dispatch entry point for the columnValuesToBeBetween test.

    Concrete implementations are registered per runner type (QueryRunner for
    SQL sources, DataFrame for datalake sources).

    :param runner: object the metric is computed against; drives dispatch
    :param test_case: test case definition holding the parameter values
    :param execution_date: moment the test ran
    :raises NotImplementedError: always, for unregistered runner types
    """
    # Stale duplicate `execution_date: datetime` parameter removed — Python
    # rejects duplicate argument names at compile time.
    raise NotImplementedError
@column_values_to_be_between.register
def _(
runner: QueryRunner,
test_case: TestCase,
execution_date: Union[datetime, float],
) -> TestCaseResult:
"""
Validate Column Values metric
@ -101,14 +125,55 @@ def column_values_to_be_between(
default=float("inf"),
)
status = (
TestCaseStatus.Success
if min_value_res >= min_bound and max_value_res <= max_bound
else TestCaseStatus.Failed
)
result = (
f"Found min={min_value_res}, max={max_value_res} vs."
+ f" the expected min={min_bound}, max={max_bound}."
status, result = test_case_status_result(
min_value_res, max_value_res, min_bound, max_bound
)
return TestCaseResult(
timestamp=execution_date,
testCaseStatus=status,
result=result,
testResultValue=[
TestResultValue(name="min", value=str(min_value_res)),
TestResultValue(name="max", value=str(max_value_res)),
],
)
@column_values_to_be_between.register
def _(
runner: DataFrame,
test_case: TestCase,
execution_date: Union[datetime, float],
):
"""
Validate Column Values metric
:param test_case: ColumnValuesToBeBetween
:param col_profile: should contain MIN & MAX metrics
:param execution_date: Datetime when the tests ran
:return: TestCaseResult with status and results
"""
column_obj = fetch_column_obj(test_case.entityLink.__root__, runner)
min_bound = get_test_case_param_value(
test_case.parameterValues, # type: ignore
"minValue",
float,
default=float("-inf"),
)
max_bound = get_test_case_param_value(
test_case.parameterValues, # type: ignore
"maxValue",
float,
default=float("inf"),
)
min_value_res = Metrics.MIN.value(column_obj).dl_fn(runner)
max_value_res = Metrics.MAX.value(column_obj).dl_fn(runner)
status, result = test_case_status_result(
min_value_res, max_value_res, min_bound, max_bound
)
return TestCaseResult(

View File

@ -16,7 +16,10 @@ ColumnValuesToBeNotNull validation implementation
import traceback
from datetime import datetime
from functools import singledispatch
from typing import Union
from pandas import DataFrame
from sqlalchemy import inspect
from metadata.generated.schema.tests.basic import (
@ -27,16 +30,27 @@ from metadata.generated.schema.tests.basic import (
from metadata.generated.schema.tests.testCase import TestCase
from metadata.orm_profiler.metrics.registry import Metrics
from metadata.orm_profiler.profiler.runner import QueryRunner
from metadata.utils.column_base_model import fetch_column_obj
from metadata.utils.entity_link import get_decoded_column
from metadata.utils.logger import test_suite_logger
logger = test_suite_logger()
@singledispatch
def column_values_to_be_not_null(
    runner,
    test_case: TestCase,
    execution_date: Union[datetime, float],
):
    """Dispatch entry point for the columnValuesToBeNotNull test.

    Concrete implementations are registered per runner type (QueryRunner for
    SQL sources, DataFrame for datalake sources).

    :param runner: object the metric is computed against; drives dispatch
    :param test_case: test case definition holding the parameter values
    :param execution_date: moment the test ran
    :raises NotImplementedError: always, for unregistered runner types
    """
    # Stale duplicate `execution_date: datetime` parameter removed — Python
    # rejects duplicate argument names at compile time.
    raise NotImplementedError
@column_values_to_be_not_null.register
def _(
runner: QueryRunner,
test_case: TestCase,
execution_date: Union[datetime, float],
) -> TestCaseResult:
"""
Validate Column Values metric
@ -75,10 +89,10 @@ def column_values_to_be_not_null(
testResultValue=[TestResultValue(name="nullCount", value=None)],
)
status = (
TestCaseStatus.Success if null_count_value_res == 0 else TestCaseStatus.Failed
status, result = (
TestCaseStatus.Success if null_count_value_res == 0 else TestCaseStatus.Failed,
f"Found nullCount={null_count_value_res}. It should be 0.",
)
result = f"Found nullCount={null_count_value_res}. It should be 0."
return TestCaseResult(
timestamp=execution_date,
@ -88,3 +102,26 @@ def column_values_to_be_not_null(
TestResultValue(name="nullCount", value=str(null_count_value_res))
],
)
@column_values_to_be_not_null.register
def _(
    runner: DataFrame,
    test_case: TestCase,
    execution_date: Union[datetime, float],
):
    """Datalake (pandas) implementation of columnValuesToBeNotNull.

    :param runner: dataframe holding the sampled data
    :param test_case: test case definition (no extra parameters needed)
    :param execution_date: moment the test ran
    :return: TestCaseResult with status and results
    """
    target_column = fetch_column_obj(test_case.entityLink.__root__, runner)
    null_count = Metrics.NULL_COUNT.value(target_column).dl_fn(runner)
    status = TestCaseStatus.Success if null_count == 0 else TestCaseStatus.Failed
    result = f"Found nullCount={null_count}. It should be 0."
    return TestCaseResult(
        timestamp=execution_date,
        testCaseStatus=status,
        result=result,
        testResultValue=[TestResultValue(name="nullCount", value=str(null_count))],
    )

View File

@ -12,11 +12,13 @@
"""
ColumnValuesToBeUnique validation implementation
"""
# pylint: disable=duplicate-code,protected-access
import traceback
from datetime import datetime
from functools import singledispatch
from typing import Union
# pylint: disable=duplicate-code,protected-access
from pandas import DataFrame
from sqlalchemy import inspect
from sqlalchemy.orm.util import AliasedClass
@ -28,16 +30,37 @@ from metadata.generated.schema.tests.basic import (
from metadata.generated.schema.tests.testCase import TestCase
from metadata.orm_profiler.metrics.registry import Metrics
from metadata.orm_profiler.profiler.runner import QueryRunner
from metadata.utils.column_base_model import fetch_column_obj
from metadata.utils.entity_link import get_decoded_column
from metadata.utils.logger import test_suite_logger
logger = test_suite_logger()
def test_case_status_result(value_count_value_res, unique_count_value_res):
return (
TestCaseStatus.Success
if value_count_value_res == unique_count_value_res
else TestCaseStatus.Failed,
f"Found valuesCount={value_count_value_res} vs. uniqueCount={unique_count_value_res}."
+ " Both counts should be equal for column values to be unique.",
)
@singledispatch
def column_values_to_be_unique(
    runner,
    test_case: TestCase,
    execution_date: Union[datetime, float],
):
    """Dispatch entry point for the columnValuesToBeUnique test.

    Concrete implementations are registered per runner type (QueryRunner for
    SQL sources, DataFrame for datalake sources).

    :param runner: object the metric is computed against; drives dispatch
    :param test_case: test case definition holding the parameter values
    :param execution_date: moment the test ran
    :raises NotImplementedError: always, for unregistered runner types
    """
    # Stale duplicate `execution_date: datetime` parameter removed — Python
    # rejects duplicate argument names at compile time.
    raise NotImplementedError
@column_values_to_be_unique.register
def _(
runner: QueryRunner,
test_case: TestCase,
execution_date: Union[datetime, float],
) -> TestCaseResult:
"""
Validate Column Values metric
@ -88,16 +111,34 @@ def column_values_to_be_unique(
],
)
status = (
TestCaseStatus.Success
if value_count_value_res == unique_count_value_res
else TestCaseStatus.Failed
status, result = test_case_status_result(
value_count_value_res, unique_count_value_res
)
return TestCaseResult(
timestamp=execution_date,
testCaseStatus=status,
result=result,
testResultValue=[
TestResultValue(name="valueCount", value=str(value_count_value_res)),
TestResultValue(name="uniqueCount", value=str(unique_count_value_res)),
],
)
@column_values_to_be_unique.register
def _(
runner: DataFrame,
test_case: TestCase,
execution_date: Union[datetime, float],
):
column_obj = fetch_column_obj(test_case.entityLink.__root__, runner)
value_count_value_res = Metrics.COUNT.value(column_obj).dl_fn(runner)
unique_count_value_res = Metrics.UNIQUE_COUNT.value(column_obj).dl_query(runner)
status, result = test_case_status_result(
value_count_value_res, unique_count_value_res
)
result = (
f"Found valuesCount={value_count_value_res} vs. uniqueCount={unique_count_value_res}."
+ " Both counts should be equal for column values to be unique."
)
return TestCaseResult(
timestamp=execution_date,
testCaseStatus=status,

View File

@ -15,7 +15,7 @@ ColumnValuesToBeNotNull validation implementation
# pylint: disable=duplicate-code
import traceback
from datetime import datetime
from typing import Optional
from typing import Optional, Union
from sqlalchemy import inspect
from sqlalchemy.exc import CompileError
@ -62,9 +62,9 @@ def _get_match_count(like_count, regex_count, runner, col) -> Optional[int]:
def column_values_to_match_regex(
test_case: TestCase,
execution_date: datetime,
runner: QueryRunner,
test_case: TestCase,
execution_date: Union[datetime, float],
) -> TestCaseResult:
"""
Validate Column Values metric

View File

@ -15,7 +15,8 @@ ColumnValuesToBeNotNull validation implementation
# pylint: disable=duplicate-code
import traceback
from datetime import datetime
from typing import Optional
from functools import singledispatch
from typing import Optional, Union
from sqlalchemy import inspect
from sqlalchemy.exc import CompileError
@ -63,10 +64,11 @@ def _get_not_match_count(not_like_count, not_regex_count, runner, col) -> Option
return not_like_count_dict.get(Metrics.NOT_LIKE_COUNT.name)
@singledispatch
def column_values_to_not_match_regex(
test_case: TestCase,
execution_date: datetime,
runner: QueryRunner,
test_case: TestCase,
execution_date: Union[datetime, float],
) -> TestCaseResult:
"""
Validate Column Values metric

View File

@ -105,7 +105,7 @@ validation_enum_registry.add("tableColumnToMatchSet")(table_column_to_match_set)
validation_enum_registry.add("tableColumnNameToExist")(table_column_name_to_exist)
validation_enum_registry.add("tableCustomSQLQuery")(table_custom_sql_query)
# # Column Tests
# # # Column Tests
validation_enum_registry.add("columnValuesToBeBetween")(column_values_to_be_between)
validation_enum_registry.add("columnValuesToBeUnique")(column_values_to_be_unique)
validation_enum_registry.add("columnValuesToBeNotNull")(column_values_to_be_not_null)
@ -131,7 +131,7 @@ validation_enum_registry.add("columnValueStdDevToBeBetween")(
column_value_stddev_to_be_between
)
# # Column Session Tests
# # # Column Session Tests
validation_enum_registry.add("columnValuesToBeNotInSet")(column_values_not_in_set)
validation_enum_registry.add("columnValuesToBeInSet")(column_values_in_set)
validation_enum_registry.add("columnValuesToMatchRegex")(column_values_to_match_regex)

View File

@ -16,7 +16,10 @@ TableColumnCountToBeBetween validation implementation
import traceback
from datetime import datetime
from functools import singledispatch
from typing import Union
from pandas import DataFrame
from sqlalchemy import inspect
from metadata.generated.schema.tests.basic import (
@ -32,10 +35,20 @@ from metadata.utils.test_suite import get_test_case_param_value
logger = test_suite_logger()
@singledispatch
def table_column_count_to_be_between(
    runner,
    test_case: TestCase,
    execution_date: Union[datetime, float],
):
    """Dispatch entry point for the tableColumnCountToBeBetween test.

    Concrete implementations are registered per runner type (QueryRunner for
    SQL sources, DataFrame for datalake sources).

    :param runner: object the metric is computed against; drives dispatch
    :param test_case: test case definition holding the parameter values
    :param execution_date: moment the test ran
    :raises NotImplementedError: always, for unregistered runner types
    """
    # Stale duplicate `execution_date: datetime` parameter removed — Python
    # rejects duplicate argument names at compile time.
    raise NotImplementedError
@table_column_count_to_be_between.register
def _(
runner: QueryRunner,
test_case: TestCase,
execution_date: Union[datetime, float],
) -> TestCaseResult:
"""
Validate row count metric
@ -95,3 +108,49 @@ def table_column_count_to_be_between(
result=result,
testResultValue=[TestResultValue(name="columnCount", value=column_count)],
)
@table_column_count_to_be_between.register
def _(
    runner: DataFrame,
    test_case: TestCase,
    execution_date: Union[datetime, float],
) -> TestCaseResult:
    """
    Validate row count metric

    Args:
        runner: dataframe holding the sampled data
        test_case: test case type to be ran. Used to dispatch
        execution_date: datetime of the test execution
    Returns:
        TestCaseResult with status and results
    """
    column_count = len(runner.columns)
    min_ = get_test_case_param_value(
        test_case.parameterValues,  # type: ignore
        "minColValue",
        int,
        default=float("-inf"),
    )
    max_ = get_test_case_param_value(
        test_case.parameterValues,  # type: ignore
        "maxColValue",
        int,
        default=float("inf"),
    )
    status = (
        TestCaseStatus.Success
        if min_ <= column_count <= max_
        else TestCaseStatus.Failed
    )
    result = f"Found {column_count} column vs. the expected range [{min_}, {max_}]."
    # str() for consistency with the sibling validators — TestResultValue
    # carries string values (e.g. tableColumnCountToEqual uses str(...)).
    return TestCaseResult(
        timestamp=execution_date,
        testCaseStatus=status,
        result=result,
        testResultValue=[TestResultValue(name="columnCount", value=str(column_count))],
    )

View File

@ -16,7 +16,10 @@ TableColumnCountToEqual validation implementation
import traceback
from datetime import datetime
from functools import singledispatch
from typing import Union
from pandas import DataFrame
from sqlalchemy import inspect
from metadata.generated.schema.tests.basic import (
@ -31,10 +34,20 @@ from metadata.utils.logger import test_suite_logger
logger = test_suite_logger()
@singledispatch
def table_column_count_to_equal(
    runner,
    test_case: TestCase,
    execution_date: Union[datetime, float],
):
    """Dispatch entry point for the tableColumnCountToEqual test.

    Concrete implementations are registered per runner type (QueryRunner for
    SQL sources, DataFrame for datalake sources).

    :param runner: object the metric is computed against; drives dispatch
    :param test_case: test case definition holding the parameter values
    :param execution_date: moment the test ran
    :raises NotImplementedError: always, for unregistered runner types
    """
    # Stale duplicate `execution_date: datetime` parameter removed — Python
    # rejects duplicate argument names at compile time.
    raise NotImplementedError
@table_column_count_to_equal.register
def _(
runner: QueryRunner,
test_case: TestCase,
execution_date: Union[datetime, float],
) -> TestCaseResult:
"""
Validate row count metric
@ -75,3 +88,27 @@ def table_column_count_to_equal(
result=result,
testResultValue=[TestResultValue(name="columnCount", value=str(column_count))],
)
@table_column_count_to_equal.register
def _(
    runner: DataFrame,
    test_case: TestCase,
    execution_date: Union[datetime, float],
):
    """Datalake (pandas) implementation of tableColumnCountToEqual.

    :param runner: dataframe holding the sampled data
    :param test_case: test case definition with the expected columnCount
    :param execution_date: moment the test ran
    :return: TestCaseResult with status and results
    """
    observed = len(runner.columns)
    expected = next(
        int(param_value.value)
        for param_value in test_case.parameterValues
        if param_value.name == "columnCount"
    )
    if observed == expected:
        status = TestCaseStatus.Success
    else:
        status = TestCaseStatus.Failed
    result = f"Found {observed} columns vs. the expected {expected}"
    return TestCaseResult(
        timestamp=execution_date,
        testCaseStatus=status,
        result=result,
        testResultValue=[TestResultValue(name="columnCount", value=str(observed))],
    )

View File

@ -17,8 +17,10 @@ TableColumnCountToBeBetween validation implementation
import reprlib
import traceback
from datetime import datetime
from typing import List
from functools import singledispatch
from typing import List, Union
from pandas import DataFrame
from sqlalchemy import inspect
from metadata.generated.schema.tests.basic import (
@ -45,10 +47,20 @@ def format_column_list(status: TestCaseStatus, cols: List):
return cols
@singledispatch
def table_column_name_to_exist(
    runner,
    test_case: TestCase,
    execution_date: Union[datetime, float],
):
    """Dispatch entry point for the tableColumnNameToExist test.

    Concrete implementations are registered per runner type (QueryRunner for
    SQL sources, DataFrame for datalake sources).

    :param runner: object the metric is computed against; drives dispatch
    :param test_case: test case definition holding the parameter values
    :param execution_date: moment the test ran
    :raises NotImplementedError: always, for unregistered runner types
    """
    # Stale duplicate `execution_date: datetime` parameter removed — Python
    # rejects duplicate argument names at compile time.
    raise NotImplementedError
@table_column_name_to_exist.register
def _(
runner: QueryRunner,
test_case: TestCase,
execution_date: Union[datetime, float],
) -> TestCaseResult:
"""
Validate row count metric
@ -107,3 +119,42 @@ def table_column_name_to_exist(
result=result,
testResultValue=[TestResultValue(name="columnNameExits", value=str(True))],
)
@table_column_name_to_exist.register
def _(
    runner: DataFrame,
    test_case: TestCase,
    execution_date: Union[datetime, float],
):
    """Datalake implementation: validate the expected column name exists
    in the dataframe.

    Args:
        runner: dataframe holding the table sample
        test_case: test definition carrying the ``columnName`` parameter
        execution_date: datetime of the test execution
    Returns:
        TestCaseResult with status and results
    """
    column_names = list(runner.columns)
    column_name = next(
        param_value.value
        for param_value in test_case.parameterValues
        if param_value.name == "columnName"
    )
    column_exists = column_name in column_names
    status = TestCaseStatus.Success if column_exists else TestCaseStatus.Failed
    result = (
        f"{column_name} column expected vs {format_column_list(status, column_names)}"
    )
    return TestCaseResult(
        timestamp=execution_date,
        testCaseStatus=status,
        result=result,
        # Fix: report the actual existence check instead of a hard-coded
        # str(True), which previously claimed success even on failure.
        # The (misspelled) "columnNameExits" key is kept for compatibility.
        testResultValue=[
            TestResultValue(name="columnNameExits", value=str(column_exists))
        ],
    )

View File

@ -18,8 +18,10 @@ import collections
import reprlib
import traceback
from datetime import datetime
from typing import List
from functools import singledispatch
from typing import List, Union
from pandas import DataFrame
from sqlalchemy import inspect
from metadata.generated.schema.tests.basic import (
@ -46,10 +48,20 @@ def format_column_list(status: TestCaseStatus, cols: List):
return cols
@singledispatch
def table_column_to_match_set(
test_case: TestCase,
execution_date: datetime,
runner: QueryRunner,
test_case: TestCase,
execution_date: Union[datetime, float],
):
raise NotImplementedError
@table_column_to_match_set.register
def _(
runner: QueryRunner,
test_case: TestCase,
execution_date: Union[datetime, float],
) -> TestCaseResult:
"""
Validate row count metric
@ -117,3 +129,57 @@ def table_column_to_match_set(
)
],
)
@table_column_to_match_set.register
def _(
    runner: DataFrame,
    test_case: TestCase,
    execution_date: Union[datetime, float],
):
    """Datalake implementation: validate the dataframe columns match the
    expected set (optionally in order).

    Args:
        runner: dataframe holding the table sample
        test_case: test definition carrying ``columnNames`` (comma-separated)
            and the optional ``ordered`` flag
        execution_date: datetime of the test execution
    Returns:
        TestCaseResult with status and results
    """
    column_names = list(runner.columns)
    column_name = next(
        param_value.value
        for param_value in test_case.parameterValues
        if param_value.name == "columnNames"
    )
    ordered_param = next(
        (
            param_value.value
            for param_value in test_case.parameterValues
            if param_value.name == "ordered"
        ),
        None,
    )
    # Fix: parameter values arrive as strings, and bool("false") is truthy.
    # Compare against the literal "true" so ordered=False is honored.
    ordered = (
        str(ordered_param).lower() == "true" if ordered_param is not None else None
    )
    expected_column_names = [item.strip() for item in column_name.split(",")]
    # pylint: disable=unnecessary-lambda-assignment
    compare = lambda x, y: collections.Counter(x) == collections.Counter(y)
    if ordered:
        # exact order required
        _status = expected_column_names == column_names
    else:
        # order-insensitive multiset comparison
        _status = compare(expected_column_names, column_names)
    status = TestCaseStatus.Success if _status else TestCaseStatus.Failed
    result = (
        f"Found {format_column_list(status, column_names)} column vs. "
        f"the expected column names {format_column_list(status, expected_column_names)}."
    )
    return TestCaseResult(
        timestamp=execution_date,
        testCaseStatus=status,
        result=result,
        testResultValue=[TestResultValue(name="columnNames", value=str(column_names))],
    )

View File

@ -16,6 +16,7 @@ TableColumnCountToBeBetween validation implementation
import traceback
from datetime import datetime
from typing import Union
from sqlalchemy import text
@ -32,9 +33,9 @@ logger = test_suite_logger()
def table_custom_sql_query(
test_case: TestCase,
execution_date: datetime,
runner: QueryRunner,
test_case: TestCase,
execution_date: Union[datetime, float],
) -> TestCaseResult:
"""
Validate custom SQL tests. Tests will fail if number of rows

View File

@ -16,6 +16,10 @@ TableRowCountToBeBetween validation implementation
import traceback
from datetime import datetime
from functools import singledispatch
from typing import Union
from pandas import DataFrame
from metadata.generated.schema.tests.basic import (
TestCaseResult,
@ -30,10 +34,50 @@ from metadata.utils.logger import test_suite_logger
logger = test_suite_logger()
def _return_test_case(
    row_count_value,
    execution_date,
    test_case,
):
    """Build the TestCaseResult for a row-count-between check.

    Args:
        row_count_value: number of rows actually found
        execution_date: timestamp of the test execution
        test_case: test definition carrying ``minValue`` and ``maxValue``
    Returns:
        TestCaseResult with status and results
    """
    lower_bound = next(
        int(param.value)
        for param in test_case.parameterValues
        if param.name == "minValue"
    )
    upper_bound = next(
        int(param.value)
        for param in test_case.parameterValues
        if param.name == "maxValue"
    )
    if lower_bound <= row_count_value <= upper_bound:
        status = TestCaseStatus.Success
    else:
        status = TestCaseStatus.Failed
    return TestCaseResult(
        timestamp=execution_date,
        testCaseStatus=status,
        result=f"Found {row_count_value} rows vs. the expected range [{lower_bound}, {upper_bound}].",
        testResultValue=[TestResultValue(name="rowCount", value=str(row_count_value))],
    )
@singledispatch
def table_row_count_to_be_between(
runner,
test_case: TestCase,
execution_date: datetime,
execution_date: Union[datetime, float],
):
raise NotImplementedError
@table_row_count_to_be_between.register
def _(
runner: QueryRunner,
test_case: TestCase,
execution_date: Union[datetime, float],
) -> TestCaseResult:
"""
Validate row count metric
@ -62,26 +106,21 @@ def table_row_count_to_be_between(
testResultValue=[TestResultValue(name="rowCount", value=None)],
)
min_ = next(
int(param_value.value)
for param_value in test_case.parameterValues
if param_value.name == "minValue"
)
max_ = next(
int(param_value.value)
for param_value in test_case.parameterValues
if param_value.name == "maxValue"
)
return _return_test_case(row_count_value, execution_date, test_case)
status = (
TestCaseStatus.Success
if min_ <= row_count_value <= max_
else TestCaseStatus.Failed
)
result = f"Found {row_count_value} rows vs. the expected range [{min_}, {max_}]."
return TestCaseResult(
timestamp=execution_date,
testCaseStatus=status,
result=result,
testResultValue=[TestResultValue(name="rowCount", value=str(row_count_value))],
)
@table_row_count_to_be_between.register
def _(
    runner: DataFrame,
    test_case: TestCase,
    execution_date: Union[datetime, float],
):
    """Datalake implementation: validate the dataframe row count falls
    within the configured ``[minValue, maxValue]`` range.

    Args:
        runner: dataframe holding the table sample
        test_case: test definition carrying the range parameters
        execution_date: timestamp of the test execution
    Returns:
        TestCaseResult with status and results
    """
    # delegate both the bound extraction and the result construction
    # to the shared helper; only the row-count metric is datalake-specific
    return _return_test_case(
        Metrics.ROW_COUNT.value().dl_fn(runner), execution_date, test_case
    )

View File

@ -12,10 +12,13 @@
"""
TableRowCountToEqual validation implementation
"""
# pylint: disable=duplicate-code
import traceback
from datetime import datetime
from functools import singledispatch
from typing import Union
# pylint: disable=duplicate-code
from pandas import DataFrame
from metadata.generated.schema.tests.basic import (
TestCaseResult,
@ -30,10 +33,20 @@ from metadata.utils.logger import test_suite_logger
logger = test_suite_logger()
@singledispatch
def table_row_count_to_equal(
runner,
test_case: TestCase,
execution_date: datetime,
execution_date: Union[datetime, float],
):
raise NotImplementedError
@table_row_count_to_equal.register
def _(
runner: QueryRunner,
test_case: TestCase,
execution_date: Union[datetime, float],
) -> TestCaseResult:
"""
Validate row count metric
@ -79,3 +92,26 @@ def table_row_count_to_equal(
result=result,
testResultValue=[TestResultValue(name="rowCount", value=str(row_count_value))],
)
@table_row_count_to_equal.register
def _(runner: DataFrame, test_case: TestCase, execution_date: Union[datetime, float]):
    """Datalake implementation: check the dataframe row count equals the
    expected ``value`` parameter.

    Args:
        runner: dataframe holding the table sample
        test_case: test definition carrying the ``value`` parameter
        execution_date: timestamp of the test execution
    Returns:
        TestCaseResult with status and results
    """
    actual_rows = Metrics.ROW_COUNT.value().dl_fn(runner)
    expected_rows = next(
        int(param.value)
        for param in test_case.parameterValues
        if param.name == "value"
    )
    if actual_rows == expected_rows:
        status = TestCaseStatus.Success
    else:
        status = TestCaseStatus.Failed
    return TestCaseResult(
        timestamp=execution_date,
        testCaseStatus=status,
        result=f"Found {actual_rows} rows vs. the expected {expected_rows}",
        testResultValue=[TestResultValue(name="rowCount", value=str(actual_rows))],
    )

View File

@ -62,7 +62,7 @@ def read_json_from_azure(
account_url, storage_options=storage_options, typ="series"
)
data = _get_json_text(key, dataframe.to_dict()) # pylint: disable=no-member
data = _get_json_text(key, dataframe.to_dict())
if isinstance(data, list):
return [pd.DataFrame.from_dict(data[:sample_size])]

View File

@ -0,0 +1,41 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Helper module to process the base model type for profiler and test suites
"""
from typing import Union
from pydantic import BaseModel
from metadata.generated.schema.entity.data.table import DataType
from metadata.utils.entity_link import get_decoded_column
def fetch_column_obj(entity_link, data_frame):
    """Resolve the column referenced by *entity_link* inside *data_frame*
    and wrap it as a ColumnBaseModel."""
    column_series = data_frame[get_decoded_column(entity_link)]
    return ColumnBaseModel.col_base_model(column_series)
class ColumnBaseModel(BaseModel):
    """Lightweight column descriptor (name + pandas dtype) used by the
    datalake profiler and test suite in place of a full table Column entity."""

    # column name as it appears in the dataframe
    name: str
    # pandas dtype name, or an OpenMetadata DataType when one maps cleanly
    datatype: Union[DataType, str]

    @staticmethod
    def col_base_model_list(data_frame_list):
        """Build one ColumnBaseModel per column of the given dataframe list."""
        # NOTE(review): only the first dataframe chunk is inspected — this
        # assumes every chunk in the list shares the same schema; confirm
        # against callers that produce data_frame_list
        return [
            ColumnBaseModel(name=column, datatype=data_frame_list[0][column].dtype.name)
            for column in data_frame_list[0].columns
        ]

    @staticmethod
    def col_base_model(col_series):
        """Build a ColumnBaseModel from a single pandas Series."""
        return ColumnBaseModel(name=col_series.name, datatype=col_series.dtype.name)

View File

@ -14,7 +14,6 @@ Test Table and Column Tests' validate implementations.
Each test should validate the Success, Failure and Aborted statuses
"""
import os
import unittest
from datetime import datetime
@ -22,6 +21,7 @@ from unittest.mock import patch
from uuid import uuid4
import sqlalchemy as sqa
from pandas import DataFrame
from sqlalchemy.orm import declarative_base
from metadata.generated.schema.entity.data.table import Column, DataType, Table
@ -33,6 +33,9 @@ from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus
from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue
from metadata.generated.schema.tests.testSuite import TestSuite
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.interfaces.datalake.datalake_test_suite_interface import (
DataLakeTestSuiteInterface,
)
from metadata.interfaces.sqalchemy.sqa_test_suite_interface import SQATestSuiteInterface
from metadata.test_suite.validations.core import validation_enum_registry
@ -56,6 +59,16 @@ TABLE = Table(
)
TEST_SUITE = TestSuite(name="my_test_suite", description="description")
DL_DATA = (
["1", "John", "Jo", "John Doe", "johnny b goode", 30],
["2", "Jane", "Ja", "Jone Doe", "Johnny d", 31],
["3", "John", "Joh", "John Doe", None, None],
) * 10
DATALAKE_DATA_FRAME = DataFrame(
DL_DATA, columns=["id", "name", "first name", "fullname", "nickname", "age"]
)
class User(Base):
@ -87,6 +100,8 @@ class testSuiteValidation(unittest.TestCase):
table_entity=TABLE,
ometa_client=None,
)
dl_runner = DATALAKE_DATA_FRAME
runner = sqa_profiler_interface.runner
engine = sqa_profiler_interface.session.get_bind()
session = sqa_profiler_interface.session
@ -142,15 +157,25 @@ class testSuiteValidation(unittest.TestCase):
)
res = validation_enum_registry.registry["columnValueLengthsToBeBetween"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
)
dl_res = validation_enum_registry.registry["columnValueLengthsToBeBetween"](
self.dl_runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert isinstance(res, TestCaseResult)
assert isinstance(dl_res, TestCaseResult)
assert res.testCaseStatus == TestCaseStatus.Failed
assert dl_res.testCaseStatus == TestCaseStatus.Failed
assert res.testResultValue[0].value == "8"
assert res.testResultValue[1].value == "14"
assert dl_res.testResultValue[0].value == "4"
assert dl_res.testResultValue[1].value == "14"
test_case = TestCase(
name="my_test_case",
@ -164,15 +189,25 @@ class testSuiteValidation(unittest.TestCase):
)
res = validation_enum_registry.registry["columnValueLengthsToBeBetween"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
)
dl_res = validation_enum_registry.registry["columnValueLengthsToBeBetween"](
self.dl_runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert isinstance(res, TestCaseResult)
assert isinstance(dl_res, TestCaseResult)
assert res.testCaseStatus == TestCaseStatus.Success
assert res.testResultValue[0].value == "2"
assert res.testResultValue[1].value == "3"
assert dl_res.testCaseStatus == TestCaseStatus.Success
assert dl_res.testResultValue[0].value == "2"
assert dl_res.testResultValue[1].value == "3"
test_case = TestCase(
name="my_test_case_two",
@ -185,13 +220,20 @@ class testSuiteValidation(unittest.TestCase):
)
res = validation_enum_registry.registry["columnValueLengthsToBeBetween"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
)
dl_res = validation_enum_registry.registry["columnValueLengthsToBeBetween"](
self.dl_runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert isinstance(res, TestCaseResult)
assert isinstance(dl_res, TestCaseResult)
assert res.testCaseStatus == TestCaseStatus.Success
assert dl_res.testCaseStatus == TestCaseStatus.Success
def test_column_value_max_to_be_between(self):
"""test column value max to be between"""
@ -207,14 +249,23 @@ class testSuiteValidation(unittest.TestCase):
)
res = validation_enum_registry.registry["columnValueMaxToBeBetween"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
)
dl_res = validation_enum_registry.registry["columnValueMaxToBeBetween"](
self.dl_runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert isinstance(res, TestCaseResult)
assert isinstance(dl_res, TestCaseResult)
assert res.testCaseStatus == TestCaseStatus.Failed
assert res.testResultValue[0].value == "31"
assert dl_res.testCaseStatus == TestCaseStatus.Failed
assert dl_res.testResultValue[0].value == "31.0"
test_case = TestCase(
name="my_test_case_two",
@ -227,13 +278,21 @@ class testSuiteValidation(unittest.TestCase):
)
res = validation_enum_registry.registry["columnValueMaxToBeBetween"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
)
dl_res = validation_enum_registry.registry["columnValueMaxToBeBetween"](
self.dl_runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert isinstance(res, TestCaseResult)
assert isinstance(dl_res, TestCaseResult)
assert res.testCaseStatus == TestCaseStatus.Failed
assert dl_res.testCaseStatus == TestCaseStatus.Failed
def test_column_value_mean_to_be_between(self):
"""test column value mean to be between"""
@ -249,14 +308,23 @@ class testSuiteValidation(unittest.TestCase):
)
res = validation_enum_registry.registry["columnValueMeanToBeBetween"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
)
dl_res = validation_enum_registry.registry["columnValueMeanToBeBetween"](
self.dl_runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert isinstance(res, TestCaseResult)
assert isinstance(dl_res, TestCaseResult)
assert res.testCaseStatus == TestCaseStatus.Failed
assert res.testResultValue[0].value == "30.5"
assert dl_res.testCaseStatus == TestCaseStatus.Failed
assert dl_res.testResultValue[0].value == "30.5"
test_case = TestCase(
name="my_test_case_two",
@ -269,13 +337,21 @@ class testSuiteValidation(unittest.TestCase):
)
res = validation_enum_registry.registry["columnValueMeanToBeBetween"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
)
dl_res = validation_enum_registry.registry["columnValueMeanToBeBetween"](
self.dl_runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert isinstance(res, TestCaseResult)
assert isinstance(dl_res, TestCaseResult)
assert res.testCaseStatus == TestCaseStatus.Success
assert dl_res.testCaseStatus == TestCaseStatus.Success
def test_column_value_median_to_be_between(self):
"""test column value median to be between"""
@ -286,19 +362,28 @@ class testSuiteValidation(unittest.TestCase):
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"),
parameterValues=[
TestCaseParameterValue(name="minValueForMedianInCol", value="1"),
TestCaseParameterValue(name="maxColValue", value="10"),
TestCaseParameterValue(name="maxValueForMedianInCol", value="10"),
],
)
res = validation_enum_registry.registry["columnValueMedianToBeBetween"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
)
dl_res = validation_enum_registry.registry["columnValueMedianToBeBetween"](
self.dl_runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert isinstance(res, TestCaseResult)
assert isinstance(dl_res, TestCaseResult)
assert res.testCaseStatus == TestCaseStatus.Failed
assert res.testResultValue[0].value == "30.0"
assert dl_res.testCaseStatus == TestCaseStatus.Failed
assert dl_res.testResultValue[0].value == "30.5"
def test_column_value_min_to_be_between(self):
"""test column value min to be between"""
@ -314,14 +399,23 @@ class testSuiteValidation(unittest.TestCase):
)
res = validation_enum_registry.registry["columnValueMinToBeBetween"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
)
dl_res = validation_enum_registry.registry["columnValueMinToBeBetween"](
self.dl_runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert isinstance(res, TestCaseResult)
assert isinstance(dl_res, TestCaseResult)
assert res.testCaseStatus == TestCaseStatus.Success
assert res.testResultValue[0].value == "30"
assert dl_res.testCaseStatus == TestCaseStatus.Success
assert dl_res.testResultValue[0].value == "30.0"
test_case = TestCase(
name="my_test_case_two",
@ -334,13 +428,21 @@ class testSuiteValidation(unittest.TestCase):
)
res = validation_enum_registry.registry["columnValueMinToBeBetween"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
)
dl_res = validation_enum_registry.registry["columnValueMinToBeBetween"](
self.dl_runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert isinstance(res, TestCaseResult)
assert isinstance(dl_res, TestCaseResult)
assert res.testCaseStatus == TestCaseStatus.Success
assert dl_res.testCaseStatus == TestCaseStatus.Success
def test_column_value_stddev_to_be_between(self):
"""test column value stddev to be between"""
@ -356,14 +458,23 @@ class testSuiteValidation(unittest.TestCase):
)
res = validation_enum_registry.registry["columnValueStdDevToBeBetween"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
)
dl_res = validation_enum_registry.registry["columnValueStdDevToBeBetween"](
self.dl_runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert isinstance(res, TestCaseResult)
assert isinstance(dl_res, TestCaseResult)
assert res.testCaseStatus == TestCaseStatus.Failed
assert res.testResultValue[0].value == "0.25"
assert dl_res.testCaseStatus == TestCaseStatus.Failed
assert dl_res.testResultValue[0].value == "0.512989176042577"
test_case = TestCase(
name="my_test_case_two",
@ -376,13 +487,21 @@ class testSuiteValidation(unittest.TestCase):
)
res = validation_enum_registry.registry["columnValueStdDevToBeBetween"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
)
dl_res = validation_enum_registry.registry["columnValueStdDevToBeBetween"](
self.dl_runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert isinstance(res, TestCaseResult)
assert isinstance(dl_res, TestCaseResult)
assert res.testCaseStatus == TestCaseStatus.Success
assert dl_res.testCaseStatus == TestCaseStatus.Success
def test_column_value_in_set(self):
"""test column value in set"""
@ -397,14 +516,23 @@ class testSuiteValidation(unittest.TestCase):
)
res = validation_enum_registry.registry["columnValuesToBeInSet"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
)
dl_res = validation_enum_registry.registry["columnValuesToBeInSet"](
self.dl_runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert isinstance(res, TestCaseResult)
assert isinstance(dl_res, TestCaseResult)
assert res.testCaseStatus == TestCaseStatus.Success
assert res.testResultValue[0].value == "20"
assert dl_res.testCaseStatus == TestCaseStatus.Success
assert dl_res.testResultValue[0].value == "20"
def test_column_values_missing_count_to_be_equal(self):
"""test column value missing count to be equal"""
@ -419,14 +547,23 @@ class testSuiteValidation(unittest.TestCase):
)
res = validation_enum_registry.registry["columnValuesMissingCount"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
)
dl_res = validation_enum_registry.registry["columnValuesMissingCount"](
self.dl_runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert isinstance(res, TestCaseResult)
assert isinstance(dl_res, TestCaseResult)
assert res.testCaseStatus == TestCaseStatus.Success
assert res.testResultValue[0].value == "10"
assert dl_res.testCaseStatus == TestCaseStatus.Success
assert dl_res.testResultValue[0].value == "10"
test_case = TestCase(
name="my_test_case",
@ -440,11 +577,18 @@ class testSuiteValidation(unittest.TestCase):
)
res = validation_enum_registry.registry["columnValuesMissingCount"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
)
dl_res = validation_enum_registry.registry["columnValuesMissingCount"](
self.dl_runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert res.testResultValue[0].value == "20"
assert dl_res.testResultValue[0].value == "20"
def test_column_values_not_in_set(self):
"""test column value not in set"""
@ -459,14 +603,23 @@ class testSuiteValidation(unittest.TestCase):
)
res = validation_enum_registry.registry["columnValuesToBeNotInSet"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
)
dl_res = validation_enum_registry.registry["columnValuesToBeNotInSet"](
self.dl_runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert isinstance(res, TestCaseResult)
assert isinstance(dl_res, TestCaseResult)
assert res.testCaseStatus == TestCaseStatus.Failed
assert res.testResultValue[0].value == "20"
assert dl_res.testCaseStatus == TestCaseStatus.Failed
assert dl_res.testResultValue[0].value == "20"
def test_column_sum_to_be_between(self):
"""test column value sum to be between"""
@ -482,14 +635,23 @@ class testSuiteValidation(unittest.TestCase):
)
res = validation_enum_registry.registry["columnValuesSumToBeBetween"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
)
dl_res = validation_enum_registry.registry["columnValuesSumToBeBetween"](
self.dl_runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert isinstance(res, TestCaseResult)
assert isinstance(dl_res, TestCaseResult)
assert res.testCaseStatus == TestCaseStatus.Failed
assert res.testResultValue[0].value == "610"
assert dl_res.testCaseStatus == TestCaseStatus.Failed
assert dl_res.testResultValue[0].value == "610.0"
test_case = TestCase(
name="my_test_case_two",
@ -502,13 +664,21 @@ class testSuiteValidation(unittest.TestCase):
)
res = validation_enum_registry.registry["columnValuesSumToBeBetween"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
)
dl_res = validation_enum_registry.registry["columnValuesSumToBeBetween"](
self.dl_runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert isinstance(res, TestCaseResult)
assert isinstance(dl_res, TestCaseResult)
assert res.testCaseStatus == TestCaseStatus.Success
assert dl_res.testCaseStatus == TestCaseStatus.Success
def test_column_values_to_be_between(self):
"""test column value to be between"""
@ -524,14 +694,23 @@ class testSuiteValidation(unittest.TestCase):
)
res = validation_enum_registry.registry["columnValuesToBeBetween"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
)
dl_res = validation_enum_registry.registry["columnValuesToBeBetween"](
self.dl_runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert isinstance(res, TestCaseResult)
assert isinstance(dl_res, TestCaseResult)
assert res.testCaseStatus == TestCaseStatus.Success
assert res.testResultValue[0].value == "30"
assert dl_res.testCaseStatus == TestCaseStatus.Success
assert dl_res.testResultValue[0].value == "30.0"
test_case = TestCase(
name="my_test_case_two",
@ -544,13 +723,21 @@ class testSuiteValidation(unittest.TestCase):
)
res = validation_enum_registry.registry["columnValuesToBeBetween"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
)
dl_res = validation_enum_registry.registry["columnValuesToBeBetween"](
self.dl_runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert isinstance(res, TestCaseResult)
assert isinstance(dl_res, TestCaseResult)
assert res.testCaseStatus == TestCaseStatus.Success
assert dl_res.testCaseStatus == TestCaseStatus.Success
def test_column_values_to_be_not_null(self):
"""test column value to be not null"""
@ -562,14 +749,23 @@ class testSuiteValidation(unittest.TestCase):
)
res = validation_enum_registry.registry["columnValuesToBeNotNull"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
)
dl_res = validation_enum_registry.registry["columnValuesToBeNotNull"](
self.dl_runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert isinstance(res, TestCaseResult)
assert isinstance(dl_res, TestCaseResult)
assert res.testCaseStatus == TestCaseStatus.Failed
assert res.testResultValue[0].value == "10"
assert dl_res.testCaseStatus == TestCaseStatus.Failed
assert dl_res.testResultValue[0].value == "10"
def test_column_values_to_be_unique(self):
"""test column value to be unique"""
@ -581,15 +777,25 @@ class testSuiteValidation(unittest.TestCase):
)
res = validation_enum_registry.registry["columnValuesToBeUnique"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
)
dl_res = validation_enum_registry.registry["columnValuesToBeUnique"](
self.dl_runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert isinstance(res, TestCaseResult)
assert isinstance(dl_res, TestCaseResult)
assert res.testCaseStatus == TestCaseStatus.Failed
assert res.testResultValue[0].value == "20"
assert res.testResultValue[1].value == "0"
assert dl_res.testCaseStatus == TestCaseStatus.Failed
assert dl_res.testResultValue[0].value == "30"
assert dl_res.testResultValue[1].value == "2"
def test_column_values_to_match_regex(self):
"""test column value to match regex"""
@ -604,9 +810,9 @@ class testSuiteValidation(unittest.TestCase):
)
res = validation_enum_registry.registry["columnValuesToMatchRegex"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert isinstance(res, TestCaseResult)
@ -626,9 +832,9 @@ class testSuiteValidation(unittest.TestCase):
)
res = validation_enum_registry.registry["columnValuesToNotMatchRegex"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert isinstance(res, TestCaseResult)
@ -649,14 +855,23 @@ class testSuiteValidation(unittest.TestCase):
)
res = validation_enum_registry.registry["tableColumnCountToBeBetween"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
)
dl_res = validation_enum_registry.registry["tableColumnCountToBeBetween"](
self.dl_runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert isinstance(res, TestCaseResult)
assert isinstance(dl_res, TestCaseResult)
assert res.testCaseStatus == TestCaseStatus.Success
assert res.testResultValue[0].value == "6"
assert dl_res.testCaseStatus == TestCaseStatus.Success
assert dl_res.testResultValue[0].value == "6"
test_case = TestCase(
name="my_test_case_two",
@ -669,13 +884,21 @@ class testSuiteValidation(unittest.TestCase):
)
res = validation_enum_registry.registry["tableColumnCountToBeBetween"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
)
dl_res = validation_enum_registry.registry["tableColumnCountToBeBetween"](
self.dl_runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert isinstance(res, TestCaseResult)
assert isinstance(dl_res, TestCaseResult)
assert res.testCaseStatus == TestCaseStatus.Success
assert dl_res.testCaseStatus == TestCaseStatus.Success
def test_table_column_count_to_equal(self):
"""test column value to be equal"""
@ -688,14 +911,23 @@ class testSuiteValidation(unittest.TestCase):
)
res = validation_enum_registry.registry["tableColumnCountToEqual"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
)
dl_res = validation_enum_registry.registry["tableColumnCountToEqual"](
self.dl_runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert isinstance(res, TestCaseResult)
assert isinstance(dl_res, TestCaseResult)
assert res.testCaseStatus == TestCaseStatus.Failed
assert res.testResultValue[0].value == "6"
assert dl_res.testCaseStatus == TestCaseStatus.Failed
assert dl_res.testResultValue[0].value == "6"
def test_table_column_name_to_exist(self):
"""test column name to exist"""
@ -708,14 +940,23 @@ class testSuiteValidation(unittest.TestCase):
)
res = validation_enum_registry.registry["tableColumnNameToExist"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
)
dl_res = validation_enum_registry.registry["tableColumnNameToExist"](
self.dl_runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert isinstance(res, TestCaseResult)
assert isinstance(dl_res, TestCaseResult)
assert res.testCaseStatus == TestCaseStatus.Success
assert res.testResultValue[0].value == "True"
assert dl_res.testCaseStatus == TestCaseStatus.Success
assert dl_res.testResultValue[0].value == "True"
def test_column_to_match_set(self):
"""test column names to match set"""
@ -730,17 +971,29 @@ class testSuiteValidation(unittest.TestCase):
)
res = validation_enum_registry.registry["tableColumnToMatchSet"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
)
dl_res = validation_enum_registry.registry["tableColumnToMatchSet"](
self.dl_runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert isinstance(res, TestCaseResult)
assert isinstance(dl_res, TestCaseResult)
assert res.testCaseStatus == TestCaseStatus.Failed
assert (
res.testResultValue[0].value
== "['first name', 'id', 'name', 'fullname', 'nickname', 'age']"
)
assert dl_res.testCaseStatus == TestCaseStatus.Failed
assert (
dl_res.testResultValue[0].value
== "['id', 'name', 'first name', 'fullname', 'nickname', 'age']"
)
test_case = TestCase(
name="my_test_case",
@ -755,12 +1008,19 @@ class testSuiteValidation(unittest.TestCase):
)
res = validation_enum_registry.registry["tableColumnToMatchSet"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
)
dl_res = validation_enum_registry.registry["tableColumnToMatchSet"](
self.dl_runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert res.testCaseStatus == TestCaseStatus.Failed
assert dl_res.testCaseStatus == TestCaseStatus.Failed
test_case = TestCase(
name="my_test_case",
@ -776,12 +1036,19 @@ class testSuiteValidation(unittest.TestCase):
)
res = validation_enum_registry.registry["tableColumnToMatchSet"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
)
dl_res = validation_enum_registry.registry["tableColumnToMatchSet"](
self.dl_runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert res.testCaseStatus == TestCaseStatus.Failed
assert dl_res.testCaseStatus == TestCaseStatus.Failed
def test_table_custom_sql_query(self):
"""test custom sql"""
@ -798,9 +1065,9 @@ class testSuiteValidation(unittest.TestCase):
)
res = validation_enum_registry.registry["tableCustomSQLQuery"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert isinstance(res, TestCaseResult)
@ -820,9 +1087,9 @@ class testSuiteValidation(unittest.TestCase):
)
res = validation_enum_registry.registry["tableCustomSQLQuery"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert res.testCaseStatus == TestCaseStatus.Success
@ -841,14 +1108,23 @@ class testSuiteValidation(unittest.TestCase):
)
res = validation_enum_registry.registry["tableRowCountToBeBetween"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
)
dl_res = validation_enum_registry.registry["tableRowCountToBeBetween"](
self.dl_runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert isinstance(res, TestCaseResult)
assert isinstance(dl_res, TestCaseResult)
assert res.testCaseStatus == TestCaseStatus.Success
assert res.testResultValue[0].value == "30"
assert dl_res.testCaseStatus == TestCaseStatus.Success
assert dl_res.testResultValue[0].value == "30"
test_case = TestCase(
name="my_test_case_two",
@ -861,8 +1137,11 @@ class testSuiteValidation(unittest.TestCase):
)
assert isinstance(res, TestCaseResult)
assert isinstance(dl_res, TestCaseResult)
assert res.testCaseStatus == TestCaseStatus.Success
assert res.testResultValue[0].value == "30"
assert dl_res.testCaseStatus == TestCaseStatus.Success
assert dl_res.testResultValue[0].value == "30"
def test_table_row_count_to_be_equal(self):
"""test row count to be equal"""
@ -877,14 +1156,23 @@ class testSuiteValidation(unittest.TestCase):
)
res = validation_enum_registry.registry["tableRowCountToEqual"](
self.runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
)
dl_res = validation_enum_registry.registry["tableRowCountToEqual"](
self.dl_runner,
test_case=test_case,
execution_date=EXECUTION_DATE.timestamp(),
runner=self.runner,
)
assert isinstance(res, TestCaseResult)
assert isinstance(dl_res, TestCaseResult)
assert res.testCaseStatus == TestCaseStatus.Failed
assert res.testResultValue[0].value == "30"
assert dl_res.testCaseStatus == TestCaseStatus.Failed
assert dl_res.testResultValue[0].value == "30"
@classmethod
def tearDownClass(cls) -> None:

View File

@ -16,17 +16,11 @@ from typing import Callable, Optional
import requests
from flask import Blueprint
from openmetadata_managed_apis.api.response import ApiResponse
from openmetadata_managed_apis.utils.logger import routes_logger
from requests.exceptions import ConnectionError
from urllib3.exceptions import NewConnectionError
try:
from importlib.metadata import version
except ImportError:
from importlib_metadata import version
from openmetadata_managed_apis.api.response import ApiResponse
logger = routes_logger()
IP_SERVICES = ["https://api.ipify.org", "https://api.my-ip.io/ip"]

View File

@ -1,7 +1,6 @@
from typing import Optional
from airflow.configuration import conf
from pydantic import SecretStr
from metadata.generated.schema.security.credentials.awsCredentials import AWSCredentials
from metadata.generated.schema.security.secrets.secretsManagerProvider import (

View File

@ -20,11 +20,6 @@ from openmetadata_managed_apis.workflows.ingestion.common import (
metadata_ingestion_workflow,
)
try:
from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
from airflow.operators.python_operator import PythonOperator
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
IngestionPipeline,
)

View File

@ -20,11 +20,6 @@ from openmetadata_managed_apis.workflows.ingestion.common import (
metadata_ingestion_workflow,
)
try:
from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
from airflow.operators.python_operator import PythonOperator
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
IngestionPipeline,
)

View File

@ -16,15 +16,6 @@ import json
from airflow import DAG
from openmetadata_managed_apis.workflows.ingestion.common import build_dag, build_source
from metadata.ingestion.models.encoders import show_secrets_encoder
from metadata.orm_profiler.api.workflow import ProfilerWorkflow
from metadata.utils.logger import set_loggers_level
try:
from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
from airflow.operators.python_operator import PythonOperator
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
IngestionPipeline,
)
@ -35,6 +26,9 @@ from metadata.generated.schema.metadataIngestion.workflow import (
Sink,
WorkflowConfig,
)
from metadata.ingestion.models.encoders import show_secrets_encoder
from metadata.orm_profiler.api.workflow import ProfilerWorkflow
from metadata.utils.logger import set_loggers_level
def profiler_workflow(workflow_config: OpenMetadataWorkflowConfig):

View File

@ -11,6 +11,7 @@
"""
Metadata DAG function builder
"""
import tempfile
from pathlib import Path
from airflow import DAG
@ -21,6 +22,9 @@ from openmetadata_managed_apis.workflows.ingestion.common import (
metadata_ingestion_workflow,
)
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
IngestionPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
BulkSink,
OpenMetadataWorkflowConfig,
@ -28,17 +32,6 @@ from metadata.generated.schema.metadataIngestion.workflow import (
Stage,
)
try:
from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
from airflow.operators.python_operator import PythonOperator
import tempfile
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
IngestionPipeline,
)
def build_usage_config_from_file(
ingestion_pipeline: IngestionPipeline, filename: str

View File

@ -62,7 +62,7 @@ os.environ["AIRFLOW__OPENMETADATA_AIRFLOW_APIS__DAG_RUNNER_TEMPLATE"] = str(
from airflow import DAG
from airflow.models import DagBag, DagModel
from airflow.operators.bash import BashOperator
from airflow.utils import db, timezone
from airflow.utils import timezone
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType
from openmetadata_managed_apis.operations.delete import delete_dag_id

View File

@ -83,18 +83,22 @@ public class TestCaseRepository extends EntityRepository<TestCase> {
private void validateTestParameters(
List<TestCaseParameterValue> parameterValues, List<TestCaseParameter> parameterDefinition) {
if (parameterDefinition.isEmpty() && !parameterValues.isEmpty()) {
throw new IllegalArgumentException("Parameter Values doesn't match Test Definition Parameters");
}
Map<String, Object> values = new HashMap<>();
for (TestCaseParameterValue testCaseParameterValue : parameterValues) {
values.put(testCaseParameterValue.getName(), testCaseParameterValue.getValue());
}
for (TestCaseParameter parameter : parameterDefinition) {
if (Boolean.TRUE.equals(parameter.getRequired())
&& (!values.containsKey(parameter.getName()) || values.get(parameter.getName()) == null)) {
throw new IllegalArgumentException(
"Required parameter " + parameter.getName() + " is not passed in parameterValues");
if (parameterValues != null) {
if (parameterDefinition.isEmpty() && !parameterValues.isEmpty()) {
throw new IllegalArgumentException("Parameter Values doesn't match Test Definition Parameters");
}
Map<String, Object> values = new HashMap<>();
for (TestCaseParameterValue testCaseParameterValue : parameterValues) {
values.put(testCaseParameterValue.getName(), testCaseParameterValue.getValue());
}
for (TestCaseParameter parameter : parameterDefinition) {
if (Boolean.TRUE.equals(parameter.getRequired())
&& (!values.containsKey(parameter.getName()) || values.get(parameter.getName()) == null)) {
throw new IllegalArgumentException(
"Required parameter " + parameter.getName() + " is not passed in parameterValues");
}
}
}
}

View File

@ -39,7 +39,7 @@ import {
getNameFromFQN,
replaceAllSpacialCharWith_,
} from '../../../utils/CommonUtils';
import { getDecodedFqn, getEncodedFqn } from '../../../utils/StringsUtils';
import { getDecodedFqn } from '../../../utils/StringsUtils';
import { generateEntityLink } from '../../../utils/TableUtils';
import { showErrorToast } from '../../../utils/ToastUtils';
import RichTextEditor from '../../common/rich-text-editor/RichTextEditor';
@ -161,10 +161,7 @@ const TestCaseForm: React.FC<TestCaseFormProps> = ({
return {
name: value.testName,
entityLink: generateEntityLink(
getEncodedFqn(decodedEntityFQN, true),
isColumnFqn
),
entityLink: generateEntityLink(decodedEntityFQN, isColumnFqn),
parameterValues: parameterValues as TestCaseParameterValue[],
testDefinition: {
id: value.testTypeId,