fix(dq): data types for unique columns (#17431)

1. Remove JSON and ARRAY from the supported data types of the unique column test.
2. Migrations.
3. Tests.
Imri Paran 2024-08-19 14:28:42 +02:00 committed by GitHub
parent 14a473e5d3
commit 7508848376
6 changed files with 173 additions and 56 deletions


@@ -107,3 +107,8 @@ WHERE JSON_CONTAINS_PATH(json, 'one', '$.entityId') OR JSON_CONTAINS_PATH(json,
 -- Add entityId and type column to thread_entity table
 ALTER table thread_entity ADD COLUMN entityId VARCHAR(36) GENERATED ALWAYS AS (json ->> '$.entityRef.id');
 ALTER table thread_entity ADD COLUMN entityType VARCHAR(36) GENERATED ALWAYS AS (json ->> '$.entityRef.type');
+
+UPDATE test_definition
+SET json = JSON_SET(json, '$.supportedDataTypes', JSON_ARRAY('NUMBER', 'TINYINT', 'SMALLINT', 'INT', 'BIGINT', 'BYTEINT', 'BYTES', 'FLOAT', 'DOUBLE', 'DECIMAL', 'NUMERIC', 'TIMESTAMP', 'TIMESTAMPZ', 'TIME', 'DATE', 'DATETIME', 'INTERVAL', 'STRING', 'MEDIUMTEXT', 'TEXT', 'CHAR', 'VARCHAR', 'BOOLEAN', 'BINARY', 'VARBINARY', 'BLOB', 'LONGBLOB', 'MEDIUMBLOB', 'MAP', 'STRUCT', 'UNION', 'SET', 'GEOGRAPHY', 'ENUM', 'UUID', 'VARIANT', 'GEOMETRY', 'POINT', 'POLYGON'))
+WHERE name = 'columnValuesToBeUnique';
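
A quick way to sanity-check the MySQL migration is to read the updated definition back and confirm the two removed types are gone. A minimal sketch (not part of the commit), assuming a SQLAlchemy engine and a hypothetical DSN for the OpenMetadata MySQL database:

    # Sanity check for the MySQL migration (illustration only).
    import json
    from sqlalchemy import create_engine, text

    # Hypothetical DSN; adjust to the actual OpenMetadata MySQL instance.
    engine = create_engine("mysql+pymysql://user:password@localhost:3306/openmetadata_db")
    with engine.connect() as conn:
        raw = conn.execute(text(
            "SELECT json ->> '$.supportedDataTypes' FROM test_definition "
            "WHERE name = 'columnValuesToBeUnique'"
        )).scalar_one()
    supported = json.loads(raw)
    assert "JSON" not in supported and "ARRAY" not in supported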


@@ -104,3 +104,11 @@ WHERE jsonb_exists(json, 'entityId') OR jsonb_exists(json, 'entityType');
 -- Add entityId and type column to thread_entity table
 ALTER TABLE thread_entity ADD COLUMN entityId VARCHAR(36) GENERATED ALWAYS AS (json->'entityRef'->>'id') STORED;
 ALTER TABLE thread_entity ADD COLUMN entityType VARCHAR(36) GENERATED ALWAYS AS (json->'entityRef'->>'type') STORED;
+
+UPDATE test_definition
+SET json = jsonb_set(
+    json,
+    '{supportedDataTypes}',
+    '["NUMBER", "TINYINT", "SMALLINT", "INT", "BIGINT", "BYTEINT", "BYTES", "FLOAT", "DOUBLE", "DECIMAL", "NUMERIC", "TIMESTAMP", "TIMESTAMPZ", "TIME", "DATE", "DATETIME", "INTERVAL", "STRING", "MEDIUMTEXT", "TEXT", "CHAR", "VARCHAR", "BOOLEAN", "BINARY", "VARBINARY", "BLOB", "LONGBLOB", "MEDIUMBLOB", "MAP", "STRUCT", "UNION", "SET", "GEOGRAPHY", "ENUM", "UUID", "VARIANT", "GEOMETRY", "POINT", "POLYGON"]'::jsonb
+)
+WHERE name = 'columnValuesToBeUnique';
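
The PostgreSQL variant relies on jsonb_set, which replaces only the supportedDataTypes key and leaves the rest of the document intact. The same sanity check can use the jsonb containment operator; a minimal sketch (not part of the commit) with a hypothetical DSN:

    # Sanity check for the PostgreSQL migration (illustration only).
    from sqlalchemy import create_engine, text

    # Hypothetical DSN; adjust to the actual OpenMetadata Postgres instance.
    engine = create_engine("postgresql+psycopg2://user:password@localhost:5432/openmetadata_db")
    with engine.connect() as conn:
        stale = conn.execute(text(
            "SELECT COUNT(*) FROM test_definition "
            "WHERE name = 'columnValuesToBeUnique' "
            """AND json->'supportedDataTypes' @> '["JSON"]'::jsonb"""
        )).scalar_one()
    assert stale == 0  # JSON (and likewise ARRAY) no longer listed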


@@ -6,6 +6,7 @@ import zipfile
 from subprocess import CalledProcessError

 import pytest
+from sqlalchemy import create_engine
 from testcontainers.postgres import PostgresContainer

 from _openmetadata_testutils.helpers.docker import copy_dir_to_container, try_bind
@@ -67,4 +68,8 @@ def postgres_container(tmp_path_factory):
         raise CalledProcessError(
             returncode=res[0], cmd=res, output=res[1].decode("utf-8")
         )
+    engine = create_engine(container.get_connection_url())
+    engine.execute(
+        "ALTER TABLE customer ADD COLUMN json_field JSONB DEFAULT '{}'::JSONB;"
+    )
     yield container
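
Note that Engine.execute() is the legacy SQLAlchemy 1.x calling style and is removed in SQLAlchemy 2.0; the pinned test dependencies presumably still allow it. A self-contained, 2.0-compatible sketch of the same column setup (not part of the commit; the image tag and table are assumptions):

    from sqlalchemy import create_engine, text
    from testcontainers.postgres import PostgresContainer

    with PostgresContainer("postgres:15") as container:  # image tag is an assumption
        engine = create_engine(container.get_connection_url())
        with engine.begin() as conn:
            conn.execute(text("CREATE TABLE customer (customer_id INT)"))
            conn.execute(text(
                "ALTER TABLE customer ADD COLUMN json_field JSONB DEFAULT '{}'::JSONB;"
            ))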


@@ -477,6 +477,8 @@ def copy_table(source_engine, destination_engine, table_name):
             and destination_engine.dialect.name == "mssql"
         ):
             column_copy = SQAColumn(column.name, sqltypes.DateTime)
+        elif isinstance(column.type, postgresql.json.JSONB):
+            column_copy = SQAColumn(column.name, sqltypes.JSON)
         else:
             column_copy = SQAColumn(column.name, column.type)
         destination_table.append_column(column_copy)
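
The new elif exists because JSONB is a PostgreSQL-only type: emitting it against another dialect would fail, while the generic sqltypes.JSON compiles per dialect. The mapping in isolation, as an illustration (not part of the commit):

    # Why the copy maps Postgres JSONB to a generic JSON type (sketch).
    from sqlalchemy import types as sqltypes
    from sqlalchemy.dialects import postgresql

    def portable_type(column_type):
        """Return a dialect-agnostic type for cross-database table copies."""
        if isinstance(column_type, postgresql.JSONB):
            return sqltypes.JSON()
        return column_type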


@@ -1,8 +1,11 @@
 import sys
+from dataclasses import dataclass
 from typing import List

 import pytest

+from _openmetadata_testutils.pydantic.test_utils import assert_equal_pydantic_objects
+from metadata.data_quality.api.models import TestCaseDefinition
 from metadata.generated.schema.entity.services.databaseService import DatabaseService
 from metadata.generated.schema.metadataIngestion.testSuitePipeline import (
     TestSuiteConfigType,
@@ -129,66 +132,118 @@ def test_data_quality(
 @pytest.fixture()
-def incpompatible_column_type_config(db_service, workflow_config, sink_config):
-    return {
-        "source": {
-            "type": "TestSuite",
-            "serviceName": "MyTestSuite",
-            "sourceConfig": {
-                "config": {
-                    "type": "TestSuite",
-                    "entityFullyQualifiedName": f"{db_service.fullyQualifiedName.root}.dvdrental.public.customer",
-                }
-            },
-        },
-        "processor": {
-            "type": "orm-test-runner",
-            "config": {
-                "testCases": [
-                    {
-                        "name": "incompatible_column_type",
-                        "testDefinitionName": "columnValueMaxToBeBetween",
-                        "columnName": "first_name",
-                        "parameterValues": [
-                            {"name": "minValueForMaxInCol", "value": "0"},
-                            {"name": "maxValueForMaxInCol", "value": "10"},
-                        ],
-                    },
-                    {
-                        "name": "compatible_test",
-                        "testDefinitionName": "columnValueMaxToBeBetween",
-                        "columnName": "customer_id",
-                        "parameterValues": [
-                            {"name": "minValueForMaxInCol", "value": "0"},
-                            {"name": "maxValueForMaxInCol", "value": "10"},
-                        ],
-                    },
-                ]
-            },
-        },
-        "sink": sink_config,
-        "workflowConfig": workflow_config,
-    }
+def get_incompatible_column_type_config(workflow_config, sink_config):
+    def inner(entity_fqn: str, incompatible_test_case: TestCaseDefinition):
+        return {
+            "source": {
+                "type": "TestSuite",
+                "serviceName": "MyTestSuite",
+                "sourceConfig": {
+                    "config": {
+                        "type": "TestSuite",
+                        "entityFullyQualifiedName": entity_fqn,
+                    }
+                },
+            },
+            "processor": {
+                "type": "orm-test-runner",
+                "config": {
+                    "testCases": [
+                        incompatible_test_case.model_dump(),
+                        {
+                            "name": "compatible_test",
+                            "testDefinitionName": "columnValueMaxToBeBetween",
+                            "columnName": "customer_id",
+                            "parameterValues": [
+                                {"name": "minValueForMaxInCol", "value": "0"},
+                                {"name": "maxValueForMaxInCol", "value": "10"},
+                            ],
+                        },
+                    ]
+                },
+            },
+            "sink": sink_config,
+            "workflowConfig": workflow_config,
+        }
+
+    return inner
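
This is the standard pytest factory-as-fixture pattern: the fixture returns a function, so each test builds a config with its own entity FQN and failing test case instead of sharing one hard-coded dictionary. A self-contained illustration (not part of the commit):

    import pytest

    @pytest.fixture()
    def make_config():
        # The fixture yields a builder rather than a finished object.
        def inner(entity_fqn: str):
            return {"entityFullyQualifiedName": entity_fqn}
        return inner

    def test_uses_factory(make_config):
        config = make_config("service.db.schema.table")  # hypothetical FQN
        assert config["entityFullyQualifiedName"].endswith("table")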
+@dataclass
+class IncompatibleTypeParameter:
+    entity_fqn: str
+    test_case: TestCaseDefinition
+    expected_failure: TruncatedStackTraceError
+
+
+@pytest.fixture(
+    params=[
+        IncompatibleTypeParameter(
+            entity_fqn="{database_service}.dvdrental.public.customer",
+            test_case=TestCaseDefinition(
+                name="string_max_between",
+                testDefinitionName="columnValueMaxToBeBetween",
+                columnName="first_name",
+                parameterValues=[
+                    {"name": "minValueForMaxInCol", "value": "0"},
+                    {"name": "maxValueForMaxInCol", "value": "10"},
+                ],
+            ),
+            expected_failure=TruncatedStackTraceError(
+                name="Incompatible Column for Test Case",
+                error="Test case string_max_between of type columnValueMaxToBeBetween "
+                "is not compatible with column first_name of type VARCHAR",
+            ),
+        ),
+        IncompatibleTypeParameter(
+            entity_fqn="{database_service}.dvdrental.public.customer",
+            test_case=TestCaseDefinition(
+                name="unique_json_column",
+                testDefinitionName="columnValuesToBeUnique",
+                columnName="json_field",
+            ),
+            expected_failure=TruncatedStackTraceError(
+                name="Incompatible Column for Test Case",
+                error="Test case unique_json_column of type columnValuesToBeUnique "
+                "is not compatible with column json_field of type JSON",
+            ),
+        ),
+    ],
+    ids=lambda x: x.test_case.name,
+)
+def parameters(request, db_service):
+    request.param.entity_fqn = request.param.entity_fqn.format(
+        database_service=db_service.fullyQualifiedName.root
+    )
+    return request.param
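
The entity_fqn values are templates because fixture params are evaluated at collection time, before db_service exists; the placeholder is filled in later with str.format inside the fixture body. A self-contained illustration of the same trick (not part of the commit):

    import pytest

    @pytest.fixture(params=["{service}.db.schema.customer"])
    def fqn(request):
        # "my_service" stands in for the value another fixture would provide.
        return request.param.format(service="my_service")

    def test_fqn(fqn):
        assert fqn == "my_service.db.schema.customer"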
 def test_incompatible_column_type(
+    parameters: IncompatibleTypeParameter,
+    patch_passwords_for_db_services,
     run_workflow,
     ingestion_config,
-    incpompatible_column_type_config,
+    get_incompatible_column_type_config,
     metadata: OpenMetadata,
     db_service,
+    cleanup_fqns,
 ):
     run_workflow(MetadataWorkflow, ingestion_config)
     test_suite_processor = run_workflow(
-        TestSuiteWorkflow, incpompatible_column_type_config, raise_from_status=False
+        TestSuiteWorkflow,
+        get_incompatible_column_type_config(
+            parameters.entity_fqn, parameters.test_case
+        ),
+        raise_from_status=False,
     )
+    cleanup_fqns(
+        TestCase,
+        f"{parameters.entity_fqn}.{parameters.test_case.columnName}.{parameters.test_case.name}",
+    )
+    assert_equal_pydantic_objects(
+        parameters.expected_failure,
+        test_suite_processor.steps[0].get_status().failures[0],
+    )
-    assert test_suite_processor.steps[0].get_status().failures == [
-        TruncatedStackTraceError(
-            name="Incompatible Column for Test Case",
-            error="Test case incompatible_column_type of type columnValueMaxToBeBetween is not compatible with column first_name of type VARCHAR",
-        )
-    ], "Test case incompatible_column_type should fail"
     assert (
         f"{db_service.fullyQualifiedName.root}.dvdrental.public.customer.customer_id.compatible_test"
         in test_suite_processor.steps[1].get_status().records
     )
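
assert_equal_pydantic_objects comes from _openmetadata_testutils and its exact behavior is not shown in this diff. A minimal stand-in (an assumption, not the real helper) would compare field dumps so pytest prints a readable diff on failure:

    from pydantic import BaseModel

    def assert_equal_pydantic_objects(expected: BaseModel, actual: BaseModel) -> None:
        # Comparing dumps makes pytest render a field-by-field dict diff.
        assert expected.model_dump() == actual.model_dump()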


@@ -1,12 +1,54 @@
 {
-  "name": "columnValuesToBeUnique",
-  "fullyQualifiedName": "columnValuesToBeUnique",
-  "displayName": "Column Values To Be Unique",
-  "description": "This schema defines the test ColumnValuesToBeUnique. Test the values in a column to be unique. ",
-  "entityType": "COLUMN",
-  "testPlatforms": ["OpenMetadata"],
-  "supportedDataTypes": ["NUMBER","TINYINT","SMALLINT","INT","BIGINT","BYTEINT","BYTES","FLOAT","DOUBLE","DECIMAL","NUMERIC","TIMESTAMP","TIMESTAMPZ","TIME","DATE","DATETIME","INTERVAL","STRING","MEDIUMTEXT","TEXT","CHAR","VARCHAR","BOOLEAN","BINARY","VARBINARY","ARRAY","BLOB","LONGBLOB","MEDIUMBLOB","MAP","STRUCT","UNION","SET","GEOGRAPHY","ENUM","JSON","UUID","VARIANT","GEOMETRY","POINT","POLYGON"],
-  "supportsRowLevelPassedFailed": true,
-  "provider": "system",
-  "dataQualityDimension": "Uniqueness"
+  "name": "columnValuesToBeUnique",
+  "fullyQualifiedName": "columnValuesToBeUnique",
+  "displayName": "Column Values To Be Unique",
+  "description": "This schema defines the test ColumnValuesToBeUnique. Test the values in a column to be unique. ",
+  "entityType": "COLUMN",
+  "testPlatforms": [
+    "OpenMetadata"
+  ],
+  "supportedDataTypes": [
+    "NUMBER",
+    "TINYINT",
+    "SMALLINT",
+    "INT",
+    "BIGINT",
+    "BYTEINT",
+    "BYTES",
+    "FLOAT",
+    "DOUBLE",
+    "DECIMAL",
+    "NUMERIC",
+    "TIMESTAMP",
+    "TIMESTAMPZ",
+    "TIME",
+    "DATE",
+    "DATETIME",
+    "INTERVAL",
+    "STRING",
+    "MEDIUMTEXT",
+    "TEXT",
+    "CHAR",
+    "VARCHAR",
+    "BOOLEAN",
+    "BINARY",
+    "VARBINARY",
+    "BLOB",
+    "LONGBLOB",
+    "MEDIUMBLOB",
+    "MAP",
+    "STRUCT",
+    "UNION",
+    "SET",
+    "GEOGRAPHY",
+    "ENUM",
+    "UUID",
+    "VARIANT",
+    "GEOMETRY",
+    "POINT",
+    "POLYGON"
+  ],
+  "supportsRowLevelPassedFailed": true,
+  "provider": "system",
+  "dataQualityDimension": "Uniqueness"
 }
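
A quick guard against regressions (illustration only, not part of the commit; the local path is a hypothetical stand-in for the definition's location in the repo) is to load the definition and assert the two removed types stay gone:

    import json

    with open("columnValuesToBeUnique.json") as f:  # hypothetical local path
        definition = json.load(f)

    assert "JSON" not in definition["supportedDataTypes"]
    assert "ARRAY" not in definition["supportedDataTypes"]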