From a2f2e0bc2d2da5f5bcd9251d91e547ff8e452c8b Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Fri, 24 Sep 2021 08:54:04 -0700 Subject: [PATCH] [WIP] Airlfow integration --- .../mysql/v001__create_db_connection_info.sql | 9 + .../java/org/openmetadata/catalog/Entity.java | 3 + .../catalog/jdbi3/TaskRepository.java | 207 +++++++++--------- .../pipeline/PipelineServiceResource.java | 10 +- .../catalog/resources/tasks/TaskResource.java | 162 +++++++------- .../openmetadata/catalog/util/EntityUtil.java | 25 ++- .../json/schema/api/data/createChart.json | 4 +- .../json/schema/api/data/createTask.json | 42 ++-- .../json/schema/entity/data/pipeline.json | 31 ++- .../json/schema/entity/data/task.json | 105 +++++++++ .../entity/services/pipelineService.json | 43 +--- 11 files changed, 403 insertions(+), 238 deletions(-) diff --git a/bootstrap/sql/mysql/v001__create_db_connection_info.sql b/bootstrap/sql/mysql/v001__create_db_connection_info.sql index 636ba373412..7b9efec6a91 100644 --- a/bootstrap/sql/mysql/v001__create_db_connection_info.sql +++ b/bootstrap/sql/mysql/v001__create_db_connection_info.sql @@ -156,6 +156,15 @@ CREATE TABLE IF NOT EXISTS chart_entity ( UNIQUE KEY unique_name(fullyQualifiedName) ); +CREATE TABLE IF NOT EXISTS task_entity ( + id VARCHAR(36) GENERATED ALWAYS AS (json ->> '$.id') STORED NOT NULL, + fullyQualifiedName VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.fullyQualifiedName') NOT NULL, + json JSON NOT NULL, + timestamp BIGINT, + PRIMARY KEY (id), + UNIQUE KEY unique_name(fullyQualifiedName) +); + -- -- Feed related tables -- diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/Entity.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/Entity.java index 10db3256cfc..524d567ecab 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/Entity.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/Entity.java @@ -23,15 +23,18 @@ public final class Entity { public static final String DATABASE_SERVICE = "databaseService"; public static final String MESSAGING_SERVICE = "messagingService"; public static final String DASHBOARD_SERVICE = "dashboardService"; + public static final String PIPELINE_SERVICE = "pipelineService"; // Data assets public static final String TABLE = "table"; public static final String DATABASE = "database"; public static final String METRICS = "metrics"; public static final String DASHBOARD = "dashboard"; + public static final String PIPELINE = "pipeline"; public static final String CHART = "chart"; public static final String REPORT = "report"; public static final String TOPIC = "topic"; + public static final String TASK = "task"; // Team/user public static final String USER = "user"; diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TaskRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TaskRepository.java index 45b2778f948..f03a08b2661 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TaskRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TaskRepository.java @@ -18,6 +18,7 @@ package org.openmetadata.catalog.jdbi3; import org.openmetadata.catalog.Entity; import org.openmetadata.catalog.entity.data.Chart; +import org.openmetadata.catalog.entity.data.Task; import org.openmetadata.catalog.entity.services.DashboardService; import org.openmetadata.catalog.exception.CatalogExceptionMessage; import org.openmetadata.catalog.exception.EntityNotFoundException; @@ -25,7 +26,7 @@ import org.openmetadata.catalog.jdbi3.DashboardServiceRepository.DashboardServic import org.openmetadata.catalog.jdbi3.TeamRepository.TeamDAO; import org.openmetadata.catalog.jdbi3.UserRepository.UserDAO; import org.openmetadata.catalog.resources.charts.ChartResource; -import org.openmetadata.catalog.resources.charts.ChartResource.ChartList; +import org.openmetadata.catalog.resources.tasks.TaskResource.TaskList; import org.openmetadata.catalog.type.EntityReference; import org.openmetadata.catalog.type.TagLabel; import org.openmetadata.catalog.util.EntityUtil; @@ -51,17 +52,17 @@ import java.util.Objects; import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound; -public abstract class ChartRepository { - private static final Logger LOG = LoggerFactory.getLogger(ChartRepository.class); - private static final Fields CHART_UPDATE_FIELDS = new Fields(ChartResource.FIELD_LIST, "owner"); - private static final Fields CHART_PATCH_FIELDS = new Fields(ChartResource.FIELD_LIST, "owner,service,tags"); +public abstract class TaskRepository { + private static final Logger LOG = LoggerFactory.getLogger(TaskRepository.class); + private static final Fields TASK_UPDATE_FIELDS = new Fields(ChartResource.FIELD_LIST, "owner,taskConfig"); + private static final Fields TASK_PATCH_FIELDS = new Fields(ChartResource.FIELD_LIST, "owner,service,tags"); - public static String getFQN(EntityReference service, Chart chart) { - return (service.getName() + "." + chart.getName()); + public static String getFQN(EntityReference service, Task task) { + return (service.getName() + "." + task.getName()); } @CreateSqlObject - abstract ChartDAO chartDAO(); + abstract TaskDAO taskDAO(); @CreateSqlObject abstract EntityRelationshipDAO relationshipDAO(); @@ -80,62 +81,62 @@ public abstract class ChartRepository { @Transaction - public ChartList listAfter(Fields fields, String serviceName, int limitParam, String after) throws IOException, + public TaskList listAfter(Fields fields, String serviceName, int limitParam, String after) throws IOException, GeneralSecurityException { // forward scrolling, if after == null then first page is being asked being asked - List jsons = chartDAO().listAfter(serviceName, limitParam + 1, after == null ? "" : + List jsons = taskDAO().listAfter(serviceName, limitParam + 1, after == null ? "" : CipherText.instance().decrypt(after)); - List charts = new ArrayList<>(); + List tasks = new ArrayList<>(); for (String json : jsons) { - charts.add(setFields(JsonUtils.readValue(json, Chart.class), fields)); + tasks.add(setFields(JsonUtils.readValue(json, Task.class), fields)); } - int total = chartDAO().listCount(serviceName); + int total = taskDAO().listCount(serviceName); String beforeCursor, afterCursor = null; - beforeCursor = after == null ? null : charts.get(0).getFullyQualifiedName(); - if (charts.size() > limitParam) { // If extra result exists, then next page exists - return after cursor - charts.remove(limitParam); - afterCursor = charts.get(limitParam - 1).getFullyQualifiedName(); + beforeCursor = after == null ? null : tasks.get(0).getFullyQualifiedName(); + if (tasks.size() > limitParam) { // If extra result exists, then next page exists - return after cursor + tasks.remove(limitParam); + afterCursor = tasks.get(limitParam - 1).getFullyQualifiedName(); } - return new ChartList(charts, beforeCursor, afterCursor, total); + return new TaskList(tasks, beforeCursor, afterCursor, total); } @Transaction - public ChartList listBefore(Fields fields, String serviceName, int limitParam, String before) throws IOException, + public TaskList listBefore(Fields fields, String serviceName, int limitParam, String before) throws IOException, GeneralSecurityException { // Reverse scrolling - Get one extra result used for computing before cursor - List jsons = chartDAO().listBefore(serviceName, limitParam + 1, CipherText.instance().decrypt(before)); - List charts = new ArrayList<>(); + List jsons = taskDAO().listBefore(serviceName, limitParam + 1, CipherText.instance().decrypt(before)); + List tasks = new ArrayList<>(); for (String json : jsons) { - charts.add(setFields(JsonUtils.readValue(json, Chart.class), fields)); + tasks.add(setFields(JsonUtils.readValue(json, Task.class), fields)); } - int total = chartDAO().listCount(serviceName); + int total = taskDAO().listCount(serviceName); String beforeCursor = null, afterCursor; - if (charts.size() > limitParam) { // If extra result exists, then previous page exists - return before cursor - charts.remove(0); - beforeCursor = charts.get(0).getFullyQualifiedName(); + if (tasks.size() > limitParam) { // If extra result exists, then previous page exists - return before cursor + tasks.remove(0); + beforeCursor = tasks.get(0).getFullyQualifiedName(); } - afterCursor = charts.get(charts.size() - 1).getFullyQualifiedName(); - return new ChartList(charts, beforeCursor, afterCursor, total); + afterCursor = tasks.get(tasks.size() - 1).getFullyQualifiedName(); + return new TaskList(tasks, beforeCursor, afterCursor, total); } @Transaction - public Chart get(String id, Fields fields) throws IOException { - return setFields(validateChart(id), fields); + public Task get(String id, Fields fields) throws IOException { + return setFields(validateTask(id), fields); } @Transaction - public Chart getByName(String fqn, Fields fields) throws IOException { - Chart chart = EntityUtil.validate(fqn, chartDAO().findByFQN(fqn), Chart.class); - return setFields(chart, fields); + public Task getByName(String fqn, Fields fields) throws IOException { + Task task = EntityUtil.validate(fqn, taskDAO().findByFQN(fqn), Task.class); + return setFields(task, fields); } @Transaction - public Chart create(Chart chart, EntityReference service, EntityReference owner) throws IOException { + public Task create(Task task, EntityReference service, EntityReference owner) throws IOException { getService(service); // Validate service - return createInternal(chart, service, owner); + return createInternal(task, service, owner); } @Transaction @@ -143,73 +144,73 @@ public abstract class ChartRepository { if (relationshipDAO().findToCount(id, Relationship.CONTAINS.ordinal(), Entity.CHART) > 0) { throw new IllegalArgumentException("Chart is not empty"); } - if (chartDAO().delete(id) <= 0) { + if (taskDAO().delete(id) <= 0) { throw EntityNotFoundException.byMessage(entityNotFound(Entity.CHART, id)); } relationshipDAO().deleteAll(id); } @Transaction - public PutResponse createOrUpdate(Chart updatedChart, EntityReference service, EntityReference newOwner) + public PutResponse createOrUpdate(Task updatedTask, EntityReference service, EntityReference newOwner) throws IOException { getService(service); // Validate service - String fqn = getFQN(service, updatedChart); - Chart storedDB = JsonUtils.readValue(chartDAO().findByFQN(fqn), Chart.class); + String fqn = getFQN(service, updatedTask); + Task storedDB = JsonUtils.readValue(taskDAO().findByFQN(fqn), Task.class); if (storedDB == null) { // Chart does not exist. Create a new one - return new PutResponse<>(Status.CREATED, createInternal(updatedChart, service, newOwner)); + return new PutResponse<>(Status.CREATED, createInternal(updatedTask, service, newOwner)); } // Update the existing chart EntityUtil.populateOwner(userDAO(), teamDAO(), newOwner); // Validate new owner if (storedDB.getDescription() == null || storedDB.getDescription().isEmpty()) { - storedDB.withDescription(updatedChart.getDescription()); + storedDB.withDescription(updatedTask.getDescription()); } //update the display name from source - if (updatedChart.getDisplayName() != null && !updatedChart.getDisplayName().isEmpty()) { - storedDB.withDisplayName(updatedChart.getDisplayName()); + if (updatedTask.getDisplayName() != null && !updatedTask.getDisplayName().isEmpty()) { + storedDB.withDisplayName(updatedTask.getDisplayName()); } - chartDAO().update(storedDB.getId().toString(), JsonUtils.pojoToJson(storedDB)); + taskDAO().update(storedDB.getId().toString(), JsonUtils.pojoToJson(storedDB)); // Update owner relationship - setFields(storedDB, CHART_UPDATE_FIELDS); // First get the ownership information + setFields(storedDB, TASK_UPDATE_FIELDS); // First get the ownership information updateOwner(storedDB, storedDB.getOwner(), newOwner); // Service can't be changed in update since service name is part of FQN and // change to a different service will result in a different FQN and creation of a new chart under the new service storedDB.setService(service); - applyTags(updatedChart); + applyTags(updatedTask); return new PutResponse<>(Status.OK, storedDB); } @Transaction - public Chart patch(String id, JsonPatch patch) throws IOException { - Chart original = setFields(validateChart(id), CHART_PATCH_FIELDS); - Chart updated = JsonUtils.applyPatch(original, patch, Chart.class); + public Task patch(String id, JsonPatch patch) throws IOException { + Task original = setFields(validateTask(id), TASK_PATCH_FIELDS); + Task updated = JsonUtils.applyPatch(original, patch, Task.class); patch(original, updated); return updated; } - public Chart createInternal(Chart chart, EntityReference service, EntityReference owner) throws IOException { - chart.setFullyQualifiedName(getFQN(service, chart)); + public Task createInternal(Task task, EntityReference service, EntityReference owner) throws IOException { + task.setFullyQualifiedName(getFQN(service, task)); EntityUtil.populateOwner(userDAO(), teamDAO(), owner); // Validate owner // Query 1 - insert chart into chart_entity table - chartDAO().insert(JsonUtils.pojoToJson(chart)); - setService(chart, service); - setOwner(chart, owner); - applyTags(chart); - return chart; + taskDAO().insert(JsonUtils.pojoToJson(task)); + setService(task, service); + setOwner(task, owner); + applyTags(task); + return task; } - private void applyTags(Chart chart) throws IOException { + private void applyTags(Task task) throws IOException { // Add chart level tags by adding tag to chart relationship - EntityUtil.applyTags(tagDAO(), chart.getTags(), chart.getFullyQualifiedName()); - chart.setTags(getTags(chart.getFullyQualifiedName())); // Update tag to handle additional derived tags + EntityUtil.applyTags(tagDAO(), task.getTags(), task.getFullyQualifiedName()); + task.setTags(getTags(task.getFullyQualifiedName())); // Update tag to handle additional derived tags } - private void patch(Chart original, Chart updated) throws IOException { + private void patch(Task original, Task updated) throws IOException { String chartId = original.getId().toString(); if (!original.getId().equals(updated.getId())) { throw new IllegalArgumentException(CatalogExceptionMessage.readOnlyAttribute(Entity.CHART, "id")); @@ -229,57 +230,57 @@ public abstract class ChartRepository { updated.setHref(null); updated.setOwner(null); updated.setService(null); - chartDAO().update(chartId, JsonUtils.pojoToJson(updated)); + taskDAO().update(chartId, JsonUtils.pojoToJson(updated)); updateOwner(updated, original.getOwner(), newOwner); updated.setService(newService); applyTags(updated); } - public EntityReference getOwner(Chart chart) throws IOException { - if (chart == null) { + public EntityReference getOwner(Task task) throws IOException { + if (task == null) { return null; } - return EntityUtil.populateOwner(chart.getId(), relationshipDAO(), userDAO(), teamDAO()); + return EntityUtil.populateOwner(task.getId(), relationshipDAO(), userDAO(), teamDAO()); } - private void setOwner(Chart chart, EntityReference owner) { - EntityUtil.setOwner(relationshipDAO(), chart.getId(), Entity.CHART, owner); - chart.setOwner(owner); + private void setOwner(Task task, EntityReference owner) { + EntityUtil.setOwner(relationshipDAO(), task.getId(), Entity.TASK, owner); + task.setOwner(owner); } - private void updateOwner(Chart chart, EntityReference origOwner, EntityReference newOwner) { - EntityUtil.updateOwner(relationshipDAO(), origOwner, newOwner, chart.getId(), Entity.CHART); - chart.setOwner(newOwner); + private void updateOwner(Task task, EntityReference origOwner, EntityReference newOwner) { + EntityUtil.updateOwner(relationshipDAO(), origOwner, newOwner, task.getId(), Entity.TASK); + task.setOwner(newOwner); } - private Chart validateChart(String id) throws IOException { - return EntityUtil.validate(id, chartDAO().findById(id), Chart.class); + private Task validateTask(String id) throws IOException { + return EntityUtil.validate(id, taskDAO().findById(id), Task.class); } - private Chart setFields(Chart chart, Fields fields) throws IOException { - chart.setOwner(fields.contains("owner") ? getOwner(chart) : null); - chart.setService(fields.contains("service") ? getService(chart) : null); - chart.setFollowers(fields.contains("followers") ? getFollowers(chart) : null); - chart.setTags(fields.contains("tags") ? getTags(chart.getFullyQualifiedName()) : null); - return chart; + private Task setFields(Task task, Fields fields) throws IOException { + task.setOwner(fields.contains("owner") ? getOwner(task) : null); + task.setService(fields.contains("service") ? getService(task) : null); + task.setFollowers(fields.contains("followers") ? getFollowers(task) : null); + task.setTags(fields.contains("tags") ? getTags(task.getFullyQualifiedName()) : null); + return task; } - private List getFollowers(Chart chart) throws IOException { - return chart == null ? null : EntityUtil.getFollowers(chart.getId(), relationshipDAO(), userDAO()); + private List getFollowers(Task task) throws IOException { + return task == null ? null : EntityUtil.getFollowers(task.getId(), relationshipDAO(), userDAO()); } private List getTags(String fqn) { return tagDAO().getTags(fqn); } - private EntityReference getService(Chart chart) throws IOException { - return chart == null ? null : getService(Objects.requireNonNull(EntityUtil.getService(relationshipDAO(), - chart.getId(), Entity.DASHBOARD_SERVICE))); + private EntityReference getService(Task task) throws IOException { + return task == null ? null : getService(Objects.requireNonNull(EntityUtil.getService(relationshipDAO(), + task.getId(), Entity.PIPELINE_SERVICE))); } private EntityReference getService(EntityReference service) throws IOException { String id = service.getId().toString(); - if (service.getType().equalsIgnoreCase(Entity.DASHBOARD_SERVICE)) { + if (service.getType().equalsIgnoreCase(Entity.PIPELINE_SERVICE)) { DashboardService serviceInstance = EntityUtil.validate(id, dashboardServiceDAO().findById(id), DashboardService.class); service.setDescription(serviceInstance.getDescription()); @@ -290,19 +291,19 @@ public abstract class ChartRepository { return service; } - public void setService(Chart chart, EntityReference service) throws IOException { - if (service != null && chart != null) { + public void setService(Task task, EntityReference service) throws IOException { + if (service != null && task != null) { getService(service); // Populate service details - relationshipDAO().insert(service.getId().toString(), chart.getId().toString(), service.getType(), + relationshipDAO().insert(service.getId().toString(), task.getId().toString(), service.getType(), Entity.CHART, Relationship.CONTAINS.ordinal()); - chart.setService(service); + task.setService(service); } } @Transaction - public Status addFollower(String chartId, String userId) throws IOException { - EntityUtil.validate(chartId, chartDAO().findById(chartId), Chart.class); - return EntityUtil.addFollower(relationshipDAO(), userDAO(), chartId, Entity.CHART, userId, Entity.USER) ? + public Status addFollower(String taskId, String userId) throws IOException { + EntityUtil.validate(taskId, taskDAO().findById(taskId), Task.class); + return EntityUtil.addFollower(relationshipDAO(), userDAO(), taskId, Entity.TASK, userId, Entity.USER) ? Status.CREATED : Status.OK; } @@ -312,36 +313,36 @@ public abstract class ChartRepository { EntityUtil.removeFollower(relationshipDAO(), chartId, userId); } - public interface ChartDAO { - @SqlUpdate("INSERT INTO chart_entity (json) VALUES (:json)") + public interface TaskDAO { + @SqlUpdate("INSERT INTO task_entity (json) VALUES (:json)") void insert(@Bind("json") String json); - @SqlUpdate("UPDATE chart_entity SET json = :json where id = :id") + @SqlUpdate("UPDATE task_entity SET json = :json where id = :id") void update(@Bind("id") String id, @Bind("json") String json); - @SqlQuery("SELECT json FROM chart_entity WHERE fullyQualifiedName = :name") + @SqlQuery("SELECT json FROM task_entity WHERE fullyQualifiedName = :name") String findByFQN(@Bind("name") String name); - @SqlQuery("SELECT json FROM chart_entity WHERE id = :id") + @SqlQuery("SELECT json FROM task_entity WHERE id = :id") String findById(@Bind("id") String id); - @SqlQuery("SELECT count(*) FROM chart_entity WHERE " + + @SqlQuery("SELECT count(*) FROM task_entity WHERE " + "(fullyQualifiedName LIKE CONCAT(:fqnPrefix, '.%') OR :fqnPrefix IS NULL)") int listCount(@Bind("fqnPrefix") String fqnPrefix); @SqlQuery( "SELECT json FROM (" + - "SELECT fullyQualifiedName, json FROM chart_entity WHERE " + + "SELECT fullyQualifiedName, json FROM task_entity WHERE " + "(fullyQualifiedName LIKE CONCAT(:fqnPrefix, '.%') OR :fqnPrefix IS NULL) AND " +// Filter by // service name - "fullyQualifiedName < :before " + // Pagination by chart fullyQualifiedName - "ORDER BY fullyQualifiedName DESC " + // Pagination ordering by chart fullyQualifiedName + "fullyQualifiedName < :before " + // Pagination by task fullyQualifiedName + "ORDER BY fullyQualifiedName DESC " + // Pagination ordering by task fullyQualifiedName "LIMIT :limit" + ") last_rows_subquery ORDER BY fullyQualifiedName") List listBefore(@Bind("fqnPrefix") String fqnPrefix, @Bind("limit") int limit, @Bind("before") String before); - @SqlQuery("SELECT json FROM chart_entity WHERE " + + @SqlQuery("SELECT json FROM task_entity WHERE " + "(fullyQualifiedName LIKE CONCAT(:fqnPrefix, '.%') OR :fqnPrefix IS NULL) AND " + "fullyQualifiedName > :after " + "ORDER BY fullyQualifiedName " + @@ -349,10 +350,10 @@ public abstract class ChartRepository { List listAfter(@Bind("fqnPrefix") String fqnPrefix, @Bind("limit") int limit, @Bind("after") String after); - @SqlQuery("SELECT EXISTS (SELECT * FROM chart_entity WHERE id = :id)") + @SqlQuery("SELECT EXISTS (SELECT * FROM task_entity WHERE id = :id)") boolean exists(@Bind("id") String id); - @SqlUpdate("DELETE FROM chart_entity WHERE id = :id") + @SqlUpdate("DELETE FROM task_entity WHERE id = :id") int delete(@Bind("id") String id); } } diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/pipeline/PipelineServiceResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/pipeline/PipelineServiceResource.java index d9a1792c7e0..c7f35a96cc2 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/pipeline/PipelineServiceResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/pipeline/PipelineServiceResource.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.openmetadata.catalog.resources.services.messaging; +package org.openmetadata.catalog.resources.services.pipeline; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.inject.Inject; @@ -56,12 +56,12 @@ import java.util.List; import java.util.Objects; import java.util.UUID; -@Path("/v1/services/messagingServices") -@Api(value = "Messaging service collection", tags = "Services -> Messaging service collection") +@Path("/v1/services/pipelineServices") +@Api(value = "Pipeline service collection", tags = "Services -> Pipeline service collection") @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) @Collection(name = "messagingServices", repositoryClass = "org.openmetadata.catalog.jdbi3.MessagingServiceRepository") -public class MessagingServiceResource { +public class PipelineServiceResource { private final MessagingServiceRepository dao; private final CatalogAuthorizer authorizer; @@ -80,7 +80,7 @@ public class MessagingServiceResource { } @Inject - public MessagingServiceResource(MessagingServiceRepository dao, CatalogAuthorizer authorizer) { + public PipelineServiceResource(MessagingServiceRepository dao, CatalogAuthorizer authorizer) { Objects.requireNonNull(dao, "MessagingServiceRepository must not be null"); this.dao = dao; this.authorizer = authorizer; diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/tasks/TaskResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/tasks/TaskResource.java index d1efb0218e7..f4ac3ae84cd 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/tasks/TaskResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/tasks/TaskResource.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.openmetadata.catalog.resources.charts; +package org.openmetadata.catalog.resources.tasks; import com.google.inject.Inject; import io.swagger.annotations.Api; @@ -26,10 +26,12 @@ import io.swagger.v3.oas.annotations.media.ExampleObject; import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.parameters.RequestBody; import io.swagger.v3.oas.annotations.responses.ApiResponse; -import org.openmetadata.catalog.api.data.CreateChart; +import org.openmetadata.catalog.api.data.CreateTask; import org.openmetadata.catalog.entity.data.Chart; import org.openmetadata.catalog.entity.data.Dashboard; +import org.openmetadata.catalog.entity.data.Task; import org.openmetadata.catalog.jdbi3.ChartRepository; +import org.openmetadata.catalog.jdbi3.TaskRepository; import org.openmetadata.catalog.resources.Collection; import org.openmetadata.catalog.security.CatalogAuthorizer; import org.openmetadata.catalog.security.SecurityUtil; @@ -73,49 +75,49 @@ import java.util.Objects; import java.util.Optional; import java.util.UUID; -@Path("/v1/charts") -@Api(value = "Chart data asset collection", tags = "Chart data asset collection") +@Path("/v1/tasks") +@Api(value = "tasks data asset collection", tags = "Task data asset collection") @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) -@Collection(name = "charts", repositoryClass = "org.openmetadata.catalog.jdbi3.ChartRepository") -public class ChartResource { - private static final Logger LOG = LoggerFactory.getLogger(ChartResource.class); - private static final String CHART_COLLECTION_PATH = "v1/charts/"; - private final ChartRepository dao; +@Collection(name = "tasks", repositoryClass = "org.openmetadata.catalog.jdbi3.TaskRepository") +public class TaskResource { + private static final Logger LOG = LoggerFactory.getLogger(TaskResource.class); + private static final String TASK_COLLECTION_PATH = "v1/tasks/"; + private final TaskRepository dao; private final CatalogAuthorizer authorizer; public static void addHref(UriInfo uriInfo, EntityReference ref) { - ref.withHref(RestUtil.getHref(uriInfo, CHART_COLLECTION_PATH, ref.getId())); + ref.withHref(RestUtil.getHref(uriInfo, TASK_COLLECTION_PATH, ref.getId())); } - public static List addHref(UriInfo uriInfo, List charts) { - Optional.ofNullable(charts).orElse(Collections.emptyList()).forEach(i -> addHref(uriInfo, i)); - return charts; + public static List addHref(UriInfo uriInfo, List tasks) { + Optional.ofNullable(tasks).orElse(Collections.emptyList()).forEach(i -> addHref(uriInfo, i)); + return tasks; } - public static Chart addHref(UriInfo uriInfo, Chart chart) { - chart.setHref(RestUtil.getHref(uriInfo, CHART_COLLECTION_PATH, chart.getId())); - EntityUtil.addHref(uriInfo, chart.getOwner()); - EntityUtil.addHref(uriInfo, chart.getService()); - EntityUtil.addHref(uriInfo, chart.getFollowers()); + public static Task addHref(UriInfo uriInfo, Task task) { + task.setHref(RestUtil.getHref(uriInfo, TASK_COLLECTION_PATH, task.getId())); + EntityUtil.addHref(uriInfo, task.getOwner()); + EntityUtil.addHref(uriInfo, task.getService()); + EntityUtil.addHref(uriInfo, task.getFollowers()); - return chart; + return task; } @Inject - public ChartResource(ChartRepository dao, CatalogAuthorizer authorizer) { - Objects.requireNonNull(dao, "ChartRepository must not be null"); + public TaskResource(TaskRepository dao, CatalogAuthorizer authorizer) { + Objects.requireNonNull(dao, "TaskRepository must not be null"); this.dao = dao; this.authorizer = authorizer; } - public static class ChartList extends ResultList { + public static class TaskList extends ResultList { @SuppressWarnings("unused") - ChartList() { + TaskList() { // Empty constructor needed for deserialization } - public ChartList(List data, String beforeCursor, String afterCursor, int total) + public TaskList(List data, String beforeCursor, String afterCursor, int total) throws GeneralSecurityException, UnsupportedEncodingException { super(data, beforeCursor, afterCursor, total); } @@ -127,16 +129,16 @@ public class ChartResource { @GET @Valid - @Operation(summary = "List charts", tags = "charts", - description = "Get a list of charts, optionally filtered by `service` it belongs to. Use `fields` " + + @Operation(summary = "List tasks", tags = "tasks", + description = "Get a list of tasks, optionally filtered by `service` it belongs to. Use `fields` " + "parameter to get only necessary fields. Use cursor-based pagination to limit the number " + "entries in the list using `limit` and `before` or `after` query params.", responses = { @ApiResponse(responseCode = "200", description = "List of charts", content = @Content(mediaType = "application/json", - schema = @Schema(implementation = ChartList.class))) + schema = @Schema(implementation = TaskList.class))) }) - public ChartList list(@Context UriInfo uriInfo, + public TaskList list(@Context UriInfo uriInfo, @Context SecurityContext securityContext, @Parameter(description = "Fields requested in the returned resource", schema = @Schema(type = "string", example = FIELDS)) @@ -144,7 +146,7 @@ public class ChartResource { @Parameter(description = "Filter charts by service name", schema = @Schema(type = "string", example = "superset")) @QueryParam("service") String serviceParam, - @Parameter(description = "Limit the number charts returned. (1 to 1000000, default = 10)") + @Parameter(description = "Limit the number tasks returned. (1 to 1000000, default = 10)") @DefaultValue("10") @Min(1) @Max(1000000) @@ -159,31 +161,31 @@ public class ChartResource { RestUtil.validateCursors(before, after); Fields fields = new Fields(FIELD_LIST, fieldsParam); - ChartList charts; + TaskList tasks; if (before != null) { // Reverse paging - charts = dao.listBefore(fields, serviceParam, limitParam, before); // Ask for one extra entry + tasks = dao.listBefore(fields, serviceParam, limitParam, before); // Ask for one extra entry } else { // Forward paging or first page - charts = dao.listAfter(fields, serviceParam, limitParam, after); + tasks = dao.listAfter(fields, serviceParam, limitParam, after); } - addHref(uriInfo, charts.getData()); - return charts; + addHref(uriInfo, tasks.getData()); + return tasks; } @GET @Path("/{id}") - @Operation(summary = "Get a Chart", tags = "charts", - description = "Get a chart by `id`.", + @Operation(summary = "Get a Task", tags = "tasks", + description = "Get a task by `id`.", responses = { - @ApiResponse(responseCode = "200", description = "The chart", + @ApiResponse(responseCode = "200", description = "The Task", content = @Content(mediaType = "application/json", schema = @Schema(implementation = Dashboard.class))), - @ApiResponse(responseCode = "404", description = "Chart for instance {id} is not found") + @ApiResponse(responseCode = "404", description = "Task for instance {id} is not found") }) - public Chart get(@Context UriInfo uriInfo, @PathParam("id") String id, - @Context SecurityContext securityContext, - @Parameter(description = "Fields requested in the returned resource", + public Task get(@Context UriInfo uriInfo, @PathParam("id") String id, + @Context SecurityContext securityContext, + @Parameter(description = "Fields requested in the returned resource", schema = @Schema(type = "string", example = FIELDS)) - @QueryParam("fields") String fieldsParam) throws IOException { + @QueryParam("fields") String fieldsParam) throws IOException { Fields fields = new Fields(FIELD_LIST, fieldsParam); return addHref(uriInfo, dao.get(id, fields)); } @@ -204,9 +206,9 @@ public class ChartResource { schema = @Schema(type = "string", example = FIELDS)) @QueryParam("fields") String fieldsParam) throws IOException { Fields fields = new Fields(FIELD_LIST, fieldsParam); - Chart chart = dao.getByName(fqn, fields); - addHref(uriInfo, chart); - return Response.ok(chart).build(); + Task task = dao.getByName(fqn, fields); + addHref(uriInfo, task); + return Response.ok(task).build(); } @POST @@ -219,27 +221,28 @@ public class ChartResource { @ApiResponse(responseCode = "400", description = "Bad request") }) public Response create(@Context UriInfo uriInfo, @Context SecurityContext securityContext, - @Valid CreateChart create) throws IOException { + @Valid CreateTask create) throws IOException { SecurityUtil.checkAdminOrBotRole(authorizer, securityContext); - Chart chart = - new Chart().withId(UUID.randomUUID()).withName(create.getName()).withDisplayName(create.getDisplayName()) + Task task = + new Task().withId(UUID.randomUUID()).withName(create.getName()).withDisplayName(create.getDisplayName()) .withDescription(create.getDescription()) .withService(create.getService()) - .withChartType(create.getChartType()).withChartUrl(create.getChartUrl()) - .withTables(create.getTables()).withTags(create.getTags()) + .withTaskConfig(create.getTaskConfig()) + .withTaskUrl(create.getTaskUrl()) + .withTags(create.getTags()) .withOwner(create.getOwner()); - chart = addHref(uriInfo, dao.create(chart, create.getService(), create.getOwner())); - return Response.created(chart.getHref()).entity(chart).build(); + task = addHref(uriInfo, dao.create(task, create.getService(), create.getOwner())); + return Response.created(task.getHref()).entity(task).build(); } @PATCH @Path("/{id}") - @Operation(summary = "Update a chart", tags = "charts", + @Operation(summary = "Update a Task", tags = "task", description = "Update an existing chart using JsonPatch.", externalDocs = @ExternalDocumentation(description = "JsonPatch RFC", url = "https://tools.ietf.org/html/rfc6902")) @Consumes(MediaType.APPLICATION_JSON_PATCH_JSON) - public Chart updateDescription(@Context UriInfo uriInfo, + public Task updateDescription(@Context UriInfo uriInfo, @Context SecurityContext securityContext, @PathParam("id") String id, @RequestBody(description = "JsonPatch with array of operations", @@ -250,63 +253,64 @@ public class ChartResource { "]")})) JsonPatch patch) throws IOException { Fields fields = new Fields(FIELD_LIST, FIELDS); - Chart chart = dao.get(id, fields); + Task task = dao.get(id, fields); SecurityUtil.checkAdminRoleOrPermissions(authorizer, securityContext, - EntityUtil.getEntityReference(chart)); - chart = dao.patch(id, patch); - return addHref(uriInfo, chart); + EntityUtil.getEntityReference(task)); + task = dao.patch(id, patch); + return addHref(uriInfo, task); } @PUT - @Operation(summary = "Create or update chart", tags = "charts", - description = "Create a chart, it it does not exist or update an existing chart.", + @Operation(summary = "Create or update task", tags = "tasks", + description = "Create a task, it it does not exist or update an existing chart.", responses = { - @ApiResponse(responseCode = "200", description = "The updated chart ", + @ApiResponse(responseCode = "200", description = "The updated task ", content = @Content(mediaType = "application/json", schema = @Schema(implementation = Chart.class))) }) public Response createOrUpdate(@Context UriInfo uriInfo, @Context SecurityContext securityContext, - @Valid CreateChart create) throws IOException { + @Valid CreateTask create) throws IOException { - Chart chart = - new Chart().withId(UUID.randomUUID()).withName(create.getName()).withDisplayName(create.getDisplayName()) + Task task = + new Task().withId(UUID.randomUUID()).withName(create.getName()).withDisplayName(create.getDisplayName()) .withDescription(create.getDescription()) .withService(create.getService()) - .withChartType(create.getChartType()).withChartUrl(create.getChartUrl()) - .withTables(create.getTables()).withTags(create.getTags()) + .withTaskUrl(create.getTaskUrl()) + .withTaskConfig(create.getTaskConfig()) + .withTags(create.getTags()) .withOwner(create.getOwner()); - PutResponse response = dao.createOrUpdate(chart, create.getService(), create.getOwner()); - chart = addHref(uriInfo, response.getEntity()); - return Response.status(response.getStatus()).entity(chart).build(); + PutResponse response = dao.createOrUpdate(task, create.getService(), create.getOwner()); + task = addHref(uriInfo, response.getEntity()); + return Response.status(response.getStatus()).entity(task).build(); } @PUT @Path("/{id}/followers") - @Operation(summary = "Add a follower", tags = "charts", + @Operation(summary = "Add a follower", tags = "tasks", description = "Add a user identified by `userId` as followed of this chart", responses = { @ApiResponse(responseCode = "200", description = "OK"), - @ApiResponse(responseCode = "404", description = "Chart for instance {id} is not found") + @ApiResponse(responseCode = "404", description = "Task for instance {id} is not found") }) public Response addFollower(@Context UriInfo uriInfo, @Context SecurityContext securityContext, - @Parameter(description = "Id of the chart", schema = @Schema(type = "string")) + @Parameter(description = "Id of the task", schema = @Schema(type = "string")) @PathParam("id") String id, @Parameter(description = "Id of the user to be added as follower", schema = @Schema(type = "string")) String userId) throws IOException, ParseException { Fields fields = new Fields(FIELD_LIST, "followers"); Response.Status status = dao.addFollower(id, userId); - Chart chart = dao.get(id, fields); - return Response.status(status).entity(chart).build(); + Task task = dao.get(id, fields); + return Response.status(status).entity(task).build(); } @DELETE @Path("/{id}/followers/{userId}") @Operation(summary = "Remove a follower", tags = "charts", description = "Remove the user identified `userId` as a follower of the chart.") - public Chart deleteFollower(@Context UriInfo uriInfo, + public Task deleteFollower(@Context UriInfo uriInfo, @Context SecurityContext securityContext, @Parameter(description = "Id of the chart", schema = @Schema(type = "string")) @@ -316,15 +320,15 @@ public class ChartResource { @PathParam("userId") String userId) throws IOException, ParseException { Fields fields = new Fields(FIELD_LIST, "followers"); dao.deleteFollower(id, userId); - Chart chart = dao.get(id, fields); - return addHref(uriInfo, chart); + Task task = dao.get(id, fields); + return addHref(uriInfo, task); } @DELETE @Path("/{id}") - @Operation(summary = "Delete a Chart", tags = "charts", - description = "Delete a chart by `id`.", + @Operation(summary = "Delete a Task", tags = "tasks", + description = "Delete a task by `id`.", responses = { @ApiResponse(responseCode = "200", description = "OK"), @ApiResponse(responseCode = "404", description = "Chart for instance {id} is not found") diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java index ccd358b36a0..dbdb155fb0a 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java @@ -23,6 +23,7 @@ import org.openmetadata.catalog.entity.data.Database; import org.openmetadata.catalog.entity.data.Metrics; import org.openmetadata.catalog.entity.data.Report; import org.openmetadata.catalog.entity.data.Table; +import org.openmetadata.catalog.entity.data.Task; import org.openmetadata.catalog.entity.data.Topic; import org.openmetadata.catalog.entity.services.DashboardService; import org.openmetadata.catalog.entity.services.DatabaseService; @@ -40,6 +41,7 @@ import org.openmetadata.catalog.jdbi3.Relationship; import org.openmetadata.catalog.jdbi3.ReportRepository.ReportDAO; import org.openmetadata.catalog.jdbi3.TableRepository.TableDAO; import org.openmetadata.catalog.jdbi3.TagRepository.TagDAO; +import org.openmetadata.catalog.jdbi3.TaskRepository.TaskDAO; import org.openmetadata.catalog.jdbi3.TeamRepository.TeamDAO; import org.openmetadata.catalog.jdbi3.TopicRepository.TopicDAO; import org.openmetadata.catalog.jdbi3.UsageRepository.UsageDAO; @@ -52,6 +54,7 @@ import org.openmetadata.catalog.resources.feeds.MessageParser.EntityLink; import org.openmetadata.catalog.resources.services.dashboard.DashboardServiceResource; import org.openmetadata.catalog.resources.services.database.DatabaseServiceResource; import org.openmetadata.catalog.resources.services.messaging.MessagingServiceResource; +import org.openmetadata.catalog.resources.tasks.TaskResource; import org.openmetadata.catalog.resources.teams.TeamResource; import org.openmetadata.catalog.resources.teams.UserResource; import org.openmetadata.catalog.resources.topics.TopicResource; @@ -136,9 +139,6 @@ public final class EntityUtil { case Entity.DATABASE: DatabaseResource.addHref(uriInfo, ref); break; - case Entity.DATABASE_SERVICE: - DatabaseServiceResource.addHref(uriInfo, ref); - break; case Entity.TOPIC: TopicResource.addHref(uriInfo, ref); break; @@ -148,12 +148,21 @@ public final class EntityUtil { case Entity.DASHBOARD: DashboardResource.addHref(uriInfo, ref); break; + case Entity.TASK: + TaskResource.addHref(uriInfo, ref); + break; + case Entity.DATABASE_SERVICE: + DatabaseServiceResource.addHref(uriInfo, ref); + break; case Entity.MESSAGING_SERVICE: MessagingServiceResource.addHref(uriInfo, ref); break; case Entity.DASHBOARD_SERVICE: DashboardServiceResource.addHref(uriInfo, ref); break; + case Entity.PIPELINE_SERVICE: + DashboardServiceResource.addHref(uriInfo, ref); + break; default: throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityTypeNotFound(ref.getType())); } @@ -237,7 +246,8 @@ public final class EntityUtil { public static List getEntityReference(List list, TableDAO tableDAO, DatabaseDAO databaseDAO, MetricsDAO metricsDAO, DashboardDAO dashboardDAO, ReportDAO reportDAO, - TopicDAO topicDAO, ChartDAO chartDAO) throws IOException { + TopicDAO topicDAO, ChartDAO chartDAO, + TaskDAO taskDAO) throws IOException { for (EntityReference ref : list) { getEntityReference(ref, tableDAO, databaseDAO, metricsDAO, dashboardDAO, reportDAO, topicDAO, chartDAO); } @@ -401,6 +411,8 @@ public final class EntityUtil { return getEntityReference(EntityUtil.validate(fqn, reportDAO.findByFQN(fqn), Report.class)); case Entity.TOPIC: return getEntityReference(EntityUtil.validate(fqn, topicDAO.findByFQN(fqn), Topic.class)); + case Entity.TASK: + return getEntityReference(EntityUtil.validate(fqn, topicDAO.findByFQN(fqn), Task.class)); default: throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(entityType, fqn)); } @@ -419,6 +431,11 @@ public final class EntityUtil { return details; } + public static EntityReference getEntityReference(Task task) { + return new EntityReference().withDescription(task.getDescription()).withId(task.getId()) + .withName(task.getFullyQualifiedName()).withType(Entity.TASK); + } + public static EntityReference getEntityReference(Chart chart) { return new EntityReference().withDescription(chart.getDescription()).withId(chart.getId()) .withName(chart.getFullyQualifiedName()).withType(Entity.CHART); diff --git a/catalog-rest-service/src/main/resources/json/schema/api/data/createChart.json b/catalog-rest-service/src/main/resources/json/schema/api/data/createChart.json index 49b7843d892..d052ba98466 100644 --- a/catalog-rest-service/src/main/resources/json/schema/api/data/createChart.json +++ b/catalog-rest-service/src/main/resources/json/schema/api/data/createChart.json @@ -6,7 +6,7 @@ "type": "object", "properties" : { "name": { - "description": "Name that identifies this dashboard.", + "description": "Name that identifies this Chart.", "type": "string", "minLength": 1, "maxLength": 64 @@ -16,7 +16,7 @@ "type": "string" }, "description": { - "description": "Description of the database instance. What it has and how to use it.", + "description": "Description of the chart instance. What it has and how to use it.", "type": "string" }, "chartType": { diff --git a/catalog-rest-service/src/main/resources/json/schema/api/data/createTask.json b/catalog-rest-service/src/main/resources/json/schema/api/data/createTask.json index 49b7843d892..d9414acc89a 100644 --- a/catalog-rest-service/src/main/resources/json/schema/api/data/createTask.json +++ b/catalog-rest-service/src/main/resources/json/schema/api/data/createTask.json @@ -1,35 +1,51 @@ { - "$id": "https://open-metadata.org/schema/api/data/createChart.json", + "$id": "https://open-metadata.org/schema/api/data/createTask.json", "$schema": "http://json-schema.org/draft-07/schema#", - "title": "Create Chart entity request", - "description": "Create Chart entity request", + "title": "Create Task entity request", + "description": "Create Task entity request", "type": "object", "properties" : { "name": { - "description": "Name that identifies this dashboard.", + "description": "Name that identifies this Task.", "type": "string", "minLength": 1, "maxLength": 64 }, "displayName": { - "description": "Display Name that identifies this Chart. It could be title or label from the source services", + "description": "Display Name that identifies this Task. It could be title or label from the pipeline services", "type": "string" }, "description": { - "description": "Description of the database instance. What it has and how to use it.", + "description": "Description of the task instance. What it has and how to use it.", "type": "string" }, "chartType": { "$ref": "../../entity/data/chart.json#/definitions/chartType" }, - "chartUrl" : { - "description": "Chart URL, pointing to its own Service URL", + "taskUrl" : { + "description": "Task URL to visit/manage. This URL points to respective pipeline service UI", "type": "string", "format": "uri" }, - "tables": { - "description": "Link to tables used in this chart.", - "$ref": "../../type/entityReference.json#/definitions/entityReferenceList" + "upstreamTasks": { + "description": "All the tasks that are upstream of this task.", + "type": "array", + "items": { + "$ref": "../../type/entityReference.json" + }, + "default": null + }, + "downstreamTasks": { + "description": "All the tasks that are downstream of this task.", + "type": "array", + "items": { + "$ref": "../../type/entityReference.json" + }, + "default": null + }, + "taskConfig": { + "description": "Task Configuration.", + "$ref": "../../entity/data/task.json#definitions/taskConfig" }, "tags": { "description": "Tags for this chart", @@ -40,11 +56,11 @@ "default": null }, "owner": { - "description": "Owner of this database", + "description": "Owner of this Task", "$ref": "../../type/entityReference.json" }, "service" : { - "description": "Link to the database service where this database is hosted in", + "description": "Link to the pipeline service where this task is used", "$ref" : "../../type/entityReference.json" } }, diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/data/pipeline.json b/catalog-rest-service/src/main/resources/json/schema/entity/data/pipeline.json index 654be1bdb60..17329da039b 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/data/pipeline.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/data/pipeline.json @@ -16,6 +16,10 @@ "minLength": 1, "maxLength": 64 }, + "displayName": { + "description": "Display Name that identifies this Pipeline. It could be title or label from the source services.", + "type": "string" + }, "fullyQualifiedName": { "description": "A unique name that identifies a pipeline in the format 'ServiceName.PipelineName'.", "type": "string", @@ -23,9 +27,34 @@ "maxLength": 64 }, "description": { - "description": "Description of this pipeline.", + "description": "Description of this Pipeline.", "type": "string" }, + "pipelineUrl" : { + "description": "Pipeline URL to visit/manage. This URL points to respective pipeline service UI", + "type": "string", + "format": "uri" + }, + "tasks": { + "description": "All the tasks that are part of pipeline.", + "type": "array", + "items": { + "$ref": "../../type/entityReference.json" + }, + "default": null + }, + "followers": { + "description": "Followers of this Pipeline.", + "$ref": "../../type/entityReference.json#/definitions/entityReferenceList" + }, + "tags": { + "description": "Tags for this Pipeline.", + "type": "array", + "items": { + "$ref": "../../type/tagLabel.json" + }, + "default": null + }, "href": { "description": "Link to the resource corresponding to this entity.", "$ref": "../../type/basic.json#/definitions/href" diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/data/task.json b/catalog-rest-service/src/main/resources/json/schema/entity/data/task.json index e69de29bb2d..a1237c08743 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/data/task.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/data/task.json @@ -0,0 +1,105 @@ +{ + "$id": "https://open-metadata.org/schema/entity/data/task.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Task", + "description": "This schema defines the Task entity. A task is a unit of computation in a Pipeline. ", + "type": "object", + "javaType": "org.openmetadata.catalog.entity.data.Task", + "definitions": { + "taskConfig": { + "type": "object", + "javaType": "org.openmetadata.catalog.type.TaskConfig", + "description": "This schema defines the type for a column in a table.", + "properties": { + "codeLocation": { + "description": "Location of task file", + "type": "string" + }, + "startDate": { + "description": "Start Date of the task", + "$ref": "../../type/basic.json#/definitions/date" + }, + "concurrency": { + "description": "Concurrency of the Task", + "type": "integer" + } + } + } + }, + "properties" : { + "id": { + "description": "Unique identifier that identifies a task instance.", + "$ref": "../../type/basic.json#/definitions/uuid" + }, + "name": { + "description": "Name that identifies this task instance uniquely.", + "type": "string", + "minLength": 1, + "maxLength": 64 + }, + "displayName": { + "description": "Display Name that identifies this Task. It could be title or label from the pipeline services.", + "type": "string" + }, + "fullyQualifiedName": { + "description": "A unique name that identifies a pipeline in the format 'ServiceName.PipelineName.TaskName'.", + "type": "string", + "minLength": 1, + "maxLength": 64 + }, + "description": { + "description": "Description of this Task.", + "type": "string" + }, + "taskUrl" : { + "description": "Task URL to visit/manage. This URL points to respective pipeline service UI", + "type": "string", + "format": "uri" + }, + "upstreamTasks": { + "description": "All the tasks that are upstream of this task.", + "type": "array", + "items": { + "$ref": "../../type/entityReference.json" + }, + "default": null + }, + "downstreamTasks": { + "description": "All the tasks that are downstream of this task.", + "type": "array", + "items": { + "$ref": "../../type/entityReference.json" + }, + "default": null + }, + "taskConfig": { + "description": "Task Configuration.", + "$ref": "#/definitions/taskConfig" + }, + "followers": { + "description": "Followers of this Pipeline.", + "$ref": "../../type/entityReference.json#/definitions/entityReferenceList" + }, + "tags": { + "description": "Tags for this Pipeline.", + "type": "array", + "items": { + "$ref": "../../type/tagLabel.json" + }, + "default": null + }, + "href": { + "description": "Link to the resource corresponding to this entity.", + "$ref": "../../type/basic.json#/definitions/href" + }, + "owner": { + "description": "Owner of this pipeline.", + "$ref": "../../type/entityReference.json" + }, + "service" : { + "description": "Link to service where this pipeline is hosted in.", + "$ref" : "../../type/entityReference.json" + } + }, + "required": ["id", "name", "service"] +} \ No newline at end of file diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/pipelineService.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/pipelineService.json index c606b0a40c7..bb4e5c7a379 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/pipelineService.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/pipelineService.json @@ -2,62 +2,45 @@ "$id": "https://open-metadata.org/schema/entity/services/messagingService.json", "$schema": "http://json-schema.org/draft-07/schema#", "title": "Messaging Service", - "description": "This schema defines the Messaging Service entity, such as Kafka and Pulsar.", + "description": "This schema defines the Pipeline Service entity, such as Airflow and Prefect.", "type": "object", "definitions": { - "messagingServiceType": { - "description": "Type of messaging service - Kafka or Pulsar.", + "pipelineServiceType": { + "description": "Type of pipeline service - Airflow or Prefect.", "type": "string", "enum": [ - "Kafka", - "Pulsar" + "Airflow", + "Prefect" ], "javaEnums": [ { - "name": "Kafka" + "name": "Airflow" }, { - "name": "Pulsar" + "name": "Prefect" } ] - }, - "brokers": { - "description": "Multiple bootstrap addresses for Kafka. Single proxy address for Pulsar.", - "type": "array", - "items": { - "type": "string" - }, - "default": null } }, "properties": { "id": { - "description": "Unique identifier of this messaging service instance.", + "description": "Unique identifier of this pipeline service instance.", "$ref": "../../type/basic.json#/definitions/uuid" }, "name": { - "description": "Name that identifies this messaging service.", + "description": "Name that identifies this pipeline service.", "type": "string", "minLength": 1, "maxLength": 64 }, "serviceType": { - "description": "Type of messaging service such as Kafka or Pulsar...", - "$ref": "#/definitions/messagingServiceType" + "description": "Type of pipeline service such as Airflow or Prefect...", + "$ref": "#/definitions/pipelineServiceType" }, "description": { "description": "Description of a messaging service instance.", "type": "string" }, - "brokers": { - "description": "Multiple bootstrap addresses for Kafka. Single proxy address for Pulsar.", - "$ref" : "#/definitions/brokers" - }, - "schemaRegistry" : { - "description": "Schema registry URL.", - "type": "string", - "format": "uri" - }, "ingestionSchedule": { "description": "Schedule for running metadata ingestion jobs.", "$ref": "../../type/schedule.json" @@ -69,8 +52,6 @@ }, "required": [ "id", - "name", - "serviceType", - "brokers" + "name" ] }