Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

67 lines
2.3 KiB
Python
Raw Permalink Normal View History

# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Module containing the logic to kill all unfinished executions of a DAG
"""
from typing import List
from airflow import settings
from airflow.models import DagModel, DagRun, TaskInstance
from airflow.utils.state import DagRunState, TaskInstanceState
from flask import Response
from openmetadata_managed_apis.api.response import ApiResponse
def kill_all(dag_id: str) -> Response:
    """
    Mark the active runs of a DAG, and its unfinished task instances, as failed.

    The DAG must be registered in Airflow and have at least one DagRun in
    RUNNING or QUEUED state. Otherwise nothing is modified and a not-found
    response is returned.

    :param dag_id: DAG to find
    :return: API Response. Not found when the DAG is unknown or has no
        running/queued runs; success once the runs and task instances have
        been set to FAILED and the session committed.
    """
    with settings.Session() as session:
        # Only existence matters here; the DagModel row itself is not used.
        dag_model = session.query(DagModel).filter(DagModel.dag_id == dag_id).first()

        if not dag_model:
            return ApiResponse.not_found(f"DAG {dag_id} not found.")

        runs: List[DagRun] = (
            session.query(DagRun)
            .filter(
                DagRun.dag_id == dag_id,
                DagRun.state.in_((DagRunState.RUNNING, DagRunState.QUEUED)),
            )
            .all()
        )

        # The active runs are the only gate. The previous `or not instances`
        # test was applied to an un-materialized Query object, which is always
        # truthy, so it never influenced the outcome; it is dropped rather
        # than "activated" to keep the behaviour callers already rely on.
        if not runs:
            return ApiResponse.not_found(
                f"Workflow [{dag_id}] has no running or pending runs nor tasks"
            )

        # Materialize with .all() so the value really is the annotated list.
        # The filter is on dag_id and state only: it is not restricted to the
        # runs selected above, and it keeps every state other than SUCCESS and
        # FAILED.
        # NOTE(review): a SQL `NOT IN` presumably does not match rows whose
        # state is NULL, so task instances without a state would be skipped
        # here - confirm whether that is intended.
        instances: List[TaskInstance] = (
            session.query(TaskInstance)
            .filter(
                TaskInstance.dag_id == dag_id,
                TaskInstance.state.notin_(
                    (TaskInstanceState.SUCCESS, TaskInstanceState.FAILED)
                ),
            )
            .all()
        )

        for dag_run in runs:
            dag_run.set_state(DagRunState.FAILED)

        for instance in instances:
            instance.set_state(TaskInstanceState.FAILED)

        session.commit()

    return ApiResponse.success({"message": f"Workflow [{dag_id}] has been killed"})