Get dag status (#4109)

This commit is contained in:
Pere Miquel Brull 2022-04-13 18:08:09 +02:00 committed by GitHub
parent 43be01b92e
commit 31fbf1a500
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 94 additions and 4 deletions

View File

@ -53,6 +53,25 @@ APIS_METADATA = [
}, },
], ],
}, },
{
"name": "dag_status",
"description": "Get the status of a dag run",
"http_method": "GET",
"arguments": [
{
"name": "dag_id",
"description": "The id of the dag",
"form_input_type": "text",
"required": True,
},
{
"name": "run_id",
"description": "The id of the dagRun",
"form_input_type": "text",
"required": False,
},
],
},
] ]

View File

@ -12,6 +12,7 @@
import json import json
from typing import Optional from typing import Optional
from airflow.models import DagRun
from flask import Response from flask import Response
@ -69,9 +70,10 @@ class ResponseFormat:
pass pass
@staticmethod @staticmethod
def format_dag_run_state(dag_run): def format_dag_run_state(dag_run: DagRun):
return { return {
"state": dag_run.get_state(), "state": dag_run.get_state(),
"run_id": dag_run.run_id,
"startDate": ( "startDate": (
None None
if not dag_run.start_date if not dag_run.start_date

View File

@ -17,9 +17,7 @@ import traceback
from typing import Optional from typing import Optional
from airflow import settings from airflow import settings
from airflow.api.common.experimental.trigger_dag import trigger_dag
from airflow.models import DagBag, DagModel from airflow.models import DagBag, DagModel
from airflow.utils import timezone
from airflow.www.app import csrf from airflow.www.app import csrf
from flask import Response, request from flask import Response, request
from flask_admin import expose as admin_expose from flask_admin import expose as admin_expose
@ -35,6 +33,7 @@ from openmetadata.api.config import (
from openmetadata.api.response import ApiResponse from openmetadata.api.response import ApiResponse
from openmetadata.api.utils import jwt_token_secure from openmetadata.api.utils import jwt_token_secure
from openmetadata.operations.deploy import DagDeployer from openmetadata.operations.deploy import DagDeployer
from openmetadata.operations.status import status
from openmetadata.operations.test_connection import test_source_connection from openmetadata.operations.test_connection import test_source_connection
from openmetadata.operations.trigger import trigger from openmetadata.operations.trigger import trigger
from pydantic.error_wrappers import ValidationError from pydantic.error_wrappers import ValidationError
@ -122,6 +121,8 @@ class REST_API(AppBuilderBaseView):
return self.trigger_dag() return self.trigger_dag()
if api == "test_connection": if api == "test_connection":
return self.test_connection() return self.test_connection()
if api == "dag_status":
return self.dag_status()
raise ValueError( raise ValueError(
f"Invalid api param {api}. Expected deploy_dag or trigger_dag." f"Invalid api param {api}. Expected deploy_dag or trigger_dag."
@ -188,7 +189,6 @@ class REST_API(AppBuilderBaseView):
""" """
Trigger a dag run Trigger a dag run
""" """
logging.info("Running run_dag method")
request_json = request.get_json() request_json = request.get_json()
dag_id = request_json.get("workflow_name") dag_id = request_json.get("workflow_name")
@ -207,3 +207,20 @@ class REST_API(AppBuilderBaseView):
status=ApiResponse.STATUS_SERVER_ERROR, status=ApiResponse.STATUS_SERVER_ERROR,
error=f"Workflow {dag_id} has filed to trigger due to {exc} - {traceback.format_exc()}", error=f"Workflow {dag_id} has filed to trigger due to {exc} - {traceback.format_exc()}",
) )
def dag_status(self) -> Response:
"""
Check the status of a DAG runs
"""
dag_id: str = self.get_request_arg(request, "dag_id")
run_id: Optional[str] = self.get_request_arg(request, "run_id")
try:
return status(dag_id, run_id)
except Exception as exc:
logging.info(f"Failed to get dag {dag_id} {run_id} status")
return ApiResponse.error(
status=ApiResponse.STATUS_SERVER_ERROR,
error=f"Failed to get status for {dag_id} {run_id} due to {exc} - {traceback.format_exc()}",
)

View File

@ -0,0 +1,52 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Module containing the logic to check a DAG status
"""
from typing import Optional
from airflow import settings
from airflow.models import DagRun
from flask import Response
from openmetadata.api.response import ApiResponse, ResponseFormat
def status(dag_id: str, run_id: Optional[str]) -> Response:
with settings.Session() as session:
query = session.query(DagRun)
if run_id:
dag_run = query.filter(
DagRun.dag_id == dag_id, DagRun.run_id == run_id
).first()
if dag_run is None:
return ApiResponse.not_found(f"DAG run {run_id} not found")
res_dag_run = ResponseFormat.format_dag_run_state(dag_run)
return ApiResponse.success({"message": f"{res_dag_run}"})
runs = (
query.filter(
DagRun.dag_id == dag_id,
)
.order_by(DagRun.start_date.desc())
.limit(10)
.all()
)
formatted = [ResponseFormat.format_dag_run_state(dag_run) for dag_run in runs]
return ApiResponse.success({"message": f"{formatted}"})