mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-09-23 07:51:44 +00:00
parent
513350e623
commit
e9b91bb5d4
@ -111,6 +111,14 @@ public class MicrometerBundle implements ConfiguredBundle<OpenMetadataApplicatio
|
|||||||
.addMapping("/prometheus");
|
.addMapping("/prometheus");
|
||||||
|
|
||||||
LOG.info("Prometheus metrics endpoint registered at admin port on /prometheus");
|
LOG.info("Prometheus metrics endpoint registered at admin port on /prometheus");
|
||||||
|
|
||||||
|
// Register user metrics endpoint
|
||||||
|
environment
|
||||||
|
.admin()
|
||||||
|
.addServlet("user-metrics", new UserMetricsServlet())
|
||||||
|
.addMapping("/user-metrics");
|
||||||
|
|
||||||
|
LOG.info("User metrics endpoint registered at admin port on /user-metrics");
|
||||||
}
|
}
|
||||||
|
|
||||||
private void registerJdbiMetrics(Environment environment) {
|
private void registerJdbiMetrics(Environment environment) {
|
||||||
|
@ -0,0 +1,242 @@
|
|||||||
|
package org.openmetadata.service.monitoring;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import jakarta.servlet.http.HttpServlet;
|
||||||
|
import jakarta.servlet.http.HttpServletRequest;
|
||||||
|
import jakarta.servlet.http.HttpServletResponse;
|
||||||
|
import jakarta.ws.rs.core.Response;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.PrintWriter;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.openmetadata.schema.dataInsight.DataInsightChartResult;
|
||||||
|
import org.openmetadata.schema.dataInsight.type.DailyActiveUsers;
|
||||||
|
import org.openmetadata.schema.entity.teams.User;
|
||||||
|
import org.openmetadata.schema.type.DataReportIndex;
|
||||||
|
import org.openmetadata.schema.utils.JsonUtils;
|
||||||
|
import org.openmetadata.service.Entity;
|
||||||
|
import org.openmetadata.service.jdbi3.ListFilter;
|
||||||
|
import org.openmetadata.service.jdbi3.UserRepository;
|
||||||
|
import org.openmetadata.service.search.SearchRepository;
|
||||||
|
import org.openmetadata.service.util.EntityUtil;
|
||||||
|
import org.openmetadata.service.util.ResultList;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Servlet that exposes user metrics on the admin port.
|
||||||
|
* Provides metrics including:
|
||||||
|
* - daily_active_users: Number of daily active users (from analytics if available)
|
||||||
|
* - last_activity: Last non-bot user activity timestamp
|
||||||
|
* - total_users: Total number of users
|
||||||
|
* - bot_users: Number of bot users
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
public class UserMetricsServlet extends HttpServlet {
|
||||||
|
private static final String CONTENT_TYPE = "application/json; charset=utf-8";
|
||||||
|
private final ObjectMapper objectMapper = JsonUtils.getObjectMapper();
|
||||||
|
private transient UserRepository userRepository;
|
||||||
|
private transient SearchRepository searchRepository;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init() {
|
||||||
|
userRepository = (UserRepository) Entity.getEntityRepository(Entity.USER);
|
||||||
|
searchRepository = Entity.getSearchRepository();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException {
|
||||||
|
resp.setStatus(HttpServletResponse.SC_OK);
|
||||||
|
resp.setContentType(CONTENT_TYPE);
|
||||||
|
|
||||||
|
try {
|
||||||
|
Map<String, Object> metrics = collectUserMetrics();
|
||||||
|
writeJsonResponse(resp, metrics);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Error collecting user metrics", e);
|
||||||
|
handleErrorResponse(resp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void writeJsonResponse(HttpServletResponse resp, Object data) throws IOException {
|
||||||
|
try {
|
||||||
|
String json = objectMapper.writeValueAsString(data);
|
||||||
|
PrintWriter writer = resp.getWriter();
|
||||||
|
writer.write(json);
|
||||||
|
writer.flush();
|
||||||
|
} catch (JsonProcessingException e) {
|
||||||
|
LOG.error("Error serializing response to JSON", e);
|
||||||
|
throw new IOException("Failed to serialize response", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleErrorResponse(HttpServletResponse resp) throws IOException {
|
||||||
|
resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
|
||||||
|
Map<String, String> error = Map.of("error", "Failed to collect user metrics");
|
||||||
|
writeJsonResponse(resp, error);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, Object> collectUserMetrics() {
|
||||||
|
Map<String, Object> metrics = new HashMap<>();
|
||||||
|
ListFilter filter = new ListFilter(null);
|
||||||
|
int totalUsers = userRepository.getDao().listCount(filter);
|
||||||
|
metrics.put("total_users", totalUsers);
|
||||||
|
|
||||||
|
ListFilter botFilter = new ListFilter(null);
|
||||||
|
botFilter.addQueryParam("isBot", "true");
|
||||||
|
int botUsers = userRepository.getDao().listCount(botFilter);
|
||||||
|
metrics.put("bot_users", botUsers);
|
||||||
|
|
||||||
|
Integer dailyActiveUsers = getDailyActiveUsers();
|
||||||
|
metrics.put("daily_active_users", dailyActiveUsers != null ? dailyActiveUsers : 0);
|
||||||
|
|
||||||
|
String lastActivity = getLastUserActivity();
|
||||||
|
metrics.put("last_activity", lastActivity);
|
||||||
|
|
||||||
|
return metrics;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Integer getDailyActiveUsers() {
|
||||||
|
try {
|
||||||
|
Response response = fetchDailyActiveUsersResponse();
|
||||||
|
return extractActiveUsersCount(response);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn("Could not fetch daily active users from analytics", e);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Response fetchDailyActiveUsersResponse() throws IOException {
|
||||||
|
long endTs = System.currentTimeMillis();
|
||||||
|
long startTs = endTs - (24 * 60 * 60 * 1000); // 24 hours ago
|
||||||
|
|
||||||
|
return searchRepository.listDataInsightChartResult(
|
||||||
|
startTs,
|
||||||
|
endTs,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
DataInsightChartResult.DataInsightChartType.DAILY_ACTIVE_USERS,
|
||||||
|
1,
|
||||||
|
0,
|
||||||
|
null,
|
||||||
|
DataReportIndex.WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA_INDEX.value());
|
||||||
|
}
|
||||||
|
|
||||||
|
private Integer extractActiveUsersCount(Response response) {
|
||||||
|
if (!isValidResponse(response)) {
|
||||||
|
LOG.debug("No daily active users data found for the last 24 hours");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
Object entity = response.getEntity();
|
||||||
|
|
||||||
|
// The API might return either DataInsightChartResult or ResultList directly
|
||||||
|
if (entity instanceof DataInsightChartResult) {
|
||||||
|
DataInsightChartResult chartResult = (DataInsightChartResult) entity;
|
||||||
|
if (!hasValidData(chartResult)) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
return findMaxActiveUsers(chartResult.getData());
|
||||||
|
} else if (entity instanceof ResultList) {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
ResultList<Object> resultList = (ResultList<Object>) entity;
|
||||||
|
if (resultList.getData() == null || resultList.getData().isEmpty()) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
return findMaxActiveUsers(resultList.getData());
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.debug(
|
||||||
|
"Unexpected response type from daily active users API: {}", entity.getClass().getName());
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isValidResponse(Response response) {
|
||||||
|
return response != null && response.getStatus() == 200 && response.getEntity() != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean hasValidData(DataInsightChartResult chartResult) {
|
||||||
|
return chartResult != null && chartResult.getData() != null && !chartResult.getData().isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
private Integer findMaxActiveUsers(List<Object> dataList) {
|
||||||
|
int maxActiveUsers = 0;
|
||||||
|
for (Object obj : dataList) {
|
||||||
|
Integer activeUsers = extractActiveUsersFromObject(obj);
|
||||||
|
if (activeUsers != null) {
|
||||||
|
maxActiveUsers = Math.max(maxActiveUsers, activeUsers);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return maxActiveUsers;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Integer extractActiveUsersFromObject(Object obj) {
|
||||||
|
if (obj instanceof DailyActiveUsers dailyActiveUsers) {
|
||||||
|
return dailyActiveUsers.getActiveUsers();
|
||||||
|
} else if (obj instanceof Map) {
|
||||||
|
return extractActiveUsersFromMap((Map<?, ?>) obj);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private Integer extractActiveUsersFromMap(Map<?, ?> map) {
|
||||||
|
Map<String, Object> dauMap = (Map<String, Object>) map;
|
||||||
|
Object activeUsersObj = dauMap.get("activeUsers");
|
||||||
|
if (activeUsersObj instanceof Number number) {
|
||||||
|
return number.intValue();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getLastUserActivity() {
|
||||||
|
try {
|
||||||
|
ListFilter nonBotFilter = createNonBotFilter();
|
||||||
|
Long lastActivityTime = findMostRecentActivity(nonBotFilter, 1);
|
||||||
|
if (lastActivityTime != null) {
|
||||||
|
return Instant.ofEpochMilli(lastActivityTime).toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
lastActivityTime = findMostRecentActivity(nonBotFilter, 100);
|
||||||
|
if (lastActivityTime != null) {
|
||||||
|
return Instant.ofEpochMilli(lastActivityTime).toString();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Error getting last user activity", e);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private ListFilter createNonBotFilter() {
|
||||||
|
ListFilter filter = new ListFilter(null);
|
||||||
|
filter.addQueryParam("isBot", "false");
|
||||||
|
return filter;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Long findMostRecentActivity(ListFilter filter, int limit) {
|
||||||
|
List<User> users =
|
||||||
|
userRepository
|
||||||
|
.listAfter(null, EntityUtil.Fields.EMPTY_FIELDS, filter, limit, null)
|
||||||
|
.getData();
|
||||||
|
long maxActivityTime = 0;
|
||||||
|
|
||||||
|
for (User user : users) {
|
||||||
|
Long activityTime = getUserActivityTime(user);
|
||||||
|
if (activityTime != null && activityTime > maxActivityTime) {
|
||||||
|
maxActivityTime = activityTime;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return maxActivityTime > 0 ? maxActivityTime : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Long getUserActivityTime(User user) {
|
||||||
|
if (user.getLastActivityTime() != null) {
|
||||||
|
return user.getLastActivityTime();
|
||||||
|
}
|
||||||
|
return user.getUpdatedAt();
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,257 @@
|
|||||||
|
package org.openmetadata.service.monitoring;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.ArgumentMatchers.anyInt;
|
||||||
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
|
import static org.mockito.Mockito.mockStatic;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import jakarta.servlet.http.HttpServletRequest;
|
||||||
|
import jakarta.servlet.http.HttpServletResponse;
|
||||||
|
import jakarta.ws.rs.core.Response;
|
||||||
|
import java.io.PrintWriter;
|
||||||
|
import java.io.StringWriter;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Map;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.MockedStatic;
|
||||||
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
import org.openmetadata.schema.dataInsight.DataInsightChartResult;
|
||||||
|
import org.openmetadata.schema.dataInsight.type.DailyActiveUsers;
|
||||||
|
import org.openmetadata.schema.entity.teams.User;
|
||||||
|
import org.openmetadata.schema.utils.JsonUtils;
|
||||||
|
import org.openmetadata.service.Entity;
|
||||||
|
import org.openmetadata.service.jdbi3.EntityDAO;
|
||||||
|
import org.openmetadata.service.jdbi3.ListFilter;
|
||||||
|
import org.openmetadata.service.jdbi3.UserRepository;
|
||||||
|
import org.openmetadata.service.search.SearchRepository;
|
||||||
|
import org.openmetadata.service.util.ResultList;
|
||||||
|
|
||||||
|
@ExtendWith(MockitoExtension.class)
|
||||||
|
class UserMetricsServletTest {
|
||||||
|
|
||||||
|
@Mock private UserRepository userRepository;
|
||||||
|
@Mock private SearchRepository searchRepository;
|
||||||
|
@Mock private EntityDAO<User> userDAO;
|
||||||
|
@Mock private HttpServletRequest request;
|
||||||
|
@Mock private HttpServletResponse response;
|
||||||
|
@Mock private Response searchResponse;
|
||||||
|
|
||||||
|
private UserMetricsServlet servlet;
|
||||||
|
private StringWriter stringWriter;
|
||||||
|
private final ObjectMapper objectMapper = JsonUtils.getObjectMapper();
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void setUp() throws Exception {
|
||||||
|
servlet = new UserMetricsServlet();
|
||||||
|
stringWriter = new StringWriter();
|
||||||
|
PrintWriter writer = new PrintWriter(stringWriter);
|
||||||
|
|
||||||
|
when(response.getWriter()).thenReturn(writer);
|
||||||
|
when(userRepository.getDao()).thenReturn(userDAO);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testGetUserMetricsSuccess() throws Exception {
|
||||||
|
// Mock the Entity static methods
|
||||||
|
try (MockedStatic<Entity> entityMock = mockStatic(Entity.class)) {
|
||||||
|
entityMock.when(() -> Entity.getEntityRepository(Entity.USER)).thenReturn(userRepository);
|
||||||
|
entityMock.when(Entity::getSearchRepository).thenReturn(searchRepository);
|
||||||
|
|
||||||
|
servlet.init();
|
||||||
|
|
||||||
|
// Setup total users count
|
||||||
|
when(userDAO.listCount(any(ListFilter.class))).thenReturn(10, 3); // 10 total, 3 bots
|
||||||
|
|
||||||
|
// Setup daily active users
|
||||||
|
DailyActiveUsers dailyActiveUsers = new DailyActiveUsers();
|
||||||
|
dailyActiveUsers.setActiveUsers(5);
|
||||||
|
dailyActiveUsers.setTimestamp(System.currentTimeMillis());
|
||||||
|
|
||||||
|
ResultList<DailyActiveUsers> dauResults = new ResultList<>();
|
||||||
|
dauResults.setData(Collections.singletonList(dailyActiveUsers));
|
||||||
|
|
||||||
|
when(searchResponse.getStatus()).thenReturn(200);
|
||||||
|
when(searchResponse.getEntity()).thenReturn(dauResults);
|
||||||
|
when(searchRepository.listDataInsightChartResult(
|
||||||
|
any(Long.class),
|
||||||
|
any(Long.class),
|
||||||
|
any(),
|
||||||
|
any(),
|
||||||
|
eq(DataInsightChartResult.DataInsightChartType.DAILY_ACTIVE_USERS),
|
||||||
|
anyInt(),
|
||||||
|
anyInt(),
|
||||||
|
any(),
|
||||||
|
any()))
|
||||||
|
.thenReturn(searchResponse);
|
||||||
|
|
||||||
|
// Setup last activity
|
||||||
|
long lastActivityTime = System.currentTimeMillis() - 3600000; // 1 hour ago
|
||||||
|
User activeUser = new User();
|
||||||
|
activeUser.setName("testuser");
|
||||||
|
activeUser.setIsBot(false);
|
||||||
|
activeUser.setLastActivityTime(lastActivityTime);
|
||||||
|
|
||||||
|
ResultList<User> userResults = new ResultList<>();
|
||||||
|
userResults.setData(Collections.singletonList(activeUser));
|
||||||
|
|
||||||
|
when(userRepository.listAfter(any(), any(), any(ListFilter.class), anyInt(), any()))
|
||||||
|
.thenReturn(userResults);
|
||||||
|
servlet.doGet(request, response);
|
||||||
|
verify(response).setStatus(HttpServletResponse.SC_OK);
|
||||||
|
verify(response).setContentType("application/json; charset=utf-8");
|
||||||
|
String jsonResponse = stringWriter.toString();
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
Map<String, Object> metrics = objectMapper.readValue(jsonResponse, Map.class);
|
||||||
|
|
||||||
|
assertEquals(10, metrics.get("total_users"));
|
||||||
|
assertEquals(3, metrics.get("bot_users"));
|
||||||
|
assertEquals(5, metrics.get("daily_active_users"));
|
||||||
|
assertEquals(Instant.ofEpochMilli(lastActivityTime).toString(), metrics.get("last_activity"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testGetUserMetricsNoDailyActiveUsers() throws Exception {
|
||||||
|
try (MockedStatic<Entity> entityMock = mockStatic(Entity.class)) {
|
||||||
|
entityMock.when(() -> Entity.getEntityRepository(Entity.USER)).thenReturn(userRepository);
|
||||||
|
entityMock.when(Entity::getSearchRepository).thenReturn(searchRepository);
|
||||||
|
|
||||||
|
servlet.init();
|
||||||
|
|
||||||
|
// Setup counts
|
||||||
|
when(userDAO.listCount(any(ListFilter.class))).thenReturn(8, 2);
|
||||||
|
|
||||||
|
// No daily active users data
|
||||||
|
when(searchResponse.getStatus()).thenReturn(200);
|
||||||
|
when(searchResponse.getEntity()).thenReturn(new ResultList<>());
|
||||||
|
when(searchRepository.listDataInsightChartResult(
|
||||||
|
any(Long.class),
|
||||||
|
any(Long.class),
|
||||||
|
any(),
|
||||||
|
any(),
|
||||||
|
any(),
|
||||||
|
anyInt(),
|
||||||
|
anyInt(),
|
||||||
|
any(),
|
||||||
|
any()))
|
||||||
|
.thenReturn(searchResponse);
|
||||||
|
|
||||||
|
// No users with activity
|
||||||
|
ResultList<User> emptyResults = new ResultList<>();
|
||||||
|
emptyResults.setData(Collections.emptyList());
|
||||||
|
when(userRepository.listAfter(any(), any(), any(ListFilter.class), anyInt(), any()))
|
||||||
|
.thenReturn(emptyResults);
|
||||||
|
|
||||||
|
servlet.doGet(request, response);
|
||||||
|
|
||||||
|
String jsonResponse = stringWriter.toString();
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
Map<String, Object> metrics = objectMapper.readValue(jsonResponse, Map.class);
|
||||||
|
|
||||||
|
assertEquals(8, metrics.get("total_users"));
|
||||||
|
assertEquals(2, metrics.get("bot_users"));
|
||||||
|
assertEquals(0, metrics.get("daily_active_users"));
|
||||||
|
assertNull(metrics.get("last_activity"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testGetUserMetricsError() throws Exception {
|
||||||
|
try (MockedStatic<Entity> entityMock = mockStatic(Entity.class)) {
|
||||||
|
entityMock.when(() -> Entity.getEntityRepository(Entity.USER)).thenReturn(userRepository);
|
||||||
|
entityMock.when(Entity::getSearchRepository).thenReturn(searchRepository);
|
||||||
|
servlet.init();
|
||||||
|
when(userDAO.listCount(any(ListFilter.class)))
|
||||||
|
.thenThrow(new RuntimeException("Database error"));
|
||||||
|
|
||||||
|
servlet.doGet(request, response);
|
||||||
|
verify(response).setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
|
||||||
|
String jsonResponse = stringWriter.toString();
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
Map<String, Object> error = objectMapper.readValue(jsonResponse, Map.class);
|
||||||
|
|
||||||
|
assertEquals("Failed to collect user metrics", error.get("error"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testGetUserMetricsWithMultipleUsers() throws Exception {
|
||||||
|
try (MockedStatic<Entity> entityMock = mockStatic(Entity.class)) {
|
||||||
|
entityMock.when(() -> Entity.getEntityRepository(Entity.USER)).thenReturn(userRepository);
|
||||||
|
entityMock.when(Entity::getSearchRepository).thenReturn(searchRepository);
|
||||||
|
|
||||||
|
servlet.init();
|
||||||
|
|
||||||
|
when(userDAO.listCount(any(ListFilter.class))).thenReturn(15, 5);
|
||||||
|
|
||||||
|
DailyActiveUsers dau1 = new DailyActiveUsers();
|
||||||
|
dau1.setActiveUsers(3);
|
||||||
|
dau1.setTimestamp(System.currentTimeMillis() - 86400000);
|
||||||
|
|
||||||
|
DailyActiveUsers dau2 = new DailyActiveUsers();
|
||||||
|
dau2.setActiveUsers(7);
|
||||||
|
dau2.setTimestamp(System.currentTimeMillis());
|
||||||
|
|
||||||
|
ResultList<DailyActiveUsers> dauResults = new ResultList<>();
|
||||||
|
dauResults.setData(Arrays.asList(dau1, dau2));
|
||||||
|
|
||||||
|
when(searchResponse.getStatus()).thenReturn(200);
|
||||||
|
when(searchResponse.getEntity()).thenReturn(dauResults);
|
||||||
|
when(searchRepository.listDataInsightChartResult(
|
||||||
|
any(Long.class),
|
||||||
|
any(Long.class),
|
||||||
|
any(),
|
||||||
|
any(),
|
||||||
|
any(),
|
||||||
|
anyInt(),
|
||||||
|
anyInt(),
|
||||||
|
any(),
|
||||||
|
any()))
|
||||||
|
.thenReturn(searchResponse);
|
||||||
|
|
||||||
|
// Setup users with different activity times
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
User user1 = new User();
|
||||||
|
user1.setName("user1");
|
||||||
|
user1.setIsBot(false);
|
||||||
|
user1.setLastActivityTime(now - 7200000); // 2 hours ago
|
||||||
|
|
||||||
|
User user2 = new User();
|
||||||
|
user2.setName("user2");
|
||||||
|
user2.setIsBot(false);
|
||||||
|
user2.setLastActivityTime(now - 3600000); // 1 hour ago (most recent)
|
||||||
|
|
||||||
|
User botUser = new User();
|
||||||
|
botUser.setName("bot");
|
||||||
|
botUser.setIsBot(true);
|
||||||
|
botUser.setLastActivityTime(now); // Should be ignored
|
||||||
|
|
||||||
|
ResultList<User> userResults = new ResultList<>();
|
||||||
|
userResults.setData(Arrays.asList(user1, user2));
|
||||||
|
|
||||||
|
when(userRepository.listAfter(any(), any(), any(ListFilter.class), anyInt(), any()))
|
||||||
|
.thenReturn(userResults);
|
||||||
|
|
||||||
|
servlet.doGet(request, response);
|
||||||
|
|
||||||
|
String jsonResponse = stringWriter.toString();
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
Map<String, Object> metrics = objectMapper.readValue(jsonResponse, Map.class);
|
||||||
|
|
||||||
|
assertEquals(15, metrics.get("total_users"));
|
||||||
|
assertEquals(5, metrics.get("bot_users"));
|
||||||
|
assertEquals(7, metrics.get("daily_active_users")); // Should use the latest value
|
||||||
|
assertEquals(Instant.ofEpochMilli(now - 3600000).toString(), metrics.get("last_activity"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,6 +1,9 @@
|
|||||||
package org.openmetadata.service.resources;
|
package org.openmetadata.service.resources;
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.*;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
import java.io.BufferedReader;
|
import java.io.BufferedReader;
|
||||||
import java.io.InputStreamReader;
|
import java.io.InputStreamReader;
|
||||||
|
@ -0,0 +1,332 @@
|
|||||||
|
package org.openmetadata.service.resources;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
import static org.junit.jupiter.api.Assertions.fail;
|
||||||
|
import static org.openmetadata.service.security.SecurityUtil.authHeaders;
|
||||||
|
import static org.openmetadata.service.util.TestUtils.ADMIN_AUTH_HEADERS;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import jakarta.ws.rs.client.WebTarget;
|
||||||
|
import java.io.BufferedReader;
|
||||||
|
import java.io.InputStreamReader;
|
||||||
|
import java.net.HttpURLConnection;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.net.URL;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.TestInfo;
|
||||||
|
import org.openmetadata.schema.api.teams.CreateUser;
|
||||||
|
import org.openmetadata.schema.entity.teams.User;
|
||||||
|
import org.openmetadata.schema.utils.JsonUtils;
|
||||||
|
import org.openmetadata.service.OpenMetadataApplicationTest;
|
||||||
|
import org.openmetadata.service.resources.teams.UserResourceTest;
|
||||||
|
import org.openmetadata.service.security.auth.UserActivityTracker;
|
||||||
|
import org.openmetadata.service.util.TestUtils;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
class UserMetricsResourceTest extends OpenMetadataApplicationTest {
|
||||||
|
|
||||||
|
private final ObjectMapper objectMapper = JsonUtils.getObjectMapper();
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testUserMetricsEndpoint() throws Exception {
|
||||||
|
int adminPort = APP.getAdminPort();
|
||||||
|
URL url = URI.create("http://localhost:" + adminPort + "/user-metrics").toURL();
|
||||||
|
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
|
||||||
|
connection.setRequestMethod("GET");
|
||||||
|
connection.setConnectTimeout(5000);
|
||||||
|
connection.setReadTimeout(5000);
|
||||||
|
|
||||||
|
try {
|
||||||
|
int responseCode = connection.getResponseCode();
|
||||||
|
assertEquals(200, responseCode, "User metrics endpoint should return 200 OK");
|
||||||
|
|
||||||
|
String contentType = connection.getContentType();
|
||||||
|
assertNotNull(contentType, "Content type should not be null");
|
||||||
|
assertTrue(
|
||||||
|
contentType.contains("application/json"),
|
||||||
|
"Content type should be application/json, but was: " + contentType);
|
||||||
|
|
||||||
|
try (BufferedReader reader =
|
||||||
|
new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
|
||||||
|
String response = reader.lines().collect(Collectors.joining("\n"));
|
||||||
|
assertFalse(response.isEmpty(), "User metrics response should not be empty");
|
||||||
|
verifyMetricsStructure(response);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
connection.disconnect();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testUserMetricsEndpointPerformance() throws Exception {
|
||||||
|
int adminPort = APP.getAdminPort();
|
||||||
|
URL url = URI.create("http://localhost:" + adminPort + "/user-metrics").toURL();
|
||||||
|
|
||||||
|
long startTime = System.currentTimeMillis();
|
||||||
|
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
|
||||||
|
connection.setRequestMethod("GET");
|
||||||
|
int responseCode = connection.getResponseCode();
|
||||||
|
long duration = System.currentTimeMillis() - startTime;
|
||||||
|
|
||||||
|
assertEquals(200, responseCode);
|
||||||
|
assertTrue(
|
||||||
|
duration < 2000,
|
||||||
|
"User metrics endpoint should respond within 2 seconds, took: " + duration + "ms");
|
||||||
|
connection.disconnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testUserMetricsContent() throws Exception {
|
||||||
|
int adminPort = APP.getAdminPort();
|
||||||
|
URL url = URI.create("http://localhost:" + adminPort + "/user-metrics").toURL();
|
||||||
|
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
|
||||||
|
connection.setRequestMethod("GET");
|
||||||
|
|
||||||
|
try {
|
||||||
|
assertEquals(200, connection.getResponseCode());
|
||||||
|
|
||||||
|
try (BufferedReader reader =
|
||||||
|
new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
|
||||||
|
String response = reader.lines().collect(Collectors.joining("\n"));
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
Map<String, Object> metrics = objectMapper.readValue(response, Map.class);
|
||||||
|
|
||||||
|
assertTrue(metrics.containsKey("total_users"), "Should contain total_users");
|
||||||
|
assertTrue(metrics.containsKey("bot_users"), "Should contain bot_users");
|
||||||
|
assertTrue(metrics.containsKey("daily_active_users"), "Should contain daily_active_users");
|
||||||
|
assertTrue(metrics.containsKey("last_activity"), "Should contain last_activity");
|
||||||
|
|
||||||
|
assertInstanceOf(
|
||||||
|
Integer.class, metrics.get("total_users"), "total_users should be an integer");
|
||||||
|
assertInstanceOf(Integer.class, metrics.get("bot_users"), "bot_users should be an integer");
|
||||||
|
assertInstanceOf(
|
||||||
|
Integer.class,
|
||||||
|
metrics.get("daily_active_users"),
|
||||||
|
"daily_active_users should be an integer");
|
||||||
|
|
||||||
|
int totalUsers = (Integer) metrics.get("total_users");
|
||||||
|
int botUsers = (Integer) metrics.get("bot_users");
|
||||||
|
assertTrue(totalUsers >= 0, "Total users should be non-negative");
|
||||||
|
assertTrue(botUsers >= 0, "Bot users should be non-negative");
|
||||||
|
assertTrue(botUsers <= totalUsers, "Bot users should not exceed total users");
|
||||||
|
|
||||||
|
int dailyActiveUsers = (Integer) metrics.get("daily_active_users");
|
||||||
|
assertTrue(dailyActiveUsers >= 0, "Daily active users should be non-negative");
|
||||||
|
|
||||||
|
Object lastActivity = metrics.get("last_activity");
|
||||||
|
if (lastActivity != null) {
|
||||||
|
assertInstanceOf(
|
||||||
|
String.class, lastActivity, "last_activity should be a string when present");
|
||||||
|
String timestamp = (String) lastActivity;
|
||||||
|
assertTrue(
|
||||||
|
timestamp.matches("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}.*"),
|
||||||
|
"last_activity should be in ISO format: " + timestamp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
connection.disconnect();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testUserMetricsWithSystemUsers() throws Exception {
|
||||||
|
// This test verifies that system users (like ingestion-bot) are properly counted
|
||||||
|
int adminPort = APP.getAdminPort();
|
||||||
|
URL url = URI.create("http://localhost:" + adminPort + "/user-metrics").toURL();
|
||||||
|
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
|
||||||
|
connection.setRequestMethod("GET");
|
||||||
|
|
||||||
|
try {
|
||||||
|
assertEquals(200, connection.getResponseCode());
|
||||||
|
|
||||||
|
try (BufferedReader reader =
|
||||||
|
new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
|
||||||
|
String response = reader.lines().collect(Collectors.joining("\n"));
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
Map<String, Object> metrics = objectMapper.readValue(response, Map.class);
|
||||||
|
|
||||||
|
int botUsers = (Integer) metrics.get("bot_users");
|
||||||
|
// In the test environment, we should have at least the ingestion-bot
|
||||||
|
assertTrue(botUsers >= 1, "Should have at least one bot user (ingestion-bot)");
|
||||||
|
|
||||||
|
LOG.info("User metrics: {}", metrics);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
connection.disconnect();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyMetricsStructure(String response) {
|
||||||
|
try {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
Map<String, Object> metrics = objectMapper.readValue(response, Map.class);
|
||||||
|
assertNotNull(metrics, "Metrics should not be null");
|
||||||
|
assertEquals(4, metrics.size(), "Should have exactly 4 metrics");
|
||||||
|
} catch (Exception e) {
|
||||||
|
fail("Failed to parse metrics JSON: " + e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testUserMetricsWithRealActivity(TestInfo test) throws Exception {
|
||||||
|
Map<String, Object> initialMetrics = getUserMetrics();
|
||||||
|
LOG.info("Initial metrics: {}", initialMetrics);
|
||||||
|
|
||||||
|
int initialTotalUsers = (Integer) initialMetrics.get("total_users");
|
||||||
|
int initialBotUsers = (Integer) initialMetrics.get("bot_users");
|
||||||
|
|
||||||
|
UserResourceTest userResourceTest = new UserResourceTest();
|
||||||
|
CreateUser createUser =
|
||||||
|
userResourceTest
|
||||||
|
.createRequest(test)
|
||||||
|
.withName("metrics-test-user-" + UUID.randomUUID())
|
||||||
|
.withEmail("metrics-test@example.com")
|
||||||
|
.withIsBot(false);
|
||||||
|
|
||||||
|
User newUser = createUserAndLogin(createUser);
|
||||||
|
LOG.info("Created new user: {}", newUser.getName());
|
||||||
|
|
||||||
|
Map<String, String> userAuthHeaders = authHeaders(newUser.getName() + "@example.com");
|
||||||
|
|
||||||
|
WebTarget target = getResource("users/name/" + newUser.getName());
|
||||||
|
TestUtils.get(target, User.class, userAuthHeaders);
|
||||||
|
|
||||||
|
UserActivityTracker.getInstance().forceFlushSync();
|
||||||
|
TestUtils.simulateWork(2);
|
||||||
|
Map<String, Object> updatedMetrics = getUserMetrics();
|
||||||
|
LOG.info("Updated metrics after activity: {}", updatedMetrics);
|
||||||
|
int updatedTotalUsers = (Integer) updatedMetrics.get("total_users");
|
||||||
|
assertEquals(initialTotalUsers + 1, updatedTotalUsers, "Total users should increase by 1");
|
||||||
|
assertEquals(
|
||||||
|
initialBotUsers, updatedMetrics.get("bot_users"), "Bot users count should remain the same");
|
||||||
|
|
||||||
|
String lastActivity = (String) updatedMetrics.get("last_activity");
|
||||||
|
assertNotNull(lastActivity, "Last activity should not be null after user activity");
|
||||||
|
|
||||||
|
Instant lastActivityTime = Instant.parse(lastActivity);
|
||||||
|
Instant now = Instant.now();
|
||||||
|
long secondsSinceActivity = now.getEpochSecond() - lastActivityTime.getEpochSecond();
|
||||||
|
assertTrue(
|
||||||
|
secondsSinceActivity < 60,
|
||||||
|
"Last activity should be within last 60 seconds, but was "
|
||||||
|
+ secondsSinceActivity
|
||||||
|
+ " seconds ago");
|
||||||
|
|
||||||
|
// Note: Daily active users might not update immediately as it depends on
|
||||||
|
// the analytics pipeline, so we just verify it's non-negative
|
||||||
|
int dailyActiveUsers = (Integer) updatedMetrics.get("daily_active_users");
|
||||||
|
assertTrue(dailyActiveUsers >= 0, "Daily active users should be non-negative");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testUserMetricsWithBotUser() throws Exception {
|
||||||
|
// This test verifies that bot activity is NOT counted in last_activity metric
|
||||||
|
|
||||||
|
// Get initial metrics
|
||||||
|
Map<String, Object> initialMetrics = getUserMetrics();
|
||||||
|
int initialBotUsers = (Integer) initialMetrics.get("bot_users");
|
||||||
|
int initialTotalUsers = (Integer) initialMetrics.get("total_users");
|
||||||
|
assertTrue(initialBotUsers >= 0, "Bot users count should be non-negative");
|
||||||
|
assertTrue(initialBotUsers <= initialTotalUsers, "Bot users should not exceed total users");
|
||||||
|
|
||||||
|
LOG.info(
|
||||||
|
"Bot user filtering is implemented in UserMetricsServlet.createNonBotFilter() which adds isBot=false filter");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testUserMetricsMultipleUsers(TestInfo test) throws Exception {
|
||||||
|
UserResourceTest userResourceTest = new UserResourceTest();
|
||||||
|
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
CreateUser createUser =
|
||||||
|
userResourceTest
|
||||||
|
.createRequest(test)
|
||||||
|
.withName("multi-test-user-" + i + "-" + UUID.randomUUID())
|
||||||
|
.withEmail("multi-user-" + i + "@example.com")
|
||||||
|
.withIsBot(false);
|
||||||
|
|
||||||
|
User user = createUserAndLogin(createUser);
|
||||||
|
Map<String, String> authHeaders = authHeaders(user.getName() + "@example.com");
|
||||||
|
WebTarget target = getResource("users/name/" + user.getName());
|
||||||
|
TestUtils.get(target, User.class, authHeaders);
|
||||||
|
TestUtils.simulateWork(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
UserActivityTracker.getInstance().forceFlushSync();
|
||||||
|
TestUtils.simulateWork(3); // Increased wait time for database propagation
|
||||||
|
|
||||||
|
Map<String, Object> metrics = getUserMetrics();
|
||||||
|
LOG.info("Metrics after multiple users: {}", metrics);
|
||||||
|
|
||||||
|
String lastActivity = (String) metrics.get("last_activity");
|
||||||
|
assertNotNull(lastActivity, "Last activity should not be null");
|
||||||
|
|
||||||
|
// Verify activity is very recent (within last 60 seconds to account for test execution time and
|
||||||
|
// async updates)
|
||||||
|
Instant lastActivityTime = Instant.parse(lastActivity);
|
||||||
|
Instant now = Instant.now();
|
||||||
|
long secondsSinceActivity = now.getEpochSecond() - lastActivityTime.getEpochSecond();
|
||||||
|
assertTrue(
|
||||||
|
secondsSinceActivity < 60,
|
||||||
|
"Last activity should be very recent, but was " + secondsSinceActivity + " seconds ago");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Daily Active Users tests - simplified since servlet runs in separate context
|
||||||
|
@Test
|
||||||
|
void testDailyActiveUsersMetric() throws Exception {
|
||||||
|
Map<String, Object> metrics = getUserMetrics();
|
||||||
|
assertTrue(
|
||||||
|
metrics.containsKey("daily_active_users"), "Metrics should include daily_active_users");
|
||||||
|
Object dauValue = metrics.get("daily_active_users");
|
||||||
|
assertNotNull(dauValue, "Daily active users should not be null");
|
||||||
|
assertInstanceOf(Integer.class, dauValue, "Daily active users should be an integer");
|
||||||
|
assertTrue((Integer) dauValue >= 0, "Daily active users should be non-negative");
|
||||||
|
|
||||||
|
LOG.info("Daily active users implementation handles missing data by returning 0");
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, Object> getUserMetrics() throws Exception {
|
||||||
|
int adminPort = APP.getAdminPort();
|
||||||
|
URL url = URI.create("http://localhost:" + adminPort + "/user-metrics").toURL();
|
||||||
|
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
|
||||||
|
connection.setRequestMethod("GET");
|
||||||
|
connection.setConnectTimeout(5000);
|
||||||
|
connection.setReadTimeout(5000);
|
||||||
|
|
||||||
|
try {
|
||||||
|
assertEquals(200, connection.getResponseCode());
|
||||||
|
|
||||||
|
try (BufferedReader reader =
|
||||||
|
new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
|
||||||
|
String response = reader.lines().collect(Collectors.joining("\n"));
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
Map<String, Object> metrics = objectMapper.readValue(response, Map.class);
|
||||||
|
return metrics;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
connection.disconnect();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private User createUserAndLogin(CreateUser createUser) throws Exception {
|
||||||
|
WebTarget target = getResource("users");
|
||||||
|
User user = TestUtils.post(target, createUser, User.class, ADMIN_AUTH_HEADERS);
|
||||||
|
|
||||||
|
Map<String, String> userAuthHeaders = authHeaders(user.getName() + "@example.com");
|
||||||
|
TestUtils.get(getResource("users/name/" + user.getName()), User.class, userAuthHeaders);
|
||||||
|
|
||||||
|
return user;
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user