diff --git a/ingestion/src/metadata/data_quality/api/models.py b/ingestion/src/metadata/data_quality/api/models.py index eae7544f8c1..e70def6e9ed 100644 --- a/ingestion/src/metadata/data_quality/api/models.py +++ b/ingestion/src/metadata/data_quality/api/models.py @@ -56,7 +56,7 @@ class TableAndTests(BaseModel): table: Table = Field(None, description="Table being processed by the DQ workflow") service_type: str = Field(..., description="Service type the table belongs to") - test_cases: Optional[List[TestCase]] = Field( + test_cases: List[TestCase] = Field( None, description="Test Cases already existing in the Test Suite, if any" ) executable_test_suite: Optional[CreateTestSuiteRequest] = Field( diff --git a/ingestion/src/metadata/data_quality/processor/test_case_runner.py b/ingestion/src/metadata/data_quality/processor/test_case_runner.py index e87b74cca97..73a145d415d 100644 --- a/ingestion/src/metadata/data_quality/processor/test_case_runner.py +++ b/ingestion/src/metadata/data_quality/processor/test_case_runner.py @@ -14,7 +14,7 @@ This Processor is in charge of executing the test cases """ import traceback from copy import deepcopy -from typing import List, Optional, cast +from typing import List, Optional from metadata.data_quality.api.models import ( TableAndTests, @@ -90,15 +90,6 @@ class TestCaseRunner(Processor): ), table_fqn=record.table.fullyQualifiedName.root, ) - - if not test_cases: - return Either( - left=StackTraceError( - name="No test Cases", - error=f"No tests cases found for table {record.table.fullyQualifiedName.root}", - ) - ) - openmetadata_test_cases = self.filter_for_om_test_cases(test_cases) openmetadata_test_cases = self.filter_incompatible_test_cases( record.table, openmetadata_test_cases @@ -111,6 +102,12 @@ class TestCaseRunner(Processor): record.table, ).get_data_quality_runner() + logger.debug( + f"Found {len(openmetadata_test_cases)} test cases for table {record.table.fullyQualifiedName.root}" + ) + if len(openmetadata_test_cases) 
== 0: + logger.warning("No test cases found for the table") + test_results = [ test_case_result for test_case in openmetadata_test_cases @@ -120,17 +117,14 @@ class TestCaseRunner(Processor): return Either(right=TestCaseResults(test_results=test_results)) def get_test_cases( - self, test_cases: Optional[List[TestCase]], test_suite_fqn: str, table_fqn: str + self, test_cases: List[TestCase], test_suite_fqn: str, table_fqn: str ) -> List[TestCase]: """ Based on the test suite test cases that we already know, pick up the rest from the YAML config, compare and create the new ones """ if self.processor_config.testCases is not None: - cli_test_cases = self.get_test_case_from_cli_config() # type: ignore - cli_test_cases = cast( - List[TestCaseDefinition], cli_test_cases - ) # satisfy type checker + cli_test_cases = self.get_test_case_from_cli_config() return self.compare_and_create_test_cases( cli_test_cases_definitions=cli_test_cases, test_cases=test_cases, @@ -142,15 +136,13 @@ class TestCaseRunner(Processor): def get_test_case_from_cli_config( self, - ) -> Optional[List[TestCaseDefinition]]: + ) -> List[TestCaseDefinition]: """Get all the test cases names defined in the CLI config file""" - if self.processor_config.testCases is not None: - return list(self.processor_config.testCases) - return None + return list(self.processor_config.testCases or []) def compare_and_create_test_cases( self, - cli_test_cases_definitions: Optional[List[TestCaseDefinition]], + cli_test_cases_definitions: List[TestCaseDefinition], test_cases: List[TestCase], table_fqn: str, test_suite_fqn: str, diff --git a/ingestion/src/metadata/data_quality/source/test_suite.py b/ingestion/src/metadata/data_quality/source/test_suite.py index 2790d83fb48..272a592eb15 100644 --- a/ingestion/src/metadata/data_quality/source/test_suite.py +++ b/ingestion/src/metadata/data_quality/source/test_suite.py @@ -80,7 +80,7 @@ class TestSuiteSource(Source): def _get_test_cases_from_test_suite( self, test_suite: 
Optional[TestSuite] - ) -> Optional[List[TestCase]]: + ) -> List[TestCase]: """Return test cases if the test suite exists and has them""" if test_suite: test_cases = self.metadata.list_all_entities( @@ -94,8 +94,7 @@ class TestSuiteSource(Source): t for t in test_cases if t.name in self.source_config.testCases ] return test_cases - - return None + return [] def prepare(self): """Nothing to prepare""" diff --git a/ingestion/tests/integration/conftest.py b/ingestion/tests/integration/conftest.py index 308989383a2..3407ae89a3a 100644 --- a/ingestion/tests/integration/conftest.py +++ b/ingestion/tests/integration/conftest.py @@ -1,9 +1,11 @@ import logging import sys +from typing import List, Tuple, Type import pytest from _openmetadata_testutils.ometa import int_admin_ometa +from metadata.ingestion.api.common import Entity from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.metadataIngestion.workflow import LogLevels from metadata.ingestion.ometa.ometa_api import OpenMetadata @@ -171,3 +173,17 @@ def patch_passwords_for_db_services(db_service, unmask_password, monkeypatch): "metadata.ingestion.ometa.ometa_api.OpenMetadata.get_by_id", override_password(OpenMetadata.get_by_id), ) + + +@pytest.fixture +def cleanup_fqns(metadata): + fqns: List[Tuple[Type[Entity], str]] = [] + + def inner(entity_type: Type[Entity], fqn: str): + fqns.append((entity_type, fqn)) + + yield inner + for etype, fqn in fqns: + entity = metadata.get_by_name(etype, fqn, fields=["*"]) + if entity: + metadata.delete(etype, entity.id, recursive=True, hard_delete=True) diff --git a/ingestion/tests/integration/data_quality/conftest.py b/ingestion/tests/integration/data_quality/conftest.py index 7f04d86289e..726488fdb36 100644 --- a/ingestion/tests/integration/data_quality/conftest.py +++ b/ingestion/tests/integration/data_quality/conftest.py @@ -2,7 +2,20 @@ import pytest from testcontainers.mysql import MySqlContainer from 
_openmetadata_testutils.postgres.conftest import postgres_container, try_bind -from metadata.generated.schema.entity.services.databaseService import DatabaseService +from metadata.generated.schema.api.services.createDatabaseService import ( + CreateDatabaseServiceRequest, +) +from metadata.generated.schema.entity.services.connections.database.common.basicAuth import ( + BasicAuth, +) +from metadata.generated.schema.entity.services.connections.database.postgresConnection import ( + PostgresConnection, +) +from metadata.generated.schema.entity.services.databaseService import ( + DatabaseConnection, + DatabaseService, + DatabaseServiceType, +) from metadata.generated.schema.metadataIngestion.workflow import LogLevels from metadata.ingestion.models.custom_pydantic import CustomSecretStr from metadata.ingestion.ometa.ometa_api import OpenMetadata @@ -63,3 +76,54 @@ def ingest_mysql_service( ) yield db_service metadata.delete(DatabaseService, db_service.id, recursive=True, hard_delete=True) + + +@pytest.fixture(scope="module") +def create_service_request(tmp_path_factory, postgres_container): + return CreateDatabaseServiceRequest( + name="docker_test_" + tmp_path_factory.mktemp("postgres").name, + serviceType=DatabaseServiceType.Postgres, + connection=DatabaseConnection( + config=PostgresConnection( + username=postgres_container.username, + authType=BasicAuth(password=postgres_container.password), + hostPort="localhost:" + + postgres_container.get_exposed_port(postgres_container.port), + database="dvdrental", + ) + ), + ) + + +@pytest.fixture(scope="module") +def postgres_service(db_service): + return db_service + + +@pytest.fixture() +def ingest_postgres_metadata( + postgres_service, metadata: OpenMetadata, sink_config, workflow_config, run_workflow +): + workflow_config = { + "source": { + "type": postgres_service.connection.config.type.value.lower(), + "serviceName": postgres_service.fullyQualifiedName.root, + "serviceConnection": postgres_service.connection, + 
"sourceConfig": {"config": {}}, + }, + "sink": sink_config, + "workflowConfig": workflow_config, + } + run_workflow(MetadataWorkflow, workflow_config) + + +@pytest.fixture(scope="module") +def patch_password(postgres_container): + def inner(service: DatabaseService): + assert isinstance(service.connection.config, PostgresConnection) + service.connection.config.authType.password = type( + service.connection.config.authType.password + )(postgres_container.password) + return service + + return inner diff --git a/ingestion/tests/integration/data_quality/test_data_diff.py b/ingestion/tests/integration/data_quality/test_data_diff.py index 1e2a5f5a8ac..0b29a8054b1 100644 --- a/ingestion/tests/integration/data_quality/test_data_diff.py +++ b/ingestion/tests/integration/data_quality/test_data_diff.py @@ -14,33 +14,10 @@ from sqlalchemy.sql import sqltypes from _openmetadata_testutils.postgres.conftest import postgres_container from _openmetadata_testutils.pydantic.test_utils import assert_equal_pydantic_objects from metadata.data_quality.api.models import TestCaseDefinition -from metadata.generated.schema.api.services.createDatabaseService import ( - CreateDatabaseServiceRequest, -) from metadata.generated.schema.entity.data.table import Table -from metadata.generated.schema.entity.services.connections.database.common.basicAuth import ( - BasicAuth, -) -from metadata.generated.schema.entity.services.connections.database.postgresConnection import ( - PostgresConnection, -) -from metadata.generated.schema.entity.services.databaseService import ( - DatabaseConnection, - DatabaseService, - DatabaseServiceType, -) +from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.metadataIngestion.testSuitePipeline import ( TestSuiteConfigType, - TestSuitePipeline, -) -from metadata.generated.schema.metadataIngestion.workflow import ( - LogLevels, - OpenMetadataWorkflowConfig, - Processor, - Sink, - Source, - 
SourceConfig, - WorkflowConfig, ) from metadata.generated.schema.tests.basic import ( TestCaseResult, @@ -48,10 +25,8 @@ from metadata.generated.schema.tests.basic import ( TestResultValue, ) from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue -from metadata.ingestion.models.custom_pydantic import CustomSecretStr from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.workflow.data_quality import TestSuiteWorkflow -from metadata.workflow.metadata import MetadataWorkflow if not sys.version_info >= (3, 9): pytest.skip( @@ -258,6 +233,11 @@ def test_happy_paths( ingest_mysql_service, patched_metadata, parameters: TestParameters, + sink_config, + profiler_config, + run_workflow, + workflow_config, + cleanup_fqns, ): metadata = patched_metadata table1 = metadata.get_by_name( @@ -265,6 +245,10 @@ def test_happy_paths( f"{postgres_service.fullyQualifiedName.root}.dvdrental.public.customer", nullable=False, ) + cleanup_fqns( + TestCase, + f"{table1.fullyQualifiedName.root}.{parameters.test_case_defintion.name}", + ) table2_service = { "POSTGRES_SERVICE": postgres_service, "MYSQL_SERVICE": ingest_mysql_service, @@ -281,41 +265,30 @@ def test_happy_paths( ), ] ) - - workflow_config = OpenMetadataWorkflowConfig( - source=Source( - type=TestSuiteConfigType.TestSuite.value, - serviceName="MyTestSuite", - sourceConfig=SourceConfig( - config=TestSuitePipeline( - type=TestSuiteConfigType.TestSuite, - entityFullyQualifiedName=f"{table1.fullyQualifiedName.root}", - ) - ), - ), - processor=Processor( - type="orm-test-runner", - config={"testCases": [parameters.test_case_defintion]}, - ), - sink=Sink( - type="metadata-rest", - config={}, - ), - workflowConfig=WorkflowConfig( - loggerLevel=LogLevels.DEBUG, openMetadataServerConfig=metadata.config - ), + workflow_config = { + "source": { + "type": TestSuiteConfigType.TestSuite.value, + "serviceName": "MyTestSuite", + "sourceConfig": { + "config": { + "type": 
TestSuiteConfigType.TestSuite.value, + "entityFullyQualifiedName": table1.fullyQualifiedName.root, + } + }, + }, + "processor": { + "type": "orm-test-runner", + "config": {"testCases": [parameters.test_case_defintion.dict()]}, + }, + "sink": sink_config, + "workflowConfig": workflow_config, + } + run_workflow(TestSuiteWorkflow, workflow_config) + test_case_entity = metadata.get_by_name( + TestCase, + f"{table1.fullyQualifiedName.root}.{parameters.test_case_defintion.name}", + fields=["*"], ) - - test_suite_procesor = TestSuiteWorkflow.create(workflow_config) - test_suite_procesor.execute() - test_suite_procesor.stop() - test_case_entity: TestCase = metadata.get_or_create_test_case( - f"{table1.fullyQualifiedName.root}.{parameters.test_case_defintion.name}" - ) - try: - test_suite_procesor.raise_from_status() - finally: - metadata.delete(TestCase, test_case_entity.id, recursive=True, hard_delete=True) assert "ERROR: Unexpected error" not in test_case_entity.testCaseResult.result assert_equal_pydantic_objects(parameters.expected, test_case_entity.testCaseResult) @@ -393,6 +366,10 @@ def test_error_paths( ingest_mysql_service: DatabaseService, postgres_service: DatabaseService, patched_metadata: OpenMetadata, + sink_config, + workflow_config, + run_workflow, + cleanup_fqns, ): metadata = patched_metadata table1 = metadata.get_by_name( @@ -400,94 +377,37 @@ def test_error_paths( f"{postgres_service.fullyQualifiedName.root}.dvdrental.public.customer", nullable=False, ) + cleanup_fqns(TestCase, f"{table1.fullyQualifiedName.root}.{parameters.name}") for parameter in parameters.parameterValues: if parameter.name == "table2": parameter.value = parameter.value.replace( "POSTGRES_SERVICE", postgres_service.fullyQualifiedName.root ) - workflow_config = OpenMetadataWorkflowConfig( - source=Source( - type=TestSuiteConfigType.TestSuite.value, - serviceName="MyTestSuite", - sourceConfig=SourceConfig( - config=TestSuitePipeline( - type=TestSuiteConfigType.TestSuite, - 
entityFullyQualifiedName=f"{table1.fullyQualifiedName.root}", - ) - ), - serviceConnection=postgres_service.connection, - ), - processor=Processor( - type="orm-test-runner", - config={"testCases": [parameters]}, - ), - sink=Sink( - type="metadata-rest", - config={}, - ), - workflowConfig=WorkflowConfig( - loggerLevel=LogLevels.DEBUG, openMetadataServerConfig=metadata.config - ), - ) - test_suite_procesor = TestSuiteWorkflow.create(workflow_config) - test_suite_procesor.execute() - test_suite_procesor.stop() + workflow_config = { + "source": { + "type": TestSuiteConfigType.TestSuite.value, + "serviceName": "MyTestSuite", + "sourceConfig": { + "config": { + "type": TestSuiteConfigType.TestSuite.value, + "entityFullyQualifiedName": table1.fullyQualifiedName.root, + } + }, + }, + "processor": { + "type": "orm-test-runner", + "config": {"testCases": [parameters.dict()]}, + }, + "sink": sink_config, + "workflowConfig": workflow_config, + } + run_workflow(TestSuiteWorkflow, workflow_config) test_case_entity: TestCase = metadata.get_or_create_test_case( f"{table1.fullyQualifiedName.root}.{parameters.name}" ) - try: - test_suite_procesor.raise_from_status() - finally: - metadata.delete(TestCase, test_case_entity.id, recursive=True, hard_delete=True) assert_equal_pydantic_objects(expected, test_case_entity.testCaseResult) -@pytest.fixture(scope="module") -def postgres_service(metadata, postgres_container): - service = CreateDatabaseServiceRequest( - name="docker_test_postgres_db", - serviceType=DatabaseServiceType.Postgres, - connection=DatabaseConnection( - config=PostgresConnection( - username=postgres_container.username, - authType=BasicAuth(password=postgres_container.password), - hostPort="localhost:" - + postgres_container.get_exposed_port(postgres_container.port), - database="dvdrental", - ) - ), - ) - service_entity = metadata.create_or_update(data=service) - service_entity.connection.config.authType.password = CustomSecretStr( - postgres_container.password - ) - 
yield service_entity - metadata.delete( - DatabaseService, service_entity.id, recursive=True, hard_delete=True - ) - - -@pytest.fixture(scope="module") -def ingest_postgres_metadata(postgres_service, metadata: OpenMetadata): - workflow_config = OpenMetadataWorkflowConfig( - source=Source( - type=postgres_service.connection.config.type.value.lower(), - serviceName=postgres_service.fullyQualifiedName.root, - serviceConnection=postgres_service.connection, - sourceConfig=SourceConfig(config={}), - ), - sink=Sink( - type="metadata-rest", - config={}, - ), - workflowConfig=WorkflowConfig(openMetadataServerConfig=metadata.config), - ) - metadata_ingestion = MetadataWorkflow.create(workflow_config) - metadata_ingestion.execute() - metadata_ingestion.raise_from_status() - return - - def add_changed_tables(connection: Connection): connection.execute("CREATE TABLE customer_200 AS SELECT * FROM customer LIMIT 200;") connection.execute("CREATE TABLE changed_customer AS SELECT * FROM customer;") @@ -573,46 +493,28 @@ def copy_table(source_engine, destination_engine, table_name): @pytest.fixture def patched_metadata(metadata, postgres_service, ingest_mysql_service, monkeypatch): - openmetadata_get_by_name = OpenMetadata.get_by_name + dbs_by_name = { + service.fullyQualifiedName.root: service + for service in [postgres_service, ingest_mysql_service] + } - def get_by_name_override_service_password(self, entity, fqn, *args, **kwargs): - result = openmetadata_get_by_name(self, entity, fqn, *args, **kwargs) - if entity == DatabaseService: - return next( - ( - service - for service in [postgres_service, ingest_mysql_service] - if service.fullyQualifiedName.root == fqn - ), - result, - ) + def override_result_by_fqn(func): + def inner(*args, **kwargs): + result = func(*args, **kwargs) + if result.fullyQualifiedName.root in dbs_by_name: + return dbs_by_name[result.fullyQualifiedName.root] + return result - return result + return inner monkeypatch.setattr( 
"metadata.ingestion.ometa.ometa_api.OpenMetadata.get_by_name", - get_by_name_override_service_password, + override_result_by_fqn(OpenMetadata.get_by_name), ) - openmetadata_get_by_id = OpenMetadata.get_by_id - - def get_by_id_override_service_password(self, entity, entity_id, *args, **kwargs): - result = openmetadata_get_by_id(self, entity, entity_id, *args, **kwargs) - if entity == DatabaseService: - return next( - ( - service - for service in [postgres_service, ingest_mysql_service] - if service.id == entity_id - ), - result, - ) - - return result - monkeypatch.setattr( "metadata.ingestion.ometa.ometa_api.OpenMetadata.get_by_id", - get_by_id_override_service_password, + override_result_by_fqn(OpenMetadata.get_by_id), ) return metadata diff --git a/ingestion/tests/integration/data_quality/test_data_quality.py b/ingestion/tests/integration/data_quality/test_data_quality.py new file mode 100644 index 00000000000..d6cb28b8f88 --- /dev/null +++ b/ingestion/tests/integration/data_quality/test_data_quality.py @@ -0,0 +1,52 @@ +import sys + +import pytest + +from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.entity.services.databaseService import DatabaseService +from metadata.generated.schema.metadataIngestion.testSuitePipeline import ( + TestSuiteConfigType, +) +from metadata.workflow.data_quality import TestSuiteWorkflow + +if not sys.version_info >= (3, 9): + pytest.skip( + "requires python 3.9+ due to incompatibility with testcontainers", + allow_module_level=True, + ) + + +def test_empty_test_suite( + postgres_service: DatabaseService, + run_workflow, + ingest_postgres_metadata, + patch_passwords_for_db_services, + metadata, + sink_config, + workflow_config, + cleanup_fqns, +): + table = metadata.get_by_name( + Table, + f"{postgres_service.fullyQualifiedName.root}.dvdrental.public.customer", + nullable=False, + ) + workflow_config = { + "source": { + "type": TestSuiteConfigType.TestSuite.value, + "serviceName": 
"MyTestSuite", + "sourceConfig": { + "config": { + "type": TestSuiteConfigType.TestSuite.value, + "entityFullyQualifiedName": table.fullyQualifiedName.root, + } + }, + }, + "processor": { + "type": "orm-test-runner", + "config": {"testCases": []}, + }, + "sink": sink_config, + "workflowConfig": workflow_config, + } + run_workflow(TestSuiteWorkflow, workflow_config) diff --git a/ingestion/tests/unit/data_quality/source/test_test_suite.py b/ingestion/tests/unit/data_quality/source/test_test_suite.py index e02e5981321..d909026a4f3 100644 --- a/ingestion/tests/unit/data_quality/source/test_test_suite.py +++ b/ingestion/tests/unit/data_quality/source/test_test_suite.py @@ -38,6 +38,14 @@ MOCK_ENTITY_REFERENCE = EntityReference( }, ["test_case1"], ), + ( + { + "type": "TestSuite", + "entityFullyQualifiedName": "MyTestSuite", + "testCases": [], + }, + [], + ), ], ) def test_source_config(parameters, expected, monkeypatch):