Fix Threads relations (#22432)

* Fix Threads relations

* Fix Post.getFrom

* Fix THREAD Entity

* Update Message

---------

Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>
This commit is contained in:
Mohit Yadav 2025-07-17 20:19:38 +05:30 committed by GitHub
parent 5188673b5d
commit 63814923f9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 258 additions and 13 deletions

View File

@ -1840,6 +1840,18 @@ public interface CollectionDAO {
int listCountTasksAssignedBy(
@Bind("username") String username, @Define("condition") String condition);
/**
 * Lists one page of raw thread JSON documents whose {@code type} is {@code 'Task'}.
 *
 * <p>Rows are ordered by {@code createdAt} (the same ordering the non-task paging query uses) so
 * that successive LIMIT/OFFSET pages are stable. Without an ORDER BY the database is free to
 * return rows in a different order on every call, which lets a batch loop see some rows twice and
 * others never.
 *
 * @param limit maximum number of rows to return
 * @param paginationOffset number of rows to skip before the first returned row
 * @return the {@code json} column of each selected row
 */
@SqlQuery(
    "SELECT json FROM thread_entity where type = 'Task' ORDER BY createdAt LIMIT :limit OFFSET :paginationOffset")
List<String> listTaskThreadWithOffset(
    @Bind("limit") int limit, @Bind("paginationOffset") int paginationOffset);
/**
 * Lists one page of raw thread JSON documents whose {@code type} is not {@code 'Task'} and whose
 * {@code createdAt} is strictly greater than the given cutoff, ordered by {@code createdAt}.
 *
 * @param cutoffMillis lower bound (exclusive) compared against {@code createdAt}
 * @param limit maximum number of rows to return
 * @param paginationOffset number of rows to skip before the first returned row
 * @return the {@code json} column of each selected row
 */
@SqlQuery(
    "SELECT json FROM thread_entity "
        + "where type != 'Task' AND createdAt > :cutoffMillis "
        + "ORDER BY createdAt "
        + "LIMIT :limit OFFSET :paginationOffset")
List<String> listOtherConversationThreadWithOffset(
    @Bind("cutoffMillis") long cutoffMillis,
    @Bind("limit") int limit,
    @Bind("paginationOffset") int paginationOffset);
@SqlQuery(
"SELECT json FROM thread_entity <condition> AND "
// Entity for which the thread is about is owned by the user or his teams

View File

@ -298,6 +298,10 @@ public abstract class EntityRepository<T extends EntityInterface> {
protected final ChangeSummarizer<T> changeSummarizer;
/**
 * Exposes the repository's {@code supportsOwners} flag to callers outside the class.
 *
 * @return the current value of {@code supportsOwners}
 */
public boolean isSupportsOwners() {
  return this.supportsOwners;
}
protected EntityRepository(
String collectionPath,
String entityType,

View File

@ -16,5 +16,7 @@ public class Migration extends MigrationProcessImpl {
public void runDataMigration() {
  // One helper instance, bound to this migration's DAO, drives every step below.
  final MigrationUtil stepRunner = new MigrationUtil(collectionDAO);

  // Steps are invoked in this fixed sequence: test case resolution links first,
  // then task threads, then the remaining (non-task) threads.
  stepRunner.createTestCaseToTestCaseResolutionRelation();
  stepRunner.recreateTaskThreadsRelation();
  stepRunner.recreateOtherThreadRelation();
}
}

View File

@ -16,5 +16,7 @@ public class Migration extends MigrationProcessImpl {
public void runDataMigration() {
  // One helper instance, bound to this migration's DAO, drives every step below.
  final MigrationUtil stepRunner = new MigrationUtil(collectionDAO);

  // Steps are invoked in this fixed sequence: test case resolution links first,
  // then task threads, then the remaining (non-task) threads.
  stepRunner.createTestCaseToTestCaseResolutionRelation();
  stepRunner.recreateTaskThreadsRelation();
  stepRunner.recreateOtherThreadRelation();
}
}

View File

@ -1,20 +1,32 @@
package org.openmetadata.service.migration.utils.v185;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.schema.type.Include.ALL;
import static org.openmetadata.schema.type.Relationship.ADDRESSED_TO;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.entity.feed.Thread;
import org.openmetadata.schema.tests.TestCase;
import org.openmetadata.schema.tests.type.TestCaseResolutionStatus;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.Post;
import org.openmetadata.schema.type.Relationship;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.service.Entity;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.EntityDAO;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.jdbi3.TestCaseRepository;
import org.openmetadata.service.resources.feeds.MessageParser;
import org.openmetadata.service.util.EntityUtil;
import org.openmetadata.service.util.RestUtil;
import org.openmetadata.service.util.ResultList;
@ -131,7 +143,188 @@ public class MigrationUtil {
} catch (Exception e) {
LOG.error("Failed to complete test case to test case resolution status migration", e);
throw new RuntimeException("Migration failed", e);
}
}
/**
 * Pages through thread JSON documents and recreates the relationships of each thread.
 *
 * <p>Batches of up to 100 documents are requested from {@code threadListSupplier} until it
 * returns an empty or short batch. A thread that fails to parse or process is logged and skipped
 * so the remaining threads are still handled. Any failure outside the per-thread handling (for
 * example, the supplier itself throwing) is logged and swallowed; this method never throws.
 *
 * @param operationName human-readable name of the thread category, used in progress logs
 * @param failureMessage description used in the error log if the whole run fails
 * @param threadListSupplier source of thread JSON pages, called with (batchSize, offset)
 */
private void processThreadsInBatches(
    String operationName, String failureMessage, ThreadListSupplier threadListSupplier) {
  try {
    LOG.info("Recreating {} relation", operationName);
    final int batchSize = 100;
    int offset = 0;
    int processedCount = 0;

    while (true) {
      List<String> threadBatch = threadListSupplier.getThreads(batchSize, offset);
      if (threadBatch.isEmpty()) {
        break;
      }
      LOG.info("Processing batch: offset={}, size={}", offset, threadBatch.size());

      for (String threadJson : threadBatch) {
        try {
          Thread thread = JsonUtils.readValue(threadJson, Thread.class);
          recreateThreadRelationships(thread);
          processedCount++;
        } catch (Exception e) {
          // Pass the exception as the trailing argument so the stack trace is kept;
          // the message alone is rarely enough to diagnose a bad thread document.
          LOG.error("Error processing thread: {}", e.getMessage(), e);
        }
      }
      LOG.info("Processed {} threads so far", processedCount);

      // Advance by what was actually returned; a short batch means the source is exhausted.
      offset += threadBatch.size();
      if (threadBatch.size() < batchSize) {
        break;
      }
    }
    // Report the category that was actually processed rather than a hard-coded "task threads".
    LOG.info("Migration completed. Processed {} {}", processedCount, operationName);
  } catch (Exception e) {
    LOG.error("Failed to complete {}", failureMessage, e);
  }
}
/**
 * Supplies one page of thread JSON documents to the batch processor.
 *
 * <p>Implementations in this class are lambdas that delegate to the feed DAO paging queries.
 */
@FunctionalInterface
private interface ThreadListSupplier {
/**
 * Fetches a single page.
 *
 * @param batchSize maximum number of documents to return
 * @param offset number of documents to skip before the first returned one
 * @return the JSON documents of the page
 */
List<String> getThreads(int batchSize, int offset);
}
/**
 * Recreates relationships for threads returned by the non-task paging query, limited to threads
 * whose {@code createdAt} is later than 60 days before the moment this method is called.
 *
 * <p>Failures are logged by the batch processor; this method does not throw.
 */
public void recreateOtherThreadRelation() {
  // 60 days expressed in milliseconds. The arithmetic is unchanged; the earlier inline comment
  // claimed "30 days", which did not match the value actually computed.
  long cutOffTime = System.currentTimeMillis() - 60L * 24 * 60 * 60 * 1000; // 60 days ago
  processThreadsInBatches(
      "conversation and announcement threads",
      // Distinct failure text so an error here is not mistaken for the task-thread migration.
      "conversation and announcement threads relation migration",
      (batchSize, offset) ->
          collectionDAO
              .feedDAO()
              .listOtherConversationThreadWithOffset(cutOffTime, batchSize, offset));
}
/**
 * Recreates relationships for every thread returned by the task-thread paging query, delegating
 * the paging loop and error handling to the shared batch processor.
 */
public void recreateTaskThreadsRelation() {
  // Page source: each call asks the feed DAO for the next slice of task threads.
  ThreadListSupplier taskThreadPages =
      (pageSize, pageStart) ->
          collectionDAO.feedDAO().listTaskThreadWithOffset(pageSize, pageStart);

  processThreadsInBatches("task threads", "task threads relation migration", taskThreadPages);
}
/**
 * Recreate thread relationships for CREATED, ADDRESSED_TO, and REPLIED_TO.
 *
 * <p>Each of the three steps handles its own failures, so a problem in one step (for example an
 * unknown user or an unparsable entity link) does not prevent the others from running. Nothing
 * is thrown to the caller; failures are only logged.
 *
 * @param thread The thread to recreate relationships for
 */
private void recreateThreadRelationships(Thread thread) {
  try {
    int relationshipsCreated =
        insertCreatedRelationship(thread)
            + insertAddressedToRelationships(thread)
            + insertRepliedToRelationships(thread);
    if (relationshipsCreated > 0) {
      LOG.debug("Created {} relationships for thread {}", relationshipsCreated, thread.getId());
    }
  } catch (Exception e) {
    LOG.error("Error recreating relationships for thread {}: {}", thread.getId(), e.getMessage());
  }
}

/** Inserts the CREATED relationship (User -> Thread) for the thread's creator; returns 0 or 1. */
private int insertCreatedRelationship(Thread thread) {
  String creatorName = thread.getCreatedBy();
  if (creatorName == null) {
    return 0;
  }
  try {
    EntityReference creator = Entity.getEntityReferenceByName(Entity.USER, creatorName, ALL);
    collectionDAO
        .relationshipDAO()
        .insert(
            creator.getId(),
            thread.getId(),
            creator.getType(),
            Entity.THREAD,
            Relationship.CREATED.ordinal(),
            null);
    LOG.debug("Created CREATED relationship for thread {}", thread.getId());
    return 1;
  } catch (Exception e) {
    LOG.error(
        "Failed to create CREATED relationship for thread {}: {}",
        thread.getId(),
        e.getMessage());
    return 0;
  }
}

/**
 * Inserts ADDRESSED_TO relationships (Thread -> owner) for every owner of the entity the thread
 * is about; returns the number of relationships inserted before any failure.
 */
private int insertAddressedToRelationships(Thread thread) {
  int inserted = 0;
  try {
    MessageParser.EntityLink aboutEntityLink = MessageParser.EntityLink.parse(thread.getAbout());
    EntityRepository<? extends EntityInterface> repository =
        Entity.getEntityRepository(aboutEntityLink.getEntityType());
    // Only request the owners field from repositories that declare support for it.
    String fields = repository.isSupportsOwners() ? "owners" : "";
    EntityInterface aboutEntity = Entity.getEntity(aboutEntityLink, fields, ALL);
    List<EntityReference> entityOwners = aboutEntity.getOwners();
    if (!nullOrEmpty(entityOwners)) {
      for (EntityReference entityOwner : entityOwners) {
        collectionDAO
            .relationshipDAO()
            .insert(
                thread.getId(),
                entityOwner.getId(),
                Entity.THREAD,
                entityOwner.getType(),
                ADDRESSED_TO.ordinal());
        inserted++;
      }
    }
  } catch (Exception ex) {
    LOG.debug(
        "Recreating relationship for thread {} failed: {}",
        thread.getId(),
        ex.getMessage(),
        ex);
  }
  return inserted;
}

/**
 * Inserts one REPLIED_TO relationship (User -> Thread) per distinct user who authored a post on
 * the thread; returns the number of relationships inserted.
 */
private int insertRepliedToRelationships(Thread thread) {
  if (nullOrEmpty(thread.getPosts())) {
    return 0;
  }
  int inserted = 0;
  Set<UUID> repliedUsers = new HashSet<>();
  for (Post post : thread.getPosts()) {
    if (post.getFrom() == null) {
      continue;
    }
    try {
      // Resolve the author of this post. Resolving the thread creator here (as the code
      // previously did) would attribute every reply to the creator and never link repliers.
      EntityReference replier =
          Entity.getEntityReferenceByName(Entity.USER, post.getFrom(), ALL);
      // Set.add returns false when the user was already recorded, so each user is linked once.
      if (repliedUsers.contains(replier.getId())) {
        continue;
      }
      collectionDAO
          .relationshipDAO()
          .insert(
              replier.getId(),
              thread.getId(),
              Entity.USER,
              Entity.THREAD,
              Relationship.REPLIED_TO.ordinal(),
              null);
      repliedUsers.add(replier.getId());
      inserted++;
      LOG.debug(
          "Created REPLIED_TO relationship for thread {} from user {}",
          thread.getId(),
          replier.getId());
    } catch (Exception e) {
      LOG.error(
          "Failed to create REPLIED_TO relationship for thread {} from user {}: {}",
          thread.getId(),
          post.getFrom(),
          e.getMessage());
    }
  }
  return inserted;
}
}

View File

@ -32,6 +32,7 @@ import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.EntityTimeSeriesRepository;
import org.openmetadata.service.jdbi3.FeedRepository;
@Slf4j
public class EntityRelationshipCleanup {
@ -40,11 +41,13 @@ public class EntityRelationshipCleanup {
// Repositories keyed by entity type; filled by the initializer that stores
// Entity.getEntityRepository(entityType) under each type.
private final Map<String, EntityRepository<?>> entityRepositories = new HashMap<>();
// Time-series repositories keyed by entity type; filled by the initializer that stores
// Entity.getEntityTimeSeriesRepository(entityType) under each type.
private final Map<String, EntityTimeSeriesRepository<?>> entityTimeSeriesRepositoy =
new HashMap<>();
// Used to look up threads by id when checking whether a THREAD entity exists.
private final FeedRepository feedRepository;
// Set from the constructor argument; its use is outside this view — presumably suppresses
// actual deletions when true (TODO confirm).
private final boolean dryRun;
public EntityRelationshipCleanup(CollectionDAO collectionDAO, boolean dryRun) {
  // Capture the constructor arguments and create the feed repository first ...
  this.dryRun = dryRun;
  this.collectionDAO = collectionDAO;
  this.feedRepository = new FeedRepository();

  // ... then populate the two repository maps, entity repositories before time-series ones.
  initializeEntityRepositories();
  initializeTimeSeriesRepositories();
}
@ -82,7 +85,7 @@ public class EntityRelationshipCleanup {
EntityRepository<?> repository = Entity.getEntityRepository(entityType);
entityRepositories.put(entityType, repository);
} catch (EntityNotFoundException e) {
LOG.error("No repository found for entity type: {}", entityType);
LOG.debug("No repository found for entity type: {}", entityType);
}
}
}
@ -93,7 +96,7 @@ public class EntityRelationshipCleanup {
EntityTimeSeriesRepository<?> repository = Entity.getEntityTimeSeriesRepository(entityType);
entityTimeSeriesRepositoy.put(entityType, repository);
} catch (EntityNotFoundException e) {
LOG.error("No repository found for entity type: {}", entityType);
LOG.debug("No repository found for entity type: {}", entityType);
}
}
}
@ -256,13 +259,24 @@ public class EntityRelationshipCleanup {
/**
 * Reports whether the given entity type can be checked by this cleanup: it has an entry in the
 * entity repository map or the time-series repository map, or (in the newer lines) it is
 * {@code Entity.THREAD}.
 *
 * <p>NOTE(review): this span comes from a rendered diff and appears to contain both the removed
 * return statement (the first two body lines) and its replacement (the following two lines,
 * which add the THREAD check). Confirm which lines are live in the actual file.
 */
private boolean doEntityHaveAnyRepository(String entityType) {
return entityRepositories.containsKey(entityType)
|| entityTimeSeriesRepositoy.containsKey(entityType);
|| entityTimeSeriesRepositoy.containsKey(entityType)
|| entityType.equals(Entity.THREAD);
}
/**
 * Decides whether the entity with the given id and type exists.
 *
 * <p>NOTE(review): this span comes from a rendered diff and appears to contain both the removed
 * implementation (the first three body lines, which OR the two repository checks) and its
 * replacement (the if-chain after them). Confirm which lines are live in the actual file.
 *
 * @param entityId id of the entity to look up
 * @param entityType type name used to pick the repository
 * @return the result of the matching repository check; {@code true} when no branch matches
 */
private boolean entityExists(UUID entityId, String entityType) {
boolean existsInEntityRepo = checkInEntityRepository(entityId, entityType);
boolean existsInTimeSeriesRepo = checkInEntityTimeSeriesRepository(entityId, entityType);
return existsInEntityRepo || existsInTimeSeriesRepo;
// Dispatch to exactly one check: entity repository first, then time-series, then threads.
if (entityRepositories.get(entityType) != null) {
return checkInEntityRepository(entityId, entityType);
}
if (entityTimeSeriesRepositoy.get(entityType) != null) {
return checkInEntityTimeSeriesRepository(entityId, entityType);
}
if (entityType.equals(Entity.THREAD)) {
return checkInFeedRepository(entityId);
}
// No repository can answer for this type, so the entity is reported as existing.
return true;
}
private boolean checkInEntityRepository(UUID entityId, String entityType) {
@ -272,17 +286,35 @@ public class EntityRelationshipCleanup {
return true;
} catch (EntityNotFoundException e) {
LOG.debug("Entity {}:{} not found in repository: {}", entityType, entityId, e.getMessage());
return false;
} catch (Exception ex) {
LOG.debug("Entity {}:{} encountered exception: {}", entityType, entityId, ex.getMessage());
// If any other exception occurs, we cannot tell whether the entity is missing, so we treat it as existing
return true;
}
return false;
}
/**
 * Checks whether the time-series repository registered for {@code entityType} returns a
 * non-null result for {@code entityId}.
 *
 * <p>NOTE(review): this span comes from a rendered diff and appears to contain both the removed
 * implementation (the lookup and null-guard in the first four body lines) and its replacement
 * (the try/catch that follows). Confirm which lines are live in the actual file.
 *
 * @param entityId id of the entity to look up
 * @param entityType type name used as the key into the time-series repository map
 * @return in the try/catch version: whether {@code getById} returned non-null, or {@code true}
 *     if any exception was thrown
 */
private boolean checkInEntityTimeSeriesRepository(UUID entityId, String entityType) {
EntityTimeSeriesRepository<?> repository = entityTimeSeriesRepositoy.get(entityType);
if (repository == null) {
LOG.debug("No repository found for entity type: {}", entityType);
return false;
try {
EntityTimeSeriesRepository<?> repository = entityTimeSeriesRepositoy.get(entityType);
return repository.getById(entityId) != null;
} catch (Exception ex) {
// Any exception (including a missing map entry, where repository is null) yields true.
LOG.debug("Entity {}:{} encountered exception: {}", entityType, entityId, ex.getMessage());
return true;
}
}
/**
 * Checks whether the feed repository returns a non-null result for the given id.
 *
 * <p>NOTE(review): this span comes from a rendered diff; the {@code return
 * repository.getById(...)} line just before the final brace references a variable not declared
 * in this method and appears to be a removed line of the preceding method. Confirm against the
 * actual file.
 *
 * @param entityId id passed to {@code feedRepository.get}
 * @return {@code false} when the lookup throws {@code EntityNotFoundException}; {@code true}
 *     when it throws any other exception; otherwise whether the lookup result is non-null
 */
private boolean checkInFeedRepository(UUID entityId) {
try {
return feedRepository.get(entityId) != null;
} catch (EntityNotFoundException e) {
LOG.debug(
"Entity {}:{} not found in repository: {}", Entity.THREAD, entityId, e.getMessage());
return false;
} catch (Exception ex) {
// Other failures do not prove the thread is missing, so the thread is reported as existing.
LOG.debug("Entity {}:{} encountered exception: {}", Entity.THREAD, entityId, ex.getMessage());
return true;
}
return repository.getById(entityId) != null;
}
/**