[GEN-2187] fix(data-diff): added MD5 handling for bigquery (#18904)

* fix(data-diff): added nd5 handling for bigquery

- added MD5 handling for bigquery
- use URL instead of Engine because it requires less steps and less prone to failure

* added e2e test for data diff with sampling in bigquery
This commit is contained in:
Imri Paran 2024-12-06 14:21:33 +01:00 committed by GitHub
parent 0c85e0b829
commit e30571cf4e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 77 additions and 11 deletions

View File

@ -25,7 +25,8 @@ from data_diff.diff_tables import DiffResultWrapper
from data_diff.errors import DataDiffMismatchingKeyTypesError
from data_diff.utils import ArithAlphanumeric, CaseInsensitiveDict
from sqlalchemy import Column as SAColumn
from sqlalchemy import create_engine, literal, select
from sqlalchemy import literal, select
from sqlalchemy.engine import make_url
from metadata.data_quality.validations import utils
from metadata.data_quality.validations.base_test_handler import BaseTestValidator
@ -83,9 +84,9 @@ def build_sample_where_clause(
reduced_concat = reduce(
lambda c1, c2: c1.concat(c2), sql_alchemy_columns + [literal(salt)]
)
sqa_dialect = create_engine(
sqa_dialect = make_url(
f"{PythonDialects[table.database_service_type.name].value}://"
).dialect
).get_dialect()
return str(
select()
.filter(
@ -97,7 +98,7 @@ def build_sample_where_clause(
< hex_nounce
)
.whereclause.compile(
dialect=sqa_dialect,
dialect=sqa_dialect(),
compile_kwargs={"literal_binds": True},
)
)

View File

@ -10,15 +10,13 @@
# limitations under the License.
"""
Define Concat function
Define MD5 hashing function
"""
# Keep SQA docs style defining custom constructs
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.functions import FunctionElement
from metadata.profiler.metrics.core import CACHE
from metadata.profiler.orm.registry import Dialects
from metadata.profiler.orm.registry import PythonDialects
from metadata.utils.logger import profiler_logger
logger = profiler_logger()
@ -33,7 +31,12 @@ def _(element, compiler, **kw):
return f"MD5({compiler.process(element.clauses, **kw)})"
@compiles(MD5, Dialects.MSSQL)
@compiles(MD5, PythonDialects.MSSQL.value)
def _(element, compiler, **kw):
# TODO requires separate where clauses for each table
return f"CONVERT(VARCHAR(8), HashBytes('MD5', {compiler.process(element.clauses, **kw)}), 2)"
@compiles(MD5, PythonDialects.BigQuery.value)
def _(element, compiler, **kw):
return f"TO_HEX(MD5(CAST({compiler.process(element.clauses, **kw)} AS STRING)))"

View File

@ -243,6 +243,7 @@ class CliDBBase(TestCase):
self.create_table_and_view()
self.build_config_file()
self.run_command()
self.add_table_profile_config()
table: Table = self.openmetadata.get_by_name(
Table, self.get_data_quality_table(), nullable=False
)
@ -476,3 +477,6 @@ class CliDBBase(TestCase):
def get_system_profile_cases(self) -> List[Tuple[str, List[SystemProfile]]]:
"""Return a list of tuples with the table fqn and the expected system profile"""
return []
def add_table_profile_config(self):
pass

View File

@ -12,9 +12,18 @@
"""
Test Bigquery connector with CLI
"""
import random
from typing import List, Tuple
from metadata.generated.schema.entity.data.table import DmlOperationType, SystemProfile
from metadata.data_quality.api.models import TestCaseDefinition
from metadata.generated.schema.entity.data.table import (
DmlOperationType,
ProfileSampleType,
SystemProfile,
TableProfilerConfig,
)
from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus
from metadata.generated.schema.tests.testCase import TestCaseParameterValue
from metadata.generated.schema.type.basic import Timestamp
from .common.test_cli_db import CliCommonDB
@ -36,7 +45,22 @@ class BigqueryCliTest(CliCommonDB.TestSuite, SQACommonMethods):
"""
insert_data_queries: List[str] = [
"INSERT INTO `open-metadata-beta.exclude_me`.orders (id, order_name) VALUES (1,'XBOX'), (2,'PS');",
(
"INSERT INTO `open-metadata-beta.exclude_me`.orders (id, order_name) VALUES "
+ ",".join(
[
"(" + ",".join(values) + ")"
for values in [
(
str(i),
random.choice(["'PS'", "'XBOX'", "'NINTENDO'", "'SEGA'"]),
)
for i in range(1000)
]
]
)
+ ";"
),
"UPDATE `open-metadata-beta.exclude_me`.orders SET order_name = 'NINTENDO' WHERE id = 2",
]
@ -148,3 +172,37 @@ class BigqueryCliTest(CliCommonDB.TestSuite, SQACommonMethods):
],
)
]
def add_table_profile_config(self):
self.openmetadata.create_or_update_table_profiler_config(
self.get_data_quality_table(),
TableProfilerConfig(
profileSampleType=ProfileSampleType.ROWS,
profileSample=100,
),
)
def get_data_quality_table(self):
return self.fqn_created_table()
def get_test_case_definitions(self) -> List[TestCaseDefinition]:
return [
TestCaseDefinition(
name="bigquery_data_diff",
testDefinitionName="tableDiff",
computePassedFailedRowCount=True,
parameterValues=[
TestCaseParameterValue(
name="table2",
value=self.get_data_quality_table(),
),
TestCaseParameterValue(
name="keyColumns",
value='["id"]',
),
],
)
]
def get_expected_test_case_results(self):
return [TestCaseResult(testCaseStatus=TestCaseStatus.Success, timestamp=0)]