From 160806c01339059e90ccecf470bb7e376eb366c6 Mon Sep 17 00:00:00 2001 From: mohitdeuex <105265192+mohitdeuex@users.noreply.github.com> Date: Thu, 28 Jul 2022 23:45:12 +0530 Subject: [PATCH] [Backend][Slack] Slack integration with UI (#6261) * [Backend][Slack] Slack integration with UI * [Backend][Slack] Slack integration with UI - removed unused jdbi * [Backend][Slack] Slack integration with UI - URL fix + init webhook url during initialization * [Backend][Slack] Slack integration with UI - remove slack * [Backend][Slack] add Slack generic to exisiting jsons * [Backend][Slack] PG fix and Java CheckStyle --- .../v004__create_db_connection_info.sql | 3 + .../v004__create_db_connection_info.sql | 3 + .../catalog/CatalogApplication.java | 18 +- .../events/AbstractEventPublisher.java | 20 +- .../catalog/events/EventPublisher.java | 3 +- .../catalog/events/WebhookPublisher.java | 208 ++++++++++++++++ .../catalog/jdbi3/CollectionDAO.java | 3 + .../catalog/jdbi3/EntityRepository.java | 2 +- .../catalog/jdbi3/WebhookRepository.java | 230 +----------------- .../resources/events/WebhookResource.java | 19 +- .../slack/SlackWebhookEventPublisher.java | 44 +--- .../catalog/util/ChangeEventParser.java | 18 +- .../json/schema/api/events/createWebhook.json | 14 ++ .../json/schema/entity/events/webhook.json | 14 ++ 14 files changed, 311 insertions(+), 288 deletions(-) create mode 100644 catalog-rest-service/src/main/java/org/openmetadata/catalog/events/WebhookPublisher.java diff --git a/bootstrap/sql/com.mysql.cj.jdbc.Driver/v004__create_db_connection_info.sql b/bootstrap/sql/com.mysql.cj.jdbc.Driver/v004__create_db_connection_info.sql index 5baefefa86b..3fb3000bbdc 100644 --- a/bootstrap/sql/com.mysql.cj.jdbc.Driver/v004__create_db_connection_info.sql +++ b/bootstrap/sql/com.mysql.cj.jdbc.Driver/v004__create_db_connection_info.sql @@ -31,3 +31,6 @@ WHERE serviceType = 'Looker'; UPDATE dashboard_service_entity SET json = JSON_REMOVE(json, '$.connection.config.env') WHERE serviceType = 'Looker'; + +UPDATE webhook_entity +SET json = JSON_INSERT(json, '$.webhookType', 'generic'); diff --git a/bootstrap/sql/org.postgresql.Driver/v004__create_db_connection_info.sql b/bootstrap/sql/org.postgresql.Driver/v004__create_db_connection_info.sql index ffc1bce53e6..397ed7fb958 100644 --- a/bootstrap/sql/org.postgresql.Driver/v004__create_db_connection_info.sql +++ b/bootstrap/sql/org.postgresql.Driver/v004__create_db_connection_info.sql @@ -25,3 +25,6 @@ WHERE serviceType = 'Looker' UPDATE dashboard_service_entity SET json = json::jsonb #- '{connection,config,username}' #- '{connection,config,password}' #- '{connection,config,env}' WHERE serviceType = 'Looker'; + +UPDATE webhook_entity +SET json = JSONB_SET(json::jsonb, '{webhookType}', '"generic"', true); \ No newline at end of file diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/CatalogApplication.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/CatalogApplication.java index c29b90d70dd..2f08f580caa 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/CatalogApplication.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/CatalogApplication.java @@ -79,8 +79,6 @@ import org.openmetadata.catalog.security.AuthorizerConfiguration; import org.openmetadata.catalog.security.NoopAuthorizer; import org.openmetadata.catalog.security.NoopFilter; import org.openmetadata.catalog.security.jwt.JWTTokenGenerator; -import org.openmetadata.catalog.slack.SlackPublisherConfiguration; -import org.openmetadata.catalog.slack.SlackWebhookEventPublisher; import org.openmetadata.catalog.socket.FeedServlet; import org.openmetadata.catalog.socket.SocketAddressFilter; import org.openmetadata.catalog.socket.WebSocketManager; @@ -148,13 +146,14 @@ public class CatalogApplication extends Application { environment.jersey().register(new EarlyEofExceptionMapper()); environment.jersey().register(JsonMappingExceptionMapper.class); environment.healthChecks().register("OpenMetadataServerHealthCheck", new OpenMetadataServerHealthCheck()); + // start event hub before registering publishers + EventPubSub.start(); + registerResources(catalogConfig, environment, jdbi, secretsManager); // Register Event Handler registerEventFilter(catalogConfig, environment, jdbi); environment.lifecycle().manage(new ManagedShutdown()); - // start event hub before registering publishers - EventPubSub.start(); // Register Event publishers registerEventPublisher(catalogConfig); @@ -252,17 +251,6 @@ public class CatalogApplication extends Application { new ElasticSearchEventPublisher(catalogApplicationConfig.getElasticSearchConfiguration()); EventPubSub.addEventHandler(elasticSearchEventPublisher); } - // register slack Event publishers - if (catalogApplicationConfig.getSlackEventPublishers() != null) { - for (SlackPublisherConfiguration slackPublisherConfiguration : - catalogApplicationConfig.getSlackEventPublishers()) { - if (slackPublisherConfiguration.getWebhookUrl() != null - && !slackPublisherConfiguration.getWebhookUrl().isEmpty()) { - SlackWebhookEventPublisher slackPublisher = new SlackWebhookEventPublisher(slackPublisherConfiguration); - EventPubSub.addEventHandler(slackPublisher); - } - } - } } private void registerResources( diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/AbstractEventPublisher.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/AbstractEventPublisher.java index 57abdca2292..58cef2dc634 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/AbstractEventPublisher.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/AbstractEventPublisher.java @@ -13,16 +13,16 @@ import org.openmetadata.catalog.type.EventType; @Slf4j public abstract class AbstractEventPublisher implements EventPublisher { // Backoff timeout in seconds. Delivering events is retried 5 times. - private static final int BACKOFF_NORMAL = 0; - private static final int BACKOFF_3_SECONDS = 3 * 1000; - private static final int BACKOFF_30_SECONDS = 30 * 1000; - private static final int BACKOFF_5_MINUTES = 5 * 60 * 1000; - private static final int BACKOFF_1_HOUR = 60 * 60 * 1000; - private static final int BACKOFF_24_HOUR = 24 * 60 * 60 * 1000; + protected static final int BACKOFF_NORMAL = 0; + protected static final int BACKOFF_3_SECONDS = 3 * 1000; + protected static final int BACKOFF_30_SECONDS = 30 * 1000; + protected static final int BACKOFF_5_MINUTES = 5 * 60 * 1000; + protected static final int BACKOFF_1_HOUR = 60 * 60 * 1000; + protected static final int BACKOFF_24_HOUR = 24 * 60 * 60 * 1000; - private int currentBackoffTime = BACKOFF_NORMAL; - private final List batch = new ArrayList<>(); - private final ConcurrentHashMap> filter = new ConcurrentHashMap<>(); + protected int currentBackoffTime = BACKOFF_NORMAL; + protected final List batch = new ArrayList<>(); + protected final ConcurrentHashMap> filter = new ConcurrentHashMap<>(); private final int batchSize; protected AbstractEventPublisher(int batchSize, List filters) { @@ -63,7 +63,7 @@ public abstract class AbstractEventPublisher implements EventPublisher { } } - private void setNextBackOff() { + protected void setNextBackOff() { if (currentBackoffTime == BACKOFF_NORMAL) { currentBackoffTime = BACKOFF_3_SECONDS; } else if (currentBackoffTime == BACKOFF_3_SECONDS) { diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/EventPublisher.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/EventPublisher.java index 6e5aeaa4518..92ee58eb7bf 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/EventPublisher.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/EventPublisher.java @@ -2,10 +2,9 @@ package org.openmetadata.catalog.events; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.LifecycleAware; -import org.openmetadata.catalog.events.errors.EventPublisherException; import org.openmetadata.catalog.resources.events.EventResource.ChangeEventList; public interface EventPublisher extends EventHandler, LifecycleAware { - void publish(ChangeEventList events) throws EventPublisherException; + void publish(ChangeEventList events) throws Exception; } diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/WebhookPublisher.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/WebhookPublisher.java new file mode 100644 index 00000000000..9b26f6a5abc --- /dev/null +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/WebhookPublisher.java @@ -0,0 +1,208 @@ +/* + * Copyright 2021 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.catalog.events; + +import com.lmax.disruptor.BatchEventProcessor; +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.Invocation; +import javax.ws.rs.core.Response; +import lombok.extern.slf4j.Slf4j; +import org.openmetadata.catalog.events.errors.EventPublisherException; +import org.openmetadata.catalog.jdbi3.CollectionDAO; +import org.openmetadata.catalog.jdbi3.EntityRepository; +import org.openmetadata.catalog.jdbi3.WebhookRepository; +import org.openmetadata.catalog.jdbi3.WebhookRepository.WebhookUpdater; +import org.openmetadata.catalog.resources.events.EventResource; +import org.openmetadata.catalog.security.SecurityUtil; +import org.openmetadata.catalog.type.FailureDetails; +import org.openmetadata.catalog.type.Webhook; +import org.openmetadata.catalog.util.JsonUtils; +import org.openmetadata.catalog.util.RestUtil; +import org.openmetadata.common.utils.CommonUtil; + +/** + * WebhookPublisher publishes events to the webhook endpoint using POST http requests. There is one instance of + * WebhookPublisher per webhook subscription. Each WebhookPublish is an EventHandler that runs in a separate thread and + * receives events from LMAX Disruptor {@link EventPubSub} through {@link BatchEventProcessor}. + * + *

The failures during callback to Webhook endpoints are handled in this class as follows: + * + *

    + *
  • Webhook with unresolvable URLs are marked as "failed" and no further attempt is made to deliver the events + *
  • Webhook callbacks that return 3xx are marked as "failed" and no further attempt is made to deliver the events + *
  • Webhook callbacks that return 4xx, 5xx, or timeout are marked as "awaitingRetry" and 5 retry attempts are made + * to deliver the events with the following backoff - 3 seconds, 30 seconds, 5 minutes, 1 hours, and 24 hour. When + * all the 5 delivery attempts fail, the webhook state is marked as "retryLimitReached" and no further attempt is + * made to deliver the events. + *
+ */ +@Slf4j +public class WebhookPublisher extends AbstractEventPublisher { + private final CountDownLatch shutdownLatch = new CountDownLatch(1); + private final Webhook webhook; + private BatchEventProcessor processor; + private Client client; + private CollectionDAO daoCollection; + + private WebhookRepository webhookRepository; + + public WebhookPublisher(Webhook webhook, CollectionDAO dao) { + super(webhook.getBatchSize(), webhook.getEventFilters()); + this.webhook = webhook; + this.daoCollection = dao; + this.webhookRepository = new WebhookRepository(dao); + } + + @Override + public void onStart() { + createClient(); + webhook.withFailureDetails(new FailureDetails()); + LOG.info("Webhook-lifecycle-onStart {}", webhook.getName()); + } + + @Override + public void onShutdown() { + currentBackoffTime = BACKOFF_NORMAL; + client.close(); + client = null; + shutdownLatch.countDown(); + LOG.info("Webhook-lifecycle-onShutdown {}", webhook.getName()); + } + + public synchronized Webhook getWebhook() { + return webhook; + } + + public synchronized void updateWebhook(Webhook updatedWebhook) { + currentBackoffTime = BACKOFF_NORMAL; + webhook.setTimeout(updatedWebhook.getTimeout()); + webhook.setBatchSize(updatedWebhook.getBatchSize()); + webhook.setEndpoint(updatedWebhook.getEndpoint()); + webhook.setEventFilters(updatedWebhook.getEventFilters()); + updateFilter(); + createClient(); + } + + private void updateFilter() { + filter.clear(); + webhook.getEventFilters().forEach(f -> filter.put(f.getEventType(), f.getEntities())); + } + + private void setErrorStatus(Long attemptTime, Integer statusCode, String reason) throws IOException { + if (!attemptTime.equals(webhook.getFailureDetails().getLastFailedAt())) { + setStatus(Webhook.Status.FAILED, attemptTime, statusCode, reason, null); + } + throw new RuntimeException(reason); + } + + private void setAwaitingRetry(Long attemptTime, int statusCode, String reason) throws IOException { + if (!attemptTime.equals(webhook.getFailureDetails().getLastFailedAt())) { + setStatus(Webhook.Status.AWAITING_RETRY, attemptTime, statusCode, reason, attemptTime + currentBackoffTime); + } + } + + private void setStatus(Webhook.Status status, Long attemptTime, Integer statusCode, String reason, Long timestamp) + throws IOException { + Webhook stored = daoCollection.webhookDAO().findEntityById(webhook.getId()); + webhook.setStatus(status); + webhook + .getFailureDetails() + .withLastFailedAt(attemptTime) + .withLastFailedStatusCode(statusCode) + .withLastFailedReason(reason) + .withNextAttempt(timestamp); + + // TODO: Fix this + WebhookUpdater updater = webhookRepository.getUpdater(stored, webhook, EntityRepository.Operation.PUT); + updater.update(); + } + + private synchronized void createClient() { + if (client != null) { + client.close(); + client = null; + } + ClientBuilder clientBuilder = ClientBuilder.newBuilder(); + clientBuilder.connectTimeout(10, TimeUnit.SECONDS); + clientBuilder.readTimeout(12, TimeUnit.SECONDS); + client = clientBuilder.build(); + } + + public void awaitShutdown() throws InterruptedException { + LOG.info("Awaiting shutdown webhook-lifecycle {}", webhook.getName()); + shutdownLatch.await(5, TimeUnit.SECONDS); + } + + public void setProcessor(BatchEventProcessor processor) { + this.processor = processor; + } + + public BatchEventProcessor getProcessor() { + return processor; + } + + private Invocation.Builder getTarget() { + Map authHeaders = SecurityUtil.authHeaders("admin@open-metadata.org"); + return SecurityUtil.addHeaders(client.target(webhook.getEndpoint()), authHeaders); + } + + @Override + public void publish(EventResource.ChangeEventList list) throws EventPublisherException, IOException { + long attemptTime = System.currentTimeMillis(); + try { + String json = JsonUtils.pojoToJson(list); + Response response; + if (webhook.getSecretKey() != null && !webhook.getSecretKey().isEmpty()) { + String hmac = "sha256=" + CommonUtil.calculateHMAC(webhook.getSecretKey(), json); + response = getTarget().header(RestUtil.SIGNATURE_HEADER, hmac).post(javax.ws.rs.client.Entity.json(json)); + } else { + response = getTarget().post(javax.ws.rs.client.Entity.json(json)); + } + LOG.info( + "Webhook {}:{}:{} received response {}", + webhook.getName(), + webhook.getStatus(), + batch.size(), + response.getStatusInfo()); + // 2xx response means call back is successful + if (response.getStatus() >= 200 && response.getStatus() < 300) { // All 2xx responses + webhook.getFailureDetails().setLastSuccessfulAt(batch.get(batch.size() - 1).getTimestamp()); + batch.clear(); + if (webhook.getStatus() != Webhook.Status.ACTIVE) { + setStatus(Webhook.Status.ACTIVE, null, null, null, null); + } + // 3xx response/redirection is not allowed for callback. Set the webhook state as in error + } else if (response.getStatus() >= 300 && response.getStatus() < 400) { + setErrorStatus(attemptTime, response.getStatus(), response.getStatusInfo().getReasonPhrase()); + // 4xx, 5xx response retry delivering events after timeout + } else if (response.getStatus() >= 300 && response.getStatus() < 600) { + setNextBackOff(); + setAwaitingRetry(attemptTime, response.getStatus(), response.getStatusInfo().getReasonPhrase()); + Thread.sleep(currentBackoffTime); + } + } catch (Exception ex) { + Throwable cause = ex.getCause(); + if (cause.getClass() == UnknownHostException.class) { + LOG.warn("Invalid webhook {} endpoint {}", webhook.getName(), webhook.getEndpoint()); + setErrorStatus(attemptTime, null, "UnknownHostException"); + } + } + } +} diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/CollectionDAO.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/CollectionDAO.java index 48d07d3c5af..e49cf086cac 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/CollectionDAO.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/CollectionDAO.java @@ -1376,6 +1376,9 @@ public interface CollectionDAO { default boolean supportsSoftDelete() { return false; } + + @SqlQuery("SELECT json FROM ") + List listAllWebhooks(@Define("table") String table); } interface TagCategoryDAO extends EntityDAO { diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/EntityRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/EntityRepository.java index 07ca70452f6..4d03ddfce01 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/EntityRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/EntityRepository.java @@ -1092,7 +1092,7 @@ public abstract class EntityRepository { return ingestionPipelines; } - enum Operation { + public enum Operation { PUT, PATCH, SOFT_DELETE; diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/WebhookRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/WebhookRepository.java index 6a17311dac0..9126363b7f6 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/WebhookRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/WebhookRepository.java @@ -18,40 +18,23 @@ import static org.openmetadata.catalog.util.EntityUtil.failureDetailsMatch; import com.fasterxml.jackson.core.JsonProcessingException; import com.lmax.disruptor.BatchEventProcessor; -import com.lmax.disruptor.EventHandler; -import com.lmax.disruptor.LifecycleAware; import java.io.IOException; -import java.net.UnknownHostException; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import javax.ws.rs.ProcessingException; -import javax.ws.rs.client.Client; -import javax.ws.rs.client.ClientBuilder; -import javax.ws.rs.client.Invocation.Builder; -import javax.ws.rs.core.Response; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.openmetadata.catalog.Entity; import org.openmetadata.catalog.events.EventPubSub; import org.openmetadata.catalog.events.EventPubSub.ChangeEventHolder; -import org.openmetadata.catalog.resources.events.EventResource.ChangeEventList; +import org.openmetadata.catalog.events.WebhookPublisher; import org.openmetadata.catalog.resources.events.WebhookResource; -import org.openmetadata.catalog.security.SecurityUtil; -import org.openmetadata.catalog.type.ChangeEvent; +import org.openmetadata.catalog.slack.SlackWebhookEventPublisher; import org.openmetadata.catalog.type.EventFilter; -import org.openmetadata.catalog.type.EventType; -import org.openmetadata.catalog.type.FailureDetails; import org.openmetadata.catalog.type.Webhook; import org.openmetadata.catalog.type.Webhook.Status; import org.openmetadata.catalog.util.EntityUtil.Fields; -import org.openmetadata.catalog.util.JsonUtils; -import org.openmetadata.catalog.util.RestUtil; -import org.openmetadata.common.utils.CommonUtil; @Slf4j public class WebhookRepository extends EntityRepository { @@ -101,7 +84,13 @@ public class WebhookRepository extends EntityRepository { webhook.setStatus(Status.DISABLED); return; } - WebhookPublisher publisher = new WebhookPublisher(webhook); + + WebhookPublisher publisher; + if (webhook.getWebhookType() == Webhook.WebhookType.slack) { + publisher = new SlackWebhookEventPublisher(webhook, daoCollection); + } else { + publisher = new WebhookPublisher(webhook, daoCollection); + } BatchEventProcessor processor = EventPubSub.addEventHandler(publisher); publisher.setProcessor(processor); webhookPublisherMap.put(webhook.getId(), publisher); @@ -145,207 +134,6 @@ public class WebhookRepository extends EntityRepository { webhookPublisherMap.remove(id); } - /** - * WebhookPublisher publishes events to the webhook endpoint using POST http requests. There is one instance of - * WebhookPublisher per webhook subscription. Each WebhookPublish is an EventHandler that runs in a separate thread - * and receives events from LMAX Disruptor {@link EventPubSub} through {@link BatchEventProcessor}. - * - *

The failures during callback to Webhook endpoints are handled in this class as follows: - * - *

    - *
  • Webhook with unresolvable URLs are marked as "failed" and no further attempt is made to deliver the events - *
  • Webhook callbacks that return 3xx are marked as "failed" and no further attempt is made to deliver the events - *
  • Webhook callbacks that return 4xx, 5xx, or timeout are marked as "awaitingRetry" and 5 retry attempts are - * made to deliver the events with the following backoff - 3 seconds, 30 seconds, 5 minutes, 1 hours, and 24 - * hour. When all the 5 delivery attempts fail, the webhook state is marked as "retryLimitReached" and no - * further attempt is made to deliver the events. - *
- */ - public class WebhookPublisher implements EventHandler, LifecycleAware { - // Backoff timeout in seconds. Delivering events is retried 5 times. - private static final int BACKOFF_NORMAL = 0; - private static final int BACKOFF_3_SECONDS = 3 * 1000; - private static final int BACKOFF_30_SECONDS = 30 * 1000; - private static final int BACKOFF_5_MINUTES = 5 * 60 * 1000; - private static final int BACKOFF_1_HOUR = 60 * 60 * 1000; - private static final int BACKOFF_24_HOUR = 24 * 60 * 60 * 1000; - - private int currentBackoffTime = BACKOFF_NORMAL; - private final CountDownLatch shutdownLatch = new CountDownLatch(1); - private final Webhook webhook; - private final List batch = new ArrayList<>(); - private BatchEventProcessor processor; - private Client client; - private final ConcurrentHashMap> filter = new ConcurrentHashMap<>(); - - public WebhookPublisher(Webhook webhook) { - this.webhook = webhook; - initFilter(); - } - - @Override - public void onStart() { - createClient(); - webhook.withFailureDetails(new FailureDetails()); - LOG.info("Webhook-lifecycle-onStart {}", webhook.getName()); - } - - @Override - public void onEvent(ChangeEventHolder changeEventHolder, long sequence, boolean endOfBatch) throws Exception { - // Ignore events that don't match the webhook event filters - ChangeEvent changeEvent = changeEventHolder.get(); - List entities = filter.get(changeEvent.getEventType()); - if (entities == null || (!entities.get(0).equals("*") && !entities.contains(changeEvent.getEntityType()))) { - return; - } - - // Batch until either the batch has ended or batch size has reached the max size - batch.add(changeEventHolder.get()); - if (!endOfBatch && batch.size() < webhook.getBatchSize()) { - return; - } - - ChangeEventList list = new ChangeEventList(batch, null, null, batch.size()); - long attemptTime = System.currentTimeMillis(); - try { - String json = JsonUtils.pojoToJson(list); - Response response; - if (webhook.getSecretKey() != null && !webhook.getSecretKey().isEmpty()) { - String hmac = "sha256=" + CommonUtil.calculateHMAC(webhook.getSecretKey(), json); - response = getTarget().header(RestUtil.SIGNATURE_HEADER, hmac).post(javax.ws.rs.client.Entity.json(json)); - } else { - response = getTarget().post(javax.ws.rs.client.Entity.json(json)); - } - LOG.info( - "Webhook {}:{}:{} received response {}", - webhook.getName(), - webhook.getStatus(), - batch.size(), - response.getStatusInfo()); - // 2xx response means call back is successful - if (response.getStatus() >= 200 && response.getStatus() < 300) { // All 2xx responses - batch.clear(); - webhook.getFailureDetails().setLastSuccessfulAt(changeEventHolder.get().getTimestamp()); - if (webhook.getStatus() != Status.ACTIVE) { - setStatus(Status.ACTIVE, null, null, null, null); - } - // 3xx response/redirection is not allowed for callback. Set the webhook state as in error - } else if (response.getStatus() >= 300 && response.getStatus() < 400) { - setErrorStatus(attemptTime, response.getStatus(), response.getStatusInfo().getReasonPhrase()); - // 4xx, 5xx response retry delivering events after timeout - } else if (response.getStatus() >= 300 && response.getStatus() < 600) { - setNextBackOff(); - setAwaitingRetry(attemptTime, response.getStatus(), response.getStatusInfo().getReasonPhrase()); - Thread.sleep(currentBackoffTime); - } - } catch (ProcessingException ex) { - Throwable cause = ex.getCause(); - if (cause.getClass() == UnknownHostException.class) { - LOG.warn("Invalid webhook {} endpoint {}", webhook.getName(), webhook.getEndpoint()); - setErrorStatus(attemptTime, null, "UnknownHostException"); - } - } - } - - @Override - public void onShutdown() { - currentBackoffTime = BACKOFF_NORMAL; - client.close(); - client = null; - shutdownLatch.countDown(); - LOG.info("Webhook-lifecycle-onShutdown {}", webhook.getName()); - } - - public synchronized Webhook getWebhook() { - return webhook; - } - - public synchronized void updateWebhook(Webhook updatedWebhook) { - currentBackoffTime = BACKOFF_NORMAL; - webhook.setTimeout(updatedWebhook.getTimeout()); - webhook.setBatchSize(updatedWebhook.getBatchSize()); - webhook.setEndpoint(updatedWebhook.getEndpoint()); - webhook.setEventFilters(updatedWebhook.getEventFilters()); - initFilter(); - createClient(); - } - - private void initFilter() { - filter.clear(); - webhook.getEventFilters().forEach(f -> filter.put(f.getEventType(), f.getEntities())); - } - - private void setErrorStatus(Long attemptTime, Integer statusCode, String reason) throws IOException { - if (!attemptTime.equals(webhook.getFailureDetails().getLastFailedAt())) { - setStatus(Status.FAILED, attemptTime, statusCode, reason, null); - } - throw new RuntimeException(reason); - } - - private void setAwaitingRetry(Long attemptTime, int statusCode, String reason) throws IOException { - if (!attemptTime.equals(webhook.getFailureDetails().getLastFailedAt())) { - setStatus(Status.AWAITING_RETRY, attemptTime, statusCode, reason, attemptTime + currentBackoffTime); - } - } - - private void setStatus(Status status, Long attemptTime, Integer statusCode, String reason, Long timestamp) - throws IOException { - Webhook stored = daoCollection.webhookDAO().findEntityById(webhook.getId()); - webhook.setStatus(status); - webhook - .getFailureDetails() - .withLastFailedAt(attemptTime) - .withLastFailedStatusCode(statusCode) - .withLastFailedReason(reason) - .withNextAttempt(timestamp); - WebhookUpdater updater = new WebhookUpdater(stored, webhook, Operation.PUT); - updater.update(); - } - - private synchronized void createClient() { - if (client != null) { - client.close(); - client = null; - } - ClientBuilder clientBuilder = ClientBuilder.newBuilder(); - clientBuilder.connectTimeout(10, TimeUnit.SECONDS); - clientBuilder.readTimeout(12, TimeUnit.SECONDS); - client = clientBuilder.build(); - } - - private void awaitShutdown() throws InterruptedException { - LOG.info("Awaiting shutdown webhook-lifecycle {}", webhook.getName()); - shutdownLatch.await(5, TimeUnit.SECONDS); - } - - public void setProcessor(BatchEventProcessor processor) { - this.processor = processor; - } - - public BatchEventProcessor getProcessor() { - return processor; - } - - private void setNextBackOff() { - if (currentBackoffTime == BACKOFF_NORMAL) { - currentBackoffTime = BACKOFF_3_SECONDS; - } else if (currentBackoffTime == BACKOFF_3_SECONDS) { - currentBackoffTime = BACKOFF_30_SECONDS; - } else if (currentBackoffTime == BACKOFF_30_SECONDS) { - currentBackoffTime = BACKOFF_5_MINUTES; - } else if (currentBackoffTime == BACKOFF_5_MINUTES) { - currentBackoffTime = BACKOFF_1_HOUR; - } else if (currentBackoffTime == BACKOFF_1_HOUR) { - currentBackoffTime = BACKOFF_24_HOUR; - } - } - - private Builder getTarget() { - Map authHeaders = SecurityUtil.authHeaders("admin@open-metadata.org"); - return SecurityUtil.addHeaders(client.target(webhook.getEndpoint()), authHeaders); - } - } - public class WebhookUpdater extends EntityUpdater { public WebhookUpdater(Webhook original, Webhook updated, Operation operation) { super(original, updated, operation); diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/events/WebhookResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/events/WebhookResource.java index 6ba439a289a..ee5318d3957 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/events/WebhookResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/events/WebhookResource.java @@ -45,8 +45,10 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.SecurityContext; import javax.ws.rs.core.UriInfo; +import org.openmetadata.catalog.CatalogApplicationConfig; import org.openmetadata.catalog.api.events.CreateWebhook; import org.openmetadata.catalog.jdbi3.CollectionDAO; +import org.openmetadata.catalog.jdbi3.CollectionDAO.WebhookDAO; import org.openmetadata.catalog.jdbi3.ListFilter; import org.openmetadata.catalog.jdbi3.WebhookRepository; import org.openmetadata.catalog.resources.Collection; @@ -57,6 +59,7 @@ import org.openmetadata.catalog.type.Include; import org.openmetadata.catalog.type.Webhook; import org.openmetadata.catalog.type.Webhook.Status; import org.openmetadata.catalog.util.EntityUtil; +import org.openmetadata.catalog.util.JsonUtils; import org.openmetadata.catalog.util.ResultList; @Path("/v1/webhook") @@ -66,6 +69,7 @@ import org.openmetadata.catalog.util.ResultList; @Collection(name = "webhook") public class WebhookResource extends EntityResource { public static final String COLLECTION_PATH = "v1/webhook/"; + private WebhookDAO webhookDAO; @Override public Webhook addHref(UriInfo uriInfo, Webhook entity) { @@ -84,6 +88,18 @@ public class WebhookResource extends EntityResource public WebhookResource(CollectionDAO dao, Authorizer authorizer) { super(Webhook.class, new WebhookRepository(dao), authorizer); + webhookDAO = dao.webhookDAO(); + } + + @SuppressWarnings("unused") // Method used for reflection + public void initialize(CatalogApplicationConfig config) throws IOException { + try { + List listAllWebhooks = webhookDAO.listAllWebhooks(webhookDAO.getTableName()); + List webhookList = JsonUtils.readObjects(listAllWebhooks, Webhook.class); + webhookList.forEach(dao::addWebhookPublisher); + } catch (Exception ex) { + // Starting application should not fail + } } @GET @@ -339,6 +355,7 @@ public class WebhookResource extends EntityResource .withTimeout(create.getTimeout()) .withEnabled(create.getEnabled()) .withSecretKey(create.getSecretKey()) - .withStatus(Boolean.TRUE.equals(create.getEnabled()) ? Status.ACTIVE : Status.DISABLED); + .withStatus(Boolean.TRUE.equals(create.getEnabled()) ? Status.ACTIVE : Status.DISABLED) + .withWebhookType(Webhook.WebhookType.fromValue(create.getWebhookType().value())); } } diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/slack/SlackWebhookEventPublisher.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/slack/SlackWebhookEventPublisher.java index 62db101a6c3..08770600251 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/slack/SlackWebhookEventPublisher.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/slack/SlackWebhookEventPublisher.java @@ -1,8 +1,5 @@ package org.openmetadata.catalog.slack; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Objects; import java.util.concurrent.TimeUnit; import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; @@ -10,27 +7,27 @@ import javax.ws.rs.client.Invocation; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import lombok.extern.slf4j.Slf4j; -import org.openmetadata.catalog.events.AbstractEventPublisher; +import org.openmetadata.catalog.events.WebhookPublisher; import org.openmetadata.catalog.events.errors.EventPublisherException; +import org.openmetadata.catalog.jdbi3.CollectionDAO; import org.openmetadata.catalog.resources.events.EventResource.ChangeEventList; import org.openmetadata.catalog.type.ChangeEvent; +import org.openmetadata.catalog.type.Webhook; import org.openmetadata.catalog.util.ChangeEventParser; @Slf4j -public class SlackWebhookEventPublisher extends AbstractEventPublisher { +public class SlackWebhookEventPublisher extends WebhookPublisher { private final Invocation.Builder target; private final Client client; - private final String openMetadataUrl; - public SlackWebhookEventPublisher(SlackPublisherConfiguration config) { - super(config.getBatchSize(), config.getFilters()); - String slackWebhookURL = config.getWebhookUrl(); + public SlackWebhookEventPublisher(Webhook webhook, CollectionDAO dao) { + super(webhook, dao); + String slackWebhookURL = webhook.getEndpoint().toString(); ClientBuilder clientBuilder = ClientBuilder.newBuilder(); clientBuilder.connectTimeout(10, TimeUnit.SECONDS); clientBuilder.readTimeout(12, TimeUnit.SECONDS); client = clientBuilder.build(); target = client.target(slackWebhookURL).request(); - openMetadataUrl = refineUri(config.getOpenMetadataUrl()); } @Override @@ -49,7 +46,7 @@ public class SlackWebhookEventPublisher extends AbstractEventPublisher { public void publish(ChangeEventList events) throws EventPublisherException { for (ChangeEvent event : events.getData()) { try { - SlackMessage slackMessage = ChangeEventParser.buildSlackMessage(event, getEntityUrl(event)); + SlackMessage slackMessage = ChangeEventParser.buildSlackMessage(event); Response response = target.post(javax.ws.rs.client.Entity.entity(slackMessage, MediaType.APPLICATION_JSON_TYPE)); if (response.getStatus() >= 300 && response.getStatus() < 400) { @@ -63,29 +60,4 @@ public class SlackWebhookEventPublisher extends AbstractEventPublisher { } } } - - private String getEntityUrl(ChangeEvent event) { - return String.format( - "<%s/%s/%s|%s>", - openMetadataUrl, - event.getEntityType(), - event.getEntityFullyQualifiedName(), - event.getEntityFullyQualifiedName()); - } - - private String refineUri(String url) { - URI urlInstance = null; - try { - urlInstance = new URI(url); - } catch (URISyntaxException e) { - LOG.error("Slack URL is not in url format - {}", url); - } - - if (Objects.nonNull(urlInstance)) { - String scheme = urlInstance.getScheme(); - String host = urlInstance.getHost(); - return String.format("%s://%s", scheme, host); - } - return url; - } } diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/ChangeEventParser.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/ChangeEventParser.java index 980b256115d..2c3b74ef70c 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/ChangeEventParser.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/ChangeEventParser.java @@ -20,11 +20,13 @@ import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; import com.github.difflib.text.DiffRow; import com.github.difflib.text.DiffRowGenerator; +import java.net.URI; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -69,12 +71,24 @@ public final class ChangeEventParser { SLACK } - public static SlackMessage buildSlackMessage(ChangeEvent event, String omdurl) { + public static String getEntityUrl(ChangeEvent event) { + EntityInterface entity = (EntityInterface) event.getEntity(); + URI urlInstance = entity.getHref(); + String fqn = event.getEntityFullyQualifiedName(); + if (Objects.nonNull(urlInstance)) { + String scheme = urlInstance.getScheme(); + String host = urlInstance.getHost(); + return String.format("<%s://%s/%s/%s|%s>", scheme, host, event.getEntityType(), fqn, fqn); + } + return urlInstance.toString(); + } + + public static SlackMessage buildSlackMessage(ChangeEvent event) { SlackMessage slackMessage = new SlackMessage(); slackMessage.setUsername(event.getUserName()); if (event.getEntity() != null) { String headerTxt = "%s posted on " + event.getEntityType() + " %s"; - String headerText = String.format(headerTxt, event.getUserName(), omdurl); + String headerText = String.format(headerTxt, event.getUserName(), getEntityUrl(event)); slackMessage.setText(headerText); } Map messages = diff --git a/catalog-rest-service/src/main/resources/json/schema/api/events/createWebhook.json b/catalog-rest-service/src/main/resources/json/schema/api/events/createWebhook.json index 35ed4efb7d9..24e284dabc1 100644 --- a/catalog-rest-service/src/main/resources/json/schema/api/events/createWebhook.json +++ b/catalog-rest-service/src/main/resources/json/schema/api/events/createWebhook.json @@ -50,6 +50,20 @@ "secretKey": { "description": "Secret set by the webhook client used for computing HMAC SHA256 signature of webhook payload and sent in `X-OM-Signature` header in POST requests to publish the events.", "type": "string" + }, + "webhookType": { + "description": "Type of webhook slack,generic etc", + "type": "string", + "default": "generic", + "enum": ["slack", "generic"], + "javaEnums": [ + { + "name": "slack" + }, + { + "name": "generic" + } + ] } }, "required": ["name", "endpoint", "eventFilters"], diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/events/webhook.json b/catalog-rest-service/src/main/resources/json/schema/entity/events/webhook.json index 754e45173b4..e6907fb4617 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/events/webhook.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/events/webhook.json @@ -23,6 +23,20 @@ "description": "Display Name that identifies this webhook.", "type": "string" }, + "webhookType": { + "description": "Type of webhook slack,generic etc", + "type": "string", + "default": "generic", + "enum": ["slack", "generic"], + "javaEnums": [ + { + "name": "slack" + }, + { + "name": "generic" + } + ] + }, "description": { "description": "Description of the application.", "$ref": "../../type/basic.json#/definitions/markdown"