Event logging

This commit is contained in:
Suresh Srinivas 2021-08-03 22:40:42 -07:00
parent 8a92d253db
commit 00f541d1b4
10 changed files with 301 additions and 3 deletions

View File

@ -23,6 +23,10 @@ then
fi
base_dir=$(dirname $0)/..
CATALOG_HOME=$base_dir
# OpenMetadata env script
. $CATALOG_HOME/conf/catalog-env.sh
if [ "x$CATALOG_HEAP_OPTS" = "x" ]; then
export CATALOG_HEAP_OPTS="-Xmx1G -Xms1G"
fi

View File

@ -19,7 +19,8 @@
# Home Dir
base_dir=$(dirname $0)/..
#if HDP_DIR is not set its a dev env.
CATALOG_HOME=$base_dirbase_dir=$(dirname $0)/..
CATALOG_HOME=$base_dir
PID_DIR=$base_dir/logs
LOG_DIR=$base_dir/logs

View File

@ -210,3 +210,12 @@ CREATE TABLE IF NOT EXISTS tag_usage (
timestamp BIGINT,
UNIQUE KEY unique_name(tagFQN, targetFQN)
);
CREATE TABLE IF NOT EXISTS audit_log (
id VARCHAR(36) GENERATED ALWAYS AS (json ->> '$.id') STORED NOT NULL,
entityType VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.entityType') NOT NULL,
username VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.username') NOT NULL,
json JSON NOT NULL,
timestamp BIGINT,
PRIMARY KEY (id)
);

View File

