feat(protobuf): adding deprecation support for datasets and fields (#4634)

This commit is contained in:
leifker 2022-05-04 09:53:08 -05:00 committed by GitHub
parent 23f657ed76
commit c22d52d1bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 403 additions and 100 deletions

View File

@ -22,6 +22,7 @@ enum DataHubMetadataType {
TERM = 3; // Datahub Term
OWNER = 4; // Datahub Owner
DOMAIN = 5; // Datahub Domain
DEPRECATION = 6; // Datahub Deprecation
}
// Assuming Glossary Term defined from bootstrap example
@ -96,6 +97,9 @@ message lifecycle {
bool archived = 5500 [(datahubField.type) = TAG, (datahubField.type) = PROPERTY];
Frequency frequency = 5510 [(datahubField.type) = TAG, (datahubField.type) = PROPERTY];
string ttl = 5520 [(datahubField.type) = TAG];
repeated string deprecation_note = 5530 [(datahubField.type) = DEPRECATION];
uint64 deprecation_time = 5531 [(datahubField.type) = DEPRECATION];
}
}
@ -110,6 +114,9 @@ message message {
string tags = 5600 [(datahubField.type) = TAG_LIST];
MessageType type = 5610 [(datahubField.type) = TAG, (datahubField.type) = PROPERTY];
repeated string deprecation_note = 5620 [(datahubField.type) = DEPRECATION, (datahubField.type) = PROPERTY];
uint64 deprecation_time = 5621 [(datahubField.type) = DEPRECATION, (datahubField.type) = PROPERTY];
}
}

View File

@ -8,6 +8,10 @@ import "google/protobuf/timestamp.proto";
Clickstream impressions
**/
message Impression {
option deprecated = true;
option(meta.lifecycle.deprecation_note) = "Impression events are deprecated.";
option(meta.lifecycle.deprecation_time) = 1649693315;
option(meta.message.type) = EVENT;
option(meta.kafka.topics) = "clickstream_impressions";
@ -37,4 +41,6 @@ message Impression {
(meta.securityField.classification_enum) = HighlyConfidential,
(meta.securityField.classification) = "Classification.Sensitive"
]; // event details
string deprecated_field = 3 [deprecated = true];
}

View File

