From 717c9709e00e51ee29f02be4e2ef9ae1f84c12e3 Mon Sep 17 00:00:00 2001 From: IceS2 Date: Fri, 9 Aug 2024 03:50:27 +0200 Subject: [PATCH] Update Data Insights Pipeline to change CustomProperty key based on entityType (#17350) --- .../bundles/insights/DataInsightsApp.java | 30 ++++++++++++++++++- .../search/DataInsightsSearchInterface.java | 2 ++ .../ElasticSearchDataInsightsClient.java | 5 ++++ .../OpenSearchDataInsightsClient.java | 5 ++++ .../DataInsightsEntityEnricherProcessor.java | 5 ++++ .../data/app/DataInsightsApplication.json | 1 + .../DataInsightsApplication.json | 1 + .../internal/dataInsightsAppConfig.json | 6 ++++ .../DataInsightsApplication.json | 6 ++++ 9 files changed, 60 insertions(+), 1 deletion(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java index 45edcb70c8f..fe5187afb1b 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java @@ -47,6 +47,8 @@ public class DataInsightsApp extends AbstractNativeApplication { public record Backfill(String startDate, String endDate) {} + private Optional recreateDataAssetsIndex; + @Getter private Optional backfill; @Getter EventPublisherJob jobData; private volatile boolean stopped = false; @@ -55,7 +57,7 @@ public class DataInsightsApp extends AbstractNativeApplication { super(collectionDAO, searchRepository); } - private void createDataAssetsDataStream() { + private DataInsightsSearchInterface getSearchInterface() { DataInsightsSearchInterface searchInterface; if (searchRepository @@ -70,6 +72,11 @@ public class DataInsightsApp extends AbstractNativeApplication { (os.org.opensearch.client.RestClient) searchRepository.getSearchClient().getLowLevelClient()); } + return searchInterface; + } + + private void createDataAssetsDataStream() { + DataInsightsSearchInterface searchInterface = getSearchInterface(); try { if (!searchInterface.dataAssetDataStreamExists("di-data-assets")) { @@ -80,6 +87,18 @@ public class DataInsightsApp extends AbstractNativeApplication { } } + private void deleteDataAssetsDataStream() { + DataInsightsSearchInterface searchInterface = getSearchInterface(); + + try { + if (searchInterface.dataAssetDataStreamExists("di-data-assets")) { + searchInterface.deleteDataAssetDataStream(); + } + } catch (IOException ex) { + LOG.error("Couldn't delete DataAssets DataStream", ex); + } + } + @Override public void init(App app) { super.init(app); @@ -90,6 +109,9 @@ public class DataInsightsApp extends AbstractNativeApplication { // Configure batchSize batchSize = config.getBatchSize(); + // Configure recreate + recreateDataAssetsIndex = Optional.ofNullable(config.getRecreateDataAssetsIndex()); + // Configure Backfill Optional backfillConfig = Optional.ofNullable(config.getBackfillConfiguration()); @@ -118,6 +140,12 @@ public class DataInsightsApp extends AbstractNativeApplication { if (!runType.equals(ON_DEMAND_JOB)) { backfill = Optional.empty(); + recreateDataAssetsIndex = Optional.empty(); + } + + if (recreateDataAssetsIndex.isPresent() && recreateDataAssetsIndex.get().equals(true)) { + deleteDataAssetsDataStream(); + createDataAssetsDataStream(); } WorkflowStats webAnalyticsStats = processWebAnalytics(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/DataInsightsSearchInterface.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/DataInsightsSearchInterface.java index 3bd3085efe6..f777fa414a4 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/DataInsightsSearchInterface.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/DataInsightsSearchInterface.java @@ -25,5 +25,7 @@ public interface DataInsightsSearchInterface { void createDataAssetsDataStream() throws IOException; + void deleteDataAssetDataStream() throws IOException; + Boolean dataAssetDataStreamExists(String name) throws IOException; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/elasticsearch/ElasticSearchDataInsightsClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/elasticsearch/ElasticSearchDataInsightsClient.java index a13b5fbee12..91e0f545007 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/elasticsearch/ElasticSearchDataInsightsClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/elasticsearch/ElasticSearchDataInsightsClient.java @@ -67,4 +67,9 @@ public class ElasticSearchDataInsightsClient implements DataInsightsSearchInterf "di-data-assets", readResource(String.format("%s/indexTemplate.json", resourcePath))); createDataStream("di-data-assets"); } + + @Override + public void deleteDataAssetDataStream() throws IOException { + performRequest("DELETE", "/_data_stream/di-data-assets"); + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/opensearch/OpenSearchDataInsightsClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/opensearch/OpenSearchDataInsightsClient.java index 4340bc08ed3..80fb1ebf6da 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/opensearch/OpenSearchDataInsightsClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/opensearch/OpenSearchDataInsightsClient.java @@ -75,4 +75,9 @@ public class OpenSearchDataInsightsClient implements DataInsightsSearchInterface "di-data-assets", readResource(String.format("%s/indexTemplate.json", resourcePath))); createDataStream("di-data-assets"); } + + @Override + public void deleteDataAssetDataStream() throws IOException { + performRequest("DELETE", "_data_stream/di-data-assets"); + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/processors/DataInsightsEntityEnricherProcessor.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/processors/DataInsightsEntityEnricherProcessor.java index a4a0b830ad2..4be08f85fea 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/processors/DataInsightsEntityEnricherProcessor.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/processors/DataInsightsEntityEnricherProcessor.java @@ -187,6 +187,11 @@ public class DataInsightsEntityEnricherProcessor entityMap.put("hasDescription", CommonUtil.nullOrEmpty(entity.getDescription()) ? 0 : 1); } + // Modify Custom Property key + Optional oCustomProperties = Optional.ofNullable(entityMap.remove("extension")); + oCustomProperties.ifPresent( + o -> entityMap.put(String.format("%sCustomProperty", entityType), o)); + // Remove 'changeDescription' field entityMap.remove("changeDescription"); // Remove 'sampleData' diff --git a/openmetadata-service/src/main/resources/json/data/app/DataInsightsApplication.json b/openmetadata-service/src/main/resources/json/data/app/DataInsightsApplication.json index f5dee93cd88..ff7ca49dd4a 100644 --- a/openmetadata-service/src/main/resources/json/data/app/DataInsightsApplication.json +++ b/openmetadata-service/src/main/resources/json/data/app/DataInsightsApplication.json @@ -3,6 +3,7 @@ "displayName": "Data Insights", "appConfiguration": { "batchSize": 100, + "recreateDataAssetsIndex": false, "backfillConfiguration": { "enabled": false } diff --git a/openmetadata-service/src/main/resources/json/data/appMarketPlaceDefinition/DataInsightsApplication.json b/openmetadata-service/src/main/resources/json/data/appMarketPlaceDefinition/DataInsightsApplication.json index 380f9bccfb7..58efd4e8fa6 100644 --- a/openmetadata-service/src/main/resources/json/data/appMarketPlaceDefinition/DataInsightsApplication.json +++ b/openmetadata-service/src/main/resources/json/data/appMarketPlaceDefinition/DataInsightsApplication.json @@ -17,6 +17,7 @@ }, "appConfiguration": { "batchSize": 100, + "recreateDataAssetsIndex": false, "backfillConfiguration": { "enabled": false } diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/applications/configuration/internal/dataInsightsAppConfig.json b/openmetadata-spec/src/main/resources/json/schema/entity/applications/configuration/internal/dataInsightsAppConfig.json index 61c0133d10f..b63104da61f 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/applications/configuration/internal/dataInsightsAppConfig.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/applications/configuration/internal/dataInsightsAppConfig.json @@ -47,6 +47,12 @@ "type": "integer", "default": 100 }, + "recreateDataAssetsIndex": { + "title": "Recreate DataInsights DataAssets Index", + "description": "Recreates the DataAssets index on DataInsights. Useful if you changed a Custom Property Type and are facing errors. Bear in mind that recreating the index will delete your DataAssets and a backfill will be needed.", + "type": "boolean", + "default": false + }, "backfillConfiguration": { "$ref": "#/definitions/backfillConfiguration" } diff --git a/openmetadata-ui/src/main/resources/ui/src/utils/ApplicationSchemas/DataInsightsApplication.json b/openmetadata-ui/src/main/resources/ui/src/utils/ApplicationSchemas/DataInsightsApplication.json index 7d61e008df0..f1defeaaaf7 100644 --- a/openmetadata-ui/src/main/resources/ui/src/utils/ApplicationSchemas/DataInsightsApplication.json +++ b/openmetadata-ui/src/main/resources/ui/src/utils/ApplicationSchemas/DataInsightsApplication.json @@ -47,6 +47,12 @@ "type": "integer", "default": 100 }, + "recreateDataAssetsIndex": { + "title": "Recreate DataInsights DataAssets Index", + "description": "Recreates the DataAssets index on DataInsights. Useful if you changed a Custom Property Type and are facing errors. Bear in mind that recreating the index will delete your DataAssets and a backfill will be needed.", + "type": "boolean", + "default": false + }, "backfillConfiguration": { "$ref": "#/definitions/backfillConfiguration" }