diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java b/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java index e45f1c22487..66ecb0d7ea1 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java @@ -174,6 +174,8 @@ public class OpenMetadataApplication extends Application new GChatPublisher(config); case WEBHOOK -> new GenericPublisher(config); case EMAIL -> new EmailPublisher(config); - case ACTIVITY_FEED -> throw new IllegalArgumentException( - "Cannot create Activity Feed as Publisher."); + case ACTIVITY_FEED -> new ActivityFeedPublisher(config); }; } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/feed/ActivityFeedPublisher.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/feed/ActivityFeedPublisher.java new file mode 100644 index 00000000000..a499e482e6e --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/feed/ActivityFeedPublisher.java @@ -0,0 +1,106 @@ +/* + * Copyright 2021 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.service.apps.bundles.changeEvent.feed; + +import static org.openmetadata.common.utils.CommonUtil.listOrEmpty; +import static org.openmetadata.schema.entity.events.SubscriptionDestination.SubscriptionType.ACTIVITY_FEED; +import static org.openmetadata.schema.type.EventType.ENTITY_DELETED; +import static org.openmetadata.service.events.subscription.AlertsRuleEvaluator.getEntity; + +import java.util.List; +import java.util.UUID; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.Pair; +import org.openmetadata.schema.EntityInterface; +import org.openmetadata.schema.entity.events.SubscriptionDestination; +import org.openmetadata.schema.entity.feed.Thread; +import org.openmetadata.schema.type.ChangeEvent; +import org.openmetadata.service.Entity; +import org.openmetadata.service.apps.bundles.changeEvent.Destination; +import org.openmetadata.service.events.errors.EventPublisherException; +import org.openmetadata.service.exception.CatalogExceptionMessage; +import org.openmetadata.service.formatter.decorators.FeedMessageDecorator; +import org.openmetadata.service.jdbi3.CollectionDAO; +import org.openmetadata.service.jdbi3.FeedRepository; +import org.openmetadata.service.socket.WebSocketManager; +import org.openmetadata.service.util.FeedUtils; +import org.openmetadata.service.util.JsonUtils; + +@Slf4j +public class ActivityFeedPublisher implements Destination { + private final FeedMessageDecorator feedMessageFormatter = new FeedMessageDecorator(); + FeedRepository feedRepository = new FeedRepository(); + + @Getter private final SubscriptionDestination subscriptionDestination; + + public ActivityFeedPublisher(SubscriptionDestination subscription) { + if (subscription.getType() == ACTIVITY_FEED) { + this.subscriptionDestination = subscription; + } else { + throw new IllegalArgumentException("Activity Alert Invoked with Illegal Type and Settings."); + } + } + + @Override + public void sendMessage(ChangeEvent changeEvent) throws EventPublisherException { + try { + // Thread are created in FeedRepository Directly + if (!changeEvent.getEntityType().equals(Entity.THREAD)) { + for (Thread thread : + listOrEmpty(FeedUtils.getThreadWithMessage(feedMessageFormatter, changeEvent))) { + // Don't create a thread if there is no message + if (thread.getMessage() != null && !thread.getMessage().isEmpty()) { + feedRepository.create(thread, changeEvent); + WebSocketManager.getInstance() + .broadCastMessageToAll( + WebSocketManager.FEED_BROADCAST_CHANNEL, JsonUtils.pojoToJson(thread)); + // Delete all conversations related to the entity + if (changeEvent.getEventType().equals(ENTITY_DELETED)) { + deleteAllConversationsRelatedToEntity( + getEntity(changeEvent), Entity.getCollectionDAO()); + } + } + } + } + } catch (Exception ex) { + String message = + CatalogExceptionMessage.eventPublisherFailedToPublish( + ACTIVITY_FEED, changeEvent, ex.getMessage()); + LOG.error(message); + throw new EventPublisherException( + message, Pair.of(subscriptionDestination.getId(), changeEvent)); + } + } + + private void deleteAllConversationsRelatedToEntity( + EntityInterface entityInterface, CollectionDAO collectionDAO) { + String entityId = entityInterface.getId().toString(); + List threadIds = collectionDAO.feedDAO().findByEntityId(entityId); + for (String threadId : threadIds) { + UUID id = UUID.fromString(threadId); + collectionDAO.relationshipDAO().deleteAll(id, Entity.THREAD); + collectionDAO.feedDAO().delete(id); + } + } + + @Override + public boolean getEnabled() { + return subscriptionDestination.getEnabled(); + } + + public void close() { + LOG.info("Closing Activity Feed Publisher"); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/events/ChangeEventHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/events/ChangeEventHandler.java index e27e30e2e3c..287edcbe47d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/events/ChangeEventHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/events/ChangeEventHandler.java @@ -13,58 +13,45 @@ package org.openmetadata.service.events; -import static org.openmetadata.common.utils.CommonUtil.listOrEmpty; -import static org.openmetadata.schema.type.EventType.ENTITY_DELETED; -import static org.openmetadata.service.events.subscription.AlertsRuleEvaluator.getEntity; import static org.openmetadata.service.formatter.util.FormatterUtil.getChangeEventFromResponseContext; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.util.List; import java.util.Optional; -import java.util.UUID; import javax.ws.rs.container.ContainerRequestContext; import javax.ws.rs.container.ContainerResponseContext; import javax.ws.rs.core.SecurityContext; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.openmetadata.schema.EntityInterface; -import org.openmetadata.schema.entity.feed.Thread; import org.openmetadata.schema.type.ChangeEvent; import org.openmetadata.service.Entity; import org.openmetadata.service.OpenMetadataApplicationConfig; -import org.openmetadata.service.events.subscription.AlertUtil; -import org.openmetadata.service.formatter.decorators.FeedMessageDecorator; -import org.openmetadata.service.jdbi3.CollectionDAO; -import org.openmetadata.service.jdbi3.FeedRepository; -import org.openmetadata.service.socket.WebSocketManager; -import org.openmetadata.service.util.FeedUtils; import org.openmetadata.service.util.JsonUtils; import org.openmetadata.service.util.WebsocketNotificationHandler; @Slf4j public class ChangeEventHandler implements EventHandler { - private ObjectMapper mapper; - private FeedMessageDecorator feedMessageDecorator = new FeedMessageDecorator(); - private final FeedRepository feedRepository = new FeedRepository(); private final WebsocketNotificationHandler websocketNotificationHandler = new WebsocketNotificationHandler(); - public void init(OpenMetadataApplicationConfig config) { - this.mapper = new ObjectMapper(); - } + public void init(OpenMetadataApplicationConfig config) {} @SneakyThrows public Void process( ContainerRequestContext requestContext, ContainerResponseContext responseContext) { + // GET operations don't produce change events , Response has no entity to produce change event + // from + if (requestContext.getMethod().equals("GET") || responseContext.getEntity() == null) { + return null; + } + + // Send to Notification Handler websocketNotificationHandler.processNotifications(responseContext); - String method = requestContext.getMethod(); + + // Send to Change Event Table SecurityContext securityContext = requestContext.getSecurityContext(); String loggedInUserName = securityContext.getUserPrincipal().getName(); try { - CollectionDAO collectionDAO = Entity.getCollectionDAO(); - CollectionDAO.ChangeEventDAO changeEventDAO = collectionDAO.changeEventDAO(); Optional optionalChangeEvent = - getChangeEventFromResponseContext(responseContext, loggedInUserName, method); + getChangeEventFromResponseContext(responseContext, loggedInUserName); if (optionalChangeEvent.isPresent()) { ChangeEvent changeEvent = optionalChangeEvent.get(); if (changeEvent.getEntityType().equals(Entity.QUERY)) { @@ -84,32 +71,14 @@ public class ChangeEventHandler implements EventHandler { changeEvent.setEntity(JsonUtils.pojoToMaskedJson(entity)); } - changeEventDAO.insert(JsonUtils.pojoToJson(changeEvent)); - - // Add a new thread (change event itself should not be for the thread) to the entity for - // every change event - // for the event to appear in activity feeds - if (!changeEvent.getEntityType().equals(Entity.THREAD) - && (AlertUtil.shouldProcessActivityFeedRequest(changeEvent))) { - for (Thread thread : - listOrEmpty( - FeedUtils.getThreadWithMessage( - feedMessageDecorator, changeEvent, loggedInUserName))) { - // Don't create a thread if there is no message - if (thread.getMessage() != null && !thread.getMessage().isEmpty()) { - feedRepository.create(thread, changeEvent); - String jsonThread = mapper.writeValueAsString(thread); - WebSocketManager.getInstance() - .broadCastMessageToAll(WebSocketManager.FEED_BROADCAST_CHANNEL, jsonThread); - if (changeEvent.getEventType().equals(ENTITY_DELETED)) { - deleteAllConversationsRelatedToEntity(getEntity(changeEvent), collectionDAO); - } - } - } - } + // Thread are created in FeedRepository Directly + Entity.getCollectionDAO().changeEventDAO().insert(JsonUtils.pojoToJson(changeEvent)); } } catch (Exception e) { - LOG.error("Failed to capture the change event for method {} due to ", method, e); + LOG.error( + "Failed to capture the change event for method {} due to ", + requestContext.getMethod(), + e); } return null; } @@ -126,17 +95,6 @@ public class ChangeEventHandler implements EventHandler { .withCurrentVersion(changeEvent.getCurrentVersion()); } - private void deleteAllConversationsRelatedToEntity( - EntityInterface entityInterface, CollectionDAO collectionDAO) { - String entityId = entityInterface.getId().toString(); - List threadIds = collectionDAO.feedDAO().findByEntityId(entityId); - for (String threadId : threadIds) { - UUID id = UUID.fromString(threadId); - collectionDAO.relationshipDAO().deleteAll(id, Entity.THREAD); - collectionDAO.feedDAO().delete(id); - } - } - public void close() { /* Nothing to do */ } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/EventSubscriptionScheduler.java b/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/EventSubscriptionScheduler.java index 78257ddca41..3d34bcd3393 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/EventSubscriptionScheduler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/EventSubscriptionScheduler.java @@ -23,7 +23,6 @@ import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.jdbi.v3.sqlobject.transaction.Transaction; import org.openmetadata.schema.EntityInterface; -import org.openmetadata.schema.api.events.CreateEventSubscription; import org.openmetadata.schema.entity.events.EventSubscription; import org.openmetadata.schema.entity.events.EventSubscriptionOffset; import org.openmetadata.schema.entity.events.SubscriptionDestination; @@ -47,7 +46,6 @@ import org.quartz.impl.StdSchedulerFactory; public class EventSubscriptionScheduler { public static final String ALERT_JOB_GROUP = "OMAlertJobGroup"; public static final String ALERT_TRIGGER_GROUP = "OMAlertJobGroup"; - private static final String INVALID_ALERT = "Invalid Alert Type"; private static EventSubscriptionScheduler instance; private static volatile boolean initialized = false; private final Scheduler alertsScheduler = new StdSchedulerFactory().getScheduler(); @@ -76,10 +74,6 @@ public class EventSubscriptionScheduler { @Transaction public void addSubscriptionPublisher(EventSubscription eventSubscription) throws SchedulerException { - if (eventSubscription.getAlertType().equals(CreateEventSubscription.AlertType.ACTIVITY_FEED)) { - throw new IllegalArgumentException("Activity Feed is not a valid Alert Type"); - } - AlertPublisher alertPublisher = new AlertPublisher(); if (Boolean.FALSE.equals( eventSubscription.getEnabled())) { // Only add webhook that is enabled for publishing events @@ -147,11 +141,7 @@ public class EventSubscriptionScheduler { public void updateEventSubscription(EventSubscription eventSubscription) { // Remove Existing Subscription Publisher deleteEventSubscriptionPublisher(eventSubscription); - // TODO: fix this make AlertActivityFeedPublisher - if (Boolean.TRUE.equals(eventSubscription.getEnabled()) - && (!eventSubscription - .getAlertType() - .equals(CreateEventSubscription.AlertType.ACTIVITY_FEED))) { + if (Boolean.TRUE.equals(eventSubscription.getEnabled())) { addSubscriptionPublisher(eventSubscription); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/events/subscription/ActivityFeedAlertCache.java b/openmetadata-service/src/main/java/org/openmetadata/service/events/subscription/ActivityFeedAlertCache.java deleted file mode 100644 index 404658b1b97..00000000000 --- a/openmetadata-service/src/main/java/org/openmetadata/service/events/subscription/ActivityFeedAlertCache.java +++ /dev/null @@ -1,47 +0,0 @@ -package org.openmetadata.service.events.subscription; - -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.util.concurrent.UncheckedExecutionException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import javax.annotation.CheckForNull; -import lombok.NonNull; -import lombok.extern.slf4j.Slf4j; -import org.openmetadata.schema.entity.events.EventSubscription; -import org.openmetadata.schema.type.Include; -import org.openmetadata.service.Entity; -import org.openmetadata.service.exception.EntityNotFoundException; - -@Slf4j -public class ActivityFeedAlertCache { - protected static final LoadingCache EVENT_SUB_CACHE = - CacheBuilder.newBuilder() - .maximumSize(1000) - .expireAfterWrite(3, TimeUnit.MINUTES) - .build(new ActivityFeedAlertLoader()); - private static final String ACTIVITY_FEED_ALERT = "ActivityFeedAlert"; - - private ActivityFeedAlertCache() { - // Private constructor for static class - } - - public static EventSubscription getActivityFeedAlert() { - try { - return EVENT_SUB_CACHE.get(ACTIVITY_FEED_ALERT); - } catch (ExecutionException | UncheckedExecutionException ex) { - throw new EntityNotFoundException(ex.getMessage()); - } - } - - static class ActivityFeedAlertLoader extends CacheLoader { - @Override - public @NonNull EventSubscription load(@CheckForNull String alertName) { - EventSubscription alert = - Entity.getEntityByName(Entity.EVENT_SUBSCRIPTION, alertName, "*", Include.NON_DELETED); - LOG.debug("Loaded Alert {}", alert); - return alert; - } - } -} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/events/subscription/AlertUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/events/subscription/AlertUtil.java index 220c05b0074..83c163bd512 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/events/subscription/AlertUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/events/subscription/AlertUtil.java @@ -137,14 +137,6 @@ public final class AlertUtil { return config.getResources().contains(event.getEntityType()); // Use Trigger Specific Settings } - public static boolean shouldProcessActivityFeedRequest(ChangeEvent event) { - // Check Trigger Conditions - FilteringRules filteringRules = - ActivityFeedAlertCache.getActivityFeedAlert().getFilteringRules(); - return AlertUtil.shouldTriggerAlert(event, filteringRules) - && AlertUtil.evaluateAlertConditions(event, filteringRules.getRules()); - } - public static SubscriptionStatus buildSubscriptionStatus( SubscriptionStatus.Status status, Long lastSuccessful, diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/decorators/MessageDecorator.java b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/decorators/MessageDecorator.java index e08d932d7a1..19491c3fe9d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/decorators/MessageDecorator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/decorators/MessageDecorator.java @@ -208,7 +208,7 @@ public interface MessageDecorator { } message.setHeader(headerText); } - List thread = FeedUtils.getThreadWithMessage(this, event, "admin"); + List thread = FeedUtils.getThreadWithMessage(this, event); List messages = new ArrayList<>(); thread.forEach(entry -> messages.add(entry.getMessage())); message.setMessages(messages); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/entity/DefaultEntityFormatter.java b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/entity/DefaultEntityFormatter.java index 81724b33d41..1fc98fc935d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/entity/DefaultEntityFormatter.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/entity/DefaultEntityFormatter.java @@ -15,7 +15,7 @@ package org.openmetadata.service.formatter.entity; import static org.openmetadata.service.formatter.util.FormatterUtil.transformMessage; -import org.openmetadata.schema.EntityInterface; +import org.openmetadata.schema.entity.feed.Thread; import org.openmetadata.schema.type.FieldChange; import org.openmetadata.service.formatter.decorators.MessageDecorator; import org.openmetadata.service.formatter.util.FormatterUtil; @@ -24,9 +24,9 @@ public class DefaultEntityFormatter implements EntityFormatter { @Override public String format( MessageDecorator messageFormatter, + Thread thread, FieldChange fieldChange, - EntityInterface entity, FormatterUtil.CHANGE_TYPE changeType) { - return transformMessage(messageFormatter, fieldChange, entity, changeType); + return transformMessage(messageFormatter, thread, fieldChange, changeType); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/entity/EntityFormatter.java b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/entity/EntityFormatter.java index 3f6523bf586..99afdaa6c46 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/entity/EntityFormatter.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/entity/EntityFormatter.java @@ -13,7 +13,7 @@ package org.openmetadata.service.formatter.entity; -import org.openmetadata.schema.EntityInterface; +import org.openmetadata.schema.entity.feed.Thread; import org.openmetadata.schema.type.FieldChange; import org.openmetadata.service.formatter.decorators.MessageDecorator; import org.openmetadata.service.formatter.util.FormatterUtil; @@ -21,7 +21,7 @@ import org.openmetadata.service.formatter.util.FormatterUtil; public interface EntityFormatter { String format( MessageDecorator messageDecorator, + Thread thread, FieldChange fieldChange, - EntityInterface entity, FormatterUtil.CHANGE_TYPE changeType); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/entity/IngestionPipelineFormatter.java b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/entity/IngestionPipelineFormatter.java index e451908c5a6..bf632f2e39b 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/entity/IngestionPipelineFormatter.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/entity/IngestionPipelineFormatter.java @@ -18,8 +18,11 @@ import static org.openmetadata.service.formatter.util.FormatterUtil.transformMes import java.text.SimpleDateFormat; import java.util.Date; import org.openmetadata.schema.EntityInterface; +import org.openmetadata.schema.entity.feed.Thread; import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus; import org.openmetadata.schema.type.FieldChange; +import org.openmetadata.schema.type.Include; +import org.openmetadata.service.Entity; import org.openmetadata.service.formatter.decorators.MessageDecorator; import org.openmetadata.service.formatter.util.FormatterUtil; import org.openmetadata.service.util.JsonUtils; @@ -30,17 +33,19 @@ public class IngestionPipelineFormatter implements EntityFormatter { @Override public String format( MessageDecorator messageFormatter, + Thread thread, FieldChange fieldChange, - EntityInterface entity, FormatterUtil.CHANGE_TYPE changeType) { if (PIPELINE_STATUS_FIELD.equals(fieldChange.getName())) { - return transformIngestionPipelineStatus(messageFormatter, fieldChange, entity); + return transformIngestionPipelineStatus(messageFormatter, thread, fieldChange); } - return transformMessage(messageFormatter, fieldChange, entity, changeType); + return transformMessage(messageFormatter, thread, fieldChange, changeType); } private String transformIngestionPipelineStatus( - MessageDecorator messageFormatter, FieldChange fieldChange, EntityInterface entity) { + MessageDecorator messageFormatter, Thread thread, FieldChange fieldChange) { + EntityInterface entity = + Entity.getEntity(thread.getEntityType(), thread.getEntityId(), "id", Include.ALL); String ingestionPipelineName = entity.getName(); PipelineStatus status = JsonUtils.readOrConvertValue(fieldChange.getNewValue(), PipelineStatus.class); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/entity/KpiFormatter.java b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/entity/KpiFormatter.java index 9a05698bf19..e893233d02c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/entity/KpiFormatter.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/entity/KpiFormatter.java @@ -18,7 +18,10 @@ import static org.openmetadata.service.formatter.util.FormatterUtil.transformMes import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.dataInsight.type.KpiResult; import org.openmetadata.schema.dataInsight.type.KpiTarget; +import org.openmetadata.schema.entity.feed.Thread; import org.openmetadata.schema.type.FieldChange; +import org.openmetadata.schema.type.Include; +import org.openmetadata.service.Entity; import org.openmetadata.service.formatter.decorators.MessageDecorator; import org.openmetadata.service.formatter.util.FormatterUtil; @@ -28,17 +31,19 @@ public class KpiFormatter implements EntityFormatter { @Override public String format( MessageDecorator messageFormatter, + Thread thread, FieldChange fieldChange, - EntityInterface entity, FormatterUtil.CHANGE_TYPE changeType) { if (KPI_RESULT_FIELD.equals(fieldChange.getName())) { - return transformKpiResult(messageFormatter, fieldChange, entity); + return transformKpiResult(messageFormatter, thread, fieldChange); } - return transformMessage(messageFormatter, fieldChange, entity, changeType); + return transformMessage(messageFormatter, thread, fieldChange, changeType); } private String transformKpiResult( - MessageDecorator messageFormatter, FieldChange fieldChange, EntityInterface entity) { + MessageDecorator messageFormatter, Thread thread, FieldChange fieldChange) { + EntityInterface entity = + Entity.getEntity(thread.getEntityType(), thread.getEntityId(), "id", Include.ALL); String kpiName = entity.getName(); KpiResult result = (KpiResult) fieldChange.getNewValue(); if (result != null) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/entity/PipelineFormatter.java b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/entity/PipelineFormatter.java index f9f58044971..639bfc5e5d5 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/entity/PipelineFormatter.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/entity/PipelineFormatter.java @@ -19,7 +19,10 @@ import java.text.SimpleDateFormat; import java.util.Date; import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.entity.data.PipelineStatus; +import org.openmetadata.schema.entity.feed.Thread; import org.openmetadata.schema.type.FieldChange; +import org.openmetadata.schema.type.Include; +import org.openmetadata.service.Entity; import org.openmetadata.service.formatter.decorators.MessageDecorator; import org.openmetadata.service.formatter.util.FormatterUtil; import org.openmetadata.service.util.JsonUtils; @@ -30,17 +33,19 @@ public class PipelineFormatter implements EntityFormatter { @Override public String format( MessageDecorator messageFormatter, + Thread thread, FieldChange fieldChange, - EntityInterface entity, FormatterUtil.CHANGE_TYPE changeType) { if (PIPELINE_STATUS_FIELD.equals(fieldChange.getName())) { - return transformPipelineStatus(messageFormatter, fieldChange, entity); + return transformPipelineStatus(messageFormatter, thread, fieldChange); } - return transformMessage(messageFormatter, fieldChange, entity, changeType); + return transformMessage(messageFormatter, thread, fieldChange, changeType); } private String transformPipelineStatus( - MessageDecorator messageFormatter, FieldChange fieldChange, EntityInterface entity) { + MessageDecorator messageFormatter, Thread thread, FieldChange fieldChange) { + EntityInterface entity = + Entity.getEntity(thread.getEntityType(), thread.getEntityId(), "id", Include.ALL); String pipelineName = entity.getName(); PipelineStatus status = JsonUtils.readOrConvertValue(fieldChange.getNewValue(), PipelineStatus.class); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/entity/QueryFormatter.java b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/entity/QueryFormatter.java index d0b913a462f..27bb5deb87f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/entity/QueryFormatter.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/entity/QueryFormatter.java @@ -17,10 +17,12 @@ import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; import static org.openmetadata.service.formatter.util.FormatterUtil.transformMessage; import java.util.List; -import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.entity.data.Query; +import org.openmetadata.schema.entity.feed.Thread; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.FieldChange; +import org.openmetadata.schema.type.Include; +import org.openmetadata.service.Entity; import org.openmetadata.service.formatter.decorators.MessageDecorator; import org.openmetadata.service.formatter.util.FormatterUtil; import org.openmetadata.service.util.JsonUtils; @@ -31,19 +33,33 @@ public class QueryFormatter implements EntityFormatter { @Override public String format( MessageDecorator messageFormatter, + Thread thread, FieldChange fieldChange, - EntityInterface entity, FormatterUtil.CHANGE_TYPE changeType) { if (QUERY_USED_IN_FIELD.equals(fieldChange.getName())) { - return transformQueryUsedIn(messageFormatter, fieldChange, entity, changeType); + return transformQueryUsedIn(messageFormatter, thread, fieldChange, changeType); } - return transformMessage(messageFormatter, fieldChange, entity, changeType); + return transformMessage(messageFormatter, thread, fieldChange, changeType); + } + + private String transformQueryUsedIn( + MessageDecorator messageFormatter, + Thread thread, + FieldChange fieldChange, + FormatterUtil.CHANGE_TYPE changeType) { + String newVal = getFieldValue(fieldChange.getNewValue(), messageFormatter, thread); + String oldVal = getFieldValue(fieldChange.getOldValue(), messageFormatter, thread); + return transformMessage( + messageFormatter, + thread, + new FieldChange().withNewValue(newVal).withOldValue(oldVal).withName(QUERY_USED_IN_FIELD), + changeType); } @SuppressWarnings("unchecked") private static String getFieldValue( - Object fieldValue, EntityInterface entity, MessageDecorator messageFormatter) { - Query query = (Query) entity; + Object fieldValue, MessageDecorator messageFormatter, Thread thread) { + Query query = Entity.getEntity(thread.getEntityType(), thread.getEntityId(), "id", Include.ALL); StringBuilder field = new StringBuilder(); List tableRefs = fieldValue instanceof String @@ -67,18 +83,4 @@ public class QueryFormatter implements EntityFormatter { } return field.toString(); } - - private String transformQueryUsedIn( - MessageDecorator messageFormatter, - FieldChange fieldChange, - EntityInterface entity, - FormatterUtil.CHANGE_TYPE changeType) { - String newVal = getFieldValue(fieldChange.getNewValue(), entity, messageFormatter); - String oldVal = getFieldValue(fieldChange.getOldValue(), entity, messageFormatter); - return transformMessage( - messageFormatter, - new FieldChange().withNewValue(newVal).withOldValue(oldVal).withName(QUERY_USED_IN_FIELD), - entity, - changeType); - } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/entity/TestCaseFormatter.java b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/entity/TestCaseFormatter.java deleted file mode 100644 index 93770477e62..00000000000 --- a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/entity/TestCaseFormatter.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright 2021 Collate - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.openmetadata.service.formatter.entity; - -import static org.openmetadata.service.formatter.util.FormatterUtil.transformMessage; - -import org.openmetadata.schema.EntityInterface; -import org.openmetadata.schema.tests.TestCase; -import org.openmetadata.schema.tests.type.TestCaseResult; -import org.openmetadata.schema.tests.type.TestCaseStatus; -import org.openmetadata.schema.type.FieldChange; -import org.openmetadata.service.formatter.decorators.FeedMessageDecorator; -import org.openmetadata.service.formatter.decorators.MessageDecorator; -import org.openmetadata.service.formatter.util.FormatterUtil; -import org.openmetadata.service.resources.feeds.MessageParser; -import org.openmetadata.service.util.JsonUtils; - -public class TestCaseFormatter implements EntityFormatter { - private static final String TEST_RESULT_FIELD = "testCaseResult"; - - @Override - public String format( - MessageDecorator messageFormatter, - FieldChange fieldChange, - EntityInterface entity, - FormatterUtil.CHANGE_TYPE changeType) { - if (TEST_RESULT_FIELD.equals(fieldChange.getName())) { - return transformTestCaseResult(messageFormatter, fieldChange, entity); - } - return transformMessage(messageFormatter, fieldChange, entity, changeType); - } - - private String transformTestCaseResult( - MessageDecorator messageFormatter, FieldChange fieldChange, EntityInterface entity) { - String testCaseName = entity.getName(); - TestCaseResult result = JsonUtils.convertValue(fieldChange.getNewValue(), TestCaseResult.class); - TestCase testCaseEntity = (TestCase) entity; - if (result != null) { - String format = - String.format( - "Test Case %s is %s in %s", - messageFormatter.getBold(), - messageFormatter.getBold(), - MessageParser.EntityLink.parse(testCaseEntity.getEntityLink()).getEntityFQN()); - return String.format( - format, testCaseName, getStatusMessage(messageFormatter, result.getTestCaseStatus())); - } - String format = - String.format( - "Test Case %s is updated in %s", - messageFormatter.getBold(), messageFormatter.getBold()); - return String.format( - format, - testCaseName, - MessageParser.EntityLink.parse(testCaseEntity.getEntityLink()).getEntityFQN()); - } - - private String getStatusMessage(MessageDecorator messageDecorator, TestCaseStatus status) { - if (messageDecorator instanceof FeedMessageDecorator) { - return switch (status) { - case Success -> "Passed"; - case Failed -> "Failed"; - case Aborted -> "Aborted"; - case Queued -> "Queued"; - }; - } else { - return switch (status) { - case Success -> "Passed"; - case Failed -> "Failed"; - case Aborted -> "Aborted"; - case Queued -> "Queued"; - }; - } - } -} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/factory/ParserFactory.java b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/factory/ParserFactory.java index ec9fb32504c..9849861e731 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/factory/ParserFactory.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/factory/ParserFactory.java @@ -13,6 +13,11 @@ package org.openmetadata.service.formatter.factory; +import static org.openmetadata.service.Entity.FIELD_ASSETS; +import static org.openmetadata.service.formatter.field.TestCaseResultFormatter.TEST_RESULT_FIELD; + +import org.openmetadata.schema.entity.feed.Thread; +import org.openmetadata.schema.type.FieldChange; import org.openmetadata.service.Entity; import org.openmetadata.service.formatter.decorators.MessageDecorator; import org.openmetadata.service.formatter.entity.DefaultEntityFormatter; @@ -21,20 +26,24 @@ import org.openmetadata.service.formatter.entity.IngestionPipelineFormatter; import org.openmetadata.service.formatter.entity.KpiFormatter; import org.openmetadata.service.formatter.entity.PipelineFormatter; import org.openmetadata.service.formatter.entity.QueryFormatter; -import org.openmetadata.service.formatter.entity.TestCaseFormatter; +import org.openmetadata.service.formatter.field.AssetsFieldFormatter; import org.openmetadata.service.formatter.field.DefaultFieldFormatter; +import org.openmetadata.service.formatter.field.DescriptionFormatter; import org.openmetadata.service.formatter.field.FollowersFormatter; import org.openmetadata.service.formatter.field.OwnerFormatter; import org.openmetadata.service.formatter.field.TagFormatter; -import org.openmetadata.service.resources.feeds.MessageParser; +import org.openmetadata.service.formatter.field.TestCaseResultFormatter; public final class ParserFactory { private ParserFactory() {} public static EntityFormatter getEntityParser(String entityType) { + // Handle Thread entity separately + if (entityType.equals(Entity.THREAD)) { + throw new IllegalArgumentException("Thread entity cannot be handled by Entity Parser."); + } return switch (entityType) { case Entity.QUERY -> new QueryFormatter(); - case Entity.TEST_CASE -> new TestCaseFormatter(); case Entity.KPI -> new KpiFormatter(); case Entity.INGESTION_PIPELINE -> new IngestionPipelineFormatter(); case Entity.PIPELINE -> new PipelineFormatter(); @@ -44,19 +53,17 @@ public final class ParserFactory { public static DefaultFieldFormatter getFieldParserObject( MessageDecorator decorator, - String fieldOldValue, - String fieldNewValue, - String fieldChangeName, - MessageParser.EntityLink entityLink) { + Thread thread, + FieldChange fieldChange, + String fieldChangeName) { return switch (fieldChangeName) { - case Entity.FIELD_TAGS -> new TagFormatter( - decorator, fieldOldValue, fieldNewValue, fieldChangeName, entityLink); - case Entity.FIELD_FOLLOWERS -> new FollowersFormatter( - decorator, fieldOldValue, fieldNewValue, fieldChangeName, entityLink); - case Entity.FIELD_OWNER -> new OwnerFormatter( - decorator, fieldOldValue, fieldNewValue, fieldChangeName, entityLink); - default -> new DefaultFieldFormatter( - decorator, fieldOldValue, fieldNewValue, fieldChangeName, entityLink); + case Entity.FIELD_TAGS -> new TagFormatter(decorator, thread, fieldChange); + case Entity.FIELD_FOLLOWERS -> new FollowersFormatter(decorator, thread, fieldChange); + case Entity.FIELD_OWNER -> new OwnerFormatter(decorator, thread, fieldChange); + case Entity.FIELD_DESCRIPTION -> new DescriptionFormatter(decorator, thread, fieldChange); + case TEST_RESULT_FIELD -> new TestCaseResultFormatter(decorator, thread, fieldChange); + case FIELD_ASSETS -> new AssetsFieldFormatter(decorator, thread, fieldChange); + default -> new DefaultFieldFormatter(decorator, thread, fieldChange); }; } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/field/AssetsFieldFormatter.java b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/field/AssetsFieldFormatter.java new file mode 100644 index 00000000000..1853a10dbee --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/field/AssetsFieldFormatter.java @@ -0,0 +1,59 @@ +package org.openmetadata.service.formatter.field; + +import static org.openmetadata.service.Entity.FIELD_ASSETS; + +import org.openmetadata.schema.entity.feed.AssetsFeedInfo; +import org.openmetadata.schema.entity.feed.FeedInfo; +import org.openmetadata.schema.entity.feed.Thread; +import org.openmetadata.schema.type.EntityReference; +import org.openmetadata.schema.type.FieldChange; +import org.openmetadata.service.formatter.decorators.MessageDecorator; +import org.openmetadata.service.util.JsonUtils; + +public class AssetsFieldFormatter extends DefaultFieldFormatter { + private static final String HEADER_MESSAGE = "%s %s the assets in %s %s"; + + public AssetsFieldFormatter( + MessageDecorator messageDecorator, Thread thread, FieldChange fieldChange) { + super(messageDecorator, thread, fieldChange); + } + + @Override + public String formatAddedField() { + String message = getHeaderForAssetsUpdate(Thread.FieldOperation.ADDED.value()); + // Update the thread with the required information + populateAssetsFeedInfo(Thread.FieldOperation.ADDED, message); + return message; + } + + @Override + public String formatDeletedField() { + String message = getHeaderForAssetsUpdate(Thread.FieldOperation.DELETED.value()); + // Update the thread with the required information + populateAssetsFeedInfo(Thread.FieldOperation.DELETED, message); + return message; + } + + private void populateAssetsFeedInfo(Thread.FieldOperation operation, String threadMessage) { + AssetsFeedInfo assetsFeedInfo = + new AssetsFeedInfo() + .withUpdatedAssets( + JsonUtils.readOrConvertValues(fieldChange.getNewValue(), EntityReference.class)); + FeedInfo feedInfo = + new FeedInfo() + .withHeaderMessage(threadMessage) + .withFieldName(FIELD_ASSETS) + .withEntitySpecificInfo(assetsFeedInfo); + populateThreadFeedInfo( + thread, threadMessage, Thread.CardStyle.DESCRIPTION, operation, feedInfo); + } + + private String getHeaderForAssetsUpdate(String opMessage) { + return String.format( + HEADER_MESSAGE, + thread.getUpdatedBy(), + opMessage, + thread.getEntityType(), + thread.getEntityUrlLink()); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/field/DefaultFieldFormatter.java b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/field/DefaultFieldFormatter.java index 4d1e2c72f3a..4791da6ae4e 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/field/DefaultFieldFormatter.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/field/DefaultFieldFormatter.java @@ -13,28 +13,44 @@ package org.openmetadata.service.formatter.field; +import static java.lang.String.format; +import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; +import static org.openmetadata.service.Entity.FIELD_DISPLAY_NAME; +import static org.openmetadata.service.Entity.FIELD_NAME; +import static org.openmetadata.service.formatter.util.FormatterUtil.getEntityLinkForFieldName; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import javax.json.JsonArray; +import javax.json.JsonObject; +import javax.json.JsonValue; +import javax.json.stream.JsonParsingException; +import org.apache.commons.lang.StringUtils; +import org.openmetadata.schema.entity.feed.FeedInfo; +import org.openmetadata.schema.entity.feed.Thread; +import org.openmetadata.schema.type.FieldChange; import org.openmetadata.service.formatter.decorators.MessageDecorator; import org.openmetadata.service.formatter.util.FormatterUtil; import org.openmetadata.service.resources.feeds.MessageParser; +import org.openmetadata.service.util.JsonUtils; public class DefaultFieldFormatter implements FieldFormatter { - private final String fieldChangeName; - private final String fieldOldValue; - private final String fieldNewValue; - private final MessageParser.EntityLink entityLink; - private final MessageDecorator messageDecorator; + protected final Thread thread; + protected final String fieldChangeName; + protected final FieldChange fieldChange; + protected final String fieldOldValue; + protected final String fieldNewValue; + protected final MessageDecorator messageDecorator; public DefaultFieldFormatter( - MessageDecorator messageDecorator, - String fieldOldValue, - String fieldNewValue, - String fieldChangeName, - MessageParser.EntityLink entityLink) { + MessageDecorator messageDecorator, Thread thread, FieldChange fieldChange) { this.messageDecorator = messageDecorator; - this.fieldChangeName = fieldChangeName; - this.fieldOldValue = fieldOldValue; - this.fieldNewValue = fieldNewValue; - this.entityLink = entityLink; + this.fieldChangeName = getFieldNameChange(fieldChange.getName(), thread); + this.fieldOldValue = getFieldValue(fieldChange.getOldValue()); + this.fieldNewValue = getFieldValue(fieldChange.getNewValue()); + this.thread = thread; + this.fieldChange = fieldChange; } @Override @@ -70,7 +86,7 @@ public class DefaultFieldFormatter implements FieldFormatter { @Override public MessageParser.EntityLink getEntityLink() { - return entityLink; + return MessageParser.EntityLink.parse(thread.getAbout()); } public String formatAddedField() { @@ -114,4 +130,76 @@ public class DefaultFieldFormatter implements FieldFormatter { } return message; } + + public static void populateThreadFeedInfo( + Thread thread, + String threadMessage, + Thread.CardStyle cardStyle, + Thread.FieldOperation operation, + FeedInfo feedInfo) { + thread.withMessage(threadMessage); + thread.withCardStyle(cardStyle); + thread.withFieldOperation(operation); + thread.withFeedInfo(feedInfo); + } + + public static String getFieldValue(Object fieldValue) { + if (nullOrEmpty(fieldValue)) { + return StringUtils.EMPTY; + } + try { + JsonValue json = JsonUtils.readJson(fieldValue.toString()); + if (json.getValueType() == JsonValue.ValueType.ARRAY) { + JsonArray jsonArray = json.asJsonArray(); + List labels = new ArrayList<>(); + for (JsonValue item : jsonArray) { + if (item.getValueType() == JsonValue.ValueType.OBJECT) { + Set keys = item.asJsonObject().keySet(); + if (keys.contains("tagFQN")) { + labels.add(item.asJsonObject().getString("tagFQN")); + } else if (keys.contains(FIELD_DISPLAY_NAME)) { + // Entity Reference will have a displayName + labels.add(item.asJsonObject().getString(FIELD_DISPLAY_NAME)); + } else if (keys.contains(FIELD_NAME)) { + // Glossary term references has only "name" field + labels.add(item.asJsonObject().getString(FIELD_NAME)); + } else if (keys.contains("constraintType")) { + labels.add(item.asJsonObject().getString("constraintType")); + } + } else if (item.getValueType() == JsonValue.ValueType.STRING) { + // The string might be enclosed with double quotes + // Check if string has double quotes and strip trailing whitespaces + String label = item.toString().replaceAll("^\"|\"$", ""); + labels.add(label.strip()); + } + } + return String.join(", ", labels); + } else if (json.getValueType() == JsonValue.ValueType.OBJECT) { + JsonObject jsonObject = json.asJsonObject(); + // Entity Reference will have a displayName + Set keys = jsonObject.asJsonObject().keySet(); + if (keys.contains(FIELD_DISPLAY_NAME)) { + return jsonObject.asJsonObject().getString(FIELD_DISPLAY_NAME); + } else if (keys.contains(FIELD_NAME)) { + return jsonObject.asJsonObject().getString(FIELD_NAME); + } + } + } catch (JsonParsingException ex) { + // If unable to parse json, just return the string + } + return fieldValue.toString(); + } + + public static String getFieldNameChange(String fieldChangeName, Thread thread) { + MessageParser.EntityLink link = getEntityLinkForFieldName(fieldChangeName, thread); + String arrayFieldName = link.getArrayFieldName(); + String arrayFieldValue = link.getArrayFieldValue(); + String updatedField = fieldChangeName; + if (arrayFieldValue != null) { + updatedField = format("%s.%s", arrayFieldName, arrayFieldValue); + } else if (arrayFieldName != null) { + updatedField = format("%s.%s", fieldChangeName, arrayFieldName); + } + return updatedField; + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/field/DescriptionFormatter.java b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/field/DescriptionFormatter.java new file mode 100644 index 00000000000..f8b96272df4 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/field/DescriptionFormatter.java @@ -0,0 +1,67 @@ +package org.openmetadata.service.formatter.field; + +import static org.openmetadata.service.Entity.FIELD_DESCRIPTION; + +import org.openmetadata.schema.entity.feed.DescriptionFeedInfo; +import org.openmetadata.schema.entity.feed.FeedInfo; +import org.openmetadata.schema.entity.feed.Thread; +import org.openmetadata.schema.type.FieldChange; +import org.openmetadata.service.formatter.decorators.MessageDecorator; + +public class DescriptionFormatter extends DefaultFieldFormatter { + private static final String HEADER_MESSAGE = "%s %s the description for %s %s"; + + public DescriptionFormatter( + MessageDecorator messageDecorator, Thread thread, FieldChange fieldChange) { + super(messageDecorator, thread, fieldChange); + } + + @Override + public String formatAddedField() { + String message = super.formatAddedField(); + // Update the thread with the required information + populateDescriptionFeedInfo(Thread.FieldOperation.ADDED, message, message); + return message; + } + + @Override + public String formatUpdatedField() { + String message = super.formatUpdatedField(); + // Update the thread with the required information + populateDescriptionFeedInfo(Thread.FieldOperation.UPDATED, message, message); + return message; + } + + @Override + public String formatDeletedField() { + String message = super.formatDeletedField(); + // Update the thread with the required information + populateDescriptionFeedInfo(Thread.FieldOperation.DELETED, message, message); + return message; + } + + private void populateDescriptionFeedInfo( + Thread.FieldOperation operation, String threadMessage, String diffMessage) { + DescriptionFeedInfo descriptionFeedInfo = + new DescriptionFeedInfo() + .withPreviousDescription(fieldOldValue) + .withNewDescription(fieldNewValue) + .withDiffMessage(diffMessage); + FeedInfo feedInfo = + new FeedInfo() + .withHeaderMessage(getHeaderForDescriptionUpdate(operation.value())) + .withFieldName(FIELD_DESCRIPTION) + .withEntitySpecificInfo(descriptionFeedInfo); + populateThreadFeedInfo( + thread, threadMessage, Thread.CardStyle.DESCRIPTION, operation, feedInfo); + } + + private String getHeaderForDescriptionUpdate(String eventTypeMessage) { + return String.format( + HEADER_MESSAGE, + thread.getUpdatedBy(), + eventTypeMessage, + thread.getEntityType(), + thread.getEntityUrlLink()); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/field/FollowersFormatter.java b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/field/FollowersFormatter.java index 64aceb70f88..af716e24a76 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/field/FollowersFormatter.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/field/FollowersFormatter.java @@ -13,17 +13,14 @@ package org.openmetadata.service.formatter.field; +import org.openmetadata.schema.entity.feed.Thread; +import org.openmetadata.schema.type.FieldChange; import org.openmetadata.service.formatter.decorators.MessageDecorator; -import org.openmetadata.service.resources.feeds.MessageParser; public class FollowersFormatter extends DefaultFieldFormatter { public FollowersFormatter( - MessageDecorator messageDecorator, - String fieldOldValue, - String fieldNewValue, - String fieldChangeName, - MessageParser.EntityLink entityLink) { - super(messageDecorator, fieldOldValue, fieldNewValue, fieldChangeName, entityLink); + MessageDecorator messageDecorator, Thread thread, FieldChange fieldChange) { + super(messageDecorator, thread, fieldChange); } @Override diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/field/OwnerFormatter.java b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/field/OwnerFormatter.java index 1f8e0070c14..5e0b84e6528 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/field/OwnerFormatter.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/field/OwnerFormatter.java @@ -13,19 +13,28 @@ package org.openmetadata.service.formatter.field; -import org.apache.commons.lang.StringUtils; +import static org.openmetadata.service.Entity.FIELD_OWNER; + import org.openmetadata.common.utils.CommonUtil; +import org.openmetadata.schema.entity.feed.FeedInfo; +import org.openmetadata.schema.entity.feed.OwnerFeedInfo; +import org.openmetadata.schema.entity.feed.Thread; +import org.openmetadata.schema.type.FieldChange; import org.openmetadata.service.formatter.decorators.MessageDecorator; -import org.openmetadata.service.resources.feeds.MessageParser; public class OwnerFormatter extends DefaultFieldFormatter { + private static final String HEADER_MESSAGE = "%s %s the owner for %s %s"; + public OwnerFormatter( - MessageDecorator messageDecorator, - String fieldOldValue, - String fieldNewValue, - String fieldChangeName, - MessageParser.EntityLink entityLink) { - super(messageDecorator, fieldOldValue, fieldNewValue, fieldChangeName, entityLink); + MessageDecorator messageDecorator, Thread thread, FieldChange fieldChange) { + super(messageDecorator, thread, fieldChange); + } + + @Override + public String formatAddedField() { + String message = super.formatAddedField(); + populateOwnerFeedInfo(Thread.FieldOperation.ADDED, message); + return message; } @Override @@ -38,23 +47,52 @@ public class OwnerFormatter extends DefaultFieldFormatter { + this.getMessageDecorator().httpAddMarker() + this.getFieldNewValue() + this.getMessageDecorator().httpAddMarker(); - String spanAdd = this.getMessageDecorator().getAddMarker(); - String spanAddClose = this.getMessageDecorator().getAddMarkerClose(); - String spanRemove = this.getMessageDecorator().getRemoveMarker(); - String spanRemoveClose = this.getMessageDecorator().getRemoveMarkerClose(); diff = this.getMessageDecorator() .replaceMarkers( - diff, this.getMessageDecorator().httpAddMarker(), spanAdd, spanAddClose); + diff, + this.getMessageDecorator().httpAddMarker(), + this.getMessageDecorator().getAddMarker(), + this.getMessageDecorator().getAddMarkerClose()); diff = this.getMessageDecorator() .replaceMarkers( - diff, this.getMessageDecorator().httpRemoveMarker(), spanRemove, spanRemoveClose); - if (CommonUtil.nullOrEmpty(diff)) { - return StringUtils.EMPTY; - } else { + diff, + this.getMessageDecorator().httpRemoveMarker(), + this.getMessageDecorator().getRemoveMarker(), + this.getMessageDecorator().getRemoveMarkerClose()); + if (!CommonUtil.nullOrEmpty(diff)) { String field = String.format("Updated %s: %s", this.getMessageDecorator().getBold(), diff); - return String.format(field, this.getFieldChangeName()); + diff = String.format(field, this.getFieldChangeName()); } + populateOwnerFeedInfo(Thread.FieldOperation.UPDATED, diff); + return diff; + } + + @Override + public String formatDeletedField() { + String message = super.formatDeletedField(); + populateOwnerFeedInfo(Thread.FieldOperation.DELETED, message); + return message; + } + + private void populateOwnerFeedInfo(Thread.FieldOperation operation, String threadMessage) { + OwnerFeedInfo ownerFeedInfo = + new OwnerFeedInfo().withPreviousOwner(fieldOldValue).withUpdatedOwner(fieldNewValue); + FeedInfo feedInfo = + new FeedInfo() + .withHeaderMessage(getHeaderForOwnerUpdate(operation.value())) + .withFieldName(FIELD_OWNER) + .withEntitySpecificInfo(ownerFeedInfo); + populateThreadFeedInfo(thread, threadMessage, Thread.CardStyle.OWNER, operation, feedInfo); + } + + private String getHeaderForOwnerUpdate(String eventTypeMessage) { + return String.format( + HEADER_MESSAGE, + thread.getUpdatedBy(), + eventTypeMessage, + thread.getEntityType(), + thread.getEntityUrlLink()); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/field/TagFormatter.java b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/field/TagFormatter.java index 5a255094665..70f3aba21c7 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/field/TagFormatter.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/field/TagFormatter.java @@ -13,18 +13,23 @@ package org.openmetadata.service.formatter.field; +import static org.openmetadata.service.Entity.FIELD_TAGS; + +import java.util.List; +import org.openmetadata.schema.entity.feed.FeedInfo; +import org.openmetadata.schema.entity.feed.TagFeedInfo; +import org.openmetadata.schema.entity.feed.Thread; +import org.openmetadata.schema.type.FieldChange; +import org.openmetadata.schema.type.TagLabel; import org.openmetadata.service.formatter.decorators.MessageDecorator; -import org.openmetadata.service.resources.feeds.MessageParser; +import org.openmetadata.service.util.JsonUtils; public class TagFormatter extends DefaultFieldFormatter { + private static final String HEADER_MESSAGE = "%s %s the tags for %s %s"; public TagFormatter( - MessageDecorator messageDecorator, - String fieldOldValue, - String fieldNewValue, - String fieldChangeName, - MessageParser.EntityLink entityLink) { - super(messageDecorator, fieldOldValue, fieldNewValue, fieldChangeName, entityLink); + MessageDecorator messageDecorator, Thread thread, FieldChange fieldChange) { + super(messageDecorator, thread, fieldChange); } @Override @@ -63,6 +68,7 @@ public class TagFormatter extends DefaultFieldFormatter { .replaceMarkers( message, this.getMessageDecorator().httpAddMarker(), spanAdd, spanAddClose); } + populateTagFeedInfo(Thread.FieldOperation.ADDED, message); return message; } @@ -112,6 +118,7 @@ public class TagFormatter extends DefaultFieldFormatter { this.getFieldChangeName(), diff); } + populateTagFeedInfo(Thread.FieldOperation.UPDATED, message); return String.format(message, this.getFieldChangeName()); } @@ -151,6 +158,7 @@ public class TagFormatter extends DefaultFieldFormatter { .replaceMarkers( message, this.getMessageDecorator().httpRemoveMarker(), spanAdd, spanAddClose); } + populateTagFeedInfo(Thread.FieldOperation.DELETED, message); return message; } @@ -160,4 +168,27 @@ public class TagFormatter extends DefaultFieldFormatter { } return fieldName; } + + private void populateTagFeedInfo(Thread.FieldOperation operation, String threadMessage) { + List oldTags = + JsonUtils.readOrConvertValues(fieldChange.getOldValue(), TagLabel.class); + List newTags = + JsonUtils.readOrConvertValues(fieldChange.getNewValue(), TagLabel.class); + TagFeedInfo tagFeedInfo = new TagFeedInfo().withPreviousTags(oldTags).withUpdatedTags(newTags); + FeedInfo feedInfo = + new FeedInfo() + .withHeaderMessage(getHeaderForTagsUpdate(operation.value())) + .withFieldName(FIELD_TAGS) + .withEntitySpecificInfo(tagFeedInfo); + populateThreadFeedInfo(thread, threadMessage, Thread.CardStyle.TAGS, operation, feedInfo); + } + + private String getHeaderForTagsUpdate(String eventTypeMessage) { + return String.format( + HEADER_MESSAGE, + thread.getUpdatedBy(), + eventTypeMessage, + thread.getEntityType(), + thread.getEntityUrlLink()); + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/field/TestCaseResultFormatter.java b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/field/TestCaseResultFormatter.java new file mode 100644 index 00000000000..1aa2d52bd58 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/field/TestCaseResultFormatter.java @@ -0,0 +1,126 @@ +package org.openmetadata.service.formatter.field; + +import org.openmetadata.schema.entity.feed.FeedInfo; +import org.openmetadata.schema.entity.feed.TestCaseResultFeedInfo; +import org.openmetadata.schema.entity.feed.Thread; +import org.openmetadata.schema.tests.TestCase; +import org.openmetadata.schema.tests.TestSuite; +import org.openmetadata.schema.tests.type.TestCaseResult; +import org.openmetadata.schema.tests.type.TestCaseStatus; +import org.openmetadata.schema.type.FieldChange; +import org.openmetadata.schema.type.Include; +import org.openmetadata.service.Entity; +import org.openmetadata.service.formatter.decorators.FeedMessageDecorator; +import org.openmetadata.service.formatter.decorators.MessageDecorator; +import org.openmetadata.service.jdbi3.TestCaseRepository; +import org.openmetadata.service.resources.feeds.MessageParser; +import org.openmetadata.service.util.JsonUtils; +import org.openmetadata.service.util.ResultList; + +public class TestCaseResultFormatter extends DefaultFieldFormatter { + public static final String TEST_RESULT_FIELD = "testCaseResult"; + private static final String HEADER_MESSAGE = "%s added results to test Case %s"; + + public TestCaseResultFormatter( + MessageDecorator messageDecorator, Thread thread, FieldChange fieldChange) { + super(messageDecorator, thread, fieldChange); + } + + @Override + public String formatAddedField() { + String message; + if (fieldChangeName.equals(TEST_RESULT_FIELD)) { + message = transformTestCaseResult(messageDecorator, thread, fieldChange); + } else { + message = super.formatAddedField(); + } + // Update the thread with the required information + populateTestResultFeedInfo(Thread.FieldOperation.UPDATED, message); + return message; + } + + @Override + public String formatUpdatedField() { + String message; + if (fieldChangeName.equals(TEST_RESULT_FIELD)) { + message = transformTestCaseResult(messageDecorator, thread, fieldChange); + } else { + message = super.formatUpdatedField(); + } + // Update the thread with the required information + populateTestResultFeedInfo(Thread.FieldOperation.UPDATED, message); + return message; + } + + private void populateTestResultFeedInfo(Thread.FieldOperation operation, String threadMessage) { + long currentTime = System.currentTimeMillis(); + long lastWeekTime = currentTime - 7 * 24 * 60 * 60 * 1000; + TestCaseRepository testCaseRepository = + (TestCaseRepository) Entity.getEntityRepository(Entity.TEST_CASE); + TestCase testCaseEntity = + Entity.getEntity(thread.getEntityType(), thread.getEntityId(), "id,testSuite", Include.ALL); + TestSuite testSuiteEntity = Entity.getEntity(testCaseEntity.getTestSuite(), "id", Include.ALL); + ResultList testCaseResultResultList = + testCaseRepository.getTestCaseResults( + testCaseEntity.getFullyQualifiedName(), lastWeekTime, currentTime); + TestCaseResultFeedInfo testCaseResultFeedInfo = + new TestCaseResultFeedInfo() + .withTestCaseResult(testCaseResultResultList.getData()) + .withEntityTestResultSummary(testSuiteEntity.getTestCaseResultSummary()); + FeedInfo feedInfo = + new FeedInfo() + .withHeaderMessage(getHeaderForTestResultUpdate()) + .withFieldName(TEST_RESULT_FIELD) + .withEntitySpecificInfo(testCaseResultFeedInfo); + populateThreadFeedInfo( + thread, threadMessage, Thread.CardStyle.TEST_CASE_RESULT, operation, feedInfo); + } + + private String getHeaderForTestResultUpdate() { + return String.format(HEADER_MESSAGE, thread.getUpdatedBy(), thread.getEntityUrlLink()); + } + + private String transformTestCaseResult( + MessageDecorator messageFormatter, Thread thread, FieldChange fieldChange) { + TestCase testCaseEntity = + Entity.getEntity(thread.getEntityType(), thread.getEntityId(), "id", Include.ALL); + String testCaseName = testCaseEntity.getName(); + TestCaseResult result = JsonUtils.convertValue(fieldChange.getNewValue(), TestCaseResult.class); + if (result != null) { + String format = + String.format( + "Test Case %s is %s in %s", + messageFormatter.getBold(), + messageFormatter.getBold(), + MessageParser.EntityLink.parse(testCaseEntity.getEntityLink()).getEntityFQN()); + return String.format( + format, testCaseName, getStatusMessage(messageFormatter, result.getTestCaseStatus())); + } + String format = + String.format( + "Test Case %s is updated in %s", + messageFormatter.getBold(), messageFormatter.getBold()); + return String.format( + format, + testCaseName, + MessageParser.EntityLink.parse(testCaseEntity.getEntityLink()).getEntityFQN()); + } + + private String getStatusMessage(MessageDecorator messageDecorator, TestCaseStatus status) { + if (messageDecorator instanceof FeedMessageDecorator) { + return switch (status) { + case Success -> "Passed"; + case Failed -> "Failed"; + case Aborted -> "Aborted"; + case Queued -> "Queued"; + }; + } else { + return switch (status) { + case Success -> "Passed"; + case Failed -> "Failed"; + case Aborted -> "Aborted"; + case Queued -> "Queued"; + }; + } + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/util/FormatterUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/util/FormatterUtil.java index 575e7e510f7..d2a3e90d9be 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/util/FormatterUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/util/FormatterUtil.java @@ -13,36 +13,26 @@ package org.openmetadata.service.formatter.util; -import static java.lang.String.format; import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; import static org.openmetadata.schema.type.EventType.ENTITY_CREATED; -import static org.openmetadata.service.Entity.FIELD_DISPLAY_NAME; -import static org.openmetadata.service.Entity.FIELD_NAME; import static org.openmetadata.service.Entity.THREAD; import static org.openmetadata.service.formatter.factory.ParserFactory.getFieldParserObject; +import static org.openmetadata.service.formatter.field.DefaultFieldFormatter.getFieldNameChange; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.UUID; -import javax.json.JsonArray; -import javax.json.JsonObject; -import javax.json.JsonValue; -import javax.json.stream.JsonParsingException; import javax.ws.rs.container.ContainerResponseContext; import javax.ws.rs.core.Response; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang.StringUtils; import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.entity.feed.Thread; import org.openmetadata.schema.type.ChangeDescription; import org.openmetadata.schema.type.ChangeEvent; -import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.EventType; import org.openmetadata.schema.type.FieldChange; import org.openmetadata.service.formatter.decorators.MessageDecorator; @@ -56,10 +46,11 @@ import org.openmetadata.service.util.RestUtil; @Slf4j public class FormatterUtil { - public static MessageParser.EntityLink getEntityLink(String fieldName, EntityInterface entity) { - EntityReference entityReference = entity.getEntityReference(); - String entityType = entityReference.getType(); - String entityFQN = entityReference.getFullyQualifiedName(); + public static MessageParser.EntityLink getEntityLinkForFieldName( + String fieldName, Thread thread) { + MessageParser.EntityLink entityLink = MessageParser.EntityLink.parse(thread.getAbout()); + String entityType = thread.getEntityType(); + String entityFQN = entityLink.getEntityFQN(); String arrayFieldName = null; String arrayFieldValue = null; @@ -79,53 +70,6 @@ public class FormatterUtil { entityType, entityFQN, fieldName, arrayFieldName, arrayFieldValue); } - public static String getFieldValue(Object fieldValue) { - if (nullOrEmpty(fieldValue)) { - return StringUtils.EMPTY; - } - try { - JsonValue json = JsonUtils.readJson(fieldValue.toString()); - if (json.getValueType() == JsonValue.ValueType.ARRAY) { - JsonArray jsonArray = json.asJsonArray(); - List labels = new ArrayList<>(); - for (JsonValue item : jsonArray) { - if (item.getValueType() == JsonValue.ValueType.OBJECT) { - Set keys = item.asJsonObject().keySet(); - if (keys.contains("tagFQN")) { - labels.add(item.asJsonObject().getString("tagFQN")); - } else if (keys.contains(FIELD_DISPLAY_NAME)) { - // Entity Reference will have a displayName - labels.add(item.asJsonObject().getString(FIELD_DISPLAY_NAME)); - } else if (keys.contains(FIELD_NAME)) { - // Glossary term references have only "name" field - labels.add(item.asJsonObject().getString(FIELD_NAME)); - } else if (keys.contains("constraintType")) { - labels.add(item.asJsonObject().getString("constraintType")); - } - } else if (item.getValueType() == JsonValue.ValueType.STRING) { - // The string might be enclosed with double quotes - // Check if string has double quotes and strip trailing whitespaces - String label = item.toString().replaceAll("^\"|\"$", ""); - labels.add(label.strip()); - } - } - return String.join(", ", labels); - } else if (json.getValueType() == JsonValue.ValueType.OBJECT) { - JsonObject jsonObject = json.asJsonObject(); - // Entity Reference will have a displayName - Set keys = jsonObject.asJsonObject().keySet(); - if (keys.contains(FIELD_DISPLAY_NAME)) { - return jsonObject.asJsonObject().getString(FIELD_DISPLAY_NAME); - } else if (keys.contains(FIELD_NAME)) { - return jsonObject.asJsonObject().getString(FIELD_NAME); - } - } - } catch (JsonParsingException ex) { - // If unable to parse json, just return the string - } - return fieldValue.toString(); - } - ////// used in alerts rule evaluator/// public static Set getUpdatedField(ChangeEvent event) { Set fields = new HashSet<>(); @@ -151,33 +95,21 @@ public class FormatterUtil { public static String transformMessage( MessageDecorator messageFormatter, + Thread thread, FieldChange fieldChange, - EntityInterface entity, CHANGE_TYPE changeType) { - MessageParser.EntityLink link = getEntityLink(fieldChange.getName(), entity); - String arrayFieldName = link.getArrayFieldName(); + MessageParser.EntityLink link = getEntityLinkForFieldName(fieldChange.getName(), thread); String arrayFieldValue = link.getArrayFieldValue(); - - String message; - String updatedField = fieldChange.getName(); - if (arrayFieldValue != null) { - updatedField = format("%s.%s", arrayFieldName, arrayFieldValue); - } else if (arrayFieldName != null) { - updatedField = format("%s.%s", fieldChange.getName(), arrayFieldName); - } - - String oldField = getFieldValue(fieldChange.getOldValue()); - String newField = getFieldValue(fieldChange.getNewValue()); + String updateField = getFieldNameChange(fieldChange.getName(), thread); DefaultFieldFormatter fieldSpecificFormatter; if (nullOrEmpty(arrayFieldValue)) { fieldSpecificFormatter = - getFieldParserObject(messageFormatter, oldField, newField, updatedField, link); + getFieldParserObject(messageFormatter, thread, fieldChange, updateField); } else { fieldSpecificFormatter = - getFieldParserObject(messageFormatter, oldField, newField, arrayFieldValue, link); + getFieldParserObject(messageFormatter, thread, fieldChange, arrayFieldValue); } - message = fieldSpecificFormatter.getFormattedMessage(changeType); - return message; + return fieldSpecificFormatter.getFormattedMessage(changeType); } public enum CHANGE_TYPE { @@ -186,15 +118,13 @@ public class FormatterUtil { DELETE } - public static Map getFormattedMessages( - MessageDecorator messageFormatter, - ChangeDescription changeDescription, - EntityInterface entity) { + public static List getFormattedMessages( + MessageDecorator messageFormatter, Thread thread, ChangeDescription changeDescription) { // Store a map of entityLink -> message List fieldsUpdated = changeDescription.getFieldsUpdated(); - Map messages = + List messages = getFormattedMessagesForAllFieldChange( - messageFormatter, entity, fieldsUpdated, CHANGE_TYPE.UPDATE); + messageFormatter, thread, fieldsUpdated, CHANGE_TYPE.UPDATE); // fieldsAdded and fieldsDeleted need special handling since // there is a possibility to merge them as one update message. @@ -202,13 +132,13 @@ public class FormatterUtil { List fieldsDeleted = changeDescription.getFieldsDeleted(); if (fieldsAdded.isEmpty() || fieldsDeleted.isEmpty()) { if (!fieldsAdded.isEmpty()) { - messages.putAll( + messages.addAll( getFormattedMessagesForAllFieldChange( - messageFormatter, entity, fieldsAdded, CHANGE_TYPE.ADD)); + messageFormatter, thread, fieldsAdded, CHANGE_TYPE.ADD)); } else if (!fieldsDeleted.isEmpty()) { - messages.putAll( + messages.addAll( getFormattedMessagesForAllFieldChange( - messageFormatter, entity, fieldsDeleted, CHANGE_TYPE.DELETE)); + messageFormatter, thread, fieldsDeleted, CHANGE_TYPE.DELETE)); } return messages; } @@ -217,61 +147,59 @@ public class FormatterUtil { fieldsAdded.stream().filter(f -> f.getName().equals(field.getName())).findAny(); if (addedField.isPresent()) { String fieldName = field.getName(); - MessageParser.EntityLink link = FormatterUtil.getEntityLink(fieldName, entity); + MessageParser.EntityLink link = FormatterUtil.getEntityLinkForFieldName(fieldName, thread); // convert the added field and deleted field into one update message + Thread tempThread = JsonUtils.deepCopy(thread, Thread.class); String message = ParserFactory.getEntityParser(link.getEntityType()) .format( messageFormatter, + tempThread, new FieldChange() .withName(fieldName) .withOldValue(field.getOldValue()) .withNewValue(addedField.get().getNewValue()), - entity, CHANGE_TYPE.UPDATE); - messages.put(link, message); + tempThread.withMessage(message); + messages.add(tempThread); // Remove the field from addedFields list to avoid double processing fieldsAdded = fieldsAdded.stream().filter(f -> !f.equals(addedField.get())).toList(); } else { // process the deleted field - messages.putAll( + messages.addAll( getFormattedMessagesForAllFieldChange( - messageFormatter, entity, Collections.singletonList(field), CHANGE_TYPE.DELETE)); + messageFormatter, thread, Collections.singletonList(field), CHANGE_TYPE.DELETE)); } } // process the remaining added fields if (!fieldsAdded.isEmpty()) { - messages.putAll( + messages.addAll( getFormattedMessagesForAllFieldChange( - messageFormatter, entity, fieldsAdded, CHANGE_TYPE.ADD)); + messageFormatter, thread, fieldsAdded, CHANGE_TYPE.ADD)); } return messages; } - private static Map getFormattedMessagesForAllFieldChange( + private static List getFormattedMessagesForAllFieldChange( MessageDecorator messageFormatter, - EntityInterface entity, + Thread thread, List fields, CHANGE_TYPE changeType) { - Map messages = new HashMap<>(); + List threads = new ArrayList<>(); for (FieldChange field : fields) { - MessageParser.EntityLink link = FormatterUtil.getEntityLink(field.getName(), entity); + Thread tempEntity = JsonUtils.deepCopy(thread, Thread.class).withId(UUID.randomUUID()); + // We are creating multiple thread on the same entity based on different messages String message = - ParserFactory.getEntityParser(link.getEntityType()) - .format(messageFormatter, field, entity, changeType); - messages.put(link, message); + ParserFactory.getEntityParser(thread.getEntityType()) + .format(messageFormatter, tempEntity, field, changeType); + tempEntity.withMessage(message); + threads.add(tempEntity); } - return messages; + return threads; } public static Optional getChangeEventFromResponseContext( - ContainerResponseContext responseContext, String updateBy, String method) { - // GET operations don't produce change events , Response has no entity to produce change event - // from - if (method.equals("GET") || responseContext.getEntity() == null) { - return Optional.empty(); - } - + ContainerResponseContext responseContext, String updateBy) { Optional eventType = getEventTypeFromResponse(responseContext); if (eventType.isEmpty() || !responseContext.hasEntity()) { return Optional.empty(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java index d6a1b75d0fb..afae8213813 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java @@ -21,6 +21,7 @@ import static org.openmetadata.schema.type.EventType.ENTITY_FIELDS_CHANGED; import static org.openmetadata.schema.type.EventType.ENTITY_NO_CHANGE; import static org.openmetadata.schema.type.EventType.ENTITY_RESTORED; import static org.openmetadata.schema.type.EventType.ENTITY_SOFT_DELETED; +import static org.openmetadata.schema.type.EventType.ENTITY_UPDATED; import static org.openmetadata.schema.type.Include.ALL; import static org.openmetadata.schema.type.Include.DELETED; import static org.openmetadata.schema.type.Include.NON_DELETED; @@ -120,6 +121,7 @@ import org.openmetadata.schema.type.Column; import org.openmetadata.schema.type.EntityHistory; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.EventType; +import org.openmetadata.schema.type.FieldChange; import org.openmetadata.schema.type.Include; import org.openmetadata.schema.type.LifeCycle; import org.openmetadata.schema.type.ProviderType; @@ -1836,9 +1838,50 @@ public abstract class EntityRepository { } result.withSuccessRequest(success); + + // Create a Change Event on successful addition/removal of assets + if (result.getStatus().equals(ApiStatus.SUCCESS)) { + EntityInterface entityInterface = Entity.getEntity(fromEntity, entityId, "id", ALL); + ChangeDescription change = + addBulkAddRemoveChangeDescription( + entityInterface.getVersion(), isAdd, request.getAssets(), null); + ChangeEvent changeEvent = + getChangeEvent(entityInterface, change, fromEntity, entityInterface.getVersion()); + Entity.getCollectionDAO().changeEventDAO().insert(JsonUtils.pojoToJson(changeEvent)); + } + return result; } + private ChangeDescription addBulkAddRemoveChangeDescription( + Double version, boolean isAdd, Object newValue, Object oldValue) { + FieldChange fieldChange = + new FieldChange().withName("assets").withNewValue(newValue).withOldValue(oldValue); + ChangeDescription change = new ChangeDescription().withPreviousVersion(version); + if (isAdd) { + change.getFieldsAdded().add(fieldChange); + } else { + change.getFieldsDeleted().add(fieldChange); + } + return change; + } + + private ChangeEvent getChangeEvent( + EntityInterface updated, ChangeDescription change, String entityType, Double prevVersion) { + return new ChangeEvent() + .withId(UUID.randomUUID()) + .withEntity(updated) + .withChangeDescription(change) + .withEventType(ENTITY_UPDATED) + .withEntityType(entityType) + .withEntityId(updated.getId()) + .withEntityFullyQualifiedName(updated.getFullyQualifiedName()) + .withUserName(updated.getUpdatedBy()) + .withTimestamp(System.currentTimeMillis()) + .withCurrentVersion(updated.getVersion()) + .withPreviousVersion(prevVersion); + } + /** Remove owner relationship for a given entity */ @Transaction private void removeOwner(T entity, EntityReference owner) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EventSubscriptionRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EventSubscriptionRepository.java index 389b77c9093..5e1c42d1d21 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EventSubscriptionRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EventSubscriptionRepository.java @@ -50,9 +50,7 @@ public class EventSubscriptionRepository extends EntityRepository destinations = new ArrayList<>(); entity .getDestinations() @@ -135,13 +133,16 @@ public class EventSubscriptionRepository extends EntityRepository eventSubList = JsonUtils.readObjects(listAllEventsSubscriptions, EventSubscription.class); for (EventSubscription subscription : eventSubList) { - if (subscription.getAlertType() != ACTIVITY_FEED) { - EventSubscriptionScheduler.getInstance().addSubscriptionPublisher(subscription); - } + EventSubscriptionScheduler.getInstance().addSubscriptionPublisher(subscription); } } catch (Exception ex) { // Starting application should not fail diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/feeds/FeedResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/feeds/FeedResource.java index 976799e1d0f..07457173031 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/feeds/FeedResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/feeds/FeedResource.java @@ -592,7 +592,9 @@ public class FeedResource { .withAnnouncement(create.getAnnouncementDetails()) .withChatbot(create.getChatbotDetails()) .withUpdatedBy(securityContext.getUserPrincipal().getName()) - .withUpdatedAt(System.currentTimeMillis()); + .withUpdatedAt(System.currentTimeMillis()) + .withEntityType(Entity.THREAD) + .withGeneratedBy(Thread.GeneratedBy.USER); } private Post getPost(CreatePost create) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/FeedUtils.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/FeedUtils.java index c013fde2a7e..bbf230f3486 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/util/FeedUtils.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/FeedUtils.java @@ -16,15 +16,12 @@ package org.openmetadata.service.util; import static org.openmetadata.service.events.subscription.AlertsRuleEvaluator.getEntity; import static org.openmetadata.service.formatter.util.FormatterUtil.getFormattedMessages; -import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.UUID; import lombok.extern.slf4j.Slf4j; import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.entity.feed.Thread; -import org.openmetadata.schema.type.ChangeDescription; import org.openmetadata.schema.type.ChangeEvent; import org.openmetadata.service.Entity; import org.openmetadata.service.formatter.decorators.MessageDecorator; @@ -35,18 +32,17 @@ public final class FeedUtils { private FeedUtils() {} public static List getThreadWithMessage( - MessageDecorator messageDecorator, ChangeEvent changeEvent, String loggedInUserName) { + MessageDecorator messageDecorator, ChangeEvent changeEvent) { if (changeEvent == null || changeEvent.getEntity() == null) { return Collections.emptyList(); // Response has no entity to produce change event from } // Change Event is of Thread or Data Assets if (changeEvent.getEntityType().equals(Entity.THREAD)) { - // TODO: + // Thread type create from FeedRepository return Collections.emptyList(); - // return List.of(AlertsRuleEvaluator.getThread(changeEvent)); } else if (Entity.getEntityList().contains(changeEvent.getEntityType())) { - return populateMessageForDataAssets(messageDecorator, changeEvent, loggedInUserName); + return populateMessageForDataAssets(messageDecorator, changeEvent); } else { LOG.error( "Invalid Entity Type: {}, Currently Change Events are expected as Thread or Data Assets", @@ -55,18 +51,20 @@ public final class FeedUtils { } } - private static List populateMessageForThread( - ChangeEvent changeEvent, String loggedInUserName) { - return null; - } - private static List populateMessageForDataAssets( - MessageDecorator messageDecorator, ChangeEvent changeEvent, String loggedInUserName) { + MessageDecorator messageDecorator, ChangeEvent changeEvent) { String message; EntityInterface entityInterface = getEntity(changeEvent); MessageParser.EntityLink about = new MessageParser.EntityLink( changeEvent.getEntityType(), entityInterface.getFullyQualifiedName(), null, null, null); + Thread thread = + getThread( + messageDecorator, + entityInterface, + about.getLinkString(), + changeEvent.getEntityType(), + changeEvent.getUserName()); // In Case EventType is not valid return switch (changeEvent.getEventType()) { case ENTITY_CREATED -> { @@ -74,71 +72,57 @@ public final class FeedUtils { String.format( "Created **%s**: `%s`", changeEvent.getEntityType(), entityInterface.getFullyQualifiedName()); - yield List.of(getThread(about.getLinkString(), message, loggedInUserName)); + yield List.of(thread.withMessage(message)); } - case ENTITY_UPDATED -> getThreadWithMessage( - messageDecorator, entityInterface, changeEvent.getChangeDescription(), loggedInUserName); + case ENTITY_UPDATED -> getFormattedMessages( + messageDecorator, thread, changeEvent.getChangeDescription()); case ENTITY_SOFT_DELETED -> { message = String.format( "Soft deleted **%s**: `%s`", changeEvent.getEntityType(), entityInterface.getFullyQualifiedName()); - yield List.of(getThread(about.getLinkString(), message, loggedInUserName)); + yield List.of(thread.withMessage(message)); } case ENTITY_DELETED -> { message = String.format( "Permanently Deleted **%s**: `%s`", changeEvent.getEntityType(), entityInterface.getFullyQualifiedName()); - yield List.of(getThread(about.getLinkString(), message, loggedInUserName)); + yield List.of(thread.withMessage(message)); } case LOGICAL_TEST_CASE_ADDED -> { message = String.format( "Added Logical Test Cases to **%s**: `%s`", changeEvent.getEntityType(), entityInterface.getFullyQualifiedName()); - yield List.of(getThread(about.getLinkString(), message, loggedInUserName)); + yield List.of(thread.withMessage(message)); } default -> { - if (entityInterface.getChangeDescription() == null) { + if (changeEvent.getChangeDescription() == null) { yield Collections.emptyList(); } - yield getThreadWithMessage( - messageDecorator, - entityInterface, - entityInterface.getChangeDescription(), - loggedInUserName); + yield getFormattedMessages(messageDecorator, thread, changeEvent.getChangeDescription()); } }; } - private static List getThreadWithMessage( - MessageDecorator messageDecorator, - EntityInterface entity, - ChangeDescription changeDescription, + public static Thread getThread( + MessageDecorator decorator, + EntityInterface entityInterface, + String linkString, + String entityType, String loggedInUserName) { - List threads = new ArrayList<>(); - - Map messages = - getFormattedMessages(messageDecorator, changeDescription, entity); - - // Create an automated thread - for (Map.Entry entry : messages.entrySet()) { - threads.add(getThread(entry.getKey().getLinkString(), entry.getValue(), loggedInUserName)); - } - - return threads; - } - - private static Thread getThread(String linkString, String message, String loggedInUserName) { return new Thread() .withId(UUID.randomUUID()) .withThreadTs(System.currentTimeMillis()) .withCreatedBy(loggedInUserName) .withAbout(linkString) + .withEntityId(entityInterface.getId()) + .withEntityType(entityType) .withReactions(Collections.emptyList()) .withUpdatedBy(loggedInUserName) .withUpdatedAt(System.currentTimeMillis()) - .withMessage(message); + .withGeneratedBy(Thread.GeneratedBy.SYSTEM) + .withEntityUrlLink(decorator.buildEntityUrl(entityType, entityInterface)); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/JsonUtils.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/JsonUtils.java index 33baa86e3b3..f69f4acac7e 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/util/JsonUtils.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/JsonUtils.java @@ -126,6 +126,14 @@ public final class JsonUtils { } } + public static List readOrConvertValues(Object obj, Class clz) { + if (obj instanceof String str) { + return readObjects(str, clz); + } else { + return convertObjects(obj, clz); + } + } + public static T readValue(String json, String clazzName) { try { return (T) readValue(json, Class.forName(clazzName)); @@ -156,6 +164,15 @@ public final class JsonUtils { } } + /** Convert an array of objects of type {@code T} from json */ + public static List convertObjects(Object json, Class clz) { + if (json == null) { + return Collections.emptyList(); + } + TypeFactory typeFactory = OBJECT_MAPPER.getTypeFactory(); + return OBJECT_MAPPER.convertValue(json, typeFactory.constructCollectionType(List.class, clz)); + } + /** Read an array of objects of type {@code T} from json */ public static List readObjects(String json, Class clz) { if (json == null) { diff --git a/openmetadata-service/src/main/resources/json/data/eventsubscription/ActivityFeedEvents.json b/openmetadata-service/src/main/resources/json/data/eventsubscription/ActivityFeedEvents.json index 1abf827e266..df421c0be80 100644 --- a/openmetadata-service/src/main/resources/json/data/eventsubscription/ActivityFeedEvents.json +++ b/openmetadata-service/src/main/resources/json/data/eventsubscription/ActivityFeedEvents.json @@ -10,7 +10,7 @@ { "name": "matchAnySource", "effect": "include", - "condition": "matchAnySource({'chart','container','dashboard','dashboardDataModel','database','databaseSchema','glossary','glossaryTerm','mlmodel','pipeline','query','report','searchIndex','storedProcedure','table','topic','testSuite','testCase'})" + "condition": "matchAnySource({'chart','container','dashboard','dashboardDataModel','database','databaseSchema','glossary','glossaryTerm','mlmodel','pipeline','query','report','searchIndex','storedProcedure','table','topic','testSuite','testCase', 'domain', 'dataProduct'})" }, { "name": "matchUpdatedBy", @@ -19,8 +19,8 @@ }, { "name": "matchAnyFieldChange", - "effect": "exclude", - "condition": "matchAnyFieldChange({'usageSummary', 'sourceHash', 'lifeCycle'})" + "effect": "include", + "condition": "matchAnyFieldChange({'description', 'domain', 'owner', 'tags', 'followers', 'extension','parameterValues', 'assets'})" } ] }, @@ -57,7 +57,9 @@ "table", "topic", "testSuite", - "testCase" + "testCase", + "domain", + "dataProduct" ] } ] @@ -76,14 +78,18 @@ }, { "name": "filterByFieldChange", - "effect" : "exclude", + "effect" : "include", "arguments": [ { "name": "fieldChangeList", "input": [ - "usageSummary", - "sourceHash", - "lifeCycle" + "description", + "domain", + "owner", + "tags", + "followers", + "extension", + "parameterValues" ] } ] @@ -91,5 +97,6 @@ ] }, "provider" : "system", + "pollInterval" : 30, "enabled" : true } \ No newline at end of file diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/ChangeEventParserResourceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/ChangeEventParserResourceTest.java index 4fe65214339..89957792d2d 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/ChangeEventParserResourceTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/ChangeEventParserResourceTest.java @@ -15,15 +15,14 @@ package org.openmetadata.service.resources; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.openmetadata.service.Entity.FIELD_OWNER; -import static org.openmetadata.service.formatter.util.FormatterUtil.getFormattedMessages; import static org.openmetadata.service.util.EntityUtil.fieldAdded; import static org.openmetadata.service.util.EntityUtil.fieldDeleted; import static org.openmetadata.service.util.EntityUtil.fieldUpdated; +import static org.openmetadata.service.util.FeedUtils.getThreadWithMessage; import java.io.IOException; import java.net.URISyntaxException; import java.util.List; -import java.util.Map; import java.util.UUID; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.BeforeAll; @@ -34,18 +33,21 @@ import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.TestInstance.Lifecycle; import org.junit.jupiter.api.TestMethodOrder; import org.openmetadata.schema.entity.data.Table; +import org.openmetadata.schema.entity.feed.Thread; import org.openmetadata.schema.type.ChangeDescription; +import org.openmetadata.schema.type.ChangeEvent; import org.openmetadata.schema.type.EntityReference; +import org.openmetadata.schema.type.EventType; import org.openmetadata.schema.type.FieldChange; import org.openmetadata.schema.type.TagLabel; import org.openmetadata.schema.type.TagLabel.LabelType; import org.openmetadata.schema.type.TagLabel.State; +import org.openmetadata.service.Entity; import org.openmetadata.service.OpenMetadataApplicationTest; import org.openmetadata.service.formatter.decorators.FeedMessageDecorator; import org.openmetadata.service.formatter.decorators.MessageDecorator; import org.openmetadata.service.formatter.decorators.SlackMessageDecorator; import org.openmetadata.service.resources.databases.TableResourceTest; -import org.openmetadata.service.resources.feeds.MessageParser.EntityLink; import org.openmetadata.service.util.JsonUtils; @Slf4j @@ -66,20 +68,27 @@ class ChangeEventParserResourceTest extends OpenMetadataApplicationTest { @Test void testFormattedMessages() { + TagLabel oldTag1 = new TagLabel(); + oldTag1.withTagFQN("tag1").withLabelType(LabelType.DERIVED).withState(State.CONFIRMED); + + TagLabel oldTag2 = new TagLabel(); + oldTag2.withTagFQN("tag2").withLabelType(LabelType.DERIVED).withState(State.CONFIRMED); + ChangeDescription changeDescription = new ChangeDescription(); // Simulate updating tags of an entity from tag1 -> tag2 FieldChange addTag = new FieldChange(); - addTag.withName("tags").withNewValue("tag2"); + addTag.withName("tags").withNewValue(JsonUtils.pojoToJson(List.of(oldTag2))); FieldChange deleteTag = new FieldChange(); - deleteTag.withName("tags").withOldValue("tag1"); + deleteTag.withName("tags").withOldValue(JsonUtils.pojoToJson(List.of(oldTag1))); changeDescription .withFieldsAdded(List.of(addTag)) .withFieldsDeleted(List.of(deleteTag)) .withPreviousVersion(1.0); - Map messages = - getFormattedMessages(feedMessageFormatter, changeDescription, TABLE); - assertEquals(1, messages.size()); + ChangeEvent changeEvent = getChangeEvent(EventType.ENTITY_UPDATED, changeDescription, 1.0, 1.1); + + List threadWithMessages = getThreadWithMessage(feedMessageFormatter, changeEvent); + assertEquals(1, threadWithMessages.size()); TagLabel tag1 = new TagLabel(); tag1.withTagFQN("tag1").withLabelType(LabelType.DERIVED).withState(State.CONFIRMED); @@ -90,12 +99,30 @@ class ChangeEventParserResourceTest extends OpenMetadataApplicationTest { addTag.withNewValue(JsonUtils.pojoToJson(List.of(tag2))); deleteTag.withOldValue(JsonUtils.pojoToJson(List.of(tag1))); - Map jsonMessages = - getFormattedMessages(feedMessageFormatter, changeDescription, TABLE); - assertEquals(1, jsonMessages.size()); + List updatedThreadWithMessages = + getThreadWithMessage(feedMessageFormatter, changeEvent); + assertEquals(1, updatedThreadWithMessages.size()); // The entity links and values of both the messages should be the same - assertEquals(messages.values().iterator().next(), jsonMessages.values().iterator().next()); + assertEquals( + threadWithMessages.get(0).getMessage(), updatedThreadWithMessages.get(0).getMessage()); + } + + private ChangeEvent getChangeEvent( + EventType eventType, + ChangeDescription changeDescription, + Double previousVersion, + Double newVersion) { + return new ChangeEvent() + .withId(UUID.randomUUID()) + .withEventType(eventType) + .withEntityId(TABLE.getId()) + .withEntityType(Entity.TABLE) + .withEntityFullyQualifiedName(TABLE.getFullyQualifiedName()) + .withChangeDescription(changeDescription) + .withPreviousVersion(previousVersion) + .withCurrentVersion(newVersion) + .withEntity(TABLE); } @Test @@ -106,13 +133,14 @@ class ChangeEventParserResourceTest extends OpenMetadataApplicationTest { entityReference.withId(UUID.randomUUID()).withName("user1").withDisplayName("User One"); fieldAdded(changeDescription, FIELD_OWNER, JsonUtils.pojoToJson(entityReference)); - Map messages = - getFormattedMessages(feedMessageFormatter, changeDescription, TABLE); - assertEquals(1, messages.size()); + ChangeEvent changeEvent = getChangeEvent(EventType.ENTITY_UPDATED, changeDescription, 1.0, 1.1); + + List threadWithMessages = getThreadWithMessage(feedMessageFormatter, changeEvent); + assertEquals(1, threadWithMessages.size()); assertEquals( "Added **owner**: User One", - messages.values().iterator().next()); + threadWithMessages.get(0).getMessage()); } @Test @@ -121,14 +149,14 @@ class ChangeEventParserResourceTest extends OpenMetadataApplicationTest { ChangeDescription changeDescription = new ChangeDescription(); fieldUpdated(changeDescription, "description", "old description", "new description"); - Map messages = - getFormattedMessages(feedMessageFormatter, changeDescription, TABLE); - assertEquals(1, messages.size()); + ChangeEvent changeEvent = getChangeEvent(EventType.ENTITY_UPDATED, changeDescription, 1.0, 1.1); + List threadMessages = getThreadWithMessage(feedMessageFormatter, changeEvent); + assertEquals(1, threadMessages.size()); assertEquals( "Updated **description**: old " + "new description", - messages.values().iterator().next()); + threadMessages.get(0).getMessage()); // test if it updates correctly with one add and one delete change changeDescription = new ChangeDescription().withPreviousVersion(1.0); @@ -136,12 +164,12 @@ class ChangeEventParserResourceTest extends OpenMetadataApplicationTest { fieldDeleted(changeDescription, "description", "old description"); // now test if both the type of updates give the same message - Map updatedMessages = - getFormattedMessages(feedMessageFormatter, changeDescription, TABLE); - assertEquals(1, updatedMessages.size()); + changeEvent = getChangeEvent(EventType.ENTITY_UPDATED, changeDescription, 1.0, 1.1); + List threadUpdatedMessages = getThreadWithMessage(feedMessageFormatter, changeEvent); + assertEquals(1, threadUpdatedMessages.size()); - assertEquals(messages.keySet().iterator().next(), updatedMessages.keySet().iterator().next()); - assertEquals(messages.values().iterator().next(), updatedMessages.values().iterator().next()); + assertEquals(threadMessages.get(0).getMessage(), threadUpdatedMessages.get(0).getMessage()); + assertEquals(threadMessages.get(0).getAbout(), threadUpdatedMessages.get(0).getAbout()); } @Test @@ -150,25 +178,29 @@ class ChangeEventParserResourceTest extends OpenMetadataApplicationTest { // Simulate a change of description in table fieldUpdated(changeDescription, "description", "old description", "new description"); - Map messages = - getFormattedMessages(slackMessageFormatter, changeDescription, TABLE); - assertEquals(1, messages.size()); + ChangeEvent changeEvent = getChangeEvent(EventType.ENTITY_UPDATED, changeDescription, 1.0, 1.1); + + List threadsWithMessages = getThreadWithMessage(slackMessageFormatter, changeEvent); + assertEquals(1, threadsWithMessages.size()); assertEquals( - "Updated *description*: ~old~ *new* description", messages.values().iterator().next()); + "Updated *description*: ~old~ *new* description", threadsWithMessages.get(0).getMessage()); // test if it updates correctly with one add and one delete change changeDescription = new ChangeDescription().withPreviousVersion(1.0); fieldAdded(changeDescription, "description", "new description"); fieldDeleted(changeDescription, "description", "old description"); + changeEvent = getChangeEvent(EventType.ENTITY_UPDATED, changeDescription, 1.0, 1.1); + List threadsWithUpdatedMessages = + getThreadWithMessage(slackMessageFormatter, changeEvent); // now test if both the type of updates give the same message - Map updatedMessages = - getFormattedMessages(slackMessageFormatter, changeDescription, TABLE); - assertEquals(1, updatedMessages.size()); + assertEquals(1, threadsWithUpdatedMessages.size()); - assertEquals(messages.keySet().iterator().next(), updatedMessages.keySet().iterator().next()); - assertEquals(messages.values().iterator().next(), updatedMessages.values().iterator().next()); + assertEquals( + threadsWithMessages.get(0).getAbout(), threadsWithUpdatedMessages.get(0).getAbout()); + assertEquals( + threadsWithMessages.get(0).getMessage(), threadsWithUpdatedMessages.get(0).getMessage()); } @Test @@ -185,13 +217,14 @@ class ChangeEventParserResourceTest extends OpenMetadataApplicationTest { "columns", "[{\"name\":\"lo_order\",\"displayName\":\"lo_order\",\"dataType\":\"INT\",\"dataLength\":1,\"dataTypeDisplay\":\"int\",\"fullyQualifiedName\":\"local_mysql.sample_db.lineorder.lo_order\",\"constraint\":\"NOT_NULL\"}]"); - Map messages = - getFormattedMessages(feedMessageFormatter, changeDescription, TABLE); - assertEquals(1, messages.size()); + ChangeEvent changeEvent = getChangeEvent(EventType.ENTITY_UPDATED, changeDescription, 1.3, 1.4); + + List threadWithMessages = getThreadWithMessage(feedMessageFormatter, changeEvent); + assertEquals(1, threadWithMessages.size()); assertEquals( "Updated **columns**: lo_order priority", - messages.values().iterator().next()); + threadWithMessages.get(0).getMessage()); // Simulate a change of datatype change in column changeDescription = new ChangeDescription().withPreviousVersion(1.3); @@ -204,10 +237,11 @@ class ChangeEventParserResourceTest extends OpenMetadataApplicationTest { "columns", "[{\"name\":\"lo_orderpriority\",\"displayName\":\"lo_orderpriority\",\"dataType\":\"BLOB\",\"dataLength\":1,\"dataTypeDisplay\":\"blob\",\"fullyQualifiedName\":\"local_mysql.sample_db.lineorder.lo_orderpriority\",\"tags\":[],\"constraint\":\"NOT_NULL\"}]"); - messages = getFormattedMessages(feedMessageFormatter, changeDescription, TABLE); - assertEquals(1, messages.size()); + changeEvent = getChangeEvent(EventType.ENTITY_UPDATED, changeDescription, 1.3, 1.4); + threadWithMessages = getThreadWithMessage(feedMessageFormatter, changeEvent); + assertEquals(1, threadWithMessages.size()); - assertEquals("Updated **columns**: lo_orderpriority", messages.values().iterator().next()); + assertEquals("Updated **columns**: lo_orderpriority", threadWithMessages.get(0).getMessage()); // Simulate multiple changes to columns changeDescription = new ChangeDescription().withPreviousVersion(1.4); @@ -220,12 +254,13 @@ class ChangeEventParserResourceTest extends OpenMetadataApplicationTest { "columns", "[{\"name\":\"lo_orderpriority\",\"displayName\":\"lo_orderpriority\",\"dataType\":\"BLOB\",\"dataLength\":1,\"dataTypeDisplay\":\"blob\",\"fullyQualifiedName\":\"local_mysql.sample_db.lineorder.lo_orderpriority\"}]"); - messages = getFormattedMessages(feedMessageFormatter, changeDescription, TABLE); - assertEquals(1, messages.size()); + changeEvent = getChangeEvent(EventType.ENTITY_UPDATED, changeDescription, 1.3, 1.4); + threadWithMessages = getThreadWithMessage(feedMessageFormatter, changeEvent); + assertEquals(1, threadWithMessages.size()); assertEquals( "Updated **columns**: lo_orderpriority , newColumn", - messages.values().iterator().next()); + threadWithMessages.get(0).getMessage()); } @Test @@ -242,11 +277,11 @@ class ChangeEventParserResourceTest extends OpenMetadataApplicationTest { "columns", "[{\"name\":\"lo_order\",\"displayName\":\"lo_order\",\"dataType\":\"INT\",\"dataLength\":1,\"dataTypeDisplay\":\"int\",\"fullyQualifiedName\":\"local_mysql.sample_db.lineorder.lo_order\",\"constraint\":\"NOT_NULL\"}]"); - Map messages = - getFormattedMessages(slackMessageFormatter, changeDescription, TABLE); - assertEquals(1, messages.size()); + ChangeEvent changeEvent = getChangeEvent(EventType.ENTITY_UPDATED, changeDescription, 1.3, 1.4); + List threadWithMessages = getThreadWithMessage(slackMessageFormatter, changeEvent); + assertEquals(1, threadWithMessages.size()); - assertEquals("Updated *columns*: lo_order *priority*", messages.values().iterator().next()); + assertEquals("Updated *columns*: lo_order *priority*", threadWithMessages.get(0).getMessage()); // Simulate a change of datatype change in column changeDescription = new ChangeDescription().withPreviousVersion(1.3); @@ -259,10 +294,11 @@ class ChangeEventParserResourceTest extends OpenMetadataApplicationTest { "columns", "[{\"name\":\"lo_orderpriority\",\"displayName\":\"lo_orderpriority\",\"dataType\":\"BLOB\",\"dataLength\":1,\"dataTypeDisplay\":\"blob\",\"fullyQualifiedName\":\"local_mysql.sample_db.lineorder.lo_orderpriority\",\"tags\":[],\"constraint\":\"NOT_NULL\"}]"); - messages = getFormattedMessages(slackMessageFormatter, changeDescription, TABLE); - assertEquals(1, messages.size()); + changeEvent = getChangeEvent(EventType.ENTITY_UPDATED, changeDescription, 1.3, 1.4); + threadWithMessages = getThreadWithMessage(slackMessageFormatter, changeEvent); + assertEquals(1, threadWithMessages.size()); - assertEquals("Updated *columns*: lo_orderpriority", messages.values().iterator().next()); + assertEquals("Updated *columns*: lo_orderpriority", threadWithMessages.get(0).getMessage()); // Simulate multiple changes to columns changeDescription = new ChangeDescription().withPreviousVersion(1.4); @@ -275,10 +311,12 @@ class ChangeEventParserResourceTest extends OpenMetadataApplicationTest { "columns", "[{\"name\":\"lo_orderpriority\",\"displayName\":\"lo_orderpriority\",\"dataType\":\"BLOB\",\"dataLength\":1,\"dataTypeDisplay\":\"blob\",\"fullyQualifiedName\":\"local_mysql.sample_db.lineorder.lo_orderpriority\"}]"); - messages = getFormattedMessages(slackMessageFormatter, changeDescription, TABLE); - assertEquals(1, messages.size()); + changeEvent = getChangeEvent(EventType.ENTITY_UPDATED, changeDescription, 1.3, 1.4); + threadWithMessages = getThreadWithMessage(slackMessageFormatter, changeEvent); + assertEquals(1, threadWithMessages.size()); assertEquals( - "Updated *columns*: lo_orderpriority *, newColumn*", messages.values().iterator().next()); + "Updated *columns*: lo_orderpriority *, newColumn*", + threadWithMessages.get(0).getMessage()); } } diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/EntityResourceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/EntityResourceTest.java index 9dd81c7ac13..6e6c414a5a7 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/EntityResourceTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/EntityResourceTest.java @@ -2980,7 +2980,7 @@ public abstract class EntityResourceTest { @@ -1230,6 +1249,11 @@ public class FeedResourceTest extends OpenMetadataApplicationTest { @Test @Order(2) void list_threadsWithMentionsFilter() throws HttpResponseException { + EventSubscriptionResourceTest eventSubscriptionResourceTest = + new EventSubscriptionResourceTest(); + EventSubscription subscription = + eventSubscriptionResourceTest.getEntityByName("ActivityFeedAlert", ADMIN_AUTH_HEADERS); + eventSubscriptionResourceTest.waitForAllEventToComplete(subscription.getId()); // Create a thread with user mention createAndCheck( create() @@ -1254,6 +1278,11 @@ public class FeedResourceTest extends OpenMetadataApplicationTest { @Test void list_threadsWithFollowsFilter() throws HttpResponseException { + EventSubscriptionResourceTest eventSubscriptionResourceTest = + new EventSubscriptionResourceTest(); + EventSubscription subscription = + eventSubscriptionResourceTest.getEntityByName("ActivityFeedAlert", ADMIN_AUTH_HEADERS); + eventSubscriptionResourceTest.waitForAllEventToComplete(subscription.getId()); // Get the initial thread count of TABLE2 String entityLink = String.format("<#E::table::%s>", TABLE2.getFullyQualifiedName()); int initialThreadCount = @@ -1271,6 +1300,7 @@ public class FeedResourceTest extends OpenMetadataApplicationTest { // User started following this table with() .pollInterval(ONE_SECOND) + .timeout(ONE_MINUTE) .await("Threads With Follows") .until( () -> { diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/feed/assets.json b/openmetadata-spec/src/main/resources/json/schema/entity/feed/assets.json new file mode 100644 index 00000000000..e5b04fa0dba --- /dev/null +++ b/openmetadata-spec/src/main/resources/json/schema/entity/feed/assets.json @@ -0,0 +1,14 @@ +{ + "$id": "https://open-metadata.org/schema/entity/feed/assets.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "AssetsFeedInfo", + "javaType": "org.openmetadata.schema.entity.feed.AssetsFeedInfo", + "description": "This schema defines the schema for Assets addition/deletion Updates.", + "type": "object", + "properties": { + "updatedAssets": { + "$ref": "../../type/entityReferenceList.json" + } + }, + "additionalProperties": false +} diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/feed/description.json b/openmetadata-spec/src/main/resources/json/schema/entity/feed/description.json new file mode 100644 index 00000000000..6ca582e07f0 --- /dev/null +++ b/openmetadata-spec/src/main/resources/json/schema/entity/feed/description.json @@ -0,0 +1,23 @@ +{ + "$id": "https://open-metadata.org/schema/entity/feed/description.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "DescriptionFeedInfo", + "javaType": "org.openmetadata.schema.entity.feed.DescriptionFeedInfo", + "description": "This schema defines the schema for Description Updates.", + "type": "object", + "properties": { + "previousDescription": { + "type": "string", + "description": "The previous description of the entity." + }, + "newDescription": { + "description": "The new description of the entity.", + "type": "string" + }, + "diffMessage": { + "description": "The difference between the previous and new descriptions.", + "type": "string" + } + }, + "additionalProperties": false +} diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/feed/owner.json b/openmetadata-spec/src/main/resources/json/schema/entity/feed/owner.json new file mode 100644 index 00000000000..cb9ac79562d --- /dev/null +++ b/openmetadata-spec/src/main/resources/json/schema/entity/feed/owner.json @@ -0,0 +1,19 @@ +{ + "$id": "https://open-metadata.org/schema/entity/feed/owner.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "OwnerFeedInfo", + "description": "This schema defines the schema for Ownership Updates for Feed.", + "javaType": "org.openmetadata.schema.entity.feed.OwnerFeedInfo", + "type": "object", + "properties": { + "previousOwner": { + "description": "List of previous tags.", + "type": "string" + }, + "updatedOwner": { + "description": "Updated Owners.", + "type": "string" + } + }, + "additionalProperties": false +} diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/feed/tag.json b/openmetadata-spec/src/main/resources/json/schema/entity/feed/tag.json new file mode 100644 index 00000000000..30b88fe1176 --- /dev/null +++ b/openmetadata-spec/src/main/resources/json/schema/entity/feed/tag.json @@ -0,0 +1,25 @@ +{ + "$id": "https://open-metadata.org/schema/entity/feed/tag.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "TagFeedInfo", + "description": "This schema defines the schema for Tag Updates for Feed.", + "javaType": "org.openmetadata.schema.entity.feed.TagFeedInfo", + "type": "object", + "properties": { + "previousTags": { + "description": "List of previous tags.", + "type": "array", + "items": { + "$ref": "../../type/tagLabel.json" + } + }, + "updatedTags": { + "description": "List of updated tags.", + "type": "array", + "items": { + "$ref": "../../type/tagLabel.json" + } + } + }, + "additionalProperties": false +} diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/feed/testCaseResult.json b/openmetadata-spec/src/main/resources/json/schema/entity/feed/testCaseResult.json new file mode 100644 index 00000000000..ba692abf56f --- /dev/null +++ b/openmetadata-spec/src/main/resources/json/schema/entity/feed/testCaseResult.json @@ -0,0 +1,25 @@ +{ + "$id": "https://open-metadata.org/schema/entity/feed/testCaseResult.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "TestCaseResultFeedInfo", + "description": "This schema defines the schema for Test Case Result Updates for Feed.", + "javaType": "org.openmetadata.schema.entity.feed.TestCaseResultFeedInfo", + "type": "object", + "properties": { + "entityTestResultSummary": { + "description": "Summary of test case execution", + "type": "array", + "items": { + "$ref": "../../tests/testSuite.json#/definitions/testSuiteConnection/resultSummary" + } + }, + "testCaseResult": { + "description": "Test Case Result for last 7 days.", + "type": "array", + "items": { + "$ref": "../../tests/basic.json#/definitions/testCaseResult" + } + } + }, + "additionalProperties": false +} diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/feed/thread.json b/openmetadata-spec/src/main/resources/json/schema/entity/feed/thread.json index f78f6e5038c..a1bcc0a60e0 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/feed/thread.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/feed/thread.json @@ -4,7 +4,6 @@ "title": "Thread", "description": "This schema defines the Thread entity. A Thread is a collection of posts made by the users. The first post that starts a thread is **about** a data asset **from** a user. Other users can respond to this post by creating new posts in the thread. Note that bot users can also interact with a thread. A post can contains links that mention Users or other Data Assets.", "type": "object", - "definitions": { "taskType": { "javaType": "org.openmetadata.schema.type.TaskType", @@ -212,6 +211,63 @@ "description": "Entity Id of the entity in `about` that the thread belongs to.", "$ref": "../../type/basic.json#/definitions/uuid" }, + "entityType": { + "description": "Entity Type the thread is about.", + "type": "string" + }, + "entityUrlLink": { + "description": "Link to the entity in `about` that the thread belongs to.", + "type": "string" + }, + "generatedBy": { + "description": "User or team that generated the thread.", + "type": "string", + "enum": ["user", "system"], + "default": "user" + }, + "cardStyle": { + "description": "Card style for the thread.", + "type": "string", + "enum": ["default", "description", "tags", "owner", "testCaseResult", "customProperties"], + "default": "default" + }, + "fieldOperation": { + "description": "Operation on thread, whether the field was added, or updated or deleted.", + "type": "string", + "enum": ["added", "updated", "deleted"], + "default": "updated" + }, + "feedInfo": { + "description": "Entity Id of the entity in `about` that the thread belongs to.", + "type": "object", + "properties": { + "headerMessage": { + "description": "Header message for the feed.", + "type": "string" + }, + "fieldName": { + "description": "Field Name message for the feed.", + "type": "string" + }, + "entitySpecificInfo": { + "oneOf": [ + { + "$ref": "./description.json" + }, + { + "$ref": "./owner.json" + }, + { + "$ref": "./testCaseResult.json" + }, + { + "$ref": "./tag.json" + } + ] + } + }, + "additionalProperties": false + }, "addressedTo": { "description": "User or team this thread is addressed to in format <#E::{entities}::{entityName}::{field}::{fieldValue}.", "$ref": "../../type/basic.json#/definitions/entityLink"