Activity Feed Improvements (#15840)

* - Created Specific Schemas for different change Events
- Added Card Style for different Events
- Added Feed Info for runtime card creation and graph drawing in case test Case Result updates

* Review comment

* Test fix

* Revert unrequired changes

* Reverted changes

* Fixed Tests

* fix test

* Add Domain, Data Producs Asset addition/removal Uppdates

* Probale test case failure fix

* Followers Fields
This commit is contained in:
Mohit Yadav 2024-04-22 18:42:40 +05:30 committed by GitHub
parent c2193a963f
commit 27e9012a56
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
41 changed files with 1100 additions and 547 deletions

View File

@ -174,6 +174,8 @@ public class OpenMetadataApplication extends Application<OpenMetadataApplication
// Configure the Fernet instance
Fernet.getInstance().setFernetKey(catalogConfig);
initializeWebsockets(catalogConfig, environment);
// init Secret Manager
SecretsManagerFactory.createSecretsManager(
catalogConfig.getSecretsManagerConfiguration(), catalogConfig.getClusterName());
@ -233,7 +235,6 @@ public class OpenMetadataApplication extends Application<OpenMetadataApplication
registerMicrometerFilter(environment, catalogConfig.getEventMonitorConfiguration());
initializeWebsockets(catalogConfig, environment);
registerSamlServlets(catalogConfig, environment);
// Asset Servlet Registration

View File

@ -5,6 +5,7 @@ import static org.openmetadata.schema.api.events.CreateEventSubscription.AlertTy
import org.openmetadata.schema.entity.events.SubscriptionDestination;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.service.apps.bundles.changeEvent.email.EmailPublisher;
import org.openmetadata.service.apps.bundles.changeEvent.feed.ActivityFeedPublisher;
import org.openmetadata.service.apps.bundles.changeEvent.gchat.GChatPublisher;
import org.openmetadata.service.apps.bundles.changeEvent.generic.GenericPublisher;
import org.openmetadata.service.apps.bundles.changeEvent.msteams.MSTeamsPublisher;
@ -18,8 +19,7 @@ public class AlertFactory {
case G_CHAT -> new GChatPublisher(config);
case WEBHOOK -> new GenericPublisher(config);
case EMAIL -> new EmailPublisher(config);
case ACTIVITY_FEED -> throw new IllegalArgumentException(
"Cannot create Activity Feed as Publisher.");
case ACTIVITY_FEED -> new ActivityFeedPublisher(config);
};
}
}

View File

@ -0,0 +1,106 @@
/*
* Copyright 2021 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.service.apps.bundles.changeEvent.feed;
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
import static org.openmetadata.schema.entity.events.SubscriptionDestination.SubscriptionType.ACTIVITY_FEED;
import static org.openmetadata.schema.type.EventType.ENTITY_DELETED;
import static org.openmetadata.service.events.subscription.AlertsRuleEvaluator.getEntity;
import java.util.List;
import java.util.UUID;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.entity.events.SubscriptionDestination;
import org.openmetadata.schema.entity.feed.Thread;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.service.Entity;
import org.openmetadata.service.apps.bundles.changeEvent.Destination;
import org.openmetadata.service.events.errors.EventPublisherException;
import org.openmetadata.service.exception.CatalogExceptionMessage;
import org.openmetadata.service.formatter.decorators.FeedMessageDecorator;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.FeedRepository;
import org.openmetadata.service.socket.WebSocketManager;
import org.openmetadata.service.util.FeedUtils;
import org.openmetadata.service.util.JsonUtils;
@Slf4j
public class ActivityFeedPublisher implements Destination<ChangeEvent> {
private final FeedMessageDecorator feedMessageFormatter = new FeedMessageDecorator();
FeedRepository feedRepository = new FeedRepository();
@Getter private final SubscriptionDestination subscriptionDestination;
public ActivityFeedPublisher(SubscriptionDestination subscription) {
if (subscription.getType() == ACTIVITY_FEED) {
this.subscriptionDestination = subscription;
} else {
throw new IllegalArgumentException("Activity Alert Invoked with Illegal Type and Settings.");
}
}
@Override
public void sendMessage(ChangeEvent changeEvent) throws EventPublisherException {
try {
// Thread are created in FeedRepository Directly
if (!changeEvent.getEntityType().equals(Entity.THREAD)) {
for (Thread thread :
listOrEmpty(FeedUtils.getThreadWithMessage(feedMessageFormatter, changeEvent))) {
// Don't create a thread if there is no message
if (thread.getMessage() != null && !thread.getMessage().isEmpty()) {
feedRepository.create(thread, changeEvent);
WebSocketManager.getInstance()
.broadCastMessageToAll(
WebSocketManager.FEED_BROADCAST_CHANNEL, JsonUtils.pojoToJson(thread));
// Delete all conversations related to the entity
if (changeEvent.getEventType().equals(ENTITY_DELETED)) {
deleteAllConversationsRelatedToEntity(
getEntity(changeEvent), Entity.getCollectionDAO());
}
}
}
}
} catch (Exception ex) {
String message =
CatalogExceptionMessage.eventPublisherFailedToPublish(
ACTIVITY_FEED, changeEvent, ex.getMessage());
LOG.error(message);
throw new EventPublisherException(
message, Pair.of(subscriptionDestination.getId(), changeEvent));
}
}
private void deleteAllConversationsRelatedToEntity(
EntityInterface entityInterface, CollectionDAO collectionDAO) {
String entityId = entityInterface.getId().toString();
List<String> threadIds = collectionDAO.feedDAO().findByEntityId(entityId);
for (String threadId : threadIds) {
UUID id = UUID.fromString(threadId);
collectionDAO.relationshipDAO().deleteAll(id, Entity.THREAD);
collectionDAO.feedDAO().delete(id);
}
}
@Override
public boolean getEnabled() {
return subscriptionDestination.getEnabled();
}
public void close() {
LOG.info("Closing Activity Feed Publisher");
}
}

View File

@ -13,58 +13,45 @@
package org.openmetadata.service.events;
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
import static org.openmetadata.schema.type.EventType.ENTITY_DELETED;
import static org.openmetadata.service.events.subscription.AlertsRuleEvaluator.getEntity;
import static org.openmetadata.service.formatter.util.FormatterUtil.getChangeEventFromResponseContext;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerResponseContext;
import javax.ws.rs.core.SecurityContext;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.entity.feed.Thread;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.service.Entity;
import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.events.subscription.AlertUtil;
import org.openmetadata.service.formatter.decorators.FeedMessageDecorator;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.FeedRepository;
import org.openmetadata.service.socket.WebSocketManager;
import org.openmetadata.service.util.FeedUtils;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.WebsocketNotificationHandler;
@Slf4j
public class ChangeEventHandler implements EventHandler {
private ObjectMapper mapper;
private FeedMessageDecorator feedMessageDecorator = new FeedMessageDecorator();
private final FeedRepository feedRepository = new FeedRepository();
private final WebsocketNotificationHandler websocketNotificationHandler =
new WebsocketNotificationHandler();
public void init(OpenMetadataApplicationConfig config) {
this.mapper = new ObjectMapper();
}
public void init(OpenMetadataApplicationConfig config) {}
@SneakyThrows
public Void process(
ContainerRequestContext requestContext, ContainerResponseContext responseContext) {
// GET operations don't produce change events , Response has no entity to produce change event
// from
if (requestContext.getMethod().equals("GET") || responseContext.getEntity() == null) {
return null;
}
// Send to Notification Handler
websocketNotificationHandler.processNotifications(responseContext);
String method = requestContext.getMethod();
// Send to Change Event Table
SecurityContext securityContext = requestContext.getSecurityContext();
String loggedInUserName = securityContext.getUserPrincipal().getName();
try {
CollectionDAO collectionDAO = Entity.getCollectionDAO();
CollectionDAO.ChangeEventDAO changeEventDAO = collectionDAO.changeEventDAO();
Optional<ChangeEvent> optionalChangeEvent =
getChangeEventFromResponseContext(responseContext, loggedInUserName, method);
getChangeEventFromResponseContext(responseContext, loggedInUserName);
if (optionalChangeEvent.isPresent()) {
ChangeEvent changeEvent = optionalChangeEvent.get();
if (changeEvent.getEntityType().equals(Entity.QUERY)) {
@ -84,32 +71,14 @@ public class ChangeEventHandler implements EventHandler {
changeEvent.setEntity(JsonUtils.pojoToMaskedJson(entity));
}
changeEventDAO.insert(JsonUtils.pojoToJson(changeEvent));
// Add a new thread (change event itself should not be for the thread) to the entity for
// every change event
// for the event to appear in activity feeds
if (!changeEvent.getEntityType().equals(Entity.THREAD)
&& (AlertUtil.shouldProcessActivityFeedRequest(changeEvent))) {
for (Thread thread :
listOrEmpty(
FeedUtils.getThreadWithMessage(
feedMessageDecorator, changeEvent, loggedInUserName))) {
// Don't create a thread if there is no message
if (thread.getMessage() != null && !thread.getMessage().isEmpty()) {
feedRepository.create(thread, changeEvent);
String jsonThread = mapper.writeValueAsString(thread);
WebSocketManager.getInstance()
.broadCastMessageToAll(WebSocketManager.FEED_BROADCAST_CHANNEL, jsonThread);
if (changeEvent.getEventType().equals(ENTITY_DELETED)) {
deleteAllConversationsRelatedToEntity(getEntity(changeEvent), collectionDAO);
}
}
}
}
// Thread are created in FeedRepository Directly
Entity.getCollectionDAO().changeEventDAO().insert(JsonUtils.pojoToJson(changeEvent));
}
} catch (Exception e) {
LOG.error("Failed to capture the change event for method {} due to ", method, e);
LOG.error(
"Failed to capture the change event for method {} due to ",
requestContext.getMethod(),
e);
}
return null;
}
@ -126,17 +95,6 @@ public class ChangeEventHandler implements EventHandler {
.withCurrentVersion(changeEvent.getCurrentVersion());
}
private void deleteAllConversationsRelatedToEntity(
EntityInterface entityInterface, CollectionDAO collectionDAO) {
String entityId = entityInterface.getId().toString();
List<String> threadIds = collectionDAO.feedDAO().findByEntityId(entityId);
for (String threadId : threadIds) {
UUID id = UUID.fromString(threadId);
collectionDAO.relationshipDAO().deleteAll(id, Entity.THREAD);
collectionDAO.feedDAO().delete(id);
}
}
public void close() {
/* Nothing to do */
}

View File

