From f30eff4dc56b72442718c74b0f8ac927b2839a64 Mon Sep 17 00:00:00 2001 From: Siddhant <86899184+Siddhanttimeline@users.noreply.github.com> Date: Mon, 9 Jun 2025 19:22:40 +0530 Subject: [PATCH] Fixes #19466: Cleanup Activity Feed In Retention App (#21640) * add activityThreadsRetentionPeriod in the DataRetentionApplication jsons * clean up activity threads of type conversation --- .../bundles/dataRetention/DataRetention.java | 37 +++++++++++++++++++ .../service/jdbi3/CollectionDAO.java | 15 ++++++++ .../service/jdbi3/FeedRepository.java | 17 +++++++++ .../data/app/DataRetentionApplication.json | 3 +- .../internal/dataRetentionConfiguration.json | 9 ++++- .../Applications/DataRetentionApplication.md | 7 ++++ .../internal/dataRetentionConfiguration.ts | 5 +++ .../DataRetentionApplication.json | 9 ++++- 8 files changed, 99 insertions(+), 3 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/dataRetention/DataRetention.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/dataRetention/DataRetention.java index b31c3d2916f..db875621288 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/dataRetention/DataRetention.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/dataRetention/DataRetention.java @@ -5,7 +5,9 @@ import static org.openmetadata.service.apps.scheduler.OmAppJobListener.APP_RUN_S import java.time.Duration; import java.time.Instant; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.exception.ExceptionUtils; @@ -18,8 +20,10 @@ import org.openmetadata.schema.entity.applications.configuration.internal.DataRe import org.openmetadata.schema.system.EntityStats; import org.openmetadata.schema.system.Stats; import org.openmetadata.schema.system.StepStats; +import org.openmetadata.service.Entity; import org.openmetadata.service.apps.AbstractNativeApplication; import org.openmetadata.service.jdbi3.CollectionDAO; +import org.openmetadata.service.jdbi3.FeedRepository; import org.openmetadata.service.search.SearchRepository; import org.openmetadata.service.socket.WebSocketManager; import org.openmetadata.service.util.JsonUtils; @@ -37,9 +41,14 @@ public class DataRetention extends AbstractNativeApplication { private AppRunRecord.Status internalStatus = AppRunRecord.Status.COMPLETED; private Map failureDetails = null; + private final FeedRepository feedRepository; + private final CollectionDAO.FeedDAO feedDAO; + public DataRetention(CollectionDAO collectionDAO, SearchRepository searchRepository) { super(collectionDAO, searchRepository); this.eventSubscriptionDAO = collectionDAO.eventSubscriptionDAO(); + this.feedRepository = Entity.getFeedRepository(); + this.feedDAO = Entity.getCollectionDAO().feedDAO(); } @Override @@ -89,6 +98,7 @@ public class DataRetention extends AbstractNativeApplication { entityStats.withAdditionalProperty("successful_sent_change_events", new StepStats()); entityStats.withAdditionalProperty("change_events", new StepStats()); entityStats.withAdditionalProperty("consumers_dlq", new StepStats()); + entityStats.withAdditionalProperty("activity_threads", new StepStats()); retentionStats.setEntityStats(entityStats); } @@ -100,6 +110,33 @@ public class DataRetention extends AbstractNativeApplication { int retentionPeriod = config.getChangeEventRetentionPeriod(); LOG.info("Starting cleanup for change events with retention period: {} days.", retentionPeriod); cleanChangeEvents(retentionPeriod); + + int threadRetentionPeriod = config.getActivityThreadsRetentionPeriod(); + LOG.info( + "Starting cleanup for activity threads with retention period: {} days.", + threadRetentionPeriod); + cleanActivityThreads(threadRetentionPeriod); + } + + @Transaction + private void cleanActivityThreads(int retentionPeriod) { + LOG.info("Initiating activity threads cleanup: Retention = {} days.", retentionPeriod); + long cutoffMillis = getRetentionCutoffMillis(retentionPeriod); + + List threadIdsToDelete = + feedDAO.fetchConversationThreadIdsOlderThan(cutoffMillis, BATCH_SIZE); + + if (threadIdsToDelete.isEmpty()) { + LOG.info( + "No activity threads found older than retention period of {} days, skipping cleanup.", + retentionPeriod); + return; + } + + executeWithStatsTracking( + "activity_threads", () -> feedRepository.deleteThreadsInBatch(threadIdsToDelete)); + + LOG.info("Activity threads cleanup complete."); } @Transaction diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java index e3b3a2d2983..207fc05a3c1 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java @@ -1406,6 +1406,12 @@ public interface CollectionDAO { + "(fromId = :id AND fromEntity = :entity)") void deleteAll(@BindUUID("id") UUID id, @Bind("entity") String entity); + @SqlUpdate( + "DELETE FROM entity_relationship " + + "WHERE (toId IN () AND toEntity = :entity) " + + " OR (fromId IN () AND fromEntity = :entity)") + void deleteAllByThreadIds(@BindList("ids") List ids, @Bind("entity") String entity); + @SqlUpdate("DELETE from entity_relationship WHERE fromId = :id or toId = :id") void deleteAllWithId(@BindUUID("id") UUID id); @@ -1511,6 +1517,9 @@ public interface CollectionDAO { @SqlUpdate("DELETE FROM thread_entity WHERE id = :id") void delete(@BindUUID("id") UUID id); + @SqlUpdate("DELETE FROM thread_entity WHERE id IN ()") + int deleteByIds(@BindList("ids") List ids); + @ConnectionAwareSqlUpdate( value = "UPDATE task_sequence SET id=LAST_INSERT_ID(id+1)", connectionType = MYSQL) @@ -2292,6 +2301,12 @@ public interface CollectionDAO { deleteAllByPrefixInternal(condition, bindMap); } + default void deleteAllByPrefixes(List threadIds) { + for (String threadId : threadIds) { + deleteAllByPrefix(threadId); + } + } + @SqlUpdate("DELETE from field_relationship ") void deleteAllByPrefixInternal( @Define("cond") String cond, @BindMap Map bindings); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/FeedRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/FeedRepository.java index e698bffcc38..a5770ad6261 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/FeedRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/FeedRepository.java @@ -59,6 +59,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Triple; import org.jdbi.v3.sqlobject.transaction.Transaction; import org.json.JSONObject; +import org.openmetadata.common.utils.CommonUtil; import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.api.feed.CloseTask; import org.openmetadata.schema.api.feed.ResolveTask; @@ -551,6 +552,22 @@ public class FeedRepository { dao.feedDAO().delete(id); } + @Transaction + public int deleteThreadsInBatch(List threadUUIDs) { + if (CommonUtil.nullOrEmpty(threadUUIDs)) return 0; + + List threadIds = threadUUIDs.stream().map(UUID::toString).toList(); + + // Delete all the relationships to other entities + dao.relationshipDAO().deleteAllByThreadIds(threadIds, Entity.THREAD); + + // Delete all the field relationships to other entities + dao.fieldRelationshipDAO().deleteAllByPrefixes(threadIds); + + // Delete the thread and return the count + return dao.feedDAO().deleteByIds(threadIds); + } + public void deleteByAbout(UUID entityId) { List threadIds = listOrEmpty(dao.feedDAO().findByEntityId(entityId.toString())); for (String threadId : threadIds) { diff --git a/openmetadata-service/src/main/resources/json/data/app/DataRetentionApplication.json b/openmetadata-service/src/main/resources/json/data/app/DataRetentionApplication.json index c5e14f24f4e..e554d75b0b0 100644 --- a/openmetadata-service/src/main/resources/json/data/app/DataRetentionApplication.json +++ b/openmetadata-service/src/main/resources/json/data/app/DataRetentionApplication.json @@ -2,7 +2,8 @@ "name": "DataRetentionApplication", "displayName": "Data Retention", "appConfiguration": { - "changeEventRetentionPeriod": 7 + "changeEventRetentionPeriod": 7, + "activityThreadsRetentionPeriod": 60 }, "appSchedule": { "scheduleTimeline": "Custom", diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/applications/configuration/internal/dataRetentionConfiguration.json b/openmetadata-spec/src/main/resources/json/schema/entity/applications/configuration/internal/dataRetentionConfiguration.json index bdb4745559f..e9abce13e10 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/applications/configuration/internal/dataRetentionConfiguration.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/applications/configuration/internal/dataRetentionConfiguration.json @@ -10,10 +10,17 @@ "type": "integer", "default": 7, "minimum": 1 + }, + "activityThreadsRetentionPeriod": { + "title": "Activity Threads Retention Period (days)", + "description": "Enter the retention period for Activity Threads of type = 'Conversation' records in days (e.g., 30 for one month, 60 for two months).", + "type": "integer", + "default": 60, + "minimum": 0 } }, "required": [ - "changeEventRetentionPeriod" + "changeEventRetentionPeriod", "activityThreadsRetentionPeriod" ], "additionalProperties": false } diff --git a/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Applications/DataRetentionApplication.md b/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Applications/DataRetentionApplication.md index b40cc133591..a0b82eae959 100644 --- a/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Applications/DataRetentionApplication.md +++ b/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Applications/DataRetentionApplication.md @@ -7,4 +7,11 @@ $$section Enter the retention period for change event records in days (e.g., 7 for one week, 30 for one month). +$$ + +$$section +### Activity Threads Retention Period (days) $(id="activityThreadsRetentionPeriod") + +Enter the retention period for Activity Threads of type = 'Conversation' records in days (e.g., 30 for one month, 60 for two months). + $$ \ No newline at end of file diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/applications/configuration/internal/dataRetentionConfiguration.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/applications/configuration/internal/dataRetentionConfiguration.ts index 01d766f7b47..e48c99db0b2 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/applications/configuration/internal/dataRetentionConfiguration.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/applications/configuration/internal/dataRetentionConfiguration.ts @@ -11,6 +11,11 @@ * limitations under the License. */ export interface DataRetentionConfigurationClass { + /** + * Enter the retention period for Activity Threads of type = 'Conversation' records in days + * (e.g., 30 for one month, 60 for two months). + */ + activityThreadsRetentionPeriod: number; /** * Enter the retention period for change event records in days (e.g., 7 for one week, 30 for * one month). diff --git a/openmetadata-ui/src/main/resources/ui/src/utils/ApplicationSchemas/DataRetentionApplication.json b/openmetadata-ui/src/main/resources/ui/src/utils/ApplicationSchemas/DataRetentionApplication.json index 0d152ee5ba4..a393832da76 100644 --- a/openmetadata-ui/src/main/resources/ui/src/utils/ApplicationSchemas/DataRetentionApplication.json +++ b/openmetadata-ui/src/main/resources/ui/src/utils/ApplicationSchemas/DataRetentionApplication.json @@ -11,8 +11,15 @@ "type": "integer", "default": 7, "minimum": 1 + }, + "activityThreadsRetentionPeriod": { + "title": "Activity Threads Retention Period (days)", + "description": "Enter the retention period for Activity Threads of type = 'Conversation' records in days (e.g., 30 for one month, 60 for two months).", + "type": "integer", + "default": 60, + "minimum": 0 } }, - "required": ["changeEventRetentionPeriod"], + "required": ["changeEventRetentionPeriod", "activityThreadsRetentionPeriod"], "additionalProperties": false }