mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-11-03 03:59:12 +00:00
WIP - adding tests to check change events
This commit is contained in:
parent
922db8d785
commit
fe49d144df
@ -393,7 +393,7 @@ CREATE TABLE IF NOT EXISTS tag_usage (
|
||||
CREATE TABLE IF NOT EXISTS change_event (
|
||||
eventType VARCHAR(36) GENERATED ALWAYS AS (json ->> '$.eventType') NOT NULL,
|
||||
entityType VARCHAR(36) GENERATED ALWAYS AS (json ->> '$.entityType') NOT NULL,
|
||||
username VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.userName') NOT NULL,
|
||||
userName VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.userName') NOT NULL,
|
||||
dateTime TIMESTAMP GENERATED ALWAYS AS (TIMESTAMP(STR_TO_DATE(json ->> '$.dateTime', '%Y-%m-%dT%T.%fZ'))) NOT NULL,
|
||||
json JSON NOT NULL,
|
||||
timestamp BIGINT,
|
||||
|
||||
@ -59,33 +59,45 @@ public class ChangeEventHandler implements EventHandler {
|
||||
if (responseCode == Status.CREATED.getStatusCode()) {
|
||||
EntityInterface entityInterface = Entity.getEntityInterface(entity);
|
||||
EntityReference entityReference = Entity.getEntityReference(entity);
|
||||
|
||||
changeEvent.withEventType(EventType.ENTITY_CREATED).withEntityId(entityInterface.getId())
|
||||
.withEntityType(entityReference.getType()).withDateTime(entityInterface.getUpdatedAt())
|
||||
changeEvent = new ChangeEvent()
|
||||
.withEventType(EventType.ENTITY_CREATED)
|
||||
.withEntityId(entityInterface.getId())
|
||||
.withEntityType(entityReference.getType())
|
||||
.withUserName(entityInterface.getUpdatedBy())
|
||||
.withDateTime(entityInterface.getUpdatedAt())
|
||||
.withEntity(JsonUtils.pojoToJson(entity))
|
||||
.withPreviousVersion(entityInterface.getVersion())
|
||||
.withCurrentVersion(entityInterface.getVersion());
|
||||
|
||||
} else if (changeType.equals(RestUtil.ENTITY_UPDATED)) {
|
||||
EntityInterface entityInterface = Entity.getEntityInterface(entity);
|
||||
EntityReference entityReference = Entity.getEntityReference(entity);
|
||||
changeEvent.withEventType(EventType.ENTITY_UPDATED).withEntityId(entityInterface.getId())
|
||||
|
||||
System.out.println(entityInterface.getId());
|
||||
System.out.println(entity);
|
||||
System.out.println(entityReference.getType());
|
||||
changeEvent = new ChangeEvent()
|
||||
.withEventType(EventType.ENTITY_UPDATED)
|
||||
.withEntityId(entityInterface.getId())
|
||||
.withEntityType(entityReference.getType())
|
||||
.withUserName(entityInterface.getUpdatedBy())
|
||||
.withDateTime(entityInterface.getUpdatedAt())
|
||||
.withChangeDescription(entityInterface.getChangeDescription())
|
||||
.withPreviousVersion(entityInterface.getVersion())
|
||||
.withCurrentVersion(entityInterface.getVersion());
|
||||
|
||||
} else if (changeType.equals(RestUtil.ENTITY_FIELDS_CHANGED)){
|
||||
changeEvent = (ChangeEvent) entity;
|
||||
} else if (changeType.equals(RestUtil.ENTITY_DELETED)) {
|
||||
changeEvent = (ChangeEvent) entity;
|
||||
}
|
||||
System.out.println(changeEvent);
|
||||
|
||||
if (changeEvent != null) {
|
||||
System.out.println("Adding change " + changeEvent);
|
||||
dao.changeEventDAO().insert(JsonUtils.pojoToJson(changeEvent));
|
||||
}
|
||||
} catch(Exception e) {
|
||||
LOG.error("Failed to capture change event for {} and method {} due to {}", method, e.getMessage());
|
||||
LOG.error("Failed to capture change event for method {} due to {}", method, e);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
package org.openmetadata.catalog.jdbi3;
|
||||
|
||||
import org.jdbi.v3.sqlobject.transaction.Transaction;
|
||||
import org.openmetadata.catalog.resources.events.EventsResource.ChangeEventList;
|
||||
import org.openmetadata.catalog.type.ChangeEvent;
|
||||
import org.openmetadata.catalog.util.JsonUtils;
|
||||
import org.openmetadata.catalog.util.ResultList;
|
||||
@ -24,7 +25,9 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.GeneralSecurityException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
public class ChangeEventRepository {
|
||||
@ -34,13 +37,15 @@ public class ChangeEventRepository {
|
||||
public ChangeEventRepository(CollectionDAO dao) { this.dao = dao; }
|
||||
|
||||
@Transaction
|
||||
public ResultList<ChangeEvent> list(String date, List<String> eventTypes, List<String> entityTypes) throws IOException {
|
||||
List<String> jsons = dao.changeEventDAO().list(eventTypes, entityTypes,date);
|
||||
public ResultList<ChangeEvent> list(String date, List<String> eventTypes, List<String> entityTypes) throws IOException,
|
||||
GeneralSecurityException {
|
||||
List<String> jsons = dao.changeEventDAO().list(eventTypes, entityTypes, date);
|
||||
System.out.println("Total change events " + jsons.size());
|
||||
List<ChangeEvent> changeEvents = new ArrayList<>();
|
||||
for (String json : jsons) {
|
||||
changeEvents.add(JsonUtils.readValue(json, ChangeEvent.class));
|
||||
}
|
||||
return null; // TODO
|
||||
return new ChangeEventList(changeEvents, null, null, changeEvents.size()); // TODO
|
||||
}
|
||||
|
||||
@Transaction
|
||||
|
||||
@ -808,11 +808,13 @@ public interface CollectionDAO {
|
||||
void insert(@Bind("json") String json);
|
||||
|
||||
@SqlQuery("SELECT json FROM change_event WHERE " +
|
||||
"(eventType IN (<eventTypes>) OR eventTypes IS NULL) AND " +
|
||||
"(entityType IN (<entityTypes>) OR entityTypes IS NULL) AND " +
|
||||
"(eventType IN (<eventTypes>) OR eventType IS NULL) AND " +
|
||||
// "(entityType IN (<entityTypes>) OR entityType IS NULL) " +
|
||||
"(entityType IN (<entityTypes>) OR entityType IS NULL) AND " +
|
||||
"dateTime >= :date " +
|
||||
"ORDER BY dateTime")
|
||||
List<String> list(@BindList("eventTypes") List<String> eventTypes, @Bind("entityTypes") List<String> entityTypes,
|
||||
"ORDER BY dateTime DESC")
|
||||
List<String> list(@BindList("eventTypes") List<String> eventTypes,
|
||||
@BindList("entityTypes") List<String> entityTypes,
|
||||
@Bind("date") String date);
|
||||
}
|
||||
}
|
||||
|
||||
@ -26,29 +26,26 @@ import io.swagger.v3.oas.annotations.responses.ApiResponse;
|
||||
import org.openmetadata.catalog.Entity;
|
||||
import org.openmetadata.catalog.jdbi3.ChangeEventRepository;
|
||||
import org.openmetadata.catalog.jdbi3.CollectionDAO;
|
||||
import org.openmetadata.catalog.jdbi3.UsageRepository;
|
||||
import org.openmetadata.catalog.resources.Collection;
|
||||
import org.openmetadata.catalog.resources.teams.UserResource;
|
||||
import org.openmetadata.catalog.security.CatalogAuthorizer;
|
||||
import org.openmetadata.catalog.type.ChangeEvent;
|
||||
import org.openmetadata.catalog.type.ChangeEvent.EventType;
|
||||
import org.openmetadata.catalog.type.EntityUsage;
|
||||
import org.openmetadata.catalog.util.RestUtil;
|
||||
import org.openmetadata.catalog.util.ResultList;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.validation.Valid;
|
||||
import javax.ws.rs.Consumes;
|
||||
import javax.ws.rs.GET;
|
||||
import javax.ws.rs.Path;
|
||||
import javax.ws.rs.PathParam;
|
||||
import javax.ws.rs.Produces;
|
||||
import javax.ws.rs.QueryParam;
|
||||
import javax.ws.rs.core.Context;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.UriInfo;
|
||||
import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.security.GeneralSecurityException;
|
||||
import java.text.ParseException;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
@ -61,6 +58,16 @@ import java.util.Objects;
|
||||
public class EventsResource {
|
||||
private final ChangeEventRepository dao;
|
||||
|
||||
public static class ChangeEventList extends ResultList<ChangeEvent> {
|
||||
@SuppressWarnings("unused") /* Required for tests */
|
||||
public ChangeEventList() {}
|
||||
|
||||
public ChangeEventList(List<ChangeEvent> data, String beforeCursor, String afterCursor, int total)
|
||||
throws GeneralSecurityException, UnsupportedEncodingException {
|
||||
super(data, beforeCursor, afterCursor, total);
|
||||
}
|
||||
}
|
||||
|
||||
@Inject
|
||||
public EventsResource(CollectionDAO dao, CatalogAuthorizer authorizer) {
|
||||
Objects.requireNonNull(dao, "ChangeEventRepository must not be null");
|
||||
@ -86,11 +93,16 @@ public class EventsResource {
|
||||
@Parameter(description = "Event types",
|
||||
required = true,
|
||||
schema = @Schema(type = "string"))
|
||||
@PathParam("eventTypes") List<String> eventTypes,
|
||||
@QueryParam("eventTypes") List<String> eventTypes,
|
||||
@Parameter(description = "Events since this date and time (inclusive) in ISO 8601 format.",
|
||||
required = true,
|
||||
schema = @Schema(type = "string"))
|
||||
@QueryParam("date") String date) throws IOException {
|
||||
@QueryParam("date") String date) throws IOException, GeneralSecurityException, ParseException {
|
||||
// TODO hack
|
||||
eventTypes = List.of(EventType.ENTITY_CREATED.toString(), EventType.ENTITY_UPDATED.toString());
|
||||
entityTypes = List.of(Entity.TABLE);
|
||||
Date parsedDate = RestUtil.DATE_TIME_FORMAT.parse(date);
|
||||
date = RestUtil.DATE_FORMAT.format(parsedDate);
|
||||
return dao.list(date, eventTypes, entityTypes);
|
||||
}
|
||||
}
|
||||
@ -22,11 +22,14 @@ import org.openmetadata.catalog.exception.CatalogExceptionMessage;
|
||||
import org.openmetadata.catalog.jdbi3.DatabaseServiceRepository.DatabaseServiceEntityInterface;
|
||||
import org.openmetadata.catalog.jdbi3.MessagingServiceRepository.MessagingServiceEntityInterface;
|
||||
import org.openmetadata.catalog.jdbi3.PipelineServiceRepository.PipelineServiceEntityInterface;
|
||||
import org.openmetadata.catalog.resources.events.EventsResource.ChangeEventList;
|
||||
import org.openmetadata.catalog.resources.services.DatabaseServiceResourceTest;
|
||||
import org.openmetadata.catalog.resources.services.MessagingServiceResourceTest;
|
||||
import org.openmetadata.catalog.resources.teams.TeamResourceTest;
|
||||
import org.openmetadata.catalog.resources.teams.UserResourceTest;
|
||||
import org.openmetadata.catalog.type.ChangeDescription;
|
||||
import org.openmetadata.catalog.type.ChangeEvent;
|
||||
import org.openmetadata.catalog.type.ChangeEvent.EventType;
|
||||
import org.openmetadata.catalog.type.EntityHistory;
|
||||
import org.openmetadata.catalog.type.EntityReference;
|
||||
import org.openmetadata.catalog.type.FieldChange;
|
||||
@ -34,6 +37,7 @@ import org.openmetadata.catalog.type.TagLabel;
|
||||
import org.openmetadata.catalog.util.EntityInterface;
|
||||
import org.openmetadata.catalog.util.EntityUtil;
|
||||
import org.openmetadata.catalog.util.JsonUtils;
|
||||
import org.openmetadata.catalog.util.RestUtil;
|
||||
import org.openmetadata.catalog.util.ResultList;
|
||||
import org.openmetadata.catalog.util.TestUtils;
|
||||
import org.openmetadata.catalog.util.TestUtils.UpdateType;
|
||||
@ -50,6 +54,7 @@ import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
@ -623,18 +628,7 @@ public abstract class EntityResourceTest<T> extends CatalogApplicationTest {
|
||||
validateUpdatedEntity(updated, request, authHeaders);
|
||||
validateChangeDescription(updated, updateType, changeDescription);
|
||||
|
||||
// GET ../entity/{id}/versions to list the operation
|
||||
// Get a list of entity versions API and ensure it is correct
|
||||
EntityHistory history = getVersionList(entityInterface.getId(), authHeaders);
|
||||
T latestVersion = JsonUtils.readValue((String) history.getVersions().get(0), entityClass);
|
||||
validateChangeDescription(latestVersion, updateType, changeDescription);
|
||||
if (updateType == UpdateType.CREATED) {
|
||||
assertEquals(1, history.getVersions().size()); // PUT used for creating entity, there is only one version
|
||||
} else if (updateType != NO_CHANGE){
|
||||
// Entity changed by PUT. Check the previous version exists
|
||||
T previousVersion = JsonUtils.readValue((String) history.getVersions().get(1), entityClass);
|
||||
assertEquals(changeDescription.getPreviousVersion(), getEntityInterface(previousVersion).getVersion());
|
||||
}
|
||||
validateEntityHistory(entityInterface.getId(), updateType, changeDescription, authHeaders);
|
||||
|
||||
// GET ../entity/{id}/versions/{versionId} to get specific versions of the entity
|
||||
// Get the latest version of the entity from the versions API and ensure it is correct
|
||||
@ -646,13 +640,40 @@ public abstract class EntityResourceTest<T> extends CatalogApplicationTest {
|
||||
assertEquals(changeDescription.getPreviousVersion(), getEntityInterface(previousVersion).getVersion());
|
||||
}
|
||||
|
||||
// GET the newly updated database and validate
|
||||
// GET the newly updated entity and validate
|
||||
T getEntity = getEntity(entityInterface.getId(), authHeaders);
|
||||
validateUpdatedEntity(getEntity, request, authHeaders);
|
||||
validateChangeDescription(getEntity, updateType, changeDescription);
|
||||
|
||||
// Finally get changeEvents and ensure it is correct
|
||||
List<String> entityNames = List.of(entityInterface.getEntityReference().getType());
|
||||
List<EventType> eventTypes = List.of(EventType.ENTITY_CREATED, EventType.ENTITY_UPDATED);
|
||||
ResultList<ChangeEvent> changeEvents = getChangeEvents(eventTypes, entityNames, entityInterface.getUpdatedAt(),
|
||||
authHeaders);
|
||||
System.out.println("data " + changeEvents.getData());
|
||||
System.out.println("size " + changeEvents.getData().size());
|
||||
changeEvents.getData().forEach(e -> System.out.println(e));
|
||||
return updated;
|
||||
}
|
||||
|
||||
private void validateEntityHistory(UUID id, UpdateType updateType, ChangeDescription expectedChangeDescription,
|
||||
Map<String, String> authHeaders) throws IOException {
|
||||
// GET ../entity/{id}/versions to list the all the versions of an entity
|
||||
EntityHistory history = getVersionList(id, authHeaders);
|
||||
T latestVersion = JsonUtils.readValue((String) history.getVersions().get(0), entityClass);
|
||||
|
||||
// Make sure the latest version has changeDescription as received during update
|
||||
validateChangeDescription(latestVersion, updateType, expectedChangeDescription);
|
||||
if (updateType == UpdateType.CREATED) {
|
||||
// PUT used for creating entity, there is only one version
|
||||
assertEquals(1, history.getVersions().size());
|
||||
} else if (updateType != NO_CHANGE) {
|
||||
// Entity changed by PUT. Check the previous version exists
|
||||
T previousVersion = JsonUtils.readValue((String) history.getVersions().get(1), entityClass);
|
||||
assertEquals(expectedChangeDescription.getPreviousVersion(), getEntityInterface(previousVersion).getVersion());
|
||||
}
|
||||
}
|
||||
|
||||
protected final T patchEntityAndCheck(T updated, String originalJson, Map<String, String> authHeaders,
|
||||
UpdateType updateType, ChangeDescription expectedChange)
|
||||
throws IOException {
|
||||
@ -703,6 +724,15 @@ public abstract class EntityResourceTest<T> extends CatalogApplicationTest {
|
||||
return TestUtils.get(target, EntityHistory.class, authHeaders);
|
||||
}
|
||||
|
||||
protected ResultList<ChangeEvent> getChangeEvents(List<EventType> eventTypes, List<String> entityTypes, Date date,
|
||||
Map<String, String> authHeaders) throws HttpResponseException {
|
||||
WebTarget target = getResource("events");
|
||||
target = target.queryParam("eventTypes", eventTypes);
|
||||
target = target.queryParam("entityTypes", entityTypes);
|
||||
target = target.queryParam("date", RestUtil.DATE_TIME_FORMAT.format(date));
|
||||
return TestUtils.get(target, ChangeEventList.class, authHeaders);
|
||||
}
|
||||
|
||||
protected T getVersion(UUID id, Double version, Map<String, String> authHeaders) throws HttpResponseException {
|
||||
WebTarget target = getResource(collectionName + "/" + id + "/versions/" + version.toString());
|
||||
return TestUtils.get(target, entityClass, authHeaders);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user