mirror of
https://github.com/datahub-project/datahub.git
synced 2025-08-18 14:16:48 +00:00
fix(ci): cleanup sleeps to instead use retries (#5597)
This commit is contained in:
parent
0481075705
commit
526d0497a7
@ -76,6 +76,32 @@ def _ensure_user_present(urn: str):
|
|||||||
return data
|
return data
|
||||||
|
|
||||||
|
|
||||||
|
@tenacity.retry(
|
||||||
|
stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
|
||||||
|
)
|
||||||
|
def _ensure_user_relationship_present(frontend_session, urn, relationships):
|
||||||
|
json = {
|
||||||
|
"query": """query corpUser($urn: String!) {\n
|
||||||
|
corpUser(urn: $urn) {\n
|
||||||
|
urn\n
|
||||||
|
relationships(input: { types: ["IsMemberOfNativeGroup"], direction: OUTGOING, start: 0, count: 1 }) {\n
|
||||||
|
total\n
|
||||||
|
}\n
|
||||||
|
}\n
|
||||||
|
}""",
|
||||||
|
"variables": {"urn": urn},
|
||||||
|
}
|
||||||
|
response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json)
|
||||||
|
response.raise_for_status()
|
||||||
|
res_data = response.json()
|
||||||
|
|
||||||
|
assert res_data
|
||||||
|
assert res_data["data"]
|
||||||
|
assert res_data["data"]["corpUser"]
|
||||||
|
assert res_data["data"]["corpUser"]["relationships"]
|
||||||
|
assert res_data["data"]["corpUser"]["relationships"]["total"] == 1
|
||||||
|
|
||||||
|
|
||||||
@tenacity.retry(
|
@tenacity.retry(
|
||||||
stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
|
stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
|
||||||
)
|
)
|
||||||
@ -816,30 +842,8 @@ def test_add_remove_members_from_group(frontend_session):
|
|||||||
response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json)
|
response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json)
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
|
|
||||||
# Sleep for edge store to be updated. Not ideal!
|
|
||||||
time.sleep(3)
|
|
||||||
|
|
||||||
# Verify the member has been added
|
# Verify the member has been added
|
||||||
json = {
|
_ensure_user_relationship_present(frontend_session, "urn:li:corpuser:jdoe", 1)
|
||||||
"query": """query corpUser($urn: String!) {\n
|
|
||||||
corpUser(urn: $urn) {\n
|
|
||||||
urn\n
|
|
||||||
relationships(input: { types: ["IsMemberOfNativeGroup"], direction: OUTGOING, start: 0, count: 1 }) {\n
|
|
||||||
total\n
|
|
||||||
}\n
|
|
||||||
}\n
|
|
||||||
}""",
|
|
||||||
"variables": {"urn": "urn:li:corpuser:jdoe"},
|
|
||||||
}
|
|
||||||
response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json)
|
|
||||||
response.raise_for_status()
|
|
||||||
res_data = response.json()
|
|
||||||
|
|
||||||
assert res_data
|
|
||||||
assert res_data["data"]
|
|
||||||
assert res_data["data"]["corpUser"]
|
|
||||||
assert res_data["data"]["corpUser"]["relationships"]
|
|
||||||
assert res_data["data"]["corpUser"]["relationships"]["total"] == 1
|
|
||||||
|
|
||||||
# Now remove jdoe from the group
|
# Now remove jdoe from the group
|
||||||
json = {
|
json = {
|
||||||
@ -856,29 +860,8 @@ def test_add_remove_members_from_group(frontend_session):
|
|||||||
response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json)
|
response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json)
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
|
|
||||||
# Sleep for edge store to be updated. Not ideal!
|
|
||||||
time.sleep(3)
|
|
||||||
|
|
||||||
# Verify the member has been removed
|
# Verify the member has been removed
|
||||||
json = {
|
_ensure_user_relationship_present(frontend_session, "urn:li:corpuser:jdoe", 0)
|
||||||
"query": """query corpUser($urn: String!) {\n
|
|
||||||
corpUser(urn: $urn) {\n
|
|
||||||
urn\n
|
|
||||||
relationships(input: { types: ["IsMemberOfNativeGroup"], direction: OUTGOING, start: 0, count: 1 }) {\n
|
|
||||||
total\n
|
|
||||||
}\n
|
|
||||||
}\n
|
|
||||||
}""",
|
|
||||||
"variables": {"urn": "urn:li:corpuser:jdoe"},
|
|
||||||
}
|
|
||||||
response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json)
|
|
||||||
response.raise_for_status()
|
|
||||||
res_data = response.json()
|
|
||||||
|
|
||||||
assert res_data
|
|
||||||
assert res_data["data"]
|
|
||||||
assert res_data["data"]["corpUser"]
|
|
||||||
assert res_data["data"]["corpUser"]["relationships"]["total"] == 0
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
|
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
|
||||||
|
@ -1,9 +1,15 @@
|
|||||||
import time
|
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
import requests
|
import requests
|
||||||
|
import tenacity
|
||||||
|
|
||||||
from tests.utils import get_frontend_url, ingest_file_via_rest, wait_for_healthcheck_util
|
from tests.utils import (
|
||||||
|
get_frontend_url,
|
||||||
|
ingest_file_via_rest,
|
||||||
|
wait_for_healthcheck_util,
|
||||||
|
get_sleep_info,
|
||||||
|
)
|
||||||
|
|
||||||
|
sleep_sec, sleep_times = get_sleep_info()
|
||||||
|
|
||||||
bootstrap_small = "test_resources/bootstrap_single.json"
|
bootstrap_small = "test_resources/bootstrap_single.json"
|
||||||
bootstrap_small_2 = "test_resources/bootstrap_single2.json"
|
bootstrap_small_2 = "test_resources/bootstrap_single2.json"
|
||||||
@ -29,9 +35,10 @@ def frontend_session(wait_for_healthchecks):
|
|||||||
yield session
|
yield session
|
||||||
|
|
||||||
|
|
||||||
def test_ingestion_via_rest_rapid(frontend_session, wait_for_healthchecks):
|
@tenacity.retry(
|
||||||
ingest_file_via_rest(bootstrap_small)
|
stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
|
||||||
ingest_file_via_rest(bootstrap_small_2)
|
)
|
||||||
|
def _ensure_dataset_present_correctly(frontend_session):
|
||||||
urn = "urn:li:dataset:(urn:li:dataPlatform:testPlatform,testDataset,PROD)"
|
urn = "urn:li:dataset:(urn:li:dataPlatform:testPlatform,testDataset,PROD)"
|
||||||
json = {
|
json = {
|
||||||
"query": """query getDataset($urn: String!) {\n
|
"query": """query getDataset($urn: String!) {\n
|
||||||
@ -66,8 +73,6 @@ def test_ingestion_via_rest_rapid(frontend_session, wait_for_healthchecks):
|
|||||||
}""",
|
}""",
|
||||||
"variables": {"urn": urn},
|
"variables": {"urn": urn},
|
||||||
}
|
}
|
||||||
#
|
|
||||||
time.sleep(2)
|
|
||||||
response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json)
|
response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json)
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
res_data = response.json()
|
res_data = response.json()
|
||||||
@ -76,5 +81,9 @@ def test_ingestion_via_rest_rapid(frontend_session, wait_for_healthchecks):
|
|||||||
assert res_data["data"]
|
assert res_data["data"]
|
||||||
assert res_data["data"]["dataset"]
|
assert res_data["data"]["dataset"]
|
||||||
assert res_data["data"]["dataset"]["urn"] == urn
|
assert res_data["data"]["dataset"]["urn"] == urn
|
||||||
# commenting this out temporarily while we work on fixing this race condition for elasticsearch
|
|
||||||
# assert len(res_data["data"]["dataset"]["outgoing"]["relationships"]) == 1
|
|
||||||
|
def test_ingestion_via_rest_rapid(frontend_session, wait_for_healthchecks):
|
||||||
|
ingest_file_via_rest(bootstrap_small)
|
||||||
|
ingest_file_via_rest(bootstrap_small_2)
|
||||||
|
_ensure_dataset_present_correctly(frontend_session)
|
||||||
|
@ -1,9 +1,9 @@
|
|||||||
import json
|
import json
|
||||||
import time
|
|
||||||
import urllib
|
import urllib
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
import requests
|
import requests
|
||||||
|
import tenacity
|
||||||
from datahub.emitter.mce_builder import make_dataset_urn, make_schema_field_urn
|
from datahub.emitter.mce_builder import make_dataset_urn, make_schema_field_urn
|
||||||
from datahub.emitter.mcp import MetadataChangeProposalWrapper
|
from datahub.emitter.mcp import MetadataChangeProposalWrapper
|
||||||
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
|
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
|
||||||
@ -23,11 +23,12 @@ from datahub.metadata.schema_classes import (
|
|||||||
PartitionSpecClass,
|
PartitionSpecClass,
|
||||||
PartitionTypeClass,
|
PartitionTypeClass,
|
||||||
)
|
)
|
||||||
from tests.utils import delete_urns_from_file, get_gms_url, ingest_file_via_rest, wait_for_healthcheck_util
|
from tests.utils import delete_urns_from_file, get_gms_url, ingest_file_via_rest, wait_for_healthcheck_util, get_sleep_info
|
||||||
|
|
||||||
restli_default_headers = {
|
restli_default_headers = {
|
||||||
"X-RestLi-Protocol-Version": "2.0.0",
|
"X-RestLi-Protocol-Version": "2.0.0",
|
||||||
}
|
}
|
||||||
|
sleep_sec, sleep_times = get_sleep_info()
|
||||||
|
|
||||||
|
|
||||||
def create_test_data(test_file):
|
def create_test_data(test_file):
|
||||||
@ -246,13 +247,12 @@ def test_run_ingestion(generate_test_data):
|
|||||||
ingest_file_via_rest(generate_test_data)
|
ingest_file_via_rest(generate_test_data)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
|
@tenacity.retry(
|
||||||
def test_gms_get_latest_assertions_results_by_partition():
|
stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
|
||||||
|
)
|
||||||
|
def _gms_get_latest_assertions_results_by_partition():
|
||||||
urn = make_dataset_urn("postgres", "foo")
|
urn = make_dataset_urn("postgres", "foo")
|
||||||
|
|
||||||
# sleep for elasticsearch indices to be updated
|
|
||||||
time.sleep(5)
|
|
||||||
|
|
||||||
# Query
|
# Query
|
||||||
# Given the dataset
|
# Given the dataset
|
||||||
# show me latest assertion run events grouped-by date, partition, assertionId
|
# show me latest assertion run events grouped-by date, partition, assertionId
|
||||||
@ -313,6 +313,11 @@ def test_gms_get_latest_assertions_results_by_partition():
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
|
||||||
|
def test_gms_get_latest_assertions_results_by_partition():
|
||||||
|
_gms_get_latest_assertions_results_by_partition()
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
|
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
|
||||||
def test_gms_get_assertions_on_dataset():
|
def test_gms_get_assertions_on_dataset():
|
||||||
"""lists all assertion urns including those which may not have executed"""
|
"""lists all assertion urns including those which may not have executed"""
|
||||||
|
@ -1,5 +1,3 @@
|
|||||||
import time
|
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
import tenacity
|
import tenacity
|
||||||
from tests.utils import (
|
from tests.utils import (
|
||||||
@ -32,7 +30,6 @@ def test_healthchecks(wait_for_healthchecks):
|
|||||||
stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
|
stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
|
||||||
)
|
)
|
||||||
def _ensure_more_domains(frontend_session, list_domains_json, before_count):
|
def _ensure_more_domains(frontend_session, list_domains_json, before_count):
|
||||||
time.sleep(2)
|
|
||||||
|
|
||||||
# Get new count of Domains
|
# Get new count of Domains
|
||||||
response = frontend_session.post(
|
response = frontend_session.post(
|
||||||
|
@ -1,10 +1,12 @@
|
|||||||
import time
|
|
||||||
import pytest
|
import pytest
|
||||||
import requests
|
import requests
|
||||||
from tests.utils import get_frontend_url, wait_for_healthcheck_util, get_admin_credentials
|
import tenacity
|
||||||
|
from tests.utils import get_frontend_url, wait_for_healthcheck_util, get_admin_credentials, get_sleep_info
|
||||||
|
|
||||||
TEST_POLICY_NAME = "Updated Platform Policy"
|
TEST_POLICY_NAME = "Updated Platform Policy"
|
||||||
|
|
||||||
|
sleep_sec, sleep_times = get_sleep_info()
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope="session")
|
@pytest.fixture(scope="session")
|
||||||
def wait_for_healthchecks():
|
def wait_for_healthchecks():
|
||||||
@ -32,6 +34,7 @@ def frontend_session(wait_for_healthchecks):
|
|||||||
|
|
||||||
yield session
|
yield session
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.dependency(depends=["test_healthchecks"])
|
@pytest.mark.dependency(depends=["test_healthchecks"])
|
||||||
@pytest.fixture(scope='class', autouse=True)
|
@pytest.fixture(scope='class', autouse=True)
|
||||||
def test_frontend_list_policies(frontend_session):
|
def test_frontend_list_policies(frontend_session):
|
||||||
@ -69,6 +72,29 @@ def test_frontend_list_policies(frontend_session):
|
|||||||
)
|
)
|
||||||
assert len(list(result)) == 0
|
assert len(list(result)) == 0
|
||||||
|
|
||||||
|
|
||||||
|
@tenacity.retry(
|
||||||
|
stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
|
||||||
|
)
|
||||||
|
def _ensure_policy_present(frontend_session, new_urn):
|
||||||
|
res_data = listPolicies(frontend_session)
|
||||||
|
|
||||||
|
assert res_data
|
||||||
|
assert res_data["data"]
|
||||||
|
assert res_data["data"]["listPolicies"]
|
||||||
|
|
||||||
|
# Verify that the updated policy appears in the list and has the appropriate changes
|
||||||
|
result = list(filter(
|
||||||
|
lambda x: x["urn"] == new_urn, res_data["data"]["listPolicies"]["policies"]
|
||||||
|
))
|
||||||
|
print(result)
|
||||||
|
|
||||||
|
assert len(result) == 1
|
||||||
|
assert result[0]["description"] == "Updated Metadaata Policy"
|
||||||
|
assert result[0]["privileges"] == ["EDIT_ENTITY_TAGS", "EDIT_ENTITY_GLOSSARY_TERMS"]
|
||||||
|
assert result[0]["actors"]["allUsers"]
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.dependency(depends=["test_healthchecks"])
|
@pytest.mark.dependency(depends=["test_healthchecks"])
|
||||||
def test_frontend_policy_operations(frontend_session):
|
def test_frontend_policy_operations(frontend_session):
|
||||||
|
|
||||||
@ -103,9 +129,6 @@ def test_frontend_policy_operations(frontend_session):
|
|||||||
|
|
||||||
new_urn = res_data["data"]["createPolicy"]
|
new_urn = res_data["data"]["createPolicy"]
|
||||||
|
|
||||||
# Sleep for eventual consistency
|
|
||||||
time.sleep(3)
|
|
||||||
|
|
||||||
update_json = {
|
update_json = {
|
||||||
"query": """mutation updatePolicy($urn: String!, $input: PolicyUpdateInput!) {\n
|
"query": """mutation updatePolicy($urn: String!, $input: PolicyUpdateInput!) {\n
|
||||||
updatePolicy(urn: $urn, input: $input) }""",
|
updatePolicy(urn: $urn, input: $input) }""",
|
||||||
@ -136,25 +159,7 @@ def test_frontend_policy_operations(frontend_session):
|
|||||||
assert res_data["data"]["updatePolicy"]
|
assert res_data["data"]["updatePolicy"]
|
||||||
assert res_data["data"]["updatePolicy"] == new_urn
|
assert res_data["data"]["updatePolicy"] == new_urn
|
||||||
|
|
||||||
# Sleep for eventual consistency
|
_ensure_policy_present(frontend_session, new_urn)
|
||||||
time.sleep(3)
|
|
||||||
|
|
||||||
res_data = listPolicies(frontend_session)
|
|
||||||
|
|
||||||
assert res_data
|
|
||||||
assert res_data["data"]
|
|
||||||
assert res_data["data"]["listPolicies"]
|
|
||||||
|
|
||||||
# Verify that the updated policy appears in the list and has the appropriate changes
|
|
||||||
result = list(filter(
|
|
||||||
lambda x: x["urn"] == new_urn, res_data["data"]["listPolicies"]["policies"]
|
|
||||||
))
|
|
||||||
print(result)
|
|
||||||
|
|
||||||
assert len(result) == 1
|
|
||||||
assert result[0]["description"] == "Updated Metadaata Policy"
|
|
||||||
assert result[0]["privileges"] == ["EDIT_ENTITY_TAGS", "EDIT_ENTITY_GLOSSARY_TERMS"]
|
|
||||||
assert result[0]["actors"]["allUsers"] == True
|
|
||||||
|
|
||||||
# Now test that the policy can be deleted
|
# Now test that the policy can be deleted
|
||||||
json = {
|
json = {
|
||||||
@ -180,6 +185,7 @@ def test_frontend_policy_operations(frontend_session):
|
|||||||
)
|
)
|
||||||
assert len(list(result)) == 0
|
assert len(list(result)) == 0
|
||||||
|
|
||||||
|
|
||||||
def listPolicies(session):
|
def listPolicies(session):
|
||||||
json = {
|
json = {
|
||||||
"query": """query listPolicies($input: ListPoliciesInput!) {\n
|
"query": """query listPolicies($input: ListPoliciesInput!) {\n
|
||||||
|
Loading…
x
Reference in New Issue
Block a user