Merge pull request #106 from open-metadata/fix_event_handlers

Add event handler configuration and make it optional
This commit is contained in:
Suresh Srinivas 2021-08-11 20:54:58 -07:00 committed by GitHub
commit 51813c77e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 94 additions and 20 deletions

View File

@ -212,10 +212,9 @@ CREATE TABLE IF NOT EXISTS tag_usage (
); );
CREATE TABLE IF NOT EXISTS audit_log ( CREATE TABLE IF NOT EXISTS audit_log (
id VARCHAR(36) GENERATED ALWAYS AS (json ->> '$.entityId') STORED NOT NULL, entityId VARCHAR(36) GENERATED ALWAYS AS (json ->> '$.entityId') NOT NULL,
entityType VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.entityType') NOT NULL, entityType VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.entityType') NOT NULL,
username VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.userName') NOT NULL, username VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.userName') NOT NULL,
json JSON NOT NULL, json JSON NOT NULL,
timestamp BIGINT, timestamp BIGINT
PRIMARY KEY (id)
); );

View File

@ -155,9 +155,11 @@ public class CatalogApplication extends Application<CatalogApplicationConfig> {
} }
private void registerEventFilter(CatalogApplicationConfig catalogConfig, Environment environment, DBI jdbi) { private void registerEventFilter(CatalogApplicationConfig catalogConfig, Environment environment, DBI jdbi) {
if (catalogConfig.getEventHandlerConfiguration() != null) {
ContainerResponseFilter eventFilter = new EventFilter(catalogConfig, jdbi); ContainerResponseFilter eventFilter = new EventFilter(catalogConfig, jdbi);
environment.jersey().register(eventFilter); environment.jersey().register(eventFilter);
} }
}
private void registerResources(CatalogApplicationConfig config, Environment environment, DBI jdbi) { private void registerResources(CatalogApplicationConfig config, Environment environment, DBI jdbi) {

View File

@ -18,6 +18,7 @@ package org.openmetadata.catalog;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import io.dropwizard.health.conf.HealthConfiguration; import io.dropwizard.health.conf.HealthConfiguration;
import org.openmetadata.catalog.events.EventHandlerConfiguration;
import org.openmetadata.catalog.security.AuthenticationConfiguration; import org.openmetadata.catalog.security.AuthenticationConfiguration;
import org.openmetadata.catalog.security.AuthorizerConfiguration; import org.openmetadata.catalog.security.AuthorizerConfiguration;
import io.dropwizard.Configuration; import io.dropwizard.Configuration;
@ -45,6 +46,9 @@ public class CatalogApplicationConfig extends Configuration {
@JsonProperty("elasticsearch") @JsonProperty("elasticsearch")
private ElasticSearchConfiguration elasticSearchConfiguration; private ElasticSearchConfiguration elasticSearchConfiguration;
@JsonProperty("eventHandlerConfiguration")
private EventHandlerConfiguration eventHandlerConfiguration;
public DataSourceFactory getDataSourceFactory() { public DataSourceFactory getDataSourceFactory() {
return dataSourceFactory; return dataSourceFactory;
} }
@ -77,6 +81,13 @@ public class CatalogApplicationConfig extends Configuration {
this.elasticSearchConfiguration = elasticSearchConfiguration; this.elasticSearchConfiguration = elasticSearchConfiguration;
} }
public EventHandlerConfiguration getEventHandlerConfiguration() {
return eventHandlerConfiguration;
}
public void setEventHandlerConfiguration(EventHandlerConfiguration eventHandlerConfiguration) {
this.eventHandlerConfiguration = eventHandlerConfiguration;
}
@Valid @Valid
@NotNull @NotNull
@JsonProperty("health") @JsonProperty("health")

View File

@ -17,12 +17,15 @@
package org.openmetadata.catalog.events; package org.openmetadata.catalog.events;
import org.openmetadata.catalog.CatalogApplicationConfig; import org.openmetadata.catalog.CatalogApplicationConfig;
import org.openmetadata.catalog.security.AuthenticationConfiguration;
import org.openmetadata.catalog.security.CatalogAuthorizer;
import org.openmetadata.catalog.util.ParallelStreamUtil; import org.openmetadata.catalog.util.ParallelStreamUtil;
import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.DBI;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.ws.rs.container.ContainerRequestContext; import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerRequestFilter;
import javax.ws.rs.container.ContainerResponseContext; import javax.ws.rs.container.ContainerResponseContext;
import javax.ws.rs.container.ContainerResponseFilter; import javax.ws.rs.container.ContainerResponseFilter;
import javax.ws.rs.ext.Provider; import javax.ws.rs.ext.Provider;
@ -30,6 +33,7 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinPool;
@Provider @Provider
@ -48,12 +52,21 @@ public class EventFilter implements ContainerResponseFilter {
this.jdbi = jdbi; this.jdbi = jdbi;
this.forkJoinPool = new ForkJoinPool(FORK_JOIN_POOL_PARALLELISM); this.forkJoinPool = new ForkJoinPool(FORK_JOIN_POOL_PARALLELISM);
this.eventHandlers = new ArrayList<>(); this.eventHandlers = new ArrayList<>();
AuditEventHandler auditEventHandler = new AuditEventHandler(); registerEventHandlers(config, jdbi);
auditEventHandler.init(config, jdbi); }
eventHandlers.add(auditEventHandler);
ElasticSearchEventHandler elasticSearchEventHandler = new ElasticSearchEventHandler(); private void registerEventHandlers(CatalogApplicationConfig config, DBI jdbi) {
elasticSearchEventHandler.init(config, jdbi); try {
eventHandlers.add(elasticSearchEventHandler); Set<String> eventHandlerClassNames = config.getEventHandlerConfiguration().getEventHandlerClassNames();
for (String eventHandlerClassName : eventHandlerClassNames) {
EventHandler eventHandler = ((Class<EventHandler>) Class.forName(eventHandlerClassName))
.getConstructor().newInstance();
eventHandler.init(config, jdbi);
eventHandlers.add(eventHandler);
}
} catch (Exception e) {
LOG.info("Failed instantiate and regisger event handler {}".format(e.getMessage()));
}
} }
@Override @Override

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.events;
import java.util.Set;
public class EventHandlerConfiguration {
private Set<String> eventHandlerClassNames;
public Set<String> getEventHandlerClassNames() {
return eventHandlerClassNames;
}
public void setEventHandlerClassNames(Set<String> eventHandlerClassNames) {
this.eventHandlerClassNames = eventHandlerClassNames;
}
}

View File

@ -32,7 +32,7 @@ public abstract class CatalogApplicationTest {
public static final DropwizardAppExtension<CatalogApplicationConfig> APP; public static final DropwizardAppExtension<CatalogApplicationConfig> APP;
static { static {
CONFIG_PATH = ResourceHelpers.resourceFilePath("catalog-secure-test.yaml"); CONFIG_PATH = ResourceHelpers.resourceFilePath("openmetadata-secure-test.yaml");
APP = new DropwizardAppExtension<>(CatalogApplication.class, CONFIG_PATH); APP = new DropwizardAppExtension<>(CatalogApplication.class, CONFIG_PATH);
} }

View File

@ -44,14 +44,15 @@ public class EmbeddedMySqlSupport implements BeforeAllCallback, AfterAllCallback
.withUser("test", "") .withUser("test", "")
.build(); .build();
SchemaConfig schemaConfig = SchemaConfig.aSchemaConfig("catalog_test_db").build(); SchemaConfig schemaConfig = SchemaConfig.aSchemaConfig("openmetadata_test_db").build();
embeddedMysql = EmbeddedMysql.anEmbeddedMysql(config).addSchema(schemaConfig).start(); embeddedMysql = EmbeddedMysql.anEmbeddedMysql(config).addSchema(schemaConfig).start();
LOG.info("Embedded MySQL is started"); LOG.info("Embedded MySQL is started");
Flyway flyway = Flyway.configure() Flyway flyway = Flyway.configure()
// TODO Remove hardcoding // TODO Remove hardcoding
.dataSource("jdbc:mysql://localhost:3307/catalog_test_db?useSSL=false&serverTimezone=UTC", "test", "") .dataSource("jdbc:mysql://localhost:3307/openmetadata_test_db?useSSL=false&serverTimezone=UTC",
"test", "")
.sqlMigrationPrefix("v") .sqlMigrationPrefix("v")
.load(); .load();
flyway.clean(); flyway.clean();

View File

@ -93,8 +93,8 @@ logging:
appenders: appenders:
- type: console - type: console
- type: file - type: file
currentLogFilename: './logs/catalog.log' currentLogFilename: './logs/openmetadata.log'
archivedLogFilenamePattern: './logs/catalog-%d-%i.log.gz' archivedLogFilenamePattern: './logs/openmetadata-%d-%i.log.gz'
archive: true archive: true
archivedFileCount: 7 archivedFileCount: 7
maxFileSize: '1MiB' maxFileSize: '1MiB'
@ -106,7 +106,7 @@ database:
user: test user: test
password: password:
# the JDBC URL; the database is called washvalet # the JDBC URL; the database is called washvalet
url: jdbc:mysql://localhost:3307/catalog_test_db?useSSL=false&serverTimezone=UTC url: jdbc:mysql://localhost:3307/openmetadata_test_db?useSSL=false&serverTimezone=UTC
elasticsearch: elasticsearch:
host: localhost host: localhost
@ -138,3 +138,8 @@ authenticationConfiguration:
authority: "https://accounts.google.com" authority: "https://accounts.google.com"
clientId: "261867039324-neb92r2147i6upchb78tv29idk079bps.apps.googleusercontent.com" clientId: "261867039324-neb92r2147i6upchb78tv29idk079bps.apps.googleusercontent.com"
callbackUrl: "http://localhost:8585/callback" callbackUrl: "http://localhost:8585/callback"
eventHandlerConfiguration:
eventHandlerClassNames:
- "org.openmetadata.catalog.events.AuditEventHandler"

View File

@ -138,6 +138,12 @@ elasticsearch:
host: localhost host: localhost
port: 9200 port: 9200
eventHandlerConfiguration:
eventHandlerClassNames:
- "org.openmetadata.catalog.events.AuditEventHandler"
- "org.openmetadata.catalog.events.ElasticSearchEventHandler"
health: health:
delayedShutdownHandlerEnabled: true delayedShutdownHandlerEnabled: true
shutdownWaitPeriod: 1s shutdownWaitPeriod: 1s

View File

@ -99,8 +99,8 @@ logging:
- type: file - type: file
threshold: TRACE threshold: TRACE
logFormat: "%level [%d{HH:mm:ss.SSS}] [%t] %logger{5} - %msg %n" logFormat: "%level [%d{HH:mm:ss.SSS}] [%t] %logger{5} - %msg %n"
currentLogFilename: ./logs/catalog.log currentLogFilename: ./logs/openmetadata.log
archivedLogFilenamePattern: ./logs/catalog-%d{yyyy-MM-dd}-%i.log.gz archivedLogFilenamePattern: ./logs/openmetadata-%d{yyyy-MM-dd}-%i.log.gz
archivedFileCount: 7 archivedFileCount: 7
timeZone: UTC timeZone: UTC
maxFileSize: 50MB maxFileSize: 50MB
@ -119,6 +119,11 @@ elasticsearch:
host: localhost host: localhost
port: 9200 port: 9200
eventHandlerConfiguration:
eventHandlerClassNames:
- "org.openmetadata.catalog.events.AuditEventHandler"
- "org.openmetadata.catalog.events.ElasticSearchEventHandler"
health: health:
delayedShutdownHandlerEnabled: true delayedShutdownHandlerEnabled: true
shutdownWaitPeriod: 1s shutdownWaitPeriod: 1s

View File

@ -7,7 +7,7 @@
"service_name": "local_mysql", "service_name": "local_mysql",
"service_type": "MySQL", "service_type": "MySQL",
"include_pattern": { "include_pattern": {
"filter": ["mysql.*", "information_schema.*"] "excludes": ["mysql.*", "information_schema.*"]
} }
} }
}, },