Report disk usage next to memory usage in CliReport.compute_stats.

  * Import shutil and add a `_peak_disk_usage` counter to CliReport.
  * On every compute_stats() call, sample shutil.disk_usage("/"), keep the
    largest `used` value seen so far, and publish total/used/free as
    human-readable sizes in `disk_info`.
  * Run the memory and disk sampling inside one try/except that logs a
    warning instead of raising; super().compute_stats() is still called
    afterwards.

NOTE(review): the single warning text says "memory usage" even when the
failing call is shutil.disk_usage, and a failure while sampling memory skips
the disk sampling as well -- consider separate try blocks and messages.

diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
index 6e2b46129e..fe4911729a 100644
--- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
+++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
@@ -3,6 +3,7 @@ import itertools
 import logging
 import os
 import platform
+import shutil
 import sys
 import time
 from dataclasses import dataclass
@@ -128,14 +129,30 @@ class CliReport(Report):
     py_exec_path: str = sys.executable
     os_details: str = platform.platform()
     _peak_memory_usage: int = 0
+    _peak_disk_usage: int = 0
 
     def compute_stats(self) -> None:
-        mem_usage = psutil.Process(os.getpid()).memory_info().rss
-        if self._peak_memory_usage < mem_usage:
-            self._peak_memory_usage = mem_usage
-            self.peak_memory_usage = humanfriendly.format_size(self._peak_memory_usage)
+        try:
+            mem_usage = psutil.Process(os.getpid()).memory_info().rss
+            if self._peak_memory_usage < mem_usage:
+                self._peak_memory_usage = mem_usage
+                self.peak_memory_usage = humanfriendly.format_size(
+                    self._peak_memory_usage
+                )
+            self.mem_info = humanfriendly.format_size(mem_usage)
+
+            disk_usage = shutil.disk_usage("/")
+            if self._peak_disk_usage < disk_usage.used:
+                self._peak_disk_usage = disk_usage.used
+                self.peak_disk_usage = humanfriendly.format_size(self._peak_disk_usage)
+            self.disk_info = {
+                "total": humanfriendly.format_size(disk_usage.total),
+                "used": humanfriendly.format_size(disk_usage.used),
+                "free": humanfriendly.format_size(disk_usage.free),
+            }
+        except Exception as e:
+            logger.warning(f"Failed to compute report memory usage: {e}")
 
-        self.mem_info = humanfriendly.format_size(mem_usage)
         return super().compute_stats()
 
 