diff --git a/catalog-rest-service/pom.xml b/catalog-rest-service/pom.xml
index 8db148f60dc..9d9d970a9a2 100644
--- a/catalog-rest-service/pom.xml
+++ b/catalog-rest-service/pom.xml
@@ -247,7 +247,17 @@
org.mockito
- mockito-all
+ mockito-core
+ test
+
+
+ org.mockito
+ mockito-junit-jupiter
+ test
+
+
+ org.mockito
+ mockito-inline
test
diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java
index 63f80dd3d77..72e7f5ca798 100644
--- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java
+++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java
@@ -13,10 +13,13 @@
package org.openmetadata.catalog.airflow;
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.io.IOException;
import java.net.URI;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;
+import java.util.Map;
import javax.ws.rs.core.Response;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -142,19 +145,8 @@ public class AirflowRESTClient extends PipelineServiceClient {
@Override
public HttpResponse getServiceStatus() {
- HttpResponse response;
try {
- String token = authenticate();
- String authToken = String.format(AUTH_TOKEN, token);
- String statusEndPoint = "%s/rest_api/api?api=rest_status";
- String statusUrl = String.format(statusEndPoint, serviceURL);
- HttpRequest request =
- HttpRequest.newBuilder(URI.create(statusUrl))
- .header(CONTENT_HEADER, CONTENT_TYPE)
- .header(AUTH_HEADER, authToken)
- .GET()
- .build();
- response = client.send(request, HttpResponse.BodyHandlers.ofString());
+ HttpResponse response = requestAuthenticatedForJsonContent("%s/rest_api/api?api=rest_status", serviceURL);
if (response.statusCode() == 200) {
return response;
}
@@ -180,4 +172,32 @@ public class AirflowRESTClient extends PipelineServiceClient {
}
throw new PipelineServiceClientException(String.format("Failed to test connection due to %s", response.body()));
}
+
+ @Override
+ public Map getLastIngestionLogs(String pipelineName) {
+ try {
+ HttpResponse response =
+ requestAuthenticatedForJsonContent("%s/rest_api/api?api=last_dag_logs&dag_id=%s", serviceURL, pipelineName);
+ if (response.statusCode() == 200) {
+ return JsonUtils.readValue(response.body(), new TypeReference<>() {});
+ }
+ } catch (Exception e) {
+ throw new PipelineServiceClientException("Failed to get last ingestion logs.");
+ }
+ throw new PipelineServiceClientException("Failed to get last ingestion logs.");
+ }
+
+ private HttpResponse requestAuthenticatedForJsonContent(String stringUrlFormat, Object... stringReplacement)
+ throws IOException, InterruptedException {
+ String token = authenticate();
+ String authToken = String.format(AUTH_TOKEN, token);
+ String url = String.format(stringUrlFormat, stringReplacement);
+ HttpRequest request =
+ HttpRequest.newBuilder(URI.create(url))
+ .header(CONTENT_HEADER, CONTENT_TYPE)
+ .header(AUTH_HEADER, authToken)
+ .GET()
+ .build();
+ return client.send(request, HttpResponse.BodyHandlers.ofString());
+ }
}
diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResource.java
index a6f7bcbba81..0acdef6e9b4 100644
--- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResource.java
+++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResource.java
@@ -33,6 +33,7 @@ import java.io.IOException;
import java.net.http.HttpResponse;
import java.util.List;
import java.util.Locale;
+import java.util.Map;
import javax.json.JsonPatch;
import javax.validation.Valid;
import javax.validation.constraints.Max;
@@ -469,7 +470,7 @@ public class IngestionPipelineResource extends EntityResource lastIngestionLogs = pipelineServiceClient.getLastIngestionLogs(id);
+ return Response.ok(lastIngestionLogs, MediaType.APPLICATION_JSON_TYPE).build();
+ }
+
private IngestionPipeline getIngestionPipeline(CreateIngestionPipeline create, String user) throws IOException {
Source source = buildIngestionSource(create);
OpenMetadataServerConnection openMetadataServerConnection =
diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/JsonUtils.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/JsonUtils.java
index 929b8b3a1df..4473a6b58a1 100644
--- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/JsonUtils.java
+++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/JsonUtils.java
@@ -16,6 +16,7 @@ package org.openmetadata.catalog.util;
import static org.openmetadata.catalog.util.RestUtil.DATE_TIME_FORMAT;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
@@ -93,6 +94,13 @@ public final class JsonUtils {
return OBJECT_MAPPER.readValue(json, clz);
}
+ public static T readValue(String json, TypeReference valueTypeRef) throws IOException {
+ if (json == null) {
+ return null;
+ }
+ return OBJECT_MAPPER.readValue(json, valueTypeRef);
+ }
+
/** Read an array of objects of type {@code T} from json */
public static List readObjects(String json, Class clz) throws IOException {
if (json == null) {
diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/PipelineServiceClient.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/PipelineServiceClient.java
index 002d1d2d5bd..95e24daf11d 100644
--- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/PipelineServiceClient.java
+++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/PipelineServiceClient.java
@@ -8,6 +8,7 @@ import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
+import java.util.Map;
import org.openmetadata.catalog.api.services.ingestionPipelines.TestServiceConnection;
import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.catalog.exception.PipelineServiceClientException;
@@ -86,4 +87,7 @@ public abstract class PipelineServiceClient {
/* Get the status of a deployed pipeline */
public abstract IngestionPipeline getPipelineStatus(IngestionPipeline ingestionPipeline);
+
+ /* Get the all last run logs of a deployed pipeline */
+ public abstract Map getLastIngestionLogs(String pipelineName);
}
diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/airflow/AirflowRESTClientIntegrationTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/airflow/AirflowRESTClientIntegrationTest.java
new file mode 100644
index 00000000000..5c84f45f0d4
--- /dev/null
+++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/airflow/AirflowRESTClientIntegrationTest.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2022 Collate
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openmetadata.catalog.airflow;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.HashMap;
+import java.util.Map;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.openmetadata.catalog.exception.PipelineServiceClientException;
+
+@ExtendWith(MockitoExtension.class)
+public class AirflowRESTClientIntegrationTest {
+
+ private static final String DAG_NAME = "test_dag";
+ private static final String URI_TO_HANDLE_REQUEST = "/";
+
+ @RegisterExtension private static final HttpServerExtension httpServerExtension = new HttpServerExtension();
+
+ AirflowRESTClient airflowRESTClient;
+
+ @BeforeEach
+ void setUp() {
+ AirflowConfiguration airflowConfiguration = createDefaultAirflowConfiguration();
+ airflowRESTClient = new AirflowRESTClient(airflowConfiguration);
+ httpServerExtension.unregisterHandler();
+ }
+
+ @Test
+ public void testLastIngestionLogsAreRetrievedWhenStatusCodesAre200() {
+ Map expectedMap = Map.of("key1", "value1", "key2", "value2");
+
+ registerMockedEndpoints(200, 200);
+
+ assertEquals(expectedMap, airflowRESTClient.getLastIngestionLogs(DAG_NAME));
+ }
+
+ @Test
+ public void testLastIngestionLogsExceptionWhenLoginFails() {
+ registerMockedEndpoints(404, 200);
+
+ Exception exception =
+ assertThrows(PipelineServiceClientException.class, () -> airflowRESTClient.getLastIngestionLogs(DAG_NAME));
+
+ String expectedMessage = "Failed to get last ingestion logs.";
+ String actualMessage = exception.getMessage();
+
+ assertEquals(expectedMessage, actualMessage);
+ }
+
+ @Test
+ public void testLastIngestionLogsExceptionWhenStatusCode404() {
+ registerMockedEndpoints(200, 404);
+
+ Exception exception =
+ assertThrows(PipelineServiceClientException.class, () -> airflowRESTClient.getLastIngestionLogs(DAG_NAME));
+
+ String expectedMessage = "Failed to get last ingestion logs.";
+ String actualMessage = exception.getMessage();
+
+ assertEquals(expectedMessage, actualMessage);
+ }
+
+ @SneakyThrows
+ private AirflowConfiguration createDefaultAirflowConfiguration() {
+ AirflowConfiguration airflowConfiguration = new AirflowConfiguration();
+ airflowConfiguration.setApiEndpoint(HttpServerExtension.getUriFor("").toString());
+ airflowConfiguration.setUsername("user");
+ airflowConfiguration.setPassword("pass");
+ airflowConfiguration.setTimeout(60);
+ return airflowConfiguration;
+ }
+
+ private void registerMockedEndpoints(int loginStatusCode, int lastDagLogStatusCode) {
+ String jsonResponse = "{ \"key1\": \"value1\", \"key2\": \"value2\" }";
+
+ Map pathResponses = new HashMap<>();
+ pathResponses.put(
+ "/rest_api/api?api=last_dag_logs&dag_id=" + DAG_NAME,
+ new MockResponse(jsonResponse, "application/json", lastDagLogStatusCode));
+ pathResponses.put("/api/v1/security/login", new MockResponse("{}", "application/json", loginStatusCode));
+
+ httpServerExtension.registerHandler(URI_TO_HANDLE_REQUEST, new JsonHandler(pathResponses));
+ }
+}
diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/airflow/HttpServerExtension.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/airflow/HttpServerExtension.java
new file mode 100644
index 00000000000..b0420b0466f
--- /dev/null
+++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/airflow/HttpServerExtension.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2022 Collate
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openmetadata.catalog.airflow;
+
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import org.apache.http.client.utils.URIBuilder;
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+public class HttpServerExtension implements BeforeAllCallback, AfterAllCallback {
+
+ private static final int PORT;
+
+ static {
+ try (ServerSocket socket = new ServerSocket(0)) {
+ socket.setReuseAddress(true);
+ PORT = socket.getLocalPort();
+ } catch (IOException ex) {
+ throw new RuntimeException("Could not find a free port for testing");
+ }
+ }
+
+ private static final String HOST = "localhost";
+ private static final String SCHEME = "http";
+ private static final String DEFAULT_CONTEXT = "/";
+
+ private com.sun.net.httpserver.HttpServer server;
+
+ @Override
+ public void afterAll(ExtensionContext extensionContext) throws Exception {
+ if (server != null) {
+ server.stop(0);
+ }
+ }
+
+ @Override
+ public void beforeAll(ExtensionContext extensionContext) throws Exception {
+ server = HttpServer.create(new InetSocketAddress(PORT), 0);
+ server.setExecutor(null);
+ server.start();
+ server.createContext(DEFAULT_CONTEXT);
+ }
+
+ public static URI getUriFor(String path) throws URISyntaxException {
+ return new URIBuilder().setScheme(SCHEME).setHost(HOST).setPort(PORT).setPath(path).build();
+ }
+
+ public void registerHandler(String uriToHandle, HttpHandler httpHandler) {
+ server.createContext(uriToHandle, httpHandler);
+ }
+
+ public void unregisterHandler() {
+ server.removeContext(DEFAULT_CONTEXT);
+ }
+}
diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/airflow/JsonHandler.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/airflow/JsonHandler.java
new file mode 100644
index 00000000000..4e7643f08c2
--- /dev/null
+++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/airflow/JsonHandler.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2022 Collate
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openmetadata.catalog.airflow;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Map;
+import org.apache.commons.io.IOUtils;
+
+class JsonHandler implements HttpHandler {
+
+ Map pathResponses;
+
+ public JsonHandler(Map pathResponses) {
+ this.pathResponses = pathResponses;
+ }
+
+ @Override
+ public void handle(HttpExchange exchange) throws IOException {
+ MockResponse response = pathResponses.get(exchange.getRequestURI().toString());
+ exchange.getResponseHeaders().add("Content-Type", response.getContentType());
+ exchange.sendResponseHeaders(response.getStatusCode(), response.getBody().length());
+ IOUtils.write(response.getBody(), exchange.getResponseBody(), Charset.defaultCharset());
+ exchange.close();
+ }
+}
diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/airflow/MockResponse.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/airflow/MockResponse.java
new file mode 100644
index 00000000000..0ed34c7ee6e
--- /dev/null
+++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/airflow/MockResponse.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2022 Collate
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openmetadata.catalog.airflow;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+@Getter
+@AllArgsConstructor
+public class MockResponse {
+
+ private final String body;
+ private final String contentType;
+ private final int statusCode;
+}
diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResourceUnitTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResourceUnitTest.java
new file mode 100644
index 00000000000..097bb2913f6
--- /dev/null
+++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResourceUnitTest.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2022 Collate
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.openmetadata.catalog.resources.services.ingestionpipelines;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.Map;
+import javax.ws.rs.core.SecurityContext;
+import javax.ws.rs.core.UriInfo;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.MockedConstruction;
+import org.mockito.MockedConstruction.Context;
+import org.mockito.Spy;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.openmetadata.catalog.CatalogApplicationConfig;
+import org.openmetadata.catalog.airflow.AirflowRESTClient;
+import org.openmetadata.catalog.jdbi3.CollectionDAO;
+import org.openmetadata.catalog.security.Authorizer;
+import org.openmetadata.catalog.util.PipelineServiceClient;
+
+@ExtendWith(MockitoExtension.class)
+public class IngestionPipelineResourceUnitTest {
+
+ private static final String DAG_NAME = "test_dag";
+
+ private IngestionPipelineResource ingestionPipelineResource;
+
+ @Mock UriInfo uriInfo;
+
+ @Mock SecurityContext securityContext;
+
+ @Mock Authorizer authorizer;
+
+ @Mock CollectionDAO collectionDAO;
+
+ @Spy CollectionDAO.IngestionPipelineDAO ingestionPipelineDAO;
+
+ @Mock CatalogApplicationConfig catalogApplicationConfig;
+
+ @BeforeEach
+ void setUp() {
+ doReturn(ingestionPipelineDAO).when(collectionDAO).ingestionPipelineDAO();
+ ingestionPipelineResource = new IngestionPipelineResource(collectionDAO, authorizer);
+ }
+
+ @Test
+ public void testLastIngestionLogsAreRetrievedWhen() throws IOException {
+ Map expectedMap = Map.of("task", "log");
+ try (MockedConstruction mocked =
+ mockConstruction(AirflowRESTClient.class, this::preparePipelineServiceClient)) {
+ ingestionPipelineResource.initialize(catalogApplicationConfig);
+ assertEquals(
+ expectedMap, ingestionPipelineResource.getLastIngestionLogs(uriInfo, securityContext, DAG_NAME).getEntity());
+ PipelineServiceClient client = mocked.constructed().get(0);
+ verify(client).getLastIngestionLogs(DAG_NAME);
+ }
+ }
+
+ private void preparePipelineServiceClient(AirflowRESTClient mockPipelineServiceClient, Context context) {
+ doReturn(Map.of("task", "log")).when(mockPipelineServiceClient).getLastIngestionLogs(anyString());
+ }
+}
diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/util/JsonUtilsTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/util/JsonUtilsTest.java
index 8d6437e274e..3a39f028053 100644
--- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/util/JsonUtilsTest.java
+++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/util/JsonUtilsTest.java
@@ -17,7 +17,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import com.fasterxml.jackson.core.type.TypeReference;
import java.io.IOException;
+import java.util.Map;
import java.util.UUID;
import javax.json.Json;
import javax.json.JsonArrayBuilder;
@@ -97,4 +99,12 @@ class JsonUtilsTest {
assertThrows(JsonException.class, () -> JsonUtils.applyPatch(original, jsonPatchBuilder2.build(), Team.class));
assertTrue(jsonException.getMessage().contains("contains no element for index 3"));
}
+
+ @Test
+ void testReadValuePassingTypeReference() throws IOException {
+ Map expectedMap = Map.of("key1", "value1", "key2", "value2");
+ String json = "{ \"key1\": \"value1\", \"key2\": \"value2\" }";
+ TypeReference
+
+ org.mockito
+ mockito-inline
+ ${mockito.version}
+ test
+
+
+ org.mockito
+ mockito-junit-jupiter
+ ${mockito.version}
+ test
+
com.amazon.redshift
redshift-jdbc42