# Copyright 2025 Collate # Licensed under the Collate Community License, Version 1.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ Return a list of the 10 last status for the ingestion Pipeline """ import traceback from typing import Callable from flask import Blueprint, Response from openmetadata_managed_apis.api.response import ApiResponse from openmetadata_managed_apis.api.utils import get_arg_dag_id, get_arg_only_queued from openmetadata_managed_apis.operations.status import status from openmetadata_managed_apis.utils.logger import routes_logger logger = routes_logger() def get_fn(blueprint: Blueprint) -> Callable: """ Return the function loaded to a route :param blueprint: Flask Blueprint to assign route to :return: routed function """ # Lazy import the requirements # pylint: disable=import-outside-toplevel from airflow.api_connexion import security from airflow.security import permissions from airflow.www.app import csrf @blueprint.route("/status", methods=["GET"]) @csrf.exempt @security.requires_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG)]) def dag_status() -> Response: """ Check the status of a DAG runs """ dag_id = get_arg_dag_id() only_queued = get_arg_only_queued() try: return status(dag_id, only_queued) except Exception as exc: logger.debug(traceback.format_exc()) logger.error(f"Failed to get dag [{dag_id}] status: {exc}") return ApiResponse.error( status=ApiResponse.STATUS_SERVER_ERROR, error=f"Failed to get status for [{dag_id}] due to [{exc}] ", ) return dag_status