@ -134,6 +134,7 @@ enum DataHubMetadataType {
TERM = 3; // Datahub Term
OWNER = 4; // Datahub Owner
DOMAIN = 5; // Datahub Domain
DEPRECATION = 6; // Datahub Deprecation
}
/*
@ -210,20 +211,24 @@ message msg {
meta.MetaEnumExample type = 60004 [(meta.fld.type) = TAG, (meta.fld.type) = PROPERTY];
bool bool_feature = 60005 [(meta.fld.type) = TAG];
string alert_channel = 60007 [(meta.fld.type) = PROPERTY];
repeated string deprecation_note = 60008 [(meta.fld.type) = DEPRECATION, (meta.fld.type) = PROPERTY];
uint64 deprecation_time = 60009 [(meta.fld.type) = DEPRECATION, (meta.fld.type) = PROPERTY];
}
}
```
#### DataHubMetadataType
| DataHubMetadataType | String | Bool | Enum | Repeated |
|---------------------|--------|------|------|----------|
| PROPERTY | X | X | X | X |
| TAG | X | X | X | |
| TAG_LIST | X | | | |
| TERM | X | | X | |
| OWNER | X | | | X |
| DOMAIN | X | | | X |
| DataHubMetadataType | String | Bool | Enum | Repeated | Uint64 |
|---------------------|-----------|------|------|-----------|----------|
| PROPERTY | X | X | X | X | |
| TAG | X | X | X | | |
| TAG_LIST | X | | | | |
| TERM | X | | X | | |
| OWNER | X | | | X | |
| DOMAIN | X | | | X | |
| DEPRECATION | X (notes) | | | X (notes) | X (time) |
##### PROPERTY
@ -361,6 +366,33 @@ The dot delimited prefix also works with enum types where the prefix is the enum
}
```
In addition, tags can be added to fields as well as messages. The following is a consolidated example for all the possible tag options on fields.
```protobuf
enum MetaEnumExample {
UNKNOWN = 0;
ENTITY = 1;
EVENT = 2;
}
message fld {
extend google.protobuf.FieldOptions {
string tags = 6000 [(meta.fld.type) = TAG_LIST];
string tagString = 6001 [(meta.fld.type) = TAG];
bool tagBool = 6002 [(meta.fld.type) = TAG];
MetaEnumExample tagEnum = 6003 [(meta.fld.type) = TAG];
}
}
message Message {
uint32 my_field = 1
[(meta.fld.tags) = "a, b, c",
(meta.fld.tagString) = "myTag",
(meta.fld.tagBool) = true,
(meta.fld.tagEnum) = ENTITY];
}
```
##### TERM
Terms are specified by either a fully qualified string value or an enum where the enum type's name is the first element in the fully qualified term name.
@ -387,6 +419,29 @@ The following example shows both methods, either of which would result in the te
}
```
The following is a consolidated example for the possible field level term options.
```protobuf
enum Classification {
HighlyConfidential = 0;
Confidential = 1;
Sensitive = 2;
}
message fld {
extend google.protobuf.FieldOptions {
Classification term = 5000 [(meta.fld.type) = TERM];
string class = 5001 [(meta.fld.type) = TERM];
}
}
message Message {
uint32 my_field = 1
[(meta.fld.term) = HighlyConfidential,
(meta.fld.class) = "Classification.HighlyConfidential"];
}
```
##### OWNER
One or more owners can be specified and can be any combination of `corpUser` and `corpGroup` entities. The default entity type is `corpGroup`. By default, the ownership type is set to `producer`, see the second example for setting the ownership type.
@ -438,6 +493,36 @@ Set the domain id for the dataset. The domain should exist already. Note that th
}
```
##### DEPRECATION
Deprecation of fields and messages is natively supported by protobuf options.
The standard "Deprecation" aspect is used for a dataset generated from a protobuf `message`.
Field deprecation adds a tag with the following urn `urn:li:tag:deprecated` (red, #FF0000).
```protobuf
message msg {
extend google.protobuf.MessageOptions {
repeated string deprecation_note = 5620 [(meta.fld.type) = DEPRECATION];
uint64 deprecation_time = 5621 [(meta.fld.type) = DEPRECATION];
}
}
message Message {
option deprecated = true;
option (meta.msg.deprecation_note) = "Deprecated for this other message.";
option (meta.msg.deprecation_note) = "Drop in replacement.";
option (meta.msg.deprecation_time) = 1649689387;
}
```
The field deprecation tag works without definition in `meta.proto` using the native protobuf option.
```protobuf
message Message {
uint32 my_field = 1 [deprecated = true];
}
```
## Gradle Integration
An example application is included which works with the `protobuf-gradle-plugin`, see the standalone [example project](../datahub-protobuf-example).

View File

@ -21,13 +21,13 @@ import datahub.protobuf.visitors.dataset.DomainVisitor;
import datahub.protobuf.visitors.dataset.InstitutionalMemoryVisitor;
import datahub.protobuf.visitors.dataset.KafkaTopicPropertyVisitor;
import datahub.protobuf.visitors.dataset.OwnershipVisitor;
import datahub.protobuf.visitors.dataset.ProtobufExtensionPropertyVisitor;
import datahub.protobuf.visitors.dataset.ProtobufExtensionTagAssocVisitor;
import datahub.protobuf.visitors.dataset.ProtobufExtensionTermAssocVisitor;
import datahub.protobuf.visitors.dataset.PropertyVisitor;
import datahub.protobuf.visitors.dataset.TagAssociationVisitor;
import datahub.protobuf.visitors.dataset.TermAssociationVisitor;
import datahub.protobuf.visitors.field.SchemaFieldVisitor;
import datahub.event.MetadataChangeProposalWrapper;
import datahub.protobuf.visitors.field.ProtobufExtensionFieldVisitor;
import datahub.protobuf.visitors.tags.ProtobufExtensionTagVisitor;
import datahub.protobuf.visitors.tags.TagVisitor;
import javax.annotation.Nullable;
import java.io.IOException;
@ -122,7 +122,7 @@ public class ProtobufDataset {
new ProtobufGraph(fileSet, messageName, filename), schema, auditStamp, fabricType)
.setMetadataChangeProposalVisitors(
List.of(
new ProtobufExtensionTagVisitor()
new TagVisitor()
)
)
.setFieldVisitor(new ProtobufExtensionFieldVisitor())
@ -131,7 +131,7 @@ public class ProtobufDataset {
.datasetPropertyVisitors(
List.of(
new KafkaTopicPropertyVisitor(),
new ProtobufExtensionPropertyVisitor()
new PropertyVisitor()
)
)
.institutionalMemoryMetadataVisitors(
@ -141,12 +141,12 @@ public class ProtobufDataset {
)
.tagAssociationVisitors(
List.of(
new ProtobufExtensionTagAssocVisitor()
new TagAssociationVisitor()
)
)
.termAssociationVisitors(
List.of(
new ProtobufExtensionTermAssocVisitor()
new TermAssociationVisitor()
)
)
.ownershipVisitors(

View File

@ -4,10 +4,15 @@ import com.google.common.collect.ImmutableList;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;
import com.google.protobuf.ExtensionRegistry;
import com.linkedin.util.Pair;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
@ -35,6 +40,93 @@ public class ProtobufUtils {
return new String(orig.getBytes(StandardCharsets.ISO_8859_1));
}
/*
* Reflection used to prevent an exception deep inside the protobuf library due to a getter method
* mutating the json name field and causing an equality check to fail between an instance that has and has not
* had the getter called.
*
* https://github.com/protocolbuffers/protobuf/blob/main/java/core/src/main/java/com/google/protobuf/Descriptors.java#L1105
*
* java.lang.IllegalArgumentException: FieldDescriptors can only be compared to other FieldDescriptors for fields of the same message type.
* at com.google.protobuf.Descriptors$FieldDescriptor.compareTo(Descriptors.java:1344)
* at com.google.protobuf.Descriptors$FieldDescriptor.compareTo(Descriptors.java:1057)
* at java.base/java.util.TreeMap.put(TreeMap.java:566)
* at java.base/java.util.AbstractMap.putAll(AbstractMap.java:281)
* at java.base/java.util.TreeMap.putAll(TreeMap.java:325)
* at com.google.protobuf.GeneratedMessageV3$ExtendableMessage.getAllFields(GeneratedMessageV3.java:1240)
*
*/
// Reflective handles onto non-public accessors of the generated options classes.
// "getExtensionFields" is resolved on the direct superclass of each options class and
// "getAllFieldsMutable(boolean)" on the superclass one level further up.
private static final Method FIELD_OPT_EXT_FIELDS_METHOD;
private static final Method FIELD_OPT_ALL_FIELD_METHOD;
private static final Method MSG_OPT_EXT_FIELDS_METHOD;
private static final Method MSG_OPT_ALL_FIELD_METHOD;

