refactor(metrics): Make MetricUtils.registry non-nullable

- Make MetricUtils.registry non-nullable with default no-op implementation. This allows us to remove boilerplate for handling the null case
- Rename request context metric names to follow convention
This commit is contained in:
Abe 2025-09-09 11:08:16 -07:00 committed by GitHub
parent 4ea758da19
commit 6ec6f0150d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 191 additions and 396 deletions

View File

@ -87,13 +87,9 @@ public class GraphQLEngine {
graphQLQueryComplexityLimit, new DataHubFieldComplexityCalculator()));
if (metricUtils != null && graphQLConfiguration.getMetrics().isEnabled()) {
metricUtils
.getRegistry()
.ifPresent(
meterRegistry ->
instrumentations.add(
new GraphQLTimingInstrumentation(
meterRegistry, graphQLConfiguration.getMetrics())));
instrumentations.add(
new GraphQLTimingInstrumentation(
metricUtils.getRegistry(), graphQLConfiguration.getMetrics()));
}
ChainedInstrumentation chainedInstrumentation = new ChainedInstrumentation(instrumentations);

View File

@ -284,7 +284,7 @@ public class GraphQLEngineTest {
public void testMetricsEnabled() {
// Setup metrics
MeterRegistry meterRegistry = new SimpleMeterRegistry();
when(mockMetricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry));
when(mockMetricUtils.getRegistry()).thenReturn(meterRegistry);
// Enable metrics in configuration
GraphQLConfiguration metricsEnabledConfig = createDefaultConfiguration();
@ -405,7 +405,7 @@ public class GraphQLEngineTest {
metrics.setTrivialDataFetchersEnabled(true);
MeterRegistry meterRegistry = new SimpleMeterRegistry();
when(mockMetricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry));
when(mockMetricUtils.getRegistry()).thenReturn(meterRegistry);
graphQLEngine =
GraphQLEngine.builder()
@ -457,7 +457,7 @@ public class GraphQLEngineTest {
@Test
public void testMetricsWithEmptyRegistry() {
// Setup metrics with empty registry
when(mockMetricUtils.getRegistry()).thenReturn(Optional.empty());
when(mockMetricUtils.getRegistry()).thenReturn(new SimpleMeterRegistry());
GraphQLConfiguration metricsEnabledConfig = createDefaultConfiguration();
metricsEnabledConfig.getMetrics().setEnabled(true);

View File

@ -124,7 +124,7 @@ public class ElasticSearchTimeseriesAspectService
new ThreadPoolExecutor.CallerRunsPolicy());
if (metricUtils != null) {
MicrometerMetricsRegistry.registerExecutorMetrics(
"timeseries", this.queryPool, metricUtils.getRegistry().orElse(null));
"timeseries", this.queryPool, metricUtils.getRegistry());
}
this.entityRegistry = entityRegistry;

View File

@ -30,7 +30,6 @@ import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
@ -97,7 +96,7 @@ public class GraphQLTimingInstrumentationTest {
// Setup real GraphQL configuration for GraphQLEngine
graphQLConfiguration = createDefaultConfiguration();
when(mockMetricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry));
when(mockMetricUtils.getRegistry()).thenReturn(meterRegistry);
}
@Test

View File

@ -69,17 +69,13 @@ public abstract class AbstractKafkaListener<E, H extends EventHook<E>, R>
// TODO: include priority level when available
metricUtils
.getRegistry()
.ifPresent(
meterRegistry -> {
meterRegistry
.timer(
MetricUtils.KAFKA_MESSAGE_QUEUE_TIME,
"topic",
consumerRecord.topic(),
"consumer.group",
consumerGroupId)
.record(Duration.ofMillis(queueTimeMs));
});
.timer(
MetricUtils.KAFKA_MESSAGE_QUEUE_TIME,
"topic",
consumerRecord.topic(),
"consumer.group",
consumerGroupId)
.record(Duration.ofMillis(queueTimeMs));
});
final R record = consumerRecord.value();
log.debug(

View File

@ -76,17 +76,13 @@ public class DataHubUsageEventsProcessor {
// TODO: include priority level when available
metricUtils
.getRegistry()
.ifPresent(
meterRegistry -> {
meterRegistry
.timer(
MetricUtils.KAFKA_MESSAGE_QUEUE_TIME,
"topic",
consumerRecord.topic(),
"consumer.group",
datahubUsageEventConsumerGroupId)
.record(Duration.ofMillis(queueTimeMs));
});
.timer(
MetricUtils.KAFKA_MESSAGE_QUEUE_TIME,
"topic",
consumerRecord.topic(),
"consumer.group",
datahubUsageEventConsumerGroupId)
.record(Duration.ofMillis(queueTimeMs));
});
final String record = consumerRecord.value();

View File

@ -118,12 +118,8 @@ public class MCLKafkaListener
// request
metricUtils
.getRegistry()
.ifPresent(
meterRegistry -> {
meterRegistry
.timer(MetricUtils.DATAHUB_REQUEST_HOOK_QUEUE_TIME, "hook", hookName)
.record(Duration.ofMillis(queueTimeMs));
});
.timer(MetricUtils.DATAHUB_REQUEST_HOOK_QUEUE_TIME, "hook", hookName)
.record(Duration.ofMillis(queueTimeMs));
}
});
}

View File

