MINOR: Add method to filter ingestion pipeline based on metadata (#21449)

* Add logic to handle WorkflowContext on Ingestion

* Revert base.py changes

* Removed comment

* Fix basedpyright complaints

* Make ContextManager automatically add its context to the PipelineStatus

* Small changes

* Only dump non-null keys

* Add Method to Filter Ingestion Pipeline based on Metadata

* Reduce the scope to filter only specifically on metadata->workflow->serviceName
This commit is contained in:
IceS2 2025-06-04 16:13:39 +02:00 committed by GitHub
parent 4633025f55
commit 8540884ab1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 33 additions and 1 deletions

View File

@ -116,7 +116,9 @@ class ContextManager:
result: dict[str, Any] = {}
for context_enum in ContextsEnum:
context_obj = getattr(instance, context_enum.value)
result[context_enum.value] = context_obj.model_dump()
context_dict = context_obj.model_dump(exclude_none=True)
if context_dict:
result[context_enum.value] = context_dict
if result:
return result
return None

View File

@ -19,6 +19,7 @@ import static org.openmetadata.schema.type.EventType.ENTITY_UPDATED;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.UriInfo;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import lombok.Getter;
@ -299,6 +300,29 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
.orElse(null)));
}
public ResultList<PipelineStatus> listExternalAppStatus(
String ingestionPipelineFQN, String serviceName, Long startTs, Long endTs) {
return listPipelineStatus(ingestionPipelineFQN, startTs, endTs)
.filter(
pipelineStatus -> {
Map<String, Object> metadata = pipelineStatus.getMetadata();
if (metadata == null) {
return false;
}
Map<String, Object> workflowMetadata =
JsonUtils.readOrConvertValue(metadata.get("workflow"), Map.class);
String pipelineStatusService = (String) workflowMetadata.get("serviceName");
return pipelineStatusService != null && pipelineStatusService.equals(serviceName);
})
.map(
pipelineStatus ->
pipelineStatus.withConfig(
Optional.ofNullable(pipelineStatus.getConfig())
.map(m -> m.getOrDefault("appConfig", null))
.map(JsonUtils::getMap)
.orElse(null)));
}
public PipelineStatus getLatestPipelineStatus(IngestionPipeline ingestionPipeline) {
return JsonUtils.readValue(
getLatestExtensionFromTimeSeries(

View File

@ -19,6 +19,7 @@ import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import jakarta.validation.constraints.NotNull;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.openmetadata.schema.system.EntityError;
import org.openmetadata.schema.type.Paging;
@ -102,6 +103,11 @@ public class ResultList<T> {
return new ResultList<>(data.stream().map(mapper).collect(Collectors.toList()), paging);
}
/* Conveniently filter the data without the need to create a new ResultList */
public ResultList<T> filter(Predicate<T> predicate) {
return new ResultList<>(data.stream().filter(predicate).collect(Collectors.toList()), paging);
}
public ResultList(List<T> data, Integer offset, Integer limit, Integer total) {
this.data = data;
paging =