#15251 - Fix overwriting of airflow path segments (#15326)

This commit is contained in:
Pere Miquel Brull 2024-02-27 09:30:28 +01:00 committed by GitHub
parent ec3eb297f5
commit 10f9fe82a6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 32 additions and 80 deletions

View File

@ -610,6 +610,7 @@
<phase>test</phase>
<configuration>
<includes>
<include>**/*Test.java</include>
<include>**/*ResourceTest.java</include>
</includes>
</configuration>
@ -649,6 +650,7 @@
<phase>test</phase>
<configuration>
<includes>
<include>**/*Test.java</include>
<include>**/*ResourceTest.java</include>
</includes>
</configuration>

View File

@ -455,11 +455,14 @@ public class AirflowRESTClient extends PipelineServiceClient {
String.format("Failed to get last ingestion logs due to %s", response.body()));
}
private URIBuilder buildURI(String path) {
public URIBuilder buildURI(String path) {
try {
List<String> pathInternal = new ArrayList<>(API_ENDPOINT_SEGMENTS);
pathInternal.add(path);
return new URIBuilder(String.valueOf(serviceURL)).setPathSegments(pathInternal);
URIBuilder builder = new URIBuilder(String.valueOf(serviceURL));
List<String> segments = new ArrayList<>(builder.getPathSegments());
segments.addAll(pathInternal);
return builder.setPathSegments(segments);
} catch (Exception e) {
throw clientException(String.format("Failed to built request URI for path [%s].", path), e);
}

View File

@ -13,56 +13,44 @@
package org.openmetadata.service.pipelineService.airflow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.openmetadata.service.resources.services.ingestionpipelines.IngestionPipelineResourceTest.DATABASE_METADATA_CONFIG;
import java.net.URISyntaxException;
import java.security.KeyStoreException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.joda.time.DateTime;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.junit.jupiter.MockitoExtension;
import org.openmetadata.schema.api.configuration.pipelineServiceClient.Parameters;
import org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration;
import org.openmetadata.schema.entity.services.ingestionPipelines.AirflowConfig;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineType;
import org.openmetadata.sdk.exception.PipelineServiceClientException;
import org.openmetadata.service.clients.pipeline.airflow.AirflowRESTClient;
@ExtendWith(MockitoExtension.class)
class AirflowRESTClientIntegrationTest {
class AirflowRESTClientTest {
private static final String DAG_NAME = "test_dag";
private static final String URI_TO_HANDLE_REQUEST = "/";
@Test
void test_buildUri() throws KeyStoreException {
// We build the right URI for a simple url
PipelineServiceClientConfiguration config = getPipelineServiceConfiguration();
AirflowRESTClient restClient = new AirflowRESTClient(config);
assertEquals(
"http://localhost:8080/api/v1/openmetadata/last_dag_logs",
restClient.buildURI("last_dag_logs").toString());
public static final IngestionPipeline INGESTION_PIPELINE =
new IngestionPipeline()
.withName(DAG_NAME)
.withId(UUID.randomUUID())
.withPipelineType(PipelineType.METADATA)
.withSourceConfig(DATABASE_METADATA_CONFIG)
.withAirflowConfig(
new AirflowConfig()
.withStartDate(new DateTime("2022-06-10T15:06:47+00:00").toDate()));
// We build the right URI for a service URLs with paths
config.setApiEndpoint("http://localhost:8080/airflow");
restClient = new AirflowRESTClient(config);
assertEquals(
"http://localhost:8080/airflow/api/v1/openmetadata/last_dag_logs",
restClient.buildURI("last_dag_logs").toString());
@RegisterExtension
private static final HttpServerExtension httpServerExtension = new HttpServerExtension();
AirflowRESTClient airflowRESTClient;
@BeforeEach
void setUp() throws URISyntaxException, KeyStoreException {
// The same works with more segments
config.setApiEndpoint("http://localhost:8080/airflow/foo");
restClient = new AirflowRESTClient(config);
assertEquals(
"http://localhost:8080/airflow/foo/api/v1/openmetadata/health",
restClient.buildURI("health").toString());
}
private PipelineServiceClientConfiguration getPipelineServiceConfiguration() {
PipelineServiceClientConfiguration pipelineServiceClientConfiguration =
new PipelineServiceClientConfiguration();
pipelineServiceClientConfiguration.setHostIp("111.11.11.1");
pipelineServiceClientConfiguration.setApiEndpoint(HttpServerExtension.getUriFor("").toString());
pipelineServiceClientConfiguration.setApiEndpoint("http://localhost:8080");
Parameters params = new Parameters();
params.setAdditionalProperty("username", "user");
@ -71,47 +59,6 @@ class AirflowRESTClientIntegrationTest {
pipelineServiceClientConfiguration.setParameters(params);
airflowRESTClient = new AirflowRESTClient(pipelineServiceClientConfiguration);
httpServerExtension.unregisterHandler();
}
@Test
void testLastIngestionLogsAreRetrievedWhenStatusCodesAre200() {
Map<String, String> expectedMap = Map.of("key1", "value1", "key2", "value2");
registerMockedEndpoints(200);
assertEquals(expectedMap, airflowRESTClient.getLastIngestionLogs(INGESTION_PIPELINE, "after"));
}
@Test
void testLastIngestionLogsExceptionWhenStatusCode404() {
registerMockedEndpoints(404);
Exception exception =
assertThrows(
PipelineServiceClientException.class,
() -> airflowRESTClient.getLastIngestionLogs(INGESTION_PIPELINE, "after"));
String expectedMessage = "Failed to get last ingestion logs due to 404 - Not Found";
String actualMessage = exception.getMessage();
assertEquals(expectedMessage, actualMessage);
}
private void registerMockedEndpoints(int lastDagLogStatusCode) {
String jsonResponse = "{ \"key1\": \"value1\", \"key2\": \"value2\" }";
if (lastDagLogStatusCode == 404) {
jsonResponse = "404 - Not Found";
}
Map<String, MockResponse> pathResponses = new HashMap<>();
pathResponses.put(
"/api/v1/openmetadata/last_dag_logs?dag_id="
+ DAG_NAME
+ "&task_id=ingestion_task&after=after",
new MockResponse(jsonResponse, "application/json", lastDagLogStatusCode));
httpServerExtension.registerHandler(URI_TO_HANDLE_REQUEST, new JsonHandler(pathResponses));
return pipelineServiceClientConfiguration;
}
}