Add Queued Status in Ingestion Pipelines (#8529)

This commit is contained in:
Mayur Singal 2022-11-04 11:19:50 +05:30 committed by GitHub
parent 1c1f8660f2
commit a49c261cbe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 33 additions and 16 deletions

View File

@ -16,7 +16,7 @@ from typing import Callable
from flask import Blueprint, Response
from openmetadata_managed_apis.api.response import ApiResponse
from openmetadata_managed_apis.api.utils import get_arg_dag_id
from openmetadata_managed_apis.api.utils import get_arg_dag_id, get_arg_only_queued
from openmetadata_managed_apis.operations.status import status
from openmetadata_managed_apis.utils.logger import routes_logger
@ -44,9 +44,9 @@ def get_fn(blueprint: Blueprint) -> Callable:
Check the status of a DAG runs
"""
dag_id = get_arg_dag_id()
only_queued = get_arg_only_queued()
try:
return status(dag_id)
return status(dag_id, only_queued)
except Exception as exc:
logger.debug(traceback.format_exc())

View File

@ -78,6 +78,13 @@ def get_arg_dag_id() -> Optional[str]:
return clean_dag_id(raw_dag_id)
def get_arg_only_queued() -> Optional[str]:
"""
Try to fetch the only_queued from the args
"""
return get_request_arg(request, "only_queued", raise_missing=False)
def get_request_dag_id() -> Optional[str]:
"""
Try to fetch the dag_id from the JSON request

View File

@ -15,11 +15,12 @@ import json
from airflow import settings
from airflow.models import DagModel, DagRun
from airflow.utils.state import DagRunState
from flask import Response
from openmetadata_managed_apis.api.response import ApiResponse, ResponseFormat
def status(dag_id: str) -> Response:
def status(dag_id: str, only_queued: str = None) -> Response:
"""
Validate that the DAG is registered by Airflow.
If exists, check the DagRun
@ -34,16 +35,19 @@ def status(dag_id: str) -> Response:
if not dag_model:
return ApiResponse.not_found(f"DAG {dag_id} not found.")
runs = (
query = (
session.query(DagRun)
.filter(
DagRun.dag_id == dag_id,
)
.order_by(DagRun.start_date.desc())
.limit(10)
.all()
)
if only_queued:
query = query.filter(DagRun.state == DagRunState.QUEUED)
runs = query.limit(10).all()
formatted = [
json.loads(ResponseFormat.format_dag_run_state(dag_run).json())
for dag_run in runs

View File

@ -18,6 +18,7 @@ import java.io.IOException;
import java.net.URI;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;
import java.util.Map;
import javax.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
@ -25,6 +26,7 @@ import org.json.JSONObject;
import org.openmetadata.schema.api.configuration.airflow.AirflowConfiguration;
import org.openmetadata.schema.api.services.ingestionPipelines.TestServiceConnection;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineType;
import org.openmetadata.service.exception.IngestionPipelineDeploymentException;
import org.openmetadata.service.exception.PipelineServiceClientException;
@ -151,17 +153,15 @@ public class AirflowRESTClient extends PipelineServiceClient {
}
@Override
public IngestionPipeline getPipelineStatus(IngestionPipeline ingestionPipeline) {
public List<PipelineStatus> getQueuedPipelineStatus(IngestionPipeline ingestionPipeline) {
HttpResponse<String> response;
try {
String statusEndPoint = "%s/%s/status?dag_id=%s";
String statusEndPoint = "%s/%s/status?dag_id=%s&only_queued=true";
response =
getRequestAuthenticatedForJsonContent(statusEndPoint, serviceURL, API_ENDPOINT, ingestionPipeline.getName());
if (response.statusCode() == 200) {
ingestionPipeline.setDeployed(true);
return ingestionPipeline;
} else if (response.statusCode() == 404) {
ingestionPipeline.setDeployed(false);
List<PipelineStatus> pipelineStatusList = JsonUtils.readObjects(response.body(), PipelineStatus.class);
return pipelineStatusList;
}
} catch (Exception e) {
throw PipelineServiceClientException.byMessage(ingestionPipeline.getName(), e.getMessage());

View File

@ -207,8 +207,10 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
.listBetweenTimestampsByFQN(
ingestionPipeline.getFullyQualifiedName(), PIPELINE_STATUS_JSON_SCHEMA, startTs, endTs),
PipelineStatus.class);
List<PipelineStatus> allPipelineStatusList = pipelineServiceClient.getQueuedPipelineStatus(ingestionPipeline);
allPipelineStatusList.addAll(pipelineStatusList);
return new ResultList<>(
pipelineStatusList, String.valueOf(startTs), String.valueOf(endTs), pipelineStatusList.size());
allPipelineStatusList, String.valueOf(startTs), String.valueOf(endTs), allPipelineStatusList.size());
}
public PipelineStatus getLatestPipelineStatus(IngestionPipeline ingestionPipeline) throws IOException {

View File

@ -10,6 +10,7 @@ import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
@ -17,6 +18,7 @@ import javax.ws.rs.core.Response;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.api.services.ingestionPipelines.TestServiceConnection;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus;
import org.openmetadata.service.OpenMetadataApplication;
import org.openmetadata.service.exception.PipelineServiceClientException;
import org.openmetadata.service.exception.PipelineServiceVersionException;
@ -146,7 +148,7 @@ public abstract class PipelineServiceClient {
public abstract String deletePipeline(String pipelineName);
/* Get the status of a deployed pipeline */
public abstract IngestionPipeline getPipelineStatus(IngestionPipeline ingestionPipeline);
public abstract List<PipelineStatus> getQueuedPipelineStatus(IngestionPipeline ingestionPipeline);
/* Toggle the state of an Ingestion Pipeline as enabled/disabled */
public abstract IngestionPipeline toggleIngestion(IngestionPipeline ingestionPipeline);

View File

@ -1,10 +1,12 @@
package org.openmetadata.service.pipelineService;
import java.net.http.HttpResponse;
import java.util.List;
import java.util.Map;
import javax.ws.rs.core.Response;
import org.openmetadata.schema.api.services.ingestionPipelines.TestServiceConnection;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus;
import org.openmetadata.service.util.PipelineServiceClient;
public class MockPipelineServiceClient extends PipelineServiceClient {
@ -40,7 +42,7 @@ public class MockPipelineServiceClient extends PipelineServiceClient {
}
@Override
public IngestionPipeline getPipelineStatus(IngestionPipeline ingestionPipeline) {
public List<PipelineStatus> getQueuedPipelineStatus(IngestionPipeline ingestionPipeline) {
return null;
}