From 31fbf1a500092ff374a4924889dd39b66a6581a5 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Wed, 13 Apr 2022 18:08:09 +0200 Subject: [PATCH] Get dag status (#4109) --- .../src/openmetadata/api/apis_metadata.py | 19 +++++++ .../src/openmetadata/api/response.py | 4 +- .../src/openmetadata/api/rest_api.py | 23 ++++++-- .../src/openmetadata/operations/status.py | 52 +++++++++++++++++++ 4 files changed, 94 insertions(+), 4 deletions(-) create mode 100644 openmetadata-airflow-apis/src/openmetadata/operations/status.py diff --git a/openmetadata-airflow-apis/src/openmetadata/api/apis_metadata.py b/openmetadata-airflow-apis/src/openmetadata/api/apis_metadata.py index 7c17d4248c5..7845163b4c6 100644 --- a/openmetadata-airflow-apis/src/openmetadata/api/apis_metadata.py +++ b/openmetadata-airflow-apis/src/openmetadata/api/apis_metadata.py @@ -53,6 +53,25 @@ APIS_METADATA = [ }, ], }, + { + "name": "dag_status", + "description": "Get the status of a dag run", + "http_method": "GET", + "arguments": [ + { + "name": "dag_id", + "description": "The id of the dag", + "form_input_type": "text", + "required": True, + }, + { + "name": "run_id", + "description": "The id of the dagRun", + "form_input_type": "text", + "required": False, + }, + ], + }, ] diff --git a/openmetadata-airflow-apis/src/openmetadata/api/response.py b/openmetadata-airflow-apis/src/openmetadata/api/response.py index 128d7feca70..42aef8a6b93 100644 --- a/openmetadata-airflow-apis/src/openmetadata/api/response.py +++ b/openmetadata-airflow-apis/src/openmetadata/api/response.py @@ -12,6 +12,7 @@ import json from typing import Optional +from airflow.models import DagRun from flask import Response @@ -69,9 +70,10 @@ class ResponseFormat: pass @staticmethod - def format_dag_run_state(dag_run): + def format_dag_run_state(dag_run: DagRun): return { "state": dag_run.get_state(), + "run_id": dag_run.run_id, "startDate": ( None if not dag_run.start_date diff --git 
a/openmetadata-airflow-apis/src/openmetadata/api/rest_api.py b/openmetadata-airflow-apis/src/openmetadata/api/rest_api.py index 5d01e7fc91f..db6398ba368 100644 --- a/openmetadata-airflow-apis/src/openmetadata/api/rest_api.py +++ b/openmetadata-airflow-apis/src/openmetadata/api/rest_api.py @@ -17,9 +17,7 @@ import traceback from typing import Optional from airflow import settings -from airflow.api.common.experimental.trigger_dag import trigger_dag from airflow.models import DagBag, DagModel -from airflow.utils import timezone from airflow.www.app import csrf from flask import Response, request from flask_admin import expose as admin_expose @@ -35,6 +33,7 @@ from openmetadata.api.config import ( from openmetadata.api.response import ApiResponse from openmetadata.api.utils import jwt_token_secure from openmetadata.operations.deploy import DagDeployer +from openmetadata.operations.status import status from openmetadata.operations.test_connection import test_source_connection from openmetadata.operations.trigger import trigger from pydantic.error_wrappers import ValidationError @@ -122,6 +121,8 @@ class REST_API(AppBuilderBaseView): return self.trigger_dag() if api == "test_connection": return self.test_connection() + if api == "dag_status": + return self.dag_status() raise ValueError( f"Invalid api param {api}. Expected deploy_dag or trigger_dag." 
@@ -188,7 +189,6 @@ class REST_API(AppBuilderBaseView):
         """
         Trigger a dag run
         """
-        logging.info("Running run_dag method")
         request_json = request.get_json()
 
         dag_id = request_json.get("workflow_name")
@@ -207,3 +207,20 @@ class REST_API(AppBuilderBaseView):
                 status=ApiResponse.STATUS_SERVER_ERROR,
                 error=f"Workflow {dag_id} has filed to trigger due to {exc} - {traceback.format_exc()}",
             )
+
+    def dag_status(self) -> Response:
+        """
+        Report the status of a DAG's runs from the `dag_id` and optional `run_id`
+        request arguments; a failure is logged and returned as a server error.
+        """
+        dag_id: str = self.get_request_arg(request, "dag_id")
+        run_id: Optional[str] = self.get_request_arg(request, "run_id")
+        try:
+            return status(dag_id, run_id)
+        except Exception as exc:
+            # logging.exception logs at ERROR level and keeps the traceback
+            logging.exception(f"Failed to get dag {dag_id} {run_id} status")
+            return ApiResponse.error(
+                status=ApiResponse.STATUS_SERVER_ERROR,
+                error=f"Failed to get status for {dag_id} {run_id} due to {exc} - {traceback.format_exc()}",
+            )
diff --git a/openmetadata-airflow-apis/src/openmetadata/operations/status.py b/openmetadata-airflow-apis/src/openmetadata/operations/status.py
new file mode 100644
index 00000000000..fd33002ccfc
--- /dev/null
+++ b/openmetadata-airflow-apis/src/openmetadata/operations/status.py
@@ -0,0 +1,52 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Module containing the logic to check a DAG status
+"""
+from typing import Optional
+
+from airflow import settings
+from airflow.models import DagRun
+from flask import Response
+from openmetadata.api.response import ApiResponse, ResponseFormat
+
+
+def status(dag_id: str, run_id: Optional[str]) -> Response:
+    """
+    Report the state of one DAG run, or of the latest runs of a DAG.
+
+    :param dag_id: id of the DAG whose runs are queried
+    :param run_id: id of a single run; when falsy, up to 10 runs of the DAG
+        are reported instead, ordered by start date descending
+    :return: success response whose "message" is the stringified state(s),
+        or the not-found response when `run_id` matches no run of the DAG
+    """
+    # NOTE(review): using the Session as a context manager presumably needs
+    # SQLAlchemy >= 1.4 -- confirm against the pinned version
+    with settings.Session() as session:
+        dag_runs = session.query(DagRun).filter(DagRun.dag_id == dag_id)
+
+        if not run_id:
+            # No specific run requested: report the most recent ones
+            latest = (
+                dag_runs.order_by(DagRun.start_date.desc())
+                .limit(10)
+                .all()
+            )
+            states = [ResponseFormat.format_dag_run_state(run) for run in latest]
+            return ApiResponse.success({"message": f"{states}"})
+
+        found = dag_runs.filter(DagRun.run_id == run_id).first()
+        if found is None:
+            return ApiResponse.not_found(f"DAG run {run_id} not found")
+
+        state = ResponseFormat.format_dag_run_state(found)
+        return ApiResponse.success({"message": f"{state}"})