feat(ingest): improve readability by using tables (#14074)

This commit is contained in:
Aseem Bansal 2025-07-14 19:46:20 +05:30 committed by GitHub
parent 8d332b68b8
commit 450172d90f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 93 additions and 2 deletions

View File

@ -9,6 +9,7 @@ from datetime import datetime
from typing import Any, Dict, List, Optional, Union
import click
from tabulate import tabulate
from datahub._version import __package_name__
from datahub.cli.json_file import check_mce_file
@ -519,4 +520,42 @@ def get_kafka_consumer_offsets() -> None:
"""Get Kafka consumer offsets from the DataHub API."""
graph = get_default_graph(ClientMode.CLI)
result = graph.get_kafka_consumer_offsets()
pprint.pprint(result)
table_data = []
headers = [
"Topic",
"Consumer Group",
"Schema",
"Partition",
"Offset",
"Lag",
"Avg Lag",
"Max Lag",
"Total Lag",
]
for topic, consumers in result.items():
for consumer_group, schemas in consumers.items():
for schema, data in schemas.items():
metrics = data.get("metrics", {})
partitions = data.get("partitions", {})
for partition, partition_data in partitions.items():
table_data.append(
[
topic,
consumer_group,
schema,
partition,
partition_data.get("offset", "N/A"),
partition_data.get("lag", "N/A"),
metrics.get("avgLag", "N/A"),
metrics.get("maxLag", "N/A"),
metrics.get("totalLag", "N/A"),
]
)
if table_data:
click.echo(tabulate(table_data, headers=headers, tablefmt="grid"))
else:
click.echo("No Kafka consumer offset data found.")

View File

@ -11,6 +11,7 @@ from typing import Any, Dict, List, Optional, Set, Union, cast, runtime_checkabl
import humanfriendly
import pydantic
from pydantic import BaseModel
from tabulate import tabulate
from typing_extensions import Literal, Protocol
from datahub.emitter.mcp import MetadataChangeProposalWrapper
@ -95,7 +96,58 @@ class Report(SupportsAsObj):
}
def as_string(self) -> str:
return pprint.pformat(self.as_obj(), width=150, sort_dicts=False)
self_obj = self.as_obj()
_aspects_by_subtypes = self_obj.pop("aspects_by_subtypes", None)
# Format the main report data
result = pprint.pformat(self_obj, width=150, sort_dicts=False)
# Add aspects_by_subtypes table if it exists
if _aspects_by_subtypes:
result += "\n\nAspects by Subtypes:\n"
result += self._format_aspects_by_subtypes_table(_aspects_by_subtypes)
return result
def _format_aspects_by_subtypes_table(
self, aspects_by_subtypes: Dict[str, Dict[str, Dict[str, int]]]
) -> str:
"""Format aspects_by_subtypes data as a table with aspects as rows and entity/subtype as columns."""
if not aspects_by_subtypes:
return "No aspects by subtypes data available."
all_aspects: set[str] = {
aspect
for subtypes in aspects_by_subtypes.values()
for aspects in subtypes.values()
for aspect in aspects
}
aspect_rows = sorted(all_aspects)
entity_subtype_columns = []
for entity_type, subtypes in aspects_by_subtypes.items():
for subtype in subtypes:
entity_subtype_columns.append(f"{entity_type} ({subtype})")
entity_subtype_columns.sort()
headers = ["Aspect"] + entity_subtype_columns
table_data = [
[aspect]
+ [
aspects.get(aspect, 0)
for subtypes in aspects_by_subtypes.values()
for aspects in subtypes.values()
]
for aspect in aspect_rows
]
if table_data:
return tabulate(table_data, headers=headers, tablefmt="grid")
else:
return "No aspects by subtypes data available."
def as_json(self) -> str:
return json.dumps(self.as_obj())