diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PipelineRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PipelineRepository.java index c0ca3f914f4..04e86782b49 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PipelineRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PipelineRepository.java @@ -28,11 +28,8 @@ import org.openmetadata.catalog.resources.pipelines.PipelineResource; import org.openmetadata.catalog.resources.pipelines.PipelineResource.PipelineList; import org.openmetadata.catalog.type.EntityReference; import org.openmetadata.catalog.type.TagLabel; -import org.openmetadata.catalog.util.EntityInterface; -import org.openmetadata.catalog.util.EntityUpdater; -import org.openmetadata.catalog.util.EntityUtil; +import org.openmetadata.catalog.util.*; import org.openmetadata.catalog.util.EntityUtil.Fields; -import org.openmetadata.catalog.util.JsonUtils; import org.openmetadata.catalog.util.RestUtil.PutResponse; import org.openmetadata.common.utils.CipherText; import org.skife.jdbi.v2.sqlobject.Bind; @@ -44,7 +41,9 @@ import org.skife.jdbi.v2.sqlobject.Transaction; import javax.json.JsonPatch; import javax.ws.rs.core.Response.Status; import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.security.GeneralSecurityException; +import java.text.ParseException; import java.util.ArrayList; import java.util.Date; import java.util.List; @@ -83,47 +82,48 @@ public abstract class PipelineRepository { @CreateSqlObject abstract TagRepository.TagDAO tagDAO(); + EntityRepository entityRepository = new EntityRepository<>() { + @Override + public List listAfter(String fqnPrefix, int limitParam, String after) { + return pipelineDAO().listAfter(fqnPrefix, limitParam, after); + } + + @Override + public List listBefore(String fqnPrefix, int limitParam, String before) { + return pipelineDAO().listBefore(fqnPrefix, limitParam, before); + } + + @Override + public int listCount(String fqnPrefix) { + return pipelineDAO().listCount(fqnPrefix); + } + + @Override + public String getFullyQualifiedName(Pipeline entity) { + return entity.getFullyQualifiedName(); + } + + @Override + public Pipeline setFields(Pipeline entity, Fields fields) throws IOException, ParseException { + return PipelineRepository.this.setFields(entity, fields); + } + + @Override + public ResultList getResultList(List entities, String beforeCursor, String afterCursor, int total) throws GeneralSecurityException, UnsupportedEncodingException { + return new PipelineList(entities, beforeCursor, afterCursor, total); + } + }; @Transaction - public PipelineList listAfter(Fields fields, String serviceName, int limitParam, String after) throws IOException, - GeneralSecurityException { - // forward scrolling, if after == null then first page is being asked being asked - List jsons = pipelineDAO().listAfter(serviceName, limitParam + 1, after == null ? "" : - CipherText.instance().decrypt(after)); - - List pipelines = new ArrayList<>(); - for (String json : jsons) { - pipelines.add(setFields(JsonUtils.readValue(json, Pipeline.class), fields)); - } - int total = pipelineDAO().listCount(serviceName); - - String beforeCursor, afterCursor = null; - beforeCursor = after == null ? null : pipelines.get(0).getFullyQualifiedName(); - if (pipelines.size() > limitParam) { // If extra result exists, then next page exists - return after cursor - pipelines.remove(limitParam); - afterCursor = pipelines.get(limitParam - 1).getFullyQualifiedName(); - } - return new PipelineList(pipelines, beforeCursor, afterCursor, total); + public ResultList listAfter(Fields fields, String serviceName, int limitParam, String after) throws IOException, + GeneralSecurityException, ParseException { + return EntityUtil.listAfter(entityRepository, Pipeline.class, fields, serviceName, limitParam, after); } @Transaction - public PipelineList listBefore(Fields fields, String serviceName, int limitParam, String before) - throws IOException, GeneralSecurityException { - // Reverse scrolling - Get one extra result used for computing before cursor - List jsons = pipelineDAO().listBefore(serviceName, limitParam + 1, CipherText.instance().decrypt(before)); - List pipelines = new ArrayList<>(); - for (String json : jsons) { - pipelines.add(setFields(JsonUtils.readValue(json, Pipeline.class), fields)); - } - int total = pipelineDAO().listCount(serviceName); - - String beforeCursor = null, afterCursor; - if (pipelines.size() > limitParam) { // If extra result exists, then previous page exists - return before cursor - pipelines.remove(0); - beforeCursor = pipelines.get(0).getFullyQualifiedName(); - } - afterCursor = pipelines.get(pipelines.size() - 1).getFullyQualifiedName(); - return new PipelineList(pipelines, beforeCursor, afterCursor, total); + public ResultList listBefore(Fields fields, String serviceName, int limitParam, String before) + throws IOException, GeneralSecurityException, ParseException { + return EntityUtil.listBefore(entityRepository, Pipeline.class, fields, serviceName, limitParam, before); } @Transaction diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/pipelines/PipelineResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/pipelines/PipelineResource.java index 5c035fe839e..41f17c0be41 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/pipelines/PipelineResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/pipelines/PipelineResource.java @@ -62,6 +62,7 @@ import javax.ws.rs.core.UriInfo; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.security.GeneralSecurityException; +import java.text.ParseException; import java.util.Arrays; import java.util.Collections; import java.util.Date; @@ -134,31 +135,31 @@ public class PipelineResource { content = @Content(mediaType = "application/json", schema = @Schema(implementation = PipelineList.class))) }) - public PipelineList list(@Context UriInfo uriInfo, - @Context SecurityContext securityContext, - @Parameter(description = "Fields requested in the returned resource", - schema = @Schema(type = "string", example = FIELDS)) - @QueryParam("fields") String fieldsParam, - @Parameter(description = "Filter pipelines by service name", - schema = @Schema(type = "string", example = "airflow")) - @QueryParam("service") String serviceParam, - @Parameter(description = "Limit the number pipelines returned. (1 to 1000000, " + - "default = 10)") - @DefaultValue("10") - @Min(1) - @Max(1000000) - @QueryParam("limit") int limitParam, - @Parameter(description = "Returns list of pipelines before this cursor", - schema = @Schema(type = "string")) - @QueryParam("before") String before, - @Parameter(description = "Returns list of pipelines after this cursor", - schema = @Schema(type = "string")) - @QueryParam("after") String after - ) throws IOException, GeneralSecurityException { + public ResultList list(@Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Parameter(description = "Fields requested in the returned resource", + schema = @Schema(type = "string", example = FIELDS)) + @QueryParam("fields") String fieldsParam, + @Parameter(description = "Filter pipelines by service name", + schema = @Schema(type = "string", example = "airflow")) + @QueryParam("service") String serviceParam, + @Parameter(description = "Limit the number pipelines returned. (1 to 1000000, " + + "default = 10)") + @DefaultValue("10") + @Min(1) + @Max(1000000) + @QueryParam("limit") int limitParam, + @Parameter(description = "Returns list of pipelines before this cursor", + schema = @Schema(type = "string")) + @QueryParam("before") String before, + @Parameter(description = "Returns list of pipelines after this cursor", + schema = @Schema(type = "string")) + @QueryParam("after") String after + ) throws IOException, GeneralSecurityException, ParseException { RestUtil.validateCursors(before, after); Fields fields = new Fields(FIELD_LIST, fieldsParam); - PipelineList pipelines; + ResultList pipelines; if (before != null) { // Reverse paging pipelines = dao.listBefore(fields, serviceParam, limitParam, before); // Ask for one extra entry } else { // Forward paging or first page