diff --git a/ingestion/tests/integration/conftest.py b/ingestion/tests/integration/conftest.py index 43ccd78c9b5..b3f9fcff1b8 100644 --- a/ingestion/tests/integration/conftest.py +++ b/ingestion/tests/integration/conftest.py @@ -6,6 +6,9 @@ import pytest from _openmetadata_testutils.ometa import int_admin_ometa from metadata.generated.schema.entity.services.databaseService import DatabaseService +from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import ( + DatabaseMetadataConfigType, +) from metadata.generated.schema.metadataIngestion.workflow import LogLevels from metadata.ingestion.api.common import Entity from metadata.ingestion.ometa.ometa_api import OpenMetadata @@ -215,7 +218,9 @@ def ingestion_config(db_service, metadata, workflow_config, sink_config): "source": { "type": db_service.connection.config.type.value.lower(), "serviceName": db_service.fullyQualifiedName.root, - "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, + "sourceConfig": { + "config": {"type": DatabaseMetadataConfigType.DatabaseMetadata.value} + }, "serviceConnection": db_service.connection.model_dump(), }, "sink": sink_config, diff --git a/ingestion/tests/integration/postgres/test_lineage.py b/ingestion/tests/integration/postgres/test_lineage.py index 9a5b9f34138..636d2bf7aa4 100644 --- a/ingestion/tests/integration/postgres/test_lineage.py +++ b/ingestion/tests/integration/postgres/test_lineage.py @@ -26,10 +26,33 @@ def native_lineage_config(db_service, workflow_config, sink_config): } +@pytest.mark.parametrize( + "source_config,expected_nodes", + [ + ({"includeDDL": False}, 0), + ({"includeDDL": True}, 3), + ], + ids=lambda config: ( + "".join([f"{k}={str(v)}" for k, v in config.items()]) + if isinstance(config, dict) + else "" + ), +) def test_native_lineage( - run_workflow, ingestion_config, native_lineage_config, metadata, db_service + source_config, + expected_nodes, + run_workflow, + ingestion_config, + native_lineage_config, + metadata, + db_service, ): + ingestion_config["source"]["sourceConfig"]["config"].update(source_config) run_workflow(MetadataWorkflow, ingestion_config) + film_actor_edges = metadata.get_lineage_by_name( + Table, f"{db_service.fullyQualifiedName.root}.dvdrental.public.film_actor" + ) + assert len(film_actor_edges["nodes"]) == expected_nodes run_workflow(MetadataWorkflow, native_lineage_config) @@ -97,12 +120,15 @@ def test_log_lineage( def reindex_search(metadata: OpenMetadata, timeout=60): start = time.time() - status = None - while status is None or status == "running": + # wait for previous reindexing to finish (if any) + while True: response = metadata.client.get( "/apps/name/SearchIndexingApplication/status?offset=0&limit=1" ) - status = response["data"][0]["status"] + if len(response["data"]) == 0: + break + if response["data"][0]["status"] != "running": + break if time.time() - start > timeout: raise TimeoutError("Timed out waiting for reindexing to start") time.sleep(1) @@ -111,6 +137,7 @@ def reindex_search(metadata: OpenMetadata, timeout=60): ) # app interactivity is not immediate (probably bc async operations), so we wait a bit metadata.client.post("/apps/trigger/SearchIndexingApplication") time.sleep(0.5) # here too + status = None while status != "success": response = metadata.client.get( "/apps/name/SearchIndexingApplication/status?offset=0&limit=1"