From 63814923f91bbcd9a60f88d3d63f5f88f338ade2 Mon Sep 17 00:00:00 2001 From: Mohit Yadav <105265192+mohityadav766@users.noreply.github.com> Date: Thu, 17 Jul 2025 20:19:38 +0530 Subject: [PATCH] Fix Threads relations (#22432) * Fix Threads relations * Fix Post.getFrom * Fix THREAD Entity * Update Message --------- Co-authored-by: Pere Miquel Brull --- .../service/jdbi3/CollectionDAO.java | 12 ++ .../service/jdbi3/EntityRepository.java | 4 + .../migration/mysql/v185/Migration.java | 2 + .../migration/postgres/v185/Migration.java | 2 + .../migration/utils/v185/MigrationUtil.java | 195 +++++++++++++++++- .../util/EntityRelationshipCleanup.java | 56 +++-- 6 files changed, 258 insertions(+), 13 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java index d36a06c8960..22b24d3a7ed 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java @@ -1840,6 +1840,18 @@ public interface CollectionDAO { int listCountTasksAssignedBy( @Bind("username") String username, @Define("condition") String condition); + @SqlQuery( + "SELECT json FROM thread_entity where type = 'Task' LIMIT :limit OFFSET :paginationOffset") + List listTaskThreadWithOffset( + @Bind("limit") int limit, @Bind("paginationOffset") int paginationOffset); + + @SqlQuery( + "SELECT json FROM thread_entity where type != 'Task' AND createdAt > :cutoffMillis ORDER BY createdAt LIMIT :limit OFFSET :paginationOffset") + List listOtherConversationThreadWithOffset( + @Bind("cutoffMillis") long cutoffMillis, + @Bind("limit") int limit, + @Bind("paginationOffset") int paginationOffset); + @SqlQuery( "SELECT json FROM thread_entity AND " // Entity for which the thread is about is owned by the user or his teams diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java index 74b965e7be8..0784e6104e6 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java @@ -298,6 +298,10 @@ public abstract class EntityRepository { protected final ChangeSummarizer changeSummarizer; + public boolean isSupportsOwners() { + return supportsOwners; + } + protected EntityRepository( String collectionPath, String entityType, diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v185/Migration.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v185/Migration.java index 7099bcf37bc..18df693ff43 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v185/Migration.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v185/Migration.java @@ -16,5 +16,7 @@ public class Migration extends MigrationProcessImpl { public void runDataMigration() { MigrationUtil migrationUtil = new MigrationUtil(collectionDAO); migrationUtil.createTestCaseToTestCaseResolutionRelation(); + migrationUtil.recreateTaskThreadsRelation(); + migrationUtil.recreateOtherThreadRelation(); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v185/Migration.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v185/Migration.java index 02814793b53..4e694496785 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v185/Migration.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v185/Migration.java @@ -16,5 +16,7 @@ public class Migration extends MigrationProcessImpl { public void runDataMigration() { MigrationUtil migrationUtil = new MigrationUtil(collectionDAO); migrationUtil.createTestCaseToTestCaseResolutionRelation(); + migrationUtil.recreateTaskThreadsRelation(); + migrationUtil.recreateOtherThreadRelation(); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v185/MigrationUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v185/MigrationUtil.java index 7f04071fe6d..c0bbfd41b72 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v185/MigrationUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v185/MigrationUtil.java @@ -1,20 +1,32 @@ package org.openmetadata.service.migration.utils.v185; +import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; +import static org.openmetadata.schema.type.Include.ALL; +import static org.openmetadata.schema.type.Relationship.ADDRESSED_TO; + +import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; +import org.openmetadata.schema.EntityInterface; +import org.openmetadata.schema.entity.feed.Thread; import org.openmetadata.schema.tests.TestCase; import org.openmetadata.schema.tests.type.TestCaseResolutionStatus; +import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.Include; +import org.openmetadata.schema.type.Post; import org.openmetadata.schema.type.Relationship; import org.openmetadata.schema.utils.JsonUtils; import org.openmetadata.service.Entity; import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.jdbi3.EntityDAO; +import org.openmetadata.service.jdbi3.EntityRepository; import org.openmetadata.service.jdbi3.ListFilter; import org.openmetadata.service.jdbi3.TestCaseRepository; +import org.openmetadata.service.resources.feeds.MessageParser; import org.openmetadata.service.util.EntityUtil; import org.openmetadata.service.util.RestUtil; import org.openmetadata.service.util.ResultList; @@ -131,7 +143,188 @@ public class MigrationUtil { } catch (Exception e) { LOG.error("Failed to complete test case to test case resolution status migration", e); - throw new RuntimeException("Migration failed", e); + } + } + + private void processThreadsInBatches( + String operationName, String failureMessage, ThreadListSupplier threadListSupplier) { + try { + LOG.info("Recreating {} relation", operationName); + int batchSize = 100; + int offset = 0; + int processedCount = 0; + + while (true) { + List taskThreads = threadListSupplier.getThreads(batchSize, offset); + + if (taskThreads.isEmpty()) { + break; + } + + LOG.info("Processing batch: offset={}, size={}", offset, taskThreads.size()); + + for (String threadJson : taskThreads) { + try { + Thread thread = JsonUtils.readValue(threadJson, Thread.class); + recreateThreadRelationships(thread); + processedCount++; + } catch (Exception e) { + LOG.error("Error processing thread: {}", e.getMessage()); + } + } + + LOG.info("Processed {} threads so far", processedCount); + + offset += taskThreads.size(); + + if (taskThreads.size() < batchSize) { + break; + } + } + + LOG.info("Migration completed. Processed {} task threads", processedCount); + + } catch (Exception e) { + LOG.error("Failed to complete {}", failureMessage, e); + } + } + + @FunctionalInterface + private interface ThreadListSupplier { + List getThreads(int batchSize, int offset); + } + + public void recreateOtherThreadRelation() { + long cutOffTime = System.currentTimeMillis() - 60L * 24 * 60 * 60 * 1000; // 30 days ago + processThreadsInBatches( + "conversation and announcement threads", + "task threads relation migration", + (batchSize, offset) -> + collectionDAO + .feedDAO() + .listOtherConversationThreadWithOffset(cutOffTime, batchSize, offset)); + } + + public void recreateTaskThreadsRelation() { + processThreadsInBatches( + "task threads", + "task threads relation migration", + (batchSize, offset) -> collectionDAO.feedDAO().listTaskThreadWithOffset(batchSize, offset)); + } + + /** + * Recreate thread relationships for CREATED, ADDRESSED_TO, and REPLIED_TO + * @param thread The thread to recreate relationships for + */ + private void recreateThreadRelationships(Thread thread) { + int relationshipsCreated = 0; + + try { + // 1. Create CREATED relationship: User -> Thread + if (thread.getCreatedBy() != null) { + try { + String createdByUserId = thread.getCreatedBy(); + EntityReference ref = Entity.getEntityReferenceByName(Entity.USER, createdByUserId, ALL); + collectionDAO + .relationshipDAO() + .insert( + ref.getId(), + thread.getId(), + ref.getType(), + Entity.THREAD, + Relationship.CREATED.ordinal(), + null); + relationshipsCreated++; + LOG.debug("Created CREATED relationship for thread {}", thread.getId()); + } catch (Exception e) { + LOG.error( + "Failed to create CREATED relationship for thread {}: {}", + thread.getId(), + e.getMessage()); + } + } + + // 2. Create ADDRESSED_TO relationships: Thread -> User/Team (entity owners) + try { + MessageParser.EntityLink aboutEntityLink = + MessageParser.EntityLink.parse(thread.getAbout()); + EntityRepository repository = + Entity.getEntityRepository(aboutEntityLink.getEntityType()); + List fieldList = new ArrayList<>(); + if (repository.isSupportsOwners()) { + fieldList.add("owners"); + } + EntityInterface aboutEntity = + Entity.getEntity( + aboutEntityLink, String.join(",", fieldList.toArray(new String[0])), ALL); + + List entityOwners = aboutEntity.getOwners(); + if (!nullOrEmpty(entityOwners)) { + for (EntityReference entityOwner : entityOwners) { + collectionDAO + .relationshipDAO() + .insert( + thread.getId(), + entityOwner.getId(), + Entity.THREAD, + entityOwner.getType(), + ADDRESSED_TO.ordinal()); + } + } + } catch (Exception ex) { + LOG.debug( + "Recreating relationship for thread {} failed: {}", + thread.getId(), + ex.getMessage(), + ex); + } + + // 3. Create REPLIED_TO relationships: User -> Thread (for users who replied) + if (thread.getPosts() != null && !thread.getPosts().isEmpty()) { + Set repliedUsers = new HashSet<>(); + + for (Post post : thread.getPosts()) { + if (post.getFrom() != null) { + try { + String createdByUserId = thread.getCreatedBy(); + EntityReference ref = + Entity.getEntityReferenceByName(Entity.USER, createdByUserId, ALL); + + // Only create relationship if this user hasn't already replied + if (!repliedUsers.contains(ref.getId())) { + collectionDAO + .relationshipDAO() + .insert( + ref.getId(), + thread.getId(), + Entity.USER, + Entity.THREAD, + Relationship.REPLIED_TO.ordinal(), + null); + repliedUsers.add(ref.getId()); + relationshipsCreated++; + LOG.debug( + "Created REPLIED_TO relationship for thread {} from user {}", + thread.getId(), + ref.getId()); + } + } catch (Exception e) { + LOG.error( + "Failed to create REPLIED_TO relationship for thread {} from user {}: {}", + thread.getId(), + post.getFrom(), + e.getMessage()); + } + } + } + } + + if (relationshipsCreated > 0) { + LOG.debug("Created {} relationships for thread {}", relationshipsCreated, thread.getId()); + } + + } catch (Exception e) { + LOG.error("Error recreating relationships for thread {}: {}", thread.getId(), e.getMessage()); } } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/EntityRelationshipCleanup.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/EntityRelationshipCleanup.java index 797a712f3fc..574c65554fa 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/util/EntityRelationshipCleanup.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/EntityRelationshipCleanup.java @@ -32,6 +32,7 @@ import org.openmetadata.service.exception.EntityNotFoundException; import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.jdbi3.EntityRepository; import org.openmetadata.service.jdbi3.EntityTimeSeriesRepository; +import org.openmetadata.service.jdbi3.FeedRepository; @Slf4j public class EntityRelationshipCleanup { @@ -40,11 +41,13 @@ public class EntityRelationshipCleanup { private final Map> entityRepositories = new HashMap<>(); private final Map> entityTimeSeriesRepositoy = new HashMap<>(); + private final FeedRepository feedRepository; private final boolean dryRun; public EntityRelationshipCleanup(CollectionDAO collectionDAO, boolean dryRun) { this.collectionDAO = collectionDAO; this.dryRun = dryRun; + this.feedRepository = new FeedRepository(); initializeEntityRepositories(); initializeTimeSeriesRepositories(); } @@ -82,7 +85,7 @@ public class EntityRelationshipCleanup { EntityRepository repository = Entity.getEntityRepository(entityType); entityRepositories.put(entityType, repository); } catch (EntityNotFoundException e) { - LOG.error("No repository found for entity type: {}", entityType); + LOG.debug("No repository found for entity type: {}", entityType); } } } @@ -93,7 +96,7 @@ public class EntityRelationshipCleanup { EntityTimeSeriesRepository repository = Entity.getEntityTimeSeriesRepository(entityType); entityTimeSeriesRepositoy.put(entityType, repository); } catch (EntityNotFoundException e) { - LOG.error("No repository found for entity type: {}", entityType); + LOG.debug("No repository found for entity type: {}", entityType); } } } @@ -256,13 +259,24 @@ public class EntityRelationshipCleanup { private boolean doEntityHaveAnyRepository(String entityType) { return entityRepositories.containsKey(entityType) - || entityTimeSeriesRepositoy.containsKey(entityType); + || entityTimeSeriesRepositoy.containsKey(entityType) + || entityType.equals(Entity.THREAD); } private boolean entityExists(UUID entityId, String entityType) { - boolean existsInEntityRepo = checkInEntityRepository(entityId, entityType); - boolean existsInTimeSeriesRepo = checkInEntityTimeSeriesRepository(entityId, entityType); - return existsInEntityRepo || existsInTimeSeriesRepo; + if (entityRepositories.get(entityType) != null) { + return checkInEntityRepository(entityId, entityType); + } + + if (entityTimeSeriesRepositoy.get(entityType) != null) { + return checkInEntityTimeSeriesRepository(entityId, entityType); + } + + if (entityType.equals(Entity.THREAD)) { + return checkInFeedRepository(entityId); + } + + return true; } private boolean checkInEntityRepository(UUID entityId, String entityType) { @@ -272,17 +286,35 @@ public class EntityRelationshipCleanup { return true; } catch (EntityNotFoundException e) { LOG.debug("Entity {}:{} not found in repository: {}", entityType, entityId, e.getMessage()); + return false; + } catch (Exception ex) { + LOG.debug("Entity {}:{} encountered exception: {}", entityType, entityId, ex.getMessage()); + // If any other exception occurs, we assume the entity is not valid + return true; } - return false; } private boolean checkInEntityTimeSeriesRepository(UUID entityId, String entityType) { - EntityTimeSeriesRepository repository = entityTimeSeriesRepositoy.get(entityType); - if (repository == null) { - LOG.debug("No repository found for entity type: {}", entityType); - return false; + try { + EntityTimeSeriesRepository repository = entityTimeSeriesRepositoy.get(entityType); + return repository.getById(entityId) != null; + } catch (Exception ex) { + LOG.debug("Entity {}:{} encountered exception: {}", entityType, entityId, ex.getMessage()); + return true; + } + } + + private boolean checkInFeedRepository(UUID entityId) { + try { + return feedRepository.get(entityId) != null; + } catch (EntityNotFoundException e) { + LOG.debug( + "Entity {}:{} not found in repository: {}", Entity.THREAD, entityId, e.getMessage()); + return false; + } catch (Exception ex) { + LOG.debug("Entity {}:{} encountered exception: {}", Entity.THREAD, entityId, ex.getMessage()); + return true; } - return repository.getById(entityId) != null; } /**