diff --git a/catalog-rest-service/src/main/resources/json/schema/api/data/createTopic.json b/catalog-rest-service/src/main/resources/json/schema/api/data/createTopic.json index 57f356a2e5d..cf4f94856e7 100644 --- a/catalog-rest-service/src/main/resources/json/schema/api/data/createTopic.json +++ b/catalog-rest-service/src/main/resources/json/schema/api/data/createTopic.json @@ -43,7 +43,7 @@ }, "retentionTime": { "description": "Retention time in milliseconds. For Kafka - `retention.ms` configuration.", - "type": "integer" + "type": "number" }, "maximumMessageSize": { "description": "Maximum message size in bytes. For Kafka - `max.message.bytes` configuration.", @@ -55,7 +55,7 @@ }, "retentionSize": { "description": "Maximum size of a partition in bytes before old data is discarded. For Kafka - `retention.bytes` configuration.", - "type": "integer", + "type": "number", "default": "-1" }, "topicConfig": { diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/data/topic.json b/catalog-rest-service/src/main/resources/json/schema/entity/data/topic.json index 6b537bf29d1..09c8d701049 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/data/topic.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/data/topic.json @@ -113,7 +113,7 @@ }, "retentionTime": { "description": "Retention time in milliseconds. For Kafka - `retention.ms` configuration.", - "type": "integer" + "type": "number" }, "replicationFactor": { "description": "Replication Factor in integer (more than 1).", @@ -129,7 +129,7 @@ }, "retentionSize": { "description": "Maximum size of a partition in bytes before old data is discarded. For Kafka - `retention.bytes` configuration.", - "type": "integer", + "type": "number", "default": "-1" }, "topicConfig": { diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/topics/TopicResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/topics/TopicResourceTest.java index 1995473e36a..d77170732bd 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/topics/TopicResourceTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/topics/TopicResourceTest.java @@ -109,8 +109,8 @@ public class TopicResourceTest extends EntityResourceTest { .withMinimumInSyncReplicas(1) .withPartitions(1) .withReplicationFactor(1) - .withRetentionTime(1) - .withRetentionSize(1) + .withRetentionTime(1.0) + .withRetentionSize(1.0) .withSchemaText("abc") .withSchemaType(SchemaType.Avro) .withCleanupPolicies(List.of(CleanupPolicy.COMPACT)); @@ -123,8 +123,8 @@ public class TopicResourceTest extends EntityResourceTest { .withMaximumMessageSize(2) .withPartitions(2) .withReplicationFactor(2) - .withRetentionTime(2) - .withRetentionSize(2) + .withRetentionTime(2.0) + .withRetentionSize(2.0) .withSchemaText("bcd") .withSchemaType(SchemaType.JSON) .withCleanupPolicies(List.of(CleanupPolicy.DELETE)); @@ -137,8 +137,8 @@ public class TopicResourceTest extends EntityResourceTest { change.getFieldsUpdated().add(new FieldChange().withName("minimumInSyncReplicas").withOldValue(1).withNewValue(2)); change.getFieldsUpdated().add(new FieldChange().withName("partitions").withOldValue(1).withNewValue(2)); change.getFieldsUpdated().add(new FieldChange().withName("replicationFactor").withOldValue(1).withNewValue(2)); - change.getFieldsUpdated().add(new FieldChange().withName("retentionTime").withOldValue(1).withNewValue(2)); - change.getFieldsUpdated().add(new FieldChange().withName("retentionSize").withOldValue(1).withNewValue(2)); + change.getFieldsUpdated().add(new FieldChange().withName("retentionTime").withOldValue(1.0).withNewValue(2.0)); + change.getFieldsUpdated().add(new FieldChange().withName("retentionSize").withOldValue(1.0).withNewValue(2.0)); change.getFieldsUpdated().add(new FieldChange().withName("schemaText").withOldValue("abc").withNewValue("bcd")); change .getFieldsUpdated() @@ -162,8 +162,8 @@ public class TopicResourceTest extends EntityResourceTest { .withMinimumInSyncReplicas(1) .withPartitions(1) .withReplicationFactor(1) - .withRetentionTime(1) - .withRetentionSize(1) + .withRetentionTime(1.0) + .withRetentionSize(1.0) .withSchemaText("abc") .withSchemaType(SchemaType.Avro) .withCleanupPolicies(List.of(CleanupPolicy.COMPACT)); @@ -178,8 +178,8 @@ public class TopicResourceTest extends EntityResourceTest { .withMaximumMessageSize(2) .withPartitions(2) .withReplicationFactor(2) - .withRetentionTime(2) - .withRetentionSize(2) + .withRetentionTime(2.0) + .withRetentionSize(2.0) .withSchemaText("bcd") .withSchemaType(SchemaType.JSON) .withCleanupPolicies(List.of(CleanupPolicy.DELETE)); @@ -192,8 +192,8 @@ public class TopicResourceTest extends EntityResourceTest { change.getFieldsUpdated().add(new FieldChange().withName("minimumInSyncReplicas").withOldValue(1).withNewValue(2)); change.getFieldsUpdated().add(new FieldChange().withName("partitions").withOldValue(1).withNewValue(2)); change.getFieldsUpdated().add(new FieldChange().withName("replicationFactor").withOldValue(1).withNewValue(2)); - change.getFieldsUpdated().add(new FieldChange().withName("retentionTime").withOldValue(1).withNewValue(2)); - change.getFieldsUpdated().add(new FieldChange().withName("retentionSize").withOldValue(1).withNewValue(2)); + change.getFieldsUpdated().add(new FieldChange().withName("retentionTime").withOldValue(1.0).withNewValue(2.0)); + change.getFieldsUpdated().add(new FieldChange().withName("retentionSize").withOldValue(1.0).withNewValue(2.0)); change.getFieldsUpdated().add(new FieldChange().withName("schemaText").withOldValue("abc").withNewValue("bcd")); change .getFieldsUpdated() diff --git a/ingestion/src/metadata/ingestion/source/kafka.py b/ingestion/src/metadata/ingestion/source/kafka.py index 23fdb54cfb9..84a3fd26a7e 100644 --- a/ingestion/src/metadata/ingestion/source/kafka.py +++ b/ingestion/src/metadata/ingestion/source/kafka.py @@ -155,9 +155,10 @@ class KafkaSource(Source[CreateTopicRequest]): ).value if "cleanup.policy" in config_response: - topic.cleanupPolicies = [ - config_response.get("cleanup.policy").value - ] + cleanup_policies = config_response.get( + "cleanup.policy" + ).value + topic.cleanupPolicies = cleanup_policies.split(",") topic_config = {} for key, conf_response in config_response.items():