Fix #3514 Activity Feed: Support pagination for the feed API (#3584)

This commit is contained in:
Vivek Ratnavel Subramanian 2022-03-22 18:37:32 -07:00 committed by GitHub
parent 90c1fecc44
commit 8cc00ff0b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 568 additions and 116 deletions

View File

@ -478,11 +478,105 @@ public interface CollectionDAO {
@SqlQuery("SELECT json FROM thread_entity ORDER BY updatedAt DESC")
List<String> list();
@SqlQuery("SELECT count(id) FROM thread_entity WHERE resolved = :resolved")
int listCount(@Bind("resolved") boolean resolved);
@SqlQuery(
"SELECT json FROM thread_entity "
+ "WHERE updatedAt > :before AND resolved = :resolved "
+ "ORDER BY updatedAt DESC "
+ "LIMIT :limit")
List<String> listBefore(@Bind("limit") int limit, @Bind("before") long before, @Bind("resolved") boolean resolved);
@SqlQuery(
"SELECT json FROM thread_entity "
+ "WHERE updatedAt < :after AND resolved = :resolved "
+ "ORDER BY updatedAt DESC "
+ "LIMIT :limit")
List<String> listAfter(@Bind("limit") int limit, @Bind("after") long after, @Bind("resolved") boolean resolved);
@SqlQuery(
"SELECT json FROM thread_entity WHERE updatedAt > :before AND resolved = :resolved AND "
+ "(entityId in (SELECT toId FROM entity_relationship WHERE "
+ "((fromEntity='user' AND fromId= :userId) OR "
+ "(fromEntity='team' AND fromId IN (<teamIds>))) AND relation=8) OR "
+ "id in (SELECT toId FROM entity_relationship WHERE (fromEntity='user' AND fromId= :userId AND toEntity='THREAD' AND relation IN (1,2)))) "
+ "ORDER BY updatedAt DESC "
+ "LIMIT :limit")
List<String> listThreadsByOwnerBefore(
@Bind("userId") String userId,
@BindList("teamIds") List<String> teamIds,
@Bind("limit") int limit,
@Bind("before") long before,
@Bind("resolved") boolean resolved);
@SqlQuery(
"SELECT json FROM thread_entity WHERE updatedAt < :after AND resolved = :resolved AND "
+ "(entityId in (SELECT toId FROM entity_relationship WHERE "
+ "((fromEntity='user' AND fromId= :userId) OR "
+ "(fromEntity='team' AND fromId IN (<teamIds>))) AND relation=8) OR "
+ "id in (SELECT toId FROM entity_relationship WHERE (fromEntity='user' AND fromId= :userId AND toEntity='THREAD' AND relation IN (1,2)))) "
+ "ORDER BY updatedAt DESC "
+ "LIMIT :limit")
List<String> listThreadsByOwnerAfter(
@Bind("userId") String userId,
@BindList("teamIds") List<String> teamIds,
@Bind("limit") int limit,
@Bind("after") long after,
@Bind("resolved") boolean resolved);
@SqlQuery(
"SELECT count(id) FROM thread_entity WHERE resolved = :resolved AND "
+ "(entityId in (SELECT toId FROM entity_relationship WHERE "
+ "((fromEntity='user' AND fromId= :userId) OR "
+ "(fromEntity='team' AND fromId IN (<teamIds>))) AND relation=8) OR "
+ "id in (SELECT toId FROM entity_relationship WHERE (fromEntity='user' AND fromId= :userId AND toEntity='THREAD' AND relation IN (1,2)))) ")
int listCountThreadsByOwner(
@Bind("userId") String userId, @BindList("teamIds") List<String> teamIds, @Bind("resolved") boolean resolved);
@SqlQuery(
"SELECT json FROM thread_entity WHERE updatedAt > :before AND resolved = :resolved AND "
+ "id in (SELECT fromFQN FROM field_relationship WHERE "
+ "toFQN LIKE CONCAT(:fqnPrefix, '%') AND fromType='THREAD' AND toType LIKE CONCAT(:toType, '%') AND relation= :relation) "
+ "ORDER BY updatedAt DESC "
+ "LIMIT :limit")
List<String> listThreadsByEntityLinkBefore(
@Bind("fqnPrefix") String fqnPrefix,
@Bind("toType") String toType,
@Bind("limit") int limit,
@Bind("before") long before,
@Bind("resolved") boolean resolved,
@Bind("relation") int relation);
@SqlQuery(
"SELECT json FROM thread_entity WHERE updatedAt < :after AND resolved = :resolved AND "
+ "id in (SELECT fromFQN FROM field_relationship WHERE "
+ "toFQN LIKE CONCAT(:fqnPrefix, '%') AND fromType='THREAD' AND toType LIKE CONCAT(:toType, '%') AND relation= :relation) "
+ "ORDER BY updatedAt DESC "
+ "LIMIT :limit")
List<String> listThreadsByEntityLinkAfter(
@Bind("fqnPrefix") String fqnPrefix,
@Bind("toType") String toType,
@Bind("limit") int limit,
@Bind("after") long after,
@Bind("resolved") boolean resolved,
@Bind("relation") int relation);
@SqlQuery(
"SELECT count(id) FROM thread_entity WHERE resolved = :resolved AND "
+ "id in (SELECT fromFQN FROM field_relationship WHERE "
+ "toFQN LIKE CONCAT(:fqnPrefix, '%') AND fromType='THREAD' AND toType LIKE CONCAT(:toType, '%') AND relation= :relation)")
int listCountThreadsByEntityLink(
@Bind("fqnPrefix") String fqnPrefix,
@Bind("toType") String toType,
@Bind("resolved") boolean resolved,
@Bind("relation") int relation);
@SqlUpdate("UPDATE thread_entity SET json = :json where id = :id")
void update(@Bind("id") String id, @Bind("json") String json);
@SqlQuery(
"SELECT entityLink, COUNT(*) count FROM field_relationship fr INNER JOIN thread_entity te ON fr.fromFQN=te.id "
"SELECT entityLink, COUNT(id) count FROM field_relationship fr INNER JOIN thread_entity te ON fr.fromFQN=te.id "
+ "WHERE fr.toFQN LIKE CONCAT(:fqnPrefix, '%') AND fr.toType like concat(:toType, '%') AND fr.fromType = :fromType "
+ "AND fr.relation = :relation AND te.resolved= :isResolved "
+ "GROUP BY entityLink")
@ -495,29 +589,98 @@ public interface CollectionDAO {
@Bind("isResolved") boolean isResolved);
@SqlQuery(
"SELECT entityLink, COUNT(*) count FROM thread_entity WHERE (id IN (<threadIds>)) "
"SELECT entityLink, COUNT(id) count FROM thread_entity WHERE resolved = :resolved AND "
+ "id in (SELECT toId FROM entity_relationship WHERE "
+ "(((fromEntity='user' AND fromId= :userId) OR "
+ "(fromEntity='team' AND fromId IN (<teamIds>))) AND relation=8) OR "
+ "(fromEntity='user' AND fromId= :userId AND toEntity='THREAD' AND relation IN (1,2))) "
+ "GROUP BY entityLink")
@RegisterRowMapper(CountFieldMapper.class)
List<List<String>> listCountByOwner(
@Bind("userId") String userId, @BindList("teamIds") List<String> teamIds, @Bind("resolved") boolean resolved);
@SqlQuery(
"SELECT entityLink, COUNT(id) count FROM thread_entity WHERE (id IN (<threadIds>)) "
+ "AND resolved= :isResolved GROUP BY entityLink")
@RegisterRowMapper(CountFieldMapper.class)
List<List<String>> listCountByThreads(
@BindList("threadIds") List<String> threadIds, @Bind("isResolved") boolean isResolved);
@SqlQuery(
"SELECT id FROM thread_entity WHERE entityId in ("
"SELECT json FROM thread_entity WHERE updatedAt > :before AND resolved = :resolved AND entityId in ("
+ "SELECT toId FROM entity_relationship WHERE "
+ "((fromEntity='user' AND fromId= :userId) OR "
+ "(fromEntity='team' AND fromId IN (<teamIds>))) AND relation= :relation) "
+ "ORDER BY updatedAt DESC "
+ "LIMIT :limit")
List<String> listThreadsByFollowsBefore(
@Bind("userId") String userId,
@Bind("limit") int limit,
@Bind("before") long before,
@Bind("resolved") boolean resolved,
@Bind("relation") int relation);
@SqlQuery(
"SELECT json FROM thread_entity WHERE updatedAt < :after AND resolved = :resolved AND entityId in ("
+ "SELECT toId FROM entity_relationship WHERE "
+ "((fromEntity='user' AND fromId= :userId) OR "
+ "(fromEntity='team' AND fromId IN (<teamIds>))) AND relation= :relation) "
+ "ORDER BY updatedAt DESC "
+ "LIMIT :limit")
List<String> listThreadsByFollowsAfter(
@Bind("userId") String userId,
@Bind("limit") int limit,
@Bind("after") long after,
@Bind("resolved") boolean resolved,
@Bind("relation") int relation);
@SqlQuery(
"SELECT count(id) FROM thread_entity WHERE resolved = :resolved AND entityId in ("
+ "SELECT toId FROM entity_relationship WHERE "
+ "((fromEntity='user' AND fromId= :userId) OR "
+ "(fromEntity='team' AND fromId IN (<teamIds>))) AND relation= :relation)")
List<String> listUserThreadsFromER(
@Bind("userId") String userId, @BindList("teamIds") List<String> teamIds, @Bind("relation") int relation);
int listCountThreadsByFollows(
@Bind("userId") String userId, @Bind("resolved") boolean resolved, @Bind("relation") int relation);
@SqlQuery(
"SELECT id FROM thread_entity WHERE id in ("
"SELECT json FROM thread_entity WHERE updatedAt > :before AND resolved = :resolved AND id in ("
+ "SELECT toFQN FROM field_relationship WHERE "
+ "((fromType='user' AND fromFQN= :userName) OR "
+ "(fromType='team' AND fromFQN IN (<teamNames>))) AND toType = :toType AND relation = :relation)")
List<String> listUserThreadsFromFR(
+ "(fromType='team' AND fromFQN IN (<teamNames>))) AND toType='THREAD' AND relation= :relation) "
+ "ORDER BY updatedAt DESC "
+ "LIMIT :limit")
List<String> listThreadsByMentionsBefore(
@Bind("userName") String userName,
@BindList("teamNames") List<String> teamNames,
@Bind("toType") String toType,
@Bind("limit") int limit,
@Bind("before") long before,
@Bind("resolved") boolean resolved,
@Bind("relation") int relation);
@SqlQuery(
"SELECT json FROM thread_entity WHERE updatedAt < :after AND resolved = :resolved AND id in ("
+ "SELECT toFQN FROM field_relationship WHERE "
+ "((fromType='user' AND fromFQN= :userName) OR "
+ "(fromType='team' AND fromFQN IN (<teamNames>))) AND toType='THREAD' AND relation= :relation) "
+ "ORDER BY updatedAt DESC "
+ "LIMIT :limit")
List<String> listThreadsByMentionsAfter(
@Bind("userName") String userName,
@BindList("teamNames") List<String> teamNames,
@Bind("limit") int limit,
@Bind("after") long after,
@Bind("resolved") boolean resolved,
@Bind("relation") int relation);
@SqlQuery(
"SELECT count(id) FROM thread_entity WHERE resolved = :resolved AND id in ("
+ "SELECT toFQN FROM field_relationship WHERE "
+ "((fromType='user' AND fromFQN= :userName) OR "
+ "(fromType='team' AND fromFQN IN (<teamNames>))) AND toType='THREAD' AND relation= :relation) ")
int listCountThreadsByMentions(
@Bind("userName") String userName,
@BindList("teamNames") List<String> teamNames,
@Bind("resolved") boolean resolved,
@Bind("relation") int relation);
class CountFieldMapper implements RowMapper<List<String>> {

View File

@ -18,10 +18,8 @@ import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@ -50,6 +48,7 @@ import org.openmetadata.catalog.util.JsonUtils;
import org.openmetadata.catalog.util.RestUtil;
import org.openmetadata.catalog.util.RestUtil.DeleteResponse;
import org.openmetadata.catalog.util.RestUtil.PatchResponse;
import org.openmetadata.catalog.util.ResultList;
@Slf4j
public class FeedRepository {
@ -65,6 +64,11 @@ public class FeedRepository {
FOLLOWS
}
public enum PaginationType {
BEFORE,
AFTER
}
@Transaction
public Thread create(Thread thread) throws IOException, ParseException {
String createdBy = thread.getCreatedBy();
@ -226,9 +230,15 @@ public class FeedRepository {
} else {
EntityLink entityLink = EntityLink.parse(link);
EntityReference reference = EntityUtil.validateEntityLink(entityLink);
if (reference.getType().equals(Entity.USER)) {
List<String> threadIds = getThreadIdsByOwner(reference.getId().toString());
result = dao.feedDAO().listCountByThreads(threadIds, isResolved);
if (reference.getType().equals(Entity.USER) || reference.getType().equals(Entity.TEAM)) {
if (reference.getType().equals(Entity.USER)) {
String userId = reference.getId().toString();
List<String> teamIds = getTeamIds(userId);
result = dao.feedDAO().listCountByOwner(userId, teamIds, isResolved);
} else {
// team is not supported
result = new ArrayList<>();
}
} else {
result =
dao.feedDAO()
@ -256,72 +266,133 @@ public class FeedRepository {
return thread.getPosts();
}
/**
* List threads based on the filters and limits in the order of the updated timestamp.
*
* @param link entity link filter
* @param limitPosts the number of posts to limit per thread
* @param userId UUID of the user. Enables UserId filter
* @param filterType Type of the filter to be applied with userId filter
* @param limit the number of threads to limit in the response
* @param pageMarker the before/after updatedTime to be used as pagination marker for queries
* @param isResolved whether the thread is resolved or open
* @param paginationType before or after
* @return a list of threads as ResultList
* @throws IOException on error
* @throws ParseException on error
*/
@Transaction
public List<Thread> listThreads(String link, int limitPosts, String userId, FilterType filterType)
throws IOException {
List<Thread> threads = new ArrayList<>();
public final ResultList<Thread> list(
String link,
int limitPosts,
String userId,
FilterType filterType,
int limit,
String pageMarker,
boolean isResolved,
PaginationType paginationType)
throws IOException, ParseException {
List<Thread> threads;
int total;
// Here updatedAt time is used for page marker since threads are sorted by last update time
long time = Long.MAX_VALUE;
// if paginationType is "before", it must have a pageMarker time.
// "after" could be null to get the first page. In this case we set time to MAX_VALUE
// to get any entry with updatedTime < MAX_VALUE
if (pageMarker != null) {
time = Long.parseLong(RestUtil.decodeCursor(pageMarker));
}
// No filters are enabled. Listing all the threads
if (link == null && userId == null) {
// No filters are enabled. Listing all the threads
threads = JsonUtils.readObjects(dao.feedDAO().list(), Thread.class);
// Get one extra result used for computing before cursor
List<String> jsons;
if (paginationType == PaginationType.BEFORE) {
jsons = dao.feedDAO().listBefore(limit + 1, time, isResolved);
} else {
jsons = dao.feedDAO().listAfter(limit + 1, time, isResolved);
}
threads = JsonUtils.readObjects(jsons, Thread.class);
total = dao.feedDAO().listCount(isResolved);
} else {
// Either one or both the filters are enabled
List<String> threadIds = new ArrayList<>();
// we don't support both the filters together. If both are not null, entity link takes precedence
if (link != null) {
EntityLink entityLink = EntityLink.parse(link);
EntityReference reference = EntityUtil.validateEntityLink(entityLink);
List<List<String>> result;
// For a user entityLink get created or replied relationships to the thread
if (reference.getType().equals(Entity.USER)) {
threadIds.addAll(getThreadIdsByOwner(reference.getId().toString()));
FilteredThreads filteredThreads =
getThreadsByOwner(reference.getId().toString(), limit + 1, time, isResolved, paginationType);
threads = filteredThreads.getThreads();
total = filteredThreads.getTotalCount();
} else {
// Only data assets are added as about
result =
dao.fieldRelationshipDAO()
.listFromByAllPrefix(
entityLink.getFullyQualifiedFieldValue(),
Entity.THREAD,
entityLink.getFullyQualifiedFieldType(),
Relationship.IS_ABOUT.ordinal());
result.forEach(l -> threadIds.add(l.get(1)));
}
}
if (userId != null) {
List<String> userThreadIds;
if (filterType == FilterType.FOLLOWS) {
userThreadIds = getThreadIdsByFollows(userId);
} else if (filterType == FilterType.MENTIONS) {
userThreadIds = getThreadIdsByMentions(userId);
} else {
userThreadIds = getThreadIdsByOwner(userId);
}
// if both link and user filters are enabled, the filters should be applied as "AND"
if (!threadIds.isEmpty()) {
// apply user filter on top of the link filter
if (!userThreadIds.isEmpty()) {
userThreadIds = userThreadIds.stream().filter(threadIds::contains).collect(Collectors.toList());
List<String> jsons;
if (paginationType == PaginationType.BEFORE) {
jsons =
dao.feedDAO()
.listThreadsByEntityLinkBefore(
entityLink.getFullyQualifiedFieldValue(),
entityLink.getFullyQualifiedFieldType(),
limit + 1,
time,
isResolved,
Relationship.IS_ABOUT.ordinal());
} else {
jsons =
dao.feedDAO()
.listThreadsByEntityLinkAfter(
entityLink.getFullyQualifiedFieldValue(),
entityLink.getFullyQualifiedFieldType(),
limit + 1,
time,
isResolved,
Relationship.IS_ABOUT.ordinal());
}
threads = JsonUtils.readObjects(jsons, Thread.class);
total =
dao.feedDAO()
.listCountThreadsByEntityLink(
entityLink.getFullyQualifiedFieldValue(),
entityLink.getFullyQualifiedFieldType(),
isResolved,
Relationship.IS_ABOUT.ordinal());
}
threadIds.addAll(userThreadIds);
}
Set<String> uniqueValues = new HashSet<>();
for (String t : threadIds) {
// If an entity has multiple relationships (created, mentioned, repliedTo etc.) to the same thread
// Don't send duplicated copies of the thread in response
if (uniqueValues.add(t)) {
threads.add(EntityUtil.validate(t, dao.feedDAO().findById(t), Thread.class));
} else {
FilteredThreads filteredThreads;
if (filterType == FilterType.FOLLOWS) {
filteredThreads = getThreadsByFollows(userId, limit + 1, time, isResolved, paginationType);
} else if (filterType == FilterType.MENTIONS) {
filteredThreads = getThreadsByMentions(userId, limit + 1, time, isResolved, paginationType);
} else {
filteredThreads = getThreadsByOwner(userId, limit + 1, time, isResolved, paginationType);
}
threads = filteredThreads.getThreads();
total = filteredThreads.getTotalCount();
}
// sort the list by thread updated timestamp before returning
threads.sort(Comparator.comparing(Thread::getUpdatedAt, Comparator.reverseOrder()));
}
return limitPostsInThreads(threads, limitPosts);
limitPostsInThreads(threads, limitPosts);
String beforeCursor = null;
String afterCursor = null;
if (paginationType == PaginationType.BEFORE) {
if (threads.size() > limit) { // If extra result exists, then previous page exists - return before cursor
threads.remove(0);
beforeCursor = threads.get(0).getUpdatedAt().toString();
}
afterCursor = threads.get(threads.size() - 1).getUpdatedAt().toString();
} else {
beforeCursor = pageMarker == null ? null : threads.get(0).getUpdatedAt().toString();
if (threads.size() > limit) { // If extra result exists, then next page exists - return after cursor
threads.remove(limit);
afterCursor = threads.get(limit - 1).getUpdatedAt().toString();
}
}
return new ResultList<>(threads, beforeCursor, afterCursor, total);
}
@Transaction
@ -365,7 +436,13 @@ public class FeedRepository {
return original.getResolved() != updated.getResolved() || !original.getMessage().equals(updated.getMessage());
}
private List<Thread> limitPostsInThreads(List<Thread> threads, int limitPosts) {
/**
* Limit the number of posts within each thread.
*
* @param threads list of threads
* @param limitPosts the number of posts to limit per thread
*/
private void limitPostsInThreads(List<Thread> threads, int limitPosts) {
for (Thread t : threads) {
List<Post> posts = t.getPosts();
if (posts.size() > limitPosts) {
@ -375,26 +452,62 @@ public class FeedRepository {
t.withPosts(posts);
}
}
return threads;
}
private List<String> getThreadIdsByOwner(String userId) {
List<String> threadIds = new ArrayList<>();
/**
* Return the threads associated with user/team owned entities and the threads that were created by or replied to by
* the user.
*
* @param userId UUID of the user
* @param limit number of threads to limit
* @param time updatedTime before/after which the results should be filtered for pagination
* @param isResolved whether the thread is resolved or open
* @param paginationType before or after
* @return a list of threads and the total count of threads
* @throws IOException on error
*/
private FilteredThreads getThreadsByOwner(
String userId, int limit, long time, boolean isResolved, PaginationType paginationType) throws IOException {
// add threads on user or team owned entities
// and threads created by or replied to by the user
List<String> teamIds = getTeamIds(userId);
List<String> jsons;
if (paginationType == PaginationType.BEFORE) {
jsons = dao.feedDAO().listThreadsByOwnerBefore(userId, teamIds, limit, time, isResolved);
} else {
jsons = dao.feedDAO().listThreadsByOwnerAfter(userId, teamIds, limit, time, isResolved);
}
List<Thread> threads = JsonUtils.readObjects(jsons, Thread.class);
int totalCount = dao.feedDAO().listCountThreadsByOwner(userId, teamIds, isResolved);
return new FilteredThreads(threads, totalCount);
}
/**
* Get a list of team ids that the given user is a part of.
*
* @param userId UUID of the user.
* @return list of team ids.
*/
private List<String> getTeamIds(String userId) {
List<String> teamIds = dao.relationshipDAO().findFrom(userId, Entity.USER, Relationship.HAS.ordinal(), Entity.TEAM);
if (teamIds.isEmpty()) {
teamIds = List.of(StringUtils.EMPTY);
}
threadIds.addAll(dao.feedDAO().listUserThreadsFromER(userId, teamIds, Relationship.OWNS.ordinal()));
// add threads created by or replied to by the user
threadIds.addAll(dao.relationshipDAO().findTo(userId, Entity.USER, Relationship.CREATED.ordinal(), Entity.THREAD));
threadIds.addAll(
dao.relationshipDAO().findTo(userId, Entity.USER, Relationship.REPLIED_TO.ordinal(), Entity.THREAD));
return threadIds;
return teamIds;
}
private List<String> getThreadIdsByMentions(String userId) throws IOException {
/**
* Returns the threads where the user or the team they belong to were mentioned by other users with @mention.
*
* @param userId UUID of the user
* @param limit number of threads to limit
* @param time updatedTime before/after which the results should be filtered for pagination
* @param isResolved whether the thread is resolved or open
* @param paginationType before or after
* @return a list of threads and the total count of threads
* @throws IOException on error
*/
private FilteredThreads getThreadsByMentions(
String userId, int limit, long time, boolean isResolved, PaginationType paginationType) throws IOException {
List<EntityReference> teams =
EntityUtil.populateEntityReferences(
dao.relationshipDAO().findFromEntity(userId, Entity.USER, Relationship.HAS.ordinal(), Entity.TEAM));
@ -404,13 +517,65 @@ public class FeedRepository {
}
User user = dao.userDAO().findEntityById(UUID.fromString(userId));
// Return all the thread ids where the user or team was mentioned
return new ArrayList<>(
// Return the threads where the user or team was mentioned
List<String> jsons;
if (paginationType == PaginationType.BEFORE) {
jsons =
dao.feedDAO()
.listThreadsByMentionsBefore(
user.getName(), teamNames, limit, time, isResolved, Relationship.MENTIONED_IN.ordinal());
} else {
jsons =
dao.feedDAO()
.listThreadsByMentionsAfter(
user.getName(), teamNames, limit, time, isResolved, Relationship.MENTIONED_IN.ordinal());
}
List<Thread> threads = JsonUtils.readObjects(jsons, Thread.class);
int totalCount =
dao.feedDAO()
.listUserThreadsFromFR(user.getName(), teamNames, Entity.THREAD, Relationship.MENTIONED_IN.ordinal()));
.listCountThreadsByMentions(user.getName(), teamNames, isResolved, Relationship.MENTIONED_IN.ordinal());
return new FilteredThreads(threads, totalCount);
}
private List<String> getThreadIdsByFollows(String userId) {
return dao.feedDAO().listUserThreadsFromER(userId, List.of(StringUtils.EMPTY), Relationship.FOLLOWS.ordinal());
/**
* Returns the threads that are associated with the entities followed by the user.
*
* @param userId UUID of the user
* @param limit number of threads to limit
* @param time updatedTime before/after which the results should be filtered for pagination
* @param isResolved whether the thread is resolved or open
* @param paginationType before or after
* @return a list of threads and the total count of threads
* @throws IOException on error
*/
private FilteredThreads getThreadsByFollows(
String userId, int limit, long time, boolean isResolved, PaginationType paginationType) throws IOException {
List<String> jsons;
if (paginationType == PaginationType.BEFORE) {
jsons = dao.feedDAO().listThreadsByFollowsBefore(userId, limit, time, isResolved, Relationship.FOLLOWS.ordinal());
} else {
jsons = dao.feedDAO().listThreadsByFollowsAfter(userId, limit, time, isResolved, Relationship.FOLLOWS.ordinal());
}
List<Thread> threads = JsonUtils.readObjects(jsons, Thread.class);
int totalCount = dao.feedDAO().listCountThreadsByFollows(userId, isResolved, Relationship.FOLLOWS.ordinal());
return new FilteredThreads(threads, totalCount);
}
public static class FilteredThreads {
List<Thread> threads;
int totalCount;
public FilteredThreads(List<Thread> threads, int totalCount) {
this.threads = threads;
this.totalCount = totalCount;
}
public List<Thread> getThreads() {
return threads;
}
public int getTotalCount() {
return totalCount;
}
}
}

View File

@ -56,6 +56,7 @@ import org.openmetadata.catalog.entity.feed.Thread;
import org.openmetadata.catalog.jdbi3.CollectionDAO;
import org.openmetadata.catalog.jdbi3.FeedRepository;
import org.openmetadata.catalog.jdbi3.FeedRepository.FilterType;
import org.openmetadata.catalog.jdbi3.FeedRepository.PaginationType;
import org.openmetadata.catalog.resources.Collection;
import org.openmetadata.catalog.security.Authorizer;
import org.openmetadata.catalog.security.SecurityUtil;
@ -130,7 +131,7 @@ public class FeedResource {
description = "List of threads",
content = @Content(mediaType = "application/json", schema = @Schema(implementation = ThreadList.class)))
})
public ThreadList list(
public ResultList<Thread> list(
@Context UriInfo uriInfo,
@Parameter(
description = "Limit the number of posts sorted by chronological order (1 to 1000000, default = 3)",
@ -140,6 +141,18 @@ public class FeedResource {
@DefaultValue("3")
@QueryParam("limitPosts")
int limitPosts,
@Parameter(description = "Limit the number of threads returned. (1 to 1000000, default = 10)")
@DefaultValue("10")
@Min(1)
@Max(1000000)
@QueryParam("limit")
int limitParam,
@Parameter(description = "Returns list of threads before this cursor", schema = @Schema(type = "string"))
@QueryParam("before")
String before,
@Parameter(description = "Returns list of threads after this cursor", schema = @Schema(type = "string"))
@QueryParam("after")
String after,
@Parameter(
description = "Filter threads by entity link",
schema = @Schema(type = "string", example = "<E#/{entityType}/{entityFQN}/{fieldName}>"))
@ -147,7 +160,7 @@ public class FeedResource {
String entityLink,
@Parameter(
description =
"Filter threads by user id. This filter requires a 'filterType' query param. The default filter type is 'OWNER'",
"Filter threads by user id. This filter requires a 'filterType' query param. The default filter type is 'OWNER'. This filter cannot be combined with the entityLink filter.",
schema = @Schema(type = "string"))
@QueryParam("userId")
String userId,
@ -156,9 +169,24 @@ public class FeedResource {
"Filter type definition for the user filter. It can take one of 'OWNER', 'FOLLOWS', 'MENTIONS'. This must be used with the 'user' query param",
schema = @Schema(implementation = FilterType.class))
@QueryParam("filterType")
FilterType filterType)
throws IOException {
return new ThreadList(addHref(uriInfo, dao.listThreads(entityLink, limitPosts, userId, filterType)));
FilterType filterType,
@Parameter(description = "Filter threads by whether they are resolved or not. By default resolved is false")
@DefaultValue("false")
@QueryParam("resolved")
boolean resolved)
throws IOException, ParseException {
RestUtil.validateCursors(before, after);
ResultList<Thread> threads;
if (before != null) { // Reverse paging
threads =
dao.list(entityLink, limitPosts, userId, filterType, limitParam, before, resolved, PaginationType.BEFORE);
} else { // Forward paging or first page
threads = dao.list(entityLink, limitPosts, userId, filterType, limitParam, after, resolved, PaginationType.AFTER);
}
threads.getData().forEach(thread -> addHref(uriInfo, thread));
return threads;
}
@GET

View File

@ -121,16 +121,16 @@ public class TeamResource extends EntityResource<Team, TeamRepository> {
schema = @Schema(type = "string", example = FIELDS))
@QueryParam("fields")
String fieldsParam,
@Parameter(description = "Limit the number tables returned. (1 to 1000000, default = 10)")
@Parameter(description = "Limit the number of teams returned. (1 to 1000000, default = 10)")
@DefaultValue("10")
@Min(0)
@Max(1000000)
@QueryParam("limit")
int limitParam,
@Parameter(description = "Returns list of tables before this cursor", schema = @Schema(type = "string"))
@Parameter(description = "Returns list of teams before this cursor", schema = @Schema(type = "string"))
@QueryParam("before")
String before,
@Parameter(description = "Returns list of tables after this cursor", schema = @Schema(type = "string"))
@Parameter(description = "Returns list of teams after this cursor", schema = @Schema(type = "string"))
@QueryParam("after")
String after,
@Parameter(

View File

@ -19,6 +19,7 @@ import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound;
import static org.openmetadata.catalog.exception.CatalogExceptionMessage.noPermission;
@ -39,6 +40,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Stream;
import javax.json.JsonPatch;
import javax.ws.rs.client.WebTarget;
import lombok.extern.slf4j.Slf4j;
@ -49,6 +51,10 @@ import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.NullSource;
import org.openmetadata.catalog.CatalogApplicationTest;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.api.data.CreateTable;
@ -194,12 +200,13 @@ public class FeedResourceTest extends CatalogApplicationTest {
@Test
void post_validThreadAndList_200(TestInfo test) throws IOException {
int totalThreadCount = listThreads(null, null, ADMIN_AUTH_HEADERS).getData().size();
int userThreadCount = listThreads(USER_LINK, null, ADMIN_AUTH_HEADERS).getData().size();
int teamThreadCount = listThreads(TEAM_LINK, null, ADMIN_AUTH_HEADERS).getData().size();
int tableThreadCount = listThreads(TABLE_LINK, null, ADMIN_AUTH_HEADERS).getData().size();
int tableDescriptionThreadCount = listThreads(TABLE_DESCRIPTION_LINK, null, ADMIN_AUTH_HEADERS).getData().size();
int tableColumnDescriptionThreadCount = listThreads(TABLE_COLUMN_LINK, null, ADMIN_AUTH_HEADERS).getData().size();
int totalThreadCount = listThreads(null, null, ADMIN_AUTH_HEADERS).getPaging().getTotal();
int userThreadCount = listThreads(USER_LINK, null, ADMIN_AUTH_HEADERS).getPaging().getTotal();
int tableThreadCount = listThreads(TABLE_LINK, null, ADMIN_AUTH_HEADERS).getPaging().getTotal();
int tableDescriptionThreadCount =
listThreads(TABLE_DESCRIPTION_LINK, null, ADMIN_AUTH_HEADERS).getPaging().getTotal();
int tableColumnDescriptionThreadCount =
listThreads(TABLE_COLUMN_LINK, null, ADMIN_AUTH_HEADERS).getPaging().getTotal();
CreateThread create =
create()
@ -217,45 +224,51 @@ public class FeedResourceTest extends CatalogApplicationTest {
for (int i = 0; i < 10; i++) {
createAndCheck(create, userAuthHeaders);
// List all the threads and make sure the number of threads increased by 1
assertEquals(++userThreadCount, listThreads(USER_LINK, null, userAuthHeaders).getData().size()); // Mentioned user
// TODO: There is no support for team mentions yet.
// assertEquals(++teamThreadCount, listThreads(TEAM_LINK, null, userAuthHeaders).getData().size()); // Mentioned
// team
assertEquals(++tableThreadCount, listThreads(TABLE_LINK, null, userAuthHeaders).getData().size()); // About TABLE
assertEquals(++totalThreadCount, listThreads(null, null, userAuthHeaders).getData().size()); // Overall threads
assertEquals(
++userThreadCount, listThreads(USER_LINK, null, userAuthHeaders).getPaging().getTotal()); // Mentioned user
assertEquals(
++tableThreadCount, listThreads(TABLE_LINK, null, userAuthHeaders).getPaging().getTotal()); // About TABLE
assertEquals(
++totalThreadCount, listThreads(null, null, userAuthHeaders).getPaging().getTotal()); // Overall threads
}
// List threads should not include mentioned entities
// It should only include threads which are about the entity link
assertEquals(
tableDescriptionThreadCount,
listThreads(TABLE_DESCRIPTION_LINK, null, userAuthHeaders).getData().size()); // About TABLE Description
listThreads(TABLE_DESCRIPTION_LINK, null, userAuthHeaders).getPaging().getTotal()); // About TABLE Description
assertEquals(
tableColumnDescriptionThreadCount,
listThreads(TABLE_COLUMN_LINK, null, userAuthHeaders).getData().size()); // About TABLE Column Description
listThreads(TABLE_COLUMN_LINK, null, userAuthHeaders).getPaging().getTotal()); // About TABLE Column Description
create.withAbout(TABLE_DESCRIPTION_LINK);
for (int i = 0; i < 10; i++) {
createAndCheck(create, userAuthHeaders);
// List all the threads and make sure the number of threads increased by 1
assertEquals(++userThreadCount, listThreads(USER_LINK, null, userAuthHeaders).getData().size()); // Mentioned user
assertEquals(++tableThreadCount, listThreads(TABLE_LINK, null, userAuthHeaders).getData().size()); // About TABLE
assertEquals(
++userThreadCount, listThreads(USER_LINK, null, userAuthHeaders).getPaging().getTotal()); // Mentioned user
assertEquals(
++tableThreadCount, listThreads(TABLE_LINK, null, userAuthHeaders).getPaging().getTotal()); // About TABLE
assertEquals(
++tableDescriptionThreadCount,
listThreads(TABLE_DESCRIPTION_LINK, null, userAuthHeaders).getData().size()); // About TABLE Description
assertEquals(++totalThreadCount, listThreads(null, null, userAuthHeaders).getData().size()); // Overall threads
listThreads(TABLE_DESCRIPTION_LINK, null, userAuthHeaders).getPaging().getTotal()); // About TABLE Description
assertEquals(
++totalThreadCount, listThreads(null, null, userAuthHeaders).getPaging().getTotal()); // Overall threads
}
create.withAbout(TABLE_COLUMN_LINK);
for (int i = 0; i < 10; i++) {
createAndCheck(create, userAuthHeaders);
// List all the threads and make sure the number of threads increased by 1
assertEquals(++userThreadCount, listThreads(USER_LINK, null, userAuthHeaders).getData().size()); // Mentioned user
assertEquals(++tableThreadCount, listThreads(TABLE_LINK, null, userAuthHeaders).getData().size()); // About TABLE
assertEquals(
++userThreadCount, listThreads(USER_LINK, null, userAuthHeaders).getPaging().getTotal()); // Mentioned user
assertEquals(
++tableThreadCount, listThreads(TABLE_LINK, null, userAuthHeaders).getPaging().getTotal()); // About TABLE
assertEquals(
++tableColumnDescriptionThreadCount,
listThreads(TABLE_COLUMN_LINK, null, userAuthHeaders).getData().size()); // About TABLE Description
assertEquals(++totalThreadCount, listThreads(null, null, userAuthHeaders).getData().size()); // Overall threads
listThreads(TABLE_COLUMN_LINK, null, userAuthHeaders).getPaging().getTotal()); // About TABLE Description
assertEquals(
++totalThreadCount, listThreads(null, null, userAuthHeaders).getPaging().getTotal()); // Overall threads
}
// Test the /api/v1/feed/count API
@ -264,6 +277,72 @@ public class FeedResourceTest extends CatalogApplicationTest {
assertEquals(tableColumnDescriptionThreadCount, getThreadCount(TABLE_COLUMN_LINK, userAuthHeaders));
}
private static Stream<Arguments> provideStringsForListThreads() {
return Stream.of(
Arguments.of(String.format("<#E/%s/%s>", Entity.USER, USER.getName())),
Arguments.of(String.format("<#E/%s/%s>", Entity.TABLE, TABLE.getFullyQualifiedName())));
}
@ParameterizedTest
@NullSource
@MethodSource("provideStringsForListThreads")
void get_listThreadsWithPagination(String entityLink) throws HttpResponseException {
// Create 10 threads
int totalThreadCount = listThreads(entityLink, null, ADMIN_AUTH_HEADERS).getPaging().getTotal();
Map<String, String> userAuthHeaders = authHeaders(USER.getEmail());
for (int i = 1; i <= 10; i++) {
CreateThread create = create().withMessage("Thread " + i);
createAndCheck(create, userAuthHeaders);
// List all the threads and make sure the number of threads increased by 1
assertEquals(++totalThreadCount, listThreads(entityLink, null, userAuthHeaders).getPaging().getTotal());
}
// Now test if there are n number of pages with limit set to 5. (n = totalThreadCount / 5)
int limit = 5;
int totalPages = totalThreadCount / limit;
int lastPageCount;
if (totalThreadCount % limit != 0) {
totalPages++;
lastPageCount = totalThreadCount % limit;
} else {
lastPageCount = limit;
}
// Get the first page
ThreadList threads = listThreads(entityLink, null, userAuthHeaders, limit, null, null);
assertEquals(limit, threads.getData().size());
assertEquals(totalThreadCount, threads.getPaging().getTotal());
assertNotNull(threads.getPaging().getAfter());
assertNull(threads.getPaging().getBefore());
String afterCursor = threads.getPaging().getAfter();
String beforeCursor = null;
int pageCount = 1;
// From the second page till last page, after and before cursors should not be null
while (afterCursor != null && pageCount < totalPages - 1) {
threads = listThreads(entityLink, null, userAuthHeaders, limit, null, afterCursor);
assertNotNull(threads.getPaging().getAfter());
assertNotNull(threads.getPaging().getBefore());
pageCount++;
afterCursor = threads.getPaging().getAfter();
if (pageCount == 2) {
beforeCursor = threads.getPaging().getBefore();
}
}
assertEquals(totalPages - 1, pageCount);
// Get the last page
threads = listThreads(entityLink, null, userAuthHeaders, limit, null, afterCursor);
assertEquals(lastPageCount, threads.getData().size());
assertNull(threads.getPaging().getAfter());
// beforeCursor should point to the first page
threads = listThreads(entityLink, null, userAuthHeaders, limit, beforeCursor, null);
assertEquals(limit, threads.getData().size());
// since threads are always returned to the order of updated timestamp
// the first message should read "Thread 10"
assertEquals("Thread 10", threads.getData().get(0).getMessage());
}
@Test
void post_addPostWithoutMessage_4xx() {
// Add post to a thread without message field
@ -381,9 +460,11 @@ public class FeedResourceTest extends CatalogApplicationTest {
@Test
void list_threadsWithOwnerFilter() throws HttpResponseException {
// THREAD is created with TABLE entity in BeforeAll
int totalThreadCount = listThreads(null, null, ADMIN_AUTH_HEADERS).getData().size();
int totalThreadCount = listThreads(null, null, ADMIN_AUTH_HEADERS).getPaging().getTotal();
int user2ThreadCount =
listThreadsWithFilter(USER2.getId().toString(), FilterType.OWNER.toString(), AUTH_HEADERS).getData().size();
listThreadsWithFilter(USER2.getId().toString(), FilterType.OWNER.toString(), AUTH_HEADERS)
.getPaging()
.getTotal();
String ownerId = TABLE.getOwner().getId().toString();
// create another thread on an entity with a different owner
@ -397,20 +478,21 @@ public class FeedResourceTest extends CatalogApplicationTest {
assertNotEquals(ownerId, ownerId2);
ThreadList threads = listThreadsWithFilter(ownerId, FilterType.OWNER.toString(), AUTH_HEADERS);
assertEquals(totalThreadCount, threads.getData().size());
assertEquals(totalThreadCount, threads.getPaging().getTotal());
// This should return 0 since the table is owned by a team
// and for the filter we are passing team id instead of user id
threads = listThreadsWithFilter(ownerId2, FilterType.OWNER.toString(), AUTH_HEADERS);
assertEquals(0, threads.getPaging().getTotal());
assertEquals(0, threads.getData().size());
// Now, test the filter with user who is part of the team
threads = listThreadsWithFilter(USER2.getId().toString(), FilterType.OWNER.toString(), AUTH_HEADERS);
assertEquals(user2ThreadCount + 1, threads.getData().size());
assertEquals(user2ThreadCount + 1, threads.getPaging().getTotal());
// Test if no user id filter returns all threads
threads = listThreadsWithFilter(null, FilterType.OWNER.toString(), AUTH_HEADERS);
assertEquals(totalThreadCount + 1, threads.getData().size());
assertEquals(totalThreadCount + 1, threads.getPaging().getTotal());
}
@Test
@ -433,7 +515,7 @@ public class FeedResourceTest extends CatalogApplicationTest {
addPostAndCheck(thread, createPost, ADMIN_AUTH_HEADERS);
ThreadList threads = listThreadsWithFilter(USER.getId().toString(), FilterType.MENTIONS.toString(), AUTH_HEADERS);
assertEquals(2, threads.getData().size());
assertEquals(2, threads.getPaging().getTotal());
}
@Test
@ -577,9 +659,23 @@ public class FeedResourceTest extends CatalogApplicationTest {
public static ThreadList listThreads(String entityLink, Integer limitPosts, Map<String, String> authHeaders)
throws HttpResponseException {
return listThreads(entityLink, limitPosts, authHeaders, null, null, null);
}
public static ThreadList listThreads(
String entityLink,
Integer limitPosts,
Map<String, String> authHeaders,
Integer limitParam,
String before,
String after)
throws HttpResponseException {
WebTarget target = getResource("feed");
target = entityLink != null ? target.queryParam("entityLink", entityLink) : target;
target = limitPosts != null ? target.queryParam("limitPosts", limitPosts) : target;
target = limitParam != null ? target.queryParam("limit", limitParam) : target;
target = before != null ? target.queryParam("before", before) : target;
target = after != null ? target.queryParam("after", after) : target;
return TestUtils.get(target, ThreadList.class, authHeaders);
}