diff --git a/ingestion/src/_openmetadata_testutils/pydantic/test_utils.py b/ingestion/src/_openmetadata_testutils/pydantic/test_utils.py
index f914403a32a..34c91246c72 100644
--- a/ingestion/src/_openmetadata_testutils/pydantic/test_utils.py
+++ b/ingestion/src/_openmetadata_testutils/pydantic/test_utils.py
@@ -43,7 +43,6 @@ def assert_equal_pydantic_objects(
     """
     errors = []
     queue = deque([(expected, actual, "")])
-
     while queue:
         expected, actual, current_key_prefix = queue.popleft()
         if not isinstance(expected, actual.__class__):
@@ -52,11 +51,13 @@ def assert_equal_pydantic_objects(
                 f"expected: [{type(expected).__name__}], actual: [{type(actual).__name__}]"
             )
             continue
-        if issubclass(expected.__class__, BaseModel):
-            for key, expected_value in expected.dict().items():
+        if issubclass(expected.__class__, BaseModel) and isinstance(
+            expected.model_dump(), dict
+        ):
+            for key, expected_value in expected.model_dump().items():
                 if expected_value is None and ignore_none:
                     continue
-                actual_value = actual.dict().get(key)
+                actual_value = actual.model_dump().get(key)
                 new_key_prefix = (
                     f"{current_key_prefix}.{key}" if current_key_prefix else key
                 )
diff --git a/ingestion/src/metadata/profiler/orm/functions/sum.py b/ingestion/src/metadata/profiler/orm/functions/sum.py
index a89810d2ba0..74534d1956c 100644
--- a/ingestion/src/metadata/profiler/orm/functions/sum.py
+++ b/ingestion/src/metadata/profiler/orm/functions/sum.py
@@ -45,7 +45,7 @@ def _(element, compiler, **kw):
 def _(element, compiler, **kw):
     """Cast to DECIMAL to address cannot cast nan to bigint"""
     proc = compiler.process(element.clauses, **kw)
-    return f"SUM(TRY_CAST({proc} AS DECIMAL))"
+    return f"COALESCE(SUM(CAST({proc} AS DECIMAL)),0)"
 
 
 @compiles(SumFn, Dialects.BigQuery)
diff --git a/ingestion/tests/integration/conftest.py b/ingestion/tests/integration/conftest.py
index 5f41c7bae76..1de0ffbf39a 100644
--- a/ingestion/tests/integration/conftest.py
+++ b/ingestion/tests/integration/conftest.py
@@ -59,7 +59,7 @@ def workflow_config(metadata):
     }
 
 
-@pytest.fixture()
+@pytest.fixture(scope="module")
 def profiler_config(db_service, workflow_config, sink_config):
     return {
         "source": {
@@ -82,7 +82,7 @@ def profiler_config(db_service, workflow_config, sink_config):
     }
 
 
-@pytest.fixture()
+@pytest.fixture(scope="module")
 def run_workflow():
     def _run(workflow_type: Type[IngestionWorkflow], config, raise_from_status=True):
         workflow: IngestionWorkflow = workflow_type.create(config)
@@ -152,8 +152,14 @@ def create_service_request():
     raise NotImplementedError("Implement in the test module")
 
 
-@pytest.fixture()
-def patch_passwords_for_db_services(db_service, unmask_password, monkeypatch):
+@pytest.fixture(scope="module")
+def monkeymodule():
+    with pytest.MonkeyPatch.context() as mp:
+        yield mp
+
+
+@pytest.fixture(scope="module")
+def patch_passwords_for_db_services(db_service, unmask_password, monkeymodule):
     """Patch the password for all db services returned by the metadata service.
 
     Usage:
@@ -178,12 +184,12 @@ def patch_passwords_for_db_services(db_service, unmask_password, monkeypatch):
 
         return inner
 
-    monkeypatch.setattr(
+    monkeymodule.setattr(
         "metadata.ingestion.ometa.ometa_api.OpenMetadata.get_by_name",
         override_password(OpenMetadata.get_by_name),
     )
 
-    monkeypatch.setattr(
+    monkeymodule.setattr(
         "metadata.ingestion.ometa.ometa_api.OpenMetadata.get_by_id",
         override_password(OpenMetadata.get_by_id),
     )
diff --git a/ingestion/tests/integration/postgres/test_usage.py b/ingestion/tests/integration/postgres/test_usage.py
index dfe6d3e7d00..9eca038818a 100644
--- a/ingestion/tests/integration/postgres/test_usage.py
+++ b/ingestion/tests/integration/postgres/test_usage.py
@@ -42,7 +42,14 @@ def usage_config(sink_config, workflow_config, db_service):
     }
 
 
-def test_usage(run_workflow, ingestion_config, usage_config, metadata, db_service):
+def test_usage(
+    patch_passwords_for_db_services,
+    run_workflow,
+    ingestion_config,
+    usage_config,
+    metadata,
+    db_service,
+):
     search_cache.clear()
     run_workflow(MetadataWorkflow, ingestion_config)
     run_workflow(UsageWorkflow, usage_config)
@@ -54,7 +61,12 @@ def test_usage(run_workflow, ingestion_config, usage_config, metadata, db_servic
     strict=True,
 )
 def test_usage_delete_usage(
-    run_workflow, ingestion_config, usage_config, metadata, db_service
+    patch_passwords_for_db_services,
+    run_workflow,
+    ingestion_config,
+    usage_config,
+    metadata,
+    db_service,
 ):
     search_cache.clear()
     run_workflow(MetadataWorkflow, ingestion_config)
diff --git a/ingestion/tests/integration/trino/conftest.py b/ingestion/tests/integration/trino/conftest.py
index 32dbd225b02..0e9ebef298d 100644
--- a/ingestion/tests/integration/trino/conftest.py
+++ b/ingestion/tests/integration/trino/conftest.py
@@ -1,10 +1,13 @@
 import os.path
 import random
+from time import sleep
 
 import docker
+import pandas as pd
 import pytest
 import testcontainers.core.network
-from sqlalchemy import create_engine
+from sqlalchemy import create_engine, insert
+from sqlalchemy.engine import make_url
 from tenacity import retry, stop_after_delay, wait_fixed
 from testcontainers.core.container import DockerContainer
 from testcontainers.core.generic import DbContainer
@@ -121,15 +124,19 @@ def docker_network():
 
 @pytest.fixture(scope="session")
 def trino_container(hive_metastore_container, minio_container, docker_network):
-    with TrinoContainer(image="trinodb/trino:418").with_network(
-        docker_network
-    ).with_env(
-        "HIVE_METASTORE_URI",
-        f"thrift://metastore:{hive_metastore_container.port}",
-    ).with_env(
-        "MINIO_ENDPOINT",
-        f"http://minio:{minio_container.port}",
-    ) as trino:
+    container = (
+        TrinoContainer(image="trinodb/trino:418")
+        .with_network(docker_network)
+        .with_env(
+            "HIVE_METASTORE_URI",
+            f"thrift://metastore:{hive_metastore_container.port}",
+        )
+        .with_env(
+            "MINIO_ENDPOINT",
+            f"http://minio:{minio_container.port}",
+        )
+    )
+    with try_bind(container, container.port, container.port + 1) as trino:
         yield trino
 
 
@@ -177,15 +184,51 @@ def minio_container(docker_network):
 
 @pytest.fixture(scope="session")
 def create_test_data(trino_container):
-    engine = create_engine(trino_container.get_connection_url())
+    engine = create_engine(
+        make_url(trino_container.get_connection_url()).set(database="minio")
+    )
     engine.execute(
         "create schema minio.my_schema WITH (location = 's3a://hive-warehouse/')"
     )
-    engine.execute("create table minio.my_schema.test_table (id int)")
-    engine.execute("insert into minio.my_schema.test_table values (1), (2), (3)")
+    data_dir = os.path.dirname(__file__) + "/data"
+    for file in os.listdir(data_dir):
+        df = pd.read_parquet(f"{data_dir}/{file}")
+        for col in df.columns:
+            if pd.api.types.is_datetime64tz_dtype(df[col]):
+                df[col] = df[col].dt.tz_convert(None)
+        df.to_sql(
+            file.replace(".parquet", ""),
+            engine,
+            schema="my_schema",
+            if_exists="fail",
+            index=False,
+            method=custom_insert,
+        )
+        sleep(1)
     return
 
 
+def custom_insert(self, conn, keys: list[str], data_iter):
+    """
+    Hack pandas.io.sql.SQLTable._execute_insert_multi to retry until rows are inserted.
+    This is required because using trino with pd.to_sql in our setup is unreliable.
+    """
+    rowcount = 0
+    max_tries = 10
+    try_num = 0
+    data = [dict(zip(keys, row)) for row in data_iter]
+    while rowcount != len(data):
+        if try_num >= max_tries:
+            raise RuntimeError(f"Failed to insert data after {max_tries} tries")
+        try_num += 1
+        stmt = insert(self.table).values(data)
+        conn.execute(stmt)
+        rowcount = conn.execute(
+            "SELECT COUNT(*) FROM " + f'"{self.schema}"."{self.name}"'
+        ).scalar()
+    return rowcount
+
+
 @pytest.fixture(scope="module")
 def create_service_request(trino_container, tmp_path_factory):
     return CreateDatabaseServiceRequest(
@@ -203,7 +246,7 @@
     )
 
 
-@pytest.fixture
+@pytest.fixture(scope="module")
 def ingestion_config(db_service, sink_config, workflow_config, base_ingestion_config):
     base_ingestion_config["source"]["sourceConfig"]["config"]["schemaFilterPattern"] = {
         "excludes": [
diff --git a/ingestion/tests/integration/trino/data/empty.parquet b/ingestion/tests/integration/trino/data/empty.parquet
new file mode 100644
index 00000000000..3027ff75c8a
Binary files /dev/null and b/ingestion/tests/integration/trino/data/empty.parquet differ
diff --git a/ingestion/tests/integration/trino/data/iris.parquet b/ingestion/tests/integration/trino/data/iris.parquet
new file mode 100644
index 00000000000..9224dead94c
Binary files /dev/null and b/ingestion/tests/integration/trino/data/iris.parquet differ
diff --git a/ingestion/tests/integration/trino/data/table.parquet b/ingestion/tests/integration/trino/data/table.parquet
new file mode 100644
index 00000000000..24f7685ea0a
Binary files /dev/null and b/ingestion/tests/integration/trino/data/table.parquet differ
diff --git a/ingestion/tests/integration/trino/data/titanic.parquet b/ingestion/tests/integration/trino/data/titanic.parquet
new file mode 100644
index 00000000000..a9847c5df4c
Binary files /dev/null and b/ingestion/tests/integration/trino/data/titanic.parquet differ
diff --git a/ingestion/tests/integration/trino/data/userdata.parquet b/ingestion/tests/integration/trino/data/userdata.parquet
new file mode 100644
index 00000000000..2ae23dac0ff
Binary files /dev/null and b/ingestion/tests/integration/trino/data/userdata.parquet differ
diff --git a/ingestion/tests/integration/trino/test_metadata.py b/ingestion/tests/integration/trino/test_metadata.py
index aa489bd74f9..85c3c5b5c0e 100644
--- a/ingestion/tests/integration/trino/test_metadata.py
+++ b/ingestion/tests/integration/trino/test_metadata.py
@@ -1,19 +1,27 @@
+import pytest
+
 from metadata.generated.schema.entity.data.table import Table
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.workflow.metadata import MetadataWorkflow
 
 
-def test_metadata(
-    db_service, metadata: OpenMetadata, run_workflow, ingestion_config, create_test_data
-):
+@pytest.fixture(scope="module")
+def run_workflow(run_workflow, ingestion_config, create_test_data):
     run_workflow(MetadataWorkflow, ingestion_config)
-    tables = metadata.list_entities(
-        Table,
-        params={
-            "databaseSchema": f"{db_service.fullyQualifiedName.root}.minio.my_schema"
-        },
-    )
-    assert (
-        next((t for t in tables.entities if t.name.root == "test_table"), None)
-        is not None
+
+
+@pytest.mark.parametrize(
+    "table_name",
+    [
+        "{database_service}.minio.my_schema.table",
+        "{database_service}.minio.my_schema.titanic",
+        "{database_service}.minio.my_schema.iris",
+        "{database_service}.minio.my_schema.userdata",
+        "{database_service}.minio.my_schema.empty",
+    ],
+    ids=lambda x: x.split(".")[-1],
+)
+def test_metadata(run_workflow, db_service, metadata: OpenMetadata, table_name):
+    metadata.get_by_name(
+        Table, table_name.format(database_service=db_service.fullyQualifiedName.root)
     )
diff --git a/ingestion/tests/integration/trino/test_profiler.py b/ingestion/tests/integration/trino/test_profiler.py
index af8bab0512e..5406df19e04 100644
--- a/ingestion/tests/integration/trino/test_profiler.py
+++ b/ingestion/tests/integration/trino/test_profiler.py
@@ -1,9 +1,22 @@
+from dataclasses import dataclass
+from typing import List
+
+import pytest
+
+from _openmetadata_testutils.pydantic.test_utils import assert_equal_pydantic_objects
+from metadata.generated.schema.entity.data.table import (
+    ColumnProfile,
+    Table,
+    TableProfile,
+)
+from metadata.generated.schema.type.basic import Timestamp
 from metadata.ingestion.lineage.sql_lineage import search_cache
 from metadata.workflow.metadata import MetadataWorkflow
 from metadata.workflow.profiler import ProfilerWorkflow
 
 
-def test_profiler(
+@pytest.fixture(scope="module")
+def run_profiler(
     patch_passwords_for_db_services,
     run_workflow,
     ingestion_config,
@@ -13,3 +26,103 @@ def test_profiler(
     search_cache.clear()
     run_workflow(MetadataWorkflow, ingestion_config)
     run_workflow(ProfilerWorkflow, profiler_config)
+
+
+@dataclass
+class ProfilerTestParameters:
+    table_fqn: str
+    expected_table_profile: TableProfile
+    expected_column_profiles: List[ColumnProfile] = None
+
+
+@pytest.mark.parametrize(
+    "parameters",
+    [
+        ProfilerTestParameters(
+            "{database_service}.minio.my_schema.table",
+            TableProfile(timestamp=Timestamp(0), rowCount=3),
+            [
+                ColumnProfile(
+                    name="three",
+                    timestamp=Timestamp(0),
+                    valuesCount=2,
+                    nullCount=1,
+                )
+            ],
+        ),
+        ProfilerTestParameters(
+            "{database_service}.minio.my_schema.titanic",
+            TableProfile(timestamp=Timestamp(0), rowCount=891),
+            [
+                ColumnProfile(
+                    name="name",
+                    timestamp=Timestamp(0),
+                    valuesCount=891,
+                    nullCount=0,
+                )
+            ],
+        ),
+        ProfilerTestParameters(
+            "{database_service}.minio.my_schema.iris",
+            TableProfile(timestamp=Timestamp(0), rowCount=150),
+            [
+                ColumnProfile(
+                    name="petal.length",
+                    timestamp=Timestamp(0),
+                    valuesCount=150,
+                    nullCount=0,
+                )
+            ],
+        ),
+        ProfilerTestParameters(
+            "{database_service}.minio.my_schema.userdata",
+            TableProfile(timestamp=Timestamp(0), rowCount=1000),
+            [
+                ColumnProfile(
+                    name="gender",
+                    timestamp=Timestamp(0),
+                    valuesCount=1000,
+                    nullCount=0,
+                )
+            ],
+        ),
+        ProfilerTestParameters(
+            "{database_service}.minio.my_schema.empty",
+            TableProfile(timestamp=Timestamp(0), rowCount=0),
+            [
+                ColumnProfile(
+                    name="a",
+                    timestamp=Timestamp(0),
+                    valuesCount=0,
+                    nullCount=0,
+                )
+            ],
+        ),
+    ],
+    ids=lambda x: x.table_fqn.split(".")[-1],
+)
+def test_profiler(
+    run_profiler, metadata, db_service, parameters: ProfilerTestParameters
+):
+    table: Table = metadata.get_latest_table_profile(
+        parameters.table_fqn.format(database_service=db_service.fullyQualifiedName.root)
+    )
+    assert_equal_pydantic_objects(
+        parameters.expected_table_profile,
+        # we don't want to validate the timestamp because it will be different for each run
+        table.profile.model_copy(
+            update={"timestamp": parameters.expected_table_profile.timestamp}
+        ),
+    )
+    for profile in parameters.expected_column_profiles:
+        column = next(
+            (col for col in table.columns if col.profile.name == profile.name), None
+        )
+        if column is None:
+            raise AssertionError(
+                f"Column [{profile.name}] not found in table [{table.fullyQualifiedName.root}]"
+            )
+        assert_equal_pydantic_objects(
+            profile,
+            column.profile.model_copy(update={"timestamp": profile.timestamp}),
+        )
diff --git a/ingestion/tests/integration/trino/trino/etc/log.properties b/ingestion/tests/integration/trino/trino/etc/log.properties
index 6420205581a..c03f8bb285d 100644
--- a/ingestion/tests/integration/trino/trino/etc/log.properties
+++ b/ingestion/tests/integration/trino/trino/etc/log.properties
@@ -1 +1,2 @@
-io.trino=DEBUG
\ No newline at end of file
+# Set to DEBUG for verbose logging
+io.trino=INFO
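
Note on the sum.py change above: standard SQL defines SUM over zero rows as NULL, so wrapping the aggregate in COALESCE(..., 0) keeps the profiler's sum metric numeric when a table has no rows, which is exactly the case exercised by the new empty.parquet fixture.

Note on the profiler assertions: the test neutralizes the volatile timestamp field with model_copy(update=...) before handing both objects to assert_equal_pydantic_objects, so only stable fields are compared. A minimal standalone sketch of that comparison pattern, assuming pydantic v2 (the Profile model below is illustrative, not from the repo):

    from pydantic import BaseModel


    class Profile(BaseModel):
        timestamp: int
        rowCount: int


    expected = Profile(timestamp=0, rowCount=3)
    actual = Profile(timestamp=1718000000, rowCount=3)  # stamped at run time

    # Overwrite the volatile timestamp on a copy so only stable fields
    # participate in the check; pydantic models compare by field values.
    assert expected == actual.model_copy(update={"timestamp": expected.timestamp})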