Check change events for PUT entity operation

This commit is contained in:
sureshms 2021-11-10 12:40:27 -08:00
parent e1238186b2
commit db38f08735
19 changed files with 183 additions and 93 deletions

View File

@ -395,8 +395,8 @@ CREATE TABLE IF NOT EXISTS change_event (
entityType VARCHAR(36) GENERATED ALWAYS AS (json ->> '$.entityType') NOT NULL, entityType VARCHAR(36) GENERATED ALWAYS AS (json ->> '$.entityType') NOT NULL,
userName VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.userName') NOT NULL, userName VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.userName') NOT NULL,
dateTime TIMESTAMP GENERATED ALWAYS AS (TIMESTAMP(STR_TO_DATE(json ->> '$.dateTime', '%Y-%m-%dT%T.%fZ'))) NOT NULL, dateTime TIMESTAMP GENERATED ALWAYS AS (TIMESTAMP(STR_TO_DATE(json ->> '$.dateTime', '%Y-%m-%dT%T.%fZ'))) NOT NULL,
-- dateTime DATE GENERATED ALWAYS AS (STR_TO_DATE(json ->> '$.dateTime', '%Y-%m-%dT%T.%fZ')) NOT NULL,
json JSON NOT NULL, json JSON NOT NULL,
timestamp BIGINT,
INDEX (dateTime), INDEX (dateTime),
INDEX (eventType), INDEX (eventType),
INDEX (entityType) INDEX (entityType)

View File

@ -82,6 +82,15 @@ public class CatalogApplication extends Application<CatalogApplicationConfig> {
final JdbiFactory factory = new JdbiFactory(); final JdbiFactory factory = new JdbiFactory();
final Jdbi jdbi = factory.build(environment, catalogConfig.getDataSourceFactory(), "mysql3"); final Jdbi jdbi = factory.build(environment, catalogConfig.getDataSourceFactory(), "mysql3");
SqlLogger sqlLogger = new SqlLogger() {
@Override
public void logAfterExecution(StatementContext context) {
LOG.info("sql {}, parameters {}, timeTaken {} ms", context.getRenderedSql(),
context.getBinding().toString(), context.getElapsedTime(ChronoUnit.MILLIS));
}
};
jdbi.setSqlLogger(sqlLogger);
// Register Authorizer // Register Authorizer
registerAuthorizer(catalogConfig, environment, jdbi); registerAuthorizer(catalogConfig, environment, jdbi);

View File

@ -26,6 +26,7 @@ import org.openmetadata.catalog.util.EntityInterface;
import javax.ws.rs.core.UriInfo; import javax.ws.rs.core.UriInfo;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -139,5 +140,25 @@ public final class Entity {
} }
return entityRepository.getEntityInterface(entity); return entityRepository.getEntityInterface(entity);
} }
public static class EntityList {
public static final EntityList EMPTY_LIST = new EntityList(null);
private final List<String> list;
public EntityList(String entitiesParam) {
if (entitiesParam == null) {
list = Collections.emptyList();
return;
}
list = Arrays.asList(entitiesParam.replaceAll("\\s", "").split(","));
for (String field : list) {
// TODO validate entity
}
}
public List<String> getList() {
return list;
}
}
} }

View File

