From 2268c0c5b6aa3dfe60bf98aa02c0e816ba247c0f Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Thu, 4 Jan 2024 16:36:09 -0500 Subject: [PATCH] feat(ingest): track thread count in ingestion report (#9566) --- .../src/datahub/ingestion/run/pipeline.py | 21 ++++++++++++++++++- .../ingestion/source/looker/looker_usage.py | 1 + 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py index d7c70dbea0..1641d71aba 100644 --- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py @@ -5,6 +5,7 @@ import os import platform import shutil import sys +import threading import time from dataclasses import dataclass from typing import Any, Dict, Iterable, Iterator, List, Optional, cast @@ -129,9 +130,18 @@ class CliReport(Report): py_version: str = sys.version py_exec_path: str = sys.executable os_details: str = platform.platform() + + mem_info: Optional[str] = None + peak_memory_usage: Optional[str] = None _peak_memory_usage: int = 0 + + disk_info: Optional[dict] = None + peak_disk_usage: Optional[str] = None _peak_disk_usage: int = 0 + thread_count: Optional[int] = None + peak_thread_count: Optional[int] = None + def compute_stats(self) -> None: try: mem_usage = psutil.Process(os.getpid()).memory_info().rss @@ -141,7 +151,10 @@ class CliReport(Report): self._peak_memory_usage ) self.mem_info = humanfriendly.format_size(mem_usage) + except Exception as e: + logger.warning(f"Failed to compute memory usage: {e}") + try: disk_usage = shutil.disk_usage("/") if self._peak_disk_usage < disk_usage.used: self._peak_disk_usage = disk_usage.used @@ -152,7 +165,13 @@ class CliReport(Report): "free": humanfriendly.format_size(disk_usage.free), } except Exception as e: - logger.warning(f"Failed to compute report memory usage: {e}") + logger.warning(f"Failed to compute disk usage: {e}") + + try: + self.thread_count = threading.active_count() + self.peak_thread_count = max(self.peak_thread_count or 0, self.thread_count) + except Exception as e: + logger.warning(f"Failed to compute thread count: {e}") return super().compute_stats() diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_usage.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_usage.py index 8f0a49ab42..cbec8ce3e8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_usage.py @@ -4,6 +4,7 @@ # 3) Entity timeseries stat by user import concurrent +import concurrent.futures import dataclasses import datetime import logging