diff --git a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValueLengthsToBeBetween.py b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValueLengthsToBeBetween.py index 3093a392cbc..2473436cd2b 100644 --- a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValueLengthsToBeBetween.py +++ b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValueLengthsToBeBetween.py @@ -12,8 +12,7 @@ """ Validator for column value length to be between test case """ - - +import math from typing import Optional from sqlalchemy import Column, inspect @@ -65,14 +64,16 @@ class ColumnValueLengthsToBeBetweenValidator( NotImplementedError: """ row_count = self._compute_row_count(self.runner, column) + filters = [] + if min_bound > -math.inf: + filters.append((LenFn(column), "lt", min_bound)) + if max_bound < math.inf: + filters.append((LenFn(column), "gt", max_bound)) failed_rows = self._compute_row_count_between( self.runner, column, { - "filters": [ - (LenFn(column), "gt", max_bound), - (LenFn(column), "lt", min_bound), - ], + "filters": filters, "or_filter": True, }, ) diff --git a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeBetween.py b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeBetween.py index 02bd0345650..af8fcc9b5fe 100644 --- a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeBetween.py +++ b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeBetween.py @@ -12,7 +12,7 @@ """ Validator for column values to be between test case """ - +import math from typing import Optional from sqlalchemy import Column, inspect @@ -63,11 +63,16 @@ class ColumnValuesToBeBetweenValidator( NotImplementedError: """ row_count = self._compute_row_count(self.runner, column) + filters = [] + if not isinstance(min_bound, (int, float)) or min_bound > -math.inf: + 
filters.append((column, "lt", min_bound)) + if not isinstance(max_bound, (int, float)) or max_bound < math.inf: + filters.append((column, "gt", max_bound)) failed_rows = self._compute_row_count_between( self.runner, column, { - "filters": [(column, "gt", max_bound), (column, "lt", min_bound)], + "filters": filters, "or_filter": True, }, ) diff --git a/ingestion/tests/integration/conftest.py b/ingestion/tests/integration/conftest.py index 064800139ec..5f41c7bae76 100644 --- a/ingestion/tests/integration/conftest.py +++ b/ingestion/tests/integration/conftest.py @@ -154,6 +154,20 @@ def create_service_request(): @pytest.fixture() def patch_passwords_for_db_services(db_service, unmask_password, monkeypatch): + """Patch the password for all db services returned by the metadata service. + + Usage: + + def test_my_test(db_service, patch_passwords_for_db_services): + ... + + OR + + @pytest.usefixtures("patch_passwords_for_db_services") + def test_my_test(db_service): + ... + """ + def override_password(getter): def inner(*args, **kwargs): result = getter(*args, **kwargs) @@ -187,3 +201,17 @@ def cleanup_fqns(metadata): entity = metadata.get_by_name(etype, fqn, fields=["*"]) if entity: metadata.delete(etype, entity.id, recursive=True, hard_delete=True) + + +@pytest.fixture(scope="module") +def ingestion_config(db_service, metadata, workflow_config, sink_config): + return { + "source": { + "type": db_service.connection.config.type.value.lower(), + "serviceName": db_service.fullyQualifiedName.root, + "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, + "serviceConnection": db_service.connection.model_dump(), + }, + "sink": sink_config, + "workflowConfig": workflow_config, + } diff --git a/ingestion/tests/integration/mysql/__init__.py b/ingestion/tests/integration/mysql/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ingestion/tests/integration/mysql/conftest.py b/ingestion/tests/integration/mysql/conftest.py new file mode 100644 index 
00000000000..1147984f0b2 --- /dev/null +++ b/ingestion/tests/integration/mysql/conftest.py @@ -0,0 +1,73 @@ +import os +from subprocess import CalledProcessError + +import pytest +from sqlalchemy import create_engine +from testcontainers.mysql import MySqlContainer + +from _openmetadata_testutils.helpers.docker import try_bind +from metadata.generated.schema.api.services.createDatabaseService import ( + CreateDatabaseServiceRequest, +) +from metadata.generated.schema.entity.services.databaseService import ( + DatabaseServiceType, +) + + +@pytest.fixture(scope="module") +def mysql_container(tmp_path_factory): + """Start a MySQL container loaded with the datacharmer "employees" test database.""" + test_db_tar_path = os.path.join( + os.path.dirname(__file__), "data", "mysql", "test_db-1.0.7.tar.gz" + ) + container = MySqlContainer(dbname="employees") + with ( + try_bind(container, 3306, 3307) if not os.getenv("CI") else container + ) as container: + docker_container = container.get_wrapped_container() + docker_container.exec_run(["mkdir", "-p", "/data"]) + docker_container.put_archive("/data", open(test_db_tar_path, "rb")) + for command in ( + [ + "sh", + "-c", + f"cd /data/test_db && mysql -uroot -p{container.password} < employees.sql", + ], + [ + "sh", + "-c", + f'mysql -uroot -p{container.password} -e \'GRANT SELECT ON employees.* TO "test"@"%";\'', + ], + ): + res = docker_container.exec_run(command) + if res[0] != 0: + raise CalledProcessError( + returncode=res[0], cmd=res, output=res[1].decode("utf-8") + ) + engine = create_engine(container.get_connection_url()) + engine.execute( + "ALTER TABLE employees ADD COLUMN last_update TIMESTAMP DEFAULT CURRENT_TIMESTAMP" + ) + engine.execute( + "UPDATE employees SET last_update = hire_date + INTERVAL FLOOR(1 + RAND() * 500000) SECOND" + ) + yield container + + +@pytest.fixture(scope="module") +def create_service_request(mysql_container, tmp_path_factory): + return CreateDatabaseServiceRequest.model_validate( + { + "name": "docker_test_" + 
tmp_path_factory.mktemp("mysql").name, + "serviceType": DatabaseServiceType.Mysql.value, + "connection": { + "config": { + "username": mysql_container.username, + "authType": {"password": mysql_container.password}, + "hostPort": "localhost:" + + mysql_container.get_exposed_port(mysql_container.port), + "databaseSchema": mysql_container.dbname, + } + }, + } + ) diff --git a/ingestion/tests/integration/mysql/data/mysql/README.md b/ingestion/tests/integration/mysql/data/mysql/README.md new file mode 100644 index 00000000000..7bd29254d03 --- /dev/null +++ b/ingestion/tests/integration/mysql/data/mysql/README.md @@ -0,0 +1,3 @@ +# MySQL test db + +https://github.com/datacharmer/test_db \ No newline at end of file diff --git a/ingestion/tests/integration/mysql/data/mysql/test_db-1.0.7.tar.gz b/ingestion/tests/integration/mysql/data/mysql/test_db-1.0.7.tar.gz new file mode 100644 index 00000000000..eae1d041ece Binary files /dev/null and b/ingestion/tests/integration/mysql/data/mysql/test_db-1.0.7.tar.gz differ diff --git a/ingestion/tests/integration/mysql/test_data_quality.py b/ingestion/tests/integration/mysql/test_data_quality.py new file mode 100644 index 00000000000..58ff249e824 --- /dev/null +++ b/ingestion/tests/integration/mysql/test_data_quality.py @@ -0,0 +1,222 @@ +import sys +from dataclasses import dataclass +from datetime import datetime +from typing import List + +import pytest + +from _openmetadata_testutils.pydantic.test_utils import assert_equal_pydantic_objects +from metadata.data_quality.api.models import TestCaseDefinition +from metadata.generated.schema.entity.services.databaseService import DatabaseService +from metadata.generated.schema.metadataIngestion.testSuitePipeline import ( + TestSuiteConfigType, + TestSuitePipeline, +) +from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus +from metadata.generated.schema.tests.testCase import TestCase +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from 
metadata.workflow.data_quality import TestSuiteWorkflow +from metadata.workflow.metadata import MetadataWorkflow + +if not sys.version_info >= (3, 9): + pytest.skip("requires python 3.9+", allow_module_level=True) + + +@pytest.fixture() +def get_test_suite_config(workflow_config, sink_config): + def inner(entity_fqn: str, test_case_definitions: List[TestCaseDefinition]): + return { + "source": { + "type": TestSuiteConfigType.TestSuite.value, + "serviceName": "MyTestSuite", + "sourceConfig": { + "config": TestSuitePipeline( + type=TestSuiteConfigType.TestSuite, + entityFullyQualifiedName=entity_fqn, + ) + }, + }, + "processor": { + "type": "orm-test-runner", + "config": { + "testCases": [obj.model_dump() for obj in test_case_definitions] + }, + }, + "sink": sink_config, + "workflowConfig": workflow_config, + } + + return inner + + +@dataclass +class TestColumnParameter: + entity_fqn: str + test_case_definition: TestCaseDefinition + expected_result: TestCaseResult + + +@pytest.fixture( + params=[ + TestColumnParameter( + entity_fqn="{database_service_fqn}.default.employees.employees", + test_case_definition=TestCaseDefinition( + name="first_name_includes_tom_and_jerry_wo_enum", + testDefinitionName="columnValuesToBeInSet", + computePassedFailedRowCount=True, + columnName="first_name", + parameterValues=[ + {"name": "allowedValues", "value": "['Tom', 'Jerry']"} + ], + ), + expected_result=TestCaseResult( + testCaseStatus=TestCaseStatus.Failed, + ), + ), + TestColumnParameter( + entity_fqn="{database_service_fqn}.default.employees.employees", + test_case_definition=TestCaseDefinition( + name="value_lengths_between_3_and_5", + testDefinitionName="columnValueLengthsToBeBetween", + computePassedFailedRowCount=True, + columnName="first_name", + parameterValues=[ + {"name": "minLength", "value": "3"}, + {"name": "maxLength", "value": "5"}, + ], + ), + expected_result=TestCaseResult( + testCaseStatus=TestCaseStatus.Failed, + ), + ), + TestColumnParameter( + 
entity_fqn="{database_service_fqn}.default.employees.employees", + test_case_definition=TestCaseDefinition( + name="value_lengths_at_most_5", + testDefinitionName="columnValueLengthsToBeBetween", + columnName="first_name", + computePassedFailedRowCount=True, + parameterValues=[ + {"name": "maxLength", "value": "5"}, + ], + ), + expected_result=TestCaseResult( + testCaseStatus=TestCaseStatus.Failed, + ), + ), + TestColumnParameter( + entity_fqn="{database_service_fqn}.default.employees.employees", + test_case_definition=TestCaseDefinition( + name="value_lengths_at_least_3", + testDefinitionName="columnValueLengthsToBeBetween", + columnName="first_name", + computePassedFailedRowCount=True, + parameterValues=[ + {"name": "minLength", "value": "3"}, + ], + ), + expected_result=TestCaseResult( + testCaseStatus=TestCaseStatus.Success, + ), + ), + TestColumnParameter( + entity_fqn="{database_service_fqn}.default.employees.employees", + test_case_definition=TestCaseDefinition( + name="id_at_least_0", + testDefinitionName="columnValuesToBeBetween", + columnName="emp_no", + computePassedFailedRowCount=True, + parameterValues=[ + {"name": "minValue", "value": "0"}, + ], + ), + expected_result=TestCaseResult( + testCaseStatus=TestCaseStatus.Success, + ), + ), + TestColumnParameter( + entity_fqn="{database_service_fqn}.default.employees.employees", + test_case_definition=TestCaseDefinition( + name="id_no_bounds", + testDefinitionName="columnValuesToBeBetween", + columnName="emp_no", + computePassedFailedRowCount=True, + parameterValues=[], + ), + expected_result=TestCaseResult( + testCaseStatus=TestCaseStatus.Success, + ), + ), + TestColumnParameter( + entity_fqn="{database_service_fqn}.default.employees.employees", + test_case_definition=TestCaseDefinition( + name="values_between_date", + testDefinitionName="columnValuesToBeBetween", + columnName="hire_date", + computePassedFailedRowCount=True, + parameterValues=[ + { + "name": "minValue", + "value": str(int(datetime(1960, 1, 
1).timestamp())), + }, + ], + ), + expected_result=TestCaseResult( + testCaseStatus=TestCaseStatus.Success, + ), + ), + TestColumnParameter( + entity_fqn="{database_service_fqn}.default.employees.employees", + test_case_definition=TestCaseDefinition( + name="value_between_timestamp", + testDefinitionName="columnValuesToBeBetween", + columnName="last_update", + computePassedFailedRowCount=True, + parameterValues=[ + { + "name": "minValue", + "value": str(int(datetime(2000, 1, 1).timestamp())), + }, + ], + ), + expected_result=TestCaseResult( + testCaseStatus=TestCaseStatus.Failed, + ), + ), + ], + ids=lambda x: x.test_case_definition.name, +) +def parameters(request, db_service): + request.param.entity_fqn = request.param.entity_fqn.format( + database_service_fqn=db_service.fullyQualifiedName.root + ) + return request.param + + +def test_column_test_cases( + patch_passwords_for_db_services, + run_workflow, + ingestion_config, + db_service: DatabaseService, + metadata: OpenMetadata, + parameters: TestColumnParameter, + get_test_suite_config, + cleanup_fqns, +): + run_workflow(MetadataWorkflow, ingestion_config) + test_suite_config = get_test_suite_config( + parameters.entity_fqn, + [parameters.test_case_definition], + ) + run_workflow(TestSuiteWorkflow, test_suite_config) + test_case: TestCase = metadata.get_by_name( + TestCase, + f"{parameters.entity_fqn}.{parameters.test_case_definition.columnName}.{parameters.test_case_definition.name}", + fields=["*"], + nullable=False, + ) + cleanup_fqns(TestCase, test_case.fullyQualifiedName.root) + assert_equal_pydantic_objects( + parameters.expected_result, + test_case.testCaseResult, + ) diff --git a/ingestion/tests/integration/mysql/test_metadata.py b/ingestion/tests/integration/mysql/test_metadata.py new file mode 100644 index 00000000000..dd491df33a5 --- /dev/null +++ b/ingestion/tests/integration/mysql/test_metadata.py @@ -0,0 +1,14 @@ +import sys + +import pytest + +from metadata.workflow.metadata import 
MetadataWorkflow + +if not sys.version_info >= (3, 9): + pytest.skip("requires python 3.9+", allow_module_level=True) + + +def test_ingest_metadata( + patch_passwords_for_db_services, run_workflow, ingestion_config +): + run_workflow(MetadataWorkflow, ingestion_config) diff --git a/ingestion/tests/integration/mysql/test_profiler.py b/ingestion/tests/integration/mysql/test_profiler.py new file mode 100644 index 00000000000..5aa854b0601 --- /dev/null +++ b/ingestion/tests/integration/mysql/test_profiler.py @@ -0,0 +1,18 @@ +import sys + +import pytest + +from metadata.ingestion.lineage.sql_lineage import search_cache +from metadata.workflow.metadata import MetadataWorkflow +from metadata.workflow.profiler import ProfilerWorkflow + +if not sys.version_info >= (3, 9): + pytest.skip("requires python 3.9+", allow_module_level=True) + + +def test_profiler( + patch_passwords_for_db_services, run_workflow, ingestion_config, profiler_config +): + search_cache.clear() + run_workflow(MetadataWorkflow, ingestion_config) + run_workflow(ProfilerWorkflow, profiler_config) diff --git a/ingestion/tests/integration/postgres/conftest.py b/ingestion/tests/integration/postgres/conftest.py index 4ad8dde4d80..3e8ea634d61 100644 --- a/ingestion/tests/integration/postgres/conftest.py +++ b/ingestion/tests/integration/postgres/conftest.py @@ -31,19 +31,3 @@ def create_service_request(postgres_container, tmp_path_factory): ) ), ) - - -@pytest.fixture(scope="module") -def ingestion_config( - db_service, metadata, workflow_config, sink_config, postgres_container -): - return { - "source": { - "type": db_service.connection.config.type.value.lower(), - "serviceName": db_service.fullyQualifiedName.root, - "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, - "serviceConnection": db_service.connection.dict(), - }, - "sink": sink_config, - "workflowConfig": workflow_config, - } diff --git a/ingestion/tests/integration/postgres/test_data_quality.py 
b/ingestion/tests/integration/postgres/test_data_quality.py index 0506fc06987..2cee67c9879 100644 --- a/ingestion/tests/integration/postgres/test_data_quality.py +++ b/ingestion/tests/integration/postgres/test_data_quality.py @@ -82,6 +82,12 @@ def run_data_quality_workflow( {"name": "matchEnum", "value": "True"}, ], }, + { + "name": "id_no_bounds", + "testDefinitionName": "columnValuesToBeBetween", + "columnName": "customer_id", + "parameterValues": [], + }, ], } ), @@ -106,6 +112,7 @@ def run_data_quality_workflow( ("first_name_includes_tom_and_jerry_wo_enum", TestCaseStatus.Success), ("first_name_includes_tom_and_jerry", TestCaseStatus.Success), ("first_name_is_tom_or_jerry", TestCaseStatus.Failed), + ("id_no_bounds", TestCaseStatus.Success), ], ) def test_data_quality( diff --git a/ingestion/tests/integration/sql_server/conftest.py b/ingestion/tests/integration/sql_server/conftest.py index 7fc8917be14..942d2d275bf 100644 --- a/ingestion/tests/integration/sql_server/conftest.py +++ b/ingestion/tests/integration/sql_server/conftest.py @@ -19,6 +19,8 @@ from metadata.generated.schema.entity.services.databaseService import ( DatabaseServiceType, ) +from ..conftest import ingestion_config as base_ingestion_config + @pytest.fixture(scope="module") def mssql_container(tmp_path_factory): @@ -112,22 +114,15 @@ def create_service_request(mssql_container, scheme, tmp_path_factory): @pytest.fixture(scope="module") -def ingestion_config(db_service, tmp_path_factory, workflow_config, sink_config): - return { - "source": { - "type": "mssql", - "serviceName": db_service.fullyQualifiedName.root, - "serviceConnection": db_service.connection.dict(), - "sourceConfig": { - "config": { - "type": "DatabaseMetadata", - "databaseFilterPattern": {"includes": ["TestDB", "AdventureWorks"]}, - }, - }, - }, - "sink": sink_config, - "workflowConfig": workflow_config, +def ingestion_config( + db_service, tmp_path_factory, workflow_config, sink_config, base_ingestion_config +): + 
base_ingestion_config["source"]["sourceConfig"]["config"][ + "databaseFilterPattern" + ] = { + "includes": ["TestDB", "AdventureWorks"], } + return base_ingestion_config @pytest.fixture(scope="module") diff --git a/ingestion/tests/integration/trino/conftest.py b/ingestion/tests/integration/trino/conftest.py index d9db51081aa..32dbd225b02 100644 --- a/ingestion/tests/integration/trino/conftest.py +++ b/ingestion/tests/integration/trino/conftest.py @@ -24,6 +24,8 @@ from metadata.generated.schema.entity.services.databaseService import ( DatabaseServiceType, ) +from ..conftest import ingestion_config as base_ingestion_config + class TrinoContainer(DbContainer): def __init__( @@ -202,26 +204,13 @@ def create_service_request(trino_container, tmp_path_factory): @pytest.fixture -def ingestion_config(db_service, sink_config, workflow_config): - return { - "source": { - "type": db_service.connection.config.type.value.lower(), - "serviceName": db_service.fullyQualifiedName.root, - "serviceConnection": db_service.connection.dict(), - "sourceConfig": { - "config": { - "type": "DatabaseMetadata", - "schemaFilterPattern": { - "excludes": [ - "^information_schema$", - ], - }, - }, - }, - }, - "sink": sink_config, - "workflowConfig": workflow_config, +def ingestion_config(db_service, sink_config, workflow_config, base_ingestion_config): + base_ingestion_config["source"]["sourceConfig"]["config"]["schemaFilterPattern"] = { + "excludes": [ + "^information_schema$", + ], } + return base_ingestion_config @pytest.fixture(scope="module") diff --git a/openmetadata-docs/content/v1.3.x/connectors/ingestion/workflows/data-quality/tests.md b/openmetadata-docs/content/v1.3.x/connectors/ingestion/workflows/data-quality/tests.md index 6333b46c0cb..f1ccd50ef05 100644 --- a/openmetadata-docs/content/v1.3.x/connectors/ingestion/workflows/data-quality/tests.md +++ b/openmetadata-docs/content/v1.3.x/connectors/ingestion/workflows/data-quality/tests.md @@ -671,7 +671,7 @@ Validate values form a set 
are present in a column. computePassedFailedRowCount: parameterValues: - name: allowedValues - value: ["forbidden1", "forbidden2"] + value: '["allowed1","allowed2"]' ``` **JSON Config** @@ -685,10 +685,7 @@ Validate values form a set are present in a column. "parameterValues": [ { "name": "allowedValues", - "value": [ - "forbidden1", - "forbidden2" - ] + "value": "[\"allowed1\",\"allowed2\"]" } ] } @@ -718,7 +715,7 @@ Validate that there are no values in a column in a set of forbidden values. computePassedFailedRowCount: parameterValues: - name: forbiddenValues - value: ["forbidden1", "forbidden2"] + value: '["forbidden1", "forbidden2"]' ``` **JSON Config** @@ -743,14 +740,14 @@ Validate that there are no values in a column in a set of forbidden values. ### Column Values to Be Between Validate that the values of a column are within a given range. -> Only supports numerical types. +> For date types, the range should be in an epoch seconds or milliseconds format. **Properties** * `minValue`: Lower bound of the interval. If informed, the column values should be bigger than this number. * `maxValue`: Upper bound of the interval. If informed, the column values should be lower than this number. -Any of those two need to be informed. +Zero, one or both of those two need to be informed. **Behavior** @@ -773,7 +770,18 @@ Any of those two need to be informed. computePassedFailedRowCount: parameterValues: - name: minValue - value: ["forbidden1", "forbidden2"] + value: '10' +``` + +```yaml +- name: myTestName + description: test description + columnName: dateColumn + testDefinitionName: columnValuesToBeBetween + computePassedFailedRowCount: + parameterValues: + - name: minValue + value: '1704067200' # 2024-01-01 ``` **JSON Config** @@ -787,10 +795,7 @@ Any of those two need to be informed. "parameterValues": [ { "name": "minValue", - "value": [ - "forbidden1", - "forbidden2" - ] + "value": "10" } ] }