Fix #4146 - Airflow REST delete DAG (#4149)

This commit is contained in:
Pere Miquel Brull 2022-04-14 21:52:13 +02:00 committed by GitHub
parent c48bc18065
commit c4a6bbdc83
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 96 additions and 2 deletions

View File

@ -72,6 +72,19 @@ APIS_METADATA = [
},
],
},
{
"name": "delete_dag",
"description": "Delete a DAG in the Web Server from Airflow database and filesystem",
"http_method": "DELETE",
"arguments": [
{
"name": "dag_id",
"description": "The id of the dag to delete",
"form_input_type": "text",
"required": True,
},
],
},
]

View File

@ -32,6 +32,7 @@ from openmetadata.api.config import (
)
from openmetadata.api.response import ApiResponse
from openmetadata.api.utils import jwt_token_secure
from openmetadata.operations.delete import delete_dag_id
from openmetadata.operations.deploy import DagDeployer
from openmetadata.operations.status import status
from openmetadata.operations.test_connection import test_source_connection
@ -93,8 +94,10 @@ class REST_API(AppBuilderBaseView):
# '/api' REST Endpoint where API requests should all come in
@csrf.exempt # Exempt the CSRF token
@admin_expose("/api", methods=["GET", "POST"]) # for Flask Admin
@app_builder_expose("/api", methods=["GET", "POST"]) # for Flask AppBuilder
@admin_expose("/api", methods=["GET", "POST", "DELETE"]) # for Flask Admin
@app_builder_expose(
"/api", methods=["GET", "POST", "DELETE"]
) # for Flask AppBuilder
@jwt_token_secure # On each request
def api(self):
# Get the api that you want to execute
@ -123,6 +126,8 @@ class REST_API(AppBuilderBaseView):
return self.test_connection()
if api == "dag_status":
return self.dag_status()
if api == "delete_dag":
return self.delete_dag()
raise ValueError(
f"Invalid api param {api}. Expected deploy_dag or trigger_dag."
@ -224,3 +229,27 @@ class REST_API(AppBuilderBaseView):
status=ApiResponse.STATUS_SERVER_ERROR,
error=f"Failed to get status for {dag_id} {run_id} due to {exc} - {traceback.format_exc()}",
)
def delete_dag(self) -> Response:
"""
POST request to DELETE a DAG.
Expect: POST
{
"workflow_name": "my_ingestion_pipeline3"
}
"""
dag_id: str = self.get_request_arg(request, "dag_id")
if not dag_id:
return ApiResponse.bad_request("workflow_name should be informed")
try:
return delete_dag_id(dag_id)
except Exception as exc:
logging.info(f"Failed to delete dag {dag_id}")
return ApiResponse.error(
status=ApiResponse.STATUS_SERVER_ERROR,
error=f"Failed to delete {dag_id} due to {exc} - {traceback.format_exc()}",
)

View File

@ -0,0 +1,52 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Module containing the logic to delete a DAG
"""
import os
from pathlib import Path
from airflow import settings
from airflow.models import DagModel
from flask import Response
from openmetadata.api.config import AIRFLOW_DAGS_FOLDER
from openmetadata.api.response import ApiResponse
def delete_dag_id(dag_id: str) -> Response:
"""
Delete a DAG dag_id from the filesystem and airflow db
:param dag_id: DAG to delete
:return: API Response
"""
dag_py_file = Path(AIRFLOW_DAGS_FOLDER) / f"{dag_id}.py"
deleted_file = False
if dag_py_file.is_file():
deleted_file = True
os.remove(dag_py_file.absolute())
with settings.Session() as session:
deleted_dags = (
session.query(DagModel).filter(DagModel.dag_id == dag_id).delete()
)
session.commit()
if deleted_dags > 0 and deleted_file:
return ApiResponse.success({"message": f"DAG [{dag_id}] has been deleted"})
return ApiResponse.error(
status=ApiResponse.STATUS_SERVER_ERROR,
error=f"Could not find and delete {dag_id}. Deleted dags: {deleted_dags}; "
+ f"deleted {dag_py_file}: {deleted_file}",
)