Issue2194 - Change dateTime field to unix epoch timestamp milliseconds and add support for webhook secret key (#2195)

* Add support for webhook secret key

* Fixes #2194 - Change dateTime field to unix epoch timestamp milliseconds and add support for webhook secret key

* Fixes #2194 - Added support for upgrading existing metadata to move dateTime attributes to timestamp
This commit is contained in:
Suresh Srinivas 2022-01-13 12:08:42 -08:00 committed by GitHub
parent e0423ac16a
commit 5a8e2ee31f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
94 changed files with 464 additions and 311 deletions

1
.idea/compiler.xml generated
View File

@ -12,5 +12,6 @@
<module name="openmetadata-ui" />
</profile>
</annotationProcessing>
<bytecodeTargetLevel target="11" />
</component>
</project>

2
.idea/misc.xml generated
View File

@ -8,5 +8,5 @@
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_16" project-jdk-name="17" project-jdk-type="JavaSDK" />
<component name="ProjectRootManager" version="2" languageLevel="JDK_16" project-jdk-name="11" project-jdk-type="JavaSDK" />
</project>

View File

@ -8,97 +8,259 @@ CREATE TABLE IF NOT EXISTS webhook_entity (
-- No versioning, updatedAt, updatedBy, or changeDescription fields for webhook
);
--
-- Change timestamp column precision to include microseconds
--
UPDATE change_event
SET json = JSON_SET(json, '$.timestamp', UNIX_TIMESTAMP(STR_TO_DATE(json ->> '$.dateTime', '%Y-%m-%dT%T.%fZ')));
UPDATE change_event
SET json = JSON_REMOVE(json, '$.dateTime');
ALTER TABLE change_event
DROP INDEX dateTime,
DROP COLUMN dateTime,
ADD COLUMN dateTime TIMESTAMP(6) GENERATED ALWAYS AS (STR_TO_DATE(json ->> '$.dateTime', '%Y-%m-%dT%T.%fZ')) NOT NULL
AFTER username,
ADD INDEX (dateTime);
ADD COLUMN eventTime BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.timestamp') NOT NULL AFTER username,
ADD INDEX (eventTime);
--
-- Update to add deleted fields to data entities
-- Update to add deleted fields to data entities and change updatedAt field to unix epoch time milliseconds
--
UPDATE dbservice_entity
SET json = JSON_SET(json, '$.updatedAt', UNIX_TIMESTAMP(STR_TO_DATE(json ->> '$.updatedAt', '%Y-%m-%dT%T.%fZ')));
ALTER TABLE dbservice_entity
DROP COLUMN updatedAt,
DROP INDEX updatedAt,
ADD COLUMN updatedAt BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.updatedAt') NOT NULL AFTER json,
ADD INDEX(updatedAt),
ADD COLUMN deleted BOOLEAN GENERATED ALWAYS AS (JSON_EXTRACT(json, '$.deleted')),
ADD INDEX (deleted);
UPDATE messaging_service_entity
SET json = JSON_SET(json, '$.updatedAt', UNIX_TIMESTAMP(STR_TO_DATE(json ->> '$.updatedAt', '%Y-%m-%dT%T.%fZ')));
ALTER TABLE messaging_service_entity
DROP COLUMN updatedAt,
DROP INDEX updatedAt,
ADD COLUMN updatedAt BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.updatedAt') NOT NULL AFTER json,
ADD INDEX(updatedAt),
ADD COLUMN deleted BOOLEAN GENERATED ALWAYS AS (JSON_EXTRACT(json, '$.deleted')),
ADD INDEX (deleted);
UPDATE dashboard_service_entity
SET json = JSON_SET(json, '$.updatedAt', UNIX_TIMESTAMP(STR_TO_DATE(json ->> '$.updatedAt', '%Y-%m-%dT%T.%fZ')));
ALTER TABLE dashboard_service_entity
DROP COLUMN updatedAt,
DROP INDEX updatedAt,
ADD COLUMN updatedAt BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.updatedAt') NOT NULL AFTER json,
ADD INDEX(updatedAt),
ADD COLUMN deleted BOOLEAN GENERATED ALWAYS AS (JSON_EXTRACT(json, '$.deleted')),
ADD INDEX (deleted);
UPDATE pipeline_service_entity
SET json = JSON_SET(json, '$.updatedAt', UNIX_TIMESTAMP(STR_TO_DATE(json ->> '$.updatedAt', '%Y-%m-%dT%T.%fZ')));
ALTER TABLE pipeline_service_entity
DROP COLUMN updatedAt,
DROP INDEX updatedAt,
ADD COLUMN updatedAt BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.updatedAt') NOT NULL AFTER json,
ADD INDEX(updatedAt),
ADD COLUMN deleted BOOLEAN GENERATED ALWAYS AS (JSON_EXTRACT(json, '$.deleted')),
ADD INDEX (deleted);
UPDATE storage_service_entity
SET json = JSON_SET(json, '$.updatedAt', UNIX_TIMESTAMP(STR_TO_DATE(json ->> '$.updatedAt', '%Y-%m-%dT%T.%fZ')));
ALTER TABLE storage_service_entity
DROP COLUMN updatedAt,
DROP INDEX updatedAt,
ADD COLUMN updatedAt BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.updatedAt') NOT NULL AFTER json,
ADD INDEX(updatedAt),
ADD COLUMN deleted BOOLEAN GENERATED ALWAYS AS (JSON_EXTRACT(json, '$.deleted')),
ADD INDEX (deleted);
UPDATE database_entity
SET json = JSON_SET(json, '$.updatedAt', UNIX_TIMESTAMP(STR_TO_DATE(json ->> '$.updatedAt', '%Y-%m-%dT%T.%fZ')));
ALTER TABLE database_entity
DROP COLUMN updatedAt,
DROP INDEX updatedAt,
ADD COLUMN updatedAt BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.updatedAt') NOT NULL AFTER json,
ADD INDEX(updatedAt),
ADD COLUMN deleted BOOLEAN GENERATED ALWAYS AS (JSON_EXTRACT(json, '$.deleted')),
ADD INDEX (deleted);
UPDATE table_entity
SET json = JSON_SET(json, '$.updatedAt', UNIX_TIMESTAMP(STR_TO_DATE(json ->> '$.updatedAt', '%Y-%m-%dT%T.%fZ')));
ALTER TABLE table_entity
DROP COLUMN updatedAt,
DROP INDEX updatedAt,
ADD COLUMN updatedAt BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.updatedAt') NOT NULL AFTER json,
ADD INDEX(updatedAt),
ADD COLUMN deleted BOOLEAN GENERATED ALWAYS AS (JSON_EXTRACT(json, '$.deleted')),
ADD INDEX (deleted);
UPDATE table_entity
SET json = JSON_SET(json, '$.updatedAt', UNIX_TIMESTAMP(STR_TO_DATE(json ->> '$.updatedAt', '%Y-%m-%dT%T.%fZ')));
ALTER TABLE metric_entity
DROP COLUMN updatedAt,
DROP INDEX updatedAt,
ADD COLUMN updatedAt BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.updatedAt') NOT NULL AFTER json,
ADD INDEX(updatedAt),
ADD COLUMN deleted BOOLEAN GENERATED ALWAYS AS (JSON_EXTRACT(json, '$.deleted')),
ADD INDEX (deleted);
UPDATE report_entity
SET json = JSON_SET(json, '$.updatedAt', UNIX_TIMESTAMP(STR_TO_DATE(json ->> '$.updatedAt', '%Y-%m-%dT%T.%fZ')));
ALTER TABLE report_entity
DROP COLUMN updatedAt,
DROP INDEX updatedAt,
ADD COLUMN updatedAt BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.updatedAt') NOT NULL AFTER json,
ADD INDEX(updatedAt),
ADD COLUMN deleted BOOLEAN GENERATED ALWAYS AS (JSON_EXTRACT(json, '$.deleted')),
ADD INDEX (deleted);
UPDATE dashboard_entity
SET json = JSON_SET(json, '$.updatedAt', UNIX_TIMESTAMP(STR_TO_DATE(json ->> '$.updatedAt', '%Y-%m-%dT%T.%fZ')));
ALTER TABLE dashboard_entity
DROP COLUMN updatedAt,
DROP INDEX updatedAt,
ADD COLUMN updatedAt BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.updatedAt') NOT NULL AFTER json,
ADD INDEX(updatedAt),
ADD COLUMN deleted BOOLEAN GENERATED ALWAYS AS (JSON_EXTRACT(json, '$.deleted')),
ADD INDEX (deleted);
UPDATE ml_model_entity
SET json = JSON_SET(json, '$.updatedAt', UNIX_TIMESTAMP(STR_TO_DATE(json ->> '$.updatedAt', '%Y-%m-%dT%T.%fZ')));
ALTER TABLE ml_model_entity
DROP COLUMN updatedAt,
DROP INDEX updatedAt,
ADD COLUMN updatedAt BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.updatedAt') NOT NULL AFTER json,
ADD INDEX(updatedAt),
ADD COLUMN deleted BOOLEAN GENERATED ALWAYS AS (JSON_EXTRACT(json, '$.deleted')),
ADD INDEX (deleted);
UPDATE pipeline_entity
SET json = JSON_SET(json, '$.updatedAt', UNIX_TIMESTAMP(STR_TO_DATE(json ->> '$.updatedAt', '%Y-%m-%dT%T.%fZ')));
ALTER TABLE pipeline_entity
DROP COLUMN updatedAt,
DROP INDEX updatedAt,
ADD COLUMN updatedAt BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.updatedAt') NOT NULL AFTER json,
ADD INDEX(updatedAt),
ADD COLUMN deleted BOOLEAN GENERATED ALWAYS AS (JSON_EXTRACT(json, '$.deleted')),
ADD INDEX (deleted);
UPDATE topic_entity
SET json = JSON_SET(json, '$.updatedAt', UNIX_TIMESTAMP(STR_TO_DATE(json ->> '$.updatedAt', '%Y-%m-%dT%T.%fZ')));
ALTER TABLE topic_entity
DROP COLUMN updatedAt,
DROP INDEX updatedAt,
ADD COLUMN updatedAt BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.updatedAt') NOT NULL AFTER json,
ADD INDEX(updatedAt),
ADD COLUMN deleted BOOLEAN GENERATED ALWAYS AS (JSON_EXTRACT(json, '$.deleted')),
ADD INDEX (deleted);
UPDATE chart_entity
SET json = JSON_SET(json, '$.updatedAt', UNIX_TIMESTAMP(STR_TO_DATE(json ->> '$.updatedAt', '%Y-%m-%dT%T.%fZ')));
ALTER TABLE chart_entity
DROP COLUMN updatedAt,
DROP INDEX updatedAt,
ADD COLUMN updatedAt BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.updatedAt') NOT NULL AFTER json,
ADD INDEX(updatedAt),
ADD COLUMN deleted BOOLEAN GENERATED ALWAYS AS (JSON_EXTRACT(json, '$.deleted')),
ADD INDEX (deleted);
UPDATE location_entity
SET json = JSON_SET(json, '$.updatedAt', UNIX_TIMESTAMP(STR_TO_DATE(json ->> '$.updatedAt', '%Y-%m-%dT%T.%fZ')));
ALTER TABLE location_entity
DROP COLUMN updatedAt,
DROP INDEX updatedAt,
ADD COLUMN updatedAt BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.updatedAt') NOT NULL AFTER json,
ADD INDEX(updatedAt),
ADD COLUMN deleted BOOLEAN GENERATED ALWAYS AS (JSON_EXTRACT(json, '$.deleted')),
ADD INDEX (deleted);
UPDATE bot_entity
SET json = JSON_SET(json, '$.updatedAt', UNIX_TIMESTAMP(STR_TO_DATE(json ->> '$.updatedAt', '%Y-%m-%dT%T.%fZ')));
ALTER TABLE bot_entity
DROP COLUMN updatedAt,
DROP INDEX updatedAt,
ADD COLUMN updatedAt BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.updatedAt') NOT NULL AFTER json,
ADD INDEX(updatedAt),
ADD COLUMN deleted BOOLEAN GENERATED ALWAYS AS (JSON_EXTRACT(json, '$.deleted')),
ADD INDEX (deleted);
UPDATE policy_entity
SET json = JSON_SET(json, '$.updatedAt', UNIX_TIMESTAMP(STR_TO_DATE(json ->> '$.updatedAt', '%Y-%m-%dT%T.%fZ')));
ALTER TABLE policy_entity
DROP COLUMN updatedAt,
DROP INDEX updatedAt,
ADD COLUMN updatedAt BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.updatedAt') NOT NULL AFTER json,
ADD INDEX(updatedAt),
ADD COLUMN deleted BOOLEAN GENERATED ALWAYS AS (JSON_EXTRACT(json, '$.deleted')),
ADD INDEX (deleted);
UPDATE ingestion_entity
SET json = JSON_SET(json, '$.updatedAt', UNIX_TIMESTAMP(STR_TO_DATE(json ->> '$.updatedAt', '%Y-%m-%dT%T.%fZ')));
ALTER TABLE ingestion_entity
DROP COLUMN updatedAt,
DROP INDEX updatedAt,
ADD COLUMN updatedAt BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.updatedAt') NOT NULL AFTER json,
ADD INDEX(updatedAt),
ADD COLUMN deleted BOOLEAN GENERATED ALWAYS AS (JSON_EXTRACT(json, '$.deleted')),
ADD INDEX (deleted);
UPDATE team_entity
SET json = JSON_SET(json, '$.updatedAt', UNIX_TIMESTAMP(STR_TO_DATE(json ->> '$.updatedAt', '%Y-%m-%dT%T.%fZ')));
ALTER TABLE team_entity
DROP COLUMN updatedAt,
DROP INDEX updatedAt,
ADD COLUMN updatedAt BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.updatedAt') NOT NULL AFTER json,
ADD INDEX(updatedAt),
ADD COLUMN deleted BOOLEAN GENERATED ALWAYS AS (JSON_EXTRACT(json, '$.deleted')),
ADD INDEX (deleted);
UPDATE role_entity
SET json = JSON_SET(json, '$.updatedAt', UNIX_TIMESTAMP(STR_TO_DATE(json ->> '$.updatedAt', '%Y-%m-%dT%T.%fZ')));
ALTER TABLE role_entity
DROP COLUMN updatedAt,
DROP INDEX updatedAt,
ADD COLUMN updatedAt BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.updatedAt') NOT NULL AFTER json,
ADD INDEX(updatedAt),
ADD COLUMN deleted BOOLEAN GENERATED ALWAYS AS (JSON_EXTRACT(json, '$.deleted')),
ADD INDEX (deleted);
UPDATE tag_category
SET json = JSON_SET(json, '$.updatedAt', UNIX_TIMESTAMP(STR_TO_DATE(json ->> '$.updatedAt', '%Y-%m-%dT%T.%fZ')));
ALTER TABLE tag_category
DROP COLUMN updatedAt,
DROP INDEX updatedAt,
ADD COLUMN updatedAt BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.updatedAt') NOT NULL AFTER json,
ADD INDEX(updatedAt),
ADD COLUMN deleted BOOLEAN GENERATED ALWAYS AS (JSON_EXTRACT(json, '$.deleted')),
ADD INDEX (deleted);
UPDATE tag
SET json = JSON_SET(json, '$.updatedAt', UNIX_TIMESTAMP(STR_TO_DATE(json ->> '$.updatedAt', '%Y-%m-%dT%T.%fZ')));
ALTER TABLE tag
DROP COLUMN updatedAt,
DROP INDEX updatedAt,
ADD COLUMN updatedAt BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.updatedAt') NOT NULL AFTER json,
ADD INDEX(updatedAt),
ADD COLUMN deleted BOOLEAN GENERATED ALWAYS AS (JSON_EXTRACT(json, '$.deleted')),
ADD INDEX (deleted);
@ -116,9 +278,14 @@ SET relation = 10 WHERE fromEntity = 'dashboard' AND toEntity = 'chart' AND rela
-- Remove user.deactivated field and use deleted instead
UPDATE user_entity
SET json = JSON_REMOVE(user_entity.json, '$.deactivated');
SET json = JSON_REMOVE(user_entity.json, '$.deactivated'),
json = JSON_SET(json, '$.updatedAt', UNIX_TIMESTAMP(STR_TO_DATE(json ->> '$.updatedAt', '%Y-%m-%dT%T.%fZ')));
ALTER TABLE user_entity
DROP COLUMN updatedAt,
DROP INDEX updatedAt,
ADD COLUMN updatedAt BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.updatedAt') NOT NULL AFTER json,
ADD INDEX(updatedAt),
DROP COLUMN deactivated,
ADD COLUMN deleted BOOLEAN GENERATED ALWAYS AS (JSON_EXTRACT(json, '$.deleted')),
ADD INDEX (deleted);

View File

@ -50,8 +50,8 @@ import org.slf4j.LoggerFactory;
public class ElasticSearchEventPublisher extends AbstractEventPublisher {
private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchEventPublisher.class);
private RestHighLevelClient client;
private ElasticSearchIndexDefinition esIndexDefinition;
private final RestHighLevelClient client;
private final ElasticSearchIndexDefinition esIndexDefinition;
public ElasticSearchEventPublisher(ElasticSearchConfiguration esConfig) {
super(esConfig.getBatchSize(), new ArrayList<>());
@ -125,7 +125,7 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher {
Map<String, Object> fieldAddParams = new HashMap<>();
ESChangeDescription esChangeDescription =
ESChangeDescription.builder()
.updatedAt(event.getDateTime().getTime())
.updatedAt(event.getTimestamp())
.updatedBy(event.getUserName())
.fieldsAdded(changeDescription.getFieldsAdded())
.fieldsUpdated(changeDescription.getFieldsUpdated())
@ -133,7 +133,7 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher {
.build();
Map<String, Object> esChangeDescriptionDoc = JsonUtils.getMap(esChangeDescription);
fieldAddParams.put("change_description", esChangeDescriptionDoc);
fieldAddParams.put("last_updated_timestamp", event.getDateTime().getTime());
fieldAddParams.put("last_updated_timestamp", event.getTimestamp());
scriptTxt.append("ctx._source.change_descriptions.add(params.change_description); ");
scriptTxt.append("ctx._source.last_updated_timestamp=params.last_updated_timestamp;");
for (FieldChange fieldChange : fieldsAdded) {

View File

@ -350,7 +350,7 @@ class TableESIndex extends ElasticSearchIndex {
}
}
ParseTags parseTags = new ParseTags(tags);
Long updatedTimestamp = table.getUpdatedAt().getTime();
Long updatedTimestamp = table.getUpdatedAt();
TableESIndexBuilder tableESIndexBuilder =
internalBuilder()
.tableId(tableId)
@ -472,7 +472,7 @@ class TopicESIndex extends ElasticSearchIndex {
topic.getTags().forEach(tag -> tags.add(tag.getTagFQN()));
}
ParseTags parseTags = new ParseTags(tags);
Long updatedTimestamp = topic.getUpdatedAt().getTime();
Long updatedTimestamp = topic.getUpdatedAt();
String description = topic.getDescription() != null ? topic.getDescription() : "";
String displayName = topic.getDisplayName() != null ? topic.getDisplayName() : "";
TopicESIndexBuilder topicESIndexBuilder =
@ -562,7 +562,7 @@ class DashboardESIndex extends ElasticSearchIndex {
List<ElasticSearchSuggest> suggest = new ArrayList<>();
suggest.add(ElasticSearchSuggest.builder().input(dashboard.getFullyQualifiedName()).weight(5).build());
suggest.add(ElasticSearchSuggest.builder().input(dashboard.getDisplayName()).weight(10).build());
Long updatedTimestamp = dashboard.getUpdatedAt().getTime();
Long updatedTimestamp = dashboard.getUpdatedAt();
if (dashboard.getTags() != null) {
dashboard.getTags().forEach(tag -> tags.add(tag.getTagFQN()));
}
@ -659,7 +659,7 @@ class PipelineESIndex extends ElasticSearchIndex {
taskNames.add(task.getDisplayName());
taskDescriptions.add(task.getDescription());
}
Long updatedTimestamp = pipeline.getUpdatedAt().getTime();
Long updatedTimestamp = pipeline.getUpdatedAt();
ParseTags parseTags = new ParseTags(tags);
String description = pipeline.getDescription() != null ? pipeline.getDescription() : "";
String displayName = pipeline.getDisplayName() != null ? pipeline.getDisplayName() : "";

View File

@ -13,7 +13,6 @@
package org.openmetadata.catalog.events;
import java.util.Date;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerResponseContext;
import org.jdbi.v3.core.Jdbi;
@ -37,14 +36,12 @@ public class AuditEventHandler implements EventHandler {
if (responseContext.getEntity() != null) {
String path = requestContext.getUriInfo().getPath();
String username = requestContext.getSecurityContext().getUserPrincipal().getName();
Date nowAsISO = new Date();
try {
EntityReference entityReference = Entity.getEntityReference(responseContext.getEntity());
AuditLog auditLog =
new AuditLog()
.withPath(path)
.withDateTime(nowAsISO)
.withTimestamp(System.currentTimeMillis())
.withEntityId(entityReference.getId())
.withEntityType(entityReference.getType())
.withMethod(AuditLog.Method.fromValue(method))

View File

@ -44,7 +44,7 @@ public class ChangeEventHandler implements EventHandler {
if (changeEvent != null) {
LOG.info(
"Recording change event {}:{}:{}:{}",
changeEvent.getDateTime().getTime(),
changeEvent.getTimestamp(),
changeEvent.getEntityId(),
changeEvent.getEventType(),
changeEvent.getEntityType());
@ -122,7 +122,7 @@ public class ChangeEventHandler implements EventHandler {
.withEntityId(entityInterface.getId())
.withEntityType(entityType)
.withUserName(entityInterface.getUpdatedBy())
.withDateTime(entityInterface.getUpdatedAt())
.withTimestamp(entityInterface.getUpdatedAt())
.withChangeDescription(entityInterface.getChangeDescription())
.withCurrentVersion(entityInterface.getVersion());
}
@ -133,7 +133,7 @@ public class ChangeEventHandler implements EventHandler {
.withEntityId(changeEvent.getEntityId())
.withEntityType(changeEvent.getEntityType())
.withUserName(changeEvent.getUserName())
.withDateTime(changeEvent.getDateTime())
.withTimestamp(changeEvent.getTimestamp())
.withChangeDescription(changeEvent.getChangeDescription())
.withCurrentVersion(changeEvent.getCurrentVersion());
}

View File

@ -15,7 +15,6 @@ package org.openmetadata.catalog.jdbi3;
import java.io.IOException;
import java.net.URI;
import java.util.Date;
import java.util.UUID;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.entity.Bots;
@ -113,7 +112,7 @@ public class BotsRepository extends EntityRepository<Bots> {
}
@Override
public Date getUpdatedAt() {
public long getUpdatedAt() {
return entity.getUpdatedAt();
}
@ -163,7 +162,7 @@ public class BotsRepository extends EntityRepository<Bots> {
}
@Override
public void setUpdateDetails(String updatedBy, Date updatedAt) {
public void setUpdateDetails(String updatedBy, long updatedAt) {
entity.setUpdatedBy(updatedBy);
entity.setUpdatedAt(updatedAt);
}

View File

@ -20,13 +20,11 @@ import static org.openmetadata.catalog.type.EventType.ENTITY_UPDATED;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.catalog.resources.events.EventResource.ChangeEventList;
import org.openmetadata.catalog.type.ChangeEvent;
import org.openmetadata.catalog.util.JsonUtils;
import org.openmetadata.catalog.util.RestUtil;
import org.openmetadata.catalog.util.ResultList;
public class ChangeEventRepository {
@ -38,13 +36,12 @@ public class ChangeEventRepository {
@Transaction
public ResultList<ChangeEvent> list(
Date date, List<String> entityCreatedList, List<String> entityUpdatedList, List<String> entityDeletedList)
long timestamp, List<String> entityCreatedList, List<String> entityUpdatedList, List<String> entityDeletedList)
throws IOException, GeneralSecurityException {
List<String> jsons = new ArrayList<>();
String dateParam = RestUtil.DATE_TIME_FORMAT.format(date);
jsons.addAll(dao.changeEventDAO().list(ENTITY_CREATED.value(), entityCreatedList, dateParam));
jsons.addAll(dao.changeEventDAO().list(ENTITY_UPDATED.value(), entityUpdatedList, dateParam));
jsons.addAll(dao.changeEventDAO().list(ENTITY_DELETED.value(), entityDeletedList, dateParam));
jsons.addAll(dao.changeEventDAO().list(ENTITY_CREATED.value(), entityCreatedList, timestamp));
jsons.addAll(dao.changeEventDAO().list(ENTITY_UPDATED.value(), entityUpdatedList, timestamp));
jsons.addAll(dao.changeEventDAO().list(ENTITY_DELETED.value(), entityDeletedList, timestamp));
List<ChangeEvent> changeEvents = new ArrayList<>();
for (String json : jsons) {

View File

@ -16,7 +16,6 @@ package org.openmetadata.catalog.jdbi3;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
import java.net.URI;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import org.openmetadata.catalog.Entity;
@ -197,7 +196,7 @@ public class ChartRepository extends EntityRepository<Chart> {
}
@Override
public Date getUpdatedAt() {
public long getUpdatedAt() {
return entity.getUpdatedAt();
}
@ -247,7 +246,7 @@ public class ChartRepository extends EntityRepository<Chart> {
}
@Override
public void setUpdateDetails(String updatedBy, Date updatedAt) {
public void setUpdateDetails(String updatedBy, long updatedAt) {
entity.setUpdatedBy(updatedBy);
entity.setUpdatedAt(updatedAt);
}

View File

@ -1203,29 +1203,29 @@ public interface CollectionDAO {
@SqlUpdate("INSERT INTO change_event (json) VALUES (:json)")
void insert(@Bind("json") String json);
default List<String> list(String eventType, List<String> entityTypes, String dateTime) {
default List<String> list(String eventType, List<String> entityTypes, long timestamp) {
if (entityTypes == null) {
return Collections.emptyList();
}
if (entityTypes.get(0).equals("*")) {
return listWithoutEntityFilter(eventType, dateTime);
return listWithoutEntityFilter(eventType, timestamp);
}
return listWithEntityFilter(eventType, entityTypes, dateTime);
return listWithEntityFilter(eventType, entityTypes, timestamp);
}
@SqlQuery(
"SELECT json FROM change_event WHERE "
+ "eventType = :eventType AND (entityType IN (<entityTypes>)) AND dateTime >= :dateTime "
+ "ORDER BY dateTime ASC")
+ "eventType = :eventType AND (entityType IN (<entityTypes>)) AND eventTime >= :timestamp "
+ "ORDER BY eventTime ASC")
List<String> listWithEntityFilter(
@Bind("eventType") String eventType,
@BindList("entityTypes") List<String> entityTypes,
@Bind("dateTime") String dateTime);
@Bind("timestamp") long timestamp);
@SqlQuery(
"SELECT json FROM change_event WHERE "
+ "eventType = :eventType AND dateTime >= :dateTime "
+ "ORDER BY dateTime ASC")
List<String> listWithoutEntityFilter(@Bind("eventType") String eventType, @Bind("dateTime") String dateTime);
+ "eventType = :eventType AND eventTime >= :timestamp "
+ "ORDER BY eventTime ASC")
List<String> listWithoutEntityFilter(@Bind("eventType") String eventType, @Bind("timestamp") long timestamp);
}
}

View File

@ -20,7 +20,6 @@ import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
@ -301,7 +300,7 @@ public class DashboardRepository extends EntityRepository<Dashboard> {
}
@Override
public Date getUpdatedAt() {
public long getUpdatedAt() {
return entity.getUpdatedAt();
}
@ -352,7 +351,7 @@ public class DashboardRepository extends EntityRepository<Dashboard> {
}
@Override
public void setUpdateDetails(String updatedBy, Date updatedAt) {
public void setUpdateDetails(String updatedBy, long updatedAt) {
entity.setUpdatedBy(updatedBy);
entity.setUpdatedAt(updatedAt);
}

View File

@ -16,7 +16,6 @@ package org.openmetadata.catalog.jdbi3;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
import java.net.URI;
import java.util.Date;
import java.util.UUID;
import javax.ws.rs.core.UriInfo;
import org.openmetadata.catalog.Entity;
@ -145,7 +144,7 @@ public class DashboardServiceRepository extends EntityRepository<DashboardServic
}
@Override
public Date getUpdatedAt() {
public long getUpdatedAt() {
return entity.getUpdatedAt();
}
@ -190,7 +189,7 @@ public class DashboardServiceRepository extends EntityRepository<DashboardServic
}
@Override
public void setUpdateDetails(String updatedBy, Date updatedAt) {
public void setUpdateDetails(String updatedBy, long updatedAt) {
entity.setUpdatedBy(updatedBy);
entity.setUpdatedAt(updatedAt);
}

View File

@ -19,7 +19,6 @@ import static org.openmetadata.catalog.util.EntityUtil.toBoolean;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import javax.ws.rs.core.Response.Status;
@ -267,7 +266,7 @@ public class DatabaseRepository extends EntityRepository<Database> {
}
@Override
public Date getUpdatedAt() {
public long getUpdatedAt() {
return entity.getUpdatedAt();
}
@ -312,7 +311,7 @@ public class DatabaseRepository extends EntityRepository<Database> {
}
@Override
public void setUpdateDetails(String updatedBy, Date updatedAt) {
public void setUpdateDetails(String updatedBy, long updatedAt) {
entity.setUpdatedBy(updatedBy);
entity.setUpdatedAt(updatedAt);
}

View File

@ -16,7 +16,6 @@ package org.openmetadata.catalog.jdbi3;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
import java.net.URI;
import java.util.Date;
import java.util.UUID;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.entity.services.DatabaseService;
@ -124,7 +123,7 @@ public class DatabaseServiceRepository extends EntityRepository<DatabaseService>
}
@Override
public Date getUpdatedAt() {
public long getUpdatedAt() {
return entity.getUpdatedAt();
}
@ -169,7 +168,7 @@ public class DatabaseServiceRepository extends EntityRepository<DatabaseService>
}
@Override
public void setUpdateDetails(String updatedBy, Date updatedAt) {
public void setUpdateDetails(String updatedBy, long updatedAt) {
entity.setUpdatedBy(updatedBy);
entity.setUpdatedAt(updatedAt);
}

View File

@ -26,7 +26,6 @@ import java.security.GeneralSecurityException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
@ -355,7 +354,7 @@ public abstract class EntityRepository<T> {
// Apply JSON patch to the original entity to get the updated entity
T updated = JsonUtils.applyPatch(original, patch, entityClass);
EntityInterface<T> updatedEntity = getEntityInterface(updated);
updatedEntity.setUpdateDetails(user, new Date());
updatedEntity.setUpdateDetails(user, System.currentTimeMillis());
prepare(updated);
restorePatchAttributes(original, updated);
@ -398,7 +397,7 @@ public abstract class EntityRepository<T> {
.withEntityId(entityId)
.withEntityFullyQualifiedName(entityInterface.getFullyQualifiedName())
.withUserName(updatedBy)
.withDateTime(new Date())
.withTimestamp(System.currentTimeMillis())
.withCurrentVersion(entityInterface.getVersion())
.withPreviousVersion(change.getPreviousVersion());
@ -463,7 +462,7 @@ public abstract class EntityRepository<T> {
.withEntityType(entityName)
.withEntityId(entityId)
.withUserName(updatedBy)
.withDateTime(new Date())
.withTimestamp(System.currentTimeMillis())
.withCurrentVersion(entityInterface.getVersion())
.withPreviousVersion(change.getPreviousVersion());

View File

@ -15,7 +15,6 @@ package org.openmetadata.catalog.jdbi3;
import java.io.IOException;
import java.net.URI;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
@ -194,7 +193,7 @@ public class IngestionRepository extends EntityRepository<Ingestion> {
}
@Override
public Date getUpdatedAt() {
public long getUpdatedAt() {
return entity.getUpdatedAt();
}
@ -244,7 +243,7 @@ public class IngestionRepository extends EntityRepository<Ingestion> {
}
@Override
public void setUpdateDetails(String updatedBy, Date updatedAt) {
public void setUpdateDetails(String updatedBy, long updatedAt) {
entity.setUpdatedBy(updatedBy);
entity.setUpdatedAt(updatedAt);
}

View File

@ -17,7 +17,6 @@ import java.io.IOException;
import java.net.URI;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
@ -305,7 +304,7 @@ public class LocationRepository extends EntityRepository<Location> {
}
@Override
public Date getUpdatedAt() {
public long getUpdatedAt() {
return entity.getUpdatedAt();
}
@ -360,7 +359,7 @@ public class LocationRepository extends EntityRepository<Location> {
}
@Override
public void setUpdateDetails(String updatedBy, Date updatedAt) {
public void setUpdateDetails(String updatedBy, long updatedAt) {
entity.setUpdatedBy(updatedBy);
entity.setUpdatedAt(updatedAt);
}

View File

@ -17,7 +17,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import org.openmetadata.catalog.Entity;
@ -124,7 +123,7 @@ public class MessagingServiceRepository extends EntityRepository<MessagingServic
}
@Override
public Date getUpdatedAt() {
public long getUpdatedAt() {
return entity.getUpdatedAt();
}
@ -169,7 +168,7 @@ public class MessagingServiceRepository extends EntityRepository<MessagingServic
}
@Override
public void setUpdateDetails(String updatedBy, Date updatedAt) {
public void setUpdateDetails(String updatedBy, long updatedAt) {
entity.setUpdatedBy(updatedBy);
entity.setUpdatedAt(updatedAt);
}

View File

@ -15,7 +15,6 @@ package org.openmetadata.catalog.jdbi3;
import java.io.IOException;
import java.net.URI;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
@ -175,7 +174,7 @@ public class MetricsRepository extends EntityRepository<Metrics> {
}
@Override
public Date getUpdatedAt() {
public long getUpdatedAt() {
return entity.getUpdatedAt();
}
@ -225,7 +224,7 @@ public class MetricsRepository extends EntityRepository<Metrics> {
}
@Override
public void setUpdateDetails(String updatedBy, Date updatedAt) {
public void setUpdateDetails(String updatedBy, long updatedAt) {
entity.setUpdatedBy(updatedBy);
entity.setUpdatedAt(updatedAt);
}

View File

@ -22,7 +22,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import org.jdbi.v3.sqlobject.transaction.Transaction;
@ -294,7 +293,7 @@ public class MlModelRepository extends EntityRepository<MlModel> {
}
@Override
public Date getUpdatedAt() {
public long getUpdatedAt() {
return entity.getUpdatedAt();
}
@ -349,7 +348,7 @@ public class MlModelRepository extends EntityRepository<MlModel> {
}
@Override
public void setUpdateDetails(String updatedBy, Date updatedAt) {
public void setUpdateDetails(String updatedBy, long updatedAt) {
entity.setUpdatedBy(updatedBy);
entity.setUpdatedAt(updatedAt);
}

View File

@ -18,7 +18,6 @@ import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
@ -225,7 +224,7 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
}
@Override
public Date getUpdatedAt() {
public long getUpdatedAt() {
return entity.getUpdatedAt();
}
@ -275,7 +274,7 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
}
@Override
public void setUpdateDetails(String updatedBy, Date updatedAt) {
public void setUpdateDetails(String updatedBy, long updatedAt) {
entity.setUpdatedBy(updatedBy);
entity.setUpdatedAt(updatedAt);
}

View File

@ -15,7 +15,6 @@ package org.openmetadata.catalog.jdbi3;
import java.io.IOException;
import java.net.URI;
import java.util.Date;
import java.util.UUID;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.entity.services.PipelineService;
@ -124,7 +123,7 @@ public class PipelineServiceRepository extends EntityRepository<PipelineService>
}
@Override
public Date getUpdatedAt() {
public long getUpdatedAt() {
return entity.getUpdatedAt();
}
@ -169,7 +168,7 @@ public class PipelineServiceRepository extends EntityRepository<PipelineService>
}
@Override
public void setUpdateDetails(String updatedBy, Date updatedAt) {
public void setUpdateDetails(String updatedBy, long updatedAt) {
entity.setUpdatedBy(updatedBy);
entity.setUpdatedAt(updatedAt);
}

View File

@ -18,7 +18,6 @@ import static org.openmetadata.catalog.util.EntityUtil.toBoolean;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
@ -287,7 +286,7 @@ public class PolicyRepository extends EntityRepository<Policy> {
}
@Override
public Date getUpdatedAt() {
public long getUpdatedAt() {
return entity.getUpdatedAt();
}
@ -327,7 +326,7 @@ public class PolicyRepository extends EntityRepository<Policy> {
}
@Override
public void setUpdateDetails(String updatedBy, Date updatedAt) {
public void setUpdateDetails(String updatedBy, long updatedAt) {
entity.setUpdatedBy(updatedBy);
entity.setUpdatedAt(updatedAt);
}

View File

@ -15,7 +15,6 @@ package org.openmetadata.catalog.jdbi3;
import java.io.IOException;
import java.net.URI;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import org.openmetadata.catalog.Entity;
@ -159,7 +158,7 @@ public class ReportRepository extends EntityRepository<Report> {
}
@Override
public Date getUpdatedAt() {
public long getUpdatedAt() {
return entity.getUpdatedAt();
}
@ -209,7 +208,7 @@ public class ReportRepository extends EntityRepository<Report> {
}
@Override
public void setUpdateDetails(String updatedBy, Date updatedAt) {
public void setUpdateDetails(String updatedBy, long updatedAt) {
entity.setUpdatedBy(updatedBy);
entity.setUpdatedAt(updatedAt);
}

View File

@ -15,7 +15,6 @@ package org.openmetadata.catalog.jdbi3;
import java.io.IOException;
import java.net.URI;
import java.util.Date;
import java.util.UUID;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.entity.teams.Role;
@ -126,7 +125,7 @@ public class RoleRepository extends EntityRepository<Role> {
}
@Override
public Date getUpdatedAt() {
public long getUpdatedAt() {
return entity.getUpdatedAt();
}
@ -167,7 +166,7 @@ public class RoleRepository extends EntityRepository<Role> {
}
@Override
public void setUpdateDetails(String updatedBy, Date updatedAt) {
public void setUpdateDetails(String updatedBy, long updatedAt) {
entity.setUpdatedBy(updatedBy);
entity.setUpdatedAt(updatedAt);
}

View File

@ -17,7 +17,6 @@ import static org.openmetadata.catalog.util.EntityUtil.Fields;
import java.io.IOException;
import java.net.URI;
import java.util.Date;
import java.util.UUID;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.entity.services.StorageService;
@ -114,7 +113,7 @@ public class StorageServiceRepository extends EntityRepository<StorageService> {
}
@Override
public Date getUpdatedAt() {
public long getUpdatedAt() {
return entity.getUpdatedAt();
}
@ -159,7 +158,7 @@ public class StorageServiceRepository extends EntityRepository<StorageService> {
}
@Override
public void setUpdateDetails(String updatedBy, Date updatedAt) {
public void setUpdateDetails(String updatedBy, long updatedAt) {
entity.setUpdatedBy(updatedBy);
entity.setUpdatedAt(updatedAt);
}

View File

@ -757,7 +757,7 @@ public class TableRepository extends EntityRepository<Table> {
}
@Override
public Date getUpdatedAt() {
public long getUpdatedAt() {
return entity.getUpdatedAt();
}
@ -812,7 +812,7 @@ public class TableRepository extends EntityRepository<Table> {
}
@Override
public void setUpdateDetails(String updatedBy, Date updatedAt) {
public void setUpdateDetails(String updatedBy, long updatedAt) {
entity.setUpdatedBy(updatedBy);
entity.setUpdatedAt(updatedAt);
}

View File

@ -22,7 +22,6 @@ import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
@ -121,7 +120,6 @@ public class TeamRepository extends EntityRepository<Team> {
daoCollection
.relationshipDAO()
.insert(team.getId().toString(), user.getId().toString(), "team", "user", Relationship.HAS.ordinal());
System.out.println("Team " + team.getName() + " has user " + user.getName());
}
}
@ -191,7 +189,7 @@ public class TeamRepository extends EntityRepository<Team> {
}
@Override
public Date getUpdatedAt() {
public long getUpdatedAt() {
return entity.getUpdatedAt();
}
@ -232,7 +230,7 @@ public class TeamRepository extends EntityRepository<Team> {
}
@Override
public void setUpdateDetails(String updatedBy, Date updatedAt) {
public void setUpdateDetails(String updatedBy, long updatedAt) {
entity.setUpdatedBy(updatedBy);
entity.setUpdatedAt(updatedAt);
}

View File

@ -17,7 +17,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import org.jdbi.v3.sqlobject.transaction.Transaction;
@ -203,7 +202,7 @@ public class TopicRepository extends EntityRepository<Topic> {
}
@Override
public Date getUpdatedAt() {
public long getUpdatedAt() {
return entity.getUpdatedAt();
}
@ -253,7 +252,7 @@ public class TopicRepository extends EntityRepository<Topic> {
}
@Override
public void setUpdateDetails(String updatedBy, Date updatedAt) {
public void setUpdateDetails(String updatedBy, long updatedAt) {
entity.setUpdatedBy(updatedBy);
entity.setUpdatedAt(updatedAt);
}

View File

@ -23,7 +23,6 @@ import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
@ -249,7 +248,7 @@ public class UserRepository extends EntityRepository<User> {
}
@Override
public Date getUpdatedAt() {
public long getUpdatedAt() {
return entity.getUpdatedAt();
}
@ -290,7 +289,7 @@ public class UserRepository extends EntityRepository<User> {
}
@Override
public void setUpdateDetails(String updatedBy, Date updatedAt) {
public void setUpdateDetails(String updatedBy, long updatedAt) {
entity.setUpdatedBy(updatedBy);
entity.setUpdatedAt(updatedAt);
}

View File

@ -23,7 +23,6 @@ import java.net.URI;
import java.net.UnknownHostException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@ -34,7 +33,6 @@ import javax.ws.rs.ProcessingException;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Invocation.Builder;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.catalog.Entity;
@ -52,6 +50,9 @@ import org.openmetadata.catalog.type.Webhook;
import org.openmetadata.catalog.type.Webhook.Status;
import org.openmetadata.catalog.util.EntityInterface;
import org.openmetadata.catalog.util.EntityUtil.Fields;
import org.openmetadata.catalog.util.JsonUtils;
import org.openmetadata.catalog.util.RestUtil;
import org.openmetadata.common.utils.CommonUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -209,7 +210,7 @@ public class WebhookRepository extends EntityRepository<Webhook> {
}
@Override
public Date getUpdatedAt() {
public long getUpdatedAt() {
return entity.getUpdatedAt();
}
@ -254,7 +255,7 @@ public class WebhookRepository extends EntityRepository<Webhook> {
}
@Override
public void setUpdateDetails(String updatedBy, Date updatedAt) {
public void setUpdateDetails(String updatedBy, long updatedAt) {
entity.setUpdatedBy(updatedBy);
entity.setUpdatedAt(updatedAt);
}
@ -307,7 +308,6 @@ public class WebhookRepository extends EntityRepository<Webhook> {
private final List<ChangeEvent> batch = new ArrayList<>();
private BatchEventProcessor<ChangeEventHolder> processor;
private Client client;
private Builder target;
private final ConcurrentHashMap<EventType, List<String>> filter = new ConcurrentHashMap<>();
public WebhookPublisher(Webhook webhook) {
@ -319,10 +319,6 @@ public class WebhookRepository extends EntityRepository<Webhook> {
public void onStart() {
createClient();
webhook.withFailureDetails(new FailureDetails());
// TODO clean this up
Map<String, String> authHeaders = SecurityUtil.authHeaders("admin@open-metadata.org");
target = SecurityUtil.addHeaders(client.target(webhook.getEndpoint()), authHeaders);
LOG.info("Webhook-lifecycle-onStart {}", webhook.getName());
}
@ -344,7 +340,14 @@ public class WebhookRepository extends EntityRepository<Webhook> {
ChangeEventList list = new ChangeEventList(batch, null, null, batch.size());
long attemptTime = System.currentTimeMillis();
try {
Response response = target.post(javax.ws.rs.client.Entity.entity(list, MediaType.APPLICATION_JSON));
String json = JsonUtils.pojoToJson(list);
Response response;
if (webhook.getSecretKey() != null) {
String hmac = "sha256=" + CommonUtil.calculateHMAC(webhook.getSecretKey(), json);
response = getTarget().header(RestUtil.SIGNATURE_HEADER, hmac).post(javax.ws.rs.client.Entity.json(json));
} else {
response = getTarget().post(javax.ws.rs.client.Entity.json(json));
}
LOG.info(
"Webhook {}:{}:{} received response {}",
webhook.getName(),
@ -354,7 +357,7 @@ public class WebhookRepository extends EntityRepository<Webhook> {
// 2xx response means call back is successful
if (response.getStatus() >= 200 && response.getStatus() < 300) { // All 2xx responses
batch.clear();
webhook.getFailureDetails().setLastSuccessfulAt(changeEventHolder.get().getDateTime().getTime());
webhook.getFailureDetails().setLastSuccessfulAt(changeEventHolder.get().getTimestamp());
if (webhook.getStatus() != Status.STARTED) {
setStatus(Status.STARTED, null, null, null, null);
}
@ -413,11 +416,11 @@ public class WebhookRepository extends EntityRepository<Webhook> {
private void setAwaitingRetry(Long attemptTime, int statusCode, String reason) throws IOException {
if (!attemptTime.equals(webhook.getFailureDetails().getLastFailedAt())) {
setStatus(Status.AWAITING_RETRY, attemptTime, statusCode, reason, new Date(attemptTime + currentBackoffTime));
setStatus(Status.AWAITING_RETRY, attemptTime, statusCode, reason, attemptTime + currentBackoffTime);
}
}
private void setStatus(Status status, Long attemptTime, Integer statusCode, String reason, Date date)
private void setStatus(Status status, Long attemptTime, Integer statusCode, String reason, Long timestamp)
throws IOException {
Webhook stored = daoCollection.webhookDAO().findEntityById(webhook.getId());
webhook.setStatus(status);
@ -426,7 +429,7 @@ public class WebhookRepository extends EntityRepository<Webhook> {
.withLastFailedAt(attemptTime)
.withLastFailedStatusCode(statusCode)
.withLastFailedReason(reason)
.withNextAttempt(date);
.withNextAttempt(timestamp);
WebhookUpdater updater = new WebhookUpdater(stored, webhook, false);
updater.update();
}
@ -468,6 +471,11 @@ public class WebhookRepository extends EntityRepository<Webhook> {
currentBackoffTime = BACKOFF_24_HOUR;
}
}
private Builder getTarget() {
Map<String, String> authHeaders = SecurityUtil.authHeaders("admin@open-metadata.org");
return SecurityUtil.addHeaders(client.target(webhook.getEndpoint()), authHeaders);
}
}
public class WebhookUpdater extends EntityUpdater {

View File

@ -22,7 +22,6 @@ import io.swagger.v3.oas.annotations.responses.ApiResponse;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.text.ParseException;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import javax.validation.constraints.Max;
@ -138,7 +137,9 @@ public class BotsResource {
public Response create(@Context UriInfo uriInfo, @Context SecurityContext securityContext, Bots bot)
throws IOException {
SecurityUtil.checkAdminRole(authorizer, securityContext);
bot.withId(UUID.randomUUID()).withUpdatedBy(securityContext.getUserPrincipal().getName()).withUpdatedAt(new Date());
bot.withId(UUID.randomUUID())
.withUpdatedBy(securityContext.getUserPrincipal().getName())
.withUpdatedAt(System.currentTimeMillis());
dao.create(uriInfo, bot);
return Response.created(bot.getHref()).entity(bot).build();
}

View File

@ -28,7 +28,6 @@ import java.security.GeneralSecurityException;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
@ -425,6 +424,6 @@ public class ChartResource {
.withTags(create.getTags())
.withOwner(create.getOwner())
.withUpdatedBy(securityContext.getUserPrincipal().getName())
.withUpdatedAt(new Date());
.withUpdatedAt(System.currentTimeMillis());
}
}

View File

@ -28,7 +28,6 @@ import java.security.GeneralSecurityException;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@ -430,6 +429,6 @@ public class DashboardResource {
.withTags(create.getTags())
.withOwner(create.getOwner())
.withUpdatedBy(securityContext.getUserPrincipal().getName())
.withUpdatedAt(new Date());
.withUpdatedAt(System.currentTimeMillis());
}
}

View File

@ -28,7 +28,6 @@ import java.security.GeneralSecurityException;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
@ -417,6 +416,6 @@ public class DatabaseResource {
.withService(create.getService())
.withOwner(create.getOwner())
.withUpdatedBy(securityContext.getUserPrincipal().getName())
.withUpdatedAt(new Date());
.withUpdatedAt(System.currentTimeMillis());
}
}

View File

@ -27,7 +27,6 @@ import java.io.UnsupportedEncodingException;
import java.security.GeneralSecurityException;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import javax.json.JsonPatch;
@ -554,7 +553,7 @@ public class TableResource {
.withViewDefinition(create.getViewDefinition())
.withUpdatedBy(securityContext.getUserPrincipal().getName())
.withOwner(create.getOwner())
.withUpdatedAt(new Date())
.withUpdatedAt(System.currentTimeMillis())
.withDatabase(new EntityReference().withId(create.getDatabase()));
}
}

View File

@ -23,7 +23,6 @@ import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.security.GeneralSecurityException;
import java.text.ParseException;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import javax.validation.Valid;
@ -41,7 +40,6 @@ import org.openmetadata.catalog.jdbi3.CollectionDAO;
import org.openmetadata.catalog.resources.Collection;
import org.openmetadata.catalog.security.Authorizer;
import org.openmetadata.catalog.type.ChangeEvent;
import org.openmetadata.catalog.util.RestUtil;
import org.openmetadata.catalog.util.ResultList;
@Path("/v1/events")
@ -110,16 +108,15 @@ public class EventResource {
@QueryParam("entityDeleted")
String entityDeleted,
@Parameter(
description = "Events starting from this date time in ISO8601 format",
description = "Events starting from this unix timestamp in milliseconds",
required = true,
schema = @Schema(type = "string", example = "2021-01-28T10:00:00.000000Z"))
@QueryParam("date")
String date)
schema = @Schema(type = "long", example = "1426349294842"))
@QueryParam("timestamp")
long timestamp)
throws IOException, GeneralSecurityException, ParseException {
Date parsedDate = RestUtil.DATE_TIME_FORMAT.parse(date);
List<String> entityCreatedList = EntityList.getEntityList("entityCreated", entityCreated);
List<String> entityUpdatedList = EntityList.getEntityList("entityUpdated", entityUpdated);
List<String> entityDeletedList = EntityList.getEntityList("entityDeleted", entityDeleted);
return dao.list(parsedDate, entityCreatedList, entityUpdatedList, entityDeletedList);
return dao.list(timestamp, entityCreatedList, entityUpdatedList, entityDeletedList);
}
}

View File

@ -23,7 +23,6 @@ import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.security.GeneralSecurityException;
import java.text.ParseException;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
@ -297,6 +296,7 @@ public class WebhookResource {
.withTimeout(create.getTimeout())
.withEnabled(create.getEnabled())
.withUpdatedBy(securityContext.getUserPrincipal().getName())
.withUpdatedAt(new Date());
.withUpdatedAt(System.currentTimeMillis())
.withSecretKey(create.getSecretKey());
}
}

View File

@ -20,7 +20,6 @@ import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
@ -132,7 +131,8 @@ public class FeedResource {
@ApiResponse(responseCode = "400", description = "Bad request")
})
public Response create(@Context UriInfo uriInfo, @Valid CreateThread cr) throws IOException {
Thread thread = new Thread().withId(UUID.randomUUID()).withThreadTs(new Date()).withAbout(cr.getAbout());
Thread thread =
new Thread().withId(UUID.randomUUID()).withThreadTs(System.currentTimeMillis()).withAbout(cr.getAbout());
// For now redundantly storing everything in json (that includes fromEntity, addressedTo entity)
// TODO - This needs cleanup later if this information is too much or inconsistent in relationship table
FeedUtil.addPost(thread, new Post().withMessage(cr.getMessage()).withFrom(cr.getFrom()));

View File

@ -14,7 +14,6 @@
package org.openmetadata.catalog.resources.feeds;
import java.util.Collections;
import java.util.Date;
import org.openmetadata.catalog.entity.feed.Thread;
import org.openmetadata.catalog.type.Post;
@ -29,7 +28,7 @@ public final class FeedUtil {
thread.setPosts(Collections.singletonList(post));
} else {
// Add new post to the thread
post.setPostTs(new Date());
post.setPostTs(System.currentTimeMillis());
thread.getPosts().add(post);
}
}

View File

@ -27,7 +27,6 @@ import java.io.UnsupportedEncodingException;
import java.security.GeneralSecurityException;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
@ -483,6 +482,6 @@ public class LocationResource {
.withTags(create.getTags())
.withUpdatedBy(securityContext.getUserPrincipal().getName())
.withOwner(create.getOwner())
.withUpdatedAt(new Date());
.withUpdatedAt(System.currentTimeMillis());
}
}

View File

@ -23,7 +23,6 @@ import java.io.IOException;
import java.security.GeneralSecurityException;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
@ -182,6 +181,6 @@ public class MetricsResource {
metrics
.withId(UUID.randomUUID())
.withUpdatedBy(securityContext.getUserPrincipal().getName())
.withUpdatedAt(new Date());
.withUpdatedAt(System.currentTimeMillis());
}
}

View File

@ -27,7 +27,6 @@ import java.io.UnsupportedEncodingException;
import java.security.GeneralSecurityException;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
@ -425,6 +424,6 @@ public class MlModelResource {
.withTags(create.getTags())
.withOwner(create.getOwner())
.withUpdatedBy(securityContext.getUserPrincipal().getName())
.withUpdatedAt(new Date());
.withUpdatedAt(System.currentTimeMillis());
}
}

View File

@ -28,7 +28,6 @@ import java.security.GeneralSecurityException;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@ -444,7 +443,7 @@ public class IngestionResource {
.withOwner(create.getOwner())
.withService(create.getService())
.withUpdatedBy(securityContext.getUserPrincipal().getName())
.withUpdatedAt(new Date());
.withUpdatedAt(System.currentTimeMillis());
}
private void deploy(Ingestion ingestion) {

View File

@ -28,7 +28,6 @@ import java.security.GeneralSecurityException;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@ -432,6 +431,6 @@ public class PipelineResource {
.withPipelineLocation(create.getPipelineLocation())
.withOwner(create.getOwner())
.withUpdatedBy(securityContext.getUserPrincipal().getName())
.withUpdatedAt(new Date());
.withUpdatedAt(System.currentTimeMillis());
}
}

View File

@ -28,7 +28,6 @@ import java.security.GeneralSecurityException;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@ -379,7 +378,7 @@ public class PolicyResource {
.withPolicyUrl(create.getPolicyUrl())
.withPolicyType(create.getPolicyType())
.withUpdatedBy(securityContext.getUserPrincipal().getName())
.withUpdatedAt(new Date())
.withUpdatedAt(System.currentTimeMillis())
.withRules(create.getRules());
if (create.getLocation() != null) {
policy = policy.withLocation(new EntityReference().withId(create.getLocation()));

View File

@ -23,7 +23,6 @@ import java.io.IOException;
import java.security.GeneralSecurityException;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
@ -165,6 +164,6 @@ public class ReportResource {
report
.withId(UUID.randomUUID())
.withUpdatedBy(securityContext.getUserPrincipal().getName())
.withUpdatedAt(new Date());
.withUpdatedAt(System.currentTimeMillis());
}
}

View File

@ -23,7 +23,6 @@ import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.security.GeneralSecurityException;
import java.text.ParseException;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
@ -316,6 +315,6 @@ public class DashboardServiceResource {
.withPassword(create.getPassword())
.withIngestionSchedule(create.getIngestionSchedule())
.withUpdatedBy(securityContext.getUserPrincipal().getName())
.withUpdatedAt(new Date());
.withUpdatedAt(System.currentTimeMillis());
}
}

View File

@ -23,7 +23,6 @@ import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.security.GeneralSecurityException;
import java.text.ParseException;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
@ -310,6 +309,6 @@ public class DatabaseServiceResource {
.withJdbc(create.getJdbc())
.withIngestionSchedule(create.getIngestionSchedule())
.withUpdatedBy(securityContext.getUserPrincipal().getName())
.withUpdatedAt(new Date());
.withUpdatedAt(System.currentTimeMillis());
}
}

View File

@ -23,7 +23,6 @@ import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.security.GeneralSecurityException;
import java.text.ParseException;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
@ -320,6 +319,6 @@ public class MessagingServiceResource {
.withSchemaRegistry(create.getSchemaRegistry())
.withIngestionSchedule(create.getIngestionSchedule())
.withUpdatedBy(securityContext.getUserPrincipal().getName())
.withUpdatedAt(new Date());
.withUpdatedAt(System.currentTimeMillis());
}
}

View File

@ -23,7 +23,6 @@ import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.security.GeneralSecurityException;
import java.text.ParseException;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
@ -322,6 +321,6 @@ public class PipelineServiceResource {
.withPipelineUrl(create.getPipelineUrl())
.withIngestionSchedule(create.getIngestionSchedule())
.withUpdatedBy(securityContext.getUserPrincipal().getName())
.withUpdatedAt(new Date());
.withUpdatedAt(System.currentTimeMillis());
}
}

View File

@ -23,7 +23,6 @@ import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.security.GeneralSecurityException;
import java.text.ParseException;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
@ -309,6 +308,6 @@ public class StorageServiceResource {
.withDescription(create.getDescription())
.withServiceType(create.getServiceType())
.withUpdatedBy(securityContext.getUserPrincipal().getName())
.withUpdatedAt(new Date());
.withUpdatedAt(System.currentTimeMillis());
}
}

View File

@ -24,7 +24,6 @@ import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@ -97,7 +96,7 @@ public class TagResource {
String tagJson = IOUtil.toString(getClass().getClassLoader().getResourceAsStream(tagFile));
TagCategory tagCategory = JsonUtils.readValue(tagJson, TagCategory.class);
// TODO hack for now
Date now = new Date();
long now = System.currentTimeMillis();
tagCategory.withUpdatedBy("admin").withUpdatedAt(now);
tagCategory
.getChildren()
@ -291,7 +290,7 @@ public class TagResource {
.withCategoryType(create.getCategoryType())
.withDescription(create.getDescription())
.withUpdatedBy(securityContext.getUserPrincipal().getName())
.withUpdatedAt(new Date());
.withUpdatedAt(System.currentTimeMillis());
category = addHref(uriInfo, dao.createCategory(category));
return Response.created(category.getHref()).entity(category).build();
}
@ -323,7 +322,7 @@ public class TagResource {
.withDescription(create.getDescription())
.withAssociatedTags(create.getAssociatedTags())
.withUpdatedBy(securityContext.getUserPrincipal().getName())
.withUpdatedAt(new Date());
.withUpdatedAt(System.currentTimeMillis());
URI categoryHref = RestUtil.getHref(uriInfo, TAG_COLLECTION_PATH, category);
tag = addHref(categoryHref, dao.createPrimaryTag(category, tag));
return Response.created(tag.getHref()).entity(tag).build();
@ -364,7 +363,7 @@ public class TagResource {
.withDescription(create.getDescription())
.withAssociatedTags(create.getAssociatedTags())
.withUpdatedBy(securityContext.getUserPrincipal().getName())
.withUpdatedAt(new Date());
.withUpdatedAt(System.currentTimeMillis());
URI categoryHref = RestUtil.getHref(uriInfo, TAG_COLLECTION_PATH, category);
URI parentHRef = RestUtil.getHref(categoryHref, primaryTag);
tag = addHref(parentHRef, dao.createSecondaryTag(category, primaryTag, tag));

View File

@ -28,7 +28,6 @@ import java.io.UnsupportedEncodingException;
import java.security.GeneralSecurityException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
@ -343,6 +342,6 @@ public class RoleResource {
.withDescription(ct.getDescription())
.withDisplayName(ct.getDisplayName())
.withUpdatedBy(securityContext.getUserPrincipal().getName())
.withUpdatedAt(new Date());
.withUpdatedAt(System.currentTimeMillis());
}
}

View File

@ -28,7 +28,6 @@ import java.io.UnsupportedEncodingException;
import java.security.GeneralSecurityException;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
@ -369,7 +368,7 @@ public class TeamResource {
.withDisplayName(ct.getDisplayName())
.withProfile(ct.getProfile())
.withUpdatedBy(securityContext.getUserPrincipal().getName())
.withUpdatedAt(new Date())
.withUpdatedAt(System.currentTimeMillis())
.withUsers(dao.getUsers(ct.getUsers()));
}
}

View File

@ -29,7 +29,6 @@ import java.security.GeneralSecurityException;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@ -414,7 +413,7 @@ public class UserResource {
.withProfile(create.getProfile())
.withTimezone(create.getTimezone())
.withUpdatedBy(securityContext.getUserPrincipal().getName())
.withUpdatedAt(new Date())
.withUpdatedAt(System.currentTimeMillis())
.withTeams(dao.validateTeams(create.getTeams()))
.withRoles(dao.validateRoles(create.getRoles()));
}

View File

@ -29,7 +29,6 @@ import java.security.GeneralSecurityException;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
@ -432,6 +431,6 @@ public class TopicResource {
.withTags(create.getTags())
.withOwner(create.getOwner())
.withUpdatedBy(securityContext.getUserPrincipal().getName())
.withUpdatedAt(new Date());
.withUpdatedAt(System.currentTimeMillis());
}
}

View File

@ -17,7 +17,6 @@ import static org.openmetadata.catalog.resources.teams.UserResource.FIELD_LIST;
import java.io.IOException;
import java.text.ParseException;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@ -82,7 +81,7 @@ public class DefaultAuthorizer implements Authorizer {
.withEmail(adminUser + "@" + principalDomain)
.withIsAdmin(true)
.withUpdatedBy(adminUser)
.withUpdatedAt(new Date());
.withUpdatedAt(System.currentTimeMillis());
addOrUpdateAdmin(user);
} catch (IOException | ParseException e) {
LOG.error("Failed to create admin user {}", adminUser, e);
@ -108,7 +107,7 @@ public class DefaultAuthorizer implements Authorizer {
.withEmail(botUser + "@" + principalDomain)
.withIsBot(true)
.withUpdatedBy(botUser)
.withUpdatedAt(new Date());
.withUpdatedAt(System.currentTimeMillis());
addOrUpdateAdmin(user);
} catch (IOException | ParseException e) {
LOG.error("Failed to create admin user {}", botUser, e);

View File

@ -14,7 +14,6 @@
package org.openmetadata.catalog.util;
import java.net.URI;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import org.openmetadata.catalog.type.ChangeDescription;
@ -45,7 +44,7 @@ public interface EntityInterface<T> {
String getUpdatedBy();
Date getUpdatedAt();
long getUpdatedAt();
URI getHref();
@ -73,7 +72,7 @@ public interface EntityInterface<T> {
void setDisplayName(String displayName);
void setUpdateDetails(String updatedBy, Date updatedAt);
void setUpdateDetails(String updatedBy, long updatedAt);
void setChangeDescription(Double newVersion, ChangeDescription changeDescription);

View File

@ -13,6 +13,8 @@
package org.openmetadata.catalog.util;
import static org.openmetadata.catalog.util.RestUtil.DATE_TIME_FORMAT;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
@ -43,7 +45,7 @@ public final class JsonUtils {
OBJECT_MAPPER = new ObjectMapper();
// Ensure the date-time fields are serialized in ISO-8601 format
OBJECT_MAPPER.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
OBJECT_MAPPER.setDateFormat(RestUtil.DATE_TIME_FORMAT);
OBJECT_MAPPER.setDateFormat(DATE_TIME_FORMAT);
OBJECT_MAPPER.registerModule(new JSR353Module());
}

View File

@ -38,6 +38,7 @@ public final class RestUtil {
public static final String ENTITY_FIELDS_CHANGED = "entityFieldsChanged";
public static final String ENTITY_NO_CHANGE = "entityNoChange";
public static final String ENTITY_DELETED = "entityDeleted";
public static final String SIGNATURE_HEADER = "X-OM-Signature";
public static final DateFormat DATE_TIME_FORMAT;
public static final DateFormat DATE_FORMAT;

View File

@ -41,6 +41,10 @@
"description": "When set to `true`, the webhook event notification is enabled. Set it to `false` to disable the subscription. (Default `true`)",
"type": "boolean",
"default": true
},
"secretKey" : {
"description" : "Secret set by the webhook client used for computing HMAC SHA256 signature of webhook payload and sent in `X-OM-Signature` header in POST requests to publish the events.",
"type" : "string"
}
},
"required": [

View File

@ -29,8 +29,8 @@
"$ref": "../type/entityHistory.json#/definitions/entityVersion"
},
"updatedAt" : {
"description": "Last update time corresponding to the new version of the entity.",
"$ref": "../type/basic.json#/definitions/dateTime"
"description": "Last update time corresponding to the new version of the entity in Unix epoch time milliseconds.",
"$ref": "../type/basic.json#/definitions/timestamp"
},
"updatedBy" : {
"description": "User who made the update.",

View File

@ -85,8 +85,8 @@
"$ref": "../../type/entityHistory.json#/definitions/entityVersion"
},
"updatedAt" : {
"description": "Last update time corresponding to the new version of the entity.",
"$ref": "../../type/basic.json#/definitions/dateTime"
"description": "Last update time corresponding to the new version of the entity in Unix epoch time milliseconds.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"updatedBy" : {
"description": "User who made the update.",

View File

@ -35,8 +35,8 @@
"$ref": "../../type/entityHistory.json#/definitions/entityVersion"
},
"updatedAt" : {
"description": "Last update time corresponding to the new version of the entity.",
"$ref": "../../type/basic.json#/definitions/dateTime"
"description": "Last update time corresponding to the new version of the entity in Unix epoch time milliseconds.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"updatedBy" : {
"description": "User who made the update.",

View File

@ -40,8 +40,8 @@
"$ref": "../../type/entityHistory.json#/definitions/entityVersion"
},
"updatedAt" : {
"description": "Last update time corresponding to the new version of the entity.",
"$ref": "../../type/basic.json#/definitions/dateTime"
"description": "Last update time corresponding to the new version of the entity in Unix epoch time milliseconds.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"updatedBy" : {
"description": "User who made the update.",

View File

@ -64,8 +64,8 @@
"$ref": "../../type/entityHistory.json#/definitions/entityVersion"
},
"updatedAt" : {
"description": "Last update time corresponding to the new version of the entity.",
"$ref": "../../type/basic.json#/definitions/dateTime"
"description": "Last update time corresponding to the new version of the entity in Unix epoch time milliseconds.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"updatedBy" : {
"description": "User who made the update.",

View File

@ -35,8 +35,8 @@
"$ref": "../../type/entityHistory.json#/definitions/entityVersion"
},
"updatedAt" : {
"description": "Last update time corresponding to the new version of the entity.",
"$ref": "../../type/basic.json#/definitions/dateTime"
"description": "Last update time corresponding to the new version of the entity in Unix epoch time milliseconds.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"updatedBy" : {
"description": "User who made the update.",

View File

@ -264,8 +264,8 @@
"$ref": "../../type/entityHistory.json#/definitions/entityVersion"
},
"updatedAt" : {
"description": "Last update time corresponding to the new version of the entity.",
"$ref": "../../type/basic.json#/definitions/dateTime"
"description": "Last update time corresponding to the new version of the entity in Unix epoch time milliseconds.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"updatedBy" : {
"description": "User who made the update.",

View File

@ -89,8 +89,8 @@
"$ref": "../../type/entityHistory.json#/definitions/entityVersion"
},
"updatedAt" : {
"description": "Last update time corresponding to the new version of the entity.",
"$ref": "../../type/basic.json#/definitions/dateTime"
"description": "Last update time corresponding to the new version of the entity in Unix epoch time milliseconds.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"updatedBy" : {
"description": "User who made the update.",

View File

@ -35,8 +35,8 @@
"$ref": "../../type/entityHistory.json#/definitions/entityVersion"
},
"updatedAt" : {
"description": "Last update time corresponding to the new version of the entity.",
"$ref": "../../type/basic.json#/definitions/dateTime"
"description": "Last update time corresponding to the new version of the entity in Unix epoch time milliseconds.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"updatedBy" : {
"description": "User who made the update.",

View File

@ -460,8 +460,8 @@
"$ref": "../../type/entityHistory.json#/definitions/entityVersion"
},
"updatedAt" : {
"description": "Last update time corresponding to the new version of the entity.",
"$ref": "../../type/basic.json#/definitions/dateTime"
"description": "Last update time corresponding to the new version of the entity in Unix epoch time milliseconds.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"updatedBy" : {
"description": "User who made the update.",

View File

@ -72,8 +72,8 @@
"$ref": "../../type/entityHistory.json#/definitions/entityVersion"
},
"updatedAt" : {
"description": "Last update time corresponding to the new version of the entity.",
"$ref": "../../type/basic.json#/definitions/dateTime"
"description": "Last update time corresponding to the new version of the entity in Unix epoch time milliseconds.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"updatedBy" : {
"description": "User who made the update.",

View File

@ -47,13 +47,17 @@
"type": "boolean",
"default": true
},
"secretKey" : {
"description" : "Secret set by the webhook client used for computing HMAC SHA256 signature of webhook payload and sent in `X-OM-Signature` header in POST requests to publish the events.",
"type" : "string"
},
"version" : {
"description": "Metadata version of the entity.",
"$ref": "../../type/entityHistory.json#/definitions/entityVersion"
},
"updatedAt": {
"description": "Last update time corresponding to the new version of the entity.",
"$ref": "../../type/basic.json#/definitions/dateTime"
"description": "Last update time corresponding to the new version of the entity in Unix epoch time milliseconds.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"updatedBy": {
"description": "User who made the update.",
@ -91,8 +95,8 @@
"type": "string"
},
"nextAttempt": {
"description": "Next retry will be done at this time. Only valid is `status` is `awaitingRetry`.",
"$ref": "../../type/basic.json#/definitions/dateTime"
"description": "Next retry will be done at this time in Unix epoch time milliseconds. Only valid is `status` is `awaitingRetry`.",
"$ref": "../../type/basic.json#/definitions/timestamp"
}
},
"additionalProperties": false

View File

@ -16,9 +16,8 @@
"type": "string"
},
"postTs": {
"description": "Timestamp of the post.",
"type": "string",
"format": "date-time"
"description": "Timestamp of the post in Unix epoch time milliseconds.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"from": {
"description": "ID of User (regular user or a bot) posting the message.",
@ -38,8 +37,8 @@
"$ref": "../../type/basic.json#/definitions/href"
},
"threadTs": {
"description": "Timestamp of the when the first post created the thread.",
"$ref": "../../type/basic.json#/definitions/dateTime"
"description": "Timestamp of the when the first post created the thread in Unix epoch time milliseconds.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"about": {
"description": "Data asset about which this thread is created for with format <#E/{entities}/{entityName}/{field}/{fieldValue}.",

View File

@ -94,8 +94,8 @@
"$ref": "../../type/entityHistory.json#/definitions/entityVersion"
},
"updatedAt": {
"description": "Last update time corresponding to the new version of the Policy.",
"$ref": "../../type/basic.json#/definitions/dateTime"
"description": "Last update time corresponding to the new version of the Policy in Unix epoch time milliseconds.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"updatedBy": {
"description": "User who made the update.",

View File

@ -62,8 +62,8 @@
"$ref": "../../type/entityHistory.json#/definitions/entityVersion"
},
"updatedAt" : {
"description": "Last update time corresponding to the new version of the entity.",
"$ref": "../../type/basic.json#/definitions/dateTime"
"description": "Last update time corresponding to the new version of the entity in Unix epoch time milliseconds.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"updatedBy" : {
"description": "User who made the update.",

View File

@ -102,8 +102,8 @@
"$ref": "../../type/entityHistory.json#/definitions/entityVersion"
},
"updatedAt" : {
"description": "Last update time corresponding to the new version of the entity.",
"$ref": "../../type/basic.json#/definitions/dateTime"
"description": "Last update time corresponding to the new version of the entity in Unix epoch time milliseconds.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"updatedBy" : {
"description": "User who made the update.",

View File

@ -58,8 +58,8 @@
"$ref": "../../type/entityHistory.json#/definitions/entityVersion"
},
"updatedAt" : {
"description": "Last update time corresponding to the new version of the entity.",
"$ref": "../../type/basic.json#/definitions/dateTime"
"description": "Last update time corresponding to the new version of the entity in Unix epoch time milliseconds.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"updatedBy" : {
"description": "User who made the update.",

View File

@ -54,8 +54,8 @@
"$ref": "../../type/entityHistory.json#/definitions/entityVersion"
},
"updatedAt" : {
"description": "Last update time corresponding to the new version of the entity.",
"$ref": "../../type/basic.json#/definitions/dateTime"
"description": "Last update time corresponding to the new version of the entity in Unix epoch time milliseconds.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"updatedBy" : {
"description": "User who made the update.",

View File

@ -33,8 +33,8 @@
"$ref": "../../type/entityHistory.json#/definitions/entityVersion"
},
"updatedAt" : {
"description": "Last update time corresponding to the new version of the entity.",
"$ref": "../../type/basic.json#/definitions/dateTime"
"description": "Last update time corresponding to the new version of the entity in Unix epoch time milliseconds.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"updatedBy" : {
"description": "User who made the update.",

View File

@ -50,8 +50,8 @@
"$ref": "../../type/entityHistory.json#/definitions/entityVersion"
},
"updatedAt" : {
"description": "Last update time corresponding to the new version of the entity.",
"$ref": "../../type/basic.json#/definitions/dateTime"
"description": "Last update time corresponding to the new version of the entity in Unix epoch time milliseconds.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"updatedBy" : {
"description": "User who made the update.",
@ -109,8 +109,8 @@
"$ref": "../../type/entityHistory.json#/definitions/entityVersion"
},
"updatedAt" : {
"description": "Last update time corresponding to the new version of the entity.",
"$ref": "../../type/basic.json#/definitions/dateTime"
"description": "Last update time corresponding to the new version of the entity in Unix epoch time milliseconds.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"updatedBy" : {
"description": "User who made the update.",

View File

@ -32,8 +32,8 @@
"$ref": "../../type/entityHistory.json#/definitions/entityVersion"
},
"updatedAt" : {
"description": "Last update time corresponding to the new version of the entity.",
"$ref": "../../type/basic.json#/definitions/dateTime"
"description": "Last update time corresponding to the new version of the entity in Unix epoch time milliseconds.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"updatedBy" : {
"description": "User who made the update.",

View File

@ -34,8 +34,8 @@
"$ref": "../../type/entityHistory.json#/definitions/entityVersion"
},
"updatedAt" : {
"description": "Last update time corresponding to the new version of the entity.",
"$ref": "../../type/basic.json#/definitions/dateTime"
"description": "Last update time corresponding to the new version of the entity in Unix epoch time milliseconds.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"updatedBy" : {
"description": "User who made the update.",

View File

@ -35,8 +35,8 @@
"$ref": "../../type/entityHistory.json#/definitions/entityVersion"
},
"updatedAt" : {
"description": "Last update time corresponding to the new version of the entity.",
"$ref": "../../type/basic.json#/definitions/dateTime"
"description": "Last update time corresponding to the new version of the entity in Unix epoch time milliseconds.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"updatedBy" : {
"description": "User who made the update.",

View File

@ -264,8 +264,8 @@
"$ref": "../../type/entityHistory.json#/definitions/entityVersion"
},
"updatedAt" : {
"description": "Last update time corresponding to the new version of the entity.",
"$ref": "../../type/basic.json#/definitions/dateTime"
"description": "Last update time corresponding to the new version of the entity in Unix epoch time milliseconds.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"updatedBy" : {
"description": "User who made the update.",

View File

@ -36,9 +36,9 @@
"description": "Type of Entity that is modified by the operation.",
"type": "string"
},
"dateTime": {
"description": "Date when the API call is made.",
"$ref": "basic.json#/definitions/dateTime"
"timestamp": {
"description": "Timestamp when the API call is made in Unix epoch time milliseconds in Unix epoch time milliseconds.",
"$ref": "basic.json#/definitions/timestamp"
}
},
"required": ["method", "responseCode", "path", "userName", "entityId", "entityType"],

View File

@ -18,8 +18,9 @@
"maxLength": 127
},
"timestamp": {
"description": "Timestamp in unixTimeMillis.",
"type": "string",
"description": "Timestamp in Unix epoch time milliseconds.",
"@comment" : "Note that during code generation this is converted into long",
"type": "integer",
"format": "utc-millisec"
},
"href": {

View File

@ -65,9 +65,9 @@
"description": "Name of the user whose activity resulted in the change.",
"type": "string"
},
"dateTime": {
"description": "Date and time when the change was made.",
"$ref": "basic.json#/definitions/dateTime"
"timestamp": {
"description": "Timestamp when the change was made in Unix epoch time milliseconds.",
"$ref": "basic.json#/definitions/timestamp"
},
"changeDescription" : {
"description": "For `eventType` `entityUpdated` this field captures details about what fields were added/updated/deleted. For `eventType` `entityCreated` or `entityDeleted` this field is null.",
@ -77,6 +77,6 @@
"description": "For `eventType` `entityCreated`, this field captures JSON coded string of the entity using the schema corresponding to `entityType`."
}
},
"required": ["eventType", "entityType", "entityId", "dateTime"],
"required": ["eventType", "entityType", "entityId", "timestamp"],
"additionalProperties": false
}

View File

@ -48,7 +48,6 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -107,7 +106,6 @@ import org.openmetadata.catalog.type.TagLabel;
import org.openmetadata.catalog.util.EntityInterface;
import org.openmetadata.catalog.util.EntityUtil;
import org.openmetadata.catalog.util.JsonUtils;
import org.openmetadata.catalog.util.RestUtil;
import org.openmetadata.catalog.util.ResultList;
import org.openmetadata.catalog.util.TestUtils;
@ -918,21 +916,21 @@ public abstract class EntityResourceTest<T> extends CatalogApplicationTest {
HttpResponseException exception =
assertThrows(
HttpResponseException.class,
() -> getChangeEvents("invalidEntity", entityName, null, new Date(), adminAuthHeaders()));
() -> getChangeEvents("invalidEntity", entityName, null, System.currentTimeMillis(), adminAuthHeaders()));
assertResponse(exception, BAD_REQUEST, "Invalid entity invalidEntity in query param entityCreated");
// Invalid entityUpdated list
exception =
assertThrows(
HttpResponseException.class,
() -> getChangeEvents(null, "invalidEntity", entityName, new Date(), adminAuthHeaders()));
() -> getChangeEvents(null, "invalidEntity", entityName, System.currentTimeMillis(), adminAuthHeaders()));
assertResponse(exception, BAD_REQUEST, "Invalid entity invalidEntity in query param entityUpdated");
// Invalid entityDeleted list
exception =
assertThrows(
HttpResponseException.class,
() -> getChangeEvents(entityName, null, "invalidEntity", new Date(), adminAuthHeaders()));
() -> getChangeEvents(entityName, null, "invalidEntity", System.currentTimeMillis(), adminAuthHeaders()));
assertResponse(exception, BAD_REQUEST, "Invalid entity invalidEntity in query param entityDeleted");
}
@ -1180,18 +1178,18 @@ public abstract class EntityResourceTest<T> extends CatalogApplicationTest {
*/
protected final void validateChangeEvents(
EntityInterface<T> entityInterface,
Date updateTime,
long timestamp,
EventType expectedEventType,
ChangeDescription expectedChangeDescription,
Map<String, String> authHeaders)
throws IOException {
validateChangeEvents(entityInterface, updateTime, expectedEventType, expectedChangeDescription, authHeaders, true);
validateChangeEvents(entityInterface, updateTime, expectedEventType, expectedChangeDescription, authHeaders, false);
validateChangeEvents(entityInterface, timestamp, expectedEventType, expectedChangeDescription, authHeaders, true);
validateChangeEvents(entityInterface, timestamp, expectedEventType, expectedChangeDescription, authHeaders, false);
}
private void validateChangeEvents(
EntityInterface<T> entityInterface,
Date updateTime,
long timestamp,
EventType expectedEventType,
ChangeDescription expectedChangeDescription,
Map<String, String> authHeaders,
@ -1207,30 +1205,26 @@ public abstract class EntityResourceTest<T> extends CatalogApplicationTest {
// Try multiple times before giving up
if (withEventFilter) {
// Get change event with an event filter for specific entity entityName
changeEvents = getChangeEvents(entityName, entityName, null, updateTime, authHeaders);
changeEvents = getChangeEvents(entityName, entityName, null, timestamp, authHeaders);
} else {
// Get change event with no event filter for entity types
changeEvents = getChangeEvents("*", "*", null, updateTime, authHeaders);
changeEvents = getChangeEvents("*", "*", null, timestamp, authHeaders);
}
// Wait for change event to be recorded
if (changeEvents.getData().size() == 0) {
continue;
}
for (ChangeEvent event : changeEvents.getData()) {
if (event.getDateTime().getTime() == updateTime.getTime()) {
changeEvent = event;
break;
}
}
if (changeEvent == null) {
if (changeEvent == null || changeEvents.getData().size() == 0) {
try {
Thread.sleep(iteration * 10L); // Sleep with backoff
} catch (InterruptedException e) {
e.printStackTrace();
}
}
for (ChangeEvent event : changeEvents.getData()) {
if (event.getTimestamp() == timestamp) {
changeEvent = event;
break;
}
}
iteration++;
}
@ -1239,7 +1233,7 @@ public abstract class EntityResourceTest<T> extends CatalogApplicationTest {
"Expected change event "
+ expectedEventType
+ " at "
+ updateTime.getTime()
+ timestamp
+ " was not found for entity "
+ entityInterface.getId());
@ -1271,13 +1265,13 @@ public abstract class EntityResourceTest<T> extends CatalogApplicationTest {
}
protected ResultList<ChangeEvent> getChangeEvents(
String entityCreated, String entityUpdated, String entityDeleted, Date date, Map<String, String> authHeaders)
String entityCreated, String entityUpdated, String entityDeleted, long timestamp, Map<String, String> authHeaders)
throws HttpResponseException {
WebTarget target = getResource("events");
target = entityCreated == null ? target : target.queryParam("entityCreated", entityCreated);
target = entityUpdated == null ? target : target.queryParam("entityUpdated", entityUpdated);
target = entityDeleted == null ? target : target.queryParam("entityDeleted", entityDeleted);
target = target.queryParam("date", RestUtil.DATE_TIME_FORMAT.format(date));
target = target.queryParam("timestamp", timestamp);
return TestUtils.get(target, ChangeEventList.class, authHeaders);
}
@ -1383,7 +1377,7 @@ public abstract class EntityResourceTest<T> extends CatalogApplicationTest {
// Validate change events
validateChangeEvents(
entityInterface, event.getDateTime(), EventType.ENTITY_UPDATED, event.getChangeDescription(), authHeaders);
entityInterface, event.getTimestamp(), EventType.ENTITY_UPDATED, event.getChangeDescription(), authHeaders);
}
protected void deleteAndCheckFollower(
@ -1399,7 +1393,7 @@ public abstract class EntityResourceTest<T> extends CatalogApplicationTest {
// Validate change events
validateChangeEvents(
entityInterface, change.getDateTime(), EventType.ENTITY_UPDATED, change.getChangeDescription(), authHeaders);
entityInterface, change.getTimestamp(), EventType.ENTITY_UPDATED, change.getChangeDescription(), authHeaders);
}
public T checkFollowerDeleted(UUID entityId, UUID userId, Map<String, String> authHeaders)

View File

@ -1,5 +1,8 @@
package org.openmetadata.catalog.resources.events;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -7,6 +10,7 @@ import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.ws.rs.Consumes;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
@ -19,6 +23,9 @@ import javax.ws.rs.core.UriInfo;
import org.openmetadata.catalog.resources.events.EventResource.ChangeEventList;
import org.openmetadata.catalog.type.ChangeEvent;
import org.openmetadata.catalog.type.EventType;
import org.openmetadata.catalog.util.JsonUtils;
import org.openmetadata.catalog.util.RestUtil;
import org.openmetadata.common.utils.CommonUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -39,8 +46,13 @@ public class WebhookCallbackResource {
public Response receiveEventCount(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@HeaderParam(RestUtil.SIGNATURE_HEADER) String signature,
@PathParam("testName") String testName,
EventResource.ChangeEventList events) {
ChangeEventList events)
throws IOException {
String payload = JsonUtils.pojoToJson(events);
String computedSignature = "sha256=" + CommonUtil.calculateHMAC("webhookTest", payload);
assertEquals(computedSignature, signature);
addEventDetails(testName, events);
return Response.ok().build();
}
@ -49,7 +61,7 @@ public class WebhookCallbackResource {
@POST
@Path("/simulate/slowServer")
public Response receiveEventWithDelay(
@Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) {
@Context UriInfo uriInfo, @Context SecurityContext securityContext, ChangeEventList events) {
addEventDetails("simulate-slowServer", events);
return Response.ok().build();
}
@ -58,7 +70,7 @@ public class WebhookCallbackResource {
@POST
@Path("/simulate/timeout")
public Response receiveEventWithTimeout(
@Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) {
@Context UriInfo uriInfo, @Context SecurityContext securityContext, ChangeEventList events) {
addEventDetails("simulate-timeout", events);
try {
Thread.sleep(15 * 1000);
@ -72,7 +84,7 @@ public class WebhookCallbackResource {
@POST
@Path("/simulate/300")
public Response receiveEvent300(
@Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) {
@Context UriInfo uriInfo, @Context SecurityContext securityContext, ChangeEventList events) {
addEventDetails("simulate-300", events);
return Response.status(Response.Status.MOVED_PERMANENTLY).build();
}
@ -81,7 +93,7 @@ public class WebhookCallbackResource {
@POST
@Path("/simulate/400")
public Response receiveEvent400(
@Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) {
@Context UriInfo uriInfo, @Context SecurityContext securityContext, ChangeEventList events) {
addEventDetails("simulate-400", events);
return Response.status(Response.Status.BAD_REQUEST).build();
}
@ -90,7 +102,7 @@ public class WebhookCallbackResource {
@POST
@Path("/simulate/500")
public Response receiveEvent500(
@Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) {
@Context UriInfo uriInfo, @Context SecurityContext securityContext, ChangeEventList events) {
addEventDetails("simulate-500", events);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
}
@ -103,7 +115,7 @@ public class WebhookCallbackResource {
@Context SecurityContext securityContext,
@PathParam("eventType") String eventType,
@PathParam("entityType") String entityType,
EventResource.ChangeEventList events) {
ChangeEventList events) {
String key = eventType + ":" + entityType;
List<ChangeEvent> list = entityCallbackMap.get(key);
if (list == null) {
@ -128,11 +140,11 @@ public class WebhookCallbackResource {
EventDetails details = eventMap.get(endpoint); // Default endpoint
if (details == null) {
details = new EventDetails();
details.setFirstEventTime(events.getData().get(0).getDateTime().getTime());
details.setFirstEventTime(events.getData().get(0).getTimestamp());
eventMap.put(endpoint, details);
}
details.getEvents().addAll(events.getData());
details.setLatestEventTime(events.getData().get(events.getData().size() - 1).getDateTime().getTime());
details.setLatestEventTime(events.getData().get(events.getData().size() - 1).getTimestamp());
LOG.info("Event received {}, total count {}", endpoint, details.getEvents().size());
}

View File

@ -23,7 +23,6 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -193,7 +192,8 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook> {
.withEventFilters(ALL_EVENTS_FILTER)
.withEndpoint(URI.create(uri))
.withBatchSize(100)
.withEnabled(false);
.withEnabled(false)
.withSecretKey("webhookTest");
}
@Override
@ -276,8 +276,8 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook> {
assertNotNull(callbackEvents);
assertNotNull(callbackEvents.peek());
List<ChangeEvent> actualEvents =
getChangeEvents("*", "*", "*", callbackEvents.peek().getDateTime(), adminAuthHeaders()).getData();
waitAndCheckForEvents(callbackEvents, actualEvents, 10, 100);
getChangeEvents("*", "*", "*", callbackEvents.peek().getTimestamp(), adminAuthHeaders()).getData();
waitAndCheckForEvents(actualEvents, callbackEvents, 10, 100);
assertWebhookStatusSuccess("healthy");
}
@ -287,15 +287,15 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook> {
// For the entity all the webhooks registered for created events have the right number of events
List<ChangeEvent> callbackEvents =
webhookCallbackResource.getEntityCallbackEvents(EventType.ENTITY_CREATED, entity);
Date date = callbackEvents.get(0).getDateTime();
List<ChangeEvent> events = getChangeEvents(entity, null, null, date, adminAuthHeaders()).getData();
long timestamp = callbackEvents.get(0).getTimestamp();
List<ChangeEvent> events = getChangeEvents(entity, null, null, timestamp, adminAuthHeaders()).getData();
waitAndCheckForEvents(callbackEvents, events, 30, 100);
// For the entity all the webhooks registered for updated events have the right number of events
callbackEvents = webhookCallbackResource.getEntityCallbackEvents(EventType.ENTITY_UPDATED, entity);
// Use previous date if no update events
date = callbackEvents.size() > 0 ? callbackEvents.get(0).getDateTime() : date;
events = getChangeEvents(null, entity, null, date, adminAuthHeaders()).getData();
timestamp = callbackEvents.size() > 0 ? callbackEvents.get(0).getTimestamp() : timestamp;
events = getChangeEvents(null, entity, null, timestamp, adminAuthHeaders()).getData();
waitAndCheckForEvents(callbackEvents, events, 30, 100);
// TODO add delete event support
@ -321,8 +321,8 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook> {
assertNotNull(callbackEvents.peek());
List<ChangeEvent> actualEvents =
getChangeEvents("*", "*", "*", callbackEvents.peek().getDateTime(), adminAuthHeaders()).getData();
waitAndCheckForEvents(callbackEvents, actualEvents, 30, 100);
getChangeEvents("*", "*", "*", callbackEvents.peek().getTimestamp(), adminAuthHeaders()).getData();
waitAndCheckForEvents(actualEvents, callbackEvents, 30, 100);
// Check all webhook status
assertWebhookStatusSuccess("slowServer");
@ -377,19 +377,11 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook> {
expected.forEach(
c1 ->
LOG.info(
"expected {}:{}:{}:{}",
c1.getDateTime().getTime(),
c1.getEventType(),
c1.getEntityType(),
c1.getEntityId()));
"expected {}:{}:{}:{}", c1.getTimestamp(), c1.getEventType(), c1.getEntityType(), c1.getEntityId()));
received.forEach(
c1 ->
LOG.info(
"received {}:{}:{}:{}",
c1.getDateTime().getTime(),
c1.getEventType(),
c1.getEntityType(),
c1.getEntityId()));
"received {}:{}:{}:{}", c1.getTimestamp(), c1.getEventType(), c1.getEntityType(), c1.getEntityId()));
}
assertEquals(expected.size(), received.size());
}

View File

@ -15,12 +15,14 @@ package org.openmetadata.common.utils;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.DateFormat;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Calendar;
import java.util.Collection;
import java.util.Date;
@ -31,6 +33,8 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -123,6 +127,7 @@ public final class CommonUtil {
return givenDate.after(startDate) && givenDate.before(endDate);
}
/** Parse a date using given DataFormat */
public static Date parseDate(String date, DateFormat dateFormat) {
try {
return dateFormat.parse(date);
@ -130,4 +135,21 @@ public final class CommonUtil {
throw new RuntimeException(e);
}
}
public static final String HMAC_SHA256_ALGORITHM = "HmacSHA256";
/** Get SHA256 Hash-based Message Authentication Code */
public static String calculateHMAC(String secretKey, String message) {
// return message;
try {
Mac mac = Mac.getInstance(HMAC_SHA256_ALGORITHM);
SecretKeySpec secretKeySpec =
new SecretKeySpec(secretKey.getBytes(StandardCharsets.UTF_8), HMAC_SHA256_ALGORITHM);
mac.init(secretKeySpec);
byte[] hmacSha256 = mac.doFinal(message.getBytes(StandardCharsets.UTF_8));
return Base64.getEncoder().encodeToString(hmacSha256);
} catch (Exception e) {
throw new RuntimeException("Failed to calculate " + HMAC_SHA256_ALGORITHM, e);
}
}
}