MINOR: test case for usage->delete->usage (#16409)

* test: added test case to demonstrate cache issue
This commit is contained in:
Imri Paran 2024-05-28 11:23:43 +02:00 committed by GitHub
parent 0c974edd13
commit 067fb510ab
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -36,6 +36,7 @@ from metadata.generated.schema.metadataIngestion.workflow import (
WorkflowConfig, WorkflowConfig,
) )
from metadata.ingestion.lineage.sql_lineage import search_cache from metadata.ingestion.lineage.sql_lineage import search_cache
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.profiler.api.models import ProfilerProcessorConfig from metadata.profiler.api.models import ProfilerProcessorConfig
from metadata.workflow.metadata import MetadataWorkflow from metadata.workflow.metadata import MetadataWorkflow
@ -64,9 +65,15 @@ def db_service(metadata, postgres_container):
service_entity = metadata.create_or_update(data=service) service_entity = metadata.create_or_update(data=service)
service_entity.connection.config.authType.password = postgres_container.password service_entity.connection.config.authType.password = postgres_container.password
yield service_entity yield service_entity
metadata.delete( try:
DatabaseService, service_entity.id, recursive=True, hard_delete=True metadata.delete(
) DatabaseService, service_entity.id, recursive=True, hard_delete=True
)
except APIError as error:
if error.status_code == 404:
pass
else:
raise
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
@ -201,3 +208,96 @@ def test_profiler(run_profiler_workflow):
def test_lineage(ingest_lineage): def test_lineage(ingest_lineage):
pass pass
def run_usage_workflow(db_service, metadata):
    """Create and execute a ``postgres-usage`` workflow against ``db_service``.

    The workflow configuration is assembled from the service's fully qualified
    name and connection, and from the server configuration carried by
    ``metadata``. ``search_cache`` is cleared after the workflow is created and
    before it is executed. ``raise_from_status`` is called once execution ends.
    """
    source_section = {
        "type": "postgres-usage",
        "serviceName": db_service.fullyQualifiedName.__root__,
        "serviceConnection": db_service.connection.dict(),
        "sourceConfig": {
            "config": {"type": DatabaseUsageConfigType.DatabaseUsage.value}
        },
    }
    server_section = {
        "loggerLevel": "DEBUG",
        "openMetadataServerConfig": metadata.config.dict(),
    }
    usage_workflow = UsageWorkflow.create(
        {
            "source": source_section,
            "processor": {"type": "query-parser", "config": {}},
            # The stage and the bulk sink are configured with the same file path.
            "stage": {
                "type": "table-usage",
                "config": {"filename": "/tmp/postgres_usage"},
            },
            "bulkSink": {
                "type": "metadata-usage",
                "config": {"filename": "/tmp/postgres_usage"},
            },
            "sink": {"type": "metadata-rest", "config": {}},
            "workflowConfig": server_section,
        }
    )
    # Empty the module-level lineage search cache before running.
    search_cache.clear()
    usage_workflow.execute()
    usage_workflow.raise_from_status()
@pytest.mark.xfail(
    reason="'metadata.ingestion.lineage.sql_lineage.search_cache' gets corrupted with invalid data."
    " See issue https://github.com/open-metadata/OpenMetadata/issues/16408"
)
def test_usage_delete_usage(db_service, ingest_lineage, metadata):
    """Exercise the usage -> delete -> re-ingest -> usage sequence.

    Steps:
      1. Run the usage workflow (twice, see the note below).
      2. Hard-delete the database service recursively.
      3. Re-run metadata ingestion for the same service definition.
      4. Run the usage workflow again.

    The test is marked ``xfail``; the reason string on the decorator links the
    tracking issue.

    ``ingest_lineage`` is requested as a fixture but not referenced in the
    body. NOTE(review): presumably it is needed for its setup side effects;
    its definition is outside this view, so confirm.
    """
    # The original test ran an inline copy of the usage workflow and then the
    # helper, i.e. two usage runs before the delete. Both runs are kept so the
    # sequence of operations is unchanged; only the duplicated code is removed.
    run_usage_workflow(db_service, metadata)
    run_usage_workflow(db_service, metadata)

    metadata.delete(DatabaseService, db_service.id, hard_delete=True, recursive=True)

    # Re-ingest the service's metadata after the delete.
    workflow_config = OpenMetadataWorkflowConfig(
        source=Source(
            type=db_service.connection.config.type.value.lower(),
            serviceName=db_service.fullyQualifiedName.__root__,
            serviceConnection=db_service.connection,
            sourceConfig=SourceConfig(config={}),
        ),
        sink=Sink(
            type="metadata-rest",
            config={},
        ),
        workflowConfig=WorkflowConfig(openMetadataServerConfig=metadata.config),
    )
    metadata_ingestion = MetadataWorkflow.create(workflow_config)
    metadata_ingestion.execute()
    metadata_ingestion.raise_from_status()

    # Final usage run against the re-created entities.
    run_usage_workflow(db_service, metadata)