feat(kafka): increase kafka message size and enable compression (#9038)

Co-authored-by: Pedro Silva <pedro@acryl.io>
david-leifker 2023-10-29 16:26:05 -05:00 committed by GitHub
parent 9ae0e93d82
commit 3f4ab44a91
25 changed files with 135 additions and 40 deletions

View File

@@ -851,8 +851,14 @@ jobs:
         if: failure()
         run: |
           docker ps -a
-          docker logs datahub-gms >& gms-${{ matrix.test_strategy }}.log
-          docker logs datahub-actions >& actions-${{ matrix.test_strategy }}.log
+          docker logs datahub-gms >& gms-${{ matrix.test_strategy }}.log || true
+          docker logs datahub-actions >& actions-${{ matrix.test_strategy }}.log || true
+          docker logs datahub-mae-consumer >& mae-${{ matrix.test_strategy }}.log || true
+          docker logs datahub-mce-consumer >& mce-${{ matrix.test_strategy }}.log || true
+          docker logs broker >& broker-${{ matrix.test_strategy }}.log || true
+          docker logs mysql >& mysql-${{ matrix.test_strategy }}.log || true
+          docker logs elasticsearch >& elasticsearch-${{ matrix.test_strategy }}.log || true
+          docker logs datahub-frontend-react >& frontend-${{ matrix.test_strategy }}.log || true
       - name: Upload logs
         uses: actions/upload-artifact@v3
         if: failure()
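Note: GitHub Actions runs `run:` steps under `bash -e`, so a single failing `docker logs` call (for a container that never started) would abort the step before the remaining logs are captured. The `|| true` guard makes each capture best-effort, a common pattern (container name below is illustrative):

    # best-effort capture: a missing container must not kill the log-collection step
    docker logs some-container >& some-container.log || true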

View File

@@ -39,7 +39,7 @@ buildscript {
 plugins {
   id 'com.gorylenko.gradle-git-properties' version '2.4.0-rc2'
   id 'com.github.johnrengelman.shadow' version '6.1.0'
-  id 'com.palantir.docker' version '0.35.0'
+  id 'com.palantir.docker' version '0.35.0' apply false
   // https://blog.ltgt.net/javax-jakarta-mess-and-gradle-solution/
   // TODO id "org.gradlex.java-ecosystem-capabilities" version "1.0"
 }
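In Gradle, `apply false` pins the plugin version on the build classpath without applying it to the root project, so only subprojects that actually build images opt in. A minimal sketch of the split (the subproject location is an assumption for illustration):

    // root build.gradle: version pinned once, not applied here
    plugins {
        id 'com.palantir.docker' version '0.35.0' apply false
    }

    // in a docker-building subproject's build.gradle
    apply plugin: 'com.palantir.docker'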

View File

@@ -1,6 +1,8 @@
 package client;
 
+import com.linkedin.metadata.config.kafka.ProducerConfiguration;
 import com.typesafe.config.Config;
+import config.ConfigurationProvider;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -35,12 +37,12 @@ public class KafkaTrackingProducer {
     private final KafkaProducer<String, String> _producer;
 
     @Inject
-    public KafkaTrackingProducer(@Nonnull Config config, ApplicationLifecycle lifecycle) {
+    public KafkaTrackingProducer(@Nonnull Config config, ApplicationLifecycle lifecycle, final ConfigurationProvider configurationProvider) {
         _isEnabled = !config.hasPath("analytics.enabled") || config.getBoolean("analytics.enabled");
 
         if (_isEnabled) {
             _logger.debug("Analytics tracking is enabled");
-            _producer = createKafkaProducer(config);
+            _producer = createKafkaProducer(config, configurationProvider.getKafka().getProducer());
 
             lifecycle.addStopHook(
                 () -> {
@@ -62,13 +64,15 @@ public class KafkaTrackingProducer {
         _producer.send(record);
     }
 
-    private static KafkaProducer createKafkaProducer(Config config) {
+    private static KafkaProducer createKafkaProducer(Config config, ProducerConfiguration producerConfiguration) {
         final Properties props = new Properties();
         props.put(ProducerConfig.CLIENT_ID_CONFIG, "datahub-frontend");
         props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, config.getString("analytics.kafka.delivery.timeout.ms"));
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getString("analytics.kafka.bootstrap.server"));
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // Actor urn.
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // JSON object.
+        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, producerConfiguration.getMaxRequestSize());
+        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, producerConfiguration.getCompressionType());
 
         final String securityProtocolConfig = "analytics.kafka.security.protocol";
         if (config.hasPath(securityProtocolConfig)
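For reference, a self-contained sketch of a producer with the same two settings; the broker address and class name are illustrative, while the 5 MB / snappy values mirror this PR's defaults:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // The two settings this PR threads through ProducerConfiguration:
            props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 5242880); // 5 MB cap on a produce request
            props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // compress batches on the wire
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // send records here
            }
        }
    }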

View File

@@ -1,6 +1,7 @@
 package config;
 
 import com.linkedin.metadata.config.cache.CacheConfiguration;
+import com.linkedin.metadata.config.kafka.KafkaConfiguration;
 import com.linkedin.metadata.spring.YamlPropertySourceFactory;
 import lombok.Data;
@@ -11,7 +12,6 @@ import org.springframework.context.annotation.PropertySource;
 /**
  * Minimal sharing between metadata-service and frontend
  * Initially for use of client caching configuration.
- * Does not use the factories module to avoid transitive dependencies.
  */
 @EnableConfigurationProperties
@@ -19,6 +19,10 @@ import org.springframework.context.annotation.PropertySource;
 @ConfigurationProperties
 @Data
 public class ConfigurationProvider {
+    /**
+     * Kafka related configs.
+     */
+    private KafkaConfiguration kafka;
+
     /**
      * Configuration for caching

View File

@@ -5,4 +5,6 @@ KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9
 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
 KAFKA_HEAP_OPTS=-Xms256m -Xmx256m
-KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false
+KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false
+KAFKA_MESSAGE_MAX_BYTES=5242880
+KAFKA_MAX_MESSAGE_BYTES=5242880
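On the Confluent Kafka image, environment variables of the form `KAFKA_<PROP>` are translated into broker properties, so `KAFKA_MESSAGE_MAX_BYTES` sets the broker-wide `message.max.bytes`; `KAFKA_MAX_MESSAGE_BYTES` follows the same convention for `max.message.bytes`, the per-topic spelling of the same 5 MB limit. One way to spot-check the broker side after startup (bootstrap address illustrative):

    kafka-configs.sh --bootstrap-server broker:29092 \
      --entity-type brokers --entity-default --describe --all | grep message.max.bytes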

View File

@@ -8,10 +8,12 @@ RUN addgroup -S datahub && adduser -S datahub -G datahub
 # Upgrade Alpine and base packages
 # PFP-260: Upgrade Sqlite to >=3.28.0-r0 to fix https://security.snyk.io/vuln/SNYK-ALPINE39-SQLITE-449762
 RUN apk --no-cache --update-cache --available upgrade \
-    && apk --no-cache add curl sqlite \
+    && apk --no-cache add curl sqlite libc6-compat java-snappy \
     && apk --no-cache add openjdk11-jre --repository=http://dl-cdn.alpinelinux.org/alpine/edge/community \
     && apk --no-cache add jattach --repository http://dl-cdn.alpinelinux.org/alpine/edge/community/
 
+ENV LD_LIBRARY_PATH="/lib:/lib64"
+
 FROM base as prod-install
 COPY ./datahub-frontend.zip /

View File

@@ -18,7 +18,7 @@ FROM alpine:3 AS base
 ENV JMX_VERSION=0.18.0
 # PFP-260: Upgrade Sqlite to >=3.28.0-r0 to fix https://security.snyk.io/vuln/SNYK-ALPINE39-SQLITE-449762
 RUN apk --no-cache --update-cache --available upgrade \
-    && apk --no-cache add curl bash coreutils gcompat sqlite \
+    && apk --no-cache add curl bash coreutils gcompat sqlite libc6-compat java-snappy \
     && apk --no-cache add openjdk11-jre --repository=http://dl-cdn.alpinelinux.org/alpine/edge/community \
     && apk --no-cache add jattach --repository http://dl-cdn.alpinelinux.org/alpine/edge/community/ \
     && curl -sS https://repo1.maven.org/maven2/org/eclipse/jetty/jetty-runner/9.4.46.v20220331/jetty-runner-9.4.46.v20220331.jar --output jetty-runner.jar \
@@ -29,6 +29,8 @@ RUN apk --no-cache --update-cache --available upgrade \
     && cp /usr/lib/jvm/java-11-openjdk/jre/lib/security/cacerts /tmp/kafka.client.truststore.jks
 COPY --from=binary /go/bin/dockerize /usr/local/bin
 
+ENV LD_LIBRARY_PATH="/lib:/lib64"
+
 FROM base as prod-install
 COPY war.war /datahub/datahub-gms/bin/war.war
 COPY metadata-models/src/main/resources/entity-registry.yml /datahub/datahub-gms/resources/entity-registry.yml

View File

@@ -18,7 +18,7 @@ FROM alpine:3 AS base
 ENV JMX_VERSION=0.18.0
 # PFP-260: Upgrade Sqlite to >=3.28.0-r0 to fix https://security.snyk.io/vuln/SNYK-ALPINE39-SQLITE-449762
 RUN apk --no-cache --update-cache --available upgrade \
-    && apk --no-cache add curl bash coreutils sqlite \
+    && apk --no-cache add curl bash coreutils sqlite libc6-compat java-snappy \
     && apk --no-cache add openjdk11-jre --repository=http://dl-cdn.alpinelinux.org/alpine/edge/community \
     && apk --no-cache add jattach --repository http://dl-cdn.alpinelinux.org/alpine/edge/community/ \
     && wget --no-verbose https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/download/v1.24.0/opentelemetry-javaagent.jar \
@@ -26,6 +26,8 @@ RUN apk --no-cache --update-cache --available upgrade \
     && cp /usr/lib/jvm/java-11-openjdk/jre/lib/security/cacerts /tmp/kafka.client.truststore.jks
 COPY --from=binary /go/bin/dockerize /usr/local/bin
 
+ENV LD_LIBRARY_PATH="/lib:/lib64"
+
 FROM base as prod-install
 COPY mae-consumer-job.jar /datahub/datahub-mae-consumer/bin/
 COPY metadata-models/src/main/resources/entity-registry.yml /datahub/datahub-mae-consumer/resources/entity-registry.yml

View File

@@ -18,7 +18,7 @@ FROM alpine:3 AS base
 ENV JMX_VERSION=0.18.0
 # PFP-260: Upgrade Sqlite to >=3.28.0-r0 to fix https://security.snyk.io/vuln/SNYK-ALPINE39-SQLITE-449762
 RUN apk --no-cache --update-cache --available upgrade \
-    && apk --no-cache add curl bash sqlite \
+    && apk --no-cache add curl bash sqlite libc6-compat java-snappy \
     && apk --no-cache add openjdk11-jre --repository=http://dl-cdn.alpinelinux.org/alpine/edge/community \
     && apk --no-cache add jattach --repository http://dl-cdn.alpinelinux.org/alpine/edge/community/ \
     && wget --no-verbose https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/download/v1.24.0/opentelemetry-javaagent.jar \
@@ -33,6 +33,8 @@ COPY docker/datahub-mce-consumer/start.sh /datahub/datahub-mce-consumer/scripts/
 COPY docker/monitoring/client-prometheus-config.yaml /datahub/datahub-mce-consumer/scripts/prometheus-config.yaml
 RUN chmod +x /datahub/datahub-mce-consumer/scripts/start.sh
 
+ENV LD_LIBRARY_PATH="/lib:/lib64"
+
 FROM base as dev-install
 # Dummy stage for development. Assumes code is built on your machine and mounted to this image.
 # See this excellent thread https://github.com/docker/cli/issues/1134

View File

@@ -18,7 +18,7 @@ FROM alpine:3 AS base
 ENV JMX_VERSION=0.18.0
 # PFP-260: Upgrade Sqlite to >=3.28.0-r0 to fix https://security.snyk.io/vuln/SNYK-ALPINE39-SQLITE-449762
 RUN apk --no-cache --update-cache --available upgrade \
-    && apk --no-cache add curl bash coreutils gcompat sqlite \
+    && apk --no-cache add curl bash coreutils gcompat sqlite libc6-compat java-snappy \
     && apk --no-cache add openjdk11-jre --repository=http://dl-cdn.alpinelinux.org/alpine/edge/community \
     && curl -sS https://repo1.maven.org/maven2/org/eclipse/jetty/jetty-runner/9.4.46.v20220331/jetty-runner-9.4.46.v20220331.jar --output jetty-runner.jar \
     && curl -sS https://repo1.maven.org/maven2/org/eclipse/jetty/jetty-jmx/9.4.46.v20220331/jetty-jmx-9.4.46.v20220331.jar --output jetty-jmx.jar \
@@ -28,6 +28,8 @@ RUN apk --no-cache --update-cache --available upgrade \
     && cp /usr/lib/jvm/java-11-openjdk/jre/lib/security/cacerts /tmp/kafka.client.truststore.jks
 COPY --from=binary /go/bin/dockerize /usr/local/bin
 
+ENV LD_LIBRARY_PATH="/lib:/lib64"
+
 FROM base as prod-install
 COPY datahub-upgrade.jar /datahub/datahub-upgrade/bin/
 COPY metadata-models/src/main/resources/entity-registry.yml /datahub/datahub-gms/resources/entity-registry.yml

View File

@@ -2,6 +2,7 @@
 : ${PARTITIONS:=1}
 : ${REPLICATION_FACTOR:=1}
+: ${MAX_MESSAGE_BYTES:=5242880}
 
 : ${KAFKA_PROPERTIES_SECURITY_PROTOCOL:=PLAINTEXT}
@@ -12,3 +13,4 @@ export KAFKA_HEAP_OPTS="-Xmx64M"
 CONNECTION_PROPERTIES_PATH=/tmp/connection.properties
 
 WORKERS=4
+DELIMITER=";"

View File

@@ -102,24 +102,43 @@ exec 4<&-
 send() {
   work_id=$1
   topic_args=$2
-  echo sending $work_id $topic_args
-  echo "$work_id" "$topic_args" 1>&3 ## the fifo is fd 3
+  topic_config=$3
+
+  echo -e "sending $work_id\n worker_args: ${topic_args}${DELIMITER}${topic_config}"
+  echo "$work_id" "${topic_args}${DELIMITER}${topic_config}" 1>&3 ## the fifo is fd 3
 }
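Each queued job now carries two payloads joined by `$DELIMITER`: the `kafka-topics.sh` create arguments and an optional `kafka-configs.sh` follow-up. For illustration, with `PARTITIONS=1` and the default `MAX_MESSAGE_BYTES`, the line written to the fifo for the versioned MCL topic would look roughly like:

    MetadataChangeLog_Versioned_v1 --partitions 1 --topic MetadataChangeLog_Versioned_v1;--entity-type topics --entity-name MetadataChangeLog_Versioned_v1 --alter --add-config max.message.bytes=5242880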
 ## Produce the jobs to run.
-send "$METADATA_AUDIT_EVENT_NAME" "--partitions $PARTITIONS --topic $METADATA_AUDIT_EVENT_NAME"
-send "$METADATA_CHANGE_EVENT_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_EVENT_NAME"
-send "$FAILED_METADATA_CHANGE_EVENT_NAME" "--partitions $PARTITIONS --topic $FAILED_METADATA_CHANGE_EVENT_NAME"
-send "$METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME"
+send "$METADATA_AUDIT_EVENT_NAME" "--partitions $PARTITIONS --topic $METADATA_AUDIT_EVENT_NAME" \
+  "--entity-type topics --entity-name $METADATA_AUDIT_EVENT_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"
+send "$METADATA_CHANGE_EVENT_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_EVENT_NAME" \
+  "--entity-type topics --entity-name $METADATA_CHANGE_EVENT_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"
+send "$FAILED_METADATA_CHANGE_EVENT_NAME" "--partitions $PARTITIONS --topic $FAILED_METADATA_CHANGE_EVENT_NAME" \
+  "--entity-type topics --entity-name $FAILED_METADATA_CHANGE_EVENT_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"
+send "$METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME" \
+  "--entity-type topics --entity-name $METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"
 
 # Set retention to 90 days
-send "$METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME" "--partitions $PARTITIONS --config retention.ms=7776000000 --topic $METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME"
-send "$METADATA_CHANGE_PROPOSAL_TOPIC_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_PROPOSAL_TOPIC_NAME"
-send "$FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME" "--partitions $PARTITIONS --topic $FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME"
-send "$PLATFORM_EVENT_TOPIC_NAME" "--partitions $PARTITIONS --topic $PLATFORM_EVENT_TOPIC_NAME"
+send "$METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME" "--partitions $PARTITIONS --config retention.ms=7776000000 --topic $METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME" \
+  "--entity-type topics --entity-name $METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"
+send "$METADATA_CHANGE_PROPOSAL_TOPIC_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_PROPOSAL_TOPIC_NAME" \
+  "--entity-type topics --entity-name $METADATA_CHANGE_PROPOSAL_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"
+send "$FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME" "--partitions $PARTITIONS --topic $FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME" \
+  "--entity-type topics --entity-name $FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"
+send "$PLATFORM_EVENT_TOPIC_NAME" "--partitions $PARTITIONS --topic $PLATFORM_EVENT_TOPIC_NAME" \
+  "--entity-type topics --entity-name $PLATFORM_EVENT_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"
 
 # Infinite retention upgrade topic
-send "$DATAHUB_UPGRADE_HISTORY_TOPIC_NAME" "--partitions 1 --config retention.ms=-1 --topic $DATAHUB_UPGRADE_HISTORY_TOPIC_NAME"
+# Make sure the retention.ms config for $DATAHUB_UPGRADE_HISTORY_TOPIC_NAME is configured to infinite
+# Please see the bug report below for details
+# https://github.com/datahub-project/datahub/issues/7882
+send "$DATAHUB_UPGRADE_HISTORY_TOPIC_NAME" "--partitions 1 --config retention.ms=-1 --topic $DATAHUB_UPGRADE_HISTORY_TOPIC_NAME" \
+  "--entity-type topics --entity-name "$DATAHUB_UPGRADE_HISTORY_TOPIC_NAME" --alter --add-config retention.ms=-1"
 
 # Create topic for datahub usage event
 if [[ $DATAHUB_ANALYTICS_ENABLED == true ]]; then
   send "$DATAHUB_USAGE_EVENT_NAME" "--partitions $PARTITIONS --topic $DATAHUB_USAGE_EVENT_NAME"
@@ -150,8 +169,3 @@ if [[ $USE_CONFLUENT_SCHEMA_REGISTRY == "TRUE" ]]; then
     --entity-name _schemas \
     --alter --add-config cleanup.policy=compact
 fi
-
-# Make sure the retention.ms config for $DATAHUB_UPGRADE_HISTORY_TOPIC_NAME is configured to infinite
-# Please see the bug report below for details
-# https://github.com/datahub-project/datahub/issues/7882
-kafka-configs.sh --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --entity-type topics --entity-name "$DATAHUB_UPGRADE_HISTORY_TOPIC_NAME" --alter --add-config retention.ms=-1

View File

@@ -11,10 +11,18 @@ START_LOCK=$4
 ## the queue workers are supposed to be doing
 job() {
   i=$1
-  topic_args=$2
+  worker_args=$2
+  topic_args=$(echo $worker_args | cut -d "$DELIMITER" -f 1)
+  topic_config=$(echo $worker_args | cut -d "$DELIMITER" -f 2)
 
   echo "  $i: kafka-topics.sh --create --if-not-exist $topic_args"
   kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER \
     --replication-factor $REPLICATION_FACTOR \
     $topic_args
+
+  if [[ ! -z "$topic_config" ]]; then
+    echo "  $i: kafka-configs.sh $topic_config"
+    kafka-configs.sh --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER $topic_config
+  fi
 }
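With the delimiter split, each worker now runs up to two commands per job: topic creation, then the config alteration. Expanded, the pair looks roughly like this (paths, server address, and values illustrative):

    kafka-topics.sh --create --if-not-exists --command-config /tmp/connection.properties \
      --bootstrap-server broker:29092 --replication-factor 1 \
      --partitions 1 --topic MetadataChangeLog_Versioned_v1
    kafka-configs.sh --command-config /tmp/connection.properties --bootstrap-server broker:29092 \
      --entity-type topics --entity-name MetadataChangeLog_Versioned_v1 \
      --alter --add-config max.message.bytes=5242880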
## This is the worker to read from the queue.

View File

@@ -16,6 +16,8 @@ services:
       - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
       - KAFKA_HEAP_OPTS=-Xms256m -Xmx256m
       - KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false
+      - KAFKA_MESSAGE_MAX_BYTES=5242880
+      - KAFKA_MAX_MESSAGE_BYTES=5242880
     healthcheck:
       interval: 1s
       retries: 5

View File

@@ -16,6 +16,8 @@ services:
      - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
       - KAFKA_HEAP_OPTS=-Xms256m -Xmx256m
       - KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false
+      - KAFKA_MESSAGE_MAX_BYTES=5242880
+      - KAFKA_MAX_MESSAGE_BYTES=5242880
     healthcheck:
       interval: 1s
       retries: 5

View File

@@ -16,6 +16,8 @@ services:
       - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
       - KAFKA_HEAP_OPTS=-Xms256m -Xmx256m
       - KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false
+      - KAFKA_MESSAGE_MAX_BYTES=5242880
+      - KAFKA_MAX_MESSAGE_BYTES=5242880
     healthcheck:
       interval: 1s
       retries: 5

View File

@@ -16,6 +16,8 @@ services:
       - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
       - KAFKA_HEAP_OPTS=-Xms256m -Xmx256m
       - KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false
+      - KAFKA_MESSAGE_MAX_BYTES=5242880
+      - KAFKA_MAX_MESSAGE_BYTES=5242880
     healthcheck:
       interval: 1s
       retries: 5

View File

@@ -67,15 +67,19 @@ In general, there are **lots** of Kafka configuration environment variables for
 These environment variables follow the standard Spring representation of properties as environment variables.
 Simply replace the dot, `.`, with an underscore, `_`, and convert to uppercase.
 
-| Variable | Default | Unit/Type | Components | Description |
-|----------|---------|-----------|------------|-------------|
-| `KAFKA_LISTENER_CONCURRENCY` | 1 | integer | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Number of Kafka consumer threads. Optimize throughput by matching to topic partitions. |
-| `SPRING_KAFKA_PRODUCER_PROPERTIES_MAX_REQUEST_SIZE` | 1048576 | bytes | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Max produced message size. Note that the topic configuration is not controlled by this variable. |
-| `SCHEMA_REGISTRY_TYPE` | `INTERNAL` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Schema registry implementation. One of `INTERNAL` or `KAFKA` or `AWS_GLUE` |
-| `KAFKA_SCHEMAREGISTRY_URL` | `http://localhost:8080/schema-registry/api/` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Schema registry url. Used for `INTERNAL` and `KAFKA`. The default value is for the `GMS` component. The `MCE Consumer` and `MAE Consumer` should be the `GMS` hostname and port. |
-| `AWS_GLUE_SCHEMA_REGISTRY_REGION` | `us-east-1` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | If using `AWS_GLUE` in the `SCHEMA_REGISTRY_TYPE` variable for the schema registry implementation. |
-| `AWS_GLUE_SCHEMA_REGISTRY_NAME` | `` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | If using `AWS_GLUE` in the `SCHEMA_REGISTRY_TYPE` variable for the schema registry. |
-| `USE_CONFLUENT_SCHEMA_REGISTRY` | `true` | boolean | [`kafka-setup`] | Enable Confluent schema registry configuration. |
+| Variable | Default | Unit/Type | Components | Description |
+|----------|---------|-----------|------------|-------------|
+| `KAFKA_LISTENER_CONCURRENCY` | 1 | integer | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Number of Kafka consumer threads. Optimize throughput by matching to topic partitions. |
+| `SPRING_KAFKA_PRODUCER_PROPERTIES_MAX_REQUEST_SIZE` | 1048576 | bytes | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Max produced message size. Note that the topic configuration is not controlled by this variable. |
+| `SCHEMA_REGISTRY_TYPE` | `INTERNAL` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Schema registry implementation. One of `INTERNAL` or `KAFKA` or `AWS_GLUE` |
+| `KAFKA_SCHEMAREGISTRY_URL` | `http://localhost:8080/schema-registry/api/` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Schema registry url. Used for `INTERNAL` and `KAFKA`. The default value is for the `GMS` component. The `MCE Consumer` and `MAE Consumer` should be the `GMS` hostname and port. |
+| `AWS_GLUE_SCHEMA_REGISTRY_REGION` | `us-east-1` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | If using `AWS_GLUE` in the `SCHEMA_REGISTRY_TYPE` variable for the schema registry implementation. |
+| `AWS_GLUE_SCHEMA_REGISTRY_NAME` | `` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | If using `AWS_GLUE` in the `SCHEMA_REGISTRY_TYPE` variable for the schema registry. |
+| `USE_CONFLUENT_SCHEMA_REGISTRY` | `true` | boolean | [`kafka-setup`] | Enable Confluent schema registry configuration. |
+| `KAFKA_PRODUCER_MAX_REQUEST_SIZE` | `5242880` | integer | [`Frontend`, `GMS`, `MCE Consumer`, `MAE Consumer`] | Max produced message size. Note that the topic configuration is not controlled by this variable. |
+| `KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES` | `5242880` | integer | [`GMS`, `MCE Consumer`, `MAE Consumer`] | The maximum amount of data per-partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress. |
+| `MAX_MESSAGE_BYTES` | `5242880` | integer | [`kafka-setup`] | Sets the max message size on the kafka topics. |
+| `KAFKA_PRODUCER_COMPRESSION_TYPE` | `snappy` | string | [`Frontend`, `GMS`, `MCE Consumer`, `MAE Consumer`] | The compression used by the producer. |
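To see how these variables land on the Kafka clients, here is a hedged sketch in plain Java; the env var names and defaults mirror the table above, while the class and method names are illustrative:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class KafkaEnvDefaults {
        // Resolve an env var with a fallback, mirroring Spring's ${VAR:default} placeholders.
        static String env(String name, String dflt) {
            String v = System.getenv(name);
            return (v == null || v.isEmpty()) ? dflt : v;
        }

        public static void main(String[] args) {
            Properties producer = new Properties();
            producer.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,
                Integer.parseInt(env("KAFKA_PRODUCER_MAX_REQUEST_SIZE", "5242880")));
            producer.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,
                env("KAFKA_PRODUCER_COMPRESSION_TYPE", "snappy"));

            Properties consumer = new Properties();
            consumer.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
                Integer.parseInt(env("KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES", "5242880")));
            System.out.println(producer + " " + consumer);
        }
    }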
## Frontend

View File

@@ -0,0 +1,10 @@
+package com.linkedin.metadata.config.kafka;
+
+import lombok.Data;
+
+@Data
+public class ConsumerConfiguration {
+
+    private int maxPartitionFetchBytes;
+}

View File

@@ -12,4 +12,6 @@ public class KafkaConfiguration {
     private SchemaRegistryConfiguration schemaRegistry;
 
     private ProducerConfiguration producer;
+
+    private ConsumerConfiguration consumer;
 }

View File

@@ -13,4 +13,8 @@ public class ProducerConfiguration {
     private int requestTimeout;
 
     private int backoffTimeout;
+
+    private String compressionType;
+
+    private int maxRequestSize;
 }

View File

@@ -228,6 +228,10 @@ kafka:
     deliveryTimeout: ${KAFKA_PRODUCER_DELIVERY_TIMEOUT:30000}
     requestTimeout: ${KAFKA_PRODUCER_REQUEST_TIMEOUT:3000}
     backoffTimeout: ${KAFKA_PRODUCER_BACKOFF_TIMEOUT:500}
+    compressionType: ${KAFKA_PRODUCER_COMPRESSION_TYPE:snappy} # producer's compression algorithm
+    maxRequestSize: ${KAFKA_PRODUCER_MAX_REQUEST_SIZE:5242880} # the max bytes sent by the producer, also see kafka-setup MAX_MESSAGE_BYTES for matching value
+  consumer:
+    maxPartitionFetchBytes: ${KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES:5242880} # the max bytes consumed per partition
   schemaRegistry:
     type: ${SCHEMA_REGISTRY_TYPE:KAFKA} # INTERNAL or KAFKA or AWS_GLUE
     url: ${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}

View File

@@ -59,6 +59,8 @@ public class DataHubKafkaProducerFactory {
     props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, kafkaConfiguration.getProducer().getDeliveryTimeout());
     props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaConfiguration.getProducer().getRequestTimeout());
     props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, kafkaConfiguration.getProducer().getBackoffTimeout());
+    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, kafkaConfiguration.getProducer().getCompressionType());
+    props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, kafkaConfiguration.getProducer().getMaxRequestSize());
 
     // Override KafkaProperties with SchemaRegistryConfig only for non-empty values
     schemaRegistryConfig.getProperties().entrySet()

View File

@@ -70,6 +70,7 @@ public class KafkaEventConsumerFactory {
     consumerProps.setEnableAutoCommit(true);
     consumerProps.setAutoCommitInterval(Duration.ofSeconds(10));
 
+    // KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS
     if (kafkaConfiguration.getBootstrapServers() != null && kafkaConfiguration.getBootstrapServers().length() > 0) {
       consumerProps.setBootstrapServers(Arrays.asList(kafkaConfiguration.getBootstrapServers().split(",")));
@@ -84,6 +85,9 @@ public class KafkaEventConsumerFactory {
         .filter(entry -> entry.getValue() != null && !entry.getValue().toString().isEmpty())
         .forEach(entry -> customizedProperties.put(entry.getKey(), entry.getValue()));
 
+    customizedProperties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
+        kafkaConfiguration.getConsumer().getMaxPartitionFetchBytes());
+
     return customizedProperties;
 }
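Symmetric to the producer change, a self-contained consumer sketch with the raised fetch ceiling; broker address, group id, topic, and class name are illustrative:

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class ConsumerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // illustrative
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // Raised to match the 5 MB producer/topic limit; per the Kafka docs, an oversized
            // first batch is still returned so the consumer keeps making progress.
            props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 5242880);
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(List.of("MetadataChangeLog_Versioned_v1")); // illustrative
                consumer.poll(Duration.ofSeconds(1));
            }
        }
    }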

View File

@@ -4,8 +4,11 @@ import com.linkedin.metadata.config.kafka.KafkaConfiguration;
 import com.linkedin.gms.factory.config.ConfigurationProvider;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
@@ -40,10 +43,14 @@ public class SimpleKafkaConsumerFactory {
       consumerProps.setBootstrapServers(Arrays.asList(kafkaConfiguration.getBootstrapServers().split(",")));
     } // else we rely on KafkaProperties which defaults to localhost:9092
 
+    Map<String, Object> customizedProperties = consumerProps.buildProperties();
+    customizedProperties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
+        kafkaConfiguration.getConsumer().getMaxPartitionFetchBytes());
+
     ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
         new ConcurrentKafkaListenerContainerFactory<>();
     factory.setContainerCustomizer(new ThreadPoolContainerCustomizer());
-    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties()));
+    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(customizedProperties));
 
     log.info("Simple KafkaListenerContainerFactory built successfully");