mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-07-12 19:48:26 +00:00
735 lines
24 KiB
Python
735 lines
24 KiB
Python
# Copyright 2023 Schlameel
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""
|
|
OpenMetadata high-level API Policy test
|
|
"""
|
|
import uuid
|
|
from copy import deepcopy
|
|
from typing import List
|
|
from unittest import TestCase
|
|
|
|
from metadata.generated.schema.api.policies.createPolicy import CreatePolicyRequest
|
|
from metadata.generated.schema.api.teams.createRole import CreateRoleRequest
|
|
from metadata.generated.schema.api.teams.createTeam import CreateTeamRequest
|
|
from metadata.generated.schema.api.teams.createUser import CreateUserRequest
|
|
from metadata.generated.schema.entity.policies.accessControl.resourceDescriptor import (
|
|
Operation,
|
|
)
|
|
from metadata.generated.schema.entity.policies.accessControl.rule import Effect, Rule
|
|
from metadata.generated.schema.entity.policies.policy import Policy, Rules
|
|
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
|
|
OpenMetadataConnection,
|
|
)
|
|
from metadata.generated.schema.entity.teams.role import Role
|
|
from metadata.generated.schema.entity.teams.team import Team
|
|
from metadata.generated.schema.entity.teams.user import User
|
|
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
|
|
OpenMetadataJWTClientConfig,
|
|
)
|
|
from metadata.generated.schema.type.basic import (
|
|
EntityName,
|
|
Expression,
|
|
FullyQualifiedEntityName,
|
|
Markdown,
|
|
Uuid,
|
|
)
|
|
from metadata.generated.schema.type.entityReference import EntityReference
|
|
from metadata.generated.schema.type.entityReferenceList import EntityReferenceList
|
|
from metadata.ingestion.ometa.mixins.patch_mixin_utils import PatchOperation
|
|
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
|
from metadata.ingestion.ometa.utils import model_str
|
|
|
|
# Conditions
|
|
CONDITION_IS_OWNER = Expression(root="isOwner()")
|
|
CONDITION_IS_NOT_OWNER = Expression(root="!isOwner")
|
|
CONDITION_NO_OWNER_IS_OWNER = Expression(root="noOwner() || isOwner()")
|
|
|
|
# Resources
|
|
RESOURCE_BOT: str = "Bot"
|
|
RESOURCE_PIPELINE: str = "Pipeline"
|
|
RESOURCE_TABLE: str = "Table"
|
|
|
|
ROLE_FIELDS: List[str] = ["policies", "teams", "users"]
|
|
|
|
|
|
class OMetaRolePolicyTest(TestCase):
|
|
"""
|
|
Run this integration test with the local API available
|
|
Install the ingestion package before running the tests
|
|
"""
|
|
|
|
service_entity_id = None
|
|
policy_entity: Policy = None
|
|
role_entity: Role = None
|
|
create_policy: CreatePolicyRequest = None
|
|
create_role: CreateRoleRequest = None
|
|
role_policy_1: Policy = None
|
|
role_policy_2: Policy = None
|
|
rule_1: Rule = None
|
|
rule_2: Rule = None
|
|
rule_3: Rule = None
|
|
|
|
server_config = OpenMetadataConnection(
|
|
hostPort="http://localhost:8585/api",
|
|
authProvider="openmetadata",
|
|
securityConfig=OpenMetadataJWTClientConfig(
|
|
jwtToken="eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
|
|
),
|
|
)
|
|
metadata = OpenMetadata(server_config)
|
|
|
|
assert metadata.health_check()
|
|
|
|
@classmethod
|
|
def setUpClass(cls) -> None:
|
|
"""
|
|
Prepare ingredients
|
|
"""
|
|
|
|
cls.rule_1: Rule = Rule(
|
|
name="rule-1",
|
|
description=Markdown("Description of rule-1"),
|
|
resources=[
|
|
RESOURCE_TABLE,
|
|
],
|
|
operations=[
|
|
Operation.EditAll,
|
|
Operation.ViewAll,
|
|
],
|
|
effect=Effect.allow,
|
|
condition=CONDITION_IS_OWNER,
|
|
)
|
|
|
|
cls.rule_2: Rule = Rule(
|
|
name="rule-2",
|
|
description=Markdown("Description of rule-2"),
|
|
fullyQualifiedName=FullyQualifiedEntityName("test-policy-1.rule-2"),
|
|
resources=[
|
|
RESOURCE_BOT,
|
|
RESOURCE_PIPELINE,
|
|
RESOURCE_TABLE,
|
|
],
|
|
operations=[
|
|
Operation.EditCustomFields,
|
|
],
|
|
effect=Effect.deny,
|
|
condition=CONDITION_NO_OWNER_IS_OWNER,
|
|
)
|
|
|
|
cls.rule_3: Rule = Rule(
|
|
name="rule-3",
|
|
fullyQualifiedName=FullyQualifiedEntityName("test-policy-1.rule-3"),
|
|
resources=[
|
|
RESOURCE_TABLE,
|
|
],
|
|
operations=[
|
|
Operation.EditAll,
|
|
Operation.ViewAll,
|
|
],
|
|
effect=Effect.allow,
|
|
condition=CONDITION_IS_OWNER,
|
|
)
|
|
|
|
cls.policy_entity = Policy(
|
|
id=Uuid(uuid.uuid4()),
|
|
name=EntityName("test-policy-1"),
|
|
fullyQualifiedName=EntityName("test-policy-1"),
|
|
description=Markdown("Description of test policy 1"),
|
|
rules=Rules(
|
|
root=[
|
|
cls.rule_1,
|
|
cls.rule_2,
|
|
]
|
|
),
|
|
)
|
|
|
|
cls.create_policy = CreatePolicyRequest(
|
|
name=EntityName("test-policy-1"),
|
|
description=Markdown("Description of test policy 1"),
|
|
rules=Rules(
|
|
root=[
|
|
cls.rule_1,
|
|
cls.rule_2,
|
|
]
|
|
),
|
|
)
|
|
|
|
cls.role_policy_1 = cls.metadata.create_or_update(
|
|
CreatePolicyRequest(
|
|
name=EntityName("test-role-policy-1"),
|
|
description=Markdown("Description of test role policy 1"),
|
|
rules=Rules(
|
|
root=[
|
|
cls.rule_1,
|
|
cls.rule_2,
|
|
]
|
|
),
|
|
)
|
|
)
|
|
|
|
cls.role_policy_2 = cls.metadata.create_or_update(
|
|
data=CreatePolicyRequest(
|
|
name=EntityName("test-role-policy-2"),
|
|
description=Markdown("Description of test role policy 2"),
|
|
rules=Rules(
|
|
root=[
|
|
cls.rule_1,
|
|
]
|
|
),
|
|
)
|
|
)
|
|
|
|
cls.role_entity = Role(
|
|
id=Uuid(uuid.uuid4()),
|
|
name=EntityName("test-role"),
|
|
fullyQualifiedName=FullyQualifiedEntityName("test-role"),
|
|
policies=EntityReferenceList(
|
|
root=[
|
|
EntityReference(id=cls.role_policy_1.id, type="policy"),
|
|
]
|
|
),
|
|
)
|
|
|
|
cls.create_role = CreateRoleRequest(
|
|
name=EntityName("test-role"),
|
|
policies=[
|
|
cls.role_policy_1.name,
|
|
],
|
|
)
|
|
|
|
@classmethod
|
|
def tearDownClass(cls) -> None:
|
|
"""
|
|
Clean up
|
|
"""
|
|
policies = cls.metadata.list_entities(entity=Policy)
|
|
for policy in policies.entities:
|
|
if model_str(policy.name).startswith(model_str(cls.policy_entity.name)):
|
|
cls.metadata.delete(entity=Policy, entity_id=model_str(policy.id))
|
|
|
|
cls.metadata.delete(entity=Policy, entity_id=model_str(cls.role_policy_1.id))
|
|
cls.metadata.delete(entity=Policy, entity_id=model_str(cls.role_policy_2.id))
|
|
|
|
roles = cls.metadata.list_entities(entity=Role)
|
|
for role in roles.entities:
|
|
if model_str(role.name.root).startswith(
|
|
model_str(cls.role_entity.name.root)
|
|
):
|
|
cls.metadata.delete(entity=Role, entity_id=model_str(role.id))
|
|
|
|
def test_policy_create(self):
|
|
"""
|
|
We can create a Policy and we receive it back as Entity
|
|
"""
|
|
|
|
res: Policy = self.metadata.create_or_update(data=self.create_policy)
|
|
|
|
self.assertEqual(res.name, self.policy_entity.name)
|
|
self.assertEqual(res.rules.root[0].name, self.rule_1.name)
|
|
|
|
def test_policy_update(self):
|
|
"""
|
|
Updating it properly changes its properties
|
|
"""
|
|
|
|
res_create = self.metadata.create_or_update(data=self.create_policy)
|
|
|
|
updated = self.create_policy.model_dump(exclude_unset=True)
|
|
updated["rules"] = [self.rule_3]
|
|
updated_policy_entity = CreatePolicyRequest(**updated)
|
|
|
|
res = self.metadata.create_or_update(data=updated_policy_entity)
|
|
|
|
# Same ID, updated owner
|
|
self.assertEqual(res_create.id, res.id)
|
|
self.assertEqual(res.rules.root[0].name, self.rule_3.name)
|
|
|
|
def test_policy_get_name(self):
|
|
"""
|
|
We can fetch a Policy by name and get it back as Entity
|
|
"""
|
|
|
|
self.metadata.create_or_update(data=self.create_policy)
|
|
|
|
res = self.metadata.get_by_name(
|
|
entity=Policy, fqn=model_str(self.policy_entity.fullyQualifiedName)
|
|
)
|
|
self.assertEqual(res.name, self.policy_entity.name)
|
|
|
|
def test_policy_get_id(self):
|
|
"""
|
|
We can fetch a Policy by ID and get it back as Entity
|
|
"""
|
|
|
|
self.metadata.create_or_update(data=self.create_policy)
|
|
|
|
# First pick up by name
|
|
res_name = self.metadata.get_by_name(
|
|
entity=Policy, fqn=model_str(self.policy_entity.fullyQualifiedName)
|
|
)
|
|
# Then fetch by ID
|
|
res = self.metadata.get_by_id(entity=Policy, entity_id=model_str(res_name.id))
|
|
|
|
self.assertEqual(res_name.id, res.id)
|
|
|
|
def test_policy_list(self):
|
|
"""
|
|
We can list all our Policies
|
|
"""
|
|
|
|
self.metadata.create_or_update(data=self.create_policy)
|
|
|
|
res = self.metadata.list_entities(entity=Policy)
|
|
|
|
# Fetch our test Database. We have already inserted it, so we should find it
|
|
data = next(
|
|
iter(ent for ent in res.entities if ent.name == self.policy_entity.name),
|
|
None,
|
|
)
|
|
assert data
|
|
|
|
def test_policy_list_all(self):
|
|
"""
|
|
Validate generator utility to fetch all Policies
|
|
"""
|
|
fake_create = deepcopy(self.create_policy)
|
|
for i in range(0, 10):
|
|
fake_create.name = EntityName(self.create_policy.name.root + str(i))
|
|
self.metadata.create_or_update(data=fake_create)
|
|
|
|
all_entities = self.metadata.list_all_entities(
|
|
entity=Policy, limit=2 # paginate in batches of pairs
|
|
)
|
|
assert (
|
|
len(list(all_entities)) >= 10
|
|
) # In case the default testing entity is not present
|
|
|
|
def test_policy_delete(self):
|
|
"""
|
|
We can delete a Policy by ID
|
|
"""
|
|
|
|
self.metadata.create_or_update(data=self.create_policy)
|
|
|
|
# Find by name
|
|
res_name = self.metadata.get_by_name(
|
|
entity=Policy, fqn=model_str(self.policy_entity.fullyQualifiedName)
|
|
)
|
|
# Then fetch by ID
|
|
res_id = self.metadata.get_by_id(entity=Policy, entity_id=res_name.id)
|
|
|
|
# Delete
|
|
self.metadata.delete(entity=Policy, entity_id=model_str(res_id.id))
|
|
|
|
# Then we should not find it
|
|
res = self.metadata.list_entities(entity=Policy)
|
|
assert not next(
|
|
iter(
|
|
ent
|
|
for ent in res.entities
|
|
if ent.fullyQualifiedName == self.policy_entity.fullyQualifiedName
|
|
),
|
|
None,
|
|
)
|
|
|
|
def test_policy_list_versions(self):
|
|
"""
|
|
test list policy entity versions
|
|
"""
|
|
self.metadata.create_or_update(data=self.create_policy)
|
|
|
|
# Find by name
|
|
res_name = self.metadata.get_by_name(
|
|
entity=Policy, fqn=model_str(self.policy_entity.fullyQualifiedName)
|
|
)
|
|
|
|
res = self.metadata.get_list_entity_versions(
|
|
entity=Policy, entity_id=model_str(res_name.id)
|
|
)
|
|
assert res
|
|
|
|
def test_policy_get_entity_version(self):
|
|
"""
|
|
test get policy entity version
|
|
"""
|
|
self.metadata.create_or_update(data=self.create_policy)
|
|
|
|
# Find by name
|
|
res_name = self.metadata.get_by_name(
|
|
entity=Policy, fqn=model_str(self.policy_entity.fullyQualifiedName)
|
|
)
|
|
res = self.metadata.get_entity_version(
|
|
entity=Policy, entity_id=model_str(res_name.id), version=0.1
|
|
)
|
|
|
|
# check we get the correct version requested and the correct entity ID
|
|
assert res.version.root == 0.1
|
|
assert res.id == res_name.id
|
|
|
|
def test_policy_get_entity_ref(self):
|
|
"""
|
|
test get EntityReference
|
|
"""
|
|
res = self.metadata.create_or_update(data=self.create_policy)
|
|
entity_ref = self.metadata.get_entity_reference(
|
|
entity=Policy, fqn=res.fullyQualifiedName
|
|
)
|
|
|
|
assert res.id == entity_ref.id
|
|
|
|
def test_policy_patch_rule(self):
|
|
"""
|
|
test PATCHing the rules of a policy
|
|
"""
|
|
policy: Policy = self.metadata.create_or_update(self.create_policy)
|
|
dest_policy = deepcopy(policy)
|
|
if dest_policy.rules is None:
|
|
dest_policy.rules.root = list()
|
|
dest_policy.rules.root.append(self.rule_3)
|
|
# Add rule
|
|
|
|
res: Policy = self.metadata.patch(
|
|
entity=Policy, source=policy, destination=dest_policy
|
|
)
|
|
self.assertIsNotNone(res)
|
|
self.assertEqual(len(res.rules.root), 3)
|
|
self.assertEqual(res.rules.root[2].name, self.rule_3.name)
|
|
dest_policy = deepcopy(res)
|
|
dest_policy.rules.root.pop(2)
|
|
# Remove last rule
|
|
res = self.metadata.patch(entity=Policy, source=res, destination=dest_policy)
|
|
self.assertIsNotNone(res)
|
|
self.assertEqual(len(res.rules.root), 2)
|
|
self.assertEqual(res.rules.root[1].name, self.rule_2.name)
|
|
dest_policy = deepcopy(res)
|
|
dest_policy.rules.root.append(self.rule_3)
|
|
# Remove rule with fewer operations
|
|
res: Policy = self.metadata.patch(
|
|
entity=Policy, source=policy, destination=dest_policy
|
|
)
|
|
dest_policy = deepcopy(res)
|
|
dest_policy.rules.root.remove(self.rule_2)
|
|
res: Policy = self.metadata.patch(
|
|
entity=Policy, source=res, destination=dest_policy
|
|
)
|
|
self.assertIsNotNone(res)
|
|
self.assertEqual(len(res.rules.root), 2)
|
|
self.assertEqual(res.rules.root[1].name, self.rule_3.name)
|
|
self.assertEqual(len(res.rules.root[1].operations), len(self.rule_3.operations))
|
|
self.assertIsNone(res.rules.root[1].description)
|
|
|
|
# Remove rule with more operations
|
|
policy = self.metadata.create_or_update(self.create_policy)
|
|
dest_policy = deepcopy(policy)
|
|
dest_policy.rules.root.remove(self.rule_1)
|
|
res = self.metadata.patch(entity=Policy, source=res, destination=dest_policy)
|
|
self.assertIsNotNone(res)
|
|
self.assertEqual(len(res.rules.root), 1)
|
|
self.assertEqual(res.rules.root[0].name, self.rule_2.name)
|
|
self.assertEqual(len(res.rules.root[0].operations), len(self.rule_2.operations))
|
|
self.assertEqual(
|
|
res.rules.root[0].fullyQualifiedName, self.rule_2.fullyQualifiedName
|
|
)
|
|
|
|
dest_policy = deepcopy(res)
|
|
dest_policy.rules.root.remove(self.rule_2)
|
|
# Try to remove the only rule - Fails
|
|
res = self.metadata.patch(entity=Policy, source=res, destination=dest_policy)
|
|
self.assertIsNone(res)
|
|
|
|
def test_role_create(self):
|
|
"""
|
|
We can create a Role and we receive it back as Entity
|
|
"""
|
|
|
|
res = self.metadata.create_or_update(data=self.create_role)
|
|
|
|
self.assertEqual(res.name, self.role_entity.name)
|
|
self.assertEqual(res.policies.root[0].name, model_str(self.role_policy_1.name))
|
|
|
|
def test_role_update(self):
|
|
"""
|
|
Updating it properly changes its properties
|
|
"""
|
|
|
|
res_create = self.metadata.create_or_update(data=self.create_role)
|
|
|
|
updated = self.create_role.model_dump(exclude_unset=True)
|
|
updated["policies"] = [self.role_policy_2.name]
|
|
updated_entity = CreateRoleRequest(**updated)
|
|
|
|
res = self.metadata.create_or_update(data=updated_entity)
|
|
|
|
# Same ID, updated owner
|
|
self.assertEqual(res_create.id, res.id)
|
|
self.assertEqual(res.policies.root[0].name, model_str(self.role_policy_2.name))
|
|
|
|
def test_role_get_name(self):
|
|
"""
|
|
We can fetch a Role by name and get it back as Entity
|
|
"""
|
|
|
|
self.metadata.create_or_update(data=self.create_role)
|
|
|
|
res = self.metadata.get_by_name(
|
|
entity=Role, fqn=self.role_entity.fullyQualifiedName
|
|
)
|
|
self.assertEqual(res.name, self.role_entity.name)
|
|
|
|
def test_role_get_id(self):
|
|
"""
|
|
We can fetch a Role by ID and get it back as Entity
|
|
"""
|
|
|
|
self.metadata.create_or_update(data=self.create_role)
|
|
|
|
# First pick up by name
|
|
res_name = self.metadata.get_by_name(
|
|
entity=Role, fqn=self.role_entity.fullyQualifiedName
|
|
)
|
|
# Then fetch by ID
|
|
res = self.metadata.get_by_id(entity=Role, entity_id=model_str(res_name.id))
|
|
|
|
self.assertEqual(res_name.id, res.id)
|
|
|
|
def test_role_list(self):
|
|
"""
|
|
We can list all our Roles
|
|
"""
|
|
|
|
self.metadata.create_or_update(data=self.create_role)
|
|
|
|
res = self.metadata.list_entities(entity=Role)
|
|
|
|
# Fetch our test Database. We have already inserted it, so we should find it
|
|
data = next(
|
|
iter(ent for ent in res.entities if ent.name == self.role_entity.name), None
|
|
)
|
|
assert data
|
|
|
|
def test_role_list_all(self):
|
|
"""
|
|
Validate generator utility to fetch all roles
|
|
"""
|
|
fake_create = deepcopy(self.create_role)
|
|
for i in range(0, 10):
|
|
fake_create.name = EntityName(self.create_role.name.root + str(i))
|
|
self.metadata.create_or_update(data=fake_create)
|
|
|
|
all_entities = self.metadata.list_all_entities(
|
|
entity=Role, limit=2 # paginate in batches of pairs
|
|
)
|
|
assert (
|
|
len(list(all_entities)) >= 10
|
|
) # In case the default testing entity is not present
|
|
|
|
def test_role_delete(self):
|
|
"""
|
|
We can delete a Role by ID
|
|
"""
|
|
|
|
self.metadata.create_or_update(data=self.create_role)
|
|
|
|
# Find by name
|
|
res_name = self.metadata.get_by_name(
|
|
entity=Role, fqn=self.role_entity.fullyQualifiedName
|
|
)
|
|
# Then fetch by ID
|
|
res_id = self.metadata.get_by_id(entity=Role, entity_id=res_name.id)
|
|
|
|
# Delete
|
|
self.metadata.delete(entity=Role, entity_id=str(res_id.id.root))
|
|
|
|
# Then we should not find it
|
|
res = self.metadata.list_entities(entity=Role)
|
|
assert not next(
|
|
iter(
|
|
ent
|
|
for ent in res.entities
|
|
if ent.fullyQualifiedName == self.role_entity.fullyQualifiedName
|
|
),
|
|
None,
|
|
)
|
|
|
|
def test_role_list_versions(self):
|
|
"""
|
|
test list role entity versions
|
|
"""
|
|
self.metadata.create_or_update(data=self.create_role)
|
|
|
|
# Find by name
|
|
res_name = self.metadata.get_by_name(
|
|
entity=Role, fqn=self.role_entity.fullyQualifiedName
|
|
)
|
|
|
|
res = self.metadata.get_list_entity_versions(
|
|
entity=Role, entity_id=model_str(res_name.id)
|
|
)
|
|
assert res
|
|
|
|
def test_role_get_entity_version(self):
|
|
"""
|
|
test get role entity version
|
|
"""
|
|
self.metadata.create_or_update(data=self.create_role)
|
|
|
|
# Find by name
|
|
res_name = self.metadata.get_by_name(
|
|
entity=Role, fqn=self.role_entity.fullyQualifiedName
|
|
)
|
|
res = self.metadata.get_entity_version(
|
|
entity=Role, entity_id=res_name.id.root, version=0.1
|
|
)
|
|
|
|
# check we get the correct version requested and the correct entity ID
|
|
assert res.version.root == 0.1
|
|
assert res.id == res_name.id
|
|
|
|
def test_role_get_entity_ref(self):
|
|
"""
|
|
test get EntityReference
|
|
"""
|
|
res = self.metadata.create_or_update(data=self.create_role)
|
|
entity_ref = self.metadata.get_entity_reference(
|
|
entity=Role, fqn=res.fullyQualifiedName
|
|
)
|
|
|
|
assert res.id == entity_ref.id
|
|
|
|
def test_role_add_user(self):
|
|
"""
|
|
test adding a role to a user
|
|
"""
|
|
role: Role = self.metadata.create_or_update(data=self.create_role)
|
|
|
|
user: User = self.metadata.create_or_update(
|
|
data=CreateUserRequest(
|
|
name="test-role-user",
|
|
email="test-role@user.com",
|
|
roles=[role.id],
|
|
),
|
|
)
|
|
|
|
res: Role = self.metadata.get_by_name(
|
|
entity=Role,
|
|
fqn=self.role_entity.fullyQualifiedName,
|
|
fields=ROLE_FIELDS,
|
|
)
|
|
assert res.users.root[0].id == user.id
|
|
|
|
self.metadata.delete(entity=User, entity_id=user.id)
|
|
|
|
def test_role_add_team(self):
|
|
"""
|
|
Test adding a role to a team
|
|
"""
|
|
role: Role = self.metadata.create_or_update(data=self.create_role)
|
|
|
|
user: User = self.metadata.create_or_update(
|
|
data=CreateUserRequest(
|
|
name="test-role-user",
|
|
email="test-role@user.com",
|
|
),
|
|
)
|
|
|
|
team: Team = self.metadata.create_or_update(
|
|
data=CreateTeamRequest(
|
|
name="test-role-team-1",
|
|
teamType="Group",
|
|
users=[user.id],
|
|
defaultRoles=[role.id],
|
|
)
|
|
)
|
|
|
|
res: Role = self.metadata.get_by_name(
|
|
entity=Role,
|
|
fqn=self.role_entity.fullyQualifiedName,
|
|
fields=ROLE_FIELDS,
|
|
)
|
|
assert res.teams.root[0].id == team.id
|
|
|
|
self.metadata.delete(entity=Team, entity_id=team.id)
|
|
self.metadata.delete(entity=User, entity_id=user.id)
|
|
|
|
def test_role_patch_policies(self):
|
|
"""
|
|
test PATCHing the policies of a role
|
|
"""
|
|
|
|
# Add policy to role
|
|
role: Role = self.metadata.create_or_update(data=self.create_role)
|
|
|
|
res: Role = self.metadata.patch_role_policy(
|
|
entity_id=role.id,
|
|
policy_id=self.role_policy_2.id,
|
|
)
|
|
assert res
|
|
assert res.id == role.id
|
|
assert len(res.policies.root) == 2
|
|
assert res.policies.root[1].id == self.role_policy_2.id
|
|
|
|
# Remove last policy from role
|
|
res = self.metadata.patch_role_policy(
|
|
entity_id=role.id,
|
|
policy_id=self.role_policy_2.id,
|
|
operation=PatchOperation.REMOVE,
|
|
)
|
|
assert res
|
|
assert res.id == role.id
|
|
assert len(res.policies.root) == 1
|
|
assert res.policies.root[0].id == self.role_policy_1.id
|
|
|
|
# Remove first policy from role
|
|
res: Role = self.metadata.patch_role_policy(
|
|
entity_id=role.id,
|
|
policy_id=self.role_policy_2.id,
|
|
operation=PatchOperation.ADD,
|
|
)
|
|
res = self.metadata.patch_role_policy(
|
|
entity_id=role.id,
|
|
policy_id=self.role_policy_1.id,
|
|
operation=PatchOperation.REMOVE,
|
|
)
|
|
assert res
|
|
assert res.id == role.id
|
|
assert len(res.policies.root) == 1
|
|
assert res.policies.root[0].id == self.role_policy_2.id
|
|
|
|
# Try to remove the only policy - Fail
|
|
res = self.metadata.patch_role_policy(
|
|
entity_id=role.id,
|
|
policy_id=self.role_policy_2.id,
|
|
operation=PatchOperation.REMOVE,
|
|
)
|
|
self.assertEqual(res, None)
|
|
|
|
# Nonexistent role ID - Fail
|
|
res = self.metadata.patch_role_policy(
|
|
entity_id=str(uuid.uuid4()),
|
|
policy_id=self.role_policy_1.id,
|
|
operation=PatchOperation.ADD,
|
|
)
|
|
self.assertEqual(res, None)
|
|
|
|
# Attempt to remove nonexistent policy - Fail
|
|
res: Role = self.metadata.patch_role_policy(
|
|
entity_id=role.id,
|
|
policy_id=self.role_policy_1.id,
|
|
operation=PatchOperation.ADD,
|
|
)
|
|
res = self.metadata.patch_role_policy(
|
|
entity_id=role.id,
|
|
policy_id=str(uuid.uuid4()),
|
|
operation=PatchOperation.REMOVE,
|
|
)
|
|
self.assertEqual(res, None)
|