diff --git a/.gitignore b/.gitignore index 3fd8ec8160c..9b877400109 100644 --- a/.gitignore +++ b/.gitignore @@ -123,9 +123,6 @@ scan-requirements.txt # CLI e2e tests ingestion/tests/cli_e2e/**/*test.yaml -# GX artifacts -/ingestion/tests/integration/great_expectations/gx/* - # Tests **/metastore_db/ diff --git a/ingestion/setup.py b/ingestion/setup.py index c7304bf8051..e7b2c1350b8 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -27,7 +27,8 @@ VERSIONS = { "google-cloud-monitoring": "google-cloud-monitoring>=2.0.0", "google-cloud-storage": "google-cloud-storage>=1.43.0", "gcsfs": "gcsfs>=2023.1.0", - "great-expectations": "great-expectations>=0.18.0,<0.18.14", + "great-expectations": "great-expectations~=0.18.0", + "great-expectations-1xx": "great-expectations~=1.0", "grpc-tools": "grpcio-tools>=1.47.2", "msal": "msal~=1.2", "neo4j": "neo4j~=5.3", @@ -250,6 +251,7 @@ plugins: Dict[str, Set[str]] = { "exasol": {"sqlalchemy_exasol>=5,<6"}, "glue": {VERSIONS["boto3"]}, "great-expectations": {VERSIONS["great-expectations"]}, + "great-expectations-1xx": {VERSIONS["great-expectations-1xx"]}, "greenplum": {*COMMONS["postgres"]}, "cockroach": { VERSIONS["cockroach"], diff --git a/ingestion/src/metadata/great_expectations/action.py b/ingestion/src/metadata/great_expectations/action.py index f9a732d8d41..5ff71cef05c 100644 --- a/ingestion/src/metadata/great_expectations/action.py +++ b/ingestion/src/metadata/great_expectations/action.py @@ -96,6 +96,7 @@ class OpenMetadataValidationAction(ValidationAction): def __init__( self, data_context: DataContext, # type: ignore + name: str = "OpenMetadataValidationAction", *, config_file_path: Optional[str] = None, database_service_name: Optional[str] = None, @@ -103,7 +104,7 @@ class OpenMetadataValidationAction(ValidationAction): database_name: Optional[str] = None, table_name: Optional[str] = None, ): - super().__init__(data_context) + super().__init__(data_context, name=name) self.database_service_name = 
database_service_name self.database_name = database_name self.table_name = table_name diff --git a/ingestion/src/metadata/great_expectations/action1xx.py b/ingestion/src/metadata/great_expectations/action1xx.py new file mode 100644 index 00000000000..de5bd7f526c --- /dev/null +++ b/ingestion/src/metadata/great_expectations/action1xx.py @@ -0,0 +1,400 @@ +# Copyright 2022 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Great Expectations subpackage to send expectation results to +Open Metadata table quality. + +This subpackage needs to be used in Great Expectations +checkpoints actions. 
+""" +import logging +import traceback +from datetime import datetime +from typing import Dict, List, Literal, Optional, Union, cast + +from great_expectations.checkpoint import ( + ActionContext, + CheckpointResult, + ValidationAction, +) +from great_expectations.core.batch import Batch +from great_expectations.core.expectation_validation_result import ( + ExpectationSuiteValidationResultMeta, +) +from great_expectations.datasource.fluent import DataAsset +from great_expectations.validator.validator import Validator +from sqlalchemy.engine.base import Connection, Engine +from sqlalchemy.engine.url import URL + +from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest +from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.tests.basic import ( + TestCaseResult, + TestCaseStatus, + TestResultValue, +) +from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue +from metadata.generated.schema.tests.testDefinition import ( + EntityType, + TestCaseParameterDefinition, + TestPlatform, +) +from metadata.generated.schema.tests.testSuite import TestSuite +from metadata.generated.schema.type.basic import Timestamp +from metadata.great_expectations.utils.ometa_config_handler import ( + create_jinja_environment, + create_ometa_connection_obj, + render_template, +) +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.utils import fqn +from metadata.utils.entity_link import get_entity_link + +logger = logging.getLogger( + "great_expectations.validation_operators.validation_operators.openmetadata" +) + + +class OpenMetadataValidationAction1xx(ValidationAction): + """Open Metdata validation action for GE 1.x.x It inherits from + great expection validation action class and implements the + `run` method. 
+ + Attributes: + data_context: great expectation data context + database_service_name: name of the service for the table + api_version: default to v1 + config_file_path: path to the open metdata config path + """ + + type: Literal["open_metadata_validation_action"] = "open_metadata_validation_action" + name: str = "OpenMetadataValidationAction" + config_file_path: Optional[str] = None + database_service_name: Optional[str] = None + schema_name: Optional[str] = "default" + database_name: str + table_name: Optional[str] = None + # Using Optional to make this field not part of the serialized model + # This will be initialized in the run method + ometa_conn: Optional[OpenMetadata] = None + + def run( # pylint: disable=unused-argument + self, + checkpoint_result: CheckpointResult, + action_context: Union[ActionContext, None], + ): + """main function to implement great expectation hook + + Args: + validation_result_suite: result suite returned when checkpoint is ran + validation_result_suite_identifier: type of result suite + data_asset: + expectation_suite_identifier: type of expectation suite + checkpoint_identifier: identifier for the checkpoint + """ + self.ometa_conn = self._create_ometa_connection() + for _, v in checkpoint_result.run_results.items(): + meta = v.meta + if meta: + check_point_spec = self._get_checkpoint_batch_spec(meta) + table_entity = self._get_table_entity( + self.database_name, + check_point_spec.get("schema_name", self.schema_name), + check_point_spec.get("table_name"), + ) + + else: + table_entity = self._get_table_entity( + self.database_name, + self.schema_name, + self.table_name, + ) + + if table_entity: + test_suite = self._check_or_create_test_suite(table_entity) + for result in v.results: + self._handle_test_case(result, table_entity, test_suite) + + @staticmethod + def _get_checkpoint_batch_spec( + meta: Union[ExpectationSuiteValidationResultMeta, dict] + ): + """Return run meta and check instance of data_asset + + Args: + data_asset: 
data assets of the checkpoint run + Returns: + SqlAlchemyDatasourceBatchSpec + Raises: + ValueError: if datasource not SqlAlchemyDatasourceBatchSpec raise + """ + return meta.get("batch_spec") + + def _get_table_entity( + self, + database: Optional[str], + schema_name: Optional[str], + table_name: Optional[str], + ) -> Optional[Table]: + """Return the table entity for the test. If service name is defined + in GE checkpoint entity will be fetch using the FQN. If not provided + iterative search will be perform among all the entities. If 2 entities + are found with the same `database`.`schema`.`table` the method will + raise an error. + + Args: + database: database name + schema_name: schema name + table_name: table name + + Return: + Optional[Table] + + Raises: + ValueError: if 2 entities with the same + `database`.`schema`.`table` are found + """ + if not all([schema_name, table_name]): + raise ValueError( + "No Schema or Table name provided. Can't fetch table entity from OpenMetadata." + ) + + if self.database_service_name: + return self.ometa_conn.get_by_name( + entity=Table, + fqn=f"{self.database_service_name}.{database}.{schema_name}.{table_name}", + fields=["testSuite"], + ) + + table_entity = [ + entity + for entity in self.ometa_conn.list_entities( + entity=Table, fields=["testSuite"] + ).entities + if f"{database}.{schema_name}.{table_name}" + in entity.fullyQualifiedName.root + ] + + if len(table_entity) > 1: + raise ValueError( + f"Non unique `database`.`schema`.`table` found: {table_entity}." + "Please specify an `database_service_name` in you checkpoint.yml file.", + ) + + if table_entity: + return table_entity[0] + + logger.warning( + "No entity found for %s.%s.%s", database, schema_name, table_name + ) + return None + + def _check_or_create_test_suite(self, table_entity: Table) -> TestSuite: + """Check if test suite already exists for a given table entity. If not + create a new one. 
+ + Args: + table_entity: table entity object + Returns: + TestSuite + """ + + if table_entity.testSuite: + test_suite = self.ometa_conn.get_by_name( + TestSuite, table_entity.testSuite.fullyQualifiedName + ) + test_suite = cast(TestSuite, test_suite) + return test_suite + + create_test_suite = CreateTestSuiteRequest( + name=f"{table_entity.fullyQualifiedName.root}.TestSuite", + basicEntityReference=table_entity.fullyQualifiedName.root, + ) # type: ignore + test_suite = self.ometa_conn.create_or_update_executable_test_suite( + create_test_suite + ) + return test_suite + + @staticmethod + def _get_execution_engine_url( + data_asset: Union[Validator, DataAsset, Batch] + ) -> URL: + """Get execution engine used to run the expectation + + Args: + data_asset: data assets of the checkpoint run + Returns: + URL + Raises: + ValueError: if expectation is not ran against DB + """ + if isinstance(data_asset.execution_engine.engine, Engine): + return data_asset.execution_engine.engine.url + if isinstance(data_asset.execution_engine.engine, Connection): + return data_asset.execution_engine.engine.engine.url + raise ValueError( + "Type is not supported. 
Make sure you ran your
+ + def _get_test_result_value(self, result: dict) -> List[TestResultValue]: + """Get test result value from GE test result + + Args: + result (dict): result + + Returns: + TestCaseResult: a test case result object + """ + try: + test_result_value = TestResultValue( + name="observed_value", + value=str(result["result"]["observed_value"]), + ) + except KeyError: + unexpected_percent_total = result["result"].get("unexpected_percent_total") + test_result_value = TestResultValue( + name="unexpected_percentage_total", + value=str(unexpected_percent_total), + ) + + return [test_result_value] + + def _handle_test_case( + self, result: Dict, table_entity: Table, test_suite: TestSuite + ): + """Handle adding test to table entity based on the test case. + Test Definitions will be created on the fly from the results of the + great expectations run. We will then write the test case results to the + specific test case. + + Args: + result: GE test result + table_entity: table entity object + test_suite: test suite object + """ + + try: + test_definition = self.ometa_conn.get_or_create_test_definition( + test_definition_fqn=result["expectation_config"]["type"], + test_definition_description=result["expectation_config"][ + "type" + ].replace("_", " "), + entity_type=EntityType.COLUMN + if "column" in result["expectation_config"]["kwargs"] + else EntityType.TABLE, + test_platforms=[TestPlatform.GreatExpectations], + test_case_parameter_definition=self._get_test_case_params_definition( + result + ), + ) + + test_case_fqn = self._build_test_case_fqn( + table_entity.fullyQualifiedName.root, + result, + ) + + test_case = self.ometa_conn.get_or_create_test_case( + test_case_fqn, + entity_link=get_entity_link( + Table, + fqn=table_entity.fullyQualifiedName.root, + column_name=fqn.split_test_case_fqn(test_case_fqn).column, + ), + test_suite_fqn=test_suite.fullyQualifiedName.root, + test_definition_fqn=test_definition.fullyQualifiedName.root, + 
test_case_parameter_values=self._get_test_case_params_value(result), + ) + + self.ometa_conn.add_test_case_results( + test_results=TestCaseResult( + timestamp=Timestamp(int(datetime.now().timestamp() * 1000)), + testCaseStatus=TestCaseStatus.Success + if result["success"] + else TestCaseStatus.Failed, + testResultValue=self._get_test_result_value(result), + ), # type: ignore + test_case_fqn=test_case.fullyQualifiedName.root, + ) + + logger.debug( + f"Test case result for {test_case.fullyQualifiedName.root} successfully ingested" + ) + + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning(exc) diff --git a/ingestion/tests/integration/great_expectations/great_expectations/.gitignore b/ingestion/tests/integration/great_expectations/great_expectations/.gitignore deleted file mode 100644 index e36274ccd06..00000000000 --- a/ingestion/tests/integration/great_expectations/great_expectations/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -uncommitted/ -expectations/.ge_store_backend_id \ No newline at end of file diff --git a/ingestion/tests/integration/great_expectations/gx/.gitignore b/ingestion/tests/integration/great_expectations/gx/.gitignore new file mode 100644 index 00000000000..f34131a7443 --- /dev/null +++ b/ingestion/tests/integration/great_expectations/gx/.gitignore @@ -0,0 +1,2 @@ + +uncommitted/ \ No newline at end of file diff --git a/ingestion/tests/integration/great_expectations/great_expectations/checkpoints/sqlite.yml b/ingestion/tests/integration/great_expectations/gx/checkpoints/sqlite.yml similarity index 100% rename from ingestion/tests/integration/great_expectations/great_expectations/checkpoints/sqlite.yml rename to ingestion/tests/integration/great_expectations/gx/checkpoints/sqlite.yml diff --git a/ingestion/tests/integration/great_expectations/gx/expectations/.ge_store_backend_id b/ingestion/tests/integration/great_expectations/gx/expectations/.ge_store_backend_id new file mode 100644 index 00000000000..93be47dc2ba --- 
/dev/null +++ b/ingestion/tests/integration/great_expectations/gx/expectations/.ge_store_backend_id @@ -0,0 +1 @@ +store_backend_id = e3a2fbc4-3b99-4bfe-ab2e-c81eaeec3e2a diff --git a/ingestion/tests/integration/great_expectations/great_expectations/expectations/IntegrationTestExpectation.json b/ingestion/tests/integration/great_expectations/gx/expectations/IntegrationTestExpectation.json similarity index 100% rename from ingestion/tests/integration/great_expectations/great_expectations/expectations/IntegrationTestExpectation.json rename to ingestion/tests/integration/great_expectations/gx/expectations/IntegrationTestExpectation.json diff --git a/ingestion/tests/integration/great_expectations/great_expectations/great_expectations.yml b/ingestion/tests/integration/great_expectations/gx/great_expectations.yml similarity index 100% rename from ingestion/tests/integration/great_expectations/great_expectations/great_expectations.yml rename to ingestion/tests/integration/great_expectations/gx/great_expectations.yml diff --git a/ingestion/tests/integration/great_expectations/great_expectations/ometa_config/config.yaml b/ingestion/tests/integration/great_expectations/gx/ometa_config/config.yaml similarity index 100% rename from ingestion/tests/integration/great_expectations/great_expectations/ometa_config/config.yaml rename to ingestion/tests/integration/great_expectations/gx/ometa_config/config.yaml diff --git a/ingestion/tests/integration/great_expectations/great_expectations/plugins/custom_data_docs/styles/data_docs_custom_styles.css b/ingestion/tests/integration/great_expectations/gx/plugins/custom_data_docs/styles/data_docs_custom_styles.css similarity index 100% rename from ingestion/tests/integration/great_expectations/great_expectations/plugins/custom_data_docs/styles/data_docs_custom_styles.css rename to ingestion/tests/integration/great_expectations/gx/plugins/custom_data_docs/styles/data_docs_custom_styles.css diff --git 
a/ingestion/tests/integration/great_expectations/test_great_expectation_integration.py b/ingestion/tests/integration/great_expectations/test_great_expectation_integration.py index 6cb3c4b2015..e5e4db7e2ad 100644 --- a/ingestion/tests/integration/great_expectations/test_great_expectation_integration.py +++ b/ingestion/tests/integration/great_expectations/test_great_expectation_integration.py @@ -15,10 +15,11 @@ Validate great expectation integration import logging import os +import subprocess +import sys from datetime import datetime, timedelta from unittest import TestCase -from great_expectations import DataContext from sqlalchemy import Column, DateTime, Integer, String, create_engine from sqlalchemy.orm import declarative_base @@ -175,6 +176,17 @@ class TestGreatExpectationIntegration(TestCase): """ Test great expectation integration """ + self.install_gx_018x() + import great_expectations as gx + + try: + self.assertTrue(gx.__version__.startswith("0.18.")) + except AssertionError as exc: + # module versions are cached, so we need to skip the test if the version is not 0.18.x + # e.g. 
we run the 1.x.x test before this one, 0.18.x version will be cached and used here + # The test will run if we run this test alone without the 1.x.x test + self.skipTest(f"GX version is not 0.18.x: {exc}") + table_entity = self.metadata.get_by_name( entity=Table, fqn="test_sqlite.default.main.users", @@ -186,10 +198,9 @@ class TestGreatExpectationIntegration(TestCase): # GE config file ge_folder = os.path.join( os.path.dirname(os.path.abspath(__file__)), - "great_expectations", ) - ometa_config = os.path.join(ge_folder, "ometa_config") - context = DataContext(ge_folder) + ometa_config = os.path.join(ge_folder, "gx/ometa_config") + context = gx.get_context(project_root_dir=ge_folder) checkpoint = context.get_checkpoint("sqlite") # update our checkpoint file at runtime to dynamically pass the ometa config file checkpoint.action_list[-1].update( @@ -227,3 +238,9 @@ class TestGreatExpectationIntegration(TestCase): ) assert test_case_results + + def install_gx_018x(self): + """Install GX 0.18.x at runtime as we support 0.18.x and 1.x.x and setup will install 1 default version""" + subprocess.check_call( + [sys.executable, "-m", "pip", "install", "great-expectations~=0.18.0"] + ) diff --git a/ingestion/tests/integration/great_expectations/test_great_expectation_integration_1xx.py b/ingestion/tests/integration/great_expectations/test_great_expectation_integration_1xx.py new file mode 100644 index 00000000000..47bb170c4e2 --- /dev/null +++ b/ingestion/tests/integration/great_expectations/test_great_expectation_integration_1xx.py @@ -0,0 +1,282 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Validate great expectation integration +""" + +import logging +import os +import subprocess +import sys +from datetime import datetime, timedelta +from unittest import TestCase + +from sqlalchemy import Column, DateTime, Integer, String, create_engine +from sqlalchemy.orm import declarative_base + +from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) +from metadata.generated.schema.entity.services.databaseService import DatabaseService +from metadata.generated.schema.security.client.openMetadataJWTClientConfig import ( + OpenMetadataJWTClientConfig, +) +from metadata.generated.schema.tests.testSuite import TestSuite +from metadata.ingestion.connections.session import create_and_bind_session +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.utils.time_utils import ( + get_beginning_of_day_timestamp_mill, + get_end_of_day_timestamp_mill, +) +from metadata.workflow.metadata import MetadataWorkflow + +Base = declarative_base() + +TEST_CASE_FQN = ( + "test_sqlite.default.main.users.name.expect_column_values_to_not_be_null" +) +SQLLITE_SHARD = "file:cachedb?mode=memory&cache=shared&check_same_thread=False" +LOGGER = logging.getLogger(__name__) + +WORKFLOW_CONFIG = { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "openmetadata", + "securityConfig": { + "jwtToken": 
"eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" + }, + } +} +INGESTION_CONFIG = { + "source": { + "type": "sqlite", + "serviceName": "test_sqlite", + "serviceConnection": { + "config": { + "type": "SQLite", + "databaseMode": SQLLITE_SHARD, + "database": "default", + } + }, + "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, + }, + "sink": {"type": "metadata-rest", "config": {}}, + "workflowConfig": { + **WORKFLOW_CONFIG, + }, +} + + +class User(Base): + __tablename__ = "users" + id = Column(Integer, primary_key=True) + name = Column(String(256)) + fullname = Column(String(256)) + nickname = Column(String(256)) + age = Column(Integer) + signedup = Column(DateTime) + + +class TestGreatExpectationIntegration1xx(TestCase): + """Test great expectation integration""" + + skip_test = True + + engine = create_engine( + f"sqlite+pysqlite:///{SQLLITE_SHARD}", + ) + session = create_and_bind_session(engine) + server_config = OpenMetadataConnection( + hostPort=WORKFLOW_CONFIG["openMetadataServerConfig"]["hostPort"], + authProvider=WORKFLOW_CONFIG["openMetadataServerConfig"]["authProvider"], + securityConfig=OpenMetadataJWTClientConfig( + jwtToken=WORKFLOW_CONFIG["openMetadataServerConfig"]["securityConfig"][ + "jwtToken" + ] + ), + ) # type: ignore + metadata = OpenMetadata(server_config) + + @classmethod + def setUpClass(cls): + """Set up class by ingesting metadata""" + try: + User.__table__.create(bind=cls.engine) + except Exception as exc: + 
LOGGER.warning(f"Table Already exists: {exc}") + + data = [ + User( + name="John", + fullname="John Doe", + nickname="johnny b goode", + age=30, + signedup=datetime.now() - timedelta(days=10), + ), + User( + name="Jane", + fullname="Jone Doe", + nickname=None, + age=31, + signedup=datetime.now() - timedelta(days=2), + ), + User( + name="Joh", + fullname="Joh Doe", + nickname=None, + age=37, + signedup=datetime.now() - timedelta(days=1), + ), + User( + name="Jae", + fullname="Jae Doe", + nickname=None, + age=38, + signedup=datetime.now() - timedelta(days=1), + ), + ] + cls.session.add_all(data) + cls.session.commit() + + ingestion_workflow = MetadataWorkflow.create(INGESTION_CONFIG) + ingestion_workflow.execute() + ingestion_workflow.raise_from_status() + ingestion_workflow.print_status() + ingestion_workflow.stop() + + @classmethod + def tearDownClass(cls) -> None: + """ + Clean up + """ + + service_id = str( + cls.metadata.get_by_name(entity=DatabaseService, fqn="test_sqlite").id.root + ) + + cls.metadata.delete( + entity=DatabaseService, + entity_id=service_id, + recursive=True, + hard_delete=True, + ) + + User.__table__.drop(bind=cls.engine) + cls.session.close() + + def test_great_expectation_integration(self): + """ + Test great expectation integration + """ + self.install_gx_1xx() + import great_expectations as gx + + try: + self.assertTrue(gx.__version__.startswith("1.")) + except AssertionError as exc: + # module versions are cached, so we need to skip the test if the version is not 1.x.x + # e.g. 
we run the 0.18.x test before this one, 0.18 version will be cached and used here + # The test will run if we run this test alone without the 0.18.x test + self.skipTest(f"GX version is not 1.x.x: {exc}") + + from metadata.great_expectations.action1xx import ( + OpenMetadataValidationAction1xx, + ) + + table_entity = self.metadata.get_by_name( + entity=Table, + fqn="test_sqlite.default.main.users", + fields=["testSuite"], + ) + + assert not table_entity.testSuite + + # GE config file + ge_folder = os.path.join( + os.path.dirname(os.path.abspath(__file__)), + ) + ometa_config = os.path.join(ge_folder, "gx/ometa_config") + + context = gx.get_context() + conn_string = f"sqlite+pysqlite:///file:cachedb?mode=memory&cache=shared&check_same_thread=False" + data_source = context.data_sources.add_sqlite( + name="test_sqlite", + connection_string=conn_string, + ) + + data_asset = data_source.add_table_asset( + name="users", table_name="users", schema_name="main" + ) + batch_definition = data_asset.add_batch_definition_whole_table( + "batch definition" + ) + batch = batch_definition.get_batch() + suite = context.suites.add( + gx.core.expectation_suite.ExpectationSuite(name="name") + ) + suite.add_expectation( + gx.expectations.ExpectColumnValuesToNotBeNull(column="name") + ) + + validation_definition = context.validation_definitions.add( + gx.core.validation_definition.ValidationDefinition( + name="validation definition", + data=batch_definition, + suite=suite, + ) + ) + + action_list = [ + OpenMetadataValidationAction1xx( + database_service_name="test_sqlite", + database_name="default", + table_name="users", + schema_name="main", + config_file_path=ometa_config, + ) + ] + + checkpoint = context.checkpoints.add( + gx.checkpoint.checkpoint.Checkpoint( + name="checkpoint", + validation_definitions=[validation_definition], + actions=action_list, + ) + ) + checkpoint.run() + + table_entity = self.metadata.get_by_name( + entity=Table, + fqn="test_sqlite.default.main.users", + 
fields=["testSuite"], + ) + + assert table_entity.testSuite + test_suite: TestSuite = self.metadata.get_by_id( + entity=TestSuite, entity_id=table_entity.testSuite.id, fields=["tests"] + ) + assert len(test_suite.tests) == 1 + + test_case_results = self.metadata.get_test_case_results( + test_case_fqn=TEST_CASE_FQN, + start_ts=get_beginning_of_day_timestamp_mill(), + end_ts=get_end_of_day_timestamp_mill(), + ) + + assert test_case_results + + def install_gx_1xx(self): + """Install GX 1.x.x at runtime as we support 0.18.x and 1.x.x and setup will install 1 default version""" + subprocess.check_call( + [sys.executable, "-m", "pip", "install", "great-expectations~=1.0"] + ) diff --git a/ingestion/tests/unit/great_expectations/conftest.py b/ingestion/tests/unit/great_expectations/conftest.py index 320da2f3a75..2d395f6d6fe 100644 --- a/ingestion/tests/unit/great_expectations/conftest.py +++ b/ingestion/tests/unit/great_expectations/conftest.py @@ -17,7 +17,6 @@ from unittest import mock from pytest import fixture -from metadata.great_expectations.action import OpenMetadataValidationAction from metadata.great_expectations.utils.ometa_config_handler import ( create_jinja_environment, ) @@ -51,6 +50,8 @@ def mocked_ometa_object(): @fixture(scope="module") def mocked_ometa(): """Mocks OMeta obkect""" + from metadata.great_expectations.action import OpenMetadataValidationAction + with mock.patch.object( OpenMetadataValidationAction, "_create_ometa_connection", diff --git a/ingestion/tests/unit/great_expectations/test_ometa_validation_action.py b/ingestion/tests/unit/great_expectations/test_ometa_validation_action.py index 66d9b0b62d0..ad80d9235a7 100644 --- a/ingestion/tests/unit/great_expectations/test_ometa_validation_action.py +++ b/ingestion/tests/unit/great_expectations/test_ometa_validation_action.py @@ -13,15 +13,29 @@ Test suite for the action module implementation """ import os +import subprocess +import sys from unittest import mock +import great_expectations as gx 
from jinja2 import Environment from pytest import mark -from metadata.great_expectations.action import OpenMetadataValidationAction from metadata.great_expectations.utils.ometa_config_handler import render_template +def install_gx_018x(): + """Install GX 0.18.x at runtime as we support 0.18.x and 1.x.x and setup will install 1 default version""" + + if not gx.__version__.startswith("0.18."): + subprocess.check_call( + [sys.executable, "-m", "pip", "install", "great-expectations~=0.18.0"] + ) + + +install_gx_018x() + + @mark.parametrize( "input,expected", [ @@ -31,6 +45,8 @@ from metadata.great_expectations.utils.ometa_config_handler import render_templa ) def test_get_table_entity(input, expected, mocked_ometa, mocked_ge_data_context): """Test get table entity""" + from metadata.great_expectations.action import OpenMetadataValidationAction + ometa_validation = OpenMetadataValidationAction( data_context=mocked_ge_data_context, config_file_path="my/config/path", @@ -52,6 +68,8 @@ def test_get_table_entity_database_service_name( input, expected, mocked_ometa, mocked_ge_data_context ): """Test get table entity""" + from metadata.great_expectations.action import OpenMetadataValidationAction + ometa_validation = OpenMetadataValidationAction( data_context=mocked_ge_data_context, config_file_path="my/config/path", diff --git a/openmetadata-docs/content/v1.6.x/connectors/ingestion/great-expectations.md b/openmetadata-docs/content/v1.6.x/connectors/ingestion/great-expectations.md index 0188553ad12..10a20d5dc78 100644 --- a/openmetadata-docs/content/v1.6.x/connectors/ingestion/great-expectations.md +++ b/openmetadata-docs/content/v1.6.x/connectors/ingestion/great-expectations.md @@ -116,6 +116,73 @@ src="/images/v1.6/features/integrations/ge-run-checkpoint.gif" alt="Run Great Expectations checkpoint" /%} +### Running GX using the Python SDK? 
+If you are running GX using their Python SDK, below is a full example of how to add the action to your code:
+
+```python
+import great_expectations as gx
The full list can be found in the [Tests](/how-to-guides/data-quality-observability/quality/tests-yaml) section. diff --git a/openmetadata-docs/content/v1.7.x/connectors/ingestion/great-expectations.md b/openmetadata-docs/content/v1.7.x/connectors/ingestion/great-expectations.md index 28cd5627eb7..0979a03f3f8 100644 --- a/openmetadata-docs/content/v1.7.x/connectors/ingestion/great-expectations.md +++ b/openmetadata-docs/content/v1.7.x/connectors/ingestion/great-expectations.md @@ -116,6 +116,138 @@ src="/images/v1.7/features/integrations/ge-run-checkpoint.gif" alt="Run Great Expectations checkpoint" /%} +### Running GX using the Python SDK? +If you are running GX using their Python SDK below is a full example of how to add the action to your code + +```python + import great_expectations as gx + +from great_expectations.checkpoint import Checkpoint + +context = gx.get_context() +conn_string = f"postgresql+psycopg2://user:pw@host:port/db" + +data_source = context.sources.add_postgres( + name="source_name", + connection_string=conn_string, +) + +data_asset = data_source.add_table_asset( + name="name", + table_name="table_name", + schema_name="schema_name", +) + +batch_request = data_source.get_asset("name").build_batch_request() + +expectation_suite_name = "expectation_suite_name" +context.add_or_update_expectation_suite(expectation_suite_name=expectation_suite_name) +validator = context.get_validator( + batch_request=batch_request, + expectation_suite_name=expectation_suite_name, +) + +validator.expect_column_values_to_not_be_null(column="column_name") + +validator.expect_column_values_to_be_between( + column="column_name", min_value=min_value, max_value=max_value +) + +validator.save_expectation_suite(discard_failed_expectations=False) + +my_checkpoint_name = "my_sql_checkpoint" + +checkpoint = Checkpoint( + name=my_checkpoint_name, + run_name_template="%Y%m%d-%H%M%S-my-run-name-template", + data_context=context, + batch_request=batch_request, + 
expectation_suite_name=expectation_suite_name, + action_list=[ + { + "name": "store results in OpenMetadata", + "action": { + "module_name": "metadata.great_expectations.action", + "class_name": "OpenMetadataValidationAction", + "config_file_path": "path/to/config/file", + "database_service_name": "openmetadata_service_name", + "database_name": "database_name", + "table_name": "table_name", + "schema_name": "schema_name", + } + }, + ], +) + +context.add_or_update_checkpoint(checkpoint=checkpoint) +checkpoint_result = checkpoint.run() +``` + +### Working with GX 1.x.x? +In v1.x.x GX introduced significant changes to their SDK. One notable change was the removal of the `great_expectations` CLI. OpenMetadata introduced support for 1.x.x version through its `OpenMetadataValidationAction1xx` class. You will need to first `pip install 'open-metadata[great-expectations-1xx]'. Below is a complete example + +```python +import great_expectations as gx + +from metadata.great_expectations.action1xx import OpenMetadataValidationAction1xx # OpenMetadata Validation Action for GX 1.x.x + + +context = gx.get_context() +conn_string = f"redshift+psycopg2://user:pw@host:port/db" + +data_source = context.data_sources.add_redshift( + name="name", + connection_string=conn_string, +) + +data_asset = data_source.add_table_asset( + name="name", + table_name="table_name", + schema_name="schema_name", +) + +batch_definition = data_asset.add_batch_definition_whole_table("batch definition") +batch = batch_definition.get_batch() + +suite = context.suites.add( + gx.core.expectation_suite.ExpectationSuite(name="name") +) +suite.add_expectation( + gx.expectations.ExpectColumnValuesToNotBeNull( + column="column_name" + ) +) +suite.add_expectation( + gx.expectations.ExpectColumnValuesToBeBetween(column="column_name", min_value=50, max_value=1000000) +) + +validation_definition = context.validation_definitions.add( + gx.core.validation_definition.ValidationDefinition( + name="validation definition", + 
data=batch_definition, + suite=suite, + ) +) + +action_list = [ + OpenMetadataValidationAction1xx( + database_service_name="openmetadata_service_name", + database_name="openmetadata_database_name", + table_name="openmetadata_table_name", + schema_name="openmetadata_schema_name", + config_file_path="path/to/config/file", + ) +] + +checkpoint = context.checkpoints.add( + gx.checkpoint.checkpoint.Checkpoint( + name="checkpoint", validation_definitions=[validation_definition], actions=action_list + ) +) + +checkpoint_result = checkpoint.run() +``` + ### List of Great Expectations Supported Test We currently only support a certain number of Great Expectations tests. The full list can be found in the [Tests](/how-to-guides/data-quality-observability/quality/tests-yaml) section. diff --git a/openmetadata-docs/content/v1.8.x-SNAPSHOT/connectors/ingestion/great-expectations.md b/openmetadata-docs/content/v1.8.x-SNAPSHOT/connectors/ingestion/great-expectations.md index 44299447b66..7b43e74a9b3 100644 --- a/openmetadata-docs/content/v1.8.x-SNAPSHOT/connectors/ingestion/great-expectations.md +++ b/openmetadata-docs/content/v1.8.x-SNAPSHOT/connectors/ingestion/great-expectations.md @@ -116,6 +116,139 @@ src="/images/v1.8/features/integrations/ge-run-checkpoint.gif" alt="Run Great Expectations checkpoint" /%} +### Running GX using the Python SDK? 
+If you are running GX using their Python SDK, below is a full example of how to add the action to your code:
+
+```python
+import great_expectations as gx
One notable change was the removal of the `great_expectations` CLI. OpenMetadata introduced support for the 1.x.x version through its `OpenMetadataValidationAction1xx` class. You will need to first run `pip install 'open-metadata[great-expectations-1xx]'`. Below is a complete example:
certain number of Great Expectations tests. The full list can be found in the [Tests](/how-to-guides/data-quality-observability/quality/tests-yaml) section.