mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-09-05 07:03:07 +00:00
This commit is contained in:
parent
79589e31ad
commit
bddb16e859
@ -106,6 +106,11 @@ public final class Entity {
|
|||||||
public static final String AIRFLOW_PIPELINE = "airflowPipeline";
|
public static final String AIRFLOW_PIPELINE = "airflowPipeline";
|
||||||
public static final String WEBHOOK = "webhook";
|
public static final String WEBHOOK = "webhook";
|
||||||
|
|
||||||
|
//
|
||||||
|
// List of entities whose changes should not be published to the Activity Feed
|
||||||
|
//
|
||||||
|
public static final List<String> ACTIVITY_FEED_EXCLUDED_ENTITIES = List.of(USER, TEAM, ROLE, POLICY, BOTS);
|
||||||
|
|
||||||
private Entity() {}
|
private Entity() {}
|
||||||
|
|
||||||
public static <T> void registerEntity(
|
public static <T> void registerEntity(
|
||||||
@ -185,6 +190,16 @@ public final class Entity {
|
|||||||
return !entityType.equals(TEAM);
|
return !entityType.equals(TEAM);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns true if the change events of the given entity type should be published to the activity feed.
|
||||||
|
*
|
||||||
|
* @param entityType Type of the entity.
|
||||||
|
* @return true if change events of the entity should be published to activity feed, false otherwise
|
||||||
|
*/
|
||||||
|
public static boolean shouldDisplayEntityChangeOnFeed(@NonNull String entityType) {
|
||||||
|
return !ACTIVITY_FEED_EXCLUDED_ENTITIES.contains(entityType);
|
||||||
|
}
|
||||||
|
|
||||||
public static <T> EntityInterface<T> getEntityInterface(T entity) {
|
public static <T> EntityInterface<T> getEntityInterface(T entity) {
|
||||||
if (entity == null) {
|
if (entity == null) {
|
||||||
return null;
|
return null;
|
||||||
|
@ -72,10 +72,15 @@ public class ChangeEventHandler implements EventHandler {
|
|||||||
|
|
||||||
// Add a new thread to the entity for every change event
|
// Add a new thread to the entity for every change event
|
||||||
// for the event to appear in activity feeds
|
// for the event to appear in activity feeds
|
||||||
List<Thread> threads = getThreads(responseContext, changeEvent);
|
if (Entity.shouldDisplayEntityChangeOnFeed(changeEvent.getEntityType())) {
|
||||||
if (threads != null) {
|
List<Thread> threads = getThreads(responseContext);
|
||||||
for (var thread : threads) {
|
if (threads != null) {
|
||||||
feedDao.create(thread);
|
for (var thread : threads) {
|
||||||
|
// Don't create a thread if there is no message
|
||||||
|
if (!thread.getMessage().isEmpty()) {
|
||||||
|
feedDao.create(thread);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -171,7 +176,7 @@ public class ChangeEventHandler implements EventHandler {
|
|||||||
.withCurrentVersion(changeEvent.getCurrentVersion());
|
.withCurrentVersion(changeEvent.getCurrentVersion());
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<Thread> getThreads(ContainerResponseContext responseContext, ChangeEvent changeEvent) {
|
private List<Thread> getThreads(ContainerResponseContext responseContext) {
|
||||||
Object entity = responseContext.getEntity();
|
Object entity = responseContext.getEntity();
|
||||||
if (entity == null) {
|
if (entity == null) {
|
||||||
return null; // Response has no entity to produce change event from
|
return null; // Response has no entity to produce change event from
|
||||||
@ -184,14 +189,14 @@ public class ChangeEventHandler implements EventHandler {
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
return getThreads(entity, entityInterface.getChangeDescription(), changeEvent);
|
return getThreads(entity, entityInterface.getChangeDescription());
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<Thread> getThreads(Object entity, ChangeDescription changeDescription, ChangeEvent changeEvent) {
|
private List<Thread> getThreads(Object entity, ChangeDescription changeDescription) {
|
||||||
List<Thread> threads = new ArrayList<>();
|
List<Thread> threads = new ArrayList<>();
|
||||||
var entityInterface = Entity.getEntityInterface(entity);
|
var entityInterface = Entity.getEntityInterface(entity);
|
||||||
|
|
||||||
Map<EntityLink, String> messages = ChangeEventParser.getFormattedMessages(changeDescription, entity, changeEvent);
|
Map<EntityLink, String> messages = ChangeEventParser.getFormattedMessages(changeDescription, entity);
|
||||||
|
|
||||||
// Create an automated thread
|
// Create an automated thread
|
||||||
for (var link : messages.keySet()) {
|
for (var link : messages.keySet()) {
|
||||||
|
@ -32,7 +32,6 @@ import org.apache.commons.lang.StringUtils;
|
|||||||
import org.openmetadata.catalog.Entity;
|
import org.openmetadata.catalog.Entity;
|
||||||
import org.openmetadata.catalog.resources.feeds.MessageParser.EntityLink;
|
import org.openmetadata.catalog.resources.feeds.MessageParser.EntityLink;
|
||||||
import org.openmetadata.catalog.type.ChangeDescription;
|
import org.openmetadata.catalog.type.ChangeDescription;
|
||||||
import org.openmetadata.catalog.type.ChangeEvent;
|
|
||||||
import org.openmetadata.catalog.type.EntityReference;
|
import org.openmetadata.catalog.type.EntityReference;
|
||||||
import org.openmetadata.catalog.type.FieldChange;
|
import org.openmetadata.catalog.type.FieldChange;
|
||||||
|
|
||||||
@ -46,8 +45,7 @@ public final class ChangeEventParser {
|
|||||||
DELETE
|
DELETE
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Map<EntityLink, String> getFormattedMessages(
|
public static Map<EntityLink, String> getFormattedMessages(ChangeDescription changeDescription, Object entity) {
|
||||||
ChangeDescription changeDescription, Object entity, ChangeEvent changeEvent) {
|
|
||||||
// Store a map of entityLink -> message
|
// Store a map of entityLink -> message
|
||||||
Map<EntityLink, String> messages;
|
Map<EntityLink, String> messages;
|
||||||
|
|
||||||
@ -82,7 +80,7 @@ public final class ChangeEventParser {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private static String getFieldValue(Object fieldValue) {
|
private static String getFieldValue(Object fieldValue) {
|
||||||
if (fieldValue != null) {
|
if (fieldValue != null && !fieldValue.toString().isEmpty()) {
|
||||||
try {
|
try {
|
||||||
// Check if field value is a json string
|
// Check if field value is a json string
|
||||||
JsonValue json = JsonUtils.readJson(fieldValue.toString());
|
JsonValue json = JsonUtils.readJson(fieldValue.toString());
|
||||||
|
@ -33,7 +33,6 @@ import org.openmetadata.catalog.CatalogApplicationTest;
|
|||||||
import org.openmetadata.catalog.resources.databases.TableResourceTest;
|
import org.openmetadata.catalog.resources.databases.TableResourceTest;
|
||||||
import org.openmetadata.catalog.resources.feeds.MessageParser.EntityLink;
|
import org.openmetadata.catalog.resources.feeds.MessageParser.EntityLink;
|
||||||
import org.openmetadata.catalog.type.ChangeDescription;
|
import org.openmetadata.catalog.type.ChangeDescription;
|
||||||
import org.openmetadata.catalog.type.ChangeEvent;
|
|
||||||
import org.openmetadata.catalog.type.EntityReference;
|
import org.openmetadata.catalog.type.EntityReference;
|
||||||
import org.openmetadata.catalog.type.FieldChange;
|
import org.openmetadata.catalog.type.FieldChange;
|
||||||
import org.openmetadata.catalog.type.TagLabel;
|
import org.openmetadata.catalog.type.TagLabel;
|
||||||
@ -57,16 +56,14 @@ public class ChangeEventParserTest extends CatalogApplicationTest {
|
|||||||
@Test
|
@Test
|
||||||
void testFormattedMessages() throws JsonProcessingException {
|
void testFormattedMessages() throws JsonProcessingException {
|
||||||
ChangeDescription changeDescription = new ChangeDescription();
|
ChangeDescription changeDescription = new ChangeDescription();
|
||||||
ChangeEvent changeEvent = new ChangeEvent();
|
|
||||||
// Simulate updating tags of an entity from tag1 -> tag2
|
// Simulate updating tags of an entity from tag1 -> tag2
|
||||||
FieldChange addTag = new FieldChange();
|
FieldChange addTag = new FieldChange();
|
||||||
addTag.withName("tags").withNewValue("tag2");
|
addTag.withName("tags").withNewValue("tag2");
|
||||||
FieldChange deleteTag = new FieldChange();
|
FieldChange deleteTag = new FieldChange();
|
||||||
deleteTag.withName("tags").withOldValue("tag1");
|
deleteTag.withName("tags").withOldValue("tag1");
|
||||||
changeDescription.withFieldsAdded(List.of(addTag)).withFieldsDeleted(List.of(deleteTag)).withPreviousVersion(1.0);
|
changeDescription.withFieldsAdded(List.of(addTag)).withFieldsDeleted(List.of(deleteTag)).withPreviousVersion(1.0);
|
||||||
changeEvent.withChangeDescription(changeDescription).withPreviousVersion(1.0).withCurrentVersion(1.1);
|
|
||||||
|
|
||||||
Map<EntityLink, String> messages = ChangeEventParser.getFormattedMessages(changeDescription, TABLE, changeEvent);
|
Map<EntityLink, String> messages = ChangeEventParser.getFormattedMessages(changeDescription, TABLE);
|
||||||
assertEquals(1, messages.size());
|
assertEquals(1, messages.size());
|
||||||
|
|
||||||
TagLabel tag1 = new TagLabel();
|
TagLabel tag1 = new TagLabel();
|
||||||
@ -78,8 +75,7 @@ public class ChangeEventParserTest extends CatalogApplicationTest {
|
|||||||
addTag.withNewValue(JsonUtils.pojoToJson(List.of(tag2)));
|
addTag.withNewValue(JsonUtils.pojoToJson(List.of(tag2)));
|
||||||
deleteTag.withOldValue(JsonUtils.pojoToJson(List.of(tag1)));
|
deleteTag.withOldValue(JsonUtils.pojoToJson(List.of(tag1)));
|
||||||
|
|
||||||
Map<EntityLink, String> jsonMessages =
|
Map<EntityLink, String> jsonMessages = ChangeEventParser.getFormattedMessages(changeDescription, TABLE);
|
||||||
ChangeEventParser.getFormattedMessages(changeDescription, TABLE, changeEvent);
|
|
||||||
assertEquals(1, jsonMessages.size());
|
assertEquals(1, jsonMessages.size());
|
||||||
|
|
||||||
// The entity links and values of both the messages should be the same
|
// The entity links and values of both the messages should be the same
|
||||||
@ -89,7 +85,6 @@ public class ChangeEventParserTest extends CatalogApplicationTest {
|
|||||||
@Test
|
@Test
|
||||||
void testEntityReferenceFormat() throws JsonProcessingException {
|
void testEntityReferenceFormat() throws JsonProcessingException {
|
||||||
ChangeDescription changeDescription = new ChangeDescription();
|
ChangeDescription changeDescription = new ChangeDescription();
|
||||||
ChangeEvent changeEvent = new ChangeEvent();
|
|
||||||
// Simulate adding owner to a table
|
// Simulate adding owner to a table
|
||||||
EntityReference entityReference = new EntityReference();
|
EntityReference entityReference = new EntityReference();
|
||||||
entityReference.withId(UUID.randomUUID()).withName("user1").withDisplayName("User One");
|
entityReference.withId(UUID.randomUUID()).withName("user1").withDisplayName("User One");
|
||||||
@ -97,9 +92,8 @@ public class ChangeEventParserTest extends CatalogApplicationTest {
|
|||||||
addOwner.withName("owner").withNewValue(JsonUtils.pojoToJson(entityReference));
|
addOwner.withName("owner").withNewValue(JsonUtils.pojoToJson(entityReference));
|
||||||
|
|
||||||
changeDescription.withFieldsAdded(List.of(addOwner)).withPreviousVersion(1.0);
|
changeDescription.withFieldsAdded(List.of(addOwner)).withPreviousVersion(1.0);
|
||||||
changeEvent.withChangeDescription(changeDescription).withPreviousVersion(1.0).withCurrentVersion(1.1);
|
|
||||||
|
|
||||||
Map<EntityLink, String> messages = ChangeEventParser.getFormattedMessages(changeDescription, TABLE, changeEvent);
|
Map<EntityLink, String> messages = ChangeEventParser.getFormattedMessages(changeDescription, TABLE);
|
||||||
assertEquals(1, messages.size());
|
assertEquals(1, messages.size());
|
||||||
|
|
||||||
assertEquals("Added **owner**: `User One`", messages.values().iterator().next());
|
assertEquals("Added **owner**: `User One`", messages.values().iterator().next());
|
||||||
@ -108,15 +102,13 @@ public class ChangeEventParserTest extends CatalogApplicationTest {
|
|||||||
@Test
|
@Test
|
||||||
void testUpdateOfString() {
|
void testUpdateOfString() {
|
||||||
ChangeDescription changeDescription = new ChangeDescription();
|
ChangeDescription changeDescription = new ChangeDescription();
|
||||||
ChangeEvent changeEvent = new ChangeEvent();
|
|
||||||
// Simulate a change of description in table
|
// Simulate a change of description in table
|
||||||
FieldChange updateDescription = new FieldChange();
|
FieldChange updateDescription = new FieldChange();
|
||||||
updateDescription.withName("description").withNewValue("new description").withOldValue("old description");
|
updateDescription.withName("description").withNewValue("new description").withOldValue("old description");
|
||||||
|
|
||||||
changeDescription.withFieldsUpdated(List.of(updateDescription)).withPreviousVersion(1.0);
|
changeDescription.withFieldsUpdated(List.of(updateDescription)).withPreviousVersion(1.0);
|
||||||
changeEvent.withChangeDescription(changeDescription).withPreviousVersion(0.1).withCurrentVersion(1.1);
|
|
||||||
|
|
||||||
Map<EntityLink, String> messages = ChangeEventParser.getFormattedMessages(changeDescription, TABLE, changeEvent);
|
Map<EntityLink, String> messages = ChangeEventParser.getFormattedMessages(changeDescription, TABLE);
|
||||||
assertEquals(1, messages.size());
|
assertEquals(1, messages.size());
|
||||||
|
|
||||||
assertEquals(
|
assertEquals(
|
||||||
@ -135,11 +127,8 @@ public class ChangeEventParserTest extends CatalogApplicationTest {
|
|||||||
.withFieldsDeleted(List.of(deleteDescription))
|
.withFieldsDeleted(List.of(deleteDescription))
|
||||||
.withPreviousVersion(1.0);
|
.withPreviousVersion(1.0);
|
||||||
|
|
||||||
changeEvent.withChangeDescription(changeDescription).withPreviousVersion(0.1).withCurrentVersion(1.1);
|
|
||||||
|
|
||||||
// now test if both the type of updates give the same message
|
// now test if both the type of updates give the same message
|
||||||
Map<EntityLink, String> updatedMessages =
|
Map<EntityLink, String> updatedMessages = ChangeEventParser.getFormattedMessages(changeDescription, TABLE);
|
||||||
ChangeEventParser.getFormattedMessages(changeDescription, TABLE, changeEvent);
|
|
||||||
assertEquals(1, updatedMessages.size());
|
assertEquals(1, updatedMessages.size());
|
||||||
|
|
||||||
assertEquals(messages.keySet().iterator().next(), updatedMessages.keySet().iterator().next());
|
assertEquals(messages.keySet().iterator().next(), updatedMessages.keySet().iterator().next());
|
||||||
@ -149,7 +138,6 @@ public class ChangeEventParserTest extends CatalogApplicationTest {
|
|||||||
@Test
|
@Test
|
||||||
void testMajorSchemaChange() {
|
void testMajorSchemaChange() {
|
||||||
ChangeDescription changeDescription = new ChangeDescription();
|
ChangeDescription changeDescription = new ChangeDescription();
|
||||||
ChangeEvent changeEvent = new ChangeEvent();
|
|
||||||
// Simulate a change of column name in table
|
// Simulate a change of column name in table
|
||||||
FieldChange addColumn = new FieldChange();
|
FieldChange addColumn = new FieldChange();
|
||||||
addColumn
|
addColumn
|
||||||
@ -167,9 +155,8 @@ public class ChangeEventParserTest extends CatalogApplicationTest {
|
|||||||
.withFieldsAdded(List.of(addColumn))
|
.withFieldsAdded(List.of(addColumn))
|
||||||
.withFieldsDeleted(List.of(deleteColumn))
|
.withFieldsDeleted(List.of(deleteColumn))
|
||||||
.withPreviousVersion(1.3);
|
.withPreviousVersion(1.3);
|
||||||
changeEvent.withChangeDescription(changeDescription).withPreviousVersion(0.1).withCurrentVersion(2.3);
|
|
||||||
|
|
||||||
Map<EntityLink, String> messages = ChangeEventParser.getFormattedMessages(changeDescription, TABLE, changeEvent);
|
Map<EntityLink, String> messages = ChangeEventParser.getFormattedMessages(changeDescription, TABLE);
|
||||||
assertEquals(1, messages.size());
|
assertEquals(1, messages.size());
|
||||||
|
|
||||||
assertEquals(
|
assertEquals(
|
||||||
@ -186,9 +173,8 @@ public class ChangeEventParserTest extends CatalogApplicationTest {
|
|||||||
.withFieldsAdded(List.of(addColumn))
|
.withFieldsAdded(List.of(addColumn))
|
||||||
.withFieldsDeleted(List.of(deleteColumn))
|
.withFieldsDeleted(List.of(deleteColumn))
|
||||||
.withPreviousVersion(1.3);
|
.withPreviousVersion(1.3);
|
||||||
changeEvent.withChangeDescription(changeDescription).withPreviousVersion(0.1).withCurrentVersion(2.3);
|
|
||||||
|
|
||||||
messages = ChangeEventParser.getFormattedMessages(changeDescription, TABLE, changeEvent);
|
messages = ChangeEventParser.getFormattedMessages(changeDescription, TABLE);
|
||||||
assertEquals(1, messages.size());
|
assertEquals(1, messages.size());
|
||||||
|
|
||||||
assertEquals(
|
assertEquals(
|
||||||
@ -205,9 +191,8 @@ public class ChangeEventParserTest extends CatalogApplicationTest {
|
|||||||
.withFieldsAdded(List.of(addColumn))
|
.withFieldsAdded(List.of(addColumn))
|
||||||
.withFieldsDeleted(List.of(deleteColumn))
|
.withFieldsDeleted(List.of(deleteColumn))
|
||||||
.withPreviousVersion(1.4);
|
.withPreviousVersion(1.4);
|
||||||
changeEvent.withChangeDescription(changeDescription).withPreviousVersion(0.1).withCurrentVersion(2.4);
|
|
||||||
|
|
||||||
messages = ChangeEventParser.getFormattedMessages(changeDescription, TABLE, changeEvent);
|
messages = ChangeEventParser.getFormattedMessages(changeDescription, TABLE);
|
||||||
assertEquals(1, messages.size());
|
assertEquals(1, messages.size());
|
||||||
|
|
||||||
assertEquals(
|
assertEquals(
|
||||||
|
Loading…
x
Reference in New Issue
Block a user