ElasticSearch & Slack Event publishers

This commit is contained in:
Sriharsha Chintalapani 2022-01-09 10:38:12 -08:00
parent b368d093af
commit 9d701dcff2
5 changed files with 83 additions and 4 deletions

View File

@ -22,12 +22,14 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.openmetadata.catalog.Entity;
@ -101,9 +103,28 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher {
if (updateRequest != null) {
client.update(updateRequest, RequestOptions.DEFAULT);
}
} catch (Exception e) {
} catch (ElasticsearchException e) {
LOG.error("failed to update ES doc");
LOG.debug(e.getMessage());
if (e.status() == RestStatus.NOT_FOUND) {
LOG.error("ElasticSearch index is not found. Please run ./bootstrap-storage.sh es-create");
throw new ElasticSearchRetriableException(e.getMessage());
} else if (e.status() == RestStatus.GATEWAY_TIMEOUT) {
LOG.error("ElasticSearch is not reachable. Please check configuration and ElasticSearch health");
throw new ElasticSearchRetriableException(e.getMessage());
} else if (e.status() == RestStatus.INTERNAL_SERVER_ERROR) {
LOG.error(
"ElasticSearch internal server error. Pausing ElasticSearch publisher until "
+ "ElasticSearch is recovered");
throw new ElasticSearchRetriableException(e.getMessage());
} else if (e.status() == RestStatus.REQUEST_TIMEOUT) {
LOG.error("ElasticSearch request timed out");
throw new ElasticSearchRetriableException(e.getMessage());
} else {
throw new EventPublisherException(e.getMessage());
}
} catch (IOException ie) {
throw new EventPublisherException(ie.getMessage());
}
}
}

View File

@ -0,0 +1,23 @@
package org.openmetadata.catalog.elasticsearch;
import org.openmetadata.catalog.events.errors.RetriableException;
public class ElasticSearchRetriableException extends RetriableException {
private static final long serialVersionUID = 1L;
public ElasticSearchRetriableException() {
super();
}
public ElasticSearchRetriableException(String message, Throwable cause) {
super(message, cause);
}
public ElasticSearchRetriableException(String message) {
super(message);
}
public ElasticSearchRetriableException(Throwable cause) {
super(cause);
}
}

View File

@ -3,6 +3,7 @@ package org.openmetadata.catalog.events;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.openmetadata.catalog.events.errors.EventPublisherException;
import org.openmetadata.catalog.events.errors.RetriableException;
import org.openmetadata.catalog.resources.events.EventResource.ChangeEventList;
import org.openmetadata.catalog.type.ChangeEvent;
@ -59,6 +60,8 @@ public abstract class AbstractEventPublisher implements EventPublisher {
} catch (RetriableException ex) {
setNextBackOff();
Thread.sleep(currentBackoffTime);
} catch (EventPublisherException e) {
LOG.error("Failed to publish event {}", changeEvent);
}
}

View File

@ -0,0 +1,23 @@
package org.openmetadata.catalog.slack;
import org.openmetadata.catalog.events.errors.RetriableException;
public class SlackRetriableException extends RetriableException {
private static final long serialVersionUID = 1L;
public SlackRetriableException() {
super();
}
public SlackRetriableException(String message, Throwable cause) {
super(message, cause);
}
public SlackRetriableException(String message) {
super(message);
}
public SlackRetriableException(Throwable cause) {
super(cause);
}
}

View File

@ -1,11 +1,13 @@
package org.openmetadata.catalog.slack;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.ProcessingException;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.openmetadata.catalog.events.AbstractEventPublisher;
import org.openmetadata.catalog.events.errors.EventPublisherException;
import org.openmetadata.catalog.resources.events.EventResource.ChangeEventList;
@ -48,9 +50,16 @@ public class SlackWebhookEventPublisher extends AbstractEventPublisher {
LOG.info("log event {}", event);
try {
SlackMessage slackMessage = buildSlackMessage(event);
target.post(Entity.entity(slackMessage, MediaType.APPLICATION_JSON_TYPE));
} catch (Exception e) {
LOG.error("failed to update ES doc", e);
Response response = target.post(Entity.entity(slackMessage, MediaType.APPLICATION_JSON_TYPE));
if (response.getStatus() >= 300 && response.getStatus() < 400) {
throw new EventPublisherException(
"Slack webhook callback is getting redirected. " + "Please check your configuration");
} else if (response.getStatus() >= 300 && response.getStatus() < 600) {
throw new SlackRetriableException(response.getStatusInfo().getReasonPhrase());
}
} catch (ProcessingException e) {
LOG.error("Failed to publish event {} to slack ", event);
throw new EventPublisherException(e.getMessage());
}
}
}