import json import sys import tempfile import time from typing import Any, Dict, Iterable, List import yaml from click.testing import CliRunner, Result from datahub.api.entities.corpuser.corpuser import CorpUser from datahub.entrypoints import datahub from tests.consistency_utils import wait_for_writes_to_sync runner = CliRunner(mix_stderr=False) def datahub_upsert_user(auth_session, user: CorpUser) -> None: with tempfile.NamedTemporaryFile("w+t", suffix=".yaml") as user_file: yaml.dump(user.dict(), user_file) user_file.flush() upsert_args: List[str] = [ "user", "upsert", "-f", user_file.name, ] user_create_result = runner.invoke( datahub, upsert_args, env={ "DATAHUB_GMS_URL": auth_session.gms_url(), "DATAHUB_GMS_TOKEN": auth_session.gms_token(), }, ) assert user_create_result.exit_code == 0 def gen_datahub_users(num_users: int) -> Iterable[CorpUser]: for i in range(0, num_users): user = CorpUser( id=f"user_{i}", display_name=f"User {i}", email=f"user_{i}@datahubproject.io", title=f"User {i}", first_name="User", last_name=f"{i}", groups=[f"urn:li:corpGroup:group_{i}"], description=f"The User {i}", slack=f"@user{i}", picture_link=f"https://images.google.com/user{i}.jpg", phone=f"1-800-USER-{i}", ) yield user def datahub_get_user(auth_session: Any, user_urn: str): get_args: List[str] = ["get", "--urn", user_urn] get_result: Result = runner.invoke( datahub, get_args, env={ "DATAHUB_GMS_URL": auth_session.gms_url(), "DATAHUB_GMS_TOKEN": auth_session.gms_token(), }, ) assert get_result.exit_code == 0 try: get_result_output_obj: Dict = json.loads(get_result.stdout) return get_result_output_obj except json.JSONDecodeError as e: print("Failed to decode: " + get_result.stdout, file=sys.stderr) raise e def test_user_upsert(auth_session: Any) -> None: num_user_profiles: int = 10 max_retries: int = 3 backoff_seconds: float = 1.0 # Create users first for _i, datahub_user in enumerate(gen_datahub_users(num_user_profiles)): datahub_upsert_user(auth_session, datahub_user) # Initial wait for writes wait_for_writes_to_sync() # Retry logic for getting all users and assertion for attempt in range(max_retries): try: actual = [] expected = [] # Collect all user data for _i in range(num_user_profiles): user_dict = datahub_get_user(auth_session, f"urn:li:corpuser:user_{_i}") actual.append(user_dict) expected.append( { "corpUserEditableInfo": { "aboutMe": f"The User {_i}", "displayName": f"User {_i}", "email": f"user_{_i}@datahubproject.io", "phone": f"1-800-USER-{_i}", "pictureLink": f"https://images.google.com/user{_i}.jpg", "skills": [], "slack": f"@user{_i}", "teams": [], }, "corpUserKey": {"username": f"user_{_i}"}, "groupMembership": {"groups": [f"urn:li:corpGroup:group_{_i}"]}, "status": {"removed": False}, } ) # Try assertion assert actual == expected # If assertion passes, we're done return except AssertionError: if attempt < max_retries - 1: wait_time = backoff_seconds * (2**attempt) print( f"Assertion failed on attempt {attempt + 1}, waiting {wait_time} seconds before retrying..." ) # Wait before retrying time.sleep(wait_time) # Additional sync wait wait_for_writes_to_sync() else: # On last attempt, let the assertion error propagate raise