static {
  final Class<?> fieldOptionsParent = DescriptorProtos.FieldOptions.class.getSuperclass();
  final Class<?> fieldOptionsGrandParent = fieldOptionsParent.getSuperclass();
  final Class<?> messageOptionsParent = DescriptorProtos.MessageOptions.class.getSuperclass();
  final Class<?> messageOptionsGrandParent = messageOptionsParent.getSuperclass();

  try {
    // Field options: extension values, then the remaining fields.
    FIELD_OPT_EXT_FIELDS_METHOD = fieldOptionsParent.getDeclaredMethod("getExtensionFields");
    FIELD_OPT_EXT_FIELDS_METHOD.setAccessible(true);

    FIELD_OPT_ALL_FIELD_METHOD = fieldOptionsGrandParent.getDeclaredMethod("getAllFieldsMutable", boolean.class);
    FIELD_OPT_ALL_FIELD_METHOD.setAccessible(true);

    // Message options: same pair of accessors.
    MSG_OPT_EXT_FIELDS_METHOD = messageOptionsParent.getDeclaredMethod("getExtensionFields");
    MSG_OPT_EXT_FIELDS_METHOD.setAccessible(true);

    MSG_OPT_ALL_FIELD_METHOD = messageOptionsGrandParent.getDeclaredMethod("getAllFieldsMutable", boolean.class);
    MSG_OPT_ALL_FIELD_METHOD.setAccessible(true);
  } catch (NoSuchMethodException missingAccessor) {
    // Fail class initialisation if any of the expected accessors cannot be found.
    throw new RuntimeException(missingAccessor);
  }
}
/**
 * Collects the options set on a field as (descriptor, value) pairs.
 *
 * <p>The values are read through the reflective handles {@code FIELD_OPT_EXT_FIELDS_METHOD}
 * and {@code FIELD_OPT_ALL_FIELD_METHOD} (invoked with {@code false}). Entries from the
 * extension-fields call come first, followed by entries from the all-fields call.
 *
 * @param fieldProto the field whose options are read
 * @return a mutable list of option descriptor/value pairs
 * @throws RuntimeException wrapping any reflective access or invocation failure
 */
public static List<Pair<Descriptors.FieldDescriptor, Object>> getFieldOptions(DescriptorProtos.FieldDescriptorProto fieldProto) {
  final List<Pair<Descriptors.FieldDescriptor, Object>> collected = new LinkedList<>();
  try {
    final Map<Descriptors.FieldDescriptor, Object> extensionValues =
        (Map<Descriptors.FieldDescriptor, Object>) FIELD_OPT_EXT_FIELDS_METHOD.invoke(fieldProto.getOptions());
    for (Map.Entry<Descriptors.FieldDescriptor, Object> extension : extensionValues.entrySet()) {
      collected.add(Pair.of(extension.getKey(), extension.getValue()));
    }

    final Map<Descriptors.FieldDescriptor, Object> remainingValues =
        (Map<Descriptors.FieldDescriptor, Object>) FIELD_OPT_ALL_FIELD_METHOD.invoke(fieldProto.getOptions(), false);
    for (Map.Entry<Descriptors.FieldDescriptor, Object> remaining : remainingValues.entrySet()) {
      collected.add(Pair.of(remaining.getKey(), remaining.getValue()));
    }
  } catch (IllegalAccessException | InvocationTargetException reflectionFailure) {
    throw new RuntimeException(reflectionFailure);
  }
  return collected;
}
/**
 * Collects the options set on a message as (descriptor, value) pairs.
 *
 * <p>The values are read through the reflective handles {@code MSG_OPT_EXT_FIELDS_METHOD}
 * and {@code MSG_OPT_ALL_FIELD_METHOD} (invoked with {@code false}). Entries from the
 * extension-fields call come first, followed by entries from the all-fields call.
 *
 * @param messageProto the message whose options are read
 * @return a mutable list of option descriptor/value pairs
 * @throws RuntimeException wrapping any reflective access or invocation failure
 */
public static List<Pair<Descriptors.FieldDescriptor, Object>> getMessageOptions(DescriptorProtos.DescriptorProto messageProto) {
  final List<Pair<Descriptors.FieldDescriptor, Object>> collected = new LinkedList<>();
  try {
    final Map<Descriptors.FieldDescriptor, Object> extensionValues =
        (Map<Descriptors.FieldDescriptor, Object>) MSG_OPT_EXT_FIELDS_METHOD.invoke(messageProto.getOptions());
    for (Map.Entry<Descriptors.FieldDescriptor, Object> extension : extensionValues.entrySet()) {
      collected.add(Pair.of(extension.getKey(), extension.getValue()));
    }

    final Map<Descriptors.FieldDescriptor, Object> remainingValues =
        (Map<Descriptors.FieldDescriptor, Object>) MSG_OPT_ALL_FIELD_METHOD.invoke(messageProto.getOptions(), false);
    for (Map.Entry<Descriptors.FieldDescriptor, Object> remaining : remainingValues.entrySet()) {
      collected.add(Pair.of(remaining.getKey(), remaining.getValue()));
    }
  } catch (IllegalAccessException | InvocationTargetException reflectionFailure) {
    throw new RuntimeException(reflectionFailure);
  }
  return collected;
}
public static ExtensionRegistry buildRegistry(DescriptorProtos.FileDescriptorSet fileSet) {
ExtensionRegistry registry = ExtensionRegistry.newInstance();
Map<String, DescriptorProtos.FileDescriptorProto> descriptorProtoMap = fileSet.getFileList().stream()
@ -94,7 +186,9 @@ public class ProtobufUtils {
// Finally, construct the actual descriptor.
Descriptors.FileDescriptor[] empty = new Descriptors.FileDescriptor[0];
return Descriptors.FileDescriptor.buildFrom(descriptorProto, dependencies.build().toArray(empty), false);
Descriptors.FileDescriptor descript = Descriptors.FileDescriptor.buildFrom(descriptorProto, dependencies.build().toArray(empty), false);
descriptorCache.put(descript.getName(), descript);
return descript;
}
}

View File