@ -49,6 +49,8 @@ public class DataHubUsageEventsProcessorTest {
public void setUp() {
MockitoAnnotations.openMocks(this);
when(metricUtils.getRegistry()).thenReturn(new SimpleMeterRegistry());
systemOperationContext =
TestOperationContexts.Builder.builder()
.systemTelemetryContextSupplier(
@ -65,6 +67,8 @@ public class DataHubUsageEventsProcessorTest {
dataHubUsageEventTransformer,
systemOperationContext.getSearchContext().getIndexConvention(),
systemOperationContext);
// Set the consumer group ID
setConsumerGroupId(processor, "datahub-usage-event-consumer-job-client");
}
@Test
@ -346,10 +350,7 @@ public class DataHubUsageEventsProcessorTest {
SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry();
// Configure the mock metricUtils to return the registry
when(metricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry));
// Set the consumer group ID via reflection
setConsumerGroupId(processor, "datahub-usage-event-consumer-job-client");
when(metricUtils.getRegistry()).thenReturn(meterRegistry);
// Setup test data
String eventJson = "{\"type\":\"PageViewEvent\",\"timestamp\":1234567890}";
@ -397,10 +398,7 @@ public class DataHubUsageEventsProcessorTest {
SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry();
// Configure the mock metricUtils to return the registry
when(metricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry));
// Set the consumer group ID
setConsumerGroupId(processor, "datahub-usage-event-consumer-job-client");
when(metricUtils.getRegistry()).thenReturn(meterRegistry);
// Setup test data
String eventJson = "{\"type\":\"SearchEvent\"}";
@ -466,10 +464,7 @@ public class DataHubUsageEventsProcessorTest {
SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry();
// Configure the mock metricUtils to return the registry
when(metricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry));
// Set the consumer group ID
setConsumerGroupId(processor, "datahub-usage-event-consumer-job-client");
when(metricUtils.getRegistry()).thenReturn(meterRegistry);
// Setup test data
String eventJson = "{\"invalid\":\"event\"}";
@ -504,89 +499,13 @@ public class DataHubUsageEventsProcessorTest {
verify(elasticsearchConnector, never()).feedElasticEvent(any());
}
@Test
public void testMicrometerMetricsAbsentWhenRegistryNotPresent() {
// Configure the mock metricUtils to return empty Optional (no registry)
when(metricUtils.getRegistry()).thenReturn(Optional.empty());
// Set the consumer group ID
setConsumerGroupId(processor, "datahub-usage-event-consumer-job-client");
// Setup test data
String eventJson = "{\"type\":\"ViewEvent\"}";
String transformedDocument = "{\"view\":\"data\"}";
when(mockRecord.timestamp()).thenReturn(System.currentTimeMillis() - 1000);
when(mockRecord.value()).thenReturn(eventJson);
DataHubUsageEventTransformer.TransformedDocument transformedDoc =
new DataHubUsageEventTransformer.TransformedDocument(TEST_EVENT_ID, transformedDocument);
when(dataHubUsageEventTransformer.transformDataHubUsageEvent(eventJson))
.thenReturn(Optional.of(transformedDoc));
// Execute - should not throw exception
processor.consume(mockRecord);
// Verify the histogram method was still called (for dropwizard metrics)
verify(metricUtils).histogram(eq(DataHubUsageEventsProcessor.class), eq("kafkaLag"), anyLong());
// Verify processing completed successfully despite no registry
verify(elasticsearchConnector).feedElasticEvent(any());
}
@Test
public void testMicrometerKafkaQueueTimeWithCustomConsumerGroup() {
// Setup a real MeterRegistry
SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry();
// Configure the mock metricUtils to return the registry
when(metricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry));
// Set a custom consumer group ID
String customConsumerGroup = "custom-usage-event-consumer";
setConsumerGroupId(processor, customConsumerGroup);
// Setup test data
String eventJson = "{\"type\":\"Event\"}";
String transformedDocument = "{\"data\":\"value\"}";
when(mockRecord.timestamp()).thenReturn(System.currentTimeMillis() - 1500);
when(mockRecord.topic()).thenReturn("DataHubUsageEvent_v1");
when(mockRecord.value()).thenReturn(eventJson);
DataHubUsageEventTransformer.TransformedDocument transformedDoc =
new DataHubUsageEventTransformer.TransformedDocument(TEST_EVENT_ID, transformedDocument);
when(dataHubUsageEventTransformer.transformDataHubUsageEvent(eventJson))
.thenReturn(Optional.of(transformedDoc));
// Execute
processor.consume(mockRecord);
// Verify timer was recorded with custom consumer group
Timer timer =
meterRegistry.timer(
MetricUtils.KAFKA_MESSAGE_QUEUE_TIME,
"topic",
"DataHubUsageEvent_v1",
"consumer.group",
customConsumerGroup);
assertNotNull(timer);
assertEquals(timer.count(), 1);
}
@Test
public void testMicrometerKafkaQueueTimeAccuracy() {
// Setup a real MeterRegistry
SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry();
// Configure the mock metricUtils to return the registry
when(metricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry));
// Set the consumer group ID
setConsumerGroupId(processor, "datahub-usage-event-consumer-job-client");
when(metricUtils.getRegistry()).thenReturn(meterRegistry);
// Setup test data
String eventJson = "{\"type\":\"Event\"}";

View File

@ -64,7 +64,7 @@ public class MCLKafkaListenerTest {
mockSystemMetadata = spy(SystemMetadataUtils.createDefaultSystemMetadata());
meterRegistry = new SimpleMeterRegistry();
when(metricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry));
when(metricUtils.getRegistry()).thenReturn(meterRegistry);
systemOperationContext =
TestOperationContexts.Builder.builder()

View File

@ -93,17 +93,13 @@ public class MetadataChangeProposalsProcessor {
// TODO: include priority level when available
metricUtils
.getRegistry()
.ifPresent(
meterRegistry -> {
meterRegistry
.timer(
MetricUtils.KAFKA_MESSAGE_QUEUE_TIME,
"topic",
consumerRecord.topic(),
"consumer.group",
mceConsumerGroupId)
.record(Duration.ofMillis(queueTimeMs));
});
.timer(
MetricUtils.KAFKA_MESSAGE_QUEUE_TIME,
"topic",
consumerRecord.topic(),
"consumer.group",
mceConsumerGroupId)
.record(Duration.ofMillis(queueTimeMs));
});
final GenericRecord record = consumerRecord.value();

