diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GraphQLEngine.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GraphQLEngine.java index 025403c298..1b0d49b78c 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GraphQLEngine.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GraphQLEngine.java @@ -87,13 +87,9 @@ public class GraphQLEngine { graphQLQueryComplexityLimit, new DataHubFieldComplexityCalculator())); if (metricUtils != null && graphQLConfiguration.getMetrics().isEnabled()) { - metricUtils - .getRegistry() - .ifPresent( - meterRegistry -> - instrumentations.add( - new GraphQLTimingInstrumentation( - meterRegistry, graphQLConfiguration.getMetrics()))); + instrumentations.add( + new GraphQLTimingInstrumentation( + metricUtils.getRegistry(), graphQLConfiguration.getMetrics())); } ChainedInstrumentation chainedInstrumentation = new ChainedInstrumentation(instrumentations); diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/GraphQLEngineTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/GraphQLEngineTest.java index 53a54aaf33..f76d2ed10a 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/GraphQLEngineTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/GraphQLEngineTest.java @@ -284,7 +284,7 @@ public class GraphQLEngineTest { public void testMetricsEnabled() { // Setup metrics MeterRegistry meterRegistry = new SimpleMeterRegistry(); - when(mockMetricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry)); + when(mockMetricUtils.getRegistry()).thenReturn(meterRegistry); // Enable metrics in configuration GraphQLConfiguration metricsEnabledConfig = createDefaultConfiguration(); @@ -405,7 +405,7 @@ public class GraphQLEngineTest { metrics.setTrivialDataFetchersEnabled(true); MeterRegistry meterRegistry = new SimpleMeterRegistry(); - when(mockMetricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry)); + when(mockMetricUtils.getRegistry()).thenReturn(meterRegistry); graphQLEngine = GraphQLEngine.builder() @@ -457,7 +457,7 @@ public class GraphQLEngineTest { @Test public void testMetricsWithEmptyRegistry() { // Setup metrics with empty registry - when(mockMetricUtils.getRegistry()).thenReturn(Optional.empty()); + when(mockMetricUtils.getRegistry()).thenReturn(new SimpleMeterRegistry()); GraphQLConfiguration metricsEnabledConfig = createDefaultConfiguration(); metricsEnabledConfig.getMetrics().setEnabled(true); diff --git a/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectService.java b/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectService.java index b361280b2a..1fa09e5e42 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectService.java @@ -124,7 +124,7 @@ public class ElasticSearchTimeseriesAspectService new ThreadPoolExecutor.CallerRunsPolicy()); if (metricUtils != null) { MicrometerMetricsRegistry.registerExecutorMetrics( - "timeseries", this.queryPool, metricUtils.getRegistry().orElse(null)); + "timeseries", this.queryPool, metricUtils.getRegistry()); } this.entityRegistry = entityRegistry; diff --git a/metadata-io/src/test/java/com/linkedin/metadata/system_telemetry/GraphQLTimingInstrumentationTest.java 
b/metadata-io/src/test/java/com/linkedin/metadata/system_telemetry/GraphQLTimingInstrumentationTest.java index d73c69f4cd..368c694b96 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/system_telemetry/GraphQLTimingInstrumentationTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/system_telemetry/GraphQLTimingInstrumentationTest.java @@ -30,7 +30,6 @@ import io.micrometer.core.instrument.Timer; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import java.util.Collections; import java.util.Map; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -97,7 +96,7 @@ public class GraphQLTimingInstrumentationTest { // Setup real GraphQL configuration for GraphQLEngine graphQLConfiguration = createDefaultConfiguration(); - when(mockMetricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry)); + when(mockMetricUtils.getRegistry()).thenReturn(meterRegistry); } @Test diff --git a/metadata-jobs/common/src/main/java/com/linkedin/metadata/kafka/listener/AbstractKafkaListener.java b/metadata-jobs/common/src/main/java/com/linkedin/metadata/kafka/listener/AbstractKafkaListener.java index b8bceaa48a..38ed64ff55 100644 --- a/metadata-jobs/common/src/main/java/com/linkedin/metadata/kafka/listener/AbstractKafkaListener.java +++ b/metadata-jobs/common/src/main/java/com/linkedin/metadata/kafka/listener/AbstractKafkaListener.java @@ -69,17 +69,13 @@ public abstract class AbstractKafkaListener, R> // TODO: include priority level when available metricUtils .getRegistry() - .ifPresent( - meterRegistry -> { - meterRegistry - .timer( - MetricUtils.KAFKA_MESSAGE_QUEUE_TIME, - "topic", - consumerRecord.topic(), - "consumer.group", - consumerGroupId) - .record(Duration.ofMillis(queueTimeMs)); - }); + .timer( + MetricUtils.KAFKA_MESSAGE_QUEUE_TIME, + "topic", + consumerRecord.topic(), + "consumer.group", + consumerGroupId) + .record(Duration.ofMillis(queueTimeMs)); }); final R record = consumerRecord.value(); log.debug( diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/DataHubUsageEventsProcessor.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/DataHubUsageEventsProcessor.java index e8d52decdb..81ee9e894c 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/DataHubUsageEventsProcessor.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/DataHubUsageEventsProcessor.java @@ -76,17 +76,13 @@ public class DataHubUsageEventsProcessor { // TODO: include priority level when available metricUtils .getRegistry() - .ifPresent( - meterRegistry -> { - meterRegistry - .timer( - MetricUtils.KAFKA_MESSAGE_QUEUE_TIME, - "topic", - consumerRecord.topic(), - "consumer.group", - datahubUsageEventConsumerGroupId) - .record(Duration.ofMillis(queueTimeMs)); - }); + .timer( + MetricUtils.KAFKA_MESSAGE_QUEUE_TIME, + "topic", + consumerRecord.topic(), + "consumer.group", + datahubUsageEventConsumerGroupId) + .record(Duration.ofMillis(queueTimeMs)); }); final String record = consumerRecord.value(); diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/listener/mcl/MCLKafkaListener.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/listener/mcl/MCLKafkaListener.java index 154c483d21..ec98f05e41 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/listener/mcl/MCLKafkaListener.java +++ 
b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/listener/mcl/MCLKafkaListener.java @@ -118,12 +118,8 @@ public class MCLKafkaListener // request metricUtils .getRegistry() - .ifPresent( - meterRegistry -> { - meterRegistry - .timer(MetricUtils.DATAHUB_REQUEST_HOOK_QUEUE_TIME, "hook", hookName) - .record(Duration.ofMillis(queueTimeMs)); - }); + .timer(MetricUtils.DATAHUB_REQUEST_HOOK_QUEUE_TIME, "hook", hookName) + .record(Duration.ofMillis(queueTimeMs)); } }); } diff --git a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/DataHubUsageEventsProcessorTest.java b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/DataHubUsageEventsProcessorTest.java index a0afd5a305..9f390a7d5e 100644 --- a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/DataHubUsageEventsProcessorTest.java +++ b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/DataHubUsageEventsProcessorTest.java @@ -49,6 +49,8 @@ public class DataHubUsageEventsProcessorTest { public void setUp() { MockitoAnnotations.openMocks(this); + when(metricUtils.getRegistry()).thenReturn(new SimpleMeterRegistry()); + systemOperationContext = TestOperationContexts.Builder.builder() .systemTelemetryContextSupplier( @@ -65,6 +67,8 @@ public class DataHubUsageEventsProcessorTest { dataHubUsageEventTransformer, systemOperationContext.getSearchContext().getIndexConvention(), systemOperationContext); + // Set the consumer group ID + setConsumerGroupId(processor, "datahub-usage-event-consumer-job-client"); } @Test @@ -346,10 +350,7 @@ public class DataHubUsageEventsProcessorTest { SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry(); // Configure the mock metricUtils to return the registry - when(metricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry)); - - // Set the consumer group ID via reflection - setConsumerGroupId(processor, "datahub-usage-event-consumer-job-client"); + when(metricUtils.getRegistry()).thenReturn(meterRegistry); // Setup test data String eventJson = "{\"type\":\"PageViewEvent\",\"timestamp\":1234567890}"; @@ -397,10 +398,7 @@ public class DataHubUsageEventsProcessorTest { SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry(); // Configure the mock metricUtils to return the registry - when(metricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry)); - - // Set the consumer group ID - setConsumerGroupId(processor, "datahub-usage-event-consumer-job-client"); + when(metricUtils.getRegistry()).thenReturn(meterRegistry); // Setup test data String eventJson = "{\"type\":\"SearchEvent\"}"; @@ -466,10 +464,7 @@ public class DataHubUsageEventsProcessorTest { SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry(); // Configure the mock metricUtils to return the registry - when(metricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry)); - - // Set the consumer group ID - setConsumerGroupId(processor, "datahub-usage-event-consumer-job-client"); + when(metricUtils.getRegistry()).thenReturn(meterRegistry); // Setup test data String eventJson = "{\"invalid\":\"event\"}"; @@ -504,89 +499,13 @@ public class DataHubUsageEventsProcessorTest { verify(elasticsearchConnector, never()).feedElasticEvent(any()); } - @Test - public void testMicrometerMetricsAbsentWhenRegistryNotPresent() { - // Configure the mock metricUtils to return empty Optional (no registry) - when(metricUtils.getRegistry()).thenReturn(Optional.empty()); - - // Set the consumer group ID - setConsumerGroupId(processor, 
"datahub-usage-event-consumer-job-client"); - - // Setup test data - String eventJson = "{\"type\":\"ViewEvent\"}"; - String transformedDocument = "{\"view\":\"data\"}"; - - when(mockRecord.timestamp()).thenReturn(System.currentTimeMillis() - 1000); - when(mockRecord.value()).thenReturn(eventJson); - - DataHubUsageEventTransformer.TransformedDocument transformedDoc = - new DataHubUsageEventTransformer.TransformedDocument(TEST_EVENT_ID, transformedDocument); - - when(dataHubUsageEventTransformer.transformDataHubUsageEvent(eventJson)) - .thenReturn(Optional.of(transformedDoc)); - - // Execute - should not throw exception - processor.consume(mockRecord); - - // Verify the histogram method was still called (for dropwizard metrics) - verify(metricUtils).histogram(eq(DataHubUsageEventsProcessor.class), eq("kafkaLag"), anyLong()); - - // Verify processing completed successfully despite no registry - verify(elasticsearchConnector).feedElasticEvent(any()); - } - - @Test - public void testMicrometerKafkaQueueTimeWithCustomConsumerGroup() { - // Setup a real MeterRegistry - SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry(); - - // Configure the mock metricUtils to return the registry - when(metricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry)); - - // Set a custom consumer group ID - String customConsumerGroup = "custom-usage-event-consumer"; - setConsumerGroupId(processor, customConsumerGroup); - - // Setup test data - String eventJson = "{\"type\":\"Event\"}"; - String transformedDocument = "{\"data\":\"value\"}"; - - when(mockRecord.timestamp()).thenReturn(System.currentTimeMillis() - 1500); - when(mockRecord.topic()).thenReturn("DataHubUsageEvent_v1"); - when(mockRecord.value()).thenReturn(eventJson); - - DataHubUsageEventTransformer.TransformedDocument transformedDoc = - new DataHubUsageEventTransformer.TransformedDocument(TEST_EVENT_ID, transformedDocument); - - when(dataHubUsageEventTransformer.transformDataHubUsageEvent(eventJson)) - .thenReturn(Optional.of(transformedDoc)); - - // Execute - processor.consume(mockRecord); - - // Verify timer was recorded with custom consumer group - Timer timer = - meterRegistry.timer( - MetricUtils.KAFKA_MESSAGE_QUEUE_TIME, - "topic", - "DataHubUsageEvent_v1", - "consumer.group", - customConsumerGroup); - - assertNotNull(timer); - assertEquals(timer.count(), 1); - } - @Test public void testMicrometerKafkaQueueTimeAccuracy() { // Setup a real MeterRegistry SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry(); // Configure the mock metricUtils to return the registry - when(metricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry)); - - // Set the consumer group ID - setConsumerGroupId(processor, "datahub-usage-event-consumer-job-client"); + when(metricUtils.getRegistry()).thenReturn(meterRegistry); // Setup test data String eventJson = "{\"type\":\"Event\"}"; diff --git a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/listener/mcl/MCLKafkaListenerTest.java b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/listener/mcl/MCLKafkaListenerTest.java index 1730f9039f..1c505bf14f 100644 --- a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/listener/mcl/MCLKafkaListenerTest.java +++ b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/listener/mcl/MCLKafkaListenerTest.java @@ -64,7 +64,7 @@ public class MCLKafkaListenerTest { mockSystemMetadata = spy(SystemMetadataUtils.createDefaultSystemMetadata()); meterRegistry = new SimpleMeterRegistry(); - 
when(metricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry)); + when(metricUtils.getRegistry()).thenReturn(meterRegistry); systemOperationContext = TestOperationContexts.Builder.builder() diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java index cfa24b6491..b9c0c45c67 100644 --- a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java +++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java @@ -93,17 +93,13 @@ public class MetadataChangeProposalsProcessor { // TODO: include priority level when available metricUtils .getRegistry() - .ifPresent( - meterRegistry -> { - meterRegistry - .timer( - MetricUtils.KAFKA_MESSAGE_QUEUE_TIME, - "topic", - consumerRecord.topic(), - "consumer.group", - mceConsumerGroupId) - .record(Duration.ofMillis(queueTimeMs)); - }); + .timer( + MetricUtils.KAFKA_MESSAGE_QUEUE_TIME, + "topic", + consumerRecord.topic(), + "consumer.group", + mceConsumerGroupId) + .record(Duration.ofMillis(queueTimeMs)); }); final GenericRecord record = consumerRecord.value(); diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessor.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessor.java index 6ea230b9ba..a69dfab174 100644 --- a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessor.java +++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessor.java @@ -95,17 +95,13 @@ public class BatchMetadataChangeProposalsProcessor { // TODO: include priority level when available metricUtils .getRegistry() - .ifPresent( - meterRegistry -> { - meterRegistry - .timer( - MetricUtils.KAFKA_MESSAGE_QUEUE_TIME, - "topic", - consumerRecord.topic(), - "consumer.group", - mceConsumerGroupId) - .record(Duration.ofMillis(queueTimeMs)); - }); + .timer( + MetricUtils.KAFKA_MESSAGE_QUEUE_TIME, + "topic", + consumerRecord.topic(), + "consumer.group", + mceConsumerGroupId) + .record(Duration.ofMillis(queueTimeMs)); }); final GenericRecord record = consumerRecord.value(); diff --git a/metadata-jobs/mce-consumer/src/test/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessorTest.java b/metadata-jobs/mce-consumer/src/test/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessorTest.java index 67438211a1..dbf5a2e808 100644 --- a/metadata-jobs/mce-consumer/src/test/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessorTest.java +++ b/metadata-jobs/mce-consumer/src/test/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessorTest.java @@ -1,7 +1,6 @@ package com.linkedin.metadata.kafka; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; @@ -41,6 +40,7 @@ import com.linkedin.mxe.Topics; import io.datahubproject.metadata.context.OperationContext; import io.datahubproject.metadata.context.SystemTelemetryContext; import io.datahubproject.test.metadata.context.TestOperationContexts; +import io.micrometer.core.instrument.MeterRegistry; import 
io.micrometer.core.instrument.Timer; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.opentelemetry.api.trace.Span; @@ -48,7 +48,6 @@ import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.api.trace.Tracer; import java.io.IOException; import java.util.List; -import java.util.Optional; import java.util.concurrent.TimeUnit; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -100,18 +99,18 @@ public class MetadataChangeProposalsProcessorTest { @Mock private Span mockSpan; - @Mock private MetricUtils metricUtils; + private MetricUtils metricUtils; private AutoCloseable mocks; private MockedStatic spanMock; - private MockedStatic metricUtilsMock; private MockedStatic eventUtilsMock; @BeforeMethod public void setup() { mocks = MockitoAnnotations.openMocks(this); + metricUtils = MetricUtils.builder().registry(new SimpleMeterRegistry()).build(); opContext = opContext.toBuilder() .systemTelemetryContext( @@ -160,11 +159,6 @@ public class MetadataChangeProposalsProcessorTest { spanMock = mockStatic(Span.class); spanMock.when(Span::current).thenReturn(mockSpan); - metricUtilsMock = mockStatic(MetricUtils.class); - metricUtilsMock - .when(() -> MetricUtils.name(eq(MetadataChangeProposalsProcessor.class), any())) - .thenReturn("metricName"); - eventUtilsMock = mockStatic(EventUtils.class); // Setup consumer record mock @@ -185,11 +179,6 @@ public class MetadataChangeProposalsProcessorTest { spanMock = null; // Set to null after closing } - if (metricUtilsMock != null) { - metricUtilsMock.close(); - metricUtilsMock = null; // Set to null after closing - } - if (eventUtilsMock != null) { eventUtilsMock.close(); eventUtilsMock = null; // Set to null after closing @@ -344,11 +333,7 @@ public class MetadataChangeProposalsProcessorTest { @Test public void testMicrometerKafkaQueueTimeMetric() throws Exception { - // Setup a real MeterRegistry - SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry(); - - // Configure the mock metricUtils to return the registry - when(metricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry)); + MeterRegistry meterRegistry = metricUtils.getRegistry(); // Set timestamp to simulate queue time long messageTimestamp = System.currentTimeMillis() - 3000; // 3 seconds ago @@ -376,21 +361,13 @@ public class MetadataChangeProposalsProcessorTest { assertTrue(timer.totalTime(TimeUnit.MILLISECONDS) >= 2500); // At least 2.5 seconds assertTrue(timer.totalTime(TimeUnit.MILLISECONDS) <= 3500); // At most 3.5 seconds - // Verify the histogram method was called - verify(metricUtils) - .histogram(eq(MetadataChangeProposalsProcessor.class), eq("kafkaLag"), anyLong()); - // Verify successful processing verify(mockEntityService).ingestProposal(eq(opContext), any(), eq(false)); } @Test public void testMicrometerKafkaQueueTimeWithDifferentTopics() throws Exception { - // Setup a real MeterRegistry - SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry(); - - // Configure the mock metricUtils to return the registry - when(metricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry)); + MeterRegistry meterRegistry = metricUtils.getRegistry(); // Create MCP MetadataChangeProposal mcp = createSimpleMCP(); @@ -448,11 +425,7 @@ public class MetadataChangeProposalsProcessorTest { @Test public void testMicrometerMetricsWithProcessingFailure() throws Exception { - // Setup a real MeterRegistry - SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry(); - - // Configure the mock 
metricUtils to return the registry - when(metricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry)); + MeterRegistry meterRegistry = metricUtils.getRegistry(); // Create MCP that will fail MetadataChangeProposal mcp = new MetadataChangeProposal(); @@ -492,9 +465,6 @@ public class MetadataChangeProposalsProcessorTest { @Test public void testMicrometerMetricsAbsentWhenRegistryNotPresent() throws Exception { - // Configure the mock metricUtils to return empty Optional (no registry) - when(metricUtils.getRegistry()).thenReturn(Optional.empty()); - // Create MCP MetadataChangeProposal mcp = createSimpleMCP(); eventUtilsMock.when(() -> EventUtils.avroToPegasusMCP(mockRecord)).thenReturn(mcp); @@ -504,21 +474,13 @@ public class MetadataChangeProposalsProcessorTest { // Execute - should not throw exception processor.consume(mockConsumerRecord); - // Verify the histogram method was still called (for dropwizard metrics) - verify(metricUtils) - .histogram(eq(MetadataChangeProposalsProcessor.class), eq("kafkaLag"), anyLong()); - // Verify processing completed successfully despite no registry verify(mockEntityService).ingestProposal(eq(opContext), any(), eq(false)); } @Test public void testMicrometerKafkaQueueTimeAccuracy() throws Exception { - // Setup a real MeterRegistry - SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry(); - - // Configure the mock metricUtils to return the registry - when(metricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry)); + MeterRegistry meterRegistry = metricUtils.getRegistry(); // Create MCP MetadataChangeProposal mcp = createSimpleMCP(); @@ -562,10 +524,6 @@ public class MetadataChangeProposalsProcessorTest { // Verify max recorded time assertTrue(timer.max(TimeUnit.MILLISECONDS) >= 4500); assertTrue(timer.max(TimeUnit.MILLISECONDS) <= 5500); - - // Verify histogram was called for each record - verify(metricUtils, times(queueTimes.length)) - .histogram(eq(MetadataChangeProposalsProcessor.class), eq("kafkaLag"), anyLong()); } @Test @@ -598,10 +556,6 @@ public class MetadataChangeProposalsProcessorTest { // Verify processing completed successfully verify(mockEntityService).ingestProposal(eq(opContextNoMetrics), any(), eq(false)); - - // Verify metricUtils methods were never called since it's not present in context - verify(metricUtils, never()).histogram(any(), any(), anyLong()); - verify(metricUtils, never()).getRegistry(); } // Helper method diff --git a/metadata-jobs/mce-consumer/src/test/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessorTest.java b/metadata-jobs/mce-consumer/src/test/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessorTest.java index eb1d26e064..1cfa2221da 100644 --- a/metadata-jobs/mce-consumer/src/test/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessorTest.java +++ b/metadata-jobs/mce-consumer/src/test/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessorTest.java @@ -557,6 +557,7 @@ public class BatchMetadataChangeProposalsProcessorTest { public void testConsumeWithTelemetryMetrics() throws Exception { // Mock the metric utils MetricUtils mockMetricUtils = mock(MetricUtils.class); + when(mockMetricUtils.getRegistry()).thenReturn(new SimpleMeterRegistry()); // Create a mock operation context OperationContext opContextWithMetrics = spy(opContext); @@ -856,12 +857,12 @@ public class BatchMetadataChangeProposalsProcessorTest { @Test public void testMicrometerMetricsAbsentWhenRegistryNotPresent() throws Exception { - // Create 
MetricUtils without a registry - MetricUtils metricUtilsNoRegistry = MetricUtils.builder().registry(null).build(); + // With the new non-null contract, we can't create MetricUtils with null registry + // Instead, test the case where MetricUtils is absent from OperationContext - // Create operation context with metric utils that has no registry + // Create operation context without metric utils OperationContext opContextNoRegistry = mock(OperationContext.class); - when(opContextNoRegistry.getMetricUtils()).thenReturn(Optional.of(metricUtilsNoRegistry)); + when(opContextNoRegistry.getMetricUtils()).thenReturn(Optional.empty()); // Mock withQueueSpan doAnswer( diff --git a/metadata-jobs/pe-consumer/src/main/java/com/datahub/event/PlatformEventProcessor.java b/metadata-jobs/pe-consumer/src/main/java/com/datahub/event/PlatformEventProcessor.java index 245851e594..faebe30fff 100644 --- a/metadata-jobs/pe-consumer/src/main/java/com/datahub/event/PlatformEventProcessor.java +++ b/metadata-jobs/pe-consumer/src/main/java/com/datahub/event/PlatformEventProcessor.java @@ -82,17 +82,13 @@ public class PlatformEventProcessor { // TODO: include priority level when available metricUtils .getRegistry() - .ifPresent( - meterRegistry -> { - meterRegistry - .timer( - MetricUtils.KAFKA_MESSAGE_QUEUE_TIME, - "topic", - consumerRecord.topic(), - "consumer.group", - datahubPlatformEventConsumerGroupId) - .record(Duration.ofMillis(queueTimeMs)); - }); + .timer( + MetricUtils.KAFKA_MESSAGE_QUEUE_TIME, + "topic", + consumerRecord.topic(), + "consumer.group", + datahubPlatformEventConsumerGroupId) + .record(Duration.ofMillis(queueTimeMs)); }); final GenericRecord record = consumerRecord.value(); log.info( diff --git a/metadata-jobs/pe-consumer/src/test/java/com/datahub/event/PlatformEventProcessorTest.java b/metadata-jobs/pe-consumer/src/test/java/com/datahub/event/PlatformEventProcessorTest.java index ac802469a4..3210c1f86a 100644 --- a/metadata-jobs/pe-consumer/src/test/java/com/datahub/event/PlatformEventProcessorTest.java +++ b/metadata-jobs/pe-consumer/src/test/java/com/datahub/event/PlatformEventProcessorTest.java @@ -62,6 +62,7 @@ public class PlatformEventProcessorTest { // Create mock MetricUtils mockMetricUtils = mock(MetricUtils.class); + when(mockMetricUtils.getRegistry()).thenReturn(new SimpleMeterRegistry()); when(mockOperationContext.getMetricUtils()).thenReturn(Optional.of(mockMetricUtils)); // Create mock hooks @@ -133,6 +134,8 @@ public class PlatformEventProcessorTest { // Setup List hooks = Arrays.asList(mockHook1, mockHook2); processor = new PlatformEventProcessor(mockOperationContext, hooks); + // Set the consumer group ID + setConsumerGroupId(processor, "generic-platform-event-job-client"); try (MockedStatic mockedEventUtils = Mockito.mockStatic(EventUtils.class)) { mockedEventUtils @@ -161,6 +164,8 @@ public class PlatformEventProcessorTest { // Setup List hooks = Arrays.asList(mockHook1); processor = new PlatformEventProcessor(mockOperationContext, hooks); + // Set the consumer group ID + setConsumerGroupId(processor, "generic-platform-event-job-client"); try (MockedStatic mockedEventUtils = Mockito.mockStatic(EventUtils.class)) { mockedEventUtils @@ -189,6 +194,8 @@ public class PlatformEventProcessorTest { // Setup List hooks = Arrays.asList(mockHook1, mockHook2); processor = new PlatformEventProcessor(mockOperationContext, hooks); + // Set the consumer group ID + setConsumerGroupId(processor, "generic-platform-event-job-client"); // Make first hook throw exception doThrow(new 
RuntimeException("Hook 1 failed")) @@ -224,6 +231,8 @@ public class PlatformEventProcessorTest { // Setup List hooks = Arrays.asList(mockHook1, mockHook2); processor = new PlatformEventProcessor(mockOperationContext, hooks); + // Set the consumer group ID + setConsumerGroupId(processor, "generic-platform-event-job-client"); // Make both hooks throw exceptions doThrow(new RuntimeException("Hook 1 failed")) @@ -283,6 +292,8 @@ public class PlatformEventProcessorTest { // Setup List hooks = Arrays.asList(mockHook1); processor = new PlatformEventProcessor(mockOperationContext, hooks); + // Set the consumer group ID + setConsumerGroupId(processor, "generic-platform-event-job-client"); // Set up specific consumer record values String expectedKey = "test-key-123"; @@ -321,7 +332,8 @@ public class PlatformEventProcessorTest { // Verify that the consumer record methods were called // Note: some methods may be called multiple times (e.g., for logging and metrics) verify(specificMockRecord, times(1)).key(); - verify(specificMockRecord, times(1)).topic(); + verify(specificMockRecord, times(2)) + .topic(); // Called twice: once for metrics, once for logging verify(specificMockRecord, times(1)).partition(); verify(specificMockRecord, times(1)).offset(); verify(specificMockRecord, times(2)) @@ -336,6 +348,8 @@ public class PlatformEventProcessorTest { // Setup List hooks = Arrays.asList(mockHook1); processor = new PlatformEventProcessor(mockOperationContext, hooks); + // Set the consumer group ID + setConsumerGroupId(processor, "generic-platform-event-job-client"); PlatformEvent event1 = mock(PlatformEvent.class); when(event1.getName()).thenReturn("Event1"); @@ -389,6 +403,8 @@ public class PlatformEventProcessorTest { // Setup List hooks = Arrays.asList(mockHook1); processor = new PlatformEventProcessor(mockOperationContext, hooks); + // Set the consumer group ID + setConsumerGroupId(processor, "generic-platform-event-job-client"); ConsumerRecord nullValueRecord = mock(ConsumerRecord.class); when(nullValueRecord.value()).thenReturn(null); @@ -422,7 +438,7 @@ public class PlatformEventProcessorTest { SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry(); // Configure the mock metricUtils to return the registry - when(mockMetricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry)); + when(mockMetricUtils.getRegistry()).thenReturn(meterRegistry); // Set the consumer group ID via reflection setConsumerGroupId(processor, "generic-platform-event-job-client"); @@ -469,7 +485,7 @@ public class PlatformEventProcessorTest { SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry(); // Configure the mock metricUtils to return the registry - when(mockMetricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry)); + when(mockMetricUtils.getRegistry()).thenReturn(meterRegistry); // Set the consumer group ID setConsumerGroupId(processor, "generic-platform-event-job-client"); @@ -534,7 +550,7 @@ public class PlatformEventProcessorTest { SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry(); // Configure the mock metricUtils to return the registry - when(mockMetricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry)); + when(mockMetricUtils.getRegistry()).thenReturn(meterRegistry); // Set the consumer group ID setConsumerGroupId(processor, "generic-platform-event-job-client"); @@ -578,7 +594,7 @@ public class PlatformEventProcessorTest { @Test public void testMicrometerMetricsAbsentWhenRegistryNotPresent() throws Exception { // Configure the mock metricUtils to return empty Optional 
(no registry) - when(mockMetricUtils.getRegistry()).thenReturn(Optional.empty()); + when(mockMetricUtils.getRegistry()).thenReturn(new SimpleMeterRegistry()); // Set the consumer group ID setConsumerGroupId(processor, "generic-platform-event-job-client"); @@ -608,7 +624,7 @@ public class PlatformEventProcessorTest { SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry(); // Configure the mock metricUtils to return the registry - when(mockMetricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry)); + when(mockMetricUtils.getRegistry()).thenReturn(meterRegistry); // Set a custom consumer group ID String customConsumerGroup = "custom-platform-event-consumer"; @@ -645,7 +661,7 @@ public class PlatformEventProcessorTest { SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry(); // Configure the mock metricUtils to return the registry - when(mockMetricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry)); + when(mockMetricUtils.getRegistry()).thenReturn(meterRegistry); // Set the consumer group ID setConsumerGroupId(processor, "generic-platform-event-job-client"); @@ -705,7 +721,7 @@ public class PlatformEventProcessorTest { SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry(); // Configure the mock metricUtils to return the registry - when(mockMetricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry)); + when(mockMetricUtils.getRegistry()).thenReturn(meterRegistry); // Set the consumer group ID setConsumerGroupId(processor, "generic-platform-event-job-client"); diff --git a/metadata-operation-context/src/main/java/io/datahubproject/metadata/context/RequestContext.java b/metadata-operation-context/src/main/java/io/datahubproject/metadata/context/RequestContext.java index 9d4ffaa746..a9748cfa93 100644 --- a/metadata-operation-context/src/main/java/io/datahubproject/metadata/context/RequestContext.java +++ b/metadata-operation-context/src/main/java/io/datahubproject/metadata/context/RequestContext.java @@ -275,11 +275,11 @@ public class RequestContext implements ContextInterface { metricUtils.incrementMicrometer( MetricUtils.DATAHUB_REQUEST_COUNT, 1, - "user.category", + "user_category", userCategory, - "agent.class", + "agent_class", agentClass, - "request.api", + "request_api", requestAPI); } } diff --git a/metadata-operation-context/src/test/java/io/datahubproject/metadata/context/RequestContextTest.java b/metadata-operation-context/src/test/java/io/datahubproject/metadata/context/RequestContextTest.java index e5c512d484..fa5eb929be 100644 --- a/metadata-operation-context/src/test/java/io/datahubproject/metadata/context/RequestContextTest.java +++ b/metadata-operation-context/src/test/java/io/datahubproject/metadata/context/RequestContextTest.java @@ -278,13 +278,13 @@ public class RequestContextTest { verify(mockMetricUtils, atLeastOnce()) .incrementMicrometer( - eq("datahub.request.count"), + eq(MetricUtils.DATAHUB_REQUEST_COUNT), eq(1.0d), - eq("user.category"), + eq("user_category"), eq("system"), - eq("agent.class"), + eq("agent_class"), eq("unknown"), - eq("request.api"), + eq("request_api"), eq("restli")); } @@ -303,13 +303,13 @@ public class RequestContextTest { verify(mockMetricUtils, atLeastOnce()) .incrementMicrometer( - eq("datahub.request.count"), + eq(MetricUtils.DATAHUB_REQUEST_COUNT), eq(1.0d), - eq("user.category"), + eq("user_category"), eq("admin"), - eq("agent.class"), + eq("agent_class"), eq("unknown"), - eq("request.api"), + eq("request_api"), eq("restli")); } @@ -328,13 +328,13 @@ public class RequestContextTest { 
verify(mockMetricUtils, atLeastOnce()) .incrementMicrometer( - eq("datahub.request.count"), + eq(MetricUtils.DATAHUB_REQUEST_COUNT), eq(1.0d), - eq("user.category"), + eq("user_category"), eq("regular"), - eq("agent.class"), + eq("agent_class"), eq("unknown"), - eq("request.api"), + eq("request_api"), eq("restli")); } diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/graphql/GraphQLEngineFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/graphql/GraphQLEngineFactory.java index c89165538b..bbf292e98b 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/graphql/GraphQLEngineFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/graphql/GraphQLEngineFactory.java @@ -317,7 +317,7 @@ public class GraphQLEngineFactory { GraphQLConcurrencyUtils.setExecutorService(graphQLWorkerPool); if (metricUtils != null) { MicrometerMetricsRegistry.registerExecutorMetrics( - "graphql", graphqlExecutorService, metricUtils.getRegistry().orElse(null)); + "graphql", graphqlExecutorService, metricUtils.getRegistry()); } return graphQLWorkerPool; diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/trace/KafkaTraceReaderFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/trace/KafkaTraceReaderFactory.java index 9c7e06adc2..3bdde3cfa2 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/trace/KafkaTraceReaderFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/trace/KafkaTraceReaderFactory.java @@ -86,7 +86,7 @@ public class KafkaTraceReaderFactory { traceExecutorService = Executors.newFixedThreadPool(threadPoolSize); if (metricUtils != null) { MicrometerMetricsRegistry.registerExecutorMetrics( - "api-trace", this.traceExecutorService, metricUtils.getRegistry().orElse(null)); + "api-trace", this.traceExecutorService, metricUtils.getRegistry()); } return traceExecutorService; } diff --git a/metadata-service/factories/src/test/java/com/linkedin/gms/factory/graphql/GraphQLEngineFactoryTest.java b/metadata-service/factories/src/test/java/com/linkedin/gms/factory/graphql/GraphQLEngineFactoryTest.java index 1c279aab23..52694caddb 100644 --- a/metadata-service/factories/src/test/java/com/linkedin/gms/factory/graphql/GraphQLEngineFactoryTest.java +++ b/metadata-service/factories/src/test/java/com/linkedin/gms/factory/graphql/GraphQLEngineFactoryTest.java @@ -41,6 +41,7 @@ import io.datahubproject.metadata.context.SystemTelemetryContext; import io.datahubproject.metadata.services.RestrictedService; import io.datahubproject.metadata.services.SecretService; import io.datahubproject.test.metadata.context.TestOperationContexts; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.opentelemetry.api.trace.Tracer; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; @@ -261,7 +262,7 @@ public class GraphQLEngineFactoryTest extends AbstractTestNGSpringContextTests { public void setUp() { // Set up default mock behaviors when(graphService.supportsMultiHop()).thenReturn(true); - when(metricUtils.getRegistry()).thenReturn(java.util.Optional.empty()); + when(metricUtils.getRegistry()).thenReturn(new SimpleMeterRegistry()); } @Test diff --git a/metadata-service/restli-client-api/src/main/java/com/linkedin/common/client/ClientCache.java b/metadata-service/restli-client-api/src/main/java/com/linkedin/common/client/ClientCache.java 
index 0a4eb26474..5f139a7c6a 100644 --- a/metadata-service/restli-client-api/src/main/java/com/linkedin/common/client/ClientCache.java +++ b/metadata-service/restli-client-api/src/main/java/com/linkedin/common/client/ClientCache.java @@ -109,7 +109,7 @@ public class ClientCache { if (config.isStatsEnabled() && metricUtils != null) { MicrometerMetricsRegistry.registerCacheMetrics( - config.getName(), cache, metricUtils.getRegistry().orElse(null)); + config.getName(), cache, metricUtils.getRegistry()); } return new ClientCache<>(config, cache, loadFunction, weigher, ttlSecondsFunction); diff --git a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java index a5c86f7a96..2215c40765 100644 --- a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java +++ b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java @@ -145,7 +145,7 @@ public class RestliEntityClient extends BaseClient implements EntityClient { new ThreadPoolExecutor.CallerRunsPolicy()); if (metricUtils != null) { MicrometerMetricsRegistry.registerExecutorMetrics( - "entity-client-get", this.batchGetV2Pool, metricUtils.getRegistry().orElse(null)); + "entity-client-get", this.batchGetV2Pool, metricUtils.getRegistry()); } this.batchIngestPool = new ThreadPoolExecutor( @@ -158,7 +158,7 @@ public class RestliEntityClient extends BaseClient implements EntityClient { new ThreadPoolExecutor.CallerRunsPolicy()); if (metricUtils != null) { MicrometerMetricsRegistry.registerExecutorMetrics( - "entity-client-ingest", this.batchIngestPool, metricUtils.getRegistry().orElse(null)); + "entity-client-ingest", this.batchIngestPool, metricUtils.getRegistry()); } } diff --git a/metadata-service/restli-client/src/test/java/com/linkedin/common/client/BaseClientTest.java b/metadata-service/restli-client/src/test/java/com/linkedin/common/client/BaseClientTest.java index a964d65667..a0ee015628 100644 --- a/metadata-service/restli-client/src/test/java/com/linkedin/common/client/BaseClientTest.java +++ b/metadata-service/restli-client/src/test/java/com/linkedin/common/client/BaseClientTest.java @@ -63,6 +63,7 @@ import com.linkedin.restli.client.RestLiResponseException; import com.linkedin.restli.client.response.BatchKVResponse; import io.datahubproject.metadata.context.OperationContext; import io.datahubproject.test.metadata.context.TestOperationContexts; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import java.net.URISyntaxException; import java.util.Arrays; import java.util.Collections; @@ -98,7 +99,7 @@ public class BaseClientTest { // Setup MetricUtils mock mockMetricUtils = mock(MetricUtils.class); - when(mockMetricUtils.getRegistry()).thenReturn(Optional.empty()); + when(mockMetricUtils.getRegistry()).thenReturn(new SimpleMeterRegistry()); } @Test diff --git a/metadata-utils/src/main/java/com/linkedin/metadata/utils/metrics/MetricUtils.java b/metadata-utils/src/main/java/com/linkedin/metadata/utils/metrics/MetricUtils.java index aa06096912..3db72f72d7 100644 --- a/metadata-utils/src/main/java/com/linkedin/metadata/utils/metrics/MetricUtils.java +++ b/metadata-utils/src/main/java/com/linkedin/metadata/utils/metrics/MetricUtils.java @@ -7,11 +7,12 @@ import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.MeterRegistry; import 
io.micrometer.core.instrument.Timer; +import io.micrometer.core.instrument.composite.CompositeMeterRegistry; import java.util.Map; -import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import lombok.Builder; +import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -21,10 +22,10 @@ public class MetricUtils { public static final String DROPWIZARD_METRIC = "dwizMetric"; public static final String DROPWIZARD_NAME = "dwizName"; - /* Micrometer */ + /* Micrometer. See https://prometheus.io/docs/practices/naming/ */ public static final String KAFKA_MESSAGE_QUEUE_TIME = "kafka.message.queue.time"; public static final String DATAHUB_REQUEST_HOOK_QUEUE_TIME = "datahub.request.hook.queue.time"; - public static final String DATAHUB_REQUEST_COUNT = "datahub.request.count"; + public static final String DATAHUB_REQUEST_COUNT = "datahub_request_count"; /* OpenTelemetry */ public static final String CACHE_HIT_ATTR = "cache.hit"; @@ -42,7 +43,7 @@ public class MetricUtils { @Deprecated public static final String DELIMITER = "_"; - private final MeterRegistry registry; + @Builder.Default @NonNull private final MeterRegistry registry = new CompositeMeterRegistry(); private static final Map legacyTimeCache = new ConcurrentHashMap<>(); private static final Map legacyCounterCache = new ConcurrentHashMap<>(); private static final Map legacyHistogramCache = @@ -52,24 +53,17 @@ public class MetricUtils { // For state-based gauges (like throttled state) private static final Map gaugeStates = new ConcurrentHashMap<>(); - public Optional getRegistry() { - return Optional.ofNullable(registry); + public MeterRegistry getRegistry() { + return registry; } @Deprecated public void time(String dropWizardMetricName, long durationNanos) { - getRegistry() - .ifPresent( - meterRegistry -> { - Timer timer = - legacyTimeCache.computeIfAbsent( - dropWizardMetricName, - name -> - Timer.builder(name) - .tags(DROPWIZARD_METRIC, "true") - .register(meterRegistry)); - timer.record(durationNanos, TimeUnit.NANOSECONDS); - }); + Timer timer = + legacyTimeCache.computeIfAbsent( + dropWizardMetricName, + name -> Timer.builder(name).tags(DROPWIZARD_METRIC, "true").register(registry)); + timer.record(durationNanos, TimeUnit.NANOSECONDS); } @Deprecated @@ -90,18 +84,14 @@ public class MetricUtils { @Deprecated public void increment(String metricName, double increment) { - getRegistry() - .ifPresent( - meterRegistry -> { - Counter counter = - legacyCounterCache.computeIfAbsent( - metricName, - name -> - Counter.builder(MetricRegistry.name(name)) - .tag(DROPWIZARD_METRIC, "true") - .register(meterRegistry)); - counter.increment(increment); - }); + Counter counter = + legacyCounterCache.computeIfAbsent( + metricName, + name -> + Counter.builder(MetricRegistry.name(name)) + .tag(DROPWIZARD_METRIC, "true") + .register(registry)); + counter.increment(increment); } /** @@ -112,16 +102,11 @@ public class MetricUtils { * @param tags The tags to associate with the metric (can be empty) */ public void incrementMicrometer(String metricName, double increment, String... 
tags) { - getRegistry() - .ifPresent( - meterRegistry -> { - // Create a cache key that includes both metric name and tags - String cacheKey = createCacheKey(metricName, tags); - Counter counter = - micrometerCounterCache.computeIfAbsent( - cacheKey, key -> meterRegistry.counter(metricName, tags)); - counter.increment(increment); - }); + // Create a cache key that includes both metric name and tags + String cacheKey = createCacheKey(metricName, tags); + Counter counter = + micrometerCounterCache.computeIfAbsent(cacheKey, key -> registry.counter(metricName, tags)); + counter.increment(increment); } /** @@ -167,40 +152,30 @@ public class MetricUtils { public void setGaugeValue(Class clazz, String metricName, double value) { String name = MetricRegistry.name(clazz, metricName); - getRegistry() - .ifPresent( - meterRegistry -> { - // Get or create the state holder - AtomicDouble state = gaugeStates.computeIfAbsent(name, k -> new AtomicDouble(0)); + // Get or create the state holder + AtomicDouble state = gaugeStates.computeIfAbsent(name, k -> new AtomicDouble(0)); - // Register the gauge if not already registered - legacyGaugeCache.computeIfAbsent( - name, - key -> - Gauge.builder(key, state, AtomicDouble::get) - .tag(DROPWIZARD_METRIC, "true") - .register(meterRegistry)); + // Register the gauge if not already registered + legacyGaugeCache.computeIfAbsent( + name, + key -> + Gauge.builder(key, state, AtomicDouble::get) + .tag(DROPWIZARD_METRIC, "true") + .register(registry)); - // Update the value - state.set(value); - }); + // Update the value + state.set(value); } @Deprecated public void histogram(Class clazz, String metricName, long value) { - getRegistry() - .ifPresent( - meterRegistry -> { - String name = MetricRegistry.name(clazz, metricName); - DistributionSummary summary = - legacyHistogramCache.computeIfAbsent( - name, - key -> - DistributionSummary.builder(key) - .tag(DROPWIZARD_METRIC, "true") - .register(meterRegistry)); - summary.record(value); - }); + String name = MetricRegistry.name(clazz, metricName); + DistributionSummary summary = + legacyHistogramCache.computeIfAbsent( + name, + key -> + DistributionSummary.builder(key).tag(DROPWIZARD_METRIC, "true").register(registry)); + summary.record(value); } @Deprecated diff --git a/metadata-utils/src/main/java/com/linkedin/metadata/utils/metrics/MicrometerMetricsRegistry.java b/metadata-utils/src/main/java/com/linkedin/metadata/utils/metrics/MicrometerMetricsRegistry.java index 059cc79634..d1ea3c1511 100644 --- a/metadata-utils/src/main/java/com/linkedin/metadata/utils/metrics/MicrometerMetricsRegistry.java +++ b/metadata-utils/src/main/java/com/linkedin/metadata/utils/metrics/MicrometerMetricsRegistry.java @@ -27,11 +27,7 @@ public class MicrometerMetricsRegistry { public static synchronized boolean registerCacheMetrics( @Nonnull String cacheName, @Nonnull Object nativeCache, - @Nullable MeterRegistry meterRegistry) { - - if (cacheName == null || nativeCache == null || meterRegistry == null) { - return false; - } + @Nonnull MeterRegistry meterRegistry) { if (!GLOBALLY_REGISTERED_CACHES.add(cacheName)) { return false; diff --git a/metadata-utils/src/test/java/com/linkedin/metadata/utils/metrics/MetricUtilsTest.java b/metadata-utils/src/test/java/com/linkedin/metadata/utils/metrics/MetricUtilsTest.java index 20cff77b2f..6f0fb39f3f 100644 --- a/metadata-utils/src/test/java/com/linkedin/metadata/utils/metrics/MetricUtilsTest.java +++ b/metadata-utils/src/test/java/com/linkedin/metadata/utils/metrics/MetricUtilsTest.java @@ -10,7 +10,6 
@@ import io.micrometer.core.instrument.Meter; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Timer; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; -import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import org.mockito.Mock; @@ -44,18 +43,25 @@ public class MetricUtilsTest { } @Test - public void testGetRegistryReturnsOptionalWithRegistry() { - Optional registry = metricUtils.getRegistry(); - assertTrue(registry.isPresent()); - assertSame(registry.get(), meterRegistry); + public void testGetRegistryReturnsRegistry() { + MeterRegistry registry = metricUtils.getRegistry(); + assertNotNull(registry); + assertSame(registry, meterRegistry); } @Test - public void testGetRegistryReturnsEmptyOptionalWhenNull() { - MetricUtils utilsWithNullRegistry = MetricUtils.builder().registry(null).build(); + public void testGetRegistryReturnsDefaultWhenNotSpecified() { + MetricUtils utilsWithDefaultRegistry = MetricUtils.builder().build(); - Optional registry = utilsWithNullRegistry.getRegistry(); - assertFalse(registry.isPresent()); + MeterRegistry registry = utilsWithDefaultRegistry.getRegistry(); + assertNotNull(registry); + assertTrue(registry instanceof io.micrometer.core.instrument.composite.CompositeMeterRegistry); + } + + @Test(expectedExceptions = NullPointerException.class) + public void testBuilderRejectsNullRegistry() { + // This should throw NullPointerException due to @NonNull annotation + MetricUtils.builder().registry(null).build(); } @Test @@ -72,11 +78,11 @@ public class MetricUtilsTest { } @Test - public void testTimeWithNullRegistryDoesNothing() { - MetricUtils utilsWithNullRegistry = MetricUtils.builder().registry(null).build(); + public void testTimeWithDefaultRegistryWorks() { + MetricUtils utilsWithDefaultRegistry = MetricUtils.builder().build(); - // Should not throw exception - utilsWithNullRegistry.time("test.timer", 1000); + // Should not throw exception and should work with default registry + utilsWithDefaultRegistry.time("test.timer", 1000); } @Test @@ -217,16 +223,21 @@ public class MetricUtilsTest { } @Test - public void testAllMethodsWithNullRegistry() { - MetricUtils utilsWithNullRegistry = MetricUtils.builder().registry(null).build(); + public void testAllMethodsWithDefaultRegistry() { + MetricUtils utilsWithDefaultRegistry = MetricUtils.builder().build(); - // None of these should throw exceptions - utilsWithNullRegistry.time("timer", 1000); - utilsWithNullRegistry.increment(this.getClass(), "counter", 1); - utilsWithNullRegistry.increment("counter", 1); - utilsWithNullRegistry.exceptionIncrement(this.getClass(), "error", new RuntimeException()); - utilsWithNullRegistry.setGaugeValue(this.getClass(), "gauge", 42); - utilsWithNullRegistry.histogram(this.getClass(), "histogram", 100); + // All methods should work with the default CompositeMeterRegistry + utilsWithDefaultRegistry.time("timer", 1000); + utilsWithDefaultRegistry.increment(this.getClass(), "counter", 1); + utilsWithDefaultRegistry.increment("counter", 1); + utilsWithDefaultRegistry.exceptionIncrement(this.getClass(), "error", new RuntimeException()); + utilsWithDefaultRegistry.setGaugeValue(this.getClass(), "gauge", 42); + utilsWithDefaultRegistry.histogram(this.getClass(), "histogram", 100); + + // Verify the registry is the expected type + assertTrue( + utilsWithDefaultRegistry.getRegistry() + instanceof io.micrometer.core.instrument.composite.CompositeMeterRegistry); } @Test diff --git 
a/metadata-utils/src/test/java/com/linkedin/metadata/utils/metrics/MicrometerMetricsRegistryTest.java b/metadata-utils/src/test/java/com/linkedin/metadata/utils/metrics/MicrometerMetricsRegistryTest.java index 51be77a6cf..43ba3bbe99 100644 --- a/metadata-utils/src/test/java/com/linkedin/metadata/utils/metrics/MicrometerMetricsRegistryTest.java +++ b/metadata-utils/src/test/java/com/linkedin/metadata/utils/metrics/MicrometerMetricsRegistryTest.java @@ -155,32 +155,6 @@ public class MicrometerMetricsRegistryTest { assertFalse(result); } - @Test - public void testRegisterCacheWithNullNativeCache() { - // Given - String cacheName = "testCache"; - - // When - boolean result = MicrometerMetricsRegistry.registerCacheMetrics(cacheName, null, meterRegistry); - - // Then - assertFalse(result); - } - - @Test - public void testRegisterCacheWithNullCacheName() { - // Given - Cache caffeineCache = - Caffeine.newBuilder().maximumSize(100).recordStats().build(); - - // When - boolean result = - MicrometerMetricsRegistry.registerCacheMetrics(null, caffeineCache, meterRegistry); - - // Then - assertFalse(result); // Should return false for null cache name - } - @Test public void testRegisterCacheWithUnsupportedCacheType() { // Given @@ -352,30 +326,6 @@ public class MicrometerMetricsRegistryTest { assertEquals(registeredKey, cacheName); } - @Test - public void testAllNullParameters() { - // When - boolean result = MicrometerMetricsRegistry.registerCacheMetrics(null, null, null); - - // Then - assertFalse(result); - } - - @Test - public void testNullCheckOrder() { - // Test various combinations of null parameters - - // Null cache name - Cache caffeineCache = Caffeine.newBuilder().maximumSize(100).build(); - assertFalse(MicrometerMetricsRegistry.registerCacheMetrics(null, caffeineCache, meterRegistry)); - - // Null native cache - assertFalse(MicrometerMetricsRegistry.registerCacheMetrics("test", null, meterRegistry)); - - // Null meter registry - assertFalse(MicrometerMetricsRegistry.registerCacheMetrics("test", caffeineCache, null)); - } - @Test public void testSuccessfulRegistrationDoesNotThrowOnSubsequentAttempts() { // Given
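Illustration of the non-null registry contract introduced in the MetricUtils change above: getRegistry() now always returns a usable MeterRegistry (an empty CompositeMeterRegistry when none is configured), so callers record timers and counters directly instead of unwrapping an Optional, and the removed null checks and Optional.empty() test paths are no longer needed. The sketch below is illustrative only and uses just the Micrometer core API; the MinimalMetricUtils class is a stand-in, not the DataHub class, while the metric names, tag keys, topic, and consumer group are the ones that appear in the diff.

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.time.Duration;
import java.util.Objects;

public class MinimalMetricUtils {

  private final MeterRegistry registry;

  private MinimalMetricUtils(MeterRegistry registry) {
    this.registry = registry;
  }

  // Analogous to MetricUtils.builder().registry(r).build(): an explicit null is rejected,
  // matching the @NonNull builder field added in the patch.
  public static MinimalMetricUtils of(MeterRegistry registry) {
    return new MinimalMetricUtils(Objects.requireNonNull(registry, "registry"));
  }

  // Analogous to MetricUtils.builder().build(): the @Builder.Default value is an empty
  // CompositeMeterRegistry, which accepts meter calls but records nothing.
  public static MinimalMetricUtils withDefaultRegistry() {
    return new MinimalMetricUtils(new CompositeMeterRegistry());
  }

  public MeterRegistry getRegistry() {
    return registry; // always non-null, so callers no longer unwrap an Optional
  }

  public static void main(String[] args) {
    MinimalMetricUtils metricUtils = MinimalMetricUtils.of(new SimpleMeterRegistry());

    // Consumer-side pattern after this change: record Kafka queue time directly,
    // without the ifPresent(...) wrapping that the patch deletes.
    long queueTimeMs = 1500;
    Timer queueTimer =
        metricUtils
            .getRegistry()
            .timer(
                "kafka.message.queue.time",
                "topic",
                "DataHubUsageEvent_v1",
                "consumer.group",
                "datahub-usage-event-consumer-job-client");
    queueTimer.record(Duration.ofMillis(queueTimeMs));
    System.out.println("queue time samples recorded: " + queueTimer.count()); // prints 1

    // Request counter using the underscore tag keys RequestContext now passes.
    metricUtils
        .getRegistry()
        .counter(
            "datahub_request_count",
            "user_category", "regular",
            "agent_class", "unknown",
            "request_api", "restli")
        .increment();

    // With the default registry, the same calls are accepted but record nothing, so
    // metrics code stays safe even when no telemetry backend is wired up.
    MinimalMetricUtils noBackend = MinimalMetricUtils.withDefaultRegistry();
    noBackend.getRegistry().counter("datahub_request_count").increment();
  }
}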