mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-11-03 20:19:31 +00:00
[WIP] Airlfow integration
This commit is contained in:
parent
fb06f73736
commit
75029a6e87
@ -0,0 +1,358 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.openmetadata.catalog.jdbi3;
|
||||
|
||||
import org.openmetadata.catalog.Entity;
|
||||
import org.openmetadata.catalog.entity.data.Chart;
|
||||
import org.openmetadata.catalog.entity.services.DashboardService;
|
||||
import org.openmetadata.catalog.exception.CatalogExceptionMessage;
|
||||
import org.openmetadata.catalog.exception.EntityNotFoundException;
|
||||
import org.openmetadata.catalog.jdbi3.DashboardServiceRepository.DashboardServiceDAO;
|
||||
import org.openmetadata.catalog.jdbi3.TeamRepository.TeamDAO;
|
||||
import org.openmetadata.catalog.jdbi3.UserRepository.UserDAO;
|
||||
import org.openmetadata.catalog.resources.charts.ChartResource;
|
||||
import org.openmetadata.catalog.resources.charts.ChartResource.ChartList;
|
||||
import org.openmetadata.catalog.type.EntityReference;
|
||||
import org.openmetadata.catalog.type.TagLabel;
|
||||
import org.openmetadata.catalog.util.EntityUtil;
|
||||
import org.openmetadata.catalog.util.EntityUtil.Fields;
|
||||
import org.openmetadata.catalog.util.JsonUtils;
|
||||
import org.openmetadata.catalog.util.RestUtil.PutResponse;
|
||||
import org.openmetadata.common.utils.CipherText;
|
||||
import org.skife.jdbi.v2.sqlobject.Bind;
|
||||
import org.skife.jdbi.v2.sqlobject.CreateSqlObject;
|
||||
import org.skife.jdbi.v2.sqlobject.SqlQuery;
|
||||
import org.skife.jdbi.v2.sqlobject.SqlUpdate;
|
||||
import org.skife.jdbi.v2.sqlobject.Transaction;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.json.JsonPatch;
|
||||
import javax.ws.rs.core.Response.Status;
|
||||
import java.io.IOException;
|
||||
import java.security.GeneralSecurityException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound;
|
||||
|
||||
public abstract class ChartRepository {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ChartRepository.class);
|
||||
private static final Fields CHART_UPDATE_FIELDS = new Fields(ChartResource.FIELD_LIST, "owner");
|
||||
private static final Fields CHART_PATCH_FIELDS = new Fields(ChartResource.FIELD_LIST, "owner,service,tags");
|
||||
|
||||
public static String getFQN(EntityReference service, Chart chart) {
|
||||
return (service.getName() + "." + chart.getName());
|
||||
}
|
||||
|
||||
@CreateSqlObject
|
||||
abstract ChartDAO chartDAO();
|
||||
|
||||
@CreateSqlObject
|
||||
abstract EntityRelationshipDAO relationshipDAO();
|
||||
|
||||
@CreateSqlObject
|
||||
abstract UserDAO userDAO();
|
||||
|
||||
@CreateSqlObject
|
||||
abstract TeamDAO teamDAO();
|
||||
|
||||
@CreateSqlObject
|
||||
abstract DashboardServiceDAO dashboardServiceDAO();
|
||||
|
||||
@CreateSqlObject
|
||||
abstract TagRepository.TagDAO tagDAO();
|
||||
|
||||
|
||||
@Transaction
|
||||
public ChartList listAfter(Fields fields, String serviceName, int limitParam, String after) throws IOException,
|
||||
GeneralSecurityException {
|
||||
// forward scrolling, if after == null then first page is being asked being asked
|
||||
List<String> jsons = chartDAO().listAfter(serviceName, limitParam + 1, after == null ? "" :
|
||||
CipherText.instance().decrypt(after));
|
||||
|
||||
List<Chart> charts = new ArrayList<>();
|
||||
for (String json : jsons) {
|
||||
charts.add(setFields(JsonUtils.readValue(json, Chart.class), fields));
|
||||
}
|
||||
int total = chartDAO().listCount(serviceName);
|
||||
|
||||
String beforeCursor, afterCursor = null;
|
||||
beforeCursor = after == null ? null : charts.get(0).getFullyQualifiedName();
|
||||
if (charts.size() > limitParam) { // If extra result exists, then next page exists - return after cursor
|
||||
charts.remove(limitParam);
|
||||
afterCursor = charts.get(limitParam - 1).getFullyQualifiedName();
|
||||
}
|
||||
return new ChartList(charts, beforeCursor, afterCursor, total);
|
||||
}
|
||||
|
||||
@Transaction
|
||||
public ChartList listBefore(Fields fields, String serviceName, int limitParam, String before) throws IOException,
|
||||
GeneralSecurityException {
|
||||
// Reverse scrolling - Get one extra result used for computing before cursor
|
||||
List<String> jsons = chartDAO().listBefore(serviceName, limitParam + 1, CipherText.instance().decrypt(before));
|
||||
List<Chart> charts = new ArrayList<>();
|
||||
for (String json : jsons) {
|
||||
charts.add(setFields(JsonUtils.readValue(json, Chart.class), fields));
|
||||
}
|
||||
int total = chartDAO().listCount(serviceName);
|
||||
|
||||
String beforeCursor = null, afterCursor;
|
||||
if (charts.size() > limitParam) { // If extra result exists, then previous page exists - return before cursor
|
||||
charts.remove(0);
|
||||
beforeCursor = charts.get(0).getFullyQualifiedName();
|
||||
}
|
||||
afterCursor = charts.get(charts.size() - 1).getFullyQualifiedName();
|
||||
return new ChartList(charts, beforeCursor, afterCursor, total);
|
||||
}
|
||||
|
||||
@Transaction
|
||||
public Chart get(String id, Fields fields) throws IOException {
|
||||
return setFields(validateChart(id), fields);
|
||||
}
|
||||
|
||||
@Transaction
|
||||
public Chart getByName(String fqn, Fields fields) throws IOException {
|
||||
Chart chart = EntityUtil.validate(fqn, chartDAO().findByFQN(fqn), Chart.class);
|
||||
return setFields(chart, fields);
|
||||
}
|
||||
|
||||
@Transaction
|
||||
public Chart create(Chart chart, EntityReference service, EntityReference owner) throws IOException {
|
||||
getService(service); // Validate service
|
||||
return createInternal(chart, service, owner);
|
||||
}
|
||||
|
||||
@Transaction
|
||||
public void delete(String id) {
|
||||
if (relationshipDAO().findToCount(id, Relationship.CONTAINS.ordinal(), Entity.CHART) > 0) {
|
||||
throw new IllegalArgumentException("Chart is not empty");
|
||||
}
|
||||
if (chartDAO().delete(id) <= 0) {
|
||||
throw EntityNotFoundException.byMessage(entityNotFound(Entity.CHART, id));
|
||||
}
|
||||
relationshipDAO().deleteAll(id);
|
||||
}
|
||||
|
||||
@Transaction
|
||||
public PutResponse<Chart> createOrUpdate(Chart updatedChart, EntityReference service, EntityReference newOwner)
|
||||
throws IOException {
|
||||
getService(service); // Validate service
|
||||
|
||||
String fqn = getFQN(service, updatedChart);
|
||||
Chart storedDB = JsonUtils.readValue(chartDAO().findByFQN(fqn), Chart.class);
|
||||
if (storedDB == null) { // Chart does not exist. Create a new one
|
||||
return new PutResponse<>(Status.CREATED, createInternal(updatedChart, service, newOwner));
|
||||
}
|
||||
// Update the existing chart
|
||||
EntityUtil.populateOwner(userDAO(), teamDAO(), newOwner); // Validate new owner
|
||||
if (storedDB.getDescription() == null || storedDB.getDescription().isEmpty()) {
|
||||
storedDB.withDescription(updatedChart.getDescription());
|
||||
}
|
||||
|
||||
//update the display name from source
|
||||
if (updatedChart.getDisplayName() != null && !updatedChart.getDisplayName().isEmpty()) {
|
||||
storedDB.withDisplayName(updatedChart.getDisplayName());
|
||||
}
|
||||
chartDAO().update(storedDB.getId().toString(), JsonUtils.pojoToJson(storedDB));
|
||||
|
||||
// Update owner relationship
|
||||
setFields(storedDB, CHART_UPDATE_FIELDS); // First get the ownership information
|
||||
updateOwner(storedDB, storedDB.getOwner(), newOwner);
|
||||
|
||||
// Service can't be changed in update since service name is part of FQN and
|
||||
// change to a different service will result in a different FQN and creation of a new chart under the new service
|
||||
storedDB.setService(service);
|
||||
applyTags(updatedChart);
|
||||
|
||||
return new PutResponse<>(Status.OK, storedDB);
|
||||
}
|
||||
|
||||
@Transaction
|
||||
public Chart patch(String id, JsonPatch patch) throws IOException {
|
||||
Chart original = setFields(validateChart(id), CHART_PATCH_FIELDS);
|
||||
Chart updated = JsonUtils.applyPatch(original, patch, Chart.class);
|
||||
patch(original, updated);
|
||||
return updated;
|
||||
}
|
||||
|
||||
public Chart createInternal(Chart chart, EntityReference service, EntityReference owner) throws IOException {
|
||||
chart.setFullyQualifiedName(getFQN(service, chart));
|
||||
EntityUtil.populateOwner(userDAO(), teamDAO(), owner); // Validate owner
|
||||
|
||||
// Query 1 - insert chart into chart_entity table
|
||||
chartDAO().insert(JsonUtils.pojoToJson(chart));
|
||||
setService(chart, service);
|
||||
setOwner(chart, owner);
|
||||
applyTags(chart);
|
||||
return chart;
|
||||
}
|
||||
|
||||
private void applyTags(Chart chart) throws IOException {
|
||||
// Add chart level tags by adding tag to chart relationship
|
||||
EntityUtil.applyTags(tagDAO(), chart.getTags(), chart.getFullyQualifiedName());
|
||||
chart.setTags(getTags(chart.getFullyQualifiedName())); // Update tag to handle additional derived tags
|
||||
}
|
||||
|
||||
private void patch(Chart original, Chart updated) throws IOException {
|
||||
String chartId = original.getId().toString();
|
||||
if (!original.getId().equals(updated.getId())) {
|
||||
throw new IllegalArgumentException(CatalogExceptionMessage.readOnlyAttribute(Entity.CHART, "id"));
|
||||
}
|
||||
if (!original.getName().equals(updated.getName())) {
|
||||
throw new IllegalArgumentException(CatalogExceptionMessage.readOnlyAttribute(Entity.CHART, "name"));
|
||||
}
|
||||
if (updated.getService() == null || !original.getService().getId().equals(updated.getService().getId())) {
|
||||
throw new IllegalArgumentException(CatalogExceptionMessage.readOnlyAttribute(Entity.CHART, "service"));
|
||||
}
|
||||
// Validate new owner
|
||||
EntityReference newOwner = EntityUtil.populateOwner(userDAO(), teamDAO(), updated.getOwner());
|
||||
|
||||
EntityReference newService = updated.getService();
|
||||
// Remove previous tags. Merge tags from the update and the existing tags
|
||||
EntityUtil.removeTags(tagDAO(), original.getFullyQualifiedName());
|
||||
updated.setHref(null);
|
||||
updated.setOwner(null);
|
||||
updated.setService(null);
|
||||
chartDAO().update(chartId, JsonUtils.pojoToJson(updated));
|
||||
updateOwner(updated, original.getOwner(), newOwner);
|
||||
updated.setService(newService);
|
||||
applyTags(updated);
|
||||
}
|
||||
|
||||
public EntityReference getOwner(Chart chart) throws IOException {
|
||||
if (chart == null) {
|
||||
return null;
|
||||
}
|
||||
return EntityUtil.populateOwner(chart.getId(), relationshipDAO(), userDAO(), teamDAO());
|
||||
}
|
||||
|
||||
private void setOwner(Chart chart, EntityReference owner) {
|
||||
EntityUtil.setOwner(relationshipDAO(), chart.getId(), Entity.CHART, owner);
|
||||
chart.setOwner(owner);
|
||||
}
|
||||
|
||||
private void updateOwner(Chart chart, EntityReference origOwner, EntityReference newOwner) {
|
||||
EntityUtil.updateOwner(relationshipDAO(), origOwner, newOwner, chart.getId(), Entity.CHART);
|
||||
chart.setOwner(newOwner);
|
||||
}
|
||||
|
||||
private Chart validateChart(String id) throws IOException {
|
||||
return EntityUtil.validate(id, chartDAO().findById(id), Chart.class);
|
||||
}
|
||||
|
||||
private Chart setFields(Chart chart, Fields fields) throws IOException {
|
||||
chart.setOwner(fields.contains("owner") ? getOwner(chart) : null);
|
||||
chart.setService(fields.contains("service") ? getService(chart) : null);
|
||||
chart.setFollowers(fields.contains("followers") ? getFollowers(chart) : null);
|
||||
chart.setTags(fields.contains("tags") ? getTags(chart.getFullyQualifiedName()) : null);
|
||||
return chart;
|
||||
}
|
||||
|
||||
private List<EntityReference> getFollowers(Chart chart) throws IOException {
|
||||
return chart == null ? null : EntityUtil.getFollowers(chart.getId(), relationshipDAO(), userDAO());
|
||||
}
|
||||
|
||||
private List<TagLabel> getTags(String fqn) {
|
||||
return tagDAO().getTags(fqn);
|
||||
}
|
||||
|
||||
private EntityReference getService(Chart chart) throws IOException {
|
||||
return chart == null ? null : getService(Objects.requireNonNull(EntityUtil.getService(relationshipDAO(),
|
||||
chart.getId(), Entity.DASHBOARD_SERVICE)));
|
||||
}
|
||||
|
||||
private EntityReference getService(EntityReference service) throws IOException {
|
||||
String id = service.getId().toString();
|
||||
if (service.getType().equalsIgnoreCase(Entity.DASHBOARD_SERVICE)) {
|
||||
DashboardService serviceInstance = EntityUtil.validate(id, dashboardServiceDAO().findById(id),
|
||||
DashboardService.class);
|
||||
service.setDescription(serviceInstance.getDescription());
|
||||
service.setName(serviceInstance.getName());
|
||||
} else {
|
||||
throw new IllegalArgumentException(String.format("Invalid service type %s for the chart", service.getType()));
|
||||
}
|
||||
return service;
|
||||
}
|
||||
|
||||
public void setService(Chart chart, EntityReference service) throws IOException {
|
||||
if (service != null && chart != null) {
|
||||
getService(service); // Populate service details
|
||||
relationshipDAO().insert(service.getId().toString(), chart.getId().toString(), service.getType(),
|
||||
Entity.CHART, Relationship.CONTAINS.ordinal());
|
||||
chart.setService(service);
|
||||
}
|
||||
}
|
||||
|
||||
@Transaction
|
||||
public Status addFollower(String chartId, String userId) throws IOException {
|
||||
EntityUtil.validate(chartId, chartDAO().findById(chartId), Chart.class);
|
||||
return EntityUtil.addFollower(relationshipDAO(), userDAO(), chartId, Entity.CHART, userId, Entity.USER) ?
|
||||
Status.CREATED : Status.OK;
|
||||
}
|
||||
|
||||
@Transaction
|
||||
public void deleteFollower(String chartId, String userId) {
|
||||
EntityUtil.validateUser(userDAO(), userId);
|
||||
EntityUtil.removeFollower(relationshipDAO(), chartId, userId);
|
||||
}
|
||||
|
||||
public interface ChartDAO {
|
||||
@SqlUpdate("INSERT INTO chart_entity (json) VALUES (:json)")
|
||||
void insert(@Bind("json") String json);
|
||||
|
||||
@SqlUpdate("UPDATE chart_entity SET json = :json where id = :id")
|
||||
void update(@Bind("id") String id, @Bind("json") String json);
|
||||
|
||||
@SqlQuery("SELECT json FROM chart_entity WHERE fullyQualifiedName = :name")
|
||||
String findByFQN(@Bind("name") String name);
|
||||
|
||||
@SqlQuery("SELECT json FROM chart_entity WHERE id = :id")
|
||||
String findById(@Bind("id") String id);
|
||||
|
||||
@SqlQuery("SELECT count(*) FROM chart_entity WHERE " +
|
||||
"(fullyQualifiedName LIKE CONCAT(:fqnPrefix, '.%') OR :fqnPrefix IS NULL)")
|
||||
int listCount(@Bind("fqnPrefix") String fqnPrefix);
|
||||
|
||||
@SqlQuery(
|
||||
"SELECT json FROM (" +
|
||||
"SELECT fullyQualifiedName, json FROM chart_entity WHERE " +
|
||||
"(fullyQualifiedName LIKE CONCAT(:fqnPrefix, '.%') OR :fqnPrefix IS NULL) AND " +// Filter by
|
||||
// service name
|
||||
"fullyQualifiedName < :before " + // Pagination by chart fullyQualifiedName
|
||||
"ORDER BY fullyQualifiedName DESC " + // Pagination ordering by chart fullyQualifiedName
|
||||
"LIMIT :limit" +
|
||||
") last_rows_subquery ORDER BY fullyQualifiedName")
|
||||
List<String> listBefore(@Bind("fqnPrefix") String fqnPrefix, @Bind("limit") int limit,
|
||||
@Bind("before") String before);
|
||||
|
||||
@SqlQuery("SELECT json FROM chart_entity WHERE " +
|
||||
"(fullyQualifiedName LIKE CONCAT(:fqnPrefix, '.%') OR :fqnPrefix IS NULL) AND " +
|
||||
"fullyQualifiedName > :after " +
|
||||
"ORDER BY fullyQualifiedName " +
|
||||
"LIMIT :limit")
|
||||
List<String> listAfter(@Bind("fqnPrefix") String fqnPrefix, @Bind("limit") int limit,
|
||||
@Bind("after") String after);
|
||||
|
||||
@SqlQuery("SELECT EXISTS (SELECT * FROM chart_entity WHERE id = :id)")
|
||||
boolean exists(@Bind("id") String id);
|
||||
|
||||
@SqlUpdate("DELETE FROM chart_entity WHERE id = :id")
|
||||
int delete(@Bind("id") String id);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,203 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.openmetadata.catalog.resources.services.messaging;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.google.inject.Inject;
|
||||
import io.swagger.annotations.Api;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.Parameter;
|
||||
import io.swagger.v3.oas.annotations.media.Content;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import io.swagger.v3.oas.annotations.responses.ApiResponse;
|
||||
import org.openmetadata.catalog.api.services.CreateMessagingService;
|
||||
import org.openmetadata.catalog.api.services.UpdateMessagingService;
|
||||
import org.openmetadata.catalog.entity.data.Dashboard;
|
||||
import org.openmetadata.catalog.entity.services.MessagingService;
|
||||
import org.openmetadata.catalog.jdbi3.MessagingServiceRepository;
|
||||
import org.openmetadata.catalog.resources.Collection;
|
||||
import org.openmetadata.catalog.security.CatalogAuthorizer;
|
||||
import org.openmetadata.catalog.security.SecurityUtil;
|
||||
import org.openmetadata.catalog.type.EntityReference;
|
||||
import org.openmetadata.catalog.util.RestUtil;
|
||||
import org.openmetadata.catalog.util.ResultList;
|
||||
|
||||
import javax.validation.Valid;
|
||||
import javax.ws.rs.Consumes;
|
||||
import javax.ws.rs.DELETE;
|
||||
import javax.ws.rs.GET;
|
||||
import javax.ws.rs.POST;
|
||||
import javax.ws.rs.PUT;
|
||||
import javax.ws.rs.Path;
|
||||
import javax.ws.rs.PathParam;
|
||||
import javax.ws.rs.Produces;
|
||||
import javax.ws.rs.QueryParam;
|
||||
import javax.ws.rs.core.Context;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.SecurityContext;
|
||||
import javax.ws.rs.core.UriInfo;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.UUID;
|
||||
|
||||
@Path("/v1/services/messagingServices")
|
||||
@Api(value = "Messaging service collection", tags = "Services -> Messaging service collection")
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
@Consumes(MediaType.APPLICATION_JSON)
|
||||
@Collection(name = "messagingServices", repositoryClass = "org.openmetadata.catalog.jdbi3.MessagingServiceRepository")
|
||||
public class MessagingServiceResource {
|
||||
private final MessagingServiceRepository dao;
|
||||
private final CatalogAuthorizer authorizer;
|
||||
|
||||
public static EntityReference addHref(UriInfo uriInfo, EntityReference service) {
|
||||
return service.withHref(RestUtil.getHref(uriInfo, "v1/services/messagingServices/", service.getId()));
|
||||
}
|
||||
|
||||
private static List<MessagingService> addHref(UriInfo uriInfo, List<MessagingService> instances) {
|
||||
instances.forEach(i -> addHref(uriInfo, i));
|
||||
return instances;
|
||||
}
|
||||
|
||||
private static MessagingService addHref(UriInfo uriInfo, MessagingService dbService) {
|
||||
dbService.setHref(RestUtil.getHref(uriInfo, "v1/services/messagingServices/", dbService.getId()));
|
||||
return dbService;
|
||||
}
|
||||
|
||||
@Inject
|
||||
public MessagingServiceResource(MessagingServiceRepository dao, CatalogAuthorizer authorizer) {
|
||||
Objects.requireNonNull(dao, "MessagingServiceRepository must not be null");
|
||||
this.dao = dao;
|
||||
this.authorizer = authorizer;
|
||||
}
|
||||
|
||||
static class MessagingServiceList extends ResultList<MessagingService> {
|
||||
MessagingServiceList(List<MessagingService> data) {
|
||||
super(data);
|
||||
}
|
||||
}
|
||||
|
||||
@GET
|
||||
@Operation(summary = "List messaging services", tags = "services",
|
||||
description = "Get a list of messaging services.",
|
||||
responses = {
|
||||
@ApiResponse(responseCode = "200", description = "List of messaging service instances",
|
||||
content = @Content(mediaType = "application/json",
|
||||
schema = @Schema(implementation = MessagingServiceList.class)))
|
||||
})
|
||||
public MessagingServiceList list(@Context UriInfo uriInfo, @QueryParam("name") String name) throws IOException {
|
||||
return new MessagingServiceList(addHref(uriInfo, dao.list(name)));
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("/{id}")
|
||||
@Operation(summary = "Get a messaging service", tags = "services",
|
||||
description = "Get a messaging service by `id`.",
|
||||
responses = {
|
||||
@ApiResponse(responseCode = "200", description = "Messaging service instance",
|
||||
content = @Content(mediaType = "application/json",
|
||||
schema = @Schema(implementation = Dashboard.class))),
|
||||
@ApiResponse(responseCode = "404", description = "Messaging service for instance {id} is not found")
|
||||
})
|
||||
public MessagingService get(@Context UriInfo uriInfo,
|
||||
@Context SecurityContext securityContext,
|
||||
@PathParam("id") String id) throws IOException {
|
||||
return addHref(uriInfo, dao.get(id));
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("/name/{name}")
|
||||
@Operation(summary = "Get messaging service by name", tags = "services",
|
||||
description = "Get a messaging service by the service `name`.",
|
||||
responses = {
|
||||
@ApiResponse(responseCode = "200", description = "Messaging service instance",
|
||||
content = @Content(mediaType = "application/json",
|
||||
schema = @Schema(implementation = Dashboard.class))),
|
||||
@ApiResponse(responseCode = "404", description = "Messaging service for instance {id} is not found")
|
||||
})
|
||||
public MessagingService getByName(@Context UriInfo uriInfo,
|
||||
@Context SecurityContext securityContext,
|
||||
@PathParam("name") String name) throws IOException {
|
||||
return addHref(uriInfo, dao.getByName(name));
|
||||
}
|
||||
|
||||
@POST
|
||||
@Operation(summary = "Create a messaging service", tags = "services",
|
||||
description = "Create a new messaging service.",
|
||||
responses = {
|
||||
@ApiResponse(responseCode = "200", description = "Messaging service instance",
|
||||
content = @Content(mediaType = "application/json",
|
||||
schema = @Schema(implementation = MessagingService.class))),
|
||||
@ApiResponse(responseCode = "400", description = "Bad request")
|
||||
})
|
||||
public Response create(@Context UriInfo uriInfo,
|
||||
@Context SecurityContext securityContext,
|
||||
@Valid CreateMessagingService create) throws JsonProcessingException {
|
||||
SecurityUtil.checkAdminOrBotRole(authorizer, securityContext);
|
||||
MessagingService service = new MessagingService().withId(UUID.randomUUID())
|
||||
.withName(create.getName()).withDescription(create.getDescription())
|
||||
.withServiceType(create.getServiceType())
|
||||
.withBrokers(create.getBrokers())
|
||||
.withSchemaRegistry(create.getSchemaRegistry())
|
||||
.withIngestionSchedule(create.getIngestionSchedule());
|
||||
|
||||
addHref(uriInfo, dao.create(service));
|
||||
return Response.created(service.getHref()).entity(service).build();
|
||||
}
|
||||
|
||||
@PUT
|
||||
@Path("/{id}")
|
||||
@Operation(summary = "Update a messaging service", tags = "services",
|
||||
description = "Update an existing messaging service identified by `id`.",
|
||||
responses = {
|
||||
@ApiResponse(responseCode = "200", description = "Messaging service instance",
|
||||
content = @Content(mediaType = "application/json",
|
||||
schema = @Schema(implementation = MessagingService.class))),
|
||||
@ApiResponse(responseCode = "400", description = "Bad request")
|
||||
})
|
||||
public Response update(@Context UriInfo uriInfo,
|
||||
@Context SecurityContext securityContext,
|
||||
@Parameter(description = "Id of the messaging service", schema = @Schema(type = "string"))
|
||||
@PathParam("id") String id,
|
||||
@Valid UpdateMessagingService update) throws IOException {
|
||||
SecurityUtil.checkAdminOrBotRole(authorizer, securityContext);
|
||||
MessagingService service = addHref(uriInfo,
|
||||
dao.update(id, update.getDescription(), update.getBrokers(), update.getSchemaRegistry(),
|
||||
update.getIngestionSchedule()));
|
||||
return Response.ok(service).build();
|
||||
}
|
||||
|
||||
@DELETE
|
||||
@Path("/{id}")
|
||||
@Operation(summary = "Delete a database service", tags = "services",
|
||||
description = "Delete a database services. If databases (and tables) belong the service, it can't be " +
|
||||
"deleted.",
|
||||
responses = {
|
||||
@ApiResponse(responseCode = "200", description = "OK"),
|
||||
@ApiResponse(responseCode = "404", description = "DatabaseService service for instance {id} " +
|
||||
"is not found")
|
||||
})
|
||||
public Response delete(@Context UriInfo uriInfo,
|
||||
@Context SecurityContext securityContext,
|
||||
@Parameter(description = "Id of the database service", schema = @Schema(type = "string"))
|
||||
@PathParam("id") String id) {
|
||||
SecurityUtil.checkAdminOrBotRole(authorizer, securityContext);
|
||||
dao.delete(id);
|
||||
return Response.ok().build();
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,336 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.openmetadata.catalog.resources.charts;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import io.swagger.annotations.Api;
|
||||
import io.swagger.v3.oas.annotations.ExternalDocumentation;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.Parameter;
|
||||
import io.swagger.v3.oas.annotations.media.Content;
|
||||
import io.swagger.v3.oas.annotations.media.ExampleObject;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import io.swagger.v3.oas.annotations.parameters.RequestBody;
|
||||
import io.swagger.v3.oas.annotations.responses.ApiResponse;
|
||||
import org.openmetadata.catalog.api.data.CreateChart;
|
||||
import org.openmetadata.catalog.entity.data.Chart;
|
||||
import org.openmetadata.catalog.entity.data.Dashboard;
|
||||
import org.openmetadata.catalog.jdbi3.ChartRepository;
|
||||
import org.openmetadata.catalog.resources.Collection;
|
||||
import org.openmetadata.catalog.security.CatalogAuthorizer;
|
||||
import org.openmetadata.catalog.security.SecurityUtil;
|
||||
import org.openmetadata.catalog.type.EntityReference;
|
||||
import org.openmetadata.catalog.util.EntityUtil;
|
||||
import org.openmetadata.catalog.util.EntityUtil.Fields;
|
||||
import org.openmetadata.catalog.util.RestUtil;
|
||||
import org.openmetadata.catalog.util.RestUtil.PutResponse;
|
||||
import org.openmetadata.catalog.util.ResultList;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.json.JsonPatch;
|
||||
import javax.validation.Valid;
|
||||
import javax.validation.constraints.Max;
|
||||
import javax.validation.constraints.Min;
|
||||
import javax.ws.rs.Consumes;
|
||||
import javax.ws.rs.DELETE;
|
||||
import javax.ws.rs.DefaultValue;
|
||||
import javax.ws.rs.GET;
|
||||
import javax.ws.rs.PATCH;
|
||||
import javax.ws.rs.POST;
|
||||
import javax.ws.rs.PUT;
|
||||
import javax.ws.rs.Path;
|
||||
import javax.ws.rs.PathParam;
|
||||
import javax.ws.rs.Produces;
|
||||
import javax.ws.rs.QueryParam;
|
||||
import javax.ws.rs.core.Context;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.SecurityContext;
|
||||
import javax.ws.rs.core.UriInfo;
|
||||
import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.security.GeneralSecurityException;
|
||||
import java.text.ParseException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
|
||||
@Path("/v1/charts")
|
||||
@Api(value = "Chart data asset collection", tags = "Chart data asset collection")
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
@Consumes(MediaType.APPLICATION_JSON)
|
||||
@Collection(name = "charts", repositoryClass = "org.openmetadata.catalog.jdbi3.ChartRepository")
|
||||
public class ChartResource {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ChartResource.class);
|
||||
private static final String CHART_COLLECTION_PATH = "v1/charts/";
|
||||
private final ChartRepository dao;
|
||||
private final CatalogAuthorizer authorizer;
|
||||
|
||||
public static void addHref(UriInfo uriInfo, EntityReference ref) {
|
||||
ref.withHref(RestUtil.getHref(uriInfo, CHART_COLLECTION_PATH, ref.getId()));
|
||||
}
|
||||
|
||||
public static List<Chart> addHref(UriInfo uriInfo, List<Chart> charts) {
|
||||
Optional.ofNullable(charts).orElse(Collections.emptyList()).forEach(i -> addHref(uriInfo, i));
|
||||
return charts;
|
||||
}
|
||||
|
||||
public static Chart addHref(UriInfo uriInfo, Chart chart) {
|
||||
chart.setHref(RestUtil.getHref(uriInfo, CHART_COLLECTION_PATH, chart.getId()));
|
||||
EntityUtil.addHref(uriInfo, chart.getOwner());
|
||||
EntityUtil.addHref(uriInfo, chart.getService());
|
||||
EntityUtil.addHref(uriInfo, chart.getFollowers());
|
||||
|
||||
return chart;
|
||||
}
|
||||
|
||||
@Inject
|
||||
public ChartResource(ChartRepository dao, CatalogAuthorizer authorizer) {
|
||||
Objects.requireNonNull(dao, "ChartRepository must not be null");
|
||||
this.dao = dao;
|
||||
this.authorizer = authorizer;
|
||||
}
|
||||
|
||||
public static class ChartList extends ResultList<Chart> {
|
||||
@SuppressWarnings("unused")
|
||||
ChartList() {
|
||||
// Empty constructor needed for deserialization
|
||||
}
|
||||
|
||||
public ChartList(List<Chart> data, String beforeCursor, String afterCursor, int total)
|
||||
throws GeneralSecurityException, UnsupportedEncodingException {
|
||||
super(data, beforeCursor, afterCursor, total);
|
||||
}
|
||||
}
|
||||
|
||||
static final String FIELDS = "owner,service,followers,tags";
|
||||
public static final List<String> FIELD_LIST = Arrays.asList(FIELDS.replaceAll(" ", "")
|
||||
.split(","));
|
||||
|
||||
@GET
|
||||
@Valid
|
||||
@Operation(summary = "List charts", tags = "charts",
|
||||
description = "Get a list of charts, optionally filtered by `service` it belongs to. Use `fields` " +
|
||||
"parameter to get only necessary fields. Use cursor-based pagination to limit the number " +
|
||||
"entries in the list using `limit` and `before` or `after` query params.",
|
||||
responses = {
|
||||
@ApiResponse(responseCode = "200", description = "List of charts",
|
||||
content = @Content(mediaType = "application/json",
|
||||
schema = @Schema(implementation = ChartList.class)))
|
||||
})
|
||||
public ChartList list(@Context UriInfo uriInfo,
|
||||
@Context SecurityContext securityContext,
|
||||
@Parameter(description = "Fields requested in the returned resource",
|
||||
schema = @Schema(type = "string", example = FIELDS))
|
||||
@QueryParam("fields") String fieldsParam,
|
||||
@Parameter(description = "Filter charts by service name",
|
||||
schema = @Schema(type = "string", example = "superset"))
|
||||
@QueryParam("service") String serviceParam,
|
||||
@Parameter(description = "Limit the number charts returned. (1 to 1000000, default = 10)")
|
||||
@DefaultValue("10")
|
||||
@Min(1)
|
||||
@Max(1000000)
|
||||
@QueryParam("limit") int limitParam,
|
||||
@Parameter(description = "Returns list of charts before this cursor",
|
||||
schema = @Schema(type = "string"))
|
||||
@QueryParam("before") String before,
|
||||
@Parameter(description = "Returns list of charts after this cursor",
|
||||
schema = @Schema(type = "string"))
|
||||
@QueryParam("after") String after
|
||||
) throws IOException, GeneralSecurityException {
|
||||
RestUtil.validateCursors(before, after);
|
||||
Fields fields = new Fields(FIELD_LIST, fieldsParam);
|
||||
|
||||
ChartList charts;
|
||||
if (before != null) { // Reverse paging
|
||||
charts = dao.listBefore(fields, serviceParam, limitParam, before); // Ask for one extra entry
|
||||
} else { // Forward paging or first page
|
||||
charts = dao.listAfter(fields, serviceParam, limitParam, after);
|
||||
}
|
||||
addHref(uriInfo, charts.getData());
|
||||
return charts;
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("/{id}")
|
||||
@Operation(summary = "Get a Chart", tags = "charts",
|
||||
description = "Get a chart by `id`.",
|
||||
responses = {
|
||||
@ApiResponse(responseCode = "200", description = "The chart",
|
||||
content = @Content(mediaType = "application/json",
|
||||
schema = @Schema(implementation = Dashboard.class))),
|
||||
@ApiResponse(responseCode = "404", description = "Chart for instance {id} is not found")
|
||||
})
|
||||
public Chart get(@Context UriInfo uriInfo, @PathParam("id") String id,
|
||||
@Context SecurityContext securityContext,
|
||||
@Parameter(description = "Fields requested in the returned resource",
|
||||
schema = @Schema(type = "string", example = FIELDS))
|
||||
@QueryParam("fields") String fieldsParam) throws IOException {
|
||||
Fields fields = new Fields(FIELD_LIST, fieldsParam);
|
||||
return addHref(uriInfo, dao.get(id, fields));
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("/name/{fqn}")
|
||||
@Operation(summary = "Get a chart by name", tags = "charts",
|
||||
description = "Get a chart by fully qualified name.",
|
||||
responses = {
|
||||
@ApiResponse(responseCode = "200", description = "The chart",
|
||||
content = @Content(mediaType = "application/json",
|
||||
schema = @Schema(implementation = Chart.class))),
|
||||
@ApiResponse(responseCode = "404", description = "Chart for instance {id} is not found")
|
||||
})
|
||||
public Response getByName(@Context UriInfo uriInfo, @PathParam("fqn") String fqn,
|
||||
@Context SecurityContext securityContext,
|
||||
@Parameter(description = "Fields requested in the returned resource",
|
||||
schema = @Schema(type = "string", example = FIELDS))
|
||||
@QueryParam("fields") String fieldsParam) throws IOException {
|
||||
Fields fields = new Fields(FIELD_LIST, fieldsParam);
|
||||
Chart chart = dao.getByName(fqn, fields);
|
||||
addHref(uriInfo, chart);
|
||||
return Response.ok(chart).build();
|
||||
}
|
||||
|
||||
@POST
|
||||
@Operation(summary = "Create a chart", tags = "charts",
|
||||
description = "Create a chart under an existing `service`.",
|
||||
responses = {
|
||||
@ApiResponse(responseCode = "200", description = "The chart",
|
||||
content = @Content(mediaType = "application/json",
|
||||
schema = @Schema(implementation = Chart.class))),
|
||||
@ApiResponse(responseCode = "400", description = "Bad request")
|
||||
})
|
||||
public Response create(@Context UriInfo uriInfo, @Context SecurityContext securityContext,
|
||||
@Valid CreateChart create) throws IOException {
|
||||
SecurityUtil.checkAdminOrBotRole(authorizer, securityContext);
|
||||
Chart chart =
|
||||
new Chart().withId(UUID.randomUUID()).withName(create.getName()).withDisplayName(create.getDisplayName())
|
||||
.withDescription(create.getDescription())
|
||||
.withService(create.getService())
|
||||
.withChartType(create.getChartType()).withChartUrl(create.getChartUrl())
|
||||
.withTables(create.getTables()).withTags(create.getTags())
|
||||
.withOwner(create.getOwner());
|
||||
chart = addHref(uriInfo, dao.create(chart, create.getService(), create.getOwner()));
|
||||
return Response.created(chart.getHref()).entity(chart).build();
|
||||
}
|
||||
|
||||
@PATCH
|
||||
@Path("/{id}")
|
||||
@Operation(summary = "Update a chart", tags = "charts",
|
||||
description = "Update an existing chart using JsonPatch.",
|
||||
externalDocs = @ExternalDocumentation(description = "JsonPatch RFC",
|
||||
url = "https://tools.ietf.org/html/rfc6902"))
|
||||
@Consumes(MediaType.APPLICATION_JSON_PATCH_JSON)
|
||||
public Chart updateDescription(@Context UriInfo uriInfo,
|
||||
@Context SecurityContext securityContext,
|
||||
@PathParam("id") String id,
|
||||
@RequestBody(description = "JsonPatch with array of operations",
|
||||
content = @Content(mediaType = MediaType.APPLICATION_JSON_PATCH_JSON,
|
||||
examples = {@ExampleObject("[" +
|
||||
"{op:remove, path:/a}," +
|
||||
"{op:add, path: /b, value: val}" +
|
||||
"]")}))
|
||||
JsonPatch patch) throws IOException {
|
||||
Fields fields = new Fields(FIELD_LIST, FIELDS);
|
||||
Chart chart = dao.get(id, fields);
|
||||
SecurityUtil.checkAdminRoleOrPermissions(authorizer, securityContext,
|
||||
EntityUtil.getEntityReference(chart));
|
||||
chart = dao.patch(id, patch);
|
||||
return addHref(uriInfo, chart);
|
||||
}
|
||||
|
||||
@PUT
|
||||
@Operation(summary = "Create or update chart", tags = "charts",
|
||||
description = "Create a chart, it it does not exist or update an existing chart.",
|
||||
responses = {
|
||||
@ApiResponse(responseCode = "200", description = "The updated chart ",
|
||||
content = @Content(mediaType = "application/json",
|
||||
schema = @Schema(implementation = Chart.class)))
|
||||
})
|
||||
public Response createOrUpdate(@Context UriInfo uriInfo,
|
||||
@Context SecurityContext securityContext,
|
||||
@Valid CreateChart create) throws IOException {
|
||||
|
||||
Chart chart =
|
||||
new Chart().withId(UUID.randomUUID()).withName(create.getName()).withDisplayName(create.getDisplayName())
|
||||
.withDescription(create.getDescription())
|
||||
.withService(create.getService())
|
||||
.withChartType(create.getChartType()).withChartUrl(create.getChartUrl())
|
||||
.withTables(create.getTables()).withTags(create.getTags())
|
||||
.withOwner(create.getOwner());
|
||||
PutResponse<Chart> response = dao.createOrUpdate(chart, create.getService(), create.getOwner());
|
||||
chart = addHref(uriInfo, response.getEntity());
|
||||
return Response.status(response.getStatus()).entity(chart).build();
|
||||
}
|
||||
|
||||
@PUT
|
||||
@Path("/{id}/followers")
|
||||
@Operation(summary = "Add a follower", tags = "charts",
|
||||
description = "Add a user identified by `userId` as followed of this chart",
|
||||
responses = {
|
||||
@ApiResponse(responseCode = "200", description = "OK"),
|
||||
@ApiResponse(responseCode = "404", description = "Chart for instance {id} is not found")
|
||||
})
|
||||
public Response addFollower(@Context UriInfo uriInfo,
|
||||
@Context SecurityContext securityContext,
|
||||
@Parameter(description = "Id of the chart", schema = @Schema(type = "string"))
|
||||
@PathParam("id") String id,
|
||||
@Parameter(description = "Id of the user to be added as follower",
|
||||
schema = @Schema(type = "string"))
|
||||
String userId) throws IOException, ParseException {
|
||||
Fields fields = new Fields(FIELD_LIST, "followers");
|
||||
Response.Status status = dao.addFollower(id, userId);
|
||||
Chart chart = dao.get(id, fields);
|
||||
return Response.status(status).entity(chart).build();
|
||||
}
|
||||
|
||||
@DELETE
|
||||
@Path("/{id}/followers/{userId}")
|
||||
@Operation(summary = "Remove a follower", tags = "charts",
|
||||
description = "Remove the user identified `userId` as a follower of the chart.")
|
||||
public Chart deleteFollower(@Context UriInfo uriInfo,
|
||||
@Context SecurityContext securityContext,
|
||||
@Parameter(description = "Id of the chart",
|
||||
schema = @Schema(type = "string"))
|
||||
@PathParam("id") String id,
|
||||
@Parameter(description = "Id of the user being removed as follower",
|
||||
schema = @Schema(type = "string"))
|
||||
@PathParam("userId") String userId) throws IOException, ParseException {
|
||||
Fields fields = new Fields(FIELD_LIST, "followers");
|
||||
dao.deleteFollower(id, userId);
|
||||
Chart chart = dao.get(id, fields);
|
||||
return addHref(uriInfo, chart);
|
||||
}
|
||||
|
||||
|
||||
@DELETE
|
||||
@Path("/{id}")
|
||||
@Operation(summary = "Delete a Chart", tags = "charts",
|
||||
description = "Delete a chart by `id`.",
|
||||
responses = {
|
||||
@ApiResponse(responseCode = "200", description = "OK"),
|
||||
@ApiResponse(responseCode = "404", description = "Chart for instance {id} is not found")
|
||||
})
|
||||
public Response delete(@Context UriInfo uriInfo, @PathParam("id") String id) {
|
||||
dao.delete(id);
|
||||
return Response.ok().build();
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,52 @@
|
||||
{
|
||||
"$id": "https://open-metadata.org/schema/api/data/createChart.json",
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"title": "Create Chart entity request",
|
||||
"description": "Create Chart entity request",
|
||||
"type": "object",
|
||||
"properties" : {
|
||||
"name": {
|
||||
"description": "Name that identifies this dashboard.",
|
||||
"type": "string",
|
||||
"minLength": 1,
|
||||
"maxLength": 64
|
||||
},
|
||||
"displayName": {
|
||||
"description": "Display Name that identifies this Chart. It could be title or label from the source services",
|
||||
"type": "string"
|
||||
},
|
||||
"description": {
|
||||
"description": "Description of the database instance. What it has and how to use it.",
|
||||
"type": "string"
|
||||
},
|
||||
"chartType": {
|
||||
"$ref": "../../entity/data/chart.json#/definitions/chartType"
|
||||
},
|
||||
"chartUrl" : {
|
||||
"description": "Chart URL, pointing to its own Service URL",
|
||||
"type": "string",
|
||||
"format": "uri"
|
||||
},
|
||||
"tables": {
|
||||
"description": "Link to tables used in this chart.",
|
||||
"$ref": "../../type/entityReference.json#/definitions/entityReferenceList"
|
||||
},
|
||||
"tags": {
|
||||
"description": "Tags for this chart",
|
||||
"type": "array",
|
||||
"items": {
|
||||
"$ref": "../../type/tagLabel.json"
|
||||
},
|
||||
"default": null
|
||||
},
|
||||
"owner": {
|
||||
"description": "Owner of this database",
|
||||
"$ref": "../../type/entityReference.json"
|
||||
},
|
||||
"service" : {
|
||||
"description": "Link to the database service where this database is hosted in",
|
||||
"$ref" : "../../type/entityReference.json"
|
||||
}
|
||||
},
|
||||
"required": ["name", "service"]
|
||||
}
|
||||
@ -0,0 +1,76 @@
|
||||
{
|
||||
"$id": "https://open-metadata.org/schema/entity/services/messagingService.json",
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"title": "Messaging Service",
|
||||
"description": "This schema defines the Messaging Service entity, such as Kafka and Pulsar.",
|
||||
"type": "object",
|
||||
"definitions": {
|
||||
"messagingServiceType": {
|
||||
"description": "Type of messaging service - Kafka or Pulsar.",
|
||||
"type": "string",
|
||||
"enum": [
|
||||
"Kafka",
|
||||
"Pulsar"
|
||||
],
|
||||
"javaEnums": [
|
||||
{
|
||||
"name": "Kafka"
|
||||
},
|
||||
{
|
||||
"name": "Pulsar"
|
||||
}
|
||||
]
|
||||
},
|
||||
"brokers": {
|
||||
"description": "Multiple bootstrap addresses for Kafka. Single proxy address for Pulsar.",
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
},
|
||||
"default": null
|
||||
}
|
||||
},
|
||||
"properties": {
|
||||
"id": {
|
||||
"description": "Unique identifier of this messaging service instance.",
|
||||
"$ref": "../../type/basic.json#/definitions/uuid"
|
||||
},
|
||||
"name": {
|
||||
"description": "Name that identifies this messaging service.",
|
||||
"type": "string",
|
||||
"minLength": 1,
|
||||
"maxLength": 64
|
||||
},
|
||||
"serviceType": {
|
||||
"description": "Type of messaging service such as Kafka or Pulsar...",
|
||||
"$ref": "#/definitions/messagingServiceType"
|
||||
},
|
||||
"description": {
|
||||
"description": "Description of a messaging service instance.",
|
||||
"type": "string"
|
||||
},
|
||||
"brokers": {
|
||||
"description": "Multiple bootstrap addresses for Kafka. Single proxy address for Pulsar.",
|
||||
"$ref" : "#/definitions/brokers"
|
||||
},
|
||||
"schemaRegistry" : {
|
||||
"description": "Schema registry URL.",
|
||||
"type": "string",
|
||||
"format": "uri"
|
||||
},
|
||||
"ingestionSchedule": {
|
||||
"description": "Schedule for running metadata ingestion jobs.",
|
||||
"$ref": "../../type/schedule.json"
|
||||
},
|
||||
"href": {
|
||||
"description": "Link to the resource corresponding to this messaging service.",
|
||||
"$ref": "../../type/basic.json#/definitions/href"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"id",
|
||||
"name",
|
||||
"serviceType",
|
||||
"brokers"
|
||||
]
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user