mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-08-18 22:18:23 +00:00
* Add health endpoint * Replace rest_status for health * Update message
This commit is contained in:
parent
ee21d6b10e
commit
36cd2933ae
@ -188,7 +188,7 @@ public class AirflowRESTClient extends PipelineServiceClient {
|
|||||||
@Override
|
@Override
|
||||||
public HttpResponse<String> getServiceStatus() {
|
public HttpResponse<String> getServiceStatus() {
|
||||||
try {
|
try {
|
||||||
HttpResponse<String> response = requestAuthenticatedForJsonContent("%s/rest_api/api?api=rest_status", serviceURL);
|
HttpResponse<String> response = requestNoAuthForJsonContent("%s/rest_api/health", serviceURL);
|
||||||
if (response.statusCode() == 200) {
|
if (response.statusCode() == 200) {
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
@ -243,4 +243,11 @@ public class AirflowRESTClient extends PipelineServiceClient {
|
|||||||
.build();
|
.build();
|
||||||
return client.send(request, HttpResponse.BodyHandlers.ofString());
|
return client.send(request, HttpResponse.BodyHandlers.ofString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private HttpResponse<String> requestNoAuthForJsonContent(String stringUrlFormat, Object... stringReplacement)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
String url = String.format(stringUrlFormat, stringReplacement);
|
||||||
|
HttpRequest request = HttpRequest.newBuilder(URI.create(url)).header(CONTENT_HEADER, CONTENT_TYPE).GET().build();
|
||||||
|
return client.send(request, HttpResponse.BodyHandlers.ofString());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -92,11 +92,6 @@ APIS_METADATA = [
|
|||||||
},
|
},
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
{
|
|
||||||
"name": "rest_status",
|
|
||||||
"description": "Get the status of Airflow REST status",
|
|
||||||
"http_method": "GET",
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
"name": "enable_dag",
|
"name": "enable_dag",
|
||||||
"description": "Mark the DAG as enabled to run on the next schedule.",
|
"description": "Mark the DAG as enabled to run on the next schedule.",
|
||||||
|
@ -93,6 +93,21 @@ class REST_API(AppBuilderBaseView):
|
|||||||
rbac_authentication_enabled=True,
|
rbac_authentication_enabled=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@csrf.exempt # Exempt the CSRF token
|
||||||
|
@app_builder_expose("/health", methods=["GET"]) # for Flask AppBuilder
|
||||||
|
def health(self):
|
||||||
|
"""
|
||||||
|
/health endpoint to check Airflow REST status without auth
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
return ApiResponse.success({"status": "healthy"})
|
||||||
|
except Exception as err:
|
||||||
|
return ApiResponse.error(
|
||||||
|
status=ApiResponse.STATUS_SERVER_ERROR,
|
||||||
|
error=f"Internal error obtaining REST status - {err} - {traceback.format_exc()}",
|
||||||
|
)
|
||||||
|
|
||||||
# '/api' REST Endpoint where API requests should all come in
|
# '/api' REST Endpoint where API requests should all come in
|
||||||
@csrf.exempt # Exempt the CSRF token
|
@csrf.exempt # Exempt the CSRF token
|
||||||
@admin_expose("/api", methods=["GET", "POST", "DELETE"]) # for Flask Admin
|
@admin_expose("/api", methods=["GET", "POST", "DELETE"]) # for Flask Admin
|
||||||
@ -119,8 +134,6 @@ class REST_API(AppBuilderBaseView):
|
|||||||
|
|
||||||
# Deciding which function to use based off the API object that was requested.
|
# Deciding which function to use based off the API object that was requested.
|
||||||
# Some functions are custom and need to be manually routed to.
|
# Some functions are custom and need to be manually routed to.
|
||||||
if api == "rest_status":
|
|
||||||
return self.rest_status()
|
|
||||||
if api == "deploy_dag":
|
if api == "deploy_dag":
|
||||||
return self.deploy_dag()
|
return self.deploy_dag()
|
||||||
if api == "trigger_dag":
|
if api == "trigger_dag":
|
||||||
@ -142,23 +155,6 @@ class REST_API(AppBuilderBaseView):
|
|||||||
f"Invalid api param {api}. Expected deploy_dag or trigger_dag."
|
f"Invalid api param {api}. Expected deploy_dag or trigger_dag."
|
||||||
)
|
)
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def rest_status() -> Response:
|
|
||||||
"""
|
|
||||||
Check that the Airflow REST is reachable
|
|
||||||
and running correctly.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
url = AIRFLOW_WEBSERVER_BASE_URL + REST_API_ENDPOINT
|
|
||||||
return ApiResponse.success(
|
|
||||||
{"message": f"Airflow REST {REST_API_PLUGIN_VERSION} running at {url}"}
|
|
||||||
)
|
|
||||||
except Exception as err:
|
|
||||||
return ApiResponse.error(
|
|
||||||
status=ApiResponse.STATUS_SERVER_ERROR,
|
|
||||||
error=f"Internal error obtaining REST status - {err} - {traceback.format_exc()}",
|
|
||||||
)
|
|
||||||
|
|
||||||
def deploy_dag(self) -> Response:
|
def deploy_dag(self) -> Response:
|
||||||
"""
|
"""
|
||||||
Custom Function for the deploy_dag API
|
Custom Function for the deploy_dag API
|
||||||
|
Loading…
x
Reference in New Issue
Block a user