mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-08-20 06:58:18 +00:00
[WIP] Airlfow integration
This commit is contained in:
parent
75029a6e87
commit
a2f2e0bc2d
@ -156,6 +156,15 @@ CREATE TABLE IF NOT EXISTS chart_entity (
|
|||||||
UNIQUE KEY unique_name(fullyQualifiedName)
|
UNIQUE KEY unique_name(fullyQualifiedName)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS task_entity (
|
||||||
|
id VARCHAR(36) GENERATED ALWAYS AS (json ->> '$.id') STORED NOT NULL,
|
||||||
|
fullyQualifiedName VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.fullyQualifiedName') NOT NULL,
|
||||||
|
json JSON NOT NULL,
|
||||||
|
timestamp BIGINT,
|
||||||
|
PRIMARY KEY (id),
|
||||||
|
UNIQUE KEY unique_name(fullyQualifiedName)
|
||||||
|
);
|
||||||
|
|
||||||
--
|
--
|
||||||
-- Feed related tables
|
-- Feed related tables
|
||||||
--
|
--
|
||||||
|
@ -23,15 +23,18 @@ public final class Entity {
|
|||||||
public static final String DATABASE_SERVICE = "databaseService";
|
public static final String DATABASE_SERVICE = "databaseService";
|
||||||
public static final String MESSAGING_SERVICE = "messagingService";
|
public static final String MESSAGING_SERVICE = "messagingService";
|
||||||
public static final String DASHBOARD_SERVICE = "dashboardService";
|
public static final String DASHBOARD_SERVICE = "dashboardService";
|
||||||
|
public static final String PIPELINE_SERVICE = "pipelineService";
|
||||||
|
|
||||||
// Data assets
|
// Data assets
|
||||||
public static final String TABLE = "table";
|
public static final String TABLE = "table";
|
||||||
public static final String DATABASE = "database";
|
public static final String DATABASE = "database";
|
||||||
public static final String METRICS = "metrics";
|
public static final String METRICS = "metrics";
|
||||||
public static final String DASHBOARD = "dashboard";
|
public static final String DASHBOARD = "dashboard";
|
||||||
|
public static final String PIPELINE = "pipeline";
|
||||||
public static final String CHART = "chart";
|
public static final String CHART = "chart";
|
||||||
public static final String REPORT = "report";
|
public static final String REPORT = "report";
|
||||||
public static final String TOPIC = "topic";
|
public static final String TOPIC = "topic";
|
||||||
|
public static final String TASK = "task";
|
||||||
|
|
||||||
// Team/user
|
// Team/user
|
||||||
public static final String USER = "user";
|
public static final String USER = "user";
|
||||||
|
@ -18,6 +18,7 @@ package org.openmetadata.catalog.jdbi3;
|
|||||||
|
|
||||||
import org.openmetadata.catalog.Entity;
|
import org.openmetadata.catalog.Entity;
|
||||||
import org.openmetadata.catalog.entity.data.Chart;
|
import org.openmetadata.catalog.entity.data.Chart;
|
||||||
|
import org.openmetadata.catalog.entity.data.Task;
|
||||||
import org.openmetadata.catalog.entity.services.DashboardService;
|
import org.openmetadata.catalog.entity.services.DashboardService;
|
||||||
import org.openmetadata.catalog.exception.CatalogExceptionMessage;
|
import org.openmetadata.catalog.exception.CatalogExceptionMessage;
|
||||||
import org.openmetadata.catalog.exception.EntityNotFoundException;
|
import org.openmetadata.catalog.exception.EntityNotFoundException;
|
||||||
@ -25,7 +26,7 @@ import org.openmetadata.catalog.jdbi3.DashboardServiceRepository.DashboardServic
|
|||||||
import org.openmetadata.catalog.jdbi3.TeamRepository.TeamDAO;
|
import org.openmetadata.catalog.jdbi3.TeamRepository.TeamDAO;
|
||||||
import org.openmetadata.catalog.jdbi3.UserRepository.UserDAO;
|
import org.openmetadata.catalog.jdbi3.UserRepository.UserDAO;
|
||||||
import org.openmetadata.catalog.resources.charts.ChartResource;
|
import org.openmetadata.catalog.resources.charts.ChartResource;
|
||||||
import org.openmetadata.catalog.resources.charts.ChartResource.ChartList;
|
import org.openmetadata.catalog.resources.tasks.TaskResource.TaskList;
|
||||||
import org.openmetadata.catalog.type.EntityReference;
|
import org.openmetadata.catalog.type.EntityReference;
|
||||||
import org.openmetadata.catalog.type.TagLabel;
|
import org.openmetadata.catalog.type.TagLabel;
|
||||||
import org.openmetadata.catalog.util.EntityUtil;
|
import org.openmetadata.catalog.util.EntityUtil;
|
||||||
@ -51,17 +52,17 @@ import java.util.Objects;
|
|||||||
|
|
||||||
import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound;
|
import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound;
|
||||||
|
|
||||||
public abstract class ChartRepository {
|
public abstract class TaskRepository {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(ChartRepository.class);
|
private static final Logger LOG = LoggerFactory.getLogger(TaskRepository.class);
|
||||||
private static final Fields CHART_UPDATE_FIELDS = new Fields(ChartResource.FIELD_LIST, "owner");
|
private static final Fields TASK_UPDATE_FIELDS = new Fields(ChartResource.FIELD_LIST, "owner,taskConfig");
|
||||||
private static final Fields CHART_PATCH_FIELDS = new Fields(ChartResource.FIELD_LIST, "owner,service,tags");
|
private static final Fields TASK_PATCH_FIELDS = new Fields(ChartResource.FIELD_LIST, "owner,service,tags");
|
||||||
|
|
||||||
public static String getFQN(EntityReference service, Chart chart) {
|
public static String getFQN(EntityReference service, Task task) {
|
||||||
return (service.getName() + "." + chart.getName());
|
return (service.getName() + "." + task.getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
@CreateSqlObject
|
@CreateSqlObject
|
||||||
abstract ChartDAO chartDAO();
|
abstract TaskDAO taskDAO();
|
||||||
|
|
||||||
@CreateSqlObject
|
@CreateSqlObject
|
||||||
abstract EntityRelationshipDAO relationshipDAO();
|
abstract EntityRelationshipDAO relationshipDAO();
|
||||||
@ -80,62 +81,62 @@ public abstract class ChartRepository {
|
|||||||
|
|
||||||
|
|
||||||
@Transaction
|
@Transaction
|
||||||
public ChartList listAfter(Fields fields, String serviceName, int limitParam, String after) throws IOException,
|
public TaskList listAfter(Fields fields, String serviceName, int limitParam, String after) throws IOException,
|
||||||
GeneralSecurityException {
|
GeneralSecurityException {
|
||||||
// forward scrolling, if after == null then first page is being asked being asked
|
// forward scrolling, if after == null then first page is being asked being asked
|
||||||
List<String> jsons = chartDAO().listAfter(serviceName, limitParam + 1, after == null ? "" :
|
List<String> jsons = taskDAO().listAfter(serviceName, limitParam + 1, after == null ? "" :
|
||||||
CipherText.instance().decrypt(after));
|
CipherText.instance().decrypt(after));
|
||||||
|
|
||||||
List<Chart> charts = new ArrayList<>();
|
List<Task> tasks = new ArrayList<>();
|
||||||
for (String json : jsons) {
|
for (String json : jsons) {
|
||||||
charts.add(setFields(JsonUtils.readValue(json, Chart.class), fields));
|
tasks.add(setFields(JsonUtils.readValue(json, Task.class), fields));
|
||||||
}
|
}
|
||||||
int total = chartDAO().listCount(serviceName);
|
int total = taskDAO().listCount(serviceName);
|
||||||
|
|
||||||
String beforeCursor, afterCursor = null;
|
String beforeCursor, afterCursor = null;
|
||||||
beforeCursor = after == null ? null : charts.get(0).getFullyQualifiedName();
|
beforeCursor = after == null ? null : tasks.get(0).getFullyQualifiedName();
|
||||||
if (charts.size() > limitParam) { // If extra result exists, then next page exists - return after cursor
|
if (tasks.size() > limitParam) { // If extra result exists, then next page exists - return after cursor
|
||||||
charts.remove(limitParam);
|
tasks.remove(limitParam);
|
||||||
afterCursor = charts.get(limitParam - 1).getFullyQualifiedName();
|
afterCursor = tasks.get(limitParam - 1).getFullyQualifiedName();
|
||||||
}
|
}
|
||||||
return new ChartList(charts, beforeCursor, afterCursor, total);
|
return new TaskList(tasks, beforeCursor, afterCursor, total);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Transaction
|
@Transaction
|
||||||
public ChartList listBefore(Fields fields, String serviceName, int limitParam, String before) throws IOException,
|
public TaskList listBefore(Fields fields, String serviceName, int limitParam, String before) throws IOException,
|
||||||
GeneralSecurityException {
|
GeneralSecurityException {
|
||||||
// Reverse scrolling - Get one extra result used for computing before cursor
|
// Reverse scrolling - Get one extra result used for computing before cursor
|
||||||
List<String> jsons = chartDAO().listBefore(serviceName, limitParam + 1, CipherText.instance().decrypt(before));
|
List<String> jsons = taskDAO().listBefore(serviceName, limitParam + 1, CipherText.instance().decrypt(before));
|
||||||
List<Chart> charts = new ArrayList<>();
|
List<Task> tasks = new ArrayList<>();
|
||||||
for (String json : jsons) {
|
for (String json : jsons) {
|
||||||
charts.add(setFields(JsonUtils.readValue(json, Chart.class), fields));
|
tasks.add(setFields(JsonUtils.readValue(json, Task.class), fields));
|
||||||
}
|
}
|
||||||
int total = chartDAO().listCount(serviceName);
|
int total = taskDAO().listCount(serviceName);
|
||||||
|
|
||||||
String beforeCursor = null, afterCursor;
|
String beforeCursor = null, afterCursor;
|
||||||
if (charts.size() > limitParam) { // If extra result exists, then previous page exists - return before cursor
|
if (tasks.size() > limitParam) { // If extra result exists, then previous page exists - return before cursor
|
||||||
charts.remove(0);
|
tasks.remove(0);
|
||||||
beforeCursor = charts.get(0).getFullyQualifiedName();
|
beforeCursor = tasks.get(0).getFullyQualifiedName();
|
||||||
}
|
}
|
||||||
afterCursor = charts.get(charts.size() - 1).getFullyQualifiedName();
|
afterCursor = tasks.get(tasks.size() - 1).getFullyQualifiedName();
|
||||||
return new ChartList(charts, beforeCursor, afterCursor, total);
|
return new TaskList(tasks, beforeCursor, afterCursor, total);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Transaction
|
@Transaction
|
||||||
public Chart get(String id, Fields fields) throws IOException {
|
public Task get(String id, Fields fields) throws IOException {
|
||||||
return setFields(validateChart(id), fields);
|
return setFields(validateTask(id), fields);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Transaction
|
@Transaction
|
||||||
public Chart getByName(String fqn, Fields fields) throws IOException {
|
public Task getByName(String fqn, Fields fields) throws IOException {
|
||||||
Chart chart = EntityUtil.validate(fqn, chartDAO().findByFQN(fqn), Chart.class);
|
Task task = EntityUtil.validate(fqn, taskDAO().findByFQN(fqn), Task.class);
|
||||||
return setFields(chart, fields);
|
return setFields(task, fields);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Transaction
|
@Transaction
|
||||||
public Chart create(Chart chart, EntityReference service, EntityReference owner) throws IOException {
|
public Task create(Task task, EntityReference service, EntityReference owner) throws IOException {
|
||||||
getService(service); // Validate service
|
getService(service); // Validate service
|
||||||
return createInternal(chart, service, owner);
|
return createInternal(task, service, owner);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Transaction
|
@Transaction
|
||||||
@ -143,73 +144,73 @@ public abstract class ChartRepository {
|
|||||||
if (relationshipDAO().findToCount(id, Relationship.CONTAINS.ordinal(), Entity.CHART) > 0) {
|
if (relationshipDAO().findToCount(id, Relationship.CONTAINS.ordinal(), Entity.CHART) > 0) {
|
||||||
throw new IllegalArgumentException("Chart is not empty");
|
throw new IllegalArgumentException("Chart is not empty");
|
||||||
}
|
}
|
||||||
if (chartDAO().delete(id) <= 0) {
|
if (taskDAO().delete(id) <= 0) {
|
||||||
throw EntityNotFoundException.byMessage(entityNotFound(Entity.CHART, id));
|
throw EntityNotFoundException.byMessage(entityNotFound(Entity.CHART, id));
|
||||||
}
|
}
|
||||||
relationshipDAO().deleteAll(id);
|
relationshipDAO().deleteAll(id);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Transaction
|
@Transaction
|
||||||
public PutResponse<Chart> createOrUpdate(Chart updatedChart, EntityReference service, EntityReference newOwner)
|
public PutResponse<Task> createOrUpdate(Task updatedTask, EntityReference service, EntityReference newOwner)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
getService(service); // Validate service
|
getService(service); // Validate service
|
||||||
|
|
||||||
String fqn = getFQN(service, updatedChart);
|
String fqn = getFQN(service, updatedTask);
|
||||||
Chart storedDB = JsonUtils.readValue(chartDAO().findByFQN(fqn), Chart.class);
|
Task storedDB = JsonUtils.readValue(taskDAO().findByFQN(fqn), Task.class);
|
||||||
if (storedDB == null) { // Chart does not exist. Create a new one
|
if (storedDB == null) { // Chart does not exist. Create a new one
|
||||||
return new PutResponse<>(Status.CREATED, createInternal(updatedChart, service, newOwner));
|
return new PutResponse<>(Status.CREATED, createInternal(updatedTask, service, newOwner));
|
||||||
}
|
}
|
||||||
// Update the existing chart
|
// Update the existing chart
|
||||||
EntityUtil.populateOwner(userDAO(), teamDAO(), newOwner); // Validate new owner
|
EntityUtil.populateOwner(userDAO(), teamDAO(), newOwner); // Validate new owner
|
||||||
if (storedDB.getDescription() == null || storedDB.getDescription().isEmpty()) {
|
if (storedDB.getDescription() == null || storedDB.getDescription().isEmpty()) {
|
||||||
storedDB.withDescription(updatedChart.getDescription());
|
storedDB.withDescription(updatedTask.getDescription());
|
||||||
}
|
}
|
||||||
|
|
||||||
//update the display name from source
|
//update the display name from source
|
||||||
if (updatedChart.getDisplayName() != null && !updatedChart.getDisplayName().isEmpty()) {
|
if (updatedTask.getDisplayName() != null && !updatedTask.getDisplayName().isEmpty()) {
|
||||||
storedDB.withDisplayName(updatedChart.getDisplayName());
|
storedDB.withDisplayName(updatedTask.getDisplayName());
|
||||||
}
|
}
|
||||||
chartDAO().update(storedDB.getId().toString(), JsonUtils.pojoToJson(storedDB));
|
taskDAO().update(storedDB.getId().toString(), JsonUtils.pojoToJson(storedDB));
|
||||||
|
|
||||||
// Update owner relationship
|
// Update owner relationship
|
||||||
setFields(storedDB, CHART_UPDATE_FIELDS); // First get the ownership information
|
setFields(storedDB, TASK_UPDATE_FIELDS); // First get the ownership information
|
||||||
updateOwner(storedDB, storedDB.getOwner(), newOwner);
|
updateOwner(storedDB, storedDB.getOwner(), newOwner);
|
||||||
|
|
||||||
// Service can't be changed in update since service name is part of FQN and
|
// Service can't be changed in update since service name is part of FQN and
|
||||||
// change to a different service will result in a different FQN and creation of a new chart under the new service
|
// change to a different service will result in a different FQN and creation of a new chart under the new service
|
||||||
storedDB.setService(service);
|
storedDB.setService(service);
|
||||||
applyTags(updatedChart);
|
applyTags(updatedTask);
|
||||||
|
|
||||||
return new PutResponse<>(Status.OK, storedDB);
|
return new PutResponse<>(Status.OK, storedDB);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Transaction
|
@Transaction
|
||||||
public Chart patch(String id, JsonPatch patch) throws IOException {
|
public Task patch(String id, JsonPatch patch) throws IOException {
|
||||||
Chart original = setFields(validateChart(id), CHART_PATCH_FIELDS);
|
Task original = setFields(validateTask(id), TASK_PATCH_FIELDS);
|
||||||
Chart updated = JsonUtils.applyPatch(original, patch, Chart.class);
|
Task updated = JsonUtils.applyPatch(original, patch, Task.class);
|
||||||
patch(original, updated);
|
patch(original, updated);
|
||||||
return updated;
|
return updated;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Chart createInternal(Chart chart, EntityReference service, EntityReference owner) throws IOException {
|
public Task createInternal(Task task, EntityReference service, EntityReference owner) throws IOException {
|
||||||
chart.setFullyQualifiedName(getFQN(service, chart));
|
task.setFullyQualifiedName(getFQN(service, task));
|
||||||
EntityUtil.populateOwner(userDAO(), teamDAO(), owner); // Validate owner
|
EntityUtil.populateOwner(userDAO(), teamDAO(), owner); // Validate owner
|
||||||
|
|
||||||
// Query 1 - insert chart into chart_entity table
|
// Query 1 - insert chart into chart_entity table
|
||||||
chartDAO().insert(JsonUtils.pojoToJson(chart));
|
taskDAO().insert(JsonUtils.pojoToJson(task));
|
||||||
setService(chart, service);
|
setService(task, service);
|
||||||
setOwner(chart, owner);
|
setOwner(task, owner);
|
||||||
applyTags(chart);
|
applyTags(task);
|
||||||
return chart;
|
return task;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void applyTags(Chart chart) throws IOException {
|
private void applyTags(Task task) throws IOException {
|
||||||
// Add chart level tags by adding tag to chart relationship
|
// Add chart level tags by adding tag to chart relationship
|
||||||
EntityUtil.applyTags(tagDAO(), chart.getTags(), chart.getFullyQualifiedName());
|
EntityUtil.applyTags(tagDAO(), task.getTags(), task.getFullyQualifiedName());
|
||||||
chart.setTags(getTags(chart.getFullyQualifiedName())); // Update tag to handle additional derived tags
|
task.setTags(getTags(task.getFullyQualifiedName())); // Update tag to handle additional derived tags
|
||||||
}
|
}
|
||||||
|
|
||||||
private void patch(Chart original, Chart updated) throws IOException {
|
private void patch(Task original, Task updated) throws IOException {
|
||||||
String chartId = original.getId().toString();
|
String chartId = original.getId().toString();
|
||||||
if (!original.getId().equals(updated.getId())) {
|
if (!original.getId().equals(updated.getId())) {
|
||||||
throw new IllegalArgumentException(CatalogExceptionMessage.readOnlyAttribute(Entity.CHART, "id"));
|
throw new IllegalArgumentException(CatalogExceptionMessage.readOnlyAttribute(Entity.CHART, "id"));
|
||||||
@ -229,57 +230,57 @@ public abstract class ChartRepository {
|
|||||||
updated.setHref(null);
|
updated.setHref(null);
|
||||||
updated.setOwner(null);
|
updated.setOwner(null);
|
||||||
updated.setService(null);
|
updated.setService(null);
|
||||||
chartDAO().update(chartId, JsonUtils.pojoToJson(updated));
|
taskDAO().update(chartId, JsonUtils.pojoToJson(updated));
|
||||||
updateOwner(updated, original.getOwner(), newOwner);
|
updateOwner(updated, original.getOwner(), newOwner);
|
||||||
updated.setService(newService);
|
updated.setService(newService);
|
||||||
applyTags(updated);
|
applyTags(updated);
|
||||||
}
|
}
|
||||||
|
|
||||||
public EntityReference getOwner(Chart chart) throws IOException {
|
public EntityReference getOwner(Task task) throws IOException {
|
||||||
if (chart == null) {
|
if (task == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
return EntityUtil.populateOwner(chart.getId(), relationshipDAO(), userDAO(), teamDAO());
|
return EntityUtil.populateOwner(task.getId(), relationshipDAO(), userDAO(), teamDAO());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setOwner(Chart chart, EntityReference owner) {
|
private void setOwner(Task task, EntityReference owner) {
|
||||||
EntityUtil.setOwner(relationshipDAO(), chart.getId(), Entity.CHART, owner);
|
EntityUtil.setOwner(relationshipDAO(), task.getId(), Entity.TASK, owner);
|
||||||
chart.setOwner(owner);
|
task.setOwner(owner);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void updateOwner(Chart chart, EntityReference origOwner, EntityReference newOwner) {
|
private void updateOwner(Task task, EntityReference origOwner, EntityReference newOwner) {
|
||||||
EntityUtil.updateOwner(relationshipDAO(), origOwner, newOwner, chart.getId(), Entity.CHART);
|
EntityUtil.updateOwner(relationshipDAO(), origOwner, newOwner, task.getId(), Entity.TASK);
|
||||||
chart.setOwner(newOwner);
|
task.setOwner(newOwner);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Chart validateChart(String id) throws IOException {
|
private Task validateTask(String id) throws IOException {
|
||||||
return EntityUtil.validate(id, chartDAO().findById(id), Chart.class);
|
return EntityUtil.validate(id, taskDAO().findById(id), Task.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Chart setFields(Chart chart, Fields fields) throws IOException {
|
private Task setFields(Task task, Fields fields) throws IOException {
|
||||||
chart.setOwner(fields.contains("owner") ? getOwner(chart) : null);
|
task.setOwner(fields.contains("owner") ? getOwner(task) : null);
|
||||||
chart.setService(fields.contains("service") ? getService(chart) : null);
|
task.setService(fields.contains("service") ? getService(task) : null);
|
||||||
chart.setFollowers(fields.contains("followers") ? getFollowers(chart) : null);
|
task.setFollowers(fields.contains("followers") ? getFollowers(task) : null);
|
||||||
chart.setTags(fields.contains("tags") ? getTags(chart.getFullyQualifiedName()) : null);
|
task.setTags(fields.contains("tags") ? getTags(task.getFullyQualifiedName()) : null);
|
||||||
return chart;
|
return task;
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<EntityReference> getFollowers(Chart chart) throws IOException {
|
private List<EntityReference> getFollowers(Task task) throws IOException {
|
||||||
return chart == null ? null : EntityUtil.getFollowers(chart.getId(), relationshipDAO(), userDAO());
|
return task == null ? null : EntityUtil.getFollowers(task.getId(), relationshipDAO(), userDAO());
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<TagLabel> getTags(String fqn) {
|
private List<TagLabel> getTags(String fqn) {
|
||||||
return tagDAO().getTags(fqn);
|
return tagDAO().getTags(fqn);
|
||||||
}
|
}
|
||||||
|
|
||||||
private EntityReference getService(Chart chart) throws IOException {
|
private EntityReference getService(Task task) throws IOException {
|
||||||
return chart == null ? null : getService(Objects.requireNonNull(EntityUtil.getService(relationshipDAO(),
|
return task == null ? null : getService(Objects.requireNonNull(EntityUtil.getService(relationshipDAO(),
|
||||||
chart.getId(), Entity.DASHBOARD_SERVICE)));
|
task.getId(), Entity.PIPELINE_SERVICE)));
|
||||||
}
|
}
|
||||||
|
|
||||||
private EntityReference getService(EntityReference service) throws IOException {
|
private EntityReference getService(EntityReference service) throws IOException {
|
||||||
String id = service.getId().toString();
|
String id = service.getId().toString();
|
||||||
if (service.getType().equalsIgnoreCase(Entity.DASHBOARD_SERVICE)) {
|
if (service.getType().equalsIgnoreCase(Entity.PIPELINE_SERVICE)) {
|
||||||
DashboardService serviceInstance = EntityUtil.validate(id, dashboardServiceDAO().findById(id),
|
DashboardService serviceInstance = EntityUtil.validate(id, dashboardServiceDAO().findById(id),
|
||||||
DashboardService.class);
|
DashboardService.class);
|
||||||
service.setDescription(serviceInstance.getDescription());
|
service.setDescription(serviceInstance.getDescription());
|
||||||
@ -290,19 +291,19 @@ public abstract class ChartRepository {
|
|||||||
return service;
|
return service;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setService(Chart chart, EntityReference service) throws IOException {
|
public void setService(Task task, EntityReference service) throws IOException {
|
||||||
if (service != null && chart != null) {
|
if (service != null && task != null) {
|
||||||
getService(service); // Populate service details
|
getService(service); // Populate service details
|
||||||
relationshipDAO().insert(service.getId().toString(), chart.getId().toString(), service.getType(),
|
relationshipDAO().insert(service.getId().toString(), task.getId().toString(), service.getType(),
|
||||||
Entity.CHART, Relationship.CONTAINS.ordinal());
|
Entity.CHART, Relationship.CONTAINS.ordinal());
|
||||||
chart.setService(service);
|
task.setService(service);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Transaction
|
@Transaction
|
||||||
public Status addFollower(String chartId, String userId) throws IOException {
|
public Status addFollower(String taskId, String userId) throws IOException {
|
||||||
EntityUtil.validate(chartId, chartDAO().findById(chartId), Chart.class);
|
EntityUtil.validate(taskId, taskDAO().findById(taskId), Task.class);
|
||||||
return EntityUtil.addFollower(relationshipDAO(), userDAO(), chartId, Entity.CHART, userId, Entity.USER) ?
|
return EntityUtil.addFollower(relationshipDAO(), userDAO(), taskId, Entity.TASK, userId, Entity.USER) ?
|
||||||
Status.CREATED : Status.OK;
|
Status.CREATED : Status.OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -312,36 +313,36 @@ public abstract class ChartRepository {
|
|||||||
EntityUtil.removeFollower(relationshipDAO(), chartId, userId);
|
EntityUtil.removeFollower(relationshipDAO(), chartId, userId);
|
||||||
}
|
}
|
||||||
|
|
||||||
public interface ChartDAO {
|
public interface TaskDAO {
|
||||||
@SqlUpdate("INSERT INTO chart_entity (json) VALUES (:json)")
|
@SqlUpdate("INSERT INTO task_entity (json) VALUES (:json)")
|
||||||
void insert(@Bind("json") String json);
|
void insert(@Bind("json") String json);
|
||||||
|
|
||||||
@SqlUpdate("UPDATE chart_entity SET json = :json where id = :id")
|
@SqlUpdate("UPDATE task_entity SET json = :json where id = :id")
|
||||||
void update(@Bind("id") String id, @Bind("json") String json);
|
void update(@Bind("id") String id, @Bind("json") String json);
|
||||||
|
|
||||||
@SqlQuery("SELECT json FROM chart_entity WHERE fullyQualifiedName = :name")
|
@SqlQuery("SELECT json FROM task_entity WHERE fullyQualifiedName = :name")
|
||||||
String findByFQN(@Bind("name") String name);
|
String findByFQN(@Bind("name") String name);
|
||||||
|
|
||||||
@SqlQuery("SELECT json FROM chart_entity WHERE id = :id")
|
@SqlQuery("SELECT json FROM task_entity WHERE id = :id")
|
||||||
String findById(@Bind("id") String id);
|
String findById(@Bind("id") String id);
|
||||||
|
|
||||||
@SqlQuery("SELECT count(*) FROM chart_entity WHERE " +
|
@SqlQuery("SELECT count(*) FROM task_entity WHERE " +
|
||||||
"(fullyQualifiedName LIKE CONCAT(:fqnPrefix, '.%') OR :fqnPrefix IS NULL)")
|
"(fullyQualifiedName LIKE CONCAT(:fqnPrefix, '.%') OR :fqnPrefix IS NULL)")
|
||||||
int listCount(@Bind("fqnPrefix") String fqnPrefix);
|
int listCount(@Bind("fqnPrefix") String fqnPrefix);
|
||||||
|
|
||||||
@SqlQuery(
|
@SqlQuery(
|
||||||
"SELECT json FROM (" +
|
"SELECT json FROM (" +
|
||||||
"SELECT fullyQualifiedName, json FROM chart_entity WHERE " +
|
"SELECT fullyQualifiedName, json FROM task_entity WHERE " +
|
||||||
"(fullyQualifiedName LIKE CONCAT(:fqnPrefix, '.%') OR :fqnPrefix IS NULL) AND " +// Filter by
|
"(fullyQualifiedName LIKE CONCAT(:fqnPrefix, '.%') OR :fqnPrefix IS NULL) AND " +// Filter by
|
||||||
// service name
|
// service name
|
||||||
"fullyQualifiedName < :before " + // Pagination by chart fullyQualifiedName
|
"fullyQualifiedName < :before " + // Pagination by task fullyQualifiedName
|
||||||
"ORDER BY fullyQualifiedName DESC " + // Pagination ordering by chart fullyQualifiedName
|
"ORDER BY fullyQualifiedName DESC " + // Pagination ordering by task fullyQualifiedName
|
||||||
"LIMIT :limit" +
|
"LIMIT :limit" +
|
||||||
") last_rows_subquery ORDER BY fullyQualifiedName")
|
") last_rows_subquery ORDER BY fullyQualifiedName")
|
||||||
List<String> listBefore(@Bind("fqnPrefix") String fqnPrefix, @Bind("limit") int limit,
|
List<String> listBefore(@Bind("fqnPrefix") String fqnPrefix, @Bind("limit") int limit,
|
||||||
@Bind("before") String before);
|
@Bind("before") String before);
|
||||||
|
|
||||||
@SqlQuery("SELECT json FROM chart_entity WHERE " +
|
@SqlQuery("SELECT json FROM task_entity WHERE " +
|
||||||
"(fullyQualifiedName LIKE CONCAT(:fqnPrefix, '.%') OR :fqnPrefix IS NULL) AND " +
|
"(fullyQualifiedName LIKE CONCAT(:fqnPrefix, '.%') OR :fqnPrefix IS NULL) AND " +
|
||||||
"fullyQualifiedName > :after " +
|
"fullyQualifiedName > :after " +
|
||||||
"ORDER BY fullyQualifiedName " +
|
"ORDER BY fullyQualifiedName " +
|
||||||
@ -349,10 +350,10 @@ public abstract class ChartRepository {
|
|||||||
List<String> listAfter(@Bind("fqnPrefix") String fqnPrefix, @Bind("limit") int limit,
|
List<String> listAfter(@Bind("fqnPrefix") String fqnPrefix, @Bind("limit") int limit,
|
||||||
@Bind("after") String after);
|
@Bind("after") String after);
|
||||||
|
|
||||||
@SqlQuery("SELECT EXISTS (SELECT * FROM chart_entity WHERE id = :id)")
|
@SqlQuery("SELECT EXISTS (SELECT * FROM task_entity WHERE id = :id)")
|
||||||
boolean exists(@Bind("id") String id);
|
boolean exists(@Bind("id") String id);
|
||||||
|
|
||||||
@SqlUpdate("DELETE FROM chart_entity WHERE id = :id")
|
@SqlUpdate("DELETE FROM task_entity WHERE id = :id")
|
||||||
int delete(@Bind("id") String id);
|
int delete(@Bind("id") String id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -14,7 +14,7 @@
|
|||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.openmetadata.catalog.resources.services.messaging;
|
package org.openmetadata.catalog.resources.services.pipeline;
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
@ -56,12 +56,12 @@ import java.util.List;
|
|||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
@Path("/v1/services/messagingServices")
|
@Path("/v1/services/pipelineServices")
|
||||||
@Api(value = "Messaging service collection", tags = "Services -> Messaging service collection")
|
@Api(value = "Pipeline service collection", tags = "Services -> Pipeline service collection")
|
||||||
@Produces(MediaType.APPLICATION_JSON)
|
@Produces(MediaType.APPLICATION_JSON)
|
||||||
@Consumes(MediaType.APPLICATION_JSON)
|
@Consumes(MediaType.APPLICATION_JSON)
|
||||||
@Collection(name = "messagingServices", repositoryClass = "org.openmetadata.catalog.jdbi3.MessagingServiceRepository")
|
@Collection(name = "messagingServices", repositoryClass = "org.openmetadata.catalog.jdbi3.MessagingServiceRepository")
|
||||||
public class MessagingServiceResource {
|
public class PipelineServiceResource {
|
||||||
private final MessagingServiceRepository dao;
|
private final MessagingServiceRepository dao;
|
||||||
private final CatalogAuthorizer authorizer;
|
private final CatalogAuthorizer authorizer;
|
||||||
|
|
||||||
@ -80,7 +80,7 @@ public class MessagingServiceResource {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public MessagingServiceResource(MessagingServiceRepository dao, CatalogAuthorizer authorizer) {
|
public PipelineServiceResource(MessagingServiceRepository dao, CatalogAuthorizer authorizer) {
|
||||||
Objects.requireNonNull(dao, "MessagingServiceRepository must not be null");
|
Objects.requireNonNull(dao, "MessagingServiceRepository must not be null");
|
||||||
this.dao = dao;
|
this.dao = dao;
|
||||||
this.authorizer = authorizer;
|
this.authorizer = authorizer;
|
||||||
|
@ -14,7 +14,7 @@
|
|||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.openmetadata.catalog.resources.charts;
|
package org.openmetadata.catalog.resources.tasks;
|
||||||
|
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
import io.swagger.annotations.Api;
|
import io.swagger.annotations.Api;
|
||||||
@ -26,10 +26,12 @@ import io.swagger.v3.oas.annotations.media.ExampleObject;
|
|||||||
import io.swagger.v3.oas.annotations.media.Schema;
|
import io.swagger.v3.oas.annotations.media.Schema;
|
||||||
import io.swagger.v3.oas.annotations.parameters.RequestBody;
|
import io.swagger.v3.oas.annotations.parameters.RequestBody;
|
||||||
import io.swagger.v3.oas.annotations.responses.ApiResponse;
|
import io.swagger.v3.oas.annotations.responses.ApiResponse;
|
||||||
import org.openmetadata.catalog.api.data.CreateChart;
|
import org.openmetadata.catalog.api.data.CreateTask;
|
||||||
import org.openmetadata.catalog.entity.data.Chart;
|
import org.openmetadata.catalog.entity.data.Chart;
|
||||||
import org.openmetadata.catalog.entity.data.Dashboard;
|
import org.openmetadata.catalog.entity.data.Dashboard;
|
||||||
|
import org.openmetadata.catalog.entity.data.Task;
|
||||||
import org.openmetadata.catalog.jdbi3.ChartRepository;
|
import org.openmetadata.catalog.jdbi3.ChartRepository;
|
||||||
|
import org.openmetadata.catalog.jdbi3.TaskRepository;
|
||||||
import org.openmetadata.catalog.resources.Collection;
|
import org.openmetadata.catalog.resources.Collection;
|
||||||
import org.openmetadata.catalog.security.CatalogAuthorizer;
|
import org.openmetadata.catalog.security.CatalogAuthorizer;
|
||||||
import org.openmetadata.catalog.security.SecurityUtil;
|
import org.openmetadata.catalog.security.SecurityUtil;
|
||||||
@ -73,49 +75,49 @@ import java.util.Objects;
|
|||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
@Path("/v1/charts")
|
@Path("/v1/tasks")
|
||||||
@Api(value = "Chart data asset collection", tags = "Chart data asset collection")
|
@Api(value = "tasks data asset collection", tags = "Task data asset collection")
|
||||||
@Produces(MediaType.APPLICATION_JSON)
|
@Produces(MediaType.APPLICATION_JSON)
|
||||||
@Consumes(MediaType.APPLICATION_JSON)
|
@Consumes(MediaType.APPLICATION_JSON)
|
||||||
@Collection(name = "charts", repositoryClass = "org.openmetadata.catalog.jdbi3.ChartRepository")
|
@Collection(name = "tasks", repositoryClass = "org.openmetadata.catalog.jdbi3.TaskRepository")
|
||||||
public class ChartResource {
|
public class TaskResource {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(ChartResource.class);
|
private static final Logger LOG = LoggerFactory.getLogger(TaskResource.class);
|
||||||
private static final String CHART_COLLECTION_PATH = "v1/charts/";
|
private static final String TASK_COLLECTION_PATH = "v1/tasks/";
|
||||||
private final ChartRepository dao;
|
private final TaskRepository dao;
|
||||||
private final CatalogAuthorizer authorizer;
|
private final CatalogAuthorizer authorizer;
|
||||||
|
|
||||||
public static void addHref(UriInfo uriInfo, EntityReference ref) {
|
public static void addHref(UriInfo uriInfo, EntityReference ref) {
|
||||||
ref.withHref(RestUtil.getHref(uriInfo, CHART_COLLECTION_PATH, ref.getId()));
|
ref.withHref(RestUtil.getHref(uriInfo, TASK_COLLECTION_PATH, ref.getId()));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static List<Chart> addHref(UriInfo uriInfo, List<Chart> charts) {
|
public static List<Task> addHref(UriInfo uriInfo, List<Task> tasks) {
|
||||||
Optional.ofNullable(charts).orElse(Collections.emptyList()).forEach(i -> addHref(uriInfo, i));
|
Optional.ofNullable(tasks).orElse(Collections.emptyList()).forEach(i -> addHref(uriInfo, i));
|
||||||
return charts;
|
return tasks;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Chart addHref(UriInfo uriInfo, Chart chart) {
|
public static Task addHref(UriInfo uriInfo, Task task) {
|
||||||
chart.setHref(RestUtil.getHref(uriInfo, CHART_COLLECTION_PATH, chart.getId()));
|
task.setHref(RestUtil.getHref(uriInfo, TASK_COLLECTION_PATH, task.getId()));
|
||||||
EntityUtil.addHref(uriInfo, chart.getOwner());
|
EntityUtil.addHref(uriInfo, task.getOwner());
|
||||||
EntityUtil.addHref(uriInfo, chart.getService());
|
EntityUtil.addHref(uriInfo, task.getService());
|
||||||
EntityUtil.addHref(uriInfo, chart.getFollowers());
|
EntityUtil.addHref(uriInfo, task.getFollowers());
|
||||||
|
|
||||||
return chart;
|
return task;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public ChartResource(ChartRepository dao, CatalogAuthorizer authorizer) {
|
public TaskResource(TaskRepository dao, CatalogAuthorizer authorizer) {
|
||||||
Objects.requireNonNull(dao, "ChartRepository must not be null");
|
Objects.requireNonNull(dao, "TaskRepository must not be null");
|
||||||
this.dao = dao;
|
this.dao = dao;
|
||||||
this.authorizer = authorizer;
|
this.authorizer = authorizer;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class ChartList extends ResultList<Chart> {
|
public static class TaskList extends ResultList<Task> {
|
||||||
@SuppressWarnings("unused")
|
@SuppressWarnings("unused")
|
||||||
ChartList() {
|
TaskList() {
|
||||||
// Empty constructor needed for deserialization
|
// Empty constructor needed for deserialization
|
||||||
}
|
}
|
||||||
|
|
||||||
public ChartList(List<Chart> data, String beforeCursor, String afterCursor, int total)
|
public TaskList(List<Task> data, String beforeCursor, String afterCursor, int total)
|
||||||
throws GeneralSecurityException, UnsupportedEncodingException {
|
throws GeneralSecurityException, UnsupportedEncodingException {
|
||||||
super(data, beforeCursor, afterCursor, total);
|
super(data, beforeCursor, afterCursor, total);
|
||||||
}
|
}
|
||||||
@ -127,16 +129,16 @@ public class ChartResource {
|
|||||||
|
|
||||||
@GET
|
@GET
|
||||||
@Valid
|
@Valid
|
||||||
@Operation(summary = "List charts", tags = "charts",
|
@Operation(summary = "List tasks", tags = "tasks",
|
||||||
description = "Get a list of charts, optionally filtered by `service` it belongs to. Use `fields` " +
|
description = "Get a list of tasks, optionally filtered by `service` it belongs to. Use `fields` " +
|
||||||
"parameter to get only necessary fields. Use cursor-based pagination to limit the number " +
|
"parameter to get only necessary fields. Use cursor-based pagination to limit the number " +
|
||||||
"entries in the list using `limit` and `before` or `after` query params.",
|
"entries in the list using `limit` and `before` or `after` query params.",
|
||||||
responses = {
|
responses = {
|
||||||
@ApiResponse(responseCode = "200", description = "List of charts",
|
@ApiResponse(responseCode = "200", description = "List of charts",
|
||||||
content = @Content(mediaType = "application/json",
|
content = @Content(mediaType = "application/json",
|
||||||
schema = @Schema(implementation = ChartList.class)))
|
schema = @Schema(implementation = TaskList.class)))
|
||||||
})
|
})
|
||||||
public ChartList list(@Context UriInfo uriInfo,
|
public TaskList list(@Context UriInfo uriInfo,
|
||||||
@Context SecurityContext securityContext,
|
@Context SecurityContext securityContext,
|
||||||
@Parameter(description = "Fields requested in the returned resource",
|
@Parameter(description = "Fields requested in the returned resource",
|
||||||
schema = @Schema(type = "string", example = FIELDS))
|
schema = @Schema(type = "string", example = FIELDS))
|
||||||
@ -144,7 +146,7 @@ public class ChartResource {
|
|||||||
@Parameter(description = "Filter charts by service name",
|
@Parameter(description = "Filter charts by service name",
|
||||||
schema = @Schema(type = "string", example = "superset"))
|
schema = @Schema(type = "string", example = "superset"))
|
||||||
@QueryParam("service") String serviceParam,
|
@QueryParam("service") String serviceParam,
|
||||||
@Parameter(description = "Limit the number charts returned. (1 to 1000000, default = 10)")
|
@Parameter(description = "Limit the number tasks returned. (1 to 1000000, default = 10)")
|
||||||
@DefaultValue("10")
|
@DefaultValue("10")
|
||||||
@Min(1)
|
@Min(1)
|
||||||
@Max(1000000)
|
@Max(1000000)
|
||||||
@ -159,31 +161,31 @@ public class ChartResource {
|
|||||||
RestUtil.validateCursors(before, after);
|
RestUtil.validateCursors(before, after);
|
||||||
Fields fields = new Fields(FIELD_LIST, fieldsParam);
|
Fields fields = new Fields(FIELD_LIST, fieldsParam);
|
||||||
|
|
||||||
ChartList charts;
|
TaskList tasks;
|
||||||
if (before != null) { // Reverse paging
|
if (before != null) { // Reverse paging
|
||||||
charts = dao.listBefore(fields, serviceParam, limitParam, before); // Ask for one extra entry
|
tasks = dao.listBefore(fields, serviceParam, limitParam, before); // Ask for one extra entry
|
||||||
} else { // Forward paging or first page
|
} else { // Forward paging or first page
|
||||||
charts = dao.listAfter(fields, serviceParam, limitParam, after);
|
tasks = dao.listAfter(fields, serviceParam, limitParam, after);
|
||||||
}
|
}
|
||||||
addHref(uriInfo, charts.getData());
|
addHref(uriInfo, tasks.getData());
|
||||||
return charts;
|
return tasks;
|
||||||
}
|
}
|
||||||
|
|
||||||
@GET
|
@GET
|
||||||
@Path("/{id}")
|
@Path("/{id}")
|
||||||
@Operation(summary = "Get a Chart", tags = "charts",
|
@Operation(summary = "Get a Task", tags = "tasks",
|
||||||
description = "Get a chart by `id`.",
|
description = "Get a task by `id`.",
|
||||||
responses = {
|
responses = {
|
||||||
@ApiResponse(responseCode = "200", description = "The chart",
|
@ApiResponse(responseCode = "200", description = "The Task",
|
||||||
content = @Content(mediaType = "application/json",
|
content = @Content(mediaType = "application/json",
|
||||||
schema = @Schema(implementation = Dashboard.class))),
|
schema = @Schema(implementation = Dashboard.class))),
|
||||||
@ApiResponse(responseCode = "404", description = "Chart for instance {id} is not found")
|
@ApiResponse(responseCode = "404", description = "Task for instance {id} is not found")
|
||||||
})
|
})
|
||||||
public Chart get(@Context UriInfo uriInfo, @PathParam("id") String id,
|
public Task get(@Context UriInfo uriInfo, @PathParam("id") String id,
|
||||||
@Context SecurityContext securityContext,
|
@Context SecurityContext securityContext,
|
||||||
@Parameter(description = "Fields requested in the returned resource",
|
@Parameter(description = "Fields requested in the returned resource",
|
||||||
schema = @Schema(type = "string", example = FIELDS))
|
schema = @Schema(type = "string", example = FIELDS))
|
||||||
@QueryParam("fields") String fieldsParam) throws IOException {
|
@QueryParam("fields") String fieldsParam) throws IOException {
|
||||||
Fields fields = new Fields(FIELD_LIST, fieldsParam);
|
Fields fields = new Fields(FIELD_LIST, fieldsParam);
|
||||||
return addHref(uriInfo, dao.get(id, fields));
|
return addHref(uriInfo, dao.get(id, fields));
|
||||||
}
|
}
|
||||||
@ -204,9 +206,9 @@ public class ChartResource {
|
|||||||
schema = @Schema(type = "string", example = FIELDS))
|
schema = @Schema(type = "string", example = FIELDS))
|
||||||
@QueryParam("fields") String fieldsParam) throws IOException {
|
@QueryParam("fields") String fieldsParam) throws IOException {
|
||||||
Fields fields = new Fields(FIELD_LIST, fieldsParam);
|
Fields fields = new Fields(FIELD_LIST, fieldsParam);
|
||||||
Chart chart = dao.getByName(fqn, fields);
|
Task task = dao.getByName(fqn, fields);
|
||||||
addHref(uriInfo, chart);
|
addHref(uriInfo, task);
|
||||||
return Response.ok(chart).build();
|
return Response.ok(task).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@POST
|
@POST
|
||||||
@ -219,27 +221,28 @@ public class ChartResource {
|
|||||||
@ApiResponse(responseCode = "400", description = "Bad request")
|
@ApiResponse(responseCode = "400", description = "Bad request")
|
||||||
})
|
})
|
||||||
public Response create(@Context UriInfo uriInfo, @Context SecurityContext securityContext,
|
public Response create(@Context UriInfo uriInfo, @Context SecurityContext securityContext,
|
||||||
@Valid CreateChart create) throws IOException {
|
@Valid CreateTask create) throws IOException {
|
||||||
SecurityUtil.checkAdminOrBotRole(authorizer, securityContext);
|
SecurityUtil.checkAdminOrBotRole(authorizer, securityContext);
|
||||||
Chart chart =
|
Task task =
|
||||||
new Chart().withId(UUID.randomUUID()).withName(create.getName()).withDisplayName(create.getDisplayName())
|
new Task().withId(UUID.randomUUID()).withName(create.getName()).withDisplayName(create.getDisplayName())
|
||||||
.withDescription(create.getDescription())
|
.withDescription(create.getDescription())
|
||||||
.withService(create.getService())
|
.withService(create.getService())
|
||||||
.withChartType(create.getChartType()).withChartUrl(create.getChartUrl())
|
.withTaskConfig(create.getTaskConfig())
|
||||||
.withTables(create.getTables()).withTags(create.getTags())
|
.withTaskUrl(create.getTaskUrl())
|
||||||
|
.withTags(create.getTags())
|
||||||
.withOwner(create.getOwner());
|
.withOwner(create.getOwner());
|
||||||
chart = addHref(uriInfo, dao.create(chart, create.getService(), create.getOwner()));
|
task = addHref(uriInfo, dao.create(task, create.getService(), create.getOwner()));
|
||||||
return Response.created(chart.getHref()).entity(chart).build();
|
return Response.created(task.getHref()).entity(task).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@PATCH
|
@PATCH
|
||||||
@Path("/{id}")
|
@Path("/{id}")
|
||||||
@Operation(summary = "Update a chart", tags = "charts",
|
@Operation(summary = "Update a Task", tags = "task",
|
||||||
description = "Update an existing chart using JsonPatch.",
|
description = "Update an existing chart using JsonPatch.",
|
||||||
externalDocs = @ExternalDocumentation(description = "JsonPatch RFC",
|
externalDocs = @ExternalDocumentation(description = "JsonPatch RFC",
|
||||||
url = "https://tools.ietf.org/html/rfc6902"))
|
url = "https://tools.ietf.org/html/rfc6902"))
|
||||||
@Consumes(MediaType.APPLICATION_JSON_PATCH_JSON)
|
@Consumes(MediaType.APPLICATION_JSON_PATCH_JSON)
|
||||||
public Chart updateDescription(@Context UriInfo uriInfo,
|
public Task updateDescription(@Context UriInfo uriInfo,
|
||||||
@Context SecurityContext securityContext,
|
@Context SecurityContext securityContext,
|
||||||
@PathParam("id") String id,
|
@PathParam("id") String id,
|
||||||
@RequestBody(description = "JsonPatch with array of operations",
|
@RequestBody(description = "JsonPatch with array of operations",
|
||||||
@ -250,63 +253,64 @@ public class ChartResource {
|
|||||||
"]")}))
|
"]")}))
|
||||||
JsonPatch patch) throws IOException {
|
JsonPatch patch) throws IOException {
|
||||||
Fields fields = new Fields(FIELD_LIST, FIELDS);
|
Fields fields = new Fields(FIELD_LIST, FIELDS);
|
||||||
Chart chart = dao.get(id, fields);
|
Task task = dao.get(id, fields);
|
||||||
SecurityUtil.checkAdminRoleOrPermissions(authorizer, securityContext,
|
SecurityUtil.checkAdminRoleOrPermissions(authorizer, securityContext,
|
||||||
EntityUtil.getEntityReference(chart));
|
EntityUtil.getEntityReference(task));
|
||||||
chart = dao.patch(id, patch);
|
task = dao.patch(id, patch);
|
||||||
return addHref(uriInfo, chart);
|
return addHref(uriInfo, task);
|
||||||
}
|
}
|
||||||
|
|
||||||
@PUT
|
@PUT
|
||||||
@Operation(summary = "Create or update chart", tags = "charts",
|
@Operation(summary = "Create or update task", tags = "tasks",
|
||||||
description = "Create a chart, it it does not exist or update an existing chart.",
|
description = "Create a task, it it does not exist or update an existing chart.",
|
||||||
responses = {
|
responses = {
|
||||||
@ApiResponse(responseCode = "200", description = "The updated chart ",
|
@ApiResponse(responseCode = "200", description = "The updated task ",
|
||||||
content = @Content(mediaType = "application/json",
|
content = @Content(mediaType = "application/json",
|
||||||
schema = @Schema(implementation = Chart.class)))
|
schema = @Schema(implementation = Chart.class)))
|
||||||
})
|
})
|
||||||
public Response createOrUpdate(@Context UriInfo uriInfo,
|
public Response createOrUpdate(@Context UriInfo uriInfo,
|
||||||
@Context SecurityContext securityContext,
|
@Context SecurityContext securityContext,
|
||||||
@Valid CreateChart create) throws IOException {
|
@Valid CreateTask create) throws IOException {
|
||||||
|
|
||||||
Chart chart =
|
Task task =
|
||||||
new Chart().withId(UUID.randomUUID()).withName(create.getName()).withDisplayName(create.getDisplayName())
|
new Task().withId(UUID.randomUUID()).withName(create.getName()).withDisplayName(create.getDisplayName())
|
||||||
.withDescription(create.getDescription())
|
.withDescription(create.getDescription())
|
||||||
.withService(create.getService())
|
.withService(create.getService())
|
||||||
.withChartType(create.getChartType()).withChartUrl(create.getChartUrl())
|
.withTaskUrl(create.getTaskUrl())
|
||||||
.withTables(create.getTables()).withTags(create.getTags())
|
.withTaskConfig(create.getTaskConfig())
|
||||||
|
.withTags(create.getTags())
|
||||||
.withOwner(create.getOwner());
|
.withOwner(create.getOwner());
|
||||||
PutResponse<Chart> response = dao.createOrUpdate(chart, create.getService(), create.getOwner());
|
PutResponse<Task> response = dao.createOrUpdate(task, create.getService(), create.getOwner());
|
||||||
chart = addHref(uriInfo, response.getEntity());
|
task = addHref(uriInfo, response.getEntity());
|
||||||
return Response.status(response.getStatus()).entity(chart).build();
|
return Response.status(response.getStatus()).entity(task).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@PUT
|
@PUT
|
||||||
@Path("/{id}/followers")
|
@Path("/{id}/followers")
|
||||||
@Operation(summary = "Add a follower", tags = "charts",
|
@Operation(summary = "Add a follower", tags = "tasks",
|
||||||
description = "Add a user identified by `userId` as followed of this chart",
|
description = "Add a user identified by `userId` as followed of this chart",
|
||||||
responses = {
|
responses = {
|
||||||
@ApiResponse(responseCode = "200", description = "OK"),
|
@ApiResponse(responseCode = "200", description = "OK"),
|
||||||
@ApiResponse(responseCode = "404", description = "Chart for instance {id} is not found")
|
@ApiResponse(responseCode = "404", description = "Task for instance {id} is not found")
|
||||||
})
|
})
|
||||||
public Response addFollower(@Context UriInfo uriInfo,
|
public Response addFollower(@Context UriInfo uriInfo,
|
||||||
@Context SecurityContext securityContext,
|
@Context SecurityContext securityContext,
|
||||||
@Parameter(description = "Id of the chart", schema = @Schema(type = "string"))
|
@Parameter(description = "Id of the task", schema = @Schema(type = "string"))
|
||||||
@PathParam("id") String id,
|
@PathParam("id") String id,
|
||||||
@Parameter(description = "Id of the user to be added as follower",
|
@Parameter(description = "Id of the user to be added as follower",
|
||||||
schema = @Schema(type = "string"))
|
schema = @Schema(type = "string"))
|
||||||
String userId) throws IOException, ParseException {
|
String userId) throws IOException, ParseException {
|
||||||
Fields fields = new Fields(FIELD_LIST, "followers");
|
Fields fields = new Fields(FIELD_LIST, "followers");
|
||||||
Response.Status status = dao.addFollower(id, userId);
|
Response.Status status = dao.addFollower(id, userId);
|
||||||
Chart chart = dao.get(id, fields);
|
Task task = dao.get(id, fields);
|
||||||
return Response.status(status).entity(chart).build();
|
return Response.status(status).entity(task).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@DELETE
|
@DELETE
|
||||||
@Path("/{id}/followers/{userId}")
|
@Path("/{id}/followers/{userId}")
|
||||||
@Operation(summary = "Remove a follower", tags = "charts",
|
@Operation(summary = "Remove a follower", tags = "charts",
|
||||||
description = "Remove the user identified `userId` as a follower of the chart.")
|
description = "Remove the user identified `userId` as a follower of the chart.")
|
||||||
public Chart deleteFollower(@Context UriInfo uriInfo,
|
public Task deleteFollower(@Context UriInfo uriInfo,
|
||||||
@Context SecurityContext securityContext,
|
@Context SecurityContext securityContext,
|
||||||
@Parameter(description = "Id of the chart",
|
@Parameter(description = "Id of the chart",
|
||||||
schema = @Schema(type = "string"))
|
schema = @Schema(type = "string"))
|
||||||
@ -316,15 +320,15 @@ public class ChartResource {
|
|||||||
@PathParam("userId") String userId) throws IOException, ParseException {
|
@PathParam("userId") String userId) throws IOException, ParseException {
|
||||||
Fields fields = new Fields(FIELD_LIST, "followers");
|
Fields fields = new Fields(FIELD_LIST, "followers");
|
||||||
dao.deleteFollower(id, userId);
|
dao.deleteFollower(id, userId);
|
||||||
Chart chart = dao.get(id, fields);
|
Task task = dao.get(id, fields);
|
||||||
return addHref(uriInfo, chart);
|
return addHref(uriInfo, task);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@DELETE
|
@DELETE
|
||||||
@Path("/{id}")
|
@Path("/{id}")
|
||||||
@Operation(summary = "Delete a Chart", tags = "charts",
|
@Operation(summary = "Delete a Task", tags = "tasks",
|
||||||
description = "Delete a chart by `id`.",
|
description = "Delete a task by `id`.",
|
||||||
responses = {
|
responses = {
|
||||||
@ApiResponse(responseCode = "200", description = "OK"),
|
@ApiResponse(responseCode = "200", description = "OK"),
|
||||||
@ApiResponse(responseCode = "404", description = "Chart for instance {id} is not found")
|
@ApiResponse(responseCode = "404", description = "Chart for instance {id} is not found")
|
||||||
|
@ -23,6 +23,7 @@ import org.openmetadata.catalog.entity.data.Database;
|
|||||||
import org.openmetadata.catalog.entity.data.Metrics;
|
import org.openmetadata.catalog.entity.data.Metrics;
|
||||||
import org.openmetadata.catalog.entity.data.Report;
|
import org.openmetadata.catalog.entity.data.Report;
|
||||||
import org.openmetadata.catalog.entity.data.Table;
|
import org.openmetadata.catalog.entity.data.Table;
|
||||||
|
import org.openmetadata.catalog.entity.data.Task;
|
||||||
import org.openmetadata.catalog.entity.data.Topic;
|
import org.openmetadata.catalog.entity.data.Topic;
|
||||||
import org.openmetadata.catalog.entity.services.DashboardService;
|
import org.openmetadata.catalog.entity.services.DashboardService;
|
||||||
import org.openmetadata.catalog.entity.services.DatabaseService;
|
import org.openmetadata.catalog.entity.services.DatabaseService;
|
||||||
@ -40,6 +41,7 @@ import org.openmetadata.catalog.jdbi3.Relationship;
|
|||||||
import org.openmetadata.catalog.jdbi3.ReportRepository.ReportDAO;
|
import org.openmetadata.catalog.jdbi3.ReportRepository.ReportDAO;
|
||||||
import org.openmetadata.catalog.jdbi3.TableRepository.TableDAO;
|
import org.openmetadata.catalog.jdbi3.TableRepository.TableDAO;
|
||||||
import org.openmetadata.catalog.jdbi3.TagRepository.TagDAO;
|
import org.openmetadata.catalog.jdbi3.TagRepository.TagDAO;
|
||||||
|
import org.openmetadata.catalog.jdbi3.TaskRepository.TaskDAO;
|
||||||
import org.openmetadata.catalog.jdbi3.TeamRepository.TeamDAO;
|
import org.openmetadata.catalog.jdbi3.TeamRepository.TeamDAO;
|
||||||
import org.openmetadata.catalog.jdbi3.TopicRepository.TopicDAO;
|
import org.openmetadata.catalog.jdbi3.TopicRepository.TopicDAO;
|
||||||
import org.openmetadata.catalog.jdbi3.UsageRepository.UsageDAO;
|
import org.openmetadata.catalog.jdbi3.UsageRepository.UsageDAO;
|
||||||
@ -52,6 +54,7 @@ import org.openmetadata.catalog.resources.feeds.MessageParser.EntityLink;
|
|||||||
import org.openmetadata.catalog.resources.services.dashboard.DashboardServiceResource;
|
import org.openmetadata.catalog.resources.services.dashboard.DashboardServiceResource;
|
||||||
import org.openmetadata.catalog.resources.services.database.DatabaseServiceResource;
|
import org.openmetadata.catalog.resources.services.database.DatabaseServiceResource;
|
||||||
import org.openmetadata.catalog.resources.services.messaging.MessagingServiceResource;
|
import org.openmetadata.catalog.resources.services.messaging.MessagingServiceResource;
|
||||||
|
import org.openmetadata.catalog.resources.tasks.TaskResource;
|
||||||
import org.openmetadata.catalog.resources.teams.TeamResource;
|
import org.openmetadata.catalog.resources.teams.TeamResource;
|
||||||
import org.openmetadata.catalog.resources.teams.UserResource;
|
import org.openmetadata.catalog.resources.teams.UserResource;
|
||||||
import org.openmetadata.catalog.resources.topics.TopicResource;
|
import org.openmetadata.catalog.resources.topics.TopicResource;
|
||||||
@ -136,9 +139,6 @@ public final class EntityUtil {
|
|||||||
case Entity.DATABASE:
|
case Entity.DATABASE:
|
||||||
DatabaseResource.addHref(uriInfo, ref);
|
DatabaseResource.addHref(uriInfo, ref);
|
||||||
break;
|
break;
|
||||||
case Entity.DATABASE_SERVICE:
|
|
||||||
DatabaseServiceResource.addHref(uriInfo, ref);
|
|
||||||
break;
|
|
||||||
case Entity.TOPIC:
|
case Entity.TOPIC:
|
||||||
TopicResource.addHref(uriInfo, ref);
|
TopicResource.addHref(uriInfo, ref);
|
||||||
break;
|
break;
|
||||||
@ -148,12 +148,21 @@ public final class EntityUtil {
|
|||||||
case Entity.DASHBOARD:
|
case Entity.DASHBOARD:
|
||||||
DashboardResource.addHref(uriInfo, ref);
|
DashboardResource.addHref(uriInfo, ref);
|
||||||
break;
|
break;
|
||||||
|
case Entity.TASK:
|
||||||
|
TaskResource.addHref(uriInfo, ref);
|
||||||
|
break;
|
||||||
|
case Entity.DATABASE_SERVICE:
|
||||||
|
DatabaseServiceResource.addHref(uriInfo, ref);
|
||||||
|
break;
|
||||||
case Entity.MESSAGING_SERVICE:
|
case Entity.MESSAGING_SERVICE:
|
||||||
MessagingServiceResource.addHref(uriInfo, ref);
|
MessagingServiceResource.addHref(uriInfo, ref);
|
||||||
break;
|
break;
|
||||||
case Entity.DASHBOARD_SERVICE:
|
case Entity.DASHBOARD_SERVICE:
|
||||||
DashboardServiceResource.addHref(uriInfo, ref);
|
DashboardServiceResource.addHref(uriInfo, ref);
|
||||||
break;
|
break;
|
||||||
|
case Entity.PIPELINE_SERVICE:
|
||||||
|
DashboardServiceResource.addHref(uriInfo, ref);
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityTypeNotFound(ref.getType()));
|
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityTypeNotFound(ref.getType()));
|
||||||
}
|
}
|
||||||
@ -237,7 +246,8 @@ public final class EntityUtil {
|
|||||||
public static List<EntityReference> getEntityReference(List<EntityReference> list, TableDAO tableDAO,
|
public static List<EntityReference> getEntityReference(List<EntityReference> list, TableDAO tableDAO,
|
||||||
DatabaseDAO databaseDAO, MetricsDAO metricsDAO,
|
DatabaseDAO databaseDAO, MetricsDAO metricsDAO,
|
||||||
DashboardDAO dashboardDAO, ReportDAO reportDAO,
|
DashboardDAO dashboardDAO, ReportDAO reportDAO,
|
||||||
TopicDAO topicDAO, ChartDAO chartDAO) throws IOException {
|
TopicDAO topicDAO, ChartDAO chartDAO,
|
||||||
|
TaskDAO taskDAO) throws IOException {
|
||||||
for (EntityReference ref : list) {
|
for (EntityReference ref : list) {
|
||||||
getEntityReference(ref, tableDAO, databaseDAO, metricsDAO, dashboardDAO, reportDAO, topicDAO, chartDAO);
|
getEntityReference(ref, tableDAO, databaseDAO, metricsDAO, dashboardDAO, reportDAO, topicDAO, chartDAO);
|
||||||
}
|
}
|
||||||
@ -401,6 +411,8 @@ public final class EntityUtil {
|
|||||||
return getEntityReference(EntityUtil.validate(fqn, reportDAO.findByFQN(fqn), Report.class));
|
return getEntityReference(EntityUtil.validate(fqn, reportDAO.findByFQN(fqn), Report.class));
|
||||||
case Entity.TOPIC:
|
case Entity.TOPIC:
|
||||||
return getEntityReference(EntityUtil.validate(fqn, topicDAO.findByFQN(fqn), Topic.class));
|
return getEntityReference(EntityUtil.validate(fqn, topicDAO.findByFQN(fqn), Topic.class));
|
||||||
|
case Entity.TASK:
|
||||||
|
return getEntityReference(EntityUtil.validate(fqn, topicDAO.findByFQN(fqn), Task.class));
|
||||||
default:
|
default:
|
||||||
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(entityType, fqn));
|
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(entityType, fqn));
|
||||||
}
|
}
|
||||||
@ -419,6 +431,11 @@ public final class EntityUtil {
|
|||||||
return details;
|
return details;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static EntityReference getEntityReference(Task task) {
|
||||||
|
return new EntityReference().withDescription(task.getDescription()).withId(task.getId())
|
||||||
|
.withName(task.getFullyQualifiedName()).withType(Entity.TASK);
|
||||||
|
}
|
||||||
|
|
||||||
public static EntityReference getEntityReference(Chart chart) {
|
public static EntityReference getEntityReference(Chart chart) {
|
||||||
return new EntityReference().withDescription(chart.getDescription()).withId(chart.getId())
|
return new EntityReference().withDescription(chart.getDescription()).withId(chart.getId())
|
||||||
.withName(chart.getFullyQualifiedName()).withType(Entity.CHART);
|
.withName(chart.getFullyQualifiedName()).withType(Entity.CHART);
|
||||||
|
@ -6,7 +6,7 @@
|
|||||||
"type": "object",
|
"type": "object",
|
||||||
"properties" : {
|
"properties" : {
|
||||||
"name": {
|
"name": {
|
||||||
"description": "Name that identifies this dashboard.",
|
"description": "Name that identifies this Chart.",
|
||||||
"type": "string",
|
"type": "string",
|
||||||
"minLength": 1,
|
"minLength": 1,
|
||||||
"maxLength": 64
|
"maxLength": 64
|
||||||
@ -16,7 +16,7 @@
|
|||||||
"type": "string"
|
"type": "string"
|
||||||
},
|
},
|
||||||
"description": {
|
"description": {
|
||||||
"description": "Description of the database instance. What it has and how to use it.",
|
"description": "Description of the chart instance. What it has and how to use it.",
|
||||||
"type": "string"
|
"type": "string"
|
||||||
},
|
},
|
||||||
"chartType": {
|
"chartType": {
|
||||||
|
@ -1,35 +1,51 @@
|
|||||||
{
|
{
|
||||||
"$id": "https://open-metadata.org/schema/api/data/createChart.json",
|
"$id": "https://open-metadata.org/schema/api/data/createTask.json",
|
||||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||||
"title": "Create Chart entity request",
|
"title": "Create Task entity request",
|
||||||
"description": "Create Chart entity request",
|
"description": "Create Task entity request",
|
||||||
"type": "object",
|
"type": "object",
|
||||||
"properties" : {
|
"properties" : {
|
||||||
"name": {
|
"name": {
|
||||||
"description": "Name that identifies this dashboard.",
|
"description": "Name that identifies this Task.",
|
||||||
"type": "string",
|
"type": "string",
|
||||||
"minLength": 1,
|
"minLength": 1,
|
||||||
"maxLength": 64
|
"maxLength": 64
|
||||||
},
|
},
|
||||||
"displayName": {
|
"displayName": {
|
||||||
"description": "Display Name that identifies this Chart. It could be title or label from the source services",
|
"description": "Display Name that identifies this Task. It could be title or label from the pipeline services",
|
||||||
"type": "string"
|
"type": "string"
|
||||||
},
|
},
|
||||||
"description": {
|
"description": {
|
||||||
"description": "Description of the database instance. What it has and how to use it.",
|
"description": "Description of the task instance. What it has and how to use it.",
|
||||||
"type": "string"
|
"type": "string"
|
||||||
},
|
},
|
||||||
"chartType": {
|
"chartType": {
|
||||||
"$ref": "../../entity/data/chart.json#/definitions/chartType"
|
"$ref": "../../entity/data/chart.json#/definitions/chartType"
|
||||||
},
|
},
|
||||||
"chartUrl" : {
|
"taskUrl" : {
|
||||||
"description": "Chart URL, pointing to its own Service URL",
|
"description": "Task URL to visit/manage. This URL points to respective pipeline service UI",
|
||||||
"type": "string",
|
"type": "string",
|
||||||
"format": "uri"
|
"format": "uri"
|
||||||
},
|
},
|
||||||
"tables": {
|
"upstreamTasks": {
|
||||||
"description": "Link to tables used in this chart.",
|
"description": "All the tasks that are upstream of this task.",
|
||||||
"$ref": "../../type/entityReference.json#/definitions/entityReferenceList"
|
"type": "array",
|
||||||
|
"items": {
|
||||||
|
"$ref": "../../type/entityReference.json"
|
||||||
|
},
|
||||||
|
"default": null
|
||||||
|
},
|
||||||
|
"downstreamTasks": {
|
||||||
|
"description": "All the tasks that are downstream of this task.",
|
||||||
|
"type": "array",
|
||||||
|
"items": {
|
||||||
|
"$ref": "../../type/entityReference.json"
|
||||||
|
},
|
||||||
|
"default": null
|
||||||
|
},
|
||||||
|
"taskConfig": {
|
||||||
|
"description": "Task Configuration.",
|
||||||
|
"$ref": "../../entity/data/task.json#definitions/taskConfig"
|
||||||
},
|
},
|
||||||
"tags": {
|
"tags": {
|
||||||
"description": "Tags for this chart",
|
"description": "Tags for this chart",
|
||||||
@ -40,11 +56,11 @@
|
|||||||
"default": null
|
"default": null
|
||||||
},
|
},
|
||||||
"owner": {
|
"owner": {
|
||||||
"description": "Owner of this database",
|
"description": "Owner of this Task",
|
||||||
"$ref": "../../type/entityReference.json"
|
"$ref": "../../type/entityReference.json"
|
||||||
},
|
},
|
||||||
"service" : {
|
"service" : {
|
||||||
"description": "Link to the database service where this database is hosted in",
|
"description": "Link to the pipeline service where this task is used",
|
||||||
"$ref" : "../../type/entityReference.json"
|
"$ref" : "../../type/entityReference.json"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
@ -16,6 +16,10 @@
|
|||||||
"minLength": 1,
|
"minLength": 1,
|
||||||
"maxLength": 64
|
"maxLength": 64
|
||||||
},
|
},
|
||||||
|
"displayName": {
|
||||||
|
"description": "Display Name that identifies this Pipeline. It could be title or label from the source services.",
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
"fullyQualifiedName": {
|
"fullyQualifiedName": {
|
||||||
"description": "A unique name that identifies a pipeline in the format 'ServiceName.PipelineName'.",
|
"description": "A unique name that identifies a pipeline in the format 'ServiceName.PipelineName'.",
|
||||||
"type": "string",
|
"type": "string",
|
||||||
@ -23,9 +27,34 @@
|
|||||||
"maxLength": 64
|
"maxLength": 64
|
||||||
},
|
},
|
||||||
"description": {
|
"description": {
|
||||||
"description": "Description of this pipeline.",
|
"description": "Description of this Pipeline.",
|
||||||
"type": "string"
|
"type": "string"
|
||||||
},
|
},
|
||||||
|
"pipelineUrl" : {
|
||||||
|
"description": "Pipeline URL to visit/manage. This URL points to respective pipeline service UI",
|
||||||
|
"type": "string",
|
||||||
|
"format": "uri"
|
||||||
|
},
|
||||||
|
"tasks": {
|
||||||
|
"description": "All the tasks that are part of pipeline.",
|
||||||
|
"type": "array",
|
||||||
|
"items": {
|
||||||
|
"$ref": "../../type/entityReference.json"
|
||||||
|
},
|
||||||
|
"default": null
|
||||||
|
},
|
||||||
|
"followers": {
|
||||||
|
"description": "Followers of this Pipeline.",
|
||||||
|
"$ref": "../../type/entityReference.json#/definitions/entityReferenceList"
|
||||||
|
},
|
||||||
|
"tags": {
|
||||||
|
"description": "Tags for this Pipeline.",
|
||||||
|
"type": "array",
|
||||||
|
"items": {
|
||||||
|
"$ref": "../../type/tagLabel.json"
|
||||||
|
},
|
||||||
|
"default": null
|
||||||
|
},
|
||||||
"href": {
|
"href": {
|
||||||
"description": "Link to the resource corresponding to this entity.",
|
"description": "Link to the resource corresponding to this entity.",
|
||||||
"$ref": "../../type/basic.json#/definitions/href"
|
"$ref": "../../type/basic.json#/definitions/href"
|
||||||
|
@ -0,0 +1,105 @@
|
|||||||
|
{
|
||||||
|
"$id": "https://open-metadata.org/schema/entity/data/task.json",
|
||||||
|
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||||
|
"title": "Task",
|
||||||
|
"description": "This schema defines the Task entity. A task is a unit of computation in a Pipeline. ",
|
||||||
|
"type": "object",
|
||||||
|
"javaType": "org.openmetadata.catalog.entity.data.Task",
|
||||||
|
"definitions": {
|
||||||
|
"taskConfig": {
|
||||||
|
"type": "object",
|
||||||
|
"javaType": "org.openmetadata.catalog.type.TaskConfig",
|
||||||
|
"description": "This schema defines the type for a column in a table.",
|
||||||
|
"properties": {
|
||||||
|
"codeLocation": {
|
||||||
|
"description": "Location of task file",
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
|
"startDate": {
|
||||||
|
"description": "Start Date of the task",
|
||||||
|
"$ref": "../../type/basic.json#/definitions/date"
|
||||||
|
},
|
||||||
|
"concurrency": {
|
||||||
|
"description": "Concurrency of the Task",
|
||||||
|
"type": "integer"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"properties" : {
|
||||||
|
"id": {
|
||||||
|
"description": "Unique identifier that identifies a task instance.",
|
||||||
|
"$ref": "../../type/basic.json#/definitions/uuid"
|
||||||
|
},
|
||||||
|
"name": {
|
||||||
|
"description": "Name that identifies this task instance uniquely.",
|
||||||
|
"type": "string",
|
||||||
|
"minLength": 1,
|
||||||
|
"maxLength": 64
|
||||||
|
},
|
||||||
|
"displayName": {
|
||||||
|
"description": "Display Name that identifies this Task. It could be title or label from the pipeline services.",
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
|
"fullyQualifiedName": {
|
||||||
|
"description": "A unique name that identifies a pipeline in the format 'ServiceName.PipelineName.TaskName'.",
|
||||||
|
"type": "string",
|
||||||
|
"minLength": 1,
|
||||||
|
"maxLength": 64
|
||||||
|
},
|
||||||
|
"description": {
|
||||||
|
"description": "Description of this Task.",
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
|
"taskUrl" : {
|
||||||
|
"description": "Task URL to visit/manage. This URL points to respective pipeline service UI",
|
||||||
|
"type": "string",
|
||||||
|
"format": "uri"
|
||||||
|
},
|
||||||
|
"upstreamTasks": {
|
||||||
|
"description": "All the tasks that are upstream of this task.",
|
||||||
|
"type": "array",
|
||||||
|
"items": {
|
||||||
|
"$ref": "../../type/entityReference.json"
|
||||||
|
},
|
||||||
|
"default": null
|
||||||
|
},
|
||||||
|
"downstreamTasks": {
|
||||||
|
"description": "All the tasks that are downstream of this task.",
|
||||||
|
"type": "array",
|
||||||
|
"items": {
|
||||||
|
"$ref": "../../type/entityReference.json"
|
||||||
|
},
|
||||||
|
"default": null
|
||||||
|
},
|
||||||
|
"taskConfig": {
|
||||||
|
"description": "Task Configuration.",
|
||||||
|
"$ref": "#/definitions/taskConfig"
|
||||||
|
},
|
||||||
|
"followers": {
|
||||||
|
"description": "Followers of this Pipeline.",
|
||||||
|
"$ref": "../../type/entityReference.json#/definitions/entityReferenceList"
|
||||||
|
},
|
||||||
|
"tags": {
|
||||||
|
"description": "Tags for this Pipeline.",
|
||||||
|
"type": "array",
|
||||||
|
"items": {
|
||||||
|
"$ref": "../../type/tagLabel.json"
|
||||||
|
},
|
||||||
|
"default": null
|
||||||
|
},
|
||||||
|
"href": {
|
||||||
|
"description": "Link to the resource corresponding to this entity.",
|
||||||
|
"$ref": "../../type/basic.json#/definitions/href"
|
||||||
|
},
|
||||||
|
"owner": {
|
||||||
|
"description": "Owner of this pipeline.",
|
||||||
|
"$ref": "../../type/entityReference.json"
|
||||||
|
},
|
||||||
|
"service" : {
|
||||||
|
"description": "Link to service where this pipeline is hosted in.",
|
||||||
|
"$ref" : "../../type/entityReference.json"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"required": ["id", "name", "service"]
|
||||||
|
}
|
@ -2,62 +2,45 @@
|
|||||||
"$id": "https://open-metadata.org/schema/entity/services/messagingService.json",
|
"$id": "https://open-metadata.org/schema/entity/services/messagingService.json",
|
||||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||||
"title": "Messaging Service",
|
"title": "Messaging Service",
|
||||||
"description": "This schema defines the Messaging Service entity, such as Kafka and Pulsar.",
|
"description": "This schema defines the Pipeline Service entity, such as Airflow and Prefect.",
|
||||||
"type": "object",
|
"type": "object",
|
||||||
"definitions": {
|
"definitions": {
|
||||||
"messagingServiceType": {
|
"pipelineServiceType": {
|
||||||
"description": "Type of messaging service - Kafka or Pulsar.",
|
"description": "Type of pipeline service - Airflow or Prefect.",
|
||||||
"type": "string",
|
"type": "string",
|
||||||
"enum": [
|
"enum": [
|
||||||
"Kafka",
|
"Airflow",
|
||||||
"Pulsar"
|
"Prefect"
|
||||||
],
|
],
|
||||||
"javaEnums": [
|
"javaEnums": [
|
||||||
{
|
{
|
||||||
"name": "Kafka"
|
"name": "Airflow"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"name": "Pulsar"
|
"name": "Prefect"
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
},
|
|
||||||
"brokers": {
|
|
||||||
"description": "Multiple bootstrap addresses for Kafka. Single proxy address for Pulsar.",
|
|
||||||
"type": "array",
|
|
||||||
"items": {
|
|
||||||
"type": "string"
|
|
||||||
},
|
|
||||||
"default": null
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"properties": {
|
"properties": {
|
||||||
"id": {
|
"id": {
|
||||||
"description": "Unique identifier of this messaging service instance.",
|
"description": "Unique identifier of this pipeline service instance.",
|
||||||
"$ref": "../../type/basic.json#/definitions/uuid"
|
"$ref": "../../type/basic.json#/definitions/uuid"
|
||||||
},
|
},
|
||||||
"name": {
|
"name": {
|
||||||
"description": "Name that identifies this messaging service.",
|
"description": "Name that identifies this pipeline service.",
|
||||||
"type": "string",
|
"type": "string",
|
||||||
"minLength": 1,
|
"minLength": 1,
|
||||||
"maxLength": 64
|
"maxLength": 64
|
||||||
},
|
},
|
||||||
"serviceType": {
|
"serviceType": {
|
||||||
"description": "Type of messaging service such as Kafka or Pulsar...",
|
"description": "Type of pipeline service such as Airflow or Prefect...",
|
||||||
"$ref": "#/definitions/messagingServiceType"
|
"$ref": "#/definitions/pipelineServiceType"
|
||||||
},
|
},
|
||||||
"description": {
|
"description": {
|
||||||
"description": "Description of a messaging service instance.",
|
"description": "Description of a messaging service instance.",
|
||||||
"type": "string"
|
"type": "string"
|
||||||
},
|
},
|
||||||
"brokers": {
|
|
||||||
"description": "Multiple bootstrap addresses for Kafka. Single proxy address for Pulsar.",
|
|
||||||
"$ref" : "#/definitions/brokers"
|
|
||||||
},
|
|
||||||
"schemaRegistry" : {
|
|
||||||
"description": "Schema registry URL.",
|
|
||||||
"type": "string",
|
|
||||||
"format": "uri"
|
|
||||||
},
|
|
||||||
"ingestionSchedule": {
|
"ingestionSchedule": {
|
||||||
"description": "Schedule for running metadata ingestion jobs.",
|
"description": "Schedule for running metadata ingestion jobs.",
|
||||||
"$ref": "../../type/schedule.json"
|
"$ref": "../../type/schedule.json"
|
||||||
@ -69,8 +52,6 @@
|
|||||||
},
|
},
|
||||||
"required": [
|
"required": [
|
||||||
"id",
|
"id",
|
||||||
"name",
|
"name"
|
||||||
"serviceType",
|
|
||||||
"brokers"
|
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user