Fix patching tags for tasks (#10561)

* Fix patching tags for tasks

* Fix stylecheck
This commit is contained in:
Sriharsha Chintalapani 2023-03-15 23:44:47 -07:00 committed by GitHub
parent 0e9930c0ed
commit 3c4b423f36
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 65 additions and 4 deletions

View File

@ -4,6 +4,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.openmetadata.schema.entity.data.Pipeline;
import org.openmetadata.schema.type.TagLabel;
import org.openmetadata.schema.type.Task;
import org.openmetadata.service.Entity;
import org.openmetadata.service.util.JsonUtils;
@ -22,15 +23,22 @@ public class PipelineIndex implements ElasticSearchIndex {
List<ElasticSearchSuggest> suggest = new ArrayList<>();
List<ElasticSearchSuggest> serviceSuggest = new ArrayList<>();
List<ElasticSearchSuggest> taskSuggest = new ArrayList<>();
List<TagLabel> tags = new ArrayList<>();
suggest.add(ElasticSearchSuggest.builder().input(pipeline.getFullyQualifiedName()).weight(5).build());
suggest.add(ElasticSearchSuggest.builder().input(pipeline.getDisplayName()).weight(10).build());
serviceSuggest.add(ElasticSearchSuggest.builder().input(pipeline.getService().getName()).weight(5).build());
ParseTags parseTags = new ParseTags(ElasticSearchIndexUtils.parseTags(pipeline.getTags()));
if (pipeline.getTasks() != null) {
for (Task task : pipeline.getTasks()) {
taskSuggest.add(ElasticSearchSuggest.builder().input(task.getName()).weight(5).build());
if (task.getTags() != null) {
tags.addAll(task.getTags());
}
}
}
tags.addAll(ElasticSearchIndexUtils.parseTags(pipeline.getTags()));
ParseTags parseTags = new ParseTags(tags);
doc.put("name", pipeline.getName() != null ? pipeline.getName() : pipeline.getDisplayName());
doc.put("displayName", pipeline.getDisplayName() != null ? pipeline.getDisplayName() : pipeline.getName());
doc.put("followers", ElasticSearchIndexUtils.parseFollowers(pipeline.getFollowers()));

View File

@ -16,6 +16,7 @@ package org.openmetadata.service.jdbi3;
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.service.Entity.FIELD_FOLLOWERS;
import static org.openmetadata.service.Entity.FIELD_TAGS;
import static org.openmetadata.service.util.EntityUtil.taskMatch;
import com.fasterxml.jackson.core.JsonProcessingException;
@ -59,12 +60,14 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
@Override
public void setFullyQualifiedName(Pipeline pipeline) {
pipeline.setFullyQualifiedName(FullyQualifiedName.add(pipeline.getService().getName(), pipeline.getName()));
setTaskFQN(pipeline.getFullyQualifiedName(), pipeline.getTasks());
}
@Override
public Pipeline setFields(Pipeline pipeline, Fields fields) throws IOException {
pipeline.setService(getContainer(pipeline.getId()));
pipeline.setFollowers(fields.contains(FIELD_FOLLOWERS) ? getFollowers(pipeline) : null);
getTaskTags(fields.contains(FIELD_TAGS), pipeline.getTasks());
if (!fields.contains("tasks")) {
pipeline.withTasks(null);
}
@ -181,10 +184,13 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
// Don't store owner, database, href and tags as JSON. Build it on the fly based on relationships
pipeline.withOwner(null).withService(null).withHref(null).withTags(null);
// Don't store column tags as JSON but build it on the fly based on relationships
List<Task> taskWithTags = pipeline.getTasks();
pipeline.setTasks(cloneWithoutTags(taskWithTags));
store(pipeline, update);
// Restore the relationships
pipeline.withOwner(owner).withService(service).withTags(tags);
pipeline.withOwner(owner).withService(service).withTags(tags).withTasks(taskWithTags);
}
@Override
@ -199,6 +205,37 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
applyTags(pipeline);
}
@Override
public void applyTags(Pipeline pipeline) {
// Add table level tags by adding tag to table relationship
super.applyTags(pipeline);
applyTags(pipeline.getTasks());
}
private void applyTags(List<Task> tasks) {
if (tasks != null) {
for (Task task : tasks) {
applyTags(task.getTags(), task.getFullyQualifiedName());
}
}
}
private void getTaskTags(boolean setTags, List<Task> tasks) {
for (Task t : listOrEmpty(tasks)) {
t.setTags(setTags ? getTags(t.getFullyQualifiedName()) : null);
}
}
private void setTaskFQN(String parentFQN, List<Task> tasks) {
if (tasks != null) {
tasks.forEach(
t -> {
String taskFqn = FullyQualifiedName.add(parentFQN, t.getName());
t.setFullyQualifiedName(taskFqn);
});
}
}
@Override
public EntityUpdater getUpdater(Pipeline original, Pipeline updated, Operation operation) {
return new PipelineUpdater(original, updated, operation);
@ -210,6 +247,15 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
pipeline.setServiceType(service.getServiceType());
}
private static List<Task> cloneWithoutTags(List<Task> tasks) {
if (nullOrEmpty(tasks)) {
return tasks;
}
List<Task> copy = new ArrayList<>();
tasks.forEach(t -> copy.add(t.withTags(null)));
return copy;
}
/** Handles entity updated from PUT and POST operation. */
public class PipelineUpdater extends EntityUpdater {
public PipelineUpdater(Pipeline original, Pipeline updated, Operation operation) {
@ -249,6 +295,7 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
}
updateTaskDescription(stored, updatedTask);
}
applyTags(updatedTasks);
boolean removedTasks = updatedTasks.size() < origTasks.size();

View File

@ -125,6 +125,9 @@ public class PipelineResourceTest extends EntityResourceTest<Pipeline, CreatePip
assertTrue(
expectedTask.getName().equals(actualTask.getName())
|| expectedTask.getName().equals(actualTask.getDisplayName()));
if (expectedTask.getTags() != null && !expectedTask.getTags().isEmpty() && actualTask.getTags() != null) {
assertEquals(expectedTask.getTags(), actualTask.getTags());
}
i++;
}
}
@ -423,16 +426,19 @@ public class PipelineResourceTest extends EntityResourceTest<Pipeline, CreatePip
// Add a task without description
ChangeDescription change = getChangeDescription(pipeline.getVersion());
List<Task> tasks = new ArrayList<>();
Task taskEmptyDesc = new Task().withName("taskEmpty").withTaskUrl("http://localhost:0");
Task taskEmptyDesc =
new Task().withName("taskEmpty").withTaskUrl("http://localhost:0").withTags(List.of(USER_ADDRESS_TAG_LABEL));
tasks.add(taskEmptyDesc);
fieldAdded(change, "tasks", tasks);
fieldUpdated(change, "description", "", "newDescription");
// Create new request with all the Tasks
List<Task> updatedTasks = Stream.concat(TASKS.stream(), tasks.stream()).collect(Collectors.toList());
pipeline.setTasks(updatedTasks);
pipeline.setDescription("newDescription");
pipeline = patchEntityAndCheck(pipeline, origJson, ADMIN_AUTH_HEADERS, MINOR_UPDATE, change);
pipeline = getPipeline(pipeline.getId(), "*", ADMIN_AUTH_HEADERS);
// validate tasks
validateTasks(updatedTasks, pipeline.getTasks());
// add a description to an existing task
origJson = JsonUtils.pojoToJson(pipeline);