View File

@ -95,17 +95,13 @@ public class BatchMetadataChangeProposalsProcessor {
// TODO: include priority level when available
metricUtils
.getRegistry()
.ifPresent(
meterRegistry -> {
meterRegistry
.timer(
MetricUtils.KAFKA_MESSAGE_QUEUE_TIME,
"topic",
consumerRecord.topic(),
"consumer.group",
mceConsumerGroupId)
.record(Duration.ofMillis(queueTimeMs));
});
.timer(
MetricUtils.KAFKA_MESSAGE_QUEUE_TIME,
"topic",
consumerRecord.topic(),
"consumer.group",
mceConsumerGroupId)
.record(Duration.ofMillis(queueTimeMs));
});
final GenericRecord record = consumerRecord.value();

View File

@ -1,7 +1,6 @@
package com.linkedin.metadata.kafka;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
@ -41,6 +40,7 @@ import com.linkedin.mxe.Topics;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.metadata.context.SystemTelemetryContext;
import io.datahubproject.test.metadata.context.TestOperationContexts;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.opentelemetry.api.trace.Span;
@ -48,7 +48,6 @@ import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@ -100,18 +99,18 @@ public class MetadataChangeProposalsProcessorTest {
@Mock private Span mockSpan;
@Mock private MetricUtils metricUtils;
private MetricUtils metricUtils;
private AutoCloseable mocks;
private MockedStatic<Span> spanMock;
private MockedStatic<MetricUtils> metricUtilsMock;
private MockedStatic<EventUtils> eventUtilsMock;
@BeforeMethod
public void setup() {
mocks = MockitoAnnotations.openMocks(this);
metricUtils = MetricUtils.builder().registry(new SimpleMeterRegistry()).build();
opContext =
opContext.toBuilder()
.systemTelemetryContext(
@ -160,11 +159,6 @@ public class MetadataChangeProposalsProcessorTest {
spanMock = mockStatic(Span.class);
spanMock.when(Span::current).thenReturn(mockSpan);
metricUtilsMock = mockStatic(MetricUtils.class);
metricUtilsMock
.when(() -> MetricUtils.name(eq(MetadataChangeProposalsProcessor.class), any()))
.thenReturn("metricName");
eventUtilsMock = mockStatic(EventUtils.class);
// Setup consumer record mock
@ -185,11 +179,6 @@ public class MetadataChangeProposalsProcessorTest {
spanMock = null; // Set to null after closing
}
if (metricUtilsMock != null) {
metricUtilsMock.close();
metricUtilsMock = null; // Set to null after closing
}
if (eventUtilsMock != null) {
eventUtilsMock.close();
eventUtilsMock = null; // Set to null after closing
@ -344,11 +333,7 @@ public class MetadataChangeProposalsProcessorTest {
@Test
public void testMicrometerKafkaQueueTimeMetric() throws Exception {
// Setup a real MeterRegistry
SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry();
// Configure the mock metricUtils to return the registry
when(metricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry));
MeterRegistry meterRegistry = metricUtils.getRegistry();
// Set timestamp to simulate queue time
long messageTimestamp = System.currentTimeMillis() - 3000; // 3 seconds ago
@ -376,21 +361,13 @@ public class MetadataChangeProposalsProcessorTest {
assertTrue(timer.totalTime(TimeUnit.MILLISECONDS) >= 2500); // At least 2.5 seconds
assertTrue(timer.totalTime(TimeUnit.MILLISECONDS) <= 3500); // At most 3.5 seconds
// Verify the histogram method was called
verify(metricUtils)
.histogram(eq(MetadataChangeProposalsProcessor.class), eq("kafkaLag"), anyLong());
// Verify successful processing
verify(mockEntityService).ingestProposal(eq(opContext), any(), eq(false));
}
@Test
public void testMicrometerKafkaQueueTimeWithDifferentTopics() throws Exception {
// Setup a real MeterRegistry
SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry();
// Configure the mock metricUtils to return the registry
when(metricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry));
MeterRegistry meterRegistry = metricUtils.getRegistry();
// Create MCP
MetadataChangeProposal mcp = createSimpleMCP();
@ -448,11 +425,7 @@ public class MetadataChangeProposalsProcessorTest {
@Test
public void testMicrometerMetricsWithProcessingFailure() throws Exception {
// Setup a real MeterRegistry
SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry();
// Configure the mock metricUtils to return the registry
when(metricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry));
MeterRegistry meterRegistry = metricUtils.getRegistry();
// Create MCP that will fail
MetadataChangeProposal mcp = new MetadataChangeProposal();
@ -492,9 +465,6 @@ public class MetadataChangeProposalsProcessorTest {
@Test
public void testMicrometerMetricsAbsentWhenRegistryNotPresent() throws Exception {
// Configure the mock metricUtils to return empty Optional (no registry)
when(metricUtils.getRegistry()).thenReturn(Optional.empty());
// Create MCP
MetadataChangeProposal mcp = createSimpleMCP();
eventUtilsMock.when(() -> EventUtils.avroToPegasusMCP(mockRecord)).thenReturn(mcp);
@ -504,21 +474,13 @@ public class MetadataChangeProposalsProcessorTest {
// Execute - should not throw exception
processor.consume(mockConsumerRecord);
// Verify the histogram method was still called (for dropwizard metrics)
verify(metricUtils)
.histogram(eq(MetadataChangeProposalsProcessor.class), eq("kafkaLag"), anyLong());
// Verify processing completed successfully despite no registry
verify(mockEntityService).ingestProposal(eq(opContext), any(), eq(false));
}
@Test
public void testMicrometerKafkaQueueTimeAccuracy() throws Exception {
// Setup a real MeterRegistry
SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry();
// Configure the mock metricUtils to return the registry
when(metricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry));
MeterRegistry meterRegistry = metricUtils.getRegistry();
// Create MCP
MetadataChangeProposal mcp = createSimpleMCP();
@ -562,10 +524,6 @@ public class MetadataChangeProposalsProcessorTest {
// Verify max recorded time
assertTrue(timer.max(TimeUnit.MILLISECONDS) >= 4500);
assertTrue(timer.max(TimeUnit.MILLISECONDS) <= 5500);
// Verify histogram was called for each record
verify(metricUtils, times(queueTimes.length))
.histogram(eq(MetadataChangeProposalsProcessor.class), eq("kafkaLag"), anyLong());
}
@Test
@ -598,10 +556,6 @@ public class MetadataChangeProposalsProcessorTest {
// Verify processing completed successfully
verify(mockEntityService).ingestProposal(eq(opContextNoMetrics), any(), eq(false));
// Verify metricUtils methods were never called since it's not present in context
verify(metricUtils, never()).histogram(any(), any(), anyLong());
verify(metricUtils, never()).getRegistry();
}
// Helper method

View File

@ -557,6 +557,7 @@ public class BatchMetadataChangeProposalsProcessorTest {
public void testConsumeWithTelemetryMetrics() throws Exception {
// Mock the metric utils
MetricUtils mockMetricUtils = mock(MetricUtils.class);
when(mockMetricUtils.getRegistry()).thenReturn(new SimpleMeterRegistry());
// Create a mock operation context
OperationContext opContextWithMetrics = spy(opContext);
@ -856,12 +857,12 @@ public class BatchMetadataChangeProposalsProcessorTest {
@Test
public void testMicrometerMetricsAbsentWhenRegistryNotPresent() throws Exception {
// Create MetricUtils without a registry
MetricUtils metricUtilsNoRegistry = MetricUtils.builder().registry(null).build();
// With the new non-null contract, we can't create MetricUtils with null registry
// Instead, test the case where MetricUtils is absent from OperationContext
// Create operation context with metric utils that has no registry
// Create operation context without metric utils
OperationContext opContextNoRegistry = mock(OperationContext.class);
when(opContextNoRegistry.getMetricUtils()).thenReturn(Optional.of(metricUtilsNoRegistry));
when(opContextNoRegistry.getMetricUtils()).thenReturn(Optional.empty());
// Mock withQueueSpan
doAnswer(

View File

@ -82,17 +82,13 @@ public class PlatformEventProcessor {
// TODO: include priority level when available
metricUtils
.getRegistry()
.ifPresent(
meterRegistry -> {
meterRegistry
.timer(
MetricUtils.KAFKA_MESSAGE_QUEUE_TIME,
"topic",
consumerRecord.topic(),
"consumer.group",
datahubPlatformEventConsumerGroupId)
.record(Duration.ofMillis(queueTimeMs));
});
.timer(
MetricUtils.KAFKA_MESSAGE_QUEUE_TIME,
"topic",
consumerRecord.topic(),
"consumer.group",
datahubPlatformEventConsumerGroupId)
.record(Duration.ofMillis(queueTimeMs));
});
final GenericRecord record = consumerRecord.value();
log.info(

View File

@ -62,6 +62,7 @@ public class PlatformEventProcessorTest {
// Create mock MetricUtils
mockMetricUtils = mock(MetricUtils.class);
when(mockMetricUtils.getRegistry()).thenReturn(new SimpleMeterRegistry());
when(mockOperationContext.getMetricUtils()).thenReturn(Optional.of(mockMetricUtils));
// Create mock hooks
@ -133,6 +134,8 @@ public class PlatformEventProcessorTest {
// Setup
List<PlatformEventHook> hooks = Arrays.asList(mockHook1, mockHook2);
processor = new PlatformEventProcessor(mockOperationContext, hooks);
// Set the consumer group ID
setConsumerGroupId(processor, "generic-platform-event-job-client");
try (MockedStatic<EventUtils> mockedEventUtils = Mockito.mockStatic(EventUtils.class)) {
mockedEventUtils
@ -161,6 +164,8 @@ public class PlatformEventProcessorTest {
// Setup
List<PlatformEventHook> hooks = Arrays.asList(mockHook1);
processor = new PlatformEventProcessor(mockOperationContext, hooks);
// Set the consumer group ID
setConsumerGroupId(processor, "generic-platform-event-job-client");
try (MockedStatic<EventUtils> mockedEventUtils = Mockito.mockStatic(EventUtils.class)) {
mockedEventUtils
@ -189,6 +194,8 @@ public class PlatformEventProcessorTest {
// Setup
List<PlatformEventHook> hooks = Arrays.asList(mockHook1, mockHook2);
processor = new PlatformEventProcessor(mockOperationContext, hooks);
// Set the consumer group ID
setConsumerGroupId(processor, "generic-platform-event-job-client");
// Make first hook throw exception
doThrow(new RuntimeException("Hook 1 failed"))
@ -224,6 +231,8 @@ public class PlatformEventProcessorTest {
// Setup
List<PlatformEventHook> hooks = Arrays.asList(mockHook1, mockHook2);
processor = new PlatformEventProcessor(mockOperationContext, hooks);
// Set the consumer group ID
setConsumerGroupId(processor, "generic-platform-event-job-client");
// Make both hooks throw exceptions
doThrow(new RuntimeException("Hook 1 failed"))
@ -283,6 +292,8 @@ public class PlatformEventProcessorTest {
// Setup
List<PlatformEventHook> hooks = Arrays.asList(mockHook1);
processor = new PlatformEventProcessor(mockOperationContext, hooks);
// Set the consumer group ID
setConsumerGroupId(processor, "generic-platform-event-job-client");
// Set up specific consumer record values
String expectedKey = "test-key-123";
@ -321,7 +332,8 @@ public class PlatformEventProcessorTest {
// Verify that the consumer record methods were called
// Note: some methods may be called multiple times (e.g., for logging and metrics)
verify(specificMockRecord, times(1)).key();
verify(specificMockRecord, times(1)).topic();
verify(specificMockRecord, times(2))
.topic(); // Called twice: once for metrics, once for logging
verify(specificMockRecord, times(1)).partition();
verify(specificMockRecord, times(1)).offset();
verify(specificMockRecord, times(2))
@ -336,6 +348,8 @@ public class PlatformEventProcessorTest {
// Setup
List<PlatformEventHook> hooks = Arrays.asList(mockHook1);
processor = new PlatformEventProcessor(mockOperationContext, hooks);
// Set the consumer group ID
setConsumerGroupId(processor, "generic-platform-event-job-client");
PlatformEvent event1 = mock(PlatformEvent.class);
when(event1.getName()).thenReturn("Event1");
@ -389,6 +403,8 @@ public class PlatformEventProcessorTest {
// Setup
List<PlatformEventHook> hooks = Arrays.asList(mockHook1);
processor = new PlatformEventProcessor(mockOperationContext, hooks);
// Set the consumer group ID
setConsumerGroupId(processor, "generic-platform-event-job-client");
ConsumerRecord<String, GenericRecord> nullValueRecord = mock(ConsumerRecord.class);
when(nullValueRecord.value()).thenReturn(null);
@ -422,7 +438,7 @@ public class PlatformEventProcessorTest {
SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry();
// Configure the mock metricUtils to return the registry
when(mockMetricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry));
when(mockMetricUtils.getRegistry()).thenReturn(meterRegistry);
// Set the consumer group ID via reflection
setConsumerGroupId(processor, "generic-platform-event-job-client");
@ -469,7 +485,7 @@ public class PlatformEventProcessorTest {
SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry();
// Configure the mock metricUtils to return the registry
when(mockMetricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry));
when(mockMetricUtils.getRegistry()).thenReturn(meterRegistry);
// Set the consumer group ID
setConsumerGroupId(processor, "generic-platform-event-job-client");
@ -534,7 +550,7 @@ public class PlatformEventProcessorTest {
SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry();
// Configure the mock metricUtils to return the registry
when(mockMetricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry));
when(mockMetricUtils.getRegistry()).thenReturn(meterRegistry);
// Set the consumer group ID
setConsumerGroupId(processor, "generic-platform-event-job-client");
@ -578,7 +594,7 @@ public class PlatformEventProcessorTest {
@Test
public void testMicrometerMetricsAbsentWhenRegistryNotPresent() throws Exception {
// Configure the mock metricUtils to return empty Optional (no registry)
when(mockMetricUtils.getRegistry()).thenReturn(Optional.empty());
when(mockMetricUtils.getRegistry()).thenReturn(new SimpleMeterRegistry());
// Set the consumer group ID
setConsumerGroupId(processor, "generic-platform-event-job-client");
@ -608,7 +624,7 @@ public class PlatformEventProcessorTest {
SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry();
// Configure the mock metricUtils to return the registry
when(mockMetricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry));
when(mockMetricUtils.getRegistry()).thenReturn(meterRegistry);
// Set a custom consumer group ID
String customConsumerGroup = "custom-platform-event-consumer";
@ -645,7 +661,7 @@ public class PlatformEventProcessorTest {
SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry();
// Configure the mock metricUtils to return the registry
when(mockMetricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry));
when(mockMetricUtils.getRegistry()).thenReturn(meterRegistry);
// Set the consumer group ID
setConsumerGroupId(processor, "generic-platform-event-job-client");
@ -705,7 +721,7 @@ public class PlatformEventProcessorTest {
SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry();
// Configure the mock metricUtils to return the registry
when(mockMetricUtils.getRegistry()).thenReturn(Optional.of(meterRegistry));
when(mockMetricUtils.getRegistry()).thenReturn(meterRegistry);
// Set the consumer group ID
setConsumerGroupId(processor, "generic-platform-event-job-client");

View File

@ -275,11 +275,11 @@ public class RequestContext implements ContextInterface {
metricUtils.incrementMicrometer(
MetricUtils.DATAHUB_REQUEST_COUNT,
1,
"user.category",
"user_category",
userCategory,
"agent.class",
"agent_class",
agentClass,
"request.api",
"request_api",
requestAPI);
}
}

View File

@ -278,13 +278,13 @@ public class RequestContextTest {
verify(mockMetricUtils, atLeastOnce())
.incrementMicrometer(
eq("datahub.request.count"),
eq(MetricUtils.DATAHUB_REQUEST_COUNT),
eq(1.0d),
eq("user.category"),
eq("user_category"),
eq("system"),
eq("agent.class"),
eq("agent_class"),
eq("unknown"),
eq("request.api"),
eq("request_api"),
eq("restli"));
}
@ -303,13 +303,13 @@ public class RequestContextTest {
verify(mockMetricUtils, atLeastOnce())
.incrementMicrometer(
eq("datahub.request.count"),
eq(MetricUtils.DATAHUB_REQUEST_COUNT),
eq(1.0d),
eq("user.category"),
eq("user_category"),
eq("admin"),
eq("agent.class"),
eq("agent_class"),
eq("unknown"),
eq("request.api"),
eq("request_api"),
eq("restli"));
}
@ -328,13 +328,13 @@ public class RequestContextTest {
verify(mockMetricUtils, atLeastOnce())
.incrementMicrometer(
eq("datahub.request.count"),
eq(MetricUtils.DATAHUB_REQUEST_COUNT),
eq(1.0d),
eq("user.category"),
eq("user_category"),
eq("regular"),
eq("agent.class"),
eq("agent_class"),
eq("unknown"),
eq("request.api"),
eq("request_api"),
eq("restli"));
}

View File

@ -317,7 +317,7 @@ public class GraphQLEngineFactory {
GraphQLConcurrencyUtils.setExecutorService(graphQLWorkerPool);
if (metricUtils != null) {
MicrometerMetricsRegistry.registerExecutorMetrics(
"graphql", graphqlExecutorService, metricUtils.getRegistry().orElse(null));
"graphql", graphqlExecutorService, metricUtils.getRegistry());
}
return graphQLWorkerPool;

View File

@ -86,7 +86,7 @@ public class KafkaTraceReaderFactory {
traceExecutorService = Executors.newFixedThreadPool(threadPoolSize);
if (metricUtils != null) {
MicrometerMetricsRegistry.registerExecutorMetrics(
"api-trace", this.traceExecutorService, metricUtils.getRegistry().orElse(null));
"api-trace", this.traceExecutorService, metricUtils.getRegistry());
}
return traceExecutorService;
}

View File

@ -41,6 +41,7 @@ import io.datahubproject.metadata.context.SystemTelemetryContext;
import io.datahubproject.metadata.services.RestrictedService;
import io.datahubproject.metadata.services.SecretService;
import io.datahubproject.test.metadata.context.TestOperationContexts;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.opentelemetry.api.trace.Tracer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
@ -261,7 +262,7 @@ public class GraphQLEngineFactoryTest extends AbstractTestNGSpringContextTests {
public void setUp() {
// Set up default mock behaviors
when(graphService.supportsMultiHop()).thenReturn(true);
when(metricUtils.getRegistry()).thenReturn(java.util.Optional.empty());
when(metricUtils.getRegistry()).thenReturn(new SimpleMeterRegistry());
}
@Test

View File

@ -109,7 +109,7 @@ public class ClientCache<K, V, C extends ClientCacheConfig> {
if (config.isStatsEnabled() && metricUtils != null) {
MicrometerMetricsRegistry.registerCacheMetrics(
config.getName(), cache, metricUtils.getRegistry().orElse(null));
config.getName(), cache, metricUtils.getRegistry());
}
return new ClientCache<>(config, cache, loadFunction, weigher, ttlSecondsFunction);

View File

@ -145,7 +145,7 @@ public class RestliEntityClient extends BaseClient implements EntityClient {
new ThreadPoolExecutor.CallerRunsPolicy());
if (metricUtils != null) {
MicrometerMetricsRegistry.registerExecutorMetrics(
"entity-client-get", this.batchGetV2Pool, metricUtils.getRegistry().orElse(null));
"entity-client-get", this.batchGetV2Pool, metricUtils.getRegistry());
}
this.batchIngestPool =
new ThreadPoolExecutor(
@ -158,7 +158,7 @@ public class RestliEntityClient extends BaseClient implements EntityClient {
new ThreadPoolExecutor.CallerRunsPolicy());
if (metricUtils != null) {
MicrometerMetricsRegistry.registerExecutorMetrics(
"entity-client-ingest", this.batchIngestPool, metricUtils.getRegistry().orElse(null));
"entity-client-ingest", this.batchIngestPool, metricUtils.getRegistry());
}
}

View File

@ -63,6 +63,7 @@ import com.linkedin.restli.client.RestLiResponseException;
import com.linkedin.restli.client.response.BatchKVResponse;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.test.metadata.context.TestOperationContexts;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collections;
@ -98,7 +99,7 @@ public class BaseClientTest {
// Setup MetricUtils mock
mockMetricUtils = mock(MetricUtils.class);
when(mockMetricUtils.getRegistry()).thenReturn(Optional.empty());
when(mockMetricUtils.getRegistry()).thenReturn(new SimpleMeterRegistry());
}
@Test

View File

@ -7,11 +7,12 @@ import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import lombok.Builder;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@ -21,10 +22,10 @@ public class MetricUtils {
public static final String DROPWIZARD_METRIC = "dwizMetric";
public static final String DROPWIZARD_NAME = "dwizName";
/* Micrometer */
/* Micrometer. See https://prometheus.io/docs/practices/naming/ */
public static final String KAFKA_MESSAGE_QUEUE_TIME = "kafka.message.queue.time";
public static final String DATAHUB_REQUEST_HOOK_QUEUE_TIME = "datahub.request.hook.queue.time";
public static final String DATAHUB_REQUEST_COUNT = "datahub.request.count";
public static final String DATAHUB_REQUEST_COUNT = "datahub_request_count";
/* OpenTelemetry */
public static final String CACHE_HIT_ATTR = "cache.hit";
@ -42,7 +43,7 @@ public class MetricUtils {
@Deprecated public static final String DELIMITER = "_";
private final MeterRegistry registry;
@Builder.Default @NonNull private final MeterRegistry registry = new CompositeMeterRegistry();
private static final Map<String, Timer> legacyTimeCache = new ConcurrentHashMap<>();
private static final Map<String, Counter> legacyCounterCache = new ConcurrentHashMap<>();
private static final Map<String, DistributionSummary> legacyHistogramCache =
@ -52,24 +53,17 @@ public class MetricUtils {
// For state-based gauges (like throttled state)
private static final Map<String, AtomicDouble> gaugeStates = new ConcurrentHashMap<>();
public Optional<MeterRegistry> getRegistry() {
return Optional.ofNullable(registry);
public MeterRegistry getRegistry() {
return registry;
}
@Deprecated
public void time(String dropWizardMetricName, long durationNanos) {
getRegistry()
.ifPresent(
meterRegistry -> {
Timer timer =
legacyTimeCache.computeIfAbsent(
dropWizardMetricName,
name ->
Timer.builder(name)
.tags(DROPWIZARD_METRIC, "true")
.register(meterRegistry));
timer.record(durationNanos, TimeUnit.NANOSECONDS);
});
Timer timer =
legacyTimeCache.computeIfAbsent(
dropWizardMetricName,
name -> Timer.builder(name).tags(DROPWIZARD_METRIC, "true").register(registry));
timer.record(durationNanos, TimeUnit.NANOSECONDS);
}
@Deprecated
@ -90,18 +84,14 @@ public class MetricUtils {
@Deprecated
public void increment(String metricName, double increment) {
getRegistry()
.ifPresent(
meterRegistry -> {
Counter counter =
legacyCounterCache.computeIfAbsent(
metricName,
name ->
Counter.builder(MetricRegistry.name(name))
.tag(DROPWIZARD_METRIC, "true")
.register(meterRegistry));
counter.increment(increment);
});
Counter counter =
legacyCounterCache.computeIfAbsent(
metricName,
name ->
Counter.builder(MetricRegistry.name(name))
.tag(DROPWIZARD_METRIC, "true")
.register(registry));
counter.increment(increment);
}
/**
@ -112,16 +102,11 @@ public class MetricUtils {
* @param tags The tags to associate with the metric (can be empty)
*/
public void incrementMicrometer(String metricName, double increment, String... tags) {
getRegistry()
.ifPresent(
meterRegistry -> {
// Create a cache key that includes both metric name and tags
String cacheKey = createCacheKey(metricName, tags);
Counter counter =
micrometerCounterCache.computeIfAbsent(
cacheKey, key -> meterRegistry.counter(metricName, tags));
counter.increment(increment);
});
// Create a cache key that includes both metric name and tags
String cacheKey = createCacheKey(metricName, tags);
Counter counter =
micrometerCounterCache.computeIfAbsent(cacheKey, key -> registry.counter(metricName, tags));
counter.increment(increment);
}
/**
@ -167,40 +152,30 @@ public class MetricUtils {
public void setGaugeValue(Class<?> clazz, String metricName, double value) {
String name = MetricRegistry.name(clazz, metricName);
getRegistry()
.ifPresent(
meterRegistry -> {
// Get or create the state holder
AtomicDouble state = gaugeStates.computeIfAbsent(name, k -> new AtomicDouble(0));
// Get or create the state holder
AtomicDouble state = gaugeStates.computeIfAbsent(name, k -> new AtomicDouble(0));
// Register the gauge if not already registered
legacyGaugeCache.computeIfAbsent(
name,
key ->
Gauge.builder(key, state, AtomicDouble::get)
.tag(DROPWIZARD_METRIC, "true")
.register(meterRegistry));
// Register the gauge if not already registered
legacyGaugeCache.computeIfAbsent(
name,
key ->
Gauge.builder(key, state, AtomicDouble::get)
.tag(DROPWIZARD_METRIC, "true")
.register(registry));
// Update the value
state.set(value);
});
// Update the value
state.set(value);
}
@Deprecated
public void histogram(Class<?> clazz, String metricName, long value) {
getRegistry()
.ifPresent(
meterRegistry -> {
String name = MetricRegistry.name(clazz, metricName);
DistributionSummary summary =
legacyHistogramCache.computeIfAbsent(
name,
key ->
DistributionSummary.builder(key)
.tag(DROPWIZARD_METRIC, "true")
.register(meterRegistry));
summary.record(value);
});
String name = MetricRegistry.name(clazz, metricName);
DistributionSummary summary =
legacyHistogramCache.computeIfAbsent(
name,
key ->
DistributionSummary.builder(key).tag(DROPWIZARD_METRIC, "true").register(registry));
summary.record(value);
}
@Deprecated

View File

@ -27,11 +27,7 @@ public class MicrometerMetricsRegistry {
public static synchronized boolean registerCacheMetrics(
@Nonnull String cacheName,
@Nonnull Object nativeCache,
@Nullable MeterRegistry meterRegistry) {
if (cacheName == null || nativeCache == null || meterRegistry == null) {
return false;
}
@Nonnull MeterRegistry meterRegistry) {
if (!GLOBALLY_REGISTERED_CACHES.add(cacheName)) {
return false;

View File

@ -10,7 +10,6 @@ import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.mockito.Mock;
@ -44,18 +43,25 @@ public class MetricUtilsTest {
}
@Test
public void testGetRegistryReturnsOptionalWithRegistry() {
Optional<MeterRegistry> registry = metricUtils.getRegistry();
assertTrue(registry.isPresent());
assertSame(registry.get(), meterRegistry);
public void testGetRegistryReturnsRegistry() {
MeterRegistry registry = metricUtils.getRegistry();
assertNotNull(registry);
assertSame(registry, meterRegistry);
}
@Test
public void testGetRegistryReturnsEmptyOptionalWhenNull() {
MetricUtils utilsWithNullRegistry = MetricUtils.builder().registry(null).build();
public void testGetRegistryReturnsDefaultWhenNotSpecified() {
MetricUtils utilsWithDefaultRegistry = MetricUtils.builder().build();
Optional<MeterRegistry> registry = utilsWithNullRegistry.getRegistry();
assertFalse(registry.isPresent());
MeterRegistry registry = utilsWithDefaultRegistry.getRegistry();
assertNotNull(registry);
assertTrue(registry instanceof io.micrometer.core.instrument.composite.CompositeMeterRegistry);
}
@Test(expectedExceptions = NullPointerException.class)
public void testBuilderRejectsNullRegistry() {
// This should throw NullPointerException due to @NonNull annotation
MetricUtils.builder().registry(null).build();
}
@Test
@ -72,11 +78,11 @@ public class MetricUtilsTest {
}
@Test
public void testTimeWithNullRegistryDoesNothing() {
MetricUtils utilsWithNullRegistry = MetricUtils.builder().registry(null).build();
public void testTimeWithDefaultRegistryWorks() {
MetricUtils utilsWithDefaultRegistry = MetricUtils.builder().build();
// Should not throw exception
utilsWithNullRegistry.time("test.timer", 1000);
// Should not throw exception and should work with default registry
utilsWithDefaultRegistry.time("test.timer", 1000);
}
@Test
@ -217,16 +223,21 @@ public class MetricUtilsTest {
}
@Test
public void testAllMethodsWithNullRegistry() {
MetricUtils utilsWithNullRegistry = MetricUtils.builder().registry(null).build();
public void testAllMethodsWithDefaultRegistry() {
MetricUtils utilsWithDefaultRegistry = MetricUtils.builder().build();
// None of these should throw exceptions
utilsWithNullRegistry.time("timer", 1000);
utilsWithNullRegistry.increment(this.getClass(), "counter", 1);
utilsWithNullRegistry.increment("counter", 1);
utilsWithNullRegistry.exceptionIncrement(this.getClass(), "error", new RuntimeException());
utilsWithNullRegistry.setGaugeValue(this.getClass(), "gauge", 42);
utilsWithNullRegistry.histogram(this.getClass(), "histogram", 100);
// All methods should work with the default CompositeMeterRegistry
utilsWithDefaultRegistry.time("timer", 1000);
utilsWithDefaultRegistry.increment(this.getClass(), "counter", 1);
utilsWithDefaultRegistry.increment("counter", 1);
utilsWithDefaultRegistry.exceptionIncrement(this.getClass(), "error", new RuntimeException());
utilsWithDefaultRegistry.setGaugeValue(this.getClass(), "gauge", 42);
utilsWithDefaultRegistry.histogram(this.getClass(), "histogram", 100);
// Verify the registry is the expected type
assertTrue(
utilsWithDefaultRegistry.getRegistry()
instanceof io.micrometer.core.instrument.composite.CompositeMeterRegistry);
}
@Test

View File

@ -155,32 +155,6 @@ public class MicrometerMetricsRegistryTest {
assertFalse(result);
}
@Test
public void testRegisterCacheWithNullNativeCache() {
// Given
String cacheName = "testCache";
// When
boolean result = MicrometerMetricsRegistry.registerCacheMetrics(cacheName, null, meterRegistry);
// Then
assertFalse(result);
}
@Test
public void testRegisterCacheWithNullCacheName() {
// Given
Cache<Object, Object> caffeineCache =
Caffeine.newBuilder().maximumSize(100).recordStats().build();
// When
boolean result =
MicrometerMetricsRegistry.registerCacheMetrics(null, caffeineCache, meterRegistry);
// Then
assertFalse(result); // Should return false for null cache name
}
@Test
public void testRegisterCacheWithUnsupportedCacheType() {
// Given
@ -352,30 +326,6 @@ public class MicrometerMetricsRegistryTest {
assertEquals(registeredKey, cacheName);
}
@Test
public void testAllNullParameters() {
// When
boolean result = MicrometerMetricsRegistry.registerCacheMetrics(null, null, null);
// Then
assertFalse(result);
}
@Test
public void testNullCheckOrder() {
// Test various combinations of null parameters
// Null cache name
Cache<Object, Object> caffeineCache = Caffeine.newBuilder().maximumSize(100).build();
assertFalse(MicrometerMetricsRegistry.registerCacheMetrics(null, caffeineCache, meterRegistry));
// Null native cache
assertFalse(MicrometerMetricsRegistry.registerCacheMetrics("test", null, meterRegistry));
// Null meter registry
assertFalse(MicrometerMetricsRegistry.registerCacheMetrics("test", caffeineCache, null));
}
@Test
public void testSuccessfulRegistrationDoesNotThrowOnSubsequentAttempts() {
// Given