From 3c4b423f364a58087fd4fa13a89be59bc5b66187 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Wed, 15 Mar 2023 23:44:47 -0700 Subject: [PATCH] Fix patching tags for tasks (#10561) * Fix patching tags for tasks * Fix stylecheck --- .../service/elasticsearch/PipelineIndex.java | 10 +++- .../service/jdbi3/PipelineRepository.java | 49 ++++++++++++++++++- .../pipelines/PipelineResourceTest.java | 10 +++- 3 files changed, 65 insertions(+), 4 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/PipelineIndex.java b/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/PipelineIndex.java index 8e4fb35d63d..16628746808 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/PipelineIndex.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/PipelineIndex.java @@ -4,6 +4,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import org.openmetadata.schema.entity.data.Pipeline; +import org.openmetadata.schema.type.TagLabel; import org.openmetadata.schema.type.Task; import org.openmetadata.service.Entity; import org.openmetadata.service.util.JsonUtils; @@ -22,15 +23,22 @@ public class PipelineIndex implements ElasticSearchIndex { List suggest = new ArrayList<>(); List serviceSuggest = new ArrayList<>(); List taskSuggest = new ArrayList<>(); + List tags = new ArrayList<>(); suggest.add(ElasticSearchSuggest.builder().input(pipeline.getFullyQualifiedName()).weight(5).build()); suggest.add(ElasticSearchSuggest.builder().input(pipeline.getDisplayName()).weight(10).build()); serviceSuggest.add(ElasticSearchSuggest.builder().input(pipeline.getService().getName()).weight(5).build()); - ParseTags parseTags = new ParseTags(ElasticSearchIndexUtils.parseTags(pipeline.getTags())); + if (pipeline.getTasks() != null) { for (Task task : pipeline.getTasks()) { taskSuggest.add(ElasticSearchSuggest.builder().input(task.getName()).weight(5).build()); + if (task.getTags() != null) { + tags.addAll(task.getTags()); + } } } + tags.addAll(ElasticSearchIndexUtils.parseTags(pipeline.getTags())); + + ParseTags parseTags = new ParseTags(tags); doc.put("name", pipeline.getName() != null ? pipeline.getName() : pipeline.getDisplayName()); doc.put("displayName", pipeline.getDisplayName() != null ? pipeline.getDisplayName() : pipeline.getName()); doc.put("followers", ElasticSearchIndexUtils.parseFollowers(pipeline.getFollowers())); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/PipelineRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/PipelineRepository.java index a42c3d04018..316efbe66d8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/PipelineRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/PipelineRepository.java @@ -16,6 +16,7 @@ package org.openmetadata.service.jdbi3; import static org.openmetadata.common.utils.CommonUtil.listOrEmpty; import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; import static org.openmetadata.service.Entity.FIELD_FOLLOWERS; +import static org.openmetadata.service.Entity.FIELD_TAGS; import static org.openmetadata.service.util.EntityUtil.taskMatch; import com.fasterxml.jackson.core.JsonProcessingException; @@ -59,12 +60,14 @@ public class PipelineRepository extends EntityRepository { @Override public void setFullyQualifiedName(Pipeline pipeline) { pipeline.setFullyQualifiedName(FullyQualifiedName.add(pipeline.getService().getName(), pipeline.getName())); + setTaskFQN(pipeline.getFullyQualifiedName(), pipeline.getTasks()); } @Override public Pipeline setFields(Pipeline pipeline, Fields fields) throws IOException { pipeline.setService(getContainer(pipeline.getId())); pipeline.setFollowers(fields.contains(FIELD_FOLLOWERS) ? getFollowers(pipeline) : null); + getTaskTags(fields.contains(FIELD_TAGS), pipeline.getTasks()); if (!fields.contains("tasks")) { pipeline.withTasks(null); } @@ -181,10 +184,13 @@ public class PipelineRepository extends EntityRepository { // Don't store owner, database, href and tags as JSON. Build it on the fly based on relationships pipeline.withOwner(null).withService(null).withHref(null).withTags(null); + // Don't store column tags as JSON but build it on the fly based on relationships + List taskWithTags = pipeline.getTasks(); + pipeline.setTasks(cloneWithoutTags(taskWithTags)); store(pipeline, update); // Restore the relationships - pipeline.withOwner(owner).withService(service).withTags(tags); + pipeline.withOwner(owner).withService(service).withTags(tags).withTasks(taskWithTags); } @Override @@ -199,6 +205,37 @@ public class PipelineRepository extends EntityRepository { applyTags(pipeline); } + @Override + public void applyTags(Pipeline pipeline) { + // Add table level tags by adding tag to table relationship + super.applyTags(pipeline); + applyTags(pipeline.getTasks()); + } + + private void applyTags(List tasks) { + if (tasks != null) { + for (Task task : tasks) { + applyTags(task.getTags(), task.getFullyQualifiedName()); + } + } + } + + private void getTaskTags(boolean setTags, List tasks) { + for (Task t : listOrEmpty(tasks)) { + t.setTags(setTags ? getTags(t.getFullyQualifiedName()) : null); + } + } + + private void setTaskFQN(String parentFQN, List tasks) { + if (tasks != null) { + tasks.forEach( + t -> { + String taskFqn = FullyQualifiedName.add(parentFQN, t.getName()); + t.setFullyQualifiedName(taskFqn); + }); + } + } + @Override public EntityUpdater getUpdater(Pipeline original, Pipeline updated, Operation operation) { return new PipelineUpdater(original, updated, operation); @@ -210,6 +247,15 @@ public class PipelineRepository extends EntityRepository { pipeline.setServiceType(service.getServiceType()); } + private static List cloneWithoutTags(List tasks) { + if (nullOrEmpty(tasks)) { + return tasks; + } + List copy = new ArrayList<>(); + tasks.forEach(t -> copy.add(t.withTags(null))); + return copy; + } + /** Handles entity updated from PUT and POST operation. */ public class PipelineUpdater extends EntityUpdater { public PipelineUpdater(Pipeline original, Pipeline updated, Operation operation) { @@ -249,6 +295,7 @@ public class PipelineRepository extends EntityRepository { } updateTaskDescription(stored, updatedTask); } + applyTags(updatedTasks); boolean removedTasks = updatedTasks.size() < origTasks.size(); diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/pipelines/PipelineResourceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/pipelines/PipelineResourceTest.java index a7c955923b7..c437c5bddc4 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/pipelines/PipelineResourceTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/pipelines/PipelineResourceTest.java @@ -125,6 +125,9 @@ public class PipelineResourceTest extends EntityResourceTest tasks = new ArrayList<>(); - Task taskEmptyDesc = new Task().withName("taskEmpty").withTaskUrl("http://localhost:0"); + Task taskEmptyDesc = + new Task().withName("taskEmpty").withTaskUrl("http://localhost:0").withTags(List.of(USER_ADDRESS_TAG_LABEL)); tasks.add(taskEmptyDesc); fieldAdded(change, "tasks", tasks); fieldUpdated(change, "description", "", "newDescription"); - // Create new request with all the Tasks List updatedTasks = Stream.concat(TASKS.stream(), tasks.stream()).collect(Collectors.toList()); pipeline.setTasks(updatedTasks); pipeline.setDescription("newDescription"); pipeline = patchEntityAndCheck(pipeline, origJson, ADMIN_AUTH_HEADERS, MINOR_UPDATE, change); + pipeline = getPipeline(pipeline.getId(), "*", ADMIN_AUTH_HEADERS); + // validate tasks + validateTasks(updatedTasks, pipeline.getTasks()); // add a description to an existing task origJson = JsonUtils.pojoToJson(pipeline);