feat(ingest): Support protobuf description for enum field (#11027)

Co-authored-by: 양은석[G플레이스데이터개발] <eunseok.y@navercorp.com>
Co-authored-by: david-leifker <114954101+david-leifker@users.noreply.github.com>
This commit is contained in:
eunseokyang 2024-09-06 00:32:58 +09:00 committed by GitHub
parent e45674852c
commit 59f9a4ba55
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 324 additions and 210 deletions

View File

@ -1,5 +1,6 @@
package datahub.protobuf.model;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.DescriptorProtos.DescriptorProto;
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
@ -18,15 +19,20 @@ import com.linkedin.schema.StringType;
import datahub.protobuf.ProtobufUtils;
import datahub.protobuf.visitors.ProtobufModelVisitor;
import datahub.protobuf.visitors.VisitContext;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@Builder(toBuilder = true)
@Getter
@AllArgsConstructor
@ -81,78 +87,72 @@ public class ProtobufField implements ProtobufElement {
@Override
public String nativeType() {
return Optional.ofNullable(nativeType)
.orElseGet(
() -> {
if (fieldProto.getTypeName().isEmpty()) {
return fieldProto.getType().name().split("_")[1].toLowerCase();
} else {
return fieldProto.getTypeName().replaceFirst("^[.]", "");
}
});
return Optional.ofNullable(nativeType).orElseGet(() -> {
if (fieldProto.getTypeName().isEmpty()) {
return fieldProto.getType().name().split("_")[1].toLowerCase();
} else {
return fieldProto.getTypeName().replaceFirst("^[.]", "");
}
});
}
@Override
public String fieldPathType() {
return Optional.ofNullable(fieldPathType)
.orElseGet(
() -> {
final String pathType;
return Optional.ofNullable(fieldPathType).orElseGet(() -> {
final String pathType;
switch (fieldProto.getType()) {
case TYPE_DOUBLE:
pathType = "double";
break;
case TYPE_FLOAT:
pathType = "float";
break;
case TYPE_SFIXED64:
case TYPE_FIXED64:
case TYPE_UINT64:
case TYPE_INT64:
case TYPE_SINT64:
pathType = "long";
break;
case TYPE_FIXED32:
case TYPE_SFIXED32:
case TYPE_INT32:
case TYPE_UINT32:
case TYPE_SINT32:
pathType = "int";
break;
case TYPE_BYTES:
pathType = "bytes";
break;
case TYPE_ENUM:
pathType = "enum";
break;
case TYPE_BOOL:
pathType = "boolean";
break;
case TYPE_STRING:
pathType = "string";
break;
case TYPE_GROUP:
case TYPE_MESSAGE:
pathType = nativeType().replace(".", "_");
break;
default:
throw new IllegalStateException(
String.format(
"Unexpected FieldDescriptorProto => FieldPathType %s",
fieldProto.getType()));
}
switch (fieldProto.getType()) {
case TYPE_DOUBLE:
pathType = "double";
break;
case TYPE_FLOAT:
pathType = "float";
break;
case TYPE_SFIXED64:
case TYPE_FIXED64:
case TYPE_UINT64:
case TYPE_INT64:
case TYPE_SINT64:
pathType = "long";
break;
case TYPE_FIXED32:
case TYPE_SFIXED32:
case TYPE_INT32:
case TYPE_UINT32:
case TYPE_SINT32:
pathType = "int";
break;
case TYPE_BYTES:
pathType = "bytes";
break;
case TYPE_ENUM:
pathType = "enum";
break;
case TYPE_BOOL:
pathType = "boolean";
break;
case TYPE_STRING:
pathType = "string";
break;
case TYPE_GROUP:
case TYPE_MESSAGE:
pathType = nativeType().replace(".", "_");
break;
default:
throw new IllegalStateException(
String.format("Unexpected FieldDescriptorProto => FieldPathType %s", fieldProto.getType()));
}
StringArray fieldPath = new StringArray();
StringArray fieldPath = new StringArray();
if (schemaFieldDataType().getType().isArrayType()) {
fieldPath.add("[type=array]");
}
if (schemaFieldDataType().getType().isArrayType()) {
fieldPath.add("[type=array]");
}
fieldPath.add(String.format("[type=%s]", pathType));
fieldPath.add(String.format("[type=%s]", pathType));
return String.join(".", fieldPath);
});
return String.join(".", fieldPath);
});
}
public boolean isMessage() {
@ -165,110 +165,92 @@ public class ProtobufField implements ProtobufElement {
}
public SchemaFieldDataType schemaFieldDataType() throws IllegalStateException {
return Optional.ofNullable(schemaFieldDataType)
.orElseGet(
() -> {
final SchemaFieldDataType.Type fieldType;
return Optional.ofNullable(schemaFieldDataType).orElseGet(() -> {
final SchemaFieldDataType.Type fieldType;
switch (fieldProto.getType()) {
case TYPE_DOUBLE:
case TYPE_FLOAT:
case TYPE_INT64:
case TYPE_UINT64:
case TYPE_INT32:
case TYPE_UINT32:
case TYPE_SINT32:
case TYPE_SINT64:
fieldType = SchemaFieldDataType.Type.create(new NumberType());
break;
case TYPE_GROUP:
case TYPE_MESSAGE:
fieldType = SchemaFieldDataType.Type.create(new RecordType());
break;
case TYPE_BYTES:
fieldType = SchemaFieldDataType.Type.create(new BytesType());
break;
case TYPE_ENUM:
fieldType = SchemaFieldDataType.Type.create(new EnumType());
break;
case TYPE_BOOL:
fieldType = SchemaFieldDataType.Type.create(new BooleanType());
break;
case TYPE_STRING:
fieldType = SchemaFieldDataType.Type.create(new StringType());
break;
case TYPE_FIXED64:
case TYPE_FIXED32:
case TYPE_SFIXED32:
case TYPE_SFIXED64:
fieldType = SchemaFieldDataType.Type.create(new FixedType());
break;
default:
throw new IllegalStateException(
String.format(
"Unexpected FieldDescriptorProto => SchemaFieldDataType: %s",
fieldProto.getType()));
}
switch (fieldProto.getType()) {
case TYPE_DOUBLE:
case TYPE_FLOAT:
case TYPE_INT64:
case TYPE_UINT64:
case TYPE_INT32:
case TYPE_UINT32:
case TYPE_SINT32:
case TYPE_SINT64:
fieldType = SchemaFieldDataType.Type.create(new NumberType());
break;
case TYPE_GROUP:
case TYPE_MESSAGE:
fieldType = SchemaFieldDataType.Type.create(new RecordType());
break;
case TYPE_BYTES:
fieldType = SchemaFieldDataType.Type.create(new BytesType());
break;
case TYPE_ENUM:
fieldType = SchemaFieldDataType.Type.create(new EnumType());
break;
case TYPE_BOOL:
fieldType = SchemaFieldDataType.Type.create(new BooleanType());
break;
case TYPE_STRING:
fieldType = SchemaFieldDataType.Type.create(new StringType());
break;
case TYPE_FIXED64:
case TYPE_FIXED32:
case TYPE_SFIXED32:
case TYPE_SFIXED64:
fieldType = SchemaFieldDataType.Type.create(new FixedType());
break;
default:
throw new IllegalStateException(
String.format("Unexpected FieldDescriptorProto => SchemaFieldDataType: %s", fieldProto.getType()));
}
if (fieldProto.getLabel().equals(FieldDescriptorProto.Label.LABEL_REPEATED)) {
return new SchemaFieldDataType()
.setType(
SchemaFieldDataType.Type.create(
new ArrayType().setNestedType(new StringArray())));
}
if (fieldProto.getLabel().equals(FieldDescriptorProto.Label.LABEL_REPEATED)) {
return new SchemaFieldDataType().setType(
SchemaFieldDataType.Type.create(new ArrayType().setNestedType(new StringArray())));
}
return new SchemaFieldDataType().setType(fieldType);
});
return new SchemaFieldDataType().setType(fieldType);
});
}
@Override
public Stream<SourceCodeInfo.Location> messageLocations() {
List<SourceCodeInfo.Location> fileLocations = fileProto().getSourceCodeInfo().getLocationList();
return fileLocations.stream()
.filter(
loc ->
loc.getPathCount() > 1
&& loc.getPath(0) == FileDescriptorProto.MESSAGE_TYPE_FIELD_NUMBER);
.filter(loc -> loc.getPathCount() > 1 && loc.getPath(0) == FileDescriptorProto.MESSAGE_TYPE_FIELD_NUMBER);
}
@Override
public String comment() {
return messageLocations()
.filter(location -> location.getPathCount() > 3)
.filter(
location ->
!ProtobufUtils.collapseLocationComments(location).isEmpty()
&& !isEnumType(location.getPathList()))
.filter(
location -> {
List<Integer> pathList = location.getPathList();
DescriptorProto messageType = fileProto().getMessageType(pathList.get(1));
return messageLocations().filter(location -> location.getPathCount() > 3)
.filter(location -> !ProtobufUtils.collapseLocationComments(location).isEmpty() && !isEnumType(
location.getPathList()))
.filter(location -> {
List<Integer> pathList = location.getPathList();
DescriptorProto messageType = fileProto().getMessageType(pathList.get(1));
if (!isNestedType
&& location.getPath(2) == DescriptorProto.FIELD_FIELD_NUMBER
&& fieldProto == messageType.getField(location.getPath(3))) {
return true;
} else if (isNestedType
&& location.getPath(2) == DescriptorProto.NESTED_TYPE_FIELD_NUMBER
&& fieldProto == getNestedTypeFields(pathList, messageType)) {
return true;
}
return false;
})
if (!isNestedType && location.getPath(2) == DescriptorProto.FIELD_FIELD_NUMBER
&& fieldProto == messageType.getField(location.getPath(3))) {
return true;
} else if (isNestedType && location.getPath(2) == DescriptorProto.NESTED_TYPE_FIELD_NUMBER
&& fieldProto == getNestedTypeFields(pathList, messageType)) {
return true;
}
return false;
})
.map(ProtobufUtils::collapseLocationComments)
.collect(Collectors.joining("\n"))
.trim();
}
private FieldDescriptorProto getNestedTypeFields(
List<Integer> pathList, DescriptorProto messageType) {
private FieldDescriptorProto getNestedTypeFields(List<Integer> pathList, DescriptorProto messageType) {
int pathSize = pathList.size();
List<Integer> nestedValues = new ArrayList<>(pathSize);
for (int index = 0; index < pathSize; index++) {
if (index > 1
&& index % 2 == 0
&& pathList.get(index) == DescriptorProto.NESTED_TYPE_FIELD_NUMBER) {
if (index > 1 && index % 2 == 0 && pathList.get(index) == DescriptorProto.NESTED_TYPE_FIELD_NUMBER) {
nestedValues.add(pathList.get(index + 1));
}
}
@ -278,9 +260,7 @@ public class ProtobufField implements ProtobufElement {
}
int fieldIndex = pathList.get(pathList.size() - 1);
if (isFieldPath(pathList)
&& pathSize % 2 == 0
&& fieldIndex < messageType.getFieldList().size()) {
if (isFieldPath(pathList) && pathSize % 2 == 0 && fieldIndex < messageType.getFieldList().size()) {
return messageType.getField(fieldIndex);
}
@ -293,9 +273,7 @@ public class ProtobufField implements ProtobufElement {
private boolean isEnumType(List<Integer> pathList) {
for (int index = 0; index < pathList.size(); index++) {
if (index > 1
&& index % 2 == 0
&& pathList.get(index) == DescriptorProto.ENUM_TYPE_FIELD_NUMBER) {
if (index > 1 && index % 2 == 0 && pathList.get(index) == DescriptorProto.ENUM_TYPE_FIELD_NUMBER) {
return true;
}
}
@ -330,4 +308,63 @@ public class ProtobufField implements ProtobufElement {
public int hashCode() {
// The hash is derived solely from fullName(); no other state of the field contributes to it.
return fullName().hashCode();
}
/**
 * Reports whether the declared protobuf type of this field is {@code TYPE_ENUM}.
 *
 * @return {@code true} when the underlying field descriptor has type {@code TYPE_ENUM}
 */
public boolean isEnum() {
  final DescriptorProtos.FieldDescriptorProto.Type declaredType = getFieldProto().getType();
  return declaredType == DescriptorProtos.FieldDescriptorProto.Type.TYPE_ENUM;
}
/**
 * Resolves the descriptor of the enum referenced by this field.
 *
 * <p>Only the top-level enums of the file returned by {@code getProtobufMessage().fileProto()}
 * are searched. A type name starting with {@code '.'} is fully-qualified, so it is matched
 * against that file's package plus the enum name; this prevents a nested enum, or an enum
 * from another package, from being confused with an unrelated top-level enum that merely
 * shares its simple name. A type name without a leading dot is relative and is matched by
 * its last segment.
 *
 * @return the matching enum descriptor, or an empty {@code Optional} when this field is not
 *     an enum or no top-level enum of the file matches
 */
public Optional<DescriptorProtos.EnumDescriptorProto> getEnumDescriptor() {
  if (!isEnum()) {
    return Optional.empty();
  }

  final String typeName = getFieldProto().getTypeName();
  final FileDescriptorProto file = getProtobufMessage().fileProto();

  final String expectedEnumName;
  if (typeName.startsWith(".")) {
    // Fully-qualified reference: it must live directly inside this file's package.
    final String filePackage = file.getPackage();
    final String scopePrefix = filePackage.isEmpty() ? "." : "." + filePackage + ".";
    if (!typeName.startsWith(scopePrefix)) {
      return Optional.empty();
    }
    // Any remaining '.' means the enum is nested in a message; no top-level name can match it.
    expectedEnumName = typeName.substring(scopePrefix.length());
  } else {
    // Relative reference: the scope cannot be checked here, so match on the simple name only.
    expectedEnumName = typeName.substring(typeName.lastIndexOf('.') + 1);
  }

  return file.getEnumTypeList().stream()
      .filter(enumType -> enumType.getName().equals(expectedEnumName))
      .findFirst();
}
/**
 * Lists the values declared by the enum this field refers to.
 *
 * @return the value descriptors of the resolved enum, or an empty list when
 *     {@link #getEnumDescriptor()} yields nothing
 */
public List<DescriptorProtos.EnumValueDescriptorProto> getEnumValues() {
  final Optional<DescriptorProtos.EnumDescriptorProto> descriptor = getEnumDescriptor();
  if (descriptor.isPresent()) {
    return descriptor.get().getValueList();
  }
  return Collections.emptyList();
}
/**
 * Maps every value name of the referenced enum to the comment attached to it in the source.
 *
 * <p>The source locations of the file are walked once and indexed by enum-value position;
 * the first matching location of each value wins. Values without a matching location are
 * mapped to the empty string. Iteration order of the result follows the declaration order
 * of the enum values.
 *
 * @return value name to collapsed comment, or an empty map when
 *     {@link #getEnumDescriptor()} yields nothing
 */
public Map<String, String> getEnumValuesWithComments() {
  final Optional<DescriptorProtos.EnumDescriptorProto> resolved = getEnumDescriptor();
  if (resolved.isEmpty()) {
    return Collections.emptyMap();
  }

  final DescriptorProtos.EnumDescriptorProto enumProto = resolved.get();
  final List<DescriptorProtos.EnumValueDescriptorProto> values = enumProto.getValueList();
  final FileDescriptorProto file = getProtobufMessage().fileProto();
  final int enumIndex = file.getEnumTypeList().indexOf(enumProto);

  // Single pass over the locations instead of one full scan per enum value.
  final Map<Integer, String> commentByValueIndex = new LinkedHashMap<>();
  for (DescriptorProtos.SourceCodeInfo.Location location : file.getSourceCodeInfo().getLocationList()) {
    if (location.getPathCount() <= 3) {
      continue; // too short to address an enum value; also guards getPath(3) below
    }
    final int valueIndex = location.getPath(3);
    if (valueIndex < values.size()
        && !commentByValueIndex.containsKey(valueIndex)
        && isEnumValueLocation(location, enumIndex, valueIndex)) {
      commentByValueIndex.put(valueIndex, ProtobufUtils.collapseLocationComments(location));
    }
  }

  final Map<String, String> valueComments = new LinkedHashMap<>();
  for (int i = 0; i < values.size(); i++) {
    final String comment = commentByValueIndex.get(i);
    valueComments.put(values.get(i).getName(), comment == null ? "" : comment);
  }
  return valueComments;
}
/**
 * Checks whether a source location addresses the declaration of one specific value of a
 * top-level enum.
 *
 * <p>The expected path is exactly {@code [ENUM_TYPE, enumIndex, VALUE, valueIndex]}. Longer
 * paths with the same prefix address parts of the value (such as its name or number) rather
 * than the declaration itself, so they are rejected instead of relying on the declaration
 * happening to appear first in the location list.
 *
 * @param location the source location to test
 * @param enumIndex position of the enum within the file's top-level enum list
 * @param valueIndex position of the value within the enum's value list
 * @return {@code true} when the location's path is exactly the path of that enum value
 */
private boolean isEnumValueLocation(DescriptorProtos.SourceCodeInfo.Location location, int enumIndex,
    int valueIndex) {
  return location.getPathCount() == 4
      && location.getPath(0) == DescriptorProtos.FileDescriptorProto.ENUM_TYPE_FIELD_NUMBER
      && location.getPath(1) == enumIndex
      && location.getPath(2) == DescriptorProtos.EnumDescriptorProto.VALUE_FIELD_NUMBER
      && location.getPath(3) == valueIndex;
}
}

View File

@ -1,8 +1,5 @@
package datahub.protobuf.visitors.field;
import static datahub.protobuf.ProtobufUtils.getFieldOptions;
import static datahub.protobuf.ProtobufUtils.getMessageOptions;
import com.linkedin.common.GlobalTags;
import com.linkedin.common.GlossaryTermAssociation;
import com.linkedin.common.GlossaryTermAssociationArray;
@ -13,60 +10,107 @@ import com.linkedin.common.urn.TagUrn;
import com.linkedin.schema.SchemaField;
import com.linkedin.tag.TagProperties;
import com.linkedin.util.Pair;
import datahub.protobuf.model.FieldTypeEdge;
import datahub.protobuf.model.ProtobufElement;
import datahub.protobuf.model.ProtobufField;
import datahub.protobuf.visitors.ProtobufExtensionUtil;
import datahub.protobuf.visitors.VisitContext;
import org.jgrapht.GraphPath;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static datahub.protobuf.ProtobufUtils.getFieldOptions;
import static datahub.protobuf.ProtobufUtils.getMessageOptions;
public class ProtobufExtensionFieldVisitor extends SchemaFieldVisitor {
@Override
public Stream<Pair<SchemaField, Double>> visitField(ProtobufField field, VisitContext context) {
boolean isPrimaryKey =
getFieldOptions(field.getFieldProto()).stream()
.map(Pair::getKey)
.anyMatch(fieldDesc -> fieldDesc.getName().matches("(?i).*primary_?key"));
boolean isPrimaryKey = getFieldOptions(field.getFieldProto()).stream()
.map(Pair::getKey)
.anyMatch(fieldDesc -> fieldDesc.getName().matches("(?i).*primary_?key"));
List<TagAssociation> tags =
Stream.concat(
ProtobufExtensionUtil.extractTagPropertiesFromOptions(
getFieldOptions(field.getFieldProto()), context.getGraph().getRegistry()),
promotedTags(field, context))
.distinct()
.map(tag -> new TagAssociation().setTag(new TagUrn(tag.getName())))
.sorted(Comparator.comparing(t -> t.getTag().getName()))
.collect(Collectors.toList());
List<TagAssociation> tags = getTagAssociations(field, context);
List<GlossaryTermAssociation> terms = getGlossaryTermAssociations(field, context);
List<GlossaryTermAssociation> terms =
Stream.concat(
ProtobufExtensionUtil.extractTermAssociationsFromOptions(
getFieldOptions(field.getFieldProto()), context.getGraph().getRegistry()),
promotedTerms(field, context))
.distinct()
.sorted(Comparator.comparing(a -> a.getUrn().getNameEntity()))
.collect(Collectors.toList());
return context.streamAllPaths(field)
.map(path -> Pair.of(createSchemaField(field, context, path, isPrimaryKey, tags, terms),
context.calculateSortOrder(path, field)));
}
return context
.streamAllPaths(field)
.map(
path ->
Pair.of(
new SchemaField()
.setFieldPath(context.getFieldPath(path))
.setNullable(!isPrimaryKey)
.setIsPartOfKey(isPrimaryKey)
.setDescription(field.comment())
.setNativeDataType(field.nativeType())
.setType(field.schemaFieldDataType())
.setGlobalTags(new GlobalTags().setTags(new TagAssociationArray(tags)))
.setGlossaryTerms(
new GlossaryTerms()
.setTerms(new GlossaryTermAssociationArray(terms))
.setAuditStamp(context.getAuditStamp())),
context.calculateSortOrder(path, field)));
/**
 * Assembles the {@link SchemaField} emitted for one path leading to the given field.
 *
 * @param field the protobuf field being described
 * @param context visit context supplying the field path and audit stamp
 * @param path graph path for which the schema field is produced
 * @param isPrimaryKey whether the field is part of the key; key fields are marked non-nullable
 * @param tags tag associations to attach
 * @param terms glossary term associations to attach
 * @return the populated schema field
 */
private SchemaField createSchemaField(ProtobufField field, VisitContext context,
    GraphPath<ProtobufElement, FieldTypeEdge> path, boolean isPrimaryKey, List<TagAssociation> tags,
    List<GlossaryTermAssociation> terms) {
  final String description = createFieldDescription(field);

  final SchemaField schemaField = new SchemaField();
  schemaField.setFieldPath(context.getFieldPath(path));
  schemaField.setNullable(!isPrimaryKey);
  schemaField.setIsPartOfKey(isPrimaryKey);
  schemaField.setDescription(description);
  schemaField.setNativeDataType(field.nativeType());
  schemaField.setType(field.schemaFieldDataType());

  final GlobalTags globalTags = new GlobalTags();
  globalTags.setTags(new TagAssociationArray(tags));
  schemaField.setGlobalTags(globalTags);

  final GlossaryTerms glossaryTerms = new GlossaryTerms();
  glossaryTerms.setTerms(new GlossaryTermAssociationArray(terms));
  glossaryTerms.setAuditStamp(context.getAuditStamp());
  schemaField.setGlossaryTerms(glossaryTerms);

  return schemaField;
}
/**
 * Builds the description of a field: its comment, followed for enum fields by a blank line
 * and one line per enum value.
 *
 * <p>The blank-line separator is written only when there are enum values to list, so an enum
 * field whose values cannot be resolved keeps its plain comment without dangling newlines.
 *
 * @param field the field to describe
 * @return the field comment, optionally extended with the enum value listing
 */
private String createFieldDescription(ProtobufField field) {
  final StringBuilder description = new StringBuilder(field.comment());

  if (field.isEnum()) {
    final Map<String, String> enumValuesWithComments = field.getEnumValuesWithComments();
    if (!enumValuesWithComments.isEmpty()) {
      description.append("\n\n");
      appendEnumValues(description, field, enumValuesWithComments);
    }
  }

  return description.toString();
}
/**
 * Appends one {@code "<number>: <name>[ - <comment>]"} line per enum value to the description.
 *
 * <p>The enum values are fetched once and indexed by name rather than being looked up again
 * for every entry. Numbers are appended directly instead of going through
 * {@code String.format}, which keeps the output independent of the default locale.
 *
 * @param description builder receiving the lines
 * @param field enum field whose values supply the numbers
 * @param enumValuesWithComments value name to comment, iterated in map order; names that the
 *     field's enum does not declare are skipped
 */
private void appendEnumValues(StringBuilder description, ProtobufField field,
    Map<String, String> enumValuesWithComments) {
  // On a duplicate name keep the first value, matching a first-match lookup.
  final Map<String, Integer> numberByName = field.getEnumValues().stream()
      .collect(Collectors.toMap(v -> v.getName(), v -> v.getNumber(), (first, duplicate) -> first));

  enumValuesWithComments.forEach((name, comment) -> {
    final Integer number = numberByName.get(name);
    if (number == null) {
      return;
    }
    description.append(number.intValue()).append(": ").append(name);
    if (!comment.isEmpty()) {
      description.append(" - ").append(comment);
    }
    description.append("\n");
  });
}
/**
 * Collects the tag associations of a field: tags extracted from the field's own options
 * followed by the tags returned by {@code promotedTags}, de-duplicated and sorted by tag name.
 *
 * @param field the field whose tags are collected
 * @param context visit context giving access to the graph registry
 * @return distinct tag associations ordered by tag name
 */
private List<TagAssociation> getTagAssociations(ProtobufField field, VisitContext context) {
  final Stream<TagProperties> ownTags = ProtobufExtensionUtil.extractTagPropertiesFromOptions(
      getFieldOptions(field.getFieldProto()), context.getGraph().getRegistry());
  final Stream<TagProperties> inheritedTags = promotedTags(field, context);

  final Comparator<TagAssociation> byTagName = Comparator.comparing(assoc -> assoc.getTag().getName());

  return Stream.concat(ownTags, inheritedTags)
      .map(properties -> new TagAssociation().setTag(new TagUrn(properties.getName())))
      .distinct()
      .sorted(byTagName)
      .collect(Collectors.toList());
}
/**
 * Collects the glossary term associations of a field: terms extracted from the field's own
 * options followed by the terms returned by {@code promotedTerms}, de-duplicated and sorted
 * by the name entity of their URN.
 *
 * @param field the field whose terms are collected
 * @param context visit context giving access to the graph registry
 * @return distinct glossary term associations ordered by URN name entity
 */
private List<GlossaryTermAssociation> getGlossaryTermAssociations(ProtobufField field, VisitContext context) {
  final Comparator<GlossaryTermAssociation> byUrnName =
      Comparator.comparing(assoc -> assoc.getUrn().getNameEntity());

  final Stream<GlossaryTermAssociation> allTerms = Stream.concat(
      ProtobufExtensionUtil.extractTermAssociationsFromOptions(
          getFieldOptions(field.getFieldProto()), context.getGraph().getRegistry()),
      promotedTerms(field, context));

  return allTerms.distinct().sorted(byUrnName).collect(Collectors.toList());
}
/**
@ -76,12 +120,11 @@ public class ProtobufExtensionFieldVisitor extends SchemaFieldVisitor {
*/
private Stream<TagProperties> promotedTags(ProtobufField field, VisitContext context) {
if (field.isMessage()) {
return context.getGraph().outgoingEdgesOf(field).stream()
.flatMap(
e ->
ProtobufExtensionUtil.extractTagPropertiesFromOptions(
getMessageOptions(e.getEdgeTarget().messageProto()),
context.getGraph().getRegistry()))
return context.getGraph()
.outgoingEdgesOf(field)
.stream()
.flatMap(e -> ProtobufExtensionUtil.extractTagPropertiesFromOptions(
getMessageOptions(e.getEdgeTarget().messageProto()), context.getGraph().getRegistry()))
.distinct();
} else {
return Stream.of();
@ -95,12 +138,11 @@ public class ProtobufExtensionFieldVisitor extends SchemaFieldVisitor {
*/
private Stream<GlossaryTermAssociation> promotedTerms(ProtobufField field, VisitContext context) {
if (field.isMessage()) {
return context.getGraph().outgoingEdgesOf(field).stream()
.flatMap(
e ->
ProtobufExtensionUtil.extractTermAssociationsFromOptions(
getMessageOptions(e.getEdgeTarget().messageProto()),
context.getGraph().getRegistry()))
return context.getGraph()
.outgoingEdgesOf(field)
.stream()
.flatMap(e -> ProtobufExtensionUtil.extractTermAssociationsFromOptions(
getMessageOptions(e.getEdgeTarget().messageProto()), context.getGraph().getRegistry()))
.distinct();
} else {
return Stream.of();

View File

@ -355,4 +355,20 @@ public class ProtobufFieldTest {
assertEquals("test comment 14", msg3Field14.getDescription());
}
}
/**
 * Verifies that the description of an enum field consists of the field comment, a blank
 * line, and one "number: NAME - comment" line per enum value.
 */
@Test
public void timestampUnitEnumDescriptionTest() throws IOException {
  final String enumFieldPath =
      "[version=2.0].[type=extended_protobuf_TimestampUnitMessage].[type=enum].timestamp_unit_type";
  // Joining with "\n" and a trailing empty element reproduces the final newline.
  final String expectedDescription = String.join("\n",
      "timestamp unit",
      "",
      "0: MILLISECOND - 10^-3 seconds",
      "1: MICROSECOND - 10^-6 seconds",
      "2: NANOSECOND - 10^-9 seconds",
      "");

  final ProtobufDataset dataset = getTestProtobufDataset("extended_protobuf", "messageE");
  final SchemaMetadata schemaMetadata = dataset.getSchemaMetadata();

  final SchemaField enumField = schemaMetadata.getFields()
      .stream()
      .filter(candidate -> candidate.getFieldPath().equals(enumFieldPath))
      .findFirst()
      .orElseThrow();

  assertEquals(expectedDescription, enumField.getDescription());
}
}

View File

@ -0,0 +1,19 @@
syntax = "proto3";
package extended_protobuf;
/*
Timestamp unit enum
*/
enum TimestampUnitEnum {
MILLISECOND = 0; // 10^-3 seconds
MICROSECOND = 1; // 10^-6 seconds
NANOSECOND = 2; // 10^-9 seconds
}
/*
Timestamp unit message
*/
message TimestampUnitMessage {
// timestamp unit
TimestampUnitEnum timestamp_unit_type = 1;
}