diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java index e272678a61f..ba2e2d1162e 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java @@ -21,12 +21,15 @@ import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.time.Duration; +import java.util.List; import javax.ws.rs.core.Response; import lombok.extern.slf4j.Slf4j; import org.json.JSONObject; import org.openmetadata.catalog.airflow.models.AirflowAuthRequest; import org.openmetadata.catalog.airflow.models.AirflowAuthResponse; +import org.openmetadata.catalog.api.services.ingestionPipelines.TestServiceConnection; import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline; +import org.openmetadata.catalog.entity.services.ingestionPipelines.PipelineStatus; import org.openmetadata.catalog.exception.AirflowException; import org.openmetadata.catalog.exception.IngestionPipelineDeploymentException; import org.openmetadata.catalog.util.JsonUtils; @@ -150,4 +153,57 @@ public class AirflowRESTClient { throw IngestionPipelineDeploymentException.byMessage(pipelineName, e.getMessage()); } } + + public IngestionPipeline getStatus(IngestionPipeline ingestionPipeline) { + try { + String token = authenticate(); + String authToken = String.format(AUTH_TOKEN, token); + String statusEndPoint = "%s/rest_api/api?api=dag_status&dag_id=%s"; + String statusUrl = String.format(statusEndPoint, airflowURL, ingestionPipeline.getName()); + JSONObject requestPayload = new JSONObject(); + HttpRequest request = + HttpRequest.newBuilder(URI.create(statusUrl)) + .header(CONTENT_HEADER, CONTENT_TYPE) + .header(AUTH_HEADER, authToken) + 
.POST(HttpRequest.BodyPublishers.ofString(requestPayload.toString())) + .build(); + HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString()); + if (response.statusCode() == 200) { + List<PipelineStatus> statuses = JsonUtils.readObjects(response.body(), PipelineStatus.class); + ingestionPipeline.setPipelineStatuses(statuses); + return ingestionPipeline; + } + + throw AirflowException.byMessage( + ingestionPipeline.getName(), + "Failed to fetch ingestion pipeline runs", + Response.Status.fromStatusCode(response.statusCode())); + } catch (Exception e) { + throw AirflowException.byMessage(ingestionPipeline.getName(), e.getMessage()); + } + } + + public HttpResponse<String> testConnection(TestServiceConnection testServiceConnection) { + try { + String token = authenticate(); + String authToken = String.format(AUTH_TOKEN, token); + String statusEndPoint = "%s/rest_api/api?api=test_connection"; + String statusUrl = String.format(statusEndPoint, airflowURL); + String connectionPayload = JsonUtils.pojoToJson(testServiceConnection); + HttpRequest request = + HttpRequest.newBuilder(URI.create(statusUrl)) + .header(CONTENT_HEADER, CONTENT_TYPE) + .header(AUTH_HEADER, authToken) + .POST(HttpRequest.BodyPublishers.ofString(connectionPayload)) + .build(); + HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString()); + if (response.statusCode() == 200) { + return response; + } + throw AirflowException.byMessage( + "Failed to test connection.", String.valueOf(Response.Status.fromStatusCode(response.statusCode()))); + } catch (Exception e) { + throw AirflowException.byMessage("Failed to test connection.", e.getMessage()); + } + } } diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResource.java index deea923b0bd..4af55adcab3 ---
a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResource.java @@ -17,6 +17,7 @@ import static org.openmetadata.catalog.Entity.FIELD_OWNER; import static org.openmetadata.catalog.security.SecurityUtil.ADMIN; import static org.openmetadata.catalog.security.SecurityUtil.BOT; import static org.openmetadata.catalog.security.SecurityUtil.OWNER; +import static org.openmetadata.common.utils.CommonUtil.listOrEmpty; import io.swagger.annotations.Api; import io.swagger.v3.oas.annotations.ExternalDocumentation; @@ -28,6 +29,7 @@ import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.parameters.RequestBody; import io.swagger.v3.oas.annotations.responses.ApiResponse; import java.io.IOException; +import java.net.http.HttpResponse; import java.util.List; import java.util.Locale; import java.util.UUID; @@ -57,6 +59,7 @@ import org.openmetadata.catalog.Entity; import org.openmetadata.catalog.airflow.AirflowConfiguration; import org.openmetadata.catalog.airflow.AirflowRESTClient; import org.openmetadata.catalog.api.services.ingestionPipelines.CreateIngestionPipeline; +import org.openmetadata.catalog.api.services.ingestionPipelines.TestServiceConnection; import org.openmetadata.catalog.entity.services.DashboardService; import org.openmetadata.catalog.entity.services.DatabaseService; import org.openmetadata.catalog.entity.services.MessagingService; @@ -164,7 +167,12 @@ public class IngestionPipelineResource extends EntityResource ingestionPipelines = + super.listInternal(uriInfo, securityContext, fieldsParam, filter, limitParam, before, after); + if (fieldsParam != null && fieldsParam.contains("pipelineStatuses")) { + addStatus(ingestionPipelines.getData()); + } + return ingestionPipelines; } @GET @@ -217,7 +225,11 @@ public class 
IngestionPipelineResource extends EntityResource response = airflowRESTClient.testConnection(testServiceConnection); + return Response.status(200, response.body()).build(); + } + @DELETE @Path("/{id}") @Operation( @@ -463,4 +502,17 @@ public class IngestionPipelineResource extends EntityResource ingestionPipelines) { + listOrEmpty(ingestionPipelines).forEach(this::addStatus); + } + + private IngestionPipeline addStatus(IngestionPipeline ingestionPipeline) { + try { + ingestionPipeline = airflowRESTClient.getStatus(ingestionPipeline); + } catch (Exception e) { + LOG.error("Failed to fetch status for {} due to {}", ingestionPipeline.getName(), e); + } + return ingestionPipeline; + } } diff --git a/catalog-rest-service/src/main/resources/json/schema/api/services/ingestionPipelines/testServiceConnection.json b/catalog-rest-service/src/main/resources/json/schema/api/services/ingestionPipelines/testServiceConnection.json new file mode 100644 index 00000000000..b3cb3428617 --- /dev/null +++ b/catalog-rest-service/src/main/resources/json/schema/api/services/ingestionPipelines/testServiceConnection.json @@ -0,0 +1,40 @@ +{ + "$id": "https://open-metadata.org/schema/api/services/ingestionPipelines/testServiceConnection.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "TestServiceConnectionRequest", + "description": "Test Service Connection to test user provided configuration is valid or not.", + "type": "object", + "properties": { + "connection": { + "description": "Database Connection.", + "oneOf": [ + { + "$ref": "../../../entity/services/databaseService.json#/definitions/databaseConnection" + }, + { + "$ref": "../../../entity/services/dashboardService.json#/definitions/dashboardConnection" + }, + { + "$ref": "../../../entity/services/messagingService.json#/definitions/messagingConnection" + } + ] + }, + "connectionType": { + "description": "Type of database service such as MySQL, BigQuery, Snowflake, Redshift, Postgres...", + "type": "string", + 
"enum": ["Database", "Dashboard", "Messaging"], + "javaEnums": [ + { + "name": "Database" + }, + { + "name": "Dashboard" + }, + { + "name": "Messaging" + } + ] + } + }, + "additionalProperties": false +} diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json index b0c43dbce3b..959da5fabf7 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json @@ -11,6 +11,30 @@ "javaType": "org.openmetadata.catalog.entity.services.ingestionPipelines.PipelineType", "enum": ["metadata", "usage"] }, + "pipelineStatus": { + "type": "object", + "javaType": "org.openmetadata.catalog.entity.services.ingestionPipelines.PipelineStatus", + "description": "This defines runtime status of Pipeline.", + "properties": { + "runId": { + "description": "Pipeline unique run ID.", + "type": "string" + }, + "state": { + "description": "Pipeline status denotes if its failed or succeeded.", + "type": "string" + }, + "startDate": { + "description": "startDate of the pipeline run for this particular execution.", + "type": "string" + }, + "endDate": { + "description": "endDate of the pipeline run for this particular execution.", + "type": "string" + } + }, + "additionalProperties": false + }, "airflowConfig": { "description": "Properties to configure the Airflow pipeline that will run the workflow.", "type": "object", @@ -151,6 +175,18 @@ "description": "Link to the database service where this database is hosted in.", "$ref": "../../../type/entityReference.json" }, + "pipelineStatuses": { + "description": "List of executions and status for the Pipeline.", + "type": "array", + "items": { + "$ref": "#/definitions/pipelineStatus" + }, + "default": null 
+ }, + "nextExecutionDate": { + "description": "Next execution date from the underlying pipeline platform once the pipeline scheduled.", + "$ref": "../../../type/basic.json#/definitions/date" + }, "href": { "description": "Link to this ingestion pipeline resource.", "$ref": "../../../type/basic.json#/definitions/href" diff --git a/openmetadata-airflow-apis/src/openmetadata/api/apis_metadata.py b/openmetadata-airflow-apis/src/openmetadata/api/apis_metadata.py index 8da62d81a5e..8e6592a8a6e 100644 --- a/openmetadata-airflow-apis/src/openmetadata/api/apis_metadata.py +++ b/openmetadata-airflow-apis/src/openmetadata/api/apis_metadata.py @@ -48,7 +48,7 @@ APIS_METADATA = [ "post_arguments": [ { "name": "service_connection", - "description": "ServiceConnectionModel config to test", + "description": "TestServiceConnectionRequest config to test", "required": True, }, ], diff --git a/openmetadata-airflow-apis/src/openmetadata/api/response.py b/openmetadata-airflow-apis/src/openmetadata/api/response.py index 42aef8a6b93..770a0d8e966 100644 --- a/openmetadata-airflow-apis/src/openmetadata/api/response.py +++ b/openmetadata-airflow-apis/src/openmetadata/api/response.py @@ -10,11 +10,15 @@ # limitations under the License. 
import json -from typing import Optional +from typing import Optional, Union from airflow.models import DagRun from flask import Response +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + PipelineStatus, +) + class ApiResponse: """ @@ -37,12 +41,8 @@ class ApiResponse: return resp @staticmethod - def success(response_obj: Optional[dict] = None): - if not response_obj: - response_obj = {} - - response_obj["status"] = "success" - return ApiResponse.standard_response(ApiResponse.STATUS_OK, response_obj) + def success(response_obj: Union[Optional[dict], Optional[list]] = None): + return ApiResponse.standard_response(ApiResponse.STATUS_OK, response_obj or {}) @staticmethod def error(status, error): @@ -70,47 +70,14 @@ class ResponseFormat: pass @staticmethod - def format_dag_run_state(dag_run: DagRun): - return { - "state": dag_run.get_state(), - "run_id": dag_run.run_id, - "startDate": ( - None - if not dag_run.start_date - else dag_run.start_date.strftime("%Y-%m-%dT%H:%M:%S.%f%z") - ), - "endDate": ( - None - if not dag_run.end_date - else dag_run.end_date.strftime("%Y-%m-%dT%H:%M:%S.%f%z") - ), - } - - @staticmethod - def format_dag_task(task_instance): - return { - "taskId": task_instance.task_id, - "dagId": task_instance.dag_id, - "state": task_instance.state, - "tryNumber": ( - None - if not task_instance._try_number - else str(task_instance._try_number) - ), - "maxTries": ( - None if not task_instance.max_tries else str(task_instance.max_tries) - ), - "startDate": ( - None - if not task_instance.start_date - else task_instance.start_date.strftime("%Y-%m-%dT%H:%M:%S.%f%z") - ), - "endDate": ( - None - if not task_instance.end_date - else task_instance.end_date.strftime("%Y-%m-%dT%H:%M:%S.%f%z") - ), - "duration": ( - None if not task_instance.duration else str(task_instance.duration) - ), - } + def format_dag_run_state(dag_run: DagRun) -> PipelineStatus: + return PipelineStatus( + state=dag_run.get_state(), + 
runId=dag_run.run_id, + startDate=None + if not dag_run.start_date + else dag_run.start_date.strftime("%Y-%m-%dT%H:%M:%S.%f%z"), + endDate=None + if not dag_run.end_date + else dag_run.end_date.strftime("%Y-%m-%dT%H:%M:%S.%f%z"), + ) diff --git a/openmetadata-airflow-apis/src/openmetadata/api/rest_api.py b/openmetadata-airflow-apis/src/openmetadata/api/rest_api.py index 2bdecbf5236..43af7003679 100644 --- a/openmetadata-airflow-apis/src/openmetadata/api/rest_api.py +++ b/openmetadata-airflow-apis/src/openmetadata/api/rest_api.py @@ -39,8 +39,8 @@ from openmetadata.operations.test_connection import test_source_connection from openmetadata.operations.trigger import trigger from pydantic.error_wrappers import ValidationError -from metadata.generated.schema.entity.services.connections.serviceConnection import ( - ServiceConnectionModel, +from metadata.generated.schema.api.services.ingestionPipelines.testServiceConnection import ( + TestServiceConnectionRequest, ) from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( IngestionPipeline, @@ -172,10 +172,8 @@ class REST_API(AppBuilderBaseView): json_request = request.get_json() try: - service_connection_model = ServiceConnectionModel(**json_request) - response = test_source_connection( - service_connection_model.serviceConnection.__root__.config - ) + test_service_connection = TestServiceConnectionRequest(**json_request) + response = test_source_connection(test_service_connection) return response diff --git a/openmetadata-airflow-apis/src/openmetadata/operations/status.py b/openmetadata-airflow-apis/src/openmetadata/operations/status.py index fd33002ccfc..75aacf25fec 100644 --- a/openmetadata-airflow-apis/src/openmetadata/operations/status.py +++ b/openmetadata-airflow-apis/src/openmetadata/operations/status.py @@ -11,6 +11,7 @@ """ Module containing the logic to check a DAG status """ +import json from typing import Optional from airflow import settings @@ -18,6 +19,10 @@ from 
airflow.models import DagRun from flask import Response from openmetadata.api.response import ApiResponse, ResponseFormat +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + PipelineStatus, +) + def status(dag_id: str, run_id: Optional[str]) -> Response: @@ -34,9 +39,9 @@ def status(dag_id: str, run_id: Optional[str]) -> Response: if dag_run is None: return ApiResponse.not_found(f"DAG run {run_id} not found") - res_dag_run = ResponseFormat.format_dag_run_state(dag_run) + res_dag_run: PipelineStatus = ResponseFormat.format_dag_run_state(dag_run) - return ApiResponse.success({"message": f"{res_dag_run}"}) + return ApiResponse.success(json.loads(res_dag_run.json())) runs = ( query.filter( @@ -47,6 +52,9 @@ def status(dag_id: str, run_id: Optional[str]) -> Response: .all() ) - formatted = [ResponseFormat.format_dag_run_state(dag_run) for dag_run in runs] + formatted = [ + json.loads(ResponseFormat.format_dag_run_state(dag_run).json()) + for dag_run in runs + ] - return ApiResponse.success({"message": f"{formatted}"}) + return ApiResponse.success(formatted) diff --git a/openmetadata-airflow-apis/src/openmetadata/operations/test_connection.py b/openmetadata-airflow-apis/src/openmetadata/operations/test_connection.py index fcd5c2d295c..60ed3edccfb 100644 --- a/openmetadata-airflow-apis/src/openmetadata/operations/test_connection.py +++ b/openmetadata-airflow-apis/src/openmetadata/operations/test_connection.py @@ -15,6 +15,9 @@ from a WorkflowSource from flask import Response from openmetadata.api.response import ApiResponse +from metadata.generated.schema.api.services.ingestionPipelines.testServiceConnection import ( + TestServiceConnectionRequest, +) from metadata.utils.engines import ( SourceConnectionException, get_engine, @@ -22,13 +25,15 @@ from metadata.utils.engines import ( ) -def test_source_connection(connection) -> Response: +def test_source_connection( + test_service_connection: TestServiceConnectionRequest, +) -> 
Response: """ Create the engine and test the connection :param workflow_source: Source to test :return: None or exception """ - engine = get_engine(connection) + engine = get_engine(test_service_connection.connection.config) try: test_connection(engine)