mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-10-27 00:31:42 +00:00
Update Data Insights Pipeline to change CustomProperty key based on entityType (#17350)
This commit is contained in:
parent
4b2f3fa4d0
commit
717c9709e0
@ -47,6 +47,8 @@ public class DataInsightsApp extends AbstractNativeApplication {
|
||||
|
||||
public record Backfill(String startDate, String endDate) {}
|
||||
|
||||
private Optional<Boolean> recreateDataAssetsIndex;
|
||||
|
||||
@Getter private Optional<Backfill> backfill;
|
||||
@Getter EventPublisherJob jobData;
|
||||
private volatile boolean stopped = false;
|
||||
@ -55,7 +57,7 @@ public class DataInsightsApp extends AbstractNativeApplication {
|
||||
super(collectionDAO, searchRepository);
|
||||
}
|
||||
|
||||
private void createDataAssetsDataStream() {
|
||||
private DataInsightsSearchInterface getSearchInterface() {
|
||||
DataInsightsSearchInterface searchInterface;
|
||||
|
||||
if (searchRepository
|
||||
@ -70,6 +72,11 @@ public class DataInsightsApp extends AbstractNativeApplication {
|
||||
(os.org.opensearch.client.RestClient)
|
||||
searchRepository.getSearchClient().getLowLevelClient());
|
||||
}
|
||||
return searchInterface;
|
||||
}
|
||||
|
||||
private void createDataAssetsDataStream() {
|
||||
DataInsightsSearchInterface searchInterface = getSearchInterface();
|
||||
|
||||
try {
|
||||
if (!searchInterface.dataAssetDataStreamExists("di-data-assets")) {
|
||||
@ -80,6 +87,18 @@ public class DataInsightsApp extends AbstractNativeApplication {
|
||||
}
|
||||
}
|
||||
|
||||
private void deleteDataAssetsDataStream() {
|
||||
DataInsightsSearchInterface searchInterface = getSearchInterface();
|
||||
|
||||
try {
|
||||
if (searchInterface.dataAssetDataStreamExists("di-data-assets")) {
|
||||
searchInterface.deleteDataAssetDataStream();
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
LOG.error("Couldn't delete DataAssets DataStream", ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(App app) {
|
||||
super.init(app);
|
||||
@ -90,6 +109,9 @@ public class DataInsightsApp extends AbstractNativeApplication {
|
||||
// Configure batchSize
|
||||
batchSize = config.getBatchSize();
|
||||
|
||||
// Configure recreate
|
||||
recreateDataAssetsIndex = Optional.ofNullable(config.getRecreateDataAssetsIndex());
|
||||
|
||||
// Configure Backfill
|
||||
Optional<BackfillConfiguration> backfillConfig =
|
||||
Optional.ofNullable(config.getBackfillConfiguration());
|
||||
@ -118,6 +140,12 @@ public class DataInsightsApp extends AbstractNativeApplication {
|
||||
|
||||
if (!runType.equals(ON_DEMAND_JOB)) {
|
||||
backfill = Optional.empty();
|
||||
recreateDataAssetsIndex = Optional.empty();
|
||||
}
|
||||
|
||||
if (recreateDataAssetsIndex.isPresent() && recreateDataAssetsIndex.get().equals(true)) {
|
||||
deleteDataAssetsDataStream();
|
||||
createDataAssetsDataStream();
|
||||
}
|
||||
|
||||
WorkflowStats webAnalyticsStats = processWebAnalytics();
|
||||
|
||||
@ -25,5 +25,7 @@ public interface DataInsightsSearchInterface {
|
||||
|
||||
void createDataAssetsDataStream() throws IOException;
|
||||
|
||||
void deleteDataAssetDataStream() throws IOException;
|
||||
|
||||
Boolean dataAssetDataStreamExists(String name) throws IOException;
|
||||
}
|
||||
|
||||
@ -67,4 +67,9 @@ public class ElasticSearchDataInsightsClient implements DataInsightsSearchInterf
|
||||
"di-data-assets", readResource(String.format("%s/indexTemplate.json", resourcePath)));
|
||||
createDataStream("di-data-assets");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteDataAssetDataStream() throws IOException {
|
||||
performRequest("DELETE", "/_data_stream/di-data-assets");
|
||||
}
|
||||
}
|
||||
|
||||
@ -75,4 +75,9 @@ public class OpenSearchDataInsightsClient implements DataInsightsSearchInterface
|
||||
"di-data-assets", readResource(String.format("%s/indexTemplate.json", resourcePath)));
|
||||
createDataStream("di-data-assets");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteDataAssetDataStream() throws IOException {
|
||||
performRequest("DELETE", "_data_stream/di-data-assets");
|
||||
}
|
||||
}
|
||||
|
||||
@ -187,6 +187,11 @@ public class DataInsightsEntityEnricherProcessor
|
||||
entityMap.put("hasDescription", CommonUtil.nullOrEmpty(entity.getDescription()) ? 0 : 1);
|
||||
}
|
||||
|
||||
// Modify Custom Property key
|
||||
Optional<Object> oCustomProperties = Optional.ofNullable(entityMap.remove("extension"));
|
||||
oCustomProperties.ifPresent(
|
||||
o -> entityMap.put(String.format("%sCustomProperty", entityType), o));
|
||||
|
||||
// Remove 'changeDescription' field
|
||||
entityMap.remove("changeDescription");
|
||||
// Remove 'sampleData'
|
||||
|
||||
@ -3,6 +3,7 @@
|
||||
"displayName": "Data Insights",
|
||||
"appConfiguration": {
|
||||
"batchSize": 100,
|
||||
"recreateDataAssetsIndex": false,
|
||||
"backfillConfiguration": {
|
||||
"enabled": false
|
||||
}
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
},
|
||||
"appConfiguration": {
|
||||
"batchSize": 100,
|
||||
"recreateDataAssetsIndex": false,
|
||||
"backfillConfiguration": {
|
||||
"enabled": false
|
||||
}
|
||||
|
||||
@ -47,6 +47,12 @@
|
||||
"type": "integer",
|
||||
"default": 100
|
||||
},
|
||||
"recreateDataAssetsIndex": {
|
||||
"title": "Recreate DataInsights DataAssets Index",
|
||||
"description": "Recreates the DataAssets index on DataInsights. Useful if you changed a Custom Property Type and are facing errors. Bear in mind that recreating the index will delete your DataAssets and a backfill will be needed.",
|
||||
"type": "boolean",
|
||||
"default": false
|
||||
},
|
||||
"backfillConfiguration": {
|
||||
"$ref": "#/definitions/backfillConfiguration"
|
||||
}
|
||||
|
||||
@ -47,6 +47,12 @@
|
||||
"type": "integer",
|
||||
"default": 100
|
||||
},
|
||||
"recreateDataAssetsIndex": {
|
||||
"title": "Recreate DataInsights DataAssets Index",
|
||||
"description": "Recreates the DataAssets index on DataInsights. Useful if you changed a Custom Property Type and are facing errors. Bear in mind that recreating the index will delete your DataAssets and a backfill will be needed.",
|
||||
"type": "boolean",
|
||||
"default": false
|
||||
},
|
||||
"backfillConfiguration": {
|
||||
"$ref": "#/definitions/backfillConfiguration"
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user