diff --git a/metadata-integration/java/datahub-protobuf/src/main/java/datahub/protobuf/visitors/ProtobufExtensionUtil.java b/metadata-integration/java/datahub-protobuf/src/main/java/datahub/protobuf/visitors/ProtobufExtensionUtil.java index 085516a025..16633a5564 100644 --- a/metadata-integration/java/datahub-protobuf/src/main/java/datahub/protobuf/visitors/ProtobufExtensionUtil.java +++ b/metadata-integration/java/datahub-protobuf/src/main/java/datahub/protobuf/visitors/ProtobufExtensionUtil.java @@ -14,7 +14,6 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -43,45 +42,59 @@ public class ProtobufExtensionUtil { public static final String PROTOBUF_TYPE = "DataHubMetadataType"; } + /** + * Checks if a field descriptor is annotated with a specific DataHub metadata type option. Fields + * without an explicit annotation default to PROPERTY type. + */ + @SuppressWarnings("unchecked") + private static boolean matchesDataHubType( + Descriptors.FieldDescriptor fieldDescriptor, + ExtensionRegistry registry, + DataHubMetadataType filterType) { + DescriptorProtos.FieldDescriptorProto extendedProtoOptions = + extendProto(fieldDescriptor.toProto(), registry); + + for (Map.Entry extEntry : + extendedProtoOptions.getOptions().getAllFields().entrySet()) { + if (extEntry.getKey().getJavaType() != Descriptors.FieldDescriptor.JavaType.ENUM) { + continue; + } + + Collection enumValues; + if (extEntry.getKey().isRepeated()) { + enumValues = (Collection) extEntry.getValue(); + } else { + enumValues = List.of((Descriptors.EnumValueDescriptor) extEntry.getValue()); + } + + for (Descriptors.EnumValueDescriptor enumDesc : enumValues) { + if (!enumDesc.getType().getFullName().endsWith("." + DataHubMetadataType.PROTOBUF_TYPE)) { + continue; + } + + DataHubMetadataType dhmt = DataHubMetadataType.valueOf(enumDesc.getName()); + if (dhmt.equals(filterType)) { + return true; + } + } + } + + return filterType.equals(DataHubMetadataType.PROPERTY); + } + public static List> filterByDataHubType( List> options, ExtensionRegistry registry, DataHubMetadataType filterType) { - return options.stream() - .filter( - entry -> { - DescriptorProtos.FieldDescriptorProto extendedProtoOptions = - extendProto(entry.getKey().toProto(), registry); - Optional dataHubMetadataType = - extendedProtoOptions.getOptions().getAllFields().entrySet().stream() - .filter( - extEntry -> - extEntry.getKey().getJavaType() - == Descriptors.FieldDescriptor.JavaType.ENUM) - .flatMap( - extEntry -> { - if (extEntry.getKey().isRepeated()) { - return ((Collection) - extEntry.getValue()) - .stream(); - } else { - return Stream.of( - (Descriptors.EnumValueDescriptor) extEntry.getValue()); - } - }) - .filter( - enumDesc -> - enumDesc - .getType() - .getFullName() - .endsWith("." + DataHubMetadataType.PROTOBUF_TYPE)) - .map(enumDesc -> DataHubMetadataType.valueOf(enumDesc.getName())) - .filter(dhmt -> dhmt.equals(filterType)) - .findFirst(); + List> result = new java.util.ArrayList<>(); - return filterType.equals(dataHubMetadataType.orElse(DataHubMetadataType.PROPERTY)); - }) - .collect(Collectors.toList()); + for (Pair entry : options) { + if (matchesDataHubType(entry.getKey(), registry, filterType)) { + result.add(entry); + } + } + + return result; } public static Stream> getProperties( diff --git a/metadata-integration/java/datahub-protobuf/src/main/java/datahub/protobuf/visitors/dataset/PropertyVisitor.java b/metadata-integration/java/datahub-protobuf/src/main/java/datahub/protobuf/visitors/dataset/PropertyVisitor.java index 113cf6f1a5..24f9f3ded2 100644 --- a/metadata-integration/java/datahub-protobuf/src/main/java/datahub/protobuf/visitors/dataset/PropertyVisitor.java +++ b/metadata-integration/java/datahub-protobuf/src/main/java/datahub/protobuf/visitors/dataset/PropertyVisitor.java @@ -1,19 +1,20 @@ package datahub.protobuf.visitors.dataset; import static datahub.protobuf.ProtobufUtils.getMessageOptions; -import static datahub.protobuf.visitors.ProtobufExtensionUtil.getProperties; import com.google.gson.Gson; -import com.google.protobuf.DescriptorProtos; import com.google.protobuf.Descriptors; import com.linkedin.data.template.StringMap; import com.linkedin.dataset.DatasetProperties; +import com.linkedin.util.Pair; import datahub.protobuf.visitors.ProtobufExtensionUtil; import datahub.protobuf.visitors.ProtobufModelVisitor; import datahub.protobuf.visitors.VisitContext; +import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import java.util.stream.Stream; public class PropertyVisitor implements ProtobufModelVisitor { @@ -21,35 +22,28 @@ public class PropertyVisitor implements ProtobufModelVisitor @Override public Stream visitGraph(VisitContext context) { - Map properties = + Map properties = new HashMap<>(); + + List> propertyOptions = ProtobufExtensionUtil.filterByDataHubType( - getMessageOptions(context.root().messageProto()), - context.getGraph().getRegistry(), - ProtobufExtensionUtil.DataHubMetadataType.PROPERTY) - .stream() - .flatMap( - fd -> { - if (fd.getKey().getJavaType() != Descriptors.FieldDescriptor.JavaType.MESSAGE) { - if (fd.getKey().isRepeated()) { - return Stream.of( - Map.entry( - fd.getKey().getName(), - GSON.toJson( - ((Collection) fd.getValue()) - .stream() - .map(Object::toString) - .collect(Collectors.toList())))); - } else { - return Stream.of(Map.entry(fd.getKey().getName(), fd.getValue().toString())); - } - } else { - Descriptors.FieldDescriptor field = fd.getKey(); - DescriptorProtos.DescriptorProto value = - (DescriptorProtos.DescriptorProto) fd.getValue(); - return getProperties(field, value); - } - }) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + getMessageOptions(context.root().messageProto()), + context.getGraph().getRegistry(), + ProtobufExtensionUtil.DataHubMetadataType.PROPERTY); + + for (Pair fd : propertyOptions) { + var fieldDescriptor = fd.getKey(); + if (fieldDescriptor.getJavaType() != Descriptors.FieldDescriptor.JavaType.MESSAGE) { + if (fieldDescriptor.isRepeated()) { + List stringValues = new ArrayList<>(); + for (Object item : (Collection) fd.getValue()) { + stringValues.add(item.toString()); + } + properties.put(fieldDescriptor.getName(), GSON.toJson(stringValues)); + } else { + properties.put(fieldDescriptor.getName(), fd.getValue().toString()); + } + } + } return Stream.of(new DatasetProperties().setCustomProperties(new StringMap(properties))); }