Fixes #19466: Cleanup Activity Feed In Retention App (#21640)

* add activityThreadsRetentionPeriod in the DataRetentionApplication jsons

* clean up activity threads of type conversation
This commit is contained in:
Siddhant 2025-06-09 19:22:40 +05:30 committed by GitHub
parent 1511871fc1
commit f30eff4dc5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 99 additions and 3 deletions

View File

@ -5,7 +5,9 @@ import static org.openmetadata.service.apps.scheduler.OmAppJobListener.APP_RUN_S
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.exception.ExceptionUtils;
@ -18,8 +20,10 @@ import org.openmetadata.schema.entity.applications.configuration.internal.DataRe
import org.openmetadata.schema.system.EntityStats;
import org.openmetadata.schema.system.Stats;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.service.Entity;
import org.openmetadata.service.apps.AbstractNativeApplication;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.FeedRepository;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.socket.WebSocketManager;
import org.openmetadata.service.util.JsonUtils;
@ -37,9 +41,14 @@ public class DataRetention extends AbstractNativeApplication {
private AppRunRecord.Status internalStatus = AppRunRecord.Status.COMPLETED;
private Map<String, Object> failureDetails = null;
private final FeedRepository feedRepository;
private final CollectionDAO.FeedDAO feedDAO;
public DataRetention(CollectionDAO collectionDAO, SearchRepository searchRepository) {
super(collectionDAO, searchRepository);
this.eventSubscriptionDAO = collectionDAO.eventSubscriptionDAO();
this.feedRepository = Entity.getFeedRepository();
this.feedDAO = Entity.getCollectionDAO().feedDAO();
}
@Override
@ -89,6 +98,7 @@ public class DataRetention extends AbstractNativeApplication {
entityStats.withAdditionalProperty("successful_sent_change_events", new StepStats());
entityStats.withAdditionalProperty("change_events", new StepStats());
entityStats.withAdditionalProperty("consumers_dlq", new StepStats());
entityStats.withAdditionalProperty("activity_threads", new StepStats());
retentionStats.setEntityStats(entityStats);
}
@ -100,6 +110,33 @@ public class DataRetention extends AbstractNativeApplication {
int retentionPeriod = config.getChangeEventRetentionPeriod();
LOG.info("Starting cleanup for change events with retention period: {} days.", retentionPeriod);
cleanChangeEvents(retentionPeriod);
int threadRetentionPeriod = config.getActivityThreadsRetentionPeriod();
LOG.info(
"Starting cleanup for activity threads with retention period: {} days.",
threadRetentionPeriod);
cleanActivityThreads(threadRetentionPeriod);
}
@Transaction
private void cleanActivityThreads(int retentionPeriod) {
LOG.info("Initiating activity threads cleanup: Retention = {} days.", retentionPeriod);
long cutoffMillis = getRetentionCutoffMillis(retentionPeriod);
List<UUID> threadIdsToDelete =
feedDAO.fetchConversationThreadIdsOlderThan(cutoffMillis, BATCH_SIZE);
if (threadIdsToDelete.isEmpty()) {
LOG.info(
"No activity threads found older than retention period of {} days, skipping cleanup.",
retentionPeriod);
return;
}
executeWithStatsTracking(
"activity_threads", () -> feedRepository.deleteThreadsInBatch(threadIdsToDelete));
LOG.info("Activity threads cleanup complete.");
}
@Transaction

View File

@ -1406,6 +1406,12 @@ public interface CollectionDAO {
+ "(fromId = :id AND fromEntity = :entity)")
void deleteAll(@BindUUID("id") UUID id, @Bind("entity") String entity);
@SqlUpdate(
"DELETE FROM entity_relationship "
+ "WHERE (toId IN (<ids>) AND toEntity = :entity) "
+ " OR (fromId IN (<ids>) AND fromEntity = :entity)")
void deleteAllByThreadIds(@BindList("ids") List<String> ids, @Bind("entity") String entity);
@SqlUpdate("DELETE from entity_relationship WHERE fromId = :id or toId = :id")
void deleteAllWithId(@BindUUID("id") UUID id);
@ -1511,6 +1517,9 @@ public interface CollectionDAO {
@SqlUpdate("DELETE FROM thread_entity WHERE id = :id")
void delete(@BindUUID("id") UUID id);
@SqlUpdate("DELETE FROM thread_entity WHERE id IN (<ids>)")
int deleteByIds(@BindList("ids") List<String> ids);
@ConnectionAwareSqlUpdate(
value = "UPDATE task_sequence SET id=LAST_INSERT_ID(id+1)",
connectionType = MYSQL)
@ -2292,6 +2301,12 @@ public interface CollectionDAO {
deleteAllByPrefixInternal(condition, bindMap);
}
default void deleteAllByPrefixes(List<String> threadIds) {
for (String threadId : threadIds) {
deleteAllByPrefix(threadId);
}
}
@SqlUpdate("DELETE from field_relationship <cond>")
void deleteAllByPrefixInternal(
@Define("cond") String cond, @BindMap Map<String, String> bindings);

View File

@ -59,6 +59,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Triple;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.json.JSONObject;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.api.feed.CloseTask;
import org.openmetadata.schema.api.feed.ResolveTask;
@ -551,6 +552,22 @@ public class FeedRepository {
dao.feedDAO().delete(id);
}
@Transaction
public int deleteThreadsInBatch(List<UUID> threadUUIDs) {
if (CommonUtil.nullOrEmpty(threadUUIDs)) return 0;
List<String> threadIds = threadUUIDs.stream().map(UUID::toString).toList();
// Delete all the relationships to other entities
dao.relationshipDAO().deleteAllByThreadIds(threadIds, Entity.THREAD);
// Delete all the field relationships to other entities
dao.fieldRelationshipDAO().deleteAllByPrefixes(threadIds);
// Delete the thread and return the count
return dao.feedDAO().deleteByIds(threadIds);
}
public void deleteByAbout(UUID entityId) {
List<String> threadIds = listOrEmpty(dao.feedDAO().findByEntityId(entityId.toString()));
for (String threadId : threadIds) {

View File

@ -2,7 +2,8 @@
"name": "DataRetentionApplication",
"displayName": "Data Retention",
"appConfiguration": {
"changeEventRetentionPeriod": 7
"changeEventRetentionPeriod": 7,
"activityThreadsRetentionPeriod": 60
},
"appSchedule": {
"scheduleTimeline": "Custom",

View File

@ -10,10 +10,17 @@
"type": "integer",
"default": 7,
"minimum": 1
},
"activityThreadsRetentionPeriod": {
"title": "Activity Threads Retention Period (days)",
"description": "Enter the retention period for Activity Threads of type = 'Conversation' records in days (e.g., 30 for one month, 60 for two months).",
"type": "integer",
"default": 60,
"minimum": 0
}
},
"required": [
"changeEventRetentionPeriod"
"changeEventRetentionPeriod", "activityThreadsRetentionPeriod"
],
"additionalProperties": false
}

View File

@ -7,4 +7,11 @@ $$section
Enter the retention period for change event records in days (e.g., 7 for one week, 30 for one month).
$$
$$section
### Activity Threads Retention Period (days) $(id="activityThreadsRetentionPeriod")
Enter the retention period for Activity Threads of type = 'Conversation' records in days (e.g., 30 for one month, 60 for two months).
$$

View File

@ -11,6 +11,11 @@
* limitations under the License.
*/
export interface DataRetentionConfigurationClass {
/**
* Enter the retention period for Activity Threads of type = 'Conversation' records in days
* (e.g., 30 for one month, 60 for two months).
*/
activityThreadsRetentionPeriod: number;
/**
* Enter the retention period for change event records in days (e.g., 7 for one week, 30 for
* one month).

View File

@ -11,8 +11,15 @@
"type": "integer",
"default": 7,
"minimum": 1
},
"activityThreadsRetentionPeriod": {
"title": "Activity Threads Retention Period (days)",
"description": "Enter the retention period for Activity Threads of type = 'Conversation' records in days (e.g., 30 for one month, 60 for two months).",
"type": "integer",
"default": 60,
"minimum": 0
}
},
"required": ["changeEventRetentionPeriod"],
"required": ["changeEventRetentionPeriod", "activityThreadsRetentionPeriod"],
"additionalProperties": false
}