From e30571cf4e81f7b2c71a559053aebf4c150c2c9a Mon Sep 17 00:00:00 2001 From: Imri Paran Date: Fri, 6 Dec 2024 14:21:33 +0100 Subject: [PATCH] [GEN-2187] fix(data-diff): added MD5 handling for bigquery (#18904) * fix(data-diff): added nd5 handling for bigquery - added MD5 handling for bigquery - use URL instead of Engine because it requires less steps and less prone to failure * added e2e test for data diff with sampling in bigquery --- .../validations/table/sqlalchemy/tableDiff.py | 9 +-- .../metadata/profiler/orm/functions/md5.py | 13 ++-- ingestion/tests/cli_e2e/base/test_cli_db.py | 4 ++ ingestion/tests/cli_e2e/test_cli_bigquery.py | 62 ++++++++++++++++++- 4 files changed, 77 insertions(+), 11 deletions(-) diff --git a/ingestion/src/metadata/data_quality/validations/table/sqlalchemy/tableDiff.py b/ingestion/src/metadata/data_quality/validations/table/sqlalchemy/tableDiff.py index e5cba847550..7d2b2131554 100644 --- a/ingestion/src/metadata/data_quality/validations/table/sqlalchemy/tableDiff.py +++ b/ingestion/src/metadata/data_quality/validations/table/sqlalchemy/tableDiff.py @@ -25,7 +25,8 @@ from data_diff.diff_tables import DiffResultWrapper from data_diff.errors import DataDiffMismatchingKeyTypesError from data_diff.utils import ArithAlphanumeric, CaseInsensitiveDict from sqlalchemy import Column as SAColumn -from sqlalchemy import create_engine, literal, select +from sqlalchemy import literal, select +from sqlalchemy.engine import make_url from metadata.data_quality.validations import utils from metadata.data_quality.validations.base_test_handler import BaseTestValidator @@ -83,9 +84,9 @@ def build_sample_where_clause( reduced_concat = reduce( lambda c1, c2: c1.concat(c2), sql_alchemy_columns + [literal(salt)] ) - sqa_dialect = create_engine( + sqa_dialect = make_url( f"{PythonDialects[table.database_service_type.name].value}://" - ).dialect + ).get_dialect() return str( select() .filter( @@ -97,7 +98,7 @@ def build_sample_where_clause( < hex_nounce ) .whereclause.compile( - dialect=sqa_dialect, + dialect=sqa_dialect(), compile_kwargs={"literal_binds": True}, ) ) diff --git a/ingestion/src/metadata/profiler/orm/functions/md5.py b/ingestion/src/metadata/profiler/orm/functions/md5.py index 035326cb17d..61666450c36 100644 --- a/ingestion/src/metadata/profiler/orm/functions/md5.py +++ b/ingestion/src/metadata/profiler/orm/functions/md5.py @@ -10,15 +10,13 @@ # limitations under the License. """ -Define Concat function +Define MD5 hashing function """ -# Keep SQA docs style defining custom constructs - from sqlalchemy.ext.compiler import compiles from sqlalchemy.sql.functions import FunctionElement from metadata.profiler.metrics.core import CACHE -from metadata.profiler.orm.registry import Dialects +from metadata.profiler.orm.registry import PythonDialects from metadata.utils.logger import profiler_logger logger = profiler_logger() @@ -33,7 +31,12 @@ def _(element, compiler, **kw): return f"MD5({compiler.process(element.clauses, **kw)})" -@compiles(MD5, Dialects.MSSQL) +@compiles(MD5, PythonDialects.MSSQL.value) def _(element, compiler, **kw): # TODO requires separate where clauses for each table return f"CONVERT(VARCHAR(8), HashBytes('MD5', {compiler.process(element.clauses, **kw)}), 2)" + + +@compiles(MD5, PythonDialects.BigQuery.value) +def _(element, compiler, **kw): + return f"TO_HEX(MD5(CAST({compiler.process(element.clauses, **kw)} AS STRING)))" diff --git a/ingestion/tests/cli_e2e/base/test_cli_db.py b/ingestion/tests/cli_e2e/base/test_cli_db.py index 838d40536d6..b2e29e7b8f8 100644 --- a/ingestion/tests/cli_e2e/base/test_cli_db.py +++ b/ingestion/tests/cli_e2e/base/test_cli_db.py @@ -243,6 +243,7 @@ class CliDBBase(TestCase): self.create_table_and_view() self.build_config_file() self.run_command() + self.add_table_profile_config() table: Table = self.openmetadata.get_by_name( Table, self.get_data_quality_table(), nullable=False ) @@ -476,3 +477,6 @@ class CliDBBase(TestCase): def get_system_profile_cases(self) -> List[Tuple[str, List[SystemProfile]]]: """Return a list of tuples with the table fqn and the expected system profile""" return [] + + def add_table_profile_config(self): + pass diff --git a/ingestion/tests/cli_e2e/test_cli_bigquery.py b/ingestion/tests/cli_e2e/test_cli_bigquery.py index 8ed4f4c7891..23bad416a5a 100644 --- a/ingestion/tests/cli_e2e/test_cli_bigquery.py +++ b/ingestion/tests/cli_e2e/test_cli_bigquery.py @@ -12,9 +12,18 @@ """ Test Bigquery connector with CLI """ +import random from typing import List, Tuple -from metadata.generated.schema.entity.data.table import DmlOperationType, SystemProfile +from metadata.data_quality.api.models import TestCaseDefinition +from metadata.generated.schema.entity.data.table import ( + DmlOperationType, + ProfileSampleType, + SystemProfile, + TableProfilerConfig, +) +from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus +from metadata.generated.schema.tests.testCase import TestCaseParameterValue from metadata.generated.schema.type.basic import Timestamp from .common.test_cli_db import CliCommonDB @@ -36,7 +45,22 @@ class BigqueryCliTest(CliCommonDB.TestSuite, SQACommonMethods): """ insert_data_queries: List[str] = [ - "INSERT INTO `open-metadata-beta.exclude_me`.orders (id, order_name) VALUES (1,'XBOX'), (2,'PS');", + ( + "INSERT INTO `open-metadata-beta.exclude_me`.orders (id, order_name) VALUES " + + ",".join( + [ + "(" + ",".join(values) + ")" + for values in [ + ( + str(i), + random.choice(["'PS'", "'XBOX'", "'NINTENDO'", "'SEGA'"]), + ) + for i in range(1000) + ] + ] + ) + + ";" + ), "UPDATE `open-metadata-beta.exclude_me`.orders SET order_name = 'NINTENDO' WHERE id = 2", ] @@ -148,3 +172,37 @@ class BigqueryCliTest(CliCommonDB.TestSuite, SQACommonMethods): ], ) ] + + def add_table_profile_config(self): + self.openmetadata.create_or_update_table_profiler_config( + self.get_data_quality_table(), + TableProfilerConfig( + profileSampleType=ProfileSampleType.ROWS, + profileSample=100, + ), + ) + + def get_data_quality_table(self): + return self.fqn_created_table() + + def get_test_case_definitions(self) -> List[TestCaseDefinition]: + return [ + TestCaseDefinition( + name="bigquery_data_diff", + testDefinitionName="tableDiff", + computePassedFailedRowCount=True, + parameterValues=[ + TestCaseParameterValue( + name="table2", + value=self.get_data_quality_table(), + ), + TestCaseParameterValue( + name="keyColumns", + value='["id"]', + ), + ], + ) + ] + + def get_expected_test_case_results(self): + return [TestCaseResult(testCaseStatus=TestCaseStatus.Success, timestamp=0)]