Issue-4917: Kafka ingestion fails with a validation error (#4918)

This commit is contained in:
Sriharsha Chintalapani 2022-05-12 20:12:42 -07:00 committed by GitHub
parent ccbbd8b65d
commit 626f03e5a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 20 additions and 19 deletions

View File

@ -43,7 +43,7 @@
},
"retentionTime": {
"description": "Retention time in milliseconds. For Kafka - `retention.ms` configuration.",
"type": "integer"
"type": "number"
},
"maximumMessageSize": {
"description": "Maximum message size in bytes. For Kafka - `max.message.bytes` configuration.",
@ -55,7 +55,7 @@
},
"retentionSize": {
"description": "Maximum size of a partition in bytes before old data is discarded. For Kafka - `retention.bytes` configuration.",
"type": "integer",
"type": "number",
"default": "-1"
},
"topicConfig": {

View File

@ -113,7 +113,7 @@
},
"retentionTime": {
"description": "Retention time in milliseconds. For Kafka - `retention.ms` configuration.",
"type": "integer"
"type": "number"
},
"replicationFactor": {
"description": "Replication Factor in integer (more than 1).",
@ -129,7 +129,7 @@
},
"retentionSize": {
"description": "Maximum size of a partition in bytes before old data is discarded. For Kafka - `retention.bytes` configuration.",
"type": "integer",
"type": "number",
"default": "-1"
},
"topicConfig": {

View File

@ -109,8 +109,8 @@ public class TopicResourceTest extends EntityResourceTest<Topic, CreateTopic> {
.withMinimumInSyncReplicas(1)
.withPartitions(1)
.withReplicationFactor(1)
.withRetentionTime(1)
.withRetentionSize(1)
.withRetentionTime(1.0)
.withRetentionSize(1.0)
.withSchemaText("abc")
.withSchemaType(SchemaType.Avro)
.withCleanupPolicies(List.of(CleanupPolicy.COMPACT));
@ -123,8 +123,8 @@ public class TopicResourceTest extends EntityResourceTest<Topic, CreateTopic> {
.withMaximumMessageSize(2)
.withPartitions(2)
.withReplicationFactor(2)
.withRetentionTime(2)
.withRetentionSize(2)
.withRetentionTime(2.0)
.withRetentionSize(2.0)
.withSchemaText("bcd")
.withSchemaType(SchemaType.JSON)
.withCleanupPolicies(List.of(CleanupPolicy.DELETE));
@ -137,8 +137,8 @@ public class TopicResourceTest extends EntityResourceTest<Topic, CreateTopic> {
change.getFieldsUpdated().add(new FieldChange().withName("minimumInSyncReplicas").withOldValue(1).withNewValue(2));
change.getFieldsUpdated().add(new FieldChange().withName("partitions").withOldValue(1).withNewValue(2));
change.getFieldsUpdated().add(new FieldChange().withName("replicationFactor").withOldValue(1).withNewValue(2));
change.getFieldsUpdated().add(new FieldChange().withName("retentionTime").withOldValue(1).withNewValue(2));
change.getFieldsUpdated().add(new FieldChange().withName("retentionSize").withOldValue(1).withNewValue(2));
change.getFieldsUpdated().add(new FieldChange().withName("retentionTime").withOldValue(1.0).withNewValue(2.0));
change.getFieldsUpdated().add(new FieldChange().withName("retentionSize").withOldValue(1.0).withNewValue(2.0));
change.getFieldsUpdated().add(new FieldChange().withName("schemaText").withOldValue("abc").withNewValue("bcd"));
change
.getFieldsUpdated()
@ -162,8 +162,8 @@ public class TopicResourceTest extends EntityResourceTest<Topic, CreateTopic> {
.withMinimumInSyncReplicas(1)
.withPartitions(1)
.withReplicationFactor(1)
.withRetentionTime(1)
.withRetentionSize(1)
.withRetentionTime(1.0)
.withRetentionSize(1.0)
.withSchemaText("abc")
.withSchemaType(SchemaType.Avro)
.withCleanupPolicies(List.of(CleanupPolicy.COMPACT));
@ -178,8 +178,8 @@ public class TopicResourceTest extends EntityResourceTest<Topic, CreateTopic> {
.withMaximumMessageSize(2)
.withPartitions(2)
.withReplicationFactor(2)
.withRetentionTime(2)
.withRetentionSize(2)
.withRetentionTime(2.0)
.withRetentionSize(2.0)
.withSchemaText("bcd")
.withSchemaType(SchemaType.JSON)
.withCleanupPolicies(List.of(CleanupPolicy.DELETE));
@ -192,8 +192,8 @@ public class TopicResourceTest extends EntityResourceTest<Topic, CreateTopic> {
change.getFieldsUpdated().add(new FieldChange().withName("minimumInSyncReplicas").withOldValue(1).withNewValue(2));
change.getFieldsUpdated().add(new FieldChange().withName("partitions").withOldValue(1).withNewValue(2));
change.getFieldsUpdated().add(new FieldChange().withName("replicationFactor").withOldValue(1).withNewValue(2));
change.getFieldsUpdated().add(new FieldChange().withName("retentionTime").withOldValue(1).withNewValue(2));
change.getFieldsUpdated().add(new FieldChange().withName("retentionSize").withOldValue(1).withNewValue(2));
change.getFieldsUpdated().add(new FieldChange().withName("retentionTime").withOldValue(1.0).withNewValue(2.0));
change.getFieldsUpdated().add(new FieldChange().withName("retentionSize").withOldValue(1.0).withNewValue(2.0));
change.getFieldsUpdated().add(new FieldChange().withName("schemaText").withOldValue("abc").withNewValue("bcd"));
change
.getFieldsUpdated()

View File

@ -155,9 +155,10 @@ class KafkaSource(Source[CreateTopicRequest]):
).value
if "cleanup.policy" in config_response:
topic.cleanupPolicies = [
config_response.get("cleanup.policy").value
]
cleanup_policies = config_response.get(
"cleanup.policy"
).value
topic.cleanupPolicies = cleanup_policies.split(",")
topic_config = {}
for key, conf_response in config_response.items():