GEN-1234 - Clean up suggestions when a user is deleted (#17988)

* GEN-1234 - Clean up suggestions when a user is deleted

* add method

* add method

* fix postgres query
This commit is contained in:
Pere Miquel Brull 2024-09-26 16:22:36 +02:00
parent 3dbfe73aa9
commit bba91b9569
9 changed files with 245 additions and 10 deletions

View File

@ -11,12 +11,14 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata
OM_JWT = "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
def int_admin_ometa(url: str = "http://localhost:8585/api") -> OpenMetadata:
def int_admin_ometa(
url: str = "http://localhost:8585/api", jwt: str = OM_JWT
) -> OpenMetadata:
"""Initialize the ometa connection with default admin:admin creds"""
server_config = OpenMetadataConnection(
hostPort=url,
authProvider=AuthProvider.openmetadata,
securityConfig=OpenMetadataJWTClientConfig(jwtToken=CustomSecretStr(OM_JWT)),
securityConfig=OpenMetadataJWTClientConfig(jwtToken=CustomSecretStr(jwt)),
)
metadata = OpenMetadata(server_config)
assert metadata.health_check()

View File

@ -13,8 +13,13 @@ Mixin class containing Suggestions specific methods
To be used by OpenMetadata class
"""
from metadata.generated.schema.entity.feed.suggestion import Suggestion
from typing import Union
from metadata.generated.schema.entity.feed.suggestion import Suggestion, SuggestionType
from metadata.generated.schema.type import basic
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
from metadata.ingestion.ometa.client import REST
from metadata.ingestion.ometa.utils import model_str
from metadata.utils.logger import ometa_logger
logger = ometa_logger()
@ -30,12 +35,50 @@ class OMetaSuggestionsMixin:
client: REST
def update_suggestion(self, suggestion: Suggestion) -> Suggestion:
"""
Update an existing Suggestion with new fields
"""
"""Update an existing Suggestion with new fields"""
resp = self.client.put(
f"{self.get_suffix(Suggestion)}/{str(suggestion.root.id.root)}",
data=suggestion.model_dump_json(),
)
return Suggestion(**resp)
def accept_suggestion(self, suggestion_id: Union[str, basic.Uuid]) -> None:
"""Accept a given suggestion"""
self.client.put(
f"{self.get_suffix(Suggestion)}/{model_str(suggestion_id)}/accept",
)
def reject_suggestion(self, suggestion_id: Union[str, basic.Uuid]) -> None:
"""Reject a given suggestion"""
self.client.put(
f"{self.get_suffix(Suggestion)}/{model_str(suggestion_id)}/reject",
)
def accept_all_suggestions(
self,
fqn: Union[str, FullyQualifiedEntityName],
user_id: Union[str, basic.Uuid],
suggestion_type: SuggestionType = SuggestionType.SuggestDescription,
) -> None:
"""Accept all suggestions"""
self.client.put(
f"{self.get_suffix(Suggestion)}/accept-all?"
f"userId={model_str(user_id)}&"
f"entityFQN={model_str(fqn)}&"
f"suggestionType={suggestion_type.value}",
)
def reject_all_suggestions(
self,
fqn: Union[str, FullyQualifiedEntityName],
user_id: Union[str, basic.Uuid],
suggestion_type: SuggestionType = SuggestionType.SuggestDescription,
) -> None:
"""Accept all suggestions"""
self.client.put(
f"{self.get_suffix(Suggestion)}/reject-all?"
f"userId={model_str(user_id)}&"
f"entityFQN={model_str(fqn)}&"
f"suggestionType={suggestion_type.value}",
)

View File

@ -19,6 +19,7 @@ from typing import Dict, Generic, Iterable, List, Optional, Type, TypeVar, Union
from pydantic import BaseModel
from metadata.generated.schema.api.createBot import CreateBot
from metadata.generated.schema.api.services.ingestionPipelines.createIngestionPipeline import (
CreateIngestionPipelineRequest,
)
@ -172,13 +173,16 @@ class OpenMetadata(
return route
def get_module_path(self, entity: Type[T]) -> str:
def get_module_path(self, entity: Type[T]) -> Optional[str]:
"""
Based on the entity, return the module path
it is found inside generated
"""
if issubclass(entity, CreateIngestionPipelineRequest):
return "services.ingestionPipelines"
if issubclass(entity, CreateBot):
# Bots schemas don't live inside any subdirectory
return None
return entity.__module__.split(".")[-2]
def get_create_entity_type(self, entity: Type[T]) -> Type[C]:

View File

@ -21,6 +21,7 @@ from metadata.generated.schema.api.classification.createClassification import (
CreateClassificationRequest,
)
from metadata.generated.schema.api.classification.createTag import CreateTagRequest
from metadata.generated.schema.api.createBot import CreateBot
from metadata.generated.schema.api.data.createAPICollection import (
CreateAPICollectionRequest,
)
@ -211,7 +212,8 @@ ROUTES = {
User.__name__: "/users",
CreateUserRequest.__name__: "/users",
AuthenticationMechanism.__name__: "/users/auth-mechanism",
Bot.__name__: "/bots", # We won't allow bot creation from the client
Bot.__name__: "/bots",
CreateBot.__name__: "/bots",
# Roles
Role.__name__: "/roles",
CreateRoleRequest.__name__: "/roles",

View File

@ -14,14 +14,20 @@ OpenMetadata high-level API Suggestion test
"""
from unittest import TestCase
import pytest
from _openmetadata_testutils.ometa import int_admin_ometa
from metadata.generated.schema.api.createBot import CreateBot
from metadata.generated.schema.api.feed.createSuggestion import CreateSuggestionRequest
from metadata.generated.schema.api.teams.createUser import CreateUserRequest
from metadata.generated.schema.auth.jwtAuth import JWTAuthMechanism, JWTTokenExpiry
from metadata.generated.schema.entity.bot import Bot
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.feed.suggestion import Suggestion, SuggestionType
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.entity.teams.user import AuthenticationMechanism, User
from metadata.generated.schema.type.basic import EntityLink
from metadata.generated.schema.type.tagLabel import (
LabelType,
@ -30,11 +36,40 @@ from metadata.generated.schema.type.tagLabel import (
TagLabel,
TagSource,
)
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.clickhouse.utils import Tuple
from metadata.utils.entity_link import get_entity_link
from ..integration_base import generate_name, get_create_entity, get_create_service
def _create_bot(metadata: OpenMetadata) -> Tuple[User, Bot]:
"""Create a bot"""
bot_name = generate_name()
user: User = metadata.create_or_update(
data=CreateUserRequest(
name=bot_name,
email=f"{bot_name.root}@user.com",
isBot=True,
authenticationMechanism=AuthenticationMechanism(
authType="JWT",
config=JWTAuthMechanism(
JWTTokenExpiry=JWTTokenExpiry.Unlimited,
),
),
)
)
bot: Bot = metadata.create_or_update(
data=CreateBot(
name=bot_name,
botUser=bot_name.root,
)
)
return user, bot
class OMetaSuggestionTest(TestCase):
"""
Run this integration test with the local API available
@ -109,6 +144,138 @@ class OMetaSuggestionTest(TestCase):
# Suggestions only support POST (not PUT)
self.metadata.create(suggestion_request)
def test_accept_reject_suggestion(self):
"""We can create and accept a suggestion"""
suggestion_request = CreateSuggestionRequest(
description="i won't be accepted",
type=SuggestionType.SuggestDescription,
entityLink=EntityLink(
root=get_entity_link(Table, fqn=self.table.fullyQualifiedName.root)
),
)
self.metadata.patch_description(
entity=Table,
source=self.metadata.get_by_name(
entity=Table, fqn=self.table.fullyQualifiedName.root
),
description="I come from a patch",
)
# Suggestions only support POST (not PUT)
suggestion = self.metadata.create(suggestion_request)
# We can reject a suggestion
self.metadata.reject_suggestion(suggestion.root.id)
updated_table: Table = self.metadata.get_by_name(
entity=Table, fqn=self.table.fullyQualifiedName.root
)
assert updated_table.description.root == "I come from a patch"
# We create a new suggestion and accept it this time
suggestion_request = CreateSuggestionRequest(
description="something new",
type=SuggestionType.SuggestDescription,
entityLink=EntityLink(
root=get_entity_link(Table, fqn=self.table.fullyQualifiedName.root)
),
)
# Suggestions only support POST (not PUT)
suggestion = self.metadata.create(suggestion_request)
# We can accept a suggestion
self.metadata.accept_suggestion(suggestion.root.id)
updated_table: Table = self.metadata.get_by_name(
entity=Table, fqn=self.table.fullyQualifiedName.root
)
assert updated_table.description.root == "something new"
def test_accept_suggest_delete_user(self):
"""We can accept the suggestion of a deleted user"""
user, bot = _create_bot(self.metadata)
bot_metadata = int_admin_ometa(
jwt=user.authenticationMechanism.config.JWTToken.get_secret_value()
)
# We create a new suggestion and accept it this time
suggestion_request = CreateSuggestionRequest(
description="something new",
type=SuggestionType.SuggestDescription,
entityLink=EntityLink(
root=get_entity_link(Table, fqn=self.table.fullyQualifiedName.root)
),
)
# Suggestions only support POST (not PUT)
suggestion = bot_metadata.create(suggestion_request)
assert suggestion
# Delete the bot
self.metadata.delete(
entity=Bot,
entity_id=bot.id,
recursive=True,
hard_delete=True,
)
# We won't find the suggestion
with pytest.raises(APIError) as exc:
self.metadata.accept_suggestion(suggestion.root.id)
assert (
str(exc.value)
== f"Suggestion instance for {suggestion.root.id.root} not found"
)
def test_accept_all_delete_user(self):
"""We can accept all suggestions of a deleted user"""
user, bot = _create_bot(self.metadata)
bot_metadata = int_admin_ometa(
jwt=user.authenticationMechanism.config.JWTToken.get_secret_value()
)
self.metadata.patch_description(
entity=Table,
source=self.metadata.get_by_name(
entity=Table, fqn=self.table.fullyQualifiedName.root
),
description="I come from a patch",
)
# We create a new suggestion and accept it this time
suggestion_request = CreateSuggestionRequest(
description="something new from test_accept_all_delete_user",
type=SuggestionType.SuggestDescription,
entityLink=EntityLink(
root=get_entity_link(Table, fqn=self.table.fullyQualifiedName.root)
),
)
# Suggestions only support POST (not PUT)
suggestion = bot_metadata.create(suggestion_request)
assert suggestion
# Delete the bot
self.metadata.delete(
entity=Bot,
entity_id=bot.id,
recursive=True,
hard_delete=True,
)
# This will do nothing, since there's no suggestions there
self.metadata.accept_all_suggestions(
fqn=self.table.fullyQualifiedName.root,
user_id=user.id,
suggestion_type=SuggestionType.SuggestDescription,
)
updated_table: Table = self.metadata.get_by_name(
entity=Table, fqn=self.table.fullyQualifiedName.root
)
assert updated_table.description.root == "I come from a patch"
def test_create_tag_suggestion(self):
"""We can create a suggestion"""
suggestion_request = CreateSuggestionRequest(

View File

@ -5052,6 +5052,15 @@ public interface CollectionDAO {
@SqlUpdate("DELETE FROM suggestions WHERE fqnHash = :fqnHash")
void deleteByFQN(@BindUUID("fqnHash") String fullyQualifiedName);
@ConnectionAwareSqlUpdate(
value =
"DELETE FROM suggestions suggestions WHERE JSON_EXTRACT(json, '$.createdBy.id') = :createdBy",
connectionType = MYSQL)
@ConnectionAwareSqlUpdate(
value = "DELETE FROM suggestions suggestions WHERE json #>> '{createdBy,id}' = :createdBy",
connectionType = POSTGRES)
void deleteByCreatedBy(@BindUUID("createdBy") UUID id);
@SqlQuery("SELECT json FROM suggestions <condition> ORDER BY updatedAt DESC LIMIT :limit")
List<String> list(@Bind("limit") int limit, @Define("condition") String condition);

View File

@ -394,6 +394,9 @@ public class SuggestionRepository {
List<Suggestion> suggestions = getSuggestionList(jsons);
String beforeCursor = null;
String afterCursor;
if (nullOrEmpty(suggestions)) {
return new ResultList<>(suggestions, null, null, total);
}
if (suggestions.size() > limit) {
suggestions.remove(0);
beforeCursor = suggestions.get(0).getUpdatedAt().toString();
@ -415,6 +418,9 @@ public class SuggestionRepository {
List<Suggestion> suggestions = getSuggestionList(jsons);
String beforeCursor;
String afterCursor = null;
if (nullOrEmpty(suggestions)) {
return new ResultList<>(suggestions, null, null, total);
}
beforeCursor = after == null ? null : suggestions.get(0).getUpdatedAt().toString();
if (suggestions.size() > limit) {
suggestions.remove(limit);

View File

@ -611,6 +611,8 @@ public class UserRepository extends EntityRepository<User> {
if (Boolean.TRUE.equals(entity.getIsBot())) {
BotTokenCache.invalidateToken(entity.getName());
}
// Remove suggestions
daoCollection.suggestionDAO().deleteByCreatedBy(entity.getId());
}
/** Handles entity updated from PUT and POST operation. */

View File

@ -53,5 +53,5 @@
}
},
"additionalProperties": false,
"required": ["JWTToken", "JWTTokenExpiry"]
"required": ["JWTTokenExpiry"]
}