mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-12-24 14:08:45 +00:00
* fix: migration * fix: playwright test DBT -> dbt * feat: added rentention for profile and dq data * feat: fix failing tests * feat: address error in postgres delete sql * feat: fixed missing parameter in psql query * fix: added the deletion step in test case * feat: fixed postgres query for deletion before cutoffs
This commit is contained in:
parent
5d92be620a
commit
8cd23b2490
@ -59,3 +59,11 @@ SET json = JSON_SET(
|
||||
0.2
|
||||
)
|
||||
WHERE name = 'tableDiff';
|
||||
|
||||
UPDATE installed_apps
|
||||
SET json = JSON_SET(
|
||||
json,
|
||||
'$.appConfiguration.testCaseResultsRetentionPeriod', 1440,
|
||||
'$.appConfiguration.profileDataRetentionPeriod', 1440
|
||||
)
|
||||
WHERE JSON_EXTRACT(json, '$.name') = 'DataRetentionApplication';
|
||||
@ -56,3 +56,15 @@ SET json = json::jsonb || json_build_object(
|
||||
'version', 0.2
|
||||
)::jsonb
|
||||
WHERE name = 'tableDiff';
|
||||
|
||||
UPDATE installed_apps
|
||||
SET json = jsonb_set(
|
||||
jsonb_set(
|
||||
json,
|
||||
'{appConfiguration, testCaseResultsRetentionPeriod}',
|
||||
'1440'
|
||||
),
|
||||
'{appConfiguration, profileDataRetentionPeriod}',
|
||||
'1440'
|
||||
)
|
||||
WHERE json->>'name' = 'DataRetentionApplication';
|
||||
|
||||
@ -24,6 +24,7 @@ import org.openmetadata.schema.utils.JsonUtils;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.apps.AbstractNativeApplication;
|
||||
import org.openmetadata.service.jdbi3.CollectionDAO;
|
||||
import org.openmetadata.service.jdbi3.EntityTimeSeriesDAO;
|
||||
import org.openmetadata.service.jdbi3.FeedRepository;
|
||||
import org.openmetadata.service.search.SearchRepository;
|
||||
import org.openmetadata.service.socket.WebSocketManager;
|
||||
@ -45,11 +46,16 @@ public class DataRetention extends AbstractNativeApplication {
|
||||
private final FeedRepository feedRepository;
|
||||
private final CollectionDAO.FeedDAO feedDAO;
|
||||
|
||||
private final EntityTimeSeriesDAO testCaseResultsDAO;
|
||||
private final EntityTimeSeriesDAO profileDataDAO;
|
||||
|
||||
public DataRetention(CollectionDAO collectionDAO, SearchRepository searchRepository) {
|
||||
super(collectionDAO, searchRepository);
|
||||
this.eventSubscriptionDAO = collectionDAO.eventSubscriptionDAO();
|
||||
this.feedRepository = Entity.getFeedRepository();
|
||||
this.feedDAO = Entity.getCollectionDAO().feedDAO();
|
||||
this.testCaseResultsDAO = collectionDAO.testCaseResultTimeSeriesDao();
|
||||
this.profileDataDAO = collectionDAO.profilerDataTimeSeriesDao();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -134,6 +140,18 @@ public class DataRetention extends AbstractNativeApplication {
|
||||
"Starting cleanup for activity threads with retention period: {} days.",
|
||||
threadRetentionPeriod);
|
||||
cleanActivityThreads(threadRetentionPeriod);
|
||||
|
||||
int testCaseResultsRetentionPeriod = config.getTestCaseResultsRetentionPeriod();
|
||||
LOG.info(
|
||||
"Starting cleanup for test case results with retention period: {} days.",
|
||||
testCaseResultsRetentionPeriod);
|
||||
cleanTestCaseResults(testCaseResultsRetentionPeriod);
|
||||
|
||||
int profileDataRetentionPeriod = config.getProfileDataRetentionPeriod();
|
||||
LOG.info(
|
||||
"Starting cleanup for profile data with retention period: {} days.",
|
||||
profileDataRetentionPeriod);
|
||||
cleanProfileData(profileDataRetentionPeriod);
|
||||
}
|
||||
|
||||
@Transaction
|
||||
@ -219,6 +237,29 @@ public class DataRetention extends AbstractNativeApplication {
|
||||
}
|
||||
}
|
||||
|
||||
@Transaction
|
||||
private void cleanTestCaseResults(int retentionPeriod) {
|
||||
LOG.info("Initiating test case results cleanup: Retention = {} days.", retentionPeriod);
|
||||
long cutoffMillis = getRetentionCutoffMillis(retentionPeriod);
|
||||
|
||||
executeWithStatsTracking(
|
||||
"test_case_results",
|
||||
() -> testCaseResultsDAO.deleteRecordsBeforeCutOff(cutoffMillis, BATCH_SIZE));
|
||||
|
||||
LOG.info("Test case results cleanup complete.");
|
||||
}
|
||||
|
||||
@Transaction
|
||||
private void cleanProfileData(int retentionPeriod) {
|
||||
LOG.info("Initiating profile data cleanup: Retention = {} days.", retentionPeriod);
|
||||
long cutoffMillis = getRetentionCutoffMillis(retentionPeriod);
|
||||
|
||||
executeWithStatsTracking(
|
||||
"profile_data", () -> profileDataDAO.deleteRecordsBeforeCutOff(cutoffMillis, BATCH_SIZE));
|
||||
|
||||
LOG.info("Profile data cleanup complete.");
|
||||
}
|
||||
|
||||
private void executeWithStatsTracking(String entity, Supplier<Integer> deleteFunction) {
|
||||
int totalDeleted = 0;
|
||||
int totalFailed = 0;
|
||||
|
||||
@ -543,6 +543,27 @@ public interface EntityTimeSeriesDAO {
|
||||
}
|
||||
}
|
||||
|
||||
@ConnectionAwareSqlUpdate(
|
||||
value =
|
||||
"DELETE FROM <table> "
|
||||
+ "WHERE json->>'id' IN ( "
|
||||
+ " SELECT json->>'id' FROM <table> "
|
||||
+ " WHERE timestamp < :cutoffTs ORDER BY timestamp LIMIT :limit "
|
||||
+ ")",
|
||||
connectionType = POSTGRES)
|
||||
@ConnectionAwareSqlUpdate(
|
||||
value =
|
||||
"""
|
||||
DELETE FROM <table> WHERE timestamp < :cutoffTs ORDER BY timestamp LIMIT :limit
|
||||
""",
|
||||
connectionType = MYSQL)
|
||||
int deleteRecordsBeforeCutOff(
|
||||
@Define("table") String table, @Bind("cutoffTs") long cutoffTs, @Bind("limit") int limit);
|
||||
|
||||
default int deleteRecordsBeforeCutOff(long cutoffTs, int limit) {
|
||||
return deleteRecordsBeforeCutOff(getTimeSeriesTableName(), cutoffTs, limit);
|
||||
}
|
||||
|
||||
/** @deprecated */
|
||||
@SqlQuery(
|
||||
"SELECT DISTINCT entityFQN FROM <table> WHERE entityFQNHash = '' or entityFQNHash is null LIMIT :limit")
|
||||
|
||||
@ -3,7 +3,9 @@
|
||||
"displayName": "Data Retention",
|
||||
"appConfiguration": {
|
||||
"changeEventRetentionPeriod": 7,
|
||||
"activityThreadsRetentionPeriod": 60
|
||||
"activityThreadsRetentionPeriod": 60,
|
||||
"profileDataRetentionPeriod": 1440,
|
||||
"testCaseResultsRetentionPeriod": 1440
|
||||
},
|
||||
"appSchedule": {
|
||||
"scheduleTimeline": "Custom",
|
||||
|
||||
@ -4569,6 +4569,56 @@ public class TestCaseResourceTest extends EntityResourceTest<TestCase, CreateTes
|
||||
assertNull(nullValueResult.getFailedRows(), "Failed rows should be null when not set");
|
||||
}
|
||||
|
||||
@Test
|
||||
@Order(999)
|
||||
void delete_testCaseResults_verifyDeletionByTimestamp(TestInfo testInfo)
|
||||
throws IOException, ParseException {
|
||||
CreateTestCase create =
|
||||
createRequest(testInfo)
|
||||
.withEntityLink(TABLE_LINK)
|
||||
.withTestDefinition(TEST_DEFINITION4.getFullyQualifiedName())
|
||||
.withParameterValues(
|
||||
List.of(new TestCaseParameterValue().withValue("100").withName("maxValue")));
|
||||
TestCase testCase = createAndCheckEntity(create, ADMIN_AUTH_HEADERS);
|
||||
|
||||
long baseTimestamp = dateToTimestamp("2024-03-01");
|
||||
long dayInMs = 24 * 60 * 60 * 1000L;
|
||||
|
||||
for (int i = 0; i < 5; i++) {
|
||||
CreateTestCaseResult createTestCaseResult =
|
||||
new CreateTestCaseResult()
|
||||
.withResult("result " + i)
|
||||
.withTestCaseStatus(TestCaseStatus.Success)
|
||||
.withTimestamp(baseTimestamp + (i * dayInMs));
|
||||
postTestCaseResult(
|
||||
testCase.getFullyQualifiedName(), createTestCaseResult, ADMIN_AUTH_HEADERS);
|
||||
}
|
||||
|
||||
long cutoffTs = baseTimestamp + (3 * dayInMs);
|
||||
int limit = 10000;
|
||||
|
||||
int deletedCount =
|
||||
org.openmetadata.service.Entity.getCollectionDAO()
|
||||
.testCaseResultTimeSeriesDao()
|
||||
.deleteRecordsBeforeCutOff(cutoffTs, limit);
|
||||
|
||||
ResultList<TestCaseResult> remainingResults =
|
||||
getTestCaseResults(
|
||||
testCase.getFullyQualifiedName(),
|
||||
baseTimestamp,
|
||||
baseTimestamp + (5 * dayInMs),
|
||||
ADMIN_AUTH_HEADERS);
|
||||
|
||||
assertNotNull(remainingResults);
|
||||
|
||||
for (TestCaseResult result : remainingResults.getData()) {
|
||||
long resultTimestamp = result.getTimestamp();
|
||||
assertTrue(
|
||||
resultTimestamp >= cutoffTs,
|
||||
"All remaining test case results should have timestamps >= cutoff");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void test_testCaseFollowerInheritance(TestInfo testInfo)
|
||||
throws IOException, InterruptedException {
|
||||
|
||||
@ -20,6 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.openmetadata.common.utils.CommonUtil.listOf;
|
||||
import static org.openmetadata.service.exception.CatalogExceptionMessage.permissionNotAllowed;
|
||||
import static org.openmetadata.service.util.TestUtils.ADMIN_AUTH_HEADERS;
|
||||
@ -62,6 +63,7 @@ import org.openmetadata.schema.type.TableConstraint.ConstraintType;
|
||||
import org.openmetadata.schema.type.TableProfile;
|
||||
import org.openmetadata.schema.type.TagLabel;
|
||||
import org.openmetadata.schema.utils.ResultList;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.OpenMetadataApplicationTest;
|
||||
import org.openmetadata.service.resources.databases.TableResourceTest;
|
||||
import org.openmetadata.service.util.TestUtils;
|
||||
@ -470,6 +472,50 @@ public class EntityProfileResourceTest extends OpenMetadataApplicationTest {
|
||||
"table instance for nonexistent.table not found");
|
||||
}
|
||||
|
||||
@Test
|
||||
@Order(9999)
|
||||
void delete_profileData_verifyDeletionByTimestamp() throws HttpResponseException, ParseException {
|
||||
long baseTimestamp = dateToTimestamp("2024-02-01");
|
||||
long dayInMs = 24 * 60 * 60 * 1000L;
|
||||
|
||||
for (int i = 0; i < 5; i++) {
|
||||
ColumnProfile columnProfile =
|
||||
new ColumnProfile().withName("age").withValuesCount(100.0 + i).withNullCount(10.0);
|
||||
|
||||
CreateEntityProfile createProfile =
|
||||
new CreateEntityProfile()
|
||||
.withTimestamp(baseTimestamp + (i * dayInMs))
|
||||
.withProfileData(columnProfile)
|
||||
.withProfileType(CreateEntityProfile.ProfileTypeEnum.COLUMN);
|
||||
|
||||
createEntityProfile(TEST_TABLE.getId(), createProfile, ADMIN_AUTH_HEADERS);
|
||||
}
|
||||
|
||||
long cutoffTs = baseTimestamp + (3 * dayInMs);
|
||||
int limit = 10000;
|
||||
|
||||
Entity.getCollectionDAO()
|
||||
.profilerDataTimeSeriesDao()
|
||||
.deleteRecordsBeforeCutOff(cutoffTs, limit);
|
||||
|
||||
ResultList<EntityProfile> remainingProfiles =
|
||||
getEntityProfiles(
|
||||
TEST_TABLE.getFullyQualifiedName(),
|
||||
"table",
|
||||
baseTimestamp,
|
||||
baseTimestamp + (5 * dayInMs),
|
||||
CreateEntityProfile.ProfileTypeEnum.COLUMN,
|
||||
ADMIN_AUTH_HEADERS);
|
||||
|
||||
assertNotNull(remainingProfiles);
|
||||
|
||||
for (EntityProfile profile : remainingProfiles.getData()) {
|
||||
long profileTimestamp = profile.getTimestamp();
|
||||
assertTrue(
|
||||
profileTimestamp >= cutoffTs, "All remaining profiles should have timestamps >= cutoff");
|
||||
}
|
||||
}
|
||||
|
||||
private void addSampleProfileData(UUID tableId) throws HttpResponseException, ParseException {
|
||||
// Add table profile
|
||||
TableProfile tableProfile = new TableProfile().withRowCount(1000.0).withColumnCount(3.0);
|
||||
|
||||
@ -17,10 +17,22 @@
|
||||
"type": "integer",
|
||||
"default": 60,
|
||||
"minimum": 0
|
||||
},
|
||||
"testCaseResultsRetentionPeriod": {
|
||||
"title": "Test Case Results Retention Period (days)",
|
||||
"description": "Enter the retention period for Test Case Results in days (e.g., 30 for one month, 60 for two months).",
|
||||
"type": "integer",
|
||||
"default": 1440
|
||||
},
|
||||
"profileDataRetentionPeriod": {
|
||||
"title": "Profile Data Retention Period (days)",
|
||||
"description": "Enter the retention period for Profile Data in days (e.g., 30 for one month, 60 for two months).",
|
||||
"type": "integer",
|
||||
"default": 1440
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"changeEventRetentionPeriod", "activityThreadsRetentionPeriod"
|
||||
"changeEventRetentionPeriod", "activityThreadsRetentionPeriod", "testCaseResultsRetentionPeriod", "profileDataRetentionPeriod"
|
||||
],
|
||||
"additionalProperties": false
|
||||
}
|
||||
|
||||
@ -14,4 +14,18 @@ $$section
|
||||
|
||||
Enter the retention period for Activity Threads of type = 'Conversation' records in days (e.g., 30 for one month, 60 for two months).
|
||||
|
||||
$$
|
||||
|
||||
$$section
|
||||
### Test Case Results Retention Period (days) $(id="testCaseResultsRetentionPeriod")
|
||||
|
||||
Enter the retention period for Test Case Results in days (e.g., 30 for one month, 60 for two months).
|
||||
|
||||
$$
|
||||
|
||||
$$section
|
||||
### Profile Data Retention Period (days) $(id="profileDataRetentionPeriod")
|
||||
|
||||
Enter the retention period for Profile Data in days (e.g., 30 for one month, 60 for two months).
|
||||
|
||||
$$
|
||||
@ -18,8 +18,25 @@
|
||||
"type": "integer",
|
||||
"default": 60,
|
||||
"minimum": 0
|
||||
},
|
||||
"testCaseResultsRetentionPeriod": {
|
||||
"title": "Test Case Results Retention Period (days)",
|
||||
"description": "Enter the retention period for Test Case Results in days (e.g., 30 for one month, 60 for two months).",
|
||||
"type": "integer",
|
||||
"default": 1440
|
||||
},
|
||||
"profileDataRetentionPeriod": {
|
||||
"title": "Profile Data Retention Period (days)",
|
||||
"description": "Enter the retention period for Profile Data in days (e.g., 30 for one month, 60 for two months).",
|
||||
"type": "integer",
|
||||
"default": 1440
|
||||
}
|
||||
},
|
||||
"required": ["changeEventRetentionPeriod", "activityThreadsRetentionPeriod"],
|
||||
"required": [
|
||||
"changeEventRetentionPeriod",
|
||||
"activityThreadsRetentionPeriod",
|
||||
"testCaseResultsRetentionPeriod",
|
||||
"profileDataRetentionPeriod"
|
||||
],
|
||||
"additionalProperties": false
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user