diff --git a/ingestion/src/metadata/data_quality/api/models.py b/ingestion/src/metadata/data_quality/api/models.py
index 5f5800c5a90..50c2eca41f1 100644
--- a/ingestion/src/metadata/data_quality/api/models.py
+++ b/ingestion/src/metadata/data_quality/api/models.py
@@ -36,6 +36,7 @@ class TestCaseDefinition(ConfigModel):
     testDefinitionName: str
     columnName: Optional[str] = None
     parameterValues: Optional[List[TestCaseParameterValue]]
+    computePassedFailedRowCount: Optional[bool] = False
 
 
 class TestSuiteProcessorConfig(ConfigModel):
diff --git a/ingestion/src/metadata/data_quality/processor/test_case_runner.py b/ingestion/src/metadata/data_quality/processor/test_case_runner.py
index aac36cfa112..698da6a2fa3 100644
--- a/ingestion/src/metadata/data_quality/processor/test_case_runner.py
+++ b/ingestion/src/metadata/data_quality/processor/test_case_runner.py
@@ -251,13 +251,14 @@ class TestCaseRunner(Processor):
                 if test_case_to_update.name == test_case.name.__root__
             )
             updated_test_case = self.metadata.patch_test_case_definition(
-                source=test_case,
+                test_case=test_case,
                 entity_link=entity_link.get_entity_link(
                     Table,
                     fqn=table_fqn,
                     column_name=test_case_definition.columnName,
                 ),
                 test_case_parameter_values=test_case_definition.parameterValues,
+                compute_passed_failed_row_count=test_case_definition.computePassedFailedRowCount,
             )
             if updated_test_case:
                 test_cases.pop(indx)
diff --git a/ingestion/src/metadata/data_quality/validations/base_test_handler.py b/ingestion/src/metadata/data_quality/validations/base_test_handler.py
index 067a85f4c3c..ebbe8256ca0 100644
--- a/ingestion/src/metadata/data_quality/validations/base_test_handler.py
+++ b/ingestion/src/metadata/data_quality/validations/base_test_handler.py
@@ -113,7 +113,7 @@ class BaseTestValidator(ABC):
             sampleData=None,
         )
 
-        if (row_count is not None) and (
+        if (row_count is not None and row_count != 0) and (
             # we'll need at least one of these to be not None to compute the other
             (failed_rows is not None)
             or (passed_rows is not None)
diff --git a/ingestion/src/metadata/data_quality/validations/mixins/sqa_validator_mixin.py b/ingestion/src/metadata/data_quality/validations/mixins/sqa_validator_mixin.py
index 19f0dcd7fa6..7227621ba5f 100644
--- a/ingestion/src/metadata/data_quality/validations/mixins/sqa_validator_mixin.py
+++ b/ingestion/src/metadata/data_quality/validations/mixins/sqa_validator_mixin.py
@@ -79,7 +79,10 @@ class SQAValidatorMixin:
 
         if res is None:
             raise ValueError(
-                f"Query on table/column {column.name if column is not None else ''} returned None"
+                f"\nQuery on table/column {column.name if column is not None else ''} returned None. Your table might be empty. "
+                "If you confirmed your table is not empty and are still seeing this message you can:\n"
+                "\t1. check the documentation: https://docs.open-metadata.org/v1.3.x/connectors/ingestion/workflows/data-quality/tests\n"
+                "\t2. reach out to the Collate team for support"
             )
 
         return res
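The two data-quality hunks above target the same failure mode: running a test against an empty table. The validator now skips the passed/failed row computation when `row_count` is 0, and the SQLAlchemy mixin's error message explains that a `None` result usually means an empty table. Below is a minimal sketch, with hypothetical names rather than the project's actual implementation, of why the non-zero guard matters when one count is derived from the other:

```python
from typing import Optional, Tuple


def derive_row_counts(
    row_count: int, passed_rows: Optional[int], failed_rows: Optional[int]
) -> Tuple[int, int, float, float]:
    """Derive the missing count and the percentages from whichever count is known."""
    if failed_rows is None:
        failed_rows = row_count - passed_rows
    elif passed_rows is None:
        passed_rows = row_count - failed_rows
    # With row_count == 0 the divisions below would raise ZeroDivisionError,
    # which is what the added `row_count != 0` check prevents.
    return (
        passed_rows,
        failed_rows,
        passed_rows / row_count * 100,
        failed_rows / row_count * 100,
    )
```

For example, `derive_row_counts(10, 8, None)` yields `(8, 2, 80.0, 20.0)`.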
diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py
index 3e627597d8f..257462e8946 100644
--- a/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py
+++ b/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py
@@ -234,9 +234,10 @@ class OMetaPatchMixin(OMetaPatchMixinBase):
 
     def patch_test_case_definition(
         self,
-        source: TestCase,
+        test_case: TestCase,
         entity_link: str,
         test_case_parameter_values: Optional[List[TestCaseParameterValue]] = None,
+        compute_passed_failed_row_count: Optional[bool] = False,
     ) -> Optional[TestCase]:
         """Given a test case and a test case definition JSON PATCH the test case
 
@@ -245,7 +246,7 @@ class OMetaPatchMixin(OMetaPatchMixinBase):
             test_case_definition: test case definition to add
         """
         source: TestCase = self._fetch_entity_if_exists(
-            entity=TestCase, entity_id=source.id, fields=["testDefinition", "testSuite"]
+            entity=TestCase, entity_id=test_case.id, fields=["testDefinition", "testSuite"]  # type: ignore
         )  # type: ignore
 
         if not source:
@@ -256,6 +257,8 @@ class OMetaPatchMixin(OMetaPatchMixinBase):
         destination.entityLink = EntityLink(__root__=entity_link)
         if test_case_parameter_values:
             destination.parameterValues = test_case_parameter_values
+        if compute_passed_failed_row_count != source.computePassedFailedRowCount:
+            destination.computePassedFailedRowCount = compute_passed_failed_row_count
 
         return self.patch(entity=TestCase, source=source, destination=destination)
 
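A short usage sketch of the renamed and extended `patch_test_case_definition` signature above. The helper name is hypothetical and the import paths are assumed from the ingestion package layout; only the keyword arguments come from the hunk itself:

```python
from typing import Optional

from metadata.generated.schema.tests.testCase import TestCase
from metadata.ingestion.ometa.ometa_api import OpenMetadata


def enable_row_count_tracking(
    metadata: OpenMetadata, test_case: TestCase, entity_link: str
) -> Optional[TestCase]:
    """Patch an existing test case so it also records passed/failed row counts."""
    return metadata.patch_test_case_definition(
        test_case=test_case,  # keyword renamed from `source`
        entity_link=entity_link,
        compute_passed_failed_row_count=True,  # new flag introduced in this diff
    )
```

Note that the mixin only adds `computePassedFailedRowCount` to the JSON PATCH when the requested value differs from the one already stored on the test case.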
diff --git a/ingestion/src/metadata/profiler/orm/converter/base.py b/ingestion/src/metadata/profiler/orm/converter/base.py
index 6fce328af85..b2c23577e2d 100644
--- a/ingestion/src/metadata/profiler/orm/converter/base.py
+++ b/ingestion/src/metadata/profiler/orm/converter/base.py
@@ -127,7 +127,10 @@ def ometa_to_sqa_orm(
         {
             "__tablename__": str(table.name.__root__),
             "__table_args__": {
-                "schema": orm_schema_name,
+                # SQLite does not support schemas
+                "schema": orm_schema_name
+                if table.serviceType != databaseService.DatabaseServiceType.SQLite
+                else None,
                 "extend_existing": True,  # Recreates the table ORM object if it already exists. Useful for testing
                 "quote": check_snowflake_case_sensitive(
                     table.serviceType, table.name.__root__
diff --git a/ingestion/tests/integration/test_suite/test_e2e_workflow.py b/ingestion/tests/integration/test_suite/test_e2e_workflow.py
index e1967eb730b..db05cc15483 100644
--- a/ingestion/tests/integration/test_suite/test_e2e_workflow.py
+++ b/ingestion/tests/integration/test_suite/test_e2e_workflow.py
@@ -71,9 +71,10 @@ test_suite_config = {
                     ],
                 },
                 {
-                    "name": "table_column_name_to_exists",
-                    "testDefinitionName": "tableColumnNameToExist",
-                    "parameterValues": [{"name": "columnName", "value": "id"}],
+                    "name": "table_column_to_be_not_null",
+                    "testDefinitionName": "columnValuesToBeNotNull",
+                    "columnName": "id",
+                    "computePassedFailedRowCount": True,
                 },
             ],
         },
@@ -94,7 +95,16 @@ Base = declarative_base()
 class User(Base):
-    __tablename__ = ("users",)
+    __tablename__ = "users"
+    id = sqa.Column(sqa.Integer, primary_key=True)
+    name = sqa.Column(sqa.String(256))
+    fullname = sqa.Column(sqa.String(256))
+    nickname = sqa.Column(sqa.String(256))
+    age = sqa.Column(sqa.Integer)
+
+
+class EmptyUser(Base):
+    __tablename__ = "empty_users"
     id = sqa.Column(sqa.Integer, primary_key=True)
     name = sqa.Column(sqa.String(256))
     fullname = sqa.Column(sqa.String(256))
     nickname = sqa.Column(sqa.String(256))
     age = sqa.Column(sqa.Integer)
@@ -159,11 +169,25 @@ class TestE2EWorkflow(unittest.TestCase):
                 databaseSchema=database_schema.fullyQualifiedName,
             )
         )
+        cls.metadata.create_or_update(
+            CreateTableRequest(
+                name="empty_users",
+                columns=[
+                    Column(name="id", dataType=DataType.INT),
+                    Column(name="name", dataType=DataType.STRING),
+                    Column(name="fullname", dataType=DataType.STRING),
+                    Column(name="nickname", dataType=DataType.STRING),
+                    Column(name="age", dataType=DataType.INT),
+                ],
+                databaseSchema=database_schema.fullyQualifiedName,
+            )
+        )
 
         engine = sqa.create_engine(f"sqlite:///{cls.sqlite_conn.config.databaseMode}")
         session = Session(bind=engine)
 
         User.__table__.create(bind=engine)
+        EmptyUser.__table__.create(bind=engine)
 
         for _ in range(10):
             data = [
@@ -212,38 +236,64 @@ class TestE2EWorkflow(unittest.TestCase):
 
     def test_e2e_cli_workflow(self):
         """test cli workflow e2e"""
-        workflow = TestSuiteWorkflow.create(test_suite_config)
-        workflow.execute()
-        workflow.raise_from_status()
+        parameters = [
+            {"table_name": "users", "status": "Success"},
+            {"table_name": "empty_users", "status": "Aborted"},
+        ]
 
-        test_case_1 = self.metadata.get_by_name(
-            entity=TestCase,
-            fqn="test_suite_service_test.test_suite_database.test_suite_database_schema.users.my_test_case",
-            fields=["testDefinition", "testSuite"],
-        )
-        test_case_2 = self.metadata.get_by_name(
-            entity=TestCase,
-            fqn="test_suite_service_test.test_suite_database.test_suite_database_schema.users.table_column_name_to_exists",
-            fields=["testDefinition", "testSuite"],
-        )
+        for param in parameters:
+            with self.subTest(param=param):
+                table_name = param["table_name"]
+                status = param["status"]
+                test_suite_config["source"]["sourceConfig"]["config"].update(
+                    {
+                        "entityFullyQualifiedName": f"test_suite_service_test.test_suite_database.test_suite_database_schema.{table_name}"
+                    }
+                )
 
-        assert test_case_1
-        assert test_case_2
+                workflow = TestSuiteWorkflow.create(test_suite_config)
+                workflow.execute()
+                workflow.raise_from_status()
 
-        test_case_result_1 = self.metadata.client.get(
-            "/dataQuality/testCases/test_suite_service_test.test_suite_database.test_suite_database_schema.users.my_test_case/testCaseResult",
-            data={
-                "startTs": int((datetime.now() - timedelta(days=3)).timestamp()),
-                "endTs": int((datetime.now() + timedelta(days=3)).timestamp()),
-            },
-        )
-        test_case_result_2 = self.metadata.client.get(
-            "/dataQuality/testCases/test_suite_service_test.test_suite_database.test_suite_database_schema.users.table_column_name_to_exists/testCaseResult",
-            data={
-                "startTs": int((datetime.now() - timedelta(days=3)).timestamp()),
-                "endTs": int((datetime.now() + timedelta(days=3)).timestamp()),
-            },
-        )
+                test_case_1 = self.metadata.get_by_name(
+                    entity=TestCase,
+                    fqn=f"test_suite_service_test.test_suite_database.test_suite_database_schema.{table_name}.my_test_case",
+                    fields=["testDefinition", "testSuite"],
+                )
+                test_case_2 = self.metadata.get_by_name(
+                    entity=TestCase,
+                    fqn=f"test_suite_service_test.test_suite_database.test_suite_database_schema.{table_name}.id.table_column_to_be_not_null",
+                    fields=["testDefinition", "testSuite"],
+                )
 
-        assert test_case_result_1
-        assert test_case_result_2
+                assert test_case_1
+                assert test_case_2
+
+                test_case_result_1 = self.metadata.client.get(
+                    f"/dataQuality/testCases/test_suite_service_test.test_suite_database.test_suite_database_schema.{table_name}"
+                    ".my_test_case/testCaseResult",
+                    data={
+                        "startTs": int((datetime.now() - timedelta(days=3)).timestamp())
+                        * 1000,
+                        "endTs": int((datetime.now() + timedelta(days=3)).timestamp())
+                        * 1000,
+                    },
+                )
+                test_case_result_2 = self.metadata.client.get(
+                    f"/dataQuality/testCases/test_suite_service_test.test_suite_database.test_suite_database_schema.{table_name}"
+                    ".id.table_column_to_be_not_null/testCaseResult",
+                    data={
+                        "startTs": int((datetime.now() - timedelta(days=3)).timestamp())
+                        * 1000,
+                        "endTs": int((datetime.now() + timedelta(days=3)).timestamp())
+                        * 1000,
+                    },
+                )
+
+                data_test_case_result_1: dict = test_case_result_1.get("data")  # type: ignore
+                data_test_case_result_2: dict = test_case_result_2.get("data")  # type: ignore
+
+                assert data_test_case_result_1
+                assert data_test_case_result_1[0]["testCaseStatus"] == "Success"
+                assert data_test_case_result_2
+                assert data_test_case_result_2[0]["testCaseStatus"] == status
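One detail of the reworked integration test worth calling out: the `startTs`/`endTs` values sent to the `testCaseResult` lookup are now multiplied by 1000, i.e. the time window appears to be expressed in epoch milliseconds rather than seconds. The same computation in isolation:

```python
from datetime import datetime, timedelta

# Epoch timestamps in milliseconds, matching the `* 1000` factor added above.
start_ts = int((datetime.now() - timedelta(days=3)).timestamp()) * 1000
end_ts = int((datetime.now() + timedelta(days=3)).timestamp()) * 1000
```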
diff --git a/openmetadata-docs/content/v1.3.x/connectors/ingestion/workflows/data-quality/tests.md b/openmetadata-docs/content/v1.3.x/connectors/ingestion/workflows/data-quality/tests.md
index 2a7e6f021d5..6333b46c0cb 100644
--- a/openmetadata-docs/content/v1.3.x/connectors/ingestion/workflows/data-quality/tests.md
+++ b/openmetadata-docs/content/v1.3.x/connectors/ingestion/workflows/data-quality/tests.md
@@ -293,11 +293,11 @@ Validate a list of table column name matches an expected set of columns
 ### Table Custom SQL Test
 Write you own SQL test. When writting your query you can use 2 strategies:
 
-- `ROWS` (default): expects the query to be written as `SELECT <column>, <column> FROM <table> WHERE <condition>`. **Note** if your query returns a large amount of rows it might cause an "Out Of Memeory" error. In this case we recoomend you to use the `COUNT` startegy.
+- `ROWS` (default): expects the query to be written as `SELECT <column>, <column> FROM <table> WHERE <condition>`. **Note** if your query returns a large number of rows it might cause an "Out Of Memory" error. In this case we recommend you use the `COUNT` strategy.
 - `COUNT`: expects the query to be written as `SELECT COUNT(<column>) FROM <table> WHERE <condition>`.
 
 **How to use the Threshold Parameter?**
-The threshold allows you to define a limit for which you test should pass or fail - by defaut this number is 0. For example if my custom SQL query test returns 10 rows and my threshold is 5 the test will fail. If I update my threshold to 11 on my next run my test will pass.
+The threshold allows you to define a limit for which your test should pass or fail - by default this number is 0. For example if my custom SQL query test returns 10 rows (or a COUNT value of 10) and my threshold is 5 the test will fail. If I update my threshold to 11 on my next run my test will pass.
 
 **Properties**
 
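To make the `ROWS`/`COUNT` strategies and the threshold behaviour above concrete, here is a sketch of a custom SQL test case entry in the Python dict form the processor config uses elsewhere in this PR. The `tableCustomSQLQuery` definition name and the `sqlExpression`, `strategy` and `threshold` parameter names are assumptions drawn from the surrounding documentation, not something this diff introduces:

```python
# Hypothetical testCases entry: with strategy ROWS and a threshold of 10, the
# test fails once the query returns more than 10 rows (per the text above).
custom_sql_test_case = {
    "name": "users_negative_age_check",
    "testDefinitionName": "tableCustomSQLQuery",
    "parameterValues": [
        {"name": "sqlExpression", "value": "SELECT * FROM users WHERE age < 0"},
        {"name": "strategy", "value": "ROWS"},
        {"name": "threshold", "value": "10"},
    ],
}
```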
@@ -473,6 +473,7 @@ Makes sure that there are no duplicate values in a given column.
   description: test description
   columnName: columnName
   testDefinitionName: columnValuesToBeUnique
+  computePassedFailedRowCount:
   parameterValues:
     - name: columnNames
       value: true
@@ -516,6 +517,7 @@ Validates that there are no null values in the column.
   description: test description
   columnName: columnName
   testDefinitionName: columnValuesToBeNotNull
+  computePassedFailedRowCount:
   parameterValues:
     - name: columnValuesToBeNotNull
       value: true
@@ -569,6 +571,7 @@ The other databases will fall back to the `LIKE` expression
   description: test description
   columnName: columnName
   testDefinitionName: columnValuesToMatchRegex
+  computePassedFailedRowCount:
   parameterValues:
     - name: regex
       value: "%something%"
@@ -622,6 +625,7 @@ The other databases will fall back to the `LIKE` expression
   description: test description
   columnName: columnName
   testDefinitionName: columnValuesToMatchRegex
+  computePassedFailedRowCount:
   parameterValues:
     - name: forbiddenRegex
       value: "%something%"
@@ -661,10 +665,13 @@ Validate values form a set are present in a column.
 **YAML Config**
 
 ```yaml
-testDefinitionName: columnValuesToBeInSet
-parameterValues:
-  - name: allowedValues
-    value: ["forbidden1", "forbidden2"]
+- name: myTestName
+  testDefinitionName: columnValuesToBeInSet
+  columnName: columnName
+  computePassedFailedRowCount:
+  parameterValues:
+    - name: allowedValues
+      value: ["forbidden1", "forbidden2"]
 ```
 
 **JSON Config**
@@ -708,6 +715,7 @@ Validate that there are no values in a column in a set of forbidden values.
   description: test description
   columnName: columnName
   testDefinitionName: columnValuesToBeNotInSet
+  computePassedFailedRowCount:
   parameterValues:
     - name: forbiddenValues
       value: ["forbidden1", "forbidden2"]
@@ -762,6 +770,7 @@ Any of those two need to be informed.
   description: test description
   columnName: columnName
   testDefinitionName: columnValuesToBeBetween
+  computePassedFailedRowCount:
   parameterValues:
     - name: minValue
       value: ["forbidden1", "forbidden2"]
@@ -893,6 +902,7 @@ Any of those two need to be informed.
   description: test description
   columnName: columnName
   testDefinitionName: columnValueLengthsToBeBetween
+  computePassedFailedRowCount:
   parameterValues:
     - name: minLength
       value: 50
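Finally, a minimal sketch of a `testCases` entry that opts into the new flag, matching the shape of `test_suite_config` in the integration test and the YAML reference entries above:

```python
# Mirrors the entry added to the integration test's processor config; the
# computePassedFailedRowCount key is the new configuration knob this diff adds.
test_cases = [
    {
        "name": "table_column_to_be_not_null",
        "testDefinitionName": "columnValuesToBeNotNull",
        "columnName": "id",
        "computePassedFailedRowCount": True,
    }
]
```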