mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-10-11 00:35:22 +00:00
Fix User Metrics (#22922)
Co-authored-by: Ajith Prasad <37380177+aji-aju@users.noreply.github.com>
This commit is contained in:
parent
981ffc28ab
commit
11ba92a1b1
@ -5347,6 +5347,13 @@ public interface CollectionDAO {
|
|||||||
+ "LIMIT 1",
|
+ "LIMIT 1",
|
||||||
connectionType = POSTGRES)
|
connectionType = POSTGRES)
|
||||||
Long getMaxLastActivityTime();
|
Long getMaxLastActivityTime();
|
||||||
|
|
||||||
|
@SqlQuery(
|
||||||
|
"SELECT COUNT(DISTINCT id) FROM user_entity "
|
||||||
|
+ "WHERE isBot = false "
|
||||||
|
+ "AND deleted = false "
|
||||||
|
+ "AND lastActivityTime >= :since")
|
||||||
|
int countDailyActiveUsers(@Bind("since") long since);
|
||||||
}
|
}
|
||||||
|
|
||||||
interface ChangeEventDAO {
|
interface ChangeEventDAO {
|
||||||
|
@ -5,7 +5,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||||||
import jakarta.servlet.http.HttpServlet;
|
import jakarta.servlet.http.HttpServlet;
|
||||||
import jakarta.servlet.http.HttpServletRequest;
|
import jakarta.servlet.http.HttpServletRequest;
|
||||||
import jakarta.servlet.http.HttpServletResponse;
|
import jakarta.servlet.http.HttpServletResponse;
|
||||||
import jakarta.ws.rs.core.Response;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.PrintWriter;
|
import java.io.PrintWriter;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
@ -14,18 +13,13 @@ import java.util.List;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.openmetadata.schema.dataInsight.DataInsightChartResult;
|
|
||||||
import org.openmetadata.schema.dataInsight.type.DailyActiveUsers;
|
|
||||||
import org.openmetadata.schema.entity.teams.User;
|
import org.openmetadata.schema.entity.teams.User;
|
||||||
import org.openmetadata.schema.type.DataReportIndex;
|
|
||||||
import org.openmetadata.schema.utils.JsonUtils;
|
import org.openmetadata.schema.utils.JsonUtils;
|
||||||
import org.openmetadata.service.Entity;
|
import org.openmetadata.service.Entity;
|
||||||
import org.openmetadata.service.jdbi3.CollectionDAO;
|
import org.openmetadata.service.jdbi3.CollectionDAO;
|
||||||
import org.openmetadata.service.jdbi3.ListFilter;
|
import org.openmetadata.service.jdbi3.ListFilter;
|
||||||
import org.openmetadata.service.jdbi3.UserRepository;
|
import org.openmetadata.service.jdbi3.UserRepository;
|
||||||
import org.openmetadata.service.search.SearchRepository;
|
|
||||||
import org.openmetadata.service.util.EntityUtil;
|
import org.openmetadata.service.util.EntityUtil;
|
||||||
import org.openmetadata.service.util.ResultList;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Servlet that exposes user metrics on the admin port.
|
* Servlet that exposes user metrics on the admin port.
|
||||||
@ -40,12 +34,10 @@ public class UserMetricsServlet extends HttpServlet {
|
|||||||
private static final String CONTENT_TYPE = "application/json; charset=utf-8";
|
private static final String CONTENT_TYPE = "application/json; charset=utf-8";
|
||||||
private final ObjectMapper objectMapper = JsonUtils.getObjectMapper();
|
private final ObjectMapper objectMapper = JsonUtils.getObjectMapper();
|
||||||
private transient UserRepository userRepository;
|
private transient UserRepository userRepository;
|
||||||
private transient SearchRepository searchRepository;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init() {
|
public void init() {
|
||||||
userRepository = (UserRepository) Entity.getEntityRepository(Entity.USER);
|
userRepository = (UserRepository) Entity.getEntityRepository(Entity.USER);
|
||||||
searchRepository = Entity.getSearchRepository();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -102,97 +94,15 @@ public class UserMetricsServlet extends HttpServlet {
|
|||||||
|
|
||||||
private Integer getDailyActiveUsers() {
|
private Integer getDailyActiveUsers() {
|
||||||
try {
|
try {
|
||||||
Response response = fetchDailyActiveUsersResponse();
|
long twentyFourHoursAgo = System.currentTimeMillis() - (24 * 60 * 60 * 1000);
|
||||||
return extractActiveUsersCount(response);
|
return ((CollectionDAO.UserDAO) userRepository.getDao())
|
||||||
|
.countDailyActiveUsers(twentyFourHoursAgo);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.warn("Could not fetch daily active users from analytics", e);
|
LOG.warn("Could not fetch daily active users from database", e);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Response fetchDailyActiveUsersResponse() throws IOException {
|
|
||||||
long endTs = System.currentTimeMillis();
|
|
||||||
long startTs = endTs - (24 * 60 * 60 * 1000); // 24 hours ago
|
|
||||||
|
|
||||||
return searchRepository.listDataInsightChartResult(
|
|
||||||
startTs,
|
|
||||||
endTs,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
DataInsightChartResult.DataInsightChartType.DAILY_ACTIVE_USERS,
|
|
||||||
1,
|
|
||||||
0,
|
|
||||||
null,
|
|
||||||
DataReportIndex.WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA_INDEX.value());
|
|
||||||
}
|
|
||||||
|
|
||||||
private Integer extractActiveUsersCount(Response response) {
|
|
||||||
if (!isValidResponse(response)) {
|
|
||||||
LOG.debug("No daily active users data found for the last 24 hours");
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
Object entity = response.getEntity();
|
|
||||||
|
|
||||||
// The API might return either DataInsightChartResult or ResultList directly
|
|
||||||
if (entity instanceof DataInsightChartResult) {
|
|
||||||
DataInsightChartResult chartResult = (DataInsightChartResult) entity;
|
|
||||||
if (!hasValidData(chartResult)) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
return findMaxActiveUsers(chartResult.getData());
|
|
||||||
} else if (entity instanceof ResultList) {
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
ResultList<Object> resultList = (ResultList<Object>) entity;
|
|
||||||
if (resultList.getData() == null || resultList.getData().isEmpty()) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
return findMaxActiveUsers(resultList.getData());
|
|
||||||
}
|
|
||||||
|
|
||||||
LOG.debug(
|
|
||||||
"Unexpected response type from daily active users API: {}", entity.getClass().getName());
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean isValidResponse(Response response) {
|
|
||||||
return response != null && response.getStatus() == 200 && response.getEntity() != null;
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean hasValidData(DataInsightChartResult chartResult) {
|
|
||||||
return chartResult != null && chartResult.getData() != null && !chartResult.getData().isEmpty();
|
|
||||||
}
|
|
||||||
|
|
||||||
private Integer findMaxActiveUsers(List<Object> dataList) {
|
|
||||||
int maxActiveUsers = 0;
|
|
||||||
for (Object obj : dataList) {
|
|
||||||
Integer activeUsers = extractActiveUsersFromObject(obj);
|
|
||||||
if (activeUsers != null) {
|
|
||||||
maxActiveUsers = Math.max(maxActiveUsers, activeUsers);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return maxActiveUsers;
|
|
||||||
}
|
|
||||||
|
|
||||||
private Integer extractActiveUsersFromObject(Object obj) {
|
|
||||||
if (obj instanceof DailyActiveUsers dailyActiveUsers) {
|
|
||||||
return dailyActiveUsers.getActiveUsers();
|
|
||||||
} else if (obj instanceof Map) {
|
|
||||||
return extractActiveUsersFromMap((Map<?, ?>) obj);
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private Integer extractActiveUsersFromMap(Map<?, ?> map) {
|
|
||||||
Map<String, Object> dauMap = (Map<String, Object>) map;
|
|
||||||
Object activeUsersObj = dauMap.get("activeUsers");
|
|
||||||
if (activeUsersObj instanceof Number number) {
|
|
||||||
return number.intValue();
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
private String getLastUserActivity() {
|
private String getLastUserActivity() {
|
||||||
try {
|
try {
|
||||||
ListFilter nonBotFilter = createNonBotFilter();
|
ListFilter nonBotFilter = createNonBotFilter();
|
||||||
|
@ -36,11 +36,15 @@ public class UserActivityFilter implements ContainerRequestFilter {
|
|||||||
public void filter(ContainerRequestContext requestContext) {
|
public void filter(ContainerRequestContext requestContext) {
|
||||||
SecurityContext securityContext = requestContext.getSecurityContext();
|
SecurityContext securityContext = requestContext.getSecurityContext();
|
||||||
if (securityContext == null || securityContext.getUserPrincipal() == null) {
|
if (securityContext == null || securityContext.getUserPrincipal() == null) {
|
||||||
|
LOG.trace(
|
||||||
|
"No security context or principal, skipping activity tracking for path: {}",
|
||||||
|
requestContext.getUriInfo().getPath());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
String userName = securityContext.getUserPrincipal().getName();
|
String userName = securityContext.getUserPrincipal().getName();
|
||||||
if (userName == null || userName.isEmpty()) {
|
if (userName == null || userName.isEmpty()) {
|
||||||
|
LOG.trace("No username found in principal, skipping activity tracking");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -50,8 +54,10 @@ public class UserActivityFilter implements ContainerRequestFilter {
|
|||||||
LOG.trace("User {} is a bot, skipping activity tracking", userName);
|
LOG.trace("User {} is a bot, skipping activity tracking", userName);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
LOG.debug("Tracking activity for user: {}", userName);
|
String path = requestContext.getUriInfo().getPath();
|
||||||
|
LOG.info("Tracking activity for user: {} on path: {}", userName, path);
|
||||||
UserActivityTracker.getInstance().trackActivity(userName);
|
UserActivityTracker.getInstance().trackActivity(userName);
|
||||||
|
LOG.debug("Successfully tracked activity for user: {}", userName);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Failed to track activity for user: {}", userName, e);
|
LOG.error("Failed to track activity for user: {}", userName, e);
|
||||||
}
|
}
|
||||||
|
@ -61,8 +61,9 @@ public class UserActivityTracker {
|
|||||||
private static volatile UserActivityTracker INSTANCE;
|
private static volatile UserActivityTracker INSTANCE;
|
||||||
|
|
||||||
private UserActivityTracker() {
|
private UserActivityTracker() {
|
||||||
this.minUpdateIntervalMs = 60000;
|
this.minUpdateIntervalMs = 60000; // 1 minute minimum between local cache updates for same user
|
||||||
this.batchUpdateIntervalSeconds = 30;
|
this.batchUpdateIntervalSeconds =
|
||||||
|
3600; // 1 hour batch flush to database (since we only care about daily)
|
||||||
int maxConcurrentDbOperations = 10;
|
int maxConcurrentDbOperations = 10;
|
||||||
this.dbOperationPermits = new Semaphore(maxConcurrentDbOperations);
|
this.dbOperationPermits = new Semaphore(maxConcurrentDbOperations);
|
||||||
}
|
}
|
||||||
@ -122,6 +123,7 @@ public class UserActivityTracker {
|
|||||||
try {
|
try {
|
||||||
UserActivity existing = localActivityCache.get(userName);
|
UserActivity existing = localActivityCache.get(userName);
|
||||||
if (existing != null && (currentTime - existing.lastLocalUpdate) < minUpdateIntervalMs) {
|
if (existing != null && (currentTime - existing.lastLocalUpdate) < minUpdateIntervalMs) {
|
||||||
|
LOG.trace("Skipping activity update for {} - too soon since last update", userName);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
@ -134,8 +136,10 @@ public class UserActivityTracker {
|
|||||||
userName,
|
userName,
|
||||||
(k, v) -> {
|
(k, v) -> {
|
||||||
if (v == null) {
|
if (v == null) {
|
||||||
|
LOG.debug("New activity tracked for user: {}", userName);
|
||||||
return new UserActivity(userName, currentTime, currentTime);
|
return new UserActivity(userName, currentTime, currentTime);
|
||||||
} else if ((currentTime - v.lastLocalUpdate) >= minUpdateIntervalMs) {
|
} else if ((currentTime - v.lastLocalUpdate) >= minUpdateIntervalMs) {
|
||||||
|
LOG.debug("Updating activity for user: {}", userName);
|
||||||
v.lastActivityTime = currentTime;
|
v.lastActivityTime = currentTime;
|
||||||
v.lastLocalUpdate = currentTime;
|
v.lastLocalUpdate = currentTime;
|
||||||
}
|
}
|
||||||
@ -146,18 +150,29 @@ public class UserActivityTracker {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Force an immediate flush of cached activities to the database.
|
||||||
|
* Useful for testing or when shutting down.
|
||||||
|
*/
|
||||||
|
public void forceFlush() {
|
||||||
|
LOG.info("Force flushing user activity cache with {} entries", localActivityCache.size());
|
||||||
|
performBatchUpdate();
|
||||||
|
}
|
||||||
|
|
||||||
private void performBatchUpdate() {
|
private void performBatchUpdate() {
|
||||||
Map<String, Long> userActivityMap;
|
Map<String, Long> userActivityMap;
|
||||||
|
|
||||||
cacheLock.writeLock().lock();
|
cacheLock.writeLock().lock();
|
||||||
try {
|
try {
|
||||||
if (localActivityCache.isEmpty()) {
|
if (localActivityCache.isEmpty()) {
|
||||||
|
LOG.trace("No activities to flush");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
userActivityMap = new HashMap<>();
|
userActivityMap = new HashMap<>();
|
||||||
localActivityCache.forEach(
|
localActivityCache.forEach(
|
||||||
(userName, activity) -> userActivityMap.put(userName, activity.lastActivityTime));
|
(userName, activity) -> userActivityMap.put(userName, activity.lastActivityTime));
|
||||||
|
LOG.info("Flushing {} user activities to database", userActivityMap.size());
|
||||||
localActivityCache.clear();
|
localActivityCache.clear();
|
||||||
} finally {
|
} finally {
|
||||||
cacheLock.writeLock().unlock();
|
cacheLock.writeLock().unlock();
|
||||||
|
@ -22,6 +22,7 @@ import static org.mockito.Mockito.doThrow;
|
|||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
import static org.openmetadata.service.util.TestUtils.simulateWork;
|
import static org.openmetadata.service.util.TestUtils.simulateWork;
|
||||||
|
|
||||||
import java.lang.reflect.Field;
|
import java.lang.reflect.Field;
|
||||||
@ -32,6 +33,7 @@ import org.junit.jupiter.api.Test;
|
|||||||
import org.mockito.MockedStatic;
|
import org.mockito.MockedStatic;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
import org.openmetadata.service.Entity;
|
import org.openmetadata.service.Entity;
|
||||||
|
import org.openmetadata.service.jdbi3.CollectionDAO;
|
||||||
import org.openmetadata.service.jdbi3.UserRepository;
|
import org.openmetadata.service.jdbi3.UserRepository;
|
||||||
|
|
||||||
class UserActivityTrackerTest {
|
class UserActivityTrackerTest {
|
||||||
@ -232,4 +234,29 @@ class UserActivityTrackerTest {
|
|||||||
verify(mockUserRepository, times(1)).updateUsersLastActivityTimeBatch(anyMap());
|
verify(mockUserRepository, times(1)).updateUsersLastActivityTimeBatch(anyMap());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testCountDailyActiveUsers() {
|
||||||
|
CollectionDAO.UserDAO mockUserDAO = mock(CollectionDAO.UserDAO.class);
|
||||||
|
when(mockUserRepository.getDao()).thenReturn(mockUserDAO);
|
||||||
|
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
long twentyFourHoursAgo = now - (24 * 60 * 60 * 1000);
|
||||||
|
long oneYearAgo = now - (365L * 24 * 60 * 60 * 1000);
|
||||||
|
|
||||||
|
when(mockUserDAO.countDailyActiveUsers(twentyFourHoursAgo)).thenReturn(0);
|
||||||
|
int count = mockUserDAO.countDailyActiveUsers(twentyFourHoursAgo);
|
||||||
|
assertEquals(0, count, "Should return 0 when no users are active");
|
||||||
|
|
||||||
|
when(mockUserDAO.countDailyActiveUsers(twentyFourHoursAgo)).thenReturn(5);
|
||||||
|
count = mockUserDAO.countDailyActiveUsers(twentyFourHoursAgo);
|
||||||
|
assertEquals(5, count, "Should return count of active users");
|
||||||
|
|
||||||
|
when(mockUserDAO.countDailyActiveUsers(oneYearAgo)).thenReturn(12);
|
||||||
|
count = mockUserDAO.countDailyActiveUsers(oneYearAgo);
|
||||||
|
assertEquals(12, count, "Should return all users when querying from far past");
|
||||||
|
|
||||||
|
verify(mockUserDAO, times(2)).countDailyActiveUsers(twentyFourHoursAgo);
|
||||||
|
verify(mockUserDAO, times(1)).countDailyActiveUsers(oneYearAgo);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user