diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java index 505192af4da..b56647486e8 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java @@ -165,12 +165,28 @@ public class AirflowRESTClient extends PipelineServiceClient { } @Override - public HttpResponse getServiceStatus() { + public Response getServiceStatus() { HttpResponse response; try { response = getRequestNoAuthForJsonContent("%s/%s/health", serviceURL, API_ENDPOINT); if (response.statusCode() == 200) { - return response; + JSONObject responseJSON = new JSONObject(response.body()); + String ingestionVersion = responseJSON.getString("version"); + + if (Boolean.TRUE.equals(validServerClientVersions(ingestionVersion))) { + Map status = Map.of("status", "healthy"); + return Response.status(200, status.toString()).build(); + } else { + Map status = + Map.of( + "status", + "unhealthy", + "reason", + String.format( + "Got Ingestion Version %s and Server Version %s. They should match.", + ingestionVersion, SERVER_VERSION)); + return Response.status(500, status.toString()).build(); + } } } catch (Exception e) { throw PipelineServiceClientException.byMessage("Failed to get REST status.", e.getMessage()); diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/exception/PipelineServiceVersionException.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/exception/PipelineServiceVersionException.java new file mode 100644 index 00000000000..fa750adee7d --- /dev/null +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/exception/PipelineServiceVersionException.java @@ -0,0 +1,42 @@ +/* + * Copyright 2021 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.catalog.exception; + +import javax.ws.rs.core.Response; + +public class PipelineServiceVersionException extends WebServiceException { + + private static final String BY_NAME_MESSAGE = "Pipeline Service Version mismatch due to [%s]."; + + public PipelineServiceVersionException(String message) { + super(Response.Status.INTERNAL_SERVER_ERROR, message); + } + + private PipelineServiceVersionException(Response.Status status, String message) { + super(status, message); + } + + public static PipelineServiceVersionException byMessage(String name, String errorMessage, Response.Status status) { + return new PipelineServiceVersionException(status, buildMessageByName(name, errorMessage)); + } + + public static PipelineServiceVersionException byMessage(String name, String errorMessage) { + return new PipelineServiceVersionException( + Response.Status.INTERNAL_SERVER_ERROR, buildMessageByName(name, errorMessage)); + } + + public static String buildMessageByName(String name, String errorMessage) { + return String.format(BY_NAME_MESSAGE, name, errorMessage); + } +} diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResource.java index 5cadba428dd..241aa920471 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResource.java @@ -515,8 +515,7 @@ public class IngestionPipelineResource extends EntityResource response = pipelineServiceClient.getServiceStatus(); - return Response.status(200, response.body()).build(); + return pipelineServiceClient.getServiceStatus(); } @DELETE diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/PipelineServiceClient.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/PipelineServiceClient.java index 9738c01c290..91e3d7f81b5 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/PipelineServiceClient.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/PipelineServiceClient.java @@ -1,6 +1,7 @@ package org.openmetadata.catalog.util; import java.io.IOException; +import java.io.InputStream; import java.net.MalformedURLException; import java.net.URI; import java.net.URL; @@ -10,9 +11,14 @@ import java.net.http.HttpResponse; import java.time.Duration; import java.util.Base64; import java.util.Map; +import java.util.Properties; +import java.util.regex.Pattern; +import javax.ws.rs.core.Response; +import org.openmetadata.catalog.CatalogApplication; import org.openmetadata.catalog.api.services.ingestionPipelines.TestServiceConnection; import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline; import org.openmetadata.catalog.exception.PipelineServiceClientException; +import org.openmetadata.catalog.exception.PipelineServiceVersionException; /** * Client to make API calls to add, deleted, and deploy pipelines on a PipelineService, such as Airflow. Core @@ -35,6 +41,18 @@ public abstract class PipelineServiceClient { protected static final String CONTENT_HEADER = "Content-Type"; protected static final String CONTENT_TYPE = "application/json"; + public static final String SERVER_VERSION; + + static { + String rawServerVersion; + try { + rawServerVersion = getServerVersion(); + } catch (IOException e) { + rawServerVersion = "unknown"; + } + SERVER_VERSION = rawServerVersion; + } + public PipelineServiceClient(String userName, String password, String apiEndpoint, int apiTimeout) { try { this.serviceURL = new URL(apiEndpoint); @@ -71,8 +89,34 @@ public abstract class PipelineServiceClient { return client.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofString()); } + public static String getServerVersion() throws IOException { + InputStream fileInput = CatalogApplication.class.getResourceAsStream("/catalog/VERSION"); + Properties props = new Properties(); + props.load(fileInput); + return props.getProperty("version", "unknown"); + } + + public final String getVersionFromString(String version) { + if (version != null) { + return Pattern.compile("(\\d+.\\d+.\\d+)") + .matcher(version) + .results() + .map(m -> m.group(1)) + .findFirst() + .orElseThrow( + () -> + new PipelineServiceVersionException(String.format("Cannot extract version x.y.z from %s", version))); + } else { + throw new PipelineServiceVersionException("Received version as null"); + } + } + + public final Boolean validServerClientVersions(String clientVersion) { + return getVersionFromString(clientVersion).equals(getVersionFromString(SERVER_VERSION)); + } + /* Check the status of pipeline service to ensure it is healthy */ - public abstract HttpResponse getServiceStatus(); + public abstract Response getServiceStatus(); /* Test the connection to the service such as database service a pipeline depends on. */ public abstract HttpResponse testConnection(TestServiceConnection testServiceConnection); diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/pipelineService/MockPipelineServiceClient.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/pipelineService/MockPipelineServiceClient.java new file mode 100644 index 00000000000..1f06a33be10 --- /dev/null +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/pipelineService/MockPipelineServiceClient.java @@ -0,0 +1,60 @@ +package org.openmetadata.catalog.pipelineService; + +import java.net.http.HttpResponse; +import java.util.Map; +import javax.ws.rs.core.Response; +import org.openmetadata.catalog.api.services.ingestionPipelines.TestServiceConnection; +import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline; +import org.openmetadata.catalog.util.PipelineServiceClient; + +public class MockPipelineServiceClient extends PipelineServiceClient { + + public MockPipelineServiceClient(String userName, String password, String apiEndpoint, int apiTimeout) { + super(userName, password, apiEndpoint, apiTimeout); + } + + @Override + public Response getServiceStatus() { + return null; + } + + @Override + public HttpResponse testConnection(TestServiceConnection testServiceConnection) { + return null; + } + + @Override + public String deployPipeline(IngestionPipeline ingestionPipeline) { + return null; + } + + @Override + public String runPipeline(String pipelineName) { + return null; + } + + @Override + public String deletePipeline(String pipelineName) { + return null; + } + + @Override + public IngestionPipeline getPipelineStatus(IngestionPipeline ingestionPipeline) { + return null; + } + + @Override + public IngestionPipeline toggleIngestion(IngestionPipeline ingestionPipeline) { + return null; + } + + @Override + public Map getLastIngestionLogs(IngestionPipeline ingestionPipeline) { + return null; + } + + @Override + public HttpResponse killIngestion(IngestionPipeline ingestionPipeline) { + return null; + } +} diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/pipelineService/PipelineServiceClientTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/pipelineService/PipelineServiceClientTest.java new file mode 100644 index 00000000000..478edba6f10 --- /dev/null +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/pipelineService/PipelineServiceClientTest.java @@ -0,0 +1,31 @@ +package org.openmetadata.catalog.pipelineService; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import org.junit.Test; +import org.openmetadata.catalog.exception.PipelineServiceVersionException; + +public class PipelineServiceClientTest { + + MockPipelineServiceClient mockPipelineServiceClient = + new MockPipelineServiceClient("user", "password", "https://endpoint.com", 10); + + @Test + public void testGetVersionFromString() { + String version = mockPipelineServiceClient.getVersionFromString("0.12.0.dev0"); + assertEquals("0.12.0", version); + } + + @Test + public void testGetVersionFromStringRaises() { + Exception exception = + assertThrows( + PipelineServiceVersionException.class, () -> mockPipelineServiceClient.getVersionFromString("random")); + + String expectedMessage = "Cannot extract version x.y.z from random"; + String actualMessage = exception.getMessage(); + + assertEquals(expectedMessage, actualMessage); + } +} diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/airflow/AirflowRESTClientIntegrationTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/pipelineService/airflow/AirflowRESTClientIntegrationTest.java similarity index 96% rename from catalog-rest-service/src/test/java/org/openmetadata/catalog/airflow/AirflowRESTClientIntegrationTest.java rename to catalog-rest-service/src/test/java/org/openmetadata/catalog/pipelineService/airflow/AirflowRESTClientIntegrationTest.java index 69a7da74af9..95243975ebd 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/airflow/AirflowRESTClientIntegrationTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/pipelineService/airflow/AirflowRESTClientIntegrationTest.java @@ -10,7 +10,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.openmetadata.catalog.airflow; +package org.openmetadata.catalog.pipelineService.airflow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -26,6 +26,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.RegisterExtension; import org.mockito.junit.jupiter.MockitoExtension; +import org.openmetadata.catalog.airflow.AirflowConfiguration; +import org.openmetadata.catalog.airflow.AirflowRESTClient; import org.openmetadata.catalog.entity.services.ingestionPipelines.AirflowConfig; import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline; import org.openmetadata.catalog.entity.services.ingestionPipelines.PipelineType; diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/airflow/HttpServerExtension.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/pipelineService/airflow/HttpServerExtension.java similarity index 97% rename from catalog-rest-service/src/test/java/org/openmetadata/catalog/airflow/HttpServerExtension.java rename to catalog-rest-service/src/test/java/org/openmetadata/catalog/pipelineService/airflow/HttpServerExtension.java index b0420b0466f..edf737f3a1c 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/airflow/HttpServerExtension.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/pipelineService/airflow/HttpServerExtension.java @@ -10,7 +10,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.openmetadata.catalog.airflow; +package org.openmetadata.catalog.pipelineService.airflow; import com.sun.net.httpserver.HttpHandler; import com.sun.net.httpserver.HttpServer; diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/airflow/JsonHandler.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/pipelineService/airflow/JsonHandler.java similarity index 96% rename from catalog-rest-service/src/test/java/org/openmetadata/catalog/airflow/JsonHandler.java rename to catalog-rest-service/src/test/java/org/openmetadata/catalog/pipelineService/airflow/JsonHandler.java index 4e7643f08c2..9db563c7d71 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/airflow/JsonHandler.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/pipelineService/airflow/JsonHandler.java @@ -10,7 +10,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.openmetadata.catalog.airflow; +package org.openmetadata.catalog.pipelineService.airflow; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/airflow/MockResponse.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/pipelineService/airflow/MockResponse.java similarity index 93% rename from catalog-rest-service/src/test/java/org/openmetadata/catalog/airflow/MockResponse.java rename to catalog-rest-service/src/test/java/org/openmetadata/catalog/pipelineService/airflow/MockResponse.java index 0ed34c7ee6e..5e0e3ad9020 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/airflow/MockResponse.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/pipelineService/airflow/MockResponse.java @@ -10,7 +10,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.openmetadata.catalog.airflow; +package org.openmetadata.catalog.pipelineService.airflow; import lombok.AllArgsConstructor; import lombok.Getter; diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/health.py b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/health.py index 172496e731b..0ef32f3a4d7 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/health.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/health.py @@ -13,6 +13,11 @@ Health endpoint. Globally accessible """ import traceback +try: + from importlib.metadata import version +except ImportError: + from importlib_metadata import version + from airflow.www.app import csrf from openmetadata_managed_apis.api.app import blueprint from openmetadata_managed_apis.api.response import ApiResponse @@ -26,7 +31,9 @@ def health(): """ try: - return ApiResponse.success({"status": "healthy"}) + return ApiResponse.success( + {"status": "healthy", "version": version("openmetadata-ingestion")} + ) except Exception as err: return ApiResponse.error( status=ApiResponse.STATUS_SERVER_ERROR,