Fixes 991 - Merge Tasks as attributes into Pipeline

This commit is contained in:
sureshms 2021-10-31 08:56:34 -07:00
parent 7b54fb8bbe
commit 47b3dc5932
15 changed files with 242 additions and 1710 deletions

View File

@ -226,19 +226,6 @@ CREATE TABLE IF NOT EXISTS chart_entity (
INDEX (updatedAt)
);
-- Task entities: one row per task, holding the full entity as a JSON document.
-- Every column except `json` and `timestamp` is generated from that document.
CREATE TABLE IF NOT EXISTS task_entity (
-- Generated from the `id` property of the JSON document; primary key
id VARCHAR(36) GENERATED ALWAYS AS (json ->> '$.id') STORED NOT NULL,
-- Generated from the `fullyQualifiedName` property; unique across tasks (see unique_name)
fullyQualifiedName VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.fullyQualifiedName') NOT NULL,
-- The entity document the generated columns are derived from
json JSON NOT NULL,
-- Generated by parsing the `updatedAt` property with the format '%Y-%m-%dT%T.%fZ'
updatedAt TIMESTAMP GENERATED ALWAYS AS (TIMESTAMP(STR_TO_DATE(json ->> '$.updatedAt', '%Y-%m-%dT%T.%fZ'))) NOT NULL,
-- Generated from the `updatedBy` property
updatedBy VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.updatedBy') NOT NULL,
-- Plain (non-generated) column; NOTE(review): its writer is not visible here - confirm how it is populated
timestamp BIGINT,
PRIMARY KEY (id),
UNIQUE KEY unique_name(fullyQualifiedName),
-- Secondary indexes on the audit columns
INDEX (updatedBy),
INDEX (updatedAt)
);
--
-- Feed related tables
--

View File

