From dacacda2f65f812885ea17f610fe70f6cd9efaa0 Mon Sep 17 00:00:00 2001 From: sureshms Date: Sun, 17 Oct 2021 11:36:35 -0700 Subject: [PATCH] Fixes #819 - Update pipeline version during PUT and POST operations --- .../catalog/jdbi3/PipelineRepository.java | 225 ++++++++++++------ .../pipelines/PipelineResourceTest.java | 179 +++++--------- 2 files changed, 214 insertions(+), 190 deletions(-) diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PipelineRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PipelineRepository.java index a0217cc03a4..7165f826dd5 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PipelineRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PipelineRepository.java @@ -16,6 +16,8 @@ package org.openmetadata.catalog.jdbi3; +import com.fasterxml.jackson.core.JsonProcessingException; +import org.openmetadata.catalog.Entity; import org.openmetadata.catalog.entity.data.Pipeline; import org.openmetadata.catalog.entity.data.Task; import org.openmetadata.catalog.entity.services.PipelineService; @@ -23,11 +25,12 @@ import org.openmetadata.catalog.exception.CatalogExceptionMessage; import org.openmetadata.catalog.exception.EntityNotFoundException; import org.openmetadata.catalog.jdbi3.TeamRepository.TeamDAO; import org.openmetadata.catalog.jdbi3.UserRepository.UserDAO; -import org.openmetadata.catalog.Entity; -import org.openmetadata.catalog.resources.pipelines.PipelineResource.PipelineList; import org.openmetadata.catalog.resources.pipelines.PipelineResource; +import org.openmetadata.catalog.resources.pipelines.PipelineResource.PipelineList; import org.openmetadata.catalog.type.EntityReference; import org.openmetadata.catalog.type.TagLabel; +import org.openmetadata.catalog.util.EntityInterface; +import org.openmetadata.catalog.util.EntityUpdater; import org.openmetadata.catalog.util.EntityUtil; import org.openmetadata.catalog.util.EntityUtil.Fields; import org.openmetadata.catalog.util.JsonUtils; @@ -40,15 +43,13 @@ import org.skife.jdbi.v2.sqlobject.SqlUpdate; import org.skife.jdbi.v2.sqlobject.Transaction; import javax.json.JsonPatch; -import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import java.io.IOException; import java.security.GeneralSecurityException; import java.util.ArrayList; import java.util.Date; import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.Stream; +import java.util.UUID; import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound; @@ -134,51 +135,26 @@ public abstract class PipelineRepository { @Transaction public Pipeline create(Pipeline pipeline, EntityReference service, EntityReference owner) throws IOException { - getService(service); // Validate service return createInternal(pipeline, service, owner); } @Transaction - public PutResponse createOrUpdate(Pipeline updatedPipeline, EntityReference service, + public PutResponse createOrUpdate(Pipeline updated, EntityReference service, EntityReference newOwner) throws IOException { getService(service); // Validate service - String fqn = getFQN(service, updatedPipeline); - Pipeline storedPipeline = JsonUtils.readValue(pipelineDAO().findByFQN(fqn), Pipeline.class); - if (storedPipeline == null) { - return new PutResponse<>(Status.CREATED, createInternal(updatedPipeline, service, newOwner)); - } - // Update existing pipeline - EntityUtil.populateOwner(userDAO(), teamDAO(), newOwner); // Validate new owner - if (storedPipeline.getDescription() == null || storedPipeline.getDescription().isEmpty()) { - storedPipeline.withDescription(updatedPipeline.getDescription()); - } - //update the display name from source - if (updatedPipeline.getDisplayName() != null && !updatedPipeline.getDisplayName().isEmpty()) { - storedPipeline.withDisplayName(updatedPipeline.getDisplayName()); + String fqn = getFQN(service, updated); + Pipeline stored = JsonUtils.readValue(pipelineDAO().findByFQN(fqn), Pipeline.class); + if (stored == null) { + return new PutResponse<>(Status.CREATED, createInternal(updated, service, newOwner)); } + setFields(stored, PIPELINE_UPDATE_FIELDS); + updated.setId(stored.getId()); + validateRelationships(updated, service, newOwner); - pipelineDAO().update(storedPipeline.getId().toString(), JsonUtils.pojoToJson(storedPipeline)); - - // Update owner relationship - setFields(storedPipeline, PIPELINE_UPDATE_FIELDS); // First get the ownership information - updateOwner(storedPipeline, storedPipeline.getOwner(), newOwner); - - // Service can't be changed in update since service name is part of FQN and - // change to a different service will result in a different FQN and creation of a new database under the new service - //Airflow lineage backend gets executed per task in a dag. This means we will not a get full picture of the pipeline - // in each call. Hence we may create a pipeline and add a single task when one task finishes in a pipeline - // in the next task run we may have to update. To take care of this we will merge the tasks - - List storedTasks = storedPipeline.getTasks(); - if (updatedPipeline.getTasks() != null) { - List updatedTasks = Stream.concat(storedPipeline.getTasks().stream(), - updatedPipeline.getTasks().stream()).collect(Collectors.toList()); - storedPipeline.setTasks(updatedTasks); - } - - storedPipeline.setService(service); - updateTaskRelationships(storedPipeline); - return new PutResponse<>(Response.Status.OK, storedPipeline); + PipelineUpdater pipelineUpdater = new PipelineUpdater(stored, updated, false); + pipelineUpdater.updateAll(); + pipelineUpdater.store(); + return new PutResponse<>(Status.OK, updated); } @Transaction @@ -248,17 +224,39 @@ public abstract class PipelineRepository { private Pipeline createInternal(Pipeline pipeline, EntityReference service, EntityReference owner) throws IOException { - String fqn = service.getName() + "." + pipeline.getName(); - pipeline.setFullyQualifiedName(fqn); - - EntityUtil.populateOwner(userDAO(), teamDAO(), owner); // Validate owner - - pipelineDAO().insert(JsonUtils.pojoToJson(pipeline)); - setService(pipeline, service); + validateRelationships(pipeline, service, owner); + storePipeline(pipeline, false); addRelationships(pipeline); return pipeline; } + private void validateRelationships(Pipeline pipeline, EntityReference service, EntityReference owner) throws IOException { + pipeline.setFullyQualifiedName(getFQN(service, pipeline)); + EntityUtil.populateOwner(userDAO(), teamDAO(), owner); // Validate owner + getService(service); + pipeline.setTags(EntityUtil.addDerivedTags(tagDAO(), pipeline.getTags())); + } + + private void storePipeline(Pipeline pipeline, boolean update) throws JsonProcessingException { + // Relationships and fields such as href are derived and not stored as part of json + EntityReference owner = pipeline.getOwner(); + List tags = pipeline.getTags(); + EntityReference service = pipeline.getService(); + List tasks = pipeline.getTasks(); + + // Don't store owner, database, href and tags as JSON. Build it on the fly based on relationships + pipeline.withOwner(null).withService(null).withTasks(null).withHref(null).withTags(null); + + if (update) { + pipelineDAO().update(pipeline.getId().toString(), JsonUtils.pojoToJson(pipeline)); + } else { + pipelineDAO().insert(JsonUtils.pojoToJson(pipeline)); + } + + // Restore the relationships + pipeline.withOwner(owner).withService(service).withTasks(tasks).withTags(tags); + } + private EntityReference getService(Pipeline pipeline) throws IOException { return pipeline == null ? null : getService(EntityUtil.getService(relationshipDAO(), pipeline.getId())); } @@ -271,7 +269,7 @@ public abstract class PipelineRepository { service.setDescription(serviceInstance.getDescription()); service.setName(serviceInstance.getName()); } else { - throw new IllegalArgumentException(String.format("Invalid service type %s for the chart", service.getType())); + throw new IllegalArgumentException(String.format("Invalid service type %s for the pipeline", service.getType())); } return service; } @@ -286,31 +284,13 @@ public abstract class PipelineRepository { } private void patch(Pipeline original, Pipeline updated) throws IOException { - String pipelineId = original.getId().toString(); - if (!original.getId().equals(updated.getId())) { - throw new IllegalArgumentException(CatalogExceptionMessage.readOnlyAttribute(Entity.PIPELINE, "id")); - } - if (!original.getName().equals(updated.getName())) { - throw new IllegalArgumentException(CatalogExceptionMessage.readOnlyAttribute(Entity.PIPELINE, "name")); - } - if (updated.getService() == null || !original.getService().getId().equals(updated.getService().getId())) { - throw new IllegalArgumentException(CatalogExceptionMessage.readOnlyAttribute(Entity.PIPELINE, - "service")); - } - // Validate new owner - EntityReference newOwner = EntityUtil.populateOwner(userDAO(), teamDAO(), updated.getOwner()); - - EntityReference newService = updated.getService(); - // Remove previous tags. Merge tags from the update and the existing tags - EntityUtil.removeTags(tagDAO(), original.getFullyQualifiedName()); - - updated.setHref(null); - updated.setOwner(null); - updated.setService(null); - pipelineDAO().update(pipelineId, JsonUtils.pojoToJson(updated)); - updateOwner(updated, original.getOwner(), newOwner); - updated.setService(newService); - applyTags(updated); + // Patch can't make changes to following fields. Ignore the changes + updated.withFullyQualifiedName(original.getFullyQualifiedName()).withName(original.getName()) + .withService(original.getService()).withId(original.getId()); + validateRelationships(updated, updated.getService(), updated.getOwner()); + PipelineRepository.PipelineUpdater pipelineUpdater = new PipelineRepository.PipelineUpdater(original, updated, true); + pipelineUpdater.updateAll(); + pipelineUpdater.store(); } private EntityReference getOwner(Pipeline pipeline) throws IOException { @@ -354,6 +334,8 @@ public abstract class PipelineRepository { } private void addRelationships(Pipeline pipeline) throws IOException { + setService(pipeline, pipeline.getService()); + // Add relationship from pipeline to task String pipelineId = pipeline.getId().toString(); if (pipeline.getTasks() != null) { @@ -433,4 +415,97 @@ public abstract class PipelineRepository { @SqlUpdate("DELETE FROM pipeline_entity WHERE id = :id") int delete(@Bind("id") String id); } + + static class PipelineEntityInterface implements EntityInterface { + private final Pipeline pipeline; + + PipelineEntityInterface(Pipeline Pipeline) { + this.pipeline = Pipeline; + } + + @Override + public UUID getId() { + return pipeline.getId(); + } + + @Override + public String getDescription() { + return pipeline.getDescription(); + } + + @Override + public String getDisplayName() { + return pipeline.getDisplayName(); + } + + @Override + public EntityReference getOwner() { + return pipeline.getOwner(); + } + + @Override + public String getFullyQualifiedName() { + return pipeline.getFullyQualifiedName(); + } + + @Override + public List getTags() { + return pipeline.getTags(); + } + + @Override + public void setDescription(String description) { + pipeline.setDescription(description); + } + + @Override + public void setDisplayName(String displayName) { + pipeline.setDisplayName(displayName); + } + + @Override + public void setTags(List tags) { + pipeline.setTags(tags); + } + } + + /** + * Handles entity updated from PUT and POST operation. + */ + public class PipelineUpdater extends EntityUpdater { + final Pipeline orig; + final Pipeline updated; + + public PipelineUpdater(Pipeline orig, Pipeline updated, boolean patchOperation) { + super(new PipelineRepository.PipelineEntityInterface(orig), new PipelineRepository.PipelineEntityInterface(updated), patchOperation, relationshipDAO(), + tagDAO()); + this.orig = orig; + this.updated = updated; + } + + public void updateAll() throws IOException { + super.updateAll(); + updateTasks(); + } + + private void updateTasks() throws IOException { + // Airflow lineage backend gets executed per task in a DAG. This means we will not a get full picture of the + // pipeline in each call. Hence we may create a pipeline and add a single task when one task finishes in a pipeline + // in the next task run we may have to update. To take care of this we will merge the tasks + if (updated.getTasks() == null) { + updated.setTasks(orig.getTasks()); + } else { + updated.getTasks().addAll(orig.getTasks()); // TODO remove duplicates + } + + // Add relationship from pipeline to task + updateTaskRelationships(updated); + update("tasks", EntityUtil.getIDList(updated.getTasks()), EntityUtil.getIDList(orig.getTasks())); + } + + public void store() throws IOException { + updated.setVersion(getNewVersion(orig.getVersion())); + storePipeline(updated, true); + } + } } diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/pipelines/PipelineResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/pipelines/PipelineResourceTest.java index a17e5fbdbd8..10e5e68a6fa 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/pipelines/PipelineResourceTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/pipelines/PipelineResourceTest.java @@ -42,6 +42,7 @@ import org.openmetadata.catalog.type.TagLabel; import org.openmetadata.catalog.util.EntityUtil; import org.openmetadata.catalog.util.JsonUtils; import org.openmetadata.catalog.util.TestUtils; +import org.openmetadata.catalog.util.TestUtils.UpdateType; import org.openmetadata.common.utils.JsonSchemaUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,7 +69,8 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound; -import static org.openmetadata.catalog.exception.CatalogExceptionMessage.readOnlyAttribute; +import static org.openmetadata.catalog.util.TestUtils.UpdateType.MINOR_UPDATE; +import static org.openmetadata.catalog.util.TestUtils.UpdateType.NO_CHANGE; import static org.openmetadata.catalog.util.TestUtils.adminAuthHeaders; import static org.openmetadata.catalog.util.TestUtils.assertEntityPagination; import static org.openmetadata.catalog.util.TestUtils.assertResponse; @@ -162,7 +164,7 @@ public class PipelineResourceTest extends CatalogApplicationTest { } @Test - public void post_PipelineWithTASKs_200_ok(TestInfo test) throws HttpResponseException { + public void post_PipelineWithTasks_200_ok(TestInfo test) throws HttpResponseException { createAndCheckPipeline(create(test), TASK_REFERENCES, adminAuthHeaders()); } @@ -307,53 +309,50 @@ public class PipelineResourceTest extends CatalogApplicationTest { public void put_PipelineUpdateWithNoChange_200(TestInfo test) throws HttpResponseException { // Create a Pipeline with POST CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withOwner(USER_OWNER1); - createAndCheckPipeline(request, adminAuthHeaders()); + Pipeline pipeline = createAndCheckPipeline(request, adminAuthHeaders()); // Update Pipeline two times successfully with PUT requests - updateAndCheckPipeline(request, OK, adminAuthHeaders()); - updateAndCheckPipeline(request, OK, adminAuthHeaders()); + pipeline = updateAndCheckPipeline(pipeline, request, OK, adminAuthHeaders(), NO_CHANGE); + updateAndCheckPipeline(pipeline, request, OK, adminAuthHeaders(), NO_CHANGE); } @Test public void put_PipelineCreate_200(TestInfo test) throws HttpResponseException { - // Create a new Pipeline with put + // Create a new Pipeline with PUT CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withOwner(USER_OWNER1); - updateAndCheckPipeline(request.withName(test.getDisplayName()).withDescription(null), CREATED, adminAuthHeaders()); + updateAndCheckPipeline(null, request.withName(test.getDisplayName()).withDescription(null), CREATED, + adminAuthHeaders(), NO_CHANGE); } @Test public void put_PipelineCreate_as_owner_200(TestInfo test) throws HttpResponseException { // Create a new Pipeline with put CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withOwner(USER_OWNER1); - // Add Owner as admin - createAndCheckPipeline(request, adminAuthHeaders()); - //Update the table as Owner - updateAndCheckPipeline(request.withName(test.getDisplayName()).withDescription(null), - CREATED, authHeaders(USER1.getEmail())); - + // Add pipeline as admin + Pipeline pipeline = createAndCheckPipeline(request, adminAuthHeaders()); + // Update the table as user owner + updateAndCheckPipeline(pipeline, request, OK, authHeaders(USER1.getEmail()), NO_CHANGE); } @Test public void put_PipelineNullDescriptionUpdate_200(TestInfo test) throws HttpResponseException { CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription(null); - createAndCheckPipeline(request, adminAuthHeaders()); + Pipeline pipeline = createAndCheckPipeline(request, adminAuthHeaders()); // Update null description with a new description - Pipeline db = updateAndCheckPipeline(request.withDisplayName("Pipeline1"). - withDescription("newDescription"), OK, adminAuthHeaders()); - assertEquals("newDescription", db.getDescription()); - assertEquals("Pipeline1", db.getDisplayName()); + pipeline = updateAndCheckPipeline(pipeline, request.withDisplayName("Pipeline1"). + withDescription("newDescription"), OK, adminAuthHeaders(), MINOR_UPDATE); + assertEquals("Pipeline1", pipeline.getDisplayName()); // TODO move this to common validate } @Test public void put_PipelineEmptyDescriptionUpdate_200(TestInfo test) throws HttpResponseException { // Create table with empty description CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription(""); - createAndCheckPipeline(request, adminAuthHeaders()); + Pipeline pipeline = createAndCheckPipeline(request, adminAuthHeaders()); // Update empty description with a new description - Pipeline db = updateAndCheckPipeline(request.withDescription("newDescription"), OK, adminAuthHeaders()); - assertEquals("newDescription", db.getDescription()); + updateAndCheckPipeline(pipeline, request.withDescription("newDescription"), OK, adminAuthHeaders(), MINOR_UPDATE); } @Test @@ -369,43 +368,40 @@ public class PipelineResourceTest extends CatalogApplicationTest { @Test public void put_PipelineUpdateOwner_200(TestInfo test) throws HttpResponseException { CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription(""); - createAndCheckPipeline(request, adminAuthHeaders()); + Pipeline pipeline = createAndCheckPipeline(request, adminAuthHeaders()); // Change ownership from USER_OWNER1 to TEAM_OWNER1 - updateAndCheckPipeline(request.withOwner(TEAM_OWNER1), OK, adminAuthHeaders()); + pipeline = updateAndCheckPipeline(pipeline, request.withOwner(TEAM_OWNER1), OK, adminAuthHeaders(), MINOR_UPDATE); // Remove ownership - Pipeline db = updateAndCheckPipeline(request.withOwner(null), OK, adminAuthHeaders()); - assertNull(db.getOwner()); + pipeline = updateAndCheckPipeline(pipeline, request.withOwner(null), OK, adminAuthHeaders(), MINOR_UPDATE); + assertNull(pipeline.getOwner()); } @Test - public void put_PipelineTASKsUpdate_200(TestInfo test) throws HttpResponseException { + public void put_PipelineTasksUpdate_200(TestInfo test) throws HttpResponseException { CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription(null); - createAndCheckPipeline(request, adminAuthHeaders()); - - Pipeline pipeline = updateAndCheckPipeline(request - .withDescription("newDescription").withTasks(TASK_REFERENCES), - OK, adminAuthHeaders()); - validatePipelineTasks(pipeline, TASK_REFERENCES); - assertEquals("newDescription", pipeline.getDescription()); + Pipeline pipeline = createAndCheckPipeline(request, adminAuthHeaders()); + pipeline = updateAndCheckPipeline(pipeline, request.withDescription("newDescription").withTasks(TASK_REFERENCES), + OK, adminAuthHeaders(), MINOR_UPDATE); + validatePipelineTasks(pipeline, TASK_REFERENCES); // TODO clean this up } @Test - public void put_AddRemovePipelineTASKsUpdate_200(TestInfo test) throws HttpResponseException { + public void put_AddRemovePipelineTasksUpdate_200(TestInfo test) throws HttpResponseException { CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription(null); - createAndCheckPipeline(request, adminAuthHeaders()); + Pipeline pipeline = createAndCheckPipeline(request, adminAuthHeaders()); - Pipeline pipeline = updateAndCheckPipeline(request - .withDescription("newDescription").withTasks(TASK_REFERENCES), - OK, adminAuthHeaders()); + // Add tasks + pipeline = updateAndCheckPipeline(pipeline, request.withDescription("newDescription").withTasks(TASK_REFERENCES), + OK, adminAuthHeaders(), MINOR_UPDATE); validatePipelineTasks(pipeline, TASK_REFERENCES); - // remove a TASK + + // remove a task TASK_REFERENCES.remove(0); - pipeline = updateAndCheckPipeline(request - .withDescription("newDescription").withTasks(TASK_REFERENCES), - OK, adminAuthHeaders()); + pipeline = updateAndCheckPipeline(pipeline, request.withDescription("newDescription").withTasks(TASK_REFERENCES), + OK, adminAuthHeaders(), MINOR_UPDATE); validatePipelineTasks(pipeline, TASK_REFERENCES); } @@ -447,72 +443,19 @@ public class PipelineResourceTest extends CatalogApplicationTest { // Add description, owner when previously they were null pipeline = patchPipelineAttributesAndCheck(pipeline, "description", - TEAM_OWNER1, pipelineTags, adminAuthHeaders()); + TEAM_OWNER1, pipelineTags, adminAuthHeaders(), MINOR_UPDATE); pipeline.setOwner(TEAM_OWNER1); // Get rid of href and name returned in the response for owner pipeline.setService(AIRFLOW_REFERENCE); // Get rid of href and name returned in the response for service pipelineTags = singletonList(USER_ADDRESS_TAG_LABEL); + // Replace description, tier, owner pipeline = patchPipelineAttributesAndCheck(pipeline, "description1", - USER_OWNER1, pipelineTags, adminAuthHeaders()); + USER_OWNER1, pipelineTags, adminAuthHeaders(), MINOR_UPDATE); pipeline.setOwner(USER_OWNER1); // Get rid of href and name returned in the response for owner pipeline.setService(AIRFLOW_REFERENCE); // Get rid of href and name returned in the response for service // Remove description, tier, owner - patchPipelineAttributesAndCheck(pipeline, null, null, pipelineTags, adminAuthHeaders()); - } - - @Test - public void patch_PipelineIDChange_400(TestInfo test) throws HttpResponseException, JsonProcessingException { - // Ensure Pipeline ID can't be changed using patch - Pipeline pipeline = createPipeline(create(test), adminAuthHeaders()); - UUID pipelineId = pipeline.getId(); - String pipelineJson = JsonUtils.pojoToJson(pipeline); - pipeline.setId(UUID.randomUUID()); - HttpResponseException exception = assertThrows(HttpResponseException.class, () -> - patchPipeline(pipelineId, pipelineJson, pipeline, adminAuthHeaders())); - assertResponse(exception, BAD_REQUEST, readOnlyAttribute(Entity.PIPELINE, "id")); - - // ID can't be deleted - pipeline.setId(null); - exception = assertThrows(HttpResponseException.class, () -> - patchPipeline(pipelineId, pipelineJson, pipeline, adminAuthHeaders())); - assertResponse(exception, BAD_REQUEST, readOnlyAttribute(Entity.PIPELINE, "id")); - } - - @Test - public void patch_PipelineNameChange_400(TestInfo test) throws HttpResponseException, JsonProcessingException { - // Ensure Pipeline name can't be changed using patch - Pipeline pipeline = createPipeline(create(test), adminAuthHeaders()); - String pipelineJson = JsonUtils.pojoToJson(pipeline); - pipeline.setName("newName"); - HttpResponseException exception = assertThrows(HttpResponseException.class, () -> - patchPipeline(pipelineJson, pipeline, adminAuthHeaders())); - assertResponse(exception, BAD_REQUEST, readOnlyAttribute(Entity.PIPELINE, "name")); - - // Name can't be removed - pipeline.setName(null); - exception = assertThrows(HttpResponseException.class, () -> - patchPipeline(pipelineJson, pipeline, adminAuthHeaders())); - assertResponse(exception, BAD_REQUEST, readOnlyAttribute(Entity.PIPELINE, "name")); - } - - @Test - public void patch_PipelineRemoveService_400(TestInfo test) throws HttpResponseException, JsonProcessingException { - // Ensure service corresponding to Pipeline can't be changed by patch operation - Pipeline pipeline = createPipeline(create(test), adminAuthHeaders()); - pipeline.getService().setHref(null); // Remove href from returned response as it is read-only field - - String pipelineJson = JsonUtils.pojoToJson(pipeline); - pipeline.setService(PREFECT_REFERENCE); - HttpResponseException exception = assertThrows(HttpResponseException.class, () -> - patchPipeline(pipelineJson, pipeline, adminAuthHeaders())); - assertResponse(exception, BAD_REQUEST, readOnlyAttribute(Entity.PIPELINE, "service")); - - // Service relationship can't be removed - pipeline.setService(null); - exception = assertThrows(HttpResponseException.class, () -> - patchPipeline(pipelineJson, pipeline, adminAuthHeaders())); - assertResponse(exception, BAD_REQUEST, readOnlyAttribute(Entity.PIPELINE, "service")); + patchPipelineAttributesAndCheck(pipeline, null, null, pipelineTags, adminAuthHeaders(), MINOR_UPDATE); } // TODO listing tables test:1 @@ -556,12 +499,17 @@ public class PipelineResourceTest extends CatalogApplicationTest { return getAndValidate(pipeline.getId(), create, authHeaders, updatedBy); } - public static Pipeline updateAndCheckPipeline(CreatePipeline create, - Status status, - Map authHeaders) throws HttpResponseException { + public static Pipeline updateAndCheckPipeline(Pipeline before, CreatePipeline create, Status status, + Map authHeaders, UpdateType updateType) + throws HttpResponseException { String updatedBy = TestUtils.getPrincipal(authHeaders); Pipeline updatedPipeline = updatePipeline(create, status, authHeaders); validatePipeline(updatedPipeline, create.getDescription(), create.getOwner(), create.getService(), updatedBy); + if (before == null) { + assertEquals(0.1, updatedPipeline.getVersion()); // First version created + } else { + TestUtils.validateUpdate(before.getVersion(), updatedPipeline.getVersion(), updateType); + } // GET the newly updated Pipeline and validate return getAndValidate(updatedPipeline.getId(), create, authHeaders, updatedBy); @@ -689,38 +637,39 @@ public class PipelineResourceTest extends CatalogApplicationTest { private static void validatePipelineTasks(Pipeline pipeline, List tasks) { if (tasks != null) { - List expectedTASKReferences = new ArrayList<>(); + List expectedTaskReferences = new ArrayList<>(); for (EntityReference task: tasks) { - expectedTASKReferences.add(task.getId()); + expectedTaskReferences.add(task.getId()); } List actualTaskReferences = new ArrayList<>(); for (EntityReference task: pipeline.getTasks()) { TestUtils.validateEntityReference(task); actualTaskReferences.add(task.getId()); } - assertTrue(actualTaskReferences.containsAll(expectedTASKReferences)); + assertTrue(actualTaskReferences.containsAll(expectedTaskReferences)); } } - private Pipeline patchPipelineAttributesAndCheck(Pipeline pipeline, String newDescription, + private Pipeline patchPipelineAttributesAndCheck(Pipeline before, String newDescription, EntityReference newOwner, List tags, - Map authHeaders) + Map authHeaders, UpdateType updateType) throws JsonProcessingException, HttpResponseException { String updatedBy = TestUtils.getPrincipal(authHeaders); - String pipelineJson = JsonUtils.pojoToJson(pipeline); + String pipelineJson = JsonUtils.pojoToJson(before); // Update the table attributes - pipeline.setDescription(newDescription); - pipeline.setOwner(newOwner); - pipeline.setTags(tags); + before.setDescription(newDescription); + before.setOwner(newOwner); + before.setTags(tags); // Validate information returned in patch response has the updates - Pipeline updatedPipeline = patchPipeline(pipelineJson, pipeline, authHeaders); - validatePipeline(updatedPipeline, pipeline.getDescription(), newOwner, null, tags, - pipeline.getTasks(), updatedBy); + Pipeline updatedPipeline = patchPipeline(pipelineJson, before, authHeaders); + validatePipeline(updatedPipeline, before.getDescription(), newOwner, null, tags, + before.getTasks(), updatedBy); + TestUtils.validateUpdate(before.getVersion(), updatedPipeline.getVersion(), updateType); // GET the table and Validate information returned - Pipeline getPipeline = getPipeline(pipeline.getId(), "service,owner", authHeaders); + Pipeline getPipeline = getPipeline(before.getId(), "service,owner", authHeaders); validatePipeline(updatedPipeline, getPipeline.getDescription(), newOwner, null, tags, getPipeline.getTasks(), updatedBy); return updatedPipeline;