mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-09-02 05:33:49 +00:00
parent
bbe5cfa9e7
commit
628296d294
@ -247,7 +247,17 @@
|
|||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.mockito</groupId>
|
<groupId>org.mockito</groupId>
|
||||||
<artifactId>mockito-all</artifactId>
|
<artifactId>mockito-core</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.mockito</groupId>
|
||||||
|
<artifactId>mockito-junit-jupiter</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.mockito</groupId>
|
||||||
|
<artifactId>mockito-inline</artifactId>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
|
@ -13,10 +13,13 @@
|
|||||||
|
|
||||||
package org.openmetadata.catalog.airflow;
|
package org.openmetadata.catalog.airflow;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.http.HttpRequest;
|
import java.net.http.HttpRequest;
|
||||||
import java.net.http.HttpResponse;
|
import java.net.http.HttpResponse;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import javax.ws.rs.core.Response;
|
import javax.ws.rs.core.Response;
|
||||||
import lombok.SneakyThrows;
|
import lombok.SneakyThrows;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
@ -142,19 +145,8 @@ public class AirflowRESTClient extends PipelineServiceClient {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public HttpResponse<String> getServiceStatus() {
|
public HttpResponse<String> getServiceStatus() {
|
||||||
HttpResponse<String> response;
|
|
||||||
try {
|
try {
|
||||||
String token = authenticate();
|
HttpResponse<String> response = requestAuthenticatedForJsonContent("%s/rest_api/api?api=rest_status", serviceURL);
|
||||||
String authToken = String.format(AUTH_TOKEN, token);
|
|
||||||
String statusEndPoint = "%s/rest_api/api?api=rest_status";
|
|
||||||
String statusUrl = String.format(statusEndPoint, serviceURL);
|
|
||||||
HttpRequest request =
|
|
||||||
HttpRequest.newBuilder(URI.create(statusUrl))
|
|
||||||
.header(CONTENT_HEADER, CONTENT_TYPE)
|
|
||||||
.header(AUTH_HEADER, authToken)
|
|
||||||
.GET()
|
|
||||||
.build();
|
|
||||||
response = client.send(request, HttpResponse.BodyHandlers.ofString());
|
|
||||||
if (response.statusCode() == 200) {
|
if (response.statusCode() == 200) {
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
@ -180,4 +172,32 @@ public class AirflowRESTClient extends PipelineServiceClient {
|
|||||||
}
|
}
|
||||||
throw new PipelineServiceClientException(String.format("Failed to test connection due to %s", response.body()));
|
throw new PipelineServiceClientException(String.format("Failed to test connection due to %s", response.body()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, String> getLastIngestionLogs(String pipelineName) {
|
||||||
|
try {
|
||||||
|
HttpResponse<String> response =
|
||||||
|
requestAuthenticatedForJsonContent("%s/rest_api/api?api=last_dag_logs&dag_id=%s", serviceURL, pipelineName);
|
||||||
|
if (response.statusCode() == 200) {
|
||||||
|
return JsonUtils.readValue(response.body(), new TypeReference<>() {});
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new PipelineServiceClientException("Failed to get last ingestion logs.");
|
||||||
|
}
|
||||||
|
throw new PipelineServiceClientException("Failed to get last ingestion logs.");
|
||||||
|
}
|
||||||
|
|
||||||
|
private HttpResponse<String> requestAuthenticatedForJsonContent(String stringUrlFormat, Object... stringReplacement)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
String token = authenticate();
|
||||||
|
String authToken = String.format(AUTH_TOKEN, token);
|
||||||
|
String url = String.format(stringUrlFormat, stringReplacement);
|
||||||
|
HttpRequest request =
|
||||||
|
HttpRequest.newBuilder(URI.create(url))
|
||||||
|
.header(CONTENT_HEADER, CONTENT_TYPE)
|
||||||
|
.header(AUTH_HEADER, authToken)
|
||||||
|
.GET()
|
||||||
|
.build();
|
||||||
|
return client.send(request, HttpResponse.BodyHandlers.ofString());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -33,6 +33,7 @@ import java.io.IOException;
|
|||||||
import java.net.http.HttpResponse;
|
import java.net.http.HttpResponse;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
|
import java.util.Map;
|
||||||
import javax.json.JsonPatch;
|
import javax.json.JsonPatch;
|
||||||
import javax.validation.Valid;
|
import javax.validation.Valid;
|
||||||
import javax.validation.constraints.Max;
|
import javax.validation.constraints.Max;
|
||||||
@ -469,7 +470,7 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
|
|||||||
description = "Delete a ingestion by `id`.",
|
description = "Delete a ingestion by `id`.",
|
||||||
responses = {
|
responses = {
|
||||||
@ApiResponse(responseCode = "200", description = "OK"),
|
@ApiResponse(responseCode = "200", description = "OK"),
|
||||||
@ApiResponse(responseCode = "404", description = "ingestion for instance {id} is not found")
|
@ApiResponse(responseCode = "404", description = "Ingestion for instance {id} is not found")
|
||||||
})
|
})
|
||||||
public Response delete(
|
public Response delete(
|
||||||
@Context UriInfo uriInfo,
|
@Context UriInfo uriInfo,
|
||||||
@ -483,6 +484,28 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
|
|||||||
return delete(uriInfo, securityContext, id, false, hardDelete, ADMIN | BOT);
|
return delete(uriInfo, securityContext, id, false, hardDelete, ADMIN | BOT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@GET
|
||||||
|
@Path("/logs/{id}/last")
|
||||||
|
@Operation(
|
||||||
|
summary = "Retrieve all logs from last ingestion pipeline run",
|
||||||
|
tags = "IngestionPipelines",
|
||||||
|
description = "Get all logs from last ingestion pipeline run by `id`.",
|
||||||
|
responses = {
|
||||||
|
@ApiResponse(
|
||||||
|
responseCode = "200",
|
||||||
|
description = "JSON object with the task instance name of the ingestion on each key and log in the value",
|
||||||
|
content = @Content(mediaType = "application/json")),
|
||||||
|
@ApiResponse(responseCode = "404", description = "Logs for instance {id} is not found")
|
||||||
|
})
|
||||||
|
public Response getLastIngestionLogs(
|
||||||
|
@Context UriInfo uriInfo,
|
||||||
|
@Context SecurityContext securityContext,
|
||||||
|
@Parameter(description = "Pipeline Id", schema = @Schema(type = "string")) @PathParam("id") String id)
|
||||||
|
throws IOException {
|
||||||
|
Map<String, String> lastIngestionLogs = pipelineServiceClient.getLastIngestionLogs(id);
|
||||||
|
return Response.ok(lastIngestionLogs, MediaType.APPLICATION_JSON_TYPE).build();
|
||||||
|
}
|
||||||
|
|
||||||
private IngestionPipeline getIngestionPipeline(CreateIngestionPipeline create, String user) throws IOException {
|
private IngestionPipeline getIngestionPipeline(CreateIngestionPipeline create, String user) throws IOException {
|
||||||
Source source = buildIngestionSource(create);
|
Source source = buildIngestionSource(create);
|
||||||
OpenMetadataServerConnection openMetadataServerConnection =
|
OpenMetadataServerConnection openMetadataServerConnection =
|
||||||
|
@ -16,6 +16,7 @@ package org.openmetadata.catalog.util;
|
|||||||
import static org.openmetadata.catalog.util.RestUtil.DATE_TIME_FORMAT;
|
import static org.openmetadata.catalog.util.RestUtil.DATE_TIME_FORMAT;
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
import com.fasterxml.jackson.databind.JsonNode;
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.fasterxml.jackson.databind.SerializationFeature;
|
import com.fasterxml.jackson.databind.SerializationFeature;
|
||||||
@ -93,6 +94,13 @@ public final class JsonUtils {
|
|||||||
return OBJECT_MAPPER.readValue(json, clz);
|
return OBJECT_MAPPER.readValue(json, clz);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static <T> T readValue(String json, TypeReference<T> valueTypeRef) throws IOException {
|
||||||
|
if (json == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return OBJECT_MAPPER.readValue(json, valueTypeRef);
|
||||||
|
}
|
||||||
|
|
||||||
/** Read an array of objects of type {@code T} from json */
|
/** Read an array of objects of type {@code T} from json */
|
||||||
public static <T> List<T> readObjects(String json, Class<T> clz) throws IOException {
|
public static <T> List<T> readObjects(String json, Class<T> clz) throws IOException {
|
||||||
if (json == null) {
|
if (json == null) {
|
||||||
|
@ -8,6 +8,7 @@ import java.net.http.HttpClient;
|
|||||||
import java.net.http.HttpRequest;
|
import java.net.http.HttpRequest;
|
||||||
import java.net.http.HttpResponse;
|
import java.net.http.HttpResponse;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
|
import java.util.Map;
|
||||||
import org.openmetadata.catalog.api.services.ingestionPipelines.TestServiceConnection;
|
import org.openmetadata.catalog.api.services.ingestionPipelines.TestServiceConnection;
|
||||||
import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline;
|
import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline;
|
||||||
import org.openmetadata.catalog.exception.PipelineServiceClientException;
|
import org.openmetadata.catalog.exception.PipelineServiceClientException;
|
||||||
@ -86,4 +87,7 @@ public abstract class PipelineServiceClient {
|
|||||||
|
|
||||||
/* Get the status of a deployed pipeline */
|
/* Get the status of a deployed pipeline */
|
||||||
public abstract IngestionPipeline getPipelineStatus(IngestionPipeline ingestionPipeline);
|
public abstract IngestionPipeline getPipelineStatus(IngestionPipeline ingestionPipeline);
|
||||||
|
|
||||||
|
/* Get the all last run logs of a deployed pipeline */
|
||||||
|
public abstract Map<String, String> getLastIngestionLogs(String pipelineName);
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,101 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2022 Collate
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.openmetadata.catalog.airflow;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import lombok.SneakyThrows;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||||
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
import org.openmetadata.catalog.exception.PipelineServiceClientException;
|
||||||
|
|
||||||
|
@ExtendWith(MockitoExtension.class)
|
||||||
|
public class AirflowRESTClientIntegrationTest {
|
||||||
|
|
||||||
|
private static final String DAG_NAME = "test_dag";
|
||||||
|
private static final String URI_TO_HANDLE_REQUEST = "/";
|
||||||
|
|
||||||
|
@RegisterExtension private static final HttpServerExtension httpServerExtension = new HttpServerExtension();
|
||||||
|
|
||||||
|
AirflowRESTClient airflowRESTClient;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void setUp() {
|
||||||
|
AirflowConfiguration airflowConfiguration = createDefaultAirflowConfiguration();
|
||||||
|
airflowRESTClient = new AirflowRESTClient(airflowConfiguration);
|
||||||
|
httpServerExtension.unregisterHandler();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLastIngestionLogsAreRetrievedWhenStatusCodesAre200() {
|
||||||
|
Map<String, String> expectedMap = Map.of("key1", "value1", "key2", "value2");
|
||||||
|
|
||||||
|
registerMockedEndpoints(200, 200);
|
||||||
|
|
||||||
|
assertEquals(expectedMap, airflowRESTClient.getLastIngestionLogs(DAG_NAME));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLastIngestionLogsExceptionWhenLoginFails() {
|
||||||
|
registerMockedEndpoints(404, 200);
|
||||||
|
|
||||||
|
Exception exception =
|
||||||
|
assertThrows(PipelineServiceClientException.class, () -> airflowRESTClient.getLastIngestionLogs(DAG_NAME));
|
||||||
|
|
||||||
|
String expectedMessage = "Failed to get last ingestion logs.";
|
||||||
|
String actualMessage = exception.getMessage();
|
||||||
|
|
||||||
|
assertEquals(expectedMessage, actualMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLastIngestionLogsExceptionWhenStatusCode404() {
|
||||||
|
registerMockedEndpoints(200, 404);
|
||||||
|
|
||||||
|
Exception exception =
|
||||||
|
assertThrows(PipelineServiceClientException.class, () -> airflowRESTClient.getLastIngestionLogs(DAG_NAME));
|
||||||
|
|
||||||
|
String expectedMessage = "Failed to get last ingestion logs.";
|
||||||
|
String actualMessage = exception.getMessage();
|
||||||
|
|
||||||
|
assertEquals(expectedMessage, actualMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
@SneakyThrows
|
||||||
|
private AirflowConfiguration createDefaultAirflowConfiguration() {
|
||||||
|
AirflowConfiguration airflowConfiguration = new AirflowConfiguration();
|
||||||
|
airflowConfiguration.setApiEndpoint(HttpServerExtension.getUriFor("").toString());
|
||||||
|
airflowConfiguration.setUsername("user");
|
||||||
|
airflowConfiguration.setPassword("pass");
|
||||||
|
airflowConfiguration.setTimeout(60);
|
||||||
|
return airflowConfiguration;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void registerMockedEndpoints(int loginStatusCode, int lastDagLogStatusCode) {
|
||||||
|
String jsonResponse = "{ \"key1\": \"value1\", \"key2\": \"value2\" }";
|
||||||
|
|
||||||
|
Map<String, MockResponse> pathResponses = new HashMap<>();
|
||||||
|
pathResponses.put(
|
||||||
|
"/rest_api/api?api=last_dag_logs&dag_id=" + DAG_NAME,
|
||||||
|
new MockResponse(jsonResponse, "application/json", lastDagLogStatusCode));
|
||||||
|
pathResponses.put("/api/v1/security/login", new MockResponse("{}", "application/json", loginStatusCode));
|
||||||
|
|
||||||
|
httpServerExtension.registerHandler(URI_TO_HANDLE_REQUEST, new JsonHandler(pathResponses));
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,72 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2022 Collate
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.openmetadata.catalog.airflow;
|
||||||
|
|
||||||
|
import com.sun.net.httpserver.HttpHandler;
|
||||||
|
import com.sun.net.httpserver.HttpServer;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.ServerSocket;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
import org.apache.http.client.utils.URIBuilder;
|
||||||
|
import org.junit.jupiter.api.extension.AfterAllCallback;
|
||||||
|
import org.junit.jupiter.api.extension.BeforeAllCallback;
|
||||||
|
import org.junit.jupiter.api.extension.ExtensionContext;
|
||||||
|
|
||||||
|
public class HttpServerExtension implements BeforeAllCallback, AfterAllCallback {
|
||||||
|
|
||||||
|
private static final int PORT;
|
||||||
|
|
||||||
|
static {
|
||||||
|
try (ServerSocket socket = new ServerSocket(0)) {
|
||||||
|
socket.setReuseAddress(true);
|
||||||
|
PORT = socket.getLocalPort();
|
||||||
|
} catch (IOException ex) {
|
||||||
|
throw new RuntimeException("Could not find a free port for testing");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final String HOST = "localhost";
|
||||||
|
private static final String SCHEME = "http";
|
||||||
|
private static final String DEFAULT_CONTEXT = "/";
|
||||||
|
|
||||||
|
private com.sun.net.httpserver.HttpServer server;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterAll(ExtensionContext extensionContext) throws Exception {
|
||||||
|
if (server != null) {
|
||||||
|
server.stop(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void beforeAll(ExtensionContext extensionContext) throws Exception {
|
||||||
|
server = HttpServer.create(new InetSocketAddress(PORT), 0);
|
||||||
|
server.setExecutor(null);
|
||||||
|
server.start();
|
||||||
|
server.createContext(DEFAULT_CONTEXT);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static URI getUriFor(String path) throws URISyntaxException {
|
||||||
|
return new URIBuilder().setScheme(SCHEME).setHost(HOST).setPort(PORT).setPath(path).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void registerHandler(String uriToHandle, HttpHandler httpHandler) {
|
||||||
|
server.createContext(uriToHandle, httpHandler);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void unregisterHandler() {
|
||||||
|
server.removeContext(DEFAULT_CONTEXT);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,38 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2022 Collate
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.openmetadata.catalog.airflow;
|
||||||
|
|
||||||
|
import com.sun.net.httpserver.HttpExchange;
|
||||||
|
import com.sun.net.httpserver.HttpHandler;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.Charset;
|
||||||
|
import java.util.Map;
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
|
||||||
|
class JsonHandler implements HttpHandler {
|
||||||
|
|
||||||
|
Map<String, MockResponse> pathResponses;
|
||||||
|
|
||||||
|
public JsonHandler(Map<String, MockResponse> pathResponses) {
|
||||||
|
this.pathResponses = pathResponses;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handle(HttpExchange exchange) throws IOException {
|
||||||
|
MockResponse response = pathResponses.get(exchange.getRequestURI().toString());
|
||||||
|
exchange.getResponseHeaders().add("Content-Type", response.getContentType());
|
||||||
|
exchange.sendResponseHeaders(response.getStatusCode(), response.getBody().length());
|
||||||
|
IOUtils.write(response.getBody(), exchange.getResponseBody(), Charset.defaultCharset());
|
||||||
|
exchange.close();
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,25 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2022 Collate
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.openmetadata.catalog.airflow;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Getter;
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class MockResponse {
|
||||||
|
|
||||||
|
private final String body;
|
||||||
|
private final String contentType;
|
||||||
|
private final int statusCode;
|
||||||
|
}
|
@ -0,0 +1,81 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2022 Collate
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.openmetadata.catalog.resources.services.ingestionpipelines;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.mockito.ArgumentMatchers.anyString;
|
||||||
|
import static org.mockito.Mockito.doReturn;
|
||||||
|
import static org.mockito.Mockito.mockConstruction;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Map;
|
||||||
|
import javax.ws.rs.core.SecurityContext;
|
||||||
|
import javax.ws.rs.core.UriInfo;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.MockedConstruction;
|
||||||
|
import org.mockito.MockedConstruction.Context;
|
||||||
|
import org.mockito.Spy;
|
||||||
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
import org.openmetadata.catalog.CatalogApplicationConfig;
|
||||||
|
import org.openmetadata.catalog.airflow.AirflowRESTClient;
|
||||||
|
import org.openmetadata.catalog.jdbi3.CollectionDAO;
|
||||||
|
import org.openmetadata.catalog.security.Authorizer;
|
||||||
|
import org.openmetadata.catalog.util.PipelineServiceClient;
|
||||||
|
|
||||||
|
@ExtendWith(MockitoExtension.class)
|
||||||
|
public class IngestionPipelineResourceUnitTest {
|
||||||
|
|
||||||
|
private static final String DAG_NAME = "test_dag";
|
||||||
|
|
||||||
|
private IngestionPipelineResource ingestionPipelineResource;
|
||||||
|
|
||||||
|
@Mock UriInfo uriInfo;
|
||||||
|
|
||||||
|
@Mock SecurityContext securityContext;
|
||||||
|
|
||||||
|
@Mock Authorizer authorizer;
|
||||||
|
|
||||||
|
@Mock CollectionDAO collectionDAO;
|
||||||
|
|
||||||
|
@Spy CollectionDAO.IngestionPipelineDAO ingestionPipelineDAO;
|
||||||
|
|
||||||
|
@Mock CatalogApplicationConfig catalogApplicationConfig;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void setUp() {
|
||||||
|
doReturn(ingestionPipelineDAO).when(collectionDAO).ingestionPipelineDAO();
|
||||||
|
ingestionPipelineResource = new IngestionPipelineResource(collectionDAO, authorizer);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLastIngestionLogsAreRetrievedWhen() throws IOException {
|
||||||
|
Map<String, String> expectedMap = Map.of("task", "log");
|
||||||
|
try (MockedConstruction<AirflowRESTClient> mocked =
|
||||||
|
mockConstruction(AirflowRESTClient.class, this::preparePipelineServiceClient)) {
|
||||||
|
ingestionPipelineResource.initialize(catalogApplicationConfig);
|
||||||
|
assertEquals(
|
||||||
|
expectedMap, ingestionPipelineResource.getLastIngestionLogs(uriInfo, securityContext, DAG_NAME).getEntity());
|
||||||
|
PipelineServiceClient client = mocked.constructed().get(0);
|
||||||
|
verify(client).getLastIngestionLogs(DAG_NAME);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void preparePipelineServiceClient(AirflowRESTClient mockPipelineServiceClient, Context context) {
|
||||||
|
doReturn(Map.of("task", "log")).when(mockPipelineServiceClient).getLastIngestionLogs(anyString());
|
||||||
|
}
|
||||||
|
}
|
@ -17,7 +17,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
|
|||||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import javax.json.Json;
|
import javax.json.Json;
|
||||||
import javax.json.JsonArrayBuilder;
|
import javax.json.JsonArrayBuilder;
|
||||||
@ -97,4 +99,12 @@ class JsonUtilsTest {
|
|||||||
assertThrows(JsonException.class, () -> JsonUtils.applyPatch(original, jsonPatchBuilder2.build(), Team.class));
|
assertThrows(JsonException.class, () -> JsonUtils.applyPatch(original, jsonPatchBuilder2.build(), Team.class));
|
||||||
assertTrue(jsonException.getMessage().contains("contains no element for index 3"));
|
assertTrue(jsonException.getMessage().contains("contains no element for index 3"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testReadValuePassingTypeReference() throws IOException {
|
||||||
|
Map<String, String> expectedMap = Map.of("key1", "value1", "key2", "value2");
|
||||||
|
String json = "{ \"key1\": \"value1\", \"key2\": \"value2\" }";
|
||||||
|
TypeReference<Map<String, String>> mapTypeReference = new TypeReference<>() {};
|
||||||
|
assertEquals(expectedMap, JsonUtils.readValue(json, mapTypeReference));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -195,7 +195,7 @@ curl -H 'Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE
|
|||||||
- Delete dag based on dag_id.
|
- Delete dag based on dag_id.
|
||||||
##### Endpoint:
|
##### Endpoint:
|
||||||
```text
|
```text
|
||||||
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/admin/rest_api/api?api=delete_dag&dag_id=value
|
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/rest_api/api?api=delete_dag&dag_id=value
|
||||||
```
|
```
|
||||||
##### Method:
|
##### Method:
|
||||||
- GET
|
- GET
|
||||||
@ -203,7 +203,7 @@ http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/admin/rest_api/api?api=delete_dag&dag_id=va
|
|||||||
- dag_id - string - The id of dag.
|
- dag_id - string - The id of dag.
|
||||||
##### Examples:
|
##### Examples:
|
||||||
```bash
|
```bash
|
||||||
curl -X GET http://localhost:8080/admin/rest_api/api?api=delete_dag&dag_id=dag_test
|
curl -X GET http://localhost:8080/rest_api/api?api=delete_dag&dag_id=dag_test
|
||||||
```
|
```
|
||||||
##### response:
|
##### response:
|
||||||
```json
|
```json
|
||||||
|
@ -79,6 +79,19 @@ APIS_METADATA = [
|
|||||||
},
|
},
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"name": "last_dag_logs",
|
||||||
|
"description": "Retrieve all logs from the task instances of a last DAG run",
|
||||||
|
"http_method": "GET",
|
||||||
|
"arguments": [
|
||||||
|
{
|
||||||
|
"name": "dag_id",
|
||||||
|
"description": "The id of the dag",
|
||||||
|
"form_input_type": "text",
|
||||||
|
"required": True,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"name": "rest_status",
|
"name": "rest_status",
|
||||||
"description": "Get the status of Airflow REST status",
|
"description": "Get the status of Airflow REST status",
|
||||||
|
@ -18,7 +18,7 @@ import airflow
|
|||||||
from airflow import configuration
|
from airflow import configuration
|
||||||
from openmetadata import __version__
|
from openmetadata import __version__
|
||||||
|
|
||||||
REST_API_ENDPOINT = "/admin/rest_api/api"
|
REST_API_ENDPOINT = "/rest_api/api"
|
||||||
|
|
||||||
# Getting Versions and Global variables
|
# Getting Versions and Global variables
|
||||||
HOSTNAME = socket.gethostname()
|
HOSTNAME = socket.gethostname()
|
||||||
|
@ -34,6 +34,7 @@ from openmetadata.api.response import ApiResponse
|
|||||||
from openmetadata.api.utils import jwt_token_secure
|
from openmetadata.api.utils import jwt_token_secure
|
||||||
from openmetadata.operations.delete import delete_dag_id
|
from openmetadata.operations.delete import delete_dag_id
|
||||||
from openmetadata.operations.deploy import DagDeployer
|
from openmetadata.operations.deploy import DagDeployer
|
||||||
|
from openmetadata.operations.last_dag_logs import last_dag_logs
|
||||||
from openmetadata.operations.status import status
|
from openmetadata.operations.status import status
|
||||||
from openmetadata.operations.test_connection import test_source_connection
|
from openmetadata.operations.test_connection import test_source_connection
|
||||||
from openmetadata.operations.trigger import trigger
|
from openmetadata.operations.trigger import trigger
|
||||||
@ -130,6 +131,8 @@ class REST_API(AppBuilderBaseView):
|
|||||||
return self.dag_status()
|
return self.dag_status()
|
||||||
if api == "delete_dag":
|
if api == "delete_dag":
|
||||||
return self.delete_dag()
|
return self.delete_dag()
|
||||||
|
if api == "last_dag_logs":
|
||||||
|
return self.last_dag_logs()
|
||||||
|
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
f"Invalid api param {api}. Expected deploy_dag or trigger_dag."
|
f"Invalid api param {api}. Expected deploy_dag or trigger_dag."
|
||||||
@ -276,3 +279,25 @@ class REST_API(AppBuilderBaseView):
|
|||||||
status=ApiResponse.STATUS_SERVER_ERROR,
|
status=ApiResponse.STATUS_SERVER_ERROR,
|
||||||
error=f"Failed to delete {dag_id} due to {exc} - {traceback.format_exc()}",
|
error=f"Failed to delete {dag_id} due to {exc} - {traceback.format_exc()}",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def last_dag_logs(self) -> Response:
|
||||||
|
"""
|
||||||
|
Retrieve all logs from the task instances of a last DAG run
|
||||||
|
"""
|
||||||
|
dag_id: str = self.get_request_arg(request, "dag_id")
|
||||||
|
|
||||||
|
if not dag_id:
|
||||||
|
return ApiResponse.error(
|
||||||
|
status=ApiResponse.STATUS_BAD_REQUEST,
|
||||||
|
error=f"Missing dag_id argument in the request",
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
return last_dag_logs(dag_id)
|
||||||
|
|
||||||
|
except Exception as exc:
|
||||||
|
logging.info(f"Failed to get last run logs for '{dag_id}'")
|
||||||
|
return ApiResponse.error(
|
||||||
|
status=ApiResponse.STATUS_SERVER_ERROR,
|
||||||
|
error=f"Failed to get last run logs for '{dag_id}' due to {exc} - {traceback.format_exc()}",
|
||||||
|
)
|
||||||
|
@ -0,0 +1,67 @@
|
|||||||
|
# Copyright 2022 Collate
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
"""
|
||||||
|
Module containing the logic to retrieve all logs from the tasks of a last DAG run
|
||||||
|
"""
|
||||||
|
import glob
|
||||||
|
import os
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from airflow.models import DagModel, DagRun
|
||||||
|
from flask import Response
|
||||||
|
from openmetadata.api.response import ApiResponse, ResponseFormat
|
||||||
|
|
||||||
|
|
||||||
|
def last_dag_logs(dag_id: str) -> Response:
|
||||||
|
"""
|
||||||
|
Validate that the DAG is registered by Airflow and have at least one Run.
|
||||||
|
If exists, returns all logs for each task instance of the last DAG run.
|
||||||
|
:param dag_id: DAG to find
|
||||||
|
:return: API Response
|
||||||
|
"""
|
||||||
|
|
||||||
|
dag_model = DagModel.get_dagmodel(dag_id=dag_id)
|
||||||
|
|
||||||
|
if not dag_model:
|
||||||
|
return ApiResponse.not_found(f"DAG '{dag_id}' not found.")
|
||||||
|
|
||||||
|
last_dag_run = dag_model.get_last_dagrun()
|
||||||
|
|
||||||
|
if not last_dag_run:
|
||||||
|
return ApiResponse.not_found(f"No DAG run found for '{dag_id}'.")
|
||||||
|
|
||||||
|
task_instances = last_dag_run.get_task_instances()
|
||||||
|
|
||||||
|
response = {}
|
||||||
|
|
||||||
|
for task_instance in task_instances:
|
||||||
|
if os.path.isfile(task_instance.log_filepath):
|
||||||
|
response[task_instance.task_id] = Path(
|
||||||
|
task_instance.log_filepath
|
||||||
|
).read_text()
|
||||||
|
# logs could be kept in a directory with the same name than the log file path without extension per attempt
|
||||||
|
elif os.path.isdir(os.path.splitext(task_instance.log_filepath)[0]):
|
||||||
|
dir_path = os.path.splitext(task_instance.log_filepath)[0]
|
||||||
|
sorted_logs = sorted(
|
||||||
|
filter(os.path.isfile, glob.glob(f"{dir_path}/*.log")),
|
||||||
|
key=os.path.getmtime,
|
||||||
|
)
|
||||||
|
response[
|
||||||
|
task_instance.task_id
|
||||||
|
] = f"\n*** Reading local file: {task_instance.log_filepath}\n".join(
|
||||||
|
[Path(log).read_text() for log in sorted_logs]
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
return ApiResponse.not_found(
|
||||||
|
f"Logs for task instance '{task_instance}' of DAG '{dag_id}' not found."
|
||||||
|
)
|
||||||
|
|
||||||
|
return ApiResponse.success(response)
|
12
pom.xml
12
pom.xml
@ -345,6 +345,18 @@
|
|||||||
<version>2.0.2-beta</version>
|
<version>2.0.2-beta</version>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.mockito</groupId>
|
||||||
|
<artifactId>mockito-inline</artifactId>
|
||||||
|
<version>${mockito.version}</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.mockito</groupId>
|
||||||
|
<artifactId>mockito-junit-jupiter</artifactId>
|
||||||
|
<version>${mockito.version}</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.amazon.redshift</groupId>
|
<groupId>com.amazon.redshift</groupId>
|
||||||
<artifactId>redshift-jdbc42</artifactId>
|
<artifactId>redshift-jdbc42</artifactId>
|
||||||
|
Loading…
x
Reference in New Issue
Block a user