Fix #218 Add support for messaging service - update 2

This commit is contained in:
sureshms 2021-08-21 12:54:24 -07:00
parent af439cdaf0
commit 9ab269b74e
7 changed files with 184 additions and 53 deletions

View File

@ -230,8 +230,13 @@ public class TopicResource {
public Response create(@Context UriInfo uriInfo, @Context SecurityContext securityContext,
@Valid CreateTopic create) throws IOException {
SecurityUtil.checkAdminOrBotRole(authorizer, securityContext);
Topic topic = new Topic().withId(UUID.randomUUID()).withName(create.getName()).withService(create.getService())
.withDescription(create.getDescription());
Topic topic =
new Topic().withId(UUID.randomUUID()).withName(create.getName()).withDescription(create.getDescription())
.withService(create.getService()).withPartitions(create.getPartitions()).withSchema(create.getSchema())
.withSchemaType(create.getSchemaType()).withCleanupPolicies(create.getCleanupPolicies())
.withMaximumMessageSize(create.getMaximumMessageSize())
.withMinimumInSyncReplicas(create.getMinimumInSyncReplicas())
.withRetentionSize(create.getRetentionSize()).withRetentionTime(create.getRetentionTime());
topic = addHref(uriInfo, dao.create(topic, create.getService(), create.getOwner()));
return Response.created(topic.getHref()).entity(topic).build();
}

View File

@ -13,17 +13,54 @@
"description": "Description of the topic instance. What it has and how to use it.",
"type": "string"
},
"owner": {
"description": "Owner of this topic",
"$ref": "../../type/entityReference.json"
},
"service": {
"description": "Link to the messaging service where this topic is hosted in",
"$ref": "../../type/entityReference.json"
},
"partitions" : {
"description" : "Number of partitions into which the topic is divided.",
"type" : "integer",
"minimum": "1"
},
"schema" : {
"description" : "Schema used for message serialization. Optional as some topics may not have associated schemas.",
"type" : "string"
},
"schemaType" : {
"description" : "Schema used for message serialization.",
"$ref": "../../entity/data/topic.json#/definitions/schemaType"
},
"cleanupPolicies": {
"description": "Topic clean up policy. For Kafka - `cleanup.policy` configuration.",
"type": "array",
"items": {
"$ref" : "../../entity/data/topic.json#/definitions/cleanupPolicy"
}
},
"retentionTime": {
"description": "Retention time in milliseconds. For Kafka - `retention.ms` configuration.",
"type" : "number"
},
"maximumMessageSize" : {
"description": "Maximum message size in bytes. For Kafka - `max.message.bytes` configuration.",
"type" : "integer"
},
"minimumInSyncReplicas" : {
"description": "Minimum number replicas in sync to control durability. For Kafka - `min.insync.replicas` configuration."
},
"retentionSize": {
"description": "Maximum size of a partition in bytes before old data is discarded. For Kafka - `retention.bytes` configuration.",
"type" : "number",
"default" : "-1"
},
"owner": {
"description": "Owner of this topic",
"$ref": "../../type/entityReference.json"
}
},
"required": [
"name",
"service"
"service",
"partitions"
]
}

View File

