# File metadata: 2025-05-14 23:26:18 -07:00 | 128 lines | 4.3 KiB | Python

import json
import sys
import tempfile
from typing import Any, Dict, Iterable, List
import yaml
from datahub.api.entities.corpgroup.corpgroup import CorpGroup
from datahub.ingestion.graph.client import DataHubGraph
from tests.utils import run_datahub_cmd, wait_for_writes_to_sync
def sync_elastic() -> None:
    """Wait for previously issued DataHub writes to sync before reading them back.

    Delegates to ``wait_for_writes_to_sync`` from ``tests.utils``. Any value that
    helper returns is discarded, so this function always returns ``None``.
    """
    # NOTE(review): the name suggests this waits on search (Elasticsearch)
    # indexing; the actual mechanism lives in tests.utils — confirm there.
    wait_for_writes_to_sync()
def datahub_upsert_group(auth_session: Any, group: CorpGroup) -> None:
    """Create or update ``group`` through the ``datahub group upsert`` CLI command.

    The group is serialized to a temporary YAML file, and that file's path is
    passed to the CLI with ``-f``. GMS connection details are taken from
    ``auth_session``.

    Args:
        auth_session: Session object exposing ``gms_url()`` and ``gms_token()``.
        group: The group definition to upsert.

    Raises:
        AssertionError: If the CLI exits with a non-zero code. The message
            includes the exit code and the captured CLI output for debugging.
    """
    with tempfile.NamedTemporaryFile("w+t", suffix=".yaml") as group_file:
        yaml.dump(group.dict(), group_file)
        # Flush so the full YAML is on disk before the CLI opens the file by name.
        group_file.flush()
        upsert_args: List[str] = [
            "group",
            "upsert",
            "-f",
            group_file.name,
        ]
        group_create_result = run_datahub_cmd(
            upsert_args,
            env={
                "DATAHUB_GMS_URL": auth_session.gms_url(),
                "DATAHUB_GMS_TOKEN": auth_session.gms_token(),
            },
        )
        # Surface the CLI output on failure; a bare assert would hide the cause.
        assert group_create_result.exit_code == 0, (
            f"'datahub group upsert' failed with exit code "
            f"{group_create_result.exit_code}:\n{group_create_result.output}"
        )
def gen_datahub_groups(num_groups: int) -> Iterable[CorpGroup]:
    """Lazily yield ``num_groups`` synthetic groups named ``group_0``, ``group_1``, ...

    Every generated group lists ``user1`` as its owner and ``user2`` as its only
    member. All other fields (display name, email, description, picture link,
    slack handle) are derived from the group's index.
    """

    def _build(index: int) -> CorpGroup:
        # Build one group; fresh owner/member lists are created for each call.
        slug = f"group_{index}"
        return CorpGroup(
            id=slug,
            display_name=f"Group {index}",
            email=f"{slug}@datahubproject.io",
            description=f"The Group {index}",
            picture_link=f"https://images.google.com/group{index}.jpg",
            slack=f"@group{index}",
            owners=["user1"],
            members=["user2"],
        )

    yield from (_build(index) for index in range(num_groups))
def datahub_get_group(auth_session: Any, group_urn: str) -> Dict[str, Any]:
    """Fetch ``group_urn`` with ``datahub get --urn`` and return the decoded JSON.

    Args:
        auth_session: Session object exposing ``gms_url()`` and ``gms_token()``.
        group_urn: URN of the group to fetch, e.g. ``urn:li:corpGroup:group_0``.

    Returns:
        The CLI's stdout parsed as JSON.

    Raises:
        AssertionError: If the CLI exits with a non-zero code. The message
            includes the exit code and the captured CLI output.
        json.JSONDecodeError: If stdout is not valid JSON. The raw stdout is
            printed to stderr before the error is re-raised.
    """
    get_args: List[str] = ["get", "--urn", group_urn]
    get_result = run_datahub_cmd(
        get_args,
        env={
            "DATAHUB_GMS_URL": auth_session.gms_url(),
            "DATAHUB_GMS_TOKEN": auth_session.gms_token(),
        },
    )
    assert get_result.exit_code == 0, (
        f"'datahub get --urn {group_urn}' failed with exit code "
        f"{get_result.exit_code}:\n{get_result.output}"
    )
    # Keep the try body to the single call that can raise JSONDecodeError.
    try:
        get_result_output_obj: Dict[str, Any] = json.loads(get_result.stdout)
    except json.JSONDecodeError:
        print("Failed to decode: " + get_result.stdout, file=sys.stderr)
        # Bare raise re-raises the active exception with its original traceback.
        raise
    return get_result_output_obj
