FIX #19386 & #19388: Fixing Data Insights index mapping (#19423)

* Fixing Data Insights index mapping

* Add OpenMetadataOperations cli endpoint to reindex data insights

* Improve IndexMapTemplate building

* Improve the code a bit

* Fix test
This commit is contained in:
IceS2 2025-01-23 10:02:52 +01:00 committed by GitHub
parent a631d22c32
commit 901063b802
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 396 additions and 264 deletions

View File

@ -146,11 +146,19 @@ public class DataInsightsApp extends AbstractNativeApplication {
private void createDataAssetsDataStream() {
DataInsightsSearchInterface searchInterface = getSearchInterface();
ElasticSearchConfiguration config = searchRepository.getElasticSearchConfiguration();
String language =
config != null && config.getSearchIndexMappingLanguage() != null
? config.getSearchIndexMappingLanguage().value()
: "en";
try {
for (String dataAssetType : dataAssetTypes) {
IndexMapping dataAssetIndex = searchRepository.getIndexMapping(dataAssetType);
String dataStreamName = getDataStreamName(dataAssetType);
if (!searchInterface.dataAssetDataStreamExists(dataStreamName)) {
searchInterface.createDataAssetsDataStream(dataStreamName);
searchInterface.createDataAssetsDataStream(
dataStreamName, dataAssetType, dataAssetIndex, language);
}
}
} catch (IOException ex) {
@ -312,7 +320,13 @@ public class DataInsightsApp extends AbstractNativeApplication {
private WorkflowStats processDataAssets() {
DataAssetsWorkflow workflow =
new DataAssetsWorkflow(
timestamp, batchSize, backfill, dataAssetTypes, collectionDAO, searchRepository);
timestamp,
batchSize,
backfill,
dataAssetTypes,
collectionDAO,
searchRepository,
getSearchInterface());
WorkflowStats workflowStats = workflow.getWorkflowStats();
try {

View File

@ -0,0 +1,10 @@
package org.openmetadata.service.apps.bundles.insights.search;
import java.util.List;
import java.util.Map;
import lombok.Getter;
@Getter
public class DataInsightsSearchConfiguration {
private Map<String, List<String>> mappingFields;
}

View File

@ -2,9 +2,13 @@ package org.openmetadata.service.apps.bundles.insights.search;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import org.openmetadata.service.exception.UnhandledServerException;
import org.openmetadata.service.search.models.IndexMapping;
import org.openmetadata.service.util.JsonUtils;
public interface DataInsightsSearchInterface {
String DATA_INSIGHTS_SEARCH_CONFIG_PATH = "/dataInsights/config.json";
void createLifecyclePolicy(String name, String policy) throws IOException;
@ -23,7 +27,63 @@ public interface DataInsightsSearchInterface {
}
}
void createDataAssetsDataStream(String name) throws IOException;
default String buildMapping(
String entityType,
IndexMapping entityIndexMapping,
String language,
String indexMappingTemplateStr) {
IndexMappingTemplate indexMappingTemplate =
JsonUtils.readOrConvertValue(indexMappingTemplateStr, IndexMappingTemplate.class);
EntityIndexMap entityIndexMap =
JsonUtils.readOrConvertValue(
readResource(
String.format(entityIndexMapping.getIndexMappingFile(), language.toLowerCase())),
EntityIndexMap.class);
DataInsightsSearchConfiguration dataInsightsSearchConfiguration =
readDataInsightsSearchConfiguration();
List<String> entityAttributeFields =
getEntityAttributeFields(dataInsightsSearchConfiguration, entityType);
indexMappingTemplate
.getTemplate()
.getSettings()
.put("analysis", entityIndexMap.getSettings().get("analysis"));
for (String attribute : entityAttributeFields) {
if (!indexMappingTemplate
.getTemplate()
.getMappings()
.getProperties()
.containsKey(attribute)) {
Object value = entityIndexMap.getMappings().getProperties().get(attribute);
if (value != null) {
indexMappingTemplate.getTemplate().getMappings().getProperties().put(attribute, value);
}
}
}
return JsonUtils.pojoToJson(indexMappingTemplate);
}
default DataInsightsSearchConfiguration readDataInsightsSearchConfiguration() {
return JsonUtils.readOrConvertValue(
readResource(DATA_INSIGHTS_SEARCH_CONFIG_PATH), DataInsightsSearchConfiguration.class);
}
default List<String> getEntityAttributeFields(
DataInsightsSearchConfiguration dataInsightsSearchConfiguration, String entityType) {
List<String> entityAttributeFields =
dataInsightsSearchConfiguration.getMappingFields().get("common");
entityAttributeFields.addAll(
dataInsightsSearchConfiguration.getMappingFields().get(entityType));
return entityAttributeFields;
}
void createDataAssetsDataStream(
String name, String entityType, IndexMapping entityIndexMapping, String language)
throws IOException;
void deleteDataAssetDataStream(String name) throws IOException;

View File

@ -0,0 +1,18 @@
package org.openmetadata.service.apps.bundles.insights.search;
import java.util.Map;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class EntityIndexMap {
private Map<String, Object> settings;
private Mappings mappings;
@Getter
@Setter
public static class Mappings {
private Map<String, Object> properties;
}
}

View File

@ -0,0 +1,8 @@
package org.openmetadata.service.apps.bundles.insights.search;
import lombok.Getter;
@Getter
public class IndexMappingTemplate {
private EntityIndexMap template;
}

View File

@ -5,6 +5,7 @@ import es.org.elasticsearch.client.Response;
import es.org.elasticsearch.client.RestClient;
import java.io.IOException;
import org.openmetadata.service.apps.bundles.insights.search.DataInsightsSearchInterface;
import org.openmetadata.service.search.models.IndexMapping;
public class ElasticSearchDataInsightsClient implements DataInsightsSearchInterface {
private final RestClient client;
@ -52,7 +53,9 @@ public class ElasticSearchDataInsightsClient implements DataInsightsSearchInterf
}
@Override
public void createDataAssetsDataStream(String name) throws IOException {
public void createDataAssetsDataStream(
String name, String entityType, IndexMapping entityIndexMapping, String language)
throws IOException {
String resourcePath = "/dataInsights/elasticsearch";
createLifecyclePolicy(
"di-data-assets-lifecycle",
@ -62,7 +65,11 @@ public class ElasticSearchDataInsightsClient implements DataInsightsSearchInterf
readResource(String.format("%s/indexSettingsTemplate.json", resourcePath)));
createComponentTemplate(
"di-data-assets-mapping",
readResource(String.format("%s/indexMappingsTemplate.json", resourcePath)));
buildMapping(
entityType,
entityIndexMapping,
language,
readResource(String.format("%s/indexMappingsTemplate.json", resourcePath))));
createIndexTemplate(
"di-data-assets", readResource(String.format("%s/indexTemplate.json", resourcePath)));
createDataStream(name);

View File

@ -2,6 +2,7 @@ package org.openmetadata.service.apps.bundles.insights.search.opensearch;
import java.io.IOException;
import org.openmetadata.service.apps.bundles.insights.search.DataInsightsSearchInterface;
import org.openmetadata.service.search.models.IndexMapping;
import os.org.opensearch.client.Request;
import os.org.opensearch.client.Response;
import os.org.opensearch.client.ResponseException;
@ -63,14 +64,20 @@ public class OpenSearchDataInsightsClient implements DataInsightsSearchInterface
}
@Override
public void createDataAssetsDataStream(String name) throws IOException {
public void createDataAssetsDataStream(
String name, String entityType, IndexMapping entityIndexMapping, String language)
throws IOException {
String resourcePath = "/dataInsights/opensearch";
createLifecyclePolicy(
"di-data-assets-lifecycle",
readResource(String.format("%s/indexLifecyclePolicy.json", resourcePath)));
createComponentTemplate(
"di-data-assets-mapping",
readResource(String.format("%s/indexMappingsTemplate.json", resourcePath)));
buildMapping(
entityType,
entityIndexMapping,
language,
readResource(String.format("%s/indexMappingsTemplate.json", resourcePath))));
createIndexTemplate(
"di-data-assets", readResource(String.format("%s/indexTemplate.json", resourcePath)));
createDataStream(name);

View File

@ -23,6 +23,8 @@ import org.openmetadata.schema.system.Stats;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.schema.type.Include;
import org.openmetadata.service.apps.bundles.insights.DataInsightsApp;
import org.openmetadata.service.apps.bundles.insights.search.DataInsightsSearchConfiguration;
import org.openmetadata.service.apps.bundles.insights.search.DataInsightsSearchInterface;
import org.openmetadata.service.apps.bundles.insights.utils.TimestampUtils;
import org.openmetadata.service.apps.bundles.insights.workflows.WorkflowStats;
import org.openmetadata.service.apps.bundles.insights.workflows.dataAssets.processors.DataInsightsElasticSearchProcessor;
@ -43,6 +45,7 @@ import org.openmetadata.service.workflows.searchIndex.PaginatedEntitiesSource;
@Slf4j
public class DataAssetsWorkflow {
public static final String DATA_STREAM_KEY = "DataStreamKey";
public static final String ENTITY_TYPE_FIELDS_KEY = "EnityTypeFields";
private final int retentionDays = 30;
private final Long startTimestamp;
private final Long endTimestamp;
@ -51,6 +54,8 @@ public class DataAssetsWorkflow {
private final CollectionDAO collectionDAO;
private final List<PaginatedEntitiesSource> sources = new ArrayList<>();
private final Set<String> entityTypes;
private final DataInsightsSearchConfiguration dataInsightsSearchConfiguration;
private final DataInsightsSearchInterface searchInterface;
private DataInsightsEntityEnricherProcessor entityEnricher;
private Processor entityProcessor;
@ -63,7 +68,8 @@ public class DataAssetsWorkflow {
Optional<DataInsightsApp.Backfill> backfill,
Set<String> entityTypes,
CollectionDAO collectionDAO,
SearchRepository searchRepository) {
SearchRepository searchRepository,
DataInsightsSearchInterface searchInterface) {
if (backfill.isPresent()) {
Long oldestPossibleTimestamp =
TimestampUtils.getStartOfDayTimestamp(
@ -95,6 +101,8 @@ public class DataAssetsWorkflow {
this.searchRepository = searchRepository;
this.collectionDAO = collectionDAO;
this.entityTypes = entityTypes;
this.searchInterface = searchInterface;
this.dataInsightsSearchConfiguration = searchInterface.readDataInsightsSearchConfiguration();
}
private void initialize() {
@ -146,6 +154,10 @@ public class DataAssetsWorkflow {
deleteDataBeforeInserting(getDataStreamName(source.getEntityType()));
contextData.put(DATA_STREAM_KEY, getDataStreamName(source.getEntityType()));
contextData.put(ENTITY_TYPE_KEY, source.getEntityType());
contextData.put(
ENTITY_TYPE_FIELDS_KEY,
searchInterface.getEntityAttributeFields(
dataInsightsSearchConfiguration, source.getEntityType()));
while (!source.isDone().get()) {
try {

View File

@ -3,6 +3,7 @@ package org.openmetadata.service.apps.bundles.insights.workflows.dataAssets.proc
import static org.openmetadata.schema.EntityInterface.ENTITY_TYPE_TO_CLASS_MAP;
import static org.openmetadata.service.apps.bundles.insights.utils.TimestampUtils.END_TIMESTAMP_KEY;
import static org.openmetadata.service.apps.bundles.insights.utils.TimestampUtils.START_TIMESTAMP_KEY;
import static org.openmetadata.service.apps.bundles.insights.workflows.dataAssets.DataAssetsWorkflow.ENTITY_TYPE_FIELDS_KEY;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.TIMESTAMP_KEY;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats;
@ -141,6 +142,8 @@ public class DataInsightsEntityEnricherProcessor
Long endTimestamp = (Long) entityVersionMap.get("endTimestamp");
Map<String, Object> entityMap = JsonUtils.getMap(entity);
entityMap.keySet().retainAll((List<String>) contextData.get(ENTITY_TYPE_FIELDS_KEY));
String entityType = (String) contextData.get(ENTITY_TYPE_KEY);
List<Class<?>> interfaces = List.of(entity.getClass().getInterfaces());

View File

@ -4,6 +4,7 @@ import static org.flywaydb.core.internal.info.MigrationInfoDumper.dumpToAsciiTab
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.service.Entity.ADMIN_USER_NAME;
import static org.openmetadata.service.Entity.FIELD_OWNERS;
import static org.openmetadata.service.apps.bundles.insights.utils.TimestampUtils.timestampToString;
import static org.openmetadata.service.formatter.decorators.MessageDecorator.getDateStringEpochMilli;
import static org.openmetadata.service.jdbi3.UserRepository.AUTH_MECHANISM_FIELD;
import static org.openmetadata.service.util.AsciiTable.printOpenMetadataText;
@ -30,6 +31,7 @@ import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.Callable;
@ -49,6 +51,8 @@ import org.openmetadata.schema.entity.app.AppRunRecord;
import org.openmetadata.schema.entity.app.AppSchedule;
import org.openmetadata.schema.entity.app.CreateApp;
import org.openmetadata.schema.entity.app.ScheduleTimeline;
import org.openmetadata.schema.entity.applications.configuration.internal.BackfillConfiguration;
import org.openmetadata.schema.entity.applications.configuration.internal.DataInsightsAppConfig;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.entity.teams.User;
import org.openmetadata.schema.services.connections.metadata.AuthProvider;
@ -513,6 +517,104 @@ public class OpenMetadataOperations implements Callable<Integer> {
return result;
}
@Command(
name = "reindexdi",
description = "Re Indexes data insights into search engine from command line.")
public Integer reIndexDI(
@Option(
names = {"-b", "--batch-size"},
defaultValue = "100",
description = "Number of records to process in each batch.")
int batchSize,
@Option(
names = {"--recreate-indexes"},
defaultValue = "true",
description = "Flag to determine if indexes should be recreated.")
boolean recreateIndexes,
@Option(
names = {"--start-date"},
description = "Start Date to backfill from.")
String startDate,
@Option(
names = {"--end-date"},
description = "End Date to backfill to.")
String endDate) {
try {
LOG.info(
"Running Reindexing with Batch Size: {}, Recreate-Index: {}, Start Date: {}, End Date: {}.",
batchSize,
recreateIndexes,
startDate,
endDate);
parseConfig();
CollectionRegistry.initialize();
ApplicationHandler.initialize(config);
CollectionRegistry.getInstance().loadSeedData(jdbi, config, null, null, null, true);
ApplicationHandler.initialize(config);
AppScheduler.initialize(config, collectionDAO, searchRepository);
String appName = "DataInsightsApplication";
return executeDataInsightsReindexApp(
appName, batchSize, recreateIndexes, getBackfillConfiguration(startDate, endDate));
} catch (Exception e) {
LOG.error("Failed to reindex due to ", e);
return 1;
}
}
private BackfillConfiguration getBackfillConfiguration(String startDate, String endDate) {
BackfillConfiguration backfillConfiguration = new BackfillConfiguration();
backfillConfiguration.withEnabled(false);
if (startDate != null) {
backfillConfiguration.withEnabled(true);
backfillConfiguration.withStartDate(startDate);
backfillConfiguration.withEndDate(
Objects.requireNonNullElseGet(
endDate, () -> timestampToString(System.currentTimeMillis(), "yyyy-MM-dd")));
}
return backfillConfiguration;
}
private int executeDataInsightsReindexApp(
String appName,
int batchSize,
boolean recreateIndexes,
BackfillConfiguration backfillConfiguration) {
AppRepository appRepository = (AppRepository) Entity.getEntityRepository(Entity.APPLICATION);
App originalDataInsightsApp =
appRepository.getByName(null, appName, appRepository.getFields("id"));
DataInsightsAppConfig storedConfig =
JsonUtils.convertValue(
originalDataInsightsApp.getAppConfiguration(), DataInsightsAppConfig.class);
DataInsightsAppConfig updatedConfig =
JsonUtils.deepCopy(storedConfig, DataInsightsAppConfig.class);
updatedConfig
.withBatchSize(batchSize)
.withRecreateDataAssetsIndex(recreateIndexes)
.withBackfillConfiguration(backfillConfiguration);
// Update the data insights app with the new configurations
App updatedDataInsightsApp = JsonUtils.deepCopy(originalDataInsightsApp, App.class);
updatedDataInsightsApp.withAppConfiguration(updatedConfig);
JsonPatch patch = JsonUtils.getJsonPatch(originalDataInsightsApp, updatedDataInsightsApp);
appRepository.patch(null, originalDataInsightsApp.getId(), "admin", patch);
// Trigger Application
long currentTime = System.currentTimeMillis();
AppScheduler.getInstance().triggerOnDemandApplication(updatedDataInsightsApp);
int result = waitAndReturnReindexingAppStatus(updatedDataInsightsApp, currentTime);
// Re-patch with original configuration
JsonPatch repatch = JsonUtils.getJsonPatch(updatedDataInsightsApp, originalDataInsightsApp);
appRepository.patch(null, originalDataInsightsApp.getId(), "admin", repatch);
return result;
}
@SneakyThrows
private int waitAndReturnReindexingAppStatus(App searchIndexApp, long startTime) {
AppRunRecord appRunRecord = null;

View File

@ -0,0 +1,122 @@
{
"mappingFields": {
"common": [
"id",
"description",
"displayName",
"name",
"deleted",
"version",
"owners",
"tags",
"followers",
"extension",
"votes",
"fullyQualifiedName",
"domain",
"dataProducts",
"certification"
],
"table": [
"tableType",
"columns",
"databaseSchema",
"database",
"service",
"serviceType"
],
"storedProcedure": [
"storedProcedureType",
"databaseSchema",
"database",
"service",
"serviceType"
],
"databaseSchema": [
"database",
"service",
"serviceType"
],
"database": [
"service",
"serviceType"
],
"chart": [
"service",
"serviceType",
"chartType"
],
"dashboard": [
"service",
"serviceType",
"dashboardType"
],
"dashboardDataModel": [
"service",
"serviceType",
"dataModelType",
"project",
"columns"
],
"pipeline": [
"service",
"serviceType",
"pipelineStatus",
"tasks"
],
"topic": [
"service",
"serviceType"
],
"container": [
"service",
"serviceType",
"numberOfObjects",
"size",
"fileFormats",
"parent",
"children",
"prefix"
],
"searchIndex": [
"service",
"serviceType",
"indexType",
"fields"
],
"mlmodel": [
"service",
"serviceType",
"mlStore",
"algorithm",
"mlFeatures",
"mlHyperParameters",
"target",
"dashboard",
"server"
],
"dataProduct": [
"experts",
"domain",
"assets"
],
"glossaryTerm": [
"synonyms",
"glossary",
"parent",
"children",
"relatedTerms",
"references",
"reviewers",
"status",
"usageCount",
"childrenCount"
],
"tag": [
"classification",
"parent",
"children",
"usageCount"
]
}
}

View File

@ -1,138 +1,16 @@
{
"template": {
"settings": {
"analysis": {
"normalizer": {
"lowercase_normalizer": {
"type": "custom",
"char_filter": [],
"filter": [
"lowercase"
]
}
},
"analyzer": {
"om_analyzer": {
"tokenizer": "letter",
"filter": [
"lowercase",
"om_stemmer"
]
},
"om_ngram": {
"tokenizer": "ngram",
"min_gram": 3,
"max_gram": 10,
"filter": [
"lowercase"
]
}
},
"filter": {
"om_stemmer": {
"type": "stemmer",
"name": "english"
}
}
}
},
"settings": {},
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
},
"owners": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"id": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
},
"name": {
"type": "keyword",
"normalizer": "lowercase_normalizer",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"displayName": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"normalizer": "lowercase_normalizer",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "text"
},
"href": {
"type": "text"
}
}
},
"domain": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"type": "keyword"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"displayName": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"normalizer": "lowercase_normalizer",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "keyword"
},
"description": {
"type": "text"
},
"deleted": {
"type": "text"
},
"href": {
"type": "text"
}
}
}

View File

@ -1,138 +1,16 @@
{
"template": {
"settings": {
"analysis": {
"normalizer": {
"lowercase_normalizer": {
"type": "custom",
"char_filter": [],
"filter": [
"lowercase"
]
}
},
"analyzer": {
"om_analyzer": {
"tokenizer": "letter",
"filter": [
"lowercase",
"om_stemmer"
]
},
"om_ngram": {
"tokenizer": "ngram",
"min_gram": 3,
"max_gram": 10,
"filter": [
"lowercase"
]
}
},
"filter": {
"om_stemmer": {
"type": "stemmer",
"name": "english"
}
}
}
},
"settings": {},
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
},
"owners": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"id": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
},
"name": {
"type": "keyword",
"normalizer": "lowercase_normalizer",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"displayName": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"normalizer": "lowercase_normalizer",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "text"
},
"href": {
"type": "text"
}
}
},
"domain": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"type": "keyword"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"displayName": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"normalizer": "lowercase_normalizer",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "keyword"
},
"description": {
"type": "text"
},
"deleted": {
"type": "text"
},
"href": {
"type": "text"
}
}
}

View File

@ -204,6 +204,9 @@
}
}
},
"startDate": {
"type": "text"
},
"tasks": {
"properties": {
"name": {
@ -234,6 +237,12 @@
},
"taskType": {
"type": "text"
},
"startDate": {
"type": "text"
},
"endDate": {
"type": "text"
}
}
},

View File

@ -320,7 +320,7 @@ public class AppsResourceTest extends EntityResourceTest<App, CreateApp> {
Request request = new Request("GET", "di-data-assets-*/_search");
String payload =
String.format(
"{\"query\":{\"bool\":{\"must\":{\"term\":{\"fullyQualifiedName.keyword\":\"%s\"}}}}}",
"{\"query\":{\"bool\":{\"must\":{\"term\":{\"fullyQualifiedName\":\"%s\"}}}}}",
table.getFullyQualifiedName());
request.setJsonEntity(payload);
response = searchClient.performRequest(request);

View File

@ -82,7 +82,11 @@ public class KpiResourceTest extends EntityResourceTest<Kpi, CreateKpiRequest> {
String dataStreamName =
String.format("%s-%s", "di-data-assets", dataAssetType).toLowerCase();
if (!searchInterface.dataAssetDataStreamExists(dataStreamName)) {
searchInterface.createDataAssetsDataStream(dataStreamName);
searchInterface.createDataAssetsDataStream(
dataStreamName,
dataAssetType,
getSearchRepository().getIndexMapping(dataAssetType),
"en");
}
}
} catch (IOException ex) {