From f3c0e0804e51a140b06b1054b9be48563006ca0d Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Mon, 17 Nov 2025 22:14:27 +0530 Subject: [PATCH] refactor(test): use DataHub API in consistency checks (#15311) Co-authored-by: cclaude-dev Co-authored-by: Claude --- smoke-test/tests/consistency_utils.py | 115 +++++++++++++++++--------- 1 file changed, 75 insertions(+), 40 deletions(-) diff --git a/smoke-test/tests/consistency_utils.py b/smoke-test/tests/consistency_utils.py index 8d07d66148..cdff67c000 100644 --- a/smoke-test/tests/consistency_utils.py +++ b/smoke-test/tests/consistency_utils.py @@ -1,6 +1,8 @@ import logging -import subprocess import time +from typing import Optional + +import requests from tests.utilities import env_vars @@ -8,67 +10,100 @@ USE_STATIC_SLEEP: bool = env_vars.get_use_static_sleep() ELASTICSEARCH_REFRESH_INTERVAL_SECONDS: int = ( env_vars.get_elasticsearch_refresh_interval_seconds() ) -KAFKA_BOOTSTRAP_SERVER: str = env_vars.get_kafka_bootstrap_server() logger = logging.getLogger(__name__) -def infer_kafka_broker_container() -> str: - cmd = "docker ps --format '{{.Names}}' | grep broker" - completed_process = subprocess.run( - cmd, - capture_output=True, - shell=True, - text=True, - ) - result = str(completed_process.stdout) - lines = result.splitlines() - if len(lines) == 0: - raise ValueError("No Kafka broker containers found") - return lines[0] +def get_kafka_consumer_lag_from_api() -> Optional[int]: + """ + Fetch Kafka consumer lag using DataHub's OpenAPI endpoint. + Returns the maximum lag across all consumer groups (MCP, MCL, MCL-timeseries). + Returns None if the API call fails. + """ + gms_url = env_vars.get_gms_url() + if not gms_url: + logger.warning("GMS URL not configured, cannot fetch Kafka lag from API") + return None + + # Get admin credentials + admin_username = env_vars.get_admin_username() + admin_password = env_vars.get_admin_password() + + # Define the endpoints to check + endpoints = [ + "/openapi/operations/kafka/mcp/consumer/offsets", + "/openapi/operations/kafka/mcl/consumer/offsets", + "/openapi/operations/kafka/mcl-timeseries/consumer/offsets", + ] + + max_lag = 0 + for endpoint in endpoints: + url = f"{gms_url}{endpoint}?skipCache=true&detailed=false" + try: + response = requests.get( + url, + auth=(admin_username, admin_password), + timeout=10, + ) + if response.status_code == 200: + data = response.json() + # Parse the response structure: + # { + # "consumer-group-id": { + # "topic-name": { + # "metrics": { + # "maxLag": 500 + # } + # } + # } + # } + for consumer_group in data.values(): + if isinstance(consumer_group, dict): + for topic_info in consumer_group.values(): + if isinstance(topic_info, dict) and "metrics" in topic_info: + metrics = topic_info["metrics"] + if metrics and "maxLag" in metrics: + lag = metrics["maxLag"] + max_lag = max(max_lag, lag) + else: + logger.warning( + f"Failed to fetch Kafka lag from {endpoint}: HTTP {response.status_code}" + ) + except Exception as e: + logger.warning(f"Error fetching Kafka lag from {endpoint}: {e}") + + return max_lag def wait_for_writes_to_sync( max_timeout_in_sec: int = 120, consumer_group: str = "generic-mae-consumer-job-client", ) -> None: + """ + Wait for Kafka consumer lag to reach zero, indicating all writes have been processed. + Uses DataHub's OpenAPI endpoint to check consumer lag instead of Kafka CLI. + """ if USE_STATIC_SLEEP: time.sleep(ELASTICSEARCH_REFRESH_INTERVAL_SECONDS) return - KAFKA_BROKER_CONTAINER: str = str( - env_vars.get_kafka_broker_container() or infer_kafka_broker_container() - ) + start_time = time.time() - # get offsets lag_zero = False + current_lag = None + while not lag_zero and (time.time() - start_time) < max_timeout_in_sec: time.sleep(1) # micro-sleep - cmd = ( - f"docker exec {KAFKA_BROKER_CONTAINER} /bin/kafka-consumer-groups --bootstrap-server {KAFKA_BOOTSTRAP_SERVER} --all-groups --describe | grep -E '({consumer_group}|cdc-consumer-job-client)' " - + "| awk '{print $6}'" - ) - try: - completed_process = subprocess.run( - cmd, - capture_output=True, - shell=True, - text=True, - ) - result = str(completed_process.stdout) - lines = result.splitlines() - lag_values = [int(line) for line in lines if line != ""] - maximum_lag = max(lag_values) - if maximum_lag == 0: + current_lag = get_kafka_consumer_lag_from_api() + if current_lag is not None: + if current_lag == 0: lag_zero = True - except ValueError: - logger.warning( - f"Error reading kafka lag using command: {cmd}", exc_info=True - ) + else: + logger.warning("Unable to fetch Kafka lag from API") if not lag_zero: logger.warning( - f"Exiting early from waiting for elastic to catch up due to a timeout. Current lag is {lag_values}" + f"Exiting early from waiting for elastic to catch up due to a timeout. Current lag is {current_lag}" ) else: # we want to sleep for an additional period of time for Elastic writes buffer to clear