Fix #218 Add support for messaging service

This commit is contained in:
sureshms 2021-08-17 14:59:37 -07:00
parent c84a2a7d36
commit 701b611ae8
10 changed files with 809 additions and 5 deletions

View File

@ -61,6 +61,16 @@ CREATE TABLE IF NOT EXISTS dbservice_entity (
UNIQUE KEY unique_name(name)
);
CREATE TABLE IF NOT EXISTS messaging_service_entity (
id VARCHAR(36) GENERATED ALWAYS AS (json ->> '$.id') STORED NOT NULL,
name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL,
serviceType VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.serviceType') NOT NULL,
json JSON NOT NULL,
timestamp BIGINT,
PRIMARY KEY (id),
UNIQUE KEY unique_name(name)
);
--
-- Data entities
--

View File

@ -21,6 +21,7 @@ public final class Entity {
// Services
public static final String DATABASE_SERVICE = "databaseService";
public static final String MESSAGING_SERVICE = "messagingService";
// Data assets
public static final String TABLE = "table";

View File

@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.jdbi3;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.joda.time.Period;
import org.joda.time.format.ISOPeriodFormat;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.entity.services.MessagingService;
import org.openmetadata.catalog.exception.EntityNotFoundException;
import org.openmetadata.catalog.type.Schedule;
import org.openmetadata.catalog.util.EntityUtil;
import org.openmetadata.catalog.util.JsonUtils;
import org.skife.jdbi.v2.sqlobject.Bind;
import org.skife.jdbi.v2.sqlobject.CreateSqlObject;
import org.skife.jdbi.v2.sqlobject.SqlQuery;
import org.skife.jdbi.v2.sqlobject.SqlUpdate;
import org.skife.jdbi.v2.sqlobject.Transaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound;
public abstract class MessagingServiceRepository {
private static final Logger LOG = LoggerFactory.getLogger(MessagingServiceRepository.class);
@CreateSqlObject
abstract MessagingServiceDAO messagingServiceDOA();
@CreateSqlObject
abstract EntityRelationshipDAO relationshipDAO();
@Transaction
public List<MessagingService> list(String name) throws IOException {
return JsonUtils.readObjects(messagingServiceDOA().list(name), MessagingService.class);
}
@Transaction
public MessagingService get(String id) throws IOException {
return EntityUtil.validate(id, messagingServiceDOA().findById(id), MessagingService.class);
}
@Transaction
public MessagingService getByName(String name) throws IOException {
return EntityUtil.validate(name, messagingServiceDOA().findByName(name), MessagingService.class);
}
@Transaction
public MessagingService create(MessagingService messagingService) throws JsonProcessingException {
// Validate fields
validateIngestionSchedule(messagingService.getIngestionSchedule());
messagingServiceDOA().insert(JsonUtils.pojoToJson(messagingService));
return messagingService;
}
public MessagingService update(String id, String description, Schedule ingestionSchedule)
throws IOException {
validateIngestionSchedule(ingestionSchedule);
MessagingService dbService = EntityUtil.validate(id, messagingServiceDOA().findById(id), MessagingService.class);
// Update fields
dbService.withDescription(description).withIngestionSchedule(ingestionSchedule);
messagingServiceDOA().update(id, JsonUtils.pojoToJson(dbService));
return dbService;
}
@Transaction
public void delete(String id) {
if (messagingServiceDOA().delete(id) <= 0) {
throw EntityNotFoundException.byMessage(entityNotFound(Entity.MESSAGING_SERVICE, id));
}
relationshipDAO().deleteAll(id);
}
private void validateIngestionSchedule(Schedule ingestion) {
if (ingestion == null) {
return;
}
String duration = ingestion.getRepeatFrequency();
// ISO8601 duration format is P{y}Y{m}M{d}DT{h}H{m}M{s}S.
String[] splits = duration.split("T");
if (splits[0].contains("Y") || splits[0].contains("M") ||
(splits.length == 2 && splits[1].contains("S"))) {
throw new IllegalArgumentException("Ingestion repeatFrequency can only contain Days, Hours, and Minutes - " +
"example P{d}DT{h}H{m}M");
}
Period period;
try {
period = ISOPeriodFormat.standard().parsePeriod(duration);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid ingestion repeatFrequency " + duration, e);
}
if (period.toStandardMinutes().getMinutes() < 60) {
throw new IllegalArgumentException("Ingestion repeatFrequency is too short and must be more than 60 minutes");
}
}
public interface MessagingServiceDAO {
@SqlUpdate("INSERT INTO messaging_service_entity (json) VALUES (:json)")
void insert(@Bind("json") String json);
@SqlUpdate("UPDATE messaging_service_entity SET json = :json where id = :id")
void update(@Bind("id") String id, @Bind("json") String json);
@SqlQuery("SELECT json FROM messaging_service_entity WHERE id = :id")
String findById(@Bind("id") String id);
@SqlQuery("SELECT json FROM messaging_service_entity WHERE name = :name")
String findByName(@Bind("name") String name);
@SqlQuery("SELECT json FROM messaging_service_entity WHERE (name = :name OR :name is NULL)")
List<String> list(@Bind("name") String name);
@SqlUpdate("DELETE FROM messaging_service_entity WHERE id = :id")
int delete(@Bind("id") String id);
}
}

View File

@ -0,0 +1,200 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.resources.services.messaging;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.inject.Inject;
import io.swagger.annotations.Api;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import org.openmetadata.catalog.api.services.CreateMessagingService;
import org.openmetadata.catalog.api.services.UpdateMessagingService;
import org.openmetadata.catalog.entity.data.Dashboard;
import org.openmetadata.catalog.entity.services.MessagingService;
import org.openmetadata.catalog.jdbi3.MessagingServiceRepository;
import org.openmetadata.catalog.resources.Collection;
import org.openmetadata.catalog.security.CatalogAuthorizer;
import org.openmetadata.catalog.security.SecurityUtil;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.util.RestUtil;
import org.openmetadata.catalog.util.ResultList;
import javax.validation.Valid;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.SecurityContext;
import javax.ws.rs.core.UriInfo;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
@Path("/v1/services/messagingServices")
@Api(value = "Messaging service collection", tags = "Services -> Messaging service collection")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@Collection(name = "messagingServices", repositoryClass = "org.openmetadata.catalog.jdbi3.MessagingServiceRepository")
public class MessagingServiceResource {
private final MessagingServiceRepository dao;
private final CatalogAuthorizer authorizer;
public static EntityReference addHref(UriInfo uriInfo, EntityReference service) {
return service.withHref(RestUtil.getHref(uriInfo, "v1/services/messagingServices/", service.getId()));
}
private static List<MessagingService> addHref(UriInfo uriInfo, List<MessagingService> instances) {
instances.forEach(i -> addHref(uriInfo, i));
return instances;
}
private static MessagingService addHref(UriInfo uriInfo, MessagingService dbService) {
dbService.setHref(RestUtil.getHref(uriInfo, "v1/services/messagingServices/", dbService.getId()));
return dbService;
}
@Inject
public MessagingServiceResource(MessagingServiceRepository dao, CatalogAuthorizer authorizer) {
Objects.requireNonNull(dao, "MessagingServiceRepository must not be null");
this.dao = dao;
this.authorizer = authorizer;
}
static class MessagingServiceList extends ResultList<MessagingService> {
MessagingServiceList(List<MessagingService> data) {
super(data);
}
}
@GET
@Operation(summary = "List messaging services", tags = "services",
description = "Get a list of messaging services.",
responses = {
@ApiResponse(responseCode = "200", description = "List of messaging service instances",
content = @Content(mediaType = "application/json",
schema = @Schema(implementation = MessagingServiceList.class)))
})
public MessagingServiceList list(@Context UriInfo uriInfo, @QueryParam("name") String name) throws IOException {
return new MessagingServiceList(addHref(uriInfo, dao.list(name)));
}
@GET
@Path("/{id}")
@Operation(summary = "Get a messaging service", tags = "services",
description = "Get a messaging service by `id`.",
responses = {
@ApiResponse(responseCode = "200", description = "Messaging service instance",
content = @Content(mediaType = "application/json",
schema = @Schema(implementation = Dashboard.class))),
@ApiResponse(responseCode = "404", description = "Messaging service for instance {id} is not found")
})
public MessagingService get(@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@PathParam("id") String id) throws IOException {
return addHref(uriInfo, dao.get(id));
}
@GET
@Path("/name/{name}")
@Operation(summary = "Get messaging service by name", tags = "services",
description = "Get a messaging service by the service `name`.",
responses = {
@ApiResponse(responseCode = "200", description = "Messaging service instance",
content = @Content(mediaType = "application/json",
schema = @Schema(implementation = Dashboard.class))),
@ApiResponse(responseCode = "404", description = "Messaging service for instance {id} is not found")
})
public MessagingService getByName(@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@PathParam("name") String name) throws IOException {
return addHref(uriInfo, dao.getByName(name));
}
@POST
@Operation(summary = "Create a messaging service", tags = "services",
description = "Create a new messaging service.",
responses = {
@ApiResponse(responseCode = "200", description = "Messaging service instance",
content = @Content(mediaType = "application/json",
schema = @Schema(implementation = MessagingService.class))),
@ApiResponse(responseCode = "400", description = "Bad request")
})
public Response create(@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Valid CreateMessagingService create) throws JsonProcessingException {
SecurityUtil.checkAdminOrBotRole(authorizer, securityContext);
MessagingService service = new MessagingService().withId(UUID.randomUUID())
.withName(create.getName()).withDescription(create.getDescription())
.withServiceType(create.getServiceType())
.withIngestionSchedule(create.getIngestionSchedule());
addHref(uriInfo, dao.create(service));
return Response.created(service.getHref()).entity(service).build();
}
@PUT
@Path("/{id}")
@Operation(summary = "Update a messaging service", tags = "services",
description = "Update an existing messaging service identified by `id`.",
responses = {
@ApiResponse(responseCode = "200", description = "Messaging service instance",
content = @Content(mediaType = "application/json",
schema = @Schema(implementation = MessagingService.class))),
@ApiResponse(responseCode = "400", description = "Bad request")
})
public Response update(@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "Id of the messaging service", schema = @Schema(type = "string"))
@PathParam("id") String id,
@Valid UpdateMessagingService update) throws IOException {
SecurityUtil.checkAdminOrBotRole(authorizer, securityContext);
MessagingService service = addHref(uriInfo,
dao.update(id, update.getDescription(), update.getIngestionSchedule()));
return Response.ok(service).build();
}
@DELETE
@Path("/{id}")
@Operation(summary = "Delete a database service", tags = "services",
description = "Delete a database services. If databases (and tables) belong the service, it can't be " +
"deleted.",
responses = {
@ApiResponse(responseCode = "200", description = "OK"),
@ApiResponse(responseCode = "404", description = "DatabaseService service for instance {id} " +
"is not found")
})
public Response delete(@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "Id of the database service", schema = @Schema(type = "string"))
@PathParam("id") String id) {
SecurityUtil.checkAdminOrBotRole(authorizer, securityContext);
dao.delete(id);
return Response.ok().build();
}
}

