(cherry picked from commit 6f5a3afac5458ce729aa53dfa9a6a17f346903f4)
This commit is contained in:
Mohit Yadav 2025-04-29 12:04:48 +05:30 committed by mohitdeuex
parent a7597cac8d
commit c8c0fa5e70
4 changed files with 92 additions and 88 deletions

View File

@ -6,7 +6,6 @@ import static org.openmetadata.schema.type.Function.ParameterType.ALL_INDEX_ELAS
import static org.openmetadata.schema.type.Function.ParameterType.READ_FROM_PARAM_CONTEXT;
import static org.openmetadata.schema.type.Function.ParameterType.READ_FROM_PARAM_CONTEXT_PER_ENTITY;
import static org.openmetadata.schema.type.Function.ParameterType.SPECIFIC_INDEX_ELASTIC_SEARCH;
import static org.openmetadata.schema.type.ThreadType.Conversation;
import static org.openmetadata.service.Entity.INGESTION_PIPELINE;
import static org.openmetadata.service.Entity.PIPELINE;
import static org.openmetadata.service.Entity.TEAM;
@ -492,6 +491,16 @@ public class AlertsRuleEvaluator {
JsonUtils.pojoToJson(event.getEntity())));
}
public static Thread getThreadEntity(ChangeEvent event) {
Thread entity;
if (event.getEntity() instanceof String str) {
entity = JsonUtils.readValue(str, Thread.class);
} else {
entity = JsonUtils.convertValue(event.getEntity(), Thread.class);
}
return entity;
}
@Function(
name = "matchConversationUser",
input = "List of comma separated user names to matchConversationUser",
@ -514,11 +523,6 @@ public class AlertsRuleEvaluator {
Thread thread = getThread(changeEvent);
if (!thread.getType().equals(Conversation)) {
// Only applies to Conversation
return false;
}
List<MessageParser.EntityLink> mentions;
if (thread.getPostsCount() == 0) {
mentions = MessageParser.getEntityLinks(thread.getMessage());

View File

@ -17,6 +17,7 @@ import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.service.events.subscription.AlertUtil.convertInputListToString;
import static org.openmetadata.service.events.subscription.AlertsRuleEvaluator.getEntity;
import static org.openmetadata.service.events.subscription.AlertsRuleEvaluator.getThread;
import static org.openmetadata.service.events.subscription.AlertsRuleEvaluator.getThreadEntity;
import static org.openmetadata.service.formatter.entity.IngestionPipelineFormatter.getIngestionPipelineUrl;
import static org.openmetadata.service.resources.feeds.MessageParser.replaceEntityLinks;
@ -176,17 +177,15 @@ public interface MessageDecorator<T> {
.filter(fqn -> !CommonUtil.nullOrEmpty(fqn))
.orElseGet(
() -> {
EntityInterface entityInterface = getEntity(event);
String fqn = entityInterface.getFullyQualifiedName();
if (CommonUtil.nullOrEmpty(fqn)) {
EntityInterface result =
Entity.getEntity(
event.getEntityType(), entityInterface.getId(), "id", Include.NON_DELETED);
fqn = result.getFullyQualifiedName();
if (event.getEntityType().equals(Entity.THREAD)) {
Thread thread = getThreadEntity(event);
return nullOrEmpty(thread.getEntityRef())
? thread.getId().toString()
: thread.getEntityRef().getFullyQualifiedName();
} else {
EntityInterface entityInterface = getEntity(event);
return entityInterface.getFullyQualifiedName();
}
return fqn;
});
}

View File

@ -13,6 +13,7 @@
package org.openmetadata.service.util;
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.service.Entity.TEAM;
import static org.openmetadata.service.Entity.THREAD;
@ -164,40 +165,49 @@ public class SubscriptionUtil {
Set<String> receiversList = new HashSet<>();
Map<UUID, Team> teams = new HashMap<>();
Map<UUID, User> users = new HashMap<>();
addMentionedUsersToNotifyIfRequired(users, teams, category, thread);
addAssigneesUsersToNotifyIfRequired(users, teams, category, thread);
addThreadOwnerIfRequired(users, category, thread);
// Users
receiversList.addAll(getEmailOrWebhookEndpointForUsers(users.values().stream().toList(), type));
// Teams
receiversList.addAll(getEmailOrWebhookEndpointForTeams(teams.values().stream().toList(), type));
return receiversList;
}
private static void addTargetFromEntityLink(
Map<UUID, User> users, Map<UUID, Team> teams, List<MessageParser.EntityLink> mentions) {
Team tempTeamVar;
User tempUserVar;
if (category.equals(SubscriptionDestination.SubscriptionCategory.ASSIGNEES)) {
List<EntityReference> assignees = thread.getTask().getAssignees();
if (!nullOrEmpty(assignees)) {
for (EntityReference reference : assignees) {
if (Entity.USER.equals(reference.getType())) {
tempUserVar = Entity.getEntity(USER, reference.getId(), "profile", Include.NON_DELETED);
users.put(tempUserVar.getId(), tempUserVar);
} else if (TEAM.equals(reference.getType())) {
tempTeamVar = Entity.getEntity(TEAM, reference.getId(), "profile", Include.NON_DELETED);
teams.put(tempTeamVar.getId(), tempTeamVar);
}
}
}
for (Post post : thread.getPosts()) {
tempUserVar = Entity.getEntityByName(USER, post.getFrom(), "profile", Include.NON_DELETED);
for (MessageParser.EntityLink link : mentions) {
if (USER.equals(link.getEntityType())) {
tempUserVar = Entity.getEntity(link, "profile", Include.NON_DELETED);
users.put(tempUserVar.getId(), tempUserVar);
List<MessageParser.EntityLink> mentions = MessageParser.getEntityLinks(post.getMessage());
for (MessageParser.EntityLink link : mentions) {
if (USER.equals(link.getEntityType())) {
tempUserVar = Entity.getEntity(link, "profile", Include.NON_DELETED);
users.put(tempUserVar.getId(), tempUserVar);
} else if (TEAM.equals(link.getEntityType())) {
tempTeamVar = Entity.getEntity(link, "profile", Include.NON_DELETED);
teams.put(tempTeamVar.getId(), tempTeamVar);
}
}
} else if (TEAM.equals(link.getEntityType())) {
tempTeamVar = Entity.getEntity(link, "profile", Include.NON_DELETED);
teams.put(tempTeamVar.getId(), tempTeamVar);
}
}
}
private static void addTargetFromEntityReference(
Map<UUID, User> users, Map<UUID, Team> teams, List<EntityReference> references) {
Team tempTeamVar;
User tempUserVar;
for (EntityReference reference : references) {
if (Entity.USER.equals(reference.getType())) {
tempUserVar = Entity.getEntity(USER, reference.getId(), "profile", Include.NON_DELETED);
users.put(tempUserVar.getId(), tempUserVar);
} else if (TEAM.equals(reference.getType())) {
tempTeamVar = Entity.getEntity(TEAM, reference.getId(), "profile", Include.NON_DELETED);
teams.put(tempTeamVar.getId(), tempTeamVar);
}
}
}
private static void addThreadOwnerIfRequired(
Map<UUID, User> users, SubscriptionDestination.SubscriptionCategory category, Thread thread) {
User tempUserVar;
if (category.equals(SubscriptionDestination.SubscriptionCategory.OWNERS)) {
try {
tempUserVar =
@ -207,14 +217,42 @@ public class SubscriptionUtil {
LOG.warn("Thread created by unknown user: {}", thread.getCreatedBy());
}
}
}
// Users
receiversList.addAll(getEmailOrWebhookEndpointForUsers(users.values().stream().toList(), type));
private static void addAssigneesUsersToNotifyIfRequired(
Map<UUID, User> users,
Map<UUID, Team> teams,
SubscriptionDestination.SubscriptionCategory category,
Thread thread) {
if (category.equals(SubscriptionDestination.SubscriptionCategory.ASSIGNEES)) {
addTargetFromEntityReference(users, teams, listOrEmpty(thread.getTask().getAssignees()));
addTargetFromEntityLink(
users, teams, listOrEmpty(MessageParser.getEntityLinks(thread.getMessage())));
addUsersMentionedOnPosts(users, teams, thread.getPosts());
}
}
// Teams
receiversList.addAll(getEmailOrWebhookEndpointForTeams(teams.values().stream().toList(), type));
private static void addUsersMentionedOnPosts(
Map<UUID, User> users, Map<UUID, Team> teams, List<Post> posts) {
User tempUserVar;
for (Post post : listOrEmpty(posts)) {
tempUserVar = Entity.getEntityByName(USER, post.getFrom(), "profile", Include.NON_DELETED);
users.put(tempUserVar.getId(), tempUserVar);
addTargetFromEntityLink(
users, teams, listOrEmpty(MessageParser.getEntityLinks(post.getMessage())));
}
}
return receiversList;
private static void addMentionedUsersToNotifyIfRequired(
Map<UUID, User> users,
Map<UUID, Team> teams,
SubscriptionDestination.SubscriptionCategory category,
Thread thread) {
if (category.equals(SubscriptionDestination.SubscriptionCategory.MENTIONS)) {
addTargetFromEntityLink(
users, teams, listOrEmpty(MessageParser.getEntityLinks(thread.getMessage())));
addUsersMentionedOnPosts(users, teams, thread.getPosts());
}
}
public static Set<String> handleConversationNotification(
@ -226,46 +264,8 @@ public class SubscriptionUtil {
Map<UUID, Team> teams = new HashMap<>();
Map<UUID, User> users = new HashMap<>();
Team tempTeamVar;
User tempUserVar;
if (category.equals(SubscriptionDestination.SubscriptionCategory.MENTIONS)) {
List<MessageParser.EntityLink> mentions = MessageParser.getEntityLinks(thread.getMessage());
for (MessageParser.EntityLink link : mentions) {
if (USER.equals(link.getEntityType())) {
tempUserVar = Entity.getEntity(link, "profile", Include.NON_DELETED);
users.put(tempUserVar.getId(), tempUserVar);
} else if (TEAM.equals(link.getEntityType())) {
tempTeamVar = Entity.getEntity(link, "", Include.NON_DELETED);
teams.put(tempTeamVar.getId(), tempTeamVar);
}
}
for (Post post : thread.getPosts()) {
tempUserVar = Entity.getEntityByName(USER, post.getFrom(), "profile", Include.NON_DELETED);
users.put(tempUserVar.getId(), tempUserVar);
mentions = MessageParser.getEntityLinks(post.getMessage());
for (MessageParser.EntityLink link : mentions) {
if (USER.equals(link.getEntityType())) {
tempUserVar = Entity.getEntity(link, "profile", Include.NON_DELETED);
users.put(tempUserVar.getId(), tempUserVar);
} else if (TEAM.equals(link.getEntityType())) {
tempTeamVar = Entity.getEntity(link, "profile", Include.NON_DELETED);
teams.put(tempTeamVar.getId(), tempTeamVar);
}
}
}
}
if (category.equals(SubscriptionDestination.SubscriptionCategory.OWNERS)) {
try {
tempUserVar =
Entity.getEntityByName(USER, thread.getCreatedBy(), "profile", Include.NON_DELETED);
users.put(tempUserVar.getId(), tempUserVar);
} catch (Exception ex) {
LOG.warn("Thread created by unknown user: {}", thread.getCreatedBy());
}
}
addMentionedUsersToNotifyIfRequired(users, teams, category, thread);
addThreadOwnerIfRequired(users, category, thread);
// Users
receiversList.addAll(getEmailOrWebhookEndpointForUsers(users.values().stream().toList(), type));

View File

@ -261,6 +261,7 @@
{
"name" : "task",
"supportedFilters" : [
"filterByMentionedName"
]
},
{