diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java index 4084184f711..8044cc97f62 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java @@ -13,6 +13,7 @@ package org.openmetadata.service.jdbi3; +import static org.openmetadata.schema.type.Relationship.CONTAINS; import static org.openmetadata.schema.type.Relationship.MENTIONED_IN; import static org.openmetadata.service.Entity.ORGANIZATION_NAME; import static org.openmetadata.service.Entity.QUERY; @@ -1827,6 +1828,102 @@ public interface CollectionDAO { default String getNameColumn() { return "fullyQualifiedName"; } + + @Override + default int listCount(ListFilter filter) { + String serviceType = filter.getQueryParam("serviceType"); + String service = filter.getQueryParam("service"); + String condition = "INNER JOIN entity_relationship ON ingestion_pipeline_entity.id = entity_relationship.toId"; + String pipelineTypeCondition; + Map bindMap = new HashMap<>(); + if (!CommonUtil.nullOrEmpty(serviceType)) { + if (filter.getQueryParam("pipelineType") != null) { + pipelineTypeCondition = String.format(" and %s", filter.getPipelineTypeCondition(null)); + condition += pipelineTypeCondition; + } + condition = + String.format( + "%s WHERE entity_relationship.fromEntity = :serviceType and entity_relationship.relation = :relation and ingestion_pipeline_entity.fullyQualifiedName LIKE :service", + condition); + bindMap.put("relation", CONTAINS.ordinal()); + bindMap.put("service", service + ".%"); + bindMap.put("serviceType", serviceType); + return listIngestionPipelineCount(condition, bindMap); + } + return EntityDAO.super.listCount(filter); + } + + @Override + default List listAfter(ListFilter filter, int limit, String after) { + String serviceType = filter.getQueryParam("serviceType"); + String service = filter.getQueryParam("service"); + String condition = "INNER JOIN entity_relationship ON ingestion_pipeline_entity.id = entity_relationship.toId"; + String pipelineTypeCondition; + Map bindMap = new HashMap<>(); + if (!CommonUtil.nullOrEmpty(serviceType)) { + if (filter.getQueryParam("pipelineType") != null) { + pipelineTypeCondition = filter.getPipelineTypeCondition(null); + condition = + String.format( + "%s WHERE entity_relationship.fromEntity = :serviceType and entity_relationship.relation = :relation and ingestion_pipeline_entity.fullyQualifiedName LIKE :service and ingestion_pipeline_entity.fullyQualifiedName > :after and %s order by ingestion_pipeline_entity.fullyQualifiedName ASC LIMIT :limit", + condition, pipelineTypeCondition); + } else { + condition = + String.format( + "%s WHERE entity_relationship.fromEntity = :serviceType and entity_relationship.relation = :relation and ingestion_pipeline_entity.fullyQualifiedName LIKE :service and ingestion_pipeline_entity.fullyQualifiedName > :after order by ingestion_pipeline_entity.fullyQualifiedName ASC LIMIT :limit", + condition); + } + bindMap.put("serviceType", serviceType); + bindMap.put("service", service + ".%"); + bindMap.put("relation", CONTAINS.ordinal()); + bindMap.put("after", after); + bindMap.put("limit", limit); + return listAfterIngestionPipelineByserviceType(condition, bindMap); + } + return EntityDAO.super.listAfter(filter, limit, after); + } + + @Override + default List listBefore(ListFilter filter, int limit, String before) { + String service = filter.getQueryParam("service"); + String serviceType = filter.getQueryParam("serviceType"); + String condition = "INNER JOIN entity_relationship ON ingestion_pipeline_entity.id = entity_relationship.toId"; + String pipelineTypeCondition; + Map bindMap = new HashMap<>(); + if (!CommonUtil.nullOrEmpty(serviceType)) { + if (filter.getQueryParam("pipelineType") != null) { + pipelineTypeCondition = filter.getPipelineTypeCondition(null); + condition = + String.format( + "%s WHERE entity_relationship.fromEntity = :serviceType and entity_relationship.relation = :relation and ingestion_pipeline_entity.fullyQualifiedName LIKE :service and ingestion_pipeline_entity.fullyQualifiedName < :before and %s order by ingestion_pipeline_entity.fullyQualifiedName DESC LIMIT :limit", + condition, pipelineTypeCondition); + } else { + condition = + String.format( + "%s WHERE entity_relationship.fromEntity = :serviceType and entity_relationship.relation = :relation and ingestion_pipeline_entity.fullyQualifiedName LIKE :service and ingestion_pipeline_entity.fullyQualifiedName < :before order by ingestion_pipeline_entity.fullyQualifiedName DESC LIMIT :limit", + condition); + } + bindMap.put("serviceType", serviceType); + bindMap.put("service", service + ".%"); + bindMap.put("relation", CONTAINS.ordinal()); + bindMap.put("before", before); + bindMap.put("limit", limit); + return listBeforeIngestionPipelineByserviceType(condition, bindMap); + } + return EntityDAO.super.listBefore(filter, limit, before); + } + + @SqlQuery("SELECT ingestion_pipeline_entity.json FROM ingestion_pipeline_entity ") + List listAfterIngestionPipelineByserviceType( + @Define("cond") String cond, @BindMap Map bindings); + + @SqlQuery( + "SELECT json FROM (SELECT ingestion_pipeline_entity.fullyQualifiedName, ingestion_pipeline_entity.json FROM ingestion_pipeline_entity ) last_rows_subquery ORDER BY fullyQualifiedName") + List listBeforeIngestionPipelineByserviceType( + @Define("cond") String cond, @BindMap Map bindings); + + @SqlQuery("SELECT count(*) FROM ingestion_pipeline_entity ") + int listIngestionPipelineCount(@Define("cond") String cond, @BindMap Map bindings); } interface PipelineServiceDAO extends EntityDAO { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ListFilter.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ListFilter.java index e7883268f85..44e649997dd 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ListFilter.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ListFilter.java @@ -137,11 +137,14 @@ public class ListFilter { pipelineType = escape(pipelineType); if (DatasourceConfig.getInstance().isMySQL()) { return tableName == null - ? String.format("JSON_UNQUOTE(JSON_EXTRACT(json, '$.pipelineType')) = '%s'", pipelineType) - : String.format("%s.JSON_UNQUOTE(JSON_EXTRACT(json, '$.pipelineType')) = '%s%%'", tableName, pipelineType); + ? String.format( + "JSON_UNQUOTE(JSON_EXTRACT(ingestion_pipeline_entity.json, '$.pipelineType')) = '%s'", pipelineType) + : String.format( + "%s.JSON_UNQUOTE(JSON_EXTRACT(ingestion_pipeline_entity.json, '$.pipelineType')) = '%s%%'", + tableName, pipelineType); } return tableName == null - ? String.format("json->>'pipelineType' = '%s'", pipelineType) + ? String.format("ingestion_pipeline_entity.json->>'pipelineType' = '%s'", pipelineType) : String.format("%s.json->>'pipelineType' = '%s%%'", tableName, pipelineType); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java index 6393c5eab94..0afdf7d8349 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java @@ -214,6 +214,11 @@ public class IngestionPipelineResource extends EntityResource ingestionPipelines = super.listInternal(uriInfo, securityContext, fieldsParam, filter, limitParam, before, after);