Issue-4118: Support hard delete for ElasticSearch (#4123)

This commit is contained in:
Sriharsha Chintalapani 2022-04-14 00:44:04 -07:00 committed by GitHub
parent ad7040e7cd
commit c9da6f89dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -24,20 +24,30 @@ import java.util.Objects;
import java.util.UUID; import java.util.UUID;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script; import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType; import org.elasticsearch.script.ScriptType;
import org.openmetadata.catalog.Entity; import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.elasticsearch.ElasticSearchIndexDefinition.ElasticSearchIndexType; import org.openmetadata.catalog.elasticsearch.ElasticSearchIndexDefinition.ElasticSearchIndexType;
import org.openmetadata.catalog.entity.data.Dashboard; import org.openmetadata.catalog.entity.data.Dashboard;
import org.openmetadata.catalog.entity.data.Database;
import org.openmetadata.catalog.entity.data.DatabaseSchema;
import org.openmetadata.catalog.entity.data.GlossaryTerm; import org.openmetadata.catalog.entity.data.GlossaryTerm;
import org.openmetadata.catalog.entity.data.Pipeline; import org.openmetadata.catalog.entity.data.Pipeline;
import org.openmetadata.catalog.entity.data.Table; import org.openmetadata.catalog.entity.data.Table;
import org.openmetadata.catalog.entity.data.Topic; import org.openmetadata.catalog.entity.data.Topic;
import org.openmetadata.catalog.entity.services.DashboardService;
import org.openmetadata.catalog.entity.services.DatabaseService;
import org.openmetadata.catalog.entity.services.MessagingService;
import org.openmetadata.catalog.entity.services.PipelineService;
import org.openmetadata.catalog.entity.teams.Team; import org.openmetadata.catalog.entity.teams.Team;
import org.openmetadata.catalog.entity.teams.User; import org.openmetadata.catalog.entity.teams.User;
import org.openmetadata.catalog.events.AbstractEventPublisher; import org.openmetadata.catalog.events.AbstractEventPublisher;
@ -73,37 +83,49 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher {
for (ChangeEvent event : events.getData()) { for (ChangeEvent event : events.getData()) {
try { try {
String entityType = event.getEntityType(); String entityType = event.getEntityType();
UpdateRequest updateRequest = null;
switch (entityType) { switch (entityType) {
case Entity.TABLE: case Entity.TABLE:
updateRequest = updateTable(event); updateTable(event);
break; break;
case Entity.DASHBOARD: case Entity.DASHBOARD:
updateRequest = updateDashboard(event); updateDashboard(event);
break; break;
case Entity.TOPIC: case Entity.TOPIC:
updateRequest = updateTopic(event); updateTopic(event);
break; break;
case Entity.PIPELINE: case Entity.PIPELINE:
updateRequest = updatePipeline(event); updatePipeline(event);
break; break;
case Entity.USER: case Entity.USER:
updateRequest = updateUser(event); updateUser(event);
break; break;
case Entity.TEAM: case Entity.TEAM:
updateRequest = updateTeam(event); updateTeam(event);
break; break;
case Entity.GLOSSARY_TERM: case Entity.GLOSSARY_TERM:
updateRequest = updateGlossaryTerm(event); updateGlossaryTerm(event);
break;
case Entity.DATABASE:
updateDatabase(event);
break;
case Entity.DATABASE_SCHEMA:
updateDatabaseSchema(event);
break;
case Entity.DASHBOARD_SERVICE:
updateDashboardService(event);
break;
case Entity.DATABASE_SERVICE:
updateDatabaseService(event);
break;
case Entity.MESSAGING_SERVICE:
updateMessagingService(event);
break;
case Entity.PIPELINE_SERVICE:
updatePipelineService(event);
break; break;
default: default:
LOG.warn("Ignoring Entity Type {}", entityType); LOG.warn("Ignoring Entity Type {}", entityType);
} }
if (updateRequest != null) {
LOG.debug("Sending request to ElasticSearch");
LOG.debug(updateRequest.toString());
client.update(updateRequest, RequestOptions.DEFAULT);
}
} catch (ElasticsearchException e) { } catch (ElasticsearchException e) {
LOG.error("failed to update ES doc"); LOG.error("failed to update ES doc");
LOG.debug(e.getMessage()); LOG.debug(e.getMessage());
@ -173,7 +195,7 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher {
} }
} }
private UpdateRequest updateTable(ChangeEvent event) throws IOException { private void updateTable(ChangeEvent event) throws IOException {
UpdateRequest updateRequest = UpdateRequest updateRequest =
new UpdateRequest(ElasticSearchIndexType.TABLE_SEARCH_INDEX.indexName, event.getEntityId().toString()); new UpdateRequest(ElasticSearchIndexType.TABLE_SEARCH_INDEX.indexName, event.getEntityId().toString());
TableESIndex tableESIndex = null; TableESIndex tableESIndex = null;
@ -186,6 +208,7 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher {
String json = JsonUtils.pojoToJson(tableESIndex); String json = JsonUtils.pojoToJson(tableESIndex);
updateRequest.doc(json, XContentType.JSON); updateRequest.doc(json, XContentType.JSON);
updateRequest.docAsUpsert(true); updateRequest.docAsUpsert(true);
updateElasticSearch(updateRequest);
break; break;
case ENTITY_UPDATED: case ENTITY_UPDATED:
if (Objects.equals(event.getCurrentVersion(), event.getPreviousVersion())) { if (Objects.equals(event.getCurrentVersion(), event.getPreviousVersion())) {
@ -193,18 +216,21 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher {
} else { } else {
scriptedUpsert(tableESIndex, updateRequest); scriptedUpsert(tableESIndex, updateRequest);
} }
updateElasticSearch(updateRequest);
break; break;
case ENTITY_SOFT_DELETED: case ENTITY_SOFT_DELETED:
softDeleteEntity(updateRequest); softDeleteEntity(updateRequest);
updateElasticSearch(updateRequest);
break; break;
case ENTITY_DELETED: case ENTITY_DELETED:
DeleteRequest deleteRequest =
new DeleteRequest(ElasticSearchIndexType.TABLE_SEARCH_INDEX.indexName, event.getEntityId().toString());
deleteEntityFromElasticSearch(deleteRequest);
break; break;
} }
return updateRequest;
} }
private UpdateRequest updateTopic(ChangeEvent event) throws IOException { private void updateTopic(ChangeEvent event) throws IOException {
UpdateRequest updateRequest = UpdateRequest updateRequest =
new UpdateRequest(ElasticSearchIndexType.TOPIC_SEARCH_INDEX.indexName, event.getEntityId().toString()); new UpdateRequest(ElasticSearchIndexType.TOPIC_SEARCH_INDEX.indexName, event.getEntityId().toString());
TopicESIndex topicESIndex = null; TopicESIndex topicESIndex = null;
@ -218,6 +244,7 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher {
String json = JsonUtils.pojoToJson(topicESIndex); String json = JsonUtils.pojoToJson(topicESIndex);
updateRequest.doc(json, XContentType.JSON); updateRequest.doc(json, XContentType.JSON);
updateRequest.docAsUpsert(true); updateRequest.docAsUpsert(true);
updateElasticSearch(updateRequest);
break; break;
case ENTITY_UPDATED: case ENTITY_UPDATED:
if (Objects.equals(event.getCurrentVersion(), event.getPreviousVersion())) { if (Objects.equals(event.getCurrentVersion(), event.getPreviousVersion())) {
@ -225,17 +252,21 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher {
} else { } else {
scriptedUpsert(topicESIndex, updateRequest); scriptedUpsert(topicESIndex, updateRequest);
} }
updateElasticSearch(updateRequest);
break; break;
case ENTITY_SOFT_DELETED: case ENTITY_SOFT_DELETED:
softDeleteEntity(updateRequest); softDeleteEntity(updateRequest);
updateElasticSearch(updateRequest);
break; break;
case ENTITY_DELETED: case ENTITY_DELETED:
DeleteRequest deleteRequest =
new DeleteRequest(ElasticSearchIndexType.TOPIC_SEARCH_INDEX.indexName, event.getEntityId().toString());
deleteEntityFromElasticSearch(deleteRequest);
break; break;
} }
return updateRequest;
} }
private UpdateRequest updateDashboard(ChangeEvent event) throws IOException { private void updateDashboard(ChangeEvent event) throws IOException {
DashboardESIndex dashboardESIndex = null; DashboardESIndex dashboardESIndex = null;
UpdateRequest updateRequest = UpdateRequest updateRequest =
new UpdateRequest(ElasticSearchIndexType.DASHBOARD_SEARCH_INDEX.indexName, event.getEntityId().toString()); new UpdateRequest(ElasticSearchIndexType.DASHBOARD_SEARCH_INDEX.indexName, event.getEntityId().toString());
@ -248,6 +279,7 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher {
String json = JsonUtils.pojoToJson(dashboardESIndex); String json = JsonUtils.pojoToJson(dashboardESIndex);
updateRequest.doc(json, XContentType.JSON); updateRequest.doc(json, XContentType.JSON);
updateRequest.docAsUpsert(true); updateRequest.docAsUpsert(true);
updateElasticSearch(updateRequest);
break; break;
case ENTITY_UPDATED: case ENTITY_UPDATED:
if (Objects.equals(event.getCurrentVersion(), event.getPreviousVersion())) { if (Objects.equals(event.getCurrentVersion(), event.getPreviousVersion())) {
@ -255,17 +287,21 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher {
} else { } else {
scriptedUpsert(dashboardESIndex, updateRequest); scriptedUpsert(dashboardESIndex, updateRequest);
} }
updateElasticSearch(updateRequest);
break; break;
case ENTITY_SOFT_DELETED: case ENTITY_SOFT_DELETED:
softDeleteEntity(updateRequest); softDeleteEntity(updateRequest);
updateElasticSearch(updateRequest);
break; break;
case ENTITY_DELETED: case ENTITY_DELETED:
DeleteRequest deleteRequest =
new DeleteRequest(ElasticSearchIndexType.DASHBOARD_SEARCH_INDEX.indexName, event.getEntityId().toString());
deleteEntityFromElasticSearch(deleteRequest);
break; break;
} }
return updateRequest;
} }
private UpdateRequest updatePipeline(ChangeEvent event) throws IOException { private void updatePipeline(ChangeEvent event) throws IOException {
PipelineESIndex pipelineESIndex = null; PipelineESIndex pipelineESIndex = null;
if (event.getEntity() != null && event.getEventType() != EventType.ENTITY_SOFT_DELETED) { if (event.getEntity() != null && event.getEventType() != EventType.ENTITY_SOFT_DELETED) {
Pipeline pipeline = (Pipeline) event.getEntity(); Pipeline pipeline = (Pipeline) event.getEntity();
@ -278,6 +314,7 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher {
String json = JsonUtils.pojoToJson(pipelineESIndex); String json = JsonUtils.pojoToJson(pipelineESIndex);
updateRequest.doc(json, XContentType.JSON); updateRequest.doc(json, XContentType.JSON);
updateRequest.docAsUpsert(true); updateRequest.docAsUpsert(true);
updateElasticSearch(updateRequest);
break; break;
case ENTITY_UPDATED: case ENTITY_UPDATED:
if (Objects.equals(event.getCurrentVersion(), event.getPreviousVersion())) { if (Objects.equals(event.getCurrentVersion(), event.getPreviousVersion())) {
@ -285,18 +322,21 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher {
} else { } else {
scriptedUpsert(pipelineESIndex, updateRequest); scriptedUpsert(pipelineESIndex, updateRequest);
} }
updateElasticSearch(updateRequest);
break; break;
case ENTITY_SOFT_DELETED: case ENTITY_SOFT_DELETED:
softDeleteEntity(updateRequest); softDeleteEntity(updateRequest);
updateElasticSearch(updateRequest);
break; break;
case ENTITY_DELETED: case ENTITY_DELETED:
DeleteRequest deleteRequest =
new DeleteRequest(ElasticSearchIndexType.PIPELINE_SEARCH_INDEX.indexName, event.getEntityId().toString());
deleteEntityFromElasticSearch(deleteRequest);
break; break;
} }
return updateRequest;
} }
private UpdateRequest updateUser(ChangeEvent event) throws IOException { private void updateUser(ChangeEvent event) throws IOException {
UpdateRequest updateRequest = UpdateRequest updateRequest =
new UpdateRequest(ElasticSearchIndexType.USER_SEARCH_INDEX.indexName, event.getEntityId().toString()); new UpdateRequest(ElasticSearchIndexType.USER_SEARCH_INDEX.indexName, event.getEntityId().toString());
UserESIndex userESIndex = null; UserESIndex userESIndex = null;
@ -309,21 +349,25 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher {
String json = JsonUtils.pojoToJson(userESIndex); String json = JsonUtils.pojoToJson(userESIndex);
updateRequest.doc(json, XContentType.JSON); updateRequest.doc(json, XContentType.JSON);
updateRequest.docAsUpsert(true); updateRequest.docAsUpsert(true);
updateElasticSearch(updateRequest);
break; break;
case ENTITY_UPDATED: case ENTITY_UPDATED:
scriptedUserUpsert(userESIndex, updateRequest); scriptedUserUpsert(userESIndex, updateRequest);
updateElasticSearch(updateRequest);
break; break;
case ENTITY_SOFT_DELETED: case ENTITY_SOFT_DELETED:
softDeleteEntity(updateRequest); softDeleteEntity(updateRequest);
updateElasticSearch(updateRequest);
break; break;
case ENTITY_DELETED: case ENTITY_DELETED:
DeleteRequest deleteRequest =
new DeleteRequest(ElasticSearchIndexType.USER_SEARCH_INDEX.indexName, event.getEntityId().toString());
deleteEntityFromElasticSearch(deleteRequest);
break; break;
} }
return updateRequest;
} }
private UpdateRequest updateTeam(ChangeEvent event) throws IOException { private void updateTeam(ChangeEvent event) throws IOException {
UpdateRequest updateRequest = UpdateRequest updateRequest =
new UpdateRequest(ElasticSearchIndexType.TEAM_SEARCH_INDEX.indexName, event.getEntityId().toString()); new UpdateRequest(ElasticSearchIndexType.TEAM_SEARCH_INDEX.indexName, event.getEntityId().toString());
TeamESIndex teamESIndex = null; TeamESIndex teamESIndex = null;
@ -336,21 +380,25 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher {
String json = JsonUtils.pojoToJson(teamESIndex); String json = JsonUtils.pojoToJson(teamESIndex);
updateRequest.doc(json, XContentType.JSON); updateRequest.doc(json, XContentType.JSON);
updateRequest.docAsUpsert(true); updateRequest.docAsUpsert(true);
updateElasticSearch(updateRequest);
break; break;
case ENTITY_UPDATED: case ENTITY_UPDATED:
scriptedTeamUpsert(teamESIndex, updateRequest); scriptedTeamUpsert(teamESIndex, updateRequest);
updateElasticSearch(updateRequest);
break; break;
case ENTITY_SOFT_DELETED: case ENTITY_SOFT_DELETED:
softDeleteEntity(updateRequest); softDeleteEntity(updateRequest);
updateElasticSearch(updateRequest);
break; break;
case ENTITY_DELETED: case ENTITY_DELETED:
DeleteRequest deleteRequest =
new DeleteRequest(ElasticSearchIndexType.TEAM_SEARCH_INDEX.indexName, event.getEntityId().toString());
deleteEntityFromElasticSearch(deleteRequest);
break; break;
} }
return updateRequest;
} }
private UpdateRequest updateGlossaryTerm(ChangeEvent event) throws IOException { private void updateGlossaryTerm(ChangeEvent event) throws IOException {
UpdateRequest updateRequest = UpdateRequest updateRequest =
new UpdateRequest(ElasticSearchIndexType.GLOSSARY_SEARCH_INDEX.indexName, event.getEntityId().toString()); new UpdateRequest(ElasticSearchIndexType.GLOSSARY_SEARCH_INDEX.indexName, event.getEntityId().toString());
GlossaryTermESIndex glossaryESIndex = null; GlossaryTermESIndex glossaryESIndex = null;
@ -363,18 +411,76 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher {
String json = JsonUtils.pojoToJson(glossaryESIndex); String json = JsonUtils.pojoToJson(glossaryESIndex);
updateRequest.doc(json, XContentType.JSON); updateRequest.doc(json, XContentType.JSON);
updateRequest.docAsUpsert(true); updateRequest.docAsUpsert(true);
updateElasticSearch(updateRequest);
break; break;
case ENTITY_UPDATED: case ENTITY_UPDATED:
scriptedUpsert(glossaryESIndex, updateRequest); scriptedUpsert(glossaryESIndex, updateRequest);
updateElasticSearch(updateRequest);
break; break;
case ENTITY_SOFT_DELETED: case ENTITY_SOFT_DELETED:
softDeleteEntity(updateRequest); softDeleteEntity(updateRequest);
updateElasticSearch(updateRequest);
break; break;
case ENTITY_DELETED: case ENTITY_DELETED:
DeleteRequest deleteRequest =
new DeleteRequest(ElasticSearchIndexType.GLOSSARY_SEARCH_INDEX.indexName, event.getEntityId().toString());
deleteEntityFromElasticSearch(deleteRequest);
break; break;
} }
}
return updateRequest; private void updateDatabase(ChangeEvent event) throws IOException {
if (event.getEventType() == EventType.ENTITY_DELETED) {
Database database = (Database) event.getEntity();
DeleteByQueryRequest request = new DeleteByQueryRequest(ElasticSearchIndexType.TABLE_SEARCH_INDEX.indexName);
request.setQuery(new TermQueryBuilder("database", database.getName()));
deleteEntityFromElasticSearchByQuery(request);
}
}
private void updateDatabaseSchema(ChangeEvent event) throws IOException {
if (event.getEventType() == EventType.ENTITY_DELETED) {
DatabaseSchema databaseSchema = (DatabaseSchema) event.getEntity();
DeleteByQueryRequest request = new DeleteByQueryRequest(ElasticSearchIndexType.TABLE_SEARCH_INDEX.indexName);
request.setQuery(new TermQueryBuilder("database_schema", databaseSchema.getName()));
deleteEntityFromElasticSearchByQuery(request);
}
}
private void updateDatabaseService(ChangeEvent event) throws IOException {
if (event.getEventType() == EventType.ENTITY_DELETED) {
DatabaseService databaseService = (DatabaseService) event.getEntity();
DeleteByQueryRequest request = new DeleteByQueryRequest(ElasticSearchIndexType.TABLE_SEARCH_INDEX.indexName);
request.setQuery(new TermQueryBuilder("service", databaseService.getName()));
deleteEntityFromElasticSearchByQuery(request);
}
}
private void updatePipelineService(ChangeEvent event) throws IOException {
if (event.getEventType() == EventType.ENTITY_DELETED) {
PipelineService pipelineService = (PipelineService) event.getEntity();
DeleteByQueryRequest request = new DeleteByQueryRequest(ElasticSearchIndexType.PIPELINE_SEARCH_INDEX.indexName);
request.setQuery(new TermQueryBuilder("service", pipelineService.getName()));
deleteEntityFromElasticSearchByQuery(request);
}
}
private void updateMessagingService(ChangeEvent event) throws IOException {
if (event.getEventType() == EventType.ENTITY_DELETED) {
MessagingService messagingService = (MessagingService) event.getEntity();
DeleteByQueryRequest request = new DeleteByQueryRequest(ElasticSearchIndexType.TOPIC_SEARCH_INDEX.indexName);
request.setQuery(new TermQueryBuilder("service", messagingService.getName()));
deleteEntityFromElasticSearchByQuery(request);
}
}
private void updateDashboardService(ChangeEvent event) throws IOException {
if (event.getEventType() == EventType.ENTITY_DELETED) {
DashboardService dashboardService = (DashboardService) event.getEntity();
DeleteByQueryRequest request = new DeleteByQueryRequest(ElasticSearchIndexType.DASHBOARD_SEARCH_INDEX.indexName);
request.setQuery(new TermQueryBuilder("service", dashboardService.getName()));
deleteEntityFromElasticSearchByQuery(request);
}
} }
private void scriptedUpsert(Object index, UpdateRequest updateRequest) { private void scriptedUpsert(Object index, UpdateRequest updateRequest) {
@ -417,6 +523,32 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher {
updateRequest.script(script); updateRequest.script(script);
} }
private void updateElasticSearch(UpdateRequest updateRequest) throws IOException {
if (updateRequest != null) {
LOG.debug("Sending request to ElasticSearch");
LOG.debug(updateRequest.toString());
client.update(updateRequest, RequestOptions.DEFAULT);
}
}
private void deleteEntityFromElasticSearch(DeleteRequest deleteRequest) throws IOException {
if (deleteRequest != null) {
LOG.debug("Sending request to ElasticSearch");
LOG.debug(deleteRequest.toString());
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
client.delete(deleteRequest, RequestOptions.DEFAULT);
}
}
private void deleteEntityFromElasticSearchByQuery(DeleteByQueryRequest deleteRequest) throws IOException {
if (deleteRequest != null) {
LOG.debug("Sending request to ElasticSearch");
LOG.debug(deleteRequest.toString());
deleteRequest.setRefresh(true);
client.deleteByQuery(deleteRequest, RequestOptions.DEFAULT);
}
}
public void close() { public void close() {
try { try {
this.client.close(); this.client.close();