MIgrated updateByFqnPrefix and updateLineage and deleteByRangeQuery methods

This commit is contained in:
Bhanu Agrawal 2025-09-30 12:01:49 +01:00
parent 6aff80665c
commit 39dc438f7f
8 changed files with 351 additions and 144 deletions

View File

@ -249,10 +249,11 @@ public class DataAssetsWorkflow {
private void deleteBasedOnDataRetentionPolicy(String dataStreamName) throws SearchIndexException {
long retentionLimitTimestamp =
TimestampUtils.subtractDays(System.currentTimeMillis(), dataAssetsConfig.getRetention());
String rangeTermQuery =
String.format("{ \"@timestamp\": { \"lte\": %s } }", retentionLimitTimestamp);
try {
searchRepository.getSearchClient().deleteByQuery(dataStreamName, rangeTermQuery);
searchRepository
.getSearchClient()
.deleteByRangeQuery(
dataStreamName, "@timestamp", null, null, null, retentionLimitTimestamp);
} catch (Exception rx) {
throw new SearchIndexException(new IndexingError().withMessage(rx.getMessage()));
}
@ -264,7 +265,10 @@ public class DataAssetsWorkflow {
"{ \"@timestamp\": { \"gte\": %s, \"lte\": %s } }", startTimestamp, endTimestamp);
try {
if (dataAssetsConfig.getServiceFilter() == null) {
searchRepository.getSearchClient().deleteByQuery(dataStreamName, rangeTermQuery);
searchRepository
.getSearchClient()
.deleteByRangeQuery(
dataStreamName, "@timestamp", null, startTimestamp, null, endTimestamp);
} else {
searchRepository
.getSearchClient()

View File

@ -135,10 +135,7 @@ public class DataQualityWorkflow {
try {
searchRepository
.getSearchClient()
.deleteByQuery(
indexName,
String.format(
"{\"@timestamp\": {\"gte\": %s, \"lte\": %s}}", startTimestamp, endTimestamp));
.deleteByRangeQuery(indexName, "@timestamp", null, startTimestamp, null, endTimestamp);
} catch (Exception rx) {
throw new SearchIndexException(new IndexingError().withMessage(rx.getMessage()));
}

View File

@ -6,6 +6,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.lang3.tuple.Pair;
import org.openmetadata.schema.api.lineage.EsLineageData;
/**
* Interface for entity management operations in search.
@ -169,4 +170,40 @@ public interface EntityManagementClient {
String pipelineName,
String entityType,
List<UUID> entityIds);
/**
* Updates entities by FQN prefix by replacing old parent FQN with new parent FQN.
*
* @param indexName the name of the index
* @param oldParentFQN the old parent FQN to be replaced
* @param newParentFQN the new parent FQN to replace with
* @param prefixFieldCondition the field name to match for prefix query
*/
void updateByFqnPrefix(
String indexName, String oldParentFQN, String newParentFQN, String prefixFieldCondition);
/**
* Updates lineage information for entities.
*
* @param indexName the name of the index
* @param fieldAndValue field and value pair to match entities
* @param lineageData the lineage data to update
*/
void updateLineage(
String indexName, Pair<String, String> fieldAndValue, EsLineageData lineageData);
/**
* Deletes entities matching a range query on a specific field.
*
* @param index the index name
* @param fieldName the field name to apply the range query on
* @param gt greater than value (exclusive), can be null
* @param gte greater than or equal to value (inclusive), can be null
* @param lt less than value (exclusive), can be null
* @param lte less than or equal to value (inclusive), can be null
* @throws IOException if there's an error during the delete operation
*/
void deleteByRangeQuery(
String index, String fieldName, Object gt, Object gte, Object lt, Object lte)
throws IOException;
}

View File

@ -11,12 +11,10 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.lang3.tuple.Pair;
import org.openmetadata.schema.api.entityRelationship.SearchEntityRelationshipRequest;
import org.openmetadata.schema.api.entityRelationship.SearchEntityRelationshipResult;
import org.openmetadata.schema.api.entityRelationship.SearchSchemaEntityRelationshipResult;
import org.openmetadata.schema.api.lineage.EntityCountLineageRequest;
import org.openmetadata.schema.api.lineage.EsLineageData;
import org.openmetadata.schema.api.lineage.LineagePaginationInfo;
import org.openmetadata.schema.api.lineage.SearchLineageRequest;
import org.openmetadata.schema.api.lineage.SearchLineageResult;
@ -118,6 +116,27 @@ public interface SearchClient<T> extends IndexManagementClient, EntityManagement
}
""";
String UPDATE_FQN_PREFIX_SCRIPT =
"""
String updatedFQN = ctx._source.fullyQualifiedName.replace(params.oldParentFQN, params.newParentFQN);
ctx._source.fullyQualifiedName = updatedFQN;
ctx._source.fqnDepth = updatedFQN.splitOnToken('.').length;
if (ctx._source.containsKey('parent')) {
if (ctx._source.parent.containsKey('fullyQualifiedName')) {
String parentFQN = ctx._source.parent.fullyQualifiedName;
ctx._source.parent.fullyQualifiedName = parentFQN.replace(params.oldParentFQN, params.newParentFQN);
}
}
if (ctx._source.containsKey('tags')) {
for (int i = 0; i < ctx._source.tags.size(); i++) {
if (ctx._source.tags[i].containsKey('tagFQN')) {
String tagFQN = ctx._source.tags[i].tagFQN;
ctx._source.tags[i].tagFQN = tagFQN.replace(params.oldParentFQN, params.newParentFQN);
}
}
}
""";
String REMOVE_LINEAGE_SCRIPT =
"ctx._source.upstreamLineage.removeIf(lineage -> lineage.docUniqueId == params.docUniqueId)";
@ -495,12 +514,6 @@ public interface SearchClient<T> extends IndexManagementClient, EntityManagement
/* This function takes in Entity Reference, Search for occurances of those entity across ES, and perform an update for that with reindexing the data from the database to ES */
void reindexAcrossIndices(String matchingKey, EntityReference sourceRef);
void updateByFqnPrefix(
String indexName, String oldParentFQN, String newParentFQN, String prefixFieldCondition);
void updateLineage(
String indexName, Pair<String, String> fieldAndValue, EsLineageData lineageData);
Response listDataInsightChartResult(
Long startTs,
Long endTs,
@ -513,9 +526,6 @@ public interface SearchClient<T> extends IndexManagementClient, EntityManagement
String dataReportIndex)
throws IOException;
// TODO: Think if it makes sense to have this or maybe a specific deleteByRange
void deleteByQuery(String index, String query);
void deleteByRangeAndTerm(String index, String rangeQueryStr, String termKey, String termValue);
default BulkResponse bulk(BulkRequest data, RequestOptions options) throws IOException {

View File

@ -44,8 +44,6 @@ import es.org.elasticsearch.common.ParsingException;
import es.org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import es.org.elasticsearch.core.TimeValue;
import es.org.elasticsearch.index.query.BoolQueryBuilder;
import es.org.elasticsearch.index.query.MatchQueryBuilder;
import es.org.elasticsearch.index.query.Operator;
import es.org.elasticsearch.index.query.PrefixQueryBuilder;
import es.org.elasticsearch.index.query.QueryBuilder;
import es.org.elasticsearch.index.query.QueryBuilders;
@ -1687,47 +1685,7 @@ public class ElasticSearchClient implements SearchClient<RestHighLevelClient> {
@Override
public void updateByFqnPrefix(
String indexName, String oldParentFQN, String newParentFQN, String prefixFieldCondition) {
// Match all children documents whose fullyQualifiedName starts with the old parent's FQN
PrefixQueryBuilder prefixQuery = new PrefixQueryBuilder(prefixFieldCondition, oldParentFQN);
UpdateByQueryRequest updateByQueryRequest =
new UpdateByQueryRequest(Entity.getSearchRepository().getIndexOrAliasName(indexName));
updateByQueryRequest.setQuery(prefixQuery);
Map<String, Object> params = new HashMap<>();
params.put("oldParentFQN", oldParentFQN);
params.put("newParentFQN", newParentFQN);
String painlessScript =
"String updatedFQN = ctx._source.fullyQualifiedName.replace(params.oldParentFQN, params.newParentFQN); "
+ "ctx._source.fullyQualifiedName = updatedFQN; "
+ "ctx._source.fqnDepth = updatedFQN.splitOnToken('.').length; "
+ "if (ctx._source.containsKey('parent')) { "
+ " if (ctx._source.parent.containsKey('fullyQualifiedName')) { "
+ " String parentFQN = ctx._source.parent.fullyQualifiedName; "
+ " ctx._source.parent.fullyQualifiedName = parentFQN.replace(params.oldParentFQN, params.newParentFQN); "
+ " } "
+ "} "
+ "if (ctx._source.containsKey('tags')) { "
+ " for (int i = 0; i < ctx._source.tags.size(); i++) { "
+ " if (ctx._source.tags[i].containsKey('tagFQN')) { "
+ " String tagFQN = ctx._source.tags[i].tagFQN; "
+ " ctx._source.tags[i].tagFQN = tagFQN.replace(params.oldParentFQN, params.newParentFQN); "
+ " } "
+ " } "
+ "}";
Script inlineScript =
new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, painlessScript, params);
updateByQueryRequest.setScript(inlineScript);
try {
updateElasticSearchByQuery(updateByQueryRequest);
LOG.info("Successfully propagated FQN updates for parent FQN: {}", oldParentFQN);
} catch (Exception e) {
LOG.error("Error while propagating FQN updates: {}", e.getMessage(), e);
}
entityManager.updateByFqnPrefix(indexName, oldParentFQN, newParentFQN, prefixFieldCondition);
}
@Override
@ -1761,18 +1719,7 @@ public class ElasticSearchClient implements SearchClient<RestHighLevelClient> {
@Override
public void updateLineage(
String indexName, Pair<String, String> fieldAndValue, EsLineageData lineageData) {
if (isClientAvailable) {
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(indexName);
updateByQueryRequest.setQuery(
new MatchQueryBuilder(fieldAndValue.getKey(), fieldAndValue.getValue())
.operator(Operator.AND));
Map<String, Object> params =
Collections.singletonMap("lineageData", JsonUtils.getMap(lineageData));
Script script =
new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, ADD_UPDATE_LINEAGE, params);
updateByQueryRequest.setScript(script);
updateElasticSearchByQuery(updateByQueryRequest);
}
entityManager.updateLineage(indexName, fieldAndValue, lineageData);
}
@SneakyThrows
@ -1788,15 +1735,11 @@ public class ElasticSearchClient implements SearchClient<RestHighLevelClient> {
@Override
public void close() {}
@SneakyThrows
public void deleteByQuery(String index, String query) {
DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(index);
// Hack: Due to an issue on how the RangeQueryBuilder.fromXContent works, we're removing the
// first token from the Parser
XContentParser parser = createXContentParser(query);
parser.nextToken();
deleteRequest.setQuery(RangeQueryBuilder.fromXContent(parser));
deleteEntityFromElasticSearchByQuery(deleteRequest);
@Override
public void deleteByRangeQuery(
String index, String fieldName, Object gt, Object gte, Object lt, Object lte)
throws IOException {
entityManager.deleteByRangeQuery(index, fieldName, gt, gte, lt, lte);
}
@SneakyThrows

View File

@ -2,6 +2,8 @@ package org.openmetadata.service.search.elasticsearch;
import static org.openmetadata.service.exception.CatalogGenericExceptionMapper.getResponse;
import static org.openmetadata.service.search.SearchClient.ADD_UPDATE_ENTITY_RELATIONSHIP;
import static org.openmetadata.service.search.SearchClient.ADD_UPDATE_LINEAGE;
import static org.openmetadata.service.search.SearchClient.UPDATE_FQN_PREFIX_SCRIPT;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
@ -10,12 +12,14 @@ import es.co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
import es.co.elastic.clients.elasticsearch.ElasticsearchClient;
import es.co.elastic.clients.elasticsearch._types.BulkIndexByScrollFailure;
import es.co.elastic.clients.elasticsearch._types.ElasticsearchException;
import es.co.elastic.clients.elasticsearch._types.ErrorCause;
import es.co.elastic.clients.elasticsearch._types.FieldValue;
import es.co.elastic.clients.elasticsearch._types.Refresh;
import es.co.elastic.clients.elasticsearch._types.ScriptLanguage;
import es.co.elastic.clients.elasticsearch._types.query_dsl.BoolQuery;
import es.co.elastic.clients.elasticsearch._types.query_dsl.Operator;
import es.co.elastic.clients.elasticsearch._types.query_dsl.Query;
import es.co.elastic.clients.elasticsearch._types.query_dsl.RangeQuery;
import es.co.elastic.clients.elasticsearch.core.BulkResponse;
import es.co.elastic.clients.elasticsearch.core.DeleteByQueryResponse;
import es.co.elastic.clients.elasticsearch.core.DeleteResponse;
@ -34,6 +38,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.openmetadata.schema.api.lineage.EsLineageData;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.sdk.exception.SearchException;
import org.openmetadata.sdk.exception.SearchIndexNotFoundException;
@ -574,4 +579,135 @@ public class ElasticSearchEntityManager implements EntityManagementClient {
}
return JsonData.of(docMap);
}
@Override
public void updateByFqnPrefix(
String indexName, String oldParentFQN, String newParentFQN, String prefixFieldCondition) {
Query prefixQuery =
Query.of(q -> q.prefix(p -> p.field(prefixFieldCondition).value(oldParentFQN)));
Map<String, JsonData> params =
Map.of(
"oldParentFQN", JsonData.of(oldParentFQN),
"newParentFQN", JsonData.of(newParentFQN));
try {
UpdateByQueryResponse updateResponse =
client.updateByQuery(
req ->
req.index(Entity.getSearchRepository().getIndexOrAliasName(indexName))
.query(prefixQuery)
.script(
s ->
s.inline(
i ->
i.lang(ScriptLanguage.Painless)
.source(UPDATE_FQN_PREFIX_SCRIPT)
.params(params)))
.refresh(true));
LOG.info("Successfully propagated FQN updates for parent FQN: {}", oldParentFQN);
if (!updateResponse.failures().isEmpty()) {
String errorMessage =
updateResponse.failures().stream()
.map(BulkIndexByScrollFailure::cause)
.map(ErrorCause::reason)
.collect(Collectors.joining(", "));
LOG.error("Failed to update FQN prefix: {}", errorMessage);
}
} catch (Exception e) {
LOG.error("Error while propagating FQN updates: {}", e.getMessage(), e);
}
}
@Override
public void updateLineage(
String indexName, Pair<String, String> fieldAndValue, EsLineageData lineageData) {
if (!isClientAvailable) {
LOG.error("ElasticSearch client is not available. Cannot update lineage.");
return;
}
try {
Map<String, JsonData> params =
Collections.singletonMap("lineageData", JsonData.of(JsonUtils.getMap(lineageData)));
UpdateByQueryResponse response =
client.updateByQuery(
u ->
u.index(indexName)
.query(
q ->
q.match(
m ->
m.field(fieldAndValue.getKey())
.query(fieldAndValue.getValue())
.operator(Operator.And)))
.script(
s ->
s.inline(
inline ->
inline
.lang(ScriptLanguage.Painless)
.source(ADD_UPDATE_LINEAGE)
.params(params)))
.refresh(true));
LOG.info("Successfully updated lineage in ElasticSearch for index: {}", indexName);
if (!response.failures().isEmpty()) {
String failureDetails =
response.failures().stream()
.map(BulkIndexByScrollFailure::toString)
.collect(Collectors.joining("; "));
LOG.error("Update lineage encountered failures: {}", failureDetails);
}
} catch (IOException | ElasticsearchException e) {
LOG.error("Failed to update lineage in ElasticSearch for index: {}", indexName, e);
}
}
@Override
public void deleteByRangeQuery(
String index, String fieldName, Object gt, Object gte, Object lt, Object lte)
throws IOException {
if (!isClientAvailable) {
LOG.error("Elasticsearch client is not available. Cannot delete by range query.");
return;
}
// Build the range query
Query query =
Query.of(
q ->
q.range(
r -> {
RangeQuery.Builder builder = new RangeQuery.Builder().field(fieldName);
if (gt != null) builder.gt(JsonData.of(gt));
if (gte != null) builder.gte(JsonData.of(gte));
if (lt != null) builder.lt(JsonData.of(lt));
if (lte != null) builder.lte(JsonData.of(lte));
return builder;
}));
// Execute delete-by-query with refresh
DeleteByQueryResponse response =
client.deleteByQuery(d -> d.index(index).query(query).refresh(true));
LOG.info(
"DeleteByQuery response from ES - Deleted: {}, Failures: {}",
response.deleted(),
response.failures().size());
if (!response.failures().isEmpty()) {
String failureDetails =
response.failures().stream()
.map(BulkIndexByScrollFailure::toString)
.collect(Collectors.joining("; "));
LOG.error("DeleteByQuery encountered failures: {}", failureDetails);
}
}
}

View File

@ -168,7 +168,6 @@ import os.org.opensearch.common.xcontent.XContentParser;
import os.org.opensearch.common.xcontent.XContentType;
import os.org.opensearch.index.IndexNotFoundException;
import os.org.opensearch.index.query.BoolQueryBuilder;
import os.org.opensearch.index.query.MatchQueryBuilder;
import os.org.opensearch.index.query.MultiMatchQueryBuilder;
import os.org.opensearch.index.query.Operator;
import os.org.opensearch.index.query.PrefixQueryBuilder;
@ -1846,62 +1845,13 @@ public class OpenSearchClient implements SearchClient<RestHighLevelClient> {
@Override
public void updateByFqnPrefix(
String indexName, String oldParentFQN, String newParentFQN, String prefixFieldCondition) {
// Match all children documents whose fullyQualifiedName starts with the old parent's FQN
PrefixQueryBuilder prefixQuery = new PrefixQueryBuilder(prefixFieldCondition, oldParentFQN);
UpdateByQueryRequest updateByQueryRequest =
new UpdateByQueryRequest(Entity.getSearchRepository().getIndexOrAliasName(indexName));
updateByQueryRequest.setQuery(prefixQuery);
Map<String, Object> params = new HashMap<>();
params.put("oldParentFQN", oldParentFQN);
params.put("newParentFQN", newParentFQN);
String painlessScript =
"String updatedFQN = ctx._source.fullyQualifiedName.replace(params.oldParentFQN, params.newParentFQN); "
+ "ctx._source.fullyQualifiedName = updatedFQN; "
+ "ctx._source.fqnDepth = updatedFQN.splitOnToken('.').length; "
+ "if (ctx._source.containsKey('parent')) { "
+ " if (ctx._source.parent.containsKey('fullyQualifiedName')) { "
+ " String parentFQN = ctx._source.parent.fullyQualifiedName; "
+ " ctx._source.parent.fullyQualifiedName = parentFQN.replace(params.oldParentFQN, params.newParentFQN); "
+ " } "
+ "} "
+ "if (ctx._source.containsKey('tags')) { "
+ " for (int i = 0; i < ctx._source.tags.size(); i++) { "
+ " if (ctx._source.tags[i].containsKey('tagFQN')) { "
+ " String tagFQN = ctx._source.tags[i].tagFQN; "
+ " ctx._source.tags[i].tagFQN = tagFQN.replace(params.oldParentFQN, params.newParentFQN); "
+ " } "
+ " } "
+ "}";
Script inlineScript =
new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, painlessScript, params);
updateByQueryRequest.setScript(inlineScript);
try {
updateOpenSearchByQuery(updateByQueryRequest);
LOG.info("Successfully propagated FQN updates for parent FQN: {}", oldParentFQN);
} catch (Exception e) {
LOG.error("Error while propagating FQN updates: {}", e.getMessage(), e);
}
entityManager.updateByFqnPrefix(indexName, oldParentFQN, newParentFQN, prefixFieldCondition);
}
@Override
public void updateLineage(
String indexName, Pair<String, String> fieldAndValue, EsLineageData lineageData) {
if (isClientAvailable) {
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(indexName);
updateByQueryRequest.setQuery(
new MatchQueryBuilder(fieldAndValue.getKey(), fieldAndValue.getValue())
.operator(Operator.AND));
Map<String, Object> params =
Collections.singletonMap("lineageData", JsonUtils.getMap(lineageData));
Script script =
new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, ADD_UPDATE_LINEAGE, params);
updateByQueryRequest.setScript(script);
updateOpenSearchByQuery(updateByQueryRequest);
}
entityManager.updateLineage(indexName, fieldAndValue, lineageData);
}
@Override
@ -1932,15 +1882,11 @@ public class OpenSearchClient implements SearchClient<RestHighLevelClient> {
}
}
@SneakyThrows
public void deleteByQuery(String index, String query) {
DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(index);
// Hack: Due to an issue on how the RangeQueryBuilder.fromXContent works, we're removing the
// first token from the Parser
XContentParser parser = createXContentParser(query);
parser.nextToken();
deleteRequest.setQuery(RangeQueryBuilder.fromXContent(parser));
deleteEntityFromOpenSearchByQuery(deleteRequest);
@Override
public void deleteByRangeQuery(
String index, String fieldName, Object gt, Object gte, Object lt, Object lte)
throws IOException {
entityManager.deleteByRangeQuery(index, fieldName, gt, gte, lt, lte);
}
@SneakyThrows

View File

@ -2,6 +2,8 @@ package org.openmetadata.service.search.opensearch;
import static org.openmetadata.service.exception.CatalogGenericExceptionMapper.getResponse;
import static org.openmetadata.service.search.SearchClient.ADD_UPDATE_ENTITY_RELATIONSHIP;
import static org.openmetadata.service.search.SearchClient.ADD_UPDATE_LINEAGE;
import static org.openmetadata.service.search.SearchClient.UPDATE_FQN_PREFIX_SCRIPT;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
@ -18,6 +20,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.openmetadata.schema.api.lineage.EsLineageData;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.sdk.exception.SearchException;
import org.openmetadata.sdk.exception.SearchIndexNotFoundException;
@ -27,12 +30,14 @@ import os.org.opensearch.client.json.JsonData;
import os.org.opensearch.client.opensearch.OpenSearchAsyncClient;
import os.org.opensearch.client.opensearch.OpenSearchClient;
import os.org.opensearch.client.opensearch._types.BulkIndexByScrollFailure;
import os.org.opensearch.client.opensearch._types.ErrorCause;
import os.org.opensearch.client.opensearch._types.FieldValue;
import os.org.opensearch.client.opensearch._types.OpenSearchException;
import os.org.opensearch.client.opensearch._types.Refresh;
import os.org.opensearch.client.opensearch._types.query_dsl.BoolQuery;
import os.org.opensearch.client.opensearch._types.query_dsl.Operator;
import os.org.opensearch.client.opensearch._types.query_dsl.Query;
import os.org.opensearch.client.opensearch._types.query_dsl.RangeQuery;
import os.org.opensearch.client.opensearch.core.BulkRequest;
import os.org.opensearch.client.opensearch.core.BulkResponse;
import os.org.opensearch.client.opensearch.core.DeleteByQueryResponse;
@ -577,4 +582,133 @@ public class OpenSearchEntityManager implements EntityManagementClient {
}
return JsonData.of(docMap);
}
@Override
public void updateByFqnPrefix(
String indexName, String oldParentFQN, String newParentFQN, String prefixFieldCondition) {
Query prefixQuery =
Query.of(q -> q.prefix(p -> p.field(prefixFieldCondition).value(oldParentFQN)));
Map<String, JsonData> params =
Map.of(
"oldParentFQN", JsonData.of(oldParentFQN),
"newParentFQN", JsonData.of(newParentFQN));
try {
UpdateByQueryResponse updateResponse =
client.updateByQuery(
req ->
req.index(Entity.getSearchRepository().getIndexOrAliasName(indexName))
.query(prefixQuery)
.script(
s ->
s.inline(
i ->
i.lang(ScriptLanguage.Painless.jsonValue())
.source(UPDATE_FQN_PREFIX_SCRIPT)
.params(params)))
.refresh(true));
LOG.info("Successfully propagated FQN updates for parent FQN: {}", oldParentFQN);
if (!updateResponse.failures().isEmpty()) {
String errorMessage =
updateResponse.failures().stream()
.map(BulkIndexByScrollFailure::cause)
.map(ErrorCause::reason)
.collect(Collectors.joining(", "));
LOG.error("Failed to update FQN prefix: {}", errorMessage);
}
} catch (Exception e) {
LOG.error("Error while propagating FQN updates: {}", e.getMessage(), e);
}
}
@Override
public void updateLineage(
String indexName, Pair<String, String> fieldAndValue, EsLineageData lineageData) {
if (!isClientAvailable) {
LOG.error("OpenSearch client is not available. Cannot update lineage.");
return;
}
try {
Map<String, JsonData> params =
Collections.singletonMap("lineageData", JsonData.of(JsonUtils.getMap(lineageData)));
UpdateByQueryResponse response =
client.updateByQuery(
u ->
u.index(indexName)
.query(
q ->
q.match(
m ->
m.field(fieldAndValue.getKey())
.query(FieldValue.of(fieldAndValue.getValue()))
.operator(Operator.And)))
.script(
s ->
s.inline(
inline ->
inline
.lang(ScriptLanguage.Painless.jsonValue())
.source(ADD_UPDATE_LINEAGE)
.params(params)))
.refresh(true));
LOG.info("Successfully updated lineage in OpenSearch for index: {}", indexName);
if (!response.failures().isEmpty()) {
String failureDetails =
response.failures().stream()
.map(BulkIndexByScrollFailure::toString)
.collect(Collectors.joining("; "));
LOG.error("Update lineage encountered failures: {}", failureDetails);
}
} catch (IOException | OpenSearchException e) {
LOG.error("Failed to update lineage in OpenSearch for index: {}", indexName, e);
}
}
@Override
public void deleteByRangeQuery(
String index, String fieldName, Object gt, Object gte, Object lt, Object lte)
throws IOException {
if (!isClientAvailable) {
LOG.error("OpenSearch client is not available. Cannot delete by range query.");
return;
}
// Build the range query
Query query =
Query.of(
q ->
q.range(
r -> {
RangeQuery.Builder builder = new RangeQuery.Builder().field(fieldName);
if (gt != null) builder.gt(JsonData.of(gt));
if (gte != null) builder.gte(JsonData.of(gte));
if (lt != null) builder.lt(JsonData.of(lt));
if (lte != null) builder.lte(JsonData.of(lte));
return builder;
}));
// Execute delete-by-query with refresh
DeleteByQueryResponse response =
client.deleteByQuery(d -> d.index(index).query(query).refresh(true));
LOG.info(
"DeleteByQuery response from OS - Deleted: {}, Failures: {}",
response.deleted(),
response.failures().size());
if (!response.failures().isEmpty()) {
String failureDetails =
response.failures().stream()
.map(BulkIndexByScrollFailure::toString)
.collect(Collectors.joining("; "));
LOG.error("DeleteByQuery encountered failures: {}", failureDetails);
}
}
}