MINOR[GEN-978]: Fix empty test suites (#16975)

* tests: refactor

refactor tests and consolidate common functionality in integrations.conftest

this enables writing tests more concisely, as demonstrated with postgres and mssql;
more suites will be migrated later. a sketch of the shared fixtures follows.
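
The `run_workflow`, `sink_config`, and `workflow_config` fixtures used throughout the diffs below come from this consolidation but are not all shown in this commit. A minimal sketch of what they might look like, assuming the workflow API (`create` / `execute` / `raise_from_status` / `stop`) used elsewhere in this diff; the real definitions may differ:

```python
# Hypothetical shapes of the consolidated fixtures; illustrative only.
import pytest


@pytest.fixture(scope="session")
def sink_config():
    # Every workflow in the suite writes results back through the REST sink
    return {"type": "metadata-rest", "config": {}}


@pytest.fixture(scope="session")
def workflow_config(metadata):
    # Shared server connection and log level for all workflows
    return {
        "loggerLevel": "DEBUG",
        "openMetadataServerConfig": metadata.config.model_dump(mode="json"),
    }


@pytest.fixture()
def run_workflow():
    # Build a workflow from a plain dict, run it, and fail the test on errors
    def inner(workflow_type, config: dict):
        workflow = workflow_type.create(config)
        workflow.execute()
        workflow.raise_from_status()
        workflow.stop()

    return inner
```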

* format

* removed helpers

* changed scope of fixtures

* added profiler test for mssql

* fixed import in data_quality test

* json safe serialization
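
"json safe serialization" presumably covers the workflow configs being assembled as plain dicts: pydantic models are dumped to JSON-safe values before being embedded, which is why the diffs below pass `parameters.test_case_defintion.dict()` rather than the model itself. A small illustration with pydantic v2 (`TestCaseDef` is a stand-in, not the generated schema class):

```python
from pydantic import BaseModel


class TestCaseDef(BaseModel):  # stand-in model for illustration
    name: str
    testDefinitionName: str


definition = TestCaseDef(name="t1", testDefinitionName="tableRowCountToEqual")

# mode="json" coerces UUIDs, datetimes, enums, etc. to JSON-safe primitives,
# so the dict can be dropped into a workflow config and serialized anywhere
processor_config = {"testCases": [definition.model_dump(mode="json")]}
```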

* format

* set MARS_Connection
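
The MSSQL conftest is not among the files shown here, so as background only: MARS (Multiple Active Result Sets) lets SQL Server keep several pending requests on one connection, which matters when the profiler and test runner interleave queries. A sketch of the ODBC attribute being enabled, assuming pyodbc and the Microsoft ODBC driver:

```python
import pyodbc

# MARS_Connection=yes allows multiple cursors with pending results on one
# connection, avoiding "connection is busy with results for another command"
conn = pyodbc.connect(
    "DRIVER={ODBC Driver 18 for SQL Server};"
    "SERVER=localhost,1433;DATABASE=tempdb;UID=sa;PWD=example(!)Password;"
    "TrustServerCertificate=yes;MARS_Connection=yes"
)
```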

* fix(data-quality): empty test suite

do not raise for empty test suite

* format

* don't need to check length in _get_test_cases_from_test_suite

* fix

* added warning if no test cases are found
Imri Paran committed 2024-07-19 12:12:34 +02:00 (via GitHub)
parent 4c657de5d8
commit d59b83f9d1
8 changed files with 226 additions and 193 deletions


```diff
@@ -56,7 +56,7 @@ class TableAndTests(BaseModel):
     table: Table = Field(None, description="Table being processed by the DQ workflow")
     service_type: str = Field(..., description="Service type the table belongs to")
-    test_cases: Optional[List[TestCase]] = Field(
+    test_cases: List[TestCase] = Field(
         None, description="Test Cases already existing in the Test Suite, if any"
     )
     executable_test_suite: Optional[CreateTestSuiteRequest] = Field(
```


```diff
@@ -14,7 +14,7 @@ This Processor is in charge of executing the test cases
 """
 import traceback
 from copy import deepcopy
-from typing import List, Optional, cast
+from typing import List, Optional
 
 from metadata.data_quality.api.models import (
     TableAndTests,
@@ -90,15 +90,6 @@ class TestCaseRunner(Processor):
             ),
             table_fqn=record.table.fullyQualifiedName.root,
         )
-
-        if not test_cases:
-            return Either(
-                left=StackTraceError(
-                    name="No test Cases",
-                    error=f"No tests cases found for table {record.table.fullyQualifiedName.root}",
-                )
-            )
-
         openmetadata_test_cases = self.filter_for_om_test_cases(test_cases)
         openmetadata_test_cases = self.filter_incompatible_test_cases(
             record.table, openmetadata_test_cases
@@ -111,6 +102,12 @@ class TestCaseRunner(Processor):
             record.table,
         ).get_data_quality_runner()
 
+        logger.debug(
+            f"Found {len(openmetadata_test_cases)} test cases for table {record.table.fullyQualifiedName.root}"
+        )
+        if len(openmetadata_test_cases) == 0:
+            logger.warning("No test cases found for the table")
+
         test_results = [
             test_case_result
             for test_case in openmetadata_test_cases
@@ -120,17 +117,14 @@ class TestCaseRunner(Processor):
         return Either(right=TestCaseResults(test_results=test_results))
 
     def get_test_cases(
-        self, test_cases: Optional[List[TestCase]], test_suite_fqn: str, table_fqn: str
+        self, test_cases: List[TestCase], test_suite_fqn: str, table_fqn: str
     ) -> List[TestCase]:
         """
         Based on the test suite test cases that we already know, pick up
         the rest from the YAML config, compare and create the new ones
         """
         if self.processor_config.testCases is not None:
-            cli_test_cases = self.get_test_case_from_cli_config()  # type: ignore
-            cli_test_cases = cast(
-                List[TestCaseDefinition], cli_test_cases
-            )  # satisfy type checker
+            cli_test_cases = self.get_test_case_from_cli_config()
             return self.compare_and_create_test_cases(
                 cli_test_cases_definitions=cli_test_cases,
                 test_cases=test_cases,
@@ -142,15 +136,13 @@ class TestCaseRunner(Processor):
     def get_test_case_from_cli_config(
         self,
-    ) -> Optional[List[TestCaseDefinition]]:
+    ) -> List[TestCaseDefinition]:
         """Get all the test cases names defined in the CLI config file"""
-        if self.processor_config.testCases is not None:
-            return list(self.processor_config.testCases)
-        return None
+        return list(self.processor_config.testCases or [])
 
     def compare_and_create_test_cases(
         self,
-        cli_test_cases_definitions: Optional[List[TestCaseDefinition]],
+        cli_test_cases_definitions: List[TestCaseDefinition],
         test_cases: List[TestCase],
         table_fqn: str,
         test_suite_fqn: str,
```
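
For context on the hunks above: the runner reports failures by returning `Either(left=StackTraceError(...))` instead of raising, so deleting that early-return block means an empty suite no longer surfaces as a workflow error; it just logs a warning and yields zero results. A minimal sketch of the result type, assuming pydantic models (the real definitions live in the ingestion framework and may differ):

```python
from typing import Generic, Optional, TypeVar

from pydantic import BaseModel

T = TypeVar("T")


class StackTraceError(BaseModel):
    """Failure payload carried on the left side."""

    name: str
    error: str
    stackTrace: Optional[str] = None


class Either(BaseModel, Generic[T]):
    """A step yields right=payload on success or left=error; the framework
    logs lefts as step failures instead of crashing the workflow."""

    left: Optional[StackTraceError] = None
    right: Optional[T] = None
```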


```diff
@@ -80,7 +80,7 @@ class TestSuiteSource(Source):
     def _get_test_cases_from_test_suite(
         self, test_suite: Optional[TestSuite]
-    ) -> Optional[List[TestCase]]:
+    ) -> List[TestCase]:
         """Return test cases if the test suite exists and has them"""
         if test_suite:
             test_cases = self.metadata.list_all_entities(
@@ -94,8 +94,7 @@ class TestSuiteSource(Source):
                     t for t in test_cases if t.name in self.source_config.testCases
                 ]
             return test_cases
-
-        return None
+        return []
 
     def prepare(self):
         """Nothing to prepare"""
```


```diff
@@ -1,9 +1,11 @@
 import logging
 import sys
+from typing import List, Tuple, Type
 
 import pytest
 
 from _openmetadata_testutils.ometa import int_admin_ometa
+from ingestion.src.metadata.ingestion.api.common import Entity
 from metadata.generated.schema.entity.services.databaseService import DatabaseService
 from metadata.generated.schema.metadataIngestion.workflow import LogLevels
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
@@ -171,3 +173,17 @@ def patch_passwords_for_db_services(db_service, unmask_password, monkeypatch):
         "metadata.ingestion.ometa.ometa_api.OpenMetadata.get_by_id",
         override_password(OpenMetadata.get_by_id),
     )
+
+
+@pytest.fixture
+def cleanup_fqns(metadata):
+    fqns: List[Tuple[Type[Entity], str]] = []
+
+    def inner(entity_type: Type[Entity], fqn: str):
+        fqns.append((entity_type, fqn))
+
+    yield inner
+    for etype, fqn in fqns:
+        entity = metadata.get_by_name(etype, fqn, fields=["*"])
+        if entity:
+            metadata.delete(etype, entity.id, recursive=True, hard_delete=True)
```
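
Tests register FQNs up front and the fixture hard-deletes whatever actually got created during teardown, replacing the `try`/`finally` cleanup blocks removed further down. A usage sketch (the FQN is illustrative):

```python
from metadata.generated.schema.tests.testCase import TestCase


def test_creates_a_test_case(metadata, cleanup_fqns):
    # Register before creating: teardown deletes the entity (recursively and
    # hard) even if the assertions below fail, keeping the sandbox clean.
    cleanup_fqns(TestCase, "test_service.dvdrental.public.customer.my_test")
    ...  # run a TestSuiteWorkflow, then assert on the stored result
```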


```diff
@@ -2,7 +2,20 @@ import pytest
 from testcontainers.mysql import MySqlContainer
 
 from _openmetadata_testutils.postgres.conftest import postgres_container, try_bind
-from metadata.generated.schema.entity.services.databaseService import DatabaseService
+from metadata.generated.schema.api.services.createDatabaseService import (
+    CreateDatabaseServiceRequest,
+)
+from metadata.generated.schema.entity.services.connections.database.common.basicAuth import (
+    BasicAuth,
+)
+from metadata.generated.schema.entity.services.connections.database.postgresConnection import (
+    PostgresConnection,
+)
+from metadata.generated.schema.entity.services.databaseService import (
+    DatabaseConnection,
+    DatabaseService,
+    DatabaseServiceType,
+)
 from metadata.generated.schema.metadataIngestion.workflow import LogLevels
 from metadata.ingestion.models.custom_pydantic import CustomSecretStr
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
@@ -63,3 +76,54 @@ def ingest_mysql_service(
     )
     yield db_service
     metadata.delete(DatabaseService, db_service.id, recursive=True, hard_delete=True)
+
+
+@pytest.fixture(scope="module")
+def create_service_request(tmp_path_factory, postgres_container):
+    return CreateDatabaseServiceRequest(
+        name="docker_test_" + tmp_path_factory.mktemp("postgres").name,
+        serviceType=DatabaseServiceType.Postgres,
+        connection=DatabaseConnection(
+            config=PostgresConnection(
+                username=postgres_container.username,
+                authType=BasicAuth(password=postgres_container.password),
+                hostPort="localhost:"
+                + postgres_container.get_exposed_port(postgres_container.port),
+                database="dvdrental",
+            )
+        ),
+    )
+
+
+@pytest.fixture(scope="module")
+def postgres_service(db_service):
+    return db_service
+
+
+@pytest.fixture()
+def ingest_postgres_metadata(
+    postgres_service, metadata: OpenMetadata, sink_config, workflow_config, run_workflow
+):
+    workflow_config = {
+        "source": {
+            "type": postgres_service.connection.config.type.value.lower(),
+            "serviceName": postgres_service.fullyQualifiedName.root,
+            "serviceConnection": postgres_service.connection,
+            "sourceConfig": {"config": {}},
+        },
+        "sink": sink_config,
+        "workflowConfig": workflow_config,
+    }
+    run_workflow(MetadataWorkflow, workflow_config)
+
+
+@pytest.fixture(scope="module")
+def patch_password(postgres_container):
+    def inner(service: DatabaseService):
+        service.connection.config = cast(PostgresConnection, service.connection.config)
+        service.connection.config.authType.password = type(
+            service.connection.config.authType.password
+        )(postgres_container.password)
+        return service
+
+    return inner
```


```diff
@@ -14,33 +14,10 @@ from sqlalchemy.sql import sqltypes
 from _openmetadata_testutils.postgres.conftest import postgres_container
 from _openmetadata_testutils.pydantic.test_utils import assert_equal_pydantic_objects
 from metadata.data_quality.api.models import TestCaseDefinition
-from metadata.generated.schema.api.services.createDatabaseService import (
-    CreateDatabaseServiceRequest,
-)
 from metadata.generated.schema.entity.data.table import Table
-from metadata.generated.schema.entity.services.connections.database.common.basicAuth import (
-    BasicAuth,
-)
-from metadata.generated.schema.entity.services.connections.database.postgresConnection import (
-    PostgresConnection,
-)
-from metadata.generated.schema.entity.services.databaseService import (
-    DatabaseConnection,
-    DatabaseService,
-    DatabaseServiceType,
-)
+from metadata.generated.schema.entity.services.databaseService import DatabaseService
 from metadata.generated.schema.metadataIngestion.testSuitePipeline import (
     TestSuiteConfigType,
-    TestSuitePipeline,
-)
-from metadata.generated.schema.metadataIngestion.workflow import (
-    LogLevels,
-    OpenMetadataWorkflowConfig,
-    Processor,
-    Sink,
-    Source,
-    SourceConfig,
-    WorkflowConfig,
 )
 from metadata.generated.schema.tests.basic import (
     TestCaseResult,
@@ -48,10 +25,8 @@ from metadata.generated.schema.tests.basic import (
     TestResultValue,
 )
 from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue
-from metadata.ingestion.models.custom_pydantic import CustomSecretStr
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.workflow.data_quality import TestSuiteWorkflow
-from metadata.workflow.metadata import MetadataWorkflow
 
 if not sys.version_info >= (3, 9):
     pytest.skip(
@@ -258,6 +233,11 @@ def test_happy_paths(
     ingest_mysql_service,
     patched_metadata,
     parameters: TestParameters,
+    sink_config,
+    profiler_config,
+    run_workflow,
+    workflow_config,
+    cleanup_fqns,
 ):
     metadata = patched_metadata
     table1 = metadata.get_by_name(
@@ -265,6 +245,10 @@ def test_happy_paths(
         f"{postgres_service.fullyQualifiedName.root}.dvdrental.public.customer",
         nullable=False,
     )
+    cleanup_fqns(
+        TestCase,
+        f"{table1.fullyQualifiedName.root}.{parameters.test_case_defintion.name}",
+    )
     table2_service = {
         "POSTGRES_SERVICE": postgres_service,
         "MYSQL_SERVICE": ingest_mysql_service,
@@ -281,41 +265,30 @@ def test_happy_paths(
             ),
         ]
     )
-
-    workflow_config = OpenMetadataWorkflowConfig(
-        source=Source(
-            type=TestSuiteConfigType.TestSuite.value,
-            serviceName="MyTestSuite",
-            sourceConfig=SourceConfig(
-                config=TestSuitePipeline(
-                    type=TestSuiteConfigType.TestSuite,
-                    entityFullyQualifiedName=f"{table1.fullyQualifiedName.root}",
-                )
-            ),
-        ),
-        processor=Processor(
-            type="orm-test-runner",
-            config={"testCases": [parameters.test_case_defintion]},
-        ),
-        sink=Sink(
-            type="metadata-rest",
-            config={},
-        ),
-        workflowConfig=WorkflowConfig(
-            loggerLevel=LogLevels.DEBUG, openMetadataServerConfig=metadata.config
-        ),
-    )
-    test_suite_procesor = TestSuiteWorkflow.create(workflow_config)
-    test_suite_procesor.execute()
-    test_suite_procesor.stop()
-    test_case_entity: TestCase = metadata.get_or_create_test_case(
-        f"{table1.fullyQualifiedName.root}.{parameters.test_case_defintion.name}"
-    )
-    try:
-        test_suite_procesor.raise_from_status()
-    finally:
-        metadata.delete(TestCase, test_case_entity.id, recursive=True, hard_delete=True)
+    workflow_config = {
+        "source": {
+            "type": TestSuiteConfigType.TestSuite.value,
+            "serviceName": "MyTestSuite",
+            "sourceConfig": {
+                "config": {
+                    "type": TestSuiteConfigType.TestSuite.value,
+                    "entityFullyQualifiedName": table1.fullyQualifiedName.root,
+                }
+            },
+        },
+        "processor": {
+            "type": "orm-test-runner",
+            "config": {"testCases": [parameters.test_case_defintion.dict()]},
+        },
+        "sink": sink_config,
+        "workflowConfig": workflow_config,
+    }
+    run_workflow(TestSuiteWorkflow, workflow_config)
+    test_case_entity = metadata.get_by_name(
+        TestCase,
+        f"{table1.fullyQualifiedName.root}.{parameters.test_case_defintion.name}",
+        fields=["*"],
+    )
 
     assert "ERROR: Unexpected error" not in test_case_entity.testCaseResult.result
     assert_equal_pydantic_objects(parameters.expected, test_case_entity.testCaseResult)
@@ -393,6 +366,10 @@ def test_error_paths(
     ingest_mysql_service: DatabaseService,
     postgres_service: DatabaseService,
     patched_metadata: OpenMetadata,
+    sink_config,
+    workflow_config,
+    run_workflow,
+    cleanup_fqns,
 ):
     metadata = patched_metadata
     table1 = metadata.get_by_name(
@@ -400,94 +377,37 @@ def test_error_paths(
         f"{postgres_service.fullyQualifiedName.root}.dvdrental.public.customer",
         nullable=False,
     )
+    cleanup_fqns(TestCase, f"{table1.fullyQualifiedName.root}.{parameters.name}")
     for parameter in parameters.parameterValues:
         if parameter.name == "table2":
             parameter.value = parameter.value.replace(
                 "POSTGRES_SERVICE", postgres_service.fullyQualifiedName.root
             )
-    workflow_config = OpenMetadataWorkflowConfig(
-        source=Source(
-            type=TestSuiteConfigType.TestSuite.value,
-            serviceName="MyTestSuite",
-            sourceConfig=SourceConfig(
-                config=TestSuitePipeline(
-                    type=TestSuiteConfigType.TestSuite,
-                    entityFullyQualifiedName=f"{table1.fullyQualifiedName.root}",
-                )
-            ),
-            serviceConnection=postgres_service.connection,
-        ),
-        processor=Processor(
-            type="orm-test-runner",
-            config={"testCases": [parameters]},
-        ),
-        sink=Sink(
-            type="metadata-rest",
-            config={},
-        ),
-        workflowConfig=WorkflowConfig(
-            loggerLevel=LogLevels.DEBUG, openMetadataServerConfig=metadata.config
-        ),
-    )
-    test_suite_procesor = TestSuiteWorkflow.create(workflow_config)
-    test_suite_procesor.execute()
-    test_suite_procesor.stop()
+    workflow_config = {
+        "source": {
+            "type": TestSuiteConfigType.TestSuite.value,
+            "serviceName": "MyTestSuite",
+            "sourceConfig": {
+                "config": {
+                    "type": TestSuiteConfigType.TestSuite.value,
+                    "entityFullyQualifiedName": table1.fullyQualifiedName.root,
+                }
+            },
+        },
+        "processor": {
+            "type": "orm-test-runner",
+            "config": {"testCases": [parameters.dict()]},
+        },
+        "sink": sink_config,
+        "workflowConfig": workflow_config,
+    }
+    run_workflow(TestSuiteWorkflow, workflow_config)
     test_case_entity: TestCase = metadata.get_or_create_test_case(
         f"{table1.fullyQualifiedName.root}.{parameters.name}"
     )
-    try:
-        test_suite_procesor.raise_from_status()
-    finally:
-        metadata.delete(TestCase, test_case_entity.id, recursive=True, hard_delete=True)
     assert_equal_pydantic_objects(expected, test_case_entity.testCaseResult)
-
-
-@pytest.fixture(scope="module")
-def postgres_service(metadata, postgres_container):
-    service = CreateDatabaseServiceRequest(
-        name="docker_test_postgres_db",
-        serviceType=DatabaseServiceType.Postgres,
-        connection=DatabaseConnection(
-            config=PostgresConnection(
-                username=postgres_container.username,
-                authType=BasicAuth(password=postgres_container.password),
-                hostPort="localhost:"
-                + postgres_container.get_exposed_port(postgres_container.port),
-                database="dvdrental",
-            )
-        ),
-    )
-    service_entity = metadata.create_or_update(data=service)
-    service_entity.connection.config.authType.password = CustomSecretStr(
-        postgres_container.password
-    )
-    yield service_entity
-    metadata.delete(
-        DatabaseService, service_entity.id, recursive=True, hard_delete=True
-    )
-
-
-@pytest.fixture(scope="module")
-def ingest_postgres_metadata(postgres_service, metadata: OpenMetadata):
-    workflow_config = OpenMetadataWorkflowConfig(
-        source=Source(
-            type=postgres_service.connection.config.type.value.lower(),
-            serviceName=postgres_service.fullyQualifiedName.root,
-            serviceConnection=postgres_service.connection,
-            sourceConfig=SourceConfig(config={}),
-        ),
-        sink=Sink(
-            type="metadata-rest",
-            config={},
-        ),
-        workflowConfig=WorkflowConfig(openMetadataServerConfig=metadata.config),
-    )
-    metadata_ingestion = MetadataWorkflow.create(workflow_config)
-    metadata_ingestion.execute()
-    metadata_ingestion.raise_from_status()
-    return
 
 
 def add_changed_tables(connection: Connection):
     connection.execute("CREATE TABLE customer_200 AS SELECT * FROM customer LIMIT 200;")
     connection.execute("CREATE TABLE changed_customer AS SELECT * FROM customer;")
@@ -573,46 +493,28 @@ def copy_table(source_engine, destination_engine, table_name):
 @pytest.fixture
 def patched_metadata(metadata, postgres_service, ingest_mysql_service, monkeypatch):
-    openmetadata_get_by_name = OpenMetadata.get_by_name
-
-    def get_by_name_override_service_password(self, entity, fqn, *args, **kwargs):
-        result = openmetadata_get_by_name(self, entity, fqn, *args, **kwargs)
-        if entity == DatabaseService:
-            return next(
-                (
-                    service
-                    for service in [postgres_service, ingest_mysql_service]
-                    if service.fullyQualifiedName.root == fqn
-                ),
-                result,
-            )
-        return result
+    dbs_by_name = {
+        service.fullyQualifiedName.root: service
+        for service in [postgres_service, ingest_mysql_service]
+    }
+
+    def override_result_by_fqn(func):
+        def inner(*args, **kwargs):
+            result = func(*args, **kwargs)
+            if result.fullyQualifiedName.root in dbs_by_name:
+                return dbs_by_name[result.fullyQualifiedName.root]
+            return result
+
+        return inner
 
     monkeypatch.setattr(
         "metadata.ingestion.ometa.ometa_api.OpenMetadata.get_by_name",
-        get_by_name_override_service_password,
+        override_result_by_fqn(OpenMetadata.get_by_name),
     )
-
-    openmetadata_get_by_id = OpenMetadata.get_by_id
-
-    def get_by_id_override_service_password(self, entity, entity_id, *args, **kwargs):
-        result = openmetadata_get_by_id(self, entity, entity_id, *args, **kwargs)
-        if entity == DatabaseService:
-            return next(
-                (
-                    service
-                    for service in [postgres_service, ingest_mysql_service]
-                    if service.id == entity_id
-                ),
-                result,
-            )
-        return result
-
     monkeypatch.setattr(
         "metadata.ingestion.ometa.ometa_api.OpenMetadata.get_by_id",
-        get_by_id_override_service_password,
+        override_result_by_fqn(OpenMetadata.get_by_id),
     )
+
     return metadata
```
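
The rewritten `patched_metadata` collapses two near-duplicate method overrides into a single decorator keyed by fully qualified name. The pattern in isolation, as a generic sketch (names are illustrative, not from the codebase):

```python
def override_result_by_key(func, replacements, key=lambda result: result.name):
    """Wrap func; results whose key matches a registered stand-in are swapped."""

    def inner(*args, **kwargs):
        result = func(*args, **kwargs)
        return replacements.get(key(result), result)

    return inner
```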


```diff
@@ -0,0 +1,52 @@
+import sys
+
+import pytest
+
+from metadata.generated.schema.entity.data.table import Table
+from metadata.generated.schema.entity.services.databaseService import DatabaseService
+from metadata.generated.schema.metadataIngestion.testSuitePipeline import (
+    TestSuiteConfigType,
+)
+from metadata.workflow.data_quality import TestSuiteWorkflow
+
+if not sys.version_info >= (3, 9):
+    pytest.skip(
+        "requires python 3.9+ due to incompatibility with testcontainers",
+        allow_module_level=True,
+    )
+
+
+def test_empty_test_suite(
+    postgres_service: DatabaseService,
+    run_workflow,
+    ingest_postgres_metadata,
+    patch_passwords_for_db_services,
+    metadata,
+    sink_config,
+    workflow_config,
+    cleanup_fqns,
+):
+    table = metadata.get_by_name(
+        Table,
+        f"{postgres_service.fullyQualifiedName.root}.dvdrental.public.customer",
+        nullable=False,
+    )
+    workflow_config = {
+        "source": {
+            "type": TestSuiteConfigType.TestSuite.value,
+            "serviceName": "MyTestSuite",
+            "sourceConfig": {
+                "config": {
+                    "type": TestSuiteConfigType.TestSuite.value,
+                    "entityFullyQualifiedName": table.fullyQualifiedName.root,
+                }
+            },
+        },
+        "processor": {
+            "type": "orm-test-runner",
+            "config": {"testCases": []},
+        },
+        "sink": sink_config,
+        "workflowConfig": workflow_config,
+    }
+    run_workflow(TestSuiteWorkflow, workflow_config)
```


```diff
@@ -38,6 +38,14 @@ MOCK_ENTITY_REFERENCE = EntityReference(
             },
             ["test_case1"],
         ),
+        (
+            {
+                "type": "TestSuite",
+                "entityFullyQualifiedName": "MyTestSuite",
+                "testCases": [],
+            },
+            [],
+        ),
     ],
 )
 def test_source_config(parameters, expected, monkeypatch):
```