mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2026-01-06 12:36:56 +00:00
Cleanup FeedRepository and ChangeEventHandler (#11372)
This commit is contained in:
parent
a972fa8670
commit
82dbf49ad5
@ -38,6 +38,7 @@ import org.openmetadata.service.exception.CatalogExceptionMessage;
|
||||
import org.openmetadata.service.exception.EntityNotFoundException;
|
||||
import org.openmetadata.service.jdbi3.EntityDAO;
|
||||
import org.openmetadata.service.jdbi3.EntityRepository;
|
||||
import org.openmetadata.service.resources.feeds.MessageParser.EntityLink;
|
||||
import org.openmetadata.service.util.EntityUtil.Fields;
|
||||
|
||||
@Slf4j
|
||||
@ -263,6 +264,10 @@ public final class Entity {
|
||||
: getEntityByName(ref.getType(), ref.getFullyQualifiedName(), fields, include);
|
||||
}
|
||||
|
||||
public static <T> T getEntity(EntityLink link, String fields, Include include) throws IOException {
|
||||
return getEntityByName(link.getEntityType(), link.getEntityFQN(), fields, include);
|
||||
}
|
||||
|
||||
/** Retrieve the entity using id from given entity reference and fields */
|
||||
public static <T> T getEntity(String entityType, UUID id, String fields, Include include) throws IOException {
|
||||
EntityRepository<?> entityRepository = Entity.getEntityRepository(entityType);
|
||||
|
||||
@ -99,23 +99,7 @@ public class ChangeEventHandler implements EventHandler {
|
||||
for (Thread thread : listOrEmpty(getThreads(responseContext, loggedInUserName))) {
|
||||
// Don't create a thread if there is no message
|
||||
if (thread.getMessage() != null && !thread.getMessage().isEmpty()) {
|
||||
EntityInterface entity;
|
||||
// In case of ENTITY_FIELDS_CHANGED entity from responseContext will be a ChangeEvent
|
||||
if (responseContext.getEntity() instanceof ChangeEvent) {
|
||||
ChangeEvent change = (ChangeEvent) responseContext.getEntity();
|
||||
entity = (EntityInterface) change.getEntity();
|
||||
} else {
|
||||
entity = (EntityInterface) responseContext.getEntity();
|
||||
}
|
||||
EntityReference entityReference = entity.getEntityReference();
|
||||
EntityReference owner;
|
||||
try {
|
||||
owner = Entity.getOwner(entityReference);
|
||||
} catch (Exception exception) {
|
||||
owner = null;
|
||||
}
|
||||
EntityLink about = EntityLink.parse(thread.getAbout());
|
||||
feedDao.create(thread, entity.getId(), owner, about);
|
||||
feedDao.create(thread);
|
||||
String jsonThread = mapper.writeValueAsString(thread);
|
||||
WebSocketManager.getInstance().broadCastMessageToAll(WebSocketManager.FEED_BROADCAST_CHANNEL, jsonThread);
|
||||
}
|
||||
@ -223,8 +207,6 @@ public class ChangeEventHandler implements EventHandler {
|
||||
|
||||
private List<Thread> getThreads(ContainerResponseContext responseContext, String loggedInUserName) {
|
||||
Object entity = responseContext.getEntity();
|
||||
String changeType = responseContext.getHeaderString(RestUtil.CHANGE_CUSTOM_HEADER);
|
||||
|
||||
if (entity == null) {
|
||||
return Collections.emptyList(); // Response has no entity to produce change event from
|
||||
}
|
||||
@ -241,6 +223,7 @@ public class ChangeEventHandler implements EventHandler {
|
||||
}
|
||||
|
||||
EntityInterface entityInterface = (EntityInterface) entity;
|
||||
String changeType = responseContext.getHeaderString(RestUtil.CHANGE_CUSTOM_HEADER);
|
||||
if (RestUtil.ENTITY_SOFT_DELETED.equals(changeType)) {
|
||||
String entityType = Entity.getEntityTypeFromClass(entity.getClass());
|
||||
String message = String.format("Soft deleted **%s**: `%s`", entityType, entityInterface.getFullyQualifiedName());
|
||||
|
||||
@ -60,6 +60,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.jdbi.v3.sqlobject.transaction.Transaction;
|
||||
import org.json.JSONObject;
|
||||
import org.openmetadata.schema.EntityInterface;
|
||||
import org.openmetadata.schema.api.feed.CloseTask;
|
||||
import org.openmetadata.schema.api.feed.EntityLinkThreadCount;
|
||||
import org.openmetadata.schema.api.feed.ResolveTask;
|
||||
@ -127,24 +128,27 @@ public class FeedRepository {
|
||||
}
|
||||
|
||||
@Transaction
|
||||
public Thread create(Thread thread, UUID entityId, EntityReference entityOwner, EntityLink about) throws IOException {
|
||||
String createdBy = thread.getCreatedBy();
|
||||
User createdByUser = dao.userDAO().findEntityByName(createdBy); // Validate user creating thread
|
||||
thread.withEntityId(entityId); // Add entity id to thread
|
||||
public Thread create(Thread thread) throws IOException {
|
||||
// Validate about data entity is valid and get the owner for that entity
|
||||
EntityLink about = EntityLink.parse(thread.getAbout());
|
||||
EntityInterface aboutEntity = Entity.getEntity(about, "owner", ALL);
|
||||
thread.withEntityId(aboutEntity.getId()); // Add entity id to thread
|
||||
EntityReference entityOwner = aboutEntity.getOwner();
|
||||
|
||||
// Validate user creating the thread
|
||||
User createdByUser = SubjectCache.getInstance().getUser(thread.getCreatedBy());
|
||||
|
||||
// if thread is of type "task", assign a taskId
|
||||
if (thread.getType().equals(ThreadType.Task)) {
|
||||
thread.withTask(thread.getTask().withId(getNextTaskId()));
|
||||
}
|
||||
|
||||
// if thread is of type "announcement", validate start and end time
|
||||
if (thread.getType().equals(ThreadType.Announcement)) {
|
||||
thread.withTask(thread.getTask().withId(getNextTaskId())); // Assign taskId for a task
|
||||
} else if (thread.getType().equals(ThreadType.Announcement)) {
|
||||
// Validate start and end time for announcement
|
||||
validateAnnouncement(thread.getAnnouncement());
|
||||
long startTime = thread.getAnnouncement().getStartTime();
|
||||
long endTime = thread.getAnnouncement().getEndTime();
|
||||
// TODO fix this - overlapping announcements should be allowed
|
||||
List<String> announcements =
|
||||
dao.feedDAO().listAnnouncementBetween(thread.getId().toString(), entityId.toString(), startTime, endTime);
|
||||
dao.feedDAO()
|
||||
.listAnnouncementBetween(thread.getId().toString(), thread.getEntityId().toString(), startTime, endTime);
|
||||
if (announcements.size() > 0) {
|
||||
// There is already an announcement that overlaps the new one
|
||||
throw new IllegalArgumentException(ANNOUNCEMENT_OVERLAP);
|
||||
@ -157,8 +161,7 @@ public class FeedRepository {
|
||||
// Add relationship User -- created --> Thread relationship
|
||||
dao.relationshipDAO().insert(createdByUser.getId(), thread.getId(), Entity.USER, Entity.THREAD, CREATED.ordinal());
|
||||
|
||||
// Add field relationship data asset Thread -- isAbout ---> entity/entityField
|
||||
// relationship
|
||||
// Add field relationship for data asset - Thread -- isAbout ---> entity/entityField
|
||||
dao.fieldRelationshipDAO()
|
||||
.insert(
|
||||
thread.getId().toString(), // from FQN
|
||||
@ -176,24 +179,9 @@ public class FeedRepository {
|
||||
|
||||
// Add mentions to field relationship table
|
||||
storeMentions(thread, thread.getMessage());
|
||||
|
||||
return thread;
|
||||
}
|
||||
|
||||
@Transaction
|
||||
public Thread create(Thread thread) throws IOException {
|
||||
// Validate about data entity is valid
|
||||
EntityLink about = EntityLink.parse(thread.getAbout());
|
||||
EntityReference aboutRef = EntityUtil.validateEntityLink(about);
|
||||
|
||||
// Get owner for the addressed to Entity
|
||||
EntityReference owner = Entity.getOwner(aboutRef);
|
||||
|
||||
UUID entityId = aboutRef.getId();
|
||||
|
||||
return create(thread, entityId, owner, about);
|
||||
}
|
||||
|
||||
public Thread get(String id) throws IOException {
|
||||
Thread thread = EntityUtil.validate(id, dao.feedDAO().findById(id), Thread.class);
|
||||
sortPosts(thread);
|
||||
@ -427,9 +415,9 @@ public class FeedRepository {
|
||||
throws IOException {
|
||||
// perform the task
|
||||
TaskDetails task = thread.getTask();
|
||||
EntityLink entityLink = EntityLink.parse(thread.getAbout());
|
||||
EntityReference reference = EntityUtil.validateEntityLink(entityLink);
|
||||
performTask(task, entityLink, reference, uriInfo, resolveTask.getNewValue(), user);
|
||||
EntityLink about = EntityLink.parse(thread.getAbout());
|
||||
EntityReference aboutRef = EntityUtil.validateEntityLink(about);
|
||||
performTask(task, about, aboutRef, uriInfo, resolveTask.getNewValue(), user);
|
||||
|
||||
// Update the attributes
|
||||
task.withNewValue(resolveTask.getNewValue());
|
||||
@ -520,36 +508,25 @@ public class FeedRepository {
|
||||
|
||||
@Transaction
|
||||
public Thread addPostToThread(String id, Post post, String userName) throws IOException {
|
||||
// Query 1 - validate the user posting the message
|
||||
User fromUser = dao.userDAO().findEntityByName(post.getFrom());
|
||||
// Validate the user posting the message
|
||||
User fromUser = SubjectCache.getInstance().getUser(post.getFrom());
|
||||
|
||||
// Query 2 - Find the thread
|
||||
// Update the thread with the new post
|
||||
Thread thread = EntityUtil.validate(id, dao.feedDAO().findById(id), Thread.class);
|
||||
thread.withUpdatedBy(userName).withUpdatedAt(System.currentTimeMillis());
|
||||
FeedUtil.addPost(thread, post);
|
||||
|
||||
// TODO is rewriting entire json okay?
|
||||
// Query 3 - update the JSON document for the feed
|
||||
dao.feedDAO().update(id, JsonUtils.pojoToJson(thread));
|
||||
|
||||
// Query 4 - Add relation User -- repliedTo --> Thread
|
||||
// Add relation User -- repliedTo --> Thread
|
||||
// Add relationship from thread to the user entity that is posting a reply
|
||||
boolean relationAlreadyExists = false;
|
||||
for (Post p : thread.getPosts()) {
|
||||
if (p.getFrom().equals(post.getFrom())) {
|
||||
relationAlreadyExists = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
boolean relationAlreadyExists = thread.getPosts().stream().anyMatch(p -> p.getFrom().equals(post.getFrom()));
|
||||
if (!relationAlreadyExists) {
|
||||
dao.relationshipDAO().insert(fromUser.getId(), thread.getId(), Entity.USER, Entity.THREAD, REPLIED_TO.ordinal());
|
||||
}
|
||||
|
||||
// Add mentions into field relationship table
|
||||
storeMentions(thread, post.getMessage());
|
||||
|
||||
sortPostsInThreads(List.of(thread));
|
||||
|
||||
return thread;
|
||||
}
|
||||
|
||||
@ -573,7 +550,6 @@ public class FeedRepository {
|
||||
.withPostsCount(posts.size());
|
||||
// update the json document
|
||||
dao.feedDAO().update(thread.getId().toString(), JsonUtils.pojoToJson(thread));
|
||||
|
||||
return new DeleteResponse<>(post, RestUtil.ENTITY_DELETED);
|
||||
}
|
||||
|
||||
@ -594,16 +570,9 @@ public class FeedRepository {
|
||||
return new DeleteResponse<>(thread, RestUtil.ENTITY_DELETED);
|
||||
}
|
||||
|
||||
public EntityReference getOwnerReference(String username) {
|
||||
return dao.userDAO().findEntityByName(username).getEntityReference();
|
||||
}
|
||||
|
||||
@Transaction
|
||||
public ThreadCount getThreadsCount(FeedFilter filter, String link) {
|
||||
ThreadCount threadCount = new ThreadCount();
|
||||
List<List<String>> result;
|
||||
List<EntityLinkThreadCount> entityLinkThreadCounts = new ArrayList<>();
|
||||
AtomicInteger totalCount = new AtomicInteger(0);
|
||||
if (link == null) {
|
||||
// Get thread count of all entities
|
||||
result =
|
||||
@ -642,20 +611,20 @@ public class FeedRepository {
|
||||
filter.getResolved());
|
||||
}
|
||||
}
|
||||
|
||||
AtomicInteger totalCount = new AtomicInteger(0);
|
||||
List<EntityLinkThreadCount> entityLinkThreadCounts = new ArrayList<>();
|
||||
result.forEach(
|
||||
l -> {
|
||||
int count = Integer.parseInt(l.get(1));
|
||||
entityLinkThreadCounts.add(new EntityLinkThreadCount().withEntityLink(l.get(0)).withCount(count));
|
||||
totalCount.addAndGet(count);
|
||||
});
|
||||
threadCount.withTotalCount(totalCount.get());
|
||||
threadCount.withCounts(entityLinkThreadCounts);
|
||||
return threadCount;
|
||||
return new ThreadCount().withTotalCount(totalCount.get()).withCounts(entityLinkThreadCounts);
|
||||
}
|
||||
|
||||
public List<Post> listPosts(String threadId) throws IOException {
|
||||
Thread thread = get(threadId);
|
||||
return thread.getPosts();
|
||||
return get(threadId).getPosts();
|
||||
}
|
||||
|
||||
/** List threads based on the filters and limits in the order of the updated timestamp. */
|
||||
@ -830,38 +799,36 @@ public class FeedRepository {
|
||||
|
||||
public void checkPermissionsForResolveTask(Thread thread, SecurityContext securityContext, Authorizer authorizer)
|
||||
throws IOException {
|
||||
if (thread.getType().equals(ThreadType.Task)) {
|
||||
TaskDetails taskDetails = thread.getTask();
|
||||
List<EntityReference> assignees = taskDetails.getAssignees();
|
||||
String createdBy = thread.getCreatedBy();
|
||||
// Validate about data entity is valid
|
||||
EntityLink about = EntityLink.parse(thread.getAbout());
|
||||
EntityReference aboutRef = EntityUtil.validateEntityLink(about);
|
||||
if (!thread.getType().equals(ThreadType.Task)) {
|
||||
return; // Nothing to resolve
|
||||
}
|
||||
List<EntityReference> assignees = thread.getTask().getAssignees();
|
||||
EntityLink about = EntityLink.parse(thread.getAbout());
|
||||
EntityReference aboutRef = EntityUtil.validateEntityLink(about);
|
||||
|
||||
// Get owner for the addressed to Entity
|
||||
EntityReference owner = Entity.getOwner(aboutRef);
|
||||
// Get owner for the addressed to Entity
|
||||
EntityReference owner = Entity.getOwner(aboutRef);
|
||||
|
||||
String userName = securityContext.getUserPrincipal().getName();
|
||||
User loggedInUser = findUserByName(userName);
|
||||
List<EntityReference> teams =
|
||||
populateEntityReferences(
|
||||
dao.relationshipDAO()
|
||||
.findFrom(loggedInUser.getId().toString(), Entity.USER, Relationship.HAS.ordinal(), Entity.TEAM),
|
||||
Entity.TEAM);
|
||||
List<String> teamNames = teams.stream().map(EntityReference::getName).collect(Collectors.toList());
|
||||
String userName = securityContext.getUserPrincipal().getName();
|
||||
User loggedInUser = SubjectCache.getInstance().getUser(userName);
|
||||
List<EntityReference> teams =
|
||||
populateEntityReferences(
|
||||
dao.relationshipDAO()
|
||||
.findFrom(loggedInUser.getId().toString(), Entity.USER, Relationship.HAS.ordinal(), Entity.TEAM),
|
||||
Entity.TEAM);
|
||||
List<String> teamNames = teams.stream().map(EntityReference::getName).collect(Collectors.toList());
|
||||
|
||||
// check if logged-in user satisfies any of the following
|
||||
// - Creator of the task
|
||||
// - logged-in user or the teams they belong to were assigned the task
|
||||
// - logged-in user or the teams they belong to, owns the entity that the task is about
|
||||
if (!createdBy.equals(userName)
|
||||
&& assignees.stream().noneMatch(assignee -> assignee.getName().equals(userName))
|
||||
&& assignees.stream().noneMatch(assignee -> teamNames.contains(assignee.getName()))
|
||||
&& !owner.getName().equals(userName)
|
||||
&& !teamNames.contains(owner.getName())) {
|
||||
// Only admins or bots can close or resolve task other than the above-mentioned users
|
||||
authorizer.authorizeAdmin(securityContext);
|
||||
}
|
||||
// check if logged-in user satisfies any of the following
|
||||
// - Creator of the task
|
||||
// - logged-in user or the teams they belong to were assigned the task
|
||||
// - logged-in user or the teams they belong to, owns the entity that the task is about
|
||||
if (!thread.getCreatedBy().equals(userName)
|
||||
&& assignees.stream().noneMatch(assignee -> assignee.getName().equals(userName))
|
||||
&& assignees.stream().noneMatch(assignee -> teamNames.contains(assignee.getName()))
|
||||
&& !owner.getName().equals(userName)
|
||||
&& !teamNames.contains(owner.getName())) {
|
||||
// Only admins or bots can close or resolve task other than the above-mentioned users
|
||||
authorizer.authorizeAdmin(securityContext);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1135,8 +1102,4 @@ public class FeedRepository {
|
||||
this.totalCount = totalCount;
|
||||
}
|
||||
}
|
||||
|
||||
public User findUserByName(String userName) {
|
||||
return dao.userDAO().findEntityByName(userName);
|
||||
}
|
||||
}
|
||||
|
||||
@ -466,7 +466,7 @@ public class FeedResource {
|
||||
Thread thread = dao.get(threadId);
|
||||
// delete thread only if the admin/bot/author tries to delete it
|
||||
OperationContext operationContext = new OperationContext(Entity.THREAD, MetadataOperation.DELETE);
|
||||
ResourceContextInterface resourceContext = new ThreadResourceContext(dao.getOwnerReference(thread.getCreatedBy()));
|
||||
ResourceContextInterface resourceContext = new ThreadResourceContext(thread.getCreatedBy());
|
||||
authorizer.authorize(securityContext, operationContext, resourceContext);
|
||||
return dao.deleteThread(thread, securityContext.getUserPrincipal().getName()).toResponse();
|
||||
}
|
||||
@ -497,7 +497,7 @@ public class FeedResource {
|
||||
// delete post only if the admin/bot/author tries to delete it
|
||||
// TODO fix this
|
||||
OperationContext operationContext = new OperationContext(Entity.THREAD, MetadataOperation.DELETE);
|
||||
ResourceContextInterface resourceContext = new PostResourceContext(dao.getOwnerReference(post.getFrom()));
|
||||
ResourceContextInterface resourceContext = new PostResourceContext(post.getFrom());
|
||||
authorizer.authorize(securityContext, operationContext, resourceContext);
|
||||
return dao.deletePost(thread, post, securityContext.getUserPrincipal().getName()).toResponse();
|
||||
}
|
||||
|
||||
@ -8,10 +8,11 @@ import org.openmetadata.service.Entity;
|
||||
|
||||
/** Posts that are part of conversation threads require special handling */
|
||||
public class PostResourceContext implements ResourceContextInterface {
|
||||
private final EntityReference owner;
|
||||
// The user who posted to thread is the owner of that post
|
||||
private final String postedBy;
|
||||
|
||||
public PostResourceContext(EntityReference owner) {
|
||||
this.owner = owner;
|
||||
public PostResourceContext(String postedBy) {
|
||||
this.postedBy = postedBy;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -21,7 +22,7 @@ public class PostResourceContext implements ResourceContextInterface {
|
||||
|
||||
@Override
|
||||
public EntityReference getOwner() {
|
||||
return owner;
|
||||
return SubjectCache.getInstance().getUser(postedBy).getEntityReference();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -34,6 +34,7 @@ import org.openmetadata.schema.entity.teams.Team;
|
||||
import org.openmetadata.schema.entity.teams.User;
|
||||
import org.openmetadata.schema.type.EntityReference;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.exception.CatalogExceptionMessage;
|
||||
import org.openmetadata.service.exception.EntityNotFoundException;
|
||||
import org.openmetadata.service.jdbi3.TeamRepository;
|
||||
import org.openmetadata.service.jdbi3.UserRepository;
|
||||
@ -84,7 +85,7 @@ public class SubjectCache {
|
||||
try {
|
||||
return USER_CACHE.get(userName);
|
||||
} catch (ExecutionException | UncheckedExecutionException ex) {
|
||||
throw new EntityNotFoundException(ex.getMessage());
|
||||
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(Entity.USER, userName));
|
||||
}
|
||||
}
|
||||
|
||||
@ -92,7 +93,7 @@ public class SubjectCache {
|
||||
try {
|
||||
return USER_CACHE_WIH_ID.get(userId);
|
||||
} catch (ExecutionException | UncheckedExecutionException ex) {
|
||||
throw new EntityNotFoundException(ex.getMessage());
|
||||
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(Entity.USER, userId));
|
||||
}
|
||||
}
|
||||
|
||||
@ -100,7 +101,7 @@ public class SubjectCache {
|
||||
try {
|
||||
return USER_CACHE.get(userName).getUser();
|
||||
} catch (ExecutionException | UncheckedExecutionException ex) {
|
||||
throw new EntityNotFoundException(ex.getMessage());
|
||||
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(Entity.USER, userName));
|
||||
}
|
||||
}
|
||||
|
||||
@ -112,7 +113,7 @@ public class SubjectCache {
|
||||
try {
|
||||
return USER_CACHE_WIH_ID.get(userId).getUser();
|
||||
} catch (ExecutionException | UncheckedExecutionException ex) {
|
||||
throw new EntityNotFoundException(ex.getMessage());
|
||||
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(Entity.USER, userId));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -8,10 +8,11 @@ import org.openmetadata.service.Entity;
|
||||
|
||||
/** Conversation threads require special handling */
|
||||
public class ThreadResourceContext implements ResourceContextInterface {
|
||||
private final EntityReference owner;
|
||||
// User who created the thread is the owner of thread entity
|
||||
private final String createdBy;
|
||||
|
||||
public ThreadResourceContext(EntityReference owner) {
|
||||
this.owner = owner;
|
||||
public ThreadResourceContext(String createdBy) {
|
||||
this.createdBy = createdBy;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -21,7 +22,7 @@ public class ThreadResourceContext implements ResourceContextInterface {
|
||||
|
||||
@Override
|
||||
public EntityReference getOwner() {
|
||||
return owner;
|
||||
return SubjectCache.getInstance().getUser(createdBy).getEntityReference();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -329,47 +329,38 @@ public final class ChangeEventParser {
|
||||
public static Map<EntityLink, String> getFormattedMessages(
|
||||
PublishTo publishTo, ChangeDescription changeDescription, EntityInterface entity) {
|
||||
// Store a map of entityLink -> message
|
||||
Map<EntityLink, String> messages;
|
||||
|
||||
List<FieldChange> fieldsUpdated = changeDescription.getFieldsUpdated();
|
||||
messages = getFormattedMessagesForAllFieldChange(publishTo, entity, fieldsUpdated, ChangeType.UPDATE);
|
||||
Map<EntityLink, String> messages =
|
||||
getFormattedMessagesForAllFieldChange(publishTo, entity, fieldsUpdated, ChangeType.UPDATE);
|
||||
|
||||
// fieldsAdded and fieldsDeleted need special handling since
|
||||
// there is a possibility to merge them as one update message.
|
||||
List<FieldChange> fieldsAdded = changeDescription.getFieldsAdded();
|
||||
List<FieldChange> fieldsDeleted = changeDescription.getFieldsDeleted();
|
||||
messages.putAll(mergeAdditionsDeletion(publishTo, entity, fieldsAdded, fieldsDeleted));
|
||||
|
||||
return messages;
|
||||
}
|
||||
|
||||
private static Map<EntityLink, String> getFormattedMessagesForAllFieldChange(
|
||||
PublishTo publishTo, EntityInterface entity, List<FieldChange> fields, ChangeType changeType) {
|
||||
Map<EntityLink, String> messages = new HashMap<>();
|
||||
|
||||
for (FieldChange field : fields) {
|
||||
// if field name has dots, then it is an array field
|
||||
String fieldName = field.getName();
|
||||
String newFieldValue;
|
||||
String oldFieldValue;
|
||||
EntityLink link = getEntityLink(fieldName, entity);
|
||||
if (entity.getEntityReference().getType().equals(Entity.QUERY) && fieldName.equals("queryUsedIn")) {
|
||||
String message =
|
||||
handleQueryUsage(field.getNewValue(), field.getOldValue(), entity, publishTo, changeType, link);
|
||||
String entityType = link.getEntityType();
|
||||
String newFieldValue = getFieldValue(field.getNewValue());
|
||||
String oldFieldValue = getFieldValue(field.getOldValue());
|
||||
if (entityType.equals(Entity.QUERY) && fieldName.equals("queryUsedIn")) {
|
||||
String message = handleQueryUsage(newFieldValue, oldFieldValue, entity, publishTo, changeType, link);
|
||||
messages.put(link, message);
|
||||
return messages;
|
||||
} else {
|
||||
newFieldValue = getFieldValue(field.getNewValue());
|
||||
oldFieldValue = getFieldValue(field.getOldValue());
|
||||
}
|
||||
if (link.getEntityType().equals(TEST_CASE) && link.getFieldName().equals("testCaseResult")) {
|
||||
String message = handleTestCaseResult(publishTo, entity, field.getNewValue());
|
||||
} else if (entityType.equals(TEST_CASE) && link.getFieldName().equals("testCaseResult")) {
|
||||
String message = handleTestCaseResult(publishTo, entity, newFieldValue);
|
||||
messages.put(link, message);
|
||||
} else if (link.getEntityType().equals(KPI) && link.getFieldName().equals("kpiResult")) {
|
||||
String message = handleKpiResult(publishTo, entity, field.getNewValue());
|
||||
} else if (entityType.equals(KPI) && link.getFieldName().equals("kpiResult")) {
|
||||
String message = handleKpiResult(publishTo, entity, newFieldValue);
|
||||
messages.put(link, message);
|
||||
} else if (link.getEntityType().equals(INGESTION_PIPELINE) && link.getFieldName().equals("pipelineStatus")) {
|
||||
String message = handleIngestionPipelineResult(publishTo, entity, field.getNewValue());
|
||||
} else if (entityType.equals(INGESTION_PIPELINE) && link.getFieldName().equals("pipelineStatus")) {
|
||||
String message = handleIngestionPipelineResult(publishTo, entity, newFieldValue);
|
||||
messages.put(link, message);
|
||||
} else if (!fieldName.equals("failureDetails")) {
|
||||
String message = createMessageForField(publishTo, link, changeType, fieldName, oldFieldValue, newFieldValue);
|
||||
@ -433,10 +424,9 @@ public final class ChangeEventParser {
|
||||
PublishTo publishTo,
|
||||
ChangeType changeType,
|
||||
EntityLink link) {
|
||||
String fieldName = "queryUsage";
|
||||
String newVal = getFieldValueForQuery(newValue, entity, publishTo);
|
||||
String oldVal = getFieldValueForQuery(oldValue, entity, publishTo);
|
||||
return createMessageForField(publishTo, link, changeType, fieldName, oldVal, newVal);
|
||||
return createMessageForField(publishTo, link, changeType, "queryUsage", oldVal, newVal);
|
||||
}
|
||||
|
||||
private static String getFieldValueForQuery(Object fieldValue, EntityInterface entity, PublishTo publishTo) {
|
||||
@ -683,15 +673,14 @@ public final class ChangeEventParser {
|
||||
getBold(publishTo),
|
||||
testCaseEntity.getTestSuite().getName());
|
||||
return String.format(format, testCaseName, result.getTestCaseStatus());
|
||||
} else {
|
||||
String format =
|
||||
String.format(
|
||||
"Test Case %s is updated in %s/%s",
|
||||
getBold(publishTo),
|
||||
EntityLink.parse(testCaseEntity.getEntityLink()).getEntityFQN(),
|
||||
testCaseEntity.getTestSuite().getName());
|
||||
return String.format(format, testCaseName);
|
||||
}
|
||||
String format =
|
||||
String.format(
|
||||
"Test Case %s is updated in %s/%s",
|
||||
getBold(publishTo),
|
||||
EntityLink.parse(testCaseEntity.getEntityLink()).getEntityFQN(),
|
||||
testCaseEntity.getTestSuite().getName());
|
||||
return String.format(format, testCaseName);
|
||||
}
|
||||
|
||||
public static String handleIngestionPipelineResult(PublishTo publishTo, EntityInterface entity, Object newValue) {
|
||||
@ -701,10 +690,9 @@ public final class ChangeEventParser {
|
||||
String date = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss").format(new Date(status.getEndDate()));
|
||||
String format = String.format("Ingestion Pipeline %s %s at %s", getBold(publishTo), getBold(publishTo), date);
|
||||
return String.format(format, ingestionPipelineName, status.getPipelineState());
|
||||
} else {
|
||||
String format = String.format("Ingestion Pipeline %s is updated", getBold(publishTo));
|
||||
return String.format(format, ingestionPipelineName);
|
||||
}
|
||||
String format = String.format("Ingestion Pipeline %s is updated", getBold(publishTo));
|
||||
return String.format(format, ingestionPipelineName);
|
||||
}
|
||||
|
||||
public static String handleKpiResult(PublishTo publishTo, EntityInterface entity, Object newValue) {
|
||||
@ -717,10 +705,9 @@ public final class ChangeEventParser {
|
||||
getBold(publishTo), getBold(publishTo), getBold(publishTo), getBold(publishTo));
|
||||
KpiTarget target = result.getTargetResult().get(0);
|
||||
return String.format(format, kpiName, target.getName(), target.getValue(), target.getTargetMet());
|
||||
} else {
|
||||
String format = String.format("KpiResult %s is updated.", getBold(publishTo));
|
||||
return String.format(format, kpiName);
|
||||
}
|
||||
String format = String.format("KpiResult %s is updated.", getBold(publishTo));
|
||||
return String.format(format, kpiName);
|
||||
}
|
||||
|
||||
public static String getPlaintextDiff(PublishTo publishTo, String oldValue, String newValue) {
|
||||
|
||||
@ -195,9 +195,6 @@ public final class EntityUtil {
|
||||
public static EntityReference validateEntityLink(EntityLink entityLink) {
|
||||
String entityType = entityLink.getEntityType();
|
||||
String fqn = entityLink.getEntityFQN();
|
||||
|
||||
// TODO: add more validation for field name and array fields
|
||||
|
||||
return Entity.getEntityReferenceByName(entityType, fqn, ALL);
|
||||
}
|
||||
|
||||
|
||||
@ -1081,7 +1081,7 @@ public class FeedResourceTest extends OpenMetadataApplicationTest {
|
||||
|
||||
// This should return error since the table is owned by a team
|
||||
// and for the filter we are passing team id instead of user id
|
||||
assertResponseContains(
|
||||
assertResponse(
|
||||
() -> listThreadsWithFilter(ownerId2, FilterType.OWNER.toString(), AUTH_HEADERS),
|
||||
NOT_FOUND,
|
||||
entityNotFound(Entity.USER, ownerId2));
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user