From 060df56aaf4c8dc974931ecb0859a32fd85f4cd3 Mon Sep 17 00:00:00 2001 From: Mohit Yadav <105265192+mohityadav766@users.noreply.github.com> Date: Thu, 17 Jul 2025 02:56:38 +0530 Subject: [PATCH] =?UTF-8?q?Fix=20Entity=20Relationship=20Cleanup=20to=20no?= =?UTF-8?q?t=20deleted=20entities=20without=20repo=E2=80=A6=20(#22400)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * - Fix Entity Relationship Cleanup to not delted entities without repositories * - Add Migration from Test Case to Test Case Resolution * - Typo should be resolution id * Add Tests and Fix Condition * Update BatchSize --- .../native/1.8.5/mysql/schemaChanges.sql | 0 .../native/1.8.5/postgres/schemaChanges.sql | 0 .../service/jdbi3/CollectionDAO.java | 7 + .../migration/mysql/v185/Migration.java | 20 ++ .../migration/postgres/v185/Migration.java | 20 ++ .../migration/utils/v185/MigrationUtil.java | 137 +++++++++++++ .../util/EntityRelationshipCleanup.java | 65 ++++-- .../util/EntityRelationshipCleanupTest.java | 187 ++++++++++++++++++ 8 files changed, 425 insertions(+), 11 deletions(-) create mode 100644 bootstrap/sql/migrations/native/1.8.5/mysql/schemaChanges.sql create mode 100644 bootstrap/sql/migrations/native/1.8.5/postgres/schemaChanges.sql create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v185/Migration.java create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v185/Migration.java create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v185/MigrationUtil.java diff --git a/bootstrap/sql/migrations/native/1.8.5/mysql/schemaChanges.sql b/bootstrap/sql/migrations/native/1.8.5/mysql/schemaChanges.sql new file mode 100644 index 00000000000..e69de29bb2d diff --git a/bootstrap/sql/migrations/native/1.8.5/postgres/schemaChanges.sql b/bootstrap/sql/migrations/native/1.8.5/postgres/schemaChanges.sql new file mode 100644 index 00000000000..e69de29bb2d diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java index 60fd403503e..d36a06c8960 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java @@ -6026,6 +6026,13 @@ public interface CollectionDAO { + "WHERE stateId = :stateId ORDER BY timestamp DESC") List listTestCaseResolutionStatusesForStateId(@Bind("stateId") String stateId); + @SqlQuery( + value = + "SELECT json FROM test_case_resolution_status_time_series " + + "WHERE entityFQNHash = :entityFQNHash ORDER BY timestamp DESC") + List listTestCaseResolutionForEntityFQNHash( + @BindFQN("entityFQNHash") String entityFqnHas); + @SqlQuery( value = "SELECT json FROM test_case_resolution_status_time_series " diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v185/Migration.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v185/Migration.java new file mode 100644 index 00000000000..7099bcf37bc --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v185/Migration.java @@ -0,0 +1,20 @@ +package org.openmetadata.service.migration.mysql.v185; + +import lombok.SneakyThrows; +import org.openmetadata.service.migration.api.MigrationProcessImpl; +import org.openmetadata.service.migration.utils.MigrationFile; 
+import org.openmetadata.service.migration.utils.v185.MigrationUtil; + +public class Migration extends MigrationProcessImpl { + + public Migration(MigrationFile migrationFile) { + super(migrationFile); + } + + @Override + @SneakyThrows + public void runDataMigration() { + MigrationUtil migrationUtil = new MigrationUtil(collectionDAO); + migrationUtil.createTestCaseToTestCaseResolutionRelation(); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v185/Migration.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v185/Migration.java new file mode 100644 index 00000000000..02814793b53 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v185/Migration.java @@ -0,0 +1,20 @@ +package org.openmetadata.service.migration.postgres.v185; + +import lombok.SneakyThrows; +import org.openmetadata.service.migration.api.MigrationProcessImpl; +import org.openmetadata.service.migration.utils.MigrationFile; +import org.openmetadata.service.migration.utils.v185.MigrationUtil; + +public class Migration extends MigrationProcessImpl { + + public Migration(MigrationFile migrationFile) { + super(migrationFile); + } + + @Override + @SneakyThrows + public void runDataMigration() { + MigrationUtil migrationUtil = new MigrationUtil(collectionDAO); + migrationUtil.createTestCaseToTestCaseResolutionRelation(); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v185/MigrationUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v185/MigrationUtil.java new file mode 100644 index 00000000000..7f04071fe6d --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v185/MigrationUtil.java @@ -0,0 +1,137 @@ +package org.openmetadata.service.migration.utils.v185; + +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import org.openmetadata.schema.tests.TestCase; +import org.openmetadata.schema.tests.type.TestCaseResolutionStatus; +import org.openmetadata.schema.type.Include; +import org.openmetadata.schema.type.Relationship; +import org.openmetadata.schema.utils.JsonUtils; +import org.openmetadata.service.Entity; +import org.openmetadata.service.jdbi3.CollectionDAO; +import org.openmetadata.service.jdbi3.EntityDAO; +import org.openmetadata.service.jdbi3.ListFilter; +import org.openmetadata.service.jdbi3.TestCaseRepository; +import org.openmetadata.service.util.EntityUtil; +import org.openmetadata.service.util.RestUtil; +import org.openmetadata.service.util.ResultList; + +@Slf4j +public class MigrationUtil { + + private final CollectionDAO collectionDAO; + + public MigrationUtil(CollectionDAO collectionDAO) { + this.collectionDAO = collectionDAO; + } + + public void createTestCaseToTestCaseResolutionRelation() { + LOG.info( + "Starting migration to recreate test case to test case resolution status relationships"); + + try { + TestCaseRepository testCaseRepository = + (TestCaseRepository) Entity.getEntityRepository(Entity.TEST_CASE); + EntityDAO testCaseDAO = testCaseRepository.getDao(); + // Process test cases in batches + int batchSize = 100; + int relationshipsCreated = 0; + int processedCount = 0; + int offset = 0; + String cursor = RestUtil.encodeCursor("0"); + int total = testCaseDAO.listCount(new ListFilter(Include.ALL)); + LOG.info("Processing TestCases: total={}", total); + + while (offset < 
total) { + ResultList testCases = + testCaseRepository.listWithOffset( + testCaseDAO::listAfter, + testCaseDAO::listCount, + new ListFilter(Include.ALL), + batchSize, + cursor, + true, + EntityUtil.Fields.EMPTY_FIELDS, + null); + + LOG.info( + "Processing batch: offset={}, size={}", + RestUtil.decodeCursor(cursor), + testCases.getData().size()); + + for (TestCase testCase : testCases.getData()) { + processedCount++; + try { + // Check if this test case has any test case resolution status records + List resolutionStatuses = + JsonUtils.readObjects( + collectionDAO + .testCaseResolutionStatusTimeSeriesDao() + .listTestCaseResolutionForEntityFQNHash(testCase.getFullyQualifiedName()), + TestCaseResolutionStatus.class); + + if (!resolutionStatuses.isEmpty()) { + // Group by stateId to get unique resolution status records + Set uniqueResolutionId = + resolutionStatuses.stream() + .map(TestCaseResolutionStatus::getId) + .collect(Collectors.toSet()); + + LOG.debug( + "Test case {} has {} unique resolution status states", + testCase.getFullyQualifiedName(), + uniqueResolutionId.size()); + + // Create parent-child relationship for each unique state + for (UUID resolutionId : uniqueResolutionId) { + try { + collectionDAO + .relationshipDAO() + .insert( + testCase.getId(), + resolutionId, + Entity.TEST_CASE, + Entity.TEST_CASE_RESOLUTION_STATUS, + Relationship.PARENT_OF.ordinal(), + null); + } catch (Exception e) { + LOG.error( + "Failed to create relationship for test case {} and state {}: {}", + testCase.getFullyQualifiedName(), + resolutionId, + e.getMessage()); + } + } + } + + } catch (Exception e) { + LOG.error( + "Error processing test case {}: {}", + testCase.getFullyQualifiedName(), + e.getMessage()); + } + } + + LOG.info( + "Processed {} test cases so far, created {} relationships", + processedCount, + relationshipsCreated); + + offset += testCases.getData().size(); + cursor = RestUtil.encodeCursor(String.valueOf(offset)); + } + + LOG.info( + "Migration completed. 
Processed {} test cases, created {} new relationships", + processedCount, + relationshipsCreated); + + } catch (Exception e) { + LOG.error("Failed to complete test case to test case resolution status migration", e); + throw new RuntimeException("Migration failed", e); + } + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/EntityRelationshipCleanup.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/EntityRelationshipCleanup.java index acdc9db1f55..797a712f3fc 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/util/EntityRelationshipCleanup.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/EntityRelationshipCleanup.java @@ -31,19 +31,22 @@ import org.openmetadata.service.Entity; import org.openmetadata.service.exception.EntityNotFoundException; import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.jdbi3.EntityRepository; +import org.openmetadata.service.jdbi3.EntityTimeSeriesRepository; @Slf4j public class EntityRelationshipCleanup { private final CollectionDAO collectionDAO; - private final Map> entityRepositories; + private final Map> entityRepositories = new HashMap<>(); + private final Map> entityTimeSeriesRepositoy = + new HashMap<>(); private final boolean dryRun; public EntityRelationshipCleanup(CollectionDAO collectionDAO, boolean dryRun) { this.collectionDAO = collectionDAO; this.dryRun = dryRun; - this.entityRepositories = new HashMap<>(); initializeEntityRepositories(); + initializeTimeSeriesRepositories(); } @Data @@ -84,6 +87,17 @@ public class EntityRelationshipCleanup { } } + private void initializeTimeSeriesRepositories() { + for (String entityType : Entity.getEntityList()) { + try { + EntityTimeSeriesRepository repository = Entity.getEntityTimeSeriesRepository(entityType); + entityTimeSeriesRepositoy.put(entityType, repository); + } catch (EntityNotFoundException e) { + LOG.error("No repository found for entity type: {}", entityType); + } + } + } + public EntityCleanupResult performCleanup(int batchSize) { LOG.info( "Starting entity relationship cleanup. 
Dry run: {}, Batch size: {}", dryRun, batchSize); @@ -178,6 +192,23 @@ public class EntityRelationshipCleanup { UUID toId = UUID.fromString(relationship.getToId()); String fromEntity = relationship.getFromEntity(); String toEntity = relationship.getToEntity(); + + // Check if fromEntity has any repository + boolean fromEntityHasNoRepository = doEntityHaveAnyRepository(fromEntity); + if (!fromEntityHasNoRepository) { + LOG.error( + "No repository found for from entity type: {}, the entity will not be cleaned", + fromEntity); + return null; + } + + boolean toEntityHasNoRepository = doEntityHaveAnyRepository(toEntity); + if (!toEntityHasNoRepository) { + LOG.error( + "No repository found for to entity type: {}, the entity will not be cleaned", toEntity); + return null; + } + boolean fromExists = entityExists(fromId, fromEntity); boolean toExists = entityExists(toId, toEntity); @@ -223,23 +254,35 @@ public class EntityRelationshipCleanup { } } + private boolean doEntityHaveAnyRepository(String entityType) { + return entityRepositories.containsKey(entityType) + || entityTimeSeriesRepositoy.containsKey(entityType); + } + private boolean entityExists(UUID entityId, String entityType) { + boolean existsInEntityRepo = checkInEntityRepository(entityId, entityType); + boolean existsInTimeSeriesRepo = checkInEntityTimeSeriesRepository(entityId, entityType); + return existsInEntityRepo || existsInTimeSeriesRepo; + } + + private boolean checkInEntityRepository(UUID entityId, String entityType) { try { EntityRepository repository = entityRepositories.get(entityType); - if (repository == null) { - LOG.debug("No repository found for entity type: {}", entityType); - return false; - } repository.get(null, entityId, EntityUtil.Fields.EMPTY_FIELDS, ALL, false); return true; } catch (EntityNotFoundException e) { - LOG.debug("Entity {}:{} not found", entityType, entityId); - return false; - } catch (Exception e) { - LOG.debug( - "Error checking existence of entity {}:{}: {}", entityType, entityId, e.getMessage()); + LOG.debug("Entity {}:{} not found in repository: {}", entityType, entityId, e.getMessage()); + } + return false; + } + + private boolean checkInEntityTimeSeriesRepository(UUID entityId, String entityType) { + EntityTimeSeriesRepository repository = entityTimeSeriesRepositoy.get(entityType); + if (repository == null) { + LOG.debug("No repository found for entity type: {}", entityType); return false; } + return repository.getById(entityId) != null; } /** diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/util/EntityRelationshipCleanupTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/util/EntityRelationshipCleanupTest.java index 57a4679803b..a7754707e7c 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/util/EntityRelationshipCleanupTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/util/EntityRelationshipCleanupTest.java @@ -479,4 +479,191 @@ class EntityRelationshipCleanupTest extends OpenMetadataApplicationTest { deleteResult.getRelationshipsDeleted() >= 0, "Delete mode should delete 0 or more relationships"); } + + @Test + @Execution(ExecutionMode.CONCURRENT) + void test_entityWithTimeSeriesRepository_shouldNotBeCleanedWhenExists() { + UUID testCaseId = UUID.randomUUID(); + UUID testCaseResolutionId = UUID.randomUUID(); + UUID tableId = testTables.get(0).getId(); + + collectionDAO + .relationshipDAO() + .insert( + testCaseId, + tableId, + Entity.TABLE, + Entity.TEST_CASE, + Relationship.CONTAINS.ordinal(), + 
null); + + collectionDAO + .relationshipDAO() + .insert( + testCaseId, + testCaseResolutionId, + Entity.TEST_CASE, + Entity.TEST_CASE_RESULT, + Relationship.PARENT_OF.ordinal(), + null); + + EntityRelationshipCleanup cleanup = new EntityRelationshipCleanup(collectionDAO, true); + EntityRelationshipCleanup.EntityCleanupResult result = cleanup.performCleanup(100); + + assertNotNull(result); + assertTrue( + result.getOrphanedRelationshipsFound() > 0, + "Should find orphaned relationships for non-existent time series entities"); + + boolean foundTestCaseOrphan = + result.getOrphanedRelationships().stream() + .anyMatch( + orphan -> + testCaseId.toString().equals(orphan.getFromId()) + || testCaseId.toString().equals(orphan.getToId())); + + boolean foundTestCaseResultOrphan = + result.getOrphanedRelationships().stream() + .anyMatch( + orphan -> + testCaseResolutionId.toString().equals(orphan.getFromId()) + || testCaseResolutionId.toString().equals(orphan.getToId())); + + assertTrue(foundTestCaseOrphan, "Should find orphaned relationship for non-existent testCase"); + assertTrue( + foundTestCaseResultOrphan, + "Should find orphaned relationship for non-existent testCaseResult"); + } + + @Test + @Execution(ExecutionMode.CONCURRENT) + void test_entityWithoutAnyRepository_shouldNotBeCleanedUpEvenIfRelationshipExists() { + UUID nonExistentId1 = UUID.randomUUID(); + UUID nonExistentId2 = UUID.randomUUID(); + UUID tableId = testTables.get(0).getId(); + + collectionDAO + .relationshipDAO() + .insert( + nonExistentId1, + tableId, + "nonExistentEntityType", + Entity.TABLE, + Relationship.CONTAINS.ordinal(), + null); + + collectionDAO + .relationshipDAO() + .insert( + tableId, + nonExistentId2, + Entity.TABLE, + "anotherNonExistentEntityType", + Relationship.CONTAINS.ordinal(), + null); + + EntityRelationshipCleanup cleanup = new EntityRelationshipCleanup(collectionDAO, true); + EntityRelationshipCleanup.EntityCleanupResult result = cleanup.performCleanup(100); + + assertNotNull(result); + + boolean foundNonExistentFromEntity = + result.getOrphanedRelationships().stream() + .anyMatch(orphan -> nonExistentId1.toString().equals(orphan.getFromId())); + + boolean foundNonExistentToEntity = + result.getOrphanedRelationships().stream() + .anyMatch(orphan -> nonExistentId2.toString().equals(orphan.getToId())); + + assertFalse( + foundNonExistentFromEntity, + "Should NOT find orphaned relationship for entity without repository (from)"); + assertFalse( + foundNonExistentToEntity, + "Should NOT find orphaned relationship for entity without repository (to)"); + } + + @Test + @Execution(ExecutionMode.CONCURRENT) + void test_mixedEntityTypes_onlyValidRepositoryEntitiesAreProcessed() { + UUID testCaseId = UUID.randomUUID(); + UUID queryCostId = UUID.randomUUID(); + UUID workflowInstanceId = UUID.randomUUID(); + UUID invalidEntityId = UUID.randomUUID(); + UUID tableId = testTables.get(0).getId(); + + collectionDAO + .relationshipDAO() + .insert( + testCaseId, + tableId, + Entity.TEST_CASE, + Entity.TABLE, + Relationship.TESTED_BY.ordinal(), + null); + + collectionDAO + .relationshipDAO() + .insert( + queryCostId, + tableId, + Entity.QUERY_COST_RECORD, + Entity.TABLE, + Relationship.RELATED_TO.ordinal(), + null); + + collectionDAO + .relationshipDAO() + .insert( + workflowInstanceId, + tableId, + Entity.WORKFLOW_INSTANCE, + Entity.TABLE, + Relationship.HAS.ordinal(), + null); + + collectionDAO + .relationshipDAO() + .insert( + invalidEntityId, + tableId, + "invalidEntityType", + Entity.TABLE, + 
Relationship.CONTAINS.ordinal(), + null); + + EntityRelationshipCleanup cleanup = new EntityRelationshipCleanup(collectionDAO, true); + EntityRelationshipCleanup.EntityCleanupResult result = cleanup.performCleanup(100); + + assertNotNull(result); + + boolean foundTestCaseOrphan = + result.getOrphanedRelationships().stream() + .anyMatch(orphan -> testCaseId.toString().equals(orphan.getFromId())); + + boolean foundQueryCostOrphan = + result.getOrphanedRelationships().stream() + .anyMatch(orphan -> queryCostId.toString().equals(orphan.getFromId())); + + boolean foundWorkflowInstanceOrphan = + result.getOrphanedRelationships().stream() + .anyMatch(orphan -> workflowInstanceId.toString().equals(orphan.getFromId())); + + boolean foundInvalidEntityOrphan = + result.getOrphanedRelationships().stream() + .anyMatch(orphan -> invalidEntityId.toString().equals(orphan.getFromId())); + + assertTrue( + foundTestCaseOrphan, + "Should find orphaned relationship for non-existent testCase (time series entity)"); + assertTrue( + foundQueryCostOrphan, + "Should find orphaned relationship for non-existent queryCostRecord (time series entity)"); + assertTrue( + foundWorkflowInstanceOrphan, + "Should find orphaned relationship for non-existent workflowInstance (time series entity)"); + assertFalse( + foundInvalidEntityOrphan, + "Should NOT find orphaned relationship for invalid entity type without repository"); + } }
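
Below is a minimal, self-contained sketch (assumed class and method names, not the real OpenMetadata types) of the decision this patch adds to EntityRelationshipCleanup: a relationship is only treated as orphaned when both entity types have some repository that can be consulted, and existence is checked against the time-series repositories as well as the regular entity repositories.

// CleanupGuardSketch.java -- illustrative stand-in, not the real
// EntityRelationshipCleanup class. Repositories are modelled as
// "entity type -> set of known ids" maps.
import java.util.Map;
import java.util.Set;
import java.util.UUID;

public class CleanupGuardSketch {

  private final Map<String, Set<UUID>> entityRepos;      // stand-in for EntityRepository lookups
  private final Map<String, Set<UUID>> timeSeriesRepos;  // stand-in for EntityTimeSeriesRepository lookups

  public CleanupGuardSketch(Map<String, Set<UUID>> entityRepos,
                            Map<String, Set<UUID>> timeSeriesRepos) {
    this.entityRepos = entityRepos;
    this.timeSeriesRepos = timeSeriesRepos;
  }

  // Mirrors doEntityHaveAnyRepository(): entity types with no repository at all
  // (e.g. an unknown or stale type string) are never considered for cleanup.
  boolean hasAnyRepository(String entityType) {
    return entityRepos.containsKey(entityType) || timeSeriesRepos.containsKey(entityType);
  }

  // Mirrors entityExists(): being present in either the entity repository or the
  // time-series repository counts as existing.
  boolean exists(String entityType, UUID id) {
    return entityRepos.getOrDefault(entityType, Set.of()).contains(id)
        || timeSeriesRepos.getOrDefault(entityType, Set.of()).contains(id);
  }

  // True only when the relationship can safely be flagged as orphaned.
  boolean isOrphaned(String fromType, UUID fromId, String toType, UUID toId) {
    if (!hasAnyRepository(fromType) || !hasAnyRepository(toType)) {
      return false; // existence cannot be verified -> leave the relationship alone
    }
    return !exists(fromType, fromId) || !exists(toType, toId);
  }

  public static void main(String[] args) {
    UUID table = UUID.randomUUID();
    UUID missingStatus = UUID.randomUUID();
    CleanupGuardSketch guard = new CleanupGuardSketch(
        Map.of("table", Set.of(table)),                 // regular repository
        Map.of("testCaseResolutionStatus", Set.of()));  // time-series repository, id not present

    // Unknown entity type: skipped, never reported as orphaned.
    System.out.println(guard.isOrphaned("unknownType", UUID.randomUUID(), "table", table)); // false
    // Known time-series type whose id is missing: reported as orphaned.
    System.out.println(guard.isOrphaned("table", table, "testCaseResolutionStatus", missingStatus)); // true
  }
}

Skipping types without a repository errs on the side of keeping data: a relationship whose endpoints cannot be verified is never deleted.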
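
A similarly simplified sketch, again with stand-in types, of what createTestCaseToTestCaseResolutionRelation() in the v185 MigrationUtil does: for each test case, fetch its resolution-status records by FQN, de-duplicate by id, and insert one PARENT_OF relationship per id. (Note that in the patch itself the relationshipsCreated counter is logged but never incremented inside the loop, so the completion log will always report 0 created relationships; the sketch counts explicitly.)

// BackfillSketch.java -- illustrative stand-in for the v185 migration logic,
// not the real MigrationUtil / CollectionDAO classes.
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

public class BackfillSketch {

  record TestCase(UUID id, String fullyQualifiedName) {}
  record ResolutionStatus(UUID id, String testCaseFqn) {}

  interface RelationshipStore {
    // Stand-in for relationshipDAO().insert(fromId, toId, fromType, toType, relation, json)
    void insertParentOf(UUID testCaseId, UUID resolutionStatusId);
  }

  static int backfill(List<TestCase> testCases,
                      List<ResolutionStatus> statuses,
                      RelationshipStore store) {
    int created = 0;
    for (TestCase tc : testCases) {
      // All statuses recorded against this test case's FQN, de-duplicated by id
      // (the real code de-duplicates the records fetched via entityFQNHash the same way).
      Set<UUID> uniqueIds = statuses.stream()
          .filter(s -> s.testCaseFqn().equals(tc.fullyQualifiedName()))
          .map(ResolutionStatus::id)
          .collect(Collectors.toSet());
      for (UUID statusId : uniqueIds) {
        store.insertParentOf(tc.id(), statusId); // testCase PARENT_OF testCaseResolutionStatus
        created++;
      }
    }
    return created;
  }

  public static void main(String[] args) {
    TestCase tc = new TestCase(UUID.randomUUID(), "svc.db.schema.table.test1");
    List<ResolutionStatus> statuses =
        List.of(new ResolutionStatus(UUID.randomUUID(), tc.fullyQualifiedName()));
    int created = backfill(List.of(tc), statuses,
        (from, to) -> System.out.println("PARENT_OF " + from + " -> " + to));
    System.out.println("relationships created: " + created);
  }
}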