Event logging

This commit is contained in:
Suresh Srinivas 2021-08-05 00:25:18 -07:00
parent 00f541d1b4
commit bf7e86f846
5 changed files with 195 additions and 46 deletions

View File

@ -0,0 +1,70 @@
package org.openmetadata.catalog.events;
import org.openmetadata.catalog.CatalogApplicationConfig;
import org.openmetadata.catalog.entity.audit.AuditLog;
import org.openmetadata.catalog.jdbi3.AuditLogRepository;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.util.EntityUtil;
import org.skife.jdbi.v2.DBI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerResponseContext;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;
import java.util.UUID;
public class AuditEventHandler implements EventHandler {
private static final Logger LOG = LoggerFactory.getLogger(AuditEventHandler.class);
private AuditLogRepository auditLogRepository;
private final DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm'Z'"); // Quoted "Z" to indicate UTC,
// no timezone offset
public void init(CatalogApplicationConfig config, DBI jdbi) {
this.auditLogRepository = jdbi.onDemand(AuditLogRepository.class);
TimeZone tz = TimeZone.getTimeZone("UTC");
this.df.setTimeZone(tz);
}
public Void process(ContainerRequestContext requestContext,
ContainerResponseContext responseContext) {
int responseCode = responseContext.getStatus();
String method = requestContext.getMethod();
LOG.info("are we processing, harsha");
if (responseContext.getEntity() != null) {
String path = requestContext.getUriInfo().getPath();
String username = requestContext.getSecurityContext().getUserPrincipal().getName();
String nowAsISO = df.format(new Date());
try {
EntityReference entityReference = EntityUtil.getEntityReference(responseContext.getEntity(),
responseContext.getEntity().getClass());
if (entityReference != null) {
AuditLog auditLog = new AuditLog().withId(UUID.randomUUID())
.withPath(path)
.withDate(nowAsISO)
.withEntityId(entityReference.getId())
.withEntityType(entityReference.getType())
.withEntity(entityReference)
.withMethod(method)
.withUsername(username)
.withResponseCode(responseCode);
auditLogRepository.create(auditLog);
LOG.debug("Added audit log entry: {}", auditLog);
} else {
LOG.error("Failed to capture audit log for {}", path);
}
} catch(Exception e) {
LOG.error("Failed to capture audit log due to {}", e.getMessage());
}
}
return null;
}
public void close() {}
}

View File

