From c3993119f6178a3cc881f181f2a7e018bfa3f64d Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Wed, 20 Apr 2022 14:55:33 +0200 Subject: [PATCH] Fix #4258 - Check dag bag and discard run id search (#4264) Fix #4258 - Check dag bag and discard run id search (#4264) --- .../catalog/airflow/AirflowRESTClient.java | 3 ++ .../ingestionPipelines/ingestionPipeline.json | 4 +++ .../src/openmetadata/api/apis_metadata.py | 8 +---- .../src/openmetadata/api/rest_api.py | 7 ++-- .../src/openmetadata/operations/status.py | 34 +++++++------------ 5 files changed, 24 insertions(+), 32 deletions(-) diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java index ba2e2d1162e..6244e5114a2 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java @@ -171,7 +171,10 @@ public class AirflowRESTClient { if (response.statusCode() == 200) { List statuses = JsonUtils.readObjects(response.body(), PipelineStatus.class); ingestionPipeline.setPipelineStatuses(statuses); + ingestionPipeline.setDeployed(true); return ingestionPipeline; + } else if (response.statusCode() == 404) { + ingestionPipeline.setDeployed(false); } throw AirflowException.byMessage( diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json index 01f2a987ddd..011bfca9705 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json @@ -166,6 +166,10 @@ }, "default": null }, + "deployed": { + "description": "Indicates if the workflow has been successfully deployed to Airflow.", + "type": "boolean" + }, "nextExecutionDate": { "description": "Next execution date from the underlying pipeline platform once the pipeline scheduled.", "$ref": "../../../type/basic.json#/definitions/date" diff --git a/openmetadata-airflow-apis/src/openmetadata/api/apis_metadata.py b/openmetadata-airflow-apis/src/openmetadata/api/apis_metadata.py index 8e6592a8a6e..b5f138ebbd1 100644 --- a/openmetadata-airflow-apis/src/openmetadata/api/apis_metadata.py +++ b/openmetadata-airflow-apis/src/openmetadata/api/apis_metadata.py @@ -55,7 +55,7 @@ APIS_METADATA = [ }, { "name": "dag_status", - "description": "Get the status of a dag run", + "description": "Get the status of a dag's latest runs", "http_method": "GET", "arguments": [ { @@ -64,12 +64,6 @@ APIS_METADATA = [ "form_input_type": "text", "required": True, }, - { - "name": "run_id", - "description": "The id of the dagRun", - "form_input_type": "text", - "required": False, - }, ], }, { diff --git a/openmetadata-airflow-apis/src/openmetadata/api/rest_api.py b/openmetadata-airflow-apis/src/openmetadata/api/rest_api.py index 43af7003679..04877cd3b90 100644 --- a/openmetadata-airflow-apis/src/openmetadata/api/rest_api.py +++ b/openmetadata-airflow-apis/src/openmetadata/api/rest_api.py @@ -218,16 +218,15 @@ class REST_API(AppBuilderBaseView): Check the status of a DAG runs """ dag_id: str = self.get_request_arg(request, "dag_id") - run_id: Optional[str] = self.get_request_arg(request, "run_id") try: - return status(dag_id, run_id) + return status(dag_id) except Exception as exc: - logging.info(f"Failed to get dag {dag_id} {run_id} status") + logging.info(f"Failed to get dag {dag_id} status") return ApiResponse.error( status=ApiResponse.STATUS_SERVER_ERROR, - error=f"Failed to get status for {dag_id} {run_id} due to {exc} - {traceback.format_exc()}", + error=f"Failed to get status for {dag_id} due to {exc} - {traceback.format_exc()}", ) def delete_dag(self) -> Response: diff --git a/openmetadata-airflow-apis/src/openmetadata/operations/status.py b/openmetadata-airflow-apis/src/openmetadata/operations/status.py index 75aacf25fec..1af056de63d 100644 --- a/openmetadata-airflow-apis/src/openmetadata/operations/status.py +++ b/openmetadata-airflow-apis/src/openmetadata/operations/status.py @@ -12,39 +12,31 @@ Module containing the logic to check a DAG status """ import json -from typing import Optional from airflow import settings -from airflow.models import DagRun +from airflow.models import DagModel, DagRun from flask import Response from openmetadata.api.response import ApiResponse, ResponseFormat -from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( - PipelineStatus, -) - -def status(dag_id: str, run_id: Optional[str]) -> Response: +def status(dag_id: str) -> Response: + """ + Validate that the DAG is registered by Airflow. + If exists, check the DagRun + :param dag_id: DAG to find + :return: API Response + """ with settings.Session() as session: - query = session.query(DagRun) + dag_model = session.query(DagModel).filter(DagModel.dag_id == dag_id).first() - if run_id: - - dag_run = query.filter( - DagRun.dag_id == dag_id, DagRun.run_id == run_id - ).first() - - if dag_run is None: - return ApiResponse.not_found(f"DAG run {run_id} not found") - - res_dag_run: PipelineStatus = ResponseFormat.format_dag_run_state(dag_run) - - return ApiResponse.success(json.loads(res_dag_run.json())) + if not dag_model: + return ApiResponse.not_found(f"DAG {dag_id} not found.") runs = ( - query.filter( + session.query(DagRun) + .filter( DagRun.dag_id == dag_id, ) .order_by(DagRun.start_date.desc())