OpenMetadata/ingestion/tests/unit/workflow/test_context_manager.py
IceS2 5b20b84546
MINOR: Add logic to handle WorkflowContext on Ingestion (#21425)
* Add logic to handle WorkflowContext on Ingestion

* Revert base.py changes

* Removed comment

* Fix basedpyright complaints

* Make ContextManager automatically add its context to the PipelineStatus

* Small changes
2025-06-03 17:35:08 +02:00

61 lines
1.9 KiB
Python

from metadata.workflow.context.context_manager import ContextManager, ContextsEnum
from metadata.workflow.context.workflow_context import (
WorkflowContext,
WorkflowContextFieldsEnum,
)
def test_singleton_behavior():
    """Two consecutive ``ContextManager.get_instance()`` calls yield one object."""
    first = ContextManager.get_instance()
    second = ContextManager.get_instance()

    # Identity, not equality: both names must refer to the very same instance.
    assert first is second, "ContextManager should be a singleton"
def test_context_get_set_attr():
    """Round-trip a value through the enum-keyed attribute accessors.

    Writes the workflow service name with ``ContextManager.set_context_attr``
    and checks that ``ContextManager.get_context_attr`` returns the same value.
    """
    assert ContextManager.get_instance() is not None

    expected = "test_service"
    # (context, field) pair addressed by both the setter and the getter.
    target = (ContextsEnum.WORKFLOW, WorkflowContextFieldsEnum.SERVICE_NAME)

    ContextManager.set_context_attr(*target, expected)
    stored = ContextManager.get_context_attr(*target)

    assert stored == expected
def test_get_context_returns_context():
    """``get_context(WORKFLOW)`` gives a ``WorkflowContext``, identical across calls."""
    manager = ContextManager.get_instance()
    assert manager is not None

    first_lookup = ContextManager.get_context(ContextsEnum.WORKFLOW)
    assert isinstance(first_lookup, WorkflowContext)

    # A second lookup must hand back the very same object, not a copy.
    second_lookup = ContextManager.get_context(ContextsEnum.WORKFLOW)
    assert first_lookup is second_lookup
def test_thread_safety():
    """Write the workflow service name from several threads concurrently.

    Ten threads each store a distinct ``service_<i>`` value through
    ``ContextManager.set_context_attr``. Once all threads are joined, the test
    asserts that no writer raised and that the value read back through
    ``ContextManager.get_context_attr`` is one of the values written.
    """
    import threading

    cm = ContextManager.get_instance()
    assert cm is not None

    expected_names = [f"service_{i}" for i in range(10)]

    # An exception raised inside a Thread target is NOT re-raised in the
    # thread that joins it, so a crashing writer would otherwise go unnoticed.
    # Collect failures here and assert on them from the main thread.
    errors = []

    def set_service_name(name):
        try:
            ContextManager.set_context_attr(
                ContextsEnum.WORKFLOW, WorkflowContextFieldsEnum.SERVICE_NAME, name
            )
        except Exception as exc:  # surfaced via the assertion below
            errors.append(exc)

    threads = [
        threading.Thread(target=set_service_name, args=(name,))
        for name in expected_names
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    assert not errors, f"writer threads raised: {errors!r}"

    # The final value should be one of the set values
    final_value = ContextManager.get_context_attr(
        ContextsEnum.WORKFLOW, WorkflowContextFieldsEnum.SERVICE_NAME
    )
    assert final_value in set(expected_names)