Fixes #20289: Added application run logs for DataRetentionApp (#20581)

* Added application run logs for DataRetentionApp

* Code changes to accommodate change in EntityStats
This commit is contained in:
aji-aju 2025-04-11 15:21:54 +05:30 committed by GitHub
parent 0e69bbd4dc
commit 06953ee266
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 213 additions and 36 deletions

View File

@ -1,24 +1,41 @@
package org.openmetadata.service.apps.bundles.dataRetention;
import static org.openmetadata.service.apps.scheduler.OmAppJobListener.APP_RUN_STATS;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppRunRecord;
import org.openmetadata.schema.entity.app.FailureContext;
import org.openmetadata.schema.entity.applications.configuration.internal.DataRetentionConfiguration;
import org.openmetadata.schema.system.EntityStats;
import org.openmetadata.schema.system.Stats;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.service.apps.AbstractNativeApplication;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.socket.WebSocketManager;
import org.openmetadata.service.util.JsonUtils;
import org.quartz.JobExecutionContext;
@Slf4j
public class DataRetention extends AbstractNativeApplication {
private static final int BATCH_SIZE = 10_000;
private DataRetentionConfiguration dataRetentionConfiguration;
private final CollectionDAO.EventSubscriptionDAO eventSubscriptionDAO;
private final Stats retentionStats = new Stats();
private JobExecutionContext jobExecutionContext;
private AppRunRecord.Status internalStatus = AppRunRecord.Status.COMPLETED;
private Map<String, Object> failureDetails = null;
public DataRetention(CollectionDAO collectionDAO, SearchRepository searchRepository) {
super(collectionDAO, searchRepository);
@ -37,44 +54,100 @@ public class DataRetention extends AbstractNativeApplication {
@Override
public void startApp(JobExecutionContext jobExecutionContext) {
  // Keep a handle on the Quartz context so status/stat updates can reach the scheduler later.
  this.jobExecutionContext = jobExecutionContext;
  try {
    initializeStatsDefaults();
    // Single cleanup invocation — a duplicated call before the try block (diff artifact)
    // has been removed; it would have executed the cleanup twice per run.
    executeCleanup(dataRetentionConfiguration);
    // Publish collected stats so the scheduler's job listener can persist them with the run.
    jobExecutionContext.getJobDetail().getJobDataMap().put(APP_RUN_STATS, retentionStats);
    updateRecordToDbAndNotify(null);
    // Surface partial failures (per-entity batch errors) as a job failure so Quartz records it.
    if (internalStatus == AppRunRecord.Status.ACTIVE_ERROR
        || internalStatus == AppRunRecord.Status.FAILED) {
      throw new RuntimeException("Partial failure occurred in DataRetention job");
    }
  } catch (Exception ex) {
    LOG.error("DataRetention job failed.", ex);
    internalStatus = AppRunRecord.Status.FAILED;
    failureDetails = new HashMap<>();
    failureDetails.put("message", ex.getMessage());
    failureDetails.put("jobStackTrace", ExceptionUtils.getStackTrace(ex));
    updateRecordToDbAndNotify(ex);
  }
}
/**
 * Seeds {@code retentionStats} with zeroed counters: one job-wide StepStats plus one
 * per-entity StepStats for each table the cleanup touches, so later increments in
 * {@code updateStats} never hit a missing entry.
 */
private void initializeStatsDefaults() {
  retentionStats.setJobStats(
      new StepStats().withTotalRecords(0).withSuccessRecords(0).withFailedRecords(0));
  EntityStats perEntityStats = new EntityStats();
  // Register every entity key up front with an empty StepStats.
  for (String entityKey :
      new String[] {"successful_sent_change_events", "change_events", "consumers_dlq"}) {
    perEntityStats.withAdditionalProperty(entityKey, new StepStats());
  }
  retentionStats.setEntityStats(perEntityStats);
}
public void executeCleanup(DataRetentionConfiguration config) {
if (CommonUtil.nullOrEmpty(config)) {
if (config == null) {
LOG.warn("DataRetentionConfiguration is null. Skipping cleanup.");
return;
}
cleanChangeEvents(config.getChangeEventRetentionPeriod());
int retentionPeriod = config.getChangeEventRetentionPeriod();
LOG.info("Starting cleanup for change events with retention period: {} days.", retentionPeriod);
cleanChangeEvents(retentionPeriod);
}
/**
 * Deletes change-event rows older than the retention cutoff, in batches, recording
 * per-entity success/failure counts via {@code executeWithStatsTracking}.
 *
 * <p>NOTE(review): {@code @Transaction} is a JDBI SqlObject annotation; whether it takes
 * effect on a private method of an application class (not a SqlObject) should be
 * confirmed — it may be a no-op here.
 *
 * @param retentionPeriod retention window in days; rows older than now - period are removed
 */
@Transaction
private void cleanChangeEvents(int retentionPeriod) {
  LOG.info("Initiating change events cleanup: Retention = {} days.", retentionPeriod);
  long cutoffMillis = getRetentionCutoffMillis(retentionPeriod);
  // Old batchDelete(...) statements and their dangling result locals (pre-change diff
  // remnants) are removed; the stats-tracking helper now drives each table's batched delete.
  executeWithStatsTracking(
      "successful_sent_change_events",
      () ->
          eventSubscriptionDAO.deleteSuccessfulSentChangeEventsInBatches(
              cutoffMillis, BATCH_SIZE));
  executeWithStatsTracking(
      "change_events",
      () -> eventSubscriptionDAO.deleteChangeEventsInBatches(cutoffMillis, BATCH_SIZE));
  executeWithStatsTracking(
      "consumers_dlq",
      () -> eventSubscriptionDAO.deleteConsumersDlqInBatches(cutoffMillis, BATCH_SIZE));
  LOG.info("Change events cleanup complete.");
}
/**
 * Repeatedly invokes {@code deleteFunction} until a batch deletes fewer than BATCH_SIZE
 * rows, then folds the totals into the run stats. On any exception the loop stops for
 * this entity, the run is downgraded to ACTIVE_ERROR, and the first failure's details
 * are captured (later failures do not overwrite them).
 *
 * @param entity stats key identifying the table being cleaned
 * @param deleteFunction one batched delete call; returns the number of rows it removed
 */
private void executeWithStatsTracking(String entity, Supplier<Integer> deleteFunction) {
  int deletedSoFar = 0;
  int failedSoFar = 0;
  boolean moreBatches = true;
  while (moreBatches) {
    try {
      int batchDeleted = deleteFunction.get();
      deletedSoFar += batchDeleted;
      // A short batch means no further rows remain past the cutoff.
      moreBatches = batchDeleted >= BATCH_SIZE;
    } catch (Exception ex) {
      LOG.error("Failed to clean entity: {}", entity, ex);
      // Exact failed-row count is unknown; the batch size is used as an upper-bound estimate.
      failedSoFar += BATCH_SIZE;
      internalStatus = AppRunRecord.Status.ACTIVE_ERROR;
      if (failureDetails == null) {
        failureDetails = new HashMap<>();
        failureDetails.put("message", ex.getMessage());
        failureDetails.put("jobStackTrace", ExceptionUtils.getStackTrace(ex));
      }
      moreBatches = false;
    }
  }
  updateStats(entity, deletedSoFar, failedSoFar);
}
private long getRetentionCutoffMillis(int retentionPeriodInDays) {
@ -83,18 +156,41 @@ public class DataRetention extends AbstractNativeApplication {
.toEpochMilli();
}
// NOTE(review): orphaned fragment of the pre-change implementation as rendered by the
// diff view — its trailing `return totalDeleted;` and closing brace appear interleaved
// further below. The new stats-tracking path no longer calls this helper.
/**
 * Runs a batch delete operation in a loop until fewer than BATCH_SIZE records are deleted in a single iteration.
 */
private int batchDelete(Supplier<Integer> deleteFunction) {
var totalDeleted = 0;
while (true) {
var deletedCount = deleteFunction.get();
totalDeleted += deletedCount;
// A short batch signals no remaining rows past the cutoff.
if (deletedCount < BATCH_SIZE) {
break;
}
/**
 * Folds one entity's batch results into both the per-entity counters and the job-wide
 * totals. Synchronized so concurrent callers cannot interleave the read-modify-write
 * of the shared stats objects.
 *
 * @param entity stats key for the table that was cleaned
 * @param successCount rows deleted successfully
 * @param failureCount estimated rows that failed to delete
 */
private synchronized void updateStats(String entity, int successCount, int failureCount) {
  int attempted = successCount + failureCount;
  EntityStats allEntityStats = retentionStats.getEntityStats();
  StepStats perEntity = allEntityStats.getAdditionalProperties().getOrDefault(entity, new StepStats());
  perEntity.setTotalRecords(perEntity.getTotalRecords() + attempted);
  perEntity.setSuccessRecords(perEntity.getSuccessRecords() + successCount);
  perEntity.setFailedRecords(perEntity.getFailedRecords() + failureCount);
  allEntityStats.withAdditionalProperty(entity, perEntity);
  StepStats overall = retentionStats.getJobStats();
  overall.setTotalRecords(overall.getTotalRecords() + attempted);
  overall.setSuccessRecords(overall.getSuccessRecords() + successCount);
  overall.setFailedRecords(overall.getFailedRecords() + failureCount);
}
/**
 * Builds the AppRunRecord for this run (status plus optional failure context), broadcasts
 * it over the websocket channel, and persists it via the scheduler.
 *
 * <p>The stray {@code return totalDeleted;} fused into this void method (tail of the old
 * batchDelete, a diff artifact) is removed — it would not compile.
 *
 * @param error the exception that aborted the run, or {@code null} on success; failure
 *     details themselves are read from the {@code failureDetails} field, so this parameter
 *     currently only signals intent at the call site
 */
private void updateRecordToDbAndNotify(Exception error) {
  AppRunRecord appRecord = getJobRecord(jobExecutionContext);
  appRecord.setStatus(internalStatus);
  if (failureDetails != null) {
    appRecord.setFailureContext(
        new FailureContext().withAdditionalProperty("failure", failureDetails));
  }
  // Push the live status to any connected UI clients before persisting.
  if (WebSocketManager.getInstance() != null) {
    WebSocketManager.getInstance()
        .broadCastMessageToAll("data_retention_app_channel", JsonUtils.pojoToJson(appRecord));
  }
  LOG.info("AppRecord before DB save: {}", JsonUtils.pojoToJson(appRecord));
  pushAppStatusUpdates(jobExecutionContext, appRecord, true);
  LOG.info("Final AppRunRecord update: {}", JsonUtils.pojoToJson(appRecord));
}
}

View File

@ -523,6 +523,87 @@ public class AppsResourceTest extends EntityResourceTest<App, CreateApp> {
String.format("eventsubscription instance for %s not found", subscriptionName));
}
// End-to-end test: builds a service -> database -> schema -> table hierarchy, generates
// change events via patches, triggers the DataRetention app, and asserts the run record
// reports success with a non-null success context.
// NOTE(review): the fixed Thread.sleep waits make this timing-sensitive and potentially
// flaky on slow CI — consider polling for the run record instead.
@Test
void test_data_retention_app_deletes_old_change_events()
throws IOException, InterruptedException {
// Create database service, database, and schema
DatabaseServiceResourceTest databaseServiceResourceTest = new DatabaseServiceResourceTest();
DatabaseService databaseService =
databaseServiceResourceTest.createEntity(
databaseServiceResourceTest
.createRequest("RetentionTestService")
.withServiceType(CreateDatabaseService.DatabaseServiceType.Snowflake),
ADMIN_AUTH_HEADERS);
DatabaseResourceTest databaseResourceTest = new DatabaseResourceTest();
Database database =
databaseResourceTest.createEntity(
databaseResourceTest
.createRequest("retention_test_db")
.withService(databaseService.getFullyQualifiedName()),
ADMIN_AUTH_HEADERS);
DatabaseSchemaResourceTest schemaResourceTest = new DatabaseSchemaResourceTest();
DatabaseSchema schema =
schemaResourceTest.createEntity(
schemaResourceTest
.createRequest("retention_test_schema")
.withDatabase(database.getFullyQualifiedName()),
ADMIN_AUTH_HEADERS);
// Create a new table to work with; timestamp suffix avoids name collisions across runs
TableResourceTest tableResourceTest = new TableResourceTest();
String tableName = "retention_test_table_" + System.currentTimeMillis();
Table table =
tableResourceTest.createEntity(
tableResourceTest
.createRequest(tableName)
.withDatabaseSchema(schema.getFullyQualifiedName()),
ADMIN_AUTH_HEADERS);
// Create some change events by updating the table multiple times
for (int i = 0; i < 5; i++) {
Table updatedTable = JsonUtils.deepCopy(table, Table.class);
updatedTable.setDescription("Updated description " + i);
tableResourceTest.patchEntity(
table.getId(), JsonUtils.pojoToJson(updatedTable), updatedTable, ADMIN_AUTH_HEADERS);
table = updatedTable;
// Add a small delay between updates to ensure they're recorded as separate events
Thread.sleep(100);
}
// Wait a moment for change events to be processed
Thread.sleep(1000);
// Trigger the Data Retention application
postTriggerApp("DataRetentionApplication", ADMIN_AUTH_HEADERS);
// Wait for the app to complete
Thread.sleep(5000);
// Assert the app status is available after trigger
assertAppStatusAvailableAfterTrigger("DataRetentionApplication");
// Assert the app ran with SUCCESS status
assertAppRanAfterTriggerWithStatus("DataRetentionApplication", AppRunRecord.Status.SUCCESS);
// Get the latest run record to check statistics
AppRunRecord latestRun = getLatestAppRun("DataRetentionApplication", ADMIN_AUTH_HEADERS);
Assertions.assertNotNull(latestRun);
// Check whether successContext is not null
Assertions.assertNotNull(latestRun.getSuccessContext());
// Clean up - delete the test entities (hardDelete + recursive)
tableResourceTest.deleteEntity(table.getId(), true, true, ADMIN_AUTH_HEADERS);
schemaResourceTest.deleteEntity(schema.getId(), true, true, ADMIN_AUTH_HEADERS);
databaseResourceTest.deleteEntity(database.getId(), true, true, ADMIN_AUTH_HEADERS);
databaseServiceResourceTest.deleteEntity(
databaseService.getId(), true, true, ADMIN_AUTH_HEADERS);
}
@Override
public void validateCreatedEntity(
App createdEntity, CreateApp request, Map<String, String> authHeaders)