fix(system-update): fixes system-update with more than 1 partition (#7302)
parent fc12fad74c
commit b4b3a39829
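In short: topic creation moves the --partitions flag out of the shared kafka-topics.sh invocation and into each topic's per-topic arguments, the upgrade-history topic is pinned to exactly 1 partition, and DataHubUpgradeKafkaListener (relocated to com.linkedin.metadata.boot.kafka) now runs with concurrency "1", tracks the updated flag in a static field, and clamps its seek target. The underlying bug: the listener seeks each assigned partition to endOffset - 1 so it re-reads only the latest DataHubUpgradeHistoryEvent, and with more than one partition the empty partitions have endOffset == 0, so the old code asked for offset -1. A minimal standalone sketch of the clamped seek follows; the seek helper and the sample end offsets are illustrative stand-ins for Spring Kafka's ConsumerSeekCallback and Consumer#endOffsets, not code from this commit.

import java.util.Map;
import org.apache.kafka.common.TopicPartition;

public class SeekGuardSketch {
  // Illustrative stand-in for ConsumerSeekCallback#seek: just records the target.
  static void seek(String topic, int partition, long offset) {
    System.out.printf("seek %s-%d -> %d%n", topic, partition, offset);
  }

  public static void main(String[] args) {
    // Assumed topic name; the real value comes from Topics.DATAHUB_UPGRADE_HISTORY_TOPIC_NAME.
    String topic = "DataHubUpgradeHistory_v1";
    // End offsets as Consumer#endOffsets might report them: partition 0 holds
    // the latest event, partitions 1 and 2 were created empty.
    Map<TopicPartition, Long> endOffsets = Map.of(
        new TopicPartition(topic, 0), 5L,
        new TopicPartition(topic, 1), 0L,
        new TopicPartition(topic, 2), 0L);

    endOffsets.forEach((tp, end) -> {
      long newOffset = end - 1; // -1 on an empty partition: an invalid target
      // The fix: clamp to 0 so empty partitions seek to the beginning
      // instead of a negative offset.
      seek(tp.topic(), tp.partition(), Math.max(0, newOffset));
    });
  }
}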
@@ -104,22 +104,22 @@ send() {
 }
 
 ## Produce the jobs to run.
-send "$METADATA_AUDIT_EVENT_NAME" "--topic $METADATA_AUDIT_EVENT_NAME"
-send "$METADATA_CHANGE_EVENT_NAME" "--topic $METADATA_CHANGE_EVENT_NAME"
-send "$FAILED_METADATA_CHANGE_EVENT_NAME" "--topic $FAILED_METADATA_CHANGE_EVENT_NAME"
-send "$METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME" "--topic $METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME"
+send "$METADATA_AUDIT_EVENT_NAME" "--partitions $PARTITIONS --topic $METADATA_AUDIT_EVENT_NAME"
+send "$METADATA_CHANGE_EVENT_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_EVENT_NAME"
+send "$FAILED_METADATA_CHANGE_EVENT_NAME" "--partitions $PARTITIONS --topic $FAILED_METADATA_CHANGE_EVENT_NAME"
+send "$METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME"
 
 # Set retention to 90 days
-send "$METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME" "--config retention.ms=7776000000 --topic $METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME"
-send "$METADATA_CHANGE_PROPOSAL_TOPIC_NAME" "--topic $METADATA_CHANGE_PROPOSAL_TOPIC_NAME"
-send "$FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME" "--topic $FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME"
-send "$PLATFORM_EVENT_TOPIC_NAME" "--topic $PLATFORM_EVENT_TOPIC_NAME"
+send "$METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME" "--partitions $PARTITIONS --config retention.ms=7776000000 --topic $METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME"
+send "$METADATA_CHANGE_PROPOSAL_TOPIC_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_PROPOSAL_TOPIC_NAME"
+send "$FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME" "--partitions $PARTITIONS --topic $FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME"
+send "$PLATFORM_EVENT_TOPIC_NAME" "--partitions $PARTITIONS --topic $PLATFORM_EVENT_TOPIC_NAME"
 
 # Infinite retention upgrade topic
-send "$DATAHUB_UPGRADE_HISTORY_TOPIC_NAME" "--config retention.ms=-1 --topic $DATAHUB_UPGRADE_HISTORY_TOPIC_NAME"
+send "$DATAHUB_UPGRADE_HISTORY_TOPIC_NAME" "--partitions 1 --config retention.ms=-1 --topic $DATAHUB_UPGRADE_HISTORY_TOPIC_NAME"
 # Create topic for datahub usage event
 if [[ $DATAHUB_ANALYTICS_ENABLED == true ]]; then
-  send "$DATAHUB_USAGE_EVENT_NAME" "--topic $DATAHUB_USAGE_EVENT_NAME"
+  send "$DATAHUB_USAGE_EVENT_NAME" "--partitions $PARTITIONS --topic $DATAHUB_USAGE_EVENT_NAME"
 fi
 
 ## close the fifo
@@ -13,7 +13,7 @@ job() {
   i=$1
   topic_args=$2
   kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER \
-    --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR \
+    --replication-factor $REPLICATION_FACTOR \
     $topic_args
 }
 
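The two shell hunks above create every regular topic with the configured $PARTITIONS while pinning the upgrade-history topic to a single partition, so the latest DataHubUpgradeHistoryEvent always lands in partition 0. The same outcome, sketched with Kafka's AdminClient: topic names, counts, and the bootstrap address are illustrative, the commit itself drives kafka-topics.sh, and this sketch omits the script's --if-not-exists handling.

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicSetupSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
    int partitions = 3;    // stand-in for $PARTITIONS
    short replication = 1; // stand-in for $REPLICATION_FACTOR

    try (Admin admin = Admin.create(props)) {
      admin.createTopics(List.of(
          // Regular topics: created with the configured partition count.
          new NewTopic("MetadataChangeProposal_v1", partitions, replication),
          // Upgrade-history topic: exactly one partition and infinite retention,
          // so the newest upgrade event is always readable from partition 0.
          new NewTopic("DataHubUpgradeHistory_v1", 1, replication)
              .configs(Map.of("retention.ms", "-1"))
      )).all().get();
    }
  }
}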
@@ -13,7 +13,10 @@ import org.springframework.context.annotation.FilterType;
 @SuppressWarnings("checkstyle:HideUtilityClassConstructor")
 @SpringBootApplication(exclude = {ElasticsearchRestClientAutoConfiguration.class, CassandraAutoConfiguration.class,
     SolrHealthContributorAutoConfiguration.class})
-@ComponentScan(excludeFilters = {
+@ComponentScan(basePackages = {
+    "com.linkedin.metadata.boot.kafka",
+    "com.linkedin.metadata.kafka"
+}, excludeFilters = {
     @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = ScheduledAnalyticsFactory.class)})
 public class MaeConsumerApplication {
 
@@ -2,6 +2,7 @@ package com.linkedin.metadata.kafka.boot;
 
 import com.linkedin.gms.factory.config.ConfigurationProvider;
 import com.linkedin.metadata.boot.BootstrapManager;
+import com.linkedin.metadata.boot.kafka.DataHubUpgradeKafkaListener;
 import com.linkedin.metadata.kafka.config.MetadataChangeLogProcessorCondition;
 
 import javax.annotation.Nonnull;
@@ -23,7 +23,7 @@ import com.linkedin.metadata.config.ElasticSearchConfiguration;
 import com.linkedin.metadata.config.SystemUpdateConfiguration;
 import com.linkedin.metadata.graph.Edge;
 import com.linkedin.metadata.graph.GraphService;
-import com.linkedin.metadata.kafka.boot.DataHubUpgradeKafkaListener;
+import com.linkedin.metadata.boot.kafka.DataHubUpgradeKafkaListener;
 import com.linkedin.metadata.key.ChartKey;
 import com.linkedin.metadata.models.AspectSpec;
 import com.linkedin.metadata.models.EntitySpec;
@@ -20,6 +20,7 @@ import org.springframework.context.annotation.PropertySource;
     SolrHealthContributorAutoConfiguration.class
 })
 @ComponentScan(basePackages = {
+    "com.linkedin.metadata.boot.kafka",
     "com.linkedin.gms.factory.auth",
     "com.linkedin.gms.factory.common",
     "com.linkedin.gms.factory.config",
@@ -2,6 +2,7 @@ package com.linkedin.metadata.kafka.boot;
 
 import com.linkedin.gms.factory.config.ConfigurationProvider;
 import com.linkedin.metadata.boot.BootstrapManager;
+import com.linkedin.metadata.boot.kafka.DataHubUpgradeKafkaListener;
 import com.linkedin.metadata.kafka.config.MetadataChangeProposalProcessorCondition;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Qualifier;
@@ -1,138 +0,0 @@
-package com.linkedin.metadata.kafka.boot;
-
-import com.linkedin.gms.factory.config.ConfigurationProvider;
-import com.linkedin.metadata.EventUtils;
-import com.linkedin.metadata.boot.dependencies.BootstrapDependency;
-import com.linkedin.metadata.utils.metrics.MetricUtils;
-import com.linkedin.metadata.version.GitVersion;
-import com.linkedin.mxe.DataHubUpgradeHistoryEvent;
-import com.linkedin.mxe.Topics;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.kafka.annotation.EnableKafka;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
-import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
-import org.springframework.kafka.listener.ConsumerSeekAware;
-import org.springframework.kafka.listener.MessageListenerContainer;
-import org.springframework.stereotype.Component;
-
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-
-// We don't disable this on GMS since we want GMS to also wait until the system is ready to read in case of
-// backwards incompatible query logic dependent on system updates.
-@Component("dataHubUpgradeKafkaListener")
-@RequiredArgsConstructor
-@Slf4j
-@EnableKafka
-public class DataHubUpgradeKafkaListener implements ConsumerSeekAware, BootstrapDependency {
-  @Autowired
-  private KafkaListenerEndpointRegistry registry;
-
-  private static final String CONSUMER_GROUP = "${DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID:generic-duhe-consumer-job-client}";
-  private static final String SUFFIX = "temp";
-  private static final String TOPIC_NAME = "${DATAHUB_UPGRADE_HISTORY_TOPIC_NAME:" + Topics.DATAHUB_UPGRADE_HISTORY_TOPIC_NAME + "}";
-
-  private final DefaultKafkaConsumerFactory<String, GenericRecord> _defaultKafkaConsumerFactory;
-
-  @Value("#{systemEnvironment['DATAHUB_REVISION'] ?: '0'}")
-  private String revision;
-  private final GitVersion _gitVersion;
-  private final ConfigurationProvider _configurationProvider;
-
-  @Value(CONSUMER_GROUP)
-  private String consumerGroup;
-
-  @Value(TOPIC_NAME)
-  private String topicName;
-
-  private AtomicBoolean isUpdated = new AtomicBoolean(false);
-
-
-  // Constructs a consumer to determine the final offset to assign, preventing re-reading the whole topic to get the latest version
-  @Override
-  public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
-    try (Consumer<String, GenericRecord> kafkaConsumer =
-        _defaultKafkaConsumerFactory.createConsumer(consumerGroup, SUFFIX)) {
-      final Map<TopicPartition, Long> offsetMap = kafkaConsumer.endOffsets(assignments.keySet());
-      assignments.entrySet().stream()
-          .filter(entry -> topicName.equals(entry.getKey().topic()))
-          .forEach(entry ->
-              callback.seek(entry.getKey().topic(), entry.getKey().partition(), offsetMap.get(entry.getKey()) - 1));
-    }
-  }
-
-  @KafkaListener(id = CONSUMER_GROUP, topics = {TOPIC_NAME}, containerFactory = "kafkaEventConsumer")
-  public void checkSystemVersion(final ConsumerRecord<String, GenericRecord> consumerRecord) {
-    final GenericRecord record = consumerRecord.value();
-    final String expectedVersion = String.format("%s-%s", _gitVersion.getVersion(), revision);
-
-    DataHubUpgradeHistoryEvent event;
-    try {
-      event = EventUtils.avroToPegasusDUHE(record);
-      log.info("Latest system update version: {}", event.getVersion());
-      if (expectedVersion.equals(event.getVersion())) {
-        isUpdated.getAndSet(true);
-      } else {
-        log.debug("System version is not up to date: {}", expectedVersion);
-      }
-
-    } catch (Exception e) {
-      MetricUtils.counter(this.getClass(), "avro_to_pegasus_conversion_failure").inc();
-      log.error("Error deserializing message due to: ", e);
-      log.error("Message: {}", record.toString());
-      return;
-    }
-  }
-
-  public void waitForUpdate() {
-    int maxBackOffs = Integer.parseInt(_configurationProvider.getSystemUpdate().getMaxBackOffs());
-    long initialBackOffMs = Long.parseLong(_configurationProvider.getSystemUpdate().getInitialBackOffMs());
-    int backOffFactor = Integer.parseInt(_configurationProvider.getSystemUpdate().getBackOffFactor());
-
-    long backOffMs = initialBackOffMs;
-    for (int i = 0; i < maxBackOffs; i++) {
-      if (isUpdated.get()) {
-        log.debug("Finished waiting for updated indices.");
-        try {
-          log.info("Containers: {}", registry.getListenerContainers().stream()
-              .map(MessageListenerContainer::getListenerId)
-              .collect(Collectors.toList()));
-          registry.getListenerContainer(consumerGroup).stop();
-        } catch (NullPointerException e) {
-          log.error("Expected consumer `{}` to shutdown.", consumerGroup);
-        }
-        return;
-      }
-      try {
-        Thread.sleep(backOffMs);
-      } catch (InterruptedException e) {
-        log.error("Thread interrupted while sleeping for exponential backoff: {}", e.getMessage());
-        throw new RuntimeException(e);
-      }
-
-      backOffMs = backOffMs * backOffFactor;
-    }
-
-    if (!isUpdated.get()) {
-
-      throw new IllegalStateException("Indices are not updated after exponential backoff."
-          + " Please try restarting and consider increasing back off settings.");
-    }
-  }
-
-  @Override
-  public boolean waitForBootstrap() {
-    this.waitForUpdate();
-
-    return true;
-  }
-}
@@ -1,4 +1,4 @@
-package com.linkedin.metadata.kafka.boot;
+package com.linkedin.metadata.boot.kafka;
 
 import com.linkedin.gms.factory.config.ConfigurationProvider;
 import com.linkedin.metadata.EventUtils;
@@ -18,7 +18,6 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.kafka.annotation.EnableKafka;
 import org.springframework.kafka.annotation.KafkaListener;
@@ -35,8 +34,8 @@ import org.springframework.stereotype.Component;
 @Slf4j
 @EnableKafka
 public class DataHubUpgradeKafkaListener implements ConsumerSeekAware, BootstrapDependency {
-  @Autowired
-  private KafkaListenerEndpointRegistry registry;
+
+  private final KafkaListenerEndpointRegistry registry;
 
   private static final String CONSUMER_GROUP = "${DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID:generic-duhe-consumer-job-client}";
   private static final String SUFFIX = "temp";
@@ -55,7 +54,7 @@ public class DataHubUpgradeKafkaListener implements ConsumerSeekAware, BootstrapDependency {
   @Value(TOPIC_NAME)
   private String topicName;
 
-  private AtomicBoolean isUpdated = new AtomicBoolean(false);
+  private final static AtomicBoolean IS_UPDATED = new AtomicBoolean(false);
 
 
   // Constructs a consumer to determine the final offset to assign, preventing re-reading the whole topic to get the latest version
@@ -66,12 +65,15 @@ public class DataHubUpgradeKafkaListener implements ConsumerSeekAware, BootstrapDependency {
       final Map<TopicPartition, Long> offsetMap = kafkaConsumer.endOffsets(assignments.keySet());
       assignments.entrySet().stream()
          .filter(entry -> topicName.equals(entry.getKey().topic()))
-          .forEach(entry ->
-              callback.seek(entry.getKey().topic(), entry.getKey().partition(), offsetMap.get(entry.getKey()) - 1));
+          .forEach(entry -> {
+            log.info("Partition: {} Current Offset: {}", entry.getKey(), offsetMap.get(entry.getKey()));
+            long newOffset = offsetMap.get(entry.getKey()) - 1;
+            callback.seek(entry.getKey().topic(), entry.getKey().partition(), Math.max(0, newOffset));
+          });
     }
   }
 
-  @KafkaListener(id = CONSUMER_GROUP, topics = {TOPIC_NAME}, containerFactory = "kafkaEventConsumer")
+  @KafkaListener(id = CONSUMER_GROUP, topics = {TOPIC_NAME}, containerFactory = "kafkaEventConsumer", concurrency = "1")
   public void checkSystemVersion(final ConsumerRecord<String, GenericRecord> consumerRecord) {
     final GenericRecord record = consumerRecord.value();
     final String expectedVersion = String.format("%s-%s", _gitVersion.getVersion(), revision);
@@ -81,7 +83,7 @@ public class DataHubUpgradeKafkaListener implements ConsumerSeekAware, BootstrapDependency {
       event = EventUtils.avroToPegasusDUHE(record);
       log.info("Latest system update version: {}", event.getVersion());
       if (expectedVersion.equals(event.getVersion())) {
-        isUpdated.getAndSet(true);
+        IS_UPDATED.getAndSet(true);
       } else {
         log.debug("System version is not up to date: {}", expectedVersion);
       }
@@ -101,7 +103,7 @@ public class DataHubUpgradeKafkaListener implements ConsumerSeekAware, BootstrapDependency {
 
     long backOffMs = initialBackOffMs;
     for (int i = 0; i < maxBackOffs; i++) {
-      if (isUpdated.get()) {
+      if (IS_UPDATED.get()) {
         log.debug("Finished waiting for updated indices.");
         try {
           log.info("Containers: {}", registry.getListenerContainers().stream()
@@ -123,7 +125,7 @@ public class DataHubUpgradeKafkaListener implements ConsumerSeekAware, BootstrapDependency {
       backOffMs = backOffMs * backOffFactor;
     }
 
-    if (!isUpdated.get()) {
+    if (!IS_UPDATED.get()) {
 
       throw new IllegalStateException("Indices are not updated after exponential backoff."
           + " Please try restarting and consider increasing back off settings.");