[Backend] Unify Slack and Activity Feed Parser (#6064)

* [Backend][SAML] Unify Slack and Activity Feed Parser

* [Backend][UnifyParser] added Slack Tests
This commit is contained in:
mohitdeuex 2022-07-16 01:38:29 +05:30 committed by GitHub
parent 4c30b01286
commit 081bfa1e13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 262 additions and 248 deletions

View File

@ -305,7 +305,8 @@ public class ChangeEventHandler implements EventHandler {
private List<Thread> getThreads(
EntityInterface entity, ChangeDescription changeDescription, String loggedInUserName) {
List<Thread> threads = new ArrayList<>();
Map<EntityLink, String> messages = ChangeEventParser.getFormattedMessages(changeDescription, entity);
Map<EntityLink, String> messages =
ChangeEventParser.getFormattedMessages(ChangeEventParser.PUBLISH_TO.FEED, changeDescription, entity);
// Create an automated thread
for (var link : messages.keySet()) {

View File

@ -75,12 +75,9 @@ import org.openmetadata.catalog.type.TaskDetails;
import org.openmetadata.catalog.type.TaskStatus;
import org.openmetadata.catalog.type.TaskType;
import org.openmetadata.catalog.type.ThreadType;
import org.openmetadata.catalog.util.EntityUtil;
import org.openmetadata.catalog.util.JsonUtils;
import org.openmetadata.catalog.util.RestUtil;
import org.openmetadata.catalog.util.*;
import org.openmetadata.catalog.util.RestUtil.DeleteResponse;
import org.openmetadata.catalog.util.RestUtil.PatchResponse;
import org.openmetadata.catalog.util.ResultList;
@Slf4j
public class FeedRepository {
@ -367,7 +364,9 @@ public class FeedRepository {
oldValue = task.getOldValue();
}
message =
String.format("Resolved the Task with Description - %s", getPlaintextDiff(oldValue, task.getNewValue()));
String.format(
"Resolved the Task with Description - %s",
getPlaintextDiff(ChangeEventParser.PUBLISH_TO.FEED, oldValue, task.getNewValue()));
} else if (List.of(TaskType.RequestTag, TaskType.UpdateTag).contains(type)) {
List<TagLabel> tags;
if (task.getOldValue() != null) {
@ -376,7 +375,10 @@ public class FeedRepository {
}
tags = JsonUtils.readObjects(task.getNewValue(), TagLabel.class);
String newValue = getTagFQNs(tags);
message = String.format("Resolved the Task with Tag(s) - %s", getPlaintextDiff(oldValue, newValue));
message =
String.format(
"Resolved the Task with Tag(s) - %s",
getPlaintextDiff(ChangeEventParser.PUBLISH_TO.FEED, oldValue, newValue));
} else {
message = "Resolved the Task.";
}

View File

@ -1,33 +1,20 @@
package org.openmetadata.catalog.slack;
import static org.openmetadata.catalog.Entity.FIELD_DELETED;
import static org.openmetadata.catalog.Entity.FIELD_DESCRIPTION;
import static org.openmetadata.catalog.Entity.FIELD_FOLLOWERS;
import static org.openmetadata.catalog.Entity.FIELD_OWNER;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
import org.json.JSONArray;
import org.json.JSONObject;
import org.openmetadata.catalog.events.AbstractEventPublisher;
import org.openmetadata.catalog.events.errors.EventPublisherException;
import org.openmetadata.catalog.resources.events.EventResource.ChangeEventList;
import org.openmetadata.catalog.type.ChangeDescription;
import org.openmetadata.catalog.type.ChangeEvent;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.EventType;
import org.openmetadata.catalog.type.FieldChange;
import org.openmetadata.catalog.util.ChangeEventParser;
@Slf4j
public class SlackWebhookEventPublisher extends AbstractEventPublisher {
@ -62,7 +49,7 @@ public class SlackWebhookEventPublisher extends AbstractEventPublisher {
public void publish(ChangeEventList events) throws EventPublisherException {
for (ChangeEvent event : events.getData()) {
try {
SlackMessage slackMessage = buildSlackMessage(event);
SlackMessage slackMessage = ChangeEventParser.buildSlackMessage(event, getEntityUrl(event));
Response response =
target.post(javax.ws.rs.client.Entity.entity(slackMessage, MediaType.APPLICATION_JSON_TYPE));
if (response.getStatus() >= 300 && response.getStatus() < 400) {
@ -77,178 +64,6 @@ public class SlackWebhookEventPublisher extends AbstractEventPublisher {
}
}
private SlackMessage buildSlackMessage(ChangeEvent event) {
SlackMessage slackMessage = new SlackMessage();
slackMessage.setUsername(event.getUserName());
if (event.getEntity() != null) {
String headerText = getHeaderText(event);
slackMessage.setText(headerText);
}
List<SlackAttachment> attachmentList = new ArrayList<>();
List<SlackAttachment> addedEvents = getAddedEventsText(event);
List<SlackAttachment> updatedEvents = getUpdatedEventsText(event);
List<SlackAttachment> deletedEvents = getDeletedEventsText(event);
attachmentList.addAll(addedEvents);
attachmentList.addAll(updatedEvents);
attachmentList.addAll(deletedEvents);
slackMessage.setAttachments(attachmentList.toArray(new SlackAttachment[0]));
return slackMessage;
}
private String getHeaderText(ChangeEvent event) {
String headerTxt = "%s %s %s";
String operation = "";
String entityUrl = getEntityUrl(event);
if (event.getEventType().equals(EventType.ENTITY_CREATED)) {
operation = "created";
} else if (event.getEventType().equals(EventType.ENTITY_UPDATED)) {
operation = "updated";
} else if (event.getEventType().equals(EventType.ENTITY_SOFT_DELETED)) {
operation = "deleted";
}
return String.format(headerTxt, event.getUserName(), operation, entityUrl);
}
private List<SlackAttachment> getAddedEventsText(ChangeEvent event) {
List<SlackAttachment> attachments = new ArrayList<>();
ChangeDescription changeDescription = event.getChangeDescription();
if (changeDescription != null
&& changeDescription.getFieldsAdded() != null
&& !changeDescription.getFieldsAdded().isEmpty()) {
for (FieldChange fieldChange : changeDescription.getFieldsAdded()) {
SlackAttachment attachment = new SlackAttachment();
StringBuilder title = new StringBuilder();
if (fieldChange.getName().contains("tags")) {
title.append("Added tags ");
title.append(fieldChange.getName().replace("tags", ""));
attachment.setText(parseTags((String) fieldChange.getNewValue()));
} else if (fieldChange.getName().equals("columns")) {
title.append("Added new columns ");
attachment.setText(parseColumns((String) fieldChange.getNewValue(), event));
} else if (fieldChange.getName().equals(FIELD_OWNER)) {
title.append("Added ownership");
attachment.setText(parseOwnership((String) fieldChange.getOldValue(), (String) fieldChange.getNewValue()));
} else if (fieldChange.getName().equals(FIELD_FOLLOWERS)) {
@SuppressWarnings("unchecked")
String followers = parseFollowers((List<EntityReference>) fieldChange.getNewValue());
String entityUrl = getEntityUrl(event);
attachment.setText(followers + " started following " + entityUrl);
} else if (fieldChange.getName().contains(FIELD_DESCRIPTION)) {
title.append("Added description to ");
title.append(fieldChange.getName());
attachment.setText((String) fieldChange.getNewValue());
}
attachment.setTitle(title.toString());
attachments.add(attachment);
}
}
return attachments;
}
private List<SlackAttachment> getUpdatedEventsText(ChangeEvent event) {
List<SlackAttachment> attachments = new ArrayList<>();
ChangeDescription changeDescription = event.getChangeDescription();
if (changeDescription != null
&& changeDescription.getFieldsUpdated() != null
&& !changeDescription.getFieldsUpdated().isEmpty()) {
for (FieldChange fieldChange : changeDescription.getFieldsUpdated()) {
// when the entity is deleted we will get deleted set as true. We do not need to parse this for Slack messages.
if (!fieldChange.getName().equals(FIELD_DELETED)) {
SlackAttachment attachment = new SlackAttachment();
attachment.setTitle("Updated " + fieldChange.getName());
if (fieldChange.getName().equals(FIELD_OWNER)) {
attachment.setText(parseOwnership((String) fieldChange.getOldValue(), (String) fieldChange.getNewValue()));
} else {
String updatedStr = fieldChange.getOldValue() + " to " + fieldChange.getNewValue();
attachment.setText(updatedStr);
}
attachments.add(attachment);
}
}
}
return attachments;
}
private List<SlackAttachment> getDeletedEventsText(ChangeEvent event) {
List<SlackAttachment> attachments = new ArrayList<>();
ChangeDescription changeDescription = event.getChangeDescription();
if (changeDescription != null
&& changeDescription.getFieldsDeleted() != null
&& !changeDescription.getFieldsDeleted().isEmpty()) {
for (FieldChange fieldChange : changeDescription.getFieldsDeleted()) {
SlackAttachment attachment = new SlackAttachment();
StringBuilder title = new StringBuilder();
if (fieldChange.getName().contains("tags")) {
attachment = new SlackAttachment();
title.append("Deleted tags from ");
title.append(fieldChange.getName().replace(".tags", ""));
attachment.setText(parseTags((String) fieldChange.getOldValue()));
attachment.setTitle(title.toString());
} else if (fieldChange.getName().contains("columns")) {
attachment = new SlackAttachment();
title.append("Deleted columns ");
attachment.setText(parseColumns((String) fieldChange.getOldValue(), event));
attachment.setTitle(title.toString());
} else if (fieldChange.getName().equals(FIELD_FOLLOWERS)) {
@SuppressWarnings("unchecked")
String followers = parseFollowers((List<EntityReference>) fieldChange.getOldValue());
String entityUrl = getEntityUrl(event);
attachment.setText(followers + " unfollowed " + entityUrl);
}
attachments.add(attachment);
}
}
return attachments;
}
private String parseTags(String tags) {
JSONArray jsonTags = new JSONArray(tags);
StringBuilder tagsText = new StringBuilder("\n");
for (int i = 0; i < jsonTags.length(); i++) {
String tagFQN = jsonTags.getJSONObject(i).getString("tagFQN");
tagsText.append("- ");
tagsText.append(tagFQN);
tagsText.append("\n");
}
return tagsText.toString();
}
private String parseColumns(String columns, ChangeEvent event) {
JSONArray jsonColumns = new JSONArray(columns);
StringBuilder columnsText = new StringBuilder("\n");
for (int i = 0; i < jsonColumns.length(); i++) {
String columnName = jsonColumns.getJSONObject(i).getString("name");
String columnType = jsonColumns.getJSONObject(i).getString("dataType");
String columnFQDN = jsonColumns.getJSONObject(i).getString("fullyQualifiedName");
String columnURL = String.format("<%s/%s/%s|%s>", openMetadataUrl, event.getEntityType(), columnFQDN, columnName);
columnsText.append("- ");
columnsText.append(columnURL);
columnsText.append(" of type ");
columnsText.append(columnType);
columnsText.append("\n");
}
return columnsText.toString();
}
private String parseOwnership(String prevOwner, String newOwner) {
StringBuilder owner = new StringBuilder("\n");
if (prevOwner != null) {
JSONObject prevOwnerJson = new JSONObject(prevOwner);
owner.append("Updated from previous owner ");
owner.append(prevOwnerJson.getString("name"));
owner.append(" to ");
} else {
owner.append("Set owner to ");
}
JSONObject newOwnerJson = new JSONObject(newOwner);
owner.append(newOwnerJson.getString("name"));
return owner.toString();
}
private String parseFollowers(List<EntityReference> followers) {
return followers.stream().map(EntityReference::getName).collect(Collectors.joining(","));
}
private String getEntityUrl(ChangeEvent event) {
return String.format(
"<%s/%s/%s|%s>",

View File

@ -37,11 +37,24 @@ import org.apache.commons.lang.StringUtils;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.EntityInterface;
import org.openmetadata.catalog.resources.feeds.MessageParser.EntityLink;
import org.openmetadata.catalog.slack.SlackAttachment;
import org.openmetadata.catalog.slack.SlackMessage;
import org.openmetadata.catalog.type.ChangeDescription;
import org.openmetadata.catalog.type.ChangeEvent;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.FieldChange;
public final class ChangeEventParser {
public static final String FEED_ADD_MARKER = "<!add>";
public static final String FEED_REMOVE_MARKER = "<!remove>";
public static final String SLACK_STRIKE_MARKER = "~%s~ ";
public static final String FEED_BOLD = "**%s**";
public static final String SLACK_BOLD = "*%s* ";
public static final String FEED_SPAN_ADD = "<span class=\"diff-added\">";
public static final String FEED_SPAN_REMOVE = "<span class=\"diff-removed\">";
public static final String FEED_SPAN_CLOSE = "</span>";
public static final String FEED_LINE_BREAK = " <br/> ";
public static final String SLACK_LINE_BREAK = "\n";
private ChangeEventParser() {}
@ -51,25 +64,53 @@ public final class ChangeEventParser {
DELETE
}
public enum PUBLISH_TO {
FEED,
SLACK
}
public static SlackMessage buildSlackMessage(ChangeEvent event, String omdurl) {
SlackMessage slackMessage = new SlackMessage();
slackMessage.setUsername(event.getUserName());
if (event.getEntity() != null) {
String headerTxt = "%s posted on " + event.getEntityType() + " %s";
String headerText = String.format(headerTxt, event.getUserName(), omdurl);
slackMessage.setText(headerText);
}
Map<EntityLink, String> messages =
getFormattedMessages(PUBLISH_TO.SLACK, event.getChangeDescription(), (EntityInterface) event.getEntity());
List<SlackAttachment> attachmentList = new ArrayList<>();
for (var entryset : messages.entrySet()) {
SlackAttachment attachment = new SlackAttachment();
List<String> mark = new ArrayList<>();
mark.add("text");
attachment.setMarkdownIn(mark);
attachment.setText(entryset.getValue());
attachmentList.add(attachment);
}
slackMessage.setAttachments(attachmentList.toArray(new SlackAttachment[0]));
return slackMessage;
}
public static Map<EntityLink, String> getFormattedMessages(
ChangeDescription changeDescription, EntityInterface entity) {
PUBLISH_TO publishTo, ChangeDescription changeDescription, EntityInterface entity) {
// Store a map of entityLink -> message
Map<EntityLink, String> messages;
List<FieldChange> fieldsUpdated = changeDescription.getFieldsUpdated();
messages = getFormattedMessages(entity, fieldsUpdated, CHANGE_TYPE.UPDATE);
messages = getFormattedMessagesForAllFieldChange(publishTo, entity, fieldsUpdated, CHANGE_TYPE.UPDATE);
// fieldsAdded and fieldsDeleted need special handling since
// there is a possibility to merge them as one update message.
List<FieldChange> fieldsAdded = changeDescription.getFieldsAdded();
List<FieldChange> fieldsDeleted = changeDescription.getFieldsDeleted();
messages.putAll(getFormattedMessages(entity, fieldsAdded, fieldsDeleted));
messages.putAll(mergeAddtionsDeletion(publishTo, entity, fieldsAdded, fieldsDeleted));
return messages;
}
private static Map<EntityLink, String> getFormattedMessages(
EntityInterface entity, List<FieldChange> fields, CHANGE_TYPE changeType) {
private static Map<EntityLink, String> getFormattedMessagesForAllFieldChange(
PUBLISH_TO publishTo, EntityInterface entity, List<FieldChange> fields, CHANGE_TYPE changeType) {
Map<EntityLink, String> messages = new HashMap<>();
for (var field : fields) {
@ -80,7 +121,7 @@ public final class ChangeEventParser {
String oldFieldValue = getFieldValue(field.getOldValue());
EntityLink link = getEntityLink(fieldName, entity);
if (!fieldName.equals("failureDetails")) {
String message = getFormattedMessage(link, changeType, fieldName, oldFieldValue, newFieldValue);
String message = createMessageForField(publishTo, link, changeType, fieldName, oldFieldValue, newFieldValue);
messages.put(link, message);
}
}
@ -135,8 +176,8 @@ public final class ChangeEventParser {
}
/** Tries to merge additions and deletions into updates and returns a map of formatted messages. */
private static Map<EntityLink, String> getFormattedMessages(
EntityInterface entity, List<FieldChange> addedFields, List<FieldChange> deletedFields) {
private static Map<EntityLink, String> mergeAddtionsDeletion(
PUBLISH_TO publishTo, EntityInterface entity, List<FieldChange> addedFields, List<FieldChange> deletedFields) {
// Major schema version changes such as renaming a column from colA to colB
// will be recorded as "Removed column colA" and "Added column colB"
// This method will try to detect such changes and combine those events into one update.
@ -146,9 +187,9 @@ public final class ChangeEventParser {
// if there is only added fields or only deleted fields, we cannot merge
if (addedFields.isEmpty() || deletedFields.isEmpty()) {
if (!addedFields.isEmpty()) {
messages = getFormattedMessages(entity, addedFields, CHANGE_TYPE.ADD);
messages = getFormattedMessagesForAllFieldChange(publishTo, entity, addedFields, CHANGE_TYPE.ADD);
} else if (!deletedFields.isEmpty()) {
messages = getFormattedMessages(entity, deletedFields, CHANGE_TYPE.DELETE);
messages = getFormattedMessagesForAllFieldChange(publishTo, entity, deletedFields, CHANGE_TYPE.DELETE);
}
return messages;
}
@ -160,19 +201,21 @@ public final class ChangeEventParser {
EntityLink link = getEntityLink(fieldName, entity);
// convert the added field and deleted field into one update message
String message =
getFormattedMessage(
link, CHANGE_TYPE.UPDATE, fieldName, field.getOldValue(), addedField.get().getNewValue());
createMessageForField(
publishTo, link, CHANGE_TYPE.UPDATE, fieldName, field.getOldValue(), addedField.get().getNewValue());
messages.put(link, message);
// Remove the field from addedFields list to avoid double processing
addedFields = addedFields.stream().filter(f -> !f.equals(addedField.get())).collect(Collectors.toList());
} else {
// process the deleted field
messages.putAll(getFormattedMessages(entity, Collections.singletonList(field), CHANGE_TYPE.DELETE));
messages.putAll(
getFormattedMessagesForAllFieldChange(
publishTo, entity, Collections.singletonList(field), CHANGE_TYPE.DELETE));
}
}
// process the remaining added fields
if (!addedFields.isEmpty()) {
messages.putAll(getFormattedMessages(entity, addedFields, CHANGE_TYPE.ADD));
messages.putAll(getFormattedMessagesForAllFieldChange(publishTo, entity, addedFields, CHANGE_TYPE.ADD));
}
return messages;
}
@ -199,8 +242,13 @@ public final class ChangeEventParser {
return new EntityLink(entityType, entityFQN, fieldName, arrayFieldName, arrayFieldValue);
}
private static String getFormattedMessage(
EntityLink link, CHANGE_TYPE changeType, String fieldName, Object oldFieldValue, Object newFieldValue) {
private static String createMessageForField(
PUBLISH_TO publishTo,
EntityLink link,
CHANGE_TYPE changeType,
String fieldName,
Object oldFieldValue,
Object newFieldValue) {
String arrayFieldName = link.getArrayFieldName();
String arrayFieldValue = link.getArrayFieldValue();
@ -216,19 +264,27 @@ public final class ChangeEventParser {
case ADD:
String fieldValue = getFieldValue(newFieldValue);
if (Entity.FIELD_FOLLOWERS.equals(updatedField)) {
message = String.format("Followed **%s** `%s`", link.getEntityType(), link.getEntityFQN());
message =
String.format(
("Followed " + (publishTo == PUBLISH_TO.FEED ? FEED_BOLD : SLACK_BOLD) + " `%s`"),
link.getEntityType(),
link.getEntityFQN());
} else if (fieldValue != null && !fieldValue.isEmpty()) {
message = String.format("Added **%s**: `%s`", updatedField, fieldValue);
message =
String.format(
("Added " + (publishTo == PUBLISH_TO.FEED ? FEED_BOLD : SLACK_BOLD) + ": `%s`"),
updatedField,
fieldValue);
}
break;
case UPDATE:
message = getUpdateMessage(updatedField, oldFieldValue, newFieldValue);
message = getUpdateMessage(publishTo, updatedField, oldFieldValue, newFieldValue);
break;
case DELETE:
if (Entity.FIELD_FOLLOWERS.equals(updatedField)) {
message = String.format("Unfollowed %s `%s`", link.getEntityType(), link.getEntityFQN());
} else {
message = String.format("Deleted **%s**", updatedField);
message = String.format(("Deleted " + (publishTo == PUBLISH_TO.FEED ? FEED_BOLD : SLACK_BOLD)), updatedField);
}
break;
default:
@ -237,40 +293,56 @@ public final class ChangeEventParser {
return message;
}
private static String getPlainTextUpdateMessage(String updatedField, String oldValue, String newValue) {
private static String getPlainTextUpdateMessage(
PUBLISH_TO publishTo, String updatedField, String oldValue, String newValue) {
// Get diff of old value and new value
String diff = getPlaintextDiff(oldValue, newValue);
return nullOrEmpty(diff) ? StringUtils.EMPTY : String.format("Updated **%s**: %s", updatedField, diff);
String diff = getPlaintextDiff(publishTo, oldValue, newValue);
return nullOrEmpty(diff)
? StringUtils.EMPTY
: String.format(
"Updated " + (publishTo == PUBLISH_TO.FEED ? FEED_BOLD : SLACK_BOLD) + ": %s", updatedField, diff);
}
private static String getObjectUpdateMessage(String updatedField, JsonObject oldJson, JsonObject newJson) {
private static String getObjectUpdateMessage(
PUBLISH_TO publishTo, String updatedField, JsonObject oldJson, JsonObject newJson) {
List<String> labels = new ArrayList<>();
Set<String> keys = newJson.keySet();
// check if each key's value is the same
for (var key : keys) {
if (!newJson.get(key).equals(oldJson.get(key))) {
labels.add(
String.format("%s: %s", key, getPlaintextDiff(oldJson.get(key).toString(), newJson.get(key).toString())));
String.format(
"%s: %s", key, getPlaintextDiff(publishTo, oldJson.get(key).toString(), newJson.get(key).toString())));
}
}
String updates = String.join(" <br/> ", labels);
String updates = String.join((publishTo == PUBLISH_TO.FEED ? FEED_LINE_BREAK : SLACK_LINE_BREAK), labels);
// Include name of the field if the json contains "name" key
if (newJson.containsKey("name")) {
updatedField = String.format("%s.%s", updatedField, newJson.getString("name"));
}
return String.format("Updated **%s**: <br/> %s", updatedField, updates);
return String.format(
"Updated "
+ (publishTo == PUBLISH_TO.FEED ? FEED_BOLD : SLACK_BOLD)
+ ":"
+ (publishTo == PUBLISH_TO.FEED ? FEED_LINE_BREAK : SLACK_LINE_BREAK)
+ "%s",
updatedField,
updates);
}
private static String getUpdateMessage(String updatedField, Object oldValue, Object newValue) {
private static String getUpdateMessage(PUBLISH_TO publishTo, String updatedField, Object oldValue, Object newValue) {
// New value should not be null in any case for an update
if (newValue == null) {
return StringUtils.EMPTY;
}
if (oldValue == null || oldValue.toString().isEmpty()) {
return String.format("Updated **%s** to %s", updatedField, getFieldValue(newValue));
return String.format(
"Updated " + (publishTo == PUBLISH_TO.FEED ? FEED_BOLD : SLACK_BOLD) + " to %s",
updatedField,
getFieldValue(newValue));
} else if (updatedField.contains("tags") || updatedField.contains(FIELD_OWNER)) {
return getPlainTextUpdateMessage(updatedField, getFieldValue(oldValue), getFieldValue(newValue));
return getPlainTextUpdateMessage(publishTo, updatedField, getFieldValue(oldValue), getFieldValue(newValue));
}
// if old value is not empty, and is of type array or object, the updates can be across multiple keys
// Example: [{name: "col1", dataType: "varchar", dataLength: "20"}]
@ -290,30 +362,31 @@ public final class ChangeEventParser {
if (newItem.getValueType() == ValueType.OBJECT) {
JsonObject newJsonItem = newItem.asJsonObject();
JsonObject oldJsonItem = oldItem.asJsonObject();
return getObjectUpdateMessage(updatedField, oldJsonItem, newJsonItem);
return getObjectUpdateMessage(publishTo, updatedField, oldJsonItem, newJsonItem);
} else {
return getPlainTextUpdateMessage(updatedField, newItem.toString(), oldItem.toString());
return getPlainTextUpdateMessage(publishTo, updatedField, newItem.toString(), oldItem.toString());
}
} else {
return getPlainTextUpdateMessage(updatedField, getFieldValue(oldValue), getFieldValue(newValue));
return getPlainTextUpdateMessage(publishTo, updatedField, getFieldValue(oldValue), getFieldValue(newValue));
}
} else if (newJson.getValueType() == ValueType.OBJECT) {
JsonObject newJsonObject = newJson.asJsonObject();
JsonObject oldJsonObject = oldJson.asJsonObject();
return getObjectUpdateMessage(updatedField, oldJsonObject, newJsonObject);
return getObjectUpdateMessage(publishTo, updatedField, oldJsonObject, newJsonObject);
}
} catch (JsonParsingException ex) {
// update is of type String
// ignore this exception and process update message for plain text
}
}
return getPlainTextUpdateMessage(updatedField, oldValue.toString(), newValue.toString());
return getPlainTextUpdateMessage(publishTo, updatedField, oldValue.toString(), newValue.toString());
}
public static String getPlaintextDiff(String oldValue, String newValue) {
public static String getPlaintextDiff(PUBLISH_TO publishTo, String oldValue, String newValue) {
// create a configured DiffRowGenerator
String addMarker = "<!add>";
String removeMarker = "<!remove>";
String addMarker = FEED_ADD_MARKER;
String removeMarker = FEED_REMOVE_MARKER;
DiffRowGenerator generator =
DiffRowGenerator.create()
.showInlineDiffs(true)
@ -339,17 +412,29 @@ public final class ChangeEventParser {
// Replace them with html tags to render nicely in the UI
// Example: This is a test <!remove>sentence<!remove><!add>line<!add>
// This is a test <span class="diff-removed">sentence</span><span class="diff-added">line</span>
String spanAdd = "<span class=\"diff-added\">";
String spanRemove = "<span class=\"diff-removed\">";
String spanClose = "</span>";
String spanAdd;
String spanAddClose;
String spanRemove;
String spanRemoveClose;
if (publishTo == PUBLISH_TO.FEED) {
spanAdd = FEED_SPAN_ADD;
spanAddClose = FEED_SPAN_CLOSE;
spanRemove = FEED_SPAN_REMOVE;
spanRemoveClose = FEED_SPAN_CLOSE;
} else {
spanAdd = "*";
spanAddClose = "* ";
spanRemove = "~";
spanRemoveClose = "~ ";
}
if (diff != null) {
diff = replaceWithHtml(diff, addMarker, spanAdd, spanClose);
diff = replaceWithHtml(diff, removeMarker, spanRemove, spanClose);
diff = replaceMarkers(diff, addMarker, spanAdd, spanAddClose);
diff = replaceMarkers(diff, removeMarker, spanRemove, spanRemoveClose);
}
return diff;
}
private static String replaceWithHtml(String diff, String marker, String openTag, String closeTag) {
private static String replaceMarkers(String diff, String marker, String openTag, String closeTag) {
int index = 0;
while (diff.contains(marker)) {
String replacement = index % 2 == 0 ? openTag : closeTag;

View File

@ -67,7 +67,8 @@ class ChangeEventParserResourceTest extends CatalogApplicationTest {
deleteTag.withName("tags").withOldValue("tag1");
changeDescription.withFieldsAdded(List.of(addTag)).withFieldsDeleted(List.of(deleteTag)).withPreviousVersion(1.0);
Map<EntityLink, String> messages = ChangeEventParser.getFormattedMessages(changeDescription, TABLE);
Map<EntityLink, String> messages =
ChangeEventParser.getFormattedMessages(ChangeEventParser.PUBLISH_TO.FEED, changeDescription, TABLE);
assertEquals(1, messages.size());
TagLabel tag1 = new TagLabel();
@ -79,7 +80,8 @@ class ChangeEventParserResourceTest extends CatalogApplicationTest {
addTag.withNewValue(JsonUtils.pojoToJson(List.of(tag2)));
deleteTag.withOldValue(JsonUtils.pojoToJson(List.of(tag1)));
Map<EntityLink, String> jsonMessages = ChangeEventParser.getFormattedMessages(changeDescription, TABLE);
Map<EntityLink, String> jsonMessages =
ChangeEventParser.getFormattedMessages(ChangeEventParser.PUBLISH_TO.FEED, changeDescription, TABLE);
assertEquals(1, jsonMessages.size());
// The entity links and values of both the messages should be the same
@ -97,7 +99,8 @@ class ChangeEventParserResourceTest extends CatalogApplicationTest {
changeDescription.withFieldsAdded(List.of(addOwner)).withPreviousVersion(1.0);
Map<EntityLink, String> messages = ChangeEventParser.getFormattedMessages(changeDescription, TABLE);
Map<EntityLink, String> messages =
ChangeEventParser.getFormattedMessages(ChangeEventParser.PUBLISH_TO.FEED, changeDescription, TABLE);
assertEquals(1, messages.size());
assertEquals("Added **owner**: `User One`", messages.values().iterator().next());
@ -112,7 +115,8 @@ class ChangeEventParserResourceTest extends CatalogApplicationTest {
changeDescription.withFieldsUpdated(List.of(updateDescription)).withPreviousVersion(1.0);
Map<EntityLink, String> messages = ChangeEventParser.getFormattedMessages(changeDescription, TABLE);
Map<EntityLink, String> messages =
ChangeEventParser.getFormattedMessages(ChangeEventParser.PUBLISH_TO.FEED, changeDescription, TABLE);
assertEquals(1, messages.size());
assertEquals(
@ -132,7 +136,43 @@ class ChangeEventParserResourceTest extends CatalogApplicationTest {
.withPreviousVersion(1.0);
// now test if both the type of updates give the same message
Map<EntityLink, String> updatedMessages = ChangeEventParser.getFormattedMessages(changeDescription, TABLE);
Map<EntityLink, String> updatedMessages =
ChangeEventParser.getFormattedMessages(ChangeEventParser.PUBLISH_TO.FEED, changeDescription, TABLE);
assertEquals(1, updatedMessages.size());
assertEquals(messages.keySet().iterator().next(), updatedMessages.keySet().iterator().next());
assertEquals(messages.values().iterator().next(), updatedMessages.values().iterator().next());
}
@Test
void testUpdateOfStringSlack() {
ChangeDescription changeDescription = new ChangeDescription();
// Simulate a change of description in table
FieldChange updateDescription = new FieldChange();
updateDescription.withName("description").withNewValue("new description").withOldValue("old description");
changeDescription.withFieldsUpdated(List.of(updateDescription)).withPreviousVersion(1.0);
Map<EntityLink, String> messages =
ChangeEventParser.getFormattedMessages(ChangeEventParser.PUBLISH_TO.SLACK, changeDescription, TABLE);
assertEquals(1, messages.size());
assertEquals("Updated *description* : ~old~ *new* description", messages.values().iterator().next());
// test if it updates correctly with one add and one delete change
changeDescription = new ChangeDescription();
FieldChange addDescription = new FieldChange();
FieldChange deleteDescription = new FieldChange();
addDescription.withName("description").withNewValue("new description");
deleteDescription.withName("description").withOldValue("old description");
changeDescription
.withFieldsAdded(List.of(addDescription))
.withFieldsDeleted(List.of(deleteDescription))
.withPreviousVersion(1.0);
// now test if both the type of updates give the same message
Map<EntityLink, String> updatedMessages =
ChangeEventParser.getFormattedMessages(ChangeEventParser.PUBLISH_TO.SLACK, changeDescription, TABLE);
assertEquals(1, updatedMessages.size());
assertEquals(messages.keySet().iterator().next(), updatedMessages.keySet().iterator().next());
@ -160,7 +200,8 @@ class ChangeEventParserResourceTest extends CatalogApplicationTest {
.withFieldsDeleted(List.of(deleteColumn))
.withPreviousVersion(1.3);
Map<EntityLink, String> messages = ChangeEventParser.getFormattedMessages(changeDescription, TABLE);
Map<EntityLink, String> messages =
ChangeEventParser.getFormattedMessages(ChangeEventParser.PUBLISH_TO.FEED, changeDescription, TABLE);
assertEquals(1, messages.size());
assertEquals(
@ -178,7 +219,7 @@ class ChangeEventParserResourceTest extends CatalogApplicationTest {
.withFieldsDeleted(List.of(deleteColumn))
.withPreviousVersion(1.3);
messages = ChangeEventParser.getFormattedMessages(changeDescription, TABLE);
messages = ChangeEventParser.getFormattedMessages(ChangeEventParser.PUBLISH_TO.FEED, changeDescription, TABLE);
assertEquals(1, messages.size());
assertEquals(
@ -196,11 +237,80 @@ class ChangeEventParserResourceTest extends CatalogApplicationTest {
.withFieldsDeleted(List.of(deleteColumn))
.withPreviousVersion(1.4);
messages = ChangeEventParser.getFormattedMessages(changeDescription, TABLE);
messages = ChangeEventParser.getFormattedMessages(ChangeEventParser.PUBLISH_TO.FEED, changeDescription, TABLE);
assertEquals(1, messages.size());
assertEquals(
"Updated **columns**: lo_orderpriority<span class=\"diff-added\">, newColumn</span>",
messages.values().iterator().next());
}
@Test
void testMajorSchemaChangeSlack() {
ChangeDescription changeDescription = new ChangeDescription();
// Simulate a change of column name in table
FieldChange addColumn = new FieldChange();
addColumn
.withName("columns")
.withNewValue(
"[{\"name\":\"lo_orderpriority\",\"displayName\":\"lo_orderpriority\",\"dataType\":\"INT\",\"dataLength\":1,\"dataTypeDisplay\":\"int\",\"fullyQualifiedName\":\"local_mysql.sample_db.lineorder.lo_orderpriority\",\"constraint\":\"NOT_NULL\"}]");
FieldChange deleteColumn = new FieldChange();
deleteColumn
.withName("columns")
.withOldValue(
"[{\"name\":\"lo_order\",\"displayName\":\"lo_order\",\"dataType\":\"INT\",\"dataLength\":1,\"dataTypeDisplay\":\"int\",\"fullyQualifiedName\":\"local_mysql.sample_db.lineorder.lo_order\",\"constraint\":\"NOT_NULL\"}]");
changeDescription
.withFieldsAdded(List.of(addColumn))
.withFieldsDeleted(List.of(deleteColumn))
.withPreviousVersion(1.3);
Map<EntityLink, String> messages =
ChangeEventParser.getFormattedMessages(ChangeEventParser.PUBLISH_TO.SLACK, changeDescription, TABLE);
assertEquals(1, messages.size());
assertEquals(
"Updated *columns.lo_orderpriority* :\n"
+ "name: ~\"lo_order\"~ *\"lo_orderpriority\"* \n"
+ "displayName: ~\"lo_order\"~ *\"lo_orderpriority\"* \n"
+ "fullyQualifiedName: \"local_mysql.sample_db.lineorder.~lo_order\"~ *lo_orderpriority\"* ",
messages.values().iterator().next());
// Simulate a change of datatype change in column
addColumn.withNewValue(
"[{\"name\":\"lo_orderpriority\",\"displayName\":\"lo_orderpriority\",\"dataType\":\"INT\",\"dataLength\":1,\"dataTypeDisplay\":\"int\",\"fullyQualifiedName\":\"local_mysql.sample_db.lineorder.lo_orderpriority\",\"constraint\":\"NOT_NULL\"}]");
deleteColumn.withOldValue(
"[{\"name\":\"lo_orderpriority\",\"displayName\":\"lo_orderpriority\",\"dataType\":\"BLOB\",\"dataLength\":1,\"dataTypeDisplay\":\"blob\",\"fullyQualifiedName\":\"local_mysql.sample_db.lineorder.lo_orderpriority\",\"tags\":[],\"constraint\":\"NOT_NULL\"}]");
changeDescription
.withFieldsAdded(List.of(addColumn))
.withFieldsDeleted(List.of(deleteColumn))
.withPreviousVersion(1.3);
messages = ChangeEventParser.getFormattedMessages(ChangeEventParser.PUBLISH_TO.SLACK, changeDescription, TABLE);
assertEquals(1, messages.size());
assertEquals(
"Updated *columns.lo_orderpriority* :\n"
+ "dataType: ~\"BLOB\"~ *\"INT\"* \n"
+ "dataTypeDisplay: ~\"blob\"~ *\"int\"* ",
messages.values().iterator().next());
// Simulate multiple changes to columns
addColumn.withNewValue(
"[{\"name\":\"lo_orderpriority\",\"displayName\":\"lo_orderpriority\",\"dataType\":\"INT\",\"dataLength\":1,\"dataTypeDisplay\":\"int\",\"fullyQualifiedName\":\"local_mysql.sample_db.lineorder.lo_orderpriority\"},{\"name\":\"newColumn\",\"displayName\":\"newColumn\",\"dataType\":\"INT\",\"dataLength\":1,\"dataTypeDisplay\":\"int\",\"fullyQualifiedName\":\"local_mysql.sample_db.lineorder.newColumn\"}]");
deleteColumn.withOldValue(
"[{\"name\":\"lo_orderpriority\",\"displayName\":\"lo_orderpriority\",\"dataType\":\"BLOB\",\"dataLength\":1,\"dataTypeDisplay\":\"blob\",\"fullyQualifiedName\":\"local_mysql.sample_db.lineorder.lo_orderpriority\"}]");
changeDescription
.withFieldsAdded(List.of(addColumn))
.withFieldsDeleted(List.of(deleteColumn))
.withPreviousVersion(1.4);
messages = ChangeEventParser.getFormattedMessages(ChangeEventParser.PUBLISH_TO.SLACK, changeDescription, TABLE);
assertEquals(1, messages.size());
assertEquals("Updated *columns* : lo_orderpriority*, newColumn* ", messages.values().iterator().next());
}
}

View File

@ -101,6 +101,7 @@ import org.openmetadata.catalog.type.TaskDetails;
import org.openmetadata.catalog.type.TaskStatus;
import org.openmetadata.catalog.type.TaskType;
import org.openmetadata.catalog.type.ThreadType;
import org.openmetadata.catalog.util.ChangeEventParser;
import org.openmetadata.catalog.util.JsonUtils;
import org.openmetadata.catalog.util.ResultList;
import org.openmetadata.catalog.util.TestUtils;
@ -457,7 +458,7 @@ public class FeedResourceTest extends CatalogApplicationTest {
assertEquals(TaskStatus.Closed, task.getStatus());
assertEquals(1, taskThread.getPostsCount());
assertEquals(1, taskThread.getPosts().size());
String diff = getPlaintextDiff("old description", "accepted description");
String diff = getPlaintextDiff(ChangeEventParser.PUBLISH_TO.FEED, "old description", "accepted description");
String expectedMessage = String.format("Resolved the Task with Description - %s", diff);
assertEquals(expectedMessage, taskThread.getPosts().get(0).getMessage());
}
@ -566,7 +567,7 @@ public class FeedResourceTest extends CatalogApplicationTest {
assertEquals(TaskStatus.Closed, task.getStatus());
assertEquals(1, taskThread.getPostsCount());
assertEquals(1, taskThread.getPosts().size());
String diff = getPlaintextDiff("", USER_ADDRESS_TAG_LABEL.getTagFQN());
String diff = getPlaintextDiff(ChangeEventParser.PUBLISH_TO.FEED, "", USER_ADDRESS_TAG_LABEL.getTagFQN());
String expectedMessage = String.format("Resolved the Task with Tag(s) - %s", diff);
assertEquals(expectedMessage, taskThread.getPosts().get(0).getMessage());
}