Fixes #1900 - Initial implementation of webhook API (#1905)

Co-authored-by: Suresh Srinivas <sureshsrinivas@Suresh-Collate.local>
This commit is contained in:
Suresh Srinivas 2021-12-23 11:26:00 -08:00 committed by GitHub
parent f1a8a7886e
commit 5892595b1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 1403 additions and 62 deletions

7
.idea/encodings.xml generated
View File

@ -5,6 +5,13 @@
<file url="file://$PROJECT_DIR$/catalog-rest-service/src/main/resources" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/catalog-rest-service/src/main/resources/json/data" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/common/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/common/src/main/resources" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/openmetadata-dist/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/openmetadata-dist/src/main/resources" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/openmetadata-ui/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/openmetadata-ui/src/main/resources/json/data" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/openmetadata-ui/src/main/resources/ui/dist" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/src/main/resources" charset="UTF-8" />
</component>
</project>

View File

@ -0,0 +1,8 @@
CREATE TABLE IF NOT EXISTS webhook_entity (
id VARCHAR(36) GENERATED ALWAYS AS (json ->> '$.id') STORED NOT NULL,
name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL,
json JSON NOT NULL,
PRIMARY KEY (id),
UNIQUE KEY unique_name(name)
-- No versioning, updatedAt, updatedBy, or changeDescription fields for webhook
);

View File

@ -315,11 +315,13 @@
<artifactId>javafaker</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>

View File

@ -40,6 +40,7 @@ import org.glassfish.jersey.media.multipart.MultiPartFeature;
import org.glassfish.jersey.server.ServerProperties;
import org.jdbi.v3.core.Jdbi;
import org.openmetadata.catalog.events.EventFilter;
import org.openmetadata.catalog.events.EventPubSub;
import org.openmetadata.catalog.exception.CatalogGenericExceptionMapper;
import org.openmetadata.catalog.exception.ConstraintViolationExceptionMapper;
import org.openmetadata.catalog.exception.JsonMappingExceptionMapper;
@ -106,6 +107,7 @@ public class CatalogApplication extends Application<CatalogApplicationConfig> {
// Register Event Handler
registerEventFilter(catalogConfig, environment, jdbi);
EventPubSub.start();
}
@SneakyThrows

View File

@ -77,6 +77,7 @@ public final class Entity {
// Operations
//
public static final String INGESTION = "ingestion";
public static final String WEBHOOK = "webhook";
private Entity() {}

View File

@ -42,13 +42,15 @@ public class ChangeEventHandler implements EventHandler {
ChangeEvent changeEvent = getChangeEvent(method, responseContext);
if (changeEvent != null) {
LOG.info("Recording change event {} {}", changeEvent.getDateTime().getTime(), changeEvent);
EventPubSub.publish(changeEvent);
if (changeEvent.getEntity() != null) {
changeEvent.setEntity(JsonUtils.pojoToJson(changeEvent.getEntity()));
}
dao.changeEventDAO().insert(JsonUtils.pojoToJson(changeEvent));
}
} catch (Exception e) {
LOG.error("Failed to capture change event for method {} due to {}", method, e);
LOG.error("Failed to capture change event for method {} due to ", method, e);
}
return null;
}

View File

