Issue #3809: Add python client for Roles and Policies (#10531)

* Issue #3809: Add python client for Roles and Policies
Includes Tests

* #3809: Add python client for Roles and Policies
- Moved constants to enums in client_utils.py
- Updated all patch methods to utilized new enums
- includes tests

* #3809: Add python client for Roles and Policies
- includes tests
- merged upstream updates and updated to use new enums
This commit is contained in:
Schlameel 2023-03-20 00:42:01 -07:00 committed by GitHub
parent 47297bf0f2
commit df855ad8c3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 1345 additions and 61 deletions

View File

@ -24,6 +24,12 @@ from metadata.generated.schema.type import basic
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.tagLabel import LabelType, State, TagSource
from metadata.ingestion.ometa.client import REST
from metadata.ingestion.ometa.patch import (
PatchField,
PatchOperation,
PatchPath,
PatchValue,
)
from metadata.ingestion.ometa.utils import model_str
from metadata.utils.helpers import find_column_in_table_with_index
from metadata.utils.logger import ometa_logger
@ -32,29 +38,6 @@ logger = ometa_logger()
T = TypeVar("T", bound=BaseModel)
OPERATION = "op"
PATH = "path"
VALUE = "value"
VALUE_ID: str = "id"
VALUE_TYPE: str = "type"
# Operations
ADD = "add"
REPLACE = "replace"
REMOVE = "remove"
# OM specific description handling
ENTITY_DESCRIPTION = "/description"
COL_DESCRIPTION = "/columns/{index}/description"
TABLE_CONSTRAINTS = "/tableConstraints"
ENTITY_TAG = "/tags/{tag_index}"
COL_TAG = "/columns/{index}/tags/{tag_index}"
# Paths
OWNER_PATH: str = "/owner"
OWNER_TYPES: List[str] = ["user", "team"]
@ -130,9 +113,11 @@ class OMetaPatchMixin(Generic[T]):
data=json.dumps(
[
{
OPERATION: ADD if not instance.description else REPLACE,
PATH: ENTITY_DESCRIPTION,
VALUE: description,
PatchField.OPERATION: PatchOperation.ADD
if not instance.description
else PatchOperation.REPLACE,
PatchField.PATH: PatchPath.DESCRIPTION,
PatchField.VALUE: description,
}
]
),
@ -196,9 +181,13 @@ class OMetaPatchMixin(Generic[T]):
data=json.dumps(
[
{
OPERATION: ADD if not col.description else REPLACE,
PATH: COL_DESCRIPTION.format(index=col_index),
VALUE: description,
PatchField.OPERATION: PatchOperation.ADD
if not col.description
else PatchOperation.REPLACE,
PatchField.PATH: PatchPath.COLUMNS_DESCRIPTION.format(
index=col_index
),
PatchField.VALUE: description,
}
]
),
@ -241,13 +230,15 @@ class OMetaPatchMixin(Generic[T]):
data=json.dumps(
[
{
OPERATION: ADD if not table.tableConstraints else REPLACE,
PATH: TABLE_CONSTRAINTS,
VALUE: [
PatchField.OPERATION: PatchOperation.ADD
if not table.tableConstraints
else PatchOperation.REPLACE,
PatchField.PATH: PatchPath.TABLE_CONSTRAINTS,
PatchField.VALUE: [
{
"constraintType": constraint.constraintType.value,
"columns": constraint.columns,
"referredColumns": [
PatchValue.CONSTRAINT_TYPE: constraint.constraintType.value,
PatchValue.COLUMNS: constraint.columns,
PatchValue.REFERRED_COLUMNS: [
col.__root__
for col in constraint.referredColumns or []
],
@ -274,7 +265,9 @@ class OMetaPatchMixin(Generic[T]):
entity_id: Union[str, basic.Uuid],
tag_fqn: str,
from_glossary: bool = False,
operation: str = ADD,
operation: Union[
PatchOperation.ADD, PatchOperation.REMOVE
] = PatchOperation.ADD,
) -> Optional[T]:
"""
Given an Entity type and ID, JSON PATCH the tag.
@ -296,15 +289,17 @@ class OMetaPatchMixin(Generic[T]):
try:
res = None
if operation == ADD:
if operation == PatchOperation.ADD:
res = self.client.patch(
path=f"{self.get_suffix(entity)}/{model_str(entity_id)}",
data=json.dumps(
[
{
OPERATION: ADD,
PATH: ENTITY_TAG.format(tag_index=tag_index),
VALUE: {
PatchField.OPERATION: PatchOperation.ADD,
PatchField.PATH: PatchPath.TAGS.format(
tag_index=tag_index
),
PatchField.VALUE: {
"labelType": LabelType.Automated.value,
"source": TagSource.Classification.value
if not from_glossary
@ -316,14 +311,16 @@ class OMetaPatchMixin(Generic[T]):
]
),
)
elif operation == REMOVE:
elif operation == PatchOperation.REMOVE:
res = self.client.patch(
path=f"{self.get_suffix(entity)}/{model_str(entity_id)}",
data=json.dumps(
[
{
OPERATION: REMOVE,
PATH: ENTITY_TAG.format(tag_index=tag_index),
PatchField.OPERATION: PatchOperation.REMOVE,
PatchField.PATH: PatchPath.TAGS.format(
tag_index=tag_index
),
}
]
),
@ -344,7 +341,9 @@ class OMetaPatchMixin(Generic[T]):
column_name: str,
tag_fqn: str,
from_glossary: bool = False,
operation: str = ADD,
operation: Union[
PatchOperation.ADD, PatchOperation.REMOVE
] = PatchOperation.ADD,
is_suggested: bool = False,
) -> Optional[T]:
"""Given an Entity ID, JSON PATCH the tag of the column
@ -372,38 +371,38 @@ class OMetaPatchMixin(Generic[T]):
tag_index = len(col.tags) - 1 if col.tags else 0
try:
res = None
if operation == ADD:
if operation == PatchOperation.ADD:
res = self.client.patch(
path=f"{self.get_suffix(Table)}/{model_str(entity_id)}",
data=json.dumps(
[
{
OPERATION: ADD,
PATH: COL_TAG.format(
PatchField.OPERATION: PatchOperation.ADD,
PatchField.PATH: PatchPath.COLUMNS_TAGS.format(
index=col_index, tag_index=tag_index
),
VALUE: {
"labelType": LabelType.Automated.value,
"source": TagSource.Classification.value
PatchField.VALUE: {
PatchValue.LABEL_TYPE: LabelType.Automated.value,
PatchValue.SOURCE: TagSource.Classification.value
if not from_glossary
else TagSource.Glossary.value,
"state": State.Suggested.value
PatchValue.STATE: State.Suggested.value
if is_suggested
else State.Confirmed.value,
"tagFQN": tag_fqn,
PatchValue.TAG_FQN: tag_fqn,
},
}
]
),
)
elif operation == REMOVE:
elif operation == PatchOperation.REMOVE:
res = self.client.patch(
path=f"{self.get_suffix(Table)}/{model_str(entity_id)}",
data=json.dumps(
[
{
OPERATION: REMOVE,
PATH: COL_TAG.format(
PatchField.OPERATION: PatchOperation.REMOVE,
PatchField.PATH: PatchPath.COLUMNS_TAGS.format(
index=col_index, tag_index=tag_index
),
}
@ -453,11 +452,11 @@ class OMetaPatchMixin(Generic[T]):
return None
data: Dict = {
PATH: OWNER_PATH,
PatchField.PATH: PatchPath.OWNER,
}
if owner is None:
data[OPERATION] = REMOVE
data[PatchField.OPERATION] = PatchOperation.REMOVE
else:
if owner.type not in OWNER_TYPES:
valid_owner_types: str = ", ".join(f'"{o}"' for o in OWNER_TYPES)
@ -467,10 +466,12 @@ class OMetaPatchMixin(Generic[T]):
)
return None
data[OPERATION] = ADD if instance.owner is None else REPLACE
data[VALUE] = {
VALUE_ID: model_str(owner.id),
VALUE_TYPE: owner.type,
data[PatchField.OPERATION] = (
PatchOperation.ADD if instance.owner is None else PatchOperation.REPLACE
)
data[PatchField.VALUE] = {
PatchValue.ID: model_str(owner.id),
PatchValue.TYPE: owner.type,
}
try:

View File

@ -0,0 +1,445 @@
# Copyright 2023 Schlameel
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Mixin class containing Role and Policy specific methods
To be used by OpenMetadata class
"""
import json
import traceback
from typing import Dict, Generic, List, Optional, Type, TypeVar, Union
from pydantic import BaseModel
from metadata.generated.schema.entity.policies.accessControl.rule import Rule
from metadata.generated.schema.entity.policies.policy import Policy
from metadata.generated.schema.entity.teams.role import Role
from metadata.generated.schema.type import basic
from metadata.ingestion.ometa.client import REST
from metadata.ingestion.ometa.patch import (
PatchField,
PatchOperation,
PatchPath,
PatchValue,
)
from metadata.ingestion.ometa.utils import model_str
from metadata.utils.logger import ometa_logger
logger = ometa_logger()
T = TypeVar("T", bound=BaseModel)
class OMetaRolePolicyMixin(Generic[T]):
"""
OpenMetadata API methods related to Roles and Policies.
To be inherited by OpenMetadata
"""
client: REST
def _fetch_entity_if_exists(
self, entity: Type[T], entity_id: Union[str, basic.Uuid]
) -> Optional[T]:
"""
Validates if we can update a description or not. Will return
the instance if it can be updated. None otherwise.
Args
entity (T): Entity Type
entity_id: ID
description: new description to add
force: if True, we will patch any existing description. Otherwise, we will maintain
the existing data.
Returns
instance to update
"""
instance = self.get_by_id(entity=entity, entity_id=entity_id, fields=["*"])
if not instance:
logger.warning(
f"Cannot find an instance of '{entity.__class__.__name__}' with id [{str(entity_id)}]."
)
return None
return instance
@staticmethod
def _get_rule_merge_patches(
previous: List,
current: List,
rule_index: int,
path: str,
is_enum: bool,
) -> List[Dict]:
"""
Get the operations required to overwrite the set (resources or operations) of a rule.
Args
previous: the previous set to be overwritten by current
current: the current set to overwrite previous
rule_index: the index of the rule on which we are being operated
path: the formattable string that names the path
is_enum: is the set enums or not
Returns
List of patch operations
"""
data: List[Dict] = []
for index in range(len(previous) - 1, len(current) - 1, -1):
data.append(
{
PatchField.OPERATION: PatchOperation.REMOVE,
PatchField.PATH: path.format(
rule_index=rule_index - 1, index=index
),
}
)
index: int = 0
for item in current:
data.append(
{
PatchField.OPERATION: PatchOperation.REPLACE
if index < len(previous)
else PatchOperation.ADD,
PatchField.PATH: path.format(
rule_index=rule_index - 1, index=index
),
PatchField.VALUE: item.name if is_enum else item,
}
)
index += 1
return data
@staticmethod
def _get_optional_rule_patch(
previous: Union[basic.FullyQualifiedEntityName, basic.Markdown],
current: Union[basic.FullyQualifiedEntityName, basic.Markdown],
rule_index: int,
path: str,
) -> List[Dict]:
"""
Get the operations required to update an optional rule field
Args
previous: the field from the previous rule
current: the field from the current rule
rule_index: the index of the previous rule
path: path string for the filed
Returns
list with one dict describing the operation to update the field
"""
data: List[Dict] = []
if current is None:
if previous is not None:
data = [
{
PatchField.OPERATION: PatchOperation.REMOVE,
PatchField.PATH: path.format(rule_index=rule_index),
}
]
else:
data = [
{
PatchField.OPERATION: PatchOperation.ADD
if previous is None
else PatchOperation.REPLACE,
PatchField.PATH: path.format(rule_index=rule_index),
PatchField.VALUE: str(current.__root__),
}
]
return data
def patch_role_policy(
self,
entity_id: Union[str, basic.Uuid],
policy_id: Union[str, basic.Uuid],
operation: Union[
PatchOperation.ADD, PatchOperation.REMOVE
] = PatchOperation.ADD,
) -> Optional[Role]:
"""
Given a Role ID, JSON PATCH the policies.
Args
entity_id: ID of the role to be patched
policy_id: ID of the policy to be added or removed
operation: Operation to be performed. Either 'add' or 'remove'
Returns
Updated Entity
"""
instance: Role = self._fetch_entity_if_exists(entity=Role, entity_id=entity_id)
if not instance:
return None
policy_index: int = len(instance.policies.__root__) - 1
data: List
if operation is PatchOperation.REMOVE:
if len(instance.policies.__root__) == 1:
logger.error(
f"The Role with id [{model_str(entity_id)}] has only one (1)"
f" policy. Unable to remove."
)
return None
data = [
{
PatchField.OPERATION: PatchOperation.REMOVE,
PatchField.PATH: PatchPath.POLICIES.format(index=policy_index),
}
]
index: int = 0
is_policy_found: bool = False
for policy in instance.policies.__root__:
if model_str(policy.id) == model_str(policy_id):
is_policy_found = True
continue
data.append(
{
PatchField.OPERATION: PatchOperation.REPLACE,
PatchField.PATH: PatchPath.POLICIES_DESCRIPTION.format(
index=index
),
PatchField.VALUE: model_str(policy.description.__root__),
}
)
data.append(
{
PatchField.OPERATION: PatchOperation.REPLACE
if policy.displayName
else PatchOperation.ADD,
PatchField.PATH: PatchPath.POLICIES_DISPLAY_NAME.format(
index=index
),
PatchField.VALUE: model_str(
policy.displayName if policy.displayName else policy.name
),
}
)
data.append(
{
PatchField.OPERATION: PatchOperation.REPLACE,
PatchField.PATH: PatchPath.POLICIES_FQN.format(index=index),
PatchField.VALUE: model_str(policy.fullyQualifiedName),
}
)
data.append(
{
PatchField.OPERATION: PatchOperation.REPLACE,
PatchField.PATH: PatchPath.POLICIES_HREF.format(index=index),
PatchField.VALUE: model_str(policy.href),
}
)
data.append(
{
PatchField.OPERATION: PatchOperation.REPLACE,
PatchField.PATH: PatchPath.POLICIES_ID.format(index=index),
PatchField.VALUE: model_str(policy.id),
}
)
data.append(
{
PatchField.OPERATION: PatchOperation.REPLACE,
PatchField.PATH: PatchPath.POLICIES_NAME.format(index=index),
PatchField.VALUE: model_str(policy.name),
}
)
index += 1
if not is_policy_found:
logger.error(
f"Policy [{model_str(policy_id)}] not found for Role [{model_str(entity_id)}]."
" No policies removed."
)
return None
else:
data = [
{
PatchField.OPERATION: operation,
PatchField.PATH: PatchPath.POLICIES.format(index=policy_index),
PatchField.VALUE: {
PatchValue.ID: model_str(policy_id),
PatchValue.TYPE: PatchValue.POLICY,
},
}
]
try:
res = self.client.patch(
path=PatchPath.ROLES.format(role_id=model_str(entity_id)),
data=json.dumps(data),
)
return Role(**res)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(
f"Error trying to PATCH policies for Role [{model_str(entity_id)}]: {exc}"
)
return None
def patch_policy_rule(
self,
entity_id: Union[str, basic.Uuid],
rule: Optional[Rule] = None,
operation: Union[
PatchOperation.ADD, PatchOperation.REMOVE
] = PatchOperation.ADD,
) -> Optional[Policy]:
"""
Given a Policy ID, JSON PATCH the rule (add or remove).
Args
entity_id: ID of the role to be patched
rule: The rule to add or remove
operation: The operation to perform, either "add" or "remove"
Returns
Updated Entity
"""
instance: Policy = self._fetch_entity_if_exists(
entity=Policy, entity_id=entity_id
)
if not instance:
return None
rule_index: int = len(instance.rules.__root__) - 1
data: List[Dict]
if operation == PatchOperation.ADD:
data = [
{
PatchField.OPERATION: PatchOperation.ADD,
PatchField.PATH: PatchPath.RULES.format(rule_index=rule_index + 1),
PatchField.VALUE: {
PatchValue.NAME: rule.name,
PatchValue.CONDITION: rule.condition.__root__,
PatchValue.EFFECT: rule.effect.name,
PatchValue.OPERATIONS: [
operation.name for operation in rule.operations
],
PatchValue.RESOURCES: list(rule.resources),
},
}
]
if rule.description is not None:
data[0][PatchField.VALUE][PatchValue.DESCRIPTION] = str(
rule.description.__root__
)
if rule.fullyQualifiedName is not None:
data[0][PatchField.VALUE][PatchValue.FQN] = str(
rule.fullyQualifiedName.__root__
)
else:
if rule_index == 0:
logger.error(f"Unable to remove only rule from Policy [{entity_id}].")
return None
data = [
{
PatchField.OPERATION: PatchOperation.REMOVE,
PatchField.PATH: PatchPath.RULES.format(rule_index=rule_index),
}
]
for rule_index in range(len(instance.rules.__root__) - 1, -1, -1):
current_rule: Rule = instance.rules.__root__[rule_index]
if current_rule.name == rule.name:
break
if rule_index == 0:
logger.error(
f"Rule [{rule.name}] not found in Policy [{entity_id}]. Unable to remove rule."
)
return None
previous_rule: Rule = instance.rules.__root__[rule_index - 1]
# Condition
data.append(
{
PatchField.OPERATION: PatchOperation.REPLACE,
PatchField.PATH: PatchPath.RULES_CONDITION.format(
rule_index=rule_index - 1
),
PatchField.VALUE: current_rule.condition.__root__,
}
)
# Description - Optional
data += OMetaRolePolicyMixin._get_optional_rule_patch(
previous=previous_rule.description,
current=current_rule.description,
rule_index=rule_index - 1,
path=PatchPath.RULES_DESCRIPTION,
)
# Effect
data.append(
{
PatchField.OPERATION: PatchOperation.REPLACE,
PatchField.PATH: PatchPath.RULES_EFFECT.format(
rule_index=rule_index - 1
),
PatchField.VALUE: current_rule.effect.name,
}
)
# Fully qualified name - Optional
data += OMetaRolePolicyMixin._get_optional_rule_patch(
previous=previous_rule.fullyQualifiedName,
current=current_rule.fullyQualifiedName,
rule_index=rule_index - 1,
path=PatchPath.RULES_FQN,
)
# Name
data.append(
{
PatchField.OPERATION: PatchOperation.REPLACE,
PatchField.PATH: PatchPath.RULES_NAME.format(
rule_index=rule_index - 1
),
PatchField.VALUE: current_rule.name,
}
)
# Operations
data += OMetaRolePolicyMixin._get_rule_merge_patches(
previous=previous_rule.operations,
current=current_rule.operations,
rule_index=rule_index,
path=PatchPath.RULES_OPERATIONS,
is_enum=True,
)
# Resources
data += OMetaRolePolicyMixin._get_rule_merge_patches(
previous=previous_rule.resources,
current=current_rule.resources,
rule_index=rule_index,
path=PatchPath.RULES_RESOURCES,
is_enum=False,
)
try:
res = self.client.patch(
path=PatchPath.POLICIES.format(index=model_str(entity_id)),
data=json.dumps(data),
)
return Policy(**res)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(
f"Error trying to PATCH description for Role [{model_str(entity_id)}]: {exc}"
)
return None

View File

@ -92,6 +92,7 @@ from metadata.ingestion.ometa.mixins.mlmodel_mixin import OMetaMlModelMixin
from metadata.ingestion.ometa.mixins.patch_mixin import OMetaPatchMixin
from metadata.ingestion.ometa.mixins.pipeline_mixin import OMetaPipelineMixin
from metadata.ingestion.ometa.mixins.query_mixin import OMetaQueryMixin
from metadata.ingestion.ometa.mixins.role_policy_mixin import OMetaRolePolicyMixin
from metadata.ingestion.ometa.mixins.server_mixin import OMetaServerMixin
from metadata.ingestion.ometa.mixins.service_mixin import OMetaServiceMixin
from metadata.ingestion.ometa.mixins.table_mixin import OMetaTableMixin
@ -167,6 +168,7 @@ class OpenMetadata(
OMetaIngestionPipelineMixin,
OMetaUserMixin,
OMetaQueryMixin,
OMetaRolePolicyMixin,
Generic[T, C],
):
"""

View File

@ -0,0 +1,89 @@
# Copyright 2021 Schlameel
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Helper definitions for JSON PATCH field names and values
"""
from enum import Enum
class PatchField(str, Enum):
"""
JSON PATCH field names
"""
OPERATION = "op"
PATH = "path"
VALUE = "value"
class PatchValue(str, Enum):
"""
JSON PATCH value field names
"""
ID = "id"
COLUMNS = "columns"
CONDITION = "condition"
CONSTRAINT_TYPE = "constraintType"
DESCRIPTION = "description"
EFFECT = "effect"
FQN = "fullyQualifiedName"
LABEL_TYPE = "labelType"
NAME = "name"
OPERATIONS = "operations"
POLICY = "policy"
REFERRED_COLUMNS = "referredColumns"
RESOURCES = "resources"
SOURCE = "source"
STATE = "state"
TAG_FQN = "tagFQN"
TYPE = "type"
class PatchPath(str, Enum):
"""
JSON PATCH path strings
"""
COLUMNS_DESCRIPTION = "/columns/{index}/description"
COLUMNS_TAGS = "/columns/{index}/tags/{tag_index}"
DESCRIPTION = "/description"
POLICIES = "/policies/{index}"
POLICIES_HREF = "/policies/{index}/href"
POLICIES_DESCRIPTION = "/policies/{index}/description"
POLICIES_FQN = "/policies/{index}/fullyQualifiedName"
POLICIES_NAME = "/policies/{index}/name"
POLICIES_ID = "/policies/{index}/id"
POLICIES_DISPLAY_NAME = "/policies/{index}/displayName"
OWNER = "/owner"
ROLES = "/roles/{role_id}"
RULES = "/rules/{rule_index}"
RULES_CONDITION = "/rules/{rule_index}/condition"
RULES_DESCRIPTION = "/rules/{rule_index}/description"
RULES_EFFECT = "/rules/{rule_index}/effect"
RULES_FQN = "/rules/{rule_index}/fullyQualifiedName"
RULES_NAME = "/rules/{rule_index}/name"
RULES_OPERATIONS = "/rules/{rule_index}/operations/{index}"
RULES_RESOURCES = "/rules/{rule_index}/resources/{index}"
TABLE_CONSTRAINTS = "/tableConstraints"
TAGS = "/tags/{tag_index}"
# Operations
class PatchOperation(str, Enum):
"""
JSON PATCH operation strings
"""
ADD = "add"
REPLACE = "replace"
REMOVE = "remove"

View File

@ -0,0 +1,747 @@
# Copyright 2023 Schlameel
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
OpenMetadata high-level API Policy test
"""
import uuid
from copy import deepcopy
from typing import List
from unittest import TestCase
from metadata.generated.schema.api.policies.createPolicy import CreatePolicyRequest
from metadata.generated.schema.api.teams.createRole import CreateRoleRequest
from metadata.generated.schema.api.teams.createTeam import CreateTeamRequest
from metadata.generated.schema.api.teams.createUser import CreateUserRequest
from metadata.generated.schema.entity.policies.accessControl.resourceDescriptor import (
Operation,
)
from metadata.generated.schema.entity.policies.accessControl.rule import Effect, Rule
from metadata.generated.schema.entity.policies.policy import Policy
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.teams.role import Role
from metadata.generated.schema.entity.teams.team import Team
from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
OpenMetadataJWTClientConfig,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.patch import PatchOperation
from metadata.ingestion.ometa.utils import model_str
# Conditions
CONDITION_IS_OWNER: str = "isOwner()"
CONDITION_IS_NOT_OWNER: str = "!isOwner"
CONDITION_NO_OWNER_IS_OWNER: str = "noOwner() || isOwner()"
# Resources
RESOURCE_BOT: str = "Bot"
RESOURCE_PIPELINE: str = "Pipeline"
RESOURCE_TABLE: str = "Table"
ROLE_FIELDS: List[str] = ["policies", "teams", "users"]
class OMetaRolePolicyTest(TestCase):
"""
Run this integration test with the local API available
Install the ingestion package before running the tests
"""
service_entity_id = None
policy_entity: Policy = None
role_entity: Role = None
create_policy: CreatePolicyRequest = None
create_role: CreateRoleRequest = None
role_policy_1: Policy = None
role_policy_2: Policy = None
rule_1: Rule = None
rule_2: Rule = None
rule_3: Rule = None
server_config = OpenMetadataConnection(
hostPort="http://localhost:8585/api",
authProvider="openmetadata",
securityConfig=OpenMetadataJWTClientConfig(
jwtToken="eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
),
)
metadata = OpenMetadata(server_config)
assert metadata.health_check()
@classmethod
def setUpClass(cls) -> None:
"""
Prepare ingredients
"""
cls.rule_1: Rule = Rule(
name="rule-1",
description="Description of rule-1",
resources=[
RESOURCE_TABLE,
],
operations=[
Operation.EditAll,
Operation.ViewAll,
],
effect=Effect.allow,
condition=CONDITION_IS_OWNER,
)
cls.rule_2: Rule = Rule(
name="rule-2",
description="Description of rule-2",
fullyQualifiedName="test-policy-1.rule-2",
resources=[
RESOURCE_BOT,
RESOURCE_PIPELINE,
RESOURCE_TABLE,
],
operations=[
Operation.EditCustomFields,
],
effect=Effect.deny,
condition=CONDITION_NO_OWNER_IS_OWNER,
)
cls.rule_3: Rule = Rule(
name="rule-3",
fullyQualifiedName="test-policy-1.rule-3",
resources=[
RESOURCE_TABLE,
],
operations=[
Operation.EditAll,
Operation.ViewAll,
],
effect=Effect.allow,
condition=CONDITION_IS_OWNER,
)
cls.policy_entity = Policy(
id=uuid.uuid4(),
name="test-policy-1",
fullyQualifiedName="test-policy-1",
description="Description of test policy 1",
rules=[
cls.rule_1,
cls.rule_2,
],
)
cls.create_policy = CreatePolicyRequest(
name="test-policy-1",
description="Description of test policy 1",
rules=[
cls.rule_1,
cls.rule_2,
],
)
cls.role_policy_1 = cls.metadata.create_or_update(
CreatePolicyRequest(
name="test-role-policy-1",
description="Description of test role policy 1",
rules=[
cls.rule_1,
cls.rule_2,
],
)
)
cls.role_policy_2 = cls.metadata.create_or_update(
data=CreatePolicyRequest(
name="test-role-policy-2",
description="Description of test role policy 2",
rules=[
cls.rule_1,
],
)
)
cls.role_entity = Role(
id=uuid.uuid4(),
name="test-role",
fullyQualifiedName="test-role",
policies=[
EntityReference(id=model_str(cls.role_policy_1.id), type="policy"),
],
)
cls.create_role = CreateRoleRequest(
name="test-role",
policies=[
cls.role_policy_1.name,
],
)
@classmethod
def tearDownClass(cls) -> None:
"""
Clean up
"""
policies = cls.metadata.list_entities(entity=Policy)
for policy in policies.entities:
if model_str(policy.name).startswith(model_str(cls.policy_entity.name)):
cls.metadata.delete(entity=Policy, entity_id=model_str(policy.id))
cls.metadata.delete(entity=Policy, entity_id=model_str(cls.role_policy_1.id))
cls.metadata.delete(entity=Policy, entity_id=model_str(cls.role_policy_2.id))
roles = cls.metadata.list_entities(entity=Role)
for role in roles.entities:
if model_str(role.name.__root__).startswith(
model_str(cls.role_entity.name.__root__)
):
cls.metadata.delete(entity=Role, entity_id=model_str(role.id))
def test_policy_create(self):
"""
We can create a Policy and we receive it back as Entity
"""
res: Policy = self.metadata.create_or_update(data=self.create_policy)
self.assertEqual(res.name, self.policy_entity.name)
self.assertEqual(res.rules.__root__[0].name, self.rule_1.name)
def test_policy_update(self):
"""
Updating it properly changes its properties
"""
res_create = self.metadata.create_or_update(data=self.create_policy)
updated = self.create_policy.dict(exclude_unset=True)
updated["rules"] = [self.rule_3]
updated_policy_entity = CreatePolicyRequest(**updated)
res = self.metadata.create_or_update(data=updated_policy_entity)
# Same ID, updated owner
self.assertEqual(res_create.id, res.id)
self.assertEqual(res.rules.__root__[0].name, self.rule_3.name)
def test_policy_get_name(self):
"""
We can fetch a Policy by name and get it back as Entity
"""
self.metadata.create_or_update(data=self.create_policy)
res = self.metadata.get_by_name(
entity=Policy, fqn=model_str(self.policy_entity.fullyQualifiedName)
)
self.assertEqual(res.name, self.policy_entity.name)
def test_policy_get_id(self):
"""
We can fetch a Policy by ID and get it back as Entity
"""
self.metadata.create_or_update(data=self.create_policy)
# First pick up by name
res_name = self.metadata.get_by_name(
entity=Policy, fqn=model_str(self.policy_entity.fullyQualifiedName)
)
# Then fetch by ID
res = self.metadata.get_by_id(entity=Policy, entity_id=model_str(res_name.id))
self.assertEqual(res_name.id, res.id)
def test_policy_list(self):
"""
We can list all our Policies
"""
self.metadata.create_or_update(data=self.create_policy)
res = self.metadata.list_entities(entity=Policy)
# Fetch our test Database. We have already inserted it, so we should find it
data = next(
iter(ent for ent in res.entities if ent.name == self.policy_entity.name),
None,
)
assert data
def test_policy_list_all(self):
"""
Validate generator utility to fetch all Policies
"""
fake_create = deepcopy(self.create_policy)
for i in range(0, 10):
fake_create.name = model_str(self.create_policy.name) + str(i)
self.metadata.create_or_update(data=fake_create)
all_entities = self.metadata.list_all_entities(
entity=Policy, limit=2 # paginate in batches of pairs
)
assert (
len(list(all_entities)) >= 10
) # In case the default testing entity is not present
def test_policy_delete(self):
"""
We can delete a Policy by ID
"""
self.metadata.create_or_update(data=self.create_policy)
# Find by name
res_name = self.metadata.get_by_name(
entity=Policy, fqn=model_str(self.policy_entity.fullyQualifiedName)
)
# Then fetch by ID
res_id = self.metadata.get_by_id(entity=Policy, entity_id=res_name.id)
# Delete
self.metadata.delete(entity=Policy, entity_id=model_str(res_id.id))
# Then we should not find it
res = self.metadata.list_entities(entity=Policy)
assert not next(
iter(
ent
for ent in res.entities
if ent.fullyQualifiedName == self.policy_entity.fullyQualifiedName
),
None,
)
def test_policy_list_versions(self):
"""
test list policy entity versions
"""
self.metadata.create_or_update(data=self.create_policy)
# Find by name
res_name = self.metadata.get_by_name(
entity=Policy, fqn=model_str(self.policy_entity.fullyQualifiedName)
)
res = self.metadata.get_list_entity_versions(
entity=Policy, entity_id=model_str(res_name.id)
)
assert res
def test_policy_get_entity_version(self):
"""
test get policy entity version
"""
self.metadata.create_or_update(data=self.create_policy)
# Find by name
res_name = self.metadata.get_by_name(
entity=Policy, fqn=model_str(self.policy_entity.fullyQualifiedName)
)
res = self.metadata.get_entity_version(
entity=Policy, entity_id=model_str(res_name.id), version=0.1
)
# check we get the correct version requested and the correct entity ID
assert res.version.__root__ == 0.1
assert res.id == res_name.id
def test_policy_get_entity_ref(self):
"""
test get EntityReference
"""
res = self.metadata.create_or_update(data=self.create_policy)
entity_ref = self.metadata.get_entity_reference(
entity=Policy, fqn=res.fullyQualifiedName
)
assert res.id == entity_ref.id
def test_policy_patch_rule(self):
"""
test PATCHing the rules of a policy
"""
policy: Policy = self.metadata.create_or_update(self.create_policy)
# Add rule
res: Policy = self.metadata.patch_policy_rule(
entity_id=policy.id,
rule=self.rule_3,
operation=PatchOperation.ADD,
)
self.assertIsNotNone(res)
self.assertEqual(len(res.rules.__root__), 3)
self.assertEqual(res.rules.__root__[2].name, self.rule_3.name)
# Remove last rule
res = self.metadata.patch_policy_rule(
entity_id=policy.id,
rule=self.rule_3,
operation=PatchOperation.REMOVE,
)
self.assertIsNotNone(res)
self.assertEqual(len(res.rules.__root__), 2)
self.assertEqual(res.rules.__root__[1].name, self.rule_2.name)
# Remove rule with fewer operations
self.metadata.patch_policy_rule(
entity_id=policy.id,
rule=self.rule_3,
operation=PatchOperation.ADD,
)
res = self.metadata.patch_policy_rule(
entity_id=policy.id,
rule=self.rule_2,
operation=PatchOperation.REMOVE,
)
self.assertIsNotNone(res)
self.assertEqual(len(res.rules.__root__), 2)
self.assertEqual(res.rules.__root__[1].name, self.rule_3.name)
self.assertEqual(
len(res.rules.__root__[1].operations), len(self.rule_3.operations)
)
self.assertIsNone(res.rules.__root__[1].description)
# Remove rule with more operations
policy = self.metadata.create_or_update(self.create_policy)
res = self.metadata.patch_policy_rule(
entity_id=policy.id,
rule=self.rule_1,
operation=PatchOperation.REMOVE,
)
self.assertIsNotNone(res)
self.assertEqual(len(res.rules.__root__), 1)
self.assertEqual(res.rules.__root__[0].name, self.rule_2.name)
self.assertEqual(
len(res.rules.__root__[0].operations), len(self.rule_2.operations)
)
self.assertEqual(
res.rules.__root__[0].fullyQualifiedName, self.rule_2.fullyQualifiedName
)
# Try to remove the only rule - Fails
res = self.metadata.patch_policy_rule(
entity_id=policy.id,
rule=self.rule_2,
operation=PatchOperation.REMOVE,
)
self.assertIsNone(res)
# Try to remove a nonexistent rule - Fails
policy = self.metadata.create_or_update(self.create_policy)
res = self.metadata.patch_policy_rule(
entity_id=policy.id,
rule=self.rule_3,
operation=PatchOperation.REMOVE,
)
self.assertIsNone(res)
# Try to patch a nonexistent policy - Fails
res = self.metadata.patch_policy_rule(
entity_id=str(uuid.uuid4()),
rule=self.rule_3,
operation=PatchOperation.ADD,
)
def test_role_create(self):
"""
We can create a Role and we receive it back as Entity
"""
res = self.metadata.create_or_update(data=self.create_role)
self.assertEqual(res.name, self.role_entity.name)
self.assertEqual(
res.policies.__root__[0].name, model_str(self.role_policy_1.name)
)
def test_role_update(self):
"""
Updating it properly changes its properties
"""
res_create = self.metadata.create_or_update(data=self.create_role)
updated = self.create_role.dict(exclude_unset=True)
updated["policies"] = [self.role_policy_2.name]
updated_entity = CreateRoleRequest(**updated)
res = self.metadata.create_or_update(data=updated_entity)
# Same ID, updated owner
self.assertEqual(res_create.id, res.id)
self.assertEqual(
res.policies.__root__[0].name, model_str(self.role_policy_2.name)
)
def test_role_get_name(self):
"""
We can fetch a Role by name and get it back as Entity
"""
self.metadata.create_or_update(data=self.create_role)
res = self.metadata.get_by_name(
entity=Role, fqn=self.role_entity.fullyQualifiedName
)
self.assertEqual(res.name, self.role_entity.name)
def test_role_get_id(self):
"""
We can fetch a Role by ID and get it back as Entity
"""
self.metadata.create_or_update(data=self.create_role)
# First pick up by name
res_name = self.metadata.get_by_name(
entity=Role, fqn=self.role_entity.fullyQualifiedName
)
# Then fetch by ID
res = self.metadata.get_by_id(entity=Role, entity_id=model_str(res_name.id))
self.assertEqual(res_name.id, res.id)
def test_role_list(self):
"""
We can list all our Roles
"""
self.metadata.create_or_update(data=self.create_role)
res = self.metadata.list_entities(entity=Role)
# Fetch our test Database. We have already inserted it, so we should find it
data = next(
iter(ent for ent in res.entities if ent.name == self.role_entity.name), None
)
assert data
def test_role_list_all(self):
"""
Validate generator utility to fetch all roles
"""
fake_create = deepcopy(self.create_role)
for i in range(0, 10):
fake_create.name = f"{model_str(self.create_role.name.__root__)}-{str(i)}"
self.metadata.create_or_update(data=fake_create)
all_entities = self.metadata.list_all_entities(
entity=Role, limit=2 # paginate in batches of pairs
)
assert (
len(list(all_entities)) >= 10
) # In case the default testing entity is not present
def test_role_delete(self):
"""
We can delete a Role by ID
"""
self.metadata.create_or_update(data=self.create_role)
# Find by name
res_name = self.metadata.get_by_name(
entity=Role, fqn=self.role_entity.fullyQualifiedName
)
# Then fetch by ID
res_id = self.metadata.get_by_id(entity=Role, entity_id=res_name.id)
# Delete
self.metadata.delete(entity=Role, entity_id=str(res_id.id.__root__))
# Then we should not find it
res = self.metadata.list_entities(entity=Role)
assert not next(
iter(
ent
for ent in res.entities
if ent.fullyQualifiedName == self.role_entity.fullyQualifiedName
),
None,
)
def test_role_list_versions(self):
"""
test list role entity versions
"""
self.metadata.create_or_update(data=self.create_role)
# Find by name
res_name = self.metadata.get_by_name(
entity=Role, fqn=self.role_entity.fullyQualifiedName
)
res = self.metadata.get_list_entity_versions(
entity=Role, entity_id=model_str(res_name.id)
)
assert res
def test_role_get_entity_version(self):
"""
test get role entity version
"""
self.metadata.create_or_update(data=self.create_role)
# Find by name
res_name = self.metadata.get_by_name(
entity=Role, fqn=self.role_entity.fullyQualifiedName
)
res = self.metadata.get_entity_version(
entity=Role, entity_id=res_name.id.__root__, version=0.1
)
# check we get the correct version requested and the correct entity ID
assert res.version.__root__ == 0.1
assert res.id == res_name.id
def test_role_get_entity_ref(self):
"""
test get EntityReference
"""
res = self.metadata.create_or_update(data=self.create_role)
entity_ref = self.metadata.get_entity_reference(
entity=Role, fqn=res.fullyQualifiedName
)
assert res.id == entity_ref.id
def test_role_add_user(self):
"""
test adding a role to a user
"""
role: Role = self.metadata.create_or_update(data=self.create_role)
user: User = self.metadata.create_or_update(
data=CreateUserRequest(
name="test-role-user",
email="test-role@user.com",
roles=[role.id],
),
)
res: Role = self.metadata.get_by_name(
entity=Role,
fqn=self.role_entity.fullyQualifiedName,
fields=ROLE_FIELDS,
)
assert res.users.__root__[0].id == user.id
self.metadata.delete(entity=User, entity_id=user.id)
def test_role_add_team(self):
"""
Test adding a role to a team
"""
role: Role = self.metadata.create_or_update(data=self.create_role)
user: User = self.metadata.create_or_update(
data=CreateUserRequest(
name="test-role-user",
email="test-role@user.com",
),
)
team: Team = self.metadata.create_or_update(
data=CreateTeamRequest(
name="test-role-team-1",
teamType="Group",
users=[user.id],
defaultRoles=[role.id],
)
)
res: Role = self.metadata.get_by_name(
entity=Role,
fqn=self.role_entity.fullyQualifiedName,
fields=ROLE_FIELDS,
)
assert res.teams.__root__[0].id == team.id
self.metadata.delete(entity=Team, entity_id=team.id)
self.metadata.delete(entity=User, entity_id=user.id)
def test_role_patch_policies(self):
"""
test PATCHing the policies of a role
"""
# Add policy to role
role: Role = self.metadata.create_or_update(data=self.create_role)
res: Role = self.metadata.patch_role_policy(
entity_id=role.id,
policy_id=self.role_policy_2.id,
)
assert res
assert res.id == role.id
assert len(res.policies.__root__) == 2
assert res.policies.__root__[1].id == self.role_policy_2.id
# Remove last policy from role
res = self.metadata.patch_role_policy(
entity_id=role.id,
policy_id=self.role_policy_2.id,
operation=PatchOperation.REMOVE,
)
assert res
assert res.id == role.id
assert len(res.policies.__root__) == 1
assert res.policies.__root__[0].id == self.role_policy_1.id
# Remove first policy from role
res: Role = self.metadata.patch_role_policy(
entity_id=role.id,
policy_id=self.role_policy_2.id,
operation=PatchOperation.ADD,
)
res = self.metadata.patch_role_policy(
entity_id=role.id,
policy_id=self.role_policy_1.id,
operation=PatchOperation.REMOVE,
)
assert res
assert res.id == role.id
assert len(res.policies.__root__) == 1
assert res.policies.__root__[0].id == self.role_policy_2.id
# Try to remove the only policy - Fail
res = self.metadata.patch_role_policy(
entity_id=role.id,
policy_id=self.role_policy_2.id,
operation=PatchOperation.REMOVE,
)
self.assertEqual(res, None)
# Nonexistent role ID - Fail
res = self.metadata.patch_role_policy(
entity_id=str(uuid.uuid4()),
policy_id=self.role_policy_1.id,
operation=PatchOperation.ADD,
)
self.assertEqual(res, None)
# Attempt to remove nonexistent policy - Fail
res: Role = self.metadata.patch_role_policy(
entity_id=role.id,
policy_id=self.role_policy_1.id,
operation=PatchOperation.ADD,
)
res = self.metadata.patch_role_policy(
entity_id=role.id,
policy_id=str(uuid.uuid4()),
operation=PatchOperation.REMOVE,
)
self.assertEqual(res, None)