Ingestion pipeline by service type (#11295)

* solved issue for displaying ingestion pipeling by serviceType

* fixed for postgres

* removed unnecessary param

* just cleary declared the examples

---------

Co-authored-by: Himank Mehta <himankmehta@Himanks-MacBook-Air.local>
This commit is contained in:
07Himank 2023-04-27 15:20:42 +05:30 committed by GitHub
parent c53a3413fb
commit 9b5ee0ab68
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 112 additions and 4 deletions

View File

@ -13,6 +13,7 @@
package org.openmetadata.service.jdbi3; package org.openmetadata.service.jdbi3;
import static org.openmetadata.schema.type.Relationship.CONTAINS;
import static org.openmetadata.schema.type.Relationship.MENTIONED_IN; import static org.openmetadata.schema.type.Relationship.MENTIONED_IN;
import static org.openmetadata.service.Entity.ORGANIZATION_NAME; import static org.openmetadata.service.Entity.ORGANIZATION_NAME;
import static org.openmetadata.service.Entity.QUERY; import static org.openmetadata.service.Entity.QUERY;
@ -1827,6 +1828,102 @@ public interface CollectionDAO {
default String getNameColumn() { default String getNameColumn() {
return "fullyQualifiedName"; return "fullyQualifiedName";
} }
@Override
default int listCount(ListFilter filter) {
String serviceType = filter.getQueryParam("serviceType");
String service = filter.getQueryParam("service");
String condition = "INNER JOIN entity_relationship ON ingestion_pipeline_entity.id = entity_relationship.toId";
String pipelineTypeCondition;
Map<String, Object> bindMap = new HashMap<>();
if (!CommonUtil.nullOrEmpty(serviceType)) {
if (filter.getQueryParam("pipelineType") != null) {
pipelineTypeCondition = String.format(" and %s", filter.getPipelineTypeCondition(null));
condition += pipelineTypeCondition;
}
condition =
String.format(
"%s WHERE entity_relationship.fromEntity = :serviceType and entity_relationship.relation = :relation and ingestion_pipeline_entity.fullyQualifiedName LIKE :service",
condition);
bindMap.put("relation", CONTAINS.ordinal());
bindMap.put("service", service + ".%");
bindMap.put("serviceType", serviceType);
return listIngestionPipelineCount(condition, bindMap);
}
return EntityDAO.super.listCount(filter);
}
@Override
default List<String> listAfter(ListFilter filter, int limit, String after) {
String serviceType = filter.getQueryParam("serviceType");
String service = filter.getQueryParam("service");
String condition = "INNER JOIN entity_relationship ON ingestion_pipeline_entity.id = entity_relationship.toId";
String pipelineTypeCondition;
Map<String, Object> bindMap = new HashMap<>();
if (!CommonUtil.nullOrEmpty(serviceType)) {
if (filter.getQueryParam("pipelineType") != null) {
pipelineTypeCondition = filter.getPipelineTypeCondition(null);
condition =
String.format(
"%s WHERE entity_relationship.fromEntity = :serviceType and entity_relationship.relation = :relation and ingestion_pipeline_entity.fullyQualifiedName LIKE :service and ingestion_pipeline_entity.fullyQualifiedName > :after and %s order by ingestion_pipeline_entity.fullyQualifiedName ASC LIMIT :limit",
condition, pipelineTypeCondition);
} else {
condition =
String.format(
"%s WHERE entity_relationship.fromEntity = :serviceType and entity_relationship.relation = :relation and ingestion_pipeline_entity.fullyQualifiedName LIKE :service and ingestion_pipeline_entity.fullyQualifiedName > :after order by ingestion_pipeline_entity.fullyQualifiedName ASC LIMIT :limit",
condition);
}
bindMap.put("serviceType", serviceType);
bindMap.put("service", service + ".%");
bindMap.put("relation", CONTAINS.ordinal());
bindMap.put("after", after);
bindMap.put("limit", limit);
return listAfterIngestionPipelineByserviceType(condition, bindMap);
}
return EntityDAO.super.listAfter(filter, limit, after);
}
@Override
default List<String> listBefore(ListFilter filter, int limit, String before) {
String service = filter.getQueryParam("service");
String serviceType = filter.getQueryParam("serviceType");
String condition = "INNER JOIN entity_relationship ON ingestion_pipeline_entity.id = entity_relationship.toId";
String pipelineTypeCondition;
Map<String, Object> bindMap = new HashMap<>();
if (!CommonUtil.nullOrEmpty(serviceType)) {
if (filter.getQueryParam("pipelineType") != null) {
pipelineTypeCondition = filter.getPipelineTypeCondition(null);
condition =
String.format(
"%s WHERE entity_relationship.fromEntity = :serviceType and entity_relationship.relation = :relation and ingestion_pipeline_entity.fullyQualifiedName LIKE :service and ingestion_pipeline_entity.fullyQualifiedName < :before and %s order by ingestion_pipeline_entity.fullyQualifiedName DESC LIMIT :limit",
condition, pipelineTypeCondition);
} else {
condition =
String.format(
"%s WHERE entity_relationship.fromEntity = :serviceType and entity_relationship.relation = :relation and ingestion_pipeline_entity.fullyQualifiedName LIKE :service and ingestion_pipeline_entity.fullyQualifiedName < :before order by ingestion_pipeline_entity.fullyQualifiedName DESC LIMIT :limit",
condition);
}
bindMap.put("serviceType", serviceType);
bindMap.put("service", service + ".%");
bindMap.put("relation", CONTAINS.ordinal());
bindMap.put("before", before);
bindMap.put("limit", limit);
return listBeforeIngestionPipelineByserviceType(condition, bindMap);
}
return EntityDAO.super.listBefore(filter, limit, before);
}
@SqlQuery("SELECT ingestion_pipeline_entity.json FROM ingestion_pipeline_entity <cond>")
List<String> listAfterIngestionPipelineByserviceType(
@Define("cond") String cond, @BindMap Map<String, Object> bindings);
@SqlQuery(
"SELECT json FROM (SELECT ingestion_pipeline_entity.fullyQualifiedName, ingestion_pipeline_entity.json FROM ingestion_pipeline_entity <cond>) last_rows_subquery ORDER BY fullyQualifiedName")
List<String> listBeforeIngestionPipelineByserviceType(
@Define("cond") String cond, @BindMap Map<String, Object> bindings);
@SqlQuery("SELECT count(*) FROM ingestion_pipeline_entity <cond> ")
int listIngestionPipelineCount(@Define("cond") String cond, @BindMap Map<String, Object> bindings);
} }
interface PipelineServiceDAO extends EntityDAO<PipelineService> { interface PipelineServiceDAO extends EntityDAO<PipelineService> {

View File

@ -137,11 +137,14 @@ public class ListFilter {
pipelineType = escape(pipelineType); pipelineType = escape(pipelineType);
if (DatasourceConfig.getInstance().isMySQL()) { if (DatasourceConfig.getInstance().isMySQL()) {
return tableName == null return tableName == null
? String.format("JSON_UNQUOTE(JSON_EXTRACT(json, '$.pipelineType')) = '%s'", pipelineType) ? String.format(
: String.format("%s.JSON_UNQUOTE(JSON_EXTRACT(json, '$.pipelineType')) = '%s%%'", tableName, pipelineType); "JSON_UNQUOTE(JSON_EXTRACT(ingestion_pipeline_entity.json, '$.pipelineType')) = '%s'", pipelineType)
: String.format(
"%s.JSON_UNQUOTE(JSON_EXTRACT(ingestion_pipeline_entity.json, '$.pipelineType')) = '%s%%'",
tableName, pipelineType);
} }
return tableName == null return tableName == null
? String.format("json->>'pipelineType' = '%s'", pipelineType) ? String.format("ingestion_pipeline_entity.json->>'pipelineType' = '%s'", pipelineType)
: String.format("%s.json->>'pipelineType' = '%s%%'", tableName, pipelineType); : String.format("%s.json->>'pipelineType' = '%s%%'", tableName, pipelineType);
} }

View File

@ -214,6 +214,11 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
schema = @Schema(type = "string", example = "elasticSearchReindex")) schema = @Schema(type = "string", example = "elasticSearchReindex"))
@QueryParam("pipelineType") @QueryParam("pipelineType")
String pipelineType, String pipelineType,
@Parameter(
description = "Filter airflow pipelines by service Type",
schema = @Schema(type = "string", example = "messagingService"))
@QueryParam("serviceType")
String serviceType,
@Parameter(description = "Limit the number ingestion returned. (1 to 1000000, " + "default = 10)") @Parameter(description = "Limit the number ingestion returned. (1 to 1000000, " + "default = 10)")
@DefaultValue("10") @DefaultValue("10")
@Min(0) @Min(0)
@ -234,7 +239,10 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
Include include) Include include)
throws IOException { throws IOException {
ListFilter filter = ListFilter filter =
new ListFilter(include).addQueryParam("service", serviceParam).addQueryParam("pipelineType", pipelineType); new ListFilter(include)
.addQueryParam("service", serviceParam)
.addQueryParam("pipelineType", pipelineType)
.addQueryParam("serviceType", serviceType);
ResultList<IngestionPipeline> ingestionPipelines = ResultList<IngestionPipeline> ingestionPipelines =
super.listInternal(uriInfo, securityContext, fieldsParam, filter, limitParam, before, after); super.listInternal(uriInfo, securityContext, fieldsParam, filter, limitParam, before, after);