Teddy 209793f315
MINOR - Add support for GX 1.4 (#20934)
* fix: add support for GX 0.18.22 and GX 1.4.x

* fix: add  support for GX 0.18.22 and GX 1.4.x

* style: ran python linting

* fix: skip test if GX version is not installed
2025-04-24 11:55:04 +02:00

106 lines
2.8 KiB
Python

# Copyright 2022 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Fixtures for test suite
"""
import os
from unittest import mock
from pytest import fixture
from metadata.great_expectations.utils.ometa_config_handler import (
create_jinja_environment,
)
def mocked_ometa_object():
"""Mocked function for `_create_ometa_connection`."""
class FQDN:
def __init__(self):
self.root = "database.schema.table"
class Entity:
def __init__(self, _type):
self.fullyQualifiedName = FQDN() # pylint: disable=invalid-name
self._type = _type
class ListEntities:
entities = [Entity("list_entities")]
class OmetaMock:
def get_by_name(self, *args, **kwargs):
return Entity("get_by_name")
def list_entities(self, *args, **kwargs):
return ListEntities()
return OmetaMock()
@fixture(scope="module")
def mocked_ometa():
"""Mocks OMeta obkect"""
from metadata.great_expectations.action import OpenMetadataValidationAction
with mock.patch.object(
OpenMetadataValidationAction,
"_create_ometa_connection",
side_effect=mocked_ometa_object,
) as mocked_obj:
yield mocked_obj
@fixture(scope="module")
def mocked_ge_data_context():
with mock.patch("great_expectations.DataContext") as mocked_data_context:
yield mocked_data_context
@fixture(scope="module")
def mocked_ge_column_result():
return {
"success": True,
"expectation_config": {
"kwargs": {
"column": "my_column",
"regex": "abc.*",
"value_set": [1, 2],
"min_value": 10,
"max_value": 20,
}
},
"result": {"unexpected_percent": 0.0},
}
@fixture(scope="module")
def mocked_ge_table_result():
return {
"success": True,
"expectation_config": {
"kwargs": {
"min_value": 10,
"max_value": 10,
"value": 10,
}
},
"result": {"observed_value": 10},
}
@fixture(scope="module")
def fixture_jinja_environment():
return create_jinja_environment(
os.path.join(os.path.dirname(os.path.abspath(__file__)), "resources")
)