From 186a48c5d2bb1f18abc0ca65718e9bcb800db0da Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Wed, 9 Jul 2025 17:27:09 -0500 Subject: [PATCH] misc(metrics): fix legacy metrics cache & tests (#14011) --- ...hMetadataChangeProposalsProcessorTest.java | 99 ++++ .../ConfigEntityRegistryFactory.java | 5 +- .../graphql/GraphQLEngineFactoryTest.java | 501 ++++++++++++++++++ .../metadata/utils/metrics/MetricUtils.java | 20 +- .../metrics/MetricUtilsTimerCacheTest.java | 252 +++++++++ 5 files changed, 867 insertions(+), 10 deletions(-) create mode 100644 metadata-service/factories/src/test/java/com/linkedin/gms/factory/graphql/GraphQLEngineFactoryTest.java create mode 100644 metadata-utils/src/test/java/com/linkedin/metadata/utils/metrics/MetricUtilsTimerCacheTest.java diff --git a/metadata-jobs/mce-consumer/src/test/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessorTest.java b/metadata-jobs/mce-consumer/src/test/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessorTest.java index debfef07fe..2cd17f931d 100644 --- a/metadata-jobs/mce-consumer/src/test/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessorTest.java +++ b/metadata-jobs/mce-consumer/src/test/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessorTest.java @@ -4,10 +4,14 @@ import static com.linkedin.metadata.Constants.DATASET_PROPERTIES_ASPECT_NAME; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -49,6 +53,7 @@ import io.opentelemetry.api.trace.StatusCode; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.mockito.ArgumentCaptor; @@ -544,6 +549,100 @@ public class BatchMetadataChangeProposalsProcessorTest { assertTrue(withAspectSize >= 1500); // Base size + aspect size } + @Test + public void testConsumeWithTelemetryMetrics() throws Exception { + // Mock the metric utils + MetricUtils mockMetricUtils = mock(MetricUtils.class); + + // Create a mock operation context + OperationContext opContextWithMetrics = spy(opContext); + + // Mock the metric utils to be present + when(opContextWithMetrics.getMetricUtils()).thenReturn(Optional.of(mockMetricUtils)); + + // Mock the withQueueSpan to execute the runnable directly + // The method signature in OperationContext is: + // public void withQueueSpan(String name, List systemMetadata, String topicName, + // Runnable task, String... attributes) + doAnswer( + invocation -> { + Runnable runnable = invocation.getArgument(3); + runnable.run(); + return null; + }) + .when(opContextWithMetrics) + .withQueueSpan( + anyString(), // operation name + anyList(), // system metadata list + anyString(), // topic name + any(Runnable.class), // task + anyString(), + anyString(), // BATCH_SIZE_ATTR and its value + anyString(), + anyString() // MetricUtils.DROPWIZARD_NAME and metric name + ); + + BatchMetadataChangeProposalsProcessor processorWithTelemetry = + new BatchMetadataChangeProposalsProcessor( + opContextWithMetrics, + entityClient, + mockKafkaProducer, + mockKafkaThrottle, + mockRegistry, + mockProvider); + + // Set required fields via reflection + try { + java.lang.reflect.Field field = + BatchMetadataChangeProposalsProcessor.class.getDeclaredField("fmcpTopicName"); + field.setAccessible(true); + field.set(processorWithTelemetry, Topics.FAILED_METADATA_CHANGE_PROPOSAL); + + field = BatchMetadataChangeProposalsProcessor.class.getDeclaredField("mceConsumerGroupId"); + field.setAccessible(true); + field.set(processorWithTelemetry, "MetadataChangeProposal-Consumer"); + } catch (Exception e) { + throw new RuntimeException("Failed to set field via reflection", e); + } + + // Setup minimal configuration + when(mockProvider.getMetadataChangeProposal()) + .thenReturn( + new MetadataChangeProposalConfig() + .setConsumer( + new MetadataChangeProposalConfig.ConsumerBatchConfig() + .setBatch( + new MetadataChangeProposalConfig.BatchConfig() + .setSize(Integer.MAX_VALUE)))); + + // Create a simple MCP + MetadataChangeProposal mcp = new MetadataChangeProposal(); + mcp.setSystemMetadata(new SystemMetadata()); + mcp.setChangeType(ChangeType.UPSERT); + mcp.setEntityUrn(UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:test,testMetrics,PROD)")); + mcp.setAspect(GenericRecordUtils.serializeAspect(new Status().setRemoved(false))); + mcp.setEntityType("dataset"); + mcp.setAspectName("status"); + + // Mock conversion + eventUtilsMock.when(() -> EventUtils.avroToPegasusMCP(mockRecord1)).thenReturn(mcp); + + // Create a single consumer record with a timestamp + when(mockConsumerRecord1.timestamp()) + .thenReturn(System.currentTimeMillis() - 1000); // 1 second ago + List> records = List.of(mockConsumerRecord1); + + // Execute test - this should trigger the metricUtils.ifPresent line + processorWithTelemetry.consume(records); + + // Verify that the metricUtils.histogram was called for kafka lag + verify(mockMetricUtils, times(1)) + .histogram(eq(BatchMetadataChangeProposalsProcessor.class), eq("kafkaLag"), anyLong()); + + // Verify that the processing completed successfully + verify(mockEntityService, times(1)).ingestProposal(any(), any(), eq(false)); + } + // Helper method to create an MCP with a specific aspect value size private MetadataChangeProposal createMcpWithAspectSize(int size) { MetadataChangeProposal mcp = new MetadataChangeProposal(); diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityregistry/ConfigEntityRegistryFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityregistry/ConfigEntityRegistryFactory.java index 9f4dfb86c0..f1518f9c8f 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityregistry/ConfigEntityRegistryFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityregistry/ConfigEntityRegistryFactory.java @@ -1,7 +1,6 @@ package com.linkedin.gms.factory.entityregistry; import com.datahub.plugins.metadata.aspect.SpringPluginFactory; -import com.linkedin.gms.factory.plugins.SpringStandardPluginConfiguration; import com.linkedin.metadata.aspect.plugins.PluginFactory; import com.linkedin.metadata.aspect.plugins.config.PluginConfiguration; import com.linkedin.metadata.models.registry.ConfigEntityRegistry; @@ -30,9 +29,7 @@ public class ConfigEntityRegistryFactory { @Bean(name = "configEntityRegistry") @Nonnull - protected ConfigEntityRegistry getInstance( - SpringStandardPluginConfiguration springStandardPluginConfiguration) - throws IOException, EntityRegistryException { + protected ConfigEntityRegistry getInstance() throws IOException, EntityRegistryException { BiFunction, PluginFactory> pluginFactoryProvider = (config, loaders) -> new SpringPluginFactory(applicationContext, config, loaders); if (entityRegistryConfigPath != null) { diff --git a/metadata-service/factories/src/test/java/com/linkedin/gms/factory/graphql/GraphQLEngineFactoryTest.java b/metadata-service/factories/src/test/java/com/linkedin/gms/factory/graphql/GraphQLEngineFactoryTest.java new file mode 100644 index 0000000000..0feea4c976 --- /dev/null +++ b/metadata-service/factories/src/test/java/com/linkedin/gms/factory/graphql/GraphQLEngineFactoryTest.java @@ -0,0 +1,501 @@ +package com.linkedin.gms.factory.graphql; + +import static org.mockito.Mockito.*; +import static org.testng.Assert.*; + +import com.datahub.authentication.group.GroupService; +import com.datahub.authentication.invite.InviteTokenService; +import com.datahub.authentication.post.PostService; +import com.datahub.authentication.token.StatefulTokenService; +import com.datahub.authentication.user.NativeUserService; +import com.datahub.authorization.role.RoleService; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.linkedin.data.schema.annotation.PathSpecBasedSchemaAnnotationVisitor; +import com.linkedin.datahub.graphql.GraphQLEngine; +import com.linkedin.entity.client.EntityClient; +import com.linkedin.entity.client.SystemEntityClient; +import com.linkedin.gms.factory.config.ConfigurationProvider; +import com.linkedin.gms.factory.plugins.SpringStandardPluginConfiguration; +import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory; +import com.linkedin.metadata.connection.ConnectionService; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.entity.versioning.EntityVersioningService; +import com.linkedin.metadata.graph.GraphClient; +import com.linkedin.metadata.graph.GraphService; +import com.linkedin.metadata.graph.SiblingGraphService; +import com.linkedin.metadata.models.registry.EntityRegistry; +import com.linkedin.metadata.recommendation.RecommendationsService; +import com.linkedin.metadata.recommendation.candidatesource.RecentlySearchedSource; +import com.linkedin.metadata.recommendation.candidatesource.RecentlyViewedSource; +import com.linkedin.metadata.search.EntitySearchService; +import com.linkedin.metadata.search.elasticsearch.query.filter.QueryFilterRewriteChain; +import com.linkedin.metadata.service.*; +import com.linkedin.metadata.timeline.TimelineService; +import com.linkedin.metadata.timeseries.TimeseriesAspectService; +import com.linkedin.metadata.utils.elasticsearch.IndexConvention; +import com.linkedin.metadata.utils.metrics.MetricUtils; +import com.linkedin.metadata.version.GitVersion; +import io.datahubproject.metadata.context.OperationContext; +import io.datahubproject.metadata.context.SystemTelemetryContext; +import io.datahubproject.metadata.services.RestrictedService; +import io.datahubproject.metadata.services.SecretService; +import io.datahubproject.test.metadata.context.TestOperationContexts; +import io.opentelemetry.api.trace.Tracer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import org.opensearch.client.RestHighLevelClient; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.context.annotation.Bean; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +@SpringBootTest(classes = {ConfigurationProvider.class, GraphQLEngineFactory.class}) +@ContextConfiguration(classes = GraphQLEngineFactoryTest.TestConfig.class) +@TestPropertySource( + locations = "classpath:/application.yaml", + properties = { + "platformAnalytics.enabled=false", + "graphQL.concurrency.separateThreadPool=true", + "LINEAGE_DEFAULT_LAST_DAYS_FILTER=30" + }) +public class GraphQLEngineFactoryTest extends AbstractTestNGSpringContextTests { + + @BeforeTest + public void setup() { + PathSpecBasedSchemaAnnotationVisitor.class + .getClassLoader() + .setClassAssertionStatus(PathSpecBasedSchemaAnnotationVisitor.class.getName(), false); + } + + @Autowired private GraphQLEngineFactory graphQLEngineFactory; + + @Autowired + @Qualifier("graphQLEngine") + private GraphQLEngine graphQLEngine; + + @Autowired + @Qualifier("graphQLWorkerPool") + private ExecutorService graphQLWorkerPool; + + @Autowired + @Qualifier("configurationProvider") + private ConfigurationProvider configurationProvider; + + @MockBean + @Qualifier("elasticSearchRestHighLevelClient") + private RestHighLevelClient elasticClient; + + @MockBean + @Qualifier("indexConvention") + private IndexConvention indexConvention; + + @MockBean + @Qualifier("graphClient") + private GraphClient graphClient; + + @MockBean + @Qualifier("entityService") + private EntityService entityService; + + @MockBean + @Qualifier("entitySearchService") + private EntitySearchService entitySearchService; + + @MockBean + @Qualifier("graphService") + private GraphService graphService; + + @MockBean + @Qualifier("siblingGraphService") + private SiblingGraphService siblingGraphService; + + @MockBean + @Qualifier("timeseriesAspectService") + private TimeseriesAspectService timeseriesAspectService; + + @MockBean + @Qualifier("recommendationsService") + private RecommendationsService recommendationsService; + + @MockBean + @Qualifier("dataHubTokenService") + private StatefulTokenService statefulTokenService; + + @MockBean + @Qualifier("dataHubSecretService") + private SecretService secretService; + + @MockBean + @Qualifier("gitVersion") + private GitVersion gitVersion; + + @MockBean + @Qualifier("timelineService") + private TimelineService timelineService; + + @MockBean + @Qualifier("nativeUserService") + private NativeUserService nativeUserService; + + @MockBean + @Qualifier("groupService") + private GroupService groupService; + + @MockBean + @Qualifier("roleService") + private RoleService roleService; + + @MockBean + @Qualifier("inviteTokenService") + private InviteTokenService inviteTokenService; + + @MockBean + @Qualifier("postService") + private PostService postService; + + @MockBean + @Qualifier("viewService") + private ViewService viewService; + + @MockBean + @Qualifier("ownerShipTypeService") + private OwnershipTypeService ownershipTypeService; + + @MockBean + @Qualifier("settingsService") + private SettingsService settingsService; + + @MockBean + @Qualifier("lineageService") + private LineageService lineageService; + + @MockBean + @Qualifier("queryService") + private QueryService queryService; + + @MockBean + @Qualifier("erModelRelationshipService") + private ERModelRelationshipService erModelRelationshipService; + + @MockBean + @Qualifier("dataProductService") + private DataProductService dataProductService; + + @MockBean + @Qualifier("applicationService") + private ApplicationService applicationService; + + @MockBean + @Qualifier("formService") + private FormService formService; + + @MockBean + @Qualifier("restrictedService") + private RestrictedService restrictedService; + + @MockBean + @Qualifier("businessAttributeService") + private BusinessAttributeService businessAttributeService; + + @MockBean + @Qualifier("connectionService") + private ConnectionService connectionService; + + @MockBean + @Qualifier("assertionService") + private AssertionService assertionService; + + @MockBean + @Qualifier("entityClient") + private EntityClient entityClient; + + @MockBean + @Qualifier("systemEntityClient") + private SystemEntityClient systemEntityClient; + + @MockBean private EntityVersioningService entityVersioningService; + + @MockBean private MetricUtils metricUtils; + + @MockBean private EntityRegistry entityRegistry; + + @MockBean private QueryFilterRewriteChain queryFilterRewriteChain; + + @MockBean + @Qualifier("recentlyViewedCandidateSource") + private RecentlyViewedSource recentlyViewedSource; + + @MockBean + @Qualifier("recentlySearchedCandidateSource") + private RecentlySearchedSource recentlySearchedSource; + + @MockBean + @Qualifier("pageTemplateService") + private PageTemplateService pageTemplateService; + + @MockBean + @Qualifier("baseElasticSearchComponents") + private BaseElasticSearchComponentsFactory.BaseElasticSearchComponents + baseElasticSearchComponents; + + @MockBean + @Qualifier("pageModuleService") + private PageModuleService pageModuleService; + + @Value("${platformAnalytics.enabled}") + private Boolean isAnalyticsEnabled; + + @Value("${LINEAGE_DEFAULT_LAST_DAYS_FILTER:#{null}}") + private Integer defaultLineageLastDaysFilter; + + @BeforeMethod + public void setUp() { + // Set up default mock behaviors + when(graphService.supportsMultiHop()).thenReturn(true); + when(metricUtils.getRegistry()).thenReturn(java.util.Optional.empty()); + } + + @Test + public void testGraphQLEngineCreation() { + // Then + assertNotNull(graphQLEngine); + + // Verify analytics is disabled as per property + assertFalse(isAnalyticsEnabled); + } + + @Test + public void testGraphQLWorkerPoolCreation() { + // Then + assertNotNull(graphQLWorkerPool); + assertTrue(graphQLWorkerPool instanceof ThreadPoolExecutor); + + ThreadPoolExecutor threadPool = (ThreadPoolExecutor) graphQLWorkerPool; + + // The default configuration should use default values + assertTrue(threadPool.getCorePoolSize() > 0); + assertTrue(threadPool.getMaximumPoolSize() > 0); + } + + @Test + public void testConfigurationProviderDefaults() { + // Verify ConfigurationProvider returns non-null configurations with defaults + assertNotNull(configurationProvider); + assertNotNull(configurationProvider.getIngestion()); + assertNotNull(configurationProvider.getAuthentication()); + assertNotNull(configurationProvider.getAuthorization()); + assertNotNull(configurationProvider.getVisualConfig()); + assertNotNull(configurationProvider.getTelemetry()); + assertNotNull(configurationProvider.getMetadataTests()); + assertNotNull(configurationProvider.getDatahub()); + assertNotNull(configurationProvider.getViews()); + assertNotNull(configurationProvider.getSearchBar()); + assertNotNull(configurationProvider.getHomePage()); + assertNotNull(configurationProvider.getFeatureFlags()); + assertNotNull(configurationProvider.getGraphQL()); + assertNotNull(configurationProvider.getChromeExtension()); + assertNotNull(configurationProvider.getCache()); + + // Verify nested configurations + assertNotNull(configurationProvider.getCache().getClient()); + assertNotNull(configurationProvider.getCache().getClient().getUsageClient()); + assertNotNull(configurationProvider.getGraphQL().getConcurrency()); + } + + @Test + public void testLineageDefaultDaysFilter() { + // Then + assertEquals(defaultLineageLastDaysFilter, Integer.valueOf(30)); + } + + @Test + public void testGraphQLEngineWithAnalyticsEnabled() { + // Create a new factory instance with analytics enabled + GraphQLEngineFactory factoryWithAnalytics = new GraphQLEngineFactory(); + + // Set up dependencies using reflection + setField(factoryWithAnalytics, "elasticClient", elasticClient); + setField(factoryWithAnalytics, "indexConvention", indexConvention); + setField(factoryWithAnalytics, "graphClient", graphClient); + setField(factoryWithAnalytics, "entityService", entityService); + setField(factoryWithAnalytics, "graphService", graphService); + setField(factoryWithAnalytics, "siblingGraphService", siblingGraphService); + setField(factoryWithAnalytics, "timeseriesAspectService", timeseriesAspectService); + setField(factoryWithAnalytics, "recommendationsService", recommendationsService); + setField(factoryWithAnalytics, "statefulTokenService", statefulTokenService); + setField(factoryWithAnalytics, "secretService", secretService); + setField(factoryWithAnalytics, "entityRegistry", entityRegistry); + setField(factoryWithAnalytics, "configProvider", configurationProvider); + setField(factoryWithAnalytics, "gitVersion", gitVersion); + setField(factoryWithAnalytics, "timelineService", timelineService); + setField(factoryWithAnalytics, "nativeUserService", nativeUserService); + setField(factoryWithAnalytics, "groupService", groupService); + setField(factoryWithAnalytics, "roleService", roleService); + setField(factoryWithAnalytics, "inviteTokenService", inviteTokenService); + setField(factoryWithAnalytics, "postService", postService); + setField(factoryWithAnalytics, "viewService", viewService); + setField(factoryWithAnalytics, "ownershipTypeService", ownershipTypeService); + setField(factoryWithAnalytics, "settingsService", settingsService); + setField(factoryWithAnalytics, "lineageService", lineageService); + setField(factoryWithAnalytics, "queryService", queryService); + setField(factoryWithAnalytics, "erModelRelationshipService", erModelRelationshipService); + setField(factoryWithAnalytics, "dataProductService", dataProductService); + setField(factoryWithAnalytics, "applicationService", applicationService); + setField(factoryWithAnalytics, "formService", formService); + setField(factoryWithAnalytics, "restrictedService", restrictedService); + setField(factoryWithAnalytics, "businessAttributeService", businessAttributeService); + setField(factoryWithAnalytics, "_connectionService", connectionService); + setField(factoryWithAnalytics, "assertionService", assertionService); + setField(factoryWithAnalytics, "isAnalyticsEnabled", true); + + // When + GraphQLEngine engineWithAnalytics = + factoryWithAnalytics.graphQLEngine( + entityClient, systemEntityClient, entityVersioningService, metricUtils); + + // Then + assertNotNull(engineWithAnalytics); + } + + @Test + public void testGraphQLWorkerPoolMetricsRegistration() { + // Then + assertNotNull(graphQLWorkerPool); + } + + @Test + public void testAllServicesAreWired() { + // Verify all required services are injected + assertNotNull(elasticClient); + assertNotNull(indexConvention); + assertNotNull(graphClient); + assertNotNull(entityService); + assertNotNull(entitySearchService); + assertNotNull(graphService); + assertNotNull(siblingGraphService); + assertNotNull(timeseriesAspectService); + assertNotNull(recommendationsService); + assertNotNull(statefulTokenService); + assertNotNull(secretService); + assertNotNull(entityRegistry); + assertNotNull(gitVersion); + assertNotNull(timelineService); + assertNotNull(nativeUserService); + assertNotNull(groupService); + assertNotNull(roleService); + assertNotNull(inviteTokenService); + assertNotNull(postService); + assertNotNull(viewService); + assertNotNull(ownershipTypeService); + assertNotNull(settingsService); + assertNotNull(lineageService); + assertNotNull(queryService); + assertNotNull(erModelRelationshipService); + assertNotNull(dataProductService); + assertNotNull(applicationService); + assertNotNull(formService); + assertNotNull(restrictedService); + assertNotNull(businessAttributeService); + assertNotNull(connectionService); + assertNotNull(assertionService); + assertNotNull(entityClient); + assertNotNull(systemEntityClient); + assertNotNull(entityVersioningService); + assertNotNull(metricUtils); + } + + @Test + public void testGraphQLConcurrencyConfiguration() { + // Test the actual concurrency configuration from the default ConfigurationProvider + var concurrencyConfig = configurationProvider.getGraphQL().getConcurrency(); + assertNotNull(concurrencyConfig); + + // These should have default values + assertNotNull(concurrencyConfig.getCorePoolSize()); + assertNotNull(concurrencyConfig.getMaxPoolSize()); + assertNotNull(concurrencyConfig.getKeepAlive()); + assertNotNull(concurrencyConfig.getStackSize()); + } + + @Test + public void testGraphQLWorkerPoolWithDifferentConfiguration() { + // Test worker pool creation with different configurations + var concurrencyConfig = configurationProvider.getGraphQL().getConcurrency(); + + // Create a new factory to test different scenarios + ExecutorService executorService = graphQLEngineFactory.graphQLWorkerPool(metricUtils); + assertNotNull(executorService); + + ThreadPoolExecutor threadPool = (ThreadPoolExecutor) executorService; + + // If core pool size is negative, it should use default calculation + if (concurrencyConfig.getCorePoolSize() < 0) { + assertEquals(threadPool.getCorePoolSize(), Runtime.getRuntime().availableProcessors() * 5); + } else { + assertEquals(threadPool.getCorePoolSize(), concurrencyConfig.getCorePoolSize()); + } + + // If max pool size is zero or negative, it should use default calculation + if (concurrencyConfig.getMaxPoolSize() <= 0) { + assertEquals( + threadPool.getMaximumPoolSize(), Runtime.getRuntime().availableProcessors() * 100); + } else { + assertEquals(threadPool.getMaximumPoolSize(), concurrencyConfig.getMaxPoolSize()); + } + + // Cleanup + executorService.shutdown(); + } + + @Test + public void testStsClientCreationHandlesException() { + // The factory should handle StsClient creation exceptions gracefully + // This is tested implicitly by the successful creation of graphQLEngine + assertNotNull(graphQLEngine); + } + + private void setField(Object target, String fieldName, Object value) { + try { + java.lang.reflect.Field field = target.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, value); + } catch (Exception e) { + throw new RuntimeException("Failed to set field: " + fieldName, e); + } + } + + @org.springframework.context.annotation.Configuration + static class TestConfig { + + @Bean + public SpringStandardPluginConfiguration springStandardPluginConfiguration() { + return new SpringStandardPluginConfiguration(); + } + + @Bean("systemOperationContext") + public OperationContext systemOperationContext(MetricUtils metricUtils) { + OperationContext defaultContext = TestOperationContexts.systemContextNoSearchAuthorization(); + return defaultContext.toBuilder() + .systemTelemetryContext( + SystemTelemetryContext.builder() + .metricUtils(metricUtils) + .tracer(mock(Tracer.class)) + .build()) + .build(defaultContext.getSystemActorContext().getAuthentication(), false); + } + + @Bean + public ObjectMapper objectMapper( + @Qualifier("systemOperationContext") OperationContext operationContext) { + return operationContext.getObjectMapper(); + } + } +} diff --git a/metadata-utils/src/main/java/com/linkedin/metadata/utils/metrics/MetricUtils.java b/metadata-utils/src/main/java/com/linkedin/metadata/utils/metrics/MetricUtils.java index 90a28fb05e..c11c60a51e 100644 --- a/metadata-utils/src/main/java/com/linkedin/metadata/utils/metrics/MetricUtils.java +++ b/metadata-utils/src/main/java/com/linkedin/metadata/utils/metrics/MetricUtils.java @@ -6,7 +6,9 @@ import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Timer; +import java.util.Map; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import lombok.Builder; @@ -36,6 +38,7 @@ public class MetricUtils { @Deprecated public static final String DELIMITER = "_"; private final MeterRegistry registry; + private final Map legacyTimeCache = new ConcurrentHashMap<>(); public Optional getRegistry() { return Optional.ofNullable(registry); @@ -45,12 +48,17 @@ public class MetricUtils { public void time(String dropWizardMetricName, long durationNanos) { getRegistry() .ifPresent( - meterRegistry -> - Timer.builder(dropWizardMetricName) - .tags(DROPWIZARD_METRIC, "true") - .publishPercentiles(0.5, 0.75, 0.95, 0.98, 0.99, 0.999) // Dropwizard defaults - .register(meterRegistry) - .record(durationNanos, TimeUnit.NANOSECONDS)); + meterRegistry -> { + Timer timer = + legacyTimeCache.computeIfAbsent( + dropWizardMetricName, + name -> + Timer.builder(name) + .tags(DROPWIZARD_METRIC, "true") + .publishPercentiles(0.5, 0.75, 0.95, 0.98, 0.99, 0.999) + .register(meterRegistry)); + timer.record(durationNanos, TimeUnit.NANOSECONDS); + }); } @Deprecated diff --git a/metadata-utils/src/test/java/com/linkedin/metadata/utils/metrics/MetricUtilsTimerCacheTest.java b/metadata-utils/src/test/java/com/linkedin/metadata/utils/metrics/MetricUtilsTimerCacheTest.java new file mode 100644 index 0000000000..bb19623b80 --- /dev/null +++ b/metadata-utils/src/test/java/com/linkedin/metadata/utils/metrics/MetricUtilsTimerCacheTest.java @@ -0,0 +1,252 @@ +package com.linkedin.metadata.utils.metrics; + +import static org.testng.Assert.*; + +import io.micrometer.core.instrument.Timer; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import java.util.concurrent.TimeUnit; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class MetricUtilsTimerCacheTest { + + private SimpleMeterRegistry meterRegistry; + private MetricUtils metricUtils; + + @BeforeMethod + public void setUp() { + meterRegistry = new SimpleMeterRegistry(); + metricUtils = MetricUtils.builder().registry(meterRegistry).build(); + } + + @AfterMethod + public void tearDown() { + meterRegistry.clear(); + meterRegistry.close(); + } + + @Test + public void testTimerCacheReusesExistingTimer() { + String metricName = "test.cached.timer"; + + // First call - should create new timer + metricUtils.time(metricName, TimeUnit.MILLISECONDS.toNanos(100)); + + // Get the timer instance + Timer firstTimer = meterRegistry.timer(metricName, MetricUtils.DROPWIZARD_METRIC, "true"); + assertNotNull(firstTimer); + assertEquals(firstTimer.count(), 1); + + // Second call - should reuse the same timer + metricUtils.time(metricName, TimeUnit.MILLISECONDS.toNanos(200)); + + // Should be the same timer instance + Timer secondTimer = meterRegistry.timer(metricName, MetricUtils.DROPWIZARD_METRIC, "true"); + assertSame(firstTimer, secondTimer); + assertEquals(secondTimer.count(), 2); + + // Verify total time is sum of both recordings + assertEquals(secondTimer.totalTime(TimeUnit.MILLISECONDS), 300.0, 0.1); + } + + @Test + public void testTimerCacheWithMultipleMetrics() { + String metricName1 = "test.timer1"; + String metricName2 = "test.timer2"; + + // Record to first timer multiple times + metricUtils.time(metricName1, TimeUnit.MILLISECONDS.toNanos(100)); + metricUtils.time(metricName1, TimeUnit.MILLISECONDS.toNanos(150)); + + // Record to second timer + metricUtils.time(metricName2, TimeUnit.MILLISECONDS.toNanos(200)); + + // Verify first timer + Timer timer1 = meterRegistry.timer(metricName1, MetricUtils.DROPWIZARD_METRIC, "true"); + assertEquals(timer1.count(), 2); + assertEquals(timer1.totalTime(TimeUnit.MILLISECONDS), 250.0, 0.1); + + // Verify second timer + Timer timer2 = meterRegistry.timer(metricName2, MetricUtils.DROPWIZARD_METRIC, "true"); + assertEquals(timer2.count(), 1); + assertEquals(timer2.totalTime(TimeUnit.MILLISECONDS), 200.0, 0.1); + + // Verify they are different instances + assertNotSame(timer1, timer2); + } + + @Test + public void testTimerCacheDoesNotDuplicatePercentileMetrics() { + String metricName = "test.percentile.timer"; + + // Record multiple times - should not create duplicate percentile gauges + for (int i = 0; i < 5; i++) { + metricUtils.time(metricName, TimeUnit.MILLISECONDS.toNanos(100 * (i + 1))); + } + + // Count all meters that start with our metric name (timer + percentile gauges) + long meterCount = + meterRegistry.getMeters().stream() + .filter(meter -> meter.getId().getName().startsWith(metricName)) + .count(); + + // Should have 1 timer + 6 percentile gauges (0.5, 0.75, 0.95, 0.98, 0.99, 0.999) + // Without caching, we would have 5 timers + 30 percentile gauges + assertEquals(meterCount, 7); + + // Verify the timer recorded all 5 events + Timer timer = meterRegistry.timer(metricName, MetricUtils.DROPWIZARD_METRIC, "true"); + assertEquals(timer.count(), 5); + } + + @Test + public void testTimerCacheWorksWithConcurrentAccess() throws InterruptedException { + String metricName = "test.concurrent.timer"; + int threadCount = 10; + int recordsPerThread = 100; + + // Create threads that will all try to record to the same timer + Thread[] threads = new Thread[threadCount]; + for (int i = 0; i < threadCount; i++) { + final int threadIndex = i; + threads[i] = + new Thread( + () -> { + for (int j = 0; j < recordsPerThread; j++) { + metricUtils.time(metricName, TimeUnit.MILLISECONDS.toNanos(threadIndex * 10 + j)); + } + }); + } + + // Start all threads + for (Thread thread : threads) { + thread.start(); + } + + // Wait for all threads to complete + for (Thread thread : threads) { + thread.join(); + } + + // Verify only one timer was created and all recordings were captured + Timer timer = meterRegistry.timer(metricName, MetricUtils.DROPWIZARD_METRIC, "true"); + assertEquals(timer.count(), threadCount * recordsPerThread); + + // Verify no duplicate meters were created + long timerCount = + meterRegistry.getMeters().stream() + .filter(meter -> meter.getId().getName().equals(metricName)) + .filter(meter -> meter instanceof Timer) + .count(); + assertEquals(timerCount, 1); + } + + @Test + public void testTimerCacheHandlesSpecialCharactersInName() { + // Test various metric names that might cause issues + String[] metricNames = { + "test.timer-with-dash", + "test.timer_with_underscore", + "test.timer.with.many.dots", + "test.timer$with$dollar", + "test.timer@with@at" + }; + + for (String metricName : metricNames) { + metricUtils.time(metricName, TimeUnit.MILLISECONDS.toNanos(100)); + metricUtils.time(metricName, TimeUnit.MILLISECONDS.toNanos(200)); + + Timer timer = meterRegistry.timer(metricName, MetricUtils.DROPWIZARD_METRIC, "true"); + assertNotNull(timer, "Timer should exist for metric: " + metricName); + assertEquals(timer.count(), 2, "Timer should have 2 recordings for metric: " + metricName); + } + } + + @Test + public void testTimerCacheWithVeryLongMetricName() { + // Test with a very long metric name + StringBuilder longNameBuilder = new StringBuilder("test.timer"); + for (int i = 0; i < 50; i++) { + longNameBuilder.append(".component").append(i); + } + String longMetricName = longNameBuilder.toString(); + + metricUtils.time(longMetricName, TimeUnit.MILLISECONDS.toNanos(100)); + metricUtils.time(longMetricName, TimeUnit.MILLISECONDS.toNanos(200)); + + Timer timer = meterRegistry.timer(longMetricName, MetricUtils.DROPWIZARD_METRIC, "true"); + assertNotNull(timer); + assertEquals(timer.count(), 2); + } + + @Test + public void testTimerPercentileValuesAreRecorded() { + String metricName = "test.percentile.values.timer"; + + // Record various values to test percentile calculation + long[] durations = {10, 20, 30, 40, 50, 60, 70, 80, 90, 100}; // milliseconds + for (long duration : durations) { + metricUtils.time(metricName, TimeUnit.MILLISECONDS.toNanos(duration)); + } + + // Verify percentile gauges exist + String[] expectedPercentiles = {"0.5", "0.75", "0.95", "0.98", "0.99", "0.999"}; + for (String percentile : expectedPercentiles) { + boolean percentileExists = + meterRegistry.getMeters().stream() + .anyMatch( + meter -> + meter.getId().getName().equals(metricName + ".percentile") + && meter.getId().getTag("phi") != null + && meter.getId().getTag("phi").equals(percentile)); + assertTrue(percentileExists, "Percentile gauge should exist for: " + percentile); + } + } + + @Test + public void testTimerCacheDoesNotLeakMemory() { + // Test that the cache doesn't grow unbounded + int uniqueMetrics = 1000; + + for (int i = 0; i < uniqueMetrics; i++) { + String metricName = "test.timer.memory." + i; + metricUtils.time(metricName, TimeUnit.MILLISECONDS.toNanos(100)); + } + + // Verify we have the expected number of timers + long timerCount = + meterRegistry.getMeters().stream() + .filter(meter -> meter instanceof Timer) + .filter(meter -> meter.getId().getName().startsWith("test.timer.memory.")) + .count(); + + assertEquals(timerCount, uniqueMetrics); + } + + @Test + public void testTimerRecordsZeroDuration() { + String metricName = "test.zero.duration.timer"; + + metricUtils.time(metricName, 0); + + Timer timer = meterRegistry.timer(metricName, MetricUtils.DROPWIZARD_METRIC, "true"); + assertNotNull(timer); + assertEquals(timer.count(), 1); + assertEquals(timer.totalTime(TimeUnit.NANOSECONDS), 0.0); + } + + @Test + public void testTimerRecordsVeryLargeDuration() { + String metricName = "test.large.duration.timer"; + + // Record a very large duration (1 hour in nanoseconds) + long largeNanos = TimeUnit.HOURS.toNanos(1); + metricUtils.time(metricName, largeNanos); + + Timer timer = meterRegistry.timer(metricName, MetricUtils.DROPWIZARD_METRIC, "true"); + assertNotNull(timer); + assertEquals(timer.count(), 1); + assertEquals(timer.totalTime(TimeUnit.HOURS), 1.0, 0.001); + } +}