Fixes #7299 by updating GE submodule to new test API (#7332)

* Update GE logic to the new test API

* Cleaned up part of the code from
update to TestSuiteMixin

* Added + fixed tests

* Deleted unused files for GE tests

* fixed python tests
This commit is contained in:
Teddy 2022-09-12 05:07:19 +02:00 committed by GitHub
parent 2a6ee9c7c6
commit 0677b5258f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 453 additions and 1239 deletions

View File

@ -17,6 +17,7 @@ checkpoints actions.
"""
import traceback
import warnings
from datetime import datetime, timezone
from typing import Dict, Optional, Union
from great_expectations.checkpoint.actions import ValidationAction
@ -37,16 +38,24 @@ from sqlalchemy.engine.base import Connection, Engine
from sqlalchemy.engine.url import URL
from metadata.generated.schema.entity.data.table import Table
from metadata.great_expectations.builders.generic_test_case_builder import (
GenericTestCaseBuilder,
from metadata.generated.schema.tests.basic import (
TestCaseResult,
TestCaseStatus,
TestResultValue,
)
from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue
from metadata.generated.schema.tests.testDefinition import (
EntityType,
TestCaseParameterDefinition,
TestPlatform,
)
from metadata.great_expectations.builders.supported_ge_tests import SupportedGETests
from metadata.great_expectations.utils.ometa_config_handler import (
create_jinja_environment,
create_ometa_connection_obj,
render_template,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
from metadata.utils.logger import great_expectations_logger
logger = great_expectations_logger()
@ -71,6 +80,7 @@ class OpenMetadataValidationAction(ValidationAction):
config_file_path: str = None,
database_service_name: Optional[str] = None,
ometa_service_name: Optional[str] = None,
test_suite_name: Optional[str] = None,
):
super().__init__(data_context)
self._ometa_service_name = (
@ -79,13 +89,14 @@ class OpenMetadataValidationAction(ValidationAction):
self._database_service_name = database_service_name
self.config_file_path = config_file_path
self.ometa_conn = self._create_ometa_connection()
self.test_suite_name = test_suite_name
@property
def database_service_name(self):
"""Handle depracation warning"""
if self._ometa_service_name:
warnings.warn(
"`ometa_service_name` will be deperacted in future releases. Use `database_service_name` instead",
"`ometa_service_name` will be deperacted in openmetadata-ingestion==0.13. Use `database_service_name` instead",
DeprecationWarning,
)
@ -225,28 +236,119 @@ class OpenMetadataValidationAction(ValidationAction):
return OpenMetadata(create_ometa_connection_obj(rendered_config))
def _build_test_case_fqn(self, table_fqn: str, result: Dict) -> str:
"""build test case fqn from table entity and GE test results
Args:
table_fqn (str): table fully qualified name
result (Dict): result from great expectation tests
"""
split_table_fqn = table_fqn.split(".")
return fqn.build(
self.ometa_conn,
entity_type=TestCase,
service_name=split_table_fqn[0],
database_name=split_table_fqn[1],
schema_name=split_table_fqn[2],
table_name=split_table_fqn[3],
column_name=result["expectation_config"]["kwargs"].get("column"),
test_case_name=result["expectation_config"]["expectation_type"],
)
def _build_entity_link_from_fqn(
self, table_fqn: str, column_name: Optional[str]
) -> str:
"""build entity link
Args:
table_fqn (str): table fqn
column_name (Optionla[str]): column name
Returns:
str: _description_
"""
return (
f"<#E::table::{table_fqn}::columns::{column_name}>"
if column_name
else f"<#E::table::{table_fqn}>"
)
def _handle_test_case(self, result: Dict, table_entity: Table):
"""Handle adding test to table entity based on the test case.
Test is added using a generic test case builder that accepts
a specific test builder. Test builder is retrieved from
`SupportedGETests` based on the `expectation_type` fetch from GE result.
Test Definitions will be created on the fly from the results of the
great expectations run. We will then write the test case results to the
specific test case.
Args:
result: GE test result
table_entity: table entity object
"""
try:
test_builder = SupportedGETests[
result["expectation_config"]["expectation_type"]
].value
test_builder(result, self.ometa_conn, table_entity)
GenericTestCaseBuilder(
test_case_builder=test_builder
).build_test_from_builder()
except KeyError:
logger.debug(traceback.format_exc())
logger.warning(
"GE Test %s not yet support. Skipping test ingestion",
result["expectation_config"]["expectation_type"],
test_suite = self.ometa_conn.get_or_create_test_suite(
test_suite_name=self.test_suite_name or "great_expectation_default",
test_suite_description="Test Suite Created from Great Expectation checkpoint run",
)
test_definition = self.ometa_conn.get_or_create_test_definition(
test_definition_fqn=result["expectation_config"]["expectation_type"],
test_definition_description=result["expectation_config"][
"expectation_type"
].replace("_", " "),
entity_type=EntityType.COLUMN
if "column" in result["expectation_config"]["kwargs"]
else EntityType.TABLE,
test_platforms=[TestPlatform.GreatExpectations],
test_case_parameter_definition=[
TestCaseParameterDefinition(
name=key,
)
for key, _ in result["expectation_config"]["kwargs"].items()
if key not in {"column", "batch_id"}
],
)
test_case_fqn = self._build_test_case_fqn(
table_entity.fullyQualifiedName.__root__, result
)
test_case = self.ometa_conn.get_or_create_test_case(
test_case_fqn,
entity_link=self._build_entity_link_from_fqn(
table_entity.fullyQualifiedName.__root__,
fqn.split_test_case_fqn(test_case_fqn).column,
),
test_suite_fqn=test_suite.fullyQualifiedName.__root__,
test_definition_fqn=test_definition.fullyQualifiedName.__root__,
test_case_parameter_values=[
TestCaseParameterValue(
name=key,
value=str(value),
)
for key, value in result["expectation_config"]["kwargs"].items()
if key not in {"column", "batch_id"}
],
)
self.ometa_conn.add_test_case_results(
test_results=TestCaseResult(
timestamp=datetime.now(timezone.utc).timestamp(),
testCaseStatus=TestCaseStatus.Success
if result["success"]
else TestCaseStatus.Failed,
testResultValue=[
TestResultValue(
name="observed_value",
value=str(result["result"].get("observed_value")),
)
],
),
test_case_fqn=test_case.fullyQualifiedName.__root__,
)
logger.info(
f"Test case result for {test_case.fullyQualifiedName.__root__} successfully ingested"
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(exc)

View File

@ -1,117 +0,0 @@
# Copyright 2022 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Base column test builder
"""
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Dict
from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus
from metadata.generated.schema.tests.columnTest import ColumnTestCase
from metadata.generated.schema.type.basic import Timestamp
from metadata.ingestion.ometa.ometa_api import OpenMetadata
class BaseColumnTestBuilder(ABC):
"""Base class for the column test builder. This is used to send
Great Expectations test results to OMeta
"""
def __init__(self):
self.result = None
self.ometa_conn = None
self.table_entity = None
self.timestamp = None
def __call__(self, result: Dict, ometa_conn: OpenMetadata, table_entity: Table):
"""Used to update instance attribute value as instance builders
are only defined once in the Enum class
Args:
result: single result for a GE test result
ometa_conn: OMeta API connection
table_entity: table entity for the test
"""
self.result = result
self.ometa_conn = ometa_conn
self.table_entity = table_entity
self.timestamp = Timestamp(__root__=int(int(datetime.now().timestamp())))
def add_test(self) -> None:
"""Send an API request to add a test result to a table entity"""
self.ometa_conn.add_column_test(self.table_entity, self._build_test())
@staticmethod
def build_test_case(config, test_type) -> ColumnTestCase:
"""Build test case based on the test type
Args:
config: any instance of a column test case
test_type: any instance of a column test type
Return:
ColumnTestCase
"""
return ColumnTestCase(
config=config,
columnTestType=test_type,
)
def build_test_case_results(self) -> TestCaseResult:
"""Build test case result base on GE test result"""
return TestCaseResult(
timestamp=self.timestamp,
testCaseStatus=TestCaseStatus.Success
if self.result["success"]
else TestCaseStatus.Failed,
result=self._get_expectation_result(),
)
def _get_expectation_result(self):
"""Get the expectation result"""
if self.result["result"]:
if self.result["result"].get("unexpected_percent"):
return (
"Failing rows percentage: "
f"{str(self.result['result'].get('unexpected_percent'))}"
)
if self.result["result"].get("observed_value"):
return (
"Observed values: "
f"{str(self.result['result'].get('observed_value'))}"
)
return None
def build_test_request(self, *, config, test_type) -> CreateColumnTestRequest:
"""Build a test case request to add the test to the tabe
Args:
test_case: test case
test_case_result: a test case result
Return:
CreateColumnTestRequest
"""
return CreateColumnTestRequest(
columnName=self.result["expectation_config"]["kwargs"]["column"],
testCase=self.build_test_case(config=config, test_type=test_type),
result=self.build_test_case_results(),
updatedAt=self.timestamp,
)
@abstractmethod
def _build_test(self) -> CreateColumnTestRequest:
"""Used to create the column test request for the specific test.
Needs to be implemented by the specific builder.
"""

