mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-09-25 17:04:54 +00:00
Cleanup task resolution and closing with task workflows (#13097)
This commit is contained in:
parent
d8e78a32d8
commit
cd2c61ad5e
@ -86,7 +86,7 @@ public class ChangeEventHandler implements EventHandler {
|
||||
for (Thread thread : listOrEmpty(FeedUtils.getThreads(changeEvent, loggedInUserName))) {
|
||||
// Don't create a thread if there is no message
|
||||
if (thread.getMessage() != null && !thread.getMessage().isEmpty()) {
|
||||
feedDao.create(thread, responseContext);
|
||||
feedDao.create(thread);
|
||||
String jsonThread = mapper.writeValueAsString(thread);
|
||||
WebSocketManager.getInstance()
|
||||
.broadCastMessageToAll(WebSocketManager.FEED_BROADCAST_CHANNEL, jsonThread);
|
||||
|
@ -4,6 +4,7 @@ import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
|
||||
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
|
||||
import static org.openmetadata.schema.type.Include.ALL;
|
||||
import static org.openmetadata.service.Entity.CONTAINER;
|
||||
import static org.openmetadata.service.Entity.DASHBOARD_DATA_MODEL;
|
||||
import static org.openmetadata.service.Entity.FIELD_PARENT;
|
||||
import static org.openmetadata.service.Entity.FIELD_TAGS;
|
||||
import static org.openmetadata.service.Entity.STORAGE_SERVICE;
|
||||
@ -11,9 +12,10 @@ import static org.openmetadata.service.Entity.STORAGE_SERVICE;
|
||||
import com.google.common.collect.Lists;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import javax.json.JsonPatch;
|
||||
import org.openmetadata.schema.EntityInterface;
|
||||
import org.openmetadata.schema.api.feed.ResolveTask;
|
||||
import org.openmetadata.schema.entity.data.Container;
|
||||
import org.openmetadata.schema.entity.data.DashboardDataModel;
|
||||
import org.openmetadata.schema.entity.services.StorageService;
|
||||
import org.openmetadata.schema.type.Column;
|
||||
import org.openmetadata.schema.type.ContainerFileFormat;
|
||||
@ -21,10 +23,11 @@ import org.openmetadata.schema.type.EntityReference;
|
||||
import org.openmetadata.schema.type.Include;
|
||||
import org.openmetadata.schema.type.Relationship;
|
||||
import org.openmetadata.schema.type.TagLabel;
|
||||
import org.openmetadata.schema.type.TaskDetails;
|
||||
import org.openmetadata.schema.type.TaskType;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.exception.CatalogExceptionMessage;
|
||||
import org.openmetadata.service.resources.feeds.MessageParser;
|
||||
import org.openmetadata.service.jdbi3.FeedRepository.TaskWorkflow;
|
||||
import org.openmetadata.service.jdbi3.FeedRepository.ThreadContext;
|
||||
import org.openmetadata.service.resources.feeds.MessageParser.EntityLink;
|
||||
import org.openmetadata.service.resources.storages.ContainerResource;
|
||||
import org.openmetadata.service.util.EntityUtil;
|
||||
import org.openmetadata.service.util.FullyQualifiedName;
|
||||
@ -202,32 +205,57 @@ public class ContainerRepository extends EntityRepository<Container> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void update(TaskDetails task, MessageParser.EntityLink entityLink, String newValue, String user) {
|
||||
// TODO move this as the first check
|
||||
public TaskWorkflow getTaskWorkflow(ThreadContext threadContext) {
|
||||
validateTaskThread(threadContext);
|
||||
EntityLink entityLink = threadContext.getAbout();
|
||||
if (entityLink.getFieldName().equals("dataModel")) {
|
||||
Container container = getByName(null, entityLink.getEntityFQN(), getFields("dataModel,tags"), Include.ALL, false);
|
||||
Column column =
|
||||
container.getDataModel().getColumns().stream()
|
||||
.filter(c -> c.getName().equals(entityLink.getArrayFieldName()))
|
||||
.findFirst()
|
||||
.orElseThrow(
|
||||
() ->
|
||||
new IllegalArgumentException(
|
||||
CatalogExceptionMessage.invalidFieldName("column", entityLink.getArrayFieldName())));
|
||||
|
||||
String origJson = JsonUtils.pojoToJson(container);
|
||||
if (EntityUtil.isDescriptionTask(task.getType())) {
|
||||
column.setDescription(newValue);
|
||||
} else if (EntityUtil.isTagTask(task.getType())) {
|
||||
List<TagLabel> tags = JsonUtils.readObjects(newValue, TagLabel.class);
|
||||
column.setTags(tags);
|
||||
TaskType taskType = threadContext.getThread().getTask().getType();
|
||||
if (EntityUtil.isDescriptionTask(taskType)) {
|
||||
return new DataModelDescriptionTaskWorkflow(threadContext);
|
||||
} else if (EntityUtil.isTagTask(taskType)) {
|
||||
return new DataModelTagTaskWorkflow(threadContext);
|
||||
} else {
|
||||
throw new IllegalArgumentException(String.format("Invalid task type %s", taskType));
|
||||
}
|
||||
String updatedEntityJson = JsonUtils.pojoToJson(container);
|
||||
JsonPatch patch = JsonUtils.getJsonPatch(origJson, updatedEntityJson);
|
||||
patch(null, container.getId(), user, patch);
|
||||
return;
|
||||
}
|
||||
super.update(task, entityLink, newValue, user);
|
||||
return super.getTaskWorkflow(threadContext);
|
||||
}
|
||||
|
||||
static class DataModelDescriptionTaskWorkflow extends DescriptionTaskWorkflow {
|
||||
private final Column column;
|
||||
|
||||
DataModelDescriptionTaskWorkflow(ThreadContext threadContext) {
|
||||
super(threadContext);
|
||||
DashboardDataModel dataModel =
|
||||
Entity.getEntity(DASHBOARD_DATA_MODEL, threadContext.getAboutEntity().getId(), "dataModel", ALL);
|
||||
threadContext.setAboutEntity(dataModel);
|
||||
column = EntityUtil.findColumn(dataModel.getColumns(), getAbout().getArrayFieldName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public EntityInterface performTask(String user, ResolveTask resolveTask) {
|
||||
column.setDescription(resolveTask.getNewValue());
|
||||
return threadContext.getAboutEntity();
|
||||
}
|
||||
}
|
||||
|
||||
static class DataModelTagTaskWorkflow extends TagTaskWorkflow {
|
||||
private final Column column;
|
||||
|
||||
DataModelTagTaskWorkflow(ThreadContext threadContext) {
|
||||
super(threadContext);
|
||||
DashboardDataModel dataModel =
|
||||
Entity.getEntity(DASHBOARD_DATA_MODEL, threadContext.getAboutEntity().getId(), "dataModel,tags", ALL);
|
||||
threadContext.setAboutEntity(dataModel);
|
||||
column = EntityUtil.findColumn(dataModel.getColumns(), getAbout().getArrayFieldName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public EntityInterface performTask(String user, ResolveTask resolveTask) {
|
||||
List<TagLabel> tags = JsonUtils.readObjects(resolveTask.getNewValue(), TagLabel.class);
|
||||
column.setTags(tags);
|
||||
return threadContext.getAboutEntity();
|
||||
}
|
||||
}
|
||||
|
||||
private void addDerivedColumnTags(List<Column> columns) {
|
||||
|
@ -15,12 +15,14 @@ package org.openmetadata.service.jdbi3;
|
||||
|
||||
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
|
||||
import static org.openmetadata.schema.type.Include.ALL;
|
||||
import static org.openmetadata.service.Entity.DASHBOARD_DATA_MODEL;
|
||||
import static org.openmetadata.service.Entity.FIELD_TAGS;
|
||||
|
||||
import java.util.List;
|
||||
import javax.json.JsonPatch;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.openmetadata.schema.EntityInterface;
|
||||
import org.openmetadata.schema.api.feed.ResolveTask;
|
||||
import org.openmetadata.schema.entity.data.DashboardDataModel;
|
||||
import org.openmetadata.schema.entity.services.DashboardService;
|
||||
import org.openmetadata.schema.type.Column;
|
||||
@ -28,9 +30,10 @@ import org.openmetadata.schema.type.EntityReference;
|
||||
import org.openmetadata.schema.type.Include;
|
||||
import org.openmetadata.schema.type.Relationship;
|
||||
import org.openmetadata.schema.type.TagLabel;
|
||||
import org.openmetadata.schema.type.TaskDetails;
|
||||
import org.openmetadata.schema.type.TaskType;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.exception.CatalogExceptionMessage;
|
||||
import org.openmetadata.service.jdbi3.FeedRepository.TaskWorkflow;
|
||||
import org.openmetadata.service.jdbi3.FeedRepository.ThreadContext;
|
||||
import org.openmetadata.service.resources.databases.DatabaseUtil;
|
||||
import org.openmetadata.service.resources.datamodels.DashboardDataModelResource;
|
||||
import org.openmetadata.service.resources.feeds.MessageParser.EntityLink;
|
||||
@ -60,31 +63,57 @@ public class DashboardDataModelRepository extends EntityRepository<DashboardData
|
||||
}
|
||||
|
||||
@Override
|
||||
public void update(TaskDetails task, EntityLink entityLink, String newValue, String user) {
|
||||
public TaskWorkflow getTaskWorkflow(ThreadContext threadContext) {
|
||||
validateTaskThread(threadContext);
|
||||
EntityLink entityLink = threadContext.getAbout();
|
||||
if (entityLink.getFieldName().equals("columns")) {
|
||||
DashboardDataModel dashboardDataModel =
|
||||
getByName(null, entityLink.getEntityFQN(), getFields("columns,tags"), Include.ALL, false);
|
||||
String origJson = JsonUtils.pojoToJson(dashboardDataModel);
|
||||
Column column =
|
||||
dashboardDataModel.getColumns().stream()
|
||||
.filter(c -> c.getName().equals(entityLink.getArrayFieldName()))
|
||||
.findFirst()
|
||||
.orElseThrow(
|
||||
() ->
|
||||
new IllegalArgumentException(
|
||||
CatalogExceptionMessage.invalidFieldName("column", entityLink.getArrayFieldName())));
|
||||
if (EntityUtil.isDescriptionTask(task.getType())) {
|
||||
column.setDescription(newValue);
|
||||
} else if (EntityUtil.isTagTask(task.getType())) {
|
||||
List<TagLabel> tags = JsonUtils.readObjects(newValue, TagLabel.class);
|
||||
column.setTags(tags);
|
||||
TaskType taskType = threadContext.getThread().getTask().getType();
|
||||
if (EntityUtil.isDescriptionTask(taskType)) {
|
||||
return new ColumnDescriptionTaskWorkflow(threadContext);
|
||||
} else if (EntityUtil.isTagTask(taskType)) {
|
||||
return new ColumnTagTaskWorkflow(threadContext);
|
||||
} else {
|
||||
throw new IllegalArgumentException(String.format("Invalid task type %s", taskType));
|
||||
}
|
||||
String updatedEntityJson = JsonUtils.pojoToJson(dashboardDataModel);
|
||||
JsonPatch patch = JsonUtils.getJsonPatch(origJson, updatedEntityJson);
|
||||
patch(null, dashboardDataModel.getId(), user, patch);
|
||||
return;
|
||||
}
|
||||
super.update(task, entityLink, newValue, user);
|
||||
return super.getTaskWorkflow(threadContext);
|
||||
}
|
||||
|
||||
static class ColumnDescriptionTaskWorkflow extends DescriptionTaskWorkflow {
|
||||
private final Column column;
|
||||
|
||||
ColumnDescriptionTaskWorkflow(ThreadContext threadContext) {
|
||||
super(threadContext);
|
||||
DashboardDataModel dataModel =
|
||||
Entity.getEntity(DASHBOARD_DATA_MODEL, threadContext.getAboutEntity().getId(), "columns", ALL);
|
||||
threadContext.setAboutEntity(dataModel);
|
||||
column = EntityUtil.findColumn(dataModel.getColumns(), threadContext.getAbout().getArrayFieldName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public EntityInterface performTask(String user, ResolveTask resolveTask) {
|
||||
column.setDescription(resolveTask.getNewValue());
|
||||
return threadContext.getAboutEntity();
|
||||
}
|
||||
}
|
||||
|
||||
static class ColumnTagTaskWorkflow extends TagTaskWorkflow {
|
||||
private final Column column;
|
||||
|
||||
ColumnTagTaskWorkflow(ThreadContext threadContext) {
|
||||
super(threadContext);
|
||||
DashboardDataModel dataModel =
|
||||
Entity.getEntity(DASHBOARD_DATA_MODEL, threadContext.getAboutEntity().getId(), "columns,tags", ALL);
|
||||
threadContext.setAboutEntity(dataModel);
|
||||
column = EntityUtil.findColumn(dataModel.getColumns(), threadContext.getAbout().getArrayFieldName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public EntityInterface performTask(String user, ResolveTask resolveTask) {
|
||||
List<TagLabel> tags = JsonUtils.readObjects(resolveTask.getNewValue(), TagLabel.class);
|
||||
column.setTags(tags);
|
||||
return threadContext.getAboutEntity();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -15,18 +15,20 @@ package org.openmetadata.service.jdbi3;
|
||||
|
||||
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
|
||||
import static org.openmetadata.schema.type.Include.ALL;
|
||||
import static org.openmetadata.service.Entity.DASHBOARD;
|
||||
import static org.openmetadata.service.Entity.FIELD_DESCRIPTION;
|
||||
import static org.openmetadata.service.Entity.FIELD_TAGS;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import org.openmetadata.schema.EntityInterface;
|
||||
import org.openmetadata.schema.entity.data.Chart;
|
||||
import org.openmetadata.schema.entity.data.Dashboard;
|
||||
import org.openmetadata.schema.entity.services.DashboardService;
|
||||
import org.openmetadata.schema.type.*;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.exception.CatalogExceptionMessage;
|
||||
import org.openmetadata.service.jdbi3.FeedRepository.TaskWorkflow;
|
||||
import org.openmetadata.service.jdbi3.FeedRepository.ThreadContext;
|
||||
import org.openmetadata.service.resources.dashboards.DashboardResource;
|
||||
import org.openmetadata.service.resources.feeds.MessageParser.EntityLink;
|
||||
import org.openmetadata.service.util.EntityUtil;
|
||||
@ -56,27 +58,34 @@ public class DashboardRepository extends EntityRepository<Dashboard> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void update(TaskDetails task, EntityLink entityLink, String newValue, String user) {
|
||||
public TaskWorkflow getTaskWorkflow(ThreadContext threadContext) {
|
||||
EntityLink entityLink = threadContext.getAbout();
|
||||
if (entityLink.getFieldName().equals("charts")) {
|
||||
Dashboard dashboard = getByName(null, entityLink.getEntityFQN(), getFields("charts,tags"), Include.ALL, false);
|
||||
EntityReference chart =
|
||||
TaskType taskType = threadContext.getThread().getTask().getType();
|
||||
if (!entityLink.getFieldName().equals(FIELD_DESCRIPTION)) {
|
||||
// Only description field can be updated
|
||||
throw new IllegalArgumentException(
|
||||
CatalogExceptionMessage.invalidFieldForTask(entityLink.getFieldName(), taskType));
|
||||
}
|
||||
return new ChartDescriptionTaskWorkflow(threadContext);
|
||||
}
|
||||
return super.getTaskWorkflow(threadContext);
|
||||
}
|
||||
|
||||
static class ChartDescriptionTaskWorkflow extends DescriptionTaskWorkflow {
|
||||
ChartDescriptionTaskWorkflow(ThreadContext threadContext) {
|
||||
super(threadContext);
|
||||
Dashboard dashboard = Entity.getEntity(DASHBOARD, threadContext.getAboutEntity().getId(), "charts", ALL);
|
||||
String chartName = threadContext.getAbout().getArrayFieldName();
|
||||
EntityReference chartReference =
|
||||
dashboard.getCharts().stream()
|
||||
.filter(c -> c.getName().equals(entityLink.getArrayFieldName()))
|
||||
.filter(c -> c.getName().equals(chartName))
|
||||
.findFirst()
|
||||
.orElseThrow(
|
||||
() ->
|
||||
new IllegalArgumentException(
|
||||
CatalogExceptionMessage.invalidFieldName("chart", entityLink.getArrayFieldName())));
|
||||
String fieldName =
|
||||
EntityUtil.isDescriptionTask(task.getType())
|
||||
? FIELD_DESCRIPTION
|
||||
: EntityUtil.isTagTask(task.getType()) ? FIELD_TAGS : "invalidField";
|
||||
EntityLink chartLink = new EntityLink(Entity.CHART, chart.getFullyQualifiedName(), fieldName, null, null);
|
||||
EntityRepository<? extends EntityInterface> chartRepository = Entity.getEntityRepository(Entity.CHART);
|
||||
chartRepository.update(task, chartLink, newValue, user);
|
||||
return;
|
||||
() -> new IllegalArgumentException(CatalogExceptionMessage.invalidFieldName("chart", chartName)));
|
||||
Chart chart = Entity.getEntity(chartReference, "", ALL);
|
||||
threadContext.setAboutEntity(chart);
|
||||
}
|
||||
super.update(task, entityLink, newValue, user);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -81,6 +81,7 @@ import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.annotation.CheckForNull;
|
||||
import javax.json.JsonPatch;
|
||||
import javax.validation.constraints.NotNull;
|
||||
import javax.ws.rs.core.Response.Status;
|
||||
import javax.ws.rs.core.UriInfo;
|
||||
import lombok.Getter;
|
||||
@ -91,6 +92,7 @@ import org.jdbi.v3.sqlobject.transaction.Transaction;
|
||||
import org.openmetadata.common.utils.CommonUtil;
|
||||
import org.openmetadata.schema.EntityInterface;
|
||||
import org.openmetadata.schema.api.VoteRequest;
|
||||
import org.openmetadata.schema.api.feed.ResolveTask;
|
||||
import org.openmetadata.schema.api.teams.CreateTeam;
|
||||
import org.openmetadata.schema.entity.classification.Tag;
|
||||
import org.openmetadata.schema.entity.data.GlossaryTerm;
|
||||
@ -109,8 +111,8 @@ import org.openmetadata.schema.type.ProviderType;
|
||||
import org.openmetadata.schema.type.Relationship;
|
||||
import org.openmetadata.schema.type.TagLabel;
|
||||
import org.openmetadata.schema.type.TagLabel.TagSource;
|
||||
import org.openmetadata.schema.type.TaskDetails;
|
||||
import org.openmetadata.schema.type.TaskType;
|
||||
import org.openmetadata.schema.type.ThreadType;
|
||||
import org.openmetadata.schema.type.Votes;
|
||||
import org.openmetadata.schema.type.csv.CsvImportResult;
|
||||
import org.openmetadata.schema.utils.EntityInterfaceUtil;
|
||||
@ -123,7 +125,8 @@ import org.openmetadata.service.exception.UnhandledServerException;
|
||||
import org.openmetadata.service.jdbi3.CollectionDAO.EntityRelationshipRecord;
|
||||
import org.openmetadata.service.jdbi3.CollectionDAO.EntityVersionPair;
|
||||
import org.openmetadata.service.jdbi3.CollectionDAO.ExtensionRecord;
|
||||
import org.openmetadata.service.resources.feeds.MessageParser.EntityLink;
|
||||
import org.openmetadata.service.jdbi3.FeedRepository.TaskWorkflow;
|
||||
import org.openmetadata.service.jdbi3.FeedRepository.ThreadContext;
|
||||
import org.openmetadata.service.resources.tags.TagLabelUtil;
|
||||
import org.openmetadata.service.util.EntityUtil;
|
||||
import org.openmetadata.service.util.EntityUtil.Fields;
|
||||
@ -330,26 +333,6 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
||||
entity.setFullyQualifiedName(quoteName(entity.getName()));
|
||||
}
|
||||
|
||||
/** Update an entity based suggested description and tags in the task */
|
||||
public void update(TaskDetails task, EntityLink entityLink, String newValue, String user) {
|
||||
TaskType taskType = task.getType();
|
||||
T entity = getByName(null, entityLink.getEntityFQN(), getFields("tags"), Include.ALL, false);
|
||||
String origJson = JsonUtils.pojoToJson(entity);
|
||||
if (EntityUtil.isDescriptionTask(taskType) && entityLink.getFieldName().equals(FIELD_DESCRIPTION)) {
|
||||
entity.setDescription(newValue);
|
||||
} else if (supportsTags && EntityUtil.isTagTask(taskType) && entityLink.getFieldName().equals("tags")) {
|
||||
List<TagLabel> tags = JsonUtils.readObjects(newValue, TagLabel.class);
|
||||
entity.setTags(tags);
|
||||
} else {
|
||||
// Not supported
|
||||
throw new IllegalArgumentException(
|
||||
CatalogExceptionMessage.invalidFieldForTask(entityLink.getFieldName(), task.getType()));
|
||||
}
|
||||
String updatedEntityJson = JsonUtils.pojoToJson(entity);
|
||||
JsonPatch patch = JsonUtils.getJsonPatch(origJson, updatedEntityJson);
|
||||
patch(null, entity.getId(), user, patch);
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize data from json files if seed data does not exist in corresponding tables. Seed data is stored under
|
||||
* openmetadata-service/src/main/resources/json/data/{entityType}
|
||||
@ -1683,6 +1666,25 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
||||
return entity.getTags();
|
||||
}
|
||||
|
||||
public TaskWorkflow getTaskWorkflow(ThreadContext threadContext) {
|
||||
validateTaskThread(threadContext);
|
||||
TaskType taskType = threadContext.getThread().getTask().getType();
|
||||
if (EntityUtil.isDescriptionTask(taskType)) {
|
||||
return new DescriptionTaskWorkflow(threadContext);
|
||||
} else if (EntityUtil.isTagTask(taskType)) {
|
||||
return new TagTaskWorkflow(threadContext);
|
||||
} else {
|
||||
throw new IllegalArgumentException(String.format("Invalid task type %s", taskType));
|
||||
}
|
||||
}
|
||||
|
||||
public void validateTaskThread(ThreadContext threadContext) {
|
||||
ThreadType threadType = threadContext.getThread().getType();
|
||||
if (threadType != ThreadType.Task) {
|
||||
throw new IllegalArgumentException(String.format("Thread type %s is not task related", threadType));
|
||||
}
|
||||
}
|
||||
|
||||
public enum Operation {
|
||||
PUT,
|
||||
PATCH,
|
||||
@ -2321,11 +2323,38 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
||||
|
||||
static class EntityLoaderWithId extends CacheLoader<Pair<String, UUID>, EntityInterface> {
|
||||
@Override
|
||||
public EntityInterface load(@CheckForNull Pair<String, UUID> idPair) throws IOException {
|
||||
public EntityInterface load(@NotNull Pair<String, UUID> idPair) throws IOException {
|
||||
String entityType = idPair.getLeft();
|
||||
UUID id = idPair.getRight();
|
||||
EntityRepository<? extends EntityInterface> repository = Entity.getEntityRepository(entityType);
|
||||
return repository.getDao().findEntityById(id, ALL);
|
||||
}
|
||||
}
|
||||
|
||||
public static class DescriptionTaskWorkflow extends TaskWorkflow {
|
||||
DescriptionTaskWorkflow(ThreadContext threadContext) {
|
||||
super(threadContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
public EntityInterface performTask(String user, ResolveTask resolveTask) {
|
||||
EntityInterface aboutEntity = threadContext.getAboutEntity();
|
||||
aboutEntity.setDescription(resolveTask.getNewValue());
|
||||
return aboutEntity;
|
||||
}
|
||||
}
|
||||
|
||||
public static class TagTaskWorkflow extends TaskWorkflow {
|
||||
TagTaskWorkflow(ThreadContext threadContext) {
|
||||
super(threadContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
public EntityInterface performTask(String user, ResolveTask resolveTask) {
|
||||
List<TagLabel> tags = JsonUtils.readObjects(resolveTask.getNewValue(), TagLabel.class);
|
||||
EntityInterface aboutEntity = threadContext.getAboutEntity();
|
||||
aboutEntity.setTags(tags);
|
||||
return aboutEntity;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -22,7 +22,6 @@ import static org.openmetadata.schema.type.Relationship.CREATED;
|
||||
import static org.openmetadata.schema.type.Relationship.IS_ABOUT;
|
||||
import static org.openmetadata.schema.type.Relationship.REPLIED_TO;
|
||||
import static org.openmetadata.service.Entity.USER;
|
||||
import static org.openmetadata.service.Entity.getEntityRepository;
|
||||
import static org.openmetadata.service.exception.CatalogExceptionMessage.ANNOUNCEMENT_INVALID_START_TIME;
|
||||
import static org.openmetadata.service.exception.CatalogExceptionMessage.ANNOUNCEMENT_OVERLAP;
|
||||
import static org.openmetadata.service.exception.CatalogExceptionMessage.entityNotFound;
|
||||
@ -42,11 +41,11 @@ import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.json.JsonPatch;
|
||||
import javax.ws.rs.container.ContainerResponseContext;
|
||||
import javax.ws.rs.core.Response.Status;
|
||||
import javax.ws.rs.core.SecurityContext;
|
||||
import javax.ws.rs.core.UriInfo;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.jdbi.v3.sqlobject.transaction.Transaction;
|
||||
@ -58,8 +57,6 @@ import org.openmetadata.schema.api.feed.ResolveTask;
|
||||
import org.openmetadata.schema.api.feed.ThreadCount;
|
||||
import org.openmetadata.schema.entity.feed.Thread;
|
||||
import org.openmetadata.schema.entity.teams.User;
|
||||
import org.openmetadata.schema.type.AnnouncementDetails;
|
||||
import org.openmetadata.schema.type.ChangeEvent;
|
||||
import org.openmetadata.schema.type.EntityReference;
|
||||
import org.openmetadata.schema.type.Include;
|
||||
import org.openmetadata.schema.type.Post;
|
||||
@ -129,44 +126,84 @@ public class FeedRepository {
|
||||
return dao.feedDAO().getTaskId();
|
||||
}
|
||||
|
||||
@Transaction
|
||||
public Thread create(Thread thread) {
|
||||
// Validate about data entity is valid and get the owner for that entity
|
||||
EntityLink about = EntityLink.parse(thread.getAbout());
|
||||
EntityRepository<?> repository = Entity.getEntityRepository(about.getEntityType());
|
||||
String field = repository.supportsOwner ? "owner" : "";
|
||||
EntityInterface aboutEntity = Entity.getEntity(about, field, ALL);
|
||||
thread.withEntityId(aboutEntity.getId()); // Add entity id to thread
|
||||
return createThread(thread, about, aboutEntity.getOwner());
|
||||
public class ThreadContext {
|
||||
@Getter protected final Thread thread;
|
||||
@Getter @Setter protected final EntityLink about;
|
||||
@Getter @Setter protected EntityInterface aboutEntity;
|
||||
@Getter private final EntityReference createdBy;
|
||||
|
||||
ThreadContext(Thread thread) {
|
||||
this.thread = thread;
|
||||
this.about = EntityLink.parse(thread.getAbout());
|
||||
this.aboutEntity = Entity.getEntity(about, getFields(), ALL);
|
||||
this.createdBy = Entity.getEntityReferenceByName(Entity.USER, thread.getCreatedBy(), NON_DELETED);
|
||||
thread.withEntityId(aboutEntity.getId()); // Add entity id to thread
|
||||
}
|
||||
|
||||
public TaskWorkflow getTaskWorkflow() {
|
||||
EntityRepository<?> repository = Entity.getEntityRepository(about.getEntityType());
|
||||
return repository.getTaskWorkflow(this);
|
||||
}
|
||||
|
||||
public EntityRepository<? extends EntityInterface> getEntityRepository() {
|
||||
return Entity.getEntityRepository(about.getEntityType());
|
||||
}
|
||||
|
||||
private String getFields() {
|
||||
EntityRepository<?> repository = getEntityRepository();
|
||||
List<String> fieldList = new ArrayList<>();
|
||||
if (repository.supportsOwner) {
|
||||
fieldList.add("owner");
|
||||
}
|
||||
if (repository.supportsTags) {
|
||||
fieldList.add("tags");
|
||||
}
|
||||
return String.join(",", fieldList.toArray(new String[0]));
|
||||
}
|
||||
}
|
||||
|
||||
public abstract static class TaskWorkflow {
|
||||
protected final ThreadContext threadContext;
|
||||
|
||||
TaskWorkflow(ThreadContext threadContext) {
|
||||
this.threadContext = threadContext;
|
||||
}
|
||||
|
||||
public abstract EntityInterface performTask(String user, ResolveTask resolveTask);
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
protected void closeTask(String user, CloseTask closeTask) {}
|
||||
|
||||
protected final TaskType getTaskType() {
|
||||
return threadContext.getThread().getTask().getType();
|
||||
}
|
||||
|
||||
protected final EntityLink getAbout() {
|
||||
return threadContext.getAbout();
|
||||
}
|
||||
}
|
||||
|
||||
private ThreadContext getThreadContext(Thread thread) {
|
||||
return new ThreadContext(thread);
|
||||
}
|
||||
|
||||
@Transaction
|
||||
private Thread createThread(Thread thread, EntityLink about, EntityReference entityOwner) {
|
||||
// Validate user creating thread
|
||||
UUID createdByUserId = Entity.getEntityReferenceByName(Entity.USER, thread.getCreatedBy(), NON_DELETED).getId();
|
||||
|
||||
if (thread.getType() == ThreadType.Task) {
|
||||
thread.withTask(thread.getTask().withId(getNextTaskId())); // Assign taskId for a task
|
||||
} else if (thread.getType() == ThreadType.Announcement) {
|
||||
// Validate start and end time for announcement
|
||||
validateAnnouncement(thread.getAnnouncement());
|
||||
long startTime = thread.getAnnouncement().getStartTime();
|
||||
long endTime = thread.getAnnouncement().getEndTime();
|
||||
// TODO fix this - overlapping announcements should be allowed
|
||||
List<String> announcements =
|
||||
dao.feedDAO()
|
||||
.listAnnouncementBetween(thread.getId().toString(), thread.getEntityId().toString(), startTime, endTime);
|
||||
if (!announcements.isEmpty()) {
|
||||
// There is already an announcement that overlaps the new one
|
||||
throw new IllegalArgumentException(ANNOUNCEMENT_OVERLAP);
|
||||
}
|
||||
}
|
||||
public Thread create(Thread thread) {
|
||||
ThreadContext threadContext = getThreadContext(thread);
|
||||
return createThread(threadContext);
|
||||
}
|
||||
|
||||
public void store(ThreadContext threadContext) {
|
||||
// Insert a new thread
|
||||
dao.feedDAO().insert(JsonUtils.pojoToJson(thread));
|
||||
dao.feedDAO().insert(JsonUtils.pojoToJson(threadContext.getThread()));
|
||||
}
|
||||
|
||||
public void storeRelationships(ThreadContext threadContext) {
|
||||
Thread thread = threadContext.getThread();
|
||||
EntityLink about = threadContext.getAbout();
|
||||
// Add relationship User -- created --> Thread relationship
|
||||
dao.relationshipDAO().insert(createdByUserId, thread.getId(), USER, Entity.THREAD, CREATED.ordinal());
|
||||
dao.relationshipDAO()
|
||||
.insert(threadContext.getCreatedBy().getId(), thread.getId(), USER, Entity.THREAD, CREATED.ordinal());
|
||||
|
||||
// Add field relationship for data asset - Thread -- isAbout ---> entity/entityField
|
||||
dao.fieldRelationshipDAO()
|
||||
@ -180,7 +217,9 @@ public class FeedRepository {
|
||||
IS_ABOUT.ordinal(),
|
||||
null);
|
||||
|
||||
// Add the owner also as addressedTo as the entity he owns when addressed, the owner is actually being addressed
|
||||
// Add the owner also as addressedTo as the entity he owns when addressed, the owner is
|
||||
// actually being addressed
|
||||
EntityReference entityOwner = threadContext.getAboutEntity().getOwner();
|
||||
if (entityOwner != null) {
|
||||
dao.relationshipDAO()
|
||||
.insert(thread.getId(), entityOwner.getId(), Entity.THREAD, entityOwner.getType(), ADDRESSED_TO.ordinal());
|
||||
@ -188,30 +227,21 @@ public class FeedRepository {
|
||||
|
||||
// Add mentions to field relationship table
|
||||
storeMentions(thread, thread.getMessage());
|
||||
populateAssignees(thread);
|
||||
return thread;
|
||||
}
|
||||
|
||||
@Transaction
|
||||
public Thread create(Thread thread, ContainerResponseContext responseContext) {
|
||||
// Validate about data entity is valid and get the owner for that entity
|
||||
EntityInterface entity;
|
||||
// In case of ENTITY_FIELDS_CHANGED entity from responseContext will be a ChangeEvent
|
||||
if (responseContext.getEntity() instanceof ChangeEvent) {
|
||||
ChangeEvent change = (ChangeEvent) responseContext.getEntity();
|
||||
entity = (EntityInterface) change.getEntity();
|
||||
} else {
|
||||
entity = (EntityInterface) responseContext.getEntity();
|
||||
private Thread createThread(ThreadContext threadContext) {
|
||||
Thread thread = threadContext.getThread();
|
||||
if (thread.getType() == ThreadType.Task) {
|
||||
thread.getTask().withId(getNextTaskId());
|
||||
} else if (thread.getType() == ThreadType.Announcement) {
|
||||
// Validate start and end time for announcement
|
||||
validateAnnouncement(thread);
|
||||
}
|
||||
EntityReference owner = null;
|
||||
try {
|
||||
owner = Entity.getOwner(entity.getEntityReference());
|
||||
} catch (Exception ignored) {
|
||||
// Either deleted or owner field not available
|
||||
}
|
||||
EntityLink about = EntityLink.parse(thread.getAbout());
|
||||
thread.withEntityId(entity.getId()); // Add entity id to thread
|
||||
return createThread(thread, about, owner);
|
||||
store(threadContext);
|
||||
storeRelationships(threadContext);
|
||||
populateAssignees(threadContext.getThread());
|
||||
return threadContext.getThread();
|
||||
}
|
||||
|
||||
public Thread get(String id) {
|
||||
@ -228,26 +258,35 @@ public class FeedRepository {
|
||||
|
||||
public PatchResponse<Thread> closeTask(UriInfo uriInfo, Thread thread, String user, CloseTask closeTask) {
|
||||
// Update the attributes
|
||||
closeTask(thread, user, closeTask.getComment());
|
||||
ThreadContext threadContext = getThreadContext(thread);
|
||||
closeTask(threadContext, user, closeTask);
|
||||
Thread updatedHref = FeedResource.addHref(uriInfo, thread);
|
||||
return new PatchResponse<>(Status.OK, updatedHref, RestUtil.ENTITY_UPDATED);
|
||||
}
|
||||
|
||||
public PatchResponse<Thread> resolveTask(UriInfo uriInfo, Thread thread, String user, ResolveTask resolveTask) {
|
||||
// perform the task
|
||||
TaskDetails task = thread.getTask();
|
||||
EntityLink about = EntityLink.parse(thread.getAbout());
|
||||
EntityReference aboutRef = EntityUtil.validateEntityLink(about);
|
||||
EntityRepository<?> repository = getEntityRepository(aboutRef.getType());
|
||||
repository.update(task, about, resolveTask.getNewValue(), user);
|
||||
|
||||
// Update the attributes
|
||||
task.withNewValue(resolveTask.getNewValue());
|
||||
closeTask(thread, user, null);
|
||||
ThreadContext threadContext = getThreadContext(thread);
|
||||
resolveTask(threadContext, user, resolveTask);
|
||||
Thread updatedHref = FeedResource.addHref(uriInfo, thread);
|
||||
return new PatchResponse<>(Status.OK, updatedHref, RestUtil.ENTITY_UPDATED);
|
||||
}
|
||||
|
||||
private void resolveTask(ThreadContext threadContext, String user, ResolveTask resolveTask) {
|
||||
TaskWorkflow taskWorkflow = threadContext.getTaskWorkflow();
|
||||
EntityInterface aboutEntity = threadContext.getAboutEntity();
|
||||
String origJson = JsonUtils.pojoToJson(aboutEntity);
|
||||
EntityInterface updatedEntity = taskWorkflow.performTask(user, resolveTask);
|
||||
String updatedEntityJson = JsonUtils.pojoToJson(updatedEntity);
|
||||
JsonPatch patch = JsonUtils.getJsonPatch(origJson, updatedEntityJson);
|
||||
EntityRepository<?> repository = threadContext.getEntityRepository();
|
||||
repository.patch(null, aboutEntity.getId(), user, patch);
|
||||
|
||||
// Update the attributes
|
||||
threadContext.getThread().getTask().withNewValue(resolveTask.getNewValue());
|
||||
closeTask(threadContext, user, new CloseTask());
|
||||
}
|
||||
|
||||
private String getTagFQNs(List<TagLabel> tags) {
|
||||
return tags.stream().map(TagLabel::getTagFQN).collect(Collectors.joining(", "));
|
||||
}
|
||||
@ -290,13 +329,16 @@ public class FeedRepository {
|
||||
addPostToThread(thread.getId().toString(), post, user);
|
||||
}
|
||||
|
||||
private void closeTask(Thread thread, String user, String closingComment) {
|
||||
private void closeTask(ThreadContext threadContext, String user, CloseTask closeTask) {
|
||||
Thread thread = threadContext.getThread();
|
||||
TaskDetails task = thread.getTask();
|
||||
TaskWorkflow workflow = threadContext.getTaskWorkflow();
|
||||
workflow.closeTask(user, closeTask);
|
||||
task.withStatus(TaskStatus.Closed).withClosedBy(user).withClosedAt(System.currentTimeMillis());
|
||||
thread.withTask(task).withUpdatedBy(user).withUpdatedAt(System.currentTimeMillis());
|
||||
|
||||
dao.feedDAO().update(thread.getId().toString(), JsonUtils.pojoToJson(thread));
|
||||
addClosingPost(thread, user, closingComment);
|
||||
addClosingPost(thread, user, closeTask.getComment());
|
||||
sortPosts(thread);
|
||||
}
|
||||
|
||||
@ -610,18 +652,7 @@ public class FeedRepository {
|
||||
}
|
||||
|
||||
if (updated.getAnnouncement() != null) {
|
||||
validateAnnouncement(updated.getAnnouncement());
|
||||
// check if the announcement start and end time clashes with other existing announcements
|
||||
List<String> announcements =
|
||||
dao.feedDAO()
|
||||
.listAnnouncementBetween(
|
||||
id.toString(),
|
||||
updated.getEntityId().toString(),
|
||||
updated.getAnnouncement().getStartTime(),
|
||||
updated.getAnnouncement().getEndTime());
|
||||
if (!announcements.isEmpty()) {
|
||||
throw new IllegalArgumentException(ANNOUNCEMENT_OVERLAP);
|
||||
}
|
||||
validateAnnouncement(updated);
|
||||
}
|
||||
|
||||
// Update the attributes
|
||||
@ -665,10 +696,20 @@ public class FeedRepository {
|
||||
CatalogExceptionMessage.taskOperationNotAllowed(userName, closeTask ? "closeTask" : "resolveTask"));
|
||||
}
|
||||
|
||||
private void validateAnnouncement(AnnouncementDetails announcementDetails) {
|
||||
if (announcementDetails.getStartTime() >= announcementDetails.getEndTime()) {
|
||||
private void validateAnnouncement(Thread thread) {
|
||||
long startTime = thread.getAnnouncement().getStartTime();
|
||||
long endTime = thread.getAnnouncement().getEndTime();
|
||||
if (startTime >= endTime) {
|
||||
throw new IllegalArgumentException(ANNOUNCEMENT_INVALID_START_TIME);
|
||||
}
|
||||
// TODO fix this - overlapping announcements should be allowed
|
||||
List<String> announcements =
|
||||
dao.feedDAO()
|
||||
.listAnnouncementBetween(thread.getId().toString(), thread.getEntityId().toString(), startTime, endTime);
|
||||
if (!announcements.isEmpty()) {
|
||||
// There is already an announcement that overlaps the new one
|
||||
throw new IllegalArgumentException(ANNOUNCEMENT_OVERLAP);
|
||||
}
|
||||
}
|
||||
|
||||
private void restorePatchAttributes(Thread original, Thread updated) {
|
||||
|
@ -25,9 +25,9 @@ import static org.openmetadata.service.util.EntityUtil.mlHyperParameterMatch;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import javax.json.JsonPatch;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.openmetadata.schema.EntityInterface;
|
||||
import org.openmetadata.schema.api.feed.ResolveTask;
|
||||
import org.openmetadata.schema.entity.data.MlModel;
|
||||
import org.openmetadata.schema.entity.services.MlModelService;
|
||||
import org.openmetadata.schema.type.EntityReference;
|
||||
@ -37,10 +37,12 @@ import org.openmetadata.schema.type.MlFeatureSource;
|
||||
import org.openmetadata.schema.type.MlHyperParameter;
|
||||
import org.openmetadata.schema.type.Relationship;
|
||||
import org.openmetadata.schema.type.TagLabel;
|
||||
import org.openmetadata.schema.type.TaskDetails;
|
||||
import org.openmetadata.schema.type.TaskType;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.exception.CatalogExceptionMessage;
|
||||
import org.openmetadata.service.resources.feeds.MessageParser;
|
||||
import org.openmetadata.service.jdbi3.FeedRepository.TaskWorkflow;
|
||||
import org.openmetadata.service.jdbi3.FeedRepository.ThreadContext;
|
||||
import org.openmetadata.service.resources.feeds.MessageParser.EntityLink;
|
||||
import org.openmetadata.service.resources.mlmodels.MlModelResource;
|
||||
import org.openmetadata.service.util.EntityUtil;
|
||||
import org.openmetadata.service.util.EntityUtil.Fields;
|
||||
@ -63,6 +65,14 @@ public class MlModelRepository extends EntityRepository<MlModel> {
|
||||
MODEL_UPDATE_FIELDS);
|
||||
}
|
||||
|
||||
public static MlFeature findMlFeature(List<MlFeature> features, String featureName) {
|
||||
return features.stream()
|
||||
.filter(c -> c.getName().equals(featureName))
|
||||
.findFirst()
|
||||
.orElseThrow(
|
||||
() -> new IllegalArgumentException(CatalogExceptionMessage.invalidFieldName("mlFeature", featureName)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setFullyQualifiedName(MlModel mlModel) {
|
||||
mlModel.setFullyQualifiedName(
|
||||
@ -238,31 +248,53 @@ public class MlModelRepository extends EntityRepository<MlModel> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void update(TaskDetails task, MessageParser.EntityLink entityLink, String newValue, String user) {
|
||||
public TaskWorkflow getTaskWorkflow(ThreadContext threadContext) {
|
||||
validateTaskThread(threadContext);
|
||||
EntityLink entityLink = threadContext.getAbout();
|
||||
if (entityLink.getFieldName().equals("mlFeatures")) {
|
||||
MlModel mlModel = getByName(null, entityLink.getEntityFQN(), getFields("tags"), Include.ALL, false);
|
||||
MlFeature mlFeature =
|
||||
mlModel.getMlFeatures().stream()
|
||||
.filter(c -> c.getName().equals(entityLink.getArrayFieldName()))
|
||||
.findFirst()
|
||||
.orElseThrow(
|
||||
() ->
|
||||
new IllegalArgumentException(
|
||||
CatalogExceptionMessage.invalidFieldName("chart", entityLink.getArrayFieldName())));
|
||||
|
||||
String origJson = JsonUtils.pojoToJson(mlModel);
|
||||
if (EntityUtil.isDescriptionTask(task.getType())) {
|
||||
mlFeature.setDescription(newValue);
|
||||
} else if (EntityUtil.isTagTask(task.getType())) {
|
||||
List<TagLabel> tags = JsonUtils.readObjects(newValue, TagLabel.class);
|
||||
mlFeature.setTags(tags);
|
||||
TaskType taskType = threadContext.getThread().getTask().getType();
|
||||
if (EntityUtil.isDescriptionTask(taskType)) {
|
||||
return new MlFeatureDescriptionTaskWorkflow(threadContext);
|
||||
} else if (EntityUtil.isTagTask(taskType)) {
|
||||
return new MlFeatureTagTaskWorkflow(threadContext);
|
||||
} else {
|
||||
throw new IllegalArgumentException(String.format("Invalid task type %s", taskType));
|
||||
}
|
||||
String updatedEntityJson = JsonUtils.pojoToJson(mlModel);
|
||||
JsonPatch patch = JsonUtils.getJsonPatch(origJson, updatedEntityJson);
|
||||
patch(null, mlModel.getId(), user, patch);
|
||||
return;
|
||||
}
|
||||
super.update(task, entityLink, newValue, user);
|
||||
return super.getTaskWorkflow(threadContext);
|
||||
}
|
||||
|
||||
static class MlFeatureDescriptionTaskWorkflow extends DescriptionTaskWorkflow {
|
||||
private final MlFeature mlFeature;
|
||||
|
||||
MlFeatureDescriptionTaskWorkflow(ThreadContext threadContext) {
|
||||
super(threadContext);
|
||||
MlModel mlModel = (MlModel) threadContext.getAboutEntity();
|
||||
mlFeature = findMlFeature(mlModel.getMlFeatures(), threadContext.getAbout().getArrayFieldName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public EntityInterface performTask(String user, ResolveTask resolveTask) {
|
||||
mlFeature.setDescription(resolveTask.getNewValue());
|
||||
return threadContext.getAboutEntity();
|
||||
}
|
||||
}
|
||||
|
||||
static class MlFeatureTagTaskWorkflow extends TagTaskWorkflow {
|
||||
private final MlFeature mlFeature;
|
||||
|
||||
MlFeatureTagTaskWorkflow(ThreadContext threadContext) {
|
||||
super(threadContext);
|
||||
MlModel mlModel = (MlModel) threadContext.getAboutEntity();
|
||||
mlFeature = findMlFeature(mlModel.getMlFeatures(), threadContext.getAbout().getArrayFieldName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public EntityInterface performTask(String user, ResolveTask resolveTask) {
|
||||
List<TagLabel> tags = JsonUtils.readObjects(resolveTask.getNewValue(), TagLabel.class);
|
||||
mlFeature.setTags(tags);
|
||||
return threadContext.getAboutEntity();
|
||||
}
|
||||
}
|
||||
|
||||
private void populateService(MlModel mlModel) {
|
||||
|
@ -16,15 +16,16 @@ package org.openmetadata.service.jdbi3;
|
||||
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
|
||||
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
|
||||
import static org.openmetadata.schema.type.Include.ALL;
|
||||
import static org.openmetadata.service.Entity.CONTAINER;
|
||||
import static org.openmetadata.service.Entity.FIELD_TAGS;
|
||||
import static org.openmetadata.service.Entity.PIPELINE_SERVICE;
|
||||
import static org.openmetadata.service.util.EntityUtil.taskMatch;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import javax.json.JsonPatch;
|
||||
import org.jdbi.v3.sqlobject.transaction.Transaction;
|
||||
import org.openmetadata.schema.EntityInterface;
|
||||
import org.openmetadata.schema.api.feed.ResolveTask;
|
||||
import org.openmetadata.schema.entity.data.Pipeline;
|
||||
import org.openmetadata.schema.entity.data.PipelineStatus;
|
||||
import org.openmetadata.schema.entity.services.PipelineService;
|
||||
@ -34,11 +35,13 @@ import org.openmetadata.schema.type.Relationship;
|
||||
import org.openmetadata.schema.type.Status;
|
||||
import org.openmetadata.schema.type.TagLabel;
|
||||
import org.openmetadata.schema.type.Task;
|
||||
import org.openmetadata.schema.type.TaskDetails;
|
||||
import org.openmetadata.schema.type.TaskType;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.exception.CatalogExceptionMessage;
|
||||
import org.openmetadata.service.exception.EntityNotFoundException;
|
||||
import org.openmetadata.service.resources.feeds.MessageParser;
|
||||
import org.openmetadata.service.jdbi3.FeedRepository.TaskWorkflow;
|
||||
import org.openmetadata.service.jdbi3.FeedRepository.ThreadContext;
|
||||
import org.openmetadata.service.resources.feeds.MessageParser.EntityLink;
|
||||
import org.openmetadata.service.resources.pipelines.PipelineResource;
|
||||
import org.openmetadata.service.util.EntityUtil;
|
||||
import org.openmetadata.service.util.EntityUtil.Fields;
|
||||
@ -71,30 +74,55 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void update(TaskDetails task, MessageParser.EntityLink entityLink, String newValue, String user) {
|
||||
public TaskWorkflow getTaskWorkflow(ThreadContext threadContext) {
|
||||
validateTaskThread(threadContext);
|
||||
EntityLink entityLink = threadContext.getAbout();
|
||||
if (entityLink.getFieldName().equals(TASKS_FIELD)) {
|
||||
Pipeline pipeline = getByName(null, entityLink.getEntityFQN(), getFields("tasks,tags"), Include.ALL, false);
|
||||
String oldJson = JsonUtils.pojoToJson(pipeline);
|
||||
Task pipelineTask =
|
||||
pipeline.getTasks().stream()
|
||||
.filter(c -> c.getName().equals(entityLink.getArrayFieldName()))
|
||||
.findFirst()
|
||||
.orElseThrow(
|
||||
() ->
|
||||
new IllegalArgumentException(
|
||||
CatalogExceptionMessage.invalidFieldName("task", entityLink.getArrayFieldName())));
|
||||
if (EntityUtil.isDescriptionTask(task.getType())) {
|
||||
pipelineTask.setDescription(newValue);
|
||||
} else if (EntityUtil.isTagTask(task.getType())) {
|
||||
List<TagLabel> tags = JsonUtils.readObjects(newValue, TagLabel.class);
|
||||
pipelineTask.setTags(tags);
|
||||
TaskType taskType = threadContext.getThread().getTask().getType();
|
||||
if (EntityUtil.isDescriptionTask(taskType)) {
|
||||
return new TaskDescriptionWorkflow(threadContext);
|
||||
} else if (EntityUtil.isTagTask(taskType)) {
|
||||
return new TaskTagWorkflow(threadContext);
|
||||
} else {
|
||||
throw new IllegalArgumentException(String.format("Invalid task type %s", taskType));
|
||||
}
|
||||
String updatedEntityJson = JsonUtils.pojoToJson(pipeline);
|
||||
JsonPatch patch = JsonUtils.getJsonPatch(oldJson, updatedEntityJson);
|
||||
patch(null, pipeline.getId(), user, patch);
|
||||
return;
|
||||
}
|
||||
super.update(task, entityLink, newValue, user);
|
||||
return super.getTaskWorkflow(threadContext);
|
||||
}
|
||||
|
||||
static class TaskDescriptionWorkflow extends DescriptionTaskWorkflow {
|
||||
private final Task task;
|
||||
|
||||
TaskDescriptionWorkflow(ThreadContext threadContext) {
|
||||
super(threadContext);
|
||||
Pipeline pipeline = Entity.getEntity(CONTAINER, threadContext.getAboutEntity().getId(), "tasks", ALL);
|
||||
threadContext.setAboutEntity(pipeline);
|
||||
task = findTask(pipeline.getTasks(), threadContext.getAbout().getArrayFieldName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public EntityInterface performTask(String user, ResolveTask resolveTask) {
|
||||
task.setDescription(resolveTask.getNewValue());
|
||||
return threadContext.getAboutEntity();
|
||||
}
|
||||
}
|
||||
|
||||
static class TaskTagWorkflow extends TagTaskWorkflow {
|
||||
private final Task task;
|
||||
|
||||
TaskTagWorkflow(ThreadContext threadContext) {
|
||||
super(threadContext);
|
||||
Pipeline pipeline = Entity.getEntity(CONTAINER, threadContext.getAboutEntity().getId(), "tasks,tags", ALL);
|
||||
threadContext.setAboutEntity(pipeline);
|
||||
task = findTask(pipeline.getTasks(), threadContext.getAbout().getArrayFieldName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public EntityInterface performTask(String user, ResolveTask resolveTask) {
|
||||
List<TagLabel> tags = JsonUtils.readObjects(resolveTask.getNewValue(), TagLabel.class);
|
||||
task.setTags(tags);
|
||||
return threadContext.getAboutEntity();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -361,4 +389,11 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static Task findTask(List<Task> tasks, String taskName) {
|
||||
return tasks.stream()
|
||||
.filter(c -> c.getName().equals(taskName))
|
||||
.findFirst()
|
||||
.orElseThrow(() -> new IllegalArgumentException(CatalogExceptionMessage.invalidFieldName("task", taskName)));
|
||||
}
|
||||
}
|
||||
|
@ -32,20 +32,22 @@ import java.util.UUID;
|
||||
import java.util.function.BiPredicate;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.json.JsonPatch;
|
||||
import org.jdbi.v3.sqlobject.transaction.Transaction;
|
||||
import org.openmetadata.schema.EntityInterface;
|
||||
import org.openmetadata.schema.api.feed.ResolveTask;
|
||||
import org.openmetadata.schema.entity.data.SearchIndex;
|
||||
import org.openmetadata.schema.entity.services.SearchService;
|
||||
import org.openmetadata.schema.type.EntityReference;
|
||||
import org.openmetadata.schema.type.Relationship;
|
||||
import org.openmetadata.schema.type.SearchIndexField;
|
||||
import org.openmetadata.schema.type.TagLabel;
|
||||
import org.openmetadata.schema.type.TaskDetails;
|
||||
import org.openmetadata.schema.type.TaskType;
|
||||
import org.openmetadata.schema.type.searchindex.SearchIndexSampleData;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.exception.CatalogExceptionMessage;
|
||||
import org.openmetadata.service.resources.feeds.MessageParser;
|
||||
import org.openmetadata.service.jdbi3.FeedRepository.TaskWorkflow;
|
||||
import org.openmetadata.service.jdbi3.FeedRepository.ThreadContext;
|
||||
import org.openmetadata.service.resources.feeds.MessageParser.EntityLink;
|
||||
import org.openmetadata.service.resources.searchindex.SearchIndexResource;
|
||||
import org.openmetadata.service.security.mask.PIIMasker;
|
||||
import org.openmetadata.service.util.EntityUtil;
|
||||
@ -275,51 +277,84 @@ public class SearchIndexRepository extends EntityRepository<SearchIndex> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void update(TaskDetails task, MessageParser.EntityLink entityLink, String newValue, String user) {
|
||||
public TaskWorkflow getTaskWorkflow(ThreadContext threadContext) {
|
||||
validateTaskThread(threadContext);
|
||||
EntityLink entityLink = threadContext.getAbout();
|
||||
if (entityLink.getFieldName().equals("fields")) {
|
||||
String schemaName = entityLink.getArrayFieldName();
|
||||
String childrenSchemaName = "";
|
||||
if (entityLink.getArrayFieldName().contains(".")) {
|
||||
String fieldNameWithoutQuotes =
|
||||
entityLink.getArrayFieldName().substring(1, entityLink.getArrayFieldName().length() - 1);
|
||||
schemaName = fieldNameWithoutQuotes.substring(0, fieldNameWithoutQuotes.indexOf("."));
|
||||
childrenSchemaName = fieldNameWithoutQuotes.substring(fieldNameWithoutQuotes.lastIndexOf(".") + 1);
|
||||
TaskType taskType = threadContext.getThread().getTask().getType();
|
||||
if (EntityUtil.isDescriptionTask(taskType)) {
|
||||
return new FieldDescriptionWorkflow(threadContext);
|
||||
} else if (EntityUtil.isTagTask(taskType)) {
|
||||
return new FieldTagWorkflow(threadContext);
|
||||
} else {
|
||||
throw new IllegalArgumentException(String.format("Invalid task type %s", taskType));
|
||||
}
|
||||
SearchIndex searchIndex = getByName(null, entityLink.getEntityFQN(), getFields("tags"), ALL, false);
|
||||
SearchIndexField schemaField = null;
|
||||
for (SearchIndexField field : searchIndex.getFields()) {
|
||||
if (field.getName().equals(schemaName)) {
|
||||
schemaField = field;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!"".equals(childrenSchemaName) && schemaField != null) {
|
||||
schemaField = getChildrenSchemaField(schemaField.getChildren(), childrenSchemaName);
|
||||
}
|
||||
if (schemaField == null) {
|
||||
throw new IllegalArgumentException(
|
||||
CatalogExceptionMessage.invalidFieldName("schema", entityLink.getArrayFieldName()));
|
||||
}
|
||||
|
||||
String origJson = JsonUtils.pojoToJson(searchIndex);
|
||||
if (EntityUtil.isDescriptionTask(task.getType())) {
|
||||
schemaField.setDescription(newValue);
|
||||
} else if (EntityUtil.isTagTask(task.getType())) {
|
||||
List<TagLabel> tags = JsonUtils.readObjects(newValue, TagLabel.class);
|
||||
schemaField.setTags(tags);
|
||||
}
|
||||
String updatedEntityJson = JsonUtils.pojoToJson(searchIndex);
|
||||
JsonPatch patch = JsonUtils.getJsonPatch(origJson, updatedEntityJson);
|
||||
patch(null, searchIndex.getId(), user, patch);
|
||||
return;
|
||||
}
|
||||
super.update(task, entityLink, newValue, user);
|
||||
return super.getTaskWorkflow(threadContext);
|
||||
}
|
||||
|
||||
private static SearchIndexField getChildrenSchemaField(List<SearchIndexField> fields, String childrenSchemaName) {
|
||||
static class FieldDescriptionWorkflow extends DescriptionTaskWorkflow {
|
||||
private final SearchIndexField schemaField;
|
||||
|
||||
FieldDescriptionWorkflow(ThreadContext threadContext) {
|
||||
super(threadContext);
|
||||
schemaField =
|
||||
getSchemaField((SearchIndex) threadContext.getAboutEntity(), threadContext.getAbout().getArrayFieldName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public EntityInterface performTask(String user, ResolveTask resolveTask) {
|
||||
schemaField.setDescription(resolveTask.getNewValue());
|
||||
return threadContext.getAboutEntity();
|
||||
}
|
||||
}
|
||||
|
||||
static class FieldTagWorkflow extends TagTaskWorkflow {
|
||||
private final SearchIndexField schemaField;
|
||||
|
||||
FieldTagWorkflow(ThreadContext threadContext) {
|
||||
super(threadContext);
|
||||
schemaField =
|
||||
getSchemaField((SearchIndex) threadContext.getAboutEntity(), threadContext.getAbout().getArrayFieldName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public EntityInterface performTask(String user, ResolveTask resolveTask) {
|
||||
List<TagLabel> tags = JsonUtils.readObjects(resolveTask.getNewValue(), TagLabel.class);
|
||||
schemaField.setTags(tags);
|
||||
return threadContext.getAboutEntity();
|
||||
}
|
||||
}
|
||||
|
||||
private static SearchIndexField getSchemaField(SearchIndex searchIndex, String fieldName) {
|
||||
String schemaName = fieldName;
|
||||
List<SearchIndexField> schemaFields = searchIndex.getFields();
|
||||
String childSchemaName = "";
|
||||
if (fieldName.contains(".")) {
|
||||
String fieldNameWithoutQuotes = fieldName.substring(1, fieldName.length() - 1);
|
||||
schemaName = fieldNameWithoutQuotes.substring(0, fieldNameWithoutQuotes.indexOf("."));
|
||||
childSchemaName = fieldNameWithoutQuotes.substring(fieldNameWithoutQuotes.lastIndexOf(".") + 1);
|
||||
}
|
||||
SearchIndexField schemaField = null;
|
||||
for (SearchIndexField field : schemaFields) {
|
||||
if (field.getName().equals(schemaName)) {
|
||||
schemaField = field;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!"".equals(childSchemaName) && schemaField != null) {
|
||||
schemaField = getChildSchemaField(schemaField.getChildren(), childSchemaName);
|
||||
}
|
||||
if (schemaField == null) {
|
||||
throw new IllegalArgumentException(CatalogExceptionMessage.invalidFieldName("schema", fieldName));
|
||||
}
|
||||
return schemaField;
|
||||
}
|
||||
|
||||
private static SearchIndexField getChildSchemaField(List<SearchIndexField> fields, String childSchemaName) {
|
||||
SearchIndexField childrenSchemaField = null;
|
||||
for (SearchIndexField field : fields) {
|
||||
if (field.getName().equals(childrenSchemaName)) {
|
||||
if (field.getName().equals(childSchemaName)) {
|
||||
childrenSchemaField = field;
|
||||
break;
|
||||
}
|
||||
@ -327,7 +362,7 @@ public class SearchIndexRepository extends EntityRepository<SearchIndex> {
|
||||
if (childrenSchemaField == null) {
|
||||
for (SearchIndexField field : fields) {
|
||||
if (field.getChildren() != null) {
|
||||
childrenSchemaField = getChildrenSchemaField(field.getChildren(), childrenSchemaName);
|
||||
childrenSchemaField = getChildSchemaField(field.getChildren(), childSchemaName);
|
||||
if (childrenSchemaField != null) {
|
||||
break;
|
||||
}
|
||||
|
@ -39,7 +39,6 @@ import java.util.UUID;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import javax.json.JsonPatch;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.apache.commons.lang3.tuple.Triple;
|
||||
@ -47,6 +46,7 @@ import org.jdbi.v3.sqlobject.transaction.Transaction;
|
||||
import org.openmetadata.common.utils.CommonUtil;
|
||||
import org.openmetadata.schema.EntityInterface;
|
||||
import org.openmetadata.schema.api.data.CreateTableProfile;
|
||||
import org.openmetadata.schema.api.feed.ResolveTask;
|
||||
import org.openmetadata.schema.entity.data.DatabaseSchema;
|
||||
import org.openmetadata.schema.entity.data.Table;
|
||||
import org.openmetadata.schema.tests.CustomMetric;
|
||||
@ -69,11 +69,12 @@ import org.openmetadata.schema.type.TableJoins;
|
||||
import org.openmetadata.schema.type.TableProfile;
|
||||
import org.openmetadata.schema.type.TableProfilerConfig;
|
||||
import org.openmetadata.schema.type.TagLabel;
|
||||
import org.openmetadata.schema.type.TaskDetails;
|
||||
import org.openmetadata.schema.type.TaskType;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.exception.CatalogExceptionMessage;
|
||||
import org.openmetadata.service.exception.EntityNotFoundException;
|
||||
import org.openmetadata.service.jdbi3.FeedRepository.TaskWorkflow;
|
||||
import org.openmetadata.service.jdbi3.FeedRepository.ThreadContext;
|
||||
import org.openmetadata.service.resources.databases.DatabaseUtil;
|
||||
import org.openmetadata.service.resources.databases.TableResource;
|
||||
import org.openmetadata.service.resources.feeds.MessageParser.EntityLink;
|
||||
@ -762,48 +763,76 @@ public class TableRepository extends EntityRepository<Table> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void update(TaskDetails task, EntityLink entityLink, String newValue, String user) {
|
||||
validateEntityLinkFieldExists(entityLink, task.getType());
|
||||
public TaskWorkflow getTaskWorkflow(ThreadContext threadContext) {
|
||||
validateTaskThread(threadContext);
|
||||
EntityLink entityLink = threadContext.getAbout();
|
||||
if (entityLink.getFieldName().equals("columns")) {
|
||||
String columnName = entityLink.getArrayFieldName();
|
||||
String childrenName = "";
|
||||
if (entityLink.getArrayFieldName().contains(".")) {
|
||||
String fieldNameWithoutQuotes =
|
||||
entityLink.getArrayFieldName().substring(1, entityLink.getArrayFieldName().length() - 1);
|
||||
columnName = fieldNameWithoutQuotes.substring(0, fieldNameWithoutQuotes.indexOf("."));
|
||||
childrenName = fieldNameWithoutQuotes.substring(fieldNameWithoutQuotes.lastIndexOf(".") + 1);
|
||||
TaskType taskType = threadContext.getThread().getTask().getType();
|
||||
if (EntityUtil.isDescriptionTask(taskType)) {
|
||||
return new ColumnDescriptionWorkflow(threadContext);
|
||||
} else if (EntityUtil.isTagTask(taskType)) {
|
||||
return new ColumnTagWorkflow(threadContext);
|
||||
} else {
|
||||
throw new IllegalArgumentException(String.format("Invalid task type %s", taskType));
|
||||
}
|
||||
Table table = getByName(null, entityLink.getEntityFQN(), getFields("columns,tags"), Include.ALL, false);
|
||||
Column column = null;
|
||||
for (Column c : table.getColumns()) {
|
||||
if (c.getName().equals(columnName)) {
|
||||
column = c;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!"".equals(childrenName) && column != null) {
|
||||
column = getChildrenColumn(column.getChildren(), childrenName);
|
||||
}
|
||||
if (column == null) {
|
||||
throw new IllegalArgumentException(
|
||||
CatalogExceptionMessage.invalidFieldName("column", entityLink.getArrayFieldName()));
|
||||
}
|
||||
String origJson = JsonUtils.pojoToJson(table);
|
||||
if (EntityUtil.isDescriptionTask(task.getType())) {
|
||||
column.setDescription(newValue);
|
||||
} else if (EntityUtil.isTagTask(task.getType())) {
|
||||
List<TagLabel> tags = JsonUtils.readObjects(newValue, TagLabel.class);
|
||||
column.setTags(tags);
|
||||
}
|
||||
String updatedEntityJson = JsonUtils.pojoToJson(table);
|
||||
JsonPatch patch = JsonUtils.getJsonPatch(origJson, updatedEntityJson);
|
||||
patch(null, table.getId(), user, patch);
|
||||
return;
|
||||
}
|
||||
super.update(task, entityLink, newValue, user);
|
||||
return super.getTaskWorkflow(threadContext);
|
||||
}
|
||||
|
||||
private static Column getChildrenColumn(List<Column> column, String childrenName) {
|
||||
static class ColumnDescriptionWorkflow extends DescriptionTaskWorkflow {
|
||||
private final Column column;
|
||||
|
||||
ColumnDescriptionWorkflow(ThreadContext threadContext) {
|
||||
super(threadContext);
|
||||
Table table = Entity.getEntity(TABLE, threadContext.getAboutEntity().getId(), "columns", ALL);
|
||||
threadContext.setAboutEntity(table);
|
||||
column = getColumn((Table) threadContext.getAboutEntity(), threadContext.getAbout().getArrayFieldName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public EntityInterface performTask(String user, ResolveTask resolveTask) {
|
||||
column.setDescription(resolveTask.getNewValue());
|
||||
return threadContext.getAboutEntity();
|
||||
}
|
||||
}
|
||||
|
||||
static class ColumnTagWorkflow extends TagTaskWorkflow {
|
||||
private final Column column;
|
||||
|
||||
ColumnTagWorkflow(ThreadContext threadContext) {
|
||||
super(threadContext);
|
||||
Table table = Entity.getEntity(TABLE, threadContext.getAboutEntity().getId(), "columns,tags", ALL);
|
||||
threadContext.setAboutEntity(table);
|
||||
column = getColumn((Table) threadContext.getAboutEntity(), threadContext.getAbout().getArrayFieldName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public EntityInterface performTask(String user, ResolveTask resolveTask) {
|
||||
List<TagLabel> tags = JsonUtils.readObjects(resolveTask.getNewValue(), TagLabel.class);
|
||||
column.setTags(tags);
|
||||
return threadContext.getAboutEntity();
|
||||
}
|
||||
}
|
||||
|
||||
private static Column getColumn(Table table, String columnName) {
|
||||
String childrenName = "";
|
||||
if (columnName.contains(".")) {
|
||||
String fieldNameWithoutQuotes = columnName.substring(1, columnName.length() - 1);
|
||||
columnName = fieldNameWithoutQuotes.substring(0, fieldNameWithoutQuotes.indexOf("."));
|
||||
childrenName = fieldNameWithoutQuotes.substring(fieldNameWithoutQuotes.lastIndexOf(".") + 1);
|
||||
}
|
||||
|
||||
Column column = EntityUtil.findColumn(table.getColumns(), columnName);
|
||||
if (!"".equals(childrenName) && column != null) {
|
||||
column = getChildColumn(column.getChildren(), childrenName);
|
||||
}
|
||||
if (column == null) {
|
||||
throw new IllegalArgumentException(CatalogExceptionMessage.invalidFieldName("column", columnName));
|
||||
}
|
||||
return column;
|
||||
}
|
||||
|
||||
private static Column getChildColumn(List<Column> column, String childrenName) {
|
||||
Column childrenColumn = null;
|
||||
for (Column col : column) {
|
||||
if (col.getName().equals(childrenName)) {
|
||||
@ -814,7 +843,7 @@ public class TableRepository extends EntityRepository<Table> {
|
||||
if (childrenColumn == null) {
|
||||
for (Column value : column) {
|
||||
if (value.getChildren() != null) {
|
||||
childrenColumn = getChildrenColumn(value.getChildren(), childrenName);
|
||||
childrenColumn = getChildColumn(value.getChildren(), childrenName);
|
||||
if (childrenColumn != null) {
|
||||
break;
|
||||
}
|
||||
|
@ -20,7 +20,6 @@ import static org.openmetadata.service.Entity.FIELD_DESCRIPTION;
|
||||
import static org.openmetadata.service.Entity.FIELD_DISPLAY_NAME;
|
||||
import static org.openmetadata.service.Entity.FIELD_TAGS;
|
||||
import static org.openmetadata.service.Entity.MESSAGING_SERVICE;
|
||||
import static org.openmetadata.service.util.EntityUtil.getSchemaField;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
@ -31,21 +30,23 @@ import java.util.UUID;
|
||||
import java.util.function.BiPredicate;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.json.JsonPatch;
|
||||
import org.jdbi.v3.sqlobject.transaction.Transaction;
|
||||
import org.openmetadata.schema.EntityInterface;
|
||||
import org.openmetadata.schema.api.feed.ResolveTask;
|
||||
import org.openmetadata.schema.entity.data.Topic;
|
||||
import org.openmetadata.schema.entity.services.MessagingService;
|
||||
import org.openmetadata.schema.type.EntityReference;
|
||||
import org.openmetadata.schema.type.Field;
|
||||
import org.openmetadata.schema.type.Relationship;
|
||||
import org.openmetadata.schema.type.TagLabel;
|
||||
import org.openmetadata.schema.type.TaskDetails;
|
||||
import org.openmetadata.schema.type.TaskType;
|
||||
import org.openmetadata.schema.type.topic.CleanupPolicy;
|
||||
import org.openmetadata.schema.type.topic.TopicSampleData;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.exception.CatalogExceptionMessage;
|
||||
import org.openmetadata.service.resources.feeds.MessageParser;
|
||||
import org.openmetadata.service.jdbi3.FeedRepository.TaskWorkflow;
|
||||
import org.openmetadata.service.jdbi3.FeedRepository.ThreadContext;
|
||||
import org.openmetadata.service.resources.feeds.MessageParser.EntityLink;
|
||||
import org.openmetadata.service.resources.topics.TopicResource;
|
||||
import org.openmetadata.service.security.mask.PIIMasker;
|
||||
import org.openmetadata.service.util.EntityUtil;
|
||||
@ -268,48 +269,79 @@ public class TopicRepository extends EntityRepository<Topic> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void update(TaskDetails task, MessageParser.EntityLink entityLink, String newValue, String user) {
|
||||
public TaskWorkflow getTaskWorkflow(ThreadContext threadContext) {
|
||||
validateTaskThread(threadContext);
|
||||
EntityLink entityLink = threadContext.getAbout();
|
||||
if (entityLink.getFieldName().equals("messageSchema")) {
|
||||
String schemaName = entityLink.getArrayFieldName();
|
||||
String childrenSchemaName = "";
|
||||
if (entityLink.getArrayFieldName().contains(".")) {
|
||||
String fieldNameWithoutQuotes =
|
||||
entityLink.getArrayFieldName().substring(1, entityLink.getArrayFieldName().length() - 1);
|
||||
schemaName = fieldNameWithoutQuotes.substring(0, fieldNameWithoutQuotes.indexOf("."));
|
||||
childrenSchemaName = fieldNameWithoutQuotes.substring(fieldNameWithoutQuotes.lastIndexOf(".") + 1);
|
||||
TaskType taskType = threadContext.getThread().getTask().getType();
|
||||
if (EntityUtil.isDescriptionTask(taskType)) {
|
||||
return new MessageSchemaDescriptionWorkflow(threadContext);
|
||||
} else if (EntityUtil.isTagTask(taskType)) {
|
||||
return new MessageSchemaTagWorkflow(threadContext);
|
||||
} else {
|
||||
throw new IllegalArgumentException(String.format("Invalid task type %s", taskType));
|
||||
}
|
||||
Topic topic = getByName(null, entityLink.getEntityFQN(), getFields("tags"), ALL, false);
|
||||
Field schemaField = null;
|
||||
for (Field field : topic.getMessageSchema().getSchemaFields()) {
|
||||
if (field.getName().equals(schemaName)) {
|
||||
schemaField = field;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!"".equals(childrenSchemaName) && schemaField != null) {
|
||||
schemaField = getchildrenSchemaField(schemaField.getChildren(), childrenSchemaName);
|
||||
}
|
||||
if (schemaField == null) {
|
||||
throw new IllegalArgumentException(
|
||||
CatalogExceptionMessage.invalidFieldName("schema", entityLink.getArrayFieldName()));
|
||||
}
|
||||
|
||||
String origJson = JsonUtils.pojoToJson(topic);
|
||||
if (EntityUtil.isDescriptionTask(task.getType())) {
|
||||
schemaField.setDescription(newValue);
|
||||
} else if (EntityUtil.isTagTask(task.getType())) {
|
||||
List<TagLabel> tags = JsonUtils.readObjects(newValue, TagLabel.class);
|
||||
schemaField.setTags(tags);
|
||||
}
|
||||
String updatedEntityJson = JsonUtils.pojoToJson(topic);
|
||||
JsonPatch patch = JsonUtils.getJsonPatch(origJson, updatedEntityJson);
|
||||
patch(null, topic.getId(), user, patch);
|
||||
return;
|
||||
}
|
||||
super.update(task, entityLink, newValue, user);
|
||||
return super.getTaskWorkflow(threadContext);
|
||||
}
|
||||
|
||||
private static Field getchildrenSchemaField(List<Field> fields, String childrenSchemaName) {
|
||||
static class MessageSchemaDescriptionWorkflow extends DescriptionTaskWorkflow {
|
||||
private final Field schemaField;
|
||||
|
||||
MessageSchemaDescriptionWorkflow(ThreadContext threadContext) {
|
||||
super(threadContext);
|
||||
schemaField =
|
||||
getSchemaField((Topic) threadContext.getAboutEntity(), threadContext.getAbout().getArrayFieldName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public EntityInterface performTask(String user, ResolveTask resolveTask) {
|
||||
schemaField.setDescription(resolveTask.getNewValue());
|
||||
return threadContext.getAboutEntity();
|
||||
}
|
||||
}
|
||||
|
||||
static class MessageSchemaTagWorkflow extends TagTaskWorkflow {
|
||||
private final Field schemaField;
|
||||
|
||||
MessageSchemaTagWorkflow(ThreadContext threadContext) {
|
||||
super(threadContext);
|
||||
schemaField =
|
||||
getSchemaField((Topic) threadContext.getAboutEntity(), threadContext.getAbout().getArrayFieldName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public EntityInterface performTask(String user, ResolveTask resolveTask) {
|
||||
List<TagLabel> tags = JsonUtils.readObjects(resolveTask.getNewValue(), TagLabel.class);
|
||||
schemaField.setTags(tags);
|
||||
return threadContext.getAboutEntity();
|
||||
}
|
||||
}
|
||||
|
||||
private static Field getSchemaField(Topic topic, String schemaName) {
|
||||
String childrenSchemaName = "";
|
||||
if (schemaName.contains(".")) {
|
||||
String fieldNameWithoutQuotes = schemaName.substring(1, schemaName.length() - 1);
|
||||
schemaName = fieldNameWithoutQuotes.substring(0, fieldNameWithoutQuotes.indexOf("."));
|
||||
childrenSchemaName = fieldNameWithoutQuotes.substring(fieldNameWithoutQuotes.lastIndexOf(".") + 1);
|
||||
}
|
||||
Field schemaField = null;
|
||||
for (Field field : topic.getMessageSchema().getSchemaFields()) {
|
||||
if (field.getName().equals(schemaName)) {
|
||||
schemaField = field;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!"".equals(childrenSchemaName) && schemaField != null) {
|
||||
schemaField = getChildSchemaField(schemaField.getChildren(), childrenSchemaName);
|
||||
}
|
||||
if (schemaField == null) {
|
||||
throw new IllegalArgumentException(CatalogExceptionMessage.invalidFieldName("schema", schemaName));
|
||||
}
|
||||
return schemaField;
|
||||
}
|
||||
|
||||
private static Field getChildSchemaField(List<Field> fields, String childrenSchemaName) {
|
||||
Field childrenSchemaField = null;
|
||||
for (Field field : fields) {
|
||||
if (field.getName().equals(childrenSchemaName)) {
|
||||
@ -320,7 +352,7 @@ public class TopicRepository extends EntityRepository<Topic> {
|
||||
if (childrenSchemaField == null) {
|
||||
for (Field field : fields) {
|
||||
if (field.getChildren() != null) {
|
||||
childrenSchemaField = getchildrenSchemaField(field.getChildren(), childrenSchemaName);
|
||||
childrenSchemaField = getChildSchemaField(field.getChildren(), childrenSchemaName);
|
||||
if (childrenSchemaField != null) {
|
||||
break;
|
||||
}
|
||||
@ -449,7 +481,7 @@ public class TopicRepository extends EntityRepository<Topic> {
|
||||
updatedField.setDescription(origField.getDescription());
|
||||
return;
|
||||
}
|
||||
String field = getSchemaField(original, origField, FIELD_DESCRIPTION);
|
||||
String field = EntityUtil.getSchemaField(original, origField, FIELD_DESCRIPTION);
|
||||
recordChange(field, origField.getDescription(), updatedField.getDescription());
|
||||
}
|
||||
|
||||
@ -459,7 +491,7 @@ public class TopicRepository extends EntityRepository<Topic> {
|
||||
updatedField.setDisplayName(origField.getDisplayName());
|
||||
return;
|
||||
}
|
||||
String field = getSchemaField(original, origField, FIELD_DISPLAY_NAME);
|
||||
String field = EntityUtil.getSchemaField(original, origField, FIELD_DISPLAY_NAME);
|
||||
recordChange(field, origField.getDisplayName(), updatedField.getDisplayName());
|
||||
}
|
||||
|
||||
@ -469,7 +501,7 @@ public class TopicRepository extends EntityRepository<Topic> {
|
||||
updatedField.setDataTypeDisplay(origField.getDataTypeDisplay());
|
||||
return;
|
||||
}
|
||||
String field = getSchemaField(original, origField, FIELD_DATA_TYPE_DISPLAY);
|
||||
String field = EntityUtil.getSchemaField(original, origField, FIELD_DATA_TYPE_DISPLAY);
|
||||
recordChange(field, origField.getDataTypeDisplay(), updatedField.getDataTypeDisplay());
|
||||
}
|
||||
}
|
||||
|
@ -536,4 +536,12 @@ public final class EntityUtil {
|
||||
public static boolean isTagTask(TaskType taskType) {
|
||||
return taskType == TaskType.RequestTag || taskType == TaskType.UpdateTag;
|
||||
}
|
||||
|
||||
public static Column findColumn(List<Column> columns, String columnName) {
|
||||
return columns.stream()
|
||||
.filter(c -> c.getName().equals(columnName))
|
||||
.findFirst()
|
||||
.orElseThrow(
|
||||
() -> new IllegalArgumentException(CatalogExceptionMessage.invalidFieldName("column", columnName)));
|
||||
}
|
||||
}
|
||||
|
@ -16,7 +16,6 @@ package org.openmetadata.service.util;
|
||||
import static org.openmetadata.service.events.subscription.AlertsRuleEvaluator.getEntity;
|
||||
import static org.openmetadata.service.formatter.util.FormatterUtil.getFormattedMessages;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
@ -34,7 +33,7 @@ import org.openmetadata.service.resources.feeds.MessageParser;
|
||||
public final class FeedUtils {
|
||||
private FeedUtils() {}
|
||||
|
||||
public static List<Thread> getThreads(ChangeEvent changeEvent, String loggedInUserName) throws IOException {
|
||||
public static List<Thread> getThreads(ChangeEvent changeEvent, String loggedInUserName) {
|
||||
if (changeEvent == null || changeEvent.getEntity() == null) {
|
||||
return Collections.emptyList(); // Response has no entity to produce change event from
|
||||
}
|
||||
|
@ -33,6 +33,7 @@ import static org.openmetadata.service.exception.CatalogExceptionMessage.ANNOUNC
|
||||
import static org.openmetadata.service.exception.CatalogExceptionMessage.entityNotFound;
|
||||
import static org.openmetadata.service.exception.CatalogExceptionMessage.permissionNotAllowed;
|
||||
import static org.openmetadata.service.resources.EntityResourceTest.C1;
|
||||
import static org.openmetadata.service.resources.EntityResourceTest.USER1;
|
||||
import static org.openmetadata.service.resources.EntityResourceTest.USER_ADDRESS_TAG_LABEL;
|
||||
import static org.openmetadata.service.security.SecurityUtil.authHeaders;
|
||||
import static org.openmetadata.service.security.SecurityUtil.getPrincipalName;
|
||||
@ -392,7 +393,6 @@ public class FeedResourceTest extends OpenMetadataApplicationTest {
|
||||
int totalAnnouncementCount = listAnnouncements(null, null, null, ADMIN_AUTH_HEADERS).getPaging().getTotal();
|
||||
|
||||
// create two announcements with start time in the future
|
||||
LocalDateTime now = LocalDateTime.now();
|
||||
String about = String.format("<#E::%s::%s>", Entity.TABLE, TABLE.getFullyQualifiedName());
|
||||
|
||||
// Create announcement 1
|
||||
@ -445,7 +445,6 @@ public class FeedResourceTest extends OpenMetadataApplicationTest {
|
||||
@Test
|
||||
void post_invalidAnnouncement_400() throws IOException {
|
||||
// create two announcements with same start time in the future
|
||||
LocalDateTime now = LocalDateTime.now();
|
||||
String about = String.format("<#E::%s::%s>", Entity.TABLE, TABLE.getFullyQualifiedName());
|
||||
AnnouncementDetails announcementDetails = getAnnouncementDetails("1", 3, 5);
|
||||
createAnnouncement(USER.getName(), about, "Announcement One", announcementDetails, USER_AUTH_HEADERS);
|
||||
@ -962,41 +961,38 @@ public class FeedResourceTest extends OpenMetadataApplicationTest {
|
||||
|
||||
@Test
|
||||
void list_threadsWithOwnerFilter() throws HttpResponseException {
|
||||
// THREAD is created with TABLE entity in BeforeAll
|
||||
int totalThreadCount = listThreads(null, null, ADMIN_AUTH_HEADERS).getPaging().getTotal();
|
||||
String ownerId = TABLE.getOwner().getId().toString();
|
||||
assertNotNull(ownerId);
|
||||
int user1ThreadCount =
|
||||
listThreadsWithFilter(ownerId, FilterType.OWNER.toString(), USER_AUTH_HEADERS).getPaging().getTotal();
|
||||
int user2ThreadCount =
|
||||
listThreadsWithFilter(USER2.getId().toString(), FilterType.OWNER.toString(), USER_AUTH_HEADERS)
|
||||
.getPaging()
|
||||
.getTotal();
|
||||
String user1 = USER1.getId().toString(); // user1 is the owner of TABLE
|
||||
String user2 = USER2.getId().toString(); // user2 belongs to team2 which owns TABLE2
|
||||
assertNotNull(user1);
|
||||
// Get thread counts for user1 and user2
|
||||
int user1ThreadCount = listThreadsWithFilter(user1, FilterType.OWNER, USER_AUTH_HEADERS).getPaging().getTotal();
|
||||
int user2ThreadCount = listThreadsWithFilter(user2, FilterType.OWNER, USER_AUTH_HEADERS).getPaging().getTotal();
|
||||
|
||||
// create another thread on an entity with a different owner
|
||||
String ownerId2 = TABLE2.getOwner().getId().toString();
|
||||
assertNotNull(ownerId2);
|
||||
// create another thread on an entity with team2 as owner
|
||||
String team2 = TABLE2.getOwner().getId().toString();
|
||||
assertNotEquals(user1, team2);
|
||||
createAndCheck(
|
||||
create().withAbout(String.format("<#E::table::%s>", TABLE2.getFullyQualifiedName())).withFrom(ADMIN_USER_NAME),
|
||||
ADMIN_AUTH_HEADERS);
|
||||
assertNotEquals(ownerId, ownerId2);
|
||||
|
||||
ThreadList threads = listThreadsWithFilter(ownerId, FilterType.OWNER.toString(), USER_AUTH_HEADERS);
|
||||
// user1 thread count remains the same as the newly created thread belongs to team2 and user1 is not part of it
|
||||
ThreadList threads = listThreadsWithFilter(user1, FilterType.OWNER, USER_AUTH_HEADERS);
|
||||
assertEquals(user1ThreadCount, threads.getPaging().getTotal());
|
||||
|
||||
// This should return error since the table is owned by a team
|
||||
// and for the filter we are passing team id instead of user id
|
||||
assertResponse(
|
||||
() -> listThreadsWithFilter(ownerId2, FilterType.OWNER.toString(), USER_AUTH_HEADERS),
|
||||
() -> listThreadsWithFilter(team2, FilterType.OWNER, USER_AUTH_HEADERS),
|
||||
NOT_FOUND,
|
||||
entityNotFound(Entity.USER, ownerId2));
|
||||
entityNotFound(Entity.USER, team2));
|
||||
|
||||
// Now, test the filter with user who is part of the team
|
||||
threads = listThreadsWithFilter(USER2.getId().toString(), FilterType.OWNER.toString(), USER_AUTH_HEADERS);
|
||||
// Now, test the filter with user2 who is part of the team2
|
||||
threads = listThreadsWithFilter(user2, FilterType.OWNER, USER_AUTH_HEADERS);
|
||||
assertEquals(user2ThreadCount + 1, threads.getPaging().getTotal());
|
||||
|
||||
// Test if no user id filter returns all threads
|
||||
threads = listThreadsWithFilter(null, FilterType.OWNER.toString(), USER_AUTH_HEADERS);
|
||||
threads = listThreadsWithFilter(null, FilterType.OWNER, USER_AUTH_HEADERS);
|
||||
assertEquals(totalThreadCount + 1, threads.getPaging().getTotal());
|
||||
}
|
||||
|
||||
@ -1019,8 +1015,7 @@ public class FeedResourceTest extends OpenMetadataApplicationTest {
|
||||
ADMIN_AUTH_HEADERS);
|
||||
addPostAndCheck(thread, createPost, ADMIN_AUTH_HEADERS);
|
||||
|
||||
ThreadList threads =
|
||||
listThreadsWithFilter(USER.getId().toString(), FilterType.MENTIONS.toString(), USER_AUTH_HEADERS);
|
||||
ThreadList threads = listThreadsWithFilter(USER.getId().toString(), FilterType.MENTIONS, USER_AUTH_HEADERS);
|
||||
assertEquals(2, threads.getPaging().getTotal());
|
||||
}
|
||||
|
||||
@ -1046,11 +1041,10 @@ public class FeedResourceTest extends OpenMetadataApplicationTest {
|
||||
.until(
|
||||
() -> {
|
||||
ThreadList threads =
|
||||
listThreadsWithFilter(USER.getId().toString(), FilterType.FOLLOWS.toString(), USER_AUTH_HEADERS);
|
||||
listThreadsWithFilter(USER.getId().toString(), FilterType.FOLLOWS, USER_AUTH_HEADERS);
|
||||
return threads.getPaging().getTotal().equals(initialThreadCount + 3);
|
||||
});
|
||||
ThreadList threads =
|
||||
listThreadsWithFilter(USER.getId().toString(), FilterType.FOLLOWS.toString(), USER_AUTH_HEADERS);
|
||||
ThreadList threads = listThreadsWithFilter(USER.getId().toString(), FilterType.FOLLOWS, USER_AUTH_HEADERS);
|
||||
assertEquals(initialThreadCount + 3, threads.getPaging().getTotal());
|
||||
assertEquals(initialThreadCount + 3, threads.getData().size());
|
||||
assertEquals(
|
||||
@ -1059,7 +1053,7 @@ public class FeedResourceTest extends OpenMetadataApplicationTest {
|
||||
assertEquals("Message 2", threads.getData().get(1).getMessage());
|
||||
|
||||
// Filter by follows for another user should return 0 threads
|
||||
threads = listThreadsWithFilter(USER2.getId().toString(), FilterType.FOLLOWS.toString(), USER_AUTH_HEADERS);
|
||||
threads = listThreadsWithFilter(USER2.getId().toString(), FilterType.FOLLOWS, USER_AUTH_HEADERS);
|
||||
assertEquals(0, threads.getPaging().getTotal());
|
||||
assertEquals(0, threads.getData().size());
|
||||
}
|
||||
@ -1386,6 +1380,11 @@ public class FeedResourceTest extends OpenMetadataApplicationTest {
|
||||
TestUtils.put(target, userId, OK, authHeaders);
|
||||
}
|
||||
|
||||
public ThreadList listThreadsWithFilter(String userId, FilterType filterType, Map<String, String> authHeaders)
|
||||
throws HttpResponseException {
|
||||
return listThreadsWithFilter(userId, filterType.toString(), authHeaders);
|
||||
}
|
||||
|
||||
public ThreadList listThreadsWithFilter(String userId, String filterType, Map<String, String> authHeaders)
|
||||
throws HttpResponseException {
|
||||
WebTarget target = getResource("feed");
|
||||
|
Loading…
x
Reference in New Issue
Block a user