diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TopicRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TopicRepository.java index 31947948f2a..f7f704e9504 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TopicRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TopicRepository.java @@ -15,14 +15,20 @@ package org.openmetadata.service.jdbi3; import static org.openmetadata.common.utils.CommonUtil.listOrEmpty; import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; +import static org.openmetadata.service.Entity.FIELD_DISPLAY_NAME; import static org.openmetadata.service.Entity.FIELD_FOLLOWERS; import static org.openmetadata.service.Entity.FIELD_TAGS; +import static org.openmetadata.service.util.EntityUtil.getSchemaField; import com.fasterxml.jackson.core.JsonProcessingException; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.UUID; +import java.util.function.BiPredicate; +import java.util.function.Function; +import java.util.stream.Collectors; import org.jdbi.v3.sqlobject.transaction.Transaction; import org.openmetadata.schema.entity.data.Topic; import org.openmetadata.schema.entity.services.MessagingService; @@ -35,6 +41,7 @@ import org.openmetadata.schema.type.topic.CleanupPolicy; import org.openmetadata.schema.type.topic.TopicSampleData; import org.openmetadata.service.Entity; import org.openmetadata.service.resources.topics.TopicResource; +import org.openmetadata.service.util.EntityUtil; import org.openmetadata.service.util.EntityUtil.Fields; import org.openmetadata.service.util.FullyQualifiedName; import org.openmetadata.service.util.JsonUtils; @@ -201,7 +208,7 @@ public class TopicRepository extends EntityRepository { } private void applyTags(List fields) { - // Add column level tags by adding tag to column relationship + // Add field level tags by adding tag to field relationship for (Field field : fields) { applyTags(field.getTags(), field.getFullyQualifiedName()); if (field.getChildren() != null) { @@ -241,6 +248,11 @@ public class TopicRepository extends EntityRepository { "schema.schemaType", original.getMessageSchema().getSchemaType(), updated.getMessageSchema().getSchemaType()); + updateSchemaFields( + "schemaFields", + original.getMessageSchema().getSchemaFields(), + updated.getMessageSchema().getSchemaFields(), + EntityUtil.schemaFieldMatch); } recordChange("topicConfig", original.getTopicConfig(), updated.getTopicConfig()); updateCleanupPolicies(original, updated); @@ -257,5 +269,80 @@ public class TopicRepository extends EntityRepository { deleted, CleanupPolicy::equals); } + + private void updateSchemaFields( + String fieldName, List origFields, List updatedFields, BiPredicate fieldMatch) + throws IOException { + List deletedFields = new ArrayList<>(); + List addedFields = new ArrayList<>(); + recordListChange(fieldName, origFields, updatedFields, addedFields, deletedFields, fieldMatch); + // carry forward tags and description if deletedFields matches added field + Map addedFieldMap = + addedFields.stream().collect(Collectors.toMap(Field::getName, Function.identity())); + + for (Field deleted : deletedFields) { + if (addedFieldMap.containsKey(deleted.getName())) { + Field addedField = addedFieldMap.get(deleted.getName()); + if (nullOrEmpty(addedField.getDescription()) && nullOrEmpty(deleted.getDescription())) { + addedField.setDescription(deleted.getDescription()); + } + if (nullOrEmpty(addedField.getTags()) && nullOrEmpty(deleted.getTags())) { + addedField.setTags(deleted.getTags()); + } + } + } + + // Delete tags related to deleted fields + deletedFields.forEach(deleted -> daoCollection.tagUsageDAO().deleteTagsByTarget(deleted.getFullyQualifiedName())); + + // Add tags related to newly added fields + for (Field added : addedFields) { + applyTags(added.getTags(), added.getFullyQualifiedName()); + } + + // Carry forward the user generated metadata from existing fields to new fields + for (Field updated : updatedFields) { + // Find stored field matching name, data type and ordinal position + Field stored = origFields.stream().filter(c -> fieldMatch.test(c, updated)).findAny().orElse(null); + if (stored == null) { // New field added + continue; + } + + updateFieldDescription(stored, updated); + updateFieldDisplayName(stored, updated); + updateTags( + stored.getFullyQualifiedName(), + EntityUtil.getFieldName(fieldName, updated.getName(), FIELD_TAGS), + stored.getTags(), + updated.getTags()); + + if (updated.getChildren() != null && stored.getChildren() != null) { + String childrenFieldName = EntityUtil.getFieldName(fieldName, updated.getName()); + updateSchemaFields(childrenFieldName, stored.getChildren(), updated.getChildren(), fieldMatch); + } + } + + majorVersionChange = majorVersionChange || !deletedFields.isEmpty(); + } + + private void updateFieldDescription(Field origField, Field updatedField) throws JsonProcessingException { + if (operation.isPut() && !nullOrEmpty(origField.getDescription()) && updatedByBot()) { + // Revert the non-empty field description if being updated by a bot + updatedField.setDescription(origField.getDescription()); + return; + } + String field = getSchemaField(original, origField, FIELD_DISPLAY_NAME); + recordChange(field, origField.getDescription(), updatedField.getDescription()); + } + + private void updateFieldDisplayName(Field origField, Field updatedField) throws JsonProcessingException { + if (operation.isPut() && !nullOrEmpty(origField.getDescription()) && updatedByBot()) { + // Revert the non-empty field description if being updated by a bot + updatedField.setDisplayName(origField.getDisplayName()); + return; + } + String field = getSchemaField(original, origField, FIELD_DISPLAY_NAME); + recordChange(field, origField.getDisplayName(), updatedField.getDisplayName()); + } } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/EntityUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/EntityUtil.java index 5d59dbbfa4c..5fdf38c7a7b 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/util/EntityUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/EntityUtil.java @@ -40,6 +40,7 @@ import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.api.data.TermReference; import org.openmetadata.schema.entity.data.GlossaryTerm; import org.openmetadata.schema.entity.data.Table; +import org.openmetadata.schema.entity.data.Topic; import org.openmetadata.schema.entity.policies.accessControl.Rule; import org.openmetadata.schema.entity.tags.Tag; import org.openmetadata.schema.entity.type.CustomProperty; @@ -51,6 +52,7 @@ import org.openmetadata.schema.type.Column; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.EventType; import org.openmetadata.schema.type.FailureDetails; +import org.openmetadata.schema.type.Field; import org.openmetadata.schema.type.FieldChange; import org.openmetadata.schema.type.MetadataOperation; import org.openmetadata.schema.type.MlFeature; @@ -146,6 +148,10 @@ public final class EntityUtil { public static final BiPredicate ruleMatch = (ref1, ref2) -> ref1.getName().equals(ref2.getName()); + public static final BiPredicate schemaFieldMatch = + (field1, field2) -> + field1.getName().equalsIgnoreCase(field2.getName()) && field1.getDataType() == field2.getDataType(); + private EntityUtil() {} /** Validate Ingestion Schedule */ @@ -375,6 +381,15 @@ public final class EntityUtil { : FullyQualifiedName.build("columns", localColumnName, columnField); } + /** Return schema field name of format "schemaFields".fieldName.fieldName */ + public static String getSchemaField(Topic topic, Field field, String fieldName) { + // Remove topic FQN from schemaField FQN to get the local name + String localFieldName = EntityUtil.getLocalColumnName(topic.getFullyQualifiedName(), field.getFullyQualifiedName()); + return fieldName == null + ? FullyQualifiedName.build("schemaFields", localFieldName) + : FullyQualifiedName.build("schemaFields", localFieldName, fieldName); + } + /** Return rule field name of format "rules".ruleName.ruleFieldName */ public static String getRuleField(Rule rule, String ruleField) { return ruleField == null