From fe49d144df817de35146d7dfd1455258a41998c7 Mon Sep 17 00:00:00 2001 From: sureshms Date: Tue, 9 Nov 2021 18:01:17 -0800 Subject: [PATCH] WIP - adding tests to check change events --- .../mysql/v001__create_db_connection_info.sql | 2 +- .../catalog/events/ChangeEventHandler.java | 24 ++++++-- .../catalog/jdbi3/ChangeEventRepository.java | 11 +++- .../catalog/jdbi3/CollectionDAO.java | 10 ++-- .../resources/events/EventsResource.java | 28 +++++++--- .../catalog/resources/EntityResourceTest.java | 56 ++++++++++++++----- 6 files changed, 96 insertions(+), 35 deletions(-) diff --git a/bootstrap/sql/mysql/v001__create_db_connection_info.sql b/bootstrap/sql/mysql/v001__create_db_connection_info.sql index 28b9e4c6ece..27a724382da 100644 --- a/bootstrap/sql/mysql/v001__create_db_connection_info.sql +++ b/bootstrap/sql/mysql/v001__create_db_connection_info.sql @@ -393,7 +393,7 @@ CREATE TABLE IF NOT EXISTS tag_usage ( CREATE TABLE IF NOT EXISTS change_event ( eventType VARCHAR(36) GENERATED ALWAYS AS (json ->> '$.eventType') NOT NULL, entityType VARCHAR(36) GENERATED ALWAYS AS (json ->> '$.entityType') NOT NULL, - username VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.userName') NOT NULL, + userName VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.userName') NOT NULL, dateTime TIMESTAMP GENERATED ALWAYS AS (TIMESTAMP(STR_TO_DATE(json ->> '$.dateTime', '%Y-%m-%dT%T.%fZ'))) NOT NULL, json JSON NOT NULL, timestamp BIGINT, diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/ChangeEventHandler.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/ChangeEventHandler.java index ac0797369aa..d5bb6701314 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/ChangeEventHandler.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/ChangeEventHandler.java @@ -59,33 +59,45 @@ public class ChangeEventHandler implements EventHandler { if (responseCode == Status.CREATED.getStatusCode()) { EntityInterface entityInterface = Entity.getEntityInterface(entity); EntityReference entityReference = Entity.getEntityReference(entity); - - changeEvent.withEventType(EventType.ENTITY_CREATED).withEntityId(entityInterface.getId()) - .withEntityType(entityReference.getType()).withDateTime(entityInterface.getUpdatedAt()) + changeEvent = new ChangeEvent() + .withEventType(EventType.ENTITY_CREATED) + .withEntityId(entityInterface.getId()) + .withEntityType(entityReference.getType()) + .withUserName(entityInterface.getUpdatedBy()) + .withDateTime(entityInterface.getUpdatedAt()) .withEntity(JsonUtils.pojoToJson(entity)) .withPreviousVersion(entityInterface.getVersion()) .withCurrentVersion(entityInterface.getVersion()); + } else if (changeType.equals(RestUtil.ENTITY_UPDATED)) { EntityInterface entityInterface = Entity.getEntityInterface(entity); EntityReference entityReference = Entity.getEntityReference(entity); - changeEvent.withEventType(EventType.ENTITY_UPDATED).withEntityId(entityInterface.getId()) + + System.out.println(entityInterface.getId()); + System.out.println(entity); + System.out.println(entityReference.getType()); + changeEvent = new ChangeEvent() + .withEventType(EventType.ENTITY_UPDATED) + .withEntityId(entityInterface.getId()) .withEntityType(entityReference.getType()) + .withUserName(entityInterface.getUpdatedBy()) .withDateTime(entityInterface.getUpdatedAt()) .withChangeDescription(entityInterface.getChangeDescription()) .withPreviousVersion(entityInterface.getVersion()) .withCurrentVersion(entityInterface.getVersion()); + } else if (changeType.equals(RestUtil.ENTITY_FIELDS_CHANGED)){ changeEvent = (ChangeEvent) entity; } else if (changeType.equals(RestUtil.ENTITY_DELETED)) { changeEvent = (ChangeEvent) entity; } - System.out.println(changeEvent); if (changeEvent != null) { + System.out.println("Adding change " + changeEvent); dao.changeEventDAO().insert(JsonUtils.pojoToJson(changeEvent)); } } catch(Exception e) { - LOG.error("Failed to capture change event for {} and method {} due to {}", method, e.getMessage()); + LOG.error("Failed to capture change event for method {} due to {}", method, e); } } return null; diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/ChangeEventRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/ChangeEventRepository.java index 151ea81082b..7d0703f0d0d 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/ChangeEventRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/ChangeEventRepository.java @@ -17,6 +17,7 @@ package org.openmetadata.catalog.jdbi3; import org.jdbi.v3.sqlobject.transaction.Transaction; +import org.openmetadata.catalog.resources.events.EventsResource.ChangeEventList; import org.openmetadata.catalog.type.ChangeEvent; import org.openmetadata.catalog.util.JsonUtils; import org.openmetadata.catalog.util.ResultList; @@ -24,7 +25,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.security.GeneralSecurityException; import java.util.ArrayList; +import java.util.Date; import java.util.List; public class ChangeEventRepository { @@ -34,13 +37,15 @@ public class ChangeEventRepository { public ChangeEventRepository(CollectionDAO dao) { this.dao = dao; } @Transaction - public ResultList list(String date, List eventTypes, List entityTypes) throws IOException { - List jsons = dao.changeEventDAO().list(eventTypes, entityTypes,date); + public ResultList list(String date, List eventTypes, List entityTypes) throws IOException, + GeneralSecurityException { + List jsons = dao.changeEventDAO().list(eventTypes, entityTypes, date); + System.out.println("Total change events " + jsons.size()); List changeEvents = new ArrayList<>(); for (String json : jsons) { changeEvents.add(JsonUtils.readValue(json, ChangeEvent.class)); } - return null; // TODO + return new ChangeEventList(changeEvents, null, null, changeEvents.size()); // TODO } @Transaction diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/CollectionDAO.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/CollectionDAO.java index 3d9b503e2c5..392b24d3e09 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/CollectionDAO.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/CollectionDAO.java @@ -808,11 +808,13 @@ public interface CollectionDAO { void insert(@Bind("json") String json); @SqlQuery("SELECT json FROM change_event WHERE " + - "(eventType IN () OR eventTypes IS NULL) AND " + - "(entityType IN () OR entityTypes IS NULL) AND " + + "(eventType IN () OR eventType IS NULL) AND " + +// "(entityType IN () OR entityType IS NULL) " + + "(entityType IN () OR entityType IS NULL) AND " + "dateTime >= :date " + - "ORDER BY dateTime") - List list(@BindList("eventTypes") List eventTypes, @Bind("entityTypes") List entityTypes, + "ORDER BY dateTime DESC") + List list(@BindList("eventTypes") List eventTypes, + @BindList("entityTypes") List entityTypes, @Bind("date") String date); } } diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/events/EventsResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/events/EventsResource.java index ed51e8ff47b..f2f0186ce95 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/events/EventsResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/events/EventsResource.java @@ -26,29 +26,26 @@ import io.swagger.v3.oas.annotations.responses.ApiResponse; import org.openmetadata.catalog.Entity; import org.openmetadata.catalog.jdbi3.ChangeEventRepository; import org.openmetadata.catalog.jdbi3.CollectionDAO; -import org.openmetadata.catalog.jdbi3.UsageRepository; import org.openmetadata.catalog.resources.Collection; -import org.openmetadata.catalog.resources.teams.UserResource; import org.openmetadata.catalog.security.CatalogAuthorizer; import org.openmetadata.catalog.type.ChangeEvent; import org.openmetadata.catalog.type.ChangeEvent.EventType; -import org.openmetadata.catalog.type.EntityUsage; import org.openmetadata.catalog.util.RestUtil; import org.openmetadata.catalog.util.ResultList; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import javax.validation.Valid; import javax.ws.rs.Consumes; import javax.ws.rs.GET; import javax.ws.rs.Path; -import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.UriInfo; import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.security.GeneralSecurityException; +import java.text.ParseException; import java.util.Date; import java.util.List; import java.util.Objects; @@ -61,6 +58,16 @@ import java.util.Objects; public class EventsResource { private final ChangeEventRepository dao; + public static class ChangeEventList extends ResultList { + @SuppressWarnings("unused") /* Required for tests */ + public ChangeEventList() {} + + public ChangeEventList(List data, String beforeCursor, String afterCursor, int total) + throws GeneralSecurityException, UnsupportedEncodingException { + super(data, beforeCursor, afterCursor, total); + } + } + @Inject public EventsResource(CollectionDAO dao, CatalogAuthorizer authorizer) { Objects.requireNonNull(dao, "ChangeEventRepository must not be null"); @@ -86,11 +93,16 @@ public class EventsResource { @Parameter(description = "Event types", required = true, schema = @Schema(type = "string")) - @PathParam("eventTypes") List eventTypes, + @QueryParam("eventTypes") List eventTypes, @Parameter(description = "Events since this date and time (inclusive) in ISO 8601 format.", required = true, schema = @Schema(type = "string")) - @QueryParam("date") String date) throws IOException { + @QueryParam("date") String date) throws IOException, GeneralSecurityException, ParseException { + // TODO hack + eventTypes = List.of(EventType.ENTITY_CREATED.toString(), EventType.ENTITY_UPDATED.toString()); + entityTypes = List.of(Entity.TABLE); + Date parsedDate = RestUtil.DATE_TIME_FORMAT.parse(date); + date = RestUtil.DATE_FORMAT.format(parsedDate); return dao.list(date, eventTypes, entityTypes); } } \ No newline at end of file diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/EntityResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/EntityResourceTest.java index 3637fc2c5a4..24aaa1821a3 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/EntityResourceTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/EntityResourceTest.java @@ -22,11 +22,14 @@ import org.openmetadata.catalog.exception.CatalogExceptionMessage; import org.openmetadata.catalog.jdbi3.DatabaseServiceRepository.DatabaseServiceEntityInterface; import org.openmetadata.catalog.jdbi3.MessagingServiceRepository.MessagingServiceEntityInterface; import org.openmetadata.catalog.jdbi3.PipelineServiceRepository.PipelineServiceEntityInterface; +import org.openmetadata.catalog.resources.events.EventsResource.ChangeEventList; import org.openmetadata.catalog.resources.services.DatabaseServiceResourceTest; import org.openmetadata.catalog.resources.services.MessagingServiceResourceTest; import org.openmetadata.catalog.resources.teams.TeamResourceTest; import org.openmetadata.catalog.resources.teams.UserResourceTest; import org.openmetadata.catalog.type.ChangeDescription; +import org.openmetadata.catalog.type.ChangeEvent; +import org.openmetadata.catalog.type.ChangeEvent.EventType; import org.openmetadata.catalog.type.EntityHistory; import org.openmetadata.catalog.type.EntityReference; import org.openmetadata.catalog.type.FieldChange; @@ -34,6 +37,7 @@ import org.openmetadata.catalog.type.TagLabel; import org.openmetadata.catalog.util.EntityInterface; import org.openmetadata.catalog.util.EntityUtil; import org.openmetadata.catalog.util.JsonUtils; +import org.openmetadata.catalog.util.RestUtil; import org.openmetadata.catalog.util.ResultList; import org.openmetadata.catalog.util.TestUtils; import org.openmetadata.catalog.util.TestUtils.UpdateType; @@ -50,6 +54,7 @@ import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.Date; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -623,18 +628,7 @@ public abstract class EntityResourceTest extends CatalogApplicationTest { validateUpdatedEntity(updated, request, authHeaders); validateChangeDescription(updated, updateType, changeDescription); - // GET ../entity/{id}/versions to list the operation - // Get a list of entity versions API and ensure it is correct - EntityHistory history = getVersionList(entityInterface.getId(), authHeaders); - T latestVersion = JsonUtils.readValue((String) history.getVersions().get(0), entityClass); - validateChangeDescription(latestVersion, updateType, changeDescription); - if (updateType == UpdateType.CREATED) { - assertEquals(1, history.getVersions().size()); // PUT used for creating entity, there is only one version - } else if (updateType != NO_CHANGE){ - // Entity changed by PUT. Check the previous version exists - T previousVersion = JsonUtils.readValue((String) history.getVersions().get(1), entityClass); - assertEquals(changeDescription.getPreviousVersion(), getEntityInterface(previousVersion).getVersion()); - } + validateEntityHistory(entityInterface.getId(), updateType, changeDescription, authHeaders); // GET ../entity/{id}/versions/{versionId} to get specific versions of the entity // Get the latest version of the entity from the versions API and ensure it is correct @@ -646,13 +640,40 @@ public abstract class EntityResourceTest extends CatalogApplicationTest { assertEquals(changeDescription.getPreviousVersion(), getEntityInterface(previousVersion).getVersion()); } - // GET the newly updated database and validate + // GET the newly updated entity and validate T getEntity = getEntity(entityInterface.getId(), authHeaders); validateUpdatedEntity(getEntity, request, authHeaders); validateChangeDescription(getEntity, updateType, changeDescription); + + // Finally get changeEvents and ensure it is correct + List entityNames = List.of(entityInterface.getEntityReference().getType()); + List eventTypes = List.of(EventType.ENTITY_CREATED, EventType.ENTITY_UPDATED); + ResultList changeEvents = getChangeEvents(eventTypes, entityNames, entityInterface.getUpdatedAt(), + authHeaders); + System.out.println("data " + changeEvents.getData()); + System.out.println("size " + changeEvents.getData().size()); + changeEvents.getData().forEach(e -> System.out.println(e)); return updated; } + private void validateEntityHistory(UUID id, UpdateType updateType, ChangeDescription expectedChangeDescription, + Map authHeaders) throws IOException { + // GET ../entity/{id}/versions to list the all the versions of an entity + EntityHistory history = getVersionList(id, authHeaders); + T latestVersion = JsonUtils.readValue((String) history.getVersions().get(0), entityClass); + + // Make sure the latest version has changeDescription as received during update + validateChangeDescription(latestVersion, updateType, expectedChangeDescription); + if (updateType == UpdateType.CREATED) { + // PUT used for creating entity, there is only one version + assertEquals(1, history.getVersions().size()); + } else if (updateType != NO_CHANGE) { + // Entity changed by PUT. Check the previous version exists + T previousVersion = JsonUtils.readValue((String) history.getVersions().get(1), entityClass); + assertEquals(expectedChangeDescription.getPreviousVersion(), getEntityInterface(previousVersion).getVersion()); + } + } + protected final T patchEntityAndCheck(T updated, String originalJson, Map authHeaders, UpdateType updateType, ChangeDescription expectedChange) throws IOException { @@ -703,6 +724,15 @@ public abstract class EntityResourceTest extends CatalogApplicationTest { return TestUtils.get(target, EntityHistory.class, authHeaders); } + protected ResultList getChangeEvents(List eventTypes, List entityTypes, Date date, + Map authHeaders) throws HttpResponseException { + WebTarget target = getResource("events"); + target = target.queryParam("eventTypes", eventTypes); + target = target.queryParam("entityTypes", entityTypes); + target = target.queryParam("date", RestUtil.DATE_TIME_FORMAT.format(date)); + return TestUtils.get(target, ChangeEventList.class, authHeaders); + } + protected T getVersion(UUID id, Double version, Map authHeaders) throws HttpResponseException { WebTarget target = getResource(collectionName + "/" + id + "/versions/" + version.toString()); return TestUtils.get(target, entityClass, authHeaders);