@ -4,8 +4,7 @@
"title": "Create Messaging service entity request",
"description": "Create Messaging service entity request",
"type": "object",
"properties" : {
"properties": {
"name": {
"description": "Name that identifies the this entity instance uniquely",
"type": "string",
@ -19,10 +18,23 @@
"serviceType": {
"$ref": "../../entity/services/messagingService.json#/definitions/messagingServiceType"
},
"ingestionSchedule" : {
"brokers": {
"description": "Multiple bootstrap addresses for Kafka. Single proxy address for Pulsar.",
"$ref" : "../../entity/services/messagingService.json#/definitions/brokers"
},
"schemaRegistry": {
"description": "Schema registry URL",
"type": "string",
"format": "uri"
},
"ingestionSchedule": {
"description": "Schedule for running metadata ingestion jobs",
"$ref" : "../../type/schedule.json"
"$ref": "../../type/schedule.json"
}
},
"required": ["name", "serviceType"]
"required": [
"name",
"serviceType",
"brokers"
]
}

View File

@ -8,9 +8,42 @@
"topicName": {
"description": "Name that identifies a topic.",
"type": "string",
"javaType": "org.openmetadata.catalog.type.topic.TopicName",
"minLength": 1,
"maxLength": 64,
"pattern": "^[^.]*$"
},
"schemaType": {
"description": "Schema type used for the message.",
"javaType": "org.openmetadata.catalog.type.topic.SchemaType",
"enum" : [
"Avro",
"Protobuf",
"JSON",
"Other"
],
"javaEnums": [
{
"name": "Avro"
},
{
"name": "Protobuf"
},
{
"name": "JSON"
},
{
"name": "Other"
}
]
},
"cleanupPolicy": {
"javaType": "org.openmetadata.catalog.type.topic.CleanupPolicy",
"description": "Topic clean up policy. For Kafka - `cleanup.policy` configuration.",
"enum": [
"delete",
"compact"
]
}
},
"properties": {
@ -30,18 +63,28 @@
"description": "Description of the topic instance.",
"type": "string"
},
"service": {
"description": "Link to the messaging cluster/service where this topic is hosted in.",
"$ref": "../../type/entityReference.json"
},
"partitions" : {
"description" : "Number of partitions into which the topic is divided.",
"type" : "integer"
"type" : "integer",
"minimum": "1"
},
"cleanupPolicy": {
"description": "Topic clean up policy. For Kafka - `cleanup.policy` configuration.",
"schema" : {
"description" : "Schema used for message serialization. Optional as some topics may not have associated schemas.",
"type" : "string"
},
"schemaType" : {
"description" : "Schema used for message serialization.",
"$ref": "#/definitions/schemaType"
},
"cleanupPolicies": {
"description": "Topic clean up policies. For Kafka - `cleanup.policy` configuration.",
"type": "array",
"items": {
"enum": [
"delete",
"compact"
]
"$ref" : "#/definitions/cleanupPolicy"
}
},
"retentionTime": {
@ -50,13 +93,13 @@
},
"maximumMessageSize" : {
"description": "Maximum message size in bytes. For Kafka - `max.message.bytes` configuration.",
"type" : "int"
"type" : "integer"
},
"minimumInSyncReplicas" : {
"description": "Minimum number replicas in sync to control durability. For Kafka - `min.insync.replicas` configuration."
},
"retentionSize": {
"description": "Maximum size of a partion in bytes before old data is discarded. For Kafka - `retention.bytes` configuration.",
"description": "Maximum size of a partition in bytes before old data is discarded. For Kafka - `retention.bytes` configuration.",
"type" : "number",
"default" : "-1"
},
@ -64,17 +107,16 @@
"description": "Owner of this topic.",
"$ref": "../../type/entityReference.json"
},
"service": {
"description": "Link to the messaging cluster/service where this topic is hosted in.",
"$ref": "../../type/entityReference.json"
},
"href": {
"description": "Link to the resource corresponding to this entity.",
"$ref": "../../type/basic.json#/definitions/href"
}
},
"required": [
"id",
"name",
"fullyQualifiedName",
"partitions",
"service"
]
}

View File

@ -20,6 +20,14 @@
"name": "Pulsar"
}
]
},
"brokers": {
"description": "Multiple bootstrap addresses for Kafka. Single proxy address for Pulsar.",
"type": "array",
"items": {
"type": "string"
},
"default": null
}
},
"properties": {
@ -43,10 +51,7 @@
},
"brokers": {
"description": "Multiple bootstrap addresses for Kafka. Single proxy address for Pulsar.",
"type": "array",
"items": {
"type": "string"
}
"$ref" : "#/definitions/brokers"
},
"schemaRegistry" : {
"description": "Schema registry URL",
@ -66,6 +71,6 @@
"id",
"name",
"serviceType",
"href"
"brokers"
]
}

View File