View File

@ -1,38 +0,0 @@
# Copyright 2022 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
TestCase builder
"""
from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest
from metadata.generated.schema.tests.column import columnValueMaxToBeBetween
from metadata.generated.schema.tests.columnTest import ColumnTestType
from metadata.great_expectations.builders.column.base_column_test_builder import (
BaseColumnTestBuilder,
)
class ColumnValueMaxToBeBetweenBuilder(BaseColumnTestBuilder):
"""Builder for `expect_column_value_lengths_to_be_between` GE expectation"""
def _build_test(self) -> CreateColumnTestRequest:
"""Specific test builder for the test"""
return self.build_test_request(
config=columnValueMaxToBeBetween.ColumnValueMaxToBeBetween(
minValueForMaxInCol=self.result["expectation_config"]["kwargs"].get(
"min_value"
),
maxValueForMaxInCol=self.result["expectation_config"]["kwargs"].get(
"max_value"
),
),
test_type=ColumnTestType.columnValueMaxToBeBetween,
)

View File

@ -1,38 +0,0 @@
# Copyright 2022 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
TestCase builder
"""
from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest
from metadata.generated.schema.tests.column import columnValueMinToBeBetween
from metadata.generated.schema.tests.columnTest import ColumnTestType
from metadata.great_expectations.builders.column.base_column_test_builder import (
BaseColumnTestBuilder,
)
class ColumnValueMinToBeBetweenBuilder(BaseColumnTestBuilder):
"""Builder for `expect_column_value_lengths_to_be_between` GE expectation"""
def _build_test(self) -> CreateColumnTestRequest:
"""Specific test builder for the test"""
return self.build_test_request(
config=columnValueMinToBeBetween.ColumnValueMinToBeBetween(
minValueForMinInCol=self.result["expectation_config"]["kwargs"].get(
"min_value"
),
maxValueForMinInCol=self.result["expectation_config"]["kwargs"].get(
"max_value"
),
),
test_type=ColumnTestType.columnValueMinToBeBetween,
)

View File

@ -1,34 +0,0 @@
# Copyright 2022 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
TestCase builder
"""
from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest
from metadata.generated.schema.tests.column import columnValuesLengthsToBeBetween
from metadata.generated.schema.tests.columnTest import ColumnTestType
from metadata.great_expectations.builders.column.base_column_test_builder import (
BaseColumnTestBuilder,
)
class ColumnValuesLengthsToBeBetweenBuilder(BaseColumnTestBuilder):
"""Builder for `expect_column_value_lengths_to_be_between` GE expectation"""
def _build_test(self) -> CreateColumnTestRequest:
"""Specific test builder for the test"""
return self.build_test_request(
config=columnValuesLengthsToBeBetween.ColumnValueLengthsToBeBetween(
minLength=self.result["expectation_config"]["kwargs"].get("min_value"),
maxLength=self.result["expectation_config"]["kwargs"].get("max_value"),
),
test_type=ColumnTestType.columnValueLengthsToBeBetween,
)

View File

@ -1,38 +0,0 @@
# Copyright 2022 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
TestCase builder
"""
from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest
from metadata.generated.schema.tests.column import columnValuesSumToBeBetween
from metadata.generated.schema.tests.columnTest import ColumnTestType
from metadata.great_expectations.builders.column.base_column_test_builder import (
BaseColumnTestBuilder,
)
class ColumnValueSumToBeBetweenBuilder(BaseColumnTestBuilder):
"""Builder for `expect_column_value_lengths_to_be_between` GE expectation"""
def _build_test(self) -> CreateColumnTestRequest:
"""Specific test builder for the test"""
return self.build_test_request(
config=columnValuesSumToBeBetween.ColumnValuesSumToBeBetween(
minValueForColSum=self.result["expectation_config"]["kwargs"].get(
"min_value"
),
maxValueForColSum=self.result["expectation_config"]["kwargs"].get(
"max_value"
),
),
test_type=ColumnTestType.columnValuesSumToBeBetween,
)

View File

