diff --git a/docker/kafka-setup/kafka-setup.sh b/docker/kafka-setup/kafka-setup.sh
index e7446b8545..030c291825 100644
--- a/docker/kafka-setup/kafka-setup.sh
+++ b/docker/kafka-setup/kafka-setup.sh
@@ -104,22 +104,22 @@ send() {
 }
 
 ## Produce the jobs to run.
-send "$METADATA_AUDIT_EVENT_NAME" "--topic $METADATA_AUDIT_EVENT_NAME"
-send "$METADATA_CHANGE_EVENT_NAME" "--topic $METADATA_CHANGE_EVENT_NAME"
-send "$FAILED_METADATA_CHANGE_EVENT_NAME" "--topic $FAILED_METADATA_CHANGE_EVENT_NAME"
-send "$METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME" "--topic $METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME"
+send "$METADATA_AUDIT_EVENT_NAME" "--partitions $PARTITIONS --topic $METADATA_AUDIT_EVENT_NAME"
+send "$METADATA_CHANGE_EVENT_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_EVENT_NAME"
+send "$FAILED_METADATA_CHANGE_EVENT_NAME" "--partitions $PARTITIONS --topic $FAILED_METADATA_CHANGE_EVENT_NAME"
+send "$METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME"
 # Set retention to 90 days
-send "$METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME" "--config retention.ms=7776000000 --topic $METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME"
-send "$METADATA_CHANGE_PROPOSAL_TOPIC_NAME" "--topic $METADATA_CHANGE_PROPOSAL_TOPIC_NAME"
-send "$FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME" "--topic $FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME"
-send "$PLATFORM_EVENT_TOPIC_NAME" "--topic $PLATFORM_EVENT_TOPIC_NAME"
+send "$METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME" "--partitions $PARTITIONS --config retention.ms=7776000000 --topic $METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME"
+send "$METADATA_CHANGE_PROPOSAL_TOPIC_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_PROPOSAL_TOPIC_NAME"
+send "$FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME" "--partitions $PARTITIONS --topic $FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME"
+send "$PLATFORM_EVENT_TOPIC_NAME" "--partitions $PARTITIONS --topic $PLATFORM_EVENT_TOPIC_NAME"
 
 # Infinite retention upgrade topic
-send "$DATAHUB_UPGRADE_HISTORY_TOPIC_NAME" "config retention.ms=-1 --topic $DATAHUB_UPGRADE_HISTORY_TOPIC_NAME"
+send "$DATAHUB_UPGRADE_HISTORY_TOPIC_NAME" "--partitions 1 --config retention.ms=-1 --topic $DATAHUB_UPGRADE_HISTORY_TOPIC_NAME"
 
 # Create topic for datahub usage event
 if [[ $DATAHUB_ANALYTICS_ENABLED == true ]]; then
-  send "$DATAHUB_USAGE_EVENT_NAME" "--topic $DATAHUB_USAGE_EVENT_NAME"
+  send "$DATAHUB_USAGE_EVENT_NAME" "--partitions $PARTITIONS --topic $DATAHUB_USAGE_EVENT_NAME"
 fi
 
 ## close the filo
diff --git a/docker/kafka-setup/kafka-topic-workers.sh b/docker/kafka-setup/kafka-topic-workers.sh
index 14b2d73eff..fd0d45c3f4 100644
--- a/docker/kafka-setup/kafka-topic-workers.sh
+++ b/docker/kafka-setup/kafka-topic-workers.sh
@@ -13,7 +13,7 @@ job() {
   i=$1
   topic_args=$2
   kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER \
-    --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR \
+    --replication-factor $REPLICATION_FACTOR \
     $topic_args
 }
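Note: moving `--partitions` out of the shared `job()` invocation and into each topic's `$topic_args` is what allows the upgrade-history topic above to be pinned to `--partitions 1` while every other topic keeps `$PARTITIONS`. A single partition gives the history topic a total order, which is what lets the listener below find the latest upgrade event by seeking near the end offset. A minimal sketch of how one might verify the resulting layout with Kafka's `AdminClient` follows; the broker address and the use of DataHub's default topic names are assumptions, not part of this PR:

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class TopicPartitionCheck {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
    try (AdminClient admin = AdminClient.create(props)) {
      // Default topic names assumed; substitute any overrides you deploy with.
      Map<String, TopicDescription> topics = admin
          .describeTopics(Arrays.asList("DataHubUpgradeHistory_v1", "MetadataChangeLog_Versioned_v1"))
          .all().get();
      topics.forEach((name, desc) ->
          System.out.printf("%s: %d partition(s)%n", name, desc.partitions().size()));
      // Expect 1 partition for the upgrade-history topic and $PARTITIONS for the rest.
    }
  }
}
```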
diff --git a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/MaeConsumerApplication.java b/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/MaeConsumerApplication.java
index cf04c5bfb6..922d6c4902 100644
--- a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/MaeConsumerApplication.java
+++ b/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/MaeConsumerApplication.java
@@ -13,7 +13,10 @@ import org.springframework.context.annotation.FilterType;
 
 @SuppressWarnings("checkstyle:HideUtilityClassConstructor")
 @SpringBootApplication(exclude = {ElasticsearchRestClientAutoConfiguration.class, CassandraAutoConfiguration.class,
     SolrHealthContributorAutoConfiguration.class})
-@ComponentScan(excludeFilters = {
+@ComponentScan(basePackages = {
+    "com.linkedin.metadata.boot.kafka",
+    "com.linkedin.metadata.kafka"
+}, excludeFilters = {
     @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = ScheduledAnalyticsFactory.class)})
 public class MaeConsumerApplication {
diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/boot/ApplicationStartupListener.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/boot/ApplicationStartupListener.java
index 110c552256..b8334cd7fa 100644
--- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/boot/ApplicationStartupListener.java
+++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/boot/ApplicationStartupListener.java
@@ -2,6 +2,7 @@ package com.linkedin.metadata.kafka.boot;
 
 import com.linkedin.gms.factory.config.ConfigurationProvider;
 import com.linkedin.metadata.boot.BootstrapManager;
+import com.linkedin.metadata.boot.kafka.DataHubUpgradeKafkaListener;
 import com.linkedin.metadata.kafka.config.MetadataChangeLogProcessorCondition;
 import javax.annotation.Nonnull;
diff --git a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHookTest.java b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHookTest.java
index 6fc580f8ff..d6cfb58b7b 100644
--- a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHookTest.java
+++ b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHookTest.java
@@ -23,7 +23,7 @@ import com.linkedin.metadata.config.ElasticSearchConfiguration;
 import com.linkedin.metadata.config.SystemUpdateConfiguration;
 import com.linkedin.metadata.graph.Edge;
 import com.linkedin.metadata.graph.GraphService;
-import com.linkedin.metadata.kafka.boot.DataHubUpgradeKafkaListener;
+import com.linkedin.metadata.boot.kafka.DataHubUpgradeKafkaListener;
 import com.linkedin.metadata.key.ChartKey;
 import com.linkedin.metadata.models.AspectSpec;
 import com.linkedin.metadata.models.EntitySpec;
diff --git a/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/MceConsumerApplication.java b/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/MceConsumerApplication.java
index 161000b981..9b4fe15c11 100644
--- a/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/MceConsumerApplication.java
+++ b/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/MceConsumerApplication.java
@@ -20,6 +20,7 @@ import org.springframework.context.annotation.PropertySource;
     SolrHealthContributorAutoConfiguration.class
 })
 @ComponentScan(basePackages = {
+    "com.linkedin.metadata.boot.kafka",
     "com.linkedin.gms.factory.auth",
     "com.linkedin.gms.factory.common",
     "com.linkedin.gms.factory.config",
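Note: both consumer jobs now scan `com.linkedin.metadata.boot.kafka` so that the relocated `DataHubUpgradeKafkaListener` bean (moved into `metadata-service/factories` further below) is picked up by MAE and MCE alike instead of each job carrying its own copy. The startup listeners import it as a `BootstrapDependency`; roughly, the gating looks like the sketch below. This is an illustration of the pattern only, not the actual `ApplicationStartupListener` body:

```java
import com.linkedin.metadata.boot.kafka.DataHubUpgradeKafkaListener;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

@Component
public class WaitForSystemUpdateExample implements ApplicationListener<ContextRefreshedEvent> {

  private final DataHubUpgradeKafkaListener upgradeListener;

  public WaitForSystemUpdateExample(DataHubUpgradeKafkaListener upgradeListener) {
    this.upgradeListener = upgradeListener;
  }

  @Override
  public void onApplicationEvent(ContextRefreshedEvent event) {
    // Blocks, via the exponential backoff in waitForUpdate(), until the
    // DataHubUpgradeHistoryEvent matching this build's version is observed.
    upgradeListener.waitForBootstrap();
    // ...only then start MCL/MCP processing...
  }
}
```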
diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/boot/ApplicationStartupListener.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/boot/ApplicationStartupListener.java
index 4dd412f33f..1b69b1113b 100644
--- a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/boot/ApplicationStartupListener.java
+++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/boot/ApplicationStartupListener.java
@@ -2,6 +2,7 @@ package com.linkedin.metadata.kafka.boot;
 
 import com.linkedin.gms.factory.config.ConfigurationProvider;
 import com.linkedin.metadata.boot.BootstrapManager;
+import com.linkedin.metadata.boot.kafka.DataHubUpgradeKafkaListener;
 import com.linkedin.metadata.kafka.config.MetadataChangeProposalProcessorCondition;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Qualifier;
diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/boot/DataHubUpgradeKafkaListener.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/boot/DataHubUpgradeKafkaListener.java
deleted file mode 100644
index 5a3ed6aeab..0000000000
--- a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/boot/DataHubUpgradeKafkaListener.java
+++ /dev/null
@@ -1,138 +0,0 @@
-package com.linkedin.metadata.kafka.boot;
-
-import com.linkedin.gms.factory.config.ConfigurationProvider;
-import com.linkedin.metadata.EventUtils;
-import com.linkedin.metadata.boot.dependencies.BootstrapDependency;
-import com.linkedin.metadata.utils.metrics.MetricUtils;
-import com.linkedin.metadata.version.GitVersion;
-import com.linkedin.mxe.DataHubUpgradeHistoryEvent;
-import com.linkedin.mxe.Topics;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.kafka.annotation.EnableKafka;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
-import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
-import org.springframework.kafka.listener.ConsumerSeekAware;
-import org.springframework.kafka.listener.MessageListenerContainer;
-import org.springframework.stereotype.Component;
-
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-
-// We don't disable this on GMS since we want GMS to also wait until the system is ready to read in case of
-// backwards incompatible query logic dependent on system updates.
-@Component("dataHubUpgradeKafkaListener")
-@RequiredArgsConstructor
-@Slf4j
-@EnableKafka
-public class DataHubUpgradeKafkaListener implements ConsumerSeekAware, BootstrapDependency {
-  @Autowired
-  private KafkaListenerEndpointRegistry registry;
-
-  private static final String CONSUMER_GROUP = "${DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID:generic-duhe-consumer-job-client}";
-  private static final String SUFFIX = "temp";
-  private static final String TOPIC_NAME = "${DATAHUB_UPGRADE_HISTORY_TOPIC_NAME:" + Topics.DATAHUB_UPGRADE_HISTORY_TOPIC_NAME + "}";
-
-  private final DefaultKafkaConsumerFactory _defaultKafkaConsumerFactory;
-
-  @Value("#{systemEnvironment['DATAHUB_REVISION'] ?: '0'}")
-  private String revision;
-  private final GitVersion _gitVersion;
-  private final ConfigurationProvider _configurationProvider;
-
-  @Value(CONSUMER_GROUP)
-  private String consumerGroup;
-
-  @Value(TOPIC_NAME)
-  private String topicName;
-
-  private AtomicBoolean isUpdated = new AtomicBoolean(false);
-
-
-  // Constructs a consumer to read determine final offset to assign, prevents re-reading whole topic to get the latest version
-  @Override
-  public void onPartitionsAssigned(Map assignments, ConsumerSeekCallback callback) {
-    try (Consumer kafkaConsumer =
-        _defaultKafkaConsumerFactory.createConsumer(consumerGroup, SUFFIX)) {
-      final Map offsetMap = kafkaConsumer.endOffsets(assignments.keySet());
-      assignments.entrySet().stream()
-          .filter(entry -> topicName.equals(entry.getKey().topic()))
-          .forEach(entry ->
-              callback.seek(entry.getKey().topic(), entry.getKey().partition(), offsetMap.get(entry.getKey()) - 1));
-    }
-  }
-
-  @KafkaListener(id = CONSUMER_GROUP, topics = {TOPIC_NAME}, containerFactory = "kafkaEventConsumer")
-  public void checkSystemVersion(final ConsumerRecord consumerRecord) {
-    final GenericRecord record = consumerRecord.value();
-    final String expectedVersion = String.format("%s-%s", _gitVersion.getVersion(), revision);
-
-    DataHubUpgradeHistoryEvent event;
-    try {
-      event = EventUtils.avroToPegasusDUHE(record);
-      log.info("Latest system update version: {}", event.getVersion());
-      if (expectedVersion.equals(event.getVersion())) {
-        isUpdated.getAndSet(true);
-      } else {
-        log.debug("System version is not up to date: {}", expectedVersion);
-      }
-
-    } catch (Exception e) {
-      MetricUtils.counter(this.getClass(), "avro_to_pegasus_conversion_failure").inc();
-      log.error("Error deserializing message due to: ", e);
-      log.error("Message: {}", record.toString());
-      return;
-    }
-  }
-
-  public void waitForUpdate() {
-    int maxBackOffs = Integer.parseInt(_configurationProvider.getSystemUpdate().getMaxBackOffs());
-    long initialBackOffMs = Long.parseLong(_configurationProvider.getSystemUpdate().getInitialBackOffMs());
-    int backOffFactor = Integer.parseInt(_configurationProvider.getSystemUpdate().getBackOffFactor());
-
-    long backOffMs = initialBackOffMs;
-    for (int i = 0; i < maxBackOffs; i++) {
-      if (isUpdated.get()) {
-        log.debug("Finished waiting for updated indices.");
-        try {
-          log.info("Containers: {}", registry.getListenerContainers().stream()
-              .map(MessageListenerContainer::getListenerId)
-              .collect(Collectors.toList()));
-          registry.getListenerContainer(consumerGroup).stop();
-        } catch (NullPointerException e) {
-          log.error("Expected consumer `{}` to shutdown.", consumerGroup);
-        }
-        return;
-      }
-      try {
-        Thread.sleep(backOffMs);
-      } catch (InterruptedException e) {
-        log.error("Thread interrupted while sleeping for exponential backoff: {}", e.getMessage());
-        throw new RuntimeException(e);
-      }
-
-      backOffMs = backOffMs * backOffFactor;
-    }
-
-    if (!isUpdated.get()) {
-
-      throw new IllegalStateException("Indices are not updated after exponential backoff."
-          + " Please try restarting and consider increasing back off settings.");
-    }
-  }
-
-  @Override
-  public boolean waitForBootstrap() {
-    this.waitForUpdate();
-
-    return true;
-  }
-}
diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/boot/DataHubUpgradeKafkaListener.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/DataHubUpgradeKafkaListener.java
similarity index 89%
rename from metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/boot/DataHubUpgradeKafkaListener.java
rename to metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/DataHubUpgradeKafkaListener.java
index 4aedee7a23..37035ac38b 100644
--- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/boot/DataHubUpgradeKafkaListener.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/DataHubUpgradeKafkaListener.java
@@ -1,4 +1,4 @@
-package com.linkedin.metadata.kafka.boot;
+package com.linkedin.metadata.boot.kafka;
 
 import com.linkedin.gms.factory.config.ConfigurationProvider;
 import com.linkedin.metadata.EventUtils;
@@ -18,7 +18,6 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.kafka.annotation.EnableKafka;
 import org.springframework.kafka.annotation.KafkaListener;
@@ -35,8 +34,8 @@ import org.springframework.stereotype.Component;
 @Slf4j
 @EnableKafka
 public class DataHubUpgradeKafkaListener implements ConsumerSeekAware, BootstrapDependency {
-  @Autowired
-  private KafkaListenerEndpointRegistry registry;
+
+  private final KafkaListenerEndpointRegistry registry;
 
   private static final String CONSUMER_GROUP = "${DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID:generic-duhe-consumer-job-client}";
   private static final String SUFFIX = "temp";
@@ -55,7 +54,7 @@ public class DataHubUpgradeKafkaListener implements ConsumerSeekAware, Bootstrap
   @Value(TOPIC_NAME)
   private String topicName;
 
-  private AtomicBoolean isUpdated = new AtomicBoolean(false);
+  private final static AtomicBoolean IS_UPDATED = new AtomicBoolean(false);
 
 
   // Constructs a consumer to read determine final offset to assign, prevents re-reading whole topic to get the latest version
@@ -66,12 +65,15 @@ public class DataHubUpgradeKafkaListener implements ConsumerSeekAware, Bootstrap
       final Map offsetMap = kafkaConsumer.endOffsets(assignments.keySet());
       assignments.entrySet().stream()
           .filter(entry -> topicName.equals(entry.getKey().topic()))
-          .forEach(entry ->
-              callback.seek(entry.getKey().topic(), entry.getKey().partition(), offsetMap.get(entry.getKey()) - 1));
+          .forEach(entry -> {
+            log.info("Partition: {} Current Offset: {}", entry.getKey(), offsetMap.get(entry.getKey()));
+            long newOffset = offsetMap.get(entry.getKey()) - 1;
+            callback.seek(entry.getKey().topic(), entry.getKey().partition(), Math.max(0, newOffset));
+          });
     }
   }
 
-  @KafkaListener(id = CONSUMER_GROUP, topics = {TOPIC_NAME}, containerFactory = "kafkaEventConsumer")
+  @KafkaListener(id = CONSUMER_GROUP, topics = {TOPIC_NAME}, containerFactory = "kafkaEventConsumer", concurrency = "1")
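The rename below moves the listener into `metadata-service/factories` so it is shared rather than duplicated per consumer job, and pins `concurrency = "1"` to match the topic's single partition. It also hardens `onPartitionsAssigned`: on a brand-new deployment the end offset of the history topic is 0, so the old `endOffset - 1` seek would pass `-1`, which `KafkaConsumer.seek` rejects. The clamp in the new code follows the same pattern as this standalone sketch (an illustrative helper, not DataHub code):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

public final class SeekToLatestExample {
  // Reads only the newest record of a partition, tolerating an empty one.
  static <K, V> ConsumerRecords<K, V> readNewest(Consumer<K, V> consumer, TopicPartition tp) {
    consumer.assign(Collections.singleton(tp));
    Map<TopicPartition, Long> endOffsets = consumer.endOffsets(Collections.singleton(tp));
    long end = endOffsets.get(tp);
    // end == 0 means the partition is empty; seeking to -1 would throw.
    consumer.seek(tp, Math.max(0, end - 1));
    return consumer.poll(Duration.ofSeconds(5));
  }
}
```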
   public void checkSystemVersion(final ConsumerRecord consumerRecord) {
     final GenericRecord record = consumerRecord.value();
     final String expectedVersion = String.format("%s-%s", _gitVersion.getVersion(), revision);
@@ -81,7 +83,7 @@ public class DataHubUpgradeKafkaListener implements ConsumerSeekAware, Bootstrap
       event = EventUtils.avroToPegasusDUHE(record);
       log.info("Latest system update version: {}", event.getVersion());
       if (expectedVersion.equals(event.getVersion())) {
-        isUpdated.getAndSet(true);
+        IS_UPDATED.getAndSet(true);
       } else {
         log.debug("System version is not up to date: {}", expectedVersion);
       }
@@ -101,7 +103,7 @@ public class DataHubUpgradeKafkaListener implements ConsumerSeekAware, Bootstrap
 
     long backOffMs = initialBackOffMs;
     for (int i = 0; i < maxBackOffs; i++) {
-      if (isUpdated.get()) {
+      if (IS_UPDATED.get()) {
         log.debug("Finished waiting for updated indices.");
         try {
           log.info("Containers: {}", registry.getListenerContainers().stream()
@@ -123,7 +125,7 @@ public class DataHubUpgradeKafkaListener implements ConsumerSeekAware, Bootstrap
       backOffMs = backOffMs * backOffFactor;
     }
 
-    if (!isUpdated.get()) {
+    if (!IS_UPDATED.get()) {
 
       throw new IllegalStateException("Indices are not updated after exponential backoff."
           + " Please try restarting and consider increasing back off settings.");
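For reference, `waitForUpdate()` polls `IS_UPDATED` with a geometric backoff: it sleeps `initialBackOffMs`, multiplies the sleep by `backOffFactor` each round, and gives up after `maxBackOffs` rounds with the `IllegalStateException` above. The worst-case wait before that exception is the geometric series computed below; the sample values are assumptions for illustration, not DataHub's defaults:

```java
public final class BackoffBudget {
  public static void main(String[] args) {
    int maxBackOffs = 8;          // systemUpdate.maxBackOffs (assumed sample value)
    long initialBackOffMs = 2000; // systemUpdate.initialBackOffMs (assumed sample value)
    int backOffFactor = 2;        // systemUpdate.backOffFactor (assumed sample value)

    long totalMs = 0;
    long backOffMs = initialBackOffMs;
    for (int i = 0; i < maxBackOffs; i++) {
      totalMs += backOffMs;
      backOffMs *= backOffFactor; // same multiplication as the listener's loop
    }
    // Closed form: initial * (factor^max - 1) / (factor - 1) = 2000 * 255 = 510000 ms.
    System.out.println("Worst-case wait: " + totalMs + " ms (~8.5 minutes)");
  }
}
```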