diff --git a/openmetadata-airflow-apis/src/openmetadata/api/apis_metadata.py b/openmetadata-airflow-apis/src/openmetadata/api/apis_metadata.py index 7845163b4c6..8da62d81a5e 100644 --- a/openmetadata-airflow-apis/src/openmetadata/api/apis_metadata.py +++ b/openmetadata-airflow-apis/src/openmetadata/api/apis_metadata.py @@ -72,6 +72,19 @@ APIS_METADATA = [ }, ], }, + { + "name": "delete_dag", + "description": "Delete a DAG in the Web Server from Airflow database and filesystem", + "http_method": "DELETE", + "arguments": [ + { + "name": "dag_id", + "description": "The id of the dag to delete", + "form_input_type": "text", + "required": True, + }, + ], + }, ] diff --git a/openmetadata-airflow-apis/src/openmetadata/api/rest_api.py b/openmetadata-airflow-apis/src/openmetadata/api/rest_api.py index db6398ba368..01232a7d47a 100644 --- a/openmetadata-airflow-apis/src/openmetadata/api/rest_api.py +++ b/openmetadata-airflow-apis/src/openmetadata/api/rest_api.py @@ -32,6 +32,7 @@ from openmetadata.api.config import ( ) from openmetadata.api.response import ApiResponse from openmetadata.api.utils import jwt_token_secure +from openmetadata.operations.delete import delete_dag_id from openmetadata.operations.deploy import DagDeployer from openmetadata.operations.status import status from openmetadata.operations.test_connection import test_source_connection @@ -93,8 +94,10 @@ class REST_API(AppBuilderBaseView): # '/api' REST Endpoint where API requests should all come in @csrf.exempt # Exempt the CSRF token - @admin_expose("/api", methods=["GET", "POST"]) # for Flask Admin - @app_builder_expose("/api", methods=["GET", "POST"]) # for Flask AppBuilder + @admin_expose("/api", methods=["GET", "POST", "DELETE"]) # for Flask Admin + @app_builder_expose( + "/api", methods=["GET", "POST", "DELETE"] + ) # for Flask AppBuilder @jwt_token_secure # On each request def api(self): # Get the api that you want to execute @@ -123,6 +126,8 @@ class REST_API(AppBuilderBaseView): return self.test_connection() if api == "dag_status": return self.dag_status() + if api == "delete_dag": + return self.delete_dag() raise ValueError( f"Invalid api param {api}. Expected deploy_dag or trigger_dag." @@ -224,3 +229,27 @@ class REST_API(AppBuilderBaseView): status=ApiResponse.STATUS_SERVER_ERROR, error=f"Failed to get status for {dag_id} {run_id} due to {exc} - {traceback.format_exc()}", ) + + def delete_dag(self) -> Response: + """ + POST request to DELETE a DAG. + + Expect: POST + { + "workflow_name": "my_ingestion_pipeline3" + } + """ + dag_id: str = self.get_request_arg(request, "dag_id") + + if not dag_id: + return ApiResponse.bad_request("workflow_name should be informed") + + try: + return delete_dag_id(dag_id) + + except Exception as exc: + logging.info(f"Failed to delete dag {dag_id}") + return ApiResponse.error( + status=ApiResponse.STATUS_SERVER_ERROR, + error=f"Failed to delete {dag_id} due to {exc} - {traceback.format_exc()}", + ) diff --git a/openmetadata-airflow-apis/src/openmetadata/operations/delete.py b/openmetadata-airflow-apis/src/openmetadata/operations/delete.py new file mode 100644 index 00000000000..5dd3d3b2deb --- /dev/null +++ b/openmetadata-airflow-apis/src/openmetadata/operations/delete.py @@ -0,0 +1,52 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Module containing the logic to delete a DAG +""" +import os +from pathlib import Path + +from airflow import settings +from airflow.models import DagModel +from flask import Response +from openmetadata.api.config import AIRFLOW_DAGS_FOLDER +from openmetadata.api.response import ApiResponse + + +def delete_dag_id(dag_id: str) -> Response: + """ + Delete a DAG dag_id from the filesystem and airflow db + :param dag_id: DAG to delete + :return: API Response + """ + + dag_py_file = Path(AIRFLOW_DAGS_FOLDER) / f"{dag_id}.py" + + deleted_file = False + if dag_py_file.is_file(): + deleted_file = True + os.remove(dag_py_file.absolute()) + + with settings.Session() as session: + + deleted_dags = ( + session.query(DagModel).filter(DagModel.dag_id == dag_id).delete() + ) + session.commit() + + if deleted_dags > 0 and deleted_file: + return ApiResponse.success({"message": f"DAG [{dag_id}] has been deleted"}) + + return ApiResponse.error( + status=ApiResponse.STATUS_SERVER_ERROR, + error=f"Could not find and delete {dag_id}. Deleted dags: {deleted_dags}; " + + f"deleted {dag_py_file}: {deleted_file}", + )