Fix #9283: Description and tags are not getting updated for schema fields (#9325)

* Fix #9283: Description and tags are not getting updated for schema fields

* Fix checkstyle; address comments

* Fix check style
This commit is contained in:
Sriharsha Chintalapani 2022-12-16 05:39:31 -08:00 committed by GitHub
parent a15600e4e6
commit 9598694b0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 103 additions and 1 deletions

View File

@ -15,14 +15,20 @@ package org.openmetadata.service.jdbi3;
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.service.Entity.FIELD_DISPLAY_NAME;
import static org.openmetadata.service.Entity.FIELD_FOLLOWERS;
import static org.openmetadata.service.Entity.FIELD_TAGS;
import static org.openmetadata.service.util.EntityUtil.getSchemaField;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.entity.data.Topic;
import org.openmetadata.schema.entity.services.MessagingService;
@ -35,6 +41,7 @@ import org.openmetadata.schema.type.topic.CleanupPolicy;
import org.openmetadata.schema.type.topic.TopicSampleData;
import org.openmetadata.service.Entity;
import org.openmetadata.service.resources.topics.TopicResource;
import org.openmetadata.service.util.EntityUtil;
import org.openmetadata.service.util.EntityUtil.Fields;
import org.openmetadata.service.util.FullyQualifiedName;
import org.openmetadata.service.util.JsonUtils;
@ -201,7 +208,7 @@ public class TopicRepository extends EntityRepository<Topic> {
}
private void applyTags(List<Field> fields) {
// Add column level tags by adding tag to column relationship
// Add field level tags by adding tag to field relationship
for (Field field : fields) {
applyTags(field.getTags(), field.getFullyQualifiedName());
if (field.getChildren() != null) {
@ -241,6 +248,11 @@ public class TopicRepository extends EntityRepository<Topic> {
"schema.schemaType",
original.getMessageSchema().getSchemaType(),
updated.getMessageSchema().getSchemaType());
updateSchemaFields(
"schemaFields",
original.getMessageSchema().getSchemaFields(),
updated.getMessageSchema().getSchemaFields(),
EntityUtil.schemaFieldMatch);
}
recordChange("topicConfig", original.getTopicConfig(), updated.getTopicConfig());
updateCleanupPolicies(original, updated);
@ -257,5 +269,80 @@ public class TopicRepository extends EntityRepository<Topic> {
deleted,
CleanupPolicy::equals);
}
private void updateSchemaFields(
String fieldName, List<Field> origFields, List<Field> updatedFields, BiPredicate<Field, Field> fieldMatch)
throws IOException {
List<Field> deletedFields = new ArrayList<>();
List<Field> addedFields = new ArrayList<>();
recordListChange(fieldName, origFields, updatedFields, addedFields, deletedFields, fieldMatch);
// carry forward tags and description if deletedFields matches added field
Map<String, Field> addedFieldMap =
addedFields.stream().collect(Collectors.toMap(Field::getName, Function.identity()));
for (Field deleted : deletedFields) {
if (addedFieldMap.containsKey(deleted.getName())) {
Field addedField = addedFieldMap.get(deleted.getName());
if (nullOrEmpty(addedField.getDescription()) && nullOrEmpty(deleted.getDescription())) {
addedField.setDescription(deleted.getDescription());
}
if (nullOrEmpty(addedField.getTags()) && nullOrEmpty(deleted.getTags())) {
addedField.setTags(deleted.getTags());
}
}
}
// Delete tags related to deleted fields
deletedFields.forEach(deleted -> daoCollection.tagUsageDAO().deleteTagsByTarget(deleted.getFullyQualifiedName()));
// Add tags related to newly added fields
for (Field added : addedFields) {
applyTags(added.getTags(), added.getFullyQualifiedName());
}
// Carry forward the user generated metadata from existing fields to new fields
for (Field updated : updatedFields) {
// Find stored field matching name, data type and ordinal position
Field stored = origFields.stream().filter(c -> fieldMatch.test(c, updated)).findAny().orElse(null);
if (stored == null) { // New field added
continue;
}
updateFieldDescription(stored, updated);
updateFieldDisplayName(stored, updated);
updateTags(
stored.getFullyQualifiedName(),
EntityUtil.getFieldName(fieldName, updated.getName(), FIELD_TAGS),
stored.getTags(),
updated.getTags());
if (updated.getChildren() != null && stored.getChildren() != null) {
String childrenFieldName = EntityUtil.getFieldName(fieldName, updated.getName());
updateSchemaFields(childrenFieldName, stored.getChildren(), updated.getChildren(), fieldMatch);
}
}
majorVersionChange = majorVersionChange || !deletedFields.isEmpty();
}
private void updateFieldDescription(Field origField, Field updatedField) throws JsonProcessingException {
if (operation.isPut() && !nullOrEmpty(origField.getDescription()) && updatedByBot()) {
// Revert the non-empty field description if being updated by a bot
updatedField.setDescription(origField.getDescription());
return;
}
String field = getSchemaField(original, origField, FIELD_DISPLAY_NAME);
recordChange(field, origField.getDescription(), updatedField.getDescription());
}
private void updateFieldDisplayName(Field origField, Field updatedField) throws JsonProcessingException {
if (operation.isPut() && !nullOrEmpty(origField.getDescription()) && updatedByBot()) {
// Revert the non-empty field description if being updated by a bot
updatedField.setDisplayName(origField.getDisplayName());
return;
}
String field = getSchemaField(original, origField, FIELD_DISPLAY_NAME);
recordChange(field, origField.getDisplayName(), updatedField.getDisplayName());
}
}
}

View File

@ -40,6 +40,7 @@ import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.api.data.TermReference;
import org.openmetadata.schema.entity.data.GlossaryTerm;
import org.openmetadata.schema.entity.data.Table;
import org.openmetadata.schema.entity.data.Topic;
import org.openmetadata.schema.entity.policies.accessControl.Rule;
import org.openmetadata.schema.entity.tags.Tag;
import org.openmetadata.schema.entity.type.CustomProperty;
@ -51,6 +52,7 @@ import org.openmetadata.schema.type.Column;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.EventType;
import org.openmetadata.schema.type.FailureDetails;
import org.openmetadata.schema.type.Field;
import org.openmetadata.schema.type.FieldChange;
import org.openmetadata.schema.type.MetadataOperation;
import org.openmetadata.schema.type.MlFeature;
@ -146,6 +148,10 @@ public final class EntityUtil {
public static final BiPredicate<Rule, Rule> ruleMatch = (ref1, ref2) -> ref1.getName().equals(ref2.getName());
public static final BiPredicate<Field, Field> schemaFieldMatch =
(field1, field2) ->
field1.getName().equalsIgnoreCase(field2.getName()) && field1.getDataType() == field2.getDataType();
private EntityUtil() {}
/** Validate Ingestion Schedule */
@ -375,6 +381,15 @@ public final class EntityUtil {
: FullyQualifiedName.build("columns", localColumnName, columnField);
}
/** Return schema field name of format "schemaFields".fieldName.fieldName */
public static String getSchemaField(Topic topic, Field field, String fieldName) {
// Remove topic FQN from schemaField FQN to get the local name
String localFieldName = EntityUtil.getLocalColumnName(topic.getFullyQualifiedName(), field.getFullyQualifiedName());
return fieldName == null
? FullyQualifiedName.build("schemaFields", localFieldName)
: FullyQualifiedName.build("schemaFields", localFieldName, fieldName);
}
/** Return rule field name of format "rules".ruleName.ruleFieldName */
public static String getRuleField(Rule rule, String ruleField) {
return ruleField == null