OpenMetadata/ingestion/tests/integration/great_expectations/test_great_expectation_integration.py
Teddy 209793f315
MINOR - Add support for GX 1.4 (#20934)
* fix: add support for GX 0.18.22 and GX 1.4.x

* fix: add  support for GX 0.18.22 and GX 1.4.x

* style: ran python linting

* fix: skip test if GX version is not installed
2025-04-24 11:55:04 +02:00

247 lines
8.6 KiB
Python

# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Validate the Great Expectations integration with OpenMetadata
"""
import logging
import os
import subprocess
import sys
from datetime import datetime, timedelta
from unittest import TestCase
from sqlalchemy import Column, DateTime, Integer, String, create_engine
from sqlalchemy.orm import declarative_base
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
OpenMetadataJWTClientConfig,
)
from metadata.generated.schema.tests.testSuite import TestSuite
from metadata.ingestion.connections.session import create_and_bind_session
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.time_utils import (
get_beginning_of_day_timestamp_mill,
get_end_of_day_timestamp_mill,
)
from metadata.workflow.metadata import MetadataWorkflow
# Declarative base class that the ORM model(s) in this module inherit from.
Base = declarative_base()

# Fully qualified name of the test case whose results are looked up at the
# end of the integration test.
TEST_CASE_FQN = (
    "test_sqlite.default.main.users.name.expect_column_values_to_not_be_null"
)

# SQLite URI used both for the SQLAlchemy engine URL and as the
# ``databaseMode`` of the ingestion source. The query string asks for
# ``mode=memory`` and ``cache=shared``.
# NOTE(review): presumably this lets the test engine and the ingestion
# workflow see the same in-memory database -- confirm.
SQLLITE_SHARD = "file:cachedb?mode=memory&cache=shared&check_same_thread=False"

LOGGER = logging.getLogger(__name__)

# Connection settings for the OpenMetadata server the test talks to. It is
# reused for the ingestion workflow and to build the API client.
# NOTE(review): the JWT looks like a fixed token for a local development
# server on localhost:8585 -- confirm it is not a real credential.
WORKFLOW_CONFIG = {
    "openMetadataServerConfig": {
        "hostPort": "http://localhost:8585/api",
        "authProvider": "openmetadata",
        "securityConfig": {
            "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
        },
    }
}

# Configuration handed to ``MetadataWorkflow.create``: a SQLite source named
# ``test_sqlite`` (database ``default``), a ``metadata-rest`` sink, and the
# server connection settings from WORKFLOW_CONFIG.
INGESTION_CONFIG = {
    "source": {
        "type": "sqlite",
        "serviceName": "test_sqlite",
        "serviceConnection": {
            "config": {
                "type": "SQLite",
                "databaseMode": SQLLITE_SHARD,
                "database": "default",
            }
        },
        "sourceConfig": {"config": {"type": "DatabaseMetadata"}},
    },
    "sink": {"type": "metadata-rest", "config": {}},
    "workflowConfig": {
        **WORKFLOW_CONFIG,
    },
}
class User(Base):
    """ORM model for the ``users`` table used as the integration test fixture."""

    __tablename__ = "users"
    # Integer primary key.
    id = Column(Integer, primary_key=True)
    # Column targeted by TEST_CASE_FQN
    # (``...users.name.expect_column_values_to_not_be_null``).
    name = Column(String(256))
    fullname = Column(String(256))
    # Left as None for several of the fixture rows inserted in setUpClass.
    nickname = Column(String(256))
    age = Column(Integer)
    # Sign-up timestamp; the fixture rows use values relative to "now".
    signedup = Column(DateTime)
class TestGreatExpectationIntegration(TestCase):
    """Test great expectation integration

    Runs a Great Expectations 0.18.x checkpoint whose last action is the
    OpenMetadata validation action, then checks through the OpenMetadata API
    that the ``users`` table gained a test suite with one test case and that
    test case results exist for today.

    The class attributes below are evaluated when the class body executes,
    so the engine, session and API client exist before ``setUpClass`` runs.
    """

    engine = create_engine(
        f"sqlite+pysqlite:///{SQLLITE_SHARD}",
    )
    session = create_and_bind_session(engine)

    server_config = OpenMetadataConnection(
        hostPort=WORKFLOW_CONFIG["openMetadataServerConfig"]["hostPort"],
        authProvider=WORKFLOW_CONFIG["openMetadataServerConfig"]["authProvider"],
        securityConfig=OpenMetadataJWTClientConfig(
            jwtToken=WORKFLOW_CONFIG["openMetadataServerConfig"]["securityConfig"][
                "jwtToken"
            ]
        ),
    )  # type: ignore
    metadata = OpenMetadata(server_config)

    @classmethod
    def setUpClass(cls):
        """Create and populate the ``users`` table, then ingest its metadata."""
        try:
            User.__table__.create(bind=cls.engine)
        except Exception as exc:
            # Best effort: a failure here is treated as "table already
            # exists" and only logged, so the fixture rows are still inserted.
            LOGGER.warning("Table Already exists: %s", exc)

        data = [
            User(
                name="John",
                fullname="John Doe",
                nickname="johnny b goode",
                age=30,
                signedup=datetime.now() - timedelta(days=10),
            ),
            User(
                name="Jane",
                fullname="Jone Doe",
                nickname=None,
                age=31,
                signedup=datetime.now() - timedelta(days=2),
            ),
            User(
                name="Joh",
                fullname="Joh Doe",
                nickname=None,
                age=37,
                signedup=datetime.now() - timedelta(days=1),
            ),
            User(
                name="Jae",
                fullname="Jae Doe",
                nickname=None,
                age=38,
                signedup=datetime.now() - timedelta(days=1),
            ),
        ]
        cls.session.add_all(data)
        cls.session.commit()

        # Ingest the SQLite metadata so the ``users`` table entity exists in
        # OpenMetadata before the checkpoint runs.
        ingestion_workflow = MetadataWorkflow.create(INGESTION_CONFIG)
        ingestion_workflow.execute()
        ingestion_workflow.raise_from_status()
        ingestion_workflow.print_status()
        ingestion_workflow.stop()

    @classmethod
    def tearDownClass(cls) -> None:
        """
        Clean up

        Hard-deletes the ``test_sqlite`` database service (recursively), drops
        the ``users`` table and closes the session. The local cleanup runs
        even when the remote cleanup raises, so a server-side failure does
        not leave the table or session behind.
        """
        try:
            service = cls.metadata.get_by_name(
                entity=DatabaseService, fqn="test_sqlite"
            )
            if service is None:
                # Nothing to delete remotely; still run the local cleanup.
                LOGGER.warning(
                    "Database service 'test_sqlite' not found, skipping deletion"
                )
            else:
                cls.metadata.delete(
                    entity=DatabaseService,
                    entity_id=str(service.id.root),
                    recursive=True,
                    hard_delete=True,
                )
        finally:
            try:
                User.__table__.drop(bind=cls.engine)
            finally:
                cls.session.close()

    def test_great_expectation_integration(self):
        """
        Test great expectation integration

        Skipped when the imported Great Expectations version is not 0.18.x.
        """
        self.install_gx_018x()
        import great_expectations as gx

        if not gx.__version__.startswith("0.18."):
            # An already-imported great_expectations module stays cached in
            # the interpreter. If a 1.x test ran earlier in this process, the
            # import above still yields 1.x despite the pip install. The test
            # does run when executed on its own.
            self.skipTest(f"GX version is not 0.18.x: {gx.__version__}")

        table_entity = self.metadata.get_by_name(
            entity=Table,
            fqn="test_sqlite.default.main.users",
            fields=["testSuite"],
        )
        self.assertIsNotNone(table_entity, "users table was not ingested")
        self.assertFalse(
            table_entity.testSuite,
            "users table should not have a test suite before the checkpoint runs",
        )

        # The GX project lives next to this test file.
        ge_folder = os.path.dirname(os.path.abspath(__file__))
        ometa_config = os.path.join(ge_folder, "gx/ometa_config")
        context = gx.get_context(project_root_dir=ge_folder)
        checkpoint = context.get_checkpoint("sqlite")
        # Update the checkpoint's last action at runtime so the path to the
        # OpenMetadata config file is passed in dynamically.
        checkpoint.action_list[-1].update(
            {
                "name": "ometa_ingestion",
                "action": {
                    "module_name": "metadata.great_expectations.action",
                    "class_name": "OpenMetadataValidationAction",
                    "config_file_path": ometa_config,
                    "database_service_name": "test_sqlite",
                    "database_name": "default",
                    "schema_name": "main",
                },
            }
        )
        # run the checkpoint
        checkpoint.run()

        table_entity = self.metadata.get_by_name(
            entity=Table,
            fqn="test_sqlite.default.main.users",
            fields=["testSuite"],
        )
        self.assertIsNotNone(table_entity, "users table disappeared after the run")
        self.assertTrue(
            table_entity.testSuite,
            "checkpoint run should have attached a test suite to the users table",
        )

        test_suite: TestSuite = self.metadata.get_by_id(
            entity=TestSuite, entity_id=table_entity.testSuite.id, fields=["tests"]
        )
        self.assertEqual(len(test_suite.tests), 1)

        test_case_results = self.metadata.get_test_case_results(
            test_case_fqn=TEST_CASE_FQN,
            start_ts=get_beginning_of_day_timestamp_mill(),
            end_ts=get_end_of_day_timestamp_mill(),
        )
        self.assertTrue(
            test_case_results, f"no test case results found for {TEST_CASE_FQN}"
        )

    def install_gx_018x(self):
        """Install GX 0.18.x at runtime as we support 0.18.x and 1.x.x and setup will install 1 default version

        Raises:
            subprocess.CalledProcessError: if the pip command exits non-zero.
        """
        subprocess.check_call(
            [sys.executable, "-m", "pip", "install", "great-expectations~=0.18.0"]
        )