mirror of
				https://github.com/open-metadata/OpenMetadata.git
				synced 2025-11-03 20:19:31 +00:00 
			
		
		
		
	
						commit
						7c219d891f
					
				@ -23,6 +23,10 @@ then
 | 
				
			|||||||
fi
 | 
					fi
 | 
				
			||||||
base_dir=$(dirname $0)/..
 | 
					base_dir=$(dirname $0)/..
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					CATALOG_HOME=$base_dir
 | 
				
			||||||
 | 
					# OpenMetadata env script
 | 
				
			||||||
 | 
					. $CATALOG_HOME/conf/catalog-env.sh
 | 
				
			||||||
 | 
					
 | 
				
			||||||
if [ "x$CATALOG_HEAP_OPTS" = "x" ]; then
 | 
					if [ "x$CATALOG_HEAP_OPTS" = "x" ]; then
 | 
				
			||||||
    export CATALOG_HEAP_OPTS="-Xmx1G -Xms1G"
 | 
					    export CATALOG_HEAP_OPTS="-Xmx1G -Xms1G"
 | 
				
			||||||
fi
 | 
					fi
 | 
				
			||||||
 | 
				
			|||||||
@ -19,7 +19,8 @@
 | 
				
			|||||||
# Home Dir
 | 
					# Home Dir
 | 
				
			||||||
base_dir=$(dirname $0)/..
 | 
					base_dir=$(dirname $0)/..
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#if HDP_DIR is not set its a dev env.
 | 
					CATALOG_HOME=$base_dirbase_dir=$(dirname $0)/..
 | 
				
			||||||
 | 
					
 | 
				
			||||||
CATALOG_HOME=$base_dir
 | 
					CATALOG_HOME=$base_dir
 | 
				
			||||||
PID_DIR=$base_dir/logs
 | 
					PID_DIR=$base_dir/logs
 | 
				
			||||||
LOG_DIR=$base_dir/logs
 | 
					LOG_DIR=$base_dir/logs
 | 
				
			||||||
 | 
				
			|||||||
@ -210,3 +210,12 @@ CREATE TABLE IF NOT EXISTS tag_usage (
 | 
				
			|||||||
    timestamp BIGINT,
 | 
					    timestamp BIGINT,
 | 
				
			||||||
    UNIQUE KEY unique_name(tagFQN, targetFQN)
 | 
					    UNIQUE KEY unique_name(tagFQN, targetFQN)
 | 
				
			||||||
);
 | 
					);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					CREATE TABLE IF NOT EXISTS audit_log (
 | 
				
			||||||
 | 
					    id VARCHAR(36) GENERATED ALWAYS AS (json ->> '$.id') STORED NOT NULL,
 | 
				
			||||||
 | 
					    entityType VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.entityType') NOT NULL,
 | 
				
			||||||
 | 
					    username VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.username') NOT NULL,
 | 
				
			||||||
 | 
					    json JSON NOT NULL,
 | 
				
			||||||
 | 
					    timestamp BIGINT,
 | 
				
			||||||
 | 
					    PRIMARY KEY (id)
 | 
				
			||||||
 | 
					);
 | 
				
			||||||
@ -20,6 +20,7 @@ import com.google.inject.Guice;
 | 
				
			|||||||
import com.google.inject.Injector;
 | 
					import com.google.inject.Injector;
 | 
				
			||||||
import io.dropwizard.health.conf.HealthConfiguration;
 | 
					import io.dropwizard.health.conf.HealthConfiguration;
 | 
				
			||||||
import io.dropwizard.health.core.HealthCheckBundle;
 | 
					import io.dropwizard.health.core.HealthCheckBundle;
 | 
				
			||||||
 | 
					import org.openmetadata.catalog.events.EventFilter;
 | 
				
			||||||
import org.openmetadata.catalog.exception.CatalogGenericExceptionMapper;
 | 
					import org.openmetadata.catalog.exception.CatalogGenericExceptionMapper;
 | 
				
			||||||
import org.openmetadata.catalog.exception.ConstraintViolationExceptionMapper;
 | 
					import org.openmetadata.catalog.exception.ConstraintViolationExceptionMapper;
 | 
				
			||||||
import org.openmetadata.catalog.security.AuthenticationConfiguration;
 | 
					import org.openmetadata.catalog.security.AuthenticationConfiguration;
 | 
				
			||||||
@ -55,6 +56,7 @@ import org.slf4j.Logger;
 | 
				
			|||||||
import org.slf4j.LoggerFactory;
 | 
					import org.slf4j.LoggerFactory;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import javax.ws.rs.container.ContainerRequestFilter;
 | 
					import javax.ws.rs.container.ContainerRequestFilter;
 | 
				
			||||||
 | 
					import javax.ws.rs.container.ContainerResponseFilter;
 | 
				
			||||||
import javax.ws.rs.core.Response;
 | 
					import javax.ws.rs.core.Response;
 | 
				
			||||||