@ -33,6 +33,7 @@ import org.openmetadata.catalog.util.TestUtils;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response.Status;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@ -58,12 +59,26 @@ public class MessagingServiceResourceTest extends CatalogApplicationTest {
}
@Test
public void post_serviceWithoutName_400_badRequest(TestInfo test) {
// Create messaging with mandatory name field empty
CreateMessagingService create = create(test).withName("");
public void post_withoutRequiredFields_400_badRequest(TestInfo test) {
// Create messaging with mandatory name field null
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
createService(create, adminAuthHeaders()));
createService(create(test).withName(null), adminAuthHeaders()));
TestUtils.assertResponse(exception, BAD_REQUEST, "[name must not be null]");
// Create messaging with mandatory name field empty
exception = assertThrows(HttpResponseException.class, () ->
createService(create(test).withName(""), adminAuthHeaders()));
TestUtils.assertResponse(exception, BAD_REQUEST, "[name size must be between 1 and 64]");
// Create messaging with mandatory serviceType field empty
exception = assertThrows(HttpResponseException.class, () ->
createService(create(test).withServiceType(null), adminAuthHeaders()));
TestUtils.assertResponse(exception, BAD_REQUEST, "[serviceType must not be null]");
// Create messaging with mandatory brokers field empty
exception = assertThrows(HttpResponseException.class, () ->
createService(create(test).withBrokers(null), adminAuthHeaders()));
TestUtils.assertResponse(exception, BAD_REQUEST, "[brokers must not be null]");
}
@Test
@ -322,11 +337,15 @@ public class MessagingServiceResourceTest extends CatalogApplicationTest {
public static CreateMessagingService create(TestInfo test) {
String startDate = RestUtil.DATE_TIME_FORMAT.format(new Date());
return new CreateMessagingService().withName(getName(test)).withServiceType(MessagingServiceType.Kafka)
.withBrokers(List.of("192.1.1.1:0"))
.withIngestionSchedule(new Schedule().withStartDate(startDate).withRepeatFrequency("P1D"));
}
private static CreateMessagingService create(TestInfo test, int index) {
return new CreateMessagingService().withName(getName(test, index)).withServiceType(MessagingServiceType.Pulsar);
String startDate = RestUtil.DATE_TIME_FORMAT.format(new Date());
return new CreateMessagingService().withName(getName(test, index)).withServiceType(MessagingServiceType.Pulsar)
.withBrokers(List.of("192.1.1.1:0"))
.withIngestionSchedule(new Schedule().withStartDate(startDate).withRepeatFrequency("P1D"));
}
public static void updateAndCheckService(String id, UpdateMessagingService update, Status status,

View File

@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory;
import javax.json.JsonPatch;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response.Status;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@ -61,6 +62,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound;
import static org.openmetadata.catalog.exception.CatalogExceptionMessage.readOnlyAttribute;
import static org.openmetadata.catalog.util.TestUtils.LONG_ENTITY_NAME;
import static org.openmetadata.catalog.util.TestUtils.adminAuthHeaders;
import static org.openmetadata.catalog.util.TestUtils.assertEntityPagination;
import static org.openmetadata.catalog.util.TestUtils.assertResponse;
@ -84,11 +86,11 @@ public class TopicResourceTest extends CatalogApplicationTest {
TEAM_OWNER1 = new EntityReference().withId(TEAM1.getId()).withType("team");
CreateMessagingService createService = new CreateMessagingService().withName("kafka")
.withServiceType(MessagingServiceType.Kafka);
.withServiceType(MessagingServiceType.Kafka).withBrokers(List.of("192.168.1.1:0"));
MessagingService service = MessagingServiceResourceTest.createService(createService, adminAuthHeaders());
KAFKA_REFERENCE = EntityUtil.getEntityReference(service);
createService.withName("pulsar").withServiceType(MessagingServiceType.Pulsar);
createService.withName("pulsar").withServiceType(MessagingServiceType.Pulsar).withBrokers(List.of("192.168.1.1:0"));
service = MessagingServiceResourceTest.createService(createService, adminAuthHeaders());
PULSAR_REFERENCE = EntityUtil.getEntityReference(service);
}
@ -102,15 +104,6 @@ public class TopicResourceTest extends CatalogApplicationTest {
assertResponse(exception, BAD_REQUEST, "[name size must be between 1 and 64]");
}
@Test
public void post_topicWithoutName_400_badRequest(TestInfo test) {
// Create topic with mandatory name field empty
CreateTopic create = create(test).withName("");
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
createTopic(create, adminAuthHeaders()));
assertResponse(exception, BAD_REQUEST, "[name size must be between 1 and 64]");
}
@Test
public void post_topicAlreadyExists_409_conflict(TestInfo test) throws HttpResponseException {
CreateTopic create = create(test);
@ -121,7 +114,7 @@ public class TopicResourceTest extends CatalogApplicationTest {
}
@Test
public void post_validTopicss_as_admin_200_OK(TestInfo test) throws HttpResponseException {
public void post_validTopics_as_admin_200_OK(TestInfo test) throws HttpResponseException {
// Create team with different optional fields
CreateTopic create = create(test);
createAndCheckTopic(create, adminAuthHeaders());
@ -149,11 +142,29 @@ public class TopicResourceTest extends CatalogApplicationTest {
}
@Test
public void post_topicWithoutRequiredService_4xx(TestInfo test) {
CreateTopic create = create(test).withService(null);
public void post_topicWithoutRequiredFields_4xx(TestInfo test) {
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
createTopic(create, adminAuthHeaders()));
TestUtils.assertResponseContains(exception, BAD_REQUEST, "service must not be null");
createTopic(create(test).withName(null), adminAuthHeaders()));
assertResponse(exception, BAD_REQUEST, "[name must not be null]");
exception = assertThrows(HttpResponseException.class, () ->
createTopic(create(test).withName(LONG_ENTITY_NAME), adminAuthHeaders()));
assertResponse(exception, BAD_REQUEST, "[name size must be between 1 and 64]");
// Service is required field
exception = assertThrows(HttpResponseException.class, () ->
createTopic(create(test).withService(null), adminAuthHeaders()));
TestUtils.assertResponse(exception, BAD_REQUEST, "[service must not be null]");
// Partitions is required field
exception = assertThrows(HttpResponseException.class, () ->
createTopic(create(test).withPartitions(null), adminAuthHeaders()));
TestUtils.assertResponse(exception, BAD_REQUEST, "[partitions must not be null]");
// Partitions must be >= 1
exception = assertThrows(HttpResponseException.class, () ->
createTopic(create(test).withPartitions(0), adminAuthHeaders()));
TestUtils.assertResponse(exception, BAD_REQUEST, "[partitions must be greater than or equal to 1]");
}
@Test
@ -649,10 +660,10 @@ public class TopicResourceTest extends CatalogApplicationTest {
}
public static CreateTopic create(TestInfo test) {
return new CreateTopic().withName(getTopicName(test)).withService(KAFKA_REFERENCE);
return new CreateTopic().withName(getTopicName(test)).withService(KAFKA_REFERENCE).withPartitions(1);
}
public static CreateTopic create(TestInfo test, int index) {
return new CreateTopic().withName(getTopicName(test, index)).withService(KAFKA_REFERENCE);
return new CreateTopic().withName(getTopicName(test, index)).withService(KAFKA_REFERENCE).withPartitions(1);
}
}