@ -1,34 +0,0 @@
# Copyright 2022 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
TestCase builder
"""
from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest
from metadata.generated.schema.tests.column import columnValuesToBeBetween
from metadata.generated.schema.tests.columnTest import ColumnTestType
from metadata.great_expectations.builders.column.base_column_test_builder import (
BaseColumnTestBuilder,
)
class ColumnValuesToBeBetweenBuilder(BaseColumnTestBuilder):
"""Builder for `expect_column_value_to_be_between` GE expectation"""
def _build_test(self) -> CreateColumnTestRequest:
"""Specific test builder for the test"""
return self.build_test_request(
config=columnValuesToBeBetween.ColumnValuesToBeBetween(
minValue=self.result["expectation_config"]["kwargs"].get("max_value"),
maxValue=self.result["expectation_config"]["kwargs"].get("min_value"),
),
test_type=ColumnTestType.columnValuesToBeBetween,
)

View File

@ -1,33 +0,0 @@
# Copyright 2022 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
TestCase builder
"""
from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest
from metadata.generated.schema.tests.column import columnValuesToBeInSet
from metadata.generated.schema.tests.columnTest import ColumnTestType
from metadata.great_expectations.builders.column.base_column_test_builder import (
BaseColumnTestBuilder,
)
class ColumnValuesToBeInSetBuilder(BaseColumnTestBuilder):
"""Builder for `expect_column_values_to_not_be_in_set` GE expectation"""
def _build_test(self) -> CreateColumnTestRequest:
"""Specific test builder for the test"""
return self.build_test_request(
config=columnValuesToBeInSet.ColumnValuesToBeInSet(
allowedValues=self.result["expectation_config"]["kwargs"]["value_set"],
),
test_type=ColumnTestType.columnValuesToBeInSet,
)

View File

@ -1,35 +0,0 @@
# Copyright 2022 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
TestCase builder
"""
from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest
from metadata.generated.schema.tests.column import columnValuesToBeNotInSet
from metadata.generated.schema.tests.columnTest import ColumnTestType
from metadata.great_expectations.builders.column.base_column_test_builder import (
BaseColumnTestBuilder,
)
class ColumnValuesToBeNotInSetBuilder(BaseColumnTestBuilder):
"""Builder for `expect_column_values_to_not_be_in_set` GE expectation"""
def _build_test(self) -> CreateColumnTestRequest:
"""Specific test builder for the test"""
return self.build_test_request(
config=columnValuesToBeNotInSet.ColumnValuesToBeNotInSet(
forbiddenValues=self.result["expectation_config"]["kwargs"][
"value_set"
],
),
test_type=ColumnTestType.columnValuesToBeNotInSet,
)

View File

@ -1,31 +0,0 @@
# Copyright 2022 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
TestCase builder
"""
from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest
from metadata.generated.schema.tests.column import columnValuesToBeNotNull
from metadata.generated.schema.tests.columnTest import ColumnTestType
from metadata.great_expectations.builders.column.base_column_test_builder import (
BaseColumnTestBuilder,
)
class ColumnValuesToBeNotNullBuilder(BaseColumnTestBuilder):
"""Builder for `expect_column_values_to_not_be_null` GE expectation"""
def _build_test(self) -> CreateColumnTestRequest:
"""Specific test builder for the test"""
return self.build_test_request(
config=columnValuesToBeNotNull.ColumnValuesToBeNotNull(),
test_type=ColumnTestType.columnValuesToBeNotNull,
)

View File

@ -1,31 +0,0 @@
# Copyright 2022 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
TestCase builder
"""
from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest
from metadata.generated.schema.tests.column import columnValuesToBeUnique
from metadata.generated.schema.tests.columnTest import ColumnTestType
from metadata.great_expectations.builders.column.base_column_test_builder import (
BaseColumnTestBuilder,
)
class ColumnValuesToBeUniqueBuilder(BaseColumnTestBuilder):
"""Builder for `expect_column_values_to_be_unique` GE expectation"""
def _build_test(self) -> CreateColumnTestRequest:
"""Specific test builder for the test"""
return self.build_test_request(
config=columnValuesToBeUnique.ColumnValuesToBeUnique(),
test_type=ColumnTestType.columnValuesToBeUnique,
)

View File

@ -1,33 +0,0 @@
# Copyright 2022 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
TestCase builder
"""
from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest
from metadata.generated.schema.tests.column import columnValuesToMatchRegex
from metadata.generated.schema.tests.columnTest import ColumnTestType
from metadata.great_expectations.builders.column.base_column_test_builder import (
BaseColumnTestBuilder,
)
class ColumnValuesToMatchRegexBuilder(BaseColumnTestBuilder):
"""Builder for `expect_column_values_to_match_regex` GE expectation"""
def _build_test(self) -> CreateColumnTestRequest:
"""Specific test builder for the test"""
return self.build_test_request(
config=columnValuesToMatchRegex.ColumnValuesToMatchRegex(
regex=self.result["expectation_config"]["kwargs"]["regex"],
),
test_type=ColumnTestType.columnValuesToMatchRegex,
)

View File

@ -1,28 +0,0 @@
# Copyright 2022 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Generic test case builder"""
class GenericTestCaseBuilder:
"""Generic TestCase builder to create test case entity
Attributes:
test_case_builder: Specific builder for the GE expectation
"""
def __init__(self, *, test_case_builder):
self.test_case_builder = test_case_builder
def build_test_from_builder(self):
"""Main method to build the test case entity
and send the results to OMeta
"""
self.test_case_builder.add_test()

View File

