mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-12-25 22:49:12 +00:00
parent
f0804816ab
commit
28807f8c0a
@ -24,6 +24,7 @@ import java.util.UUID;
|
||||
import javax.ws.rs.container.ContainerRequestContext;
|
||||
import javax.ws.rs.container.ContainerResponseContext;
|
||||
import javax.ws.rs.core.Response.Status;
|
||||
import javax.ws.rs.core.SecurityContext;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.jdbi.v3.core.Jdbi;
|
||||
import org.openmetadata.catalog.CatalogApplicationConfig;
|
||||
@ -53,6 +54,7 @@ public class ChangeEventHandler implements EventHandler {
|
||||
|
||||
public Void process(ContainerRequestContext requestContext, ContainerResponseContext responseContext) {
|
||||
String method = requestContext.getMethod();
|
||||
SecurityContext securityContext = requestContext.getSecurityContext();
|
||||
try {
|
||||
ChangeEvent changeEvent = getChangeEvent(method, responseContext);
|
||||
if (changeEvent != null) {
|
||||
@ -75,7 +77,7 @@ public class ChangeEventHandler implements EventHandler {
|
||||
List<Thread> threads = getThreads(responseContext);
|
||||
if (threads != null) {
|
||||
for (var thread : threads) {
|
||||
feedDao.create(thread);
|
||||
feedDao.create(thread, securityContext);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -225,12 +227,11 @@ public class ChangeEventHandler implements EventHandler {
|
||||
switch (changeType) {
|
||||
case ADD:
|
||||
message =
|
||||
String.format("Added %s: `*%s*`", arrayFieldValue != null ? arrayFieldValue : fieldName, newFieldValue);
|
||||
String.format("Added %s: `%s`", arrayFieldValue != null ? arrayFieldValue : fieldName, newFieldValue);
|
||||
break;
|
||||
case UPDATE:
|
||||
message =
|
||||
String.format(
|
||||
"Updated %s to `*%s*`", arrayFieldValue != null ? arrayFieldValue : fieldName, newFieldValue);
|
||||
String.format("Updated %s to `%s`", arrayFieldValue != null ? arrayFieldValue : fieldName, newFieldValue);
|
||||
break;
|
||||
case DELETE:
|
||||
message = String.format("Deleted %s", arrayFieldValue != null ? arrayFieldValue : fieldName);
|
||||
|
||||
@ -55,6 +55,7 @@ import org.openmetadata.catalog.entity.teams.User;
|
||||
import org.openmetadata.catalog.jdbi3.AirflowPipelineRepository.AirflowPipelineEntityInterface;
|
||||
import org.openmetadata.catalog.jdbi3.BotsRepository.BotsEntityInterface;
|
||||
import org.openmetadata.catalog.jdbi3.ChartRepository.ChartEntityInterface;
|
||||
import org.openmetadata.catalog.jdbi3.CollectionDAO.FieldRelationshipDAO.FromFieldMapper;
|
||||
import org.openmetadata.catalog.jdbi3.CollectionDAO.TagDAO.TagLabelMapper;
|
||||
import org.openmetadata.catalog.jdbi3.CollectionDAO.UsageDAO.UsageDetailsMapper;
|
||||
import org.openmetadata.catalog.jdbi3.DashboardRepository.DashboardEntityInterface;
|
||||
@ -516,6 +517,33 @@ public interface CollectionDAO {
|
||||
|
||||
@SqlUpdate("UPDATE thread_entity SET json = :json where id = :id")
|
||||
void update(@Bind("id") String id, @Bind("json") String json);
|
||||
|
||||
@SqlQuery(
|
||||
"SELECT entityLink, COUNT(*) count FROM field_relationship fr INNER JOIN thread_entity te ON fr.fromFQN=te.id "
|
||||
+ "WHERE fr.toFQN LIKE CONCAT(:fqnPrefix, '%') AND fr.toType like concat(:toType, '%') AND fr.fromType = :fromType "
|
||||
+ "AND fr.relation = :relation AND te.resolved= :isResolved "
|
||||
+ "GROUP BY entityLink")
|
||||
@RegisterRowMapper(CountFieldMapper.class)
|
||||
List<List<String>> listCountByEntityLink(
|
||||
@Bind("fqnPrefix") String fqnPrefix,
|
||||
@Bind("fromType") String fromType,
|
||||
@Bind("toType") String toType,
|
||||
@Bind("relation") int relation,
|
||||
@Bind("isResolved") boolean isResolved);
|
||||
|
||||
@SqlQuery(
|
||||
"SELECT entityLink, COUNT(*) count FROM thread_entity WHERE (id IN (<threadIds>)) "
|
||||
+ "AND resolved= :isResolved GROUP BY entityLink")
|
||||
@RegisterRowMapper(CountFieldMapper.class)
|
||||
List<List<String>> listCountByThreads(
|
||||
@BindList("threadIds") List<String> threadIds, @Bind("isResolved") boolean isResolved);
|
||||
|
||||
class CountFieldMapper implements RowMapper<List<String>> {
|
||||
@Override
|
||||
public List<String> map(ResultSet rs, StatementContext ctx) throws SQLException {
|
||||
return Arrays.asList(rs.getString("entityLink"), rs.getString("count"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
interface FieldRelationshipDAO {
|
||||
|
||||
@ -23,12 +23,18 @@ import java.util.Comparator;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import javax.ws.rs.core.SecurityContext;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.jdbi.v3.sqlobject.transaction.Transaction;
|
||||
import org.openmetadata.catalog.Entity;
|
||||
import org.openmetadata.catalog.api.feed.EntityLinkThreadCount;
|
||||
import org.openmetadata.catalog.api.feed.ThreadCount;
|
||||
import org.openmetadata.catalog.entity.feed.Thread;
|
||||
import org.openmetadata.catalog.resources.feeds.FeedUtil;
|
||||
import org.openmetadata.catalog.resources.feeds.MessageParser;
|
||||
import org.openmetadata.catalog.resources.feeds.MessageParser.EntityLink;
|
||||
import org.openmetadata.catalog.security.SecurityUtil;
|
||||
import org.openmetadata.catalog.type.EntityReference;
|
||||
import org.openmetadata.catalog.type.Include;
|
||||
import org.openmetadata.catalog.type.Post;
|
||||
@ -44,10 +50,13 @@ public class FeedRepository {
|
||||
}
|
||||
|
||||
@Transaction
|
||||
public Thread create(Thread thread) throws IOException, ParseException {
|
||||
// Validate user creating thread
|
||||
public Thread create(Thread thread, SecurityContext securityContext) throws IOException, ParseException {
|
||||
String fromUser = thread.getPosts().get(0).getFrom();
|
||||
dao.userDAO().findEntityByName(fromUser);
|
||||
|
||||
if (SecurityUtil.isSecurityEnabled(securityContext)) {
|
||||
// Validate user creating thread if security is enabled
|
||||
dao.userDAO().findEntityByName(fromUser);
|
||||
}
|
||||
|
||||
// Validate about data entity is valid
|
||||
EntityLink about = EntityLink.parse(thread.getAbout());
|
||||
@ -136,6 +145,46 @@ public class FeedRepository {
|
||||
return thread;
|
||||
}
|
||||
|
||||
@Transaction
|
||||
public ThreadCount getThreadsCount(String link, boolean isResolved) throws IOException {
|
||||
ThreadCount threadCount = new ThreadCount();
|
||||
List<List<String>> result;
|
||||
List<EntityLinkThreadCount> entityLinkThreadCounts = new ArrayList<>();
|
||||
AtomicInteger totalCount = new AtomicInteger(0);
|
||||
if (link == null) {
|
||||
// Get thread count of all entities
|
||||
result =
|
||||
dao.feedDAO()
|
||||
.listCountByEntityLink(
|
||||
StringUtils.EMPTY, "thread", StringUtils.EMPTY, Relationship.IS_ABOUT.ordinal(), isResolved);
|
||||
} else {
|
||||
EntityLink entityLink = EntityLink.parse(link);
|
||||
EntityReference reference = EntityUtil.validateEntityLink(entityLink);
|
||||
if (reference.getType().equals(Entity.USER)) {
|
||||
List<String> threadIds = new ArrayList<>(getUserThreadIds(reference));
|
||||
result = dao.feedDAO().listCountByThreads(threadIds, isResolved);
|
||||
} else {
|
||||
result =
|
||||
dao.feedDAO()
|
||||
.listCountByEntityLink(
|
||||
entityLink.getFullyQualifiedFieldValue(),
|
||||
"thread",
|
||||
entityLink.getFullyQualifiedFieldType(),
|
||||
Relationship.IS_ABOUT.ordinal(),
|
||||
isResolved);
|
||||
}
|
||||
}
|
||||
result.forEach(
|
||||
l -> {
|
||||
int count = Integer.parseInt(l.get(1));
|
||||
entityLinkThreadCounts.add(new EntityLinkThreadCount().withEntityLink(l.get(0)).withCount(count));
|
||||
totalCount.addAndGet(count);
|
||||
});
|
||||
threadCount.withTotalCount(totalCount.get());
|
||||
threadCount.withCounts(entityLinkThreadCounts);
|
||||
return threadCount;
|
||||
}
|
||||
|
||||
@Transaction
|
||||
public List<Thread> listThreads(String link) throws IOException {
|
||||
if (link == null) {
|
||||
@ -157,23 +206,7 @@ public class FeedRepository {
|
||||
// TODO remove hardcoding of thread
|
||||
// For a user entitylink get created or replied relationships to the thread
|
||||
if (reference.getType().equals(Entity.USER)) {
|
||||
// TODO: Add user mentioned threads as well
|
||||
threadIds.addAll(
|
||||
dao.relationshipDAO()
|
||||
.findTo(
|
||||
reference.getName(),
|
||||
reference.getType(),
|
||||
Relationship.CREATED.ordinal(),
|
||||
"thread",
|
||||
toBoolean(Include.NON_DELETED)));
|
||||
threadIds.addAll(
|
||||
dao.relationshipDAO()
|
||||
.findTo(
|
||||
reference.getName(),
|
||||
reference.getType(),
|
||||
Relationship.REPLIED_TO.ordinal(),
|
||||
"thread",
|
||||
toBoolean(Include.NON_DELETED)));
|
||||
threadIds.addAll(getUserThreadIds(reference));
|
||||
} else {
|
||||
// Only data assets are added as about
|
||||
result =
|
||||
@ -196,7 +229,29 @@ public class FeedRepository {
|
||||
}
|
||||
}
|
||||
// sort the list by thread updated timestamp before returning
|
||||
threads.sort(Comparator.comparing(Thread::getUpdatedAt));
|
||||
threads.sort(Comparator.comparing(Thread::getUpdatedAt, Comparator.reverseOrder()));
|
||||
return threads;
|
||||
}
|
||||
|
||||
private List<String> getUserThreadIds(EntityReference user) {
|
||||
List<String> threadIds = new ArrayList<>();
|
||||
// TODO: Add user mentioned threads as well
|
||||
threadIds.addAll(
|
||||
dao.relationshipDAO()
|
||||
.findTo(
|
||||
user.getName(),
|
||||
user.getType(),
|
||||
Relationship.CREATED.ordinal(),
|
||||
"thread",
|
||||
toBoolean(Include.NON_DELETED)));
|
||||
threadIds.addAll(
|
||||
dao.relationshipDAO()
|
||||
.findTo(
|
||||
user.getName(),
|
||||
user.getType(),
|
||||
Relationship.REPLIED_TO.ordinal(),
|
||||
"thread",
|
||||
toBoolean(Include.NON_DELETED)));
|
||||
return threadIds;
|
||||
}
|
||||
}
|
||||
|
||||
@ -26,6 +26,7 @@ import java.util.Objects;
|
||||
import java.util.UUID;
|
||||
import javax.validation.Valid;
|
||||
import javax.ws.rs.Consumes;
|
||||
import javax.ws.rs.DefaultValue;
|
||||
import javax.ws.rs.GET;
|
||||
import javax.ws.rs.POST;
|
||||
import javax.ws.rs.Path;
|
||||
@ -38,6 +39,7 @@ import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.SecurityContext;
|
||||
import javax.ws.rs.core.UriInfo;
|
||||
import org.openmetadata.catalog.api.feed.CreateThread;
|
||||
import org.openmetadata.catalog.api.feed.ThreadCount;
|
||||
import org.openmetadata.catalog.entity.feed.Thread;
|
||||
import org.openmetadata.catalog.jdbi3.CollectionDAO;
|
||||
import org.openmetadata.catalog.jdbi3.FeedRepository;
|
||||
@ -96,7 +98,7 @@ public class FeedResource {
|
||||
@Context UriInfo uriInfo,
|
||||
@Parameter(
|
||||
description = "Filter threads by entity link",
|
||||
schema = @Schema(type = "string", example = "<E#/{entityType}/{entityFQN}>"))
|
||||
schema = @Schema(type = "string", example = "<E#/{entityType}/{entityFQN}/{fieldName}>"))
|
||||
@QueryParam("entityLink")
|
||||
String entityLink)
|
||||
throws IOException {
|
||||
@ -120,6 +122,33 @@ public class FeedResource {
|
||||
return addHref(uriInfo, dao.get(id));
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("/count")
|
||||
@Operation(
|
||||
summary = "count of threads",
|
||||
tags = "feeds",
|
||||
description = "Get a count of threads, optionally filtered by `entityLink` for each of the entities.",
|
||||
responses = {
|
||||
@ApiResponse(
|
||||
responseCode = "200",
|
||||
description = "Count of threads",
|
||||
content = @Content(mediaType = "application/json", schema = @Schema(implementation = ThreadCount.class)))
|
||||
})
|
||||
public ThreadCount getThreadCount(
|
||||
@Context UriInfo uriInfo,
|
||||
@Parameter(
|
||||
description = "Filter threads by entity link",
|
||||
schema = @Schema(type = "string", example = "<E#/{entityType}/{entityFQN}/{fieldName}>"))
|
||||
@QueryParam("entityLink")
|
||||
String entityLink,
|
||||
@Parameter(description = "Filter threads by whether it is active or resolved", schema = @Schema(type = "boolean"))
|
||||
@DefaultValue("false")
|
||||
@QueryParam("isResolved")
|
||||
Boolean isResolved)
|
||||
throws IOException {
|
||||
return dao.getThreadsCount(entityLink, isResolved);
|
||||
}
|
||||
|
||||
@POST
|
||||
@Operation(
|
||||
summary = "Create a thread",
|
||||
@ -136,7 +165,7 @@ public class FeedResource {
|
||||
throws IOException, ParseException {
|
||||
Thread thread = getThread(securityContext, create);
|
||||
FeedUtil.addPost(thread, new Post().withMessage(create.getMessage()).withFrom(create.getFrom()));
|
||||
addHref(uriInfo, dao.create(thread));
|
||||
addHref(uriInfo, dao.create(thread, securityContext));
|
||||
return Response.created(thread.getHref()).entity(thread).build();
|
||||
}
|
||||
|
||||
|
||||
@ -139,4 +139,14 @@ public final class SecurityUtil {
|
||||
}
|
||||
return target.request();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if authentication is enabled.
|
||||
*
|
||||
* @param securityContext security context
|
||||
* @return true if jwt filter based authentication is enabled, false otherwise
|
||||
*/
|
||||
public static boolean isSecurityEnabled(SecurityContext securityContext) {
|
||||
return !securityContext.getAuthenticationScheme().equals(SecurityContext.BASIC_AUTH);
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,43 @@
|
||||
{
|
||||
"$id": "https://open-metadata.org/schema/api/feed/threadCount.json",
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"title": "Count of threads related to an entity",
|
||||
"description": "This schema defines the type for reporting the count of threads related to an entity.",
|
||||
"type": "object",
|
||||
"javaType": "org.openmetadata.catalog.api.feed.ThreadCount",
|
||||
"definitions": {
|
||||
"entityLinkThreadCount": {
|
||||
"description": "Type used to return thread count per entity link.",
|
||||
"type": "object",
|
||||
"javaType": "org.openmetadata.catalog.api.feed.EntityLinkThreadCount",
|
||||
"properties": {
|
||||
"count": {
|
||||
"description": "Count of threads for the given entity link.",
|
||||
"type": "integer",
|
||||
"minimum": 0
|
||||
},
|
||||
"entityLink": {
|
||||
"$ref": "../../type/basic.json#/definitions/entityLink"
|
||||
}
|
||||
},
|
||||
"required": ["count", "entityLink"],
|
||||
"additionalProperties": false
|
||||
}
|
||||
},
|
||||
"properties": {
|
||||
"totalCount": {
|
||||
"description": "Total count of all the threads.",
|
||||
"type": "integer",
|
||||
"minimum": 0
|
||||
},
|
||||
"counts": {
|
||||
"description": "",
|
||||
"type": "array",
|
||||
"items": {
|
||||
"$ref": "#/definitions/entityLinkThreadCount"
|
||||
}
|
||||
}
|
||||
},
|
||||
"required": ["counts", "totalCount"],
|
||||
"additionalProperties": false
|
||||
}
|
||||
@ -58,9 +58,9 @@
|
||||
"format": "date-time"
|
||||
},
|
||||
"entityLink": {
|
||||
"description": "Link to an entity or field within an entity using this format `<#E/{entities}/{entityType}/{field}/{fieldValue}`.",
|
||||
"description": "Link to an entity or field within an entity using this format `<#E/{entities}/{entityType}/{field}/{arrayFieldName}/{arrayFieldValue}`.",
|
||||
"type": "string",
|
||||
"pattern": "^<#E/\\S+/\\S+>$"
|
||||
"pattern": "^<#E\/\\S+\/\\S+>$"
|
||||
},
|
||||
"sqlQuery": {
|
||||
"description": "SQL query statement. Example - 'select * from orders'.",
|
||||
|
||||
@ -43,6 +43,8 @@ import org.openmetadata.catalog.CatalogApplicationTest;
|
||||
import org.openmetadata.catalog.Entity;
|
||||
import org.openmetadata.catalog.api.data.CreateTable;
|
||||
import org.openmetadata.catalog.api.feed.CreateThread;
|
||||
import org.openmetadata.catalog.api.feed.EntityLinkThreadCount;
|
||||
import org.openmetadata.catalog.api.feed.ThreadCount;
|
||||
import org.openmetadata.catalog.entity.data.Table;
|
||||
import org.openmetadata.catalog.entity.feed.Thread;
|
||||
import org.openmetadata.catalog.entity.teams.Team;
|
||||
@ -196,6 +198,10 @@ public class FeedResourceTest extends CatalogApplicationTest {
|
||||
listThreads(TABLE_COLUMN_LINK, userAuthHeaders).getData().size()); // About TABLE Column Description
|
||||
assertEquals(++totalThreadCount, listThreads(null, userAuthHeaders).getData().size()); // Overall threads
|
||||
}
|
||||
|
||||
// Test the /api/v1/feed/count API
|
||||
assertEquals(userThreadCount, listThreadsCount(USER_LINK, userAuthHeaders).getTotalCount());
|
||||
assertEquals(tableThreadCount, getThreadCount(TABLE_LINK, userAuthHeaders));
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -304,4 +310,18 @@ public class FeedResourceTest extends CatalogApplicationTest {
|
||||
target = entityLink != null ? target.queryParam("entityLink", entityLink) : target;
|
||||
return TestUtils.get(target, ThreadList.class, authHeaders);
|
||||
}
|
||||
|
||||
public static ThreadCount listThreadsCount(String entityLink, Map<String, String> authHeaders)
|
||||
throws HttpResponseException {
|
||||
WebTarget target = getResource("feed/count");
|
||||
target = entityLink != null ? target.queryParam("entityLink", entityLink) : target;
|
||||
return TestUtils.get(target, ThreadCount.class, authHeaders);
|
||||
}
|
||||
|
||||
private int getThreadCount(String entityLink, Map<String, String> authHeaders) throws HttpResponseException {
|
||||
List<EntityLinkThreadCount> linkThreadCount = listThreadsCount(entityLink, authHeaders).getCounts();
|
||||
EntityLinkThreadCount threadCount =
|
||||
linkThreadCount.stream().filter(l -> l.getEntityLink().equals(entityLink)).findFirst().orElseThrow();
|
||||
return (int) threadCount.getCount();
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user