diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/status.py b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/status.py index 533128ce4c6..faa14ccc77a 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/status.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/status.py @@ -16,7 +16,7 @@ from typing import Callable from flask import Blueprint, Response from openmetadata_managed_apis.api.response import ApiResponse -from openmetadata_managed_apis.api.utils import get_arg_dag_id +from openmetadata_managed_apis.api.utils import get_arg_dag_id, get_arg_only_queued from openmetadata_managed_apis.operations.status import status from openmetadata_managed_apis.utils.logger import routes_logger @@ -44,9 +44,9 @@ def get_fn(blueprint: Blueprint) -> Callable: Check the status of a DAG runs """ dag_id = get_arg_dag_id() - + only_queued = get_arg_only_queued() try: - return status(dag_id) + return status(dag_id, only_queued) except Exception as exc: logger.debug(traceback.format_exc()) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/api/utils.py b/openmetadata-airflow-apis/openmetadata_managed_apis/api/utils.py index 4086c6921da..ff91d632f82 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/api/utils.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/api/utils.py @@ -78,6 +78,13 @@ def get_arg_dag_id() -> Optional[str]: return clean_dag_id(raw_dag_id) +def get_arg_only_queued() -> Optional[str]: + """ + Try to fetch the only_queued from the args + """ + return get_request_arg(request, "only_queued", raise_missing=False) + + def get_request_dag_id() -> Optional[str]: """ Try to fetch the dag_id from the JSON request diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/operations/status.py b/openmetadata-airflow-apis/openmetadata_managed_apis/operations/status.py index df42d4189b6..45bbfc40353 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/operations/status.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/operations/status.py @@ -15,11 +15,12 @@ import json from airflow import settings from airflow.models import DagModel, DagRun +from airflow.utils.state import DagRunState from flask import Response from openmetadata_managed_apis.api.response import ApiResponse, ResponseFormat -def status(dag_id: str) -> Response: +def status(dag_id: str, only_queued: str = None) -> Response: """ Validate that the DAG is registered by Airflow. If exists, check the DagRun @@ -34,16 +35,19 @@ def status(dag_id: str) -> Response: if not dag_model: return ApiResponse.not_found(f"DAG {dag_id} not found.") - runs = ( + query = ( session.query(DagRun) .filter( DagRun.dag_id == dag_id, ) .order_by(DagRun.start_date.desc()) - .limit(10) - .all() ) + if only_queued: + query = query.filter(DagRun.state == DagRunState.QUEUED) + + runs = query.limit(10).all() + formatted = [ json.loads(ResponseFormat.format_dag_run_state(dag_run).json()) for dag_run in runs diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/airflow/AirflowRESTClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/airflow/AirflowRESTClient.java index 9371838c07e..9a8e3a21c41 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/airflow/AirflowRESTClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/airflow/AirflowRESTClient.java @@ -18,6 +18,7 @@ import java.io.IOException; import java.net.URI; import java.net.http.HttpRequest; import java.net.http.HttpResponse; +import java.util.List; import java.util.Map; import javax.ws.rs.core.Response; import lombok.extern.slf4j.Slf4j; @@ -25,6 +26,7 @@ import org.json.JSONObject; import org.openmetadata.schema.api.configuration.airflow.AirflowConfiguration; import org.openmetadata.schema.api.services.ingestionPipelines.TestServiceConnection; import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline; +import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus; import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineType; import org.openmetadata.service.exception.IngestionPipelineDeploymentException; import org.openmetadata.service.exception.PipelineServiceClientException; @@ -151,17 +153,15 @@ public class AirflowRESTClient extends PipelineServiceClient { } @Override - public IngestionPipeline getPipelineStatus(IngestionPipeline ingestionPipeline) { + public List getQueuedPipelineStatus(IngestionPipeline ingestionPipeline) { HttpResponse response; try { - String statusEndPoint = "%s/%s/status?dag_id=%s"; + String statusEndPoint = "%s/%s/status?dag_id=%s&only_queued=true"; response = getRequestAuthenticatedForJsonContent(statusEndPoint, serviceURL, API_ENDPOINT, ingestionPipeline.getName()); if (response.statusCode() == 200) { - ingestionPipeline.setDeployed(true); - return ingestionPipeline; - } else if (response.statusCode() == 404) { - ingestionPipeline.setDeployed(false); + List pipelineStatusList = JsonUtils.readObjects(response.body(), PipelineStatus.class); + return pipelineStatusList; } } catch (Exception e) { throw PipelineServiceClientException.byMessage(ingestionPipeline.getName(), e.getMessage()); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java index 3890f07e150..75341c390f6 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java @@ -207,8 +207,10 @@ public class IngestionPipelineRepository extends EntityRepository allPipelineStatusList = pipelineServiceClient.getQueuedPipelineStatus(ingestionPipeline); + allPipelineStatusList.addAll(pipelineStatusList); return new ResultList<>( - pipelineStatusList, String.valueOf(startTs), String.valueOf(endTs), pipelineStatusList.size()); + allPipelineStatusList, String.valueOf(startTs), String.valueOf(endTs), allPipelineStatusList.size()); } public PipelineStatus getLatestPipelineStatus(IngestionPipeline ingestionPipeline) throws IOException { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/PipelineServiceClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/PipelineServiceClient.java index a290273fc8b..ce3c85f2c50 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/util/PipelineServiceClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/PipelineServiceClient.java @@ -10,6 +10,7 @@ import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.time.Duration; import java.util.Base64; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.regex.Pattern; @@ -17,6 +18,7 @@ import javax.ws.rs.core.Response; import org.openmetadata.common.utils.CommonUtil; import org.openmetadata.schema.api.services.ingestionPipelines.TestServiceConnection; import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline; +import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus; import org.openmetadata.service.OpenMetadataApplication; import org.openmetadata.service.exception.PipelineServiceClientException; import org.openmetadata.service.exception.PipelineServiceVersionException; @@ -146,7 +148,7 @@ public abstract class PipelineServiceClient { public abstract String deletePipeline(String pipelineName); /* Get the status of a deployed pipeline */ - public abstract IngestionPipeline getPipelineStatus(IngestionPipeline ingestionPipeline); + public abstract List getQueuedPipelineStatus(IngestionPipeline ingestionPipeline); /* Toggle the state of an Ingestion Pipeline as enabled/disabled */ public abstract IngestionPipeline toggleIngestion(IngestionPipeline ingestionPipeline); diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/pipelineService/MockPipelineServiceClient.java b/openmetadata-service/src/test/java/org/openmetadata/service/pipelineService/MockPipelineServiceClient.java index 048189141c7..7a1b820adcb 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/pipelineService/MockPipelineServiceClient.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/pipelineService/MockPipelineServiceClient.java @@ -1,10 +1,12 @@ package org.openmetadata.service.pipelineService; import java.net.http.HttpResponse; +import java.util.List; import java.util.Map; import javax.ws.rs.core.Response; import org.openmetadata.schema.api.services.ingestionPipelines.TestServiceConnection; import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline; +import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus; import org.openmetadata.service.util.PipelineServiceClient; public class MockPipelineServiceClient extends PipelineServiceClient { @@ -40,7 +42,7 @@ public class MockPipelineServiceClient extends PipelineServiceClient { } @Override - public IngestionPipeline getPipelineStatus(IngestionPipeline ingestionPipeline) { + public List getQueuedPipelineStatus(IngestionPipeline ingestionPipeline) { return null; }