mirror of
https://github.com/datahub-project/datahub.git
synced 2025-09-14 19:51:39 +00:00
misc(metrics): fix legacy metrics cache & tests (#14011)
This commit is contained in:
parent
602f178afd
commit
186a48c5d2
@ -4,10 +4,14 @@ import static com.linkedin.metadata.Constants.DATASET_PROPERTIES_ASPECT_NAME;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyBoolean;
|
||||
import static org.mockito.ArgumentMatchers.anyList;
|
||||
import static org.mockito.ArgumentMatchers.anyLong;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.mockStatic;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
@ -49,6 +53,7 @@ import io.opentelemetry.api.trace.StatusCode;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
@ -544,6 +549,100 @@ public class BatchMetadataChangeProposalsProcessorTest {
|
||||
assertTrue(withAspectSize >= 1500); // Base size + aspect size
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConsumeWithTelemetryMetrics() throws Exception {
|
||||
// Mock the metric utils
|
||||
MetricUtils mockMetricUtils = mock(MetricUtils.class);
|
||||
|
||||
// Create a mock operation context
|
||||
OperationContext opContextWithMetrics = spy(opContext);
|
||||
|
||||
// Mock the metric utils to be present
|
||||
when(opContextWithMetrics.getMetricUtils()).thenReturn(Optional.of(mockMetricUtils));
|
||||
|
||||
// Mock the withQueueSpan to execute the runnable directly
|
||||
// The method signature in OperationContext is:
|
||||
// public void withQueueSpan(String name, List<SystemMetadata> systemMetadata, String topicName,
|
||||
// Runnable task, String... attributes)
|
||||
doAnswer(
|
||||
invocation -> {
|
||||
Runnable runnable = invocation.getArgument(3);
|
||||
runnable.run();
|
||||
return null;
|
||||
})
|
||||
.when(opContextWithMetrics)
|
||||
.withQueueSpan(
|
||||
anyString(), // operation name
|
||||
anyList(), // system metadata list
|
||||
anyString(), // topic name
|
||||
any(Runnable.class), // task
|
||||
anyString(),
|
||||
anyString(), // BATCH_SIZE_ATTR and its value
|
||||
anyString(),
|
||||
anyString() // MetricUtils.DROPWIZARD_NAME and metric name
|
||||
);
|
||||
|
||||
BatchMetadataChangeProposalsProcessor processorWithTelemetry =
|
||||
new BatchMetadataChangeProposalsProcessor(
|
||||
opContextWithMetrics,
|
||||
entityClient,
|
||||
mockKafkaProducer,
|
||||
mockKafkaThrottle,
|
||||
mockRegistry,
|
||||
mockProvider);
|
||||
|
||||
// Set required fields via reflection
|
||||
try {
|
||||
java.lang.reflect.Field field =
|
||||
BatchMetadataChangeProposalsProcessor.class.getDeclaredField("fmcpTopicName");
|
||||
field.setAccessible(true);
|
||||
field.set(processorWithTelemetry, Topics.FAILED_METADATA_CHANGE_PROPOSAL);
|
||||
|
||||
field = BatchMetadataChangeProposalsProcessor.class.getDeclaredField("mceConsumerGroupId");
|
||||
field.setAccessible(true);
|
||||
field.set(processorWithTelemetry, "MetadataChangeProposal-Consumer");
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Failed to set field via reflection", e);
|
||||
}
|
||||
|
||||
// Setup minimal configuration
|
||||
when(mockProvider.getMetadataChangeProposal())
|
||||
.thenReturn(
|
||||
new MetadataChangeProposalConfig()
|
||||
.setConsumer(
|
||||
new MetadataChangeProposalConfig.ConsumerBatchConfig()
|
||||
.setBatch(
|
||||
new MetadataChangeProposalConfig.BatchConfig()
|
||||
.setSize(Integer.MAX_VALUE))));
|
||||
|
||||
// Create a simple MCP
|
||||
MetadataChangeProposal mcp = new MetadataChangeProposal();
|
||||
mcp.setSystemMetadata(new SystemMetadata());
|
||||
mcp.setChangeType(ChangeType.UPSERT);
|
||||
mcp.setEntityUrn(UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:test,testMetrics,PROD)"));
|
||||
mcp.setAspect(GenericRecordUtils.serializeAspect(new Status().setRemoved(false)));
|
||||
mcp.setEntityType("dataset");
|
||||
mcp.setAspectName("status");
|
||||
|
||||
// Mock conversion
|
||||
eventUtilsMock.when(() -> EventUtils.avroToPegasusMCP(mockRecord1)).thenReturn(mcp);
|
||||
|
||||
// Create a single consumer record with a timestamp
|
||||
when(mockConsumerRecord1.timestamp())
|
||||
.thenReturn(System.currentTimeMillis() - 1000); // 1 second ago
|
||||
List<ConsumerRecord<String, GenericRecord>> records = List.of(mockConsumerRecord1);
|
||||
|
||||
// Execute test - this should trigger the metricUtils.ifPresent line
|
||||
processorWithTelemetry.consume(records);
|
||||
|
||||
// Verify that the metricUtils.histogram was called for kafka lag
|
||||
verify(mockMetricUtils, times(1))
|
||||
.histogram(eq(BatchMetadataChangeProposalsProcessor.class), eq("kafkaLag"), anyLong());
|
||||
|
||||
// Verify that the processing completed successfully
|
||||
verify(mockEntityService, times(1)).ingestProposal(any(), any(), eq(false));
|
||||
}
|
||||
|
||||
// Helper method to create an MCP with a specific aspect value size
|
||||
private MetadataChangeProposal createMcpWithAspectSize(int size) {
|
||||
MetadataChangeProposal mcp = new MetadataChangeProposal();
|
||||
|
@ -1,7 +1,6 @@
|
||||
package com.linkedin.gms.factory.entityregistry;
|
||||
|
||||
import com.datahub.plugins.metadata.aspect.SpringPluginFactory;
|
||||
import com.linkedin.gms.factory.plugins.SpringStandardPluginConfiguration;
|
||||
import com.linkedin.metadata.aspect.plugins.PluginFactory;
|
||||
import com.linkedin.metadata.aspect.plugins.config.PluginConfiguration;
|
||||
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
|
||||
@ -30,9 +29,7 @@ public class ConfigEntityRegistryFactory {
|
||||
|
||||
@Bean(name = "configEntityRegistry")
|
||||
@Nonnull
|
||||
protected ConfigEntityRegistry getInstance(
|
||||
SpringStandardPluginConfiguration springStandardPluginConfiguration)
|
||||
throws IOException, EntityRegistryException {
|
||||
protected ConfigEntityRegistry getInstance() throws IOException, EntityRegistryException {
|
||||
BiFunction<PluginConfiguration, List<ClassLoader>, PluginFactory> pluginFactoryProvider =
|
||||
(config, loaders) -> new SpringPluginFactory(applicationContext, config, loaders);
|
||||
if (entityRegistryConfigPath != null) {
|
||||
|
@ -0,0 +1,501 @@
|
||||
package com.linkedin.gms.factory.graphql;
|
||||
|
||||
import static org.mockito.Mockito.*;
|
||||
import static org.testng.Assert.*;
|
||||
|
||||
import com.datahub.authentication.group.GroupService;
|
||||
import com.datahub.authentication.invite.InviteTokenService;
|
||||
import com.datahub.authentication.post.PostService;
|
||||
import com.datahub.authentication.token.StatefulTokenService;
|
||||
import com.datahub.authentication.user.NativeUserService;
|
||||
import com.datahub.authorization.role.RoleService;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.linkedin.data.schema.annotation.PathSpecBasedSchemaAnnotationVisitor;
|
||||
import com.linkedin.datahub.graphql.GraphQLEngine;
|
||||
import com.linkedin.entity.client.EntityClient;
|
||||
import com.linkedin.entity.client.SystemEntityClient;
|
||||
import com.linkedin.gms.factory.config.ConfigurationProvider;
|
||||
import com.linkedin.gms.factory.plugins.SpringStandardPluginConfiguration;
|
||||
import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory;
|
||||
import com.linkedin.metadata.connection.ConnectionService;
|
||||
import com.linkedin.metadata.entity.EntityService;
|
||||
import com.linkedin.metadata.entity.versioning.EntityVersioningService;
|
||||
import com.linkedin.metadata.graph.GraphClient;
|
||||
import com.linkedin.metadata.graph.GraphService;
|
||||
import com.linkedin.metadata.graph.SiblingGraphService;
|
||||
import com.linkedin.metadata.models.registry.EntityRegistry;
|
||||
import com.linkedin.metadata.recommendation.RecommendationsService;
|
||||
import com.linkedin.metadata.recommendation.candidatesource.RecentlySearchedSource;
|
||||
import com.linkedin.metadata.recommendation.candidatesource.RecentlyViewedSource;
|
||||
import com.linkedin.metadata.search.EntitySearchService;
|
||||
import com.linkedin.metadata.search.elasticsearch.query.filter.QueryFilterRewriteChain;
|
||||
import com.linkedin.metadata.service.*;
|
||||
import com.linkedin.metadata.timeline.TimelineService;
|
||||
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
|
||||
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
|
||||
import com.linkedin.metadata.utils.metrics.MetricUtils;
|
||||
import com.linkedin.metadata.version.GitVersion;
|
||||
import io.datahubproject.metadata.context.OperationContext;
|
||||
import io.datahubproject.metadata.context.SystemTelemetryContext;
|
||||
import io.datahubproject.metadata.services.RestrictedService;
|
||||
import io.datahubproject.metadata.services.SecretService;
|
||||
import io.datahubproject.test.metadata.context.TestOperationContexts;
|
||||
import io.opentelemetry.api.trace.Tracer;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import org.opensearch.client.RestHighLevelClient;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.boot.test.mock.mockito.MockBean;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.TestPropertySource;
|
||||
import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
|
||||
import org.testng.annotations.BeforeMethod;
|
||||
import org.testng.annotations.BeforeTest;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
@SpringBootTest(classes = {ConfigurationProvider.class, GraphQLEngineFactory.class})
|
||||
@ContextConfiguration(classes = GraphQLEngineFactoryTest.TestConfig.class)
|
||||
@TestPropertySource(
|
||||
locations = "classpath:/application.yaml",
|
||||
properties = {
|
||||
"platformAnalytics.enabled=false",
|
||||
"graphQL.concurrency.separateThreadPool=true",
|
||||
"LINEAGE_DEFAULT_LAST_DAYS_FILTER=30"
|
||||
})
|
||||
public class GraphQLEngineFactoryTest extends AbstractTestNGSpringContextTests {
|
||||
|
||||
@BeforeTest
|
||||
public void setup() {
|
||||
PathSpecBasedSchemaAnnotationVisitor.class
|
||||
.getClassLoader()
|
||||
.setClassAssertionStatus(PathSpecBasedSchemaAnnotationVisitor.class.getName(), false);
|
||||
}
|
||||
|
||||
@Autowired private GraphQLEngineFactory graphQLEngineFactory;
|
||||
|
||||
@Autowired
|
||||
@Qualifier("graphQLEngine")
|
||||
private GraphQLEngine graphQLEngine;
|
||||
|
||||
@Autowired
|
||||
@Qualifier("graphQLWorkerPool")
|
||||
private ExecutorService graphQLWorkerPool;
|
||||
|
||||
@Autowired
|
||||
@Qualifier("configurationProvider")
|
||||
private ConfigurationProvider configurationProvider;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("elasticSearchRestHighLevelClient")
|
||||
private RestHighLevelClient elasticClient;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("indexConvention")
|
||||
private IndexConvention indexConvention;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("graphClient")
|
||||
private GraphClient graphClient;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("entityService")
|
||||
private EntityService<?> entityService;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("entitySearchService")
|
||||
private EntitySearchService entitySearchService;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("graphService")
|
||||
private GraphService graphService;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("siblingGraphService")
|
||||
private SiblingGraphService siblingGraphService;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("timeseriesAspectService")
|
||||
private TimeseriesAspectService timeseriesAspectService;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("recommendationsService")
|
||||
private RecommendationsService recommendationsService;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("dataHubTokenService")
|
||||
private StatefulTokenService statefulTokenService;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("dataHubSecretService")
|
||||
private SecretService secretService;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("gitVersion")
|
||||
private GitVersion gitVersion;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("timelineService")
|
||||
private TimelineService timelineService;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("nativeUserService")
|
||||
private NativeUserService nativeUserService;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("groupService")
|
||||
private GroupService groupService;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("roleService")
|
||||
private RoleService roleService;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("inviteTokenService")
|
||||
private InviteTokenService inviteTokenService;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("postService")
|
||||
private PostService postService;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("viewService")
|
||||
private ViewService viewService;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("ownerShipTypeService")
|
||||
private OwnershipTypeService ownershipTypeService;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("settingsService")
|
||||
private SettingsService settingsService;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("lineageService")
|
||||
private LineageService lineageService;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("queryService")
|
||||
private QueryService queryService;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("erModelRelationshipService")
|
||||
private ERModelRelationshipService erModelRelationshipService;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("dataProductService")
|
||||
private DataProductService dataProductService;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("applicationService")
|
||||
private ApplicationService applicationService;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("formService")
|
||||
private FormService formService;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("restrictedService")
|
||||
private RestrictedService restrictedService;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("businessAttributeService")
|
||||
private BusinessAttributeService businessAttributeService;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("connectionService")
|
||||
private ConnectionService connectionService;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("assertionService")
|
||||
private AssertionService assertionService;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("entityClient")
|
||||
private EntityClient entityClient;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("systemEntityClient")
|
||||
private SystemEntityClient systemEntityClient;
|
||||
|
||||
@MockBean private EntityVersioningService entityVersioningService;
|
||||
|
||||
@MockBean private MetricUtils metricUtils;
|
||||
|
||||
@MockBean private EntityRegistry entityRegistry;
|
||||
|
||||
@MockBean private QueryFilterRewriteChain queryFilterRewriteChain;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("recentlyViewedCandidateSource")
|
||||
private RecentlyViewedSource recentlyViewedSource;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("recentlySearchedCandidateSource")
|
||||
private RecentlySearchedSource recentlySearchedSource;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("pageTemplateService")
|
||||
private PageTemplateService pageTemplateService;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("baseElasticSearchComponents")
|
||||
private BaseElasticSearchComponentsFactory.BaseElasticSearchComponents
|
||||
baseElasticSearchComponents;
|
||||
|
||||
@MockBean
|
||||
@Qualifier("pageModuleService")
|
||||
private PageModuleService pageModuleService;
|
||||
|
||||
@Value("${platformAnalytics.enabled}")
|
||||
private Boolean isAnalyticsEnabled;
|
||||
|
||||
@Value("${LINEAGE_DEFAULT_LAST_DAYS_FILTER:#{null}}")
|
||||
private Integer defaultLineageLastDaysFilter;
|
||||
|
||||
@BeforeMethod
|
||||
public void setUp() {
|
||||
// Set up default mock behaviors
|
||||
when(graphService.supportsMultiHop()).thenReturn(true);
|
||||
when(metricUtils.getRegistry()).thenReturn(java.util.Optional.empty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGraphQLEngineCreation() {
|
||||
// Then
|
||||
assertNotNull(graphQLEngine);
|
||||
|
||||
// Verify analytics is disabled as per property
|
||||
assertFalse(isAnalyticsEnabled);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGraphQLWorkerPoolCreation() {
|
||||
// Then
|
||||
assertNotNull(graphQLWorkerPool);
|
||||
assertTrue(graphQLWorkerPool instanceof ThreadPoolExecutor);
|
||||
|
||||
ThreadPoolExecutor threadPool = (ThreadPoolExecutor) graphQLWorkerPool;
|
||||
|
||||
// The default configuration should use default values
|
||||
assertTrue(threadPool.getCorePoolSize() > 0);
|
||||
assertTrue(threadPool.getMaximumPoolSize() > 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConfigurationProviderDefaults() {
|
||||
// Verify ConfigurationProvider returns non-null configurations with defaults
|
||||
assertNotNull(configurationProvider);
|
||||
assertNotNull(configurationProvider.getIngestion());
|
||||
assertNotNull(configurationProvider.getAuthentication());
|
||||
assertNotNull(configurationProvider.getAuthorization());
|
||||
assertNotNull(configurationProvider.getVisualConfig());
|
||||
assertNotNull(configurationProvider.getTelemetry());
|
||||
assertNotNull(configurationProvider.getMetadataTests());
|
||||
assertNotNull(configurationProvider.getDatahub());
|
||||
assertNotNull(configurationProvider.getViews());
|
||||
assertNotNull(configurationProvider.getSearchBar());
|
||||
assertNotNull(configurationProvider.getHomePage());
|
||||
assertNotNull(configurationProvider.getFeatureFlags());
|
||||
assertNotNull(configurationProvider.getGraphQL());
|
||||
assertNotNull(configurationProvider.getChromeExtension());
|
||||
assertNotNull(configurationProvider.getCache());
|
||||
|
||||
// Verify nested configurations
|
||||
assertNotNull(configurationProvider.getCache().getClient());
|
||||
assertNotNull(configurationProvider.getCache().getClient().getUsageClient());
|
||||
assertNotNull(configurationProvider.getGraphQL().getConcurrency());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLineageDefaultDaysFilter() {
|
||||
// Then
|
||||
assertEquals(defaultLineageLastDaysFilter, Integer.valueOf(30));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGraphQLEngineWithAnalyticsEnabled() {
|
||||
// Create a new factory instance with analytics enabled
|
||||
GraphQLEngineFactory factoryWithAnalytics = new GraphQLEngineFactory();
|
||||
|
||||
// Set up dependencies using reflection
|
||||
setField(factoryWithAnalytics, "elasticClient", elasticClient);
|
||||
setField(factoryWithAnalytics, "indexConvention", indexConvention);
|
||||
setField(factoryWithAnalytics, "graphClient", graphClient);
|
||||
setField(factoryWithAnalytics, "entityService", entityService);
|
||||
setField(factoryWithAnalytics, "graphService", graphService);
|
||||
setField(factoryWithAnalytics, "siblingGraphService", siblingGraphService);
|
||||
setField(factoryWithAnalytics, "timeseriesAspectService", timeseriesAspectService);
|
||||
setField(factoryWithAnalytics, "recommendationsService", recommendationsService);
|
||||
setField(factoryWithAnalytics, "statefulTokenService", statefulTokenService);
|
||||
setField(factoryWithAnalytics, "secretService", secretService);
|
||||
setField(factoryWithAnalytics, "entityRegistry", entityRegistry);
|
||||
setField(factoryWithAnalytics, "configProvider", configurationProvider);
|
||||
setField(factoryWithAnalytics, "gitVersion", gitVersion);
|
||||
setField(factoryWithAnalytics, "timelineService", timelineService);
|
||||
setField(factoryWithAnalytics, "nativeUserService", nativeUserService);
|
||||
setField(factoryWithAnalytics, "groupService", groupService);
|
||||
setField(factoryWithAnalytics, "roleService", roleService);
|
||||
setField(factoryWithAnalytics, "inviteTokenService", inviteTokenService);
|
||||
setField(factoryWithAnalytics, "postService", postService);
|
||||
setField(factoryWithAnalytics, "viewService", viewService);
|
||||
setField(factoryWithAnalytics, "ownershipTypeService", ownershipTypeService);
|
||||
setField(factoryWithAnalytics, "settingsService", settingsService);
|
||||
setField(factoryWithAnalytics, "lineageService", lineageService);
|
||||
setField(factoryWithAnalytics, "queryService", queryService);
|
||||
setField(factoryWithAnalytics, "erModelRelationshipService", erModelRelationshipService);
|
||||
setField(factoryWithAnalytics, "dataProductService", dataProductService);
|
||||
setField(factoryWithAnalytics, "applicationService", applicationService);
|
||||
setField(factoryWithAnalytics, "formService", formService);
|
||||
setField(factoryWithAnalytics, "restrictedService", restrictedService);
|
||||
setField(factoryWithAnalytics, "businessAttributeService", businessAttributeService);
|
||||
setField(factoryWithAnalytics, "_connectionService", connectionService);
|
||||
setField(factoryWithAnalytics, "assertionService", assertionService);
|
||||
setField(factoryWithAnalytics, "isAnalyticsEnabled", true);
|
||||
|
||||
// When
|
||||
GraphQLEngine engineWithAnalytics =
|
||||
factoryWithAnalytics.graphQLEngine(
|
||||
entityClient, systemEntityClient, entityVersioningService, metricUtils);
|
||||
|
||||
// Then
|
||||
assertNotNull(engineWithAnalytics);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGraphQLWorkerPoolMetricsRegistration() {
|
||||
// Then
|
||||
assertNotNull(graphQLWorkerPool);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAllServicesAreWired() {
|
||||
// Verify all required services are injected
|
||||
assertNotNull(elasticClient);
|
||||
assertNotNull(indexConvention);
|
||||
assertNotNull(graphClient);
|
||||
assertNotNull(entityService);
|
||||
assertNotNull(entitySearchService);
|
||||
assertNotNull(graphService);
|
||||
assertNotNull(siblingGraphService);
|
||||
assertNotNull(timeseriesAspectService);
|
||||
assertNotNull(recommendationsService);
|
||||
assertNotNull(statefulTokenService);
|
||||
assertNotNull(secretService);
|
||||
assertNotNull(entityRegistry);
|
||||
assertNotNull(gitVersion);
|
||||
assertNotNull(timelineService);
|
||||
assertNotNull(nativeUserService);
|
||||
assertNotNull(groupService);
|
||||
assertNotNull(roleService);
|
||||
assertNotNull(inviteTokenService);
|
||||
assertNotNull(postService);
|
||||
assertNotNull(viewService);
|
||||
assertNotNull(ownershipTypeService);
|
||||
assertNotNull(settingsService);
|
||||
assertNotNull(lineageService);
|
||||
assertNotNull(queryService);
|
||||
assertNotNull(erModelRelationshipService);
|
||||
assertNotNull(dataProductService);
|
||||
assertNotNull(applicationService);
|
||||
assertNotNull(formService);
|
||||
assertNotNull(restrictedService);
|
||||
assertNotNull(businessAttributeService);
|
||||
assertNotNull(connectionService);
|
||||
assertNotNull(assertionService);
|
||||
assertNotNull(entityClient);
|
||||
assertNotNull(systemEntityClient);
|
||||
assertNotNull(entityVersioningService);
|
||||
assertNotNull(metricUtils);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGraphQLConcurrencyConfiguration() {
|
||||
// Test the actual concurrency configuration from the default ConfigurationProvider
|
||||
var concurrencyConfig = configurationProvider.getGraphQL().getConcurrency();
|
||||
assertNotNull(concurrencyConfig);
|
||||
|
||||
// These should have default values
|
||||
assertNotNull(concurrencyConfig.getCorePoolSize());
|
||||
assertNotNull(concurrencyConfig.getMaxPoolSize());
|
||||
assertNotNull(concurrencyConfig.getKeepAlive());
|
||||
assertNotNull(concurrencyConfig.getStackSize());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGraphQLWorkerPoolWithDifferentConfiguration() {
|
||||
// Test worker pool creation with different configurations
|
||||
var concurrencyConfig = configurationProvider.getGraphQL().getConcurrency();
|
||||
|
||||
// Create a new factory to test different scenarios
|
||||
ExecutorService executorService = graphQLEngineFactory.graphQLWorkerPool(metricUtils);
|
||||
assertNotNull(executorService);
|
||||
|
||||
ThreadPoolExecutor threadPool = (ThreadPoolExecutor) executorService;
|
||||
|
||||
// If core pool size is negative, it should use default calculation
|
||||
if (concurrencyConfig.getCorePoolSize() < 0) {
|
||||
assertEquals(threadPool.getCorePoolSize(), Runtime.getRuntime().availableProcessors() * 5);
|
||||
} else {
|
||||
assertEquals(threadPool.getCorePoolSize(), concurrencyConfig.getCorePoolSize());
|
||||
}
|
||||
|
||||
// If max pool size is zero or negative, it should use default calculation
|
||||
if (concurrencyConfig.getMaxPoolSize() <= 0) {
|
||||
assertEquals(
|
||||
threadPool.getMaximumPoolSize(), Runtime.getRuntime().availableProcessors() * 100);
|
||||
} else {
|
||||
assertEquals(threadPool.getMaximumPoolSize(), concurrencyConfig.getMaxPoolSize());
|
||||
}
|
||||
|
||||
// Cleanup
|
||||
executorService.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStsClientCreationHandlesException() {
|
||||
// The factory should handle StsClient creation exceptions gracefully
|
||||
// This is tested implicitly by the successful creation of graphQLEngine
|
||||
assertNotNull(graphQLEngine);
|
||||
}
|
||||
|
||||
private void setField(Object target, String fieldName, Object value) {
|
||||
try {
|
||||
java.lang.reflect.Field field = target.getClass().getDeclaredField(fieldName);
|
||||
field.setAccessible(true);
|
||||
field.set(target, value);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Failed to set field: " + fieldName, e);
|
||||
}
|
||||
}
|
||||
|
||||
@org.springframework.context.annotation.Configuration
|
||||
static class TestConfig {
|
||||
|
||||
@Bean
|
||||
public SpringStandardPluginConfiguration springStandardPluginConfiguration() {
|
||||
return new SpringStandardPluginConfiguration();
|
||||
}
|
||||
|
||||
@Bean("systemOperationContext")
|
||||
public OperationContext systemOperationContext(MetricUtils metricUtils) {
|
||||
OperationContext defaultContext = TestOperationContexts.systemContextNoSearchAuthorization();
|
||||
return defaultContext.toBuilder()
|
||||
.systemTelemetryContext(
|
||||
SystemTelemetryContext.builder()
|
||||
.metricUtils(metricUtils)
|
||||
.tracer(mock(Tracer.class))
|
||||
.build())
|
||||
.build(defaultContext.getSystemActorContext().getAuthentication(), false);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ObjectMapper objectMapper(
|
||||
@Qualifier("systemOperationContext") OperationContext operationContext) {
|
||||
return operationContext.getObjectMapper();
|
||||
}
|
||||
}
|
||||
}
|
@ -6,7 +6,9 @@ import io.micrometer.core.instrument.DistributionSummary;
|
||||
import io.micrometer.core.instrument.Gauge;
|
||||
import io.micrometer.core.instrument.MeterRegistry;
|
||||
import io.micrometer.core.instrument.Timer;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Supplier;
|
||||
import lombok.Builder;
|
||||
@ -36,6 +38,7 @@ public class MetricUtils {
|
||||
@Deprecated public static final String DELIMITER = "_";
|
||||
|
||||
private final MeterRegistry registry;
|
||||
private final Map<String, Timer> legacyTimeCache = new ConcurrentHashMap<>();
|
||||
|
||||
public Optional<MeterRegistry> getRegistry() {
|
||||
return Optional.ofNullable(registry);
|
||||
@ -45,12 +48,17 @@ public class MetricUtils {
|
||||
public void time(String dropWizardMetricName, long durationNanos) {
|
||||
getRegistry()
|
||||
.ifPresent(
|
||||
meterRegistry ->
|
||||
Timer.builder(dropWizardMetricName)
|
||||
.tags(DROPWIZARD_METRIC, "true")
|
||||
.publishPercentiles(0.5, 0.75, 0.95, 0.98, 0.99, 0.999) // Dropwizard defaults
|
||||
.register(meterRegistry)
|
||||
.record(durationNanos, TimeUnit.NANOSECONDS));
|
||||
meterRegistry -> {
|
||||
Timer timer =
|
||||
legacyTimeCache.computeIfAbsent(
|
||||
dropWizardMetricName,
|
||||
name ->
|
||||
Timer.builder(name)
|
||||
.tags(DROPWIZARD_METRIC, "true")
|
||||
.publishPercentiles(0.5, 0.75, 0.95, 0.98, 0.99, 0.999)
|
||||
.register(meterRegistry));
|
||||
timer.record(durationNanos, TimeUnit.NANOSECONDS);
|
||||
});
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
|
@ -0,0 +1,252 @@
|
||||
package com.linkedin.metadata.utils.metrics;
|
||||
|
||||
import static org.testng.Assert.*;
|
||||
|
||||
import io.micrometer.core.instrument.Timer;
|
||||
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.testng.annotations.AfterMethod;
|
||||
import org.testng.annotations.BeforeMethod;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
public class MetricUtilsTimerCacheTest {
|
||||
|
||||
private SimpleMeterRegistry meterRegistry;
|
||||
private MetricUtils metricUtils;
|
||||
|
||||
@BeforeMethod
|
||||
public void setUp() {
|
||||
meterRegistry = new SimpleMeterRegistry();
|
||||
metricUtils = MetricUtils.builder().registry(meterRegistry).build();
|
||||
}
|
||||
|
||||
@AfterMethod
|
||||
public void tearDown() {
|
||||
meterRegistry.clear();
|
||||
meterRegistry.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimerCacheReusesExistingTimer() {
|
||||
String metricName = "test.cached.timer";
|
||||
|
||||
// First call - should create new timer
|
||||
metricUtils.time(metricName, TimeUnit.MILLISECONDS.toNanos(100));
|
||||
|
||||
// Get the timer instance
|
||||
Timer firstTimer = meterRegistry.timer(metricName, MetricUtils.DROPWIZARD_METRIC, "true");
|
||||
assertNotNull(firstTimer);
|
||||
assertEquals(firstTimer.count(), 1);
|
||||
|
||||
// Second call - should reuse the same timer
|
||||
metricUtils.time(metricName, TimeUnit.MILLISECONDS.toNanos(200));
|
||||
|
||||
// Should be the same timer instance
|
||||
Timer secondTimer = meterRegistry.timer(metricName, MetricUtils.DROPWIZARD_METRIC, "true");
|
||||
assertSame(firstTimer, secondTimer);
|
||||
assertEquals(secondTimer.count(), 2);
|
||||
|
||||
// Verify total time is sum of both recordings
|
||||
assertEquals(secondTimer.totalTime(TimeUnit.MILLISECONDS), 300.0, 0.1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimerCacheWithMultipleMetrics() {
|
||||
String metricName1 = "test.timer1";
|
||||
String metricName2 = "test.timer2";
|
||||
|
||||
// Record to first timer multiple times
|
||||
metricUtils.time(metricName1, TimeUnit.MILLISECONDS.toNanos(100));
|
||||
metricUtils.time(metricName1, TimeUnit.MILLISECONDS.toNanos(150));
|
||||
|
||||
// Record to second timer
|
||||
metricUtils.time(metricName2, TimeUnit.MILLISECONDS.toNanos(200));
|
||||
|
||||
// Verify first timer
|
||||
Timer timer1 = meterRegistry.timer(metricName1, MetricUtils.DROPWIZARD_METRIC, "true");
|
||||
assertEquals(timer1.count(), 2);
|
||||
assertEquals(timer1.totalTime(TimeUnit.MILLISECONDS), 250.0, 0.1);
|
||||
|
||||
// Verify second timer
|
||||
Timer timer2 = meterRegistry.timer(metricName2, MetricUtils.DROPWIZARD_METRIC, "true");
|
||||
assertEquals(timer2.count(), 1);
|
||||
assertEquals(timer2.totalTime(TimeUnit.MILLISECONDS), 200.0, 0.1);
|
||||
|
||||
// Verify they are different instances
|
||||
assertNotSame(timer1, timer2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimerCacheDoesNotDuplicatePercentileMetrics() {
|
||||
String metricName = "test.percentile.timer";
|
||||
|
||||
// Record multiple times - should not create duplicate percentile gauges
|
||||
for (int i = 0; i < 5; i++) {
|
||||
metricUtils.time(metricName, TimeUnit.MILLISECONDS.toNanos(100 * (i + 1)));
|
||||
}
|
||||
|
||||
// Count all meters that start with our metric name (timer + percentile gauges)
|
||||
long meterCount =
|
||||
meterRegistry.getMeters().stream()
|
||||
.filter(meter -> meter.getId().getName().startsWith(metricName))
|
||||
.count();
|
||||
|
||||
// Should have 1 timer + 6 percentile gauges (0.5, 0.75, 0.95, 0.98, 0.99, 0.999)
|
||||
// Without caching, we would have 5 timers + 30 percentile gauges
|
||||
assertEquals(meterCount, 7);
|
||||
|
||||
// Verify the timer recorded all 5 events
|
||||
Timer timer = meterRegistry.timer(metricName, MetricUtils.DROPWIZARD_METRIC, "true");
|
||||
assertEquals(timer.count(), 5);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimerCacheWorksWithConcurrentAccess() throws InterruptedException {
|
||||
String metricName = "test.concurrent.timer";
|
||||
int threadCount = 10;
|
||||
int recordsPerThread = 100;
|
||||
|
||||
// Create threads that will all try to record to the same timer
|
||||
Thread[] threads = new Thread[threadCount];
|
||||
for (int i = 0; i < threadCount; i++) {
|
||||
final int threadIndex = i;
|
||||
threads[i] =
|
||||
new Thread(
|
||||
() -> {
|
||||
for (int j = 0; j < recordsPerThread; j++) {
|
||||
metricUtils.time(metricName, TimeUnit.MILLISECONDS.toNanos(threadIndex * 10 + j));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Start all threads
|
||||
for (Thread thread : threads) {
|
||||
thread.start();
|
||||
}
|
||||
|
||||
// Wait for all threads to complete
|
||||
for (Thread thread : threads) {
|
||||
thread.join();
|
||||
}
|
||||
|
||||
// Verify only one timer was created and all recordings were captured
|
||||
Timer timer = meterRegistry.timer(metricName, MetricUtils.DROPWIZARD_METRIC, "true");
|
||||
assertEquals(timer.count(), threadCount * recordsPerThread);
|
||||
|
||||
// Verify no duplicate meters were created
|
||||
long timerCount =
|
||||
meterRegistry.getMeters().stream()
|
||||
.filter(meter -> meter.getId().getName().equals(metricName))
|
||||
.filter(meter -> meter instanceof Timer)
|
||||
.count();
|
||||
assertEquals(timerCount, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimerCacheHandlesSpecialCharactersInName() {
|
||||
// Test various metric names that might cause issues
|
||||
String[] metricNames = {
|
||||
"test.timer-with-dash",
|
||||
"test.timer_with_underscore",
|
||||
"test.timer.with.many.dots",
|
||||
"test.timer$with$dollar",
|
||||
"test.timer@with@at"
|
||||
};
|
||||
|
||||
for (String metricName : metricNames) {
|
||||
metricUtils.time(metricName, TimeUnit.MILLISECONDS.toNanos(100));
|
||||
metricUtils.time(metricName, TimeUnit.MILLISECONDS.toNanos(200));
|
||||
|
||||
Timer timer = meterRegistry.timer(metricName, MetricUtils.DROPWIZARD_METRIC, "true");
|
||||
assertNotNull(timer, "Timer should exist for metric: " + metricName);
|
||||
assertEquals(timer.count(), 2, "Timer should have 2 recordings for metric: " + metricName);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimerCacheWithVeryLongMetricName() {
|
||||
// Test with a very long metric name
|
||||
StringBuilder longNameBuilder = new StringBuilder("test.timer");
|
||||
for (int i = 0; i < 50; i++) {
|
||||
longNameBuilder.append(".component").append(i);
|
||||
}
|
||||
String longMetricName = longNameBuilder.toString();
|
||||
|
||||
metricUtils.time(longMetricName, TimeUnit.MILLISECONDS.toNanos(100));
|
||||
metricUtils.time(longMetricName, TimeUnit.MILLISECONDS.toNanos(200));
|
||||
|
||||
Timer timer = meterRegistry.timer(longMetricName, MetricUtils.DROPWIZARD_METRIC, "true");
|
||||
assertNotNull(timer);
|
||||
assertEquals(timer.count(), 2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimerPercentileValuesAreRecorded() {
|
||||
String metricName = "test.percentile.values.timer";
|
||||
|
||||
// Record various values to test percentile calculation
|
||||
long[] durations = {10, 20, 30, 40, 50, 60, 70, 80, 90, 100}; // milliseconds
|
||||
for (long duration : durations) {
|
||||
metricUtils.time(metricName, TimeUnit.MILLISECONDS.toNanos(duration));
|
||||
}
|
||||
|
||||
// Verify percentile gauges exist
|
||||
String[] expectedPercentiles = {"0.5", "0.75", "0.95", "0.98", "0.99", "0.999"};
|
||||
for (String percentile : expectedPercentiles) {
|
||||
boolean percentileExists =
|
||||
meterRegistry.getMeters().stream()
|
||||
.anyMatch(
|
||||
meter ->
|
||||
meter.getId().getName().equals(metricName + ".percentile")
|
||||
&& meter.getId().getTag("phi") != null
|
||||
&& meter.getId().getTag("phi").equals(percentile));
|
||||
assertTrue(percentileExists, "Percentile gauge should exist for: " + percentile);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimerCacheDoesNotLeakMemory() {
|
||||
// Test that the cache doesn't grow unbounded
|
||||
int uniqueMetrics = 1000;
|
||||
|
||||
for (int i = 0; i < uniqueMetrics; i++) {
|
||||
String metricName = "test.timer.memory." + i;
|
||||
metricUtils.time(metricName, TimeUnit.MILLISECONDS.toNanos(100));
|
||||
}
|
||||
|
||||
// Verify we have the expected number of timers
|
||||
long timerCount =
|
||||
meterRegistry.getMeters().stream()
|
||||
.filter(meter -> meter instanceof Timer)
|
||||
.filter(meter -> meter.getId().getName().startsWith("test.timer.memory."))
|
||||
.count();
|
||||
|
||||
assertEquals(timerCount, uniqueMetrics);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimerRecordsZeroDuration() {
|
||||
String metricName = "test.zero.duration.timer";
|
||||
|
||||
metricUtils.time(metricName, 0);
|
||||
|
||||
Timer timer = meterRegistry.timer(metricName, MetricUtils.DROPWIZARD_METRIC, "true");
|
||||
assertNotNull(timer);
|
||||
assertEquals(timer.count(), 1);
|
||||
assertEquals(timer.totalTime(TimeUnit.NANOSECONDS), 0.0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimerRecordsVeryLargeDuration() {
|
||||
String metricName = "test.large.duration.timer";
|
||||
|
||||
// Record a very large duration (1 hour in nanoseconds)
|
||||
long largeNanos = TimeUnit.HOURS.toNanos(1);
|
||||
metricUtils.time(metricName, largeNanos);
|
||||
|
||||
Timer timer = meterRegistry.timer(metricName, MetricUtils.DROPWIZARD_METRIC, "true");
|
||||
assertNotNull(timer);
|
||||
assertEquals(timer.count(), 1);
|
||||
assertEquals(timer.totalTime(TimeUnit.HOURS), 1.0, 0.001);
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user