Fix #2834: Merge the change events 'delete' and 'add' as 'update' for Activity Feed (#3006)

This commit is contained in:
Vivek Ratnavel Subramanian 2022-02-27 12:01:35 -08:00 committed by GitHub
parent 83c9b75a5a
commit 68d09a85cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 613 additions and 78 deletions

View File

@ -213,6 +213,13 @@
<version>2.17.1</version>
</dependency>
<!-- Diff util to compute diffs in plain text -->
<dependency>
<groupId>io.github.java-diff-utils</groupId>
<artifactId>java-diff-utils</artifactId>
<version>4.11</version>
</dependency>
<!--test dependencies-->
<dependency>
<groupId>org.junit.jupiter</groupId>

View File

@ -19,23 +19,24 @@ import static org.openmetadata.catalog.type.EventType.ENTITY_UPDATED;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerResponseContext;
import javax.ws.rs.core.Response.Status;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.jdbi.v3.core.Jdbi;
import org.openmetadata.catalog.CatalogApplicationConfig;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.entity.feed.Thread;
import org.openmetadata.catalog.jdbi3.CollectionDAO;
import org.openmetadata.catalog.jdbi3.FeedRepository;
import org.openmetadata.catalog.resources.feeds.MessageParser;
import org.openmetadata.catalog.resources.feeds.MessageParser.EntityLink;
import org.openmetadata.catalog.type.ChangeDescription;
import org.openmetadata.catalog.type.ChangeEvent;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.EventType;
import org.openmetadata.catalog.type.FieldChange;
import org.openmetadata.catalog.util.ChangeEventParser;
import org.openmetadata.catalog.util.EntityInterface;
import org.openmetadata.catalog.util.JsonUtils;
import org.openmetadata.catalog.util.RestUtil;
@ -71,7 +72,7 @@ public class ChangeEventHandler implements EventHandler {
// Add a new thread to the entity for every change event
// for the event to appear in activity feeds
List<Thread> threads = getThreads(responseContext);
List<Thread> threads = getThreads(responseContext, changeEvent);
if (threads != null) {
for (var thread : threads) {
feedDao.create(thread);
@ -170,76 +171,30 @@ public class ChangeEventHandler implements EventHandler {
.withCurrentVersion(changeEvent.getCurrentVersion());
}
private enum CHANGE_TYPE {
UPDATE,
ADD,
DELETE
}
private List<Thread> getThreads(ContainerResponseContext responseContext) {
private List<Thread> getThreads(ContainerResponseContext responseContext, ChangeEvent changeEvent) {
Object entity = responseContext.getEntity();
if (entity == null) {
return null; // Response has no entity to produce change event from
}
var entityInterface = Entity.getEntityInterface(entity);
if (entityInterface.getChangeDescription() == null) {
// entityInterface can be null in case of Tags
// TODO: remove this null check when entityInterface should never be null
if (entityInterface == null || entityInterface.getChangeDescription() == null) {
return null;
}
List<FieldChange> fieldsUpdated = entityInterface.getChangeDescription().getFieldsUpdated();
List<Thread> threads = new ArrayList<>(getThreads(fieldsUpdated, entity, CHANGE_TYPE.UPDATE));
List<FieldChange> fieldsAdded = entityInterface.getChangeDescription().getFieldsAdded();
threads.addAll(getThreads(fieldsAdded, entity, CHANGE_TYPE.ADD));
List<FieldChange> fieldsDeleted = entityInterface.getChangeDescription().getFieldsDeleted();
threads.addAll(getThreads(fieldsDeleted, entity, CHANGE_TYPE.DELETE));
return threads;
return getThreads(entity, entityInterface.getChangeDescription(), changeEvent);
}
private List<Thread> getThreads(List<FieldChange> fields, Object entity, CHANGE_TYPE changeType) {
private List<Thread> getThreads(Object entity, ChangeDescription changeDescription, ChangeEvent changeEvent) {
List<Thread> threads = new ArrayList<>();
var entityInterface = Entity.getEntityInterface(entity);
EntityReference entityReference = Entity.getEntityReference(entity);
String entityType = entityReference.getType();
String entityFQN = entityReference.getName();
for (var field : fields) {
// if field name has dots, then it is an array field
String fieldName = field.getName();
String arrayFieldName = null;
String arrayFieldValue = null;
String newFieldValue = field.getNewValue() != null ? field.getNewValue().toString() : StringUtils.EMPTY;
if (fieldName.contains(".")) {
String[] fieldNameParts = fieldName.split("\\.");
// For array type, it should have 3 ex: columns.comment.description
fieldName = fieldNameParts[0];
if (fieldNameParts.length == 3) {
arrayFieldName = fieldNameParts[1];
arrayFieldValue = fieldNameParts[2];
}
}
MessageParser.EntityLink link =
new MessageParser.EntityLink(entityType, entityFQN, fieldName, arrayFieldName, arrayFieldValue);
// Create an automated post
String message = null;
switch (changeType) {
case ADD:
message =
String.format("Added %s: `%s`", arrayFieldValue != null ? arrayFieldValue : fieldName, newFieldValue);
break;
case UPDATE:
message =
String.format("Updated %s to `%s`", arrayFieldValue != null ? arrayFieldValue : fieldName, newFieldValue);
break;
case DELETE:
message = String.format("Deleted %s", arrayFieldValue != null ? arrayFieldValue : fieldName);
break;
default:
break;
}
Map<EntityLink, String> messages = ChangeEventParser.getFormattedMessages(changeDescription, entity, changeEvent);
// Create an automated thread
for (var link : messages.keySet()) {
threads.add(
new Thread()
.withId(UUID.randomUUID())
@ -248,7 +203,7 @@ public class ChangeEventHandler implements EventHandler {
.withAbout(link.getLinkString())
.withUpdatedBy(entityInterface.getUpdatedBy())
.withUpdatedAt(System.currentTimeMillis())
.withMessage(message));
.withMessage(messages.get(link)));
}
return threads;

View File

@ -21,6 +21,7 @@ import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@Slf4j
public final class MessageParser {
@ -30,6 +31,8 @@ public final class MessageParser {
// Pattern to match the following markdown entity links:
// <#E/{entityType}/{entityFQN}> -- <#E/table/bigquery_gcp.shopify.raw_product_catalog>
// <#E/{entityType}/{entityFQN}/{fieldName}> -- <#E/table/bigquery_gcp.shopify.raw_product_catalog/description>
// <#E/{entityType}/{entityFQN}/{fieldName}/{arrayFieldName}>
// -- <#E/table/bigquery_gcp.shopify.raw_product_catalog/columns/comment>
// <#E/{entityType}/{entityFQN}/{fieldName}/{arrayFieldName}/{arrayFieldValue}>
// -- <#E/table/bigquery_gcp.shopify.raw_product_catalog/columns/comment/description>
private static final Pattern ENTITY_LINK_PATTERN =
@ -82,11 +85,11 @@ public final class MessageParser {
this.linkType = LinkType.ENTITY_ARRAY_FIELD;
this.fullyQualifiedFieldType = String.format("%s.%s.member", entityType, fieldName);
this.fullyQualifiedFieldValue = String.format("%s.%s.%s", entityFqn, arrayFieldName, arrayFieldValue);
} else if (arrayFieldName != null) {
this.linkType = LinkType.ENTITY_ARRAY_FIELD;
this.fullyQualifiedFieldType = String.format("%s.%s.member", entityType, fieldName);
this.fullyQualifiedFieldValue = String.format("%s.%s", entityFqn, arrayFieldName);
} else if (fieldName != null) {
if (arrayFieldName != null) {
// Only array field name is not supported
throw new IllegalArgumentException(invalidEntityLink());
}
this.fullyQualifiedFieldType = String.format("%s.%s", entityType, fieldName);
this.fullyQualifiedFieldValue = String.format("%s.%s", entityFqn, fieldName);
@ -105,7 +108,10 @@ public final class MessageParser {
builder.append("/").append(fieldName);
}
if (linkType == LinkType.ENTITY_ARRAY_FIELD) {
builder.append("/").append(arrayFieldName).append("/").append(arrayFieldValue);
builder.append("/").append(arrayFieldName);
if (StringUtils.isNotEmpty(arrayFieldValue)) {
builder.append("/").append(arrayFieldValue);
}
}
builder.append(">");
return builder.toString();

View File

@ -0,0 +1,357 @@
/*
* Copyright 2021 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.util;
import com.github.difflib.text.DiffRow;
import com.github.difflib.text.DiffRowGenerator;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.json.JsonArray;
import javax.json.JsonObject;
import javax.json.JsonValue;
import javax.json.JsonValue.ValueType;
import javax.json.stream.JsonParsingException;
import org.apache.commons.lang.StringUtils;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.resources.feeds.MessageParser.EntityLink;
import org.openmetadata.catalog.type.ChangeDescription;
import org.openmetadata.catalog.type.ChangeEvent;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.FieldChange;
public final class ChangeEventParser {
private ChangeEventParser() {}
private enum CHANGE_TYPE {
UPDATE,
ADD,
DELETE
}
public static Map<EntityLink, String> getFormattedMessages(
ChangeDescription changeDescription, Object entity, ChangeEvent changeEvent) {
// Store a map of entityLink -> message
Map<EntityLink, String> messages;
Double previousVersion = changeDescription.getPreviousVersion();
Double currentVersion = changeEvent.getCurrentVersion();
List<FieldChange> fieldsUpdated = changeDescription.getFieldsUpdated();
messages = getFormattedMessages(entity, fieldsUpdated, CHANGE_TYPE.UPDATE, previousVersion, currentVersion);
// fieldsAdded and fieldsDeleted need special handling since
// there is a possibility to merge them as one update message.
List<FieldChange> fieldsAdded = changeDescription.getFieldsAdded();
List<FieldChange> fieldsDeleted = changeDescription.getFieldsDeleted();
messages.putAll(getFormattedMessages(entity, fieldsAdded, fieldsDeleted, previousVersion, currentVersion));
return messages;
}
private static Map<EntityLink, String> getFormattedMessages(
Object entity, List<FieldChange> fields, CHANGE_TYPE changeType, Double previousVersion, Double currentVersion) {
Map<EntityLink, String> messages = new HashMap<>();
for (var field : fields) {
// if field name has dots, then it is an array field
String fieldName = field.getName();
String newFieldValue = getFieldValue(field.getNewValue());
String oldFieldValue = getFieldValue(field.getOldValue());
EntityLink link = getEntityLink(fieldName, entity);
String message =
getFormattedMessage(
link, changeType, fieldName, oldFieldValue, newFieldValue, previousVersion, currentVersion);
messages.put(link, message);
}
return messages;
}
private static String getFieldValue(Object fieldValue) {
if (fieldValue != null) {
try {
// Check if field value is a json string
JsonValue json = JsonUtils.readJson(fieldValue.toString());
if (json.getValueType() == ValueType.ARRAY) {
JsonArray jsonArray = json.asJsonArray();
List<String> labels = new ArrayList<>();
for (var item : jsonArray) {
if (item.getValueType() == ValueType.OBJECT) {
Set<String> keys = item.asJsonObject().keySet();
if (keys.contains("tagFQN")) {
labels.add(item.asJsonObject().getString("tagFQN"));
} else if (keys.contains("displayName")) {
// Entity Reference will have a displayName
labels.add(item.asJsonObject().getString("displayName"));
}
}
}
return String.join(", ", labels);
} else if (json.getValueType() == ValueType.OBJECT) {
JsonObject jsonObject = json.asJsonObject();
// Entity Reference will have a displayName
Set<String> keys = jsonObject.asJsonObject().keySet();
if (keys.contains("displayName")) {
return jsonObject.asJsonObject().getString("displayName");
}
}
} catch (JsonParsingException ex) {
// If unable to parse json, just return the string
}
return fieldValue.toString();
}
return StringUtils.EMPTY;
}
/**
* Tries to merge additions and deletions into updates and returns a map of formatted messages.
*
* @param entity Entity object.
* @param addedFields Fields that were added as part of the change event.
* @param deletedFields Fields that were deleted as part of the change event.
* @return A map of entity link -> formatted message.
*/
private static Map<EntityLink, String> getFormattedMessages(
Object entity,
List<FieldChange> addedFields,
List<FieldChange> deletedFields,
Double previousVersion,
Double currentVersion) {
// Major schema version changes such as renaming a column from colA to colB
// will be recorded as "Removed column colA" and "Added column colB"
// This method will try to detect such changes and combine those events into one update.
Map<EntityLink, String> messages = new HashMap<>();
// if there is only added fields or only deleted fields, we cannot merge
if (addedFields.isEmpty() || deletedFields.isEmpty()) {
if (!addedFields.isEmpty()) {
messages = getFormattedMessages(entity, addedFields, CHANGE_TYPE.ADD, previousVersion, currentVersion);
} else if (!deletedFields.isEmpty()) {
messages = getFormattedMessages(entity, deletedFields, CHANGE_TYPE.DELETE, previousVersion, currentVersion);
}
return messages;
}
for (var field : deletedFields) {
Optional<FieldChange> addedField =
addedFields.stream().filter(f -> f.getName().equals(field.getName())).findAny();
if (addedField.isPresent()) {
String fieldName = field.getName();
EntityLink link = getEntityLink(fieldName, entity);
// convert the added field and deleted field into one update message
String message =
getFormattedMessage(
link,
CHANGE_TYPE.UPDATE,
fieldName,
field.getOldValue(),
addedField.get().getNewValue(),
previousVersion,
currentVersion);
messages.put(link, message);
// Remove the field from addedFields list to avoid double processing
addedFields = addedFields.stream().filter(f -> !f.equals(addedField.get())).collect(Collectors.toList());
} else {
// process the deleted field
messages.putAll(
getFormattedMessages(
entity, Collections.singletonList(field), CHANGE_TYPE.DELETE, previousVersion, currentVersion));
}
}
// process the remaining added fields
if (!addedFields.isEmpty()) {
messages.putAll(getFormattedMessages(entity, addedFields, CHANGE_TYPE.ADD, previousVersion, currentVersion));
}
return messages;
}
private static EntityLink getEntityLink(String fieldName, Object entity) {
EntityReference entityReference = Entity.getEntityReference(entity);
String entityType = entityReference.getType();
String entityFQN = entityReference.getName();
String arrayFieldName = null;
String arrayFieldValue = null;
if (fieldName.contains(".")) {
String[] fieldNameParts = fieldName.split("\\.");
// For array type, it should have 3 parts. ex: columns.comment.description
fieldName = fieldNameParts[0];
if (fieldNameParts.length == 3) {
arrayFieldName = fieldNameParts[1];
arrayFieldValue = fieldNameParts[2];
} else if (fieldNameParts.length == 2) {
arrayFieldName = fieldNameParts[1];
}
}
return new EntityLink(entityType, entityFQN, fieldName, arrayFieldName, arrayFieldValue);
}
private static String getFormattedMessage(
EntityLink link,
CHANGE_TYPE changeType,
String fieldName,
Object oldFieldValue,
Object newFieldValue,
Double previousVersion,
Double currentVersion) {
String arrayFieldName = link.getArrayFieldName();
String arrayFieldValue = link.getArrayFieldValue();
String message = null;
String updatedField = fieldName;
if (arrayFieldValue != null) {
updatedField = String.format("%s.%s", arrayFieldName, arrayFieldValue);
} else if (arrayFieldName != null) {
updatedField = String.format("%s.%s", fieldName, arrayFieldName);
}
switch (changeType) {
case ADD:
message = String.format("Added %s: `%s`", updatedField, getFieldValue(newFieldValue));
break;
case UPDATE:
message = getUpdateMessage(updatedField, oldFieldValue, newFieldValue);
break;
case DELETE:
message = String.format("Deleted %s", updatedField);
break;
default:
break;
}
if (message != null) {
// Double subtraction gives strange results which cannot be relied upon.
// That is why using "> 0.9D" comparison instead of "== 1.0D"
double versionDiff = currentVersion - previousVersion;
String updateType = versionDiff > 0.9D ? "MAJOR" : "MINOR";
message =
String.format(
"%s <br/><br/> **Change Type:** *%s (%s -> %s)*", message, updateType, previousVersion, currentVersion);
}
return message;
}
private static String getPlainTextUpdateMessage(String updatedField, String oldValue, String newValue) {
// Get diff of old value and new value
String diff = getPlaintextDiff(oldValue, newValue);
return String.format("Updated %s : `%s`", updatedField, diff);
}
private static String getObjectUpdateMessage(String updatedField, JsonObject oldJson, JsonObject newJson) {
List<String> labels = new ArrayList<>();
Set<String> keys = newJson.keySet();
// check if each key's value is the same
for (var key : keys) {
if (!newJson.get(key).equals(oldJson.get(key))) {
labels.add(
String.format("%s: `%s`", key, getPlaintextDiff(oldJson.get(key).toString(), newJson.get(key).toString())));
}
}
String updates = String.join(" <br/> ", labels);
// Include name of the field if the json contains "name" key
if (newJson.containsKey("name")) {
updatedField = String.format("%s.%s", updatedField, newJson.getString("name"));
}
return String.format("Updated %s : <br/> %s", updatedField, updates);
}
private static String getUpdateMessage(String updatedField, Object oldValue, Object newValue) {
if (oldValue == null || oldValue.toString().isEmpty()) {
return String.format("Updated %s to `%s`", updatedField, getFieldValue(newValue));
} else if (updatedField.contains("tags") || updatedField.contains("owner")) {
return getPlainTextUpdateMessage(updatedField, getFieldValue(oldValue), getFieldValue(newValue));
}
// if old value is not empty, and is of type array or object, the updates can be across multiple keys
// Example: [{name: "col1", dataType: "varchar", dataLength: "20"}]
try {
// Check if field value is a json string
JsonValue newJson = JsonUtils.readJson(newValue.toString());
JsonValue oldJson = JsonUtils.readJson(oldValue.toString());
if (newJson.getValueType() == ValueType.ARRAY) {
JsonArray newJsonArray = newJson.asJsonArray();
JsonArray oldJsonArray = oldJson.asJsonArray();
if (newJsonArray.size() == 1 && oldJsonArray.size() == 1) {
// if there is only one item in the array, it can be safely considered as an update
JsonValue newItem = newJsonArray.get(0);
JsonValue oldItem = oldJsonArray.get(0);
if (newItem.getValueType() == ValueType.OBJECT) {
JsonObject newJsonItem = newItem.asJsonObject();
JsonObject oldJsonItem = oldItem.asJsonObject();
return getObjectUpdateMessage(updatedField, oldJsonItem, newJsonItem);
} else {
return getPlainTextUpdateMessage(updatedField, newItem.toString(), oldItem.toString());
}
} else {
return getPlainTextUpdateMessage(updatedField, getFieldValue(oldValue), getFieldValue(newValue));
}
} else if (newJson.getValueType() == ValueType.OBJECT) {
JsonObject newJsonObject = newJson.asJsonObject();
JsonObject oldJsonObject = oldJson.asJsonObject();
return getObjectUpdateMessage(updatedField, oldJsonObject, newJsonObject);
}
} catch (JsonParsingException ex) {
// update is of String type
return getPlainTextUpdateMessage(updatedField, oldValue.toString(), newValue.toString());
}
return StringUtils.EMPTY;
}
private static String getPlaintextDiff(String oldValue, String newValue) {
// create a configured DiffRowGenerator
String addMarker = "<!add>";
String removeMarker = "<!remove>";
DiffRowGenerator generator =
DiffRowGenerator.create()
.showInlineDiffs(true)
.mergeOriginalRevised(true)
.inlineDiffByWord(true)
.oldTag(f -> removeMarker) // introduce a tag to mark removals
.newTag(f -> addMarker) // introduce a tag to mark new additions
.build();
// compute the differences
List<DiffRow> rows = generator.generateDiffRows(List.of(oldValue), List.of(newValue));
// There will be only one row of output
String diff = rows.get(0).getOldLine();
// The additions and removals will be wrapped by <!add> and <!remove> tags
// Replace them with html tags to render nicely in the UI
// Example: This is a test <!remove>sentence<!remove><!add>line<!add>
// This is a test <span class="diff-removed">sentence</span><span class="diff-added">line</span>
String spanAdd = "<span class=\"diff-added\">";
String spanRemove = "<span class=\"diff-removed\">";
String spanClose = "</span>";
diff = replaceWithHtml(diff, addMarker, spanAdd, spanClose);
diff = replaceWithHtml(diff, removeMarker, spanRemove, spanClose);
return diff;
}
private static String replaceWithHtml(String diff, String marker, String openTag, String closeTag) {
int index = 0;
while (diff.contains(marker)) {
String replacement = index % 2 == 0 ? openTag : closeTag;
diff = diff.replaceFirst(marker, replacement);
index++;
}
return diff;
}
}

View File

@ -18,7 +18,6 @@ import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound;
import static org.openmetadata.catalog.exception.CatalogExceptionMessage.invalidEntityLink;
import static org.openmetadata.catalog.security.SecurityUtil.authHeaders;
import static org.openmetadata.catalog.util.TestUtils.ADMIN_AUTH_HEADERS;
import static org.openmetadata.catalog.util.TestUtils.NON_EXISTENT_ENTITY;
@ -154,16 +153,6 @@ public class FeedResourceTest extends CatalogApplicationTest {
() -> createThread(create, AUTH_HEADERS), NOT_FOUND, entityNotFound(Entity.TABLE, "invalidTableName"));
}
@Test
void post_feedWithInvalidAbout_400() {
// post with invalid entity link pattern
// if entity link refers to an array member, then it should have both
// field name and value
CreateThread create =
create().withAbout(String.format("<#E/table/%s/columns/description>", TABLE.getFullyQualifiedName()));
assertResponse(() -> createThread(create, AUTH_HEADERS), BAD_REQUEST, invalidEntityLink());
}
@Test
void post_validThreadAndList_200(TestInfo test) throws IOException {
int totalThreadCount = listThreads(null, null, ADMIN_AUTH_HEADERS).getData().size();

View File

@ -33,11 +33,13 @@ public class MessageParserTest {
+ // Invalid entity link
"<#E/table/tableFQN> "
+ "<#E/table/tableFQN/description> "
+ "<#E/table/tableFQN/columns/c1> "
+ "<#E/table/tableFQN/columns/c1/description> ";
List<EntityLink> links = MessageParser.getEntityLinks(s);
assertEquals(3, links.size());
assertEquals(4, links.size());
assertEquals(new EntityLink("table", "tableFQN", null, null, null), links.get(0));
assertEquals(new EntityLink("table", "tableFQN", "description", null, null), links.get(1));
assertEquals(new EntityLink("table", "tableFQN", "columns", "c1", "description"), links.get(2));
assertEquals(new EntityLink("table", "tableFQN", "columns", "c1", null), links.get(2));
assertEquals(new EntityLink("table", "tableFQN", "columns", "c1", "description"), links.get(3));
}
}

View File

@ -0,0 +1,219 @@
/*
* Copyright 2021 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.util;
import static org.junit.jupiter.api.Assertions.assertEquals;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import org.junit.jupiter.api.TestMethodOrder;
import org.openmetadata.catalog.CatalogApplicationTest;
import org.openmetadata.catalog.resources.databases.TableResourceTest;
import org.openmetadata.catalog.resources.feeds.MessageParser.EntityLink;
import org.openmetadata.catalog.type.ChangeDescription;
import org.openmetadata.catalog.type.ChangeEvent;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.FieldChange;
import org.openmetadata.catalog.type.TagLabel;
import org.openmetadata.catalog.type.TagLabel.LabelType;
import org.openmetadata.catalog.type.TagLabel.State;
@Slf4j
@TestInstance(Lifecycle.PER_CLASS)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class ChangeEventParserTest extends CatalogApplicationTest {
Object TABLE;
@BeforeAll
public void setup(TestInfo test) throws IOException, URISyntaxException {
TableResourceTest tableResourceTest = new TableResourceTest();
tableResourceTest.setup(test); // Initialize TableResourceTest for using helper methods
TABLE = tableResourceTest.createEntity(test, 1);
}
@Test
void testFormattedMessages() throws JsonProcessingException {
ChangeDescription changeDescription = new ChangeDescription();
ChangeEvent changeEvent = new ChangeEvent();
// Simulate updating tags of an entity from tag1 -> tag2
FieldChange addTag = new FieldChange();
addTag.withName("tags").withNewValue("tag2");
FieldChange deleteTag = new FieldChange();
deleteTag.withName("tags").withOldValue("tag1");
changeDescription.withFieldsAdded(List.of(addTag)).withFieldsDeleted(List.of(deleteTag)).withPreviousVersion(1.0);
changeEvent.withChangeDescription(changeDescription).withPreviousVersion(1.0).withCurrentVersion(1.1);
Map<EntityLink, String> messages = ChangeEventParser.getFormattedMessages(changeDescription, TABLE, changeEvent);
assertEquals(1, messages.size());
TagLabel tag1 = new TagLabel();
tag1.withTagFQN("tag1").withLabelType(LabelType.DERIVED).withState(State.CONFIRMED);
TagLabel tag2 = new TagLabel();
tag2.withTagFQN("tag2").withLabelType(LabelType.DERIVED).withState(State.CONFIRMED);
addTag.withNewValue(JsonUtils.pojoToJson(List.of(tag2)));
deleteTag.withOldValue(JsonUtils.pojoToJson(List.of(tag1)));
Map<EntityLink, String> jsonMessages =
ChangeEventParser.getFormattedMessages(changeDescription, TABLE, changeEvent);
assertEquals(1, jsonMessages.size());
// The entity links and values of both the messages should be the same
assertEquals(messages.values().iterator().next(), jsonMessages.values().iterator().next());
}
@Test
void testEntityReferenceFormat() throws JsonProcessingException {
ChangeDescription changeDescription = new ChangeDescription();
ChangeEvent changeEvent = new ChangeEvent();
// Simulate adding owner to a table
EntityReference entityReference = new EntityReference();
entityReference.withId(UUID.randomUUID()).withName("user1").withDisplayName("User One");
FieldChange addOwner = new FieldChange();
addOwner.withName("owner").withNewValue(JsonUtils.pojoToJson(entityReference));
changeDescription.withFieldsAdded(List.of(addOwner)).withPreviousVersion(1.0);
changeEvent.withChangeDescription(changeDescription).withPreviousVersion(1.0).withCurrentVersion(1.1);
Map<EntityLink, String> messages = ChangeEventParser.getFormattedMessages(changeDescription, TABLE, changeEvent);
assertEquals(1, messages.size());
assertEquals(
"Added owner: `User One` <br/><br/> **Change Type:** *MINOR (1.0 -> 1.1)*",
messages.values().iterator().next());
}
@Test
void testUpdateOfString() {
ChangeDescription changeDescription = new ChangeDescription();
ChangeEvent changeEvent = new ChangeEvent();
// Simulate a change of description in table
FieldChange updateDescription = new FieldChange();
updateDescription.withName("description").withNewValue("new description").withOldValue("old description");
changeDescription.withFieldsUpdated(List.of(updateDescription)).withPreviousVersion(1.0);
changeEvent.withChangeDescription(changeDescription).withPreviousVersion(0.1).withCurrentVersion(1.1);
Map<EntityLink, String> messages = ChangeEventParser.getFormattedMessages(changeDescription, TABLE, changeEvent);
assertEquals(1, messages.size());
assertEquals(
"Updated description : `<span class=\"diff-removed\">old</span>"
+ "<span class=\"diff-added\">new</span> description` <br/><br/> **Change Type:** *MINOR (1.0 -> 1.1)*",
messages.values().iterator().next());
// test if it updates correctly with one add and one delete change
changeDescription = new ChangeDescription();
FieldChange addDescription = new FieldChange();
FieldChange deleteDescription = new FieldChange();
addDescription.withName("description").withNewValue("new description");
deleteDescription.withName("description").withOldValue("old description");
changeDescription
.withFieldsAdded(List.of(addDescription))
.withFieldsDeleted(List.of(deleteDescription))
.withPreviousVersion(1.0);
changeEvent.withChangeDescription(changeDescription).withPreviousVersion(0.1).withCurrentVersion(1.1);
// now test if both the type of updates give the same message
Map<EntityLink, String> updatedMessages =
ChangeEventParser.getFormattedMessages(changeDescription, TABLE, changeEvent);
assertEquals(1, updatedMessages.size());
assertEquals(messages.keySet().iterator().next(), updatedMessages.keySet().iterator().next());
assertEquals(messages.values().iterator().next(), updatedMessages.values().iterator().next());
}
@Test
void testMajorSchemaChange() {
ChangeDescription changeDescription = new ChangeDescription();
ChangeEvent changeEvent = new ChangeEvent();
// Simulate a change of column name in table
FieldChange addColumn = new FieldChange();
addColumn
.withName("columns")
.withNewValue(
"[{\"name\":\"lo_orderpriority\",\"displayName\":\"lo_orderpriority\",\"dataType\":\"INT\",\"dataLength\":1,\"dataTypeDisplay\":\"int\",\"fullyQualifiedName\":\"local_mysql.sample_db.lineorder.lo_orderpriority\",\"constraint\":\"NOT_NULL\"}]");
FieldChange deleteColumn = new FieldChange();
deleteColumn
.withName("columns")
.withOldValue(
"[{\"name\":\"lo_order\",\"displayName\":\"lo_order\",\"dataType\":\"INT\",\"dataLength\":1,\"dataTypeDisplay\":\"int\",\"fullyQualifiedName\":\"local_mysql.sample_db.lineorder.lo_order\",\"constraint\":\"NOT_NULL\"}]");
changeDescription
.withFieldsAdded(List.of(addColumn))
.withFieldsDeleted(List.of(deleteColumn))
.withPreviousVersion(1.3);
changeEvent.withChangeDescription(changeDescription).withPreviousVersion(0.1).withCurrentVersion(2.3);
Map<EntityLink, String> messages = ChangeEventParser.getFormattedMessages(changeDescription, TABLE, changeEvent);
assertEquals(1, messages.size());
assertEquals(
"Updated columns.lo_orderpriority : <br/> name: `<span class=\"diff-removed\">\"lo_order\"</span><span class=\"diff-added\">\"lo_orderpriority\"</span>` <br/> displayName: `<span class=\"diff-removed\">\"lo_order\"</span><span class=\"diff-added\">\"lo_orderpriority\"</span>` <br/> fullyQualifiedName: `\"local_mysql.sample_db.lineorder.<span class=\"diff-removed\">lo_order\"</span><span class=\"diff-added\">lo_orderpriority\"</span>` <br/><br/> **Change Type:** *MAJOR (1.3 -> 2.3)*",
messages.values().iterator().next());
// Simulate a change of datatype change in column
addColumn.withNewValue(
"[{\"name\":\"lo_orderpriority\",\"displayName\":\"lo_orderpriority\",\"dataType\":\"INT\",\"dataLength\":1,\"dataTypeDisplay\":\"int\",\"fullyQualifiedName\":\"local_mysql.sample_db.lineorder.lo_orderpriority\",\"constraint\":\"NOT_NULL\"}]");
deleteColumn.withOldValue(
"[{\"name\":\"lo_orderpriority\",\"displayName\":\"lo_orderpriority\",\"dataType\":\"BLOB\",\"dataLength\":1,\"dataTypeDisplay\":\"blob\",\"fullyQualifiedName\":\"local_mysql.sample_db.lineorder.lo_orderpriority\",\"tags\":[],\"constraint\":\"NOT_NULL\"}]");
changeDescription
.withFieldsAdded(List.of(addColumn))
.withFieldsDeleted(List.of(deleteColumn))
.withPreviousVersion(1.3);
changeEvent.withChangeDescription(changeDescription).withPreviousVersion(0.1).withCurrentVersion(2.3);
messages = ChangeEventParser.getFormattedMessages(changeDescription, TABLE, changeEvent);
assertEquals(1, messages.size());
assertEquals(
"Updated columns.lo_orderpriority : <br/> dataType: `<span class=\"diff-removed\">\"BLOB\"</span><span class=\"diff-added\">\"INT\"</span>` <br/> dataTypeDisplay: `<span class=\"diff-removed\">\"blob\"</span><span class=\"diff-added\">\"int\"</span>` <br/><br/> **Change Type:** *MAJOR (1.3 -> 2.3)*",
messages.values().iterator().next());
// Simulate multiple changes to columns
addColumn.withNewValue(
"[{\"name\":\"lo_orderpriority\",\"displayName\":\"lo_orderpriority\",\"dataType\":\"INT\",\"dataLength\":1,\"dataTypeDisplay\":\"int\",\"fullyQualifiedName\":\"local_mysql.sample_db.lineorder.lo_orderpriority\"},{\"name\":\"newColumn\",\"displayName\":\"newColumn\",\"dataType\":\"INT\",\"dataLength\":1,\"dataTypeDisplay\":\"int\",\"fullyQualifiedName\":\"local_mysql.sample_db.lineorder.newColumn\"}]");
deleteColumn.withOldValue(
"[{\"name\":\"lo_orderpriority\",\"displayName\":\"lo_orderpriority\",\"dataType\":\"BLOB\",\"dataLength\":1,\"dataTypeDisplay\":\"blob\",\"fullyQualifiedName\":\"local_mysql.sample_db.lineorder.lo_orderpriority\"}]");
changeDescription
.withFieldsAdded(List.of(addColumn))
.withFieldsDeleted(List.of(deleteColumn))
.withPreviousVersion(1.4);
changeEvent.withChangeDescription(changeDescription).withPreviousVersion(0.1).withCurrentVersion(2.4);
messages = ChangeEventParser.getFormattedMessages(changeDescription, TABLE, changeEvent);
assertEquals(1, messages.size());
assertEquals(
"Updated columns : `lo_orderpriority<span class=\"diff-added\">, newColumn</span>` <br/><br/> **Change Type:** *MAJOR (1.4 -> 2.4)*",
messages.values().iterator().next());
}
}