Fix #1215: Pipeline metadata is not being updated through PUT apis (#1216)

This commit is contained in:
Sriharsha Chintalapani 2021-11-16 12:44:19 -08:00 committed by GitHub
parent 4646c0fcf3
commit 940e51d102
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 25 additions and 13 deletions

View File

@ -307,6 +307,11 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
@Override
public void entitySpecificUpdate() throws IOException {
updateTasks(original.getEntity(), updated.getEntity());
recordChange("pipelineUrl", original.getEntity().getPipelineUrl(), updated.getEntity().getPipelineUrl());
recordChange("concurrency", original.getEntity().getConcurrency(), updated.getEntity().getConcurrency());
recordChange("pipelineLocation", original.getEntity().getPipelineLocation(),
updated.getEntity().getPipelineLocation());
recordChange("startDate", original.getEntity().getStartDate(), updated.getEntity().getStartDate());
}
private void updateTasks(Pipeline origPipeline, Pipeline updatedPipeline) throws JsonProcessingException {

View File

@ -297,8 +297,7 @@ public class PipelineResource {
public Response createOrUpdate(@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Valid CreatePipeline create) throws IOException, ParseException {
Pipeline pipeline = getPipeline(securityContext, create).withConcurrency(create.getConcurrency())
.withStartDate(create.getStartDate());
Pipeline pipeline = getPipeline(securityContext, create);
PutResponse<Pipeline> response = dao.createOrUpdate(uriInfo, pipeline);
addHref(uriInfo, response.getEntity());
return response.toResponse();
@ -354,11 +353,14 @@ public class PipelineResource {
private Pipeline getPipeline(SecurityContext securityContext, CreatePipeline create) {
return new Pipeline().withId(UUID.randomUUID()).withName(create.getName())
.withDisplayName(create.getDisplayName())
.withDescription(create.getDescription()).withService(create.getService()).withTasks(create.getTasks())
.withPipelineUrl(create.getPipelineUrl()).withTags(create.getTags())
.withOwner(create.getOwner())
.withUpdatedBy(securityContext.getUserPrincipal().getName())
.withUpdatedAt(new Date());
.withDisplayName(create.getDisplayName())
.withDescription(create.getDescription()).withService(create.getService()).withTasks(create.getTasks())
.withPipelineUrl(create.getPipelineUrl()).withTags(create.getTags())
.withConcurrency(create.getConcurrency())
.withStartDate(create.getStartDate())
.withPipelineLocation(create.getPipelineLocation())
.withOwner(create.getOwner())
.withUpdatedBy(securityContext.getUserPrincipal().getName())
.withUpdatedAt(new Date());
}
}

View File

@ -275,17 +275,22 @@ public class PipelineResourceTest extends EntityResourceTest<Pipeline> {
}
@Test
public void put_AddRemovePipelineTasksUpdate_200(TestInfo test) throws IOException {
CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription(null).withTasks(null);
public void put_AddRemovePipelineTasksUpdate_200(TestInfo test) throws IOException, URISyntaxException {
CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription(null)
.withTasks(null).withConcurrency(null).withPipelineUrl(new URI("http://localhost:8080"));
Pipeline pipeline = createAndCheckEntity(request, adminAuthHeaders());
// Add tasks and description
ChangeDescription change = getChangeDescription(pipeline.getVersion());
change.getFieldsAdded().add(new FieldChange().withName("description").withNewValue("newDescription"));
change.getFieldsAdded().add(new FieldChange().withName("tasks").withNewValue(TASKS));
pipeline = updateAndCheckEntity(request.withDescription("newDescription").withTasks(TASKS),
OK, adminAuthHeaders(), MINOR_UPDATE, change);
change.getFieldsAdded().add(new FieldChange().withName("concurrency")
.withNewValue(5));
change.getFieldsUpdated().add(new FieldChange().withName("pipelineUrl")
.withNewValue("https://airflow.open-metadata.org").withOldValue("http://localhost:8080"));
pipeline = updateAndCheckEntity(request.withDescription("newDescription").withTasks(TASKS)
.withConcurrency(5).withPipelineUrl(new URI("https://airflow.open-metadata.org")),
OK, adminAuthHeaders(), MINOR_UPDATE, change);
// TODO update this once task removal is figured out
// remove a task
// TASKS.remove(0);