diff --git a/.github/workflows/test-results.yml b/.github/workflows/test-results.yml index 5c658c121a..8053a0a661 100644 --- a/.github/workflows/test-results.yml +++ b/.github/workflows/test-results.yml @@ -50,3 +50,4 @@ jobs: event_name: ${{ github.event.workflow_run.event }} files: "artifacts/**/*.xml" comment_mode: off + large_files: true diff --git a/build.gradle b/build.gradle index 4359b0e312..3b2aeaeb0c 100644 --- a/build.gradle +++ b/build.gradle @@ -40,7 +40,7 @@ buildscript { ext.versionGradle = '8.14.3' ext.springVersion = '6.2.10' ext.springBootVersion = '3.4.5' - ext.springKafkaVersion = '3.3.6' + ext.springKafkaVersion = '3.3.8' ext.openTelemetryVersion = '1.49.0' ext.neo4jVersion = '5.20.0' ext.neo4jApocVersion = '5.20.0' diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BuildIndicesConfig.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BuildIndicesConfig.java index b678ba92c5..315adb7e84 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BuildIndicesConfig.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BuildIndicesConfig.java @@ -17,7 +17,7 @@ import org.springframework.core.annotation.Order; @Configuration @Conditional(SystemUpdateCondition.BlockingSystemUpdateCondition.class) public class BuildIndicesConfig { - @Order(1) + @Order(2) @Bean(name = "buildIndices") public BlockingSystemUpgrade buildIndices( final SystemMetadataService systemMetadataService, diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/KafkaSetupConfig.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/KafkaSetupConfig.java new file mode 100644 index 0000000000..49b02de2cb --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/KafkaSetupConfig.java @@ -0,0 +1,26 @@ +package com.linkedin.datahub.upgrade.config; + +import com.linkedin.datahub.upgrade.system.BlockingSystemUpgrade; +import com.linkedin.datahub.upgrade.system.kafka.KafkaSetup; +import com.linkedin.gms.factory.config.ConfigurationProvider; +import io.datahubproject.metadata.context.OperationContext; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Conditional; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.annotation.Order; + +@Configuration +@Conditional(SystemUpdateCondition.BlockingSystemUpdateCondition.class) +public class KafkaSetupConfig { + + @Autowired private OperationContext opContext; + + @Order(1) // This ensures it runs before BuildIndices (@Order(2)) + @Bean(name = "kafkaSetup") + public BlockingSystemUpgrade kafkaSetup( + final ConfigurationProvider configurationProvider, KafkaProperties properties) { + return new KafkaSetup(opContext, configurationProvider.getKafka(), properties); + } +} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/NonBlockingConfigs.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/NonBlockingConfigs.java index 4dcfd0d07f..ab8b7fddd7 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/NonBlockingConfigs.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/NonBlockingConfigs.java @@ -6,6 +6,7 @@ import com.linkedin.datahub.upgrade.system.browsepaths.BackfillIcebergBrowsePath import 
com.linkedin.datahub.upgrade.system.dataprocessinstances.BackfillDataProcessInstances; import com.linkedin.datahub.upgrade.system.entities.RemoveQueryEdges; import com.linkedin.datahub.upgrade.system.ingestion.BackfillIngestionSourceInfoIndices; +import com.linkedin.datahub.upgrade.system.kafka.KafkaNonBlockingSetup; import com.linkedin.datahub.upgrade.system.policyfields.BackfillPolicyFields; import com.linkedin.datahub.upgrade.system.schemafield.GenerateSchemaFieldsFromSchemaMetadata; import com.linkedin.datahub.upgrade.system.schemafield.MigrateSchemaFieldDocIds; @@ -19,8 +20,10 @@ import com.linkedin.metadata.search.elasticsearch.ElasticSearchService; import com.linkedin.metadata.search.elasticsearch.update.ESWriteDAO; import io.datahubproject.metadata.context.OperationContext; import org.opensearch.client.RestHighLevelClient; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; @@ -154,4 +157,12 @@ public class NonBlockingConfigs { return new MigrateSchemaFieldDocIds( opContext, components, entityService, enabled && hashEnabled, batchSize, delayMs, limit); } + + @Autowired private OperationContext opContext; + + @Bean + public NonBlockingSystemUpgrade kafkaSetupNonBlocking( + final ConfigurationProvider configurationProvider, KafkaProperties properties) { + return new KafkaNonBlockingSetup(opContext, configurationProvider.getKafka(), properties); + } } diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/kafka/ConfluentSchemaRegistryCleanupPolicyStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/kafka/ConfluentSchemaRegistryCleanupPolicyStep.java new file mode 100644 index 0000000000..1d4e969d4a --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/kafka/ConfluentSchemaRegistryCleanupPolicyStep.java @@ -0,0 +1,108 @@ +package com.linkedin.datahub.upgrade.system.kafka; + +import static com.linkedin.gms.factory.kafka.common.AdminClientFactory.buildKafkaAdminClient; + +import com.linkedin.datahub.upgrade.UpgradeContext; +import com.linkedin.datahub.upgrade.UpgradeStep; +import com.linkedin.datahub.upgrade.UpgradeStepResult; +import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult; +import com.linkedin.metadata.config.kafka.KafkaConfiguration; +import com.linkedin.upgrade.DataHubUpgradeState; +import io.datahubproject.metadata.context.OperationContext; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.common.config.ConfigResource; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; + +@Slf4j +public class ConfluentSchemaRegistryCleanupPolicyStep implements UpgradeStep { + + private final OperationContext _opContext; + private final KafkaConfiguration _kafkaConfiguration; + private final KafkaProperties _kafkaProperties; + + public ConfluentSchemaRegistryCleanupPolicyStep( + OperationContext 
opContext, + KafkaConfiguration kafkaConfiguration, + KafkaProperties kafkaProperties) { + this._opContext = opContext; + this._kafkaConfiguration = kafkaConfiguration; + this._kafkaProperties = kafkaProperties; + } + + @Override + public String id() { + return "ConfluentSchemaRegistryCleanupPolicyStep"; + } + + @Override + public Function<UpgradeContext, UpgradeStepResult> executable() { + return (context) -> { + log.info("Configuring Confluent Schema Registry cleanup policies..."); + + // Check if Confluent Schema Registry is enabled + if (_kafkaConfiguration.getSetup() == null + || !_kafkaConfiguration.getSetup().isUseConfluentSchemaRegistry()) { + log.info( + "Skipping Confluent Schema Registry cleanup policy configuration - schema registry is disabled"); + return new DefaultUpgradeStepResult(this.id(), DataHubUpgradeState.SUCCEEDED); + } + + try { + // Create AdminClient using AdminClientFactory + AdminClient adminClient = createAdminClient(); + + // Configure cleanup policy for _schemas topic (equivalent to kafka-configs.sh) + String schemasTopic = "_schemas"; + log.info("Configuring cleanup policy for schema registry topic: {}", schemasTopic); + + // Create config resource for the _schemas topic + ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, schemasTopic); + + // Create the cleanup policy configuration + ConfigEntry cleanupPolicyEntry = new ConfigEntry("cleanup.policy", "compact"); + + // Create alter config operation + AlterConfigOp alterConfigOp = + new AlterConfigOp(cleanupPolicyEntry, AlterConfigOp.OpType.SET); + Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>(); + configs.put(configResource, Collections.singletonList(alterConfigOp)); + + // Apply the configuration change + log.info( + "Applying cleanup policy configuration: topic={}, cleanup.policy=compact", + schemasTopic); + adminClient.incrementalAlterConfigs(configs).all().get(); + + log.info( + "Successfully configured cleanup policy for schema registry topic: {}", schemasTopic); + return new DefaultUpgradeStepResult(this.id(), DataHubUpgradeState.SUCCEEDED); + + } catch (Exception e) { + log.error( + "Failed to configure Confluent Schema Registry cleanup policies: {}", + e.getMessage(), + e); + return new DefaultUpgradeStepResult(this.id(), DataHubUpgradeState.FAILED); + } + }; + } + + /** + * Creates an AdminClient instance using the AdminClientFactory. This method is extracted to allow + * for mocking in unit tests. + * + * @return AdminClient instance configured with the current Kafka configuration + */ + protected AdminClient createAdminClient() { + return buildKafkaAdminClient( + _kafkaConfiguration, _kafkaProperties, "datahub-upgrade-kafka-setup"); + } +}
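The incrementalAlterConfigs call in ConfluentSchemaRegistryCleanupPolicyStep is the in-process equivalent of the kafka-configs.sh invocation the retired kafka-setup container made against the schema registry's _schemas topic. A minimal standalone sketch of the same AdminClient interaction (the bootstrap address and class name below are illustrative; the real step obtains its client from AdminClientFactory.buildKafkaAdminClient):

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class SchemasCleanupPolicySketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative broker address
    try (AdminClient admin = AdminClient.create(props)) {
      // Set cleanup.policy=compact on the _schemas topic, as the step above does
      ConfigResource schemas = new ConfigResource(ConfigResource.Type.TOPIC, "_schemas");
      AlterConfigOp setCompact =
          new AlterConfigOp(new ConfigEntry("cleanup.policy", "compact"), AlterConfigOp.OpType.SET);
      Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>();
      configs.put(schemas, Collections.singletonList(setCompact));
      admin.incrementalAlterConfigs(configs).all().get();
    }
  }
}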
diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/kafka/CreateKafkaTopicsStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/kafka/CreateKafkaTopicsStep.java new file mode 100644 index 0000000000..69ef4f76aa --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/kafka/CreateKafkaTopicsStep.java @@ -0,0 +1,194 @@ +package com.linkedin.datahub.upgrade.system.kafka; + +import static com.linkedin.gms.factory.kafka.common.AdminClientFactory.buildKafkaAdminClient; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.linkedin.datahub.upgrade.UpgradeContext; +import com.linkedin.datahub.upgrade.UpgradeStep; +import com.linkedin.datahub.upgrade.UpgradeStepResult; +import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult; +import com.linkedin.metadata.config.kafka.KafkaConfiguration; +import com.linkedin.metadata.config.kafka.TopicsConfiguration; +import com.linkedin.upgrade.DataHubUpgradeState; +import io.datahubproject.metadata.context.OperationContext; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.ListTopicsResult; +import org.apache.kafka.clients.admin.NewTopic; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.kafka.config.TopicBuilder; + +@Slf4j +public class CreateKafkaTopicsStep implements UpgradeStep { + + private final OperationContext _opContext; + private final KafkaConfiguration _kafkaConfiguration; + private final KafkaProperties _kafkaProperties; + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public CreateKafkaTopicsStep( + OperationContext opContext, + KafkaConfiguration kafkaConfiguration, + KafkaProperties kafkaProperties) { + this._opContext = opContext; + this._kafkaConfiguration = kafkaConfiguration; + this._kafkaProperties = kafkaProperties; + } + + @Override + public String id() { + return "CreateKafkaTopicsStep"; + } + + @Override + public Function<UpgradeContext, UpgradeStepResult> executable() { + return (context) -> { + if (_kafkaConfiguration.getSetup() == null) { + log.warn("Setup configuration is null - skipping topic creation"); + return new DefaultUpgradeStepResult(this.id(), DataHubUpgradeState.SUCCEEDED); + } + + if (!_kafkaConfiguration.getSetup().isPreCreateTopics()) { + log.info("Skipping Kafka topic creation as preCreateTopics is false"); + return new DefaultUpgradeStepResult(this.id(), DataHubUpgradeState.SUCCEEDED); + } + + log.info("Creating Kafka topics..."); + + try { + // Debug logging to understand configuration state + log.info( + "KafkaConfiguration setup: {}", + OBJECT_MAPPER + .writerWithDefaultPrettyPrinter() + .writeValueAsString(_kafkaConfiguration.getSetup())); + log.info( + "KafkaConfiguration topics: {}", + OBJECT_MAPPER + .writerWithDefaultPrettyPrinter() + .writeValueAsString(_kafkaConfiguration.getTopics())); + log.info( + "KafkaConfiguration topicDefaults: {}", + OBJECT_MAPPER + .writerWithDefaultPrettyPrinter() + .writeValueAsString(_kafkaConfiguration.getTopicDefaults())); + 
// Create AdminClient using AdminClientFactory + AdminClient adminClient = createAdminClient(); + + // Get topic configurations + TopicsConfiguration topicsConfig = _kafkaConfiguration.getTopics(); + if (topicsConfig == null) { + log.warn("Topics configuration is null - skipping topic creation"); + return new DefaultUpgradeStepResult(this.id(), DataHubUpgradeState.SUCCEEDED); + } + + if (topicsConfig.getTopics() == null || topicsConfig.getTopics().isEmpty()) { + log.warn("No topics configured for creation"); + // We don't really support a scenario of no topics to create with preCreateTopics set to + // true + return new DefaultUpgradeStepResult(this.id(), DataHubUpgradeState.FAILED); + } + + // Get existing topics to implement if-not-exists functionality + Set<String> existingTopics = getExistingTopics(adminClient); + log.info("Found {} existing topics: {}", existingTopics.size(), existingTopics); + + // Collect all topics to create (only those that don't exist) + List<NewTopic> topicsToCreate = new ArrayList<>(); + List<String> topicsToSkip = new ArrayList<>(); + + for (Map.Entry<String, TopicsConfiguration.TopicConfiguration> entry : + topicsConfig.getTopics().entrySet()) { + String topicKey = entry.getKey(); + TopicsConfiguration.TopicConfiguration topicConfig = entry.getValue(); + + // Skip if topic should not be created + if (!topicConfig.getEnabled()) { + log.debug("Skipping topic {} - create flag is false", topicKey); + continue; + } + + String topicName = topicConfig.getName(); + + // Check if topic already exists + if (existingTopics.contains(topicName)) { + log.info("Topic {} already exists - skipping creation", topicName); + topicsToSkip.add(topicName); + continue; + } + + // Use Spring's TopicBuilder to create the topic + TopicBuilder topicBuilder = + TopicBuilder.name(topicName) + .partitions(topicConfig.getPartitions()) + .replicas(topicConfig.getReplicationFactor()); + + // Set topic-specific configurations if provided + if (topicConfig.getConfigProperties() != null + && !topicConfig.getConfigProperties().isEmpty()) { + topicConfig.getConfigProperties().forEach(topicBuilder::config); + } + + topicsToCreate.add(topicBuilder.build()); + log.info( + "Preparing to create topic: {} with {} partitions and replication factor {}", + topicName, + topicConfig.getPartitions(), + topicConfig.getReplicationFactor()); + } + + // Log summary of what will be created vs skipped + if (!topicsToSkip.isEmpty()) { + log.info("Skipping {} existing topics: {}", topicsToSkip.size(), topicsToSkip); + } + + if (topicsToCreate.isEmpty()) { + log.info("All configured topics already exist - nothing to create"); + return new DefaultUpgradeStepResult(this.id(), DataHubUpgradeState.SUCCEEDED); + } + + // Create all topics in bulk + log.info("Creating {} new topics in bulk", topicsToCreate.size()); + CreateTopicsResult result = adminClient.createTopics(topicsToCreate); + result.all().get(); // Wait for all topics to be created + + log.info("Successfully created {} Kafka topics", topicsToCreate.size()); + return new DefaultUpgradeStepResult(this.id(), DataHubUpgradeState.SUCCEEDED); + + } catch (Exception e) { + log.error("Failed to create Kafka topics: {}", e.getMessage(), e); + return new DefaultUpgradeStepResult(this.id(), DataHubUpgradeState.FAILED); + } + }; + } + + /** Get the set of existing topic names from Kafka */ + private Set<String> getExistingTopics(AdminClient adminClient) throws Exception { + try { + ListTopicsResult listTopicsResult = adminClient.listTopics(); + return listTopicsResult.names().get(); + } catch (Exception e) { + log.error("Failed to list existing topics: {}", e.getMessage(), e); + throw e; + } + } + + /** + * Creates an AdminClient instance using the AdminClientFactory. This method is extracted to allow + * for mocking in unit tests. + * + * @return AdminClient instance configured with the current Kafka configuration + */ + protected AdminClient createAdminClient() { + return buildKafkaAdminClient( + _kafkaConfiguration, _kafkaProperties, "datahub-upgrade-kafka-setup"); + } +}
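CreateKafkaTopicsStep builds each NewTopic with Spring's TopicBuilder and only submits topics that are absent from listTopics(), which is what makes the blocking setup safe to re-run. A condensed sketch of that pattern outside the upgrade framework (topic name, partition and replica counts, and config values below are illustrative placeholders, not the values DataHub ships):

import java.util.List;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.kafka.config.TopicBuilder;

public class TopicCreationSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative broker address
    try (AdminClient admin = AdminClient.create(props)) {
      // Fetch existing topic names first, mirroring the step's if-not-exists guard
      Set<String> existing = admin.listTopics().names().get();
      NewTopic topic =
          TopicBuilder.name("ExampleTopic_v1") // hypothetical topic name
              .partitions(1)
              .replicas(1)
              .config("retention.ms", "86400000") // illustrative topic config
              .build();
      if (!existing.contains(topic.name())) {
        admin.createTopics(List.of(topic)).all().get();
      }
    }
  }
}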
{}", e.getMessage(), e); + throw e; + } + } + + /** + * Creates an AdminClient instance using the AdminClientFactory. This method is extracted to allow + * for mocking in unit tests. + * + * @return AdminClient instance configured with the current Kafka configuration + */ + protected AdminClient createAdminClient() { + return buildKafkaAdminClient( + _kafkaConfiguration, _kafkaProperties, "datahub-upgrade-kafka-setup"); + } +} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/kafka/KafkaNonBlockingSetup.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/kafka/KafkaNonBlockingSetup.java new file mode 100644 index 0000000000..f0653f28ad --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/kafka/KafkaNonBlockingSetup.java @@ -0,0 +1,36 @@ +package com.linkedin.datahub.upgrade.system.kafka; + +import com.google.common.collect.ImmutableList; +import com.linkedin.datahub.upgrade.UpgradeStep; +import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade; +import com.linkedin.metadata.config.kafka.KafkaConfiguration; +import io.datahubproject.metadata.context.OperationContext; +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; + +@Slf4j +public class KafkaNonBlockingSetup implements NonBlockingSystemUpgrade { + + private final List _steps; + + public KafkaNonBlockingSetup( + OperationContext opContext, + KafkaConfiguration kafkaConfiguration, + KafkaProperties kafkaProperties) { + _steps = + ImmutableList.of( + new ConfluentSchemaRegistryCleanupPolicyStep( + opContext, kafkaConfiguration, kafkaProperties)); + } + + @Override + public String id() { + return "KafkaNonBlockingSetup"; + } + + @Override + public List steps() { + return _steps; + } +} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/kafka/KafkaSetup.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/kafka/KafkaSetup.java new file mode 100644 index 0000000000..fba84f98e1 --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/kafka/KafkaSetup.java @@ -0,0 +1,36 @@ +package com.linkedin.datahub.upgrade.system.kafka; + +import com.google.common.collect.ImmutableList; +import com.linkedin.datahub.upgrade.UpgradeStep; +import com.linkedin.datahub.upgrade.system.BlockingSystemUpgrade; +import com.linkedin.metadata.config.kafka.KafkaConfiguration; +import io.datahubproject.metadata.context.OperationContext; +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; + +@Slf4j +public class KafkaSetup implements BlockingSystemUpgrade { + + private final List _steps; + + public KafkaSetup( + OperationContext opContext, + KafkaConfiguration kafkaConfiguration, + KafkaProperties kafkaProperties) { + _steps = + ImmutableList.of( + new WaitForKafkaReadyStep(opContext, kafkaConfiguration, kafkaProperties), + new CreateKafkaTopicsStep(opContext, kafkaConfiguration, kafkaProperties)); + } + + @Override + public String id() { + return "KafkaSetup"; + } + + @Override + public List steps() { + return _steps; + } +} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/kafka/WaitForKafkaReadyStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/kafka/WaitForKafkaReadyStep.java new file mode 100644 index 0000000000..62611b9f8a --- /dev/null +++ 
diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/kafka/WaitForKafkaReadyStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/kafka/WaitForKafkaReadyStep.java new file mode 100644 index 0000000000..62611b9f8a --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/kafka/WaitForKafkaReadyStep.java @@ -0,0 +1,91 @@ +package com.linkedin.datahub.upgrade.system.kafka; + +import static com.linkedin.gms.factory.kafka.common.AdminClientFactory.buildKafkaAdminClient; + +import com.linkedin.datahub.upgrade.UpgradeContext; +import com.linkedin.datahub.upgrade.UpgradeStep; +import com.linkedin.datahub.upgrade.UpgradeStepResult; +import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult; +import com.linkedin.metadata.config.kafka.KafkaConfiguration; +import com.linkedin.upgrade.DataHubUpgradeState; +import io.datahubproject.metadata.context.OperationContext; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.AdminClient; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; + +@Slf4j +public class WaitForKafkaReadyStep implements UpgradeStep { + + // Constants for timeout and retry configuration + private static final int RETRY_COUNT = 60; + private static final long RETRY_INTERVAL_SECONDS = 5; + private static final long REQUEST_TIMEOUT_MS = 5000; + + private final OperationContext _opContext; + private final KafkaConfiguration _kafkaConfiguration; + private final KafkaProperties _kafkaProperties; + + public WaitForKafkaReadyStep( + OperationContext opContext, + KafkaConfiguration kafkaConfiguration, + KafkaProperties kafkaProperties) { + this._opContext = opContext; + this._kafkaConfiguration = kafkaConfiguration; + this._kafkaProperties = kafkaProperties; + } + + @Override + public String id() { + return "WaitForKafkaReadyStep"; + } + + @Override + public Function<UpgradeContext, UpgradeStepResult> executable() { + return (context) -> { + log.info("Waiting for Kafka to be ready..."); + + try { + // Create AdminClient using the extracted method + AdminClient adminClient = createAdminClient(); + + // Get cluster information to check all brokers are reachable + var clusterResult = adminClient.describeCluster(); + var clusterId = clusterResult.clusterId().get(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS); + var nodes = clusterResult.nodes().get(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS); + + // Verify we have brokers available + if (nodes.isEmpty()) { + throw new RuntimeException("No brokers found in Kafka cluster"); + } + + log.debug("Found {} brokers in Kafka cluster", nodes.size()); + + log.info("Kafka cluster is ready - {} brokers are reachable and healthy", nodes.size()); + return new DefaultUpgradeStepResult(this.id(), DataHubUpgradeState.SUCCEEDED); + + } catch (Exception e) { + // Retries are handled by the UpgradeManager based on retryCount + try { + Thread.sleep(RETRY_INTERVAL_SECONDS * 1000); + } catch (InterruptedException ie) { + log.error("Interrupted while waiting for Kafka to be ready", ie); + return new DefaultUpgradeStepResult(this.id(), DataHubUpgradeState.FAILED); + } + + return new DefaultUpgradeStepResult(this.id(), DataHubUpgradeState.FAILED); + } + }; + } + + protected AdminClient createAdminClient() { + return buildKafkaAdminClient( + _kafkaConfiguration, _kafkaProperties, "datahub-upgrade-kafka-setup"); + } + + @Override + public int retryCount() { + return RETRY_COUNT; + } +} diff --git a/datahub-upgrade/src/main/resources/application.properties b/datahub-upgrade/src/main/resources/application.properties index 847c264dfa..c78b189172 100644 --- a/datahub-upgrade/src/main/resources/application.properties +++ b/datahub-upgrade/src/main/resources/application.properties @@ -3,4 +3,4 @@ 
management.health.neo4j.enabled=false ingestion.enabled=false spring.main.allow-bean-definition-overriding=true entityClient.impl=restli -metadataChangeProposal.throttle.updateIntervalMs=0 \ No newline at end of file +metadataChangeProposal.throttle.updateIntervalMs=0 diff --git a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeBlockingTest.java b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeBlockingTest.java index 2e6ece1341..e4da965aac 100644 --- a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeBlockingTest.java +++ b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeBlockingTest.java @@ -8,6 +8,7 @@ import com.linkedin.datahub.upgrade.system.BlockingSystemUpgrade; import com.linkedin.datahub.upgrade.system.SystemUpdateBlocking; import com.linkedin.datahub.upgrade.system.bootstrapmcps.BootstrapMCPStep; import com.linkedin.datahub.upgrade.system.elasticsearch.BuildIndices; +import com.linkedin.datahub.upgrade.system.kafka.KafkaSetup; import java.util.List; import java.util.stream.Collectors; import javax.inject.Named; @@ -32,7 +33,13 @@ public class DatahubUpgradeBlockingTest extends AbstractTestNGSpringContextTests @Test public void testBuildIndicesOrder() { assertNotNull(blockingSystemUpgrades); - assertTrue(blockingSystemUpgrades.get(0) instanceof BuildIndices); + assertTrue(blockingSystemUpgrades.get(1) instanceof BuildIndices); + } + + @Test + public void testKafkaSetupOrder() { + assertNotNull(blockingSystemUpgrades); + assertTrue(blockingSystemUpgrades.get(0) instanceof KafkaSetup); } @Test diff --git a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/system/kafka/ConfluentSchemaRegistryCleanupPolicyStepTest.java b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/system/kafka/ConfluentSchemaRegistryCleanupPolicyStepTest.java new file mode 100644 index 0000000000..59209924b1 --- /dev/null +++ b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/system/kafka/ConfluentSchemaRegistryCleanupPolicyStepTest.java @@ -0,0 +1,164 @@ +package com.linkedin.datahub.upgrade.system.kafka; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + +import com.linkedin.datahub.upgrade.UpgradeContext; +import com.linkedin.datahub.upgrade.UpgradeStepResult; +import com.linkedin.metadata.config.kafka.KafkaConfiguration; +import com.linkedin.metadata.config.kafka.SetupConfiguration; +import com.linkedin.upgrade.DataHubUpgradeState; +import io.datahubproject.metadata.context.OperationContext; +import java.util.Arrays; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AlterConfigsResult; +import org.apache.kafka.common.KafkaFuture; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class ConfluentSchemaRegistryCleanupPolicyStepTest { + + private ConfluentSchemaRegistryCleanupPolicyStep step; + private OperationContext mockOpContext; + private KafkaConfiguration kafkaConfiguration; + private KafkaProperties kafkaProperties; + + @BeforeMethod + public void setUp() { + mockOpContext = 
mock(OperationContext.class); + + // Create real KafkaConfiguration with test data + kafkaConfiguration = new KafkaConfiguration(); + kafkaConfiguration.setBootstrapServers("localhost:9092"); + + SetupConfiguration setupConfig = new SetupConfiguration(); + setupConfig.setPreCreateTopics(true); + setupConfig.setUseConfluentSchemaRegistry(true); + kafkaConfiguration.setSetup(setupConfig); + + // Create real KafkaProperties with test data + kafkaProperties = new KafkaProperties(); + kafkaProperties.setBootstrapServers(Arrays.asList("localhost:9092")); + + step = + new ConfluentSchemaRegistryCleanupPolicyStep( + mockOpContext, kafkaConfiguration, kafkaProperties); + } + + @Test + public void testId() { + assertEquals(step.id(), "ConfluentSchemaRegistryCleanupPolicyStep"); + } + + @Test + public void testSkipWhenSchemaRegistryDisabled() { + // Set schema registry to disabled + kafkaConfiguration.getSetup().setUseConfluentSchemaRegistry(false); + + UpgradeContext mockContext = mock(UpgradeContext.class); + UpgradeStepResult result = step.executable().apply(mockContext); + + assertNotNull(result); + assertEquals(result.result(), DataHubUpgradeState.SUCCEEDED); + } + + @Test + public void testSkipWhenSetupConfigIsNull() { + // Set setup configuration as null + kafkaConfiguration.setSetup(null); + + UpgradeContext mockContext = mock(UpgradeContext.class); + UpgradeStepResult result = step.executable().apply(mockContext); + + assertNotNull(result); + assertEquals(result.result(), DataHubUpgradeState.SUCCEEDED); + } + + @Test + public void testSuccessfulCleanupPolicyConfiguration() throws Exception { + // Create spy to mock the createAdminClient method + ConfluentSchemaRegistryCleanupPolicyStep spyStep = spy(step); + + // Mock AdminClient and its dependencies + AdminClient mockAdminClient = mock(AdminClient.class); + AlterConfigsResult mockAlterConfigsResult = mock(AlterConfigsResult.class); + + // Mock successful alter configs result + KafkaFuture alterConfigsFuture = mock(KafkaFuture.class); + when(alterConfigsFuture.get()).thenReturn(null); // Success + when(mockAlterConfigsResult.all()).thenReturn(alterConfigsFuture); + when(mockAdminClient.incrementalAlterConfigs(any())).thenReturn(mockAlterConfigsResult); + + // Mock the createAdminClient method to return our mock AdminClient + doReturn(mockAdminClient).when(spyStep).createAdminClient(); + + UpgradeContext mockContext = mock(UpgradeContext.class); + UpgradeStepResult result = spyStep.executable().apply(mockContext); + + assertNotNull(result); + assertEquals(result.result(), DataHubUpgradeState.SUCCEEDED); + + // Verify that incrementalAlterConfigs was called + verify(mockAdminClient, times(1)).incrementalAlterConfigs(any()); + } + + @Test + public void testAlterConfigsFailure() throws Exception { + // Create spy to mock the createAdminClient method + ConfluentSchemaRegistryCleanupPolicyStep spyStep = spy(step); + + // Mock AdminClient to throw exception when altering configs + AdminClient mockAdminClient = mock(AdminClient.class); + when(mockAdminClient.incrementalAlterConfigs(any())) + .thenThrow(new RuntimeException("Failed to alter configs")); + + // Mock the createAdminClient method to return our mock AdminClient + doReturn(mockAdminClient).when(spyStep).createAdminClient(); + + UpgradeContext mockContext = mock(UpgradeContext.class); + UpgradeStepResult result = spyStep.executable().apply(mockContext); + + assertNotNull(result); + assertEquals(result.result(), DataHubUpgradeState.FAILED); + } + + @Test + public void 
testAdminClientException() throws Exception { + // Create spy to mock the createAdminClient method + ConfluentSchemaRegistryCleanupPolicyStep spyStep = spy(step); + + // Mock AdminClient to throw exception when calling incrementalAlterConfigs + AdminClient mockAdminClient = mock(AdminClient.class); + AlterConfigsResult mockAlterConfigsResult = mock(AlterConfigsResult.class); + + // Mock alter configs result that throws exception + KafkaFuture alterConfigsFuture = mock(KafkaFuture.class); + when(alterConfigsFuture.get()).thenThrow(new RuntimeException("AdminClient operation failed")); + when(mockAlterConfigsResult.all()).thenReturn(alterConfigsFuture); + when(mockAdminClient.incrementalAlterConfigs(any())).thenReturn(mockAlterConfigsResult); + + // Mock the createAdminClient method to return our mock AdminClient + doReturn(mockAdminClient).when(spyStep).createAdminClient(); + + UpgradeContext mockContext = mock(UpgradeContext.class); + UpgradeStepResult result = spyStep.executable().apply(mockContext); + + assertNotNull(result); + assertEquals(result.result(), DataHubUpgradeState.FAILED); + } + + @Test + public void testCreateAdminClientMethod() { + // Test that the createAdminClient method can be called + AdminClient adminClient = step.createAdminClient(); + assertNotNull(adminClient); + } +} diff --git a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/system/kafka/CreateKafkaTopicsStepTest.java b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/system/kafka/CreateKafkaTopicsStepTest.java new file mode 100644 index 0000000000..678f639991 --- /dev/null +++ b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/system/kafka/CreateKafkaTopicsStepTest.java @@ -0,0 +1,371 @@ +package com.linkedin.datahub.upgrade.system.kafka; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + +import com.linkedin.datahub.upgrade.UpgradeContext; +import com.linkedin.datahub.upgrade.UpgradeStepResult; +import com.linkedin.metadata.config.kafka.KafkaConfiguration; +import com.linkedin.metadata.config.kafka.SetupConfiguration; +import com.linkedin.metadata.config.kafka.TopicsConfiguration; +import com.linkedin.upgrade.DataHubUpgradeState; +import io.datahubproject.metadata.context.OperationContext; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.ListTopicsResult; +import org.apache.kafka.common.KafkaFuture; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class CreateKafkaTopicsStepTest { + + private CreateKafkaTopicsStep step; + private OperationContext mockOpContext; + private KafkaConfiguration kafkaConfiguration; + private KafkaProperties kafkaProperties; + + @BeforeMethod + public void setUp() { + mockOpContext = mock(OperationContext.class); + + // Create real KafkaConfiguration with test data + kafkaConfiguration = new KafkaConfiguration(); + 
kafkaConfiguration.setBootstrapServers("localhost:9092"); + + SetupConfiguration setupConfig = new SetupConfiguration(); + setupConfig.setPreCreateTopics(true); + setupConfig.setUseConfluentSchemaRegistry(true); + kafkaConfiguration.setSetup(setupConfig); + + // Create real KafkaProperties with test data + kafkaProperties = new KafkaProperties(); + kafkaProperties.setBootstrapServers(Arrays.asList("localhost:9092")); + + step = new CreateKafkaTopicsStep(mockOpContext, kafkaConfiguration, kafkaProperties); + } + + @Test + public void testId() { + assertEquals(step.id(), "CreateKafkaTopicsStep"); + } + + @Test + public void testSkipTopicCreationWhenDisabled() { + // Set preCreateTopics to false + kafkaConfiguration.getSetup().setPreCreateTopics(false); + + UpgradeContext mockContext = mock(UpgradeContext.class); + UpgradeStepResult result = step.executable().apply(mockContext); + + assertNotNull(result); + assertEquals(result.result(), DataHubUpgradeState.SUCCEEDED); + } + + @Test + public void testSkipTopicCreationWhenSetupConfigIsNull() { + // Set setup configuration as null + kafkaConfiguration.setSetup(null); + + UpgradeContext mockContext = mock(UpgradeContext.class); + UpgradeStepResult result = step.executable().apply(mockContext); + + assertNotNull(result); + assertEquals(result.result(), DataHubUpgradeState.SUCCEEDED); + } + + @Test + public void testSkipTopicCreationWhenTopicsConfigIsNull() { + // Set topics configuration as null + kafkaConfiguration.setTopics(null); + + UpgradeContext mockContext = mock(UpgradeContext.class); + UpgradeStepResult result = step.executable().apply(mockContext); + + assertNotNull(result); + // When topics is null, getTopics() still returns a TopicsConfiguration with empty topics map + // which causes the step to fail because no topics are configured + assertEquals(result.result(), DataHubUpgradeState.FAILED); + } + + @Test + public void testFailWhenNoTopicsConfigured() { + // Set empty topics map directly on KafkaConfiguration + kafkaConfiguration.setTopics(Collections.emptyMap()); + + UpgradeContext mockContext = mock(UpgradeContext.class); + UpgradeStepResult result = step.executable().apply(mockContext); + + assertNotNull(result); + assertEquals(result.result(), DataHubUpgradeState.FAILED); + } + + @Test + public void testSuccessfulTopicCreation() throws Exception { + // Create spy to mock the createAdminClient method + CreateKafkaTopicsStep spyStep = spy(step); + + // Mock AdminClient and its dependencies + AdminClient mockAdminClient = mock(AdminClient.class); + ListTopicsResult mockListTopicsResult = mock(ListTopicsResult.class); + CreateTopicsResult mockCreateTopicsResult = mock(CreateTopicsResult.class); + + // Mock existing topics (empty set - no existing topics) + KafkaFuture> listTopicsFuture = mock(KafkaFuture.class); + when(listTopicsFuture.get()).thenReturn(new HashSet<>()); + when(mockListTopicsResult.names()).thenReturn(listTopicsFuture); + when(mockAdminClient.listTopics()).thenReturn(mockListTopicsResult); + + // Mock topic creation result + KafkaFuture createTopicsFuture = mock(KafkaFuture.class); + when(createTopicsFuture.get()).thenReturn(null); // Success + when(mockCreateTopicsResult.all()).thenReturn(createTopicsFuture); + when(mockAdminClient.createTopics(any())).thenReturn(mockCreateTopicsResult); + + // Mock the createAdminClient method to return our mock AdminClient + doReturn(mockAdminClient).when(spyStep).createAdminClient(); + + // Set up topics configuration with test topics + Map topics = new HashMap<>(); + 
TopicsConfiguration.TopicConfiguration topicConfig = + new TopicsConfiguration.TopicConfiguration(); + topicConfig.setName("test-topic"); + topicConfig.setPartitions(3); + topicConfig.setReplicationFactor(1); + topicConfig.setEnabled(true); + topics.put("testTopic", topicConfig); + kafkaConfiguration.setTopics(topics); + + UpgradeContext mockContext = mock(UpgradeContext.class); + UpgradeStepResult result = spyStep.executable().apply(mockContext); + + assertNotNull(result); + assertEquals(result.result(), DataHubUpgradeState.SUCCEEDED); + + // Verify that createTopics was called + verify(mockAdminClient, times(1)).createTopics(any()); + } + + @Test + public void testSkipExistingTopics() throws Exception { + // Create spy to mock the createAdminClient method + CreateKafkaTopicsStep spyStep = spy(step); + + // Mock AdminClient and its dependencies + AdminClient mockAdminClient = mock(AdminClient.class); + ListTopicsResult mockListTopicsResult = mock(ListTopicsResult.class); + + // Mock existing topics (topic already exists) + Set existingTopics = new HashSet<>(); + existingTopics.add("test-topic"); + KafkaFuture> listTopicsFuture = mock(KafkaFuture.class); + when(listTopicsFuture.get()).thenReturn(existingTopics); + when(mockListTopicsResult.names()).thenReturn(listTopicsFuture); + when(mockAdminClient.listTopics()).thenReturn(mockListTopicsResult); + + // Mock the createAdminClient method to return our mock AdminClient + doReturn(mockAdminClient).when(spyStep).createAdminClient(); + + // Set up topics configuration with test topics + Map topics = new HashMap<>(); + TopicsConfiguration.TopicConfiguration topicConfig = + new TopicsConfiguration.TopicConfiguration(); + topicConfig.setName("test-topic"); + topicConfig.setPartitions(3); + topicConfig.setReplicationFactor(1); + topicConfig.setEnabled(true); + topics.put("testTopic", topicConfig); + kafkaConfiguration.setTopics(topics); + + UpgradeContext mockContext = mock(UpgradeContext.class); + UpgradeStepResult result = spyStep.executable().apply(mockContext); + + assertNotNull(result); + assertEquals(result.result(), DataHubUpgradeState.SUCCEEDED); + + // Verify that createTopics was NOT called since topic already exists + verify(mockAdminClient, times(0)).createTopics(any()); + } + + @Test + public void testSkipTopicWhenCreateFlagIsFalse() throws Exception { + // Create spy to mock the createAdminClient method + CreateKafkaTopicsStep spyStep = spy(step); + + // Mock AdminClient and its dependencies + AdminClient mockAdminClient = mock(AdminClient.class); + ListTopicsResult mockListTopicsResult = mock(ListTopicsResult.class); + + // Mock existing topics (empty set) + KafkaFuture> listTopicsFuture = mock(KafkaFuture.class); + when(listTopicsFuture.get()).thenReturn(new HashSet<>()); + when(mockListTopicsResult.names()).thenReturn(listTopicsFuture); + when(mockAdminClient.listTopics()).thenReturn(mockListTopicsResult); + + // Mock the createAdminClient method to return our mock AdminClient + doReturn(mockAdminClient).when(spyStep).createAdminClient(); + + // Set up topics configuration with create flag set to false + Map topics = new HashMap<>(); + TopicsConfiguration.TopicConfiguration topicConfig = + new TopicsConfiguration.TopicConfiguration(); + topicConfig.setName("test-topic"); + topicConfig.setPartitions(3); + topicConfig.setReplicationFactor(1); + topicConfig.setEnabled(false); // Don't create this topic + topics.put("testTopic", topicConfig); + kafkaConfiguration.setTopics(topics); + + UpgradeContext mockContext = 
mock(UpgradeContext.class); + UpgradeStepResult result = spyStep.executable().apply(mockContext); + + assertNotNull(result); + assertEquals(result.result(), DataHubUpgradeState.SUCCEEDED); + + // Verify that createTopics was NOT called since create flag is false + verify(mockAdminClient, times(0)).createTopics(any()); + } + + @Test + public void testTopicCreationWithConfigProperties() throws Exception { + // Create spy to mock the createAdminClient method + CreateKafkaTopicsStep spyStep = spy(step); + + // Mock AdminClient and its dependencies + AdminClient mockAdminClient = mock(AdminClient.class); + ListTopicsResult mockListTopicsResult = mock(ListTopicsResult.class); + CreateTopicsResult mockCreateTopicsResult = mock(CreateTopicsResult.class); + + // Mock existing topics (empty set) + KafkaFuture> listTopicsFuture = mock(KafkaFuture.class); + when(listTopicsFuture.get()).thenReturn(new HashSet<>()); + when(mockListTopicsResult.names()).thenReturn(listTopicsFuture); + when(mockAdminClient.listTopics()).thenReturn(mockListTopicsResult); + + // Mock topic creation result + KafkaFuture createTopicsFuture = mock(KafkaFuture.class); + when(createTopicsFuture.get()).thenReturn(null); // Success + when(mockCreateTopicsResult.all()).thenReturn(createTopicsFuture); + when(mockAdminClient.createTopics(any())).thenReturn(mockCreateTopicsResult); + + // Mock the createAdminClient method to return our mock AdminClient + doReturn(mockAdminClient).when(spyStep).createAdminClient(); + + // Set up topics configuration with config properties + Map topics = new HashMap<>(); + TopicsConfiguration.TopicConfiguration topicConfig = + new TopicsConfiguration.TopicConfiguration(); + topicConfig.setName("test-topic"); + topicConfig.setPartitions(3); + topicConfig.setReplicationFactor(1); + topicConfig.setEnabled(true); + + // Add config properties + Map configProperties = new HashMap<>(); + configProperties.put("retention.ms", "86400000"); + configProperties.put("cleanup.policy", "delete"); + topicConfig.setConfigProperties(configProperties); + + topics.put("testTopic", topicConfig); + kafkaConfiguration.setTopics(topics); + + UpgradeContext mockContext = mock(UpgradeContext.class); + UpgradeStepResult result = spyStep.executable().apply(mockContext); + + assertNotNull(result); + assertEquals(result.result(), DataHubUpgradeState.SUCCEEDED); + + // Verify that createTopics was called + verify(mockAdminClient, times(1)).createTopics(any()); + } + + @Test + public void testTopicCreationFailure() throws Exception { + // Create spy to mock the createAdminClient method + CreateKafkaTopicsStep spyStep = spy(step); + + // Mock AdminClient and its dependencies + AdminClient mockAdminClient = mock(AdminClient.class); + ListTopicsResult mockListTopicsResult = mock(ListTopicsResult.class); + + // Mock existing topics (empty set) + KafkaFuture> listTopicsFuture = mock(KafkaFuture.class); + when(listTopicsFuture.get()).thenReturn(new HashSet<>()); + when(mockListTopicsResult.names()).thenReturn(listTopicsFuture); + when(mockAdminClient.listTopics()).thenReturn(mockListTopicsResult); + + // Mock topic creation to throw exception + when(mockAdminClient.createTopics(any())) + .thenThrow(new RuntimeException("Topic creation failed")); + + // Mock the createAdminClient method to return our mock AdminClient + doReturn(mockAdminClient).when(spyStep).createAdminClient(); + + // Set up topics configuration + Map topics = new HashMap<>(); + TopicsConfiguration.TopicConfiguration topicConfig = + new 
TopicsConfiguration.TopicConfiguration(); + topicConfig.setName("test-topic"); + topicConfig.setPartitions(3); + topicConfig.setReplicationFactor(1); + topicConfig.setEnabled(true); + topics.put("testTopic", topicConfig); + kafkaConfiguration.setTopics(topics); + + UpgradeContext mockContext = mock(UpgradeContext.class); + UpgradeStepResult result = spyStep.executable().apply(mockContext); + + assertNotNull(result); + assertEquals(result.result(), DataHubUpgradeState.FAILED); + } + + @Test + public void testListTopicsFailure() throws Exception { + // Create spy to mock the createAdminClient method + CreateKafkaTopicsStep spyStep = spy(step); + + // Mock AdminClient to throw exception when listing topics + AdminClient mockAdminClient = mock(AdminClient.class); + when(mockAdminClient.listTopics()).thenThrow(new RuntimeException("Failed to list topics")); + + // Mock the createAdminClient method to return our mock AdminClient + doReturn(mockAdminClient).when(spyStep).createAdminClient(); + + // Set up topics configuration + Map topics = new HashMap<>(); + TopicsConfiguration.TopicConfiguration topicConfig = + new TopicsConfiguration.TopicConfiguration(); + topicConfig.setName("test-topic"); + topicConfig.setPartitions(3); + topicConfig.setReplicationFactor(1); + topicConfig.setEnabled(true); + topics.put("testTopic", topicConfig); + kafkaConfiguration.setTopics(topics); + + UpgradeContext mockContext = mock(UpgradeContext.class); + UpgradeStepResult result = spyStep.executable().apply(mockContext); + + assertNotNull(result); + assertEquals(result.result(), DataHubUpgradeState.FAILED); + } + + @Test + public void testCreateAdminClientMethod() { + // Test that the createAdminClient method can be called + AdminClient adminClient = step.createAdminClient(); + assertNotNull(adminClient); + } +} diff --git a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/system/kafka/KafkaNonBlockingSetupTest.java b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/system/kafka/KafkaNonBlockingSetupTest.java new file mode 100644 index 0000000000..22a7f0dca0 --- /dev/null +++ b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/system/kafka/KafkaNonBlockingSetupTest.java @@ -0,0 +1,67 @@ +package com.linkedin.datahub.upgrade.system.kafka; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + +import com.linkedin.datahub.upgrade.UpgradeStep; +import com.linkedin.metadata.config.kafka.KafkaConfiguration; +import com.linkedin.metadata.config.kafka.SetupConfiguration; +import io.datahubproject.metadata.context.OperationContext; +import java.util.Arrays; +import java.util.List; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class KafkaNonBlockingSetupTest { + + private KafkaNonBlockingSetup upgrade; + + @Mock private OperationContext mockOpContext; + private KafkaConfiguration kafkaConfiguration; + private KafkaProperties kafkaProperties; + + @BeforeMethod + public void setUp() { + MockitoAnnotations.openMocks(this); + + // Create real KafkaConfiguration with test data + kafkaConfiguration = new KafkaConfiguration(); + kafkaConfiguration.setBootstrapServers("localhost:9092"); + + SetupConfiguration setupConfig = new SetupConfiguration(); + setupConfig.setPreCreateTopics(true); + setupConfig.setUseConfluentSchemaRegistry(true); 
+ kafkaConfiguration.setSetup(setupConfig); + + // Create real KafkaProperties with test data + kafkaProperties = new KafkaProperties(); + kafkaProperties.setBootstrapServers(Arrays.asList("localhost:9092")); + + upgrade = new KafkaNonBlockingSetup(mockOpContext, kafkaConfiguration, kafkaProperties); + } + + @Test + public void testId() { + assertEquals(upgrade.id(), "KafkaNonBlockingSetup"); + } + + @Test + public void testSteps() { + List steps = upgrade.steps(); + assertNotNull(steps); + assertEquals(steps.size(), 1); + assertTrue(steps.get(0) instanceof ConfluentSchemaRegistryCleanupPolicyStep); + } + + @Test + public void testStepsContainsCorrectStep() { + List steps = upgrade.steps(); + ConfluentSchemaRegistryCleanupPolicyStep step = + (ConfluentSchemaRegistryCleanupPolicyStep) steps.get(0); + assertEquals(step.id(), "ConfluentSchemaRegistryCleanupPolicyStep"); + } +} diff --git a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/system/kafka/WaitForKafkaReadyStepTest.java b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/system/kafka/WaitForKafkaReadyStepTest.java new file mode 100644 index 0000000000..6f3f752695 --- /dev/null +++ b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/system/kafka/WaitForKafkaReadyStepTest.java @@ -0,0 +1,172 @@ +package com.linkedin.datahub.upgrade.system.kafka; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + +import com.linkedin.datahub.upgrade.UpgradeContext; +import com.linkedin.datahub.upgrade.UpgradeStepResult; +import com.linkedin.metadata.config.kafka.KafkaConfiguration; +import com.linkedin.metadata.config.kafka.SetupConfiguration; +import com.linkedin.upgrade.DataHubUpgradeState; +import io.datahubproject.metadata.context.OperationContext; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.DescribeClusterResult; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Node; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class WaitForKafkaReadyStepTest { + + private WaitForKafkaReadyStep step; + private WaitForKafkaReadyStep spyStep; + private OperationContext mockOpContext; + private KafkaConfiguration kafkaConfiguration; + private KafkaProperties kafkaProperties; + private AdminClient mockAdminClient; + private DescribeClusterResult mockDescribeClusterResult; + + @BeforeMethod + public void setUp() { + mockOpContext = mock(OperationContext.class); + + // Create real KafkaConfiguration with test data + kafkaConfiguration = new KafkaConfiguration(); + kafkaConfiguration.setBootstrapServers("localhost:9092"); + + SetupConfiguration setupConfig = new SetupConfiguration(); + setupConfig.setPreCreateTopics(true); + setupConfig.setUseConfluentSchemaRegistry(true); + kafkaConfiguration.setSetup(setupConfig); + + // Create real KafkaProperties with test data + kafkaProperties = new KafkaProperties(); + kafkaProperties.setBootstrapServers(Arrays.asList("localhost:9092")); + + mockAdminClient = 
mock(AdminClient.class); + mockDescribeClusterResult = mock(DescribeClusterResult.class); + + step = new WaitForKafkaReadyStep(mockOpContext, kafkaConfiguration, kafkaProperties); + spyStep = spy(step); + } + + @Test + public void testId() { + assertEquals(step.id(), "WaitForKafkaReadyStep"); + } + + @Test + public void testRetryCount() { + assertEquals(step.retryCount(), 60); + } + + @Test + public void testSuccessfulKafkaReady() throws Exception { + // Mock successful cluster description + Node mockNode1 = new Node(1, "localhost", 9092); + Node mockNode2 = new Node(2, "localhost", 9093); + + KafkaFuture clusterIdFuture = mock(KafkaFuture.class); + KafkaFuture> nodesFuture = mock(KafkaFuture.class); + + when(clusterIdFuture.get(anyLong(), any(TimeUnit.class))).thenReturn("test-cluster-id"); + when(nodesFuture.get(anyLong(), any(TimeUnit.class))) + .thenReturn(Arrays.asList(mockNode1, mockNode2)); + + when(mockDescribeClusterResult.clusterId()).thenReturn(clusterIdFuture); + when(mockDescribeClusterResult.nodes()).thenReturn(nodesFuture); + when(mockAdminClient.describeCluster()).thenReturn(mockDescribeClusterResult); + + // Mock the createAdminClient method to return our mock AdminClient + doReturn(mockAdminClient).when(spyStep).createAdminClient(); + + UpgradeContext mockContext = mock(UpgradeContext.class); + UpgradeStepResult result = spyStep.executable().apply(mockContext); + + assertNotNull(result); + assertEquals(result.result(), DataHubUpgradeState.SUCCEEDED); + } + + @Test + public void testKafkaReadyWithNoBrokers() throws Exception { + // Mock cluster description with no brokers + KafkaFuture clusterIdFuture = mock(KafkaFuture.class); + KafkaFuture> nodesFuture = mock(KafkaFuture.class); + + when(clusterIdFuture.get(anyLong(), any(TimeUnit.class))).thenReturn("test-cluster-id"); + when(nodesFuture.get(anyLong(), any(TimeUnit.class))).thenReturn(Collections.emptyList()); + + when(mockDescribeClusterResult.clusterId()).thenReturn(clusterIdFuture); + when(mockDescribeClusterResult.nodes()).thenReturn(nodesFuture); + when(mockAdminClient.describeCluster()).thenReturn(mockDescribeClusterResult); + + // Mock the createAdminClient method to return our mock AdminClient + doReturn(mockAdminClient).when(spyStep).createAdminClient(); + + UpgradeContext mockContext = mock(UpgradeContext.class); + UpgradeStepResult result = spyStep.executable().apply(mockContext); + + assertNotNull(result); + assertEquals(result.result(), DataHubUpgradeState.FAILED); + } + + @Test + public void testKafkaReadyWithException() throws Exception { + // Mock AdminClient to throw exception + when(mockAdminClient.describeCluster()) + .thenThrow(new RuntimeException("Kafka connection failed")); + + // Mock the createAdminClient method to return our mock AdminClient + doReturn(mockAdminClient).when(spyStep).createAdminClient(); + + UpgradeContext mockContext = mock(UpgradeContext.class); + UpgradeStepResult result = spyStep.executable().apply(mockContext); + + assertNotNull(result); + assertEquals(result.result(), DataHubUpgradeState.FAILED); + } + + @Test + public void testKafkaReadyWithTimeoutException() throws Exception { + // Mock timeout exception + KafkaFuture clusterIdFuture = mock(KafkaFuture.class); + when(clusterIdFuture.get(anyLong(), any(TimeUnit.class))) + .thenThrow(new java.util.concurrent.TimeoutException("Timeout")); + + when(mockDescribeClusterResult.clusterId()).thenReturn(clusterIdFuture); + when(mockAdminClient.describeCluster()).thenReturn(mockDescribeClusterResult); + + // Mock the 
createAdminClient method to return our mock AdminClient + doReturn(mockAdminClient).when(spyStep).createAdminClient(); + + UpgradeContext mockContext = mock(UpgradeContext.class); + UpgradeStepResult result = spyStep.executable().apply(mockContext); + + assertNotNull(result); + assertEquals(result.result(), DataHubUpgradeState.FAILED); + } + + @Test + public void testCreateAdminClientMethod() { + // Test that the createAdminClient method exists and can be called + // This will fail in tests since we don't have real Kafka configuration, + // but it verifies the method structure + try { + step.createAdminClient(); + } catch (Exception e) { + // Expected to fail in test environment, but method should exist + assertNotNull(e); + } + } +} diff --git a/datahub-upgrade/src/test/resources/logback-test.xml b/datahub-upgrade/src/test/resources/logback-test.xml new file mode 100644 index 0000000000..a06d08c3d4 --- /dev/null +++ b/datahub-upgrade/src/test/resources/logback-test.xml @@ -0,0 +1,20 @@ + + + + + + + + + + + + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + diff --git a/docker/README.md b/docker/README.md index 44c3a0a6e9..64744a406c 100644 --- a/docker/README.md +++ b/docker/README.md @@ -44,7 +44,6 @@ Do not use `latest` or `debug` tags for any of the image as those are not suppor - [acryldata/datahub-mae-consumer](https://hub.docker.com/repository/docker/acryldata/datahub-mae-consumer/) - [acryldata/datahub-mce-consumer](https://hub.docker.com/repository/docker/acryldata/datahub-mce-consumer/) - [acryldata/datahub-upgrade](https://hub.docker.com/r/acryldata/datahub-upgrade/) -- [acryldata/datahub-kafka-setup](https://hub.docker.com/r/acryldata/datahub-kafka-setup/) - [acryldata/datahub-elasticsearch-setup](https://hub.docker.com/r/acryldata/datahub-elasticsearch-setup/) - [acryldata/datahub-mysql-setup](https://hub.docker.com/r/acryldata/datahub-mysql-setup/) - [acryldata/datahub-postgres-setup](https://hub.docker.com/r/acryldata/datahub-postgres-setup/) @@ -52,7 +51,6 @@ Do not use `latest` or `debug` tags for any of the image as those are not suppor Dependencies: -- [Kafka, Zookeeper, and Schema Registry](kafka-setup) - [Elasticsearch](elasticsearch-setup) - [MySQL](mysql) - [(Optional) Neo4j](neo4j) diff --git a/docker/build.gradle b/docker/build.gradle index 0742e3d51c..79fc4fb944 100644 --- a/docker/build.gradle +++ b/docker/build.gradle @@ -15,7 +15,6 @@ ext { backend_profile_modules = [ ':docker:elasticsearch-setup', ':docker:mysql-setup', - ':docker:kafka-setup', ':datahub-upgrade', ':metadata-service:war', ] diff --git a/docker/datahub-upgrade/start.sh b/docker/datahub-upgrade/start.sh index e882ff0006..3db2e82c5b 100644 --- a/docker/datahub-upgrade/start.sh +++ b/docker/datahub-upgrade/start.sh @@ -33,11 +33,6 @@ if [[ $ENTITY_SERVICE_IMPL == cassandra ]] && [[ $SKIP_CASSANDRA_CHECK != true ] WAIT_FOR_CASSANDRA=" -wait tcp://$CASSANDRA_DATASOURCE_HOST " fi -WAIT_FOR_KAFKA="" -if [[ $SKIP_KAFKA_CHECK != true ]]; then - WAIT_FOR_KAFKA=" -wait tcp://$(echo $KAFKA_BOOTSTRAP_SERVER | sed 's/,/ -wait tcp:\/\//g') " -fi - WAIT_FOR_NEO4J="" if [[ $GRAPH_SERVICE_IMPL != elasticsearch ]] && [[ $SKIP_NEO4J_CHECK != true ]]; then WAIT_FOR_NEO4J=" -wait $NEO4J_HOST " @@ -62,7 +57,6 @@ fi COMMON=" $WAIT_FOR_EBEAN \ $WAIT_FOR_CASSANDRA \ - $WAIT_FOR_KAFKA \ $WAIT_FOR_NEO4J \ -timeout 240s \ java $JAVA_OPTS $JMX_OPTS \ diff --git a/docker/kafka-setup/Dockerfile b/docker/kafka-setup/Dockerfile deleted file mode 100644 index f7f8289002..0000000000 --- 
a/docker/kafka-setup/Dockerfile +++ /dev/null @@ -1,137 +0,0 @@ -# Defining custom repo urls for use in enterprise environments -ARG ALPINE_REPO_URL=http://dl-cdn.alpinelinux.org/alpine -ARG GITHUB_REPO_URL=https://github.com -ARG MAVEN_CENTRAL_REPO_URL=https://repo1.maven.org/maven2 -ARG APACHE_DOWNLOAD_URL=null - -# Base stage with common dependencies -FROM alpine:3.22 AS base - -# Re-declaring args from above -ARG ALPINE_REPO_URL -ARG GITHUB_REPO_URL -ARG MAVEN_CENTRAL_REPO_URL -ARG APACHE_DOWNLOAD_URL - -# Kafka specific args -ARG KAFKA_VERSION=4.0.0 -ARG SCALA_VERSION=2.13 -ARG CONFLUENT_VERSION=8.0.0 - -# Environment variables -ENV KAFKA_VERSION=${KAFKA_VERSION} -ENV SCALA_VERSION=${SCALA_VERSION} -ENV KAFKA_WORKDIR=/opt/kafka - -LABEL name="kafka" version=${KAFKA_VERSION} - -# Optionally set corporate mirror for apk -RUN if [ "${ALPINE_REPO_URL}" != "http://dl-cdn.alpinelinux.org/alpine" ] ; then \ - sed -i "s#http.*://dl-cdn.alpinelinux.org/alpine#${ALPINE_REPO_URL}#g" /etc/apk/repositories ; fi - -# Upgrade Alpine and install base packages -RUN apk --no-cache --update-cache --available upgrade \ - && apk --no-cache add 'c-ares>1.34.5' --repository=${ALPINE_REPO_URL}/edge/main \ - && apk --no-cache add \ - bash \ - coreutils \ - curl \ - ca-certificates \ - jq \ - python3 \ - py3-pip \ - wget \ - zip \ - gcompat \ - sqlite \ - libc6-compat \ - snappy=~1.2 --repository=${ALPINE_REPO_URL}/edge/main \ - && apk --no-cache add openjdk17-jre-headless --repository=${ALPINE_REPO_URL}/edge/community - -# Create directories -RUN mkdir -p /opt/kafka /usr/share/java/cp-base-new \ - && cp /usr/lib/jvm/java-17-openjdk/jre/lib/security/cacerts /tmp/kafka.client.truststore.jks - -# Download and install Kafka -RUN if [ "${APACHE_DOWNLOAD_URL}" != "null" ] ; then \ - mirror="${APACHE_DOWNLOAD_URL}/" ; \ - else \ - mirror=$(curl --stderr /dev/null https://www.apache.org/dyn/closer.cgi\?as_json\=1 | jq -r '.preferred'); \ - fi && \ - curl -sSL "${mirror}kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz" \ - | tar -xzf - -C /opt && \ - # Check if extraction created the expected directory - if [ -d "/opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION}" ]; then \ - # Move contents properly, not the directory itself - cp -r /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION}/* ${KAFKA_WORKDIR}/ && \ - rm -rf /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION}; \ - fi - -# Download Confluent dependencies -ARG CONFLUENT_REPO_URL=https://packages.confluent.io/maven -RUN wget -P /usr/share/java/cp-base-new \ - ${CONFLUENT_REPO_URL}/io/confluent/common-utils/${CONFLUENT_VERSION}/common-utils-${CONFLUENT_VERSION}.jar \ - && wget -P /usr/share/java/cp-base-new \ - ${CONFLUENT_REPO_URL}/io/confluent/common-config/${CONFLUENT_VERSION}/common-config-${CONFLUENT_VERSION}.jar - -# Fix security vulnerabilities (after Kafka is installed so libs directory exists) -ARG COMMONS_BEAN_UTILS_VERSION="1.11.0" -ARG COMMONS_LANG3_VERSION="3.18.0" -RUN wget -P /usr/share/java/cp-base-new \ - ${MAVEN_CENTRAL_REPO_URL}/commons-beanutils/commons-beanutils/${COMMONS_BEAN_UTILS_VERSION}/commons-beanutils-${COMMONS_BEAN_UTILS_VERSION}.jar \ - && rm -f ${KAFKA_WORKDIR}/libs/commons-beanutils-*.jar \ - && cp /usr/share/java/cp-base-new/commons-beanutils-${COMMONS_BEAN_UTILS_VERSION}.jar ${KAFKA_WORKDIR}/libs/ \ - && wget -P /usr/share/java/cp-base-new \ - ${MAVEN_CENTRAL_REPO_URL}/org/apache/commons/commons-lang3/${COMMONS_LANG3_VERSION}/commons-lang3-${COMMONS_LANG3_VERSION}.jar \ - && rm -f ${KAFKA_WORKDIR}/libs/commons-lang3-*.jar \ - && 
cp /usr/share/java/cp-base-new/commons-lang3-${COMMONS_LANG3_VERSION}.jar ${KAFKA_WORKDIR}/libs/ - -# Download AWS MSK IAM Auth -ADD ${GITHUB_REPO_URL}/aws/aws-msk-iam-auth/releases/download/v2.3.2/aws-msk-iam-auth-2.3.2-all.jar /usr/share/java/cp-base-new/ -ADD ${GITHUB_REPO_URL}/aws/aws-msk-iam-auth/releases/download/v2.3.2/aws-msk-iam-auth-2.3.2-all.jar ${KAFKA_WORKDIR}/libs/ - -# Set LD_LIBRARY_PATH for compatibility -ENV LD_LIBRARY_PATH="/lib:/lib64" - -# Copy setup scripts -COPY docker/kafka-setup/kafka-setup.sh ${KAFKA_WORKDIR}/kafka-setup.sh -COPY docker/kafka-setup/kafka-config.sh ${KAFKA_WORKDIR}/kafka-config.sh -COPY docker/kafka-setup/kafka-topic-workers.sh ${KAFKA_WORKDIR}/kafka-topic-workers.sh -COPY docker/kafka-setup/kafka-ready.sh ${KAFKA_WORKDIR}/kafka-ready.sh -COPY docker/kafka-setup/env_to_properties.py ${KAFKA_WORKDIR}/env_to_properties.py - -# Make scripts executable -RUN chmod +x ${KAFKA_WORKDIR}/kafka-setup.sh \ - ${KAFKA_WORKDIR}/kafka-topic-workers.sh \ - ${KAFKA_WORKDIR}/kafka-ready.sh - -# Create kafka user and group -RUN addgroup -S kafka && adduser -S kafka -G kafka && chmod g-s /home/kafka - -# Set ownership -RUN chown -R kafka:kafka ${KAFKA_WORKDIR} \ - && chown -R kafka:kafka /usr/share/java/cp-base-new - -# Switch to kafka user -USER kafka - -# Set environment variables for DataHub -ENV METADATA_AUDIT_EVENT_NAME="MetadataAuditEvent_v4" -ENV METADATA_CHANGE_EVENT_NAME="MetadataChangeEvent_v4" -ENV FAILED_METADATA_CHANGE_EVENT_NAME="FailedMetadataChangeEvent_v4" -ENV DATAHUB_USAGE_EVENT_NAME="DataHubUsageEvent_v1" -ENV METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME="MetadataChangeLog_Versioned_v1" -ENV METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME="MetadataChangeLog_Timeseries_v1" -ENV METADATA_CHANGE_PROPOSAL_TOPIC_NAME="MetadataChangeProposal_v1" -ENV FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME="FailedMetadataChangeProposal_v1" -ENV PLATFORM_EVENT_TOPIC_NAME="PlatformEvent_v1" -ENV DATAHUB_UPGRADE_HISTORY_TOPIC_NAME="DataHubUpgradeHistory_v1" -ENV USE_CONFLUENT_SCHEMA_REGISTRY="TRUE" - -# Set PATH -ENV PATH=/sbin:${KAFKA_WORKDIR}/bin:$PATH - -# Set working directory -WORKDIR ${KAFKA_WORKDIR} - -CMD ["./kafka-setup.sh"] \ No newline at end of file diff --git a/docker/kafka-setup/README.md b/docker/kafka-setup/README.md deleted file mode 100644 index 685c51aba5..0000000000 --- a/docker/kafka-setup/README.md +++ /dev/null @@ -1,17 +0,0 @@ -# Kafka, Zookeeper and Schema Registry - -DataHub uses Kafka as the pub-sub message queue in the backend. -[Official Confluent Kafka Docker images](https://hub.docker.com/u/confluentinc) found in Docker Hub is used without -any modification. - -## Debugging Kafka - -You can install [kafkacat](https://github.com/edenhill/kafkacat) to consume and produce messaged to Kafka topics. -For example, to consume messages on MetadataAuditEvent topic, you can run below command. - -``` -kafkacat -b localhost:9092 -t MetadataAuditEvent -``` - -However, `kafkacat` currently doesn't support Avro deserialization at this point, -but they have an ongoing [work](https://github.com/edenhill/kafkacat/pull/151) for that. 
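The WaitForKafkaReadyStep covered by the tests above replaces the shell-based readiness probe (kafka-ready.sh, removed further down): instead of looping over kafka-broker-api-versions.sh, the upgrade step asks the broker for cluster metadata through the Kafka AdminClient. A minimal sketch of such a probe, assuming only the public AdminClient API; the class and method names wrapping it are illustrative, not the actual step implementation:

```java
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.common.Node;

public class KafkaReadinessProbe {

  /**
   * Returns true when the bootstrap server answers a describeCluster call and reports at least
   * one live broker, mirroring what the tests above expect (SUCCEEDED with brokers, FAILED with
   * an empty node list or an exception).
   */
  public static boolean isKafkaReady(String bootstrapServers, long timeoutSeconds) {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    try (AdminClient adminClient = AdminClient.create(props)) {
      DescribeClusterResult cluster = adminClient.describeCluster();
      String clusterId = cluster.clusterId().get(timeoutSeconds, TimeUnit.SECONDS);
      Collection<Node> nodes = cluster.nodes().get(timeoutSeconds, TimeUnit.SECONDS);
      return clusterId != null && nodes != null && !nodes.isEmpty();
    } catch (Exception e) {
      // Connection failures and timeouts are treated as "not ready yet"; the surrounding
      // upgrade step retries (retryCount() == 60 in the test above, like the old shell loop).
      return false;
    }
  }
}
```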
diff --git a/docker/kafka-setup/build.gradle b/docker/kafka-setup/build.gradle deleted file mode 100644 index 341ddb34a7..0000000000 --- a/docker/kafka-setup/build.gradle +++ /dev/null @@ -1,47 +0,0 @@ -plugins { - id 'java' // required for versioning -} - -apply from: "../../gradle/versioning/versioning.gradle" -apply from: "../../gradle/docker/docker.gradle" - -ext { - docker_repo = 'datahub-kafka-setup' - docker_dir = 'kafka-setup' -} - -docker { - dependsOn(build) - name "${docker_registry}/${docker_repo}:${versionTag}" - dockerfile file("${rootProject.projectDir}/docker/${docker_dir}/Dockerfile") - files fileTree(rootProject.projectDir) { - include '.dockerignore' - include "docker/${docker_dir}/*" - }.exclude { - i -> (!i.file.name.endsWith(".dockerignore") && i.file.isHidden()) - } - additionalTag("Debug", "${docker_registry}/${docker_repo}:debug") - - // Add build args if they are defined (needed for some CI or enterprise environments) - def dockerBuildArgs = [:] - if (project.hasProperty('alpineApkRepositoryUrl')) { - dockerBuildArgs.ALPINE_REPO_URL = project.getProperty('alpineApkRepositoryUrl') - } - if (project.hasProperty('githubMirrorUrl')) { - dockerBuildArgs.GITHUB_REPO_URL = project.getProperty('githubMirrorUrl') - } - if (project.hasProperty('mavenCentralRepositoryUrl')) { - dockerBuildArgs.MAVEN_CENTRAL_REPO_URL = project.getProperty('mavenCentralRepositoryUrl') - } - if (project.hasProperty('apacheDownloadUrl')) { - dockerBuildArgs.APACHE_DOWNLOAD_URL = project.getProperty('apacheDownloadUrl') - } - - if (project.hasProperty('confluentRepositoryUrl')) { - dockerBuildArgs.CONFLUENT_REPO_URL = project.getProperty('confluentRepositoryUrl') - } - - if (dockerBuildArgs.size() > 0) { - buildArgs(dockerBuildArgs) - } -} diff --git a/docker/kafka-setup/env/docker.env b/docker/kafka-setup/env/docker.env deleted file mode 100644 index c180816129..0000000000 --- a/docker/kafka-setup/env/docker.env +++ /dev/null @@ -1,33 +0,0 @@ -KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -KAFKA_BOOTSTRAP_SERVER=broker:29092 -USE_CONFLUENT_SCHEMA_REGISTRY=TRUE - -# Configure the topics that are created by kafka-setup -# Make sure these names are consistent across the whole deployment -# METADATA_CHANGE_PROPOSAL_TOPIC_NAME=MetadataChangeProposal_v1 -# FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME=FailedMetadataChangeProposal_v1 -# METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME=MetadataChangeLog_Versioned_v1 -# METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME=MetadataChangeLog_Timeseries_v1 -# PLATFORM_EVENT_TOPIC_NAME=PlatformEvent_v1 -# DATAHUB_USAGE_EVENT_NAME=DataHubUsageEvent_v1 -# Deprecated! 
-# METADATA_AUDIT_EVENT_NAME=MetadataAuditEvent_v4 -# METADATA_CHANGE_EVENT_NAME=MetadataChangeEvent_v4 -# FAILED_METADATA_CHANGE_EVENT_NAME=FailedMetadataChangeEvent_v4 -# PLATFORM_EVENT_TOPIC_NAME=PlatformEvent_v1 -# DATAHUB_USAGE_EVENT_NAME=DataHubUsageEvent_v1 -# DATAHUB_UPGRADE_HISTORY_TOPIC_NAME=DataHubUpgradeHistory_v1 -# PARTITIONS=1 -# REPLICATION_FACTOR=1 - -# Configure for an SSL-Connection to Kafka -# KAFKA_PROPERTIES_SECURITY_PROTOCOL=SSL -# KAFKA_PROPERTIES_SSL_KEYSTORE_LOCATION= -# KAFKA_PROPERTIES_SSL_KEYSTORE_PASSWORD= -# KAFKA_PROPERTIES_SSL_KEY_PASSWORD= -# KAFKA_PROPERTIES_SSL_TRUSTSTORE_LOCATION= -# KAFKA_PROPERTIES_SSL_TRUSTSTORE_PASSWORD= -# KAFKA_PROPERTIES_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM= - -# Uncomment to disable persistence of client-side analytics events -# DATAHUB_ANALYTICS_ENABLED=false diff --git a/docker/kafka-setup/env_to_properties.py b/docker/kafka-setup/env_to_properties.py deleted file mode 100644 index 8d8b8c3cc7..0000000000 --- a/docker/kafka-setup/env_to_properties.py +++ /dev/null @@ -1,24 +0,0 @@ -import os -import re -import sys - - -def env_to_properties(env_prefix: str, properties_file: str): - pattern = re.compile('(?<=[^_])_(?=[^_])') - props = {} - - for (env_name, val) in os.environ.items(): - if env_name.startswith(env_prefix): - raw_name = env_name[len(env_prefix):].lower() - prop_dot = '.'.join(pattern.split(raw_name)) - props[prop_dot] = val - - with open(properties_file, 'a') as f: - for k, v in props.items(): - f.writelines(f'{k}={v}\n') - - -if __name__ == '__main__': - env_prefix = sys.argv[1] - properties_file = sys.argv[2] - env_to_properties(env_prefix, properties_file) diff --git a/docker/kafka-setup/kafka-config.sh b/docker/kafka-setup/kafka-config.sh deleted file mode 100644 index cc714fa53b..0000000000 --- a/docker/kafka-setup/kafka-config.sh +++ /dev/null @@ -1,17 +0,0 @@ -#!/usr/bin/env bash - -: ${PARTITIONS:=1} -: ${REPLICATION_FACTOR:=1} -: ${MAX_MESSAGE_BYTES:=5242880} - -: ${KAFKA_PROPERTIES_SECURITY_PROTOCOL:=PLAINTEXT} - -: ${DATAHUB_ANALYTICS_ENABLED:=true} - -: ${KAFKA_HEAP_OPTS:=-Xmx64M} -export KAFKA_HEAP_OPTS - -CONNECTION_PROPERTIES_PATH=/tmp/connection.properties - -WORKERS=4 -DELIMITER=";" diff --git a/docker/kafka-setup/kafka-ready.sh b/docker/kafka-setup/kafka-ready.sh deleted file mode 100755 index ba87bde047..0000000000 --- a/docker/kafka-setup/kafka-ready.sh +++ /dev/null @@ -1,14 +0,0 @@ -#!/bin/bash - -for i in {1..60} -do - kafka-broker-api-versions.sh --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER - if [ $? -eq 0 ]; then - break - fi - if [ $i -eq 60 ]; then - echo "Kafka bootstrap server $KAFKA_BOOTSTRAP_SERVER not ready." - exit 1 - fi - sleep 5s -done diff --git a/docker/kafka-setup/kafka-setup.sh b/docker/kafka-setup/kafka-setup.sh deleted file mode 100755 index 392cca9466..0000000000 --- a/docker/kafka-setup/kafka-setup.sh +++ /dev/null @@ -1,133 +0,0 @@ -#!/bin/bash - -## Exit early if PRECREATION is not needed -if [[ $DATAHUB_PRECREATE_TOPICS == "false" ]]; then - echo "DATAHUB_PRECREATE_TOPICS=${DATAHUB_PRECREATE_TOPICS}" - echo "Pre-creation of topics has been turned off, exiting" - exit 0 -fi - -. kafka-config.sh - -echo "bootstrap.servers=$KAFKA_BOOTSTRAP_SERVER" > $CONNECTION_PROPERTIES_PATH - -python env_to_properties.py KAFKA_PROPERTIES_ $CONNECTION_PROPERTIES_PATH - -# cub kafka-ready -c $CONNECTION_PROPERTIES_PATH -b $KAFKA_BOOTSTRAP_SERVER 1 180 -. 
kafka-ready.sh - -############################################################ -# Start Topic Creation Logic -############################################################ -# make the files -START=$(mktemp -t start-XXXX) -FIFO=$(mktemp -t fifo-XXXX) -FIFO_LOCK=$(mktemp -t lock-XXXX) -START_LOCK=$(mktemp -t lock-XXXX) - -## mktemp makes a regular file. Delete that an make a fifo. -rm $FIFO -mkfifo $FIFO -echo $FIFO - -## create a trap to cleanup on exit if we fail in the middle. -cleanup() { - rm $FIFO - rm $START - rm $FIFO_LOCK - rm $START_LOCK -} -trap cleanup 0 - -# Start worker script -. kafka-topic-workers.sh $START $FIFO $FIFO_LOCK $START_LOCK - -## Open the fifo for writing. -exec 3>$FIFO -## Open the start lock for reading -exec 4<$START_LOCK - -## Wait for the workers to start -while true; do - flock 4 - started=$(wc -l $START | cut -d \ -f 1) - flock -u 4 - if [[ $started -eq $WORKERS ]]; then - break - else - echo waiting, started $started of $WORKERS - fi -done -exec 4<&- - -## utility function to send the jobs to the workers -send() { - work_id=$1 - topic_args=$2 - topic_config=$3 - - echo -e "sending $work_id\n worker_args: ${topic_args}${DELIMITER}${topic_config}" - echo "$work_id" "${topic_args}${DELIMITER}${topic_config}" 1>&3 ## the fifo is fd 3 -} - -## Produce the jobs to run. -send "$METADATA_AUDIT_EVENT_NAME" "--partitions $PARTITIONS --topic $METADATA_AUDIT_EVENT_NAME" \ - "--entity-type topics --entity-name $METADATA_AUDIT_EVENT_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES" - -send "$METADATA_CHANGE_EVENT_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_EVENT_NAME" \ - "--entity-type topics --entity-name $METADATA_CHANGE_EVENT_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES" -send "$FAILED_METADATA_CHANGE_EVENT_NAME" "--partitions $PARTITIONS --topic $FAILED_METADATA_CHANGE_EVENT_NAME" \ - "--entity-type topics --entity-name $FAILED_METADATA_CHANGE_EVENT_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES" - -send "$METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME" \ - "--entity-type topics --entity-name $METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES" - -# Set retention to 90 days -send "$METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME" "--partitions $PARTITIONS --config retention.ms=7776000000 --topic $METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME" \ - "--entity-type topics --entity-name $METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES" - -send "$METADATA_CHANGE_PROPOSAL_TOPIC_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_PROPOSAL_TOPIC_NAME" \ - "--entity-type topics --entity-name $METADATA_CHANGE_PROPOSAL_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES" -send "$FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME" "--partitions $PARTITIONS --topic $FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME" \ - "--entity-type topics --entity-name $FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES" - -send "$PLATFORM_EVENT_TOPIC_NAME" "--partitions $PARTITIONS --topic $PLATFORM_EVENT_TOPIC_NAME" \ - "--entity-type topics --entity-name $PLATFORM_EVENT_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES" - -# Infinite retention upgrade topic - # Make sure the retention.ms config for $DATAHUB_UPGRADE_HISTORY_TOPIC_NAME is configured to infinite - # Please see the bug report below for 
details - # https://github.com/datahub-project/datahub/issues/7882 -send "$DATAHUB_UPGRADE_HISTORY_TOPIC_NAME" "--partitions 1 --config retention.ms=-1 --topic $DATAHUB_UPGRADE_HISTORY_TOPIC_NAME" \ - "--entity-type topics --entity-name "$DATAHUB_UPGRADE_HISTORY_TOPIC_NAME" --alter --add-config retention.ms=-1" - -# Create topic for datahub usage event -if [[ $DATAHUB_ANALYTICS_ENABLED == true ]]; then - send "$DATAHUB_USAGE_EVENT_NAME" "--partitions $PARTITIONS --topic $DATAHUB_USAGE_EVENT_NAME" -fi - -## close the filo -exec 3<&- -## disable the cleanup trap -trap '' 0 -## It is safe to delete the files because the workers -## already opened them. Thus, only the names are going away -## the actual files will stay there until the workers -## all finish. -cleanup -## now wait for all the workers. -wait - -echo "Topic Creation Complete." - -############################################################ -# End Topic Creation Logic -############################################################ - -## If using confluent schema registry as a standalone component, then configure compact cleanup policy. -if [[ $USE_CONFLUENT_SCHEMA_REGISTRY == "TRUE" ]]; then - kafka-configs.sh --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER \ - --entity-type topics \ - --entity-name _schemas \ - --alter --add-config cleanup.policy=compact -fi diff --git a/docker/kafka-setup/kafka-topic-workers.sh b/docker/kafka-setup/kafka-topic-workers.sh deleted file mode 100644 index 3ddf41abba..0000000000 --- a/docker/kafka-setup/kafka-topic-workers.sh +++ /dev/null @@ -1,74 +0,0 @@ -#!/usr/bin/env bash - -. kafka-config.sh - -START=$1 -FIFO=$2 -FIFO_LOCK=$3 -START_LOCK=$4 - -## this is the "job" function which is does whatever work -## the queue workers are supposed to be doing -job() { - i=$1 - worker_args=$2 - topic_args=$(echo $worker_args | cut -d "$DELIMITER" -f 1) - topic_config=$(echo $worker_args | cut -d "$DELIMITER" -f 2) - - echo " $i: kafka-topics.sh --create --if-not-exist $topic_args" - kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER \ - --replication-factor $REPLICATION_FACTOR \ - $topic_args - if [[ ! -z "$topic_config" ]]; then - echo " $i: kafka-configs.sh $topic_config" - kafka-configs.sh --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER $topic_config - fi -} - -## This is the worker to read from the queue. -work() { - ID=$1 - ## first open the fifo and locks for reading. - exec 3<$FIFO - exec 4<$FIFO_LOCK - exec 5<$START_LOCK - - ## signal the worker has started. - flock 5 # obtain the start lock - echo $ID >> $START # put my worker ID in the start file - flock -u 5 # release the start lock - exec 5<&- # close the start lock file - echo worker $ID started - - while true; do - ## try to read the queue - flock 4 # obtain the fifo lock - read -su 3 work_id work_item # read into work_id and work_item - read_status=$? # save the exit status of read - flock -u 4 # release the fifo lock - - ## check the line read. - if [[ $read_status -eq 0 ]]; then - ## If read gives an exit code of 0 the read succeeded. - # got a work item. do the work - echo $ID got work_id=$work_id topic_args=$work_item - ## Run the job in a subshell. That way any exit calls do not kill - ## the worker process. - ( job "$work_id" "$work_item" ) - else - ## Any other exit code indicates an EOF. 
- break - fi - done - # clean up the fd(s) - exec 3<&- - exec 4<&- - echo $ID "done working" -} - -## Start the workers. -for ((i=1;i<=$WORKERS;i++)); do - echo will start $i - work $i & -done - diff --git a/docker/profiles/docker-compose.frontend.yml b/docker/profiles/docker-compose.frontend.yml index cf79070846..4b5a5bd35b 100644 --- a/docker/profiles/docker-compose.frontend.yml +++ b/docker/profiles/docker-compose.frontend.yml @@ -89,8 +89,6 @@ services: condition: service_completed_successfully opensearch-setup-dev: condition: service_completed_successfully - kafka-setup-dev: - condition: service_completed_successfully frontend-debug-postgres: <<: *datahub-frontend-service-dev profiles: diff --git a/docker/profiles/docker-compose.gms.yml b/docker/profiles/docker-compose.gms.yml index 026f79bb67..f5b931c80c 100644 --- a/docker/profiles/docker-compose.gms.yml +++ b/docker/profiles/docker-compose.gms.yml @@ -76,6 +76,9 @@ x-datahub-system-update-service: &datahub-system-update-service SPRING_KAFKA_PROPERTIES_USE_LATEST_VERSION: ${SPRING_KAFKA_PROPERTIES_USE_LATEST_VERSION:-true} ENTITY_VERSIONING_ENABLED: ${ENTITY_VERSIONING_ENABLED:-true} ELASTICSEARCH_INDEX_BUILDER_REFRESH_INTERVAL_SECONDS: ${ELASTICSEARCH_INDEX_BUILDER_REFRESH_INTERVAL_SECONDS:-3} + DATAHUB_PRECREATE_TOPICS: ${DATAHUB_PRECREATE_TOPICS:-false} + KAFKA_BOOTSTRAP_SERVER: broker:29092 + USE_CONFLUENT_SCHEMA_REGISTRY: false volumes: - ${HOME}/.datahub/plugins:/etc/datahub/plugins labels: @@ -92,6 +95,7 @@ x-datahub-system-update-service-dev: &datahub-system-update-service-dev REPROCESS_DEFAULT_BROWSE_PATHS_V2: ${REPROCESS_DEFAULT_BROWSE_PATHS_V2:-false} JAVA_TOOL_OPTIONS: '-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5003' ENTITY_VERSIONING_ENABLED: ${ENTITY_VERSIONING_ENABLED:-true} + DATAHUB_PRECREATE_TOPICS: ${DATAHUB_PRECREATE_TOPICS:-true} volumes: - ../../datahub-upgrade/build/libs/:/datahub/datahub-upgrade/bin/ - ../../metadata-models/src/main/resources/:/datahub/datahub-gms/resources @@ -233,8 +237,6 @@ services: condition: service_completed_successfully opensearch-setup: condition: service_completed_successfully - kafka-setup: - condition: service_completed_successfully opensearch: condition: service_healthy mysql: @@ -252,8 +254,6 @@ services: condition: service_completed_successfully opensearch-setup: condition: service_completed_successfully - kafka-setup: - condition: service_completed_successfully system-update-quickstart-postgres: <<: *datahub-system-update-service profiles: @@ -265,8 +265,6 @@ services: condition: service_completed_successfully opensearch-setup: condition: service_completed_successfully - kafka-setup: - condition: service_completed_successfully system-update-debug: &system-update-debug <<: *datahub-system-update-service-dev profiles: @@ -279,8 +277,6 @@ services: condition: service_completed_successfully opensearch-setup-dev: condition: service_completed_successfully - kafka-setup-dev: - condition: service_completed_successfully system-update-debug-aws: &system-update-debug <<: *datahub-system-update-service-dev profiles: @@ -291,8 +287,6 @@ services: condition: service_completed_successfully opensearch-setup-dev: condition: service_completed_successfully - kafka-setup-dev: - condition: service_completed_successfully system-update-debug-consumers: # for consistent naming based on profile <<: *system-update-debug profiles: @@ -306,8 +300,6 @@ services: condition: service_completed_successfully elasticsearch-setup-dev: condition: service_completed_successfully - 
kafka-setup-dev: - condition: service_completed_successfully system-update-debug-postgres: <<: *datahub-system-update-service-dev profiles: @@ -319,8 +311,6 @@ services: condition: service_completed_successfully opensearch-setup-dev: condition: service_completed_successfully - kafka-setup-dev: - condition: service_completed_successfully system-update-debug-cassandra: <<: *datahub-system-update-service-dev profiles: @@ -332,8 +322,6 @@ services: condition: service_completed_successfully opensearch-setup-dev: condition: service_completed_successfully - kafka-setup-dev: - condition: service_completed_successfully system-update-debug-neo4j: <<: *datahub-system-update-service-dev profiles: @@ -345,8 +333,6 @@ services: condition: service_healthy opensearch-setup-dev: condition: service_completed_successfully - kafka-setup-dev: - condition: service_completed_successfully ################################# # GMS ################################# diff --git a/docker/profiles/docker-compose.prerequisites.yml b/docker/profiles/docker-compose.prerequisites.yml index efbf48a247..8364554e0d 100644 --- a/docker/profiles/docker-compose.prerequisites.yml +++ b/docker/profiles/docker-compose.prerequisites.yml @@ -279,27 +279,6 @@ services: timeout: 5s volumes: - broker:/var/lib/kafka/data/ - kafka-setup: &kafka-setup - profiles: *profiles-quickstart - hostname: kafka-setup - image: ${DATAHUB_KAFKA_SETUP_IMAGE:-${DATAHUB_REPO:-acryldata}/datahub-kafka-setup}:${DATAHUB_VERSION:-head} - env_file: kafka-setup/env/docker.env - environment: &kafka-setup-env - DATAHUB_PRECREATE_TOPICS: ${DATAHUB_PRECREATE_TOPICS:-false} - KAFKA_BOOTSTRAP_SERVER: broker:29092 - USE_CONFLUENT_SCHEMA_REGISTRY: false - depends_on: - kafka-broker: - condition: service_healthy - labels: - datahub_setup_job: true - kafka-setup-dev: - <<: *kafka-setup - profiles: *profiles-dev - environment: - <<: *kafka-setup-env - DATAHUB_PRECREATE_TOPICS: ${DATAHUB_PRECREATE_TOPICS:-true} - image: ${DATAHUB_KAFKA_SETUP_IMAGE:-${DATAHUB_REPO:-acryldata}/datahub-kafka-setup}:${DATAHUB_VERSION:-debug} elasticsearch: profiles: *elasticsearch-profiles hostname: search diff --git a/docker/quickstart/docker-compose.quickstart-profile.yml b/docker/quickstart/docker-compose.quickstart-profile.yml index 459e65cf6c..12b4f35424 100644 --- a/docker/quickstart/docker-compose.quickstart-profile.yml +++ b/docker/quickstart/docker-compose.quickstart-profile.yml @@ -202,31 +202,6 @@ services: source: broker target: /var/lib/kafka/data volume: {} - kafka-setup: - profiles: - - quickstart - - quickstart-backend - - quickstart-actions - - quickstart-frontend - - quickstart-storage - - quickstart-cassandra - - quickstart-postgres - - quickstart-consumers - depends_on: - kafka-broker: - condition: service_healthy - required: true - environment: - DATAHUB_PRECREATE_TOPICS: 'false' - KAFKA_BOOTSTRAP_SERVER: broker:29092 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - USE_CONFLUENT_SCHEMA_REGISTRY: 'false' - hostname: kafka-setup - image: acryldata/datahub-kafka-setup:${DATAHUB_VERSION} - labels: - datahub_setup_job: 'true' - networks: - default: null mysql: profiles: - quickstart @@ -389,9 +364,6 @@ services: - -u - SystemUpdate depends_on: - kafka-setup: - condition: service_completed_successfully - required: true mysql: condition: service_healthy required: true @@ -408,6 +380,7 @@ services: BACKFILL_BROWSE_PATHS_V2: 'true' DATAHUB_GMS_HOST: datahub-gms DATAHUB_GMS_PORT: '8080' + DATAHUB_PRECREATE_TOPICS: 'false' EBEAN_DATASOURCE_DRIVER: com.mysql.jdbc.Driver 
EBEAN_DATASOURCE_HOST: mysql:3306 EBEAN_DATASOURCE_PASSWORD: datahub @@ -435,6 +408,7 @@ services: SCHEMA_REGISTRY_TYPE: INTERNAL SPRING_KAFKA_PROPERTIES_AUTO_REGISTER_SCHEMAS: 'true' SPRING_KAFKA_PROPERTIES_USE_LATEST_VERSION: 'true' + USE_CONFLUENT_SCHEMA_REGISTRY: 'false' hostname: datahub-system-update image: acryldata/datahub-upgrade:${DATAHUB_VERSION} labels: diff --git a/docs-website/sidebars.js b/docs-website/sidebars.js index a6d9f4f32b..be5232de10 100644 --- a/docs-website/sidebars.js +++ b/docs-website/sidebars.js @@ -1148,7 +1148,6 @@ module.exports = { // - "docker/datahub-ingestion/README", // - "docker/elasticsearch-setup/README", // - "docker/ingestion/README", - // - "docker/kafka-setup/README", // - "docker/mariadb/README", // - "docker/mysql/README", // - "docker/neo4j/README", diff --git a/docs/architecture/docker-containers.md b/docs/architecture/docker-containers.md index 73bcbc0acf..7ae2bb5ca3 100644 --- a/docs/architecture/docker-containers.md +++ b/docs/architecture/docker-containers.md @@ -16,12 +16,10 @@ the proper ordering of the components whereas docker-compose relies on a series datahub-gms (healthy) | datahub-upgrade (SystemUpdate completed) - /--------------------/ | \ \------------------------------------------------\ - / | \-------------------\ \ -mysql-setup (completed) elasticsearch-setup (completed) kafka-setup (completed) (if apply) neo4j (healthy) - | | / \ - | | / \ -mysql (healthy) elasticsearch (healthy) broker (healthy) (if not internal) schema-registry (healthy) - | - zookeeper (healthy) + /--------------------/ | \------------------------------------------------\ + / | \ +mysql-setup (completed) elasticsearch-setup (completed) (if apply) neo4j (healthy) + | | + | | +mysql (healthy) elasticsearch (healthy) ``` diff --git a/docs/deploy/confluent-cloud.md b/docs/deploy/confluent-cloud.md index 55f5bd18d5..0146379339 100644 --- a/docs/deploy/confluent-cloud.md +++ b/docs/deploy/confluent-cloud.md @@ -229,11 +229,3 @@ credentialsAndCertsSecrets: ``` The Actions pod will automatically pick these up in the correctly named environment variables when they are named this exact way. - -## Contribution - -Accepting contributions for a setup script compatible with Confluent Cloud! - -The kafka-setup-job container we ship with is only compatible with a distribution of Kafka wherein ZooKeeper -is exposed and available. A version of the job using the [Confluent CLI](https://docs.confluent.io/confluent-cli/current/command-reference/kafka/topic/confluent_kafka_topic_create.html) -would be very useful for the broader community. 
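With the kafka-setup image and its compose services gone, topic pre-creation moves into the datahub-upgrade (system-update) process and is driven by the `kafka.topics` / `kafka.topicDefaults` configuration introduced further down. A rough sketch of what config-driven topic creation looks like against the Kafka Admin API; `TopicSpec` and `createTopics` here are illustrative names under those assumptions, not the actual upgrade-step implementation:

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;

public class TopicCreationSketch {

  /** One entry per configured topic, e.g. MetadataChangeLog_Timeseries_v1 with retention.ms=7776000000. */
  record TopicSpec(String name, int partitions, short replicationFactor, Map<String, String> configs) {}

  static void createTopics(String bootstrapServers, List<TopicSpec> specs) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    try (AdminClient admin = AdminClient.create(props)) {
      for (TopicSpec spec : specs) {
        // Covers both the `kafka-topics.sh --create --if-not-exists ...` call and the follow-up
        // `kafka-configs.sh --alter --add-config ...` call from the removed shell scripts.
        NewTopic topic =
            new NewTopic(spec.name(), spec.partitions(), spec.replicationFactor())
                .configs(spec.configs());
        try {
          admin.createTopics(List.of(topic)).all().get();
        } catch (ExecutionException e) {
          if (!(e.getCause() instanceof TopicExistsException)) {
            throw e; // an already-existing topic is fine; anything else is a real failure
          }
        }
      }
    }
  }
}
```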
diff --git a/docs/deploy/kubernetes.md b/docs/deploy/kubernetes.md index 220ae6d35f..e525b1e8f7 100644 --- a/docs/deploy/kubernetes.md +++ b/docs/deploy/kubernetes.md @@ -121,7 +121,6 @@ datahub-datahub-gms-58b676f77c-c6pfx 1/1 Running 0 datahub-datahub-mae-consumer-7b98bf65d-tjbwx 1/1 Running 0 4m3s datahub-datahub-mce-consumer-8c57d8587-vjv9m 1/1 Running 0 4m2s datahub-elasticsearch-setup-job-8dz6b 0/1 Completed 0 4m50s -datahub-kafka-setup-job-6blcj 0/1 Completed 0 4m40s datahub-mysql-setup-job-b57kc 0/1 Completed 0 4m7s elasticsearch-master-0 1/1 Running 0 97m elasticsearch-master-1 1/1 Running 0 97m diff --git a/docs/docker/development.md b/docs/docker/development.md index 793066a86f..5571e18ea9 100644 --- a/docs/docker/development.md +++ b/docs/docker/development.md @@ -31,7 +31,6 @@ Once the `debug` docker images are constructed you'll see images similar to the ```shell acryldata/datahub-frontend-react debug e52fef698025 28 minutes ago 763MB -acryldata/datahub-kafka-setup debug 3375aaa2b12d 55 minutes ago 659MB acryldata/datahub-gms debug ea2b0a8ea115 56 minutes ago 408MB acryldata/datahub-upgrade debug 322377a7a21d 56 minutes ago 463MB acryldata/datahub-mysql-setup debug 17768edcc3e5 2 hours ago 58.2MB diff --git a/docs/how/kafka-config.md b/docs/how/kafka-config.md index 3ad7c9f47a..9b8b6d7751 100644 --- a/docs/how/kafka-config.md +++ b/docs/how/kafka-config.md @@ -5,7 +5,9 @@ hide_title: true # Configuring Kafka in DataHub -DataHub requires Kafka to operate. Kafka is used as a durable log that can be used to store inbound +DataHub uses Kafka as the pub-sub message queue in the backend. +[Official Confluent Kafka Docker images](https://hub.docker.com/u/confluentinc) found in Docker Hub are used without +any modification. Kafka is used as a durable log that can be used to store inbound requests to update the Metadata Graph (Metadata Change Proposal), or as a change log detailing the updates that have been made to the Metadata Graph (Metadata Change Log). @@ -15,9 +17,9 @@ The following environment variables can be used to customize DataHub's connectio each of which requires a connection to Kafka: - `metadata-service` (datahub-gms container) +- `system-update` (datahub-system-update container if setting up topics via datahub) - (Advanced - if standalone consumers are deployed) `mce-consumer-job` (datahub-mce-consumer container) - (Advanced - if standalone consumers are deployed) `mae-consumer-job` (datahub-mae-consumer container) -- (Advanced - if product analytics are enabled) datahub-frontend ### Connection Configuration @@ -70,9 +72,10 @@ How Metadata Events relate to these topics is discussed at more length in [Metad We've included environment variables to customize the name each of these topics, for cases where an organization has naming rules for your topics. -### Metadata Service (datahub-gms) +### Metadata Service (datahub-gms) and System Update (datahub-system-update) -The following are environment variables you can use to configure topic names used in the Metadata Service container: +The following are environment variables you can use to configure topic names used in the Metadata Service container and +the System Update container for topic setup: - `METADATA_CHANGE_PROPOSAL_TOPIC_NAME`: The name of the topic for Metadata Change Proposals emitted by the ingestion framework. - `FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME`: The name of the topic for Metadata Change Proposals emitted when MCPs fail processing.
@@ -99,11 +102,6 @@ The following are environment variables you can use to configure topic names use - `DATAHUB_USAGE_EVENT_NAME`: The name of the topic for product analytics events. - (Deprecated) `METADATA_AUDIT_EVENT_NAME`: The name of the deprecated metadata audit event topic. -### DataHub Frontend (datahub-frontend-react) - -- `DATAHUB_TRACKING_TOPIC`: The name of the topic used for storing DataHub usage events. - It should contain the same value as `DATAHUB_USAGE_EVENT_NAME` in the Metadata Service container. - Please ensure that these environment variables are set consistently throughout your ecosystem. DataHub has a few different applications running which communicate with Kafka (see above). ## Configuring Consumer Group Id @@ -162,6 +160,19 @@ datahub-gms: - name: KAFKA_CONSUMER_GROUP_ID value: "my-apps-mae-consumer" .... +datahub-system-update: + ... + extraEnvs: + - name: METADATA_CHANGE_PROPOSAL_TOPIC_NAME + value: "CustomMetadataChangeProposal_v1" + - name: METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME + value: "CustomMetadataChangeLogVersioned_v1" + - name: FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME + value: "CustomFailedMetadataChangeProposal_v1" + - name: KAFKA_CONSUMER_GROUP_ID + value: "my-apps-mae-consumer" + .... + datahub-frontend: ... @@ -189,10 +200,9 @@ datahub-mce-consumer; ## Other Components that use Kafka can be configured using environment variables: -- kafka-setup - schema-registry -## SASL/GSSAPI properties for kafka-setup and datahub-frontend via environment variables +## SASL/GSSAPI properties for system-update and datahub-frontend via environment variables ```bash KAFKA_BOOTSTRAP_SERVER=broker:29092 @@ -249,3 +259,32 @@ Client. > messages indicate that the service was passed a configuration that is not relevant to it and can be safely ignored. > Other errors: `Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [DataHubUsageEvent_v1]`. Please check ranger permissions or kafka broker logs. + +### Additional Kafka Topic level configuration + +For additional [Kafka topic level config properties](https://kafka.apache.org/documentation/#topicconfigs), either add them to application.yaml under `kafka.topics.<topicName>.configProperties` or `kafka.topicDefaults.configProperties` or define env vars in the following form (standard Spring conventions applied to application.yaml). +These env vars are required in the datahub-system-update container. + +Examples: + +1. To configure `max.message.bytes` on the topic used for `metadataChangeLogVersioned`, set + `KAFKA_TOPICS_metadataChangeLogVersioned_CONFIGPROPERTIES_max_message_bytes=10000` +2. To configure `max.message.bytes` for all topics that don't explicitly define one, set the `topicDefaults` via + `KAFKA_TOPICDEFAULTS_CONFIGPROPERTIES_max_message_bytes=10000` + +Configurations specified in `topicDefaults` are applied to all topics by merging them with any configs defined per topic, with the per-topic config taking precedence over those specified in `topicDefaults`. + +If you intend to create and configure the topics yourself and not have datahub create them, the kafka setup process of +datahub-system-update can be turned off by setting the env var `DATAHUB_PRECREATE_TOPICS` to `false`. + +## Debugging Kafka + +You can install [kafkacat](https://github.com/edenhill/kafkacat) to consume and produce messages to Kafka topics.
+For example, to consume messages on the MetadataAuditEvent topic, you can run the command below. + +``` +kafkacat -b localhost:9092 -t MetadataAuditEvent +``` + +However, `kafkacat` does not currently support Avro deserialization, +but there is ongoing [work](https://github.com/edenhill/kafkacat/pull/151) to add it. diff --git a/docs/quickstart.md b/docs/quickstart.md index de0a52f841..1465a40dc5 100644 --- a/docs/quickstart.md +++ b/docs/quickstart.md @@ -90,7 +90,6 @@ Starting up DataHub... ✔ Container datahub-kafka-broker-1 Healthy 6.0s ✔ Container datahub-opensearch-setup-1 Exited 11.6s ✔ Container datahub-mysql-setup-1 Exited 11.6s - ✔ Container datahub-kafka-setup-1 Exited 11.6s ✔ Container datahub-system-update-quickstart-1 Exited 26.6s ✔ Container datahub-datahub-gms-quickstart-1 Healthy 42.1s ✔ Container datahub-frontend-quickstart-1 Started 26.6s diff --git a/metadata-io/src/test/java/com/linkedin/metadata/system_info/collectors/PropertiesCollectorConfigurationTest.java b/metadata-io/src/test/java/com/linkedin/metadata/system_info/collectors/PropertiesCollectorConfigurationTest.java index 8ae7eef91d..dc736f610c 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/system_info/collectors/PropertiesCollectorConfigurationTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/system_info/collectors/PropertiesCollectorConfigurationTest.java @@ -117,6 +117,20 @@ public class PropertiesCollectorConfigurationTest extends AbstractTestNGSpringCo "systemUpdate.*.enabled", "systemUpdate.*.batchSize", + // Kafka topic Configs + "kafka.topics.*.name", + "kafka.topics.*.partitions", + "kafka.topics.*.enabled", + "kafka.topics.*.replicationFactor", + "kafka.topics.*.configProperties.max.message.bytes", + "kafka.topics.*.configProperties.retention.ms", + "kafka.topicDefaults.configProperties.max.message.bytes", + "kafka.topicDefaults.configProperties.retention.ms", + "kafka.topicDefaults.partitions", + "kafka.topicDefaults.replicationFactor", + "kafka.setup.preCreateTopics", + "kafka.setup.useConfluentSchemaRegistry", + // IAM authentication flags "*.postgresUseIamAuth", "*.opensearchUseAwsIamAuth"); diff --git a/metadata-jobs/mce-consumer-job/README.md b/metadata-jobs/mce-consumer-job/README.md index 3f789be4fd..611c5641b2 100644 --- a/metadata-jobs/mce-consumer-job/README.md +++ b/metadata-jobs/mce-consumer-job/README.md @@ -43,7 +43,7 @@ However, if you only want to build `Metadata Change Event Consumer Job` specific ## Dependencies -Before starting `Metadata Change Event Consumer Job`, you need to make sure that [Kafka, Schema Registry & Zookeeper](../../docker/kafka-setup) +Before starting `Metadata Change Event Consumer Job`, you need to make sure that [Kafka, Schema Registry & Zookeeper](../../docs/how/kafka-config.md) and [DataHub GMS](../../docker/datahub-gms) Docker containers are up and running.
## Start via Docker image diff --git a/metadata-service/README.md b/metadata-service/README.md index 42a1b64f92..b30ddc3be5 100644 --- a/metadata-service/README.md +++ b/metadata-service/README.md @@ -29,7 +29,7 @@ However, if you only want to build `DataHub Metadata Service` specifically: ## Dependencies -Before starting `DataHub Metadata Service`, you need to make sure that [Kafka, Schema Registry & Zookeeper](../docker/kafka-setup), +Before starting `DataHub Metadata Service`, you need to make sure that [Kafka, Schema Registry & Zookeeper](../docs/how/kafka-config.md), [Elasticsearch](../docker/elasticsearch) and [MySQL](../docker/mysql) Docker containers are up and running. ## Start via Docker image diff --git a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/KafkaConfiguration.java b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/KafkaConfiguration.java index 7a740bd5e5..d9b7d34f5a 100644 --- a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/KafkaConfiguration.java +++ b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/KafkaConfiguration.java @@ -2,14 +2,17 @@ package com.linkedin.metadata.config.kafka; import java.util.HashMap; import java.util.Map; +import javax.annotation.PostConstruct; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import lombok.NonNull; +import org.springframework.context.annotation.Configuration; import org.springframework.lang.Nullable; @Data +@Configuration public class KafkaConfiguration { // Avoiding dependencies on other libraries (Spring/Kafka) for configuration public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer"; @@ -28,6 +31,16 @@ public class KafkaConfiguration { private String bootstrapServers; + private SetupConfiguration setup; + + private TopicsConfiguration.TopicConfiguration topicDefaults; + + private Map<String, TopicsConfiguration.TopicConfiguration> topics; + + // Retained for backward compatibility, not loaded from conf, but computed by merging + // topics with topicDefaults + private TopicsConfiguration topicsConfiguration; + private ListenerConfiguration listener; private SchemaRegistryConfiguration schemaRegistry; @@ -37,7 +50,15 @@ public class KafkaConfiguration { private ConsumerConfiguration consumer; private SerDeConfig serde; - private TopicsConfiguration topics; + + @PostConstruct + public void initializeTopicDefaults() { + topicsConfiguration = new TopicsConfiguration(topicDefaults, topics); + } + + public TopicsConfiguration getTopics() { + return new TopicsConfiguration(topicDefaults, topics); + } @Data public static class SerDeConfig { diff --git a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/SetupConfiguration.java b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/SetupConfiguration.java new file mode 100644 index 0000000000..5fa50152a7 --- /dev/null +++ b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/SetupConfiguration.java @@ -0,0 +1,9 @@ +package com.linkedin.metadata.config.kafka; + +import lombok.Data; + +@Data +public class SetupConfiguration { + private boolean preCreateTopics = true; + private boolean useConfluentSchemaRegistry = true; +} diff --git a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/TopicsConfiguration.java b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/TopicsConfiguration.java
index 3da4898da3..43d97c9c76 100644 --- a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/TopicsConfiguration.java +++ b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/TopicsConfiguration.java @@ -1,8 +1,76 @@ package com.linkedin.metadata.config.kafka; -import lombok.Data; +import java.util.HashMap; +import java.util.Map; +import lombok.Getter; +import lombok.Setter; -@Data +@Getter +@Setter public class TopicsConfiguration { - private String dataHubUsage; + // For backward compatibility + private String dataHubUsage = "DataHubUsage_v1"; + + private Map<String, TopicConfiguration> topics = new HashMap<>(); + + private TopicConfiguration topicDefaults; + + // For backward compatibility + public String getDataHubUsage() { + if (topics != null && topics.containsKey("datahubUsageEvent")) { + TopicConfiguration topicDef = topics.get("datahubUsageEvent"); + if (topicDef != null && topicDef.getName() != null) { + return topicDef.getName(); + } + } + return dataHubUsage; + } + + public TopicsConfiguration( + TopicConfiguration topicDefaults, Map<String, TopicConfiguration> topics) { + this.topics = topics; + this.topicDefaults = topicDefaults; + + // Initialize null values in topics map with defaults from topicDefaults + if (topics != null && topicDefaults != null) { + for (TopicConfiguration topicConfig : topics.values()) { + if (topicConfig != null) { + // Initialize partitions if null + if (topicConfig.getPartitions() == null) { + topicConfig.setPartitions(topicDefaults.getPartitions()); + } + + // Initialize replicationFactor if null + if (topicConfig.getReplicationFactor() == null) { + topicConfig.setReplicationFactor(topicDefaults.getReplicationFactor()); + } + + // Initialize enabled if null + if (topicConfig.getEnabled() == null) { + topicConfig.setEnabled(topicDefaults.getEnabled()); + } + + // Initialize configProperties if null + if (topicConfig.getConfigProperties() == null) { + topicConfig.setConfigProperties(topicDefaults.getConfigProperties()); + } else if (topicDefaults.getConfigProperties() != null) { + // Merge configProperties, keeping existing values as overrides + Map<String, String> mergedConfig = new HashMap<>(topicDefaults.getConfigProperties()); + mergedConfig.putAll(topicConfig.getConfigProperties()); + topicConfig.setConfigProperties(mergedConfig); + } + } + } + } + } + + @Getter + @Setter + public static class TopicConfiguration { + private String name; + private Integer partitions; + private Integer replicationFactor = 1; + private Map<String, String> configProperties; + private Boolean enabled = true; // Default to true if not specified + } } diff --git a/metadata-service/configuration/src/main/resources/application.yaml b/metadata-service/configuration/src/main/resources/application.yaml index 8bc2616b76..86131e0a7d 100644 --- a/metadata-service/configuration/src/main/resources/application.yaml +++ b/metadata-service/configuration/src/main/resources/application.yaml @@ -344,8 +344,69 @@ elasticsearch: # TODO: Kafka topic convention kafka: + # Kafka Setup Configuration - replaces kafka-setup container functionality + setup: + preCreateTopics: ${DATAHUB_PRECREATE_TOPICS:true} + useConfluentSchemaRegistry: ${USE_CONFLUENT_SCHEMA_REGISTRY:false} + + # readinessCheck: + # timeoutSeconds: 300 # HARDCODED: 60 attempts * 5 seconds = 300s total + # intervalSeconds: 5 # HARDCODED: kafka-ready.sh sleep 5s + # maxAttempts: 60 # HARDCODED: kafka-ready.sh for i in {1..60} + + # # EXISTING KAFKA_PROPERTIES_* environment variable mapping + # properties: + # securityProtocol:
${KAFKA_PROPERTIES_SECURITY_PROTOCOL:PLAINTEXT} # EXISTING + # # All KAFKA_PROPERTIES_* environment variables will be automatically mapped + + # Topic defaults applied when individual topic configuration is missing + topicDefaults: + partitions: ${PARTITIONS:1} + replicationFactor: ${REPLICATION_FACTOR:1} + configProperties: + max.message.bytes: ${MAX_MESSAGE_BYTES:5242880} + topics: - dataHubUsage: ${DATAHUB_USAGE_EVENT_NAME:DataHubUsageEvent_v1} + # Topic Dictionary Configuration - merged directly into topics section + # Each topic can be created by iterating through this dictionary + # The key name matches the programmatic identifier used in code + metadataChangeProposal: + name: ${METADATA_CHANGE_PROPOSAL_TOPIC_NAME:MetadataChangeProposal_v1} + + failedMetadataChangeProposal: + name: ${FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME:FailedMetadataChangeProposal_v1} + + metadataChangeLogVersioned: + name: ${METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME:MetadataChangeLog_Versioned_v1} + + metadataChangeLogTimeseries: + name: ${METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME:MetadataChangeLog_Timeseries_v1} + configProperties: + retention.ms: 7776000000 + + platformEvent: + name: ${PLATFORM_EVENT_TOPIC_NAME:PlatformEvent_v1} + + datahubUpgradeHistory: + name: ${DATAHUB_UPGRADE_HISTORY_TOPIC_NAME:DataHubUpgradeHistory_v1} + partitions: 1 + configProperties: + retention.ms: -1 + + datahubUsageEvent: + name: ${DATAHUB_USAGE_EVENT_NAME:DataHubUsageEvent_v1} + enabled: ${DATAHUB_ANALYTICS_ENABLED:true} + + # Deprecated Topics (maintained for backward compatibility) + metadataAuditEvent: + name: ${METADATA_AUDIT_EVENT_NAME:MetadataAuditEvent_v4} + + metadataChangeEvent: + name: ${METADATA_CHANGE_EVENT_NAME:MetadataChangeEvent_v4} + + failedMetadataChangeEvent: + name: ${FAILED_METADATA_CHANGE_EVENT_NAME:FailedMetadataChangeEvent_v4} + listener: concurrency: ${KAFKA_LISTENER_CONCURRENCY:1} bootstrapServers: ${KAFKA_BOOTSTRAP_SERVER:http://localhost:9092} @@ -406,6 +467,7 @@ kafka: security: protocol: ${KAFKA_PROPERTIES_SECURITY_PROTOCOL:PLAINTEXT} + # Only required if GraphService type is neo4j neo4j: username: ${NEO4J_USERNAME:neo4j} diff --git a/metadata-service/configuration/src/test/java/com/linkedin/metadata/config/kafka/KafkaConfigurationTest.java b/metadata-service/configuration/src/test/java/com/linkedin/metadata/config/kafka/KafkaConfigurationTest.java new file mode 100644 index 0000000000..040f3bc439 --- /dev/null +++ b/metadata-service/configuration/src/test/java/com/linkedin/metadata/config/kafka/KafkaConfigurationTest.java @@ -0,0 +1,193 @@ +package com.linkedin.metadata.config.kafka; + +import static org.testng.Assert.*; + +import com.linkedin.metadata.config.DataHubAppConfiguration; +import com.linkedin.metadata.config.DataHubTestApplication; +import java.util.Map; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; +import org.testng.annotations.Test; + +@SpringBootTest(classes = DataHubTestApplication.class) +public class KafkaConfigurationTest extends AbstractTestNGSpringContextTests { + + @Autowired private DataHubTestApplication testApplication; + + @Test + public void testKafkaConfigurationLoaded() { + assertNotNull(testApplication, "Test application should be loaded"); + + DataHubAppConfiguration appConfig = testApplication.getDataHubAppConfig(); + assertNotNull(appConfig, "DataHubAppConfiguration should be loaded"); + + 
KafkaConfiguration kafkaConfig = appConfig.getKafka(); + assertNotNull(kafkaConfig, "KafkaConfiguration should be loaded"); + } + + @Test + public void testConfigurationHierarchy() { + DataHubAppConfiguration appConfig = testApplication.getDataHubAppConfig(); + KafkaConfiguration kafkaConfig = appConfig.getKafka(); + + // Test that all configuration objects are properly nested + assertNotNull(kafkaConfig.getSetup(), "Setup configuration should be accessible"); + assertNotNull( + kafkaConfig.getTopicDefaults(), "TopicDefaults configuration should be accessible"); + + // Test that the configuration objects are of the correct types + assertTrue( + kafkaConfig.getSetup() instanceof SetupConfiguration, + "Setup should be SetupConfiguration type"); + assertTrue( + kafkaConfig.getTopicDefaults() instanceof TopicsConfiguration.TopicConfiguration, + "TopicDefaults should be TopicConfiguration type"); + + // Topics configuration should not be null + assertNotNull(kafkaConfig.getTopics(), "Topics configuration should not be null"); + assertTrue( + kafkaConfig.getTopics() instanceof TopicsConfiguration, + "Topics should be TopicsConfiguration type"); + } + + @Test + public void testTopicConfigurationDefaultsInitialization() { + DataHubAppConfiguration appConfig = testApplication.getDataHubAppConfig(); + KafkaConfiguration kafkaConfig = appConfig.getKafka(); + + TopicsConfiguration topicsConfig = kafkaConfig.getTopics(); + assertNotNull(topicsConfig, "TopicsConfiguration should not be null"); + TopicsConfiguration.TopicConfiguration topicDefaults = topicsConfig.getTopicDefaults(); + + // Test that topics with null values get initialized with defaults + Map<String, TopicsConfiguration.TopicConfiguration> topics = topicsConfig.getTopics(); + assertNotNull(topics, "Topics map should not be null"); + + // Test metadataChangeProposal topic (should have defaults applied) + TopicsConfiguration.TopicConfiguration metadataChangeProposal = + topics.get("metadataChangeProposal"); + assertNotNull(metadataChangeProposal, "metadataChangeProposal should not be null"); + // This topic only has a name, so partitions should be initialized from defaults + assertEquals( + metadataChangeProposal.getPartitions(), + topicDefaults.getPartitions(), + "metadataChangeProposal partitions should be initialized from defaults"); + assertEquals( + metadataChangeProposal.getReplicationFactor(), + topicDefaults.getReplicationFactor(), + "metadataChangeProposal replicationFactor should be initialized from defaults"); + + // Test configProperties merging for metadataChangeProposal (should inherit all defaults) + Map<String, String> metadataChangeProposalConfig = metadataChangeProposal.getConfigProperties(); + assertNotNull( + metadataChangeProposalConfig, "metadataChangeProposal configProperties should not be null"); + Map<String, String> defaultConfig = topicDefaults.getConfigProperties(); + assertNotNull(defaultConfig, "topicDefaults configProperties should not be null"); + + // Verify that metadataChangeProposal inherits the default max.message.bytes + assertEquals( + metadataChangeProposalConfig.get("max.message.bytes"), + defaultConfig.get("max.message.bytes"), + "metadataChangeProposal should inherit max.message.bytes from defaults"); + assertEquals( + metadataChangeProposalConfig.get("max.message.bytes"), + "5242880", + "metadataChangeProposal should have default max.message.bytes value"); + + // Test datahubUpgradeHistory topic (has explicit partitions=1, should retain it) + TopicsConfiguration.TopicConfiguration datahubUpgradeHistory = + topics.get("datahubUpgradeHistory"); + 
assertNotNull(datahubUpgradeHistory, "datahubUpgradeHistory should not be null"); + assertEquals( + datahubUpgradeHistory.getPartitions(), + Integer.valueOf(1), + "datahubUpgradeHistory should retain its explicit partitions value"); + // replicationFactor should be initialized from defaults since it's not explicitly set + assertEquals( + datahubUpgradeHistory.getReplicationFactor(), + topicDefaults.getReplicationFactor(), + "datahubUpgradeHistory replicationFactor should be initialized from defaults"); + + // Test configProperties merging for datahubUpgradeHistory (should merge defaults with specific + // retention) + Map<String, String> datahubUpgradeHistoryConfig = datahubUpgradeHistory.getConfigProperties(); + assertNotNull( + datahubUpgradeHistoryConfig, "datahubUpgradeHistory configProperties should not be null"); + + // Verify that datahubUpgradeHistory inherits the default max.message.bytes + assertEquals( + datahubUpgradeHistoryConfig.get("max.message.bytes"), + defaultConfig.get("max.message.bytes"), + "datahubUpgradeHistory should inherit max.message.bytes from defaults"); + assertEquals( + datahubUpgradeHistoryConfig.get("max.message.bytes"), + "5242880", + "datahubUpgradeHistory should have default max.message.bytes value"); + + // Verify that datahubUpgradeHistory has its specific retention.ms value + assertEquals( + datahubUpgradeHistoryConfig.get("retention.ms"), + "-1", + "datahubUpgradeHistory should have its specific retention.ms value"); + + // Verify that the configProperties map contains both default and specific properties + assertTrue( + datahubUpgradeHistoryConfig.containsKey("max.message.bytes"), + "datahubUpgradeHistory should contain default max.message.bytes"); + + // Test metadataChangeLogTimeseries topic (should merge defaults with specific retention) + TopicsConfiguration.TopicConfiguration metadataChangeLogTimeseries = + topics.get("metadataChangeLogTimeseries"); + assertNotNull(metadataChangeLogTimeseries, "metadataChangeLogTimeseries should not be null"); + + // Test configProperties merging for metadataChangeLogTimeseries + Map<String, String> metadataChangeLogTimeseriesConfig = + metadataChangeLogTimeseries.getConfigProperties(); + assertNotNull( + metadataChangeLogTimeseriesConfig, + "metadataChangeLogTimeseries configProperties should not be null"); + + // Verify that metadataChangeLogTimeseries inherits the default max.message.bytes + assertEquals( + metadataChangeLogTimeseriesConfig.get("max.message.bytes"), + defaultConfig.get("max.message.bytes"), + "metadataChangeLogTimeseries should inherit max.message.bytes from defaults"); + + // Verify that metadataChangeLogTimeseries has its specific retention.ms value + assertEquals( + metadataChangeLogTimeseriesConfig.get("retention.ms"), + "7776000000", + "metadataChangeLogTimeseries should have its specific retention.ms value"); + + // Verify that the configProperties map contains both default and specific properties + assertTrue( + metadataChangeLogTimeseriesConfig.containsKey("max.message.bytes"), + "metadataChangeLogTimeseries should contain default max.message.bytes"); + assertTrue( + metadataChangeLogTimeseriesConfig.containsKey("retention.ms"), + "metadataChangeLogTimeseries should contain specific retention.ms"); + assertTrue( + metadataChangeLogTimeseriesConfig.size() >= 2, + "metadataChangeLogTimeseries configProperties should contain at least default and specific properties"); + } + + @Test + public void testGetDataHubUsage() { + DataHubAppConfiguration appConfig = testApplication.getDataHubAppConfig(); + KafkaConfiguration 
kafkaConfig = appConfig.getKafka(); + + TopicsConfiguration topicsConfig = kafkaConfig.getTopics(); + assertNotNull(topicsConfig, "TopicsConfiguration should not be null"); + + // Test getDataHubUsage method + String dataHubUsage = topicsConfig.getDataHubUsage(); + assertNotNull(dataHubUsage, "getDataHubUsage should not return null"); + + // Since datahubUsageEvent topic is configured in the YAML, it should return the topic name + assertEquals( + dataHubUsage, + "DataHubUsageEvent_v1", + "getDataHubUsage should return the datahubUsageEvent topic name from configuration"); + } +} diff --git a/metadata-service/factories/src/test/java/com/linkedin/gms/factory/system_telemetry/OpenTelemetryBaseFactoryTest.java b/metadata-service/factories/src/test/java/com/linkedin/gms/factory/system_telemetry/OpenTelemetryBaseFactoryTest.java index 66121d36ac..3d89b2a08a 100644 --- a/metadata-service/factories/src/test/java/com/linkedin/gms/factory/system_telemetry/OpenTelemetryBaseFactoryTest.java +++ b/metadata-service/factories/src/test/java/com/linkedin/gms/factory/system_telemetry/OpenTelemetryBaseFactoryTest.java @@ -27,6 +27,10 @@ public class OpenTelemetryBaseFactoryTest { // Mock TopicsConfiguration since it's not provided in the classes public static class TopicsConfiguration extends com.linkedin.metadata.config.kafka.TopicsConfiguration { + TopicsConfiguration() { + super(null, null); + } + private String dataHubUsage; public String getDataHubUsage() { diff --git a/metadata-service/factories/src/test/java/com/linkedin/gms/factory/telemetry/TrackingServiceFactoryTest.java b/metadata-service/factories/src/test/java/com/linkedin/gms/factory/telemetry/TrackingServiceFactoryTest.java index 1e9f956944..e067c09abf 100644 --- a/metadata-service/factories/src/test/java/com/linkedin/gms/factory/telemetry/TrackingServiceFactoryTest.java +++ b/metadata-service/factories/src/test/java/com/linkedin/gms/factory/telemetry/TrackingServiceFactoryTest.java @@ -153,9 +153,9 @@ public class TrackingServiceFactoryTest extends AbstractTestNGSpringContextTests com.linkedin.metadata.config.kafka.KafkaConfiguration config = new com.linkedin.metadata.config.kafka.KafkaConfiguration(); com.linkedin.metadata.config.kafka.TopicsConfiguration topicsConfig = - new com.linkedin.metadata.config.kafka.TopicsConfiguration(); + new com.linkedin.metadata.config.kafka.TopicsConfiguration(null, null); topicsConfig.setDataHubUsage("DataHubUsageEvent_v1"); - config.setTopics(topicsConfig); + config.setTopicsConfiguration(topicsConfig); return config; } @@ -223,9 +223,9 @@ public class TrackingServiceFactoryTest extends AbstractTestNGSpringContextTests com.linkedin.metadata.config.kafka.KafkaConfiguration config = new com.linkedin.metadata.config.kafka.KafkaConfiguration(); com.linkedin.metadata.config.kafka.TopicsConfiguration topicsConfig = - new com.linkedin.metadata.config.kafka.TopicsConfiguration(); + new com.linkedin.metadata.config.kafka.TopicsConfiguration(null, null); topicsConfig.setDataHubUsage("DataHubUsageEvent_v1"); - config.setTopics(topicsConfig); + config.setTopicsConfiguration(topicsConfig); return config; } diff --git a/settings.gradle b/settings.gradle index 61189a3f67..30f794d441 100644 --- a/settings.gradle +++ b/settings.gradle @@ -34,7 +34,6 @@ include ':docker' include ':docker:elasticsearch-setup' include ':docker:mysql-setup' include ':docker:postgres-setup' -include ':docker:kafka-setup' include ':docker:datahub-ingestion' include ':docker:datahub-ingestion-base' include 'metadata-models'
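For reference, the defaults-merging behavior implemented by the TopicsConfiguration constructor (and exercised by KafkaConfigurationTest above) can be seen in a small standalone sketch; the class names come from this change, while the main method and literal values (which mirror the application.yaml defaults) are illustrative:

```java
import com.linkedin.metadata.config.kafka.TopicsConfiguration;
import com.linkedin.metadata.config.kafka.TopicsConfiguration.TopicConfiguration;
import java.util.HashMap;
import java.util.Map;

public class TopicDefaultsMergeExample {
  public static void main(String[] args) {
    // Defaults as in application.yaml: 1 partition, replication factor 1, max.message.bytes=5242880
    TopicConfiguration defaults = new TopicConfiguration();
    defaults.setPartitions(1);
    defaults.setReplicationFactor(1);
    defaults.setConfigProperties(Map.of("max.message.bytes", "5242880"));

    // A topic that only overrides retention, like metadataChangeLogTimeseries
    TopicConfiguration timeseries = new TopicConfiguration();
    timeseries.setName("MetadataChangeLog_Timeseries_v1");
    timeseries.setConfigProperties(new HashMap<>(Map.of("retention.ms", "7776000000")));

    Map<String, TopicConfiguration> topics = new HashMap<>();
    topics.put("metadataChangeLogTimeseries", timeseries);

    // The constructor fills missing per-topic values from the defaults and merges
    // configProperties, with per-topic keys taking precedence over the defaults.
    new TopicsConfiguration(defaults, topics);

    System.out.println(timeseries.getPartitions());       // 1, copied from the defaults
    System.out.println(timeseries.getConfigProperties()); // retention.ms plus max.message.bytes
  }
}
```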