From 701b611ae8b8b24852ffedff95ac40e9aa0c9be6 Mon Sep 17 00:00:00 2001 From: sureshms Date: Tue, 17 Aug 2021 14:59:37 -0700 Subject: [PATCH] Fix #218 Add support for messaging service --- .../mysql/v001__create_db_connection_info.sql | 10 + .../java/org/openmetadata/catalog/Entity.java | 1 + .../jdbi3/MessagingServiceRepository.java | 136 +++++++ .../messaging/MessagingServiceResource.java | 200 ++++++++++ .../api/services/createMessagingService.json | 28 ++ .../api/services/updateDatabaseService.json | 4 +- .../api/services/updateMessagingService.json | 18 + .../entity/services/messagingService.json | 59 +++ .../services/DatabaseServiceResourceTest.java | 6 +- .../MessagingServiceResourceTest.java | 352 ++++++++++++++++++ 10 files changed, 809 insertions(+), 5 deletions(-) create mode 100644 catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/MessagingServiceRepository.java create mode 100644 catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/messaging/MessagingServiceResource.java create mode 100644 catalog-rest-service/src/main/resources/json/schema/api/services/createMessagingService.json create mode 100644 catalog-rest-service/src/main/resources/json/schema/api/services/updateMessagingService.json create mode 100644 catalog-rest-service/src/main/resources/json/schema/entity/services/messagingService.json create mode 100644 catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/MessagingServiceResourceTest.java diff --git a/bootstrap/sql/mysql/v001__create_db_connection_info.sql b/bootstrap/sql/mysql/v001__create_db_connection_info.sql index 4e76dd348a3..902b03797bf 100644 --- a/bootstrap/sql/mysql/v001__create_db_connection_info.sql +++ b/bootstrap/sql/mysql/v001__create_db_connection_info.sql @@ -61,6 +61,16 @@ CREATE TABLE IF NOT EXISTS dbservice_entity ( UNIQUE KEY unique_name(name) ); +CREATE TABLE IF NOT EXISTS messaging_service_entity ( + id VARCHAR(36) GENERATED ALWAYS AS (json ->> '$.id') STORED NOT NULL, + name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL, + serviceType VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.serviceType') NOT NULL, + json JSON NOT NULL, + timestamp BIGINT, + PRIMARY KEY (id), + UNIQUE KEY unique_name(name) +); + -- -- Data entities -- diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/Entity.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/Entity.java index 8dab3dd77b3..9a0b61f65a5 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/Entity.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/Entity.java @@ -21,6 +21,7 @@ public final class Entity { // Services public static final String DATABASE_SERVICE = "databaseService"; + public static final String MESSAGING_SERVICE = "messagingService"; // Data assets public static final String TABLE = "table"; diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/MessagingServiceRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/MessagingServiceRepository.java new file mode 100644 index 00000000000..32b21d1a9c8 --- /dev/null +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/MessagingServiceRepository.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.catalog.jdbi3; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.joda.time.Period; +import org.joda.time.format.ISOPeriodFormat; +import org.openmetadata.catalog.Entity; +import org.openmetadata.catalog.entity.services.MessagingService; +import org.openmetadata.catalog.exception.EntityNotFoundException; +import org.openmetadata.catalog.type.Schedule; +import org.openmetadata.catalog.util.EntityUtil; +import org.openmetadata.catalog.util.JsonUtils; +import org.skife.jdbi.v2.sqlobject.Bind; +import org.skife.jdbi.v2.sqlobject.CreateSqlObject; +import org.skife.jdbi.v2.sqlobject.SqlQuery; +import org.skife.jdbi.v2.sqlobject.SqlUpdate; +import org.skife.jdbi.v2.sqlobject.Transaction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound; + + +public abstract class MessagingServiceRepository { + private static final Logger LOG = LoggerFactory.getLogger(MessagingServiceRepository.class); + + @CreateSqlObject + abstract MessagingServiceDAO messagingServiceDOA(); + + @CreateSqlObject + abstract EntityRelationshipDAO relationshipDAO(); + + @Transaction + public List list(String name) throws IOException { + return JsonUtils.readObjects(messagingServiceDOA().list(name), MessagingService.class); + } + + @Transaction + public MessagingService get(String id) throws IOException { + return EntityUtil.validate(id, messagingServiceDOA().findById(id), MessagingService.class); + } + + @Transaction + public MessagingService getByName(String name) throws IOException { + return EntityUtil.validate(name, messagingServiceDOA().findByName(name), MessagingService.class); + } + + @Transaction + public MessagingService create(MessagingService messagingService) throws JsonProcessingException { + // Validate fields + validateIngestionSchedule(messagingService.getIngestionSchedule()); + messagingServiceDOA().insert(JsonUtils.pojoToJson(messagingService)); + return messagingService; + } + + public MessagingService update(String id, String description, Schedule ingestionSchedule) + throws IOException { + validateIngestionSchedule(ingestionSchedule); + MessagingService dbService = EntityUtil.validate(id, messagingServiceDOA().findById(id), MessagingService.class); + // Update fields + dbService.withDescription(description).withIngestionSchedule(ingestionSchedule); + messagingServiceDOA().update(id, JsonUtils.pojoToJson(dbService)); + return dbService; + } + + @Transaction + public void delete(String id) { + if (messagingServiceDOA().delete(id) <= 0) { + throw EntityNotFoundException.byMessage(entityNotFound(Entity.MESSAGING_SERVICE, id)); + } + relationshipDAO().deleteAll(id); + } + + private void validateIngestionSchedule(Schedule ingestion) { + if (ingestion == null) { + return; + } + String duration = ingestion.getRepeatFrequency(); + + // ISO8601 duration format is P{y}Y{m}M{d}DT{h}H{m}M{s}S. + String[] splits = duration.split("T"); + if (splits[0].contains("Y") || splits[0].contains("M") || + (splits.length == 2 && splits[1].contains("S"))) { + throw new IllegalArgumentException("Ingestion repeatFrequency can only contain Days, Hours, and Minutes - " + + "example P{d}DT{h}H{m}M"); + } + + Period period; + try { + period = ISOPeriodFormat.standard().parsePeriod(duration); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Invalid ingestion repeatFrequency " + duration, e); + } + if (period.toStandardMinutes().getMinutes() < 60) { + throw new IllegalArgumentException("Ingestion repeatFrequency is too short and must be more than 60 minutes"); + } + } + + public interface MessagingServiceDAO { + @SqlUpdate("INSERT INTO messaging_service_entity (json) VALUES (:json)") + void insert(@Bind("json") String json); + + @SqlUpdate("UPDATE messaging_service_entity SET json = :json where id = :id") + void update(@Bind("id") String id, @Bind("json") String json); + + @SqlQuery("SELECT json FROM messaging_service_entity WHERE id = :id") + String findById(@Bind("id") String id); + + @SqlQuery("SELECT json FROM messaging_service_entity WHERE name = :name") + String findByName(@Bind("name") String name); + + @SqlQuery("SELECT json FROM messaging_service_entity WHERE (name = :name OR :name is NULL)") + List list(@Bind("name") String name); + + @SqlUpdate("DELETE FROM messaging_service_entity WHERE id = :id") + int delete(@Bind("id") String id); + } +} \ No newline at end of file diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/messaging/MessagingServiceResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/messaging/MessagingServiceResource.java new file mode 100644 index 00000000000..4e64ebdc484 --- /dev/null +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/messaging/MessagingServiceResource.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.catalog.resources.services.messaging; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.inject.Inject; +import io.swagger.annotations.Api; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.media.Content; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import org.openmetadata.catalog.api.services.CreateMessagingService; +import org.openmetadata.catalog.api.services.UpdateMessagingService; +import org.openmetadata.catalog.entity.data.Dashboard; +import org.openmetadata.catalog.entity.services.MessagingService; +import org.openmetadata.catalog.jdbi3.MessagingServiceRepository; +import org.openmetadata.catalog.resources.Collection; +import org.openmetadata.catalog.security.CatalogAuthorizer; +import org.openmetadata.catalog.security.SecurityUtil; +import org.openmetadata.catalog.type.EntityReference; +import org.openmetadata.catalog.util.RestUtil; +import org.openmetadata.catalog.util.ResultList; + +import javax.validation.Valid; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.SecurityContext; +import javax.ws.rs.core.UriInfo; +import java.io.IOException; +import java.util.List; +import java.util.Objects; +import java.util.UUID; + +@Path("/v1/services/messagingServices") +@Api(value = "Messaging service collection", tags = "Services -> Messaging service collection") +@Produces(MediaType.APPLICATION_JSON) +@Consumes(MediaType.APPLICATION_JSON) +@Collection(name = "messagingServices", repositoryClass = "org.openmetadata.catalog.jdbi3.MessagingServiceRepository") +public class MessagingServiceResource { + private final MessagingServiceRepository dao; + private final CatalogAuthorizer authorizer; + + public static EntityReference addHref(UriInfo uriInfo, EntityReference service) { + return service.withHref(RestUtil.getHref(uriInfo, "v1/services/messagingServices/", service.getId())); + } + + private static List addHref(UriInfo uriInfo, List instances) { + instances.forEach(i -> addHref(uriInfo, i)); + return instances; + } + + private static MessagingService addHref(UriInfo uriInfo, MessagingService dbService) { + dbService.setHref(RestUtil.getHref(uriInfo, "v1/services/messagingServices/", dbService.getId())); + return dbService; + } + + @Inject + public MessagingServiceResource(MessagingServiceRepository dao, CatalogAuthorizer authorizer) { + Objects.requireNonNull(dao, "MessagingServiceRepository must not be null"); + this.dao = dao; + this.authorizer = authorizer; + } + + static class MessagingServiceList extends ResultList { + MessagingServiceList(List data) { + super(data); + } + } + + @GET + @Operation(summary = "List messaging services", tags = "services", + description = "Get a list of messaging services.", + responses = { + @ApiResponse(responseCode = "200", description = "List of messaging service instances", + content = @Content(mediaType = "application/json", + schema = @Schema(implementation = MessagingServiceList.class))) + }) + public MessagingServiceList list(@Context UriInfo uriInfo, @QueryParam("name") String name) throws IOException { + return new MessagingServiceList(addHref(uriInfo, dao.list(name))); + } + + @GET + @Path("/{id}") + @Operation(summary = "Get a messaging service", tags = "services", + description = "Get a messaging service by `id`.", + responses = { + @ApiResponse(responseCode = "200", description = "Messaging service instance", + content = @Content(mediaType = "application/json", + schema = @Schema(implementation = Dashboard.class))), + @ApiResponse(responseCode = "404", description = "Messaging service for instance {id} is not found") + }) + public MessagingService get(@Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @PathParam("id") String id) throws IOException { + return addHref(uriInfo, dao.get(id)); + } + + @GET + @Path("/name/{name}") + @Operation(summary = "Get messaging service by name", tags = "services", + description = "Get a messaging service by the service `name`.", + responses = { + @ApiResponse(responseCode = "200", description = "Messaging service instance", + content = @Content(mediaType = "application/json", + schema = @Schema(implementation = Dashboard.class))), + @ApiResponse(responseCode = "404", description = "Messaging service for instance {id} is not found") + }) + public MessagingService getByName(@Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @PathParam("name") String name) throws IOException { + return addHref(uriInfo, dao.getByName(name)); + } + + @POST + @Operation(summary = "Create a messaging service", tags = "services", + description = "Create a new messaging service.", + responses = { + @ApiResponse(responseCode = "200", description = "Messaging service instance", + content = @Content(mediaType = "application/json", + schema = @Schema(implementation = MessagingService.class))), + @ApiResponse(responseCode = "400", description = "Bad request") + }) + public Response create(@Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Valid CreateMessagingService create) throws JsonProcessingException { + SecurityUtil.checkAdminOrBotRole(authorizer, securityContext); + MessagingService service = new MessagingService().withId(UUID.randomUUID()) + .withName(create.getName()).withDescription(create.getDescription()) + .withServiceType(create.getServiceType()) + .withIngestionSchedule(create.getIngestionSchedule()); + + addHref(uriInfo, dao.create(service)); + return Response.created(service.getHref()).entity(service).build(); + } + + @PUT + @Path("/{id}") + @Operation(summary = "Update a messaging service", tags = "services", + description = "Update an existing messaging service identified by `id`.", + responses = { + @ApiResponse(responseCode = "200", description = "Messaging service instance", + content = @Content(mediaType = "application/json", + schema = @Schema(implementation = MessagingService.class))), + @ApiResponse(responseCode = "400", description = "Bad request") + }) + public Response update(@Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Parameter(description = "Id of the messaging service", schema = @Schema(type = "string")) + @PathParam("id") String id, + @Valid UpdateMessagingService update) throws IOException { + SecurityUtil.checkAdminOrBotRole(authorizer, securityContext); + MessagingService service = addHref(uriInfo, + dao.update(id, update.getDescription(), update.getIngestionSchedule())); + return Response.ok(service).build(); + } + + @DELETE + @Path("/{id}") + @Operation(summary = "Delete a database service", tags = "services", + description = "Delete a database services. If databases (and tables) belong the service, it can't be " + + "deleted.", + responses = { + @ApiResponse(responseCode = "200", description = "OK"), + @ApiResponse(responseCode = "404", description = "DatabaseService service for instance {id} " + + "is not found") + }) + public Response delete(@Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Parameter(description = "Id of the database service", schema = @Schema(type = "string")) + @PathParam("id") String id) { + SecurityUtil.checkAdminOrBotRole(authorizer, securityContext); + dao.delete(id); + return Response.ok().build(); + } +} diff --git a/catalog-rest-service/src/main/resources/json/schema/api/services/createMessagingService.json b/catalog-rest-service/src/main/resources/json/schema/api/services/createMessagingService.json new file mode 100644 index 00000000000..f7cf501a6d8 --- /dev/null +++ b/catalog-rest-service/src/main/resources/json/schema/api/services/createMessagingService.json @@ -0,0 +1,28 @@ +{ + "$id": "https://github.com/open-metadata/OpenMetadata/blob/main/catalog-rest-service/src/main/resources/json/schema/api/services/createMessagingService.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Create Messaging service entity request", + "description": "Create Messaging service entity request", + "type": "object", + + "properties" : { + "name": { + "description": "Name that identifies the this entity instance uniquely", + "type": "string", + "minLength": 1, + "maxLength": 64 + }, + "description": { + "description": "Description of messaging service entity.", + "type": "string" + }, + "serviceType": { + "$ref": "../../entity/services/messagingService.json#/definitions/messagingServiceType" + }, + "ingestionSchedule" : { + "description": "Schedule for running metadata ingestion jobs", + "$ref" : "../../type/schedule.json" + } + }, + "required": ["name", "serviceType"] +} diff --git a/catalog-rest-service/src/main/resources/json/schema/api/services/updateDatabaseService.json b/catalog-rest-service/src/main/resources/json/schema/api/services/updateDatabaseService.json index 5fcecf8a167..5fd5a44a3c5 100644 --- a/catalog-rest-service/src/main/resources/json/schema/api/services/updateDatabaseService.json +++ b/catalog-rest-service/src/main/resources/json/schema/api/services/updateDatabaseService.json @@ -2,12 +2,12 @@ "$id": "https://github.com/open-metadata/OpenMetadata/blob/main/catalog-rest-service/src/main/resources/json/schema/api/services/updateDatabaseService.json", "$schema": "http://json-schema.org/draft-07/schema#", "title": "Update Database service entity request", - "description": "Udpate Database service entity request", + "description": "Update Database service entity request", "type": "object", "properties" : { "description": { - "description": "Description of Database entity.", + "description": "Description of Database service entity.", "type": "string" }, "jdbc": { diff --git a/catalog-rest-service/src/main/resources/json/schema/api/services/updateMessagingService.json b/catalog-rest-service/src/main/resources/json/schema/api/services/updateMessagingService.json new file mode 100644 index 00000000000..db32e308a27 --- /dev/null +++ b/catalog-rest-service/src/main/resources/json/schema/api/services/updateMessagingService.json @@ -0,0 +1,18 @@ +{ + "$id": "https://github.com/open-metadata/OpenMetadata/blob/main/catalog-rest-service/src/main/resources/json/schema/api/services/updateMessagingService.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Update Messaging service entity request", + "description": "Update Messaging service entity request", + "type": "object", + + "properties" : { + "description": { + "description": "Description of Messaging service entity.", + "type": "string" + }, + "ingestionSchedule" : { + "description": "Schedule for running metadata ingestion jobs", + "$ref" : "../../type/schedule.json" + } + } +} diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/messagingService.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/messagingService.json new file mode 100644 index 00000000000..5fe68805a95 --- /dev/null +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/messagingService.json @@ -0,0 +1,59 @@ +{ + "$id": "https://open-metadata.org/schema/entity/services/messagingService.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Messaging Service", + "description": "This schema defines the Messaging Service entity, such as Kafka and Pulsar.", + "type": "object", + "definitions": { + "messagingServiceType": { + "description": "Type of messaging service - Kafka or Pulsar", + "type": "string", + "enum": [ + "Kafka", + "Pulsar" + ], + "javaEnums": [ + { + "name": "Kafka" + }, + { + "name": "Pulsar" + } + ] + } + }, + "properties": { + "id": { + "description": "Unique identifier of this messaging service instance.", + "$ref": "../../type/basic.json#/definitions/uuid" + }, + "name": { + "description": "Name that identifies this messaging service.", + "type": "string", + "minLength": 1, + "maxLength": 64 + }, + "serviceType": { + "description": "Type of messaging service such as Kafka or Pulsar...", + "$ref": "#/definitions/messagingServiceType" + }, + "description": { + "description": "Description of a messaging service instance.", + "type": "string" + }, + "href": { + "description": "Link to the resource corresponding to this messaging service.", + "$ref": "../../type/basic.json#/definitions/href" + }, + "ingestionSchedule": { + "description": "Schedule for running metadata ingestion jobs.", + "$ref": "../../type/schedule.json" + } + }, + "required": [ + "id", + "name", + "serviceType", + "href" + ] +} diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/DatabaseServiceResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/DatabaseServiceResourceTest.java index 501c9312c85..d2ec1604b1e 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/DatabaseServiceResourceTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/DatabaseServiceResourceTest.java @@ -215,7 +215,7 @@ public class DatabaseServiceResourceTest extends CatalogApplicationTest { } @Test - public void get_nonExistentTeam_404_notFound() { + public void get_nonExistentDatabaseService_404_notFound() { HttpResponseException exception = assertThrows(HttpResponseException.class, () -> getService(TestUtils.NON_EXISTENT_ENTITY, adminAuthHeaders())); TestUtils.assertResponse(exception, NOT_FOUND, CatalogExceptionMessage.entityNotFound(Entity.DATABASE_SERVICE, @@ -223,7 +223,7 @@ public class DatabaseServiceResourceTest extends CatalogApplicationTest { } @Test - public void get_nonExistentTeamByName_404_notFound() { + public void get_nonExistentDatabaseServiceByName_404_notFound() { HttpResponseException exception = assertThrows(HttpResponseException.class, () -> getServiceByName("invalidName", null, adminAuthHeaders())); TestUtils.assertResponse(exception, NOT_FOUND, CatalogExceptionMessage.entityNotFound(Entity.DATABASE_SERVICE, @@ -297,7 +297,7 @@ public class DatabaseServiceResourceTest extends CatalogApplicationTest { } @Test - public void delete_ExistentDatabaseService_ad_admin_200(TestInfo test) throws HttpResponseException { + public void delete_ExistentDatabaseService_as_admin_200(TestInfo test) throws HttpResponseException { Map authHeaders = adminAuthHeaders(); DatabaseService databaseService = createService(create(test), authHeaders); deleteService(databaseService.getId(), databaseService.getName(), authHeaders); diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/MessagingServiceResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/MessagingServiceResourceTest.java new file mode 100644 index 00000000000..cb5cf28f68e --- /dev/null +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/MessagingServiceResourceTest.java @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.catalog.resources.services; + +import org.apache.http.client.HttpResponseException; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.openmetadata.catalog.CatalogApplicationTest; +import org.openmetadata.catalog.Entity; +import org.openmetadata.catalog.api.services.CreateMessagingService; +import org.openmetadata.catalog.api.services.CreateMessagingService.MessagingServiceType; +import org.openmetadata.catalog.api.services.UpdateMessagingService; +import org.openmetadata.catalog.entity.services.MessagingService; +import org.openmetadata.catalog.exception.CatalogExceptionMessage; +import org.openmetadata.catalog.type.Schedule; +import org.openmetadata.catalog.util.RestUtil; +import org.openmetadata.catalog.util.TestUtils; + +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Response.Status; +import java.util.Date; +import java.util.Map; +import java.util.UUID; + +import static javax.ws.rs.core.Response.Status.BAD_REQUEST; +import static javax.ws.rs.core.Response.Status.CONFLICT; +import static javax.ws.rs.core.Response.Status.FORBIDDEN; +import static javax.ws.rs.core.Response.Status.NOT_FOUND; +import static javax.ws.rs.core.Response.Status.OK; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.openmetadata.catalog.util.TestUtils.adminAuthHeaders; +import static org.openmetadata.catalog.util.TestUtils.authHeaders; + +public class MessagingServiceResourceTest extends CatalogApplicationTest { + @Test + public void post_serviceWithLongName_400_badRequest(TestInfo test) { + // Create messaging with mandatory name field empty + CreateMessagingService create = create(test).withName(TestUtils.LONG_ENTITY_NAME); + HttpResponseException exception = assertThrows(HttpResponseException.class, () -> + createService(create, adminAuthHeaders())); + TestUtils.assertResponse(exception, BAD_REQUEST, "[name size must be between 1 and 64]"); + } + + @Test + public void post_serviceWithoutName_400_badRequest(TestInfo test) { + // Create messaging with mandatory name field empty + CreateMessagingService create = create(test).withName(""); + HttpResponseException exception = assertThrows(HttpResponseException.class, () -> + createService(create, adminAuthHeaders())); + TestUtils.assertResponse(exception, BAD_REQUEST, "[name size must be between 1 and 64]"); + } + + @Test + public void post_serviceAlreadyExists_409(TestInfo test) throws HttpResponseException { + CreateMessagingService create = create(test); + createService(create, adminAuthHeaders()); + HttpResponseException exception = assertThrows(HttpResponseException.class, () -> + createService(create, adminAuthHeaders())); + TestUtils.assertResponse(exception, CONFLICT, CatalogExceptionMessage.ENTITY_ALREADY_EXISTS); + } + + @Test + public void post_validService_as_admin_200_ok(TestInfo test) throws HttpResponseException { + // Create messaging service with different optional fields + Map authHeaders = adminAuthHeaders(); + createAndCheckService(create(test, 1).withDescription(null), authHeaders); + createAndCheckService(create(test, 2).withDescription("description"), authHeaders); + createAndCheckService(create(test, 3).withIngestionSchedule(null), authHeaders); + } + + @Test + public void post_validService_as_non_admin_401(TestInfo test) { + // Create messaging service with different optional fields + Map authHeaders = authHeaders("test@open-metadata.org"); + + HttpResponseException exception = assertThrows(HttpResponseException.class, () -> + createAndCheckService(create(test, 1).withDescription(null), authHeaders)); + TestUtils.assertResponse(exception, FORBIDDEN, + "Principal: CatalogPrincipal{name='test'} is not admin"); + } + + @Test + public void post_invalidIngestionSchedule_4xx(TestInfo test) { + // No jdbc connection set + CreateMessagingService create = create(test); + Schedule schedule = create.getIngestionSchedule(); + + // Invalid format + create.withIngestionSchedule(schedule.withRepeatFrequency("INVALID")); + HttpResponseException exception = assertThrows(HttpResponseException.class, () -> + createService(create, adminAuthHeaders())); + TestUtils.assertResponse(exception, BAD_REQUEST, "Invalid ingestion repeatFrequency INVALID"); + + // Duration that contains years, months and seconds are not allowed + create.withIngestionSchedule(schedule.withRepeatFrequency("P1Y")); + exception = assertThrows(HttpResponseException.class, () -> createService(create, adminAuthHeaders())); + TestUtils.assertResponse(exception, BAD_REQUEST, + "Ingestion repeatFrequency can only contain Days, Hours, " + + "and Minutes - example P{d}DT{h}H{m}M"); + + create.withIngestionSchedule(schedule.withRepeatFrequency("P1M")); + exception = assertThrows(HttpResponseException.class, () -> createService(create, adminAuthHeaders())); + TestUtils.assertResponse(exception, BAD_REQUEST, + "Ingestion repeatFrequency can only contain Days, Hours, " + + "and Minutes - example P{d}DT{h}H{m}M"); + + create.withIngestionSchedule(schedule.withRepeatFrequency("PT1S")); + exception = assertThrows(HttpResponseException.class, () -> createService(create, adminAuthHeaders())); + TestUtils.assertResponse(exception, BAD_REQUEST, + "Ingestion repeatFrequency can only contain Days, Hours, " + + "and Minutes - example P{d}DT{h}H{m}M"); + } + + @Test + public void post_validIngestionSchedules_as_admin_200(TestInfo test) throws HttpResponseException { + Schedule schedule = new Schedule().withStartDate(RestUtil.DATE_TIME_FORMAT.format(new Date())); + schedule.withRepeatFrequency("PT60M"); // Repeat every 60M should be valid + createAndCheckService(create(test, 1).withIngestionSchedule(schedule), adminAuthHeaders()); + + schedule.withRepeatFrequency("PT1H49M"); + createAndCheckService(create(test, 2).withIngestionSchedule(schedule), adminAuthHeaders()); + + schedule.withRepeatFrequency("P1DT1H49M"); + createAndCheckService(create(test, 3).withIngestionSchedule(schedule), adminAuthHeaders()); + } + + @Test + public void post_ingestionScheduleIsTooShort_4xx(TestInfo test) { + // No jdbc connection set + CreateMessagingService create = create(test); + Schedule schedule = create.getIngestionSchedule(); + create.withIngestionSchedule(schedule.withRepeatFrequency("PT1M")); // Repeat every 0 seconds + HttpResponseException exception = assertThrows(HttpResponseException.class, () -> + createService(create, adminAuthHeaders())); + TestUtils.assertResponseContains(exception, BAD_REQUEST, + "Ingestion repeatFrequency is too short and must be more than 60 minutes"); + + create.withIngestionSchedule(schedule.withRepeatFrequency("PT59M")); // Repeat every 50 minutes 59 seconds + exception = assertThrows(HttpResponseException.class, () -> createService(create, adminAuthHeaders())); + TestUtils.assertResponse(exception, BAD_REQUEST, "Ingestion repeatFrequency is too short and must " + + "be more than 60 minutes"); + } + + @Test + public void put_updateNonExistentService_404() { + // Update messaging description and ingestion service that are null + UpdateMessagingService update = new UpdateMessagingService().withDescription("description1"); + HttpResponseException exception = assertThrows(HttpResponseException.class, () + -> updateMessagingService(TestUtils.NON_EXISTENT_ENTITY.toString(), update, OK, adminAuthHeaders())); + TestUtils.assertResponse(exception, NOT_FOUND, CatalogExceptionMessage.entityNotFound("MessagingService", + TestUtils.NON_EXISTENT_ENTITY)); + } + + @Test + public void put_updateService_as_admin_2xx(TestInfo test) throws HttpResponseException { + MessagingService dbService = createAndCheckService(create(test).withDescription(null).withIngestionSchedule(null), + adminAuthHeaders()); + String id = dbService.getId().toString(); + String startDate = RestUtil.DATE_TIME_FORMAT.format(new Date()); + + // Update messaging description and ingestion service that are null + UpdateMessagingService update = new UpdateMessagingService().withDescription("description1"); + updateAndCheckService(id, update, OK, adminAuthHeaders()); + // Update ingestion schedule + Schedule schedule = new Schedule().withStartDate(startDate).withRepeatFrequency("P1D"); + update.withIngestionSchedule(schedule); + updateAndCheckService(id, update, OK, adminAuthHeaders()); + + // Update description and ingestion schedule again + update.withDescription("description1").withIngestionSchedule(schedule.withRepeatFrequency("PT1H")); + updateAndCheckService(id, update, OK, adminAuthHeaders()); + } + + @Test + public void put_update_as_non_admin_401(TestInfo test) throws HttpResponseException { + Map authHeaders = adminAuthHeaders(); + MessagingService dbService = createAndCheckService(create(test).withDescription(null).withIngestionSchedule(null), + authHeaders); + String id = dbService.getId().toString(); + RestUtil.DATE_TIME_FORMAT.format(new Date()); + + // Update messaging description and ingestion service that are null + UpdateMessagingService update = new UpdateMessagingService().withDescription("description1"); + + HttpResponseException exception = assertThrows(HttpResponseException.class, () -> + updateAndCheckService(id, update, OK, authHeaders("test@open-metadata.org"))); + TestUtils.assertResponse(exception, FORBIDDEN, "Principal: CatalogPrincipal{name='test'} " + + "is not admin"); + } + + @Test + public void get_nonExistentService_404_notFound() { + HttpResponseException exception = assertThrows(HttpResponseException.class, () -> + getService(TestUtils.NON_EXISTENT_ENTITY, adminAuthHeaders())); + TestUtils.assertResponse(exception, NOT_FOUND, CatalogExceptionMessage.entityNotFound(Entity.MESSAGING_SERVICE, + TestUtils.NON_EXISTENT_ENTITY)); + } + + @Test + public void get_nonExistentServiceByName_404_notFound() { + HttpResponseException exception = assertThrows(HttpResponseException.class, () + -> getServiceByName("invalidName", null, adminAuthHeaders())); + TestUtils.assertResponse(exception, NOT_FOUND, CatalogExceptionMessage.entityNotFound(Entity.MESSAGING_SERVICE, + "invalidName")); + } + + public static MessagingService createAndCheckService(CreateMessagingService create, + Map authHeaders) throws HttpResponseException { + MessagingService service = createService(create, authHeaders); + validateService(service, create.getName(), create.getDescription(), create.getIngestionSchedule()); + + // GET the newly created service and validate + MessagingService getService = getService(service.getId(), authHeaders); + validateService(getService, create.getName(), create.getDescription(), create.getIngestionSchedule()); + + // GET the newly created service by name and validate + getService = getServiceByName(service.getName(), null, authHeaders); + validateService(getService, create.getName(), create.getDescription(), create.getIngestionSchedule()); + return service; + } + + public static MessagingService createService(CreateMessagingService create, + Map authHeaders) throws HttpResponseException { + return TestUtils.post(CatalogApplicationTest.getResource("services/messagingServices"), + create, MessagingService.class, authHeaders); + } + + private static void validateService(MessagingService service, String expectedName, String expectedDescription, + Schedule expectedIngestion) { + assertNotNull(service.getId()); + assertNotNull(service.getHref()); + assertEquals(expectedName, service.getName()); + assertEquals(expectedDescription, service.getDescription()); + + if (expectedIngestion != null) { + assertEquals(expectedIngestion.getStartDate(), service.getIngestionSchedule().getStartDate()); + assertEquals(expectedIngestion.getRepeatFrequency(), service.getIngestionSchedule().getRepeatFrequency()); + } + } + + public static MessagingService getService(UUID id, Map authHeaders) throws HttpResponseException { + return getService(id, null, authHeaders); + } + + public static MessagingService getService(UUID id, String fields, Map authHeaders) + throws HttpResponseException { + WebTarget target = CatalogApplicationTest.getResource("services/messagingServices/" + id); + target = fields != null ? target.queryParam("fields", fields) : target; + return TestUtils.get(target, MessagingService.class, authHeaders); + } + + public static MessagingService getServiceByName(String name, String fields, Map authHeaders) + throws HttpResponseException { + WebTarget target = CatalogApplicationTest.getResource("services/messagingServices/name/" + name); + target = fields != null ? target.queryParam("fields", fields) : target; + return TestUtils.get(target, MessagingService.class, authHeaders); + } + + public static String getName(TestInfo test) { + return String.format("dbservice_%s", test.getDisplayName()); + } + + public static String getName(TestInfo test, int index) { + return String.format("dbservice_%d_%s", index, test.getDisplayName()); + } + + @Test + public void delete_ExistentMessagingService_as_admin_200(TestInfo test) throws HttpResponseException { + Map authHeaders = adminAuthHeaders(); + MessagingService messagingService = createService(create(test), authHeaders); + deleteService(messagingService.getId(), messagingService.getName(), authHeaders); + } + + @Test + public void delete_as_user_401(TestInfo test) throws HttpResponseException { + Map authHeaders = adminAuthHeaders(); + MessagingService messagingService = createService(create(test), authHeaders); + HttpResponseException exception = assertThrows(HttpResponseException.class, () -> + deleteService(messagingService.getId(), messagingService.getName(), + authHeaders("test@open-metadata.org"))); + TestUtils.assertResponse(exception, FORBIDDEN, + "Principal: CatalogPrincipal{name='test'} is not admin"); + } + + @Test + public void delete_notExistentMessagingService() { + HttpResponseException exception = assertThrows(HttpResponseException.class, () -> + getService(TestUtils.NON_EXISTENT_ENTITY, adminAuthHeaders())); + TestUtils.assertResponse(exception, NOT_FOUND, + CatalogExceptionMessage.entityNotFound(Entity.MESSAGING_SERVICE, TestUtils.NON_EXISTENT_ENTITY)); + } + + private void deleteService(UUID id, String name, Map authHeaders) throws HttpResponseException { + TestUtils.delete(CatalogApplicationTest.getResource("services/messagingServices/" + id), authHeaders); + + // Ensure deleted service does not exist + HttpResponseException exception = assertThrows(HttpResponseException.class, () -> getService(id, authHeaders)); + TestUtils.assertResponse(exception, NOT_FOUND, CatalogExceptionMessage.entityNotFound(Entity.MESSAGING_SERVICE, id)); + + // Ensure deleted service does not exist when getting by name + exception = assertThrows(HttpResponseException.class, () -> getServiceByName(name, null, authHeaders)); + TestUtils.assertResponse(exception, NOT_FOUND, + CatalogExceptionMessage.entityNotFound(Entity.MESSAGING_SERVICE, name)); + } + + public static CreateMessagingService create(TestInfo test) { + String startDate = RestUtil.DATE_TIME_FORMAT.format(new Date()); + return new CreateMessagingService().withName(getName(test)).withServiceType(MessagingServiceType.Kafka) + .withIngestionSchedule(new Schedule().withStartDate(startDate).withRepeatFrequency("P1D")); + } + + private static CreateMessagingService create(TestInfo test, int index) { + return new CreateMessagingService().withName(getName(test, index)).withServiceType(MessagingServiceType.Pulsar); + } + + public static void updateAndCheckService(String id, UpdateMessagingService update, Status status, + Map authHeaders) throws HttpResponseException { + MessagingService service = updateMessagingService(id, update, status, authHeaders); + validateService(service, service.getName(), update.getDescription(), update.getIngestionSchedule()); + + // GET the newly updated messaging and validate + MessagingService getService = getService(service.getId(), authHeaders); + validateService(getService, service.getName(), update.getDescription(), update.getIngestionSchedule()); + + // GET the newly updated messaging by name and validate + getService = getServiceByName(service.getName(), null, authHeaders); + validateService(getService, service.getName(), update.getDescription(), update.getIngestionSchedule()); + } + + public static MessagingService updateMessagingService(String id, UpdateMessagingService updated, + Status status, Map authHeaders) + throws HttpResponseException { + return TestUtils.put(CatalogApplicationTest.getResource("services/messagingServices/" + id), updated, + MessagingService.class, status, authHeaders); + } +}