diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java index cdd25275e9b..543af31ab6a 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java @@ -216,10 +216,11 @@ public class AirflowRESTClient extends PipelineServiceClient { } @Override - public Map getLastIngestionLogs(String pipelineName) { + public Map getLastIngestionLogs(IngestionPipeline ingestionPipeline) { try { HttpResponse response = - requestAuthenticatedForJsonContent("%s/rest_api/api?api=last_dag_logs&dag_id=%s", serviceURL, pipelineName); + requestAuthenticatedForJsonContent( + "%s/rest_api/api?api=last_dag_logs&dag_id=%s", serviceURL, ingestionPipeline.getName()); if (response.statusCode() == 200) { return JsonUtils.readValue(response.body(), new TypeReference<>() {}); } diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResource.java index fb17d93605d..446ba425b54 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResource.java @@ -524,7 +524,8 @@ public class IngestionPipelineResource extends EntityResource lastIngestionLogs = pipelineServiceClient.getLastIngestionLogs(id); + IngestionPipeline ingestionPipeline = getInternal(uriInfo, securityContext, id, FIELDS, Include.NON_DELETED); + Map lastIngestionLogs = pipelineServiceClient.getLastIngestionLogs(ingestionPipeline); return Response.ok(lastIngestionLogs, MediaType.APPLICATION_JSON_TYPE).build(); } diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/PipelineServiceClient.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/PipelineServiceClient.java index 6837dd99061..a3cf4d1ef20 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/PipelineServiceClient.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/PipelineServiceClient.java @@ -92,5 +92,5 @@ public abstract class PipelineServiceClient { public abstract IngestionPipeline toggleIngestion(IngestionPipeline ingestionPipeline); /* Get the all last run logs of a deployed pipeline */ - public abstract Map getLastIngestionLogs(String pipelineName); + public abstract Map getLastIngestionLogs(IngestionPipeline ingestionPipeline); } diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/airflow/AirflowRESTClientIntegrationTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/airflow/AirflowRESTClientIntegrationTest.java index 5c84f45f0d4..3b75976ac60 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/airflow/AirflowRESTClientIntegrationTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/airflow/AirflowRESTClientIntegrationTest.java @@ -14,15 +14,21 @@ package org.openmetadata.catalog.airflow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.openmetadata.catalog.resources.services.ingestionpipelines.IngestionPipelineResourceTest.DATABASE_METADATA_CONFIG; import java.util.HashMap; import java.util.Map; +import java.util.UUID; import lombok.SneakyThrows; +import org.joda.time.DateTime; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.RegisterExtension; import org.mockito.junit.jupiter.MockitoExtension; +import org.openmetadata.catalog.entity.services.ingestionPipelines.AirflowConfig; +import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline; +import org.openmetadata.catalog.entity.services.ingestionPipelines.PipelineType; import org.openmetadata.catalog.exception.PipelineServiceClientException; @ExtendWith(MockitoExtension.class) @@ -31,6 +37,14 @@ public class AirflowRESTClientIntegrationTest { private static final String DAG_NAME = "test_dag"; private static final String URI_TO_HANDLE_REQUEST = "/"; + public static final IngestionPipeline INGESTION_PIPELINE = + new IngestionPipeline() + .withName(DAG_NAME) + .withId(UUID.randomUUID()) + .withPipelineType(PipelineType.METADATA) + .withSourceConfig(DATABASE_METADATA_CONFIG) + .withAirflowConfig(new AirflowConfig().withStartDate(new DateTime("2022-06-10T15:06:47+00:00").toDate())); + @RegisterExtension private static final HttpServerExtension httpServerExtension = new HttpServerExtension(); AirflowRESTClient airflowRESTClient; @@ -48,7 +62,7 @@ public class AirflowRESTClientIntegrationTest { registerMockedEndpoints(200, 200); - assertEquals(expectedMap, airflowRESTClient.getLastIngestionLogs(DAG_NAME)); + assertEquals(expectedMap, airflowRESTClient.getLastIngestionLogs(INGESTION_PIPELINE)); } @Test @@ -56,7 +70,8 @@ public class AirflowRESTClientIntegrationTest { registerMockedEndpoints(404, 200); Exception exception = - assertThrows(PipelineServiceClientException.class, () -> airflowRESTClient.getLastIngestionLogs(DAG_NAME)); + assertThrows( + PipelineServiceClientException.class, () -> airflowRESTClient.getLastIngestionLogs(INGESTION_PIPELINE)); String expectedMessage = "Failed to get last ingestion logs."; String actualMessage = exception.getMessage(); @@ -69,7 +84,8 @@ public class AirflowRESTClientIntegrationTest { registerMockedEndpoints(200, 404); Exception exception = - assertThrows(PipelineServiceClientException.class, () -> airflowRESTClient.getLastIngestionLogs(DAG_NAME)); + assertThrows( + PipelineServiceClientException.class, () -> airflowRESTClient.getLastIngestionLogs(INGESTION_PIPELINE)); String expectedMessage = "Failed to get last ingestion logs."; String actualMessage = exception.getMessage(); diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResourceUnitTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResourceUnitTest.java deleted file mode 100644 index 097bb2913f6..00000000000 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResourceUnitTest.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright 2022 Collate - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.openmetadata.catalog.resources.services.ingestionpipelines; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mockConstruction; -import static org.mockito.Mockito.verify; - -import java.io.IOException; -import java.util.Map; -import javax.ws.rs.core.SecurityContext; -import javax.ws.rs.core.UriInfo; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.MockedConstruction; -import org.mockito.MockedConstruction.Context; -import org.mockito.Spy; -import org.mockito.junit.jupiter.MockitoExtension; -import org.openmetadata.catalog.CatalogApplicationConfig; -import org.openmetadata.catalog.airflow.AirflowRESTClient; -import org.openmetadata.catalog.jdbi3.CollectionDAO; -import org.openmetadata.catalog.security.Authorizer; -import org.openmetadata.catalog.util.PipelineServiceClient; - -@ExtendWith(MockitoExtension.class) -public class IngestionPipelineResourceUnitTest { - - private static final String DAG_NAME = "test_dag"; - - private IngestionPipelineResource ingestionPipelineResource; - - @Mock UriInfo uriInfo; - - @Mock SecurityContext securityContext; - - @Mock Authorizer authorizer; - - @Mock CollectionDAO collectionDAO; - - @Spy CollectionDAO.IngestionPipelineDAO ingestionPipelineDAO; - - @Mock CatalogApplicationConfig catalogApplicationConfig; - - @BeforeEach - void setUp() { - doReturn(ingestionPipelineDAO).when(collectionDAO).ingestionPipelineDAO(); - ingestionPipelineResource = new IngestionPipelineResource(collectionDAO, authorizer); - } - - @Test - public void testLastIngestionLogsAreRetrievedWhen() throws IOException { - Map expectedMap = Map.of("task", "log"); - try (MockedConstruction mocked = - mockConstruction(AirflowRESTClient.class, this::preparePipelineServiceClient)) { - ingestionPipelineResource.initialize(catalogApplicationConfig); - assertEquals( - expectedMap, ingestionPipelineResource.getLastIngestionLogs(uriInfo, securityContext, DAG_NAME).getEntity()); - PipelineServiceClient client = mocked.constructed().get(0); - verify(client).getLastIngestionLogs(DAG_NAME); - } - } - - private void preparePipelineServiceClient(AirflowRESTClient mockPipelineServiceClient, Context context) { - doReturn(Map.of("task", "log")).when(mockPipelineServiceClient).getLastIngestionLogs(anyString()); - } -}