MINOR - Add support for GX 1.4 (#20934)

* fix: add support for GX 0.18.22 and GX 1.4.x

* fix: add  support for GX 0.18.22 and GX 1.4.x

* style: ran python linting

* fix: skip test if GX version is not installed
This commit is contained in:
Teddy 2025-04-24 11:55:04 +02:00 committed by GitHub
parent 17dd182cbb
commit 209793f315
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 1064 additions and 13 deletions

3
.gitignore vendored
View File

@ -123,9 +123,6 @@ scan-requirements.txt
# CLI e2e tests
ingestion/tests/cli_e2e/**/*test.yaml
# GX artifacts
/ingestion/tests/integration/great_expectations/gx/*
# Tests
**/metastore_db/

View File

@ -27,7 +27,8 @@ VERSIONS = {
"google-cloud-monitoring": "google-cloud-monitoring>=2.0.0",
"google-cloud-storage": "google-cloud-storage>=1.43.0",
"gcsfs": "gcsfs>=2023.1.0",
"great-expectations": "great-expectations>=0.18.0,<0.18.14",
"great-expectations": "great-expectations~=0.18.0",
"great-expectations-1xx": "great-expectations~=1.0",
"grpc-tools": "grpcio-tools>=1.47.2",
"msal": "msal~=1.2",
"neo4j": "neo4j~=5.3",
@ -250,6 +251,7 @@ plugins: Dict[str, Set[str]] = {
"exasol": {"sqlalchemy_exasol>=5,<6"},
"glue": {VERSIONS["boto3"]},
"great-expectations": {VERSIONS["great-expectations"]},
"great-expectations-1xx": {VERSIONS["great-expectations-1xx"]},
"greenplum": {*COMMONS["postgres"]},
"cockroach": {
VERSIONS["cockroach"],

View File

@ -96,6 +96,7 @@ class OpenMetadataValidationAction(ValidationAction):
def __init__(
self,
data_context: DataContext, # type: ignore
name: str = "OpenMetadataValidationAction",
*,
config_file_path: Optional[str] = None,
database_service_name: Optional[str] = None,
@ -103,7 +104,7 @@ class OpenMetadataValidationAction(ValidationAction):
database_name: Optional[str] = None,
table_name: Optional[str] = None,
):
super().__init__(data_context)
super().__init__(data_context, name=name)
self.database_service_name = database_service_name
self.database_name = database_name
self.table_name = table_name

View File

@ -0,0 +1,400 @@
# Copyright 2022 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Great Expectations subpackage to send expectation results to
Open Metadata table quality.
This subpackage needs to be used in Great Expectations
checkpoints actions.
"""
import logging
import traceback
from datetime import datetime
from typing import Dict, List, Literal, Optional, Union, cast
from great_expectations.checkpoint import (
ActionContext,
CheckpointResult,
ValidationAction,
)
from great_expectations.core.batch import Batch
from great_expectations.core.expectation_validation_result import (
ExpectationSuiteValidationResultMeta,
)
from great_expectations.datasource.fluent import DataAsset
from great_expectations.validator.validator import Validator
from sqlalchemy.engine.base import Connection, Engine
from sqlalchemy.engine.url import URL
from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.tests.basic import (
TestCaseResult,
TestCaseStatus,
TestResultValue,
)
from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue
from metadata.generated.schema.tests.testDefinition import (
EntityType,
TestCaseParameterDefinition,
TestPlatform,
)
from metadata.generated.schema.tests.testSuite import TestSuite
from metadata.generated.schema.type.basic import Timestamp
from metadata.great_expectations.utils.ometa_config_handler import (
create_jinja_environment,
create_ometa_connection_obj,
render_template,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
from metadata.utils.entity_link import get_entity_link
logger = logging.getLogger(
"great_expectations.validation_operators.validation_operators.openmetadata"
)
class OpenMetadataValidationAction1xx(ValidationAction):
    """OpenMetadata validation action for GX 1.x.x. It inherits from
    the Great Expectations validation action class and implements the
    `run` method.

    Attributes:
        config_file_path: path to the OpenMetadata config file
        database_service_name: name of the OpenMetadata service for the table
        database_name: name of the database the tested table belongs to
        schema_name: schema of the tested table (defaults to "default")
        table_name: name of the tested table
        ometa_conn: OpenMetadata API client, created lazily in `run`
    """

    # GX 1.x ValidationAction is a pydantic model: these are declared as
    # model fields and populated from the checkpoint action configuration.
    type: Literal["open_metadata_validation_action"] = "open_metadata_validation_action"
    name: str = "OpenMetadataValidationAction"
    config_file_path: Optional[str] = None
    database_service_name: Optional[str] = None
    schema_name: Optional[str] = "default"
    database_name: str
    table_name: Optional[str] = None
    # Using Optional to make this field not part of the serialized model
    # This will be initialized in the run method
    ometa_conn: Optional[OpenMetadata] = None

    def run(  # pylint: disable=unused-argument
        self,
        checkpoint_result: CheckpointResult,
        action_context: Union[ActionContext, None],
    ):
        """Main entrypoint invoked by the GX checkpoint after a run.

        Iterates over every validation result in the checkpoint, resolves
        the matching OpenMetadata table entity (from the batch spec when
        available, otherwise from this action's configuration), ensures a
        test suite exists, and ingests each expectation result.

        Args:
            checkpoint_result: aggregated results of the checkpoint run
            action_context: GX action context (unused)
        """
        self.ometa_conn = self._create_ometa_connection()
        for _, v in checkpoint_result.run_results.items():
            meta = v.meta
            if meta:
                # Prefer schema/table names reported by the batch spec;
                # fall back to the values configured on the action.
                check_point_spec = self._get_checkpoint_batch_spec(meta)
                table_entity = self._get_table_entity(
                    self.database_name,
                    check_point_spec.get("schema_name", self.schema_name),
                    check_point_spec.get("table_name"),
                )
            else:
                table_entity = self._get_table_entity(
                    self.database_name,
                    self.schema_name,
                    self.table_name,
                )
            if table_entity:
                test_suite = self._check_or_create_test_suite(table_entity)
                for result in v.results:
                    self._handle_test_case(result, table_entity, test_suite)

    @staticmethod
    def _get_checkpoint_batch_spec(
        meta: Union[ExpectationSuiteValidationResultMeta, dict]
    ):
        """Return the batch spec from the validation result metadata.

        Args:
            meta: metadata attached to the validation result

        Returns:
            The "batch_spec" entry of the metadata (None when absent)
        """
        return meta.get("batch_spec")

    def _get_table_entity(
        self,
        database: Optional[str],
        schema_name: Optional[str],
        table_name: Optional[str],
    ) -> Optional[Table]:
        """Return the table entity for the test. If service name is defined
        in GE checkpoint entity will be fetch using the FQN. If not provided
        iterative search will be perform among all the entities. If 2 entities
        are found with the same `database`.`schema`.`table` the method will
        raise an error.

        Args:
            database: database name
            schema_name: schema name
            table_name: table name

        Return:
            Optional[Table]

        Raises:
            ValueError: if schema/table are missing, or if 2 entities with
                the same `database`.`schema`.`table` are found
        """
        if not all([schema_name, table_name]):
            raise ValueError(
                "No Schema or Table name provided. Can't fetch table entity from OpenMetadata."
            )
        if self.database_service_name:
            # Direct FQN lookup when the service name is known.
            return self.ometa_conn.get_by_name(
                entity=Table,
                fqn=f"{self.database_service_name}.{database}.{schema_name}.{table_name}",
                fields=["testSuite"],
            )
        # No service name: scan every table entity for a suffix match.
        table_entity = [
            entity
            for entity in self.ometa_conn.list_entities(
                entity=Table, fields=["testSuite"]
            ).entities
            if f"{database}.{schema_name}.{table_name}"
            in entity.fullyQualifiedName.root
        ]
        if len(table_entity) > 1:
            # NOTE(review): message has typos ("you" -> "your",
            # "an `database_service_name`" -> "a") — fix in a follow-up.
            raise ValueError(
                f"Non unique `database`.`schema`.`table` found: {table_entity}."
                "Please specify an `database_service_name` in you checkpoint.yml file.",
            )
        if table_entity:
            return table_entity[0]
        logger.warning(
            "No entity found for %s.%s.%s", database, schema_name, table_name
        )
        return None

    def _check_or_create_test_suite(self, table_entity: Table) -> TestSuite:
        """Check if test suite already exists for a given table entity. If not
        create a new one.

        Args:
            table_entity: table entity object

        Returns:
            TestSuite
        """
        if table_entity.testSuite:
            test_suite = self.ometa_conn.get_by_name(
                TestSuite, table_entity.testSuite.fullyQualifiedName
            )
            test_suite = cast(TestSuite, test_suite)
            return test_suite
        create_test_suite = CreateTestSuiteRequest(
            name=f"{table_entity.fullyQualifiedName.root}.TestSuite",
            basicEntityReference=table_entity.fullyQualifiedName.root,
        )  # type: ignore
        test_suite = self.ometa_conn.create_or_update_executable_test_suite(
            create_test_suite
        )
        return test_suite

    @staticmethod
    def _get_execution_engine_url(
        data_asset: Union[Validator, DataAsset, Batch]
    ) -> URL:
        """Get execution engine used to run the expectation

        Args:
            data_asset: data assets of the checkpoint run

        Returns:
            URL

        Raises:
            ValueError: if expectation is not ran against DB
        """
        if isinstance(data_asset.execution_engine.engine, Engine):
            return data_asset.execution_engine.engine.url
        if isinstance(data_asset.execution_engine.engine, Connection):
            return data_asset.execution_engine.engine.engine.url
        # NOTE(review): message has typos ("Make sur you ran" ->
        # "Make sure you run") — fix in a follow-up.
        raise ValueError(
            "Type is not supported. Make sur you ran your"
            " expectations against a relational database"
        )

    def _create_ometa_connection(self) -> OpenMetadata:
        """Create OpenMetadata API connection"""
        # Render the (jinja-templated) config file, then build the client.
        environment = create_jinja_environment(self.config_file_path)
        rendered_config = render_template(environment)
        return OpenMetadata(create_ometa_connection_obj(rendered_config))

    def _build_test_case_fqn(self, table_fqn: str, result: Dict) -> str:
        """build test case fqn from table entity and GE test results

        Args:
            table_fqn (str): table fully qualified name
            result (Dict): result from great expectation tests
        """
        # table_fqn is expected to be `service.database.schema.table`.
        split_table_fqn = table_fqn.split(".")
        fqn_ = fqn.build(
            self.ometa_conn,
            entity_type=TestCase,
            service_name=split_table_fqn[0],
            database_name=split_table_fqn[1],
            schema_name=split_table_fqn[2],
            table_name=split_table_fqn[3],
            column_name=result["expectation_config"]["kwargs"].get("column"),
            test_case_name=result["expectation_config"]["type"],
        )
        fqn_ = cast(str, fqn_)
        return fqn_

    def _get_test_case_params_value(self, result: dict) -> List[TestCaseParameterValue]:
        """Build test case parameter value from GE test result"""
        if "observed_value" not in result["result"]:
            # No observed value: GX reported unexpected percentages instead.
            return [
                TestCaseParameterValue(
                    name="unexpected_percentage_total",
                    value=str(0.0),
                )
            ]
        # Every expectation kwarg except column/batch_id is a test parameter.
        return [
            TestCaseParameterValue(
                name=key,
                value=str(value),
            )
            for key, value in result["expectation_config"]["kwargs"].items()
            if key not in {"column", "batch_id"}
        ]

    def _get_test_case_params_definition(
        self, result: dict
    ) -> List[TestCaseParameterDefinition]:
        """Build test case parameter definition from GE test result"""
        if "observed_value" not in result["result"]:
            return [
                TestCaseParameterDefinition(
                    name="unexpected_percentage_total",
                )  # type: ignore
            ]
        return [
            TestCaseParameterDefinition(
                name=key,
            )  # type: ignore
            for key, _ in result["expectation_config"]["kwargs"].items()
            if key not in {"column", "batch_id"}
        ]

    def _get_test_result_value(self, result: dict) -> List[TestResultValue]:
        """Get test result value from GE test result

        Args:
            result (dict): result

        Returns:
            List[TestResultValue]: test result values for the test case
        """
        try:
            test_result_value = TestResultValue(
                name="observed_value",
                value=str(result["result"]["observed_value"]),
            )
        except KeyError:
            # Fall back to the unexpected-percentage metric when GX did not
            # report an observed value for this expectation type.
            unexpected_percent_total = result["result"].get("unexpected_percent_total")
            test_result_value = TestResultValue(
                name="unexpected_percentage_total",
                value=str(unexpected_percent_total),
            )
        return [test_result_value]

    def _handle_test_case(
        self, result: Dict, table_entity: Table, test_suite: TestSuite
    ):
        """Handle adding test to table entity based on the test case.
        Test Definitions will be created on the fly from the results of the
        great expectations run. We will then write the test case results to the
        specific test case.

        Args:
            result: GE test result
            table_entity: table entity object
            test_suite: test suite object
        """
        try:
            test_definition = self.ometa_conn.get_or_create_test_definition(
                test_definition_fqn=result["expectation_config"]["type"],
                test_definition_description=result["expectation_config"][
                    "type"
                ].replace("_", " "),
                entity_type=EntityType.COLUMN
                if "column" in result["expectation_config"]["kwargs"]
                else EntityType.TABLE,
                test_platforms=[TestPlatform.GreatExpectations],
                test_case_parameter_definition=self._get_test_case_params_definition(
                    result
                ),
            )
            test_case_fqn = self._build_test_case_fqn(
                table_entity.fullyQualifiedName.root,
                result,
            )
            test_case = self.ometa_conn.get_or_create_test_case(
                test_case_fqn,
                entity_link=get_entity_link(
                    Table,
                    fqn=table_entity.fullyQualifiedName.root,
                    column_name=fqn.split_test_case_fqn(test_case_fqn).column,
                ),
                test_suite_fqn=test_suite.fullyQualifiedName.root,
                test_definition_fqn=test_definition.fullyQualifiedName.root,
                test_case_parameter_values=self._get_test_case_params_value(result),
            )
            self.ometa_conn.add_test_case_results(
                test_results=TestCaseResult(
                    timestamp=Timestamp(int(datetime.now().timestamp() * 1000)),
                    testCaseStatus=TestCaseStatus.Success
                    if result["success"]
                    else TestCaseStatus.Failed,
                    testResultValue=self._get_test_result_value(result),
                ),  # type: ignore
                test_case_fqn=test_case.fullyQualifiedName.root,
            )
            logger.debug(
                f"Test case result for {test_case.fullyQualifiedName.root} successfully ingested"
            )
        except Exception as exc:
            # Best-effort ingestion: never fail the GX checkpoint because a
            # single result could not be pushed to OpenMetadata.
            logger.debug(traceback.format_exc())
            logger.warning(exc)

View File

@ -1,2 +0,0 @@
uncommitted/
expectations/.ge_store_backend_id

View File

@ -0,0 +1,2 @@
uncommitted/

View File

@ -0,0 +1 @@
store_backend_id = e3a2fbc4-3b99-4bfe-ab2e-c81eaeec3e2a

View File

@ -15,10 +15,11 @@ Validate great expectation integration
import logging
import os
import subprocess
import sys
from datetime import datetime, timedelta
from unittest import TestCase
from great_expectations import DataContext
from sqlalchemy import Column, DateTime, Integer, String, create_engine
from sqlalchemy.orm import declarative_base
@ -175,6 +176,17 @@ class TestGreatExpectationIntegration(TestCase):
"""
Test great expectation integration
"""
self.install_gx_018x()
import great_expectations as gx
try:
self.assertTrue(gx.__version__.startswith("0.18."))
except AssertionError as exc:
# module versions are cached, so we need to skip the test if the version is not 0.18.x
# e.g. we run the 1.x.x test before this one, 0.18.x version will be cached and used here
# The test will run if we run this test alone without the 1.x.x test
self.skipTest(f"GX version is not 0.18.x: {exc}")
table_entity = self.metadata.get_by_name(
entity=Table,
fqn="test_sqlite.default.main.users",
@ -186,10 +198,9 @@ class TestGreatExpectationIntegration(TestCase):
# GE config file
ge_folder = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"great_expectations",
)
ometa_config = os.path.join(ge_folder, "ometa_config")
context = DataContext(ge_folder)
ometa_config = os.path.join(ge_folder, "gx/ometa_config")
context = gx.get_context(project_root_dir=ge_folder)
checkpoint = context.get_checkpoint("sqlite")
# update our checkpoint file at runtime to dynamically pass the ometa config file
checkpoint.action_list[-1].update(
@ -227,3 +238,9 @@ class TestGreatExpectationIntegration(TestCase):
)
assert test_case_results
def install_gx_018x(self):
"""Install GX 0.18.x at runtime as we support 0.18.x and 1.x.x and setup will install 1 default version"""
subprocess.check_call(
[sys.executable, "-m", "pip", "install", "great-expectations~=0.18.0"]
)

View File

@ -0,0 +1,282 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Validate great expectation integration
"""
import logging
import os
import subprocess
import sys
from datetime import datetime, timedelta
from unittest import TestCase
from sqlalchemy import Column, DateTime, Integer, String, create_engine
from sqlalchemy.orm import declarative_base
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
OpenMetadataJWTClientConfig,
)
from metadata.generated.schema.tests.testSuite import TestSuite
from metadata.ingestion.connections.session import create_and_bind_session
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.time_utils import (
get_beginning_of_day_timestamp_mill,
get_end_of_day_timestamp_mill,
)
from metadata.workflow.metadata import MetadataWorkflow
# SQLAlchemy declarative base for the test table model below.
Base = declarative_base()

# FQN of the test case created by the checkpoint run under validation.
TEST_CASE_FQN = (
    "test_sqlite.default.main.users.name.expect_column_values_to_not_be_null"
)
# Shared in-memory SQLite database so the ingestion workflow and the GX
# checkpoint see the same data within this process.
SQLLITE_SHARD = "file:cachedb?mode=memory&cache=shared&check_same_thread=False"
LOGGER = logging.getLogger(__name__)

# OpenMetadata server connection used by both the ingestion workflow and
# the API client (local test server with a static test JWT).
WORKFLOW_CONFIG = {
    "openMetadataServerConfig": {
        "hostPort": "http://localhost:8585/api",
        "authProvider": "openmetadata",
        "securityConfig": {
            "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
        },
    }
}
# Metadata ingestion workflow config: ingests the in-memory SQLite database
# as the "test_sqlite" service.
INGESTION_CONFIG = {
    "source": {
        "type": "sqlite",
        "serviceName": "test_sqlite",
        "serviceConnection": {
            "config": {
                "type": "SQLite",
                "databaseMode": SQLLITE_SHARD,
                "database": "default",
            }
        },
        "sourceConfig": {"config": {"type": "DatabaseMetadata"}},
    },
    "sink": {"type": "metadata-rest", "config": {}},
    "workflowConfig": {
        **WORKFLOW_CONFIG,
    },
}
class User(Base):
    """SQLAlchemy model for the `users` table the GX expectations run against."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    name = Column(String(256))  # column targeted by the not-null expectation
    fullname = Column(String(256))
    nickname = Column(String(256))
    age = Column(Integer)
    signedup = Column(DateTime)
class TestGreatExpectationIntegration1xx(TestCase):
    """Integration test for the GX 1.x.x OpenMetadata validation action.

    Requires a running OpenMetadata server on localhost:8585. Seeds a shared
    in-memory SQLite database, ingests it into OpenMetadata, runs a GX 1.x
    checkpoint with the OpenMetadataValidationAction1xx action, and asserts
    that the test suite and test case results were created.
    """

    skip_test = True
    engine = create_engine(
        f"sqlite+pysqlite:///{SQLLITE_SHARD}",
    )
    session = create_and_bind_session(engine)
    server_config = OpenMetadataConnection(
        hostPort=WORKFLOW_CONFIG["openMetadataServerConfig"]["hostPort"],
        authProvider=WORKFLOW_CONFIG["openMetadataServerConfig"]["authProvider"],
        securityConfig=OpenMetadataJWTClientConfig(
            jwtToken=WORKFLOW_CONFIG["openMetadataServerConfig"]["securityConfig"][
                "jwtToken"
            ]
        ),
    )  # type: ignore
    metadata = OpenMetadata(server_config)

    @classmethod
    def setUpClass(cls):
        """Set up class by ingesting metadata"""
        try:
            User.__table__.create(bind=cls.engine)
        except Exception as exc:
            # The shared in-memory DB may persist across tests in the same
            # process, so the table can already exist.
            LOGGER.warning(f"Table Already exists: {exc}")
        data = [
            User(
                name="John",
                fullname="John Doe",
                nickname="johnny b goode",
                age=30,
                signedup=datetime.now() - timedelta(days=10),
            ),
            User(
                name="Jane",
                fullname="Jone Doe",
                nickname=None,
                age=31,
                signedup=datetime.now() - timedelta(days=2),
            ),
            User(
                name="Joh",
                fullname="Joh Doe",
                nickname=None,
                age=37,
                signedup=datetime.now() - timedelta(days=1),
            ),
            User(
                name="Jae",
                fullname="Jae Doe",
                nickname=None,
                age=38,
                signedup=datetime.now() - timedelta(days=1),
            ),
        ]
        cls.session.add_all(data)
        cls.session.commit()

        # Ingest the SQLite database into OpenMetadata so the checkpoint's
        # validation action can find the table entity.
        ingestion_workflow = MetadataWorkflow.create(INGESTION_CONFIG)
        ingestion_workflow.execute()
        ingestion_workflow.raise_from_status()
        ingestion_workflow.print_status()
        ingestion_workflow.stop()

    @classmethod
    def tearDownClass(cls) -> None:
        """
        Clean up
        """
        service_id = str(
            cls.metadata.get_by_name(entity=DatabaseService, fqn="test_sqlite").id.root
        )
        # Hard-delete the whole service recursively so reruns start clean.
        cls.metadata.delete(
            entity=DatabaseService,
            entity_id=service_id,
            recursive=True,
            hard_delete=True,
        )
        User.__table__.drop(bind=cls.engine)
        cls.session.close()

    def test_great_expectation_integration(self):
        """
        Test great expectation integration
        """
        self.install_gx_1xx()
        # Import only after the runtime install so we pick up the 1.x package.
        import great_expectations as gx

        try:
            self.assertTrue(gx.__version__.startswith("1."))
        except AssertionError as exc:
            # module versions are cached, so we need to skip the test if the version is not 1.x.x
            # e.g. we run the 0.18.x test before this one, 0.18 version will be cached and used here
            # The test will run if we run this test alone without the 0.18.x test
            self.skipTest(f"GX version is not 1.x.x: {exc}")
        # Deferred import: action1xx only imports cleanly against GX 1.x.
        from metadata.great_expectations.action1xx import (
            OpenMetadataValidationAction1xx,
        )

        table_entity = self.metadata.get_by_name(
            entity=Table,
            fqn="test_sqlite.default.main.users",
            fields=["testSuite"],
        )
        assert not table_entity.testSuite

        # GE config file
        ge_folder = os.path.join(
            os.path.dirname(os.path.abspath(__file__)),
        )
        ometa_config = os.path.join(ge_folder, "gx/ometa_config")
        context = gx.get_context()
        # NOTE(review): f-prefix below is unnecessary (no placeholders).
        conn_string = f"sqlite+pysqlite:///file:cachedb?mode=memory&cache=shared&check_same_thread=False"
        data_source = context.data_sources.add_sqlite(
            name="test_sqlite",
            connection_string=conn_string,
        )
        data_asset = data_source.add_table_asset(
            name="users", table_name="users", schema_name="main"
        )
        batch_definition = data_asset.add_batch_definition_whole_table(
            "batch definition"
        )
        batch = batch_definition.get_batch()
        suite = context.suites.add(
            gx.core.expectation_suite.ExpectationSuite(name="name")
        )
        suite.add_expectation(
            gx.expectations.ExpectColumnValuesToNotBeNull(column="name")
        )
        validation_definition = context.validation_definitions.add(
            gx.core.validation_definition.ValidationDefinition(
                name="validation definition",
                data=batch_definition,
                suite=suite,
            )
        )
        # The action under test: pushes GX results into OpenMetadata.
        action_list = [
            OpenMetadataValidationAction1xx(
                database_service_name="test_sqlite",
                database_name="default",
                table_name="users",
                schema_name="main",
                config_file_path=ometa_config,
            )
        ]
        checkpoint = context.checkpoints.add(
            gx.checkpoint.checkpoint.Checkpoint(
                name="checkpoint",
                validation_definitions=[validation_definition],
                actions=action_list,
            )
        )
        checkpoint.run()

        # The action should have created a test suite with one test case,
        # plus at least one test case result for today's run.
        table_entity = self.metadata.get_by_name(
            entity=Table,
            fqn="test_sqlite.default.main.users",
            fields=["testSuite"],
        )
        assert table_entity.testSuite
        test_suite: TestSuite = self.metadata.get_by_id(
            entity=TestSuite, entity_id=table_entity.testSuite.id, fields=["tests"]
        )
        assert len(test_suite.tests) == 1
        test_case_results = self.metadata.get_test_case_results(
            test_case_fqn=TEST_CASE_FQN,
            start_ts=get_beginning_of_day_timestamp_mill(),
            end_ts=get_end_of_day_timestamp_mill(),
        )
        assert test_case_results

    def install_gx_1xx(self):
        """Install GX 1.x.x at runtime as we support 0.18.x and 1.x.x and setup will install 1 default version"""
        subprocess.check_call(
            [sys.executable, "-m", "pip", "install", "great-expectations~=1.0"]
        )

View File

@ -17,7 +17,6 @@ from unittest import mock
from pytest import fixture
from metadata.great_expectations.action import OpenMetadataValidationAction
from metadata.great_expectations.utils.ometa_config_handler import (
create_jinja_environment,
)
@ -51,6 +50,8 @@ def mocked_ometa_object():
@fixture(scope="module")
def mocked_ometa():
"""Mocks OMeta obkect"""
from metadata.great_expectations.action import OpenMetadataValidationAction
with mock.patch.object(
OpenMetadataValidationAction,
"_create_ometa_connection",

View File

@ -13,15 +13,29 @@ Test suite for the action module implementation
"""
import os
import subprocess
import sys
from unittest import mock
import great_expectations as gx
from jinja2 import Environment
from pytest import mark
from metadata.great_expectations.action import OpenMetadataValidationAction
from metadata.great_expectations.utils.ometa_config_handler import render_template
def install_gx_018x():
    """Ensure GX 0.18.x is installed at runtime.

    Both 0.18.x and 1.x.x are supported, but the package setup installs a
    single default version, so these unit tests pin 0.18.x explicitly.
    """
    if gx.__version__.startswith("0.18."):
        # Already on a 0.18.x release -- nothing to do.
        return
    subprocess.check_call(
        [sys.executable, "-m", "pip", "install", "great-expectations~=0.18.0"]
    )


install_gx_018x()
@mark.parametrize(
"input,expected",
[
@ -31,6 +45,8 @@ from metadata.great_expectations.utils.ometa_config_handler import render_templa
)
def test_get_table_entity(input, expected, mocked_ometa, mocked_ge_data_context):
"""Test get table entity"""
from metadata.great_expectations.action import OpenMetadataValidationAction
ometa_validation = OpenMetadataValidationAction(
data_context=mocked_ge_data_context,
config_file_path="my/config/path",
@ -52,6 +68,8 @@ def test_get_table_entity_database_service_name(
input, expected, mocked_ometa, mocked_ge_data_context
):
"""Test get table entity"""
from metadata.great_expectations.action import OpenMetadataValidationAction
ometa_validation = OpenMetadataValidationAction(
data_context=mocked_ge_data_context,
config_file_path="my/config/path",

View File

@ -116,6 +116,73 @@ src="/images/v1.6/features/integrations/ge-run-checkpoint.gif"
alt="Run Great Expectations checkpoint"
/%}
### Running GX using the Python SDK?
If you are running GX using their Python SDK below is a full example of how to add the action to your code
```python
import great_expectations as gx
from great_expectations.checkpoint import Checkpoint
context = gx.get_context()
conn_string = f"postgresql+psycopg2://user:pw@host:port/db"
data_source = context.sources.add_postgres(
name="source_name",
connection_string=conn_string,
)
data_asset = data_source.add_table_asset(
name="name",
table_name="table_name",
schema_name="schema_name",
)
batch_request = data_source.get_asset("name").build_batch_request()
expectation_suite_name = "expectation_suite_name"
context.add_or_update_expectation_suite(expectation_suite_name=expectation_suite_name)
validator = context.get_validator(
batch_request=batch_request,
expectation_suite_name=expectation_suite_name,
)
validator.expect_column_values_to_not_be_null(column="column_name")
validator.expect_column_values_to_be_between(
column="column_name", min_value=min_value, max_value=max_value
)
validator.save_expectation_suite(discard_failed_expectations=False)
my_checkpoint_name = "my_sql_checkpoint"
checkpoint = Checkpoint(
name=my_checkpoint_name,
run_name_template="%Y%m%d-%H%M%S-my-run-name-template",
data_context=context,
batch_request=batch_request,
expectation_suite_name=expectation_suite_name,
action_list=[
{
"name": "store results in OpenMetadata",
"action": {
"module_name": "metadata.great_expectations.action",
"class_name": "OpenMetadataValidationAction",
"config_file_path": "path/to/config/file",
"database_service_name": "openmetadata_service_name",
"database_name": "database_name",
"table_name": "table_name",
"schema_name": "schema_name",
}
},
],
)
context.add_or_update_checkpoint(checkpoint=checkpoint)
checkpoint_result = checkpoint.run()
```
### List of Supported Great Expectations Tests
We currently only support a certain number of Great Expectations tests. The full list can be found in the [Tests](/how-to-guides/data-quality-observability/quality/tests-yaml) section.

View File

@ -116,6 +116,138 @@ src="/images/v1.7/features/integrations/ge-run-checkpoint.gif"
alt="Run Great Expectations checkpoint"
/%}
### Running GX using the Python SDK?
If you are running GX using their Python SDK below is a full example of how to add the action to your code
```python
import great_expectations as gx
from great_expectations.checkpoint import Checkpoint
context = gx.get_context()
conn_string = f"postgresql+psycopg2://user:pw@host:port/db"
data_source = context.sources.add_postgres(
name="source_name",
connection_string=conn_string,
)
data_asset = data_source.add_table_asset(
name="name",
table_name="table_name",
schema_name="schema_name",
)
batch_request = data_source.get_asset("name").build_batch_request()
expectation_suite_name = "expectation_suite_name"
context.add_or_update_expectation_suite(expectation_suite_name=expectation_suite_name)
validator = context.get_validator(
batch_request=batch_request,
expectation_suite_name=expectation_suite_name,
)
validator.expect_column_values_to_not_be_null(column="column_name")
validator.expect_column_values_to_be_between(
column="column_name", min_value=min_value, max_value=max_value
)
validator.save_expectation_suite(discard_failed_expectations=False)
my_checkpoint_name = "my_sql_checkpoint"
checkpoint = Checkpoint(
name=my_checkpoint_name,
run_name_template="%Y%m%d-%H%M%S-my-run-name-template",
data_context=context,
batch_request=batch_request,
expectation_suite_name=expectation_suite_name,
action_list=[
{
"name": "store results in OpenMetadata",
"action": {
"module_name": "metadata.great_expectations.action",
"class_name": "OpenMetadataValidationAction",
"config_file_path": "path/to/config/file",
"database_service_name": "openmetadata_service_name",
"database_name": "database_name",
"table_name": "table_name",
"schema_name": "schema_name",
}
},
],
)
context.add_or_update_checkpoint(checkpoint=checkpoint)
checkpoint_result = checkpoint.run()
```
### Working with GX 1.x.x?
In v1.x.x, GX introduced significant changes to their SDK. One notable change was the removal of the `great_expectations` CLI. OpenMetadata introduced support for the 1.x.x versions through its `OpenMetadataValidationAction1xx` class. You will first need to run `pip install 'open-metadata[great-expectations-1xx]'`. Below is a complete example:
```python
import great_expectations as gx
from metadata.great_expectations.action1xx import OpenMetadataValidationAction1xx # OpenMetadata Validation Action for GX 1.x.x
context = gx.get_context()
conn_string = f"redshift+psycopg2://user:pw@host:port/db"
data_source = context.data_sources.add_redshift(
name="name",
connection_string=conn_string,
)
data_asset = data_source.add_table_asset(
name="name",
table_name="table_name",
schema_name="schema_name",
)
batch_definition = data_asset.add_batch_definition_whole_table("batch definition")
batch = batch_definition.get_batch()
suite = context.suites.add(
gx.core.expectation_suite.ExpectationSuite(name="name")
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToNotBeNull(
column="column_name"
)
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeBetween(column="column_name", min_value=50, max_value=1000000)
)
validation_definition = context.validation_definitions.add(
gx.core.validation_definition.ValidationDefinition(
name="validation definition",
data=batch_definition,
suite=suite,
)
)
action_list = [
OpenMetadataValidationAction1xx(
database_service_name="openmetadata_service_name",
database_name="openmetadata_database_name",
table_name="openmetadata_table_name",
schema_name="openmetadata_schema_name",
config_file_path="path/to/config/file",
)
]
checkpoint = context.checkpoints.add(
gx.checkpoint.checkpoint.Checkpoint(
name="checkpoint", validation_definitions=[validation_definition], actions=action_list
)
)
checkpoint_result = checkpoint.run()
```
### List of Supported Great Expectations Tests
We currently only support a certain number of Great Expectations tests. The full list can be found in the [Tests](/how-to-guides/data-quality-observability/quality/tests-yaml) section.

View File

@ -116,6 +116,139 @@ src="/images/v1.8/features/integrations/ge-run-checkpoint.gif"
alt="Run Great Expectations checkpoint"
/%}
### Running GX using the Python SDK?
If you are running GX using their Python SDK below is a full example of how to add the action to your code
```python
import great_expectations as gx
from great_expectations.checkpoint import Checkpoint
context = gx.get_context()
conn_string = f"postgresql+psycopg2://user:pw@host:port/db"
data_source = context.sources.add_postgres(
name="source_name",
connection_string=conn_string,
)
data_asset = data_source.add_table_asset(
name="name",
table_name="table_name",
schema_name="schema_name",
)
batch_request = data_source.get_asset("name").build_batch_request()
expectation_suite_name = "expectation_suite_name"
context.add_or_update_expectation_suite(expectation_suite_name=expectation_suite_name)
validator = context.get_validator(
batch_request=batch_request,
expectation_suite_name=expectation_suite_name,
)
validator.expect_column_values_to_not_be_null(column="column_name")
validator.expect_column_values_to_be_between(
column="column_name", min_value=min_value, max_value=max_value
)
validator.save_expectation_suite(discard_failed_expectations=False)
my_checkpoint_name = "my_sql_checkpoint"
checkpoint = Checkpoint(
name=my_checkpoint_name,
run_name_template="%Y%m%d-%H%M%S-my-run-name-template",
data_context=context,
batch_request=batch_request,
expectation_suite_name=expectation_suite_name,
action_list=[
{
"name": "store results in OpenMetadata",
"action": {
"module_name": "metadata.great_expectations.action",
"class_name": "OpenMetadataValidationAction",
"config_file_path": "path/to/config/file",
"database_service_name": "openmetadata_service_name",
"database_name": "database_name",
"table_name": "table_name",
"schema_name": "schema_name",
}
},
],
)
context.add_or_update_checkpoint(checkpoint=checkpoint)
checkpoint_result = checkpoint.run()
```
### Working with GX 1.x.x?
In v1.x.x, GX introduced significant changes to their SDK. One notable change was the removal of the `great_expectations` CLI. OpenMetadata introduced support for the 1.x.x versions through its `OpenMetadataValidationAction1xx` class. You will first need to run `pip install 'open-metadata[great-expectations-1xx]'`. Below is a complete example:
```python
import great_expectations as gx
from metadata.great_expectations.action1xx import OpenMetadataValidationAction1xx # OpenMetadata Validation Action for GX 1.x.x
context = gx.get_context()
conn_string = f"redshift+psycopg2://user:pw@host:port/db"
data_source = context.data_sources.add_redshift(
name="name",
connection_string=conn_string,
)
data_asset = data_source.add_table_asset(
name="name",
table_name="table_name",
schema_name="schema_name",
)
batch_definition = data_asset.add_batch_definition_whole_table("batch definition")
batch = batch_definition.get_batch()
suite = context.suites.add(
gx.core.expectation_suite.ExpectationSuite(name="name")
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToNotBeNull(
column="column_name"
)
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeBetween(column="column_name", min_value=50, max_value=1000000)
)
validation_definition = context.validation_definitions.add(
gx.core.validation_definition.ValidationDefinition(
name="validation definition",
data=batch_definition,
suite=suite,
)
)
action_list = [
OpenMetadataValidationAction1xx(
database_service_name="openmetadata_service_name",
database_name="openmetadata_database_name",
table_name="openmetadata_table_name",
schema_name="openmetadata_schema_name",
config_file_path="path/to/config/file",
)
]
checkpoint = context.checkpoints.add(
gx.checkpoint.checkpoint.Checkpoint(
name="checkpoint", validation_definitions=[validation_definition], actions=action_list
)
)
checkpoint_result = checkpoint.run()
```
### List of Supported Great Expectations Tests
We currently only support a certain number of Great Expectations tests. The full list can be found in the [Tests](/how-to-guides/data-quality-observability/quality/tests-yaml) section.