feat(kafka): refactor kafka topic mgmt and move setup to system-update (#14564)

Author: Chakru
Date: 2025-08-27 17:03:56 +05:30 (committed by GitHub)
Parent: 0252818bd0
Commit: cbb5f9a9b6
51 changed files with 1752 additions and 621 deletions

View File

@@ -50,3 +50,4 @@ jobs:
event_name: ${{ github.event.workflow_run.event }}
files: "artifacts/**/*.xml"
comment_mode: off
large_files: true

View File

@@ -40,7 +40,7 @@ buildscript {
ext.versionGradle = '8.14.3'
ext.springVersion = '6.2.10'
ext.springBootVersion = '3.4.5'
ext.springKafkaVersion = '3.3.6'
ext.springKafkaVersion = '3.3.8'
ext.openTelemetryVersion = '1.49.0'
ext.neo4jVersion = '5.20.0'
ext.neo4jApocVersion = '5.20.0'

View File

@@ -17,7 +17,7 @@ import org.springframework.core.annotation.Order;
@Configuration
@Conditional(SystemUpdateCondition.BlockingSystemUpdateCondition.class)
public class BuildIndicesConfig {
@Order(1)
@Order(2)
@Bean(name = "buildIndices")
public BlockingSystemUpgrade buildIndices(
final SystemMetadataService systemMetadataService,

View File

@@ -0,0 +1,26 @@
package com.linkedin.datahub.upgrade.config;
import com.linkedin.datahub.upgrade.system.BlockingSystemUpgrade;
import com.linkedin.datahub.upgrade.system.kafka.KafkaSetup;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import io.datahubproject.metadata.context.OperationContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
@Configuration
@Conditional(SystemUpdateCondition.BlockingSystemUpdateCondition.class)
public class KafkaSetupConfig {
@Autowired private OperationContext opContext;
@Order(1) // This ensures it runs before BuildIndices (@Order(2))
@Bean(name = "kafkaSetup")
public BlockingSystemUpgrade kafkaSetup(
final ConfigurationProvider configurationProvider, KafkaProperties properties) {
return new KafkaSetup(opContext, configurationProvider.getKafka(), properties);
}
}

View File

@@ -6,6 +6,7 @@ import com.linkedin.datahub.upgrade.system.browsepaths.BackfillIcebergBrowsePath
import com.linkedin.datahub.upgrade.system.dataprocessinstances.BackfillDataProcessInstances;
import com.linkedin.datahub.upgrade.system.entities.RemoveQueryEdges;
import com.linkedin.datahub.upgrade.system.ingestion.BackfillIngestionSourceInfoIndices;
import com.linkedin.datahub.upgrade.system.kafka.KafkaNonBlockingSetup;
import com.linkedin.datahub.upgrade.system.policyfields.BackfillPolicyFields;
import com.linkedin.datahub.upgrade.system.schemafield.GenerateSchemaFieldsFromSchemaMetadata;
import com.linkedin.datahub.upgrade.system.schemafield.MigrateSchemaFieldDocIds;
@@ -19,8 +20,10 @@ import com.linkedin.metadata.search.elasticsearch.ElasticSearchService;
import com.linkedin.metadata.search.elasticsearch.update.ESWriteDAO;
import io.datahubproject.metadata.context.OperationContext;
import org.opensearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
@@ -154,4 +157,12 @@ public class NonBlockingConfigs {
return new MigrateSchemaFieldDocIds(
opContext, components, entityService, enabled && hashEnabled, batchSize, delayMs, limit);
}
@Autowired private OperationContext opContext;
@Bean
public NonBlockingSystemUpgrade kafkaSetupNonBlocking(
final ConfigurationProvider configurationProvider, KafkaProperties properties) {
return new KafkaNonBlockingSetup(opContext, configurationProvider.getKafka(), properties);
}
}

View File

@@ -0,0 +1,108 @@
package com.linkedin.datahub.upgrade.system.kafka;
import static com.linkedin.gms.factory.kafka.common.AdminClientFactory.buildKafkaAdminClient;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.metadata.config.kafka.KafkaConfiguration;
import com.linkedin.upgrade.DataHubUpgradeState;
import io.datahubproject.metadata.context.OperationContext;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
@Slf4j
public class ConfluentSchemaRegistryCleanupPolicyStep implements UpgradeStep {
private final OperationContext _opContext;
private final KafkaConfiguration _kafkaConfiguration;
private final KafkaProperties _kafkaProperties;
public ConfluentSchemaRegistryCleanupPolicyStep(
OperationContext opContext,
KafkaConfiguration kafkaConfiguration,
KafkaProperties kafkaProperties) {
this._opContext = opContext;
this._kafkaConfiguration = kafkaConfiguration;
this._kafkaProperties = kafkaProperties;
}
@Override
public String id() {
return "ConfluentSchemaRegistryCleanupPolicyStep";
}
@Override
public Function<UpgradeContext, UpgradeStepResult> executable() {
return (context) -> {
log.info("Configuring Confluent Schema Registry cleanup policies...");
// Check if Confluent Schema Registry is enabled
if (_kafkaConfiguration.getSetup() == null
|| !_kafkaConfiguration.getSetup().isUseConfluentSchemaRegistry()) {
log.info(
"Skipping Confluent Schema Registry cleanup policy configuration - schema registry is disabled");
return new DefaultUpgradeStepResult(this.id(), DataHubUpgradeState.SUCCEEDED);
}
try {
// Create AdminClient using AdminClientFactory
AdminClient adminClient = createAdminClient();
// Configure cleanup policy for _schemas topic (equivalent to kafka-configs.sh)
String schemasTopic = "_schemas";
log.info("Configuring cleanup policy for schema registry topic: {}", schemasTopic);
// Create config resource for the _schemas topic
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, schemasTopic);
// Create the cleanup policy configuration
ConfigEntry cleanupPolicyEntry = new ConfigEntry("cleanup.policy", "compact");
// Create alter config operation
AlterConfigOp alterConfigOp =
new AlterConfigOp(cleanupPolicyEntry, AlterConfigOp.OpType.SET);
Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>();
configs.put(configResource, Collections.singletonList(alterConfigOp));
// Apply the configuration change
log.info(
"Applying cleanup policy configuration: topic={}, cleanup.policy=compact",
schemasTopic);
adminClient.incrementalAlterConfigs(configs).all().get();
log.info(
"Successfully configured cleanup policy for schema registry topic: {}", schemasTopic);
return new DefaultUpgradeStepResult(this.id(), DataHubUpgradeState.SUCCEEDED);
} catch (Exception e) {
log.error(
"Failed to configure Confluent Schema Registry cleanup policies: {}",
e.getMessage(),
e);
return new DefaultUpgradeStepResult(this.id(), DataHubUpgradeState.FAILED);
}
};
}
/**
* Creates an AdminClient instance using the AdminClientFactory. This method is extracted to allow
* for mocking in unit tests.
*
* @return AdminClient instance configured with the current Kafka configuration
*/
protected AdminClient createAdminClient() {
return buildKafkaAdminClient(
_kafkaConfiguration, _kafkaProperties, "datahub-upgrade-kafka-setup");
}
}
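
For operators verifying the migration away from the kafka-setup container, here is a minimal standalone sketch (not part of this change; the bootstrap address is an assumption) that reads back the setting applied by this step, using the same Kafka AdminClient API:

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

// Standalone check, not part of this PR: reads back the cleanup.policy that
// ConfluentSchemaRegistryCleanupPolicyStep sets on the _schemas topic.
public class VerifySchemasCleanupPolicy {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // Assumed bootstrap address; adjust for your environment.
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (AdminClient adminClient = AdminClient.create(props)) {
      ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "_schemas");
      Map<ConfigResource, Config> configs =
          adminClient.describeConfigs(Collections.singletonList(resource)).all().get();
      // Expected to print "compact" once the upgrade step has run against the cluster.
      System.out.println(configs.get(resource).get("cleanup.policy").value());
    }
  }
}
```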

View File

@@ -0,0 +1,194 @@
package com.linkedin.datahub.upgrade.system.kafka;
import static com.linkedin.gms.factory.kafka.common.AdminClientFactory.buildKafkaAdminClient;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.metadata.config.kafka.KafkaConfiguration;
import com.linkedin.metadata.config.kafka.TopicsConfiguration;
import com.linkedin.upgrade.DataHubUpgradeState;
import io.datahubproject.metadata.context.OperationContext;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.kafka.config.TopicBuilder;
@Slf4j
public class CreateKafkaTopicsStep implements UpgradeStep {
private final OperationContext _opContext;
private final KafkaConfiguration _kafkaConfiguration;
private final KafkaProperties _kafkaProperties;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public CreateKafkaTopicsStep(
OperationContext opContext,
KafkaConfiguration kafkaConfiguration,
KafkaProperties kafkaProperties) {
this._opContext = opContext;
this._kafkaConfiguration = kafkaConfiguration;
this._kafkaProperties = kafkaProperties;
}
@Override
public String id() {
return "CreateKafkaTopicsStep";
}
@Override
public Function<UpgradeContext, UpgradeStepResult> executable() {
return (context) -> {
if (_kafkaConfiguration.getSetup() == null) {
log.warn("Setup configuration is null - skipping topic creation");
return new DefaultUpgradeStepResult(this.id(), DataHubUpgradeState.SUCCEEDED);
}
if (!_kafkaConfiguration.getSetup().isPreCreateTopics()) {
log.info("Skipping Kafka topic creation as preCreateTopics is false");
return new DefaultUpgradeStepResult(this.id(), DataHubUpgradeState.SUCCEEDED);
}
log.info("Creating Kafka topics...");
try {
// Debug logging to understand configuration state
log.info(
"KafkaConfiguration setup: {}",
OBJECT_MAPPER
.writerWithDefaultPrettyPrinter()
.writeValueAsString(_kafkaConfiguration.getSetup()));
log.info(
"KafkaConfiguration topics: {}",
OBJECT_MAPPER
.writerWithDefaultPrettyPrinter()
.writeValueAsString(_kafkaConfiguration.getTopics()));
log.info(
"KafkaConfiguration topicDefaults: {}",
OBJECT_MAPPER
.writerWithDefaultPrettyPrinter()
.writeValueAsString(_kafkaConfiguration.getTopicDefaults()));
// Create AdminClient using AdminClientFactory
AdminClient adminClient = createAdminClient();
// Get topic configurations
TopicsConfiguration topicsConfig = _kafkaConfiguration.getTopics();
if (topicsConfig == null) {
log.warn("Topics configuration is null - skipping topic creation");
return new DefaultUpgradeStepResult(this.id(), DataHubUpgradeState.SUCCEEDED);
}
if (topicsConfig.getTopics() == null || topicsConfig.getTopics().isEmpty()) {
log.warn("No topics configured for creation");
// We don't really support a scenario of no topics to create when preCreateTopics is set to
// true
return new DefaultUpgradeStepResult(this.id(), DataHubUpgradeState.FAILED);
}
// Get existing topics to implement if-not-exists functionality
Set<String> existingTopics = getExistingTopics(adminClient);
log.info("Found {} existing topics: {}", existingTopics.size(), existingTopics);
// Collect all topics to create (only those that don't exist)
List<NewTopic> topicsToCreate = new ArrayList<>();
List<String> topicsToSkip = new ArrayList<>();
for (Map.Entry<String, TopicsConfiguration.TopicConfiguration> entry :
topicsConfig.getTopics().entrySet()) {
String topicKey = entry.getKey();
TopicsConfiguration.TopicConfiguration topicConfig = entry.getValue();
// Skip if topic should not be created
if (!topicConfig.getEnabled()) {
log.debug("Skipping topic {} - create flag is false", topicKey);
continue;
}
String topicName = topicConfig.getName();
// Check if topic already exists
if (existingTopics.contains(topicName)) {
log.info("Topic {} already exists - skipping creation", topicName);
topicsToSkip.add(topicName);
continue;
}
// Use Spring's TopicBuilder to create the topic
TopicBuilder topicBuilder =
TopicBuilder.name(topicName)
.partitions(topicConfig.getPartitions())
.replicas(topicConfig.getReplicationFactor());
// Set topic-specific configurations if provided
if (topicConfig.getConfigProperties() != null
&& !topicConfig.getConfigProperties().isEmpty()) {
topicConfig.getConfigProperties().forEach(topicBuilder::config);
}
topicsToCreate.add(topicBuilder.build());
log.info(
"Preparing to create topic: {} with {} partitions and replication factor {}",
topicName,
topicConfig.getPartitions(),
topicConfig.getReplicationFactor());
}
// Log summary of what will be created vs skipped
if (!topicsToSkip.isEmpty()) {
log.info("Skipping {} existing topics: {}", topicsToSkip.size(), topicsToSkip);
}
if (topicsToCreate.isEmpty()) {
log.info("All configured topics already exist - nothing to create");
return new DefaultUpgradeStepResult(this.id(), DataHubUpgradeState.SUCCEEDED);
}
// Create all topics in bulk
log.info("Creating {} new topics in bulk", topicsToCreate.size());
CreateTopicsResult result = adminClient.createTopics(topicsToCreate);
result.all().get(); // Wait for all topics to be created
log.info("Successfully created {} Kafka topics", topicsToCreate.size());
return new DefaultUpgradeStepResult(this.id(), DataHubUpgradeState.SUCCEEDED);
} catch (Exception e) {
log.error("Failed to create Kafka topics: {}", e.getMessage(), e);
return new DefaultUpgradeStepResult(this.id(), DataHubUpgradeState.FAILED);
}
};
}
/** Get the set of existing topic names from Kafka */
private Set<String> getExistingTopics(AdminClient adminClient) throws Exception {
try {
ListTopicsResult listTopicsResult = adminClient.listTopics();
return listTopicsResult.names().get();
} catch (Exception e) {
log.error("Failed to list existing topics: {}", e.getMessage(), e);
throw e;
}
}
/**
* Creates an AdminClient instance using the AdminClientFactory. This method is extracted to allow
* for mocking in unit tests.
*
* @return AdminClient instance configured with the current Kafka configuration
*/
protected AdminClient createAdminClient() {
return buildKafkaAdminClient(
_kafkaConfiguration, _kafkaProperties, "datahub-upgrade-kafka-setup");
}
}
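
To relate this step to the removed shell-based kafka-setup, a minimal sketch follows (assumed bootstrap address and illustrative partition/replication/config values; real values come from `TopicsConfiguration`) of what a single `TopicBuilder` chain above amounts to with the plain Kafka admin API:

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

// Illustrative only: what one TopicBuilder.name(...).partitions(...).replicas(...).config(...)
// chain in CreateKafkaTopicsStep boils down to in plain Kafka client terms.
public class CreateSingleTopicSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // Assumed bootstrap address; the topic name is one of the DataHub defaults.
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    NewTopic topic =
        new NewTopic("MetadataChangeProposal_v1", 1, (short) 1)
            .configs(Map.of("retention.ms", "86400000"));
    try (AdminClient adminClient = AdminClient.create(props)) {
      adminClient.createTopics(List.of(topic)).all().get();
    }
  }
}
```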

View File

@@ -0,0 +1,36 @@
package com.linkedin.datahub.upgrade.system.kafka;
import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.metadata.config.kafka.KafkaConfiguration;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
@Slf4j
public class KafkaNonBlockingSetup implements NonBlockingSystemUpgrade {
private final List<UpgradeStep> _steps;
public KafkaNonBlockingSetup(
OperationContext opContext,
KafkaConfiguration kafkaConfiguration,
KafkaProperties kafkaProperties) {
_steps =
ImmutableList.of(
new ConfluentSchemaRegistryCleanupPolicyStep(
opContext, kafkaConfiguration, kafkaProperties));
}
@Override
public String id() {
return "KafkaNonBlockingSetup";
}
@Override
public List<UpgradeStep> steps() {
return _steps;
}
}

View File

@@ -0,0 +1,36 @@
package com.linkedin.datahub.upgrade.system.kafka;
import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.system.BlockingSystemUpgrade;
import com.linkedin.metadata.config.kafka.KafkaConfiguration;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
@Slf4j
public class KafkaSetup implements BlockingSystemUpgrade {
private final List<UpgradeStep> _steps;
public KafkaSetup(
OperationContext opContext,
KafkaConfiguration kafkaConfiguration,
KafkaProperties kafkaProperties) {
_steps =
ImmutableList.of(
new WaitForKafkaReadyStep(opContext, kafkaConfiguration, kafkaProperties),
new CreateKafkaTopicsStep(opContext, kafkaConfiguration, kafkaProperties));
}
@Override
public String id() {
return "KafkaSetup";
}
@Override
public List<UpgradeStep> steps() {
return _steps;
}
}

View File

@@ -0,0 +1,91 @@
package com.linkedin.datahub.upgrade.system.kafka;
import static com.linkedin.gms.factory.kafka.common.AdminClientFactory.buildKafkaAdminClient;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.metadata.config.kafka.KafkaConfiguration;
import com.linkedin.upgrade.DataHubUpgradeState;
import io.datahubproject.metadata.context.OperationContext;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
@Slf4j
public class WaitForKafkaReadyStep implements UpgradeStep {
// Constants for timeout and retry configuration
private static final int RETRY_COUNT = 60;
private static final long RETRY_INTERVAL_SECONDS = 5;
private static final long REQUEST_TIMEOUT_MS = 5000;
private final OperationContext _opContext;
private final KafkaConfiguration _kafkaConfiguration;
private final KafkaProperties _kafkaProperties;
public WaitForKafkaReadyStep(
OperationContext opContext,
KafkaConfiguration kafkaConfiguration,
KafkaProperties kafkaProperties) {
this._opContext = opContext;
this._kafkaConfiguration = kafkaConfiguration;
this._kafkaProperties = kafkaProperties;
}
@Override
public String id() {
return "WaitForKafkaReadyStep";
}
@Override
public Function<UpgradeContext, UpgradeStepResult> executable() {
return (context) -> {
log.info("Waiting for Kafka to be ready...");
try {
// Create AdminClient using the extracted method
AdminClient adminClient = createAdminClient();
// Get cluster information to check all brokers are reachable
var clusterResult = adminClient.describeCluster();
var clusterId = clusterResult.clusterId().get(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
var nodes = clusterResult.nodes().get(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
// Verify we have brokers available
if (nodes.isEmpty()) {
throw new RuntimeException("No brokers found in Kafka cluster");
}
log.debug("Found {} brokers in Kafka cluster", nodes.size());
log.info("Kafka cluster is ready - {} brokers are reachable and healthy", nodes.size());
return new DefaultUpgradeStepResult(this.id(), DataHubUpgradeState.SUCCEEDED);
} catch (Exception e) {
// Retries are handled by the UpgradeManager based on retryCount()
try {
Thread.sleep(RETRY_INTERVAL_SECONDS * 1000);
} catch (InterruptedException ie) {
log.error("Interrupted while waiting for Kafka to be ready", ie);
return new DefaultUpgradeStepResult(this.id(), DataHubUpgradeState.FAILED);
}
return new DefaultUpgradeStepResult(this.id(), DataHubUpgradeState.FAILED);
}
};
}
protected AdminClient createAdminClient() {
return buildKafkaAdminClient(
_kafkaConfiguration, _kafkaProperties, "datahub-upgrade-kafka-setup");
}
@Override
public int retryCount() {
return RETRY_COUNT;
}
}
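
For context on the retry comment above, an illustrative sketch (not DataHub's actual upgrade manager) of how a driver loop could use `retryCount()`: the step sleeps five seconds and returns FAILED, and the manager re-invokes it until it succeeds or the retry budget (60 attempts, roughly five minutes of waiting) is exhausted.

```java
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.upgrade.DataHubUpgradeState;

// Illustrative only: a simplified retry driver, assuming the upgrade manager re-runs a failed
// step up to retryCount() additional times, as the comment in WaitForKafkaReadyStep describes.
final class RetryDriverSketch {
  static UpgradeStepResult runWithRetries(UpgradeStep step, UpgradeContext context) {
    UpgradeStepResult result = step.executable().apply(context);
    int attemptsLeft = step.retryCount();
    while (result.result() == DataHubUpgradeState.FAILED && attemptsLeft-- > 0) {
      // WaitForKafkaReadyStep sleeps RETRY_INTERVAL_SECONDS before returning FAILED,
      // so the driver can simply re-invoke it.
      result = step.executable().apply(context);
    }
    return result;
  }
}
```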

View File

@@ -3,4 +3,4 @@ management.health.neo4j.enabled=false
ingestion.enabled=false
spring.main.allow-bean-definition-overriding=true
entityClient.impl=restli
metadataChangeProposal.throttle.updateIntervalMs=0
metadataChangeProposal.throttle.updateIntervalMs=0

View File

@@ -8,6 +8,7 @@ import com.linkedin.datahub.upgrade.system.BlockingSystemUpgrade;
import com.linkedin.datahub.upgrade.system.SystemUpdateBlocking;
import com.linkedin.datahub.upgrade.system.bootstrapmcps.BootstrapMCPStep;
import com.linkedin.datahub.upgrade.system.elasticsearch.BuildIndices;
import com.linkedin.datahub.upgrade.system.kafka.KafkaSetup;
import java.util.List;
import java.util.stream.Collectors;
import javax.inject.Named;
@@ -32,7 +33,13 @@ public class DatahubUpgradeBlockingTest extends AbstractTestNGSpringContextTests
@Test
public void testBuildIndicesOrder() {
assertNotNull(blockingSystemUpgrades);
assertTrue(blockingSystemUpgrades.get(0) instanceof BuildIndices);
assertTrue(blockingSystemUpgrades.get(1) instanceof BuildIndices);
}
@Test
public void testKafkaSetupOrder() {
assertNotNull(blockingSystemUpgrades);
assertTrue(blockingSystemUpgrades.get(0) instanceof KafkaSetup);
}
@Test

View File

@@ -0,0 +1,164 @@
package com.linkedin.datahub.upgrade.system.kafka;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.metadata.config.kafka.KafkaConfiguration;
import com.linkedin.metadata.config.kafka.SetupConfiguration;
import com.linkedin.upgrade.DataHubUpgradeState;
import io.datahubproject.metadata.context.OperationContext;
import java.util.Arrays;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.common.KafkaFuture;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class ConfluentSchemaRegistryCleanupPolicyStepTest {
private ConfluentSchemaRegistryCleanupPolicyStep step;
private OperationContext mockOpContext;
private KafkaConfiguration kafkaConfiguration;
private KafkaProperties kafkaProperties;
@BeforeMethod
public void setUp() {
mockOpContext = mock(OperationContext.class);
// Create real KafkaConfiguration with test data
kafkaConfiguration = new KafkaConfiguration();
kafkaConfiguration.setBootstrapServers("localhost:9092");
SetupConfiguration setupConfig = new SetupConfiguration();
setupConfig.setPreCreateTopics(true);
setupConfig.setUseConfluentSchemaRegistry(true);
kafkaConfiguration.setSetup(setupConfig);
// Create real KafkaProperties with test data
kafkaProperties = new KafkaProperties();
kafkaProperties.setBootstrapServers(Arrays.asList("localhost:9092"));
step =
new ConfluentSchemaRegistryCleanupPolicyStep(
mockOpContext, kafkaConfiguration, kafkaProperties);
}
@Test
public void testId() {
assertEquals(step.id(), "ConfluentSchemaRegistryCleanupPolicyStep");
}
@Test
public void testSkipWhenSchemaRegistryDisabled() {
// Set schema registry to disabled
kafkaConfiguration.getSetup().setUseConfluentSchemaRegistry(false);
UpgradeContext mockContext = mock(UpgradeContext.class);
UpgradeStepResult result = step.executable().apply(mockContext);
assertNotNull(result);
assertEquals(result.result(), DataHubUpgradeState.SUCCEEDED);
}
@Test
public void testSkipWhenSetupConfigIsNull() {
// Set setup configuration as null
kafkaConfiguration.setSetup(null);
UpgradeContext mockContext = mock(UpgradeContext.class);
UpgradeStepResult result = step.executable().apply(mockContext);
assertNotNull(result);
assertEquals(result.result(), DataHubUpgradeState.SUCCEEDED);
}
@Test
public void testSuccessfulCleanupPolicyConfiguration() throws Exception {
// Create spy to mock the createAdminClient method
ConfluentSchemaRegistryCleanupPolicyStep spyStep = spy(step);
// Mock AdminClient and its dependencies
AdminClient mockAdminClient = mock(AdminClient.class);
AlterConfigsResult mockAlterConfigsResult = mock(AlterConfigsResult.class);
// Mock successful alter configs result
KafkaFuture<Void> alterConfigsFuture = mock(KafkaFuture.class);
when(alterConfigsFuture.get()).thenReturn(null); // Success
when(mockAlterConfigsResult.all()).thenReturn(alterConfigsFuture);
when(mockAdminClient.incrementalAlterConfigs(any())).thenReturn(mockAlterConfigsResult);
// Mock the createAdminClient method to return our mock AdminClient
doReturn(mockAdminClient).when(spyStep).createAdminClient();
UpgradeContext mockContext = mock(UpgradeContext.class);
UpgradeStepResult result = spyStep.executable().apply(mockContext);
assertNotNull(result);
assertEquals(result.result(), DataHubUpgradeState.SUCCEEDED);
// Verify that incrementalAlterConfigs was called
verify(mockAdminClient, times(1)).incrementalAlterConfigs(any());
}
@Test
public void testAlterConfigsFailure() throws Exception {
// Create spy to mock the createAdminClient method
ConfluentSchemaRegistryCleanupPolicyStep spyStep = spy(step);
// Mock AdminClient to throw exception when altering configs
AdminClient mockAdminClient = mock(AdminClient.class);
when(mockAdminClient.incrementalAlterConfigs(any()))
.thenThrow(new RuntimeException("Failed to alter configs"));
// Mock the createAdminClient method to return our mock AdminClient
doReturn(mockAdminClient).when(spyStep).createAdminClient();
UpgradeContext mockContext = mock(UpgradeContext.class);
UpgradeStepResult result = spyStep.executable().apply(mockContext);
assertNotNull(result);
assertEquals(result.result(), DataHubUpgradeState.FAILED);
}
@Test
public void testAdminClientException() throws Exception {
// Create spy to mock the createAdminClient method
ConfluentSchemaRegistryCleanupPolicyStep spyStep = spy(step);
// Mock AdminClient to throw exception when calling incrementalAlterConfigs
AdminClient mockAdminClient = mock(AdminClient.class);
AlterConfigsResult mockAlterConfigsResult = mock(AlterConfigsResult.class);
// Mock alter configs result that throws exception
KafkaFuture<Void> alterConfigsFuture = mock(KafkaFuture.class);
when(alterConfigsFuture.get()).thenThrow(new RuntimeException("AdminClient operation failed"));
when(mockAlterConfigsResult.all()).thenReturn(alterConfigsFuture);
when(mockAdminClient.incrementalAlterConfigs(any())).thenReturn(mockAlterConfigsResult);
// Mock the createAdminClient method to return our mock AdminClient
doReturn(mockAdminClient).when(spyStep).createAdminClient();
UpgradeContext mockContext = mock(UpgradeContext.class);
UpgradeStepResult result = spyStep.executable().apply(mockContext);
assertNotNull(result);
assertEquals(result.result(), DataHubUpgradeState.FAILED);
}
@Test
public void testCreateAdminClientMethod() {
// Test that the createAdminClient method can be called
AdminClient adminClient = step.createAdminClient();
assertNotNull(adminClient);
}
}

View File

@@ -0,0 +1,371 @@
package com.linkedin.datahub.upgrade.system.kafka;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.metadata.config.kafka.KafkaConfiguration;
import com.linkedin.metadata.config.kafka.SetupConfiguration;
import com.linkedin.metadata.config.kafka.TopicsConfiguration;
import com.linkedin.upgrade.DataHubUpgradeState;
import io.datahubproject.metadata.context.OperationContext;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.common.KafkaFuture;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class CreateKafkaTopicsStepTest {
private CreateKafkaTopicsStep step;
private OperationContext mockOpContext;
private KafkaConfiguration kafkaConfiguration;
private KafkaProperties kafkaProperties;
@BeforeMethod
public void setUp() {
mockOpContext = mock(OperationContext.class);
// Create real KafkaConfiguration with test data
kafkaConfiguration = new KafkaConfiguration();
kafkaConfiguration.setBootstrapServers("localhost:9092");
SetupConfiguration setupConfig = new SetupConfiguration();
setupConfig.setPreCreateTopics(true);
setupConfig.setUseConfluentSchemaRegistry(true);
kafkaConfiguration.setSetup(setupConfig);
// Create real KafkaProperties with test data
kafkaProperties = new KafkaProperties();
kafkaProperties.setBootstrapServers(Arrays.asList("localhost:9092"));
step = new CreateKafkaTopicsStep(mockOpContext, kafkaConfiguration, kafkaProperties);
}
@Test
public void testId() {
assertEquals(step.id(), "CreateKafkaTopicsStep");
}
@Test
public void testSkipTopicCreationWhenDisabled() {
// Set preCreateTopics to false
kafkaConfiguration.getSetup().setPreCreateTopics(false);
UpgradeContext mockContext = mock(UpgradeContext.class);
UpgradeStepResult result = step.executable().apply(mockContext);
assertNotNull(result);
assertEquals(result.result(), DataHubUpgradeState.SUCCEEDED);
}
@Test
public void testSkipTopicCreationWhenSetupConfigIsNull() {
// Set setup configuration as null
kafkaConfiguration.setSetup(null);
UpgradeContext mockContext = mock(UpgradeContext.class);
UpgradeStepResult result = step.executable().apply(mockContext);
assertNotNull(result);
assertEquals(result.result(), DataHubUpgradeState.SUCCEEDED);
}
@Test
public void testSkipTopicCreationWhenTopicsConfigIsNull() {
// Set topics configuration as null
kafkaConfiguration.setTopics(null);
UpgradeContext mockContext = mock(UpgradeContext.class);
UpgradeStepResult result = step.executable().apply(mockContext);
assertNotNull(result);
// When topics is null, getTopics() still returns a TopicsConfiguration with an empty topics map,
// which causes the step to fail because no topics are configured
assertEquals(result.result(), DataHubUpgradeState.FAILED);
}
@Test
public void testFailWhenNoTopicsConfigured() {
// Set empty topics map directly on KafkaConfiguration
kafkaConfiguration.setTopics(Collections.emptyMap());
UpgradeContext mockContext = mock(UpgradeContext.class);
UpgradeStepResult result = step.executable().apply(mockContext);
assertNotNull(result);
assertEquals(result.result(), DataHubUpgradeState.FAILED);
}
@Test
public void testSuccessfulTopicCreation() throws Exception {
// Create spy to mock the createAdminClient method
CreateKafkaTopicsStep spyStep = spy(step);
// Mock AdminClient and its dependencies
AdminClient mockAdminClient = mock(AdminClient.class);
ListTopicsResult mockListTopicsResult = mock(ListTopicsResult.class);
CreateTopicsResult mockCreateTopicsResult = mock(CreateTopicsResult.class);
// Mock existing topics (empty set - no existing topics)
KafkaFuture<Set<String>> listTopicsFuture = mock(KafkaFuture.class);
when(listTopicsFuture.get()).thenReturn(new HashSet<>());
when(mockListTopicsResult.names()).thenReturn(listTopicsFuture);
when(mockAdminClient.listTopics()).thenReturn(mockListTopicsResult);
// Mock topic creation result
KafkaFuture<Void> createTopicsFuture = mock(KafkaFuture.class);
when(createTopicsFuture.get()).thenReturn(null); // Success
when(mockCreateTopicsResult.all()).thenReturn(createTopicsFuture);
when(mockAdminClient.createTopics(any())).thenReturn(mockCreateTopicsResult);
// Mock the createAdminClient method to return our mock AdminClient
doReturn(mockAdminClient).when(spyStep).createAdminClient();
// Set up topics configuration with test topics
Map<String, TopicsConfiguration.TopicConfiguration> topics = new HashMap<>();
TopicsConfiguration.TopicConfiguration topicConfig =
new TopicsConfiguration.TopicConfiguration();
topicConfig.setName("test-topic");
topicConfig.setPartitions(3);
topicConfig.setReplicationFactor(1);
topicConfig.setEnabled(true);
topics.put("testTopic", topicConfig);
kafkaConfiguration.setTopics(topics);
UpgradeContext mockContext = mock(UpgradeContext.class);
UpgradeStepResult result = spyStep.executable().apply(mockContext);
assertNotNull(result);
assertEquals(result.result(), DataHubUpgradeState.SUCCEEDED);
// Verify that createTopics was called
verify(mockAdminClient, times(1)).createTopics(any());
}
@Test
public void testSkipExistingTopics() throws Exception {
// Create spy to mock the createAdminClient method
CreateKafkaTopicsStep spyStep = spy(step);
// Mock AdminClient and its dependencies
AdminClient mockAdminClient = mock(AdminClient.class);
ListTopicsResult mockListTopicsResult = mock(ListTopicsResult.class);
// Mock existing topics (topic already exists)
Set<String> existingTopics = new HashSet<>();
existingTopics.add("test-topic");
KafkaFuture<Set<String>> listTopicsFuture = mock(KafkaFuture.class);
when(listTopicsFuture.get()).thenReturn(existingTopics);
when(mockListTopicsResult.names()).thenReturn(listTopicsFuture);
when(mockAdminClient.listTopics()).thenReturn(mockListTopicsResult);
// Mock the createAdminClient method to return our mock AdminClient
doReturn(mockAdminClient).when(spyStep).createAdminClient();
// Set up topics configuration with test topics
Map<String, TopicsConfiguration.TopicConfiguration> topics = new HashMap<>();
TopicsConfiguration.TopicConfiguration topicConfig =
new TopicsConfiguration.TopicConfiguration();
topicConfig.setName("test-topic");
topicConfig.setPartitions(3);
topicConfig.setReplicationFactor(1);
topicConfig.setEnabled(true);
topics.put("testTopic", topicConfig);
kafkaConfiguration.setTopics(topics);
UpgradeContext mockContext = mock(UpgradeContext.class);
UpgradeStepResult result = spyStep.executable().apply(mockContext);
assertNotNull(result);
assertEquals(result.result(), DataHubUpgradeState.SUCCEEDED);
// Verify that createTopics was NOT called since topic already exists
verify(mockAdminClient, times(0)).createTopics(any());
}
@Test
public void testSkipTopicWhenCreateFlagIsFalse() throws Exception {
// Create spy to mock the createAdminClient method
CreateKafkaTopicsStep spyStep = spy(step);
// Mock AdminClient and its dependencies
AdminClient mockAdminClient = mock(AdminClient.class);
ListTopicsResult mockListTopicsResult = mock(ListTopicsResult.class);
// Mock existing topics (empty set)
KafkaFuture<Set<String>> listTopicsFuture = mock(KafkaFuture.class);
when(listTopicsFuture.get()).thenReturn(new HashSet<>());
when(mockListTopicsResult.names()).thenReturn(listTopicsFuture);
when(mockAdminClient.listTopics()).thenReturn(mockListTopicsResult);
// Mock the createAdminClient method to return our mock AdminClient
doReturn(mockAdminClient).when(spyStep).createAdminClient();
// Set up topics configuration with create flag set to false
Map<String, TopicsConfiguration.TopicConfiguration> topics = new HashMap<>();
TopicsConfiguration.TopicConfiguration topicConfig =
new TopicsConfiguration.TopicConfiguration();
topicConfig.setName("test-topic");
topicConfig.setPartitions(3);
topicConfig.setReplicationFactor(1);
topicConfig.setEnabled(false); // Don't create this topic
topics.put("testTopic", topicConfig);
kafkaConfiguration.setTopics(topics);
UpgradeContext mockContext = mock(UpgradeContext.class);
UpgradeStepResult result = spyStep.executable().apply(mockContext);
assertNotNull(result);
assertEquals(result.result(), DataHubUpgradeState.SUCCEEDED);
// Verify that createTopics was NOT called since create flag is false
verify(mockAdminClient, times(0)).createTopics(any());
}
@Test
public void testTopicCreationWithConfigProperties() throws Exception {
// Create spy to mock the createAdminClient method
CreateKafkaTopicsStep spyStep = spy(step);
// Mock AdminClient and its dependencies
AdminClient mockAdminClient = mock(AdminClient.class);
ListTopicsResult mockListTopicsResult = mock(ListTopicsResult.class);
CreateTopicsResult mockCreateTopicsResult = mock(CreateTopicsResult.class);
// Mock existing topics (empty set)
KafkaFuture<Set<String>> listTopicsFuture = mock(KafkaFuture.class);
when(listTopicsFuture.get()).thenReturn(new HashSet<>());
when(mockListTopicsResult.names()).thenReturn(listTopicsFuture);
when(mockAdminClient.listTopics()).thenReturn(mockListTopicsResult);
// Mock topic creation result
KafkaFuture<Void> createTopicsFuture = mock(KafkaFuture.class);
when(createTopicsFuture.get()).thenReturn(null); // Success
when(mockCreateTopicsResult.all()).thenReturn(createTopicsFuture);
when(mockAdminClient.createTopics(any())).thenReturn(mockCreateTopicsResult);
// Mock the createAdminClient method to return our mock AdminClient
doReturn(mockAdminClient).when(spyStep).createAdminClient();
// Set up topics configuration with config properties
Map<String, TopicsConfiguration.TopicConfiguration> topics = new HashMap<>();
TopicsConfiguration.TopicConfiguration topicConfig =
new TopicsConfiguration.TopicConfiguration();
topicConfig.setName("test-topic");
topicConfig.setPartitions(3);
topicConfig.setReplicationFactor(1);
topicConfig.setEnabled(true);
// Add config properties
Map<String, String> configProperties = new HashMap<>();
configProperties.put("retention.ms", "86400000");
configProperties.put("cleanup.policy", "delete");
topicConfig.setConfigProperties(configProperties);
topics.put("testTopic", topicConfig);
kafkaConfiguration.setTopics(topics);
UpgradeContext mockContext = mock(UpgradeContext.class);
UpgradeStepResult result = spyStep.executable().apply(mockContext);
assertNotNull(result);
assertEquals(result.result(), DataHubUpgradeState.SUCCEEDED);
// Verify that createTopics was called
verify(mockAdminClient, times(1)).createTopics(any());
}
@Test
public void testTopicCreationFailure() throws Exception {
// Create spy to mock the createAdminClient method
CreateKafkaTopicsStep spyStep = spy(step);
// Mock AdminClient and its dependencies
AdminClient mockAdminClient = mock(AdminClient.class);
ListTopicsResult mockListTopicsResult = mock(ListTopicsResult.class);
// Mock existing topics (empty set)
KafkaFuture<Set<String>> listTopicsFuture = mock(KafkaFuture.class);
when(listTopicsFuture.get()).thenReturn(new HashSet<>());
when(mockListTopicsResult.names()).thenReturn(listTopicsFuture);
when(mockAdminClient.listTopics()).thenReturn(mockListTopicsResult);
// Mock topic creation to throw exception
when(mockAdminClient.createTopics(any()))
.thenThrow(new RuntimeException("Topic creation failed"));
// Mock the createAdminClient method to return our mock AdminClient
doReturn(mockAdminClient).when(spyStep).createAdminClient();
// Set up topics configuration
Map<String, TopicsConfiguration.TopicConfiguration> topics = new HashMap<>();
TopicsConfiguration.TopicConfiguration topicConfig =
new TopicsConfiguration.TopicConfiguration();
topicConfig.setName("test-topic");
topicConfig.setPartitions(3);
topicConfig.setReplicationFactor(1);
topicConfig.setEnabled(true);
topics.put("testTopic", topicConfig);
kafkaConfiguration.setTopics(topics);
UpgradeContext mockContext = mock(UpgradeContext.class);
UpgradeStepResult result = spyStep.executable().apply(mockContext);
assertNotNull(result);
assertEquals(result.result(), DataHubUpgradeState.FAILED);
}
@Test
public void testListTopicsFailure() throws Exception {
// Create spy to mock the createAdminClient method
CreateKafkaTopicsStep spyStep = spy(step);
// Mock AdminClient to throw exception when listing topics
AdminClient mockAdminClient = mock(AdminClient.class);
when(mockAdminClient.listTopics()).thenThrow(new RuntimeException("Failed to list topics"));
// Mock the createAdminClient method to return our mock AdminClient
doReturn(mockAdminClient).when(spyStep).createAdminClient();
// Set up topics configuration
Map<String, TopicsConfiguration.TopicConfiguration> topics = new HashMap<>();
TopicsConfiguration.TopicConfiguration topicConfig =
new TopicsConfiguration.TopicConfiguration();
topicConfig.setName("test-topic");
topicConfig.setPartitions(3);
topicConfig.setReplicationFactor(1);
topicConfig.setEnabled(true);
topics.put("testTopic", topicConfig);
kafkaConfiguration.setTopics(topics);
UpgradeContext mockContext = mock(UpgradeContext.class);
UpgradeStepResult result = spyStep.executable().apply(mockContext);
assertNotNull(result);
assertEquals(result.result(), DataHubUpgradeState.FAILED);
}
@Test
public void testCreateAdminClientMethod() {
// Test that the createAdminClient method can be called
AdminClient adminClient = step.createAdminClient();
assertNotNull(adminClient);
}
}

View File

@@ -0,0 +1,67 @@
package com.linkedin.datahub.upgrade.system.kafka;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.metadata.config.kafka.KafkaConfiguration;
import com.linkedin.metadata.config.kafka.SetupConfiguration;
import io.datahubproject.metadata.context.OperationContext;
import java.util.Arrays;
import java.util.List;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class KafkaNonBlockingSetupTest {
private KafkaNonBlockingSetup upgrade;
@Mock private OperationContext mockOpContext;
private KafkaConfiguration kafkaConfiguration;
private KafkaProperties kafkaProperties;
@BeforeMethod
public void setUp() {
MockitoAnnotations.openMocks(this);
// Create real KafkaConfiguration with test data
kafkaConfiguration = new KafkaConfiguration();
kafkaConfiguration.setBootstrapServers("localhost:9092");
SetupConfiguration setupConfig = new SetupConfiguration();
setupConfig.setPreCreateTopics(true);
setupConfig.setUseConfluentSchemaRegistry(true);
kafkaConfiguration.setSetup(setupConfig);
// Create real KafkaProperties with test data
kafkaProperties = new KafkaProperties();
kafkaProperties.setBootstrapServers(Arrays.asList("localhost:9092"));
upgrade = new KafkaNonBlockingSetup(mockOpContext, kafkaConfiguration, kafkaProperties);
}
@Test
public void testId() {
assertEquals(upgrade.id(), "KafkaNonBlockingSetup");
}
@Test
public void testSteps() {
List<UpgradeStep> steps = upgrade.steps();
assertNotNull(steps);
assertEquals(steps.size(), 1);
assertTrue(steps.get(0) instanceof ConfluentSchemaRegistryCleanupPolicyStep);
}
@Test
public void testStepsContainsCorrectStep() {
List<UpgradeStep> steps = upgrade.steps();
ConfluentSchemaRegistryCleanupPolicyStep step =
(ConfluentSchemaRegistryCleanupPolicyStep) steps.get(0);
assertEquals(step.id(), "ConfluentSchemaRegistryCleanupPolicyStep");
}
}

View File

@@ -0,0 +1,172 @@
package com.linkedin.datahub.upgrade.system.kafka;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.metadata.config.kafka.KafkaConfiguration;
import com.linkedin.metadata.config.kafka.SetupConfiguration;
import com.linkedin.upgrade.DataHubUpgradeState;
import io.datahubproject.metadata.context.OperationContext;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class WaitForKafkaReadyStepTest {
private WaitForKafkaReadyStep step;
private WaitForKafkaReadyStep spyStep;
private OperationContext mockOpContext;
private KafkaConfiguration kafkaConfiguration;
private KafkaProperties kafkaProperties;
private AdminClient mockAdminClient;
private DescribeClusterResult mockDescribeClusterResult;
@BeforeMethod
public void setUp() {
mockOpContext = mock(OperationContext.class);
// Create real KafkaConfiguration with test data
kafkaConfiguration = new KafkaConfiguration();
kafkaConfiguration.setBootstrapServers("localhost:9092");
SetupConfiguration setupConfig = new SetupConfiguration();
setupConfig.setPreCreateTopics(true);
setupConfig.setUseConfluentSchemaRegistry(true);
kafkaConfiguration.setSetup(setupConfig);
// Create real KafkaProperties with test data
kafkaProperties = new KafkaProperties();
kafkaProperties.setBootstrapServers(Arrays.asList("localhost:9092"));
mockAdminClient = mock(AdminClient.class);
mockDescribeClusterResult = mock(DescribeClusterResult.class);
step = new WaitForKafkaReadyStep(mockOpContext, kafkaConfiguration, kafkaProperties);
spyStep = spy(step);
}
@Test
public void testId() {
assertEquals(step.id(), "WaitForKafkaReadyStep");
}
@Test
public void testRetryCount() {
assertEquals(step.retryCount(), 60);
}
@Test
public void testSuccessfulKafkaReady() throws Exception {
// Mock successful cluster description
Node mockNode1 = new Node(1, "localhost", 9092);
Node mockNode2 = new Node(2, "localhost", 9093);
KafkaFuture<String> clusterIdFuture = mock(KafkaFuture.class);
KafkaFuture<Collection<Node>> nodesFuture = mock(KafkaFuture.class);
when(clusterIdFuture.get(anyLong(), any(TimeUnit.class))).thenReturn("test-cluster-id");
when(nodesFuture.get(anyLong(), any(TimeUnit.class)))
.thenReturn(Arrays.asList(mockNode1, mockNode2));
when(mockDescribeClusterResult.clusterId()).thenReturn(clusterIdFuture);
when(mockDescribeClusterResult.nodes()).thenReturn(nodesFuture);
when(mockAdminClient.describeCluster()).thenReturn(mockDescribeClusterResult);
// Mock the createAdminClient method to return our mock AdminClient
doReturn(mockAdminClient).when(spyStep).createAdminClient();
UpgradeContext mockContext = mock(UpgradeContext.class);
UpgradeStepResult result = spyStep.executable().apply(mockContext);
assertNotNull(result);
assertEquals(result.result(), DataHubUpgradeState.SUCCEEDED);
}
@Test
public void testKafkaReadyWithNoBrokers() throws Exception {
// Mock cluster description with no brokers
KafkaFuture<String> clusterIdFuture = mock(KafkaFuture.class);
KafkaFuture<Collection<Node>> nodesFuture = mock(KafkaFuture.class);
when(clusterIdFuture.get(anyLong(), any(TimeUnit.class))).thenReturn("test-cluster-id");
when(nodesFuture.get(anyLong(), any(TimeUnit.class))).thenReturn(Collections.emptyList());
when(mockDescribeClusterResult.clusterId()).thenReturn(clusterIdFuture);
when(mockDescribeClusterResult.nodes()).thenReturn(nodesFuture);
when(mockAdminClient.describeCluster()).thenReturn(mockDescribeClusterResult);
// Mock the createAdminClient method to return our mock AdminClient
doReturn(mockAdminClient).when(spyStep).createAdminClient();
UpgradeContext mockContext = mock(UpgradeContext.class);
UpgradeStepResult result = spyStep.executable().apply(mockContext);
assertNotNull(result);
assertEquals(result.result(), DataHubUpgradeState.FAILED);
}
@Test
public void testKafkaReadyWithException() throws Exception {
// Mock AdminClient to throw exception
when(mockAdminClient.describeCluster())
.thenThrow(new RuntimeException("Kafka connection failed"));
// Mock the createAdminClient method to return our mock AdminClient
doReturn(mockAdminClient).when(spyStep).createAdminClient();
UpgradeContext mockContext = mock(UpgradeContext.class);
UpgradeStepResult result = spyStep.executable().apply(mockContext);
assertNotNull(result);
assertEquals(result.result(), DataHubUpgradeState.FAILED);
}
@Test
public void testKafkaReadyWithTimeoutException() throws Exception {
// Mock timeout exception
KafkaFuture<String> clusterIdFuture = mock(KafkaFuture.class);
when(clusterIdFuture.get(anyLong(), any(TimeUnit.class)))
.thenThrow(new java.util.concurrent.TimeoutException("Timeout"));
when(mockDescribeClusterResult.clusterId()).thenReturn(clusterIdFuture);
when(mockAdminClient.describeCluster()).thenReturn(mockDescribeClusterResult);
// Mock the createAdminClient method to return our mock AdminClient
doReturn(mockAdminClient).when(spyStep).createAdminClient();
UpgradeContext mockContext = mock(UpgradeContext.class);
UpgradeStepResult result = spyStep.executable().apply(mockContext);
assertNotNull(result);
assertEquals(result.result(), DataHubUpgradeState.FAILED);
}
@Test
public void testCreateAdminClientMethod() {
// Test that the createAdminClient method exists and can be called
// This will fail in tests since we don't have real Kafka configuration,
// but it verifies the method structure
try {
step.createAdminClient();
} catch (Exception e) {
// Expected to fail in test environment, but method should exist
assertNotNull(e);
}
}
}

View File

@@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!-- Suppress Kafka client info logs -->
<logger name="org.apache.kafka" level="WARN"/>
<!-- Suppress application info logs for the kafka package -->
<logger name="com.linkedin.datahub.upgrade.system.kafka" level="WARN"/>
<!-- Console appender for test output (defined before it is referenced by the root logger) -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<!-- Set root logger to WARN to suppress most info logs during tests -->
<root level="WARN">
<appender-ref ref="STDOUT"/>
</root>
</configuration>

View File

@@ -44,7 +44,6 @@ Do not use `latest` or `debug` tags for any of the image as those are not suppor
- [acryldata/datahub-mae-consumer](https://hub.docker.com/repository/docker/acryldata/datahub-mae-consumer/)
- [acryldata/datahub-mce-consumer](https://hub.docker.com/repository/docker/acryldata/datahub-mce-consumer/)
- [acryldata/datahub-upgrade](https://hub.docker.com/r/acryldata/datahub-upgrade/)
- [acryldata/datahub-kafka-setup](https://hub.docker.com/r/acryldata/datahub-kafka-setup/)
- [acryldata/datahub-elasticsearch-setup](https://hub.docker.com/r/acryldata/datahub-elasticsearch-setup/)
- [acryldata/datahub-mysql-setup](https://hub.docker.com/r/acryldata/datahub-mysql-setup/)
- [acryldata/datahub-postgres-setup](https://hub.docker.com/r/acryldata/datahub-postgres-setup/)
@@ -52,7 +51,6 @@ Do not use `latest` or `debug` tags for any of the image as those are not suppor
Dependencies:
- [Kafka, Zookeeper, and Schema Registry](kafka-setup)
- [Elasticsearch](elasticsearch-setup)
- [MySQL](mysql)
- [(Optional) Neo4j](neo4j)

View File

@@ -15,7 +15,6 @@ ext {
backend_profile_modules = [
':docker:elasticsearch-setup',
':docker:mysql-setup',
':docker:kafka-setup',
':datahub-upgrade',
':metadata-service:war',
]

View File

@@ -33,11 +33,6 @@ if [[ $ENTITY_SERVICE_IMPL == cassandra ]] && [[ $SKIP_CASSANDRA_CHECK != true ]
WAIT_FOR_CASSANDRA=" -wait tcp://$CASSANDRA_DATASOURCE_HOST "
fi
WAIT_FOR_KAFKA=""
if [[ $SKIP_KAFKA_CHECK != true ]]; then
WAIT_FOR_KAFKA=" -wait tcp://$(echo $KAFKA_BOOTSTRAP_SERVER | sed 's/,/ -wait tcp:\/\//g') "
fi
WAIT_FOR_NEO4J=""
if [[ $GRAPH_SERVICE_IMPL != elasticsearch ]] && [[ $SKIP_NEO4J_CHECK != true ]]; then
WAIT_FOR_NEO4J=" -wait $NEO4J_HOST "
@@ -62,7 +57,6 @@ fi
COMMON="
$WAIT_FOR_EBEAN \
$WAIT_FOR_CASSANDRA \
$WAIT_FOR_KAFKA \
$WAIT_FOR_NEO4J \
-timeout 240s \
java $JAVA_OPTS $JMX_OPTS \

View File

@@ -1,137 +0,0 @@
# Defining custom repo urls for use in enterprise environments
ARG ALPINE_REPO_URL=http://dl-cdn.alpinelinux.org/alpine
ARG GITHUB_REPO_URL=https://github.com
ARG MAVEN_CENTRAL_REPO_URL=https://repo1.maven.org/maven2
ARG APACHE_DOWNLOAD_URL=null
# Base stage with common dependencies
FROM alpine:3.22 AS base
# Re-declaring args from above
ARG ALPINE_REPO_URL
ARG GITHUB_REPO_URL
ARG MAVEN_CENTRAL_REPO_URL
ARG APACHE_DOWNLOAD_URL
# Kafka specific args
ARG KAFKA_VERSION=4.0.0
ARG SCALA_VERSION=2.13
ARG CONFLUENT_VERSION=8.0.0
# Environment variables
ENV KAFKA_VERSION=${KAFKA_VERSION}
ENV SCALA_VERSION=${SCALA_VERSION}
ENV KAFKA_WORKDIR=/opt/kafka
LABEL name="kafka" version=${KAFKA_VERSION}
# Optionally set corporate mirror for apk
RUN if [ "${ALPINE_REPO_URL}" != "http://dl-cdn.alpinelinux.org/alpine" ] ; then \
sed -i "s#http.*://dl-cdn.alpinelinux.org/alpine#${ALPINE_REPO_URL}#g" /etc/apk/repositories ; fi
# Upgrade Alpine and install base packages
RUN apk --no-cache --update-cache --available upgrade \
&& apk --no-cache add 'c-ares>1.34.5' --repository=${ALPINE_REPO_URL}/edge/main \
&& apk --no-cache add \
bash \
coreutils \
curl \
ca-certificates \
jq \
python3 \
py3-pip \
wget \
zip \
gcompat \
sqlite \
libc6-compat \
snappy=~1.2 --repository=${ALPINE_REPO_URL}/edge/main \
&& apk --no-cache add openjdk17-jre-headless --repository=${ALPINE_REPO_URL}/edge/community
# Create directories
RUN mkdir -p /opt/kafka /usr/share/java/cp-base-new \
&& cp /usr/lib/jvm/java-17-openjdk/jre/lib/security/cacerts /tmp/kafka.client.truststore.jks
# Download and install Kafka
RUN if [ "${APACHE_DOWNLOAD_URL}" != "null" ] ; then \
mirror="${APACHE_DOWNLOAD_URL}/" ; \
else \
mirror=$(curl --stderr /dev/null https://www.apache.org/dyn/closer.cgi\?as_json\=1 | jq -r '.preferred'); \
fi && \
curl -sSL "${mirror}kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz" \
| tar -xzf - -C /opt && \
# Check if extraction created the expected directory
if [ -d "/opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION}" ]; then \
# Move contents properly, not the directory itself
cp -r /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION}/* ${KAFKA_WORKDIR}/ && \
rm -rf /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION}; \
fi
# Download Confluent dependencies
ARG CONFLUENT_REPO_URL=https://packages.confluent.io/maven
RUN wget -P /usr/share/java/cp-base-new \
${CONFLUENT_REPO_URL}/io/confluent/common-utils/${CONFLUENT_VERSION}/common-utils-${CONFLUENT_VERSION}.jar \
&& wget -P /usr/share/java/cp-base-new \
${CONFLUENT_REPO_URL}/io/confluent/common-config/${CONFLUENT_VERSION}/common-config-${CONFLUENT_VERSION}.jar
# Fix security vulnerabilities (after Kafka is installed so libs directory exists)
ARG COMMONS_BEAN_UTILS_VERSION="1.11.0"
ARG COMMONS_LANG3_VERSION="3.18.0"
RUN wget -P /usr/share/java/cp-base-new \
${MAVEN_CENTRAL_REPO_URL}/commons-beanutils/commons-beanutils/${COMMONS_BEAN_UTILS_VERSION}/commons-beanutils-${COMMONS_BEAN_UTILS_VERSION}.jar \
&& rm -f ${KAFKA_WORKDIR}/libs/commons-beanutils-*.jar \
&& cp /usr/share/java/cp-base-new/commons-beanutils-${COMMONS_BEAN_UTILS_VERSION}.jar ${KAFKA_WORKDIR}/libs/ \
&& wget -P /usr/share/java/cp-base-new \
${MAVEN_CENTRAL_REPO_URL}/org/apache/commons/commons-lang3/${COMMONS_LANG3_VERSION}/commons-lang3-${COMMONS_LANG3_VERSION}.jar \
&& rm -f ${KAFKA_WORKDIR}/libs/commons-lang3-*.jar \
&& cp /usr/share/java/cp-base-new/commons-lang3-${COMMONS_LANG3_VERSION}.jar ${KAFKA_WORKDIR}/libs/
# Download AWS MSK IAM Auth
ADD ${GITHUB_REPO_URL}/aws/aws-msk-iam-auth/releases/download/v2.3.2/aws-msk-iam-auth-2.3.2-all.jar /usr/share/java/cp-base-new/
ADD ${GITHUB_REPO_URL}/aws/aws-msk-iam-auth/releases/download/v2.3.2/aws-msk-iam-auth-2.3.2-all.jar ${KAFKA_WORKDIR}/libs/
# Set LD_LIBRARY_PATH for compatibility
ENV LD_LIBRARY_PATH="/lib:/lib64"
# Copy setup scripts
COPY docker/kafka-setup/kafka-setup.sh ${KAFKA_WORKDIR}/kafka-setup.sh
COPY docker/kafka-setup/kafka-config.sh ${KAFKA_WORKDIR}/kafka-config.sh
COPY docker/kafka-setup/kafka-topic-workers.sh ${KAFKA_WORKDIR}/kafka-topic-workers.sh
COPY docker/kafka-setup/kafka-ready.sh ${KAFKA_WORKDIR}/kafka-ready.sh
COPY docker/kafka-setup/env_to_properties.py ${KAFKA_WORKDIR}/env_to_properties.py
# Make scripts executable
RUN chmod +x ${KAFKA_WORKDIR}/kafka-setup.sh \
${KAFKA_WORKDIR}/kafka-topic-workers.sh \
${KAFKA_WORKDIR}/kafka-ready.sh
# Create kafka user and group
RUN addgroup -S kafka && adduser -S kafka -G kafka && chmod g-s /home/kafka
# Set ownership
RUN chown -R kafka:kafka ${KAFKA_WORKDIR} \
&& chown -R kafka:kafka /usr/share/java/cp-base-new
# Switch to kafka user
USER kafka
# Set environment variables for DataHub
ENV METADATA_AUDIT_EVENT_NAME="MetadataAuditEvent_v4"
ENV METADATA_CHANGE_EVENT_NAME="MetadataChangeEvent_v4"
ENV FAILED_METADATA_CHANGE_EVENT_NAME="FailedMetadataChangeEvent_v4"
ENV DATAHUB_USAGE_EVENT_NAME="DataHubUsageEvent_v1"
ENV METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME="MetadataChangeLog_Versioned_v1"
ENV METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME="MetadataChangeLog_Timeseries_v1"
ENV METADATA_CHANGE_PROPOSAL_TOPIC_NAME="MetadataChangeProposal_v1"
ENV FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME="FailedMetadataChangeProposal_v1"
ENV PLATFORM_EVENT_TOPIC_NAME="PlatformEvent_v1"
ENV DATAHUB_UPGRADE_HISTORY_TOPIC_NAME="DataHubUpgradeHistory_v1"
ENV USE_CONFLUENT_SCHEMA_REGISTRY="TRUE"
# Set PATH
ENV PATH=/sbin:${KAFKA_WORKDIR}/bin:$PATH
# Set working directory
WORKDIR ${KAFKA_WORKDIR}
CMD ["./kafka-setup.sh"]

View File

@ -1,17 +0,0 @@
# Kafka, Zookeeper and Schema Registry
DataHub uses Kafka as the pub-sub message queue in the backend.
[Official Confluent Kafka Docker images](https://hub.docker.com/u/confluentinc) found in Docker Hub is used without
any modification.
## Debugging Kafka
You can install [kafkacat](https://github.com/edenhill/kafkacat) to consume and produce messaged to Kafka topics.
For example, to consume messages on MetadataAuditEvent topic, you can run below command.
```
kafkacat -b localhost:9092 -t MetadataAuditEvent
```
However, `kafkacat` currently doesn't support Avro deserialization at this point,
but they have an ongoing [work](https://github.com/edenhill/kafkacat/pull/151) for that.

View File

@ -1,47 +0,0 @@
plugins {
id 'java' // required for versioning
}
apply from: "../../gradle/versioning/versioning.gradle"
apply from: "../../gradle/docker/docker.gradle"
ext {
docker_repo = 'datahub-kafka-setup'
docker_dir = 'kafka-setup'
}
docker {
dependsOn(build)
name "${docker_registry}/${docker_repo}:${versionTag}"
dockerfile file("${rootProject.projectDir}/docker/${docker_dir}/Dockerfile")
files fileTree(rootProject.projectDir) {
include '.dockerignore'
include "docker/${docker_dir}/*"
}.exclude {
i -> (!i.file.name.endsWith(".dockerignore") && i.file.isHidden())
}
additionalTag("Debug", "${docker_registry}/${docker_repo}:debug")
// Add build args if they are defined (needed for some CI or enterprise environments)
def dockerBuildArgs = [:]
if (project.hasProperty('alpineApkRepositoryUrl')) {
dockerBuildArgs.ALPINE_REPO_URL = project.getProperty('alpineApkRepositoryUrl')
}
if (project.hasProperty('githubMirrorUrl')) {
dockerBuildArgs.GITHUB_REPO_URL = project.getProperty('githubMirrorUrl')
}
if (project.hasProperty('mavenCentralRepositoryUrl')) {
dockerBuildArgs.MAVEN_CENTRAL_REPO_URL = project.getProperty('mavenCentralRepositoryUrl')
}
if (project.hasProperty('apacheDownloadUrl')) {
dockerBuildArgs.APACHE_DOWNLOAD_URL = project.getProperty('apacheDownloadUrl')
}
if (project.hasProperty('confluentRepositoryUrl')) {
dockerBuildArgs.CONFLUENT_REPO_URL = project.getProperty('confluentRepositoryUrl')
}
if (dockerBuildArgs.size() > 0) {
buildArgs(dockerBuildArgs)
}
}

View File

@ -1,33 +0,0 @@
KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
KAFKA_BOOTSTRAP_SERVER=broker:29092
USE_CONFLUENT_SCHEMA_REGISTRY=TRUE
# Configure the topics that are created by kafka-setup
# Make sure these names are consistent across the whole deployment
# METADATA_CHANGE_PROPOSAL_TOPIC_NAME=MetadataChangeProposal_v1
# FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME=FailedMetadataChangeProposal_v1
# METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME=MetadataChangeLog_Versioned_v1
# METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME=MetadataChangeLog_Timeseries_v1
# PLATFORM_EVENT_TOPIC_NAME=PlatformEvent_v1
# DATAHUB_USAGE_EVENT_NAME=DataHubUsageEvent_v1
# Deprecated!
# METADATA_AUDIT_EVENT_NAME=MetadataAuditEvent_v4
# METADATA_CHANGE_EVENT_NAME=MetadataChangeEvent_v4
# FAILED_METADATA_CHANGE_EVENT_NAME=FailedMetadataChangeEvent_v4
# PLATFORM_EVENT_TOPIC_NAME=PlatformEvent_v1
# DATAHUB_USAGE_EVENT_NAME=DataHubUsageEvent_v1
# DATAHUB_UPGRADE_HISTORY_TOPIC_NAME=DataHubUpgradeHistory_v1
# PARTITIONS=1
# REPLICATION_FACTOR=1
# Configure for an SSL-Connection to Kafka
# KAFKA_PROPERTIES_SECURITY_PROTOCOL=SSL
# KAFKA_PROPERTIES_SSL_KEYSTORE_LOCATION=
# KAFKA_PROPERTIES_SSL_KEYSTORE_PASSWORD=
# KAFKA_PROPERTIES_SSL_KEY_PASSWORD=
# KAFKA_PROPERTIES_SSL_TRUSTSTORE_LOCATION=
# KAFKA_PROPERTIES_SSL_TRUSTSTORE_PASSWORD=
# KAFKA_PROPERTIES_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
# Uncomment to disable persistence of client-side analytics events
# DATAHUB_ANALYTICS_ENABLED=false

View File

@ -1,24 +0,0 @@
import os
import re
import sys
def env_to_properties(env_prefix: str, properties_file: str):
pattern = re.compile('(?<=[^_])_(?=[^_])')
props = {}
for (env_name, val) in os.environ.items():
if env_name.startswith(env_prefix):
raw_name = env_name[len(env_prefix):].lower()
prop_dot = '.'.join(pattern.split(raw_name))
props[prop_dot] = val
with open(properties_file, 'a') as f:
for k, v in props.items():
f.writelines(f'{k}={v}\n')
if __name__ == '__main__':
env_prefix = sys.argv[1]
properties_file = sys.argv[2]
env_to_properties(env_prefix, properties_file)

View File

@ -1,17 +0,0 @@
#!/usr/bin/env bash
: ${PARTITIONS:=1}
: ${REPLICATION_FACTOR:=1}
: ${MAX_MESSAGE_BYTES:=5242880}
: ${KAFKA_PROPERTIES_SECURITY_PROTOCOL:=PLAINTEXT}
: ${DATAHUB_ANALYTICS_ENABLED:=true}
: ${KAFKA_HEAP_OPTS:=-Xmx64M}
export KAFKA_HEAP_OPTS
CONNECTION_PROPERTIES_PATH=/tmp/connection.properties
WORKERS=4
DELIMITER=";"

View File

@ -1,14 +0,0 @@
#!/bin/bash
for i in {1..60}
do
kafka-broker-api-versions.sh --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER
if [ $? -eq 0 ]; then
break
fi
if [ $i -eq 60 ]; then
echo "Kafka bootstrap server $KAFKA_BOOTSTRAP_SERVER not ready."
exit 1
fi
sleep 5s
done

View File

@ -1,133 +0,0 @@
#!/bin/bash
## Exit early if PRECREATION is not needed
if [[ $DATAHUB_PRECREATE_TOPICS == "false" ]]; then
echo "DATAHUB_PRECREATE_TOPICS=${DATAHUB_PRECREATE_TOPICS}"
echo "Pre-creation of topics has been turned off, exiting"
exit 0
fi
. kafka-config.sh
echo "bootstrap.servers=$KAFKA_BOOTSTRAP_SERVER" > $CONNECTION_PROPERTIES_PATH
python env_to_properties.py KAFKA_PROPERTIES_ $CONNECTION_PROPERTIES_PATH
# cub kafka-ready -c $CONNECTION_PROPERTIES_PATH -b $KAFKA_BOOTSTRAP_SERVER 1 180
. kafka-ready.sh
############################################################
# Start Topic Creation Logic
############################################################
# make the files
START=$(mktemp -t start-XXXX)
FIFO=$(mktemp -t fifo-XXXX)
FIFO_LOCK=$(mktemp -t lock-XXXX)
START_LOCK=$(mktemp -t lock-XXXX)
## mktemp makes a regular file. Delete that an make a fifo.
rm $FIFO
mkfifo $FIFO
echo $FIFO
## create a trap to cleanup on exit if we fail in the middle.
cleanup() {
rm $FIFO
rm $START
rm $FIFO_LOCK
rm $START_LOCK
}
trap cleanup 0
# Start worker script
. kafka-topic-workers.sh $START $FIFO $FIFO_LOCK $START_LOCK
## Open the fifo for writing.
exec 3>$FIFO
## Open the start lock for reading
exec 4<$START_LOCK
## Wait for the workers to start
while true; do
flock 4
started=$(wc -l $START | cut -d \ -f 1)
flock -u 4
if [[ $started -eq $WORKERS ]]; then
break
else
echo waiting, started $started of $WORKERS
fi
done
exec 4<&-
## utility function to send the jobs to the workers
send() {
work_id=$1
topic_args=$2
topic_config=$3
echo -e "sending $work_id\n worker_args: ${topic_args}${DELIMITER}${topic_config}"
echo "$work_id" "${topic_args}${DELIMITER}${topic_config}" 1>&3 ## the fifo is fd 3
}
## Produce the jobs to run.
send "$METADATA_AUDIT_EVENT_NAME" "--partitions $PARTITIONS --topic $METADATA_AUDIT_EVENT_NAME" \
"--entity-type topics --entity-name $METADATA_AUDIT_EVENT_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"
send "$METADATA_CHANGE_EVENT_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_EVENT_NAME" \
"--entity-type topics --entity-name $METADATA_CHANGE_EVENT_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"
send "$FAILED_METADATA_CHANGE_EVENT_NAME" "--partitions $PARTITIONS --topic $FAILED_METADATA_CHANGE_EVENT_NAME" \
"--entity-type topics --entity-name $FAILED_METADATA_CHANGE_EVENT_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"
send "$METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME" \
"--entity-type topics --entity-name $METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"
# Set retention to 90 days
send "$METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME" "--partitions $PARTITIONS --config retention.ms=7776000000 --topic $METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME" \
"--entity-type topics --entity-name $METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"
send "$METADATA_CHANGE_PROPOSAL_TOPIC_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_PROPOSAL_TOPIC_NAME" \
"--entity-type topics --entity-name $METADATA_CHANGE_PROPOSAL_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"
send "$FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME" "--partitions $PARTITIONS --topic $FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME" \
"--entity-type topics --entity-name $FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"
send "$PLATFORM_EVENT_TOPIC_NAME" "--partitions $PARTITIONS --topic $PLATFORM_EVENT_TOPIC_NAME" \
"--entity-type topics --entity-name $PLATFORM_EVENT_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"
# Infinite retention upgrade topic
# Make sure the retention.ms config for $DATAHUB_UPGRADE_HISTORY_TOPIC_NAME is configured to infinite
# Please see the bug report below for details
# https://github.com/datahub-project/datahub/issues/7882
send "$DATAHUB_UPGRADE_HISTORY_TOPIC_NAME" "--partitions 1 --config retention.ms=-1 --topic $DATAHUB_UPGRADE_HISTORY_TOPIC_NAME" \
"--entity-type topics --entity-name "$DATAHUB_UPGRADE_HISTORY_TOPIC_NAME" --alter --add-config retention.ms=-1"
# Create topic for datahub usage event
if [[ $DATAHUB_ANALYTICS_ENABLED == true ]]; then
send "$DATAHUB_USAGE_EVENT_NAME" "--partitions $PARTITIONS --topic $DATAHUB_USAGE_EVENT_NAME"
fi
## close the filo
exec 3<&-
## disable the cleanup trap
trap '' 0
## It is safe to delete the files because the workers
## already opened them. Thus, only the names are going away
## the actual files will stay there until the workers
## all finish.
cleanup
## now wait for all the workers.
wait
echo "Topic Creation Complete."
############################################################
# End Topic Creation Logic
############################################################
## If using confluent schema registry as a standalone component, then configure compact cleanup policy.
if [[ $USE_CONFLUENT_SCHEMA_REGISTRY == "TRUE" ]]; then
kafka-configs.sh --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER \
--entity-type topics \
--entity-name _schemas \
--alter --add-config cleanup.policy=compact
fi

View File

@ -1,74 +0,0 @@
#!/usr/bin/env bash
. kafka-config.sh
START=$1
FIFO=$2
FIFO_LOCK=$3
START_LOCK=$4
## this is the "job" function which is does whatever work
## the queue workers are supposed to be doing
job() {
i=$1
worker_args=$2
topic_args=$(echo $worker_args | cut -d "$DELIMITER" -f 1)
topic_config=$(echo $worker_args | cut -d "$DELIMITER" -f 2)
echo " $i: kafka-topics.sh --create --if-not-exist $topic_args"
kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER \
--replication-factor $REPLICATION_FACTOR \
$topic_args
if [[ ! -z "$topic_config" ]]; then
echo " $i: kafka-configs.sh $topic_config"
kafka-configs.sh --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER $topic_config
fi
}
## This is the worker to read from the queue.
work() {
ID=$1
## first open the fifo and locks for reading.
exec 3<$FIFO
exec 4<$FIFO_LOCK
exec 5<$START_LOCK
## signal the worker has started.
flock 5 # obtain the start lock
echo $ID >> $START # put my worker ID in the start file
flock -u 5 # release the start lock
exec 5<&- # close the start lock file
echo worker $ID started
while true; do
## try to read the queue
flock 4 # obtain the fifo lock
read -su 3 work_id work_item # read into work_id and work_item
read_status=$? # save the exit status of read
flock -u 4 # release the fifo lock
## check the line read.
if [[ $read_status -eq 0 ]]; then
## If read gives an exit code of 0 the read succeeded.
# got a work item. do the work
echo $ID got work_id=$work_id topic_args=$work_item
## Run the job in a subshell. That way any exit calls do not kill
## the worker process.
( job "$work_id" "$work_item" )
else
## Any other exit code indicates an EOF.
break
fi
done
# clean up the fd(s)
exec 3<&-
exec 4<&-
echo $ID "done working"
}
## Start the workers.
for ((i=1;i<=$WORKERS;i++)); do
echo will start $i
work $i &
done

View File

@ -89,8 +89,6 @@ services:
condition: service_completed_successfully
opensearch-setup-dev:
condition: service_completed_successfully
kafka-setup-dev:
condition: service_completed_successfully
frontend-debug-postgres:
<<: *datahub-frontend-service-dev
profiles:

View File

@ -76,6 +76,9 @@ x-datahub-system-update-service: &datahub-system-update-service
SPRING_KAFKA_PROPERTIES_USE_LATEST_VERSION: ${SPRING_KAFKA_PROPERTIES_USE_LATEST_VERSION:-true}
ENTITY_VERSIONING_ENABLED: ${ENTITY_VERSIONING_ENABLED:-true}
ELASTICSEARCH_INDEX_BUILDER_REFRESH_INTERVAL_SECONDS: ${ELASTICSEARCH_INDEX_BUILDER_REFRESH_INTERVAL_SECONDS:-3}
DATAHUB_PRECREATE_TOPICS: ${DATAHUB_PRECREATE_TOPICS:-false}
KAFKA_BOOTSTRAP_SERVER: broker:29092
USE_CONFLUENT_SCHEMA_REGISTRY: false
volumes:
- ${HOME}/.datahub/plugins:/etc/datahub/plugins
labels:
@ -92,6 +95,7 @@ x-datahub-system-update-service-dev: &datahub-system-update-service-dev
REPROCESS_DEFAULT_BROWSE_PATHS_V2: ${REPROCESS_DEFAULT_BROWSE_PATHS_V2:-false}
JAVA_TOOL_OPTIONS: '-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5003'
ENTITY_VERSIONING_ENABLED: ${ENTITY_VERSIONING_ENABLED:-true}
DATAHUB_PRECREATE_TOPICS: ${DATAHUB_PRECREATE_TOPICS:-true}
volumes:
- ../../datahub-upgrade/build/libs/:/datahub/datahub-upgrade/bin/
- ../../metadata-models/src/main/resources/:/datahub/datahub-gms/resources
@ -233,8 +237,6 @@ services:
condition: service_completed_successfully
opensearch-setup:
condition: service_completed_successfully
kafka-setup:
condition: service_completed_successfully
opensearch:
condition: service_healthy
mysql:
@ -252,8 +254,6 @@ services:
condition: service_completed_successfully
opensearch-setup:
condition: service_completed_successfully
kafka-setup:
condition: service_completed_successfully
system-update-quickstart-postgres:
<<: *datahub-system-update-service
profiles:
@ -265,8 +265,6 @@ services:
condition: service_completed_successfully
opensearch-setup:
condition: service_completed_successfully
kafka-setup:
condition: service_completed_successfully
system-update-debug: &system-update-debug
<<: *datahub-system-update-service-dev
profiles:
@ -279,8 +277,6 @@ services:
condition: service_completed_successfully
opensearch-setup-dev:
condition: service_completed_successfully
kafka-setup-dev:
condition: service_completed_successfully
system-update-debug-aws: &system-update-debug
<<: *datahub-system-update-service-dev
profiles:
@ -291,8 +287,6 @@ services:
condition: service_completed_successfully
opensearch-setup-dev:
condition: service_completed_successfully
kafka-setup-dev:
condition: service_completed_successfully
system-update-debug-consumers: # for consistent naming based on profile
<<: *system-update-debug
profiles:
@ -306,8 +300,6 @@ services:
condition: service_completed_successfully
elasticsearch-setup-dev:
condition: service_completed_successfully
kafka-setup-dev:
condition: service_completed_successfully
system-update-debug-postgres:
<<: *datahub-system-update-service-dev
profiles:
@ -319,8 +311,6 @@ services:
condition: service_completed_successfully
opensearch-setup-dev:
condition: service_completed_successfully
kafka-setup-dev:
condition: service_completed_successfully
system-update-debug-cassandra:
<<: *datahub-system-update-service-dev
profiles:
@ -332,8 +322,6 @@ services:
condition: service_completed_successfully
opensearch-setup-dev:
condition: service_completed_successfully
kafka-setup-dev:
condition: service_completed_successfully
system-update-debug-neo4j:
<<: *datahub-system-update-service-dev
profiles:
@ -345,8 +333,6 @@ services:
condition: service_healthy
opensearch-setup-dev:
condition: service_completed_successfully
kafka-setup-dev:
condition: service_completed_successfully
#################################
# GMS
#################################

View File

@ -279,27 +279,6 @@ services:
timeout: 5s
volumes:
- broker:/var/lib/kafka/data/
kafka-setup: &kafka-setup
profiles: *profiles-quickstart
hostname: kafka-setup
image: ${DATAHUB_KAFKA_SETUP_IMAGE:-${DATAHUB_REPO:-acryldata}/datahub-kafka-setup}:${DATAHUB_VERSION:-head}
env_file: kafka-setup/env/docker.env
environment: &kafka-setup-env
DATAHUB_PRECREATE_TOPICS: ${DATAHUB_PRECREATE_TOPICS:-false}
KAFKA_BOOTSTRAP_SERVER: broker:29092
USE_CONFLUENT_SCHEMA_REGISTRY: false
depends_on:
kafka-broker:
condition: service_healthy
labels:
datahub_setup_job: true
kafka-setup-dev:
<<: *kafka-setup
profiles: *profiles-dev
environment:
<<: *kafka-setup-env
DATAHUB_PRECREATE_TOPICS: ${DATAHUB_PRECREATE_TOPICS:-true}
image: ${DATAHUB_KAFKA_SETUP_IMAGE:-${DATAHUB_REPO:-acryldata}/datahub-kafka-setup}:${DATAHUB_VERSION:-debug}
elasticsearch:
profiles: *elasticsearch-profiles
hostname: search

View File

@ -202,31 +202,6 @@ services:
source: broker
target: /var/lib/kafka/data
volume: {}
kafka-setup:
profiles:
- quickstart
- quickstart-backend
- quickstart-actions
- quickstart-frontend
- quickstart-storage
- quickstart-cassandra
- quickstart-postgres
- quickstart-consumers
depends_on:
kafka-broker:
condition: service_healthy
required: true
environment:
DATAHUB_PRECREATE_TOPICS: 'false'
KAFKA_BOOTSTRAP_SERVER: broker:29092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
USE_CONFLUENT_SCHEMA_REGISTRY: 'false'
hostname: kafka-setup
image: acryldata/datahub-kafka-setup:${DATAHUB_VERSION}
labels:
datahub_setup_job: 'true'
networks:
default: null
mysql:
profiles:
- quickstart
@ -389,9 +364,6 @@ services:
- -u
- SystemUpdate
depends_on:
kafka-setup:
condition: service_completed_successfully
required: true
mysql:
condition: service_healthy
required: true
@ -408,6 +380,7 @@ services:
BACKFILL_BROWSE_PATHS_V2: 'true'
DATAHUB_GMS_HOST: datahub-gms
DATAHUB_GMS_PORT: '8080'
DATAHUB_PRECREATE_TOPICS: 'false'
EBEAN_DATASOURCE_DRIVER: com.mysql.jdbc.Driver
EBEAN_DATASOURCE_HOST: mysql:3306
EBEAN_DATASOURCE_PASSWORD: datahub
@ -435,6 +408,7 @@ services:
SCHEMA_REGISTRY_TYPE: INTERNAL
SPRING_KAFKA_PROPERTIES_AUTO_REGISTER_SCHEMAS: 'true'
SPRING_KAFKA_PROPERTIES_USE_LATEST_VERSION: 'true'
USE_CONFLUENT_SCHEMA_REGISTRY: 'false'
hostname: datahub-system-update
image: acryldata/datahub-upgrade:${DATAHUB_VERSION}
labels:

View File

@ -1148,7 +1148,6 @@ module.exports = {
// - "docker/datahub-ingestion/README",
// - "docker/elasticsearch-setup/README",
// - "docker/ingestion/README",
// - "docker/kafka-setup/README",
// - "docker/mariadb/README",
// - "docker/mysql/README",
// - "docker/neo4j/README",

View File

@ -16,12 +16,10 @@ the proper ordering of the components whereas docker-compose relies on a series
datahub-gms (healthy)
|
datahub-upgrade (SystemUpdate completed)
/--------------------/ | \ \------------------------------------------------\
/ | \-------------------\ \
mysql-setup (completed) elasticsearch-setup (completed) kafka-setup (completed) (if apply) neo4j (healthy)
| | / \
| | / \
mysql (healthy) elasticsearch (healthy) broker (healthy) (if not internal) schema-registry (healthy)
|
zookeeper (healthy)
/--------------------/ | \------------------------------------------------\
/ | \
mysql-setup (completed) elasticsearch-setup (completed) (if apply) neo4j (healthy)
| |
| |
mysql (healthy) elasticsearch (healthy)
```

View File

@ -229,11 +229,3 @@ credentialsAndCertsSecrets:
```
The Actions pod will automatically pick these up in the correctly named environment variables when they are named this exact way.
## Contribution
Accepting contributions for a setup script compatible with Confluent Cloud!
The kafka-setup-job container we ship with is only compatible with a distribution of Kafka wherein ZooKeeper
is exposed and available. A version of the job using the [Confluent CLI](https://docs.confluent.io/confluent-cli/current/command-reference/kafka/topic/confluent_kafka_topic_create.html)
would be very useful for the broader community.

View File

@ -121,7 +121,6 @@ datahub-datahub-gms-58b676f77c-c6pfx 1/1 Running 0
datahub-datahub-mae-consumer-7b98bf65d-tjbwx 1/1 Running 0 4m3s
datahub-datahub-mce-consumer-8c57d8587-vjv9m 1/1 Running 0 4m2s
datahub-elasticsearch-setup-job-8dz6b 0/1 Completed 0 4m50s
datahub-kafka-setup-job-6blcj 0/1 Completed 0 4m40s
datahub-mysql-setup-job-b57kc 0/1 Completed 0 4m7s
elasticsearch-master-0 1/1 Running 0 97m
elasticsearch-master-1 1/1 Running 0 97m

View File

@ -31,7 +31,6 @@ Once the `debug` docker images are constructed you'll see images similar to the
```shell
acryldata/datahub-frontend-react debug e52fef698025 28 minutes ago 763MB
acryldata/datahub-kafka-setup debug 3375aaa2b12d 55 minutes ago 659MB
acryldata/datahub-gms debug ea2b0a8ea115 56 minutes ago 408MB
acryldata/datahub-upgrade debug 322377a7a21d 56 minutes ago 463MB
acryldata/datahub-mysql-setup debug 17768edcc3e5 2 hours ago 58.2MB

View File

@ -5,7 +5,9 @@ hide_title: true
# Configuring Kafka in DataHub
DataHub requires Kafka to operate. Kafka is used as a durable log that can be used to store inbound
DataHub uses Kafka as the pub-sub message queue in the backend.
[Official Confluent Kafka Docker images](https://hub.docker.com/u/confluentinc) from Docker Hub are used without
any modification. Kafka is used as a durable log that stores inbound
requests to update the Metadata Graph (Metadata Change Proposal), or as a change log detailing the updates
that have been made to the Metadata Graph (Metadata Change Log).
@ -15,9 +17,9 @@ The following environment variables can be used to customize DataHub's connectio
each of which requires a connection to Kafka:
- `metadata-service` (datahub-gms container)
- `system-update` (datahub-system-update container, if topics are set up via DataHub)
- (Advanced - if standalone consumers are deployed) `mce-consumer-job` (datahub-mce-consumer container)
- (Advanced - if standalone consumers are deployed) `mae-consumer-job` (datahub-mae-consumer container)
- (Advanced - if product analytics are enabled) datahub-frontend
### Connection Configuration
@ -70,9 +72,10 @@ How Metadata Events relate to these topics is discussed at more length in [Metad
We've included environment variables to customize the name of each of these topics, for cases where an organization has naming rules for its topics.
### Metadata Service (datahub-gms)
### Metadata Service (datahub-gms) and System Update (datahub-system-update)
The following are environment variables you can use to configure topic names used in the Metadata Service container:
The following are environment variables you can use to configure topic names used in the Metadata Service container and
the System Update container for topic setup:
- `METADATA_CHANGE_PROPOSAL_TOPIC_NAME`: The name of the topic for Metadata Change Proposals emitted by the ingestion framework.
- `FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME`: The name of the topic for Metadata Change Proposals emitted when MCPs fail processing.
@ -99,11 +102,6 @@ The following are environment variables you can use to configure topic names use
- `DATAHUB_USAGE_EVENT_NAME`: The name of the topic for product analytics events.
- (Deprecated) `METADATA_AUDIT_EVENT_NAME`: The name of the deprecated metadata audit event topic.
### DataHub Frontend (datahub-frontend-react)
- `DATAHUB_TRACKING_TOPIC`: The name of the topic used for storing DataHub usage events.
It should contain the same value as `DATAHUB_USAGE_EVENT_NAME` in the Metadata Service container.
Please ensure that these environment variables are set consistently throughout your ecosystem. DataHub has a few different applications running which communicate with Kafka (see above).
## Configuring Consumer Group Id
@ -162,6 +160,19 @@ datahub-gms:
- name: KAFKA_CONSUMER_GROUP_ID
value: "my-apps-mae-consumer"
....
datahub-system-update:
...
extraEnvs:
- name: METADATA_CHANGE_PROPOSAL_TOPIC_NAME
value: "CustomMetadataChangeProposal_v1"
- name: METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME
value: "CustomMetadataChangeLogVersioned_v1"
- name: FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME
value: "CustomFailedMetadataChangeProposal_v1"
- name: KAFKA_CONSUMER_GROUP_ID
value: "my-apps-mae-consumer"
....
datahub-frontend:
...
@ -189,10 +200,9 @@ datahub-mce-consumer;
## Other Components that use Kafka can be configured using environment variables:
- kafka-setup
- schema-registry
## SASL/GSSAPI properties for kafka-setup and datahub-frontend via environment variables
## SASL/GSSAPI properties for system-update and datahub-frontend via environment variables
```bash
KAFKA_BOOTSTRAP_SERVER=broker:29092
@ -249,3 +259,32 @@ Client.
> messages indicate that the service was passed a configuration that is not relevant to it and can be safely ignored.
> Other errors: `Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [DataHubUsageEvent_v1]`. Please check ranger permissions or kafka broker logs.
### Additional Kafka Topic level configuration
For additional [Kafka topic-level config properties](https://kafka.apache.org/documentation/#topicconfigs), either add them to application.yaml under `kafka.topics.<topicName>.configProperties` or `kafka.topicDefaults.configProperties`, or define env vars of the following form (standard Spring conventions applied to application.yaml).
These env vars must be set on the datahub-system-update container.
Examples:
1. To configure `max.message.bytes` on topic used for `metadataChangeLogVersioned`, set
`KAFKA_TOPICS_metadataChangeLogVersioned_CONFIGPROPERTIES_max_message_bytes=10000`
2. To configure `max.message.bytes` for all topics that don't explicitly define one, set the `topicDefaults` via
`KAFKA_TOPICDEFAULTS_CONFIGPROPERTIES_max_message_bytes=10000`
Configurations specified in `topicDefaults` are applied to all topics by merging them with any configs defined per topic; the per-topic config takes precedence over values specified in `topicDefaults`.
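As a rough sketch of this precedence rule (illustrative only; the class and method names below are hypothetical and not part of this change), the effective per-topic properties can be thought of as the defaults overlaid by the topic-specific map:
```java
import java.util.HashMap;
import java.util.Map;

public class TopicConfigMergeSketch {
  // Sketch: topicDefaults.configProperties merged with a topic's own configProperties,
  // where the per-topic entries win on key collisions.
  static Map<String, String> effectiveConfig(
      Map<String, String> topicDefaults, Map<String, String> perTopic) {
    Map<String, String> merged = new HashMap<>(topicDefaults);
    merged.putAll(perTopic); // per-topic values take precedence
    return merged;
  }

  public static void main(String[] args) {
    Map<String, String> defaults = Map.of("max.message.bytes", "5242880");
    Map<String, String> timeseries = Map.of("retention.ms", "7776000000");
    // => {max.message.bytes=5242880, retention.ms=7776000000}
    System.out.println(effectiveConfig(defaults, timeseries));
  }
}
```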
If you intend to create and configure the topics yourself rather than have DataHub create them, the Kafka setup step of
datahub-system-update can be turned off by setting the env var `DATAHUB_PRECREATE_TOPICS` to `false`.
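A minimal sketch of how this flag is consumed (illustrative only; it assumes the `KafkaConfiguration`/`SetupConfiguration` classes added in this change are on the classpath, and the helper class and method names are hypothetical):
```java
import com.linkedin.metadata.config.kafka.KafkaConfiguration;

public final class TopicSetupGuardSketch {
  // Hypothetical helper (not part of this commit): returns true when the system-update
  // Kafka setup step should pre-create topics. kafka.setup.preCreateTopics is bound from
  // DATAHUB_PRECREATE_TOPICS per the application.yaml changes in this commit.
  static boolean shouldCreateTopics(KafkaConfiguration kafkaConfiguration) {
    return kafkaConfiguration.getSetup() != null
        && kafkaConfiguration.getSetup().isPreCreateTopics();
  }
}
```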
## Debugging Kafka
You can install [kafkacat](https://github.com/edenhill/kafkacat) to consume and produce messages to Kafka topics.
For example, to consume messages on the MetadataAuditEvent topic, you can run the command below.
```
kafkacat -b localhost:9092 -t MetadataAuditEvent
```
Note that `kafkacat` does not yet support Avro deserialization,
though there is ongoing [work](https://github.com/edenhill/kafkacat/pull/151) to add it.

View File

@ -90,7 +90,6 @@ Starting up DataHub...
✔ Container datahub-kafka-broker-1 Healthy 6.0s
✔ Container datahub-opensearch-setup-1 Exited 11.6s
✔ Container datahub-mysql-setup-1 Exited 11.6s
✔ Container datahub-kafka-setup-1 Exited 11.6s
✔ Container datahub-system-update-quickstart-1 Exited 26.6s
✔ Container datahub-datahub-gms-quickstart-1 Healthy 42.1s
✔ Container datahub-frontend-quickstart-1 Started 26.6s

View File

@ -117,6 +117,20 @@ public class PropertiesCollectorConfigurationTest extends AbstractTestNGSpringCo
"systemUpdate.*.enabled",
"systemUpdate.*.batchSize",
// Kafka topic Configs
"kafka.topics.*.name",
"kafka.topics.*.partitions",
"kafka.topics.*.enabled",
"kafka.topics.*.replicationFactor",
"kafka.topics.*.configProperties.max.message.bytes",
"kafka.topics.*.configProperties.retention.ms",
"kafka.topicDefaults.configProperties.max.message.bytes",
"kafka.topicDefaults.configProperties.retention.ms",
"kafka.topicDefaults.partitions",
"kafka.topicDefaults.replicationFactor",
"kafka.setup.preCreateTopics",
"kafka.setup.useConfluentSchemaRegistry",
// IAM authentication flags
"*.postgresUseIamAuth",
"*.opensearchUseAwsIamAuth");

View File

@ -43,7 +43,7 @@ However, if you only want to build `Metadata Change Event Consumer Job` specific
## Dependencies
Before starting `Metadata Change Event Consumer Job`, you need to make sure that [Kafka, Schema Registry & Zookeeper](../../docker/kafka-setup)
Before starting `Metadata Change Event Consumer Job`, you need to make sure that [Kafka, Schema Registry & Zookeeper](../../docs/how/kafka-config.md)
and [DataHub GMS](../../docker/datahub-gms) Docker containers are up and running.
## Start via Docker image

View File

@ -29,7 +29,7 @@ However, if you only want to build `DataHub Metadata Service` specifically:
## Dependencies
Before starting `DataHub Metadata Service`, you need to make sure that [Kafka, Schema Registry & Zookeeper](../docker/kafka-setup),
Before starting `DataHub Metadata Service`, you need to make sure that [Kafka, Schema Registry & Zookeeper](../docs/how/kafka-config.md),
[Elasticsearch](../docker/elasticsearch) and [MySQL](../docker/mysql) Docker containers are up and running.
## Start via Docker image

View File

@ -2,14 +2,17 @@ package com.linkedin.metadata.config.kafka;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.PostConstruct;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import org.springframework.context.annotation.Configuration;
import org.springframework.lang.Nullable;
@Data
@Configuration
public class KafkaConfiguration {
// Avoiding dependencies on other libraries (Spring/Kafka) for configuration
public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
@ -28,6 +31,16 @@ public class KafkaConfiguration {
private String bootstrapServers;
private SetupConfiguration setup;
private TopicsConfiguration.TopicConfiguration topicDefaults;
private Map<String, TopicsConfiguration.TopicConfiguration> topics;
// Retained for backward compatibility, not loaded from conf, but computed by merging
// topics with topicDefaults
private TopicsConfiguration topicsConfiguration;
private ListenerConfiguration listener;
private SchemaRegistryConfiguration schemaRegistry;
@ -37,7 +50,15 @@ public class KafkaConfiguration {
private ConsumerConfiguration consumer;
private SerDeConfig serde;
private TopicsConfiguration topics;
@PostConstruct
public void initializeTopicDefaults() {
topicsConfiguration = new TopicsConfiguration(topicDefaults, topics);
}
public TopicsConfiguration getTopics() {
return new TopicsConfiguration(topicDefaults, topics);
}
@Data
public static class SerDeConfig {

View File

@ -0,0 +1,9 @@
package com.linkedin.metadata.config.kafka;
import lombok.Data;
@Data
public class SetupConfiguration {
private boolean preCreateTopics = true;
private boolean useConfluentSchemaRegistry = true;
}

View File

@ -1,8 +1,76 @@
package com.linkedin.metadata.config.kafka;
import lombok.Data;
import java.util.HashMap;
import java.util.Map;
import lombok.Getter;
import lombok.Setter;
@Data
@Getter
@Setter
public class TopicsConfiguration {
private String dataHubUsage;
// For backward compatibility
private String dataHubUsage = "DataHubUsage_v1";
private Map<String, TopicConfiguration> topics = new HashMap<>();
private TopicConfiguration topicDefaults;
// For backward compatibility
public String getDataHubUsage() {
if (topics != null && topics.containsKey("datahubUsageEvent")) {
TopicConfiguration topicDef = topics.get("datahubUsageEvent");
if (topicDef != null && topicDef.getName() != null) {
return topicDef.getName();
}
}
return dataHubUsage;
}
public TopicsConfiguration(
TopicConfiguration topicDefaults, Map<String, TopicConfiguration> topics) {
this.topics = topics;
this.topicDefaults = topicDefaults;
// Initialize null values in topics map with defaults from topicDefaults
if (topics != null && topicDefaults != null) {
for (TopicConfiguration topicConfig : topics.values()) {
if (topicConfig != null) {
// Initialize partitions if null
if (topicConfig.getPartitions() == null) {
topicConfig.setPartitions(topicDefaults.getPartitions());
}
// Initialize replicationFactor if null
if (topicConfig.getReplicationFactor() == null) {
topicConfig.setReplicationFactor(topicDefaults.getReplicationFactor());
}
// Initialize enabled if null
if (topicConfig.getEnabled() == null) {
topicConfig.setEnabled(topicDefaults.getEnabled());
}
// Initialize configProperties if null
if (topicConfig.getConfigProperties() == null) {
topicConfig.setConfigProperties(topicDefaults.getConfigProperties());
} else if (topicDefaults.getConfigProperties() != null) {
// Merge configProperties, keeping existing values as overrides
Map<String, String> mergedConfig = new HashMap<>(topicDefaults.getConfigProperties());
mergedConfig.putAll(topicConfig.getConfigProperties());
topicConfig.setConfigProperties(mergedConfig);
}
}
}
}
}
@Getter
@Setter
public static class TopicConfiguration {
private String name;
private Integer partitions;
private Integer replicationFactor = 1;
private Map<String, String> configProperties;
private Boolean enabled = true; // Default to true if not specified
}
}

View File

@ -344,8 +344,69 @@ elasticsearch:
# TODO: Kafka topic convention
kafka:
# Kafka Setup Configuration - replaces kafka-setup container functionality
setup:
preCreateTopics: ${DATAHUB_PRECREATE_TOPICS:true}
useConfluentSchemaRegistry: ${USE_CONFLUENT_SCHEMA_REGISTRY:false}
# readinessCheck:
# timeoutSeconds: 300 # HARDCODED: 60 attempts * 5 seconds = 300s total
# intervalSeconds: 5 # HARDCODED: kafka-ready.sh sleep 5s
# maxAttempts: 60 # HARDCODED: kafka-ready.sh for i in {1..60}
# # EXISTING KAFKA_PROPERTIES_* environment variable mapping
# properties:
# securityProtocol: ${KAFKA_PROPERTIES_SECURITY_PROTOCOL:PLAINTEXT} # EXISTING
# # All KAFKA_PROPERTIES_* environment variables will be automatically mapped
# Topic defaults applied when individual topic configuration is missing
topicDefaults:
partitions: ${PARTITIONS:1}
replicationFactor: ${REPLICATION_FACTOR:1}
configProperties:
max.message.bytes: ${MAX_MESSAGE_BYTES:5242880}
topics:
dataHubUsage: ${DATAHUB_USAGE_EVENT_NAME:DataHubUsageEvent_v1}
# Topic Dictionary Configuration - merged directly into topics section
# Each topic can be created by iterating through this dictionary
# The key name matches the programmatic identifier used in code
metadataChangeProposal:
name: ${METADATA_CHANGE_PROPOSAL_TOPIC_NAME:MetadataChangeProposal_v1}
failedMetadataChangeProposal:
name: ${FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME:FailedMetadataChangeProposal_v1}
metadataChangeLogVersioned:
name: ${METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME:MetadataChangeLog_Versioned_v1}
metadataChangeLogTimeseries:
name: ${METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME:MetadataChangeLog_Timeseries_v1}
configProperties:
retention.ms: 7776000000
platformEvent:
name: ${PLATFORM_EVENT_TOPIC_NAME:PlatformEvent_v1}
datahubUpgradeHistory:
name: ${DATAHUB_UPGRADE_HISTORY_TOPIC_NAME:DataHubUpgradeHistory_v1}
partitions: 1
configProperties:
retention.ms: -1
datahubUsageEvent:
name: ${DATAHUB_USAGE_EVENT_NAME:DataHubUsageEvent_v1}
enabled: ${DATAHUB_ANALYTICS_ENABLED:true}
# Deprecated Topics (maintained for backward compatibility)
metadataAuditEvent:
name: ${METADATA_AUDIT_EVENT_NAME:MetadataAuditEvent_v4}
metadataChangeEvent:
name: ${METADATA_CHANGE_EVENT_NAME:MetadataChangeEvent_v4}
failedMetadataChangeEvent:
name: ${FAILED_METADATA_CHANGE_EVENT_NAME:FailedMetadataChangeEvent_v4}
listener:
concurrency: ${KAFKA_LISTENER_CONCURRENCY:1}
bootstrapServers: ${KAFKA_BOOTSTRAP_SERVER:http://localhost:9092}
@ -406,6 +467,7 @@ kafka:
security:
protocol: ${KAFKA_PROPERTIES_SECURITY_PROTOCOL:PLAINTEXT}
# Only required if GraphService type is neo4j
neo4j:
username: ${NEO4J_USERNAME:neo4j}

View File

@ -0,0 +1,193 @@
package com.linkedin.metadata.config.kafka;
import static org.testng.Assert.*;
import com.linkedin.metadata.config.DataHubAppConfiguration;
import com.linkedin.metadata.config.DataHubTestApplication;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
import org.testng.annotations.Test;
@SpringBootTest(classes = DataHubTestApplication.class)
public class KafkaConfigurationTest extends AbstractTestNGSpringContextTests {
@Autowired private DataHubTestApplication testApplication;
@Test
public void testKafkaConfigurationLoaded() {
assertNotNull(testApplication, "Test application should be loaded");
DataHubAppConfiguration appConfig = testApplication.getDataHubAppConfig();
assertNotNull(appConfig, "DataHubAppConfiguration should be loaded");
KafkaConfiguration kafkaConfig = appConfig.getKafka();
assertNotNull(kafkaConfig, "KafkaConfiguration should be loaded");
}
@Test
public void testConfigurationHierarchy() {
DataHubAppConfiguration appConfig = testApplication.getDataHubAppConfig();
KafkaConfiguration kafkaConfig = appConfig.getKafka();
// Test that all configuration objects are properly nested
assertNotNull(kafkaConfig.getSetup(), "Setup configuration should be accessible");
assertNotNull(
kafkaConfig.getTopicDefaults(), "TopicDefaults configuration should be accessible");
// Test that the configuration objects are of the correct types
assertTrue(
kafkaConfig.getSetup() instanceof SetupConfiguration,
"Setup should be SetupConfiguration type");
assertTrue(
kafkaConfig.getTopicDefaults() instanceof TopicsConfiguration.TopicConfiguration,
"TopicDefaults should be TopicConfiguration type");
// Topics configuration should not be null
assertNotNull(kafkaConfig.getTopics(), "Topics configuration should not be null");
assertTrue(
kafkaConfig.getTopics() instanceof TopicsConfiguration,
"Topics should be TopicsConfiguration type");
}
@Test
public void testTopicConfigurationDefaultsInitialization() {
DataHubAppConfiguration appConfig = testApplication.getDataHubAppConfig();
KafkaConfiguration kafkaConfig = appConfig.getKafka();
TopicsConfiguration topicsConfig = kafkaConfig.getTopics();
assertNotNull(topicsConfig, "TopicsConfiguration should not be null");
TopicsConfiguration.TopicConfiguration topicDefaults = topicsConfig.getTopicDefaults();
// Test that topics with null values get initialized with defaults
Map<String, TopicsConfiguration.TopicConfiguration> topics = topicsConfig.getTopics();
assertNotNull(topics, "Topics map should not be null");
// Test metadataChangeProposal topic (should have defaults applied)
TopicsConfiguration.TopicConfiguration metadataChangeProposal =
topics.get("metadataChangeProposal");
assertNotNull(metadataChangeProposal, "metadataChangeProposal should not be null");
// This topic only has a name, so partitions should be initialized from defaults
assertEquals(
metadataChangeProposal.getPartitions(),
topicDefaults.getPartitions(),
"metadataChangeProposal partitions should be initialized from defaults");
assertEquals(
metadataChangeProposal.getReplicationFactor(),
topicDefaults.getReplicationFactor(),
"metadataChangeProposal replicationFactor should be initialized from defaults");
// Test configProperties merging for metadataChangeProposal (should inherit all defaults)
Map<String, String> metadataChangeProposalConfig = metadataChangeProposal.getConfigProperties();
assertNotNull(
metadataChangeProposalConfig, "metadataChangeProposal configProperties should not be null");
Map<String, String> defaultConfig = topicDefaults.getConfigProperties();
assertNotNull(defaultConfig, "topicDefaults configProperties should not be null");
// Verify that metadataChangeProposal inherits the default max.message.bytes
assertEquals(
metadataChangeProposalConfig.get("max.message.bytes"),
defaultConfig.get("max.message.bytes"),
"metadataChangeProposal should inherit max.message.bytes from defaults");
assertEquals(
metadataChangeProposalConfig.get("max.message.bytes"),
"5242880",
"metadataChangeProposal should have default max.message.bytes value");
// Test datahubUpgradeHistory topic (has explicit partitions=1, should retain it)
TopicsConfiguration.TopicConfiguration datahubUpgradeHistory =
topics.get("datahubUpgradeHistory");
assertNotNull(datahubUpgradeHistory, "datahubUpgradeHistory should not be null");
assertEquals(
datahubUpgradeHistory.getPartitions(),
Integer.valueOf(1),
"datahubUpgradeHistory should retain its explicit partitions value");
// replicationFactor should be initialized from defaults since it's not explicitly set
assertEquals(
datahubUpgradeHistory.getReplicationFactor(),
topicDefaults.getReplicationFactor(),
"datahubUpgradeHistory replicationFactor should be initialized from defaults");
// Test configProperties merging for datahubUpgradeHistory (should merge defaults with specific
// retention)
Map<String, String> datahubUpgradeHistoryConfig = datahubUpgradeHistory.getConfigProperties();
assertNotNull(
datahubUpgradeHistoryConfig, "datahubUpgradeHistory configProperties should not be null");
// Verify that datahubUpgradeHistory inherits the default max.message.bytes
assertEquals(
datahubUpgradeHistoryConfig.get("max.message.bytes"),
defaultConfig.get("max.message.bytes"),
"datahubUpgradeHistory should inherit max.message.bytes from defaults");
assertEquals(
datahubUpgradeHistoryConfig.get("max.message.bytes"),
"5242880",
"datahubUpgradeHistory should have default max.message.bytes value");
// Verify that datahubUpgradeHistory has its specific retention.ms value
assertEquals(
datahubUpgradeHistoryConfig.get("retention.ms"),
"-1",
"datahubUpgradeHistory should have its specific retention.ms value");
// Verify that the configProperties map contains both default and specific properties
assertTrue(
datahubUpgradeHistoryConfig.containsKey("max.message.bytes"),
"datahubUpgradeHistory should contain default max.message.bytes");
// Test metadataChangeLogTimeseries topic (should merge defaults with specific retention)
TopicsConfiguration.TopicConfiguration metadataChangeLogTimeseries =
topics.get("metadataChangeLogTimeseries");
assertNotNull(metadataChangeLogTimeseries, "metadataChangeLogTimeseries should not be null");
// Test configProperties merging for metadataChangeLogTimeseries
Map<String, String> metadataChangeLogTimeseriesConfig =
metadataChangeLogTimeseries.getConfigProperties();
assertNotNull(
metadataChangeLogTimeseriesConfig,
"metadataChangeLogTimeseries configProperties should not be null");
// Verify that metadataChangeLogTimeseries inherits the default max.message.bytes
assertEquals(
metadataChangeLogTimeseriesConfig.get("max.message.bytes"),
defaultConfig.get("max.message.bytes"),
"metadataChangeLogTimeseries should inherit max.message.bytes from defaults");
// Verify that metadataChangeLogTimeseries has its specific retention.ms value
assertEquals(
metadataChangeLogTimeseriesConfig.get("retention.ms"),
"7776000000",
"metadataChangeLogTimeseries should have its specific retention.ms value");
// Verify that the configProperties map contains both default and specific properties
assertTrue(
metadataChangeLogTimeseriesConfig.containsKey("max.message.bytes"),
"metadataChangeLogTimeseries should contain default max.message.bytes");
assertTrue(
metadataChangeLogTimeseriesConfig.containsKey("retention.ms"),
"metadataChangeLogTimeseries should contain specific retention.ms");
assertTrue(
metadataChangeLogTimeseriesConfig.size() >= 2,
"metadataChangeLogTimeseries configProperties should contain at least default and specific properties");
}
@Test
public void testGetDataHubUsage() {
DataHubAppConfiguration appConfig = testApplication.getDataHubAppConfig();
KafkaConfiguration kafkaConfig = appConfig.getKafka();
TopicsConfiguration topicsConfig = kafkaConfig.getTopics();
assertNotNull(topicsConfig, "TopicsConfiguration should not be null");
// Test getDataHubUsage method
String dataHubUsage = topicsConfig.getDataHubUsage();
assertNotNull(dataHubUsage, "getDataHubUsage should not return null");
// Since datahubUsageEvent topic is configured in the YAML, it should return the topic name
assertEquals(
dataHubUsage,
"DataHubUsageEvent_v1",
"getDataHubUsage should return the datahubUsageEvent topic name from configuration");
}
}

View File

@ -27,6 +27,10 @@ public class OpenTelemetryBaseFactoryTest {
// Mock TopicsConfiguration since it's not provided in the classes
public static class TopicsConfiguration
extends com.linkedin.metadata.config.kafka.TopicsConfiguration {
TopicsConfiguration() {
super(null, null);
}
private String dataHubUsage;
public String getDataHubUsage() {

View File

@ -153,9 +153,9 @@ public class TrackingServiceFactoryTest extends AbstractTestNGSpringContextTests
com.linkedin.metadata.config.kafka.KafkaConfiguration config =
new com.linkedin.metadata.config.kafka.KafkaConfiguration();
com.linkedin.metadata.config.kafka.TopicsConfiguration topicsConfig =
new com.linkedin.metadata.config.kafka.TopicsConfiguration();
new com.linkedin.metadata.config.kafka.TopicsConfiguration(null, null);
topicsConfig.setDataHubUsage("DataHubUsageEvent_v1");
config.setTopics(topicsConfig);
config.setTopicsConfiguration(topicsConfig);
return config;
}
@ -223,9 +223,9 @@ public class TrackingServiceFactoryTest extends AbstractTestNGSpringContextTests
com.linkedin.metadata.config.kafka.KafkaConfiguration config =
new com.linkedin.metadata.config.kafka.KafkaConfiguration();
com.linkedin.metadata.config.kafka.TopicsConfiguration topicsConfig =
new com.linkedin.metadata.config.kafka.TopicsConfiguration();
new com.linkedin.metadata.config.kafka.TopicsConfiguration(null, null);
topicsConfig.setDataHubUsage("DataHubUsageEvent_v1");
config.setTopics(topicsConfig);
config.setTopicsConfiguration(topicsConfig);
return config;
}

View File

@ -34,7 +34,6 @@ include ':docker'
include ':docker:elasticsearch-setup'
include ':docker:mysql-setup'
include ':docker:postgres-setup'
include ':docker:kafka-setup'
include ':docker:datahub-ingestion'
include ':docker:datahub-ingestion-base'
include 'metadata-models'