Fix #4675: Ingestion deployment from UI is broken (#4676)

This commit is contained in:
Sriharsha Chintalapani 2022-05-03 22:10:01 -07:00 committed by GitHub
parent e37ef3dfa3
commit a5f7a69a2d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 32 additions and 22 deletions

View File

@ -368,12 +368,35 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
return createOrUpdate(uriInfo, securityContext, ingestionPipeline, ADMIN | BOT | OWNER); return createOrUpdate(uriInfo, securityContext, ingestionPipeline, ADMIN | BOT | OWNER);
} }
@POST
@Path("/deploy/{id}")
@Operation(
summary = "Deploy a ingestion pipeline run",
tags = "IngestionPipelines",
description = "Trigger a ingestion pipeline run by id.",
responses = {
@ApiResponse(
responseCode = "200",
description = "The ingestion",
content =
@Content(mediaType = "application/json", schema = @Schema(implementation = IngestionPipeline.class))),
@ApiResponse(responseCode = "404", description = "Ingestion for instance {name} is not found")
})
public IngestionPipeline deployIngestion(
@Context UriInfo uriInfo, @PathParam("id") String id, @Context SecurityContext securityContext)
throws IOException {
Fields fields = getFields(FIELD_OWNER);
IngestionPipeline pipeline = dao.get(uriInfo, id, fields);
pipelineServiceClient.deployPipeline(pipeline);
return addHref(uriInfo, dao.get(uriInfo, id, fields));
}
@POST @POST
@Path("/trigger/{id}") @Path("/trigger/{id}")
@Operation( @Operation(
summary = "Trigger a airflow pipeline run", summary = "Trigger a ingestion pipeline run",
tags = "IngestionPipelines", tags = "IngestionPipelines",
description = "Trigger a airflow pipeline run by id.", description = "Trigger a ingestion pipeline run by id.",
responses = { responses = {
@ApiResponse( @ApiResponse(
responseCode = "200", responseCode = "200",

View File

@ -55,14 +55,15 @@ public abstract class PipelineServiceClient {
public final HttpResponse<String> post(String endpoint, String payload, boolean authenticate) public final HttpResponse<String> post(String endpoint, String payload, boolean authenticate)
throws IOException, InterruptedException { throws IOException, InterruptedException {
String authToken = authenticate ? String.format(AUTH_TOKEN, authenticate()) : null; String authToken = authenticate ? authenticate() : null;
HttpRequest request = HttpRequest.Builder requestBuilder =
HttpRequest.newBuilder(URI.create(endpoint)) HttpRequest.newBuilder(URI.create(endpoint))
.header(CONTENT_HEADER, CONTENT_TYPE) .header(CONTENT_HEADER, CONTENT_TYPE)
.header(AUTH_HEADER, authToken) .POST(HttpRequest.BodyPublishers.ofString(payload));
.POST(HttpRequest.BodyPublishers.ofString(payload)) if (authenticate) {
.build(); requestBuilder.header(AUTH_HEADER, String.format(AUTH_TOKEN, authToken));
return client.send(request, HttpResponse.BodyHandlers.ofString()); }
return client.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofString());
} }
/* Authenticate with the service */ /* Authenticate with the service */

View File

@ -22,7 +22,6 @@ import static org.openmetadata.catalog.util.TestUtils.ADMIN_AUTH_HEADERS;
import static org.openmetadata.catalog.util.TestUtils.UpdateType.MINOR_UPDATE; import static org.openmetadata.catalog.util.TestUtils.UpdateType.MINOR_UPDATE;
import static org.openmetadata.catalog.util.TestUtils.assertListNotNull; import static org.openmetadata.catalog.util.TestUtils.assertListNotNull;
import static org.openmetadata.catalog.util.TestUtils.assertListNull; import static org.openmetadata.catalog.util.TestUtils.assertListNull;
import static org.openmetadata.catalog.util.TestUtils.assertResponse;
import static org.openmetadata.catalog.util.TestUtils.assertResponseContains; import static org.openmetadata.catalog.util.TestUtils.assertResponseContains;
import java.io.IOException; import java.io.IOException;
@ -50,7 +49,6 @@ import org.openmetadata.catalog.entity.services.DatabaseService;
import org.openmetadata.catalog.entity.services.ingestionPipelines.AirflowConfig; import org.openmetadata.catalog.entity.services.ingestionPipelines.AirflowConfig;
import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline; import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.catalog.entity.services.ingestionPipelines.PipelineType; import org.openmetadata.catalog.entity.services.ingestionPipelines.PipelineType;
import org.openmetadata.catalog.exception.IngestionPipelineDeploymentException;
import org.openmetadata.catalog.jdbi3.DatabaseServiceRepository.DatabaseServiceEntityInterface; import org.openmetadata.catalog.jdbi3.DatabaseServiceRepository.DatabaseServiceEntityInterface;
import org.openmetadata.catalog.jdbi3.IngestionPipelineRepository; import org.openmetadata.catalog.jdbi3.IngestionPipelineRepository;
import org.openmetadata.catalog.metadataIngestion.DashboardServiceMetadataPipeline; import org.openmetadata.catalog.metadataIngestion.DashboardServiceMetadataPipeline;
@ -200,18 +198,6 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
assertResponseContains(() -> createEntity(create, ADMIN_AUTH_HEADERS), BAD_REQUEST, "service must not be null"); assertResponseContains(() -> createEntity(create, ADMIN_AUTH_HEADERS), BAD_REQUEST, "service must not be null");
} }
@Test
void post_IngestionPipelineWithDeploy_4xx(TestInfo test) {
CreateIngestionPipeline create =
createRequest(test)
.withService(BIGQUERY_REFERENCE)
.withAirflowConfig(new AirflowConfig().withStartDate("2021-11-21").withForceDeploy(true));
assertResponse(
() -> createEntity(create, ADMIN_AUTH_HEADERS),
BAD_REQUEST,
IngestionPipelineDeploymentException.buildMessageByName(create.getName(), "value"));
}
@Test @Test
void post_AirflowWithDifferentService_200_ok(TestInfo test) throws IOException { void post_AirflowWithDifferentService_200_ok(TestInfo test) throws IOException {
EntityReference[] differentServices = {REDSHIFT_REFERENCE, BIGQUERY_REFERENCE}; EntityReference[] differentServices = {REDSHIFT_REFERENCE, BIGQUERY_REFERENCE};