feat(kafka-config): add variable KAFKA_CONSUMER_GROUP_ID to override group id value for kafka consumer (#2297)

This commit is contained in:
shakti-garg 2021-03-28 08:54:36 +05:30 committed by GitHub
parent 4591667e05
commit 15ed5bb067
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 14 additions and 3 deletions

View File

@ -47,7 +47,16 @@ We've included environment variables to customize the name each of these topics,
Please ensure that these environment variables are set consistently throughout your ecosystem. DataHub has a few different applications running which communicate with Kafka (see above).
**How to apply configuration?**
## Configuring Consumer Group Id
Kafka Consumers in Spring are configured using Kafka listeners. By default, consumer group id is same as listener id.
We've included an environment variable to customize the consumer group id, if your company or organization has specific naming rules.
### datahub-mce-consumer and datahub-mae-consumer
- `KAFKA_CONSUMER_GROUP_ID`: The name of the kafka consumer's group id.
## How to apply configuration?
- For quickstart, add these environment variables to the corresponding application's docker.env
- For helm charts, add these environment variables as extraEnvs to the corresponding application's chart.
For example,
@ -59,6 +68,8 @@ extraEnvs:
value: "MetadataAuditEvent"
- name: FAILED_METADATA_CHANGE_EVENT_NAME
value: "FailedMetadataChangeEvent"
- name: KAFKA_CONSUMER_GROUP_ID
value: "my-apps-mae-consumer"
```
## SSL

View File

@ -50,7 +50,7 @@ public class MetadataAuditEventsProcessor {
log.info("registered index builders {}", indexBuilders);
}
@KafkaListener(id = "mae-consumer-job-client", topics = "${KAFKA_TOPIC_NAME:" + Topics.METADATA_AUDIT_EVENT + "}")
@KafkaListener(id = "${KAFKA_CONSUMER_GROUP_ID:mae-consumer-job-client}", topics = "${KAFKA_TOPIC_NAME:" + Topics.METADATA_AUDIT_EVENT + "}")
public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord) {
final GenericRecord record = consumerRecord.value();
log.debug("Got MAE");

View File

@ -41,7 +41,7 @@ public class MetadataChangeEventsProcessor {
this.kafkaTemplate = kafkaTemplate;
}
@KafkaListener(id = "mce-consumer-job-client",
@KafkaListener(id = "${KAFKA_CONSUMER_GROUP_ID:mce-consumer-job-client}",
topics = "${KAFKA_MCE_TOPIC_NAME:" + Topics.METADATA_CHANGE_EVENT + "}")
public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord) {
final GenericRecord record = consumerRecord.value();