WIP - Fix #6582 - Validate Airflow REST versions on health endpoint (#6836)

WIP - Fix #6582 - Validate Airflow REST versions on health endpoint (#6836)
This commit is contained in:
Pere Miquel Brull 2022-08-23 08:59:44 +02:00 committed by GitHub
parent 102ea93e83
commit a975d5177d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 211 additions and 10 deletions

View File

@ -165,12 +165,28 @@ public class AirflowRESTClient extends PipelineServiceClient {
}
@Override
public HttpResponse<String> getServiceStatus() {
public Response getServiceStatus() {
HttpResponse<String> response;
try {
response = getRequestNoAuthForJsonContent("%s/%s/health", serviceURL, API_ENDPOINT);
if (response.statusCode() == 200) {
return response;
JSONObject responseJSON = new JSONObject(response.body());
String ingestionVersion = responseJSON.getString("version");
if (Boolean.TRUE.equals(validServerClientVersions(ingestionVersion))) {
Map<String, String> status = Map.of("status", "healthy");
return Response.status(200, status.toString()).build();
} else {
Map<String, String> status =
Map.of(
"status",
"unhealthy",
"reason",
String.format(
"Got Ingestion Version %s and Server Version %s. They should match.",
ingestionVersion, SERVER_VERSION));
return Response.status(500, status.toString()).build();
}
}
} catch (Exception e) {
throw PipelineServiceClientException.byMessage("Failed to get REST status.", e.getMessage());

View File

@ -0,0 +1,42 @@
/*
* Copyright 2021 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.exception;
import javax.ws.rs.core.Response;
public class PipelineServiceVersionException extends WebServiceException {
private static final String BY_NAME_MESSAGE = "Pipeline Service Version mismatch due to [%s].";
public PipelineServiceVersionException(String message) {
super(Response.Status.INTERNAL_SERVER_ERROR, message);
}
private PipelineServiceVersionException(Response.Status status, String message) {
super(status, message);
}
public static PipelineServiceVersionException byMessage(String name, String errorMessage, Response.Status status) {
return new PipelineServiceVersionException(status, buildMessageByName(name, errorMessage));
}
public static PipelineServiceVersionException byMessage(String name, String errorMessage) {
return new PipelineServiceVersionException(
Response.Status.INTERNAL_SERVER_ERROR, buildMessageByName(name, errorMessage));
}
public static String buildMessageByName(String name, String errorMessage) {
return String.format(BY_NAME_MESSAGE, name, errorMessage);
}
}

View File

@ -515,8 +515,7 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
content = @Content(mediaType = "application/json"))
})
public Response getRESTStatus(@Context UriInfo uriInfo, @Context SecurityContext securityContext) {
HttpResponse<String> response = pipelineServiceClient.getServiceStatus();
return Response.status(200, response.body()).build();
return pipelineServiceClient.getServiceStatus();
}
@DELETE

View File

@ -1,6 +1,7 @@
package org.openmetadata.catalog.util;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
@ -10,9 +11,14 @@ import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.Base64;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
import javax.ws.rs.core.Response;
import org.openmetadata.catalog.CatalogApplication;
import org.openmetadata.catalog.api.services.ingestionPipelines.TestServiceConnection;
import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.catalog.exception.PipelineServiceClientException;
import org.openmetadata.catalog.exception.PipelineServiceVersionException;
/**
* Client to make API calls to add, deleted, and deploy pipelines on a PipelineService, such as Airflow. Core
@ -35,6 +41,18 @@ public abstract class PipelineServiceClient {
protected static final String CONTENT_HEADER = "Content-Type";
protected static final String CONTENT_TYPE = "application/json";
public static final String SERVER_VERSION;
static {
String rawServerVersion;
try {
rawServerVersion = getServerVersion();
} catch (IOException e) {
rawServerVersion = "unknown";
}
SERVER_VERSION = rawServerVersion;
}
public PipelineServiceClient(String userName, String password, String apiEndpoint, int apiTimeout) {
try {
this.serviceURL = new URL(apiEndpoint);
@ -71,8 +89,34 @@ public abstract class PipelineServiceClient {
return client.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofString());
}
public static String getServerVersion() throws IOException {
InputStream fileInput = CatalogApplication.class.getResourceAsStream("/catalog/VERSION");
Properties props = new Properties();
props.load(fileInput);
return props.getProperty("version", "unknown");
}
public final String getVersionFromString(String version) {
if (version != null) {
return Pattern.compile("(\\d+.\\d+.\\d+)")
.matcher(version)
.results()
.map(m -> m.group(1))
.findFirst()
.orElseThrow(
() ->
new PipelineServiceVersionException(String.format("Cannot extract version x.y.z from %s", version)));
} else {
throw new PipelineServiceVersionException("Received version as null");
}
}
public final Boolean validServerClientVersions(String clientVersion) {
return getVersionFromString(clientVersion).equals(getVersionFromString(SERVER_VERSION));
}
/* Check the status of pipeline service to ensure it is healthy */
public abstract HttpResponse<String> getServiceStatus();
public abstract Response getServiceStatus();
/* Test the connection to the service such as database service a pipeline depends on. */
public abstract HttpResponse<String> testConnection(TestServiceConnection testServiceConnection);

View File

@ -0,0 +1,60 @@
package org.openmetadata.catalog.pipelineService;
import java.net.http.HttpResponse;
import java.util.Map;
import javax.ws.rs.core.Response;
import org.openmetadata.catalog.api.services.ingestionPipelines.TestServiceConnection;
import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.catalog.util.PipelineServiceClient;
public class MockPipelineServiceClient extends PipelineServiceClient {
public MockPipelineServiceClient(String userName, String password, String apiEndpoint, int apiTimeout) {
super(userName, password, apiEndpoint, apiTimeout);
}
@Override
public Response getServiceStatus() {
return null;
}
@Override
public HttpResponse<String> testConnection(TestServiceConnection testServiceConnection) {
return null;
}
@Override
public String deployPipeline(IngestionPipeline ingestionPipeline) {
return null;
}
@Override
public String runPipeline(String pipelineName) {
return null;
}
@Override
public String deletePipeline(String pipelineName) {
return null;
}
@Override
public IngestionPipeline getPipelineStatus(IngestionPipeline ingestionPipeline) {
return null;
}
@Override
public IngestionPipeline toggleIngestion(IngestionPipeline ingestionPipeline) {
return null;
}
@Override
public Map<String, String> getLastIngestionLogs(IngestionPipeline ingestionPipeline) {
return null;
}
@Override
public HttpResponse<String> killIngestion(IngestionPipeline ingestionPipeline) {
return null;
}
}

View File

@ -0,0 +1,31 @@
package org.openmetadata.catalog.pipelineService;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import org.junit.Test;
import org.openmetadata.catalog.exception.PipelineServiceVersionException;
public class PipelineServiceClientTest {
MockPipelineServiceClient mockPipelineServiceClient =
new MockPipelineServiceClient("user", "password", "https://endpoint.com", 10);
@Test
public void testGetVersionFromString() {
String version = mockPipelineServiceClient.getVersionFromString("0.12.0.dev0");
assertEquals("0.12.0", version);
}
@Test
public void testGetVersionFromStringRaises() {
Exception exception =
assertThrows(
PipelineServiceVersionException.class, () -> mockPipelineServiceClient.getVersionFromString("random"));
String expectedMessage = "Cannot extract version x.y.z from random";
String actualMessage = exception.getMessage();
assertEquals(expectedMessage, actualMessage);
}
}

View File

@ -10,7 +10,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.airflow;
package org.openmetadata.catalog.pipelineService.airflow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ -26,6 +26,8 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.junit.jupiter.MockitoExtension;
import org.openmetadata.catalog.airflow.AirflowConfiguration;
import org.openmetadata.catalog.airflow.AirflowRESTClient;
import org.openmetadata.catalog.entity.services.ingestionPipelines.AirflowConfig;
import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.catalog.entity.services.ingestionPipelines.PipelineType;

View File

@ -10,7 +10,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.airflow;
package org.openmetadata.catalog.pipelineService.airflow;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;

View File

@ -10,7 +10,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.airflow;
package org.openmetadata.catalog.pipelineService.airflow;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;

View File

@ -10,7 +10,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.airflow;
package org.openmetadata.catalog.pipelineService.airflow;
import lombok.AllArgsConstructor;
import lombok.Getter;

View File

@ -13,6 +13,11 @@ Health endpoint. Globally accessible
"""
import traceback
try:
from importlib.metadata import version
except ImportError:
from importlib_metadata import version
from airflow.www.app import csrf
from openmetadata_managed_apis.api.app import blueprint
from openmetadata_managed_apis.api.response import ApiResponse
@ -26,7 +31,9 @@ def health():
"""
try:
return ApiResponse.success({"status": "healthy"})
return ApiResponse.success(
{"status": "healthy", "version": version("openmetadata-ingestion")}
)
except Exception as err:
return ApiResponse.error(
status=ApiResponse.STATUS_SERVER_ERROR,