View File

@ -0,0 +1,28 @@
{
"$id": "https://github.com/open-metadata/OpenMetadata/blob/main/catalog-rest-service/src/main/resources/json/schema/api/services/createMessagingService.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Create Messaging service entity request",
"description": "Create Messaging service entity request",
"type": "object",
"properties" : {
"name": {
"description": "Name that identifies the this entity instance uniquely",
"type": "string",
"minLength": 1,
"maxLength": 64
},
"description": {
"description": "Description of messaging service entity.",
"type": "string"
},
"serviceType": {
"$ref": "../../entity/services/messagingService.json#/definitions/messagingServiceType"
},
"ingestionSchedule" : {
"description": "Schedule for running metadata ingestion jobs",
"$ref" : "../../type/schedule.json"
}
},
"required": ["name", "serviceType"]
}

View File

@ -2,12 +2,12 @@
"$id": "https://github.com/open-metadata/OpenMetadata/blob/main/catalog-rest-service/src/main/resources/json/schema/api/services/updateDatabaseService.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Update Database service entity request",
"description": "Udpate Database service entity request",
"description": "Update Database service entity request",
"type": "object",
"properties" : {
"description": {
"description": "Description of Database entity.",
"description": "Description of Database service entity.",
"type": "string"
},
"jdbc": {

View File

@ -0,0 +1,18 @@
{
"$id": "https://github.com/open-metadata/OpenMetadata/blob/main/catalog-rest-service/src/main/resources/json/schema/api/services/updateMessagingService.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Update Messaging service entity request",
"description": "Update Messaging service entity request",
"type": "object",
"properties" : {
"description": {
"description": "Description of Messaging service entity.",
"type": "string"
},
"ingestionSchedule" : {
"description": "Schedule for running metadata ingestion jobs",
"$ref" : "../../type/schedule.json"
}
}
}

View File

@ -0,0 +1,59 @@
{
"$id": "https://open-metadata.org/schema/entity/services/messagingService.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Messaging Service",
"description": "This schema defines the Messaging Service entity, such as Kafka and Pulsar.",
"type": "object",
"definitions": {
"messagingServiceType": {
"description": "Type of messaging service - Kafka or Pulsar",
"type": "string",
"enum": [
"Kafka",
"Pulsar"
],
"javaEnums": [
{
"name": "Kafka"
},
{
"name": "Pulsar"
}
]
}
},
"properties": {
"id": {
"description": "Unique identifier of this messaging service instance.",
"$ref": "../../type/basic.json#/definitions/uuid"
},
"name": {
"description": "Name that identifies this messaging service.",
"type": "string",
"minLength": 1,
"maxLength": 64
},
"serviceType": {
"description": "Type of messaging service such as Kafka or Pulsar...",
"$ref": "#/definitions/messagingServiceType"
},
"description": {
"description": "Description of a messaging service instance.",
"type": "string"
},
"href": {
"description": "Link to the resource corresponding to this messaging service.",
"$ref": "../../type/basic.json#/definitions/href"
},
"ingestionSchedule": {
"description": "Schedule for running metadata ingestion jobs.",
"$ref": "../../type/schedule.json"
}
},
"required": [
"id",
"name",
"serviceType",
"href"
]
}

View File

@ -215,7 +215,7 @@ public class DatabaseServiceResourceTest extends CatalogApplicationTest {
}
@Test
public void get_nonExistentTeam_404_notFound() {
public void get_nonExistentDatabaseService_404_notFound() {
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
getService(TestUtils.NON_EXISTENT_ENTITY, adminAuthHeaders()));
TestUtils.assertResponse(exception, NOT_FOUND, CatalogExceptionMessage.entityNotFound(Entity.DATABASE_SERVICE,
@ -223,7 +223,7 @@ public class DatabaseServiceResourceTest extends CatalogApplicationTest {
}
@Test
public void get_nonExistentTeamByName_404_notFound() {
public void get_nonExistentDatabaseServiceByName_404_notFound() {
HttpResponseException exception = assertThrows(HttpResponseException.class, ()
-> getServiceByName("invalidName", null, adminAuthHeaders()));
TestUtils.assertResponse(exception, NOT_FOUND, CatalogExceptionMessage.entityNotFound(Entity.DATABASE_SERVICE,
@ -297,7 +297,7 @@ public class DatabaseServiceResourceTest extends CatalogApplicationTest {
}
@Test
public void delete_ExistentDatabaseService_ad_admin_200(TestInfo test) throws HttpResponseException {
public void delete_ExistentDatabaseService_as_admin_200(TestInfo test) throws HttpResponseException {
Map<String, String> authHeaders = adminAuthHeaders();
DatabaseService databaseService = createService(create(test), authHeaders);
deleteService(databaseService.getId(), databaseService.getName(), authHeaders);

View File

@ -0,0 +1,352 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.resources.services;
import org.apache.http.client.HttpResponseException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.openmetadata.catalog.CatalogApplicationTest;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.api.services.CreateMessagingService;
import org.openmetadata.catalog.api.services.CreateMessagingService.MessagingServiceType;
import org.openmetadata.catalog.api.services.UpdateMessagingService;
import org.openmetadata.catalog.entity.services.MessagingService;
import org.openmetadata.catalog.exception.CatalogExceptionMessage;
import org.openmetadata.catalog.type.Schedule;
import org.openmetadata.catalog.util.RestUtil;
import org.openmetadata.catalog.util.TestUtils;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response.Status;
import java.util.Date;
import java.util.Map;
import java.util.UUID;
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
import static javax.ws.rs.core.Response.Status.CONFLICT;
import static javax.ws.rs.core.Response.Status.FORBIDDEN;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static javax.ws.rs.core.Response.Status.OK;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.openmetadata.catalog.util.TestUtils.adminAuthHeaders;
import static org.openmetadata.catalog.util.TestUtils.authHeaders;
public class MessagingServiceResourceTest extends CatalogApplicationTest {
@Test
public void post_serviceWithLongName_400_badRequest(TestInfo test) {
// Create messaging with mandatory name field empty
CreateMessagingService create = create(test).withName(TestUtils.LONG_ENTITY_NAME);
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
createService(create, adminAuthHeaders()));
TestUtils.assertResponse(exception, BAD_REQUEST, "[name size must be between 1 and 64]");
}
@Test
public void post_serviceWithoutName_400_badRequest(TestInfo test) {
// Create messaging with mandatory name field empty
CreateMessagingService create = create(test).withName("");
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
createService(create, adminAuthHeaders()));
TestUtils.assertResponse(exception, BAD_REQUEST, "[name size must be between 1 and 64]");
}
@Test
public void post_serviceAlreadyExists_409(TestInfo test) throws HttpResponseException {
CreateMessagingService create = create(test);
createService(create, adminAuthHeaders());
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
createService(create, adminAuthHeaders()));
TestUtils.assertResponse(exception, CONFLICT, CatalogExceptionMessage.ENTITY_ALREADY_EXISTS);
}
@Test
public void post_validService_as_admin_200_ok(TestInfo test) throws HttpResponseException {
// Create messaging service with different optional fields
Map<String, String> authHeaders = adminAuthHeaders();
createAndCheckService(create(test, 1).withDescription(null), authHeaders);
createAndCheckService(create(test, 2).withDescription("description"), authHeaders);
createAndCheckService(create(test, 3).withIngestionSchedule(null), authHeaders);
}
@Test
public void post_validService_as_non_admin_401(TestInfo test) {
// Create messaging service with different optional fields
Map<String, String> authHeaders = authHeaders("test@open-metadata.org");
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
createAndCheckService(create(test, 1).withDescription(null), authHeaders));
TestUtils.assertResponse(exception, FORBIDDEN,
"Principal: CatalogPrincipal{name='test'} is not admin");
}
@Test
public void post_invalidIngestionSchedule_4xx(TestInfo test) {
// No jdbc connection set
CreateMessagingService create = create(test);
Schedule schedule = create.getIngestionSchedule();
// Invalid format
create.withIngestionSchedule(schedule.withRepeatFrequency("INVALID"));
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
createService(create, adminAuthHeaders()));
TestUtils.assertResponse(exception, BAD_REQUEST, "Invalid ingestion repeatFrequency INVALID");
// Duration that contains years, months and seconds are not allowed
create.withIngestionSchedule(schedule.withRepeatFrequency("P1Y"));
exception = assertThrows(HttpResponseException.class, () -> createService(create, adminAuthHeaders()));
TestUtils.assertResponse(exception, BAD_REQUEST,
"Ingestion repeatFrequency can only contain Days, Hours, " +
"and Minutes - example P{d}DT{h}H{m}M");
create.withIngestionSchedule(schedule.withRepeatFrequency("P1M"));
exception = assertThrows(HttpResponseException.class, () -> createService(create, adminAuthHeaders()));
TestUtils.assertResponse(exception, BAD_REQUEST,
"Ingestion repeatFrequency can only contain Days, Hours, " +
"and Minutes - example P{d}DT{h}H{m}M");
create.withIngestionSchedule(schedule.withRepeatFrequency("PT1S"));
exception = assertThrows(HttpResponseException.class, () -> createService(create, adminAuthHeaders()));
TestUtils.assertResponse(exception, BAD_REQUEST,
"Ingestion repeatFrequency can only contain Days, Hours, " +
"and Minutes - example P{d}DT{h}H{m}M");
}
@Test
public void post_validIngestionSchedules_as_admin_200(TestInfo test) throws HttpResponseException {
Schedule schedule = new Schedule().withStartDate(RestUtil.DATE_TIME_FORMAT.format(new Date()));
schedule.withRepeatFrequency("PT60M"); // Repeat every 60M should be valid
createAndCheckService(create(test, 1).withIngestionSchedule(schedule), adminAuthHeaders());
schedule.withRepeatFrequency("PT1H49M");
createAndCheckService(create(test, 2).withIngestionSchedule(schedule), adminAuthHeaders());
schedule.withRepeatFrequency("P1DT1H49M");
createAndCheckService(create(test, 3).withIngestionSchedule(schedule), adminAuthHeaders());
}
@Test
public void post_ingestionScheduleIsTooShort_4xx(TestInfo test) {
// No jdbc connection set
CreateMessagingService create = create(test);
Schedule schedule = create.getIngestionSchedule();
create.withIngestionSchedule(schedule.withRepeatFrequency("PT1M")); // Repeat every 0 seconds
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
createService(create, adminAuthHeaders()));
TestUtils.assertResponseContains(exception, BAD_REQUEST,
"Ingestion repeatFrequency is too short and must be more than 60 minutes");
create.withIngestionSchedule(schedule.withRepeatFrequency("PT59M")); // Repeat every 50 minutes 59 seconds
exception = assertThrows(HttpResponseException.class, () -> createService(create, adminAuthHeaders()));
TestUtils.assertResponse(exception, BAD_REQUEST, "Ingestion repeatFrequency is too short and must " +
"be more than 60 minutes");
}
@Test
public void put_updateNonExistentService_404() {
// Update messaging description and ingestion service that are null
UpdateMessagingService update = new UpdateMessagingService().withDescription("description1");
HttpResponseException exception = assertThrows(HttpResponseException.class, ()
-> updateMessagingService(TestUtils.NON_EXISTENT_ENTITY.toString(), update, OK, adminAuthHeaders()));
TestUtils.assertResponse(exception, NOT_FOUND, CatalogExceptionMessage.entityNotFound("MessagingService",
TestUtils.NON_EXISTENT_ENTITY));
}
@Test
public void put_updateService_as_admin_2xx(TestInfo test) throws HttpResponseException {
MessagingService dbService = createAndCheckService(create(test).withDescription(null).withIngestionSchedule(null),
adminAuthHeaders());
String id = dbService.getId().toString();
String startDate = RestUtil.DATE_TIME_FORMAT.format(new Date());
// Update messaging description and ingestion service that are null
UpdateMessagingService update = new UpdateMessagingService().withDescription("description1");
updateAndCheckService(id, update, OK, adminAuthHeaders());
// Update ingestion schedule
Schedule schedule = new Schedule().withStartDate(startDate).withRepeatFrequency("P1D");
update.withIngestionSchedule(schedule);
updateAndCheckService(id, update, OK, adminAuthHeaders());
// Update description and ingestion schedule again
update.withDescription("description1").withIngestionSchedule(schedule.withRepeatFrequency("PT1H"));
updateAndCheckService(id, update, OK, adminAuthHeaders());
}
@Test
public void put_update_as_non_admin_401(TestInfo test) throws HttpResponseException {
Map<String, String> authHeaders = adminAuthHeaders();
MessagingService dbService = createAndCheckService(create(test).withDescription(null).withIngestionSchedule(null),
authHeaders);
String id = dbService.getId().toString();
RestUtil.DATE_TIME_FORMAT.format(new Date());
// Update messaging description and ingestion service that are null
UpdateMessagingService update = new UpdateMessagingService().withDescription("description1");
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
updateAndCheckService(id, update, OK, authHeaders("test@open-metadata.org")));
TestUtils.assertResponse(exception, FORBIDDEN, "Principal: CatalogPrincipal{name='test'} " +
"is not admin");
}
@Test
public void get_nonExistentService_404_notFound() {
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
getService(TestUtils.NON_EXISTENT_ENTITY, adminAuthHeaders()));
TestUtils.assertResponse(exception, NOT_FOUND, CatalogExceptionMessage.entityNotFound(Entity.MESSAGING_SERVICE,
TestUtils.NON_EXISTENT_ENTITY));
}
@Test
public void get_nonExistentServiceByName_404_notFound() {
HttpResponseException exception = assertThrows(HttpResponseException.class, ()
-> getServiceByName("invalidName", null, adminAuthHeaders()));
TestUtils.assertResponse(exception, NOT_FOUND, CatalogExceptionMessage.entityNotFound(Entity.MESSAGING_SERVICE,
"invalidName"));
}
public static MessagingService createAndCheckService(CreateMessagingService create,
Map<String, String> authHeaders) throws HttpResponseException {
MessagingService service = createService(create, authHeaders);
validateService(service, create.getName(), create.getDescription(), create.getIngestionSchedule());
// GET the newly created service and validate
MessagingService getService = getService(service.getId(), authHeaders);
validateService(getService, create.getName(), create.getDescription(), create.getIngestionSchedule());
// GET the newly created service by name and validate
getService = getServiceByName(service.getName(), null, authHeaders);
validateService(getService, create.getName(), create.getDescription(), create.getIngestionSchedule());
return service;
}
public static MessagingService createService(CreateMessagingService create,
Map<String, String> authHeaders) throws HttpResponseException {
return TestUtils.post(CatalogApplicationTest.getResource("services/messagingServices"),
create, MessagingService.class, authHeaders);
}
private static void validateService(MessagingService service, String expectedName, String expectedDescription,
Schedule expectedIngestion) {
assertNotNull(service.getId());
assertNotNull(service.getHref());
assertEquals(expectedName, service.getName());
assertEquals(expectedDescription, service.getDescription());
if (expectedIngestion != null) {
assertEquals(expectedIngestion.getStartDate(), service.getIngestionSchedule().getStartDate());
assertEquals(expectedIngestion.getRepeatFrequency(), service.getIngestionSchedule().getRepeatFrequency());
}
}
public static MessagingService getService(UUID id, Map<String, String> authHeaders) throws HttpResponseException {
return getService(id, null, authHeaders);
}
public static MessagingService getService(UUID id, String fields, Map<String, String> authHeaders)
throws HttpResponseException {
WebTarget target = CatalogApplicationTest.getResource("services/messagingServices/" + id);
target = fields != null ? target.queryParam("fields", fields) : target;
return TestUtils.get(target, MessagingService.class, authHeaders);
}
public static MessagingService getServiceByName(String name, String fields, Map<String, String> authHeaders)
throws HttpResponseException {
WebTarget target = CatalogApplicationTest.getResource("services/messagingServices/name/" + name);
target = fields != null ? target.queryParam("fields", fields) : target;
return TestUtils.get(target, MessagingService.class, authHeaders);
}
public static String getName(TestInfo test) {
return String.format("dbservice_%s", test.getDisplayName());
}
public static String getName(TestInfo test, int index) {
return String.format("dbservice_%d_%s", index, test.getDisplayName());
}
@Test
public void delete_ExistentMessagingService_as_admin_200(TestInfo test) throws HttpResponseException {
Map<String, String> authHeaders = adminAuthHeaders();
MessagingService messagingService = createService(create(test), authHeaders);
deleteService(messagingService.getId(), messagingService.getName(), authHeaders);
}
@Test
public void delete_as_user_401(TestInfo test) throws HttpResponseException {
Map<String, String> authHeaders = adminAuthHeaders();
MessagingService messagingService = createService(create(test), authHeaders);
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
deleteService(messagingService.getId(), messagingService.getName(),
authHeaders("test@open-metadata.org")));
TestUtils.assertResponse(exception, FORBIDDEN,
"Principal: CatalogPrincipal{name='test'} is not admin");
}
@Test
public void delete_notExistentMessagingService() {
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
getService(TestUtils.NON_EXISTENT_ENTITY, adminAuthHeaders()));
TestUtils.assertResponse(exception, NOT_FOUND,
CatalogExceptionMessage.entityNotFound(Entity.MESSAGING_SERVICE, TestUtils.NON_EXISTENT_ENTITY));
}
private void deleteService(UUID id, String name, Map<String, String> authHeaders) throws HttpResponseException {
TestUtils.delete(CatalogApplicationTest.getResource("services/messagingServices/" + id), authHeaders);
// Ensure deleted service does not exist
HttpResponseException exception = assertThrows(HttpResponseException.class, () -> getService(id, authHeaders));
TestUtils.assertResponse(exception, NOT_FOUND, CatalogExceptionMessage.entityNotFound(Entity.MESSAGING_SERVICE, id));
// Ensure deleted service does not exist when getting by name
exception = assertThrows(HttpResponseException.class, () -> getServiceByName(name, null, authHeaders));
TestUtils.assertResponse(exception, NOT_FOUND,
CatalogExceptionMessage.entityNotFound(Entity.MESSAGING_SERVICE, name));
}
public static CreateMessagingService create(TestInfo test) {
String startDate = RestUtil.DATE_TIME_FORMAT.format(new Date());
return new CreateMessagingService().withName(getName(test)).withServiceType(MessagingServiceType.Kafka)
.withIngestionSchedule(new Schedule().withStartDate(startDate).withRepeatFrequency("P1D"));
}
private static CreateMessagingService create(TestInfo test, int index) {
return new CreateMessagingService().withName(getName(test, index)).withServiceType(MessagingServiceType.Pulsar);
}
public static void updateAndCheckService(String id, UpdateMessagingService update, Status status,
Map<String, String> authHeaders) throws HttpResponseException {
MessagingService service = updateMessagingService(id, update, status, authHeaders);
validateService(service, service.getName(), update.getDescription(), update.getIngestionSchedule());
// GET the newly updated messaging and validate
MessagingService getService = getService(service.getId(), authHeaders);
validateService(getService, service.getName(), update.getDescription(), update.getIngestionSchedule());
// GET the newly updated messaging by name and validate
getService = getServiceByName(service.getName(), null, authHeaders);
validateService(getService, service.getName(), update.getDescription(), update.getIngestionSchedule());
}
public static MessagingService updateMessagingService(String id, UpdateMessagingService updated,
Status status, Map<String, String> authHeaders)
throws HttpResponseException {
return TestUtils.put(CatalogApplicationTest.getResource("services/messagingServices/" + id), updated,
MessagingService.class, status, authHeaders);
}
}