@ -51,21 +51,20 @@ public class EventFilter implements ContainerResponseFilter {
((Class<EventHandler>) Class.forName(eventHandlerClassName)).getConstructor().newInstance();
eventHandler.init(config, jdbi);
eventHandlers.add(eventHandler);
LOG.info("Added event handler {}", eventHandlerClassName);
}
} catch (Exception e) {
LOG.info(e.getMessage());
LOG.info("Exception ", e);
}
}
@Override
public void filter(ContainerRequestContext requestContext, ContainerResponseContext responseContext) {
int responseCode = responseContext.getStatus();
String method = requestContext.getMethod();
if ((responseCode < 200 || responseCode > 299) || (!AUDITABLE_METHODS.contains(method))) {
return;
}
eventHandlers
.parallelStream()
.forEach(

View File

@ -0,0 +1,103 @@
/*
* Copyright 2021 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.events;
import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.openmetadata.catalog.type.ChangeEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Change event PubSub built based on LMAX Disruptor.
*/
public class EventPubSub {
private static final Logger LOG = LoggerFactory.getLogger(EventPubSub.class);
private static Disruptor<ChangeEventHolder> disruptor;
private static ExecutorService executor;
private static RingBuffer<ChangeEventHolder> ringBuffer;
private static boolean STARTED = false;
public static void start() {
if (!STARTED) {
disruptor = new Disruptor<>(ChangeEventHolder::new, 1024, DaemonThreadFactory.INSTANCE);
executor = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE);
ringBuffer = disruptor.start();
LOG.info("Disruptor started");
STARTED = true;
}
}
public static void shutdown() throws InterruptedException {
if (STARTED) {
disruptor.shutdown();
disruptor.halt();
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
disruptor = null;
ringBuffer = null;
STARTED = false;
LOG.info("Disruptor stopped");
}
}
public static class ChangeEventHolder {
private ChangeEvent value;
public void set(ChangeEvent event) {
this.value = event;
}
public ChangeEvent get() {
return value;
}
}
public static class ChangeEventFactory implements EventFactory<ChangeEventHolder> {
public ChangeEventHolder newInstance() {
return new ChangeEventHolder();
}
}
public static void publish(ChangeEvent event) {
if (event != null) {
RingBuffer<ChangeEventHolder> ringBuffer = disruptor.getRingBuffer();
long sequence = ringBuffer.next();
ringBuffer.get(sequence).set(event);
ringBuffer.publish(sequence);
}
}
public static BatchEventProcessor<ChangeEventHolder> addEventHandler(EventHandler<ChangeEventHolder> eventHandler) {
BatchEventProcessor<ChangeEventHolder> processor =
new BatchEventProcessor<>(ringBuffer, ringBuffer.newBarrier(), eventHandler);
ringBuffer.addGatingSequences(processor.getSequence());
executor.execute(processor);
LOG.info("Processor added for {}", processor);
return processor;
}
public static void removeProcessor(BatchEventProcessor<ChangeEventHolder> processor) {
ringBuffer.removeGatingSequence(processor.getSequence());
}
public void close() {}
}

View File

@ -66,11 +66,13 @@ import org.openmetadata.catalog.jdbi3.TableRepository.TableEntityInterface;
import org.openmetadata.catalog.jdbi3.TeamRepository.TeamEntityInterface;
import org.openmetadata.catalog.jdbi3.TopicRepository.TopicEntityInterface;
import org.openmetadata.catalog.jdbi3.UserRepository.UserEntityInterface;
import org.openmetadata.catalog.jdbi3.WebhookRepository.WebhookEntityInterface;
import org.openmetadata.catalog.operations.workflows.Ingestion;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.TagLabel;
import org.openmetadata.catalog.type.UsageDetails;
import org.openmetadata.catalog.type.UsageStats;
import org.openmetadata.catalog.type.Webhook;
import org.openmetadata.catalog.util.EntityUtil;
public interface CollectionDAO {
@ -155,6 +157,9 @@ public interface CollectionDAO {
@CreateSqlObject
ChangeEventDAO changeEventDAO();
@CreateSqlObject
WebhookDAO webhookDAO();
interface DashboardDAO extends EntityDAO<Dashboard> {
@Override
default String getTableName() {
@ -798,6 +803,28 @@ public interface CollectionDAO {
}
}
interface WebhookDAO extends EntityDAO<Webhook> {
@Override
default String getTableName() {
return "webhook_entity";
}
@Override
default Class<Webhook> getEntityClass() {
return Webhook.class;
}
@Override
default String getNameColumn() {
return "name";
}
@Override
default EntityReference getEntityReference(Webhook entity) {
return new WebhookEntityInterface(entity).getEntityReference();
}
}
@RegisterRowMapper(TagLabelMapper.class)
interface TagDAO {
@SqlUpdate("INSERT INTO tag_category (json) VALUES (:json)")

View File

@ -405,7 +405,7 @@ public abstract class EntityRepository<T> {
return entity;
}
protected T withHref(UriInfo uriInfo, T entity) {
public T withHref(UriInfo uriInfo, T entity) {
if (uriInfo == null) {
return entity;
}
@ -607,7 +607,7 @@ public abstract class EntityRepository<T> {
}
public final void storeUpdate() throws IOException {
if (updateVersion(original.getVersion())) {
if (updateVersion(original.getVersion())) { // Update changed the entity veresion
// Store the old version
String extensionName = EntityUtil.getVersionExtension(entityName, original.getVersion());
daoCollection
@ -616,7 +616,7 @@ public abstract class EntityRepository<T> {
// Store the new version
EntityRepository.this.storeEntity(updated.getEntity(), true);
} else {
} else { // Update did not change the entity version
updated.setUpdateDetails(original.getUpdatedBy(), original.getUpdatedAt());
}
}

View File

@ -0,0 +1,414 @@
/*
* Copyright 2021 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.jdbi3;
import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.LifecycleAware;
import java.io.IOException;
import java.net.URI;
import java.net.UnknownHostException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.ProcessingException;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Invocation.Builder;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.events.EventPubSub;
import org.openmetadata.catalog.events.EventPubSub.ChangeEventHolder;
import org.openmetadata.catalog.resources.events.EventResource.ChangeEventList;
import org.openmetadata.catalog.resources.events.WebhookResource;
import org.openmetadata.catalog.security.SecurityUtil;
import org.openmetadata.catalog.type.ChangeDescription;
import org.openmetadata.catalog.type.ChangeEvent;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.FailureDetails;
import org.openmetadata.catalog.type.TagLabel;
import org.openmetadata.catalog.type.Webhook;
import org.openmetadata.catalog.type.Webhook.Status;
import org.openmetadata.catalog.util.EntityInterface;
import org.openmetadata.catalog.util.EntityUtil.Fields;
import org.openmetadata.catalog.util.JsonUtils;
import org.openmetadata.common.utils.CommonUtil;
public class WebhookRepository extends EntityRepository<Webhook> {
private final CollectionDAO dao;
private static final List<WebhookPublisher> webhookPublisherList = new ArrayList<>();
public WebhookRepository(CollectionDAO dao) {
super(
WebhookResource.COLLECTION_PATH,
Entity.WEBHOOK,
Webhook.class,
dao.webhookDAO(),
dao,
Fields.EMPTY_FIELDS,
Fields.EMPTY_FIELDS);
this.dao = dao;
}
@Override
public EntityInterface<Webhook> getEntityInterface(Webhook entity) {
return new WebhookEntityInterface(entity);
}
@Override
public Webhook setFields(Webhook entity, Fields fields) throws IOException, ParseException {
return entity; // No fields to set
}
@Override
public void prepare(Webhook entity) throws IOException {
// Nothing to prepare
}
@Override
public void storeEntity(Webhook entity, boolean update) throws IOException {
entity.setHref(null);
if (update) {
dao.webhookDAO().update(entity.getId(), JsonUtils.pojoToJson(entity));
} else {
dao.webhookDAO().insert(entity);
}
}
@Override
public void storeRelationships(Webhook entity) {
// No relationship to store
}
@Override
public void restorePatchAttributes(Webhook original, Webhook updated) {
updated.withId(original.getId()).withName(original.getName());
}
@Override
public EntityRepository<Webhook>.EntityUpdater getUpdater(Webhook original, Webhook updated, boolean patchOperation) {
return super.getUpdater(original, updated, patchOperation);
}
public void addWebhook(Webhook webhook) {
WebhookPublisher publisher = new WebhookPublisher(webhook);
BatchEventProcessor<ChangeEventHolder> processor = EventPubSub.addEventHandler(publisher);
publisher.setProcessor(processor);
webhookPublisherList.add(publisher);
publisher.test();
LOG.info("Webhook added for {}", webhook);
}
public static void deleteWebhook(UUID id) throws InterruptedException {
Iterator<WebhookPublisher> iterator = webhookPublisherList.iterator();
while (iterator.hasNext()) {
WebhookPublisher publisher = iterator.next();
if (publisher.getWebhook().getId().equals(id)) {
iterator.remove();
publisher.getProcessor().halt();
publisher.awaitShutdown();
EventPubSub.removeProcessor(publisher.getProcessor());
LOG.info("Webhook deleted {}", publisher.getWebhook());
}
}
}
@Transaction
public boolean delete(String id) {
return dao.webhookDAO().delete(UUID.fromString(id)) > 0;
}
public static class WebhookEntityInterface implements EntityInterface<Webhook> {
private final Webhook entity;
public WebhookEntityInterface(Webhook entity) {
this.entity = entity;
}
@Override
public UUID getId() {
return entity.getId();
}
@Override
public String getDescription() {
return entity.getDescription();
}
@Override
public String getDisplayName() {
return entity.getName();
}
@Override
public EntityReference getOwner() {
return null;
}
@Override
public String getFullyQualifiedName() {
return entity.getName();
}
@Override
public List<TagLabel> getTags() {
return null;
}
@Override
public Double getVersion() {
return entity.getVersion();
}
@Override
public String getUpdatedBy() {
return entity.getUpdatedBy();
}
@Override
public Date getUpdatedAt() {
return entity.getUpdatedAt();
}
@Override
public EntityReference getEntityReference() {
return new EntityReference()
.withId(getId())
.withName(getFullyQualifiedName())
.withDescription(getDescription())
.withDisplayName(getDisplayName())
.withType(Entity.WEBHOOK);
}
@Override
public URI getHref() {
return entity.getHref();
}
@Override
public List<EntityReference> getFollowers() {
return null;
}
@Override
public Webhook getEntity() {
return entity;
}
@Override
public ChangeDescription getChangeDescription() {
return entity.getChangeDescription();
}
@Override
public void setId(UUID id) {
entity.setId(id);
}
@Override
public void setDescription(String description) {
entity.setDescription(description);
}
@Override
public void setDisplayName(String displayName) {}
@Override
public void setUpdateDetails(String updatedBy, Date updatedAt) {
entity.setUpdatedBy(updatedBy);
entity.setUpdatedAt(updatedAt);
}
@Override
public void setChangeDescription(Double newVersion, ChangeDescription changeDescription) {
entity.setVersion(newVersion);
entity.setChangeDescription(changeDescription);
}
@Override
public void setOwner(EntityReference owner) {}
@Override
public Webhook withHref(URI href) {
return entity.withHref(href);
}
@Override
public void setTags(List<TagLabel> tags) {}
}
/** One webhook call back per webhook subscription */
public class WebhookPublisher implements EventHandler<ChangeEventHolder>, LifecycleAware {
// Backoff timeout in seconds
private static final int BACKOFF_NORMAL = 0;
private static final int BACKOFF_3_SECONDS = 3;
private static final int BACKOFF_5_MINUTES = 5 * 60;
private static final int BACKOFF_1_HOUR = 60 * 60;
private static final int BACKOFF_24_HOUR = 24 * 60 * 60;
private int currentBackoffTime = BACKOFF_NORMAL;
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
private final Webhook webhook;
private final List<ChangeEvent> batch = new ArrayList<>();
private BatchEventProcessor<ChangeEventHolder> processor;
private Client client;
private Builder target;
public WebhookPublisher(Webhook webhook) {
this.webhook = webhook;
}
public void test() {
// TODO
}
public Webhook getWebhook() {
return webhook;
}
public void cleanup() {
// TODO
client.close();
client = null;
}
@Override
public void onEvent(ChangeEventHolder changeEventHolder, long sequence, boolean endOfBatch) throws Exception {
batch.add(changeEventHolder.get());
// Batch until either the batch size has reached the max size
if (!endOfBatch && batch.size() < webhook.getBatchSize()) {
return;
}
// TODO send max batch size
ChangeEventList list = new ChangeEventList(batch, null, null, batch.size());
Date attemptTime = new Date();
try {
Response response = target.post(javax.ws.rs.client.Entity.entity(list, MediaType.APPLICATION_JSON));
LOG.info(
"Webhook {}:{}:{} received response {}",
webhook.getName(),
webhook.getStatus(),
batch.size(),
response.getStatusInfo());
// 2xx response means call back is successful
if (response.getStatus() >= 200 && response.getStatus() < 300) { // All 2xx responses
batch.clear();
if (webhook.getStatus() != Status.SUCCESS) {
setStatus(Status.SUCCESS, null);
}
// 3xx response/redirection is not allowed for callback. Set the webhook state as in error
} else if (response.getStatus() >= 300 && response.getStatus() < 400) {
setErrorStatus(attemptTime, response.getStatus(), response.getStatusInfo().getReasonPhrase());
// 4xx, 5xx response retry delivering events after timeout
} else if (response.getStatus() >= 300 && response.getStatus() < 600) {
setNextBackOff();
setAwaitingRetry(attemptTime, response.getStatus(), response.getStatusInfo().getReasonPhrase());
Thread.sleep(currentBackoffTime);
}
} catch (ProcessingException ex) {
Throwable cause = ex.getCause();
if (cause.getClass() == UnknownHostException.class) {
LOG.warn("Invalid webhook endpoint {}", webhook.getEndPoint());
setErrorStatus(attemptTime, null, "UnknownHostException");
}
}
}
private void setErrorStatus(Date attemptTime, Integer statusCode, String reason) throws IOException {
if (webhook.getFailureDetails() == null || attemptTime != webhook.getFailureDetails().getLastFailedAttempt()) {
setStatus(
Status.ERROR,
new FailureDetails()
.withLastFailedAttempt(attemptTime)
.withLastFailedStatusCode(statusCode)
.withLastFailedReason(reason));
}
throw new RuntimeException(reason);
}
private void setAwaitingRetry(Date attemptTime, int statusCode, String reason) throws ParseException, IOException {
if (webhook.getFailureDetails() == null || attemptTime != webhook.getFailureDetails().getLastFailedAttempt()) {
setStatus(
Status.AWAITING_RETRY,
new FailureDetails()
.withLastFailedAttempt(attemptTime)
.withLastFailedStatusCode(statusCode)
.withLastFailedReason(reason)
.withNextAttempt(CommonUtil.getDateByOffsetSeconds(attemptTime, currentBackoffTime)));
}
}
private void setStatus(Status status, FailureDetails details) throws IOException {
webhook.setStatus(status);
webhook.setFailureDetails(details);
// TODO versioning
storeEntity(webhook, true);
}
@Override
public void onStart() {
test();
LOG.info("Webhook processor with webhook {} started", webhook);
ClientBuilder clientBuilder = ClientBuilder.newBuilder();
clientBuilder.connectTimeout(10, TimeUnit.SECONDS);
clientBuilder.readTimeout(12, TimeUnit.SECONDS);
client = clientBuilder.build();
// TODO clean this up
Map<String, String> authHeaders = SecurityUtil.authHeaders("admin@open-metadata.org");
target = SecurityUtil.addHeaders(client.target(webhook.getEndPoint()), authHeaders);
}
@Override
public void onShutdown() {
cleanup();
shutdownLatch.countDown();
LOG.info("Cleaned up webhook {}", webhook);
}
public void awaitShutdown() throws InterruptedException {
shutdownLatch.await();
}
public void setProcessor(BatchEventProcessor<ChangeEventHolder> processor) {
this.processor = processor;
}
public BatchEventProcessor<ChangeEventHolder> getProcessor() {
return processor;
}
public void setNextBackOff() {
if (currentBackoffTime == BACKOFF_NORMAL) {
currentBackoffTime = BACKOFF_3_SECONDS;
} else if (currentBackoffTime == BACKOFF_3_SECONDS) {
currentBackoffTime = BACKOFF_5_MINUTES;
} else if (currentBackoffTime == BACKOFF_5_MINUTES) {
currentBackoffTime = BACKOFF_1_HOUR;
} else if (currentBackoffTime == BACKOFF_1_HOUR) {
currentBackoffTime = BACKOFF_24_HOUR;
}
}
}
}

View File

@ -47,41 +47,22 @@ import org.slf4j.LoggerFactory;
*/
public final class CollectionRegistry {
private static final Logger LOG = LoggerFactory.getLogger(CollectionRegistry.class);
private static CollectionRegistry instance = null;
public static class CollectionDetails {
private final String resourceClass;
private final CollectionDescriptor cd;
private final List<CollectionDescriptor> childCollections = new ArrayList<>();
CollectionDetails(CollectionDescriptor cd, String resourceClass) {
this.cd = cd;
this.resourceClass = resourceClass;
}
public void addChildCollection(CollectionDetails child) {
CollectionInfo collectionInfo = child.cd.getCollection();
LOG.info(
"Adding child collection {} to parent collection {}", collectionInfo.getName(), cd.getCollection().getName());
childCollections.add(child.cd);
}
public CollectionDescriptor[] getChildCollections() {
return childCollections.toArray(new CollectionDescriptor[0]);
}
}
private static CollectionRegistry INSTANCE = null;
/** Map of collection endpoint path to collection details */
private final Map<String, CollectionDetails> collectionMap = new HashMap<>();
/** Resources used only for testing */
private final List<Object> testResources = new ArrayList<>();
private CollectionRegistry() {}
public static CollectionRegistry getInstance() {
if (instance == null) {
instance = new CollectionRegistry();
instance.initialize();
if (INSTANCE == null) {
INSTANCE = new CollectionRegistry();
INSTANCE.initialize();
}
return instance;
return INSTANCE;
}
private void initialize() {
@ -113,11 +94,6 @@ public final class CollectionRegistry {
for (CollectionDetails collection : collections) {
CollectionInfo collectionInfo = collection.cd.getCollection();
collectionMap.put(collectionInfo.getHref().getPath(), collection);
LOG.info(
"Initialized collection name {} href {} details {}",
collectionInfo.getName(),
collectionInfo.getHref(),
collection);
}
// Now add collections to their parents
@ -136,6 +112,10 @@ public final class CollectionRegistry {
}
}
public static void addTestResource(Object testResource) {
getInstance().testResources.add(testResource);
}
/** Register resources from CollectionRegistry */
public void registerResources(
Jdbi jdbi, Environment environment, CatalogApplicationConfig config, CatalogAuthorizer authorizer) {
@ -145,7 +125,7 @@ public final class CollectionRegistry {
String resourceClass = details.resourceClass;
try {
CollectionDAO daoObject = jdbi.onDemand(CollectionDAO.class);
Objects.requireNonNull(daoObject);
Objects.requireNonNull(daoObject, "CollectionDAO must not be null");
Object resource = createResource(daoObject, resourceClass, config, authorizer);
environment.jersey().register(resource);
LOG.info("Registering {}", resourceClass);
@ -153,6 +133,13 @@ public final class CollectionRegistry {
LOG.warn("Failed to create resource for class {} {}", resourceClass, ex);
}
}
// Now add test resources
testResources.forEach(
object -> {
LOG.info("Registering test resource {}", object);
environment.jersey().register(object);
});
}
/** Get collection details based on annotations in Resource classes */
@ -202,22 +189,41 @@ public final class CollectionRegistry {
// Create the resource identified by resourceClass
try {
LOG.info("Creating resource {}", resourceClass);
resource =
clz.getDeclaredConstructor(CollectionDAO.class, CatalogAuthorizer.class).newInstance(daoObject, authorizer);
} catch (NoSuchMethodException ex) {
LOG.info("Creating resource {} with default constructor", resourceClass);
resource = Class.forName(resourceClass).getConstructor().newInstance();
}
// Call initialize method, if it exists
try {
Method initializeMethod = resource.getClass().getMethod("initialize", CatalogApplicationConfig.class);
LOG.info("Initializing resource {}", resourceClass);
initializeMethod.invoke(resource, config);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ignored) {
// Method does not exist and initialize is not called
}
return resource;
}
public static class CollectionDetails {
private final String resourceClass;
private final CollectionDescriptor cd;
private final List<CollectionDescriptor> childCollections = new ArrayList<>();
CollectionDetails(CollectionDescriptor cd, String resourceClass) {
this.cd = cd;
this.resourceClass = resourceClass;
}
public void addChildCollection(CollectionDetails child) {
CollectionInfo collectionInfo = child.cd.getCollection();
LOG.info(
"Adding child collection {} to parent collection {}", collectionInfo.getName(), cd.getCollection().getName());
childCollections.add(child.cd);
}
public CollectionDescriptor[] getChildCollections() {
return childCollections.toArray(new CollectionDescriptor[0]);
}
}
}

View File

@ -52,8 +52,10 @@ import org.openmetadata.catalog.util.ResultList;
@Collection(name = "events")
public class EventResource {
private final ChangeEventRepository dao;
private final CatalogAuthorizer authorizer;
public static class ChangeEventList extends ResultList<ChangeEvent> {
@SuppressWarnings("unused") /* Required for tests */
public ChangeEventList() {}
@ -67,6 +69,7 @@ public class EventResource {
public EventResource(CollectionDAO dao, CatalogAuthorizer authorizer) {
Objects.requireNonNull(dao, "ChangeEventRepository must not be null");
this.dao = new ChangeEventRepository(dao);
this.authorizer = authorizer;
}
@GET

View File

@ -0,0 +1,296 @@
/*
* Copyright 2021 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.resources.events;
import com.google.inject.Inject;
import io.swagger.annotations.Api;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.security.GeneralSecurityException;
import java.text.ParseException;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import javax.validation.Valid;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.SecurityContext;
import javax.ws.rs.core.UriInfo;
import org.openmetadata.catalog.api.events.CreateWebhook;
import org.openmetadata.catalog.jdbi3.CollectionDAO;
import org.openmetadata.catalog.jdbi3.WebhookRepository;
import org.openmetadata.catalog.resources.Collection;
import org.openmetadata.catalog.security.CatalogAuthorizer;
import org.openmetadata.catalog.security.SecurityUtil;
import org.openmetadata.catalog.type.ChangeEvent;
import org.openmetadata.catalog.type.EntityHistory;
import org.openmetadata.catalog.type.Webhook;
import org.openmetadata.catalog.util.EntityUtil.Fields;
import org.openmetadata.catalog.util.RestUtil;
import org.openmetadata.catalog.util.RestUtil.PutResponse;
import org.openmetadata.catalog.util.ResultList;
@Path("/v1/webhook")
@Api(value = "Webhook resource", tags = "webhook")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@Collection(name = "webhook")
public class WebhookResource {
public static final String COLLECTION_PATH = "v1/webhook/";
private final WebhookRepository dao;
private final CatalogAuthorizer authorizer;
public static class WebhookList extends ResultList<Webhook> {
@SuppressWarnings("unused") /* Required for tests */
public WebhookList() {}
public WebhookList(List<Webhook> data, String beforeCursor, String afterCursor, int total)
throws GeneralSecurityException, UnsupportedEncodingException {
super(data, beforeCursor, afterCursor, total);
}
}
@Inject
public WebhookResource(CollectionDAO dao, CatalogAuthorizer authorizer) {
Objects.requireNonNull(dao, "ChangeEventRepository must not be null");
this.dao = new WebhookRepository(dao);
this.authorizer = authorizer;
}
@GET
@Operation(
summary = "List webhooks",
tags = "webhook",
description = "Get a list of webhook subscriptions",
responses = {
@ApiResponse(
responseCode = "200",
description = "List of webhooks",
content = @Content(mediaType = "application/json", schema = @Schema(implementation = WebhookList.class)))
})
public ResultList<Webhook> list(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "Limit the number webhooks returned. (1 to 1000000, default = " + "10) ")
@DefaultValue("10")
@Min(1)
@Max(1000000)
@QueryParam("limit")
int limitParam,
@Parameter(description = "Returns list of webhooks before this cursor", schema = @Schema(type = "string"))
@QueryParam("before")
String before,
@Parameter(description = "Returns list of webhooks after this cursor", schema = @Schema(type = "string"))
@QueryParam("after")
String after)
throws IOException, ParseException, GeneralSecurityException {
RestUtil.validateCursors(before, after);
ResultList<Webhook> webhooks;
if (before != null) { // Reverse paging
webhooks = dao.listBefore(uriInfo, Fields.EMPTY_FIELDS, null, limitParam, before);
} else { // Forward paging or first page
webhooks = dao.listAfter(uriInfo, Fields.EMPTY_FIELDS, null, limitParam, after);
}
webhooks.getData().forEach(t -> dao.withHref(uriInfo, t));
return webhooks;
}
@GET
@Path("/{id}")
@Valid
@Operation(
summary = "Get a webhook",
tags = "webhook",
description = "Get a webhook by given Id",
responses = {
@ApiResponse(
responseCode = "200",
description = "Entity events",
content = @Content(mediaType = "application/json", schema = @Schema(implementation = ChangeEvent.class))),
@ApiResponse(responseCode = "404", description = "Entity for instance {id} is not found")
})
public Webhook getWebhook(
@Context UriInfo uriInfo,
@Parameter(description = "webhook Id", schema = @Schema(type = "string")) @PathParam("id") String id)
throws IOException, GeneralSecurityException, ParseException {
return dao.get(uriInfo, id, Fields.EMPTY_FIELDS);
}
@GET
@Path("/name/{name}")
@Operation(
summary = "Get a webhook by name",
tags = "webhook",
description = "Get a webhook by name.",
responses = {
@ApiResponse(
responseCode = "200",
description = "webhook",
content = @Content(mediaType = "application/json", schema = @Schema(implementation = Webhook.class))),
@ApiResponse(responseCode = "404", description = "Webhook for instance {id} is not found")
})
public Webhook getByName(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "Name of the webhook", schema = @Schema(type = "string")) @PathParam("name") String fqn)
throws IOException, ParseException {
return dao.getByName(uriInfo, fqn, Fields.EMPTY_FIELDS);
}
@GET
@Path("/{id}/versions")
@Operation(
summary = "List webhook versions",
tags = "webhook",
description = "Get a list of all the versions of a webhook identified by `id`",
responses = {
@ApiResponse(
responseCode = "200",
description = "List of webhook versions",
content = @Content(mediaType = "application/json", schema = @Schema(implementation = EntityHistory.class)))
})
public EntityHistory listVersions(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "webhook Id", schema = @Schema(type = "string")) @PathParam("id") String id)
throws IOException, ParseException {
return dao.listVersions(id);
}
@GET
@Path("/{id}/versions/{version}")
@Operation(
summary = "Get a version of the webhook",
tags = "webhook",
description = "Get a version of the webhook by given `id`",
responses = {
@ApiResponse(
responseCode = "200",
description = "webhook",
content = @Content(mediaType = "application/json", schema = @Schema(implementation = Webhook.class))),
@ApiResponse(
responseCode = "404",
description = "Webhook for instance {id} and version {version} is " + "not found")
})
public Webhook getVersion(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "webhook Id", schema = @Schema(type = "string")) @PathParam("id") String id,
@Parameter(
description = "webhook version number in the form `major`.`minor`",
schema = @Schema(type = "string", example = "0.1 or 1.1"))
@PathParam("version")
String version)
throws IOException, ParseException {
return dao.getVersion(id, version);
}
@POST
@Operation(
summary = "Subscribe to a new webhook",
tags = "webhook",
description = "Subscribe to a new webhook",
responses = {
@ApiResponse(
responseCode = "200",
description = "webhook",
content = @Content(mediaType = "application/json", schema = @Schema(implementation = Webhook.class))),
@ApiResponse(responseCode = "400", description = "Bad request")
})
public Response createWebhook(
@Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid CreateWebhook create)
throws IOException {
SecurityUtil.checkAdminOrBotRole(authorizer, securityContext);
Webhook webhook = getWebhook(securityContext, create);
webhook = dao.create(uriInfo, webhook);
dao.addWebhook(webhook);
return Response.created(webhook.getHref()).entity(webhook).build();
}
@PUT
@Operation(
summary = "Updated an existing or create a new webhook",
tags = "webhook",
description = "Updated an existing or create a new webhook",
responses = {
@ApiResponse(
responseCode = "200",
description = "webhook",
content = @Content(mediaType = "application/json", schema = @Schema(implementation = Webhook.class))),
@ApiResponse(responseCode = "400", description = "Bad request")
})
public Response updateWebhook(
@Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid CreateWebhook create)
throws IOException, ParseException {
// TODO
// SecurityUtil.checkAdminOrBotRole(authorizer, securityContext);
// Table table = getTable(securityContext, create);
Webhook webhook = getWebhook(securityContext, create);
PutResponse<Webhook> putResponse = dao.createOrUpdate(uriInfo, webhook);
return putResponse.toResponse();
}
@DELETE
@Path("/{id}")
@Valid
@Operation(
summary = "Delete a webhook",
tags = "webhook",
description = "Get a webhook by given Id",
responses = {
@ApiResponse(
responseCode = "200",
description = "Entity events",
content = @Content(mediaType = "application/json", schema = @Schema(implementation = Webhook.class))),
@ApiResponse(responseCode = "404", description = "Entity for instance {id} is not found")
})
public Response deleteWebhook(
@Context UriInfo uriInfo,
@Parameter(description = "webhook Id", schema = @Schema(type = "string")) @PathParam("id") String id)
throws IOException, GeneralSecurityException, ParseException {
dao.delete(id);
return Response.ok().build();
}
public Webhook getWebhook(SecurityContext securityContext, CreateWebhook create) {
return new Webhook()
.withDescription(create.getDescription())
.withName(create.getName())
.withId(UUID.randomUUID())
.withEndPoint(create.getEndPoint())
.withEventFilters(create.getEventFilters())
.withUpdatedBy(securityContext.getUserPrincipal().getName())
.withUpdatedAt(new Date());
}
}

View File

@ -0,0 +1,47 @@
{
"$id": "https://open-metadata.org/schema/type/createWebhook.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "ChangeEvent",
"description": "This schema defines webhook for receiving events from OpenMetadata",
"type": "object",
"properties": {
"name": {
"description": "Unique name of the application receiving webhook events.",
"type": "string",
"minLength": 1,
"maxLength": 128
},
"description": {
"description": "Description of the application",
"type": "string"
},
"endPoint": {
"description": "Endpoint to receive the webhook events over POST requests.",
"type": "string",
"format": "uri"
},
"batchSize": {
"description": "Maximum number of events sent in a batch (Default 10).",
"type": "integer",
"default": 10
},
"eventFilters": {
"description": "Endpoint to receive the webhook events over POST requests.",
"type": "array",
"items": {
"$ref": "../../type/changeEvent.json#/definitions/eventFilter"
}
},
"timeout": {
"description": "Connection timeout in seconds. (Default = 10s)",
"type": "integer",
"default": 10
}
},
"required": [
"name",
"endPoint",
"eventFilter"
],
"additionalProperties": false
}

View File

@ -0,0 +1,112 @@
{
"$id": "https://open-metadata.org/schema/type/webhook.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "ChangeEvent",
"description": "This schema defines webhook for receiving events from OpenMetadata",
"type": "object",
"javaType": "org.openmetadata.catalog.type.Webhook",
"properties": {
"id": {
"description": "Unique ID associated with a webhook subscription.",
"$ref": "../../type/basic.json#/definitions/uuid"
},
"name": {
"description": "Unique name of the application receiving webhook events.",
"type": "string",
"minLength": 1,
"maxLength": 128
},
"description": {
"description": "Description of the application",
"type": "string"
},
"endPoint": {
"description": "Endpoint to receive the webhook events over POST requests.",
"type": "string",
"format": "uri"
},
"eventFilters": {
"description": "Endpoint to receive the webhook events over POST requests.",
"type": "array",
"items": {
"$ref": "../../type/changeEvent.json#/definitions/eventFilter"
}
},
"batchSize": {
"description": "Maximum number of events sent in a batch (Default 10).",
"type": "integer",
"default": 10
},
"timeout": {
"description": "Connection timeout in seconds. (Default 10s)",
"type": "integer",
"default": 10
},
"enabled": {
"description": "When set to `true`, the webhook event notification is enabled. Set it to `false` to disable the subscription. (Default `true`)",
"type": "boolean",
"default": true
},
"version" : {
"description": "Metadata version of the entity.",
"$ref": "../../type/entityHistory.json#/definitions/entityVersion"
},
"updatedAt": {
"description": "Last update time corresponding to the new version of the entity.",
"$ref": "../../type/basic.json#/definitions/dateTime"
},
"updatedBy": {
"description": "User who made the update.",
"type": "string"
},
"status": {
"description": "Status is `success` on 200 response to notifications. Status is `error` on bad callback URL, connection failures, and `1xx`, `3xx` response. Status is `awaitingRetry` when previous attempt at delivery timed out or received `4xx`, `5xx` response. Status is `retryLimitReached` after all retries fail.",
"type": "string",
"enum": [
"success",
"error",
"awaitingRetry",
"retryLimitReached"
],
"default": "success"
},
"failureDetails": {
"description": "Failure details are set only when `status` is not `success`",
"type": "object",
"properties": {
"lastFailedAttempt": {
"description": "Last non-successful callback time",
"$ref": "../../type/basic.json#/definitions/dateTime"
},
"lastFailedStatusCode": {
"description": "Last non-successful activity response code received during callback",
"type": "integer"
},
"lastFailedReason": {
"description": "Last non-successful activity response reason received during callback",
"type": "string"
},
"nextAttempt": {
"description": "Next retry will be done at this time. Only valid is `status` is `awaitingRetry`.",
"$ref": "../../type/basic.json#/definitions/dateTime"
}
},
"additionalProperties": false
},
"href": {
"description": "Link to this webhook resource.",
"$ref": "../../type/basic.json#/definitions/href"
},
"changeDescription": {
"description" : "Change that lead to this version of the entity.",
"$ref": "../../type/entityHistory.json#/definitions/changeDescription"
}
},
"required": [
"id",
"name",
"endPoint",
"eventFilter"
],
"additionalProperties": false
}

View File

@ -1,5 +1,5 @@
{
"$id": "https://open-metadata.org/schema/type/auditLog.json",
"$id": "https://open-metadata.org/schema/type/changeEvent.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "ChangeEvent",
"description": "This schema defines the change event type to capture the changes to entities. Entities change due to user activity, such as updating description of a dataset, changing ownership, or adding new tags. Entity also changes due to activities at the metadata sources, such as a new dataset was created, a datasets was deleted, or schema of a dataset is modified. When state of entity changes, an event is produced. These events can be used to build apps and bots that respond to the change from activities.",

View File

@ -22,7 +22,9 @@ import javax.ws.rs.client.WebTarget;
import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.client.HttpUrlConnectorProvider;
import org.junit.jupiter.api.extension.ExtendWith;
import org.openmetadata.catalog.resources.CollectionRegistry;
import org.openmetadata.catalog.resources.EmbeddedMySqlSupport;
import org.openmetadata.catalog.resources.events.WebhookCallbackResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -31,11 +33,13 @@ import org.slf4j.LoggerFactory;
public abstract class CatalogApplicationTest {
public static final Logger LOG = LoggerFactory.getLogger(CatalogApplicationTest.class);
private static final String CONFIG_PATH;
protected static final String CONFIG_PATH;
public static final DropwizardAppExtension<CatalogApplicationConfig> APP;
private static final Client client;
protected static final WebhookCallbackResource webhookCallbackResource = new WebhookCallbackResource();
static {
CollectionRegistry.addTestResource(webhookCallbackResource);
CONFIG_PATH = ResourceHelpers.resourceFilePath("openmetadata-secure-test.yaml");
APP = new DropwizardAppExtension<>(CatalogApplication.class, CONFIG_PATH);
client = ClientBuilder.newClient();

View File

@ -58,6 +58,7 @@ import javax.json.JsonPatch;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response.Status;
import org.apache.http.client.HttpResponseException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
@ -74,11 +75,13 @@ import org.openmetadata.catalog.entity.services.MessagingService;
import org.openmetadata.catalog.entity.services.PipelineService;
import org.openmetadata.catalog.entity.teams.Team;
import org.openmetadata.catalog.entity.teams.User;
import org.openmetadata.catalog.events.EventPubSub;
import org.openmetadata.catalog.exception.CatalogExceptionMessage;
import org.openmetadata.catalog.jdbi3.DatabaseServiceRepository.DatabaseServiceEntityInterface;
import org.openmetadata.catalog.jdbi3.MessagingServiceRepository.MessagingServiceEntityInterface;
import org.openmetadata.catalog.jdbi3.PipelineServiceRepository.PipelineServiceEntityInterface;
import org.openmetadata.catalog.resources.events.EventResource.ChangeEventList;
import org.openmetadata.catalog.resources.events.WebhookResourceTest;
import org.openmetadata.catalog.resources.services.DatabaseServiceResourceTest;
import org.openmetadata.catalog.resources.services.MessagingServiceResourceTest;
import org.openmetadata.catalog.resources.services.PipelineServiceResourceTest;
@ -153,6 +156,10 @@ public abstract class EntityResourceTest<T> extends CatalogApplicationTest {
@BeforeAll
public static void setup(TestInfo test) throws URISyntaxException, IOException {
webhookCallbackResource.clearAllEvents();
WebhookResourceTest webhookResourceTest = new WebhookResourceTest();
webhookResourceTest.createWebhooks();
UserResourceTest userResourceTest = new UserResourceTest();
USER1 = UserResourceTest.createUser(userResourceTest.create(test), authHeaders("test@open-metadata.org"));
USER_OWNER1 = new EntityReference().withId(USER1.getId()).withType("user");
@ -236,6 +243,14 @@ public abstract class EntityResourceTest<T> extends CatalogApplicationTest {
TIER2_TAG_LABEL = new TagLabel().withTagFQN(tag.getFullyQualifiedName()).withDescription(tag.getDescription());
}
@AfterAll
public static void afterAllTests() throws Exception {
EventPubSub.shutdown();
// Ensure webhooks are in the right state
new WebhookResourceTest().validateWebhookEvents();
APP.getEnvironment().getApplicationContext().getServer().stop();
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Methods to be overridden entity test class
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@ -1003,8 +1018,6 @@ public abstract class EntityResourceTest<T> extends CatalogApplicationTest {
iteration++;
}
LOG.info("Did not find change event {} {} {}", updateTime.getTime(), entityInterface.getId(), expectedEventType);
assertNotNull(
changeEvent,
"Expected change event "

View File

@ -0,0 +1,96 @@
package org.openmetadata.catalog.resources.events;
import io.swagger.annotations.Api;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.SecurityContext;
import javax.ws.rs.core.UriInfo;
import org.openmetadata.catalog.type.ChangeEvent;
@Path("v1/test/webhook")
@Api(value = "Topic data asset collection", tags = "Topic data asset collection")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class WebhookCallbackResource {
private final ConcurrentLinkedQueue<ChangeEvent> changeEvents = new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<ChangeEvent> changeEventsSlowServer = new ConcurrentLinkedQueue<>();
@POST
@Path("/ignore")
public Response receiveEventIgnore(
@Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) {
return Response.ok().build();
}
@POST
public Response receiveEvent(
@Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) {
changeEvents.addAll(events.getData());
return Response.ok().build();
}
@POST
@Path("/slowServer")
public Response receiveEventWithDelay(
@Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
changeEventsSlowServer.addAll(events.getData());
return Response.ok().build();
}
@POST
@Path("/timeout")
public Response receiveEventWithTimeout(
@Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) {
try {
Thread.sleep(11 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Response.ok().build();
}
@POST
@Path("/300")
public Response receiveEvent300(
@Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) {
return Response.status(Response.Status.MOVED_PERMANENTLY).build();
}
@POST
@Path("/400")
public Response receiveEvent400(
@Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) {
return Response.status(Response.Status.BAD_REQUEST).build();
}
@POST
@Path("/500")
public Response receiveEvent500(
@Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) {
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
}
public ConcurrentLinkedQueue<ChangeEvent> getEvents() {
return changeEvents;
}
public ConcurrentLinkedQueue<ChangeEvent> getEventsSlowServer() {
return changeEventsSlowServer;
}
public void clearAllEvents() {
changeEvents.clear();
changeEventsSlowServer.clear();
}
}

View File

@ -0,0 +1,184 @@
/*
* Copyright 2021 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.resources.events;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.openmetadata.catalog.util.TestUtils.adminAuthHeaders;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.http.client.HttpResponseException;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestInfo;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.api.events.CreateWebhook;
import org.openmetadata.catalog.jdbi3.WebhookRepository.WebhookEntityInterface;
import org.openmetadata.catalog.resources.EntityResourceTest;
import org.openmetadata.catalog.resources.events.WebhookResource.WebhookList;
import org.openmetadata.catalog.type.ChangeEvent;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.EventFilter;
import org.openmetadata.catalog.type.EventType;
import org.openmetadata.catalog.type.Webhook;
import org.openmetadata.catalog.type.Webhook.Status;
import org.openmetadata.catalog.util.EntityInterface;
import org.openmetadata.catalog.util.TestUtils;
public class WebhookResourceTest extends EntityResourceTest<Webhook> {
public static List<EventFilter> ALL_EVENTS_FILTER = new ArrayList<>();
static {
ALL_EVENTS_FILTER.add(new EventFilter().withEventType(EventType.ENTITY_CREATED));
ALL_EVENTS_FILTER.add(new EventFilter().withEventType(EventType.ENTITY_UPDATED));
ALL_EVENTS_FILTER.add(new EventFilter().withEventType(EventType.ENTITY_DELETED));
}
public WebhookResourceTest() {
super(Entity.WEBHOOK, Webhook.class, WebhookList.class, "webhook", "", false, false, false);
supportsPatch = false;
}
@BeforeAll
public static void setup(TestInfo test) throws IOException, URISyntaxException {
EntityResourceTest.setup(test);
}
@Override
public CreateWebhook createRequest(String name, String description, String displayName, EntityReference owner)
throws URISyntaxException {
String uri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook/ignore";
return new CreateWebhook()
.withName(name)
.withDescription(description)
.withEventFilters(ALL_EVENTS_FILTER)
.withEndPoint(URI.create(uri));
}
@Override
public void validateCreatedEntity(Webhook webhook, Object request, Map<String, String> authHeaders)
throws HttpResponseException {
CreateWebhook createRequest = (CreateWebhook) request;
validateCommonEntityFields(
getEntityInterface(webhook), createRequest.getDescription(), TestUtils.getPrincipal(authHeaders), null);
assertEquals(createRequest.getName(), webhook.getName());
assertEquals(createRequest.getEventFilters(), webhook.getEventFilters());
}
@Override
public void validateUpdatedEntity(Webhook webhook, Object request, Map<String, String> authHeaders)
throws HttpResponseException {
validateCreatedEntity(webhook, request, authHeaders);
}
@Override
public void compareEntities(Webhook expected, Webhook updated, Map<String, String> authHeaders)
throws HttpResponseException {
// Patch not supported
}
@Override
public EntityInterface<Webhook> getEntityInterface(Webhook entity) {
return new WebhookEntityInterface(entity);
}
@Override
public void validateGetWithDifferentFields(Webhook entity, boolean byName) throws HttpResponseException {}
@Override
public void assertFieldChange(String fieldName, Object expected, Object actual) throws IOException {}
public void createWebhooks() throws IOException, URISyntaxException {
// Valid webhook callback
String baseUri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook";
CreateWebhook createWebhook =
createRequest("validWebhook", "validWebhook", "", null).withEndPoint(URI.create(baseUri));
createEntity(createWebhook, adminAuthHeaders());
// Webhook callback that responds slowly with 5 seconds delay
createWebhook.withName("slowServer").withEndPoint(URI.create(baseUri + "/slowServer"));
createEntity(createWebhook, adminAuthHeaders());
// Webhook callback that responds slowly with after 12 seconds (beyond connection + read response timeout)
createWebhook.withName("callbackTimeout").withEndPoint(URI.create(baseUri + "/timeout"));
createEntity(createWebhook, adminAuthHeaders());
// Webhook callback that responds with 300 error
createWebhook.withName("callbackResponse300").withEndPoint(URI.create(baseUri + "/300"));
createEntity(createWebhook, adminAuthHeaders());
// Webhook callback that responds with 400 error
createWebhook.withName("callbackResponse400").withEndPoint(URI.create(baseUri + "/400"));
createEntity(createWebhook, adminAuthHeaders());
// Webhook callback that responds with 400 error
createWebhook.withName("callbackResponse500").withEndPoint(URI.create(baseUri + "/500"));
createEntity(createWebhook, adminAuthHeaders());
// Webhook callback with invalid endpoint URI
createWebhook.withName("invalidEndpoint").withEndPoint(URI.create("http://invalidUnknownHost"));
createEntity(createWebhook, adminAuthHeaders());
}
public void validateWebhookEvents() throws HttpResponseException {
// Check the healthy callback server received all the change events
ConcurrentLinkedQueue<ChangeEvent> callbackEvents = webhookCallbackResource.getEvents();
List<ChangeEvent> actualEvents =
getChangeEvents(null, null, null, callbackEvents.peek().getDateTime(), adminAuthHeaders()).getData();
assertEquals(actualEvents.size(), callbackEvents.size());
webhookCallbackResource.clearAllEvents();
// TODO enable this test
// Check the slow callback server received all the change events
// callbackEvents = webhookCallbackResource.getEventsSlowServer();
// actualEvents = getChangeEvents(null, null, null, callbackEvents.peek().getDateTime(),
// adminAuthHeaders()).getData();
// assertEquals(actualEvents.size() - 1, callbackEvents.size());
// webhookCallbackResource.clearAllEvents();
// Check all webhook status
Webhook webhook = getEntityByName("validWebhook", "", adminAuthHeaders());
assertEquals(Status.SUCCESS, webhook.getStatus());
assertNull(webhook.getFailureDetails());
webhook = getEntityByName("slowServer", "", adminAuthHeaders());
assertEquals(Status.SUCCESS, webhook.getStatus());
assertNull(webhook.getFailureDetails());
webhook = getEntityByName("callbackResponse300", "", adminAuthHeaders());
assertEquals(Status.ERROR, webhook.getStatus());
assertEquals(301, webhook.getFailureDetails().getLastFailedStatusCode());
assertEquals("Moved Permanently", webhook.getFailureDetails().getLastFailedReason());
webhook = getEntityByName("callbackResponse400", "", adminAuthHeaders());
assertEquals(Status.AWAITING_RETRY, webhook.getStatus());
assertEquals(400, webhook.getFailureDetails().getLastFailedStatusCode());
assertEquals("Bad Request", webhook.getFailureDetails().getLastFailedReason());
webhook = getEntityByName("callbackResponse500", "", adminAuthHeaders());
assertEquals(Status.AWAITING_RETRY, webhook.getStatus());
assertEquals(500, webhook.getFailureDetails().getLastFailedStatusCode());
assertEquals("Internal Server Error", webhook.getFailureDetails().getLastFailedReason());
webhook = getEntityByName("invalidEndpoint", "", adminAuthHeaders());
assertEquals(Status.ERROR, webhook.getStatus());
assertNull(webhook.getFailureDetails().getLastFailedStatusCode());
assertEquals("UnknownHostException", webhook.getFailureDetails().getLastFailedReason());
}
}

View File

@ -18,7 +18,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.openmetadata.catalog.security.SecurityUtil.addHeaders;
import java.net.URI;
import java.net.URISyntaxException;
@ -164,54 +163,58 @@ public final class TestUtils {
}
public static void post(WebTarget target, Map<String, String> headers) throws HttpResponseException {
Response response = addHeaders(target, headers).post(null);
Response response = SecurityUtil.addHeaders(target, headers).post(null);
readResponse(response, Status.CREATED.getStatusCode());
}
public static <K> void post(WebTarget target, K request, Map<String, String> headers) throws HttpResponseException {
Response response = addHeaders(target, headers).post(Entity.entity(request, MediaType.APPLICATION_JSON));
Response response =
SecurityUtil.addHeaders(target, headers).post(Entity.entity(request, MediaType.APPLICATION_JSON));
readResponse(response, Status.CREATED.getStatusCode());
}
public static <T, K> T post(WebTarget target, K request, Class<T> clz, Map<String, String> headers)
throws HttpResponseException {
Response response = addHeaders(target, headers).post(Entity.entity(request, MediaType.APPLICATION_JSON));
Response response =
SecurityUtil.addHeaders(target, headers).post(Entity.entity(request, MediaType.APPLICATION_JSON));
return readResponse(response, clz, Status.CREATED.getStatusCode());
}
public static <T> T patch(WebTarget target, JsonPatch patch, Class<T> clz, Map<String, String> headers)
throws HttpResponseException {
Response response =
addHeaders(target, headers)
SecurityUtil.addHeaders(target, headers)
.method("PATCH", Entity.entity(patch.toJsonArray().toString(), MediaType.APPLICATION_JSON_PATCH_JSON_TYPE));
return readResponse(response, clz, Status.OK.getStatusCode());
}
public static <K> void put(WebTarget target, K request, Status expectedStatus, Map<String, String> headers)
throws HttpResponseException {
Response response = addHeaders(target, headers).method("PUT", Entity.entity(request, MediaType.APPLICATION_JSON));
Response response =
SecurityUtil.addHeaders(target, headers).method("PUT", Entity.entity(request, MediaType.APPLICATION_JSON));
readResponse(response, expectedStatus.getStatusCode());
}
public static <T, K> T put(
WebTarget target, K request, Class<T> clz, Status expectedStatus, Map<String, String> headers)
throws HttpResponseException {
Response response = addHeaders(target, headers).method("PUT", Entity.entity(request, MediaType.APPLICATION_JSON));
Response response =
SecurityUtil.addHeaders(target, headers).method("PUT", Entity.entity(request, MediaType.APPLICATION_JSON));
return readResponse(response, clz, expectedStatus.getStatusCode());
}
public static <T> T get(WebTarget target, Class<T> clz, Map<String, String> headers) throws HttpResponseException {
final Response response = addHeaders(target, headers).get();
final Response response = SecurityUtil.addHeaders(target, headers).get();
return readResponse(response, clz, Status.OK.getStatusCode());
}
public static <T> T delete(WebTarget target, Class<T> clz, Map<String, String> headers) throws HttpResponseException {
final Response response = addHeaders(target, headers).delete();
final Response response = SecurityUtil.addHeaders(target, headers).delete();
return readResponse(response, clz, Status.OK.getStatusCode());
}
public static void delete(WebTarget target, Map<String, String> headers) throws HttpResponseException {
final Response response = addHeaders(target, headers).delete();
final Response response = SecurityUtil.addHeaders(target, headers).delete();
if (!HttpStatus.isSuccess(response.getStatus())) {
readResponseError(response);
}

View File

@ -91,6 +91,14 @@ public final class CommonUtil {
return calendar.getTime();
}
/** Get date after {@code days} from the given date or before i{@code days} when it is negative */
public static Date getDateByOffsetSeconds(Date date, int seconds) throws ParseException {
Calendar calendar = Calendar.getInstance();
calendar.setTime(date);
calendar.add(Calendar.SECOND, seconds);
return calendar.getTime();
}
/** Get date after {@code days} from the given date or before i{@code days} when it is negative */
public static Date getDateByOffset(DateFormat dateFormat, String strDate, int days) throws ParseException {
Date date = dateFormat.parse(strDate);

10
pom.xml
View File

@ -378,7 +378,11 @@
<artifactId>jackson-datatype-jsr353</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.4</version>
</dependency>
</dependencies>
</dependencyManagement>
@ -422,7 +426,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jxr-plugin</artifactId>
<version>2.3</version>
<version>3.1.1</version>
</plugin>
</plugins>
</reporting>
@ -441,7 +445,7 @@
<dependency>
<groupId>com.puppycrawl.tools</groupId>
<artifactId>checkstyle</artifactId>
<version>9.1</version>
<version>9.2</version>
</dependency>
</dependencies>
</plugin>