From 067fb510ab0edff4b406d67df9418e9dd8f9544f Mon Sep 17 00:00:00 2001
From: Imri Paran
Date: Tue, 28 May 2024 11:23:43 +0200
Subject: [PATCH] MINOR: test case for usage->delete->usage (#16409)

* test: added test case to demonstrate cache issue
---
 .../integration/postgres/test_postgres.py     | 82 +++++++++++++-
 1 file changed, 79 insertions(+), 3 deletions(-)

diff --git a/ingestion/tests/integration/postgres/test_postgres.py b/ingestion/tests/integration/postgres/test_postgres.py
index 29b04f8801f..62b6325b831 100644
--- a/ingestion/tests/integration/postgres/test_postgres.py
+++ b/ingestion/tests/integration/postgres/test_postgres.py
@@ -36,6 +36,7 @@ from metadata.generated.schema.metadataIngestion.workflow import (
     WorkflowConfig,
 )
 from metadata.ingestion.lineage.sql_lineage import search_cache
+from metadata.ingestion.ometa.client import APIError
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.profiler.api.models import ProfilerProcessorConfig
 from metadata.workflow.metadata import MetadataWorkflow
@@ -64,9 +65,15 @@ def db_service(metadata, postgres_container):
     service_entity = metadata.create_or_update(data=service)
     service_entity.connection.config.authType.password = postgres_container.password
     yield service_entity
-    metadata.delete(
-        DatabaseService, service_entity.id, recursive=True, hard_delete=True
-    )
+    try:
+        metadata.delete(
+            DatabaseService, service_entity.id, recursive=True, hard_delete=True
+        )
+    except APIError as error:
+        # A test may already have hard-deleted the service
+        # (test_usage_delete_usage does); only a 404 is tolerated here.
+        if error.status_code != 404:
+            raise
 
 
 @pytest.fixture(scope="module")
@@ -201,3 +208,72 @@ def test_profiler(run_profiler_workflow):
 
 def test_lineage(ingest_lineage):
     pass
+
+
+def run_usage_workflow(db_service, metadata):
+    """Run the ``postgres-usage`` workflow for ``db_service``.
+
+    The lineage ``search_cache`` is cleared before executing so the run does
+    not start from entries cached by a previous workflow; afterwards
+    ``raise_from_status()`` is called on the workflow.
+    """
+    workflow_config = {
+        "source": {
+            "type": "postgres-usage",
+            "serviceName": db_service.fullyQualifiedName.__root__,
+            "serviceConnection": db_service.connection.dict(),
+            "sourceConfig": {
+                "config": {"type": DatabaseUsageConfigType.DatabaseUsage.value}
+            },
+        },
+        "processor": {"type": "query-parser", "config": {}},
+        "stage": {
+            "type": "table-usage",
+            "config": {
+                "filename": "/tmp/postgres_usage",
+            },
+        },
+        "bulkSink": {
+            "type": "metadata-usage",
+            "config": {
+                "filename": "/tmp/postgres_usage",
+            },
+        },
+        "sink": {"type": "metadata-rest", "config": {}},
+        "workflowConfig": {
+            "loggerLevel": "DEBUG",
+            "openMetadataServerConfig": metadata.config.dict(),
+        },
+    }
+    workflow = UsageWorkflow.create(workflow_config)
+    search_cache.clear()
+    workflow.execute()
+    workflow.raise_from_status()
+
+
+@pytest.mark.xfail(
+    reason="'metadata.ingestion.lineage.sql_lineage.search_cache' gets corrupted with invalid data."
+    " See issue https://github.com/open-metadata/OpenMetadata/issues/16408"
+)
+def test_usage_delete_usage(db_service, ingest_lineage, metadata):
+    """Run usage, hard-delete the service, re-ingest metadata, then run usage."""
+    run_usage_workflow(db_service, metadata)
+    metadata.delete(DatabaseService, db_service.id, hard_delete=True, recursive=True)
+    # Re-ingest the service metadata after the hard delete, before the second usage run.
+    workflow_config = OpenMetadataWorkflowConfig(
+        source=Source(
+            type=db_service.connection.config.type.value.lower(),
+            serviceName=db_service.fullyQualifiedName.__root__,
+            serviceConnection=db_service.connection,
+            sourceConfig=SourceConfig(config={}),
+        ),
+        sink=Sink(
+            type="metadata-rest",
+            config={},
+        ),
+        workflowConfig=WorkflowConfig(openMetadataServerConfig=metadata.config),
+    )
+    metadata_ingestion = MetadataWorkflow.create(workflow_config)
+    metadata_ingestion.execute()
+    metadata_ingestion.raise_from_status()
+    run_usage_workflow(db_service, metadata)