mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-08-11 10:36:54 +00:00
Fixes #819 - Update pipeline version during PUT and POST operations
This commit is contained in:
parent
23e280ffe6
commit
dacacda2f6
@ -16,6 +16,8 @@
|
|||||||
|
|
||||||
package org.openmetadata.catalog.jdbi3;
|
package org.openmetadata.catalog.jdbi3;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
|
import org.openmetadata.catalog.Entity;
|
||||||
import org.openmetadata.catalog.entity.data.Pipeline;
|
import org.openmetadata.catalog.entity.data.Pipeline;
|
||||||
import org.openmetadata.catalog.entity.data.Task;
|
import org.openmetadata.catalog.entity.data.Task;
|
||||||
import org.openmetadata.catalog.entity.services.PipelineService;
|
import org.openmetadata.catalog.entity.services.PipelineService;
|
||||||
@ -23,11 +25,12 @@ import org.openmetadata.catalog.exception.CatalogExceptionMessage;
|
|||||||
import org.openmetadata.catalog.exception.EntityNotFoundException;
|
import org.openmetadata.catalog.exception.EntityNotFoundException;
|
||||||
import org.openmetadata.catalog.jdbi3.TeamRepository.TeamDAO;
|
import org.openmetadata.catalog.jdbi3.TeamRepository.TeamDAO;
|
||||||
import org.openmetadata.catalog.jdbi3.UserRepository.UserDAO;
|
import org.openmetadata.catalog.jdbi3.UserRepository.UserDAO;
|
||||||
import org.openmetadata.catalog.Entity;
|
|
||||||
import org.openmetadata.catalog.resources.pipelines.PipelineResource.PipelineList;
|
|
||||||
import org.openmetadata.catalog.resources.pipelines.PipelineResource;
|
import org.openmetadata.catalog.resources.pipelines.PipelineResource;
|
||||||
|
import org.openmetadata.catalog.resources.pipelines.PipelineResource.PipelineList;
|
||||||
import org.openmetadata.catalog.type.EntityReference;
|
import org.openmetadata.catalog.type.EntityReference;
|
||||||
import org.openmetadata.catalog.type.TagLabel;
|
import org.openmetadata.catalog.type.TagLabel;
|
||||||
|
import org.openmetadata.catalog.util.EntityInterface;
|
||||||
|
import org.openmetadata.catalog.util.EntityUpdater;
|
||||||
import org.openmetadata.catalog.util.EntityUtil;
|
import org.openmetadata.catalog.util.EntityUtil;
|
||||||
import org.openmetadata.catalog.util.EntityUtil.Fields;
|
import org.openmetadata.catalog.util.EntityUtil.Fields;
|
||||||
import org.openmetadata.catalog.util.JsonUtils;
|
import org.openmetadata.catalog.util.JsonUtils;
|
||||||
@ -40,15 +43,13 @@ import org.skife.jdbi.v2.sqlobject.SqlUpdate;
|
|||||||
import org.skife.jdbi.v2.sqlobject.Transaction;
|
import org.skife.jdbi.v2.sqlobject.Transaction;
|
||||||
|
|
||||||
import javax.json.JsonPatch;
|
import javax.json.JsonPatch;
|
||||||
import javax.ws.rs.core.Response;
|
|
||||||
import javax.ws.rs.core.Response.Status;
|
import javax.ws.rs.core.Response.Status;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.security.GeneralSecurityException;
|
import java.security.GeneralSecurityException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.stream.Collectors;
|
import java.util.UUID;
|
||||||
import java.util.stream.Stream;
|
|
||||||
|
|
||||||
import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound;
|
import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound;
|
||||||
|
|
||||||
@ -134,51 +135,26 @@ public abstract class PipelineRepository {
|
|||||||
|
|
||||||
@Transaction
|
@Transaction
|
||||||
public Pipeline create(Pipeline pipeline, EntityReference service, EntityReference owner) throws IOException {
|
public Pipeline create(Pipeline pipeline, EntityReference service, EntityReference owner) throws IOException {
|
||||||
getService(service); // Validate service
|
|
||||||
return createInternal(pipeline, service, owner);
|
return createInternal(pipeline, service, owner);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Transaction
|
@Transaction
|
||||||
public PutResponse<Pipeline> createOrUpdate(Pipeline updatedPipeline, EntityReference service,
|
public PutResponse<Pipeline> createOrUpdate(Pipeline updated, EntityReference service,
|
||||||
EntityReference newOwner) throws IOException {
|
EntityReference newOwner) throws IOException {
|
||||||
getService(service); // Validate service
|
getService(service); // Validate service
|
||||||
String fqn = getFQN(service, updatedPipeline);
|
String fqn = getFQN(service, updated);
|
||||||
Pipeline storedPipeline = JsonUtils.readValue(pipelineDAO().findByFQN(fqn), Pipeline.class);
|
Pipeline stored = JsonUtils.readValue(pipelineDAO().findByFQN(fqn), Pipeline.class);
|
||||||
if (storedPipeline == null) {
|
if (stored == null) {
|
||||||
return new PutResponse<>(Status.CREATED, createInternal(updatedPipeline, service, newOwner));
|
return new PutResponse<>(Status.CREATED, createInternal(updated, service, newOwner));
|
||||||
}
|
|
||||||
// Update existing pipeline
|
|
||||||
EntityUtil.populateOwner(userDAO(), teamDAO(), newOwner); // Validate new owner
|
|
||||||
if (storedPipeline.getDescription() == null || storedPipeline.getDescription().isEmpty()) {
|
|
||||||
storedPipeline.withDescription(updatedPipeline.getDescription());
|
|
||||||
}
|
|
||||||
//update the display name from source
|
|
||||||
if (updatedPipeline.getDisplayName() != null && !updatedPipeline.getDisplayName().isEmpty()) {
|
|
||||||
storedPipeline.withDisplayName(updatedPipeline.getDisplayName());
|
|
||||||
}
|
}
|
||||||
|
setFields(stored, PIPELINE_UPDATE_FIELDS);
|
||||||
|
updated.setId(stored.getId());
|
||||||
|
validateRelationships(updated, service, newOwner);
|
||||||
|
|
||||||
pipelineDAO().update(storedPipeline.getId().toString(), JsonUtils.pojoToJson(storedPipeline));
|
PipelineUpdater pipelineUpdater = new PipelineUpdater(stored, updated, false);
|
||||||
|
pipelineUpdater.updateAll();
|
||||||
// Update owner relationship
|
pipelineUpdater.store();
|
||||||
setFields(storedPipeline, PIPELINE_UPDATE_FIELDS); // First get the ownership information
|
return new PutResponse<>(Status.OK, updated);
|
||||||
updateOwner(storedPipeline, storedPipeline.getOwner(), newOwner);
|
|
||||||
|
|
||||||
// Service can't be changed in update since service name is part of FQN and
|
|
||||||
// change to a different service will result in a different FQN and creation of a new database under the new service
|
|
||||||
//Airflow lineage backend gets executed per task in a dag. This means we will not a get full picture of the pipeline
|
|
||||||
// in each call. Hence we may create a pipeline and add a single task when one task finishes in a pipeline
|
|
||||||
// in the next task run we may have to update. To take care of this we will merge the tasks
|
|
||||||
|
|
||||||
List<EntityReference> storedTasks = storedPipeline.getTasks();
|
|
||||||
if (updatedPipeline.getTasks() != null) {
|
|
||||||
List<EntityReference> updatedTasks = Stream.concat(storedPipeline.getTasks().stream(),
|
|
||||||
updatedPipeline.getTasks().stream()).collect(Collectors.toList());
|
|
||||||
storedPipeline.setTasks(updatedTasks);
|
|
||||||
}
|
|
||||||
|
|
||||||
storedPipeline.setService(service);
|
|
||||||
updateTaskRelationships(storedPipeline);
|
|
||||||
return new PutResponse<>(Response.Status.OK, storedPipeline);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Transaction
|
@Transaction
|
||||||
@ -248,17 +224,39 @@ public abstract class PipelineRepository {
|
|||||||
|
|
||||||
private Pipeline createInternal(Pipeline pipeline, EntityReference service, EntityReference owner)
|
private Pipeline createInternal(Pipeline pipeline, EntityReference service, EntityReference owner)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
String fqn = service.getName() + "." + pipeline.getName();
|
validateRelationships(pipeline, service, owner);
|
||||||
pipeline.setFullyQualifiedName(fqn);
|
storePipeline(pipeline, false);
|
||||||
|
|
||||||
EntityUtil.populateOwner(userDAO(), teamDAO(), owner); // Validate owner
|
|
||||||
|
|
||||||
pipelineDAO().insert(JsonUtils.pojoToJson(pipeline));
|
|
||||||
setService(pipeline, service);
|
|
||||||
addRelationships(pipeline);
|
addRelationships(pipeline);
|
||||||
return pipeline;
|
return pipeline;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void validateRelationships(Pipeline pipeline, EntityReference service, EntityReference owner) throws IOException {
|
||||||
|
pipeline.setFullyQualifiedName(getFQN(service, pipeline));
|
||||||
|
EntityUtil.populateOwner(userDAO(), teamDAO(), owner); // Validate owner
|
||||||
|
getService(service);
|
||||||
|
pipeline.setTags(EntityUtil.addDerivedTags(tagDAO(), pipeline.getTags()));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void storePipeline(Pipeline pipeline, boolean update) throws JsonProcessingException {
|
||||||
|
// Relationships and fields such as href are derived and not stored as part of json
|
||||||
|
EntityReference owner = pipeline.getOwner();
|
||||||
|
List<TagLabel> tags = pipeline.getTags();
|
||||||
|
EntityReference service = pipeline.getService();
|
||||||
|
List<EntityReference> tasks = pipeline.getTasks();
|
||||||
|
|
||||||
|
// Don't store owner, database, href and tags as JSON. Build it on the fly based on relationships
|
||||||
|
pipeline.withOwner(null).withService(null).withTasks(null).withHref(null).withTags(null);
|
||||||
|
|
||||||
|
if (update) {
|
||||||
|
pipelineDAO().update(pipeline.getId().toString(), JsonUtils.pojoToJson(pipeline));
|
||||||
|
} else {
|
||||||
|
pipelineDAO().insert(JsonUtils.pojoToJson(pipeline));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Restore the relationships
|
||||||
|
pipeline.withOwner(owner).withService(service).withTasks(tasks).withTags(tags);
|
||||||
|
}
|
||||||
|
|
||||||
private EntityReference getService(Pipeline pipeline) throws IOException {
|
private EntityReference getService(Pipeline pipeline) throws IOException {
|
||||||
return pipeline == null ? null : getService(EntityUtil.getService(relationshipDAO(), pipeline.getId()));
|
return pipeline == null ? null : getService(EntityUtil.getService(relationshipDAO(), pipeline.getId()));
|
||||||
}
|
}
|
||||||
@ -271,7 +269,7 @@ public abstract class PipelineRepository {
|
|||||||
service.setDescription(serviceInstance.getDescription());
|
service.setDescription(serviceInstance.getDescription());
|
||||||
service.setName(serviceInstance.getName());
|
service.setName(serviceInstance.getName());
|
||||||
} else {
|
} else {
|
||||||
throw new IllegalArgumentException(String.format("Invalid service type %s for the chart", service.getType()));
|
throw new IllegalArgumentException(String.format("Invalid service type %s for the pipeline", service.getType()));
|
||||||
}
|
}
|
||||||
return service;
|
return service;
|
||||||
}
|
}
|
||||||
@ -286,31 +284,13 @@ public abstract class PipelineRepository {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void patch(Pipeline original, Pipeline updated) throws IOException {
|
private void patch(Pipeline original, Pipeline updated) throws IOException {
|
||||||
String pipelineId = original.getId().toString();
|
// Patch can't make changes to following fields. Ignore the changes
|
||||||
if (!original.getId().equals(updated.getId())) {
|
updated.withFullyQualifiedName(original.getFullyQualifiedName()).withName(original.getName())
|
||||||
throw new IllegalArgumentException(CatalogExceptionMessage.readOnlyAttribute(Entity.PIPELINE, "id"));
|
.withService(original.getService()).withId(original.getId());
|
||||||
}
|
validateRelationships(updated, updated.getService(), updated.getOwner());
|
||||||
if (!original.getName().equals(updated.getName())) {
|
PipelineRepository.PipelineUpdater pipelineUpdater = new PipelineRepository.PipelineUpdater(original, updated, true);
|
||||||
throw new IllegalArgumentException(CatalogExceptionMessage.readOnlyAttribute(Entity.PIPELINE, "name"));
|
pipelineUpdater.updateAll();
|
||||||
}
|
pipelineUpdater.store();
|
||||||
if (updated.getService() == null || !original.getService().getId().equals(updated.getService().getId())) {
|
|
||||||
throw new IllegalArgumentException(CatalogExceptionMessage.readOnlyAttribute(Entity.PIPELINE,
|
|
||||||
"service"));
|
|
||||||
}
|
|
||||||
// Validate new owner
|
|
||||||
EntityReference newOwner = EntityUtil.populateOwner(userDAO(), teamDAO(), updated.getOwner());
|
|
||||||
|
|
||||||
EntityReference newService = updated.getService();
|
|
||||||
// Remove previous tags. Merge tags from the update and the existing tags
|
|
||||||
EntityUtil.removeTags(tagDAO(), original.getFullyQualifiedName());
|
|
||||||
|
|
||||||
updated.setHref(null);
|
|
||||||
updated.setOwner(null);
|
|
||||||
updated.setService(null);
|
|
||||||
pipelineDAO().update(pipelineId, JsonUtils.pojoToJson(updated));
|
|
||||||
updateOwner(updated, original.getOwner(), newOwner);
|
|
||||||
updated.setService(newService);
|
|
||||||
applyTags(updated);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private EntityReference getOwner(Pipeline pipeline) throws IOException {
|
private EntityReference getOwner(Pipeline pipeline) throws IOException {
|
||||||
@ -354,6 +334,8 @@ public abstract class PipelineRepository {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void addRelationships(Pipeline pipeline) throws IOException {
|
private void addRelationships(Pipeline pipeline) throws IOException {
|
||||||
|
setService(pipeline, pipeline.getService());
|
||||||
|
|
||||||
// Add relationship from pipeline to task
|
// Add relationship from pipeline to task
|
||||||
String pipelineId = pipeline.getId().toString();
|
String pipelineId = pipeline.getId().toString();
|
||||||
if (pipeline.getTasks() != null) {
|
if (pipeline.getTasks() != null) {
|
||||||
@ -433,4 +415,97 @@ public abstract class PipelineRepository {
|
|||||||
@SqlUpdate("DELETE FROM pipeline_entity WHERE id = :id")
|
@SqlUpdate("DELETE FROM pipeline_entity WHERE id = :id")
|
||||||
int delete(@Bind("id") String id);
|
int delete(@Bind("id") String id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static class PipelineEntityInterface implements EntityInterface {
|
||||||
|
private final Pipeline pipeline;
|
||||||
|
|
||||||
|
PipelineEntityInterface(Pipeline Pipeline) {
|
||||||
|
this.pipeline = Pipeline;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public UUID getId() {
|
||||||
|
return pipeline.getId();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getDescription() {
|
||||||
|
return pipeline.getDescription();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getDisplayName() {
|
||||||
|
return pipeline.getDisplayName();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EntityReference getOwner() {
|
||||||
|
return pipeline.getOwner();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getFullyQualifiedName() {
|
||||||
|
return pipeline.getFullyQualifiedName();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<TagLabel> getTags() {
|
||||||
|
return pipeline.getTags();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setDescription(String description) {
|
||||||
|
pipeline.setDescription(description);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setDisplayName(String displayName) {
|
||||||
|
pipeline.setDisplayName(displayName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setTags(List<TagLabel> tags) {
|
||||||
|
pipeline.setTags(tags);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handles entity updated from PUT and POST operation.
|
||||||
|
*/
|
||||||
|
public class PipelineUpdater extends EntityUpdater {
|
||||||
|
final Pipeline orig;
|
||||||
|
final Pipeline updated;
|
||||||
|
|
||||||
|
public PipelineUpdater(Pipeline orig, Pipeline updated, boolean patchOperation) {
|
||||||
|
super(new PipelineRepository.PipelineEntityInterface(orig), new PipelineRepository.PipelineEntityInterface(updated), patchOperation, relationshipDAO(),
|
||||||
|
tagDAO());
|
||||||
|
this.orig = orig;
|
||||||
|
this.updated = updated;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void updateAll() throws IOException {
|
||||||
|
super.updateAll();
|
||||||
|
updateTasks();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateTasks() throws IOException {
|
||||||
|
// Airflow lineage backend gets executed per task in a DAG. This means we will not a get full picture of the
|
||||||
|
// pipeline in each call. Hence we may create a pipeline and add a single task when one task finishes in a pipeline
|
||||||
|
// in the next task run we may have to update. To take care of this we will merge the tasks
|
||||||
|
if (updated.getTasks() == null) {
|
||||||
|
updated.setTasks(orig.getTasks());
|
||||||
|
} else {
|
||||||
|
updated.getTasks().addAll(orig.getTasks()); // TODO remove duplicates
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add relationship from pipeline to task
|
||||||
|
updateTaskRelationships(updated);
|
||||||
|
update("tasks", EntityUtil.getIDList(updated.getTasks()), EntityUtil.getIDList(orig.getTasks()));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void store() throws IOException {
|
||||||
|
updated.setVersion(getNewVersion(orig.getVersion()));
|
||||||
|
storePipeline(updated, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -42,6 +42,7 @@ import org.openmetadata.catalog.type.TagLabel;
|
|||||||
import org.openmetadata.catalog.util.EntityUtil;
|
import org.openmetadata.catalog.util.EntityUtil;
|
||||||
import org.openmetadata.catalog.util.JsonUtils;
|
import org.openmetadata.catalog.util.JsonUtils;
|
||||||
import org.openmetadata.catalog.util.TestUtils;
|
import org.openmetadata.catalog.util.TestUtils;
|
||||||
|
import org.openmetadata.catalog.util.TestUtils.UpdateType;
|
||||||
import org.openmetadata.common.utils.JsonSchemaUtil;
|
import org.openmetadata.common.utils.JsonSchemaUtil;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@ -68,7 +69,8 @@ import static org.junit.jupiter.api.Assertions.assertNull;
|
|||||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound;
|
import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound;
|
||||||
import static org.openmetadata.catalog.exception.CatalogExceptionMessage.readOnlyAttribute;
|
import static org.openmetadata.catalog.util.TestUtils.UpdateType.MINOR_UPDATE;
|
||||||
|
import static org.openmetadata.catalog.util.TestUtils.UpdateType.NO_CHANGE;
|
||||||
import static org.openmetadata.catalog.util.TestUtils.adminAuthHeaders;
|
import static org.openmetadata.catalog.util.TestUtils.adminAuthHeaders;
|
||||||
import static org.openmetadata.catalog.util.TestUtils.assertEntityPagination;
|
import static org.openmetadata.catalog.util.TestUtils.assertEntityPagination;
|
||||||
import static org.openmetadata.catalog.util.TestUtils.assertResponse;
|
import static org.openmetadata.catalog.util.TestUtils.assertResponse;
|
||||||
@ -162,7 +164,7 @@ public class PipelineResourceTest extends CatalogApplicationTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void post_PipelineWithTASKs_200_ok(TestInfo test) throws HttpResponseException {
|
public void post_PipelineWithTasks_200_ok(TestInfo test) throws HttpResponseException {
|
||||||
createAndCheckPipeline(create(test), TASK_REFERENCES, adminAuthHeaders());
|
createAndCheckPipeline(create(test), TASK_REFERENCES, adminAuthHeaders());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -307,53 +309,50 @@ public class PipelineResourceTest extends CatalogApplicationTest {
|
|||||||
public void put_PipelineUpdateWithNoChange_200(TestInfo test) throws HttpResponseException {
|
public void put_PipelineUpdateWithNoChange_200(TestInfo test) throws HttpResponseException {
|
||||||
// Create a Pipeline with POST
|
// Create a Pipeline with POST
|
||||||
CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withOwner(USER_OWNER1);
|
CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withOwner(USER_OWNER1);
|
||||||
createAndCheckPipeline(request, adminAuthHeaders());
|
Pipeline pipeline = createAndCheckPipeline(request, adminAuthHeaders());
|
||||||
|
|
||||||
// Update Pipeline two times successfully with PUT requests
|
// Update Pipeline two times successfully with PUT requests
|
||||||
updateAndCheckPipeline(request, OK, adminAuthHeaders());
|
pipeline = updateAndCheckPipeline(pipeline, request, OK, adminAuthHeaders(), NO_CHANGE);
|
||||||
updateAndCheckPipeline(request, OK, adminAuthHeaders());
|
updateAndCheckPipeline(pipeline, request, OK, adminAuthHeaders(), NO_CHANGE);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void put_PipelineCreate_200(TestInfo test) throws HttpResponseException {
|
public void put_PipelineCreate_200(TestInfo test) throws HttpResponseException {
|
||||||
// Create a new Pipeline with put
|
// Create a new Pipeline with PUT
|
||||||
CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withOwner(USER_OWNER1);
|
CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withOwner(USER_OWNER1);
|
||||||
updateAndCheckPipeline(request.withName(test.getDisplayName()).withDescription(null), CREATED, adminAuthHeaders());
|
updateAndCheckPipeline(null, request.withName(test.getDisplayName()).withDescription(null), CREATED,
|
||||||
|
adminAuthHeaders(), NO_CHANGE);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void put_PipelineCreate_as_owner_200(TestInfo test) throws HttpResponseException {
|
public void put_PipelineCreate_as_owner_200(TestInfo test) throws HttpResponseException {
|
||||||
// Create a new Pipeline with put
|
// Create a new Pipeline with put
|
||||||
CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withOwner(USER_OWNER1);
|
CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withOwner(USER_OWNER1);
|
||||||
// Add Owner as admin
|
// Add pipeline as admin
|
||||||
createAndCheckPipeline(request, adminAuthHeaders());
|
Pipeline pipeline = createAndCheckPipeline(request, adminAuthHeaders());
|
||||||
//Update the table as Owner
|
// Update the table as user owner
|
||||||
updateAndCheckPipeline(request.withName(test.getDisplayName()).withDescription(null),
|
updateAndCheckPipeline(pipeline, request, OK, authHeaders(USER1.getEmail()), NO_CHANGE);
|
||||||
CREATED, authHeaders(USER1.getEmail()));
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void put_PipelineNullDescriptionUpdate_200(TestInfo test) throws HttpResponseException {
|
public void put_PipelineNullDescriptionUpdate_200(TestInfo test) throws HttpResponseException {
|
||||||
CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription(null);
|
CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription(null);
|
||||||
createAndCheckPipeline(request, adminAuthHeaders());
|
Pipeline pipeline = createAndCheckPipeline(request, adminAuthHeaders());
|
||||||
|
|
||||||
// Update null description with a new description
|
// Update null description with a new description
|
||||||
Pipeline db = updateAndCheckPipeline(request.withDisplayName("Pipeline1").
|
pipeline = updateAndCheckPipeline(pipeline, request.withDisplayName("Pipeline1").
|
||||||
withDescription("newDescription"), OK, adminAuthHeaders());
|
withDescription("newDescription"), OK, adminAuthHeaders(), MINOR_UPDATE);
|
||||||
assertEquals("newDescription", db.getDescription());
|
assertEquals("Pipeline1", pipeline.getDisplayName()); // TODO move this to common validate
|
||||||
assertEquals("Pipeline1", db.getDisplayName());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void put_PipelineEmptyDescriptionUpdate_200(TestInfo test) throws HttpResponseException {
|
public void put_PipelineEmptyDescriptionUpdate_200(TestInfo test) throws HttpResponseException {
|
||||||
// Create table with empty description
|
// Create table with empty description
|
||||||
CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription("");
|
CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription("");
|
||||||
createAndCheckPipeline(request, adminAuthHeaders());
|
Pipeline pipeline = createAndCheckPipeline(request, adminAuthHeaders());
|
||||||
|
|
||||||
// Update empty description with a new description
|
// Update empty description with a new description
|
||||||
Pipeline db = updateAndCheckPipeline(request.withDescription("newDescription"), OK, adminAuthHeaders());
|
updateAndCheckPipeline(pipeline, request.withDescription("newDescription"), OK, adminAuthHeaders(), MINOR_UPDATE);
|
||||||
assertEquals("newDescription", db.getDescription());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -369,43 +368,40 @@ public class PipelineResourceTest extends CatalogApplicationTest {
|
|||||||
@Test
|
@Test
|
||||||
public void put_PipelineUpdateOwner_200(TestInfo test) throws HttpResponseException {
|
public void put_PipelineUpdateOwner_200(TestInfo test) throws HttpResponseException {
|
||||||
CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription("");
|
CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription("");
|
||||||
createAndCheckPipeline(request, adminAuthHeaders());
|
Pipeline pipeline = createAndCheckPipeline(request, adminAuthHeaders());
|
||||||
|
|
||||||
// Change ownership from USER_OWNER1 to TEAM_OWNER1
|
// Change ownership from USER_OWNER1 to TEAM_OWNER1
|
||||||
updateAndCheckPipeline(request.withOwner(TEAM_OWNER1), OK, adminAuthHeaders());
|
pipeline = updateAndCheckPipeline(pipeline, request.withOwner(TEAM_OWNER1), OK, adminAuthHeaders(), MINOR_UPDATE);
|
||||||
|
|
||||||
// Remove ownership
|
// Remove ownership
|
||||||
Pipeline db = updateAndCheckPipeline(request.withOwner(null), OK, adminAuthHeaders());
|
pipeline = updateAndCheckPipeline(pipeline, request.withOwner(null), OK, adminAuthHeaders(), MINOR_UPDATE);
|
||||||
assertNull(db.getOwner());
|
assertNull(pipeline.getOwner());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void put_PipelineTASKsUpdate_200(TestInfo test) throws HttpResponseException {
|
public void put_PipelineTasksUpdate_200(TestInfo test) throws HttpResponseException {
|
||||||
CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription(null);
|
CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription(null);
|
||||||
createAndCheckPipeline(request, adminAuthHeaders());
|
Pipeline pipeline = createAndCheckPipeline(request, adminAuthHeaders());
|
||||||
|
pipeline = updateAndCheckPipeline(pipeline, request.withDescription("newDescription").withTasks(TASK_REFERENCES),
|
||||||
Pipeline pipeline = updateAndCheckPipeline(request
|
OK, adminAuthHeaders(), MINOR_UPDATE);
|
||||||
.withDescription("newDescription").withTasks(TASK_REFERENCES),
|
validatePipelineTasks(pipeline, TASK_REFERENCES); // TODO clean this up
|
||||||
OK, adminAuthHeaders());
|
|
||||||
validatePipelineTasks(pipeline, TASK_REFERENCES);
|
|
||||||
assertEquals("newDescription", pipeline.getDescription());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void put_AddRemovePipelineTASKsUpdate_200(TestInfo test) throws HttpResponseException {
|
public void put_AddRemovePipelineTasksUpdate_200(TestInfo test) throws HttpResponseException {
|
||||||
CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription(null);
|
CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription(null);
|
||||||
createAndCheckPipeline(request, adminAuthHeaders());
|
Pipeline pipeline = createAndCheckPipeline(request, adminAuthHeaders());
|
||||||
|
|
||||||
Pipeline pipeline = updateAndCheckPipeline(request
|
// Add tasks
|
||||||
.withDescription("newDescription").withTasks(TASK_REFERENCES),
|
pipeline = updateAndCheckPipeline(pipeline, request.withDescription("newDescription").withTasks(TASK_REFERENCES),
|
||||||
OK, adminAuthHeaders());
|
OK, adminAuthHeaders(), MINOR_UPDATE);
|
||||||
validatePipelineTasks(pipeline, TASK_REFERENCES);
|
validatePipelineTasks(pipeline, TASK_REFERENCES);
|
||||||
// remove a TASK
|
|
||||||
|
// remove a task
|
||||||
TASK_REFERENCES.remove(0);
|
TASK_REFERENCES.remove(0);
|
||||||
pipeline = updateAndCheckPipeline(request
|
pipeline = updateAndCheckPipeline(pipeline, request.withDescription("newDescription").withTasks(TASK_REFERENCES),
|
||||||
.withDescription("newDescription").withTasks(TASK_REFERENCES),
|
OK, adminAuthHeaders(), MINOR_UPDATE);
|
||||||
OK, adminAuthHeaders());
|
|
||||||
validatePipelineTasks(pipeline, TASK_REFERENCES);
|
validatePipelineTasks(pipeline, TASK_REFERENCES);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -447,72 +443,19 @@ public class PipelineResourceTest extends CatalogApplicationTest {
|
|||||||
|
|
||||||
// Add description, owner when previously they were null
|
// Add description, owner when previously they were null
|
||||||
pipeline = patchPipelineAttributesAndCheck(pipeline, "description",
|
pipeline = patchPipelineAttributesAndCheck(pipeline, "description",
|
||||||
TEAM_OWNER1, pipelineTags, adminAuthHeaders());
|
TEAM_OWNER1, pipelineTags, adminAuthHeaders(), MINOR_UPDATE);
|
||||||
pipeline.setOwner(TEAM_OWNER1); // Get rid of href and name returned in the response for owner
|
pipeline.setOwner(TEAM_OWNER1); // Get rid of href and name returned in the response for owner
|
||||||
pipeline.setService(AIRFLOW_REFERENCE); // Get rid of href and name returned in the response for service
|
pipeline.setService(AIRFLOW_REFERENCE); // Get rid of href and name returned in the response for service
|
||||||
pipelineTags = singletonList(USER_ADDRESS_TAG_LABEL);
|
pipelineTags = singletonList(USER_ADDRESS_TAG_LABEL);
|
||||||
|
|
||||||
// Replace description, tier, owner
|
// Replace description, tier, owner
|
||||||
pipeline = patchPipelineAttributesAndCheck(pipeline, "description1",
|
pipeline = patchPipelineAttributesAndCheck(pipeline, "description1",
|
||||||
USER_OWNER1, pipelineTags, adminAuthHeaders());
|
USER_OWNER1, pipelineTags, adminAuthHeaders(), MINOR_UPDATE);
|
||||||
pipeline.setOwner(USER_OWNER1); // Get rid of href and name returned in the response for owner
|
pipeline.setOwner(USER_OWNER1); // Get rid of href and name returned in the response for owner
|
||||||
pipeline.setService(AIRFLOW_REFERENCE); // Get rid of href and name returned in the response for service
|
pipeline.setService(AIRFLOW_REFERENCE); // Get rid of href and name returned in the response for service
|
||||||
|
|
||||||
// Remove description, tier, owner
|
// Remove description, tier, owner
|
||||||
patchPipelineAttributesAndCheck(pipeline, null, null, pipelineTags, adminAuthHeaders());
|
patchPipelineAttributesAndCheck(pipeline, null, null, pipelineTags, adminAuthHeaders(), MINOR_UPDATE);
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void patch_PipelineIDChange_400(TestInfo test) throws HttpResponseException, JsonProcessingException {
|
|
||||||
// Ensure Pipeline ID can't be changed using patch
|
|
||||||
Pipeline pipeline = createPipeline(create(test), adminAuthHeaders());
|
|
||||||
UUID pipelineId = pipeline.getId();
|
|
||||||
String pipelineJson = JsonUtils.pojoToJson(pipeline);
|
|
||||||
pipeline.setId(UUID.randomUUID());
|
|
||||||
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
|
|
||||||
patchPipeline(pipelineId, pipelineJson, pipeline, adminAuthHeaders()));
|
|
||||||
assertResponse(exception, BAD_REQUEST, readOnlyAttribute(Entity.PIPELINE, "id"));
|
|
||||||
|
|
||||||
// ID can't be deleted
|
|
||||||
pipeline.setId(null);
|
|
||||||
exception = assertThrows(HttpResponseException.class, () ->
|
|
||||||
patchPipeline(pipelineId, pipelineJson, pipeline, adminAuthHeaders()));
|
|
||||||
assertResponse(exception, BAD_REQUEST, readOnlyAttribute(Entity.PIPELINE, "id"));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void patch_PipelineNameChange_400(TestInfo test) throws HttpResponseException, JsonProcessingException {
|
|
||||||
// Ensure Pipeline name can't be changed using patch
|
|
||||||
Pipeline pipeline = createPipeline(create(test), adminAuthHeaders());
|
|
||||||
String pipelineJson = JsonUtils.pojoToJson(pipeline);
|
|
||||||
pipeline.setName("newName");
|
|
||||||
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
|
|
||||||
patchPipeline(pipelineJson, pipeline, adminAuthHeaders()));
|
|
||||||
assertResponse(exception, BAD_REQUEST, readOnlyAttribute(Entity.PIPELINE, "name"));
|
|
||||||
|
|
||||||
// Name can't be removed
|
|
||||||
pipeline.setName(null);
|
|
||||||
exception = assertThrows(HttpResponseException.class, () ->
|
|
||||||
patchPipeline(pipelineJson, pipeline, adminAuthHeaders()));
|
|
||||||
assertResponse(exception, BAD_REQUEST, readOnlyAttribute(Entity.PIPELINE, "name"));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void patch_PipelineRemoveService_400(TestInfo test) throws HttpResponseException, JsonProcessingException {
|
|
||||||
// Ensure service corresponding to Pipeline can't be changed by patch operation
|
|
||||||
Pipeline pipeline = createPipeline(create(test), adminAuthHeaders());
|
|
||||||
pipeline.getService().setHref(null); // Remove href from returned response as it is read-only field
|
|
||||||
|
|
||||||
String pipelineJson = JsonUtils.pojoToJson(pipeline);
|
|
||||||
pipeline.setService(PREFECT_REFERENCE);
|
|
||||||
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
|
|
||||||
patchPipeline(pipelineJson, pipeline, adminAuthHeaders()));
|
|
||||||
assertResponse(exception, BAD_REQUEST, readOnlyAttribute(Entity.PIPELINE, "service"));
|
|
||||||
|
|
||||||
// Service relationship can't be removed
|
|
||||||
pipeline.setService(null);
|
|
||||||
exception = assertThrows(HttpResponseException.class, () ->
|
|
||||||
patchPipeline(pipelineJson, pipeline, adminAuthHeaders()));
|
|
||||||
assertResponse(exception, BAD_REQUEST, readOnlyAttribute(Entity.PIPELINE, "service"));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO listing tables test:1
|
// TODO listing tables test:1
|
||||||
@ -556,12 +499,17 @@ public class PipelineResourceTest extends CatalogApplicationTest {
|
|||||||
return getAndValidate(pipeline.getId(), create, authHeaders, updatedBy);
|
return getAndValidate(pipeline.getId(), create, authHeaders, updatedBy);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Pipeline updateAndCheckPipeline(CreatePipeline create,
|
public static Pipeline updateAndCheckPipeline(Pipeline before, CreatePipeline create, Status status,
|
||||||
Status status,
|
Map<String, String> authHeaders, UpdateType updateType)
|
||||||
Map<String, String> authHeaders) throws HttpResponseException {
|
throws HttpResponseException {
|
||||||
String updatedBy = TestUtils.getPrincipal(authHeaders);
|
String updatedBy = TestUtils.getPrincipal(authHeaders);
|
||||||
Pipeline updatedPipeline = updatePipeline(create, status, authHeaders);
|
Pipeline updatedPipeline = updatePipeline(create, status, authHeaders);
|
||||||
validatePipeline(updatedPipeline, create.getDescription(), create.getOwner(), create.getService(), updatedBy);
|
validatePipeline(updatedPipeline, create.getDescription(), create.getOwner(), create.getService(), updatedBy);
|
||||||
|
if (before == null) {
|
||||||
|
assertEquals(0.1, updatedPipeline.getVersion()); // First version created
|
||||||
|
} else {
|
||||||
|
TestUtils.validateUpdate(before.getVersion(), updatedPipeline.getVersion(), updateType);
|
||||||
|
}
|
||||||
|
|
||||||
// GET the newly updated Pipeline and validate
|
// GET the newly updated Pipeline and validate
|
||||||
return getAndValidate(updatedPipeline.getId(), create, authHeaders, updatedBy);
|
return getAndValidate(updatedPipeline.getId(), create, authHeaders, updatedBy);
|
||||||
@ -689,38 +637,39 @@ public class PipelineResourceTest extends CatalogApplicationTest {
|
|||||||
|
|
||||||
private static void validatePipelineTasks(Pipeline pipeline, List<EntityReference> tasks) {
|
private static void validatePipelineTasks(Pipeline pipeline, List<EntityReference> tasks) {
|
||||||
if (tasks != null) {
|
if (tasks != null) {
|
||||||
List<UUID> expectedTASKReferences = new ArrayList<>();
|
List<UUID> expectedTaskReferences = new ArrayList<>();
|
||||||
for (EntityReference task: tasks) {
|
for (EntityReference task: tasks) {
|
||||||
expectedTASKReferences.add(task.getId());
|
expectedTaskReferences.add(task.getId());
|
||||||
}
|
}
|
||||||
List<UUID> actualTaskReferences = new ArrayList<>();
|
List<UUID> actualTaskReferences = new ArrayList<>();
|
||||||
for (EntityReference task: pipeline.getTasks()) {
|
for (EntityReference task: pipeline.getTasks()) {
|
||||||
TestUtils.validateEntityReference(task);
|
TestUtils.validateEntityReference(task);
|
||||||
actualTaskReferences.add(task.getId());
|
actualTaskReferences.add(task.getId());
|
||||||
}
|
}
|
||||||
assertTrue(actualTaskReferences.containsAll(expectedTASKReferences));
|
assertTrue(actualTaskReferences.containsAll(expectedTaskReferences));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Pipeline patchPipelineAttributesAndCheck(Pipeline pipeline, String newDescription,
|
private Pipeline patchPipelineAttributesAndCheck(Pipeline before, String newDescription,
|
||||||
EntityReference newOwner, List<TagLabel> tags,
|
EntityReference newOwner, List<TagLabel> tags,
|
||||||
Map<String, String> authHeaders)
|
Map<String, String> authHeaders, UpdateType updateType)
|
||||||
throws JsonProcessingException, HttpResponseException {
|
throws JsonProcessingException, HttpResponseException {
|
||||||
String updatedBy = TestUtils.getPrincipal(authHeaders);
|
String updatedBy = TestUtils.getPrincipal(authHeaders);
|
||||||
String pipelineJson = JsonUtils.pojoToJson(pipeline);
|
String pipelineJson = JsonUtils.pojoToJson(before);
|
||||||
|
|
||||||
// Update the table attributes
|
// Update the table attributes
|
||||||
pipeline.setDescription(newDescription);
|
before.setDescription(newDescription);
|
||||||
pipeline.setOwner(newOwner);
|
before.setOwner(newOwner);
|
||||||
pipeline.setTags(tags);
|
before.setTags(tags);
|
||||||
|
|
||||||
// Validate information returned in patch response has the updates
|
// Validate information returned in patch response has the updates
|
||||||
Pipeline updatedPipeline = patchPipeline(pipelineJson, pipeline, authHeaders);
|
Pipeline updatedPipeline = patchPipeline(pipelineJson, before, authHeaders);
|
||||||
validatePipeline(updatedPipeline, pipeline.getDescription(), newOwner, null, tags,
|
validatePipeline(updatedPipeline, before.getDescription(), newOwner, null, tags,
|
||||||
pipeline.getTasks(), updatedBy);
|
before.getTasks(), updatedBy);
|
||||||
|
TestUtils.validateUpdate(before.getVersion(), updatedPipeline.getVersion(), updateType);
|
||||||
|
|
||||||
// GET the table and Validate information returned
|
// GET the table and Validate information returned
|
||||||
Pipeline getPipeline = getPipeline(pipeline.getId(), "service,owner", authHeaders);
|
Pipeline getPipeline = getPipeline(before.getId(), "service,owner", authHeaders);
|
||||||
validatePipeline(updatedPipeline, getPipeline.getDescription(), newOwner, null, tags,
|
validatePipeline(updatedPipeline, getPipeline.getDescription(), newOwner, null, tags,
|
||||||
getPipeline.getTasks(), updatedBy);
|
getPipeline.getTasks(), updatedBy);
|
||||||
return updatedPipeline;
|
return updatedPipeline;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user