diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/CollectionDAO.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/CollectionDAO.java index 09b20375789..1c1787dc335 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/CollectionDAO.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/CollectionDAO.java @@ -478,11 +478,105 @@ public interface CollectionDAO { @SqlQuery("SELECT json FROM thread_entity ORDER BY updatedAt DESC") List list(); + @SqlQuery("SELECT count(id) FROM thread_entity WHERE resolved = :resolved") + int listCount(@Bind("resolved") boolean resolved); + + @SqlQuery( + "SELECT json FROM thread_entity " + + "WHERE updatedAt > :before AND resolved = :resolved " + + "ORDER BY updatedAt DESC " + + "LIMIT :limit") + List listBefore(@Bind("limit") int limit, @Bind("before") long before, @Bind("resolved") boolean resolved); + + @SqlQuery( + "SELECT json FROM thread_entity " + + "WHERE updatedAt < :after AND resolved = :resolved " + + "ORDER BY updatedAt DESC " + + "LIMIT :limit") + List listAfter(@Bind("limit") int limit, @Bind("after") long after, @Bind("resolved") boolean resolved); + + @SqlQuery( + "SELECT json FROM thread_entity WHERE updatedAt > :before AND resolved = :resolved AND " + + "(entityId in (SELECT toId FROM entity_relationship WHERE " + + "((fromEntity='user' AND fromId= :userId) OR " + + "(fromEntity='team' AND fromId IN ())) AND relation=8) OR " + + "id in (SELECT toId FROM entity_relationship WHERE (fromEntity='user' AND fromId= :userId AND toEntity='THREAD' AND relation IN (1,2)))) " + + "ORDER BY updatedAt DESC " + + "LIMIT :limit") + List listThreadsByOwnerBefore( + @Bind("userId") String userId, + @BindList("teamIds") List teamIds, + @Bind("limit") int limit, + @Bind("before") long before, + @Bind("resolved") boolean resolved); + + @SqlQuery( + "SELECT json FROM thread_entity WHERE updatedAt < :after AND resolved = :resolved AND " + + "(entityId in (SELECT toId FROM entity_relationship WHERE " + + "((fromEntity='user' AND fromId= :userId) OR " + + "(fromEntity='team' AND fromId IN ())) AND relation=8) OR " + + "id in (SELECT toId FROM entity_relationship WHERE (fromEntity='user' AND fromId= :userId AND toEntity='THREAD' AND relation IN (1,2)))) " + + "ORDER BY updatedAt DESC " + + "LIMIT :limit") + List listThreadsByOwnerAfter( + @Bind("userId") String userId, + @BindList("teamIds") List teamIds, + @Bind("limit") int limit, + @Bind("after") long after, + @Bind("resolved") boolean resolved); + + @SqlQuery( + "SELECT count(id) FROM thread_entity WHERE resolved = :resolved AND " + + "(entityId in (SELECT toId FROM entity_relationship WHERE " + + "((fromEntity='user' AND fromId= :userId) OR " + + "(fromEntity='team' AND fromId IN ())) AND relation=8) OR " + + "id in (SELECT toId FROM entity_relationship WHERE (fromEntity='user' AND fromId= :userId AND toEntity='THREAD' AND relation IN (1,2)))) ") + int listCountThreadsByOwner( + @Bind("userId") String userId, @BindList("teamIds") List teamIds, @Bind("resolved") boolean resolved); + + @SqlQuery( + "SELECT json FROM thread_entity WHERE updatedAt > :before AND resolved = :resolved AND " + + "id in (SELECT fromFQN FROM field_relationship WHERE " + + "toFQN LIKE CONCAT(:fqnPrefix, '%') AND fromType='THREAD' AND toType LIKE CONCAT(:toType, '%') AND relation= :relation) " + + "ORDER BY updatedAt DESC " + + "LIMIT :limit") + List listThreadsByEntityLinkBefore( + @Bind("fqnPrefix") String fqnPrefix, + @Bind("toType") String toType, + @Bind("limit") int limit, + @Bind("before") long before, + @Bind("resolved") boolean resolved, + @Bind("relation") int relation); + + @SqlQuery( + "SELECT json FROM thread_entity WHERE updatedAt < :after AND resolved = :resolved AND " + + "id in (SELECT fromFQN FROM field_relationship WHERE " + + "toFQN LIKE CONCAT(:fqnPrefix, '%') AND fromType='THREAD' AND toType LIKE CONCAT(:toType, '%') AND relation= :relation) " + + "ORDER BY updatedAt DESC " + + "LIMIT :limit") + List listThreadsByEntityLinkAfter( + @Bind("fqnPrefix") String fqnPrefix, + @Bind("toType") String toType, + @Bind("limit") int limit, + @Bind("after") long after, + @Bind("resolved") boolean resolved, + @Bind("relation") int relation); + + @SqlQuery( + "SELECT count(id) FROM thread_entity WHERE resolved = :resolved AND " + + "id in (SELECT fromFQN FROM field_relationship WHERE " + + "toFQN LIKE CONCAT(:fqnPrefix, '%') AND fromType='THREAD' AND toType LIKE CONCAT(:toType, '%') AND relation= :relation)") + int listCountThreadsByEntityLink( + @Bind("fqnPrefix") String fqnPrefix, + @Bind("toType") String toType, + @Bind("resolved") boolean resolved, + @Bind("relation") int relation); + @SqlUpdate("UPDATE thread_entity SET json = :json where id = :id") void update(@Bind("id") String id, @Bind("json") String json); @SqlQuery( - "SELECT entityLink, COUNT(*) count FROM field_relationship fr INNER JOIN thread_entity te ON fr.fromFQN=te.id " + "SELECT entityLink, COUNT(id) count FROM field_relationship fr INNER JOIN thread_entity te ON fr.fromFQN=te.id " + "WHERE fr.toFQN LIKE CONCAT(:fqnPrefix, '%') AND fr.toType like concat(:toType, '%') AND fr.fromType = :fromType " + "AND fr.relation = :relation AND te.resolved= :isResolved " + "GROUP BY entityLink") @@ -495,29 +589,98 @@ public interface CollectionDAO { @Bind("isResolved") boolean isResolved); @SqlQuery( - "SELECT entityLink, COUNT(*) count FROM thread_entity WHERE (id IN ()) " + "SELECT entityLink, COUNT(id) count FROM thread_entity WHERE resolved = :resolved AND " + + "id in (SELECT toId FROM entity_relationship WHERE " + + "(((fromEntity='user' AND fromId= :userId) OR " + + "(fromEntity='team' AND fromId IN ())) AND relation=8) OR " + + "(fromEntity='user' AND fromId= :userId AND toEntity='THREAD' AND relation IN (1,2))) " + + "GROUP BY entityLink") + @RegisterRowMapper(CountFieldMapper.class) + List> listCountByOwner( + @Bind("userId") String userId, @BindList("teamIds") List teamIds, @Bind("resolved") boolean resolved); + + @SqlQuery( + "SELECT entityLink, COUNT(id) count FROM thread_entity WHERE (id IN ()) " + "AND resolved= :isResolved GROUP BY entityLink") @RegisterRowMapper(CountFieldMapper.class) List> listCountByThreads( @BindList("threadIds") List threadIds, @Bind("isResolved") boolean isResolved); @SqlQuery( - "SELECT id FROM thread_entity WHERE entityId in (" + "SELECT json FROM thread_entity WHERE updatedAt > :before AND resolved = :resolved AND entityId in (" + + "SELECT toId FROM entity_relationship WHERE " + + "((fromEntity='user' AND fromId= :userId) OR " + + "(fromEntity='team' AND fromId IN ())) AND relation= :relation) " + + "ORDER BY updatedAt DESC " + + "LIMIT :limit") + List listThreadsByFollowsBefore( + @Bind("userId") String userId, + @Bind("limit") int limit, + @Bind("before") long before, + @Bind("resolved") boolean resolved, + @Bind("relation") int relation); + + @SqlQuery( + "SELECT json FROM thread_entity WHERE updatedAt < :after AND resolved = :resolved AND entityId in (" + + "SELECT toId FROM entity_relationship WHERE " + + "((fromEntity='user' AND fromId= :userId) OR " + + "(fromEntity='team' AND fromId IN ())) AND relation= :relation) " + + "ORDER BY updatedAt DESC " + + "LIMIT :limit") + List listThreadsByFollowsAfter( + @Bind("userId") String userId, + @Bind("limit") int limit, + @Bind("after") long after, + @Bind("resolved") boolean resolved, + @Bind("relation") int relation); + + @SqlQuery( + "SELECT count(id) FROM thread_entity WHERE resolved = :resolved AND entityId in (" + "SELECT toId FROM entity_relationship WHERE " + "((fromEntity='user' AND fromId= :userId) OR " + "(fromEntity='team' AND fromId IN ())) AND relation= :relation)") - List listUserThreadsFromER( - @Bind("userId") String userId, @BindList("teamIds") List teamIds, @Bind("relation") int relation); + int listCountThreadsByFollows( + @Bind("userId") String userId, @Bind("resolved") boolean resolved, @Bind("relation") int relation); @SqlQuery( - "SELECT id FROM thread_entity WHERE id in (" + "SELECT json FROM thread_entity WHERE updatedAt > :before AND resolved = :resolved AND id in (" + "SELECT toFQN FROM field_relationship WHERE " + "((fromType='user' AND fromFQN= :userName) OR " - + "(fromType='team' AND fromFQN IN ())) AND toType = :toType AND relation = :relation)") - List listUserThreadsFromFR( + + "(fromType='team' AND fromFQN IN ())) AND toType='THREAD' AND relation= :relation) " + + "ORDER BY updatedAt DESC " + + "LIMIT :limit") + List listThreadsByMentionsBefore( @Bind("userName") String userName, @BindList("teamNames") List teamNames, - @Bind("toType") String toType, + @Bind("limit") int limit, + @Bind("before") long before, + @Bind("resolved") boolean resolved, + @Bind("relation") int relation); + + @SqlQuery( + "SELECT json FROM thread_entity WHERE updatedAt < :after AND resolved = :resolved AND id in (" + + "SELECT toFQN FROM field_relationship WHERE " + + "((fromType='user' AND fromFQN= :userName) OR " + + "(fromType='team' AND fromFQN IN ())) AND toType='THREAD' AND relation= :relation) " + + "ORDER BY updatedAt DESC " + + "LIMIT :limit") + List listThreadsByMentionsAfter( + @Bind("userName") String userName, + @BindList("teamNames") List teamNames, + @Bind("limit") int limit, + @Bind("after") long after, + @Bind("resolved") boolean resolved, + @Bind("relation") int relation); + + @SqlQuery( + "SELECT count(id) FROM thread_entity WHERE resolved = :resolved AND id in (" + + "SELECT toFQN FROM field_relationship WHERE " + + "((fromType='user' AND fromFQN= :userName) OR " + + "(fromType='team' AND fromFQN IN ())) AND toType='THREAD' AND relation= :relation) ") + int listCountThreadsByMentions( + @Bind("userName") String userName, + @BindList("teamNames") List teamNames, + @Bind("resolved") boolean resolved, @Bind("relation") int relation); class CountFieldMapper implements RowMapper> { diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/FeedRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/FeedRepository.java index ea4b237c358..0fc5efbe531 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/FeedRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/FeedRepository.java @@ -18,10 +18,8 @@ import java.io.IOException; import java.text.ParseException; import java.util.ArrayList; import java.util.Comparator; -import java.util.HashSet; import java.util.List; import java.util.Optional; -import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -50,6 +48,7 @@ import org.openmetadata.catalog.util.JsonUtils; import org.openmetadata.catalog.util.RestUtil; import org.openmetadata.catalog.util.RestUtil.DeleteResponse; import org.openmetadata.catalog.util.RestUtil.PatchResponse; +import org.openmetadata.catalog.util.ResultList; @Slf4j public class FeedRepository { @@ -65,6 +64,11 @@ public class FeedRepository { FOLLOWS } + public enum PaginationType { + BEFORE, + AFTER + } + @Transaction public Thread create(Thread thread) throws IOException, ParseException { String createdBy = thread.getCreatedBy(); @@ -226,9 +230,15 @@ public class FeedRepository { } else { EntityLink entityLink = EntityLink.parse(link); EntityReference reference = EntityUtil.validateEntityLink(entityLink); - if (reference.getType().equals(Entity.USER)) { - List threadIds = getThreadIdsByOwner(reference.getId().toString()); - result = dao.feedDAO().listCountByThreads(threadIds, isResolved); + if (reference.getType().equals(Entity.USER) || reference.getType().equals(Entity.TEAM)) { + if (reference.getType().equals(Entity.USER)) { + String userId = reference.getId().toString(); + List teamIds = getTeamIds(userId); + result = dao.feedDAO().listCountByOwner(userId, teamIds, isResolved); + } else { + // team is not supported + result = new ArrayList<>(); + } } else { result = dao.feedDAO() @@ -256,72 +266,133 @@ public class FeedRepository { return thread.getPosts(); } + /** + * List threads based on the filters and limits in the order of the updated timestamp. + * + * @param link entity link filter + * @param limitPosts the number of posts to limit per thread + * @param userId UUID of the user. Enables UserId filter + * @param filterType Type of the filter to be applied with userId filter + * @param limit the number of threads to limit in the response + * @param pageMarker the before/after updatedTime to be used as pagination marker for queries + * @param isResolved whether the thread is resolved or open + * @param paginationType before or after + * @return a list of threads as ResultList + * @throws IOException on error + * @throws ParseException on error + */ @Transaction - public List listThreads(String link, int limitPosts, String userId, FilterType filterType) - throws IOException { - List threads = new ArrayList<>(); + public final ResultList list( + String link, + int limitPosts, + String userId, + FilterType filterType, + int limit, + String pageMarker, + boolean isResolved, + PaginationType paginationType) + throws IOException, ParseException { + List threads; + int total; + // Here updatedAt time is used for page marker since threads are sorted by last update time + long time = Long.MAX_VALUE; + // if paginationType is "before", it must have a pageMarker time. + // "after" could be null to get the first page. In this case we set time to MAX_VALUE + // to get any entry with updatedTime < MAX_VALUE + if (pageMarker != null) { + time = Long.parseLong(RestUtil.decodeCursor(pageMarker)); + } + + // No filters are enabled. Listing all the threads if (link == null && userId == null) { - // No filters are enabled. Listing all the threads - threads = JsonUtils.readObjects(dao.feedDAO().list(), Thread.class); + // Get one extra result used for computing before cursor + List jsons; + if (paginationType == PaginationType.BEFORE) { + jsons = dao.feedDAO().listBefore(limit + 1, time, isResolved); + } else { + jsons = dao.feedDAO().listAfter(limit + 1, time, isResolved); + } + threads = JsonUtils.readObjects(jsons, Thread.class); + total = dao.feedDAO().listCount(isResolved); } else { // Either one or both the filters are enabled - List threadIds = new ArrayList<>(); + // we don't support both the filters together. If both are not null, entity link takes precedence if (link != null) { EntityLink entityLink = EntityLink.parse(link); EntityReference reference = EntityUtil.validateEntityLink(entityLink); - List> result; // For a user entityLink get created or replied relationships to the thread if (reference.getType().equals(Entity.USER)) { - threadIds.addAll(getThreadIdsByOwner(reference.getId().toString())); + FilteredThreads filteredThreads = + getThreadsByOwner(reference.getId().toString(), limit + 1, time, isResolved, paginationType); + threads = filteredThreads.getThreads(); + total = filteredThreads.getTotalCount(); } else { // Only data assets are added as about - result = - dao.fieldRelationshipDAO() - .listFromByAllPrefix( - entityLink.getFullyQualifiedFieldValue(), - Entity.THREAD, - entityLink.getFullyQualifiedFieldType(), - Relationship.IS_ABOUT.ordinal()); - result.forEach(l -> threadIds.add(l.get(1))); - } - } - - if (userId != null) { - List userThreadIds; - if (filterType == FilterType.FOLLOWS) { - userThreadIds = getThreadIdsByFollows(userId); - } else if (filterType == FilterType.MENTIONS) { - userThreadIds = getThreadIdsByMentions(userId); - } else { - userThreadIds = getThreadIdsByOwner(userId); - } - - // if both link and user filters are enabled, the filters should be applied as "AND" - if (!threadIds.isEmpty()) { - // apply user filter on top of the link filter - if (!userThreadIds.isEmpty()) { - userThreadIds = userThreadIds.stream().filter(threadIds::contains).collect(Collectors.toList()); + List jsons; + if (paginationType == PaginationType.BEFORE) { + jsons = + dao.feedDAO() + .listThreadsByEntityLinkBefore( + entityLink.getFullyQualifiedFieldValue(), + entityLink.getFullyQualifiedFieldType(), + limit + 1, + time, + isResolved, + Relationship.IS_ABOUT.ordinal()); + } else { + jsons = + dao.feedDAO() + .listThreadsByEntityLinkAfter( + entityLink.getFullyQualifiedFieldValue(), + entityLink.getFullyQualifiedFieldType(), + limit + 1, + time, + isResolved, + Relationship.IS_ABOUT.ordinal()); } + threads = JsonUtils.readObjects(jsons, Thread.class); + total = + dao.feedDAO() + .listCountThreadsByEntityLink( + entityLink.getFullyQualifiedFieldValue(), + entityLink.getFullyQualifiedFieldType(), + isResolved, + Relationship.IS_ABOUT.ordinal()); } - - threadIds.addAll(userThreadIds); - } - - Set uniqueValues = new HashSet<>(); - for (String t : threadIds) { - // If an entity has multiple relationships (created, mentioned, repliedTo etc.) to the same thread - // Don't send duplicated copies of the thread in response - if (uniqueValues.add(t)) { - threads.add(EntityUtil.validate(t, dao.feedDAO().findById(t), Thread.class)); + } else { + FilteredThreads filteredThreads; + if (filterType == FilterType.FOLLOWS) { + filteredThreads = getThreadsByFollows(userId, limit + 1, time, isResolved, paginationType); + } else if (filterType == FilterType.MENTIONS) { + filteredThreads = getThreadsByMentions(userId, limit + 1, time, isResolved, paginationType); + } else { + filteredThreads = getThreadsByOwner(userId, limit + 1, time, isResolved, paginationType); } + threads = filteredThreads.getThreads(); + total = filteredThreads.getTotalCount(); } - - // sort the list by thread updated timestamp before returning - threads.sort(Comparator.comparing(Thread::getUpdatedAt, Comparator.reverseOrder())); } - return limitPostsInThreads(threads, limitPosts); + + limitPostsInThreads(threads, limitPosts); + + String beforeCursor = null; + String afterCursor = null; + if (paginationType == PaginationType.BEFORE) { + if (threads.size() > limit) { // If extra result exists, then previous page exists - return before cursor + threads.remove(0); + beforeCursor = threads.get(0).getUpdatedAt().toString(); + } + afterCursor = threads.get(threads.size() - 1).getUpdatedAt().toString(); + } else { + beforeCursor = pageMarker == null ? null : threads.get(0).getUpdatedAt().toString(); + if (threads.size() > limit) { // If extra result exists, then next page exists - return after cursor + threads.remove(limit); + afterCursor = threads.get(limit - 1).getUpdatedAt().toString(); + } + } + return new ResultList<>(threads, beforeCursor, afterCursor, total); } @Transaction @@ -365,7 +436,13 @@ public class FeedRepository { return original.getResolved() != updated.getResolved() || !original.getMessage().equals(updated.getMessage()); } - private List limitPostsInThreads(List threads, int limitPosts) { + /** + * Limit the number of posts within each thread. + * + * @param threads list of threads + * @param limitPosts the number of posts to limit per thread + */ + private void limitPostsInThreads(List threads, int limitPosts) { for (Thread t : threads) { List posts = t.getPosts(); if (posts.size() > limitPosts) { @@ -375,26 +452,62 @@ public class FeedRepository { t.withPosts(posts); } } - return threads; } - private List getThreadIdsByOwner(String userId) { - List threadIds = new ArrayList<>(); + /** + * Return the threads associated with user/team owned entities and the threads that were created by or replied to by + * the user. + * + * @param userId UUID of the user + * @param limit number of threads to limit + * @param time updatedTime before/after which the results should be filtered for pagination + * @param isResolved whether the thread is resolved or open + * @param paginationType before or after + * @return a list of threads and the total count of threads + * @throws IOException on error + */ + private FilteredThreads getThreadsByOwner( + String userId, int limit, long time, boolean isResolved, PaginationType paginationType) throws IOException { // add threads on user or team owned entities + // and threads created by or replied to by the user + List teamIds = getTeamIds(userId); + List jsons; + if (paginationType == PaginationType.BEFORE) { + jsons = dao.feedDAO().listThreadsByOwnerBefore(userId, teamIds, limit, time, isResolved); + } else { + jsons = dao.feedDAO().listThreadsByOwnerAfter(userId, teamIds, limit, time, isResolved); + } + List threads = JsonUtils.readObjects(jsons, Thread.class); + int totalCount = dao.feedDAO().listCountThreadsByOwner(userId, teamIds, isResolved); + return new FilteredThreads(threads, totalCount); + } + + /** + * Get a list of team ids that the given user is a part of. + * + * @param userId UUID of the user. + * @return list of team ids. + */ + private List getTeamIds(String userId) { List teamIds = dao.relationshipDAO().findFrom(userId, Entity.USER, Relationship.HAS.ordinal(), Entity.TEAM); if (teamIds.isEmpty()) { teamIds = List.of(StringUtils.EMPTY); } - threadIds.addAll(dao.feedDAO().listUserThreadsFromER(userId, teamIds, Relationship.OWNS.ordinal())); - - // add threads created by or replied to by the user - threadIds.addAll(dao.relationshipDAO().findTo(userId, Entity.USER, Relationship.CREATED.ordinal(), Entity.THREAD)); - threadIds.addAll( - dao.relationshipDAO().findTo(userId, Entity.USER, Relationship.REPLIED_TO.ordinal(), Entity.THREAD)); - return threadIds; + return teamIds; } - - private List getThreadIdsByMentions(String userId) throws IOException { + /** + * Returns the threads where the user or the team they belong to were mentioned by other users with @mention. + * + * @param userId UUID of the user + * @param limit number of threads to limit + * @param time updatedTime before/after which the results should be filtered for pagination + * @param isResolved whether the thread is resolved or open + * @param paginationType before or after + * @return a list of threads and the total count of threads + * @throws IOException on error + */ + private FilteredThreads getThreadsByMentions( + String userId, int limit, long time, boolean isResolved, PaginationType paginationType) throws IOException { List teams = EntityUtil.populateEntityReferences( dao.relationshipDAO().findFromEntity(userId, Entity.USER, Relationship.HAS.ordinal(), Entity.TEAM)); @@ -404,13 +517,65 @@ public class FeedRepository { } User user = dao.userDAO().findEntityById(UUID.fromString(userId)); - // Return all the thread ids where the user or team was mentioned - return new ArrayList<>( + // Return the threads where the user or team was mentioned + List jsons; + if (paginationType == PaginationType.BEFORE) { + jsons = + dao.feedDAO() + .listThreadsByMentionsBefore( + user.getName(), teamNames, limit, time, isResolved, Relationship.MENTIONED_IN.ordinal()); + } else { + jsons = + dao.feedDAO() + .listThreadsByMentionsAfter( + user.getName(), teamNames, limit, time, isResolved, Relationship.MENTIONED_IN.ordinal()); + } + List threads = JsonUtils.readObjects(jsons, Thread.class); + int totalCount = dao.feedDAO() - .listUserThreadsFromFR(user.getName(), teamNames, Entity.THREAD, Relationship.MENTIONED_IN.ordinal())); + .listCountThreadsByMentions(user.getName(), teamNames, isResolved, Relationship.MENTIONED_IN.ordinal()); + return new FilteredThreads(threads, totalCount); } - private List getThreadIdsByFollows(String userId) { - return dao.feedDAO().listUserThreadsFromER(userId, List.of(StringUtils.EMPTY), Relationship.FOLLOWS.ordinal()); + /** + * Returns the threads that are associated with the entities followed by the user. + * + * @param userId UUID of the user + * @param limit number of threads to limit + * @param time updatedTime before/after which the results should be filtered for pagination + * @param isResolved whether the thread is resolved or open + * @param paginationType before or after + * @return a list of threads and the total count of threads + * @throws IOException on error + */ + private FilteredThreads getThreadsByFollows( + String userId, int limit, long time, boolean isResolved, PaginationType paginationType) throws IOException { + List jsons; + if (paginationType == PaginationType.BEFORE) { + jsons = dao.feedDAO().listThreadsByFollowsBefore(userId, limit, time, isResolved, Relationship.FOLLOWS.ordinal()); + } else { + jsons = dao.feedDAO().listThreadsByFollowsAfter(userId, limit, time, isResolved, Relationship.FOLLOWS.ordinal()); + } + List threads = JsonUtils.readObjects(jsons, Thread.class); + int totalCount = dao.feedDAO().listCountThreadsByFollows(userId, isResolved, Relationship.FOLLOWS.ordinal()); + return new FilteredThreads(threads, totalCount); + } + + public static class FilteredThreads { + List threads; + int totalCount; + + public FilteredThreads(List threads, int totalCount) { + this.threads = threads; + this.totalCount = totalCount; + } + + public List getThreads() { + return threads; + } + + public int getTotalCount() { + return totalCount; + } } } diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/feeds/FeedResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/feeds/FeedResource.java index 6c6b149f78a..2cdb8522cf7 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/feeds/FeedResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/feeds/FeedResource.java @@ -56,6 +56,7 @@ import org.openmetadata.catalog.entity.feed.Thread; import org.openmetadata.catalog.jdbi3.CollectionDAO; import org.openmetadata.catalog.jdbi3.FeedRepository; import org.openmetadata.catalog.jdbi3.FeedRepository.FilterType; +import org.openmetadata.catalog.jdbi3.FeedRepository.PaginationType; import org.openmetadata.catalog.resources.Collection; import org.openmetadata.catalog.security.Authorizer; import org.openmetadata.catalog.security.SecurityUtil; @@ -130,7 +131,7 @@ public class FeedResource { description = "List of threads", content = @Content(mediaType = "application/json", schema = @Schema(implementation = ThreadList.class))) }) - public ThreadList list( + public ResultList list( @Context UriInfo uriInfo, @Parameter( description = "Limit the number of posts sorted by chronological order (1 to 1000000, default = 3)", @@ -140,6 +141,18 @@ public class FeedResource { @DefaultValue("3") @QueryParam("limitPosts") int limitPosts, + @Parameter(description = "Limit the number of threads returned. (1 to 1000000, default = 10)") + @DefaultValue("10") + @Min(1) + @Max(1000000) + @QueryParam("limit") + int limitParam, + @Parameter(description = "Returns list of threads before this cursor", schema = @Schema(type = "string")) + @QueryParam("before") + String before, + @Parameter(description = "Returns list of threads after this cursor", schema = @Schema(type = "string")) + @QueryParam("after") + String after, @Parameter( description = "Filter threads by entity link", schema = @Schema(type = "string", example = "")) @@ -147,7 +160,7 @@ public class FeedResource { String entityLink, @Parameter( description = - "Filter threads by user id. This filter requires a 'filterType' query param. The default filter type is 'OWNER'", + "Filter threads by user id. This filter requires a 'filterType' query param. The default filter type is 'OWNER'. This filter cannot be combined with the entityLink filter.", schema = @Schema(type = "string")) @QueryParam("userId") String userId, @@ -156,9 +169,24 @@ public class FeedResource { "Filter type definition for the user filter. It can take one of 'OWNER', 'FOLLOWS', 'MENTIONS'. This must be used with the 'user' query param", schema = @Schema(implementation = FilterType.class)) @QueryParam("filterType") - FilterType filterType) - throws IOException { - return new ThreadList(addHref(uriInfo, dao.listThreads(entityLink, limitPosts, userId, filterType))); + FilterType filterType, + @Parameter(description = "Filter threads by whether they are resolved or not. By default resolved is false") + @DefaultValue("false") + @QueryParam("resolved") + boolean resolved) + throws IOException, ParseException { + RestUtil.validateCursors(before, after); + + ResultList threads; + if (before != null) { // Reverse paging + threads = + dao.list(entityLink, limitPosts, userId, filterType, limitParam, before, resolved, PaginationType.BEFORE); + } else { // Forward paging or first page + threads = dao.list(entityLink, limitPosts, userId, filterType, limitParam, after, resolved, PaginationType.AFTER); + } + threads.getData().forEach(thread -> addHref(uriInfo, thread)); + + return threads; } @GET diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/teams/TeamResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/teams/TeamResource.java index 522ac3c4a6f..99ab7774959 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/teams/TeamResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/teams/TeamResource.java @@ -121,16 +121,16 @@ public class TeamResource extends EntityResource { schema = @Schema(type = "string", example = FIELDS)) @QueryParam("fields") String fieldsParam, - @Parameter(description = "Limit the number tables returned. (1 to 1000000, default = 10)") + @Parameter(description = "Limit the number of teams returned. (1 to 1000000, default = 10)") @DefaultValue("10") @Min(0) @Max(1000000) @QueryParam("limit") int limitParam, - @Parameter(description = "Returns list of tables before this cursor", schema = @Schema(type = "string")) + @Parameter(description = "Returns list of teams before this cursor", schema = @Schema(type = "string")) @QueryParam("before") String before, - @Parameter(description = "Returns list of tables after this cursor", schema = @Schema(type = "string")) + @Parameter(description = "Returns list of teams after this cursor", schema = @Schema(type = "string")) @QueryParam("after") String after, @Parameter( diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/feeds/FeedResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/feeds/FeedResourceTest.java index 51d231296aa..a42f2540ca5 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/feeds/FeedResourceTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/feeds/FeedResourceTest.java @@ -19,6 +19,7 @@ import static javax.ws.rs.core.Response.Status.NOT_FOUND; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound; import static org.openmetadata.catalog.exception.CatalogExceptionMessage.noPermission; @@ -39,6 +40,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.stream.Stream; import javax.json.JsonPatch; import javax.ws.rs.client.WebTarget; import lombok.extern.slf4j.Slf4j; @@ -49,6 +51,10 @@ import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.TestMethodOrder; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.NullSource; import org.openmetadata.catalog.CatalogApplicationTest; import org.openmetadata.catalog.Entity; import org.openmetadata.catalog.api.data.CreateTable; @@ -194,12 +200,13 @@ public class FeedResourceTest extends CatalogApplicationTest { @Test void post_validThreadAndList_200(TestInfo test) throws IOException { - int totalThreadCount = listThreads(null, null, ADMIN_AUTH_HEADERS).getData().size(); - int userThreadCount = listThreads(USER_LINK, null, ADMIN_AUTH_HEADERS).getData().size(); - int teamThreadCount = listThreads(TEAM_LINK, null, ADMIN_AUTH_HEADERS).getData().size(); - int tableThreadCount = listThreads(TABLE_LINK, null, ADMIN_AUTH_HEADERS).getData().size(); - int tableDescriptionThreadCount = listThreads(TABLE_DESCRIPTION_LINK, null, ADMIN_AUTH_HEADERS).getData().size(); - int tableColumnDescriptionThreadCount = listThreads(TABLE_COLUMN_LINK, null, ADMIN_AUTH_HEADERS).getData().size(); + int totalThreadCount = listThreads(null, null, ADMIN_AUTH_HEADERS).getPaging().getTotal(); + int userThreadCount = listThreads(USER_LINK, null, ADMIN_AUTH_HEADERS).getPaging().getTotal(); + int tableThreadCount = listThreads(TABLE_LINK, null, ADMIN_AUTH_HEADERS).getPaging().getTotal(); + int tableDescriptionThreadCount = + listThreads(TABLE_DESCRIPTION_LINK, null, ADMIN_AUTH_HEADERS).getPaging().getTotal(); + int tableColumnDescriptionThreadCount = + listThreads(TABLE_COLUMN_LINK, null, ADMIN_AUTH_HEADERS).getPaging().getTotal(); CreateThread create = create() @@ -217,45 +224,51 @@ public class FeedResourceTest extends CatalogApplicationTest { for (int i = 0; i < 10; i++) { createAndCheck(create, userAuthHeaders); // List all the threads and make sure the number of threads increased by 1 - assertEquals(++userThreadCount, listThreads(USER_LINK, null, userAuthHeaders).getData().size()); // Mentioned user - // TODO: There is no support for team mentions yet. - // assertEquals(++teamThreadCount, listThreads(TEAM_LINK, null, userAuthHeaders).getData().size()); // Mentioned - // team - assertEquals(++tableThreadCount, listThreads(TABLE_LINK, null, userAuthHeaders).getData().size()); // About TABLE - assertEquals(++totalThreadCount, listThreads(null, null, userAuthHeaders).getData().size()); // Overall threads + assertEquals( + ++userThreadCount, listThreads(USER_LINK, null, userAuthHeaders).getPaging().getTotal()); // Mentioned user + assertEquals( + ++tableThreadCount, listThreads(TABLE_LINK, null, userAuthHeaders).getPaging().getTotal()); // About TABLE + assertEquals( + ++totalThreadCount, listThreads(null, null, userAuthHeaders).getPaging().getTotal()); // Overall threads } // List threads should not include mentioned entities // It should only include threads which are about the entity link assertEquals( tableDescriptionThreadCount, - listThreads(TABLE_DESCRIPTION_LINK, null, userAuthHeaders).getData().size()); // About TABLE Description + listThreads(TABLE_DESCRIPTION_LINK, null, userAuthHeaders).getPaging().getTotal()); // About TABLE Description assertEquals( tableColumnDescriptionThreadCount, - listThreads(TABLE_COLUMN_LINK, null, userAuthHeaders).getData().size()); // About TABLE Column Description + listThreads(TABLE_COLUMN_LINK, null, userAuthHeaders).getPaging().getTotal()); // About TABLE Column Description create.withAbout(TABLE_DESCRIPTION_LINK); for (int i = 0; i < 10; i++) { createAndCheck(create, userAuthHeaders); // List all the threads and make sure the number of threads increased by 1 - assertEquals(++userThreadCount, listThreads(USER_LINK, null, userAuthHeaders).getData().size()); // Mentioned user - assertEquals(++tableThreadCount, listThreads(TABLE_LINK, null, userAuthHeaders).getData().size()); // About TABLE + assertEquals( + ++userThreadCount, listThreads(USER_LINK, null, userAuthHeaders).getPaging().getTotal()); // Mentioned user + assertEquals( + ++tableThreadCount, listThreads(TABLE_LINK, null, userAuthHeaders).getPaging().getTotal()); // About TABLE assertEquals( ++tableDescriptionThreadCount, - listThreads(TABLE_DESCRIPTION_LINK, null, userAuthHeaders).getData().size()); // About TABLE Description - assertEquals(++totalThreadCount, listThreads(null, null, userAuthHeaders).getData().size()); // Overall threads + listThreads(TABLE_DESCRIPTION_LINK, null, userAuthHeaders).getPaging().getTotal()); // About TABLE Description + assertEquals( + ++totalThreadCount, listThreads(null, null, userAuthHeaders).getPaging().getTotal()); // Overall threads } create.withAbout(TABLE_COLUMN_LINK); for (int i = 0; i < 10; i++) { createAndCheck(create, userAuthHeaders); // List all the threads and make sure the number of threads increased by 1 - assertEquals(++userThreadCount, listThreads(USER_LINK, null, userAuthHeaders).getData().size()); // Mentioned user - assertEquals(++tableThreadCount, listThreads(TABLE_LINK, null, userAuthHeaders).getData().size()); // About TABLE + assertEquals( + ++userThreadCount, listThreads(USER_LINK, null, userAuthHeaders).getPaging().getTotal()); // Mentioned user + assertEquals( + ++tableThreadCount, listThreads(TABLE_LINK, null, userAuthHeaders).getPaging().getTotal()); // About TABLE assertEquals( ++tableColumnDescriptionThreadCount, - listThreads(TABLE_COLUMN_LINK, null, userAuthHeaders).getData().size()); // About TABLE Description - assertEquals(++totalThreadCount, listThreads(null, null, userAuthHeaders).getData().size()); // Overall threads + listThreads(TABLE_COLUMN_LINK, null, userAuthHeaders).getPaging().getTotal()); // About TABLE Description + assertEquals( + ++totalThreadCount, listThreads(null, null, userAuthHeaders).getPaging().getTotal()); // Overall threads } // Test the /api/v1/feed/count API @@ -264,6 +277,72 @@ public class FeedResourceTest extends CatalogApplicationTest { assertEquals(tableColumnDescriptionThreadCount, getThreadCount(TABLE_COLUMN_LINK, userAuthHeaders)); } + private static Stream provideStringsForListThreads() { + return Stream.of( + Arguments.of(String.format("<#E/%s/%s>", Entity.USER, USER.getName())), + Arguments.of(String.format("<#E/%s/%s>", Entity.TABLE, TABLE.getFullyQualifiedName()))); + } + + @ParameterizedTest + @NullSource + @MethodSource("provideStringsForListThreads") + void get_listThreadsWithPagination(String entityLink) throws HttpResponseException { + // Create 10 threads + int totalThreadCount = listThreads(entityLink, null, ADMIN_AUTH_HEADERS).getPaging().getTotal(); + Map userAuthHeaders = authHeaders(USER.getEmail()); + for (int i = 1; i <= 10; i++) { + CreateThread create = create().withMessage("Thread " + i); + createAndCheck(create, userAuthHeaders); + // List all the threads and make sure the number of threads increased by 1 + assertEquals(++totalThreadCount, listThreads(entityLink, null, userAuthHeaders).getPaging().getTotal()); + } + // Now test if there are n number of pages with limit set to 5. (n = totalThreadCount / 5) + int limit = 5; + int totalPages = totalThreadCount / limit; + int lastPageCount; + if (totalThreadCount % limit != 0) { + totalPages++; + lastPageCount = totalThreadCount % limit; + } else { + lastPageCount = limit; + } + + // Get the first page + ThreadList threads = listThreads(entityLink, null, userAuthHeaders, limit, null, null); + assertEquals(limit, threads.getData().size()); + assertEquals(totalThreadCount, threads.getPaging().getTotal()); + assertNotNull(threads.getPaging().getAfter()); + assertNull(threads.getPaging().getBefore()); + String afterCursor = threads.getPaging().getAfter(); + String beforeCursor = null; + int pageCount = 1; + + // From the second page till last page, after and before cursors should not be null + while (afterCursor != null && pageCount < totalPages - 1) { + threads = listThreads(entityLink, null, userAuthHeaders, limit, null, afterCursor); + assertNotNull(threads.getPaging().getAfter()); + assertNotNull(threads.getPaging().getBefore()); + pageCount++; + afterCursor = threads.getPaging().getAfter(); + if (pageCount == 2) { + beforeCursor = threads.getPaging().getBefore(); + } + } + assertEquals(totalPages - 1, pageCount); + + // Get the last page + threads = listThreads(entityLink, null, userAuthHeaders, limit, null, afterCursor); + assertEquals(lastPageCount, threads.getData().size()); + assertNull(threads.getPaging().getAfter()); + + // beforeCursor should point to the first page + threads = listThreads(entityLink, null, userAuthHeaders, limit, beforeCursor, null); + assertEquals(limit, threads.getData().size()); + // since threads are always returned to the order of updated timestamp + // the first message should read "Thread 10" + assertEquals("Thread 10", threads.getData().get(0).getMessage()); + } + @Test void post_addPostWithoutMessage_4xx() { // Add post to a thread without message field @@ -381,9 +460,11 @@ public class FeedResourceTest extends CatalogApplicationTest { @Test void list_threadsWithOwnerFilter() throws HttpResponseException { // THREAD is created with TABLE entity in BeforeAll - int totalThreadCount = listThreads(null, null, ADMIN_AUTH_HEADERS).getData().size(); + int totalThreadCount = listThreads(null, null, ADMIN_AUTH_HEADERS).getPaging().getTotal(); int user2ThreadCount = - listThreadsWithFilter(USER2.getId().toString(), FilterType.OWNER.toString(), AUTH_HEADERS).getData().size(); + listThreadsWithFilter(USER2.getId().toString(), FilterType.OWNER.toString(), AUTH_HEADERS) + .getPaging() + .getTotal(); String ownerId = TABLE.getOwner().getId().toString(); // create another thread on an entity with a different owner @@ -397,20 +478,21 @@ public class FeedResourceTest extends CatalogApplicationTest { assertNotEquals(ownerId, ownerId2); ThreadList threads = listThreadsWithFilter(ownerId, FilterType.OWNER.toString(), AUTH_HEADERS); - assertEquals(totalThreadCount, threads.getData().size()); + assertEquals(totalThreadCount, threads.getPaging().getTotal()); // This should return 0 since the table is owned by a team // and for the filter we are passing team id instead of user id threads = listThreadsWithFilter(ownerId2, FilterType.OWNER.toString(), AUTH_HEADERS); + assertEquals(0, threads.getPaging().getTotal()); assertEquals(0, threads.getData().size()); // Now, test the filter with user who is part of the team threads = listThreadsWithFilter(USER2.getId().toString(), FilterType.OWNER.toString(), AUTH_HEADERS); - assertEquals(user2ThreadCount + 1, threads.getData().size()); + assertEquals(user2ThreadCount + 1, threads.getPaging().getTotal()); // Test if no user id filter returns all threads threads = listThreadsWithFilter(null, FilterType.OWNER.toString(), AUTH_HEADERS); - assertEquals(totalThreadCount + 1, threads.getData().size()); + assertEquals(totalThreadCount + 1, threads.getPaging().getTotal()); } @Test @@ -433,7 +515,7 @@ public class FeedResourceTest extends CatalogApplicationTest { addPostAndCheck(thread, createPost, ADMIN_AUTH_HEADERS); ThreadList threads = listThreadsWithFilter(USER.getId().toString(), FilterType.MENTIONS.toString(), AUTH_HEADERS); - assertEquals(2, threads.getData().size()); + assertEquals(2, threads.getPaging().getTotal()); } @Test @@ -577,9 +659,23 @@ public class FeedResourceTest extends CatalogApplicationTest { public static ThreadList listThreads(String entityLink, Integer limitPosts, Map authHeaders) throws HttpResponseException { + return listThreads(entityLink, limitPosts, authHeaders, null, null, null); + } + + public static ThreadList listThreads( + String entityLink, + Integer limitPosts, + Map authHeaders, + Integer limitParam, + String before, + String after) + throws HttpResponseException { WebTarget target = getResource("feed"); target = entityLink != null ? target.queryParam("entityLink", entityLink) : target; target = limitPosts != null ? target.queryParam("limitPosts", limitPosts) : target; + target = limitParam != null ? target.queryParam("limit", limitParam) : target; + target = before != null ? target.queryParam("before", before) : target; + target = after != null ? target.queryParam("after", after) : target; return TestUtils.get(target, ThreadList.class, authHeaders); }