From cd2c61ad5ea2ccc4767dc3ab9576026ac5a938c1 Mon Sep 17 00:00:00 2001 From: Suresh Srinivas Date: Wed, 6 Sep 2023 07:11:16 -0700 Subject: [PATCH] Cleanup task resolution and closing with task workflows (#13097) --- .../service/events/ChangeEventHandler.java | 2 +- .../service/jdbi3/ContainerRepository.java | 82 ++++--- .../jdbi3/DashboardDataModelRepository.java | 79 ++++--- .../service/jdbi3/DashboardRepository.java | 45 ++-- .../service/jdbi3/EntityRepository.java | 75 +++++-- .../service/jdbi3/FeedRepository.java | 205 +++++++++++------- .../service/jdbi3/MlModelRepository.java | 82 ++++--- .../service/jdbi3/PipelineRepository.java | 83 +++++-- .../service/jdbi3/SearchIndexRepository.java | 119 ++++++---- .../service/jdbi3/TableRepository.java | 109 ++++++---- .../service/jdbi3/TopicRepository.java | 122 +++++++---- .../openmetadata/service/util/EntityUtil.java | 8 + .../openmetadata/service/util/FeedUtils.java | 3 +- .../resources/feeds/FeedResourceTest.java | 53 +++-- 14 files changed, 686 insertions(+), 381 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/events/ChangeEventHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/events/ChangeEventHandler.java index 87bd78d97a0..542c23e61c3 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/events/ChangeEventHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/events/ChangeEventHandler.java @@ -86,7 +86,7 @@ public class ChangeEventHandler implements EventHandler { for (Thread thread : listOrEmpty(FeedUtils.getThreads(changeEvent, loggedInUserName))) { // Don't create a thread if there is no message if (thread.getMessage() != null && !thread.getMessage().isEmpty()) { - feedDao.create(thread, responseContext); + feedDao.create(thread); String jsonThread = mapper.writeValueAsString(thread); WebSocketManager.getInstance() .broadCastMessageToAll(WebSocketManager.FEED_BROADCAST_CHANNEL, jsonThread); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ContainerRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ContainerRepository.java index b26c38e3596..d150e933a33 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ContainerRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ContainerRepository.java @@ -4,6 +4,7 @@ import static org.openmetadata.common.utils.CommonUtil.listOrEmpty; import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; import static org.openmetadata.schema.type.Include.ALL; import static org.openmetadata.service.Entity.CONTAINER; +import static org.openmetadata.service.Entity.DASHBOARD_DATA_MODEL; import static org.openmetadata.service.Entity.FIELD_PARENT; import static org.openmetadata.service.Entity.FIELD_TAGS; import static org.openmetadata.service.Entity.STORAGE_SERVICE; @@ -11,9 +12,10 @@ import static org.openmetadata.service.Entity.STORAGE_SERVICE; import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.List; -import javax.json.JsonPatch; import org.openmetadata.schema.EntityInterface; +import org.openmetadata.schema.api.feed.ResolveTask; import org.openmetadata.schema.entity.data.Container; +import org.openmetadata.schema.entity.data.DashboardDataModel; import org.openmetadata.schema.entity.services.StorageService; import org.openmetadata.schema.type.Column; import org.openmetadata.schema.type.ContainerFileFormat; @@ -21,10 +23,11 @@ import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.Include; import org.openmetadata.schema.type.Relationship; import org.openmetadata.schema.type.TagLabel; -import org.openmetadata.schema.type.TaskDetails; +import org.openmetadata.schema.type.TaskType; import org.openmetadata.service.Entity; -import org.openmetadata.service.exception.CatalogExceptionMessage; -import org.openmetadata.service.resources.feeds.MessageParser; +import org.openmetadata.service.jdbi3.FeedRepository.TaskWorkflow; +import org.openmetadata.service.jdbi3.FeedRepository.ThreadContext; +import org.openmetadata.service.resources.feeds.MessageParser.EntityLink; import org.openmetadata.service.resources.storages.ContainerResource; import org.openmetadata.service.util.EntityUtil; import org.openmetadata.service.util.FullyQualifiedName; @@ -202,32 +205,57 @@ public class ContainerRepository extends EntityRepository { } @Override - public void update(TaskDetails task, MessageParser.EntityLink entityLink, String newValue, String user) { - // TODO move this as the first check + public TaskWorkflow getTaskWorkflow(ThreadContext threadContext) { + validateTaskThread(threadContext); + EntityLink entityLink = threadContext.getAbout(); if (entityLink.getFieldName().equals("dataModel")) { - Container container = getByName(null, entityLink.getEntityFQN(), getFields("dataModel,tags"), Include.ALL, false); - Column column = - container.getDataModel().getColumns().stream() - .filter(c -> c.getName().equals(entityLink.getArrayFieldName())) - .findFirst() - .orElseThrow( - () -> - new IllegalArgumentException( - CatalogExceptionMessage.invalidFieldName("column", entityLink.getArrayFieldName()))); - - String origJson = JsonUtils.pojoToJson(container); - if (EntityUtil.isDescriptionTask(task.getType())) { - column.setDescription(newValue); - } else if (EntityUtil.isTagTask(task.getType())) { - List tags = JsonUtils.readObjects(newValue, TagLabel.class); - column.setTags(tags); + TaskType taskType = threadContext.getThread().getTask().getType(); + if (EntityUtil.isDescriptionTask(taskType)) { + return new DataModelDescriptionTaskWorkflow(threadContext); + } else if (EntityUtil.isTagTask(taskType)) { + return new DataModelTagTaskWorkflow(threadContext); + } else { + throw new IllegalArgumentException(String.format("Invalid task type %s", taskType)); } - String updatedEntityJson = JsonUtils.pojoToJson(container); - JsonPatch patch = JsonUtils.getJsonPatch(origJson, updatedEntityJson); - patch(null, container.getId(), user, patch); - return; } - super.update(task, entityLink, newValue, user); + return super.getTaskWorkflow(threadContext); + } + + static class DataModelDescriptionTaskWorkflow extends DescriptionTaskWorkflow { + private final Column column; + + DataModelDescriptionTaskWorkflow(ThreadContext threadContext) { + super(threadContext); + DashboardDataModel dataModel = + Entity.getEntity(DASHBOARD_DATA_MODEL, threadContext.getAboutEntity().getId(), "dataModel", ALL); + threadContext.setAboutEntity(dataModel); + column = EntityUtil.findColumn(dataModel.getColumns(), getAbout().getArrayFieldName()); + } + + @Override + public EntityInterface performTask(String user, ResolveTask resolveTask) { + column.setDescription(resolveTask.getNewValue()); + return threadContext.getAboutEntity(); + } + } + + static class DataModelTagTaskWorkflow extends TagTaskWorkflow { + private final Column column; + + DataModelTagTaskWorkflow(ThreadContext threadContext) { + super(threadContext); + DashboardDataModel dataModel = + Entity.getEntity(DASHBOARD_DATA_MODEL, threadContext.getAboutEntity().getId(), "dataModel,tags", ALL); + threadContext.setAboutEntity(dataModel); + column = EntityUtil.findColumn(dataModel.getColumns(), getAbout().getArrayFieldName()); + } + + @Override + public EntityInterface performTask(String user, ResolveTask resolveTask) { + List tags = JsonUtils.readObjects(resolveTask.getNewValue(), TagLabel.class); + column.setTags(tags); + return threadContext.getAboutEntity(); + } } private void addDerivedColumnTags(List columns) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DashboardDataModelRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DashboardDataModelRepository.java index 081a5e067bf..6da4a9d2743 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DashboardDataModelRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DashboardDataModelRepository.java @@ -15,12 +15,14 @@ package org.openmetadata.service.jdbi3; import static org.openmetadata.common.utils.CommonUtil.listOrEmpty; import static org.openmetadata.schema.type.Include.ALL; +import static org.openmetadata.service.Entity.DASHBOARD_DATA_MODEL; import static org.openmetadata.service.Entity.FIELD_TAGS; import java.util.List; -import javax.json.JsonPatch; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.openmetadata.schema.EntityInterface; +import org.openmetadata.schema.api.feed.ResolveTask; import org.openmetadata.schema.entity.data.DashboardDataModel; import org.openmetadata.schema.entity.services.DashboardService; import org.openmetadata.schema.type.Column; @@ -28,9 +30,10 @@ import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.Include; import org.openmetadata.schema.type.Relationship; import org.openmetadata.schema.type.TagLabel; -import org.openmetadata.schema.type.TaskDetails; +import org.openmetadata.schema.type.TaskType; import org.openmetadata.service.Entity; -import org.openmetadata.service.exception.CatalogExceptionMessage; +import org.openmetadata.service.jdbi3.FeedRepository.TaskWorkflow; +import org.openmetadata.service.jdbi3.FeedRepository.ThreadContext; import org.openmetadata.service.resources.databases.DatabaseUtil; import org.openmetadata.service.resources.datamodels.DashboardDataModelResource; import org.openmetadata.service.resources.feeds.MessageParser.EntityLink; @@ -60,31 +63,57 @@ public class DashboardDataModelRepository extends EntityRepository c.getName().equals(entityLink.getArrayFieldName())) - .findFirst() - .orElseThrow( - () -> - new IllegalArgumentException( - CatalogExceptionMessage.invalidFieldName("column", entityLink.getArrayFieldName()))); - if (EntityUtil.isDescriptionTask(task.getType())) { - column.setDescription(newValue); - } else if (EntityUtil.isTagTask(task.getType())) { - List tags = JsonUtils.readObjects(newValue, TagLabel.class); - column.setTags(tags); + TaskType taskType = threadContext.getThread().getTask().getType(); + if (EntityUtil.isDescriptionTask(taskType)) { + return new ColumnDescriptionTaskWorkflow(threadContext); + } else if (EntityUtil.isTagTask(taskType)) { + return new ColumnTagTaskWorkflow(threadContext); + } else { + throw new IllegalArgumentException(String.format("Invalid task type %s", taskType)); } - String updatedEntityJson = JsonUtils.pojoToJson(dashboardDataModel); - JsonPatch patch = JsonUtils.getJsonPatch(origJson, updatedEntityJson); - patch(null, dashboardDataModel.getId(), user, patch); - return; } - super.update(task, entityLink, newValue, user); + return super.getTaskWorkflow(threadContext); + } + + static class ColumnDescriptionTaskWorkflow extends DescriptionTaskWorkflow { + private final Column column; + + ColumnDescriptionTaskWorkflow(ThreadContext threadContext) { + super(threadContext); + DashboardDataModel dataModel = + Entity.getEntity(DASHBOARD_DATA_MODEL, threadContext.getAboutEntity().getId(), "columns", ALL); + threadContext.setAboutEntity(dataModel); + column = EntityUtil.findColumn(dataModel.getColumns(), threadContext.getAbout().getArrayFieldName()); + } + + @Override + public EntityInterface performTask(String user, ResolveTask resolveTask) { + column.setDescription(resolveTask.getNewValue()); + return threadContext.getAboutEntity(); + } + } + + static class ColumnTagTaskWorkflow extends TagTaskWorkflow { + private final Column column; + + ColumnTagTaskWorkflow(ThreadContext threadContext) { + super(threadContext); + DashboardDataModel dataModel = + Entity.getEntity(DASHBOARD_DATA_MODEL, threadContext.getAboutEntity().getId(), "columns,tags", ALL); + threadContext.setAboutEntity(dataModel); + column = EntityUtil.findColumn(dataModel.getColumns(), threadContext.getAbout().getArrayFieldName()); + } + + @Override + public EntityInterface performTask(String user, ResolveTask resolveTask) { + List tags = JsonUtils.readObjects(resolveTask.getNewValue(), TagLabel.class); + column.setTags(tags); + return threadContext.getAboutEntity(); + } } @Override diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DashboardRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DashboardRepository.java index b1e40432def..4c375caada1 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DashboardRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DashboardRepository.java @@ -15,18 +15,20 @@ package org.openmetadata.service.jdbi3; import static org.openmetadata.common.utils.CommonUtil.listOrEmpty; import static org.openmetadata.schema.type.Include.ALL; +import static org.openmetadata.service.Entity.DASHBOARD; import static org.openmetadata.service.Entity.FIELD_DESCRIPTION; -import static org.openmetadata.service.Entity.FIELD_TAGS; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import org.openmetadata.schema.EntityInterface; +import org.openmetadata.schema.entity.data.Chart; import org.openmetadata.schema.entity.data.Dashboard; import org.openmetadata.schema.entity.services.DashboardService; import org.openmetadata.schema.type.*; import org.openmetadata.service.Entity; import org.openmetadata.service.exception.CatalogExceptionMessage; +import org.openmetadata.service.jdbi3.FeedRepository.TaskWorkflow; +import org.openmetadata.service.jdbi3.FeedRepository.ThreadContext; import org.openmetadata.service.resources.dashboards.DashboardResource; import org.openmetadata.service.resources.feeds.MessageParser.EntityLink; import org.openmetadata.service.util.EntityUtil; @@ -56,27 +58,34 @@ public class DashboardRepository extends EntityRepository { } @Override - public void update(TaskDetails task, EntityLink entityLink, String newValue, String user) { + public TaskWorkflow getTaskWorkflow(ThreadContext threadContext) { + EntityLink entityLink = threadContext.getAbout(); if (entityLink.getFieldName().equals("charts")) { - Dashboard dashboard = getByName(null, entityLink.getEntityFQN(), getFields("charts,tags"), Include.ALL, false); - EntityReference chart = + TaskType taskType = threadContext.getThread().getTask().getType(); + if (!entityLink.getFieldName().equals(FIELD_DESCRIPTION)) { + // Only description field can be updated + throw new IllegalArgumentException( + CatalogExceptionMessage.invalidFieldForTask(entityLink.getFieldName(), taskType)); + } + return new ChartDescriptionTaskWorkflow(threadContext); + } + return super.getTaskWorkflow(threadContext); + } + + static class ChartDescriptionTaskWorkflow extends DescriptionTaskWorkflow { + ChartDescriptionTaskWorkflow(ThreadContext threadContext) { + super(threadContext); + Dashboard dashboard = Entity.getEntity(DASHBOARD, threadContext.getAboutEntity().getId(), "charts", ALL); + String chartName = threadContext.getAbout().getArrayFieldName(); + EntityReference chartReference = dashboard.getCharts().stream() - .filter(c -> c.getName().equals(entityLink.getArrayFieldName())) + .filter(c -> c.getName().equals(chartName)) .findFirst() .orElseThrow( - () -> - new IllegalArgumentException( - CatalogExceptionMessage.invalidFieldName("chart", entityLink.getArrayFieldName()))); - String fieldName = - EntityUtil.isDescriptionTask(task.getType()) - ? FIELD_DESCRIPTION - : EntityUtil.isTagTask(task.getType()) ? FIELD_TAGS : "invalidField"; - EntityLink chartLink = new EntityLink(Entity.CHART, chart.getFullyQualifiedName(), fieldName, null, null); - EntityRepository chartRepository = Entity.getEntityRepository(Entity.CHART); - chartRepository.update(task, chartLink, newValue, user); - return; + () -> new IllegalArgumentException(CatalogExceptionMessage.invalidFieldName("chart", chartName))); + Chart chart = Entity.getEntity(chartReference, "", ALL); + threadContext.setAboutEntity(chart); } - super.update(task, entityLink, newValue, user); } @Override diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java index f4495f07a0c..b69de36170e 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java @@ -81,6 +81,7 @@ import java.util.function.Function; import java.util.stream.Collectors; import javax.annotation.CheckForNull; import javax.json.JsonPatch; +import javax.validation.constraints.NotNull; import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.UriInfo; import lombok.Getter; @@ -91,6 +92,7 @@ import org.jdbi.v3.sqlobject.transaction.Transaction; import org.openmetadata.common.utils.CommonUtil; import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.api.VoteRequest; +import org.openmetadata.schema.api.feed.ResolveTask; import org.openmetadata.schema.api.teams.CreateTeam; import org.openmetadata.schema.entity.classification.Tag; import org.openmetadata.schema.entity.data.GlossaryTerm; @@ -109,8 +111,8 @@ import org.openmetadata.schema.type.ProviderType; import org.openmetadata.schema.type.Relationship; import org.openmetadata.schema.type.TagLabel; import org.openmetadata.schema.type.TagLabel.TagSource; -import org.openmetadata.schema.type.TaskDetails; import org.openmetadata.schema.type.TaskType; +import org.openmetadata.schema.type.ThreadType; import org.openmetadata.schema.type.Votes; import org.openmetadata.schema.type.csv.CsvImportResult; import org.openmetadata.schema.utils.EntityInterfaceUtil; @@ -123,7 +125,8 @@ import org.openmetadata.service.exception.UnhandledServerException; import org.openmetadata.service.jdbi3.CollectionDAO.EntityRelationshipRecord; import org.openmetadata.service.jdbi3.CollectionDAO.EntityVersionPair; import org.openmetadata.service.jdbi3.CollectionDAO.ExtensionRecord; -import org.openmetadata.service.resources.feeds.MessageParser.EntityLink; +import org.openmetadata.service.jdbi3.FeedRepository.TaskWorkflow; +import org.openmetadata.service.jdbi3.FeedRepository.ThreadContext; import org.openmetadata.service.resources.tags.TagLabelUtil; import org.openmetadata.service.util.EntityUtil; import org.openmetadata.service.util.EntityUtil.Fields; @@ -330,26 +333,6 @@ public abstract class EntityRepository { entity.setFullyQualifiedName(quoteName(entity.getName())); } - /** Update an entity based suggested description and tags in the task */ - public void update(TaskDetails task, EntityLink entityLink, String newValue, String user) { - TaskType taskType = task.getType(); - T entity = getByName(null, entityLink.getEntityFQN(), getFields("tags"), Include.ALL, false); - String origJson = JsonUtils.pojoToJson(entity); - if (EntityUtil.isDescriptionTask(taskType) && entityLink.getFieldName().equals(FIELD_DESCRIPTION)) { - entity.setDescription(newValue); - } else if (supportsTags && EntityUtil.isTagTask(taskType) && entityLink.getFieldName().equals("tags")) { - List tags = JsonUtils.readObjects(newValue, TagLabel.class); - entity.setTags(tags); - } else { - // Not supported - throw new IllegalArgumentException( - CatalogExceptionMessage.invalidFieldForTask(entityLink.getFieldName(), task.getType())); - } - String updatedEntityJson = JsonUtils.pojoToJson(entity); - JsonPatch patch = JsonUtils.getJsonPatch(origJson, updatedEntityJson); - patch(null, entity.getId(), user, patch); - } - /** * Initialize data from json files if seed data does not exist in corresponding tables. Seed data is stored under * openmetadata-service/src/main/resources/json/data/{entityType} @@ -1683,6 +1666,25 @@ public abstract class EntityRepository { return entity.getTags(); } + public TaskWorkflow getTaskWorkflow(ThreadContext threadContext) { + validateTaskThread(threadContext); + TaskType taskType = threadContext.getThread().getTask().getType(); + if (EntityUtil.isDescriptionTask(taskType)) { + return new DescriptionTaskWorkflow(threadContext); + } else if (EntityUtil.isTagTask(taskType)) { + return new TagTaskWorkflow(threadContext); + } else { + throw new IllegalArgumentException(String.format("Invalid task type %s", taskType)); + } + } + + public void validateTaskThread(ThreadContext threadContext) { + ThreadType threadType = threadContext.getThread().getType(); + if (threadType != ThreadType.Task) { + throw new IllegalArgumentException(String.format("Thread type %s is not task related", threadType)); + } + } + public enum Operation { PUT, PATCH, @@ -2321,11 +2323,38 @@ public abstract class EntityRepository { static class EntityLoaderWithId extends CacheLoader, EntityInterface> { @Override - public EntityInterface load(@CheckForNull Pair idPair) throws IOException { + public EntityInterface load(@NotNull Pair idPair) throws IOException { String entityType = idPair.getLeft(); UUID id = idPair.getRight(); EntityRepository repository = Entity.getEntityRepository(entityType); return repository.getDao().findEntityById(id, ALL); } } + + public static class DescriptionTaskWorkflow extends TaskWorkflow { + DescriptionTaskWorkflow(ThreadContext threadContext) { + super(threadContext); + } + + @Override + public EntityInterface performTask(String user, ResolveTask resolveTask) { + EntityInterface aboutEntity = threadContext.getAboutEntity(); + aboutEntity.setDescription(resolveTask.getNewValue()); + return aboutEntity; + } + } + + public static class TagTaskWorkflow extends TaskWorkflow { + TagTaskWorkflow(ThreadContext threadContext) { + super(threadContext); + } + + @Override + public EntityInterface performTask(String user, ResolveTask resolveTask) { + List tags = JsonUtils.readObjects(resolveTask.getNewValue(), TagLabel.class); + EntityInterface aboutEntity = threadContext.getAboutEntity(); + aboutEntity.setTags(tags); + return aboutEntity; + } + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/FeedRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/FeedRepository.java index e8ffaa44049..20addc39030 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/FeedRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/FeedRepository.java @@ -22,7 +22,6 @@ import static org.openmetadata.schema.type.Relationship.CREATED; import static org.openmetadata.schema.type.Relationship.IS_ABOUT; import static org.openmetadata.schema.type.Relationship.REPLIED_TO; import static org.openmetadata.service.Entity.USER; -import static org.openmetadata.service.Entity.getEntityRepository; import static org.openmetadata.service.exception.CatalogExceptionMessage.ANNOUNCEMENT_INVALID_START_TIME; import static org.openmetadata.service.exception.CatalogExceptionMessage.ANNOUNCEMENT_OVERLAP; import static org.openmetadata.service.exception.CatalogExceptionMessage.entityNotFound; @@ -42,11 +41,11 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import javax.json.JsonPatch; -import javax.ws.rs.container.ContainerResponseContext; import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.SecurityContext; import javax.ws.rs.core.UriInfo; import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.jdbi.v3.sqlobject.transaction.Transaction; @@ -58,8 +57,6 @@ import org.openmetadata.schema.api.feed.ResolveTask; import org.openmetadata.schema.api.feed.ThreadCount; import org.openmetadata.schema.entity.feed.Thread; import org.openmetadata.schema.entity.teams.User; -import org.openmetadata.schema.type.AnnouncementDetails; -import org.openmetadata.schema.type.ChangeEvent; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.Include; import org.openmetadata.schema.type.Post; @@ -129,44 +126,84 @@ public class FeedRepository { return dao.feedDAO().getTaskId(); } - @Transaction - public Thread create(Thread thread) { - // Validate about data entity is valid and get the owner for that entity - EntityLink about = EntityLink.parse(thread.getAbout()); - EntityRepository repository = Entity.getEntityRepository(about.getEntityType()); - String field = repository.supportsOwner ? "owner" : ""; - EntityInterface aboutEntity = Entity.getEntity(about, field, ALL); - thread.withEntityId(aboutEntity.getId()); // Add entity id to thread - return createThread(thread, about, aboutEntity.getOwner()); + public class ThreadContext { + @Getter protected final Thread thread; + @Getter @Setter protected final EntityLink about; + @Getter @Setter protected EntityInterface aboutEntity; + @Getter private final EntityReference createdBy; + + ThreadContext(Thread thread) { + this.thread = thread; + this.about = EntityLink.parse(thread.getAbout()); + this.aboutEntity = Entity.getEntity(about, getFields(), ALL); + this.createdBy = Entity.getEntityReferenceByName(Entity.USER, thread.getCreatedBy(), NON_DELETED); + thread.withEntityId(aboutEntity.getId()); // Add entity id to thread + } + + public TaskWorkflow getTaskWorkflow() { + EntityRepository repository = Entity.getEntityRepository(about.getEntityType()); + return repository.getTaskWorkflow(this); + } + + public EntityRepository getEntityRepository() { + return Entity.getEntityRepository(about.getEntityType()); + } + + private String getFields() { + EntityRepository repository = getEntityRepository(); + List fieldList = new ArrayList<>(); + if (repository.supportsOwner) { + fieldList.add("owner"); + } + if (repository.supportsTags) { + fieldList.add("tags"); + } + return String.join(",", fieldList.toArray(new String[0])); + } + } + + public abstract static class TaskWorkflow { + protected final ThreadContext threadContext; + + TaskWorkflow(ThreadContext threadContext) { + this.threadContext = threadContext; + } + + public abstract EntityInterface performTask(String user, ResolveTask resolveTask); + + @SuppressWarnings("unused") + protected void closeTask(String user, CloseTask closeTask) {} + + protected final TaskType getTaskType() { + return threadContext.getThread().getTask().getType(); + } + + protected final EntityLink getAbout() { + return threadContext.getAbout(); + } + } + + private ThreadContext getThreadContext(Thread thread) { + return new ThreadContext(thread); } @Transaction - private Thread createThread(Thread thread, EntityLink about, EntityReference entityOwner) { - // Validate user creating thread - UUID createdByUserId = Entity.getEntityReferenceByName(Entity.USER, thread.getCreatedBy(), NON_DELETED).getId(); - - if (thread.getType() == ThreadType.Task) { - thread.withTask(thread.getTask().withId(getNextTaskId())); // Assign taskId for a task - } else if (thread.getType() == ThreadType.Announcement) { - // Validate start and end time for announcement - validateAnnouncement(thread.getAnnouncement()); - long startTime = thread.getAnnouncement().getStartTime(); - long endTime = thread.getAnnouncement().getEndTime(); - // TODO fix this - overlapping announcements should be allowed - List announcements = - dao.feedDAO() - .listAnnouncementBetween(thread.getId().toString(), thread.getEntityId().toString(), startTime, endTime); - if (!announcements.isEmpty()) { - // There is already an announcement that overlaps the new one - throw new IllegalArgumentException(ANNOUNCEMENT_OVERLAP); - } - } + public Thread create(Thread thread) { + ThreadContext threadContext = getThreadContext(thread); + return createThread(threadContext); + } + public void store(ThreadContext threadContext) { // Insert a new thread - dao.feedDAO().insert(JsonUtils.pojoToJson(thread)); + dao.feedDAO().insert(JsonUtils.pojoToJson(threadContext.getThread())); + } + public void storeRelationships(ThreadContext threadContext) { + Thread thread = threadContext.getThread(); + EntityLink about = threadContext.getAbout(); // Add relationship User -- created --> Thread relationship - dao.relationshipDAO().insert(createdByUserId, thread.getId(), USER, Entity.THREAD, CREATED.ordinal()); + dao.relationshipDAO() + .insert(threadContext.getCreatedBy().getId(), thread.getId(), USER, Entity.THREAD, CREATED.ordinal()); // Add field relationship for data asset - Thread -- isAbout ---> entity/entityField dao.fieldRelationshipDAO() @@ -180,7 +217,9 @@ public class FeedRepository { IS_ABOUT.ordinal(), null); - // Add the owner also as addressedTo as the entity he owns when addressed, the owner is actually being addressed + // Add the owner also as addressedTo as the entity he owns when addressed, the owner is + // actually being addressed + EntityReference entityOwner = threadContext.getAboutEntity().getOwner(); if (entityOwner != null) { dao.relationshipDAO() .insert(thread.getId(), entityOwner.getId(), Entity.THREAD, entityOwner.getType(), ADDRESSED_TO.ordinal()); @@ -188,30 +227,21 @@ public class FeedRepository { // Add mentions to field relationship table storeMentions(thread, thread.getMessage()); - populateAssignees(thread); - return thread; } @Transaction - public Thread create(Thread thread, ContainerResponseContext responseContext) { - // Validate about data entity is valid and get the owner for that entity - EntityInterface entity; - // In case of ENTITY_FIELDS_CHANGED entity from responseContext will be a ChangeEvent - if (responseContext.getEntity() instanceof ChangeEvent) { - ChangeEvent change = (ChangeEvent) responseContext.getEntity(); - entity = (EntityInterface) change.getEntity(); - } else { - entity = (EntityInterface) responseContext.getEntity(); + private Thread createThread(ThreadContext threadContext) { + Thread thread = threadContext.getThread(); + if (thread.getType() == ThreadType.Task) { + thread.getTask().withId(getNextTaskId()); + } else if (thread.getType() == ThreadType.Announcement) { + // Validate start and end time for announcement + validateAnnouncement(thread); } - EntityReference owner = null; - try { - owner = Entity.getOwner(entity.getEntityReference()); - } catch (Exception ignored) { - // Either deleted or owner field not available - } - EntityLink about = EntityLink.parse(thread.getAbout()); - thread.withEntityId(entity.getId()); // Add entity id to thread - return createThread(thread, about, owner); + store(threadContext); + storeRelationships(threadContext); + populateAssignees(threadContext.getThread()); + return threadContext.getThread(); } public Thread get(String id) { @@ -228,26 +258,35 @@ public class FeedRepository { public PatchResponse closeTask(UriInfo uriInfo, Thread thread, String user, CloseTask closeTask) { // Update the attributes - closeTask(thread, user, closeTask.getComment()); + ThreadContext threadContext = getThreadContext(thread); + closeTask(threadContext, user, closeTask); Thread updatedHref = FeedResource.addHref(uriInfo, thread); return new PatchResponse<>(Status.OK, updatedHref, RestUtil.ENTITY_UPDATED); } public PatchResponse resolveTask(UriInfo uriInfo, Thread thread, String user, ResolveTask resolveTask) { // perform the task - TaskDetails task = thread.getTask(); - EntityLink about = EntityLink.parse(thread.getAbout()); - EntityReference aboutRef = EntityUtil.validateEntityLink(about); - EntityRepository repository = getEntityRepository(aboutRef.getType()); - repository.update(task, about, resolveTask.getNewValue(), user); - - // Update the attributes - task.withNewValue(resolveTask.getNewValue()); - closeTask(thread, user, null); + ThreadContext threadContext = getThreadContext(thread); + resolveTask(threadContext, user, resolveTask); Thread updatedHref = FeedResource.addHref(uriInfo, thread); return new PatchResponse<>(Status.OK, updatedHref, RestUtil.ENTITY_UPDATED); } + private void resolveTask(ThreadContext threadContext, String user, ResolveTask resolveTask) { + TaskWorkflow taskWorkflow = threadContext.getTaskWorkflow(); + EntityInterface aboutEntity = threadContext.getAboutEntity(); + String origJson = JsonUtils.pojoToJson(aboutEntity); + EntityInterface updatedEntity = taskWorkflow.performTask(user, resolveTask); + String updatedEntityJson = JsonUtils.pojoToJson(updatedEntity); + JsonPatch patch = JsonUtils.getJsonPatch(origJson, updatedEntityJson); + EntityRepository repository = threadContext.getEntityRepository(); + repository.patch(null, aboutEntity.getId(), user, patch); + + // Update the attributes + threadContext.getThread().getTask().withNewValue(resolveTask.getNewValue()); + closeTask(threadContext, user, new CloseTask()); + } + private String getTagFQNs(List tags) { return tags.stream().map(TagLabel::getTagFQN).collect(Collectors.joining(", ")); } @@ -290,13 +329,16 @@ public class FeedRepository { addPostToThread(thread.getId().toString(), post, user); } - private void closeTask(Thread thread, String user, String closingComment) { + private void closeTask(ThreadContext threadContext, String user, CloseTask closeTask) { + Thread thread = threadContext.getThread(); TaskDetails task = thread.getTask(); + TaskWorkflow workflow = threadContext.getTaskWorkflow(); + workflow.closeTask(user, closeTask); task.withStatus(TaskStatus.Closed).withClosedBy(user).withClosedAt(System.currentTimeMillis()); thread.withTask(task).withUpdatedBy(user).withUpdatedAt(System.currentTimeMillis()); dao.feedDAO().update(thread.getId().toString(), JsonUtils.pojoToJson(thread)); - addClosingPost(thread, user, closingComment); + addClosingPost(thread, user, closeTask.getComment()); sortPosts(thread); } @@ -610,18 +652,7 @@ public class FeedRepository { } if (updated.getAnnouncement() != null) { - validateAnnouncement(updated.getAnnouncement()); - // check if the announcement start and end time clashes with other existing announcements - List announcements = - dao.feedDAO() - .listAnnouncementBetween( - id.toString(), - updated.getEntityId().toString(), - updated.getAnnouncement().getStartTime(), - updated.getAnnouncement().getEndTime()); - if (!announcements.isEmpty()) { - throw new IllegalArgumentException(ANNOUNCEMENT_OVERLAP); - } + validateAnnouncement(updated); } // Update the attributes @@ -665,10 +696,20 @@ public class FeedRepository { CatalogExceptionMessage.taskOperationNotAllowed(userName, closeTask ? "closeTask" : "resolveTask")); } - private void validateAnnouncement(AnnouncementDetails announcementDetails) { - if (announcementDetails.getStartTime() >= announcementDetails.getEndTime()) { + private void validateAnnouncement(Thread thread) { + long startTime = thread.getAnnouncement().getStartTime(); + long endTime = thread.getAnnouncement().getEndTime(); + if (startTime >= endTime) { throw new IllegalArgumentException(ANNOUNCEMENT_INVALID_START_TIME); } + // TODO fix this - overlapping announcements should be allowed + List announcements = + dao.feedDAO() + .listAnnouncementBetween(thread.getId().toString(), thread.getEntityId().toString(), startTime, endTime); + if (!announcements.isEmpty()) { + // There is already an announcement that overlaps the new one + throw new IllegalArgumentException(ANNOUNCEMENT_OVERLAP); + } } private void restorePatchAttributes(Thread original, Thread updated) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/MlModelRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/MlModelRepository.java index 5a048c674df..f33a3757c17 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/MlModelRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/MlModelRepository.java @@ -25,9 +25,9 @@ import static org.openmetadata.service.util.EntityUtil.mlHyperParameterMatch; import java.util.ArrayList; import java.util.List; -import javax.json.JsonPatch; import lombok.extern.slf4j.Slf4j; import org.openmetadata.schema.EntityInterface; +import org.openmetadata.schema.api.feed.ResolveTask; import org.openmetadata.schema.entity.data.MlModel; import org.openmetadata.schema.entity.services.MlModelService; import org.openmetadata.schema.type.EntityReference; @@ -37,10 +37,12 @@ import org.openmetadata.schema.type.MlFeatureSource; import org.openmetadata.schema.type.MlHyperParameter; import org.openmetadata.schema.type.Relationship; import org.openmetadata.schema.type.TagLabel; -import org.openmetadata.schema.type.TaskDetails; +import org.openmetadata.schema.type.TaskType; import org.openmetadata.service.Entity; import org.openmetadata.service.exception.CatalogExceptionMessage; -import org.openmetadata.service.resources.feeds.MessageParser; +import org.openmetadata.service.jdbi3.FeedRepository.TaskWorkflow; +import org.openmetadata.service.jdbi3.FeedRepository.ThreadContext; +import org.openmetadata.service.resources.feeds.MessageParser.EntityLink; import org.openmetadata.service.resources.mlmodels.MlModelResource; import org.openmetadata.service.util.EntityUtil; import org.openmetadata.service.util.EntityUtil.Fields; @@ -63,6 +65,14 @@ public class MlModelRepository extends EntityRepository { MODEL_UPDATE_FIELDS); } + public static MlFeature findMlFeature(List features, String featureName) { + return features.stream() + .filter(c -> c.getName().equals(featureName)) + .findFirst() + .orElseThrow( + () -> new IllegalArgumentException(CatalogExceptionMessage.invalidFieldName("mlFeature", featureName))); + } + @Override public void setFullyQualifiedName(MlModel mlModel) { mlModel.setFullyQualifiedName( @@ -238,31 +248,53 @@ public class MlModelRepository extends EntityRepository { } @Override - public void update(TaskDetails task, MessageParser.EntityLink entityLink, String newValue, String user) { + public TaskWorkflow getTaskWorkflow(ThreadContext threadContext) { + validateTaskThread(threadContext); + EntityLink entityLink = threadContext.getAbout(); if (entityLink.getFieldName().equals("mlFeatures")) { - MlModel mlModel = getByName(null, entityLink.getEntityFQN(), getFields("tags"), Include.ALL, false); - MlFeature mlFeature = - mlModel.getMlFeatures().stream() - .filter(c -> c.getName().equals(entityLink.getArrayFieldName())) - .findFirst() - .orElseThrow( - () -> - new IllegalArgumentException( - CatalogExceptionMessage.invalidFieldName("chart", entityLink.getArrayFieldName()))); - - String origJson = JsonUtils.pojoToJson(mlModel); - if (EntityUtil.isDescriptionTask(task.getType())) { - mlFeature.setDescription(newValue); - } else if (EntityUtil.isTagTask(task.getType())) { - List tags = JsonUtils.readObjects(newValue, TagLabel.class); - mlFeature.setTags(tags); + TaskType taskType = threadContext.getThread().getTask().getType(); + if (EntityUtil.isDescriptionTask(taskType)) { + return new MlFeatureDescriptionTaskWorkflow(threadContext); + } else if (EntityUtil.isTagTask(taskType)) { + return new MlFeatureTagTaskWorkflow(threadContext); + } else { + throw new IllegalArgumentException(String.format("Invalid task type %s", taskType)); } - String updatedEntityJson = JsonUtils.pojoToJson(mlModel); - JsonPatch patch = JsonUtils.getJsonPatch(origJson, updatedEntityJson); - patch(null, mlModel.getId(), user, patch); - return; } - super.update(task, entityLink, newValue, user); + return super.getTaskWorkflow(threadContext); + } + + static class MlFeatureDescriptionTaskWorkflow extends DescriptionTaskWorkflow { + private final MlFeature mlFeature; + + MlFeatureDescriptionTaskWorkflow(ThreadContext threadContext) { + super(threadContext); + MlModel mlModel = (MlModel) threadContext.getAboutEntity(); + mlFeature = findMlFeature(mlModel.getMlFeatures(), threadContext.getAbout().getArrayFieldName()); + } + + @Override + public EntityInterface performTask(String user, ResolveTask resolveTask) { + mlFeature.setDescription(resolveTask.getNewValue()); + return threadContext.getAboutEntity(); + } + } + + static class MlFeatureTagTaskWorkflow extends TagTaskWorkflow { + private final MlFeature mlFeature; + + MlFeatureTagTaskWorkflow(ThreadContext threadContext) { + super(threadContext); + MlModel mlModel = (MlModel) threadContext.getAboutEntity(); + mlFeature = findMlFeature(mlModel.getMlFeatures(), threadContext.getAbout().getArrayFieldName()); + } + + @Override + public EntityInterface performTask(String user, ResolveTask resolveTask) { + List tags = JsonUtils.readObjects(resolveTask.getNewValue(), TagLabel.class); + mlFeature.setTags(tags); + return threadContext.getAboutEntity(); + } } private void populateService(MlModel mlModel) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/PipelineRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/PipelineRepository.java index aa2d073ab1a..3c651206faf 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/PipelineRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/PipelineRepository.java @@ -16,15 +16,16 @@ package org.openmetadata.service.jdbi3; import static org.openmetadata.common.utils.CommonUtil.listOrEmpty; import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; import static org.openmetadata.schema.type.Include.ALL; +import static org.openmetadata.service.Entity.CONTAINER; import static org.openmetadata.service.Entity.FIELD_TAGS; import static org.openmetadata.service.Entity.PIPELINE_SERVICE; import static org.openmetadata.service.util.EntityUtil.taskMatch; import java.util.ArrayList; import java.util.List; -import javax.json.JsonPatch; import org.jdbi.v3.sqlobject.transaction.Transaction; import org.openmetadata.schema.EntityInterface; +import org.openmetadata.schema.api.feed.ResolveTask; import org.openmetadata.schema.entity.data.Pipeline; import org.openmetadata.schema.entity.data.PipelineStatus; import org.openmetadata.schema.entity.services.PipelineService; @@ -34,11 +35,13 @@ import org.openmetadata.schema.type.Relationship; import org.openmetadata.schema.type.Status; import org.openmetadata.schema.type.TagLabel; import org.openmetadata.schema.type.Task; -import org.openmetadata.schema.type.TaskDetails; +import org.openmetadata.schema.type.TaskType; import org.openmetadata.service.Entity; import org.openmetadata.service.exception.CatalogExceptionMessage; import org.openmetadata.service.exception.EntityNotFoundException; -import org.openmetadata.service.resources.feeds.MessageParser; +import org.openmetadata.service.jdbi3.FeedRepository.TaskWorkflow; +import org.openmetadata.service.jdbi3.FeedRepository.ThreadContext; +import org.openmetadata.service.resources.feeds.MessageParser.EntityLink; import org.openmetadata.service.resources.pipelines.PipelineResource; import org.openmetadata.service.util.EntityUtil; import org.openmetadata.service.util.EntityUtil.Fields; @@ -71,30 +74,55 @@ public class PipelineRepository extends EntityRepository { } @Override - public void update(TaskDetails task, MessageParser.EntityLink entityLink, String newValue, String user) { + public TaskWorkflow getTaskWorkflow(ThreadContext threadContext) { + validateTaskThread(threadContext); + EntityLink entityLink = threadContext.getAbout(); if (entityLink.getFieldName().equals(TASKS_FIELD)) { - Pipeline pipeline = getByName(null, entityLink.getEntityFQN(), getFields("tasks,tags"), Include.ALL, false); - String oldJson = JsonUtils.pojoToJson(pipeline); - Task pipelineTask = - pipeline.getTasks().stream() - .filter(c -> c.getName().equals(entityLink.getArrayFieldName())) - .findFirst() - .orElseThrow( - () -> - new IllegalArgumentException( - CatalogExceptionMessage.invalidFieldName("task", entityLink.getArrayFieldName()))); - if (EntityUtil.isDescriptionTask(task.getType())) { - pipelineTask.setDescription(newValue); - } else if (EntityUtil.isTagTask(task.getType())) { - List tags = JsonUtils.readObjects(newValue, TagLabel.class); - pipelineTask.setTags(tags); + TaskType taskType = threadContext.getThread().getTask().getType(); + if (EntityUtil.isDescriptionTask(taskType)) { + return new TaskDescriptionWorkflow(threadContext); + } else if (EntityUtil.isTagTask(taskType)) { + return new TaskTagWorkflow(threadContext); + } else { + throw new IllegalArgumentException(String.format("Invalid task type %s", taskType)); } - String updatedEntityJson = JsonUtils.pojoToJson(pipeline); - JsonPatch patch = JsonUtils.getJsonPatch(oldJson, updatedEntityJson); - patch(null, pipeline.getId(), user, patch); - return; } - super.update(task, entityLink, newValue, user); + return super.getTaskWorkflow(threadContext); + } + + static class TaskDescriptionWorkflow extends DescriptionTaskWorkflow { + private final Task task; + + TaskDescriptionWorkflow(ThreadContext threadContext) { + super(threadContext); + Pipeline pipeline = Entity.getEntity(CONTAINER, threadContext.getAboutEntity().getId(), "tasks", ALL); + threadContext.setAboutEntity(pipeline); + task = findTask(pipeline.getTasks(), threadContext.getAbout().getArrayFieldName()); + } + + @Override + public EntityInterface performTask(String user, ResolveTask resolveTask) { + task.setDescription(resolveTask.getNewValue()); + return threadContext.getAboutEntity(); + } + } + + static class TaskTagWorkflow extends TagTaskWorkflow { + private final Task task; + + TaskTagWorkflow(ThreadContext threadContext) { + super(threadContext); + Pipeline pipeline = Entity.getEntity(CONTAINER, threadContext.getAboutEntity().getId(), "tasks,tags", ALL); + threadContext.setAboutEntity(pipeline); + task = findTask(pipeline.getTasks(), threadContext.getAbout().getArrayFieldName()); + } + + @Override + public EntityInterface performTask(String user, ResolveTask resolveTask) { + List tags = JsonUtils.readObjects(resolveTask.getNewValue(), TagLabel.class); + task.setTags(tags); + return threadContext.getAboutEntity(); + } } @Override @@ -361,4 +389,11 @@ public class PipelineRepository extends EntityRepository { } } } + + public static Task findTask(List tasks, String taskName) { + return tasks.stream() + .filter(c -> c.getName().equals(taskName)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException(CatalogExceptionMessage.invalidFieldName("task", taskName))); + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/SearchIndexRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/SearchIndexRepository.java index 77e8247727e..d4d304ba073 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/SearchIndexRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/SearchIndexRepository.java @@ -32,20 +32,22 @@ import java.util.UUID; import java.util.function.BiPredicate; import java.util.function.Function; import java.util.stream.Collectors; -import javax.json.JsonPatch; import org.jdbi.v3.sqlobject.transaction.Transaction; import org.openmetadata.schema.EntityInterface; +import org.openmetadata.schema.api.feed.ResolveTask; import org.openmetadata.schema.entity.data.SearchIndex; import org.openmetadata.schema.entity.services.SearchService; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.Relationship; import org.openmetadata.schema.type.SearchIndexField; import org.openmetadata.schema.type.TagLabel; -import org.openmetadata.schema.type.TaskDetails; +import org.openmetadata.schema.type.TaskType; import org.openmetadata.schema.type.searchindex.SearchIndexSampleData; import org.openmetadata.service.Entity; import org.openmetadata.service.exception.CatalogExceptionMessage; -import org.openmetadata.service.resources.feeds.MessageParser; +import org.openmetadata.service.jdbi3.FeedRepository.TaskWorkflow; +import org.openmetadata.service.jdbi3.FeedRepository.ThreadContext; +import org.openmetadata.service.resources.feeds.MessageParser.EntityLink; import org.openmetadata.service.resources.searchindex.SearchIndexResource; import org.openmetadata.service.security.mask.PIIMasker; import org.openmetadata.service.util.EntityUtil; @@ -275,51 +277,84 @@ public class SearchIndexRepository extends EntityRepository { } @Override - public void update(TaskDetails task, MessageParser.EntityLink entityLink, String newValue, String user) { + public TaskWorkflow getTaskWorkflow(ThreadContext threadContext) { + validateTaskThread(threadContext); + EntityLink entityLink = threadContext.getAbout(); if (entityLink.getFieldName().equals("fields")) { - String schemaName = entityLink.getArrayFieldName(); - String childrenSchemaName = ""; - if (entityLink.getArrayFieldName().contains(".")) { - String fieldNameWithoutQuotes = - entityLink.getArrayFieldName().substring(1, entityLink.getArrayFieldName().length() - 1); - schemaName = fieldNameWithoutQuotes.substring(0, fieldNameWithoutQuotes.indexOf(".")); - childrenSchemaName = fieldNameWithoutQuotes.substring(fieldNameWithoutQuotes.lastIndexOf(".") + 1); + TaskType taskType = threadContext.getThread().getTask().getType(); + if (EntityUtil.isDescriptionTask(taskType)) { + return new FieldDescriptionWorkflow(threadContext); + } else if (EntityUtil.isTagTask(taskType)) { + return new FieldTagWorkflow(threadContext); + } else { + throw new IllegalArgumentException(String.format("Invalid task type %s", taskType)); } - SearchIndex searchIndex = getByName(null, entityLink.getEntityFQN(), getFields("tags"), ALL, false); - SearchIndexField schemaField = null; - for (SearchIndexField field : searchIndex.getFields()) { - if (field.getName().equals(schemaName)) { - schemaField = field; - break; - } - } - if (!"".equals(childrenSchemaName) && schemaField != null) { - schemaField = getChildrenSchemaField(schemaField.getChildren(), childrenSchemaName); - } - if (schemaField == null) { - throw new IllegalArgumentException( - CatalogExceptionMessage.invalidFieldName("schema", entityLink.getArrayFieldName())); - } - - String origJson = JsonUtils.pojoToJson(searchIndex); - if (EntityUtil.isDescriptionTask(task.getType())) { - schemaField.setDescription(newValue); - } else if (EntityUtil.isTagTask(task.getType())) { - List tags = JsonUtils.readObjects(newValue, TagLabel.class); - schemaField.setTags(tags); - } - String updatedEntityJson = JsonUtils.pojoToJson(searchIndex); - JsonPatch patch = JsonUtils.getJsonPatch(origJson, updatedEntityJson); - patch(null, searchIndex.getId(), user, patch); - return; } - super.update(task, entityLink, newValue, user); + return super.getTaskWorkflow(threadContext); } - private static SearchIndexField getChildrenSchemaField(List fields, String childrenSchemaName) { + static class FieldDescriptionWorkflow extends DescriptionTaskWorkflow { + private final SearchIndexField schemaField; + + FieldDescriptionWorkflow(ThreadContext threadContext) { + super(threadContext); + schemaField = + getSchemaField((SearchIndex) threadContext.getAboutEntity(), threadContext.getAbout().getArrayFieldName()); + } + + @Override + public EntityInterface performTask(String user, ResolveTask resolveTask) { + schemaField.setDescription(resolveTask.getNewValue()); + return threadContext.getAboutEntity(); + } + } + + static class FieldTagWorkflow extends TagTaskWorkflow { + private final SearchIndexField schemaField; + + FieldTagWorkflow(ThreadContext threadContext) { + super(threadContext); + schemaField = + getSchemaField((SearchIndex) threadContext.getAboutEntity(), threadContext.getAbout().getArrayFieldName()); + } + + @Override + public EntityInterface performTask(String user, ResolveTask resolveTask) { + List tags = JsonUtils.readObjects(resolveTask.getNewValue(), TagLabel.class); + schemaField.setTags(tags); + return threadContext.getAboutEntity(); + } + } + + private static SearchIndexField getSchemaField(SearchIndex searchIndex, String fieldName) { + String schemaName = fieldName; + List schemaFields = searchIndex.getFields(); + String childSchemaName = ""; + if (fieldName.contains(".")) { + String fieldNameWithoutQuotes = fieldName.substring(1, fieldName.length() - 1); + schemaName = fieldNameWithoutQuotes.substring(0, fieldNameWithoutQuotes.indexOf(".")); + childSchemaName = fieldNameWithoutQuotes.substring(fieldNameWithoutQuotes.lastIndexOf(".") + 1); + } + SearchIndexField schemaField = null; + for (SearchIndexField field : schemaFields) { + if (field.getName().equals(schemaName)) { + schemaField = field; + break; + } + } + if (!"".equals(childSchemaName) && schemaField != null) { + schemaField = getChildSchemaField(schemaField.getChildren(), childSchemaName); + } + if (schemaField == null) { + throw new IllegalArgumentException(CatalogExceptionMessage.invalidFieldName("schema", fieldName)); + } + return schemaField; + } + + private static SearchIndexField getChildSchemaField(List fields, String childSchemaName) { SearchIndexField childrenSchemaField = null; for (SearchIndexField field : fields) { - if (field.getName().equals(childrenSchemaName)) { + if (field.getName().equals(childSchemaName)) { childrenSchemaField = field; break; } @@ -327,7 +362,7 @@ public class SearchIndexRepository extends EntityRepository { if (childrenSchemaField == null) { for (SearchIndexField field : fields) { if (field.getChildren() != null) { - childrenSchemaField = getChildrenSchemaField(field.getChildren(), childrenSchemaName); + childrenSchemaField = getChildSchemaField(field.getChildren(), childSchemaName); if (childrenSchemaField != null) { break; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java index 2cffb0f0069..ee615093d4c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java @@ -39,7 +39,6 @@ import java.util.UUID; import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; -import javax.json.JsonPatch; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Triple; @@ -47,6 +46,7 @@ import org.jdbi.v3.sqlobject.transaction.Transaction; import org.openmetadata.common.utils.CommonUtil; import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.api.data.CreateTableProfile; +import org.openmetadata.schema.api.feed.ResolveTask; import org.openmetadata.schema.entity.data.DatabaseSchema; import org.openmetadata.schema.entity.data.Table; import org.openmetadata.schema.tests.CustomMetric; @@ -69,11 +69,12 @@ import org.openmetadata.schema.type.TableJoins; import org.openmetadata.schema.type.TableProfile; import org.openmetadata.schema.type.TableProfilerConfig; import org.openmetadata.schema.type.TagLabel; -import org.openmetadata.schema.type.TaskDetails; import org.openmetadata.schema.type.TaskType; import org.openmetadata.service.Entity; import org.openmetadata.service.exception.CatalogExceptionMessage; import org.openmetadata.service.exception.EntityNotFoundException; +import org.openmetadata.service.jdbi3.FeedRepository.TaskWorkflow; +import org.openmetadata.service.jdbi3.FeedRepository.ThreadContext; import org.openmetadata.service.resources.databases.DatabaseUtil; import org.openmetadata.service.resources.databases.TableResource; import org.openmetadata.service.resources.feeds.MessageParser.EntityLink; @@ -762,48 +763,76 @@ public class TableRepository extends EntityRepository { } @Override - public void update(TaskDetails task, EntityLink entityLink, String newValue, String user) { - validateEntityLinkFieldExists(entityLink, task.getType()); + public TaskWorkflow getTaskWorkflow(ThreadContext threadContext) { + validateTaskThread(threadContext); + EntityLink entityLink = threadContext.getAbout(); if (entityLink.getFieldName().equals("columns")) { - String columnName = entityLink.getArrayFieldName(); - String childrenName = ""; - if (entityLink.getArrayFieldName().contains(".")) { - String fieldNameWithoutQuotes = - entityLink.getArrayFieldName().substring(1, entityLink.getArrayFieldName().length() - 1); - columnName = fieldNameWithoutQuotes.substring(0, fieldNameWithoutQuotes.indexOf(".")); - childrenName = fieldNameWithoutQuotes.substring(fieldNameWithoutQuotes.lastIndexOf(".") + 1); + TaskType taskType = threadContext.getThread().getTask().getType(); + if (EntityUtil.isDescriptionTask(taskType)) { + return new ColumnDescriptionWorkflow(threadContext); + } else if (EntityUtil.isTagTask(taskType)) { + return new ColumnTagWorkflow(threadContext); + } else { + throw new IllegalArgumentException(String.format("Invalid task type %s", taskType)); } - Table table = getByName(null, entityLink.getEntityFQN(), getFields("columns,tags"), Include.ALL, false); - Column column = null; - for (Column c : table.getColumns()) { - if (c.getName().equals(columnName)) { - column = c; - break; - } - } - if (!"".equals(childrenName) && column != null) { - column = getChildrenColumn(column.getChildren(), childrenName); - } - if (column == null) { - throw new IllegalArgumentException( - CatalogExceptionMessage.invalidFieldName("column", entityLink.getArrayFieldName())); - } - String origJson = JsonUtils.pojoToJson(table); - if (EntityUtil.isDescriptionTask(task.getType())) { - column.setDescription(newValue); - } else if (EntityUtil.isTagTask(task.getType())) { - List tags = JsonUtils.readObjects(newValue, TagLabel.class); - column.setTags(tags); - } - String updatedEntityJson = JsonUtils.pojoToJson(table); - JsonPatch patch = JsonUtils.getJsonPatch(origJson, updatedEntityJson); - patch(null, table.getId(), user, patch); - return; } - super.update(task, entityLink, newValue, user); + return super.getTaskWorkflow(threadContext); } - private static Column getChildrenColumn(List column, String childrenName) { + static class ColumnDescriptionWorkflow extends DescriptionTaskWorkflow { + private final Column column; + + ColumnDescriptionWorkflow(ThreadContext threadContext) { + super(threadContext); + Table table = Entity.getEntity(TABLE, threadContext.getAboutEntity().getId(), "columns", ALL); + threadContext.setAboutEntity(table); + column = getColumn((Table) threadContext.getAboutEntity(), threadContext.getAbout().getArrayFieldName()); + } + + @Override + public EntityInterface performTask(String user, ResolveTask resolveTask) { + column.setDescription(resolveTask.getNewValue()); + return threadContext.getAboutEntity(); + } + } + + static class ColumnTagWorkflow extends TagTaskWorkflow { + private final Column column; + + ColumnTagWorkflow(ThreadContext threadContext) { + super(threadContext); + Table table = Entity.getEntity(TABLE, threadContext.getAboutEntity().getId(), "columns,tags", ALL); + threadContext.setAboutEntity(table); + column = getColumn((Table) threadContext.getAboutEntity(), threadContext.getAbout().getArrayFieldName()); + } + + @Override + public EntityInterface performTask(String user, ResolveTask resolveTask) { + List tags = JsonUtils.readObjects(resolveTask.getNewValue(), TagLabel.class); + column.setTags(tags); + return threadContext.getAboutEntity(); + } + } + + private static Column getColumn(Table table, String columnName) { + String childrenName = ""; + if (columnName.contains(".")) { + String fieldNameWithoutQuotes = columnName.substring(1, columnName.length() - 1); + columnName = fieldNameWithoutQuotes.substring(0, fieldNameWithoutQuotes.indexOf(".")); + childrenName = fieldNameWithoutQuotes.substring(fieldNameWithoutQuotes.lastIndexOf(".") + 1); + } + + Column column = EntityUtil.findColumn(table.getColumns(), columnName); + if (!"".equals(childrenName) && column != null) { + column = getChildColumn(column.getChildren(), childrenName); + } + if (column == null) { + throw new IllegalArgumentException(CatalogExceptionMessage.invalidFieldName("column", columnName)); + } + return column; + } + + private static Column getChildColumn(List column, String childrenName) { Column childrenColumn = null; for (Column col : column) { if (col.getName().equals(childrenName)) { @@ -814,7 +843,7 @@ public class TableRepository extends EntityRepository
{ if (childrenColumn == null) { for (Column value : column) { if (value.getChildren() != null) { - childrenColumn = getChildrenColumn(value.getChildren(), childrenName); + childrenColumn = getChildColumn(value.getChildren(), childrenName); if (childrenColumn != null) { break; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TopicRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TopicRepository.java index d56761fb405..d901e11e764 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TopicRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TopicRepository.java @@ -20,7 +20,6 @@ import static org.openmetadata.service.Entity.FIELD_DESCRIPTION; import static org.openmetadata.service.Entity.FIELD_DISPLAY_NAME; import static org.openmetadata.service.Entity.FIELD_TAGS; import static org.openmetadata.service.Entity.MESSAGING_SERVICE; -import static org.openmetadata.service.util.EntityUtil.getSchemaField; import java.util.ArrayList; import java.util.HashSet; @@ -31,21 +30,23 @@ import java.util.UUID; import java.util.function.BiPredicate; import java.util.function.Function; import java.util.stream.Collectors; -import javax.json.JsonPatch; import org.jdbi.v3.sqlobject.transaction.Transaction; import org.openmetadata.schema.EntityInterface; +import org.openmetadata.schema.api.feed.ResolveTask; import org.openmetadata.schema.entity.data.Topic; import org.openmetadata.schema.entity.services.MessagingService; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.Field; import org.openmetadata.schema.type.Relationship; import org.openmetadata.schema.type.TagLabel; -import org.openmetadata.schema.type.TaskDetails; +import org.openmetadata.schema.type.TaskType; import org.openmetadata.schema.type.topic.CleanupPolicy; import org.openmetadata.schema.type.topic.TopicSampleData; import org.openmetadata.service.Entity; import org.openmetadata.service.exception.CatalogExceptionMessage; -import org.openmetadata.service.resources.feeds.MessageParser; +import org.openmetadata.service.jdbi3.FeedRepository.TaskWorkflow; +import org.openmetadata.service.jdbi3.FeedRepository.ThreadContext; +import org.openmetadata.service.resources.feeds.MessageParser.EntityLink; import org.openmetadata.service.resources.topics.TopicResource; import org.openmetadata.service.security.mask.PIIMasker; import org.openmetadata.service.util.EntityUtil; @@ -268,48 +269,79 @@ public class TopicRepository extends EntityRepository { } @Override - public void update(TaskDetails task, MessageParser.EntityLink entityLink, String newValue, String user) { + public TaskWorkflow getTaskWorkflow(ThreadContext threadContext) { + validateTaskThread(threadContext); + EntityLink entityLink = threadContext.getAbout(); if (entityLink.getFieldName().equals("messageSchema")) { - String schemaName = entityLink.getArrayFieldName(); - String childrenSchemaName = ""; - if (entityLink.getArrayFieldName().contains(".")) { - String fieldNameWithoutQuotes = - entityLink.getArrayFieldName().substring(1, entityLink.getArrayFieldName().length() - 1); - schemaName = fieldNameWithoutQuotes.substring(0, fieldNameWithoutQuotes.indexOf(".")); - childrenSchemaName = fieldNameWithoutQuotes.substring(fieldNameWithoutQuotes.lastIndexOf(".") + 1); + TaskType taskType = threadContext.getThread().getTask().getType(); + if (EntityUtil.isDescriptionTask(taskType)) { + return new MessageSchemaDescriptionWorkflow(threadContext); + } else if (EntityUtil.isTagTask(taskType)) { + return new MessageSchemaTagWorkflow(threadContext); + } else { + throw new IllegalArgumentException(String.format("Invalid task type %s", taskType)); } - Topic topic = getByName(null, entityLink.getEntityFQN(), getFields("tags"), ALL, false); - Field schemaField = null; - for (Field field : topic.getMessageSchema().getSchemaFields()) { - if (field.getName().equals(schemaName)) { - schemaField = field; - break; - } - } - if (!"".equals(childrenSchemaName) && schemaField != null) { - schemaField = getchildrenSchemaField(schemaField.getChildren(), childrenSchemaName); - } - if (schemaField == null) { - throw new IllegalArgumentException( - CatalogExceptionMessage.invalidFieldName("schema", entityLink.getArrayFieldName())); - } - - String origJson = JsonUtils.pojoToJson(topic); - if (EntityUtil.isDescriptionTask(task.getType())) { - schemaField.setDescription(newValue); - } else if (EntityUtil.isTagTask(task.getType())) { - List tags = JsonUtils.readObjects(newValue, TagLabel.class); - schemaField.setTags(tags); - } - String updatedEntityJson = JsonUtils.pojoToJson(topic); - JsonPatch patch = JsonUtils.getJsonPatch(origJson, updatedEntityJson); - patch(null, topic.getId(), user, patch); - return; } - super.update(task, entityLink, newValue, user); + return super.getTaskWorkflow(threadContext); } - private static Field getchildrenSchemaField(List fields, String childrenSchemaName) { + static class MessageSchemaDescriptionWorkflow extends DescriptionTaskWorkflow { + private final Field schemaField; + + MessageSchemaDescriptionWorkflow(ThreadContext threadContext) { + super(threadContext); + schemaField = + getSchemaField((Topic) threadContext.getAboutEntity(), threadContext.getAbout().getArrayFieldName()); + } + + @Override + public EntityInterface performTask(String user, ResolveTask resolveTask) { + schemaField.setDescription(resolveTask.getNewValue()); + return threadContext.getAboutEntity(); + } + } + + static class MessageSchemaTagWorkflow extends TagTaskWorkflow { + private final Field schemaField; + + MessageSchemaTagWorkflow(ThreadContext threadContext) { + super(threadContext); + schemaField = + getSchemaField((Topic) threadContext.getAboutEntity(), threadContext.getAbout().getArrayFieldName()); + } + + @Override + public EntityInterface performTask(String user, ResolveTask resolveTask) { + List tags = JsonUtils.readObjects(resolveTask.getNewValue(), TagLabel.class); + schemaField.setTags(tags); + return threadContext.getAboutEntity(); + } + } + + private static Field getSchemaField(Topic topic, String schemaName) { + String childrenSchemaName = ""; + if (schemaName.contains(".")) { + String fieldNameWithoutQuotes = schemaName.substring(1, schemaName.length() - 1); + schemaName = fieldNameWithoutQuotes.substring(0, fieldNameWithoutQuotes.indexOf(".")); + childrenSchemaName = fieldNameWithoutQuotes.substring(fieldNameWithoutQuotes.lastIndexOf(".") + 1); + } + Field schemaField = null; + for (Field field : topic.getMessageSchema().getSchemaFields()) { + if (field.getName().equals(schemaName)) { + schemaField = field; + break; + } + } + if (!"".equals(childrenSchemaName) && schemaField != null) { + schemaField = getChildSchemaField(schemaField.getChildren(), childrenSchemaName); + } + if (schemaField == null) { + throw new IllegalArgumentException(CatalogExceptionMessage.invalidFieldName("schema", schemaName)); + } + return schemaField; + } + + private static Field getChildSchemaField(List fields, String childrenSchemaName) { Field childrenSchemaField = null; for (Field field : fields) { if (field.getName().equals(childrenSchemaName)) { @@ -320,7 +352,7 @@ public class TopicRepository extends EntityRepository { if (childrenSchemaField == null) { for (Field field : fields) { if (field.getChildren() != null) { - childrenSchemaField = getchildrenSchemaField(field.getChildren(), childrenSchemaName); + childrenSchemaField = getChildSchemaField(field.getChildren(), childrenSchemaName); if (childrenSchemaField != null) { break; } @@ -449,7 +481,7 @@ public class TopicRepository extends EntityRepository { updatedField.setDescription(origField.getDescription()); return; } - String field = getSchemaField(original, origField, FIELD_DESCRIPTION); + String field = EntityUtil.getSchemaField(original, origField, FIELD_DESCRIPTION); recordChange(field, origField.getDescription(), updatedField.getDescription()); } @@ -459,7 +491,7 @@ public class TopicRepository extends EntityRepository { updatedField.setDisplayName(origField.getDisplayName()); return; } - String field = getSchemaField(original, origField, FIELD_DISPLAY_NAME); + String field = EntityUtil.getSchemaField(original, origField, FIELD_DISPLAY_NAME); recordChange(field, origField.getDisplayName(), updatedField.getDisplayName()); } @@ -469,7 +501,7 @@ public class TopicRepository extends EntityRepository { updatedField.setDataTypeDisplay(origField.getDataTypeDisplay()); return; } - String field = getSchemaField(original, origField, FIELD_DATA_TYPE_DISPLAY); + String field = EntityUtil.getSchemaField(original, origField, FIELD_DATA_TYPE_DISPLAY); recordChange(field, origField.getDataTypeDisplay(), updatedField.getDataTypeDisplay()); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/EntityUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/EntityUtil.java index 933c4828515..70cb2a60cc5 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/util/EntityUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/EntityUtil.java @@ -536,4 +536,12 @@ public final class EntityUtil { public static boolean isTagTask(TaskType taskType) { return taskType == TaskType.RequestTag || taskType == TaskType.UpdateTag; } + + public static Column findColumn(List columns, String columnName) { + return columns.stream() + .filter(c -> c.getName().equals(columnName)) + .findFirst() + .orElseThrow( + () -> new IllegalArgumentException(CatalogExceptionMessage.invalidFieldName("column", columnName))); + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/FeedUtils.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/FeedUtils.java index 1da44e037ca..c71f8050c23 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/util/FeedUtils.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/FeedUtils.java @@ -16,7 +16,6 @@ package org.openmetadata.service.util; import static org.openmetadata.service.events.subscription.AlertsRuleEvaluator.getEntity; import static org.openmetadata.service.formatter.util.FormatterUtil.getFormattedMessages; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -34,7 +33,7 @@ import org.openmetadata.service.resources.feeds.MessageParser; public final class FeedUtils { private FeedUtils() {} - public static List getThreads(ChangeEvent changeEvent, String loggedInUserName) throws IOException { + public static List getThreads(ChangeEvent changeEvent, String loggedInUserName) { if (changeEvent == null || changeEvent.getEntity() == null) { return Collections.emptyList(); // Response has no entity to produce change event from } diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/feeds/FeedResourceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/feeds/FeedResourceTest.java index 28ae4f2e84b..64d5675db41 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/feeds/FeedResourceTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/feeds/FeedResourceTest.java @@ -33,6 +33,7 @@ import static org.openmetadata.service.exception.CatalogExceptionMessage.ANNOUNC import static org.openmetadata.service.exception.CatalogExceptionMessage.entityNotFound; import static org.openmetadata.service.exception.CatalogExceptionMessage.permissionNotAllowed; import static org.openmetadata.service.resources.EntityResourceTest.C1; +import static org.openmetadata.service.resources.EntityResourceTest.USER1; import static org.openmetadata.service.resources.EntityResourceTest.USER_ADDRESS_TAG_LABEL; import static org.openmetadata.service.security.SecurityUtil.authHeaders; import static org.openmetadata.service.security.SecurityUtil.getPrincipalName; @@ -392,7 +393,6 @@ public class FeedResourceTest extends OpenMetadataApplicationTest { int totalAnnouncementCount = listAnnouncements(null, null, null, ADMIN_AUTH_HEADERS).getPaging().getTotal(); // create two announcements with start time in the future - LocalDateTime now = LocalDateTime.now(); String about = String.format("<#E::%s::%s>", Entity.TABLE, TABLE.getFullyQualifiedName()); // Create announcement 1 @@ -445,7 +445,6 @@ public class FeedResourceTest extends OpenMetadataApplicationTest { @Test void post_invalidAnnouncement_400() throws IOException { // create two announcements with same start time in the future - LocalDateTime now = LocalDateTime.now(); String about = String.format("<#E::%s::%s>", Entity.TABLE, TABLE.getFullyQualifiedName()); AnnouncementDetails announcementDetails = getAnnouncementDetails("1", 3, 5); createAnnouncement(USER.getName(), about, "Announcement One", announcementDetails, USER_AUTH_HEADERS); @@ -962,41 +961,38 @@ public class FeedResourceTest extends OpenMetadataApplicationTest { @Test void list_threadsWithOwnerFilter() throws HttpResponseException { - // THREAD is created with TABLE entity in BeforeAll int totalThreadCount = listThreads(null, null, ADMIN_AUTH_HEADERS).getPaging().getTotal(); - String ownerId = TABLE.getOwner().getId().toString(); - assertNotNull(ownerId); - int user1ThreadCount = - listThreadsWithFilter(ownerId, FilterType.OWNER.toString(), USER_AUTH_HEADERS).getPaging().getTotal(); - int user2ThreadCount = - listThreadsWithFilter(USER2.getId().toString(), FilterType.OWNER.toString(), USER_AUTH_HEADERS) - .getPaging() - .getTotal(); + String user1 = USER1.getId().toString(); // user1 is the owner of TABLE + String user2 = USER2.getId().toString(); // user2 belongs to team2 which owns TABLE2 + assertNotNull(user1); + // Get thread counts for user1 and user2 + int user1ThreadCount = listThreadsWithFilter(user1, FilterType.OWNER, USER_AUTH_HEADERS).getPaging().getTotal(); + int user2ThreadCount = listThreadsWithFilter(user2, FilterType.OWNER, USER_AUTH_HEADERS).getPaging().getTotal(); - // create another thread on an entity with a different owner - String ownerId2 = TABLE2.getOwner().getId().toString(); - assertNotNull(ownerId2); + // create another thread on an entity with team2 as owner + String team2 = TABLE2.getOwner().getId().toString(); + assertNotEquals(user1, team2); createAndCheck( create().withAbout(String.format("<#E::table::%s>", TABLE2.getFullyQualifiedName())).withFrom(ADMIN_USER_NAME), ADMIN_AUTH_HEADERS); - assertNotEquals(ownerId, ownerId2); - ThreadList threads = listThreadsWithFilter(ownerId, FilterType.OWNER.toString(), USER_AUTH_HEADERS); + // user1 thread count remains the same as the newly created thread belongs to team2 and user1 is not part of it + ThreadList threads = listThreadsWithFilter(user1, FilterType.OWNER, USER_AUTH_HEADERS); assertEquals(user1ThreadCount, threads.getPaging().getTotal()); // This should return error since the table is owned by a team // and for the filter we are passing team id instead of user id assertResponse( - () -> listThreadsWithFilter(ownerId2, FilterType.OWNER.toString(), USER_AUTH_HEADERS), + () -> listThreadsWithFilter(team2, FilterType.OWNER, USER_AUTH_HEADERS), NOT_FOUND, - entityNotFound(Entity.USER, ownerId2)); + entityNotFound(Entity.USER, team2)); - // Now, test the filter with user who is part of the team - threads = listThreadsWithFilter(USER2.getId().toString(), FilterType.OWNER.toString(), USER_AUTH_HEADERS); + // Now, test the filter with user2 who is part of the team2 + threads = listThreadsWithFilter(user2, FilterType.OWNER, USER_AUTH_HEADERS); assertEquals(user2ThreadCount + 1, threads.getPaging().getTotal()); // Test if no user id filter returns all threads - threads = listThreadsWithFilter(null, FilterType.OWNER.toString(), USER_AUTH_HEADERS); + threads = listThreadsWithFilter(null, FilterType.OWNER, USER_AUTH_HEADERS); assertEquals(totalThreadCount + 1, threads.getPaging().getTotal()); } @@ -1019,8 +1015,7 @@ public class FeedResourceTest extends OpenMetadataApplicationTest { ADMIN_AUTH_HEADERS); addPostAndCheck(thread, createPost, ADMIN_AUTH_HEADERS); - ThreadList threads = - listThreadsWithFilter(USER.getId().toString(), FilterType.MENTIONS.toString(), USER_AUTH_HEADERS); + ThreadList threads = listThreadsWithFilter(USER.getId().toString(), FilterType.MENTIONS, USER_AUTH_HEADERS); assertEquals(2, threads.getPaging().getTotal()); } @@ -1046,11 +1041,10 @@ public class FeedResourceTest extends OpenMetadataApplicationTest { .until( () -> { ThreadList threads = - listThreadsWithFilter(USER.getId().toString(), FilterType.FOLLOWS.toString(), USER_AUTH_HEADERS); + listThreadsWithFilter(USER.getId().toString(), FilterType.FOLLOWS, USER_AUTH_HEADERS); return threads.getPaging().getTotal().equals(initialThreadCount + 3); }); - ThreadList threads = - listThreadsWithFilter(USER.getId().toString(), FilterType.FOLLOWS.toString(), USER_AUTH_HEADERS); + ThreadList threads = listThreadsWithFilter(USER.getId().toString(), FilterType.FOLLOWS, USER_AUTH_HEADERS); assertEquals(initialThreadCount + 3, threads.getPaging().getTotal()); assertEquals(initialThreadCount + 3, threads.getData().size()); assertEquals( @@ -1059,7 +1053,7 @@ public class FeedResourceTest extends OpenMetadataApplicationTest { assertEquals("Message 2", threads.getData().get(1).getMessage()); // Filter by follows for another user should return 0 threads - threads = listThreadsWithFilter(USER2.getId().toString(), FilterType.FOLLOWS.toString(), USER_AUTH_HEADERS); + threads = listThreadsWithFilter(USER2.getId().toString(), FilterType.FOLLOWS, USER_AUTH_HEADERS); assertEquals(0, threads.getPaging().getTotal()); assertEquals(0, threads.getData().size()); } @@ -1386,6 +1380,11 @@ public class FeedResourceTest extends OpenMetadataApplicationTest { TestUtils.put(target, userId, OK, authHeaders); } + public ThreadList listThreadsWithFilter(String userId, FilterType filterType, Map authHeaders) + throws HttpResponseException { + return listThreadsWithFilter(userId, filterType.toString(), authHeaders); + } + public ThreadList listThreadsWithFilter(String userId, String filterType, Map authHeaders) throws HttpResponseException { WebTarget target = getResource("feed");