diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchEventPublisher.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchEventPublisher.java index f1a8d31ea93..3f6a09b9169 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchEventPublisher.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchEventPublisher.java @@ -22,12 +22,14 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.UUID; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; import org.openmetadata.catalog.Entity; @@ -101,9 +103,28 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher { if (updateRequest != null) { client.update(updateRequest, RequestOptions.DEFAULT); } - } catch (Exception e) { + } catch (ElasticsearchException e) { LOG.error("failed to update ES doc"); LOG.debug(e.getMessage()); + if (e.status() == RestStatus.NOT_FOUND) { + LOG.error("ElasticSearch index is not found. Please run ./bootstrap-storage.sh es-create"); + throw new ElasticSearchRetriableException(e.getMessage()); + } else if (e.status() == RestStatus.GATEWAY_TIMEOUT) { + LOG.error("ElasticSearch is not reachable. Please check configuration and ElasticSearch health"); + throw new ElasticSearchRetriableException(e.getMessage()); + } else if (e.status() == RestStatus.INTERNAL_SERVER_ERROR) { + LOG.error( + "ElasticSearch internal server error. Pausing ElasticSearch publisher until " + + "ElasticSearch is recovered"); + throw new ElasticSearchRetriableException(e.getMessage()); + } else if (e.status() == RestStatus.REQUEST_TIMEOUT) { + LOG.error("ElasticSearch request timed out"); + throw new ElasticSearchRetriableException(e.getMessage()); + } else { + throw new EventPublisherException(e.getMessage()); + } + } catch (IOException ie) { + throw new EventPublisherException(ie.getMessage()); } } } diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchRetriableException.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchRetriableException.java new file mode 100644 index 00000000000..d41043f1255 --- /dev/null +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchRetriableException.java @@ -0,0 +1,23 @@ +package org.openmetadata.catalog.elasticsearch; + +import org.openmetadata.catalog.events.errors.RetriableException; + +public class ElasticSearchRetriableException extends RetriableException { + private static final long serialVersionUID = 1L; + + public ElasticSearchRetriableException() { + super(); + } + + public ElasticSearchRetriableException(String message, Throwable cause) { + super(message, cause); + } + + public ElasticSearchRetriableException(String message) { + super(message); + } + + public ElasticSearchRetriableException(Throwable cause) { + super(cause); + } +} diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/AbstractEventPublisher.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/AbstractEventPublisher.java index 9bfebc465c0..50cb89b46ed 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/AbstractEventPublisher.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/AbstractEventPublisher.java @@ -3,6 +3,7 @@ package org.openmetadata.catalog.events; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import org.openmetadata.catalog.events.errors.EventPublisherException; import org.openmetadata.catalog.events.errors.RetriableException; import org.openmetadata.catalog.resources.events.EventResource.ChangeEventList; import org.openmetadata.catalog.type.ChangeEvent; @@ -59,6 +60,8 @@ public abstract class AbstractEventPublisher implements EventPublisher { } catch (RetriableException ex) { setNextBackOff(); Thread.sleep(currentBackoffTime); + } catch (EventPublisherException e) { + LOG.error("Failed to publish event {}", changeEvent); } } diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/slack/SlackRetriableException.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/slack/SlackRetriableException.java new file mode 100644 index 00000000000..71c7bf11982 --- /dev/null +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/slack/SlackRetriableException.java @@ -0,0 +1,23 @@ +package org.openmetadata.catalog.slack; + +import org.openmetadata.catalog.events.errors.RetriableException; + +public class SlackRetriableException extends RetriableException { + private static final long serialVersionUID = 1L; + + public SlackRetriableException() { + super(); + } + + public SlackRetriableException(String message, Throwable cause) { + super(message, cause); + } + + public SlackRetriableException(String message) { + super(message); + } + + public SlackRetriableException(Throwable cause) { + super(cause); + } +} diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/slack/SlackWebhookEventPublisher.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/slack/SlackWebhookEventPublisher.java index aa03b9f2c44..3b5ab18b23e 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/slack/SlackWebhookEventPublisher.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/slack/SlackWebhookEventPublisher.java @@ -1,11 +1,13 @@ package org.openmetadata.catalog.slack; import java.util.concurrent.TimeUnit; +import javax.ws.rs.ProcessingException; import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.Entity; import javax.ws.rs.client.Invocation; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; import org.openmetadata.catalog.events.AbstractEventPublisher; import org.openmetadata.catalog.events.errors.EventPublisherException; import org.openmetadata.catalog.resources.events.EventResource.ChangeEventList; @@ -48,9 +50,16 @@ public class SlackWebhookEventPublisher extends AbstractEventPublisher { LOG.info("log event {}", event); try { SlackMessage slackMessage = buildSlackMessage(event); - target.post(Entity.entity(slackMessage, MediaType.APPLICATION_JSON_TYPE)); - } catch (Exception e) { - LOG.error("failed to update ES doc", e); + Response response = target.post(Entity.entity(slackMessage, MediaType.APPLICATION_JSON_TYPE)); + if (response.getStatus() >= 300 && response.getStatus() < 400) { + throw new EventPublisherException( + "Slack webhook callback is getting redirected. " + "Please check your configuration"); + } else if (response.getStatus() >= 300 && response.getStatus() < 600) { + throw new SlackRetriableException(response.getStatusInfo().getReasonPhrase()); + } + } catch (ProcessingException e) { + LOG.error("Failed to publish event {} to slack ", event); + throw new EventPublisherException(e.getMessage()); } } }