Add Pipeline Status to Ingestion Pipeline Index (#15918)

This commit is contained in:
Mohit Yadav 2024-04-16 21:40:24 +05:30 committed by GitHub
parent 6349adb0ec
commit c7e03471a1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 20 additions and 5 deletions

View File

@ -325,7 +325,6 @@ public abstract class EntityRepository<T extends EntityInterface> {
* operations. It is also used during PUT and PATCH operations to set up fields that can be updated. * operations. It is also used during PUT and PATCH operations to set up fields that can be updated.
*/ */
protected abstract void clearFields(T entity, Fields fields); protected abstract void clearFields(T entity, Fields fields);
;
/** /**
* This method is used for validating an entity to be created during POST, PUT, and PATCH operations and prepare the * This method is used for validating an entity to be created during POST, PUT, and PATCH operations and prepare the

View File

@ -84,6 +84,10 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
if (ingestionPipeline.getService() == null) { if (ingestionPipeline.getService() == null) {
ingestionPipeline.withService(getContainer(ingestionPipeline.getId())); ingestionPipeline.withService(getContainer(ingestionPipeline.getId()));
} }
ingestionPipeline.setPipelineStatuses(
fields.contains("pipelineStatuses")
? getLatestPipelineStatus(ingestionPipeline)
: ingestionPipeline.getPipelineStatuses());
} }
@Override @Override
@ -188,7 +192,7 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
public RestUtil.PutResponse<?> addPipelineStatus( public RestUtil.PutResponse<?> addPipelineStatus(
UriInfo uriInfo, String fqn, PipelineStatus pipelineStatus) { UriInfo uriInfo, String fqn, PipelineStatus pipelineStatus) {
// Validate the request content // Validate the request content
IngestionPipeline ingestionPipeline = findByName(fqn, Include.NON_DELETED); IngestionPipeline ingestionPipeline = getByName(uriInfo, fqn, getFields("service"));
PipelineStatus storedPipelineStatus = PipelineStatus storedPipelineStatus =
JsonUtils.readValue( JsonUtils.readValue(
daoCollection daoCollection
@ -220,6 +224,11 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
ChangeDescription change = ChangeDescription change =
addPipelineStatusChangeDescription( addPipelineStatusChangeDescription(
ingestionPipeline.getVersion(), pipelineStatus, storedPipelineStatus); ingestionPipeline.getVersion(), pipelineStatus, storedPipelineStatus);
ingestionPipeline.setPipelineStatuses(pipelineStatus);
// Update ES Indexes
searchRepository.updateEntity(ingestionPipeline);
ChangeEvent changeEvent = ChangeEvent changeEvent =
getChangeEvent( getChangeEvent(
withHref(uriInfo, ingestionPipeline), withHref(uriInfo, ingestionPipeline),

View File

@ -1,9 +1,10 @@
package org.openmetadata.service.search.indexes; package org.openmetadata.service.search.indexes;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline; import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.service.Entity; import org.openmetadata.service.Entity;
import org.openmetadata.service.search.ParseTags; import org.openmetadata.service.search.ParseTags;
@ -30,7 +31,13 @@ public class IngestionPipelineIndex implements SearchIndex {
suggest.add( suggest.add(
SearchSuggest.builder().input(ingestionPipeline.getDisplayName()).weight(10).build()); SearchSuggest.builder().input(ingestionPipeline.getDisplayName()).weight(10).build());
serviceSuggest.add( serviceSuggest.add(
SearchSuggest.builder().input(ingestionPipeline.getService().getName()).weight(5).build()); SearchSuggest.builder()
.input(
(ingestionPipeline.getService() != null
? ingestionPipeline.getService().getName()
: null))
.weight(5)
.build());
ParseTags parseTags = ParseTags parseTags =
new ParseTags(Entity.getEntityTags(Entity.INGESTION_PIPELINE, ingestionPipeline)); new ParseTags(Entity.getEntityTags(Entity.INGESTION_PIPELINE, ingestionPipeline));
doc.put( doc.put(
@ -51,7 +58,7 @@ public class IngestionPipelineIndex implements SearchIndex {
doc.put("entityType", Entity.INGESTION_PIPELINE); doc.put("entityType", Entity.INGESTION_PIPELINE);
doc.put( doc.put(
"totalVotes", "totalVotes",
CommonUtil.nullOrEmpty(ingestionPipeline.getVotes()) nullOrEmpty(ingestionPipeline.getVotes())
? 0 ? 0
: ingestionPipeline.getVotes().getUpVotes() : ingestionPipeline.getVotes().getUpVotes()
- ingestionPipeline.getVotes().getDownVotes()); - ingestionPipeline.getVotes().getDownVotes());