Fix #4589 - Check Airflow REST status; Fix #4525 - Clean REST deps (#4606)

* Add rest status

* Check REST status

* Clean REST deps
This commit is contained in:
Pere Miquel Brull 2022-05-02 12:27:20 +02:00 committed by GitHub
parent 937f6153b9
commit 3f8f1d1c05
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 70 additions and 11 deletions

View File

@ -189,6 +189,29 @@ public class AirflowRESTClient {
Response.Status.fromStatusCode(response.statusCode()));
}
public HttpResponse<String> getRESTStatus() {
HttpResponse<String> response;
try {
String token = authenticate();
String authToken = String.format(AUTH_TOKEN, token);
String statusEndPoint = "%s/rest_api/api?api=rest_status";
String statusUrl = String.format(statusEndPoint, airflowURL);
HttpRequest request =
HttpRequest.newBuilder(URI.create(statusUrl))
.header(CONTENT_HEADER, CONTENT_TYPE)
.header(AUTH_HEADER, authToken)
.GET()
.build();
response = client.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() == 200) {
return response;
}
} catch (Exception e) {
throw AirflowException.byMessage("Failed to get REST status.", e.getMessage());
}
throw new AirflowException(String.format("Failed to get REST status due to %s", response.body()));
}
public HttpResponse<String> testConnection(TestServiceConnection testServiceConnection) {
HttpResponse<String> response;
try {

View File

@ -403,9 +403,7 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
@ApiResponse(
responseCode = "200",
description = "The ingestion",
content =
@Content(mediaType = "application/json", schema = @Schema(implementation = IngestionPipeline.class))),
@ApiResponse(responseCode = "404", description = "Ingestion for instance {name} is not found")
content = @Content(mediaType = "application/json"))
})
public Response testIngestion(
@Context UriInfo uriInfo,
@ -415,6 +413,23 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
return Response.status(200, response.body()).build();
}
@GET
@Path("/status")
@Operation(
summary = "Check the Airflow REST status",
tags = "IngestionPipelines",
description = "Check that the Airflow REST endpoint is reachable and up and running",
responses = {
@ApiResponse(
responseCode = "200",
description = "Status message",
content = @Content(mediaType = "application/json"))
})
public Response getRESTStatus(@Context UriInfo uriInfo, @Context SecurityContext securityContext) {
HttpResponse<String> response = airflowRESTClient.getRESTStatus();
return Response.status(200, response.body()).build();
}
@DELETE
@Path("/{id}")
@Operation(

View File

@ -23,14 +23,12 @@ def get_long_description():
base_requirements = {
"openmetadata-ingestion[airflow-container]~=0.9",
"PyYAML<6.0", # pycln requires < 6
"pendulum~=2.1.2",
"packaging~=21.2",
"setuptools~=58.3.0",
"apache-airflow[http,kubernetes]>=1.10.2",
"apache-airflow==2.1.4",
"Flask~=1.1.4",
"Flask-Admin",
"pydantic>=1.7.4",
"Flask-Admin==1.6.0",
}
dev_requirements = {

View File

@ -10,7 +10,7 @@
# limitations under the License.
from typing import Any, Dict, Optional
# TODO DELETE, STATUS (pick it up from airflow directly), LOG (just link v1), ENABLE DAG, DISABLE DAG (play pause)
# TODO LOG (just link v1), ENABLE DAG, DISABLE DAG (play pause)
APIS_METADATA = [
{
"name": "deploy_dag",
@ -79,6 +79,11 @@ APIS_METADATA = [
},
],
},
{
"name": "rest_status",
"description": "Get the status of Airflow REST status",
"http_method": "GET",
},
]

View File

@ -118,6 +118,8 @@ class REST_API(AppBuilderBaseView):
# Deciding which function to use based off the API object that was requested.
# Some functions are custom and need to be manually routed to.
if api == "rest_status":
return self.rest_status()
if api == "deploy_dag":
return self.deploy_dag()
if api == "trigger_dag":
@ -133,12 +135,28 @@ class REST_API(AppBuilderBaseView):
f"Invalid api param {api}. Expected deploy_dag or trigger_dag."
)
@staticmethod
def rest_status() -> Response:
"""
Check that the Airflow REST is reachable
and running correctly.
"""
try:
url = AIRFLOW_WEBSERVER_BASE_URL + REST_API_ENDPOINT
return ApiResponse.success(
{"message": f"Airflow REST {REST_API_PLUGIN_VERSION} running at {url}"}
)
except Exception as err:
return ApiResponse.error(
status=ApiResponse.STATUS_SERVER_ERROR,
error=f"Internal error obtaining REST status - {err} - {traceback.format_exc()}",
)
def deploy_dag(self) -> Response:
"""Custom Function for the deploy_dag API
"""
Custom Function for the deploy_dag API
Creates workflow dag based on workflow dag file and refreshes
the session
args:
workflow_config: the workflow config that defines the dag
"""
json_request = request.get_json()