@ -20,6 +20,7 @@ import com.google.inject.Guice;
import com.google.inject.Injector;
import io.dropwizard.health.conf.HealthConfiguration;
import io.dropwizard.health.core.HealthCheckBundle;
import org.openmetadata.catalog.events.EventFilter;
import org.openmetadata.catalog.exception.CatalogGenericExceptionMapper;
import org.openmetadata.catalog.exception.ConstraintViolationExceptionMapper;
import org.openmetadata.catalog.security.AuthenticationConfiguration;
@ -55,6 +56,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.container.ContainerRequestFilter;
import javax.ws.rs.container.ContainerResponseFilter;
import javax.ws.rs.core.Response;
import java.lang.reflect.InvocationTargetException;
@ -99,6 +101,9 @@ public class CatalogApplication extends Application<CatalogApplicationConfig> {
environment.jersey().register(new EarlyEofExceptionMapper());
environment.healthChecks().register("UserDatabaseCheck", new CatalogHealthCheck(catalogConfig, jdbi));
registerResources(catalogConfig, environment, jdbi);
// Register Event Handler
registerEventFilter(catalogConfig, environment, jdbi);
}
@SneakyThrows
@ -149,6 +154,11 @@ public class CatalogApplication extends Application<CatalogApplicationConfig> {
injector = Guice.createInjector(new CatalogModule(authorizer));
}
private void registerEventFilter(CatalogApplicationConfig catalogConfig, Environment environment, DBI jdbi) {
ContainerResponseFilter eventFilter = new EventFilter(catalogConfig, jdbi);
environment.jersey().register(eventFilter);
}
private void registerResources(CatalogApplicationConfig config, Environment environment, DBI jdbi) {
jdbi.registerContainerFactory(new OptionalContainerFactory());

View File

@ -29,7 +29,7 @@ import static org.openmetadata.catalog.resources.teams.UserResource.FIELD_LIST;
public class CatalogHealthCheck extends HealthCheck {
private final UserRepository userRepository;
private final EntityUtil.Fields fields = new EntityUtil.Fields(FIELD_LIST, "teams");
private final EntityUtil.Fields fields = new EntityUtil.Fields(FIELD_LIST, "profile");
public CatalogHealthCheck(CatalogApplicationConfig config, DBI jdbi) {
super();

View File

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.events;
import org.openmetadata.catalog.CatalogApplicationConfig;
import org.openmetadata.catalog.entity.audit.AuditLog;
import org.openmetadata.catalog.jdbi3.AuditLogRepository;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.util.EntityUtil;
import org.skife.jdbi.v2.DBI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerResponseContext;
import javax.ws.rs.container.ContainerResponseFilter;
import javax.ws.rs.ext.Provider;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;
import java.util.UUID;
@Provider
public class EventFilter implements ContainerResponseFilter {
private static final Logger LOG = LoggerFactory.getLogger(EventFilter.class);
private static final List<String> AUDITABLE_METHODS = Arrays.asList("POST", "PUT", "PATCH", "DELETE");
private final DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm'Z'"); // Quoted "Z" to indicate UTC, no timezone offset
private AuditLogRepository auditLogRepository;
@SuppressWarnings("unused")
private EventFilter() {
}
public EventFilter(CatalogApplicationConfig config, DBI dbi) {
this.auditLogRepository = dbi.onDemand(AuditLogRepository.class);
TimeZone tz = TimeZone.getTimeZone("UTC");
this.df.setTimeZone(tz);
}
@Override
public void filter(ContainerRequestContext requestContext,
ContainerResponseContext responseContext) throws IOException {
int responseCode = responseContext.getStatus();
String method = requestContext.getMethod();
if ((responseCode < 200 || responseCode > 299) || (!AUDITABLE_METHODS.contains(method))) {
return;
}
if (responseContext.getEntity() != null) {
String path = requestContext.getUriInfo().getPath();
String username = requestContext.getSecurityContext().getUserPrincipal().getName();
String nowAsISO = df.format(new Date());
try {
EntityReference entityReference = EntityUtil.getEntityReference(responseContext.getEntity(),
responseContext.getEntity().getClass());
if (entityReference != null) {
AuditLog auditLog = new AuditLog().withId(UUID.randomUUID())
.withPath(path)
.withDate(nowAsISO)
.withEntityId(entityReference.getId())
.withEntityType(entityReference.getType())
.withEntity(entityReference)
.withMethod(method)
.withUsername(username)
.withResponseCode(responseCode);
auditLogRepository.create(auditLog);
LOG.debug("Added audit log entry: {}", auditLog);
} else {
LOG.error("Failed to capture audit log for {}", path);
}
} catch(Exception e) {
LOG.error("Failed to capture audit log due to {}", e.getMessage());
}
}
}
}

View File

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.jdbi3;
import org.openmetadata.catalog.entity.audit.AuditLog;
import org.openmetadata.catalog.util.EntityUtil;
import org.openmetadata.catalog.util.JsonUtils;
import org.skife.jdbi.v2.sqlobject.Bind;
import org.skife.jdbi.v2.sqlobject.CreateSqlObject;
import org.skife.jdbi.v2.sqlobject.SqlQuery;
import org.skife.jdbi.v2.sqlobject.SqlUpdate;
import org.skife.jdbi.v2.sqlobject.Transaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public abstract class AuditLogRepository {
public static final Logger LOG = LoggerFactory.getLogger(AuditLogRepository.class);
@CreateSqlObject
abstract AuditLogDAO auditLogDAO();
@Transaction
public List<AuditLog> list() throws IOException {
List<String> jsons = auditLogDAO().list();
List<AuditLog> auditLogs = new ArrayList<>();
for (String json : jsons) {
auditLogs.add(JsonUtils.readValue(json, AuditLog.class));
}
return auditLogs;
}
@Transaction
public AuditLog get(String id) throws IOException {
return EntityUtil.validate(id, auditLogDAO().findById(id), AuditLog.class);
}
@Transaction
public List<AuditLog> getByEntityType(String entityType) throws IOException {
List<String> jsons = auditLogDAO().findByEntityType(entityType);
List<AuditLog> auditLogs = new ArrayList<>();
for (String json: jsons) {
auditLogs.add(JsonUtils.readValue(json, AuditLog.class));
}
return auditLogs;
}
@Transaction
public AuditLog create(AuditLog auditLog) throws IOException {
auditLogDAO().insert(JsonUtils.pojoToJson(auditLog));
return auditLog;
}
@Transaction
public void delete(String id) throws IOException {
auditLogDAO().delete(id);
}
public interface AuditLogDAO {
@SqlUpdate("INSERT INTO audit_log (json) VALUES (:json)")
void insert(@Bind("json") String json);
@SqlQuery("SELECT json FROM audit_log WHERE id = :id")
String findById(@Bind("id") String id);
@SqlQuery("SELECT json FROM audit_log WHERE entity_type = :entity_type")
List<String> findByEntityType(@Bind("entity_type") String entityType);
@SqlQuery("SELECT json FROM audit_log")
List<String> list();
@SqlUpdate("UPDATE audit_log_entity SET json = :json WHERE id = :id")
void update(@Bind("id") String id, @Bind("json") String json);
@SqlUpdate("DELETE FROM audit_log_entity WHERE id = :id")
int delete(@Bind("id") String id);
}
}

View File

@ -254,7 +254,7 @@ public abstract class UserRepository {
List<String> teamIds = relationshipDAO().findFrom(user.getId().toString(), CONTAINS.ordinal(), "team");
List<Team> teams = new ArrayList<>();
for (String teamId : teamIds) {
LOG.info("Adding team {}", teamId);
LOG.debug("Adding team {}", teamId);
String json = teamDAO().findById(teamId);
Team team = JsonUtils.readValue(json, Team.class);
if (team != null) {

View File

@ -61,6 +61,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
@ -265,6 +266,33 @@ public final class EntityUtil {
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(entity, fqn));
}
public static EntityReference getEntityReference(Object entity, Class<?> clazz) throws IOException {
if (clazz.toString().toLowerCase().endsWith(Entity.TABLE.toLowerCase())) {
Table instance = (Table) entity;
return getEntityReference(instance);
} else if (clazz.toString().toLowerCase().endsWith(Entity.DATABASE.toLowerCase())) {
Database instance = (Database) entity;
return getEntityReference(instance);
} else if (clazz.toString().toLowerCase().endsWith(Entity.METRICS.toLowerCase())) {
Metrics instance = (Metrics) entity;
return getEntityReference(instance);
} else if (clazz.toString().toLowerCase().endsWith(Entity.DATABASE_SERVICE.toLowerCase())) {
DatabaseService instance = (DatabaseService) entity;
return getEntityReference(instance);
} else if (clazz.toString().toLowerCase().endsWith(Entity.REPORT.toLowerCase())) {
Report instance = (Report) entity;
return getEntityReference(instance);
} else if (clazz.toString().toLowerCase().endsWith(Entity.TEAM.toLowerCase())) {
Team instance = (Team) entity;
return getEntityReference(instance);
} else if (clazz.toString().toLowerCase().endsWith(Entity.USER.toLowerCase())) {
User instance = (User) entity;
return getEntityReference(instance);
}
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityTypeNotFound(
String.format("Failed to find entity class {}", clazz.toString())));
}
public static EntityReference getEntityReference(DatabaseService service) {
return new EntityReference().withName(service.getName()).withId(service.getId())
.withType(Entity.DATABASE_SERVICE);

View File

@ -0,0 +1,49 @@
{
"$id": "https://streaminlinedata.ai/entity/audit/auditLog.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Audit Log entity",
"description": "Entity that represents a Audit Log",
"type": "object",
"properties" : {
"id": {
"description": "Unique identifier that identifies a Audit Log Entry",
"$ref": "../../type/basic.json#/definitions/uuid"
},
"method": {
"description": "HTTP Method used in a call",
"type": "string",
"minLength": 1,
"maxLength": 64
},
"responseCode": {
"description": "HTTP response code for the api requested",
"type": "integer"
},
"path": {
"description": "requested API Path",
"type": "string"
},
"username": {
"description": "Name of the user who requested for the API",
"type": "string"
},
"date": {
"description": "Date which the api call is made",
"$ref": "../../type/basic.json#/definitions/date"
},
"entityId": {
"description": "entityReference Id",
"$ref": "../../type/basic.json#/definitions/uuid"
},
"entityType": {
"description": "Entity Type",
"type": "string"
},
"entity" : {
"description": "Link to entity on which api request is done",
"$ref" : "../../type/entityReference.json"
}
},
"required": ["id", "method", "responseCode", "user", "entity"]
}