@ -17,10 +17,7 @@
package org.openmetadata.catalog.events;
import org.openmetadata.catalog.CatalogApplicationConfig;
import org.openmetadata.catalog.entity.audit.AuditLog;
import org.openmetadata.catalog.jdbi3.AuditLogRepository;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.util.EntityUtil;
import org.openmetadata.catalog.util.ParallelStreamUtil;
import org.skife.jdbi.v2.DBI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -30,30 +27,30 @@ import javax.ws.rs.container.ContainerResponseContext;
import javax.ws.rs.container.ContainerResponseFilter;
import javax.ws.rs.ext.Provider;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.ForkJoinPool;
@Provider
public class EventFilter implements ContainerResponseFilter {
private static final Logger LOG = LoggerFactory.getLogger(EventFilter.class);
private static final List<String> AUDITABLE_METHODS = Arrays.asList("POST", "PUT", "PATCH", "DELETE");
private final DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm'Z'"); // Quoted "Z" to indicate UTC, no timezone offset
private AuditLogRepository auditLogRepository;
private static final int FORK_JOIN_POOL_PARALLELISM = 20;
private CatalogApplicationConfig config;
private DBI jdbi;
private final ForkJoinPool forkJoinPool;
List<EventHandler> eventHandlers;
@SuppressWarnings("unused")
private EventFilter() {
}
public EventFilter(CatalogApplicationConfig config, DBI dbi) {
this.auditLogRepository = dbi.onDemand(AuditLogRepository.class);
TimeZone tz = TimeZone.getTimeZone("UTC");
this.df.setTimeZone(tz);
public EventFilter(CatalogApplicationConfig config, DBI jdbi) {
this.config = config;
this.jdbi = jdbi;
this.forkJoinPool = new ForkJoinPool(FORK_JOIN_POOL_PARALLELISM);
this.eventHandlers = new ArrayList<>();
AuditEventHandler auditEventHandler = new AuditEventHandler();
auditEventHandler.init(config, jdbi);
eventHandlers.add(auditEventHandler);
}
@Override
@ -65,33 +62,11 @@ public class EventFilter implements ContainerResponseFilter {
if ((responseCode < 200 || responseCode > 299) || (!AUDITABLE_METHODS.contains(method))) {
return;
}
if (responseContext.getEntity() != null) {
String path = requestContext.getUriInfo().getPath();
String username = requestContext.getSecurityContext().getUserPrincipal().getName();
String nowAsISO = df.format(new Date());
try {
EntityReference entityReference = EntityUtil.getEntityReference(responseContext.getEntity(),
responseContext.getEntity().getClass());
if (entityReference != null) {
AuditLog auditLog = new AuditLog().withId(UUID.randomUUID())
.withPath(path)
.withDate(nowAsISO)
.withEntityId(entityReference.getId())
.withEntityType(entityReference.getType())
.withEntity(entityReference)
.withMethod(method)
.withUsername(username)
.withResponseCode(responseCode);
auditLogRepository.create(auditLog);
LOG.debug("Added audit log entry: {}", auditLog);
} else {
LOG.error("Failed to capture audit log for {}", path);
}
} catch(Exception e) {
LOG.error("Failed to capture audit log due to {}", e.getMessage());
}
}
eventHandlers.parallelStream().forEach(eventHandler -> ParallelStreamUtil.runAsync(() ->
eventHandler.process(requestContext, responseContext), forkJoinPool));
}
}

View File

@ -0,0 +1,13 @@
package org.openmetadata.catalog.events;
import org.openmetadata.catalog.CatalogApplicationConfig;
import org.skife.jdbi.v2.DBI;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerResponseContext;
public interface EventHandler {
void init(CatalogApplicationConfig config, DBI jdbi);
Void process(ContainerRequestContext requestContext, ContainerResponseContext responseContext);
void close();
}

View File

@ -61,7 +61,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

View File

@ -0,0 +1,92 @@
package org.openmetadata.catalog.util;
import com.google.common.base.Stopwatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
public class ParallelStreamUtil {
private static final Logger LOG = LoggerFactory.getLogger(ParallelStreamUtil.class);
public static <T> T execute(Supplier<T> supplier, Executor executor) {
Stopwatch stopwatch = Stopwatch.createStarted();
LOG.debug("execute start");
try {
CompletableFuture<T> resultFuture = CompletableFuture.supplyAsync(supplier, executor);
return resultFuture.get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
handleExecutionException(e);
// shouldn't reach here
throw new IllegalStateException("Shouldn't reach here");
} finally {
LOG.debug("execute complete - elapsed: {} ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
stopwatch.stop();
}
}
public static <T> T executeWithTimeout(int timeoutInSeconds, Supplier<T> supplier, Executor executor) {
Stopwatch stopwatch = Stopwatch.createStarted();
LOG.debug("execute start");
try {
CompletableFuture<T> resultFuture = CompletableFuture.supplyAsync(supplier, executor);
return resultFuture.get(timeoutInSeconds, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
handleExecutionException(e);
// shouldn't reach here
throw new IllegalStateException("Shouldn't reach here");
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
LOG.debug("execute complete - elapsed: {} ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
stopwatch.stop();
}
}
public static void runAsync(Callable<Void> callable, Executor executor) {
Stopwatch stopwatch = Stopwatch.createStarted();
LOG.debug("runAsync start");
CompletableFuture<Void> res = CompletableFuture.supplyAsync(() -> {
try {
return callable.call();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}, executor);
res.whenComplete((r, th) -> {
// LOG any exceptions
if (th != null) {
LOG.error("Got exception while running async task", th.getCause());
}
LOG.debug("runAsync complete - elapsed: {} ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
stopwatch.stop();
});
}
private static void handleExecutionException(ExecutionException e) {
Throwable t = e.getCause();
if (t != null) {
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else {
throw new RuntimeException(t);
}
} else {
throw new RuntimeException(e);
}
}
}