@ -8,9 +8,11 @@ import com.google.protobuf.InvalidProtocolBufferException;
import com.linkedin.common.GlossaryTermAssociation;
import com.linkedin.common.urn.GlossaryTermUrn;
import com.linkedin.tag.TagProperties;
import com.linkedin.util.Pair;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@ -30,14 +32,14 @@ public class ProtobufExtensionUtil {
}
public enum DataHubMetadataType {
PROPERTY, TAG, TAG_LIST, TERM, OWNER, DOMAIN;
PROPERTY, TAG, TAG_LIST, TERM, OWNER, DOMAIN, DEPRECATION;
public static final String PROTOBUF_TYPE = "DataHubMetadataType";
}
public static Map<Descriptors.FieldDescriptor, Object> filterByDataHubType(Map<Descriptors.FieldDescriptor, Object> options,
public static List<Pair<Descriptors.FieldDescriptor, Object>> filterByDataHubType(List<Pair<Descriptors.FieldDescriptor, Object>> options,
ExtensionRegistry registry, DataHubMetadataType filterType) {
return options.entrySet().stream()
return options.stream()
.filter(entry -> {
DescriptorProtos.FieldDescriptorProto extendedProtoOptions = extendProto(entry.getKey().toProto(), registry);
Optional<DataHubMetadataType> dataHubMetadataType = extendedProtoOptions.getOptions().getAllFields().entrySet().stream()
@ -55,8 +57,7 @@ public class ProtobufExtensionUtil {
.findFirst();
return filterType.equals(dataHubMetadataType.orElse(DataHubMetadataType.PROPERTY));
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}).collect(Collectors.toList());
}
public static Stream<Map.Entry<String, String>> getProperties(Descriptors.FieldDescriptor field, DescriptorProtos.DescriptorProto value) {
@ -67,12 +68,12 @@ public class ProtobufExtensionUtil {
});
}
public static Stream<TagProperties> extractTagPropertiesFromOptions(Map<Descriptors.FieldDescriptor, Object> options, ExtensionRegistry registry) {
Stream<TagProperties> tags = filterByDataHubType(options, registry, DataHubMetadataType.TAG).entrySet().stream()
public static Stream<TagProperties> extractTagPropertiesFromOptions(List<Pair<Descriptors.FieldDescriptor, Object>> options, ExtensionRegistry registry) {
Stream<TagProperties> tags = filterByDataHubType(options, registry, DataHubMetadataType.TAG).stream()
.filter(e -> e.getKey().isExtension())
.flatMap(extEntry -> {
if (extEntry.getKey().isRepeated()) {
return ((Collection<?>) extEntry.getValue()).stream().map(v -> Map.entry(extEntry.getKey(), v));
return ((Collection<?>) extEntry.getValue()).stream().map(v -> Pair.of(extEntry.getKey(), v));
} else {
return Stream.of(extEntry);
}
@ -103,7 +104,7 @@ public class ProtobufExtensionUtil {
}
}).filter(Objects::nonNull);
Stream<TagProperties> tagListTags = filterByDataHubType(options, registry, DataHubMetadataType.TAG_LIST).entrySet().stream()
Stream<TagProperties> tagListTags = filterByDataHubType(options, registry, DataHubMetadataType.TAG_LIST).stream()
.filter(e -> e.getKey().isExtension())
.flatMap(entry -> {
switch (entry.getKey().getJavaType()) {
@ -117,16 +118,26 @@ public class ProtobufExtensionUtil {
}
}).filter(Objects::nonNull);
return Stream.concat(tags, tagListTags);
Stream<TagProperties> deprecationTag;
if (options.stream().anyMatch(opt -> opt.getKey().getFullName().endsWith(".deprecated")
&& opt.getKey().getFullName().startsWith("google.protobuf.")
&& opt.getKey().getJavaType() == Descriptors.FieldDescriptor.JavaType.BOOLEAN
&& (Boolean) opt.getValue())) {
deprecationTag = Stream.of(new TagProperties().setName("deprecated").setColorHex("#FF0000"));
} else {
deprecationTag = Stream.empty();
}
public static Stream<GlossaryTermAssociation> extractTermAssociationsFromOptions(Map<Descriptors.FieldDescriptor, Object> options,
return Stream.of(tags, tagListTags, deprecationTag).reduce(Stream::concat).orElse(Stream.empty());
}
public static Stream<GlossaryTermAssociation> extractTermAssociationsFromOptions(List<Pair<Descriptors.FieldDescriptor, Object>> fieldOptions,
ExtensionRegistry registry) {
return filterByDataHubType(options, registry, DataHubMetadataType.TERM).entrySet().stream()
return filterByDataHubType(fieldOptions, registry, DataHubMetadataType.TERM).stream()
.filter(e -> e.getKey().isExtension())
.flatMap(extEntry -> {
if (extEntry.getKey().isRepeated()) {
return ((Collection<?>) extEntry.getValue()).stream().map(v -> Map.entry(extEntry.getKey(), v));
return ((Collection<?>) extEntry.getValue()).stream().map(v -> Pair.of(extEntry.getKey(), v));
} else {
return Stream.of(extEntry);
}

View File

@ -1,5 +1,6 @@
package datahub.protobuf.visitors.dataset;
import com.linkedin.common.Deprecation;
import com.linkedin.common.GlobalTags;
import com.linkedin.common.GlossaryTermAssociation;
import com.linkedin.common.GlossaryTermAssociationArray;
@ -30,6 +31,7 @@ import lombok.Builder;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -53,6 +55,8 @@ public class DatasetVisitor implements ProtobufModelVisitor<MetadataChangePropos
private final String protocBase64 = "";
@Builder.Default
private final ProtobufModelVisitor<String> descriptionVisitor = new DescriptionVisitor();
@Builder.Default
private final ProtobufModelVisitor<Deprecation> deprecationVisitor = new DeprecationVisitor();
@Override
public Stream<MetadataChangeProposalWrapper<? extends RecordTemplate>> visitGraph(VisitContext context) {
@ -90,7 +94,10 @@ public class DatasetVisitor implements ProtobufModelVisitor<MetadataChangePropos
)).setLastModified(context.getAuditStamp()), "ownership"),
new MetadataChangeProposalWrapper<>(DatasetUrn.ENTITY_TYPE, datasetUrn, ChangeType.UPSERT,
new Domains(new DataMap(Map.of("domains",
new UrnArray(g.accept(context, domainVisitors).collect(Collectors.toList())).data()))), "domains")
);
new UrnArray(g.accept(context, domainVisitors).collect(Collectors.toList())).data()))), "domains"),
g.accept(context, List.of(deprecationVisitor)).findFirst()
.map(dep -> new MetadataChangeProposalWrapper<>(DatasetUrn.ENTITY_TYPE, datasetUrn, ChangeType.UPSERT,
dep, "deprecation")).orElse(null)
).filter(Objects::nonNull);
}
}

