refactor(ci): move from sleep to kafka lag based testing (#8094)

This commit is contained in:
Shirshanka Das 2023-06-06 14:44:52 -07:00 committed by GitHub
parent 6bad15be5c
commit 8e6e4e8962
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 60 additions and 67 deletions

View File

@ -1,2 +0,0 @@
ELASTICSEARCH_REFRESH_INTERVAL_SECONDS = 4

View File

@ -1,6 +1,5 @@
import requests
from .constants import *
from time import sleep
from tests.consistency_utils import wait_for_writes_to_sync
class CustomSession(requests.Session):
@ -12,7 +11,7 @@ class CustomSession(requests.Session):
response = super(CustomSession, self).post(*args, **kwargs)
if "/logIn" not in args[0]:
print("sleeping.")
sleep(ELASTICSEARCH_REFRESH_INTERVAL_SECONDS)
wait_for_writes_to_sync()
return response
@ -20,7 +19,7 @@ def post(*args, **kwargs):
response = requests.post(*args, **kwargs)
if "/logIn" not in args[0]:
print("sleeping.")
sleep(ELASTICSEARCH_REFRESH_INTERVAL_SECONDS)
wait_for_writes_to_sync()
return response

View File

@ -3,8 +3,7 @@ import pytest
from time import sleep
from datahub.cli.cli_utils import guess_entity_type, post_entity, get_aspects_for_entity
from datahub.cli.ingest_cli import get_session_and_host, rollback
from tests.utils import ingest_file_via_rest
from requests_wrapper import ELASTICSEARCH_REFRESH_INTERVAL_SECONDS
from tests.utils import ingest_file_via_rest, wait_for_writes_to_sync
ingested_dataset_run_id = ""
ingested_editable_run_id = ""
@ -74,7 +73,7 @@ def test_rollback_editable():
session.post(rollback_url, data=json.dumps({"runId": ingested_dataset_run_id, "dryRun": False, "hardDelete": False}))
# Allow async MCP processor to handle ingestions & rollbacks
sleep(ELASTICSEARCH_REFRESH_INTERVAL_SECONDS)
wait_for_writes_to_sync()
# EditableDatasetProperties should still be part of the entity that was soft deleted.
assert "editableDatasetProperties" in get_aspects_for_entity(entity_urn=dataset_urn, aspects=["editableDatasetProperties"], typed=False)

View File

@ -14,7 +14,7 @@ from datahub.entrypoints import datahub
from datahub.metadata.schema_classes import DatasetProfileClass
from tests.aspect_generators.timeseries.dataset_profile_gen import \
gen_dataset_profiles
from tests.utils import get_strftime_from_timestamp_millis
from tests.utils import get_strftime_from_timestamp_millis, wait_for_writes_to_sync
import requests_wrapper as requests
logger = logging.getLogger(__name__)
@ -31,8 +31,7 @@ runner = CliRunner(mix_stderr=False)
def sync_elastic() -> None:
time.sleep(requests.ELASTICSEARCH_REFRESH_INTERVAL_SECONDS)
wait_for_writes_to_sync()
def datahub_put_profile(dataset_profile: DatasetProfileClass) -> None:
with tempfile.NamedTemporaryFile("w+t", suffix=".json") as aspect_file:

View File

@ -8,14 +8,14 @@ import datahub.emitter.mce_builder as builder
from datahub.emitter.serialization_helper import post_json_transform
from datahub.entrypoints import datahub
from datahub.metadata.schema_classes import DatasetProfileClass
from tests.utils import ingest_file_via_rest
from tests.utils import ingest_file_via_rest, wait_for_writes_to_sync
import requests_wrapper as requests
runner = CliRunner(mix_stderr=False)
def sync_elastic() -> None:
time.sleep(requests.ELASTICSEARCH_REFRESH_INTERVAL_SECONDS)
wait_for_writes_to_sync()
def datahub_rollback(run_id: str) -> None:

View File

@ -10,12 +10,13 @@ from datahub.entrypoints import datahub
from datahub.ingestion.graph.client import DataHubGraph, get_default_graph
import time
import requests_wrapper as requests
from tests.utils import wait_for_writes_to_sync
runner = CliRunner(mix_stderr=False)
def sync_elastic() -> None:
time.sleep(requests.ELASTICSEARCH_REFRESH_INTERVAL_SECONDS)
wait_for_writes_to_sync()
def datahub_upsert_group(group: CorpGroup) -> None:

View File

@ -0,0 +1,29 @@
import time
import subprocess
_ELASTIC_BUFFER_WRITES_TIME_IN_SEC: int = 1
def wait_for_writes_to_sync(max_timeout_in_sec: int = 120) -> None:
    """Block until all pending metadata writes have been consumed.

    Polls the lag of the ``generic-mae-consumer-job-client`` Kafka consumer
    group once per second by shelling out to ``kafka-consumer-groups`` inside
    the ``broker`` container. Returns as soon as the maximum per-partition lag
    reaches 0 (plus a short extra sleep to let Elasticsearch's write buffer
    clear). If lag has not reached 0 within ``max_timeout_in_sec`` seconds,
    logs a warning and returns without raising.

    :param max_timeout_in_sec: maximum number of seconds to poll before
        giving up.
    """
    # Local import keeps this helper self-contained; the original referenced
    # an undefined `logger`, which raised NameError on the timeout path.
    import logging

    logger = logging.getLogger(__name__)
    start_time = time.time()
    lag_zero = False
    # Initialize so the timeout warning below is safe even if the loop body
    # never runs (e.g. max_timeout_in_sec == 0).
    lag_values: list = []
    while not lag_zero and (time.time() - start_time) < max_timeout_in_sec:
        time.sleep(1)  # micro-sleep between polls
        completed_process = subprocess.run(
            "docker exec broker /bin/kafka-consumer-groups --bootstrap-server broker:29092 --group generic-mae-consumer-job-client --describe | grep -v LAG | awk '{print $6}'",
            capture_output=True,
            shell=True,
            text=True,
        )
        result = str(completed_process.stdout)
        lines = result.splitlines()
        lag_values = [int(line) for line in lines if line != ""]
        # Empty output (broker not ready, command failed) means "not synced
        # yet": keep polling rather than crashing on max() of an empty list.
        if lag_values and max(lag_values) == 0:
            lag_zero = True
    if not lag_zero:
        logger.warning(
            f"Exiting early from waiting for elastic to catch up due to a timeout. Current lag is {lag_values}"
        )
    else:
        # We want to sleep for an additional period of time for the Elastic
        # writes buffer to clear.
        time.sleep(_ELASTIC_BUFFER_WRITES_TIME_IN_SEC)

View File

@ -8,9 +8,9 @@ from tests.utils import (
ingest_file_via_rest,
wait_for_healthcheck_util,
delete_urns_from_file,
wait_for_writes_to_sync,
get_datahub_graph,
)
from requests_wrapper import ELASTICSEARCH_REFRESH_INTERVAL_SECONDS
# Disable telemetry
os.environ["DATAHUB_TELEMETRY_ENABLED"] = "false"
@ -68,7 +68,7 @@ def test_setup():
),
)
sleep(ELASTICSEARCH_REFRESH_INTERVAL_SECONDS)
wait_for_writes_to_sync()
assert "browsePaths" not in get_aspects_for_entity(
entity_urn=dataset_urn, aspects=["browsePaths"], typed=False
@ -101,8 +101,8 @@ def test_delete_reference(test_setup, depends=["test_healthchecks"]):
# Delete references to the tag
graph.delete_references_to_urn(tag_urn, dry_run=False)
sleep(ELASTICSEARCH_REFRESH_INTERVAL_SECONDS)
wait_for_writes_to_sync()
# Validate that references no longer exist
references_count, related_aspects = graph.delete_references_to_urn(
tag_urn, dry_run=True

View File

@ -3,8 +3,7 @@ from time import sleep
from datahub.cli import timeline_cli
from datahub.cli.cli_utils import guess_entity_type, post_entity
from tests.utils import ingest_file_via_rest, get_datahub_graph
from requests_wrapper import ELASTICSEARCH_REFRESH_INTERVAL_SECONDS
from tests.utils import ingest_file_via_rest, wait_for_writes_to_sync, get_datahub_graph
def test_all():
@ -176,4 +175,4 @@ def put(urn: str, aspect: str, aspect_data: str) -> None:
entity_type=entity_type,
aspect_value=aspect_obj,
)
sleep(ELASTICSEARCH_REFRESH_INTERVAL_SECONDS)
wait_for_writes_to_sync()

View File

@ -2,12 +2,12 @@ import os
import pytest
import requests
from time import sleep
from requests_wrapper import ELASTICSEARCH_REFRESH_INTERVAL_SECONDS
from tests.utils import (
get_frontend_url,
wait_for_healthcheck_util,
get_admin_credentials,
wait_for_writes_to_sync,
)
@ -77,7 +77,7 @@ def custom_user_setup():
assert sign_up_response
assert "error" not in sign_up_response
# Sleep for eventual consistency
sleep(ELASTICSEARCH_REFRESH_INTERVAL_SECONDS)
wait_for_writes_to_sync()
# signUp will override the session cookie to the new user to be signed up.
admin_session.cookies.clear()
@ -97,7 +97,7 @@ def custom_user_setup():
assert res_data["data"]
assert res_data["data"]["removeUser"] == True
# Sleep for eventual consistency
sleep(ELASTICSEARCH_REFRESH_INTERVAL_SECONDS)
wait_for_writes_to_sync()
# Make sure the created user is no longer there.
res_data = listUsers(admin_session)
@ -126,7 +126,7 @@ def access_token_setup():
revokeAccessToken(admin_session, metadata["id"])
# Sleep for eventual consistency
sleep(ELASTICSEARCH_REFRESH_INTERVAL_SECONDS)
wait_for_writes_to_sync()
@pytest.mark.dependency(depends=["test_healthchecks"])
@ -152,7 +152,7 @@ def test_admin_can_create_list_and_revoke_tokens(wait_for_healthchecks):
)
admin_tokenId = res_data["data"]["createAccessToken"]["metadata"]["id"]
# Sleep for eventual consistency
sleep(ELASTICSEARCH_REFRESH_INTERVAL_SECONDS)
wait_for_writes_to_sync()
# Using a super account, list the previously created token.
res_data = listAccessTokens(admin_session)
@ -176,7 +176,7 @@ def test_admin_can_create_list_and_revoke_tokens(wait_for_healthchecks):
assert res_data["data"]["revokeAccessToken"]
assert res_data["data"]["revokeAccessToken"] == True
# Sleep for eventual consistency
sleep(ELASTICSEARCH_REFRESH_INTERVAL_SECONDS)
wait_for_writes_to_sync()
# Using a super account, there should be no tokens
res_data = listAccessTokens(admin_session)
@ -209,7 +209,7 @@ def test_admin_can_create_and_revoke_tokens_for_other_user(wait_for_healthchecks
)
user_tokenId = res_data["data"]["createAccessToken"]["metadata"]["id"]
# Sleep for eventual consistency
sleep(ELASTICSEARCH_REFRESH_INTERVAL_SECONDS)
wait_for_writes_to_sync()
# Using a super account, list the previously created tokens.
res_data = listAccessTokens(admin_session)
@ -233,7 +233,7 @@ def test_admin_can_create_and_revoke_tokens_for_other_user(wait_for_healthchecks
assert res_data["data"]["revokeAccessToken"]
assert res_data["data"]["revokeAccessToken"] == True
# Sleep for eventual consistency
sleep(ELASTICSEARCH_REFRESH_INTERVAL_SECONDS)
wait_for_writes_to_sync()
# Using a super account, there should be no tokens
res_data = listAccessTokens(admin_session)
@ -259,7 +259,7 @@ def test_non_admin_can_create_list_revoke_tokens(wait_for_healthchecks):
)
user_tokenId = res_data["data"]["createAccessToken"]["metadata"]["id"]
# Sleep for eventual consistency
sleep(ELASTICSEARCH_REFRESH_INTERVAL_SECONDS)
wait_for_writes_to_sync()
# User should be able to list his own token
res_data = listAccessTokens(
@ -286,7 +286,7 @@ def test_non_admin_can_create_list_revoke_tokens(wait_for_healthchecks):
assert res_data["data"]["revokeAccessToken"]
assert res_data["data"]["revokeAccessToken"] == True
# Sleep for eventual consistency
sleep(ELASTICSEARCH_REFRESH_INTERVAL_SECONDS)
wait_for_writes_to_sync()
# Using a normal account, check that all its tokens were removed.
res_data = listAccessTokens(
@ -326,7 +326,7 @@ def test_admin_can_manage_tokens_generated_by_other_user(wait_for_healthchecks):
)
user_tokenId = res_data["data"]["createAccessToken"]["metadata"]["id"]
# Sleep for eventual consistency
sleep(ELASTICSEARCH_REFRESH_INTERVAL_SECONDS)
wait_for_writes_to_sync()
# Admin should be able to list other tokens
user_session.cookies.clear()
@ -357,7 +357,7 @@ def test_admin_can_manage_tokens_generated_by_other_user(wait_for_healthchecks):
assert res_data["data"]["revokeAccessToken"]
assert res_data["data"]["revokeAccessToken"] == True
# Sleep for eventual consistency
sleep(ELASTICSEARCH_REFRESH_INTERVAL_SECONDS)
wait_for_writes_to_sync()
# Using a normal account, check that all its tokens were removed.
user_session.cookies.clear()

View File

@ -14,6 +14,7 @@ from datahub.cli import cli_utils
from datahub.cli.cli_utils import get_system_auth
from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig
from datahub.ingestion.run.pipeline import Pipeline
from tests.consistency_utils import wait_for_writes_to_sync
TIME: int = 1581407189000
logger = logging.getLogger(__name__)
@ -171,16 +172,7 @@ def delete_urns_from_file(filename: str, shared_data: bool = False) -> None:
d = json.load(f)
Parallel(n_jobs=10)(delayed(delete)(entry) for entry in d)
# Deletes require 60 seconds when run between tests operating on common data, otherwise standard sync wait
if shared_data:
wait_for_writes_to_sync()
# sleep(60)
else:
wait_for_writes_to_sync()
# sleep(requests.ELASTICSEARCH_REFRESH_INTERVAL_SECONDS)
wait_for_writes_to_sync()
# Fixed now value
NOW: datetime = datetime.now()
@ -242,27 +234,4 @@ def create_datahub_step_state_aspects(
json.dump(aspects_dict, f, indent=2)
def wait_for_writes_to_sync(max_timeout_in_sec: int = 120) -> None:
start_time = time.time()
# get offsets
lag_zero = False
while not lag_zero and (time.time() - start_time) < max_timeout_in_sec:
time.sleep(1) # micro-sleep
completed_process = subprocess.run(
"docker exec broker /bin/kafka-consumer-groups --bootstrap-server broker:29092 --group generic-mae-consumer-job-client --describe | grep -v LAG | awk '{print $6}'",
capture_output=True,
shell=True,
text=True,
)
result = str(completed_process.stdout)
lines = result.splitlines()
lag_values = [int(l) for l in lines if l != ""]
maximum_lag = max(lag_values)
if maximum_lag == 0:
lag_zero = True
if not lag_zero:
logger.warning(
f"Exiting early from waiting for elastic to catch up due to a timeout. Current lag is {lag_values}"
)