@ -23,7 +23,6 @@ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.api.events.CreateEventSubscription;
import org.openmetadata.schema.entity.events.EventSubscription;
import org.openmetadata.schema.entity.events.EventSubscriptionOffset;
import org.openmetadata.schema.entity.events.SubscriptionDestination;
@ -47,7 +46,6 @@ import org.quartz.impl.StdSchedulerFactory;
public class EventSubscriptionScheduler {
public static final String ALERT_JOB_GROUP = "OMAlertJobGroup";
public static final String ALERT_TRIGGER_GROUP = "OMAlertJobGroup";
private static final String INVALID_ALERT = "Invalid Alert Type";
private static EventSubscriptionScheduler instance;
private static volatile boolean initialized = false;
private final Scheduler alertsScheduler = new StdSchedulerFactory().getScheduler();
@ -76,10 +74,6 @@ public class EventSubscriptionScheduler {
@Transaction
public void addSubscriptionPublisher(EventSubscription eventSubscription)
throws SchedulerException {
if (eventSubscription.getAlertType().equals(CreateEventSubscription.AlertType.ACTIVITY_FEED)) {
throw new IllegalArgumentException("Activity Feed is not a valid Alert Type");
}
AlertPublisher alertPublisher = new AlertPublisher();
if (Boolean.FALSE.equals(
eventSubscription.getEnabled())) { // Only add webhook that is enabled for publishing events
@ -147,11 +141,7 @@ public class EventSubscriptionScheduler {
public void updateEventSubscription(EventSubscription eventSubscription) {
// Remove Existing Subscription Publisher
deleteEventSubscriptionPublisher(eventSubscription);
// TODO: fix this make AlertActivityFeedPublisher
if (Boolean.TRUE.equals(eventSubscription.getEnabled())
&& (!eventSubscription
.getAlertType()
.equals(CreateEventSubscription.AlertType.ACTIVITY_FEED))) {
if (Boolean.TRUE.equals(eventSubscription.getEnabled())) {
addSubscriptionPublisher(eventSubscription);
}
}

View File

@ -1,47 +0,0 @@
package org.openmetadata.service.events.subscription;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.UncheckedExecutionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.CheckForNull;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.entity.events.EventSubscription;
import org.openmetadata.schema.type.Include;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.EntityNotFoundException;
@Slf4j
public class ActivityFeedAlertCache {
protected static final LoadingCache<String, EventSubscription> EVENT_SUB_CACHE =
CacheBuilder.newBuilder()
.maximumSize(1000)
.expireAfterWrite(3, TimeUnit.MINUTES)
.build(new ActivityFeedAlertLoader());
private static final String ACTIVITY_FEED_ALERT = "ActivityFeedAlert";
private ActivityFeedAlertCache() {
// Private constructor for static class
}
public static EventSubscription getActivityFeedAlert() {
try {
return EVENT_SUB_CACHE.get(ACTIVITY_FEED_ALERT);
} catch (ExecutionException | UncheckedExecutionException ex) {
throw new EntityNotFoundException(ex.getMessage());
}
}
static class ActivityFeedAlertLoader extends CacheLoader<String, EventSubscription> {
@Override
public @NonNull EventSubscription load(@CheckForNull String alertName) {
EventSubscription alert =
Entity.getEntityByName(Entity.EVENT_SUBSCRIPTION, alertName, "*", Include.NON_DELETED);
LOG.debug("Loaded Alert {}", alert);
return alert;
}
}
}

View File

@ -137,14 +137,6 @@ public final class AlertUtil {
return config.getResources().contains(event.getEntityType()); // Use Trigger Specific Settings
}
public static boolean shouldProcessActivityFeedRequest(ChangeEvent event) {
// Check Trigger Conditions
FilteringRules filteringRules =
ActivityFeedAlertCache.getActivityFeedAlert().getFilteringRules();
return AlertUtil.shouldTriggerAlert(event, filteringRules)
&& AlertUtil.evaluateAlertConditions(event, filteringRules.getRules());
}
public static SubscriptionStatus buildSubscriptionStatus(
SubscriptionStatus.Status status,
Long lastSuccessful,

View File

@ -208,7 +208,7 @@ public interface MessageDecorator<T> {
}
message.setHeader(headerText);
}
List<Thread> thread = FeedUtils.getThreadWithMessage(this, event, "admin");
List<Thread> thread = FeedUtils.getThreadWithMessage(this, event);
List<String> messages = new ArrayList<>();
thread.forEach(entry -> messages.add(entry.getMessage()));
message.setMessages(messages);

View File

@ -15,7 +15,7 @@ package org.openmetadata.service.formatter.entity;
import static org.openmetadata.service.formatter.util.FormatterUtil.transformMessage;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.entity.feed.Thread;
import org.openmetadata.schema.type.FieldChange;
import org.openmetadata.service.formatter.decorators.MessageDecorator;
import org.openmetadata.service.formatter.util.FormatterUtil;
@ -24,9 +24,9 @@ public class DefaultEntityFormatter implements EntityFormatter {
@Override
public String format(
MessageDecorator<?> messageFormatter,
Thread thread,
FieldChange fieldChange,
EntityInterface entity,
FormatterUtil.CHANGE_TYPE changeType) {
return transformMessage(messageFormatter, fieldChange, entity, changeType);
return transformMessage(messageFormatter, thread, fieldChange, changeType);
}
}

View File

@ -13,7 +13,7 @@
package org.openmetadata.service.formatter.entity;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.entity.feed.Thread;
import org.openmetadata.schema.type.FieldChange;
import org.openmetadata.service.formatter.decorators.MessageDecorator;
import org.openmetadata.service.formatter.util.FormatterUtil;
@ -21,7 +21,7 @@ import org.openmetadata.service.formatter.util.FormatterUtil;
public interface EntityFormatter {
String format(
MessageDecorator<?> messageDecorator,
Thread thread,
FieldChange fieldChange,
EntityInterface entity,
FormatterUtil.CHANGE_TYPE changeType);
}

View File

@ -18,8 +18,11 @@ import static org.openmetadata.service.formatter.util.FormatterUtil.transformMes
import java.text.SimpleDateFormat;
import java.util.Date;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.entity.feed.Thread;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus;
import org.openmetadata.schema.type.FieldChange;
import org.openmetadata.schema.type.Include;
import org.openmetadata.service.Entity;
import org.openmetadata.service.formatter.decorators.MessageDecorator;
import org.openmetadata.service.formatter.util.FormatterUtil;
import org.openmetadata.service.util.JsonUtils;
@ -30,17 +33,19 @@ public class IngestionPipelineFormatter implements EntityFormatter {
@Override
public String format(
MessageDecorator<?> messageFormatter,
Thread thread,
FieldChange fieldChange,
EntityInterface entity,
FormatterUtil.CHANGE_TYPE changeType) {
if (PIPELINE_STATUS_FIELD.equals(fieldChange.getName())) {
return transformIngestionPipelineStatus(messageFormatter, fieldChange, entity);
return transformIngestionPipelineStatus(messageFormatter, thread, fieldChange);
}
return transformMessage(messageFormatter, fieldChange, entity, changeType);
return transformMessage(messageFormatter, thread, fieldChange, changeType);
}
private String transformIngestionPipelineStatus(
MessageDecorator<?> messageFormatter, FieldChange fieldChange, EntityInterface entity) {
MessageDecorator<?> messageFormatter, Thread thread, FieldChange fieldChange) {
EntityInterface entity =
Entity.getEntity(thread.getEntityType(), thread.getEntityId(), "id", Include.ALL);
String ingestionPipelineName = entity.getName();
PipelineStatus status =
JsonUtils.readOrConvertValue(fieldChange.getNewValue(), PipelineStatus.class);

View File

@ -18,7 +18,10 @@ import static org.openmetadata.service.formatter.util.FormatterUtil.transformMes
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.dataInsight.type.KpiResult;
import org.openmetadata.schema.dataInsight.type.KpiTarget;
import org.openmetadata.schema.entity.feed.Thread;
import org.openmetadata.schema.type.FieldChange;
import org.openmetadata.schema.type.Include;
import org.openmetadata.service.Entity;
import org.openmetadata.service.formatter.decorators.MessageDecorator;
import org.openmetadata.service.formatter.util.FormatterUtil;
@ -28,17 +31,19 @@ public class KpiFormatter implements EntityFormatter {
@Override
public String format(
MessageDecorator<?> messageFormatter,
Thread thread,
FieldChange fieldChange,
EntityInterface entity,
FormatterUtil.CHANGE_TYPE changeType) {
if (KPI_RESULT_FIELD.equals(fieldChange.getName())) {
return transformKpiResult(messageFormatter, fieldChange, entity);
return transformKpiResult(messageFormatter, thread, fieldChange);
}
return transformMessage(messageFormatter, fieldChange, entity, changeType);
return transformMessage(messageFormatter, thread, fieldChange, changeType);
}
private String transformKpiResult(
MessageDecorator<?> messageFormatter, FieldChange fieldChange, EntityInterface entity) {
MessageDecorator<?> messageFormatter, Thread thread, FieldChange fieldChange) {
EntityInterface entity =
Entity.getEntity(thread.getEntityType(), thread.getEntityId(), "id", Include.ALL);
String kpiName = entity.getName();
KpiResult result = (KpiResult) fieldChange.getNewValue();
if (result != null) {

View File

@ -19,7 +19,10 @@ import java.text.SimpleDateFormat;
import java.util.Date;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.entity.data.PipelineStatus;
import org.openmetadata.schema.entity.feed.Thread;
import org.openmetadata.schema.type.FieldChange;
import org.openmetadata.schema.type.Include;
import org.openmetadata.service.Entity;
import org.openmetadata.service.formatter.decorators.MessageDecorator;
import org.openmetadata.service.formatter.util.FormatterUtil;
import org.openmetadata.service.util.JsonUtils;
@ -30,17 +33,19 @@ public class PipelineFormatter implements EntityFormatter {
@Override
public String format(
MessageDecorator<?> messageFormatter,
Thread thread,
FieldChange fieldChange,
EntityInterface entity,
FormatterUtil.CHANGE_TYPE changeType) {
if (PIPELINE_STATUS_FIELD.equals(fieldChange.getName())) {
return transformPipelineStatus(messageFormatter, fieldChange, entity);
return transformPipelineStatus(messageFormatter, thread, fieldChange);
}
return transformMessage(messageFormatter, fieldChange, entity, changeType);
return transformMessage(messageFormatter, thread, fieldChange, changeType);
}
private String transformPipelineStatus(
MessageDecorator<?> messageFormatter, FieldChange fieldChange, EntityInterface entity) {
MessageDecorator<?> messageFormatter, Thread thread, FieldChange fieldChange) {
EntityInterface entity =
Entity.getEntity(thread.getEntityType(), thread.getEntityId(), "id", Include.ALL);
String pipelineName = entity.getName();
PipelineStatus status =
JsonUtils.readOrConvertValue(fieldChange.getNewValue(), PipelineStatus.class);

View File

@ -17,10 +17,12 @@ import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.service.formatter.util.FormatterUtil.transformMessage;
import java.util.List;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.entity.data.Query;
import org.openmetadata.schema.entity.feed.Thread;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.FieldChange;
import org.openmetadata.schema.type.Include;
import org.openmetadata.service.Entity;
import org.openmetadata.service.formatter.decorators.MessageDecorator;
import org.openmetadata.service.formatter.util.FormatterUtil;
import org.openmetadata.service.util.JsonUtils;
@ -31,19 +33,33 @@ public class QueryFormatter implements EntityFormatter {
@Override
public String format(
MessageDecorator<?> messageFormatter,
Thread thread,
FieldChange fieldChange,
EntityInterface entity,
FormatterUtil.CHANGE_TYPE changeType) {
if (QUERY_USED_IN_FIELD.equals(fieldChange.getName())) {
return transformQueryUsedIn(messageFormatter, fieldChange, entity, changeType);
return transformQueryUsedIn(messageFormatter, thread, fieldChange, changeType);
}
return transformMessage(messageFormatter, fieldChange, entity, changeType);
return transformMessage(messageFormatter, thread, fieldChange, changeType);
}
private String transformQueryUsedIn(
MessageDecorator<?> messageFormatter,
Thread thread,
FieldChange fieldChange,
FormatterUtil.CHANGE_TYPE changeType) {
String newVal = getFieldValue(fieldChange.getNewValue(), messageFormatter, thread);
String oldVal = getFieldValue(fieldChange.getOldValue(), messageFormatter, thread);
return transformMessage(
messageFormatter,
thread,
new FieldChange().withNewValue(newVal).withOldValue(oldVal).withName(QUERY_USED_IN_FIELD),
changeType);
}
@SuppressWarnings("unchecked")
private static String getFieldValue(
Object fieldValue, EntityInterface entity, MessageDecorator<?> messageFormatter) {
Query query = (Query) entity;
Object fieldValue, MessageDecorator<?> messageFormatter, Thread thread) {
Query query = Entity.getEntity(thread.getEntityType(), thread.getEntityId(), "id", Include.ALL);
StringBuilder field = new StringBuilder();
List<EntityReference> tableRefs =
fieldValue instanceof String
@ -67,18 +83,4 @@ public class QueryFormatter implements EntityFormatter {
}
return field.toString();
}
private String transformQueryUsedIn(
MessageDecorator<?> messageFormatter,
FieldChange fieldChange,
EntityInterface entity,
FormatterUtil.CHANGE_TYPE changeType) {
String newVal = getFieldValue(fieldChange.getNewValue(), entity, messageFormatter);
String oldVal = getFieldValue(fieldChange.getOldValue(), entity, messageFormatter);
return transformMessage(
messageFormatter,
new FieldChange().withNewValue(newVal).withOldValue(oldVal).withName(QUERY_USED_IN_FIELD),
entity,
changeType);
}
}

View File

@ -1,86 +0,0 @@
/*
* Copyright 2021 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.service.formatter.entity;
import static org.openmetadata.service.formatter.util.FormatterUtil.transformMessage;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.tests.TestCase;
import org.openmetadata.schema.tests.type.TestCaseResult;
import org.openmetadata.schema.tests.type.TestCaseStatus;
import org.openmetadata.schema.type.FieldChange;
import org.openmetadata.service.formatter.decorators.FeedMessageDecorator;
import org.openmetadata.service.formatter.decorators.MessageDecorator;
import org.openmetadata.service.formatter.util.FormatterUtil;
import org.openmetadata.service.resources.feeds.MessageParser;
import org.openmetadata.service.util.JsonUtils;
public class TestCaseFormatter implements EntityFormatter {
private static final String TEST_RESULT_FIELD = "testCaseResult";
@Override
public String format(
MessageDecorator<?> messageFormatter,
FieldChange fieldChange,
EntityInterface entity,
FormatterUtil.CHANGE_TYPE changeType) {
if (TEST_RESULT_FIELD.equals(fieldChange.getName())) {
return transformTestCaseResult(messageFormatter, fieldChange, entity);
}
return transformMessage(messageFormatter, fieldChange, entity, changeType);
}
private String transformTestCaseResult(
MessageDecorator<?> messageFormatter, FieldChange fieldChange, EntityInterface entity) {
String testCaseName = entity.getName();
TestCaseResult result = JsonUtils.convertValue(fieldChange.getNewValue(), TestCaseResult.class);
TestCase testCaseEntity = (TestCase) entity;
if (result != null) {
String format =
String.format(
"Test Case %s is %s in %s",
messageFormatter.getBold(),
messageFormatter.getBold(),
MessageParser.EntityLink.parse(testCaseEntity.getEntityLink()).getEntityFQN());
return String.format(
format, testCaseName, getStatusMessage(messageFormatter, result.getTestCaseStatus()));
}
String format =
String.format(
"Test Case %s is updated in %s",
messageFormatter.getBold(), messageFormatter.getBold());
return String.format(
format,
testCaseName,
MessageParser.EntityLink.parse(testCaseEntity.getEntityLink()).getEntityFQN());
}
private String getStatusMessage(MessageDecorator<?> messageDecorator, TestCaseStatus status) {
if (messageDecorator instanceof FeedMessageDecorator) {
return switch (status) {
case Success -> "<span style=\"color:#48CA9E\">Passed</span>";
case Failed -> "<span style=\"color:#F24822\">Failed</span>";
case Aborted -> "<span style=\"color:#FFBE0E\">Aborted</span>";
case Queued -> "<span style=\"color:#959595\">Queued</span>";
};
} else {
return switch (status) {
case Success -> "Passed";
case Failed -> "Failed";
case Aborted -> "Aborted";
case Queued -> "Queued";
};
}
}
}

View File

@ -13,6 +13,11 @@
package org.openmetadata.service.formatter.factory;
import static org.openmetadata.service.Entity.FIELD_ASSETS;
import static org.openmetadata.service.formatter.field.TestCaseResultFormatter.TEST_RESULT_FIELD;
import org.openmetadata.schema.entity.feed.Thread;
import org.openmetadata.schema.type.FieldChange;
import org.openmetadata.service.Entity;
import org.openmetadata.service.formatter.decorators.MessageDecorator;
import org.openmetadata.service.formatter.entity.DefaultEntityFormatter;
@ -21,20 +26,24 @@ import org.openmetadata.service.formatter.entity.IngestionPipelineFormatter;
import org.openmetadata.service.formatter.entity.KpiFormatter;
import org.openmetadata.service.formatter.entity.PipelineFormatter;
import org.openmetadata.service.formatter.entity.QueryFormatter;
import org.openmetadata.service.formatter.entity.TestCaseFormatter;
import org.openmetadata.service.formatter.field.AssetsFieldFormatter;
import org.openmetadata.service.formatter.field.DefaultFieldFormatter;
import org.openmetadata.service.formatter.field.DescriptionFormatter;
import org.openmetadata.service.formatter.field.FollowersFormatter;
import org.openmetadata.service.formatter.field.OwnerFormatter;
import org.openmetadata.service.formatter.field.TagFormatter;
import org.openmetadata.service.resources.feeds.MessageParser;
import org.openmetadata.service.formatter.field.TestCaseResultFormatter;
public final class ParserFactory {
private ParserFactory() {}
public static EntityFormatter getEntityParser(String entityType) {
// Handle Thread entity separately
if (entityType.equals(Entity.THREAD)) {
throw new IllegalArgumentException("Thread entity cannot be handled by Entity Parser.");
}
return switch (entityType) {
case Entity.QUERY -> new QueryFormatter();
case Entity.TEST_CASE -> new TestCaseFormatter();
case Entity.KPI -> new KpiFormatter();
case Entity.INGESTION_PIPELINE -> new IngestionPipelineFormatter();
case Entity.PIPELINE -> new PipelineFormatter();
@ -44,19 +53,17 @@ public final class ParserFactory {
public static DefaultFieldFormatter getFieldParserObject(
MessageDecorator<?> decorator,
String fieldOldValue,
String fieldNewValue,
String fieldChangeName,
MessageParser.EntityLink entityLink) {
Thread thread,
FieldChange fieldChange,
String fieldChangeName) {
return switch (fieldChangeName) {
case Entity.FIELD_TAGS -> new TagFormatter(
decorator, fieldOldValue, fieldNewValue, fieldChangeName, entityLink);
case Entity.FIELD_FOLLOWERS -> new FollowersFormatter(
decorator, fieldOldValue, fieldNewValue, fieldChangeName, entityLink);
case Entity.FIELD_OWNER -> new OwnerFormatter(
decorator, fieldOldValue, fieldNewValue, fieldChangeName, entityLink);
default -> new DefaultFieldFormatter(
decorator, fieldOldValue, fieldNewValue, fieldChangeName, entityLink);
case Entity.FIELD_TAGS -> new TagFormatter(decorator, thread, fieldChange);
case Entity.FIELD_FOLLOWERS -> new FollowersFormatter(decorator, thread, fieldChange);
case Entity.FIELD_OWNER -> new OwnerFormatter(decorator, thread, fieldChange);
case Entity.FIELD_DESCRIPTION -> new DescriptionFormatter(decorator, thread, fieldChange);
case TEST_RESULT_FIELD -> new TestCaseResultFormatter(decorator, thread, fieldChange);
case FIELD_ASSETS -> new AssetsFieldFormatter(decorator, thread, fieldChange);
default -> new DefaultFieldFormatter(decorator, thread, fieldChange);
};
}
}

View File

@ -0,0 +1,59 @@
package org.openmetadata.service.formatter.field;
import static org.openmetadata.service.Entity.FIELD_ASSETS;
import org.openmetadata.schema.entity.feed.AssetsFeedInfo;
import org.openmetadata.schema.entity.feed.FeedInfo;
import org.openmetadata.schema.entity.feed.Thread;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.FieldChange;
import org.openmetadata.service.formatter.decorators.MessageDecorator;
import org.openmetadata.service.util.JsonUtils;
public class AssetsFieldFormatter extends DefaultFieldFormatter {
private static final String HEADER_MESSAGE = "%s %s the assets in %s %s";
public AssetsFieldFormatter(
MessageDecorator<?> messageDecorator, Thread thread, FieldChange fieldChange) {
super(messageDecorator, thread, fieldChange);
}
@Override
public String formatAddedField() {
String message = getHeaderForAssetsUpdate(Thread.FieldOperation.ADDED.value());
// Update the thread with the required information
populateAssetsFeedInfo(Thread.FieldOperation.ADDED, message);
return message;
}
@Override
public String formatDeletedField() {
String message = getHeaderForAssetsUpdate(Thread.FieldOperation.DELETED.value());
// Update the thread with the required information
populateAssetsFeedInfo(Thread.FieldOperation.DELETED, message);
return message;
}
private void populateAssetsFeedInfo(Thread.FieldOperation operation, String threadMessage) {
AssetsFeedInfo assetsFeedInfo =
new AssetsFeedInfo()
.withUpdatedAssets(
JsonUtils.readOrConvertValues(fieldChange.getNewValue(), EntityReference.class));
FeedInfo feedInfo =
new FeedInfo()
.withHeaderMessage(threadMessage)
.withFieldName(FIELD_ASSETS)
.withEntitySpecificInfo(assetsFeedInfo);
populateThreadFeedInfo(
thread, threadMessage, Thread.CardStyle.DESCRIPTION, operation, feedInfo);
}
private String getHeaderForAssetsUpdate(String opMessage) {
return String.format(
HEADER_MESSAGE,
thread.getUpdatedBy(),
opMessage,
thread.getEntityType(),
thread.getEntityUrlLink());
}
}

View File

@ -13,28 +13,44 @@
package org.openmetadata.service.formatter.field;
import static java.lang.String.format;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.service.Entity.FIELD_DISPLAY_NAME;
import static org.openmetadata.service.Entity.FIELD_NAME;
import static org.openmetadata.service.formatter.util.FormatterUtil.getEntityLinkForFieldName;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import javax.json.JsonArray;
import javax.json.JsonObject;
import javax.json.JsonValue;
import javax.json.stream.JsonParsingException;
import org.apache.commons.lang.StringUtils;
import org.openmetadata.schema.entity.feed.FeedInfo;
import org.openmetadata.schema.entity.feed.Thread;
import org.openmetadata.schema.type.FieldChange;
import org.openmetadata.service.formatter.decorators.MessageDecorator;
import org.openmetadata.service.formatter.util.FormatterUtil;
import org.openmetadata.service.resources.feeds.MessageParser;
import org.openmetadata.service.util.JsonUtils;
public class DefaultFieldFormatter implements FieldFormatter {
private final String fieldChangeName;
private final String fieldOldValue;
private final String fieldNewValue;
private final MessageParser.EntityLink entityLink;
private final MessageDecorator<?> messageDecorator;
protected final Thread thread;
protected final String fieldChangeName;
protected final FieldChange fieldChange;
protected final String fieldOldValue;
protected final String fieldNewValue;
protected final MessageDecorator<?> messageDecorator;
public DefaultFieldFormatter(
MessageDecorator<?> messageDecorator,
String fieldOldValue,
String fieldNewValue,
String fieldChangeName,
MessageParser.EntityLink entityLink) {
MessageDecorator<?> messageDecorator, Thread thread, FieldChange fieldChange) {
this.messageDecorator = messageDecorator;
this.fieldChangeName = fieldChangeName;
this.fieldOldValue = fieldOldValue;
this.fieldNewValue = fieldNewValue;
this.entityLink = entityLink;
this.fieldChangeName = getFieldNameChange(fieldChange.getName(), thread);
this.fieldOldValue = getFieldValue(fieldChange.getOldValue());
this.fieldNewValue = getFieldValue(fieldChange.getNewValue());
this.thread = thread;
this.fieldChange = fieldChange;
}
@Override
@ -70,7 +86,7 @@ public class DefaultFieldFormatter implements FieldFormatter {
@Override
public MessageParser.EntityLink getEntityLink() {
return entityLink;
return MessageParser.EntityLink.parse(thread.getAbout());
}
public String formatAddedField() {
@ -114,4 +130,76 @@ public class DefaultFieldFormatter implements FieldFormatter {
}
return message;
}
public static void populateThreadFeedInfo(
Thread thread,
String threadMessage,
Thread.CardStyle cardStyle,
Thread.FieldOperation operation,
FeedInfo feedInfo) {
thread.withMessage(threadMessage);
thread.withCardStyle(cardStyle);
thread.withFieldOperation(operation);
thread.withFeedInfo(feedInfo);
}
public static String getFieldValue(Object fieldValue) {
if (nullOrEmpty(fieldValue)) {
return StringUtils.EMPTY;
}
try {
JsonValue json = JsonUtils.readJson(fieldValue.toString());
if (json.getValueType() == JsonValue.ValueType.ARRAY) {
JsonArray jsonArray = json.asJsonArray();
List<String> labels = new ArrayList<>();
for (JsonValue item : jsonArray) {
if (item.getValueType() == JsonValue.ValueType.OBJECT) {
Set<String> keys = item.asJsonObject().keySet();
if (keys.contains("tagFQN")) {
labels.add(item.asJsonObject().getString("tagFQN"));
} else if (keys.contains(FIELD_DISPLAY_NAME)) {
// Entity Reference will have a displayName
labels.add(item.asJsonObject().getString(FIELD_DISPLAY_NAME));
} else if (keys.contains(FIELD_NAME)) {
// Glossary term references has only "name" field
labels.add(item.asJsonObject().getString(FIELD_NAME));
} else if (keys.contains("constraintType")) {
labels.add(item.asJsonObject().getString("constraintType"));
}
} else if (item.getValueType() == JsonValue.ValueType.STRING) {
// The string might be enclosed with double quotes
// Check if string has double quotes and strip trailing whitespaces
String label = item.toString().replaceAll("^\"|\"$", "");
labels.add(label.strip());
}
}
return String.join(", ", labels);
} else if (json.getValueType() == JsonValue.ValueType.OBJECT) {
JsonObject jsonObject = json.asJsonObject();
// Entity Reference will have a displayName
Set<String> keys = jsonObject.asJsonObject().keySet();
if (keys.contains(FIELD_DISPLAY_NAME)) {
return jsonObject.asJsonObject().getString(FIELD_DISPLAY_NAME);
} else if (keys.contains(FIELD_NAME)) {
return jsonObject.asJsonObject().getString(FIELD_NAME);
}
}
} catch (JsonParsingException ex) {
// If unable to parse json, just return the string
}
return fieldValue.toString();
}
public static String getFieldNameChange(String fieldChangeName, Thread thread) {
MessageParser.EntityLink link = getEntityLinkForFieldName(fieldChangeName, thread);
String arrayFieldName = link.getArrayFieldName();
String arrayFieldValue = link.getArrayFieldValue();
String updatedField = fieldChangeName;
if (arrayFieldValue != null) {
updatedField = format("%s.%s", arrayFieldName, arrayFieldValue);
} else if (arrayFieldName != null) {
updatedField = format("%s.%s", fieldChangeName, arrayFieldName);
}
return updatedField;
}
}

View File

@ -0,0 +1,67 @@
package org.openmetadata.service.formatter.field;
import static org.openmetadata.service.Entity.FIELD_DESCRIPTION;
import org.openmetadata.schema.entity.feed.DescriptionFeedInfo;
import org.openmetadata.schema.entity.feed.FeedInfo;
import org.openmetadata.schema.entity.feed.Thread;
import org.openmetadata.schema.type.FieldChange;
import org.openmetadata.service.formatter.decorators.MessageDecorator;
public class DescriptionFormatter extends DefaultFieldFormatter {
private static final String HEADER_MESSAGE = "%s %s the description for %s %s";
public DescriptionFormatter(
MessageDecorator<?> messageDecorator, Thread thread, FieldChange fieldChange) {
super(messageDecorator, thread, fieldChange);
}
@Override
public String formatAddedField() {
String message = super.formatAddedField();
// Update the thread with the required information
populateDescriptionFeedInfo(Thread.FieldOperation.ADDED, message, message);
return message;
}
@Override
public String formatUpdatedField() {
String message = super.formatUpdatedField();
// Update the thread with the required information
populateDescriptionFeedInfo(Thread.FieldOperation.UPDATED, message, message);
return message;
}
@Override
public String formatDeletedField() {
String message = super.formatDeletedField();
// Update the thread with the required information
populateDescriptionFeedInfo(Thread.FieldOperation.DELETED, message, message);
return message;
}
private void populateDescriptionFeedInfo(
Thread.FieldOperation operation, String threadMessage, String diffMessage) {
DescriptionFeedInfo descriptionFeedInfo =
new DescriptionFeedInfo()
.withPreviousDescription(fieldOldValue)
.withNewDescription(fieldNewValue)
.withDiffMessage(diffMessage);
FeedInfo feedInfo =
new FeedInfo()
.withHeaderMessage(getHeaderForDescriptionUpdate(operation.value()))
.withFieldName(FIELD_DESCRIPTION)
.withEntitySpecificInfo(descriptionFeedInfo);
populateThreadFeedInfo(
thread, threadMessage, Thread.CardStyle.DESCRIPTION, operation, feedInfo);
}
private String getHeaderForDescriptionUpdate(String eventTypeMessage) {
return String.format(
HEADER_MESSAGE,
thread.getUpdatedBy(),
eventTypeMessage,
thread.getEntityType(),
thread.getEntityUrlLink());
}
}

View File

@ -13,17 +13,14 @@
package org.openmetadata.service.formatter.field;
import org.openmetadata.schema.entity.feed.Thread;
import org.openmetadata.schema.type.FieldChange;
import org.openmetadata.service.formatter.decorators.MessageDecorator;
import org.openmetadata.service.resources.feeds.MessageParser;
public class FollowersFormatter extends DefaultFieldFormatter {
public FollowersFormatter(
MessageDecorator<?> messageDecorator,
String fieldOldValue,
String fieldNewValue,
String fieldChangeName,
MessageParser.EntityLink entityLink) {
super(messageDecorator, fieldOldValue, fieldNewValue, fieldChangeName, entityLink);
MessageDecorator<?> messageDecorator, Thread thread, FieldChange fieldChange) {
super(messageDecorator, thread, fieldChange);
}
@Override

View File

@ -13,19 +13,28 @@
package org.openmetadata.service.formatter.field;
import org.apache.commons.lang.StringUtils;
import static org.openmetadata.service.Entity.FIELD_OWNER;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.entity.feed.FeedInfo;
import org.openmetadata.schema.entity.feed.OwnerFeedInfo;
import org.openmetadata.schema.entity.feed.Thread;
import org.openmetadata.schema.type.FieldChange;
import org.openmetadata.service.formatter.decorators.MessageDecorator;
import org.openmetadata.service.resources.feeds.MessageParser;
public class OwnerFormatter extends DefaultFieldFormatter {
private static final String HEADER_MESSAGE = "%s %s the owner for %s %s";
public OwnerFormatter(
MessageDecorator<?> messageDecorator,
String fieldOldValue,
String fieldNewValue,
String fieldChangeName,
MessageParser.EntityLink entityLink) {
super(messageDecorator, fieldOldValue, fieldNewValue, fieldChangeName, entityLink);
MessageDecorator<?> messageDecorator, Thread thread, FieldChange fieldChange) {
super(messageDecorator, thread, fieldChange);
}
@Override
public String formatAddedField() {
String message = super.formatAddedField();
populateOwnerFeedInfo(Thread.FieldOperation.ADDED, message);
return message;
}
@Override
@ -38,23 +47,52 @@ public class OwnerFormatter extends DefaultFieldFormatter {
+ this.getMessageDecorator().httpAddMarker()
+ this.getFieldNewValue()
+ this.getMessageDecorator().httpAddMarker();
String spanAdd = this.getMessageDecorator().getAddMarker();
String spanAddClose = this.getMessageDecorator().getAddMarkerClose();
String spanRemove = this.getMessageDecorator().getRemoveMarker();
String spanRemoveClose = this.getMessageDecorator().getRemoveMarkerClose();
diff =
this.getMessageDecorator()
.replaceMarkers(
diff, this.getMessageDecorator().httpAddMarker(), spanAdd, spanAddClose);
diff,
this.getMessageDecorator().httpAddMarker(),
this.getMessageDecorator().getAddMarker(),
this.getMessageDecorator().getAddMarkerClose());
diff =
this.getMessageDecorator()
.replaceMarkers(
diff, this.getMessageDecorator().httpRemoveMarker(), spanRemove, spanRemoveClose);
if (CommonUtil.nullOrEmpty(diff)) {
return StringUtils.EMPTY;
} else {
diff,
this.getMessageDecorator().httpRemoveMarker(),
this.getMessageDecorator().getRemoveMarker(),
this.getMessageDecorator().getRemoveMarkerClose());
if (!CommonUtil.nullOrEmpty(diff)) {
String field = String.format("Updated %s: %s", this.getMessageDecorator().getBold(), diff);
return String.format(field, this.getFieldChangeName());
diff = String.format(field, this.getFieldChangeName());
}
populateOwnerFeedInfo(Thread.FieldOperation.UPDATED, diff);
return diff;
}
@Override
public String formatDeletedField() {
String message = super.formatDeletedField();
populateOwnerFeedInfo(Thread.FieldOperation.DELETED, message);
return message;
}
private void populateOwnerFeedInfo(Thread.FieldOperation operation, String threadMessage) {
OwnerFeedInfo ownerFeedInfo =
new OwnerFeedInfo().withPreviousOwner(fieldOldValue).withUpdatedOwner(fieldNewValue);
FeedInfo feedInfo =
new FeedInfo()
.withHeaderMessage(getHeaderForOwnerUpdate(operation.value()))
.withFieldName(FIELD_OWNER)
.withEntitySpecificInfo(ownerFeedInfo);
populateThreadFeedInfo(thread, threadMessage, Thread.CardStyle.OWNER, operation, feedInfo);
}
private String getHeaderForOwnerUpdate(String eventTypeMessage) {
return String.format(
HEADER_MESSAGE,
thread.getUpdatedBy(),
eventTypeMessage,
thread.getEntityType(),
thread.getEntityUrlLink());
}
}

View File

@ -13,18 +13,23 @@
package org.openmetadata.service.formatter.field;
import static org.openmetadata.service.Entity.FIELD_TAGS;
import java.util.List;
import org.openmetadata.schema.entity.feed.FeedInfo;
import org.openmetadata.schema.entity.feed.TagFeedInfo;
import org.openmetadata.schema.entity.feed.Thread;
import org.openmetadata.schema.type.FieldChange;
import org.openmetadata.schema.type.TagLabel;
import org.openmetadata.service.formatter.decorators.MessageDecorator;
import org.openmetadata.service.resources.feeds.MessageParser;
import org.openmetadata.service.util.JsonUtils;
public class TagFormatter extends DefaultFieldFormatter {
private static final String HEADER_MESSAGE = "%s %s the tags for %s %s";
public TagFormatter(
MessageDecorator<?> messageDecorator,
String fieldOldValue,
String fieldNewValue,
String fieldChangeName,
MessageParser.EntityLink entityLink) {
super(messageDecorator, fieldOldValue, fieldNewValue, fieldChangeName, entityLink);
MessageDecorator<?> messageDecorator, Thread thread, FieldChange fieldChange) {
super(messageDecorator, thread, fieldChange);
}
@Override
@ -63,6 +68,7 @@ public class TagFormatter extends DefaultFieldFormatter {
.replaceMarkers(
message, this.getMessageDecorator().httpAddMarker(), spanAdd, spanAddClose);
}
populateTagFeedInfo(Thread.FieldOperation.ADDED, message);
return message;
}
@ -112,6 +118,7 @@ public class TagFormatter extends DefaultFieldFormatter {
this.getFieldChangeName(),
diff);
}
populateTagFeedInfo(Thread.FieldOperation.UPDATED, message);
return String.format(message, this.getFieldChangeName());
}
@ -151,6 +158,7 @@ public class TagFormatter extends DefaultFieldFormatter {
.replaceMarkers(
message, this.getMessageDecorator().httpRemoveMarker(), spanAdd, spanAddClose);
}
populateTagFeedInfo(Thread.FieldOperation.DELETED, message);
return message;
}
@ -160,4 +168,27 @@ public class TagFormatter extends DefaultFieldFormatter {
}
return fieldName;
}
private void populateTagFeedInfo(Thread.FieldOperation operation, String threadMessage) {
List<TagLabel> oldTags =
JsonUtils.readOrConvertValues(fieldChange.getOldValue(), TagLabel.class);
List<TagLabel> newTags =
JsonUtils.readOrConvertValues(fieldChange.getNewValue(), TagLabel.class);
TagFeedInfo tagFeedInfo = new TagFeedInfo().withPreviousTags(oldTags).withUpdatedTags(newTags);
FeedInfo feedInfo =
new FeedInfo()
.withHeaderMessage(getHeaderForTagsUpdate(operation.value()))
.withFieldName(FIELD_TAGS)
.withEntitySpecificInfo(tagFeedInfo);
populateThreadFeedInfo(thread, threadMessage, Thread.CardStyle.TAGS, operation, feedInfo);
}
private String getHeaderForTagsUpdate(String eventTypeMessage) {
return String.format(
HEADER_MESSAGE,
thread.getUpdatedBy(),
eventTypeMessage,
thread.getEntityType(),
thread.getEntityUrlLink());
}
}

View File

@ -0,0 +1,126 @@
package org.openmetadata.service.formatter.field;
import org.openmetadata.schema.entity.feed.FeedInfo;
import org.openmetadata.schema.entity.feed.TestCaseResultFeedInfo;
import org.openmetadata.schema.entity.feed.Thread;
import org.openmetadata.schema.tests.TestCase;
import org.openmetadata.schema.tests.TestSuite;
import org.openmetadata.schema.tests.type.TestCaseResult;
import org.openmetadata.schema.tests.type.TestCaseStatus;
import org.openmetadata.schema.type.FieldChange;
import org.openmetadata.schema.type.Include;
import org.openmetadata.service.Entity;
import org.openmetadata.service.formatter.decorators.FeedMessageDecorator;
import org.openmetadata.service.formatter.decorators.MessageDecorator;
import org.openmetadata.service.jdbi3.TestCaseRepository;
import org.openmetadata.service.resources.feeds.MessageParser;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.ResultList;
public class TestCaseResultFormatter extends DefaultFieldFormatter {
public static final String TEST_RESULT_FIELD = "testCaseResult";
private static final String HEADER_MESSAGE = "%s added results to test Case %s";
public TestCaseResultFormatter(
MessageDecorator<?> messageDecorator, Thread thread, FieldChange fieldChange) {
super(messageDecorator, thread, fieldChange);
}
@Override
public String formatAddedField() {
String message;
if (fieldChangeName.equals(TEST_RESULT_FIELD)) {
message = transformTestCaseResult(messageDecorator, thread, fieldChange);
} else {
message = super.formatAddedField();
}
// Update the thread with the required information
populateTestResultFeedInfo(Thread.FieldOperation.UPDATED, message);
return message;
}
@Override
public String formatUpdatedField() {
String message;
if (fieldChangeName.equals(TEST_RESULT_FIELD)) {
message = transformTestCaseResult(messageDecorator, thread, fieldChange);
} else {
message = super.formatUpdatedField();
}
// Update the thread with the required information
populateTestResultFeedInfo(Thread.FieldOperation.UPDATED, message);
return message;
}
private void populateTestResultFeedInfo(Thread.FieldOperation operation, String threadMessage) {
long currentTime = System.currentTimeMillis();
long lastWeekTime = currentTime - 7 * 24 * 60 * 60 * 1000;
TestCaseRepository testCaseRepository =
(TestCaseRepository) Entity.getEntityRepository(Entity.TEST_CASE);
TestCase testCaseEntity =
Entity.getEntity(thread.getEntityType(), thread.getEntityId(), "id,testSuite", Include.ALL);
TestSuite testSuiteEntity = Entity.getEntity(testCaseEntity.getTestSuite(), "id", Include.ALL);
ResultList<TestCaseResult> testCaseResultResultList =
testCaseRepository.getTestCaseResults(
testCaseEntity.getFullyQualifiedName(), lastWeekTime, currentTime);
TestCaseResultFeedInfo testCaseResultFeedInfo =
new TestCaseResultFeedInfo()
.withTestCaseResult(testCaseResultResultList.getData())
.withEntityTestResultSummary(testSuiteEntity.getTestCaseResultSummary());
FeedInfo feedInfo =
new FeedInfo()
.withHeaderMessage(getHeaderForTestResultUpdate())
.withFieldName(TEST_RESULT_FIELD)
.withEntitySpecificInfo(testCaseResultFeedInfo);
populateThreadFeedInfo(
thread, threadMessage, Thread.CardStyle.TEST_CASE_RESULT, operation, feedInfo);
}
private String getHeaderForTestResultUpdate() {
return String.format(HEADER_MESSAGE, thread.getUpdatedBy(), thread.getEntityUrlLink());
}
private String transformTestCaseResult(
MessageDecorator<?> messageFormatter, Thread thread, FieldChange fieldChange) {
TestCase testCaseEntity =
Entity.getEntity(thread.getEntityType(), thread.getEntityId(), "id", Include.ALL);
String testCaseName = testCaseEntity.getName();
TestCaseResult result = JsonUtils.convertValue(fieldChange.getNewValue(), TestCaseResult.class);
if (result != null) {
String format =
String.format(
"Test Case %s is %s in %s",
messageFormatter.getBold(),
messageFormatter.getBold(),
MessageParser.EntityLink.parse(testCaseEntity.getEntityLink()).getEntityFQN());
return String.format(
format, testCaseName, getStatusMessage(messageFormatter, result.getTestCaseStatus()));
}
String format =
String.format(
"Test Case %s is updated in %s",
messageFormatter.getBold(), messageFormatter.getBold());
return String.format(
format,
testCaseName,
MessageParser.EntityLink.parse(testCaseEntity.getEntityLink()).getEntityFQN());
}
private String getStatusMessage(MessageDecorator<?> messageDecorator, TestCaseStatus status) {
if (messageDecorator instanceof FeedMessageDecorator) {
return switch (status) {
case Success -> "<span style=\"color:#48CA9E\">Passed</span>";
case Failed -> "<span style=\"color:#F24822\">Failed</span>";
case Aborted -> "<span style=\"color:#FFBE0E\">Aborted</span>";
case Queued -> "<span style=\"color:#959595\">Queued</span>";
};
} else {
return switch (status) {
case Success -> "Passed";
case Failed -> "Failed";
case Aborted -> "Aborted";
case Queued -> "Queued";
};
}
}
}

View File

@ -13,36 +13,26 @@
package org.openmetadata.service.formatter.util;
import static java.lang.String.format;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.schema.type.EventType.ENTITY_CREATED;
import static org.openmetadata.service.Entity.FIELD_DISPLAY_NAME;
import static org.openmetadata.service.Entity.FIELD_NAME;
import static org.openmetadata.service.Entity.THREAD;
import static org.openmetadata.service.formatter.factory.ParserFactory.getFieldParserObject;
import static org.openmetadata.service.formatter.field.DefaultFieldFormatter.getFieldNameChange;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import javax.json.JsonArray;
import javax.json.JsonObject;
import javax.json.JsonValue;
import javax.json.stream.JsonParsingException;
import javax.ws.rs.container.ContainerResponseContext;
import javax.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.entity.feed.Thread;
import org.openmetadata.schema.type.ChangeDescription;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.EventType;
import org.openmetadata.schema.type.FieldChange;
import org.openmetadata.service.formatter.decorators.MessageDecorator;
@ -56,10 +46,11 @@ import org.openmetadata.service.util.RestUtil;
@Slf4j
public class FormatterUtil {
public static MessageParser.EntityLink getEntityLink(String fieldName, EntityInterface entity) {
EntityReference entityReference = entity.getEntityReference();
String entityType = entityReference.getType();
String entityFQN = entityReference.getFullyQualifiedName();
public static MessageParser.EntityLink getEntityLinkForFieldName(
String fieldName, Thread thread) {
MessageParser.EntityLink entityLink = MessageParser.EntityLink.parse(thread.getAbout());
String entityType = thread.getEntityType();
String entityFQN = entityLink.getEntityFQN();
String arrayFieldName = null;
String arrayFieldValue = null;
@ -79,53 +70,6 @@ public class FormatterUtil {
entityType, entityFQN, fieldName, arrayFieldName, arrayFieldValue);
}
public static String getFieldValue(Object fieldValue) {
if (nullOrEmpty(fieldValue)) {
return StringUtils.EMPTY;
}
try {
JsonValue json = JsonUtils.readJson(fieldValue.toString());
if (json.getValueType() == JsonValue.ValueType.ARRAY) {
JsonArray jsonArray = json.asJsonArray();
List<String> labels = new ArrayList<>();
for (JsonValue item : jsonArray) {
if (item.getValueType() == JsonValue.ValueType.OBJECT) {
Set<String> keys = item.asJsonObject().keySet();
if (keys.contains("tagFQN")) {
labels.add(item.asJsonObject().getString("tagFQN"));
} else if (keys.contains(FIELD_DISPLAY_NAME)) {
// Entity Reference will have a displayName
labels.add(item.asJsonObject().getString(FIELD_DISPLAY_NAME));
} else if (keys.contains(FIELD_NAME)) {
// Glossary term references have only "name" field
labels.add(item.asJsonObject().getString(FIELD_NAME));
} else if (keys.contains("constraintType")) {
labels.add(item.asJsonObject().getString("constraintType"));
}
} else if (item.getValueType() == JsonValue.ValueType.STRING) {
// The string might be enclosed with double quotes
// Check if string has double quotes and strip trailing whitespaces
String label = item.toString().replaceAll("^\"|\"$", "");
labels.add(label.strip());
}
}
return String.join(", ", labels);
} else if (json.getValueType() == JsonValue.ValueType.OBJECT) {
JsonObject jsonObject = json.asJsonObject();
// Entity Reference will have a displayName
Set<String> keys = jsonObject.asJsonObject().keySet();
if (keys.contains(FIELD_DISPLAY_NAME)) {
return jsonObject.asJsonObject().getString(FIELD_DISPLAY_NAME);
} else if (keys.contains(FIELD_NAME)) {
return jsonObject.asJsonObject().getString(FIELD_NAME);
}
}
} catch (JsonParsingException ex) {
// If unable to parse json, just return the string
}
return fieldValue.toString();
}
////// used in alerts rule evaluator///
public static Set<String> getUpdatedField(ChangeEvent event) {
Set<String> fields = new HashSet<>();
@ -151,33 +95,21 @@ public class FormatterUtil {
public static String transformMessage(
MessageDecorator<?> messageFormatter,
Thread thread,
FieldChange fieldChange,
EntityInterface entity,
CHANGE_TYPE changeType) {
MessageParser.EntityLink link = getEntityLink(fieldChange.getName(), entity);
String arrayFieldName = link.getArrayFieldName();
MessageParser.EntityLink link = getEntityLinkForFieldName(fieldChange.getName(), thread);
String arrayFieldValue = link.getArrayFieldValue();
String message;
String updatedField = fieldChange.getName();
if (arrayFieldValue != null) {
updatedField = format("%s.%s", arrayFieldName, arrayFieldValue);
} else if (arrayFieldName != null) {
updatedField = format("%s.%s", fieldChange.getName(), arrayFieldName);
}
String oldField = getFieldValue(fieldChange.getOldValue());
String newField = getFieldValue(fieldChange.getNewValue());
String updateField = getFieldNameChange(fieldChange.getName(), thread);
DefaultFieldFormatter fieldSpecificFormatter;
if (nullOrEmpty(arrayFieldValue)) {
fieldSpecificFormatter =
getFieldParserObject(messageFormatter, oldField, newField, updatedField, link);
getFieldParserObject(messageFormatter, thread, fieldChange, updateField);
} else {
fieldSpecificFormatter =
getFieldParserObject(messageFormatter, oldField, newField, arrayFieldValue, link);
getFieldParserObject(messageFormatter, thread, fieldChange, arrayFieldValue);
}
message = fieldSpecificFormatter.getFormattedMessage(changeType);
return message;
return fieldSpecificFormatter.getFormattedMessage(changeType);
}
public enum CHANGE_TYPE {
@ -186,15 +118,13 @@ public class FormatterUtil {
DELETE
}
public static Map<MessageParser.EntityLink, String> getFormattedMessages(
MessageDecorator<?> messageFormatter,
ChangeDescription changeDescription,
EntityInterface entity) {
public static List<Thread> getFormattedMessages(
MessageDecorator<?> messageFormatter, Thread thread, ChangeDescription changeDescription) {
// Store a map of entityLink -> message
List<FieldChange> fieldsUpdated = changeDescription.getFieldsUpdated();
Map<MessageParser.EntityLink, String> messages =
List<Thread> messages =
getFormattedMessagesForAllFieldChange(
messageFormatter, entity, fieldsUpdated, CHANGE_TYPE.UPDATE);
messageFormatter, thread, fieldsUpdated, CHANGE_TYPE.UPDATE);
// fieldsAdded and fieldsDeleted need special handling since
// there is a possibility to merge them as one update message.
@ -202,13 +132,13 @@ public class FormatterUtil {
List<FieldChange> fieldsDeleted = changeDescription.getFieldsDeleted();
if (fieldsAdded.isEmpty() || fieldsDeleted.isEmpty()) {
if (!fieldsAdded.isEmpty()) {
messages.putAll(
messages.addAll(
getFormattedMessagesForAllFieldChange(
messageFormatter, entity, fieldsAdded, CHANGE_TYPE.ADD));
messageFormatter, thread, fieldsAdded, CHANGE_TYPE.ADD));
} else if (!fieldsDeleted.isEmpty()) {
messages.putAll(
messages.addAll(
getFormattedMessagesForAllFieldChange(
messageFormatter, entity, fieldsDeleted, CHANGE_TYPE.DELETE));
messageFormatter, thread, fieldsDeleted, CHANGE_TYPE.DELETE));
}
return messages;
}
@ -217,61 +147,59 @@ public class FormatterUtil {
fieldsAdded.stream().filter(f -> f.getName().equals(field.getName())).findAny();
if (addedField.isPresent()) {
String fieldName = field.getName();
MessageParser.EntityLink link = FormatterUtil.getEntityLink(fieldName, entity);
MessageParser.EntityLink link = FormatterUtil.getEntityLinkForFieldName(fieldName, thread);
// convert the added field and deleted field into one update message
Thread tempThread = JsonUtils.deepCopy(thread, Thread.class);
String message =
ParserFactory.getEntityParser(link.getEntityType())
.format(
messageFormatter,
tempThread,
new FieldChange()
.withName(fieldName)
.withOldValue(field.getOldValue())
.withNewValue(addedField.get().getNewValue()),
entity,
CHANGE_TYPE.UPDATE);
messages.put(link, message);
tempThread.withMessage(message);
messages.add(tempThread);
// Remove the field from addedFields list to avoid double processing
fieldsAdded = fieldsAdded.stream().filter(f -> !f.equals(addedField.get())).toList();
} else {
// process the deleted field
messages.putAll(
messages.addAll(
getFormattedMessagesForAllFieldChange(
messageFormatter, entity, Collections.singletonList(field), CHANGE_TYPE.DELETE));
messageFormatter, thread, Collections.singletonList(field), CHANGE_TYPE.DELETE));
}
}
// process the remaining added fields
if (!fieldsAdded.isEmpty()) {
messages.putAll(
messages.addAll(
getFormattedMessagesForAllFieldChange(
messageFormatter, entity, fieldsAdded, CHANGE_TYPE.ADD));
messageFormatter, thread, fieldsAdded, CHANGE_TYPE.ADD));
}
return messages;
}
private static Map<MessageParser.EntityLink, String> getFormattedMessagesForAllFieldChange(
private static List<Thread> getFormattedMessagesForAllFieldChange(
MessageDecorator<?> messageFormatter,
EntityInterface entity,
Thread thread,
List<FieldChange> fields,
CHANGE_TYPE changeType) {
Map<MessageParser.EntityLink, String> messages = new HashMap<>();
List<Thread> threads = new ArrayList<>();
for (FieldChange field : fields) {
MessageParser.EntityLink link = FormatterUtil.getEntityLink(field.getName(), entity);
Thread tempEntity = JsonUtils.deepCopy(thread, Thread.class).withId(UUID.randomUUID());
// We are creating multiple thread on the same entity based on different messages
String message =
ParserFactory.getEntityParser(link.getEntityType())
.format(messageFormatter, field, entity, changeType);
messages.put(link, message);
ParserFactory.getEntityParser(thread.getEntityType())
.format(messageFormatter, tempEntity, field, changeType);
tempEntity.withMessage(message);
threads.add(tempEntity);
}
return messages;
return threads;
}
public static Optional<ChangeEvent> getChangeEventFromResponseContext(
ContainerResponseContext responseContext, String updateBy, String method) {
// GET operations don't produce change events , Response has no entity to produce change event
// from
if (method.equals("GET") || responseContext.getEntity() == null) {
return Optional.empty();
}
ContainerResponseContext responseContext, String updateBy) {
Optional<EventType> eventType = getEventTypeFromResponse(responseContext);
if (eventType.isEmpty() || !responseContext.hasEntity()) {
return Optional.empty();

View File

@ -21,6 +21,7 @@ import static org.openmetadata.schema.type.EventType.ENTITY_FIELDS_CHANGED;
import static org.openmetadata.schema.type.EventType.ENTITY_NO_CHANGE;
import static org.openmetadata.schema.type.EventType.ENTITY_RESTORED;
import static org.openmetadata.schema.type.EventType.ENTITY_SOFT_DELETED;
import static org.openmetadata.schema.type.EventType.ENTITY_UPDATED;
import static org.openmetadata.schema.type.Include.ALL;
import static org.openmetadata.schema.type.Include.DELETED;
import static org.openmetadata.schema.type.Include.NON_DELETED;
@ -120,6 +121,7 @@ import org.openmetadata.schema.type.Column;
import org.openmetadata.schema.type.EntityHistory;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.EventType;
import org.openmetadata.schema.type.FieldChange;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.LifeCycle;
import org.openmetadata.schema.type.ProviderType;
@ -1836,9 +1838,50 @@ public abstract class EntityRepository<T extends EntityInterface> {
}
result.withSuccessRequest(success);
// Create a Change Event on successful addition/removal of assets
if (result.getStatus().equals(ApiStatus.SUCCESS)) {
EntityInterface entityInterface = Entity.getEntity(fromEntity, entityId, "id", ALL);
ChangeDescription change =
addBulkAddRemoveChangeDescription(
entityInterface.getVersion(), isAdd, request.getAssets(), null);
ChangeEvent changeEvent =
getChangeEvent(entityInterface, change, fromEntity, entityInterface.getVersion());
Entity.getCollectionDAO().changeEventDAO().insert(JsonUtils.pojoToJson(changeEvent));
}
return result;
}
private ChangeDescription addBulkAddRemoveChangeDescription(
Double version, boolean isAdd, Object newValue, Object oldValue) {
FieldChange fieldChange =
new FieldChange().withName("assets").withNewValue(newValue).withOldValue(oldValue);
ChangeDescription change = new ChangeDescription().withPreviousVersion(version);
if (isAdd) {
change.getFieldsAdded().add(fieldChange);
} else {
change.getFieldsDeleted().add(fieldChange);
}
return change;
}
private ChangeEvent getChangeEvent(
EntityInterface updated, ChangeDescription change, String entityType, Double prevVersion) {
return new ChangeEvent()
.withId(UUID.randomUUID())
.withEntity(updated)
.withChangeDescription(change)
.withEventType(ENTITY_UPDATED)
.withEntityType(entityType)
.withEntityId(updated.getId())
.withEntityFullyQualifiedName(updated.getFullyQualifiedName())
.withUserName(updated.getUpdatedBy())
.withTimestamp(System.currentTimeMillis())
.withCurrentVersion(updated.getVersion())
.withPreviousVersion(prevVersion);
}
/** Remove owner relationship for a given entity */
@Transaction
private void removeOwner(T entity, EntityReference owner) {

View File

@ -50,9 +50,7 @@ public class EventSubscriptionRepository extends EntityRepository<EventSubscript
@Override
public void setFields(EventSubscription entity, Fields fields) {
if (fields.contains("statusDetails")
&& !entity.getAlertType().equals(CreateEventSubscription.AlertType.ACTIVITY_FEED)
&& !entity.getDestinations().isEmpty()) {
if (fields.contains("statusDetails") && !entity.getDestinations().isEmpty()) {
List<SubscriptionDestination> destinations = new ArrayList<>();
entity
.getDestinations()
@ -135,13 +133,16 @@ public class EventSubscriptionRepository extends EntityRepository<EventSubscript
@Override
public void entitySpecificUpdate() {
recordChange("enabled", original.getEnabled(), updated.getEnabled());
recordChange("batchSize", original.getBatchSize(), updated.getBatchSize());
recordChange("input", original.getInput(), updated.getInput(), true);
recordChange(
"filteringRules", original.getFilteringRules(), updated.getFilteringRules(), true);
recordChange("destinations", original.getDestinations(), updated.getDestinations(), true);
recordChange("trigger", original.getTrigger(), updated.getTrigger(), true);
recordChange("batchSize", original.getBatchSize(), updated.getBatchSize());
if (!original.getAlertType().equals(CreateEventSubscription.AlertType.ACTIVITY_FEED)) {
recordChange("enabled", original.getEnabled(), updated.getEnabled());
recordChange("destinations", original.getDestinations(), updated.getDestinations(), true);
recordChange("trigger", original.getTrigger(), updated.getTrigger(), true);
}
}
}
}

View File

@ -15,7 +15,6 @@ package org.openmetadata.service.resources.events.subscription;
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.schema.api.events.CreateEventSubscription.AlertType.ACTIVITY_FEED;
import static org.openmetadata.schema.api.events.CreateEventSubscription.AlertType.NOTIFICATION;
import static org.openmetadata.service.events.subscription.AlertUtil.validateAndBuildFilteringConditions;
@ -139,9 +138,7 @@ public class EventSubscriptionResource
List<EventSubscription> eventSubList =
JsonUtils.readObjects(listAllEventsSubscriptions, EventSubscription.class);
for (EventSubscription subscription : eventSubList) {
if (subscription.getAlertType() != ACTIVITY_FEED) {
EventSubscriptionScheduler.getInstance().addSubscriptionPublisher(subscription);
}
EventSubscriptionScheduler.getInstance().addSubscriptionPublisher(subscription);
}
} catch (Exception ex) {
// Starting application should not fail

View File

@ -592,7 +592,9 @@ public class FeedResource {
.withAnnouncement(create.getAnnouncementDetails())
.withChatbot(create.getChatbotDetails())
.withUpdatedBy(securityContext.getUserPrincipal().getName())
.withUpdatedAt(System.currentTimeMillis());
.withUpdatedAt(System.currentTimeMillis())
.withEntityType(Entity.THREAD)
.withGeneratedBy(Thread.GeneratedBy.USER);
}
private Post getPost(CreatePost create) {

View File

@ -16,15 +16,12 @@ package org.openmetadata.service.util;
import static org.openmetadata.service.events.subscription.AlertsRuleEvaluator.getEntity;
import static org.openmetadata.service.formatter.util.FormatterUtil.getFormattedMessages;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.entity.feed.Thread;
import org.openmetadata.schema.type.ChangeDescription;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.service.Entity;
import org.openmetadata.service.formatter.decorators.MessageDecorator;
@ -35,18 +32,17 @@ public final class FeedUtils {
private FeedUtils() {}
public static List<Thread> getThreadWithMessage(
MessageDecorator<?> messageDecorator, ChangeEvent changeEvent, String loggedInUserName) {
MessageDecorator<?> messageDecorator, ChangeEvent changeEvent) {
if (changeEvent == null || changeEvent.getEntity() == null) {
return Collections.emptyList(); // Response has no entity to produce change event from
}
// Change Event is of Thread or Data Assets
if (changeEvent.getEntityType().equals(Entity.THREAD)) {
// TODO:
// Thread type create from FeedRepository
return Collections.emptyList();
// return List.of(AlertsRuleEvaluator.getThread(changeEvent));
} else if (Entity.getEntityList().contains(changeEvent.getEntityType())) {
return populateMessageForDataAssets(messageDecorator, changeEvent, loggedInUserName);
return populateMessageForDataAssets(messageDecorator, changeEvent);
} else {
LOG.error(
"Invalid Entity Type: {}, Currently Change Events are expected as Thread or Data Assets",
@ -55,18 +51,20 @@ public final class FeedUtils {
}
}
private static List<Thread> populateMessageForThread(
ChangeEvent changeEvent, String loggedInUserName) {
return null;
}
private static List<Thread> populateMessageForDataAssets(
MessageDecorator<?> messageDecorator, ChangeEvent changeEvent, String loggedInUserName) {
MessageDecorator<?> messageDecorator, ChangeEvent changeEvent) {
String message;
EntityInterface entityInterface = getEntity(changeEvent);
MessageParser.EntityLink about =
new MessageParser.EntityLink(
changeEvent.getEntityType(), entityInterface.getFullyQualifiedName(), null, null, null);
Thread thread =
getThread(
messageDecorator,
entityInterface,
about.getLinkString(),
changeEvent.getEntityType(),
changeEvent.getUserName());
// In Case EventType is not valid
return switch (changeEvent.getEventType()) {
case ENTITY_CREATED -> {
@ -74,71 +72,57 @@ public final class FeedUtils {
String.format(
"Created **%s**: `%s`",
changeEvent.getEntityType(), entityInterface.getFullyQualifiedName());
yield List.of(getThread(about.getLinkString(), message, loggedInUserName));
yield List.of(thread.withMessage(message));
}
case ENTITY_UPDATED -> getThreadWithMessage(
messageDecorator, entityInterface, changeEvent.getChangeDescription(), loggedInUserName);
case ENTITY_UPDATED -> getFormattedMessages(
messageDecorator, thread, changeEvent.getChangeDescription());
case ENTITY_SOFT_DELETED -> {
message =
String.format(
"Soft deleted **%s**: `%s`",
changeEvent.getEntityType(), entityInterface.getFullyQualifiedName());
yield List.of(getThread(about.getLinkString(), message, loggedInUserName));
yield List.of(thread.withMessage(message));
}
case ENTITY_DELETED -> {
message =
String.format(
"Permanently Deleted **%s**: `%s`",
changeEvent.getEntityType(), entityInterface.getFullyQualifiedName());
yield List.of(getThread(about.getLinkString(), message, loggedInUserName));
yield List.of(thread.withMessage(message));
}
case LOGICAL_TEST_CASE_ADDED -> {
message =
String.format(
"Added Logical Test Cases to **%s**: `%s`",
changeEvent.getEntityType(), entityInterface.getFullyQualifiedName());
yield List.of(getThread(about.getLinkString(), message, loggedInUserName));
yield List.of(thread.withMessage(message));
}
default -> {
if (entityInterface.getChangeDescription() == null) {
if (changeEvent.getChangeDescription() == null) {
yield Collections.emptyList();
}
yield getThreadWithMessage(
messageDecorator,
entityInterface,
entityInterface.getChangeDescription(),
loggedInUserName);
yield getFormattedMessages(messageDecorator, thread, changeEvent.getChangeDescription());
}
};
}
private static List<Thread> getThreadWithMessage(
MessageDecorator<?> messageDecorator,
EntityInterface entity,
ChangeDescription changeDescription,
public static Thread getThread(
MessageDecorator<?> decorator,
EntityInterface entityInterface,
String linkString,
String entityType,
String loggedInUserName) {
List<Thread> threads = new ArrayList<>();
Map<MessageParser.EntityLink, String> messages =
getFormattedMessages(messageDecorator, changeDescription, entity);
// Create an automated thread
for (Map.Entry<MessageParser.EntityLink, String> entry : messages.entrySet()) {
threads.add(getThread(entry.getKey().getLinkString(), entry.getValue(), loggedInUserName));
}
return threads;
}
private static Thread getThread(String linkString, String message, String loggedInUserName) {
return new Thread()
.withId(UUID.randomUUID())
.withThreadTs(System.currentTimeMillis())
.withCreatedBy(loggedInUserName)
.withAbout(linkString)
.withEntityId(entityInterface.getId())
.withEntityType(entityType)
.withReactions(Collections.emptyList())
.withUpdatedBy(loggedInUserName)
.withUpdatedAt(System.currentTimeMillis())
.withMessage(message);
.withGeneratedBy(Thread.GeneratedBy.SYSTEM)
.withEntityUrlLink(decorator.buildEntityUrl(entityType, entityInterface));
}
}

View File

@ -126,6 +126,14 @@ public final class JsonUtils {
}
}
public static <T> List<T> readOrConvertValues(Object obj, Class<T> clz) {
if (obj instanceof String str) {
return readObjects(str, clz);
} else {
return convertObjects(obj, clz);
}
}
public static <T> T readValue(String json, String clazzName) {
try {
return (T) readValue(json, Class.forName(clazzName));
@ -156,6 +164,15 @@ public final class JsonUtils {
}
}
/** Convert an array of objects of type {@code T} from json */
public static <T> List<T> convertObjects(Object json, Class<T> clz) {
if (json == null) {
return Collections.emptyList();
}
TypeFactory typeFactory = OBJECT_MAPPER.getTypeFactory();
return OBJECT_MAPPER.convertValue(json, typeFactory.constructCollectionType(List.class, clz));
}
/** Read an array of objects of type {@code T} from json */
public static <T> List<T> readObjects(String json, Class<T> clz) {
if (json == null) {

View File

@ -10,7 +10,7 @@
{
"name": "matchAnySource",
"effect": "include",
"condition": "matchAnySource({'chart','container','dashboard','dashboardDataModel','database','databaseSchema','glossary','glossaryTerm','mlmodel','pipeline','query','report','searchIndex','storedProcedure','table','topic','testSuite','testCase'})"
"condition": "matchAnySource({'chart','container','dashboard','dashboardDataModel','database','databaseSchema','glossary','glossaryTerm','mlmodel','pipeline','query','report','searchIndex','storedProcedure','table','topic','testSuite','testCase', 'domain', 'dataProduct'})"
},
{
"name": "matchUpdatedBy",
@ -19,8 +19,8 @@
},
{
"name": "matchAnyFieldChange",
"effect": "exclude",
"condition": "matchAnyFieldChange({'usageSummary', 'sourceHash', 'lifeCycle'})"
"effect": "include",
"condition": "matchAnyFieldChange({'description', 'domain', 'owner', 'tags', 'followers', 'extension','parameterValues', 'assets'})"
}
]
},
@ -57,7 +57,9 @@
"table",
"topic",
"testSuite",
"testCase"
"testCase",
"domain",
"dataProduct"
]
}
]
@ -76,14 +78,18 @@
},
{
"name": "filterByFieldChange",
"effect" : "exclude",
"effect" : "include",
"arguments": [
{
"name": "fieldChangeList",
"input": [
"usageSummary",
"sourceHash",
"lifeCycle"
"description",
"domain",
"owner",
"tags",
"followers",
"extension",
"parameterValues"
]
}
]
@ -91,5 +97,6 @@
]
},
"provider" : "system",
"pollInterval" : 30,
"enabled" : true
}

View File

@ -15,15 +15,14 @@ package org.openmetadata.service.resources;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.openmetadata.service.Entity.FIELD_OWNER;
import static org.openmetadata.service.formatter.util.FormatterUtil.getFormattedMessages;
import static org.openmetadata.service.util.EntityUtil.fieldAdded;
import static org.openmetadata.service.util.EntityUtil.fieldDeleted;
import static org.openmetadata.service.util.EntityUtil.fieldUpdated;
import static org.openmetadata.service.util.FeedUtils.getThreadWithMessage;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeAll;
@ -34,18 +33,21 @@ import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import org.junit.jupiter.api.TestMethodOrder;
import org.openmetadata.schema.entity.data.Table;
import org.openmetadata.schema.entity.feed.Thread;
import org.openmetadata.schema.type.ChangeDescription;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.EventType;
import org.openmetadata.schema.type.FieldChange;
import org.openmetadata.schema.type.TagLabel;
import org.openmetadata.schema.type.TagLabel.LabelType;
import org.openmetadata.schema.type.TagLabel.State;
import org.openmetadata.service.Entity;
import org.openmetadata.service.OpenMetadataApplicationTest;
import org.openmetadata.service.formatter.decorators.FeedMessageDecorator;
import org.openmetadata.service.formatter.decorators.MessageDecorator;
import org.openmetadata.service.formatter.decorators.SlackMessageDecorator;
import org.openmetadata.service.resources.databases.TableResourceTest;
import org.openmetadata.service.resources.feeds.MessageParser.EntityLink;
import org.openmetadata.service.util.JsonUtils;
@Slf4j
@ -66,20 +68,27 @@ class ChangeEventParserResourceTest extends OpenMetadataApplicationTest {
@Test
void testFormattedMessages() {
TagLabel oldTag1 = new TagLabel();
oldTag1.withTagFQN("tag1").withLabelType(LabelType.DERIVED).withState(State.CONFIRMED);
TagLabel oldTag2 = new TagLabel();
oldTag2.withTagFQN("tag2").withLabelType(LabelType.DERIVED).withState(State.CONFIRMED);
ChangeDescription changeDescription = new ChangeDescription();
// Simulate updating tags of an entity from tag1 -> tag2
FieldChange addTag = new FieldChange();
addTag.withName("tags").withNewValue("tag2");
addTag.withName("tags").withNewValue(JsonUtils.pojoToJson(List.of(oldTag2)));
FieldChange deleteTag = new FieldChange();
deleteTag.withName("tags").withOldValue("tag1");
deleteTag.withName("tags").withOldValue(JsonUtils.pojoToJson(List.of(oldTag1)));
changeDescription
.withFieldsAdded(List.of(addTag))
.withFieldsDeleted(List.of(deleteTag))
.withPreviousVersion(1.0);
Map<EntityLink, String> messages =
getFormattedMessages(feedMessageFormatter, changeDescription, TABLE);
assertEquals(1, messages.size());
ChangeEvent changeEvent = getChangeEvent(EventType.ENTITY_UPDATED, changeDescription, 1.0, 1.1);
List<Thread> threadWithMessages = getThreadWithMessage(feedMessageFormatter, changeEvent);
assertEquals(1, threadWithMessages.size());
TagLabel tag1 = new TagLabel();
tag1.withTagFQN("tag1").withLabelType(LabelType.DERIVED).withState(State.CONFIRMED);
@ -90,12 +99,30 @@ class ChangeEventParserResourceTest extends OpenMetadataApplicationTest {
addTag.withNewValue(JsonUtils.pojoToJson(List.of(tag2)));
deleteTag.withOldValue(JsonUtils.pojoToJson(List.of(tag1)));
Map<EntityLink, String> jsonMessages =
getFormattedMessages(feedMessageFormatter, changeDescription, TABLE);
assertEquals(1, jsonMessages.size());
List<Thread> updatedThreadWithMessages =
getThreadWithMessage(feedMessageFormatter, changeEvent);
assertEquals(1, updatedThreadWithMessages.size());
// The entity links and values of both the messages should be the same
assertEquals(messages.values().iterator().next(), jsonMessages.values().iterator().next());
assertEquals(
threadWithMessages.get(0).getMessage(), updatedThreadWithMessages.get(0).getMessage());
}
private ChangeEvent getChangeEvent(
EventType eventType,
ChangeDescription changeDescription,
Double previousVersion,
Double newVersion) {
return new ChangeEvent()
.withId(UUID.randomUUID())
.withEventType(eventType)
.withEntityId(TABLE.getId())
.withEntityType(Entity.TABLE)
.withEntityFullyQualifiedName(TABLE.getFullyQualifiedName())
.withChangeDescription(changeDescription)
.withPreviousVersion(previousVersion)
.withCurrentVersion(newVersion)
.withEntity(TABLE);
}
@Test
@ -106,13 +133,14 @@ class ChangeEventParserResourceTest extends OpenMetadataApplicationTest {
entityReference.withId(UUID.randomUUID()).withName("user1").withDisplayName("User One");
fieldAdded(changeDescription, FIELD_OWNER, JsonUtils.pojoToJson(entityReference));
Map<EntityLink, String> messages =
getFormattedMessages(feedMessageFormatter, changeDescription, TABLE);
assertEquals(1, messages.size());
ChangeEvent changeEvent = getChangeEvent(EventType.ENTITY_UPDATED, changeDescription, 1.0, 1.1);
List<Thread> threadWithMessages = getThreadWithMessage(feedMessageFormatter, changeEvent);
assertEquals(1, threadWithMessages.size());
assertEquals(
"Added **owner**: <span class=\"diff-added\">User One</span>",
messages.values().iterator().next());
threadWithMessages.get(0).getMessage());
}
@Test
@ -121,14 +149,14 @@ class ChangeEventParserResourceTest extends OpenMetadataApplicationTest {
ChangeDescription changeDescription = new ChangeDescription();
fieldUpdated(changeDescription, "description", "old description", "new description");
Map<EntityLink, String> messages =
getFormattedMessages(feedMessageFormatter, changeDescription, TABLE);
assertEquals(1, messages.size());
ChangeEvent changeEvent = getChangeEvent(EventType.ENTITY_UPDATED, changeDescription, 1.0, 1.1);
List<Thread> threadMessages = getThreadWithMessage(feedMessageFormatter, changeEvent);
assertEquals(1, threadMessages.size());
assertEquals(
"Updated **description**: <span class=\"diff-removed\">old</span> "
+ "<span class=\"diff-added\">new</span> description",
messages.values().iterator().next());
threadMessages.get(0).getMessage());
// test if it updates correctly with one add and one delete change
changeDescription = new ChangeDescription().withPreviousVersion(1.0);
@ -136,12 +164,12 @@ class ChangeEventParserResourceTest extends OpenMetadataApplicationTest {
fieldDeleted(changeDescription, "description", "old description");
// now test if both the type of updates give the same message
Map<EntityLink, String> updatedMessages =
getFormattedMessages(feedMessageFormatter, changeDescription, TABLE);
assertEquals(1, updatedMessages.size());
changeEvent = getChangeEvent(EventType.ENTITY_UPDATED, changeDescription, 1.0, 1.1);
List<Thread> threadUpdatedMessages = getThreadWithMessage(feedMessageFormatter, changeEvent);
assertEquals(1, threadUpdatedMessages.size());
assertEquals(messages.keySet().iterator().next(), updatedMessages.keySet().iterator().next());
assertEquals(messages.values().iterator().next(), updatedMessages.values().iterator().next());
assertEquals(threadMessages.get(0).getMessage(), threadUpdatedMessages.get(0).getMessage());
assertEquals(threadMessages.get(0).getAbout(), threadUpdatedMessages.get(0).getAbout());
}
@Test
@ -150,25 +178,29 @@ class ChangeEventParserResourceTest extends OpenMetadataApplicationTest {
// Simulate a change of description in table
fieldUpdated(changeDescription, "description", "old description", "new description");
Map<EntityLink, String> messages =
getFormattedMessages(slackMessageFormatter, changeDescription, TABLE);
assertEquals(1, messages.size());
ChangeEvent changeEvent = getChangeEvent(EventType.ENTITY_UPDATED, changeDescription, 1.0, 1.1);
List<Thread> threadsWithMessages = getThreadWithMessage(slackMessageFormatter, changeEvent);
assertEquals(1, threadsWithMessages.size());
assertEquals(
"Updated *description*: ~old~ *new* description", messages.values().iterator().next());
"Updated *description*: ~old~ *new* description", threadsWithMessages.get(0).getMessage());
// test if it updates correctly with one add and one delete change
changeDescription = new ChangeDescription().withPreviousVersion(1.0);
fieldAdded(changeDescription, "description", "new description");
fieldDeleted(changeDescription, "description", "old description");
changeEvent = getChangeEvent(EventType.ENTITY_UPDATED, changeDescription, 1.0, 1.1);
List<Thread> threadsWithUpdatedMessages =
getThreadWithMessage(slackMessageFormatter, changeEvent);
// now test if both the type of updates give the same message
Map<EntityLink, String> updatedMessages =
getFormattedMessages(slackMessageFormatter, changeDescription, TABLE);
assertEquals(1, updatedMessages.size());
assertEquals(1, threadsWithUpdatedMessages.size());
assertEquals(messages.keySet().iterator().next(), updatedMessages.keySet().iterator().next());
assertEquals(messages.values().iterator().next(), updatedMessages.values().iterator().next());
assertEquals(
threadsWithMessages.get(0).getAbout(), threadsWithUpdatedMessages.get(0).getAbout());
assertEquals(
threadsWithMessages.get(0).getMessage(), threadsWithUpdatedMessages.get(0).getMessage());
}
@Test
@ -185,13 +217,14 @@ class ChangeEventParserResourceTest extends OpenMetadataApplicationTest {
"columns",
"[{\"name\":\"lo_order\",\"displayName\":\"lo_order\",\"dataType\":\"INT\",\"dataLength\":1,\"dataTypeDisplay\":\"int\",\"fullyQualifiedName\":\"local_mysql.sample_db.lineorder.lo_order\",\"constraint\":\"NOT_NULL\"}]");
Map<EntityLink, String> messages =
getFormattedMessages(feedMessageFormatter, changeDescription, TABLE);
assertEquals(1, messages.size());
ChangeEvent changeEvent = getChangeEvent(EventType.ENTITY_UPDATED, changeDescription, 1.3, 1.4);
List<Thread> threadWithMessages = getThreadWithMessage(feedMessageFormatter, changeEvent);
assertEquals(1, threadWithMessages.size());
assertEquals(
"Updated **columns**: lo_order <span class=\"diff-added\">priority</span>",
messages.values().iterator().next());
threadWithMessages.get(0).getMessage());
// Simulate a change of datatype change in column
changeDescription = new ChangeDescription().withPreviousVersion(1.3);
@ -204,10 +237,11 @@ class ChangeEventParserResourceTest extends OpenMetadataApplicationTest {
"columns",
"[{\"name\":\"lo_orderpriority\",\"displayName\":\"lo_orderpriority\",\"dataType\":\"BLOB\",\"dataLength\":1,\"dataTypeDisplay\":\"blob\",\"fullyQualifiedName\":\"local_mysql.sample_db.lineorder.lo_orderpriority\",\"tags\":[],\"constraint\":\"NOT_NULL\"}]");
messages = getFormattedMessages(feedMessageFormatter, changeDescription, TABLE);
assertEquals(1, messages.size());
changeEvent = getChangeEvent(EventType.ENTITY_UPDATED, changeDescription, 1.3, 1.4);
threadWithMessages = getThreadWithMessage(feedMessageFormatter, changeEvent);
assertEquals(1, threadWithMessages.size());
assertEquals("Updated **columns**: lo_orderpriority", messages.values().iterator().next());
assertEquals("Updated **columns**: lo_orderpriority", threadWithMessages.get(0).getMessage());
// Simulate multiple changes to columns
changeDescription = new ChangeDescription().withPreviousVersion(1.4);
@ -220,12 +254,13 @@ class ChangeEventParserResourceTest extends OpenMetadataApplicationTest {
"columns",
"[{\"name\":\"lo_orderpriority\",\"displayName\":\"lo_orderpriority\",\"dataType\":\"BLOB\",\"dataLength\":1,\"dataTypeDisplay\":\"blob\",\"fullyQualifiedName\":\"local_mysql.sample_db.lineorder.lo_orderpriority\"}]");
messages = getFormattedMessages(feedMessageFormatter, changeDescription, TABLE);
assertEquals(1, messages.size());
changeEvent = getChangeEvent(EventType.ENTITY_UPDATED, changeDescription, 1.3, 1.4);
threadWithMessages = getThreadWithMessage(feedMessageFormatter, changeEvent);
assertEquals(1, threadWithMessages.size());
assertEquals(
"Updated **columns**: lo_orderpriority <span class=\"diff-added\">, newColumn</span>",
messages.values().iterator().next());
threadWithMessages.get(0).getMessage());
}
@Test
@ -242,11 +277,11 @@ class ChangeEventParserResourceTest extends OpenMetadataApplicationTest {
"columns",
"[{\"name\":\"lo_order\",\"displayName\":\"lo_order\",\"dataType\":\"INT\",\"dataLength\":1,\"dataTypeDisplay\":\"int\",\"fullyQualifiedName\":\"local_mysql.sample_db.lineorder.lo_order\",\"constraint\":\"NOT_NULL\"}]");
Map<EntityLink, String> messages =
getFormattedMessages(slackMessageFormatter, changeDescription, TABLE);
assertEquals(1, messages.size());
ChangeEvent changeEvent = getChangeEvent(EventType.ENTITY_UPDATED, changeDescription, 1.3, 1.4);
List<Thread> threadWithMessages = getThreadWithMessage(slackMessageFormatter, changeEvent);
assertEquals(1, threadWithMessages.size());
assertEquals("Updated *columns*: lo_order *priority*", messages.values().iterator().next());
assertEquals("Updated *columns*: lo_order *priority*", threadWithMessages.get(0).getMessage());
// Simulate a change of datatype change in column
changeDescription = new ChangeDescription().withPreviousVersion(1.3);
@ -259,10 +294,11 @@ class ChangeEventParserResourceTest extends OpenMetadataApplicationTest {
"columns",
"[{\"name\":\"lo_orderpriority\",\"displayName\":\"lo_orderpriority\",\"dataType\":\"BLOB\",\"dataLength\":1,\"dataTypeDisplay\":\"blob\",\"fullyQualifiedName\":\"local_mysql.sample_db.lineorder.lo_orderpriority\",\"tags\":[],\"constraint\":\"NOT_NULL\"}]");
messages = getFormattedMessages(slackMessageFormatter, changeDescription, TABLE);
assertEquals(1, messages.size());
changeEvent = getChangeEvent(EventType.ENTITY_UPDATED, changeDescription, 1.3, 1.4);
threadWithMessages = getThreadWithMessage(slackMessageFormatter, changeEvent);
assertEquals(1, threadWithMessages.size());
assertEquals("Updated *columns*: lo_orderpriority", messages.values().iterator().next());
assertEquals("Updated *columns*: lo_orderpriority", threadWithMessages.get(0).getMessage());
// Simulate multiple changes to columns
changeDescription = new ChangeDescription().withPreviousVersion(1.4);
@ -275,10 +311,12 @@ class ChangeEventParserResourceTest extends OpenMetadataApplicationTest {
"columns",
"[{\"name\":\"lo_orderpriority\",\"displayName\":\"lo_orderpriority\",\"dataType\":\"BLOB\",\"dataLength\":1,\"dataTypeDisplay\":\"blob\",\"fullyQualifiedName\":\"local_mysql.sample_db.lineorder.lo_orderpriority\"}]");
messages = getFormattedMessages(slackMessageFormatter, changeDescription, TABLE);
assertEquals(1, messages.size());
changeEvent = getChangeEvent(EventType.ENTITY_UPDATED, changeDescription, 1.3, 1.4);
threadWithMessages = getThreadWithMessage(slackMessageFormatter, changeEvent);
assertEquals(1, threadWithMessages.size());
assertEquals(
"Updated *columns*: lo_orderpriority *, newColumn*", messages.values().iterator().next());
"Updated *columns*: lo_orderpriority *, newColumn*",
threadWithMessages.get(0).getMessage());
}
}

View File

@ -2980,7 +2980,7 @@ public abstract class EntityResourceTest<T extends EntityInterface, K extends Cr
assertEquals(EventType.ENTITY_CREATED, changeEvent.getEventType());
assertEquals(0.1, changeEvent.getPreviousVersion());
assertNull(changeEvent.getChangeDescription());
T changeEventEntity = JsonUtils.readValue((String) changeEvent.getEntity(), entityClass);
T changeEventEntity = JsonUtils.readOrConvertValue(changeEvent.getEntity(), entityClass);
validateCommonEntityFields(entity, changeEventEntity, getPrincipalName(authHeaders));
compareChangeEventsEntities(entity, changeEventEntity, authHeaders);
} else if (expectedEventType == EventType.ENTITY_UPDATED) {

View File

@ -189,7 +189,7 @@ public class EventSubscriptionResourceTest
deleteEntity(alert.getId(), ADMIN_AUTH_HEADERS);
}
private void waitForAllEventToComplete(UUID alertId) throws HttpResponseException {
public void waitForAllEventToComplete(UUID alertId) throws HttpResponseException {
boolean result;
do {
WebTarget target =

View File

@ -18,6 +18,7 @@ import static javax.ws.rs.core.Response.Status.FORBIDDEN;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static javax.ws.rs.core.Response.Status.OK;
import static org.awaitility.Awaitility.with;
import static org.awaitility.Durations.ONE_MINUTE;
import static org.awaitility.Durations.ONE_SECOND;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ -85,6 +86,7 @@ import org.openmetadata.schema.api.feed.ResolveTask;
import org.openmetadata.schema.api.feed.ThreadCount;
import org.openmetadata.schema.api.teams.CreateTeam;
import org.openmetadata.schema.entity.data.Table;
import org.openmetadata.schema.entity.events.EventSubscription;
import org.openmetadata.schema.entity.feed.Thread;
import org.openmetadata.schema.entity.teams.Team;
import org.openmetadata.schema.entity.teams.User;
@ -110,6 +112,7 @@ import org.openmetadata.service.formatter.decorators.MessageDecorator;
import org.openmetadata.service.formatter.util.FeedMessage;
import org.openmetadata.service.jdbi3.FeedRepository.FilterType;
import org.openmetadata.service.resources.databases.TableResourceTest;
import org.openmetadata.service.resources.events.EventSubscriptionResourceTest;
import org.openmetadata.service.resources.feeds.FeedResource.PostList;
import org.openmetadata.service.resources.feeds.FeedResource.ThreadList;
import org.openmetadata.service.resources.teams.TeamResourceTest;
@ -265,6 +268,11 @@ public class FeedResourceTest extends OpenMetadataApplicationTest {
@Test
@Order(1)
void post_validThreadAndList_200(TestInfo test) throws IOException {
EventSubscriptionResourceTest eventSubscriptionResourceTest =
new EventSubscriptionResourceTest();
EventSubscription subscription =
eventSubscriptionResourceTest.getEntityByName("ActivityFeedAlert", ADMIN_AUTH_HEADERS);
eventSubscriptionResourceTest.waitForAllEventToComplete(subscription.getId());
int totalThreadCount = listThreads(null, null, ADMIN_AUTH_HEADERS).getPaging().getTotal();
int userThreadCount = listThreads(USER_LINK, null, ADMIN_AUTH_HEADERS).getPaging().getTotal();
int tableThreadCount = listThreads(TABLE_LINK, null, ADMIN_AUTH_HEADERS).getPaging().getTotal();
@ -753,6 +761,11 @@ public class FeedResourceTest extends OpenMetadataApplicationTest {
@NullSource
@MethodSource("provideStringsForListThreads")
void get_listThreadsWithPagination(String entityLink) throws HttpResponseException {
EventSubscriptionResourceTest eventSubscriptionResourceTest =
new EventSubscriptionResourceTest();
EventSubscription subscription =
eventSubscriptionResourceTest.getEntityByName("ActivityFeedAlert", ADMIN_AUTH_HEADERS);
eventSubscriptionResourceTest.waitForAllEventToComplete(subscription.getId());
// Create 10 threads
int totalThreadCount = listThreads(entityLink, null, ADMIN_AUTH_HEADERS).getPaging().getTotal();
for (int i = 1; i <= 10; i++) {
@ -1178,6 +1191,11 @@ public class FeedResourceTest extends OpenMetadataApplicationTest {
@Test
void list_threadsWithOwnerOrFollowerFilter() throws HttpResponseException {
EventSubscriptionResourceTest eventSubscriptionResourceTest =
new EventSubscriptionResourceTest();
EventSubscription subscription =
eventSubscriptionResourceTest.getEntityByName("ActivityFeedAlert", ADMIN_AUTH_HEADERS);
eventSubscriptionResourceTest.waitForAllEventToComplete(subscription.getId());
int totalThreadCount = listThreads(null, null, ADMIN_AUTH_HEADERS).getPaging().getTotal();
String user1 = USER1.getId().toString(); // user1 is the owner of TABLE
// Get thread counts for user1 and user2
@ -1211,6 +1229,7 @@ public class FeedResourceTest extends OpenMetadataApplicationTest {
followTable(TABLE2.getId(), USER1.getId(), USER_AUTH_HEADERS);
with()
.pollInterval(ONE_SECOND)
.timeout(ONE_MINUTE)
.await("Threads With Follows")
.until(
() -> {
@ -1230,6 +1249,11 @@ public class FeedResourceTest extends OpenMetadataApplicationTest {
@Test
@Order(2)
void list_threadsWithMentionsFilter() throws HttpResponseException {
EventSubscriptionResourceTest eventSubscriptionResourceTest =
new EventSubscriptionResourceTest();
EventSubscription subscription =
eventSubscriptionResourceTest.getEntityByName("ActivityFeedAlert", ADMIN_AUTH_HEADERS);
eventSubscriptionResourceTest.waitForAllEventToComplete(subscription.getId());
// Create a thread with user mention
createAndCheck(
create()
@ -1254,6 +1278,11 @@ public class FeedResourceTest extends OpenMetadataApplicationTest {
@Test
void list_threadsWithFollowsFilter() throws HttpResponseException {
EventSubscriptionResourceTest eventSubscriptionResourceTest =
new EventSubscriptionResourceTest();
EventSubscription subscription =
eventSubscriptionResourceTest.getEntityByName("ActivityFeedAlert", ADMIN_AUTH_HEADERS);
eventSubscriptionResourceTest.waitForAllEventToComplete(subscription.getId());
// Get the initial thread count of TABLE2
String entityLink = String.format("<#E::table::%s>", TABLE2.getFullyQualifiedName());
int initialThreadCount =
@ -1271,6 +1300,7 @@ public class FeedResourceTest extends OpenMetadataApplicationTest {
// User started following this table
with()
.pollInterval(ONE_SECOND)
.timeout(ONE_MINUTE)
.await("Threads With Follows")
.until(
() -> {

View File

@ -0,0 +1,14 @@
{
"$id": "https://open-metadata.org/schema/entity/feed/assets.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "AssetsFeedInfo",
"javaType": "org.openmetadata.schema.entity.feed.AssetsFeedInfo",
"description": "This schema defines the schema for Assets addition/deletion Updates.",
"type": "object",
"properties": {
"updatedAssets": {
"$ref": "../../type/entityReferenceList.json"
}
},
"additionalProperties": false
}

View File

@ -0,0 +1,23 @@
{
"$id": "https://open-metadata.org/schema/entity/feed/description.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "DescriptionFeedInfo",
"javaType": "org.openmetadata.schema.entity.feed.DescriptionFeedInfo",
"description": "This schema defines the schema for Description Updates.",
"type": "object",
"properties": {
"previousDescription": {
"type": "string",
"description": "The previous description of the entity."
},
"newDescription": {
"description": "The new description of the entity.",
"type": "string"
},
"diffMessage": {
"description": "The difference between the previous and new descriptions.",
"type": "string"
}
},
"additionalProperties": false
}

View File

@ -0,0 +1,19 @@
{
"$id": "https://open-metadata.org/schema/entity/feed/owner.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "OwnerFeedInfo",
"description": "This schema defines the schema for Ownership Updates for Feed.",
"javaType": "org.openmetadata.schema.entity.feed.OwnerFeedInfo",
"type": "object",
"properties": {
"previousOwner": {
"description": "List of previous tags.",
"type": "string"
},
"updatedOwner": {
"description": "Updated Owners.",
"type": "string"
}
},
"additionalProperties": false
}

View File

@ -0,0 +1,25 @@
{
"$id": "https://open-metadata.org/schema/entity/feed/tag.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "TagFeedInfo",
"description": "This schema defines the schema for Tag Updates for Feed.",
"javaType": "org.openmetadata.schema.entity.feed.TagFeedInfo",
"type": "object",
"properties": {
"previousTags": {
"description": "List of previous tags.",
"type": "array",
"items": {
"$ref": "../../type/tagLabel.json"
}
},
"updatedTags": {
"description": "List of updated tags.",
"type": "array",
"items": {
"$ref": "../../type/tagLabel.json"
}
}
},
"additionalProperties": false
}

View File

@ -0,0 +1,25 @@
{
"$id": "https://open-metadata.org/schema/entity/feed/testCaseResult.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "TestCaseResultFeedInfo",
"description": "This schema defines the schema for Test Case Result Updates for Feed.",
"javaType": "org.openmetadata.schema.entity.feed.TestCaseResultFeedInfo",
"type": "object",
"properties": {
"entityTestResultSummary": {
"description": "Summary of test case execution",
"type": "array",
"items": {
"$ref": "../../tests/testSuite.json#/definitions/testSuiteConnection/resultSummary"
}
},
"testCaseResult": {
"description": "Test Case Result for last 7 days.",
"type": "array",
"items": {
"$ref": "../../tests/basic.json#/definitions/testCaseResult"
}
}
},
"additionalProperties": false
}

View File

@ -4,7 +4,6 @@
"title": "Thread",
"description": "This schema defines the Thread entity. A Thread is a collection of posts made by the users. The first post that starts a thread is **about** a data asset **from** a user. Other users can respond to this post by creating new posts in the thread. Note that bot users can also interact with a thread. A post can contains links that mention Users or other Data Assets.",
"type": "object",
"definitions": {
"taskType": {
"javaType": "org.openmetadata.schema.type.TaskType",
@ -212,6 +211,63 @@
"description": "Entity Id of the entity in `about` that the thread belongs to.",
"$ref": "../../type/basic.json#/definitions/uuid"
},
"entityType": {
"description": "Entity Type the thread is about.",
"type": "string"
},
"entityUrlLink": {
"description": "Link to the entity in `about` that the thread belongs to.",
"type": "string"
},
"generatedBy": {
"description": "User or team that generated the thread.",
"type": "string",
"enum": ["user", "system"],
"default": "user"
},
"cardStyle": {
"description": "Card style for the thread.",
"type": "string",
"enum": ["default", "description", "tags", "owner", "testCaseResult", "customProperties"],
"default": "default"
},
"fieldOperation": {
"description": "Operation on thread, whether the field was added, or updated or deleted.",
"type": "string",
"enum": ["added", "updated", "deleted"],
"default": "updated"
},
"feedInfo": {
"description": "Entity Id of the entity in `about` that the thread belongs to.",
"type": "object",
"properties": {
"headerMessage": {
"description": "Header message for the feed.",
"type": "string"
},
"fieldName": {
"description": "Field Name message for the feed.",
"type": "string"
},
"entitySpecificInfo": {
"oneOf": [
{
"$ref": "./description.json"
},
{
"$ref": "./owner.json"
},
{
"$ref": "./testCaseResult.json"
},
{
"$ref": "./tag.json"
}
]
}
},
"additionalProperties": false
},
"addressedTo": {
"description": "User or team this thread is addressed to in format <#E::{entities}::{entityName}::{field}::{fieldValue}.",
"$ref": "../../type/basic.json#/definitions/entityLink"