def get_group_ownership(graph_client: DataHubGraph, user_urn: str) -> List[str]:
    """Return the URNs of entities that have an ``OwnedBy`` edge to ``user_urn``.

    The ``OwnedBy`` relationship is traversed in the INCOMING direction relative
    to ``user_urn``. In other words, the result lists the entities that point
    at the user, not the ones the user points at.
    """
    return [
        edge.urn
        for edge in graph_client.get_related_entities(
            entity_urn=user_urn,
            relationship_types=["OwnedBy"],
            direction=DataHubGraph.RelationshipDirection.INCOMING,
        )
    ]
def get_group_membership(graph_client: DataHubGraph, user_urn: str) -> List[str]:
    """Return the URNs reached from ``user_urn`` over ``IsMemberOfGroup`` edges.

    The relationship is traversed in the OUTGOING direction from ``user_urn``,
    so the result lists the entities the user has an ``IsMemberOfGroup`` edge to.
    """
    memberships = graph_client.get_related_entities(
        entity_urn=user_urn,
        relationship_types=["IsMemberOfGroup"],
        direction=DataHubGraph.RelationshipDirection.OUTGOING,
    )
    member_of: List[str] = [related.urn for related in memberships]
    return member_of
def _expected_group_aspects(index: int) -> Dict[str, Any]:
    """Return the aspect dict ``datahub get`` should report for ``group_{index}``."""
    return {
        "corpGroupEditableInfo": {
            "description": f"The Group {index}",
            "email": f"group_{index}@datahubproject.io",
            "pictureLink": f"https://images.google.com/group{index}.jpg",
            "slack": f"@group{index}",
        },
        "corpGroupInfo": {
            "admins": ["urn:li:corpuser:user1"],
            "description": f"The Group {index}",
            "displayName": f"Group {index}",
            "email": f"group_{index}@datahubproject.io",
            "groups": [],
            "members": ["urn:li:corpuser:user2"],
            "slack": f"@group{index}",
        },
        "corpGroupKey": {"name": f"group_{index}"},
        "ownership": {
            "lastModified": {"actor": "urn:li:corpuser:unknown", "time": 0},
            "owners": [
                {"owner": "urn:li:corpuser:user1", "type": "TECHNICAL_OWNER"}
            ],
            "ownerTypes": {},
        },
        "status": {"removed": False},
    }


def test_group_upsert(auth_session: Any, graph_client: DataHubGraph) -> None:
    """Upsert synthetic groups via the CLI and verify the stored state.

    The test checks two things:
    - each group read back with ``datahub get`` matches the expected aspects;
    - after syncing, ``user1`` owns every group and ``user2`` is a member of
      every group according to the graph's relationships.
    """
    num_groups: int = 10

    for index, group in enumerate(gen_datahub_groups(num_groups)):
        datahub_upsert_group(auth_session, group)
        fetched = datahub_get_group(auth_session, f"urn:li:corpGroup:group_{index}")
        assert fetched == _expected_group_aspects(index)

    # Relationship queries run only after pending writes have synced.
    sync_elastic()
    owned_urns = get_group_ownership(graph_client, "urn:li:corpuser:user1")
    member_urns = get_group_membership(graph_client, "urn:li:corpuser:user2")

    expected_urns = sorted(
        f"urn:li:corpGroup:group_{index}" for index in range(num_groups)
    )
    assert sorted(owned_urns) == expected_urns
    assert sorted(member_urns) == expected_urns