Fix Entity Relationship Cleanup to not deleted entities without repo… (#22400)

* - Fix Entity Relationship Cleanup to not delted entities without repositories

* - Add Migration from Test Case to Test Case Resolution

* - Typo should be resolution id

* Add Tests and Fix Condition

* Update BatchSize
This commit is contained in:
Mohit Yadav 2025-07-17 02:56:38 +05:30 committed by GitHub
parent 4f3385bf74
commit 060df56aaf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 425 additions and 11 deletions

View File

@ -6026,6 +6026,13 @@ public interface CollectionDAO {
+ "WHERE stateId = :stateId ORDER BY timestamp DESC")
List<String> listTestCaseResolutionStatusesForStateId(@Bind("stateId") String stateId);
@SqlQuery(
value =
"SELECT json FROM test_case_resolution_status_time_series "
+ "WHERE entityFQNHash = :entityFQNHash ORDER BY timestamp DESC")
List<String> listTestCaseResolutionForEntityFQNHash(
@BindFQN("entityFQNHash") String entityFqnHas);
@SqlQuery(
value =
"SELECT json FROM test_case_resolution_status_time_series "

View File

@ -0,0 +1,20 @@
package org.openmetadata.service.migration.mysql.v185;
import lombok.SneakyThrows;
import org.openmetadata.service.migration.api.MigrationProcessImpl;
import org.openmetadata.service.migration.utils.MigrationFile;
import org.openmetadata.service.migration.utils.v185.MigrationUtil;
public class Migration extends MigrationProcessImpl {
public Migration(MigrationFile migrationFile) {
super(migrationFile);
}
@Override
@SneakyThrows
public void runDataMigration() {
MigrationUtil migrationUtil = new MigrationUtil(collectionDAO);
migrationUtil.createTestCaseToTestCaseResolutionRelation();
}
}

View File

@ -0,0 +1,20 @@
package org.openmetadata.service.migration.postgres.v185;
import lombok.SneakyThrows;
import org.openmetadata.service.migration.api.MigrationProcessImpl;
import org.openmetadata.service.migration.utils.MigrationFile;
import org.openmetadata.service.migration.utils.v185.MigrationUtil;
public class Migration extends MigrationProcessImpl {
public Migration(MigrationFile migrationFile) {
super(migrationFile);
}
@Override
@SneakyThrows
public void runDataMigration() {
MigrationUtil migrationUtil = new MigrationUtil(collectionDAO);
migrationUtil.createTestCaseToTestCaseResolutionRelation();
}
}

View File

@ -0,0 +1,137 @@
package org.openmetadata.service.migration.utils.v185;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.tests.TestCase;
import org.openmetadata.schema.tests.type.TestCaseResolutionStatus;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.Relationship;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.service.Entity;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.EntityDAO;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.jdbi3.TestCaseRepository;
import org.openmetadata.service.util.EntityUtil;
import org.openmetadata.service.util.RestUtil;
import org.openmetadata.service.util.ResultList;
@Slf4j
public class MigrationUtil {
private final CollectionDAO collectionDAO;
public MigrationUtil(CollectionDAO collectionDAO) {
this.collectionDAO = collectionDAO;
}
public void createTestCaseToTestCaseResolutionRelation() {
LOG.info(
"Starting migration to recreate test case to test case resolution status relationships");
try {
TestCaseRepository testCaseRepository =
(TestCaseRepository) Entity.getEntityRepository(Entity.TEST_CASE);
EntityDAO<TestCase> testCaseDAO = testCaseRepository.getDao();
// Process test cases in batches
int batchSize = 100;
int relationshipsCreated = 0;
int processedCount = 0;
int offset = 0;
String cursor = RestUtil.encodeCursor("0");
int total = testCaseDAO.listCount(new ListFilter(Include.ALL));
LOG.info("Processing TestCases: total={}", total);
while (offset < total) {
ResultList<TestCase> testCases =
testCaseRepository.listWithOffset(
testCaseDAO::listAfter,
testCaseDAO::listCount,
new ListFilter(Include.ALL),
batchSize,
cursor,
true,
EntityUtil.Fields.EMPTY_FIELDS,
null);
LOG.info(
"Processing batch: offset={}, size={}",
RestUtil.decodeCursor(cursor),
testCases.getData().size());
for (TestCase testCase : testCases.getData()) {
processedCount++;
try {
// Check if this test case has any test case resolution status records
List<TestCaseResolutionStatus> resolutionStatuses =
JsonUtils.readObjects(
collectionDAO
.testCaseResolutionStatusTimeSeriesDao()
.listTestCaseResolutionForEntityFQNHash(testCase.getFullyQualifiedName()),
TestCaseResolutionStatus.class);
if (!resolutionStatuses.isEmpty()) {
// Group by stateId to get unique resolution status records
Set<UUID> uniqueResolutionId =
resolutionStatuses.stream()
.map(TestCaseResolutionStatus::getId)
.collect(Collectors.toSet());
LOG.debug(
"Test case {} has {} unique resolution status states",
testCase.getFullyQualifiedName(),
uniqueResolutionId.size());
// Create parent-child relationship for each unique state
for (UUID resolutionId : uniqueResolutionId) {
try {
collectionDAO
.relationshipDAO()
.insert(
testCase.getId(),
resolutionId,
Entity.TEST_CASE,
Entity.TEST_CASE_RESOLUTION_STATUS,
Relationship.PARENT_OF.ordinal(),
null);
} catch (Exception e) {
LOG.error(
"Failed to create relationship for test case {} and state {}: {}",
testCase.getFullyQualifiedName(),
resolutionId,
e.getMessage());
}
}
}
} catch (Exception e) {
LOG.error(
"Error processing test case {}: {}",
testCase.getFullyQualifiedName(),
e.getMessage());
}
}
LOG.info(
"Processed {} test cases so far, created {} relationships",
processedCount,
relationshipsCreated);
offset += testCases.getData().size();
cursor = RestUtil.encodeCursor(String.valueOf(offset));
}
LOG.info(
"Migration completed. Processed {} test cases, created {} new relationships",
processedCount,
relationshipsCreated);
} catch (Exception e) {
LOG.error("Failed to complete test case to test case resolution status migration", e);
throw new RuntimeException("Migration failed", e);
}
}
}

View File

@ -31,19 +31,22 @@ import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.EntityTimeSeriesRepository;
@Slf4j
public class EntityRelationshipCleanup {
private final CollectionDAO collectionDAO;
private final Map<String, EntityRepository<?>> entityRepositories;
private final Map<String, EntityRepository<?>> entityRepositories = new HashMap<>();
private final Map<String, EntityTimeSeriesRepository<?>> entityTimeSeriesRepositoy =
new HashMap<>();
private final boolean dryRun;
public EntityRelationshipCleanup(CollectionDAO collectionDAO, boolean dryRun) {
this.collectionDAO = collectionDAO;
this.dryRun = dryRun;
this.entityRepositories = new HashMap<>();
initializeEntityRepositories();
initializeTimeSeriesRepositories();
}
@Data
@ -84,6 +87,17 @@ public class EntityRelationshipCleanup {
}
}
private void initializeTimeSeriesRepositories() {
for (String entityType : Entity.getEntityList()) {
try {
EntityTimeSeriesRepository<?> repository = Entity.getEntityTimeSeriesRepository(entityType);
entityTimeSeriesRepositoy.put(entityType, repository);
} catch (EntityNotFoundException e) {
LOG.error("No repository found for entity type: {}", entityType);
}
}
}
public EntityCleanupResult performCleanup(int batchSize) {
LOG.info(
"Starting entity relationship cleanup. Dry run: {}, Batch size: {}", dryRun, batchSize);
@ -178,6 +192,23 @@ public class EntityRelationshipCleanup {
UUID toId = UUID.fromString(relationship.getToId());
String fromEntity = relationship.getFromEntity();
String toEntity = relationship.getToEntity();
// Check if fromEntity has any repository
boolean fromEntityHasNoRepository = doEntityHaveAnyRepository(fromEntity);
if (!fromEntityHasNoRepository) {
LOG.error(
"No repository found for from entity type: {}, the entity will not be cleaned",
fromEntity);
return null;
}
boolean toEntityHasNoRepository = doEntityHaveAnyRepository(toEntity);
if (!toEntityHasNoRepository) {
LOG.error(
"No repository found for to entity type: {}, the entity will not be cleaned", toEntity);
return null;
}
boolean fromExists = entityExists(fromId, fromEntity);
boolean toExists = entityExists(toId, toEntity);
@ -223,23 +254,35 @@ public class EntityRelationshipCleanup {
}
}
private boolean doEntityHaveAnyRepository(String entityType) {
return entityRepositories.containsKey(entityType)
|| entityTimeSeriesRepositoy.containsKey(entityType);
}
private boolean entityExists(UUID entityId, String entityType) {
boolean existsInEntityRepo = checkInEntityRepository(entityId, entityType);
boolean existsInTimeSeriesRepo = checkInEntityTimeSeriesRepository(entityId, entityType);
return existsInEntityRepo || existsInTimeSeriesRepo;
}
private boolean checkInEntityRepository(UUID entityId, String entityType) {
try {
EntityRepository<?> repository = entityRepositories.get(entityType);
if (repository == null) {
LOG.debug("No repository found for entity type: {}", entityType);
return false;
}
repository.get(null, entityId, EntityUtil.Fields.EMPTY_FIELDS, ALL, false);
return true;
} catch (EntityNotFoundException e) {
LOG.debug("Entity {}:{} not found", entityType, entityId);
return false;
} catch (Exception e) {
LOG.debug(
"Error checking existence of entity {}:{}: {}", entityType, entityId, e.getMessage());
LOG.debug("Entity {}:{} not found in repository: {}", entityType, entityId, e.getMessage());
}
return false;
}
private boolean checkInEntityTimeSeriesRepository(UUID entityId, String entityType) {
EntityTimeSeriesRepository<?> repository = entityTimeSeriesRepositoy.get(entityType);
if (repository == null) {
LOG.debug("No repository found for entity type: {}", entityType);
return false;
}
return repository.getById(entityId) != null;
}
/**

View File

@ -479,4 +479,191 @@ class EntityRelationshipCleanupTest extends OpenMetadataApplicationTest {
deleteResult.getRelationshipsDeleted() >= 0,
"Delete mode should delete 0 or more relationships");
}
@Test
@Execution(ExecutionMode.CONCURRENT)
void test_entityWithTimeSeriesRepository_shouldNotBeCleanedWhenExists() {
UUID testCaseId = UUID.randomUUID();
UUID testCaseResolutionId = UUID.randomUUID();
UUID tableId = testTables.get(0).getId();
collectionDAO
.relationshipDAO()
.insert(
testCaseId,
tableId,
Entity.TABLE,
Entity.TEST_CASE,
Relationship.CONTAINS.ordinal(),
null);
collectionDAO
.relationshipDAO()
.insert(
testCaseId,
testCaseResolutionId,
Entity.TEST_CASE,
Entity.TEST_CASE_RESULT,
Relationship.PARENT_OF.ordinal(),
null);
EntityRelationshipCleanup cleanup = new EntityRelationshipCleanup(collectionDAO, true);
EntityRelationshipCleanup.EntityCleanupResult result = cleanup.performCleanup(100);
assertNotNull(result);
assertTrue(
result.getOrphanedRelationshipsFound() > 0,
"Should find orphaned relationships for non-existent time series entities");
boolean foundTestCaseOrphan =
result.getOrphanedRelationships().stream()
.anyMatch(
orphan ->
testCaseId.toString().equals(orphan.getFromId())
|| testCaseId.toString().equals(orphan.getToId()));
boolean foundTestCaseResultOrphan =
result.getOrphanedRelationships().stream()
.anyMatch(
orphan ->
testCaseResolutionId.toString().equals(orphan.getFromId())
|| testCaseResolutionId.toString().equals(orphan.getToId()));
assertTrue(foundTestCaseOrphan, "Should find orphaned relationship for non-existent testCase");
assertTrue(
foundTestCaseResultOrphan,
"Should find orphaned relationship for non-existent testCaseResult");
}
@Test
@Execution(ExecutionMode.CONCURRENT)
void test_entityWithoutAnyRepository_shouldNotBeCleanedUpEvenIfRelationshipExists() {
UUID nonExistentId1 = UUID.randomUUID();
UUID nonExistentId2 = UUID.randomUUID();
UUID tableId = testTables.get(0).getId();
collectionDAO
.relationshipDAO()
.insert(
nonExistentId1,
tableId,
"nonExistentEntityType",
Entity.TABLE,
Relationship.CONTAINS.ordinal(),
null);
collectionDAO
.relationshipDAO()
.insert(
tableId,
nonExistentId2,
Entity.TABLE,
"anotherNonExistentEntityType",
Relationship.CONTAINS.ordinal(),
null);
EntityRelationshipCleanup cleanup = new EntityRelationshipCleanup(collectionDAO, true);
EntityRelationshipCleanup.EntityCleanupResult result = cleanup.performCleanup(100);
assertNotNull(result);
boolean foundNonExistentFromEntity =
result.getOrphanedRelationships().stream()
.anyMatch(orphan -> nonExistentId1.toString().equals(orphan.getFromId()));
boolean foundNonExistentToEntity =
result.getOrphanedRelationships().stream()
.anyMatch(orphan -> nonExistentId2.toString().equals(orphan.getToId()));
assertFalse(
foundNonExistentFromEntity,
"Should NOT find orphaned relationship for entity without repository (from)");
assertFalse(
foundNonExistentToEntity,
"Should NOT find orphaned relationship for entity without repository (to)");
}
@Test
@Execution(ExecutionMode.CONCURRENT)
void test_mixedEntityTypes_onlyValidRepositoryEntitiesAreProcessed() {
UUID testCaseId = UUID.randomUUID();
UUID queryCostId = UUID.randomUUID();
UUID workflowInstanceId = UUID.randomUUID();
UUID invalidEntityId = UUID.randomUUID();
UUID tableId = testTables.get(0).getId();
collectionDAO
.relationshipDAO()
.insert(
testCaseId,
tableId,
Entity.TEST_CASE,
Entity.TABLE,
Relationship.TESTED_BY.ordinal(),
null);
collectionDAO
.relationshipDAO()
.insert(
queryCostId,
tableId,
Entity.QUERY_COST_RECORD,
Entity.TABLE,
Relationship.RELATED_TO.ordinal(),
null);
collectionDAO
.relationshipDAO()
.insert(
workflowInstanceId,
tableId,
Entity.WORKFLOW_INSTANCE,
Entity.TABLE,
Relationship.HAS.ordinal(),
null);
collectionDAO
.relationshipDAO()
.insert(
invalidEntityId,
tableId,
"invalidEntityType",
Entity.TABLE,
Relationship.CONTAINS.ordinal(),
null);
EntityRelationshipCleanup cleanup = new EntityRelationshipCleanup(collectionDAO, true);
EntityRelationshipCleanup.EntityCleanupResult result = cleanup.performCleanup(100);
assertNotNull(result);
boolean foundTestCaseOrphan =
result.getOrphanedRelationships().stream()
.anyMatch(orphan -> testCaseId.toString().equals(orphan.getFromId()));
boolean foundQueryCostOrphan =
result.getOrphanedRelationships().stream()
.anyMatch(orphan -> queryCostId.toString().equals(orphan.getFromId()));
boolean foundWorkflowInstanceOrphan =
result.getOrphanedRelationships().stream()
.anyMatch(orphan -> workflowInstanceId.toString().equals(orphan.getFromId()));
boolean foundInvalidEntityOrphan =
result.getOrphanedRelationships().stream()
.anyMatch(orphan -> invalidEntityId.toString().equals(orphan.getFromId()));
assertTrue(
foundTestCaseOrphan,
"Should find orphaned relationship for non-existent testCase (time series entity)");
assertTrue(
foundQueryCostOrphan,
"Should find orphaned relationship for non-existent queryCostRecord (time series entity)");
assertTrue(
foundWorkflowInstanceOrphan,
"Should find orphaned relationship for non-existent workflowInstance (time series entity)");
assertFalse(
foundInvalidEntityOrphan,
"Should NOT find orphaned relationship for invalid entity type without repository");
}
}