@ -1,84 +0,0 @@
# Copyright 2022 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Supported GE tests for Open Metadata"""
from enum import Enum
from metadata.great_expectations.builders.column.value_max_to_be_between import (
ColumnValueMaxToBeBetweenBuilder,
)
from metadata.great_expectations.builders.column.value_min_to_be_between import (
ColumnValueMinToBeBetweenBuilder,
)
from metadata.great_expectations.builders.column.values_lengths_to_be_between import (
ColumnValuesLengthsToBeBetweenBuilder,
)
from metadata.great_expectations.builders.column.values_sum_to_be_between import (
ColumnValueSumToBeBetweenBuilder,
)
from metadata.great_expectations.builders.column.values_to_be_between import (
ColumnValuesToBeBetweenBuilder,
)
from metadata.great_expectations.builders.column.values_to_be_in_set import (
ColumnValuesToBeInSetBuilder,
)
from metadata.great_expectations.builders.column.values_to_be_not_in_set import (
ColumnValuesToBeNotInSetBuilder,
)
from metadata.great_expectations.builders.column.values_to_be_not_null import (
ColumnValuesToBeNotNullBuilder,
)
from metadata.great_expectations.builders.column.values_to_be_unique import (
ColumnValuesToBeUniqueBuilder,
)
from metadata.great_expectations.builders.column.values_to_match_regex import (
ColumnValuesToMatchRegexBuilder,
)
from metadata.great_expectations.builders.table.column_count_to_be_between import (
TableColumnCountToBeBetweenBuilder,
)
from metadata.great_expectations.builders.table.column_count_to_equal import (
TableColumCountToEqualBuilder,
)
from metadata.great_expectations.builders.table.column_name_to_exist import (
TableColumnNameToExistBuilder,
)
from metadata.great_expectations.builders.table.column_name_to_match_set import (
TableColumnNameToMatchSetBuilder,
)
from metadata.great_expectations.builders.table.row_count_to_be_between import (
TableRowCountToBeBetweenBuilder,
)
from metadata.great_expectations.builders.table.row_count_to_equal import (
TableRowCountToEqualBuilder,
)
class SupportedGETests(Enum):
"""list of supported GE test OMeta builders"""
# pylint: disable=invalid-name
expect_table_column_count_to_equal = TableColumCountToEqualBuilder()
expect_table_row_count_to_be_between = TableRowCountToBeBetweenBuilder()
expect_table_row_count_to_equal = TableRowCountToEqualBuilder()
expect_column_value_lengths_to_be_between = ColumnValuesLengthsToBeBetweenBuilder()
expect_column_values_to_be_between = ColumnValuesToBeBetweenBuilder()
expect_column_values_to_not_be_in_set = ColumnValuesToBeNotInSetBuilder()
expect_column_values_to_not_be_null = ColumnValuesToBeNotNullBuilder()
expect_column_values_to_be_unique = ColumnValuesToBeUniqueBuilder()
expect_column_values_to_match_regex = ColumnValuesToMatchRegexBuilder()
expect_table_column_count_to_be_between = TableColumnCountToBeBetweenBuilder()
expect_column_to_exist = TableColumnNameToExistBuilder()
expect_table_columns_to_match_set = TableColumnNameToMatchSetBuilder()
expect_column_values_to_be_in_set = ColumnValuesToBeInSetBuilder()
expect_column_max_to_be_between = ColumnValueMaxToBeBetweenBuilder()
expect_column_min_to_be_between = ColumnValueMinToBeBetweenBuilder()
expect_column_sum_to_be_between = ColumnValueSumToBeBetweenBuilder()

View File

@ -1,111 +0,0 @@
# Copyright 2022 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Base table test builder handlers
"""
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Dict
from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus
from metadata.generated.schema.tests.tableTest import TableTestCase
from metadata.generated.schema.type.basic import Timestamp
from metadata.ingestion.ometa.ometa_api import OpenMetadata
class BaseTableTestBuilder(ABC):
"""Base class for the table test builder. This is used to send
Great Expectations test results to OMeta
"""
def __init__(self):
self.result = None
self.ometa_conn = None
self.table_entity = None
self.timestamp = None
def __call__(
self, result: Dict, ometa_conn: OpenMetadata, table_entity: Table
) -> None:
"""Used to update instance attribute value as instance builders
are only defined once in the Enum class
Args:
result: single result for a GE test result
ometa_conn: OMeta API connection
table_entity: table entity for the test
"""
self.result = result
self.ometa_conn = ometa_conn
self.table_entity = table_entity
self.timestamp = Timestamp(__root__=int(int(datetime.now().timestamp())))
def add_test(self) -> None:
"""Send an API request to add a test result to a table entity"""
self.ometa_conn.add_table_test(self.table_entity, self._build_test())
@staticmethod
def build_test_case(config, test_type) -> TableTestCase:
"""Build test case based on the test type
Args:
config: any instance of a column test case
test_type: any instance of a column test type
Return:
ColumnTestCase
"""
return TableTestCase(
config=config,
tableTestType=test_type,
)
def build_test_case_results(self) -> TestCaseResult:
"""Build test case result base on GE test result"""
return TestCaseResult(
timestamp=self.timestamp,
testCaseStatus=TestCaseStatus.Success
if self.result["success"]
else TestCaseStatus.Failed,
result=self._get_expectation_result(),
)
def _get_expectation_result(self):
"""Get the expectation result"""
if self.result["result"]:
if isinstance(self.result["result"]["observed_value"], list):
return ", ".join(self.result["result"].get("observed_value"))
return self.result["result"]["observed_value"]
return None
def build_test_request(self, *, config, test_type) -> CreateTableTestRequest:
"""Build a test case request to add the test to the tabe
Args:
test_case: test case
test_case_result: a test case result
Return:
CreateColumnTestRequest
"""
return CreateTableTestRequest(
testCase=self.build_test_case(config=config, test_type=test_type),
result=self.build_test_case_results(),
updatedAt=self.timestamp,
)
@abstractmethod
def _build_test(self) -> CreateTableTestRequest:
"""Used to create the table test request for the specific test.
Needs to be implemented by the specific builder.
"""

View File

@ -1,34 +0,0 @@
# Copyright 2022 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
TestCase builder for table column count
"""
from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest
from metadata.generated.schema.tests.table import tableColumnCountToBeBetween
from metadata.generated.schema.tests.tableTest import TableTestType
from metadata.great_expectations.builders.table.base_table_test_builders import (
BaseTableTestBuilder,
)
class TableColumnCountToBeBetweenBuilder(BaseTableTestBuilder):
"""Builder for `expect_table_row_count_to_be_between` GE expectation"""
def _build_test(self) -> CreateTableTestRequest:
"""Specific test builder for the test"""
return self.build_test_request(
config=tableColumnCountToBeBetween.TableColumnCountToBeBetween(
minColValue=self.result["expectation_config"]["kwargs"]["min_value"],
maxColValue=self.result["expectation_config"]["kwargs"]["max_value"],
),
test_type=TableTestType.tableColumnCountToBeBetween,
)

View File

@ -1,33 +0,0 @@
# Copyright 2022 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
TestCase builder
"""
from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest
from metadata.generated.schema.tests.table import tableColumnCountToEqual
from metadata.generated.schema.tests.tableTest import TableTestType
from metadata.great_expectations.builders.table.base_table_test_builders import (
BaseTableTestBuilder,
)
class TableColumCountToEqualBuilder(BaseTableTestBuilder):
"""Builder for `expect_table_column_count_to_equal` GE expectation"""
def _build_test(self) -> CreateTableTestRequest:
"""Specific test builder for the test"""
return self.build_test_request(
config=tableColumnCountToEqual.TableColumnCountToEqual(
columnCount=self.result["expectation_config"]["kwargs"]["value"]
),
test_type=TableTestType.tableColumnCountToEqual,
)

View File

@ -1,33 +0,0 @@
# Copyright 2022 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
TestCase builder for table column count
"""
from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest
from metadata.generated.schema.tests.table import tableColumnNameToExist
from metadata.generated.schema.tests.tableTest import TableTestType
from metadata.great_expectations.builders.table.base_table_test_builders import (
BaseTableTestBuilder,
)
class TableColumnNameToExistBuilder(BaseTableTestBuilder):
"""Builder for `expect_table_row_count_to_be_between` GE expectation"""
def _build_test(self) -> CreateTableTestRequest:
"""Specific test builder for the test"""
return self.build_test_request(
config=tableColumnNameToExist.TableColumnNameToExist(
columnName=self.result["expectation_config"]["kwargs"]["column"],
),
test_type=TableTestType.tableColumnNameToExist,
)

