Fixes #11976 Add support for suggest and request description and tags for all the entities (#11977)

This commit is contained in:
Suresh Srinivas 2023-06-13 12:45:07 -07:00 committed by GitHub
parent c96e239b36
commit 7d9b27b996
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 189 additions and 239 deletions

View File

@ -22,6 +22,8 @@ import org.openmetadata.schema.entity.teams.Team;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.schema.type.MetadataOperation;
import org.openmetadata.schema.type.TagLabel;
import org.openmetadata.schema.type.TaskType;
import org.openmetadata.service.resources.feeds.MessageParser.EntityLink;
import org.openmetadata.service.util.JsonUtils;
public final class CatalogExceptionMessage {
@ -112,6 +114,10 @@ public final class CatalogExceptionMessage {
return String.format("Invalid fully qualified column name %s", fqn);
}
public static String invalidFieldName(String fieldType, String fieldName) {
return String.format("Invalid %s name %s", fieldType, fieldName);
}
public static String entityVersionNotFound(String entityType, UUID id, Double version) {
return String.format("%s instance for %s and version %s not found", entityType, id, version);
}
@ -216,4 +222,12 @@ public final class CatalogExceptionMessage {
return String.format(
"Failed to publish event %s to %s due to %s ", JsonUtils.pojoToJson(event), type.value(), message);
}
public static String invalidTaskField(EntityLink entityLink, TaskType taskType) {
return String.format("The Entity link with no field name - %s is not supported for %s task.", entityLink, taskType);
}
public static String invalidFieldForTask(String fieldName, TaskType type) {
return String.format("The field name %s is not supported for %s task.", fieldName, type);
}
}

View File

