mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-09-26 09:22:14 +00:00
Fixes 1486 - Refactor ChangeEventHandler to make getting change events a utility method (#1487)
Co-authored-by: sureshms <suresh@getcollate.io>
This commit is contained in:
parent
bbb1f03137
commit
3ae3eec3d0
@ -16,6 +16,7 @@
|
|||||||
|
|
||||||
package org.openmetadata.catalog.events;
|
package org.openmetadata.catalog.events;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import org.jdbi.v3.core.Jdbi;
|
import org.jdbi.v3.core.Jdbi;
|
||||||
import org.openmetadata.catalog.CatalogApplicationConfig;
|
import org.openmetadata.catalog.CatalogApplicationConfig;
|
||||||
import org.openmetadata.catalog.Entity;
|
import org.openmetadata.catalog.Entity;
|
||||||
@ -43,62 +44,81 @@ public class ChangeEventHandler implements EventHandler {
|
|||||||
|
|
||||||
public Void process(ContainerRequestContext requestContext,
|
public Void process(ContainerRequestContext requestContext,
|
||||||
ContainerResponseContext responseContext) {
|
ContainerResponseContext responseContext) {
|
||||||
int responseCode = responseContext.getStatus();
|
|
||||||
String method = requestContext.getMethod();
|
String method = requestContext.getMethod();
|
||||||
if (method.equals("GET")) {
|
try {
|
||||||
return null;
|
ChangeEvent changeEvent = getChangeEvent(method, responseContext);
|
||||||
}
|
if (changeEvent != null) {
|
||||||
|
LOG.info("Recording change event {} {}", changeEvent.getDateTime().getTime(), changeEvent);
|
||||||
if (responseContext.getEntity() != null) {
|
if (changeEvent.getEntity() != null) {
|
||||||
ChangeEvent changeEvent = null;
|
changeEvent.setEntity(JsonUtils.pojoToJson(changeEvent.getEntity()));
|
||||||
try {
|
|
||||||
Object entity = responseContext.getEntity();
|
|
||||||
String changeType = responseContext.getHeaderString(RestUtil.CHANGE_CUSTOM_HEADER);
|
|
||||||
|
|
||||||
if (responseCode == Status.CREATED.getStatusCode() && !RestUtil.ENTITY_FIELDS_CHANGED.equals(changeType)) {
|
|
||||||
EntityInterface entityInterface = Entity.getEntityInterface(entity);
|
|
||||||
EntityReference entityReference = Entity.getEntityReference(entity);
|
|
||||||
changeEvent = new ChangeEvent()
|
|
||||||
.withEventType(EventType.ENTITY_CREATED)
|
|
||||||
.withEntityId(entityInterface.getId())
|
|
||||||
.withEntityType(entityReference.getType())
|
|
||||||
.withUserName(entityInterface.getUpdatedBy())
|
|
||||||
.withDateTime(entityInterface.getUpdatedAt())
|
|
||||||
.withEntity(JsonUtils.pojoToJson(entity))
|
|
||||||
.withPreviousVersion(entityInterface.getVersion())
|
|
||||||
.withCurrentVersion(entityInterface.getVersion());
|
|
||||||
|
|
||||||
} else if (changeType == null) {
|
|
||||||
return null;
|
|
||||||
} else if (changeType.equals(RestUtil.ENTITY_UPDATED)) {
|
|
||||||
EntityInterface entityInterface = Entity.getEntityInterface(entity);
|
|
||||||
EntityReference entityReference = Entity.getEntityReference(entity);
|
|
||||||
changeEvent = new ChangeEvent()
|
|
||||||
.withEventType(EventType.ENTITY_UPDATED)
|
|
||||||
.withEntityId(entityInterface.getId())
|
|
||||||
.withEntityType(entityReference.getType())
|
|
||||||
.withUserName(entityInterface.getUpdatedBy())
|
|
||||||
.withDateTime(entityInterface.getUpdatedAt())
|
|
||||||
.withChangeDescription(entityInterface.getChangeDescription())
|
|
||||||
.withPreviousVersion(entityInterface.getChangeDescription().getPreviousVersion())
|
|
||||||
.withCurrentVersion(entityInterface.getVersion());
|
|
||||||
|
|
||||||
} else if (changeType.equals(RestUtil.ENTITY_FIELDS_CHANGED)){
|
|
||||||
changeEvent = (ChangeEvent) entity;
|
|
||||||
} else if (changeType.equals(RestUtil.ENTITY_DELETED)) {
|
|
||||||
changeEvent = (ChangeEvent) entity;
|
|
||||||
}
|
}
|
||||||
|
dao.changeEventDAO().insert(JsonUtils.pojoToJson(changeEvent));
|
||||||
if (changeEvent != null) {
|
|
||||||
LOG.info("Recording change event {} {}", changeEvent.getDateTime().getTime(), changeEvent);
|
|
||||||
dao.changeEventDAO().insert(JsonUtils.pojoToJson(changeEvent));
|
|
||||||
}
|
|
||||||
} catch(Exception e) {
|
|
||||||
LOG.error("Failed to capture change event for method {} due to {}", method, e);
|
|
||||||
}
|
}
|
||||||
|
} catch(Exception e) {
|
||||||
|
LOG.error("Failed to capture change event for method {} due to {}", method, e);
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static ChangeEvent getChangeEvent(String method, ContainerResponseContext responseContext)
|
||||||
|
throws JsonProcessingException {
|
||||||
|
// GET operations don't produce change events
|
||||||
|
if (method.equals("GET")) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
Object entity = responseContext.getEntity();
|
||||||
|
if (entity == null) {
|
||||||
|
return null; // Response has no entity to produce change event from
|
||||||
|
}
|
||||||
|
|
||||||
|
int responseCode = responseContext.getStatus();
|
||||||
|
String changeType = responseContext.getHeaderString(RestUtil.CHANGE_CUSTOM_HEADER);
|
||||||
|
|
||||||
|
// Entity was created by either POST .../entities or PUT .../entities
|
||||||
|
if (responseCode == Status.CREATED.getStatusCode() && !RestUtil.ENTITY_FIELDS_CHANGED.equals(changeType)) {
|
||||||
|
EntityInterface entityInterface = Entity.getEntityInterface(entity);
|
||||||
|
String entityType = Entity.getEntityReference(entity).getType();
|
||||||
|
return getChangeEvent(EventType.ENTITY_CREATED, entityType, entityInterface)
|
||||||
|
.withEntity(entity);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// PUT or PATCH operation didn't result in any change
|
||||||
|
if (changeType == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Entity was updated by either PUT .../entities or PATCH .../entities
|
||||||
|
if (changeType.equals(RestUtil.ENTITY_UPDATED)) {
|
||||||
|
EntityInterface entityInterface = Entity.getEntityInterface(entity);
|
||||||
|
String entityType = Entity.getEntityReference(entity).getType();
|
||||||
|
return getChangeEvent(EventType.ENTITY_UPDATED, entityType, entityInterface)
|
||||||
|
.withPreviousVersion(entityInterface.getChangeDescription().getPreviousVersion());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Entity field was updated by PUT .../entities/{id}/fieldName - Example PUT ../tables/{id}/follower
|
||||||
|
if (changeType.equals(RestUtil.ENTITY_FIELDS_CHANGED)){
|
||||||
|
return (ChangeEvent) entity;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Entity was deleted by DELETE .../entities/{id}
|
||||||
|
if (changeType.equals(RestUtil.ENTITY_DELETED)) {
|
||||||
|
return (ChangeEvent) entity;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ChangeEvent getChangeEvent(EventType eventType, String entityType, EntityInterface entityInterface) {
|
||||||
|
return new ChangeEvent()
|
||||||
|
.withEventType(eventType)
|
||||||
|
.withEntityId(entityInterface.getId())
|
||||||
|
.withEntityType(entityType)
|
||||||
|
.withUserName(entityInterface.getUpdatedBy())
|
||||||
|
.withDateTime(entityInterface.getUpdatedAt())
|
||||||
|
.withChangeDescription(entityInterface.getChangeDescription())
|
||||||
|
.withCurrentVersion(entityInterface.getVersion());
|
||||||
|
}
|
||||||
|
|
||||||
public void close() {}
|
public void close() {}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user