Use source when creating and updating indexes (#5597)

This commit is contained in:
Pere Miquel Brull 2022-06-26 21:18:14 +02:00 committed by GitHub
parent e640db3192
commit 816f8d49eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 227 additions and 4 deletions

View File

@ -118,6 +118,48 @@ public class AirflowRESTClient extends PipelineServiceClient {
pipelineName, "Failed to trigger IngestionPipeline", Response.Status.fromStatusCode(response.statusCode()));
}
@Override
public IngestionPipeline toggleIngestion(IngestionPipeline ingestionPipeline) {
HttpResponse<String> response;
try {
String toggleEndPoint;
String toggleUrl;
JSONObject requestPayload = new JSONObject();
requestPayload.put("dag_id", ingestionPipeline.getName());
// If the pipeline is currently enabled, disable it
if (ingestionPipeline.getEnabled().equals(Boolean.TRUE)) {
toggleEndPoint = "%s/rest_api/api?api=disable_dag";
toggleUrl = String.format(toggleEndPoint, serviceURL);
response = post(toggleUrl, requestPayload.toString());
if (response.statusCode() == 200) {
ingestionPipeline.setEnabled(false);
return ingestionPipeline;
} else if (response.statusCode() == 404) {
ingestionPipeline.setDeployed(false);
return ingestionPipeline;
}
// otherwise, enable it back
} else {
toggleEndPoint = "%s/rest_api/api?api=enable_dag";
toggleUrl = String.format(toggleEndPoint, serviceURL);
response = post(toggleUrl, requestPayload.toString());
if (response.statusCode() == 200) {
ingestionPipeline.setEnabled(true);
return ingestionPipeline;
} else if (response.statusCode() == 404) {
ingestionPipeline.setDeployed(false);
return ingestionPipeline;
}
}
} catch (Exception e) {
throw PipelineServiceClientException.byMessage(ingestionPipeline.getName(), e.getMessage());
}
throw PipelineServiceClientException.byMessage(
ingestionPipeline.getName(),
"Failed to toggle ingestion pipeline state",
Response.Status.fromStatusCode(response.statusCode()));
}
@Override
public IngestionPipeline getPipelineStatus(IngestionPipeline ingestionPipeline) {
HttpResponse<String> response;

View File

@ -33,8 +33,8 @@ import org.openmetadata.catalog.util.JsonUtils;
import org.openmetadata.catalog.util.PipelineServiceClient;
public class IngestionPipelineRepository extends EntityRepository<IngestionPipeline> {
private static final String INGESTION_PIPELINE_UPDATE_FIELDS = "owner,sourceConfig,airflowConfig,loggerLevel";
private static final String INGESTION_PIPELINE_PATCH_FIELDS = "owner,sourceConfig,airflowConfig,loggerLevel";
private static final String INGESTION_PIPELINE_UPDATE_FIELDS = "owner,sourceConfig,airflowConfig,loggerLevel,enabled";
private static final String INGESTION_PIPELINE_PATCH_FIELDS = "owner,sourceConfig,airflowConfig,loggerLevel,enabled";
private static PipelineServiceClient pipelineServiceClient;
public IngestionPipelineRepository(CollectionDAO dao) {
@ -129,6 +129,7 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
updateOpenMetadataServerConnection(
original.getOpenMetadataServerConnection(), updated.getOpenMetadataServerConnection());
updateLogLevel(original.getLoggerLevel(), updated.getLoggerLevel());
updateEnabled(original.getEnabled(), updated.getEnabled());
}
private void updateSourceConfig(SourceConfig origSource, SourceConfig updatedSource)
@ -161,5 +162,11 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
recordChange("loggerLevel", origLevel, updatedLevel);
}
}
private void updateEnabled(Boolean origEnabled, Boolean updatedEnabled) throws JsonProcessingException {
if (updatedEnabled != null && !origEnabled.equals(updatedEnabled)) {
recordChange("enabled", origEnabled, updatedEnabled);
}
}
}
}

View File

