diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/dataRetention/DataRetention.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/dataRetention/DataRetention.java
index 214807bd10b..b31c3d2916f 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/dataRetention/DataRetention.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/dataRetention/DataRetention.java
@@ -1,24 +1,41 @@
 package org.openmetadata.service.apps.bundles.dataRetention;
 
+import static org.openmetadata.service.apps.scheduler.OmAppJobListener.APP_RUN_STATS;
+
 import java.time.Duration;
 import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.jdbi.v3.sqlobject.transaction.Transaction;
 import org.openmetadata.common.utils.CommonUtil;
 import org.openmetadata.schema.entity.app.App;
+import org.openmetadata.schema.entity.app.AppRunRecord;
+import org.openmetadata.schema.entity.app.FailureContext;
 import org.openmetadata.schema.entity.applications.configuration.internal.DataRetentionConfiguration;
+import org.openmetadata.schema.system.EntityStats;
+import org.openmetadata.schema.system.Stats;
+import org.openmetadata.schema.system.StepStats;
 import org.openmetadata.service.apps.AbstractNativeApplication;
 import org.openmetadata.service.jdbi3.CollectionDAO;
 import org.openmetadata.service.search.SearchRepository;
+import org.openmetadata.service.socket.WebSocketManager;
 import org.openmetadata.service.util.JsonUtils;
 import org.quartz.JobExecutionContext;
 
 @Slf4j
 public class DataRetention extends AbstractNativeApplication {
   private static final int BATCH_SIZE = 10_000;
+
   private DataRetentionConfiguration dataRetentionConfiguration;
   private final CollectionDAO.EventSubscriptionDAO eventSubscriptionDAO;
+  private final Stats retentionStats = new Stats();
+  private JobExecutionContext jobExecutionContext;
+
+  private AppRunRecord.Status internalStatus = AppRunRecord.Status.COMPLETED;
+  private Map<String, String> failureDetails = null;
 
   public DataRetention(CollectionDAO collectionDAO, SearchRepository searchRepository) {
     super(collectionDAO, searchRepository);
@@ -37,44 +54,100 @@ public class DataRetention extends AbstractNativeApplication {
   @Override
   public void startApp(JobExecutionContext jobExecutionContext) {
-    executeCleanup(dataRetentionConfiguration);
+    this.jobExecutionContext = jobExecutionContext;
+
+    try {
+      initializeStatsDefaults();
+      executeCleanup(dataRetentionConfiguration);
+
+      jobExecutionContext.getJobDetail().getJobDataMap().put(APP_RUN_STATS, retentionStats);
+      updateRecordToDbAndNotify(null);
+
+      if (internalStatus == AppRunRecord.Status.ACTIVE_ERROR
+          || internalStatus == AppRunRecord.Status.FAILED) {
+        throw new RuntimeException("Partial failure occurred in DataRetention job");
+      }
+
+    } catch (Exception ex) {
+      LOG.error("DataRetention job failed.", ex);
+      internalStatus = AppRunRecord.Status.FAILED;
+
+      failureDetails = new HashMap<>();
+      failureDetails.put("message", ex.getMessage());
+      failureDetails.put("jobStackTrace", ExceptionUtils.getStackTrace(ex));
+
+      updateRecordToDbAndNotify(ex);
+    }
+  }
+
+  private void initializeStatsDefaults() {
+    StepStats jobStats =
+        new StepStats().withTotalRecords(0).withSuccessRecords(0).withFailedRecords(0);
+    retentionStats.setJobStats(jobStats);
+
+    EntityStats entityStats = new EntityStats();
+    entityStats.withAdditionalProperty("successful_sent_change_events", new StepStats());
+    entityStats.withAdditionalProperty("change_events", new StepStats());
+    entityStats.withAdditionalProperty("consumers_dlq", new StepStats());
+    retentionStats.setEntityStats(entityStats);
   }
 
   public void executeCleanup(DataRetentionConfiguration config) {
-    if (CommonUtil.nullOrEmpty(config)) {
+    if (config == null) {
+      LOG.warn("DataRetentionConfiguration is null. Skipping cleanup.");
       return;
     }
-
-    cleanChangeEvents(config.getChangeEventRetentionPeriod());
+    int retentionPeriod = config.getChangeEventRetentionPeriod();
+    LOG.info("Starting cleanup for change events with retention period: {} days.", retentionPeriod);
+    cleanChangeEvents(retentionPeriod);
   }
 
   @Transaction
   private void cleanChangeEvents(int retentionPeriod) {
-    LOG.info(
-        "Initiating change events cleanup: Deleting records with a retention period of {} days.",
-        retentionPeriod);
+    LOG.info("Initiating change events cleanup: Retention = {} days.", retentionPeriod);
 
     long cutoffMillis = getRetentionCutoffMillis(retentionPeriod);
 
-    int totalDeletedSuccessfulEvents =
-        batchDelete(
-            () ->
-                eventSubscriptionDAO.deleteSuccessfulSentChangeEventsInBatches(
-                    cutoffMillis, BATCH_SIZE));
+    executeWithStatsTracking(
+        "successful_sent_change_events",
+        () ->
+            eventSubscriptionDAO.deleteSuccessfulSentChangeEventsInBatches(
+                cutoffMillis, BATCH_SIZE));
 
-    int totalDeletedChangeEvents =
-        batchDelete(
-            () -> eventSubscriptionDAO.deleteChangeEventsInBatches(cutoffMillis, BATCH_SIZE));
+    executeWithStatsTracking(
+        "change_events",
+        () -> eventSubscriptionDAO.deleteChangeEventsInBatches(cutoffMillis, BATCH_SIZE));
 
-    int totalDeletedDlq =
-        batchDelete(
-            () -> eventSubscriptionDAO.deleteConsumersDlqInBatches(cutoffMillis, BATCH_SIZE));
+    executeWithStatsTracking(
+        "consumers_dlq",
+        () -> eventSubscriptionDAO.deleteConsumersDlqInBatches(cutoffMillis, BATCH_SIZE));
 
-    LOG.info(
-        "Change events cleanup completed: {} successful_sent_change_events, {} change_events, and {} consumers_dlq records deleted (retention period: {} days).",
-        totalDeletedSuccessfulEvents,
-        totalDeletedChangeEvents,
-        totalDeletedDlq,
-        retentionPeriod);
+    LOG.info("Change events cleanup complete.");
+  }
+
+  private void executeWithStatsTracking(String entity, Supplier<Integer> deleteFunction) {
+    int totalDeleted = 0;
+    int totalFailed = 0;
+
+    while (true) {
+      try {
+        int deleted = deleteFunction.get();
+        totalDeleted += deleted;
+        if (deleted < BATCH_SIZE) break;
+      } catch (Exception ex) {
+        LOG.error("Failed to clean entity: {}", entity, ex);
+        totalFailed += BATCH_SIZE;
+        internalStatus = AppRunRecord.Status.ACTIVE_ERROR;
+
+        if (failureDetails == null) {
+          failureDetails = new HashMap<>();
+          failureDetails.put("message", ex.getMessage());
+          failureDetails.put("jobStackTrace", ExceptionUtils.getStackTrace(ex));
+        }
+        break;
+      }
+    }
+
+    updateStats(entity, totalDeleted, totalFailed);
   }
 
   private long getRetentionCutoffMillis(int retentionPeriodInDays) {
@@ -83,18 +156,41 @@ public class DataRetention extends AbstractNativeApplication {
         .toEpochMilli();
   }
 
-  /**
-   * Runs a batch delete operation in a loop until fewer than BATCH_SIZE records are deleted in a single iteration.
-   */
-  private int batchDelete(Supplier<Integer> deleteFunction) {
-    var totalDeleted = 0;
-    while (true) {
-      var deletedCount = deleteFunction.get();
-      totalDeleted += deletedCount;
-      if (deletedCount < BATCH_SIZE) {
-        break;
-      }
+  private synchronized void updateStats(String entity, int successCount, int failureCount) {
+    StepStats entityStat =
+        retentionStats
+            .getEntityStats()
+            .getAdditionalProperties()
+            .getOrDefault(entity, new StepStats());
+
+    entityStat.setTotalRecords(entityStat.getTotalRecords() + successCount + failureCount);
+    entityStat.setSuccessRecords(entityStat.getSuccessRecords() + successCount);
+    entityStat.setFailedRecords(entityStat.getFailedRecords() + failureCount);
+
+    retentionStats.getEntityStats().withAdditionalProperty(entity, entityStat);
+
+    StepStats jobStats = retentionStats.getJobStats();
+    jobStats.setTotalRecords(jobStats.getTotalRecords() + successCount + failureCount);
+    jobStats.setSuccessRecords(jobStats.getSuccessRecords() + successCount);
+    jobStats.setFailedRecords(jobStats.getFailedRecords() + failureCount);
+  }
+
+  private void updateRecordToDbAndNotify(Exception error) {
+    AppRunRecord appRecord = getJobRecord(jobExecutionContext);
+    appRecord.setStatus(internalStatus);
+
+    if (failureDetails != null) {
+      appRecord.setFailureContext(
+          new FailureContext().withAdditionalProperty("failure", failureDetails));
     }
-    return totalDeleted;
+
+    if (WebSocketManager.getInstance() != null) {
+      WebSocketManager.getInstance()
+          .broadCastMessageToAll("data_retention_app_channel", JsonUtils.pojoToJson(appRecord));
+    }
+
+    LOG.info("AppRecord before DB save: {}", JsonUtils.pojoToJson(appRecord));
+    pushAppStatusUpdates(jobExecutionContext, appRecord, true);
+    LOG.info("Final AppRunRecord update: {}", JsonUtils.pojoToJson(appRecord));
   }
 }
diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/apps/AppsResourceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/apps/AppsResourceTest.java
index 7fba402a02a..2c5e3413572 100644
--- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/apps/AppsResourceTest.java
+++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/apps/AppsResourceTest.java
@@ -523,6 +523,87 @@ public class AppsResourceTest extends EntityResourceTest<App, CreateApp> {
         String.format("eventsubscription instance for %s not found", subscriptionName));
   }
 
+  @Test
+  void test_data_retention_app_deletes_old_change_events()
+      throws IOException, InterruptedException {
+    // Create database service, database, and schema
+    DatabaseServiceResourceTest databaseServiceResourceTest = new DatabaseServiceResourceTest();
+    DatabaseService databaseService =
+        databaseServiceResourceTest.createEntity(
+            databaseServiceResourceTest
+                .createRequest("RetentionTestService")
+                .withServiceType(CreateDatabaseService.DatabaseServiceType.Snowflake),
+            ADMIN_AUTH_HEADERS);
+
+    DatabaseResourceTest databaseResourceTest = new DatabaseResourceTest();
+    Database database =
+        databaseResourceTest.createEntity(
+            databaseResourceTest
+                .createRequest("retention_test_db")
+                .withService(databaseService.getFullyQualifiedName()),
+            ADMIN_AUTH_HEADERS);
+
+    DatabaseSchemaResourceTest schemaResourceTest = new DatabaseSchemaResourceTest();
+    DatabaseSchema schema =
+        schemaResourceTest.createEntity(
+            schemaResourceTest
+                .createRequest("retention_test_schema")
+                .withDatabase(database.getFullyQualifiedName()),
+            ADMIN_AUTH_HEADERS);
+
+    // Create a new table to work with
+    TableResourceTest tableResourceTest = new TableResourceTest();
+    String tableName = "retention_test_table_" + System.currentTimeMillis();
+
+    Table table =
+        tableResourceTest.createEntity(
+            tableResourceTest
+                .createRequest(tableName)
+                .withDatabaseSchema(schema.getFullyQualifiedName()),
+            ADMIN_AUTH_HEADERS);
+
+    // Create some change events by updating the table multiple times
+    for (int i = 0; i < 5; i++) {
+      Table updatedTable = JsonUtils.deepCopy(table, Table.class);
+      updatedTable.setDescription("Updated description " + i);
+      tableResourceTest.patchEntity(
+          table.getId(), JsonUtils.pojoToJson(updatedTable), updatedTable, ADMIN_AUTH_HEADERS);
+      table = updatedTable;
+
+      // Add a small delay between updates to ensure they're recorded as separate events
+      Thread.sleep(100);
+    }
+
+    // Wait a moment for change events to be processed
+    Thread.sleep(1000);
+
+    // Trigger the Data Retention application
+    postTriggerApp("DataRetentionApplication", ADMIN_AUTH_HEADERS);
+
+    // Wait for the app to complete
+    Thread.sleep(5000);
+
+    // Assert the app status is available after trigger
+    assertAppStatusAvailableAfterTrigger("DataRetentionApplication");
+
+    // Assert the app ran with SUCCESS status
+    assertAppRanAfterTriggerWithStatus("DataRetentionApplication", AppRunRecord.Status.SUCCESS);
+
+    // Get the latest run record to check statistics
+    AppRunRecord latestRun = getLatestAppRun("DataRetentionApplication", ADMIN_AUTH_HEADERS);
+    Assertions.assertNotNull(latestRun);
+
+    // Check whether successContext is not null
+    Assertions.assertNotNull(latestRun.getSuccessContext());
+
+    // Clean up - delete the test entities
+    tableResourceTest.deleteEntity(table.getId(), true, true, ADMIN_AUTH_HEADERS);
+    schemaResourceTest.deleteEntity(schema.getId(), true, true, ADMIN_AUTH_HEADERS);
+    databaseResourceTest.deleteEntity(database.getId(), true, true, ADMIN_AUTH_HEADERS);
+    databaseServiceResourceTest.deleteEntity(
+        databaseService.getId(), true, true, ADMIN_AUTH_HEADERS);
+  }
+
   @Override
   public void validateCreatedEntity(
       App createdEntity, CreateApp request, Map<String, String> authHeaders)