fix(protobuf): skip MESSAGE-type options in PropertyVisitor (#14957)

DataHub protobuf metadata type annotations only apply to primitive extension types, not MESSAGE types (see https://docs.datahub.com/docs/metadata-integration/java/datahub-protobuf). Previously, PropertyVisitor incorrectly attempted to process MESSAGE-type options, causing issues with user test cases.

This change removes MESSAGE-type handling from PropertyVisitor. Additionally refactored ProtobufExtensionUtil.filterByDataHubType() for readability by replacing nested stream chains with explicit loops and extracting a matchesDataHubType() helper method.

🤖 Generated with Claude Code

Co-Authored-By: Claude noreply@anthropic.com
This commit is contained in:
Abe 2025-10-08 16:19:30 -07:00 committed by GitHub
parent 3caa781540
commit 0a0fcef047
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 73 additions and 66 deletions

View File

@ -14,7 +14,6 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -43,45 +42,59 @@ public class ProtobufExtensionUtil {
public static final String PROTOBUF_TYPE = "DataHubMetadataType";
}
/**
* Checks if a field descriptor is annotated with a specific DataHub metadata type option. Fields
* without an explicit annotation default to PROPERTY type.
*/
@SuppressWarnings("unchecked")
private static boolean matchesDataHubType(
Descriptors.FieldDescriptor fieldDescriptor,
ExtensionRegistry registry,
DataHubMetadataType filterType) {
DescriptorProtos.FieldDescriptorProto extendedProtoOptions =
extendProto(fieldDescriptor.toProto(), registry);
for (Map.Entry<Descriptors.FieldDescriptor, Object> extEntry :
extendedProtoOptions.getOptions().getAllFields().entrySet()) {
if (extEntry.getKey().getJavaType() != Descriptors.FieldDescriptor.JavaType.ENUM) {
continue;
}
Collection<Descriptors.EnumValueDescriptor> enumValues;
if (extEntry.getKey().isRepeated()) {
enumValues = (Collection<Descriptors.EnumValueDescriptor>) extEntry.getValue();
} else {
enumValues = List.of((Descriptors.EnumValueDescriptor) extEntry.getValue());
}
for (Descriptors.EnumValueDescriptor enumDesc : enumValues) {
if (!enumDesc.getType().getFullName().endsWith("." + DataHubMetadataType.PROTOBUF_TYPE)) {
continue;
}
DataHubMetadataType dhmt = DataHubMetadataType.valueOf(enumDesc.getName());
if (dhmt.equals(filterType)) {
return true;
}
}
}
return filterType.equals(DataHubMetadataType.PROPERTY);
}
public static List<Pair<Descriptors.FieldDescriptor, Object>> filterByDataHubType(
List<Pair<Descriptors.FieldDescriptor, Object>> options,
ExtensionRegistry registry,
DataHubMetadataType filterType) {
return options.stream()
.filter(
entry -> {
DescriptorProtos.FieldDescriptorProto extendedProtoOptions =
extendProto(entry.getKey().toProto(), registry);
Optional<DataHubMetadataType> dataHubMetadataType =
extendedProtoOptions.getOptions().getAllFields().entrySet().stream()
.filter(
extEntry ->
extEntry.getKey().getJavaType()
== Descriptors.FieldDescriptor.JavaType.ENUM)
.flatMap(
extEntry -> {
if (extEntry.getKey().isRepeated()) {
return ((Collection<Descriptors.EnumValueDescriptor>)
extEntry.getValue())
.stream();
} else {
return Stream.of(
(Descriptors.EnumValueDescriptor) extEntry.getValue());
}
})
.filter(
enumDesc ->
enumDesc
.getType()
.getFullName()
.endsWith("." + DataHubMetadataType.PROTOBUF_TYPE))
.map(enumDesc -> DataHubMetadataType.valueOf(enumDesc.getName()))
.filter(dhmt -> dhmt.equals(filterType))
.findFirst();
List<Pair<Descriptors.FieldDescriptor, Object>> result = new java.util.ArrayList<>();
return filterType.equals(dataHubMetadataType.orElse(DataHubMetadataType.PROPERTY));
})
.collect(Collectors.toList());
for (Pair<Descriptors.FieldDescriptor, Object> entry : options) {
if (matchesDataHubType(entry.getKey(), registry, filterType)) {
result.add(entry);
}
}
return result;
}
public static Stream<Map.Entry<String, String>> getProperties(

View File

@ -1,19 +1,20 @@
package datahub.protobuf.visitors.dataset;
import static datahub.protobuf.ProtobufUtils.getMessageOptions;
import static datahub.protobuf.visitors.ProtobufExtensionUtil.getProperties;
import com.google.gson.Gson;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;
import com.linkedin.data.template.StringMap;
import com.linkedin.dataset.DatasetProperties;
import com.linkedin.util.Pair;
import datahub.protobuf.visitors.ProtobufExtensionUtil;
import datahub.protobuf.visitors.ProtobufModelVisitor;
import datahub.protobuf.visitors.VisitContext;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class PropertyVisitor implements ProtobufModelVisitor<DatasetProperties> {
@ -21,35 +22,28 @@ public class PropertyVisitor implements ProtobufModelVisitor<DatasetProperties>
@Override
public Stream<DatasetProperties> visitGraph(VisitContext context) {
Map<String, String> properties =
Map<String, String> properties = new HashMap<>();
List<Pair<Descriptors.FieldDescriptor, Object>> propertyOptions =
ProtobufExtensionUtil.filterByDataHubType(
getMessageOptions(context.root().messageProto()),
context.getGraph().getRegistry(),
ProtobufExtensionUtil.DataHubMetadataType.PROPERTY)
.stream()
.flatMap(
fd -> {
if (fd.getKey().getJavaType() != Descriptors.FieldDescriptor.JavaType.MESSAGE) {
if (fd.getKey().isRepeated()) {
return Stream.of(
Map.entry(
fd.getKey().getName(),
GSON.toJson(
((Collection<?>) fd.getValue())
.stream()
.map(Object::toString)
.collect(Collectors.toList()))));
} else {
return Stream.of(Map.entry(fd.getKey().getName(), fd.getValue().toString()));
ProtobufExtensionUtil.DataHubMetadataType.PROPERTY);
for (Pair<Descriptors.FieldDescriptor, Object> fd : propertyOptions) {
var fieldDescriptor = fd.getKey();
if (fieldDescriptor.getJavaType() != Descriptors.FieldDescriptor.JavaType.MESSAGE) {
if (fieldDescriptor.isRepeated()) {
List<String> stringValues = new ArrayList<>();
for (Object item : (Collection<?>) fd.getValue()) {
stringValues.add(item.toString());
}
properties.put(fieldDescriptor.getName(), GSON.toJson(stringValues));
} else {
Descriptors.FieldDescriptor field = fd.getKey();
DescriptorProtos.DescriptorProto value =
(DescriptorProtos.DescriptorProto) fd.getValue();
return getProperties(field, value);
properties.put(fieldDescriptor.getName(), fd.getValue().toString());
}
}
}
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return Stream.of(new DatasetProperties().setCustomProperties(new StringMap(properties)));
}