From 450172d90ff9fa1a618fb594bc6edeb0d7b93818 Mon Sep 17 00:00:00 2001
From: Aseem Bansal
Date: Mon, 14 Jul 2025 19:46:20 +0530
Subject: [PATCH] feat(ingest): improve readability by using tables (#14074)

---
 .../src/datahub/cli/check_cli.py         | 41 +++++++++++++-
 .../src/datahub/ingestion/api/report.py  | 54 ++++++++++++++++++-
 2 files changed, 93 insertions(+), 2 deletions(-)

diff --git a/metadata-ingestion/src/datahub/cli/check_cli.py b/metadata-ingestion/src/datahub/cli/check_cli.py
index f96fbf40b4..e58f6ba5c1 100644
--- a/metadata-ingestion/src/datahub/cli/check_cli.py
+++ b/metadata-ingestion/src/datahub/cli/check_cli.py
@@ -9,6 +9,7 @@ from datetime import datetime
 from typing import Any, Dict, List, Optional, Union
 
 import click
+from tabulate import tabulate
 
 from datahub._version import __package_name__
 from datahub.cli.json_file import check_mce_file
@@ -519,4 +520,42 @@ def get_kafka_consumer_offsets() -> None:
     """Get Kafka consumer offsets from the DataHub API."""
     graph = get_default_graph(ClientMode.CLI)
     result = graph.get_kafka_consumer_offsets()
-    pprint.pprint(result)
+
+    table_data = []
+    headers = [
+        "Topic",
+        "Consumer Group",
+        "Schema",
+        "Partition",
+        "Offset",
+        "Lag",
+        "Avg Lag",
+        "Max Lag",
+        "Total Lag",
+    ]
+
+    for topic, consumers in result.items():
+        for consumer_group, schemas in consumers.items():
+            for schema, data in schemas.items():
+                metrics = data.get("metrics", {})
+                partitions = data.get("partitions", {})
+
+                for partition, partition_data in partitions.items():
+                    table_data.append(
+                        [
+                            topic,
+                            consumer_group,
+                            schema,
+                            partition,
+                            partition_data.get("offset", "N/A"),
+                            partition_data.get("lag", "N/A"),
+                            metrics.get("avgLag", "N/A"),
+                            metrics.get("maxLag", "N/A"),
+                            metrics.get("totalLag", "N/A"),
+                        ]
+                    )
+
+    if table_data:
+        click.echo(tabulate(table_data, headers=headers, tablefmt="grid"))
+    else:
+        click.echo("No Kafka consumer offset data found.")
diff --git a/metadata-ingestion/src/datahub/ingestion/api/report.py b/metadata-ingestion/src/datahub/ingestion/api/report.py
index da93414583..8c536a5454 100644
--- a/metadata-ingestion/src/datahub/ingestion/api/report.py
+++ b/metadata-ingestion/src/datahub/ingestion/api/report.py
@@ -11,6 +11,7 @@ from typing import Any, Dict, List, Optional, Set, Union, cast, runtime_checkabl
 import humanfriendly
 import pydantic
 from pydantic import BaseModel
+from tabulate import tabulate
 from typing_extensions import Literal, Protocol
 
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
@@ -95,7 +96,58 @@ class Report(SupportsAsObj):
         }
 
     def as_string(self) -> str:
-        return pprint.pformat(self.as_obj(), width=150, sort_dicts=False)
+        self_obj = self.as_obj()
+        _aspects_by_subtypes = self_obj.pop("aspects_by_subtypes", None)
+
+        # Format the main report data
+        result = pprint.pformat(self_obj, width=150, sort_dicts=False)
+
+        # Add aspects_by_subtypes table if it exists
+        if _aspects_by_subtypes:
+            result += "\n\nAspects by Subtypes:\n"
+            result += self._format_aspects_by_subtypes_table(_aspects_by_subtypes)
+
+        return result
+
+    def _format_aspects_by_subtypes_table(
+        self, aspects_by_subtypes: Dict[str, Dict[str, Dict[str, int]]]
+    ) -> str:
+        """Format aspects_by_subtypes data as a table with aspects as rows and entity/subtype as columns."""
+        if not aspects_by_subtypes:
+            return "No aspects by subtypes data available."
+
+        all_aspects: set[str] = {
+            aspect
+            for subtypes in aspects_by_subtypes.values()
+            for aspects in subtypes.values()
+            for aspect in aspects
+        }
+
+        aspect_rows = sorted(all_aspects)
+
+        entity_subtype_columns = []
+        for entity_type, subtypes in aspects_by_subtypes.items():
+            for subtype in subtypes:
+                entity_subtype_columns.append(f"{entity_type} ({subtype})")
+
+        entity_subtype_columns.sort()
+
+        headers = ["Aspect"] + entity_subtype_columns
+
+        table_data = [
+            [aspect]
+            + [
+                aspects.get(aspect, 0)
+                for subtypes in aspects_by_subtypes.values()
+                for aspects in subtypes.values()
+            ]
+            for aspect in aspect_rows
+        ]
+
+        if table_data:
+            return tabulate(table_data, headers=headers, tablefmt="grid")
+        else:
+            return "No aspects by subtypes data available."
 
     def as_json(self) -> str:
         return json.dumps(self.as_obj())
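Note (not part of the patch): a minimal, self-contained sketch of the table rendering used above. It assumes only the tabulate package; the sample aspects_by_subtypes payload and the counts_by_column / aspect_rows names are invented for illustration, and the pivot mirrors the idea behind the new Report._format_aspects_by_subtypes_table helper rather than reproducing it line for line.

    # Sketch only: the payload below is invented to show how the
    # aspects-by-subtypes pivot renders with tabulate's "grid" format.
    from tabulate import tabulate

    # Hypothetical report payload: entity type -> subtype -> aspect name -> count.
    aspects_by_subtypes = {
        "dataset": {
            "Table": {"schemaMetadata": 10, "status": 10},
            "View": {"schemaMetadata": 4},
        },
        "container": {
            "Schema": {"containerProperties": 2, "status": 2},
        },
    }

    # One column per "entity (subtype)" pair, one row per aspect name.
    counts_by_column = {
        f"{entity} ({subtype})": aspect_counts
        for entity, subtypes in aspects_by_subtypes.items()
        for subtype, aspect_counts in subtypes.items()
    }
    columns = sorted(counts_by_column)
    aspect_rows = sorted({a for counts in counts_by_column.values() for a in counts})

    table_data = [
        [aspect] + [counts_by_column[col].get(aspect, 0) for col in columns]
        for aspect in aspect_rows
    ]
    print(tabulate(table_data, headers=["Aspect"] + columns, tablefmt="grid"))

Keying the per-column counts by the same "entity (subtype)" label used in the headers keeps each data column aligned with its sorted header regardless of the insertion order of the input dicts.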