@ -21,8 +21,8 @@ import org.openmetadata.catalog.CatalogApplicationConfig;
import org.openmetadata.catalog.Entity; import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.jdbi3.CollectionDAO; import org.openmetadata.catalog.jdbi3.CollectionDAO;
import org.openmetadata.catalog.type.ChangeEvent; import org.openmetadata.catalog.type.ChangeEvent;
import org.openmetadata.catalog.type.ChangeEvent.EventType;
import org.openmetadata.catalog.type.EntityReference; import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.EventType;
import org.openmetadata.catalog.util.EntityInterface; import org.openmetadata.catalog.util.EntityInterface;
import org.openmetadata.catalog.util.JsonUtils; import org.openmetadata.catalog.util.JsonUtils;
import org.openmetadata.catalog.util.RestUtil; import org.openmetadata.catalog.util.RestUtil;
@ -54,11 +54,11 @@ public class ChangeEventHandler implements EventHandler {
try { try {
Object entity = responseContext.getEntity(); Object entity = responseContext.getEntity();
String changeType = responseContext.getHeaderString(RestUtil.CHANGE_CUSTOM_HEADER); String changeType = responseContext.getHeaderString(RestUtil.CHANGE_CUSTOM_HEADER);
System.out.println("Change type is " + changeType);
if (responseCode == Status.CREATED.getStatusCode()) { if (responseCode == Status.CREATED.getStatusCode()) {
EntityInterface entityInterface = Entity.getEntityInterface(entity); EntityInterface entityInterface = Entity.getEntityInterface(entity);
EntityReference entityReference = Entity.getEntityReference(entity); EntityReference entityReference = Entity.getEntityReference(entity);
System.out.println("Entity created at " + entityInterface.getUpdatedAt().getTime());
changeEvent = new ChangeEvent() changeEvent = new ChangeEvent()
.withEventType(EventType.ENTITY_CREATED) .withEventType(EventType.ENTITY_CREATED)
.withEntityId(entityInterface.getId()) .withEntityId(entityInterface.getId())
@ -72,10 +72,7 @@ public class ChangeEventHandler implements EventHandler {
} else if (changeType.equals(RestUtil.ENTITY_UPDATED)) { } else if (changeType.equals(RestUtil.ENTITY_UPDATED)) {
EntityInterface entityInterface = Entity.getEntityInterface(entity); EntityInterface entityInterface = Entity.getEntityInterface(entity);
EntityReference entityReference = Entity.getEntityReference(entity); EntityReference entityReference = Entity.getEntityReference(entity);
System.out.println("Entity updated at " + entityInterface.getUpdatedAt().getTime());
System.out.println(entityInterface.getId());
System.out.println(entity);
System.out.println(entityReference.getType());
changeEvent = new ChangeEvent() changeEvent = new ChangeEvent()
.withEventType(EventType.ENTITY_UPDATED) .withEventType(EventType.ENTITY_UPDATED)
.withEntityId(entityInterface.getId()) .withEntityId(entityInterface.getId())
@ -83,7 +80,7 @@ public class ChangeEventHandler implements EventHandler {
.withUserName(entityInterface.getUpdatedBy()) .withUserName(entityInterface.getUpdatedBy())
.withDateTime(entityInterface.getUpdatedAt()) .withDateTime(entityInterface.getUpdatedAt())
.withChangeDescription(entityInterface.getChangeDescription()) .withChangeDescription(entityInterface.getChangeDescription())
.withPreviousVersion(entityInterface.getVersion()) .withPreviousVersion(entityInterface.getChangeDescription().getPreviousVersion())
.withCurrentVersion(entityInterface.getVersion()); .withCurrentVersion(entityInterface.getVersion());
} else if (changeType.equals(RestUtil.ENTITY_FIELDS_CHANGED)){ } else if (changeType.equals(RestUtil.ENTITY_FIELDS_CHANGED)){
@ -93,8 +90,10 @@ public class ChangeEventHandler implements EventHandler {
} }
if (changeEvent != null) { if (changeEvent != null) {
System.out.println("Adding change " + changeEvent);
dao.changeEventDAO().insert(JsonUtils.pojoToJson(changeEvent)); dao.changeEventDAO().insert(JsonUtils.pojoToJson(changeEvent));
System.out.println("Adding change event " + changeEvent);
} else {
System.out.println("Change event not recorded");
} }
} catch(Exception e) { } catch(Exception e) {
LOG.error("Failed to capture change event for method {} due to {}", method, e); LOG.error("Failed to capture change event for method {} due to {}", method, e);

View File

@ -17,16 +17,22 @@
package org.openmetadata.catalog.jdbi3; package org.openmetadata.catalog.jdbi3;
import org.jdbi.v3.sqlobject.transaction.Transaction; import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.catalog.Entity.EntityList;
import org.openmetadata.catalog.resources.events.EventResource.ChangeEventList; import org.openmetadata.catalog.resources.events.EventResource.ChangeEventList;
import org.openmetadata.catalog.type.ChangeEvent; import org.openmetadata.catalog.type.ChangeEvent;
import org.openmetadata.catalog.type.EventType;
import org.openmetadata.catalog.util.JsonUtils; import org.openmetadata.catalog.util.JsonUtils;
import org.openmetadata.catalog.util.RestUtil;
import org.openmetadata.catalog.util.ResultList; import org.openmetadata.catalog.util.ResultList;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.security.GeneralSecurityException; import java.security.GeneralSecurityException;
import java.sql.Timestamp;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List; import java.util.List;
public class ChangeEventRepository { public class ChangeEventRepository {
@ -36,14 +42,27 @@ public class ChangeEventRepository {
public ChangeEventRepository(CollectionDAO dao) { this.dao = dao; } public ChangeEventRepository(CollectionDAO dao) { this.dao = dao; }
@Transaction @Transaction
public ResultList<ChangeEvent> list(String date, List<String> eventTypes, List<String> entityTypes) throws IOException, public ResultList<ChangeEvent> list(Date date, EntityList entityCreatedList,
EntityList entityUpdatedList, EntityList entityDeletedList) throws IOException,
GeneralSecurityException { GeneralSecurityException {
List<String> jsons = dao.changeEventDAO().list(eventTypes, entityTypes, date); String dateStr = RestUtil.DATE_TIME_FORMAT.format(date);
Timestamp timestamp = new Timestamp(date.getTime());
System.out.println("Listing from timestamp " + timestamp);
System.out.println("Listing from timestamp " + timestamp.getTime());
System.out.println("Listing from time " + date.getTime());
List<String> jsons = new ArrayList<>();
jsons.addAll(dao.changeEventDAO().list(EventType.ENTITY_CREATED.value(), entityCreatedList.getList(),
timestamp.getTime()));
jsons.addAll(dao.changeEventDAO().list(EventType.ENTITY_UPDATED.value(), entityUpdatedList.getList(),
timestamp.getTime()));
jsons.addAll(dao.changeEventDAO().list(EventType.ENTITY_DELETED.value(), entityDeletedList.getList(),
timestamp.getTime()));
System.out.println("Total change events " + jsons.size()); System.out.println("Total change events " + jsons.size());
List<ChangeEvent> changeEvents = new ArrayList<>(); List<ChangeEvent> changeEvents = new ArrayList<>();
for (String json : jsons) { for (String json : jsons) {
changeEvents.add(JsonUtils.readValue(json, ChangeEvent.class)); changeEvents.add(JsonUtils.readValue(json, ChangeEvent.class));
} }
changeEvents.sort(Comparator.comparing((ChangeEvent changeEvent) -> changeEvent.getDateTime().getTime()).reversed());
return new ChangeEventList(changeEvents, null, null, changeEvents.size()); // TODO return new ChangeEventList(changeEvents, null, null, changeEvents.size()); // TODO
} }
@ -52,4 +71,5 @@ public class ChangeEventRepository {
dao.changeEventDAO().insert(JsonUtils.pojoToJson(changeEvent)); dao.changeEventDAO().insert(JsonUtils.pojoToJson(changeEvent));
return changeEvent; return changeEvent;
} }
} }

View File

@ -73,7 +73,9 @@ import org.openmetadata.catalog.util.EntityUtil;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date;
import java.util.List; import java.util.List;
public interface CollectionDAO { public interface CollectionDAO {
@ -808,13 +810,13 @@ public interface CollectionDAO {
void insert(@Bind("json") String json); void insert(@Bind("json") String json);
@SqlQuery("SELECT json FROM change_event WHERE " + @SqlQuery("SELECT json FROM change_event WHERE " +
"(eventType IN (<eventTypes>) OR eventType IS NULL) AND " + "eventType = :eventType AND " +
// "(entityType IN (<entityTypes>) OR entityType IS NULL) " + // "(entityType IN (<entityTypes>) OR entityType IS NULL) " +
"(entityType IN (<entityTypes>) OR entityType IS NULL) AND " + "(entityType IN (<entityTypes>) OR entityType IS NULL) AND " +
"dateTime >= :date " + "dateTime >= :dateTime " +
"ORDER BY dateTime DESC") "ORDER BY dateTime DESC")
List<String> list(@BindList("eventTypes") List<String> eventTypes, List<String> list(@Bind("eventType") String eventType,
@BindList("entityTypes") List<String> entityTypes, @BindList("entityTypes") List<String> entityTypes,
@Bind("date") String date); @Bind("dateTime") long dateTime);
} }
} }

View File

@ -200,7 +200,6 @@ public abstract class EntityRepository<T> {
@Transaction @Transaction
public final PutResponse<T> createOrUpdate(UriInfo uriInfo, T updated) throws IOException, ParseException { public final PutResponse<T> createOrUpdate(UriInfo uriInfo, T updated) throws IOException, ParseException {
String change;
validate(updated); validate(updated);
T original = JsonUtils.readValue(dao.findJsonByFqn(getFullyQualifiedName(updated)), entityClass); T original = JsonUtils.readValue(dao.findJsonByFqn(getFullyQualifiedName(updated)), entityClass);
if (original == null) { if (original == null) {
@ -212,7 +211,7 @@ public abstract class EntityRepository<T> {
EntityUpdater entityUpdater = getUpdater(original, updated, false); EntityUpdater entityUpdater = getUpdater(original, updated, false);
entityUpdater.update(); entityUpdater.update();
entityUpdater.store(); entityUpdater.store();
change = entityUpdater.fieldsChanged() ? RestUtil.ENTITY_UPDATED : RestUtil.ENTITY_NO_CHANGE; String change = entityUpdater.fieldsChanged() ? RestUtil.ENTITY_UPDATED : RestUtil.ENTITY_NO_CHANGE;
return new PutResponse<>(Status.OK, withHref(uriInfo, updated), change); return new PutResponse<>(Status.OK, withHref(uriInfo, updated), change);
} }
@ -485,7 +484,11 @@ public abstract class EntityRepository<T> {
JsonUtils.pojoToJson(original.getEntity())); JsonUtils.pojoToJson(original.getEntity()));
// Store the new version // Store the new version
System.out.println("Storing new version");
EntityRepository.this.store(updated.getEntity(), true); EntityRepository.this.store(updated.getEntity(), true);
} else {
System.out.println("Restoring old version");
updated.setUpdateDetails(original.getUpdatedBy(), original.getUpdatedAt());
} }
} }
} }

View File

@ -23,13 +23,12 @@ import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Content; import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse; import io.swagger.v3.oas.annotations.responses.ApiResponse;
import org.openmetadata.catalog.Entity; import org.openmetadata.catalog.Entity.EntityList;
import org.openmetadata.catalog.jdbi3.ChangeEventRepository; import org.openmetadata.catalog.jdbi3.ChangeEventRepository;
import org.openmetadata.catalog.jdbi3.CollectionDAO; import org.openmetadata.catalog.jdbi3.CollectionDAO;
import org.openmetadata.catalog.resources.Collection; import org.openmetadata.catalog.resources.Collection;
import org.openmetadata.catalog.security.CatalogAuthorizer; import org.openmetadata.catalog.security.CatalogAuthorizer;
import org.openmetadata.catalog.type.ChangeEvent; import org.openmetadata.catalog.type.ChangeEvent;
import org.openmetadata.catalog.type.ChangeEvent.EventType;
import org.openmetadata.catalog.util.RestUtil; import org.openmetadata.catalog.util.RestUtil;
import org.openmetadata.catalog.util.ResultList; import org.openmetadata.catalog.util.ResultList;
@ -84,25 +83,26 @@ public class EventResource {
schema = @Schema(implementation = ChangeEvent.class))), schema = @Schema(implementation = ChangeEvent.class))),
@ApiResponse(responseCode = "404", description = "Entity for instance {id} is not found") @ApiResponse(responseCode = "404", description = "Entity for instance {id} is not found")
}) })
public ResultList<ChangeEvent> get( public ResultList<ChangeEvent> get(@Context UriInfo uriInfo,
@Context UriInfo uriInfo, // TODO document
@Parameter(description = "Entity type for which usage is requested", @Parameter(description = "Entities requested for `entityCreated` event",
required = true, schema = @Schema(type = "string", example = "table,dashboard,..."))
schema = @Schema(type = "string", example = "table, report, metrics, or dashboard")) @QueryParam("entityCreated") String entityCreated,
@QueryParam("entityTypes") List<String> entityTypes, @Parameter(description = "Entities requested for `entityUpdated` event",
@Parameter(description = "Event types", schema = @Schema(type = "string", example = "table,dashboard,..."))
required = true, @QueryParam("entityUpdated") String entityUpdated,
schema = @Schema(type = "string")) @Parameter(description = "Entities requested for `entityDeleted` event",
@QueryParam("eventTypes") List<String> eventTypes, schema = @Schema(type = "string", example = "table,dashboard,..."))
@Parameter(description = "Events since this date and time (inclusive) in ISO 8601 format.", @QueryParam("entityDeleted") String entityDeleted,
required = true, @Parameter(description = "Events starting from this date time in ISO8601 format",
schema = @Schema(type = "string")) required = true,
@QueryParam("date") String date) throws IOException, GeneralSecurityException, ParseException { schema = @Schema(type = "string", example = "2021-01-28T10:00:00.000000Z"))
// TODO hack @QueryParam("date") String date)
eventTypes = List.of(EventType.ENTITY_CREATED.toString(), EventType.ENTITY_UPDATED.toString()); throws IOException, GeneralSecurityException, ParseException {
entityTypes = List.of(Entity.TABLE);
Date parsedDate = RestUtil.DATE_TIME_FORMAT.parse(date); Date parsedDate = RestUtil.DATE_TIME_FORMAT.parse(date);
date = RestUtil.DATE_FORMAT.format(parsedDate); EntityList entityCreatedList = new EntityList(entityCreated);
return dao.list(date, eventTypes, entityTypes); EntityList entityUpdatedList = new EntityList(entityCreated);
EntityList entityDeletedList = new EntityList(entityCreated);
return dao.list(parsedDate, entityCreatedList, entityUpdatedList, entityDeletedList);
} }
} }