View File

@ -0,0 +1,53 @@
package datahub.protobuf.visitors.dataset;
import com.google.protobuf.Descriptors;
import com.linkedin.common.Deprecation;
import com.linkedin.util.Pair;
import datahub.protobuf.visitors.ProtobufExtensionUtil;
import datahub.protobuf.visitors.ProtobufModelVisitor;
import datahub.protobuf.visitors.VisitContext;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static datahub.protobuf.ProtobufUtils.getMessageOptions;
public class DeprecationVisitor implements ProtobufModelVisitor<Deprecation> {
@Override
public Stream<Deprecation> visitGraph(VisitContext context) {
if (context.root().messageProto().getOptions().getDeprecated()) {
List<Pair<Descriptors.FieldDescriptor, Object>> deprecationOptions = ProtobufExtensionUtil
.filterByDataHubType(getMessageOptions(context.root().messageProto()),
context.getGraph().getRegistry(), ProtobufExtensionUtil.DataHubMetadataType.DEPRECATION);
String decommissionNote = deprecationOptions.stream()
.filter(opt -> opt.getKey().getJavaType() == Descriptors.FieldDescriptor.JavaType.STRING)
.flatMap(opt -> {
if (opt.getKey().isRepeated()) {
return ((Collection<String>) opt.getValue()).stream();
} else {
return Stream.of(opt.getValue());
}
})
.map(Object::toString)
.collect(Collectors.joining("\n"));
Optional<Long> decommissionTime = deprecationOptions.stream()
.filter(opt -> opt.getKey().getJavaType() == Descriptors.FieldDescriptor.JavaType.LONG)
.map(opt -> (Long) opt.getValue())
.findFirst();
return Stream.of(new Deprecation()
.setDeprecated(true)
.setNote(decommissionNote)
.setDecommissionTime(decommissionTime.orElse(0L))
.setActor(context.getAuditStamp().getActor()));
} else {
return Stream.empty();
}
}
}

View File

