From 816f8d49eb426958c9f5cec2bfafcaa1fc73a56c Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Sun, 26 Jun 2022 21:18:14 +0200 Subject: [PATCH] Use source when creating and updating indexes (#5597) --- .../catalog/airflow/AirflowRESTClient.java | 42 ++++++++++++ .../jdbi3/IngestionPipelineRepository.java | 11 +++- .../IngestionPipelineResource.java | 25 +++++++ .../catalog/util/PipelineServiceClient.java | 3 + .../ingestionPipelines/ingestionPipeline.json | 5 ++ openmetadata-airflow-apis/setup.py | 2 +- .../src/openmetadata/api/apis_metadata.py | 30 ++++++++- .../src/openmetadata/api/rest_api.py | 47 +++++++++++++ .../src/openmetadata/operations/state.py | 66 +++++++++++++++++++ 9 files changed, 227 insertions(+), 4 deletions(-) create mode 100644 openmetadata-airflow-apis/src/openmetadata/operations/state.py diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java index 72e7f5ca798..cdd25275e9b 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java @@ -118,6 +118,48 @@ public class AirflowRESTClient extends PipelineServiceClient { pipelineName, "Failed to trigger IngestionPipeline", Response.Status.fromStatusCode(response.statusCode())); } + @Override + public IngestionPipeline toggleIngestion(IngestionPipeline ingestionPipeline) { + HttpResponse response; + try { + String toggleEndPoint; + String toggleUrl; + JSONObject requestPayload = new JSONObject(); + requestPayload.put("dag_id", ingestionPipeline.getName()); + // If the pipeline is currently enabled, disable it + if (ingestionPipeline.getEnabled().equals(Boolean.TRUE)) { + toggleEndPoint = "%s/rest_api/api?api=disable_dag"; + toggleUrl = String.format(toggleEndPoint, serviceURL); + response = post(toggleUrl, requestPayload.toString()); + if (response.statusCode() == 200) { + ingestionPipeline.setEnabled(false); + return ingestionPipeline; + } else if (response.statusCode() == 404) { + ingestionPipeline.setDeployed(false); + return ingestionPipeline; + } + // otherwise, enable it back + } else { + toggleEndPoint = "%s/rest_api/api?api=enable_dag"; + toggleUrl = String.format(toggleEndPoint, serviceURL); + response = post(toggleUrl, requestPayload.toString()); + if (response.statusCode() == 200) { + ingestionPipeline.setEnabled(true); + return ingestionPipeline; + } else if (response.statusCode() == 404) { + ingestionPipeline.setDeployed(false); + return ingestionPipeline; + } + } + } catch (Exception e) { + throw PipelineServiceClientException.byMessage(ingestionPipeline.getName(), e.getMessage()); + } + throw PipelineServiceClientException.byMessage( + ingestionPipeline.getName(), + "Failed to toggle ingestion pipeline state", + Response.Status.fromStatusCode(response.statusCode())); + } + @Override public IngestionPipeline getPipelineStatus(IngestionPipeline ingestionPipeline) { HttpResponse response; diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/IngestionPipelineRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/IngestionPipelineRepository.java index b9fcb2a552d..40a13cce633 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/IngestionPipelineRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/IngestionPipelineRepository.java @@ -33,8 +33,8 @@ import org.openmetadata.catalog.util.JsonUtils; import org.openmetadata.catalog.util.PipelineServiceClient; public class IngestionPipelineRepository extends EntityRepository { - private static final String INGESTION_PIPELINE_UPDATE_FIELDS = "owner,sourceConfig,airflowConfig,loggerLevel"; - private static final String INGESTION_PIPELINE_PATCH_FIELDS = "owner,sourceConfig,airflowConfig,loggerLevel"; + private static final String INGESTION_PIPELINE_UPDATE_FIELDS = "owner,sourceConfig,airflowConfig,loggerLevel,enabled"; + private static final String INGESTION_PIPELINE_PATCH_FIELDS = "owner,sourceConfig,airflowConfig,loggerLevel,enabled"; private static PipelineServiceClient pipelineServiceClient; public IngestionPipelineRepository(CollectionDAO dao) { @@ -129,6 +129,7 @@ public class IngestionPipelineRepository extends EntityRepository getLastIngestionLogs(String pipelineName); } diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json index a34ef2f2db8..0e83a481ad2 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json @@ -170,6 +170,11 @@ "description": "Indicates if the workflow has been successfully deployed to Airflow.", "type": "boolean" }, + "enabled": { + "description": "True if the pipeline is ready to be run in the next schedule. False if it is paused.", + "type": "boolean", + "default": true + }, "href": { "description": "Link to this ingestion pipeline resource.", "$ref": "../../../type/basic.json#/definitions/href" diff --git a/openmetadata-airflow-apis/setup.py b/openmetadata-airflow-apis/setup.py index 6b24801ee28..8b0e6f13033 100644 --- a/openmetadata-airflow-apis/setup.py +++ b/openmetadata-airflow-apis/setup.py @@ -22,7 +22,7 @@ def get_long_description(): base_requirements = { - "openmetadata-ingestion[airflow-container]~=0.9", + "openmetadata-ingestion[airflow-container]~=0.10", "pendulum~=2.1.2", "packaging~=21.2", "setuptools~=58.3.0", diff --git a/openmetadata-airflow-apis/src/openmetadata/api/apis_metadata.py b/openmetadata-airflow-apis/src/openmetadata/api/apis_metadata.py index 5d9b86bb9ed..77be7230b9c 100644 --- a/openmetadata-airflow-apis/src/openmetadata/api/apis_metadata.py +++ b/openmetadata-airflow-apis/src/openmetadata/api/apis_metadata.py @@ -10,7 +10,7 @@ # limitations under the License. from typing import Any, Dict, Optional -# TODO LOG (just link v1), ENABLE DAG, DISABLE DAG (play pause) +# TODO LOG (just link v1) APIS_METADATA = [ { "name": "deploy_dag", @@ -97,6 +97,34 @@ APIS_METADATA = [ "description": "Get the status of Airflow REST status", "http_method": "GET", }, + { + "name": "enable_dag", + "description": "Mark the DAG as enabled to run on the next schedule.", + "http_method": "POST", + "arguments": [], + "post_arguments": [ + { + "name": "dag_id", + "description": "The id of the dag", + "form_input_type": "text", + "required": True, + }, + ], + }, + { + "name": "disable_dag", + "description": "Mark the DAG as disabled. It will not run on the next schedule.", + "http_method": "POST", + "arguments": [], + "post_arguments": [ + { + "name": "dag_id", + "description": "The id of the dag", + "form_input_type": "text", + "required": True, + }, + ], + }, ] diff --git a/openmetadata-airflow-apis/src/openmetadata/api/rest_api.py b/openmetadata-airflow-apis/src/openmetadata/api/rest_api.py index 5de0c69111e..625fe068412 100644 --- a/openmetadata-airflow-apis/src/openmetadata/api/rest_api.py +++ b/openmetadata-airflow-apis/src/openmetadata/api/rest_api.py @@ -35,6 +35,7 @@ from openmetadata.api.utils import jwt_token_secure from openmetadata.operations.delete import delete_dag_id from openmetadata.operations.deploy import DagDeployer from openmetadata.operations.last_dag_logs import last_dag_logs +from openmetadata.operations.state import disable_dag, enable_dag from openmetadata.operations.status import status from openmetadata.operations.test_connection import test_source_connection from openmetadata.operations.trigger import trigger @@ -133,6 +134,10 @@ class REST_API(AppBuilderBaseView): return self.delete_dag() if api == "last_dag_logs": return self.last_dag_logs() + if api == "enable_dag": + return self.enable_dag() + if api == "disable_dag": + return self.disable_dag() raise ValueError( f"Invalid api param {api}. Expected deploy_dag or trigger_dag." @@ -301,3 +306,45 @@ class REST_API(AppBuilderBaseView): status=ApiResponse.STATUS_SERVER_ERROR, error=f"Failed to get last run logs for '{dag_id}' due to {exc} - {traceback.format_exc()}", ) + + @staticmethod + def enable_dag() -> Response: + """ + Given a DAG ID, mark the dag as enabled + """ + request_json = request.get_json() + + dag_id = request_json.get("dag_id") + if not dag_id: + return ApiResponse.bad_request(f"Missing dag_id argument in the request") + + try: + return enable_dag(dag_id) + + except Exception as exc: + logging.info(f"Failed to get last run logs for '{dag_id}'") + return ApiResponse.error( + status=ApiResponse.STATUS_SERVER_ERROR, + error=f"Failed to get last run logs for '{dag_id}' due to {exc} - {traceback.format_exc()}", + ) + + @staticmethod + def disable_dag() -> Response: + """ + Given a DAG ID, mark the dag as disabled + """ + request_json = request.get_json() + + dag_id = request_json.get("dag_id") + if not dag_id: + return ApiResponse.bad_request(f"Missing dag_id argument in the request") + + try: + return disable_dag(dag_id) + + except Exception as exc: + logging.info(f"Failed to get last run logs for '{dag_id}'") + return ApiResponse.error( + status=ApiResponse.STATUS_SERVER_ERROR, + error=f"Failed to get last run logs for '{dag_id}' due to {exc} - {traceback.format_exc()}", + ) diff --git a/openmetadata-airflow-apis/src/openmetadata/operations/state.py b/openmetadata-airflow-apis/src/openmetadata/operations/state.py new file mode 100644 index 00000000000..24905834264 --- /dev/null +++ b/openmetadata-airflow-apis/src/openmetadata/operations/state.py @@ -0,0 +1,66 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Module containing the logic to toggle DAG state between enabled/disabled +""" +from airflow import settings +from airflow.models import DagModel +from flask import Response +from openmetadata.api.response import ApiResponse + + +def _update_dag_state(dag_id: str, paused: bool, message: str) -> Response: + """ + Given a DAG id, a boolean and a message, update + the DAG state between paused or unpaused. + :param dag_id: DAG to update + :param paused: state to set + :param message: message to return + :return: API Response + """ + + with settings.Session() as session: + + dag_model: DagModel = ( + session.query(DagModel).filter(DagModel.dag_id == dag_id).first() + ) + + if not dag_model: + return ApiResponse.not_found(f"DAG {dag_id} not found.") + + dag_model.is_paused = paused + session.commit() + + return ApiResponse.success({"message": message}) + + +def enable_dag(dag_id: str) -> Response: + """ + Unpause the DAG + :param dag_id: DAG to find + :return: API Response + """ + + return _update_dag_state( + dag_id=dag_id, paused=False, message=f"DAG {dag_id} has been enabled" + ) + + +def disable_dag(dag_id: str) -> Response: + """ + Pause the DAG + :param dag_id: DAG to find + :return: API Response + """ + + return _update_dag_state( + dag_id=dag_id, paused=True, message=f"DAG {dag_id} has been disabled" + )