diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java index 004a958731c..20b5d2913b8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java @@ -5347,6 +5347,13 @@ public interface CollectionDAO { + "LIMIT 1", connectionType = POSTGRES) Long getMaxLastActivityTime(); + + @SqlQuery( + "SELECT COUNT(DISTINCT id) FROM user_entity " + + "WHERE isBot = false " + + "AND deleted = false " + + "AND lastActivityTime >= :since") + int countDailyActiveUsers(@Bind("since") long since); } interface ChangeEventDAO { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/UserMetricsServlet.java b/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/UserMetricsServlet.java index 02d24920895..0fd1f8a1c67 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/UserMetricsServlet.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/UserMetricsServlet.java @@ -5,7 +5,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import jakarta.servlet.http.HttpServlet; import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletResponse; -import jakarta.ws.rs.core.Response; import java.io.IOException; import java.io.PrintWriter; import java.time.Instant; @@ -14,18 +13,13 @@ import java.util.List; import java.util.Map; import java.util.Set; import lombok.extern.slf4j.Slf4j; -import org.openmetadata.schema.dataInsight.DataInsightChartResult; -import org.openmetadata.schema.dataInsight.type.DailyActiveUsers; import org.openmetadata.schema.entity.teams.User; -import org.openmetadata.schema.type.DataReportIndex; import org.openmetadata.schema.utils.JsonUtils; import org.openmetadata.service.Entity; import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.jdbi3.ListFilter; import org.openmetadata.service.jdbi3.UserRepository; -import org.openmetadata.service.search.SearchRepository; import org.openmetadata.service.util.EntityUtil; -import org.openmetadata.service.util.ResultList; /** * Servlet that exposes user metrics on the admin port. @@ -40,12 +34,10 @@ public class UserMetricsServlet extends HttpServlet { private static final String CONTENT_TYPE = "application/json; charset=utf-8"; private final ObjectMapper objectMapper = JsonUtils.getObjectMapper(); private transient UserRepository userRepository; - private transient SearchRepository searchRepository; @Override public void init() { userRepository = (UserRepository) Entity.getEntityRepository(Entity.USER); - searchRepository = Entity.getSearchRepository(); } @Override @@ -102,97 +94,15 @@ public class UserMetricsServlet extends HttpServlet { private Integer getDailyActiveUsers() { try { - Response response = fetchDailyActiveUsersResponse(); - return extractActiveUsersCount(response); + long twentyFourHoursAgo = System.currentTimeMillis() - (24 * 60 * 60 * 1000); + return ((CollectionDAO.UserDAO) userRepository.getDao()) + .countDailyActiveUsers(twentyFourHoursAgo); } catch (Exception e) { - LOG.warn("Could not fetch daily active users from analytics", e); + LOG.warn("Could not fetch daily active users from database", e); return 0; } } - private Response fetchDailyActiveUsersResponse() throws IOException { - long endTs = System.currentTimeMillis(); - long startTs = endTs - (24 * 60 * 60 * 1000); // 24 hours ago - - return searchRepository.listDataInsightChartResult( - startTs, - endTs, - null, - null, - DataInsightChartResult.DataInsightChartType.DAILY_ACTIVE_USERS, - 1, - 0, - null, - DataReportIndex.WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA_INDEX.value()); - } - - private Integer extractActiveUsersCount(Response response) { - if (!isValidResponse(response)) { - LOG.debug("No daily active users data found for the last 24 hours"); - return 0; - } - - Object entity = response.getEntity(); - - // The API might return either DataInsightChartResult or ResultList directly - if (entity instanceof DataInsightChartResult) { - DataInsightChartResult chartResult = (DataInsightChartResult) entity; - if (!hasValidData(chartResult)) { - return 0; - } - return findMaxActiveUsers(chartResult.getData()); - } else if (entity instanceof ResultList) { - @SuppressWarnings("unchecked") - ResultList resultList = (ResultList) entity; - if (resultList.getData() == null || resultList.getData().isEmpty()) { - return 0; - } - return findMaxActiveUsers(resultList.getData()); - } - - LOG.debug( - "Unexpected response type from daily active users API: {}", entity.getClass().getName()); - return 0; - } - - private boolean isValidResponse(Response response) { - return response != null && response.getStatus() == 200 && response.getEntity() != null; - } - - private boolean hasValidData(DataInsightChartResult chartResult) { - return chartResult != null && chartResult.getData() != null && !chartResult.getData().isEmpty(); - } - - private Integer findMaxActiveUsers(List dataList) { - int maxActiveUsers = 0; - for (Object obj : dataList) { - Integer activeUsers = extractActiveUsersFromObject(obj); - if (activeUsers != null) { - maxActiveUsers = Math.max(maxActiveUsers, activeUsers); - } - } - return maxActiveUsers; - } - - private Integer extractActiveUsersFromObject(Object obj) { - if (obj instanceof DailyActiveUsers dailyActiveUsers) { - return dailyActiveUsers.getActiveUsers(); - } else if (obj instanceof Map) { - return extractActiveUsersFromMap((Map) obj); - } - return null; - } - - @SuppressWarnings("unchecked") - private Integer extractActiveUsersFromMap(Map map) { - Map dauMap = (Map) map; - Object activeUsersObj = dauMap.get("activeUsers"); - if (activeUsersObj instanceof Number number) { - return number.intValue(); - } - return null; - } - private String getLastUserActivity() { try { ListFilter nonBotFilter = createNonBotFilter(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/security/auth/UserActivityFilter.java b/openmetadata-service/src/main/java/org/openmetadata/service/security/auth/UserActivityFilter.java index f994c68dcd2..232fe73f47c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/security/auth/UserActivityFilter.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/security/auth/UserActivityFilter.java @@ -36,11 +36,15 @@ public class UserActivityFilter implements ContainerRequestFilter { public void filter(ContainerRequestContext requestContext) { SecurityContext securityContext = requestContext.getSecurityContext(); if (securityContext == null || securityContext.getUserPrincipal() == null) { + LOG.trace( + "No security context or principal, skipping activity tracking for path: {}", + requestContext.getUriInfo().getPath()); return; } String userName = securityContext.getUserPrincipal().getName(); if (userName == null || userName.isEmpty()) { + LOG.trace("No username found in principal, skipping activity tracking"); return; } @@ -50,8 +54,10 @@ public class UserActivityFilter implements ContainerRequestFilter { LOG.trace("User {} is a bot, skipping activity tracking", userName); return; } - LOG.debug("Tracking activity for user: {}", userName); + String path = requestContext.getUriInfo().getPath(); + LOG.info("Tracking activity for user: {} on path: {}", userName, path); UserActivityTracker.getInstance().trackActivity(userName); + LOG.debug("Successfully tracked activity for user: {}", userName); } catch (Exception e) { LOG.error("Failed to track activity for user: {}", userName, e); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/security/auth/UserActivityTracker.java b/openmetadata-service/src/main/java/org/openmetadata/service/security/auth/UserActivityTracker.java index 2221d83a132..06ce98c4021 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/security/auth/UserActivityTracker.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/security/auth/UserActivityTracker.java @@ -61,8 +61,9 @@ public class UserActivityTracker { private static volatile UserActivityTracker INSTANCE; private UserActivityTracker() { - this.minUpdateIntervalMs = 60000; - this.batchUpdateIntervalSeconds = 30; + this.minUpdateIntervalMs = 60000; // 1 minute minimum between local cache updates for same user + this.batchUpdateIntervalSeconds = + 3600; // 1 hour batch flush to database (since we only care about daily) int maxConcurrentDbOperations = 10; this.dbOperationPermits = new Semaphore(maxConcurrentDbOperations); } @@ -122,6 +123,7 @@ public class UserActivityTracker { try { UserActivity existing = localActivityCache.get(userName); if (existing != null && (currentTime - existing.lastLocalUpdate) < minUpdateIntervalMs) { + LOG.trace("Skipping activity update for {} - too soon since last update", userName); return; } } finally { @@ -134,8 +136,10 @@ public class UserActivityTracker { userName, (k, v) -> { if (v == null) { + LOG.debug("New activity tracked for user: {}", userName); return new UserActivity(userName, currentTime, currentTime); } else if ((currentTime - v.lastLocalUpdate) >= minUpdateIntervalMs) { + LOG.debug("Updating activity for user: {}", userName); v.lastActivityTime = currentTime; v.lastLocalUpdate = currentTime; } @@ -146,18 +150,29 @@ public class UserActivityTracker { } } + /** + * Force an immediate flush of cached activities to the database. + * Useful for testing or when shutting down. + */ + public void forceFlush() { + LOG.info("Force flushing user activity cache with {} entries", localActivityCache.size()); + performBatchUpdate(); + } + private void performBatchUpdate() { Map userActivityMap; cacheLock.writeLock().lock(); try { if (localActivityCache.isEmpty()) { + LOG.trace("No activities to flush"); return; } userActivityMap = new HashMap<>(); localActivityCache.forEach( (userName, activity) -> userActivityMap.put(userName, activity.lastActivityTime)); + LOG.info("Flushing {} user activities to database", userActivityMap.size()); localActivityCache.clear(); } finally { cacheLock.writeLock().unlock(); diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/security/auth/UserActivityTrackerTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/security/auth/UserActivityTrackerTest.java index caf1e1a3ab2..6f4b53d7ab1 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/security/auth/UserActivityTrackerTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/security/auth/UserActivityTrackerTest.java @@ -22,6 +22,7 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.openmetadata.service.util.TestUtils.simulateWork; import java.lang.reflect.Field; @@ -32,6 +33,7 @@ import org.junit.jupiter.api.Test; import org.mockito.MockedStatic; import org.mockito.Mockito; import org.openmetadata.service.Entity; +import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.jdbi3.UserRepository; class UserActivityTrackerTest { @@ -232,4 +234,29 @@ class UserActivityTrackerTest { verify(mockUserRepository, times(1)).updateUsersLastActivityTimeBatch(anyMap()); } } + + @Test + void testCountDailyActiveUsers() { + CollectionDAO.UserDAO mockUserDAO = mock(CollectionDAO.UserDAO.class); + when(mockUserRepository.getDao()).thenReturn(mockUserDAO); + + long now = System.currentTimeMillis(); + long twentyFourHoursAgo = now - (24 * 60 * 60 * 1000); + long oneYearAgo = now - (365L * 24 * 60 * 60 * 1000); + + when(mockUserDAO.countDailyActiveUsers(twentyFourHoursAgo)).thenReturn(0); + int count = mockUserDAO.countDailyActiveUsers(twentyFourHoursAgo); + assertEquals(0, count, "Should return 0 when no users are active"); + + when(mockUserDAO.countDailyActiveUsers(twentyFourHoursAgo)).thenReturn(5); + count = mockUserDAO.countDailyActiveUsers(twentyFourHoursAgo); + assertEquals(5, count, "Should return count of active users"); + + when(mockUserDAO.countDailyActiveUsers(oneYearAgo)).thenReturn(12); + count = mockUserDAO.countDailyActiveUsers(oneYearAgo); + assertEquals(12, count, "Should return all users when querying from far past"); + + verify(mockUserDAO, times(2)).countDailyActiveUsers(twentyFourHoursAgo); + verify(mockUserDAO, times(1)).countDailyActiveUsers(oneYearAgo); + } }