diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java index c8f36e76989..44e57416fa3 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java @@ -189,6 +189,29 @@ public class AirflowRESTClient { Response.Status.fromStatusCode(response.statusCode())); } + public HttpResponse getRESTStatus() { + HttpResponse response; + try { + String token = authenticate(); + String authToken = String.format(AUTH_TOKEN, token); + String statusEndPoint = "%s/rest_api/api?api=rest_status"; + String statusUrl = String.format(statusEndPoint, airflowURL); + HttpRequest request = + HttpRequest.newBuilder(URI.create(statusUrl)) + .header(CONTENT_HEADER, CONTENT_TYPE) + .header(AUTH_HEADER, authToken) + .GET() + .build(); + response = client.send(request, HttpResponse.BodyHandlers.ofString()); + if (response.statusCode() == 200) { + return response; + } + } catch (Exception e) { + throw AirflowException.byMessage("Failed to get REST status.", e.getMessage()); + } + throw new AirflowException(String.format("Failed to get REST status due to %s", response.body())); + } + public HttpResponse testConnection(TestServiceConnection testServiceConnection) { HttpResponse response; try { diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResource.java index 0a33653ff6a..4b7f56022a9 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResource.java @@ -403,9 +403,7 @@ public class IngestionPipelineResource extends EntityResource response = airflowRESTClient.getRESTStatus(); + return Response.status(200, response.body()).build(); + } + @DELETE @Path("/{id}") @Operation( diff --git a/openmetadata-airflow-apis/setup.py b/openmetadata-airflow-apis/setup.py index 643f9c7440a..1b85460625a 100644 --- a/openmetadata-airflow-apis/setup.py +++ b/openmetadata-airflow-apis/setup.py @@ -23,14 +23,12 @@ def get_long_description(): base_requirements = { "openmetadata-ingestion[airflow-container]~=0.9", - "PyYAML<6.0", # pycln requires < 6 "pendulum~=2.1.2", "packaging~=21.2", "setuptools~=58.3.0", - "apache-airflow[http,kubernetes]>=1.10.2", + "apache-airflow==2.1.4", "Flask~=1.1.4", - "Flask-Admin", - "pydantic>=1.7.4", + "Flask-Admin==1.6.0", } dev_requirements = { diff --git a/openmetadata-airflow-apis/src/openmetadata/api/apis_metadata.py b/openmetadata-airflow-apis/src/openmetadata/api/apis_metadata.py index b5f138ebbd1..a9635c993e5 100644 --- a/openmetadata-airflow-apis/src/openmetadata/api/apis_metadata.py +++ b/openmetadata-airflow-apis/src/openmetadata/api/apis_metadata.py @@ -10,7 +10,7 @@ # limitations under the License. from typing import Any, Dict, Optional -# TODO DELETE, STATUS (pick it up from airflow directly), LOG (just link v1), ENABLE DAG, DISABLE DAG (play pause) +# TODO LOG (just link v1), ENABLE DAG, DISABLE DAG (play pause) APIS_METADATA = [ { "name": "deploy_dag", @@ -79,6 +79,11 @@ APIS_METADATA = [ }, ], }, + { + "name": "rest_status", + "description": "Get the status of Airflow REST status", + "http_method": "GET", + }, ] diff --git a/openmetadata-airflow-apis/src/openmetadata/api/rest_api.py b/openmetadata-airflow-apis/src/openmetadata/api/rest_api.py index 64d9f6c615f..d01b1b4bdc7 100644 --- a/openmetadata-airflow-apis/src/openmetadata/api/rest_api.py +++ b/openmetadata-airflow-apis/src/openmetadata/api/rest_api.py @@ -118,6 +118,8 @@ class REST_API(AppBuilderBaseView): # Deciding which function to use based off the API object that was requested. # Some functions are custom and need to be manually routed to. + if api == "rest_status": + return self.rest_status() if api == "deploy_dag": return self.deploy_dag() if api == "trigger_dag": @@ -133,12 +135,28 @@ class REST_API(AppBuilderBaseView): f"Invalid api param {api}. Expected deploy_dag or trigger_dag." ) + @staticmethod + def rest_status() -> Response: + """ + Check that the Airflow REST is reachable + and running correctly. + """ + try: + url = AIRFLOW_WEBSERVER_BASE_URL + REST_API_ENDPOINT + return ApiResponse.success( + {"message": f"Airflow REST {REST_API_PLUGIN_VERSION} running at {url}"} + ) + except Exception as err: + return ApiResponse.error( + status=ApiResponse.STATUS_SERVER_ERROR, + error=f"Internal error obtaining REST status - {err} - {traceback.format_exc()}", + ) + def deploy_dag(self) -> Response: - """Custom Function for the deploy_dag API + """ + Custom Function for the deploy_dag API Creates workflow dag based on workflow dag file and refreshes the session - args: - workflow_config: the workflow config that defines the dag """ json_request = request.get_json()