diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchEventPublisher.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchEventPublisher.java index 49b274cee65..b15542b599a 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchEventPublisher.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchEventPublisher.java @@ -24,20 +24,30 @@ import java.util.Objects; import java.util.UUID; import lombok.extern.slf4j.Slf4j; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; import org.openmetadata.catalog.Entity; import org.openmetadata.catalog.elasticsearch.ElasticSearchIndexDefinition.ElasticSearchIndexType; import org.openmetadata.catalog.entity.data.Dashboard; +import org.openmetadata.catalog.entity.data.Database; +import org.openmetadata.catalog.entity.data.DatabaseSchema; import org.openmetadata.catalog.entity.data.GlossaryTerm; import org.openmetadata.catalog.entity.data.Pipeline; import org.openmetadata.catalog.entity.data.Table; import org.openmetadata.catalog.entity.data.Topic; +import org.openmetadata.catalog.entity.services.DashboardService; +import org.openmetadata.catalog.entity.services.DatabaseService; +import org.openmetadata.catalog.entity.services.MessagingService; +import org.openmetadata.catalog.entity.services.PipelineService; import org.openmetadata.catalog.entity.teams.Team; import org.openmetadata.catalog.entity.teams.User; import org.openmetadata.catalog.events.AbstractEventPublisher; @@ -73,37 +83,49 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher { for (ChangeEvent event : events.getData()) { try { String entityType = event.getEntityType(); - UpdateRequest updateRequest = null; switch (entityType) { case Entity.TABLE: - updateRequest = updateTable(event); + updateTable(event); break; case Entity.DASHBOARD: - updateRequest = updateDashboard(event); + updateDashboard(event); break; case Entity.TOPIC: - updateRequest = updateTopic(event); + updateTopic(event); break; case Entity.PIPELINE: - updateRequest = updatePipeline(event); + updatePipeline(event); break; case Entity.USER: - updateRequest = updateUser(event); + updateUser(event); break; case Entity.TEAM: - updateRequest = updateTeam(event); + updateTeam(event); break; case Entity.GLOSSARY_TERM: - updateRequest = updateGlossaryTerm(event); + updateGlossaryTerm(event); + break; + case Entity.DATABASE: + updateDatabase(event); + break; + case Entity.DATABASE_SCHEMA: + updateDatabaseSchema(event); + break; + case Entity.DASHBOARD_SERVICE: + updateDashboardService(event); + break; + case Entity.DATABASE_SERVICE: + updateDatabaseService(event); + break; + case Entity.MESSAGING_SERVICE: + updateMessagingService(event); + break; + case Entity.PIPELINE_SERVICE: + updatePipelineService(event); break; default: LOG.warn("Ignoring Entity Type {}", entityType); } - if (updateRequest != null) { - LOG.debug("Sending request to ElasticSearch"); - LOG.debug(updateRequest.toString()); - client.update(updateRequest, RequestOptions.DEFAULT); - } } catch (ElasticsearchException e) { LOG.error("failed to update ES doc"); LOG.debug(e.getMessage()); @@ -173,7 +195,7 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher { } } - private UpdateRequest updateTable(ChangeEvent event) throws IOException { + private void updateTable(ChangeEvent event) throws IOException { UpdateRequest updateRequest = new UpdateRequest(ElasticSearchIndexType.TABLE_SEARCH_INDEX.indexName, event.getEntityId().toString()); TableESIndex tableESIndex = null; @@ -186,6 +208,7 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher { String json = JsonUtils.pojoToJson(tableESIndex); updateRequest.doc(json, XContentType.JSON); updateRequest.docAsUpsert(true); + updateElasticSearch(updateRequest); break; case ENTITY_UPDATED: if (Objects.equals(event.getCurrentVersion(), event.getPreviousVersion())) { @@ -193,18 +216,21 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher { } else { scriptedUpsert(tableESIndex, updateRequest); } + updateElasticSearch(updateRequest); break; case ENTITY_SOFT_DELETED: softDeleteEntity(updateRequest); + updateElasticSearch(updateRequest); break; case ENTITY_DELETED: + DeleteRequest deleteRequest = + new DeleteRequest(ElasticSearchIndexType.TABLE_SEARCH_INDEX.indexName, event.getEntityId().toString()); + deleteEntityFromElasticSearch(deleteRequest); break; } - - return updateRequest; } - private UpdateRequest updateTopic(ChangeEvent event) throws IOException { + private void updateTopic(ChangeEvent event) throws IOException { UpdateRequest updateRequest = new UpdateRequest(ElasticSearchIndexType.TOPIC_SEARCH_INDEX.indexName, event.getEntityId().toString()); TopicESIndex topicESIndex = null; @@ -218,6 +244,7 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher { String json = JsonUtils.pojoToJson(topicESIndex); updateRequest.doc(json, XContentType.JSON); updateRequest.docAsUpsert(true); + updateElasticSearch(updateRequest); break; case ENTITY_UPDATED: if (Objects.equals(event.getCurrentVersion(), event.getPreviousVersion())) { @@ -225,17 +252,21 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher { } else { scriptedUpsert(topicESIndex, updateRequest); } + updateElasticSearch(updateRequest); break; case ENTITY_SOFT_DELETED: softDeleteEntity(updateRequest); + updateElasticSearch(updateRequest); break; case ENTITY_DELETED: + DeleteRequest deleteRequest = + new DeleteRequest(ElasticSearchIndexType.TOPIC_SEARCH_INDEX.indexName, event.getEntityId().toString()); + deleteEntityFromElasticSearch(deleteRequest); break; } - return updateRequest; } - private UpdateRequest updateDashboard(ChangeEvent event) throws IOException { + private void updateDashboard(ChangeEvent event) throws IOException { DashboardESIndex dashboardESIndex = null; UpdateRequest updateRequest = new UpdateRequest(ElasticSearchIndexType.DASHBOARD_SEARCH_INDEX.indexName, event.getEntityId().toString()); @@ -248,6 +279,7 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher { String json = JsonUtils.pojoToJson(dashboardESIndex); updateRequest.doc(json, XContentType.JSON); updateRequest.docAsUpsert(true); + updateElasticSearch(updateRequest); break; case ENTITY_UPDATED: if (Objects.equals(event.getCurrentVersion(), event.getPreviousVersion())) { @@ -255,17 +287,21 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher { } else { scriptedUpsert(dashboardESIndex, updateRequest); } + updateElasticSearch(updateRequest); break; case ENTITY_SOFT_DELETED: softDeleteEntity(updateRequest); + updateElasticSearch(updateRequest); break; case ENTITY_DELETED: + DeleteRequest deleteRequest = + new DeleteRequest(ElasticSearchIndexType.DASHBOARD_SEARCH_INDEX.indexName, event.getEntityId().toString()); + deleteEntityFromElasticSearch(deleteRequest); break; } - return updateRequest; } - private UpdateRequest updatePipeline(ChangeEvent event) throws IOException { + private void updatePipeline(ChangeEvent event) throws IOException { PipelineESIndex pipelineESIndex = null; if (event.getEntity() != null && event.getEventType() != EventType.ENTITY_SOFT_DELETED) { Pipeline pipeline = (Pipeline) event.getEntity(); @@ -278,6 +314,7 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher { String json = JsonUtils.pojoToJson(pipelineESIndex); updateRequest.doc(json, XContentType.JSON); updateRequest.docAsUpsert(true); + updateElasticSearch(updateRequest); break; case ENTITY_UPDATED: if (Objects.equals(event.getCurrentVersion(), event.getPreviousVersion())) { @@ -285,18 +322,21 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher { } else { scriptedUpsert(pipelineESIndex, updateRequest); } + updateElasticSearch(updateRequest); break; case ENTITY_SOFT_DELETED: softDeleteEntity(updateRequest); + updateElasticSearch(updateRequest); break; case ENTITY_DELETED: + DeleteRequest deleteRequest = + new DeleteRequest(ElasticSearchIndexType.PIPELINE_SEARCH_INDEX.indexName, event.getEntityId().toString()); + deleteEntityFromElasticSearch(deleteRequest); break; } - - return updateRequest; } - private UpdateRequest updateUser(ChangeEvent event) throws IOException { + private void updateUser(ChangeEvent event) throws IOException { UpdateRequest updateRequest = new UpdateRequest(ElasticSearchIndexType.USER_SEARCH_INDEX.indexName, event.getEntityId().toString()); UserESIndex userESIndex = null; @@ -309,21 +349,25 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher { String json = JsonUtils.pojoToJson(userESIndex); updateRequest.doc(json, XContentType.JSON); updateRequest.docAsUpsert(true); + updateElasticSearch(updateRequest); break; case ENTITY_UPDATED: scriptedUserUpsert(userESIndex, updateRequest); + updateElasticSearch(updateRequest); break; case ENTITY_SOFT_DELETED: softDeleteEntity(updateRequest); + updateElasticSearch(updateRequest); break; case ENTITY_DELETED: + DeleteRequest deleteRequest = + new DeleteRequest(ElasticSearchIndexType.USER_SEARCH_INDEX.indexName, event.getEntityId().toString()); + deleteEntityFromElasticSearch(deleteRequest); break; } - - return updateRequest; } - private UpdateRequest updateTeam(ChangeEvent event) throws IOException { + private void updateTeam(ChangeEvent event) throws IOException { UpdateRequest updateRequest = new UpdateRequest(ElasticSearchIndexType.TEAM_SEARCH_INDEX.indexName, event.getEntityId().toString()); TeamESIndex teamESIndex = null; @@ -336,21 +380,25 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher { String json = JsonUtils.pojoToJson(teamESIndex); updateRequest.doc(json, XContentType.JSON); updateRequest.docAsUpsert(true); + updateElasticSearch(updateRequest); break; case ENTITY_UPDATED: scriptedTeamUpsert(teamESIndex, updateRequest); + updateElasticSearch(updateRequest); break; case ENTITY_SOFT_DELETED: softDeleteEntity(updateRequest); + updateElasticSearch(updateRequest); break; case ENTITY_DELETED: + DeleteRequest deleteRequest = + new DeleteRequest(ElasticSearchIndexType.TEAM_SEARCH_INDEX.indexName, event.getEntityId().toString()); + deleteEntityFromElasticSearch(deleteRequest); break; } - - return updateRequest; } - private UpdateRequest updateGlossaryTerm(ChangeEvent event) throws IOException { + private void updateGlossaryTerm(ChangeEvent event) throws IOException { UpdateRequest updateRequest = new UpdateRequest(ElasticSearchIndexType.GLOSSARY_SEARCH_INDEX.indexName, event.getEntityId().toString()); GlossaryTermESIndex glossaryESIndex = null; @@ -363,18 +411,76 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher { String json = JsonUtils.pojoToJson(glossaryESIndex); updateRequest.doc(json, XContentType.JSON); updateRequest.docAsUpsert(true); + updateElasticSearch(updateRequest); break; case ENTITY_UPDATED: scriptedUpsert(glossaryESIndex, updateRequest); + updateElasticSearch(updateRequest); break; case ENTITY_SOFT_DELETED: softDeleteEntity(updateRequest); + updateElasticSearch(updateRequest); break; case ENTITY_DELETED: + DeleteRequest deleteRequest = + new DeleteRequest(ElasticSearchIndexType.GLOSSARY_SEARCH_INDEX.indexName, event.getEntityId().toString()); + deleteEntityFromElasticSearch(deleteRequest); break; } + } - return updateRequest; + private void updateDatabase(ChangeEvent event) throws IOException { + if (event.getEventType() == EventType.ENTITY_DELETED) { + Database database = (Database) event.getEntity(); + DeleteByQueryRequest request = new DeleteByQueryRequest(ElasticSearchIndexType.TABLE_SEARCH_INDEX.indexName); + request.setQuery(new TermQueryBuilder("database", database.getName())); + deleteEntityFromElasticSearchByQuery(request); + } + } + + private void updateDatabaseSchema(ChangeEvent event) throws IOException { + if (event.getEventType() == EventType.ENTITY_DELETED) { + DatabaseSchema databaseSchema = (DatabaseSchema) event.getEntity(); + DeleteByQueryRequest request = new DeleteByQueryRequest(ElasticSearchIndexType.TABLE_SEARCH_INDEX.indexName); + request.setQuery(new TermQueryBuilder("database_schema", databaseSchema.getName())); + deleteEntityFromElasticSearchByQuery(request); + } + } + + private void updateDatabaseService(ChangeEvent event) throws IOException { + if (event.getEventType() == EventType.ENTITY_DELETED) { + DatabaseService databaseService = (DatabaseService) event.getEntity(); + DeleteByQueryRequest request = new DeleteByQueryRequest(ElasticSearchIndexType.TABLE_SEARCH_INDEX.indexName); + request.setQuery(new TermQueryBuilder("service", databaseService.getName())); + deleteEntityFromElasticSearchByQuery(request); + } + } + + private void updatePipelineService(ChangeEvent event) throws IOException { + if (event.getEventType() == EventType.ENTITY_DELETED) { + PipelineService pipelineService = (PipelineService) event.getEntity(); + DeleteByQueryRequest request = new DeleteByQueryRequest(ElasticSearchIndexType.PIPELINE_SEARCH_INDEX.indexName); + request.setQuery(new TermQueryBuilder("service", pipelineService.getName())); + deleteEntityFromElasticSearchByQuery(request); + } + } + + private void updateMessagingService(ChangeEvent event) throws IOException { + if (event.getEventType() == EventType.ENTITY_DELETED) { + MessagingService messagingService = (MessagingService) event.getEntity(); + DeleteByQueryRequest request = new DeleteByQueryRequest(ElasticSearchIndexType.TOPIC_SEARCH_INDEX.indexName); + request.setQuery(new TermQueryBuilder("service", messagingService.getName())); + deleteEntityFromElasticSearchByQuery(request); + } + } + + private void updateDashboardService(ChangeEvent event) throws IOException { + if (event.getEventType() == EventType.ENTITY_DELETED) { + DashboardService dashboardService = (DashboardService) event.getEntity(); + DeleteByQueryRequest request = new DeleteByQueryRequest(ElasticSearchIndexType.DASHBOARD_SEARCH_INDEX.indexName); + request.setQuery(new TermQueryBuilder("service", dashboardService.getName())); + deleteEntityFromElasticSearchByQuery(request); + } } private void scriptedUpsert(Object index, UpdateRequest updateRequest) { @@ -417,6 +523,32 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher { updateRequest.script(script); } + private void updateElasticSearch(UpdateRequest updateRequest) throws IOException { + if (updateRequest != null) { + LOG.debug("Sending request to ElasticSearch"); + LOG.debug(updateRequest.toString()); + client.update(updateRequest, RequestOptions.DEFAULT); + } + } + + private void deleteEntityFromElasticSearch(DeleteRequest deleteRequest) throws IOException { + if (deleteRequest != null) { + LOG.debug("Sending request to ElasticSearch"); + LOG.debug(deleteRequest.toString()); + deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); + client.delete(deleteRequest, RequestOptions.DEFAULT); + } + } + + private void deleteEntityFromElasticSearchByQuery(DeleteByQueryRequest deleteRequest) throws IOException { + if (deleteRequest != null) { + LOG.debug("Sending request to ElasticSearch"); + LOG.debug(deleteRequest.toString()); + deleteRequest.setRefresh(true); + client.deleteByQuery(deleteRequest, RequestOptions.DEFAULT); + } + } + public void close() { try { this.client.close();