| 
									
										
										
										
											2022-04-14 21:52:13 +02:00
										 |  |  | #  Copyright 2021 Collate | 
					
						
							|  |  |  | #  Licensed under the Apache License, Version 2.0 (the "License"); | 
					
						
							|  |  |  | #  you may not use this file except in compliance with the License. | 
					
						
							|  |  |  | #  You may obtain a copy of the License at | 
					
						
							|  |  |  | #  http://www.apache.org/licenses/LICENSE-2.0 | 
					
						
							|  |  |  | #  Unless required by applicable law or agreed to in writing, software | 
					
						
							|  |  |  | #  distributed under the License is distributed on an "AS IS" BASIS, | 
					
						
							|  |  |  | #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
					
						
							|  |  |  | #  See the License for the specific language governing permissions and | 
					
						
							|  |  |  | #  limitations under the License. | 
					
						
							|  |  |  | """
 | 
					
						
							|  |  |  | Module containing the logic to delete a DAG | 
					
						
							|  |  |  | """
 | 
					
						
							|  |  |  | import os | 
					
						
							|  |  |  | from pathlib import Path | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from airflow import settings | 
					
						
							| 
									
										
										
										
											2022-05-11 16:02:06 +02:00
										 |  |  | from airflow.models import DagModel, DagRun | 
					
						
							| 
									
										
										
										
											2022-04-14 21:52:13 +02:00
										 |  |  | from flask import Response | 
					
						
							| 
									
										
										
										
											2022-07-28 14:46:25 +02:00
										 |  |  | from openmetadata_managed_apis.api.config import ( | 
					
						
							|  |  |  |     AIRFLOW_DAGS_FOLDER, | 
					
						
							|  |  |  |     DAG_GENERATED_CONFIGS, | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | from openmetadata_managed_apis.api.response import ApiResponse | 
					
						
							| 
									
										
										
										
											2022-04-14 21:52:13 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def delete_dag_id(dag_id: str) -> Response: | 
					
						
							|  |  |  |     """
 | 
					
						
							| 
									
										
										
										
											2022-05-09 15:24:58 +02:00
										 |  |  |     Delete a DAG dag_id from the filesystem and airflow db. | 
					
						
							|  |  |  |     We clean: | 
					
						
							|  |  |  |     - py file in AIRFLOW_DAGS_FOLDER | 
					
						
							|  |  |  |     - config file in DAG_GENERATED_CONFIGS | 
					
						
							| 
									
										
										
										
											2022-05-11 16:02:06 +02:00
										 |  |  |     - DagModel and DagRun entries in airflow db | 
					
						
							| 
									
										
										
										
											2022-04-14 21:52:13 +02:00
										 |  |  |     :param dag_id: DAG to delete | 
					
						
							|  |  |  |     :return: API Response | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     dag_py_file = Path(AIRFLOW_DAGS_FOLDER) / f"{dag_id}.py" | 
					
						
							| 
									
										
										
										
											2022-05-09 15:24:58 +02:00
										 |  |  |     config_file = Path(DAG_GENERATED_CONFIGS) / f"{dag_id}.json" | 
					
						
							| 
									
										
										
										
											2022-04-14 21:52:13 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  |     deleted_file = False | 
					
						
							|  |  |  |     if dag_py_file.is_file(): | 
					
						
							|  |  |  |         deleted_file = True | 
					
						
							|  |  |  |         os.remove(dag_py_file.absolute()) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-05-09 15:24:58 +02:00
										 |  |  |     deleted_config = False | 
					
						
							|  |  |  |     if config_file.is_file(): | 
					
						
							|  |  |  |         deleted_config = True | 
					
						
							|  |  |  |         os.remove(config_file.absolute()) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-04-14 21:52:13 +02:00
										 |  |  |     with settings.Session() as session: | 
					
						
							|  |  |  |         deleted_dags = ( | 
					
						
							|  |  |  |             session.query(DagModel).filter(DagModel.dag_id == dag_id).delete() | 
					
						
							|  |  |  |         ) | 
					
						
							| 
									
										
										
										
											2022-05-11 16:02:06 +02:00
										 |  |  |         session.query(DagRun).filter(DagRun.dag_id == dag_id).delete() | 
					
						
							| 
									
										
										
										
											2022-04-14 21:52:13 +02:00
										 |  |  |         session.commit() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-05-09 15:24:58 +02:00
										 |  |  |     if deleted_dags > 0 and deleted_file and deleted_config: | 
					
						
							| 
									
										
										
										
											2022-04-14 21:52:13 +02:00
										 |  |  |         return ApiResponse.success({"message": f"DAG [{dag_id}] has been deleted"}) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     return ApiResponse.error( | 
					
						
							|  |  |  |         status=ApiResponse.STATUS_SERVER_ERROR, | 
					
						
							|  |  |  |         error=f"Could not find and delete {dag_id}. Deleted dags: {deleted_dags}; " | 
					
						
							|  |  |  |         + f"deleted {dag_py_file}: {deleted_file}", | 
					
						
							|  |  |  |     ) |