Ingestion pipeline by service type task (#10914)

* working on ingestion getList Issue

* filter ingestion pipeline by pipelineType

* revoked docker conf

* removed old changes and formatted

* .

* addressing comments

---------

Co-authored-by: Himank Mehta <himankmehta@Himanks-MacBook-Air.local>
This commit is contained in:
07Himank 2023-04-04 17:03:57 +05:30 committed by GitHub
parent 4eb1ec81e1
commit 29f49ee783
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 62 additions and 2 deletions

View File

@ -81,6 +81,7 @@ import org.openmetadata.service.monitoring.EventMonitor;
import org.openmetadata.service.monitoring.EventMonitorFactory;
import org.openmetadata.service.monitoring.EventMonitorPublisher;
import org.openmetadata.service.resources.CollectionRegistry;
import org.openmetadata.service.resources.databases.DatasourceConfig;
import org.openmetadata.service.secrets.SecretsManager;
import org.openmetadata.service.secrets.SecretsManagerFactory;
import org.openmetadata.service.secrets.SecretsManagerUpdateService;
@ -151,6 +152,9 @@ public class OpenMetadataApplication extends Application<OpenMetadataApplication
// Register Authenticator
registerAuthenticator(catalogConfig);
// init for dataSourceFactory
DatasourceConfig.initialize(catalogConfig);
// Unregister dropwizard default exception mappers
((DefaultServerFactory) catalogConfig.getServerFactory()).setRegisterDefaultExceptionMappers(false);
environment.jersey().property(ServerProperties.RESPONSE_SET_STATUS_OVER_SEND_ERROR, true);

View File

@ -9,6 +9,7 @@ import lombok.Getter;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.Relationship;
import org.openmetadata.service.Entity;
import org.openmetadata.service.resources.databases.DatasourceConfig;
public class ListFilter {
@Getter private final Include include;
@ -39,6 +40,7 @@ public class ListFilter {
String condition = getIncludeCondition(tableName);
condition = addCondition(condition, getDatabaseCondition(tableName));
condition = addCondition(condition, getServiceCondition(tableName));
condition = addCondition(condition, getPipelineTypeCondition(tableName));
condition = addCondition(condition, getParentCondition(tableName));
condition = addCondition(condition, getCategoryCondition(tableName));
condition = addCondition(condition, getWebhookCondition(tableName));
@ -88,6 +90,11 @@ public class ListFilter {
return webhookType == null ? "" : getWebhookTypePrefixCondition(tableName, webhookType);
}
public String getPipelineTypeCondition(String tableName) {
String pipelineType = queryParams.get("pipelineType");
return pipelineType == null ? "" : getPipelineTypePrefixCondition(tableName, pipelineType);
}
private String getTestCaseCondition() {
String condition1 = "";
String entityFQN = getQueryParam("entityFQN");
@ -126,6 +133,18 @@ public class ListFilter {
: String.format("%s.webhookType LIKE '%s%%'", tableName, typePrefix);
}
private String getPipelineTypePrefixCondition(String tableName, String pipelineType) {
pipelineType = escape(pipelineType);
if (DatasourceConfig.getInstance().isMySQL()) {
return tableName == null
? String.format("JSON_UNQUOTE(JSON_EXTRACT(json, '$.pipelineType')) = '%s'", pipelineType)
: String.format("%s.JSON_UNQUOTE(JSON_EXTRACT(json, '$.pipelineType')) = '%s%%'", tableName, pipelineType);
}
return tableName == null
? String.format("json->>'pipelineType' = '%s'", pipelineType)
: String.format("json->>'pipelineType' = '%s%%'", tableName, pipelineType);
}
private String getCategoryPrefixCondition(String tableName, String category) {
category = escape(category);
return tableName == null

View File

@ -0,0 +1,30 @@
package org.openmetadata.service.resources.databases;
import io.dropwizard.db.DataSourceFactory;
import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.jdbi3.locator.ConnectionType;
public class DatasourceConfig {
private static DatasourceConfig INSTANCE;
private static volatile boolean INITIALIZED = false;
private static DataSourceFactory dataSourceFactory;
public static void initialize(OpenMetadataApplicationConfig config) {
if (!INITIALIZED) {
INSTANCE = new DatasourceConfig();
dataSourceFactory = config.getDataSourceFactory();
INITIALIZED = true;
}
}
public static DatasourceConfig getInstance() {
return INSTANCE;
}
public Boolean isMySQL() {
if (dataSourceFactory.getDriverClass().equals(ConnectionType.MYSQL.label)) {
return true;
}
return false;
}
}

View File

@ -14,7 +14,8 @@
package org.openmetadata.service.resources.services.ingestionpipelines;
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
import static org.openmetadata.service.Entity.*;
import static org.openmetadata.service.Entity.FIELD_OWNER;
import static org.openmetadata.service.Entity.FIELD_PIPELINE_STATUS;
import io.swagger.v3.oas.annotations.ExternalDocumentation;
import io.swagger.v3.oas.annotations.Hidden;
@ -152,6 +153,11 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
schema = @Schema(type = "string", example = "snowflakeWestCoast"))
@QueryParam("service")
String serviceParam,
@Parameter(
description = "Filter airflow pipelines by pipeline Type",
schema = @Schema(type = "string", example = "elasticSearchReindex"))
@QueryParam("pipelineType")
String pipelineType,
@Parameter(description = "Limit the number ingestion returned. (1 to 1000000, " + "default = 10)")
@DefaultValue("10")
@Min(0)
@ -171,7 +177,8 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
@DefaultValue("non-deleted")
Include include)
throws IOException {
ListFilter filter = new ListFilter(include).addQueryParam("service", serviceParam);
ListFilter filter =
new ListFilter(include).addQueryParam("service", serviceParam).addQueryParam("pipelineType", pipelineType);
ResultList<IngestionPipeline> ingestionPipelines =
super.listInternal(uriInfo, securityContext, fieldsParam, filter, limitParam, before, after);