diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java b/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java index f1dabfbdf1a..346a54abddf 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java @@ -87,6 +87,7 @@ import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.jdbi3.EntityRepository; import org.openmetadata.service.jdbi3.locator.ConnectionAwareAnnotationSqlLocator; import org.openmetadata.service.jdbi3.locator.ConnectionType; +import org.openmetadata.service.jdbi3.unitofwork.JdbiTransactionManager; import org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkApplicationEventListener; import org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider; import org.openmetadata.service.migration.Migration; @@ -142,9 +143,9 @@ public class OpenMetadataApplication extends Application())); + CollectionDAO daoObject = (CollectionDAO) getWrappedInstanceForDaoClass(CollectionDAO.class); + JdbiTransactionManager.initialize(jdbiUnitOfWorkProvider.getHandleManager()); + environment.jersey().register(new JdbiUnitOfWorkApplicationEventListener(new HashSet<>())); // Configure the Fernet instance Fernet.getInstance().setFernetKey(catalogConfig); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/events/AuditEventHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/events/AuditEventHandler.java index d25c81ac52e..b6b52e9edb0 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/events/AuditEventHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/events/AuditEventHandler.java @@ -20,7 +20,6 @@ import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.type.AuditLog; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.service.OpenMetadataApplicationConfig; -import org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider; import org.slf4j.Marker; import org.slf4j.MarkerFactory; @@ -28,7 +27,7 @@ import org.slf4j.MarkerFactory; public class AuditEventHandler implements EventHandler { private final Marker auditMarker = MarkerFactory.getMarker("AUDIT"); - public void init(OpenMetadataApplicationConfig config, JdbiUnitOfWorkProvider provider) { + public void init(OpenMetadataApplicationConfig config) { // Nothing to do } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/events/ChangeEventHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/events/ChangeEventHandler.java index 0cf9be79e36..4e709eeee1d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/events/ChangeEventHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/events/ChangeEventHandler.java @@ -25,7 +25,6 @@ import javax.ws.rs.container.ContainerResponseContext; import javax.ws.rs.core.SecurityContext; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.jdbi.v3.core.Handle; import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.entity.feed.Thread; import org.openmetadata.schema.type.ChangeEvent; @@ -35,7 +34,6 @@ import org.openmetadata.service.OpenMetadataApplicationConfig; import org.openmetadata.service.events.subscription.AlertUtil; import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.jdbi3.FeedRepository; -import org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider; import org.openmetadata.service.socket.WebSocketManager; import org.openmetadata.service.util.FeedUtils; import org.openmetadata.service.util.JsonUtils; @@ -46,12 +44,9 @@ public class ChangeEventHandler implements EventHandler { private ObjectMapper mapper; private NotificationHandler notificationHandler; - private JdbiUnitOfWorkProvider jdbiUnitOfWorkProvider; - - public void init(OpenMetadataApplicationConfig config, JdbiUnitOfWorkProvider jdbiUnitOfWorkProvider) { + public void init(OpenMetadataApplicationConfig config) { this.mapper = new ObjectMapper(); - this.notificationHandler = new NotificationHandler(jdbiUnitOfWorkProvider); - this.jdbiUnitOfWorkProvider = jdbiUnitOfWorkProvider; + this.notificationHandler = new NotificationHandler(); } @SneakyThrows @@ -60,10 +55,7 @@ public class ChangeEventHandler implements EventHandler { SecurityContext securityContext = requestContext.getSecurityContext(); String loggedInUserName = securityContext.getUserPrincipal().getName(); try { - Handle handle = jdbiUnitOfWorkProvider.getHandleManager().get(); - handle.getConnection().setAutoCommit(true); - CollectionDAO collectionDAO = - (CollectionDAO) getWrappedInstanceForDaoClass(jdbiUnitOfWorkProvider, CollectionDAO.class); + CollectionDAO collectionDAO = (CollectionDAO) getWrappedInstanceForDaoClass(CollectionDAO.class); CollectionDAO.ChangeEventDAO changeEventDAO = collectionDAO.changeEventDAO(); FeedRepository feedRepository = new FeedRepository(collectionDAO); if (responseContext.getEntity() != null && responseContext.getEntity().getClass().equals(Thread.class)) { @@ -110,8 +102,6 @@ public class ChangeEventHandler implements EventHandler { } } catch (Exception e) { LOG.error("Failed to capture the change event for method {} due to ", method, e); - } finally { - jdbiUnitOfWorkProvider.getHandleManager().clear(); } return null; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/events/EventFilter.java b/openmetadata-service/src/main/java/org/openmetadata/service/events/EventFilter.java index 6b4ed1f7a15..95dfbd28ffc 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/events/EventFilter.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/events/EventFilter.java @@ -52,7 +52,7 @@ public class EventFilter implements ContainerResponseFilter { @SuppressWarnings("unchecked") EventHandler eventHandler = ((Class) Class.forName(eventHandlerClassName)).getConstructor().newInstance(); - eventHandler.init(config, provider); + eventHandler.init(config); eventHandlers.add(eventHandler); LOG.info("Added event handler {}", eventHandlerClassName); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/events/EventHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/events/EventHandler.java index 5bafcaf2118..87be66204c3 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/events/EventHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/events/EventHandler.java @@ -16,10 +16,9 @@ package org.openmetadata.service.events; import javax.ws.rs.container.ContainerRequestContext; import javax.ws.rs.container.ContainerResponseContext; import org.openmetadata.service.OpenMetadataApplicationConfig; -import org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider; public interface EventHandler { - void init(OpenMetadataApplicationConfig config, JdbiUnitOfWorkProvider jdbi); + void init(OpenMetadataApplicationConfig config); Void process(ContainerRequestContext requestContext, ContainerResponseContext responseContext); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/events/WebAnalyticEventHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/events/WebAnalyticEventHandler.java index 856141df519..6b9ac9c86ec 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/events/WebAnalyticEventHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/events/WebAnalyticEventHandler.java @@ -7,7 +7,6 @@ import javax.ws.rs.container.ContainerResponseContext; import javax.ws.rs.core.UriInfo; import lombok.extern.slf4j.Slf4j; import org.openmetadata.service.OpenMetadataApplicationConfig; -import org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider; import org.openmetadata.service.util.MicrometerBundleSingleton; @Slf4j @@ -17,7 +16,7 @@ public class WebAnalyticEventHandler implements EventHandler { public static final String WEB_ANALYTIC_ENDPOINT = "v1/analytics/web/events/collect"; private static final String COUNTER_NAME = "web.analytics.events"; - public void init(OpenMetadataApplicationConfig config, JdbiUnitOfWorkProvider provider) { + public void init(OpenMetadataApplicationConfig config) { this.prometheusMeterRegistry = MicrometerBundleSingleton.prometheusMeterRegistry; this.clusterName = config.getClusterName(); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/HttpGetRequestJdbiUnitOfWorkEventListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/HttpGetRequestJdbiUnitOfWorkEventListener.java index afd185683c6..5be16b86b4f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/HttpGetRequestJdbiUnitOfWorkEventListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/HttpGetRequestJdbiUnitOfWorkEventListener.java @@ -7,18 +7,14 @@ import org.glassfish.jersey.server.monitoring.RequestEventListener; @Slf4j class HttpGetRequestJdbiUnitOfWorkEventListener implements RequestEventListener { - private final JdbiTransactionAspect transactionAspect; - - HttpGetRequestJdbiUnitOfWorkEventListener(JdbiHandleManager handleManager) { - this.transactionAspect = new JdbiTransactionAspect(handleManager); - } + HttpGetRequestJdbiUnitOfWorkEventListener() {} @Override public void onEvent(RequestEvent event) { RequestEvent.Type type = event.getType(); LOG.debug("Handling GET Request Event {} {}", type, Thread.currentThread().getId()); if (type == RequestEvent.Type.FINISHED) { - transactionAspect.terminateHandle(); + JdbiTransactionManager.getInstance().terminateHandle(); } } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/JdbiHandleManager.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/JdbiHandleManager.java index 46ac9f5aee9..d4db5bd8317 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/JdbiHandleManager.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/JdbiHandleManager.java @@ -2,8 +2,11 @@ package org.openmetadata.service.jdbi3.unitofwork; import java.util.concurrent.ThreadFactory; import org.jdbi.v3.core.Handle; +import org.jdbi.v3.core.Jdbi; public interface JdbiHandleManager { + Jdbi getJdbi(); + Handle get(); void clear(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/JdbiTransactionAspect.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/JdbiTransactionManager.java similarity index 77% rename from openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/JdbiTransactionAspect.java rename to openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/JdbiTransactionManager.java index 6cbcccc648c..621adf0350e 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/JdbiTransactionAspect.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/JdbiTransactionManager.java @@ -8,14 +8,29 @@ import org.jdbi.v3.core.Handle; import org.jdbi.v3.core.Handles; @Slf4j -public class JdbiTransactionAspect { +public class JdbiTransactionManager { + private static JdbiTransactionManager instance; + private static volatile boolean initialized = false; private final JdbiHandleManager handleManager; private final Set IN_TRANSACTION_HANDLES = Collections.newSetFromMap(new ConcurrentHashMap<>()); - public JdbiTransactionAspect(JdbiHandleManager handleManager) { + private JdbiTransactionManager(JdbiHandleManager handleManager) { this.handleManager = handleManager; } + public static void initialize(JdbiHandleManager handleManager) { + if (!initialized) { + instance = new JdbiTransactionManager(handleManager); + initialized = true; + } else { + LOG.info("Jdbi Transaction Manager is already initialized"); + } + } + + public static JdbiTransactionManager getInstance() { + return instance; + } + public void begin(boolean autoCommit) { try { Handle handle = handleManager.get(); @@ -79,7 +94,13 @@ public class JdbiTransactionAspect { } public void terminateHandle() { + if (IN_TRANSACTION_HANDLES.contains(handleManager.get().hashCode())) { + handleManager.clear(); + } IN_TRANSACTION_HANDLES.remove(handleManager.get().hashCode()); - handleManager.clear(); + } + + public boolean containsHandle(int hashCode) { + return IN_TRANSACTION_HANDLES.contains(hashCode); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/JdbiUnitOfWorkApplicationEventListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/JdbiUnitOfWorkApplicationEventListener.java index b6066bf831d..209f86a3f80 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/JdbiUnitOfWorkApplicationEventListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/JdbiUnitOfWorkApplicationEventListener.java @@ -11,12 +11,9 @@ import org.glassfish.jersey.server.monitoring.RequestEventListener; @Slf4j public class JdbiUnitOfWorkApplicationEventListener implements ApplicationEventListener { - - private final JdbiUnitOfWorkProvider unitOfWorkProvider; private final Set excludedPaths; - public JdbiUnitOfWorkApplicationEventListener(JdbiUnitOfWorkProvider unitOfWorkProvider, Set excludedPaths) { - this.unitOfWorkProvider = unitOfWorkProvider; + public JdbiUnitOfWorkApplicationEventListener(Set excludedPaths) { this.excludedPaths = excludedPaths; } @@ -33,8 +30,8 @@ public class JdbiUnitOfWorkApplicationEventListener implements ApplicationEventL return null; } if (event.getContainerRequest().getMethod().equals(HttpMethod.GET)) { - return new HttpGetRequestJdbiUnitOfWorkEventListener(unitOfWorkProvider.getHandleManager()); + return new HttpGetRequestJdbiUnitOfWorkEventListener(); } - return new NonHttpGetRequestJdbiUnitOfWorkEventListener(unitOfWorkProvider.getHandleManager()); + return new NonHttpGetRequestJdbiUnitOfWorkEventListener(); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/JdbiUnitOfWorkProvider.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/JdbiUnitOfWorkProvider.java index 5268422c175..b30db8fe221 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/JdbiUnitOfWorkProvider.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/JdbiUnitOfWorkProvider.java @@ -1,14 +1,15 @@ package org.openmetadata.service.jdbi3.unitofwork; import com.google.common.reflect.Reflection; -import java.util.*; import lombok.extern.slf4j.Slf4j; +import org.jdbi.v3.core.Handle; import org.jdbi.v3.core.Jdbi; @Slf4j @SuppressWarnings({"UnstableApiUsage", "rawtypes", "unchecked"}) public class JdbiUnitOfWorkProvider { - + private static JdbiUnitOfWorkProvider instance; + private static volatile boolean initialized = false; private final JdbiHandleManager handleManager; private JdbiUnitOfWorkProvider(JdbiHandleManager handleManager) { @@ -16,19 +17,35 @@ public class JdbiUnitOfWorkProvider { } public static JdbiUnitOfWorkProvider withDefault(Jdbi dbi) { - JdbiHandleManager handleManager = new RequestScopedJdbiHandleManager(dbi); - return new JdbiUnitOfWorkProvider(handleManager); + return initialize(new RequestScopedJdbiHandleManager(dbi)); } public static JdbiUnitOfWorkProvider withLinked(Jdbi dbi) { - JdbiHandleManager handleManager = new LinkedRequestScopedJdbiHandleManager(dbi); - return new JdbiUnitOfWorkProvider(handleManager); + return initialize(new LinkedRequestScopedJdbiHandleManager(dbi)); + } + + public static JdbiUnitOfWorkProvider getInstance() { + return instance; + } + + private static JdbiUnitOfWorkProvider initialize(JdbiHandleManager handleManager) { + if (!initialized) { + instance = new JdbiUnitOfWorkProvider(handleManager); + initialized = true; + } else { + LOG.info("JdbiUnitOfWorkProvider is already initialized"); + } + return instance; } public JdbiHandleManager getHandleManager() { return handleManager; } + public Handle getHandle() { + return handleManager.get(); + } + /** * getWrappedInstanceForDaoClass generates a proxy instance of the dao class for which the jdbi unit of work aspect * would be wrapped around with. This method however may be used in case the classpath scanning is disabled. If the @@ -37,15 +54,15 @@ public class JdbiUnitOfWorkProvider { * @param daoClass the DAO class for which a proxy needs to be created fo * @return the wrapped instance ready to be passed around */ - public static Object getWrappedInstanceForDaoClass(JdbiUnitOfWorkProvider provider, Class daoClass) { + public static Object getWrappedInstanceForDaoClass(Class daoClass) { if (daoClass == null) { throw new IllegalArgumentException("DAO Class cannot be null"); } LOG.debug( "Binding class [{}] with proxy handler [{}] ", daoClass.getSimpleName(), - provider.getHandleManager().getClass().getSimpleName()); - ManagedHandleInvocationHandler handler = new ManagedHandleInvocationHandler<>(provider, daoClass); + JdbiUnitOfWorkProvider.getInstance().getHandleManager().getClass().getSimpleName()); + ManagedHandleInvocationHandler handler = new ManagedHandleInvocationHandler<>(daoClass); Object proxiedInstance = Reflection.newProxy(daoClass, handler); return daoClass.cast(proxiedInstance); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/LinkedRequestScopedJdbiHandleManager.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/LinkedRequestScopedJdbiHandleManager.java index 7a86198ea12..b5527351707 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/LinkedRequestScopedJdbiHandleManager.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/LinkedRequestScopedJdbiHandleManager.java @@ -31,6 +31,11 @@ class LinkedRequestScopedJdbiHandleManager implements JdbiHandleManager { this.dbi = dbi; } + @Override + public Jdbi getJdbi() { + return dbi; + } + @Override public Handle get() { String parent = substringBetween(Thread.currentThread().getName()); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/ManagedHandleInvocationHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/ManagedHandleInvocationHandler.java index dc24310e1a5..7bc80bb0dec 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/ManagedHandleInvocationHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/ManagedHandleInvocationHandler.java @@ -13,10 +13,8 @@ import org.openmetadata.service.jdbi3.CollectionDAO; public class ManagedHandleInvocationHandler implements InvocationHandler { private static final Object[] NO_ARGS = {}; private final Class underlying; - private final JdbiUnitOfWorkProvider jdbiUnitOfWorkProvider; - public ManagedHandleInvocationHandler(JdbiUnitOfWorkProvider jdbiUnitOfWorkProvider, Class underlying) { - this.jdbiUnitOfWorkProvider = jdbiUnitOfWorkProvider; + public ManagedHandleInvocationHandler(Class underlying) { this.underlying = underlying; } @@ -40,7 +38,7 @@ public class ManagedHandleInvocationHandler implements InvocationHandler { } private Object handleInvocation(Method method, Object[] args) throws Throwable { - Handle handle = jdbiUnitOfWorkProvider.getHandleManager().get(); + Handle handle = JdbiUnitOfWorkProvider.getInstance().getHandle(); LOG.debug( "{}.{} [{}] Thread Id [{}] with handle id [{}]", method.getDeclaringClass().getSimpleName(), @@ -50,8 +48,12 @@ public class ManagedHandleInvocationHandler implements InvocationHandler { handle.hashCode()); if (CollectionDAO.class.isAssignableFrom(underlying) && method.isAnnotationPresent(CreateSqlObject.class)) { - return getWrappedInstanceForDaoClass(jdbiUnitOfWorkProvider, method.getReturnType()); + return getWrappedInstanceForDaoClass(method.getReturnType()); } else { + if (!JdbiTransactionManager.getInstance().containsHandle(handle.hashCode())) { + // This is non-transactional request + handle.getConnection().setAutoCommit(true); + } Object dao = handle.attach(underlying); try { return method.invoke(dao, args); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/NonHttpGetRequestJdbiUnitOfWorkEventListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/NonHttpGetRequestJdbiUnitOfWorkEventListener.java index 6ac779c9cfe..d5804d63e0f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/NonHttpGetRequestJdbiUnitOfWorkEventListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/NonHttpGetRequestJdbiUnitOfWorkEventListener.java @@ -8,11 +8,7 @@ import org.glassfish.jersey.server.monitoring.RequestEventListener; @Slf4j class NonHttpGetRequestJdbiUnitOfWorkEventListener implements RequestEventListener { - private final JdbiTransactionAspect transactionAspect; - - NonHttpGetRequestJdbiUnitOfWorkEventListener(JdbiHandleManager handleManager) { - this.transactionAspect = new JdbiTransactionAspect(handleManager); - } + NonHttpGetRequestJdbiUnitOfWorkEventListener() {} @Override public void onEvent(RequestEvent event) { @@ -23,13 +19,13 @@ class NonHttpGetRequestJdbiUnitOfWorkEventListener implements RequestEventListen boolean isTransactional = isTransactional(event); if (isTransactional) { if (type == RequestEvent.Type.RESOURCE_METHOD_START) { - transactionAspect.begin(false); + JdbiTransactionManager.getInstance().begin(false); } else if (type == RequestEvent.Type.RESP_FILTERS_START) { - transactionAspect.commit(); + JdbiTransactionManager.getInstance().commit(); } else if (type == RequestEvent.Type.ON_EXCEPTION) { - transactionAspect.rollback(); + JdbiTransactionManager.getInstance().rollback(); } else if (type == RequestEvent.Type.FINISHED) { - transactionAspect.terminateHandle(); + JdbiTransactionManager.getInstance().terminateHandle(); } } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/RequestScopedJdbiHandleManager.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/RequestScopedJdbiHandleManager.java index 27ffb1d907b..bf2ce23d24f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/RequestScopedJdbiHandleManager.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/RequestScopedJdbiHandleManager.java @@ -6,7 +6,6 @@ import org.jdbi.v3.core.Jdbi; @Slf4j class RequestScopedJdbiHandleManager implements JdbiHandleManager { - private final Jdbi dbi; @SuppressWarnings("ThreadLocalUsage") @@ -16,6 +15,11 @@ class RequestScopedJdbiHandleManager implements JdbiHandleManager { this.dbi = dbi; } + @Override + public Jdbi getJdbi() { + return dbi; + } + @Override public Handle get() { if (threadLocal.get() == null) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/search/SearchResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/search/SearchResource.java index 2ec0aa7886c..a687d81c8e9 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/search/SearchResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/search/SearchResource.java @@ -77,7 +77,7 @@ public class SearchResource { public void initialize(OpenMetadataApplicationConfig config) { if (config.getElasticSearchConfiguration() != null) { searchClient = IndexUtil.getSearchClient(config.getElasticSearchConfiguration(), dao); - ReIndexingHandler.initialize(searchClient, dao); + ReIndexingHandler.initialize(searchClient); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/NotificationHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/NotificationHandler.java index b5a979e4976..9060b48cac1 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/util/NotificationHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/NotificationHandler.java @@ -31,7 +31,6 @@ import java.util.concurrent.Executors; import javax.ws.rs.container.ContainerResponseContext; import javax.ws.rs.core.Response; import lombok.extern.slf4j.Slf4j; -import org.jdbi.v3.core.Handle; import org.openmetadata.schema.entity.feed.Thread; import org.openmetadata.schema.entity.teams.Team; import org.openmetadata.schema.entity.teams.User; @@ -49,29 +48,23 @@ import org.openmetadata.service.socket.WebSocketManager; @Slf4j public class NotificationHandler { private final ObjectMapper mapper; - - private final JdbiUnitOfWorkProvider jdbiUnitOfWorkProvider; private final ExecutorService threadScheduler; - public NotificationHandler(JdbiUnitOfWorkProvider jdbiUnitOfWorkProvider) { + public NotificationHandler() { this.mapper = new ObjectMapper(); this.threadScheduler = Executors.newFixedThreadPool(1); - this.jdbiUnitOfWorkProvider = jdbiUnitOfWorkProvider; } public void processNotifications(ContainerResponseContext responseContext) { threadScheduler.submit( () -> { try { - Handle handle = jdbiUnitOfWorkProvider.getHandleManager().get(); - handle.getConnection().setAutoCommit(true); - CollectionDAO collectionDAO = - (CollectionDAO) getWrappedInstanceForDaoClass(jdbiUnitOfWorkProvider, CollectionDAO.class); + CollectionDAO collectionDAO = (CollectionDAO) getWrappedInstanceForDaoClass(CollectionDAO.class); handleNotifications(responseContext, collectionDAO); } catch (Exception ex) { LOG.error("[NotificationHandler] Failed to use mapper in converting to Json", ex); } finally { - jdbiUnitOfWorkProvider.getHandleManager().clear(); + JdbiUnitOfWorkProvider.getInstance().getHandleManager().clear(); } }); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/ReIndexingHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/ReIndexingHandler.java index 56711b5e7c0..fbb4a45792b 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/util/ReIndexingHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/ReIndexingHandler.java @@ -13,6 +13,8 @@ package org.openmetadata.service.util; +import static org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider.getWrappedInstanceForDaoClass; + import java.time.LocalDateTime; import java.time.ZoneId; import java.util.ArrayList; @@ -63,10 +65,10 @@ public class ReIndexingHandler { return instance; } - public static void initialize(SearchClient client, CollectionDAO daoObject) { + public static void initialize(SearchClient client) { if (!initialized) { searchClient = client; - dao = daoObject; + dao = (CollectionDAO) getWrappedInstanceForDaoClass(CollectionDAO.class); taskQueue = new ArrayBlockingQueue<>(5); threadScheduler = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, taskQueue); instance = new ReIndexingHandler(); @@ -117,7 +119,7 @@ public class ReIndexingHandler { "eventPublisherJob", JsonUtils.pojoToJson(jobData)); // Create Job - SearchIndexWorkflow job = new SearchIndexWorkflow(dao, searchClient, jobData); + SearchIndexWorkflow job = new SearchIndexWorkflow(searchClient, jobData); threadScheduler.submit(job); REINDEXING_JOB_MAP.put(jobData.getId(), job); return jobData; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/SearchIndexWorkflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/SearchIndexWorkflow.java index 7d9fa708a8a..502fe7364c7 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/SearchIndexWorkflow.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/SearchIndexWorkflow.java @@ -13,6 +13,7 @@ package org.openmetadata.service.workflows.searchIndex; +import static org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider.getWrappedInstanceForDaoClass; import static org.openmetadata.service.util.ReIndexingHandler.REINDEXING_JOB_EXTENSION; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getTotalRequestToProcess; @@ -74,8 +75,8 @@ public class SearchIndexWorkflow implements Runnable { private final CollectionDAO dao; private volatile boolean stopped = false; - public SearchIndexWorkflow(CollectionDAO dao, SearchClient client, EventPublisherJob request) { - this.dao = dao; + public SearchIndexWorkflow(SearchClient client, EventPublisherJob request) { + this.dao = (CollectionDAO) getWrappedInstanceForDaoClass(CollectionDAO.class); this.jobData = request; request .getEntities()