MINOR: Add logic to handle WorkflowContext on Ingestion (#21425)

* Add logic to handle WorkflowContext on Ingestion

* Revert base.py changes

* Removed comment

* Fix basedpyright complaints

* Make ContextManager automatically add its context to the PipelineStatus

* Small changes

(cherry picked from commit 5b20b845462cfcb568b92dbf22e160226433fae5)
Authored by IceS2 on 2025-06-03 17:35:08 +02:00; committed by OpenMetadata Release Bot
parent 008a3ebc27
commit 41dfdff43e
7 changed files with 276 additions and 12 deletions


@@ -0,0 +1,24 @@
"""
Base context class for workflow contexts.

This module defines the BaseContext, which all workflow context types should inherit from.
It uses Pydantic for data validation, serialization, and type safety.
"""
from enum import Enum

from pydantic import BaseModel


class BaseContextFieldsEnum(Enum):
    """
    Base class for all workflow context fields.
    """


class BaseContext(BaseModel):
    """
    Base class for all workflow contexts. Extend this for specific context types.
    """

    class Config:
        validate_assignment: bool = True


@@ -0,0 +1,122 @@
"""
Context manager for workflow contexts.

This module provides the ContextManager singleton, which dynamically registers and exposes
workflow contexts as attributes. Contexts are registered using the register_context decorator.
This approach combines extensibility with attribute-based access and IDE/static analysis support.
"""
import threading
from enum import Enum
from typing import Any, Optional

from metadata.workflow.context.base import BaseContext, BaseContextFieldsEnum
from metadata.workflow.context.workflow_context import WorkflowContext

# NOTE: To make the context available on the context manager we need to:
#   1. Add it to the ContextsEnum
#   2. Declare it and initialize it as a class attribute in ContextManager


class ContextsEnum(Enum):
    """
    Enum defining all available workflow contexts.
    Each member represents a context type that can be registered with the ContextManager.
    """

    WORKFLOW = "workflow"


class ContextManager:
    """
    Singleton manager for all workflow contexts. Dynamically registers and exposes contexts as attributes.
    """

    _instance: Optional["ContextManager"] = None
    _lock: threading.RLock = threading.RLock()

    # List of Contexts
    workflow: WorkflowContext = WorkflowContext()

    def __new__(cls):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    @classmethod
    def get_instance(cls):
        """
        Return the singleton instance of ContextManager, creating it if necessary.
        """
        if cls._instance is None:
            _ = cls()
        return cls._instance

    @classmethod
    def set_context_attr(
        cls, context_enum: ContextsEnum, field_enum: BaseContextFieldsEnum, value: Any
    ):
        """
        Thread-safe method to set an attribute on a context.

        Args:
            context_enum (Enum): Enum member representing the context (e.g., ContextManager.Contexts.workflow).
            field_enum (Enum): Enum member representing the field (e.g., WorkflowContext.Fields.serviceName).
            value (Any): The value to set for the field.
        """
        with cls._lock:
            instance = cls.get_instance()
            context = getattr(instance, context_enum.value)
            setattr(context, field_enum.value, value)

    @classmethod
    def get_context_attr(
        cls, context_enum: ContextsEnum, field_enum: BaseContextFieldsEnum
    ) -> Any:
        """
        Thread-safe method to get an attribute from a context.

        Args:
            context_enum (Enum): Enum member representing the context (e.g., ContextManager.Contexts.workflow).
            field_enum (Enum): Enum member representing the field (e.g., WorkflowContext.Fields.serviceName).

        Returns:
            Any: The value of the requested field.
        """
        with cls._lock:
            instance = cls.get_instance()
            context = getattr(instance, context_enum.value)
            return getattr(context, field_enum.value)

    @classmethod
    def get_context(cls, context_enum: ContextsEnum) -> BaseContext:
        """
        Thread-safe method to retrieve the full context object by Enum member.

        Args:
            context_enum (Enum): Enum member representing the context (e.g., ContextManager.Contexts.workflow).

        Returns:
            Any: The context object instance.
        """
        with cls._lock:
            instance = cls.get_instance()
            return getattr(instance, context_enum.value)

    @classmethod
    def dump_contexts(cls) -> Optional[dict[str, Any]]:
        """
        Dump all available contexts as a dictionary: {contextName: content}
        Assumes each context is a Pydantic object.
        """
        with cls._lock:
            instance = cls.get_instance()
            result: dict[str, Any] = {}
            for context_enum in ContextsEnum:
                context_obj = getattr(instance, context_enum.value)
                result[context_enum.value] = context_obj.model_dump()
            if result:
                return result
            return None
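
The NOTE at the top of this module is the registration recipe. As a hedged illustration only (ProfilerContext, ProfilerContextFieldsEnum, and the PROFILER member are invented for this sketch and are not part of this commit), wiring in an additional context would look roughly like this:

# Hypothetical sketch: registering a second context with the ContextManager.
# ProfilerContext / PROFILER are made-up names, not part of this commit.
from typing import Union

from pydantic import Field

from metadata.workflow.context.base import BaseContext, BaseContextFieldsEnum


class ProfilerContextFieldsEnum(BaseContextFieldsEnum):
    SAMPLE_ROWS = "sampleRows"


class ProfilerContext(BaseContext):
    sampleRows: Union[int, None] = Field(
        default=None, description="Rows sampled by the profiler (illustrative field)"
    )


# 1. Add it to the ContextsEnum:
#        PROFILER = "profiler"
# 2. Declare and initialize it as a class attribute in ContextManager:
#        profiler: ProfilerContext = ProfilerContext()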


@@ -0,0 +1,29 @@
"""
Workflow context definition.

This module defines the WorkflowContext, which holds workflow-level metadata such as the service name.
It is registered with the ContextManager for attribute-based access throughout the workflow system.
"""
from typing import Union

from pydantic import Field

from .base import BaseContext, BaseContextFieldsEnum


class WorkflowContextFieldsEnum(BaseContextFieldsEnum):
    """
    Enum defining all available workflow context fields.
    """

    SERVICE_NAME = "serviceName"


class WorkflowContext(BaseContext):
    """
    Context for workflow-level metadata.
    """

    serviceName: Union[str, None] = Field(
        default=None, description="Name of the service on which the workflow operates"
    )


@@ -29,10 +29,11 @@ from metadata.generated.schema.entity.services.ingestionPipelines.status import
 from metadata.generated.schema.metadataIngestion.workflow import (
     OpenMetadataWorkflowConfig,
 )
-from metadata.generated.schema.type.basic import Timestamp
+from metadata.generated.schema.type.basic import Map, Timestamp
 from metadata.ingestion.api.step import Step, Summary
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.utils.logger import ometa_logger
+from metadata.workflow.context.context_manager import ContextManager

 logger = ometa_logger()

@@ -80,12 +81,23 @@ class WorkflowStatusMixin:
             pipelineState=state,
             startDate=Timestamp(self._start_ts),
             timestamp=Timestamp(self._start_ts),
-        )
+        )  # type: ignore

+    def update_pipeline_status_metadata(
+        self, pipeline_status: PipelineStatus
+    ) -> PipelineStatus:
+        """
+        Update the pipeline status metadata with the context manager data.
+        """
+        metadata = ContextManager.dump_contexts()
+        if metadata:
+            pipeline_status.metadata = Map(**metadata)
+        else:
+            pipeline_status.metadata = None
+        return pipeline_status
+
     def set_ingestion_pipeline_status(
-        self,
-        state: PipelineState,
-        ingestion_status: Optional[IngestionStatus] = None,
+        self, state: PipelineState, ingestion_status: Optional[IngestionStatus] = None
     ) -> None:
         """
         Method to set the pipeline status of current ingestion pipeline

@@ -93,7 +105,11 @@ class WorkflowStatusMixin:
         try:
             # if we don't have a related Ingestion Pipeline FQN, no status is set.
-            if self.config.ingestionPipelineFQN and self.ingestion_pipeline:
+            if (
+                self.config.ingestionPipelineFQN
+                and self.ingestion_pipeline
+                and self.ingestion_pipeline.fullyQualifiedName
+            ):
                 pipeline_status = self.metadata.get_pipeline_status(
                     self.ingestion_pipeline.fullyQualifiedName.root, self.run_id
                 )

@@ -112,10 +128,15 @@ class WorkflowStatusMixin:
                 )
             # committing configurations can be a burden on resources,
             # we dump a subset to be mindful of the payload size
-            pipeline_status.config = self.config.model_dump(
-                include={"appConfig"},
-                mask_secrets=True,
-            )
+            pipeline_status.config = Map(
+                **self.config.model_dump(
+                    include={"appConfig"},
+                    mask_secrets=True,
+                )
+            )
+            pipeline_status = self.update_pipeline_status_metadata(pipeline_status)
             self.metadata.create_or_update_pipeline_status(
                 self.ingestion_pipeline.fullyQualifiedName.root, pipeline_status
             )

@@ -129,13 +150,13 @@ class WorkflowStatusMixin:
         """
         Method to raise error if failed execution
         """
-        self.raise_from_status_internal(raise_warnings)
+        self.raise_from_status_internal(raise_warnings)  # type: ignore

     def result_status(self) -> WorkflowResultStatus:
         """
         Returns 1 if source status is failed, 0 otherwise.
         """
-        if self.get_failures():
+        if self.get_failures():  # type: ignore
             return WorkflowResultStatus.FAILURE
         return WorkflowResultStatus.SUCCESS

@@ -148,6 +169,6 @@ class WorkflowStatusMixin:
         return IngestionStatus(
             [
                 StepSummary.model_validate(Summary.from_step(step).model_dump())
-                for step in self.workflow_steps()
+                for step in self.workflow_steps()  # type: ignore
             ]
         )
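
Read together with the new context modules, the effect of update_pipeline_status_metadata is that whatever the ContextManager holds when the status is pushed gets attached to the pipeline status. A minimal sketch of that flow, assuming the modules added in this commit are importable and using a made-up service name (the workflow itself makes these calls internally; the standalone calls below are for illustration only):

# Illustrative only: how context data ends up on pipeline_status.metadata.
from metadata.workflow.context.context_manager import ContextManager, ContextsEnum
from metadata.workflow.context.workflow_context import WorkflowContextFieldsEnum

# Early in the ingestion run, the workflow records the service it operates on:
ContextManager.set_context_attr(
    ContextsEnum.WORKFLOW, WorkflowContextFieldsEnum.SERVICE_NAME, "local_mysql"
)

# update_pipeline_status_metadata() then wraps dump_contexts() into a Map:
print(ContextManager.dump_contexts())
# -> {'workflow': {'serviceName': 'local_mysql'}}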


@ -0,0 +1,60 @@
from metadata.workflow.context.context_manager import ContextManager, ContextsEnum
from metadata.workflow.context.workflow_context import (
WorkflowContext,
WorkflowContextFieldsEnum,
)
def test_singleton_behavior():
cm1 = ContextManager.get_instance()
cm2 = ContextManager.get_instance()
assert cm1 is cm2, "ContextManager should be a singleton"
def test_context_get_set_attr():
service_name = "test_service"
cm = ContextManager.get_instance()
assert cm is not None
# Set and get using enums
ContextManager.set_context_attr(
ContextsEnum.WORKFLOW, WorkflowContextFieldsEnum.SERVICE_NAME, service_name
)
value = ContextManager.get_context_attr(
ContextsEnum.WORKFLOW, WorkflowContextFieldsEnum.SERVICE_NAME
)
assert value == service_name
def test_get_context_returns_context():
cm = ContextManager.get_instance()
assert cm is not None
workflow_ctx = ContextManager.get_context(ContextsEnum.WORKFLOW)
assert isinstance(workflow_ctx, WorkflowContext)
# Should be the same object as returned by get_context
assert workflow_ctx is ContextManager.get_context(ContextsEnum.WORKFLOW)
def test_thread_safety():
import threading
cm = ContextManager.get_instance()
assert cm is not None
def set_service_name(name):
ContextManager.set_context_attr(
ContextsEnum.WORKFLOW, WorkflowContextFieldsEnum.SERVICE_NAME, name
)
threads = [
threading.Thread(target=set_service_name, args=(f"service_{i}",))
for i in range(10)
]
for t in threads:
t.start()
for t in threads:
t.join()
# The final value should be one of the set values
final_value = ContextManager.get_context_attr(
ContextsEnum.WORKFLOW, WorkflowContextFieldsEnum.SERVICE_NAME
)
assert final_value in {f"service_{i}" for i in range(10)}


@@ -47,6 +47,10 @@
     "config": {
       "description": "Pipeline configuration for this particular execution.",
       "$ref": "../../../type/basic.json#/definitions/map"
+    },
+    "metadata": {
+      "description": "Metadata for the pipeline status.",
+      "$ref": "../../../type/basic.json#/definitions/map"
     }
   },
   "additionalProperties": false


@@ -652,6 +652,10 @@ export interface PipelineStatus {
      * endDate of the pipeline run for this particular execution.
      */
     endDate?: number;
+    /**
+     * Metadata for the pipeline status.
+     */
+    metadata?: { [key: string]: any };
     /**
      * Pipeline status denotes if its failed or succeeded.
      */