Issue-3237: Airflow pipeline resource is not returning status for a workflow (#3238)

* Issue-3237: Airflow pipeline resource is not returning status for a workflow

* Issue-3237: Airflow pipeline resource is not returning status for a workflow

* Issue-3237: Airflow pipeline resource is not returning status for a workflow
This commit is contained in:
Sriharsha Chintalapani 2022-03-08 05:20:44 -08:00 committed by GitHub
parent b93a8c8d0d
commit 400a19752b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 38 additions and 2 deletions

View File

@ -24,6 +24,7 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import javax.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
import org.json.JSONObject;
import org.openmetadata.catalog.CatalogApplicationConfig;
import org.openmetadata.catalog.airflow.models.AirflowAuthRequest;
@ -37,6 +38,7 @@ import org.openmetadata.catalog.operations.pipelines.AirflowPipeline;
import org.openmetadata.catalog.operations.pipelines.PipelineStatus;
import org.openmetadata.catalog.util.JsonUtils;
@Slf4j
public class AirflowRESTClient {
private final URL url;
private final String username;
@ -110,6 +112,28 @@ public class AirflowRESTClient {
}
}
public String deletePipeline(String pipelineName) {
try {
String token = authenticate();
String authToken = String.format(AUTH_TOKEN, token);
String triggerEndPoint = "%s/rest_api/api?api=delete_delete&dag_id=%s";
String triggerUrl = String.format(triggerEndPoint, url, pipelineName);
JSONObject requestPayload = new JSONObject();
requestPayload.put("workflow_name", pipelineName);
HttpRequest request =
HttpRequest.newBuilder(URI.create(triggerUrl))
.header(CONTENT_HEADER, CONTENT_TYPE)
.header(AUTH_HEADER, authToken)
.POST(HttpRequest.BodyPublishers.ofString(requestPayload.toString()))
.build();
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
return response.body();
} catch (Exception e) {
LOG.error("Failed to delete Airflow Pipeline from Airflow DAGS");
}
return null;
}
public String runPipeline(String pipelineName) {
try {
String token = authenticate();

View File

@ -29,6 +29,8 @@ import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.security.GeneralSecurityException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
@ -109,7 +111,7 @@ public class AirflowPipelineResource {
public static class AirflowPipelineList extends ResultList<AirflowPipeline> {
@SuppressWarnings("unused")
AirflowPipelineList() {
public AirflowPipelineList() {
// Empty constructor needed for deserialization
}
@ -120,7 +122,14 @@ public class AirflowPipelineResource {
}
static final String FIELDS = FIELD_OWNER;
public static final List<String> ALLOWED_FIELDS = Entity.getEntityFields(AirflowPipeline.class);
public static final List<String> ALLOWED_FIELDS;
static {
List<String> list = new ArrayList<>();
list.addAll(Entity.getEntityFields(AirflowPipeline.class));
list.add("status"); // Add a field parameter called tests that represent the fields - tableTests and columnTests
ALLOWED_FIELDS = Collections.unmodifiableList(list);
}
@GET
@Valid
@ -429,6 +438,9 @@ public class AirflowPipelineResource {
public Response delete(@Context UriInfo uriInfo, @Context SecurityContext securityContext, @PathParam("id") String id)
throws IOException, ParseException {
SecurityUtil.checkAdminOrBotRole(authorizer, securityContext);
Fields fields = new Fields(ALLOWED_FIELDS, FIELD_OWNER);
AirflowPipeline pipeline = dao.get(uriInfo, id, fields);
airflowRESTClient.deletePipeline(pipeline.getName());
DeleteResponse<AirflowPipeline> response = dao.delete(securityContext.getUserPrincipal().getName(), id);
return response.toResponse();
}