@ -416,6 +416,31 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
return addHref(uriInfo, dao.get(uriInfo, id, fields));
}
@POST
@Path("/toggleIngestion/{id}")
@Operation(
operationId = "toggleIngestionPipelineEnabled",
summary = "Set an Ingestion pipeline either as Enabled or Disabled",
tags = "IngestionPipelines",
description = "Toggle an ingestion pipeline state by id.",
responses = {
@ApiResponse(
responseCode = "200",
description = "The ingestion",
content =
@Content(mediaType = "application/json", schema = @Schema(implementation = IngestionPipeline.class))),
@ApiResponse(responseCode = "404", description = "Ingestion for instance {id} is not found")
})
public Response toggleIngestion(
@Context UriInfo uriInfo, @PathParam("id") String id, @Context SecurityContext securityContext)
throws IOException {
Fields fields = getFields(FIELD_OWNER);
IngestionPipeline pipeline = dao.get(uriInfo, id, fields);
// This call updates the state in Airflow as well as the `enabled` field on the IngestionPipeline
pipelineServiceClient.toggleIngestion(pipeline);
return createOrUpdate(uriInfo, securityContext, pipeline, ADMIN | BOT | OWNER);
}
@POST
@Path("/testConnection")
@Operation(

View File

@ -88,6 +88,9 @@ public abstract class PipelineServiceClient {
/* Get the status of a deployed pipeline */
public abstract IngestionPipeline getPipelineStatus(IngestionPipeline ingestionPipeline);
/* Toggle the state of an Ingestion Pipeline as enabled/disabled */
public abstract IngestionPipeline toggleIngestion(IngestionPipeline ingestionPipeline);
/* Get the all last run logs of a deployed pipeline */
public abstract Map<String, String> getLastIngestionLogs(String pipelineName);
}

View File

@ -170,6 +170,11 @@
"description": "Indicates if the workflow has been successfully deployed to Airflow.",
"type": "boolean"
},
"enabled": {
"description": "True if the pipeline is ready to be run in the next schedule. False if it is paused.",
"type": "boolean",
"default": true
},
"href": {
"description": "Link to this ingestion pipeline resource.",
"$ref": "../../../type/basic.json#/definitions/href"

View File

@ -22,7 +22,7 @@ def get_long_description():
base_requirements = {
"openmetadata-ingestion[airflow-container]~=0.9",
"openmetadata-ingestion[airflow-container]~=0.10",
"pendulum~=2.1.2",
"packaging~=21.2",
"setuptools~=58.3.0",

View File

@ -10,7 +10,7 @@
# limitations under the License.
from typing import Any, Dict, Optional
# TODO LOG (just link v1), ENABLE DAG, DISABLE DAG (play pause)
# TODO LOG (just link v1)
APIS_METADATA = [
{
"name": "deploy_dag",
@ -97,6 +97,34 @@ APIS_METADATA = [
"description": "Get the status of Airflow REST status",
"http_method": "GET",
},
{
"name": "enable_dag",
"description": "Mark the DAG as enabled to run on the next schedule.",
"http_method": "POST",
"arguments": [],
"post_arguments": [
{
"name": "dag_id",
"description": "The id of the dag",
"form_input_type": "text",
"required": True,
},
],
},
{
"name": "disable_dag",
"description": "Mark the DAG as disabled. It will not run on the next schedule.",
"http_method": "POST",
"arguments": [],
"post_arguments": [
{
"name": "dag_id",
"description": "The id of the dag",
"form_input_type": "text",
"required": True,
},
],
},
]

View File

@ -35,6 +35,7 @@ from openmetadata.api.utils import jwt_token_secure
from openmetadata.operations.delete import delete_dag_id
from openmetadata.operations.deploy import DagDeployer
from openmetadata.operations.last_dag_logs import last_dag_logs
from openmetadata.operations.state import disable_dag, enable_dag
from openmetadata.operations.status import status
from openmetadata.operations.test_connection import test_source_connection
from openmetadata.operations.trigger import trigger
@ -133,6 +134,10 @@ class REST_API(AppBuilderBaseView):
return self.delete_dag()
if api == "last_dag_logs":
return self.last_dag_logs()
if api == "enable_dag":
return self.enable_dag()
if api == "disable_dag":
return self.disable_dag()
raise ValueError(
f"Invalid api param {api}. Expected deploy_dag or trigger_dag."
@ -301,3 +306,45 @@ class REST_API(AppBuilderBaseView):
status=ApiResponse.STATUS_SERVER_ERROR,
error=f"Failed to get last run logs for '{dag_id}' due to {exc} - {traceback.format_exc()}",
)
@staticmethod
def enable_dag() -> Response:
"""
Given a DAG ID, mark the dag as enabled
"""
request_json = request.get_json()
dag_id = request_json.get("dag_id")
if not dag_id:
return ApiResponse.bad_request(f"Missing dag_id argument in the request")
try:
return enable_dag(dag_id)
except Exception as exc:
logging.info(f"Failed to get last run logs for '{dag_id}'")
return ApiResponse.error(
status=ApiResponse.STATUS_SERVER_ERROR,
error=f"Failed to get last run logs for '{dag_id}' due to {exc} - {traceback.format_exc()}",
)
@staticmethod
def disable_dag() -> Response:
"""
Given a DAG ID, mark the dag as disabled
"""
request_json = request.get_json()
dag_id = request_json.get("dag_id")
if not dag_id:
return ApiResponse.bad_request(f"Missing dag_id argument in the request")
try:
return disable_dag(dag_id)
except Exception as exc:
logging.info(f"Failed to get last run logs for '{dag_id}'")
return ApiResponse.error(
status=ApiResponse.STATUS_SERVER_ERROR,
error=f"Failed to get last run logs for '{dag_id}' due to {exc} - {traceback.format_exc()}",
)

View File

@ -0,0 +1,66 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Module containing the logic to toggle DAG state between enabled/disabled
"""
from airflow import settings
from airflow.models import DagModel
from flask import Response
from openmetadata.api.response import ApiResponse
def _update_dag_state(dag_id: str, paused: bool, message: str) -> Response:
"""
Given a DAG id, a boolean and a message, update
the DAG state between paused or unpaused.
:param dag_id: DAG to update
:param paused: state to set
:param message: message to return
:return: API Response
"""
with settings.Session() as session:
dag_model: DagModel = (
session.query(DagModel).filter(DagModel.dag_id == dag_id).first()
)
if not dag_model:
return ApiResponse.not_found(f"DAG {dag_id} not found.")
dag_model.is_paused = paused
session.commit()
return ApiResponse.success({"message": message})
def enable_dag(dag_id: str) -> Response:
"""
Unpause the DAG
:param dag_id: DAG to find
:return: API Response
"""
return _update_dag_state(
dag_id=dag_id, paused=False, message=f"DAG {dag_id} has been enabled"
)
def disable_dag(dag_id: str) -> Response:
"""
Pause the DAG
:param dag_id: DAG to find
:return: API Response
"""
return _update_dag_state(
dag_id=dag_id, paused=True, message=f"DAG {dag_id} has been disabled"
)