mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-08-23 08:28:10 +00:00

* fix: add support for GX 0.18.22 and GX 1.4.x * fix: add support for GX 0.18.22 and GX 1.4.x * style: ran python linting * fix: skip test if GX version is not installed
247 lines
8.6 KiB
Python
247 lines
8.6 KiB
Python
# Copyright 2025 Collate
|
|
# Licensed under the Collate Community License, Version 1.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""
|
|
Validate great expectation integration
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
from datetime import datetime, timedelta
|
|
from unittest import TestCase
|
|
|
|
from sqlalchemy import Column, DateTime, Integer, String, create_engine
|
|
from sqlalchemy.orm import declarative_base
|
|
|
|
from metadata.generated.schema.entity.data.table import Table
|
|
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
|
|
OpenMetadataConnection,
|
|
)
|
|
from metadata.generated.schema.entity.services.databaseService import DatabaseService
|
|
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
|
|
OpenMetadataJWTClientConfig,
|
|
)
|
|
from metadata.generated.schema.tests.testSuite import TestSuite
|
|
from metadata.ingestion.connections.session import create_and_bind_session
|
|
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
|
from metadata.utils.time_utils import (
|
|
get_beginning_of_day_timestamp_mill,
|
|
get_end_of_day_timestamp_mill,
|
|
)
|
|
from metadata.workflow.metadata import MetadataWorkflow
|
|
|
|
Base = declarative_base()
|
|
|
|
TEST_CASE_FQN = (
|
|
"test_sqlite.default.main.users.name.expect_column_values_to_not_be_null"
|
|
)
|
|
SQLLITE_SHARD = "file:cachedb?mode=memory&cache=shared&check_same_thread=False"
|
|
LOGGER = logging.getLogger(__name__)
|
|
|
|
WORKFLOW_CONFIG = {
|
|
"openMetadataServerConfig": {
|
|
"hostPort": "http://localhost:8585/api",
|
|
"authProvider": "openmetadata",
|
|
"securityConfig": {
|
|
"jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
|
|
},
|
|
}
|
|
}
|
|
INGESTION_CONFIG = {
|
|
"source": {
|
|
"type": "sqlite",
|
|
"serviceName": "test_sqlite",
|
|
"serviceConnection": {
|
|
"config": {
|
|
"type": "SQLite",
|
|
"databaseMode": SQLLITE_SHARD,
|
|
"database": "default",
|
|
}
|
|
},
|
|
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
|
|
},
|
|
"sink": {"type": "metadata-rest", "config": {}},
|
|
"workflowConfig": {
|
|
**WORKFLOW_CONFIG,
|
|
},
|
|
}
|
|
|
|
|
|
class User(Base):
|
|
__tablename__ = "users"
|
|
id = Column(Integer, primary_key=True)
|
|
name = Column(String(256))
|
|
fullname = Column(String(256))
|
|
nickname = Column(String(256))
|
|
age = Column(Integer)
|
|
signedup = Column(DateTime)
|
|
|
|
|
|
class TestGreatExpectationIntegration(TestCase):
|
|
"""Test great expectation integration"""
|
|
|
|
engine = create_engine(
|
|
f"sqlite+pysqlite:///{SQLLITE_SHARD}",
|
|
)
|
|
session = create_and_bind_session(engine)
|
|
server_config = OpenMetadataConnection(
|
|
hostPort=WORKFLOW_CONFIG["openMetadataServerConfig"]["hostPort"],
|
|
authProvider=WORKFLOW_CONFIG["openMetadataServerConfig"]["authProvider"],
|
|
securityConfig=OpenMetadataJWTClientConfig(
|
|
jwtToken=WORKFLOW_CONFIG["openMetadataServerConfig"]["securityConfig"][
|
|
"jwtToken"
|
|
]
|
|
),
|
|
) # type: ignore
|
|
metadata = OpenMetadata(server_config)
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
"""Set up class by ingesting metadata"""
|
|
try:
|
|
User.__table__.create(bind=cls.engine)
|
|
except Exception as exc:
|
|
LOGGER.warning(f"Table Already exists: {exc}")
|
|
|
|
data = [
|
|
User(
|
|
name="John",
|
|
fullname="John Doe",
|
|
nickname="johnny b goode",
|
|
age=30,
|
|
signedup=datetime.now() - timedelta(days=10),
|
|
),
|
|
User(
|
|
name="Jane",
|
|
fullname="Jone Doe",
|
|
nickname=None,
|
|
age=31,
|
|
signedup=datetime.now() - timedelta(days=2),
|
|
),
|
|
User(
|
|
name="Joh",
|
|
fullname="Joh Doe",
|
|
nickname=None,
|
|
age=37,
|
|
signedup=datetime.now() - timedelta(days=1),
|
|
),
|
|
User(
|
|
name="Jae",
|
|
fullname="Jae Doe",
|
|
nickname=None,
|
|
age=38,
|
|
signedup=datetime.now() - timedelta(days=1),
|
|
),
|
|
]
|
|
cls.session.add_all(data)
|
|
cls.session.commit()
|
|
|
|
ingestion_workflow = MetadataWorkflow.create(INGESTION_CONFIG)
|
|
ingestion_workflow.execute()
|
|
ingestion_workflow.raise_from_status()
|
|
ingestion_workflow.print_status()
|
|
ingestion_workflow.stop()
|
|
|
|
@classmethod
|
|
def tearDownClass(cls) -> None:
|
|
"""
|
|
Clean up
|
|
"""
|
|
|
|
service_id = str(
|
|
cls.metadata.get_by_name(entity=DatabaseService, fqn="test_sqlite").id.root
|
|
)
|
|
|
|
cls.metadata.delete(
|
|
entity=DatabaseService,
|
|
entity_id=service_id,
|
|
recursive=True,
|
|
hard_delete=True,
|
|
)
|
|
|
|
User.__table__.drop(bind=cls.engine)
|
|
cls.session.close()
|
|
|
|
def test_great_expectation_integration(self):
|
|
"""
|
|
Test great expectation integration
|
|
"""
|
|
self.install_gx_018x()
|
|
import great_expectations as gx
|
|
|
|
try:
|
|
self.assertTrue(gx.__version__.startswith("0.18."))
|
|
except AssertionError as exc:
|
|
# module versions are cached, so we need to skip the test if the version is not 0.18.x
|
|
# e.g. we run the 1.x.x test before this one, 0.18.x version will be cached and used here
|
|
# The test will run if we run this test alone without the 1.x.x test
|
|
self.skipTest(f"GX version is not 0.18.x: {exc}")
|
|
|
|
table_entity = self.metadata.get_by_name(
|
|
entity=Table,
|
|
fqn="test_sqlite.default.main.users",
|
|
fields=["testSuite"],
|
|
)
|
|
|
|
assert not table_entity.testSuite
|
|
|
|
# GE config file
|
|
ge_folder = os.path.join(
|
|
os.path.dirname(os.path.abspath(__file__)),
|
|
)
|
|
ometa_config = os.path.join(ge_folder, "gx/ometa_config")
|
|
context = gx.get_context(project_root_dir=ge_folder)
|
|
checkpoint = context.get_checkpoint("sqlite")
|
|
# update our checkpoint file at runtime to dynamically pass the ometa config file
|
|
checkpoint.action_list[-1].update(
|
|
{
|
|
"name": "ometa_ingestion",
|
|
"action": {
|
|
"module_name": "metadata.great_expectations.action",
|
|
"class_name": "OpenMetadataValidationAction",
|
|
"config_file_path": ometa_config,
|
|
"database_service_name": "test_sqlite",
|
|
"database_name": "default",
|
|
"schema_name": "main",
|
|
},
|
|
}
|
|
)
|
|
# run the checkpoint
|
|
checkpoint.run()
|
|
|
|
table_entity = self.metadata.get_by_name(
|
|
entity=Table,
|
|
fqn="test_sqlite.default.main.users",
|
|
fields=["testSuite"],
|
|
)
|
|
|
|
assert table_entity.testSuite
|
|
test_suite: TestSuite = self.metadata.get_by_id(
|
|
entity=TestSuite, entity_id=table_entity.testSuite.id, fields=["tests"]
|
|
)
|
|
assert len(test_suite.tests) == 1
|
|
|
|
test_case_results = self.metadata.get_test_case_results(
|
|
test_case_fqn=TEST_CASE_FQN,
|
|
start_ts=get_beginning_of_day_timestamp_mill(),
|
|
end_ts=get_end_of_day_timestamp_mill(),
|
|
)
|
|
|
|
assert test_case_results
|
|
|
|
def install_gx_018x(self):
|
|
"""Install GX 0.18.x at runtime as we support 0.18.x and 1.x.x and setup will install 1 default version"""
|
|
subprocess.check_call(
|
|
[sys.executable, "-m", "pip", "install", "great-expectations~=0.18.0"]
|
|
)
|