Fixes #22406: Add workflow resource utilisation metrics for better troubleshooting (#23696)

* Add workflow resource utilization metrics for better troubleshooting

* Add types for correct static type checking

* Remove duplicate type annotations
Mohit Tilala 2025-10-06 13:20:06 +05:30 committed by GitHub
parent d22dd3cfad
commit 0cf0394d0b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 126 additions and 0 deletions

View File

@@ -58,6 +58,7 @@ from metadata.utils.streamable_logger import (
    setup_streamable_logging_for_workflow,
)
from metadata.workflow.workflow_output_handler import WorkflowOutputHandler
from metadata.workflow.workflow_resource_metrics import WorkflowResourceMetrics
from metadata.workflow.workflow_status_mixin import WorkflowStatusMixin

logger = ingestion_logger()
@@ -363,6 +364,20 @@ class BaseWorkflow(ABC, WorkflowStatusMixin):
                    f" found {len(step.status.failures)} errors"
                )
            # Only calculate resource metrics when debug is enabled,
            # to avoid unnecessary computation
if self._is_debug_enabled():
metrics = WorkflowResourceMetrics()
logger.debug(
f"Workflow Resources - "
f"CPU: {metrics.cpu_usage_percent:.2f}% "
f"({metrics.system_cpu_cores}c/{metrics.system_cpu_threads}t) | "
f"Memory: {metrics.memory_used_mb:.2f}MB/"
f"{metrics.memory_total_mb:.2f}MB "
f"({metrics.memory_usage_percent:.2f}%) | "
f"Processes: {metrics.active_processes}"
)
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.error(f"Wild exception reporting status - {exc}")
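For reference, a minimal standalone sketch of the same debug-gated pattern outside the workflow, assuming the new module is importable; the logger name and the isEnabledFor() check are illustrative stand-ins for the workflow's ingestion_logger() and _is_debug_enabled():

import logging

from metadata.workflow.workflow_resource_metrics import WorkflowResourceMetrics

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("workflow_example")  # stand-in for ingestion_logger()

# Mirror of the gated block above: only pay the sampling cost when DEBUG is active
if logger.isEnabledFor(logging.DEBUG):
    metrics = WorkflowResourceMetrics()
    logger.debug(
        "Workflow Resources - CPU: %.2f%% (%dc/%dt) | Memory: %.2fMB/%.2fMB (%.2f%%) | Processes: %d",
        metrics.cpu_usage_percent,
        metrics.system_cpu_cores,
        metrics.system_cpu_threads,
        metrics.memory_used_mb,
        metrics.memory_total_mb,
        metrics.memory_usage_percent,
        metrics.active_processes,
    )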

View File

@@ -0,0 +1,111 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Module to capture resource metrics for workflow execution monitoring
"""
import os
from typing import Optional

import psutil


class WorkflowResourceMetrics:
"""Captures CPU and memory metrics for a workflow process and all its children."""
def __init__(self, pid: Optional[int] = None):
"""Initialize metrics for the given process ID (defaults to current process)."""
self.pid: int = pid or os.getpid()
        self._collect_metrics()

    def _collect_metrics(self):
"""Collect metrics for this process and all its children."""
try:
process = psutil.Process(self.pid)
# Get memory and CPU for this process and all its children
total_mem_rss = process.memory_info().rss
total_cpu = process.cpu_percent(interval=1)
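            # Note: cpu_percent(interval=1) blocks for about one second to take its
            # sample, and the returned value can exceed 100% on multi-core systems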
process_count = 1
total_threads = process.num_threads() # Get thread count for main process
            # If the process spawns child processes via multiprocessing, include
            # them all so that we get an aggregated view of CPU and memory usage
try:
children = process.children(recursive=True)
process_count += len(children)
for child in children:
try:
total_mem_rss += child.memory_info().rss
total_cpu += child.cpu_percent(interval=0.1)
total_threads += (
child.num_threads()
) # Add thread count for each child
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
except Exception:
pass
# Get actual system memory info
system_mem = psutil.virtual_memory()
# Get CPU core information
cpu_cores = psutil.cpu_count(logical=False) # Physical cores
cpu_logical = psutil.cpu_count(
logical=True
) # Logical cores (with hyperthreading)
# Store computed metrics
self.cpu_usage_percent: float = total_cpu
self.memory_used_rss_bytes: int = total_mem_rss
self.memory_used_mb: float = total_mem_rss / (1024 * 1024)
self.memory_total_mb: float = system_mem.total / (1024 * 1024)
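            # Share of total system memory held (RSS) by this process tree,
            # not the system-wide memory usage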
self.memory_usage_percent: float = (total_mem_rss / system_mem.total) * 100
self.active_processes: int = process_count
self.active_threads: int = total_threads
self.system_cpu_cores: int = cpu_cores or 1
self.system_cpu_threads: int = cpu_logical or 1
        except Exception:
            # Fallback values if process metrics cannot be collected
            # (covers psutil.NoSuchProcess, psutil.AccessDenied, and anything else)
self.cpu_usage_percent = 0.0
self.memory_used_rss_bytes = 0
self.memory_used_mb = 0.0
self.memory_total_mb = 0.0
self.memory_usage_percent = 0.0
self.active_processes = 0
self.active_threads = 0
self.system_cpu_cores = 1
            self.system_cpu_threads = 1

    def to_dict(self):
"""Return metrics as a dictionary."""
return {
"cpu_usage_percent": self.cpu_usage_percent,
"memory_used_rss_bytes": self.memory_used_rss_bytes,
"memory_used_mb": self.memory_used_mb,
"memory_total_mb": self.memory_total_mb,
"memory_usage_percent": self.memory_usage_percent,
"active_processes": self.active_processes,
"active_threads": self.active_threads,
"system_cpu_cores": self.system_cpu_cores,
"system_cpu_threads": self.system_cpu_threads,
        }

    def __str__(self):
"""String representation of metrics."""
return (
f"CPU: {self.cpu_usage_percent:.2f}% ({self.system_cpu_cores}c/{self.system_cpu_threads}t) | "
f"Memory: {self.memory_used_mb:.2f}MB/{self.memory_total_mb:.2f}MB ({self.memory_usage_percent:.2f}%) | "
f"Processes: {self.active_processes} | Threads: {self.active_threads}"
)
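
For quick verification, a usage sketch of the new helper, assuming it is importable as metadata.workflow.workflow_resource_metrics (the printed numbers are illustrative only):

import json

from metadata.workflow.workflow_resource_metrics import WorkflowResourceMetrics

metrics = WorkflowResourceMetrics()  # samples the current process and its children
print(metrics)  # e.g. CPU: 3.10% (4c/8t) | Memory: 182.50MB/16000.00MB (1.14%) | Processes: 1 | Threads: 7
print(json.dumps(metrics.to_dict(), indent=2))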