diff --git a/bootstrap/sql/mysql/v001__create_db_connection_info.sql b/bootstrap/sql/mysql/v001__create_db_connection_info.sql index 8c2e2c919c6..49e52834cde 100644 --- a/bootstrap/sql/mysql/v001__create_db_connection_info.sql +++ b/bootstrap/sql/mysql/v001__create_db_connection_info.sql @@ -226,19 +226,6 @@ CREATE TABLE IF NOT EXISTS chart_entity ( INDEX (updatedAt) ); -CREATE TABLE IF NOT EXISTS task_entity ( - id VARCHAR(36) GENERATED ALWAYS AS (json ->> '$.id') STORED NOT NULL, - fullyQualifiedName VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.fullyQualifiedName') NOT NULL, - json JSON NOT NULL, - updatedAt TIMESTAMP GENERATED ALWAYS AS (TIMESTAMP(STR_TO_DATE(json ->> '$.updatedAt', '%Y-%m-%dT%T.%fZ'))) NOT NULL, - updatedBy VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.updatedBy') NOT NULL, - timestamp BIGINT, - PRIMARY KEY (id), - UNIQUE KEY unique_name(fullyQualifiedName), - INDEX (updatedBy), - INDEX (updatedAt) -); - -- -- Feed related tables -- diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/Entity.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/Entity.java index 5a87b77ca07..adb50f59d2d 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/Entity.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/Entity.java @@ -34,7 +34,6 @@ public final class Entity { public static final String CHART = "chart"; public static final String REPORT = "report"; public static final String TOPIC = "topic"; - public static final String TASK = "task"; public static final String MODEL = "model"; public static final String BOTS = "bots"; diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/CollectionDAO.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/CollectionDAO.java index fc9b28f4ff7..df99b38248a 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/CollectionDAO.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/CollectionDAO.java @@ -16,7 +16,6 @@ import org.openmetadata.catalog.entity.data.Model; import org.openmetadata.catalog.entity.data.Pipeline; import org.openmetadata.catalog.entity.data.Report; import org.openmetadata.catalog.entity.data.Table; -import org.openmetadata.catalog.entity.data.Task; import org.openmetadata.catalog.entity.data.Topic; import org.openmetadata.catalog.entity.services.DashboardService; import org.openmetadata.catalog.entity.services.DatabaseService; @@ -39,7 +38,6 @@ import org.openmetadata.catalog.jdbi3.PipelineRepository.PipelineEntityInterface import org.openmetadata.catalog.jdbi3.PipelineServiceRepository.PipelineServiceEntityInterface; import org.openmetadata.catalog.jdbi3.ReportRepository.ReportEntityInterface; import org.openmetadata.catalog.jdbi3.TableRepository.TableEntityInterface; -import org.openmetadata.catalog.jdbi3.TaskRepository.TaskEntityInterface; import org.openmetadata.catalog.jdbi3.TeamRepository.TeamEntityInterface; import org.openmetadata.catalog.jdbi3.TopicRepository.TopicEntityInterface; import org.openmetadata.catalog.jdbi3.UserRepository.UserEntityInterface; @@ -53,8 +51,6 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.Arrays; import java.util.List; -import java.util.Map; -import java.util.regex.Pattern; public interface CollectionDAO { @CreateSqlObject @@ -87,9 +83,6 @@ public interface CollectionDAO { @CreateSqlObject MetricsDAO metricsDAO(); - @CreateSqlObject - TaskDAO taskDAO(); - @CreateSqlObject ChartDAO chartDAO(); @@ -211,8 +204,8 @@ public interface CollectionDAO { } class EntityVersionPair { - private Double version; - private String entityJson; + private final Double version; + private final String entityJson; public Double getVersion() { return version; @@ -579,22 +572,6 @@ public interface CollectionDAO { } } - interface TaskDAO extends EntityDAO{ - @Override - default String getTableName() { return "task_entity"; } - - @Override - default Class getEntityClass() { return Task.class; } - - @Override - default String getNameColumn() { return "fullyQualifiedName"; } - - @Override - default EntityReference getEntityReference(Task entity) { - return new TaskEntityInterface(entity).getEntityReference(); - } - } - interface TeamDAO extends EntityDAO { @Override default String getTableName() { return "team_entity"; } diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PipelineRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PipelineRepository.java index 8182c9745b0..51ff86f0941 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PipelineRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PipelineRepository.java @@ -25,6 +25,7 @@ import org.openmetadata.catalog.resources.pipelines.PipelineResource; import org.openmetadata.catalog.type.ChangeDescription; import org.openmetadata.catalog.type.EntityReference; import org.openmetadata.catalog.type.TagLabel; +import org.openmetadata.catalog.type.Task; import org.openmetadata.catalog.util.EntityInterface; import org.openmetadata.catalog.util.EntityUtil; import org.openmetadata.catalog.util.EntityUtil.Fields; @@ -32,11 +33,15 @@ import org.openmetadata.catalog.util.JsonUtils; import javax.ws.rs.core.Response.Status; import java.io.IOException; +import java.net.URI; import java.text.ParseException; -import java.util.ArrayList; +import java.util.Collections; import java.util.Date; import java.util.List; +import java.util.Optional; import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound; @@ -95,7 +100,9 @@ public class PipelineRepository extends EntityRepository { pipeline.setConcurrency(pipeline.getConcurrency()); pipeline.setOwner(fields.contains("owner") ? getOwner(pipeline) : null); pipeline.setFollowers(fields.contains("followers") ? getFollowers(pipeline) : null); - pipeline.setTasks(fields.contains("tasks") ? getTasks(pipeline) : null); + if (!fields.contains("tasks")) { + pipeline.withTasks(null); + } pipeline.setTags(fields.contains("tags") ? getTags(pipeline.getFullyQualifiedName()) : null); return pipeline; } @@ -132,10 +139,9 @@ public class PipelineRepository extends EntityRepository { EntityReference owner = pipeline.getOwner(); List tags = pipeline.getTags(); EntityReference service = pipeline.getService(); - List tasks = pipeline.getTasks(); // Don't store owner, database, href and tags as JSON. Build it on the fly based on relationships - pipeline.withOwner(null).withService(null).withTasks(null).withHref(null).withTags(null); + pipeline.withOwner(null).withService(null).withHref(null).withTags(null); if (update) { dao.pipelineDAO().update(pipeline.getId(), JsonUtils.pojoToJson(pipeline)); @@ -144,7 +150,7 @@ public class PipelineRepository extends EntityRepository { } // Restore the relationships - pipeline.withOwner(owner).withService(service).withTasks(tasks).withTags(tags); + pipeline.withOwner(owner).withService(service).withTags(tags); } @Override @@ -153,14 +159,6 @@ public class PipelineRepository extends EntityRepository { dao.relationshipDAO().insert(service.getId().toString(), pipeline.getId().toString(), service.getType(), Entity.PIPELINE, Relationship.CONTAINS.ordinal()); - // Add relationship from pipeline to task - String pipelineId = pipeline.getId().toString(); - if (pipeline.getTasks() != null) { - for (EntityReference task : pipeline.getTasks()) { - dao.relationshipDAO().insert(pipelineId, task.getId().toString(), Entity.PIPELINE, Entity.TASK, - Relationship.CONTAINS.ordinal()); - } - } // Add owner relationship EntityUtil.setOwner(dao.relationshipDAO(), pipeline.getId(), Entity.PIPELINE, pipeline.getOwner()); @@ -210,37 +208,10 @@ public class PipelineRepository extends EntityRepository { return pipeline == null ? null : EntityUtil.getFollowers(pipeline.getId(), dao.relationshipDAO(), dao.userDAO()); } - private List getTasks(Pipeline pipeline) throws IOException { - if (pipeline == null) { - return null; - } - String pipelineId = pipeline.getId().toString(); - List taskIds = dao.relationshipDAO().findTo(pipelineId, Relationship.CONTAINS.ordinal(), Entity.TASK); - List tasks = new ArrayList<>(); - for (String taskId : taskIds) { - tasks.add(dao.taskDAO().findEntityReferenceById(UUID.fromString(taskId))); - } - return tasks; - } - - private void updateTaskRelationships(Pipeline pipeline) { - String pipelineId = pipeline.getId().toString(); - - // Add relationship from pipeline to task - if (pipeline.getTasks() != null) { - // Remove any existing tasks associated with this pipeline - dao.relationshipDAO().deleteFrom(pipelineId, Relationship.CONTAINS.ordinal(), Entity.TASK); - for (EntityReference task : pipeline.getTasks()) { - dao.relationshipDAO().insert(pipelineId, task.getId().toString(), Entity.PIPELINE, Entity.TASK, - Relationship.CONTAINS.ordinal()); - } - } - } - - static class PipelineEntityInterface implements EntityInterface { + public static class PipelineEntityInterface implements EntityInterface { private final Pipeline entity; - PipelineEntityInterface(Pipeline entity) { + public PipelineEntityInterface(Pipeline entity) { this.entity = entity; } @@ -283,6 +254,9 @@ public class PipelineRepository extends EntityRepository { @Override public Date getUpdatedAt() { return entity.getUpdatedAt(); } + @Override + public URI getHref() { return entity.getHref(); } + @Override public EntityReference getEntityReference() { return new EntityReference().withId(getId()).withName(getFullyQualifiedName()).withDescription(getDescription()) @@ -318,9 +292,10 @@ public class PipelineRepository extends EntityRepository { } @Override - public void setTags(List tags) { - entity.setTags(tags); - } + public ChangeDescription getChangeDescription() { return entity.getChangeDescription(); } + + @Override + public void setTags(List tags) { entity.setTags(tags); } } /** @@ -339,18 +314,20 @@ public class PipelineRepository extends EntityRepository { private void updateTasks(Pipeline origPipeline, Pipeline updatedPipeline) { // Airflow lineage backend gets executed per task in a DAG. This means we will not a get full picture of the // pipeline in each call. Hence we may create a pipeline and add a single task when one task finishes in a - // pipeline - // in the next task run we may have to update. To take care of this we will merge the tasks - if (updatedPipeline.getTasks() == null) { - updatedPipeline.setTasks(origPipeline.getTasks()); - } else { - updatedPipeline.getTasks().addAll(origPipeline.getTasks()); // TODO remove duplicates - } + // pipeline in the next task run we may have to update. To take care of this we will merge the tasks + List updatedTasks = Optional.ofNullable(updatedPipeline.getTasks()).orElse(Collections.emptyList()); + List origTasks = Optional.ofNullable(origPipeline.getTasks()).orElse(Collections.emptyList()); - // Add relationship from pipeline to task - updateTaskRelationships(updatedPipeline); - recordChange("tasks", EntityUtil.getIDList(updatedPipeline.getTasks()), - EntityUtil.getIDList(origPipeline.getTasks())); + // TODO this might not provide distinct + updatedTasks = Stream.concat(origTasks.stream(), updatedTasks.stream()).distinct().collect(Collectors.toList()); + if (origTasks.isEmpty()) { + origTasks = null; + } + if (updatedTasks.isEmpty()) { + updatedTasks = null; + } + updatedPipeline.setTasks(updatedTasks); + recordChange("tasks", origTasks, updatedTasks); } } } diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TaskRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TaskRepository.java deleted file mode 100644 index a8dfe637131..00000000000 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TaskRepository.java +++ /dev/null @@ -1,266 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.openmetadata.catalog.jdbi3; - -import org.jdbi.v3.sqlobject.transaction.Transaction; -import org.openmetadata.catalog.Entity; -import org.openmetadata.catalog.entity.data.Task; -import org.openmetadata.catalog.entity.services.PipelineService; -import org.openmetadata.catalog.exception.EntityNotFoundException; -import org.openmetadata.catalog.resources.tasks.TaskResource; -import org.openmetadata.catalog.type.ChangeDescription; -import org.openmetadata.catalog.type.EntityReference; -import org.openmetadata.catalog.type.TagLabel; -import org.openmetadata.catalog.util.EntityInterface; -import org.openmetadata.catalog.util.EntityUtil; -import org.openmetadata.catalog.util.EntityUtil.Fields; -import org.openmetadata.catalog.util.JsonUtils; - -import java.io.IOException; -import java.net.URI; -import java.text.ParseException; -import java.util.Date; -import java.util.List; -import java.util.Objects; -import java.util.UUID; - -import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound; - -public class TaskRepository extends EntityRepository { - private static final Fields TASK_UPDATE_FIELDS = new Fields(TaskResource.FIELD_LIST, "owner," + - "taskConfig,tags,downstreamTasks"); - private static final Fields TASK_PATCH_FIELDS = new Fields(TaskResource.FIELD_LIST, "owner,service,tags"); - private final CollectionDAO dao; - - public static String getFQN(Task task) { - return (task.getService().getName() + "." + task.getName()); - } - - public TaskRepository(CollectionDAO dao) { - super(Task.class, dao.taskDAO(), dao, TASK_PATCH_FIELDS, TASK_UPDATE_FIELDS); - this.dao = dao; - } - - @Transaction - public void delete(UUID id) { - if (dao.relationshipDAO().findToCount(id.toString(), Relationship.CONTAINS.ordinal(), Entity.TASK) > 0) { - throw new IllegalArgumentException("Task is not empty"); - } - if (dao.taskDAO().delete(id) <= 0) { - throw EntityNotFoundException.byMessage(entityNotFound(Entity.TASK, id)); - } - dao.relationshipDAO().deleteAll(id.toString()); - } - - @Override - public void validate(Task task) throws IOException { - EntityReference pipelineService = getService(task.getService()); - task.setService(pipelineService); - task.setFullyQualifiedName(getFQN(task)); - EntityUtil.populateOwner(dao.userDAO(), dao.teamDAO(), task.getOwner()); // Validate owner - getService(task.getService()); - task.setTags(EntityUtil.addDerivedTags(dao.tagDAO(), task.getTags())); - } - - @Override - public void store(Task task, boolean update) throws IOException { - // Relationships and fields such as href are derived and not stored as part of json - EntityReference owner = task.getOwner(); - List tags = task.getTags(); - EntityReference service = task.getService(); - - // Don't store owner, database, href and tags as JSON. Build it on the fly based on relationships - task.withOwner(null).withService(null).withHref(null).withTags(null); - - if (update) { - dao.taskDAO().update(task.getId(), JsonUtils.pojoToJson(task)); - } else { - dao.taskDAO().insert(task); - } - - // Restore the relationships - task.withOwner(owner).withService(service).withTags(tags); - } - - @Override - public void storeRelationships(Task task) throws IOException { - setService(task, task.getService()); - setOwner(task, task.getOwner()); - applyTags(task); - } - - private void applyTags(Task task) throws IOException { - // Add task level tags by adding tag to task relationship - EntityUtil.applyTags(dao.tagDAO(), task.getTags(), task.getFullyQualifiedName()); - task.setTags(getTags(task.getFullyQualifiedName())); // Update tag to handle additional derived tags - } - - public EntityReference getOwner(Task task) throws IOException { - return task != null ? - EntityUtil.populateOwner(task.getId(), dao.relationshipDAO(), dao.userDAO(), dao.teamDAO()) : null; - } - - private void setOwner(Task task, EntityReference owner) { - EntityUtil.setOwner(dao.relationshipDAO(), task.getId(), Entity.TASK, owner); - task.setOwner(owner); - } - - @Override - public Task setFields(Task task, Fields fields) throws IOException { - task.setTaskUrl(task.getTaskUrl()); - task.setTaskSQL(task.getTaskSQL()); - task.setStartDate(task.getStartDate()); - task.setEndDate(task.getEndDate()); - task.setService(getService(task)); - task.setOwner(fields.contains("owner") ? getOwner(task) : null); - task.setTags(fields.contains("tags") ? getTags(task.getFullyQualifiedName()) : null); - task.setDownstreamTasks(fields.contains("downstreamTasks") ? task.getDownstreamTasks() : null); - return task; - } - - @Override - public void restorePatchAttributes(Task original, Task updated) throws IOException, ParseException { - - } - - @Override - public EntityInterface getEntityInterface(Task entity) { - return new TaskEntityInterface(entity); - } - - - private List getTags(String fqn) { - return dao.tagDAO().getTags(fqn); - } - - private EntityReference getService(Task task) throws IOException { - return task == null ? null : getService(Objects.requireNonNull(EntityUtil.getService(dao.relationshipDAO(), - task.getId(), Entity.PIPELINE_SERVICE))); - } - - private EntityReference getService(EntityReference service) throws IOException { - if (service.getType().equalsIgnoreCase(Entity.PIPELINE_SERVICE)) { - PipelineService serviceInstance = dao.pipelineServiceDAO().findEntityById(service.getId()); - service.setDescription(serviceInstance.getDescription()); - service.setName(serviceInstance.getName()); - } else { - throw new IllegalArgumentException(String.format("Invalid service type %s for the task", service.getType())); - } - return service; - } - - public void setService(Task task, EntityReference service) throws IOException { - if (service != null && task != null) { - getService(service); // Populate service details - dao.relationshipDAO().insert(service.getId().toString(), task.getId().toString(), service.getType(), - Entity.TASK, Relationship.CONTAINS.ordinal()); - task.setService(service); - } - } - - public static class TaskEntityInterface implements EntityInterface { - private final Task entity; - - public TaskEntityInterface(Task entity) { - this.entity = entity; - } - - @Override - public UUID getId() { - return entity.getId(); - } - - @Override - public String getDescription() { - return entity.getDescription(); - } - - @Override - public String getDisplayName() { - return entity.getDisplayName(); - } - - @Override - public EntityReference getOwner() { - return entity.getOwner(); - } - - @Override - public String getFullyQualifiedName() { - return entity.getFullyQualifiedName(); - } - - @Override - public List getTags() { - return entity.getTags(); - } - - @Override - public Double getVersion() { return entity.getVersion(); } - - @Override - public String getUpdatedBy() { return entity.getUpdatedBy(); } - - @Override - public Date getUpdatedAt() { return entity.getUpdatedAt(); } - - @Override - public URI getHref() { return entity.getHref(); } - - @Override - public EntityReference getEntityReference() { - return new EntityReference().withId(getId()).withName(getFullyQualifiedName()).withDescription(getDescription()) - .withDisplayName(getDisplayName()).withType(Entity.TASK); - } - - @Override - public Task getEntity() { return entity; } - - @Override - public void setId(UUID id) { entity.setId(id); } - - @Override - public void setDescription(String description) { - entity.setDescription(description); - } - - @Override - public void setDisplayName(String displayName) { - entity.setDisplayName(displayName); - } - - @Override - public void setUpdateDetails(String updatedBy, Date updatedAt) { - entity.setUpdatedBy(updatedBy); - entity.setUpdatedAt(updatedAt); - } - - @Override - public void setChangeDescription(Double newVersion, ChangeDescription changeDescription) { - entity.setVersion(newVersion); - entity.setChangeDescription(changeDescription); - } - - @Override - public ChangeDescription getChangeDescription() { return entity.getChangeDescription(); } - - @Override - public void setTags(List tags) { - entity.setTags(tags); - } - } -} diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/databases/DatabaseResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/databases/DatabaseResource.java index 4bc64e1f017..e40a8cb87f9 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/databases/DatabaseResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/databases/DatabaseResource.java @@ -28,7 +28,6 @@ import io.swagger.v3.oas.annotations.parameters.RequestBody; import io.swagger.v3.oas.annotations.responses.ApiResponse; import org.openmetadata.catalog.api.data.CreateDatabase; import org.openmetadata.catalog.entity.data.Database; -import org.openmetadata.catalog.entity.data.Task; import org.openmetadata.catalog.jdbi3.CollectionDAO; import org.openmetadata.catalog.jdbi3.DatabaseRepository; import org.openmetadata.catalog.jdbi3.DatabaseRepository.DatabaseEntityInterface; diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/pipelines/PipelineResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/pipelines/PipelineResource.java index 212ac134d53..5f2b45effd3 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/pipelines/PipelineResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/pipelines/PipelineResource.java @@ -33,6 +33,7 @@ import org.openmetadata.catalog.jdbi3.PipelineRepository; import org.openmetadata.catalog.resources.Collection; import org.openmetadata.catalog.security.CatalogAuthorizer; import org.openmetadata.catalog.security.SecurityUtil; +import org.openmetadata.catalog.type.EntityHistory; import org.openmetadata.catalog.type.EntityReference; import org.openmetadata.catalog.util.EntityUtil; import org.openmetadata.catalog.util.EntityUtil.Fields; @@ -95,9 +96,6 @@ public class PipelineResource { pipeline.setHref(RestUtil.getHref(uriInfo, PIPELINE_COLLECTION_PATH, pipeline.getId())); EntityUtil.addHref(uriInfo, pipeline.getOwner()); EntityUtil.addHref(uriInfo, pipeline.getService()); - if (pipeline.getTasks() != null) { - EntityUtil.addHref(uriInfo, pipeline.getTasks()); - } EntityUtil.addHref(uriInfo, pipeline.getFollowers()); return pipeline; } @@ -170,6 +168,22 @@ public class PipelineResource { return pipelines; } + @GET + @Path("/{id}/versions") + @Operation(summary = "List pipeline versions", tags = "pipelines", + description = "Get a list of all the versions of a pipeline identified by `id`", + responses = {@ApiResponse(responseCode = "200", description = "List of pipeline versions", + content = @Content(mediaType = "application/json", + schema = @Schema(implementation = EntityHistory.class))) + }) + public EntityHistory listVersions(@Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Parameter(description = "pipeline Id", schema = @Schema(type = "string")) + @PathParam("id") String id) + throws IOException, ParseException, GeneralSecurityException { + return dao.listVersions(id); + } + @GET @Path("/{id}") @Operation(summary = "Get a pipeline", tags = "pipelines", @@ -210,6 +224,26 @@ public class PipelineResource { return addHref(uriInfo, pipeline); } + @GET + @Path("/{id}/versions/{version}") + @Operation(summary = "Get a version of the pipeline", tags = "pipelines", + description = "Get a version of the pipeline by given `id`", + responses = { + @ApiResponse(responseCode = "200", description = "pipeline", + content = @Content(mediaType = "application/json", + schema = @Schema(implementation = Pipeline.class))), + @ApiResponse(responseCode = "404", description = "Pipeline for instance {id} and version {version} is " + + "not found") + }) + public Pipeline getVersion(@Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Parameter(description = "Pipeline Id", schema = @Schema(type = "string")) + @PathParam("id") String id, + @Parameter(description = "Pipeline version number in the form `major`.`minor`", + schema = @Schema(type = "string", example = "0.1 or 1.1")) + @PathParam("version") String version) throws IOException, ParseException { + return dao.getVersion(id, version); + } @POST @Operation(summary = "Create a pipeline", tags = "pipelines", diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/tasks/TaskResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/tasks/TaskResource.java deleted file mode 100644 index fb383f7db06..00000000000 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/tasks/TaskResource.java +++ /dev/null @@ -1,331 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.openmetadata.catalog.resources.tasks; - -import com.google.inject.Inject; -import io.swagger.annotations.Api; -import io.swagger.v3.oas.annotations.ExternalDocumentation; -import io.swagger.v3.oas.annotations.Operation; -import io.swagger.v3.oas.annotations.Parameter; -import io.swagger.v3.oas.annotations.media.Content; -import io.swagger.v3.oas.annotations.media.ExampleObject; -import io.swagger.v3.oas.annotations.media.Schema; -import io.swagger.v3.oas.annotations.parameters.RequestBody; -import io.swagger.v3.oas.annotations.responses.ApiResponse; -import org.openmetadata.catalog.api.data.CreateTask; -import org.openmetadata.catalog.entity.data.Task; -import org.openmetadata.catalog.jdbi3.CollectionDAO; -import org.openmetadata.catalog.jdbi3.TaskRepository; -import org.openmetadata.catalog.jdbi3.TaskRepository.TaskEntityInterface; -import org.openmetadata.catalog.resources.Collection; -import org.openmetadata.catalog.security.CatalogAuthorizer; -import org.openmetadata.catalog.security.SecurityUtil; -import org.openmetadata.catalog.type.EntityHistory; -import org.openmetadata.catalog.type.EntityReference; -import org.openmetadata.catalog.util.EntityUtil; -import org.openmetadata.catalog.util.EntityUtil.Fields; -import org.openmetadata.catalog.util.RestUtil; -import org.openmetadata.catalog.util.RestUtil.PutResponse; -import org.openmetadata.catalog.util.ResultList; - -import javax.json.JsonPatch; -import javax.validation.Valid; -import javax.validation.constraints.Max; -import javax.validation.constraints.Min; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.GET; -import javax.ws.rs.PATCH; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.SecurityContext; -import javax.ws.rs.core.UriInfo; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.security.GeneralSecurityException; -import java.text.ParseException; -import java.util.Arrays; -import java.util.Collections; -import java.util.Date; -import java.util.List; -import java.util.Objects; -import java.util.Optional; -import java.util.UUID; - -@Path("/v1/tasks") -@Api(value = "tasks data asset collection", tags = "Task data asset collection") -@Produces(MediaType.APPLICATION_JSON) -@Consumes(MediaType.APPLICATION_JSON) -@Collection(name = "tasks") -public class TaskResource { - private static final String TASK_COLLECTION_PATH = "v1/tasks/"; - private final TaskRepository dao; - private final CatalogAuthorizer authorizer; - - public static void addHref(UriInfo uriInfo, EntityReference ref) { - ref.withHref(RestUtil.getHref(uriInfo, TASK_COLLECTION_PATH, ref.getId())); - } - - public static List addHref(UriInfo uriInfo, List tasks) { - Optional.ofNullable(tasks).orElse(Collections.emptyList()).forEach(i -> addHref(uriInfo, i)); - return tasks; - } - - public static Task addHref(UriInfo uriInfo, Task task) { - task.setHref(RestUtil.getHref(uriInfo, TASK_COLLECTION_PATH, task.getId())); - EntityUtil.addHref(uriInfo, task.getOwner()); - EntityUtil.addHref(uriInfo, task.getService()); - - return task; - } - - @Inject - public TaskResource(CollectionDAO dao, CatalogAuthorizer authorizer) { - Objects.requireNonNull(dao, "TaskRepository must not be null"); - this.dao = new TaskRepository(dao); - this.authorizer = authorizer; - } - - public static class TaskList extends ResultList { - @SuppressWarnings("unused") - TaskList() { - // Empty constructor needed for deserialization - } - - public TaskList(List data, String beforeCursor, String afterCursor, int total) - throws GeneralSecurityException, UnsupportedEncodingException { - super(data, beforeCursor, afterCursor, total); - } - } - - static final String FIELDS = "downstreamTasks,taskConfig,owner,service,tags"; - public static final List FIELD_LIST = Arrays.asList(FIELDS.replaceAll(" ", "") - .split(",")); - - @GET - @Operation(summary = "List tasks", tags = "tasks", - description = "Get a list of tasks, optionally filtered by `service` it belongs to. Use `fields` " + - "parameter to get only necessary fields. Use cursor-based pagination to limit the number " + - "entries in the list using `limit` and `before` or `after` query params.", - responses = { - @ApiResponse(responseCode = "200", description = "List of tasks", - content = @Content(mediaType = "application/json", - schema = @Schema(implementation = TaskList.class))) - }) - public ResultList list(@Context UriInfo uriInfo, - @Context SecurityContext securityContext, - @Parameter(description = "Fields requested in the returned resource", - schema = @Schema(type = "string", example = FIELDS)) - @QueryParam("fields") String fieldsParam, - @Parameter(description = "Filter tasks by service name", - schema = @Schema(type = "string", example = "superset")) - @QueryParam("service") String serviceParam, - @Parameter(description = "Limit the number tasks returned. (1 to 1000000, default = 10)") - @DefaultValue("10") - @QueryParam("limit") @Min(1) @Max(1000000) int limitParam, - @Parameter(description = "Returns list of tasks before this cursor", - schema = @Schema(type = "string")) - @QueryParam("before") String before, - @Parameter(description = "Returns list of tasks after this cursor", - schema = @Schema(type = "string")) - @QueryParam("after") String after - ) throws IOException, GeneralSecurityException, ParseException { - RestUtil.validateCursors(before, after); - Fields fields = new Fields(FIELD_LIST, fieldsParam); - - ResultList tasks; - if (before != null) { // Reverse paging - tasks = dao.listBefore(fields, serviceParam, limitParam, before); // Ask for one extra entry - } else { // Forward paging or first page - tasks = dao.listAfter(fields, serviceParam, limitParam, after); - } - addHref(uriInfo, tasks.getData()); - return tasks; - } - - @GET - @Path("/{id}/versions") - @Operation(summary = "List task versions", tags = "tasks", - description = "Get a list of all the versions of a task identified by `id`", - responses = {@ApiResponse(responseCode = "200", description = "List of task versions", - content = @Content(mediaType = "application/json", - schema = @Schema(implementation = EntityHistory.class))) - }) - public EntityHistory listVersions(@Context UriInfo uriInfo, - @Context SecurityContext securityContext, - @Parameter(description = "task Id", schema = @Schema(type = "string")) - @PathParam("id") String id) - throws IOException, ParseException, GeneralSecurityException { - return dao.listVersions(id); - } - - @GET - @Path("/{id}") - @Operation(summary = "Get a Task", tags = "tasks", - description = "Get a task by `id`.", - responses = { - @ApiResponse(responseCode = "200", description = "The Task", - content = @Content(mediaType = "application/json", - schema = @Schema(implementation = Task.class))), - @ApiResponse(responseCode = "404", description = "Task for instance {id} is not found") - }) - public Task get(@Context UriInfo uriInfo, @PathParam("id") String id, - @Context SecurityContext securityContext, - @Parameter(description = "Fields requested in the returned resource", - schema = @Schema(type = "string", example = FIELDS)) - @QueryParam("fields") String fieldsParam) throws IOException, ParseException { - Fields fields = new Fields(FIELD_LIST, fieldsParam); - return addHref(uriInfo, dao.get(id, fields)); - } - - @GET - @Path("/name/{fqn}") - @Operation(summary = "Get a task by name", tags = "tasks", - description = "Get a task by fully qualified name.", - responses = { - @ApiResponse(responseCode = "200", description = "The task", - content = @Content(mediaType = "application/json", - schema = @Schema(implementation = Task.class))), - @ApiResponse(responseCode = "404", description = "Task for instance {id} is not found") - }) - public Response getByName(@Context UriInfo uriInfo, @PathParam("fqn") String fqn, - @Context SecurityContext securityContext, - @Parameter(description = "Fields requested in the returned resource", - schema = @Schema(type = "string", example = FIELDS)) - @QueryParam("fields") String fieldsParam) throws IOException, ParseException { - Fields fields = new Fields(FIELD_LIST, fieldsParam); - Task task = dao.getByName(fqn, fields); - addHref(uriInfo, task); - return Response.ok(task).build(); - } - - @GET - @Path("/{id}/versions/{version}") - @Operation(summary = "Get a version of the task", tags = "tasks", - description = "Get a version of the task by given `id`", - responses = { - @ApiResponse(responseCode = "200", description = "task", - content = @Content(mediaType = "application/json", - schema = @Schema(implementation = Task.class))), - @ApiResponse(responseCode = "404", description = "Task for instance {id} and version {version} is " + - "not found") - }) - public Task getVersion(@Context UriInfo uriInfo, - @Context SecurityContext securityContext, - @Parameter(description = "Task Id", schema = @Schema(type = "string")) - @PathParam("id") String id, - @Parameter(description = "Task version number in the form `major`.`minor`", - schema = @Schema(type = "string", example = "0.1 or 1.1")) - @PathParam("version") String version) throws IOException, ParseException { - return dao.getVersion(id, version); - } - - @POST - @Operation(summary = "Create a task", tags = "tasks", - description = "Create a task under an existing `service`.", - responses = { - @ApiResponse(responseCode = "200", description = "The task", - content = @Content(mediaType = "application/json", - schema = @Schema(implementation = CreateTask.class))), - @ApiResponse(responseCode = "400", description = "Bad request") - }) - public Response create(@Context UriInfo uriInfo, @Context SecurityContext securityContext, - @Valid CreateTask create) throws IOException, ParseException { - SecurityUtil.checkAdminOrBotRole(authorizer, securityContext); - Task task = getTask(securityContext, create); - task = addHref(uriInfo, dao.create(task)); - return Response.created(task.getHref()).entity(task).build(); - } - - @PATCH - @Path("/{id}") - @Operation(summary = "Update a Task", tags = "task", - description = "Update an existing task using JsonPatch.", - externalDocs = @ExternalDocumentation(description = "JsonPatch RFC", - url = "https://tools.ietf.org/html/rfc6902")) - @Consumes(MediaType.APPLICATION_JSON_PATCH_JSON) - public Task updateDescription(@Context UriInfo uriInfo, - @Context SecurityContext securityContext, - @PathParam("id") String id, - @RequestBody(description = "JsonPatch with array of operations", - content = @Content(mediaType = MediaType.APPLICATION_JSON_PATCH_JSON, - examples = {@ExampleObject("[" + - "{op:remove, path:/a}," + - "{op:add, path: /b, value: val}" + - "]")})) - JsonPatch patch) throws IOException, ParseException { - Fields fields = new Fields(FIELD_LIST, FIELDS); - Task task = dao.get(id, fields); - SecurityUtil.checkAdminRoleOrPermissions(authorizer, securityContext, - new TaskEntityInterface(task).getEntityReference()); - task = dao.patch(UUID.fromString(id), securityContext.getUserPrincipal().getName(), patch); - return addHref(uriInfo, task); - } - - @PUT - @Operation(summary = "Create or update task", tags = "tasks", - description = "Create a task, it it does not exist or update an existing task.", - responses = { - @ApiResponse(responseCode = "200", description = "The updated task ", - content = @Content(mediaType = "application/json", - schema = @Schema(implementation = CreateTask.class))) - }) - public Response createOrUpdate(@Context UriInfo uriInfo, - @Context SecurityContext securityContext, - @Valid CreateTask create) throws IOException, ParseException { - Task task = getTask(securityContext, create).withDownstreamTasks(create.getDownstreamTasks()); - PutResponse response = dao.createOrUpdate(task); - task = addHref(uriInfo, response.getEntity()); - return Response.status(response.getStatus()).entity(task).build(); - } - - @DELETE - @Path("/{id}") - @Operation(summary = "Delete a Task", tags = "tasks", - description = "Delete a task by `id`.", - responses = { - @ApiResponse(responseCode = "200", description = "OK"), - @ApiResponse(responseCode = "404", description = "task for instance {id} is not found") - }) - public Response delete(@Context UriInfo uriInfo, @PathParam("id") String id) { - dao.delete(UUID.fromString(id)); - return Response.ok().build(); - } - - private Task getTask(SecurityContext securityContext, CreateTask create) { - return new Task().withId(UUID.randomUUID()).withName(create.getName()).withDisplayName(create.getDisplayName()) - .withDescription(create.getDescription()) - .withService(create.getService()) - .withStartDate(create.getStartDate()) - .withEndDate(create.getEndDate()) - .withTaskType(create.getTaskType()) - .withTaskSQL(create.getTaskSQL()) - .withTaskUrl(create.getTaskUrl()) - .withTags(create.getTags()) - .withOwner(create.getOwner()) - .withUpdatedBy(securityContext.getUserPrincipal().getName()) - .withUpdatedAt(new Date()); - } -} diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java index 4b36570a493..7351c087dca 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java @@ -39,7 +39,6 @@ import org.openmetadata.catalog.resources.services.dashboard.DashboardServiceRes import org.openmetadata.catalog.resources.services.database.DatabaseServiceResource; import org.openmetadata.catalog.resources.services.messaging.MessagingServiceResource; import org.openmetadata.catalog.resources.services.pipeline.PipelineServiceResource; -import org.openmetadata.catalog.resources.tasks.TaskResource; import org.openmetadata.catalog.resources.teams.TeamResource; import org.openmetadata.catalog.resources.teams.UserResource; import org.openmetadata.catalog.resources.topics.TopicResource; @@ -129,8 +128,6 @@ public final class EntityUtil { DashboardResource.addHref(uriInfo, ref); } else if (entity.equalsIgnoreCase(Entity.MODEL)) { ModelResource.addHref(uriInfo, ref); - } else if (entity.equalsIgnoreCase(Entity.TASK)) { - TaskResource.addHref(uriInfo, ref); } else if (entity.equalsIgnoreCase(Entity.PIPELINE)) { PipelineResource.addHref(uriInfo, ref); } else if (entity.equalsIgnoreCase(Entity.DATABASE_SERVICE)) { @@ -247,8 +244,6 @@ public final class EntityUtil { return dao.topicDAO().findEntityReferenceById(id); } else if (entity.equalsIgnoreCase(Entity.CHART)) { return dao.chartDAO().findEntityReferenceById(id); - } else if (entity.equalsIgnoreCase(Entity.TASK)) { - return dao.taskDAO().findEntityReferenceById(id); } else if (entity.equalsIgnoreCase(Entity.PIPELINE)) { return dao.pipelineDAO().findEntityReferenceById(id); } else if (entity.equalsIgnoreCase(Entity.MODEL)) { @@ -273,8 +268,6 @@ public final class EntityUtil { return dao.chartDAO().findEntityReferenceByName(fqn); } else if (entity.equalsIgnoreCase(Entity.DASHBOARD)) { return dao.dashboardDAO().findEntityReferenceByName(fqn); - } else if (entity.equalsIgnoreCase(Entity.TASK)) { - return dao.taskDAO().findEntityReferenceByName(fqn); } else if (entity.equalsIgnoreCase(Entity.PIPELINE)) { return dao.pipelineDAO().findEntityReferenceByName(fqn); } else if (entity.equalsIgnoreCase(Entity.MODEL)) { diff --git a/catalog-rest-service/src/main/resources/json/schema/api/data/createPipeline.json b/catalog-rest-service/src/main/resources/json/schema/api/data/createPipeline.json index d2cae373d0f..6a313a0b7b3 100644 --- a/catalog-rest-service/src/main/resources/json/schema/api/data/createPipeline.json +++ b/catalog-rest-service/src/main/resources/json/schema/api/data/createPipeline.json @@ -41,7 +41,7 @@ "description": "All the tasks that are part of pipeline.", "type": "array", "items": { - "$ref": "../../type/entityReference.json" + "$ref": "../../entity/data/pipeline.json#/definitions/task" }, "default": null }, diff --git a/catalog-rest-service/src/main/resources/json/schema/api/data/createTask.json b/catalog-rest-service/src/main/resources/json/schema/api/data/createTask.json deleted file mode 100644 index 08831d2ed35..00000000000 --- a/catalog-rest-service/src/main/resources/json/schema/api/data/createTask.json +++ /dev/null @@ -1,71 +0,0 @@ -{ - "$id": "https://open-metadata.org/schema/api/data/createTask.json", - "$schema": "http://json-schema.org/draft-07/schema#", - "title": "Create Task entity request", - "description": "Create Task entity request", - "type": "object", - "properties" : { - "name": { - "description": "Name that identifies this Task.", - "type": "string", - "minLength": 1, - "maxLength": 64 - }, - "displayName": { - "description": "Display Name that identifies this Task. It could be title or label from the pipeline services", - "type": "string" - }, - "description": { - "description": "Description of the task instance. What it has and how to use it.", - "type": "string" - }, - "taskUrl" : { - "description": "Task URL to visit/manage. This URL points to respective pipeline service UI", - "type": "string", - "format": "uri" - }, - "taskType": { - "description": "Type of the Task. Usually refers to the class it implements", - "type": "string" - }, - "taskSQL": { - "description": "SQL used in the task. Can be used to determine the lineage", - "type": "string" - }, - "downstreamTasks": { - "description": "All the tasks that are downstream of this task.", - "type": "array", - "items": { - "type": "string", - "minLength": 1, - "maxLength": 64 - }, - "default": null - }, - "startDate": { - "description": "Start date of the task", - "$ref": "../../type/basic.json#/definitions/dateTime" - }, - "endDate": { - "description": "End date of the task", - "$ref": "../../type/basic.json#/definitions/dateTime" - }, - "tags": { - "description": "Tags for this chart", - "type": "array", - "items": { - "$ref": "../../type/tagLabel.json" - }, - "default": null - }, - "owner": { - "description": "Owner of this Task", - "$ref": "../../type/entityReference.json" - }, - "service" : { - "description": "Link to the pipeline service where this task is used", - "$ref" : "../../type/entityReference.json" - } - }, - "required": ["name", "service"] -} \ No newline at end of file diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/data/pipeline.json b/catalog-rest-service/src/main/resources/json/schema/entity/data/pipeline.json index 692b3407e2c..723820f8742 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/data/pipeline.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/data/pipeline.json @@ -4,6 +4,69 @@ "title": "Pipeline", "description": "This schema defines the Pipeline entity. A pipeline enables the flow of data from source to destination through a series of processing steps. ETL is a type of pipeline where the series of steps Extract, Transform and Load the data.", "type": "object", + "definitions": { + "task": { + "type": "object", + "javaType": "org.openmetadata.catalog.type.Task", + "properties": { + "name": { + "description": "Name that identifies this task instance uniquely.", + "type": "string", + "minLength": 1, + "maxLength": 64 + }, + "displayName": { + "description": "Display Name that identifies this Task. It could be title or label from the pipeline services.", + "type": "string" + }, + "fullyQualifiedName": { + "description": "A unique name that identifies a pipeline in the format 'ServiceName.PipelineName.TaskName'.", + "type": "string", + "minLength": 1, + "maxLength": 64 + }, + "description": { + "description": "Description of this Task.", + "type": "string" + }, + "taskUrl": { + "description": "Task URL to visit/manage. This URL points to respective pipeline service UI.", + "type": "string", + "format": "uri" + }, + "downstreamTasks": { + "description": "All the tasks that are downstream of this task.", + "type": "array", + "items": { + "type": "string", + "minLength": 1, + "maxLength": 64 + }, + "default": null + }, + "taskType": { + "description": "Type of the Task. Usually refers to the class it implements.", + "type": "string" + }, + "taskSQL": { + "description": "SQL used in the task. Can be used to determine the lineage.", + "$ref": "../../type/basic.json#/definitions/sqlQuery" + }, + "tags": { + "description": "Tags for this task.", + "type": "array", + "items": { + "$ref": "../../type/tagLabel.json" + }, + "default": null + } + }, + "required": [ + "id", + "name" + ] + } + }, "properties" : { "id": { "description": "Unique identifier that identifies a pipeline instance.", @@ -62,7 +125,7 @@ "description": "All the tasks that are part of pipeline.", "type": "array", "items": { - "$ref": "../../type/entityReference.json" + "$ref": "#/definitions/task" }, "default": null }, diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/data/task.json b/catalog-rest-service/src/main/resources/json/schema/entity/data/task.json deleted file mode 100644 index acb9b50c787..00000000000 --- a/catalog-rest-service/src/main/resources/json/schema/entity/data/task.json +++ /dev/null @@ -1,102 +0,0 @@ -{ - "$id": "https://open-metadata.org/schema/entity/data/task.json", - "$schema": "http://json-schema.org/draft-07/schema#", - "title": "Task", - "description": "This schema defines the Task entity. A task is a unit of computation in a Pipeline.", - "type": "object", - "javaType": "org.openmetadata.catalog.entity.data.Task", - "properties" : { - "id": { - "description": "Unique identifier that identifies a task instance.", - "$ref": "../../type/basic.json#/definitions/uuid" - }, - "name": { - "description": "Name that identifies this task instance uniquely.", - "type": "string", - "minLength": 1, - "maxLength": 64 - }, - "displayName": { - "description": "Display Name that identifies this Task. It could be title or label from the pipeline services.", - "type": "string" - }, - "fullyQualifiedName": { - "description": "A unique name that identifies a pipeline in the format 'ServiceName.PipelineName.TaskName'.", - "type": "string", - "minLength": 1, - "maxLength": 64 - }, - "description": { - "description": "Description of this Task.", - "type": "string" - }, - "version" : { - "description": "Metadata version of the entity.", - "$ref": "../../type/entityHistory.json#/definitions/entityVersion" - }, - "updatedAt" : { - "description": "Last update time corresponding to the new version of the entity.", - "$ref": "../../type/basic.json#/definitions/dateTime" - }, - "updatedBy" : { - "description": "User who made the update.", - "type": "string" - }, - "taskUrl" : { - "description": "Task URL to visit/manage. This URL points to respective pipeline service UI.", - "type": "string", - "format": "uri" - }, - "downstreamTasks": { - "description": "All the tasks that are downstream of this task.", - "type": "array", - "items": { - "type": "string", - "minLength": 1, - "maxLength": 64 - }, - "default": null - }, - "taskType": { - "description": "Type of the Task. Usually refers to the class it implements.", - "type": "string" - }, - "taskSQL": { - "description": "SQL used in the task. Can be used to determine the lineage.", - "type": "string" - }, - "startDate": { - "description": "Start date of the task.", - "$ref": "../../type/basic.json#/definitions/dateTime" - }, - "endDate": { - "description": "End date of the task.", - "$ref": "../../type/basic.json#/definitions/dateTime" - }, - "tags": { - "description": "Tags for this Pipeline.", - "type": "array", - "items": { - "$ref": "../../type/tagLabel.json" - }, - "default": null - }, - "href": { - "description": "Link to the resource corresponding to this entity.", - "$ref": "../../type/basic.json#/definitions/href" - }, - "owner": { - "description": "Owner of this pipeline.", - "$ref": "../../type/entityReference.json" - }, - "service" : { - "description": "Link to service where this pipeline is hosted in.", - "$ref" : "../../type/entityReference.json" - }, - "changeDescription": { - "description" : "Change that lead to this version of the entity.", - "$ref": "../../type/entityHistory.json#/definitions/changeDescription" - } - }, - "required": ["id", "name", "service"] -} \ No newline at end of file diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/pipelines/PipelineResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/pipelines/PipelineResourceTest.java index f98e5a1d2fc..509b2cda7d0 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/pipelines/PipelineResourceTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/pipelines/PipelineResourceTest.java @@ -22,40 +22,30 @@ import org.joda.time.DateTime; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; -import org.openmetadata.catalog.CatalogApplicationTest; import org.openmetadata.catalog.Entity; import org.openmetadata.catalog.api.data.CreatePipeline; -import org.openmetadata.catalog.api.data.CreateTask; -import org.openmetadata.catalog.api.services.CreatePipelineService; import org.openmetadata.catalog.entity.data.Pipeline; -import org.openmetadata.catalog.entity.data.Task; -import org.openmetadata.catalog.entity.services.PipelineService; -import org.openmetadata.catalog.entity.teams.Team; -import org.openmetadata.catalog.entity.teams.User; import org.openmetadata.catalog.exception.CatalogExceptionMessage; -import org.openmetadata.catalog.jdbi3.PipelineServiceRepository.PipelineServiceEntityInterface; -import org.openmetadata.catalog.jdbi3.TaskRepository.TaskEntityInterface; +import org.openmetadata.catalog.jdbi3.PipelineRepository.PipelineEntityInterface; +import org.openmetadata.catalog.resources.EntityResourceTest; import org.openmetadata.catalog.resources.pipelines.PipelineResource.PipelineList; -import org.openmetadata.catalog.resources.services.PipelineServiceResourceTest; -import org.openmetadata.catalog.resources.tasks.TaskResourceTest; -import org.openmetadata.catalog.resources.teams.TeamResourceTest; -import org.openmetadata.catalog.resources.teams.UserResourceTest; +import org.openmetadata.catalog.type.ChangeDescription; import org.openmetadata.catalog.type.EntityReference; import org.openmetadata.catalog.type.TagLabel; -import org.openmetadata.catalog.util.EntityUtil; +import org.openmetadata.catalog.type.Task; +import org.openmetadata.catalog.util.EntityInterface; import org.openmetadata.catalog.util.JsonUtils; import org.openmetadata.catalog.util.TestUtils; -import org.openmetadata.catalog.util.TestUtils.UpdateType; -import org.openmetadata.common.utils.JsonSchemaUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.json.JsonPatch; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.Response.Status; +import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.Map; @@ -64,7 +54,6 @@ import java.util.UUID; import static java.util.Collections.singletonList; import static javax.ws.rs.core.Response.Status.BAD_REQUEST; import static javax.ws.rs.core.Response.Status.CONFLICT; -import static javax.ws.rs.core.Response.Status.CREATED; import static javax.ws.rs.core.Response.Status.FORBIDDEN; import static javax.ws.rs.core.Response.Status.NOT_FOUND; import static javax.ws.rs.core.Response.Status.OK; @@ -72,53 +61,70 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound; import static org.openmetadata.catalog.util.TestUtils.UpdateType.MINOR_UPDATE; -import static org.openmetadata.catalog.util.TestUtils.UpdateType.NO_CHANGE; import static org.openmetadata.catalog.util.TestUtils.adminAuthHeaders; import static org.openmetadata.catalog.util.TestUtils.assertEntityPagination; import static org.openmetadata.catalog.util.TestUtils.assertResponse; import static org.openmetadata.catalog.util.TestUtils.authHeaders; -public class PipelineResourceTest extends CatalogApplicationTest { +public class PipelineResourceTest extends EntityResourceTest { private static final Logger LOG = LoggerFactory.getLogger(PipelineResourceTest.class); - public static User USER1; - public static EntityReference USER_OWNER1; - public static Team TEAM1; - public static EntityReference TEAM_OWNER1; - public static EntityReference AIRFLOW_REFERENCE; - public static EntityReference PREFECT_REFERENCE; - public static List TASK_REFERENCES; + public static List TASKS; public static final TagLabel TIER_1 = new TagLabel().withTagFQN("Tier.Tier1"); public static final TagLabel USER_ADDRESS_TAG_LABEL = new TagLabel().withTagFQN("User.Address"); + public PipelineResourceTest() { + super(Pipeline.class, "pipelines", PipelineResource.FIELDS); + } + @BeforeAll public static void setup(TestInfo test) throws HttpResponseException, URISyntaxException { - USER1 = UserResourceTest.createUser(UserResourceTest.create(test), authHeaders("test@open-metadata.org")); - USER_OWNER1 = new EntityReference().withId(USER1.getId()).withType("user"); - - TEAM1 = TeamResourceTest.createTeam(TeamResourceTest.create(test), adminAuthHeaders()); - TEAM_OWNER1 = new EntityReference().withId(TEAM1.getId()).withType("team"); - - CreatePipelineService createService = new CreatePipelineService().withName("airflow") - .withServiceType(CreatePipelineService.PipelineServiceType.Airflow) - .withPipelineUrl(TestUtils.PIPELINE_URL); - - PipelineService service = PipelineServiceResourceTest.createService(createService, adminAuthHeaders()); - AIRFLOW_REFERENCE = new PipelineServiceEntityInterface(service).getEntityReference(); - - createService.withName("prefect").withServiceType(CreatePipelineService.PipelineServiceType.Prefect); - service = PipelineServiceResourceTest.createService(createService, adminAuthHeaders()); - PREFECT_REFERENCE = new PipelineServiceEntityInterface(service).getEntityReference(); - TASK_REFERENCES = new ArrayList<>(); + EntityResourceTest.setup(test); + TASKS = new ArrayList<>(); for (int i=0; i < 3; i++) { - CreateTask createTask = TaskResourceTest.create(test, i).withService(AIRFLOW_REFERENCE); - Task task = TaskResourceTest.createTask(createTask, adminAuthHeaders()); - TASK_REFERENCES.add(new TaskEntityInterface(task).getEntityReference()); + Task task = new Task().withName("task" + i).withDescription("description") + .withDisplayName("displayName").withTaskUrl(new URI("http://localhost:0")); + TASKS.add(task); } + } + @Override + public Object createRequest(TestInfo test, String description, String displayName, EntityReference owner) { + return create(test).withDescription(description).withDisplayName(displayName).withOwner(owner); + } + + @Override + public void validateCreatedEntity(Pipeline pipeline, Object request, Map authHeaders) + throws HttpResponseException { + CreatePipeline createRequest = (CreatePipeline) request; + validateCommonEntityFields(getEntityInterface(pipeline), createRequest.getDescription(), + TestUtils.getPrincipal(authHeaders), createRequest.getOwner()); + assertEquals(createRequest.getDisplayName(), pipeline.getDisplayName()); + assertService(createRequest.getService(), pipeline.getService()); + validatePipelineTasks(pipeline, createRequest.getTasks()); + TestUtils.validateTags(pipeline.getFullyQualifiedName(), createRequest.getTags(), pipeline.getTags()); + } + + @Override + public void validateUpdatedEntity(Pipeline pipeline, Object request, Map authHeaders) throws HttpResponseException { + validateCreatedEntity(pipeline, request, authHeaders); + } + + @Override + public void validatePatchedEntity(Pipeline expected, Pipeline updated, Map authHeaders) throws HttpResponseException { + validateCommonEntityFields(getEntityInterface(updated), expected.getDescription(), + TestUtils.getPrincipal(authHeaders), expected.getOwner()); + assertEquals(expected.getDisplayName(), updated.getDisplayName()); + assertService(expected.getService(), updated.getService()); + validatePipelineTasks(updated, expected.getTasks()); + TestUtils.validateTags(updated.getFullyQualifiedName(), expected.getTags(), updated.getTags()); + } + + @Override + public EntityInterface getEntityInterface(Pipeline entity) { + return new PipelineEntityInterface(entity); } @Test @@ -152,25 +158,25 @@ public class PipelineResourceTest extends CatalogApplicationTest { public void post_validPipelines_as_admin_200_OK(TestInfo test) throws HttpResponseException { // Create team with different optional fields CreatePipeline create = create(test); - createAndCheckPipeline(create, adminAuthHeaders()); + createAndCheckEntity(create, adminAuthHeaders()); create.withName(getPipelineName(test, 1)).withDescription("description"); - createAndCheckPipeline(create, adminAuthHeaders()); + createAndCheckEntity(create, adminAuthHeaders()); } @Test public void post_PipelineWithUserOwner_200_ok(TestInfo test) throws HttpResponseException { - createAndCheckPipeline(create(test).withOwner(USER_OWNER1), adminAuthHeaders()); + createAndCheckEntity(create(test).withOwner(USER_OWNER1), adminAuthHeaders()); } @Test public void post_PipelineWithTeamOwner_200_ok(TestInfo test) throws HttpResponseException { - createAndCheckPipeline(create(test).withOwner(TEAM_OWNER1).withDisplayName("Pipeline1"), adminAuthHeaders()); + createAndCheckEntity(create(test).withOwner(TEAM_OWNER1).withDisplayName("Pipeline1"), adminAuthHeaders()); } @Test public void post_PipelineWithTasks_200_ok(TestInfo test) throws HttpResponseException { - createAndCheckPipeline(create(test), TASK_REFERENCES, adminAuthHeaders()); + createAndCheckEntity(create(test).withTasks(TASKS), adminAuthHeaders()); } @Test @@ -214,7 +220,7 @@ public class PipelineResourceTest extends CatalogApplicationTest { // Create Pipeline for each service and test APIs for (EntityReference service : differentServices) { - createAndCheckPipeline(create(test).withService(service), adminAuthHeaders()); + createAndCheckEntity(create(test).withService(service), adminAuthHeaders()); // List Pipelines by filtering on service name and ensure right Pipelines are returned in the response PipelineList list = listPipelines("service", service.getName(), adminAuthHeaders()); @@ -310,71 +316,11 @@ public class PipelineResourceTest extends CatalogApplicationTest { LOG.info("before {} after {} ", list.getPaging().getBefore(), list.getPaging().getAfter()); } - @Test - public void put_PipelineUpdateWithNoChange_200(TestInfo test) throws HttpResponseException { - // Create a Pipeline with POST - CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withOwner(USER_OWNER1); - Pipeline pipeline = createAndCheckPipeline(request, adminAuthHeaders()); - - // Update Pipeline two times successfully with PUT requests - pipeline = updateAndCheckPipeline(pipeline, request, OK, adminAuthHeaders(), NO_CHANGE); - updateAndCheckPipeline(pipeline, request, OK, adminAuthHeaders(), NO_CHANGE); - } - - @Test - public void put_PipelineCreate_200(TestInfo test) throws HttpResponseException { - // Create a new Pipeline with PUT - CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withOwner(USER_OWNER1); - updateAndCheckPipeline(null, request.withName(test.getDisplayName()).withDescription(null), CREATED, - adminAuthHeaders(), NO_CHANGE); - } - - @Test - public void put_PipelineCreate_as_owner_200(TestInfo test) throws HttpResponseException { - // Create a new Pipeline with put - CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withOwner(USER_OWNER1); - // Add pipeline as admin - Pipeline pipeline = createAndCheckPipeline(request, adminAuthHeaders()); - // Update the pipeline as user owner - updateAndCheckPipeline(pipeline, request.withDescription("new"), OK, authHeaders(USER1.getEmail()), MINOR_UPDATE); - } - - @Test - public void put_PipelineNullDescriptionUpdate_200(TestInfo test) throws HttpResponseException { - CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription(null); - Pipeline pipeline = createAndCheckPipeline(request, adminAuthHeaders()); - - // Update null description with a new description - pipeline = updateAndCheckPipeline(pipeline, request.withDisplayName("Pipeline1"). - withDescription("newDescription"), OK, adminAuthHeaders(), MINOR_UPDATE); - assertEquals("Pipeline1", pipeline.getDisplayName()); // TODO move this to common validate - } - - @Test - public void put_PipelineEmptyDescriptionUpdate_200(TestInfo test) throws HttpResponseException { - // Create table with empty description - CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription(""); - Pipeline pipeline = createAndCheckPipeline(request, adminAuthHeaders()); - - // Update empty description with a new description - updateAndCheckPipeline(pipeline, request.withDescription("newDescription"), OK, adminAuthHeaders(), MINOR_UPDATE); - } - - @Test - public void put_PipelineNonEmptyDescriptionUpdate_200(TestInfo test) throws HttpResponseException { - CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription("description"); - createAndCheckPipeline(request, adminAuthHeaders()); - - // Updating description is ignored when backend already has description - Pipeline db = updatePipeline(request.withDescription("newDescription"), OK, adminAuthHeaders()); - assertEquals("description", db.getDescription()); - } - @Test public void put_PipelineUrlUpdate_200(TestInfo test) throws HttpResponseException, URISyntaxException { CreatePipeline request = create(test).withService(new EntityReference().withId(AIRFLOW_REFERENCE.getId()) .withType("pipelineService")).withDescription("description"); - createAndCheckPipeline(request, adminAuthHeaders()); + createAndCheckEntity(request, adminAuthHeaders()); URI pipelineURI = new URI("https://airflow.open-metadata.org/tree?dag_id=airflow_redshift_usage"); Integer pipelineConcurrency = 110; Date startDate = new DateTime("2021-11-13T20:20:39+00:00").toDate(); @@ -391,43 +337,34 @@ public class PipelineResourceTest extends CatalogApplicationTest { } @Test - public void put_PipelineUpdateOwner_200(TestInfo test) throws HttpResponseException { - CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription(""); - Pipeline pipeline = createAndCheckPipeline(request, adminAuthHeaders()); - - // Change ownership from USER_OWNER1 to TEAM_OWNER1 - pipeline = updateAndCheckPipeline(pipeline, request.withOwner(TEAM_OWNER1), OK, adminAuthHeaders(), MINOR_UPDATE); - - // Remove ownership - pipeline = updateAndCheckPipeline(pipeline, request.withOwner(null), OK, adminAuthHeaders(), MINOR_UPDATE); - assertNull(pipeline.getOwner()); - } - - - @Test - public void put_PipelineTasksUpdate_200(TestInfo test) throws HttpResponseException { + public void put_PipelineTasksUpdate_200(TestInfo test) throws IOException { CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription(null); - Pipeline pipeline = createAndCheckPipeline(request, adminAuthHeaders()); - pipeline = updateAndCheckPipeline(pipeline, request.withDescription("newDescription").withTasks(TASK_REFERENCES), - OK, adminAuthHeaders(), MINOR_UPDATE); - validatePipelineTasks(pipeline, TASK_REFERENCES); // TODO clean this up + Pipeline pipeline = createAndCheckEntity(request, adminAuthHeaders()); + + // Add description and tasks + ChangeDescription change = getChangeDescription(pipeline.getVersion()) + .withFieldsAdded(Arrays.asList("description", "tasks")); + pipeline = updateAndCheckEntity(request.withDescription("newDescription").withTasks(TASKS), + OK, adminAuthHeaders(), MINOR_UPDATE, change); + validatePipelineTasks(pipeline, TASKS); // TODO clean this up } @Test - public void put_AddRemovePipelineTasksUpdate_200(TestInfo test) throws HttpResponseException { - CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription(null); - Pipeline pipeline = createAndCheckPipeline(request, adminAuthHeaders()); + public void put_AddRemovePipelineTasksUpdate_200(TestInfo test) throws IOException { + CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription(null).withTasks(null); + Pipeline pipeline = createAndCheckEntity(request, adminAuthHeaders()); - // Add tasks - pipeline = updateAndCheckPipeline(pipeline, request.withDescription("newDescription").withTasks(TASK_REFERENCES), - OK, adminAuthHeaders(), MINOR_UPDATE); - validatePipelineTasks(pipeline, TASK_REFERENCES); + // Add tasks and description + ChangeDescription change = getChangeDescription(pipeline.getVersion()) + .withFieldsAdded(Arrays.asList("description", "tasks")); + pipeline = updateAndCheckEntity(request.withDescription("newDescription").withTasks(TASKS), + OK, adminAuthHeaders(), MINOR_UPDATE, change); + // TODO update this once task removal is figured out // remove a task - TASK_REFERENCES.remove(0); - pipeline = updateAndCheckPipeline(pipeline, request.withDescription("newDescription").withTasks(TASK_REFERENCES), - OK, adminAuthHeaders(), MINOR_UPDATE); - validatePipelineTasks(pipeline, TASK_REFERENCES); + // TASKS.remove(0); + // change = getChangeDescription(pipeline.getVersion()).withFieldsUpdated(singletonList("tasks")); + //updateAndCheckEntity(request.withTasks(TASKS), OK, adminAuthHeaders(), MINOR_UPDATE, change); } @Test @@ -441,16 +378,16 @@ public class PipelineResourceTest extends CatalogApplicationTest { @Test public void get_PipelineWithDifferentFields_200_OK(TestInfo test) throws HttpResponseException { CreatePipeline create = create(test).withDescription("description").withOwner(USER_OWNER1) - .withService(AIRFLOW_REFERENCE); - Pipeline pipeline = createAndCheckPipeline(create, adminAuthHeaders()); + .withService(AIRFLOW_REFERENCE).withTasks(TASKS); + Pipeline pipeline = createAndCheckEntity(create, adminAuthHeaders()); validateGetWithDifferentFields(pipeline, false); } @Test public void get_PipelineByNameWithDifferentFields_200_OK(TestInfo test) throws HttpResponseException { CreatePipeline create = create(test).withDescription("description").withOwner(USER_OWNER1) - .withService(AIRFLOW_REFERENCE); - Pipeline pipeline = createAndCheckPipeline(create, adminAuthHeaders()); + .withService(AIRFLOW_REFERENCE).withTasks(TASKS); + Pipeline pipeline = createAndCheckEntity(create, adminAuthHeaders()); validateGetWithDifferentFields(pipeline, true); } @@ -466,21 +403,31 @@ public class PipelineResourceTest extends CatalogApplicationTest { pipeline.getService().setHref(null); // href is readonly and not patchable List pipelineTags = singletonList(TIER_1); - // Add description, owner when previously they were null - pipeline = patchPipelineAttributesAndCheck(pipeline, "description", - TEAM_OWNER1, pipelineTags, adminAuthHeaders(), MINOR_UPDATE); + // Add description, owner and tags when previously they were null + String origJson = JsonUtils.pojoToJson(pipeline); + pipeline.withDescription("description").withOwner(TEAM_OWNER1).withTags(pipelineTags); + ChangeDescription change = getChangeDescription(pipeline.getVersion()) + .withFieldsAdded(Arrays.asList("description", "owner", "tags")); + pipeline = patchEntityAndCheck(pipeline, origJson, adminAuthHeaders(), MINOR_UPDATE, change); pipeline.setOwner(TEAM_OWNER1); // Get rid of href and name returned in the response for owner pipeline.setService(AIRFLOW_REFERENCE); // Get rid of href and name returned in the response for service pipelineTags = singletonList(USER_ADDRESS_TAG_LABEL); - // Replace description, tier, owner - pipeline = patchPipelineAttributesAndCheck(pipeline, "description1", - USER_OWNER1, pipelineTags, adminAuthHeaders(), MINOR_UPDATE); + // Replace description, tags, owner + origJson = JsonUtils.pojoToJson(pipeline); + pipeline.withDescription("description1").withOwner(USER_OWNER1).withTags(pipelineTags); + change = getChangeDescription(pipeline.getVersion()) + .withFieldsUpdated(Arrays.asList("description", "owner", "tags")); + pipeline = patchEntityAndCheck(pipeline, origJson, adminAuthHeaders(), MINOR_UPDATE, change); pipeline.setOwner(USER_OWNER1); // Get rid of href and name returned in the response for owner pipeline.setService(AIRFLOW_REFERENCE); // Get rid of href and name returned in the response for service // Remove description, tier, owner - patchPipelineAttributesAndCheck(pipeline, null, null, pipelineTags, adminAuthHeaders(), MINOR_UPDATE); + origJson = JsonUtils.pojoToJson(pipeline); + pipeline.withDescription(null).withOwner(null).withTags(null); + change = getChangeDescription(pipeline.getVersion()) + .withFieldsDeleted(Arrays.asList("description", "owner", "tags")); + patchEntityAndCheck(pipeline, origJson, adminAuthHeaders(), MINOR_UPDATE, change); } // TODO listing tables test:1 @@ -504,58 +451,6 @@ public class PipelineResourceTest extends CatalogApplicationTest { assertResponse(exception, NOT_FOUND, entityNotFound(Entity.PIPELINE, TestUtils.NON_EXISTENT_ENTITY)); } - public static Pipeline createAndCheckPipeline(CreatePipeline create, - Map authHeaders) throws HttpResponseException { - String updatedBy = TestUtils.getPrincipal(authHeaders); - Pipeline pipeline = createPipeline(create, authHeaders); - validatePipeline(pipeline, create.getDisplayName(), - create.getDescription(), create.getOwner(), create.getService(), updatedBy); - return getAndValidate(pipeline.getId(), create, authHeaders, updatedBy); - } - - public static Pipeline createAndCheckPipeline(CreatePipeline create, List tasks, - Map authHeaders) throws HttpResponseException { - String updatedBy = TestUtils.getPrincipal(authHeaders); - create.withTasks(tasks); - Pipeline pipeline = createPipeline(create, authHeaders); - assertEquals(0.1, pipeline.getVersion()); - validatePipeline(pipeline, create.getDescription(), create.getOwner(), create.getService(), create.getTags(), - tasks, updatedBy); - return getAndValidate(pipeline.getId(), create, authHeaders, updatedBy); - } - - public static Pipeline updateAndCheckPipeline(Pipeline before, CreatePipeline create, Status status, - Map authHeaders, UpdateType updateType) - throws HttpResponseException { - String updatedBy = TestUtils.getPrincipal(authHeaders); - Pipeline updatedPipeline = updatePipeline(create, status, authHeaders); - validatePipeline(updatedPipeline, create.getDescription(), create.getOwner(), create.getService(), updatedBy); - if (before == null) { - assertEquals(0.1, updatedPipeline.getVersion()); // First version created - } else { - TestUtils.validateUpdate(before.getVersion(), updatedPipeline.getVersion(), updateType); - } - - // GET the newly updated Pipeline and validate - return getAndValidate(updatedPipeline.getId(), create, authHeaders, updatedBy); - } - - // Make sure in GET operations the returned Pipeline has all the required information passed during creation - public static Pipeline getAndValidate(UUID pipelineId, - CreatePipeline create, - Map authHeaders, - String expectedUpdatedBy) throws HttpResponseException { - // GET the newly created Pipeline by ID and validate - Pipeline pipeline = getPipeline(pipelineId, "service,owner,tasks", authHeaders); - validatePipeline(pipeline, create.getDescription(), create.getOwner(), create.getService(), expectedUpdatedBy); - - // GET the newly created Pipeline by name and validate - String fqn = pipeline.getFullyQualifiedName(); - pipeline = getPipelineByName(fqn, "service,owner,tasks", authHeaders); - return validatePipeline(pipeline, create.getDescription(), create.getOwner(), create.getService(), - expectedUpdatedBy); - } - public static Pipeline updatePipeline(CreatePipeline create, Status status, Map authHeaders) throws HttpResponseException { @@ -593,126 +488,10 @@ public class PipelineResourceTest extends CatalogApplicationTest { assertNotNull(pipeline.getOwner()); assertNotNull(pipeline.getService()); assertNotNull(pipeline.getTasks()); - TestUtils.validateEntityReference(pipeline.getTasks()); - } - private static Pipeline validatePipeline(Pipeline pipeline, String expectedDisplayName, String expectedDescription, - EntityReference expectedOwner, EntityReference expectedService, - String expectedUpdatedBy) { - Pipeline newPipeline = validatePipeline(pipeline, expectedDescription, expectedOwner, expectedService, - expectedUpdatedBy); - assertEquals(expectedDisplayName, newPipeline.getDisplayName()); - assertEquals(expectedUpdatedBy, newPipeline.getUpdatedBy()); - return newPipeline; - } - - private static Pipeline validatePipeline(Pipeline pipeline, String expectedDescription, - EntityReference expectedOwner, EntityReference expectedService, - String expectedUpdatedBy) { - assertNotNull(pipeline.getId()); - assertNotNull(pipeline.getHref()); - assertEquals(expectedDescription, pipeline.getDescription()); - assertEquals(expectedUpdatedBy, pipeline.getUpdatedBy()); - - // Validate owner - if (expectedOwner != null) { - TestUtils.validateEntityReference(pipeline.getOwner()); - assertEquals(expectedOwner.getId(), pipeline.getOwner().getId()); - assertEquals(expectedOwner.getType(), pipeline.getOwner().getType()); - assertNotNull(pipeline.getOwner().getHref()); - } - - // Validate service - if (expectedService != null) { - TestUtils.validateEntityReference(pipeline.getService()); - assertEquals(expectedService.getId(), pipeline.getService().getId()); - assertEquals(expectedService.getType(), pipeline.getService().getType()); - } - return pipeline; - } - - private static Pipeline validatePipeline(Pipeline pipeline, String expectedDescription, - EntityReference expectedOwner, EntityReference expectedService, - List expectedTags, List tasks, - String expectedUpdatedBy) throws HttpResponseException { - assertNotNull(pipeline.getId()); - assertNotNull(pipeline.getHref()); - assertEquals(expectedDescription, pipeline.getDescription()); - assertEquals(expectedUpdatedBy, pipeline.getUpdatedBy()); - - // Validate owner - if (expectedOwner != null) { - TestUtils.validateEntityReference(pipeline.getOwner()); - assertEquals(expectedOwner.getId(), pipeline.getOwner().getId()); - assertEquals(expectedOwner.getType(), pipeline.getOwner().getType()); - assertNotNull(pipeline.getOwner().getHref()); - } - - // Validate service - if (expectedService != null) { - TestUtils.validateEntityReference(pipeline.getService()); - assertEquals(expectedService.getId(), pipeline.getService().getId()); - assertEquals(expectedService.getType(), pipeline.getService().getType()); - } - validatePipelineTasks(pipeline, tasks); - TestUtils.validateTags(pipeline.getFullyQualifiedName(), expectedTags, pipeline.getTags()); - return pipeline; - } - - private static void validatePipelineTasks(Pipeline pipeline, List tasks) { - if (tasks != null) { - List expectedTaskReferences = new ArrayList<>(); - for (EntityReference task: tasks) { - expectedTaskReferences.add(task.getId()); - } - List actualTaskReferences = new ArrayList<>(); - for (EntityReference task: pipeline.getTasks()) { - TestUtils.validateEntityReference(task); - actualTaskReferences.add(task.getId()); - } - assertTrue(actualTaskReferences.containsAll(expectedTaskReferences)); - } - } - - private Pipeline patchPipelineAttributesAndCheck(Pipeline before, String newDescription, - EntityReference newOwner, List tags, - Map authHeaders, UpdateType updateType) - throws JsonProcessingException, HttpResponseException { - String updatedBy = TestUtils.getPrincipal(authHeaders); - String pipelineJson = JsonUtils.pojoToJson(before); - - // Update the table attributes - before.setDescription(newDescription); - before.setOwner(newOwner); - before.setTags(tags); - - // Validate information returned in patch response has the updates - Pipeline updatedPipeline = patchPipeline(pipelineJson, before, authHeaders); - validatePipeline(updatedPipeline, before.getDescription(), newOwner, null, tags, - before.getTasks(), updatedBy); - TestUtils.validateUpdate(before.getVersion(), updatedPipeline.getVersion(), updateType); - - // GET the table and Validate information returned - Pipeline getPipeline = getPipeline(before.getId(), "service,owner", authHeaders); - validatePipeline(updatedPipeline, getPipeline.getDescription(), newOwner, null, tags, - getPipeline.getTasks(), updatedBy); - return updatedPipeline; - } - - private Pipeline patchPipeline(UUID pipelineId, String originalJson, Pipeline updatedPipeline, - Map authHeaders) - throws JsonProcessingException, HttpResponseException { - String updatePipelineJson = JsonUtils.pojoToJson(updatedPipeline); - JsonPatch patch = JsonSchemaUtil.getJsonPatch(originalJson, updatePipelineJson); - return TestUtils.patch(getResource("pipelines/" + pipelineId), patch, Pipeline.class, authHeaders); - } - - private Pipeline patchPipeline(String originalJson, - Pipeline updatedPipeline, - Map authHeaders) - throws JsonProcessingException, HttpResponseException { - return patchPipeline(updatedPipeline.getId(), originalJson, updatedPipeline, authHeaders); + private static void validatePipelineTasks(Pipeline pipeline, List expectedTasks) { + assertEquals(expectedTasks, pipeline.getTasks()); } public static void getPipeline(UUID id, Map authHeaders) throws HttpResponseException { diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/tasks/TaskResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/tasks/TaskResourceTest.java deleted file mode 100644 index 1c49d10cd4c..00000000000 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/tasks/TaskResourceTest.java +++ /dev/null @@ -1,506 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.openmetadata.catalog.resources.tasks; - -import com.fasterxml.jackson.core.JsonProcessingException; -import org.apache.http.client.HttpResponseException; -import org.joda.time.DateTime; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInfo; -import org.openmetadata.catalog.CatalogApplicationTest; -import org.openmetadata.catalog.Entity; -import org.openmetadata.catalog.api.data.CreateTask; -import org.openmetadata.catalog.api.services.CreatePipelineService; -import org.openmetadata.catalog.entity.data.Task; -import org.openmetadata.catalog.entity.services.PipelineService; -import org.openmetadata.catalog.exception.CatalogExceptionMessage; -import org.openmetadata.catalog.jdbi3.TaskRepository.TaskEntityInterface; -import org.openmetadata.catalog.resources.EntityResourceTest; -import org.openmetadata.catalog.resources.tasks.TaskResource.TaskList; -import org.openmetadata.catalog.type.ChangeDescription; -import org.openmetadata.catalog.type.EntityReference; -import org.openmetadata.catalog.type.TagLabel; -import org.openmetadata.catalog.util.EntityInterface; -import org.openmetadata.catalog.util.JsonUtils; -import org.openmetadata.catalog.util.TestUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.ws.rs.client.WebTarget; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Arrays; -import java.util.Collections; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.UUID; - -import static javax.ws.rs.core.Response.Status.BAD_REQUEST; -import static javax.ws.rs.core.Response.Status.CONFLICT; -import static javax.ws.rs.core.Response.Status.FORBIDDEN; -import static javax.ws.rs.core.Response.Status.NOT_FOUND; -import static javax.ws.rs.core.Response.Status.OK; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound; -import static org.openmetadata.catalog.util.TestUtils.LONG_ENTITY_NAME; -import static org.openmetadata.catalog.util.TestUtils.NON_EXISTENT_ENTITY; -import static org.openmetadata.catalog.util.TestUtils.UpdateType.MINOR_UPDATE; -import static org.openmetadata.catalog.util.TestUtils.adminAuthHeaders; -import static org.openmetadata.catalog.util.TestUtils.assertEntityPagination; -import static org.openmetadata.catalog.util.TestUtils.assertResponse; -import static org.openmetadata.catalog.util.TestUtils.authHeaders; - - -public class TaskResourceTest extends EntityResourceTest { - private static final Logger LOG = LoggerFactory.getLogger(TaskResourceTest.class); - - public TaskResourceTest() { - super(Task.class, "tasks", TaskResource.FIELDS); - } - - @BeforeAll - public static void setup(TestInfo test) throws HttpResponseException, URISyntaxException { - EntityResourceTest.setup(test); - - } - - @Override - public Object createRequest(TestInfo test, String description, String displayName, EntityReference owner) - throws URISyntaxException { - return create(test).withDescription(description).withDisplayName(displayName).withOwner(owner); - } - - @Override - public void validateCreatedEntity(Task task, Object request, Map authHeaders) throws HttpResponseException { - CreateTask createRequest = (CreateTask) request; - validateCommonEntityFields(getEntityInterface(task), createRequest.getDescription(), - TestUtils.getPrincipal(authHeaders), createRequest.getOwner()); - - assertEquals(createRequest.getTaskUrl(), task.getTaskUrl()); - assertService(createRequest.getService(), task.getService()); - TestUtils.validateTags(task.getFullyQualifiedName(), createRequest.getTags(), task.getTags()); - } - - @Override - public void validateUpdatedEntity(Task updatedEntity, Object request, Map authHeaders) throws HttpResponseException { - validateCreatedEntity(updatedEntity, request, authHeaders); - } - - @Override - public void validatePatchedEntity(Task expected, Task patched, Map authHeaders) throws HttpResponseException { - validateCommonEntityFields(getEntityInterface(patched), expected.getDescription(), - TestUtils.getPrincipal(authHeaders), expected.getOwner()); - - // Entity specific validation - assertEquals(expected.getTaskUrl(), patched.getTaskUrl()); - assertService(expected.getService(), patched.getService()); - TestUtils.validateTags(expected.getFullyQualifiedName(), expected.getTags(), patched.getTags()); - } - - @Override - public EntityInterface getEntityInterface(Task entity) { - return new TaskEntityInterface(entity); - } - - @Test - public void post_taskWithLongName_400_badRequest(TestInfo test) throws URISyntaxException { - // Create task with mandatory name field empty - CreateTask create = create(test).withName(LONG_ENTITY_NAME); - HttpResponseException exception = assertThrows(HttpResponseException.class, () -> - createTask(create, adminAuthHeaders())); - assertResponse(exception, BAD_REQUEST, "[name size must be between 1 and 64]"); - } - - @Test - public void post_taskAlreadyExists_409_conflict(TestInfo test) throws HttpResponseException, URISyntaxException { - CreateTask create = create(test); - createTask(create, adminAuthHeaders()); - HttpResponseException exception = assertThrows(HttpResponseException.class, () -> - createTask(create, adminAuthHeaders())); - assertResponse(exception, CONFLICT, CatalogExceptionMessage.ENTITY_ALREADY_EXISTS); - } - - @Test - public void post_validTasks_as_admin_200_OK(TestInfo test) throws HttpResponseException, URISyntaxException { - // Create team with different optional fields - CreateTask create = create(test); - createAndCheckEntity(create, adminAuthHeaders()); - - create.withName(getTaskName(test, 1)).withDescription("description"); - createAndCheckEntity(create, adminAuthHeaders()); - } - - @Test - public void post_taskWithUserOwner_200_ok(TestInfo test) throws HttpResponseException, URISyntaxException { - createAndCheckEntity(create(test).withOwner(USER_OWNER1), adminAuthHeaders()); - } - - @Test - public void post_taskWithTeamOwner_200_ok(TestInfo test) throws HttpResponseException, URISyntaxException { - createAndCheckEntity(create(test).withOwner(TEAM_OWNER1).withDisplayName("chart1"), adminAuthHeaders()); - } - - @Test - public void post_task_as_non_admin_401(TestInfo test) throws URISyntaxException { - CreateTask create = create(test); - HttpResponseException exception = assertThrows(HttpResponseException.class, () -> - createTask(create, authHeaders("test@open-metadata.org"))); - assertResponse(exception, FORBIDDEN, "Principal: CatalogPrincipal{name='test'} is not admin"); - } - - @Test - public void post_taskWithoutRequiredFields_4xx(TestInfo test) { - HttpResponseException exception = assertThrows(HttpResponseException.class, () -> - createTask(create(test).withName(null), adminAuthHeaders())); - assertResponse(exception, BAD_REQUEST, "[name must not be null]"); - - exception = assertThrows(HttpResponseException.class, () -> - createTask(create(test).withName(LONG_ENTITY_NAME), adminAuthHeaders())); - assertResponse(exception, BAD_REQUEST, "[name size must be between 1 and 64]"); - - // Service is required field - exception = assertThrows(HttpResponseException.class, () -> - createTask(create(test).withService(null), adminAuthHeaders())); - assertResponse(exception, BAD_REQUEST, "[service must not be null]"); - - } - - @Test - public void post_taskWithInvalidOwnerType_4xx(TestInfo test) throws URISyntaxException { - EntityReference owner = new EntityReference().withId(TEAM1.getId()); /* No owner type is set */ - - CreateTask create = create(test).withOwner(owner); - HttpResponseException exception = assertThrows(HttpResponseException.class, () -> - createTask(create, adminAuthHeaders())); - TestUtils.assertResponseContains(exception, BAD_REQUEST, "type must not be null"); - } - - @Test - public void post_taskWithNonExistentOwner_4xx(TestInfo test) throws URISyntaxException { - EntityReference owner = new EntityReference().withId(NON_EXISTENT_ENTITY).withType("user"); - CreateTask create = create(test).withOwner(owner); - HttpResponseException exception = assertThrows(HttpResponseException.class, () -> - createTask(create, adminAuthHeaders())); - assertResponse(exception, NOT_FOUND, entityNotFound("User", NON_EXISTENT_ENTITY)); - } - - @Test - public void post_taskWithDifferentService_200_ok(TestInfo test) throws HttpResponseException, URISyntaxException { - EntityReference[] differentServices = {AIRFLOW_REFERENCE, PREFECT_REFERENCE}; - - // Create task for each service and test APIs - for (EntityReference service : differentServices) { - createAndCheckEntity(create(test).withService(service), adminAuthHeaders()); - - // List tasks by filtering on service name and ensure right tasks are returned in the response - TaskList list = listTasks("service", service.getName(), adminAuthHeaders()); - for (Task task : list.getData()) { - assertEquals(service.getName(), task.getService().getName()); - } - } - } - - @Test - public void get_taskListWithInvalidLimitOffset_4xx() { - // Limit must be >= 1 and <= 1000,000 - HttpResponseException exception = assertThrows(HttpResponseException.class, () - -> listTasks(null, null, -1, null, null, adminAuthHeaders())); - assertResponse(exception, BAD_REQUEST, "[query param limit must be greater than or equal to 1]"); - - exception = assertThrows(HttpResponseException.class, () - -> listTasks(null, null, 0, null, null, adminAuthHeaders())); - assertResponse(exception, BAD_REQUEST, "[query param limit must be greater than or equal to 1]"); - - exception = assertThrows(HttpResponseException.class, () - -> listTasks(null, null, 1000001, null, null, adminAuthHeaders())); - assertResponse(exception, BAD_REQUEST, "[query param limit must be less than or equal to 1000000]"); - } - - @Test - public void get_taskListWithInvalidPaginationCursors_4xx() { - // Passing both before and after cursors is invalid - HttpResponseException exception = assertThrows(HttpResponseException.class, () - -> listTasks(null, null, 1, "", "", adminAuthHeaders())); - assertResponse(exception, BAD_REQUEST, "Only one of before or after query parameter allowed"); - } - - @Test - public void get_taskListWithValidLimitOffset_4xx(TestInfo test) throws HttpResponseException, URISyntaxException { - // Create a large number of tasks - int maxTasks = 40; - for (int i = 0; i < maxTasks; i++) { - createTask(create(test, i), adminAuthHeaders()); - } - - // List all tasks - TaskList allTasks = listTasks(null, null, 1000000, null, - null, adminAuthHeaders()); - int totalRecords = allTasks.getData().size(); - printTasks(allTasks); - - // List limit number tasks at a time at various offsets and ensure right results are returned - for (int limit = 1; limit < maxTasks; limit++) { - String after = null; - String before; - int pageCount = 0; - int indexInAllTasks = 0; - TaskList forwardPage; - TaskList backwardPage; - do { // For each limit (or page size) - forward scroll till the end - LOG.info("Limit {} forward scrollCount {} afterCursor {}", limit, pageCount, after); - forwardPage = listTasks(null, null, limit, null, after, adminAuthHeaders()); - printTasks(forwardPage); - after = forwardPage.getPaging().getAfter(); - before = forwardPage.getPaging().getBefore(); - assertEntityPagination(allTasks.getData(), forwardPage, limit, indexInAllTasks); - - if (pageCount == 0) { // CASE 0 - First page is being returned. There is no before cursor - assertNull(before); - } else { - // Make sure scrolling back based on before cursor returns the correct result - backwardPage = listTasks(null, null, limit, before, null, adminAuthHeaders()); - assertEntityPagination(allTasks.getData(), backwardPage, limit, (indexInAllTasks - limit)); - } - - indexInAllTasks += forwardPage.getData().size(); - pageCount++; - } while (after != null); - - // We have now reached the last page - test backward scroll till the beginning - pageCount = 0; - indexInAllTasks = totalRecords - limit - forwardPage.getData().size(); - do { - LOG.info("Limit {} backward scrollCount {} beforeCursor {}", limit, pageCount, before); - forwardPage = listTasks(null, null, limit, before, null, adminAuthHeaders()); - printTasks(forwardPage); - before = forwardPage.getPaging().getBefore(); - assertEntityPagination(allTasks.getData(), forwardPage, limit, indexInAllTasks); - pageCount++; - indexInAllTasks -= forwardPage.getData().size(); - } while (before != null); - } - } - - private void printTasks(TaskList list) { - list.getData().forEach(task -> LOG.info("Task {}", task.getFullyQualifiedName())); - LOG.info("before {} after {} ", list.getPaging().getBefore(), list.getPaging().getAfter()); - } - - @Test - public void put_taskUrlUpdate_200(TestInfo test) throws HttpResponseException, URISyntaxException { - URI taskURI = new URI("http://localhost:8080/task_id=1"); - String taskSQL = "select * from test;"; - Date startDate = new DateTime("2021-11-13T20:20:39+00:00").toDate(); - Date endDate = new DateTime("2021-12-13T20:20:39+00:00").toDate(); - CreateTask request = create(test).withService(AIRFLOW_REFERENCE) - .withDescription("description").withTaskUrl(taskURI); - createAndCheckEntity(request, adminAuthHeaders()); - - // Updating description is ignored when backend already has description - Task task = updateEntity(request.withTaskUrl(taskURI).withTaskSQL(taskSQL) - .withTaskType("test").withStartDate(startDate).withEndDate(endDate), - OK, adminAuthHeaders()); - assertEquals(taskURI, task.getTaskUrl()); - assertEquals(taskSQL, task.getTaskSQL()); - assertEquals("test", task.getTaskType()); - assertEquals(startDate, task.getStartDate()); - assertEquals(endDate, task.getEndDate()); - } - - @Test - public void get_nonExistentTask_404_notFound() { - HttpResponseException exception = assertThrows(HttpResponseException.class, () -> - getTask(NON_EXISTENT_ENTITY, adminAuthHeaders())); - assertResponse(exception, NOT_FOUND, - entityNotFound(Entity.TASK, NON_EXISTENT_ENTITY)); - } - - @Test - public void get_taskWithDifferentFields_200_OK(TestInfo test) throws HttpResponseException, URISyntaxException { - CreateTask create = create(test).withDescription("description").withOwner(USER_OWNER1) - .withService(AIRFLOW_REFERENCE); - Task task = createAndCheckEntity(create, adminAuthHeaders()); - validateGetWithDifferentFields(task, false); - } - - @Test - public void get_taskByNameWithDifferentFields_200_OK(TestInfo test) throws HttpResponseException, URISyntaxException { - CreateTask create = create(test).withDescription("description").withOwner(USER_OWNER1) - .withService(AIRFLOW_REFERENCE); - Task task = createAndCheckEntity(create, adminAuthHeaders()); - validateGetWithDifferentFields(task, true); - } - - @Test - public void patch_taskAttributes_200_ok(TestInfo test) throws HttpResponseException, JsonProcessingException, - URISyntaxException { - // Create task without description, owner - Task task = createTask(create(test), adminAuthHeaders()); - assertNull(task.getDescription()); - assertNull(task.getOwner()); - assertNotNull(task.getService()); - - // - // Add description, owner and tags when previously they were null - // - List taskTags = List.of(USER_ADDRESS_TAG_LABEL); - String origJson = JsonUtils.pojoToJson(task); - task.withDescription("description").withOwner(TEAM_OWNER1).withTags(taskTags); - ChangeDescription change = getChangeDescription(task.getVersion()) - .withFieldsAdded(Arrays.asList("description","owner", "tags")); - task = patchEntityAndCheck(task, origJson, adminAuthHeaders(), MINOR_UPDATE, change); - task.setOwner(TEAM_OWNER1); // Get rid of href and name returned in the response for owner - task.setService(AIRFLOW_REFERENCE); // Get rid of href and name returned in the response for service - - // - // Update description, tier, owner - // - taskTags = List.of(USER_ADDRESS_TAG_LABEL, TIER1_TAG_LABEL); - origJson = JsonUtils.pojoToJson(task); - task.withDescription("description1").withOwner(USER_OWNER1).withTags(taskTags); - change = getChangeDescription(task.getVersion()).withFieldsUpdated(Arrays.asList("description", "owner", "tags")); - task = patchEntityAndCheck(task, origJson, adminAuthHeaders(), MINOR_UPDATE, change); - task.setOwner(USER_OWNER1); // Get rid of href and name returned in the response for owner - task.setService(AIRFLOW_REFERENCE); // Get rid of href and name returned in the response for service - - // - // Remove description and owner - remove a tag - // - taskTags = List.of(TIER1_TAG_LABEL); - origJson = JsonUtils.pojoToJson(task); - task.withDescription(null).withOwner(null).withTags(taskTags); - change = getChangeDescription(task.getVersion()).withFieldsDeleted(Arrays.asList("description","owner")) - .withFieldsUpdated(Collections.singletonList("tags")); - patchEntityAndCheck(task, origJson, adminAuthHeaders(), MINOR_UPDATE, change); - } - - @Test - public void delete_emptyTask_200_ok(TestInfo test) throws HttpResponseException, URISyntaxException { - Task task = createTask(create(test), adminAuthHeaders()); - deleteTask(task.getId(), adminAuthHeaders()); - } - - @Test - public void delete_nonExistentTask_404() { - HttpResponseException exception = assertThrows(HttpResponseException.class, () -> - deleteTask(NON_EXISTENT_ENTITY, adminAuthHeaders())); - assertResponse(exception, NOT_FOUND, entityNotFound(Entity.TASK, NON_EXISTENT_ENTITY)); - } - - public static Task createTask(CreateTask create, - Map authHeaders) throws HttpResponseException { - return TestUtils.post(getResource("tasks"), create, Task.class, authHeaders); - } - - /** - * Validate returned fields GET .../tasks/{id}?fields="..." or GET .../tasks/name/{fqn}?fields="..." - */ - private void validateGetWithDifferentFields(Task task, boolean byName) throws HttpResponseException { - // .../tasks?fields=owner - String fields = "owner"; - task = byName ? getTaskByName(task.getFullyQualifiedName(), fields, adminAuthHeaders()) : - getTask(task.getId(), fields, adminAuthHeaders()); - assertNotNull(task.getOwner()); - assertNotNull(task.getService()); // We always return the service - - // .../tasks?fields=owner,service - fields = "owner,service"; - task = byName ? getTaskByName(task.getFullyQualifiedName(), fields, adminAuthHeaders()) : - getTask(task.getId(), fields, adminAuthHeaders()); - assertNotNull(task.getOwner()); - assertNotNull(task.getService()); - - // .../tasks?fields=owner,service - fields = "owner,service"; - task = byName ? getTaskByName(task.getFullyQualifiedName(), fields, adminAuthHeaders()) : - getTask(task.getId(), fields, adminAuthHeaders()); - assertNotNull(task.getOwner()); - assertNotNull(task.getService()); - } - - public static void getTask(UUID id, Map authHeaders) throws HttpResponseException { - getTask(id, null, authHeaders); - } - - public static Task getTask(UUID id, String fields, Map authHeaders) - throws HttpResponseException { - WebTarget target = getResource("tasks/" + id); - target = fields != null ? target.queryParam("fields", fields) : target; - return TestUtils.get(target, Task.class, authHeaders); - } - - public static Task getTaskByName(String fqn, String fields, Map authHeaders) - throws HttpResponseException { - WebTarget target = getResource("tasks/name/" + fqn); - target = fields != null ? target.queryParam("fields", fields) : target; - return TestUtils.get(target, Task.class, authHeaders); - } - - public static TaskList listTasks(String fields, String serviceParam, Map authHeaders) - throws HttpResponseException { - return listTasks(fields, serviceParam, null, null, null, authHeaders); - } - - public static TaskList listTasks(String fields, String serviceParam, Integer limitParam, - String before, String after, Map authHeaders) - throws HttpResponseException { - WebTarget target = getResource("tasks"); - target = fields != null ? target.queryParam("fields", fields) : target; - target = serviceParam != null ? target.queryParam("service", serviceParam) : target; - target = limitParam != null ? target.queryParam("limit", limitParam) : target; - target = before != null ? target.queryParam("before", before) : target; - target = after != null ? target.queryParam("after", after) : target; - return TestUtils.get(target, TaskResource.TaskList.class, authHeaders); - } - - private void deleteTask(UUID id, Map authHeaders) throws HttpResponseException { - TestUtils.delete(getResource("tasks/" + id), authHeaders); - - // Ensure deleted task does not exist - HttpResponseException exception = assertThrows(HttpResponseException.class, () -> getTask(id, authHeaders)); - assertResponse(exception, NOT_FOUND, entityNotFound(Entity.TASK, id)); - } - - public static String getTaskName(TestInfo test) { - return String.format("task_%s", test.getDisplayName()); - } - - public static String getTaskName(TestInfo test, int index) { - return String.format("task%d_%s", index, test.getDisplayName()); - } - - public static CreateTask create(TestInfo test) throws URISyntaxException { - return new CreateTask().withName(getTaskName(test)).withService(AIRFLOW_REFERENCE) - .withTaskUrl(new URI("http://localhost:0")); - } - - public static CreateTask create(TestInfo test, int index) throws URISyntaxException { - return new CreateTask().withName(getTaskName(test, index)).withService(AIRFLOW_REFERENCE) - .withTaskUrl(new URI("http://localhost:0")); - } - - public static PipelineService createService(CreatePipelineService create, - Map authHeaders) throws HttpResponseException { - return TestUtils.post(CatalogApplicationTest.getResource("services/pipelineServices"), - create, PipelineService.class, authHeaders); - } -} \ No newline at end of file