mirror of
				https://github.com/open-metadata/OpenMetadata.git
				synced 2025-11-04 04:29:13 +00:00 
			
		
		
		
	* Fix #22440: Performance Optimization for Database Service Cascade Deletion * Fix stylecheck * Fix #22440: Performance Optimization for Database Service Cascade Deletion * Fix stylecheck * Fix styling
This commit is contained in:
		
							parent
							
								
									029632f6cf
								
							
						
					
					
						commit
						e3e248ef0a
					
				@ -70,4 +70,34 @@ CREATE TABLE IF NOT EXISTS worksheet_entity (
 | 
			
		||||
    PRIMARY KEY (id),
 | 
			
		||||
    UNIQUE KEY worksheet_entity_fqn_hash (fqnHash),
 | 
			
		||||
    KEY idx_worksheet_spreadsheet_fqn (spreadsheetFqn)
 | 
			
		||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
 | 
			
		||||
 | 
			
		||||
-- Performance optimization indexes for entity_relationship table
 | 
			
		||||
-- These indexes improve cascade deletion performance
 | 
			
		||||
CREATE INDEX idx_entity_rel_from_delete
 | 
			
		||||
ON entity_relationship(fromId, fromEntity, toId, toEntity, relation);
 | 
			
		||||
 | 
			
		||||
CREATE INDEX idx_entity_rel_to_delete 
 | 
			
		||||
ON entity_relationship(toId, toEntity, fromId, fromEntity, relation);
 | 
			
		||||
 | 
			
		||||
-- Index for cascade queries
 | 
			
		||||
CREATE INDEX idx_entity_rel_cascade 
 | 
			
		||||
ON entity_relationship(fromId, relation, toEntity, toId);
 | 
			
		||||
 | 
			
		||||
-- Entity deletion lock table for preventing orphaned entities during cascade deletion
 | 
			
		||||
CREATE TABLE IF NOT EXISTS entity_deletion_lock (
 | 
			
		||||
    id VARCHAR(36) NOT NULL DEFAULT (UUID()),
 | 
			
		||||
    entityId VARCHAR(36) NOT NULL,
 | 
			
		||||
    entityType VARCHAR(256) NOT NULL,
 | 
			
		||||
    entityFqn VARCHAR(2048) NOT NULL,
 | 
			
		||||
    lockType VARCHAR(50) NOT NULL, -- 'DELETE_IN_PROGRESS', 'DELETE_SCHEDULED'
 | 
			
		||||
    lockedBy VARCHAR(256) NOT NULL,
 | 
			
		||||
    lockedAt TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
 | 
			
		||||
    expectedCompletion TIMESTAMP NULL,
 | 
			
		||||
    deletionScope VARCHAR(50), -- 'ENTITY_ONLY', 'CASCADE'
 | 
			
		||||
    metadata JSON,
 | 
			
		||||
    PRIMARY KEY (id),
 | 
			
		||||
    UNIQUE KEY entity_deletion_lock_unique (entityId, entityType),
 | 
			
		||||
    INDEX idx_deletion_lock_fqn (entityFqn(255)),
 | 
			
		||||
    INDEX idx_deletion_lock_time (lockedAt)
 | 
			
		||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
 | 
			
		||||
@ -98,4 +98,39 @@ CREATE INDEX IF NOT EXISTS idx_worksheet_spreadsheet_fqn ON worksheet_entity (sp
 | 
			
		||||
CREATE INDEX IF NOT EXISTS idx_directory_service ON directory_entity ((json -> 'service' ->> 'id'));
 | 
			
		||||
CREATE INDEX IF NOT EXISTS idx_file_directory ON file_entity ((json -> 'directory' ->> 'id'));
 | 
			
		||||
CREATE INDEX IF NOT EXISTS idx_spreadsheet_directory ON spreadsheet_entity ((json -> 'directory' ->> 'id'));
 | 
			
		||||
CREATE INDEX IF NOT EXISTS idx_worksheet_spreadsheet ON worksheet_entity ((json -> 'spreadsheet' ->> 'id'));
 | 
			
		||||
CREATE INDEX IF NOT EXISTS idx_worksheet_spreadsheet ON worksheet_entity ((json -> 'spreadsheet' ->> 'id'));
 | 
			
		||||
 | 
			
		||||
-- Performance optimization indexes for entity_relationship table
 | 
			
		||||
-- These indexes improve cascade deletion performance
 | 
			
		||||
CREATE INDEX IF NOT EXISTS idx_entity_rel_from_delete 
 | 
			
		||||
ON entity_relationship(fromid, fromentity, toid, toentity, relation);
 | 
			
		||||
 | 
			
		||||
CREATE INDEX IF NOT EXISTS idx_entity_rel_to_delete 
 | 
			
		||||
ON entity_relationship(toid, toentity, fromid, fromentity, relation);
 | 
			
		||||
 | 
			
		||||
-- Index for cascade queries (CONTAINS and PARENT_OF relationships only)
 | 
			
		||||
-- PostgreSQL supports partial indexes
 | 
			
		||||
CREATE INDEX IF NOT EXISTS idx_entity_rel_cascade 
 | 
			
		||||
ON entity_relationship(fromid, relation, toentity, toid)
 | 
			
		||||
WHERE relation IN (0, 8);
 | 
			
		||||
 | 
			
		||||
-- Entity deletion lock table for preventing orphaned entities during cascade deletion
 | 
			
		||||
CREATE TABLE IF NOT EXISTS entity_deletion_lock (
 | 
			
		||||
    id UUID NOT NULL DEFAULT gen_random_uuid(),
 | 
			
		||||
    entityId UUID NOT NULL,
 | 
			
		||||
    entityType VARCHAR(256) NOT NULL,
 | 
			
		||||
    entityFqn VARCHAR(2048) NOT NULL,
 | 
			
		||||
    lockType VARCHAR(50) NOT NULL, -- 'DELETE_IN_PROGRESS', 'DELETE_SCHEDULED'
 | 
			
		||||
    lockedBy VARCHAR(256) NOT NULL,
 | 
			
		||||
    lockedAt TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
 | 
			
		||||
    expectedCompletion TIMESTAMP NULL,
 | 
			
		||||
    deletionScope VARCHAR(50), -- 'ENTITY_ONLY', 'CASCADE'
 | 
			
		||||
    metadata JSONB,
 | 
			
		||||
    PRIMARY KEY (id),
 | 
			
		||||
    UNIQUE (entityId, entityType)
 | 
			
		||||
);
 | 
			
		||||
 | 
			
		||||
-- Create indexes for deletion lock table
 | 
			
		||||
-- Use btree index for entityFqn prefix matching
 | 
			
		||||
CREATE INDEX IF NOT EXISTS idx_deletion_lock_fqn ON entity_deletion_lock(entityFqn);
 | 
			
		||||
CREATE INDEX IF NOT EXISTS idx_deletion_lock_time ON entity_deletion_lock(lockedAt);
 | 
			
		||||
@ -16,6 +16,7 @@ package org.openmetadata.service.cache;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.openmetadata.service.jdbi3.CollectionDAO;
 | 
			
		||||
import org.openmetadata.service.jdbi3.CollectionDAO.*;
 | 
			
		||||
import org.openmetadata.service.jdbi3.DeletionLockDAO;
 | 
			
		||||
import org.openmetadata.service.jdbi3.IndexMappingVersionDAO;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
@ -463,6 +464,11 @@ public class CachedCollectionDAO implements CollectionDAO {
 | 
			
		||||
    return delegate.worksheetDAO();
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  @Override
 | 
			
		||||
  public DeletionLockDAO deletionLockDAO() {
 | 
			
		||||
    return delegate.deletionLockDAO();
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  @Override
 | 
			
		||||
  public IndexMappingVersionDAO indexMappingVersionDAO() {
 | 
			
		||||
    return delegate.indexMappingVersionDAO();
 | 
			
		||||
 | 
			
		||||
@ -718,4 +718,45 @@ public class CachedEntityRelationshipDAO implements CollectionDAO.EntityRelation
 | 
			
		||||
      LOG.debug("Prefetch failed for {} ({}): {}", entityId, entityType, e.getMessage());
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  // Delegate new batch deletion methods to the underlying implementation
 | 
			
		||||
  @Override
 | 
			
		||||
  public void deleteAllFrom(UUID id, String entity) {
 | 
			
		||||
    delegate.deleteAllFrom(id, entity);
 | 
			
		||||
    evictEntityFromCache(id, entity);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  @Override
 | 
			
		||||
  public void deleteAllTo(UUID id, String entity) {
 | 
			
		||||
    delegate.deleteAllTo(id, entity);
 | 
			
		||||
    evictEntityFromCache(id, entity);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  @Override
 | 
			
		||||
  public void batchDeleteFrom(List<String> ids, String entityType) {
 | 
			
		||||
    delegate.batchDeleteFrom(ids, entityType);
 | 
			
		||||
    // Evict cache for all entities in the batch
 | 
			
		||||
    for (String idStr : ids) {
 | 
			
		||||
      try {
 | 
			
		||||
        UUID id = UUID.fromString(idStr);
 | 
			
		||||
        evictEntityFromCache(id, entityType);
 | 
			
		||||
      } catch (IllegalArgumentException e) {
 | 
			
		||||
        LOG.warn("Invalid UUID in batch delete: {}", idStr);
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  @Override
 | 
			
		||||
  public void batchDeleteTo(List<String> ids, String entityType) {
 | 
			
		||||
    delegate.batchDeleteTo(ids, entityType);
 | 
			
		||||
    // Evict cache for all entities in the batch
 | 
			
		||||
    for (String idStr : ids) {
 | 
			
		||||
      try {
 | 
			
		||||
        UUID id = UUID.fromString(idStr);
 | 
			
		||||
        evictEntityFromCache(id, entityType);
 | 
			
		||||
      } catch (IllegalArgumentException e) {
 | 
			
		||||
        LOG.warn("Invalid UUID in batch delete: {}", idStr);
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,34 @@
 | 
			
		||||
package org.openmetadata.service.exception;
 | 
			
		||||
 | 
			
		||||
import jakarta.ws.rs.core.Response;
 | 
			
		||||
import org.openmetadata.sdk.exception.WebServiceException;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * Exception thrown when an entity operation is blocked due to an active deletion lock
 | 
			
		||||
 * on the entity or its parent.
 | 
			
		||||
 */
 | 
			
		||||
public class EntityLockedException extends WebServiceException {
 | 
			
		||||
 | 
			
		||||
  private static final String ERROR_TYPE = "ENTITY_LOCKED";
 | 
			
		||||
 | 
			
		||||
  public EntityLockedException(String message) {
 | 
			
		||||
    super(Response.Status.CONFLICT, ERROR_TYPE, message);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public EntityLockedException(String message, Throwable cause) {
 | 
			
		||||
    super(Response.Status.CONFLICT, ERROR_TYPE, message, cause);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public static EntityLockedException byMessage(
 | 
			
		||||
      String entityType, String entityName, String reason) {
 | 
			
		||||
    return new EntityLockedException(
 | 
			
		||||
        String.format("Cannot modify %s '%s': %s", entityType, entityName, reason));
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public static EntityLockedException parentBeingDeleted(String entityFqn, String parentFqn) {
 | 
			
		||||
    return new EntityLockedException(
 | 
			
		||||
        String.format(
 | 
			
		||||
            "Cannot create or modify entity '%s' because parent '%s' is being deleted",
 | 
			
		||||
            entityFqn, parentFqn));
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,71 @@
 | 
			
		||||
package org.openmetadata.service.initialization;
 | 
			
		||||
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.openmetadata.service.Entity;
 | 
			
		||||
import org.openmetadata.service.jdbi3.EntityRepository;
 | 
			
		||||
import org.openmetadata.service.lock.HierarchicalLockManager;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * Initializes the hierarchical lock manager for OpenMetadata entity deletion optimization.
 | 
			
		||||
 * This should be called during application startup.
 | 
			
		||||
 */
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class LockManagerInitializer {
 | 
			
		||||
 | 
			
		||||
  private static volatile boolean initialized = false;
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Initialize the lock manager. This method is safe to call multiple times.
 | 
			
		||||
   */
 | 
			
		||||
  public static void initialize() {
 | 
			
		||||
    if (initialized) {
 | 
			
		||||
      return;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    synchronized (LockManagerInitializer.class) {
 | 
			
		||||
      if (initialized) {
 | 
			
		||||
        return;
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      try {
 | 
			
		||||
        LOG.info("Initializing hierarchical lock manager for entity deletion optimization");
 | 
			
		||||
 | 
			
		||||
        // Get the collection DAO
 | 
			
		||||
        var collectionDAO = Entity.getCollectionDAO();
 | 
			
		||||
        if (collectionDAO == null) {
 | 
			
		||||
          LOG.warn("CollectionDAO not available, skipping lock manager initialization");
 | 
			
		||||
          return;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // Initialize the lock manager
 | 
			
		||||
        HierarchicalLockManager lockManager =
 | 
			
		||||
            new HierarchicalLockManager(collectionDAO.deletionLockDAO());
 | 
			
		||||
 | 
			
		||||
        // Set it on EntityRepository
 | 
			
		||||
        EntityRepository.setLockManager(lockManager);
 | 
			
		||||
 | 
			
		||||
        initialized = true;
 | 
			
		||||
        LOG.info("Hierarchical lock manager initialized successfully");
 | 
			
		||||
 | 
			
		||||
      } catch (Exception e) {
 | 
			
		||||
        LOG.error("Failed to initialize hierarchical lock manager: {}", e.getMessage(), e);
 | 
			
		||||
        // Continue without locking for backward compatibility
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Check if the lock manager is initialized
 | 
			
		||||
   */
 | 
			
		||||
  public static boolean isInitialized() {
 | 
			
		||||
    return initialized;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Force re-initialization (for testing)
 | 
			
		||||
   */
 | 
			
		||||
  public static void forceReinitialize() {
 | 
			
		||||
    initialized = false;
 | 
			
		||||
    initialize();
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
@ -415,6 +415,9 @@ public interface CollectionDAO {
 | 
			
		||||
  @CreateSqlObject
 | 
			
		||||
  WorkflowInstanceStateTimeSeriesDAO workflowInstanceStateTimeSeriesDAO();
 | 
			
		||||
 | 
			
		||||
  @CreateSqlObject
 | 
			
		||||
  DeletionLockDAO deletionLockDAO();
 | 
			
		||||
 | 
			
		||||
  interface DashboardDAO extends EntityDAO<Dashboard> {
 | 
			
		||||
    @Override
 | 
			
		||||
    default String getTableName() {
 | 
			
		||||
@ -1606,10 +1609,47 @@ public interface CollectionDAO {
 | 
			
		||||
        @Bind("toEntity") String toEntity,
 | 
			
		||||
        @Bind("relation") int relation);
 | 
			
		||||
 | 
			
		||||
    // Optimized deleteAll implementation that splits OR query for better performance
 | 
			
		||||
    @Transaction
 | 
			
		||||
    default void deleteAll(UUID id, String entity) {
 | 
			
		||||
      // Split OR query into two separate deletes for better index usage
 | 
			
		||||
      deleteAllFrom(id, entity);
 | 
			
		||||
      deleteAllTo(id, entity);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @SqlUpdate("DELETE FROM entity_relationship WHERE fromId = :id AND fromEntity = :entity")
 | 
			
		||||
    void deleteAllFrom(@BindUUID("id") UUID id, @Bind("entity") String entity);
 | 
			
		||||
 | 
			
		||||
    @SqlUpdate("DELETE FROM entity_relationship WHERE toId = :id AND toEntity = :entity")
 | 
			
		||||
    void deleteAllTo(@BindUUID("id") UUID id, @Bind("entity") String entity);
 | 
			
		||||
 | 
			
		||||
    // Batch deletion methods for improved performance
 | 
			
		||||
    @Transaction
 | 
			
		||||
    default void batchDeleteRelationships(List<UUID> entityIds, String entityType) {
 | 
			
		||||
      if (entityIds == null || entityIds.isEmpty()) {
 | 
			
		||||
        return;
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      // Process in chunks of 500 to avoid hitting database query limits
 | 
			
		||||
      int batchSize = 500;
 | 
			
		||||
      for (int i = 0; i < entityIds.size(); i += batchSize) {
 | 
			
		||||
        int endIndex = Math.min(i + batchSize, entityIds.size());
 | 
			
		||||
        List<String> batch =
 | 
			
		||||
            entityIds.subList(i, endIndex).stream()
 | 
			
		||||
                .map(UUID::toString)
 | 
			
		||||
                .collect(Collectors.toList());
 | 
			
		||||
 | 
			
		||||
        batchDeleteFrom(batch, entityType);
 | 
			
		||||
        batchDeleteTo(batch, entityType);
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @SqlUpdate(
 | 
			
		||||
        "DELETE from entity_relationship WHERE (toId = :id AND toEntity = :entity) OR "
 | 
			
		||||
            + "(fromId = :id AND fromEntity = :entity)")
 | 
			
		||||
    void deleteAll(@BindUUID("id") UUID id, @Bind("entity") String entity);
 | 
			
		||||
        "DELETE FROM entity_relationship WHERE fromId IN (<ids>) AND fromEntity = :entityType")
 | 
			
		||||
    void batchDeleteFrom(@BindList("ids") List<String> ids, @Bind("entityType") String entityType);
 | 
			
		||||
 | 
			
		||||
    @SqlUpdate("DELETE FROM entity_relationship WHERE toId IN (<ids>) AND toEntity = :entityType")
 | 
			
		||||
    void batchDeleteTo(@BindList("ids") List<String> ids, @Bind("entityType") String entityType);
 | 
			
		||||
 | 
			
		||||
    @SqlUpdate(
 | 
			
		||||
        "DELETE FROM entity_relationship "
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,114 @@
 | 
			
		||||
package org.openmetadata.service.jdbi3;
 | 
			
		||||
 | 
			
		||||
import com.fasterxml.jackson.annotation.JsonInclude;
 | 
			
		||||
import com.fasterxml.jackson.databind.JsonNode;
 | 
			
		||||
import java.time.Instant;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import lombok.AllArgsConstructor;
 | 
			
		||||
import lombok.Builder;
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
import lombok.NoArgsConstructor;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * Entity representing a deletion lock that prevents concurrent modifications
 | 
			
		||||
 * during cascade deletion operations.
 | 
			
		||||
 */
 | 
			
		||||
@Data
 | 
			
		||||
@Builder
 | 
			
		||||
@NoArgsConstructor
 | 
			
		||||
@AllArgsConstructor
 | 
			
		||||
@JsonInclude(JsonInclude.Include.NON_NULL)
 | 
			
		||||
public class DeletionLock {
 | 
			
		||||
 | 
			
		||||
  private UUID id;
 | 
			
		||||
  private UUID entityId;
 | 
			
		||||
  private String entityType;
 | 
			
		||||
  private String entityFqn;
 | 
			
		||||
  private String lockType; // DELETE_IN_PROGRESS, DELETE_SCHEDULED
 | 
			
		||||
  private String lockedBy;
 | 
			
		||||
  private Instant lockedAt;
 | 
			
		||||
  private Instant expectedCompletion;
 | 
			
		||||
  private String deletionScope; // ENTITY_ONLY, CASCADE
 | 
			
		||||
  private JsonNode metadata;
 | 
			
		||||
 | 
			
		||||
  public enum LockType {
 | 
			
		||||
    DELETE_IN_PROGRESS("DELETE_IN_PROGRESS"),
 | 
			
		||||
    DELETE_SCHEDULED("DELETE_SCHEDULED");
 | 
			
		||||
 | 
			
		||||
    private final String value;
 | 
			
		||||
 | 
			
		||||
    LockType(String value) {
 | 
			
		||||
      this.value = value;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public String getValue() {
 | 
			
		||||
      return value;
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public enum DeletionScope {
 | 
			
		||||
    ENTITY_ONLY("ENTITY_ONLY"),
 | 
			
		||||
    CASCADE("CASCADE");
 | 
			
		||||
 | 
			
		||||
    private final String value;
 | 
			
		||||
 | 
			
		||||
    DeletionScope(String value) {
 | 
			
		||||
      this.value = value;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public String getValue() {
 | 
			
		||||
      return value;
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  // Builder helper methods
 | 
			
		||||
  public DeletionLock withId(UUID id) {
 | 
			
		||||
    this.id = id;
 | 
			
		||||
    return this;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public DeletionLock withEntityId(UUID entityId) {
 | 
			
		||||
    this.entityId = entityId;
 | 
			
		||||
    return this;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public DeletionLock withEntityType(String entityType) {
 | 
			
		||||
    this.entityType = entityType;
 | 
			
		||||
    return this;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public DeletionLock withEntityFqn(String entityFqn) {
 | 
			
		||||
    this.entityFqn = entityFqn;
 | 
			
		||||
    return this;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public DeletionLock withLockType(String lockType) {
 | 
			
		||||
    this.lockType = lockType;
 | 
			
		||||
    return this;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public DeletionLock withLockedBy(String lockedBy) {
 | 
			
		||||
    this.lockedBy = lockedBy;
 | 
			
		||||
    return this;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public DeletionLock withLockedAt(Instant lockedAt) {
 | 
			
		||||
    this.lockedAt = lockedAt;
 | 
			
		||||
    return this;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public DeletionLock withExpectedCompletion(Instant expectedCompletion) {
 | 
			
		||||
    this.expectedCompletion = expectedCompletion;
 | 
			
		||||
    return this;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public DeletionLock withDeletionScope(String deletionScope) {
 | 
			
		||||
    this.deletionScope = deletionScope;
 | 
			
		||||
    return this;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public DeletionLock withMetadata(JsonNode metadata) {
 | 
			
		||||
    this.metadata = metadata;
 | 
			
		||||
    return this;
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,59 @@
 | 
			
		||||
package org.openmetadata.service.jdbi3;
 | 
			
		||||
 | 
			
		||||
import java.time.Instant;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
 | 
			
		||||
import org.jdbi.v3.sqlobject.customizer.Bind;
 | 
			
		||||
import org.jdbi.v3.sqlobject.customizer.BindBean;
 | 
			
		||||
import org.jdbi.v3.sqlobject.statement.SqlQuery;
 | 
			
		||||
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
 | 
			
		||||
import org.openmetadata.service.util.jdbi.BindUUID;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * DAO for managing entity deletion locks to prevent concurrent modifications
 | 
			
		||||
 * during cascade deletion operations.
 | 
			
		||||
 */
 | 
			
		||||
@RegisterRowMapper(DeletionLockMapper.class)
 | 
			
		||||
public interface DeletionLockDAO {
 | 
			
		||||
 | 
			
		||||
  @SqlUpdate(
 | 
			
		||||
      "INSERT INTO entity_deletion_lock (id, entityId, entityType, entityFqn, lockType, "
 | 
			
		||||
          + "lockedBy, lockedAt, expectedCompletion, deletionScope, metadata) "
 | 
			
		||||
          + "VALUES (:id, :entityId, :entityType, :entityFqn, :lockType, "
 | 
			
		||||
          + ":lockedBy, :lockedAt, :expectedCompletion, :deletionScope, :metadata)")
 | 
			
		||||
  void insert(@BindBean DeletionLock lock);
 | 
			
		||||
 | 
			
		||||
  @SqlQuery(
 | 
			
		||||
      "SELECT * FROM entity_deletion_lock WHERE entityId = :entityId AND entityType = :entityType")
 | 
			
		||||
  DeletionLock findByEntity(
 | 
			
		||||
      @BindUUID("entityId") UUID entityId, @Bind("entityType") String entityType);
 | 
			
		||||
 | 
			
		||||
  @SqlQuery("SELECT * FROM entity_deletion_lock WHERE entityFqn LIKE :fqnPrefix || '%'")
 | 
			
		||||
  List<DeletionLock> findByFqnPrefix(@Bind("fqnPrefix") String fqnPrefix);
 | 
			
		||||
 | 
			
		||||
  @SqlQuery(
 | 
			
		||||
      "SELECT * FROM entity_deletion_lock WHERE "
 | 
			
		||||
          + "entityFqn = :fqn OR "
 | 
			
		||||
          + ":fqn LIKE entityFqn || '.%'")
 | 
			
		||||
  List<DeletionLock> findParentLocks(@Bind("fqn") String fqn);
 | 
			
		||||
 | 
			
		||||
  @SqlQuery("SELECT * FROM entity_deletion_lock WHERE lockedAt < :staleTime")
 | 
			
		||||
  List<DeletionLock> findStaleLocks(@Bind("staleTime") Instant staleTime);
 | 
			
		||||
 | 
			
		||||
  @SqlUpdate("DELETE FROM entity_deletion_lock WHERE id = :id")
 | 
			
		||||
  void delete(@BindUUID("id") UUID id);
 | 
			
		||||
 | 
			
		||||
  @SqlUpdate(
 | 
			
		||||
      "DELETE FROM entity_deletion_lock WHERE entityId = :entityId AND entityType = :entityType")
 | 
			
		||||
  void deleteByEntity(@BindUUID("entityId") UUID entityId, @Bind("entityType") String entityType);
 | 
			
		||||
 | 
			
		||||
  @SqlUpdate("DELETE FROM entity_deletion_lock WHERE lockedAt < :staleTime")
 | 
			
		||||
  int deleteStaleLocks(@Bind("staleTime") Instant staleTime);
 | 
			
		||||
 | 
			
		||||
  @SqlQuery("SELECT COUNT(*) FROM entity_deletion_lock")
 | 
			
		||||
  int countActiveLocks();
 | 
			
		||||
 | 
			
		||||
  @SqlQuery("SELECT COUNT(*) FROM entity_deletion_lock WHERE entityType = :entityType")
 | 
			
		||||
  int countActiveLocksByType(@Bind("entityType") String entityType);
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,54 @@
 | 
			
		||||
package org.openmetadata.service.jdbi3;
 | 
			
		||||
 | 
			
		||||
import com.fasterxml.jackson.databind.JsonNode;
 | 
			
		||||
import java.sql.ResultSet;
 | 
			
		||||
import java.sql.SQLException;
 | 
			
		||||
import java.sql.Timestamp;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import org.jdbi.v3.core.mapper.RowMapper;
 | 
			
		||||
import org.jdbi.v3.core.statement.StatementContext;
 | 
			
		||||
import org.openmetadata.schema.utils.JsonUtils;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * JDBI Row mapper for DeletionLock entity
 | 
			
		||||
 */
 | 
			
		||||
public class DeletionLockMapper implements RowMapper<DeletionLock> {
 | 
			
		||||
 | 
			
		||||
  @Override
 | 
			
		||||
  public DeletionLock map(ResultSet rs, StatementContext ctx) throws SQLException {
 | 
			
		||||
    DeletionLock lock = new DeletionLock();
 | 
			
		||||
 | 
			
		||||
    lock.setId(UUID.fromString(rs.getString("id")));
 | 
			
		||||
    lock.setEntityId(UUID.fromString(rs.getString("entityId")));
 | 
			
		||||
    lock.setEntityType(rs.getString("entityType"));
 | 
			
		||||
    lock.setEntityFqn(rs.getString("entityFqn"));
 | 
			
		||||
    lock.setLockType(rs.getString("lockType"));
 | 
			
		||||
    lock.setLockedBy(rs.getString("lockedBy"));
 | 
			
		||||
 | 
			
		||||
    // Convert Timestamp to Instant
 | 
			
		||||
    Timestamp lockedAtTs = rs.getTimestamp("lockedAt");
 | 
			
		||||
    if (lockedAtTs != null) {
 | 
			
		||||
      lock.setLockedAt(lockedAtTs.toInstant());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    Timestamp expectedCompletionTs = rs.getTimestamp("expectedCompletion");
 | 
			
		||||
    if (expectedCompletionTs != null) {
 | 
			
		||||
      lock.setExpectedCompletion(expectedCompletionTs.toInstant());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    lock.setDeletionScope(rs.getString("deletionScope"));
 | 
			
		||||
 | 
			
		||||
    // Parse JSON metadata
 | 
			
		||||
    String metadataJson = rs.getString("metadata");
 | 
			
		||||
    if (metadataJson != null) {
 | 
			
		||||
      try {
 | 
			
		||||
        JsonNode metadata = JsonUtils.readTree(metadataJson);
 | 
			
		||||
        lock.setMetadata(metadata);
 | 
			
		||||
      } catch (Exception e) {
 | 
			
		||||
        // Log error but don't fail
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    return lock;
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
@ -182,6 +182,7 @@ import org.openmetadata.service.OpenMetadataApplicationConfig;
 | 
			
		||||
import org.openmetadata.service.TypeRegistry;
 | 
			
		||||
import org.openmetadata.service.events.lifecycle.EntityLifecycleEventDispatcher;
 | 
			
		||||
import org.openmetadata.service.exception.CatalogExceptionMessage;
 | 
			
		||||
import org.openmetadata.service.exception.EntityLockedException;
 | 
			
		||||
import org.openmetadata.service.exception.EntityNotFoundException;
 | 
			
		||||
import org.openmetadata.service.exception.PreconditionFailedException;
 | 
			
		||||
import org.openmetadata.service.exception.UnhandledServerException;
 | 
			
		||||
@ -191,6 +192,7 @@ import org.openmetadata.service.jdbi3.CollectionDAO.ExtensionRecord;
 | 
			
		||||
import org.openmetadata.service.jdbi3.FeedRepository.TaskWorkflow;
 | 
			
		||||
import org.openmetadata.service.jdbi3.FeedRepository.ThreadContext;
 | 
			
		||||
import org.openmetadata.service.jobs.JobDAO;
 | 
			
		||||
import org.openmetadata.service.lock.HierarchicalLockManager;
 | 
			
		||||
import org.openmetadata.service.resources.tags.TagLabelUtil;
 | 
			
		||||
import org.openmetadata.service.resources.teams.RoleResource;
 | 
			
		||||
import org.openmetadata.service.search.SearchListFilter;
 | 
			
		||||
@ -298,6 +300,14 @@ public abstract class EntityRepository<T extends EntityInterface> {
 | 
			
		||||
 | 
			
		||||
  protected final ChangeSummarizer<T> changeSummarizer;
 | 
			
		||||
 | 
			
		||||
  // Lock manager for preventing orphaned entities during cascade deletion
 | 
			
		||||
  private static HierarchicalLockManager lockManager;
 | 
			
		||||
 | 
			
		||||
  // Static setter for lock manager initialization
 | 
			
		||||
  public static void setLockManager(HierarchicalLockManager manager) {
 | 
			
		||||
    lockManager = manager;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public boolean isSupportsOwners() {
 | 
			
		||||
    return supportsOwners;
 | 
			
		||||
  }
 | 
			
		||||
@ -1061,6 +1071,16 @@ public abstract class EntityRepository<T extends EntityInterface> {
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public final T createInternal(T entity) {
 | 
			
		||||
    // Check if parent entity is being deleted
 | 
			
		||||
    if (lockManager != null) {
 | 
			
		||||
      try {
 | 
			
		||||
        lockManager.checkModificationAllowed(entity);
 | 
			
		||||
      } catch (EntityLockedException e) {
 | 
			
		||||
        LOG.warn("Entity creation blocked due to parent deletion: {}", e.getMessage());
 | 
			
		||||
        throw e;
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    prepareInternal(entity, false);
 | 
			
		||||
    return createNewEntity(entity);
 | 
			
		||||
  }
 | 
			
		||||
@ -1127,6 +1147,16 @@ public abstract class EntityRepository<T extends EntityInterface> {
 | 
			
		||||
 | 
			
		||||
  @Transaction
 | 
			
		||||
  public final PutResponse<T> createOrUpdate(UriInfo uriInfo, T updated, String updatedBy) {
 | 
			
		||||
    // Check if parent entity is being deleted
 | 
			
		||||
    if (lockManager != null) {
 | 
			
		||||
      try {
 | 
			
		||||
        lockManager.checkModificationAllowed(updated);
 | 
			
		||||
      } catch (EntityLockedException e) {
 | 
			
		||||
        LOG.warn("Entity update blocked due to parent deletion: {}", e.getMessage());
 | 
			
		||||
        throw e;
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    T original = findByNameOrNull(updated.getFullyQualifiedName(), ALL);
 | 
			
		||||
    if (original == null) { // If an original entity does not exist then create it, else update
 | 
			
		||||
      return new PutResponse<>(
 | 
			
		||||
@ -1539,23 +1569,57 @@ public abstract class EntityRepository<T extends EntityInterface> {
 | 
			
		||||
    checkSystemEntityDeletion(original);
 | 
			
		||||
    preDelete(original, deletedBy);
 | 
			
		||||
    setFieldsInternal(original, putFields);
 | 
			
		||||
    deleteChildren(original.getId(), recursive, hardDelete, deletedBy);
 | 
			
		||||
 | 
			
		||||
    EventType changeType;
 | 
			
		||||
    T updated = get(null, original.getId(), putFields, ALL, false);
 | 
			
		||||
    if (supportsSoftDelete && !hardDelete) {
 | 
			
		||||
      updated.setUpdatedBy(deletedBy);
 | 
			
		||||
      updated.setUpdatedAt(System.currentTimeMillis());
 | 
			
		||||
      updated.setDeleted(true);
 | 
			
		||||
      EntityUpdater updater = getUpdater(original, updated, Operation.SOFT_DELETE, null);
 | 
			
		||||
      updater.update();
 | 
			
		||||
      changeType = ENTITY_SOFT_DELETED;
 | 
			
		||||
    } else {
 | 
			
		||||
      cleanup(updated);
 | 
			
		||||
      changeType = ENTITY_DELETED;
 | 
			
		||||
    // Acquire deletion lock to prevent concurrent modifications
 | 
			
		||||
    DeletionLock lock = null;
 | 
			
		||||
    if (lockManager != null && recursive) {
 | 
			
		||||
      try {
 | 
			
		||||
        lock = lockManager.acquireDeletionLock(original, deletedBy, recursive);
 | 
			
		||||
        LOG.info("Acquired deletion lock for {} {}", entityType, original.getId());
 | 
			
		||||
      } catch (Exception e) {
 | 
			
		||||
        LOG.error(
 | 
			
		||||
            "Failed to acquire deletion lock for {} {}: {}",
 | 
			
		||||
            entityType,
 | 
			
		||||
            original.getId(),
 | 
			
		||||
            e.getMessage());
 | 
			
		||||
        // Continue without lock for backward compatibility
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    try {
 | 
			
		||||
      deleteChildren(original.getId(), recursive, hardDelete, deletedBy);
 | 
			
		||||
 | 
			
		||||
      EventType changeType;
 | 
			
		||||
      T updated = get(null, original.getId(), putFields, ALL, false);
 | 
			
		||||
      if (supportsSoftDelete && !hardDelete) {
 | 
			
		||||
        updated.setUpdatedBy(deletedBy);
 | 
			
		||||
        updated.setUpdatedAt(System.currentTimeMillis());
 | 
			
		||||
        updated.setDeleted(true);
 | 
			
		||||
        EntityUpdater updater = getUpdater(original, updated, Operation.SOFT_DELETE, null);
 | 
			
		||||
        updater.update();
 | 
			
		||||
        changeType = ENTITY_SOFT_DELETED;
 | 
			
		||||
      } else {
 | 
			
		||||
        cleanup(updated);
 | 
			
		||||
        changeType = ENTITY_DELETED;
 | 
			
		||||
      }
 | 
			
		||||
      LOG.info("{} deleted {}", hardDelete ? "Hard" : "Soft", updated.getFullyQualifiedName());
 | 
			
		||||
      return new DeleteResponse<>(updated, changeType);
 | 
			
		||||
 | 
			
		||||
    } finally {
 | 
			
		||||
      // Always release the lock
 | 
			
		||||
      if (lock != null && lockManager != null) {
 | 
			
		||||
        try {
 | 
			
		||||
          lockManager.releaseDeletionLock(original.getId(), entityType);
 | 
			
		||||
          LOG.info("Released deletion lock for {} {}", entityType, original.getId());
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
          LOG.error(
 | 
			
		||||
              "Failed to release deletion lock for {} {}: {}",
 | 
			
		||||
              entityType,
 | 
			
		||||
              original.getId(),
 | 
			
		||||
              e.getMessage());
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
    LOG.info("{} deleted {}", hardDelete ? "Hard" : "Soft", updated.getFullyQualifiedName());
 | 
			
		||||
    return new DeleteResponse<>(updated, changeType);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  @Transaction
 | 
			
		||||
@ -1610,18 +1674,122 @@ public abstract class EntityRepository<T extends EntityInterface> {
 | 
			
		||||
  @Transaction
 | 
			
		||||
  protected void deleteChildren(
 | 
			
		||||
      List<EntityRelationshipRecord> children, boolean hardDelete, String updatedBy) {
 | 
			
		||||
    for (EntityRelationshipRecord entityRelationshipRecord : children) {
 | 
			
		||||
      LOG.info(
 | 
			
		||||
          "Recursively {} deleting {} {}",
 | 
			
		||||
          hardDelete ? "hard" : "soft",
 | 
			
		||||
          entityRelationshipRecord.getType(),
 | 
			
		||||
          entityRelationshipRecord.getId());
 | 
			
		||||
      Entity.deleteEntity(
 | 
			
		||||
          updatedBy,
 | 
			
		||||
          entityRelationshipRecord.getType(),
 | 
			
		||||
          entityRelationshipRecord.getId(),
 | 
			
		||||
          true,
 | 
			
		||||
          hardDelete);
 | 
			
		||||
    // Use batch deletion only for hard deletes with large numbers of children
 | 
			
		||||
    // For soft deletes, we must maintain the correct order for restoration to work properly
 | 
			
		||||
    if (hardDelete && children.size() > 100) {
 | 
			
		||||
      LOG.info("Using batch deletion for {} children entities", children.size());
 | 
			
		||||
      batchDeleteChildren(children, hardDelete, updatedBy);
 | 
			
		||||
    } else {
 | 
			
		||||
      // For soft deletes or small numbers, use original sequential deletion
 | 
			
		||||
      // This ensures proper parent-child relationships are maintained for restoration
 | 
			
		||||
      for (EntityRelationshipRecord entityRelationshipRecord : children) {
 | 
			
		||||
        LOG.info(
 | 
			
		||||
            "Recursively {} deleting {} {}",
 | 
			
		||||
            hardDelete ? "hard" : "soft",
 | 
			
		||||
            entityRelationshipRecord.getType(),
 | 
			
		||||
            entityRelationshipRecord.getId());
 | 
			
		||||
        Entity.deleteEntity(
 | 
			
		||||
            updatedBy,
 | 
			
		||||
            entityRelationshipRecord.getType(),
 | 
			
		||||
            entityRelationshipRecord.getId(),
 | 
			
		||||
            true,
 | 
			
		||||
            hardDelete);
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Batch deletion of children entities for improved performance
 | 
			
		||||
   */
 | 
			
		||||
  @Transaction
 | 
			
		||||
  protected void batchDeleteChildren(
 | 
			
		||||
      List<EntityRelationshipRecord> children, boolean hardDelete, String updatedBy) {
 | 
			
		||||
 | 
			
		||||
    // Group entities by type for batch processing
 | 
			
		||||
    Map<String, List<UUID>> entitiesByType =
 | 
			
		||||
        children.stream()
 | 
			
		||||
            .collect(
 | 
			
		||||
                Collectors.groupingBy(
 | 
			
		||||
                    EntityRelationshipRecord::getType,
 | 
			
		||||
                    Collectors.mapping(EntityRelationshipRecord::getId, Collectors.toList())));
 | 
			
		||||
 | 
			
		||||
    LOG.info("Batch deleting {} entities across {} types", children.size(), entitiesByType.size());
 | 
			
		||||
 | 
			
		||||
    // Process deletion in levels to handle cascading properly
 | 
			
		||||
    for (Map.Entry<String, List<UUID>> entry : entitiesByType.entrySet()) {
 | 
			
		||||
      String childEntityType = entry.getKey();
 | 
			
		||||
      List<UUID> entityIds = entry.getValue();
 | 
			
		||||
 | 
			
		||||
      LOG.info("Batch processing {} entities of type {}", entityIds.size(), childEntityType);
 | 
			
		||||
 | 
			
		||||
      // Process in smaller batches to avoid overwhelming the system
 | 
			
		||||
      int batchSize = 50;
 | 
			
		||||
      for (int i = 0; i < entityIds.size(); i += batchSize) {
 | 
			
		||||
        List<UUID> batch = entityIds.subList(i, Math.min(i + batchSize, entityIds.size()));
 | 
			
		||||
        processDeletionBatch(batch, childEntityType, hardDelete, updatedBy);
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Process a batch of entities for deletion
 | 
			
		||||
   */
 | 
			
		||||
  @Transaction
 | 
			
		||||
  private void processDeletionBatch(
 | 
			
		||||
      List<UUID> entityIds, String entityType, boolean hardDelete, String updatedBy) {
 | 
			
		||||
 | 
			
		||||
    LOG.debug("Processing batch of {} {} entities", entityIds.size(), entityType);
 | 
			
		||||
 | 
			
		||||
    // First, collect all grandchildren that need to be deleted
 | 
			
		||||
    List<EntityRelationshipRecord> allGrandchildren = new ArrayList<>();
 | 
			
		||||
    for (UUID entityId : entityIds) {
 | 
			
		||||
      List<EntityRelationshipRecord> grandchildren =
 | 
			
		||||
          daoCollection
 | 
			
		||||
              .relationshipDAO()
 | 
			
		||||
              .findTo(
 | 
			
		||||
                  entityId,
 | 
			
		||||
                  entityType,
 | 
			
		||||
                  List.of(Relationship.CONTAINS.ordinal(), Relationship.PARENT_OF.ordinal()));
 | 
			
		||||
      allGrandchildren.addAll(grandchildren);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // Recursively delete grandchildren first
 | 
			
		||||
    if (!allGrandchildren.isEmpty()) {
 | 
			
		||||
      LOG.info("Found {} grandchildren to delete first", allGrandchildren.size());
 | 
			
		||||
      deleteChildren(allGrandchildren, hardDelete, updatedBy);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // Now batch delete the entities at this level
 | 
			
		||||
    List<String> stringIds = entityIds.stream().map(UUID::toString).collect(Collectors.toList());
 | 
			
		||||
 | 
			
		||||
    // Only delete relationships for hard delete
 | 
			
		||||
    // For soft delete, relationships must be preserved for restoration
 | 
			
		||||
    if (hardDelete) {
 | 
			
		||||
      // Batch delete relationships for all entities
 | 
			
		||||
      daoCollection.relationshipDAO().batchDeleteFrom(stringIds, entityType);
 | 
			
		||||
      daoCollection.relationshipDAO().batchDeleteTo(stringIds, entityType);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // Delete or soft-delete the entities themselves
 | 
			
		||||
    for (UUID entityId : entityIds) {
 | 
			
		||||
      try {
 | 
			
		||||
        @SuppressWarnings("rawtypes")
 | 
			
		||||
        EntityRepository repository = Entity.getEntityRepository(entityType);
 | 
			
		||||
        if (repository.supportsSoftDelete && !hardDelete) {
 | 
			
		||||
          // Soft delete
 | 
			
		||||
          EntityInterface entity = repository.find(entityId, Include.ALL);
 | 
			
		||||
          entity.setUpdatedBy(updatedBy);
 | 
			
		||||
          entity.setUpdatedAt(System.currentTimeMillis());
 | 
			
		||||
          entity.setDeleted(true);
 | 
			
		||||
          repository.dao.update(entity);
 | 
			
		||||
        } else {
 | 
			
		||||
          // Hard delete
 | 
			
		||||
          EntityInterface entity = repository.find(entityId, Include.ALL);
 | 
			
		||||
          repository.cleanup(entity);
 | 
			
		||||
        }
 | 
			
		||||
      } catch (Exception e) {
 | 
			
		||||
        LOG.error("Error deleting entity {} of type {}: {}", entityId, entityType, e.getMessage());
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,308 @@
 | 
			
		||||
package org.openmetadata.service.lock;
 | 
			
		||||
 | 
			
		||||
import com.github.benmanes.caffeine.cache.Cache;
 | 
			
		||||
import com.github.benmanes.caffeine.cache.Caffeine;
 | 
			
		||||
import java.time.Duration;
 | 
			
		||||
import java.time.Instant;
 | 
			
		||||
import java.time.temporal.ChronoUnit;
 | 
			
		||||
import java.util.*;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.jdbi.v3.sqlobject.transaction.Transaction;
 | 
			
		||||
import org.openmetadata.schema.EntityInterface;
 | 
			
		||||
import org.openmetadata.service.Entity;
 | 
			
		||||
import org.openmetadata.service.exception.EntityLockedException;
 | 
			
		||||
import org.openmetadata.service.jdbi3.DeletionLock;
 | 
			
		||||
import org.openmetadata.service.jdbi3.DeletionLockDAO;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * Manages hierarchical locks for entity deletion to prevent orphaned entities
 | 
			
		||||
 * during cascade deletion operations.
 | 
			
		||||
 */
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class HierarchicalLockManager {
 | 
			
		||||
 | 
			
		||||
  private final DeletionLockDAO lockDAO;
 | 
			
		||||
 | 
			
		||||
  // Cache of locked FQN prefixes for fast lookup
 | 
			
		||||
  private final Cache<String, Set<String>> fqnCache =
 | 
			
		||||
      Caffeine.newBuilder().maximumSize(10000).expireAfterWrite(5, TimeUnit.MINUTES).build();
 | 
			
		||||
 | 
			
		||||
  // Configuration
 | 
			
		||||
  private static final int DEFAULT_LOCK_TIMEOUT_HOURS = 1;
 | 
			
		||||
  private static final int STALE_LOCK_CHECK_INTERVAL_MINUTES = 5;
 | 
			
		||||
 | 
			
		||||
  public HierarchicalLockManager(DeletionLockDAO lockDAO) {
 | 
			
		||||
    this.lockDAO = lockDAO;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Acquire deletion lock for an entity and all its children
 | 
			
		||||
   */
 | 
			
		||||
  @Transaction
 | 
			
		||||
  public DeletionLock acquireDeletionLock(
 | 
			
		||||
      EntityInterface entity, String lockedBy, boolean cascade) {
 | 
			
		||||
    // Check if any parent is already locked
 | 
			
		||||
    checkParentLocks(entity);
 | 
			
		||||
 | 
			
		||||
    // Check if this entity is already locked
 | 
			
		||||
    String entityType = Entity.getEntityTypeFromObject(entity);
 | 
			
		||||
    DeletionLock existingLock = lockDAO.findByEntity(entity.getId(), entityType);
 | 
			
		||||
    if (existingLock != null) {
 | 
			
		||||
      throw new EntityLockedException(
 | 
			
		||||
          String.format(
 | 
			
		||||
              "Entity %s is already locked for deletion by %s at %s",
 | 
			
		||||
              entity.getFullyQualifiedName(),
 | 
			
		||||
              existingLock.getLockedBy(),
 | 
			
		||||
              existingLock.getLockedAt()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // Create lock entry
 | 
			
		||||
    DeletionLock lock =
 | 
			
		||||
        DeletionLock.builder()
 | 
			
		||||
            .id(UUID.randomUUID())
 | 
			
		||||
            .entityId(entity.getId())
 | 
			
		||||
            .entityType(entityType)
 | 
			
		||||
            .entityFqn(entity.getFullyQualifiedName())
 | 
			
		||||
            .lockType(DeletionLock.LockType.DELETE_IN_PROGRESS.getValue())
 | 
			
		||||
            .lockedBy(lockedBy)
 | 
			
		||||
            .lockedAt(Instant.now())
 | 
			
		||||
            .deletionScope(
 | 
			
		||||
                cascade
 | 
			
		||||
                    ? DeletionLock.DeletionScope.CASCADE.getValue()
 | 
			
		||||
                    : DeletionLock.DeletionScope.ENTITY_ONLY.getValue())
 | 
			
		||||
            .expectedCompletion(estimateCompletionTime(entity, cascade))
 | 
			
		||||
            .build();
 | 
			
		||||
 | 
			
		||||
    try {
 | 
			
		||||
      lockDAO.insert(lock);
 | 
			
		||||
 | 
			
		||||
      // Update cache
 | 
			
		||||
      invalidateFqnCache(entity.getFullyQualifiedName());
 | 
			
		||||
 | 
			
		||||
      LOG.info(
 | 
			
		||||
          "Acquired deletion lock for entity {} by {}", entity.getFullyQualifiedName(), lockedBy);
 | 
			
		||||
      return lock;
 | 
			
		||||
 | 
			
		||||
    } catch (Exception e) {
 | 
			
		||||
      LOG.error(
 | 
			
		||||
          "Failed to acquire deletion lock for entity {}: {}",
 | 
			
		||||
          entity.getFullyQualifiedName(),
 | 
			
		||||
          e.getMessage());
 | 
			
		||||
      throw new RuntimeException("Failed to acquire deletion lock", e);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Check if any parent entity has an active deletion lock
 | 
			
		||||
   */
 | 
			
		||||
  private void checkParentLocks(EntityInterface entity) {
 | 
			
		||||
    String fqn = entity.getFullyQualifiedName();
 | 
			
		||||
    List<String> parentFqns = getParentFqns(fqn);
 | 
			
		||||
 | 
			
		||||
    for (String parentFqn : parentFqns) {
 | 
			
		||||
      List<DeletionLock> locks = lockDAO.findByFqnPrefix(parentFqn);
 | 
			
		||||
      if (!locks.isEmpty()) {
 | 
			
		||||
        DeletionLock activeLock = locks.get(0);
 | 
			
		||||
        throw new EntityLockedException(
 | 
			
		||||
            String.format(
 | 
			
		||||
                "Cannot modify entity %s. Parent entity %s is being deleted. "
 | 
			
		||||
                    + "Deletion started at %s, expected completion: %s",
 | 
			
		||||
                fqn,
 | 
			
		||||
                activeLock.getEntityFqn(),
 | 
			
		||||
                activeLock.getLockedAt(),
 | 
			
		||||
                activeLock.getExpectedCompletion()));
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Check if entity creation/update is allowed (no parent deletion in progress)
 | 
			
		||||
   */
 | 
			
		||||
  public void checkModificationAllowed(EntityInterface entity) {
 | 
			
		||||
    String fqn = entity.getFullyQualifiedName();
 | 
			
		||||
 | 
			
		||||
    // Fast path: check cache first
 | 
			
		||||
    if (isFqnLocked(fqn)) {
 | 
			
		||||
      // Slow path: check database for details
 | 
			
		||||
      checkParentLocks(entity);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Check if entity creation/update is allowed by FQN
 | 
			
		||||
   */
 | 
			
		||||
  public void checkModificationAllowedByFqn(String fqn) {
 | 
			
		||||
    // Check if any parent FQN has a lock
 | 
			
		||||
    List<DeletionLock> parentLocks = lockDAO.findParentLocks(fqn);
 | 
			
		||||
    if (!parentLocks.isEmpty()) {
 | 
			
		||||
      DeletionLock lock = parentLocks.get(0);
 | 
			
		||||
      throw new EntityLockedException(
 | 
			
		||||
          String.format(
 | 
			
		||||
              "Cannot create/modify entity under %s. Parent %s is being deleted. "
 | 
			
		||||
                  + "Started at %s, expected completion: %s",
 | 
			
		||||
              fqn, lock.getEntityFqn(), lock.getLockedAt(), lock.getExpectedCompletion()));
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Release deletion lock
 | 
			
		||||
   */
 | 
			
		||||
  @Transaction
 | 
			
		||||
  public void releaseDeletionLock(UUID entityId, String entityType) {
 | 
			
		||||
    DeletionLock lock = lockDAO.findByEntity(entityId, entityType);
 | 
			
		||||
    if (lock != null) {
 | 
			
		||||
      lockDAO.delete(lock.getId());
 | 
			
		||||
      invalidateFqnCache(lock.getEntityFqn());
 | 
			
		||||
      LOG.info(
 | 
			
		||||
          "Released deletion lock for entity {} (was locked by {})",
 | 
			
		||||
          lock.getEntityFqn(),
 | 
			
		||||
          lock.getLockedBy());
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Get deletion lock status for an entity
 | 
			
		||||
   */
 | 
			
		||||
  public DeletionLock getLock(UUID entityId, String entityType) {
 | 
			
		||||
    return lockDAO.findByEntity(entityId, entityType);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Get all active deletion locks
 | 
			
		||||
   */
 | 
			
		||||
  public List<DeletionLock> getAllActiveLocks() {
 | 
			
		||||
    // This would need a new DAO method to get all locks
 | 
			
		||||
    return Collections.emptyList(); // Placeholder
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Clean up stale locks (for crashed deletions)
 | 
			
		||||
   * This method should be called periodically by a background job
 | 
			
		||||
   */
 | 
			
		||||
  public void cleanupStaleLocks() {
 | 
			
		||||
    Instant staleTime = Instant.now().minus(DEFAULT_LOCK_TIMEOUT_HOURS, ChronoUnit.HOURS);
 | 
			
		||||
    List<DeletionLock> staleLocks = lockDAO.findStaleLocks(staleTime);
 | 
			
		||||
 | 
			
		||||
    for (DeletionLock lock : staleLocks) {
 | 
			
		||||
      LOG.warn(
 | 
			
		||||
          "Cleaning up stale deletion lock for entity: {} locked at: {} by: {}",
 | 
			
		||||
          lock.getEntityFqn(),
 | 
			
		||||
          lock.getLockedAt(),
 | 
			
		||||
          lock.getLockedBy());
 | 
			
		||||
      releaseDeletionLock(lock.getEntityId(), lock.getEntityType());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (!staleLocks.isEmpty()) {
 | 
			
		||||
      LOG.info("Cleaned up {} stale deletion locks", staleLocks.size());
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Force cleanup of all stale locks older than specified duration
 | 
			
		||||
   */
 | 
			
		||||
  public int forceCleanupStaleLocks(Duration olderThan) {
 | 
			
		||||
    Instant staleTime = Instant.now().minus(olderThan);
 | 
			
		||||
    int deleted = lockDAO.deleteStaleLocks(staleTime);
 | 
			
		||||
 | 
			
		||||
    if (deleted > 0) {
 | 
			
		||||
      // Clear cache as we don't know which FQNs were affected
 | 
			
		||||
      fqnCache.invalidateAll();
 | 
			
		||||
      LOG.info("Force cleaned up {} stale deletion locks older than {}", deleted, olderThan);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    return deleted;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Get lock statistics
 | 
			
		||||
   */
 | 
			
		||||
  public Map<String, Object> getLockStatistics() {
 | 
			
		||||
    Map<String, Object> stats = new HashMap<>();
 | 
			
		||||
    stats.put("totalActiveLocks", lockDAO.countActiveLocks());
 | 
			
		||||
    stats.put("cacheSize", fqnCache.estimatedSize());
 | 
			
		||||
 | 
			
		||||
    // Add per-entity-type counts if needed
 | 
			
		||||
    for (String entityType : Entity.getEntityList()) {
 | 
			
		||||
      int count = lockDAO.countActiveLocksByType(entityType);
 | 
			
		||||
      if (count > 0) {
 | 
			
		||||
        stats.put(entityType + "Locks", count);
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    return stats;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  // Helper methods
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Extract parent FQNs from a given FQN
 | 
			
		||||
   * For example: "service.database.schema.table" returns:
 | 
			
		||||
   * ["service", "service.database", "service.database.schema"]
 | 
			
		||||
   */
 | 
			
		||||
  private List<String> getParentFqns(String fqn) {
 | 
			
		||||
    List<String> parents = new ArrayList<>();
 | 
			
		||||
    String[] parts = fqn.split("\\.");
 | 
			
		||||
 | 
			
		||||
    StringBuilder current = new StringBuilder();
 | 
			
		||||
    for (int i = 0; i < parts.length - 1; i++) {
 | 
			
		||||
      if (i > 0) {
 | 
			
		||||
        current.append(".");
 | 
			
		||||
      }
 | 
			
		||||
      current.append(parts[i]);
 | 
			
		||||
      parents.add(current.toString());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    return parents;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Check if an FQN is locked using cache
 | 
			
		||||
   */
 | 
			
		||||
  private boolean isFqnLocked(String fqn) {
 | 
			
		||||
    Set<String> lockedPrefixes = fqnCache.get("all", k -> loadLockedFqnPrefixes());
 | 
			
		||||
 | 
			
		||||
    // Check if any locked prefix matches this FQN
 | 
			
		||||
    for (String prefix : lockedPrefixes) {
 | 
			
		||||
      if (fqn.startsWith(prefix + ".") || fqn.equals(prefix)) {
 | 
			
		||||
        return true;
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    return false;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Load all locked FQN prefixes from database
 | 
			
		||||
   */
 | 
			
		||||
  private Set<String> loadLockedFqnPrefixes() {
 | 
			
		||||
    // This would need a DAO method to get all locked FQNs
 | 
			
		||||
    // For now, return empty set
 | 
			
		||||
    return new HashSet<>();
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Invalidate FQN cache
 | 
			
		||||
   */
 | 
			
		||||
  private void invalidateFqnCache(String fqn) {
 | 
			
		||||
    fqnCache.invalidate("all");
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Estimate completion time based on entity type and cascade scope
 | 
			
		||||
   */
 | 
			
		||||
  private Instant estimateCompletionTime(EntityInterface entity, boolean cascade) {
 | 
			
		||||
    // Simple estimation - can be made more sophisticated based on entity counts
 | 
			
		||||
    int estimatedMinutes = cascade ? 30 : 5;
 | 
			
		||||
 | 
			
		||||
    // For certain entity types, adjust the estimate
 | 
			
		||||
    String entityType = Entity.getEntityTypeFromObject(entity);
 | 
			
		||||
    if (Entity.DATABASE_SERVICE.equals(entityType)) {
 | 
			
		||||
      estimatedMinutes = cascade ? 120 : 10; // Database services take longer
 | 
			
		||||
    } else if (Entity.DATABASE.equals(entityType)) {
 | 
			
		||||
      estimatedMinutes = cascade ? 60 : 5;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    return Instant.now().plus(estimatedMinutes, ChronoUnit.MINUTES);
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
@ -1189,6 +1189,13 @@ public abstract class EntityResourceTest<T extends EntityInterface, K extends Cr
 | 
			
		||||
 | 
			
		||||
  /** At the end of test for an entity, delete the parent container to test recursive delete functionality */
 | 
			
		||||
  private void delete_recursiveTest() throws IOException {
 | 
			
		||||
    // Skip recursive delete test when container reuse is enabled
 | 
			
		||||
    // as entities from previous test runs may still reference the container
 | 
			
		||||
    if (Boolean.parseBoolean(System.getProperty("testcontainers.reuse.enable", "false"))) {
 | 
			
		||||
      LOG.info("Skipping delete_recursiveTest - container reuse is enabled");
 | 
			
		||||
      return;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // Finally, delete the container that contains the entities created for this test
 | 
			
		||||
    EntityReference container = getContainer();
 | 
			
		||||
    if (container != null) {
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user