FIX #21483 - Filter Ingestion Pipelines by provider (#21498)

This commit is contained in:
Pere Miquel Brull 2025-06-03 12:13:21 +02:00
parent 25d9759967
commit 008a3ebc27
9 changed files with 77 additions and 4 deletions

View File

@ -43,6 +43,7 @@ import org.openmetadata.schema.metadataIngestion.SearchServiceMetadataPipeline;
import org.openmetadata.schema.metadataIngestion.SourceConfig;
import org.openmetadata.schema.metadataIngestion.StorageServiceMetadataPipeline;
import org.openmetadata.schema.services.connections.metadata.OpenMetadataConnection;
import org.openmetadata.schema.type.ProviderType;
import org.openmetadata.sdk.PipelineServiceClientInterface;
import org.openmetadata.service.Entity;
import org.openmetadata.service.jdbi3.IngestionPipelineRepository;
@ -244,6 +245,7 @@ public class CreateIngestionPipelineImpl {
.withOwners(service.getOwners())
.withPipelineType(pipelineType)
.withService(service.getEntityReference())
.withProvider(ProviderType.AUTOMATION)
.withSourceConfig(
new SourceConfig().withConfig(getSourceConfig(pipelineType, service)));
IngestionPipeline ingestionPipeline = mapper.createToEntity(create, "governance-bot");

View File

@ -2877,6 +2877,11 @@ public interface CollectionDAO {
condition += serviceCondition;
}
if (filter.getQueryParam("provider") != null) {
String providerCondition = String.format(" and %s", filter.getProviderCondition());
condition += providerCondition;
}
Map<String, Object> bindMap = new HashMap<>();
String serviceType = filter.getQueryParam("serviceType");
if (!nullOrEmpty(serviceType)) {
@ -2913,6 +2918,11 @@ public interface CollectionDAO {
condition += serviceCondition;
}
if (filter.getQueryParam("provider") != null) {
String providerCondition = String.format(" and %s", filter.getProviderCondition());
condition += providerCondition;
}
Map<String, Object> bindMap = new HashMap<>();
String serviceType = filter.getQueryParam("serviceType");
if (!nullOrEmpty(serviceType)) {
@ -2954,6 +2964,11 @@ public interface CollectionDAO {
condition += serviceCondition;
}
if (filter.getQueryParam("provider") != null) {
String providerCondition = String.format(" and %s", filter.getProviderCondition());
condition += providerCondition;
}
Map<String, Object> bindMap = new HashMap<>();
String serviceType = filter.getQueryParam("serviceType");
if (!nullOrEmpty(serviceType)) {

View File

@ -51,6 +51,7 @@ public class ListFilter extends Filter<ListFilter> {
conditions.add(getWorkflowDefinitionIdCondition());
conditions.add(getEntityLinkCondition());
conditions.add(getAgentTypeCondition());
conditions.add(getProviderCondition());
String condition = addCondition(conditions);
return condition.isEmpty() ? "WHERE TRUE" : "WHERE " + condition;
}
@ -104,6 +105,19 @@ public class ListFilter extends Filter<ListFilter> {
}
}
public String getProviderCondition() {
String provider = queryParams.get("provider");
if (provider == null) {
return "";
} else {
if (Boolean.TRUE.equals(DatasourceConfig.getInstance().isMySQL())) {
return String.format("JSON_EXTRACT(json, '$.provider') = '%s'", provider);
} else {
return String.format("json->>'provider' = '%s'", provider);
}
}
}
private String getEventSubscriptionAlertType() {
String alertType = queryParams.get("alertType");
if (alertType == null) {

View File

@ -27,6 +27,7 @@ public class IngestionPipelineMapper
.withSourceConfig(create.getSourceConfig())
.withLoggerLevel(create.getLoggerLevel())
.withRaiseOnError(create.getRaiseOnError())
.withProvider(create.getProvider())
.withService(create.getService());
}
}

View File

@ -67,6 +67,7 @@ import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.schema.type.EntityHistory;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.MetadataOperation;
import org.openmetadata.schema.type.ProviderType;
import org.openmetadata.sdk.PipelineServiceClientInterface;
import org.openmetadata.service.Entity;
import org.openmetadata.service.OpenMetadataApplicationConfig;
@ -238,14 +239,20 @@ public class IngestionPipelineResource
schema = @Schema(implementation = Include.class))
@QueryParam("include")
@DefaultValue("non-deleted")
Include include) {
Include include,
@Parameter(
description = "List Ingestion Pipelines by provider..",
schema = @Schema(implementation = ProviderType.class))
@QueryParam("provider")
ProviderType provider) {
ListFilter filter =
new ListFilter(include)
.addQueryParam("service", serviceParam)
.addQueryParam("pipelineType", pipelineType)
.addQueryParam("serviceType", serviceType)
.addQueryParam("testSuite", testSuiteParam)
.addQueryParam("applicationType", applicationType);
.addQueryParam("applicationType", applicationType)
.addQueryParam("provider", provider == null ? null : provider.value());
ResultList<IngestionPipeline> ingestionPipelines =
super.listInternal(
uriInfo, securityContext, fieldsParam, filter, limitParam, before, after);

View File

@ -80,6 +80,7 @@ import org.openmetadata.schema.services.connections.database.ConnectionOptions;
import org.openmetadata.schema.type.ChangeDescription;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.MetadataOperation;
import org.openmetadata.schema.type.ProviderType;
import org.openmetadata.service.Entity;
import org.openmetadata.service.resources.EntityResourceTest;
import org.openmetadata.service.resources.services.DashboardServiceResourceTest;
@ -853,6 +854,23 @@ public class IngestionPipelineResourceTest
authHeaders(DATA_CONSUMER.getName()));
}
@Test
void testListByProvider(TestInfo test) throws IOException {
// Create a pipeline with a provider
CreateIngestionPipeline create = createRequest(test).withProvider(ProviderType.AUTOMATION);
IngestionPipeline ingestionPipeline = createAndCheckEntity(create, ADMIN_AUTH_HEADERS);
CreateIngestionPipeline createNoProvider = createRequest(test, 1);
createAndCheckEntity(createNoProvider, ADMIN_AUTH_HEADERS);
// List pipelines by provider
Map<String, String> queryParams = new HashMap<>();
queryParams.put("provider", ProviderType.AUTOMATION.value());
ResultList<IngestionPipeline> resultList = listEntities(queryParams, ADMIN_AUTH_HEADERS);
assertEquals(1, resultList.getData().size());
assertEquals(ingestionPipeline.getId(), resultList.getData().get(0).getId());
}
private IngestionPipeline updateIngestionPipeline(
CreateIngestionPipeline create, Map<String, String> authHeaders)
throws HttpResponseException {

View File

@ -46,6 +46,9 @@
"$ref": "../../../type/entityReferenceList.json",
"default": null
},
"provider" : {
"$ref": "../../../type/basic.json#/definitions/providerType"
},
"domain" : {
"description": "Fully qualified name of the domain the Table belongs to.",
"type": "string"

View File

@ -161,9 +161,9 @@
},
"providerType": {
"javaType": "org.openmetadata.schema.type.ProviderType",
"description": "Type of provider of an entity. Some entities are provided by the `system`. Some are entities created and provided by the `user`. Typically `system` provide entities can't be deleted and can only be disabled.",
"description": "Type of provider of an entity. Some entities are provided by the `system`. Some are entities created and provided by the `user`. Typically `system` provide entities can't be deleted and can only be disabled. Some apps such as AutoPilot create entities with `automation` provider type. These entities can be deleted by the user.",
"type": "string",
"enum": ["system", "user"],
"enum": ["system", "user", "automation"],
"default": "user"
},
"componentConfig": {

View File

@ -40,6 +40,7 @@ export interface CreateIngestionPipeline {
*/
owners?: EntityReference[];
pipelineType: PipelineType;
provider?: ProviderType;
/**
* Control if we want to flag the workflow as failed if we encounter any processing errors.
*/
@ -203,6 +204,18 @@ export enum PipelineType {
Usage = "usage",
}
/**
* Type of provider of an entity. Some entities are provided by the `system`. Some are
* entities created and provided by the `user`. Typically `system` provide entities can't be
* deleted and can only be disabled. Some apps such as AutoPilot create entities with
* `automation` provider type. These entities can be deleted by the user.
*/
export enum ProviderType {
Automation = "automation",
System = "system",
User = "user",
}
/**
* Additional connection configuration.
*/