From 5892595b1f844af989bb52192a9181ec0f456a88 Mon Sep 17 00:00:00 2001 From: Suresh Srinivas Date: Thu, 23 Dec 2021 11:26:00 -0800 Subject: [PATCH] Fixes #1900 - Initial implementation of webhook API (#1905) Co-authored-by: Suresh Srinivas --- .idea/encodings.xml | 7 + .../mysql/v003__create_db_connection_info.sql | 8 + catalog-rest-service/pom.xml | 6 +- .../catalog/CatalogApplication.java | 2 + .../java/org/openmetadata/catalog/Entity.java | 1 + .../catalog/events/ChangeEventHandler.java | 4 +- .../catalog/events/EventFilter.java | 5 +- .../catalog/events/EventPubSub.java | 103 +++++ .../catalog/jdbi3/CollectionDAO.java | 27 ++ .../catalog/jdbi3/EntityRepository.java | 6 +- .../catalog/jdbi3/WebhookRepository.java | 414 ++++++++++++++++++ .../catalog/resources/CollectionRegistry.java | 78 ++-- .../resources/events/EventResource.java | 3 + .../resources/events/WebhookResource.java | 296 +++++++++++++ .../json/schema/api/events/createWebhook.json | 47 ++ .../json/schema/entity/events/webhook.json | 112 +++++ .../json/schema/type/changeEvent.json | 2 +- .../catalog/CatalogApplicationTest.java | 6 +- .../catalog/resources/EntityResourceTest.java | 17 +- .../events/WebhookCallbackResource.java | 96 ++++ .../resources/events/WebhookResourceTest.java | 184 ++++++++ .../openmetadata/catalog/util/TestUtils.java | 23 +- .../openmetadata/common/utils/CommonUtil.java | 8 + pom.xml | 10 +- 24 files changed, 1403 insertions(+), 62 deletions(-) create mode 100644 bootstrap/sql/mysql/v003__create_db_connection_info.sql create mode 100644 catalog-rest-service/src/main/java/org/openmetadata/catalog/events/EventPubSub.java create mode 100644 catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/WebhookRepository.java create mode 100644 catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/events/WebhookResource.java create mode 100644 catalog-rest-service/src/main/resources/json/schema/api/events/createWebhook.json create mode 100644 catalog-rest-service/src/main/resources/json/schema/entity/events/webhook.json create mode 100644 catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/events/WebhookCallbackResource.java create mode 100644 catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/events/WebhookResourceTest.java diff --git a/.idea/encodings.xml b/.idea/encodings.xml index c7d8cba3020..edf8aa6557b 100644 --- a/.idea/encodings.xml +++ b/.idea/encodings.xml @@ -5,6 +5,13 @@ + + + + + + + \ No newline at end of file diff --git a/bootstrap/sql/mysql/v003__create_db_connection_info.sql b/bootstrap/sql/mysql/v003__create_db_connection_info.sql new file mode 100644 index 00000000000..f77f22bc997 --- /dev/null +++ b/bootstrap/sql/mysql/v003__create_db_connection_info.sql @@ -0,0 +1,8 @@ +CREATE TABLE IF NOT EXISTS webhook_entity ( + id VARCHAR(36) GENERATED ALWAYS AS (json ->> '$.id') STORED NOT NULL, + name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL, + json JSON NOT NULL, + PRIMARY KEY (id), + UNIQUE KEY unique_name(name) + -- No versioning, updatedAt, updatedBy, or changeDescription fields for webhook +); \ No newline at end of file diff --git a/catalog-rest-service/pom.xml b/catalog-rest-service/pom.xml index 7d2e61fc966..87b69423fc6 100644 --- a/catalog-rest-service/pom.xml +++ b/catalog-rest-service/pom.xml @@ -315,11 +315,13 @@ javafaker 1.0.2 - + + com.lmax + disruptor + - org.codehaus.mojo diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/CatalogApplication.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/CatalogApplication.java index 83c03135c8d..0b48a8ebecd 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/CatalogApplication.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/CatalogApplication.java @@ -40,6 +40,7 @@ import org.glassfish.jersey.media.multipart.MultiPartFeature; import org.glassfish.jersey.server.ServerProperties; import org.jdbi.v3.core.Jdbi; import org.openmetadata.catalog.events.EventFilter; +import org.openmetadata.catalog.events.EventPubSub; import org.openmetadata.catalog.exception.CatalogGenericExceptionMapper; import org.openmetadata.catalog.exception.ConstraintViolationExceptionMapper; import org.openmetadata.catalog.exception.JsonMappingExceptionMapper; @@ -106,6 +107,7 @@ public class CatalogApplication extends Application { // Register Event Handler registerEventFilter(catalogConfig, environment, jdbi); + EventPubSub.start(); } @SneakyThrows diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/Entity.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/Entity.java index 9513651b49e..07539299034 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/Entity.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/Entity.java @@ -77,6 +77,7 @@ public final class Entity { // Operations // public static final String INGESTION = "ingestion"; + public static final String WEBHOOK = "webhook"; private Entity() {} diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/ChangeEventHandler.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/ChangeEventHandler.java index 315bba5598d..8dc9714ff50 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/ChangeEventHandler.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/ChangeEventHandler.java @@ -42,13 +42,15 @@ public class ChangeEventHandler implements EventHandler { ChangeEvent changeEvent = getChangeEvent(method, responseContext); if (changeEvent != null) { LOG.info("Recording change event {} {}", changeEvent.getDateTime().getTime(), changeEvent); + EventPubSub.publish(changeEvent); if (changeEvent.getEntity() != null) { changeEvent.setEntity(JsonUtils.pojoToJson(changeEvent.getEntity())); } + dao.changeEventDAO().insert(JsonUtils.pojoToJson(changeEvent)); } } catch (Exception e) { - LOG.error("Failed to capture change event for method {} due to {}", method, e); + LOG.error("Failed to capture change event for method {} due to ", method, e); } return null; } diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/EventFilter.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/EventFilter.java index 93f150623b2..02e992367cf 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/EventFilter.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/EventFilter.java @@ -51,21 +51,20 @@ public class EventFilter implements ContainerResponseFilter { ((Class) Class.forName(eventHandlerClassName)).getConstructor().newInstance(); eventHandler.init(config, jdbi); eventHandlers.add(eventHandler); + LOG.info("Added event handler {}", eventHandlerClassName); } } catch (Exception e) { - LOG.info(e.getMessage()); + LOG.info("Exception ", e); } } @Override public void filter(ContainerRequestContext requestContext, ContainerResponseContext responseContext) { - int responseCode = responseContext.getStatus(); String method = requestContext.getMethod(); if ((responseCode < 200 || responseCode > 299) || (!AUDITABLE_METHODS.contains(method))) { return; } - eventHandlers .parallelStream() .forEach( diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/EventPubSub.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/EventPubSub.java new file mode 100644 index 00000000000..092ad34fda6 --- /dev/null +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/EventPubSub.java @@ -0,0 +1,103 @@ +/* + * Copyright 2021 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.catalog.events; + +import com.lmax.disruptor.BatchEventProcessor; +import com.lmax.disruptor.EventFactory; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.util.DaemonThreadFactory; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.openmetadata.catalog.type.ChangeEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Change event PubSub built based on LMAX Disruptor. + */ +public class EventPubSub { + private static final Logger LOG = LoggerFactory.getLogger(EventPubSub.class); + private static Disruptor disruptor; + private static ExecutorService executor; + private static RingBuffer ringBuffer; + private static boolean STARTED = false; + + public static void start() { + if (!STARTED) { + disruptor = new Disruptor<>(ChangeEventHolder::new, 1024, DaemonThreadFactory.INSTANCE); + executor = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE); + ringBuffer = disruptor.start(); + LOG.info("Disruptor started"); + STARTED = true; + } + } + + public static void shutdown() throws InterruptedException { + if (STARTED) { + disruptor.shutdown(); + disruptor.halt(); + executor.shutdown(); + executor.awaitTermination(5, TimeUnit.SECONDS); + disruptor = null; + ringBuffer = null; + STARTED = false; + LOG.info("Disruptor stopped"); + } + } + + public static class ChangeEventHolder { + private ChangeEvent value; + + public void set(ChangeEvent event) { + this.value = event; + } + + public ChangeEvent get() { + return value; + } + } + + public static class ChangeEventFactory implements EventFactory { + public ChangeEventHolder newInstance() { + return new ChangeEventHolder(); + } + } + + public static void publish(ChangeEvent event) { + if (event != null) { + RingBuffer ringBuffer = disruptor.getRingBuffer(); + long sequence = ringBuffer.next(); + ringBuffer.get(sequence).set(event); + ringBuffer.publish(sequence); + } + } + + public static BatchEventProcessor addEventHandler(EventHandler eventHandler) { + BatchEventProcessor processor = + new BatchEventProcessor<>(ringBuffer, ringBuffer.newBarrier(), eventHandler); + ringBuffer.addGatingSequences(processor.getSequence()); + executor.execute(processor); + LOG.info("Processor added for {}", processor); + return processor; + } + + public static void removeProcessor(BatchEventProcessor processor) { + ringBuffer.removeGatingSequence(processor.getSequence()); + } + + public void close() {} +} diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/CollectionDAO.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/CollectionDAO.java index d472e11e88b..b7d0b02d962 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/CollectionDAO.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/CollectionDAO.java @@ -66,11 +66,13 @@ import org.openmetadata.catalog.jdbi3.TableRepository.TableEntityInterface; import org.openmetadata.catalog.jdbi3.TeamRepository.TeamEntityInterface; import org.openmetadata.catalog.jdbi3.TopicRepository.TopicEntityInterface; import org.openmetadata.catalog.jdbi3.UserRepository.UserEntityInterface; +import org.openmetadata.catalog.jdbi3.WebhookRepository.WebhookEntityInterface; import org.openmetadata.catalog.operations.workflows.Ingestion; import org.openmetadata.catalog.type.EntityReference; import org.openmetadata.catalog.type.TagLabel; import org.openmetadata.catalog.type.UsageDetails; import org.openmetadata.catalog.type.UsageStats; +import org.openmetadata.catalog.type.Webhook; import org.openmetadata.catalog.util.EntityUtil; public interface CollectionDAO { @@ -155,6 +157,9 @@ public interface CollectionDAO { @CreateSqlObject ChangeEventDAO changeEventDAO(); + @CreateSqlObject + WebhookDAO webhookDAO(); + interface DashboardDAO extends EntityDAO { @Override default String getTableName() { @@ -798,6 +803,28 @@ public interface CollectionDAO { } } + interface WebhookDAO extends EntityDAO { + @Override + default String getTableName() { + return "webhook_entity"; + } + + @Override + default Class getEntityClass() { + return Webhook.class; + } + + @Override + default String getNameColumn() { + return "name"; + } + + @Override + default EntityReference getEntityReference(Webhook entity) { + return new WebhookEntityInterface(entity).getEntityReference(); + } + } + @RegisterRowMapper(TagLabelMapper.class) interface TagDAO { @SqlUpdate("INSERT INTO tag_category (json) VALUES (:json)") diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/EntityRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/EntityRepository.java index 8de7c5fa1ea..6ea309309c4 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/EntityRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/EntityRepository.java @@ -405,7 +405,7 @@ public abstract class EntityRepository { return entity; } - protected T withHref(UriInfo uriInfo, T entity) { + public T withHref(UriInfo uriInfo, T entity) { if (uriInfo == null) { return entity; } @@ -607,7 +607,7 @@ public abstract class EntityRepository { } public final void storeUpdate() throws IOException { - if (updateVersion(original.getVersion())) { + if (updateVersion(original.getVersion())) { // Update changed the entity veresion // Store the old version String extensionName = EntityUtil.getVersionExtension(entityName, original.getVersion()); daoCollection @@ -616,7 +616,7 @@ public abstract class EntityRepository { // Store the new version EntityRepository.this.storeEntity(updated.getEntity(), true); - } else { + } else { // Update did not change the entity version updated.setUpdateDetails(original.getUpdatedBy(), original.getUpdatedAt()); } } diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/WebhookRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/WebhookRepository.java new file mode 100644 index 00000000000..272d4a8a990 --- /dev/null +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/WebhookRepository.java @@ -0,0 +1,414 @@ +/* + * Copyright 2021 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.catalog.jdbi3; + +import com.lmax.disruptor.BatchEventProcessor; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.LifecycleAware; +import java.io.IOException; +import java.net.URI; +import java.net.UnknownHostException; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import javax.ws.rs.ProcessingException; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.Invocation.Builder; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.jdbi.v3.sqlobject.transaction.Transaction; +import org.openmetadata.catalog.Entity; +import org.openmetadata.catalog.events.EventPubSub; +import org.openmetadata.catalog.events.EventPubSub.ChangeEventHolder; +import org.openmetadata.catalog.resources.events.EventResource.ChangeEventList; +import org.openmetadata.catalog.resources.events.WebhookResource; +import org.openmetadata.catalog.security.SecurityUtil; +import org.openmetadata.catalog.type.ChangeDescription; +import org.openmetadata.catalog.type.ChangeEvent; +import org.openmetadata.catalog.type.EntityReference; +import org.openmetadata.catalog.type.FailureDetails; +import org.openmetadata.catalog.type.TagLabel; +import org.openmetadata.catalog.type.Webhook; +import org.openmetadata.catalog.type.Webhook.Status; +import org.openmetadata.catalog.util.EntityInterface; +import org.openmetadata.catalog.util.EntityUtil.Fields; +import org.openmetadata.catalog.util.JsonUtils; +import org.openmetadata.common.utils.CommonUtil; + +public class WebhookRepository extends EntityRepository { + private final CollectionDAO dao; + private static final List webhookPublisherList = new ArrayList<>(); + + public WebhookRepository(CollectionDAO dao) { + super( + WebhookResource.COLLECTION_PATH, + Entity.WEBHOOK, + Webhook.class, + dao.webhookDAO(), + dao, + Fields.EMPTY_FIELDS, + Fields.EMPTY_FIELDS); + this.dao = dao; + } + + @Override + public EntityInterface getEntityInterface(Webhook entity) { + return new WebhookEntityInterface(entity); + } + + @Override + public Webhook setFields(Webhook entity, Fields fields) throws IOException, ParseException { + return entity; // No fields to set + } + + @Override + public void prepare(Webhook entity) throws IOException { + // Nothing to prepare + } + + @Override + public void storeEntity(Webhook entity, boolean update) throws IOException { + entity.setHref(null); + if (update) { + dao.webhookDAO().update(entity.getId(), JsonUtils.pojoToJson(entity)); + } else { + dao.webhookDAO().insert(entity); + } + } + + @Override + public void storeRelationships(Webhook entity) { + // No relationship to store + } + + @Override + public void restorePatchAttributes(Webhook original, Webhook updated) { + updated.withId(original.getId()).withName(original.getName()); + } + + @Override + public EntityRepository.EntityUpdater getUpdater(Webhook original, Webhook updated, boolean patchOperation) { + return super.getUpdater(original, updated, patchOperation); + } + + public void addWebhook(Webhook webhook) { + WebhookPublisher publisher = new WebhookPublisher(webhook); + BatchEventProcessor processor = EventPubSub.addEventHandler(publisher); + publisher.setProcessor(processor); + webhookPublisherList.add(publisher); + publisher.test(); + LOG.info("Webhook added for {}", webhook); + } + + public static void deleteWebhook(UUID id) throws InterruptedException { + Iterator iterator = webhookPublisherList.iterator(); + while (iterator.hasNext()) { + WebhookPublisher publisher = iterator.next(); + if (publisher.getWebhook().getId().equals(id)) { + iterator.remove(); + publisher.getProcessor().halt(); + publisher.awaitShutdown(); + EventPubSub.removeProcessor(publisher.getProcessor()); + LOG.info("Webhook deleted {}", publisher.getWebhook()); + } + } + } + + @Transaction + public boolean delete(String id) { + return dao.webhookDAO().delete(UUID.fromString(id)) > 0; + } + + public static class WebhookEntityInterface implements EntityInterface { + private final Webhook entity; + + public WebhookEntityInterface(Webhook entity) { + this.entity = entity; + } + + @Override + public UUID getId() { + return entity.getId(); + } + + @Override + public String getDescription() { + return entity.getDescription(); + } + + @Override + public String getDisplayName() { + return entity.getName(); + } + + @Override + public EntityReference getOwner() { + return null; + } + + @Override + public String getFullyQualifiedName() { + return entity.getName(); + } + + @Override + public List getTags() { + return null; + } + + @Override + public Double getVersion() { + return entity.getVersion(); + } + + @Override + public String getUpdatedBy() { + return entity.getUpdatedBy(); + } + + @Override + public Date getUpdatedAt() { + return entity.getUpdatedAt(); + } + + @Override + public EntityReference getEntityReference() { + return new EntityReference() + .withId(getId()) + .withName(getFullyQualifiedName()) + .withDescription(getDescription()) + .withDisplayName(getDisplayName()) + .withType(Entity.WEBHOOK); + } + + @Override + public URI getHref() { + return entity.getHref(); + } + + @Override + public List getFollowers() { + return null; + } + + @Override + public Webhook getEntity() { + return entity; + } + + @Override + public ChangeDescription getChangeDescription() { + return entity.getChangeDescription(); + } + + @Override + public void setId(UUID id) { + entity.setId(id); + } + + @Override + public void setDescription(String description) { + entity.setDescription(description); + } + + @Override + public void setDisplayName(String displayName) {} + + @Override + public void setUpdateDetails(String updatedBy, Date updatedAt) { + entity.setUpdatedBy(updatedBy); + entity.setUpdatedAt(updatedAt); + } + + @Override + public void setChangeDescription(Double newVersion, ChangeDescription changeDescription) { + entity.setVersion(newVersion); + entity.setChangeDescription(changeDescription); + } + + @Override + public void setOwner(EntityReference owner) {} + + @Override + public Webhook withHref(URI href) { + return entity.withHref(href); + } + + @Override + public void setTags(List tags) {} + } + + /** One webhook call back per webhook subscription */ + public class WebhookPublisher implements EventHandler, LifecycleAware { + // Backoff timeout in seconds + private static final int BACKOFF_NORMAL = 0; + private static final int BACKOFF_3_SECONDS = 3; + private static final int BACKOFF_5_MINUTES = 5 * 60; + private static final int BACKOFF_1_HOUR = 60 * 60; + private static final int BACKOFF_24_HOUR = 24 * 60 * 60; + + private int currentBackoffTime = BACKOFF_NORMAL; + private final CountDownLatch shutdownLatch = new CountDownLatch(1); + private final Webhook webhook; + private final List batch = new ArrayList<>(); + private BatchEventProcessor processor; + private Client client; + private Builder target; + + public WebhookPublisher(Webhook webhook) { + this.webhook = webhook; + } + + public void test() { + // TODO + } + + public Webhook getWebhook() { + return webhook; + } + + public void cleanup() { + // TODO + client.close(); + client = null; + } + + @Override + public void onEvent(ChangeEventHolder changeEventHolder, long sequence, boolean endOfBatch) throws Exception { + + batch.add(changeEventHolder.get()); + // Batch until either the batch size has reached the max size + if (!endOfBatch && batch.size() < webhook.getBatchSize()) { + return; + } + + // TODO send max batch size + ChangeEventList list = new ChangeEventList(batch, null, null, batch.size()); + Date attemptTime = new Date(); + try { + Response response = target.post(javax.ws.rs.client.Entity.entity(list, MediaType.APPLICATION_JSON)); + LOG.info( + "Webhook {}:{}:{} received response {}", + webhook.getName(), + webhook.getStatus(), + batch.size(), + response.getStatusInfo()); + // 2xx response means call back is successful + if (response.getStatus() >= 200 && response.getStatus() < 300) { // All 2xx responses + batch.clear(); + if (webhook.getStatus() != Status.SUCCESS) { + setStatus(Status.SUCCESS, null); + } + // 3xx response/redirection is not allowed for callback. Set the webhook state as in error + } else if (response.getStatus() >= 300 && response.getStatus() < 400) { + setErrorStatus(attemptTime, response.getStatus(), response.getStatusInfo().getReasonPhrase()); + // 4xx, 5xx response retry delivering events after timeout + } else if (response.getStatus() >= 300 && response.getStatus() < 600) { + setNextBackOff(); + setAwaitingRetry(attemptTime, response.getStatus(), response.getStatusInfo().getReasonPhrase()); + Thread.sleep(currentBackoffTime); + } + } catch (ProcessingException ex) { + Throwable cause = ex.getCause(); + if (cause.getClass() == UnknownHostException.class) { + LOG.warn("Invalid webhook endpoint {}", webhook.getEndPoint()); + setErrorStatus(attemptTime, null, "UnknownHostException"); + } + } + } + + private void setErrorStatus(Date attemptTime, Integer statusCode, String reason) throws IOException { + if (webhook.getFailureDetails() == null || attemptTime != webhook.getFailureDetails().getLastFailedAttempt()) { + setStatus( + Status.ERROR, + new FailureDetails() + .withLastFailedAttempt(attemptTime) + .withLastFailedStatusCode(statusCode) + .withLastFailedReason(reason)); + } + throw new RuntimeException(reason); + } + + private void setAwaitingRetry(Date attemptTime, int statusCode, String reason) throws ParseException, IOException { + if (webhook.getFailureDetails() == null || attemptTime != webhook.getFailureDetails().getLastFailedAttempt()) { + setStatus( + Status.AWAITING_RETRY, + new FailureDetails() + .withLastFailedAttempt(attemptTime) + .withLastFailedStatusCode(statusCode) + .withLastFailedReason(reason) + .withNextAttempt(CommonUtil.getDateByOffsetSeconds(attemptTime, currentBackoffTime))); + } + } + + private void setStatus(Status status, FailureDetails details) throws IOException { + webhook.setStatus(status); + webhook.setFailureDetails(details); + // TODO versioning + storeEntity(webhook, true); + } + + @Override + public void onStart() { + test(); + LOG.info("Webhook processor with webhook {} started", webhook); + ClientBuilder clientBuilder = ClientBuilder.newBuilder(); + clientBuilder.connectTimeout(10, TimeUnit.SECONDS); + clientBuilder.readTimeout(12, TimeUnit.SECONDS); + client = clientBuilder.build(); + + // TODO clean this up + Map authHeaders = SecurityUtil.authHeaders("admin@open-metadata.org"); + target = SecurityUtil.addHeaders(client.target(webhook.getEndPoint()), authHeaders); + } + + @Override + public void onShutdown() { + cleanup(); + shutdownLatch.countDown(); + LOG.info("Cleaned up webhook {}", webhook); + } + + public void awaitShutdown() throws InterruptedException { + shutdownLatch.await(); + } + + public void setProcessor(BatchEventProcessor processor) { + this.processor = processor; + } + + public BatchEventProcessor getProcessor() { + return processor; + } + + public void setNextBackOff() { + if (currentBackoffTime == BACKOFF_NORMAL) { + currentBackoffTime = BACKOFF_3_SECONDS; + } else if (currentBackoffTime == BACKOFF_3_SECONDS) { + currentBackoffTime = BACKOFF_5_MINUTES; + } else if (currentBackoffTime == BACKOFF_5_MINUTES) { + currentBackoffTime = BACKOFF_1_HOUR; + } else if (currentBackoffTime == BACKOFF_1_HOUR) { + currentBackoffTime = BACKOFF_24_HOUR; + } + } + } +} diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/CollectionRegistry.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/CollectionRegistry.java index 3764ca29e74..a621f0546f4 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/CollectionRegistry.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/CollectionRegistry.java @@ -47,41 +47,22 @@ import org.slf4j.LoggerFactory; */ public final class CollectionRegistry { private static final Logger LOG = LoggerFactory.getLogger(CollectionRegistry.class); - private static CollectionRegistry instance = null; - - public static class CollectionDetails { - private final String resourceClass; - private final CollectionDescriptor cd; - private final List childCollections = new ArrayList<>(); - - CollectionDetails(CollectionDescriptor cd, String resourceClass) { - this.cd = cd; - this.resourceClass = resourceClass; - } - - public void addChildCollection(CollectionDetails child) { - CollectionInfo collectionInfo = child.cd.getCollection(); - LOG.info( - "Adding child collection {} to parent collection {}", collectionInfo.getName(), cd.getCollection().getName()); - childCollections.add(child.cd); - } - - public CollectionDescriptor[] getChildCollections() { - return childCollections.toArray(new CollectionDescriptor[0]); - } - } + private static CollectionRegistry INSTANCE = null; /** Map of collection endpoint path to collection details */ private final Map collectionMap = new HashMap<>(); + /** Resources used only for testing */ + private final List testResources = new ArrayList<>(); + private CollectionRegistry() {} public static CollectionRegistry getInstance() { - if (instance == null) { - instance = new CollectionRegistry(); - instance.initialize(); + if (INSTANCE == null) { + INSTANCE = new CollectionRegistry(); + INSTANCE.initialize(); } - return instance; + return INSTANCE; } private void initialize() { @@ -113,11 +94,6 @@ public final class CollectionRegistry { for (CollectionDetails collection : collections) { CollectionInfo collectionInfo = collection.cd.getCollection(); collectionMap.put(collectionInfo.getHref().getPath(), collection); - LOG.info( - "Initialized collection name {} href {} details {}", - collectionInfo.getName(), - collectionInfo.getHref(), - collection); } // Now add collections to their parents @@ -136,6 +112,10 @@ public final class CollectionRegistry { } } + public static void addTestResource(Object testResource) { + getInstance().testResources.add(testResource); + } + /** Register resources from CollectionRegistry */ public void registerResources( Jdbi jdbi, Environment environment, CatalogApplicationConfig config, CatalogAuthorizer authorizer) { @@ -145,7 +125,7 @@ public final class CollectionRegistry { String resourceClass = details.resourceClass; try { CollectionDAO daoObject = jdbi.onDemand(CollectionDAO.class); - Objects.requireNonNull(daoObject); + Objects.requireNonNull(daoObject, "CollectionDAO must not be null"); Object resource = createResource(daoObject, resourceClass, config, authorizer); environment.jersey().register(resource); LOG.info("Registering {}", resourceClass); @@ -153,6 +133,13 @@ public final class CollectionRegistry { LOG.warn("Failed to create resource for class {} {}", resourceClass, ex); } } + + // Now add test resources + testResources.forEach( + object -> { + LOG.info("Registering test resource {}", object); + environment.jersey().register(object); + }); } /** Get collection details based on annotations in Resource classes */ @@ -202,22 +189,41 @@ public final class CollectionRegistry { // Create the resource identified by resourceClass try { - LOG.info("Creating resource {}", resourceClass); resource = clz.getDeclaredConstructor(CollectionDAO.class, CatalogAuthorizer.class).newInstance(daoObject, authorizer); } catch (NoSuchMethodException ex) { - LOG.info("Creating resource {} with default constructor", resourceClass); resource = Class.forName(resourceClass).getConstructor().newInstance(); } // Call initialize method, if it exists try { Method initializeMethod = resource.getClass().getMethod("initialize", CatalogApplicationConfig.class); - LOG.info("Initializing resource {}", resourceClass); initializeMethod.invoke(resource, config); } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ignored) { // Method does not exist and initialize is not called } return resource; } + + public static class CollectionDetails { + private final String resourceClass; + private final CollectionDescriptor cd; + private final List childCollections = new ArrayList<>(); + + CollectionDetails(CollectionDescriptor cd, String resourceClass) { + this.cd = cd; + this.resourceClass = resourceClass; + } + + public void addChildCollection(CollectionDetails child) { + CollectionInfo collectionInfo = child.cd.getCollection(); + LOG.info( + "Adding child collection {} to parent collection {}", collectionInfo.getName(), cd.getCollection().getName()); + childCollections.add(child.cd); + } + + public CollectionDescriptor[] getChildCollections() { + return childCollections.toArray(new CollectionDescriptor[0]); + } + } } diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/events/EventResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/events/EventResource.java index 8ac418e5bad..3f0dc304029 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/events/EventResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/events/EventResource.java @@ -52,8 +52,10 @@ import org.openmetadata.catalog.util.ResultList; @Collection(name = "events") public class EventResource { private final ChangeEventRepository dao; + private final CatalogAuthorizer authorizer; public static class ChangeEventList extends ResultList { + @SuppressWarnings("unused") /* Required for tests */ public ChangeEventList() {} @@ -67,6 +69,7 @@ public class EventResource { public EventResource(CollectionDAO dao, CatalogAuthorizer authorizer) { Objects.requireNonNull(dao, "ChangeEventRepository must not be null"); this.dao = new ChangeEventRepository(dao); + this.authorizer = authorizer; } @GET diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/events/WebhookResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/events/WebhookResource.java new file mode 100644 index 00000000000..df8bed77dc6 --- /dev/null +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/events/WebhookResource.java @@ -0,0 +1,296 @@ +/* + * Copyright 2021 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.catalog.resources.events; + +import com.google.inject.Inject; +import io.swagger.annotations.Api; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.media.Content; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.security.GeneralSecurityException; +import java.text.ParseException; +import java.util.Date; +import java.util.List; +import java.util.Objects; +import java.util.UUID; +import javax.validation.Valid; +import javax.validation.constraints.Max; +import javax.validation.constraints.Min; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.SecurityContext; +import javax.ws.rs.core.UriInfo; +import org.openmetadata.catalog.api.events.CreateWebhook; +import org.openmetadata.catalog.jdbi3.CollectionDAO; +import org.openmetadata.catalog.jdbi3.WebhookRepository; +import org.openmetadata.catalog.resources.Collection; +import org.openmetadata.catalog.security.CatalogAuthorizer; +import org.openmetadata.catalog.security.SecurityUtil; +import org.openmetadata.catalog.type.ChangeEvent; +import org.openmetadata.catalog.type.EntityHistory; +import org.openmetadata.catalog.type.Webhook; +import org.openmetadata.catalog.util.EntityUtil.Fields; +import org.openmetadata.catalog.util.RestUtil; +import org.openmetadata.catalog.util.RestUtil.PutResponse; +import org.openmetadata.catalog.util.ResultList; + +@Path("/v1/webhook") +@Api(value = "Webhook resource", tags = "webhook") +@Produces(MediaType.APPLICATION_JSON) +@Consumes(MediaType.APPLICATION_JSON) +@Collection(name = "webhook") +public class WebhookResource { + public static final String COLLECTION_PATH = "v1/webhook/"; + private final WebhookRepository dao; + private final CatalogAuthorizer authorizer; + + public static class WebhookList extends ResultList { + + @SuppressWarnings("unused") /* Required for tests */ + public WebhookList() {} + + public WebhookList(List data, String beforeCursor, String afterCursor, int total) + throws GeneralSecurityException, UnsupportedEncodingException { + super(data, beforeCursor, afterCursor, total); + } + } + + @Inject + public WebhookResource(CollectionDAO dao, CatalogAuthorizer authorizer) { + Objects.requireNonNull(dao, "ChangeEventRepository must not be null"); + this.dao = new WebhookRepository(dao); + this.authorizer = authorizer; + } + + @GET + @Operation( + summary = "List webhooks", + tags = "webhook", + description = "Get a list of webhook subscriptions", + responses = { + @ApiResponse( + responseCode = "200", + description = "List of webhooks", + content = @Content(mediaType = "application/json", schema = @Schema(implementation = WebhookList.class))) + }) + public ResultList list( + @Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Parameter(description = "Limit the number webhooks returned. (1 to 1000000, default = " + "10) ") + @DefaultValue("10") + @Min(1) + @Max(1000000) + @QueryParam("limit") + int limitParam, + @Parameter(description = "Returns list of webhooks before this cursor", schema = @Schema(type = "string")) + @QueryParam("before") + String before, + @Parameter(description = "Returns list of webhooks after this cursor", schema = @Schema(type = "string")) + @QueryParam("after") + String after) + throws IOException, ParseException, GeneralSecurityException { + RestUtil.validateCursors(before, after); + ResultList webhooks; + if (before != null) { // Reverse paging + webhooks = dao.listBefore(uriInfo, Fields.EMPTY_FIELDS, null, limitParam, before); + } else { // Forward paging or first page + webhooks = dao.listAfter(uriInfo, Fields.EMPTY_FIELDS, null, limitParam, after); + } + webhooks.getData().forEach(t -> dao.withHref(uriInfo, t)); + return webhooks; + } + + @GET + @Path("/{id}") + @Valid + @Operation( + summary = "Get a webhook", + tags = "webhook", + description = "Get a webhook by given Id", + responses = { + @ApiResponse( + responseCode = "200", + description = "Entity events", + content = @Content(mediaType = "application/json", schema = @Schema(implementation = ChangeEvent.class))), + @ApiResponse(responseCode = "404", description = "Entity for instance {id} is not found") + }) + public Webhook getWebhook( + @Context UriInfo uriInfo, + @Parameter(description = "webhook Id", schema = @Schema(type = "string")) @PathParam("id") String id) + throws IOException, GeneralSecurityException, ParseException { + return dao.get(uriInfo, id, Fields.EMPTY_FIELDS); + } + + @GET + @Path("/name/{name}") + @Operation( + summary = "Get a webhook by name", + tags = "webhook", + description = "Get a webhook by name.", + responses = { + @ApiResponse( + responseCode = "200", + description = "webhook", + content = @Content(mediaType = "application/json", schema = @Schema(implementation = Webhook.class))), + @ApiResponse(responseCode = "404", description = "Webhook for instance {id} is not found") + }) + public Webhook getByName( + @Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Parameter(description = "Name of the webhook", schema = @Schema(type = "string")) @PathParam("name") String fqn) + throws IOException, ParseException { + return dao.getByName(uriInfo, fqn, Fields.EMPTY_FIELDS); + } + + @GET + @Path("/{id}/versions") + @Operation( + summary = "List webhook versions", + tags = "webhook", + description = "Get a list of all the versions of a webhook identified by `id`", + responses = { + @ApiResponse( + responseCode = "200", + description = "List of webhook versions", + content = @Content(mediaType = "application/json", schema = @Schema(implementation = EntityHistory.class))) + }) + public EntityHistory listVersions( + @Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Parameter(description = "webhook Id", schema = @Schema(type = "string")) @PathParam("id") String id) + throws IOException, ParseException { + return dao.listVersions(id); + } + + @GET + @Path("/{id}/versions/{version}") + @Operation( + summary = "Get a version of the webhook", + tags = "webhook", + description = "Get a version of the webhook by given `id`", + responses = { + @ApiResponse( + responseCode = "200", + description = "webhook", + content = @Content(mediaType = "application/json", schema = @Schema(implementation = Webhook.class))), + @ApiResponse( + responseCode = "404", + description = "Webhook for instance {id} and version {version} is " + "not found") + }) + public Webhook getVersion( + @Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Parameter(description = "webhook Id", schema = @Schema(type = "string")) @PathParam("id") String id, + @Parameter( + description = "webhook version number in the form `major`.`minor`", + schema = @Schema(type = "string", example = "0.1 or 1.1")) + @PathParam("version") + String version) + throws IOException, ParseException { + return dao.getVersion(id, version); + } + + @POST + @Operation( + summary = "Subscribe to a new webhook", + tags = "webhook", + description = "Subscribe to a new webhook", + responses = { + @ApiResponse( + responseCode = "200", + description = "webhook", + content = @Content(mediaType = "application/json", schema = @Schema(implementation = Webhook.class))), + @ApiResponse(responseCode = "400", description = "Bad request") + }) + public Response createWebhook( + @Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid CreateWebhook create) + throws IOException { + SecurityUtil.checkAdminOrBotRole(authorizer, securityContext); + Webhook webhook = getWebhook(securityContext, create); + webhook = dao.create(uriInfo, webhook); + dao.addWebhook(webhook); + return Response.created(webhook.getHref()).entity(webhook).build(); + } + + @PUT + @Operation( + summary = "Updated an existing or create a new webhook", + tags = "webhook", + description = "Updated an existing or create a new webhook", + responses = { + @ApiResponse( + responseCode = "200", + description = "webhook", + content = @Content(mediaType = "application/json", schema = @Schema(implementation = Webhook.class))), + @ApiResponse(responseCode = "400", description = "Bad request") + }) + public Response updateWebhook( + @Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid CreateWebhook create) + throws IOException, ParseException { + // TODO + // SecurityUtil.checkAdminOrBotRole(authorizer, securityContext); + // Table table = getTable(securityContext, create); + Webhook webhook = getWebhook(securityContext, create); + PutResponse putResponse = dao.createOrUpdate(uriInfo, webhook); + return putResponse.toResponse(); + } + + @DELETE + @Path("/{id}") + @Valid + @Operation( + summary = "Delete a webhook", + tags = "webhook", + description = "Get a webhook by given Id", + responses = { + @ApiResponse( + responseCode = "200", + description = "Entity events", + content = @Content(mediaType = "application/json", schema = @Schema(implementation = Webhook.class))), + @ApiResponse(responseCode = "404", description = "Entity for instance {id} is not found") + }) + public Response deleteWebhook( + @Context UriInfo uriInfo, + @Parameter(description = "webhook Id", schema = @Schema(type = "string")) @PathParam("id") String id) + throws IOException, GeneralSecurityException, ParseException { + dao.delete(id); + return Response.ok().build(); + } + + public Webhook getWebhook(SecurityContext securityContext, CreateWebhook create) { + return new Webhook() + .withDescription(create.getDescription()) + .withName(create.getName()) + .withId(UUID.randomUUID()) + .withEndPoint(create.getEndPoint()) + .withEventFilters(create.getEventFilters()) + .withUpdatedBy(securityContext.getUserPrincipal().getName()) + .withUpdatedAt(new Date()); + } +} diff --git a/catalog-rest-service/src/main/resources/json/schema/api/events/createWebhook.json b/catalog-rest-service/src/main/resources/json/schema/api/events/createWebhook.json new file mode 100644 index 00000000000..f93bffab18c --- /dev/null +++ b/catalog-rest-service/src/main/resources/json/schema/api/events/createWebhook.json @@ -0,0 +1,47 @@ +{ + "$id": "https://open-metadata.org/schema/type/createWebhook.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "ChangeEvent", + "description": "This schema defines webhook for receiving events from OpenMetadata", + "type": "object", + "properties": { + "name": { + "description": "Unique name of the application receiving webhook events.", + "type": "string", + "minLength": 1, + "maxLength": 128 + }, + "description": { + "description": "Description of the application", + "type": "string" + }, + "endPoint": { + "description": "Endpoint to receive the webhook events over POST requests.", + "type": "string", + "format": "uri" + }, + "batchSize": { + "description": "Maximum number of events sent in a batch (Default 10).", + "type": "integer", + "default": 10 + }, + "eventFilters": { + "description": "Endpoint to receive the webhook events over POST requests.", + "type": "array", + "items": { + "$ref": "../../type/changeEvent.json#/definitions/eventFilter" + } + }, + "timeout": { + "description": "Connection timeout in seconds. (Default = 10s)", + "type": "integer", + "default": 10 + } + }, + "required": [ + "name", + "endPoint", + "eventFilter" + ], + "additionalProperties": false +} \ No newline at end of file diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/events/webhook.json b/catalog-rest-service/src/main/resources/json/schema/entity/events/webhook.json new file mode 100644 index 00000000000..bafd66f7635 --- /dev/null +++ b/catalog-rest-service/src/main/resources/json/schema/entity/events/webhook.json @@ -0,0 +1,112 @@ +{ + "$id": "https://open-metadata.org/schema/type/webhook.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "ChangeEvent", + "description": "This schema defines webhook for receiving events from OpenMetadata", + "type": "object", + "javaType": "org.openmetadata.catalog.type.Webhook", + "properties": { + "id": { + "description": "Unique ID associated with a webhook subscription.", + "$ref": "../../type/basic.json#/definitions/uuid" + }, + "name": { + "description": "Unique name of the application receiving webhook events.", + "type": "string", + "minLength": 1, + "maxLength": 128 + }, + "description": { + "description": "Description of the application", + "type": "string" + }, + "endPoint": { + "description": "Endpoint to receive the webhook events over POST requests.", + "type": "string", + "format": "uri" + }, + "eventFilters": { + "description": "Endpoint to receive the webhook events over POST requests.", + "type": "array", + "items": { + "$ref": "../../type/changeEvent.json#/definitions/eventFilter" + } + }, + "batchSize": { + "description": "Maximum number of events sent in a batch (Default 10).", + "type": "integer", + "default": 10 + }, + "timeout": { + "description": "Connection timeout in seconds. (Default 10s)", + "type": "integer", + "default": 10 + }, + "enabled": { + "description": "When set to `true`, the webhook event notification is enabled. Set it to `false` to disable the subscription. (Default `true`)", + "type": "boolean", + "default": true + }, + "version" : { + "description": "Metadata version of the entity.", + "$ref": "../../type/entityHistory.json#/definitions/entityVersion" + }, + "updatedAt": { + "description": "Last update time corresponding to the new version of the entity.", + "$ref": "../../type/basic.json#/definitions/dateTime" + }, + "updatedBy": { + "description": "User who made the update.", + "type": "string" + }, + "status": { + "description": "Status is `success` on 200 response to notifications. Status is `error` on bad callback URL, connection failures, and `1xx`, `3xx` response. Status is `awaitingRetry` when previous attempt at delivery timed out or received `4xx`, `5xx` response. Status is `retryLimitReached` after all retries fail.", + "type": "string", + "enum": [ + "success", + "error", + "awaitingRetry", + "retryLimitReached" + ], + "default": "success" + }, + "failureDetails": { + "description": "Failure details are set only when `status` is not `success`", + "type": "object", + "properties": { + "lastFailedAttempt": { + "description": "Last non-successful callback time", + "$ref": "../../type/basic.json#/definitions/dateTime" + }, + "lastFailedStatusCode": { + "description": "Last non-successful activity response code received during callback", + "type": "integer" + }, + "lastFailedReason": { + "description": "Last non-successful activity response reason received during callback", + "type": "string" + }, + "nextAttempt": { + "description": "Next retry will be done at this time. Only valid is `status` is `awaitingRetry`.", + "$ref": "../../type/basic.json#/definitions/dateTime" + } + }, + "additionalProperties": false + }, + "href": { + "description": "Link to this webhook resource.", + "$ref": "../../type/basic.json#/definitions/href" + }, + "changeDescription": { + "description" : "Change that lead to this version of the entity.", + "$ref": "../../type/entityHistory.json#/definitions/changeDescription" + } + }, + "required": [ + "id", + "name", + "endPoint", + "eventFilter" + ], + "additionalProperties": false +} \ No newline at end of file diff --git a/catalog-rest-service/src/main/resources/json/schema/type/changeEvent.json b/catalog-rest-service/src/main/resources/json/schema/type/changeEvent.json index e6e1ce08b5b..e45f6ae76cc 100644 --- a/catalog-rest-service/src/main/resources/json/schema/type/changeEvent.json +++ b/catalog-rest-service/src/main/resources/json/schema/type/changeEvent.json @@ -1,5 +1,5 @@ { - "$id": "https://open-metadata.org/schema/type/auditLog.json", + "$id": "https://open-metadata.org/schema/type/changeEvent.json", "$schema": "http://json-schema.org/draft-07/schema#", "title": "ChangeEvent", "description": "This schema defines the change event type to capture the changes to entities. Entities change due to user activity, such as updating description of a dataset, changing ownership, or adding new tags. Entity also changes due to activities at the metadata sources, such as a new dataset was created, a datasets was deleted, or schema of a dataset is modified. When state of entity changes, an event is produced. These events can be used to build apps and bots that respond to the change from activities.", diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/CatalogApplicationTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/CatalogApplicationTest.java index 3e4549f92d5..3ad98219573 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/CatalogApplicationTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/CatalogApplicationTest.java @@ -22,7 +22,9 @@ import javax.ws.rs.client.WebTarget; import org.glassfish.jersey.client.ClientProperties; import org.glassfish.jersey.client.HttpUrlConnectorProvider; import org.junit.jupiter.api.extension.ExtendWith; +import org.openmetadata.catalog.resources.CollectionRegistry; import org.openmetadata.catalog.resources.EmbeddedMySqlSupport; +import org.openmetadata.catalog.resources.events.WebhookCallbackResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,11 +33,13 @@ import org.slf4j.LoggerFactory; public abstract class CatalogApplicationTest { public static final Logger LOG = LoggerFactory.getLogger(CatalogApplicationTest.class); - private static final String CONFIG_PATH; + protected static final String CONFIG_PATH; public static final DropwizardAppExtension APP; private static final Client client; + protected static final WebhookCallbackResource webhookCallbackResource = new WebhookCallbackResource(); static { + CollectionRegistry.addTestResource(webhookCallbackResource); CONFIG_PATH = ResourceHelpers.resourceFilePath("openmetadata-secure-test.yaml"); APP = new DropwizardAppExtension<>(CatalogApplication.class, CONFIG_PATH); client = ClientBuilder.newClient(); diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/EntityResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/EntityResourceTest.java index f4ceb9fc316..c66ab150e0b 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/EntityResourceTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/EntityResourceTest.java @@ -58,6 +58,7 @@ import javax.json.JsonPatch; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.Response.Status; import org.apache.http.client.HttpResponseException; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; @@ -74,11 +75,13 @@ import org.openmetadata.catalog.entity.services.MessagingService; import org.openmetadata.catalog.entity.services.PipelineService; import org.openmetadata.catalog.entity.teams.Team; import org.openmetadata.catalog.entity.teams.User; +import org.openmetadata.catalog.events.EventPubSub; import org.openmetadata.catalog.exception.CatalogExceptionMessage; import org.openmetadata.catalog.jdbi3.DatabaseServiceRepository.DatabaseServiceEntityInterface; import org.openmetadata.catalog.jdbi3.MessagingServiceRepository.MessagingServiceEntityInterface; import org.openmetadata.catalog.jdbi3.PipelineServiceRepository.PipelineServiceEntityInterface; import org.openmetadata.catalog.resources.events.EventResource.ChangeEventList; +import org.openmetadata.catalog.resources.events.WebhookResourceTest; import org.openmetadata.catalog.resources.services.DatabaseServiceResourceTest; import org.openmetadata.catalog.resources.services.MessagingServiceResourceTest; import org.openmetadata.catalog.resources.services.PipelineServiceResourceTest; @@ -153,6 +156,10 @@ public abstract class EntityResourceTest extends CatalogApplicationTest { @BeforeAll public static void setup(TestInfo test) throws URISyntaxException, IOException { + webhookCallbackResource.clearAllEvents(); + WebhookResourceTest webhookResourceTest = new WebhookResourceTest(); + + webhookResourceTest.createWebhooks(); UserResourceTest userResourceTest = new UserResourceTest(); USER1 = UserResourceTest.createUser(userResourceTest.create(test), authHeaders("test@open-metadata.org")); USER_OWNER1 = new EntityReference().withId(USER1.getId()).withType("user"); @@ -236,6 +243,14 @@ public abstract class EntityResourceTest extends CatalogApplicationTest { TIER2_TAG_LABEL = new TagLabel().withTagFQN(tag.getFullyQualifiedName()).withDescription(tag.getDescription()); } + @AfterAll + public static void afterAllTests() throws Exception { + EventPubSub.shutdown(); + // Ensure webhooks are in the right state + new WebhookResourceTest().validateWebhookEvents(); + APP.getEnvironment().getApplicationContext().getServer().stop(); + } + /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Methods to be overridden entity test class /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -1003,8 +1018,6 @@ public abstract class EntityResourceTest extends CatalogApplicationTest { iteration++; } - LOG.info("Did not find change event {} {} {}", updateTime.getTime(), entityInterface.getId(), expectedEventType); - assertNotNull( changeEvent, "Expected change event " diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/events/WebhookCallbackResource.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/events/WebhookCallbackResource.java new file mode 100644 index 00000000000..7871981902c --- /dev/null +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/events/WebhookCallbackResource.java @@ -0,0 +1,96 @@ +package org.openmetadata.catalog.resources.events; + +import io.swagger.annotations.Api; +import java.util.concurrent.ConcurrentLinkedQueue; +import javax.ws.rs.Consumes; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.SecurityContext; +import javax.ws.rs.core.UriInfo; +import org.openmetadata.catalog.type.ChangeEvent; + +@Path("v1/test/webhook") +@Api(value = "Topic data asset collection", tags = "Topic data asset collection") +@Produces(MediaType.APPLICATION_JSON) +@Consumes(MediaType.APPLICATION_JSON) +public class WebhookCallbackResource { + private final ConcurrentLinkedQueue changeEvents = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue changeEventsSlowServer = new ConcurrentLinkedQueue<>(); + + @POST + @Path("/ignore") + public Response receiveEventIgnore( + @Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) { + return Response.ok().build(); + } + + @POST + public Response receiveEvent( + @Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) { + changeEvents.addAll(events.getData()); + return Response.ok().build(); + } + + @POST + @Path("/slowServer") + public Response receiveEventWithDelay( + @Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + changeEventsSlowServer.addAll(events.getData()); + return Response.ok().build(); + } + + @POST + @Path("/timeout") + public Response receiveEventWithTimeout( + @Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) { + try { + Thread.sleep(11 * 1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return Response.ok().build(); + } + + @POST + @Path("/300") + public Response receiveEvent300( + @Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) { + return Response.status(Response.Status.MOVED_PERMANENTLY).build(); + } + + @POST + @Path("/400") + public Response receiveEvent400( + @Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) { + return Response.status(Response.Status.BAD_REQUEST).build(); + } + + @POST + @Path("/500") + public Response receiveEvent500( + @Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) { + return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); + } + + public ConcurrentLinkedQueue getEvents() { + return changeEvents; + } + + public ConcurrentLinkedQueue getEventsSlowServer() { + return changeEventsSlowServer; + } + + public void clearAllEvents() { + changeEvents.clear(); + changeEventsSlowServer.clear(); + } +} diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/events/WebhookResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/events/WebhookResourceTest.java new file mode 100644 index 00000000000..8862eadd7c2 --- /dev/null +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/events/WebhookResourceTest.java @@ -0,0 +1,184 @@ +/* + * Copyright 2021 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.catalog.resources.events; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.openmetadata.catalog.util.TestUtils.adminAuthHeaders; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; +import org.apache.http.client.HttpResponseException; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestInfo; +import org.openmetadata.catalog.Entity; +import org.openmetadata.catalog.api.events.CreateWebhook; +import org.openmetadata.catalog.jdbi3.WebhookRepository.WebhookEntityInterface; +import org.openmetadata.catalog.resources.EntityResourceTest; +import org.openmetadata.catalog.resources.events.WebhookResource.WebhookList; +import org.openmetadata.catalog.type.ChangeEvent; +import org.openmetadata.catalog.type.EntityReference; +import org.openmetadata.catalog.type.EventFilter; +import org.openmetadata.catalog.type.EventType; +import org.openmetadata.catalog.type.Webhook; +import org.openmetadata.catalog.type.Webhook.Status; +import org.openmetadata.catalog.util.EntityInterface; +import org.openmetadata.catalog.util.TestUtils; + +public class WebhookResourceTest extends EntityResourceTest { + public static List ALL_EVENTS_FILTER = new ArrayList<>(); + + static { + ALL_EVENTS_FILTER.add(new EventFilter().withEventType(EventType.ENTITY_CREATED)); + ALL_EVENTS_FILTER.add(new EventFilter().withEventType(EventType.ENTITY_UPDATED)); + ALL_EVENTS_FILTER.add(new EventFilter().withEventType(EventType.ENTITY_DELETED)); + } + + public WebhookResourceTest() { + super(Entity.WEBHOOK, Webhook.class, WebhookList.class, "webhook", "", false, false, false); + supportsPatch = false; + } + + @BeforeAll + public static void setup(TestInfo test) throws IOException, URISyntaxException { + EntityResourceTest.setup(test); + } + + @Override + public CreateWebhook createRequest(String name, String description, String displayName, EntityReference owner) + throws URISyntaxException { + String uri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook/ignore"; + return new CreateWebhook() + .withName(name) + .withDescription(description) + .withEventFilters(ALL_EVENTS_FILTER) + .withEndPoint(URI.create(uri)); + } + + @Override + public void validateCreatedEntity(Webhook webhook, Object request, Map authHeaders) + throws HttpResponseException { + CreateWebhook createRequest = (CreateWebhook) request; + validateCommonEntityFields( + getEntityInterface(webhook), createRequest.getDescription(), TestUtils.getPrincipal(authHeaders), null); + assertEquals(createRequest.getName(), webhook.getName()); + assertEquals(createRequest.getEventFilters(), webhook.getEventFilters()); + } + + @Override + public void validateUpdatedEntity(Webhook webhook, Object request, Map authHeaders) + throws HttpResponseException { + validateCreatedEntity(webhook, request, authHeaders); + } + + @Override + public void compareEntities(Webhook expected, Webhook updated, Map authHeaders) + throws HttpResponseException { + // Patch not supported + } + + @Override + public EntityInterface getEntityInterface(Webhook entity) { + return new WebhookEntityInterface(entity); + } + + @Override + public void validateGetWithDifferentFields(Webhook entity, boolean byName) throws HttpResponseException {} + + @Override + public void assertFieldChange(String fieldName, Object expected, Object actual) throws IOException {} + + public void createWebhooks() throws IOException, URISyntaxException { + // Valid webhook callback + String baseUri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook"; + CreateWebhook createWebhook = + createRequest("validWebhook", "validWebhook", "", null).withEndPoint(URI.create(baseUri)); + createEntity(createWebhook, adminAuthHeaders()); + + // Webhook callback that responds slowly with 5 seconds delay + createWebhook.withName("slowServer").withEndPoint(URI.create(baseUri + "/slowServer")); + createEntity(createWebhook, adminAuthHeaders()); + + // Webhook callback that responds slowly with after 12 seconds (beyond connection + read response timeout) + createWebhook.withName("callbackTimeout").withEndPoint(URI.create(baseUri + "/timeout")); + createEntity(createWebhook, adminAuthHeaders()); + + // Webhook callback that responds with 300 error + createWebhook.withName("callbackResponse300").withEndPoint(URI.create(baseUri + "/300")); + createEntity(createWebhook, adminAuthHeaders()); + + // Webhook callback that responds with 400 error + createWebhook.withName("callbackResponse400").withEndPoint(URI.create(baseUri + "/400")); + createEntity(createWebhook, adminAuthHeaders()); + + // Webhook callback that responds with 400 error + createWebhook.withName("callbackResponse500").withEndPoint(URI.create(baseUri + "/500")); + createEntity(createWebhook, adminAuthHeaders()); + + // Webhook callback with invalid endpoint URI + createWebhook.withName("invalidEndpoint").withEndPoint(URI.create("http://invalidUnknownHost")); + createEntity(createWebhook, adminAuthHeaders()); + } + + public void validateWebhookEvents() throws HttpResponseException { + // Check the healthy callback server received all the change events + ConcurrentLinkedQueue callbackEvents = webhookCallbackResource.getEvents(); + List actualEvents = + getChangeEvents(null, null, null, callbackEvents.peek().getDateTime(), adminAuthHeaders()).getData(); + assertEquals(actualEvents.size(), callbackEvents.size()); + webhookCallbackResource.clearAllEvents(); + + // TODO enable this test + // Check the slow callback server received all the change events +// callbackEvents = webhookCallbackResource.getEventsSlowServer(); +// actualEvents = getChangeEvents(null, null, null, callbackEvents.peek().getDateTime(), +// adminAuthHeaders()).getData(); +// assertEquals(actualEvents.size() - 1, callbackEvents.size()); +// webhookCallbackResource.clearAllEvents(); + + // Check all webhook status + Webhook webhook = getEntityByName("validWebhook", "", adminAuthHeaders()); + assertEquals(Status.SUCCESS, webhook.getStatus()); + assertNull(webhook.getFailureDetails()); + + webhook = getEntityByName("slowServer", "", adminAuthHeaders()); + assertEquals(Status.SUCCESS, webhook.getStatus()); + assertNull(webhook.getFailureDetails()); + + webhook = getEntityByName("callbackResponse300", "", adminAuthHeaders()); + assertEquals(Status.ERROR, webhook.getStatus()); + assertEquals(301, webhook.getFailureDetails().getLastFailedStatusCode()); + assertEquals("Moved Permanently", webhook.getFailureDetails().getLastFailedReason()); + + webhook = getEntityByName("callbackResponse400", "", adminAuthHeaders()); + assertEquals(Status.AWAITING_RETRY, webhook.getStatus()); + assertEquals(400, webhook.getFailureDetails().getLastFailedStatusCode()); + assertEquals("Bad Request", webhook.getFailureDetails().getLastFailedReason()); + + webhook = getEntityByName("callbackResponse500", "", adminAuthHeaders()); + assertEquals(Status.AWAITING_RETRY, webhook.getStatus()); + assertEquals(500, webhook.getFailureDetails().getLastFailedStatusCode()); + assertEquals("Internal Server Error", webhook.getFailureDetails().getLastFailedReason()); + + webhook = getEntityByName("invalidEndpoint", "", adminAuthHeaders()); + assertEquals(Status.ERROR, webhook.getStatus()); + assertNull(webhook.getFailureDetails().getLastFailedStatusCode()); + assertEquals("UnknownHostException", webhook.getFailureDetails().getLastFailedReason()); + } +} diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/util/TestUtils.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/util/TestUtils.java index a42ff8a7dec..7a2e513c680 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/util/TestUtils.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/util/TestUtils.java @@ -18,7 +18,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.openmetadata.catalog.security.SecurityUtil.addHeaders; import java.net.URI; import java.net.URISyntaxException; @@ -164,54 +163,58 @@ public final class TestUtils { } public static void post(WebTarget target, Map headers) throws HttpResponseException { - Response response = addHeaders(target, headers).post(null); + Response response = SecurityUtil.addHeaders(target, headers).post(null); readResponse(response, Status.CREATED.getStatusCode()); } public static void post(WebTarget target, K request, Map headers) throws HttpResponseException { - Response response = addHeaders(target, headers).post(Entity.entity(request, MediaType.APPLICATION_JSON)); + Response response = + SecurityUtil.addHeaders(target, headers).post(Entity.entity(request, MediaType.APPLICATION_JSON)); readResponse(response, Status.CREATED.getStatusCode()); } public static T post(WebTarget target, K request, Class clz, Map headers) throws HttpResponseException { - Response response = addHeaders(target, headers).post(Entity.entity(request, MediaType.APPLICATION_JSON)); + Response response = + SecurityUtil.addHeaders(target, headers).post(Entity.entity(request, MediaType.APPLICATION_JSON)); return readResponse(response, clz, Status.CREATED.getStatusCode()); } public static T patch(WebTarget target, JsonPatch patch, Class clz, Map headers) throws HttpResponseException { Response response = - addHeaders(target, headers) + SecurityUtil.addHeaders(target, headers) .method("PATCH", Entity.entity(patch.toJsonArray().toString(), MediaType.APPLICATION_JSON_PATCH_JSON_TYPE)); return readResponse(response, clz, Status.OK.getStatusCode()); } public static void put(WebTarget target, K request, Status expectedStatus, Map headers) throws HttpResponseException { - Response response = addHeaders(target, headers).method("PUT", Entity.entity(request, MediaType.APPLICATION_JSON)); + Response response = + SecurityUtil.addHeaders(target, headers).method("PUT", Entity.entity(request, MediaType.APPLICATION_JSON)); readResponse(response, expectedStatus.getStatusCode()); } public static T put( WebTarget target, K request, Class clz, Status expectedStatus, Map headers) throws HttpResponseException { - Response response = addHeaders(target, headers).method("PUT", Entity.entity(request, MediaType.APPLICATION_JSON)); + Response response = + SecurityUtil.addHeaders(target, headers).method("PUT", Entity.entity(request, MediaType.APPLICATION_JSON)); return readResponse(response, clz, expectedStatus.getStatusCode()); } public static T get(WebTarget target, Class clz, Map headers) throws HttpResponseException { - final Response response = addHeaders(target, headers).get(); + final Response response = SecurityUtil.addHeaders(target, headers).get(); return readResponse(response, clz, Status.OK.getStatusCode()); } public static T delete(WebTarget target, Class clz, Map headers) throws HttpResponseException { - final Response response = addHeaders(target, headers).delete(); + final Response response = SecurityUtil.addHeaders(target, headers).delete(); return readResponse(response, clz, Status.OK.getStatusCode()); } public static void delete(WebTarget target, Map headers) throws HttpResponseException { - final Response response = addHeaders(target, headers).delete(); + final Response response = SecurityUtil.addHeaders(target, headers).delete(); if (!HttpStatus.isSuccess(response.getStatus())) { readResponseError(response); } diff --git a/common/src/main/java/org/openmetadata/common/utils/CommonUtil.java b/common/src/main/java/org/openmetadata/common/utils/CommonUtil.java index db988ac1ac9..a4905a7d03f 100644 --- a/common/src/main/java/org/openmetadata/common/utils/CommonUtil.java +++ b/common/src/main/java/org/openmetadata/common/utils/CommonUtil.java @@ -91,6 +91,14 @@ public final class CommonUtil { return calendar.getTime(); } + /** Get date after {@code days} from the given date or before i{@code days} when it is negative */ + public static Date getDateByOffsetSeconds(Date date, int seconds) throws ParseException { + Calendar calendar = Calendar.getInstance(); + calendar.setTime(date); + calendar.add(Calendar.SECOND, seconds); + return calendar.getTime(); + } + /** Get date after {@code days} from the given date or before i{@code days} when it is negative */ public static Date getDateByOffset(DateFormat dateFormat, String strDate, int days) throws ParseException { Date date = dateFormat.parse(strDate); diff --git a/pom.xml b/pom.xml index e1d74d354ce..f87ddbb42c1 100644 --- a/pom.xml +++ b/pom.xml @@ -378,7 +378,11 @@ jackson-datatype-jsr353 ${jackson.version} - + + com.lmax + disruptor + 3.4.4 + @@ -422,7 +426,7 @@ org.apache.maven.plugins maven-jxr-plugin - 2.3 + 3.1.1 @@ -441,7 +445,7 @@ com.puppycrawl.tools checkstyle - 9.1 + 9.2