From 400a19752bb72b441cfff634a535d524a351438e Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Tue, 8 Mar 2022 05:20:44 -0800 Subject: [PATCH] Issue-3237: Airflow pipeline resource is not returning status for a workflow (#3238) * Issue-3237: Airflow pipeline resource is not returning status for a workflow * Issue-3237: Airflow pipeline resource is not returning status for a workflow * Issue-3237: Airflow pipeline resource is not returning status for a workflow --- .../catalog/airflow/AirflowRESTClient.java | 24 +++++++++++++++++++ .../operations/AirflowPipelineResource.java | 16 +++++++++++-- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java index ce0313759c0..809f53f983c 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java @@ -24,6 +24,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.List; import javax.ws.rs.core.Response; +import lombok.extern.slf4j.Slf4j; import org.json.JSONObject; import org.openmetadata.catalog.CatalogApplicationConfig; import org.openmetadata.catalog.airflow.models.AirflowAuthRequest; @@ -37,6 +38,7 @@ import org.openmetadata.catalog.operations.pipelines.AirflowPipeline; import org.openmetadata.catalog.operations.pipelines.PipelineStatus; import org.openmetadata.catalog.util.JsonUtils; +@Slf4j public class AirflowRESTClient { private final URL url; private final String username; @@ -110,6 +112,28 @@ public class AirflowRESTClient { } } + public String deletePipeline(String pipelineName) { + try { + String token = authenticate(); + String authToken = String.format(AUTH_TOKEN, token); + String triggerEndPoint = "%s/rest_api/api?api=delete_delete&dag_id=%s"; + String triggerUrl = String.format(triggerEndPoint, url, pipelineName); + JSONObject requestPayload = new JSONObject(); + requestPayload.put("workflow_name", pipelineName); + HttpRequest request = + HttpRequest.newBuilder(URI.create(triggerUrl)) + .header(CONTENT_HEADER, CONTENT_TYPE) + .header(AUTH_HEADER, authToken) + .POST(HttpRequest.BodyPublishers.ofString(requestPayload.toString())) + .build(); + HttpResponse response = client.send(request, HttpResponse.BodyHandlers.ofString()); + return response.body(); + } catch (Exception e) { + LOG.error("Failed to delete Airflow Pipeline from Airflow DAGS"); + } + return null; + } + public String runPipeline(String pipelineName) { try { String token = authenticate(); diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/operations/AirflowPipelineResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/operations/AirflowPipelineResource.java index 25cf69a28b4..4067085c148 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/operations/AirflowPipelineResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/operations/AirflowPipelineResource.java @@ -29,6 +29,8 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; import java.security.GeneralSecurityException; import java.text.ParseException; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.UUID; @@ -109,7 +111,7 @@ public class AirflowPipelineResource { public static class AirflowPipelineList extends ResultList { @SuppressWarnings("unused") - AirflowPipelineList() { + public AirflowPipelineList() { // Empty constructor needed for deserialization } @@ -120,7 +122,14 @@ public class AirflowPipelineResource { } static final String FIELDS = FIELD_OWNER; - public static final List ALLOWED_FIELDS = Entity.getEntityFields(AirflowPipeline.class); + public static final List ALLOWED_FIELDS; + + static { + List list = new ArrayList<>(); + list.addAll(Entity.getEntityFields(AirflowPipeline.class)); + list.add("status"); // Add a field parameter called tests that represent the fields - tableTests and columnTests + ALLOWED_FIELDS = Collections.unmodifiableList(list); + } @GET @Valid @@ -429,6 +438,9 @@ public class AirflowPipelineResource { public Response delete(@Context UriInfo uriInfo, @Context SecurityContext securityContext, @PathParam("id") String id) throws IOException, ParseException { SecurityUtil.checkAdminOrBotRole(authorizer, securityContext); + Fields fields = new Fields(ALLOWED_FIELDS, FIELD_OWNER); + AirflowPipeline pipeline = dao.get(uriInfo, id, fields); + airflowRESTClient.deletePipeline(pipeline.getName()); DeleteResponse response = dao.delete(securityContext.getUserPrincipal().getName(), id); return response.toResponse(); }