diff --git a/metadata-io/src/test/resources/testng.xml b/metadata-io/src/test/resources/testng.xml index dc2c3978b2..875ba5bf33 100644 --- a/metadata-io/src/test/resources/testng.xml +++ b/metadata-io/src/test/resources/testng.xml @@ -13,4 +13,4 @@ parallel followed by everything else. - \ No newline at end of file + diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessor.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessor.java index e807d4ac78..e0501487c0 100644 --- a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessor.java +++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessor.java @@ -149,7 +149,7 @@ public class BatchMetadataChangeProposalsProcessor { // If adding this MCP would exceed the batch size limit, process the current batch first if (!currentBatch.isEmpty() && currentBatchSize + mcpSize - > provider.getMetadataChangeProposal().getBatch().getSize()) { + > provider.getMetadataChangeProposal().getConsumer().getBatch().getSize()) { processBatch(currentBatch, currentBatchSize); totalProcessed += currentBatch.size(); log.info( diff --git a/metadata-jobs/mce-consumer/src/test/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessorTest.java b/metadata-jobs/mce-consumer/src/test/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessorTest.java index 5fb298cffa..221e9373e4 100644 --- a/metadata-jobs/mce-consumer/src/test/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessorTest.java +++ b/metadata-jobs/mce-consumer/src/test/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessorTest.java @@ -242,9 +242,11 @@ public class BatchMetadataChangeProposalsProcessorTest { when(mockProvider.getMetadataChangeProposal()) .thenReturn( new MetadataChangeProposalConfig() - .setBatch( + .setConsumer( new MetadataChangeProposalConfig.ConsumerBatchConfig() - .setSize(Integer.MAX_VALUE))); + .setBatch( + new MetadataChangeProposalConfig.BatchConfig() + .setSize(Integer.MAX_VALUE)))); // Create MCPs MetadataChangeProposal mcp1 = new MetadataChangeProposal(); @@ -302,9 +304,11 @@ public class BatchMetadataChangeProposalsProcessorTest { when(mockProvider.getMetadataChangeProposal()) .thenReturn( new MetadataChangeProposalConfig() - .setBatch( + .setConsumer( new MetadataChangeProposalConfig.ConsumerBatchConfig() - .setSize(Integer.MAX_VALUE))); + .setBatch( + new MetadataChangeProposalConfig.BatchConfig() + .setSize(Integer.MAX_VALUE)))); // Create 3 Invalid MCPs MetadataChangeProposal mcp1 = new MetadataChangeProposal(); @@ -412,11 +416,14 @@ public class BatchMetadataChangeProposalsProcessorTest { @Test public void testLargeBatchPartitioning() throws Exception { // Mock the ConfigurationProvider to return a specific batch size limit - MetadataChangeProposalConfig.ConsumerBatchConfig mockConfig = - mock(MetadataChangeProposalConfig.ConsumerBatchConfig.class); - when(mockConfig.getSize()).thenReturn(5 * 1024); // 5KB batch size limit for testing + MetadataChangeProposalConfig.ConsumerBatchConfig batchConfig = + new MetadataChangeProposalConfig.ConsumerBatchConfig() + .setBatch( + new MetadataChangeProposalConfig.BatchConfig() + .setSize(5 * 1024) + .setEnabled(true)); // 5KB batch size limit for testing when(mockProvider.getMetadataChangeProposal()) - .thenReturn(new MetadataChangeProposalConfig().setBatch(mockConfig)); + .thenReturn(new MetadataChangeProposalConfig().setConsumer(batchConfig)); // Create 3 MCPs, one with a large aspect value MetadataChangeProposal smallMcp1 = createMcpWithAspectSize(1000); // 1KB @@ -456,11 +463,15 @@ public class BatchMetadataChangeProposalsProcessorTest { @Test public void testExtremelyLargeAspect() throws Exception { // Mock the ConfigurationProvider to return a specific batch size limit - MetadataChangeProposalConfig.ConsumerBatchConfig mockConfig = - mock(MetadataChangeProposalConfig.ConsumerBatchConfig.class); - when(mockConfig.getSize()).thenReturn(10000); // 10KB batch size limit for testing + MetadataChangeProposalConfig.ConsumerBatchConfig batchConfig = + new MetadataChangeProposalConfig.ConsumerBatchConfig() + .setBatch( + new MetadataChangeProposalConfig.BatchConfig() + .setSize(10000) + .setEnabled(true)); // 10KB batch size limit for testing when(mockProvider.getMetadataChangeProposal()) - .thenReturn(new MetadataChangeProposalConfig().setBatch(mockConfig)); + .thenReturn(new MetadataChangeProposalConfig().setConsumer(batchConfig)); + mock(MetadataChangeProposalConfig.ConsumerBatchConfig.class); // Create an MCP with an aspect value that exceeds the batch size on its own MetadataChangeProposal hugeMcp = diff --git a/metadata-service/configuration/build.gradle b/metadata-service/configuration/build.gradle index 3c7e38b004..e82ac18bca 100644 --- a/metadata-service/configuration/build.gradle +++ b/metadata-service/configuration/build.gradle @@ -16,6 +16,9 @@ dependencies { compileOnly externalDependency.lombok annotationProcessor externalDependency.lombok + + testImplementation externalDependency.testng + testImplementation externalDependency.springBootTest } processResources.configure { diff --git a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/DataHubAppConfiguration.java b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/DataHubAppConfiguration.java index 5aa676a9e6..d752665959 100644 --- a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/DataHubAppConfiguration.java +++ b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/DataHubAppConfiguration.java @@ -7,8 +7,12 @@ import com.linkedin.metadata.config.search.ElasticSearchConfiguration; import com.linkedin.metadata.config.search.SearchServiceConfiguration; import com.linkedin.metadata.config.telemetry.TelemetryConfiguration; import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; @Data +@Component +@ConfigurationProperties public class DataHubAppConfiguration { /** Ingestion related configs */ diff --git a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/MetadataChangeProposalConfig.java b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/MetadataChangeProposalConfig.java index cc649b6bb0..6c545725ca 100644 --- a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/MetadataChangeProposalConfig.java +++ b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/MetadataChangeProposalConfig.java @@ -6,7 +6,7 @@ import lombok.experimental.Accessors; @Data @Accessors(chain = true) public class MetadataChangeProposalConfig { - ConsumerBatchConfig batch; + ConsumerBatchConfig consumer; ThrottlesConfig throttle; MCPValidationConfig validation; SideEffectsConfig sideEffects; @@ -64,8 +64,14 @@ public class MetadataChangeProposalConfig { @Data @Accessors(chain = true) - public static class ConsumerBatchConfig { + public static class BatchConfig { boolean enabled; Integer size; } + + @Data + @Accessors(chain = true) + public static class ConsumerBatchConfig { + BatchConfig batch; + } } diff --git a/metadata-service/configuration/src/test/java/com/linkedin/metadata/config/DataHubAppConfigurationTest.java b/metadata-service/configuration/src/test/java/com/linkedin/metadata/config/DataHubAppConfigurationTest.java new file mode 100644 index 0000000000..026c82ff69 --- /dev/null +++ b/metadata-service/configuration/src/test/java/com/linkedin/metadata/config/DataHubAppConfigurationTest.java @@ -0,0 +1,40 @@ +package com.linkedin.metadata.config; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; +import org.testng.annotations.Test; + +@SpringBootTest(classes = DataHubTestApplication.class) +public class DataHubAppConfigurationTest extends AbstractTestNGSpringContextTests { + + @Autowired private DataHubTestApplication testApplication; + + @Test + public void testInit() { + assertNotNull(testApplication); + } + + @Test + public void testMCPBatchDefaults() { + assertFalse( + testApplication + .getDataHubAppConfig() + .getMetadataChangeProposal() + .getConsumer() + .getBatch() + .isEnabled()); + assertEquals( + testApplication + .getDataHubAppConfig() + .getMetadataChangeProposal() + .getConsumer() + .getBatch() + .getSize(), + 15744000); + } +} diff --git a/metadata-service/configuration/src/test/java/com/linkedin/metadata/config/DataHubTestApplication.java b/metadata-service/configuration/src/test/java/com/linkedin/metadata/config/DataHubTestApplication.java new file mode 100644 index 0000000000..9f4f9e9856 --- /dev/null +++ b/metadata-service/configuration/src/test/java/com/linkedin/metadata/config/DataHubTestApplication.java @@ -0,0 +1,16 @@ +package com.linkedin.metadata.config; + +import com.linkedin.metadata.spring.YamlPropertySourceFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.PropertySource; + +@SpringBootApplication +@PropertySource(value = "classpath:/application.yaml", factory = YamlPropertySourceFactory.class) +public class DataHubTestApplication { + @Autowired private DataHubAppConfiguration dataHubAppConfig; + + public DataHubAppConfiguration getDataHubAppConfig() { + return dataHubAppConfig; + } +}