diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/DataAssetsWorkflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/DataAssetsWorkflow.java
index 3373330c517..b913ca3a4c3 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/DataAssetsWorkflow.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/DataAssetsWorkflow.java
@@ -249,10 +249,11 @@ public class DataAssetsWorkflow {
   private void deleteBasedOnDataRetentionPolicy(String dataStreamName) throws SearchIndexException {
     long retentionLimitTimestamp =
         TimestampUtils.subtractDays(System.currentTimeMillis(), dataAssetsConfig.getRetention());
-    String rangeTermQuery =
-        String.format("{ \"@timestamp\": { \"lte\": %s } }", retentionLimitTimestamp);
     try {
-      searchRepository.getSearchClient().deleteByQuery(dataStreamName, rangeTermQuery);
+      searchRepository
+          .getSearchClient()
+          .deleteByRangeQuery(
+              dataStreamName, "@timestamp", null, null, null, retentionLimitTimestamp);
     } catch (Exception rx) {
       throw new SearchIndexException(new IndexingError().withMessage(rx.getMessage()));
     }
@@ -264,7 +265,10 @@ public class DataAssetsWorkflow {
             "{ \"@timestamp\": { \"gte\": %s, \"lte\": %s } }", startTimestamp, endTimestamp);
     try {
       if (dataAssetsConfig.getServiceFilter() == null) {
-        searchRepository.getSearchClient().deleteByQuery(dataStreamName, rangeTermQuery);
+        searchRepository
+            .getSearchClient()
+            .deleteByRangeQuery(
+                dataStreamName, "@timestamp", null, startTimestamp, null, endTimestamp);
       } else {
         searchRepository
             .getSearchClient()
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataQuality/DataQualityWorkflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataQuality/DataQualityWorkflow.java
index 89438c8029f..adbd03ab172 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataQuality/DataQualityWorkflow.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataQuality/DataQualityWorkflow.java
@@ -135,10 +135,7 @@ public class DataQualityWorkflow {
     try {
       searchRepository
           .getSearchClient()
-          .deleteByQuery(
-              indexName,
-              String.format(
-                  "{\"@timestamp\": {\"gte\": %s, \"lte\": %s}}", startTimestamp, endTimestamp));
+          .deleteByRangeQuery(indexName, "@timestamp", null, startTimestamp, null, endTimestamp);
     } catch (Exception rx) {
       throw new SearchIndexException(new IndexingError().withMessage(rx.getMessage()));
     }
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/EntityManagementClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/EntityManagementClient.java
index f8afc640c7b..73585c299cc 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/search/EntityManagementClient.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/EntityManagementClient.java
@@ -6,6 +6,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.commons.lang3.tuple.Pair;
+import org.openmetadata.schema.api.lineage.EsLineageData;
 
 /**
  * Interface for entity management operations in search.
@@ -169,4 +170,42 @@ public interface EntityManagementClient {
       String pipelineName,
       String entityType,
       List entityIds);
+
+  /**
+   * Updates entities by FQN prefix by replacing old parent FQN with new parent FQN.
+   *
+   * @param indexName the name of the index
+   * @param oldParentFQN the old parent FQN to be replaced
+   * @param newParentFQN the new parent FQN to replace with
+   * @param prefixFieldCondition the field name to match for prefix query
+   */
+  void updateByFqnPrefix(
+      String indexName, String oldParentFQN, String newParentFQN, String prefixFieldCondition);
+
+  /**
+   * Updates lineage information for entities.
+   *
+   * @param indexName the name of the index
+   * @param fieldAndValue field name (key) and value used to match the entities to update
+   * @param lineageData the lineage data to update
+   */
+  void updateLineage(
+      String indexName, Pair<String, String> fieldAndValue, EsLineageData lineageData);
+
+  /**
+   * Deletes entities matching a range query on a specific field.
+   *
+   * <p>Only non-null bounds are applied; callers should supply at least one bound.
+   *
+   * @param index the index name
+   * @param fieldName the field name to apply the range query on
+   * @param gt greater than value (exclusive), can be null
+   * @param gte greater than or equal to value (inclusive), can be null
+   * @param lt less than value (exclusive), can be null
+   * @param lte less than or equal to value (inclusive), can be null
+   * @throws IOException if there's an error during the delete operation
+   */
+  void deleteByRangeQuery(
+      String index, String fieldName, Object gt, Object gte, Object lt, Object lte)
+      throws IOException;
 }
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java
index 31199ceb581..19750974be9 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java
@@ -11,12 +11,10 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import org.apache.commons.lang3.tuple.Pair;
 import org.openmetadata.schema.api.entityRelationship.SearchEntityRelationshipRequest;
 import
org.openmetadata.schema.api.entityRelationship.SearchEntityRelationshipResult;
 import org.openmetadata.schema.api.entityRelationship.SearchSchemaEntityRelationshipResult;
 import org.openmetadata.schema.api.lineage.EntityCountLineageRequest;
-import org.openmetadata.schema.api.lineage.EsLineageData;
 import org.openmetadata.schema.api.lineage.LineagePaginationInfo;
 import org.openmetadata.schema.api.lineage.SearchLineageRequest;
 import org.openmetadata.schema.api.lineage.SearchLineageResult;
@@ -118,6 +116,32 @@ public interface SearchClient extends IndexManagementClient, EntityManagement
           }
           """;
 
+  /**
+   * Painless script run by {@code updateByFqnPrefix}: replaces {@code params.oldParentFQN} with
+   * {@code params.newParentFQN} in the document FQN, its parent FQN and each tag FQN, then
+   * recomputes {@code fqnDepth}. Note: {@code replace} rewrites every occurrence in the string.
+   */
+  String UPDATE_FQN_PREFIX_SCRIPT =
+      """
+          String updatedFQN = ctx._source.fullyQualifiedName.replace(params.oldParentFQN, params.newParentFQN);
+          ctx._source.fullyQualifiedName = updatedFQN;
+          ctx._source.fqnDepth = updatedFQN.splitOnToken('.').length;
+          if (ctx._source.containsKey('parent')) {
+            if (ctx._source.parent.containsKey('fullyQualifiedName')) {
+              String parentFQN = ctx._source.parent.fullyQualifiedName;
+              ctx._source.parent.fullyQualifiedName = parentFQN.replace(params.oldParentFQN, params.newParentFQN);
+            }
+          }
+          if (ctx._source.containsKey('tags')) {
+            for (int i = 0; i < ctx._source.tags.size(); i++) {
+              if (ctx._source.tags[i].containsKey('tagFQN')) {
+                String tagFQN = ctx._source.tags[i].tagFQN;
+                ctx._source.tags[i].tagFQN = tagFQN.replace(params.oldParentFQN, params.newParentFQN);
+              }
+            }
+          }
+          """;
+
   String REMOVE_LINEAGE_SCRIPT =
       "ctx._source.upstreamLineage.removeIf(lineage -> lineage.docUniqueId == params.docUniqueId)";
 
@@ -495,12 +519,6 @@ public interface SearchClient extends IndexManagementClient, EntityManagement
   /* This function takes in Entity Reference, Search for occurances of those entity across ES, and perform an update for that with reindexing the data from the database to ES */
   void reindexAcrossIndices(String matchingKey, EntityReference sourceRef);
 
-  void updateByFqnPrefix(
-      String indexName, String oldParentFQN, String newParentFQN, String prefixFieldCondition);
-
-  void updateLineage(
-      String indexName, Pair<String, String> fieldAndValue, EsLineageData lineageData);
-
   Response listDataInsightChartResult(
       Long startTs,
       Long endTs,
@@ -513,9 +531,6 @@ public interface SearchClient extends IndexManagementClient, EntityManagement
       String dataReportIndex)
       throws IOException;
 
-  // TODO: Think if it makes sense to have this or maybe a specific deleteByRange
-  void deleteByQuery(String index, String query);
-
   void deleteByRangeAndTerm(String index, String rangeQueryStr, String termKey, String termValue);
 
   default BulkResponse bulk(BulkRequest data, RequestOptions options) throws IOException {
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java
index 88aa8eabce5..46fabe55128 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java
@@ -44,8 +44,6 @@ import es.org.elasticsearch.common.ParsingException;
 import es.org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import es.org.elasticsearch.core.TimeValue;
 import es.org.elasticsearch.index.query.BoolQueryBuilder;
-import es.org.elasticsearch.index.query.MatchQueryBuilder;
-import es.org.elasticsearch.index.query.Operator;
 import es.org.elasticsearch.index.query.PrefixQueryBuilder;
 import es.org.elasticsearch.index.query.QueryBuilder;
 import es.org.elasticsearch.index.query.QueryBuilders;
@@ -1687,47 +1685,7 @@ public class ElasticSearchClient implements SearchClient {
   @Override
   public void updateByFqnPrefix(
       String indexName, String oldParentFQN, String newParentFQN, String prefixFieldCondition) {
-    // Match all children documents whose fullyQualifiedName starts with the old parent's FQN
-    PrefixQueryBuilder prefixQuery = new PrefixQueryBuilder(prefixFieldCondition, oldParentFQN);
-
-    UpdateByQueryRequest updateByQueryRequest =
-        new UpdateByQueryRequest(Entity.getSearchRepository().getIndexOrAliasName(indexName));
-    updateByQueryRequest.setQuery(prefixQuery);
-
-    Map<String, Object> params = new HashMap<>();
-    params.put("oldParentFQN", oldParentFQN);
-    params.put("newParentFQN", newParentFQN);
-
-    String painlessScript =
-        "String updatedFQN = ctx._source.fullyQualifiedName.replace(params.oldParentFQN, params.newParentFQN); "
-            + "ctx._source.fullyQualifiedName = updatedFQN; "
-            + "ctx._source.fqnDepth = updatedFQN.splitOnToken('.').length; "
-            + "if (ctx._source.containsKey('parent')) { "
-            + " if (ctx._source.parent.containsKey('fullyQualifiedName')) { "
-            + " String parentFQN = ctx._source.parent.fullyQualifiedName; "
-            + " ctx._source.parent.fullyQualifiedName = parentFQN.replace(params.oldParentFQN, params.newParentFQN); "
-            + " } "
-            + "} "
-            + "if (ctx._source.containsKey('tags')) { "
-            + " for (int i = 0; i < ctx._source.tags.size(); i++) { "
-            + " if (ctx._source.tags[i].containsKey('tagFQN')) { "
-            + " String tagFQN = ctx._source.tags[i].tagFQN; "
-            + " ctx._source.tags[i].tagFQN = tagFQN.replace(params.oldParentFQN, params.newParentFQN); "
-            + " } "
-            + " } "
-            + "}";
-
-    Script inlineScript =
-        new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, painlessScript, params);
-
-    updateByQueryRequest.setScript(inlineScript);
-
-    try {
-      updateElasticSearchByQuery(updateByQueryRequest);
-      LOG.info("Successfully propagated FQN updates for parent FQN: {}", oldParentFQN);
-    } catch (Exception e) {
-      LOG.error("Error while propagating FQN updates: {}", e.getMessage(), e);
-    }
+    entityManager.updateByFqnPrefix(indexName, oldParentFQN, newParentFQN, prefixFieldCondition);
   }
 
   @Override
@@ -1761,18 +1719,7 @@ public class ElasticSearchClient implements SearchClient {
   @Override
   public void updateLineage(
       String indexName, Pair<String, String> fieldAndValue, EsLineageData lineageData) {
-    if (isClientAvailable) {
-      UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(indexName);
-      updateByQueryRequest.setQuery(
-          new MatchQueryBuilder(fieldAndValue.getKey(), fieldAndValue.getValue())
-              .operator(Operator.AND));
-      Map<String, Object> params =
-          Collections.singletonMap("lineageData", JsonUtils.getMap(lineageData));
-      Script script =
-          new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, ADD_UPDATE_LINEAGE, params);
-      updateByQueryRequest.setScript(script);
-      updateElasticSearchByQuery(updateByQueryRequest);
-    }
+    entityManager.updateLineage(indexName, fieldAndValue, lineageData);
   }
 
   @SneakyThrows
@@ -1788,15 +1735,11 @@ public class ElasticSearchClient implements SearchClient {
   @Override
   public void close() {}
 
-  @SneakyThrows
-  public void deleteByQuery(String index, String query) {
-    DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(index);
-    // Hack: Due to an issue on how the RangeQueryBuilder.fromXContent works, we're removing the
-    // first token from the Parser
-    XContentParser parser = createXContentParser(query);
-    parser.nextToken();
-    deleteRequest.setQuery(RangeQueryBuilder.fromXContent(parser));
-    deleteEntityFromElasticSearchByQuery(deleteRequest);
+  @Override
+  public void deleteByRangeQuery(
+      String index, String fieldName, Object gt, Object gte, Object lt, Object lte)
+      throws IOException {
+    entityManager.deleteByRangeQuery(index, fieldName, gt, gte, lt, lte);
   }
 
   @SneakyThrows
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchEntityManager.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchEntityManager.java
index ba76ec7945b..ebca1e3c962 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchEntityManager.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchEntityManager.java
@@ -2,6 +2,8 @@ package
org.openmetadata.service.search.elasticsearch;
 
 import static org.openmetadata.service.exception.CatalogGenericExceptionMapper.getResponse;
 import static org.openmetadata.service.search.SearchClient.ADD_UPDATE_ENTITY_RELATIONSHIP;
+import static org.openmetadata.service.search.SearchClient.ADD_UPDATE_LINEAGE;
+import static org.openmetadata.service.search.SearchClient.UPDATE_FQN_PREFIX_SCRIPT;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -10,12 +12,14 @@ import es.co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
 import es.co.elastic.clients.elasticsearch.ElasticsearchClient;
 import es.co.elastic.clients.elasticsearch._types.BulkIndexByScrollFailure;
 import es.co.elastic.clients.elasticsearch._types.ElasticsearchException;
+import es.co.elastic.clients.elasticsearch._types.ErrorCause;
 import es.co.elastic.clients.elasticsearch._types.FieldValue;
 import es.co.elastic.clients.elasticsearch._types.Refresh;
 import es.co.elastic.clients.elasticsearch._types.ScriptLanguage;
 import es.co.elastic.clients.elasticsearch._types.query_dsl.BoolQuery;
 import es.co.elastic.clients.elasticsearch._types.query_dsl.Operator;
 import es.co.elastic.clients.elasticsearch._types.query_dsl.Query;
+import es.co.elastic.clients.elasticsearch._types.query_dsl.RangeQuery;
 import es.co.elastic.clients.elasticsearch.core.BulkResponse;
 import es.co.elastic.clients.elasticsearch.core.DeleteByQueryResponse;
 import es.co.elastic.clients.elasticsearch.core.DeleteResponse;
@@ -34,6 +38,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
+import org.openmetadata.schema.api.lineage.EsLineageData;
 import org.openmetadata.schema.utils.JsonUtils;
 import org.openmetadata.sdk.exception.SearchException;
 import org.openmetadata.sdk.exception.SearchIndexNotFoundException;
@@ -574,4 +579,183 @@ public class ElasticSearchEntityManager implements EntityManagementClient {
     }
     return JsonData.of(docMap);
   }
+
+  /**
+   * Propagates a parent FQN rename to every document whose {@code prefixFieldCondition} value
+   * starts with {@code oldParentFQN}, by running {@code UPDATE_FQN_PREFIX_SCRIPT} on each match.
+   *
+   * <p>Errors and per-document failures are logged and not rethrown.
+   *
+   * @param indexName index key, resolved through {@code getIndexOrAliasName}
+   * @param oldParentFQN prefix to match and replace
+   * @param newParentFQN replacement for {@code oldParentFQN}
+   * @param prefixFieldCondition field the prefix query is evaluated against
+   */
+  @Override
+  public void updateByFqnPrefix(
+      String indexName, String oldParentFQN, String newParentFQN, String prefixFieldCondition) {
+    if (!isClientAvailable) {
+      LOG.error("ElasticSearch client is not available. Cannot update by FQN prefix.");
+      return;
+    }
+
+    Query prefixQuery =
+        Query.of(q -> q.prefix(p -> p.field(prefixFieldCondition).value(oldParentFQN)));
+
+    Map<String, JsonData> params =
+        Map.of(
+            "oldParentFQN", JsonData.of(oldParentFQN),
+            "newParentFQN", JsonData.of(newParentFQN));
+
+    try {
+      UpdateByQueryResponse updateResponse =
+          client.updateByQuery(
+              req ->
+                  req.index(Entity.getSearchRepository().getIndexOrAliasName(indexName))
+                      .query(prefixQuery)
+                      .script(
+                          s ->
+                              s.inline(
+                                  i ->
+                                      i.lang(ScriptLanguage.Painless)
+                                          .source(UPDATE_FQN_PREFIX_SCRIPT)
+                                          .params(params)))
+                      .refresh(true));
+
+      if (updateResponse.failures().isEmpty()) {
+        LOG.info("Successfully propagated FQN updates for parent FQN: {}", oldParentFQN);
+      } else {
+        String errorMessage =
+            updateResponse.failures().stream()
+                .map(BulkIndexByScrollFailure::cause)
+                .map(ErrorCause::reason)
+                .collect(Collectors.joining(", "));
+        LOG.error("Failed to update FQN prefix: {}", errorMessage);
+      }
+    } catch (Exception e) {
+      LOG.error("Error while propagating FQN updates: {}", e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Applies {@code ADD_UPDATE_LINEAGE} to every document matching {@code fieldAndValue}.
+   *
+   * <p>Errors and per-document failures are logged and not rethrown.
+   *
+   * @param indexName index to update
+   * @param fieldAndValue field name (key) and value used in the match query
+   * @param lineageData lineage payload passed to the script as {@code params.lineageData}
+   */
+  @Override
+  public void updateLineage(
+      String indexName, Pair<String, String> fieldAndValue, EsLineageData lineageData) {
+    if (!isClientAvailable) {
+      LOG.error("ElasticSearch client is not available. Cannot update lineage.");
+      return;
+    }
+
+    try {
+      Map<String, JsonData> params =
+          Collections.singletonMap("lineageData", JsonData.of(JsonUtils.getMap(lineageData)));
+
+      UpdateByQueryResponse response =
+          client.updateByQuery(
+              u ->
+                  u.index(indexName)
+                      .query(
+                          q ->
+                              q.match(
+                                  m ->
+                                      m.field(fieldAndValue.getKey())
+                                          .query(fieldAndValue.getValue())
+                                          .operator(Operator.And)))
+                      .script(
+                          s ->
+                              s.inline(
+                                  inline ->
+                                      inline
+                                          .lang(ScriptLanguage.Painless)
+                                          .source(ADD_UPDATE_LINEAGE)
+                                          .params(params)))
+                      .refresh(true));
+
+      if (response.failures().isEmpty()) {
+        LOG.info("Successfully updated lineage in ElasticSearch for index: {}", indexName);
+      } else {
+        String failureDetails =
+            response.failures().stream()
+                .map(BulkIndexByScrollFailure::toString)
+                .collect(Collectors.joining("; "));
+        LOG.error("Update lineage encountered failures: {}", failureDetails);
+      }
+    } catch (IOException | ElasticsearchException e) {
+      LOG.error("Failed to update lineage in ElasticSearch for index: {}", indexName, e);
+    }
+  }
+
+  /**
+   * Deletes every document in {@code index} whose {@code fieldName} falls in the given range.
+   *
+   * @param index index or data stream name to delete from
+   * @param fieldName field the range is applied to
+   * @param gt exclusive lower bound, or {@code null}
+   * @param gte inclusive lower bound, or {@code null}
+   * @param lt exclusive upper bound, or {@code null}
+   * @param lte inclusive upper bound, or {@code null}
+   * @throws IOException if the delete-by-query request fails
+   * @throws IllegalArgumentException if all four bounds are {@code null}
+   */
+  @Override
+  public void deleteByRangeQuery(
+      String index, String fieldName, Object gt, Object gte, Object lt, Object lte)
+      throws IOException {
+    if (!isClientAvailable) {
+      LOG.error("Elasticsearch client is not available. Cannot delete by range query.");
+      return;
+    }
+    if (gt == null && gte == null && lt == null && lte == null) {
+      // Without any bound the range no longer restricts which documents get deleted.
+      throw new IllegalArgumentException(
+          "deleteByRangeQuery requires at least one bound for field " + fieldName);
+    }
+
+    Query query =
+        Query.of(
+            q ->
+                q.range(
+                    r -> {
+                      // Reuse the supplied builder; only non-null bounds are set.
+                      RangeQuery.Builder builder = r.field(fieldName);
+                      if (gt != null) {
+                        builder.gt(JsonData.of(gt));
+                      }
+                      if (gte != null) {
+                        builder.gte(JsonData.of(gte));
+                      }
+                      if (lt != null) {
+                        builder.lt(JsonData.of(lt));
+                      }
+                      if (lte != null) {
+                        builder.lte(JsonData.of(lte));
+                      }
+                      return builder;
+                    }));
+
+    // Execute delete-by-query with refresh
+    DeleteByQueryResponse response =
+        client.deleteByQuery(d -> d.index(index).query(query).refresh(true));
+
+    LOG.info(
+        "DeleteByQuery response from ES - Deleted: {}, Failures: {}",
+        response.deleted(),
+        response.failures().size());
+
+    if (!response.failures().isEmpty()) {
+      String failureDetails =
+          response.failures().stream()
+              .map(BulkIndexByScrollFailure::toString)
+              .collect(Collectors.joining("; "));
+      LOG.error("DeleteByQuery encountered failures: {}", failureDetails);
+    }
+  }
 }
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java
index f8a6f26b514..50f7daba5ae 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java
@@ -168,7 +168,6 @@ import os.org.opensearch.common.xcontent.XContentParser;
 import os.org.opensearch.common.xcontent.XContentType;
 import os.org.opensearch.index.IndexNotFoundException;
 import os.org.opensearch.index.query.BoolQueryBuilder;
-import os.org.opensearch.index.query.MatchQueryBuilder;
 import os.org.opensearch.index.query.MultiMatchQueryBuilder;
 import os.org.opensearch.index.query.Operator;
 import os.org.opensearch.index.query.PrefixQueryBuilder;
@@ -1846,62 +1845,13 @@ public class OpenSearchClient implements SearchClient {
   @Override
   public void
updateByFqnPrefix(
       String indexName, String oldParentFQN, String newParentFQN, String prefixFieldCondition) {
-    // Match all children documents whose fullyQualifiedName starts with the old parent's FQN
-    PrefixQueryBuilder prefixQuery = new PrefixQueryBuilder(prefixFieldCondition, oldParentFQN);
-    UpdateByQueryRequest updateByQueryRequest =
-        new UpdateByQueryRequest(Entity.getSearchRepository().getIndexOrAliasName(indexName));
-    updateByQueryRequest.setQuery(prefixQuery);
-
-    Map<String, Object> params = new HashMap<>();
-    params.put("oldParentFQN", oldParentFQN);
-    params.put("newParentFQN", newParentFQN);
-
-    String painlessScript =
-        "String updatedFQN = ctx._source.fullyQualifiedName.replace(params.oldParentFQN, params.newParentFQN); "
-            + "ctx._source.fullyQualifiedName = updatedFQN; "
-            + "ctx._source.fqnDepth = updatedFQN.splitOnToken('.').length; "
-            + "if (ctx._source.containsKey('parent')) { "
-            + " if (ctx._source.parent.containsKey('fullyQualifiedName')) { "
-            + " String parentFQN = ctx._source.parent.fullyQualifiedName; "
-            + " ctx._source.parent.fullyQualifiedName = parentFQN.replace(params.oldParentFQN, params.newParentFQN); "
-            + " } "
-            + "} "
-            + "if (ctx._source.containsKey('tags')) { "
-            + " for (int i = 0; i < ctx._source.tags.size(); i++) { "
-            + " if (ctx._source.tags[i].containsKey('tagFQN')) { "
-            + " String tagFQN = ctx._source.tags[i].tagFQN; "
-            + " ctx._source.tags[i].tagFQN = tagFQN.replace(params.oldParentFQN, params.newParentFQN); "
-            + " } "
-            + " } "
-            + "}";
-    Script inlineScript =
-        new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, painlessScript, params);
-
-    updateByQueryRequest.setScript(inlineScript);
-
-    try {
-      updateOpenSearchByQuery(updateByQueryRequest);
-      LOG.info("Successfully propagated FQN updates for parent FQN: {}", oldParentFQN);
-    } catch (Exception e) {
-      LOG.error("Error while propagating FQN updates: {}", e.getMessage(), e);
-    }
+    entityManager.updateByFqnPrefix(indexName, oldParentFQN, newParentFQN, prefixFieldCondition);
   }
 
   @Override
   public void updateLineage(
       String indexName, Pair<String, String> fieldAndValue, EsLineageData lineageData) {
-    if (isClientAvailable) {
-      UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(indexName);
-      updateByQueryRequest.setQuery(
-          new MatchQueryBuilder(fieldAndValue.getKey(), fieldAndValue.getValue())
-              .operator(Operator.AND));
-      Map<String, Object> params =
-          Collections.singletonMap("lineageData", JsonUtils.getMap(lineageData));
-      Script script =
-          new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, ADD_UPDATE_LINEAGE, params);
-      updateByQueryRequest.setScript(script);
-      updateOpenSearchByQuery(updateByQueryRequest);
-    }
+    entityManager.updateLineage(indexName, fieldAndValue, lineageData);
   }
 
   @Override
@@ -1932,15 +1882,11 @@ public class OpenSearchClient implements SearchClient {
     }
   }
 
-  @SneakyThrows
-  public void deleteByQuery(String index, String query) {
-    DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(index);
-    // Hack: Due to an issue on how the RangeQueryBuilder.fromXContent works, we're removing the
-    // first token from the Parser
-    XContentParser parser = createXContentParser(query);
-    parser.nextToken();
-    deleteRequest.setQuery(RangeQueryBuilder.fromXContent(parser));
-    deleteEntityFromOpenSearchByQuery(deleteRequest);
+  @Override
+  public void deleteByRangeQuery(
+      String index, String fieldName, Object gt, Object gte, Object lt, Object lte)
+      throws IOException {
+    entityManager.deleteByRangeQuery(index, fieldName, gt, gte, lt, lte);
   }
 
   @SneakyThrows
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchEntityManager.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchEntityManager.java
index 625c43c0d94..169f2f04e4e 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchEntityManager.java
+++
b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchEntityManager.java
@@ -2,6 +2,8 @@ package org.openmetadata.service.search.opensearch;
 
 import static org.openmetadata.service.exception.CatalogGenericExceptionMapper.getResponse;
 import static org.openmetadata.service.search.SearchClient.ADD_UPDATE_ENTITY_RELATIONSHIP;
+import static org.openmetadata.service.search.SearchClient.ADD_UPDATE_LINEAGE;
+import static org.openmetadata.service.search.SearchClient.UPDATE_FQN_PREFIX_SCRIPT;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -18,6 +20,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
+import org.openmetadata.schema.api.lineage.EsLineageData;
 import org.openmetadata.schema.utils.JsonUtils;
 import org.openmetadata.sdk.exception.SearchException;
 import org.openmetadata.sdk.exception.SearchIndexNotFoundException;
@@ -27,12 +30,14 @@ import os.org.opensearch.client.json.JsonData;
 import os.org.opensearch.client.opensearch.OpenSearchAsyncClient;
 import os.org.opensearch.client.opensearch.OpenSearchClient;
 import os.org.opensearch.client.opensearch._types.BulkIndexByScrollFailure;
+import os.org.opensearch.client.opensearch._types.ErrorCause;
 import os.org.opensearch.client.opensearch._types.FieldValue;
 import os.org.opensearch.client.opensearch._types.OpenSearchException;
 import os.org.opensearch.client.opensearch._types.Refresh;
 import os.org.opensearch.client.opensearch._types.query_dsl.BoolQuery;
 import os.org.opensearch.client.opensearch._types.query_dsl.Operator;
 import os.org.opensearch.client.opensearch._types.query_dsl.Query;
+import os.org.opensearch.client.opensearch._types.query_dsl.RangeQuery;
 import os.org.opensearch.client.opensearch.core.BulkRequest;
 import os.org.opensearch.client.opensearch.core.BulkResponse;
 import os.org.opensearch.client.opensearch.core.DeleteByQueryResponse;
@@ -577,4 +582,183 @@ public class OpenSearchEntityManager implements EntityManagementClient {
     }
     return JsonData.of(docMap);
   }
+
+  /**
+   * Propagates a parent FQN rename to every document whose {@code prefixFieldCondition} value
+   * starts with {@code oldParentFQN}, by running {@code UPDATE_FQN_PREFIX_SCRIPT} on each match.
+   *
+   * <p>Errors and per-document failures are logged and not rethrown.
+   *
+   * @param indexName index key, resolved through {@code getIndexOrAliasName}
+   * @param oldParentFQN prefix to match and replace
+   * @param newParentFQN replacement for {@code oldParentFQN}
+   * @param prefixFieldCondition field the prefix query is evaluated against
+   */
+  @Override
+  public void updateByFqnPrefix(
+      String indexName, String oldParentFQN, String newParentFQN, String prefixFieldCondition) {
+    if (!isClientAvailable) {
+      LOG.error("OpenSearch client is not available. Cannot update by FQN prefix.");
+      return;
+    }
+
+    Query prefixQuery =
+        Query.of(q -> q.prefix(p -> p.field(prefixFieldCondition).value(oldParentFQN)));
+
+    Map<String, JsonData> params =
+        Map.of(
+            "oldParentFQN", JsonData.of(oldParentFQN),
+            "newParentFQN", JsonData.of(newParentFQN));
+
+    try {
+      UpdateByQueryResponse updateResponse =
+          client.updateByQuery(
+              req ->
+                  req.index(Entity.getSearchRepository().getIndexOrAliasName(indexName))
+                      .query(prefixQuery)
+                      .script(
+                          s ->
+                              s.inline(
+                                  i ->
+                                      i.lang(ScriptLanguage.Painless.jsonValue())
+                                          .source(UPDATE_FQN_PREFIX_SCRIPT)
+                                          .params(params)))
+                      .refresh(true));
+
+      if (updateResponse.failures().isEmpty()) {
+        LOG.info("Successfully propagated FQN updates for parent FQN: {}", oldParentFQN);
+      } else {
+        String errorMessage =
+            updateResponse.failures().stream()
+                .map(BulkIndexByScrollFailure::cause)
+                .map(ErrorCause::reason)
+                .collect(Collectors.joining(", "));
+        LOG.error("Failed to update FQN prefix: {}", errorMessage);
+      }
+    } catch (Exception e) {
+      LOG.error("Error while propagating FQN updates: {}", e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Applies {@code ADD_UPDATE_LINEAGE} to every document matching {@code fieldAndValue}.
+   *
+   * <p>Errors and per-document failures are logged and not rethrown.
+   *
+   * @param indexName index to update
+   * @param fieldAndValue field name (key) and value used in the match query
+   * @param lineageData lineage payload passed to the script as {@code params.lineageData}
+   */
+  @Override
+  public void updateLineage(
+      String indexName, Pair<String, String> fieldAndValue, EsLineageData lineageData) {
+    if (!isClientAvailable) {
+      LOG.error("OpenSearch client is not available. Cannot update lineage.");
+      return;
+    }
+
+    try {
+      Map<String, JsonData> params =
+          Collections.singletonMap("lineageData", JsonData.of(JsonUtils.getMap(lineageData)));
+
+      UpdateByQueryResponse response =
+          client.updateByQuery(
+              u ->
+                  u.index(indexName)
+                      .query(
+                          q ->
+                              q.match(
+                                  m ->
+                                      m.field(fieldAndValue.getKey())
+                                          .query(FieldValue.of(fieldAndValue.getValue()))
+                                          .operator(Operator.And)))
+                      .script(
+                          s ->
+                              s.inline(
+                                  inline ->
+                                      inline
+                                          .lang(ScriptLanguage.Painless.jsonValue())
+                                          .source(ADD_UPDATE_LINEAGE)
+                                          .params(params)))
+                      .refresh(true));
+
+      if (response.failures().isEmpty()) {
+        LOG.info("Successfully updated lineage in OpenSearch for index: {}", indexName);
+      } else {
+        String failureDetails =
+            response.failures().stream()
+                .map(BulkIndexByScrollFailure::toString)
+                .collect(Collectors.joining("; "));
+        LOG.error("Update lineage encountered failures: {}", failureDetails);
+      }
+    } catch (IOException | OpenSearchException e) {
+      LOG.error("Failed to update lineage in OpenSearch for index: {}", indexName, e);
+    }
+  }
+
+  /**
+   * Deletes every document in {@code index} whose {@code fieldName} falls in the given range.
+   *
+   * @param index index or data stream name to delete from
+   * @param fieldName field the range is applied to
+   * @param gt exclusive lower bound, or {@code null}
+   * @param gte inclusive lower bound, or {@code null}
+   * @param lt exclusive upper bound, or {@code null}
+   * @param lte inclusive upper bound, or {@code null}
+   * @throws IOException if the delete-by-query request fails
+   * @throws IllegalArgumentException if all four bounds are {@code null}
+   */
+  @Override
+  public void deleteByRangeQuery(
+      String index, String fieldName, Object gt, Object gte, Object lt, Object lte)
+      throws IOException {
+    if (!isClientAvailable) {
+      LOG.error("OpenSearch client is not available. Cannot delete by range query.");
+      return;
+    }
+    if (gt == null && gte == null && lt == null && lte == null) {
+      // Without any bound the range no longer restricts which documents get deleted.
+      throw new IllegalArgumentException(
+          "deleteByRangeQuery requires at least one bound for field " + fieldName);
+    }
+
+    Query query =
+        Query.of(
+            q ->
+                q.range(
+                    r -> {
+                      // Reuse the supplied builder; only non-null bounds are set.
+                      RangeQuery.Builder builder = r.field(fieldName);
+                      if (gt != null) {
+                        builder.gt(JsonData.of(gt));
+                      }
+                      if (gte != null) {
+                        builder.gte(JsonData.of(gte));
+                      }
+                      if (lt != null) {
+                        builder.lt(JsonData.of(lt));
+                      }
+                      if (lte != null) {
+                        builder.lte(JsonData.of(lte));
+                      }
+                      return builder;
+                    }));
+
+    // Execute delete-by-query with refresh
+    DeleteByQueryResponse response =
+        client.deleteByQuery(d -> d.index(index).query(query).refresh(true));
+
+    LOG.info(
+        "DeleteByQuery response from OS - Deleted: {}, Failures: {}",
+        response.deleted(),
+        response.failures().size());
+
+    if (!response.failures().isEmpty()) {
+      String failureDetails =
+          response.failures().stream()
+              .map(BulkIndexByScrollFailure::toString)
+              .collect(Collectors.joining("; "));
+      LOG.error("DeleteByQuery encountered failures: {}", failureDetails);
+    }
+  }
 }