MINOR: Update Data Insights Configurations to enable more fine-grained control (#19884)

* Update Data Insights Configurations to enable more fine control management

* Update Data Insights Configurations to enable more fine control management

* Hide one configuration from DataInsights App
This commit is contained in:
IceS2 2025-02-26 11:47:18 +01:00 committed by GitHub
parent 7053b53b8c
commit 7e64bfa888
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 790 additions and 53 deletions

View File

@ -37,6 +37,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.core.UriInfo;
import lombok.Getter;
@ -667,4 +668,11 @@ public final class Entity {
}
return entityType;
}
/**
 * Collects the entity types that are registered under the given service type.
 *
 * @param serviceType service type to look up in {@code ENTITY_SERVICE_TYPE_MAP}
 * @return the keys of {@code ENTITY_SERVICE_TYPE_MAP} whose mapped value equals {@code
 *     serviceType}; empty when nothing matches
 */
public static Set<String> getEntityTypeInService(String serviceType) {
  return ENTITY_SERVICE_TYPE_MAP.keySet().stream()
      .filter(entityType -> ENTITY_SERVICE_TYPE_MAP.get(entityType).equals(serviceType))
      .collect(Collectors.toSet());
}
}

View File

@ -19,8 +19,12 @@ import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppRunRecord;
import org.openmetadata.schema.entity.app.FailureContext;
import org.openmetadata.schema.entity.app.SuccessContext;
import org.openmetadata.schema.entity.applications.configuration.internal.AppAnalyticsConfig;
import org.openmetadata.schema.entity.applications.configuration.internal.BackfillConfiguration;
import org.openmetadata.schema.entity.applications.configuration.internal.CostAnalysisConfig;
import org.openmetadata.schema.entity.applications.configuration.internal.DataAssetsConfig;
import org.openmetadata.schema.entity.applications.configuration.internal.DataInsightsAppConfig;
import org.openmetadata.schema.entity.applications.configuration.internal.DataQualityConfig;
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
import org.openmetadata.schema.system.EventPublisherJob;
import org.openmetadata.schema.system.IndexingError;
@ -54,6 +58,11 @@ public class DataInsightsApp extends AbstractNativeApplication {
/**
 * Start and end date of a backfill run, kept as the strings supplied by the backfill
 * configuration.
 */
public record Backfill(String startDate, String endDate) {}
private CostAnalysisConfig costAnalysisConfig;
private DataAssetsConfig dataAssetsConfig;
private DataQualityConfig dataQualityConfig;
private AppAnalyticsConfig webAnalyticsConfig;
private Optional<Boolean> recreateDataAssetsIndex;
@Getter private Optional<Backfill> backfill;
@ -143,7 +152,7 @@ public class DataInsightsApp extends AbstractNativeApplication {
deleteIndexInternal(Entity.TEST_CASE_RESOLUTION_STATUS);
}
private void createDataAssetsDataStream() {
private void createOrUpdateDataAssetsDataStream() {
DataInsightsSearchInterface searchInterface = getSearchInterface();
ElasticSearchConfiguration config = searchRepository.getElasticSearchConfiguration();
@ -158,7 +167,13 @@ public class DataInsightsApp extends AbstractNativeApplication {
String dataStreamName = getDataStreamName(dataAssetType);
if (!searchInterface.dataAssetDataStreamExists(dataStreamName)) {
searchInterface.createDataAssetsDataStream(
dataStreamName, dataAssetType, dataAssetIndex, language);
dataStreamName,
dataAssetType,
dataAssetIndex,
language,
dataAssetsConfig.getRetention());
} else {
searchInterface.updateLifecyclePolicy(dataAssetsConfig.getRetention());
}
}
} catch (IOException ex) {
@ -184,11 +199,15 @@ public class DataInsightsApp extends AbstractNativeApplication {
@Override
public void init(App app) {
super.init(app);
createDataAssetsDataStream();
createDataQualityDataIndex();
DataInsightsAppConfig config =
JsonUtils.convertValue(app.getAppConfiguration(), DataInsightsAppConfig.class);
// Get the configuration for the different modules
costAnalysisConfig = config.getModuleConfiguration().getCostAnalysis();
dataAssetsConfig = parseDataAssetsConfig(config.getModuleConfiguration().getDataAssets());
dataQualityConfig = config.getModuleConfiguration().getDataQuality();
webAnalyticsConfig = config.getModuleConfiguration().getAppAnalytics();
// Configure batchSize
batchSize = config.getBatchSize();
@ -207,9 +226,21 @@ public class DataInsightsApp extends AbstractNativeApplication {
new Backfill(backfillConfig.get().getStartDate(), backfillConfig.get().getEndDate()));
}
createOrUpdateDataAssetsDataStream();
createDataQualityDataIndex();
jobData = new EventPublisherJob().withStats(new Stats());
}
/**
 * Normalises the data assets configuration: a service filter that lacks either its service name
 * or its service type is treated as "no filter" and removed.
 *
 * @param config data assets module configuration
 * @return {@code config} itself when it has no service filter or a fully populated one,
 *     otherwise the result of {@code config.withServiceFilter(null)}
 */
private DataAssetsConfig parseDataAssetsConfig(DataAssetsConfig config) {
  if (config.getServiceFilter() == null) {
    return config;
  }
  boolean filterIsComplete =
      config.getServiceFilter().getServiceName() != null
          && config.getServiceFilter().getServiceType() != null;
  return filterIsComplete ? config : config.withServiceFilter(null);
}
@Override
public void startApp(JobExecutionContext jobExecutionContext) {
try {
@ -228,7 +259,7 @@ public class DataInsightsApp extends AbstractNativeApplication {
if (recreateDataAssetsIndex.isPresent() && recreateDataAssetsIndex.get().equals(true)) {
deleteDataAssetsDataStream();
createDataAssetsDataStream();
createOrUpdateDataAssetsDataStream();
deleteDataQualityDataIndex();
createDataQualityDataIndex();
}
@ -290,7 +321,8 @@ public class DataInsightsApp extends AbstractNativeApplication {
}
private WorkflowStats processWebAnalytics() {
WebAnalyticsWorkflow workflow = new WebAnalyticsWorkflow(timestamp, batchSize, backfill);
WebAnalyticsWorkflow workflow =
new WebAnalyticsWorkflow(webAnalyticsConfig, timestamp, batchSize, backfill);
WorkflowStats workflowStats = workflow.getWorkflowStats();
try {
@ -304,7 +336,8 @@ public class DataInsightsApp extends AbstractNativeApplication {
}
private WorkflowStats processCostAnalysis() {
CostAnalysisWorkflow workflow = new CostAnalysisWorkflow(timestamp, batchSize, backfill);
CostAnalysisWorkflow workflow =
new CostAnalysisWorkflow(costAnalysisConfig, timestamp, batchSize, backfill);
WorkflowStats workflowStats = workflow.getWorkflowStats();
try {
@ -320,6 +353,7 @@ public class DataInsightsApp extends AbstractNativeApplication {
private WorkflowStats processDataAssets() {
DataAssetsWorkflow workflow =
new DataAssetsWorkflow(
dataAssetsConfig,
timestamp,
batchSize,
backfill,
@ -343,7 +377,13 @@ public class DataInsightsApp extends AbstractNativeApplication {
for (String entityType : dataQualityEntities) {
DataQualityWorkflow workflow =
new DataQualityWorkflow(
timestamp, batchSize, backfill, entityType, collectionDAO, searchRepository);
dataQualityConfig,
timestamp,
batchSize,
backfill,
entityType,
collectionDAO,
searchRepository);
try {
workflow.process();

View File

@ -82,10 +82,16 @@ public interface DataInsightsSearchInterface {
}
void createDataAssetsDataStream(
String name, String entityType, IndexMapping entityIndexMapping, String language)
String name,
String entityType,
IndexMapping entityIndexMapping,
String language,
int retentionDays)
throws IOException;
void deleteDataAssetDataStream(String name) throws IOException;
void updateLifecyclePolicy(int retentionDays) throws IOException;
Boolean dataAssetDataStreamExists(String name) throws IOException;
}

View File

@ -0,0 +1,84 @@
package org.openmetadata.service.apps.bundles.insights.search;
import java.util.List;
import java.util.Map;
import lombok.Getter;
import org.openmetadata.service.util.JsonUtils;
/**
 * Retention period, in days, of the Data Insights index lifecycle policy.
 *
 * <p>The value is either given directly or extracted from the JSON body of a lifecycle policy.
 * The JSON layout that is expected depends on the {@link SearchType}.
 */
@Getter
public class IndexLifecyclePolicyConfig {
  private final int retentionDays;

  /** Search engine flavour that decides which policy JSON layout is parsed. */
  public enum SearchType {
    OPENSEARCH("opensearch"),
    ELASTICSEARCH("elasticsearch");

    private final String value;

    SearchType(String value) {
      this.value = value;
    }

    @Override
    public String toString() {
      return value;
    }

    /**
     * Resolves a search type from its textual value, ignoring case.
     *
     * @param value textual value, e.g. {@code "opensearch"}
     * @return the matching constant
     * @throws IllegalArgumentException when no constant matches
     */
    public static SearchType fromString(String value) {
      for (SearchType type : SearchType.values()) {
        if (type.value.equalsIgnoreCase(value)) {
          return type;
        }
      }
      throw new IllegalArgumentException("Unknown SearchType: " + value);
    }
  }

  /**
   * Builds the configuration by parsing an existing lifecycle policy.
   *
   * @param policyName name of the policy; used as the top-level key of the Elasticsearch layout
   * @param policyStr JSON body of the policy
   * @param searchType layout to parse
   * @throws IllegalArgumentException when the policy does not contain the expected retention
   *     information
   */
  public IndexLifecyclePolicyConfig(String policyName, String policyStr, SearchType searchType) {
    this.retentionDays = parseRetentionDays(policyName, policyStr, searchType);
  }

  /**
   * Builds the configuration from an explicit retention.
   *
   * @param retentionDays retention in days
   */
  public IndexLifecyclePolicyConfig(int retentionDays) {
    this.retentionDays = retentionDays;
  }

  /** Dispatches to the parser matching {@code searchType}. */
  private int parseRetentionDays(String policyName, String policyStr, SearchType searchType) {
    if (searchType == null) {
      throw new IllegalArgumentException("searchType must not be null");
    }
    if (searchType == SearchType.ELASTICSEARCH) {
      return parseElasticSearchRetentionDays(policyName, policyStr);
    }
    return parseOpenSearchRetentionDays(policyName, policyStr);
  }

  /**
   * Reads {@code policy.states[name == "warm"].transitions[0].conditions.min_index_age}.
   *
   * <p>Previously a policy without a "warm" state made this method return {@code null}, which
   * was then unboxed into an {@code int} and failed with a message-less NullPointerException;
   * every missing level now raises a descriptive IllegalArgumentException instead.
   */
  private int parseOpenSearchRetentionDays(String policyName, String policyStr) {
    Map<String, Map<String, Object>> policyMap = JsonUtils.readOrConvertValue(policyStr, Map.class);
    requirePresent(policyMap, policyName, "policy body");
    Map<String, Object> policy = policyMap.get("policy");
    requirePresent(policy, policyName, "policy");
    Object rawStates = policy.get("states");
    requirePresent(rawStates, policyName, "policy.states");
    List<Map<String, Object>> states = JsonUtils.readOrConvertValue(rawStates, List.class);

    for (Map<String, Object> state : states) {
      // Constant-first comparison: a state without a name is skipped rather than crashing.
      if (!"warm".equals(state.get("name"))) {
        continue;
      }
      Object rawTransitions = state.get("transitions");
      requirePresent(rawTransitions, policyName, "warm.transitions");
      List<Map<String, Object>> transitions =
          JsonUtils.readOrConvertValue(rawTransitions, List.class);
      if (transitions.isEmpty()) {
        throw new IllegalArgumentException(
            String.format("Lifecycle policy '%s' has no transitions in 'warm' state", policyName));
      }
      Object rawConditions = transitions.get(0).get("conditions");
      requirePresent(rawConditions, policyName, "warm.transitions[0].conditions");
      Map<String, String> conditions = JsonUtils.readOrConvertValue(rawConditions, Map.class);
      return parseRetentionStr(conditions.get("min_index_age"));
    }
    throw new IllegalArgumentException(
        String.format("Lifecycle policy '%s' has no 'warm' state", policyName));
  }

  /** Reads {@code <policyName>.policy.phases.delete.min_age}. */
  private int parseElasticSearchRetentionDays(String policyName, String policyStr) {
    Map<String, Map<String, Map<String, Object>>> policyMap =
        JsonUtils.readOrConvertValue(policyStr, Map.class);
    requirePresent(policyMap, policyName, "policy body");
    Map<String, Map<String, Object>> namedPolicy = policyMap.get(policyName);
    requirePresent(namedPolicy, policyName, policyName);
    Map<String, Object> policy = namedPolicy.get("policy");
    requirePresent(policy, policyName, "policy");
    Object rawPhases = policy.get("phases");
    requirePresent(rawPhases, policyName, "policy.phases");
    Map<String, Object> phasesMap = JsonUtils.readOrConvertValue(rawPhases, Map.class);
    Object rawDeletePhase = phasesMap.get("delete");
    requirePresent(rawDeletePhase, policyName, "policy.phases.delete");
    Map<String, Object> deletePhase = JsonUtils.readOrConvertValue(rawDeletePhase, Map.class);
    return parseRetentionStr((String) deletePhase.get("min_age"));
  }

  /**
   * Extracts the number from a retention string such as {@code "7d"} by dropping every non-digit
   * character. The unit suffix is ignored, so the value is always interpreted as days.
   */
  private int parseRetentionStr(String retentionStr) {
    if (retentionStr == null) {
      throw new IllegalArgumentException("Lifecycle policy has no retention age");
    }
    String digits = retentionStr.replaceAll("\\D", "");
    if (digits.isEmpty()) {
      throw new IllegalArgumentException("Invalid retention age: " + retentionStr);
    }
    return Integer.parseInt(digits);
  }

  /** Fails with a descriptive message when an expected part of the policy JSON is absent. */
  private static void requirePresent(Object value, String policyName, String field) {
    if (value == null) {
      throw new IllegalArgumentException(
          String.format("Lifecycle policy '%s' is missing '%s'", policyName, field));
    }
  }
}

View File

@ -4,11 +4,15 @@ import es.org.elasticsearch.client.Request;
import es.org.elasticsearch.client.Response;
import es.org.elasticsearch.client.RestClient;
import java.io.IOException;
import org.apache.http.util.EntityUtils;
import org.openmetadata.service.apps.bundles.insights.search.DataInsightsSearchInterface;
import org.openmetadata.service.apps.bundles.insights.search.IndexLifecyclePolicyConfig;
import org.openmetadata.service.search.models.IndexMapping;
public class ElasticSearchDataInsightsClient implements DataInsightsSearchInterface {
private final RestClient client;
private final String resourcePath = "/dataInsights/elasticsearch";
private final String lifecyclePolicyName = "di-data-assets-lifecycle";
public ElasticSearchDataInsightsClient(RestClient client) {
this.client = client;
@ -54,12 +58,17 @@ public class ElasticSearchDataInsightsClient implements DataInsightsSearchInterf
@Override
public void createDataAssetsDataStream(
String name, String entityType, IndexMapping entityIndexMapping, String language)
String name,
String entityType,
IndexMapping entityIndexMapping,
String language,
int retentionDays)
throws IOException {
String resourcePath = "/dataInsights/elasticsearch";
createLifecyclePolicy(
"di-data-assets-lifecycle",
readResource(String.format("%s/indexLifecyclePolicy.json", resourcePath)));
lifecyclePolicyName,
buildLifecyclePolicy(
readResource(String.format("%s/indexLifecyclePolicy.json", resourcePath)),
retentionDays));
createComponentTemplate(
"di-data-assets-settings",
readResource(String.format("%s/indexSettingsTemplate.json", resourcePath)));
@ -75,6 +84,33 @@ public class ElasticSearchDataInsightsClient implements DataInsightsSearchInterf
createDataStream(name);
}
/**
 * Fills the lifecycle policy template with the configured retention.
 *
 * @param lifecyclePolicy template containing {@code {{retention}}} and {@code
 *     {{halfRetention}}} placeholders
 * @param retentionDays retention in days; {@code {{halfRetention}}} receives the integer half of
 *     it
 * @return the template with both placeholders substituted
 */
private String buildLifecyclePolicy(String lifecyclePolicy, int retentionDays) {
  String retention = String.valueOf(retentionDays);
  String halfRetention = String.valueOf(retentionDays / 2);
  String withRetention = lifecyclePolicy.replace("{{retention}}", retention);
  return withRetention.replace("{{halfRetention}}", halfRetention);
}
/**
 * Brings the stored ILM policy in line with the requested retention.
 *
 * <p>The current policy is fetched and parsed; the policy is rebuilt from the bundled template
 * and sent with a PUT only when its retention differs from {@code retentionDays}.
 *
 * @param retentionDays desired retention in days
 * @throws IOException when a request or reading the template resource fails
 */
@Override
public void updateLifecyclePolicy(int retentionDays) throws IOException {
  String policyEndpoint = String.format("/_ilm/policy/%s", lifecyclePolicyName);
  String currentLifecyclePolicy =
      EntityUtils.toString(performRequest("GET", policyEndpoint).getEntity());
  int currentRetentionDays =
      new IndexLifecyclePolicyConfig(
              lifecyclePolicyName,
              currentLifecyclePolicy,
              IndexLifecyclePolicyConfig.SearchType.ELASTICSEARCH)
          .getRetentionDays();
  if (currentRetentionDays == retentionDays) {
    // Nothing to do: the stored policy already uses the requested retention.
    return;
  }
  String policyTemplate =
      readResource(String.format("%s/indexLifecyclePolicy.json", resourcePath));
  performRequest("PUT", policyEndpoint, buildLifecyclePolicy(policyTemplate, retentionDays));
}
@Override
public void deleteDataAssetDataStream(String name) throws IOException {
performRequest("DELETE", String.format("/_data_stream/%s", name));

View File

@ -1,7 +1,9 @@
package org.openmetadata.service.apps.bundles.insights.search.opensearch;
import java.io.IOException;
import org.apache.http.util.EntityUtils;
import org.openmetadata.service.apps.bundles.insights.search.DataInsightsSearchInterface;
import org.openmetadata.service.apps.bundles.insights.search.IndexLifecyclePolicyConfig;
import org.openmetadata.service.search.models.IndexMapping;
import os.org.opensearch.client.Request;
import os.org.opensearch.client.Response;
@ -10,6 +12,8 @@ import os.org.opensearch.client.RestClient;
public class OpenSearchDataInsightsClient implements DataInsightsSearchInterface {
private final RestClient client;
private final String resourcePath = "/dataInsights/opensearch";
private final String lifecyclePolicyName = "di-data-assets-lifecycle";
public OpenSearchDataInsightsClient(RestClient client) {
this.client = client;
@ -65,12 +69,17 @@ public class OpenSearchDataInsightsClient implements DataInsightsSearchInterface
@Override
public void createDataAssetsDataStream(
String name, String entityType, IndexMapping entityIndexMapping, String language)
String name,
String entityType,
IndexMapping entityIndexMapping,
String language,
int retentionDays)
throws IOException {
String resourcePath = "/dataInsights/opensearch";
createLifecyclePolicy(
"di-data-assets-lifecycle",
readResource(String.format("%s/indexLifecyclePolicy.json", resourcePath)));
lifecyclePolicyName,
buildLifecyclePolicy(
readResource(String.format("%s/indexLifecyclePolicy.json", resourcePath)),
retentionDays));
createComponentTemplate(
"di-data-assets-mapping",
buildMapping(
@ -83,6 +92,32 @@ public class OpenSearchDataInsightsClient implements DataInsightsSearchInterface
createDataStream(name);
}
/**
 * Fills the lifecycle policy template with the configured retention.
 *
 * @param lifecyclePolicy template containing {@code {{retention}}} and {@code
 *     {{halfRetention}}} placeholders
 * @param retentionDays retention in days; {@code {{halfRetention}}} receives the integer half of
 *     it
 * @return the template with both placeholders substituted
 */
private String buildLifecyclePolicy(String lifecyclePolicy, int retentionDays) {
  String retention = String.valueOf(retentionDays);
  String halfRetention = String.valueOf(retentionDays / 2);
  String withRetention = lifecyclePolicy.replace("{{retention}}", retention);
  return withRetention.replace("{{halfRetention}}", halfRetention);
}
/**
 * Brings the stored ISM policy in line with the requested retention.
 *
 * <p>The current policy is fetched and parsed; the policy is rebuilt from the bundled template
 * and handed to {@code createLifecyclePolicy} only when its retention differs from {@code
 * retentionDays}.
 *
 * <p>NOTE(review): OpenSearch ISM usually expects {@code if_seq_no} / {@code if_primary_term}
 * when overwriting an existing policy - confirm that {@code createLifecyclePolicy} copes with a
 * policy that already exists.
 *
 * @param retentionDays desired retention in days
 * @throws IOException when a request or reading the template resource fails
 */
@Override
public void updateLifecyclePolicy(int retentionDays) throws IOException {
  String policyEndpoint = String.format("/_plugins/_ism/policies/%s", lifecyclePolicyName);
  String currentLifecyclePolicy =
      EntityUtils.toString(performRequest("GET", policyEndpoint).getEntity());
  int currentRetentionDays =
      new IndexLifecyclePolicyConfig(
              lifecyclePolicyName,
              currentLifecyclePolicy,
              IndexLifecyclePolicyConfig.SearchType.OPENSEARCH)
          .getRetentionDays();
  if (currentRetentionDays == retentionDays) {
    // Nothing to do: the stored policy already uses the requested retention.
    return;
  }
  String policyTemplate =
      readResource(String.format("%s/indexLifecyclePolicy.json", resourcePath));
  createLifecyclePolicy(lifecyclePolicyName, buildLifecyclePolicy(policyTemplate, retentionDays));
}
@Override
public void deleteDataAssetDataStream(String name) throws IOException {
performRequest("DELETE", String.format("_data_stream/%s", name));

View File

@ -16,6 +16,7 @@ import org.openmetadata.schema.analytics.DataAssetMetrics;
import org.openmetadata.schema.analytics.RawCostAnalysisReportData;
import org.openmetadata.schema.analytics.ReportData;
import org.openmetadata.schema.api.services.CreateDatabaseService;
import org.openmetadata.schema.entity.applications.configuration.internal.CostAnalysisConfig;
import org.openmetadata.schema.entity.data.Table;
import org.openmetadata.schema.entity.services.DatabaseService;
import org.openmetadata.schema.system.StepStats;
@ -46,6 +47,7 @@ public class CostAnalysisWorkflow {
@Getter private final Long endTimestamp;
private final int retentionDays = 30;
@Getter private final List<PaginatedEntitiesSource> sources = new ArrayList<>();
private final CostAnalysisConfig costAnalysisConfig;
/** Bundles a {@link Table} with its optional {@link LifeCycle} and its optional size value. */
public record CostAnalysisTableData(
Table table, Optional<LifeCycle> oLifeCycle, Optional<Double> oSize) {}
@ -65,7 +67,10 @@ public class CostAnalysisWorkflow {
@Getter private final WorkflowStats workflowStats = new WorkflowStats("CostAnalysisWorkflow");
public CostAnalysisWorkflow(
Long timestamp, int batchSize, Optional<DataInsightsApp.Backfill> backfill) {
CostAnalysisConfig costAnalysisConfig,
Long timestamp,
int batchSize,
Optional<DataInsightsApp.Backfill> backfill) {
this.endTimestamp =
TimestampUtils.getEndOfDayTimestamp(TimestampUtils.subtractDays(timestamp, 1));
this.startTimestamp = TimestampUtils.getStartOfDayTimestamp(endTimestamp);
@ -93,6 +98,7 @@ public class CostAnalysisWorkflow {
// }
this.batchSize = batchSize;
this.costAnalysisConfig = costAnalysisConfig;
}
private void initialize() throws SearchIndexException {
@ -129,6 +135,9 @@ public class CostAnalysisWorkflow {
}
public void process() throws SearchIndexException {
if (!costAnalysisConfig.getEnabled()) {
return;
}
initialize();
Map<String, Object> contextData = new HashMap<>();

View File

@ -17,11 +17,13 @@ import java.util.Set;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.entity.applications.configuration.internal.DataAssetsConfig;
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
import org.openmetadata.schema.system.IndexingError;
import org.openmetadata.schema.system.Stats;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.schema.type.Include;
import org.openmetadata.service.Entity;
import org.openmetadata.service.apps.bundles.insights.DataInsightsApp;
import org.openmetadata.service.apps.bundles.insights.search.DataInsightsSearchConfiguration;
import org.openmetadata.service.apps.bundles.insights.search.DataInsightsSearchInterface;
@ -46,6 +48,8 @@ import org.openmetadata.service.workflows.searchIndex.PaginatedEntitiesSource;
public class DataAssetsWorkflow {
public static final String DATA_STREAM_KEY = "DataStreamKey";
public static final String ENTITY_TYPE_FIELDS_KEY = "EnityTypeFields";
private static final String ALL_ENTITIES = "all";
private final DataAssetsConfig dataAssetsConfig;
private final int retentionDays = 30;
private final Long startTimestamp;
private final Long endTimestamp;
@ -63,6 +67,7 @@ public class DataAssetsWorkflow {
@Getter private final WorkflowStats workflowStats = new WorkflowStats("DataAssetsWorkflow");
public DataAssetsWorkflow(
DataAssetsConfig dataAssetsConfig,
Long timestamp,
int batchSize,
Optional<DataInsightsApp.Backfill> backfill,
@ -103,27 +108,33 @@ public class DataAssetsWorkflow {
this.entityTypes = entityTypes;
this.searchInterface = searchInterface;
this.dataInsightsSearchConfiguration = searchInterface.readDataInsightsSearchConfiguration();
this.dataAssetsConfig = dataAssetsConfig;
}
private void initialize() {
Stats stats = getInitialStatsForEntities(entityTypes);
int totalRecords = stats.getJobStats().getTotalRecords();
entityTypes.forEach(
entityType -> {
List<String> fields = List.of("*");
ListFilter filter;
// data product does not support soft delete
if (!entityType.equals("dataProduct")) {
filter = new ListFilter();
} else {
filter = new ListFilter(Include.ALL);
}
PaginatedEntitiesSource source =
new PaginatedEntitiesSource(entityType, batchSize, fields, filter)
.withName(String.format("[DataAssetsWorkflow] %s", entityType));
sources.add(source);
});
Set<String> entityTypesToProcess = getEntityTypesToProcess();
entityTypes.stream()
.filter(
entityType ->
dataAssetsConfig.getEntities().equals(Set.of(ALL_ENTITIES))
|| dataAssetsConfig.getEntities().contains(entityType))
.filter(
entityType ->
entityTypesToProcess.equals(Set.of(ALL_ENTITIES))
|| entityTypesToProcess.contains(entityType))
.forEach(
entityType -> {
List<String> fields = List.of("*");
ListFilter filter = getListFilter(entityType);
PaginatedEntitiesSource source =
new PaginatedEntitiesSource(entityType, batchSize, fields, filter)
.withName(String.format("[DataAssetsWorkflow] %s", entityType));
sources.add(source);
});
this.entityEnricher = new DataInsightsEntityEnricherProcessor(totalRecords);
if (searchRepository.getSearchType().equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) {
@ -143,7 +154,35 @@ public class DataAssetsWorkflow {
}
}
/**
 * Builds the list filter used to page through entities of the given type.
 *
 * @param entityType entity type about to be read
 * @return for {@code "dataProduct"} a filter created with {@code Include.ALL}; otherwise a
 *     default filter, restricted by a {@code service} query parameter when a service filter is
 *     configured
 */
private ListFilter getListFilter(String entityType) {
  // data product does not support soft delete, so it is listed with Include.ALL
  if (entityType.equals("dataProduct")) {
    return new ListFilter(Include.ALL);
  }
  ListFilter filter = new ListFilter();
  if (dataAssetsConfig.getServiceFilter() == null) {
    return filter;
  }
  return filter.addQueryParam("service", dataAssetsConfig.getServiceFilter().getServiceName());
}
/**
 * Determines which entity types the configured service filter allows.
 *
 * @return a set holding only {@code ALL_ENTITIES} when no service filter is configured,
 *     otherwise the entity types that {@code Entity.getEntityTypeInService} reports for the
 *     filter's service type
 */
private Set<String> getEntityTypesToProcess() {
  return dataAssetsConfig.getServiceFilter() == null
      ? Set.of(ALL_ENTITIES)
      : Entity.getEntityTypeInService(dataAssetsConfig.getServiceFilter().getServiceType());
}
public void process() throws SearchIndexException {
if (!dataAssetsConfig.getEnabled()) {
return;
}
initialize();
Map<String, Object> contextData = new HashMap<>();
@ -151,6 +190,7 @@ public class DataAssetsWorkflow {
contextData.put(END_TIMESTAMP_KEY, endTimestamp);
for (PaginatedEntitiesSource source : sources) {
deleteBasedOnDataRetentionPolicy(getDataStreamName(source.getEntityType()));
deleteDataBeforeInserting(getDataStreamName(source.getEntityType()));
contextData.put(DATA_STREAM_KEY, getDataStreamName(source.getEntityType()));
contextData.put(ENTITY_TYPE_KEY, source.getEntityType());
@ -201,14 +241,34 @@ public class DataAssetsWorkflow {
}
}
private void deleteDataBeforeInserting(String dataStreamName) throws SearchIndexException {
private void deleteBasedOnDataRetentionPolicy(String dataStreamName) throws SearchIndexException {
long retentionLimitTimestamp =
TimestampUtils.subtractDays(System.currentTimeMillis(), dataAssetsConfig.getRetention());
String rangeTermQuery =
String.format("{ \"@timestamp\": { \"lte\": %s } }", retentionLimitTimestamp);
try {
searchRepository
.getSearchClient()
.deleteByQuery(
dataStreamName,
String.format(
"{\"@timestamp\": {\"gte\": %s, \"lte\": %s}}", startTimestamp, endTimestamp));
searchRepository.getSearchClient().deleteByQuery(dataStreamName, rangeTermQuery);
} catch (Exception rx) {
throw new SearchIndexException(new IndexingError().withMessage(rx.getMessage()));
}
}
private void deleteDataBeforeInserting(String dataStreamName) throws SearchIndexException {
String rangeTermQuery =
String.format(
"{ \"@timestamp\": { \"gte\": %s, \"lte\": %s } }", startTimestamp, endTimestamp);
try {
if (dataAssetsConfig.getServiceFilter() == null) {
searchRepository.getSearchClient().deleteByQuery(dataStreamName, rangeTermQuery);
} else {
searchRepository
.getSearchClient()
.deleteByRangeAndTerm(
dataStreamName,
rangeTermQuery,
"service.name.keyword",
dataAssetsConfig.getServiceFilter().getServiceName());
}
} catch (Exception rx) {
throw new SearchIndexException(new IndexingError().withMessage(rx.getMessage()));
}

View File

@ -15,6 +15,7 @@ import java.util.Set;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.EntityTimeSeriesInterface;
import org.openmetadata.schema.entity.applications.configuration.internal.DataQualityConfig;
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
import org.openmetadata.schema.system.IndexingError;
import org.openmetadata.schema.system.StepStats;
@ -42,6 +43,7 @@ public class DataQualityWorkflow {
private final Long startTimestamp;
private final Long endTimestamp;
private final int batchSize;
private final DataQualityConfig dataQualityConfig;
private final SearchRepository searchRepository;
private final CollectionDAO collectionDAO;
@ -56,6 +58,7 @@ public class DataQualityWorkflow {
private static final WorkflowStats workflowStats = new WorkflowStats("DataQualityWorkflow");
public DataQualityWorkflow(
DataQualityConfig dataQualityConfig,
Long timestamp,
int batchSize,
Optional<DataInsightsApp.Backfill> backfill,
@ -93,6 +96,7 @@ public class DataQualityWorkflow {
this.searchRepository = searchRepository;
this.collectionDAO = collectionDAO;
this.entityType = entityType;
this.dataQualityConfig = dataQualityConfig;
}
public String getIndexNameByType(String entityType) {
@ -141,6 +145,9 @@ public class DataQualityWorkflow {
}
public void process() throws SearchIndexException {
if (!dataQualityConfig.getEnabled()) {
return;
}
initialize();
Map<String, Object> contextData = new HashMap<>();

View File

@ -17,6 +17,7 @@ import org.openmetadata.schema.analytics.WebAnalyticEntityViewReportData;
import org.openmetadata.schema.analytics.WebAnalyticEventData;
import org.openmetadata.schema.analytics.WebAnalyticUserActivityReportData;
import org.openmetadata.schema.analytics.type.WebAnalyticEventType;
import org.openmetadata.schema.entity.applications.configuration.internal.AppAnalyticsConfig;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.service.Entity;
import org.openmetadata.service.apps.bundles.insights.DataInsightsApp;
@ -38,6 +39,7 @@ public class WebAnalyticsWorkflow {
private final Long startTimestamp;
private final Long endTimestamp;
private final int batchSize;
private final AppAnalyticsConfig webAnalyticsConfig;
private static final int WEB_ANALYTIC_EVENTS_RETENTION_DAYS = 7;
private final List<PaginatedWebAnalyticEventDataSource> sources = new ArrayList<>();
private WebAnalyticsEntityViewProcessor webAnalyticsEntityViewProcessor;
@ -58,7 +60,10 @@ public class WebAnalyticsWorkflow {
public static final String ENTITY_VIEW_REPORT_DATA_KEY = "entityViewReportData";
public WebAnalyticsWorkflow(
Long timestamp, int batchSize, Optional<DataInsightsApp.Backfill> backfill) {
AppAnalyticsConfig webAnalyticsConfig,
Long timestamp,
int batchSize,
Optional<DataInsightsApp.Backfill> backfill) {
if (backfill.isPresent()) {
Long oldestPossibleTimestamp =
TimestampUtils.getStartOfDayTimestamp(
@ -86,6 +91,7 @@ public class WebAnalyticsWorkflow {
this.startTimestamp = TimestampUtils.getStartOfDayTimestamp(endTimestamp);
}
this.batchSize = batchSize;
this.webAnalyticsConfig = webAnalyticsConfig;
}
private void initialize() {
@ -112,6 +118,9 @@ public class WebAnalyticsWorkflow {
}
public void process() throws SearchIndexException {
if (!webAnalyticsConfig.getEnabled()) {
return;
}
initialize();
Map<String, Object> contextData = new HashMap<>();

View File

@ -1,5 +1,6 @@
package org.openmetadata.service.migration.mysql.v170;
import static org.openmetadata.service.migration.utils.v170.MigrationUtil.updateDataInsightsApplication;
import static org.openmetadata.service.migration.utils.v170.MigrationUtil.updateGovernanceWorkflowDefinitions;
import lombok.SneakyThrows;
@ -17,5 +18,6 @@ public class Migration extends MigrationProcessImpl {
public void runDataMigration() {
initializeWorkflowHandler();
updateGovernanceWorkflowDefinitions();
updateDataInsightsApplication();
}
}

View File

@ -1,5 +1,6 @@
package org.openmetadata.service.migration.postgres.v170;
import static org.openmetadata.service.migration.utils.v170.MigrationUtil.updateDataInsightsApplication;
import static org.openmetadata.service.migration.utils.v170.MigrationUtil.updateGovernanceWorkflowDefinitions;
import lombok.SneakyThrows;
@ -16,5 +17,6 @@ public class Migration extends MigrationProcessImpl {
@SneakyThrows
public void runDataMigration() {
updateGovernanceWorkflowDefinitions();
updateDataInsightsApplication();
}
}

View File

@ -7,10 +7,14 @@ import java.util.List;
import java.util.Map;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.core.statement.UnableToExecuteStatementException;
import org.openmetadata.schema.governance.workflows.WorkflowDefinition;
import org.openmetadata.schema.governance.workflows.elements.WorkflowNodeDefinitionInterface;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.governance.workflows.flowable.MainWorkflow;
import org.openmetadata.service.jdbi3.AppMarketPlaceRepository;
import org.openmetadata.service.jdbi3.AppRepository;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.jdbi3.WorkflowDefinitionRepository;
import org.openmetadata.service.util.EntityUtil;
@ -18,6 +22,32 @@ import org.openmetadata.service.util.JsonUtils;
@Slf4j
public class MigrationUtil {
/**
 * Hard-deletes the installed DataInsightsApplication and its marketplace definition. Per the
 * original note both are expected to be recreated on application start.
 *
 * <p>A missing entity, or a delete that fails with {@code UnableToExecuteStatementException},
 * is logged at debug level and otherwise ignored.
 */
public static void updateDataInsightsApplication() {
  // Delete DataInsightsApplication - It will be recreated on AppStart
  AppRepository appRepository = (AppRepository) Entity.getEntityRepository(Entity.APPLICATION);
  deleteIgnoringKnownFailures(
      () -> appRepository.deleteByName("admin", "DataInsightsApplication", true, true),
      "DataInsights Application not found.");

  // Delete DataInsightsApplication MarketplaceDefinition - It will be recreated on AppStart
  AppMarketPlaceRepository marketPlaceRepository =
      (AppMarketPlaceRepository) Entity.getEntityRepository(Entity.APP_MARKET_PLACE_DEF);
  deleteIgnoringKnownFailures(
      () -> marketPlaceRepository.deleteByName("admin", "DataInsightsApplication", true, true),
      "DataInsights Application Marketplace Definition not found.");
}

/** Runs a delete and downgrades the two expected failure modes to debug log entries. */
private static void deleteIgnoringKnownFailures(Runnable deletion, String notFoundMessage) {
  try {
    deletion.run();
  } catch (EntityNotFoundException ex) {
    LOG.debug(notFoundMessage);
  } catch (UnableToExecuteStatementException ex) {
    // Note: due to a change in the code this delete can fail on a postDelete step; the failure
    // is only logged so the migration can continue.
    LOG.debug("[UnableToExecuteStatementException]: {}", ex.getMessage());
  }
}
@SneakyThrows
private static void setDefaultInputNamespaceMap(WorkflowNodeDefinitionInterface nodeDefinition) {

View File

@ -329,6 +329,8 @@ public interface SearchClient {
// TODO: Think if it makes sense to have this or maybe a specific deleteByRange
void deleteByQuery(String index, String query);
void deleteByRangeAndTerm(String index, String rangeQueryStr, String termKey, String termValue);
default BulkResponse bulk(BulkRequest data, RequestOptions options) throws IOException {
throw new CustomExceptionMessage(
Response.Status.NOT_IMPLEMENTED, NOT_IMPLEMENTED_ERROR_TYPE, NOT_IMPLEMENTED_METHOD);

View File

@ -2375,7 +2375,24 @@ public class ElasticSearchClient implements SearchClient {
XContentParser parser = createXContentParser(query);
parser.nextToken();
deleteRequest.setQuery(RangeQueryBuilder.fromXContent(parser));
client.deleteByQuery(deleteRequest, RequestOptions.DEFAULT);
deleteEntityFromElasticSearchByQuery(deleteRequest);
}
/**
 * Deletes the documents of {@code index} that match both a range query and a term query.
 *
 * @param index index (or data stream) to delete from
 * @param rangeQueryStr JSON of the range query body, e.g. {@code { "@timestamp": { "lte": 1 } }}
 * @param termKey field the term query is applied to
 * @param termValue value the field has to equal
 */
@Override
@SneakyThrows
public void deleteByRangeAndTerm(
    String index, String rangeQueryStr, String termKey, String termValue) {
  DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(index);
  // Hack: Due to an issue on how the RangeQueryBuilder.fromXContent works, we're removing the
  // first token from the Parser
  XContentParser rangeParser = createXContentParser(rangeQueryStr);
  rangeParser.nextToken();
  RangeQueryBuilder rangeQuery = RangeQueryBuilder.fromXContent(rangeParser);

  TermQueryBuilder termQuery = QueryBuilders.termQuery(termKey, termValue);

  // Two must clauses: a document is deleted only when it satisfies the range AND the term.
  BoolQueryBuilder query = QueryBuilders.boolQuery().must(rangeQuery).must(termQuery);
  deleteRequest.setQuery(query);
  deleteEntityFromElasticSearchByQuery(deleteRequest);
}
@SneakyThrows

View File

@ -2335,7 +2335,24 @@ public class OpenSearchClient implements SearchClient {
XContentParser parser = createXContentParser(query);
parser.nextToken();
deleteRequest.setQuery(RangeQueryBuilder.fromXContent(parser));
client.deleteByQuery(deleteRequest, RequestOptions.DEFAULT);
deleteEntityFromOpenSearchByQuery(deleteRequest);
}
/**
 * Deletes the documents of {@code index} that match both a range query and a term query.
 *
 * @param index index (or data stream) to delete from
 * @param rangeQueryStr JSON of the range query body, e.g. {@code { "@timestamp": { "lte": 1 } }}
 * @param termKey field the term query is applied to
 * @param termValue value the field has to equal
 */
@Override
@SneakyThrows
public void deleteByRangeAndTerm(
    String index, String rangeQueryStr, String termKey, String termValue) {
  DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(index);
  // Hack: Due to an issue on how the RangeQueryBuilder.fromXContent works, we're removing the
  // first token from the Parser
  XContentParser rangeParser = createXContentParser(rangeQueryStr);
  rangeParser.nextToken();
  RangeQueryBuilder rangeQuery = RangeQueryBuilder.fromXContent(rangeParser);

  TermQueryBuilder termQuery = QueryBuilders.termQuery(termKey, termValue);

  // Two must clauses: a document is deleted only when it satisfies the range AND the term.
  BoolQueryBuilder query = QueryBuilders.boolQuery().must(rangeQuery).must(termQuery);
  deleteRequest.setQuery(query);
  deleteEntityFromOpenSearchByQuery(deleteRequest);
}
@SneakyThrows

View File

@ -4,12 +4,13 @@
"hot": {
"actions": {
"rollover": {
"max_primary_shard_size": "50gb"
"max_primary_shard_size": "10gb",
"max_age": "{{halfRetention}}d"
}
}
},
"warm": {
"min_age": "30d",
"min_age": "{{halfRetention}}d",
"actions": {
"shrink": {
"number_of_shards": 1
@ -20,7 +21,7 @@
}
},
"delete": {
"min_age": "90d",
"min_age": "{{retention}}d",
"actions": {
"delete": {}
}

View File

@ -8,7 +8,8 @@
"actions": [
{
"rollover": {
"min_primary_shard_size": "50gb"
"min_primary_shard_size": "10gb",
"min_index_age": "{{halfRetention}}d"
}
}
],
@ -16,7 +17,7 @@
{
"state_name": "warm",
"conditions": {
"min_index_age": "30d"
"min_index_age": "{{halfRetention}}d"
}
}
]
@ -39,7 +40,7 @@
{
"state_name": "delete",
"conditions": {
"min_index_age": "90d"
"min_index_age": "{{retention}}d"
}
}
]
@ -54,7 +55,7 @@
}
],
"ism_template": {
"index_patterns": ["di-data-assets"],
"index_patterns": ["di-data-assets-*"],
"priority": 500
}
}

View File

@ -6,6 +6,24 @@
"recreateDataAssetsIndex": false,
"backfillConfiguration": {
"enabled": false
},
"moduleConfiguration": {
"dataAssets": {
"enabled": true,
"entities": [
"all"
],
"retention": 7
},
"appAnalytics": {
"enabled": true
},
"dataQuality": {
"enabled": true
},
"costAnalysis": {
"enabled": true
}
}
},
"appSchedule": {

View File

@ -20,6 +20,24 @@
"recreateDataAssetsIndex": false,
"backfillConfiguration": {
"enabled": false
},
"moduleConfiguration": {
"dataAssets": {
"enabled": true,
"entities": [
"all"
],
"retention": 7
},
"appAnalytics": {
"enabled": true
},
"dataQuality": {
"enabled": true
},
"costAnalysis": {
"enabled": true
}
}
}
}

View File

@ -86,7 +86,8 @@ public class KpiResourceTest extends EntityResourceTest<Kpi, CreateKpiRequest> {
dataStreamName,
dataAssetType,
getSearchRepository().getIndexMapping(dataAssetType),
"en");
"en",
7);
}
}
} catch (IOException ex) {

View File

@ -33,6 +33,115 @@
"format": "date"
}
}
},
"dataAssetsConfig": {
"type": "object",
"properties": {
"enabled": {
"title": "Enabled",
"description": "If Enabled, Data Asset insights will be populated when the App runs.",
"type": "boolean",
"default": true
},
"entities": {
"description": "List of Entities to Reindex",
"type": "array",
"items": {
"type": "string"
},
"default": ["all"],
"uniqueItems": true
},
"retention": {
"title": "Data Retention (Days)",
"description": "Defines the number of days the Data Assets Insights information will be kept. After it they will be deleted.",
"type": "integer",
"default": 7
},
"serviceFilter": {
"type": "object",
"properties": {
"serviceType": {
"type": "string"
},
"serviceName": {
"type": "string"
}
},
"additionalProperties": false,
"default": null
}
},
"additionalProperties": false,
"required": ["enabled"]
},
"appAnalyticsConfig": {
"type": "object",
"properties": {
"enabled": {
"title": "Enabled",
"description": "If Enabled, App Analytics insights will be populated when the App runs.",
"type": "boolean",
"default": true
}
},
"additionalProperties": false,
"required": ["enabled"]
},
"dataQualityConfig": {
"type": "object",
"properties": {
"enabled": {
"title": "Enabled",
"description": "If Enabled, Data Quality insights will be populated when the App runs.",
"type": "boolean",
"default": true
}
},
"additionalProperties": false,
"required": ["enabled"]
},
"costAnalysisConfig": {
"type": "object",
"properties": {
"enabled": {
"title": "Enabled",
"description": "If Enabled, Cost Analysis insights will be populated when the App runs.",
"type": "boolean",
"default": true
}
},
"additionalProperties": false,
"required": ["enabled"]
},
"moduleConfiguration": {
"description": "Different Module Configurations",
"title": "Module Configuration",
"type": "object",
"properties": {
"dataAssets": {
"title": "Data Assets Module",
"description": "Data Assets Insights Module configuration",
"$ref": "#/definitions/dataAssetsConfig"
},
"appAnalytics": {
"title": "App Analytics Module",
"description": "App Analytics Module configuration",
"$ref": "#/definitions/appAnalyticsConfig"
},
"dataQuality": {
"title": "Data Quality Insights Module",
"description": "Data Quality Insights Module configuration",
"$ref": "#/definitions/dataQualityConfig"
},
"costAnalysis": {
"title": "Cost Analysis Insights Module",
"description": "Cost Analysis Insights Module configuration",
"$ref": "#/definitions/costAnalysisConfig"
}
},
"additionalProperties": false,
"required": ["dataAssets", "appAnalytics", "dataQuality", "costAnalysis"]
}
},
"properties": {
@ -55,6 +164,9 @@
},
"backfillConfiguration": {
"$ref": "#/definitions/backfillConfiguration"
},
"moduleConfiguration": {
"$ref": "#/definitions/moduleConfiguration"
}
},
"additionalProperties": false

View File

@ -26,4 +26,9 @@ $$
$$section
### backfillConfiguration $(id="backfillConfiguration")
$$
$$section
### moduleConfiguration $(id="moduleConfiguration")
$$

View File

@ -20,7 +20,15 @@ class ApplicationsClassBase {
return import(`../../../../utils/ApplicationSchemas/${fqn}.json`);
}
public getJSONUISchema() {
return {};
return {
moduleConfiguration: {
dataAssets: {
serviceFilter: {
'ui:widget': 'hidden',
},
},
},
};
}
public importAppLogo(appName: string) {
return import(`../../../../assets/svg/${appName}.svg`);

View File

@ -19,7 +19,8 @@ export interface DataInsightsAppConfig {
/**
* Maximum number of events processed at a time (Default 100).
*/
batchSize?: number;
batchSize?: number;
moduleConfiguration?: ModuleConfiguration;
/**
* Recreates the DataAssets index on DataInsights. Useful if you changed a Custom Property
* Type and are facing errors. Bear in mind that recreating the index will delete your
@ -51,6 +52,83 @@ export interface BackfillConfiguration {
[property: string]: any;
}
/**
* Different Module Configurations
*
* Groups the per-module settings referenced by `moduleConfiguration`. All four module
* entries are required members of this type.
*/
export interface ModuleConfiguration {
/**
* App Analytics Module configuration
*/
appAnalytics: AppAnalyticsConfig;
/**
* Cost Analysis Insights Module configuration
*/
costAnalysis: CostAnalysisConfig;
/**
* Data Assets Insights Module configuration
*/
dataAssets: DataAssetsConfig;
/**
* Data Quality Insights Module configuration
*/
dataQuality: DataQualityConfig;
}
/**
* App Analytics Module configuration
*
* Holds a single required on/off switch for the module.
*/
export interface AppAnalyticsConfig {
/**
* If Enabled, App Analytics insights will be populated when the App runs.
*/
enabled: boolean;
}
/**
* Cost Analysis Insights Module configuration
*
* Holds a single required on/off switch for the module.
*/
export interface CostAnalysisConfig {
/**
* If Enabled, Cost Analysis insights will be populated when the App runs.
*/
enabled: boolean;
}
/**
* Data Assets Insights Module configuration
*
* Only `enabled` is required; every other member is optional.
*/
export interface DataAssetsConfig {
/**
* If Enabled, Data Asset insights will be populated when the App runs.
*/
enabled: boolean;
/**
* List of Entities to Reindex
*
* The accompanying JSON schema declares `["all"]` as the default.
*/
entities?: string[];
/**
* Defines the number of days the Data Assets Insights information will be kept. Once that
* period has passed, the information is deleted.
*
* The accompanying JSON schema declares 7 as the default.
*/
retention?: number;
/**
* Optional service filter made of a service name and/or a service type.
*
* NOTE(review): how the filter is applied is not visible from this type - confirm against
* the code that consumes the configuration.
*/
serviceFilter?: ServiceFilter;
}
/**
* Shape of the optional `serviceFilter` member of `DataAssetsConfig`. Both members are
* optional.
*/
export interface ServiceFilter {
/**
* Name of a service.
* NOTE(review): presumably restricts processing to that service - confirm.
*/
serviceName?: string;
/**
* Type of a service.
* NOTE(review): the accepted values are not constrained by this type - confirm.
*/
serviceType?: string;
}
/**
* Data Quality Insights Module configuration
*
* Holds a single required on/off switch for the module.
*/
export interface DataQualityConfig {
/**
* If Enabled, Data Quality insights will be populated when the App runs.
*/
enabled: boolean;
}
/**
* Application Type
*/

View File

@ -33,6 +33,134 @@
"format": "date"
}
}
},
"dataAssetsConfig": {
"type": "object",
"properties": {
"enabled": {
"title": "Enabled",
"description": "If Enabled, Data Asset insights will be populated when the App runs.",
"type": "boolean",
"default": true
},
"entities": {
"title": "Entities",
"description": "List of entities that you need to reindex",
"type": "array",
"items": {
"type": "string",
"enum": [
"table",
"storedProcedure",
"databaseSchema",
"database",
"chart",
"dashboard",
"dashboardDataModel",
"pipeline",
"topic",
"container",
"searchIndex",
"mlmodel",
"dataProduct",
"glossaryTerm",
"tag"
]
},
"default": ["all"],
"uiFieldType": "treeSelect",
"uniqueItems": true
},
"retention": {
"title": "Data Retention (Days)",
"description": "Defines the number of days the Data Assets Insights information will be kept. After it they will be deleted.",
"type": "integer",
"default": 7
},
"serviceFilter": {
"type": "object",
"properties": {
"serviceType": {
"type": "string"
},
"serviceName": {
"type": "string"
}
},
"additionalProperties": false,
"default": null
}
},
"additionalProperties": false,
"required": ["enabled"]
},
"appAnalyticsConfig": {
"type": "object",
"properties": {
"enabled": {
"title": "Enabled",
"description": "If Enabled, App Analytics insights will be populated when the App runs.",
"type": "boolean",
"default": true
}
},
"additionalProperties": false,
"required": ["enabled"]
},
"dataQualityConfig": {
"type": "object",
"properties": {
"enabled": {
"title": "Enabled",
"description": "If Enabled, Data Quality insights will be populated when the App runs.",
"type": "boolean",
"default": true
}
},
"additionalProperties": false,
"required": ["enabled"]
},
"costAnalysisConfig": {
"type": "object",
"properties": {
"enabled": {
"title": "Enabled",
"description": "If Enabled, Cost Analysis insights will be populated when the App runs.",
"type": "boolean",
"default": true
}
},
"additionalProperties": false,
"required": ["enabled"]
},
"moduleConfiguration": {
"description": "Different Module Configurations",
"title": "Module Configuration",
"type": "object",
"properties": {
"dataAssets": {
"title": "Data Assets Module",
"description": "Data Assets Insights Module configuration",
"$ref": "#/definitions/dataAssetsConfig"
},
"appAnalytics": {
"title": "App Analytics Module",
"description": "App Analytics Module configuration",
"$ref": "#/definitions/appAnalyticsConfig"
},
"dataQuality": {
"title": "Data Quality Insights Module",
"description": "Data Quality Insights Module configuration",
"$ref": "#/definitions/dataQualityConfig"
},
"costAnalysis": {
"title": "Cost Analysis Insights Module",
"description": "Cost Analysis Insights Module configuration",
"$ref": "#/definitions/costAnalysisConfig"
}
},
"additionalProperties": false,
"required": ["dataAssets", "appAnalytics", "dataQuality", "costAnalysis"]
}
},
"properties": {
@ -55,6 +183,9 @@
},
"backfillConfiguration": {
"$ref": "#/definitions/backfillConfiguration"
},
"moduleConfiguration": {
"$ref": "#/definitions/moduleConfiguration"
}
},
"additionalProperties": false