diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/topics/TopicResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/topics/TopicResource.java index e44d02018a6..d448130b4bc 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/topics/TopicResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/topics/TopicResource.java @@ -228,6 +228,7 @@ public class TopicResource { .withMaximumMessageSize(create.getMaximumMessageSize()) .withMinimumInSyncReplicas(create.getMinimumInSyncReplicas()) .withRetentionSize(create.getRetentionSize()).withRetentionTime(create.getRetentionTime()) + .withReplicationFactor(create.getReplicationFactor()) .withTags(create.getTags()); topic = addHref(uriInfo, dao.create(topic, create.getService(), create.getOwner())); return Response.created(topic.getHref()).entity(topic).build(); @@ -278,6 +279,7 @@ public class TopicResource { .withMaximumMessageSize(create.getMaximumMessageSize()) .withMinimumInSyncReplicas(create.getMinimumInSyncReplicas()) .withRetentionSize(create.getRetentionSize()).withRetentionTime(create.getRetentionTime()) + .withReplicationFactor(create.getReplicationFactor()) .withTags(create.getTags()); PutResponse response = dao.createOrUpdate(topic, create.getService(), create.getOwner()); topic = addHref(uriInfo, response.getEntity()); diff --git a/catalog-rest-service/src/main/resources/json/schema/api/data/createTopic.json b/catalog-rest-service/src/main/resources/json/schema/api/data/createTopic.json index 82170cffbba..fb69cdbfa45 100644 --- a/catalog-rest-service/src/main/resources/json/schema/api/data/createTopic.json +++ b/catalog-rest-service/src/main/resources/json/schema/api/data/createTopic.json @@ -37,6 +37,10 @@ "$ref" : "../../entity/data/topic.json#/definitions/cleanupPolicy" } }, + "replicationFactor": { + "description": "Replication Factor in integer (more than 1).", + "type" : "integer" + }, "retentionTime": { "description": "Retention time in milliseconds. For Kafka - `retention.ms` configuration.", "type" : "number" diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/data/topic.json b/catalog-rest-service/src/main/resources/json/schema/entity/data/topic.json index f7160476b7c..428a07cefd2 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/data/topic.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/data/topic.json @@ -91,6 +91,10 @@ "description": "Retention time in milliseconds. For Kafka - `retention.ms` configuration.", "type" : "number" }, + "replicationFactor": { + "description": "Replication Factor in integer (more than 1).", + "type" : "integer" + }, "maximumMessageSize" : { "description": "Maximum message size in bytes. For Kafka - `max.message.bytes` configuration.", "type" : "integer" diff --git a/ingestion/examples/sample_data/topics/topics.json b/ingestion/examples/sample_data/topics/topics.json index d89fb5f981b..c5e20d0644e 100644 --- a/ingestion/examples/sample_data/topics/topics.json +++ b/ingestion/examples/sample_data/topics/topics.json @@ -31,7 +31,7 @@ "description": "Kafka topic to get products in a shop. This is constantly updating", "partitions": 128, "retentionSize": 3222122382273, - "cleanupPolicies": ["delete"], + "cleanupPolicies": ["compact,delete"], "schemaType": "Avro", "schemaText": "{\"namespace\":\"org.open-metadata.kafka\",\"type\":\"record\",\"name\":\"Shop Products\",\"fields\":[{\"name\":\"prodcut_id\",\"type\":\"int\"},{\"name\":\"product_variant_id\",\"type\":\"int\"},{\"name\":\"shop_id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"product_title\",\"type\":\"string\"},{\"name\":\"price\",\"type\":\"double\"},{\"name\":\"quantity\",\"type\":\"int\"},{\"name\":\"product_vendor\",\"type\":\"int\"},{\"name\":\"fulfillable_quantity\",\"type\":\"int\"},{\"name\":\"fulfilment_service\",\"type\":\"string\"}]}" }, @@ -40,7 +40,7 @@ "description": "All the order events on our online store", "partitions": 128, "retentionSize": 3222122382273, - "cleanupPolicies": ["delete"], + "cleanupPolicies": ["compact"], "schemaType": "Avro", "schemaText": "{\"namespace\":\"org.open-metadata.kafka\",\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"order_id\",\"type\":\"int\"},{\"name\":\"api_client_id\",\"type\":\"int\"},{\"name\":\"billing_address_id\",\"type\":\"int\"},{\"name\":\"customer_id\",\"type\":\"int\"},{\"name\":\"location_id\",\"type\":\"int\"},{\"name\":\"shipping_address_id\",\"type\":\"int\"},{\"name\":\"user_id\",\"type\":\"int\"},{\"name\":\"total_price\",\"type\":\"double\"},{\"name\":\"discount_code\",\"type\":\"string\"},{\"name\":\"processed_at\",\"type\":\"int\"}]}" }, @@ -49,7 +49,7 @@ "description": "All sales related events gets captured in this topic", "partitions": 128, "retentionSize": 3222122382273, - "cleanupPolicies": ["delete"], + "cleanupPolicies": ["compact,delete"], "schemaType": "Avro", "schemaText": "{\"namespace\":\"org.open-metadata.kafka\",\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"sale_id\",\"type\":\"int\"},{\"name\":\"billing_address_id\",\"type\":\"int\"},{\"name\":\"billing_address_id\",\"type\":\"int\"},{\"name\":\"api_client_id\",\"type\":\"int\"},{\"name\":\"customer_id\",\"type\":\"int\"},{\"name\":\"product_id\",\"type\":\"int\"},{\"name\":\"location_id\",\"type\":\"int\"},{\"name\":\"order_id\",\"type\":\"double\"}]}" } diff --git a/ingestion/src/metadata/generated/schema/api/data/createTopic.py b/ingestion/src/metadata/generated/schema/api/data/createTopic.py index a8f71ec7495..97307991d17 100644 --- a/ingestion/src/metadata/generated/schema/api/data/createTopic.py +++ b/ingestion/src/metadata/generated/schema/api/data/createTopic.py @@ -37,6 +37,10 @@ class CreateTopic(BaseModel): None, description='Topic clean up policy. For Kafka - `cleanup.policy` configuration.', ) + replicationFactor: Optional[int] = Field( + None, + description='Replication Factor in integer (more than 1).', + ) retentionTime: Optional[float] = Field( None, description='Retention time in milliseconds. For Kafka - `retention.ms` configuration.',