import java.lang.reflect.InvocationTargetException;
 | 
					import java.lang.reflect.InvocationTargetException;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -99,6 +101,9 @@ public class CatalogApplication extends Application<CatalogApplicationConfig> {
 | 
				
			|||||||
    environment.jersey().register(new EarlyEofExceptionMapper());
 | 
					    environment.jersey().register(new EarlyEofExceptionMapper());
 | 
				
			||||||
    environment.healthChecks().register("UserDatabaseCheck", new CatalogHealthCheck(catalogConfig, jdbi));
 | 
					    environment.healthChecks().register("UserDatabaseCheck", new CatalogHealthCheck(catalogConfig, jdbi));
 | 
				
			||||||
    registerResources(catalogConfig, environment, jdbi);
 | 
					    registerResources(catalogConfig, environment, jdbi);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    // Register Event Handler
 | 
				
			||||||
 | 
					    registerEventFilter(catalogConfig, environment, jdbi);
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  @SneakyThrows
 | 
					  @SneakyThrows
 | 
				
			||||||
@ -149,6 +154,11 @@ public class CatalogApplication extends Application<CatalogApplicationConfig> {
 | 
				
			|||||||
    injector = Guice.createInjector(new CatalogModule(authorizer));
 | 
					    injector = Guice.createInjector(new CatalogModule(authorizer));
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  private void registerEventFilter(CatalogApplicationConfig catalogConfig, Environment environment, DBI jdbi) {
 | 
				
			||||||
 | 
					    ContainerResponseFilter eventFilter = new EventFilter(catalogConfig, jdbi);
 | 
				
			||||||
 | 
					    environment.jersey().register(eventFilter);
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  private void registerResources(CatalogApplicationConfig config, Environment environment, DBI jdbi) {
 | 
					  private void registerResources(CatalogApplicationConfig config, Environment environment, DBI jdbi) {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    jdbi.registerContainerFactory(new OptionalContainerFactory());
 | 
					    jdbi.registerContainerFactory(new OptionalContainerFactory());
 | 
				
			||||||
 | 
				
			|||||||
@ -29,7 +29,7 @@ import static org.openmetadata.catalog.resources.teams.UserResource.FIELD_LIST;
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
public class CatalogHealthCheck extends HealthCheck {
 | 
					public class CatalogHealthCheck extends HealthCheck {
 | 
				
			||||||
  private final UserRepository userRepository;
 | 
					  private final UserRepository userRepository;
 | 
				
			||||||
  private final EntityUtil.Fields fields = new EntityUtil.Fields(FIELD_LIST, "teams");
 | 
					  private final EntityUtil.Fields fields = new EntityUtil.Fields(FIELD_LIST, "profile");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  public CatalogHealthCheck(CatalogApplicationConfig config, DBI jdbi) {
 | 
					  public CatalogHealthCheck(CatalogApplicationConfig config, DBI jdbi) {
 | 
				
			||||||
    super();
 | 
					    super();
 | 
				
			||||||
 | 
				
			|||||||
@ -0,0 +1,85 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					 *  Licensed to the Apache Software Foundation (ASF) under one or more
 | 
				
			||||||
 | 
					 *  contributor license agreements. See the NOTICE file distributed with
 | 
				
			||||||
 | 
					 *  this work for additional information regarding copyright ownership.
 | 
				
			||||||
 | 
					 *  The ASF licenses this file to You under the Apache License, Version 2.0
 | 
				
			||||||
 | 
					 *  (the "License"); you may not use this file except in compliance with
 | 
				
			||||||
 | 
					 *  the License. You may obtain a copy of the License at
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 *  http://www.apache.org/licenses/LICENSE-2.0
 | 
				
			||||||
 | 
					 *  Unless required by applicable law or agreed to in writing, software
 | 
				
			||||||
 | 
					 *  distributed under the License is distributed on an "AS IS" BASIS,
 | 
				
			||||||
 | 
					 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
				
			||||||
 | 
					 *  See the License for the specific language governing permissions and
 | 
				
			||||||
 | 
					 *  limitations under the License.
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package org.openmetadata.catalog.events;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import org.openmetadata.catalog.CatalogApplicationConfig;
 | 
				
			||||||
 | 
					import org.openmetadata.catalog.entity.audit.AuditLog;
 | 
				
			||||||
 | 
					import org.openmetadata.catalog.jdbi3.AuditLogRepository;
 | 
				
			||||||
 | 
					import org.openmetadata.catalog.type.EntityReference;
 | 
				
			||||||
 | 
					import org.openmetadata.catalog.util.EntityUtil;
 | 
				
			||||||
 | 
					import org.skife.jdbi.v2.DBI;
 | 
				
			||||||
 | 
					import org.slf4j.Logger;
 | 
				
			||||||
 | 
					import org.slf4j.LoggerFactory;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import javax.ws.rs.container.ContainerRequestContext;
 | 
				
			||||||
 | 
					import javax.ws.rs.container.ContainerResponseContext;
 | 
				
			||||||
 | 
					import java.text.DateFormat;
 | 
				
			||||||
 | 
					import java.text.SimpleDateFormat;
 | 
				
			||||||
 | 
					import java.util.Date;
 | 
				
			||||||
 | 
					import java.util.TimeZone;
 | 
				
			||||||
 | 
					import java.util.UUID;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					public class AuditEventHandler implements  EventHandler {
 | 
				
			||||||
 | 
					  private static final Logger LOG = LoggerFactory.getLogger(AuditEventHandler.class);
 | 
				
			||||||
 | 
					  private AuditLogRepository auditLogRepository;
 | 
				
			||||||
 | 
					  private final DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm'Z'"); // Quoted "Z" to indicate UTC,
 | 
				
			||||||
 | 
					  // no timezone offset
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  public void init(CatalogApplicationConfig config, DBI jdbi) {
 | 
				
			||||||
 | 
					    this.auditLogRepository = jdbi.onDemand(AuditLogRepository.class);
 | 
				
			||||||
 | 
					    TimeZone tz = TimeZone.getTimeZone("UTC");
 | 
				
			||||||
 | 
					    this.df.setTimeZone(tz);
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  public Void process(ContainerRequestContext requestContext,
 | 
				
			||||||
 | 
					                      ContainerResponseContext responseContext) {
 | 
				
			||||||
 | 
					    int responseCode = responseContext.getStatus();
 | 
				
			||||||
 | 
					    String method = requestContext.getMethod();
 | 
				
			||||||
 | 
					    if (responseContext.getEntity() != null) {
 | 
				
			||||||
 | 
					      String path = requestContext.getUriInfo().getPath();
 | 
				
			||||||
 | 
					      String username = requestContext.getSecurityContext().getUserPrincipal().getName();
 | 
				
			||||||
 | 
					      String nowAsISO = df.format(new Date());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      try {
 | 
				
			||||||
 | 
					        EntityReference entityReference = EntityUtil.getEntityReference(responseContext.getEntity(),
 | 
				
			||||||
 | 
					                responseContext.getEntity().getClass());
 | 
				
			||||||
 | 
					        if (entityReference != null) {
 | 
				
			||||||
 | 
					          AuditLog auditLog = new AuditLog().withId(UUID.randomUUID())
 | 
				
			||||||
 | 
					                  .withPath(path)
 | 
				
			||||||
 | 
					                  .withDate(nowAsISO)
 | 
				
			||||||
 | 
					                  .withEntityId(entityReference.getId())
 | 
				
			||||||
 | 
					                  .withEntityType(entityReference.getType())
 | 
				
			||||||
 | 
					                  .withEntity(entityReference)
 | 
				
			||||||
 | 
					                  .withMethod(method)
 | 
				
			||||||
 | 
					                  .withUsername(username)
 | 
				
			||||||
 | 
					                  .withResponseCode(responseCode);
 | 
				
			||||||
 | 
					          auditLogRepository.create(auditLog);
 | 
				
			||||||
 | 
					          LOG.debug("Added audit log entry: {}", auditLog);
 | 
				
			||||||
 | 
					        } else {
 | 
				
			||||||
 | 
					          LOG.error("Failed to capture audit log for {}", path);
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					      } catch(Exception e) {
 | 
				
			||||||
 | 
					        LOG.error("Failed to capture audit log due to {}", e.getMessage());
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    return null;
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  public void close() {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@ -0,0 +1,112 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					 *  Licensed to the Apache Software Foundation (ASF) under one or more
 | 
				
			||||||
 | 
					 *  contributor license agreements. See the NOTICE file distributed with
 | 
				
			||||||
 | 
					 *  this work for additional information regarding copyright ownership.
 | 
				
			||||||
 | 
					 *  The ASF licenses this file to You under the Apache License, Version 2.0
 | 
				
			||||||
 | 
					 *  (the "License"); you may not use this file except in compliance with
 | 
				
			||||||
 | 
					 *  the License. You may obtain a copy of the License at
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 *  http://www.apache.org/licenses/LICENSE-2.0
 | 
				
			||||||
 | 
					 *  Unless required by applicable law or agreed to in writing, software
 | 
				
			||||||
 | 
					 *  distributed under the License is distributed on an "AS IS" BASIS,
 | 
				
			||||||
 | 
					 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
				
			||||||
 | 
					 *  See the License for the specific language governing permissions and
 | 
				
			||||||
 | 
					 *  limitations under the License.
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package org.openmetadata.catalog.events;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import org.apache.http.HttpHost;
 | 
				
			||||||
 | 
					import org.elasticsearch.action.ActionListener;
 | 
				
			||||||
 | 
					import org.elasticsearch.action.update.UpdateRequest;
 | 
				
			||||||
 | 
					import org.elasticsearch.action.update.UpdateResponse;
 | 
				
			||||||
 | 
					import org.elasticsearch.client.RequestOptions;
 | 
				
			||||||
 | 
					import org.elasticsearch.client.RestClient;
 | 
				
			||||||
 | 
					import org.elasticsearch.client.RestHighLevelClient;
 | 
				
			||||||
 | 
					import org.openmetadata.catalog.CatalogApplicationConfig;
 | 
				
			||||||
 | 
					import org.openmetadata.catalog.ElasticSearchConfiguration;
 | 
				
			||||||
 | 
					import org.openmetadata.catalog.Entity;
 | 
				
			||||||
 | 
					import org.openmetadata.catalog.entity.data.Table;
 | 
				
			||||||
 | 
					import org.openmetadata.catalog.type.Column;
 | 
				
			||||||
 | 
					import org.skife.jdbi.v2.DBI;
 | 
				
			||||||
 | 
					import org.slf4j.Logger;
 | 
				
			||||||
 | 
					import org.slf4j.LoggerFactory;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import javax.ws.rs.container.ContainerRequestContext;
 | 
				
			||||||
 | 
					import javax.ws.rs.container.ContainerResponseContext;
 | 
				
			||||||
 | 
					import java.util.ArrayList;
 | 
				
			||||||
 | 
					import java.util.HashMap;
 | 
				
			||||||
 | 
					import java.util.HashSet;
 | 
				
			||||||
 | 
					import java.util.List;
 | 
				
			||||||
 | 
					import java.util.Map;
 | 
				
			||||||
 | 
					import java.util.Set;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					public class ElasticSearchEventHandler implements EventHandler {
 | 
				
			||||||
 | 
					  private static final Logger LOG = LoggerFactory.getLogger(AuditEventHandler.class);
 | 
				
			||||||
 | 
					  private RestHighLevelClient client;
 | 
				
			||||||
 | 
					  private final ActionListener<UpdateResponse> listener = new ActionListener<>() {
 | 
				
			||||||
 | 
					    @Override
 | 
				
			||||||
 | 
					    public void onResponse(UpdateResponse updateResponse) {
 | 
				
			||||||
 | 
					      LOG.info("Updated Elastic Search", updateResponse);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Override
 | 
				
			||||||
 | 
					    public void onFailure(Exception e) {
 | 
				
			||||||
 | 
					      LOG.error("Failed to update Elastic Search", e);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  };
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  public void init(CatalogApplicationConfig config, DBI jdbi) {
 | 
				
			||||||
 | 
					    ElasticSearchConfiguration esConfig = config.getElasticSearchConfiguration();
 | 
				
			||||||
 | 
					    this.client = new RestHighLevelClient(
 | 
				
			||||||
 | 
					            RestClient.builder(new HttpHost(esConfig.getHost(), esConfig.getPort(), "http"))
 | 
				
			||||||
 | 
					    );
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  public Void process(ContainerRequestContext requestContext,
 | 
				
			||||||
 | 
					                      ContainerResponseContext responseContext) {
 | 
				
			||||||
 | 
					    try {
 | 
				
			||||||
 | 
					      int responseCode = responseContext.getStatus();
 | 
				
			||||||
 | 
					      String method = requestContext.getMethod();
 | 
				
			||||||
 | 
					      if (responseContext.getEntity() != null) {
 | 
				
			||||||
 | 
					        Object entity = responseContext.getEntity();
 | 
				
			||||||
 | 
					        if (entity.getClass().toString().toLowerCase().endsWith(Entity.TABLE.toLowerCase())) {
 | 
				
			||||||
 | 
					          LOG.info("updating elastic search");
 | 
				
			||||||
 | 
					          Table instance = (Table) entity;
 | 
				
			||||||
 | 
					          Map<String, Object> jsonMap = new HashMap<>();
 | 
				
			||||||
 | 
					          jsonMap.put("description", instance.getDescription());
 | 
				
			||||||
 | 
					          Set<String> tags = new HashSet<>();
 | 
				
			||||||
 | 
					          List<String> columnDescriptions = new ArrayList<>();
 | 
				
			||||||
 | 
					          instance.getTags().forEach(tag -> tags.add(tag.getTagFQN()));
 | 
				
			||||||
 | 
					          for(Column column: instance.getColumns()) {
 | 
				
			||||||
 | 
					            column.getTags().forEach(tag -> tags.add(tag.getTagFQN()));
 | 
				
			||||||
 | 
					            columnDescriptions.add(column.getDescription());
 | 
				
			||||||
 | 
					          }
 | 
				
			||||||
 | 
					          if (!tags.isEmpty()) {
 | 
				
			||||||
 | 
					            List<String> tagsList = new ArrayList<>();
 | 
				
			||||||
 | 
					            tagsList.addAll(tags);
 | 
				
			||||||
 | 
					            jsonMap.put("tags", tagsList);
 | 
				
			||||||
 | 
					          }
 | 
				
			||||||
 | 
					          if (!columnDescriptions.isEmpty()) {
 | 
				
			||||||
 | 
					            jsonMap.put("column_descriptions", columnDescriptions);
 | 
				
			||||||
 | 
					          }
 | 
				
			||||||
 | 
					          UpdateRequest updateRequest = new UpdateRequest("table_search_index", instance.getId().toString());
 | 
				
			||||||
 | 
					          updateRequest.doc(jsonMap);
 | 
				
			||||||
 | 
					          client.updateAsync(updateRequest, RequestOptions.DEFAULT, listener);
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					    } catch (Exception e) {
 | 
				
			||||||
 | 
					      LOG.error("failed to update ES doc", e);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    return null;
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  public void close() {
 | 
				
			||||||
 | 
					    try {
 | 
				
			||||||
 | 
					      this.client.close();
 | 
				
			||||||
 | 
					    } catch (Exception e) {
 | 
				
			||||||
 | 
					      LOG.error("Failed to close elastic search", e);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@ -0,0 +1,75 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					 *  Licensed to the Apache Software Foundation (ASF) under one or more
 | 
				
			||||||
 | 
					 *  contributor license agreements. See the NOTICE file distributed with
 | 
				
			||||||
 | 
					 *  this work for additional information regarding copyright ownership.
 | 
				
			||||||
 | 
					 *  The ASF licenses this file to You under the Apache License, Version 2.0
 | 
				
			||||||
 | 
					 *  (the "License"); you may not use this file except in compliance with
 | 
				
			||||||
 | 
					 *  the License. You may obtain a copy of the License at
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 *  http://www.apache.org/licenses/LICENSE-2.0
 | 
				
			||||||
 | 
					 *  Unless required by applicable law or agreed to in writing, software
 | 
				
			||||||
 | 
					 *  distributed under the License is distributed on an "AS IS" BASIS,
 | 
				
			||||||
 | 
					 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
				
			||||||
 | 
					 *  See the License for the specific language governing permissions and
 | 
				
			||||||
 | 
					 *  limitations under the License.
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package org.openmetadata.catalog.events;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import org.openmetadata.catalog.CatalogApplicationConfig;
 | 
				
			||||||
 | 
					import org.openmetadata.catalog.util.ParallelStreamUtil;
 | 
				
			||||||
 | 
					import org.skife.jdbi.v2.DBI;
 | 
				
			||||||
 | 
					import org.slf4j.Logger;
 | 
				
			||||||
 | 
					import org.slf4j.LoggerFactory;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import javax.ws.rs.container.ContainerRequestContext;
 | 
				
			||||||
 | 
					import javax.ws.rs.container.ContainerResponseContext;
 | 
				
			||||||
 | 
					import javax.ws.rs.container.ContainerResponseFilter;
 | 
				
			||||||
 | 
					import javax.ws.rs.ext.Provider;
 | 
				
			||||||
 | 
					import java.io.IOException;
 | 
				
			||||||
 | 
					import java.util.ArrayList;
 | 
				
			||||||
 | 
					import java.util.Arrays;
 | 
				
			||||||
 | 
					import java.util.List;
 | 
				
			||||||
 | 
					import java.util.concurrent.ForkJoinPool;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@Provider
 | 
				
			||||||
 | 
					public class EventFilter implements ContainerResponseFilter {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  private static final Logger LOG = LoggerFactory.getLogger(EventFilter.class);
 | 
				
			||||||
 | 
					  private static final List<String> AUDITABLE_METHODS = Arrays.asList("POST", "PUT", "PATCH", "DELETE");
 | 
				
			||||||
 | 
					  private static final int FORK_JOIN_POOL_PARALLELISM = 20;
 | 
				
			||||||
 | 
					  private CatalogApplicationConfig config;
 | 
				
			||||||
 | 
					  private DBI jdbi;
 | 
				
			||||||
 | 
					  private final ForkJoinPool forkJoinPool;
 | 
				
			||||||
 | 
					  private List<EventHandler> eventHandlers;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  public EventFilter(CatalogApplicationConfig config, DBI jdbi) {
 | 
				
			||||||
 | 
					    this.config = config;
 | 
				
			||||||
 | 
					    this.jdbi = jdbi;
 | 
				
			||||||
 | 
					    this.forkJoinPool = new ForkJoinPool(FORK_JOIN_POOL_PARALLELISM);
 | 
				
			||||||
 | 
					    this.eventHandlers = new ArrayList<>();
 | 
				
			||||||
 | 
					    AuditEventHandler auditEventHandler = new AuditEventHandler();
 | 
				
			||||||
 | 
					    auditEventHandler.init(config, jdbi);
 | 
				
			||||||
 | 
					    eventHandlers.add(auditEventHandler);
 | 
				
			||||||
 | 
					    ElasticSearchEventHandler elasticSearchEventHandler = new ElasticSearchEventHandler();
 | 
				
			||||||
 | 
					    elasticSearchEventHandler.init(config, jdbi);
 | 
				
			||||||
 | 
					    eventHandlers.add(elasticSearchEventHandler);
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  @Override
 | 
				
			||||||
 | 
					  public void filter(ContainerRequestContext requestContext,
 | 
				
			||||||
 | 
					                     ContainerResponseContext responseContext) throws IOException {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    int responseCode = responseContext.getStatus();
 | 
				
			||||||
 | 
					    String method = requestContext.getMethod();
 | 
				
			||||||
 | 
					    if ((responseCode < 200 || responseCode > 299) || (!AUDITABLE_METHODS.contains(method))) {
 | 
				
			||||||
 | 
					      return;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    eventHandlers.parallelStream().forEach(eventHandler -> ParallelStreamUtil.runAsync(() ->
 | 
				
			||||||
 | 
					            eventHandler.process(requestContext, responseContext), forkJoinPool));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@ -0,0 +1,29 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					 *  Licensed to the Apache Software Foundation (ASF) under one or more
 | 
				
			||||||
 | 
					 *  contributor license agreements. See the NOTICE file distributed with
 | 
				
			||||||
 | 
					 *  this work for additional information regarding copyright ownership.
 | 
				
			||||||
 | 
					 *  The ASF licenses this file to You under the Apache License, Version 2.0
 | 
				
			||||||
 | 
					 *  (the "License"); you may not use this file except in compliance with
 | 
				
			||||||
 | 
					 *  the License. You may obtain a copy of the License at
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 *  http://www.apache.org/licenses/LICENSE-2.0
 | 
				
			||||||
 | 
					 *  Unless required by applicable law or agreed to in writing, software
 | 
				
			||||||
 | 
					 *  distributed under the License is distributed on an "AS IS" BASIS,
 | 
				
			||||||
 | 
					 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
				
			||||||
 | 
					 *  See the License for the specific language governing permissions and
 | 
				
			||||||
 | 
					 *  limitations under the License.
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package org.openmetadata.catalog.events;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import org.openmetadata.catalog.CatalogApplicationConfig;
 | 
				
			||||||
 | 
					import org.skife.jdbi.v2.DBI;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import javax.ws.rs.container.ContainerRequestContext;
 | 
				
			||||||
 | 
					import javax.ws.rs.container.ContainerResponseContext;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					public interface EventHandler {
 | 
				
			||||||
 | 
					  void init(CatalogApplicationConfig config, DBI jdbi);
 | 
				
			||||||
 | 
					  Void process(ContainerRequestContext requestContext, ContainerResponseContext responseContext);
 | 
				
			||||||
 | 
					  void close();
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@ -0,0 +1,100 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					 *  Licensed to the Apache Software Foundation (ASF) under one or more
 | 
				
			||||||
 | 
					 *  contributor license agreements. See the NOTICE file distributed with
 | 
				
			||||||
 | 
					 *  this work for additional information regarding copyright ownership.
 | 
				
			||||||
 | 
					 *  The ASF licenses this file to You under the Apache License, Version 2.0
 | 
				
			||||||
 | 
					 *  (the "License"); you may not use this file except in compliance with
 | 
				
			||||||
 | 
					 *  the License. You may obtain a copy of the License at
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 *  http://www.apache.org/licenses/LICENSE-2.0
 | 
				
			||||||
 | 
					 *  Unless required by applicable law or agreed to in writing, software
 | 
				
			||||||
 | 
					 *  distributed under the License is distributed on an "AS IS" BASIS,
 | 
				
			||||||
 | 
					 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
				
			||||||
 | 
					 *  See the License for the specific language governing permissions and
 | 
				
			||||||
 | 
					 *  limitations under the License.
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package org.openmetadata.catalog.jdbi3;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import org.openmetadata.catalog.entity.audit.AuditLog;
 | 
				
			||||||
 | 
					import org.openmetadata.catalog.util.EntityUtil;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import org.openmetadata.catalog.util.JsonUtils;
 | 
				
			||||||
 | 
					import org.skife.jdbi.v2.sqlobject.Bind;
 | 
				
			||||||
 | 
					import org.skife.jdbi.v2.sqlobject.CreateSqlObject;
 | 
				
			||||||
 | 
					import org.skife.jdbi.v2.sqlobject.SqlQuery;
 | 
				
			||||||
 | 
					import org.skife.jdbi.v2.sqlobject.SqlUpdate;
 | 
				
			||||||
 | 
					import org.skife.jdbi.v2.sqlobject.Transaction;
 | 
				
			||||||
 | 
					import org.slf4j.Logger;
 | 
				
			||||||
 | 
					import org.slf4j.LoggerFactory;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import java.io.IOException;
 | 
				
			||||||
 | 
					import java.util.ArrayList;
 | 
				
			||||||
 | 
					import java.util.List;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					public abstract class AuditLogRepository {
 | 
				
			||||||
 | 
					  public static final Logger LOG = LoggerFactory.getLogger(AuditLogRepository.class);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  @CreateSqlObject
 | 
				
			||||||
 | 
					  abstract AuditLogDAO auditLogDAO();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  @Transaction
 | 
				
			||||||
 | 
					  public List<AuditLog> list() throws IOException {
 | 
				
			||||||
 | 
					    List<String> jsons = auditLogDAO().list();
 | 
				
			||||||
 | 
					    List<AuditLog> auditLogs = new ArrayList<>();
 | 
				
			||||||
 | 
					    for (String json : jsons) {
 | 
				
			||||||
 | 
					      auditLogs.add(JsonUtils.readValue(json, AuditLog.class));
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    return auditLogs;
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  @Transaction
 | 
				
			||||||
 | 
					  public AuditLog get(String id) throws IOException {
 | 
				
			||||||
 | 
					    return EntityUtil.validate(id, auditLogDAO().findById(id), AuditLog.class);
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  @Transaction
 | 
				
			||||||
 | 
					  public List<AuditLog> getByEntityType(String entityType) throws IOException {
 | 
				
			||||||
 | 
					    List<String> jsons = auditLogDAO().findByEntityType(entityType);
 | 
				
			||||||
 | 
					    List<AuditLog> auditLogs = new ArrayList<>();
 | 
				
			||||||
 | 
					    for (String json: jsons) {
 | 
				
			||||||
 | 
					      auditLogs.add(JsonUtils.readValue(json, AuditLog.class));
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    return auditLogs;
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  @Transaction
 | 
				
			||||||
 | 
					  public AuditLog create(AuditLog auditLog) throws IOException {
 | 
				
			||||||
 | 
					    auditLogDAO().insert(JsonUtils.pojoToJson(auditLog));
 | 
				
			||||||
 | 
					    return auditLog;
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  @Transaction
 | 
				
			||||||
 | 
					  public void delete(String id) throws IOException {
 | 
				
			||||||
 | 
					    auditLogDAO().delete(id);
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  public interface AuditLogDAO {
 | 
				
			||||||
 | 
					    @SqlUpdate("INSERT INTO audit_log (json) VALUES (:json)")
 | 
				
			||||||
 | 
					    void insert(@Bind("json") String json);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @SqlQuery("SELECT json FROM audit_log WHERE id = :id")
 | 
				
			||||||
 | 
					    String findById(@Bind("id") String id);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @SqlQuery("SELECT json FROM audit_log WHERE entity_type = :entity_type")
 | 
				
			||||||
 | 
					    List<String> findByEntityType(@Bind("entity_type") String entityType);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @SqlQuery("SELECT json FROM audit_log")
 | 
				
			||||||
 | 
					    List<String> list();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @SqlUpdate("UPDATE audit_log_entity SET json = :json WHERE id = :id")
 | 
				
			||||||
 | 
					    void update(@Bind("id") String id, @Bind("json") String json);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @SqlUpdate("DELETE FROM audit_log_entity WHERE id = :id")
 | 
				
			||||||
 | 
					    int delete(@Bind("id") String id);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@ -254,7 +254,7 @@ public abstract class UserRepository {
 | 
				
			|||||||
    List<String> teamIds = relationshipDAO().findFrom(user.getId().toString(), CONTAINS.ordinal(), "team");
 | 
					    List<String> teamIds = relationshipDAO().findFrom(user.getId().toString(), CONTAINS.ordinal(), "team");
 | 
				
			||||||
    List<Team> teams = new ArrayList<>();
 | 
					    List<Team> teams = new ArrayList<>();
 | 
				
			||||||
    for (String teamId : teamIds) {
 | 
					    for (String teamId : teamIds) {
 | 
				
			||||||
      LOG.info("Adding team {}", teamId);
 | 
					      LOG.debug("Adding team {}", teamId);
 | 
				
			||||||
      String json = teamDAO().findById(teamId);
 | 
					      String json = teamDAO().findById(teamId);
 | 
				
			||||||
      Team team = JsonUtils.readValue(json, Team.class);
 | 
					      Team team = JsonUtils.readValue(json, Team.class);
 | 
				
			||||||
      if (team != null) {
 | 
					      if (team != null) {
 | 
				
			||||||
 | 
				
			|||||||
@ -265,6 +265,33 @@ public final class EntityUtil {
 | 
				
			|||||||
    throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(entity, fqn));
 | 
					    throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(entity, fqn));
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  public static EntityReference getEntityReference(Object entity, Class<?> clazz) throws IOException {
 | 
				
			||||||
 | 
					    if (clazz.toString().toLowerCase().endsWith(Entity.TABLE.toLowerCase())) {
 | 
				
			||||||
 | 
					      Table instance = (Table) entity;
 | 
				
			||||||
 | 
					      return getEntityReference(instance);
 | 
				
			||||||
 | 
					    } else if (clazz.toString().toLowerCase().endsWith(Entity.DATABASE.toLowerCase())) {
 | 
				
			||||||
 | 
					      Database instance = (Database) entity;
 | 
				
			||||||
 | 
					      return getEntityReference(instance);
 | 
				
			||||||
 | 
					    } else if (clazz.toString().toLowerCase().endsWith(Entity.METRICS.toLowerCase())) {
 | 
				
			||||||
 | 
					      Metrics instance = (Metrics) entity;
 | 
				
			||||||
 | 
					      return getEntityReference(instance);
 | 
				
			||||||
 | 
					    } else if (clazz.toString().toLowerCase().endsWith(Entity.DATABASE_SERVICE.toLowerCase())) {
 | 
				
			||||||
 | 
					      DatabaseService instance = (DatabaseService) entity;
 | 
				
			||||||
 | 
					      return getEntityReference(instance);
 | 
				
			||||||
 | 
					    } else if (clazz.toString().toLowerCase().endsWith(Entity.REPORT.toLowerCase())) {
 | 
				
			||||||
 | 
					      Report instance = (Report) entity;
 | 
				
			||||||
 | 
					      return getEntityReference(instance);
 | 
				
			||||||
 | 
					    } else if (clazz.toString().toLowerCase().endsWith(Entity.TEAM.toLowerCase())) {
 | 
				
			||||||
 | 
					      Team instance = (Team) entity;
 | 
				
			||||||
 | 
					      return getEntityReference(instance);
 | 
				
			||||||
 | 
					    } else if (clazz.toString().toLowerCase().endsWith(Entity.USER.toLowerCase())) {
 | 
				
			||||||
 | 
					      User instance = (User) entity;
 | 
				
			||||||
 | 
					      return getEntityReference(instance);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityTypeNotFound(
 | 
				
			||||||
 | 
					            String.format("Failed to find entity class {}", clazz.toString())));
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  public static EntityReference getEntityReference(DatabaseService service) {
 | 
					  public static EntityReference getEntityReference(DatabaseService service) {
 | 
				
			||||||
    return new EntityReference().withName(service.getName()).withId(service.getId())
 | 
					    return new EntityReference().withName(service.getName()).withId(service.getId())
 | 
				
			||||||
            .withType(Entity.DATABASE_SERVICE);
 | 
					            .withType(Entity.DATABASE_SERVICE);
 | 
				
			||||||
 | 
				
			|||||||
@ -0,0 +1,112 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					 *  Licensed to the Apache Software Foundation (ASF) under one or more
 | 
				
			||||||
 | 
					 *  contributor license agreements. See the NOTICE file distributed with
 | 
				
			||||||
 | 
					 *  this work for additional information regarding copyright ownership.
 | 
				
			||||||
 | 
					 *  The ASF licenses this file to You under the Apache License, Version 2.0
 | 
				
			||||||
 | 
					 *  (the "License"); you may not use this file except in compliance with
 | 
				
			||||||
 | 
					 *  the License. You may obtain a copy of the License at
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 *  http://www.apache.org/licenses/LICENSE-2.0
 | 
				
			||||||
 | 
					 *  Unless required by applicable law or agreed to in writing, software
 | 
				
			||||||
 | 
					 *  distributed under the License is distributed on an "AS IS" BASIS,
 | 
				
			||||||
 | 
					 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
				
			||||||
 | 
					 *  See the License for the specific language governing permissions and
 | 
				
			||||||
 | 
					 *  limitations under the License.
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package org.openmetadata.catalog.util;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import com.google.common.base.Stopwatch;
 | 
				
			||||||
 | 
					import org.slf4j.Logger;
 | 
				
			||||||
 | 
					import org.slf4j.LoggerFactory;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import java.util.concurrent.Callable;
 | 
				
			||||||
 | 
					import java.util.concurrent.CompletableFuture;
 | 
				
			||||||
 | 
					import java.util.concurrent.ExecutionException;
 | 
				
			||||||
 | 
					import java.util.concurrent.Executor;
 | 
				
			||||||
 | 
					import java.util.concurrent.TimeUnit;
 | 
				
			||||||
 | 
					import java.util.function.Supplier;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					public final class ParallelStreamUtil {
 | 
				
			||||||
 | 
					  private static final Logger LOG = LoggerFactory.getLogger(ParallelStreamUtil.class);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  private ParallelStreamUtil() {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  public static <T> T execute(Supplier<T> supplier, Executor executor) {
 | 
				
			||||||
 | 
					    Stopwatch stopwatch = Stopwatch.createStarted();
 | 
				
			||||||
 | 
					    LOG.debug("execute start");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    try {
 | 
				
			||||||
 | 
					      CompletableFuture<T> resultFuture = CompletableFuture.supplyAsync(supplier, executor);
 | 
				
			||||||
 | 
					      return resultFuture.get();
 | 
				
			||||||
 | 
					    } catch (InterruptedException e) {
 | 
				
			||||||
 | 
					      throw new RuntimeException(e);
 | 
				
			||||||
 | 
					    } catch (ExecutionException e) {
 | 
				
			||||||
 | 
					      handleExecutionException(e);
 | 
				
			||||||
 | 
					      // shouldn't reach here
 | 
				
			||||||
 | 
					      throw new IllegalStateException("Shouldn't reach here");
 | 
				
			||||||
 | 
					    } finally {
 | 
				
			||||||
 | 
					      LOG.debug("execute complete - elapsed: {} ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
 | 
				
			||||||
 | 
					      stopwatch.stop();
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  public static <T> T executeWithTimeout(int timeoutInSeconds, Supplier<T> supplier, Executor executor) {
 | 
				
			||||||
 | 
					    Stopwatch stopwatch = Stopwatch.createStarted();
 | 
				
			||||||
 | 
					    LOG.debug("execute start");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    try {
 | 
				
			||||||
 | 
					      CompletableFuture<T> resultFuture = CompletableFuture.supplyAsync(supplier, executor);
 | 
				
			||||||
 | 
					      return resultFuture.get(timeoutInSeconds, TimeUnit.SECONDS);
 | 
				
			||||||
 | 
					    } catch (InterruptedException e) {
 | 
				
			||||||
 | 
					      throw new RuntimeException(e);
 | 
				
			||||||
 | 
					    } catch (ExecutionException e) {
 | 
				
			||||||
 | 
					      handleExecutionException(e);
 | 
				
			||||||
 | 
					      // shouldn't reach here
 | 
				
			||||||
 | 
					      throw new IllegalStateException("Shouldn't reach here");
 | 
				
			||||||
 | 
					    } catch (Exception e) {
 | 
				
			||||||
 | 
					      throw new RuntimeException(e);
 | 
				
			||||||
 | 
					    } finally {
 | 
				
			||||||
 | 
					      LOG.debug("execute complete - elapsed: {} ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
 | 
				
			||||||
 | 
					      stopwatch.stop();
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  public static void runAsync(Callable<Void> callable, Executor executor) {
 | 
				
			||||||
 | 
					    Stopwatch stopwatch = Stopwatch.createStarted();
 | 
				
			||||||
 | 
					    LOG.debug("runAsync start");
 | 
				
			||||||
 | 
					    CompletableFuture<Void> res = CompletableFuture.supplyAsync(() -> {
 | 
				
			||||||
 | 
					      try {
 | 
				
			||||||
 | 
					        return callable.call();
 | 
				
			||||||
 | 
					      } catch (Exception ex) {
 | 
				
			||||||
 | 
					        throw new RuntimeException(ex);
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					    }, executor);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    res.whenComplete((r, th) -> {
 | 
				
			||||||
 | 
					      // LOG any exceptions
 | 
				
			||||||
 | 
					      if (th != null) {
 | 
				
			||||||
 | 
					        LOG.error("Got exception while running async task", th.getCause());
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					      LOG.debug("runAsync complete - elapsed: {} ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
 | 
				
			||||||
 | 
					      stopwatch.stop();
 | 
				
			||||||
 | 
					    });
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  private static void handleExecutionException(ExecutionException e) {
 | 
				
			||||||
 | 
					    Throwable t = e.getCause();
 | 
				
			||||||
 | 
					    if (t != null) {
 | 
				
			||||||
 | 
					      if (t instanceof RuntimeException) {
 | 
				
			||||||
 | 
					        throw (RuntimeException) t;
 | 
				
			||||||
 | 
					      } else {
 | 
				
			||||||
 | 
					        throw new RuntimeException(t);
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					    } else {
 | 
				
			||||||
 | 
					      throw new RuntimeException(e);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@ -0,0 +1,49 @@
 | 
				
			|||||||
 | 
					{
 | 
				
			||||||
 | 
					  "$id": "https://streaminlinedata.ai/entity/audit/auditLog.json",
 | 
				
			||||||
 | 
					  "$schema": "http://json-schema.org/draft-07/schema#",
 | 
				
			||||||
 | 
					  "title": "Audit Log entity",
 | 
				
			||||||
 | 
					  "description": "Entity that represents a Audit Log",
 | 
				
			||||||
 | 
					  "type": "object",
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  "properties" : {
 | 
				
			||||||
 | 
					    "id": {
 | 
				
			||||||
 | 
					      "description": "Unique identifier that identifies a Audit Log Entry",
 | 
				
			||||||
 | 
					      "$ref": "../../type/basic.json#/definitions/uuid"
 | 
				
			||||||
 | 
					    },
 | 
				
			||||||
 | 
					    "method": {
 | 
				
			||||||
 | 
					      "description": "HTTP Method used in a call",
 | 
				
			||||||
 | 
					      "type": "string",
 | 
				
			||||||
 | 
					      "minLength": 1,
 | 
				
			||||||
 | 
					      "maxLength": 64
 | 
				
			||||||
 | 
					    },
 | 
				
			||||||
 | 
					    "responseCode": {
 | 
				
			||||||
 | 
					      "description": "HTTP response code for the api requested",
 | 
				
			||||||
 | 
					      "type": "integer"
 | 
				
			||||||
 | 
					    },
 | 
				
			||||||
 | 
					    "path": {
 | 
				
			||||||
 | 
					      "description": "requested API Path",
 | 
				
			||||||
 | 
					      "type": "string"
 | 
				
			||||||
 | 
					    },
 | 
				
			||||||
 | 
					    "username": {
 | 
				
			||||||
 | 
					      "description": "Name of the user who requested for the API",
 | 
				
			||||||
 | 
					      "type": "string"
 | 
				
			||||||
 | 
					    },
 | 
				
			||||||
 | 
					    "date": {
 | 
				
			||||||
 | 
					      "description": "Date which the api call is made",
 | 
				
			||||||
 | 
					      "$ref": "../../type/basic.json#/definitions/date"
 | 
				
			||||||
 | 
					    },
 | 
				
			||||||
 | 
					    "entityId": {
 | 
				
			||||||
 | 
					      "description": "entityReference Id",
 | 
				
			||||||
 | 
					      "$ref": "../../type/basic.json#/definitions/uuid"
 | 
				
			||||||
 | 
					    },
 | 
				
			||||||
 | 
					    "entityType": {
 | 
				
			||||||
 | 
					      "description": "Entity Type",
 | 
				
			||||||
 | 
					      "type": "string"
 | 
				
			||||||
 | 
					    },
 | 
				
			||||||
 | 
					    "entity" : {
 | 
				
			||||||
 | 
					      "description": "Link to entity on which api request is done",
 | 
				
			||||||
 | 
					      "$ref" : "../../type/entityReference.json"
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  },
 | 
				
			||||||
 | 
					  "required": ["id", "method", "responseCode", "user", "entity"]
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@ -1,17 +1,5 @@
 | 
				
			|||||||
#  Licensed to the Apache Software Foundation (ASF) under one or more
 | 
					# Copyright Contributors to the Amundsen project.
 | 
				
			||||||
#  contributor license agreements. See the NOTICE file distributed with
 | 
					# SPDX-License-Identifier: Apache-2.0
 | 
				
			||||||
#  this work for additional information regarding copyright ownership.
 | 
					 | 
				
			||||||
#  The ASF licenses this file to You under the Apache License, Version 2.0
 | 
					 | 
				
			||||||
#  (the "License"); you may not use this file except in compliance with
 | 
					 | 
				
			||||||
#  the License. You may obtain a copy of the License at
 | 
					 | 
				
			||||||
#
 | 
					 | 
				
			||||||
#  http://www.apache.org/licenses/LICENSE-2.0
 | 
					 | 
				
			||||||
#
 | 
					 | 
				
			||||||
#  Unless required by applicable law or agreed to in writing, software
 | 
					 | 
				
			||||||
#  distributed under the License is distributed on an "AS IS" BASIS,
 | 
					 | 
				
			||||||
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
					 | 
				
			||||||
#  See the License for the specific language governing permissions and
 | 
					 | 
				
			||||||
#  limitations under the License.
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
import json
 | 
					import json
 | 
				
			||||||
import logging
 | 
					import logging
 | 
				
			||||||
@ -159,7 +147,9 @@ class ElasticSearchBulkSink(BulkSink):
 | 
				
			|||||||
        self.elasticsearch_client.indices.create(index=self.table_index_new, body=self.elasticsearch_mapping)
 | 
					        self.elasticsearch_client.indices.create(index=self.table_index_new, body=self.elasticsearch_mapping)
 | 
				
			||||||
        for doc in docs:
 | 
					        for doc in docs:
 | 
				
			||||||
            index_row = dict(index=dict(_index=self.table_index_new,
 | 
					            index_row = dict(index=dict(_index=self.table_index_new,
 | 
				
			||||||
 | 
					                                        _id=doc.table_id,
 | 
				
			||||||
                                        _type=self.elasticsearch_doc_type))
 | 
					                                        _type=self.elasticsearch_doc_type))
 | 
				
			||||||
 | 
					            print(index_row)
 | 
				
			||||||
            records.append(index_row)
 | 
					            records.append(index_row)
 | 
				
			||||||
            records.append(doc.json())
 | 
					            records.append(doc.json())
 | 
				
			||||||
            count += 1
 | 
					            count += 1
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user