View File

@ -1,37 +0,0 @@
# Copyright 2022 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
TestCase builder for table column count
"""
from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest
from metadata.generated.schema.tests.table import tableColumnToMatchSet
from metadata.generated.schema.tests.tableTest import TableTestType
from metadata.great_expectations.builders.table.base_table_test_builders import (
BaseTableTestBuilder,
)
class TableColumnNameToMatchSetBuilder(BaseTableTestBuilder):
"""Builder for `expect_table_row_count_to_be_between` GE expectation"""
def _build_test(self) -> CreateTableTestRequest:
"""Specific test builder for the test"""
return self.build_test_request(
config=tableColumnToMatchSet.TableColumnToMatchSet(
columnNames=",".join(
self.result["expectation_config"]["kwargs"]["column_set"]
),
ordered=self.result["expectation_config"]["kwargs"].get("exact_match")
or False,
),
test_type=TableTestType.tableColumnToMatchSet,
)

View File

@ -1,34 +0,0 @@
# Copyright 2022 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
TestCase builder
"""
from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest
from metadata.generated.schema.tests.table import tableRowCountToBeBetween
from metadata.generated.schema.tests.tableTest import TableTestType
from metadata.great_expectations.builders.table.base_table_test_builders import (
BaseTableTestBuilder,
)
class TableRowCountToBeBetweenBuilder(BaseTableTestBuilder):
"""Builder for `expect_table_row_count_to_be_between` GE expectation"""
def _build_test(self) -> CreateTableTestRequest:
"""Specific test builder for the test"""
return self.build_test_request(
config=tableRowCountToBeBetween.TableRowCountToBeBetween(
minValue=self.result["expectation_config"]["kwargs"]["min_value"],
maxValue=self.result["expectation_config"]["kwargs"]["max_value"],
),
test_type=TableTestType.tableRowCountToBeBetween,
)

View File

