2023-10-10 16:08:34 +05:30
|
|
|
import logging
|
2023-06-19 13:47:44 +05:30
|
|
|
import os
|
2023-06-06 14:44:52 -07:00
|
|
|
import subprocess
|
2023-10-10 16:08:34 +05:30
|
|
|
import time
|
2023-06-19 13:47:44 +05:30
|
|
|
|
2023-06-06 14:44:52 -07:00
|
|
|
# Extra pause (seconds) applied after Kafka lag reaches zero.
_ELASTIC_BUFFER_WRITES_TIME_IN_SEC: int = 1

# Opt-in switch read from the environment. Unset, empty, "0", "false", "no"
# and "off" (case-insensitive) disable it; any other value enables it.
# NOTE: a plain bool() on the raw string would treat "false"/"0" as True.
USE_STATIC_SLEEP: bool = os.getenv("USE_STATIC_SLEEP", "").strip().lower() not in (
    "",
    "0",
    "false",
    "no",
    "off",
)

# Duration (seconds) of the static sleep used when USE_STATIC_SLEEP is enabled.
ELASTICSEARCH_REFRESH_INTERVAL_SECONDS: int = int(
    os.getenv("ELASTICSEARCH_REFRESH_INTERVAL_SECONDS", 5)
)

# Docker container name used with `docker exec` to query consumer-group lag.
KAFKA_BROKER_CONTAINER: str = os.getenv("KAFKA_BROKER_CONTAINER", "datahub-broker-1")

# Bootstrap server address passed to `kafka-consumer-groups`.
KAFKA_BOOTSTRAP_SERVER: str = os.getenv("KAFKA_BOOTSTRAP_SERVER", "broker:29092")

logger = logging.getLogger(__name__)
|
|
|
|
|
2023-06-06 14:44:52 -07:00
|
|
|
|
|
|
|
def wait_for_writes_to_sync(max_timeout_in_sec: int = 120) -> None:
    """Wait until the MAE consumer group reports zero lag, or time out.

    If ``USE_STATIC_SLEEP`` is enabled, this only sleeps for
    ``ELASTICSEARCH_REFRESH_INTERVAL_SECONDS`` and returns. Otherwise it
    polls ``kafka-consumer-groups --describe`` (via ``docker exec`` on
    ``KAFKA_BROKER_CONTAINER``) roughly once per second for the
    ``generic-mae-consumer-job-client`` group and stops as soon as every
    parsed value is zero. On success it sleeps an extra
    ``_ELASTIC_BUFFER_WRITES_TIME_IN_SEC`` seconds; on timeout it logs a
    warning and returns without raising.

    Args:
        max_timeout_in_sec: Maximum time, in seconds, to keep polling.
    """
    if USE_STATIC_SLEEP:
        time.sleep(ELASTICSEARCH_REFRESH_INTERVAL_SECONDS)
        return

    # The command does not change between polls, so build it once.
    # It drops the header row and prints the sixth column of each remaining
    # row (presumably the LAG column -- verify against the Kafka CLI output).
    cmd = (
        f"docker exec {KAFKA_BROKER_CONTAINER} /bin/kafka-consumer-groups --bootstrap-server {KAFKA_BOOTSTRAP_SERVER} --group generic-mae-consumer-job-client --describe | grep -v LAG "
        + "| awk '{print $6}'"
    )

    start_time = time.time()
    # Initialised up front so the timeout warning below can always report it,
    # even if the loop never ran or every poll failed to parse.
    lag_values: list = []
    lag_zero = False
    while not lag_zero and (time.time() - start_time) < max_timeout_in_sec:
        time.sleep(1)  # micro-sleep
        try:
            completed_process = subprocess.run(
                cmd,
                capture_output=True,
                shell=True,
                text=True,
            )
            lines = str(completed_process.stdout).splitlines()
            # int() raises ValueError on non-numeric output (e.g. "-");
            # max() raises ValueError when there are no rows at all.
            lag_values = [int(line) for line in lines if line != ""]
            if max(lag_values) == 0:
                lag_zero = True
        except ValueError:
            logger.warning(f"Error reading kafka lag using command: {cmd}")

    if not lag_zero:
        logger.warning(
            f"Exiting early from waiting for elastic to catch up due to a timeout. Current lag is {lag_values}"
        )
    else:
        # we want to sleep for an additional period of time for Elastic writes buffer to clear
        time.sleep(_ELASTIC_BUFFER_WRITES_TIME_IN_SEC)
|