diff --git a/bootstrap/sql/migrations/native/1.9.0/mysql/schemaChanges.sql b/bootstrap/sql/migrations/native/1.9.0/mysql/schemaChanges.sql index 321674dfadd..e1f1475677e 100644 --- a/bootstrap/sql/migrations/native/1.9.0/mysql/schemaChanges.sql +++ b/bootstrap/sql/migrations/native/1.9.0/mysql/schemaChanges.sql @@ -70,4 +70,34 @@ CREATE TABLE IF NOT EXISTS worksheet_entity ( PRIMARY KEY (id), UNIQUE KEY worksheet_entity_fqn_hash (fqnHash), KEY idx_worksheet_spreadsheet_fqn (spreadsheetFqn) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; + +-- Performance optimization indexes for entity_relationship table +-- These indexes improve cascade deletion performance +CREATE INDEX idx_entity_rel_from_delete +ON entity_relationship(fromId, fromEntity, toId, toEntity, relation); + +CREATE INDEX idx_entity_rel_to_delete +ON entity_relationship(toId, toEntity, fromId, fromEntity, relation); + +-- Index for cascade queries +CREATE INDEX idx_entity_rel_cascade +ON entity_relationship(fromId, relation, toEntity, toId); + +-- Entity deletion lock table for preventing orphaned entities during cascade deletion +CREATE TABLE IF NOT EXISTS entity_deletion_lock ( + id VARCHAR(36) NOT NULL DEFAULT (UUID()), + entityId VARCHAR(36) NOT NULL, + entityType VARCHAR(256) NOT NULL, + entityFqn VARCHAR(2048) NOT NULL, + lockType VARCHAR(50) NOT NULL, -- 'DELETE_IN_PROGRESS', 'DELETE_SCHEDULED' + lockedBy VARCHAR(256) NOT NULL, + lockedAt TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + expectedCompletion TIMESTAMP NULL, + deletionScope VARCHAR(50), -- 'ENTITY_ONLY', 'CASCADE' + metadata JSON, + PRIMARY KEY (id), + UNIQUE KEY entity_deletion_lock_unique (entityId, entityType), + INDEX idx_deletion_lock_fqn (entityFqn(255)), + INDEX idx_deletion_lock_time (lockedAt) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; \ No newline at end of file diff --git a/bootstrap/sql/migrations/native/1.9.0/postgres/schemaChanges.sql b/bootstrap/sql/migrations/native/1.9.0/postgres/schemaChanges.sql index ae99e4a4d0c..5370dce54d9 100644 --- a/bootstrap/sql/migrations/native/1.9.0/postgres/schemaChanges.sql +++ b/bootstrap/sql/migrations/native/1.9.0/postgres/schemaChanges.sql @@ -98,4 +98,39 @@ CREATE INDEX IF NOT EXISTS idx_worksheet_spreadsheet_fqn ON worksheet_entity (sp CREATE INDEX IF NOT EXISTS idx_directory_service ON directory_entity ((json -> 'service' ->> 'id')); CREATE INDEX IF NOT EXISTS idx_file_directory ON file_entity ((json -> 'directory' ->> 'id')); CREATE INDEX IF NOT EXISTS idx_spreadsheet_directory ON spreadsheet_entity ((json -> 'directory' ->> 'id')); -CREATE INDEX IF NOT EXISTS idx_worksheet_spreadsheet ON worksheet_entity ((json -> 'spreadsheet' ->> 'id')); \ No newline at end of file +CREATE INDEX IF NOT EXISTS idx_worksheet_spreadsheet ON worksheet_entity ((json -> 'spreadsheet' ->> 'id')); + +-- Performance optimization indexes for entity_relationship table +-- These indexes improve cascade deletion performance +CREATE INDEX IF NOT EXISTS idx_entity_rel_from_delete +ON entity_relationship(fromid, fromentity, toid, toentity, relation); + +CREATE INDEX IF NOT EXISTS idx_entity_rel_to_delete +ON entity_relationship(toid, toentity, fromid, fromentity, relation); + +-- Index for cascade queries (CONTAINS and PARENT_OF relationships only) +-- PostgreSQL supports partial indexes +CREATE INDEX IF NOT EXISTS idx_entity_rel_cascade +ON entity_relationship(fromid, relation, toentity, toid) +WHERE relation IN (0, 8); + +-- Entity deletion lock table for preventing orphaned entities during cascade deletion +CREATE TABLE IF NOT EXISTS entity_deletion_lock ( + id UUID NOT NULL DEFAULT gen_random_uuid(), + entityId UUID NOT NULL, + entityType VARCHAR(256) NOT NULL, + entityFqn VARCHAR(2048) NOT NULL, + lockType VARCHAR(50) NOT NULL, -- 'DELETE_IN_PROGRESS', 'DELETE_SCHEDULED' + lockedBy VARCHAR(256) NOT NULL, + lockedAt TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + expectedCompletion TIMESTAMP NULL, + deletionScope VARCHAR(50), -- 'ENTITY_ONLY', 'CASCADE' + metadata JSONB, + PRIMARY KEY (id), + UNIQUE (entityId, entityType) +); + +-- Create indexes for deletion lock table +-- Use btree index for entityFqn prefix matching +CREATE INDEX IF NOT EXISTS idx_deletion_lock_fqn ON entity_deletion_lock(entityFqn); +CREATE INDEX IF NOT EXISTS idx_deletion_lock_time ON entity_deletion_lock(lockedAt); \ No newline at end of file diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/cache/CachedCollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/cache/CachedCollectionDAO.java index 889ac20fb34..ff8ce3e451d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/cache/CachedCollectionDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/cache/CachedCollectionDAO.java @@ -16,6 +16,7 @@ package org.openmetadata.service.cache; import lombok.extern.slf4j.Slf4j; import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.jdbi3.CollectionDAO.*; +import org.openmetadata.service.jdbi3.DeletionLockDAO; import org.openmetadata.service.jdbi3.IndexMappingVersionDAO; /** @@ -463,6 +464,11 @@ public class CachedCollectionDAO implements CollectionDAO { return delegate.worksheetDAO(); } + @Override + public DeletionLockDAO deletionLockDAO() { + return delegate.deletionLockDAO(); + } + @Override public IndexMappingVersionDAO indexMappingVersionDAO() { return delegate.indexMappingVersionDAO(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/cache/CachedEntityRelationshipDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/cache/CachedEntityRelationshipDAO.java index 37d70e92618..1cdb3e36c59 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/cache/CachedEntityRelationshipDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/cache/CachedEntityRelationshipDAO.java @@ -718,4 +718,45 @@ public class CachedEntityRelationshipDAO implements CollectionDAO.EntityRelation LOG.debug("Prefetch failed for {} ({}): {}", entityId, entityType, e.getMessage()); } } + + // Delegate new batch deletion methods to the underlying implementation + @Override + public void deleteAllFrom(UUID id, String entity) { + delegate.deleteAllFrom(id, entity); + evictEntityFromCache(id, entity); + } + + @Override + public void deleteAllTo(UUID id, String entity) { + delegate.deleteAllTo(id, entity); + evictEntityFromCache(id, entity); + } + + @Override + public void batchDeleteFrom(List ids, String entityType) { + delegate.batchDeleteFrom(ids, entityType); + // Evict cache for all entities in the batch + for (String idStr : ids) { + try { + UUID id = UUID.fromString(idStr); + evictEntityFromCache(id, entityType); + } catch (IllegalArgumentException e) { + LOG.warn("Invalid UUID in batch delete: {}", idStr); + } + } + } + + @Override + public void batchDeleteTo(List ids, String entityType) { + delegate.batchDeleteTo(ids, entityType); + // Evict cache for all entities in the batch + for (String idStr : ids) { + try { + UUID id = UUID.fromString(idStr); + evictEntityFromCache(id, entityType); + } catch (IllegalArgumentException e) { + LOG.warn("Invalid UUID in batch delete: {}", idStr); + } + } + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/exception/EntityLockedException.java b/openmetadata-service/src/main/java/org/openmetadata/service/exception/EntityLockedException.java new file mode 100644 index 00000000000..68e041d429e --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/exception/EntityLockedException.java @@ -0,0 +1,34 @@ +package org.openmetadata.service.exception; + +import jakarta.ws.rs.core.Response; +import org.openmetadata.sdk.exception.WebServiceException; + +/** + * Exception thrown when an entity operation is blocked due to an active deletion lock + * on the entity or its parent. + */ +public class EntityLockedException extends WebServiceException { + + private static final String ERROR_TYPE = "ENTITY_LOCKED"; + + public EntityLockedException(String message) { + super(Response.Status.CONFLICT, ERROR_TYPE, message); + } + + public EntityLockedException(String message, Throwable cause) { + super(Response.Status.CONFLICT, ERROR_TYPE, message, cause); + } + + public static EntityLockedException byMessage( + String entityType, String entityName, String reason) { + return new EntityLockedException( + String.format("Cannot modify %s '%s': %s", entityType, entityName, reason)); + } + + public static EntityLockedException parentBeingDeleted(String entityFqn, String parentFqn) { + return new EntityLockedException( + String.format( + "Cannot create or modify entity '%s' because parent '%s' is being deleted", + entityFqn, parentFqn)); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/initialization/LockManagerInitializer.java b/openmetadata-service/src/main/java/org/openmetadata/service/initialization/LockManagerInitializer.java new file mode 100644 index 00000000000..93cab31a2be --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/initialization/LockManagerInitializer.java @@ -0,0 +1,71 @@ +package org.openmetadata.service.initialization; + +import lombok.extern.slf4j.Slf4j; +import org.openmetadata.service.Entity; +import org.openmetadata.service.jdbi3.EntityRepository; +import org.openmetadata.service.lock.HierarchicalLockManager; + +/** + * Initializes the hierarchical lock manager for OpenMetadata entity deletion optimization. + * This should be called during application startup. + */ +@Slf4j +public class LockManagerInitializer { + + private static volatile boolean initialized = false; + + /** + * Initialize the lock manager. This method is safe to call multiple times. + */ + public static void initialize() { + if (initialized) { + return; + } + + synchronized (LockManagerInitializer.class) { + if (initialized) { + return; + } + + try { + LOG.info("Initializing hierarchical lock manager for entity deletion optimization"); + + // Get the collection DAO + var collectionDAO = Entity.getCollectionDAO(); + if (collectionDAO == null) { + LOG.warn("CollectionDAO not available, skipping lock manager initialization"); + return; + } + + // Initialize the lock manager + HierarchicalLockManager lockManager = + new HierarchicalLockManager(collectionDAO.deletionLockDAO()); + + // Set it on EntityRepository + EntityRepository.setLockManager(lockManager); + + initialized = true; + LOG.info("Hierarchical lock manager initialized successfully"); + + } catch (Exception e) { + LOG.error("Failed to initialize hierarchical lock manager: {}", e.getMessage(), e); + // Continue without locking for backward compatibility + } + } + } + + /** + * Check if the lock manager is initialized + */ + public static boolean isInitialized() { + return initialized; + } + + /** + * Force re-initialization (for testing) + */ + public static void forceReinitialize() { + initialized = false; + initialize(); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java index f697386add3..d2dbb9ce4ca 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java @@ -415,6 +415,9 @@ public interface CollectionDAO { @CreateSqlObject WorkflowInstanceStateTimeSeriesDAO workflowInstanceStateTimeSeriesDAO(); + @CreateSqlObject + DeletionLockDAO deletionLockDAO(); + interface DashboardDAO extends EntityDAO { @Override default String getTableName() { @@ -1606,10 +1609,47 @@ public interface CollectionDAO { @Bind("toEntity") String toEntity, @Bind("relation") int relation); + // Optimized deleteAll implementation that splits OR query for better performance + @Transaction + default void deleteAll(UUID id, String entity) { + // Split OR query into two separate deletes for better index usage + deleteAllFrom(id, entity); + deleteAllTo(id, entity); + } + + @SqlUpdate("DELETE FROM entity_relationship WHERE fromId = :id AND fromEntity = :entity") + void deleteAllFrom(@BindUUID("id") UUID id, @Bind("entity") String entity); + + @SqlUpdate("DELETE FROM entity_relationship WHERE toId = :id AND toEntity = :entity") + void deleteAllTo(@BindUUID("id") UUID id, @Bind("entity") String entity); + + // Batch deletion methods for improved performance + @Transaction + default void batchDeleteRelationships(List entityIds, String entityType) { + if (entityIds == null || entityIds.isEmpty()) { + return; + } + + // Process in chunks of 500 to avoid hitting database query limits + int batchSize = 500; + for (int i = 0; i < entityIds.size(); i += batchSize) { + int endIndex = Math.min(i + batchSize, entityIds.size()); + List batch = + entityIds.subList(i, endIndex).stream() + .map(UUID::toString) + .collect(Collectors.toList()); + + batchDeleteFrom(batch, entityType); + batchDeleteTo(batch, entityType); + } + } + @SqlUpdate( - "DELETE from entity_relationship WHERE (toId = :id AND toEntity = :entity) OR " - + "(fromId = :id AND fromEntity = :entity)") - void deleteAll(@BindUUID("id") UUID id, @Bind("entity") String entity); + "DELETE FROM entity_relationship WHERE fromId IN () AND fromEntity = :entityType") + void batchDeleteFrom(@BindList("ids") List ids, @Bind("entityType") String entityType); + + @SqlUpdate("DELETE FROM entity_relationship WHERE toId IN () AND toEntity = :entityType") + void batchDeleteTo(@BindList("ids") List ids, @Bind("entityType") String entityType); @SqlUpdate( "DELETE FROM entity_relationship " diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DeletionLock.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DeletionLock.java new file mode 100644 index 00000000000..c4e731700f8 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DeletionLock.java @@ -0,0 +1,114 @@ +package org.openmetadata.service.jdbi3; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.JsonNode; +import java.time.Instant; +import java.util.UUID; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Entity representing a deletion lock that prevents concurrent modifications + * during cascade deletion operations. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@JsonInclude(JsonInclude.Include.NON_NULL) +public class DeletionLock { + + private UUID id; + private UUID entityId; + private String entityType; + private String entityFqn; + private String lockType; // DELETE_IN_PROGRESS, DELETE_SCHEDULED + private String lockedBy; + private Instant lockedAt; + private Instant expectedCompletion; + private String deletionScope; // ENTITY_ONLY, CASCADE + private JsonNode metadata; + + public enum LockType { + DELETE_IN_PROGRESS("DELETE_IN_PROGRESS"), + DELETE_SCHEDULED("DELETE_SCHEDULED"); + + private final String value; + + LockType(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + } + + public enum DeletionScope { + ENTITY_ONLY("ENTITY_ONLY"), + CASCADE("CASCADE"); + + private final String value; + + DeletionScope(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + } + + // Builder helper methods + public DeletionLock withId(UUID id) { + this.id = id; + return this; + } + + public DeletionLock withEntityId(UUID entityId) { + this.entityId = entityId; + return this; + } + + public DeletionLock withEntityType(String entityType) { + this.entityType = entityType; + return this; + } + + public DeletionLock withEntityFqn(String entityFqn) { + this.entityFqn = entityFqn; + return this; + } + + public DeletionLock withLockType(String lockType) { + this.lockType = lockType; + return this; + } + + public DeletionLock withLockedBy(String lockedBy) { + this.lockedBy = lockedBy; + return this; + } + + public DeletionLock withLockedAt(Instant lockedAt) { + this.lockedAt = lockedAt; + return this; + } + + public DeletionLock withExpectedCompletion(Instant expectedCompletion) { + this.expectedCompletion = expectedCompletion; + return this; + } + + public DeletionLock withDeletionScope(String deletionScope) { + this.deletionScope = deletionScope; + return this; + } + + public DeletionLock withMetadata(JsonNode metadata) { + this.metadata = metadata; + return this; + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DeletionLockDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DeletionLockDAO.java new file mode 100644 index 00000000000..7923c598f8f --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DeletionLockDAO.java @@ -0,0 +1,59 @@ +package org.openmetadata.service.jdbi3; + +import java.time.Instant; +import java.util.List; +import java.util.UUID; +import org.jdbi.v3.sqlobject.config.RegisterRowMapper; +import org.jdbi.v3.sqlobject.customizer.Bind; +import org.jdbi.v3.sqlobject.customizer.BindBean; +import org.jdbi.v3.sqlobject.statement.SqlQuery; +import org.jdbi.v3.sqlobject.statement.SqlUpdate; +import org.openmetadata.service.util.jdbi.BindUUID; + +/** + * DAO for managing entity deletion locks to prevent concurrent modifications + * during cascade deletion operations. + */ +@RegisterRowMapper(DeletionLockMapper.class) +public interface DeletionLockDAO { + + @SqlUpdate( + "INSERT INTO entity_deletion_lock (id, entityId, entityType, entityFqn, lockType, " + + "lockedBy, lockedAt, expectedCompletion, deletionScope, metadata) " + + "VALUES (:id, :entityId, :entityType, :entityFqn, :lockType, " + + ":lockedBy, :lockedAt, :expectedCompletion, :deletionScope, :metadata)") + void insert(@BindBean DeletionLock lock); + + @SqlQuery( + "SELECT * FROM entity_deletion_lock WHERE entityId = :entityId AND entityType = :entityType") + DeletionLock findByEntity( + @BindUUID("entityId") UUID entityId, @Bind("entityType") String entityType); + + @SqlQuery("SELECT * FROM entity_deletion_lock WHERE entityFqn LIKE :fqnPrefix || '%'") + List findByFqnPrefix(@Bind("fqnPrefix") String fqnPrefix); + + @SqlQuery( + "SELECT * FROM entity_deletion_lock WHERE " + + "entityFqn = :fqn OR " + + ":fqn LIKE entityFqn || '.%'") + List findParentLocks(@Bind("fqn") String fqn); + + @SqlQuery("SELECT * FROM entity_deletion_lock WHERE lockedAt < :staleTime") + List findStaleLocks(@Bind("staleTime") Instant staleTime); + + @SqlUpdate("DELETE FROM entity_deletion_lock WHERE id = :id") + void delete(@BindUUID("id") UUID id); + + @SqlUpdate( + "DELETE FROM entity_deletion_lock WHERE entityId = :entityId AND entityType = :entityType") + void deleteByEntity(@BindUUID("entityId") UUID entityId, @Bind("entityType") String entityType); + + @SqlUpdate("DELETE FROM entity_deletion_lock WHERE lockedAt < :staleTime") + int deleteStaleLocks(@Bind("staleTime") Instant staleTime); + + @SqlQuery("SELECT COUNT(*) FROM entity_deletion_lock") + int countActiveLocks(); + + @SqlQuery("SELECT COUNT(*) FROM entity_deletion_lock WHERE entityType = :entityType") + int countActiveLocksByType(@Bind("entityType") String entityType); +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DeletionLockMapper.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DeletionLockMapper.java new file mode 100644 index 00000000000..48397131ebe --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DeletionLockMapper.java @@ -0,0 +1,54 @@ +package org.openmetadata.service.jdbi3; + +import com.fasterxml.jackson.databind.JsonNode; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.UUID; +import org.jdbi.v3.core.mapper.RowMapper; +import org.jdbi.v3.core.statement.StatementContext; +import org.openmetadata.schema.utils.JsonUtils; + +/** + * JDBI Row mapper for DeletionLock entity + */ +public class DeletionLockMapper implements RowMapper { + + @Override + public DeletionLock map(ResultSet rs, StatementContext ctx) throws SQLException { + DeletionLock lock = new DeletionLock(); + + lock.setId(UUID.fromString(rs.getString("id"))); + lock.setEntityId(UUID.fromString(rs.getString("entityId"))); + lock.setEntityType(rs.getString("entityType")); + lock.setEntityFqn(rs.getString("entityFqn")); + lock.setLockType(rs.getString("lockType")); + lock.setLockedBy(rs.getString("lockedBy")); + + // Convert Timestamp to Instant + Timestamp lockedAtTs = rs.getTimestamp("lockedAt"); + if (lockedAtTs != null) { + lock.setLockedAt(lockedAtTs.toInstant()); + } + + Timestamp expectedCompletionTs = rs.getTimestamp("expectedCompletion"); + if (expectedCompletionTs != null) { + lock.setExpectedCompletion(expectedCompletionTs.toInstant()); + } + + lock.setDeletionScope(rs.getString("deletionScope")); + + // Parse JSON metadata + String metadataJson = rs.getString("metadata"); + if (metadataJson != null) { + try { + JsonNode metadata = JsonUtils.readTree(metadataJson); + lock.setMetadata(metadata); + } catch (Exception e) { + // Log error but don't fail + } + } + + return lock; + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java index 0784e6104e6..2e5f51ecb83 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java @@ -182,6 +182,7 @@ import org.openmetadata.service.OpenMetadataApplicationConfig; import org.openmetadata.service.TypeRegistry; import org.openmetadata.service.events.lifecycle.EntityLifecycleEventDispatcher; import org.openmetadata.service.exception.CatalogExceptionMessage; +import org.openmetadata.service.exception.EntityLockedException; import org.openmetadata.service.exception.EntityNotFoundException; import org.openmetadata.service.exception.PreconditionFailedException; import org.openmetadata.service.exception.UnhandledServerException; @@ -191,6 +192,7 @@ import org.openmetadata.service.jdbi3.CollectionDAO.ExtensionRecord; import org.openmetadata.service.jdbi3.FeedRepository.TaskWorkflow; import org.openmetadata.service.jdbi3.FeedRepository.ThreadContext; import org.openmetadata.service.jobs.JobDAO; +import org.openmetadata.service.lock.HierarchicalLockManager; import org.openmetadata.service.resources.tags.TagLabelUtil; import org.openmetadata.service.resources.teams.RoleResource; import org.openmetadata.service.search.SearchListFilter; @@ -298,6 +300,14 @@ public abstract class EntityRepository { protected final ChangeSummarizer changeSummarizer; + // Lock manager for preventing orphaned entities during cascade deletion + private static HierarchicalLockManager lockManager; + + // Static setter for lock manager initialization + public static void setLockManager(HierarchicalLockManager manager) { + lockManager = manager; + } + public boolean isSupportsOwners() { return supportsOwners; } @@ -1061,6 +1071,16 @@ public abstract class EntityRepository { } public final T createInternal(T entity) { + // Check if parent entity is being deleted + if (lockManager != null) { + try { + lockManager.checkModificationAllowed(entity); + } catch (EntityLockedException e) { + LOG.warn("Entity creation blocked due to parent deletion: {}", e.getMessage()); + throw e; + } + } + prepareInternal(entity, false); return createNewEntity(entity); } @@ -1127,6 +1147,16 @@ public abstract class EntityRepository { @Transaction public final PutResponse createOrUpdate(UriInfo uriInfo, T updated, String updatedBy) { + // Check if parent entity is being deleted + if (lockManager != null) { + try { + lockManager.checkModificationAllowed(updated); + } catch (EntityLockedException e) { + LOG.warn("Entity update blocked due to parent deletion: {}", e.getMessage()); + throw e; + } + } + T original = findByNameOrNull(updated.getFullyQualifiedName(), ALL); if (original == null) { // If an original entity does not exist then create it, else update return new PutResponse<>( @@ -1539,23 +1569,57 @@ public abstract class EntityRepository { checkSystemEntityDeletion(original); preDelete(original, deletedBy); setFieldsInternal(original, putFields); - deleteChildren(original.getId(), recursive, hardDelete, deletedBy); - EventType changeType; - T updated = get(null, original.getId(), putFields, ALL, false); - if (supportsSoftDelete && !hardDelete) { - updated.setUpdatedBy(deletedBy); - updated.setUpdatedAt(System.currentTimeMillis()); - updated.setDeleted(true); - EntityUpdater updater = getUpdater(original, updated, Operation.SOFT_DELETE, null); - updater.update(); - changeType = ENTITY_SOFT_DELETED; - } else { - cleanup(updated); - changeType = ENTITY_DELETED; + // Acquire deletion lock to prevent concurrent modifications + DeletionLock lock = null; + if (lockManager != null && recursive) { + try { + lock = lockManager.acquireDeletionLock(original, deletedBy, recursive); + LOG.info("Acquired deletion lock for {} {}", entityType, original.getId()); + } catch (Exception e) { + LOG.error( + "Failed to acquire deletion lock for {} {}: {}", + entityType, + original.getId(), + e.getMessage()); + // Continue without lock for backward compatibility + } + } + + try { + deleteChildren(original.getId(), recursive, hardDelete, deletedBy); + + EventType changeType; + T updated = get(null, original.getId(), putFields, ALL, false); + if (supportsSoftDelete && !hardDelete) { + updated.setUpdatedBy(deletedBy); + updated.setUpdatedAt(System.currentTimeMillis()); + updated.setDeleted(true); + EntityUpdater updater = getUpdater(original, updated, Operation.SOFT_DELETE, null); + updater.update(); + changeType = ENTITY_SOFT_DELETED; + } else { + cleanup(updated); + changeType = ENTITY_DELETED; + } + LOG.info("{} deleted {}", hardDelete ? "Hard" : "Soft", updated.getFullyQualifiedName()); + return new DeleteResponse<>(updated, changeType); + + } finally { + // Always release the lock + if (lock != null && lockManager != null) { + try { + lockManager.releaseDeletionLock(original.getId(), entityType); + LOG.info("Released deletion lock for {} {}", entityType, original.getId()); + } catch (Exception e) { + LOG.error( + "Failed to release deletion lock for {} {}: {}", + entityType, + original.getId(), + e.getMessage()); + } + } } - LOG.info("{} deleted {}", hardDelete ? "Hard" : "Soft", updated.getFullyQualifiedName()); - return new DeleteResponse<>(updated, changeType); } @Transaction @@ -1610,18 +1674,122 @@ public abstract class EntityRepository { @Transaction protected void deleteChildren( List children, boolean hardDelete, String updatedBy) { - for (EntityRelationshipRecord entityRelationshipRecord : children) { - LOG.info( - "Recursively {} deleting {} {}", - hardDelete ? "hard" : "soft", - entityRelationshipRecord.getType(), - entityRelationshipRecord.getId()); - Entity.deleteEntity( - updatedBy, - entityRelationshipRecord.getType(), - entityRelationshipRecord.getId(), - true, - hardDelete); + // Use batch deletion only for hard deletes with large numbers of children + // For soft deletes, we must maintain the correct order for restoration to work properly + if (hardDelete && children.size() > 100) { + LOG.info("Using batch deletion for {} children entities", children.size()); + batchDeleteChildren(children, hardDelete, updatedBy); + } else { + // For soft deletes or small numbers, use original sequential deletion + // This ensures proper parent-child relationships are maintained for restoration + for (EntityRelationshipRecord entityRelationshipRecord : children) { + LOG.info( + "Recursively {} deleting {} {}", + hardDelete ? "hard" : "soft", + entityRelationshipRecord.getType(), + entityRelationshipRecord.getId()); + Entity.deleteEntity( + updatedBy, + entityRelationshipRecord.getType(), + entityRelationshipRecord.getId(), + true, + hardDelete); + } + } + } + + /** + * Batch deletion of children entities for improved performance + */ + @Transaction + protected void batchDeleteChildren( + List children, boolean hardDelete, String updatedBy) { + + // Group entities by type for batch processing + Map> entitiesByType = + children.stream() + .collect( + Collectors.groupingBy( + EntityRelationshipRecord::getType, + Collectors.mapping(EntityRelationshipRecord::getId, Collectors.toList()))); + + LOG.info("Batch deleting {} entities across {} types", children.size(), entitiesByType.size()); + + // Process deletion in levels to handle cascading properly + for (Map.Entry> entry : entitiesByType.entrySet()) { + String childEntityType = entry.getKey(); + List entityIds = entry.getValue(); + + LOG.info("Batch processing {} entities of type {}", entityIds.size(), childEntityType); + + // Process in smaller batches to avoid overwhelming the system + int batchSize = 50; + for (int i = 0; i < entityIds.size(); i += batchSize) { + List batch = entityIds.subList(i, Math.min(i + batchSize, entityIds.size())); + processDeletionBatch(batch, childEntityType, hardDelete, updatedBy); + } + } + } + + /** + * Process a batch of entities for deletion + */ + @Transaction + private void processDeletionBatch( + List entityIds, String entityType, boolean hardDelete, String updatedBy) { + + LOG.debug("Processing batch of {} {} entities", entityIds.size(), entityType); + + // First, collect all grandchildren that need to be deleted + List allGrandchildren = new ArrayList<>(); + for (UUID entityId : entityIds) { + List grandchildren = + daoCollection + .relationshipDAO() + .findTo( + entityId, + entityType, + List.of(Relationship.CONTAINS.ordinal(), Relationship.PARENT_OF.ordinal())); + allGrandchildren.addAll(grandchildren); + } + + // Recursively delete grandchildren first + if (!allGrandchildren.isEmpty()) { + LOG.info("Found {} grandchildren to delete first", allGrandchildren.size()); + deleteChildren(allGrandchildren, hardDelete, updatedBy); + } + + // Now batch delete the entities at this level + List stringIds = entityIds.stream().map(UUID::toString).collect(Collectors.toList()); + + // Only delete relationships for hard delete + // For soft delete, relationships must be preserved for restoration + if (hardDelete) { + // Batch delete relationships for all entities + daoCollection.relationshipDAO().batchDeleteFrom(stringIds, entityType); + daoCollection.relationshipDAO().batchDeleteTo(stringIds, entityType); + } + + // Delete or soft-delete the entities themselves + for (UUID entityId : entityIds) { + try { + @SuppressWarnings("rawtypes") + EntityRepository repository = Entity.getEntityRepository(entityType); + if (repository.supportsSoftDelete && !hardDelete) { + // Soft delete + EntityInterface entity = repository.find(entityId, Include.ALL); + entity.setUpdatedBy(updatedBy); + entity.setUpdatedAt(System.currentTimeMillis()); + entity.setDeleted(true); + repository.dao.update(entity); + } else { + // Hard delete + EntityInterface entity = repository.find(entityId, Include.ALL); + repository.cleanup(entity); + } + } catch (Exception e) { + LOG.error("Error deleting entity {} of type {}: {}", entityId, entityType, e.getMessage()); + } } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/lock/HierarchicalLockManager.java b/openmetadata-service/src/main/java/org/openmetadata/service/lock/HierarchicalLockManager.java new file mode 100644 index 00000000000..28759e12688 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/lock/HierarchicalLockManager.java @@ -0,0 +1,308 @@ +package org.openmetadata.service.lock; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.*; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.jdbi.v3.sqlobject.transaction.Transaction; +import org.openmetadata.schema.EntityInterface; +import org.openmetadata.service.Entity; +import org.openmetadata.service.exception.EntityLockedException; +import org.openmetadata.service.jdbi3.DeletionLock; +import org.openmetadata.service.jdbi3.DeletionLockDAO; + +/** + * Manages hierarchical locks for entity deletion to prevent orphaned entities + * during cascade deletion operations. + */ +@Slf4j +public class HierarchicalLockManager { + + private final DeletionLockDAO lockDAO; + + // Cache of locked FQN prefixes for fast lookup + private final Cache> fqnCache = + Caffeine.newBuilder().maximumSize(10000).expireAfterWrite(5, TimeUnit.MINUTES).build(); + + // Configuration + private static final int DEFAULT_LOCK_TIMEOUT_HOURS = 1; + private static final int STALE_LOCK_CHECK_INTERVAL_MINUTES = 5; + + public HierarchicalLockManager(DeletionLockDAO lockDAO) { + this.lockDAO = lockDAO; + } + + /** + * Acquire deletion lock for an entity and all its children + */ + @Transaction + public DeletionLock acquireDeletionLock( + EntityInterface entity, String lockedBy, boolean cascade) { + // Check if any parent is already locked + checkParentLocks(entity); + + // Check if this entity is already locked + String entityType = Entity.getEntityTypeFromObject(entity); + DeletionLock existingLock = lockDAO.findByEntity(entity.getId(), entityType); + if (existingLock != null) { + throw new EntityLockedException( + String.format( + "Entity %s is already locked for deletion by %s at %s", + entity.getFullyQualifiedName(), + existingLock.getLockedBy(), + existingLock.getLockedAt())); + } + + // Create lock entry + DeletionLock lock = + DeletionLock.builder() + .id(UUID.randomUUID()) + .entityId(entity.getId()) + .entityType(entityType) + .entityFqn(entity.getFullyQualifiedName()) + .lockType(DeletionLock.LockType.DELETE_IN_PROGRESS.getValue()) + .lockedBy(lockedBy) + .lockedAt(Instant.now()) + .deletionScope( + cascade + ? DeletionLock.DeletionScope.CASCADE.getValue() + : DeletionLock.DeletionScope.ENTITY_ONLY.getValue()) + .expectedCompletion(estimateCompletionTime(entity, cascade)) + .build(); + + try { + lockDAO.insert(lock); + + // Update cache + invalidateFqnCache(entity.getFullyQualifiedName()); + + LOG.info( + "Acquired deletion lock for entity {} by {}", entity.getFullyQualifiedName(), lockedBy); + return lock; + + } catch (Exception e) { + LOG.error( + "Failed to acquire deletion lock for entity {}: {}", + entity.getFullyQualifiedName(), + e.getMessage()); + throw new RuntimeException("Failed to acquire deletion lock", e); + } + } + + /** + * Check if any parent entity has an active deletion lock + */ + private void checkParentLocks(EntityInterface entity) { + String fqn = entity.getFullyQualifiedName(); + List parentFqns = getParentFqns(fqn); + + for (String parentFqn : parentFqns) { + List locks = lockDAO.findByFqnPrefix(parentFqn); + if (!locks.isEmpty()) { + DeletionLock activeLock = locks.get(0); + throw new EntityLockedException( + String.format( + "Cannot modify entity %s. Parent entity %s is being deleted. " + + "Deletion started at %s, expected completion: %s", + fqn, + activeLock.getEntityFqn(), + activeLock.getLockedAt(), + activeLock.getExpectedCompletion())); + } + } + } + + /** + * Check if entity creation/update is allowed (no parent deletion in progress) + */ + public void checkModificationAllowed(EntityInterface entity) { + String fqn = entity.getFullyQualifiedName(); + + // Fast path: check cache first + if (isFqnLocked(fqn)) { + // Slow path: check database for details + checkParentLocks(entity); + } + } + + /** + * Check if entity creation/update is allowed by FQN + */ + public void checkModificationAllowedByFqn(String fqn) { + // Check if any parent FQN has a lock + List parentLocks = lockDAO.findParentLocks(fqn); + if (!parentLocks.isEmpty()) { + DeletionLock lock = parentLocks.get(0); + throw new EntityLockedException( + String.format( + "Cannot create/modify entity under %s. Parent %s is being deleted. " + + "Started at %s, expected completion: %s", + fqn, lock.getEntityFqn(), lock.getLockedAt(), lock.getExpectedCompletion())); + } + } + + /** + * Release deletion lock + */ + @Transaction + public void releaseDeletionLock(UUID entityId, String entityType) { + DeletionLock lock = lockDAO.findByEntity(entityId, entityType); + if (lock != null) { + lockDAO.delete(lock.getId()); + invalidateFqnCache(lock.getEntityFqn()); + LOG.info( + "Released deletion lock for entity {} (was locked by {})", + lock.getEntityFqn(), + lock.getLockedBy()); + } + } + + /** + * Get deletion lock status for an entity + */ + public DeletionLock getLock(UUID entityId, String entityType) { + return lockDAO.findByEntity(entityId, entityType); + } + + /** + * Get all active deletion locks + */ + public List getAllActiveLocks() { + // This would need a new DAO method to get all locks + return Collections.emptyList(); // Placeholder + } + + /** + * Clean up stale locks (for crashed deletions) + * This method should be called periodically by a background job + */ + public void cleanupStaleLocks() { + Instant staleTime = Instant.now().minus(DEFAULT_LOCK_TIMEOUT_HOURS, ChronoUnit.HOURS); + List staleLocks = lockDAO.findStaleLocks(staleTime); + + for (DeletionLock lock : staleLocks) { + LOG.warn( + "Cleaning up stale deletion lock for entity: {} locked at: {} by: {}", + lock.getEntityFqn(), + lock.getLockedAt(), + lock.getLockedBy()); + releaseDeletionLock(lock.getEntityId(), lock.getEntityType()); + } + + if (!staleLocks.isEmpty()) { + LOG.info("Cleaned up {} stale deletion locks", staleLocks.size()); + } + } + + /** + * Force cleanup of all stale locks older than specified duration + */ + public int forceCleanupStaleLocks(Duration olderThan) { + Instant staleTime = Instant.now().minus(olderThan); + int deleted = lockDAO.deleteStaleLocks(staleTime); + + if (deleted > 0) { + // Clear cache as we don't know which FQNs were affected + fqnCache.invalidateAll(); + LOG.info("Force cleaned up {} stale deletion locks older than {}", deleted, olderThan); + } + + return deleted; + } + + /** + * Get lock statistics + */ + public Map getLockStatistics() { + Map stats = new HashMap<>(); + stats.put("totalActiveLocks", lockDAO.countActiveLocks()); + stats.put("cacheSize", fqnCache.estimatedSize()); + + // Add per-entity-type counts if needed + for (String entityType : Entity.getEntityList()) { + int count = lockDAO.countActiveLocksByType(entityType); + if (count > 0) { + stats.put(entityType + "Locks", count); + } + } + + return stats; + } + + // Helper methods + + /** + * Extract parent FQNs from a given FQN + * For example: "service.database.schema.table" returns: + * ["service", "service.database", "service.database.schema"] + */ + private List getParentFqns(String fqn) { + List parents = new ArrayList<>(); + String[] parts = fqn.split("\\."); + + StringBuilder current = new StringBuilder(); + for (int i = 0; i < parts.length - 1; i++) { + if (i > 0) { + current.append("."); + } + current.append(parts[i]); + parents.add(current.toString()); + } + + return parents; + } + + /** + * Check if an FQN is locked using cache + */ + private boolean isFqnLocked(String fqn) { + Set lockedPrefixes = fqnCache.get("all", k -> loadLockedFqnPrefixes()); + + // Check if any locked prefix matches this FQN + for (String prefix : lockedPrefixes) { + if (fqn.startsWith(prefix + ".") || fqn.equals(prefix)) { + return true; + } + } + + return false; + } + + /** + * Load all locked FQN prefixes from database + */ + private Set loadLockedFqnPrefixes() { + // This would need a DAO method to get all locked FQNs + // For now, return empty set + return new HashSet<>(); + } + + /** + * Invalidate FQN cache + */ + private void invalidateFqnCache(String fqn) { + fqnCache.invalidate("all"); + } + + /** + * Estimate completion time based on entity type and cascade scope + */ + private Instant estimateCompletionTime(EntityInterface entity, boolean cascade) { + // Simple estimation - can be made more sophisticated based on entity counts + int estimatedMinutes = cascade ? 30 : 5; + + // For certain entity types, adjust the estimate + String entityType = Entity.getEntityTypeFromObject(entity); + if (Entity.DATABASE_SERVICE.equals(entityType)) { + estimatedMinutes = cascade ? 120 : 10; // Database services take longer + } else if (Entity.DATABASE.equals(entityType)) { + estimatedMinutes = cascade ? 60 : 5; + } + + return Instant.now().plus(estimatedMinutes, ChronoUnit.MINUTES); + } +} diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/EntityResourceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/EntityResourceTest.java index 6174984fd13..675e76f98f1 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/EntityResourceTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/EntityResourceTest.java @@ -1189,6 +1189,13 @@ public abstract class EntityResourceTest