@ -41,7 +41,6 @@ import org.slf4j.LoggerFactory;
public class CatalogGenericExceptionMapper implements ExceptionMapper<Throwable> {
@Override
public Response toResponse(Throwable ex) {
ex.printStackTrace();
LOG.debug(ex.getMessage());
if (ex instanceof ProcessingException
|| ex instanceof IllegalArgumentException

View File

@ -21,6 +21,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import javax.json.JsonPatch;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.entity.data.DashboardDataModel;
@ -30,12 +31,16 @@ import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.Relationship;
import org.openmetadata.schema.type.TagLabel;
import org.openmetadata.schema.type.TaskDetails;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.CatalogExceptionMessage;
import org.openmetadata.service.resources.databases.DatabaseUtil;
import org.openmetadata.service.resources.datamodels.DashboardDataModelResource;
import org.openmetadata.service.resources.feeds.MessageParser.EntityLink;
import org.openmetadata.service.util.EntityUtil;
import org.openmetadata.service.util.EntityUtil.Fields;
import org.openmetadata.service.util.FullyQualifiedName;
import org.openmetadata.service.util.JsonUtils;
@Slf4j
public class DashboardDataModelRepository extends EntityRepository<DashboardDataModel> {
@ -63,6 +68,34 @@ public class DashboardDataModelRepository extends EntityRepository<DashboardData
ColumnUtil.setColumnFQN(dashboardDataModel.getFullyQualifiedName(), dashboardDataModel.getColumns());
}
@Override
public void update(TaskDetails task, EntityLink entityLink, String newValue, String user) throws IOException {
if (entityLink.getFieldName().equals("columns")) {
DashboardDataModel dashboardDataModel =
getByName(null, entityLink.getEntityFQN(), getFields("columns,tags"), Include.ALL);
String origJson = JsonUtils.pojoToJson(dashboardDataModel);
Column column =
dashboardDataModel.getColumns().stream()
.filter(c -> c.getName().equals(entityLink.getArrayFieldName()))
.findFirst()
.orElseThrow(
() ->
new IllegalArgumentException(
CatalogExceptionMessage.invalidFieldName("column", entityLink.getArrayFieldName())));
if (EntityUtil.isDescriptionTask(task.getType())) {
column.setDescription(newValue);
} else if (EntityUtil.isTagTask(task.getType())) {
List<TagLabel> tags = JsonUtils.readObjects(newValue, TagLabel.class);
column.setTags(tags);
}
String updatedEntityJson = JsonUtils.pojoToJson(dashboardDataModel);
JsonPatch patch = JsonUtils.getJsonPatch(origJson, updatedEntityJson);
patch(null, dashboardDataModel.getId(), user, patch);
return;
}
super.update(task, entityLink, newValue, user);
}
@Override
public void prepare(DashboardDataModel dashboardDataModel) throws IOException {
DashboardService dashboardService = Entity.getEntity(dashboardDataModel.getService(), "", Include.ALL);

View File

@ -14,19 +14,24 @@
package org.openmetadata.service.jdbi3;
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
import static org.openmetadata.service.Entity.FIELD_DESCRIPTION;
import static org.openmetadata.service.Entity.FIELD_FOLLOWERS;
import static org.openmetadata.service.Entity.FIELD_TAGS;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.entity.data.Dashboard;
import org.openmetadata.schema.entity.services.DashboardService;
import org.openmetadata.schema.type.*;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.CatalogExceptionMessage;
import org.openmetadata.service.jdbi3.CollectionDAO.EntityRelationshipRecord;
import org.openmetadata.service.resources.dashboards.DashboardResource;
import org.openmetadata.service.resources.feeds.MessageParser.EntityLink;
import org.openmetadata.service.util.EntityUtil;
import org.openmetadata.service.util.EntityUtil.Fields;
import org.openmetadata.service.util.FullyQualifiedName;
@ -53,6 +58,30 @@ public class DashboardRepository extends EntityRepository<Dashboard> {
dashboard.setFullyQualifiedName(FullyQualifiedName.add(dashboard.getService().getName(), dashboard.getName()));
}
@Override
public void update(TaskDetails task, EntityLink entityLink, String newValue, String user) throws IOException {
if (entityLink.getFieldName().equals("charts")) {
Dashboard dashboard = getByName(null, entityLink.getEntityFQN(), getFields("charts,tags"), Include.ALL);
EntityReference chart =
dashboard.getCharts().stream()
.filter(c -> c.getName().equals(entityLink.getArrayFieldName()))
.findFirst()
.orElseThrow(
() ->
new IllegalArgumentException(
CatalogExceptionMessage.invalidFieldName("chart", entityLink.getArrayFieldName())));
String fieldName =
EntityUtil.isDescriptionTask(task.getType())
? FIELD_DESCRIPTION
: EntityUtil.isTagTask(task.getType()) ? FIELD_TAGS : "invalidField";
EntityLink chartLink = new EntityLink(Entity.CHART, chart.getFullyQualifiedName(), fieldName, null, null);
EntityRepository<? extends EntityInterface> chartRepository = Entity.getEntityRepository(Entity.CHART);
chartRepository.update(task, chartLink, newValue, user);
return;
}
super.update(task, entityLink, newValue, user);
}
@Override
public Dashboard setFields(Dashboard dashboard, Fields fields) throws IOException {
dashboard.setService(getContainer(dashboard.getId()));

View File

@ -92,6 +92,8 @@ import org.openmetadata.schema.type.ProviderType;
import org.openmetadata.schema.type.Relationship;
import org.openmetadata.schema.type.TagLabel;
import org.openmetadata.schema.type.TagLabel.TagSource;
import org.openmetadata.schema.type.TaskDetails;
import org.openmetadata.schema.type.TaskType;
import org.openmetadata.schema.type.Votes;
import org.openmetadata.schema.type.csv.CsvImportResult;
import org.openmetadata.service.Entity;
@ -103,6 +105,7 @@ import org.openmetadata.service.exception.UnhandledServerException;
import org.openmetadata.service.jdbi3.CollectionDAO.EntityRelationshipRecord;
import org.openmetadata.service.jdbi3.CollectionDAO.EntityVersionPair;
import org.openmetadata.service.jdbi3.CollectionDAO.ExtensionRecord;
import org.openmetadata.service.resources.feeds.MessageParser.EntityLink;
import org.openmetadata.service.resources.tags.TagLabelCache;
import org.openmetadata.service.security.policyevaluator.SubjectCache;
import org.openmetadata.service.util.EntityUtil;
@ -265,6 +268,26 @@ public abstract class EntityRepository<T extends EntityInterface> {
entity.setFullyQualifiedName(entity.getName());
}
/** Update an entity based suggested description and tags in the task */
public void update(TaskDetails task, EntityLink entityLink, String newValue, String user) throws IOException {
TaskType taskType = task.getType();
T entity = getByName(null, entityLink.getEntityFQN(), getFields("tags"), Include.ALL);
String origJson = JsonUtils.pojoToJson(entity);
if (EntityUtil.isDescriptionTask(taskType) && entityLink.getFieldName().equals(FIELD_DESCRIPTION)) {
entity.setDescription(newValue);
} else if (supportsTags && EntityUtil.isTagTask(taskType) && entityLink.getFieldName().equals("tags")) {
List<TagLabel> tags = JsonUtils.readObjects(newValue, TagLabel.class);
entity.setTags(tags);
} else {
// Not supported
throw new IllegalArgumentException(
CatalogExceptionMessage.invalidFieldForTask(entityLink.getFieldName(), task.getType()));
}
String updatedEntityJson = JsonUtils.pojoToJson(entity);
JsonPatch patch = JsonUtils.getJsonPatch(origJson, updatedEntityJson);
patch(null, entity.getId(), user, patch);
}
/**
* Initialize data from json files if seed data does not exist in corresponding tables. Seed data is stored under
* openmetadata-service/src/main/resources/json/data/{entityType}

View File

@ -20,13 +20,6 @@ import static org.openmetadata.schema.type.Relationship.ADDRESSED_TO;
import static org.openmetadata.schema.type.Relationship.CREATED;
import static org.openmetadata.schema.type.Relationship.IS_ABOUT;
import static org.openmetadata.schema.type.Relationship.REPLIED_TO;
import static org.openmetadata.service.Entity.DASHBOARD;
import static org.openmetadata.service.Entity.DASHBOARD_DATA_MODEL;
import static org.openmetadata.service.Entity.DATABASE_SCHEMA;
import static org.openmetadata.service.Entity.FIELD_DESCRIPTION;
import static org.openmetadata.service.Entity.PIPELINE;
import static org.openmetadata.service.Entity.TABLE;
import static org.openmetadata.service.Entity.TOPIC;
import static org.openmetadata.service.Entity.getEntityRepository;
import static org.openmetadata.service.exception.CatalogExceptionMessage.ANNOUNCEMENT_INVALID_START_TIME;
import static org.openmetadata.service.exception.CatalogExceptionMessage.ANNOUNCEMENT_OVERLAP;
@ -49,7 +42,6 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.json.JsonPatch;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.SecurityContext;
@ -64,23 +56,15 @@ import org.openmetadata.schema.api.feed.CloseTask;
import org.openmetadata.schema.api.feed.EntityLinkThreadCount;
import org.openmetadata.schema.api.feed.ResolveTask;
import org.openmetadata.schema.api.feed.ThreadCount;
import org.openmetadata.schema.entity.data.Dashboard;
import org.openmetadata.schema.entity.data.DashboardDataModel;
import org.openmetadata.schema.entity.data.DatabaseSchema;
import org.openmetadata.schema.entity.data.Pipeline;
import org.openmetadata.schema.entity.data.Table;
import org.openmetadata.schema.entity.data.Topic;
import org.openmetadata.schema.entity.feed.Thread;
import org.openmetadata.schema.entity.teams.User;
import org.openmetadata.schema.type.AnnouncementDetails;
import org.openmetadata.schema.type.Column;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.Post;
import org.openmetadata.schema.type.Reaction;
import org.openmetadata.schema.type.Relationship;
import org.openmetadata.schema.type.TagLabel;
import org.openmetadata.schema.type.Task;
import org.openmetadata.schema.type.TaskDetails;
import org.openmetadata.schema.type.TaskStatus;
import org.openmetadata.schema.type.TaskType;
@ -90,6 +74,7 @@ import org.openmetadata.service.ResourceRegistry;
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.formatter.decorators.FeedMessageDecorator;
import org.openmetadata.service.formatter.decorators.MessageDecorator;
import org.openmetadata.service.formatter.util.FeedMessage;
import org.openmetadata.service.resources.feeds.FeedResource;
import org.openmetadata.service.resources.feeds.FeedUtil;
import org.openmetadata.service.resources.feeds.MessageParser;
@ -105,10 +90,8 @@ import org.openmetadata.service.util.ResultList;
@Slf4j
public class FeedRepository {
private static final String UNSUPPORTED_FIELD_NAME_FOR_TASK = "The field name %s is not supported for %s task.";
private final CollectionDAO dao;
private static MessageDecorator feedMessageFormatter = new FeedMessageDecorator();
private static final MessageDecorator<FeedMessage> feedMessageFormatter = new FeedMessageDecorator();
public FeedRepository(CollectionDAO dao) {
this.dao = dao;
@ -145,9 +128,9 @@ public class FeedRepository {
// Validate user creating the thread
User createdByUser = SubjectCache.getInstance().getUser(thread.getCreatedBy());
if (thread.getType().equals(ThreadType.Task)) {
if (thread.getType() == ThreadType.Task) {
thread.withTask(thread.getTask().withId(getNextTaskId())); // Assign taskId for a task
} else if (thread.getType().equals(ThreadType.Announcement)) {
} else if (thread.getType() == ThreadType.Announcement) {
// Validate start and end time for announcement
validateAnnouncement(thread.getAnnouncement());
long startTime = thread.getAnnouncement().getStartTime();
@ -209,222 +192,14 @@ public class FeedRepository {
return new PatchResponse<>(Status.OK, updatedHref, RestUtil.ENTITY_UPDATED);
}
private void performTask(
TaskDetails task, EntityLink entityLink, EntityReference reference, UriInfo uriInfo, String newValue, String user)
throws IOException {
TaskType taskType = task.getType();
List<TaskType> descriptionTasks = List.of(TaskType.RequestDescription, TaskType.UpdateDescription);
List<TaskType> tagTasks = List.of(TaskType.RequestTag, TaskType.UpdateTag);
List<TaskType> supportedTasks =
Stream.concat(descriptionTasks.stream(), tagTasks.stream()).collect(Collectors.toList());
// task needs to be completed only for Request or update description or tags.
if (supportedTasks.contains(taskType)) {
EntityRepository<?> repository = getEntityRepository(reference.getType());
String json = repository.dao.findJsonByFqn(entityLink.getEntityFQN(), Include.ALL);
switch (entityLink.getEntityType()) {
case TABLE:
Table table = JsonUtils.readValue(json, Table.class);
String oldJson = JsonUtils.pojoToJson(table);
if (entityLink.getFieldName() != null) {
if (entityLink.getFieldName().equals("columns")) {
Optional<Column> col =
table.getColumns().stream()
.filter(c -> c.getName().equals(entityLink.getArrayFieldName()))
.findFirst();
if (col.isPresent()) {
Column column = col.get();
if (descriptionTasks.contains(taskType)) {
column.setDescription(newValue);
} else if (tagTasks.contains(taskType)) {
List<TagLabel> tags = JsonUtils.readObjects(newValue, TagLabel.class);
column.setTags(tags);
}
} else {
throw new IllegalArgumentException(
String.format(
"The Column with name '%s' is not found in the table.", entityLink.getArrayFieldName()));
}
} else if (descriptionTasks.contains(taskType) && entityLink.getFieldName().equals(FIELD_DESCRIPTION)) {
table.setDescription(newValue);
} else if (tagTasks.contains(taskType) && entityLink.getFieldName().equals("tags")) {
List<TagLabel> tags = JsonUtils.readObjects(newValue, TagLabel.class);
table.setTags(tags);
} else {
// Not supported
throw new IllegalArgumentException(
String.format(UNSUPPORTED_FIELD_NAME_FOR_TASK, entityLink.getFieldName(), task.getType()));
}
} else {
// Not supported
throw new IllegalArgumentException(
String.format(
"The Entity link with no field name - %s is not supported for %s task.",
entityLink, task.getType()));
}
String updatedEntityJson = JsonUtils.pojoToJson(table);
JsonPatch patch = JsonUtils.getJsonPatch(oldJson, updatedEntityJson);
repository.patch(uriInfo, table.getId(), user, patch);
break;
case TOPIC:
Topic topic = JsonUtils.readValue(json, Topic.class);
oldJson = JsonUtils.pojoToJson(topic);
if (descriptionTasks.contains(taskType) && entityLink.getFieldName().equals(FIELD_DESCRIPTION)) {
topic.setDescription(newValue);
} else if (tagTasks.contains(taskType) && entityLink.getFieldName().equals("tags")) {
List<TagLabel> tags = JsonUtils.readObjects(newValue, TagLabel.class);
topic.setTags(tags);
} else {
// Not supported
throw new IllegalArgumentException(
String.format(UNSUPPORTED_FIELD_NAME_FOR_TASK, entityLink.getFieldName(), task.getType()));
}
updatedEntityJson = JsonUtils.pojoToJson(topic);
patch = JsonUtils.getJsonPatch(oldJson, updatedEntityJson);
repository.patch(uriInfo, topic.getId(), user, patch);
break;
case DASHBOARD:
Dashboard dashboard = JsonUtils.readValue(json, Dashboard.class);
oldJson = JsonUtils.pojoToJson(dashboard);
if (descriptionTasks.contains(taskType) && entityLink.getFieldName().equals(FIELD_DESCRIPTION)) {
dashboard.setDescription(newValue);
} else if (entityLink.getFieldName().equals("charts")) {
Optional<EntityReference> ch =
dashboard.getCharts().stream()
.filter(c -> c.getName().equals(entityLink.getArrayFieldName()))
.findFirst();
if (ch.isPresent()) {
EntityReference chart = ch.get();
if (descriptionTasks.contains(taskType)) {
chart.setDescription(newValue);
}
} else {
throw new IllegalArgumentException(
String.format(
"The Chart with name '%s' is not found in the dashboard.", entityLink.getArrayFieldName()));
}
} else {
// Not supported
throw new IllegalArgumentException(
String.format(UNSUPPORTED_FIELD_NAME_FOR_TASK, entityLink.getFieldName(), task.getType()));
}
updatedEntityJson = JsonUtils.pojoToJson(dashboard);
patch = JsonUtils.getJsonPatch(oldJson, updatedEntityJson);
repository.patch(uriInfo, dashboard.getId(), user, patch);
break;
case DASHBOARD_DATA_MODEL:
DashboardDataModel dashboardDataModel = JsonUtils.readValue(json, DashboardDataModel.class);
oldJson = JsonUtils.pojoToJson(dashboardDataModel);
if (entityLink.getFieldName() != null) {
if (entityLink.getFieldName().equals("columns")) {
Optional<Column> col =
dashboardDataModel.getColumns().stream()
.filter(c -> c.getName().equals(entityLink.getArrayFieldName()))
.findFirst();
if (col.isPresent()) {
Column column = col.get();
if (descriptionTasks.contains(taskType)) {
column.setDescription(newValue);
} else if (tagTasks.contains(taskType)) {
List<TagLabel> tags = JsonUtils.readObjects(newValue, TagLabel.class);
column.setTags(tags);
}
} else {
throw new IllegalArgumentException(
String.format(
"The Column with name '%s' is not found in the dashboard data model.",
entityLink.getArrayFieldName()));
}
} else if (descriptionTasks.contains(taskType) && entityLink.getFieldName().equals(FIELD_DESCRIPTION)) {
dashboardDataModel.setDescription(newValue);
} else if (tagTasks.contains(taskType) && entityLink.getFieldName().equals("tags")) {
List<TagLabel> tags = JsonUtils.readObjects(newValue, TagLabel.class);
dashboardDataModel.setTags(tags);
} else {
// Not supported
throw new IllegalArgumentException(
String.format(UNSUPPORTED_FIELD_NAME_FOR_TASK, entityLink.getFieldName(), task.getType()));
}
} else {
// Not supported
throw new IllegalArgumentException(
String.format(
"The Entity link with no field name - %s is not supported for %s task.",
entityLink, task.getType()));
}
updatedEntityJson = JsonUtils.pojoToJson(dashboardDataModel);
patch = JsonUtils.getJsonPatch(oldJson, updatedEntityJson);
repository.patch(uriInfo, dashboardDataModel.getId(), user, patch);
break;
case PIPELINE:
Pipeline pipeline = JsonUtils.readValue(json, Pipeline.class);
oldJson = JsonUtils.pojoToJson(pipeline);
if (descriptionTasks.contains(taskType) && entityLink.getFieldName().equals(FIELD_DESCRIPTION)) {
pipeline.setDescription(newValue);
} else if (entityLink.getFieldName().equals("tasks")) {
Optional<Task> tsk =
pipeline.getTasks().stream()
.filter(c -> c.getName().equals(entityLink.getArrayFieldName()))
.findFirst();
if (tsk.isPresent()) {
Task pipelineTask = tsk.get();
if (descriptionTasks.contains(taskType)) {
pipelineTask.setDescription(newValue);
} else if (tagTasks.contains(taskType)) {
List<TagLabel> tags = JsonUtils.readObjects(newValue, TagLabel.class);
pipelineTask.setTags(tags);
}
} else {
throw new IllegalArgumentException(
String.format(
"The Task with name '%s' is not found in the pipeline.", entityLink.getArrayFieldName()));
}
} else {
// Not supported
throw new IllegalArgumentException(
String.format(UNSUPPORTED_FIELD_NAME_FOR_TASK, entityLink.getFieldName(), task.getType()));
}
updatedEntityJson = JsonUtils.pojoToJson(pipeline);
patch = JsonUtils.getJsonPatch(oldJson, updatedEntityJson);
repository.patch(uriInfo, pipeline.getId(), user, patch);
break;
case DATABASE_SCHEMA:
DatabaseSchema databaseSchema = JsonUtils.readValue(json, DatabaseSchema.class);
oldJson = JsonUtils.pojoToJson(databaseSchema);
if (entityLink.getFieldName() != null) {
if (descriptionTasks.contains(taskType) && entityLink.getFieldName().equals(FIELD_DESCRIPTION)) {
databaseSchema.setDescription(newValue);
} else if (tagTasks.contains(taskType) && entityLink.getFieldName().equals("tags")) {
List<TagLabel> tags = JsonUtils.readObjects(newValue, TagLabel.class);
databaseSchema.setTags(tags);
} else {
// Not supported
throw new IllegalArgumentException(
String.format(UNSUPPORTED_FIELD_NAME_FOR_TASK, entityLink.getFieldName(), task.getType()));
}
} else {
// Not supported
throw new IllegalArgumentException(
String.format(
"The Entity link with no field name - %s is not supported for %s task.",
entityLink, task.getType()));
}
updatedEntityJson = JsonUtils.pojoToJson(databaseSchema);
patch = JsonUtils.getJsonPatch(oldJson, updatedEntityJson);
repository.patch(uriInfo, databaseSchema.getId(), user, patch);
break;
default:
throw new IllegalArgumentException("Task is not supported for the Data Asset.");
}
}
}
public PatchResponse<Thread> resolveTask(UriInfo uriInfo, Thread thread, String user, ResolveTask resolveTask)
throws IOException {
// perform the task
TaskDetails task = thread.getTask();
EntityLink about = EntityLink.parse(thread.getAbout());
EntityReference aboutRef = EntityUtil.validateEntityLink(about);
performTask(task, about, aboutRef, uriInfo, resolveTask.getNewValue(), user);
EntityRepository<?> repository = getEntityRepository(aboutRef.getType());
repository.update(task, about, resolveTask.getNewValue(), user);
// Update the attributes
task.withNewValue(resolveTask.getNewValue());
@ -921,9 +696,7 @@ public class FeedRepository {
}
private void sortPostsInThreads(List<Thread> threads) {
for (Thread t : threads) {
sortPosts(t);
}
threads.forEach(this::sortPosts);
}
/** Limit the number of posts within each thread to the requested limitPosts. */

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import javax.json.JsonPatch;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.entity.data.Pipeline;
@ -34,8 +35,11 @@ import org.openmetadata.schema.type.Relationship;
import org.openmetadata.schema.type.Status;
import org.openmetadata.schema.type.TagLabel;
import org.openmetadata.schema.type.Task;
import org.openmetadata.schema.type.TaskDetails;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.CatalogExceptionMessage;
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.resources.feeds.MessageParser.EntityLink;
import org.openmetadata.service.resources.pipelines.PipelineResource;
import org.openmetadata.service.util.EntityUtil;
import org.openmetadata.service.util.EntityUtil.Fields;
@ -66,6 +70,33 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
setTaskFQN(pipeline.getFullyQualifiedName(), pipeline.getTasks());
}
@Override
public void update(TaskDetails task, EntityLink entityLink, String newValue, String user) throws IOException {
if (entityLink.getFieldName().equals("tasks")) {
Pipeline pipeline = getByName(null, entityLink.getEntityFQN(), getFields("tasks,tags"), Include.ALL);
String oldJson = JsonUtils.pojoToJson(pipeline);
Task pipelineTask =
pipeline.getTasks().stream()
.filter(c -> c.getName().equals(entityLink.getArrayFieldName()))
.findFirst()
.orElseThrow(
() ->
new IllegalArgumentException(
CatalogExceptionMessage.invalidFieldName("task", entityLink.getArrayFieldName())));
if (EntityUtil.isDescriptionTask(task.getType())) {
pipelineTask.setDescription(newValue);
} else if (EntityUtil.isTagTask(task.getType())) {
List<TagLabel> tags = JsonUtils.readObjects(newValue, TagLabel.class);
pipelineTask.setTags(tags);
}
String updatedEntityJson = JsonUtils.pojoToJson(pipeline);
JsonPatch patch = JsonUtils.getJsonPatch(oldJson, updatedEntityJson);
patch(null, pipeline.getId(), user, patch);
return;
}
super.update(task, entityLink, newValue, user);
}
@Override
public Pipeline setFields(Pipeline pipeline, Fields fields) throws IOException {
pipeline.setService(getContainer(pipeline.getId()));

View File

@ -41,6 +41,7 @@ import java.util.UUID;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.json.JsonPatch;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
@ -69,10 +70,14 @@ import org.openmetadata.schema.type.TableJoins;
import org.openmetadata.schema.type.TableProfile;
import org.openmetadata.schema.type.TableProfilerConfig;
import org.openmetadata.schema.type.TagLabel;
import org.openmetadata.schema.type.TaskDetails;
import org.openmetadata.schema.type.TaskType;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.CatalogExceptionMessage;
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.resources.databases.DatabaseUtil;
import org.openmetadata.service.resources.databases.TableResource;
import org.openmetadata.service.resources.feeds.MessageParser.EntityLink;
import org.openmetadata.service.util.EntityUtil;
import org.openmetadata.service.util.EntityUtil.Fields;
import org.openmetadata.service.util.FullyQualifiedName;
@ -259,10 +264,9 @@ public class TableRepository extends EntityRepository<Table> {
entityRelationshipRecords.stream()
.filter(entityRelationshipRecord -> entityRelationshipRecord.getType().equals(Entity.TEST_SUITE))
.findFirst();
if (!testSuiteRelationshipRecord.isEmpty()) {
return getEntity(Entity.TEST_SUITE, testSuiteRelationshipRecord.get().getId(), "*", Include.ALL);
}
return null;
return testSuiteRelationshipRecord.isPresent()
? getEntity(Entity.TEST_SUITE, testSuiteRelationshipRecord.get().getId(), "*", Include.ALL)
: null;
}
@Transaction
@ -733,6 +737,36 @@ public class TableRepository extends EntityRepository<Table> {
return allTags;
}
@Override
public void update(TaskDetails task, EntityLink entityLink, String newValue, String user) throws IOException {
// TODO move this as the first check
validateEntityLinkFieldExists(entityLink, task.getType());
if (entityLink.getFieldName().equals("columns")) {
Table table = getByName(null, entityLink.getEntityFQN(), getFields("columns,tags"), Include.ALL);
Column column =
table.getColumns().stream()
.filter(c -> c.getName().equals(entityLink.getArrayFieldName()))
.findFirst()
.orElseThrow(
() ->
new IllegalArgumentException(
CatalogExceptionMessage.invalidFieldName("column", entityLink.getArrayFieldName())));
String origJson = JsonUtils.pojoToJson(table);
if (EntityUtil.isDescriptionTask(task.getType())) {
column.setDescription(newValue);
} else if (EntityUtil.isTagTask(task.getType())) {
List<TagLabel> tags = JsonUtils.readObjects(newValue, TagLabel.class);
column.setTags(tags);
}
String updatedEntityJson = JsonUtils.pojoToJson(table);
JsonPatch patch = JsonUtils.getJsonPatch(origJson, updatedEntityJson);
patch(null, table.getId(), user, patch);
return;
}
super.update(task, entityLink, newValue, user);
}
private void getColumnTags(boolean setTags, List<Column> columns) {
for (Column c : listOrEmpty(columns)) {
c.setTags(setTags ? getTags(c.getFullyQualifiedName()) : null);
@ -948,6 +982,12 @@ public class TableRepository extends EntityRepository<Table> {
}
}
private void validateEntityLinkFieldExists(EntityLink entityLink, TaskType taskType) {
if (entityLink.getFieldName() == null) {
throw new IllegalArgumentException(CatalogExceptionMessage.invalidTaskField(entityLink, taskType));
}
}
/** Handles entity updated from PUT and POST operation. */
public class TableUpdater extends ColumnEntityUpdater {
public TableUpdater(Table original, Table updated, Operation operation) {

View File

@ -514,4 +514,12 @@ public final class EntityUtil {
}
}
}
public static boolean isDescriptionTask(TaskType taskType) {
return taskType == TaskType.RequestDescription || taskType == TaskType.UpdateDescription;
}
public static boolean isTagTask(TaskType taskType) {
return taskType == TaskType.RequestTag || taskType == TaskType.UpdateTag;
}
}