diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TaskRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TaskRepository.java new file mode 100644 index 00000000000..45b2778f948 --- /dev/null +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TaskRepository.java @@ -0,0 +1,358 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.catalog.jdbi3; + +import org.openmetadata.catalog.Entity; +import org.openmetadata.catalog.entity.data.Chart; +import org.openmetadata.catalog.entity.services.DashboardService; +import org.openmetadata.catalog.exception.CatalogExceptionMessage; +import org.openmetadata.catalog.exception.EntityNotFoundException; +import org.openmetadata.catalog.jdbi3.DashboardServiceRepository.DashboardServiceDAO; +import org.openmetadata.catalog.jdbi3.TeamRepository.TeamDAO; +import org.openmetadata.catalog.jdbi3.UserRepository.UserDAO; +import org.openmetadata.catalog.resources.charts.ChartResource; +import org.openmetadata.catalog.resources.charts.ChartResource.ChartList; +import org.openmetadata.catalog.type.EntityReference; +import org.openmetadata.catalog.type.TagLabel; +import org.openmetadata.catalog.util.EntityUtil; +import org.openmetadata.catalog.util.EntityUtil.Fields; +import org.openmetadata.catalog.util.JsonUtils; +import org.openmetadata.catalog.util.RestUtil.PutResponse; +import org.openmetadata.common.utils.CipherText; +import org.skife.jdbi.v2.sqlobject.Bind; +import org.skife.jdbi.v2.sqlobject.CreateSqlObject; +import org.skife.jdbi.v2.sqlobject.SqlQuery; +import org.skife.jdbi.v2.sqlobject.SqlUpdate; +import org.skife.jdbi.v2.sqlobject.Transaction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.json.JsonPatch; +import javax.ws.rs.core.Response.Status; +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound; + +public abstract class ChartRepository { + private static final Logger LOG = LoggerFactory.getLogger(ChartRepository.class); + private static final Fields CHART_UPDATE_FIELDS = new Fields(ChartResource.FIELD_LIST, "owner"); + private static final Fields CHART_PATCH_FIELDS = new Fields(ChartResource.FIELD_LIST, "owner,service,tags"); + + public static String getFQN(EntityReference service, Chart chart) { + return (service.getName() + "." + chart.getName()); + } + + @CreateSqlObject + abstract ChartDAO chartDAO(); + + @CreateSqlObject + abstract EntityRelationshipDAO relationshipDAO(); + + @CreateSqlObject + abstract UserDAO userDAO(); + + @CreateSqlObject + abstract TeamDAO teamDAO(); + + @CreateSqlObject + abstract DashboardServiceDAO dashboardServiceDAO(); + + @CreateSqlObject + abstract TagRepository.TagDAO tagDAO(); + + + @Transaction + public ChartList listAfter(Fields fields, String serviceName, int limitParam, String after) throws IOException, + GeneralSecurityException { + // forward scrolling, if after == null then first page is being asked being asked + List jsons = chartDAO().listAfter(serviceName, limitParam + 1, after == null ? "" : + CipherText.instance().decrypt(after)); + + List charts = new ArrayList<>(); + for (String json : jsons) { + charts.add(setFields(JsonUtils.readValue(json, Chart.class), fields)); + } + int total = chartDAO().listCount(serviceName); + + String beforeCursor, afterCursor = null; + beforeCursor = after == null ? null : charts.get(0).getFullyQualifiedName(); + if (charts.size() > limitParam) { // If extra result exists, then next page exists - return after cursor + charts.remove(limitParam); + afterCursor = charts.get(limitParam - 1).getFullyQualifiedName(); + } + return new ChartList(charts, beforeCursor, afterCursor, total); + } + + @Transaction + public ChartList listBefore(Fields fields, String serviceName, int limitParam, String before) throws IOException, + GeneralSecurityException { + // Reverse scrolling - Get one extra result used for computing before cursor + List jsons = chartDAO().listBefore(serviceName, limitParam + 1, CipherText.instance().decrypt(before)); + List charts = new ArrayList<>(); + for (String json : jsons) { + charts.add(setFields(JsonUtils.readValue(json, Chart.class), fields)); + } + int total = chartDAO().listCount(serviceName); + + String beforeCursor = null, afterCursor; + if (charts.size() > limitParam) { // If extra result exists, then previous page exists - return before cursor + charts.remove(0); + beforeCursor = charts.get(0).getFullyQualifiedName(); + } + afterCursor = charts.get(charts.size() - 1).getFullyQualifiedName(); + return new ChartList(charts, beforeCursor, afterCursor, total); + } + + @Transaction + public Chart get(String id, Fields fields) throws IOException { + return setFields(validateChart(id), fields); + } + + @Transaction + public Chart getByName(String fqn, Fields fields) throws IOException { + Chart chart = EntityUtil.validate(fqn, chartDAO().findByFQN(fqn), Chart.class); + return setFields(chart, fields); + } + + @Transaction + public Chart create(Chart chart, EntityReference service, EntityReference owner) throws IOException { + getService(service); // Validate service + return createInternal(chart, service, owner); + } + + @Transaction + public void delete(String id) { + if (relationshipDAO().findToCount(id, Relationship.CONTAINS.ordinal(), Entity.CHART) > 0) { + throw new IllegalArgumentException("Chart is not empty"); + } + if (chartDAO().delete(id) <= 0) { + throw EntityNotFoundException.byMessage(entityNotFound(Entity.CHART, id)); + } + relationshipDAO().deleteAll(id); + } + + @Transaction + public PutResponse createOrUpdate(Chart updatedChart, EntityReference service, EntityReference newOwner) + throws IOException { + getService(service); // Validate service + + String fqn = getFQN(service, updatedChart); + Chart storedDB = JsonUtils.readValue(chartDAO().findByFQN(fqn), Chart.class); + if (storedDB == null) { // Chart does not exist. Create a new one + return new PutResponse<>(Status.CREATED, createInternal(updatedChart, service, newOwner)); + } + // Update the existing chart + EntityUtil.populateOwner(userDAO(), teamDAO(), newOwner); // Validate new owner + if (storedDB.getDescription() == null || storedDB.getDescription().isEmpty()) { + storedDB.withDescription(updatedChart.getDescription()); + } + + //update the display name from source + if (updatedChart.getDisplayName() != null && !updatedChart.getDisplayName().isEmpty()) { + storedDB.withDisplayName(updatedChart.getDisplayName()); + } + chartDAO().update(storedDB.getId().toString(), JsonUtils.pojoToJson(storedDB)); + + // Update owner relationship + setFields(storedDB, CHART_UPDATE_FIELDS); // First get the ownership information + updateOwner(storedDB, storedDB.getOwner(), newOwner); + + // Service can't be changed in update since service name is part of FQN and + // change to a different service will result in a different FQN and creation of a new chart under the new service + storedDB.setService(service); + applyTags(updatedChart); + + return new PutResponse<>(Status.OK, storedDB); + } + + @Transaction + public Chart patch(String id, JsonPatch patch) throws IOException { + Chart original = setFields(validateChart(id), CHART_PATCH_FIELDS); + Chart updated = JsonUtils.applyPatch(original, patch, Chart.class); + patch(original, updated); + return updated; + } + + public Chart createInternal(Chart chart, EntityReference service, EntityReference owner) throws IOException { + chart.setFullyQualifiedName(getFQN(service, chart)); + EntityUtil.populateOwner(userDAO(), teamDAO(), owner); // Validate owner + + // Query 1 - insert chart into chart_entity table + chartDAO().insert(JsonUtils.pojoToJson(chart)); + setService(chart, service); + setOwner(chart, owner); + applyTags(chart); + return chart; + } + + private void applyTags(Chart chart) throws IOException { + // Add chart level tags by adding tag to chart relationship + EntityUtil.applyTags(tagDAO(), chart.getTags(), chart.getFullyQualifiedName()); + chart.setTags(getTags(chart.getFullyQualifiedName())); // Update tag to handle additional derived tags + } + + private void patch(Chart original, Chart updated) throws IOException { + String chartId = original.getId().toString(); + if (!original.getId().equals(updated.getId())) { + throw new IllegalArgumentException(CatalogExceptionMessage.readOnlyAttribute(Entity.CHART, "id")); + } + if (!original.getName().equals(updated.getName())) { + throw new IllegalArgumentException(CatalogExceptionMessage.readOnlyAttribute(Entity.CHART, "name")); + } + if (updated.getService() == null || !original.getService().getId().equals(updated.getService().getId())) { + throw new IllegalArgumentException(CatalogExceptionMessage.readOnlyAttribute(Entity.CHART, "service")); + } + // Validate new owner + EntityReference newOwner = EntityUtil.populateOwner(userDAO(), teamDAO(), updated.getOwner()); + + EntityReference newService = updated.getService(); + // Remove previous tags. Merge tags from the update and the existing tags + EntityUtil.removeTags(tagDAO(), original.getFullyQualifiedName()); + updated.setHref(null); + updated.setOwner(null); + updated.setService(null); + chartDAO().update(chartId, JsonUtils.pojoToJson(updated)); + updateOwner(updated, original.getOwner(), newOwner); + updated.setService(newService); + applyTags(updated); + } + + public EntityReference getOwner(Chart chart) throws IOException { + if (chart == null) { + return null; + } + return EntityUtil.populateOwner(chart.getId(), relationshipDAO(), userDAO(), teamDAO()); + } + + private void setOwner(Chart chart, EntityReference owner) { + EntityUtil.setOwner(relationshipDAO(), chart.getId(), Entity.CHART, owner); + chart.setOwner(owner); + } + + private void updateOwner(Chart chart, EntityReference origOwner, EntityReference newOwner) { + EntityUtil.updateOwner(relationshipDAO(), origOwner, newOwner, chart.getId(), Entity.CHART); + chart.setOwner(newOwner); + } + + private Chart validateChart(String id) throws IOException { + return EntityUtil.validate(id, chartDAO().findById(id), Chart.class); + } + + private Chart setFields(Chart chart, Fields fields) throws IOException { + chart.setOwner(fields.contains("owner") ? getOwner(chart) : null); + chart.setService(fields.contains("service") ? getService(chart) : null); + chart.setFollowers(fields.contains("followers") ? getFollowers(chart) : null); + chart.setTags(fields.contains("tags") ? getTags(chart.getFullyQualifiedName()) : null); + return chart; + } + + private List getFollowers(Chart chart) throws IOException { + return chart == null ? null : EntityUtil.getFollowers(chart.getId(), relationshipDAO(), userDAO()); + } + + private List getTags(String fqn) { + return tagDAO().getTags(fqn); + } + + private EntityReference getService(Chart chart) throws IOException { + return chart == null ? null : getService(Objects.requireNonNull(EntityUtil.getService(relationshipDAO(), + chart.getId(), Entity.DASHBOARD_SERVICE))); + } + + private EntityReference getService(EntityReference service) throws IOException { + String id = service.getId().toString(); + if (service.getType().equalsIgnoreCase(Entity.DASHBOARD_SERVICE)) { + DashboardService serviceInstance = EntityUtil.validate(id, dashboardServiceDAO().findById(id), + DashboardService.class); + service.setDescription(serviceInstance.getDescription()); + service.setName(serviceInstance.getName()); + } else { + throw new IllegalArgumentException(String.format("Invalid service type %s for the chart", service.getType())); + } + return service; + } + + public void setService(Chart chart, EntityReference service) throws IOException { + if (service != null && chart != null) { + getService(service); // Populate service details + relationshipDAO().insert(service.getId().toString(), chart.getId().toString(), service.getType(), + Entity.CHART, Relationship.CONTAINS.ordinal()); + chart.setService(service); + } + } + + @Transaction + public Status addFollower(String chartId, String userId) throws IOException { + EntityUtil.validate(chartId, chartDAO().findById(chartId), Chart.class); + return EntityUtil.addFollower(relationshipDAO(), userDAO(), chartId, Entity.CHART, userId, Entity.USER) ? + Status.CREATED : Status.OK; + } + + @Transaction + public void deleteFollower(String chartId, String userId) { + EntityUtil.validateUser(userDAO(), userId); + EntityUtil.removeFollower(relationshipDAO(), chartId, userId); + } + + public interface ChartDAO { + @SqlUpdate("INSERT INTO chart_entity (json) VALUES (:json)") + void insert(@Bind("json") String json); + + @SqlUpdate("UPDATE chart_entity SET json = :json where id = :id") + void update(@Bind("id") String id, @Bind("json") String json); + + @SqlQuery("SELECT json FROM chart_entity WHERE fullyQualifiedName = :name") + String findByFQN(@Bind("name") String name); + + @SqlQuery("SELECT json FROM chart_entity WHERE id = :id") + String findById(@Bind("id") String id); + + @SqlQuery("SELECT count(*) FROM chart_entity WHERE " + + "(fullyQualifiedName LIKE CONCAT(:fqnPrefix, '.%') OR :fqnPrefix IS NULL)") + int listCount(@Bind("fqnPrefix") String fqnPrefix); + + @SqlQuery( + "SELECT json FROM (" + + "SELECT fullyQualifiedName, json FROM chart_entity WHERE " + + "(fullyQualifiedName LIKE CONCAT(:fqnPrefix, '.%') OR :fqnPrefix IS NULL) AND " +// Filter by + // service name + "fullyQualifiedName < :before " + // Pagination by chart fullyQualifiedName + "ORDER BY fullyQualifiedName DESC " + // Pagination ordering by chart fullyQualifiedName + "LIMIT :limit" + + ") last_rows_subquery ORDER BY fullyQualifiedName") + List listBefore(@Bind("fqnPrefix") String fqnPrefix, @Bind("limit") int limit, + @Bind("before") String before); + + @SqlQuery("SELECT json FROM chart_entity WHERE " + + "(fullyQualifiedName LIKE CONCAT(:fqnPrefix, '.%') OR :fqnPrefix IS NULL) AND " + + "fullyQualifiedName > :after " + + "ORDER BY fullyQualifiedName " + + "LIMIT :limit") + List listAfter(@Bind("fqnPrefix") String fqnPrefix, @Bind("limit") int limit, + @Bind("after") String after); + + @SqlQuery("SELECT EXISTS (SELECT * FROM chart_entity WHERE id = :id)") + boolean exists(@Bind("id") String id); + + @SqlUpdate("DELETE FROM chart_entity WHERE id = :id") + int delete(@Bind("id") String id); + } +} diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/pipeline/PipelineServiceResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/pipeline/PipelineServiceResource.java new file mode 100644 index 00000000000..d9a1792c7e0 --- /dev/null +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/pipeline/PipelineServiceResource.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.catalog.resources.services.messaging; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.inject.Inject; +import io.swagger.annotations.Api; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.media.Content; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import org.openmetadata.catalog.api.services.CreateMessagingService; +import org.openmetadata.catalog.api.services.UpdateMessagingService; +import org.openmetadata.catalog.entity.data.Dashboard; +import org.openmetadata.catalog.entity.services.MessagingService; +import org.openmetadata.catalog.jdbi3.MessagingServiceRepository; +import org.openmetadata.catalog.resources.Collection; +import org.openmetadata.catalog.security.CatalogAuthorizer; +import org.openmetadata.catalog.security.SecurityUtil; +import org.openmetadata.catalog.type.EntityReference; +import org.openmetadata.catalog.util.RestUtil; +import org.openmetadata.catalog.util.ResultList; + +import javax.validation.Valid; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.SecurityContext; +import javax.ws.rs.core.UriInfo; +import java.io.IOException; +import java.util.List; +import java.util.Objects; +import java.util.UUID; + +@Path("/v1/services/messagingServices") +@Api(value = "Messaging service collection", tags = "Services -> Messaging service collection") +@Produces(MediaType.APPLICATION_JSON) +@Consumes(MediaType.APPLICATION_JSON) +@Collection(name = "messagingServices", repositoryClass = "org.openmetadata.catalog.jdbi3.MessagingServiceRepository") +public class MessagingServiceResource { + private final MessagingServiceRepository dao; + private final CatalogAuthorizer authorizer; + + public static EntityReference addHref(UriInfo uriInfo, EntityReference service) { + return service.withHref(RestUtil.getHref(uriInfo, "v1/services/messagingServices/", service.getId())); + } + + private static List addHref(UriInfo uriInfo, List instances) { + instances.forEach(i -> addHref(uriInfo, i)); + return instances; + } + + private static MessagingService addHref(UriInfo uriInfo, MessagingService dbService) { + dbService.setHref(RestUtil.getHref(uriInfo, "v1/services/messagingServices/", dbService.getId())); + return dbService; + } + + @Inject + public MessagingServiceResource(MessagingServiceRepository dao, CatalogAuthorizer authorizer) { + Objects.requireNonNull(dao, "MessagingServiceRepository must not be null"); + this.dao = dao; + this.authorizer = authorizer; + } + + static class MessagingServiceList extends ResultList { + MessagingServiceList(List data) { + super(data); + } + } + + @GET + @Operation(summary = "List messaging services", tags = "services", + description = "Get a list of messaging services.", + responses = { + @ApiResponse(responseCode = "200", description = "List of messaging service instances", + content = @Content(mediaType = "application/json", + schema = @Schema(implementation = MessagingServiceList.class))) + }) + public MessagingServiceList list(@Context UriInfo uriInfo, @QueryParam("name") String name) throws IOException { + return new MessagingServiceList(addHref(uriInfo, dao.list(name))); + } + + @GET + @Path("/{id}") + @Operation(summary = "Get a messaging service", tags = "services", + description = "Get a messaging service by `id`.", + responses = { + @ApiResponse(responseCode = "200", description = "Messaging service instance", + content = @Content(mediaType = "application/json", + schema = @Schema(implementation = Dashboard.class))), + @ApiResponse(responseCode = "404", description = "Messaging service for instance {id} is not found") + }) + public MessagingService get(@Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @PathParam("id") String id) throws IOException { + return addHref(uriInfo, dao.get(id)); + } + + @GET + @Path("/name/{name}") + @Operation(summary = "Get messaging service by name", tags = "services", + description = "Get a messaging service by the service `name`.", + responses = { + @ApiResponse(responseCode = "200", description = "Messaging service instance", + content = @Content(mediaType = "application/json", + schema = @Schema(implementation = Dashboard.class))), + @ApiResponse(responseCode = "404", description = "Messaging service for instance {id} is not found") + }) + public MessagingService getByName(@Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @PathParam("name") String name) throws IOException { + return addHref(uriInfo, dao.getByName(name)); + } + + @POST + @Operation(summary = "Create a messaging service", tags = "services", + description = "Create a new messaging service.", + responses = { + @ApiResponse(responseCode = "200", description = "Messaging service instance", + content = @Content(mediaType = "application/json", + schema = @Schema(implementation = MessagingService.class))), + @ApiResponse(responseCode = "400", description = "Bad request") + }) + public Response create(@Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Valid CreateMessagingService create) throws JsonProcessingException { + SecurityUtil.checkAdminOrBotRole(authorizer, securityContext); + MessagingService service = new MessagingService().withId(UUID.randomUUID()) + .withName(create.getName()).withDescription(create.getDescription()) + .withServiceType(create.getServiceType()) + .withBrokers(create.getBrokers()) + .withSchemaRegistry(create.getSchemaRegistry()) + .withIngestionSchedule(create.getIngestionSchedule()); + + addHref(uriInfo, dao.create(service)); + return Response.created(service.getHref()).entity(service).build(); + } + + @PUT + @Path("/{id}") + @Operation(summary = "Update a messaging service", tags = "services", + description = "Update an existing messaging service identified by `id`.", + responses = { + @ApiResponse(responseCode = "200", description = "Messaging service instance", + content = @Content(mediaType = "application/json", + schema = @Schema(implementation = MessagingService.class))), + @ApiResponse(responseCode = "400", description = "Bad request") + }) + public Response update(@Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Parameter(description = "Id of the messaging service", schema = @Schema(type = "string")) + @PathParam("id") String id, + @Valid UpdateMessagingService update) throws IOException { + SecurityUtil.checkAdminOrBotRole(authorizer, securityContext); + MessagingService service = addHref(uriInfo, + dao.update(id, update.getDescription(), update.getBrokers(), update.getSchemaRegistry(), + update.getIngestionSchedule())); + return Response.ok(service).build(); + } + + @DELETE + @Path("/{id}") + @Operation(summary = "Delete a database service", tags = "services", + description = "Delete a database services. If databases (and tables) belong the service, it can't be " + + "deleted.", + responses = { + @ApiResponse(responseCode = "200", description = "OK"), + @ApiResponse(responseCode = "404", description = "DatabaseService service for instance {id} " + + "is not found") + }) + public Response delete(@Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Parameter(description = "Id of the database service", schema = @Schema(type = "string")) + @PathParam("id") String id) { + SecurityUtil.checkAdminOrBotRole(authorizer, securityContext); + dao.delete(id); + return Response.ok().build(); + } +} diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/tasks/TaskResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/tasks/TaskResource.java new file mode 100644 index 00000000000..d1efb0218e7 --- /dev/null +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/tasks/TaskResource.java @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.catalog.resources.charts; + +import com.google.inject.Inject; +import io.swagger.annotations.Api; +import io.swagger.v3.oas.annotations.ExternalDocumentation; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.media.Content; +import io.swagger.v3.oas.annotations.media.ExampleObject; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.parameters.RequestBody; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import org.openmetadata.catalog.api.data.CreateChart; +import org.openmetadata.catalog.entity.data.Chart; +import org.openmetadata.catalog.entity.data.Dashboard; +import org.openmetadata.catalog.jdbi3.ChartRepository; +import org.openmetadata.catalog.resources.Collection; +import org.openmetadata.catalog.security.CatalogAuthorizer; +import org.openmetadata.catalog.security.SecurityUtil; +import org.openmetadata.catalog.type.EntityReference; +import org.openmetadata.catalog.util.EntityUtil; +import org.openmetadata.catalog.util.EntityUtil.Fields; +import org.openmetadata.catalog.util.RestUtil; +import org.openmetadata.catalog.util.RestUtil.PutResponse; +import org.openmetadata.catalog.util.ResultList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.json.JsonPatch; +import javax.validation.Valid; +import javax.validation.constraints.Max; +import javax.validation.constraints.Min; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.PATCH; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.SecurityContext; +import javax.ws.rs.core.UriInfo; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.security.GeneralSecurityException; +import java.text.ParseException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.UUID; + +@Path("/v1/charts") +@Api(value = "Chart data asset collection", tags = "Chart data asset collection") +@Produces(MediaType.APPLICATION_JSON) +@Consumes(MediaType.APPLICATION_JSON) +@Collection(name = "charts", repositoryClass = "org.openmetadata.catalog.jdbi3.ChartRepository") +public class ChartResource { + private static final Logger LOG = LoggerFactory.getLogger(ChartResource.class); + private static final String CHART_COLLECTION_PATH = "v1/charts/"; + private final ChartRepository dao; + private final CatalogAuthorizer authorizer; + + public static void addHref(UriInfo uriInfo, EntityReference ref) { + ref.withHref(RestUtil.getHref(uriInfo, CHART_COLLECTION_PATH, ref.getId())); + } + + public static List addHref(UriInfo uriInfo, List charts) { + Optional.ofNullable(charts).orElse(Collections.emptyList()).forEach(i -> addHref(uriInfo, i)); + return charts; + } + + public static Chart addHref(UriInfo uriInfo, Chart chart) { + chart.setHref(RestUtil.getHref(uriInfo, CHART_COLLECTION_PATH, chart.getId())); + EntityUtil.addHref(uriInfo, chart.getOwner()); + EntityUtil.addHref(uriInfo, chart.getService()); + EntityUtil.addHref(uriInfo, chart.getFollowers()); + + return chart; + } + + @Inject + public ChartResource(ChartRepository dao, CatalogAuthorizer authorizer) { + Objects.requireNonNull(dao, "ChartRepository must not be null"); + this.dao = dao; + this.authorizer = authorizer; + } + + public static class ChartList extends ResultList { + @SuppressWarnings("unused") + ChartList() { + // Empty constructor needed for deserialization + } + + public ChartList(List data, String beforeCursor, String afterCursor, int total) + throws GeneralSecurityException, UnsupportedEncodingException { + super(data, beforeCursor, afterCursor, total); + } + } + + static final String FIELDS = "owner,service,followers,tags"; + public static final List FIELD_LIST = Arrays.asList(FIELDS.replaceAll(" ", "") + .split(",")); + + @GET + @Valid + @Operation(summary = "List charts", tags = "charts", + description = "Get a list of charts, optionally filtered by `service` it belongs to. Use `fields` " + + "parameter to get only necessary fields. Use cursor-based pagination to limit the number " + + "entries in the list using `limit` and `before` or `after` query params.", + responses = { + @ApiResponse(responseCode = "200", description = "List of charts", + content = @Content(mediaType = "application/json", + schema = @Schema(implementation = ChartList.class))) + }) + public ChartList list(@Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Parameter(description = "Fields requested in the returned resource", + schema = @Schema(type = "string", example = FIELDS)) + @QueryParam("fields") String fieldsParam, + @Parameter(description = "Filter charts by service name", + schema = @Schema(type = "string", example = "superset")) + @QueryParam("service") String serviceParam, + @Parameter(description = "Limit the number charts returned. (1 to 1000000, default = 10)") + @DefaultValue("10") + @Min(1) + @Max(1000000) + @QueryParam("limit") int limitParam, + @Parameter(description = "Returns list of charts before this cursor", + schema = @Schema(type = "string")) + @QueryParam("before") String before, + @Parameter(description = "Returns list of charts after this cursor", + schema = @Schema(type = "string")) + @QueryParam("after") String after + ) throws IOException, GeneralSecurityException { + RestUtil.validateCursors(before, after); + Fields fields = new Fields(FIELD_LIST, fieldsParam); + + ChartList charts; + if (before != null) { // Reverse paging + charts = dao.listBefore(fields, serviceParam, limitParam, before); // Ask for one extra entry + } else { // Forward paging or first page + charts = dao.listAfter(fields, serviceParam, limitParam, after); + } + addHref(uriInfo, charts.getData()); + return charts; + } + + @GET + @Path("/{id}") + @Operation(summary = "Get a Chart", tags = "charts", + description = "Get a chart by `id`.", + responses = { + @ApiResponse(responseCode = "200", description = "The chart", + content = @Content(mediaType = "application/json", + schema = @Schema(implementation = Dashboard.class))), + @ApiResponse(responseCode = "404", description = "Chart for instance {id} is not found") + }) + public Chart get(@Context UriInfo uriInfo, @PathParam("id") String id, + @Context SecurityContext securityContext, + @Parameter(description = "Fields requested in the returned resource", + schema = @Schema(type = "string", example = FIELDS)) + @QueryParam("fields") String fieldsParam) throws IOException { + Fields fields = new Fields(FIELD_LIST, fieldsParam); + return addHref(uriInfo, dao.get(id, fields)); + } + + @GET + @Path("/name/{fqn}") + @Operation(summary = "Get a chart by name", tags = "charts", + description = "Get a chart by fully qualified name.", + responses = { + @ApiResponse(responseCode = "200", description = "The chart", + content = @Content(mediaType = "application/json", + schema = @Schema(implementation = Chart.class))), + @ApiResponse(responseCode = "404", description = "Chart for instance {id} is not found") + }) + public Response getByName(@Context UriInfo uriInfo, @PathParam("fqn") String fqn, + @Context SecurityContext securityContext, + @Parameter(description = "Fields requested in the returned resource", + schema = @Schema(type = "string", example = FIELDS)) + @QueryParam("fields") String fieldsParam) throws IOException { + Fields fields = new Fields(FIELD_LIST, fieldsParam); + Chart chart = dao.getByName(fqn, fields); + addHref(uriInfo, chart); + return Response.ok(chart).build(); + } + + @POST + @Operation(summary = "Create a chart", tags = "charts", + description = "Create a chart under an existing `service`.", + responses = { + @ApiResponse(responseCode = "200", description = "The chart", + content = @Content(mediaType = "application/json", + schema = @Schema(implementation = Chart.class))), + @ApiResponse(responseCode = "400", description = "Bad request") + }) + public Response create(@Context UriInfo uriInfo, @Context SecurityContext securityContext, + @Valid CreateChart create) throws IOException { + SecurityUtil.checkAdminOrBotRole(authorizer, securityContext); + Chart chart = + new Chart().withId(UUID.randomUUID()).withName(create.getName()).withDisplayName(create.getDisplayName()) + .withDescription(create.getDescription()) + .withService(create.getService()) + .withChartType(create.getChartType()).withChartUrl(create.getChartUrl()) + .withTables(create.getTables()).withTags(create.getTags()) + .withOwner(create.getOwner()); + chart = addHref(uriInfo, dao.create(chart, create.getService(), create.getOwner())); + return Response.created(chart.getHref()).entity(chart).build(); + } + + @PATCH + @Path("/{id}") + @Operation(summary = "Update a chart", tags = "charts", + description = "Update an existing chart using JsonPatch.", + externalDocs = @ExternalDocumentation(description = "JsonPatch RFC", + url = "https://tools.ietf.org/html/rfc6902")) + @Consumes(MediaType.APPLICATION_JSON_PATCH_JSON) + public Chart updateDescription(@Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @PathParam("id") String id, + @RequestBody(description = "JsonPatch with array of operations", + content = @Content(mediaType = MediaType.APPLICATION_JSON_PATCH_JSON, + examples = {@ExampleObject("[" + + "{op:remove, path:/a}," + + "{op:add, path: /b, value: val}" + + "]")})) + JsonPatch patch) throws IOException { + Fields fields = new Fields(FIELD_LIST, FIELDS); + Chart chart = dao.get(id, fields); + SecurityUtil.checkAdminRoleOrPermissions(authorizer, securityContext, + EntityUtil.getEntityReference(chart)); + chart = dao.patch(id, patch); + return addHref(uriInfo, chart); + } + + @PUT + @Operation(summary = "Create or update chart", tags = "charts", + description = "Create a chart, it it does not exist or update an existing chart.", + responses = { + @ApiResponse(responseCode = "200", description = "The updated chart ", + content = @Content(mediaType = "application/json", + schema = @Schema(implementation = Chart.class))) + }) + public Response createOrUpdate(@Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Valid CreateChart create) throws IOException { + + Chart chart = + new Chart().withId(UUID.randomUUID()).withName(create.getName()).withDisplayName(create.getDisplayName()) + .withDescription(create.getDescription()) + .withService(create.getService()) + .withChartType(create.getChartType()).withChartUrl(create.getChartUrl()) + .withTables(create.getTables()).withTags(create.getTags()) + .withOwner(create.getOwner()); + PutResponse response = dao.createOrUpdate(chart, create.getService(), create.getOwner()); + chart = addHref(uriInfo, response.getEntity()); + return Response.status(response.getStatus()).entity(chart).build(); + } + + @PUT + @Path("/{id}/followers") + @Operation(summary = "Add a follower", tags = "charts", + description = "Add a user identified by `userId` as followed of this chart", + responses = { + @ApiResponse(responseCode = "200", description = "OK"), + @ApiResponse(responseCode = "404", description = "Chart for instance {id} is not found") + }) + public Response addFollower(@Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Parameter(description = "Id of the chart", schema = @Schema(type = "string")) + @PathParam("id") String id, + @Parameter(description = "Id of the user to be added as follower", + schema = @Schema(type = "string")) + String userId) throws IOException, ParseException { + Fields fields = new Fields(FIELD_LIST, "followers"); + Response.Status status = dao.addFollower(id, userId); + Chart chart = dao.get(id, fields); + return Response.status(status).entity(chart).build(); + } + + @DELETE + @Path("/{id}/followers/{userId}") + @Operation(summary = "Remove a follower", tags = "charts", + description = "Remove the user identified `userId` as a follower of the chart.") + public Chart deleteFollower(@Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Parameter(description = "Id of the chart", + schema = @Schema(type = "string")) + @PathParam("id") String id, + @Parameter(description = "Id of the user being removed as follower", + schema = @Schema(type = "string")) + @PathParam("userId") String userId) throws IOException, ParseException { + Fields fields = new Fields(FIELD_LIST, "followers"); + dao.deleteFollower(id, userId); + Chart chart = dao.get(id, fields); + return addHref(uriInfo, chart); + } + + + @DELETE + @Path("/{id}") + @Operation(summary = "Delete a Chart", tags = "charts", + description = "Delete a chart by `id`.", + responses = { + @ApiResponse(responseCode = "200", description = "OK"), + @ApiResponse(responseCode = "404", description = "Chart for instance {id} is not found") + }) + public Response delete(@Context UriInfo uriInfo, @PathParam("id") String id) { + dao.delete(id); + return Response.ok().build(); + } +} diff --git a/catalog-rest-service/src/main/resources/json/schema/api/data/createTask.json b/catalog-rest-service/src/main/resources/json/schema/api/data/createTask.json new file mode 100644 index 00000000000..49b7843d892 --- /dev/null +++ b/catalog-rest-service/src/main/resources/json/schema/api/data/createTask.json @@ -0,0 +1,52 @@ +{ + "$id": "https://open-metadata.org/schema/api/data/createChart.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Create Chart entity request", + "description": "Create Chart entity request", + "type": "object", + "properties" : { + "name": { + "description": "Name that identifies this dashboard.", + "type": "string", + "minLength": 1, + "maxLength": 64 + }, + "displayName": { + "description": "Display Name that identifies this Chart. It could be title or label from the source services", + "type": "string" + }, + "description": { + "description": "Description of the database instance. What it has and how to use it.", + "type": "string" + }, + "chartType": { + "$ref": "../../entity/data/chart.json#/definitions/chartType" + }, + "chartUrl" : { + "description": "Chart URL, pointing to its own Service URL", + "type": "string", + "format": "uri" + }, + "tables": { + "description": "Link to tables used in this chart.", + "$ref": "../../type/entityReference.json#/definitions/entityReferenceList" + }, + "tags": { + "description": "Tags for this chart", + "type": "array", + "items": { + "$ref": "../../type/tagLabel.json" + }, + "default": null + }, + "owner": { + "description": "Owner of this database", + "$ref": "../../type/entityReference.json" + }, + "service" : { + "description": "Link to the database service where this database is hosted in", + "$ref" : "../../type/entityReference.json" + } + }, + "required": ["name", "service"] +} \ No newline at end of file diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/data/task.json b/catalog-rest-service/src/main/resources/json/schema/entity/data/task.json new file mode 100644 index 00000000000..e69de29bb2d diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/pipelineService.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/pipelineService.json new file mode 100644 index 00000000000..c606b0a40c7 --- /dev/null +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/pipelineService.json @@ -0,0 +1,76 @@ +{ + "$id": "https://open-metadata.org/schema/entity/services/messagingService.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Messaging Service", + "description": "This schema defines the Messaging Service entity, such as Kafka and Pulsar.", + "type": "object", + "definitions": { + "messagingServiceType": { + "description": "Type of messaging service - Kafka or Pulsar.", + "type": "string", + "enum": [ + "Kafka", + "Pulsar" + ], + "javaEnums": [ + { + "name": "Kafka" + }, + { + "name": "Pulsar" + } + ] + }, + "brokers": { + "description": "Multiple bootstrap addresses for Kafka. Single proxy address for Pulsar.", + "type": "array", + "items": { + "type": "string" + }, + "default": null + } + }, + "properties": { + "id": { + "description": "Unique identifier of this messaging service instance.", + "$ref": "../../type/basic.json#/definitions/uuid" + }, + "name": { + "description": "Name that identifies this messaging service.", + "type": "string", + "minLength": 1, + "maxLength": 64 + }, + "serviceType": { + "description": "Type of messaging service such as Kafka or Pulsar...", + "$ref": "#/definitions/messagingServiceType" + }, + "description": { + "description": "Description of a messaging service instance.", + "type": "string" + }, + "brokers": { + "description": "Multiple bootstrap addresses for Kafka. Single proxy address for Pulsar.", + "$ref" : "#/definitions/brokers" + }, + "schemaRegistry" : { + "description": "Schema registry URL.", + "type": "string", + "format": "uri" + }, + "ingestionSchedule": { + "description": "Schedule for running metadata ingestion jobs.", + "$ref": "../../type/schedule.json" + }, + "href": { + "description": "Link to the resource corresponding to this messaging service.", + "$ref": "../../type/basic.json#/definitions/href" + } + }, + "required": [ + "id", + "name", + "serviceType", + "brokers" + ] +}