mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-09-25 17:04:54 +00:00
This commit is contained in:
parent
ce49247011
commit
9403b146fa
@ -34,6 +34,7 @@ import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import javax.json.JsonPatch;
|
||||
import javax.ws.rs.core.Response.Status;
|
||||
import javax.ws.rs.core.UriInfo;
|
||||
@ -64,9 +65,11 @@ import org.openmetadata.catalog.type.Include;
|
||||
import org.openmetadata.catalog.type.Post;
|
||||
import org.openmetadata.catalog.type.Reaction;
|
||||
import org.openmetadata.catalog.type.Relationship;
|
||||
import org.openmetadata.catalog.type.TagLabel;
|
||||
import org.openmetadata.catalog.type.Task;
|
||||
import org.openmetadata.catalog.type.TaskDetails;
|
||||
import org.openmetadata.catalog.type.TaskStatus;
|
||||
import org.openmetadata.catalog.type.TaskType;
|
||||
import org.openmetadata.catalog.type.ThreadType;
|
||||
import org.openmetadata.catalog.util.EntityUtil;
|
||||
import org.openmetadata.catalog.util.JsonUtils;
|
||||
@ -177,129 +180,148 @@ public class FeedRepository {
|
||||
return new PatchResponse<>(Status.OK, updatedHref, RestUtil.ENTITY_UPDATED);
|
||||
}
|
||||
|
||||
private void performTask(
|
||||
TaskDetails task, EntityLink entityLink, EntityReference reference, UriInfo uriInfo, String newValue, String user)
|
||||
throws IOException {
|
||||
TaskType taskType = task.getType();
|
||||
List<TaskType> descriptionTasks = List.of(TaskType.RequestDescription, TaskType.UpdateDescription);
|
||||
List<TaskType> tagTasks = List.of(TaskType.RequestTag, TaskType.UpdateTag);
|
||||
List<TaskType> supportedTasks =
|
||||
Stream.concat(descriptionTasks.stream(), tagTasks.stream()).collect(Collectors.toList());
|
||||
// task needs to be completed only for Request or update description or tags.
|
||||
if (supportedTasks.contains(taskType)) {
|
||||
EntityRepository<?> repository = getEntityRepository(reference.getType());
|
||||
String json = repository.dao.findJsonByFqn(entityLink.getEntityFQN(), Include.ALL);
|
||||
switch (entityLink.getEntityType()) {
|
||||
case TABLE:
|
||||
Table table = JsonUtils.readValue(json, Table.class);
|
||||
String oldJson = JsonUtils.pojoToJson(table);
|
||||
if (entityLink.getFieldName().equals("columns")) {
|
||||
Optional<Column> col =
|
||||
table.getColumns().stream().filter(c -> c.getName().equals(entityLink.getArrayFieldName())).findFirst();
|
||||
if (col.isPresent()) {
|
||||
Column column = col.get();
|
||||
if (descriptionTasks.contains(taskType)) {
|
||||
column.setDescription(newValue);
|
||||
} else if (tagTasks.contains(taskType)) {
|
||||
List<TagLabel> tags = JsonUtils.readObjects(newValue, TagLabel.class);
|
||||
column.setTags(tags);
|
||||
}
|
||||
} else {
|
||||
throw new IllegalArgumentException(
|
||||
String.format(
|
||||
"The Column with name '%s' is not found in the table.", entityLink.getArrayFieldName()));
|
||||
}
|
||||
} else if (descriptionTasks.contains(taskType) && entityLink.getFieldName().equals("description")) {
|
||||
table.setDescription(newValue);
|
||||
} else if (tagTasks.contains(taskType) && entityLink.getFieldName().equals("tags")) {
|
||||
List<TagLabel> tags = JsonUtils.readObjects(newValue, TagLabel.class);
|
||||
table.setTags(tags);
|
||||
} else {
|
||||
// Not supported
|
||||
throw new IllegalArgumentException(
|
||||
String.format(
|
||||
"The field name %s is not supported for %s task.", entityLink.getFieldName(), task.getType()));
|
||||
}
|
||||
String updatedEntityJson = JsonUtils.pojoToJson(table);
|
||||
JsonPatch patch = JsonUtils.getJsonPatch(oldJson, updatedEntityJson);
|
||||
repository.patch(uriInfo, table.getId(), user, patch);
|
||||
break;
|
||||
case TOPIC:
|
||||
Topic topic = JsonUtils.readValue(json, Topic.class);
|
||||
oldJson = JsonUtils.pojoToJson(topic);
|
||||
if (descriptionTasks.contains(taskType) && entityLink.getFieldName().equals("description")) {
|
||||
topic.setDescription(newValue);
|
||||
} else if (tagTasks.contains(taskType) && entityLink.getFieldName().equals("tags")) {
|
||||
List<TagLabel> tags = JsonUtils.readObjects(newValue, TagLabel.class);
|
||||
topic.setTags(tags);
|
||||
} else {
|
||||
// Not supported
|
||||
throw new IllegalArgumentException(
|
||||
String.format(
|
||||
"The field name %s is not supported for %s task.", entityLink.getFieldName(), task.getType()));
|
||||
}
|
||||
updatedEntityJson = JsonUtils.pojoToJson(topic);
|
||||
patch = JsonUtils.getJsonPatch(oldJson, updatedEntityJson);
|
||||
repository.patch(uriInfo, topic.getId(), user, patch);
|
||||
break;
|
||||
case DASHBOARD:
|
||||
Dashboard dashboard = JsonUtils.readValue(json, Dashboard.class);
|
||||
oldJson = JsonUtils.pojoToJson(dashboard);
|
||||
if (descriptionTasks.contains(taskType) && entityLink.getFieldName().equals("description")) {
|
||||
dashboard.setDescription(newValue);
|
||||
} else if (entityLink.getFieldName().equals("charts")) {
|
||||
Optional<EntityReference> ch =
|
||||
dashboard.getCharts().stream()
|
||||
.filter(c -> c.getName().equals(entityLink.getArrayFieldName()))
|
||||
.findFirst();
|
||||
if (ch.isPresent()) {
|
||||
EntityReference chart = ch.get();
|
||||
if (descriptionTasks.contains(taskType)) {
|
||||
chart.setDescription(newValue);
|
||||
}
|
||||
} else {
|
||||
throw new IllegalArgumentException(
|
||||
String.format(
|
||||
"The Chart with name '%s' is not found in the dashboard.", entityLink.getArrayFieldName()));
|
||||
}
|
||||
} else {
|
||||
// Not supported
|
||||
throw new IllegalArgumentException(
|
||||
String.format(
|
||||
"The field name %s is not supported for %s task.", entityLink.getFieldName(), task.getType()));
|
||||
}
|
||||
updatedEntityJson = JsonUtils.pojoToJson(dashboard);
|
||||
patch = JsonUtils.getJsonPatch(oldJson, updatedEntityJson);
|
||||
repository.patch(uriInfo, dashboard.getId(), user, patch);
|
||||
break;
|
||||
case PIPELINE:
|
||||
Pipeline pipeline = JsonUtils.readValue(json, Pipeline.class);
|
||||
oldJson = JsonUtils.pojoToJson(pipeline);
|
||||
if (descriptionTasks.contains(taskType) && entityLink.getFieldName().equals("description")) {
|
||||
pipeline.setDescription(newValue);
|
||||
} else if (entityLink.getFieldName().equals("tasks")) {
|
||||
Optional<Task> tsk =
|
||||
pipeline.getTasks().stream()
|
||||
.filter(c -> c.getName().equals(entityLink.getArrayFieldName()))
|
||||
.findFirst();
|
||||
if (tsk.isPresent()) {
|
||||
Task pipelineTask = tsk.get();
|
||||
if (descriptionTasks.contains(taskType)) {
|
||||
pipelineTask.setDescription(newValue);
|
||||
} else if (tagTasks.contains(taskType)) {
|
||||
List<TagLabel> tags = JsonUtils.readObjects(newValue, TagLabel.class);
|
||||
pipelineTask.setTags(tags);
|
||||
}
|
||||
} else {
|
||||
throw new IllegalArgumentException(
|
||||
String.format(
|
||||
"The Task with name '%s' is not found in the pipeline.", entityLink.getArrayFieldName()));
|
||||
}
|
||||
} else {
|
||||
// Not supported
|
||||
throw new IllegalArgumentException(
|
||||
String.format(
|
||||
"The field name %s is not supported for %s task.", entityLink.getFieldName(), task.getType()));
|
||||
}
|
||||
updatedEntityJson = JsonUtils.pojoToJson(pipeline);
|
||||
patch = JsonUtils.getJsonPatch(oldJson, updatedEntityJson);
|
||||
repository.patch(uriInfo, pipeline.getId(), user, patch);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public PatchResponse<Thread> resolveTask(UriInfo uriInfo, Thread thread, String user, ResolveTask resolveTask)
|
||||
throws IOException {
|
||||
// perform the task
|
||||
TaskDetails task = thread.getTask();
|
||||
EntityLink entityLink = EntityLink.parse(thread.getAbout());
|
||||
EntityReference reference = EntityUtil.validateEntityLink(entityLink);
|
||||
switch (task.getType()) {
|
||||
case RequestDescription:
|
||||
case UpdateDescription:
|
||||
EntityRepository<?> repository = getEntityRepository(reference.getType());
|
||||
String json = repository.dao.findJsonByFqn(entityLink.getEntityFQN(), Include.ALL);
|
||||
switch (entityLink.getEntityType()) {
|
||||
case TABLE:
|
||||
Table table = JsonUtils.readValue(json, Table.class);
|
||||
String oldJson = JsonUtils.pojoToJson(table);
|
||||
if (entityLink.getFieldName().equals("columns")) {
|
||||
Optional<Column> col =
|
||||
table.getColumns().stream()
|
||||
.filter(c -> c.getName().equals(entityLink.getArrayFieldName()))
|
||||
.findFirst();
|
||||
if (col.isPresent()) {
|
||||
Column column = col.get();
|
||||
column.setDescription(resolveTask.getNewValue());
|
||||
} else {
|
||||
throw new IllegalArgumentException(
|
||||
String.format(
|
||||
"The Column with name '%s' is not found in the table.", entityLink.getArrayFieldName()));
|
||||
}
|
||||
} else if (entityLink.getFieldName().equals("description")) {
|
||||
table.setDescription(resolveTask.getNewValue());
|
||||
} else {
|
||||
// Not supported
|
||||
throw new IllegalArgumentException(
|
||||
String.format(
|
||||
"The field name %s is not supported for %s task.", entityLink.getFieldName(), task.getType()));
|
||||
}
|
||||
String updatedEntityJson = JsonUtils.pojoToJson(table);
|
||||
JsonPatch patch = JsonUtils.getJsonPatch(oldJson, updatedEntityJson);
|
||||
repository.patch(uriInfo, table.getId(), user, patch);
|
||||
break;
|
||||
case TOPIC:
|
||||
Topic topic = JsonUtils.readValue(json, Topic.class);
|
||||
oldJson = JsonUtils.pojoToJson(topic);
|
||||
if (entityLink.getFieldName().equals("description")) {
|
||||
topic.setDescription(resolveTask.getNewValue());
|
||||
} else {
|
||||
// Not supported
|
||||
throw new IllegalArgumentException(
|
||||
String.format(
|
||||
"The field name %s is not supported for %s task.", entityLink.getFieldName(), task.getType()));
|
||||
}
|
||||
updatedEntityJson = JsonUtils.pojoToJson(topic);
|
||||
patch = JsonUtils.getJsonPatch(oldJson, updatedEntityJson);
|
||||
repository.patch(uriInfo, topic.getId(), user, patch);
|
||||
break;
|
||||
case DASHBOARD:
|
||||
Dashboard dashboard = JsonUtils.readValue(json, Dashboard.class);
|
||||
oldJson = JsonUtils.pojoToJson(dashboard);
|
||||
if (entityLink.getFieldName().equals("description")) {
|
||||
dashboard.setDescription(resolveTask.getNewValue());
|
||||
} else if (entityLink.getFieldName().equals("charts")) {
|
||||
Optional<EntityReference> ch =
|
||||
dashboard.getCharts().stream()
|
||||
.filter(c -> c.getName().equals(entityLink.getArrayFieldName()))
|
||||
.findFirst();
|
||||
if (ch.isPresent()) {
|
||||
EntityReference chart = ch.get();
|
||||
chart.setDescription(resolveTask.getNewValue());
|
||||
} else {
|
||||
throw new IllegalArgumentException(
|
||||
String.format(
|
||||
"The Chart with name '%s' is not found in the dashboard.", entityLink.getArrayFieldName()));
|
||||
}
|
||||
} else {
|
||||
// Not supported
|
||||
throw new IllegalArgumentException(
|
||||
String.format(
|
||||
"The field name %s is not supported for %s task.", entityLink.getFieldName(), task.getType()));
|
||||
}
|
||||
updatedEntityJson = JsonUtils.pojoToJson(dashboard);
|
||||
patch = JsonUtils.getJsonPatch(oldJson, updatedEntityJson);
|
||||
repository.patch(uriInfo, dashboard.getId(), user, patch);
|
||||
break;
|
||||
case PIPELINE:
|
||||
Pipeline pipeline = JsonUtils.readValue(json, Pipeline.class);
|
||||
oldJson = JsonUtils.pojoToJson(pipeline);
|
||||
if (entityLink.getFieldName().equals("description")) {
|
||||
pipeline.setDescription(resolveTask.getNewValue());
|
||||
} else if (entityLink.getFieldName().equals("tasks")) {
|
||||
Optional<Task> tsk =
|
||||
pipeline.getTasks().stream()
|
||||
.filter(c -> c.getName().equals(entityLink.getArrayFieldName()))
|
||||
.findFirst();
|
||||
if (tsk.isPresent()) {
|
||||
Task pipelineTask = tsk.get();
|
||||
pipelineTask.setDescription(resolveTask.getNewValue());
|
||||
} else {
|
||||
throw new IllegalArgumentException(
|
||||
String.format(
|
||||
"The Task with name '%s' is not found in the pipeline.", entityLink.getArrayFieldName()));
|
||||
}
|
||||
} else {
|
||||
// Not supported
|
||||
throw new IllegalArgumentException(
|
||||
String.format(
|
||||
"The field name %s is not supported for %s task.", entityLink.getFieldName(), task.getType()));
|
||||
}
|
||||
updatedEntityJson = JsonUtils.pojoToJson(pipeline);
|
||||
patch = JsonUtils.getJsonPatch(oldJson, updatedEntityJson);
|
||||
repository.patch(uriInfo, pipeline.getId(), user, patch);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
break;
|
||||
case RequestTag:
|
||||
case UpdateTag:
|
||||
break;
|
||||
case Generic:
|
||||
default:
|
||||
// no action to be taken. Just close the task
|
||||
break;
|
||||
}
|
||||
performTask(task, entityLink, reference, uriInfo, resolveTask.getNewValue(), user);
|
||||
|
||||
// Update the attributes
|
||||
task.withNewValue(resolveTask.getNewValue());
|
||||
closeTask(thread, user);
|
||||
|
@ -26,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound;
|
||||
import static org.openmetadata.catalog.exception.CatalogExceptionMessage.noPermission;
|
||||
import static org.openmetadata.catalog.resources.EntityResourceTest.USER_ADDRESS_TAG_LABEL;
|
||||
import static org.openmetadata.catalog.security.SecurityUtil.authHeaders;
|
||||
import static org.openmetadata.catalog.security.SecurityUtil.getPrincipalName;
|
||||
import static org.openmetadata.catalog.util.TestUtils.ADMIN_AUTH_HEADERS;
|
||||
@ -43,6 +44,7 @@ import java.net.URISyntaxException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
@ -91,6 +93,7 @@ import org.openmetadata.catalog.type.EntityReference;
|
||||
import org.openmetadata.catalog.type.Post;
|
||||
import org.openmetadata.catalog.type.Reaction;
|
||||
import org.openmetadata.catalog.type.ReactionType;
|
||||
import org.openmetadata.catalog.type.TagLabel;
|
||||
import org.openmetadata.catalog.type.TaskDetails;
|
||||
import org.openmetadata.catalog.type.TaskStatus;
|
||||
import org.openmetadata.catalog.type.TaskType;
|
||||
@ -388,7 +391,7 @@ public class FeedResourceTest extends CatalogApplicationTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
void post_resolveTask_200() throws IOException {
|
||||
void post_resolveTask_description_200() throws IOException {
|
||||
CreateTaskDetails taskDetails =
|
||||
new CreateTaskDetails()
|
||||
.withOldValue("old description")
|
||||
@ -432,6 +435,54 @@ public class FeedResourceTest extends CatalogApplicationTest {
|
||||
assertEquals(TaskStatus.Closed, task.getStatus());
|
||||
}
|
||||
|
||||
@Test
|
||||
void post_resolveTask_tags_200() throws IOException {
|
||||
String newValue = "[" + JsonUtils.pojoToJson(USER_ADDRESS_TAG_LABEL) + "]";
|
||||
CreateTaskDetails taskDetails =
|
||||
new CreateTaskDetails()
|
||||
.withOldValue(null)
|
||||
.withAssignees(List.of(USER2.getEntityReference()))
|
||||
.withType(TaskType.RequestTag)
|
||||
.withSuggestion(newValue);
|
||||
|
||||
String about = create().getAbout();
|
||||
about = about.substring(0, about.length() - 1) + "::columns::c1::tags>";
|
||||
CreateThread create =
|
||||
create()
|
||||
.withMessage("Request Tags for column")
|
||||
.withTaskDetails(taskDetails)
|
||||
.withType(ThreadType.Task)
|
||||
.withAbout(about);
|
||||
|
||||
Map<String, String> userAuthHeaders = authHeaders(USER.getEmail());
|
||||
createAndCheck(create, userAuthHeaders);
|
||||
|
||||
ThreadList tasks = listTasks(null, null, null, null, userAuthHeaders);
|
||||
TaskDetails task = tasks.getData().get(0).getTask();
|
||||
assertNotNull(task.getId());
|
||||
int taskId = task.getId();
|
||||
|
||||
ResolveTask resolveTask = new ResolveTask().withNewValue(newValue);
|
||||
resolveTask(taskId, resolveTask, userAuthHeaders);
|
||||
Map<String, String> params = new HashMap<>();
|
||||
params.put("fields", "tags");
|
||||
ResultList<Table> tables = TABLE_RESOURCE_TEST.listEntities(params, userAuthHeaders);
|
||||
Optional<Table> table =
|
||||
tables.getData().stream()
|
||||
.filter(t -> t.getFullyQualifiedName().equals(TABLE.getFullyQualifiedName()))
|
||||
.findFirst();
|
||||
assertTrue(table.isPresent());
|
||||
List<TagLabel> tags =
|
||||
table.get().getColumns().stream().filter(c -> c.getName().equals("c1")).findFirst().get().getTags();
|
||||
assertEquals(USER_ADDRESS_TAG_LABEL.getTagFQN(), tags.get(0).getTagFQN());
|
||||
|
||||
Thread taskThread = getTask(taskId, userAuthHeaders);
|
||||
task = taskThread.getTask();
|
||||
assertEquals(taskId, task.getId());
|
||||
assertEquals(newValue, task.getNewValue());
|
||||
assertEquals(TaskStatus.Closed, task.getStatus());
|
||||
}
|
||||
|
||||
private static Stream<Arguments> provideStringsForListThreads() {
|
||||
return Stream.of(
|
||||
Arguments.of(String.format("<#E::%s::%s>", Entity.USER, USER.getName())),
|
||||
|
Loading…
x
Reference in New Issue
Block a user