@ -1,19 +1,22 @@
package datahub.protobuf.visitors.dataset;
import com.linkedin.common.urn.Urn;
import com.linkedin.util.Pair;
import datahub.protobuf.visitors.ProtobufExtensionUtil;
import datahub.protobuf.visitors.ProtobufModelVisitor;
import datahub.protobuf.visitors.VisitContext;
import java.util.stream.Stream;
import static datahub.protobuf.ProtobufUtils.getMessageOptions;
public class DomainVisitor implements ProtobufModelVisitor<Urn> {
@Override
public Stream<Urn> visitGraph(VisitContext context) {
return ProtobufExtensionUtil.filterByDataHubType(context.root().messageProto()
.getOptions().getAllFields(), context.getGraph().getRegistry(), ProtobufExtensionUtil.DataHubMetadataType.DOMAIN)
.values().stream().map(o ->
return ProtobufExtensionUtil.filterByDataHubType(getMessageOptions(context.root().messageProto()),
context.getGraph().getRegistry(), ProtobufExtensionUtil.DataHubMetadataType.DOMAIN)
.stream().map(Pair::getValue).map(o ->
Urn.createFromTuple("domain", ((String) o).toLowerCase())
);
}

View File

@ -15,13 +15,15 @@ import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;
import static datahub.protobuf.ProtobufUtils.getMessageOptions;
public class OwnershipVisitor implements ProtobufModelVisitor<Owner> {
@Override
public Stream<Owner> visitGraph(VisitContext context) {
return ProtobufExtensionUtil.filterByDataHubType(context.root().messageProto()
.getOptions().getAllFields(), context.getGraph().getRegistry(), ProtobufExtensionUtil.DataHubMetadataType.OWNER)
.entrySet().stream()
return ProtobufExtensionUtil.filterByDataHubType(getMessageOptions(context.root().messageProto()), context.getGraph().getRegistry(),
ProtobufExtensionUtil.DataHubMetadataType.OWNER)
.stream()
.flatMap(extEntry -> {
if (extEntry.getKey().isRepeated()) {
return ((Collection<String>) extEntry.getValue()).stream().map(v -> Map.entry(extEntry.getKey(), v));

View File

@ -14,17 +14,18 @@ import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static datahub.protobuf.ProtobufUtils.getMessageOptions;
import static datahub.protobuf.visitors.ProtobufExtensionUtil.getProperties;
public class ProtobufExtensionPropertyVisitor implements ProtobufModelVisitor<DatasetProperties> {
public class PropertyVisitor implements ProtobufModelVisitor<DatasetProperties> {
private static final Gson GSON = new Gson();
@Override
public Stream<DatasetProperties> visitGraph(VisitContext context) {
Map<String, String> properties = ProtobufExtensionUtil.filterByDataHubType(context.root().messageProto()
.getOptions().getAllFields(), context.getGraph().getRegistry(), ProtobufExtensionUtil.DataHubMetadataType.PROPERTY)
.entrySet().stream().flatMap(fd -> {
Map<String, String> properties = ProtobufExtensionUtil.filterByDataHubType(getMessageOptions(context.root().messageProto()),
context.getGraph().getRegistry(), ProtobufExtensionUtil.DataHubMetadataType.PROPERTY)
.stream().flatMap(fd -> {
if (fd.getKey().getJavaType() != Descriptors.FieldDescriptor.JavaType.MESSAGE) {
if (fd.getKey().isRepeated()) {
return Stream.of(Map.entry(fd.getKey().getName(), GSON.toJson(

View File

@ -8,13 +8,15 @@ import datahub.protobuf.visitors.VisitContext;
import java.util.stream.Stream;
import static datahub.protobuf.ProtobufUtils.getMessageOptions;
public class ProtobufExtensionTagAssocVisitor implements ProtobufModelVisitor<TagAssociation> {
public class TagAssociationVisitor implements ProtobufModelVisitor<TagAssociation> {
@Override
public Stream<TagAssociation> visitGraph(VisitContext context) {
return ProtobufExtensionUtil.extractTagPropertiesFromOptions(context.root().messageProto().getOptions()
.getAllFields(), context.getGraph().getRegistry())
return ProtobufExtensionUtil.extractTagPropertiesFromOptions(getMessageOptions(context.root().messageProto()),
context.getGraph().getRegistry())
.map(tag -> new TagAssociation().setTag(new TagUrn(tag.getName())));
}
}

View File

@ -7,11 +7,13 @@ import datahub.protobuf.visitors.VisitContext;
import java.util.stream.Stream;
public class ProtobufExtensionTermAssocVisitor implements ProtobufModelVisitor<GlossaryTermAssociation> {
import static datahub.protobuf.ProtobufUtils.getMessageOptions;
public class TermAssociationVisitor implements ProtobufModelVisitor<GlossaryTermAssociation> {
@Override
public Stream<GlossaryTermAssociation> visitGraph(VisitContext context) {
return ProtobufExtensionUtil.extractTermAssociationsFromOptions(context.root().messageProto().getOptions().getAllFields(),
return ProtobufExtensionUtil.extractTermAssociationsFromOptions(getMessageOptions(context.root().messageProto()),
context.getGraph().getRegistry());
}
}

View File

@ -19,16 +19,19 @@ import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static datahub.protobuf.ProtobufUtils.getFieldOptions;
import static datahub.protobuf.ProtobufUtils.getMessageOptions;
public class ProtobufExtensionFieldVisitor extends SchemaFieldVisitor {
@Override
public Stream<Pair<SchemaField, Double>> visitField(ProtobufField field, VisitContext context) {
boolean isPrimaryKey = field.getFieldProto().getOptions().getAllFields().keySet().stream()
boolean isPrimaryKey = getFieldOptions(field.getFieldProto()).stream().map(Pair::getKey)
.anyMatch(fieldDesc -> fieldDesc.getName().matches("(?i).*primary_?key"));
List<TagAssociation> tags = Stream.concat(
ProtobufExtensionUtil.extractTagPropertiesFromOptions(
field.getFieldProto().getOptions().getAllFields(),
getFieldOptions(field.getFieldProto()),
context.getGraph().getRegistry()),
promotedTags(field, context))
.distinct().map(tag -> new TagAssociation().setTag(new TagUrn(tag.getName())))
@ -37,7 +40,7 @@ public class ProtobufExtensionFieldVisitor extends SchemaFieldVisitor {
List<GlossaryTermAssociation> terms = Stream.concat(
ProtobufExtensionUtil.extractTermAssociationsFromOptions(
field.getFieldProto().getOptions().getAllFields(), context.getGraph().getRegistry()),
getFieldOptions(field.getFieldProto()), context.getGraph().getRegistry()),
promotedTerms(field, context))
.distinct()
.sorted(Comparator.comparing(a -> a.getUrn().getNameEntity()))
@ -65,8 +68,8 @@ public class ProtobufExtensionFieldVisitor extends SchemaFieldVisitor {
private Stream<TagProperties> promotedTags(ProtobufField field, VisitContext context) {
if (field.isMessage()) {
return context.getGraph().outgoingEdgesOf(field).stream().flatMap(e ->
ProtobufExtensionUtil.extractTagPropertiesFromOptions(e.getEdgeTarget().messageProto()
.getOptions().getAllFields(), context.getGraph().getRegistry())
ProtobufExtensionUtil.extractTagPropertiesFromOptions(getMessageOptions(e.getEdgeTarget().messageProto()),
context.getGraph().getRegistry())
).distinct();
} else {
return Stream.of();
@ -80,8 +83,8 @@ public class ProtobufExtensionFieldVisitor extends SchemaFieldVisitor {
private Stream<GlossaryTermAssociation> promotedTerms(ProtobufField field, VisitContext context) {
if (field.isMessage()) {
return context.getGraph().outgoingEdgesOf(field).stream().flatMap(e ->
ProtobufExtensionUtil.extractTermAssociationsFromOptions(e.getEdgeTarget().messageProto()
.getOptions().getAllFields(), context.getGraph().getRegistry())
ProtobufExtensionUtil.extractTermAssociationsFromOptions(getMessageOptions(e.getEdgeTarget().messageProto()),
context.getGraph().getRegistry())
).distinct();
} else {
return Stream.of();

View File

@ -10,23 +10,26 @@ import datahub.protobuf.visitors.ProtobufExtensionUtil;
import datahub.protobuf.visitors.VisitContext;
import datahub.event.MetadataChangeProposalWrapper;
import static datahub.protobuf.ProtobufUtils.getFieldOptions;
import static datahub.protobuf.ProtobufUtils.getMessageOptions;
import java.util.stream.Stream;
public class ProtobufExtensionTagVisitor implements ProtobufModelVisitor<MetadataChangeProposalWrapper<? extends RecordTemplate>> {
public class TagVisitor implements ProtobufModelVisitor<MetadataChangeProposalWrapper<? extends RecordTemplate>> {
private static final String TAG_PROPERTIES_ASPECT = "tagProperties";
@Override
public Stream<MetadataChangeProposalWrapper<? extends RecordTemplate>> visitGraph(VisitContext context) {
return ProtobufExtensionUtil.extractTagPropertiesFromOptions(context.root().messageProto().getOptions()
.getAllFields(), context.getGraph().getRegistry())
.map(ProtobufExtensionTagVisitor::wrapTagProperty);
return ProtobufExtensionUtil.extractTagPropertiesFromOptions(getMessageOptions(context.root().messageProto()),
context.getGraph().getRegistry())
.map(TagVisitor::wrapTagProperty);
}
@Override
public Stream<MetadataChangeProposalWrapper<? extends RecordTemplate>> visitField(ProtobufField field, VisitContext context) {
return ProtobufExtensionUtil.extractTagPropertiesFromOptions(field.getFieldProto().getOptions().getAllFields(),
return ProtobufExtensionUtil.extractTagPropertiesFromOptions(getFieldOptions(field.getFieldProto()),
context.getGraph().getRegistry())
.map(ProtobufExtensionTagVisitor::wrapTagProperty);
.map(TagVisitor::wrapTagProperty);
}
private static MetadataChangeProposalWrapper<TagProperties> wrapTagProperty(TagProperties tagProperty) {

View File

@ -11,41 +11,48 @@ import java.util.stream.Collectors;
import static datahub.protobuf.TestFixtures.getTestProtobufGraph;
import static datahub.protobuf.TestFixtures.getVisitContextBuilder;
import static java.util.Map.entry;
import static org.junit.jupiter.api.Assertions.assertEquals;
// Tests for the property visitor: each test obtains a protobuf graph via getTestProtobufGraph(...),
// runs the visitor over it, and compares the collected DatasetProperties with an expected
// custom-property map.
// NOTE(review): this excerpt reads like a rendered diff without +/- markers, so renamed lines appear
// twice (ProtobufExtensionPropertyVisitor* looks like the old name, PropertyVisitor* the new one) —
// confirm against the repository before treating both lines as live code.
public class ProtobufExtensionPropertyVisitorTest {
public class PropertyVisitorTest {
// Visits fixture "messageA" of "extended_protobuf" and expects exactly one DatasetProperties
// whose custom properties equal the ten entries listed below.
@Test
public void extendedMessageTest() throws IOException {
ProtobufExtensionPropertyVisitor test = new ProtobufExtensionPropertyVisitor();
PropertyVisitor test = new PropertyVisitor();
List<DatasetProperties> actual = getTestProtobufGraph("extended_protobuf", "messageA")
.accept(getVisitContextBuilder("extended_protobuf.Person"),
List.of(test)).collect(Collectors.toList());
assertEquals(List.of(
// Expected properties written with Map.of (ten key/value pairs).
// List-valued properties are expected as JSON-style array strings, e.g. ["a","b"].
new DatasetProperties().setCustomProperties(new StringMap(Map.of("classification_enum", "HighlyConfidential",
"bool_feature", "true",
"alert_channel", "#alerts",
"repeat_enum", "[\"ENTITY\",\"EVENT\"]",
"team", "[\"corpGroup:TeamB\",\"corpUser:datahub\"]",
"technical_owner", "[\"corpGroup:TechnicalOwner\"]",
"tag_list", "a, b, c",
"domain", "Engineering",
"repeat_string", "[\"a\",\"b\"]",
"type", "ENTITY")))),
// The same ten expected entries written with Map.ofEntries/entry(...).
new DatasetProperties().setCustomProperties(new StringMap(Map.ofEntries(
entry("classification_enum", "HighlyConfidential"),
entry("bool_feature", "true"),
entry("alert_channel", "#alerts"),
entry("repeat_enum", "[\"ENTITY\",\"EVENT\"]"),
entry("team", "[\"corpGroup:TeamB\",\"corpUser:datahub\"]"),
entry("technical_owner", "[\"corpGroup:TechnicalOwner\"]"),
entry("tag_list", "a, b, c"),
entry("domain", "Engineering"),
entry("repeat_string", "[\"a\",\"b\"]"),
entry("type", "ENTITY"))))),
actual);
}
// Visits fixture "messageB" of "extended_protobuf" and expects one DatasetProperties containing
// data_steward plus the deprecation-related properties (deprecated, deprecation_note,
// deprecation_time).
@Test
public void extendedFieldTest() throws IOException {
ProtobufExtensionPropertyVisitor test = new ProtobufExtensionPropertyVisitor();
PropertyVisitor test = new PropertyVisitor();
List<DatasetProperties> actual = getTestProtobufGraph("extended_protobuf", "messageB")
.accept(getVisitContextBuilder("extended_protobuf.Person"),
List.of(test)).collect(Collectors.toList());
assertEquals(List.of(new DatasetProperties()
// Expectation with only the data_steward property.
.setCustomProperties(new StringMap(Map.of("data_steward", "corpUser:datahub")))), actual);
// Expectation that also includes the deprecation properties; the two deprecation notes are
// expected combined into a single JSON-style array string.
// NOTE(review): these values presumably mirror the options declared on the fixture's
// Person message — verify against the .proto fixture.
.setCustomProperties(new StringMap(Map.ofEntries(
entry("data_steward", "corpUser:datahub"),
entry("deprecated", "true"),
entry("deprecation_note", "[\"Deprecated for this other message.\",\"Drop in replacement.\"]"),
entry("deprecation_time", "1649689387")
)))), actual);
}
}

View File

@ -14,11 +14,11 @@ import static datahub.protobuf.TestFixtures.getVisitContextBuilder;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class ProtobufExtensionTermAssocVisitorTest {
public class TermAssociationVisitorTest {
@Test
public void extendedMessageTest() throws IOException {
ProtobufExtensionTermAssocVisitor test = new ProtobufExtensionTermAssocVisitor();
TermAssociationVisitor test = new TermAssociationVisitor();
assertEquals(Set.of(
new GlossaryTermAssociation().setUrn(new GlossaryTermUrn("a")),
new GlossaryTermAssociation().setUrn(new GlossaryTermUrn("b")),
@ -33,7 +33,7 @@ public class ProtobufExtensionTermAssocVisitorTest {
@Test
public void extendedFieldTest() throws IOException {
ProtobufExtensionTermAssocVisitor test = new ProtobufExtensionTermAssocVisitor();
TermAssociationVisitor test = new TermAssociationVisitor();
assertEquals(
Set.of(),
getTestProtobufGraph("extended_protobuf", "messageB").

View File

@ -206,6 +206,7 @@ public class ProtobufExtensionFieldVisitorTest {
.setGlobalTags(new GlobalTags().setTags(new TagAssociationArray(
new TagAssociation().setTag(new TagUrn("MetaEnumExample.EVENT")),
new TagAssociation().setTag(new TagUrn("d")),
new TagAssociation().setTag(new TagUrn("deprecated")),
new TagAssociation().setTag(new TagUrn("e")),
new TagAssociation().setTag(new TagUrn("f")),
new TagAssociation().setTag(new TagUrn("product_type.my type")),

View File

@ -1,7 +1,7 @@
package datahub.protobuf.visitors.tag;
import com.linkedin.tag.TagProperties;
import datahub.protobuf.visitors.tags.ProtobufExtensionTagVisitor;
import datahub.protobuf.visitors.tags.TagVisitor;
import datahub.event.MetadataChangeProposalWrapper;
import org.junit.Test;
@ -15,11 +15,11 @@ import static datahub.protobuf.TestFixtures.getVisitContextBuilder;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class ProtobufExtensionTagVisitorTest {
public class TagVisitorTest {
@Test
public void extendedMessageTest() throws IOException {
ProtobufExtensionTagVisitor test = new ProtobufExtensionTagVisitor();
TagVisitor test = new TagVisitor();
assertEquals(Set.of(
new TagProperties()
.setName("bool_feature")
@ -44,7 +44,10 @@ public class ProtobufExtensionTagVisitorTest {
.setDescription("meta.msg.repeat_string"),
new TagProperties()
.setName("repeat_string.b")
.setDescription("meta.msg.repeat_string")
.setDescription("meta.msg.repeat_string"),
new TagProperties()
.setName("deprecated")
.setColorHex("#FF0000")
), getTestProtobufGraph("extended_protobuf", "messageA")
.accept(getVisitContextBuilder("extended_protobuf.Person"), List.of(test))
.map(MetadataChangeProposalWrapper::getAspect)
@ -71,12 +74,15 @@ public class ProtobufExtensionTagVisitorTest {
.setDescription("meta.fld.tag_list"),
new TagProperties()
.setName("f")
.setDescription("meta.fld.tag_list")
.setDescription("meta.fld.tag_list"),
new TagProperties()
.setName("deprecated")
.setColorHex("#FF0000")
);
assertEquals(expectedTagProperties,
getTestProtobufGraph("extended_protobuf", "messageB")
.accept(getVisitContextBuilder("extended_protobuf.Person"), List.of(new ProtobufExtensionTagVisitor()))
.accept(getVisitContextBuilder("extended_protobuf.Person"), List.of(new TagVisitor()))
.map(MetadataChangeProposalWrapper::getAspect)
.collect(Collectors.toSet()));
}

View File

@ -14,6 +14,11 @@ message Department {
The comment added after thought
*/
message Person {
option deprecated = true;
option (meta.msg.deprecation_note) = "Deprecated for this other message.";
option (meta.msg.deprecation_note) = "Drop in replacement.";
option (meta.msg.deprecation_time) = 1649689387;
option(meta.msg.data_steward) = "corpUser:datahub";
string name = 1 [(meta.fld.classification) = "Classification.HighlyConfidential"]; // person name
@ -29,7 +34,8 @@ message Person {
Department dept = 4; // department name of the person
string test_coverage = 5
[(meta.fld.product_type_bool) = true,
[deprecated = true,
(meta.fld.product_type_bool) = true,
(meta.fld.product_type) = "my type",
(meta.fld.product_type_enum) = EVENT,
(meta.fld.tag_list) = "d, e, f"];

View File

@ -22,6 +22,7 @@ enum DataHubMetadataType {
TERM = 3; // Datahub Term
OWNER = 4; // Datahub Owner
DOMAIN = 5; // Datahub Domain
DEPRECATION = 6; // Datahub Deprecation
}
/*
@ -91,5 +92,8 @@ message msg {
repeated string repeat_string = 60010 [(fld.type) = TERM, (fld.type) = TAG, (fld.type) = PROPERTY];
repeated MetaEnumExample repeat_enum = 60012 [(fld.type) = TERM, (fld.type) = TAG, (fld.type) = PROPERTY];
repeated string deprecation_note = 60020 [(fld.type) = DEPRECATION, (fld.type) = PROPERTY];
uint64 deprecation_time = 60021 [(fld.type) = DEPRECATION, (fld.type) = PROPERTY];
}
}