From c8c0fa5e70ac276b27bb64aa47b6d78932df4a15 Mon Sep 17 00:00:00 2001 From: Mohit Yadav <105265192+mohityadav766@users.noreply.github.com> Date: Tue, 29 Apr 2025 12:04:48 +0530 Subject: [PATCH] Fixes #19774 (#21013) (cherry picked from commit 6f5a3afac5458ce729aa53dfa9a6a17f346903f4) --- .../subscription/AlertsRuleEvaluator.java | 16 +- .../decorators/MessageDecorator.java | 19 ++- .../service/util/SubscriptionUtil.java | 144 +++++++++--------- .../json/data/EventSubResourceDescriptor.json | 1 + 4 files changed, 92 insertions(+), 88 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/events/subscription/AlertsRuleEvaluator.java b/openmetadata-service/src/main/java/org/openmetadata/service/events/subscription/AlertsRuleEvaluator.java index 8db0110d157..2ff89c20912 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/events/subscription/AlertsRuleEvaluator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/events/subscription/AlertsRuleEvaluator.java @@ -6,7 +6,6 @@ import static org.openmetadata.schema.type.Function.ParameterType.ALL_INDEX_ELAS import static org.openmetadata.schema.type.Function.ParameterType.READ_FROM_PARAM_CONTEXT; import static org.openmetadata.schema.type.Function.ParameterType.READ_FROM_PARAM_CONTEXT_PER_ENTITY; import static org.openmetadata.schema.type.Function.ParameterType.SPECIFIC_INDEX_ELASTIC_SEARCH; -import static org.openmetadata.schema.type.ThreadType.Conversation; import static org.openmetadata.service.Entity.INGESTION_PIPELINE; import static org.openmetadata.service.Entity.PIPELINE; import static org.openmetadata.service.Entity.TEAM; @@ -492,6 +491,16 @@ public class AlertsRuleEvaluator { JsonUtils.pojoToJson(event.getEntity()))); } + public static Thread getThreadEntity(ChangeEvent event) { + Thread entity; + if (event.getEntity() instanceof String str) { + entity = JsonUtils.readValue(str, Thread.class); + } else { + entity = JsonUtils.convertValue(event.getEntity(), Thread.class); + } + return entity; + } + @Function( name = "matchConversationUser", input = "List of comma separated user names to matchConversationUser", @@ -514,11 +523,6 @@ public class AlertsRuleEvaluator { Thread thread = getThread(changeEvent); - if (!thread.getType().equals(Conversation)) { - // Only applies to Conversation - return false; - } - List mentions; if (thread.getPostsCount() == 0) { mentions = MessageParser.getEntityLinks(thread.getMessage()); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/decorators/MessageDecorator.java b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/decorators/MessageDecorator.java index 80b1479b251..a522b33f401 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/decorators/MessageDecorator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/decorators/MessageDecorator.java @@ -17,6 +17,7 @@ import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; import static org.openmetadata.service.events.subscription.AlertUtil.convertInputListToString; import static org.openmetadata.service.events.subscription.AlertsRuleEvaluator.getEntity; import static org.openmetadata.service.events.subscription.AlertsRuleEvaluator.getThread; +import static org.openmetadata.service.events.subscription.AlertsRuleEvaluator.getThreadEntity; import static org.openmetadata.service.formatter.entity.IngestionPipelineFormatter.getIngestionPipelineUrl; import static org.openmetadata.service.resources.feeds.MessageParser.replaceEntityLinks; @@ -176,17 +177,15 @@ public interface MessageDecorator { .filter(fqn -> !CommonUtil.nullOrEmpty(fqn)) .orElseGet( () -> { - EntityInterface entityInterface = getEntity(event); - String fqn = entityInterface.getFullyQualifiedName(); - - if (CommonUtil.nullOrEmpty(fqn)) { - EntityInterface result = - Entity.getEntity( - event.getEntityType(), entityInterface.getId(), "id", Include.NON_DELETED); - fqn = result.getFullyQualifiedName(); + if (event.getEntityType().equals(Entity.THREAD)) { + Thread thread = getThreadEntity(event); + return nullOrEmpty(thread.getEntityRef()) + ? thread.getId().toString() + : thread.getEntityRef().getFullyQualifiedName(); + } else { + EntityInterface entityInterface = getEntity(event); + return entityInterface.getFullyQualifiedName(); } - - return fqn; }); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/SubscriptionUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/SubscriptionUtil.java index bea22fb5cd0..734df43cab8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/util/SubscriptionUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/SubscriptionUtil.java @@ -13,6 +13,7 @@ package org.openmetadata.service.util; +import static org.openmetadata.common.utils.CommonUtil.listOrEmpty; import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; import static org.openmetadata.service.Entity.TEAM; import static org.openmetadata.service.Entity.THREAD; @@ -164,40 +165,49 @@ public class SubscriptionUtil { Set receiversList = new HashSet<>(); Map teams = new HashMap<>(); Map users = new HashMap<>(); + addMentionedUsersToNotifyIfRequired(users, teams, category, thread); + addAssigneesUsersToNotifyIfRequired(users, teams, category, thread); + addThreadOwnerIfRequired(users, category, thread); + // Users + receiversList.addAll(getEmailOrWebhookEndpointForUsers(users.values().stream().toList(), type)); + // Teams + receiversList.addAll(getEmailOrWebhookEndpointForTeams(teams.values().stream().toList(), type)); + return receiversList; + } + private static void addTargetFromEntityLink( + Map users, Map teams, List mentions) { Team tempTeamVar; User tempUserVar; - - if (category.equals(SubscriptionDestination.SubscriptionCategory.ASSIGNEES)) { - List assignees = thread.getTask().getAssignees(); - if (!nullOrEmpty(assignees)) { - for (EntityReference reference : assignees) { - if (Entity.USER.equals(reference.getType())) { - tempUserVar = Entity.getEntity(USER, reference.getId(), "profile", Include.NON_DELETED); - users.put(tempUserVar.getId(), tempUserVar); - } else if (TEAM.equals(reference.getType())) { - tempTeamVar = Entity.getEntity(TEAM, reference.getId(), "profile", Include.NON_DELETED); - teams.put(tempTeamVar.getId(), tempTeamVar); - } - } - } - - for (Post post : thread.getPosts()) { - tempUserVar = Entity.getEntityByName(USER, post.getFrom(), "profile", Include.NON_DELETED); + for (MessageParser.EntityLink link : mentions) { + if (USER.equals(link.getEntityType())) { + tempUserVar = Entity.getEntity(link, "profile", Include.NON_DELETED); users.put(tempUserVar.getId(), tempUserVar); - List mentions = MessageParser.getEntityLinks(post.getMessage()); - for (MessageParser.EntityLink link : mentions) { - if (USER.equals(link.getEntityType())) { - tempUserVar = Entity.getEntity(link, "profile", Include.NON_DELETED); - users.put(tempUserVar.getId(), tempUserVar); - } else if (TEAM.equals(link.getEntityType())) { - tempTeamVar = Entity.getEntity(link, "profile", Include.NON_DELETED); - teams.put(tempTeamVar.getId(), tempTeamVar); - } - } + } else if (TEAM.equals(link.getEntityType())) { + tempTeamVar = Entity.getEntity(link, "profile", Include.NON_DELETED); + teams.put(tempTeamVar.getId(), tempTeamVar); } } + } + private static void addTargetFromEntityReference( + Map users, Map teams, List references) { + Team tempTeamVar; + User tempUserVar; + for (EntityReference reference : references) { + if (Entity.USER.equals(reference.getType())) { + tempUserVar = Entity.getEntity(USER, reference.getId(), "profile", Include.NON_DELETED); + users.put(tempUserVar.getId(), tempUserVar); + } else if (TEAM.equals(reference.getType())) { + tempTeamVar = Entity.getEntity(TEAM, reference.getId(), "profile", Include.NON_DELETED); + teams.put(tempTeamVar.getId(), tempTeamVar); + } + } + } + + private static void addThreadOwnerIfRequired( + Map users, SubscriptionDestination.SubscriptionCategory category, Thread thread) { + User tempUserVar; if (category.equals(SubscriptionDestination.SubscriptionCategory.OWNERS)) { try { tempUserVar = @@ -207,14 +217,42 @@ public class SubscriptionUtil { LOG.warn("Thread created by unknown user: {}", thread.getCreatedBy()); } } + } - // Users - receiversList.addAll(getEmailOrWebhookEndpointForUsers(users.values().stream().toList(), type)); + private static void addAssigneesUsersToNotifyIfRequired( + Map users, + Map teams, + SubscriptionDestination.SubscriptionCategory category, + Thread thread) { + if (category.equals(SubscriptionDestination.SubscriptionCategory.ASSIGNEES)) { + addTargetFromEntityReference(users, teams, listOrEmpty(thread.getTask().getAssignees())); + addTargetFromEntityLink( + users, teams, listOrEmpty(MessageParser.getEntityLinks(thread.getMessage()))); + addUsersMentionedOnPosts(users, teams, thread.getPosts()); + } + } - // Teams - receiversList.addAll(getEmailOrWebhookEndpointForTeams(teams.values().stream().toList(), type)); + private static void addUsersMentionedOnPosts( + Map users, Map teams, List posts) { + User tempUserVar; + for (Post post : listOrEmpty(posts)) { + tempUserVar = Entity.getEntityByName(USER, post.getFrom(), "profile", Include.NON_DELETED); + users.put(tempUserVar.getId(), tempUserVar); + addTargetFromEntityLink( + users, teams, listOrEmpty(MessageParser.getEntityLinks(post.getMessage()))); + } + } - return receiversList; + private static void addMentionedUsersToNotifyIfRequired( + Map users, + Map teams, + SubscriptionDestination.SubscriptionCategory category, + Thread thread) { + if (category.equals(SubscriptionDestination.SubscriptionCategory.MENTIONS)) { + addTargetFromEntityLink( + users, teams, listOrEmpty(MessageParser.getEntityLinks(thread.getMessage()))); + addUsersMentionedOnPosts(users, teams, thread.getPosts()); + } } public static Set handleConversationNotification( @@ -226,46 +264,8 @@ public class SubscriptionUtil { Map teams = new HashMap<>(); Map users = new HashMap<>(); - Team tempTeamVar; - User tempUserVar; - - if (category.equals(SubscriptionDestination.SubscriptionCategory.MENTIONS)) { - List mentions = MessageParser.getEntityLinks(thread.getMessage()); - for (MessageParser.EntityLink link : mentions) { - if (USER.equals(link.getEntityType())) { - tempUserVar = Entity.getEntity(link, "profile", Include.NON_DELETED); - users.put(tempUserVar.getId(), tempUserVar); - } else if (TEAM.equals(link.getEntityType())) { - tempTeamVar = Entity.getEntity(link, "", Include.NON_DELETED); - teams.put(tempTeamVar.getId(), tempTeamVar); - } - } - - for (Post post : thread.getPosts()) { - tempUserVar = Entity.getEntityByName(USER, post.getFrom(), "profile", Include.NON_DELETED); - users.put(tempUserVar.getId(), tempUserVar); - mentions = MessageParser.getEntityLinks(post.getMessage()); - for (MessageParser.EntityLink link : mentions) { - if (USER.equals(link.getEntityType())) { - tempUserVar = Entity.getEntity(link, "profile", Include.NON_DELETED); - users.put(tempUserVar.getId(), tempUserVar); - } else if (TEAM.equals(link.getEntityType())) { - tempTeamVar = Entity.getEntity(link, "profile", Include.NON_DELETED); - teams.put(tempTeamVar.getId(), tempTeamVar); - } - } - } - } - - if (category.equals(SubscriptionDestination.SubscriptionCategory.OWNERS)) { - try { - tempUserVar = - Entity.getEntityByName(USER, thread.getCreatedBy(), "profile", Include.NON_DELETED); - users.put(tempUserVar.getId(), tempUserVar); - } catch (Exception ex) { - LOG.warn("Thread created by unknown user: {}", thread.getCreatedBy()); - } - } + addMentionedUsersToNotifyIfRequired(users, teams, category, thread); + addThreadOwnerIfRequired(users, category, thread); // Users receiversList.addAll(getEmailOrWebhookEndpointForUsers(users.values().stream().toList(), type)); diff --git a/openmetadata-service/src/main/resources/json/data/EventSubResourceDescriptor.json b/openmetadata-service/src/main/resources/json/data/EventSubResourceDescriptor.json index ac45f17c8e9..5db2f027479 100644 --- a/openmetadata-service/src/main/resources/json/data/EventSubResourceDescriptor.json +++ b/openmetadata-service/src/main/resources/json/data/EventSubResourceDescriptor.json @@ -261,6 +261,7 @@ { "name" : "task", "supportedFilters" : [ + "filterByMentionedName" ] }, {