mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-10-04 21:32:16 +00:00
[Backend][Slack] Slack integration with UI (#6261)
* [Backend][Slack] Slack integration with UI * [Backend][Slack] Slack integration with UI - removed unused jdbi * [Backend][Slack] Slack integration with UI - URL fix + init webhook url during initialization * [Backend][Slack] Slack integration with UI - remove slack * [Backend][Slack] add Slack generic to exisiting jsons * [Backend][Slack] PG fix and Java CheckStyle
This commit is contained in:
parent
3522a6ed67
commit
160806c013
@ -31,3 +31,6 @@ WHERE serviceType = 'Looker';
|
||||
UPDATE dashboard_service_entity
|
||||
SET json = JSON_REMOVE(json, '$.connection.config.env')
|
||||
WHERE serviceType = 'Looker';
|
||||
|
||||
UPDATE webhook_entity
|
||||
SET json = JSON_INSERT(json, '$.webhookType', 'generic');
|
||||
|
@ -25,3 +25,6 @@ WHERE serviceType = 'Looker'
|
||||
UPDATE dashboard_service_entity
|
||||
SET json = json::jsonb #- '{connection,config,username}' #- '{connection,config,password}' #- '{connection,config,env}'
|
||||
WHERE serviceType = 'Looker';
|
||||
|
||||
UPDATE webhook_entity
|
||||
SET json = JSONB_SET(json::jsonb, '{webhookType}', '"generic"', true);
|
@ -79,8 +79,6 @@ import org.openmetadata.catalog.security.AuthorizerConfiguration;
|
||||
import org.openmetadata.catalog.security.NoopAuthorizer;
|
||||
import org.openmetadata.catalog.security.NoopFilter;
|
||||
import org.openmetadata.catalog.security.jwt.JWTTokenGenerator;
|
||||
import org.openmetadata.catalog.slack.SlackPublisherConfiguration;
|
||||
import org.openmetadata.catalog.slack.SlackWebhookEventPublisher;
|
||||
import org.openmetadata.catalog.socket.FeedServlet;
|
||||
import org.openmetadata.catalog.socket.SocketAddressFilter;
|
||||
import org.openmetadata.catalog.socket.WebSocketManager;
|
||||
@ -148,13 +146,14 @@ public class CatalogApplication extends Application<CatalogApplicationConfig> {
|
||||
environment.jersey().register(new EarlyEofExceptionMapper());
|
||||
environment.jersey().register(JsonMappingExceptionMapper.class);
|
||||
environment.healthChecks().register("OpenMetadataServerHealthCheck", new OpenMetadataServerHealthCheck());
|
||||
// start event hub before registering publishers
|
||||
EventPubSub.start();
|
||||
|
||||
registerResources(catalogConfig, environment, jdbi, secretsManager);
|
||||
|
||||
// Register Event Handler
|
||||
registerEventFilter(catalogConfig, environment, jdbi);
|
||||
environment.lifecycle().manage(new ManagedShutdown());
|
||||
// start event hub before registering publishers
|
||||
EventPubSub.start();
|
||||
// Register Event publishers
|
||||
registerEventPublisher(catalogConfig);
|
||||
|
||||
@ -252,17 +251,6 @@ public class CatalogApplication extends Application<CatalogApplicationConfig> {
|
||||
new ElasticSearchEventPublisher(catalogApplicationConfig.getElasticSearchConfiguration());
|
||||
EventPubSub.addEventHandler(elasticSearchEventPublisher);
|
||||
}
|
||||
// register slack Event publishers
|
||||
if (catalogApplicationConfig.getSlackEventPublishers() != null) {
|
||||
for (SlackPublisherConfiguration slackPublisherConfiguration :
|
||||
catalogApplicationConfig.getSlackEventPublishers()) {
|
||||
if (slackPublisherConfiguration.getWebhookUrl() != null
|
||||
&& !slackPublisherConfiguration.getWebhookUrl().isEmpty()) {
|
||||
SlackWebhookEventPublisher slackPublisher = new SlackWebhookEventPublisher(slackPublisherConfiguration);
|
||||
EventPubSub.addEventHandler(slackPublisher);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void registerResources(
|
||||
|
@ -13,16 +13,16 @@ import org.openmetadata.catalog.type.EventType;
|
||||
@Slf4j
|
||||
public abstract class AbstractEventPublisher implements EventPublisher {
|
||||
// Backoff timeout in seconds. Delivering events is retried 5 times.
|
||||
private static final int BACKOFF_NORMAL = 0;
|
||||
private static final int BACKOFF_3_SECONDS = 3 * 1000;
|
||||
private static final int BACKOFF_30_SECONDS = 30 * 1000;
|
||||
private static final int BACKOFF_5_MINUTES = 5 * 60 * 1000;
|
||||
private static final int BACKOFF_1_HOUR = 60 * 60 * 1000;
|
||||
private static final int BACKOFF_24_HOUR = 24 * 60 * 60 * 1000;
|
||||
protected static final int BACKOFF_NORMAL = 0;
|
||||
protected static final int BACKOFF_3_SECONDS = 3 * 1000;
|
||||
protected static final int BACKOFF_30_SECONDS = 30 * 1000;
|
||||
protected static final int BACKOFF_5_MINUTES = 5 * 60 * 1000;
|
||||
protected static final int BACKOFF_1_HOUR = 60 * 60 * 1000;
|
||||
protected static final int BACKOFF_24_HOUR = 24 * 60 * 60 * 1000;
|
||||
|
||||
private int currentBackoffTime = BACKOFF_NORMAL;
|
||||
private final List<ChangeEvent> batch = new ArrayList<>();
|
||||
private final ConcurrentHashMap<EventType, List<String>> filter = new ConcurrentHashMap<>();
|
||||
protected int currentBackoffTime = BACKOFF_NORMAL;
|
||||
protected final List<ChangeEvent> batch = new ArrayList<>();
|
||||
protected final ConcurrentHashMap<EventType, List<String>> filter = new ConcurrentHashMap<>();
|
||||
private final int batchSize;
|
||||
|
||||
protected AbstractEventPublisher(int batchSize, List<EventFilter> filters) {
|
||||
@ -63,7 +63,7 @@ public abstract class AbstractEventPublisher implements EventPublisher {
|
||||
}
|
||||
}
|
||||
|
||||
private void setNextBackOff() {
|
||||
protected void setNextBackOff() {
|
||||
if (currentBackoffTime == BACKOFF_NORMAL) {
|
||||
currentBackoffTime = BACKOFF_3_SECONDS;
|
||||
} else if (currentBackoffTime == BACKOFF_3_SECONDS) {
|
||||
|
@ -2,10 +2,9 @@ package org.openmetadata.catalog.events;
|
||||
|
||||
import com.lmax.disruptor.EventHandler;
|
||||
import com.lmax.disruptor.LifecycleAware;
|
||||
import org.openmetadata.catalog.events.errors.EventPublisherException;
|
||||
import org.openmetadata.catalog.resources.events.EventResource.ChangeEventList;
|
||||
|
||||
public interface EventPublisher extends EventHandler<EventPubSub.ChangeEventHolder>, LifecycleAware {
|
||||
|
||||
void publish(ChangeEventList events) throws EventPublisherException;
|
||||
void publish(ChangeEventList events) throws Exception;
|
||||
}
|
||||
|
@ -0,0 +1,208 @@
|
||||
/*
|
||||
* Copyright 2021 Collate
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.openmetadata.catalog.events;
|
||||
|
||||
import com.lmax.disruptor.BatchEventProcessor;
|
||||
import java.io.IOException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import javax.ws.rs.client.Client;
|
||||
import javax.ws.rs.client.ClientBuilder;
|
||||
import javax.ws.rs.client.Invocation;
|
||||
import javax.ws.rs.core.Response;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.openmetadata.catalog.events.errors.EventPublisherException;
|
||||
import org.openmetadata.catalog.jdbi3.CollectionDAO;
|
||||
import org.openmetadata.catalog.jdbi3.EntityRepository;
|
||||
import org.openmetadata.catalog.jdbi3.WebhookRepository;
|
||||
import org.openmetadata.catalog.jdbi3.WebhookRepository.WebhookUpdater;
|
||||
import org.openmetadata.catalog.resources.events.EventResource;
|
||||
import org.openmetadata.catalog.security.SecurityUtil;
|
||||
import org.openmetadata.catalog.type.FailureDetails;
|
||||
import org.openmetadata.catalog.type.Webhook;
|
||||
import org.openmetadata.catalog.util.JsonUtils;
|
||||
import org.openmetadata.catalog.util.RestUtil;
|
||||
import org.openmetadata.common.utils.CommonUtil;
|
||||
|
||||
/**
|
||||
* WebhookPublisher publishes events to the webhook endpoint using POST http requests. There is one instance of
|
||||
* WebhookPublisher per webhook subscription. Each WebhookPublish is an EventHandler that runs in a separate thread and
|
||||
* receives events from LMAX Disruptor {@link EventPubSub} through {@link BatchEventProcessor}.
|
||||
*
|
||||
* <p>The failures during callback to Webhook endpoints are handled in this class as follows:
|
||||
*
|
||||
* <ul>
|
||||
* <li>Webhook with unresolvable URLs are marked as "failed" and no further attempt is made to deliver the events
|
||||
* <li>Webhook callbacks that return 3xx are marked as "failed" and no further attempt is made to deliver the events
|
||||
* <li>Webhook callbacks that return 4xx, 5xx, or timeout are marked as "awaitingRetry" and 5 retry attempts are made
|
||||
* to deliver the events with the following backoff - 3 seconds, 30 seconds, 5 minutes, 1 hours, and 24 hour. When
|
||||
* all the 5 delivery attempts fail, the webhook state is marked as "retryLimitReached" and no further attempt is
|
||||
* made to deliver the events.
|
||||
* </ul>
|
||||
*/
|
||||
@Slf4j
|
||||
public class WebhookPublisher extends AbstractEventPublisher {
|
||||
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
|
||||
private final Webhook webhook;
|
||||
private BatchEventProcessor<EventPubSub.ChangeEventHolder> processor;
|
||||
private Client client;
|
||||
private CollectionDAO daoCollection;
|
||||
|
||||
private WebhookRepository webhookRepository;
|
||||
|
||||
public WebhookPublisher(Webhook webhook, CollectionDAO dao) {
|
||||
super(webhook.getBatchSize(), webhook.getEventFilters());
|
||||
this.webhook = webhook;
|
||||
this.daoCollection = dao;
|
||||
this.webhookRepository = new WebhookRepository(dao);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onStart() {
|
||||
createClient();
|
||||
webhook.withFailureDetails(new FailureDetails());
|
||||
LOG.info("Webhook-lifecycle-onStart {}", webhook.getName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onShutdown() {
|
||||
currentBackoffTime = BACKOFF_NORMAL;
|
||||
client.close();
|
||||
client = null;
|
||||
shutdownLatch.countDown();
|
||||
LOG.info("Webhook-lifecycle-onShutdown {}", webhook.getName());
|
||||
}
|
||||
|
||||
public synchronized Webhook getWebhook() {
|
||||
return webhook;
|
||||
}
|
||||
|
||||
public synchronized void updateWebhook(Webhook updatedWebhook) {
|
||||
currentBackoffTime = BACKOFF_NORMAL;
|
||||
webhook.setTimeout(updatedWebhook.getTimeout());
|
||||
webhook.setBatchSize(updatedWebhook.getBatchSize());
|
||||
webhook.setEndpoint(updatedWebhook.getEndpoint());
|
||||
webhook.setEventFilters(updatedWebhook.getEventFilters());
|
||||
updateFilter();
|
||||
createClient();
|
||||
}
|
||||
|
||||
private void updateFilter() {
|
||||
filter.clear();
|
||||
webhook.getEventFilters().forEach(f -> filter.put(f.getEventType(), f.getEntities()));
|
||||
}
|
||||
|
||||
private void setErrorStatus(Long attemptTime, Integer statusCode, String reason) throws IOException {
|
||||
if (!attemptTime.equals(webhook.getFailureDetails().getLastFailedAt())) {
|
||||
setStatus(Webhook.Status.FAILED, attemptTime, statusCode, reason, null);
|
||||
}
|
||||
throw new RuntimeException(reason);
|
||||
}
|
||||
|
||||
private void setAwaitingRetry(Long attemptTime, int statusCode, String reason) throws IOException {
|
||||
if (!attemptTime.equals(webhook.getFailureDetails().getLastFailedAt())) {
|
||||
setStatus(Webhook.Status.AWAITING_RETRY, attemptTime, statusCode, reason, attemptTime + currentBackoffTime);
|
||||
}
|
||||
}
|
||||
|
||||
private void setStatus(Webhook.Status status, Long attemptTime, Integer statusCode, String reason, Long timestamp)
|
||||
throws IOException {
|
||||
Webhook stored = daoCollection.webhookDAO().findEntityById(webhook.getId());
|
||||
webhook.setStatus(status);
|
||||
webhook
|
||||
.getFailureDetails()
|
||||
.withLastFailedAt(attemptTime)
|
||||
.withLastFailedStatusCode(statusCode)
|
||||
.withLastFailedReason(reason)
|
||||
.withNextAttempt(timestamp);
|
||||
|
||||
// TODO: Fix this
|
||||
WebhookUpdater updater = webhookRepository.getUpdater(stored, webhook, EntityRepository.Operation.PUT);
|
||||
updater.update();
|
||||
}
|
||||
|
||||
private synchronized void createClient() {
|
||||
if (client != null) {
|
||||
client.close();
|
||||
client = null;
|
||||
}
|
||||
ClientBuilder clientBuilder = ClientBuilder.newBuilder();
|
||||
clientBuilder.connectTimeout(10, TimeUnit.SECONDS);
|
||||
clientBuilder.readTimeout(12, TimeUnit.SECONDS);
|
||||
client = clientBuilder.build();
|
||||
}
|
||||
|
||||
public void awaitShutdown() throws InterruptedException {
|
||||
LOG.info("Awaiting shutdown webhook-lifecycle {}", webhook.getName());
|
||||
shutdownLatch.await(5, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
public void setProcessor(BatchEventProcessor<EventPubSub.ChangeEventHolder> processor) {
|
||||
this.processor = processor;
|
||||
}
|
||||
|
||||
public BatchEventProcessor<EventPubSub.ChangeEventHolder> getProcessor() {
|
||||
return processor;
|
||||
}
|
||||
|
||||
private Invocation.Builder getTarget() {
|
||||
Map<String, String> authHeaders = SecurityUtil.authHeaders("admin@open-metadata.org");
|
||||
return SecurityUtil.addHeaders(client.target(webhook.getEndpoint()), authHeaders);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publish(EventResource.ChangeEventList list) throws EventPublisherException, IOException {
|
||||
long attemptTime = System.currentTimeMillis();
|
||||
try {
|
||||
String json = JsonUtils.pojoToJson(list);
|
||||
Response response;
|
||||
if (webhook.getSecretKey() != null && !webhook.getSecretKey().isEmpty()) {
|
||||
String hmac = "sha256=" + CommonUtil.calculateHMAC(webhook.getSecretKey(), json);
|
||||
response = getTarget().header(RestUtil.SIGNATURE_HEADER, hmac).post(javax.ws.rs.client.Entity.json(json));
|
||||
} else {
|
||||
response = getTarget().post(javax.ws.rs.client.Entity.json(json));
|
||||
}
|
||||
LOG.info(
|
||||
"Webhook {}:{}:{} received response {}",
|
||||
webhook.getName(),
|
||||
webhook.getStatus(),
|
||||
batch.size(),
|
||||
response.getStatusInfo());
|
||||
// 2xx response means call back is successful
|
||||
if (response.getStatus() >= 200 && response.getStatus() < 300) { // All 2xx responses
|
||||
webhook.getFailureDetails().setLastSuccessfulAt(batch.get(batch.size() - 1).getTimestamp());
|
||||
batch.clear();
|
||||
if (webhook.getStatus() != Webhook.Status.ACTIVE) {
|
||||
setStatus(Webhook.Status.ACTIVE, null, null, null, null);
|
||||
}
|
||||
// 3xx response/redirection is not allowed for callback. Set the webhook state as in error
|
||||
} else if (response.getStatus() >= 300 && response.getStatus() < 400) {
|
||||
setErrorStatus(attemptTime, response.getStatus(), response.getStatusInfo().getReasonPhrase());
|
||||
// 4xx, 5xx response retry delivering events after timeout
|
||||
} else if (response.getStatus() >= 300 && response.getStatus() < 600) {
|
||||
setNextBackOff();
|
||||
setAwaitingRetry(attemptTime, response.getStatus(), response.getStatusInfo().getReasonPhrase());
|
||||
Thread.sleep(currentBackoffTime);
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
Throwable cause = ex.getCause();
|
||||
if (cause.getClass() == UnknownHostException.class) {
|
||||
LOG.warn("Invalid webhook {} endpoint {}", webhook.getName(), webhook.getEndpoint());
|
||||
setErrorStatus(attemptTime, null, "UnknownHostException");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -1376,6 +1376,9 @@ public interface CollectionDAO {
|
||||
default boolean supportsSoftDelete() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@SqlQuery("SELECT json FROM <table>")
|
||||
List<String> listAllWebhooks(@Define("table") String table);
|
||||
}
|
||||
|
||||
interface TagCategoryDAO extends EntityDAO<TagCategory> {
|
||||
|
@ -1092,7 +1092,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
||||
return ingestionPipelines;
|
||||
}
|
||||
|
||||
enum Operation {
|
||||
public enum Operation {
|
||||
PUT,
|
||||
PATCH,
|
||||
SOFT_DELETE;
|
||||
|
@ -18,40 +18,23 @@ import static org.openmetadata.catalog.util.EntityUtil.failureDetailsMatch;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.lmax.disruptor.BatchEventProcessor;
|
||||
import com.lmax.disruptor.EventHandler;
|
||||
import com.lmax.disruptor.LifecycleAware;
|
||||
import java.io.IOException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import javax.ws.rs.ProcessingException;
|
||||
import javax.ws.rs.client.Client;
|
||||
import javax.ws.rs.client.ClientBuilder;
|
||||
import javax.ws.rs.client.Invocation.Builder;
|
||||
import javax.ws.rs.core.Response;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.openmetadata.catalog.Entity;
|
||||
import org.openmetadata.catalog.events.EventPubSub;
|
||||
import org.openmetadata.catalog.events.EventPubSub.ChangeEventHolder;
|
||||
import org.openmetadata.catalog.resources.events.EventResource.ChangeEventList;
|
||||
import org.openmetadata.catalog.events.WebhookPublisher;
|
||||
import org.openmetadata.catalog.resources.events.WebhookResource;
|
||||
import org.openmetadata.catalog.security.SecurityUtil;
|
||||
import org.openmetadata.catalog.type.ChangeEvent;
|
||||
import org.openmetadata.catalog.slack.SlackWebhookEventPublisher;
|
||||
import org.openmetadata.catalog.type.EventFilter;
|
||||
import org.openmetadata.catalog.type.EventType;
|
||||
import org.openmetadata.catalog.type.FailureDetails;
|
||||
import org.openmetadata.catalog.type.Webhook;
|
||||
import org.openmetadata.catalog.type.Webhook.Status;
|
||||
import org.openmetadata.catalog.util.EntityUtil.Fields;
|
||||
import org.openmetadata.catalog.util.JsonUtils;
|
||||
import org.openmetadata.catalog.util.RestUtil;
|
||||
import org.openmetadata.common.utils.CommonUtil;
|
||||
|
||||
@Slf4j
|
||||
public class WebhookRepository extends EntityRepository<Webhook> {
|
||||
@ -101,7 +84,13 @@ public class WebhookRepository extends EntityRepository<Webhook> {
|
||||
webhook.setStatus(Status.DISABLED);
|
||||
return;
|
||||
}
|
||||
WebhookPublisher publisher = new WebhookPublisher(webhook);
|
||||
|
||||
WebhookPublisher publisher;
|
||||
if (webhook.getWebhookType() == Webhook.WebhookType.slack) {
|
||||
publisher = new SlackWebhookEventPublisher(webhook, daoCollection);
|
||||
} else {
|
||||
publisher = new WebhookPublisher(webhook, daoCollection);
|
||||
}
|
||||
BatchEventProcessor<ChangeEventHolder> processor = EventPubSub.addEventHandler(publisher);
|
||||
publisher.setProcessor(processor);
|
||||
webhookPublisherMap.put(webhook.getId(), publisher);
|
||||
@ -145,207 +134,6 @@ public class WebhookRepository extends EntityRepository<Webhook> {
|
||||
webhookPublisherMap.remove(id);
|
||||
}
|
||||
|
||||
/**
|
||||
* WebhookPublisher publishes events to the webhook endpoint using POST http requests. There is one instance of
|
||||
* WebhookPublisher per webhook subscription. Each WebhookPublish is an EventHandler that runs in a separate thread
|
||||
* and receives events from LMAX Disruptor {@link EventPubSub} through {@link BatchEventProcessor}.
|
||||
*
|
||||
* <p>The failures during callback to Webhook endpoints are handled in this class as follows:
|
||||
*
|
||||
* <ul>
|
||||
* <li>Webhook with unresolvable URLs are marked as "failed" and no further attempt is made to deliver the events
|
||||
* <li>Webhook callbacks that return 3xx are marked as "failed" and no further attempt is made to deliver the events
|
||||
* <li>Webhook callbacks that return 4xx, 5xx, or timeout are marked as "awaitingRetry" and 5 retry attempts are
|
||||
* made to deliver the events with the following backoff - 3 seconds, 30 seconds, 5 minutes, 1 hours, and 24
|
||||
* hour. When all the 5 delivery attempts fail, the webhook state is marked as "retryLimitReached" and no
|
||||
* further attempt is made to deliver the events.
|
||||
* </ul>
|
||||
*/
|
||||
public class WebhookPublisher implements EventHandler<ChangeEventHolder>, LifecycleAware {
|
||||
// Backoff timeout in seconds. Delivering events is retried 5 times.
|
||||
private static final int BACKOFF_NORMAL = 0;
|
||||
private static final int BACKOFF_3_SECONDS = 3 * 1000;
|
||||
private static final int BACKOFF_30_SECONDS = 30 * 1000;
|
||||
private static final int BACKOFF_5_MINUTES = 5 * 60 * 1000;
|
||||
private static final int BACKOFF_1_HOUR = 60 * 60 * 1000;
|
||||
private static final int BACKOFF_24_HOUR = 24 * 60 * 60 * 1000;
|
||||
|
||||
private int currentBackoffTime = BACKOFF_NORMAL;
|
||||
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
|
||||
private final Webhook webhook;
|
||||
private final List<ChangeEvent> batch = new ArrayList<>();
|
||||
private BatchEventProcessor<ChangeEventHolder> processor;
|
||||
private Client client;
|
||||
private final ConcurrentHashMap<EventType, List<String>> filter = new ConcurrentHashMap<>();
|
||||
|
||||
public WebhookPublisher(Webhook webhook) {
|
||||
this.webhook = webhook;
|
||||
initFilter();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onStart() {
|
||||
createClient();
|
||||
webhook.withFailureDetails(new FailureDetails());
|
||||
LOG.info("Webhook-lifecycle-onStart {}", webhook.getName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onEvent(ChangeEventHolder changeEventHolder, long sequence, boolean endOfBatch) throws Exception {
|
||||
// Ignore events that don't match the webhook event filters
|
||||
ChangeEvent changeEvent = changeEventHolder.get();
|
||||
List<String> entities = filter.get(changeEvent.getEventType());
|
||||
if (entities == null || (!entities.get(0).equals("*") && !entities.contains(changeEvent.getEntityType()))) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Batch until either the batch has ended or batch size has reached the max size
|
||||
batch.add(changeEventHolder.get());
|
||||
if (!endOfBatch && batch.size() < webhook.getBatchSize()) {
|
||||
return;
|
||||
}
|
||||
|
||||
ChangeEventList list = new ChangeEventList(batch, null, null, batch.size());
|
||||
long attemptTime = System.currentTimeMillis();
|
||||
try {
|
||||
String json = JsonUtils.pojoToJson(list);
|
||||
Response response;
|
||||
if (webhook.getSecretKey() != null && !webhook.getSecretKey().isEmpty()) {
|
||||
String hmac = "sha256=" + CommonUtil.calculateHMAC(webhook.getSecretKey(), json);
|
||||
response = getTarget().header(RestUtil.SIGNATURE_HEADER, hmac).post(javax.ws.rs.client.Entity.json(json));
|
||||
} else {
|
||||
response = getTarget().post(javax.ws.rs.client.Entity.json(json));
|
||||
}
|
||||
LOG.info(
|
||||
"Webhook {}:{}:{} received response {}",
|
||||
webhook.getName(),
|
||||
webhook.getStatus(),
|
||||
batch.size(),
|
||||
response.getStatusInfo());
|
||||
// 2xx response means call back is successful
|
||||
if (response.getStatus() >= 200 && response.getStatus() < 300) { // All 2xx responses
|
||||
batch.clear();
|
||||
webhook.getFailureDetails().setLastSuccessfulAt(changeEventHolder.get().getTimestamp());
|
||||
if (webhook.getStatus() != Status.ACTIVE) {
|
||||
setStatus(Status.ACTIVE, null, null, null, null);
|
||||
}
|
||||
// 3xx response/redirection is not allowed for callback. Set the webhook state as in error
|
||||
} else if (response.getStatus() >= 300 && response.getStatus() < 400) {
|
||||
setErrorStatus(attemptTime, response.getStatus(), response.getStatusInfo().getReasonPhrase());
|
||||
// 4xx, 5xx response retry delivering events after timeout
|
||||
} else if (response.getStatus() >= 300 && response.getStatus() < 600) {
|
||||
setNextBackOff();
|
||||
setAwaitingRetry(attemptTime, response.getStatus(), response.getStatusInfo().getReasonPhrase());
|
||||
Thread.sleep(currentBackoffTime);
|
||||
}
|
||||
} catch (ProcessingException ex) {
|
||||
Throwable cause = ex.getCause();
|
||||
if (cause.getClass() == UnknownHostException.class) {
|
||||
LOG.warn("Invalid webhook {} endpoint {}", webhook.getName(), webhook.getEndpoint());
|
||||
setErrorStatus(attemptTime, null, "UnknownHostException");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onShutdown() {
|
||||
currentBackoffTime = BACKOFF_NORMAL;
|
||||
client.close();
|
||||
client = null;
|
||||
shutdownLatch.countDown();
|
||||
LOG.info("Webhook-lifecycle-onShutdown {}", webhook.getName());
|
||||
}
|
||||
|
||||
public synchronized Webhook getWebhook() {
|
||||
return webhook;
|
||||
}
|
||||
|
||||
public synchronized void updateWebhook(Webhook updatedWebhook) {
|
||||
currentBackoffTime = BACKOFF_NORMAL;
|
||||
webhook.setTimeout(updatedWebhook.getTimeout());
|
||||
webhook.setBatchSize(updatedWebhook.getBatchSize());
|
||||
webhook.setEndpoint(updatedWebhook.getEndpoint());
|
||||
webhook.setEventFilters(updatedWebhook.getEventFilters());
|
||||
initFilter();
|
||||
createClient();
|
||||
}
|
||||
|
||||
private void initFilter() {
|
||||
filter.clear();
|
||||
webhook.getEventFilters().forEach(f -> filter.put(f.getEventType(), f.getEntities()));
|
||||
}
|
||||
|
||||
private void setErrorStatus(Long attemptTime, Integer statusCode, String reason) throws IOException {
|
||||
if (!attemptTime.equals(webhook.getFailureDetails().getLastFailedAt())) {
|
||||
setStatus(Status.FAILED, attemptTime, statusCode, reason, null);
|
||||
}
|
||||
throw new RuntimeException(reason);
|
||||
}
|
||||
|
||||
private void setAwaitingRetry(Long attemptTime, int statusCode, String reason) throws IOException {
|
||||
if (!attemptTime.equals(webhook.getFailureDetails().getLastFailedAt())) {
|
||||
setStatus(Status.AWAITING_RETRY, attemptTime, statusCode, reason, attemptTime + currentBackoffTime);
|
||||
}
|
||||
}
|
||||
|
||||
private void setStatus(Status status, Long attemptTime, Integer statusCode, String reason, Long timestamp)
|
||||
throws IOException {
|
||||
Webhook stored = daoCollection.webhookDAO().findEntityById(webhook.getId());
|
||||
webhook.setStatus(status);
|
||||
webhook
|
||||
.getFailureDetails()
|
||||
.withLastFailedAt(attemptTime)
|
||||
.withLastFailedStatusCode(statusCode)
|
||||
.withLastFailedReason(reason)
|
||||
.withNextAttempt(timestamp);
|
||||
WebhookUpdater updater = new WebhookUpdater(stored, webhook, Operation.PUT);
|
||||
updater.update();
|
||||
}
|
||||
|
||||
private synchronized void createClient() {
|
||||
if (client != null) {
|
||||
client.close();
|
||||
client = null;
|
||||
}
|
||||
ClientBuilder clientBuilder = ClientBuilder.newBuilder();
|
||||
clientBuilder.connectTimeout(10, TimeUnit.SECONDS);
|
||||
clientBuilder.readTimeout(12, TimeUnit.SECONDS);
|
||||
client = clientBuilder.build();
|
||||
}
|
||||
|
||||
private void awaitShutdown() throws InterruptedException {
|
||||
LOG.info("Awaiting shutdown webhook-lifecycle {}", webhook.getName());
|
||||
shutdownLatch.await(5, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
public void setProcessor(BatchEventProcessor<ChangeEventHolder> processor) {
|
||||
this.processor = processor;
|
||||
}
|
||||
|
||||
public BatchEventProcessor<ChangeEventHolder> getProcessor() {
|
||||
return processor;
|
||||
}
|
||||
|
||||
private void setNextBackOff() {
|
||||
if (currentBackoffTime == BACKOFF_NORMAL) {
|
||||
currentBackoffTime = BACKOFF_3_SECONDS;
|
||||
} else if (currentBackoffTime == BACKOFF_3_SECONDS) {
|
||||
currentBackoffTime = BACKOFF_30_SECONDS;
|
||||
} else if (currentBackoffTime == BACKOFF_30_SECONDS) {
|
||||
currentBackoffTime = BACKOFF_5_MINUTES;
|
||||
} else if (currentBackoffTime == BACKOFF_5_MINUTES) {
|
||||
currentBackoffTime = BACKOFF_1_HOUR;
|
||||
} else if (currentBackoffTime == BACKOFF_1_HOUR) {
|
||||
currentBackoffTime = BACKOFF_24_HOUR;
|
||||
}
|
||||
}
|
||||
|
||||
private Builder getTarget() {
|
||||
Map<String, String> authHeaders = SecurityUtil.authHeaders("admin@open-metadata.org");
|
||||
return SecurityUtil.addHeaders(client.target(webhook.getEndpoint()), authHeaders);
|
||||
}
|
||||
}
|
||||
|
||||
public class WebhookUpdater extends EntityUpdater {
|
||||
public WebhookUpdater(Webhook original, Webhook updated, Operation operation) {
|
||||
super(original, updated, operation);
|
||||
|
@ -45,8 +45,10 @@ import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.SecurityContext;
|
||||
import javax.ws.rs.core.UriInfo;
|
||||
import org.openmetadata.catalog.CatalogApplicationConfig;
|
||||
import org.openmetadata.catalog.api.events.CreateWebhook;
|
||||
import org.openmetadata.catalog.jdbi3.CollectionDAO;
|
||||
import org.openmetadata.catalog.jdbi3.CollectionDAO.WebhookDAO;
|
||||
import org.openmetadata.catalog.jdbi3.ListFilter;
|
||||
import org.openmetadata.catalog.jdbi3.WebhookRepository;
|
||||
import org.openmetadata.catalog.resources.Collection;
|
||||
@ -57,6 +59,7 @@ import org.openmetadata.catalog.type.Include;
|
||||
import org.openmetadata.catalog.type.Webhook;
|
||||
import org.openmetadata.catalog.type.Webhook.Status;
|
||||
import org.openmetadata.catalog.util.EntityUtil;
|
||||
import org.openmetadata.catalog.util.JsonUtils;
|
||||
import org.openmetadata.catalog.util.ResultList;
|
||||
|
||||
@Path("/v1/webhook")
|
||||
@ -66,6 +69,7 @@ import org.openmetadata.catalog.util.ResultList;
|
||||
@Collection(name = "webhook")
|
||||
public class WebhookResource extends EntityResource<Webhook, WebhookRepository> {
|
||||
public static final String COLLECTION_PATH = "v1/webhook/";
|
||||
private WebhookDAO webhookDAO;
|
||||
|
||||
@Override
|
||||
public Webhook addHref(UriInfo uriInfo, Webhook entity) {
|
||||
@ -84,6 +88,18 @@ public class WebhookResource extends EntityResource<Webhook, WebhookRepository>
|
||||
|
||||
public WebhookResource(CollectionDAO dao, Authorizer authorizer) {
|
||||
super(Webhook.class, new WebhookRepository(dao), authorizer);
|
||||
webhookDAO = dao.webhookDAO();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused") // Method used for reflection
|
||||
public void initialize(CatalogApplicationConfig config) throws IOException {
|
||||
try {
|
||||
List<String> listAllWebhooks = webhookDAO.listAllWebhooks(webhookDAO.getTableName());
|
||||
List<Webhook> webhookList = JsonUtils.readObjects(listAllWebhooks, Webhook.class);
|
||||
webhookList.forEach(dao::addWebhookPublisher);
|
||||
} catch (Exception ex) {
|
||||
// Starting application should not fail
|
||||
}
|
||||
}
|
||||
|
||||
@GET
|
||||
@ -339,6 +355,7 @@ public class WebhookResource extends EntityResource<Webhook, WebhookRepository>
|
||||
.withTimeout(create.getTimeout())
|
||||
.withEnabled(create.getEnabled())
|
||||
.withSecretKey(create.getSecretKey())
|
||||
.withStatus(Boolean.TRUE.equals(create.getEnabled()) ? Status.ACTIVE : Status.DISABLED);
|
||||
.withStatus(Boolean.TRUE.equals(create.getEnabled()) ? Status.ACTIVE : Status.DISABLED)
|
||||
.withWebhookType(Webhook.WebhookType.fromValue(create.getWebhookType().value()));
|
||||
}
|
||||
}
|
||||
|
@ -1,8 +1,5 @@
|
||||
package org.openmetadata.catalog.slack;
|
||||
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import javax.ws.rs.client.Client;
|
||||
import javax.ws.rs.client.ClientBuilder;
|
||||
@ -10,27 +7,27 @@ import javax.ws.rs.client.Invocation;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.Response;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.openmetadata.catalog.events.AbstractEventPublisher;
|
||||
import org.openmetadata.catalog.events.WebhookPublisher;
|
||||
import org.openmetadata.catalog.events.errors.EventPublisherException;
|
||||
import org.openmetadata.catalog.jdbi3.CollectionDAO;
|
||||
import org.openmetadata.catalog.resources.events.EventResource.ChangeEventList;
|
||||
import org.openmetadata.catalog.type.ChangeEvent;
|
||||
import org.openmetadata.catalog.type.Webhook;
|
||||
import org.openmetadata.catalog.util.ChangeEventParser;
|
||||
|
||||
@Slf4j
|
||||
public class SlackWebhookEventPublisher extends AbstractEventPublisher {
|
||||
public class SlackWebhookEventPublisher extends WebhookPublisher {
|
||||
private final Invocation.Builder target;
|
||||
private final Client client;
|
||||
private final String openMetadataUrl;
|
||||
|
||||
public SlackWebhookEventPublisher(SlackPublisherConfiguration config) {
|
||||
super(config.getBatchSize(), config.getFilters());
|
||||
String slackWebhookURL = config.getWebhookUrl();
|
||||
public SlackWebhookEventPublisher(Webhook webhook, CollectionDAO dao) {
|
||||
super(webhook, dao);
|
||||
String slackWebhookURL = webhook.getEndpoint().toString();
|
||||
ClientBuilder clientBuilder = ClientBuilder.newBuilder();
|
||||
clientBuilder.connectTimeout(10, TimeUnit.SECONDS);
|
||||
clientBuilder.readTimeout(12, TimeUnit.SECONDS);
|
||||
client = clientBuilder.build();
|
||||
target = client.target(slackWebhookURL).request();
|
||||
openMetadataUrl = refineUri(config.getOpenMetadataUrl());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -49,7 +46,7 @@ public class SlackWebhookEventPublisher extends AbstractEventPublisher {
|
||||
public void publish(ChangeEventList events) throws EventPublisherException {
|
||||
for (ChangeEvent event : events.getData()) {
|
||||
try {
|
||||
SlackMessage slackMessage = ChangeEventParser.buildSlackMessage(event, getEntityUrl(event));
|
||||
SlackMessage slackMessage = ChangeEventParser.buildSlackMessage(event);
|
||||
Response response =
|
||||
target.post(javax.ws.rs.client.Entity.entity(slackMessage, MediaType.APPLICATION_JSON_TYPE));
|
||||
if (response.getStatus() >= 300 && response.getStatus() < 400) {
|
||||
@ -63,29 +60,4 @@ public class SlackWebhookEventPublisher extends AbstractEventPublisher {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private String getEntityUrl(ChangeEvent event) {
|
||||
return String.format(
|
||||
"<%s/%s/%s|%s>",
|
||||
openMetadataUrl,
|
||||
event.getEntityType(),
|
||||
event.getEntityFullyQualifiedName(),
|
||||
event.getEntityFullyQualifiedName());
|
||||
}
|
||||
|
||||
private String refineUri(String url) {
|
||||
URI urlInstance = null;
|
||||
try {
|
||||
urlInstance = new URI(url);
|
||||
} catch (URISyntaxException e) {
|
||||
LOG.error("Slack URL is not in url format - {}", url);
|
||||
}
|
||||
|
||||
if (Objects.nonNull(urlInstance)) {
|
||||
String scheme = urlInstance.getScheme();
|
||||
String host = urlInstance.getHost();
|
||||
return String.format("%s://%s", scheme, host);
|
||||
}
|
||||
return url;
|
||||
}
|
||||
}
|
||||
|
@ -20,11 +20,13 @@ import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
|
||||
|
||||
import com.github.difflib.text.DiffRow;
|
||||
import com.github.difflib.text.DiffRowGenerator;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
@ -69,12 +71,24 @@ public final class ChangeEventParser {
|
||||
SLACK
|
||||
}
|
||||
|
||||
public static SlackMessage buildSlackMessage(ChangeEvent event, String omdurl) {
|
||||
public static String getEntityUrl(ChangeEvent event) {
|
||||
EntityInterface entity = (EntityInterface) event.getEntity();
|
||||
URI urlInstance = entity.getHref();
|
||||
String fqn = event.getEntityFullyQualifiedName();
|
||||
if (Objects.nonNull(urlInstance)) {
|
||||
String scheme = urlInstance.getScheme();
|
||||
String host = urlInstance.getHost();
|
||||
return String.format("<%s://%s/%s/%s|%s>", scheme, host, event.getEntityType(), fqn, fqn);
|
||||
}
|
||||
return urlInstance.toString();
|
||||
}
|
||||
|
||||
public static SlackMessage buildSlackMessage(ChangeEvent event) {
|
||||
SlackMessage slackMessage = new SlackMessage();
|
||||
slackMessage.setUsername(event.getUserName());
|
||||
if (event.getEntity() != null) {
|
||||
String headerTxt = "%s posted on " + event.getEntityType() + " %s";
|
||||
String headerText = String.format(headerTxt, event.getUserName(), omdurl);
|
||||
String headerText = String.format(headerTxt, event.getUserName(), getEntityUrl(event));
|
||||
slackMessage.setText(headerText);
|
||||
}
|
||||
Map<EntityLink, String> messages =
|
||||
|
@ -50,6 +50,20 @@
|
||||
"secretKey": {
|
||||
"description": "Secret set by the webhook client used for computing HMAC SHA256 signature of webhook payload and sent in `X-OM-Signature` header in POST requests to publish the events.",
|
||||
"type": "string"
|
||||
},
|
||||
"webhookType": {
|
||||
"description": "Type of webhook slack,generic etc",
|
||||
"type": "string",
|
||||
"default": "generic",
|
||||
"enum": ["slack", "generic"],
|
||||
"javaEnums": [
|
||||
{
|
||||
"name": "slack"
|
||||
},
|
||||
{
|
||||
"name": "generic"
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"required": ["name", "endpoint", "eventFilters"],
|
||||
|
@ -23,6 +23,20 @@
|
||||
"description": "Display Name that identifies this webhook.",
|
||||
"type": "string"
|
||||
},
|
||||
"webhookType": {
|
||||
"description": "Type of webhook slack,generic etc",
|
||||
"type": "string",
|
||||
"default": "generic",
|
||||
"enum": ["slack", "generic"],
|
||||
"javaEnums": [
|
||||
{
|
||||
"name": "slack"
|
||||
},
|
||||
{
|
||||
"name": "generic"
|
||||
}
|
||||
]
|
||||
},
|
||||
"description": {
|
||||
"description": "Description of the application.",
|
||||
"$ref": "../../type/basic.json#/definitions/markdown"
|
||||
|
Loading…
x
Reference in New Issue
Block a user