datahub/smoke-test/tests/consistency_utils.py

75 lines
2.4 KiB
Python

import logging
import os
import subprocess
import time
def env_flag(name: str, default: bool = False) -> bool:
    """Interpret environment variable *name* as a boolean flag.

    Returns *default* when the variable is unset. Otherwise the value is
    compared case-insensitively (surrounding whitespace ignored): ``""``,
    ``"0"``, ``"false"``, ``"no"`` and ``"off"`` mean False, anything else
    means True. This avoids the ``bool("false") is True`` pitfall.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    return raw.strip().lower() not in ("", "0", "false", "no", "off")


# When true, wait_for_writes_to_sync skips polling Kafka consumer lag and only
# sleeps for ELASTICSEARCH_REFRESH_INTERVAL_SECONDS.
USE_STATIC_SLEEP: bool = env_flag("USE_STATIC_SLEEP")
# Seconds to sleep to give Elasticsearch time to make recent writes visible.
ELASTICSEARCH_REFRESH_INTERVAL_SECONDS: int = int(
    os.getenv("ELASTICSEARCH_REFRESH_INTERVAL_SECONDS", 1)
)
# Bootstrap address passed to kafka-consumer-groups, which runs inside the
# broker container via `docker exec`.
KAFKA_BOOTSTRAP_SERVER: str = os.getenv("KAFKA_BOOTSTRAP_SERVER", "broker:29092")
logger = logging.getLogger(__name__)
def infer_kafka_broker_container() -> str:
    """Return the first running Docker container name that contains "broker".

    Container names come from ``docker ps --format {{.Names}}``. The command is
    run without a shell and the substring filter is applied in Python.

    Raises:
        ValueError: if the docker CLI cannot be started or no running
            container name contains "broker".
    """
    try:
        completed_process = subprocess.run(
            ["docker", "ps", "--format", "{{.Names}}"],
            capture_output=True,
            text=True,
        )
    except OSError as err:
        # e.g. the docker executable is not on PATH; keep the ValueError contract.
        raise ValueError("No Kafka broker containers found") from err

    broker_containers = [
        name for name in completed_process.stdout.splitlines() if "broker" in name
    ]
    if not broker_containers:
        message = "No Kafka broker containers found"
        detail = (completed_process.stderr or "").strip()
        if completed_process.returncode != 0 and detail:
            # Surface docker's own error (e.g. daemon not running) for debugging.
            message += f" (docker ps failed: {detail})"
        raise ValueError(message)
    return broker_containers[0]
def _consumer_group_describe_cmd(broker_container: str, consumer_group: str) -> list:
    """Build the argv that describes *consumer_group* from inside the broker container."""
    return [
        "docker",
        "exec",
        broker_container,
        "/bin/kafka-consumer-groups",
        "--bootstrap-server",
        KAFKA_BOOTSTRAP_SERVER,
        "--group",
        consumer_group,
        "--describe",
    ]


def _parse_lag_column(describe_output: str) -> list:
    """Extract the integer LAG column from ``kafka-consumer-groups --describe`` output.

    Lines containing "LAG" (header rows) are skipped. For every other line the
    sixth whitespace-separated field is parsed as an int. Lines with fewer than
    six fields are ignored.

    Raises:
        ValueError: if a sixth field is not an integer.
    """
    lag_values = []
    for line in describe_output.splitlines():
        if "LAG" in line:
            continue
        fields = line.split()
        if len(fields) >= 6:
            lag_values.append(int(fields[5]))
    return lag_values


def wait_for_writes_to_sync(
    max_timeout_in_sec: int = 120,
    consumer_group: str = "generic-mae-consumer-job-client",
) -> None:
    """Wait until Kafka consumer lag for *consumer_group* reaches zero, or time out.

    If ``USE_STATIC_SLEEP`` is set, only sleep for
    ``ELASTICSEARCH_REFRESH_INTERVAL_SECONDS`` and return. Otherwise, about once
    per second, run ``kafka-consumer-groups --describe`` inside the broker
    container until every reported partition lag is 0. Then sleep for
    ``ELASTICSEARCH_REFRESH_INTERVAL_SECONDS``. On timeout, log a warning and
    return without raising.

    The broker container name is read from ``KAFKA_BROKER_CONTAINER`` when set.
    Otherwise it is discovered with ``infer_kafka_broker_container()``.

    Args:
        max_timeout_in_sec: Upper bound, in seconds, on the polling phase.
        consumer_group: Kafka consumer group whose lag is checked.

    Raises:
        ValueError: propagated from ``infer_kafka_broker_container()`` when
            ``KAFKA_BROKER_CONTAINER`` is unset and no broker container is found.
    """
    if USE_STATIC_SLEEP:
        time.sleep(ELASTICSEARCH_REFRESH_INTERVAL_SECONDS)
        return

    # Only fall back to docker discovery when the env var is unset. Passing the
    # call as the getenv() default would run it (and possibly raise) every time.
    broker_container = (
        os.getenv("KAFKA_BROKER_CONTAINER") or infer_kafka_broker_container()
    )
    cmd = _consumer_group_describe_cmd(broker_container, consumer_group)
    cmd_text = " ".join(cmd)

    start_time = time.time()
    # Last successfully parsed lags; reported if we time out. Initialized so the
    # timeout warning never hits an unbound name.
    lag_values: list = []
    lag_zero = False
    while not lag_zero and (time.time() - start_time) < max_timeout_in_sec:
        time.sleep(1)  # poll interval
        try:
            completed_process = subprocess.run(cmd, capture_output=True, text=True)
            parsed_lags = _parse_lag_column(completed_process.stdout)
        except (ValueError, OSError):
            logger.warning(
                "Error reading kafka lag using command: %s", cmd_text, exc_info=True
            )
            continue
        lag_values = parsed_lags
        if not lag_values:
            logger.warning("No kafka lag rows returned by command: %s", cmd_text)
            continue
        lag_zero = max(lag_values) == 0

    if not lag_zero:
        logger.warning(
            "Exiting early from waiting for elastic to catch up due to a timeout. "
            "Current lag is %s",
            lag_values,
        )
    else:
        # we want to sleep for an additional period of time for Elastic writes buffer to clear
        time.sleep(ELASTICSEARCH_REFRESH_INTERVAL_SECONDS)