@ -34,7 +34,6 @@ public final class Entity {
public static final String CHART = "chart";
public static final String REPORT = "report";
public static final String TOPIC = "topic";
public static final String TASK = "task";
public static final String MODEL = "model";
public static final String BOTS = "bots";

View File

@ -16,7 +16,6 @@ import org.openmetadata.catalog.entity.data.Model;
import org.openmetadata.catalog.entity.data.Pipeline;
import org.openmetadata.catalog.entity.data.Report;
import org.openmetadata.catalog.entity.data.Table;
import org.openmetadata.catalog.entity.data.Task;
import org.openmetadata.catalog.entity.data.Topic;
import org.openmetadata.catalog.entity.services.DashboardService;
import org.openmetadata.catalog.entity.services.DatabaseService;
@ -39,7 +38,6 @@ import org.openmetadata.catalog.jdbi3.PipelineRepository.PipelineEntityInterface
import org.openmetadata.catalog.jdbi3.PipelineServiceRepository.PipelineServiceEntityInterface;
import org.openmetadata.catalog.jdbi3.ReportRepository.ReportEntityInterface;
import org.openmetadata.catalog.jdbi3.TableRepository.TableEntityInterface;
import org.openmetadata.catalog.jdbi3.TaskRepository.TaskEntityInterface;
import org.openmetadata.catalog.jdbi3.TeamRepository.TeamEntityInterface;
import org.openmetadata.catalog.jdbi3.TopicRepository.TopicEntityInterface;
import org.openmetadata.catalog.jdbi3.UserRepository.UserEntityInterface;
@ -53,8 +51,6 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
public interface CollectionDAO {
@CreateSqlObject
@ -87,9 +83,6 @@ public interface CollectionDAO {
@CreateSqlObject
MetricsDAO metricsDAO();
@CreateSqlObject
TaskDAO taskDAO();
@CreateSqlObject
ChartDAO chartDAO();
@ -211,8 +204,8 @@ public interface CollectionDAO {
}
class EntityVersionPair {
private Double version;
private String entityJson;
private final Double version;
private final String entityJson;
public Double getVersion() {
return version;
@ -579,22 +572,6 @@ public interface CollectionDAO {
}
}
/**
 * DAO binding for {@link Task} entities: names the backing table, the entity class and the
 * column used as the entity name, and builds entity references for tasks.
 */
interface TaskDAO extends EntityDAO<Task> {
  /** Name of the table that stores task entities. */
  @Override
  default String getTableName() {
    return "task_entity";
  }

  /** Entity class handled by this DAO. */
  @Override
  default Class<Task> getEntityClass() {
    return Task.class;
  }

  /** Column that holds the task name used for lookups. */
  @Override
  default String getNameColumn() {
    return "fullyQualifiedName";
  }

  /** Builds the {@link EntityReference} for the given task via {@link TaskEntityInterface}. */
  @Override
  default EntityReference getEntityReference(Task entity) {
    final TaskEntityInterface wrapper = new TaskEntityInterface(entity);
    return wrapper.getEntityReference();
  }
}
interface TeamDAO extends EntityDAO<Team> {
@Override
default String getTableName() { return "team_entity"; }

View File

@ -25,6 +25,7 @@ import org.openmetadata.catalog.resources.pipelines.PipelineResource;
import org.openmetadata.catalog.type.ChangeDescription;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.TagLabel;
import org.openmetadata.catalog.type.Task;
import org.openmetadata.catalog.util.EntityInterface;
import org.openmetadata.catalog.util.EntityUtil;
import org.openmetadata.catalog.util.EntityUtil.Fields;
@ -32,11 +33,15 @@ import org.openmetadata.catalog.util.JsonUtils;
import javax.ws.rs.core.Response.Status;
import java.io.IOException;
import java.net.URI;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound;
@ -95,7 +100,9 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
pipeline.setConcurrency(pipeline.getConcurrency());
pipeline.setOwner(fields.contains("owner") ? getOwner(pipeline) : null);
pipeline.setFollowers(fields.contains("followers") ? getFollowers(pipeline) : null);
pipeline.setTasks(fields.contains("tasks") ? getTasks(pipeline) : null);
if (!fields.contains("tasks")) {
pipeline.withTasks(null);
}
pipeline.setTags(fields.contains("tags") ? getTags(pipeline.getFullyQualifiedName()) : null);
return pipeline;
}
@ -132,10 +139,9 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
EntityReference owner = pipeline.getOwner();
List<TagLabel> tags = pipeline.getTags();
EntityReference service = pipeline.getService();
List<EntityReference> tasks = pipeline.getTasks();
// Don't store owner, service, href and tags as JSON. Build them on the fly based on relationships
pipeline.withOwner(null).withService(null).withTasks(null).withHref(null).withTags(null);
pipeline.withOwner(null).withService(null).withHref(null).withTags(null);
if (update) {
dao.pipelineDAO().update(pipeline.getId(), JsonUtils.pojoToJson(pipeline));
@ -144,7 +150,7 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
}
// Restore the relationships
pipeline.withOwner(owner).withService(service).withTasks(tasks).withTags(tags);
pipeline.withOwner(owner).withService(service).withTags(tags);
}
@Override
@ -153,14 +159,6 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
dao.relationshipDAO().insert(service.getId().toString(), pipeline.getId().toString(), service.getType(),
Entity.PIPELINE, Relationship.CONTAINS.ordinal());
// Add relationship from pipeline to task
String pipelineId = pipeline.getId().toString();
if (pipeline.getTasks() != null) {
for (EntityReference task : pipeline.getTasks()) {
dao.relationshipDAO().insert(pipelineId, task.getId().toString(), Entity.PIPELINE, Entity.TASK,
Relationship.CONTAINS.ordinal());
}
}
// Add owner relationship
EntityUtil.setOwner(dao.relationshipDAO(), pipeline.getId(), Entity.PIPELINE, pipeline.getOwner());
@ -210,37 +208,10 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
return pipeline == null ? null : EntityUtil.getFollowers(pipeline.getId(), dao.relationshipDAO(), dao.userDAO());
}
/**
 * Looks up the tasks that the given pipeline has a CONTAINS relationship to.
 *
 * @param pipeline pipeline whose tasks are requested; may be {@code null}
 * @return one entity reference per related task id, or {@code null} when {@code pipeline} is {@code null}
 * @throws IOException propagated from the task DAO lookup
 */
private List<EntityReference> getTasks(Pipeline pipeline) throws IOException {
  if (pipeline == null) {
    return null;
  }
  final List<String> containedIds = dao.relationshipDAO()
      .findTo(pipeline.getId().toString(), Relationship.CONTAINS.ordinal(), Entity.TASK);
  final List<EntityReference> references = new ArrayList<>();
  for (final String containedId : containedIds) {
    // Relationship rows hold ids as strings; the task DAO is keyed by UUID
    final UUID taskUuid = UUID.fromString(containedId);
    references.add(dao.taskDAO().findEntityReferenceById(taskUuid));
  }
  return references;
}
/**
 * Replaces the pipeline-to-task CONTAINS relationships with the tasks currently set on the pipeline.
 * When the pipeline carries no task list ({@code null}) the existing relationships are left untouched.
 *
 * @param pipeline pipeline whose task relationships are rewritten
 */
private void updateTaskRelationships(Pipeline pipeline) {
  final String fromId = pipeline.getId().toString();
  final List<EntityReference> taskRefs = pipeline.getTasks();
  if (taskRefs == null) {
    return;
  }
  // Remove any existing tasks associated with this pipeline before re-adding the current ones
  dao.relationshipDAO().deleteFrom(fromId, Relationship.CONTAINS.ordinal(), Entity.TASK);
  for (final EntityReference taskRef : taskRefs) {
    final String toId = taskRef.getId().toString();
    dao.relationshipDAO().insert(fromId, toId, Entity.PIPELINE, Entity.TASK, Relationship.CONTAINS.ordinal());
  }
}
static class PipelineEntityInterface implements EntityInterface<Pipeline> {
public static class PipelineEntityInterface implements EntityInterface<Pipeline> {
private final Pipeline entity;
PipelineEntityInterface(Pipeline entity) {
public PipelineEntityInterface(Pipeline entity) {
this.entity = entity;
}
@ -283,6 +254,9 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
@Override
public Date getUpdatedAt() { return entity.getUpdatedAt(); }
@Override
public URI getHref() { return entity.getHref(); }
@Override
public EntityReference getEntityReference() {
return new EntityReference().withId(getId()).withName(getFullyQualifiedName()).withDescription(getDescription())
@ -318,9 +292,10 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
}
@Override
public void setTags(List<TagLabel> tags) {
entity.setTags(tags);
}
public ChangeDescription getChangeDescription() { return entity.getChangeDescription(); }
@Override
public void setTags(List<TagLabel> tags) { entity.setTags(tags); }
}
/**
@ -339,18 +314,20 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
private void updateTasks(Pipeline origPipeline, Pipeline updatedPipeline) {
// Airflow lineage backend gets executed per task in a DAG. This means we will not get a full picture of the
// pipeline in each call. Hence we may create a pipeline and add a single task when one task finishes in a
// pipeline
// in the next task run we may have to update. To take care of this we will merge the tasks
if (updatedPipeline.getTasks() == null) {
updatedPipeline.setTasks(origPipeline.getTasks());
} else {
updatedPipeline.getTasks().addAll(origPipeline.getTasks()); // TODO remove duplicates
}
// pipeline in the next task run we may have to update. To take care of this we will merge the tasks
List<Task> updatedTasks = Optional.ofNullable(updatedPipeline.getTasks()).orElse(Collections.emptyList());
List<Task> origTasks = Optional.ofNullable(origPipeline.getTasks()).orElse(Collections.emptyList());
// Add relationship from pipeline to task
updateTaskRelationships(updatedPipeline);
recordChange("tasks", EntityUtil.getIDList(updatedPipeline.getTasks()),
EntityUtil.getIDList(origPipeline.getTasks()));
// TODO this might not provide distinct
updatedTasks = Stream.concat(origTasks.stream(), updatedTasks.stream()).distinct().collect(Collectors.toList());
if (origTasks.isEmpty()) {
origTasks = null;
}
if (updatedTasks.isEmpty()) {
updatedTasks = null;
}
updatedPipeline.setTasks(updatedTasks);
recordChange("tasks", origTasks, updatedTasks);
}
}
}

View File

@ -1,266 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.jdbi3;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.entity.data.Task;
import org.openmetadata.catalog.entity.services.PipelineService;
import org.openmetadata.catalog.exception.EntityNotFoundException;
import org.openmetadata.catalog.resources.tasks.TaskResource;
import org.openmetadata.catalog.type.ChangeDescription;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.TagLabel;
import org.openmetadata.catalog.util.EntityInterface;
import org.openmetadata.catalog.util.EntityUtil;
import org.openmetadata.catalog.util.EntityUtil.Fields;
import org.openmetadata.catalog.util.JsonUtils;
import java.io.IOException;
import java.net.URI;
import java.text.ParseException;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound;
/**
 * Repository that persists {@link Task} entities through {@link CollectionDAO} and maintains the
 * relationships (service, owner, tags) that are kept outside the stored task JSON.
 */
public class TaskRepository extends EntityRepository<Task> {
  /** Field set handed to {@link EntityRepository} as the update fields. */
  private static final Fields TASK_UPDATE_FIELDS =
      new Fields(TaskResource.FIELD_LIST, "owner,taskConfig,tags,downstreamTasks");
  /** Field set handed to {@link EntityRepository} as the patch fields. */
  private static final Fields TASK_PATCH_FIELDS = new Fields(TaskResource.FIELD_LIST, "owner,service,tags");
  private final CollectionDAO dao;

  /**
   * Builds the fully qualified name of a task as {@code <service name>.<task name>}.
   *
   * @param task task whose service reference and name are already populated
   * @return the fully qualified name
   */
  public static String getFQN(Task task) {
    return (task.getService().getName() + "." + task.getName());
  }

  /**
   * @param dao collection DAO that provides the task DAO and all related DAOs used by this repository
   */
  public TaskRepository(CollectionDAO dao) {
    super(Task.class, dao.taskDAO(), dao, TASK_PATCH_FIELDS, TASK_UPDATE_FIELDS);
    this.dao = dao;
  }

  /**
   * Deletes the task with the given id together with all of its relationships.
   *
   * @param id id of the task to delete
   * @throws IllegalArgumentException when the relationship DAO reports CONTAINS relationships from this id
   *     to other task entities
   * @throws EntityNotFoundException when no task row was deleted for the id
   */
  @Transaction
  public void delete(UUID id) {
    if (dao.relationshipDAO().findToCount(id.toString(), Relationship.CONTAINS.ordinal(), Entity.TASK) > 0) {
      throw new IllegalArgumentException("Task is not empty");
    }
    if (dao.taskDAO().delete(id) <= 0) {
      throw EntityNotFoundException.byMessage(entityNotFound(Entity.TASK, id));
    }
    dao.relationshipDAO().deleteAll(id.toString());
  }

  /**
   * Validates and completes a task before it is stored: resolves the service reference, derives the fully
   * qualified name, validates the owner and expands the tags with derived tags.
   *
   * @param task task to validate; modified in place
   * @throws IOException propagated from the DAO lookups
   */
  @Override
  public void validate(Task task) throws IOException {
    // Resolving the reference checks that it is a pipeline service and fills in its name and description.
    // A single lookup is sufficient; the reference is not changed again before it is stored.
    task.setService(getService(task.getService()));
    task.setFullyQualifiedName(getFQN(task));
    EntityUtil.populateOwner(dao.userDAO(), dao.teamDAO(), task.getOwner()); // Validate owner
    task.setTags(EntityUtil.addDerivedTags(dao.tagDAO(), task.getTags()));
  }

  /**
   * Writes the task JSON, leaving out owner, service, href and tags.
   *
   * @param task task to store
   * @param update {@code true} to update the existing row, {@code false} to insert a new one
   * @throws IOException propagated from JSON serialization or the DAO
   */
  @Override
  public void store(Task task, boolean update) throws IOException {
    // Relationships and fields such as href are derived and not stored as part of json
    EntityReference owner = task.getOwner();
    List<TagLabel> tags = task.getTags();
    EntityReference service = task.getService();

    // Don't store owner, service, href and tags as JSON. Build them on the fly based on relationships
    task.withOwner(null).withService(null).withHref(null).withTags(null);

    if (update) {
      dao.taskDAO().update(task.getId(), JsonUtils.pojoToJson(task));
    } else {
      dao.taskDAO().insert(task);
    }

    // Restore the relationships (href is not restored here)
    task.withOwner(owner).withService(service).withTags(tags);
  }

  /**
   * Stores the service, owner and tag relationships of the task.
   *
   * @param task task whose relationships are stored
   * @throws IOException propagated from the DAO lookups
   */
  @Override
  public void storeRelationships(Task task) throws IOException {
    setService(task, task.getService());
    setOwner(task, task.getOwner());
    applyTags(task);
  }

  /** Applies the task's tags under its fully qualified name and then reloads them onto the task. */
  private void applyTags(Task task) throws IOException {
    // Add task level tags by adding tag to task relationship
    EntityUtil.applyTags(dao.tagDAO(), task.getTags(), task.getFullyQualifiedName());
    task.setTags(getTags(task.getFullyQualifiedName())); // Update tag to handle additional derived tags
  }

  /**
   * @param task task whose owner is requested; may be {@code null}
   * @return the owner resolved through the relationship, user and team DAOs, or {@code null} for a null task
   * @throws IOException propagated from the DAO lookups
   */
  public EntityReference getOwner(Task task) throws IOException {
    return task != null ?
        EntityUtil.populateOwner(task.getId(), dao.relationshipDAO(), dao.userDAO(), dao.teamDAO()) : null;
  }

  /** Records the owner relationship for the task and sets the owner on the task object. */
  private void setOwner(Task task, EntityReference owner) {
    EntityUtil.setOwner(dao.relationshipDAO(), task.getId(), Entity.TASK, owner);
    task.setOwner(owner);
  }

  /**
   * Populates the relationship-backed fields of a task. The service is always set; owner, tags and
   * downstream tasks are set only when requested in {@code fields} and nulled otherwise.
   *
   * @param task task to populate; modified in place
   * @param fields fields requested by the caller
   * @return the same task instance
   * @throws IOException propagated from the DAO lookups
   */
  @Override
  public Task setFields(Task task, Fields fields) throws IOException {
    // NOTE(review): the next four calls assign each property its own current value - confirm they are
    // no-ops and can be dropped
    task.setTaskUrl(task.getTaskUrl());
    task.setTaskSQL(task.getTaskSQL());
    task.setStartDate(task.getStartDate());
    task.setEndDate(task.getEndDate());
    task.setService(getService(task));
    task.setOwner(fields.contains("owner") ? getOwner(task) : null);
    task.setTags(fields.contains("tags") ? getTags(task.getFullyQualifiedName()) : null);
    task.setDownstreamTasks(fields.contains("downstreamTasks") ? task.getDownstreamTasks() : null);
    return task;
  }

  /** Intentionally empty: no attributes are restored for tasks on patch. */
  @Override
  public void restorePatchAttributes(Task original, Task updated) throws IOException, ParseException {
  }

  /** Wraps the task in a {@link TaskEntityInterface}. */
  @Override
  public EntityInterface<Task> getEntityInterface(Task entity) {
    return new TaskEntityInterface(entity);
  }

  /** Returns the tags recorded for the given fully qualified name. */
  private List<TagLabel> getTags(String fqn) {
    return dao.tagDAO().getTags(fqn);
  }

  /**
   * Resolves the pipeline service of a task through the relationship DAO.
   *
   * @return the populated service reference, or {@code null} for a null task
   * @throws NullPointerException when no service relationship is found for the task
   */
  private EntityReference getService(Task task) throws IOException {
    return task == null ? null : getService(Objects.requireNonNull(EntityUtil.getService(dao.relationshipDAO(),
        task.getId(), Entity.PIPELINE_SERVICE)));
  }

  /**
   * Fills in the name and description of a pipeline service reference from the stored service.
   *
   * @param service reference to populate; modified in place and returned
   * @throws IllegalArgumentException when the reference type is not a pipeline service
   */
  private EntityReference getService(EntityReference service) throws IOException {
    if (service.getType().equalsIgnoreCase(Entity.PIPELINE_SERVICE)) {
      PipelineService serviceInstance = dao.pipelineServiceDAO().findEntityById(service.getId());
      service.setDescription(serviceInstance.getDescription());
      service.setName(serviceInstance.getName());
    } else {
      throw new IllegalArgumentException(String.format("Invalid service type %s for the task", service.getType()));
    }
    return service;
  }

  /**
   * Records the service-contains-task relationship and sets the service on the task.
   * Does nothing when either argument is {@code null}.
   *
   * @param task task that belongs to the service
   * @param service service reference; its details are populated before the relationship is written
   * @throws IOException propagated from the DAO lookups
   */
  public void setService(Task task, EntityReference service) throws IOException {
    if (service != null && task != null) {
      getService(service); // Populate service details
      dao.relationshipDAO().insert(service.getId().toString(), task.getId().toString(), service.getType(),
          Entity.TASK, Relationship.CONTAINS.ordinal());
      task.setService(service);
    }
  }

  /** {@link EntityInterface} adapter that delegates every accessor and mutator to the wrapped {@link Task}. */
  public static class TaskEntityInterface implements EntityInterface<Task> {
    private final Task entity;

    public TaskEntityInterface(Task entity) {
      this.entity = entity;
    }

    @Override
    public UUID getId() {
      return entity.getId();
    }

    @Override
    public String getDescription() {
      return entity.getDescription();
    }

    @Override
    public String getDisplayName() {
      return entity.getDisplayName();
    }

    @Override
    public EntityReference getOwner() {
      return entity.getOwner();
    }

    @Override
    public String getFullyQualifiedName() {
      return entity.getFullyQualifiedName();
    }

    @Override
    public List<TagLabel> getTags() {
      return entity.getTags();
    }

    @Override
    public Double getVersion() { return entity.getVersion(); }

    @Override
    public String getUpdatedBy() { return entity.getUpdatedBy(); }

    @Override
    public Date getUpdatedAt() { return entity.getUpdatedAt(); }

    @Override
    public URI getHref() { return entity.getHref(); }

    /** Builds a reference of type {@link Entity#TASK} that uses the fully qualified name as its name. */
    @Override
    public EntityReference getEntityReference() {
      return new EntityReference().withId(getId()).withName(getFullyQualifiedName()).withDescription(getDescription())
          .withDisplayName(getDisplayName()).withType(Entity.TASK);
    }

    @Override
    public Task getEntity() { return entity; }

    @Override
    public void setId(UUID id) { entity.setId(id); }

    @Override
    public void setDescription(String description) {
      entity.setDescription(description);
    }

    @Override
    public void setDisplayName(String displayName) {
      entity.setDisplayName(displayName);
    }

    @Override
    public void setUpdateDetails(String updatedBy, Date updatedAt) {
      entity.setUpdatedBy(updatedBy);
      entity.setUpdatedAt(updatedAt);
    }

    @Override
    public void setChangeDescription(Double newVersion, ChangeDescription changeDescription) {
      entity.setVersion(newVersion);
      entity.setChangeDescription(changeDescription);
    }

    @Override
    public ChangeDescription getChangeDescription() { return entity.getChangeDescription(); }

    @Override
    public void setTags(List<TagLabel> tags) {
      entity.setTags(tags);
    }
  }
}

View File

@ -28,7 +28,6 @@ import io.swagger.v3.oas.annotations.parameters.RequestBody;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import org.openmetadata.catalog.api.data.CreateDatabase;
import org.openmetadata.catalog.entity.data.Database;
import org.openmetadata.catalog.entity.data.Task;
import org.openmetadata.catalog.jdbi3.CollectionDAO;
import org.openmetadata.catalog.jdbi3.DatabaseRepository;
import org.openmetadata.catalog.jdbi3.DatabaseRepository.DatabaseEntityInterface;

View File

@ -33,6 +33,7 @@ import org.openmetadata.catalog.jdbi3.PipelineRepository;
import org.openmetadata.catalog.resources.Collection;
import org.openmetadata.catalog.security.CatalogAuthorizer;
import org.openmetadata.catalog.security.SecurityUtil;
import org.openmetadata.catalog.type.EntityHistory;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.util.EntityUtil;
import org.openmetadata.catalog.util.EntityUtil.Fields;
@ -95,9 +96,6 @@ public class PipelineResource {
pipeline.setHref(RestUtil.getHref(uriInfo, PIPELINE_COLLECTION_PATH, pipeline.getId()));
EntityUtil.addHref(uriInfo, pipeline.getOwner());
EntityUtil.addHref(uriInfo, pipeline.getService());
if (pipeline.getTasks() != null) {
EntityUtil.addHref(uriInfo, pipeline.getTasks());
}
EntityUtil.addHref(uriInfo, pipeline.getFollowers());
return pipeline;
}
@ -170,6 +168,22 @@ public class PipelineResource {
return pipelines;
}
/**
 * Lists all versions of the pipeline identified by {@code id}.
 *
 * @param uriInfo request URI context (not used by this method)
 * @param securityContext security context of the caller (not used by this method)
 * @param id pipeline id
 * @return the version history returned by the repository
 */
@GET
@Path("/{id}/versions")
@Operation(summary = "List pipeline versions", tags = "pipelines",
    description = "Get a list of all the versions of a pipeline identified by `id`",
    responses = {@ApiResponse(responseCode = "200", description = "List of pipeline versions",
        content = @Content(mediaType = "application/json",
            schema = @Schema(implementation = EntityHistory.class)))
    })
public EntityHistory listVersions(@Context UriInfo uriInfo,
                                  @Context SecurityContext securityContext,
                                  @Parameter(description = "pipeline Id", schema = @Schema(type = "string"))
                                  @PathParam("id") String id)
    throws IOException, ParseException, GeneralSecurityException {
  final EntityHistory history = dao.listVersions(id);
  return history;
}
@GET
@Path("/{id}")
@Operation(summary = "Get a pipeline", tags = "pipelines",
@ -210,6 +224,26 @@ public class PipelineResource {
return addHref(uriInfo, pipeline);
}
/**
 * Returns one specific version of the pipeline identified by {@code id}.
 *
 * @param uriInfo request URI context (not used by this method)
 * @param securityContext security context of the caller (not used by this method)
 * @param id pipeline id
 * @param version version string passed through to the repository
 * @return the pipeline at the requested version
 */
@GET
@Path("/{id}/versions/{version}")
@Operation(summary = "Get a version of the pipeline", tags = "pipelines",
    description = "Get a version of the pipeline by given `id`",
    responses = {
        @ApiResponse(responseCode = "200", description = "pipeline",
            content = @Content(mediaType = "application/json",
                schema = @Schema(implementation = Pipeline.class))),
        @ApiResponse(responseCode = "404", description = "Pipeline for instance {id} and version {version} is " +
            "not found")
    })
public Pipeline getVersion(@Context UriInfo uriInfo,
                           @Context SecurityContext securityContext,
                           @Parameter(description = "Pipeline Id", schema = @Schema(type = "string"))
                           @PathParam("id") String id,
                           @Parameter(description = "Pipeline version number in the form `major`.`minor`",
                               schema = @Schema(type = "string", example = "0.1 or 1.1"))
                           @PathParam("version") String version) throws IOException, ParseException {
  final Pipeline requested = dao.getVersion(id, version);
  return requested;
}
@POST
@Operation(summary = "Create a pipeline", tags = "pipelines",

View File

@ -1,331 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.resources.tasks;
import com.google.inject.Inject;
import io.swagger.annotations.Api;
import io.swagger.v3.oas.annotations.ExternalDocumentation;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.ExampleObject;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.parameters.RequestBody;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import org.openmetadata.catalog.api.data.CreateTask;
import org.openmetadata.catalog.entity.data.Task;
import org.openmetadata.catalog.jdbi3.CollectionDAO;
import org.openmetadata.catalog.jdbi3.TaskRepository;
import org.openmetadata.catalog.jdbi3.TaskRepository.TaskEntityInterface;
import org.openmetadata.catalog.resources.Collection;
import org.openmetadata.catalog.security.CatalogAuthorizer;
import org.openmetadata.catalog.security.SecurityUtil;
import org.openmetadata.catalog.type.EntityHistory;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.util.EntityUtil;
import org.openmetadata.catalog.util.EntityUtil.Fields;
import org.openmetadata.catalog.util.RestUtil;
import org.openmetadata.catalog.util.RestUtil.PutResponse;
import org.openmetadata.catalog.util.ResultList;
import javax.json.JsonPatch;
import javax.validation.Valid;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.PATCH;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.SecurityContext;
import javax.ws.rs.core.UriInfo;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.security.GeneralSecurityException;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
@Path("/v1/tasks")
@Api(value = "tasks data asset collection", tags = "Task data asset collection")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@Collection(name = "tasks")
public class TaskResource {
private static final String TASK_COLLECTION_PATH = "v1/tasks/";
private final TaskRepository dao;
private final CatalogAuthorizer authorizer;
/**
 * Sets the href of a task entity reference to its location under the task collection path.
 *
 * @param uriInfo request URI context used to build the link
 * @param ref reference to update in place
 */
public static void addHref(UriInfo uriInfo, EntityReference ref) {
  final UUID refId = ref.getId();
  ref.withHref(RestUtil.getHref(uriInfo, TASK_COLLECTION_PATH, refId));
}
/**
 * Adds hrefs to every task in the list.
 *
 * @param uriInfo request URI context used to build the links
 * @param tasks tasks to update in place; a {@code null} list is tolerated
 * @return the same list instance that was passed in (possibly {@code null})
 */
public static List<Task> addHref(UriInfo uriInfo, List<Task> tasks) {
  if (tasks != null) {
    for (Task task : tasks) {
      addHref(uriInfo, task);
    }
  }
  return tasks;
}
/**
 * Sets the href of the task itself and adds hrefs to its owner and service references.
 *
 * @param uriInfo request URI context used to build the links
 * @param task task to update in place
 * @return the same task instance
 */
public static Task addHref(UriInfo uriInfo, Task task) {
  final UUID taskId = task.getId();
  task.setHref(RestUtil.getHref(uriInfo, TASK_COLLECTION_PATH, taskId));
  EntityUtil.addHref(uriInfo, task.getOwner());
  EntityUtil.addHref(uriInfo, task.getService());
  return task;
}
/**
 * Creates the resource with a {@link TaskRepository} built on the given DAO.
 *
 * @param dao collection DAO backing the repository; must not be {@code null}
 * @param authorizer authorizer used for permission checks
 * @throws NullPointerException when {@code dao} is {@code null}
 */
@Inject
public TaskResource(CollectionDAO dao, CatalogAuthorizer authorizer) {
  this.dao = new TaskRepository(Objects.requireNonNull(dao, "TaskRepository must not be null"));
  this.authorizer = authorizer;
}
/** {@link ResultList} specialization for tasks, referenced as the response schema of the list operation. */
public static class TaskList extends ResultList<Task> {
  /**
   * @param data tasks on this page
   * @param beforeCursor cursor passed to the superclass as the "before" cursor
   * @param afterCursor cursor passed to the superclass as the "after" cursor
   * @param total total count passed to the superclass
   */
  public TaskList(List<Task> data, String beforeCursor, String afterCursor, int total)
      throws GeneralSecurityException, UnsupportedEncodingException {
    super(data, beforeCursor, afterCursor, total);
  }

  // Empty constructor needed for deserialization
  @SuppressWarnings("unused")
  TaskList() {
  }
}
// Comma-separated names of the optional fields a caller may request; also used as the example value in the API docs
static final String FIELDS = "downstreamTasks,taskConfig,owner,service,tags";
// FIELDS with spaces removed and split on commas into individual field names
public static final List<String> FIELD_LIST = Arrays.asList(FIELDS.replaceAll(" ", "")
.split(","));
/**
 * Lists tasks with cursor-based pagination, optionally filtered by service.
 *
 * @param uriInfo request URI context used to build hrefs
 * @param securityContext security context of the caller (not used by this method)
 * @param fieldsParam comma-separated optional fields to include
 * @param serviceParam service name to filter by
 * @param limitParam maximum number of tasks to return
 * @param before cursor for reverse paging; when non-null the page before it is returned
 * @param after cursor for forward paging; used when {@code before} is null
 * @return the requested page of tasks with hrefs added
 */
@GET
@Operation(summary = "List tasks", tags = "tasks",
    description = "Get a list of tasks, optionally filtered by `service` it belongs to. Use `fields` " +
        "parameter to get only necessary fields. Use cursor-based pagination to limit the number " +
        "entries in the list using `limit` and `before` or `after` query params.",
    responses = {
        @ApiResponse(responseCode = "200", description = "List of tasks",
            content = @Content(mediaType = "application/json",
                schema = @Schema(implementation = TaskList.class)))
    })
public ResultList<Task> list(@Context UriInfo uriInfo,
                             @Context SecurityContext securityContext,
                             @Parameter(description = "Fields requested in the returned resource",
                                 schema = @Schema(type = "string", example = FIELDS))
                             @QueryParam("fields") String fieldsParam,
                             @Parameter(description = "Filter tasks by service name",
                                 schema = @Schema(type = "string", example = "superset"))
                             @QueryParam("service") String serviceParam,
                             @Parameter(description = "Limit the number tasks returned. (1 to 1000000, default = 10)")
                             @DefaultValue("10")
                             @QueryParam("limit") @Min(1) @Max(1000000) int limitParam,
                             @Parameter(description = "Returns list of tasks before this cursor",
                                 schema = @Schema(type = "string"))
                             @QueryParam("before") String before,
                             @Parameter(description = "Returns list of tasks after this cursor",
                                 schema = @Schema(type = "string"))
                             @QueryParam("after") String after
) throws IOException, GeneralSecurityException, ParseException {
  RestUtil.validateCursors(before, after);
  final Fields requestedFields = new Fields(FIELD_LIST, fieldsParam);

  // A `before` cursor selects reverse paging; otherwise this is forward paging or the first page
  final ResultList<Task> page = before != null
      ? dao.listBefore(requestedFields, serviceParam, limitParam, before)
      : dao.listAfter(requestedFields, serviceParam, limitParam, after);

  addHref(uriInfo, page.getData());
  return page;
}
/**
 * Lists all versions of the task identified by {@code id}.
 *
 * @param uriInfo request URI context (not used by this method)
 * @param securityContext security context of the caller (not used by this method)
 * @param id task id
 * @return the version history returned by the repository
 */
@GET
@Path("/{id}/versions")
@Operation(summary = "List task versions", tags = "tasks",
    description = "Get a list of all the versions of a task identified by `id`",
    responses = {@ApiResponse(responseCode = "200", description = "List of task versions",
        content = @Content(mediaType = "application/json",
            schema = @Schema(implementation = EntityHistory.class)))
    })
public EntityHistory listVersions(@Context UriInfo uriInfo,
                                  @Context SecurityContext securityContext,
                                  @Parameter(description = "task Id", schema = @Schema(type = "string"))
                                  @PathParam("id") String id)
    throws IOException, ParseException, GeneralSecurityException {
  final EntityHistory history = dao.listVersions(id);
  return history;
}
/**
 * Fetches a task by id.
 *
 * @param uriInfo request URI context used to build hrefs
 * @param id task id
 * @param securityContext security context of the caller (not used by this method)
 * @param fieldsParam comma-separated optional fields to include
 * @return the task with hrefs added
 */
@GET
@Path("/{id}")
@Operation(summary = "Get a Task", tags = "tasks",
    description = "Get a task by `id`.",
    responses = {
        @ApiResponse(responseCode = "200", description = "The Task",
            content = @Content(mediaType = "application/json",
                schema = @Schema(implementation = Task.class))),
        @ApiResponse(responseCode = "404", description = "Task for instance {id} is not found")
    })
public Task get(@Context UriInfo uriInfo, @PathParam("id") String id,
                @Context SecurityContext securityContext,
                @Parameter(description = "Fields requested in the returned resource",
                    schema = @Schema(type = "string", example = FIELDS))
                @QueryParam("fields") String fieldsParam) throws IOException, ParseException {
  final Fields requestedFields = new Fields(FIELD_LIST, fieldsParam);
  final Task found = dao.get(id, requestedFields);
  return addHref(uriInfo, found);
}
/**
 * Fetches a task by its fully qualified name.
 *
 * @param uriInfo request URI context used to build hrefs
 * @param fqn fully qualified name of the task
 * @param securityContext security context of the caller (not used by this method)
 * @param fieldsParam comma-separated optional fields to include
 * @return a 200 response carrying the task with hrefs added
 */
@GET
@Path("/name/{fqn}")
@Operation(summary = "Get a task by name", tags = "tasks",
    description = "Get a task by fully qualified name.",
    responses = {
        @ApiResponse(responseCode = "200", description = "The task",
            content = @Content(mediaType = "application/json",
                schema = @Schema(implementation = Task.class))),
        // This endpoint is addressed by {fqn}, not {id}
        @ApiResponse(responseCode = "404", description = "Task for instance {fqn} is not found")
    })
public Response getByName(@Context UriInfo uriInfo, @PathParam("fqn") String fqn,
                          @Context SecurityContext securityContext,
                          @Parameter(description = "Fields requested in the returned resource",
                              schema = @Schema(type = "string", example = FIELDS))
                          @QueryParam("fields") String fieldsParam) throws IOException, ParseException {
  Fields fields = new Fields(FIELD_LIST, fieldsParam);
  Task task = dao.getByName(fqn, fields);
  addHref(uriInfo, task);
  return Response.ok(task).build();
}
/**
 * Returns one specific version of the task identified by {@code id}.
 *
 * @param uriInfo request URI context (not used by this method)
 * @param securityContext security context of the caller (not used by this method)
 * @param id task id
 * @param version version string passed through to the repository
 * @return the task at the requested version
 */
@GET
@Path("/{id}/versions/{version}")
@Operation(summary = "Get a version of the task", tags = "tasks",
    description = "Get a version of the task by given `id`",
    responses = {
        @ApiResponse(responseCode = "200", description = "task",
            content = @Content(mediaType = "application/json",
                schema = @Schema(implementation = Task.class))),
        @ApiResponse(responseCode = "404", description = "Task for instance {id} and version {version} is " +
            "not found")
    })
public Task getVersion(@Context UriInfo uriInfo,
                       @Context SecurityContext securityContext,
                       @Parameter(description = "Task Id", schema = @Schema(type = "string"))
                       @PathParam("id") String id,
                       @Parameter(description = "Task version number in the form `major`.`minor`",
                           schema = @Schema(type = "string", example = "0.1 or 1.1"))
                       @PathParam("version") String version) throws IOException, ParseException {
  final Task requested = dao.getVersion(id, version);
  return requested;
}
/**
 * Creates a task from the request once {@code SecurityUtil.checkAdminOrBotRole} has passed.
 *
 * @param uriInfo request URI context used to build hrefs
 * @param securityContext security context of the caller
 * @param create validated create request
 * @return a "created" response whose location is the new task's href and whose entity is the task
 */
@POST
@Operation(summary = "Create a task", tags = "tasks",
    description = "Create a task under an existing `service`.",
    responses = {
        @ApiResponse(responseCode = "200", description = "The task",
            content = @Content(mediaType = "application/json",
                schema = @Schema(implementation = CreateTask.class))),
        @ApiResponse(responseCode = "400", description = "Bad request")
    })
public Response create(@Context UriInfo uriInfo, @Context SecurityContext securityContext,
                       @Valid CreateTask create) throws IOException, ParseException {
  SecurityUtil.checkAdminOrBotRole(authorizer, securityContext);
  final Task requested = getTask(securityContext, create);
  final Task created = addHref(uriInfo, dao.create(requested));
  return Response.created(created.getHref()).entity(created).build();
}
/**
 * Applies a JSON Patch (RFC 6902) document to an existing task.
 *
 * <p>The task is first loaded with every field listed in {@code FIELDS} so that its entity reference
 * can be handed to the authorization check; the patch is applied only after that check returns.
 *
 * @param uriInfo request URI context used to populate hrefs on the returned task
 * @param securityContext caller identity; its principal name is passed to the DAO along with the patch
 * @param id task id; parsed with {@link UUID#fromString(String)} before patching
 * @param patch JSON Patch operations to apply
 * @return the patched task with hrefs added
 */
@PATCH
@Path("/{id}")
// Tag is "tasks" so this operation is grouped with the other task operations in the OpenAPI output.
@Operation(summary = "Update a Task", tags = "tasks",
    description = "Update an existing task using JsonPatch.",
    externalDocs = @ExternalDocumentation(description = "JsonPatch RFC",
        url = "https://tools.ietf.org/html/rfc6902"))
@Consumes(MediaType.APPLICATION_JSON_PATCH_JSON)
public Task updateDescription(@Context UriInfo uriInfo,
                              @Context SecurityContext securityContext,
                              @PathParam("id") String id,
                              @RequestBody(description = "JsonPatch with array of operations",
                                  content = @Content(mediaType = MediaType.APPLICATION_JSON_PATCH_JSON,
                                      examples = {@ExampleObject("[" +
                                          "{op:remove, path:/a}," +
                                          "{op:add, path: /b, value: val}" +
                                          "]")}))
                                  JsonPatch patch) throws IOException, ParseException {
  Fields fields = new Fields(FIELD_LIST, FIELDS);
  Task task = dao.get(id, fields);
  SecurityUtil.checkAdminRoleOrPermissions(authorizer, securityContext,
      new TaskEntityInterface(task).getEntityReference());
  task = dao.patch(UUID.fromString(id), securityContext.getUserPrincipal().getName(), patch);
  return addHref(uriInfo, task);
}
/**
 * Creates the task described by the request, or updates it when it already exists.
 *
 * @param uriInfo request URI context used to populate hrefs on the returned task
 * @param securityContext caller identity used as the task's updater
 * @param create validated create request
 * @return a response whose status is taken from the DAO's {@code PutResponse} and whose entity is the
 *     resulting task with hrefs added
 */
@PUT
@Operation(summary = "Create or update task", tags = "tasks",
    description = "Create a task, if it does not exist or update an existing task.",
    responses = {
        // The response body is the resulting Task entity, not the CreateTask request.
        @ApiResponse(responseCode = "200", description = "The updated task ",
            content = @Content(mediaType = "application/json",
                schema = @Schema(implementation = Task.class)))
    })
public Response createOrUpdate(@Context UriInfo uriInfo,
                               @Context SecurityContext securityContext,
                               @Valid CreateTask create) throws IOException, ParseException {
  // NOTE(review): unlike create(), no SecurityUtil check is performed in this method - confirm that
  // authorization for PUT is enforced elsewhere.
  Task task = getTask(securityContext, create).withDownstreamTasks(create.getDownstreamTasks());
  PutResponse<Task> response = dao.createOrUpdate(task);
  task = addHref(uriInfo, response.getEntity());
  return Response.status(response.getStatus()).entity(task).build();
}
/**
 * Deletes the task identified by {@code id}.
 *
 * @param uriInfo request URI context (not consulted by this method)
 * @param id task id from the URL path; parsed with {@link UUID#fromString(String)}
 * @return an empty 200 response once the DAO call returns
 */
@DELETE
@Path("/{id}")
@Operation(summary = "Delete a Task", tags = "tasks",
    description = "Delete a task by `id`.",
    responses = {
        @ApiResponse(responseCode = "200", description = "OK"),
        @ApiResponse(responseCode = "404", description = "task for instance {id} is not found")
    })
public Response delete(@Context UriInfo uriInfo, @PathParam("id") String id) {
  // NOTE(review): no authorization check is visible in this method - confirm it is enforced elsewhere.
  UUID taskId = UUID.fromString(id);
  dao.delete(taskId);
  return Response.ok().build();
}
/**
 * Builds a new {@code Task} entity from a create request.
 *
 * <p>A fresh random id is assigned on every call, and the caller's principal name and the current time
 * are recorded as {@code updatedBy} / {@code updatedAt}. Downstream tasks are copied here so that every
 * caller (POST as well as PUT) carries them; previously only the PUT path added them afterwards and
 * POST silently dropped them.
 *
 * @param securityContext caller identity supplying the {@code updatedBy} principal name
 * @param create request whose attributes are copied onto the new task
 * @return the populated, not yet persisted task
 */
private Task getTask(SecurityContext securityContext, CreateTask create) {
  return new Task().withId(UUID.randomUUID()).withName(create.getName()).withDisplayName(create.getDisplayName())
      .withDescription(create.getDescription())
      .withService(create.getService())
      .withStartDate(create.getStartDate())
      .withEndDate(create.getEndDate())
      .withTaskType(create.getTaskType())
      .withTaskSQL(create.getTaskSQL())
      .withTaskUrl(create.getTaskUrl())
      .withDownstreamTasks(create.getDownstreamTasks())
      .withTags(create.getTags())
      .withOwner(create.getOwner())
      .withUpdatedBy(securityContext.getUserPrincipal().getName())
      .withUpdatedAt(new Date());
}
}

View File

@ -39,7 +39,6 @@ import org.openmetadata.catalog.resources.services.dashboard.DashboardServiceRes
import org.openmetadata.catalog.resources.services.database.DatabaseServiceResource;
import org.openmetadata.catalog.resources.services.messaging.MessagingServiceResource;
import org.openmetadata.catalog.resources.services.pipeline.PipelineServiceResource;
import org.openmetadata.catalog.resources.tasks.TaskResource;
import org.openmetadata.catalog.resources.teams.TeamResource;
import org.openmetadata.catalog.resources.teams.UserResource;
import org.openmetadata.catalog.resources.topics.TopicResource;
@ -129,8 +128,6 @@ public final class EntityUtil {
DashboardResource.addHref(uriInfo, ref);
} else if (entity.equalsIgnoreCase(Entity.MODEL)) {
ModelResource.addHref(uriInfo, ref);
} else if (entity.equalsIgnoreCase(Entity.TASK)) {
TaskResource.addHref(uriInfo, ref);
} else if (entity.equalsIgnoreCase(Entity.PIPELINE)) {
PipelineResource.addHref(uriInfo, ref);
} else if (entity.equalsIgnoreCase(Entity.DATABASE_SERVICE)) {
@ -247,8 +244,6 @@ public final class EntityUtil {
return dao.topicDAO().findEntityReferenceById(id);
} else if (entity.equalsIgnoreCase(Entity.CHART)) {
return dao.chartDAO().findEntityReferenceById(id);
} else if (entity.equalsIgnoreCase(Entity.TASK)) {
return dao.taskDAO().findEntityReferenceById(id);
} else if (entity.equalsIgnoreCase(Entity.PIPELINE)) {
return dao.pipelineDAO().findEntityReferenceById(id);
} else if (entity.equalsIgnoreCase(Entity.MODEL)) {
@ -273,8 +268,6 @@ public final class EntityUtil {
return dao.chartDAO().findEntityReferenceByName(fqn);
} else if (entity.equalsIgnoreCase(Entity.DASHBOARD)) {
return dao.dashboardDAO().findEntityReferenceByName(fqn);
} else if (entity.equalsIgnoreCase(Entity.TASK)) {
return dao.taskDAO().findEntityReferenceByName(fqn);
} else if (entity.equalsIgnoreCase(Entity.PIPELINE)) {
return dao.pipelineDAO().findEntityReferenceByName(fqn);
} else if (entity.equalsIgnoreCase(Entity.MODEL)) {

View File

@ -41,7 +41,7 @@
"description": "All the tasks that are part of pipeline.",
"type": "array",
"items": {
"$ref": "../../type/entityReference.json"
"$ref": "../../entity/data/pipeline.json#/definitions/task"
},
"default": null
},

View File

@ -1,71 +0,0 @@
{
"$id": "https://open-metadata.org/schema/api/data/createTask.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Create Task entity request",
"description": "Create Task entity request",
"type": "object",
"properties" : {
"name": {
"description": "Name that identifies this Task.",
"type": "string",
"minLength": 1,
"maxLength": 64
},
"displayName": {
"description": "Display Name that identifies this Task. It could be title or label from the pipeline services",
"type": "string"
},
"description": {
"description": "Description of the task instance. What it has and how to use it.",
"type": "string"
},
"taskUrl" : {
"description": "Task URL to visit/manage. This URL points to respective pipeline service UI",
"type": "string",
"format": "uri"
},
"taskType": {
"description": "Type of the Task. Usually refers to the class it implements",
"type": "string"
},
"taskSQL": {
"description": "SQL used in the task. Can be used to determine the lineage",
"type": "string"
},
"downstreamTasks": {
"description": "All the tasks that are downstream of this task.",
"type": "array",
"items": {
"type": "string",
"minLength": 1,
"maxLength": 64
},
"default": null
},
"startDate": {
"description": "Start date of the task",
"$ref": "../../type/basic.json#/definitions/dateTime"
},
"endDate": {
"description": "End date of the task",
"$ref": "../../type/basic.json#/definitions/dateTime"
},
"tags": {
"description": "Tags for this Task",
"type": "array",
"items": {
"$ref": "../../type/tagLabel.json"
},
"default": null
},
"owner": {
"description": "Owner of this Task",
"$ref": "../../type/entityReference.json"
},
"service" : {
"description": "Link to the pipeline service where this task is used",
"$ref" : "../../type/entityReference.json"
}
},
"required": ["name", "service"]
}

View File

@ -4,6 +4,69 @@
"title": "Pipeline",
"description": "This schema defines the Pipeline entity. A pipeline enables the flow of data from source to destination through a series of processing steps. ETL is a type of pipeline where the series of steps Extract, Transform and Load the data.",
"type": "object",
"definitions": {
"task": {
"type": "object",
"javaType": "org.openmetadata.catalog.type.Task",
"properties": {
"name": {
"description": "Name that identifies this task instance uniquely.",
"type": "string",
"minLength": 1,
"maxLength": 64
},
"displayName": {
"description": "Display Name that identifies this Task. It could be title or label from the pipeline services.",
"type": "string"
},
"fullyQualifiedName": {
"description": "A unique name that identifies a task in the format 'ServiceName.PipelineName.TaskName'.",
"type": "string",
"minLength": 1,
"maxLength": 64
},
"description": {
"description": "Description of this Task.",
"type": "string"
},
"taskUrl": {
"description": "Task URL to visit/manage. This URL points to respective pipeline service UI.",
"type": "string",
"format": "uri"
},
"downstreamTasks": {
"description": "All the tasks that are downstream of this task.",
"type": "array",
"items": {
"type": "string",
"minLength": 1,
"maxLength": 64
},
"default": null
},
"taskType": {
"description": "Type of the Task. Usually refers to the class it implements.",
"type": "string"
},
"taskSQL": {
"description": "SQL used in the task. Can be used to determine the lineage.",
"$ref": "../../type/basic.json#/definitions/sqlQuery"
},
"tags": {
"description": "Tags for this task.",
"type": "array",
"items": {
"$ref": "../../type/tagLabel.json"
},
"default": null
}
},
"required": [
"name"
]
}
},
"properties" : {
"id": {
"description": "Unique identifier that identifies a pipeline instance.",
@ -62,7 +125,7 @@
"description": "All the tasks that are part of pipeline.",
"type": "array",
"items": {
"$ref": "../../type/entityReference.json"
"$ref": "#/definitions/task"
},
"default": null
},

View File

@ -1,102 +0,0 @@
{
"$id": "https://open-metadata.org/schema/entity/data/task.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Task",
"description": "This schema defines the Task entity. A task is a unit of computation in a Pipeline.",
"type": "object",
"javaType": "org.openmetadata.catalog.entity.data.Task",
"properties" : {
"id": {
"description": "Unique identifier that identifies a task instance.",
"$ref": "../../type/basic.json#/definitions/uuid"
},
"name": {
"description": "Name that identifies this task instance uniquely.",
"type": "string",
"minLength": 1,
"maxLength": 64
},
"displayName": {
"description": "Display Name that identifies this Task. It could be title or label from the pipeline services.",
"type": "string"
},
"fullyQualifiedName": {
"description": "A unique name that identifies a task in the format 'ServiceName.PipelineName.TaskName'.",
"type": "string",
"minLength": 1,
"maxLength": 64
},
"description": {
"description": "Description of this Task.",
"type": "string"
},
"version" : {
"description": "Metadata version of the entity.",
"$ref": "../../type/entityHistory.json#/definitions/entityVersion"
},
"updatedAt" : {
"description": "Last update time corresponding to the new version of the entity.",
"$ref": "../../type/basic.json#/definitions/dateTime"
},
"updatedBy" : {
"description": "User who made the update.",
"type": "string"
},
"taskUrl" : {
"description": "Task URL to visit/manage. This URL points to respective pipeline service UI.",
"type": "string",
"format": "uri"
},
"downstreamTasks": {
"description": "All the tasks that are downstream of this task.",
"type": "array",
"items": {
"type": "string",
"minLength": 1,
"maxLength": 64
},
"default": null
},
"taskType": {
"description": "Type of the Task. Usually refers to the class it implements.",
"type": "string"
},
"taskSQL": {
"description": "SQL used in the task. Can be used to determine the lineage.",
"type": "string"
},
"startDate": {
"description": "Start date of the task.",
"$ref": "../../type/basic.json#/definitions/dateTime"
},
"endDate": {
"description": "End date of the task.",
"$ref": "../../type/basic.json#/definitions/dateTime"
},
"tags": {
"description": "Tags for this Task.",
"type": "array",
"items": {
"$ref": "../../type/tagLabel.json"
},
"default": null
},
"href": {
"description": "Link to the resource corresponding to this entity.",
"$ref": "../../type/basic.json#/definitions/href"
},
"owner": {
"description": "Owner of this task.",
"$ref": "../../type/entityReference.json"
},
"service" : {
"description": "Link to the service where this task is hosted.",
"$ref" : "../../type/entityReference.json"
},
"changeDescription": {
"description" : "Change that led to this version of the entity.",
"$ref": "../../type/entityHistory.json#/definitions/changeDescription"
}
},
"required": ["id", "name", "service"]
}

View File

@ -22,40 +22,30 @@ import org.joda.time.DateTime;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.openmetadata.catalog.CatalogApplicationTest;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.api.data.CreatePipeline;
import org.openmetadata.catalog.api.data.CreateTask;
import org.openmetadata.catalog.api.services.CreatePipelineService;
import org.openmetadata.catalog.entity.data.Pipeline;
import org.openmetadata.catalog.entity.data.Task;
import org.openmetadata.catalog.entity.services.PipelineService;
import org.openmetadata.catalog.entity.teams.Team;
import org.openmetadata.catalog.entity.teams.User;
import org.openmetadata.catalog.exception.CatalogExceptionMessage;
import org.openmetadata.catalog.jdbi3.PipelineServiceRepository.PipelineServiceEntityInterface;
import org.openmetadata.catalog.jdbi3.TaskRepository.TaskEntityInterface;
import org.openmetadata.catalog.jdbi3.PipelineRepository.PipelineEntityInterface;
import org.openmetadata.catalog.resources.EntityResourceTest;
import org.openmetadata.catalog.resources.pipelines.PipelineResource.PipelineList;
import org.openmetadata.catalog.resources.services.PipelineServiceResourceTest;
import org.openmetadata.catalog.resources.tasks.TaskResourceTest;
import org.openmetadata.catalog.resources.teams.TeamResourceTest;
import org.openmetadata.catalog.resources.teams.UserResourceTest;
import org.openmetadata.catalog.type.ChangeDescription;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.TagLabel;
import org.openmetadata.catalog.util.EntityUtil;
import org.openmetadata.catalog.type.Task;
import org.openmetadata.catalog.util.EntityInterface;
import org.openmetadata.catalog.util.JsonUtils;
import org.openmetadata.catalog.util.TestUtils;
import org.openmetadata.catalog.util.TestUtils.UpdateType;
import org.openmetadata.common.utils.JsonSchemaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.json.JsonPatch;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response.Status;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
@ -64,7 +54,6 @@ import java.util.UUID;
import static java.util.Collections.singletonList;
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
import static javax.ws.rs.core.Response.Status.CONFLICT;
import static javax.ws.rs.core.Response.Status.CREATED;
import static javax.ws.rs.core.Response.Status.FORBIDDEN;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static javax.ws.rs.core.Response.Status.OK;
@ -72,53 +61,70 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound;
import static org.openmetadata.catalog.util.TestUtils.UpdateType.MINOR_UPDATE;
import static org.openmetadata.catalog.util.TestUtils.UpdateType.NO_CHANGE;
import static org.openmetadata.catalog.util.TestUtils.adminAuthHeaders;
import static org.openmetadata.catalog.util.TestUtils.assertEntityPagination;
import static org.openmetadata.catalog.util.TestUtils.assertResponse;
import static org.openmetadata.catalog.util.TestUtils.authHeaders;
public class PipelineResourceTest extends CatalogApplicationTest {
public class PipelineResourceTest extends EntityResourceTest<Pipeline> {
private static final Logger LOG = LoggerFactory.getLogger(PipelineResourceTest.class);
public static User USER1;
public static EntityReference USER_OWNER1;
public static Team TEAM1;
public static EntityReference TEAM_OWNER1;
public static EntityReference AIRFLOW_REFERENCE;
public static EntityReference PREFECT_REFERENCE;
public static List<EntityReference> TASK_REFERENCES;
public static List<Task> TASKS;
public static final TagLabel TIER_1 = new TagLabel().withTagFQN("Tier.Tier1");
public static final TagLabel USER_ADDRESS_TAG_LABEL = new TagLabel().withTagFQN("User.Address");
/**
 * Configures the shared entity test base for pipelines by passing it the {@code Pipeline} class, the
 * string {@code "pipelines"} (presumably the REST collection name - confirm against
 * {@code EntityResourceTest}) and {@code PipelineResource.FIELDS}.
 */
public PipelineResourceTest() {
super(Pipeline.class, "pipelines", PipelineResource.FIELDS);
}
@BeforeAll
public static void setup(TestInfo test) throws HttpResponseException, URISyntaxException {
USER1 = UserResourceTest.createUser(UserResourceTest.create(test), authHeaders("test@open-metadata.org"));
USER_OWNER1 = new EntityReference().withId(USER1.getId()).withType("user");
TEAM1 = TeamResourceTest.createTeam(TeamResourceTest.create(test), adminAuthHeaders());
TEAM_OWNER1 = new EntityReference().withId(TEAM1.getId()).withType("team");
CreatePipelineService createService = new CreatePipelineService().withName("airflow")
.withServiceType(CreatePipelineService.PipelineServiceType.Airflow)
.withPipelineUrl(TestUtils.PIPELINE_URL);
PipelineService service = PipelineServiceResourceTest.createService(createService, adminAuthHeaders());
AIRFLOW_REFERENCE = new PipelineServiceEntityInterface(service).getEntityReference();
createService.withName("prefect").withServiceType(CreatePipelineService.PipelineServiceType.Prefect);
service = PipelineServiceResourceTest.createService(createService, adminAuthHeaders());
PREFECT_REFERENCE = new PipelineServiceEntityInterface(service).getEntityReference();
TASK_REFERENCES = new ArrayList<>();
EntityResourceTest.setup(test);
TASKS = new ArrayList<>();
for (int i=0; i < 3; i++) {
CreateTask createTask = TaskResourceTest.create(test, i).withService(AIRFLOW_REFERENCE);
Task task = TaskResourceTest.createTask(createTask, adminAuthHeaders());
TASK_REFERENCES.add(new TaskEntityInterface(task).getEntityReference());
Task task = new Task().withName("task" + i).withDescription("description")
.withDisplayName("displayName").withTaskUrl(new URI("http://localhost:0"));
TASKS.add(task);
}
}
/**
 * Builds a pipeline create request for the given test, overriding its description, display name and
 * owner with the supplied values.
 *
 * @return the resulting {@code CreatePipeline}, typed as {@code Object} to match the base signature
 */
@Override
public Object createRequest(TestInfo test, String description, String displayName, EntityReference owner) {
  CreatePipeline request = create(test).withDescription(description);
  request = request.withDisplayName(displayName);
  return request.withOwner(owner);
}
/**
 * Checks that a returned pipeline matches the {@code CreatePipeline} request that produced it: common
 * entity fields, display name, service, tasks and tags.
 *
 * @param pipeline pipeline returned by the server
 * @param request the originating request; cast to {@code CreatePipeline}
 * @param authHeaders headers of the caller, used to derive the expected principal
 */
@Override
public void validateCreatedEntity(Pipeline pipeline, Object request, Map<String, String> authHeaders)
    throws HttpResponseException {
  CreatePipeline expected = (CreatePipeline) request;
  EntityInterface<Pipeline> actual = getEntityInterface(pipeline);

  validateCommonEntityFields(actual, expected.getDescription(),
      TestUtils.getPrincipal(authHeaders), expected.getOwner());

  assertEquals(expected.getDisplayName(), pipeline.getDisplayName());
  assertService(expected.getService(), pipeline.getService());
  validatePipelineTasks(pipeline, expected.getTasks());
  TestUtils.validateTags(pipeline.getFullyQualifiedName(), expected.getTags(), pipeline.getTags());
}
/**
 * Validates a pipeline returned from an update; applies exactly the same checks as
 * {@code validateCreatedEntity} by delegating to it.
 */
@Override
public void validateUpdatedEntity(Pipeline pipeline, Object request, Map<String, String> authHeaders)
    throws HttpResponseException {
  this.validateCreatedEntity(pipeline, request, authHeaders);
}
/**
 * Checks that a patched pipeline matches the expected pipeline state: common entity fields, display
 * name, service, tasks and tags.
 *
 * @param expected pipeline state the patch was meant to produce
 * @param updated pipeline returned by the server after the patch
 * @param authHeaders headers of the caller, used to derive the expected principal
 */
@Override
public void validatePatchedEntity(Pipeline expected, Pipeline updated, Map<String, String> authHeaders)
    throws HttpResponseException {
  EntityInterface<Pipeline> actual = getEntityInterface(updated);

  validateCommonEntityFields(actual, expected.getDescription(),
      TestUtils.getPrincipal(authHeaders), expected.getOwner());

  assertEquals(expected.getDisplayName(), updated.getDisplayName());
  assertService(expected.getService(), updated.getService());
  validatePipelineTasks(updated, expected.getTasks());
  TestUtils.validateTags(updated.getFullyQualifiedName(), expected.getTags(), updated.getTags());
}
/**
 * Wraps a pipeline in a {@code PipelineEntityInterface} so the shared test code can work with it
 * through {@code EntityInterface}.
 */
@Override
public EntityInterface<Pipeline> getEntityInterface(Pipeline entity) {
  EntityInterface<Pipeline> wrapped = new PipelineEntityInterface(entity);
  return wrapped;
}
@Test
@ -152,25 +158,25 @@ public class PipelineResourceTest extends CatalogApplicationTest {
public void post_validPipelines_as_admin_200_OK(TestInfo test) throws HttpResponseException {
// Create team with different optional fields
CreatePipeline create = create(test);
createAndCheckPipeline(create, adminAuthHeaders());
createAndCheckEntity(create, adminAuthHeaders());
create.withName(getPipelineName(test, 1)).withDescription("description");
createAndCheckPipeline(create, adminAuthHeaders());
createAndCheckEntity(create, adminAuthHeaders());
}
@Test
public void post_PipelineWithUserOwner_200_ok(TestInfo test) throws HttpResponseException {
createAndCheckPipeline(create(test).withOwner(USER_OWNER1), adminAuthHeaders());
createAndCheckEntity(create(test).withOwner(USER_OWNER1), adminAuthHeaders());
}
@Test
public void post_PipelineWithTeamOwner_200_ok(TestInfo test) throws HttpResponseException {
createAndCheckPipeline(create(test).withOwner(TEAM_OWNER1).withDisplayName("Pipeline1"), adminAuthHeaders());
createAndCheckEntity(create(test).withOwner(TEAM_OWNER1).withDisplayName("Pipeline1"), adminAuthHeaders());
}
@Test
public void post_PipelineWithTasks_200_ok(TestInfo test) throws HttpResponseException {
createAndCheckPipeline(create(test), TASK_REFERENCES, adminAuthHeaders());
createAndCheckEntity(create(test).withTasks(TASKS), adminAuthHeaders());
}
@Test
@ -214,7 +220,7 @@ public class PipelineResourceTest extends CatalogApplicationTest {
// Create Pipeline for each service and test APIs
for (EntityReference service : differentServices) {
createAndCheckPipeline(create(test).withService(service), adminAuthHeaders());
createAndCheckEntity(create(test).withService(service), adminAuthHeaders());
// List Pipelines by filtering on service name and ensure right Pipelines are returned in the response
PipelineList list = listPipelines("service", service.getName(), adminAuthHeaders());
@ -310,71 +316,11 @@ public class PipelineResourceTest extends CatalogApplicationTest {
LOG.info("before {} after {} ", list.getPaging().getBefore(), list.getPaging().getAfter());
}
@Test
public void put_PipelineUpdateWithNoChange_200(TestInfo test) throws HttpResponseException {
// Create a Pipeline with POST
CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withOwner(USER_OWNER1);
Pipeline pipeline = createAndCheckPipeline(request, adminAuthHeaders());
// Update Pipeline two times successfully with PUT requests
pipeline = updateAndCheckPipeline(pipeline, request, OK, adminAuthHeaders(), NO_CHANGE);
updateAndCheckPipeline(pipeline, request, OK, adminAuthHeaders(), NO_CHANGE);
}
@Test
public void put_PipelineCreate_200(TestInfo test) throws HttpResponseException {
// Create a new Pipeline with PUT
CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withOwner(USER_OWNER1);
updateAndCheckPipeline(null, request.withName(test.getDisplayName()).withDescription(null), CREATED,
adminAuthHeaders(), NO_CHANGE);
}
@Test
public void put_PipelineCreate_as_owner_200(TestInfo test) throws HttpResponseException {
// Create a new Pipeline with put
CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withOwner(USER_OWNER1);
// Add pipeline as admin
Pipeline pipeline = createAndCheckPipeline(request, adminAuthHeaders());
// Update the pipeline as user owner
updateAndCheckPipeline(pipeline, request.withDescription("new"), OK, authHeaders(USER1.getEmail()), MINOR_UPDATE);
}
@Test
public void put_PipelineNullDescriptionUpdate_200(TestInfo test) throws HttpResponseException {
CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription(null);
Pipeline pipeline = createAndCheckPipeline(request, adminAuthHeaders());
// Update null description with a new description
pipeline = updateAndCheckPipeline(pipeline, request.withDisplayName("Pipeline1").
withDescription("newDescription"), OK, adminAuthHeaders(), MINOR_UPDATE);
assertEquals("Pipeline1", pipeline.getDisplayName()); // TODO move this to common validate
}
@Test
public void put_PipelineEmptyDescriptionUpdate_200(TestInfo test) throws HttpResponseException {
// Create table with empty description
CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription("");
Pipeline pipeline = createAndCheckPipeline(request, adminAuthHeaders());
// Update empty description with a new description
updateAndCheckPipeline(pipeline, request.withDescription("newDescription"), OK, adminAuthHeaders(), MINOR_UPDATE);
}
@Test
public void put_PipelineNonEmptyDescriptionUpdate_200(TestInfo test) throws HttpResponseException {
CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription("description");
createAndCheckPipeline(request, adminAuthHeaders());
// Updating description is ignored when backend already has description
Pipeline db = updatePipeline(request.withDescription("newDescription"), OK, adminAuthHeaders());
assertEquals("description", db.getDescription());
}
@Test
public void put_PipelineUrlUpdate_200(TestInfo test) throws HttpResponseException, URISyntaxException {
CreatePipeline request = create(test).withService(new EntityReference().withId(AIRFLOW_REFERENCE.getId())
.withType("pipelineService")).withDescription("description");
createAndCheckPipeline(request, adminAuthHeaders());
createAndCheckEntity(request, adminAuthHeaders());
URI pipelineURI = new URI("https://airflow.open-metadata.org/tree?dag_id=airflow_redshift_usage");
Integer pipelineConcurrency = 110;
Date startDate = new DateTime("2021-11-13T20:20:39+00:00").toDate();
@ -391,43 +337,34 @@ public class PipelineResourceTest extends CatalogApplicationTest {
}
@Test
public void put_PipelineUpdateOwner_200(TestInfo test) throws HttpResponseException {
CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription("");
Pipeline pipeline = createAndCheckPipeline(request, adminAuthHeaders());
// Change ownership from USER_OWNER1 to TEAM_OWNER1
pipeline = updateAndCheckPipeline(pipeline, request.withOwner(TEAM_OWNER1), OK, adminAuthHeaders(), MINOR_UPDATE);
// Remove ownership
pipeline = updateAndCheckPipeline(pipeline, request.withOwner(null), OK, adminAuthHeaders(), MINOR_UPDATE);
assertNull(pipeline.getOwner());
}
@Test
public void put_PipelineTasksUpdate_200(TestInfo test) throws HttpResponseException {
public void put_PipelineTasksUpdate_200(TestInfo test) throws IOException {
CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription(null);
Pipeline pipeline = createAndCheckPipeline(request, adminAuthHeaders());
pipeline = updateAndCheckPipeline(pipeline, request.withDescription("newDescription").withTasks(TASK_REFERENCES),
OK, adminAuthHeaders(), MINOR_UPDATE);
validatePipelineTasks(pipeline, TASK_REFERENCES); // TODO clean this up
Pipeline pipeline = createAndCheckEntity(request, adminAuthHeaders());
// Add description and tasks
ChangeDescription change = getChangeDescription(pipeline.getVersion())
.withFieldsAdded(Arrays.asList("description", "tasks"));
pipeline = updateAndCheckEntity(request.withDescription("newDescription").withTasks(TASKS),
OK, adminAuthHeaders(), MINOR_UPDATE, change);
validatePipelineTasks(pipeline, TASKS); // TODO clean this up
}
@Test
public void put_AddRemovePipelineTasksUpdate_200(TestInfo test) throws HttpResponseException {
CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription(null);
Pipeline pipeline = createAndCheckPipeline(request, adminAuthHeaders());
public void put_AddRemovePipelineTasksUpdate_200(TestInfo test) throws IOException {
CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription(null).withTasks(null);
Pipeline pipeline = createAndCheckEntity(request, adminAuthHeaders());
// Add tasks
pipeline = updateAndCheckPipeline(pipeline, request.withDescription("newDescription").withTasks(TASK_REFERENCES),
OK, adminAuthHeaders(), MINOR_UPDATE);
validatePipelineTasks(pipeline, TASK_REFERENCES);
// Add tasks and description
ChangeDescription change = getChangeDescription(pipeline.getVersion())
.withFieldsAdded(Arrays.asList("description", "tasks"));
pipeline = updateAndCheckEntity(request.withDescription("newDescription").withTasks(TASKS),
OK, adminAuthHeaders(), MINOR_UPDATE, change);
// TODO update this once task removal is figured out
// remove a task
TASK_REFERENCES.remove(0);
pipeline = updateAndCheckPipeline(pipeline, request.withDescription("newDescription").withTasks(TASK_REFERENCES),
OK, adminAuthHeaders(), MINOR_UPDATE);
validatePipelineTasks(pipeline, TASK_REFERENCES);
// TASKS.remove(0);
// change = getChangeDescription(pipeline.getVersion()).withFieldsUpdated(singletonList("tasks"));
//updateAndCheckEntity(request.withTasks(TASKS), OK, adminAuthHeaders(), MINOR_UPDATE, change);
}
@Test
@ -441,16 +378,16 @@ public class PipelineResourceTest extends CatalogApplicationTest {
@Test
public void get_PipelineWithDifferentFields_200_OK(TestInfo test) throws HttpResponseException {
CreatePipeline create = create(test).withDescription("description").withOwner(USER_OWNER1)
.withService(AIRFLOW_REFERENCE);
Pipeline pipeline = createAndCheckPipeline(create, adminAuthHeaders());
.withService(AIRFLOW_REFERENCE).withTasks(TASKS);
Pipeline pipeline = createAndCheckEntity(create, adminAuthHeaders());
validateGetWithDifferentFields(pipeline, false);
}
@Test
public void get_PipelineByNameWithDifferentFields_200_OK(TestInfo test) throws HttpResponseException {
CreatePipeline create = create(test).withDescription("description").withOwner(USER_OWNER1)
.withService(AIRFLOW_REFERENCE);
Pipeline pipeline = createAndCheckPipeline(create, adminAuthHeaders());
.withService(AIRFLOW_REFERENCE).withTasks(TASKS);
Pipeline pipeline = createAndCheckEntity(create, adminAuthHeaders());
validateGetWithDifferentFields(pipeline, true);
}
@ -466,21 +403,31 @@ public class PipelineResourceTest extends CatalogApplicationTest {
pipeline.getService().setHref(null); // href is readonly and not patchable
List<TagLabel> pipelineTags = singletonList(TIER_1);
// Add description, owner when previously they were null
pipeline = patchPipelineAttributesAndCheck(pipeline, "description",
TEAM_OWNER1, pipelineTags, adminAuthHeaders(), MINOR_UPDATE);
// Add description, owner and tags when previously they were null
String origJson = JsonUtils.pojoToJson(pipeline);
pipeline.withDescription("description").withOwner(TEAM_OWNER1).withTags(pipelineTags);
ChangeDescription change = getChangeDescription(pipeline.getVersion())
.withFieldsAdded(Arrays.asList("description", "owner", "tags"));
pipeline = patchEntityAndCheck(pipeline, origJson, adminAuthHeaders(), MINOR_UPDATE, change);
pipeline.setOwner(TEAM_OWNER1); // Get rid of href and name returned in the response for owner
pipeline.setService(AIRFLOW_REFERENCE); // Get rid of href and name returned in the response for service
pipelineTags = singletonList(USER_ADDRESS_TAG_LABEL);
// Replace description, tier, owner
pipeline = patchPipelineAttributesAndCheck(pipeline, "description1",
USER_OWNER1, pipelineTags, adminAuthHeaders(), MINOR_UPDATE);
// Replace description, tags, owner
origJson = JsonUtils.pojoToJson(pipeline);
pipeline.withDescription("description1").withOwner(USER_OWNER1).withTags(pipelineTags);
change = getChangeDescription(pipeline.getVersion())
.withFieldsUpdated(Arrays.asList("description", "owner", "tags"));
pipeline = patchEntityAndCheck(pipeline, origJson, adminAuthHeaders(), MINOR_UPDATE, change);
pipeline.setOwner(USER_OWNER1); // Get rid of href and name returned in the response for owner
pipeline.setService(AIRFLOW_REFERENCE); // Get rid of href and name returned in the response for service
// Remove description, tier, owner
patchPipelineAttributesAndCheck(pipeline, null, null, pipelineTags, adminAuthHeaders(), MINOR_UPDATE);
origJson = JsonUtils.pojoToJson(pipeline);
pipeline.withDescription(null).withOwner(null).withTags(null);
change = getChangeDescription(pipeline.getVersion())
.withFieldsDeleted(Arrays.asList("description", "owner", "tags"));
patchEntityAndCheck(pipeline, origJson, adminAuthHeaders(), MINOR_UPDATE, change);
}
// TODO listing tables test:1
@ -504,58 +451,6 @@ public class PipelineResourceTest extends CatalogApplicationTest {
assertResponse(exception, NOT_FOUND, entityNotFound(Entity.PIPELINE, TestUtils.NON_EXISTENT_ENTITY));
}
/**
 * Creates a pipeline from the given request, validates the POST response and then validates the pipeline again
 * through the GET endpoints.
 *
 * @param create request used to create the pipeline
 * @param authHeaders authentication headers; the principal they carry is the expected "updatedBy" value
 * @return the pipeline as returned by the final GET
 */
public static Pipeline createAndCheckPipeline(CreatePipeline create,
                                              Map<String, String> authHeaders) throws HttpResponseException {
  final String principal = TestUtils.getPrincipal(authHeaders);
  final Pipeline created = createPipeline(create, authHeaders);

  // The POST response must reflect what was requested
  validatePipeline(created, create.getDisplayName(), create.getDescription(), create.getOwner(),
      create.getService(), principal);

  // Fetch the pipeline again and make sure the stored state matches as well
  return getAndValidate(created.getId(), create, authHeaders, principal);
}
/**
 * Creates a pipeline that references the given tasks and validates both the POST response and the subsequent
 * GET responses.
 *
 * @param create request used to create the pipeline; its task list is overwritten with {@code tasks}
 * @param tasks task references the created pipeline is expected to contain
 * @param authHeaders authentication headers; the principal they carry is the expected "updatedBy" value
 * @return the pipeline as returned by the final GET
 */
public static Pipeline createAndCheckPipeline(CreatePipeline create, List<EntityReference> tasks,
                                              Map<String, String> authHeaders) throws HttpResponseException {
  final String principal = TestUtils.getPrincipal(authHeaders);
  create.withTasks(tasks); // attach the tasks to the request before it is sent

  final Pipeline created = createPipeline(create, authHeaders);
  assertEquals(0.1, created.getVersion()); // First version created
  validatePipeline(created, create.getDescription(), create.getOwner(), create.getService(),
      create.getTags(), tasks, principal);

  return getAndValidate(created.getId(), create, authHeaders, principal);
}
/**
 * Sends an update (PUT) request for a pipeline, validates the response and the resulting version, and then
 * validates the pipeline again through the GET endpoints.
 *
 * @param before pipeline state prior to the update, or {@code null} when the request is expected to create it
 * @param create request body to send
 * @param status HTTP status passed through to {@code updatePipeline}
 * @param authHeaders authentication headers; the principal they carry is the expected "updatedBy" value
 * @param updateType kind of version change expected relative to {@code before}
 * @return the pipeline as returned by the final GET
 */
public static Pipeline updateAndCheckPipeline(Pipeline before, CreatePipeline create, Status status,
                                              Map<String, String> authHeaders, UpdateType updateType)
    throws HttpResponseException {
  final String principal = TestUtils.getPrincipal(authHeaders);
  final Pipeline updated = updatePipeline(create, status, authHeaders);
  validatePipeline(updated, create.getDescription(), create.getOwner(), create.getService(), principal);

  if (before != null) {
    // An existing pipeline was changed - the version must have moved as dictated by updateType
    TestUtils.validateUpdate(before.getVersion(), updated.getVersion(), updateType);
  } else {
    assertEquals(0.1, updated.getVersion()); // First version created
  }

  // GET the newly updated Pipeline and validate
  return getAndValidate(updated.getId(), create, authHeaders, principal);
}
/**
 * Makes sure that the Pipeline returned by GET operations (by id and by fully qualified name) carries the
 * information passed in the create request.
 *
 * @param pipelineId id of the pipeline to fetch
 * @param create request whose description, owner and service are the expected values
 * @param authHeaders authentication headers used for the GET requests
 * @param expectedUpdatedBy principal expected in the "updatedBy" attribute
 * @return the pipeline fetched by name
 */
public static Pipeline getAndValidate(UUID pipelineId,
                                      CreatePipeline create,
                                      Map<String, String> authHeaders,
                                      String expectedUpdatedBy) throws HttpResponseException {
  final String fields = "service,owner,tasks";

  // Lookup by id
  Pipeline byId = getPipeline(pipelineId, fields, authHeaders);
  validatePipeline(byId, create.getDescription(), create.getOwner(), create.getService(), expectedUpdatedBy);

  // Lookup by fully qualified name
  Pipeline byName = getPipelineByName(byId.getFullyQualifiedName(), fields, authHeaders);
  return validatePipeline(byName, create.getDescription(), create.getOwner(), create.getService(),
      expectedUpdatedBy);
}
public static Pipeline updatePipeline(CreatePipeline create,
Status status,
Map<String, String> authHeaders) throws HttpResponseException {
@ -593,126 +488,10 @@ public class PipelineResourceTest extends CatalogApplicationTest {
assertNotNull(pipeline.getOwner());
assertNotNull(pipeline.getService());
assertNotNull(pipeline.getTasks());
TestUtils.validateEntityReference(pipeline.getTasks());
}
/**
 * Runs the basic pipeline validation and additionally checks the display name.
 *
 * @return the validated pipeline
 */
private static Pipeline validatePipeline(Pipeline pipeline, String expectedDisplayName, String expectedDescription,
                                         EntityReference expectedOwner, EntityReference expectedService,
                                         String expectedUpdatedBy) {
  final Pipeline validated =
      validatePipeline(pipeline, expectedDescription, expectedOwner, expectedService, expectedUpdatedBy);
  assertEquals(expectedDisplayName, validated.getDisplayName());
  assertEquals(expectedUpdatedBy, validated.getUpdatedBy());
  return validated;
}
/**
 * Validates the attributes every pipeline response must carry: id, href, description, updatedBy and - when an
 * expectation is supplied - the owner and service references.
 *
 * @param expectedOwner owner to compare against; the owner is not checked when this is {@code null}
 * @param expectedService service to compare against; the service is not checked when this is {@code null}
 * @return the pipeline that was passed in
 */
private static Pipeline validatePipeline(Pipeline pipeline, String expectedDescription,
                                         EntityReference expectedOwner, EntityReference expectedService,
                                         String expectedUpdatedBy) {
  assertNotNull(pipeline.getId());
  assertNotNull(pipeline.getHref());
  assertEquals(expectedDescription, pipeline.getDescription());
  assertEquals(expectedUpdatedBy, pipeline.getUpdatedBy());

  // Validate owner
  if (expectedOwner != null) {
    EntityReference actualOwner = pipeline.getOwner();
    TestUtils.validateEntityReference(actualOwner);
    assertEquals(expectedOwner.getId(), actualOwner.getId());
    assertEquals(expectedOwner.getType(), actualOwner.getType());
    assertNotNull(actualOwner.getHref());
  }

  // Validate service
  if (expectedService != null) {
    EntityReference actualService = pipeline.getService();
    TestUtils.validateEntityReference(actualService);
    assertEquals(expectedService.getId(), actualService.getId());
    assertEquals(expectedService.getType(), actualService.getType());
  }
  return pipeline;
}
/**
 * Validates the basic pipeline attributes (id, href, description, updatedBy, owner, service) and additionally
 * the task references and tags.
 *
 * @param expectedTags tags expected on the pipeline
 * @param tasks task references the pipeline is expected to contain; tasks are not checked when {@code null}
 * @return the pipeline that was passed in
 */
private static Pipeline validatePipeline(Pipeline pipeline, String expectedDescription,
                                         EntityReference expectedOwner, EntityReference expectedService,
                                         List<TagLabel> expectedTags, List<EntityReference> tasks,
                                         String expectedUpdatedBy) throws HttpResponseException {
  // The id/href/description/updatedBy/owner/service checks are the ones the five-argument overload performs
  validatePipeline(pipeline, expectedDescription, expectedOwner, expectedService, expectedUpdatedBy);

  validatePipelineTasks(pipeline, tasks);
  TestUtils.validateTags(pipeline.getFullyQualifiedName(), expectedTags, pipeline.getTags());
  return pipeline;
}
/**
 * Checks that every expected task reference is present on the pipeline. Each task reference found on the
 * pipeline is also validated. Nothing is checked when {@code tasks} is {@code null}.
 */
private static void validatePipelineTasks(Pipeline pipeline, List<EntityReference> tasks) {
  if (tasks == null) {
    return; // no expectation supplied
  }

  List<UUID> expectedIds = new ArrayList<>();
  for (EntityReference expected : tasks) {
    expectedIds.add(expected.getId());
  }

  List<UUID> actualIds = new ArrayList<>();
  for (EntityReference actual : pipeline.getTasks()) {
    TestUtils.validateEntityReference(actual);
    actualIds.add(actual.getId());
  }

  // The pipeline may contain additional tasks - only the expected ones are required
  assertTrue(actualIds.containsAll(expectedIds));
}
/**
 * Patches the description, owner and tags of a pipeline and validates both the PATCH response and the state
 * returned by a subsequent GET.
 *
 * @param before pipeline to patch; it is mutated to carry the new description, owner and tags
 * @param newDescription description to set, may be {@code null}
 * @param newOwner owner to set, may be {@code null}
 * @param tags tags to set
 * @param authHeaders authentication headers; the principal they carry is the expected "updatedBy" value
 * @param updateType kind of version change expected from the patch
 * @return the pipeline returned in the PATCH response
 */
private Pipeline patchPipelineAttributesAndCheck(Pipeline before, String newDescription,
                                                 EntityReference newOwner, List<TagLabel> tags,
                                                 Map<String, String> authHeaders, UpdateType updateType)
    throws JsonProcessingException, HttpResponseException {
  String updatedBy = TestUtils.getPrincipal(authHeaders);
  // Snapshot of the pipeline before modification; the JSON patch is computed against it
  String pipelineJson = JsonUtils.pojoToJson(before);

  // Update the pipeline attributes
  before.setDescription(newDescription);
  before.setOwner(newOwner);
  before.setTags(tags);

  // Validate information returned in patch response has the updates
  Pipeline updatedPipeline = patchPipeline(pipelineJson, before, authHeaders);
  validatePipeline(updatedPipeline, before.getDescription(), newOwner, null, tags,
      before.getTasks(), updatedBy);
  TestUtils.validateUpdate(before.getVersion(), updatedPipeline.getVersion(), updateType);

  // GET the pipeline and validate the information that was actually stored. Only the attributes covered by the
  // requested fields are checked here; tags and tasks are not part of "service,owner".
  Pipeline getPipeline = getPipeline(before.getId(), "service,owner", authHeaders);
  validatePipeline(getPipeline, newDescription, newOwner, null, updatedBy);
  return updatedPipeline;
}
/**
 * Computes the JSON patch between the original JSON and the updated pipeline and sends it as a PATCH request
 * to {@code pipelines/{pipelineId}}.
 *
 * @return the pipeline returned in the PATCH response
 */
private Pipeline patchPipeline(UUID pipelineId, String originalJson, Pipeline updatedPipeline,
                               Map<String, String> authHeaders)
    throws JsonProcessingException, HttpResponseException {
  final String targetJson = JsonUtils.pojoToJson(updatedPipeline);
  final JsonPatch difference = JsonSchemaUtil.getJsonPatch(originalJson, targetJson);
  return TestUtils.patch(getResource("pipelines/" + pipelineId), difference, Pipeline.class, authHeaders);
}
private Pipeline patchPipeline(String originalJson,
Pipeline updatedPipeline,
Map<String, String> authHeaders)
throws JsonProcessingException, HttpResponseException {
return patchPipeline(updatedPipeline.getId(), originalJson, updatedPipeline, authHeaders);
// Asserts that the task list carried by the pipeline is equal to the expected task list
private static void validatePipelineTasks(Pipeline pipeline, List<Task> expectedTasks) {
assertEquals(expectedTasks, pipeline.getTasks());
}
public static void getPipeline(UUID id, Map<String, String> authHeaders) throws HttpResponseException {

View File

@ -1,506 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.resources.tasks;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.http.client.HttpResponseException;
import org.joda.time.DateTime;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.openmetadata.catalog.CatalogApplicationTest;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.api.data.CreateTask;
import org.openmetadata.catalog.api.services.CreatePipelineService;
import org.openmetadata.catalog.entity.data.Task;
import org.openmetadata.catalog.entity.services.PipelineService;
import org.openmetadata.catalog.exception.CatalogExceptionMessage;
import org.openmetadata.catalog.jdbi3.TaskRepository.TaskEntityInterface;
import org.openmetadata.catalog.resources.EntityResourceTest;
import org.openmetadata.catalog.resources.tasks.TaskResource.TaskList;
import org.openmetadata.catalog.type.ChangeDescription;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.TagLabel;
import org.openmetadata.catalog.util.EntityInterface;
import org.openmetadata.catalog.util.JsonUtils;
import org.openmetadata.catalog.util.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.client.WebTarget;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
import static javax.ws.rs.core.Response.Status.CONFLICT;
import static javax.ws.rs.core.Response.Status.FORBIDDEN;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static javax.ws.rs.core.Response.Status.OK;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound;
import static org.openmetadata.catalog.util.TestUtils.LONG_ENTITY_NAME;
import static org.openmetadata.catalog.util.TestUtils.NON_EXISTENT_ENTITY;
import static org.openmetadata.catalog.util.TestUtils.UpdateType.MINOR_UPDATE;
import static org.openmetadata.catalog.util.TestUtils.adminAuthHeaders;
import static org.openmetadata.catalog.util.TestUtils.assertEntityPagination;
import static org.openmetadata.catalog.util.TestUtils.assertResponse;
import static org.openmetadata.catalog.util.TestUtils.authHeaders;
public class TaskResourceTest extends EntityResourceTest<Task> {
private static final Logger LOG = LoggerFactory.getLogger(TaskResourceTest.class);
// Passes the Task entity class, the "tasks" collection name and TaskResource.FIELDS to the EntityResourceTest base class
public TaskResourceTest() {
super(Task.class, "tasks", TaskResource.FIELDS);
}
// One-time class setup: delegates entirely to the shared EntityResourceTest setup
@BeforeAll
public static void setup(TestInfo test) throws HttpResponseException, URISyntaxException {
EntityResourceTest.setup(test);
}
/**
 * Builds a task create request for the given test, carrying the supplied description, display name and owner.
 */
@Override
public Object createRequest(TestInfo test, String description, String displayName, EntityReference owner)
    throws URISyntaxException {
  final CreateTask request = create(test).withDescription(description);
  return request.withDisplayName(displayName).withOwner(owner);
}
/**
 * Validates a created task against the request that produced it: the common entity fields, the task URL,
 * the service and the tags.
 *
 * @param request the {@code CreateTask} request that was sent
 */
@Override
public void validateCreatedEntity(Task task, Object request, Map<String, String> authHeaders)
    throws HttpResponseException {
  final CreateTask expected = (CreateTask) request;

  // Fields shared by all entities
  validateCommonEntityFields(getEntityInterface(task), expected.getDescription(),
      TestUtils.getPrincipal(authHeaders), expected.getOwner());

  // Task specific fields
  assertEquals(expected.getTaskUrl(), task.getTaskUrl());
  assertService(expected.getService(), task.getService());
  TestUtils.validateTags(task.getFullyQualifiedName(), expected.getTags(), task.getTags());
}
// An updated task is validated exactly like a newly created one
@Override
public void validateUpdatedEntity(Task updatedEntity, Object request, Map<String, String> authHeaders) throws HttpResponseException {
validateCreatedEntity(updatedEntity, request, authHeaders);
}
/**
 * Validates a patched task against the expected task: the common entity fields, the task URL, the service
 * and the tags.
 */
@Override
public void validatePatchedEntity(Task expected, Task patched, Map<String, String> authHeaders)
    throws HttpResponseException {
  // Fields shared by all entities
  validateCommonEntityFields(getEntityInterface(patched), expected.getDescription(),
      TestUtils.getPrincipal(authHeaders), expected.getOwner());

  // Task specific fields
  assertEquals(expected.getTaskUrl(), patched.getTaskUrl());
  assertService(expected.getService(), patched.getService());
  TestUtils.validateTags(expected.getFullyQualifiedName(), expected.getTags(), patched.getTags());
}
// Wraps the task in a TaskEntityInterface so it can be handled through the generic EntityInterface
@Override
public EntityInterface<Task> getEntityInterface(Task entity) {
return new TaskEntityInterface(entity);
}
@Test
public void post_taskWithLongName_400_badRequest(TestInfo test) throws URISyntaxException {
  // A task whose name exceeds the allowed length must be rejected
  final CreateTask request = create(test).withName(LONG_ENTITY_NAME);
  HttpResponseException thrown =
      assertThrows(HttpResponseException.class, () -> createTask(request, adminAuthHeaders()));
  assertResponse(thrown, BAD_REQUEST, "[name size must be between 1 and 64]");
}
@Test
public void post_taskAlreadyExists_409_conflict(TestInfo test) throws HttpResponseException, URISyntaxException {
  final CreateTask request = create(test);

  // First creation succeeds
  createTask(request, adminAuthHeaders());

  // Sending the very same request again must be reported as a conflict
  HttpResponseException thrown =
      assertThrows(HttpResponseException.class, () -> createTask(request, adminAuthHeaders()));
  assertResponse(thrown, CONFLICT, CatalogExceptionMessage.ENTITY_ALREADY_EXISTS);
}
@Test
public void post_validTasks_as_admin_200_OK(TestInfo test) throws HttpResponseException, URISyntaxException {
  // Create tasks with different optional fields
  final CreateTask request = create(test);
  createAndCheckEntity(request, adminAuthHeaders());

  // Second task: new name plus a description
  request.withName(getTaskName(test, 1)).withDescription("description");
  createAndCheckEntity(request, adminAuthHeaders());
}
// Creating a task owned by a user must succeed and pass the standard create validation
@Test
public void post_taskWithUserOwner_200_ok(TestInfo test) throws HttpResponseException, URISyntaxException {
createAndCheckEntity(create(test).withOwner(USER_OWNER1), adminAuthHeaders());
}
// Creating a task owned by a team must succeed and pass the standard create validation
// NOTE(review): the display name "chart1" looks copied from the chart tests - confirm it is intentional
@Test
public void post_taskWithTeamOwner_200_ok(TestInfo test) throws HttpResponseException, URISyntaxException {
createAndCheckEntity(create(test).withOwner(TEAM_OWNER1).withDisplayName("chart1"), adminAuthHeaders());
}
// NOTE(review): the method name says 401 but the asserted status is FORBIDDEN
@Test
public void post_task_as_non_admin_401(TestInfo test) throws URISyntaxException {
  final CreateTask request = create(test);

  // A regular (non-admin) user is not allowed to create tasks
  HttpResponseException thrown = assertThrows(HttpResponseException.class,
      () -> createTask(request, authHeaders("test@open-metadata.org")));
  assertResponse(thrown, FORBIDDEN, "Principal: CatalogPrincipal{name='test'} is not admin");
}
@Test
public void post_taskWithoutRequiredFields_4xx(TestInfo test) {
  // Name is a required field
  HttpResponseException missingName = assertThrows(HttpResponseException.class,
      () -> createTask(create(test).withName(null), adminAuthHeaders()));
  assertResponse(missingName, BAD_REQUEST, "[name must not be null]");

  // Name must respect the size limits
  HttpResponseException longName = assertThrows(HttpResponseException.class,
      () -> createTask(create(test).withName(LONG_ENTITY_NAME), adminAuthHeaders()));
  assertResponse(longName, BAD_REQUEST, "[name size must be between 1 and 64]");

  // Service is required field
  HttpResponseException missingService = assertThrows(HttpResponseException.class,
      () -> createTask(create(test).withService(null), adminAuthHeaders()));
  assertResponse(missingService, BAD_REQUEST, "[service must not be null]");
}
@Test
public void post_taskWithInvalidOwnerType_4xx(TestInfo test) throws URISyntaxException {
  // Owner reference that carries an id but no type
  final EntityReference untypedOwner = new EntityReference().withId(TEAM1.getId());
  final CreateTask request = create(test).withOwner(untypedOwner);

  HttpResponseException thrown =
      assertThrows(HttpResponseException.class, () -> createTask(request, adminAuthHeaders()));
  TestUtils.assertResponseContains(thrown, BAD_REQUEST, "type must not be null");
}
@Test
public void post_taskWithNonExistentOwner_4xx(TestInfo test) throws URISyntaxException {
  // Owner reference of type "user" whose id does not exist
  final EntityReference unknownOwner = new EntityReference().withId(NON_EXISTENT_ENTITY).withType("user");
  final CreateTask request = create(test).withOwner(unknownOwner);

  HttpResponseException thrown =
      assertThrows(HttpResponseException.class, () -> createTask(request, adminAuthHeaders()));
  assertResponse(thrown, NOT_FOUND, entityNotFound("User", NON_EXISTENT_ENTITY));
}
@Test
public void post_taskWithDifferentService_200_ok(TestInfo test) throws HttpResponseException, URISyntaxException {
  // Create a task for each service and test the list API filtered by that service
  for (EntityReference service : Arrays.asList(AIRFLOW_REFERENCE, PREFECT_REFERENCE)) {
    createAndCheckEntity(create(test).withService(service), adminAuthHeaders());

    // Listing with the service name as filter must only return tasks of that service
    TaskList filtered = listTasks("service", service.getName(), adminAuthHeaders());
    filtered.getData().forEach(task -> assertEquals(service.getName(), task.getService().getName()));
  }
}
@Test
public void get_taskListWithInvalidLimitOffset_4xx() {
  // Limit must be >= 1 and <= 1000,000

  // Negative limit
  HttpResponseException negativeLimit = assertThrows(HttpResponseException.class,
      () -> listTasks(null, null, -1, null, null, adminAuthHeaders()));
  assertResponse(negativeLimit, BAD_REQUEST, "[query param limit must be greater than or equal to 1]");

  // Zero limit
  HttpResponseException zeroLimit = assertThrows(HttpResponseException.class,
      () -> listTasks(null, null, 0, null, null, adminAuthHeaders()));
  assertResponse(zeroLimit, BAD_REQUEST, "[query param limit must be greater than or equal to 1]");

  // Limit one above the maximum
  HttpResponseException hugeLimit = assertThrows(HttpResponseException.class,
      () -> listTasks(null, null, 1000001, null, null, adminAuthHeaders()));
  assertResponse(hugeLimit, BAD_REQUEST, "[query param limit must be less than or equal to 1000000]");
}
@Test
public void get_taskListWithInvalidPaginationCursors_4xx() {
  // Supplying a before cursor and an after cursor in the same request is invalid
  HttpResponseException thrown = assertThrows(HttpResponseException.class,
      () -> listTasks(null, null, 1, "", "", adminAuthHeaders()));
  assertResponse(thrown, BAD_REQUEST, "Only one of before or after query parameter allowed");
}
/**
 * Creates a batch of tasks and then, for every page size from 1 to maxTasks - 1, scrolls forward through the
 * listing until there is no "after" cursor and backward again until there is no "before" cursor. Every page is
 * handed to assertEntityPagination together with the complete listing and the index at which the page is
 * expected to start.
 * NOTE(review): the "_4xx" suffix of the method name looks like a misnomer - all requests here use valid limits.
 */
@Test
public void get_taskListWithValidLimitOffset_4xx(TestInfo test) throws HttpResponseException, URISyntaxException {
// Create a large number of tasks
int maxTasks = 40;
for (int i = 0; i < maxTasks; i++) {
createTask(create(test, i), adminAuthHeaders());
}
// List all tasks
TaskList allTasks = listTasks(null, null, 1000000, null,
null, adminAuthHeaders());
// The listing is not filtered, so presumably it can also contain tasks created by other tests - hence the
// size of the listing is used below instead of maxTasks
int totalRecords = allTasks.getData().size();
printTasks(allTasks);
// List limit number tasks at a time at various offsets and ensure right results are returned
for (int limit = 1; limit < maxTasks; limit++) {
String after = null;
String before;
int pageCount = 0;
// Index into allTasks at which the page currently being checked is expected to start
int indexInAllTasks = 0;
TaskList forwardPage;
TaskList backwardPage;
do { // For each limit (or page size) - forward scroll till the end
LOG.info("Limit {} forward scrollCount {} afterCursor {}", limit, pageCount, after);
forwardPage = listTasks(null, null, limit, null, after, adminAuthHeaders());
printTasks(forwardPage);
after = forwardPage.getPaging().getAfter();
before = forwardPage.getPaging().getBefore();
assertEntityPagination(allTasks.getData(), forwardPage, limit, indexInAllTasks);
if (pageCount == 0) { // CASE 0 - First page is being returned. There is no before cursor
assertNull(before);
} else {
// Make sure scrolling back based on before cursor returns the correct result
backwardPage = listTasks(null, null, limit, before, null, adminAuthHeaders());
assertEntityPagination(allTasks.getData(), backwardPage, limit, (indexInAllTasks - limit));
}
indexInAllTasks += forwardPage.getData().size();
pageCount++;
} while (after != null);
// We have now reached the last page - test backward scroll till the beginning
pageCount = 0;
// forwardPage still holds the last page, and "before" is the cursor that page returned. The page in front of
// the last one therefore starts "limit" entries before the start of the last page.
indexInAllTasks = totalRecords - limit - forwardPage.getData().size();
do {
LOG.info("Limit {} backward scrollCount {} beforeCursor {}", limit, pageCount, before);
// forwardPage is reused as the holder of the current page while scrolling backward
forwardPage = listTasks(null, null, limit, before, null, adminAuthHeaders());
printTasks(forwardPage);
before = forwardPage.getPaging().getBefore();
assertEntityPagination(allTasks.getData(), forwardPage, limit, indexInAllTasks);
pageCount++;
indexInAllTasks -= forwardPage.getData().size();
} while (before != null);
}
}
// Logs the fully qualified name of every task in the page followed by the page's before/after cursors
private void printTasks(TaskList list) {
list.getData().forEach(task -> LOG.info("Task {}", task.getFullyQualifiedName()));
LOG.info("before {} after {} ", list.getPaging().getBefore(), list.getPaging().getAfter());
}
@Test
public void put_taskUrlUpdate_200(TestInfo test) throws HttpResponseException, URISyntaxException {
  final URI taskURI = new URI("http://localhost:8080/task_id=1");
  final String taskSQL = "select * from test;";
  final Date startDate = new DateTime("2021-11-13T20:20:39+00:00").toDate();
  final Date endDate = new DateTime("2021-12-13T20:20:39+00:00").toDate();

  // Create the task with a description and a task URL only
  final CreateTask request = create(test).withService(AIRFLOW_REFERENCE)
      .withDescription("description").withTaskUrl(taskURI);
  createAndCheckEntity(request, adminAuthHeaders());

  // Update the task with SQL, type and start/end dates and check that the response carries all of them
  final CreateTask update = request.withTaskUrl(taskURI).withTaskSQL(taskSQL)
      .withTaskType("test").withStartDate(startDate).withEndDate(endDate);
  final Task updated = updateEntity(update, OK, adminAuthHeaders());

  assertEquals(taskURI, updated.getTaskUrl());
  assertEquals(taskSQL, updated.getTaskSQL());
  assertEquals("test", updated.getTaskType());
  assertEquals(startDate, updated.getStartDate());
  assertEquals(endDate, updated.getEndDate());
}
@Test
public void get_nonExistentTask_404_notFound() {
  // Fetching an id that was never created must yield 404 with the standard "not found" message
  HttpResponseException thrown = assertThrows(HttpResponseException.class,
      () -> getTask(NON_EXISTENT_ENTITY, adminAuthHeaders()));
  assertResponse(thrown, NOT_FOUND, entityNotFound(Entity.TASK, NON_EXISTENT_ENTITY));
}
@Test
public void get_taskWithDifferentFields_200_OK(TestInfo test) throws HttpResponseException, URISyntaxException {
  // Task with description, owner and service so that all requested fields have a value
  final CreateTask request = create(test)
      .withDescription("description")
      .withOwner(USER_OWNER1)
      .withService(AIRFLOW_REFERENCE);
  final Task created = createAndCheckEntity(request, adminAuthHeaders());

  // Fetch by id
  validateGetWithDifferentFields(created, false);
}
@Test
public void get_taskByNameWithDifferentFields_200_OK(TestInfo test) throws HttpResponseException, URISyntaxException {
  // Task with description, owner and service so that all requested fields have a value
  final CreateTask request = create(test)
      .withDescription("description")
      .withOwner(USER_OWNER1)
      .withService(AIRFLOW_REFERENCE);
  final Task created = createAndCheckEntity(request, adminAuthHeaders());

  // Fetch by fully qualified name
  validateGetWithDifferentFields(created, true);
}
/**
 * Patches description, owner and tags of a task in three rounds - add, update, remove - and checks each patch
 * against the expected change description. Before every round the current task is serialized to JSON; that
 * snapshot is what the patch is computed against.
 */
@Test
public void patch_taskAttributes_200_ok(TestInfo test) throws HttpResponseException, JsonProcessingException,
URISyntaxException {
// Create task without description, owner
Task task = createTask(create(test), adminAuthHeaders());
assertNull(task.getDescription());
assertNull(task.getOwner());
assertNotNull(task.getService());
//
// Add description, owner and tags when previously they were null
//
List<TagLabel> taskTags = List.of(USER_ADDRESS_TAG_LABEL);
String origJson = JsonUtils.pojoToJson(task);
task.withDescription("description").withOwner(TEAM_OWNER1).withTags(taskTags);
ChangeDescription change = getChangeDescription(task.getVersion())
.withFieldsAdded(Arrays.asList("description","owner", "tags"));
task = patchEntityAndCheck(task, origJson, adminAuthHeaders(), MINOR_UPDATE, change);
task.setOwner(TEAM_OWNER1); // Get rid of href and name returned in the response for owner
task.setService(AIRFLOW_REFERENCE); // Get rid of href and name returned in the response for service
//
// Update description, owner and tags (a tier tag is added to the existing tag)
//
taskTags = List.of(USER_ADDRESS_TAG_LABEL, TIER1_TAG_LABEL);
origJson = JsonUtils.pojoToJson(task);
task.withDescription("description1").withOwner(USER_OWNER1).withTags(taskTags);
change = getChangeDescription(task.getVersion()).withFieldsUpdated(Arrays.asList("description", "owner", "tags"));
task = patchEntityAndCheck(task, origJson, adminAuthHeaders(), MINOR_UPDATE, change);
task.setOwner(USER_OWNER1); // Get rid of href and name returned in the response for owner
task.setService(AIRFLOW_REFERENCE); // Get rid of href and name returned in the response for service
//
// Remove description and owner - remove a tag
//
taskTags = List.of(TIER1_TAG_LABEL);
origJson = JsonUtils.pojoToJson(task);
task.withDescription(null).withOwner(null).withTags(taskTags);
// Description and owner disappear entirely, while tags only shrink and are therefore reported as updated
change = getChangeDescription(task.getVersion()).withFieldsDeleted(Arrays.asList("description","owner"))
.withFieldsUpdated(Collections.singletonList("tags"));
patchEntityAndCheck(task, origJson, adminAuthHeaders(), MINOR_UPDATE, change);
}
@Test
public void delete_emptyTask_200_ok(TestInfo test) throws HttpResponseException, URISyntaxException {
  // Create a task and delete it right away; deleteTask also verifies that the task is gone afterwards
  final Task created = createTask(create(test), adminAuthHeaders());
  deleteTask(created.getId(), adminAuthHeaders());
}
@Test
public void delete_nonExistentTask_404() {
  // Deleting an id that was never created must yield 404 with the standard "not found" message
  HttpResponseException thrown = assertThrows(HttpResponseException.class,
      () -> deleteTask(NON_EXISTENT_ENTITY, adminAuthHeaders()));
  assertResponse(thrown, NOT_FOUND, entityNotFound(Entity.TASK, NON_EXISTENT_ENTITY));
}
// POSTs the create request to the "tasks" resource and returns the response deserialized as a Task
public static Task createTask(CreateTask create,
Map<String, String> authHeaders) throws HttpResponseException {
return TestUtils.post(getResource("tasks"), create, Task.class, authHeaders);
}
/**
 * Validate returned fields GET .../tasks/{id}?fields="..." or GET .../tasks/name/{fqn}?fields="..."
 *
 * @param task task to fetch; its id (or its fully qualified name when {@code byName} is true) identifies it
 * @param byName when true the task is fetched by fully qualified name, otherwise by id
 * @throws HttpResponseException if one of the GET requests fails
 */
private void validateGetWithDifferentFields(Task task, boolean byName) throws HttpResponseException {
  // Every entry is one distinct value of the "fields" query parameter
  for (String fields : List.of("owner", "owner,service")) {
    Task fetched = byName
        ? getTaskByName(task.getFullyQualifiedName(), fields, adminAuthHeaders())
        : getTask(task.getId(), fields, adminAuthHeaders());
    assertNotNull(fetched.getOwner());
    assertNotNull(fetched.getService()); // We always return the service
  }
}
// Fetches the task by id without a "fields" parameter; the result is discarded, so this is only useful to
// check whether the GET succeeds or throws
public static void getTask(UUID id, Map<String, String> authHeaders) throws HttpResponseException {
getTask(id, null, authHeaders);
}
/**
 * Fetches a task by id.
 *
 * @param fields value of the "fields" query parameter; the parameter is omitted when {@code null}
 */
public static Task getTask(UUID id, String fields, Map<String, String> authHeaders)
    throws HttpResponseException {
  WebTarget target = getResource("tasks/" + id);
  if (fields != null) {
    target = target.queryParam("fields", fields);
  }
  return TestUtils.get(target, Task.class, authHeaders);
}
/**
 * Fetches a task by its fully qualified name.
 *
 * @param fields value of the "fields" query parameter; the parameter is omitted when {@code null}
 */
public static Task getTaskByName(String fqn, String fields, Map<String, String> authHeaders)
    throws HttpResponseException {
  WebTarget target = getResource("tasks/name/" + fqn);
  if (fields != null) {
    target = target.queryParam("fields", fields);
  }
  return TestUtils.get(target, Task.class, authHeaders);
}
// Lists tasks without limit or pagination cursors by delegating to the full listTasks overload
public static TaskList listTasks(String fields, String serviceParam, Map<String, String> authHeaders)
throws HttpResponseException {
return listTasks(fields, serviceParam, null, null, null, authHeaders);
}
/**
 * Lists tasks. Every argument except {@code authHeaders} maps to one query parameter and is only added to the
 * request when it is not {@code null}.
 *
 * @param fields value of the "fields" query parameter
 * @param serviceParam value of the "service" query parameter
 * @param limitParam value of the "limit" query parameter
 * @param before value of the "before" query parameter
 * @param after value of the "after" query parameter
 */
public static TaskList listTasks(String fields, String serviceParam, Integer limitParam,
                                 String before, String after, Map<String, String> authHeaders)
    throws HttpResponseException {
  WebTarget target = getResource("tasks");
  if (fields != null) {
    target = target.queryParam("fields", fields);
  }
  if (serviceParam != null) {
    target = target.queryParam("service", serviceParam);
  }
  if (limitParam != null) {
    target = target.queryParam("limit", limitParam);
  }
  if (before != null) {
    target = target.queryParam("before", before);
  }
  if (after != null) {
    target = target.queryParam("after", after);
  }
  return TestUtils.get(target, TaskResource.TaskList.class, authHeaders);
}
/**
 * Deletes the task with the given id and verifies that a subsequent GET for it fails with 404.
 */
private void deleteTask(UUID id, Map<String, String> authHeaders) throws HttpResponseException {
  TestUtils.delete(getResource("tasks/" + id), authHeaders);

  // Ensure deleted task does not exist
  HttpResponseException notFound =
      assertThrows(HttpResponseException.class, () -> getTask(id, authHeaders));
  assertResponse(notFound, NOT_FOUND, entityNotFound(Entity.TASK, id));
}
/** Derives a task name from the display name of the running test. */
public static String getTaskName(TestInfo test) {
  final String displayName = test.getDisplayName();
  return String.format("task_%s", displayName);
}
/** Derives a task name from the given index and the display name of the running test. */
public static String getTaskName(TestInfo test, int index) {
  final String displayName = test.getDisplayName();
  return String.format("task%d_%s", index, displayName);
}
/**
 * Builds a minimal task create request for the running test: a test-derived name, the Airflow service
 * reference and a placeholder task URL.
 */
public static CreateTask create(TestInfo test) throws URISyntaxException {
  final URI placeholderUrl = new URI("http://localhost:0");
  return new CreateTask()
      .withName(getTaskName(test))
      .withService(AIRFLOW_REFERENCE)
      .withTaskUrl(placeholderUrl);
}
/**
 * Builds the same minimal task create request as {@code create(test)}, but with a name that also carries the
 * given index so that several tasks can be created within one test.
 */
public static CreateTask create(TestInfo test, int index) throws URISyntaxException {
  // Identical to the un-indexed request apart from the name
  return create(test).withName(getTaskName(test, index));
}
// POSTs the create request to "services/pipelineServices" and returns the response deserialized as a PipelineService
public static PipelineService createService(CreatePipelineService create,
Map<String, String> authHeaders) throws HttpResponseException {
return TestUtils.post(CatalogApplicationTest.getResource("services/pipelineServices"),
create, PipelineService.class, authHeaders);
}
}