diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/topics/TopicResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/topics/TopicResource.java index 9cf6660b47e..710df72732b 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/topics/TopicResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/topics/TopicResource.java @@ -230,8 +230,13 @@ public class TopicResource { public Response create(@Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid CreateTopic create) throws IOException { SecurityUtil.checkAdminOrBotRole(authorizer, securityContext); - Topic topic = new Topic().withId(UUID.randomUUID()).withName(create.getName()).withService(create.getService()) - .withDescription(create.getDescription()); + Topic topic = + new Topic().withId(UUID.randomUUID()).withName(create.getName()).withDescription(create.getDescription()) + .withService(create.getService()).withPartitions(create.getPartitions()).withSchema(create.getSchema()) + .withSchemaType(create.getSchemaType()).withCleanupPolicies(create.getCleanupPolicies()) + .withMaximumMessageSize(create.getMaximumMessageSize()) + .withMinimumInSyncReplicas(create.getMinimumInSyncReplicas()) + .withRetentionSize(create.getRetentionSize()).withRetentionTime(create.getRetentionTime()); topic = addHref(uriInfo, dao.create(topic, create.getService(), create.getOwner())); return Response.created(topic.getHref()).entity(topic).build(); } diff --git a/catalog-rest-service/src/main/resources/json/schema/api/data/createTopic.json b/catalog-rest-service/src/main/resources/json/schema/api/data/createTopic.json index 47c4209f0e2..8e287745efc 100644 --- a/catalog-rest-service/src/main/resources/json/schema/api/data/createTopic.json +++ b/catalog-rest-service/src/main/resources/json/schema/api/data/createTopic.json @@ -13,17 +13,54 @@ "description": "Description of the topic instance. What it has and how to use it.", "type": "string" }, - "owner": { - "description": "Owner of this topic", - "$ref": "../../type/entityReference.json" - }, "service": { "description": "Link to the messaging service where this topic is hosted in", "$ref": "../../type/entityReference.json" + }, + "partitions" : { + "description" : "Number of partitions into which the topic is divided.", + "type" : "integer", + "minimum": "1" + }, + "schema" : { + "description" : "Schema used for message serialization. Optional as some topics may not have associated schemas.", + "type" : "string" + }, + "schemaType" : { + "description" : "Schema used for message serialization.", + "$ref": "../../entity/data/topic.json#/definitions/schemaType" + }, + "cleanupPolicies": { + "description": "Topic clean up policy. For Kafka - `cleanup.policy` configuration.", + "type": "array", + "items": { + "$ref" : "../../entity/data/topic.json#/definitions/cleanupPolicy" + } + }, + "retentionTime": { + "description": "Retention time in milliseconds. For Kafka - `retention.ms` configuration.", + "type" : "number" + }, + "maximumMessageSize" : { + "description": "Maximum message size in bytes. For Kafka - `max.message.bytes` configuration.", + "type" : "integer" + }, + "minimumInSyncReplicas" : { + "description": "Minimum number replicas in sync to control durability. For Kafka - `min.insync.replicas` configuration." + }, + "retentionSize": { + "description": "Maximum size of a partition in bytes before old data is discarded. For Kafka - `retention.bytes` configuration.", + "type" : "number", + "default" : "-1" + }, + "owner": { + "description": "Owner of this topic", + "$ref": "../../type/entityReference.json" } }, "required": [ "name", - "service" + "service", + "partitions" ] } \ No newline at end of file diff --git a/catalog-rest-service/src/main/resources/json/schema/api/services/createMessagingService.json b/catalog-rest-service/src/main/resources/json/schema/api/services/createMessagingService.json index f7cf501a6d8..2b4095831be 100644 --- a/catalog-rest-service/src/main/resources/json/schema/api/services/createMessagingService.json +++ b/catalog-rest-service/src/main/resources/json/schema/api/services/createMessagingService.json @@ -4,8 +4,7 @@ "title": "Create Messaging service entity request", "description": "Create Messaging service entity request", "type": "object", - - "properties" : { + "properties": { "name": { "description": "Name that identifies the this entity instance uniquely", "type": "string", @@ -19,10 +18,23 @@ "serviceType": { "$ref": "../../entity/services/messagingService.json#/definitions/messagingServiceType" }, - "ingestionSchedule" : { + "brokers": { + "description": "Multiple bootstrap addresses for Kafka. Single proxy address for Pulsar.", + "$ref" : "../../entity/services/messagingService.json#/definitions/brokers" + }, + "schemaRegistry": { + "description": "Schema registry URL", + "type": "string", + "format": "uri" + }, + "ingestionSchedule": { "description": "Schedule for running metadata ingestion jobs", - "$ref" : "../../type/schedule.json" + "$ref": "../../type/schedule.json" } }, - "required": ["name", "serviceType"] + "required": [ + "name", + "serviceType", + "brokers" + ] } diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/data/topic.json b/catalog-rest-service/src/main/resources/json/schema/entity/data/topic.json index 20d6d5334db..41c6def71d0 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/data/topic.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/data/topic.json @@ -8,9 +8,42 @@ "topicName": { "description": "Name that identifies a topic.", "type": "string", + "javaType": "org.openmetadata.catalog.type.topic.TopicName", "minLength": 1, "maxLength": 64, "pattern": "^[^.]*$" + }, + "schemaType": { + "description": "Schema type used for the message.", + "javaType": "org.openmetadata.catalog.type.topic.SchemaType", + "enum" : [ + "Avro", + "Protobuf", + "JSON", + "Other" + ], + "javaEnums": [ + { + "name": "Avro" + }, + { + "name": "Protobuf" + }, + { + "name": "JSON" + }, + { + "name": "Other" + } + ] + }, + "cleanupPolicy": { + "javaType": "org.openmetadata.catalog.type.topic.CleanupPolicy", + "description": "Topic clean up policy. For Kafka - `cleanup.policy` configuration.", + "enum": [ + "delete", + "compact" + ] } }, "properties": { @@ -30,18 +63,28 @@ "description": "Description of the topic instance.", "type": "string" }, + "service": { + "description": "Link to the messaging cluster/service where this topic is hosted in.", + "$ref": "../../type/entityReference.json" + }, "partitions" : { "description" : "Number of partitions into which the topic is divided.", - "type" : "integer" + "type" : "integer", + "minimum": "1" }, - "cleanupPolicy": { - "description": "Topic clean up policy. For Kafka - `cleanup.policy` configuration.", + "schema" : { + "description" : "Schema used for message serialization. Optional as some topics may not have associated schemas.", + "type" : "string" + }, + "schemaType" : { + "description" : "Schema used for message serialization.", + "$ref": "#/definitions/schemaType" + }, + "cleanupPolicies": { + "description": "Topic clean up policies. For Kafka - `cleanup.policy` configuration.", "type": "array", "items": { - "enum": [ - "delete", - "compact" - ] + "$ref" : "#/definitions/cleanupPolicy" } }, "retentionTime": { @@ -50,13 +93,13 @@ }, "maximumMessageSize" : { "description": "Maximum message size in bytes. For Kafka - `max.message.bytes` configuration.", - "type" : "int" + "type" : "integer" }, "minimumInSyncReplicas" : { "description": "Minimum number replicas in sync to control durability. For Kafka - `min.insync.replicas` configuration." }, "retentionSize": { - "description": "Maximum size of a partion in bytes before old data is discarded. For Kafka - `retention.bytes` configuration.", + "description": "Maximum size of a partition in bytes before old data is discarded. For Kafka - `retention.bytes` configuration.", "type" : "number", "default" : "-1" }, @@ -64,17 +107,16 @@ "description": "Owner of this topic.", "$ref": "../../type/entityReference.json" }, - "service": { - "description": "Link to the messaging cluster/service where this topic is hosted in.", - "$ref": "../../type/entityReference.json" - }, "href": { "description": "Link to the resource corresponding to this entity.", "$ref": "../../type/basic.json#/definitions/href" } }, "required": [ + "id", "name", + "fullyQualifiedName", + "partitions", "service" ] } \ No newline at end of file diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/messagingService.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/messagingService.json index 154a228ef4d..12f82d52db0 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/messagingService.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/messagingService.json @@ -20,6 +20,14 @@ "name": "Pulsar" } ] + }, + "brokers": { + "description": "Multiple bootstrap addresses for Kafka. Single proxy address for Pulsar.", + "type": "array", + "items": { + "type": "string" + }, + "default": null } }, "properties": { @@ -43,10 +51,7 @@ }, "brokers": { "description": "Multiple bootstrap addresses for Kafka. Single proxy address for Pulsar.", - "type": "array", - "items": { - "type": "string" - } + "$ref" : "#/definitions/brokers" }, "schemaRegistry" : { "description": "Schema registry URL", @@ -66,6 +71,6 @@ "id", "name", "serviceType", - "href" + "brokers" ] } diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/MessagingServiceResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/MessagingServiceResourceTest.java index cb5cf28f68e..7e15ff47d07 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/MessagingServiceResourceTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/MessagingServiceResourceTest.java @@ -33,6 +33,7 @@ import org.openmetadata.catalog.util.TestUtils; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.Response.Status; import java.util.Date; +import java.util.List; import java.util.Map; import java.util.UUID; @@ -58,12 +59,26 @@ public class MessagingServiceResourceTest extends CatalogApplicationTest { } @Test - public void post_serviceWithoutName_400_badRequest(TestInfo test) { - // Create messaging with mandatory name field empty - CreateMessagingService create = create(test).withName(""); + public void post_withoutRequiredFields_400_badRequest(TestInfo test) { + // Create messaging with mandatory name field null HttpResponseException exception = assertThrows(HttpResponseException.class, () -> - createService(create, adminAuthHeaders())); + createService(create(test).withName(null), adminAuthHeaders())); + TestUtils.assertResponse(exception, BAD_REQUEST, "[name must not be null]"); + + // Create messaging with mandatory name field empty + exception = assertThrows(HttpResponseException.class, () -> + createService(create(test).withName(""), adminAuthHeaders())); TestUtils.assertResponse(exception, BAD_REQUEST, "[name size must be between 1 and 64]"); + + // Create messaging with mandatory serviceType field empty + exception = assertThrows(HttpResponseException.class, () -> + createService(create(test).withServiceType(null), adminAuthHeaders())); + TestUtils.assertResponse(exception, BAD_REQUEST, "[serviceType must not be null]"); + + // Create messaging with mandatory brokers field empty + exception = assertThrows(HttpResponseException.class, () -> + createService(create(test).withBrokers(null), adminAuthHeaders())); + TestUtils.assertResponse(exception, BAD_REQUEST, "[brokers must not be null]"); } @Test @@ -322,11 +337,15 @@ public class MessagingServiceResourceTest extends CatalogApplicationTest { public static CreateMessagingService create(TestInfo test) { String startDate = RestUtil.DATE_TIME_FORMAT.format(new Date()); return new CreateMessagingService().withName(getName(test)).withServiceType(MessagingServiceType.Kafka) + .withBrokers(List.of("192.1.1.1:0")) .withIngestionSchedule(new Schedule().withStartDate(startDate).withRepeatFrequency("P1D")); } private static CreateMessagingService create(TestInfo test, int index) { - return new CreateMessagingService().withName(getName(test, index)).withServiceType(MessagingServiceType.Pulsar); + String startDate = RestUtil.DATE_TIME_FORMAT.format(new Date()); + return new CreateMessagingService().withName(getName(test, index)).withServiceType(MessagingServiceType.Pulsar) + .withBrokers(List.of("192.1.1.1:0")) + .withIngestionSchedule(new Schedule().withStartDate(startDate).withRepeatFrequency("P1D")); } public static void updateAndCheckService(String id, UpdateMessagingService update, Status status, diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/topics/TopicResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/topics/TopicResourceTest.java index e94a7b8e93c..df0cebafa00 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/topics/TopicResourceTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/topics/TopicResourceTest.java @@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory; import javax.json.JsonPatch; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.Response.Status; +import java.util.List; import java.util.Map; import java.util.UUID; @@ -61,6 +62,7 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound; import static org.openmetadata.catalog.exception.CatalogExceptionMessage.readOnlyAttribute; +import static org.openmetadata.catalog.util.TestUtils.LONG_ENTITY_NAME; import static org.openmetadata.catalog.util.TestUtils.adminAuthHeaders; import static org.openmetadata.catalog.util.TestUtils.assertEntityPagination; import static org.openmetadata.catalog.util.TestUtils.assertResponse; @@ -84,11 +86,11 @@ public class TopicResourceTest extends CatalogApplicationTest { TEAM_OWNER1 = new EntityReference().withId(TEAM1.getId()).withType("team"); CreateMessagingService createService = new CreateMessagingService().withName("kafka") - .withServiceType(MessagingServiceType.Kafka); + .withServiceType(MessagingServiceType.Kafka).withBrokers(List.of("192.168.1.1:0")); MessagingService service = MessagingServiceResourceTest.createService(createService, adminAuthHeaders()); KAFKA_REFERENCE = EntityUtil.getEntityReference(service); - createService.withName("pulsar").withServiceType(MessagingServiceType.Pulsar); + createService.withName("pulsar").withServiceType(MessagingServiceType.Pulsar).withBrokers(List.of("192.168.1.1:0")); service = MessagingServiceResourceTest.createService(createService, adminAuthHeaders()); PULSAR_REFERENCE = EntityUtil.getEntityReference(service); } @@ -102,15 +104,6 @@ public class TopicResourceTest extends CatalogApplicationTest { assertResponse(exception, BAD_REQUEST, "[name size must be between 1 and 64]"); } - @Test - public void post_topicWithoutName_400_badRequest(TestInfo test) { - // Create topic with mandatory name field empty - CreateTopic create = create(test).withName(""); - HttpResponseException exception = assertThrows(HttpResponseException.class, () -> - createTopic(create, adminAuthHeaders())); - assertResponse(exception, BAD_REQUEST, "[name size must be between 1 and 64]"); - } - @Test public void post_topicAlreadyExists_409_conflict(TestInfo test) throws HttpResponseException { CreateTopic create = create(test); @@ -121,7 +114,7 @@ public class TopicResourceTest extends CatalogApplicationTest { } @Test - public void post_validTopicss_as_admin_200_OK(TestInfo test) throws HttpResponseException { + public void post_validTopics_as_admin_200_OK(TestInfo test) throws HttpResponseException { // Create team with different optional fields CreateTopic create = create(test); createAndCheckTopic(create, adminAuthHeaders()); @@ -149,11 +142,29 @@ public class TopicResourceTest extends CatalogApplicationTest { } @Test - public void post_topicWithoutRequiredService_4xx(TestInfo test) { - CreateTopic create = create(test).withService(null); + public void post_topicWithoutRequiredFields_4xx(TestInfo test) { HttpResponseException exception = assertThrows(HttpResponseException.class, () -> - createTopic(create, adminAuthHeaders())); - TestUtils.assertResponseContains(exception, BAD_REQUEST, "service must not be null"); + createTopic(create(test).withName(null), adminAuthHeaders())); + assertResponse(exception, BAD_REQUEST, "[name must not be null]"); + + exception = assertThrows(HttpResponseException.class, () -> + createTopic(create(test).withName(LONG_ENTITY_NAME), adminAuthHeaders())); + assertResponse(exception, BAD_REQUEST, "[name size must be between 1 and 64]"); + + // Service is required field + exception = assertThrows(HttpResponseException.class, () -> + createTopic(create(test).withService(null), adminAuthHeaders())); + TestUtils.assertResponse(exception, BAD_REQUEST, "[service must not be null]"); + + // Partitions is required field + exception = assertThrows(HttpResponseException.class, () -> + createTopic(create(test).withPartitions(null), adminAuthHeaders())); + TestUtils.assertResponse(exception, BAD_REQUEST, "[partitions must not be null]"); + + // Partitions must be >= 1 + exception = assertThrows(HttpResponseException.class, () -> + createTopic(create(test).withPartitions(0), adminAuthHeaders())); + TestUtils.assertResponse(exception, BAD_REQUEST, "[partitions must be greater than or equal to 1]"); } @Test @@ -649,10 +660,10 @@ public class TopicResourceTest extends CatalogApplicationTest { } public static CreateTopic create(TestInfo test) { - return new CreateTopic().withName(getTopicName(test)).withService(KAFKA_REFERENCE); + return new CreateTopic().withName(getTopicName(test)).withService(KAFKA_REFERENCE).withPartitions(1); } public static CreateTopic create(TestInfo test, int index) { - return new CreateTopic().withName(getTopicName(test, index)).withService(KAFKA_REFERENCE); + return new CreateTopic().withName(getTopicName(test, index)).withService(KAFKA_REFERENCE).withPartitions(1); } }