MINOR: fix table profiler on empty tables in trino (#17471)

* fix(profiler): trino

Coalesce the row count to 0 when the result is NULL, since NULL is what Trino returns for aggregates over empty tables (see the sketch below).

* fixed test_metadata.py
Imri Paran 2024-08-20 10:42:10 +02:00 committed by GitHub
parent 8ad3db243b
commit 5da7bb049c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 225 additions and 41 deletions
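
For context, and not part of the commit itself: in Trino, as in standard SQL, an aggregate over zero rows evaluates to NULL rather than 0, so the profiler read back None as the row count of empty tables. A minimal sketch of the failure mode and the fix; SQLite stands in for Trino here only because it shares the NULL-aggregate semantics:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE empty_table (id INTEGER)")

# An aggregate over an empty table is NULL, which arrives in Python as None
print(conn.execute("SELECT SUM(id) FROM empty_table").fetchone()[0])  # None

# Wrapping the aggregate in COALESCE, as the diff below does for the Trino
# SumFn, makes empty tables report 0 instead
print(conn.execute("SELECT COALESCE(SUM(id), 0) FROM empty_table").fetchone()[0])  # 0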

@@ -43,7 +43,6 @@ def assert_equal_pydantic_objects(
     """
     errors = []
     queue = deque([(expected, actual, "")])
-
     while queue:
         expected, actual, current_key_prefix = queue.popleft()
         if not isinstance(expected, actual.__class__):
@@ -52,11 +51,13 @@ def assert_equal_pydantic_objects(
                 f"expected: [{type(expected).__name__}], actual: [{type(actual).__name__}]"
             )
             continue
-        if issubclass(expected.__class__, BaseModel):
-            for key, expected_value in expected.dict().items():
+        if issubclass(expected.__class__, BaseModel) and isinstance(
+            expected.model_dump(), dict
+        ):
+            for key, expected_value in expected.model_dump().items():
                 if expected_value is None and ignore_none:
                     continue
-                actual_value = actual.dict().get(key)
+                actual_value = actual.model_dump().get(key)
                 new_key_prefix = (
                     f"{current_key_prefix}.{key}" if current_key_prefix else key
                 )

@@ -45,7 +45,7 @@ def _(element, compiler, **kw):
 def _(element, compiler, **kw):
     """Cast to DECIMAL to address cannot cast nan to bigint"""
     proc = compiler.process(element.clauses, **kw)
-    return f"SUM(TRY_CAST({proc} AS DECIMAL))"
+    return f"COALESCE(SUM(CAST({proc} AS DECIMAL)),0)"


 @compiles(SumFn, Dialects.BigQuery)
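
The hunk above works through SQLAlchemy's compiler extension: the same function element can be rendered differently per dialect by stacking @compiles registrations. A self-contained sketch of the mechanism; CoalescedSum is an illustrative stand-in, not the project's SumFn:

from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.functions import FunctionElement


class CoalescedSum(FunctionElement):
    """Illustrative aggregate element; the real code compiles OpenMetadata's SumFn."""

    name = "coalesced_sum"
    inherit_cache = True


@compiles(CoalescedSum)
def _default_sum(element, compiler, **kw):
    # Default rendering used by any dialect without a specific override
    proc = compiler.process(element.clauses, **kw)
    return f"SUM({proc})"


@compiles(CoalescedSum, "trino")
def _trino_sum(element, compiler, **kw):
    # Dialect-specific override, mirroring the change in the hunk above
    proc = compiler.process(element.clauses, **kw)
    return f"COALESCE(SUM(CAST({proc} AS DECIMAL)),0)"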

@@ -59,7 +59,7 @@ def workflow_config(metadata):
     }


-@pytest.fixture()
+@pytest.fixture(scope="module")
 def profiler_config(db_service, workflow_config, sink_config):
     return {
         "source": {
@@ -82,7 +82,7 @@ def profiler_config(db_service, workflow_config, sink_config):
     }


-@pytest.fixture()
+@pytest.fixture(scope="module")
 def run_workflow():
     def _run(workflow_type: Type[IngestionWorkflow], config, raise_from_status=True):
         workflow: IngestionWorkflow = workflow_type.create(config)
@@ -152,8 +152,14 @@ def create_service_request():
     raise NotImplementedError("Implement in the test module")


-@pytest.fixture()
-def patch_passwords_for_db_services(db_service, unmask_password, monkeypatch):
+@pytest.fixture(scope="module")
+def monkeymodule():
+    with pytest.MonkeyPatch.context() as mp:
+        yield mp
+
+
+@pytest.fixture(scope="module")
+def patch_passwords_for_db_services(db_service, unmask_password, monkeymodule):
     """Patch the password for all db services returned by the metadata service.

     Usage:
@@ -178,12 +184,12 @@ def patch_passwords_for_db_services(db_service, unmask_password, monkeypatch):

         return inner

-    monkeypatch.setattr(
+    monkeymodule.setattr(
         "metadata.ingestion.ometa.ometa_api.OpenMetadata.get_by_name",
         override_password(OpenMetadata.get_by_name),
     )
-    monkeypatch.setattr(
+    monkeymodule.setattr(
         "metadata.ingestion.ometa.ometa_api.OpenMetadata.get_by_id",
         override_password(OpenMetadata.get_by_id),
     )
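
The new monkeymodule fixture exists because pytest's built-in monkeypatch fixture is function-scoped: requesting it from a module-scoped fixture fails with a ScopeMismatch error. pytest.MonkeyPatch.context() (pytest >= 6.2) exposes the same patching API at any scope. The pattern in isolation, with an illustrative consumer:

import pytest


@pytest.fixture(scope="module")
def monkeymodule():
    # Patches applied through mp are undone when the context exits,
    # i.e. once per module instead of once per test
    with pytest.MonkeyPatch.context() as mp:
        yield mp


@pytest.fixture(scope="module")
def patched_setting(monkeymodule):
    # Hypothetical usage; the env var name is illustrative
    monkeymodule.setenv("EXAMPLE_FLAG", "1")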

@@ -42,7 +42,14 @@ def usage_config(sink_config, workflow_config, db_service):
     }


-def test_usage(run_workflow, ingestion_config, usage_config, metadata, db_service):
+def test_usage(
+    patch_passwords_for_db_services,
+    run_workflow,
+    ingestion_config,
+    usage_config,
+    metadata,
+    db_service,
+):
     search_cache.clear()
     run_workflow(MetadataWorkflow, ingestion_config)
     run_workflow(UsageWorkflow, usage_config)
@@ -54,7 +61,12 @@ def test_usage(run_workflow, ingestion_config, usage_config, metadata, db_service):
     strict=True,
 )
 def test_usage_delete_usage(
-    run_workflow, ingestion_config, usage_config, metadata, db_service
+    patch_passwords_for_db_services,
+    run_workflow,
+    ingestion_config,
+    usage_config,
+    metadata,
+    db_service,
 ):
     search_cache.clear()
     run_workflow(MetadataWorkflow, ingestion_config)

@@ -1,10 +1,13 @@
 import os.path
 import random
+from time import sleep

 import docker
+import pandas as pd
 import pytest
 import testcontainers.core.network
-from sqlalchemy import create_engine
+from sqlalchemy import create_engine, insert
+from sqlalchemy.engine import make_url
 from tenacity import retry, stop_after_delay, wait_fixed
 from testcontainers.core.container import DockerContainer
 from testcontainers.core.generic import DbContainer
@@ -121,15 +124,19 @@ def docker_network():

 @pytest.fixture(scope="session")
 def trino_container(hive_metastore_container, minio_container, docker_network):
-    with TrinoContainer(image="trinodb/trino:418").with_network(
-        docker_network
-    ).with_env(
-        "HIVE_METASTORE_URI",
-        f"thrift://metastore:{hive_metastore_container.port}",
-    ).with_env(
-        "MINIO_ENDPOINT",
-        f"http://minio:{minio_container.port}",
-    ) as trino:
+    container = (
+        TrinoContainer(image="trinodb/trino:418")
+        .with_network(docker_network)
+        .with_env(
+            "HIVE_METASTORE_URI",
+            f"thrift://metastore:{hive_metastore_container.port}",
+        )
+        .with_env(
+            "MINIO_ENDPOINT",
+            f"http://minio:{minio_container.port}",
+        )
+    )
+    with try_bind(container, container.port, container.port + 1) as trino:
         yield trino
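
try_bind is a project helper whose definition is not shown in this extract; judging from the call site it binds the container's port to a preferred host port, presumably falling back when that port is taken. A speculative sketch under that assumption (only the signature comes from the diff, the body is guessed):

import socket
from contextlib import contextmanager


@contextmanager
def try_bind(container, container_port, host_port):
    # Guessed behavior: expose container_port on host_port when it is free,
    # otherwise pass None so Docker picks a random free host port
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        port_taken = sock.connect_ex(("127.0.0.1", host_port)) == 0
    container.with_bind_ports(container_port, None if port_taken else host_port)
    with container as started:
        yield started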
@@ -177,15 +184,51 @@ def minio_container(docker_network):

 @pytest.fixture(scope="session")
 def create_test_data(trino_container):
-    engine = create_engine(trino_container.get_connection_url())
+    engine = create_engine(
+        make_url(trino_container.get_connection_url()).set(database="minio")
+    )
     engine.execute(
         "create schema minio.my_schema WITH (location = 's3a://hive-warehouse/')"
     )
-    engine.execute("create table minio.my_schema.test_table (id int)")
-    engine.execute("insert into minio.my_schema.test_table values (1), (2), (3)")
+    data_dir = os.path.dirname(__file__) + "/data"
+    for file in os.listdir(data_dir):
+        df = pd.read_parquet(f"{data_dir}/{file}")
+        for col in df.columns:
+            if pd.api.types.is_datetime64tz_dtype(df[col]):
+                df[col] = df[col].dt.tz_convert(None)
+        df.to_sql(
+            file.replace(".parquet", ""),
+            engine,
+            schema="my_schema",
+            if_exists="fail",
+            index=False,
+            method=custom_insert,
+        )
+        sleep(1)
     return
+
+
+def custom_insert(self, conn, keys: list[str], data_iter):
+    """
+    Hack pandas.io.sql.SQLTable._execute_insert_multi to retry until rows are inserted.
+    This is required because using trino with pd.to_sql in our setup is unreliable.
+    """
+    rowcount = 0
+    max_tries = 10
+    try_num = 0
+    data = [dict(zip(keys, row)) for row in data_iter]
+    while rowcount != len(data):
+        if try_num >= max_tries:
+            raise RuntimeError(f"Failed to insert data after {max_tries} tries")
+        try_num += 1
+        stmt = insert(self.table).values(data)
+        conn.execute(stmt)
+        rowcount = conn.execute(
+            "SELECT COUNT(*) FROM " + f'"{self.schema}"."{self.name}"'
+        ).scalar()
+    return rowcount


 @pytest.fixture(scope="module")
 def create_service_request(trino_container, tmp_path_factory):
     return CreateDatabaseServiceRequest(
@@ -203,7 +246,7 @@ def create_service_request(trino_container, tmp_path_factory):
     )


-@pytest.fixture
+@pytest.fixture(scope="module")
 def ingestion_config(db_service, sink_config, workflow_config, base_ingestion_config):
     base_ingestion_config["source"]["sourceConfig"]["config"]["schemaFilterPattern"] = {
         "excludes": [

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

@ -1,19 +1,27 @@
import pytest
from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.data.table import Table
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.workflow.metadata import MetadataWorkflow from metadata.workflow.metadata import MetadataWorkflow
def test_metadata( @pytest.fixture(scope="module")
db_service, metadata: OpenMetadata, run_workflow, ingestion_config, create_test_data def run_workflow(run_workflow, ingestion_config, create_test_data):
):
run_workflow(MetadataWorkflow, ingestion_config) run_workflow(MetadataWorkflow, ingestion_config)
tables = metadata.list_entities(
Table,
params={ @pytest.mark.parametrize(
"databaseSchema": f"{db_service.fullyQualifiedName.root}.minio.my_schema" "table_name",
}, [
"{database_service}.minio.my_schema.table",
"{database_service}.minio.my_schema.titanic",
"{database_service}.minio.my_schema.iris",
"{database_service}.minio.my_schema.userdata",
"{database_service}.minio.my_schema.empty",
],
ids=lambda x: x.split(".")[-1],
) )
assert ( def test_metadata(run_workflow, db_service, metadata: OpenMetadata, table_name):
next((t for t in tables.entities if t.name.root == "test_table"), None) metadata.get_by_name(
is not None Table, table_name.format(database_service=db_service.fullyQualifiedName.root)
) )

@ -1,9 +1,22 @@
from dataclasses import dataclass
from typing import List
import pytest
from _openmetadata_testutils.pydantic.test_utils import assert_equal_pydantic_objects
from metadata.generated.schema.entity.data.table import (
ColumnProfile,
Table,
TableProfile,
)
from metadata.generated.schema.type.basic import Timestamp
from metadata.ingestion.lineage.sql_lineage import search_cache from metadata.ingestion.lineage.sql_lineage import search_cache
from metadata.workflow.metadata import MetadataWorkflow from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow from metadata.workflow.profiler import ProfilerWorkflow
def test_profiler( @pytest.fixture(scope="module")
def run_profiler(
patch_passwords_for_db_services, patch_passwords_for_db_services,
run_workflow, run_workflow,
ingestion_config, ingestion_config,
@ -13,3 +26,103 @@ def test_profiler(
search_cache.clear() search_cache.clear()
run_workflow(MetadataWorkflow, ingestion_config) run_workflow(MetadataWorkflow, ingestion_config)
run_workflow(ProfilerWorkflow, profiler_config) run_workflow(ProfilerWorkflow, profiler_config)
@dataclass
class ProfilerTestParameters:
table_fqn: str
expected_table_profile: TableProfile
expected_column_profiles: List[ColumnProfile] = None
@pytest.mark.parametrize(
"parameters",
[
ProfilerTestParameters(
"{database_service}.minio.my_schema.table",
TableProfile(timestamp=Timestamp(0), rowCount=3),
[
ColumnProfile(
name="three",
timestamp=Timestamp(0),
valuesCount=2,
nullCount=1,
)
],
),
ProfilerTestParameters(
"{database_service}.minio.my_schema.titanic",
TableProfile(timestamp=Timestamp(0), rowCount=891),
[
ColumnProfile(
name="name",
timestamp=Timestamp(0),
valuesCount=891,
nullCount=0,
)
],
),
ProfilerTestParameters(
"{database_service}.minio.my_schema.iris",
TableProfile(timestamp=Timestamp(0), rowCount=150),
[
ColumnProfile(
name="petal.length",
timestamp=Timestamp(0),
valuesCount=150,
nullCount=0,
)
],
),
ProfilerTestParameters(
"{database_service}.minio.my_schema.userdata",
TableProfile(timestamp=Timestamp(0), rowCount=1000),
[
ColumnProfile(
name="gender",
timestamp=Timestamp(0),
valuesCount=1000,
nullCount=0,
)
],
),
ProfilerTestParameters(
"{database_service}.minio.my_schema.empty",
TableProfile(timestamp=Timestamp(0), rowCount=0),
[
ColumnProfile(
name="a",
timestamp=Timestamp(0),
valuesCount=0,
nullCount=0,
)
],
),
],
ids=lambda x: x.table_fqn.split(".")[-1],
)
def test_profiler(
run_profiler, metadata, db_service, parameters: ProfilerTestParameters
):
table: Table = metadata.get_latest_table_profile(
parameters.table_fqn.format(database_service=db_service.fullyQualifiedName.root)
)
assert_equal_pydantic_objects(
parameters.expected_table_profile,
# we dont want to validate the timestamp because it will be different for each run
table.profile.model_copy(
update={"timestamp": parameters.expected_table_profile.timestamp}
),
)
for profile in parameters.expected_column_profiles:
column = next(
(col for col in table.columns if col.profile.name == profile.name), None
)
if column is None:
raise AssertionError(
f"Column [{profile.name}] not found in table [{table.fullyQualifiedName.root}]"
)
assert_equal_pydantic_objects(
profile,
column.profile.model_copy(update={"timestamp": profile.timestamp}),
)
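
The model_copy(update=...) calls above are what make the comparison ignore the run-dependent timestamp: Pydantic v2's model_copy returns a copy with the named fields replaced, so only the profile metrics are compared. The trick in isolation, with a stand-in model rather than OpenMetadata's:

from pydantic import BaseModel


class Profile(BaseModel):
    timestamp: int
    rowCount: int


expected = Profile(timestamp=0, rowCount=3)
actual = Profile(timestamp=1724140930, rowCount=3)  # timestamp differs per run

# Overwriting the volatile field reduces the equality check to the metrics
assert expected == actual.model_copy(update={"timestamp": expected.timestamp})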

@@ -1 +1,2 @@
-io.trino=DEBUG
+# Set to DEBUG for verbose logging
+io.trino=INFO