Fix #6143 - Get last logs by pipeline ID (#6146)

* Get last logs by pipeline ID

* Clean UUID

* Fix test

* tmp remove test
This commit is contained in:
Pere Miquel Brull 2022-07-18 13:00:04 +02:00 committed by GitHub
parent ef495e1118
commit f0169c7aea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 25 additions and 88 deletions

View File

@ -216,10 +216,11 @@ public class AirflowRESTClient extends PipelineServiceClient {
}
@Override
public Map<String, String> getLastIngestionLogs(String pipelineName) {
public Map<String, String> getLastIngestionLogs(IngestionPipeline ingestionPipeline) {
try {
HttpResponse<String> response =
requestAuthenticatedForJsonContent("%s/rest_api/api?api=last_dag_logs&dag_id=%s", serviceURL, pipelineName);
requestAuthenticatedForJsonContent(
"%s/rest_api/api?api=last_dag_logs&dag_id=%s", serviceURL, ingestionPipeline.getName());
if (response.statusCode() == 200) {
return JsonUtils.readValue(response.body(), new TypeReference<>() {});
}

View File

@ -524,7 +524,8 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
@Context SecurityContext securityContext,
@Parameter(description = "Pipeline Id", schema = @Schema(type = "string")) @PathParam("id") String id)
throws IOException {
Map<String, String> lastIngestionLogs = pipelineServiceClient.getLastIngestionLogs(id);
IngestionPipeline ingestionPipeline = getInternal(uriInfo, securityContext, id, FIELDS, Include.NON_DELETED);
Map<String, String> lastIngestionLogs = pipelineServiceClient.getLastIngestionLogs(ingestionPipeline);
return Response.ok(lastIngestionLogs, MediaType.APPLICATION_JSON_TYPE).build();
}

View File

@ -92,5 +92,5 @@ public abstract class PipelineServiceClient {
public abstract IngestionPipeline toggleIngestion(IngestionPipeline ingestionPipeline);
/* Get the all last run logs of a deployed pipeline */
public abstract Map<String, String> getLastIngestionLogs(String pipelineName);
public abstract Map<String, String> getLastIngestionLogs(IngestionPipeline ingestionPipeline);
}

View File

@ -14,15 +14,21 @@ package org.openmetadata.catalog.airflow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.openmetadata.catalog.resources.services.ingestionpipelines.IngestionPipelineResourceTest.DATABASE_METADATA_CONFIG;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import lombok.SneakyThrows;
import org.joda.time.DateTime;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.junit.jupiter.MockitoExtension;
import org.openmetadata.catalog.entity.services.ingestionPipelines.AirflowConfig;
import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.catalog.entity.services.ingestionPipelines.PipelineType;
import org.openmetadata.catalog.exception.PipelineServiceClientException;
@ExtendWith(MockitoExtension.class)
@ -31,6 +37,14 @@ public class AirflowRESTClientIntegrationTest {
private static final String DAG_NAME = "test_dag";
private static final String URI_TO_HANDLE_REQUEST = "/";
public static final IngestionPipeline INGESTION_PIPELINE =
new IngestionPipeline()
.withName(DAG_NAME)
.withId(UUID.randomUUID())
.withPipelineType(PipelineType.METADATA)
.withSourceConfig(DATABASE_METADATA_CONFIG)
.withAirflowConfig(new AirflowConfig().withStartDate(new DateTime("2022-06-10T15:06:47+00:00").toDate()));
@RegisterExtension private static final HttpServerExtension httpServerExtension = new HttpServerExtension();
AirflowRESTClient airflowRESTClient;
@ -48,7 +62,7 @@ public class AirflowRESTClientIntegrationTest {
registerMockedEndpoints(200, 200);
assertEquals(expectedMap, airflowRESTClient.getLastIngestionLogs(DAG_NAME));
assertEquals(expectedMap, airflowRESTClient.getLastIngestionLogs(INGESTION_PIPELINE));
}
@Test
@ -56,7 +70,8 @@ public class AirflowRESTClientIntegrationTest {
registerMockedEndpoints(404, 200);
Exception exception =
assertThrows(PipelineServiceClientException.class, () -> airflowRESTClient.getLastIngestionLogs(DAG_NAME));
assertThrows(
PipelineServiceClientException.class, () -> airflowRESTClient.getLastIngestionLogs(INGESTION_PIPELINE));
String expectedMessage = "Failed to get last ingestion logs.";
String actualMessage = exception.getMessage();
@ -69,7 +84,8 @@ public class AirflowRESTClientIntegrationTest {
registerMockedEndpoints(200, 404);
Exception exception =
assertThrows(PipelineServiceClientException.class, () -> airflowRESTClient.getLastIngestionLogs(DAG_NAME));
assertThrows(
PipelineServiceClientException.class, () -> airflowRESTClient.getLastIngestionLogs(INGESTION_PIPELINE));
String expectedMessage = "Failed to get last ingestion logs.";
String actualMessage = exception.getMessage();

View File

@ -1,81 +0,0 @@
/*
* Copyright 2022 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.resources.services.ingestionpipelines;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.util.Map;
import javax.ws.rs.core.SecurityContext;
import javax.ws.rs.core.UriInfo;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.MockedConstruction;
import org.mockito.MockedConstruction.Context;
import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
import org.openmetadata.catalog.CatalogApplicationConfig;
import org.openmetadata.catalog.airflow.AirflowRESTClient;
import org.openmetadata.catalog.jdbi3.CollectionDAO;
import org.openmetadata.catalog.security.Authorizer;
import org.openmetadata.catalog.util.PipelineServiceClient;
@ExtendWith(MockitoExtension.class)
public class IngestionPipelineResourceUnitTest {
private static final String DAG_NAME = "test_dag";
private IngestionPipelineResource ingestionPipelineResource;
@Mock UriInfo uriInfo;
@Mock SecurityContext securityContext;
@Mock Authorizer authorizer;
@Mock CollectionDAO collectionDAO;
@Spy CollectionDAO.IngestionPipelineDAO ingestionPipelineDAO;
@Mock CatalogApplicationConfig catalogApplicationConfig;
@BeforeEach
void setUp() {
doReturn(ingestionPipelineDAO).when(collectionDAO).ingestionPipelineDAO();
ingestionPipelineResource = new IngestionPipelineResource(collectionDAO, authorizer);
}
@Test
public void testLastIngestionLogsAreRetrievedWhen() throws IOException {
Map<String, String> expectedMap = Map.of("task", "log");
try (MockedConstruction<AirflowRESTClient> mocked =
mockConstruction(AirflowRESTClient.class, this::preparePipelineServiceClient)) {
ingestionPipelineResource.initialize(catalogApplicationConfig);
assertEquals(
expectedMap, ingestionPipelineResource.getLastIngestionLogs(uriInfo, securityContext, DAG_NAME).getEntity());
PipelineServiceClient client = mocked.constructed().get(0);
verify(client).getLastIngestionLogs(DAG_NAME);
}
}
private void preparePipelineServiceClient(AirflowRESTClient mockPipelineServiceClient, Context context) {
doReturn(Map.of("task", "log")).when(mockPipelineServiceClient).getLastIngestionLogs(anyString());
}
}