@ -1,33 +0,0 @@
# Copyright 2022 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
TestCase builder
"""
from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest
from metadata.generated.schema.tests.table import tableRowCountToEqual
from metadata.generated.schema.tests.tableTest import TableTestType
from metadata.great_expectations.builders.table.base_table_test_builders import (
BaseTableTestBuilder,
)
class TableRowCountToEqualBuilder(BaseTableTestBuilder):
"""Builder for `expect_table_row_count_to_equal` GE expectation"""
def _build_test(self) -> CreateTableTestRequest:
"""Specific test builder for the test"""
return self.build_test_request(
config=tableRowCountToEqual.TableRowCountToEqual(
value=self.result["expectation_config"]["kwargs"]["value"],
),
test_type=TableTestType.tableRowCountToEqual,
)

View File

@ -78,10 +78,10 @@ def render_template(environment: Environment, template_file: str = "config.yml")
try:
tmplt = environment.get_template("config.yaml")
return tmplt.render()
except TemplateNotFound as err:
except TemplateNotFound as exc:
raise TemplateNotFound(
f"Config file at {environment.loader.searchpath} not found"
) from err
) from exc
def create_ometa_connection_obj(config: str) -> OpenMetadataConnection:

View File

@ -14,6 +14,9 @@ Mixin class containing Tests specific methods
To be used by OpenMetadata class
"""
from datetime import datetime, timezone
from typing import List, Optional
from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest
from metadata.generated.schema.api.tests.createTestDefinition import (
CreateTestDefinitionRequest,
@ -21,9 +24,17 @@ from metadata.generated.schema.api.tests.createTestDefinition import (
from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest
from metadata.generated.schema.tests.basic import TestCaseResult
from metadata.generated.schema.tests.testCase import TestCase
from metadata.generated.schema.tests.testDefinition import TestDefinition
from metadata.generated.schema.tests.testDefinition import (
EntityType,
TestCaseParameterDefinition,
TestDefinition,
TestPlatform,
)
from metadata.generated.schema.tests.testSuite import TestSuite
from metadata.ingestion.ometa.client import REST
from metadata.utils.logger import ometa_logger
logger = ometa_logger()
class OMetaTestsMixin:
@ -38,12 +49,145 @@ class OMetaTestsMixin:
def add_test_case_results(
self,
test_results: TestCaseResult,
test_case_name: str,
test_case_fqn: str,
):
"""Create test definition"""
"""Add test case results to a test case
Args:
test_results (TestCaseResult): test case results to pass to the test case
test_case_fqn (str): test case fqn
Returns:
_type_: _description_
"""
resp = self.client.put(
f"{self.get_suffix(TestCase)}/{test_case_name}/testCaseResult",
f"{self.get_suffix(TestCase)}/{test_case_fqn}/testCaseResult",
test_results.json(),
)
return resp
def get_or_create_test_suite(
self,
test_suite_name: str,
test_suite_description: Optional[
str
] = f"Test Suite created on {datetime.now(timezone.utc).strftime('%Y-%m-%d')}",
) -> TestSuite:
"""Get or create a TestSuite
Args:
test_suite_name (str): test suite name
test_suite_description (Optional[str], optional): test suite description. Defaults to f"Test Suite created on {datetime.now(timezone.utc).strftime('%Y-%m-%d')}".
Returns:
TestSuite:
"""
test_suite = self.get_by_name(
entity=TestSuite,
fqn=test_suite_name,
)
if test_suite:
return test_suite
logger.info(
f"TestSuite {test_suite_name} not found. Creating new TestSuite: {test_suite_name}"
)
return self.create_or_update(
CreateTestSuiteRequest(
name=test_suite_name,
description=test_suite_description,
)
)
def get_or_create_test_definition(
self,
test_definition_fqn: str,
test_definition_description: Optional[str] = None,
entity_type: Optional[EntityType] = None,
test_platforms: Optional[List[TestPlatform]] = None,
test_case_parameter_definition: Optional[
List[TestCaseParameterDefinition]
] = None,
) -> TestDefinition:
"""Get or create a test definition
Args:
test_definition_fqn (str): test definition fully qualified name
test_definition_description (Optional[str], optional): description for the test definition. Defaults to None.
entity_type (Optional[EntityType], optional): entity type (COLUMN or TABLE). Defaults to None.
test_platforms (Optional[List[TestPlatform]], optional): test platforms. Defaults to None.
test_case_parameter_definition (Optional[List[TestCaseParameterDefinition]], optional): parameters for the test case defintion. Defaults to None.
Returns:
TestDefinition: a test definition object
"""
test_definition = self.get_by_name(
entity=TestDefinition,
fqn=test_definition_fqn,
)
if test_definition:
return test_definition
logger.info(
f"TestDefinition {test_definition_fqn} not found. Creating new TestDefinition: {test_definition_fqn}"
)
return self.create_or_update(
CreateTestDefinitionRequest(
name=test_definition_fqn,
description=test_definition_description,
entityType=entity_type,
testPlatforms=test_platforms,
parameterDefinition=test_case_parameter_definition,
)
)
def get_or_create_test_case(
self,
test_case_fqn: str,
entity_link: Optional[str] = None,
test_suite_fqn: Optional[str] = None,
test_definition_fqn: Optional[str] = None,
test_case_parameter_values: Optional[str] = None,
):
"""Get or create a test case
Args:
test_case_fqn (str): fully qualified name for the test
entity_link (Optional[str], optional): _description_. Defaults to None.
test_suite_fqn (Optional[str], optional): _description_. Defaults to None.
test_definition_fqn (Optional[str], optional): _description_. Defaults to None.
test_case_parameter_values (Optional[str], optional): _description_. Defaults to None.
Returns:
_type_: _description_
"""
test_case = self.get_by_name(entity=TestCase, fqn=test_case_fqn, fields=["*"])
if test_case:
return test_case
logger.info(
f"TestCase {test_case_fqn} not found. Creating TestCase {test_case_fqn}"
)
test_case = self.create_or_update(
CreateTestCaseRequest(
name=test_case_fqn.split(".")[-1],
entityLink=entity_link,
testSuite=self.get_entity_reference(
entity=TestSuite,
fqn=test_suite_fqn,
),
testDefinition=self.get_entity_reference(
entity=TestDefinition,
fqn=test_definition_fqn,
),
parameterValues=test_case_parameter_values,
)
)
return test_case

View File

@ -476,7 +476,6 @@ class OpenMetadata(
We PUT to the endpoint and return the Entity generated result
"""
entity = data.__class__
is_create = "create" in data.__class__.__name__.lower()
@ -487,7 +486,6 @@ class OpenMetadata(
raise InvalidEntityException(
f"PUT operations need a CrateEntity, not {entity}"
)
resp = self.client.put(
self.get_suffix(entity), data=data.json(encoder=show_secrets_encoder)
)

View File

@ -374,7 +374,7 @@ class DBTMixin:
)
self.metadata.add_test_case_results(
test_results=test_case_result,
test_case_name=test_case_fqn,
test_case_fqn=test_case_fqn,
)
except Exception as err: # pylint: disable=broad-except
logger.error(f"Failed capture tests results {err}")

View File

@ -378,7 +378,7 @@ class TestSuiteWorkflow:
logger.warning(
f"Couldn't create test case name {test_case_name_to_create}: {exc}"
)
logger.debug(traceback.format_exc(exc))
logger.debug(traceback.format_exc())
return created_test_case
@ -417,12 +417,12 @@ class TestSuiteWorkflow:
)
self.status.processed(test_case.fullyQualifiedName.__root__)
except Exception as exc:
logger.debug(traceback.format_exc(exc))
logger.debug(traceback.format_exc())
logger.warning(
f"Could not run test case {test_case.name}: {exc}"
)
except TypeError as exc:
logger.debug(traceback.format_exc(exc))
logger.debug(traceback.format_exc())
logger.warning(f"Could not run test case {test_case.name}: {exc}")
self.status.failure(test_case.fullyQualifiedName.__root__)

View File

@ -70,7 +70,7 @@ class MetadataRestSink(Sink[Entity]):
try:
self.metadata.add_test_case_results(
test_results=record.testCaseResult,
test_case_name=record.testCase.fullyQualifiedName.__root__,
test_case_fqn=record.testCase.fullyQualifiedName.__root__,
)
logger.info(
f"Successfully ingested test case results for test case {record.testCase.name.__root__}"

View File

@ -14,6 +14,7 @@ Filter information has been taken from the
ES indexes definitions
"""
import re
from collections import namedtuple
from typing import Dict, List, Optional, Type, TypeVar, Union
from antlr4.CommonTokenStream import CommonTokenStream
@ -356,3 +357,28 @@ def split_table_name(table_name: str) -> Dict[str, Optional[str]]:
database, database_schema, table = full_details
return {"database": database, "database_schema": database_schema, "table": table}
def split_test_case_fqn(test_case_fqn: str) -> Dict[str, Optional[str]]:
"""given a test case fqn split each element
Args:
test_case_fqn (str): test case fqn
Returns:
Dict[str, Optional[str]]:
"""
SplitTestCaseFqn = namedtuple(
"SplitTestCaseFqn", "service database schema table column test_case"
)
details = split(test_case_fqn)
if len(details) < 5:
raise ValueError(
f"{test_case_fqn} does not appear to be a valid test_case fqn "
)
if len(details) != 6:
details.insert(4, None)
service, database, schema, table, column, test_case = details
return SplitTestCaseFqn(service, database, schema, table, column, test_case)

View File

@ -0,0 +1,125 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
OpenMetadata API test suite mixin test
"""
from unittest import TestCase
from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest
from metadata.generated.schema.api.tests.createTestDefinition import (
CreateTestDefinitionRequest,
)
from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.tests.testCase import TestCase as OMetaTestCase
from metadata.generated.schema.tests.testCase import TestCaseParameterValue
from metadata.generated.schema.tests.testDefinition import (
EntityType,
TestCaseParameterDefinition,
TestDefinition,
TestPlatform,
)
from metadata.generated.schema.tests.testSuite import TestSuite
from metadata.ingestion.ometa.ometa_api import OpenMetadata
class OMetaTestSuiteTest(TestCase):
"""
Run this integration test with the local API available
Install the ingestion package before running the tests
"""
service_entity_id = None
server_config = OpenMetadataConnection(hostPort="http://localhost:8585/api")
metadata = OpenMetadata(server_config)
assert metadata.health_check()
test_suite: TestSuite = metadata.create_or_update(
CreateTestSuiteRequest(
name="testSuiteForIntegrationTest",
description="This is a test suite for the integration tests",
)
)
test_definition = metadata.create_or_update(
CreateTestDefinitionRequest(
name="testDefinitionForIntegration",
description="this is a test definition for integration tests",
entityType=EntityType.TABLE,
testPlatforms=[TestPlatform.GreatExpectations],
parameterDefinition=[TestCaseParameterDefinition(name="foo")],
)
)
@classmethod
def setUpClass(cls) -> None:
"""set up tests"""
cls.metadata.create_or_update(
CreateTestCaseRequest(
name="testCaseForIntegration",
entityLink="<#E::table::sample_data.ecommerce_db.shopify.dim_address>",
testSuite=cls.metadata.get_entity_reference(
entity=TestSuite,
fqn=cls.test_suite.fullyQualifiedName.__root__,
),
testDefinition=cls.metadata.get_entity_reference(
entity=TestDefinition,
fqn=cls.test_definition.fullyQualifiedName.__root__,
),
parameterValues=[TestCaseParameterValue(name="foo", value=10)],
)
)
def test_get_or_create_test_suite(self):
"""test we get a test suite object"""
test_suite = self.metadata.get_or_create_test_suite(
"testSuiteForIntegrationTest"
)
assert test_suite.name.__root__ == "testSuiteForIntegrationTest"
assert isinstance(test_suite, TestSuite)
def test_get_or_create_test_definition(self):
"""test we get a test definition object"""
test_definition = self.metadata.get_or_create_test_definition(
"testDefinitionForIntegration"
)
assert test_definition.name.__root__ == "testDefinitionForIntegration"
assert isinstance(test_definition, TestDefinition)
def test_get_or_create_test_case(self):
"""test we get a test case object"""
test_case = self.metadata.get_or_create_test_case(
"sample_data.ecommerce_db.shopify.dim_address.testCaseForIntegration"
)
assert test_case.name.__root__ == "testCaseForIntegration"
assert isinstance(test_case, OMetaTestCase)
@classmethod
def tearDownClass(cls) -> None:
cls.metadata.delete(
entity=TestSuite,
entity_id=cls.test_suite.id,
recursive=True,
hard_delete=True,
)
cls.metadata.delete(
entity=TestDefinition,
entity_id=cls.test_definition.id,
recursive=True,
hard_delete=True,
)

View File

@ -18,12 +18,6 @@ from unittest import mock
from pytest import fixture
from metadata.great_expectations.action import OpenMetadataValidationAction
from metadata.great_expectations.builders.column.base_column_test_builder import (
BaseColumnTestBuilder,
)
from metadata.great_expectations.builders.table.base_table_test_builders import (
BaseTableTestBuilder,
)
from metadata.great_expectations.utils.ometa_config_handler import (
create_jinja_environment,
)
@ -71,16 +65,6 @@ def mocked_ge_data_context():
yield mocked_data_context
@fixture(scope="module")
def mocked_base_column_builder():
class MockedBaseColumnBuilder(BaseColumnTestBuilder):
def _build_test(self):
...
instance = MockedBaseColumnBuilder()
return instance
@fixture(scope="module")
def mocked_ge_column_result():
return {
@ -98,16 +82,6 @@ def mocked_ge_column_result():
}
@fixture(scope="module")
def mocked_base_table_builder():
class MockedBaseTableBuilder(BaseTableTestBuilder):
def _build_test(self):
...
instance = MockedBaseTableBuilder()
return instance
@fixture(scope="module")
def mocked_ge_table_result():
return {

View File

@ -1,152 +0,0 @@
# Copyright 2022 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test suite for column builder
"""
from pytest import mark
from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest
from metadata.generated.schema.tests.basic import TestCaseResult as _TestCaseResult
from metadata.generated.schema.tests.column import columnValuesToBeNotNull
from metadata.generated.schema.tests.columnTest import ColumnTestCase, ColumnTestType
from metadata.great_expectations.builders.column.values_lengths_to_be_between import (
ColumnValuesLengthsToBeBetweenBuilder,
)
from metadata.great_expectations.builders.column.values_to_be_between import (
ColumnValuesToBeBetweenBuilder,
)
from metadata.great_expectations.builders.column.values_to_be_not_in_set import (
ColumnValuesToBeNotInSetBuilder,
)
from metadata.great_expectations.builders.column.values_to_be_not_null import (
ColumnValuesToBeNotNullBuilder,
)
from metadata.great_expectations.builders.column.values_to_be_unique import (
ColumnValuesToBeUniqueBuilder,
)
from metadata.great_expectations.builders.column.values_to_match_regex import (
ColumnValuesToMatchRegexBuilder,
)
def test_base_column_builder_attributes_none(mocked_base_column_builder):
"""Test correct attributes are set for the base builder class"""
assert mocked_base_column_builder.result is None
assert mocked_base_column_builder.ometa_conn is None
assert mocked_base_column_builder.table_entity is None
assert mocked_base_column_builder.timestamp is None
@mark.parametrize(
"values",
[
{"result": {"foo": "bar"}, "ometa_conn": "OMetaConn", "table_entity": "Table"},
{
"result": {"bar": "baz"},
"ometa_conn": "DummyConn",
"table_entity": "DummyTable",
},
],
)
def test_base_column_builder_attributes_not_none(mocked_base_column_builder, values):
"""Test attributes are set correctly using __call__ method"""
mocked_base_column_builder(
result=values["result"],
ometa_conn=values["ometa_conn"],
table_entity=values["table_entity"],
)
assert mocked_base_column_builder.result == values["result"]
assert mocked_base_column_builder.ometa_conn == values["ometa_conn"]
assert mocked_base_column_builder.table_entity == values["table_entity"]
assert mocked_base_column_builder.timestamp is not None
def test_base_column_builder_attributes_list(mocked_base_column_builder):
"""Test base column builder has the expected attributes"""
expected_attributes = {"result", "ometa_conn", "table_entity", "timestamp"}
assert expected_attributes.issubset(mocked_base_column_builder.__dict__.keys())
def test_base_column_builder_build_test_case(mocked_base_column_builder):
"""Test test case builder"""
test_case = mocked_base_column_builder.build_test_case(
config=columnValuesToBeNotNull.ColumnValuesToBeNotNull(),
test_type=ColumnTestType.columnValuesToBeNotNull,
)
assert isinstance(test_case, ColumnTestCase)
test_case.config
test_case.columnTestType
def test_base_column_builder_build_test_case_results(
mocked_base_column_builder, mocked_ge_column_result
):
"""Test test case results are built as expected"""
mocked_base_column_builder(
result=mocked_ge_column_result,
ometa_conn="OMetaConnection",
table_entity="TableEntity",
)
test_case_result = mocked_base_column_builder.build_test_case_results()
assert isinstance(test_case_result, _TestCaseResult)
test_case_result.timestamp
test_case_result.testCaseStatus
test_case_result.result
def test_base_column_builder_build_test_request(
mocked_base_column_builder, mocked_ge_column_result
):
"""Test CreateTestRequest is built as expected"""
mocked_base_column_builder(
result=mocked_ge_column_result,
ometa_conn="OMetaConnection",
table_entity="TableEntity",
)
column_test_request = mocked_base_column_builder.build_test_request(
config=columnValuesToBeNotNull.ColumnValuesToBeNotNull(),
test_type=ColumnTestType.columnValuesToBeNotNull,
)
assert isinstance(column_test_request, CreateColumnTestRequest)
column_test_request.columnName
column_test_request.testCase
column_test_request.result
column_test_request.updatedAt
@mark.parametrize(
"builder",
[
ColumnValuesToBeUniqueBuilder(),
ColumnValuesToBeBetweenBuilder(),
ColumnValuesToBeNotNullBuilder(),
ColumnValuesToBeNotInSetBuilder(),
ColumnValuesToMatchRegexBuilder(),
ColumnValuesLengthsToBeBetweenBuilder(),
],
)
def test_column_custom_builders(mocked_ge_column_result, builder):
"""Test custom builders"""
builder(
result=mocked_ge_column_result,
ometa_conn="OMetaConnection",
table_entity="TableEntity",
)
builder_test_request = builder._build_test()
assert isinstance(builder_test_request, CreateColumnTestRequest)

View File

@ -1,139 +0,0 @@
# Copyright 2022 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test suite for table builders
"""
from pytest import mark
from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest
from metadata.generated.schema.tests.basic import TestCaseResult as _TestCaseResult
from metadata.generated.schema.tests.table import tableColumnCountToEqual
from metadata.generated.schema.tests.tableTest import TableTestCase, TableTestType
from metadata.great_expectations.builders.table.column_count_to_equal import (
TableColumCountToEqualBuilder,
)
from metadata.great_expectations.builders.table.row_count_to_be_between import (
TableRowCountToBeBetweenBuilder,
)
from metadata.great_expectations.builders.table.row_count_to_equal import (
TableRowCountToEqualBuilder,
)
def test_base_table_builder_attributes_none(mocked_base_table_builder):
"""Test correct attributes are set for the base builder class"""
assert mocked_base_table_builder.result is None
assert mocked_base_table_builder.ometa_conn is None
assert mocked_base_table_builder.table_entity is None
assert mocked_base_table_builder.timestamp is None
@mark.parametrize(
"values",
[
{"result": {"foo": "bar"}, "ometa_conn": "OMetaConn", "table_entity": "Table"},
{
"result": {"bar": "baz"},
"ometa_conn": "DummyConn",
"table_entity": "DummyTable",
},
],
)
def test_base_table_builder_attributes_not_none(mocked_base_table_builder, values):
"""Test attributes are set correctly using __call__ method"""
mocked_base_table_builder(
result=values["result"],
ometa_conn=values["ometa_conn"],
table_entity=values["table_entity"],
)
assert mocked_base_table_builder.result == values["result"]
assert mocked_base_table_builder.ometa_conn == values["ometa_conn"]
assert mocked_base_table_builder.table_entity == values["table_entity"]
assert mocked_base_table_builder.timestamp is not None
def test_base_table_builder_attributes_list(mocked_base_table_builder):
"""Test base table builder has the expected attributes"""
expected_attributes = {"result", "ometa_conn", "table_entity", "timestamp"}
assert expected_attributes.issubset(mocked_base_table_builder.__dict__.keys())
def test_base_table_builder_build_test_case(mocked_base_table_builder):
"""Test test case builder"""
test_case = mocked_base_table_builder.build_test_case(
config=tableColumnCountToEqual.TableColumnCountToEqual(columnCount=10),
test_type=TableTestType.tableColumnCountToEqual,
)
assert isinstance(test_case, TableTestCase)
test_case.config
test_case.tableTestType
def test_base_table_builder_build_test_case_results(
mocked_base_table_builder, mocked_ge_table_result
):
"""Test test case results are built as expected"""
mocked_base_table_builder(
result=mocked_ge_table_result,
ometa_conn="OMetaConnection",
table_entity="TableEntity",
)
test_case_result = mocked_base_table_builder.build_test_case_results()
assert isinstance(test_case_result, _TestCaseResult)
test_case_result.timestamp
test_case_result.testCaseStatus
test_case_result.result
def test_base_table_builder_build_test_request(
mocked_base_table_builder, mocked_ge_table_result
):
"""Test CreateTestRequest is built as expected"""
mocked_base_table_builder(
result=mocked_ge_table_result,
ometa_conn="OMetaConnection",
table_entity="TableEntity",
)
table_test_request = mocked_base_table_builder.build_test_request(
config=tableColumnCountToEqual.TableColumnCountToEqual(columnCount=10),
test_type=TableTestType.tableColumnCountToEqual,
)
assert isinstance(table_test_request, CreateTableTestRequest)
table_test_request.testCase
table_test_request.result
table_test_request.updatedAt
@mark.parametrize(
"builder",
[
TableRowCountToEqualBuilder(),
TableColumCountToEqualBuilder(),
TableRowCountToBeBetweenBuilder(),
],
)
def test_table_custom_builders(mocked_ge_table_result, builder):
"""Test custom builders"""
builder(
result=mocked_ge_table_result,
ometa_conn="OMetaConnection",
table_entity="TableEntity",
)
builder_test_request = builder._build_test()
assert isinstance(builder_test_request, CreateTableTestRequest)

View File

@ -13,6 +13,8 @@ Test FQN build behavior
"""
from unittest import TestCase
import pytest
from metadata.generated.schema.entity.data.table import Table
from metadata.utils import fqn
@ -117,3 +119,26 @@ class TestFqn(TestCase):
table_name="table",
)
self.assertEqual(table_fqn_space, "service.data base.schema.table")
def test_split_test_case_fqn(self):
"""test for split test case"""
split_fqn = fqn.split_test_case_fqn(
"local_redshift.dev.dbt_jaffle.customers.customer_id.expect_column_max_to_be_between"
)
assert split_fqn.service == "local_redshift"
assert split_fqn.database == "dev"
assert split_fqn.schema == "dbt_jaffle"
assert split_fqn.table == "customers"
assert split_fqn.column == "customer_id"
assert split_fqn.test_case == "expect_column_max_to_be_between"
split_fqn = fqn.split_test_case_fqn(
"local_redshift.dev.dbt_jaffle.customers.expect_table_column_to_be_between"
)
assert not split_fqn.column
assert split_fqn.test_case == "expect_table_column_to_be_between"
with pytest.raises(ValueError):
fqn.split_test_case_fqn("local_redshift.dev.dbt_jaffle.customers")