mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-08 17:53:11 +00:00
128 lines
4.3 KiB
Python
128 lines
4.3 KiB
Python
import json
|
|
import sys
|
|
import tempfile
|
|
from typing import Any, Dict, Iterable, List
|
|
|
|
import yaml
|
|
|
|
from datahub.api.entities.corpgroup.corpgroup import CorpGroup
|
|
from datahub.ingestion.graph.client import DataHubGraph
|
|
from tests.utils import run_datahub_cmd, wait_for_writes_to_sync
|
|
|
|
|
|
def sync_elastic() -> None:
|
|
wait_for_writes_to_sync()
|
|
|
|
|
|
def datahub_upsert_group(auth_session: Any, group: CorpGroup) -> None:
|
|
with tempfile.NamedTemporaryFile("w+t", suffix=".yaml") as group_file:
|
|
yaml.dump(group.dict(), group_file)
|
|
group_file.flush()
|
|
upsert_args: List[str] = [
|
|
"group",
|
|
"upsert",
|
|
"-f",
|
|
group_file.name,
|
|
]
|
|
group_create_result = run_datahub_cmd(
|
|
upsert_args,
|
|
env={
|
|
"DATAHUB_GMS_URL": auth_session.gms_url(),
|
|
"DATAHUB_GMS_TOKEN": auth_session.gms_token(),
|
|
},
|
|
)
|
|
assert group_create_result.exit_code == 0
|
|
|
|
|
|
def gen_datahub_groups(num_groups: int) -> Iterable[CorpGroup]:
|
|
for i in range(0, num_groups):
|
|
group = CorpGroup(
|
|
id=f"group_{i}",
|
|
display_name=f"Group {i}",
|
|
email=f"group_{i}@datahubproject.io",
|
|
description=f"The Group {i}",
|
|
picture_link=f"https://images.google.com/group{i}.jpg",
|
|
slack=f"@group{i}",
|
|
owners=["user1"],
|
|
members=["user2"],
|
|
)
|
|
yield group
|
|
|
|
|
|
def datahub_get_group(auth_session, group_urn: str):
|
|
get_args: List[str] = ["get", "--urn", group_urn]
|
|
get_result = run_datahub_cmd(
|
|
get_args,
|
|
env={
|
|
"DATAHUB_GMS_URL": auth_session.gms_url(),
|
|
"DATAHUB_GMS_TOKEN": auth_session.gms_token(),
|
|
},
|
|
)
|
|
assert get_result.exit_code == 0
|
|
try:
|
|
get_result_output_obj: Dict = json.loads(get_result.stdout)
|
|
return get_result_output_obj
|
|
except json.JSONDecodeError as e:
|
|
print("Failed to decode: " + get_result.stdout, file=sys.stderr)
|
|
raise e
|
|
|
|
|
|
def get_group_ownership(graph_client: DataHubGraph, user_urn: str) -> List[str]:
|
|
entities = graph_client.get_related_entities(
|
|
entity_urn=user_urn,
|
|
relationship_types=["OwnedBy"],
|
|
direction=DataHubGraph.RelationshipDirection.INCOMING,
|
|
)
|
|
return [entity.urn for entity in entities]
|
|
|
|
|
|
def get_group_membership(graph_client: DataHubGraph, user_urn: str) -> List[str]:
|
|
entities = graph_client.get_related_entities(
|
|
entity_urn=user_urn,
|
|
relationship_types=["IsMemberOfGroup"],
|
|
direction=DataHubGraph.RelationshipDirection.OUTGOING,
|
|
)
|
|
return [entity.urn for entity in entities]
|
|
|
|
|
|
def test_group_upsert(auth_session: Any, graph_client: DataHubGraph) -> None:
|
|
num_groups: int = 10
|
|
for i, datahub_group in enumerate(gen_datahub_groups(num_groups)):
|
|
datahub_upsert_group(auth_session, datahub_group)
|
|
group_dict = datahub_get_group(auth_session, f"urn:li:corpGroup:group_{i}")
|
|
assert group_dict == {
|
|
"corpGroupEditableInfo": {
|
|
"description": f"The Group {i}",
|
|
"email": f"group_{i}@datahubproject.io",
|
|
"pictureLink": f"https://images.google.com/group{i}.jpg",
|
|
"slack": f"@group{i}",
|
|
},
|
|
"corpGroupInfo": {
|
|
"admins": ["urn:li:corpuser:user1"],
|
|
"description": f"The Group {i}",
|
|
"displayName": f"Group {i}",
|
|
"email": f"group_{i}@datahubproject.io",
|
|
"groups": [],
|
|
"members": ["urn:li:corpuser:user2"],
|
|
"slack": f"@group{i}",
|
|
},
|
|
"corpGroupKey": {"name": f"group_{i}"},
|
|
"ownership": {
|
|
"lastModified": {"actor": "urn:li:corpuser:unknown", "time": 0},
|
|
"owners": [
|
|
{"owner": "urn:li:corpuser:user1", "type": "TECHNICAL_OWNER"}
|
|
],
|
|
"ownerTypes": {},
|
|
},
|
|
"status": {"removed": False},
|
|
}
|
|
|
|
sync_elastic()
|
|
groups_owned = get_group_ownership(graph_client, "urn:li:corpuser:user1")
|
|
groups_partof = get_group_membership(graph_client, "urn:li:corpuser:user2")
|
|
|
|
all_groups = sorted([f"urn:li:corpGroup:group_{i}" for i in range(0, num_groups)])
|
|
|
|
assert sorted(groups_owned) == all_groups
|
|
assert sorted(groups_partof) == all_groups
|