mirror of
https://github.com/datahub-project/datahub.git
synced 2025-06-27 05:03:31 +00:00
fix(config): fix mcp batch configuration (#13598)
This commit is contained in:
parent
feb979d17f
commit
907250eccd
@ -13,4 +13,4 @@ parallel followed by everything else.
|
||||
<suite-file path="testng-other.xml"/>
|
||||
<suite-file path="testng-sql-opt.xml"/>
|
||||
</suite-files>
|
||||
</suite>
|
||||
</suite>
|
||||
|
@ -149,7 +149,7 @@ public class BatchMetadataChangeProposalsProcessor {
|
||||
// If adding this MCP would exceed the batch size limit, process the current batch first
|
||||
if (!currentBatch.isEmpty()
|
||||
&& currentBatchSize + mcpSize
|
||||
> provider.getMetadataChangeProposal().getBatch().getSize()) {
|
||||
> provider.getMetadataChangeProposal().getConsumer().getBatch().getSize()) {
|
||||
processBatch(currentBatch, currentBatchSize);
|
||||
totalProcessed += currentBatch.size();
|
||||
log.info(
|
||||
|
@ -242,9 +242,11 @@ public class BatchMetadataChangeProposalsProcessorTest {
|
||||
when(mockProvider.getMetadataChangeProposal())
|
||||
.thenReturn(
|
||||
new MetadataChangeProposalConfig()
|
||||
.setBatch(
|
||||
.setConsumer(
|
||||
new MetadataChangeProposalConfig.ConsumerBatchConfig()
|
||||
.setSize(Integer.MAX_VALUE)));
|
||||
.setBatch(
|
||||
new MetadataChangeProposalConfig.BatchConfig()
|
||||
.setSize(Integer.MAX_VALUE))));
|
||||
|
||||
// Create MCPs
|
||||
MetadataChangeProposal mcp1 = new MetadataChangeProposal();
|
||||
@ -302,9 +304,11 @@ public class BatchMetadataChangeProposalsProcessorTest {
|
||||
when(mockProvider.getMetadataChangeProposal())
|
||||
.thenReturn(
|
||||
new MetadataChangeProposalConfig()
|
||||
.setBatch(
|
||||
.setConsumer(
|
||||
new MetadataChangeProposalConfig.ConsumerBatchConfig()
|
||||
.setSize(Integer.MAX_VALUE)));
|
||||
.setBatch(
|
||||
new MetadataChangeProposalConfig.BatchConfig()
|
||||
.setSize(Integer.MAX_VALUE))));
|
||||
|
||||
// Create 3 Invalid MCPs
|
||||
MetadataChangeProposal mcp1 = new MetadataChangeProposal();
|
||||
@ -412,11 +416,14 @@ public class BatchMetadataChangeProposalsProcessorTest {
|
||||
@Test
|
||||
public void testLargeBatchPartitioning() throws Exception {
|
||||
// Mock the ConfigurationProvider to return a specific batch size limit
|
||||
MetadataChangeProposalConfig.ConsumerBatchConfig mockConfig =
|
||||
mock(MetadataChangeProposalConfig.ConsumerBatchConfig.class);
|
||||
when(mockConfig.getSize()).thenReturn(5 * 1024); // 5KB batch size limit for testing
|
||||
MetadataChangeProposalConfig.ConsumerBatchConfig batchConfig =
|
||||
new MetadataChangeProposalConfig.ConsumerBatchConfig()
|
||||
.setBatch(
|
||||
new MetadataChangeProposalConfig.BatchConfig()
|
||||
.setSize(5 * 1024)
|
||||
.setEnabled(true)); // 5KB batch size limit for testing
|
||||
when(mockProvider.getMetadataChangeProposal())
|
||||
.thenReturn(new MetadataChangeProposalConfig().setBatch(mockConfig));
|
||||
.thenReturn(new MetadataChangeProposalConfig().setConsumer(batchConfig));
|
||||
|
||||
// Create 3 MCPs, one with a large aspect value
|
||||
MetadataChangeProposal smallMcp1 = createMcpWithAspectSize(1000); // 1KB
|
||||
@ -456,11 +463,15 @@ public class BatchMetadataChangeProposalsProcessorTest {
|
||||
@Test
|
||||
public void testExtremelyLargeAspect() throws Exception {
|
||||
// Mock the ConfigurationProvider to return a specific batch size limit
|
||||
MetadataChangeProposalConfig.ConsumerBatchConfig mockConfig =
|
||||
mock(MetadataChangeProposalConfig.ConsumerBatchConfig.class);
|
||||
when(mockConfig.getSize()).thenReturn(10000); // 10KB batch size limit for testing
|
||||
MetadataChangeProposalConfig.ConsumerBatchConfig batchConfig =
|
||||
new MetadataChangeProposalConfig.ConsumerBatchConfig()
|
||||
.setBatch(
|
||||
new MetadataChangeProposalConfig.BatchConfig()
|
||||
.setSize(10000)
|
||||
.setEnabled(true)); // 10KB batch size limit for testing
|
||||
when(mockProvider.getMetadataChangeProposal())
|
||||
.thenReturn(new MetadataChangeProposalConfig().setBatch(mockConfig));
|
||||
.thenReturn(new MetadataChangeProposalConfig().setConsumer(batchConfig));
|
||||
mock(MetadataChangeProposalConfig.ConsumerBatchConfig.class);
|
||||
|
||||
// Create an MCP with an aspect value that exceeds the batch size on its own
|
||||
MetadataChangeProposal hugeMcp =
|
||||
|
@ -16,6 +16,9 @@ dependencies {
|
||||
compileOnly externalDependency.lombok
|
||||
|
||||
annotationProcessor externalDependency.lombok
|
||||
|
||||
testImplementation externalDependency.testng
|
||||
testImplementation externalDependency.springBootTest
|
||||
}
|
||||
|
||||
processResources.configure {
|
||||
|
@ -7,8 +7,12 @@ import com.linkedin.metadata.config.search.ElasticSearchConfiguration;
|
||||
import com.linkedin.metadata.config.search.SearchServiceConfiguration;
|
||||
import com.linkedin.metadata.config.telemetry.TelemetryConfiguration;
|
||||
import lombok.Data;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Data
|
||||
@Component
|
||||
@ConfigurationProperties
|
||||
public class DataHubAppConfiguration {
|
||||
|
||||
/** Ingestion related configs */
|
||||
|
@ -6,7 +6,7 @@ import lombok.experimental.Accessors;
|
||||
@Data
|
||||
@Accessors(chain = true)
|
||||
public class MetadataChangeProposalConfig {
|
||||
ConsumerBatchConfig batch;
|
||||
ConsumerBatchConfig consumer;
|
||||
ThrottlesConfig throttle;
|
||||
MCPValidationConfig validation;
|
||||
SideEffectsConfig sideEffects;
|
||||
@ -64,8 +64,14 @@ public class MetadataChangeProposalConfig {
|
||||
|
||||
@Data
|
||||
@Accessors(chain = true)
|
||||
public static class ConsumerBatchConfig {
|
||||
public static class BatchConfig {
|
||||
boolean enabled;
|
||||
Integer size;
|
||||
}
|
||||
|
||||
@Data
|
||||
@Accessors(chain = true)
|
||||
public static class ConsumerBatchConfig {
|
||||
BatchConfig batch;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,40 @@
|
||||
package com.linkedin.metadata.config;
|
||||
|
||||
import static org.testng.Assert.assertEquals;
|
||||
import static org.testng.Assert.assertFalse;
|
||||
import static org.testng.Assert.assertNotNull;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
@SpringBootTest(classes = DataHubTestApplication.class)
|
||||
public class DataHubAppConfigurationTest extends AbstractTestNGSpringContextTests {
|
||||
|
||||
@Autowired private DataHubTestApplication testApplication;
|
||||
|
||||
@Test
|
||||
public void testInit() {
|
||||
assertNotNull(testApplication);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMCPBatchDefaults() {
|
||||
assertFalse(
|
||||
testApplication
|
||||
.getDataHubAppConfig()
|
||||
.getMetadataChangeProposal()
|
||||
.getConsumer()
|
||||
.getBatch()
|
||||
.isEnabled());
|
||||
assertEquals(
|
||||
testApplication
|
||||
.getDataHubAppConfig()
|
||||
.getMetadataChangeProposal()
|
||||
.getConsumer()
|
||||
.getBatch()
|
||||
.getSize(),
|
||||
15744000);
|
||||
}
|
||||
}
|
@ -0,0 +1,16 @@
|
||||
package com.linkedin.metadata.config;
|
||||
|
||||
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.context.annotation.PropertySource;
|
||||
|
||||
@SpringBootApplication
|
||||
@PropertySource(value = "classpath:/application.yaml", factory = YamlPropertySourceFactory.class)
|
||||
public class DataHubTestApplication {
|
||||
@Autowired private DataHubAppConfiguration dataHubAppConfig;
|
||||
|
||||
public DataHubAppConfiguration getDataHubAppConfig() {
|
||||
return dataHubAppConfig;
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user