mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-24 16:38:19 +00:00
feat(metrics): add modern Micrometer metrics PFP-1613 (#14661)
- Add Micrometer metrics in RequestContext. - Add a smoke test that verifies the metrics via the Prometheus endpoint.
This commit is contained in:
parent
8b194cdd28
commit
80f206290e
@ -268,13 +268,19 @@ public class RequestContext implements ContextInterface {
|
||||
}
|
||||
|
||||
if (requestContext.getRequestAPI() != RequestAPI.TEST && metricUtils != null) {
|
||||
String agentClass = requestContext.getAgentClass().toLowerCase().replaceAll("\\s+", "");
|
||||
String requestAPI = requestContext.getRequestAPI().toString().toLowerCase();
|
||||
metricUtils.increment(
|
||||
String.format(
|
||||
"requestContext_%s_%s_%s",
|
||||
userCategory,
|
||||
requestContext.getAgentClass().toLowerCase().replaceAll("\\s+", ""),
|
||||
requestContext.getRequestAPI().toString().toLowerCase()),
|
||||
1);
|
||||
String.format("requestContext_%s_%s_%s", userCategory, agentClass, requestAPI), 1);
|
||||
metricUtils.incrementMicrometer(
|
||||
MetricUtils.DATAHUB_REQUEST_COUNT,
|
||||
1,
|
||||
"user.category",
|
||||
userCategory,
|
||||
"agent.class",
|
||||
agentClass,
|
||||
"request.api",
|
||||
requestAPI);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -272,9 +272,20 @@ public class RequestContextTest {
|
||||
.metricUtils(mockMetricUtils)
|
||||
.build();
|
||||
|
||||
// Verify that the counter was incremented
|
||||
// Verify that both legacy and new metrics are recorded
|
||||
verify(mockMetricUtils, atLeastOnce())
|
||||
.increment(eq("requestContext_system_unknown_restli"), eq(1.0d));
|
||||
|
||||
verify(mockMetricUtils, atLeastOnce())
|
||||
.incrementMicrometer(
|
||||
eq("datahub.request.count"),
|
||||
eq(1.0d),
|
||||
eq("user.category"),
|
||||
eq("system"),
|
||||
eq("agent.class"),
|
||||
eq("unknown"),
|
||||
eq("request.api"),
|
||||
eq("restli"));
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -286,9 +297,20 @@ public class RequestContextTest {
|
||||
.metricUtils(mockMetricUtils)
|
||||
.build();
|
||||
|
||||
// Verify that the counter was incremented
|
||||
// Verify that both legacy and new metrics are recorded
|
||||
verify(mockMetricUtils, atLeastOnce())
|
||||
.increment(eq("requestContext_admin_unknown_restli"), eq(1.0d));
|
||||
|
||||
verify(mockMetricUtils, atLeastOnce())
|
||||
.incrementMicrometer(
|
||||
eq("datahub.request.count"),
|
||||
eq(1.0d),
|
||||
eq("user.category"),
|
||||
eq("admin"),
|
||||
eq("agent.class"),
|
||||
eq("unknown"),
|
||||
eq("request.api"),
|
||||
eq("restli"));
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -300,9 +322,20 @@ public class RequestContextTest {
|
||||
.metricUtils(mockMetricUtils)
|
||||
.build();
|
||||
|
||||
// Verify that the counter was incremented
|
||||
// Verify that both legacy and new metrics are recorded
|
||||
verify(mockMetricUtils, atLeastOnce())
|
||||
.increment(eq("requestContext_regular_unknown_restli"), eq(1.0d));
|
||||
|
||||
verify(mockMetricUtils, atLeastOnce())
|
||||
.incrementMicrometer(
|
||||
eq("datahub.request.count"),
|
||||
eq(1.0d),
|
||||
eq("user.category"),
|
||||
eq("regular"),
|
||||
eq("agent.class"),
|
||||
eq("unknown"),
|
||||
eq("request.api"),
|
||||
eq("restli"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@ -24,6 +24,7 @@ public class MetricUtils {
|
||||
/* Micrometer */
|
||||
public static final String KAFKA_MESSAGE_QUEUE_TIME = "kafka.message.queue.time";
|
||||
public static final String DATAHUB_REQUEST_HOOK_QUEUE_TIME = "datahub.request.hook.queue.time";
|
||||
public static final String DATAHUB_REQUEST_COUNT = "datahub.request.count";
|
||||
|
||||
/* OpenTelemetry */
|
||||
public static final String CACHE_HIT_ATTR = "cache.hit";
|
||||
@ -47,6 +48,7 @@ public class MetricUtils {
|
||||
private static final Map<String, DistributionSummary> legacyHistogramCache =
|
||||
new ConcurrentHashMap<>();
|
||||
private static final Map<String, Gauge> legacyGaugeCache = new ConcurrentHashMap<>();
|
||||
private static final Map<String, Counter> micrometerCounterCache = new ConcurrentHashMap<>();
|
||||
// For state-based gauges (like throttled state)
|
||||
private static final Map<String, AtomicDouble> gaugeStates = new ConcurrentHashMap<>();
|
||||
|
||||
@ -102,6 +104,57 @@ public class MetricUtils {
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Increment a counter using Micrometer metrics library.
|
||||
*
|
||||
* @param metricName The name of the metric
|
||||
* @param increment The value to increment by
|
||||
* @param tags The tags to associate with the metric (can be empty)
|
||||
*/
|
||||
public void incrementMicrometer(String metricName, double increment, String... tags) {
|
||||
getRegistry()
|
||||
.ifPresent(
|
||||
meterRegistry -> {
|
||||
// Create a cache key that includes both metric name and tags
|
||||
String cacheKey = createCacheKey(metricName, tags);
|
||||
Counter counter =
|
||||
micrometerCounterCache.computeIfAbsent(
|
||||
cacheKey, key -> meterRegistry.counter(metricName, tags));
|
||||
counter.increment(increment);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a cache key for a metric with its tags.
|
||||
*
|
||||
* <p>Examples:
|
||||
*
|
||||
* <ul>
|
||||
* <li>No tags: {@code createCacheKey("datahub.request.count")} returns {@code
|
||||
* "datahub.request.count"}
|
||||
* <li>With tags: {@code createCacheKey("datahub.request.count", "user_category", "regular",
|
||||
* "agent_class", "browser")} returns {@code
|
||||
* "datahub.request.count|user_category=regular|agent_class=browser"}
|
||||
* </ul>
|
||||
*
|
||||
* @param metricName the name of the metric
|
||||
* @param tags the tags to associate with the metric (key-value pairs)
|
||||
* @return a string key that uniquely identifies this metric+tags combination
|
||||
*/
|
||||
private String createCacheKey(String metricName, String... tags) {
|
||||
if (tags.length == 0) {
|
||||
return metricName;
|
||||
}
|
||||
|
||||
StringBuilder keyBuilder = new StringBuilder(metricName);
|
||||
for (int i = 0; i < tags.length; i += 2) {
|
||||
if (i + 1 < tags.length) {
|
||||
keyBuilder.append("|").append(tags[i]).append("=").append(tags[i + 1]);
|
||||
}
|
||||
}
|
||||
return keyBuilder.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Set a state-based gauge value (e.g., for binary states like throttled/not throttled). This is
|
||||
* more efficient than repeatedly calling gauge() with different suppliers.
|
||||
|
||||
@ -400,4 +400,72 @@ public class MetricUtilsTest {
|
||||
assertEquals(result[1], 500.0);
|
||||
assertEquals(result[2], 1000.0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementMicrometerBasicFunctionality() {
|
||||
String metricName = "test.micrometer.counter";
|
||||
double incrementValue = 2.5;
|
||||
|
||||
metricUtils.incrementMicrometer(metricName, incrementValue);
|
||||
|
||||
Counter counter = meterRegistry.counter(metricName);
|
||||
assertNotNull(counter);
|
||||
assertEquals(counter.count(), incrementValue);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementMicrometerWithTags() {
|
||||
String metricName = "test.micrometer.tagged";
|
||||
double incrementValue = 1.0;
|
||||
|
||||
metricUtils.incrementMicrometer(metricName, incrementValue, "env", "prod", "service", "api");
|
||||
|
||||
Counter counter = meterRegistry.counter(metricName, "env", "prod", "service", "api");
|
||||
assertNotNull(counter);
|
||||
assertEquals(counter.count(), incrementValue);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementMicrometerCachingBehavior() {
|
||||
String metricName = "test.cache.counter";
|
||||
|
||||
// First call should create the counter
|
||||
metricUtils.incrementMicrometer(metricName, 1.0);
|
||||
Counter counter1 = meterRegistry.counter(metricName);
|
||||
assertEquals(counter1.count(), 1.0);
|
||||
|
||||
// Second call should reuse the same counter
|
||||
metricUtils.incrementMicrometer(metricName, 2.0);
|
||||
Counter counter2 = meterRegistry.counter(metricName);
|
||||
assertSame(counter1, counter2); // Should be the exact same object due to caching
|
||||
assertEquals(counter2.count(), 3.0); // 1.0 + 2.0
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementMicrometerDifferentTagsCacheSeparately() {
|
||||
String metricName = "test.cache.tags";
|
||||
|
||||
// Create counters with different tags
|
||||
metricUtils.incrementMicrometer(metricName, 1.0, "env", "prod");
|
||||
metricUtils.incrementMicrometer(metricName, 2.0, "env", "dev");
|
||||
|
||||
Counter prodCounter = meterRegistry.counter(metricName, "env", "prod");
|
||||
Counter devCounter = meterRegistry.counter(metricName, "env", "dev");
|
||||
|
||||
assertNotSame(prodCounter, devCounter); // Different cache entries
|
||||
assertEquals(prodCounter.count(), 1.0);
|
||||
assertEquals(devCounter.count(), 2.0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementMicrometerMultipleIncrementsOnSameCounter() {
|
||||
String metricName = "test.multiple.increments";
|
||||
|
||||
metricUtils.incrementMicrometer(metricName, 1.0, "type", "request");
|
||||
metricUtils.incrementMicrometer(metricName, 3.0, "type", "request");
|
||||
metricUtils.incrementMicrometer(metricName, 2.0, "type", "request");
|
||||
|
||||
Counter counter = meterRegistry.counter(metricName, "type", "request");
|
||||
assertEquals(counter.count(), 6.0); // 1.0 + 3.0 + 2.0
|
||||
}
|
||||
}
|
||||
|
||||
2
smoke-test/tests/metrics/__init__.py
Normal file
2
smoke-test/tests/metrics/__init__.py
Normal file
@ -0,0 +1,2 @@
|
||||
# Metrics smoke tests package
|
||||
|
||||
58
smoke-test/tests/metrics/test_metrics.py
Normal file
58
smoke-test/tests/metrics/test_metrics.py
Normal file
@ -0,0 +1,58 @@
|
||||
"""
|
||||
Smoke test for DataHub-specific metrics.
|
||||
|
||||
This test verifies that:
|
||||
1. DataHub custom metrics are being generated
|
||||
2. The new incrementMicrometer metrics are working
|
||||
3. Request context metrics are being recorded
|
||||
"""
|
||||
|
||||
import logging
|
||||
|
||||
import requests
|
||||
|
||||
from tests.utils import get_gms_url
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def test_datahub_request_count_metric_present():
    """Test that the new datahub_request_count metric is present in Prometheus output.

    Verifies both that the metric is exported at all and that every sample
    line carries the expected tag keys (user_category, agent_class,
    request_api).
    """
    gms_url = get_gms_url()
    prometheus_url = f"{gms_url}/actuator/prometheus"

    # Service initialization should already induce requests that will generate
    # metrics. So we don't need to trigger any requests as part of test setup.
    # A timeout keeps a hung endpoint from blocking the whole test run.
    response = requests.get(prometheus_url, timeout=30)
    # Fail fast with a clear error if the endpoint itself is broken (404/500)
    # rather than asserting against the body of an error page.
    response.raise_for_status()
    content = response.text

    # Look specifically for the datahub_request_count sample lines,
    # skipping blanks and '#'-prefixed HELP/TYPE comment lines.
    metric_lines = []
    for line in content.split("\n"):
        line = line.strip()
        if line and not line.startswith("#"):
            if "datahub_request_count" in line:
                metric_lines.append(line)

    logger.info(f"✅ Found {len(metric_lines)} datahub_request_count metric lines")
    for line in metric_lines:
        logger.info(f"  - {line}")

    # The metric should be present
    assert len(metric_lines) > 0, (
        "datahub_request_count metric not found in Prometheus output"
    )

    # Verify that the metric has the expected tags
    expected_tags = ["user_category", "agent_class", "request_api"]
    logger.info(f"🔍 Checking for expected tags: {expected_tags}")

    for metric_line in metric_lines:
        # Check if the metric line contains the expected tags
        has_expected_tags = all(tag in metric_line for tag in expected_tags)
        assert has_expected_tags, (
            f"Metric line missing expected tags. Line: {metric_line}, Expected tags: {expected_tags}"
        )
        logger.info(f"✅ Metric line has all expected tags: {metric_line}")

    logger.info("🎉 All datahub_request_count metrics have the expected tags!")
|
||||
Loading…
x
Reference in New Issue
Block a user