Remove Cleanups for JdbiUnitOfWorkProvider + added cases for autocomm… (#13163)

* Remove Cleanups for JdbiUnitOfWorkProvider + added cases for autocommit in case of non transactional events

* remove unnecessary clear

* typos
This commit is contained in:
Mohit Yadav 2023-09-13 08:58:42 +05:30 committed by GitHub
parent 1865f46ab5
commit 6d4e425f36
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 103 additions and 78 deletions

View File

@ -87,6 +87,7 @@ import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.locator.ConnectionAwareAnnotationSqlLocator;
import org.openmetadata.service.jdbi3.locator.ConnectionType;
import org.openmetadata.service.jdbi3.unitofwork.JdbiTransactionManager;
import org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkApplicationEventListener;
import org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider;
import org.openmetadata.service.migration.Migration;
@ -142,9 +143,9 @@ public class OpenMetadataApplication extends Application<OpenMetadataApplication
ChangeEventConfig.initialize(catalogConfig);
final Jdbi jdbi = createAndSetupJDBI(environment, catalogConfig.getDataSourceFactory());
JdbiUnitOfWorkProvider jdbiUnitOfWorkProvider = JdbiUnitOfWorkProvider.withDefault(jdbi);
CollectionDAO daoObject =
(CollectionDAO) getWrappedInstanceForDaoClass(jdbiUnitOfWorkProvider, CollectionDAO.class);
environment.jersey().register(new JdbiUnitOfWorkApplicationEventListener(jdbiUnitOfWorkProvider, new HashSet<>()));
CollectionDAO daoObject = (CollectionDAO) getWrappedInstanceForDaoClass(CollectionDAO.class);
JdbiTransactionManager.initialize(jdbiUnitOfWorkProvider.getHandleManager());
environment.jersey().register(new JdbiUnitOfWorkApplicationEventListener(new HashSet<>()));
// Configure the Fernet instance
Fernet.getInstance().setFernetKey(catalogConfig);

View File

@ -20,7 +20,6 @@ import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.type.AuditLog;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
@ -28,7 +27,7 @@ import org.slf4j.MarkerFactory;
public class AuditEventHandler implements EventHandler {
private final Marker auditMarker = MarkerFactory.getMarker("AUDIT");
public void init(OpenMetadataApplicationConfig config, JdbiUnitOfWorkProvider provider) {
public void init(OpenMetadataApplicationConfig config) {
// Nothing to do
}

View File

@ -25,7 +25,6 @@ import javax.ws.rs.container.ContainerResponseContext;
import javax.ws.rs.core.SecurityContext;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.core.Handle;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.entity.feed.Thread;
import org.openmetadata.schema.type.ChangeEvent;
@ -35,7 +34,6 @@ import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.events.subscription.AlertUtil;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.FeedRepository;
import org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider;
import org.openmetadata.service.socket.WebSocketManager;
import org.openmetadata.service.util.FeedUtils;
import org.openmetadata.service.util.JsonUtils;
@ -46,12 +44,9 @@ public class ChangeEventHandler implements EventHandler {
private ObjectMapper mapper;
private NotificationHandler notificationHandler;
private JdbiUnitOfWorkProvider jdbiUnitOfWorkProvider;
public void init(OpenMetadataApplicationConfig config, JdbiUnitOfWorkProvider jdbiUnitOfWorkProvider) {
public void init(OpenMetadataApplicationConfig config) {
this.mapper = new ObjectMapper();
this.notificationHandler = new NotificationHandler(jdbiUnitOfWorkProvider);
this.jdbiUnitOfWorkProvider = jdbiUnitOfWorkProvider;
this.notificationHandler = new NotificationHandler();
}
@SneakyThrows
@ -60,10 +55,7 @@ public class ChangeEventHandler implements EventHandler {
SecurityContext securityContext = requestContext.getSecurityContext();
String loggedInUserName = securityContext.getUserPrincipal().getName();
try {
Handle handle = jdbiUnitOfWorkProvider.getHandleManager().get();
handle.getConnection().setAutoCommit(true);
CollectionDAO collectionDAO =
(CollectionDAO) getWrappedInstanceForDaoClass(jdbiUnitOfWorkProvider, CollectionDAO.class);
CollectionDAO collectionDAO = (CollectionDAO) getWrappedInstanceForDaoClass(CollectionDAO.class);
CollectionDAO.ChangeEventDAO changeEventDAO = collectionDAO.changeEventDAO();
FeedRepository feedRepository = new FeedRepository(collectionDAO);
if (responseContext.getEntity() != null && responseContext.getEntity().getClass().equals(Thread.class)) {
@ -110,8 +102,6 @@ public class ChangeEventHandler implements EventHandler {
}
} catch (Exception e) {
LOG.error("Failed to capture the change event for method {} due to ", method, e);
} finally {
jdbiUnitOfWorkProvider.getHandleManager().clear();
}
return null;
}

View File

@ -52,7 +52,7 @@ public class EventFilter implements ContainerResponseFilter {
@SuppressWarnings("unchecked")
EventHandler eventHandler =
((Class<EventHandler>) Class.forName(eventHandlerClassName)).getConstructor().newInstance();
eventHandler.init(config, provider);
eventHandler.init(config);
eventHandlers.add(eventHandler);
LOG.info("Added event handler {}", eventHandlerClassName);
}

View File

@ -16,10 +16,9 @@ package org.openmetadata.service.events;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerResponseContext;
import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider;
public interface EventHandler {
void init(OpenMetadataApplicationConfig config, JdbiUnitOfWorkProvider jdbi);
void init(OpenMetadataApplicationConfig config);
Void process(ContainerRequestContext requestContext, ContainerResponseContext responseContext);

View File

@ -7,7 +7,6 @@ import javax.ws.rs.container.ContainerResponseContext;
import javax.ws.rs.core.UriInfo;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider;
import org.openmetadata.service.util.MicrometerBundleSingleton;
@Slf4j
@ -17,7 +16,7 @@ public class WebAnalyticEventHandler implements EventHandler {
public static final String WEB_ANALYTIC_ENDPOINT = "v1/analytics/web/events/collect";
private static final String COUNTER_NAME = "web.analytics.events";
public void init(OpenMetadataApplicationConfig config, JdbiUnitOfWorkProvider provider) {
public void init(OpenMetadataApplicationConfig config) {
this.prometheusMeterRegistry = MicrometerBundleSingleton.prometheusMeterRegistry;
this.clusterName = config.getClusterName();
}

View File

@ -7,18 +7,14 @@ import org.glassfish.jersey.server.monitoring.RequestEventListener;
@Slf4j
class HttpGetRequestJdbiUnitOfWorkEventListener implements RequestEventListener {
private final JdbiTransactionAspect transactionAspect;
HttpGetRequestJdbiUnitOfWorkEventListener(JdbiHandleManager handleManager) {
this.transactionAspect = new JdbiTransactionAspect(handleManager);
}
HttpGetRequestJdbiUnitOfWorkEventListener() {}
@Override
public void onEvent(RequestEvent event) {
RequestEvent.Type type = event.getType();
LOG.debug("Handling GET Request Event {} {}", type, Thread.currentThread().getId());
if (type == RequestEvent.Type.FINISHED) {
transactionAspect.terminateHandle();
JdbiTransactionManager.getInstance().terminateHandle();
}
}
}

View File

@ -2,8 +2,11 @@ package org.openmetadata.service.jdbi3.unitofwork;
import java.util.concurrent.ThreadFactory;
import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.Jdbi;
public interface JdbiHandleManager {
Jdbi getJdbi();
Handle get();
void clear();

View File

@ -8,14 +8,29 @@ import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.Handles;
@Slf4j
public class JdbiTransactionAspect {
public class JdbiTransactionManager {
private static JdbiTransactionManager instance;
private static volatile boolean initialized = false;
private final JdbiHandleManager handleManager;
private final Set<Integer> IN_TRANSACTION_HANDLES = Collections.newSetFromMap(new ConcurrentHashMap<>());
public JdbiTransactionAspect(JdbiHandleManager handleManager) {
private JdbiTransactionManager(JdbiHandleManager handleManager) {
this.handleManager = handleManager;
}
public static void initialize(JdbiHandleManager handleManager) {
if (!initialized) {
instance = new JdbiTransactionManager(handleManager);
initialized = true;
} else {
LOG.info("Jdbi Transaction Manager is already initialized");
}
}
public static JdbiTransactionManager getInstance() {
return instance;
}
public void begin(boolean autoCommit) {
try {
Handle handle = handleManager.get();
@ -79,7 +94,13 @@ public class JdbiTransactionAspect {
}
public void terminateHandle() {
if (IN_TRANSACTION_HANDLES.contains(handleManager.get().hashCode())) {
handleManager.clear();
}
IN_TRANSACTION_HANDLES.remove(handleManager.get().hashCode());
handleManager.clear();
}
public boolean containsHandle(int hashCode) {
return IN_TRANSACTION_HANDLES.contains(hashCode);
}
}

View File

@ -11,12 +11,9 @@ import org.glassfish.jersey.server.monitoring.RequestEventListener;
@Slf4j
public class JdbiUnitOfWorkApplicationEventListener implements ApplicationEventListener {
private final JdbiUnitOfWorkProvider unitOfWorkProvider;
private final Set<String> excludedPaths;
public JdbiUnitOfWorkApplicationEventListener(JdbiUnitOfWorkProvider unitOfWorkProvider, Set<String> excludedPaths) {
this.unitOfWorkProvider = unitOfWorkProvider;
public JdbiUnitOfWorkApplicationEventListener(Set<String> excludedPaths) {
this.excludedPaths = excludedPaths;
}
@ -33,8 +30,8 @@ public class JdbiUnitOfWorkApplicationEventListener implements ApplicationEventL
return null;
}
if (event.getContainerRequest().getMethod().equals(HttpMethod.GET)) {
return new HttpGetRequestJdbiUnitOfWorkEventListener(unitOfWorkProvider.getHandleManager());
return new HttpGetRequestJdbiUnitOfWorkEventListener();
}
return new NonHttpGetRequestJdbiUnitOfWorkEventListener(unitOfWorkProvider.getHandleManager());
return new NonHttpGetRequestJdbiUnitOfWorkEventListener();
}
}

View File

@ -1,14 +1,15 @@
package org.openmetadata.service.jdbi3.unitofwork;
import com.google.common.reflect.Reflection;
import java.util.*;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.Jdbi;
@Slf4j
@SuppressWarnings({"UnstableApiUsage", "rawtypes", "unchecked"})
public class JdbiUnitOfWorkProvider {
private static JdbiUnitOfWorkProvider instance;
private static volatile boolean initialized = false;
private final JdbiHandleManager handleManager;
private JdbiUnitOfWorkProvider(JdbiHandleManager handleManager) {
@ -16,19 +17,35 @@ public class JdbiUnitOfWorkProvider {
}
public static JdbiUnitOfWorkProvider withDefault(Jdbi dbi) {
JdbiHandleManager handleManager = new RequestScopedJdbiHandleManager(dbi);
return new JdbiUnitOfWorkProvider(handleManager);
return initialize(new RequestScopedJdbiHandleManager(dbi));
}
public static JdbiUnitOfWorkProvider withLinked(Jdbi dbi) {
JdbiHandleManager handleManager = new LinkedRequestScopedJdbiHandleManager(dbi);
return new JdbiUnitOfWorkProvider(handleManager);
return initialize(new LinkedRequestScopedJdbiHandleManager(dbi));
}
public static JdbiUnitOfWorkProvider getInstance() {
return instance;
}
private static JdbiUnitOfWorkProvider initialize(JdbiHandleManager handleManager) {
if (!initialized) {
instance = new JdbiUnitOfWorkProvider(handleManager);
initialized = true;
} else {
LOG.info("JdbiUnitOfWorkProvider is already initialized");
}
return instance;
}
public JdbiHandleManager getHandleManager() {
return handleManager;
}
public Handle getHandle() {
return handleManager.get();
}
/**
* getWrappedInstanceForDaoClass generates a proxy instance of the dao class for which the jdbi unit of work aspect
* would be wrapped around with. This method however may be used in case the classpath scanning is disabled. If the
@ -37,15 +54,15 @@ public class JdbiUnitOfWorkProvider {
* @param daoClass the DAO class for which a proxy needs to be created fo
* @return the wrapped instance ready to be passed around
*/
public static Object getWrappedInstanceForDaoClass(JdbiUnitOfWorkProvider provider, Class daoClass) {
public static Object getWrappedInstanceForDaoClass(Class daoClass) {
if (daoClass == null) {
throw new IllegalArgumentException("DAO Class cannot be null");
}
LOG.debug(
"Binding class [{}] with proxy handler [{}] ",
daoClass.getSimpleName(),
provider.getHandleManager().getClass().getSimpleName());
ManagedHandleInvocationHandler handler = new ManagedHandleInvocationHandler<>(provider, daoClass);
JdbiUnitOfWorkProvider.getInstance().getHandleManager().getClass().getSimpleName());
ManagedHandleInvocationHandler handler = new ManagedHandleInvocationHandler<>(daoClass);
Object proxiedInstance = Reflection.newProxy(daoClass, handler);
return daoClass.cast(proxiedInstance);
}

View File

@ -31,6 +31,11 @@ class LinkedRequestScopedJdbiHandleManager implements JdbiHandleManager {
this.dbi = dbi;
}
@Override
public Jdbi getJdbi() {
return dbi;
}
@Override
public Handle get() {
String parent = substringBetween(Thread.currentThread().getName());

View File

@ -13,10 +13,8 @@ import org.openmetadata.service.jdbi3.CollectionDAO;
public class ManagedHandleInvocationHandler<T> implements InvocationHandler {
private static final Object[] NO_ARGS = {};
private final Class<T> underlying;
private final JdbiUnitOfWorkProvider jdbiUnitOfWorkProvider;
public ManagedHandleInvocationHandler(JdbiUnitOfWorkProvider jdbiUnitOfWorkProvider, Class<T> underlying) {
this.jdbiUnitOfWorkProvider = jdbiUnitOfWorkProvider;
public ManagedHandleInvocationHandler(Class<T> underlying) {
this.underlying = underlying;
}
@ -40,7 +38,7 @@ public class ManagedHandleInvocationHandler<T> implements InvocationHandler {
}
private Object handleInvocation(Method method, Object[] args) throws Throwable {
Handle handle = jdbiUnitOfWorkProvider.getHandleManager().get();
Handle handle = JdbiUnitOfWorkProvider.getInstance().getHandle();
LOG.debug(
"{}.{} [{}] Thread Id [{}] with handle id [{}]",
method.getDeclaringClass().getSimpleName(),
@ -50,8 +48,12 @@ public class ManagedHandleInvocationHandler<T> implements InvocationHandler {
handle.hashCode());
if (CollectionDAO.class.isAssignableFrom(underlying) && method.isAnnotationPresent(CreateSqlObject.class)) {
return getWrappedInstanceForDaoClass(jdbiUnitOfWorkProvider, method.getReturnType());
return getWrappedInstanceForDaoClass(method.getReturnType());
} else {
if (!JdbiTransactionManager.getInstance().containsHandle(handle.hashCode())) {
// This is non-transactional request
handle.getConnection().setAutoCommit(true);
}
Object dao = handle.attach(underlying);
try {
return method.invoke(dao, args);

View File

@ -8,11 +8,7 @@ import org.glassfish.jersey.server.monitoring.RequestEventListener;
@Slf4j
class NonHttpGetRequestJdbiUnitOfWorkEventListener implements RequestEventListener {
private final JdbiTransactionAspect transactionAspect;
NonHttpGetRequestJdbiUnitOfWorkEventListener(JdbiHandleManager handleManager) {
this.transactionAspect = new JdbiTransactionAspect(handleManager);
}
NonHttpGetRequestJdbiUnitOfWorkEventListener() {}
@Override
public void onEvent(RequestEvent event) {
@ -23,13 +19,13 @@ class NonHttpGetRequestJdbiUnitOfWorkEventListener implements RequestEventListen
boolean isTransactional = isTransactional(event);
if (isTransactional) {
if (type == RequestEvent.Type.RESOURCE_METHOD_START) {
transactionAspect.begin(false);
JdbiTransactionManager.getInstance().begin(false);
} else if (type == RequestEvent.Type.RESP_FILTERS_START) {
transactionAspect.commit();
JdbiTransactionManager.getInstance().commit();
} else if (type == RequestEvent.Type.ON_EXCEPTION) {
transactionAspect.rollback();
JdbiTransactionManager.getInstance().rollback();
} else if (type == RequestEvent.Type.FINISHED) {
transactionAspect.terminateHandle();
JdbiTransactionManager.getInstance().terminateHandle();
}
}
}

View File

@ -6,7 +6,6 @@ import org.jdbi.v3.core.Jdbi;
@Slf4j
class RequestScopedJdbiHandleManager implements JdbiHandleManager {
private final Jdbi dbi;
@SuppressWarnings("ThreadLocalUsage")
@ -16,6 +15,11 @@ class RequestScopedJdbiHandleManager implements JdbiHandleManager {
this.dbi = dbi;
}
@Override
public Jdbi getJdbi() {
return dbi;
}
@Override
public Handle get() {
if (threadLocal.get() == null) {

View File

@ -77,7 +77,7 @@ public class SearchResource {
public void initialize(OpenMetadataApplicationConfig config) {
if (config.getElasticSearchConfiguration() != null) {
searchClient = IndexUtil.getSearchClient(config.getElasticSearchConfiguration(), dao);
ReIndexingHandler.initialize(searchClient, dao);
ReIndexingHandler.initialize(searchClient);
}
}

View File

@ -31,7 +31,6 @@ import java.util.concurrent.Executors;
import javax.ws.rs.container.ContainerResponseContext;
import javax.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.core.Handle;
import org.openmetadata.schema.entity.feed.Thread;
import org.openmetadata.schema.entity.teams.Team;
import org.openmetadata.schema.entity.teams.User;
@ -49,29 +48,23 @@ import org.openmetadata.service.socket.WebSocketManager;
@Slf4j
public class NotificationHandler {
private final ObjectMapper mapper;
private final JdbiUnitOfWorkProvider jdbiUnitOfWorkProvider;
private final ExecutorService threadScheduler;
public NotificationHandler(JdbiUnitOfWorkProvider jdbiUnitOfWorkProvider) {
public NotificationHandler() {
this.mapper = new ObjectMapper();
this.threadScheduler = Executors.newFixedThreadPool(1);
this.jdbiUnitOfWorkProvider = jdbiUnitOfWorkProvider;
}
public void processNotifications(ContainerResponseContext responseContext) {
threadScheduler.submit(
() -> {
try {
Handle handle = jdbiUnitOfWorkProvider.getHandleManager().get();
handle.getConnection().setAutoCommit(true);
CollectionDAO collectionDAO =
(CollectionDAO) getWrappedInstanceForDaoClass(jdbiUnitOfWorkProvider, CollectionDAO.class);
CollectionDAO collectionDAO = (CollectionDAO) getWrappedInstanceForDaoClass(CollectionDAO.class);
handleNotifications(responseContext, collectionDAO);
} catch (Exception ex) {
LOG.error("[NotificationHandler] Failed to use mapper in converting to Json", ex);
} finally {
jdbiUnitOfWorkProvider.getHandleManager().clear();
JdbiUnitOfWorkProvider.getInstance().getHandleManager().clear();
}
});
}

View File

@ -13,6 +13,8 @@
package org.openmetadata.service.util;
import static org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider.getWrappedInstanceForDaoClass;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
@ -63,10 +65,10 @@ public class ReIndexingHandler {
return instance;
}
public static void initialize(SearchClient client, CollectionDAO daoObject) {
public static void initialize(SearchClient client) {
if (!initialized) {
searchClient = client;
dao = daoObject;
dao = (CollectionDAO) getWrappedInstanceForDaoClass(CollectionDAO.class);
taskQueue = new ArrayBlockingQueue<>(5);
threadScheduler = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, taskQueue);
instance = new ReIndexingHandler();
@ -117,7 +119,7 @@ public class ReIndexingHandler {
"eventPublisherJob",
JsonUtils.pojoToJson(jobData));
// Create Job
SearchIndexWorkflow job = new SearchIndexWorkflow(dao, searchClient, jobData);
SearchIndexWorkflow job = new SearchIndexWorkflow(searchClient, jobData);
threadScheduler.submit(job);
REINDEXING_JOB_MAP.put(jobData.getId(), job);
return jobData;

View File

@ -13,6 +13,7 @@
package org.openmetadata.service.workflows.searchIndex;
import static org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider.getWrappedInstanceForDaoClass;
import static org.openmetadata.service.util.ReIndexingHandler.REINDEXING_JOB_EXTENSION;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getTotalRequestToProcess;
@ -74,8 +75,8 @@ public class SearchIndexWorkflow implements Runnable {
private final CollectionDAO dao;
private volatile boolean stopped = false;
public SearchIndexWorkflow(CollectionDAO dao, SearchClient client, EventPublisherJob request) {
this.dao = dao;
public SearchIndexWorkflow(SearchClient client, EventPublisherJob request) {
this.dao = (CollectionDAO) getWrappedInstanceForDaoClass(CollectionDAO.class);
this.jobData = request;
request
.getEntities()