diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/ElasticSearchIndexDefinition.java b/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/ElasticSearchIndexDefinition.java index 1498012f5c3..062940fe021 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/ElasticSearchIndexDefinition.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/ElasticSearchIndexDefinition.java @@ -87,7 +87,7 @@ public class ElasticSearchIndexDefinition { return exists; } - private boolean createIndex(ElasticSearchIndexType elasticSearchIndexType) { + public boolean createIndex(ElasticSearchIndexType elasticSearchIndexType) { try { GetIndexRequest gRequest = new GetIndexRequest(elasticSearchIndexType.indexName); gRequest.local(false); @@ -132,7 +132,7 @@ public class ElasticSearchIndexDefinition { } } - private void deleteIndex(ElasticSearchIndexType elasticSearchIndexType) { + public void deleteIndex(ElasticSearchIndexType elasticSearchIndexType) { try { GetIndexRequest gRequest = new GetIndexRequest(elasticSearchIndexType.indexName); gRequest.local(false); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/ElasticSearchIndexFactory.java b/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/ElasticSearchIndexFactory.java new file mode 100644 index 00000000000..30d45f6a480 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/ElasticSearchIndexFactory.java @@ -0,0 +1,43 @@ +package org.openmetadata.service.elasticsearch; + +import lombok.extern.slf4j.Slf4j; +import org.openmetadata.schema.EntityInterface; +import org.openmetadata.schema.entity.data.Dashboard; +import org.openmetadata.schema.entity.data.GlossaryTerm; +import org.openmetadata.schema.entity.data.MlModel; +import org.openmetadata.schema.entity.data.Pipeline; +import org.openmetadata.schema.entity.data.Table; +import org.openmetadata.schema.entity.data.Topic; +import org.openmetadata.schema.entity.tags.Tag; +import org.openmetadata.schema.entity.teams.Team; +import org.openmetadata.schema.entity.teams.User; +import org.openmetadata.service.Entity; + +@Slf4j +public class ElasticSearchIndexFactory { + public static ElasticSearchIndex buildIndex(String entityType, EntityInterface entity) { + switch (entityType) { + case Entity.TABLE: + return new TableIndex((Table) entity); + case Entity.DASHBOARD: + return new DashboardIndex((Dashboard) entity); + case Entity.TOPIC: + return new TopicIndex((Topic) entity); + case Entity.PIPELINE: + return new PipelineIndex((Pipeline) entity); + case Entity.USER: + return new UserIndex((User) entity); + case Entity.TEAM: + return new TeamIndex((Team) entity); + case Entity.GLOSSARY_TERM: + return new GlossaryTermIndex((GlossaryTerm) entity); + case Entity.MLMODEL: + return new MlModelIndex((MlModel) entity); + case Entity.TAG: + return new TagIndex((Tag) entity); + default: + LOG.warn("Ignoring Entity Type {}", entityType); + } + return null; + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/MlModelIndex.java b/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/MlModelIndex.java index af55fb85e08..8738601be1d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/MlModelIndex.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/MlModelIndex.java @@ -7,7 +7,7 @@ import org.openmetadata.schema.entity.data.MlModel; import org.openmetadata.service.Entity; import org.openmetadata.service.util.JsonUtils; -public class MlModelIndex { +public class MlModelIndex implements ElasticSearchIndex { MlModel mlModel; public MlModelIndex(MlModel mlModel) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/UserIndex.java b/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/UserIndex.java index 1fc0f072b86..4b799b1eb38 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/UserIndex.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/UserIndex.java @@ -7,7 +7,7 @@ import org.openmetadata.schema.entity.teams.User; import org.openmetadata.service.Entity; import org.openmetadata.service.util.JsonUtils; -public class UserIndex { +public class UserIndex implements ElasticSearchIndex { User user; final List excludeFields = List.of("owns", "changeDescription"); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java index fb7da0eafc0..bcdd64f234d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java @@ -2983,6 +2983,12 @@ public interface CollectionDAO { + "ORDER BY timestamp DESC LIMIT 1") String getLatestExtension(@Bind("entityFQN") String entityFQN, @Bind("extension") String extension); + @SqlQuery( + "SELECT json FROM entity_extension_time_series WHERE entityFQN = :entityFQN AND extension = :extension " + + "ORDER BY timestamp DESC LIMIT :limit") + List getLastLatestExtension( + @Bind("entityFQN") String entityFQN, @Bind("extension") String extension, @Bind("limit") int limit); + @RegisterRowMapper(ExtensionMapper.class) @SqlQuery( "SELECT extension, json FROM entity_extension WHERE id = :id AND extension " diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/elasticSearch/BuildSearchIndexResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/elasticSearch/BuildSearchIndexResource.java new file mode 100644 index 00000000000..4bc2e1d957d --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/elasticSearch/BuildSearchIndexResource.java @@ -0,0 +1,335 @@ +package org.openmetadata.service.resources.elasticSearch; + +import static org.openmetadata.service.Entity.DASHBOARD; +import static org.openmetadata.service.Entity.GLOSSARY_TERM; +import static org.openmetadata.service.Entity.MLMODEL; +import static org.openmetadata.service.Entity.PIPELINE; +import static org.openmetadata.service.Entity.TABLE; +import static org.openmetadata.service.Entity.TAG; +import static org.openmetadata.service.Entity.TEAM; +import static org.openmetadata.service.Entity.TOPIC; +import static org.openmetadata.service.Entity.USER; + +import com.fasterxml.jackson.core.JsonProcessingException; +import io.swagger.annotations.Api; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import java.io.IOException; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.Date; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.BiConsumer; +import javax.validation.Valid; +import javax.validation.constraints.Max; +import javax.validation.constraints.Min; +import javax.ws.rs.Consumes; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.SecurityContext; +import javax.ws.rs.core.UriInfo; +import lombok.extern.slf4j.Slf4j; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BackoffPolicy; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentType; +import org.openmetadata.schema.EntityInterface; +import org.openmetadata.schema.api.CreateEventPublisherJob; +import org.openmetadata.schema.api.CreateEventPublisherJob.RunMode; +import org.openmetadata.schema.api.configuration.elasticsearch.ElasticSearchConfiguration; +import org.openmetadata.schema.entity.data.Table; +import org.openmetadata.schema.settings.EventPublisherJob; +import org.openmetadata.schema.settings.FailureDetails; +import org.openmetadata.schema.type.Include; +import org.openmetadata.service.Entity; +import org.openmetadata.service.elasticsearch.ElasticSearchIndexDefinition; +import org.openmetadata.service.elasticsearch.ElasticSearchIndexFactory; +import org.openmetadata.service.jdbi3.CollectionDAO; +import org.openmetadata.service.jdbi3.EntityRepository; +import org.openmetadata.service.jdbi3.ListFilter; +import org.openmetadata.service.resources.Collection; +import org.openmetadata.service.security.Authorizer; +import org.openmetadata.service.util.ConfigurationHolder; +import org.openmetadata.service.util.ElasticSearchClientUtils; +import org.openmetadata.service.util.EntityUtil; +import org.openmetadata.service.util.JsonUtils; +import org.openmetadata.service.util.ResultList; + +@Path("/v1/indexResource") +@Api(value = "Elastic Search Collection", tags = "Elastic Search Collection") +@Produces(MediaType.APPLICATION_JSON) +@Consumes(MediaType.APPLICATION_JSON) +@Collection(name = "indexResource") +@Slf4j +public class BuildSearchIndexResource { + public static final String ELASTIC_SEARCH_EXTENSION = "service.eventPublisher"; + public static final String ELASTIC_SEARCH_ENTITY_FQN = "eventPublisher:ElasticSearch"; + private final RestHighLevelClient client; + private final ElasticSearchIndexDefinition elasticSearchIndexDefinition; + private final CollectionDAO dao; + private final Authorizer authorizer; + private final BulkProcessorListener elasticSearchBulkProcessorListener; + private final BulkProcessor bulkProcessor; + private final ExecutorService threadScheduler = Executors.newFixedThreadPool(2); + + public BuildSearchIndexResource(CollectionDAO dao, Authorizer authorizer) { + this.client = + ElasticSearchClientUtils.createElasticSearchClient( + ConfigurationHolder.getInstance() + .getConfig( + ConfigurationHolder.ConfigurationType.ELASTICSEARCHCONFIG, ElasticSearchConfiguration.class)); + this.dao = dao; + this.authorizer = authorizer; + this.elasticSearchIndexDefinition = new ElasticSearchIndexDefinition(client); + this.elasticSearchBulkProcessorListener = new BulkProcessorListener(dao); + BiConsumer> bulkConsumer = + (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener); + // Setup a bulk Processor + BulkProcessor.Builder builder = + BulkProcessor.builder(bulkConsumer, elasticSearchBulkProcessorListener, "es-reindex"); + builder.setBulkActions(100); + builder.setConcurrentRequests(2); + builder.setFlushInterval(TimeValue.timeValueSeconds(60L)); + builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3)); + this.bulkProcessor = builder.build(); + } + + @POST + @Path("/reindex") + @Operation( + operationId = "reindexEntities", + summary = "Reindex Entities", + tags = "indexResource", + description = "Reindex Elastic Search Entities", + responses = { + @ApiResponse(responseCode = "200", description = "Success"), + @ApiResponse(responseCode = "404", description = "Bot for instance {id} is not found") + }) + public Response reindexAllEntities( + @Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid CreateEventPublisherJob createRequest) + throws IOException { + // Only admins can issue a reindex request + authorizer.authorizeAdmin(securityContext, false); + String startedBy = securityContext.getUserPrincipal().getName(); + return startReindexing(uriInfo, startedBy, createRequest); + } + + @GET + @Path("/reindex/status") + @Operation( + operationId = "getReindexAllLastJobStatus", + summary = "Get Last Run Reindex All Job Status", + tags = "indexResource", + description = "Reindex All job last status", + responses = { + @ApiResponse(responseCode = "200", description = "Success"), + @ApiResponse(responseCode = "404", description = "Bot for instance {id} is not found") + }) + public Response reindexAllJobLastStatus( + @Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Parameter(description = "Limit the number users returned. (1 to 1000000, default = 10)") + @DefaultValue("10") + @Min(0) + @Max(1000000) + @QueryParam("limit") + int limitParam) + throws IOException { + // Only admins can issue a reindex request + authorizer.authorizeAdmin(securityContext, false); + // Check if there is a running job for reindex for requested entity + List records = + dao.entityExtensionTimeSeriesDao() + .getLastLatestExtension(ELASTIC_SEARCH_ENTITY_FQN, ELASTIC_SEARCH_EXTENSION, limitParam); + if (records != null) { + return Response.status(Response.Status.OK) + .entity(JsonUtils.readObjects(records, EventPublisherJob.class)) + .build(); + } + return Response.status(Response.Status.NOT_FOUND).entity("No Last Run.").build(); + } + + private synchronized Response startReindexing( + UriInfo uriInfo, String startedBy, CreateEventPublisherJob createRequest) throws IOException { + String reindexJobString = + dao.entityExtensionTimeSeriesDao().getLatestExtension(ELASTIC_SEARCH_ENTITY_FQN, ELASTIC_SEARCH_EXTENSION); + EventPublisherJob reindexJob = JsonUtils.readValue(reindexJobString, EventPublisherJob.class); + if (reindexJob != null + && ((System.currentTimeMillis() - reindexJob.getTimestamp() > 3600000) + || reindexJob.getStatus() == EventPublisherJob.Status.SUCCESS)) { + return Response.status(Response.Status.FORBIDDEN) + .entity("Reindexing is Running Already. Cannot issue new request.") + .build(); + } else { + // create a new Job + Long startTime = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime(); + EventPublisherJob newJob = + new EventPublisherJob() + .withName(createRequest.getName()) + .withPublisherType(createRequest.getPublisherType()) + .withRunMode(createRequest.getRunMode()) + .withStatus(EventPublisherJob.Status.RUNNING) + .withTimestamp(startTime) + .withStartedBy(startedBy) + .withStartTime(startTime) + .withEntities(createRequest.getEntities()); + + dao.entityExtensionTimeSeriesDao() + .insert( + ELASTIC_SEARCH_ENTITY_FQN, ELASTIC_SEARCH_EXTENSION, "eventPublisherJob", JsonUtils.pojoToJson(newJob)); + + // Update Listener for only Batch + if (createRequest.getRunMode() == RunMode.BATCH) { + elasticSearchBulkProcessorListener.setRequestIssuer(startedBy); + elasticSearchBulkProcessorListener.setCreateRequest(createRequest); + elasticSearchBulkProcessorListener.setEntityFQN(ELASTIC_SEARCH_ENTITY_FQN); + elasticSearchBulkProcessorListener.setStartTime(startTime); + elasticSearchBulkProcessorListener.resetCounters(); + } + + // Start Full Reindexing + threadScheduler.submit( + () -> { + try { + if (createRequest.getEntities().contains("all")) { + updateEntity(uriInfo, TABLE, createRequest); + updateEntity(uriInfo, TOPIC, createRequest); + updateEntity(uriInfo, DASHBOARD, createRequest); + updateEntity(uriInfo, PIPELINE, createRequest); + updateEntity(uriInfo, USER, createRequest); + updateEntity(uriInfo, TEAM, createRequest); + updateEntity(uriInfo, GLOSSARY_TERM, createRequest); + updateEntity(uriInfo, MLMODEL, createRequest); + updateEntity(uriInfo, TAG, createRequest); + } else { + for (String entityName : createRequest.getEntities()) { + updateEntity(uriInfo, entityName, createRequest); + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + return Response.status(Response.Status.OK).entity("Reindexing Started").build(); + } + } + + private synchronized void updateEntity(UriInfo uriInfo, String entityType, CreateEventPublisherJob createRequest) + throws IOException { + elasticSearchBulkProcessorListener.allowTotalRequestUpdate(); + + ElasticSearchIndexDefinition.ElasticSearchIndexType indexType = + elasticSearchIndexDefinition.getIndexMappingByEntityType(entityType); + + if (createRequest.getRecreateIndex()) { + // Delete index + elasticSearchIndexDefinition.deleteIndex(indexType); + // Create index + elasticSearchIndexDefinition.createIndex(indexType); + } + + // Start fetching a list of Entities and pushing them to ES + EntityRepository entityRepository = Entity.getEntityRepository(entityType); + List allowedFields = entityRepository.getAllowedFields(); + String fields = String.join(",", allowedFields); + ResultList result; + String after = null; + do { + result = + entityRepository.listAfter( + uriInfo, + new EntityUtil.Fields(allowedFields, fields), + new ListFilter(Include.ALL), + createRequest.getBatchSize(), + after); + if (createRequest.getRunMode() == RunMode.BATCH) { + elasticSearchBulkProcessorListener.addRequests(result.getPaging().getTotal()); + updateElasticSearchForEntityBatch(entityType, result.getData()); + } else { + updateElasticSearchForEntityStream(entityType, result.getData()); + } + after = result.getPaging().getAfter(); + } while (after != null); + + if (createRequest.getRunMode() == RunMode.STREAM) { + String reindexJobString = + dao.entityExtensionTimeSeriesDao().getLatestExtension(ELASTIC_SEARCH_ENTITY_FQN, ELASTIC_SEARCH_EXTENSION); + EventPublisherJob latestJob = JsonUtils.readValue(reindexJobString, EventPublisherJob.class); + long lastUpdateTime = latestJob.getTimestamp(); + Long time = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime(); + latestJob.setTimestamp(time); + latestJob.setEndTime(time); + if (latestJob.getFailureDetails() != null) { + latestJob.setStatus(EventPublisherJob.Status.FAILED); + } else { + latestJob.setStatus(EventPublisherJob.Status.SUCCESS); + } + dao.entityExtensionTimeSeriesDao() + .update(ELASTIC_SEARCH_ENTITY_FQN, ELASTIC_SEARCH_EXTENSION, JsonUtils.pojoToJson(latestJob), lastUpdateTime); + } + } + + private synchronized void updateElasticSearchForEntityBatch(String entityType, List entities) + throws IOException { + for (EntityInterface entity : entities) { + if (entityType.equals(TABLE)) { + ((Table) entity).getColumns().forEach(table -> table.setProfile(null)); + } + bulkProcessor.add(getUpdateRequest(entityType, entity)); + } + } + + private synchronized void updateElasticSearchForEntityStream(String entityType, List entities) + throws IOException { + String reindexJobString = + dao.entityExtensionTimeSeriesDao().getLatestExtension(ELASTIC_SEARCH_ENTITY_FQN, ELASTIC_SEARCH_EXTENSION); + EventPublisherJob latestJob = JsonUtils.readValue(reindexJobString, EventPublisherJob.class); + Long lastUpdateTime = latestJob.getTimestamp(); + for (EntityInterface entity : entities) { + if (entityType.equals(TABLE)) { + ((Table) entity).getColumns().forEach(table -> table.setProfile(null)); + } + FailureDetails failureDetails = null; + Long time = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime(); + try { + client.update(getUpdateRequest(entityType, entity), RequestOptions.DEFAULT); + } catch (IOException ex) { + failureDetails = new FailureDetails().withLastFailedAt(time).withLastFailedReason(ex.getMessage()); + latestJob.setFailureDetails(failureDetails); + } + latestJob.setTimestamp(time); + dao.entityExtensionTimeSeriesDao() + .update(ELASTIC_SEARCH_ENTITY_FQN, ELASTIC_SEARCH_EXTENSION, JsonUtils.pojoToJson(latestJob), lastUpdateTime); + lastUpdateTime = time; + } + } + + private UpdateRequest getUpdateRequest(String entityType, EntityInterface entity) throws JsonProcessingException { + UpdateRequest updateRequest = + new UpdateRequest( + elasticSearchIndexDefinition.getIndexMappingByEntityType(entityType).indexName, entity.getId().toString()); + updateRequest.doc( + JsonUtils.pojoToJson( + Objects.requireNonNull(ElasticSearchIndexFactory.buildIndex(entityType, entity)).buildESDoc()), + XContentType.JSON); + updateRequest.docAsUpsert(true); + return updateRequest; + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/elasticSearch/BulkProcessorListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/elasticSearch/BulkProcessorListener.java new file mode 100644 index 00000000000..27f345499ff --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/elasticSearch/BulkProcessorListener.java @@ -0,0 +1,143 @@ +package org.openmetadata.service.resources.elasticSearch; + +import static org.openmetadata.service.resources.elasticSearch.BuildSearchIndexResource.ELASTIC_SEARCH_EXTENSION; + +import com.fasterxml.jackson.core.JsonProcessingException; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.Date; +import lombok.extern.slf4j.Slf4j; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.openmetadata.schema.api.CreateEventPublisherJob; +import org.openmetadata.schema.settings.EventPublisherJob; +import org.openmetadata.schema.settings.FailureDetails; +import org.openmetadata.schema.settings.Stats; +import org.openmetadata.service.jdbi3.CollectionDAO; +import org.openmetadata.service.util.JsonUtils; + +@Slf4j +public class BulkProcessorListener implements BulkProcessor.Listener { + private volatile boolean updateTotalRequest = true; + private volatile int totalSuccessCount = 0; + private volatile int totalFailedCount = 0; + private volatile int totalRequests = 0; + + private CreateEventPublisherJob createRequest; + private String requestIssuer = "anonymous"; + private String entityFQN; + private final CollectionDAO dao; + private Long startTime; + private Long originalStartTime; + + public BulkProcessorListener(CollectionDAO dao) { + this.dao = dao; + } + + @Override + public void beforeBulk(long executionId, BulkRequest bulkRequest) { + int numberOfActions = bulkRequest.numberOfActions(); + LOG.info("Executing bulk [{}] with {} requests", executionId, numberOfActions); + } + + @Override + public void afterBulk(long executionId, BulkRequest bulkRequest, BulkResponse bulkResponse) { + int failedCount = 0; + int successCount; + FailureDetails failureDetails = null; + for (BulkItemResponse bulkItemResponse : bulkResponse) { + if (bulkItemResponse.isFailed()) { + BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); + failureDetails = new FailureDetails(); + failureDetails.setLastFailedAt( + Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime()); + failureDetails.setLastFailedReason( + String.format("ID [%s]. Reason : %s", failure.getId(), failure.getMessage())); + failedCount++; + } + } + successCount = bulkResponse.getItems().length - failedCount; + updateFailedAndSuccess(failedCount, successCount); + // update stats in DB + Long time = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime(); + EventPublisherJob updateJob = + new EventPublisherJob() + .withName(createRequest.getName()) + .withPublisherType(createRequest.getPublisherType()) + .withRunMode(createRequest.getRunMode()) + .withStatus(EventPublisherJob.Status.RUNNING) + .withTimestamp(time) + .withStats(new Stats().withFailed(totalFailedCount).withSuccess(totalSuccessCount).withTotal(totalRequests)) + .withStartedBy(requestIssuer) + .withFailureDetails(failureDetails) + .withStartTime(originalStartTime) + .withEntities(createRequest.getEntities()); + if (totalRequests == totalFailedCount + totalSuccessCount) { + updateJob.setStatus(EventPublisherJob.Status.SUCCESS); + updateJob.setEndTime(time); + } + try { + dao.entityExtensionTimeSeriesDao() + .update(entityFQN, ELASTIC_SEARCH_EXTENSION, JsonUtils.pojoToJson(updateJob), startTime); + } catch (JsonProcessingException e) { + LOG.error("Failed in Converting to Json."); + } + startTime = time; + } + + @Override + public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable throwable) { + LOG.error("Failed to execute bulk", throwable); + updateFailedAndSuccess(bulkRequest.numberOfActions(), 0); + } + + public String getEntityFQN() { + return entityFQN; + } + + public void setRequestIssuer(String adminName) { + this.requestIssuer = adminName; + } + + public void setEntityFQN(String entityFQN) { + this.entityFQN = entityFQN; + } + + public synchronized void addRequests(int count) { + if (updateTotalRequest) { + totalRequests += count; + } + updateTotalRequest = false; + } + + public synchronized void allowTotalRequestUpdate() { + updateTotalRequest = true; + } + + public synchronized void resetCounters() { + totalRequests = 0; + totalFailedCount = 0; + totalSuccessCount = 0; + updateTotalRequest = true; + } + + public synchronized void updateFailedAndSuccess(int failedCount, int successCount) { + totalFailedCount += failedCount; + totalSuccessCount += successCount; + } + + public void setStartTime(Long time) { + this.startTime = time; + this.originalStartTime = time; + } + + public CreateEventPublisherJob getCreateRequest() { + return createRequest; + } + + public void setCreateRequest(CreateEventPublisherJob createRequest) { + this.createRequest = createRequest; + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/ConfigurationHolder.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/ConfigurationHolder.java index c8d20d04ad2..707a842a784 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/util/ConfigurationHolder.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/ConfigurationHolder.java @@ -11,7 +11,8 @@ public class ConfigurationHolder { public enum ConfigurationType { AUTHORIZERCONFIG("authorizerConfiguration"), AUTHENTICATIONCONFIG("authenticationConfiguration"), - SMTPCONFIG("email"); + SMTPCONFIG("email"), + ELASTICSEARCHCONFIG("elasticsearch"); private String value; ConfigurationType(String value) { @@ -41,27 +42,34 @@ public class ConfigurationHolder { private final ConcurrentHashMap CONFIG_MAP = new ConcurrentHashMap<>(); public void init(OpenMetadataApplicationConfig config) { - for (Field field : OpenMetadataApplicationConfig.class.getDeclaredFields()) { - if (field.isAnnotationPresent(JsonProperty.class)) { - String configType = field.getAnnotation(JsonProperty.class).value(); - if (configType != null && !configType.equals("")) { - ConfigurationType configTypeForEnum = ConfigurationType.fromValue(configType); - if (configTypeForEnum == null) continue; - switch (configTypeForEnum) { - case AUTHORIZERCONFIG: - CONFIG_MAP.put(ConfigurationType.AUTHORIZERCONFIG, config.getAuthorizerConfiguration()); - break; - case AUTHENTICATIONCONFIG: - CONFIG_MAP.put(ConfigurationType.AUTHENTICATIONCONFIG, config.getAuthenticationConfiguration()); - break; - case SMTPCONFIG: - CONFIG_MAP.put(ConfigurationType.SMTPCONFIG, config.getSmtpSettings()); - break; - default: - LOG.info("Currently AuthorizerConfig, AuthenticatioConfig, and SMTP these can be added"); + try { + for (Field field : OpenMetadataApplicationConfig.class.getDeclaredFields()) { + if (field.isAnnotationPresent(JsonProperty.class)) { + String configType = field.getAnnotation(JsonProperty.class).value(); + if (configType != null && !configType.equals("")) { + ConfigurationType configTypeForEnum = ConfigurationType.fromValue(configType); + if (configTypeForEnum == null) continue; + switch (configTypeForEnum) { + case AUTHORIZERCONFIG: + CONFIG_MAP.put(ConfigurationType.AUTHORIZERCONFIG, config.getAuthorizerConfiguration()); + break; + case AUTHENTICATIONCONFIG: + CONFIG_MAP.put(ConfigurationType.AUTHENTICATIONCONFIG, config.getAuthenticationConfiguration()); + break; + case SMTPCONFIG: + CONFIG_MAP.put(ConfigurationType.SMTPCONFIG, config.getSmtpSettings()); + break; + case ELASTICSEARCHCONFIG: + CONFIG_MAP.put(ConfigurationType.ELASTICSEARCHCONFIG, config.getElasticSearchConfiguration()); + break; + default: + LOG.info("Currently AuthorizerConfig, AuthenticatioConfig, SMTP and ES these can be added"); + } } } } + } catch (Exception ex) { + LOG.error("Failed in initialising Configuration Holder : Reason : {}", ex.getMessage()); } } diff --git a/openmetadata-service/src/test/resources/openmetadata-secure-test.yaml b/openmetadata-service/src/test/resources/openmetadata-secure-test.yaml index ebcf115b2be..3a428b96260 100644 --- a/openmetadata-service/src/test/resources/openmetadata-secure-test.yaml +++ b/openmetadata-service/src/test/resources/openmetadata-secure-test.yaml @@ -179,5 +179,17 @@ email: password: "" transportationStrategy: "SMTP_TLS" +elasticsearch: + host: "localhost" + port: "9200" + scheme: "http" + username: "" + password: "" + truststorePath: "" + truststorePassword: "" + connectionTimeoutSecs: 5 + socketTimeoutSecs: 60 + batchSize: 10 + slackChat: slackUrl: "http://localhost:8080" diff --git a/openmetadata-spec/src/main/resources/json/schema/api/createEventPublisherJob.json b/openmetadata-spec/src/main/resources/json/schema/api/createEventPublisherJob.json new file mode 100644 index 00000000000..f07206aecd8 --- /dev/null +++ b/openmetadata-spec/src/main/resources/json/schema/api/createEventPublisherJob.json @@ -0,0 +1,41 @@ +{ + "$id": "https://open-metadata.org/schema/entity/feed/createEventPublisherJob.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "CreateEventPublisherJob", + "description": "This schema defines Event Publisher Run Result.", + "type": "object", + "javaType": "org.openmetadata.schema.api.CreateEventPublisherJob", + "properties": { + "name": { + "description": "Name of the Job", + "type": "string" + }, + "publisherType": { + "$ref": "../settings/eventPublisherJob.json#/definitions/publisherType" + }, + "runMode": { + "$ref": "../settings/eventPublisherJob.json#/definitions/runMode" + }, + "entities": { + "description": "List of Entities to Reindex", + "type": "array", + "items": { + "type": "string" + }, + "default": ["all"], + "uniqueItems": true + }, + "recreateIndex": { + "description": "This schema publisher run modes.", + "type": "boolean", + "default": false + }, + "batchSize": { + "description": "Maximum number of events sent in a batch (Default 10).", + "type": "integer", + "default": 100 + } + }, + "required": ["name", "publisherType","runMode"], + "additionalProperties": false +} diff --git a/openmetadata-spec/src/main/resources/json/schema/settings/eventPublisherJob.json b/openmetadata-spec/src/main/resources/json/schema/settings/eventPublisherJob.json new file mode 100644 index 00000000000..0054ad69f7c --- /dev/null +++ b/openmetadata-spec/src/main/resources/json/schema/settings/eventPublisherJob.json @@ -0,0 +1,110 @@ +{ + "$id": "https://open-metadata.org/schema/entity/feed/eventPublisherResult.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "EventPublisherResult", + "description": "This schema defines Event Publisher Run Result.", + "type": "object", + "javaType": "org.openmetadata.schema.settings.EventPublisherJob", + "definitions": { + "stats": { + "type": "object", + "properties": { + "success": { + "description": "Count of Success Record", + "type": "integer", + "default": 0 + }, + "failed": { + "description": "Count of Failed Records", + "type": "integer", + "default": 0 + }, + "total": { + "description": "Count of Failed Records", + "type": "integer", + "default": 0 + } + }, + "additionalProperties": false + }, + "runMode": { + "description": "This schema publisher run modes.", + "type": "string", + "enum": [ + "stream", + "batch" + ] + }, + "publisherType": { + "description": "This schema event Publisher Types", + "type": "string", + "enum": [ + "elasticSearch", + "kafka" + ] + } + }, + "properties": { + "name": { + "description": "Name of the result", + "type": "string" + }, + "startedBy": { + "description": "Job started by", + "type": "string" + }, + "publisherType": { + "$ref": "#/definitions/publisherType" + }, + "runMode": { + "$ref": "#/definitions/runMode" + }, + "timestamp": { + "$ref": "../type/basic.json#/definitions/timestamp" + }, + "startTime": { + "$ref": "../type/basic.json#/definitions/timestamp" + }, + "endTime": { + "$ref": "../type/basic.json#/definitions/timestamp" + }, + "status": { + "description": "This schema publisher run job status.", + "type": "string", + "enum": [ + "Failed", + "Success", + "Aborted", + "Running" + ] + }, + "failureDetails": { + "description": "Failure details are set only when `status` is not `success`.", + "type": "object", + "properties": { + "lastFailedAt": { + "description": "Last non-successful callback time in UNIX UTC epoch time in milliseconds.", + "$ref": "../type/basic.json#/definitions/timestamp" + }, + "lastFailedReason": { + "description": "Last non-successful activity response reason received during callback.", + "type": "string" + } + }, + "additionalProperties": false + }, + "stats": { + "$ref": "#/definitions/stats" + }, + "entities": { + "description": "List of Entities to Reindex", + "type": "array", + "items": { + "type": "string" + }, + "uniqueItems": true + } + }, + "required": ["name", "publisherType","runMode", "timestamp", "status"], + "additionalProperties": false +}