Remove jdbi unit of work (#13550)

* Remove JDBI Unit of Work transactions

* Remove JDBI Unit of Work transactions

* Remove JDBI Unit of Work transactions
This commit is contained in:
Sriharsha Chintalapani 2023-10-12 16:07:50 -07:00 committed by GitHub
parent e66b9bbeea
commit e79ce5fe07
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
55 changed files with 153 additions and 538 deletions

View File

@ -35,6 +35,7 @@ import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVFormat.Builder;
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.csv.CSVRecord;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.type.EntityReference;
@ -335,6 +336,7 @@ public abstract class EntityCsv<T extends EntityInterface> {
}
}
@Transaction
private void createEntity(CSVPrinter resultsPrinter, CSVRecord csvRecord, T entity) throws IOException {
entity.setId(UUID.randomUUID());
entity.setUpdatedBy(importedBy);

View File

@ -13,7 +13,6 @@
package org.openmetadata.service;
import static org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider.getWrappedInstanceForDaoClass;
import static org.openmetadata.service.util.MicrometerBundleSingleton.webAnalyticEvents;
import io.dropwizard.Application;
@ -41,7 +40,6 @@ import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.time.temporal.ChronoUnit;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Optional;
import javax.naming.ConfigurationException;
import javax.servlet.DispatcherType;
@ -84,9 +82,6 @@ import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.locator.ConnectionAwareAnnotationSqlLocator;
import org.openmetadata.service.jdbi3.locator.ConnectionType;
import org.openmetadata.service.jdbi3.unitofwork.JdbiTransactionManager;
import org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkApplicationEventListener;
import org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider;
import org.openmetadata.service.migration.Migration;
import org.openmetadata.service.migration.api.MigrationWorkflow;
import org.openmetadata.service.monitoring.EventMonitor;
@ -139,11 +134,8 @@ public class OpenMetadataApplication extends Application<OpenMetadataApplication
ChangeEventConfig.initialize(catalogConfig);
final Jdbi jdbi = createAndSetupJDBI(environment, catalogConfig.getDataSourceFactory());
JdbiUnitOfWorkProvider jdbiUnitOfWorkProvider = JdbiUnitOfWorkProvider.withDefault(jdbi);
JdbiTransactionManager.initialize(jdbiUnitOfWorkProvider.getHandleManager());
CollectionDAO collectionDAO = (CollectionDAO) getWrappedInstanceForDaoClass(CollectionDAO.class);
CollectionDAO collectionDAO = jdbi.onDemand(CollectionDAO.class);
Entity.setCollectionDAO(collectionDAO);
environment.jersey().register(new JdbiUnitOfWorkApplicationEventListener(new HashSet<>()));
// initialize Search Repository, all repositories use SearchRepository this line should always before initializing
// repository

View File

@ -16,7 +16,6 @@ package org.openmetadata.service.events;
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
import static org.openmetadata.service.events.subscription.AlertsRuleEvaluator.getEntity;
import static org.openmetadata.service.formatter.util.FormatterUtil.getChangeEventFromResponseContext;
import static org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider.getWrappedInstanceForDaoClass;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
@ -56,7 +55,7 @@ public class ChangeEventHandler implements EventHandler {
SecurityContext securityContext = requestContext.getSecurityContext();
String loggedInUserName = securityContext.getUserPrincipal().getName();
try {
CollectionDAO collectionDAO = (CollectionDAO) getWrappedInstanceForDaoClass(CollectionDAO.class);
CollectionDAO collectionDAO = Entity.getCollectionDAO();
CollectionDAO.ChangeEventDAO changeEventDAO = collectionDAO.changeEventDAO();
FeedRepository feedRepository = new FeedRepository();
if (responseContext.getEntity() != null && responseContext.getEntity().getClass().equals(Thread.class)) {

View File

@ -14,6 +14,7 @@
package org.openmetadata.service.jdbi3;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.entity.Bot;
import org.openmetadata.schema.entity.teams.User;
import org.openmetadata.schema.type.EntityReference;
@ -84,6 +85,7 @@ public class BotRepository extends EntityRepository<Bot> {
super(original, updated, operation);
}
@Transaction
@Override
public void entitySpecificUpdate() {
updateUser(original, updated);

View File

@ -17,6 +17,7 @@ import static org.openmetadata.schema.type.Include.ALL;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.entity.data.Chart;
import org.openmetadata.schema.entity.services.DashboardService;
@ -104,6 +105,7 @@ public class ChartRepository extends EntityRepository<Chart> {
super(chart, updated, operation);
}
@Transaction
@Override
public void entitySpecificUpdate() {
recordChange("chartType", original.getChartType(), updated.getChartType());

View File

@ -23,6 +23,7 @@ import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.entity.classification.Classification;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.ProviderType;
@ -106,6 +107,7 @@ public class ClassificationRepository extends EntityRepository<Classification> {
super(original, updated, operation);
}
@Transaction
@Override
public void entitySpecificUpdate() {
// TODO handle name change

View File

@ -12,6 +12,7 @@ import static org.openmetadata.service.Entity.STORAGE_SERVICE;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.api.feed.ResolveTask;
import org.openmetadata.schema.entity.data.Container;
@ -292,6 +293,7 @@ public class ContainerRepository extends EntityRepository<Container> {
super(original, updated, operation);
}
@Transaction
@Override
public void entitySpecificUpdate() {
updateDataModel(original, updated);

View File

@ -21,6 +21,7 @@ import static org.openmetadata.service.Entity.FIELD_TAGS;
import java.util.List;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.api.feed.ResolveTask;
import org.openmetadata.schema.entity.data.DashboardDataModel;
@ -232,6 +233,7 @@ public class DashboardDataModelRepository extends EntityRepository<DashboardData
super(original, updated, operation);
}
@Transaction
@Override
public void entitySpecificUpdate() {
DatabaseUtil.validateColumns(original.getColumns());

View File

@ -21,6 +21,7 @@ import static org.openmetadata.service.Entity.FIELD_DESCRIPTION;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.entity.data.Chart;
import org.openmetadata.schema.entity.data.Dashboard;
@ -205,6 +206,7 @@ public class DashboardRepository extends EntityRepository<Dashboard> {
super(original, updated, operation);
}
@Transaction
@Override
public void entitySpecificUpdate() {
update(Entity.CHART, "charts", listOrEmpty(updated.getCharts()), listOrEmpty(original.getCharts()));

View File

@ -21,6 +21,7 @@ import static org.openmetadata.service.util.EntityUtil.entityReferenceMatch;
import java.util.ArrayList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.entity.domains.DataProduct;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.Relationship;
@ -95,6 +96,7 @@ public class DataProductRepository extends EntityRepository<DataProduct> {
super(original, updated, operation);
}
@Transaction
@Override
public void entitySpecificUpdate() {
updateAssets();

View File

@ -18,6 +18,7 @@ import static org.openmetadata.service.Entity.DATABASE_SERVICE;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.entity.data.Database;
import org.openmetadata.schema.entity.services.DatabaseService;
@ -129,6 +130,7 @@ public class DatabaseRepository extends EntityRepository<Database> {
super(original, updated, operation);
}
@Transaction
@Override
public void entitySpecificUpdate() {
recordChange("retentionPeriod", original.getRetentionPeriod(), updated.getRetentionPeriod());

View File

@ -18,6 +18,7 @@ import static org.openmetadata.schema.type.Include.ALL;
import java.util.Collections;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.entity.data.Database;
import org.openmetadata.schema.entity.data.DatabaseSchema;
@ -140,6 +141,7 @@ public class DatabaseSchemaRepository extends EntityRepository<DatabaseSchema> {
super(original, updated, operation);
}
@Transaction
@Override
public void entitySpecificUpdate() {
recordChange("retentionPeriod", original.getRetentionPeriod(), updated.getRetentionPeriod());

View File

@ -16,6 +16,7 @@ package org.openmetadata.service.jdbi3;
import static org.openmetadata.service.Entity.DOCUMENT;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.entities.docStore.Document;
import org.openmetadata.service.Entity;
import org.openmetadata.service.resources.docstore.DocStoreResource;
@ -84,6 +85,7 @@ public class DocumentRepository extends EntityRepository<Document> {
super(original, updated, operation);
}
@Transaction
@Override
public void entitySpecificUpdate() {
recordChange("data", original.getData(), updated.getData(), true);

View File

@ -18,6 +18,7 @@ import static org.openmetadata.schema.type.Include.ALL;
import static org.openmetadata.service.Entity.DOMAIN;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.entity.domains.Domain;
import org.openmetadata.schema.type.EntityReference;
@ -121,6 +122,7 @@ public class DomainRepository extends EntityRepository<Domain> {
super(original, updated, operation);
}
@Transaction
@Override
public void entitySpecificUpdate() {
recordChange("domainType", original.getDomainType(), updated.getDomainType());

View File

@ -91,6 +91,7 @@ import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.CreateEntity;
import org.openmetadata.schema.EntityInterface;
@ -415,6 +416,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
}
/** Initialize a given entity if it does not exist. */
@Transaction
public void initializeEntity(T entity) {
String existingJson = dao.findJsonByFqn(entity.getFullyQualifiedName(), ALL);
if (existingJson != null) {
@ -732,6 +734,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
return entity;
}
@Transaction
public final PutResponse<T> createOrUpdate(UriInfo uriInfo, T updated) {
T original = JsonUtils.readValue(dao.findJsonByFqn(updated.getFullyQualifiedName(), ALL), entityClass);
if (original == null) { // If an original entity does not exist then create it, else update
@ -754,6 +757,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
}
}
@Transaction
public PutResponse<T> update(UriInfo uriInfo, T original, T updated) {
// Get all the fields in the original entity that can be updated during PUT operation
setFieldsInternal(original, putFields);
@ -771,6 +775,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
return new PutResponse<>(Status.OK, withHref(uriInfo, updated), change);
}
@Transaction
public final PatchResponse<T> patch(UriInfo uriInfo, UUID id, String user, JsonPatch patch) {
// Get all the fields in the original entity that can be updated during PATCH operation
T original = setFieldsInternal(dao.findEntityById(id), patchFields);
@ -792,6 +797,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
return new PatchResponse<>(Status.OK, withHref(uriInfo, updated), change);
}
@Transaction
public PutResponse<T> addFollower(String updatedBy, UUID entityId, UUID userId) {
// Get entity
T entity = dao.findEntityById(entityId);
@ -827,6 +833,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
return new PutResponse<>(Status.OK, changeEvent, RestUtil.ENTITY_FIELDS_CHANGED);
}
@Transaction
public PutResponse<T> updateVote(String updatedBy, UUID entityId, VoteRequest request) {
T originalEntity = dao.findEntityById(entityId);
@ -871,12 +878,14 @@ public abstract class EntityRepository<T extends EntityInterface> {
return new PutResponse<>(Status.OK, changeEvent, RestUtil.ENTITY_FIELDS_CHANGED);
}
@Transaction
public final DeleteResponse<T> delete(String updatedBy, UUID id, boolean recursive, boolean hardDelete) {
DeleteResponse<T> response = deleteInternal(updatedBy, id, recursive, hardDelete);
postDelete(response.getEntity());
return response;
}
@Transaction
public final DeleteResponse<T> deleteByName(String updatedBy, String name, boolean recursive, boolean hardDelete) {
name = quoteFqn ? quoteName(name) : name;
DeleteResponse<T> response = deleteInternalByName(updatedBy, name, recursive, hardDelete);
@ -907,6 +916,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
}
}
@Transaction
private DeleteResponse<T> delete(String deletedBy, T original, boolean recursive, boolean hardDelete) {
checkSystemEntityDeletion(original);
preDelete(original, deletedBy);
@ -930,6 +940,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
return new DeleteResponse<>(updated, changeType);
}
@Transaction
public final DeleteResponse<T> deleteInternalByName(
String updatedBy, String name, boolean recursive, boolean hardDelete) {
// Validate entity
@ -937,12 +948,14 @@ public abstract class EntityRepository<T extends EntityInterface> {
return delete(updatedBy, entity, recursive, hardDelete);
}
@Transaction
public final DeleteResponse<T> deleteInternal(String updatedBy, UUID id, boolean recursive, boolean hardDelete) {
// Validate entity
T entity = dao.findEntityById(id, ALL);
return delete(updatedBy, entity, recursive, hardDelete);
}
@Transaction
private void deleteChildren(UUID id, boolean recursive, boolean hardDelete, String updatedBy) {
// If an entity being deleted contains other **non-deleted** children entities, it can't be deleted
List<EntityRelationshipRecord> childrenRecords =
@ -970,6 +983,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
}
}
@Transaction
protected void cleanup(T entityInterface) {
UUID id = entityInterface.getId();
@ -1008,6 +1022,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
CACHE_WITH_NAME.invalidate(new ImmutablePair<>(entityType, entity.getFullyQualifiedName()));
}
@Transaction
public PutResponse<T> deleteFollower(String updatedBy, UUID entityId, UUID userId) {
T entity = find(entityId, NON_DELETED);
@ -1045,6 +1060,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
return new ResultList<>(entities, errors, beforeCursor, afterCursor, total);
}
@Transaction
private T createNewEntity(T entity) {
storeEntity(entity, false);
storeExtension(entity);
@ -1054,6 +1070,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
return entity;
}
@Transaction
protected void store(T entity, boolean update) {
// Don't store owner, database, href and tags as JSON. Build it on the fly based on relationships
entity.withHref(null);
@ -1091,10 +1108,12 @@ public abstract class EntityRepository<T extends EntityInterface> {
entity.setExperts(experts);
}
@Transaction
protected void storeTimeSeries(String fqn, String extension, String jsonSchema, String entityJson, Long timestamp) {
daoCollection.entityExtensionTimeSeriesDao().insert(fqn, extension, jsonSchema, entityJson);
}
@Transaction
public String getExtensionAtTimestamp(String fqn, String extension, Long timestamp) {
return daoCollection.entityExtensionTimeSeriesDao().getExtensionAtTimestamp(fqn, extension, timestamp);
}
@ -1116,10 +1135,12 @@ public abstract class EntityRepository<T extends EntityInterface> {
.listBetweenTimestampsByOrder(fqn, extension, startTs, endTs, orderBy);
}
@Transaction
public void deleteExtensionAtTimestamp(String fqn, String extension, Long timestamp) {
daoCollection.entityExtensionTimeSeriesDao().deleteAtTimestamp(fqn, extension, timestamp);
}
@Transaction
public void deleteExtensionBeforeTimestamp(String fqn, String extension, Long timestamp) {
daoCollection.entityExtensionTimeSeriesDao().deleteBeforeTimestamp(fqn, extension, timestamp);
}
@ -1230,6 +1251,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
}
}
@Transaction
/** Apply tags {@code tagLabels} to the entity or field identified by {@code targetFQN} */
public void applyTags(List<TagLabel> tagLabels, String targetFQN) {
for (TagLabel tagLabel : listOrEmpty(tagLabels)) {
@ -1318,6 +1340,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
return RestUtil.getHref(uriInfo, collectionPath, id);
}
@Transaction
public PutResponse<T> restoreEntity(String updatedBy, String entityType, UUID id) {
// If an entity being restored contains other **deleted** children entities, restore them
List<EntityRelationshipRecord> records =
@ -1352,6 +1375,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
addRelationship(fromId, toId, fromEntity, toEntity, relationship, null, bidirectional);
}
@Transaction
public void addRelationship(
UUID fromId,
UUID toId,
@ -1371,6 +1395,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
daoCollection.relationshipDAO().insert(from, to, fromEntity, toEntity, relationship.ordinal(), json);
}
@Transaction
public final void bulkAddToRelationship(
UUID fromId, List<UUID> toId, String fromEntity, String toEntity, Relationship relationship) {
daoCollection
@ -1577,6 +1602,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
EntityUtil.copy(ref, owner);
}
@Transaction
protected void storeOwner(T entity, EntityReference owner) {
if (supportsOwner && owner != null) {
// Add relationship owner --- owns ---> ownedEntity
@ -1590,6 +1616,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
}
}
@Transaction
protected void storeDomain(T entity, EntityReference domain) {
if (supportsDomain && domain != null) {
// Add relationship domain --- has ---> entity
@ -1598,6 +1625,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
}
}
@Transaction
protected void storeDataProducts(T entity, List<EntityReference> dataProducts) {
if (supportsDataProducts && !nullOrEmpty(dataProducts)) {
for (EntityReference dataProduct : dataProducts) {
@ -1609,6 +1637,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
}
}
@Transaction
/** Remove owner relationship for a given entity */
private void removeOwner(T entity, EntityReference owner) {
if (EntityUtil.getId(owner) != null) {
@ -1617,6 +1646,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
}
}
@Transaction
public void updateOwner(T ownedEntity, EntityReference originalOwner, EntityReference newOwner) {
// TODO inefficient use replace instead of delete and add and check for orig and new owners being the same
validateOwner(newOwner);
@ -1779,6 +1809,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
: getEntityByName(Entity.USER, updated.getUpdatedBy(), "", NON_DELETED);
}
@Transaction
/** Compare original and updated entities and perform updates. Update the entity version and track changes. */
public final void update() {
if (operation.isDelete()) { // DELETE Operation

View File

@ -2,6 +2,7 @@ package org.openmetadata.service.jdbi3;
import java.util.UUID;
import lombok.Getter;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.EntityTimeSeriesInterface;
import org.openmetadata.service.Entity;
import org.openmetadata.service.search.SearchRepository;
@ -27,6 +28,7 @@ public abstract class EntityTimeSeriesRepository<T extends EntityTimeSeriesInter
Entity.registerEntity(entityClass, entityType, this);
}
@Transaction
public T createNewRecord(T record, String extension, String recordFQN) {
record.setId(UUID.randomUUID());
timeSeriesDao.insert(recordFQN, extension, entityType, JsonUtils.pojoToJson(record));

View File

@ -22,6 +22,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.entity.events.EventFilterRule;
import org.openmetadata.schema.entity.events.EventSubscription;
import org.openmetadata.schema.entity.events.SubscriptionStatus;
@ -101,6 +102,7 @@ public class EventSubscriptionRepository extends EntityRepository<EventSubscript
return subscriptionPublisherMap.get(id);
}
@Transaction
public void addSubscriptionPublisher(EventSubscription eventSubscription) {
switch (eventSubscription.getAlertType()) {
case CHANGE_EVENT:
@ -133,6 +135,7 @@ public class EventSubscriptionRepository extends EntityRepository<EventSubscript
return new SubscriptionStatus().withStatus(status).withTimestamp(System.currentTimeMillis());
}
@Transaction
@SneakyThrows
public void updateEventSubscription(EventSubscription eventSubscription) {
switch (eventSubscription.getAlertType()) {
@ -165,6 +168,7 @@ public class EventSubscriptionRepository extends EntityRepository<EventSubscript
}
}
@Transaction
public void removeProcessorForEventSubscription(UUID id, SubscriptionStatus reasonForRemoval)
throws InterruptedException {
SubscriptionPublisher publisher = subscriptionPublisherMap.get(id);
@ -177,6 +181,7 @@ public class EventSubscriptionRepository extends EntityRepository<EventSubscript
}
}
@Transaction
public void deleteEventSubscriptionPublisher(EventSubscription deletedEntity)
throws InterruptedException, SchedulerException {
switch (deletedEntity.getAlertType()) {

View File

@ -50,6 +50,7 @@ import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Triple;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.json.JSONObject;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.api.feed.CloseTask;
@ -209,21 +210,25 @@ public class FeedRepository {
return new ThreadContext(thread, event);
}
@Transaction
public Thread create(Thread thread) {
ThreadContext threadContext = getThreadContext(thread);
return createThread(threadContext);
}
@Transaction
public Thread create(Thread thread, ChangeEvent event) {
ThreadContext threadContext = getThreadContext(thread, event);
return createThread(threadContext);
}
@Transaction
public void store(ThreadContext threadContext) {
// Insert a new thread
dao.feedDAO().insert(JsonUtils.pojoToJson(threadContext.getThread()));
}
@Transaction
public void storeRelationships(ThreadContext threadContext) {
Thread thread = threadContext.getThread();
EntityLink about = threadContext.getAbout();
@ -332,6 +337,7 @@ public class FeedRepository {
return tags.stream().map(TagLabel::getTagFQN).collect(Collectors.joining(", "));
}
@Transaction
private void addClosingPost(Thread thread, String user, String closingComment) {
// Add a post to the task
String message;
@ -360,6 +366,7 @@ public class FeedRepository {
addPostToThread(thread.getId(), post, user);
}
@Transaction
public void closeTask(Thread thread, String user, CloseTask closeTask) {
ThreadContext threadContext = getThreadContext(thread);
TaskDetails task = thread.getTask();
@ -397,6 +404,7 @@ public class FeedRepository {
null));
}
@Transaction
public Thread addPostToThread(UUID id, Post post, String userName) {
// Validate the user posting the message
UUID fromUserId = Entity.getEntityReferenceByName(USER, post.getFrom(), NON_DELETED).getId();
@ -428,6 +436,7 @@ public class FeedRepository {
return post.get();
}
@Transaction
public DeleteResponse<Post> deletePost(Thread thread, Post post, String userName) {
List<Post> posts = thread.getPosts();
// Remove the post to be deleted from the posts list
@ -442,12 +451,14 @@ public class FeedRepository {
return new DeleteResponse<>(post, RestUtil.ENTITY_DELETED);
}
@Transaction
public DeleteResponse<Thread> deleteThread(Thread thread, String deletedByUser) {
deleteThreadInternal(thread.getId());
LOG.info("{} deleted thread with id {}", deletedByUser, thread.getId());
return new DeleteResponse<>(thread, RestUtil.ENTITY_DELETED);
}
@Transaction
public void deleteThreadInternal(UUID id) {
// Delete all the relationships to other entities
dao.relationshipDAO().deleteAll(id, Entity.THREAD);
@ -459,6 +470,7 @@ public class FeedRepository {
dao.feedDAO().delete(id);
}
@Transaction
public void deleteByAbout(UUID entityId) {
List<String> threadIds = listOrEmpty(dao.feedDAO().findByEntityId(entityId.toString()));
for (String threadId : threadIds) {
@ -611,6 +623,7 @@ public class FeedRepository {
return new ResultList<>(threads, beforeCursor, afterCursor, total);
}
@Transaction
private void storeReactions(Thread thread, String user) {
// Reactions are captured at the thread level. If the user reacted to a post of a thread,
// it will still be tracked as "user reacted to thread" since this will only be used to filter

View File

@ -38,6 +38,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.csv.CsvUtil;
import org.openmetadata.csv.EntityCsv;
import org.openmetadata.schema.EntityInterface;
@ -270,6 +271,7 @@ public class GlossaryRepository extends EntityRepository<Glossary> {
super(original, updated, operation);
}
@Transaction
@Override
public void entitySpecificUpdate() {
updateReviewers(original, updated);

View File

@ -36,6 +36,7 @@ import java.util.UUID;
import javax.json.JsonPatch;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.api.data.TermReference;
import org.openmetadata.schema.api.feed.CloseTask;
@ -368,6 +369,7 @@ public class GlossaryTermRepository extends EntityRepository<GlossaryTerm> {
super(original, updated, operation);
}
@Transaction
@Override
public void entitySpecificUpdate() {
validateParent();

View File

@ -17,6 +17,7 @@ import java.util.List;
import java.util.UUID;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.json.JSONObject;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.entity.services.ingestionPipelines.AirflowConfig;
@ -87,6 +88,7 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
ingestionPipeline.setService(entityReference);
}
@Transaction
public IngestionPipeline deletePipelineStatus(UUID ingestionPipelineId) {
// Validate the request content
IngestionPipeline ingestionPipeline = dao.findEntityById(ingestionPipelineId);
@ -247,6 +249,7 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
super(buildIngestionPipelineDecrypted(original), updated, operation);
}
@Transaction
@Override
public void entitySpecificUpdate() {
updateSourceConfig();

View File

@ -8,6 +8,7 @@ import java.util.List;
import java.util.Map;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.dataInsight.ChartParameterValues;
import org.openmetadata.schema.dataInsight.DataInsightChart;
@ -99,6 +100,7 @@ public class KpiRepository extends EntityRepository<Kpi> {
addRelationship(kpi.getId(), kpi.getDataInsightChart().getId(), KPI, DATA_INSIGHT_CHART, Relationship.USES);
}
@Transaction
public RestUtil.PutResponse<?> addKpiResult(UriInfo uriInfo, String fqn, KpiResult kpiResult) {
// Validate the request content
Kpi kpi = dao.findEntityByName(fqn);
@ -114,6 +116,7 @@ public class KpiRepository extends EntityRepository<Kpi> {
return new RestUtil.PutResponse<>(Response.Status.CREATED, changeEvent, RestUtil.ENTITY_FIELDS_CHANGED);
}
@Transaction
public RestUtil.PutResponse<?> deleteKpiResult(String fqn, Long timestamp) {
// Validate the request content
Kpi kpi = dao.findEntityByName(fqn);
@ -185,6 +188,7 @@ public class KpiRepository extends EntityRepository<Kpi> {
super(original, updated, operation);
}
@Transaction
@Override
public void entitySpecificUpdate() {
updateToRelationship(

View File

@ -17,6 +17,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.ColumnsEntityInterface;
import org.openmetadata.schema.api.lineage.AddLineage;
import org.openmetadata.schema.entity.data.Table;
@ -51,6 +52,7 @@ public class LineageRepository {
return getLineage(ref, upstreamDepth, downstreamDepth);
}
@Transaction
public void addLineage(AddLineage addLineage) {
// Validate from entity
EntityReference from = addLineage.getEdge().getFromEntity();
@ -119,6 +121,7 @@ public class LineageRepository {
|| !(to.getType().equals(Entity.TABLE) || to.getType().equals(Entity.DASHBOARD_DATA_MODEL));
}
@Transaction
public boolean deleteLineage(String fromEntity, String fromId, String toEntity, String toId) {
// Validate from entity
EntityReference from = Entity.getEntityReferenceById(fromEntity, UUID.fromString(fromId), Include.NON_DELETED);

View File

@ -26,6 +26,7 @@ import static org.openmetadata.service.util.EntityUtil.mlHyperParameterMatch;
import java.util.ArrayList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.api.feed.ResolveTask;
import org.openmetadata.schema.entity.data.MlModel;
@ -325,6 +326,7 @@ public class MlModelRepository extends EntityRepository<MlModel> {
super(original, updated, operation);
}
@Transaction
@Override
public void entitySpecificUpdate() {
updateAlgorithm(original, updated);

View File

@ -18,6 +18,7 @@ import static org.openmetadata.service.Entity.PERSONA;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.entity.teams.Persona;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.Relationship;
@ -106,6 +107,7 @@ public class PersonaRepository extends EntityRepository<Persona> {
updateUsers(original, updated);
}
@Transaction
private void updateUsers(Persona origPersona, Persona updatedPersona) {
List<EntityReference> origUsers = listOrEmpty(origPersona.getUsers());
List<EntityReference> updatedUsers = listOrEmpty(updatedPersona.getUsers());

View File

@ -23,6 +23,7 @@ import static org.openmetadata.service.util.EntityUtil.taskMatch;
import java.util.ArrayList;
import java.util.List;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.api.feed.ResolveTask;
import org.openmetadata.schema.entity.data.Pipeline;
@ -341,6 +342,7 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
super(original, updated, operation);
}
@Transaction
@Override
public void entitySpecificUpdate() {
updateTasks(original, updated);

View File

@ -30,6 +30,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.entity.policies.Policy;
import org.openmetadata.schema.entity.policies.accessControl.Rule;
import org.openmetadata.schema.type.EntityReference;
@ -149,6 +150,7 @@ public class PolicyRepository extends EntityRepository<Policy> {
super(original, updated, operation);
}
@Transaction
@Override
public void entitySpecificUpdate() {
recordChange(ENABLED, original.getEnabled(), updated.getEnabled());

View File

@ -9,6 +9,7 @@ import java.util.*;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import lombok.SneakyThrows;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.entity.data.Query;
import org.openmetadata.schema.entity.services.DatabaseService;
import org.openmetadata.schema.entity.teams.User;
@ -214,6 +215,7 @@ public class QueryRepository extends EntityRepository<Query> {
super(original, updated, operation);
}
@Transaction
@Override
public void entitySpecificUpdate() {
updateFromRelationships(

View File

@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.List;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.entity.teams.Role;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.Relationship;
@ -123,6 +124,7 @@ public class RoleRepository extends EntityRepository<Role> {
super(original, updated, operation);
}
@Transaction
@Override
public void entitySpecificUpdate() {
updatePolicies(listOrEmpty(original.getPolicies()), listOrEmpty(updated.getPolicies()));

View File

@ -30,6 +30,7 @@ import java.util.UUID;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.api.feed.ResolveTask;
import org.openmetadata.schema.entity.data.SearchIndex;
@ -377,6 +378,7 @@ public class SearchIndexRepository extends EntityRepository<SearchIndex> {
super(original, updated, operation);
}
@Transaction
@Override
public void entitySpecificUpdate() {
if (updated.getFields() != null) {

View File

@ -16,6 +16,7 @@ import static org.openmetadata.service.util.EntityUtil.objectMatch;
import java.util.UUID;
import lombok.Getter;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.ServiceConnectionEntityInterface;
import org.openmetadata.schema.ServiceEntityInterface;
import org.openmetadata.schema.entity.services.ServiceType;
@ -111,6 +112,7 @@ public abstract class ServiceEntityRepository<
super(original, updated, operation);
}
@Transaction
@Override
public void entitySpecificUpdate() {
updateConnection();

View File

@ -5,6 +5,7 @@ import static org.openmetadata.service.Entity.DATABASE_SCHEMA;
import static org.openmetadata.service.Entity.FIELD_FOLLOWERS;
import static org.openmetadata.service.Entity.STORED_PROCEDURE;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.entity.data.DatabaseSchema;
import org.openmetadata.schema.entity.data.StoredProcedure;
@ -116,6 +117,7 @@ public class StoredProcedureRepository extends EntityRepository<StoredProcedure>
super(original, updated, operation);
}
@Transaction
@Override
public void entitySpecificUpdate() {
// storedProcedureCode is a required field. Cannot be null.

View File

@ -6,6 +6,7 @@ import javax.json.JsonValue;
import javax.ws.rs.core.Response;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.api.configuration.SlackAppConfiguration;
import org.openmetadata.schema.email.SmtpSettings;
import org.openmetadata.schema.settings.Settings;
@ -99,6 +100,7 @@ public class SystemRepository {
return null;
}
@Transaction
public Response createOrUpdate(Settings setting) {
Settings oldValue = getConfigWithKey(setting.getConfigType().toString());
try {

View File

@ -41,6 +41,7 @@ import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.api.data.CreateTableProfile;
@ -179,6 +180,7 @@ public class TableRepository extends EntityRepository<Table> {
ColumnUtil.setColumnFQN(table.getFullyQualifiedName(), table.getColumns());
}
@Transaction
public Table addJoins(UUID tableId, TableJoins joins) {
// Validate the request content
Table table = dao.findEntityById(tableId);
@ -210,6 +212,7 @@ public class TableRepository extends EntityRepository<Table> {
return table.withJoins(getJoins(table));
}
@Transaction
public Table addSampleData(UUID tableId, TableData tableData) {
// Validate the request content
Table table = dao.findEntityById(tableId);
@ -255,6 +258,7 @@ public class TableRepository extends EntityRepository<Table> {
return table;
}
@Transaction
public Table deleteSampleData(UUID tableId) {
// Validate the request content
Table table = dao.findEntityById(tableId);
@ -284,6 +288,7 @@ public class TableRepository extends EntityRepository<Table> {
.orElse(null);
}
@Transaction
public Table addTableProfilerConfig(UUID tableId, TableProfilerConfig tableProfilerConfig) {
// Validate the request content
Table table = dao.findEntityById(tableId);
@ -313,6 +318,7 @@ public class TableRepository extends EntityRepository<Table> {
return table.withTableProfilerConfig(tableProfilerConfig);
}
@Transaction
public Table deleteTableProfilerConfig(UUID tableId) {
// Validate the request content
Table table = dao.findEntityById(tableId);

View File

@ -23,6 +23,7 @@ import java.util.Objects;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.entity.classification.Tag;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.ProviderType;
@ -135,6 +136,7 @@ public class TagRepository extends EntityRepository<Tag> {
super(original, updated, operation);
}
@Transaction
@Override
public void entitySpecificUpdate() {
recordChange("mutuallyExclusive", original.getMutuallyExclusive(), updated.getMutuallyExclusive());

View File

@ -53,6 +53,7 @@ import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.csv.CSVRecord;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.csv.EntityCsv;
import org.openmetadata.schema.api.teams.CreateTeam.TeamType;
@ -664,6 +665,7 @@ public class TeamRepository extends EntityRepository<Team> {
super(original, updated, operation);
}
@Transaction
@Override
public void entitySpecificUpdate() {
if (original.getTeamType() != updated.getTeamType()) {

View File

@ -18,6 +18,7 @@ import java.util.stream.Collectors;
import javax.json.JsonPatch;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.tests.ResultSummary;
import org.openmetadata.schema.tests.TestCase;
@ -418,6 +419,7 @@ public class TestCaseRepository extends EntityRepository<TestCase> {
}
}
@Transaction
public RestUtil.PutResponse<TestSuite> addTestCasesToLogicalTestSuite(TestSuite testSuite, List<UUID> testCaseIds) {
bulkAddToRelationship(testSuite.getId(), testCaseIds, TEST_SUITE, TEST_CASE, Relationship.CONTAINS);
List<EntityReference> testCasesEntityReferences = new ArrayList<>();
@ -459,6 +461,7 @@ public class TestCaseRepository extends EntityRepository<TestCase> {
return new RestUtil.PutResponse<>(Response.Status.OK, testSuite, LOGICAL_TEST_CASES_ADDED);
}
@Transaction
public RestUtil.DeleteResponse<TestCase> deleteTestCaseFromLogicalTestSuite(UUID testSuiteId, UUID testCaseId) {
TestCase testCase = Entity.getEntity(Entity.TEST_CASE, testCaseId, null, null);
deleteRelationship(testSuiteId, TEST_SUITE, testCaseId, TEST_CASE, Relationship.CONTAINS);
@ -469,6 +472,7 @@ public class TestCaseRepository extends EntityRepository<TestCase> {
return new RestUtil.DeleteResponse<>(testCase, RestUtil.ENTITY_DELETED);
}
@Transaction
/** Remove test case from test suite summary and update test suite */
private void removeTestCaseFromTestSuiteResultSummary(UUID testSuiteId, String testCaseFqn) {
TestSuite testSuite = Entity.getEntity(TEST_SUITE, testSuiteId, "*", Include.ALL, false);
@ -569,6 +573,7 @@ public class TestCaseRepository extends EntityRepository<TestCase> {
super(original, updated, operation);
}
@Transaction
@Override
public void entitySpecificUpdate() {
EntityLink origEntityLink = EntityLink.parse(original.getEntityLink());

View File

@ -2,6 +2,7 @@ package org.openmetadata.service.jdbi3;
import static org.openmetadata.service.Entity.TEST_CONNECTION_DEFINITION;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.entity.services.connections.TestConnectionDefinition;
import org.openmetadata.service.Entity;
@ -76,6 +77,7 @@ public class TestConnectionDefinitionRepository extends EntityRepository<TestCon
super(original, updated, operation);
}
@Transaction
@Override
public void entitySpecificUpdate() {
recordChange("steps", original.getSteps(), updated.getSteps(), true);

View File

@ -2,6 +2,7 @@ package org.openmetadata.service.jdbi3;
import static org.openmetadata.service.Entity.TEST_DEFINITION;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.tests.TestDefinition;
import org.openmetadata.service.Entity;
@ -57,6 +58,7 @@ public class TestDefinitionRepository extends EntityRepository<TestDefinition> {
super(original, updated, operation);
}
@Transaction
@Override
public void entitySpecificUpdate() {
recordChange("testPlatforms", original.getTestPlatforms(), updated.getTestPlatforms());

View File

@ -11,6 +11,7 @@ import java.util.Map;
import java.util.UUID;
import javax.ws.rs.core.SecurityContext;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.entity.data.Table;
import org.openmetadata.schema.tests.ResultSummary;
import org.openmetadata.schema.tests.TestCase;
@ -198,6 +199,7 @@ public class TestSuiteRepository extends EntityRepository<TestSuite> {
super(original, updated, operation);
}
@Transaction
@Override
public void entitySpecificUpdate() {
List<EntityReference> origTests = listOrEmpty(original.getTests());

View File

@ -30,6 +30,7 @@ import java.util.UUID;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.api.feed.ResolveTask;
import org.openmetadata.schema.entity.data.Topic;
@ -385,6 +386,7 @@ public class TopicRepository extends EntityRepository<Topic> {
super(original, updated, operation);
}
@Transaction
@Override
public void entitySpecificUpdate() {
recordChange("maximumMessageSize", original.getMaximumMessageSize(), updated.getMaximumMessageSize());

View File

@ -28,6 +28,7 @@ import java.util.UUID;
import javax.ws.rs.core.UriInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Triple;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.entity.Type;
import org.openmetadata.schema.entity.type.Category;
import org.openmetadata.schema.entity.type.CustomProperty;
@ -156,6 +157,7 @@ public class TypeRepository extends EntityRepository<Type> {
super(original, updated, operation);
}
@Transaction
@Override
public void entitySpecificUpdate() {
updateCustomProperties();

View File

@ -28,6 +28,7 @@ import javax.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.entity.data.Chart;
import org.openmetadata.schema.entity.data.Dashboard;
@ -71,28 +72,33 @@ public class UsageRepository {
return new EntityUsage().withUsage(usageDetails).withEntity(ref);
}
@Transaction
public RestUtil.PutResponse<?> create(String entityType, UUID id, DailyCount usage) {
// Validate data entity for which usage is being collected
Entity.getEntityReferenceById(entityType, id, Include.NON_DELETED);
return addUsage(POST, entityType, id, usage);
}
@Transaction
public RestUtil.PutResponse<?> createByName(String entityType, String fullyQualifiedName, DailyCount usage) {
EntityReference ref = Entity.getEntityReferenceByName(entityType, fullyQualifiedName, Include.NON_DELETED);
return addUsage(POST, entityType, ref.getId(), usage);
}
@Transaction
public RestUtil.PutResponse<?> createOrUpdate(String entityType, UUID id, DailyCount usage) {
// Validate data entity for which usage is being collected
Entity.getEntityReferenceById(entityType, id, Include.NON_DELETED);
return addUsage(PUT, entityType, id, usage);
}
@Transaction
public RestUtil.PutResponse<?> createOrUpdateByName(String entityType, String fullyQualifiedName, DailyCount usage) {
EntityReference ref = Entity.getEntityReferenceByName(entityType, fullyQualifiedName, Include.NON_DELETED);
return addUsage(PUT, entityType, ref.getId(), usage);
}
@Transaction
public void computePercentile(String entityType, String date) {
dao.usageDAO().computePercentile(entityType, date);
}

View File

@ -37,6 +37,7 @@ import javax.ws.rs.core.UriInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.csv.CSVRecord;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.csv.EntityCsv;
import org.openmetadata.schema.api.teams.CreateTeam.TeamType;
import org.openmetadata.schema.entity.teams.AuthenticationMechanism;
@ -498,6 +499,7 @@ public class UserRepository extends EntityRepository<User> {
super(original, updated, operation);
}
@Transaction
@Override
public void entitySpecificUpdate() {
updateRoles(original, updated);

View File

@ -2,6 +2,7 @@ package org.openmetadata.service.jdbi3;
import static org.openmetadata.service.Entity.WORKFLOW;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.entity.automations.Workflow;
import org.openmetadata.schema.services.connections.metadata.OpenMetadataConnection;
import org.openmetadata.service.Entity;
@ -80,6 +81,7 @@ public class WorkflowRepository extends EntityRepository<Workflow> {
super(original, updated, operation);
}
@Transaction
@Override
public void entitySpecificUpdate() {
recordChange("status", original.getStatus(), updated.getStatus());

View File

@ -1,23 +0,0 @@
package org.openmetadata.service.jdbi3.unitofwork;
import java.util.concurrent.ThreadFactory;
import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.Jdbi;
public interface JdbiHandleManager {
Jdbi getJdbi();
Handle get();
boolean handleExists();
void clear();
default ThreadFactory createThreadFactory() {
throw new UnsupportedOperationException("Thread factory creation is not supported");
}
default String getConversationId() {
return String.valueOf(Thread.currentThread().getId());
}
}

View File

@ -1,100 +0,0 @@
package org.openmetadata.service.jdbi3.unitofwork;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.Handles;
@Slf4j
public class JdbiTransactionManager {
private static JdbiTransactionManager instance;
private static volatile boolean initialized = false;
private final JdbiHandleManager handleManager;
private final Set<Integer> IN_TRANSACTION_HANDLES = Collections.newSetFromMap(new ConcurrentHashMap<>());
private JdbiTransactionManager(JdbiHandleManager handleManager) {
this.handleManager = handleManager;
}
public static void initialize(JdbiHandleManager handleManager) {
if (!initialized) {
instance = new JdbiTransactionManager(handleManager);
initialized = true;
} else {
LOG.info("Jdbi Transaction Manager is already initialized");
}
}
public static JdbiTransactionManager getInstance() {
return instance;
}
public void begin(boolean autoCommit) {
try {
Handle handle = handleManager.get();
if (!autoCommit) {
handle.getConnection().setAutoCommit(false);
handle.getConfig(Handles.class).setForceEndTransactions(false);
handle.begin();
IN_TRANSACTION_HANDLES.add(handle.hashCode());
LOG.debug(
"Begin Transaction Thread Id [{}] has handle id [{}] Transaction {} Level {}",
Thread.currentThread().getId(),
handle.hashCode(),
handle.isInTransaction(),
handle.getTransactionIsolationLevel());
}
} catch (Exception ex) {
terminateHandle();
}
}
public void commit() {
if (handleManager.handleExists()) {
Handle handle = handleManager.get();
try {
handle.getConnection().commit();
LOG.debug(
"Performing commit Thread Id [{}] has handle id [{}] Transaction {} Level {}",
Thread.currentThread().getId(),
handle.hashCode(),
handle.isInTransaction(),
handle.getTransactionIsolationLevel());
} catch (Exception ex) {
rollback();
}
}
}
public void rollback() {
if (handleManager.handleExists()) {
Handle handle = handleManager.get();
if (handle == null) {
LOG.debug("Handle was found to be null during rollback for [{}]", Thread.currentThread().getId());
return;
}
try {
handle.getConnection().rollback();
LOG.debug(
"Performed rollback on Thread Id [{}] has handle id [{}] Transaction {} Level {}",
Thread.currentThread().getId(),
handle.hashCode(),
handle.isInTransaction(),
handle.getTransactionIsolationLevel());
} catch (Exception e) {
LOG.debug("Failed to rollback transaction due to", e);
} finally {
terminateHandle();
}
}
}
public void terminateHandle() {
if (handleManager.handleExists()) {
IN_TRANSACTION_HANDLES.remove(handleManager.get().hashCode());
handleManager.clear();
}
}
}

View File

@ -1,33 +0,0 @@
package org.openmetadata.service.jdbi3.unitofwork;
import java.util.Set;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.glassfish.jersey.server.monitoring.ApplicationEvent;
import org.glassfish.jersey.server.monitoring.ApplicationEventListener;
import org.glassfish.jersey.server.monitoring.RequestEvent;
import org.glassfish.jersey.server.monitoring.RequestEventListener;
@Slf4j
public class JdbiUnitOfWorkApplicationEventListener implements ApplicationEventListener {
private final Set<String> excludedPaths;
public JdbiUnitOfWorkApplicationEventListener(Set<String> excludedPaths) {
this.excludedPaths = excludedPaths;
}
@Override
public void onEvent(ApplicationEvent event) {
LOG.debug("Received Application event {}", event.getType());
}
@Override
@Nullable
public RequestEventListener onRequest(RequestEvent event) {
String path = event.getUriInfo().getPath();
if (excludedPaths.stream().anyMatch(path::contains)) {
return null;
}
return new NonHttpGetRequestJdbiUnitOfWorkEventListener();
}
}

View File

@ -1,69 +0,0 @@
package org.openmetadata.service.jdbi3.unitofwork;
import com.google.common.reflect.Reflection;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.Jdbi;
@Slf4j
@SuppressWarnings({"UnstableApiUsage", "rawtypes", "unchecked"})
public class JdbiUnitOfWorkProvider {
private static JdbiUnitOfWorkProvider instance;
private static volatile boolean initialized = false;
private final JdbiHandleManager handleManager;
private JdbiUnitOfWorkProvider(JdbiHandleManager handleManager) {
this.handleManager = handleManager;
}
public static JdbiUnitOfWorkProvider withDefault(Jdbi dbi) {
return initialize(new RequestScopedJdbiHandleManager(dbi));
}
public static JdbiUnitOfWorkProvider withLinked(Jdbi dbi) {
return initialize(new LinkedRequestScopedJdbiHandleManager(dbi));
}
public static JdbiUnitOfWorkProvider getInstance() {
return instance;
}
private static JdbiUnitOfWorkProvider initialize(JdbiHandleManager handleManager) {
if (!initialized) {
instance = new JdbiUnitOfWorkProvider(handleManager);
initialized = true;
} else {
LOG.info("JdbiUnitOfWorkProvider is already initialized");
}
return instance;
}
public JdbiHandleManager getHandleManager() {
return handleManager;
}
public Handle getHandle() {
return handleManager.get();
}
/**
* getWrappedInstanceForDaoClass generates a proxy instance of the dao class for which the jdbi unit of work aspect
* would be wrapped around with. This method however may be used in case the classpath scanning is disabled. If the
* original class is null or contains no relevant JDBI annotations, this method throws an exception
*
* @param daoClass the DAO class for which a proxy needs to be created fo
* @return the wrapped instance ready to be passed around
*/
public static Object getWrappedInstanceForDaoClass(Class daoClass) {
if (daoClass == null) {
throw new IllegalArgumentException("DAO Class cannot be null");
}
LOG.debug(
"Binding class [{}] with proxy handler [{}] ",
daoClass.getSimpleName(),
JdbiUnitOfWorkProvider.getInstance().getHandleManager().getClass().getSimpleName());
ManagedHandleInvocationHandler handler = new ManagedHandleInvocationHandler<>(daoClass);
Object proxiedInstance = Reflection.newProxy(daoClass, handler);
return daoClass.cast(proxiedInstance);
}
}

View File

@ -1,106 +0,0 @@
package org.openmetadata.service.jdbi3.unitofwork;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.Jdbi;
/**
* This implementation provides a handle scoped to a thread and all other threads Y spawned from X All Y threads must
* follow a particular name format extracted from the conversation id This is one of the ways the manager can know of
* the grouping and establish re-usability of handles across such grouped threads. <br>
* <br>
* It can be used to service requests where only a single handle instance has to be used by multiple threads that are
* spawned with the specified name format from an initial thread. Use this only when you have complete control over the
* threads you create. The threads must not run once the parent thread is returned to the pool or else the handles will
* be invalid or in other words parent thread must block on the results of children. <br>
* It relies on the fact that the {@code Jdbi.Handle} is inherently thread safe and can be used to service dao requests
* between multiple threads. Note: Not suitable when you can not set the name format for the newly spawned threads.
*/
@Slf4j
class LinkedRequestScopedJdbiHandleManager implements JdbiHandleManager {
private final Map<String, Handle> parentThreadHandleMap = new ConcurrentHashMap<>();
private final Jdbi dbi;
public LinkedRequestScopedJdbiHandleManager(Jdbi dbi) {
this.dbi = dbi;
}
@Override
public Jdbi getJdbi() {
return dbi;
}
@Override
public Handle get() {
String parent = substringBetween(Thread.currentThread().getName());
Handle handle;
if (parent == null) {
handle = getHandle();
LOG.debug("Owner of handle [{}] : Parent Thread Id [{}]", handle.hashCode(), Thread.currentThread().getId());
} else {
handle = parentThreadHandleMap.get(parent);
if (handle == null) {
throw new IllegalStateException(
String.format(
"Handle to be reused in child thread [%s] is null for parent thread [%s]",
Thread.currentThread().getId(), parent));
}
LOG.debug("Reusing parent thread handle [{}] for [{}]", handle.hashCode(), Thread.currentThread().getId());
}
return handle;
}
@Override
public boolean handleExists() {
// TODO
return false;
}
@Override
public void clear() {
String parent = getConversationId();
Handle handle = parentThreadHandleMap.get(parent);
if (handle != null) {
handle.close();
LOG.debug("Closed handle Thread Id [{}] has handle id [{}]", Thread.currentThread().getId(), handle.hashCode());
parentThreadHandleMap.remove(parent);
LOG.debug("Clearing handle member for parent thread [{}] ", Thread.currentThread().getId());
}
}
@Override
public ThreadFactory createThreadFactory() {
String threadName = String.format("[%s]-%%d", getConversationId());
return new ThreadFactoryBuilder().setNameFormat(threadName).build();
}
private Handle getHandle() {
String threadIdentity = getConversationId();
if (parentThreadHandleMap.containsKey(threadIdentity)) {
return parentThreadHandleMap.get(threadIdentity);
}
Handle handle = dbi.open();
parentThreadHandleMap.putIfAbsent(threadIdentity, handle);
return handle;
}
@Nullable
private String substringBetween(String threadName) {
final int start = threadName.indexOf("[");
if (start != -1) {
final int end = threadName.indexOf("]", start + "[".length());
if (end != -1) {
return threadName.substring(start + "[".length(), end);
}
}
return null;
}
}

View File

@ -1,83 +0,0 @@
package org.openmetadata.service.jdbi3.unitofwork;
import static org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider.getWrappedInstanceForDaoClass;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.core.Handle;
import org.jdbi.v3.sqlobject.CreateSqlObject;
import org.openmetadata.service.jdbi3.CollectionDAO;
@Slf4j
public class ManagedHandleInvocationHandler<T> implements InvocationHandler {
private static final Object[] NO_ARGS = {};
private final Class<T> underlying;
public ManagedHandleInvocationHandler(Class<T> underlying) {
this.underlying = underlying;
}
/**
* {@inheritDoc}
*
* <ul>
* <li>{@code proxy.toString()} delegates to {@link ManagedHandleInvocationHandler#toString}
* <li>other method calls are dispatched to {@link ManagedHandleInvocationHandler#handleInvocation}.
* </ul>
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (args == null) {
args = NO_ARGS;
}
if (args.length == 0 && method.getName().equals("toString")) {
return toString();
}
return handleInvocation(method, args);
}
private Object handleInvocation(Method method, Object[] args) throws Throwable {
if (CollectionDAO.class.isAssignableFrom(underlying) && method.isAnnotationPresent(CreateSqlObject.class)) {
return getWrappedInstanceForDaoClass(method.getReturnType());
} else {
Object dao;
Object result;
if (JdbiUnitOfWorkProvider.getInstance().getHandleManager().handleExists()) {
Handle handle = JdbiUnitOfWorkProvider.getInstance().getHandle();
LOG.debug(
"{}.{} [{}] Thread Id [{}] with handle id [{}]",
method.getDeclaringClass().getSimpleName(),
method.getName(),
underlying.getSimpleName(),
Thread.currentThread().getId(),
handle.hashCode());
dao = handle.attach(underlying);
result = invokeMethod(method, dao, args);
} else {
// This is non-transactional request
Handle handle = JdbiUnitOfWorkProvider.getInstance().getHandleManager().getJdbi().open();
try (handle) {
handle.getConnection().setAutoCommit(true);
dao = handle.attach(underlying);
result = invokeMethod(method, dao, args);
}
}
return result;
}
}
private Object invokeMethod(Method method, Object dao, Object[] args) throws Throwable {
try {
return method.invoke(dao, args);
} catch (Exception ex) {
throw ex.getCause();
}
}
@Override
public String toString() {
return "Proxy[" + underlying.getSimpleName() + "]";
}
}

View File

@ -1,40 +0,0 @@
package org.openmetadata.service.jdbi3.unitofwork;
import javax.ws.rs.HttpMethod;
import lombok.extern.slf4j.Slf4j;
import org.glassfish.jersey.server.monitoring.RequestEvent;
import org.glassfish.jersey.server.monitoring.RequestEventListener;
@Slf4j
class NonHttpGetRequestJdbiUnitOfWorkEventListener implements RequestEventListener {
NonHttpGetRequestJdbiUnitOfWorkEventListener() {}
@Override
public void onEvent(RequestEvent event) {
RequestEvent.Type type = event.getType();
String httpMethod = event.getContainerRequest().getMethod();
LOG.debug("Handling {} Request Event {} {}", httpMethod, type, Thread.currentThread().getId());
boolean isTransactional = isTransactional(event);
if (isTransactional) {
if (type == RequestEvent.Type.RESOURCE_METHOD_START) {
JdbiTransactionManager.getInstance().begin(false);
} else if (type == RequestEvent.Type.RESP_FILTERS_START) {
JdbiTransactionManager.getInstance().commit();
} else if (type == RequestEvent.Type.ON_EXCEPTION) {
JdbiTransactionManager.getInstance().rollback();
} else if (type == RequestEvent.Type.FINISHED) {
JdbiTransactionManager.getInstance().terminateHandle();
}
}
}
private boolean isTransactional(RequestEvent event) {
String httpMethod = event.getContainerRequest().getMethod();
return httpMethod.equals(HttpMethod.POST)
|| httpMethod.equals(HttpMethod.PUT)
|| httpMethod.equals(HttpMethod.PATCH)
|| httpMethod.equals(HttpMethod.DELETE);
}
}

View File

@ -1,49 +0,0 @@
package org.openmetadata.service.jdbi3.unitofwork;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.Jdbi;
@Slf4j
class RequestScopedJdbiHandleManager implements JdbiHandleManager {
private final Jdbi dbi;
@SuppressWarnings("ThreadLocalUsage")
private final ThreadLocal<Handle> threadLocal = new ThreadLocal<>();
public RequestScopedJdbiHandleManager(Jdbi dbi) {
this.dbi = dbi;
}
@Override
public Jdbi getJdbi() {
return dbi;
}
@Override
public Handle get() {
if (threadLocal.get() == null) {
threadLocal.set(dbi.open());
}
Handle handle = threadLocal.get();
LOG.debug("handle [{}] : Thread Id [{}]", handle.hashCode(), Thread.currentThread().getId());
return handle;
}
@Override
public boolean handleExists() {
return threadLocal.get() != null;
}
@Override
public void clear() {
Handle handle = threadLocal.get();
if (handle != null) {
handle.close();
LOG.debug("Closed handle Thread Id [{}] has handle id [{}]", Thread.currentThread().getId(), handle.hashCode());
threadLocal.remove();
LOG.debug("Clearing handle member for thread [{}] ", Thread.currentThread().getId());
}
}
}

View File

@ -58,7 +58,6 @@ import org.openmetadata.service.clients.pipeline.PipelineServiceClientFactory;
import org.openmetadata.service.jdbi3.AppRepository;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider;
import org.openmetadata.service.resources.Collection;
import org.openmetadata.service.resources.EntityResource;
import org.openmetadata.service.search.SearchRepository;
@ -89,7 +88,7 @@ public class AppResource extends EntityResource<App, AppRepository> {
PipelineServiceClientFactory.createPipelineServiceClient(config.getPipelineServiceClientConfiguration());
// Create an On Demand DAO
CollectionDAO dao = JdbiUnitOfWorkProvider.getInstance().getHandle().getJdbi().onDemand(CollectionDAO.class);
CollectionDAO dao = Entity.getCollectionDAO();
searchRepository = new SearchRepository(config.getElasticSearchConfiguration());
try {
@ -119,10 +118,7 @@ public class AppResource extends EntityResource<App, AppRepository> {
// Schedule
if (app != null && app.getScheduleType().equals(ScheduleType.Scheduled)) {
ApplicationHandler.scheduleApplication(
app,
JdbiUnitOfWorkProvider.getInstance().getHandle().getJdbi().onDemand(CollectionDAO.class),
searchRepository);
ApplicationHandler.scheduleApplication(app, Entity.getCollectionDAO(), searchRepository);
}
} catch (Exception ex) {
@ -373,10 +369,7 @@ public class AppResource extends EntityResource<App, AppRepository> {
uriInfo, create.getName(), new EntityUtil.Fields(repository.getMarketPlace().getAllowedFields()));
App app = getApplication(definition, create, securityContext.getUserPrincipal().getName());
if (app.getScheduleType().equals(ScheduleType.Scheduled)) {
ApplicationHandler.scheduleApplication(
app,
JdbiUnitOfWorkProvider.getInstance().getHandle().getJdbi().onDemand(CollectionDAO.class),
searchRepository);
ApplicationHandler.scheduleApplication(app, Entity.getCollectionDAO(), searchRepository);
}
return create(uriInfo, securityContext, app);
}
@ -427,10 +420,7 @@ public class AppResource extends EntityResource<App, AppRepository> {
App app = getApplication(definition, create, securityContext.getUserPrincipal().getName());
AppScheduler.getInstance().deleteScheduledApplication(app);
if (app.getScheduleType().equals(ScheduleType.Scheduled)) {
ApplicationHandler.scheduleApplication(
app,
JdbiUnitOfWorkProvider.getInstance().getHandle().getJdbi().onDemand(CollectionDAO.class),
searchRepository);
ApplicationHandler.scheduleApplication(app, Entity.getCollectionDAO(), searchRepository);
}
return createOrUpdate(uriInfo, securityContext, app);
}
@ -513,10 +503,7 @@ public class AppResource extends EntityResource<App, AppRepository> {
@Context SecurityContext securityContext) {
App app = repository.getByName(uriInfo, name, new EntityUtil.Fields(repository.getAllowedFields()));
if (app.getScheduleType().equals(ScheduleType.Scheduled)) {
ApplicationHandler.scheduleApplication(
app,
JdbiUnitOfWorkProvider.getInstance().getHandle().getJdbi().onDemand(CollectionDAO.class),
searchRepository);
ApplicationHandler.scheduleApplication(app, Entity.getCollectionDAO(), searchRepository);
Response.status(Response.Status.OK).entity("App Scheduled to Scheduler successfully.");
}
throw new IllegalArgumentException("App is not of schedule type Scheduled.");
@ -542,10 +529,7 @@ public class AppResource extends EntityResource<App, AppRepository> {
EntityUtil.Fields fields = getFields(String.format("%s,%s", FIELD_OWNER, "bot"));
App app = repository.getByName(uriInfo, name, fields);
if (app.getAppType().equals(AppType.Internal)) {
ApplicationHandler.triggerApplicationOnDemand(
app,
JdbiUnitOfWorkProvider.getInstance().getHandle().getJdbi().onDemand(CollectionDAO.class),
searchRepository);
ApplicationHandler.triggerApplicationOnDemand(app, Entity.getCollectionDAO(), searchRepository);
return Response.status(Response.Status.OK).entity("Application Triggered").build();
} else {
app.setOpenMetadataServerConnection(

View File

@ -15,7 +15,6 @@ package org.openmetadata.service.util;
import static org.openmetadata.service.Entity.TEAM;
import static org.openmetadata.service.Entity.USER;
import static org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider.getWrappedInstanceForDaoClass;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@ -58,7 +57,7 @@ public class NotificationHandler {
threadScheduler.submit(
() -> {
try {
CollectionDAO collectionDAO = (CollectionDAO) getWrappedInstanceForDaoClass(CollectionDAO.class);
CollectionDAO collectionDAO = Entity.getCollectionDAO();
handleNotifications(responseContext, collectionDAO);
} catch (Exception ex) {
LOG.error("[NotificationHandler] Failed to use mapper in converting to Json", ex);