View File

@ -171,7 +171,6 @@ public final class RestUtil {
public T getEntity() { return entity; } public T getEntity() { return entity; }
public Response toResponse() { public Response toResponse() {
System.out.println("Status " + status + " changeType " + changeType);
ResponseBuilder responseBuilder = Response.status(status).header(CHANGE_CUSTOM_HEADER, changeType); ResponseBuilder responseBuilder = Response.status(status).header(CHANGE_CUSTOM_HEADER, changeType);
if (changeType.equals(RestUtil.ENTITY_CREATED) || changeType.equals(RestUtil.ENTITY_UPDATED) || if (changeType.equals(RestUtil.ENTITY_CREATED) || changeType.equals(RestUtil.ENTITY_UPDATED) ||
changeType.equals(RestUtil.ENTITY_NO_CHANGE)) { changeType.equals(RestUtil.ENTITY_NO_CHANGE)) {

View File

@ -4,9 +4,11 @@
"title": "ChangeEvent", "title": "ChangeEvent",
"description": "This schema defines the change event type to capture the changes to entities. Entities change due to user activity, such as updating description of a dataset, changing ownership, or adding new tags. Entity also changes due to activities at the metadata sources, such as a new dataset was created, a datasets was deleted, or schema of a dataset is modified. When state of entity changes, an event is produced. These events can be used to build apps and bots that respond to the change from activities.", "description": "This schema defines the change event type to capture the changes to entities. Entities change due to user activity, such as updating description of a dataset, changing ownership, or adding new tags. Entity also changes due to activities at the metadata sources, such as a new dataset was created, a datasets was deleted, or schema of a dataset is modified. When state of entity changes, an event is produced. These events can be used to build apps and bots that respond to the change from activities.",
"type": "object", "type": "object",
"javaType": "org.openmetadata.catalog.type.ChangeEvent",
"definitions": { "definitions": {
"eventType" :{ "eventType" :{
"javaType": "org.openmetadata.catalog.type.EventType",
"description": "Type of event", "description": "Type of event",
"type": "string", "type": "string",
"enum": [ "enum": [
@ -15,20 +17,23 @@
"entityDeleted" "entityDeleted"
] ]
}, },
"versionChange" : { "eventFilter": {
"description": "Details of version change for `entityCreated`, `entityUpdated`, and `entityDeleted` events", "type": "object",
"type" : "object", "javaType": "org.openmetadata.catalog.type.EventFilter",
"properties": { "properties": {
"oldVersion" : { "eventType": {
"$ref" : "entityHistory.json#/definitions/entityVersion" "description": "Event type that is being requested.",
"$ref": "#/definitions/eventType"
}, },
"newVersion" : { "entities": {
"$ref" : "entityHistory.json#/definitions/entityVersion" "description": "Entities for which the events are needed. Example - `table`, `topic`, etc. **When not set, events for all the entities will be provided.**",
}, "type": "array",
"entity" : { "items": {
"description": "JSON coded string for the entity using the schema corresponding to `entityType` that includes the current value of entity with `changeDescription` attribute that provides details on fields added, updated, and deleted." "type": "string"
}
} }
} },
"required": ["eventType"]
} }
}, },
"properties" : { "properties" : {

View File

@ -29,9 +29,9 @@ import org.openmetadata.catalog.resources.teams.TeamResourceTest;
import org.openmetadata.catalog.resources.teams.UserResourceTest; import org.openmetadata.catalog.resources.teams.UserResourceTest;
import org.openmetadata.catalog.type.ChangeDescription; import org.openmetadata.catalog.type.ChangeDescription;
import org.openmetadata.catalog.type.ChangeEvent; import org.openmetadata.catalog.type.ChangeEvent;
import org.openmetadata.catalog.type.ChangeEvent.EventType;
import org.openmetadata.catalog.type.EntityHistory; import org.openmetadata.catalog.type.EntityHistory;
import org.openmetadata.catalog.type.EntityReference; import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.EventType;
import org.openmetadata.catalog.type.FieldChange; import org.openmetadata.catalog.type.FieldChange;
import org.openmetadata.catalog.type.TagLabel; import org.openmetadata.catalog.type.TagLabel;
import org.openmetadata.catalog.util.EntityInterface; import org.openmetadata.catalog.util.EntityInterface;
@ -83,6 +83,7 @@ import static org.openmetadata.catalog.util.TestUtils.userAuthHeaders;
public abstract class EntityResourceTest<T> extends CatalogApplicationTest { public abstract class EntityResourceTest<T> extends CatalogApplicationTest {
private static final Logger LOG = LoggerFactory.getLogger(EntityResourceTest.class); private static final Logger LOG = LoggerFactory.getLogger(EntityResourceTest.class);
private final String entityName;
private final Class<T> entityClass; private final Class<T> entityClass;
private final Class<? extends ResultList<T>> entityListClass; private final Class<? extends ResultList<T>> entityListClass;
private final String collectionName; private final String collectionName;
@ -110,8 +111,10 @@ public abstract class EntityResourceTest<T> extends CatalogApplicationTest {
public static final TagLabel USER_BANK_ACCOUNT_TAG_LABEL = new TagLabel().withTagFQN("User.BankAccount"); public static final TagLabel USER_BANK_ACCOUNT_TAG_LABEL = new TagLabel().withTagFQN("User.BankAccount");
public static final TagLabel TIER1_TAG_LABEL = new TagLabel().withTagFQN("Tier.Tier1"); public static final TagLabel TIER1_TAG_LABEL = new TagLabel().withTagFQN("Tier.Tier1");
public EntityResourceTest(Class<T> entityClass, Class<? extends ResultList<T>> entityListClass, String collectionName, public EntityResourceTest(String entityName, Class<T> entityClass, Class<? extends ResultList<T>> entityListClass,
String collectionName,
String fields, boolean supportsFollowers, boolean supportsOwner, boolean supportsTags) { String fields, boolean supportsFollowers, boolean supportsOwner, boolean supportsTags) {
this.entityName = entityName;
this.entityClass = entityClass; this.entityClass = entityClass;
this.entityListClass = entityListClass; this.entityListClass = entityListClass;
this.collectionName = collectionName; this.collectionName = collectionName;
@ -196,7 +199,7 @@ public abstract class EntityResourceTest<T> extends CatalogApplicationTest {
throws HttpResponseException; throws HttpResponseException;
// Entity specific validate for entity create using PATCH // Entity specific validate for entity create using PATCH
public abstract void validatePatchedEntity(T expected, T updated, Map<String, String> authHeaders) public abstract void compareEntities(T expected, T updated, Map<String, String> authHeaders)
throws HttpResponseException; throws HttpResponseException;
// Get interface to access all common entity attributes // Get interface to access all common entity attributes
@ -626,7 +629,6 @@ public abstract class EntityResourceTest<T> extends CatalogApplicationTest {
EntityInterface<T> entityInterface = getEntityInterface(updated); EntityInterface<T> entityInterface = getEntityInterface(updated);
validateUpdatedEntity(updated, request, authHeaders); validateUpdatedEntity(updated, request, authHeaders);
validateChangeDescription(updated, updateType, changeDescription); validateChangeDescription(updated, updateType, changeDescription);
validateEntityHistory(entityInterface.getId(), updateType, changeDescription, authHeaders); validateEntityHistory(entityInterface.getId(), updateType, changeDescription, authHeaders);
// GET ../entity/{id}/versions/{versionId} to get specific versions of the entity // GET ../entity/{id}/versions/{versionId} to get specific versions of the entity
@ -643,15 +645,7 @@ public abstract class EntityResourceTest<T> extends CatalogApplicationTest {
T getEntity = getEntity(entityInterface.getId(), authHeaders); T getEntity = getEntity(entityInterface.getId(), authHeaders);
validateUpdatedEntity(getEntity, request, authHeaders); validateUpdatedEntity(getEntity, request, authHeaders);
validateChangeDescription(getEntity, updateType, changeDescription); validateChangeDescription(getEntity, updateType, changeDescription);
validateChangeEvents(entityInterface, updateType, authHeaders);
// Finally get changeEvents and ensure it is correct
List<String> entityNames = List.of(entityInterface.getEntityReference().getType());
List<EventType> eventTypes = List.of(EventType.ENTITY_CREATED, EventType.ENTITY_UPDATED);
ResultList<ChangeEvent> changeEvents = getChangeEvents(eventTypes, entityNames, entityInterface.getUpdatedAt(),
authHeaders);
System.out.println("data " + changeEvents.getData());
System.out.println("size " + changeEvents.getData().size());
changeEvents.getData().forEach(e -> System.out.println(e));
return updated; return updated;
} }
@ -680,12 +674,12 @@ public abstract class EntityResourceTest<T> extends CatalogApplicationTest {
// Validate information returned in patch response has the updates // Validate information returned in patch response has the updates
T returned = patchEntity(entityInterface.getId(), originalJson, updated, authHeaders); T returned = patchEntity(entityInterface.getId(), originalJson, updated, authHeaders);
validatePatchedEntity(updated, returned, authHeaders); compareEntities(updated, returned, authHeaders);
validateChangeDescription(returned, updateType, expectedChange); validateChangeDescription(returned, updateType, expectedChange);
// GET the entity and Validate information returned // GET the entity and Validate information returned
T getEntity = getEntity(entityInterface.getId(), authHeaders); T getEntity = getEntity(entityInterface.getId(), authHeaders);
validatePatchedEntity(updated, getEntity, authHeaders); compareEntities(updated, getEntity, authHeaders);
validateChangeDescription(getEntity, updateType, expectedChange); validateChangeDescription(getEntity, updateType, expectedChange);
return returned; return returned;
} }
@ -718,16 +712,45 @@ public abstract class EntityResourceTest<T> extends CatalogApplicationTest {
} }
} }
protected final void validateChangeEvents(EntityInterface<T> entityInterface, UpdateType updateType,
Map<String, String> authHeaders) throws IOException {
ResultList<ChangeEvent> changeEvents = getChangeEvents(entityName, entityName, null,
entityInterface.getUpdatedAt(), authHeaders);
assertTrue(changeEvents.getData().size() > 0);
// Top most changeEvent corresponds to the update
ChangeEvent changeEvent = changeEvents.getData().get(0);
assertEquals(changeEvent.getDateTime().getTime(), entityInterface.getUpdatedAt().getTime());
assertEquals(changeEvent.getEntityId(), entityInterface.getId());
assertEquals(changeEvent.getCurrentVersion(), entityInterface.getVersion());
assertEquals(changeEvent.getUserName(), entityInterface.getUpdatedBy());
assertEquals(changeEvent.getEntityType(), entityName);
if (updateType == UpdateType.CREATED) {
assertEquals(changeEvent.getEventType(), EventType.ENTITY_CREATED);
assertEquals(changeEvent.getPreviousVersion(), 0.1);
assertNull(changeEvent.getChangeDescription());
compareEntities(entityInterface.getEntity(),
JsonUtils.readValue((String)changeEvent.getEntity(), entityClass), authHeaders);
} else if (updateType == MINOR_UPDATE) {
}
}
protected EntityHistory getVersionList(UUID id, Map<String, String> authHeaders) throws HttpResponseException { protected EntityHistory getVersionList(UUID id, Map<String, String> authHeaders) throws HttpResponseException {
WebTarget target = getResource(collectionName + "/" + id + "/versions"); WebTarget target = getResource(collectionName + "/" + id + "/versions");
return TestUtils.get(target, EntityHistory.class, authHeaders); return TestUtils.get(target, EntityHistory.class, authHeaders);
} }
protected ResultList<ChangeEvent> getChangeEvents(List<EventType> eventTypes, List<String> entityTypes, Date date, protected ResultList<ChangeEvent> getChangeEvents(String entityCreated, String entityUpdated, String entityDeleted,
Map<String, String> authHeaders) throws HttpResponseException { Date date, Map<String, String> authHeaders)
throws HttpResponseException {
WebTarget target = getResource("events"); WebTarget target = getResource("events");
target = target.queryParam("eventTypes", eventTypes); target = entityCreated == null ? target : target.queryParam("entityCreated", entityCreated);
target = target.queryParam("entityTypes", entityTypes); target = entityUpdated == null ? target : target.queryParam("entityUpdated", entityUpdated);
target = entityDeleted == null ? target : target.queryParam("entityDeleted", entityDeleted);
target = target.queryParam("date", RestUtil.DATE_TIME_FORMAT.format(date)); target = target.queryParam("date", RestUtil.DATE_TIME_FORMAT.format(date));
return TestUtils.get(target, ChangeEventList.class, authHeaders); return TestUtils.get(target, ChangeEventList.class, authHeaders);
} }

View File

@ -65,7 +65,8 @@ public class ChartResourceTest extends EntityResourceTest<Chart> {
public static EntityReference LOOKER_REFERENCE; public static EntityReference LOOKER_REFERENCE;
public ChartResourceTest() { public ChartResourceTest() {
super(Chart.class, ChartList.class, "charts", ChartResource.FIELDS, true, true, true); super(Entity.CHART, Chart.class, ChartList.class, "charts", ChartResource.FIELDS,
true, true, true);
} }
@BeforeAll @BeforeAll
@ -306,7 +307,7 @@ public class ChartResourceTest extends EntityResourceTest<Chart> {
} }
@Override @Override
public void validatePatchedEntity(Chart expected, Chart patched, Map<String, String> authHeaders) { public void compareEntities(Chart expected, Chart patched, Map<String, String> authHeaders) {
validateCommonEntityFields(getEntityInterface(patched), expected.getDescription(), validateCommonEntityFields(getEntityInterface(patched), expected.getDescription(),
TestUtils.getPrincipal(authHeaders), expected.getOwner()); TestUtils.getPrincipal(authHeaders), expected.getOwner());
assertService(expected.getService(), patched.getService()); assertService(expected.getService(), patched.getService());

View File

@ -78,7 +78,8 @@ public class DashboardResourceTest extends EntityResourceTest<Dashboard> {
public static List<EntityReference> CHART_REFERENCES; public static List<EntityReference> CHART_REFERENCES;
public DashboardResourceTest() { public DashboardResourceTest() {
super(Dashboard.class, DashboardList.class, "dashboards", DashboardResource.FIELDS, true, true, true); super(Entity.DASHBOARD, Dashboard.class, DashboardList.class, "dashboards", DashboardResource.FIELDS, true,
true, true);
} }
@ -397,7 +398,7 @@ public class DashboardResourceTest extends EntityResourceTest<Dashboard> {
} }
@Override @Override
public void validatePatchedEntity(Dashboard expected, Dashboard updated, Map<String, String> authHeaders) { public void compareEntities(Dashboard expected, Dashboard updated, Map<String, String> authHeaders) {
} }
@Override @Override

View File

@ -54,7 +54,8 @@ import static org.openmetadata.catalog.util.TestUtils.authHeaders;
public class DatabaseResourceTest extends EntityResourceTest<Database> { public class DatabaseResourceTest extends EntityResourceTest<Database> {
public DatabaseResourceTest() { public DatabaseResourceTest() {
super(Database.class, DatabaseList.class, "databases", DatabaseResource.FIELDS, false, true, false); super(Entity.DATABASE, Database.class, DatabaseList.class, "databases",
DatabaseResource.FIELDS, false, true, false);
} }
@BeforeAll @BeforeAll
@ -317,7 +318,7 @@ public class DatabaseResourceTest extends EntityResourceTest<Database> {
} }
@Override @Override
public void validatePatchedEntity(Database expected, Database updated, Map<String, String> authHeaders) { public void compareEntities(Database expected, Database updated, Map<String, String> authHeaders) {
validateCommonEntityFields(getEntityInterface(updated), expected.getDescription(), validateCommonEntityFields(getEntityInterface(updated), expected.getDescription(),
TestUtils.getPrincipal(authHeaders), expected.getOwner()); TestUtils.getPrincipal(authHeaders), expected.getOwner());
// Validate service // Validate service

View File

@ -130,7 +130,8 @@ public class TableResourceTest extends EntityResourceTest<Table> {
public TableResourceTest() { public TableResourceTest() {
super(Table.class, TableList.class, "tables", TableResource.FIELDS, true, true, true); super(Entity.TABLE, Table.class, TableList.class, "tables", TableResource.FIELDS,
true, true, true);
} }
@BeforeAll @BeforeAll
@ -278,10 +279,12 @@ public class TableResourceTest extends EntityResourceTest<Table> {
CreateTable create1 = create(test, 1).withColumns(Arrays.asList(c1, c2)); CreateTable create1 = create(test, 1).withColumns(Arrays.asList(c1, c2));
Table table1 = createAndCheckEntity(create1, adminAuthHeaders()); Table table1 = createAndCheckEntity(create1, adminAuthHeaders());
// Test PUT operation // Test PUT operation - put operation to create
CreateTable create2 = create(test, 2).withColumns(Arrays.asList(c1, c2)).withName("put_complexColumnType"); CreateTable create2 = create(test, 2).withColumns(Arrays.asList(c1, c2)).withName("put_complexColumnType");
Table table2= updateAndCheckEntity(create2, CREATED, adminAuthHeaders(), UpdateType.CREATED, null); System.out.println("Put to create");
// Update without any change Table table2 = updateAndCheckEntity(create2, CREATED, adminAuthHeaders(), UpdateType.CREATED, null);
// Test PUT operation again without any change
System.out.println("Put with no change");
ChangeDescription change = getChangeDescription(table2.getVersion()); ChangeDescription change = getChangeDescription(table2.getVersion());
updateAndCheckEntity(create2, Status.OK, adminAuthHeaders(), NO_CHANGE, change); updateAndCheckEntity(create2, Status.OK, adminAuthHeaders(), NO_CHANGE, change);
@ -1315,7 +1318,7 @@ public class TableResourceTest extends EntityResourceTest<Table> {
} }
@Override @Override
public void validatePatchedEntity(Table expected, Table patched, Map<String, String> authHeaders) throws HttpResponseException { public void compareEntities(Table expected, Table patched, Map<String, String> authHeaders) throws HttpResponseException {
validateCommonEntityFields(getEntityInterface(patched), expected.getDescription(), validateCommonEntityFields(getEntityInterface(patched), expected.getDescription(),
TestUtils.getPrincipal(authHeaders), expected.getOwner()); TestUtils.getPrincipal(authHeaders), expected.getOwner());
@ -1344,14 +1347,13 @@ public class TableResourceTest extends EntityResourceTest<Table> {
return; return;
} }
if (fieldName.startsWith("columns") && fieldName.endsWith("constraint")) { if (fieldName.startsWith("columns") && fieldName.endsWith("constraint")) {
// Column constraint
ColumnConstraint expectedConstraint = (ColumnConstraint) expected; ColumnConstraint expectedConstraint = (ColumnConstraint) expected;
ColumnConstraint actualConstraint = ColumnConstraint.fromValue((String) actual); ColumnConstraint actualConstraint = ColumnConstraint.fromValue((String) actual);
assertEquals(expectedConstraint, actualConstraint); assertEquals(expectedConstraint, actualConstraint);
} else if (fieldName.endsWith("tableConstraints")) { } else if (fieldName.endsWith("tableConstraints")) {
List<TableConstraint> expectedConstraints = (List<TableConstraint>) expected; List<TableConstraint> expectedConstraints = (List<TableConstraint>) expected;
List<TableConstraint> actualConstraints = JsonUtils.readObjects(actual.toString(), TableConstraint.class); List<TableConstraint> actualConstraints = JsonUtils.readObjects(actual.toString(), TableConstraint.class);
assertEquals(expectedConstraints, actualConstraints); assertEquals(expectedConstraints, actualConstraints);
} else if (fieldName.contains("columns") && !fieldName.endsWith("tags") && !fieldName.endsWith("description")) { } else if (fieldName.contains("columns") && !fieldName.endsWith("tags") && !fieldName.endsWith("description")) {
List<Column> expectedRefs = (List<Column>) expected; List<Column> expectedRefs = (List<Column>) expected;
List<Column> actualRefs = JsonUtils.readObjects(actual.toString(), Column.class); List<Column> actualRefs = JsonUtils.readObjects(actual.toString(), Column.class);

View File

@ -68,7 +68,8 @@ public class PipelineResourceTest extends EntityResourceTest<Pipeline> {
public static List<Task> TASKS; public static List<Task> TASKS;
public PipelineResourceTest() { public PipelineResourceTest() {
super(Pipeline.class, PipelineList.class, "pipelines", PipelineResource.FIELDS, true, true, true); super(Entity.PIPELINE, Pipeline.class, PipelineList.class, "pipelines", PipelineResource.FIELDS,
true, true, true);
} }
@ -106,7 +107,7 @@ public class PipelineResourceTest extends EntityResourceTest<Pipeline> {
} }
@Override @Override
public void validatePatchedEntity(Pipeline expected, Pipeline updated, Map<String, String> authHeaders) throws HttpResponseException { public void compareEntities(Pipeline expected, Pipeline updated, Map<String, String> authHeaders) throws HttpResponseException {
validateCommonEntityFields(getEntityInterface(updated), expected.getDescription(), validateCommonEntityFields(getEntityInterface(updated), expected.getDescription(),
TestUtils.getPrincipal(authHeaders), expected.getOwner()); TestUtils.getPrincipal(authHeaders), expected.getOwner());
assertEquals(expected.getDisplayName(), updated.getDisplayName()); assertEquals(expected.getDisplayName(), updated.getDisplayName());

View File

@ -71,7 +71,8 @@ public class TeamResourceTest extends EntityResourceTest<Team> {
final Profile PROFILE = new Profile().withImages(new ImageList().withImage(URI.create("http://image.com"))); final Profile PROFILE = new Profile().withImages(new ImageList().withImage(URI.create("http://image.com")));
public TeamResourceTest() { public TeamResourceTest() {
super(Team.class, TeamList.class, "teams", TeamResource.FIELDS, false, false, false); super(Entity.TEAM, Team.class, TeamList.class, "teams", TeamResource.FIELDS,
false, false, false);
} }
@Test @Test
@ -451,7 +452,7 @@ public class TeamResourceTest extends EntityResourceTest<Team> {
} }
@Override @Override
public void validatePatchedEntity(Team expected, Team updated, Map<String, String> authHeaders) { public void compareEntities(Team expected, Team updated, Map<String, String> authHeaders) {
validateCommonEntityFields(getEntityInterface(updated), expected.getDescription(), validateCommonEntityFields(getEntityInterface(updated), expected.getDescription(),
TestUtils.getPrincipal(authHeaders), null); TestUtils.getPrincipal(authHeaders), null);

View File

@ -83,7 +83,8 @@ public class UserResourceTest extends EntityResourceTest<User> {
final Profile PROFILE = new Profile().withImages(new ImageList().withImage(URI.create("http://image.com"))); final Profile PROFILE = new Profile().withImages(new ImageList().withImage(URI.create("http://image.com")));
public UserResourceTest() { public UserResourceTest() {
super(User.class, UserList.class, "users", UserResource.FIELDS, false, false, false); super(Entity.USER, User.class, UserList.class, "users", UserResource.FIELDS,
false, false, false);
} }
@Test @Test
@ -321,7 +322,7 @@ public class UserResourceTest extends EntityResourceTest<User> {
assertNull(user.getDisplayName()); assertNull(user.getDisplayName());
assertNull(user.getIsBot()); assertNull(user.getIsBot());
assertNull(user.getProfile()); assertNull(user.getProfile());
assertNull(user.getDeactivated()); // assertNull(user.getDeactivated());
assertNull(user.getTimezone()); assertNull(user.getTimezone());
EntityReference team1 = new TeamEntityInterface(createTeam(TeamResourceTest.create(test, 1), EntityReference team1 = new TeamEntityInterface(createTeam(TeamResourceTest.create(test, 1),
@ -562,7 +563,7 @@ public class UserResourceTest extends EntityResourceTest<User> {
} }
@Override @Override
public void validatePatchedEntity(User expected, User updated, Map<String, String> authHeaders) { public void compareEntities(User expected, User updated, Map<String, String> authHeaders) {
validateCommonEntityFields(getEntityInterface(expected), expected.getDescription(), validateCommonEntityFields(getEntityInterface(expected), expected.getDescription(),
TestUtils.getPrincipal(authHeaders), null); TestUtils.getPrincipal(authHeaders), null);

View File

@ -54,7 +54,8 @@ import static org.openmetadata.catalog.util.TestUtils.authHeaders;
public class TopicResourceTest extends EntityResourceTest<Topic> { public class TopicResourceTest extends EntityResourceTest<Topic> {
public TopicResourceTest() { public TopicResourceTest() {
super(Topic.class, TopicList.class, "topics", TopicResource.FIELDS, true, true, true); super(Entity.TOPIC, Topic.class, TopicList.class, "topics", TopicResource.FIELDS,
true, true, true);
} }
@Test @Test
@ -304,7 +305,7 @@ public class TopicResourceTest extends EntityResourceTest<Topic> {
} }
@Override @Override
public void validatePatchedEntity(Topic expected, Topic updated, Map<String, String> authHeaders) throws HttpResponseException { public void compareEntities(Topic expected, Topic updated, Map<String, String> authHeaders) throws HttpResponseException {
validateCommonEntityFields(getEntityInterface(updated), expected.getDescription(), validateCommonEntityFields(getEntityInterface(updated), expected.getDescription(),
TestUtils.getPrincipal(authHeaders), expected.getOwner()); TestUtils.getPrincipal(authHeaders), expected.getOwner());
assertService(expected.getService(), expected.getService()); assertService(expected.getService(), expected.getService());