diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java index fe5187afb1b..4b6a7ad00e4 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java @@ -9,6 +9,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.exception.ExceptionUtils; @@ -42,6 +43,7 @@ import org.quartz.JobExecutionContext; @Slf4j public class DataInsightsApp extends AbstractNativeApplication { public static final String REPORT_DATA_TYPE_KEY = "ReportDataType"; + public static final String DATA_ASSET_INDEX_PREFIX = "di-data-assets"; @Getter private Long timestamp; @Getter private int batchSize; @@ -53,6 +55,24 @@ public class DataInsightsApp extends AbstractNativeApplication { @Getter EventPublisherJob jobData; private volatile boolean stopped = false; + public final Set dataAssetTypes = + Set.of( + "table", + "storedProcedure", + "databaseSchema", + "database", + "chart", + "dashboard", + "dashboardDataModel", + "pipeline", + "topic", + "container", + "searchIndex", + "mlmodel", + "dataProduct", + "glossaryTerm", + "tag"); + public DataInsightsApp(CollectionDAO collectionDAO, SearchRepository searchRepository) { super(collectionDAO, searchRepository); } @@ -75,12 +95,19 @@ public class DataInsightsApp extends AbstractNativeApplication { return searchInterface; } + public static String getDataStreamName(String dataAssetType) { + return String.format("%s-%s", DATA_ASSET_INDEX_PREFIX, dataAssetType).toLowerCase(); + } + private void createDataAssetsDataStream() { DataInsightsSearchInterface searchInterface = getSearchInterface(); try { - if (!searchInterface.dataAssetDataStreamExists("di-data-assets")) { - searchInterface.createDataAssetsDataStream(); + for (String dataAssetType : dataAssetTypes) { + String dataStreamName = getDataStreamName(dataAssetType); + if (!searchInterface.dataAssetDataStreamExists(dataStreamName)) { + searchInterface.createDataAssetsDataStream(dataStreamName); + } } } catch (IOException ex) { LOG.error("Couldn't install DataInsightsApp: Can't initialize ElasticSearch Index.", ex); @@ -91,8 +118,11 @@ public class DataInsightsApp extends AbstractNativeApplication { DataInsightsSearchInterface searchInterface = getSearchInterface(); try { - if (searchInterface.dataAssetDataStreamExists("di-data-assets")) { - searchInterface.deleteDataAssetDataStream(); + for (String dataAssetType : dataAssetTypes) { + String dataStreamName = getDataStreamName(dataAssetType); + if (searchInterface.dataAssetDataStreamExists(dataStreamName)) { + searchInterface.deleteDataAssetDataStream(dataStreamName); + } } } catch (IOException ex) { LOG.error("Couldn't delete DataAssets DataStream", ex); @@ -231,7 +261,8 @@ public class DataInsightsApp extends AbstractNativeApplication { private WorkflowStats processDataAssets() { DataAssetsWorkflow workflow = - new DataAssetsWorkflow(timestamp, batchSize, backfill, collectionDAO, searchRepository); + new DataAssetsWorkflow( + timestamp, batchSize, backfill, dataAssetTypes, collectionDAO, searchRepository); WorkflowStats workflowStats = workflow.getWorkflowStats(); try { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/DataInsightsSearchInterface.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/DataInsightsSearchInterface.java index f777fa414a4..0d06b3e03be 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/DataInsightsSearchInterface.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/DataInsightsSearchInterface.java @@ -23,9 +23,9 @@ public interface DataInsightsSearchInterface { } } - void createDataAssetsDataStream() throws IOException; + void createDataAssetsDataStream(String name) throws IOException; - void deleteDataAssetDataStream() throws IOException; + void deleteDataAssetDataStream(String name) throws IOException; Boolean dataAssetDataStreamExists(String name) throws IOException; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/elasticsearch/ElasticSearchDataInsightsClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/elasticsearch/ElasticSearchDataInsightsClient.java index 91e0f545007..345ea1ceb9e 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/elasticsearch/ElasticSearchDataInsightsClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/elasticsearch/ElasticSearchDataInsightsClient.java @@ -52,7 +52,7 @@ public class ElasticSearchDataInsightsClient implements DataInsightsSearchInterf } @Override - public void createDataAssetsDataStream() throws IOException { + public void createDataAssetsDataStream(String name) throws IOException { String resourcePath = "/dataInsights/elasticsearch"; createLifecyclePolicy( "di-data-assets-lifecycle", @@ -65,11 +65,11 @@ public class ElasticSearchDataInsightsClient implements DataInsightsSearchInterf readResource(String.format("%s/indexMappingsTemplate.json", resourcePath))); createIndexTemplate( "di-data-assets", readResource(String.format("%s/indexTemplate.json", resourcePath))); - createDataStream("di-data-assets"); + createDataStream(name); } @Override - public void deleteDataAssetDataStream() throws IOException { - performRequest("DELETE", "/_data_stream/di-data-assets"); + public void deleteDataAssetDataStream(String name) throws IOException { + performRequest("DELETE", String.format("/_data_stream/%s", name)); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/opensearch/OpenSearchDataInsightsClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/opensearch/OpenSearchDataInsightsClient.java index 80fb1ebf6da..ee01925bfdf 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/opensearch/OpenSearchDataInsightsClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/opensearch/OpenSearchDataInsightsClient.java @@ -63,7 +63,7 @@ public class OpenSearchDataInsightsClient implements DataInsightsSearchInterface } @Override - public void createDataAssetsDataStream() throws IOException { + public void createDataAssetsDataStream(String name) throws IOException { String resourcePath = "/dataInsights/opensearch"; createLifecyclePolicy( "di-data-assets-lifecycle", @@ -73,11 +73,11 @@ public class OpenSearchDataInsightsClient implements DataInsightsSearchInterface readResource(String.format("%s/indexMappingsTemplate.json", resourcePath))); createIndexTemplate( "di-data-assets", readResource(String.format("%s/indexTemplate.json", resourcePath))); - createDataStream("di-data-assets"); + createDataStream(name); } @Override - public void deleteDataAssetDataStream() throws IOException { - performRequest("DELETE", "_data_stream/di-data-assets"); + public void deleteDataAssetDataStream(String name) throws IOException { + performRequest("DELETE", String.format("_data_stream/%s", name)); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/DataAssetsWorkflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/DataAssetsWorkflow.java index ab2ea713bf3..7c430d54a78 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/DataAssetsWorkflow.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/DataAssetsWorkflow.java @@ -1,6 +1,7 @@ package org.openmetadata.service.apps.bundles.insights.workflows.dataAssets; import static org.openmetadata.schema.system.IndexingError.ErrorSource.READER; +import static org.openmetadata.service.apps.bundles.insights.DataInsightsApp.getDataStreamName; import static org.openmetadata.service.apps.bundles.insights.utils.TimestampUtils.END_TIMESTAMP_KEY; import static org.openmetadata.service.apps.bundles.insights.utils.TimestampUtils.START_TIMESTAMP_KEY; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY; @@ -38,7 +39,7 @@ import org.openmetadata.service.workflows.searchIndex.PaginatedEntitiesSource; @Slf4j public class DataAssetsWorkflow { - public static final String DATA_ASSETS_DATA_STREAM = "di-data-assets"; + public static final String DATA_STREAM_KEY = "DataStreamKey"; private final int retentionDays = 30; private final Long startTimestamp; private final Long endTimestamp; @@ -46,23 +47,7 @@ public class DataAssetsWorkflow { private final SearchRepository searchRepository; private final CollectionDAO collectionDAO; private final List sources = new ArrayList<>(); - private final Set entityTypes = - Set.of( - "table", - "storedProcedure", - "databaseSchema", - "database", - "chart", - "dashboard", - "dashboardDataModel", - "pipeline", - "topic", - "container", - "searchIndex", - "mlmodel", - "dataProduct", - "glossaryTerm", - "tag"); + private final Set entityTypes; private DataInsightsEntityEnricherProcessor entityEnricher; private Processor entityProcessor; @@ -73,6 +58,7 @@ public class DataAssetsWorkflow { Long timestamp, int batchSize, Optional backfill, + Set entityTypes, CollectionDAO collectionDAO, SearchRepository searchRepository) { if (backfill.isPresent()) { @@ -105,6 +91,7 @@ public class DataAssetsWorkflow { this.batchSize = batchSize; this.searchRepository = searchRepository; this.collectionDAO = collectionDAO; + this.entityTypes = entityTypes; } private void initialize() { @@ -144,9 +131,9 @@ public class DataAssetsWorkflow { contextData.put(START_TIMESTAMP_KEY, startTimestamp); contextData.put(END_TIMESTAMP_KEY, endTimestamp); - deleteDataBeforeInserting(); - for (PaginatedEntitiesSource source : sources) { + deleteDataBeforeInserting(getDataStreamName(source.getEntityType())); + contextData.put(DATA_STREAM_KEY, getDataStreamName(source.getEntityType())); contextData.put(ENTITY_TYPE_KEY, source.getEntityType()); while (!source.isDone()) { @@ -191,12 +178,12 @@ public class DataAssetsWorkflow { } } - private void deleteDataBeforeInserting() throws SearchIndexException { + private void deleteDataBeforeInserting(String dataStreamName) throws SearchIndexException { try { searchRepository .getSearchClient() .deleteByQuery( - DATA_ASSETS_DATA_STREAM, + dataStreamName, String.format( "{\"@timestamp\": {\"gte\": %s, \"lte\": %s}}", startTimestamp, endTimestamp)); } catch (Exception rx) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/processors/DataInsightsElasticSearchProcessor.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/processors/DataInsightsElasticSearchProcessor.java index 77515dd19a0..2349208e50c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/processors/DataInsightsElasticSearchProcessor.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/processors/DataInsightsElasticSearchProcessor.java @@ -1,6 +1,6 @@ package org.openmetadata.service.apps.bundles.insights.workflows.dataAssets.processors; -import static org.openmetadata.service.apps.bundles.insights.workflows.dataAssets.DataAssetsWorkflow.DATA_ASSETS_DATA_STREAM; +import static org.openmetadata.service.apps.bundles.insights.workflows.dataAssets.DataAssetsWorkflow.DATA_STREAM_KEY; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats; import es.org.elasticsearch.action.bulk.BulkRequest; @@ -29,7 +29,7 @@ public class DataInsightsElasticSearchProcessor @Override public BulkRequest process(List> input, Map contextData) throws SearchIndexException { - String index = DATA_ASSETS_DATA_STREAM; + String index = (String) contextData.get(DATA_STREAM_KEY); LOG.debug( "[EsEntitiesProcessor] Processing a Batch of Size: {}, Index: {} ", input.size(), index); BulkRequest requests; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/processors/DataInsightsEntityEnricherProcessor.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/processors/DataInsightsEntityEnricherProcessor.java index e9bb1497df8..49766197f8d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/processors/DataInsightsEntityEnricherProcessor.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/processors/DataInsightsEntityEnricherProcessor.java @@ -92,7 +92,7 @@ public class DataInsightsEntityEnricherProcessor EntityInterface versionEntity = JsonUtils.readOrConvertValue( version, ENTITY_TYPE_TO_CLASS_MAP.get(entityType.toLowerCase())); - Long versionTimestamp = TimestampUtils.getEndOfDayTimestamp(versionEntity.getUpdatedAt()); + Long versionTimestamp = TimestampUtils.getStartOfDayTimestamp(versionEntity.getUpdatedAt()); if (versionTimestamp > pointerTimestamp) { continue; } else if (versionTimestamp < startTimestamp) { @@ -108,15 +108,12 @@ public class DataInsightsEntityEnricherProcessor Map versionMap = new HashMap<>(); versionMap.put("endTimestamp", pointerTimestamp); - versionMap.put("startTimestamp", versionTimestamp); + versionMap.put("startTimestamp", TimestampUtils.getEndOfDayTimestamp(versionTimestamp)); versionMap.put("versionEntity", versionEntity); entityVersions.add(versionMap); - if (versionTimestamp.equals(pointerTimestamp)) { - pointerTimestamp = TimestampUtils.subtractDays(pointerTimestamp, 1); - } else { - pointerTimestamp = versionTimestamp; - } + pointerTimestamp = + TimestampUtils.getEndOfDayTimestamp(TimestampUtils.subtractDays(versionTimestamp, 1)); } } return entityVersions; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/processors/DataInsightsOpenSearchProcessor.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/processors/DataInsightsOpenSearchProcessor.java index 17dbbb02a06..0284ed03674 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/processors/DataInsightsOpenSearchProcessor.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/processors/DataInsightsOpenSearchProcessor.java @@ -1,6 +1,6 @@ package org.openmetadata.service.apps.bundles.insights.workflows.dataAssets.processors; -import static org.openmetadata.service.apps.bundles.insights.workflows.dataAssets.DataAssetsWorkflow.DATA_ASSETS_DATA_STREAM; +import static org.openmetadata.service.apps.bundles.insights.workflows.dataAssets.DataAssetsWorkflow.DATA_STREAM_KEY; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats; import java.util.List; @@ -29,7 +29,7 @@ public class DataInsightsOpenSearchProcessor @Override public BulkRequest process(List> input, Map contextData) throws SearchIndexException { - String index = DATA_ASSETS_DATA_STREAM; + String index = (String) contextData.get(DATA_STREAM_KEY); LOG.debug( "[OsEntitiesProcessor] Processing a Batch of Size: {}, Index: {} ", input.size(), index); BulkRequest requests; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataInsightSystemChartRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataInsightSystemChartRepository.java index 916af9a9ebb..48102c18501 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataInsightSystemChartRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataInsightSystemChartRepository.java @@ -18,7 +18,7 @@ public class DataInsightSystemChartRepository extends EntityRepository { // ------------------------------------------------- RestClient searchClient = getSearchClient(); es.org.elasticsearch.client.Response response; - Request request = new Request("GET", "di-data-assets/_search"); + Request request = new Request("GET", "di-data-assets-*/_search"); String payload = String.format( "{\"query\":{\"bool\":{\"must\":{\"term\":{\"fullyQualifiedName.keyword\":\"%s\"}}}}}", diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/kpi/KpiResourceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/kpi/KpiResourceTest.java index a564d8b10c1..e8b40f36e85 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/kpi/KpiResourceTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/kpi/KpiResourceTest.java @@ -16,6 +16,7 @@ import es.org.elasticsearch.client.RestClient; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import lombok.extern.slf4j.Slf4j; import org.apache.http.client.HttpResponseException; @@ -47,6 +48,23 @@ public class KpiResourceTest extends EntityResourceTest { private void createDataAssetsDataStream() { DataInsightsSearchInterface searchInterface; + Set dataAssetTypes = + Set.of( + "table", + "storedProcedure", + "databaseSchema", + "database", + "chart", + "dashboard", + "dashboardDataModel", + "pipeline", + "topic", + "container", + "searchIndex", + "mlmodel", + "dataProduct", + "glossaryTerm", + "tag"); if (getSearchRepository() .getSearchType() .equals(ElasticSearchConfiguration.SearchType.ELASTICSEARCH)) { @@ -59,17 +77,21 @@ public class KpiResourceTest extends EntityResourceTest { (os.org.opensearch.client.RestClient) getSearchRepository().getSearchClient().getLowLevelClient()); } - try { - if (!searchInterface.dataAssetDataStreamExists("di-data-assets")) { - searchInterface.createDataAssetsDataStream(); + for (String dataAssetType : dataAssetTypes) { + String dataStreamName = + String.format("%s-%s", "di-data-assets", dataAssetType).toLowerCase(); + if (!searchInterface.dataAssetDataStreamExists(dataStreamName)) { + searchInterface.createDataAssetsDataStream(dataStreamName); + } } } catch (IOException ex) { - LOG.error("Couldn't install DataInsightsApp: Can't initialize ElasticSearch Index."); + LOG.error("Couldn't install DataInsightsApp: Can't initialize ElasticSearch Index.", ex); } } public void setupKpi() throws IOException { + createDataAssetsDataStream(); KPI_TARGET = new KpiTarget().withName("Percentage").withValue("80.0"); }