MINOR: Implement Multi Data Streams. Fix Duplicating issue (#17365)

* Implement Multi Data Streams. Fix Duplicating issue

* Fix Test for multi index search

---------

Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>
This commit is contained in:
IceS2 2024-08-12 14:09:13 +02:00 committed by GitHub
parent c84b77859e
commit 7b191b891d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 93 additions and 56 deletions

View File

@ -9,6 +9,7 @@ import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set;
import lombok.Getter; import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.lang.exception.ExceptionUtils;
@ -42,6 +43,7 @@ import org.quartz.JobExecutionContext;
@Slf4j @Slf4j
public class DataInsightsApp extends AbstractNativeApplication { public class DataInsightsApp extends AbstractNativeApplication {
public static final String REPORT_DATA_TYPE_KEY = "ReportDataType"; public static final String REPORT_DATA_TYPE_KEY = "ReportDataType";
public static final String DATA_ASSET_INDEX_PREFIX = "di-data-assets";
@Getter private Long timestamp; @Getter private Long timestamp;
@Getter private int batchSize; @Getter private int batchSize;
@ -53,6 +55,24 @@ public class DataInsightsApp extends AbstractNativeApplication {
@Getter EventPublisherJob jobData; @Getter EventPublisherJob jobData;
private volatile boolean stopped = false; private volatile boolean stopped = false;
public final Set<String> dataAssetTypes =
Set.of(
"table",
"storedProcedure",
"databaseSchema",
"database",
"chart",
"dashboard",
"dashboardDataModel",
"pipeline",
"topic",
"container",
"searchIndex",
"mlmodel",
"dataProduct",
"glossaryTerm",
"tag");
public DataInsightsApp(CollectionDAO collectionDAO, SearchRepository searchRepository) { public DataInsightsApp(CollectionDAO collectionDAO, SearchRepository searchRepository) {
super(collectionDAO, searchRepository); super(collectionDAO, searchRepository);
} }
@ -75,12 +95,19 @@ public class DataInsightsApp extends AbstractNativeApplication {
return searchInterface; return searchInterface;
} }
public static String getDataStreamName(String dataAssetType) {
return String.format("%s-%s", DATA_ASSET_INDEX_PREFIX, dataAssetType).toLowerCase();
}
private void createDataAssetsDataStream() { private void createDataAssetsDataStream() {
DataInsightsSearchInterface searchInterface = getSearchInterface(); DataInsightsSearchInterface searchInterface = getSearchInterface();
try { try {
if (!searchInterface.dataAssetDataStreamExists("di-data-assets")) { for (String dataAssetType : dataAssetTypes) {
searchInterface.createDataAssetsDataStream(); String dataStreamName = getDataStreamName(dataAssetType);
if (!searchInterface.dataAssetDataStreamExists(dataStreamName)) {
searchInterface.createDataAssetsDataStream(dataStreamName);
}
} }
} catch (IOException ex) { } catch (IOException ex) {
LOG.error("Couldn't install DataInsightsApp: Can't initialize ElasticSearch Index.", ex); LOG.error("Couldn't install DataInsightsApp: Can't initialize ElasticSearch Index.", ex);
@ -91,8 +118,11 @@ public class DataInsightsApp extends AbstractNativeApplication {
DataInsightsSearchInterface searchInterface = getSearchInterface(); DataInsightsSearchInterface searchInterface = getSearchInterface();
try { try {
if (searchInterface.dataAssetDataStreamExists("di-data-assets")) { for (String dataAssetType : dataAssetTypes) {
searchInterface.deleteDataAssetDataStream(); String dataStreamName = getDataStreamName(dataAssetType);
if (searchInterface.dataAssetDataStreamExists(dataStreamName)) {
searchInterface.deleteDataAssetDataStream(dataStreamName);
}
} }
} catch (IOException ex) { } catch (IOException ex) {
LOG.error("Couldn't delete DataAssets DataStream", ex); LOG.error("Couldn't delete DataAssets DataStream", ex);
@ -231,7 +261,8 @@ public class DataInsightsApp extends AbstractNativeApplication {
private WorkflowStats processDataAssets() { private WorkflowStats processDataAssets() {
DataAssetsWorkflow workflow = DataAssetsWorkflow workflow =
new DataAssetsWorkflow(timestamp, batchSize, backfill, collectionDAO, searchRepository); new DataAssetsWorkflow(
timestamp, batchSize, backfill, dataAssetTypes, collectionDAO, searchRepository);
WorkflowStats workflowStats = workflow.getWorkflowStats(); WorkflowStats workflowStats = workflow.getWorkflowStats();
try { try {

View File

@ -23,9 +23,9 @@ public interface DataInsightsSearchInterface {
} }
} }
void createDataAssetsDataStream() throws IOException; void createDataAssetsDataStream(String name) throws IOException;
void deleteDataAssetDataStream() throws IOException; void deleteDataAssetDataStream(String name) throws IOException;
Boolean dataAssetDataStreamExists(String name) throws IOException; Boolean dataAssetDataStreamExists(String name) throws IOException;
} }

View File

@ -52,7 +52,7 @@ public class ElasticSearchDataInsightsClient implements DataInsightsSearchInterf
} }
@Override @Override
public void createDataAssetsDataStream() throws IOException { public void createDataAssetsDataStream(String name) throws IOException {
String resourcePath = "/dataInsights/elasticsearch"; String resourcePath = "/dataInsights/elasticsearch";
createLifecyclePolicy( createLifecyclePolicy(
"di-data-assets-lifecycle", "di-data-assets-lifecycle",
@ -65,11 +65,11 @@ public class ElasticSearchDataInsightsClient implements DataInsightsSearchInterf
readResource(String.format("%s/indexMappingsTemplate.json", resourcePath))); readResource(String.format("%s/indexMappingsTemplate.json", resourcePath)));
createIndexTemplate( createIndexTemplate(
"di-data-assets", readResource(String.format("%s/indexTemplate.json", resourcePath))); "di-data-assets", readResource(String.format("%s/indexTemplate.json", resourcePath)));
createDataStream("di-data-assets"); createDataStream(name);
} }
@Override @Override
public void deleteDataAssetDataStream() throws IOException { public void deleteDataAssetDataStream(String name) throws IOException {
performRequest("DELETE", "/_data_stream/di-data-assets"); performRequest("DELETE", String.format("/_data_stream/%s", name));
} }
} }

View File

@ -63,7 +63,7 @@ public class OpenSearchDataInsightsClient implements DataInsightsSearchInterface
} }
@Override @Override
public void createDataAssetsDataStream() throws IOException { public void createDataAssetsDataStream(String name) throws IOException {
String resourcePath = "/dataInsights/opensearch"; String resourcePath = "/dataInsights/opensearch";
createLifecyclePolicy( createLifecyclePolicy(
"di-data-assets-lifecycle", "di-data-assets-lifecycle",
@ -73,11 +73,11 @@ public class OpenSearchDataInsightsClient implements DataInsightsSearchInterface
readResource(String.format("%s/indexMappingsTemplate.json", resourcePath))); readResource(String.format("%s/indexMappingsTemplate.json", resourcePath)));
createIndexTemplate( createIndexTemplate(
"di-data-assets", readResource(String.format("%s/indexTemplate.json", resourcePath))); "di-data-assets", readResource(String.format("%s/indexTemplate.json", resourcePath)));
createDataStream("di-data-assets"); createDataStream(name);
} }
@Override @Override
public void deleteDataAssetDataStream() throws IOException { public void deleteDataAssetDataStream(String name) throws IOException {
performRequest("DELETE", "_data_stream/di-data-assets"); performRequest("DELETE", String.format("_data_stream/%s", name));
} }
} }

View File

@ -1,6 +1,7 @@
package org.openmetadata.service.apps.bundles.insights.workflows.dataAssets; package org.openmetadata.service.apps.bundles.insights.workflows.dataAssets;
import static org.openmetadata.schema.system.IndexingError.ErrorSource.READER; import static org.openmetadata.schema.system.IndexingError.ErrorSource.READER;
import static org.openmetadata.service.apps.bundles.insights.DataInsightsApp.getDataStreamName;
import static org.openmetadata.service.apps.bundles.insights.utils.TimestampUtils.END_TIMESTAMP_KEY; import static org.openmetadata.service.apps.bundles.insights.utils.TimestampUtils.END_TIMESTAMP_KEY;
import static org.openmetadata.service.apps.bundles.insights.utils.TimestampUtils.START_TIMESTAMP_KEY; import static org.openmetadata.service.apps.bundles.insights.utils.TimestampUtils.START_TIMESTAMP_KEY;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY;
@ -38,7 +39,7 @@ import org.openmetadata.service.workflows.searchIndex.PaginatedEntitiesSource;
@Slf4j @Slf4j
public class DataAssetsWorkflow { public class DataAssetsWorkflow {
public static final String DATA_ASSETS_DATA_STREAM = "di-data-assets"; public static final String DATA_STREAM_KEY = "DataStreamKey";
private final int retentionDays = 30; private final int retentionDays = 30;
private final Long startTimestamp; private final Long startTimestamp;
private final Long endTimestamp; private final Long endTimestamp;
@ -46,23 +47,7 @@ public class DataAssetsWorkflow {
private final SearchRepository searchRepository; private final SearchRepository searchRepository;
private final CollectionDAO collectionDAO; private final CollectionDAO collectionDAO;
private final List<PaginatedEntitiesSource> sources = new ArrayList<>(); private final List<PaginatedEntitiesSource> sources = new ArrayList<>();
private final Set<String> entityTypes = private final Set<String> entityTypes;
Set.of(
"table",
"storedProcedure",
"databaseSchema",
"database",
"chart",
"dashboard",
"dashboardDataModel",
"pipeline",
"topic",
"container",
"searchIndex",
"mlmodel",
"dataProduct",
"glossaryTerm",
"tag");
private DataInsightsEntityEnricherProcessor entityEnricher; private DataInsightsEntityEnricherProcessor entityEnricher;
private Processor entityProcessor; private Processor entityProcessor;
@ -73,6 +58,7 @@ public class DataAssetsWorkflow {
Long timestamp, Long timestamp,
int batchSize, int batchSize,
Optional<DataInsightsApp.Backfill> backfill, Optional<DataInsightsApp.Backfill> backfill,
Set<String> entityTypes,
CollectionDAO collectionDAO, CollectionDAO collectionDAO,
SearchRepository searchRepository) { SearchRepository searchRepository) {
if (backfill.isPresent()) { if (backfill.isPresent()) {
@ -105,6 +91,7 @@ public class DataAssetsWorkflow {
this.batchSize = batchSize; this.batchSize = batchSize;
this.searchRepository = searchRepository; this.searchRepository = searchRepository;
this.collectionDAO = collectionDAO; this.collectionDAO = collectionDAO;
this.entityTypes = entityTypes;
} }
private void initialize() { private void initialize() {
@ -144,9 +131,9 @@ public class DataAssetsWorkflow {
contextData.put(START_TIMESTAMP_KEY, startTimestamp); contextData.put(START_TIMESTAMP_KEY, startTimestamp);
contextData.put(END_TIMESTAMP_KEY, endTimestamp); contextData.put(END_TIMESTAMP_KEY, endTimestamp);
deleteDataBeforeInserting();
for (PaginatedEntitiesSource source : sources) { for (PaginatedEntitiesSource source : sources) {
deleteDataBeforeInserting(getDataStreamName(source.getEntityType()));
contextData.put(DATA_STREAM_KEY, getDataStreamName(source.getEntityType()));
contextData.put(ENTITY_TYPE_KEY, source.getEntityType()); contextData.put(ENTITY_TYPE_KEY, source.getEntityType());
while (!source.isDone()) { while (!source.isDone()) {
@ -191,12 +178,12 @@ public class DataAssetsWorkflow {
} }
} }
private void deleteDataBeforeInserting() throws SearchIndexException { private void deleteDataBeforeInserting(String dataStreamName) throws SearchIndexException {
try { try {
searchRepository searchRepository
.getSearchClient() .getSearchClient()
.deleteByQuery( .deleteByQuery(
DATA_ASSETS_DATA_STREAM, dataStreamName,
String.format( String.format(
"{\"@timestamp\": {\"gte\": %s, \"lte\": %s}}", startTimestamp, endTimestamp)); "{\"@timestamp\": {\"gte\": %s, \"lte\": %s}}", startTimestamp, endTimestamp));
} catch (Exception rx) { } catch (Exception rx) {

View File

@ -1,6 +1,6 @@
package org.openmetadata.service.apps.bundles.insights.workflows.dataAssets.processors; package org.openmetadata.service.apps.bundles.insights.workflows.dataAssets.processors;
import static org.openmetadata.service.apps.bundles.insights.workflows.dataAssets.DataAssetsWorkflow.DATA_ASSETS_DATA_STREAM; import static org.openmetadata.service.apps.bundles.insights.workflows.dataAssets.DataAssetsWorkflow.DATA_STREAM_KEY;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats;
import es.org.elasticsearch.action.bulk.BulkRequest; import es.org.elasticsearch.action.bulk.BulkRequest;
@ -29,7 +29,7 @@ public class DataInsightsElasticSearchProcessor
@Override @Override
public BulkRequest process(List<Map<String, Object>> input, Map<String, Object> contextData) public BulkRequest process(List<Map<String, Object>> input, Map<String, Object> contextData)
throws SearchIndexException { throws SearchIndexException {
String index = DATA_ASSETS_DATA_STREAM; String index = (String) contextData.get(DATA_STREAM_KEY);
LOG.debug( LOG.debug(
"[EsEntitiesProcessor] Processing a Batch of Size: {}, Index: {} ", input.size(), index); "[EsEntitiesProcessor] Processing a Batch of Size: {}, Index: {} ", input.size(), index);
BulkRequest requests; BulkRequest requests;

View File

@ -92,7 +92,7 @@ public class DataInsightsEntityEnricherProcessor
EntityInterface versionEntity = EntityInterface versionEntity =
JsonUtils.readOrConvertValue( JsonUtils.readOrConvertValue(
version, ENTITY_TYPE_TO_CLASS_MAP.get(entityType.toLowerCase())); version, ENTITY_TYPE_TO_CLASS_MAP.get(entityType.toLowerCase()));
Long versionTimestamp = TimestampUtils.getEndOfDayTimestamp(versionEntity.getUpdatedAt()); Long versionTimestamp = TimestampUtils.getStartOfDayTimestamp(versionEntity.getUpdatedAt());
if (versionTimestamp > pointerTimestamp) { if (versionTimestamp > pointerTimestamp) {
continue; continue;
} else if (versionTimestamp < startTimestamp) { } else if (versionTimestamp < startTimestamp) {
@ -108,15 +108,12 @@ public class DataInsightsEntityEnricherProcessor
Map<String, Object> versionMap = new HashMap<>(); Map<String, Object> versionMap = new HashMap<>();
versionMap.put("endTimestamp", pointerTimestamp); versionMap.put("endTimestamp", pointerTimestamp);
versionMap.put("startTimestamp", versionTimestamp); versionMap.put("startTimestamp", TimestampUtils.getEndOfDayTimestamp(versionTimestamp));
versionMap.put("versionEntity", versionEntity); versionMap.put("versionEntity", versionEntity);
entityVersions.add(versionMap); entityVersions.add(versionMap);
if (versionTimestamp.equals(pointerTimestamp)) { pointerTimestamp =
pointerTimestamp = TimestampUtils.subtractDays(pointerTimestamp, 1); TimestampUtils.getEndOfDayTimestamp(TimestampUtils.subtractDays(versionTimestamp, 1));
} else {
pointerTimestamp = versionTimestamp;
}
} }
} }
return entityVersions; return entityVersions;

View File

@ -1,6 +1,6 @@
package org.openmetadata.service.apps.bundles.insights.workflows.dataAssets.processors; package org.openmetadata.service.apps.bundles.insights.workflows.dataAssets.processors;
import static org.openmetadata.service.apps.bundles.insights.workflows.dataAssets.DataAssetsWorkflow.DATA_ASSETS_DATA_STREAM; import static org.openmetadata.service.apps.bundles.insights.workflows.dataAssets.DataAssetsWorkflow.DATA_STREAM_KEY;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats;
import java.util.List; import java.util.List;
@ -29,7 +29,7 @@ public class DataInsightsOpenSearchProcessor
@Override @Override
public BulkRequest process(List<Map<String, Object>> input, Map<String, Object> contextData) public BulkRequest process(List<Map<String, Object>> input, Map<String, Object> contextData)
throws SearchIndexException { throws SearchIndexException {
String index = DATA_ASSETS_DATA_STREAM; String index = (String) contextData.get(DATA_STREAM_KEY);
LOG.debug( LOG.debug(
"[OsEntitiesProcessor] Processing a Batch of Size: {}, Index: {} ", input.size(), index); "[OsEntitiesProcessor] Processing a Batch of Size: {}, Index: {} ", input.size(), index);
BulkRequest requests; BulkRequest requests;

View File

@ -18,7 +18,7 @@ public class DataInsightSystemChartRepository extends EntityRepository<DataInsig
private static final SearchClient searchClient = Entity.getSearchRepository().getSearchClient(); private static final SearchClient searchClient = Entity.getSearchRepository().getSearchClient();
public static final String TIMESTAMP_FIELD = "@timestamp"; public static final String TIMESTAMP_FIELD = "@timestamp";
public static final String DI_SEARCH_INDEX = "di-data-assets"; public static final String DI_SEARCH_INDEX = "di-data-assets-*";
public static final String FORMULA_FUNC_REGEX = public static final String FORMULA_FUNC_REGEX =
"\\b(count|sum|min|max|avg)+\\(k='([^']*)',?\\s*(q='([^']*)')?\\)?"; "\\b(count|sum|min|max|avg)+\\(k='([^']*)',?\\s*(q='([^']*)')?\\)?";

View File

@ -1,6 +1,6 @@
{ {
"index_patterns": [ "index_patterns": [
"di-data-assets" "di-data-assets-*"
], ],
"data_stream": {}, "data_stream": {},
"composed_of": [ "composed_of": [

View File

@ -1,6 +1,6 @@
{ {
"index_patterns": [ "index_patterns": [
"di-data-assets" "di-data-assets-*"
], ],
"data_stream": {}, "data_stream": {},
"composed_of": [ "composed_of": [

View File

@ -299,7 +299,7 @@ public class AppsResourceTest extends EntityResourceTest<App, CreateApp> {
// ------------------------------------------------- // -------------------------------------------------
RestClient searchClient = getSearchClient(); RestClient searchClient = getSearchClient();
es.org.elasticsearch.client.Response response; es.org.elasticsearch.client.Response response;
Request request = new Request("GET", "di-data-assets/_search"); Request request = new Request("GET", "di-data-assets-*/_search");
String payload = String payload =
String.format( String.format(
"{\"query\":{\"bool\":{\"must\":{\"term\":{\"fullyQualifiedName.keyword\":\"%s\"}}}}}", "{\"query\":{\"bool\":{\"must\":{\"term\":{\"fullyQualifiedName.keyword\":\"%s\"}}}}}",

View File

@ -16,6 +16,7 @@ import es.org.elasticsearch.client.RestClient;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.UUID; import java.util.UUID;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.HttpResponseException; import org.apache.http.client.HttpResponseException;
@ -47,6 +48,23 @@ public class KpiResourceTest extends EntityResourceTest<Kpi, CreateKpiRequest> {
private void createDataAssetsDataStream() { private void createDataAssetsDataStream() {
DataInsightsSearchInterface searchInterface; DataInsightsSearchInterface searchInterface;
Set<String> dataAssetTypes =
Set.of(
"table",
"storedProcedure",
"databaseSchema",
"database",
"chart",
"dashboard",
"dashboardDataModel",
"pipeline",
"topic",
"container",
"searchIndex",
"mlmodel",
"dataProduct",
"glossaryTerm",
"tag");
if (getSearchRepository() if (getSearchRepository()
.getSearchType() .getSearchType()
.equals(ElasticSearchConfiguration.SearchType.ELASTICSEARCH)) { .equals(ElasticSearchConfiguration.SearchType.ELASTICSEARCH)) {
@ -59,17 +77,21 @@ public class KpiResourceTest extends EntityResourceTest<Kpi, CreateKpiRequest> {
(os.org.opensearch.client.RestClient) (os.org.opensearch.client.RestClient)
getSearchRepository().getSearchClient().getLowLevelClient()); getSearchRepository().getSearchClient().getLowLevelClient());
} }
try { try {
if (!searchInterface.dataAssetDataStreamExists("di-data-assets")) { for (String dataAssetType : dataAssetTypes) {
searchInterface.createDataAssetsDataStream(); String dataStreamName =
String.format("%s-%s", "di-data-assets", dataAssetType).toLowerCase();
if (!searchInterface.dataAssetDataStreamExists(dataStreamName)) {
searchInterface.createDataAssetsDataStream(dataStreamName);
}
} }
} catch (IOException ex) { } catch (IOException ex) {
LOG.error("Couldn't install DataInsightsApp: Can't initialize ElasticSearch Index."); LOG.error("Couldn't install DataInsightsApp: Can't initialize ElasticSearch Index.", ex);
} }
} }
public void setupKpi() throws IOException { public void setupKpi() throws IOException {
createDataAssetsDataStream(); createDataAssetsDataStream();
KPI_TARGET = new KpiTarget().withName("Percentage").withValue("80.0"); KPI_TARGET = new KpiTarget().withName("Percentage").withValue("80.0");
} }