Fix #4258 - Check dag bag and discard run id search (#4264)

Fix #4258 - Check dag bag and discard run id search (#4264)
This commit is contained in:
Pere Miquel Brull 2022-04-20 14:55:33 +02:00 committed by GitHub
parent 51110c2a86
commit c3993119f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 24 additions and 32 deletions

View File

@ -171,7 +171,10 @@ public class AirflowRESTClient {
if (response.statusCode() == 200) {
List<PipelineStatus> statuses = JsonUtils.readObjects(response.body(), PipelineStatus.class);
ingestionPipeline.setPipelineStatuses(statuses);
ingestionPipeline.setDeployed(true);
return ingestionPipeline;
} else if (response.statusCode() == 404) {
ingestionPipeline.setDeployed(false);
}
throw AirflowException.byMessage(

View File

@ -166,6 +166,10 @@
},
"default": null
},
"deployed": {
"description": "Indicates if the workflow has been successfully deployed to Airflow.",
"type": "boolean"
},
"nextExecutionDate": {
"description": "Next execution date from the underlying pipeline platform once the pipeline scheduled.",
"$ref": "../../../type/basic.json#/definitions/date"

View File

@ -55,7 +55,7 @@ APIS_METADATA = [
},
{
"name": "dag_status",
"description": "Get the status of a dag run",
"description": "Get the status of a dag's latest runs",
"http_method": "GET",
"arguments": [
{
@ -64,12 +64,6 @@ APIS_METADATA = [
"form_input_type": "text",
"required": True,
},
{
"name": "run_id",
"description": "The id of the dagRun",
"form_input_type": "text",
"required": False,
},
],
},
{

View File

@ -218,16 +218,15 @@ class REST_API(AppBuilderBaseView):
Check the status of a DAG runs
"""
dag_id: str = self.get_request_arg(request, "dag_id")
run_id: Optional[str] = self.get_request_arg(request, "run_id")
try:
return status(dag_id, run_id)
return status(dag_id)
except Exception as exc:
logging.info(f"Failed to get dag {dag_id} {run_id} status")
logging.info(f"Failed to get dag {dag_id} status")
return ApiResponse.error(
status=ApiResponse.STATUS_SERVER_ERROR,
error=f"Failed to get status for {dag_id} {run_id} due to {exc} - {traceback.format_exc()}",
error=f"Failed to get status for {dag_id} due to {exc} - {traceback.format_exc()}",
)
def delete_dag(self) -> Response:

View File

@ -12,39 +12,31 @@
Module containing the logic to check a DAG status
"""
import json
from typing import Optional
from airflow import settings
from airflow.models import DagRun
from airflow.models import DagModel, DagRun
from flask import Response
from openmetadata.api.response import ApiResponse, ResponseFormat
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineStatus,
)
def status(dag_id: str, run_id: Optional[str]) -> Response:
def status(dag_id: str) -> Response:
"""
Validate that the DAG is registered by Airflow.
If exists, check the DagRun
:param dag_id: DAG to find
:return: API Response
"""
with settings.Session() as session:
query = session.query(DagRun)
dag_model = session.query(DagModel).filter(DagModel.dag_id == dag_id).first()
if run_id:
dag_run = query.filter(
DagRun.dag_id == dag_id, DagRun.run_id == run_id
).first()
if dag_run is None:
return ApiResponse.not_found(f"DAG run {run_id} not found")
res_dag_run: PipelineStatus = ResponseFormat.format_dag_run_state(dag_run)
return ApiResponse.success(json.loads(res_dag_run.json()))
if not dag_model:
return ApiResponse.not_found(f"DAG {dag_id} not found.")
runs = (
query.filter(
session.query(DagRun)
.filter(
DagRun.dag_id == dag_id,
)
.order_by(DagRun.start_date.desc())