diff --git a/catalog-rest-service/pom.xml b/catalog-rest-service/pom.xml
index b72abe16d53..febd81e5eee 100644
--- a/catalog-rest-service/pom.xml
+++ b/catalog-rest-service/pom.xml
@@ -213,6 +213,13 @@
2.17.1
+
+
+ io.github.java-diff-utils
+ java-diff-utils
+ 4.11
+
+
org.junit.jupiter
diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/ChangeEventHandler.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/ChangeEventHandler.java
index f2178dd050a..8bbc86e1cc5 100644
--- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/ChangeEventHandler.java
+++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/ChangeEventHandler.java
@@ -19,23 +19,24 @@ import static org.openmetadata.catalog.type.EventType.ENTITY_UPDATED;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerResponseContext;
import javax.ws.rs.core.Response.Status;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
import org.jdbi.v3.core.Jdbi;
import org.openmetadata.catalog.CatalogApplicationConfig;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.entity.feed.Thread;
import org.openmetadata.catalog.jdbi3.CollectionDAO;
import org.openmetadata.catalog.jdbi3.FeedRepository;
-import org.openmetadata.catalog.resources.feeds.MessageParser;
+import org.openmetadata.catalog.resources.feeds.MessageParser.EntityLink;
+import org.openmetadata.catalog.type.ChangeDescription;
import org.openmetadata.catalog.type.ChangeEvent;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.EventType;
-import org.openmetadata.catalog.type.FieldChange;
+import org.openmetadata.catalog.util.ChangeEventParser;
import org.openmetadata.catalog.util.EntityInterface;
import org.openmetadata.catalog.util.JsonUtils;
import org.openmetadata.catalog.util.RestUtil;
@@ -71,7 +72,7 @@ public class ChangeEventHandler implements EventHandler {
// Add a new thread to the entity for every change event
// for the event to appear in activity feeds
- List threads = getThreads(responseContext);
+ List threads = getThreads(responseContext, changeEvent);
if (threads != null) {
for (var thread : threads) {
feedDao.create(thread);
@@ -170,76 +171,30 @@ public class ChangeEventHandler implements EventHandler {
.withCurrentVersion(changeEvent.getCurrentVersion());
}
- private enum CHANGE_TYPE {
- UPDATE,
- ADD,
- DELETE
- }
-
- private List getThreads(ContainerResponseContext responseContext) {
+ private List getThreads(ContainerResponseContext responseContext, ChangeEvent changeEvent) {
Object entity = responseContext.getEntity();
if (entity == null) {
return null; // Response has no entity to produce change event from
}
var entityInterface = Entity.getEntityInterface(entity);
- if (entityInterface.getChangeDescription() == null) {
+ // entityInterface can be null in case of Tags
+ // TODO: remove this null check when entityInterface should never be null
+ if (entityInterface == null || entityInterface.getChangeDescription() == null) {
return null;
}
- List fieldsUpdated = entityInterface.getChangeDescription().getFieldsUpdated();
- List threads = new ArrayList<>(getThreads(fieldsUpdated, entity, CHANGE_TYPE.UPDATE));
- List fieldsAdded = entityInterface.getChangeDescription().getFieldsAdded();
- threads.addAll(getThreads(fieldsAdded, entity, CHANGE_TYPE.ADD));
-
- List fieldsDeleted = entityInterface.getChangeDescription().getFieldsDeleted();
- threads.addAll(getThreads(fieldsDeleted, entity, CHANGE_TYPE.DELETE));
- return threads;
+ return getThreads(entity, entityInterface.getChangeDescription(), changeEvent);
}
- private List getThreads(List fields, Object entity, CHANGE_TYPE changeType) {
+ private List getThreads(Object entity, ChangeDescription changeDescription, ChangeEvent changeEvent) {
List threads = new ArrayList<>();
var entityInterface = Entity.getEntityInterface(entity);
- EntityReference entityReference = Entity.getEntityReference(entity);
- String entityType = entityReference.getType();
- String entityFQN = entityReference.getName();
- for (var field : fields) {
- // if field name has dots, then it is an array field
- String fieldName = field.getName();
- String arrayFieldName = null;
- String arrayFieldValue = null;
- String newFieldValue = field.getNewValue() != null ? field.getNewValue().toString() : StringUtils.EMPTY;
- if (fieldName.contains(".")) {
- String[] fieldNameParts = fieldName.split("\\.");
- // For array type, it should have 3 ex: columns.comment.description
- fieldName = fieldNameParts[0];
- if (fieldNameParts.length == 3) {
- arrayFieldName = fieldNameParts[1];
- arrayFieldValue = fieldNameParts[2];
- }
- }
- MessageParser.EntityLink link =
- new MessageParser.EntityLink(entityType, entityFQN, fieldName, arrayFieldName, arrayFieldValue);
-
- // Create an automated post
- String message = null;
- switch (changeType) {
- case ADD:
- message =
- String.format("Added %s: `%s`", arrayFieldValue != null ? arrayFieldValue : fieldName, newFieldValue);
- break;
- case UPDATE:
- message =
- String.format("Updated %s to `%s`", arrayFieldValue != null ? arrayFieldValue : fieldName, newFieldValue);
- break;
- case DELETE:
- message = String.format("Deleted %s", arrayFieldValue != null ? arrayFieldValue : fieldName);
- break;
- default:
- break;
- }
+ Map messages = ChangeEventParser.getFormattedMessages(changeDescription, entity, changeEvent);
+ // Create an automated thread
+ for (var link : messages.keySet()) {
threads.add(
new Thread()
.withId(UUID.randomUUID())
@@ -248,7 +203,7 @@ public class ChangeEventHandler implements EventHandler {
.withAbout(link.getLinkString())
.withUpdatedBy(entityInterface.getUpdatedBy())
.withUpdatedAt(System.currentTimeMillis())
- .withMessage(message));
+ .withMessage(messages.get(link)));
}
return threads;
diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/feeds/MessageParser.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/feeds/MessageParser.java
index 8be16ded364..ca59b9e9cda 100644
--- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/feeds/MessageParser.java
+++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/feeds/MessageParser.java
@@ -21,6 +21,7 @@ import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
@Slf4j
public final class MessageParser {
@@ -30,6 +31,8 @@ public final class MessageParser {
// Pattern to match the following markdown entity links:
// <#E/{entityType}/{entityFQN}> -- <#E/table/bigquery_gcp.shopify.raw_product_catalog>
// <#E/{entityType}/{entityFQN}/{fieldName}> -- <#E/table/bigquery_gcp.shopify.raw_product_catalog/description>
+ // <#E/{entityType}/{entityFQN}/{fieldName}/{arrayFieldName}>
+ // -- <#E/table/bigquery_gcp.shopify.raw_product_catalog/columns/comment>
// <#E/{entityType}/{entityFQN}/{fieldName}/{arrayFieldName}/{arrayFieldValue}>
// -- <#E/table/bigquery_gcp.shopify.raw_product_catalog/columns/comment/description>
private static final Pattern ENTITY_LINK_PATTERN =
@@ -82,11 +85,11 @@ public final class MessageParser {
this.linkType = LinkType.ENTITY_ARRAY_FIELD;
this.fullyQualifiedFieldType = String.format("%s.%s.member", entityType, fieldName);
this.fullyQualifiedFieldValue = String.format("%s.%s.%s", entityFqn, arrayFieldName, arrayFieldValue);
+ } else if (arrayFieldName != null) {
+ this.linkType = LinkType.ENTITY_ARRAY_FIELD;
+ this.fullyQualifiedFieldType = String.format("%s.%s.member", entityType, fieldName);
+ this.fullyQualifiedFieldValue = String.format("%s.%s", entityFqn, arrayFieldName);
} else if (fieldName != null) {
- if (arrayFieldName != null) {
- // Only array field name is not supported
- throw new IllegalArgumentException(invalidEntityLink());
- }
this.fullyQualifiedFieldType = String.format("%s.%s", entityType, fieldName);
this.fullyQualifiedFieldValue = String.format("%s.%s", entityFqn, fieldName);
@@ -105,7 +108,10 @@ public final class MessageParser {
builder.append("/").append(fieldName);
}
if (linkType == LinkType.ENTITY_ARRAY_FIELD) {
- builder.append("/").append(arrayFieldName).append("/").append(arrayFieldValue);
+ builder.append("/").append(arrayFieldName);
+ if (StringUtils.isNotEmpty(arrayFieldValue)) {
+ builder.append("/").append(arrayFieldValue);
+ }
}
builder.append(">");
return builder.toString();
diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/ChangeEventParser.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/ChangeEventParser.java
new file mode 100644
index 00000000000..fe25118af45
--- /dev/null
+++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/ChangeEventParser.java
@@ -0,0 +1,357 @@
+/*
+ * Copyright 2021 Collate
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.openmetadata.catalog.util;
+
+import com.github.difflib.text.DiffRow;
+import com.github.difflib.text.DiffRowGenerator;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import javax.json.JsonArray;
+import javax.json.JsonObject;
+import javax.json.JsonValue;
+import javax.json.JsonValue.ValueType;
+import javax.json.stream.JsonParsingException;
+import org.apache.commons.lang.StringUtils;
+import org.openmetadata.catalog.Entity;
+import org.openmetadata.catalog.resources.feeds.MessageParser.EntityLink;
+import org.openmetadata.catalog.type.ChangeDescription;
+import org.openmetadata.catalog.type.ChangeEvent;
+import org.openmetadata.catalog.type.EntityReference;
+import org.openmetadata.catalog.type.FieldChange;
+
+public final class ChangeEventParser {
+
+ private ChangeEventParser() {}
+
+ private enum CHANGE_TYPE {
+ UPDATE,
+ ADD,
+ DELETE
+ }
+
+ public static Map getFormattedMessages(
+ ChangeDescription changeDescription, Object entity, ChangeEvent changeEvent) {
+ // Store a map of entityLink -> message
+ Map messages;
+ Double previousVersion = changeDescription.getPreviousVersion();
+ Double currentVersion = changeEvent.getCurrentVersion();
+
+ List fieldsUpdated = changeDescription.getFieldsUpdated();
+ messages = getFormattedMessages(entity, fieldsUpdated, CHANGE_TYPE.UPDATE, previousVersion, currentVersion);
+
+ // fieldsAdded and fieldsDeleted need special handling since
+ // there is a possibility to merge them as one update message.
+ List fieldsAdded = changeDescription.getFieldsAdded();
+ List fieldsDeleted = changeDescription.getFieldsDeleted();
+ messages.putAll(getFormattedMessages(entity, fieldsAdded, fieldsDeleted, previousVersion, currentVersion));
+
+ return messages;
+ }
+
+ private static Map getFormattedMessages(
+ Object entity, List fields, CHANGE_TYPE changeType, Double previousVersion, Double currentVersion) {
+ Map messages = new HashMap<>();
+
+ for (var field : fields) {
+ // if field name has dots, then it is an array field
+ String fieldName = field.getName();
+
+ String newFieldValue = getFieldValue(field.getNewValue());
+ String oldFieldValue = getFieldValue(field.getOldValue());
+ EntityLink link = getEntityLink(fieldName, entity);
+ String message =
+ getFormattedMessage(
+ link, changeType, fieldName, oldFieldValue, newFieldValue, previousVersion, currentVersion);
+
+ messages.put(link, message);
+ }
+ return messages;
+ }
+
+ private static String getFieldValue(Object fieldValue) {
+ if (fieldValue != null) {
+ try {
+ // Check if field value is a json string
+ JsonValue json = JsonUtils.readJson(fieldValue.toString());
+ if (json.getValueType() == ValueType.ARRAY) {
+ JsonArray jsonArray = json.asJsonArray();
+ List labels = new ArrayList<>();
+ for (var item : jsonArray) {
+ if (item.getValueType() == ValueType.OBJECT) {
+ Set keys = item.asJsonObject().keySet();
+ if (keys.contains("tagFQN")) {
+ labels.add(item.asJsonObject().getString("tagFQN"));
+ } else if (keys.contains("displayName")) {
+ // Entity Reference will have a displayName
+ labels.add(item.asJsonObject().getString("displayName"));
+ }
+ }
+ }
+ return String.join(", ", labels);
+ } else if (json.getValueType() == ValueType.OBJECT) {
+ JsonObject jsonObject = json.asJsonObject();
+ // Entity Reference will have a displayName
+ Set keys = jsonObject.asJsonObject().keySet();
+ if (keys.contains("displayName")) {
+ return jsonObject.asJsonObject().getString("displayName");
+ }
+ }
+ } catch (JsonParsingException ex) {
+ // If unable to parse json, just return the string
+ }
+ return fieldValue.toString();
+ }
+ return StringUtils.EMPTY;
+ }
+
+ /**
+ * Tries to merge additions and deletions into updates and returns a map of formatted messages.
+ *
+ * @param entity Entity object.
+ * @param addedFields Fields that were added as part of the change event.
+ * @param deletedFields Fields that were deleted as part of the change event.
+ * @return A map of entity link -> formatted message.
+ */
+ private static Map getFormattedMessages(
+ Object entity,
+ List addedFields,
+ List deletedFields,
+ Double previousVersion,
+ Double currentVersion) {
+ // Major schema version changes such as renaming a column from colA to colB
+ // will be recorded as "Removed column colA" and "Added column colB"
+ // This method will try to detect such changes and combine those events into one update.
+
+ Map messages = new HashMap<>();
+
+ // if there is only added fields or only deleted fields, we cannot merge
+ if (addedFields.isEmpty() || deletedFields.isEmpty()) {
+ if (!addedFields.isEmpty()) {
+ messages = getFormattedMessages(entity, addedFields, CHANGE_TYPE.ADD, previousVersion, currentVersion);
+ } else if (!deletedFields.isEmpty()) {
+ messages = getFormattedMessages(entity, deletedFields, CHANGE_TYPE.DELETE, previousVersion, currentVersion);
+ }
+ return messages;
+ }
+ for (var field : deletedFields) {
+ Optional addedField =
+ addedFields.stream().filter(f -> f.getName().equals(field.getName())).findAny();
+ if (addedField.isPresent()) {
+ String fieldName = field.getName();
+ EntityLink link = getEntityLink(fieldName, entity);
+ // convert the added field and deleted field into one update message
+ String message =
+ getFormattedMessage(
+ link,
+ CHANGE_TYPE.UPDATE,
+ fieldName,
+ field.getOldValue(),
+ addedField.get().getNewValue(),
+ previousVersion,
+ currentVersion);
+ messages.put(link, message);
+ // Remove the field from addedFields list to avoid double processing
+ addedFields = addedFields.stream().filter(f -> !f.equals(addedField.get())).collect(Collectors.toList());
+ } else {
+ // process the deleted field
+ messages.putAll(
+ getFormattedMessages(
+ entity, Collections.singletonList(field), CHANGE_TYPE.DELETE, previousVersion, currentVersion));
+ }
+ }
+ // process the remaining added fields
+ if (!addedFields.isEmpty()) {
+ messages.putAll(getFormattedMessages(entity, addedFields, CHANGE_TYPE.ADD, previousVersion, currentVersion));
+ }
+ return messages;
+ }
+
+ private static EntityLink getEntityLink(String fieldName, Object entity) {
+ EntityReference entityReference = Entity.getEntityReference(entity);
+ String entityType = entityReference.getType();
+ String entityFQN = entityReference.getName();
+ String arrayFieldName = null;
+ String arrayFieldValue = null;
+
+ if (fieldName.contains(".")) {
+ String[] fieldNameParts = fieldName.split("\\.");
+ // For array type, it should have 3 parts. ex: columns.comment.description
+ fieldName = fieldNameParts[0];
+ if (fieldNameParts.length == 3) {
+ arrayFieldName = fieldNameParts[1];
+ arrayFieldValue = fieldNameParts[2];
+ } else if (fieldNameParts.length == 2) {
+ arrayFieldName = fieldNameParts[1];
+ }
+ }
+
+ return new EntityLink(entityType, entityFQN, fieldName, arrayFieldName, arrayFieldValue);
+ }
+
+ private static String getFormattedMessage(
+ EntityLink link,
+ CHANGE_TYPE changeType,
+ String fieldName,
+ Object oldFieldValue,
+ Object newFieldValue,
+ Double previousVersion,
+ Double currentVersion) {
+ String arrayFieldName = link.getArrayFieldName();
+ String arrayFieldValue = link.getArrayFieldValue();
+
+ String message = null;
+ String updatedField = fieldName;
+ if (arrayFieldValue != null) {
+ updatedField = String.format("%s.%s", arrayFieldName, arrayFieldValue);
+ } else if (arrayFieldName != null) {
+ updatedField = String.format("%s.%s", fieldName, arrayFieldName);
+ }
+
+ switch (changeType) {
+ case ADD:
+ message = String.format("Added %s: `%s`", updatedField, getFieldValue(newFieldValue));
+ break;
+ case UPDATE:
+ message = getUpdateMessage(updatedField, oldFieldValue, newFieldValue);
+ break;
+ case DELETE:
+ message = String.format("Deleted %s", updatedField);
+ break;
+ default:
+ break;
+ }
+ if (message != null) {
+ // Double subtraction gives strange results which cannot be relied upon.
+ // That is why using "> 0.9D" comparison instead of "== 1.0D"
+ double versionDiff = currentVersion - previousVersion;
+ String updateType = versionDiff > 0.9D ? "MAJOR" : "MINOR";
+ message =
+ String.format(
+ "%s
**Change Type:** *%s (%s -> %s)*", message, updateType, previousVersion, currentVersion);
+ }
+ return message;
+ }
+
+ private static String getPlainTextUpdateMessage(String updatedField, String oldValue, String newValue) {
+ // Get diff of old value and new value
+ String diff = getPlaintextDiff(oldValue, newValue);
+ return String.format("Updated %s : `%s`", updatedField, diff);
+ }
+
+ private static String getObjectUpdateMessage(String updatedField, JsonObject oldJson, JsonObject newJson) {
+ List labels = new ArrayList<>();
+ Set keys = newJson.keySet();
+ // check if each key's value is the same
+ for (var key : keys) {
+ if (!newJson.get(key).equals(oldJson.get(key))) {
+ labels.add(
+ String.format("%s: `%s`", key, getPlaintextDiff(oldJson.get(key).toString(), newJson.get(key).toString())));
+ }
+ }
+ String updates = String.join("
", labels);
+ // Include name of the field if the json contains "name" key
+ if (newJson.containsKey("name")) {
+ updatedField = String.format("%s.%s", updatedField, newJson.getString("name"));
+ }
+ return String.format("Updated %s :
%s", updatedField, updates);
+ }
+
+ private static String getUpdateMessage(String updatedField, Object oldValue, Object newValue) {
+ if (oldValue == null || oldValue.toString().isEmpty()) {
+ return String.format("Updated %s to `%s`", updatedField, getFieldValue(newValue));
+ } else if (updatedField.contains("tags") || updatedField.contains("owner")) {
+ return getPlainTextUpdateMessage(updatedField, getFieldValue(oldValue), getFieldValue(newValue));
+ }
+ // if old value is not empty, and is of type array or object, the updates can be across multiple keys
+ // Example: [{name: "col1", dataType: "varchar", dataLength: "20"}]
+
+ try {
+ // Check if field value is a json string
+ JsonValue newJson = JsonUtils.readJson(newValue.toString());
+ JsonValue oldJson = JsonUtils.readJson(oldValue.toString());
+ if (newJson.getValueType() == ValueType.ARRAY) {
+ JsonArray newJsonArray = newJson.asJsonArray();
+ JsonArray oldJsonArray = oldJson.asJsonArray();
+ if (newJsonArray.size() == 1 && oldJsonArray.size() == 1) {
+ // if there is only one item in the array, it can be safely considered as an update
+ JsonValue newItem = newJsonArray.get(0);
+ JsonValue oldItem = oldJsonArray.get(0);
+ if (newItem.getValueType() == ValueType.OBJECT) {
+ JsonObject newJsonItem = newItem.asJsonObject();
+ JsonObject oldJsonItem = oldItem.asJsonObject();
+ return getObjectUpdateMessage(updatedField, oldJsonItem, newJsonItem);
+ } else {
+ return getPlainTextUpdateMessage(updatedField, newItem.toString(), oldItem.toString());
+ }
+ } else {
+ return getPlainTextUpdateMessage(updatedField, getFieldValue(oldValue), getFieldValue(newValue));
+ }
+ } else if (newJson.getValueType() == ValueType.OBJECT) {
+ JsonObject newJsonObject = newJson.asJsonObject();
+ JsonObject oldJsonObject = oldJson.asJsonObject();
+ return getObjectUpdateMessage(updatedField, oldJsonObject, newJsonObject);
+ }
+ } catch (JsonParsingException ex) {
+ // update is of String type
+ return getPlainTextUpdateMessage(updatedField, oldValue.toString(), newValue.toString());
+ }
+ return StringUtils.EMPTY;
+ }
+
+ private static String getPlaintextDiff(String oldValue, String newValue) {
+ // create a configured DiffRowGenerator
+ String addMarker = "";
+ String removeMarker = "";
+ DiffRowGenerator generator =
+ DiffRowGenerator.create()
+ .showInlineDiffs(true)
+ .mergeOriginalRevised(true)
+ .inlineDiffByWord(true)
+ .oldTag(f -> removeMarker) // introduce a tag to mark removals
+ .newTag(f -> addMarker) // introduce a tag to mark new additions
+ .build();
+ // compute the differences
+ List rows = generator.generateDiffRows(List.of(oldValue), List.of(newValue));
+
+ // There will be only one row of output
+ String diff = rows.get(0).getOldLine();
+
+ // The additions and removals will be wrapped by and tags
+ // Replace them with html tags to render nicely in the UI
+ // Example: This is a test sentenceline
+ // This is a test sentenceline
+ String spanAdd = "";
+ String spanRemove = "";
+ String spanClose = "";
+ diff = replaceWithHtml(diff, addMarker, spanAdd, spanClose);
+ diff = replaceWithHtml(diff, removeMarker, spanRemove, spanClose);
+ return diff;
+ }
+
+ private static String replaceWithHtml(String diff, String marker, String openTag, String closeTag) {
+ int index = 0;
+ while (diff.contains(marker)) {
+ String replacement = index % 2 == 0 ? openTag : closeTag;
+ diff = diff.replaceFirst(marker, replacement);
+ index++;
+ }
+ return diff;
+ }
+}
diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/feeds/FeedResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/feeds/FeedResourceTest.java
index ecc7a25ce09..2fd06260e60 100644
--- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/feeds/FeedResourceTest.java
+++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/feeds/FeedResourceTest.java
@@ -18,7 +18,6 @@ import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound;
-import static org.openmetadata.catalog.exception.CatalogExceptionMessage.invalidEntityLink;
import static org.openmetadata.catalog.security.SecurityUtil.authHeaders;
import static org.openmetadata.catalog.util.TestUtils.ADMIN_AUTH_HEADERS;
import static org.openmetadata.catalog.util.TestUtils.NON_EXISTENT_ENTITY;
@@ -154,16 +153,6 @@ public class FeedResourceTest extends CatalogApplicationTest {
() -> createThread(create, AUTH_HEADERS), NOT_FOUND, entityNotFound(Entity.TABLE, "invalidTableName"));
}
- @Test
- void post_feedWithInvalidAbout_400() {
- // post with invalid entity link pattern
- // if entity link refers to an array member, then it should have both
- // field name and value
- CreateThread create =
- create().withAbout(String.format("<#E/table/%s/columns/description>", TABLE.getFullyQualifiedName()));
- assertResponse(() -> createThread(create, AUTH_HEADERS), BAD_REQUEST, invalidEntityLink());
- }
-
@Test
void post_validThreadAndList_200(TestInfo test) throws IOException {
int totalThreadCount = listThreads(null, null, ADMIN_AUTH_HEADERS).getData().size();
diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/feeds/MessageParserTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/feeds/MessageParserTest.java
index dd01deb9046..3dd264001a6 100644
--- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/feeds/MessageParserTest.java
+++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/feeds/MessageParserTest.java
@@ -33,11 +33,13 @@ public class MessageParserTest {
+ // Invalid entity link
"<#E/table/tableFQN> "
+ "<#E/table/tableFQN/description> "
+ + "<#E/table/tableFQN/columns/c1> "
+ "<#E/table/tableFQN/columns/c1/description> ";
List links = MessageParser.getEntityLinks(s);
- assertEquals(3, links.size());
+ assertEquals(4, links.size());
assertEquals(new EntityLink("table", "tableFQN", null, null, null), links.get(0));
assertEquals(new EntityLink("table", "tableFQN", "description", null, null), links.get(1));
- assertEquals(new EntityLink("table", "tableFQN", "columns", "c1", "description"), links.get(2));
+ assertEquals(new EntityLink("table", "tableFQN", "columns", "c1", null), links.get(2));
+ assertEquals(new EntityLink("table", "tableFQN", "columns", "c1", "description"), links.get(3));
}
}
diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/util/ChangeEventParserTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/util/ChangeEventParserTest.java
new file mode 100644
index 00000000000..a649c7a012a
--- /dev/null
+++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/util/ChangeEventParserTest.java
@@ -0,0 +1,219 @@
+/*
+ * Copyright 2021 Collate
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.openmetadata.catalog.util;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestInstance.Lifecycle;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.openmetadata.catalog.CatalogApplicationTest;
+import org.openmetadata.catalog.resources.databases.TableResourceTest;
+import org.openmetadata.catalog.resources.feeds.MessageParser.EntityLink;
+import org.openmetadata.catalog.type.ChangeDescription;
+import org.openmetadata.catalog.type.ChangeEvent;
+import org.openmetadata.catalog.type.EntityReference;
+import org.openmetadata.catalog.type.FieldChange;
+import org.openmetadata.catalog.type.TagLabel;
+import org.openmetadata.catalog.type.TagLabel.LabelType;
+import org.openmetadata.catalog.type.TagLabel.State;
+
+@Slf4j
+@TestInstance(Lifecycle.PER_CLASS)
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class ChangeEventParserTest extends CatalogApplicationTest {
+
+ Object TABLE;
+
+ @BeforeAll
+ public void setup(TestInfo test) throws IOException, URISyntaxException {
+ TableResourceTest tableResourceTest = new TableResourceTest();
+ tableResourceTest.setup(test); // Initialize TableResourceTest for using helper methods
+ TABLE = tableResourceTest.createEntity(test, 1);
+ }
+
+ @Test
+ void testFormattedMessages() throws JsonProcessingException {
+ ChangeDescription changeDescription = new ChangeDescription();
+ ChangeEvent changeEvent = new ChangeEvent();
+ // Simulate updating tags of an entity from tag1 -> tag2
+ FieldChange addTag = new FieldChange();
+ addTag.withName("tags").withNewValue("tag2");
+ FieldChange deleteTag = new FieldChange();
+ deleteTag.withName("tags").withOldValue("tag1");
+ changeDescription.withFieldsAdded(List.of(addTag)).withFieldsDeleted(List.of(deleteTag)).withPreviousVersion(1.0);
+ changeEvent.withChangeDescription(changeDescription).withPreviousVersion(1.0).withCurrentVersion(1.1);
+
+ Map messages = ChangeEventParser.getFormattedMessages(changeDescription, TABLE, changeEvent);
+ assertEquals(1, messages.size());
+
+ TagLabel tag1 = new TagLabel();
+ tag1.withTagFQN("tag1").withLabelType(LabelType.DERIVED).withState(State.CONFIRMED);
+
+ TagLabel tag2 = new TagLabel();
+ tag2.withTagFQN("tag2").withLabelType(LabelType.DERIVED).withState(State.CONFIRMED);
+
+ addTag.withNewValue(JsonUtils.pojoToJson(List.of(tag2)));
+ deleteTag.withOldValue(JsonUtils.pojoToJson(List.of(tag1)));
+
+ Map jsonMessages =
+ ChangeEventParser.getFormattedMessages(changeDescription, TABLE, changeEvent);
+ assertEquals(1, jsonMessages.size());
+
+ // The entity links and values of both the messages should be the same
+ assertEquals(messages.values().iterator().next(), jsonMessages.values().iterator().next());
+ }
+
+ @Test
+ void testEntityReferenceFormat() throws JsonProcessingException {
+ ChangeDescription changeDescription = new ChangeDescription();
+ ChangeEvent changeEvent = new ChangeEvent();
+ // Simulate adding owner to a table
+ EntityReference entityReference = new EntityReference();
+ entityReference.withId(UUID.randomUUID()).withName("user1").withDisplayName("User One");
+ FieldChange addOwner = new FieldChange();
+ addOwner.withName("owner").withNewValue(JsonUtils.pojoToJson(entityReference));
+
+ changeDescription.withFieldsAdded(List.of(addOwner)).withPreviousVersion(1.0);
+ changeEvent.withChangeDescription(changeDescription).withPreviousVersion(1.0).withCurrentVersion(1.1);
+
+ Map messages = ChangeEventParser.getFormattedMessages(changeDescription, TABLE, changeEvent);
+ assertEquals(1, messages.size());
+
+ assertEquals(
+ "Added owner: `User One`
**Change Type:** *MINOR (1.0 -> 1.1)*",
+ messages.values().iterator().next());
+ }
+
+ @Test
+ void testUpdateOfString() {
+ ChangeDescription changeDescription = new ChangeDescription();
+ ChangeEvent changeEvent = new ChangeEvent();
+ // Simulate a change of description in table
+ FieldChange updateDescription = new FieldChange();
+ updateDescription.withName("description").withNewValue("new description").withOldValue("old description");
+
+ changeDescription.withFieldsUpdated(List.of(updateDescription)).withPreviousVersion(1.0);
+ changeEvent.withChangeDescription(changeDescription).withPreviousVersion(0.1).withCurrentVersion(1.1);
+
+ Map messages = ChangeEventParser.getFormattedMessages(changeDescription, TABLE, changeEvent);
+ assertEquals(1, messages.size());
+
+ assertEquals(
+ "Updated description : `old"
+ + "new description`
**Change Type:** *MINOR (1.0 -> 1.1)*",
+ messages.values().iterator().next());
+
+ // test if it updates correctly with one add and one delete change
+ changeDescription = new ChangeDescription();
+ FieldChange addDescription = new FieldChange();
+ FieldChange deleteDescription = new FieldChange();
+ addDescription.withName("description").withNewValue("new description");
+ deleteDescription.withName("description").withOldValue("old description");
+ changeDescription
+ .withFieldsAdded(List.of(addDescription))
+ .withFieldsDeleted(List.of(deleteDescription))
+ .withPreviousVersion(1.0);
+
+ changeEvent.withChangeDescription(changeDescription).withPreviousVersion(0.1).withCurrentVersion(1.1);
+
+ // now test if both the type of updates give the same message
+ Map updatedMessages =
+ ChangeEventParser.getFormattedMessages(changeDescription, TABLE, changeEvent);
+ assertEquals(1, updatedMessages.size());
+
+ assertEquals(messages.keySet().iterator().next(), updatedMessages.keySet().iterator().next());
+ assertEquals(messages.values().iterator().next(), updatedMessages.values().iterator().next());
+ }
+
+ @Test
+ void testMajorSchemaChange() {
+ ChangeDescription changeDescription = new ChangeDescription();
+ ChangeEvent changeEvent = new ChangeEvent();
+ // Simulate a change of column name in table
+ FieldChange addColumn = new FieldChange();
+ addColumn
+ .withName("columns")
+ .withNewValue(
+ "[{\"name\":\"lo_orderpriority\",\"displayName\":\"lo_orderpriority\",\"dataType\":\"INT\",\"dataLength\":1,\"dataTypeDisplay\":\"int\",\"fullyQualifiedName\":\"local_mysql.sample_db.lineorder.lo_orderpriority\",\"constraint\":\"NOT_NULL\"}]");
+
+ FieldChange deleteColumn = new FieldChange();
+ deleteColumn
+ .withName("columns")
+ .withOldValue(
+ "[{\"name\":\"lo_order\",\"displayName\":\"lo_order\",\"dataType\":\"INT\",\"dataLength\":1,\"dataTypeDisplay\":\"int\",\"fullyQualifiedName\":\"local_mysql.sample_db.lineorder.lo_order\",\"constraint\":\"NOT_NULL\"}]");
+
+ changeDescription
+ .withFieldsAdded(List.of(addColumn))
+ .withFieldsDeleted(List.of(deleteColumn))
+ .withPreviousVersion(1.3);
+ changeEvent.withChangeDescription(changeDescription).withPreviousVersion(0.1).withCurrentVersion(2.3);
+
+ Map messages = ChangeEventParser.getFormattedMessages(changeDescription, TABLE, changeEvent);
+ assertEquals(1, messages.size());
+
+ assertEquals(
+ "Updated columns.lo_orderpriority :
name: `\"lo_order\"\"lo_orderpriority\"`
displayName: `\"lo_order\"\"lo_orderpriority\"`
fullyQualifiedName: `\"local_mysql.sample_db.lineorder.lo_order\"lo_orderpriority\"`
**Change Type:** *MAJOR (1.3 -> 2.3)*",
+ messages.values().iterator().next());
+
+ // Simulate a change of datatype change in column
+ addColumn.withNewValue(
+ "[{\"name\":\"lo_orderpriority\",\"displayName\":\"lo_orderpriority\",\"dataType\":\"INT\",\"dataLength\":1,\"dataTypeDisplay\":\"int\",\"fullyQualifiedName\":\"local_mysql.sample_db.lineorder.lo_orderpriority\",\"constraint\":\"NOT_NULL\"}]");
+ deleteColumn.withOldValue(
+ "[{\"name\":\"lo_orderpriority\",\"displayName\":\"lo_orderpriority\",\"dataType\":\"BLOB\",\"dataLength\":1,\"dataTypeDisplay\":\"blob\",\"fullyQualifiedName\":\"local_mysql.sample_db.lineorder.lo_orderpriority\",\"tags\":[],\"constraint\":\"NOT_NULL\"}]");
+
+ changeDescription
+ .withFieldsAdded(List.of(addColumn))
+ .withFieldsDeleted(List.of(deleteColumn))
+ .withPreviousVersion(1.3);
+ changeEvent.withChangeDescription(changeDescription).withPreviousVersion(0.1).withCurrentVersion(2.3);
+
+ messages = ChangeEventParser.getFormattedMessages(changeDescription, TABLE, changeEvent);
+ assertEquals(1, messages.size());
+
+ assertEquals(
+ "Updated columns.lo_orderpriority :
dataType: `\"BLOB\"\"INT\"`
dataTypeDisplay: `\"blob\"\"int\"`
**Change Type:** *MAJOR (1.3 -> 2.3)*",
+ messages.values().iterator().next());
+
+ // Simulate multiple changes to columns
+ addColumn.withNewValue(
+ "[{\"name\":\"lo_orderpriority\",\"displayName\":\"lo_orderpriority\",\"dataType\":\"INT\",\"dataLength\":1,\"dataTypeDisplay\":\"int\",\"fullyQualifiedName\":\"local_mysql.sample_db.lineorder.lo_orderpriority\"},{\"name\":\"newColumn\",\"displayName\":\"newColumn\",\"dataType\":\"INT\",\"dataLength\":1,\"dataTypeDisplay\":\"int\",\"fullyQualifiedName\":\"local_mysql.sample_db.lineorder.newColumn\"}]");
+ deleteColumn.withOldValue(
+ "[{\"name\":\"lo_orderpriority\",\"displayName\":\"lo_orderpriority\",\"dataType\":\"BLOB\",\"dataLength\":1,\"dataTypeDisplay\":\"blob\",\"fullyQualifiedName\":\"local_mysql.sample_db.lineorder.lo_orderpriority\"}]");
+
+ changeDescription
+ .withFieldsAdded(List.of(addColumn))
+ .withFieldsDeleted(List.of(deleteColumn))
+ .withPreviousVersion(1.4);
+ changeEvent.withChangeDescription(changeDescription).withPreviousVersion(0.1).withCurrentVersion(2.4);
+
+ messages = ChangeEventParser.getFormattedMessages(changeDescription, TABLE, changeEvent);
+ assertEquals(1, messages.size());
+
+ assertEquals(
+ "Updated columns : `lo_orderpriority, newColumn`
**Change Type:** *MAJOR (1.4 -> 2.4)*",
+ messages.values().iterator().next());
+ }
+}