feat(consumers): mce-consumer throttling based on mae-consumer lag (#10626)

david-leifker 2024-05-31 15:53:02 -05:00 committed by GitHub
parent 91ab27b046
commit dfa9bd2779
13 changed files with 768 additions and 3 deletions


@@ -250,6 +250,7 @@ project.ext.externalDependency = [
'springBootStarterValidation': "org.springframework.boot:spring-boot-starter-validation:$springBootVersion",
'springKafka': "org.springframework.kafka:spring-kafka:$springKafkaVersion",
'springActuator': "org.springframework.boot:spring-boot-starter-actuator:$springBootVersion",
'springRetry': "org.springframework.retry:spring-retry:2.0.6",
'swaggerAnnotations': 'io.swagger.core.v3:swagger-annotations:2.2.15',
'swaggerCli': 'io.swagger.codegen.v3:swagger-codegen-cli:3.0.46',
'swaggerCore': 'io.swagger.core.v3:swagger-core:2.2.7',


@@ -18,6 +18,7 @@ dependencies {
annotationProcessor externalDependency.lombok
testImplementation externalDependency.mockito
testImplementation externalDependency.testng
constraints {
implementation(externalDependency.log4jCore) {


@@ -0,0 +1,246 @@
package com.datahub.metadata.dao.producer;
import com.codahale.metrics.Gauge;
import com.google.common.annotations.VisibleForTesting;
import com.linkedin.metadata.config.MetadataChangeProposalConfig;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.util.Pair;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.util.backoff.BackOffExecution;
import org.springframework.util.backoff.ExponentialBackOff;
@Slf4j
@Builder(toBuilder = true)
public class KafkaProducerThrottle {
@Nonnull private final EntityRegistry entityRegistry;
@Nonnull private final Admin kafkaAdmin;
@Nonnull private final MetadataChangeProposalConfig.ThrottlesConfig config;
@Nonnull private final String mclConsumerGroupId;
@Nonnull private final String versionedTopicName;
@Nonnull private final String timeseriesTopicName;
@Nonnull private final Consumer<Boolean> pauseConsumer;
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private final Map<MclType, Long> medianLag = new ConcurrentHashMap<>();
private final Map<MclType, BackOffExecution> backoffMap = new ConcurrentHashMap<>();
/** Update lag information at a given rate */
public KafkaProducerThrottle start() {
if ((config.getVersioned().isEnabled() || config.getTimeseries().isEnabled())
&& config.getUpdateIntervalMs() > 0) {
scheduler.scheduleAtFixedRate(
() -> {
refresh();
try {
throttle();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
},
config.getUpdateIntervalMs(),
config.getUpdateIntervalMs(),
TimeUnit.MILLISECONDS);
}
return this;
}
@VisibleForTesting
public void refresh() {
medianLag.putAll(getMedianLag());
log.info("MCL medianLag: {}", medianLag);
}
@VisibleForTesting
public void stop() {
scheduler.shutdown();
}
/**
* Get copy of the lag info
*
* @return median lag per mcl topic
*/
@VisibleForTesting
public Map<MclType, Long> getLag() {
return medianLag.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
@VisibleForTesting
public boolean isThrottled(MclType mclType) {
if (getThrottleConfig(mclType).isEnabled() && medianLag.containsKey(mclType)) {
return medianLag.get(mclType) > getThrottleConfig(mclType).getThreshold();
}
return false;
}
@VisibleForTesting
public long computeNextBackOff(MclType mclType) {
if (isThrottled(mclType)) {
BackOffExecution backOffExecution =
backoffMap.computeIfAbsent(
mclType,
k -> {
MetadataChangeProposalConfig.ThrottleConfig throttleConfig =
getThrottleConfig(mclType);
ExponentialBackOff backoff =
new ExponentialBackOff(
throttleConfig.getInitialIntervalMs(), throttleConfig.getMultiplier());
backoff.setMaxAttempts(throttleConfig.getMaxAttempts());
backoff.setMaxInterval(throttleConfig.getMaxIntervalMs());
return backoff.start();
});
return backOffExecution.nextBackOff();
}
return 0;
}
@VisibleForTesting
public void throttle() throws InterruptedException {
for (MclType mclType : MclType.values()) {
if (isThrottled(mclType)) {
long backoffWaitMs = computeNextBackOff(mclType);
if (backoffWaitMs > 0) {
log.warn(
"Throttled producer Topic: {} Duration: {} ms MedianLag: {}",
getTopicName(mclType),
backoffWaitMs,
medianLag.get(mclType));
MetricUtils.gauge(
this.getClass(),
String.format("%s_throttled", getTopicName(mclType)),
() -> (Gauge<?>) () -> 1);
MetricUtils.counter(
this.getClass(), String.format("%s_throttledCount", getTopicName(mclType)))
.inc();
log.info("Pausing MCE consumer for {} ms.", backoffWaitMs);
pauseConsumer.accept(true);
Thread.sleep(backoffWaitMs);
log.info("Resuming MCE consumer.");
pauseConsumer.accept(false);
// if throttled for one topic, skip remaining
return;
} else {
// no throttle or exceeded configuration limits
log.info("MCE consumer throttle exponential backoff reset.");
backoffMap.remove(mclType);
MetricUtils.gauge(
this.getClass(),
String.format("%s_throttled", getTopicName(mclType)),
() -> (Gauge<?>) () -> 0);
}
} else {
// not throttled, remove backoff tracking
log.info("MCE consumer throttle exponential backoff reset.");
backoffMap.remove(mclType);
MetricUtils.gauge(
this.getClass(),
String.format("%s_throttled", getTopicName(mclType)),
() -> (Gauge<?>) () -> 0);
}
}
}
private Map<MclType, Long> getMedianLag() {
try {
Map<TopicPartition, OffsetAndMetadata> mclConsumerOffsets =
kafkaAdmin
.listConsumerGroupOffsets(mclConsumerGroupId)
.partitionsToOffsetAndMetadata()
.get()
.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Map<TopicPartition, OffsetSpec> latestOffsetRequest =
mclConsumerOffsets.keySet().stream()
.map(topicPartition -> Map.entry(topicPartition, OffsetSpec.latest()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Map<TopicPartition, Long> endOffsetValues =
kafkaAdmin.listOffsets(latestOffsetRequest).all().get().entrySet().stream()
.map(entry -> Map.entry(entry.getKey(), entry.getValue().offset()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return Stream.of(
Pair.of(MclType.VERSIONED, versionedTopicName),
Pair.of(MclType.TIMESERIES, timeseriesTopicName))
.map(
topic -> {
MclType mclType = topic.getFirst();
String topicName = topic.getSecond();
Map<TopicPartition, OffsetAndMetadata> topicOffsets =
mclConsumerOffsets.entrySet().stream()
.filter(entry -> entry.getKey().topic().equals(topicName))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
List<Double> offsetValues =
topicOffsets.values().stream()
.map(OffsetAndMetadata::offset)
.map(Long::doubleValue)
.collect(Collectors.toList());
long offsetMedian = getMedian(offsetValues).longValue();
List<Double> topicEndOffsetValues =
topicOffsets.keySet().stream()
.map(topicPart -> endOffsetValues.getOrDefault(topicPart, 0L))
.map(Long::doubleValue)
.collect(Collectors.toList());
long endOffsetMedian = getMedian(topicEndOffsetValues).longValue();
return Map.entry(mclType, Math.max(0, endOffsetMedian - offsetMedian));
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
} catch (ExecutionException | InterruptedException e) {
log.error("Error fetching consumer group offsets.", e);
return Map.of(MclType.VERSIONED, 0L, MclType.TIMESERIES, 0L);
}
}
private MetadataChangeProposalConfig.ThrottleConfig getThrottleConfig(MclType mclType) {
MetadataChangeProposalConfig.ThrottleConfig throttleConfig;
switch (mclType) {
case VERSIONED -> throttleConfig = config.getVersioned();
case TIMESERIES -> throttleConfig = config.getTimeseries();
default -> throw new IllegalStateException();
}
return throttleConfig;
}
private String getTopicName(MclType mclType) {
return MclType.TIMESERIES.equals(mclType) ? timeseriesTopicName : versionedTopicName;
}
private static Double getMedian(Collection<Double> listValues) {
double[] values = listValues.stream().mapToDouble(d -> d).sorted().toArray();
double median;
if (values.length % 2 == 0)
median = (values[values.length / 2] + values[values.length / 2 - 1]) / 2;
else median = values[values.length / 2];
return median;
}
public enum MclType {
TIMESERIES,
VERSIONED
}
}
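
For orientation, here is a minimal wiring sketch for the class above (not part of this commit). The EntityRegistry, AdminClient, and MetadataChangeProposalConfig arguments are assumed to come from elsewhere, and the pause callback only prints; the real Spring wiring, which pauses the MCE listener container, is in KafkaProducerThrottleFactory further down in this commit.

package com.datahub.metadata.dao.producer;

import com.linkedin.metadata.config.MetadataChangeProposalConfig;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.mxe.Topics;
import org.apache.kafka.clients.admin.AdminClient;

// Illustrative sketch only: shows how the builder fields fit together.
public class KafkaProducerThrottleWiringSketch {
  public static KafkaProducerThrottle wire(
      EntityRegistry entityRegistry,
      AdminClient adminClient,
      MetadataChangeProposalConfig mcpConfig) {
    return KafkaProducerThrottle.builder()
        .entityRegistry(entityRegistry)
        .kafkaAdmin(adminClient) // AdminClient implements the Admin interface
        .config(mcpConfig.getThrottle())
        .mclConsumerGroupId("generic-mae-consumer-job-client")
        .versionedTopicName(Topics.METADATA_CHANGE_LOG_VERSIONED)
        .timeseriesTopicName(Topics.METADATA_CHANGE_LOG_TIMESERIES)
        .pauseConsumer(pause -> System.out.println("Pause MCE consumer: " + pause))
        .build()
        .start();
  }
}

Note that start() only schedules the periodic refresh/throttle loop when throttling is enabled for at least one topic and updateIntervalMs is positive; otherwise it returns immediately.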


@@ -0,0 +1,363 @@
package com.datahub.metadata.dao.producer;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import com.linkedin.metadata.config.MetadataChangeProposalConfig;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.mxe.Topics;
import com.linkedin.util.Pair;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.testng.annotations.Test;
public class KafkaProducerThrottleTest {
private static final List<String> STANDARD_TOPICS =
List.of(Topics.METADATA_CHANGE_LOG_VERSIONED, Topics.METADATA_CHANGE_LOG_TIMESERIES);
private static final String STANDARD_MCL_CONSUMER_GROUP_ID = "generic-mae-consumer-job-client";
@Test
public void testLagCalculation() throws ExecutionException, InterruptedException {
// 3 partitions
// Consumer offsets: 1, 2, 3
// End offsets: 2, 4, 6
// Lag: 1, 2, 3
// MedianLag: 2
AdminClient mockAdmin =
mockKafka(
generateLag(
STANDARD_TOPICS,
topicPart -> (long) topicPart.partition() + 1,
topicPart -> ((long) topicPart.partition() + 1) * 2,
3));
KafkaProducerThrottle test =
KafkaProducerThrottle.builder()
.config(noSchedulerConfig().getThrottle())
.kafkaAdmin(mockAdmin)
.versionedTopicName(STANDARD_TOPICS.get(0))
.timeseriesTopicName(STANDARD_TOPICS.get(1))
.entityRegistry(mock(EntityRegistry.class))
.mclConsumerGroupId(STANDARD_MCL_CONSUMER_GROUP_ID)
.pauseConsumer(mock(Consumer.class))
.build();
// Refresh calculations
test.refresh();
assertEquals(
test.getLag(),
Map.of(
KafkaProducerThrottle.MclType.VERSIONED, 2L,
KafkaProducerThrottle.MclType.TIMESERIES, 2L));
}
@Test
public void testThrottle() throws ExecutionException, InterruptedException {
MetadataChangeProposalConfig.ThrottlesConfig noThrottleConfig =
noSchedulerConfig().getThrottle();
noThrottleConfig
.getVersioned()
.setThreshold(10)
.setInitialIntervalMs(1)
.setMultiplier(1)
.setMaxAttempts(1)
.setMaxIntervalMs(1);
MetadataChangeProposalConfig.ThrottlesConfig throttleConfig = noSchedulerConfig().getThrottle();
throttleConfig
.getVersioned()
.setThreshold(1)
.setInitialIntervalMs(1)
.setMultiplier(1)
.setMaxAttempts(1)
.setMaxIntervalMs(1);
// 3 partitions
// Consumer offsets: 1, 2, 3
// End offsets: 2, 4, 6
// Lag: 1, 2, 3
// MedianLag: 2
AdminClient mockAdmin =
mockKafka(
generateLag(
STANDARD_TOPICS,
topicPart -> (long) topicPart.partition() + 1,
topicPart -> ((long) topicPart.partition() + 1) * 2,
3));
Consumer<Boolean> pauseFunction = mock(Consumer.class);
KafkaProducerThrottle test =
KafkaProducerThrottle.builder()
.config(noThrottleConfig)
.kafkaAdmin(mockAdmin)
.versionedTopicName(STANDARD_TOPICS.get(0))
.timeseriesTopicName(STANDARD_TOPICS.get(1))
.entityRegistry(mock(EntityRegistry.class))
.mclConsumerGroupId(STANDARD_MCL_CONSUMER_GROUP_ID)
.pauseConsumer(pauseFunction)
.build();
// Refresh calculations
test.refresh();
assertEquals(
test.getLag(),
Map.of(
KafkaProducerThrottle.MclType.VERSIONED, 2L,
KafkaProducerThrottle.MclType.TIMESERIES, 2L));
assertFalse(
test.isThrottled(KafkaProducerThrottle.MclType.VERSIONED),
"Expected not throttling, lag is below threshold");
assertFalse(test.isThrottled(KafkaProducerThrottle.MclType.TIMESERIES));
test.throttle();
verifyNoInteractions(pauseFunction);
reset(pauseFunction);
KafkaProducerThrottle test2 = test.toBuilder().config(throttleConfig).build();
// Refresh calculations
test2.refresh();
assertEquals(
test2.getLag(),
Map.of(
KafkaProducerThrottle.MclType.VERSIONED, 2L,
KafkaProducerThrottle.MclType.TIMESERIES, 2L));
assertTrue(
test2.isThrottled(KafkaProducerThrottle.MclType.VERSIONED),
"Expected throttling, lag is above threshold.");
assertFalse(
test2.isThrottled(KafkaProducerThrottle.MclType.TIMESERIES),
"Expected not throttling. Timeseries is disabled");
test2.throttle();
// verify 1ms pause and resume
verify(pauseFunction).accept(eq(true));
verify(pauseFunction).accept(eq(false));
verifyNoMoreInteractions(pauseFunction);
}
@Test
public void testBackOff() throws ExecutionException, InterruptedException {
MetadataChangeProposalConfig.ThrottlesConfig throttleConfig = noSchedulerConfig().getThrottle();
throttleConfig
.getVersioned()
.setThreshold(1)
.setInitialIntervalMs(1)
.setMultiplier(2)
.setMaxAttempts(5)
.setMaxIntervalMs(8);
// 3 partitions
// Consumer offsets: 1, 2, 3
// End offsets: 2, 4, 6
// Lag: 1, 2, 3
// MedianLag: 2
AdminClient mockAdmin =
mockKafka(
generateLag(
STANDARD_TOPICS,
topicPart -> (long) topicPart.partition() + 1,
topicPart -> ((long) topicPart.partition() + 1) * 2,
3));
KafkaProducerThrottle test =
KafkaProducerThrottle.builder()
.config(throttleConfig)
.kafkaAdmin(mockAdmin)
.versionedTopicName(STANDARD_TOPICS.get(0))
.timeseriesTopicName(STANDARD_TOPICS.get(1))
.entityRegistry(mock(EntityRegistry.class))
.mclConsumerGroupId(STANDARD_MCL_CONSUMER_GROUP_ID)
.pauseConsumer(mock(Consumer.class))
.build();
// Refresh calculations
test.refresh();
assertEquals(
test.getLag(),
Map.of(
KafkaProducerThrottle.MclType.VERSIONED, 2L,
KafkaProducerThrottle.MclType.TIMESERIES, 2L));
assertTrue(
test.isThrottled(KafkaProducerThrottle.MclType.VERSIONED),
"Expected throttling, lag is above threshold.");
assertFalse(
test.isThrottled(KafkaProducerThrottle.MclType.TIMESERIES),
"Expected no throttling. Timeseries is disabled");
assertEquals(
test.computeNextBackOff(KafkaProducerThrottle.MclType.TIMESERIES),
0L,
"Expected no backoff. Timeseries is disabled.");
assertEquals(
test.computeNextBackOff(KafkaProducerThrottle.MclType.VERSIONED), 1L, "Expected initial 1");
assertEquals(
test.computeNextBackOff(KafkaProducerThrottle.MclType.VERSIONED),
2L,
"Expected second 2^1");
assertEquals(
test.computeNextBackOff(KafkaProducerThrottle.MclType.VERSIONED), 4L, "Expected third 2^2");
assertEquals(
test.computeNextBackOff(KafkaProducerThrottle.MclType.VERSIONED),
8L,
"Expected fourth 2^3");
assertEquals(
test.computeNextBackOff(KafkaProducerThrottle.MclType.VERSIONED),
8L,
"Expected fifth max interval at 8");
assertEquals(
test.computeNextBackOff(KafkaProducerThrottle.MclType.VERSIONED),
-1L,
"Expected max attempts");
}
@Test
public void testScheduler() throws ExecutionException, InterruptedException {
MetadataChangeProposalConfig config = new MetadataChangeProposalConfig();
MetadataChangeProposalConfig.ThrottlesConfig throttlesConfig =
new MetadataChangeProposalConfig.ThrottlesConfig()
.setUpdateIntervalMs(10); // configure fast update for test
throttlesConfig.setVersioned(
new MetadataChangeProposalConfig.ThrottleConfig()
.setEnabled(true) // enable 1 throttle config to activate
);
throttlesConfig.setTimeseries(
new MetadataChangeProposalConfig.ThrottleConfig().setEnabled(false));
config.setThrottle(throttlesConfig);
// 1 lag, 1 partition
AdminClient mockAdmin =
mockKafka(generateLag(STANDARD_TOPICS, topicPart -> 1L, topicPart -> 2L, 1));
KafkaProducerThrottle test =
KafkaProducerThrottle.builder()
.config(throttlesConfig)
.kafkaAdmin(mockAdmin)
.versionedTopicName(STANDARD_TOPICS.get(0))
.timeseriesTopicName(STANDARD_TOPICS.get(1))
.entityRegistry(mock(EntityRegistry.class))
.mclConsumerGroupId(STANDARD_MCL_CONSUMER_GROUP_ID)
.pauseConsumer(mock(Consumer.class))
.build();
try {
test.start();
Thread.sleep(50);
assertEquals(
test.getLag(),
Map.of(
KafkaProducerThrottle.MclType.VERSIONED, 1L,
KafkaProducerThrottle.MclType.TIMESERIES, 1L),
"Expected lag updated");
} finally {
test.stop();
}
}
private static MetadataChangeProposalConfig noSchedulerConfig() {
MetadataChangeProposalConfig config = new MetadataChangeProposalConfig();
MetadataChangeProposalConfig.ThrottlesConfig throttlesConfig =
new MetadataChangeProposalConfig.ThrottlesConfig()
.setUpdateIntervalMs(0); // no scheduler, manual update
throttlesConfig.setVersioned(
new MetadataChangeProposalConfig.ThrottleConfig()
.setEnabled(true) // enable 1 throttle config to activate
);
throttlesConfig.setTimeseries(
new MetadataChangeProposalConfig.ThrottleConfig().setEnabled(false));
config.setThrottle(throttlesConfig);
return config;
}
private static Pair<Map<TopicPartition, OffsetAndMetadata>, Map<TopicPartition, Long>>
generateLag(
Collection<String> topicNames,
Function<TopicPartition, Long> consumerOffset,
Function<TopicPartition, Long> endOffset,
int partitions) {
Set<TopicPartition> topicPartitions =
topicNames.stream()
.flatMap(
topicName ->
IntStream.range(0, partitions)
.mapToObj(partitionNum -> new TopicPartition(topicName, partitionNum)))
.collect(Collectors.toSet());
Map<TopicPartition, OffsetAndMetadata> consumerOffsetMap =
topicPartitions.stream()
.map(
topicPartition ->
Map.entry(
topicPartition,
new OffsetAndMetadata(consumerOffset.apply(topicPartition))))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Map<TopicPartition, Long> endOffsetMap =
topicPartitions.stream()
.map(topicPartition -> Map.entry(topicPartition, endOffset.apply(topicPartition)))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return Pair.of(consumerOffsetMap, endOffsetMap);
}
private static AdminClient mockKafka(
Pair<Map<TopicPartition, OffsetAndMetadata>, Map<TopicPartition, Long>> offsetPair)
throws ExecutionException, InterruptedException {
AdminClient mockKafkaAdmin = mock(AdminClient.class);
// consumer offsets
ListConsumerGroupOffsetsResult mockConsumerOffsetsResult =
mock(ListConsumerGroupOffsetsResult.class);
KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> mockConsumerFuture =
mock(KafkaFuture.class);
when(mockConsumerOffsetsResult.partitionsToOffsetAndMetadata()).thenReturn(mockConsumerFuture);
when(mockConsumerFuture.get()).thenReturn(offsetPair.getFirst());
when(mockKafkaAdmin.listConsumerGroupOffsets(anyString()))
.thenReturn(mockConsumerOffsetsResult);
// end offsets
ListOffsetsResult mockOffsetsResult = mock(ListOffsetsResult.class);
KafkaFuture<Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>> mockOffsetFuture =
mock(KafkaFuture.class);
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> resultMap =
offsetPair.getSecond().entrySet().stream()
.map(
entry -> {
ListOffsetsResult.ListOffsetsResultInfo mockInfo =
mock(ListOffsetsResult.ListOffsetsResultInfo.class);
when(mockInfo.offset()).thenReturn(entry.getValue());
return Map.entry(entry.getKey(), mockInfo);
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
when(mockOffsetFuture.get()).thenReturn(resultMap);
when(mockOffsetsResult.all()).thenReturn(mockOffsetFuture);
when(mockKafkaAdmin.listOffsets(anyMap())).thenReturn(mockOffsetsResult);
return mockKafkaAdmin;
}
}


@@ -22,6 +22,7 @@ dependencies {
implementation project(':metadata-events:mxe-utils-avro')
implementation project(':metadata-io')
implementation project(':metadata-service:restli-client-api')
implementation project(':metadata-dao-impl:kafka-producer')
implementation spec.product.pegasus.restliClient
implementation spec.product.pegasus.restliCommon
implementation externalDependency.elasticSearchRest


@@ -0,0 +1,30 @@
package com.linkedin.metadata.config;
import lombok.Data;
import lombok.experimental.Accessors;
@Data
@Accessors(chain = true)
public class MetadataChangeProposalConfig {
ThrottlesConfig throttle;
@Data
@Accessors(chain = true)
public static class ThrottlesConfig {
Integer updateIntervalMs;
ThrottleConfig versioned;
ThrottleConfig timeseries;
}
@Data
@Accessors(chain = true)
public static class ThrottleConfig {
boolean enabled;
Integer threshold;
Integer maxAttempts;
Integer initialIntervalMs;
Integer multiplier;
Integer maxIntervalMs;
}
}


@@ -460,3 +460,26 @@ forms:
businessAttribute:
  fetchRelatedEntitiesCount: ${BUSINESS_ATTRIBUTE_RELATED_ENTITIES_COUNT:20000}
  fetchRelatedEntitiesBatchSize: ${BUSINESS_ATTRIBUTE_RELATED_ENTITIES_BATCH_SIZE:1000}
metadataChangeProposal:
  throttle:
    updateIntervalMs: ${MCP_THROTTLE_UPDATE_INTERVAL_MS:60000}

    # Versioned MCL topic
    versioned:
      # Whether to throttle MCP processing based on MCL backlog
      enabled: ${MCP_VERSIONED_THROTTLE_ENABLED:false}
      threshold: ${MCP_VERSIONED_THRESHOLD:4000} # throttle threshold
      maxAttempts: ${MCP_VERSIONED_MAX_ATTEMPTS:1000}
      initialIntervalMs: ${MCP_VERSIONED_INITIAL_INTERVAL_MS:100}
      multiplier: ${MCP_VERSIONED_MULTIPLIER:10}
      maxIntervalMs: ${MCP_VERSIONED_MAX_INTERVAL_MS:30000}

    # Timeseries MCL topic
    timeseries:
      # Whether to throttle MCP processing based on MCL backlog
      enabled: ${MCP_TIMESERIES_THROTTLE_ENABLED:false}
      threshold: ${MCP_TIMESERIES_THRESHOLD:4000} # throttle threshold
      maxAttempts: ${MCP_TIMESERIES_MAX_ATTEMPTS:1000}
      initialIntervalMs: ${MCP_TIMESERIES_INITIAL_INTERVAL_MS:100}
      multiplier: ${MCP_TIMESERIES_MULTIPLIER:10}
      maxIntervalMs: ${MCP_TIMESERIES_MAX_INTERVAL_MS:30000}
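
With the defaults above, each throttled check pauses the MCE consumer for an exponentially growing interval. The sketch below (illustrative only) previews that sequence using the same Spring ExponentialBackOff API that KafkaProducerThrottle builds from these settings, plugging in the default versioned values.

import org.springframework.util.backoff.BackOffExecution;
import org.springframework.util.backoff.ExponentialBackOff;

// Illustrative preview of the pause durations produced by the defaults:
// initialIntervalMs=100, multiplier=10, maxIntervalMs=30000, maxAttempts=1000.
public class ThrottleBackoffPreview {
  public static void main(String[] args) {
    ExponentialBackOff backoff = new ExponentialBackOff(100, 10);
    backoff.setMaxInterval(30_000);
    backoff.setMaxAttempts(1_000);
    BackOffExecution execution = backoff.start();
    // Prints 100, 1000, 10000, 30000, 30000: intervals cap at maxIntervalMs and
    // repeat until maxAttempts, after which nextBackOff() returns -1 (STOP) and
    // the throttle resets its backoff tracking for that topic.
    for (int i = 0; i < 5; i++) {
      System.out.println(execution.nextBackOff());
    }
  }
}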


@@ -7,6 +7,7 @@ import com.linkedin.metadata.config.DataHubConfiguration;
import com.linkedin.metadata.config.EbeanConfiguration;
import com.linkedin.metadata.config.GraphQLConfiguration;
import com.linkedin.metadata.config.IngestionConfiguration;
import com.linkedin.metadata.config.MetadataChangeProposalConfig;
import com.linkedin.metadata.config.SystemUpdateConfiguration;
import com.linkedin.metadata.config.TestsConfiguration;
import com.linkedin.metadata.config.ViewsConfiguration;
@@ -80,4 +81,7 @@ public class ConfigurationProvider {
/** GraphQL Configurations */
private GraphQLConfiguration graphQL;
/** MCP throttling configuration */
private MetadataChangeProposalConfig metadataChangeProposal;
}


@@ -44,7 +44,7 @@ public class DataHubKafkaProducerFactory {
Arrays.asList(kafkaConfiguration.getBootstrapServers().split(",")));
} // else we rely on KafkaProperties which defaults to localhost:9092
Map<String, Object> props = properties.buildProducerProperties();
Map<String, Object> props = properties.buildProducerProperties(null);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, schemaRegistryConfig.getSerializer());


@@ -73,7 +73,7 @@ public class KafkaEventConsumerFactory {
Arrays.asList(kafkaConfiguration.getBootstrapServers().split(",")));
} // else we rely on KafkaProperties which defaults to localhost:9092
Map<String, Object> customizedProperties = baseKafkaProperties.buildConsumerProperties();
Map<String, Object> customizedProperties = baseKafkaProperties.buildConsumerProperties(null);
customizedProperties.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
customizedProperties.put(


@@ -44,7 +44,7 @@ public class SimpleKafkaConsumerFactory {
Arrays.asList(kafkaConfiguration.getBootstrapServers().split(",")));
} // else we rely on KafkaProperties which defaults to localhost:9092
Map<String, Object> customizedProperties = properties.buildConsumerProperties();
Map<String, Object> customizedProperties = properties.buildConsumerProperties(null);
customizedProperties.put(
ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
kafkaConfiguration.getConsumer().getMaxPartitionFetchBytes());
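
The three one-line changes above, and the buildAdminProperties(null) call in the throttle factory below, follow the newer Spring Boot KafkaProperties API in which the build*Properties methods take an SslBundles argument; passing null skips SSL-bundle resolution and effectively matches the previous no-argument behavior. A minimal sketch of the pattern (illustrative only; the class and method names are not from this commit):

import java.util.Map;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;

// Illustrative only: the same call pattern used by the producer/consumer/admin factories.
public class KafkaPropertiesUsageSketch {
  static void buildAll(KafkaProperties properties) {
    Map<String, Object> producerProps = properties.buildProducerProperties(null);
    Map<String, Object> consumerProps = properties.buildConsumerProperties(null);
    Map<String, Object> adminProps = properties.buildAdminProperties(null);
    // each map is then customized (serializers, fetch sizes, bootstrap servers) before use
  }
}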


@@ -0,0 +1,93 @@
package com.linkedin.gms.factory.kafka.throttle;
import com.datahub.metadata.dao.producer.KafkaProducerThrottle;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.metadata.config.MetadataChangeProposalConfig;
import com.linkedin.metadata.config.kafka.KafkaConfiguration;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.mxe.Topics;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
@Slf4j
@Configuration
public class KafkaProducerThrottleFactory {
@Value("${METADATA_CHANGE_LOG_KAFKA_CONSUMER_GROUP_ID:generic-mae-consumer-job-client}")
private String maeConsumerGroupId;
@Value("${METADATA_CHANGE_PROPOSAL_KAFKA_CONSUMER_GROUP_ID:generic-mce-consumer-job-client}")
private String mceConsumerGroupId;
@Value("${METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME:" + Topics.METADATA_CHANGE_LOG_VERSIONED + "}")
private String versionedTopicName;
@Value(
"${METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME:" + Topics.METADATA_CHANGE_LOG_TIMESERIES + "}")
private String timeseriesTopicName;
@Bean
public KafkaProducerThrottle kafkaProducerThrottle(
@Qualifier("configurationProvider") ConfigurationProvider provider,
final KafkaProperties kafkaProperties,
final EntityRegistry entityRegistry,
final KafkaListenerEndpointRegistry registry) {
KafkaConfiguration kafkaConfiguration = provider.getKafka();
MetadataChangeProposalConfig mcpConfig = provider.getMetadataChangeProposal();
return KafkaProducerThrottle.builder()
.entityRegistry(entityRegistry)
.kafkaAdmin(kafkaAdmin(kafkaConfiguration, kafkaProperties))
.config(mcpConfig.getThrottle())
.mclConsumerGroupId(maeConsumerGroupId)
.timeseriesTopicName(timeseriesTopicName)
.versionedTopicName(versionedTopicName)
.pauseConsumer(
(pause) -> {
Optional<MessageListenerContainer> container =
Optional.ofNullable(registry.getListenerContainer(mceConsumerGroupId));
if (container.isEmpty()) {
log.warn(
"Expected container was missing: {} throttling is not possible.",
mceConsumerGroupId);
} else {
if (pause) {
container.ifPresent(MessageListenerContainer::pause);
} else {
container.ifPresent(MessageListenerContainer::resume);
}
}
})
.build()
.start();
}
private static AdminClient kafkaAdmin(
KafkaConfiguration kafkaConfiguration, final KafkaProperties kafkaProperties) {
Map<String, Object> adminProperties = new HashMap<>(kafkaProperties.buildAdminProperties(null));
// KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS
if (kafkaConfiguration.getBootstrapServers() != null
&& !kafkaConfiguration.getBootstrapServers().isEmpty()) {
adminProperties.put(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
Arrays.asList(kafkaConfiguration.getBootstrapServers().split(",")));
} // else we rely on KafkaProperties which defaults to localhost:9092 or environment variables
return KafkaAdminClient.create(adminProperties);
}
}
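
One wiring detail worth noting: the pause callback resolves the MCE listener container through registry.getListenerContainer(mceConsumerGroupId), and Spring Kafka looks containers up by listener id. The warning branch fires when no @KafkaListener was registered under that id. A hypothetical listener shape is sketched below; the real MCE consumer is outside this diff, and the topic, container factory, and payload types are placeholders.

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Hypothetical sketch: the listener id must equal the configured MCE consumer group id
// for the throttle's pause/resume lookup to find this container.
@Component
public class McpListenerSketch {
  @KafkaListener(
      id = "${METADATA_CHANGE_PROPOSAL_KAFKA_CONSUMER_GROUP_ID:generic-mce-consumer-job-client}",
      topics = "${METADATA_CHANGE_PROPOSAL_TOPIC_NAME:MetadataChangeProposal_v1}",
      containerFactory = "kafkaEventConsumer")
  public void consume(ConsumerRecord<String, GenericRecord> consumerRecord) {
    // hand the MetadataChangeProposal off for processing ...
  }
}

With Spring Kafka's default idIsGroup=true, the listener id also serves as the consumer group id, so the single METADATA_CHANGE_PROPOSAL_KAFKA_CONSUMER_GROUP_ID value covers both the group and the registry lookup.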


@@ -1,6 +1,7 @@
package io.datahubproject.openapi.test;
import com.linkedin.metadata.dao.producer.KafkaHealthChecker;
import com.linkedin.metadata.models.registry.EntityRegistry;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.ComponentScan;
@@ -11,4 +12,6 @@ import org.springframework.test.context.TestPropertySource;
@ComponentScan(basePackages = {"com.linkedin.gms.factory.kafka", "com.linkedin.gms.factory.config"})
public class SchemaRegistryControllerTestConfiguration {
@MockBean KafkaHealthChecker kafkaHealthChecker;
@MockBean EntityRegistry entityRegistry;
}