Mirror of https://github.com/datahub-project/datahub.git, synced 2025-12-16 12:38:13 +00:00

refactor(build): Remove unnecessary ext modules. (#3074)

This commit is contained in:
parent 2264547a5b
commit f3fc0970f3
EntityKafkaMetadataEventProducer.java
@@ -2,14 +2,11 @@ package com.linkedin.metadata.dao.producer;
 import com.google.common.annotations.VisibleForTesting;
 import com.linkedin.common.urn.Urn;
-import com.linkedin.data.template.RecordTemplate;
 import com.linkedin.metadata.EventUtils;
 import com.linkedin.metadata.dao.exception.ModelConversionException;
-import com.linkedin.metadata.dao.utils.ModelUtils;
 import com.linkedin.metadata.event.EntityEventProducer;
 import com.linkedin.metadata.models.AspectSpec;
 import com.linkedin.metadata.snapshot.Snapshot;
-import com.linkedin.mxe.Configs;
 import com.linkedin.mxe.MetadataAuditEvent;
 import com.linkedin.mxe.MetadataAuditOperation;
 import com.linkedin.mxe.MetadataChangeLog;
@@ -18,7 +15,6 @@ import com.linkedin.mxe.TopicConvention;
 import com.linkedin.mxe.TopicConventionImpl;
 import com.linkedin.mxe.Topics;
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
 import java.util.Arrays;
 import java.util.Optional;
 import javax.annotation.Nonnull;
@@ -26,7 +22,6 @@ import javax.annotation.Nullable;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.avro.specific.SpecificRecord;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -153,52 +148,6 @@ public class EntityKafkaMetadataEventProducer implements EntityEventProducer {
     }
   }

-  @Override
-  public void produceAspectSpecificMetadataAuditEvent(@Nonnull final Urn urn, @Nullable final RecordTemplate oldValue,
-      @Nonnull final RecordTemplate newValue, @Nullable final SystemMetadata oldSystemMetadata,
-      @Nullable final SystemMetadata newSystemMetadata, @Nonnull final MetadataAuditOperation operation) {
-    // TODO switch to convention once versions are annotated in the schema
-    final String topicKey = ModelUtils.getAspectSpecificMAETopicName(urn, newValue);
-    if (!isValidAspectSpecificTopic(topicKey)) {
-      log.warn("The event topic for entity {} and aspect {}, expected to be {}, has not been registered.",
-          urn.getClass().getCanonicalName(), newValue.getClass().getCanonicalName(), topicKey);
-      return;
-    }
-
-    String topic;
-    Class<? extends SpecificRecord> maeAvroClass;
-    RecordTemplate metadataAuditEvent;
-    try {
-      topic = (String) Topics.class.getField(topicKey).get(null);
-      maeAvroClass = Configs.TOPIC_SCHEMA_CLASS_MAP.get(topic);
-      metadataAuditEvent = (RecordTemplate) EventUtils.getPegasusClass(maeAvroClass).newInstance();
-
-      metadataAuditEvent.getClass().getMethod("setUrn", urn.getClass()).invoke(metadataAuditEvent, urn);
-      metadataAuditEvent.getClass().getMethod("setNewValue", newValue.getClass()).invoke(metadataAuditEvent, newValue);
-      if (oldValue != null) {
-        metadataAuditEvent.getClass()
-            .getMethod("setOldValue", oldValue.getClass())
-            .invoke(metadataAuditEvent, oldValue);
-      }
-    } catch (NoSuchFieldException | IllegalAccessException | ClassNotFoundException | NoSuchMethodException
-        | InstantiationException | InvocationTargetException e) {
-      throw new IllegalArgumentException("Failed to compose the Pegasus aspect specific MAE", e);
-    }
-
-    GenericRecord record;
-    try {
-      record = EventUtils.pegasusToAvroAspectSpecificMXE(maeAvroClass, metadataAuditEvent);
-    } catch (NoSuchFieldException | IOException | IllegalAccessException e) {
-      throw new ModelConversionException("Failed to convert Pegasus aspect specific MAE to Avro", e);
-    }
-
-    if (_callback.isPresent()) {
-      _producer.send(new ProducerRecord(topic, urn.toString(), record), _callback.get());
-    } else {
-      _producer.send(new ProducerRecord(topic, urn.toString(), record));
-    }
-  }
-
   @VisibleForTesting
   static boolean isValidAspectSpecificTopic(@Nonnull String topic) {
     return Arrays.stream(Topics.class.getFields()).anyMatch(field -> field.getName().equals(topic));
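For context, the method deleted above composed aspect-specific MAEs entirely through reflection: a static topic field resolved by name, the matching Pegasus class instantiated reflectively, and setters invoked by name. A minimal, self-contained sketch of that pattern; the Event class, field name, and values here are hypothetical stand-ins, not DataHub APIs:

public class ReflectionSketch {
  // Stands in for a Topics-style constants holder looked up via Topics.class.getField(topicKey).
  public static final String MY_TOPIC = "MetadataAuditEvent_MyAspect";

  // Stands in for a generated Pegasus event class discovered at runtime.
  public static class Event {
    private String urn;
    public void setUrn(String urn) { this.urn = urn; }
    public String getUrn() { return urn; }
  }

  public static void main(String[] args) throws Exception {
    // Resolve a static field by name, as the deleted code did with Topics.
    String topic = (String) ReflectionSketch.class.getField("MY_TOPIC").get(null);
    // Instantiate reflectively and drive a setter by name, as with setUrn/setNewValue/setOldValue.
    Event event = Event.class.getDeclaredConstructor().newInstance();
    event.getClass().getMethod("setUrn", String.class).invoke(event, "urn:li:example:1");
    System.out.println(topic + " -> " + event.getUrn());
  }
}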
KafkaMetadataEventProducer.java
@@ -9,14 +9,12 @@ import com.linkedin.metadata.dao.exception.ModelConversionException;
 import com.linkedin.metadata.dao.utils.ModelUtils;
 import com.linkedin.metadata.dao.utils.RecordUtils;
 import com.linkedin.metadata.snapshot.Snapshot;
-import com.linkedin.mxe.Configs;
 import com.linkedin.mxe.MetadataAuditEvent;
 import com.linkedin.mxe.MetadataChangeEvent;
 import com.linkedin.mxe.TopicConvention;
 import com.linkedin.mxe.TopicConventionImpl;
 import com.linkedin.mxe.Topics;
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -26,7 +24,6 @@ import javax.annotation.Nullable;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.avro.specific.SpecificRecord;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -130,46 +127,9 @@ public class KafkaMetadataEventProducer<SNAPSHOT extends RecordTemplate, ASPECT_
   @Override
   public <ASPECT extends RecordTemplate> void produceAspectSpecificMetadataAuditEvent(@Nonnull URN urn,
       @Nullable ASPECT oldValue, @Nonnull ASPECT newValue) {
-    // TODO switch to convention once versions are annotated in the schema
-    final String topicKey = ModelUtils.getAspectSpecificMAETopicName(urn, newValue);
-    if (!isValidAspectSpecificTopic(topicKey)) {
-      log.warn("The event topic for entity {} and aspect {}, expected to be {}, has not been registered.",
-          urn.getClass().getCanonicalName(), newValue.getClass().getCanonicalName(), topicKey);
-      return;
-    }
-
-    String topic;
-    Class<? extends SpecificRecord> maeAvroClass;
-    RecordTemplate metadataAuditEvent;
-    try {
-      topic = (String) Topics.class.getField(topicKey).get(null);
-      maeAvroClass = Configs.TOPIC_SCHEMA_CLASS_MAP.get(topic);
-      metadataAuditEvent = (RecordTemplate) EventUtils.getPegasusClass(maeAvroClass).newInstance();
-
-      metadataAuditEvent.getClass().getMethod("setUrn", urn.getClass()).invoke(metadataAuditEvent, urn);
-      metadataAuditEvent.getClass().getMethod("setNewValue", newValue.getClass()).invoke(metadataAuditEvent, newValue);
-      if (oldValue != null) {
-        metadataAuditEvent.getClass()
-            .getMethod("setOldValue", oldValue.getClass())
-            .invoke(metadataAuditEvent, oldValue);
-      }
-    } catch (NoSuchFieldException | IllegalAccessException | ClassNotFoundException | NoSuchMethodException
-        | InstantiationException | InvocationTargetException e) {
-      throw new IllegalArgumentException("Failed to compose the Pegasus aspect specific MAE", e);
-    }
-
-    GenericRecord record;
-    try {
-      record = EventUtils.pegasusToAvroAspectSpecificMXE(maeAvroClass, metadataAuditEvent);
-    } catch (NoSuchFieldException | IOException | IllegalAccessException e) {
-      throw new ModelConversionException("Failed to convert Pegasus aspect specific MAE to Avro", e);
-    }
-
-    if (_callback.isPresent()) {
-      _producer.send(new ProducerRecord(topic, urn.toString(), record), _callback.get());
-    } else {
-      _producer.send(new ProducerRecord(topic, urn.toString(), record));
-    }
+    // Aspect Specific MAE not supported.
+    // TODO: Remove references to this class.
+    throw new UnsupportedOperationException();
   }

   @Nonnull
Configs.java
@@ -9,7 +9,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.avro.Schema;
-import org.apache.avro.specific.SpecificRecord;


 public class Configs {
@@ -36,41 +35,6 @@ public class Configs {
         }
       });

-  public static final Map<String, Class<? extends SpecificRecord>> TOPIC_SCHEMA_CLASS_MAP =
-      Collections.unmodifiableMap(new HashMap<String, Class<? extends SpecificRecord>>() {
-        {
-          // Aspect-specific MCE topic to schema.
-          // CorpGroupUrn
-          put(Topics.METADATA_AUDIT_EVENT_CORPGROUP_CORPGROUPINFO,
-              com.linkedin.pegasus2avro.mxe.corpGroup.corpGroupInfo.MetadataAuditEvent.class);
-          // CorpUserUrn
-          put(Topics.METADATA_AUDIT_EVENT_CORPUSER_CORPUSERINFO,
-              com.linkedin.pegasus2avro.mxe.corpuser.corpUserInfo.MetadataAuditEvent.class);
-          put(Topics.METADATA_AUDIT_EVENT_CORPUSER_CORPUSEREDITABLEINFO,
-              com.linkedin.pegasus2avro.mxe.corpuser.corpUserEditableInfo.MetadataAuditEvent.class);
-
-          // Aspect-specific MCE topic to schema.
-          // CorpGroupUrn
-          put(Topics.METADATA_CHANGE_EVENT_CORPGROUP_CORPGROUPINFO,
-              com.linkedin.pegasus2avro.mxe.corpGroup.corpGroupInfo.MetadataChangeEvent.class);
-          // CorpUserUrn
-          put(Topics.METADATA_CHANGE_EVENT_CORPUSER_CORPUSERINFO,
-              com.linkedin.pegasus2avro.mxe.corpuser.corpUserInfo.MetadataChangeEvent.class);
-          put(Topics.METADATA_CHANGE_EVENT_CORPUSER_CORPUSEREDITABLEINFO,
-              com.linkedin.pegasus2avro.mxe.corpuser.corpUserEditableInfo.MetadataChangeEvent.class);
-
-          // Aspect-specific FMCE topic to schema.
-          // CorpGroupUrn
-          put(Topics.FAILED_METADATA_CHANGE_EVENT_CORPGROUP_CORPGROUPINFO,
-              com.linkedin.pegasus2avro.mxe.corpGroup.corpGroupInfo.FailedMetadataChangeEvent.class);
-          // CorpUserUrn
-          put(Topics.FAILED_METADATA_CHANGE_EVENT_CORPUSER_CORPUSERINFO,
-              com.linkedin.pegasus2avro.mxe.corpuser.corpUserInfo.FailedMetadataChangeEvent.class);
-          put(Topics.FAILED_METADATA_CHANGE_EVENT_CORPUSER_CORPUSEREDITABLEINFO,
-              com.linkedin.pegasus2avro.mxe.corpuser.corpUserEditableInfo.FailedMetadataChangeEvent.class);
-        }
-      });
-
   private Configs() {
     // Util class
   }
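The TOPIC_SCHEMA_CLASS_MAP removed above used the double-brace initialization idiom: an anonymous HashMap subclass whose instance-initializer block populates the map, wrapped in Collections.unmodifiableMap. A runnable sketch of the idiom with a placeholder entry (not the real topic/schema pairs):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class DoubleBraceSketch {
  // Anonymous HashMap subclass; the inner braces are an instance initializer block.
  static final Map<String, Class<?>> TOPIC_SCHEMA_CLASS_MAP =
      Collections.unmodifiableMap(new HashMap<String, Class<?>>() {
        {
          put("MetadataAuditEvent_Example", Object.class); // placeholder schema class
        }
      });

  public static void main(String[] args) {
    System.out.println(TOPIC_SCHEMA_CLASS_MAP);
  }
}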
build.gradle
@@ -5,12 +5,7 @@ dependencies {
   dataModel project(path: ':li-utils', configuration: 'dataTemplate')
 }

-task copyGeneratedPdl(type: Copy, dependsOn: ':metadata-models-ext:build') {
-  from("../../metadata-models-ext/src/mainGeneratedPdl/pegasus/com/linkedin/mxe")
-  into file("src/main/pegasus/com/linkedin/mxe")
-}
-
-task copyMetadataModels(type: Copy, dependsOn: copyGeneratedPdl) {
+task copyMetadataModels(type: Copy) {
   from("../../metadata-models/src/main/pegasus/")
   into file("src/main/pegasus")
 }
build.gradle
@@ -2,7 +2,7 @@ apply plugin: 'java'

 dependencies {
   compile project(':metadata-events:mxe-avro-1.7')
-  compile project(':metadata-models-ext')
+  compile project(':metadata-models')
   compile spec.product.pegasus.dataAvro1_6

   testCompile externalDependency.gmaDaoApi
EntityService.java
@@ -217,12 +217,6 @@ public abstract class EntityService {
     }

     _producer.produceMetadataAuditEvent(urn, oldSnapshot, newSnapshot, oldSystemMetadata, newSystemMetadata, operation);
-
-    // 4.1 Produce aspect specific MAE after a successful update
-    if (_emitAspectSpecificAuditEvent) {
-      _producer.produceAspectSpecificMetadataAuditEvent(urn, oldAspectValue, newAspectValue, oldSystemMetadata,
-          newSystemMetadata, operation);
-    }
   }

   /**
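The block removed above gated aspect-specific MAE emission behind a boolean flag, after the regular MAE was produced unconditionally. A hedged sketch of that gating pattern, with a hypothetical sink interface standing in for the real producer:

public class GatedEmitSketch {
  interface EventSink { void emit(String payload); } // hypothetical stand-in for the producer

  private final boolean emitAspectSpecificAuditEvent = false; // the flag this commit removes
  private final EventSink sink = payload -> System.out.println("MAE: " + payload);

  void postUpdate(String urn) {
    sink.emit(urn); // the regular MAE is always produced
    if (emitAspectSpecificAuditEvent) {
      sink.emit("aspect-specific " + urn); // this branch is what the commit deletes
    }
  }

  public static void main(String[] args) {
    new GatedEmitSketch().postUpdate("urn:li:example:1");
  }
}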
EntityEventProducer.java
@@ -47,21 +47,4 @@ public interface EntityEventProducer {
       @Nonnull AspectSpec aspectSpec,
       @Nonnull final MetadataChangeLog metadataChangeLog
   );
-
-  /**
-   * Produces an aspect-specific {@link com.linkedin.mxe.MetadataAuditEvent} from a
-   * new & previous Entity Aspect.
-   *
-   * @param urn the urn associated with the entity changed
-   * @param oldValue a {@link RecordTemplate} corresponding to the old aspect.
-   * @param newValue a {@link RecordTemplate} corresponding to the new aspect.
-   */
-  void produceAspectSpecificMetadataAuditEvent(
-      @Nonnull final Urn urn,
-      @Nullable final RecordTemplate oldValue,
-      @Nonnull final RecordTemplate newValue,
-      SystemMetadata oldSystemMetadata,
-      SystemMetadata newSystemMetadata,
-      MetadataAuditOperation operation
-  );
 }
metadata-models-ext/build.gradle (deleted)
@@ -1,34 +0,0 @@
-apply plugin: 'pegasus'
-
-configurations {
-  schemaGen
-}
-
-dependencies {
-  dataModel project(path: ':metadata-models', configuration: 'dataTemplate')
-  dataModel project(path: ':li-utils', configuration: 'dataTemplate')
-  schemaGen project(':metadata-models-generator')
-}
-
-task generateMXESchemas(type: JavaExec) {
-  main = 'com.linkedin.metadata.generator.SchemaGenerator'
-  jvmArgs '-Dgenerator.resolver.path=' + '../metadata-models/src/main/pegasus/:' + '../li-utils/src/main/pegasus/:' + configurations.dataModel.asPath
-  classpath = project.configurations.schemaGen
-  args '../metadata-models/src/main/pegasus', 'src/mainGeneratedPdl/pegasus/com/linkedin/mxe'
-}
-
-task generateMXEDataTemplate(type: com.linkedin.pegasus.gradle.tasks.GenerateDataTemplateTask,
-    dependsOn: generateMXESchemas) {
-  inputDir = file('src/mainGeneratedPdl/pegasus/com/linkedin/mxe')
-  destinationDir = file('src/mainGeneratedDataTemplate/java')
-  codegenClasspath = configurations.pegasusPlugin
-  resolverPath = files('src/mainGeneratedPdl/pegasus/', '../metadata-models/src/main/pegasus', '../li-utils/src/main/pegasus') +
-      configurations.dataModel
-}
-
-compileMainGeneratedDataTemplateJava.dependsOn generateMXEDataTemplate
-
-clean {
-  project.delete("src/mainGeneratedDataTemplate")
-  project.delete("src/mainGeneratedPdl")
-}
metadata-models-generator/.gitignore (vendored, deleted)
@@ -1,2 +0,0 @@
-src/main/pegasus
-src/test/pegasus
metadata-models-generator/build.gradle (deleted)
@@ -1,16 +0,0 @@
-apply plugin: 'java'
-
-dependencies {
-  compile spec.product.pegasus.data
-  compile spec.product.pegasus.generator
-  compile externalDependency.commonsIo
-  compile externalDependency.findbugsAnnotations
-  compile externalDependency.guava
-  compile externalDependency.rythmEngine
-
-  compileOnly externalDependency.lombok
-  annotationProcessor externalDependency.lombok
-
-  testCompile spec.product.pegasus.data
-  testCompile externalDependency.testng
-}
EventSchemaComposer.java (deleted)
@@ -1,76 +0,0 @@
-package com.linkedin.metadata.generator;
-
-import com.linkedin.metadata.rythm.RythmGenerator;
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import lombok.extern.slf4j.Slf4j;
-import org.rythmengine.Rythm;
-
-import static com.linkedin.metadata.generator.SchemaGeneratorConstants.*;
-import static com.linkedin.metadata.generator.SchemaGeneratorUtil.*;
-
-
-/**
- * Render the property annotations to the MXE pdl schema.
- */
-@Slf4j
-public class EventSchemaComposer extends RythmGenerator {
-
-  public void render(@Nonnull List<EventSpec> eventSpecs, @Nonnull String mainOutput) {
-    eventSpecs.forEach(eventSpec -> {
-      switch (eventSpec.getSpecType()) {
-        case METADATA_CHANGE_EVENT:
-          processRecord(eventSpec, mainOutput);
-          break;
-        // TODO: reserved for other MXEs
-        default:
-          throw new IllegalArgumentException(
-              String.format("Unrecognized event type %s to be rendered.", eventSpec.getSpecType()));
-      }
-    });
-  }
-
-  private void processRecord(@Nonnull EventSpec eventSpec, @Nonnull String mainOutput) {
-    eventSpec.getUrnSet().forEach(urn -> {
-      try {
-        generateResultFile(urn, eventSpec, mainOutput);
-      } catch (IOException ex) {
-        log.error(String.format("Generate result file failed due to %s.", ex.getCause()));
-      }
-    });
-  }
-
-  private void generateResultFile(@Nonnull String entityUrn, @Nonnull EventSpec eventSpec,
-      @Nonnull String mainOutput) throws IOException {
-    final String entityName = deCapitalize(getEntityName(entityUrn));
-    final String aspectName = deCapitalize(eventSpec.getValueType());
-    final File directory =
-        createOutputFolder(mainOutput + File.separator + entityName + File.separator + aspectName);
-    final String namespace = String.format(".%s.%s", entityName, aspectName);
-
-    // generate MCE
-    writeToFile(new File(directory, METADATA_CHANGE_EVENT + PDL_SUFFIX),
-        renderToString(eventSpec, entityUrn, namespace, EVENT_TEMPLATES.get(METADATA_CHANGE_EVENT)));
-
-    // generate FMCE
-    writeToFile(new File(directory, FAILED_METADATA_CHANGE_EVENT + PDL_SUFFIX),
-        renderToString(eventSpec, entityUrn, namespace, EVENT_TEMPLATES.get(FAILED_METADATA_CHANGE_EVENT)));
-
-    // generate MAE
-    writeToFile(new File(directory, METADATA_AUDIT_EVENT + PDL_SUFFIX),
-        renderToString(eventSpec, entityUrn, namespace, EVENT_TEMPLATES.get(METADATA_AUDIT_EVENT)));
-  }
-
-  @Nonnull
-  private String renderToString(@Nullable EventSpec eventSpec, @Nonnull String entityUrn, @Nonnull String namespace,
-      @Nonnull String template) throws IOException {
-    final String result = Rythm.renderIfTemplateExists(template, entityUrn, namespace, eventSpec);
-    if (result.isEmpty()) {
-      throw new IOException(String.format("Template does not exist: %s.", template));
-    }
-    return result;
-  }
-}
EventSpec.java (deleted)
@@ -1,43 +0,0 @@
-package com.linkedin.metadata.generator;
-
-import java.util.HashSet;
-import java.util.Set;
-import javax.annotation.Nonnull;
-import lombok.Data;
-
-
-/**
- * Getter & setter class for schema event metadata.
- */
-@Data
-public class EventSpec {
-  // delta model for partial update, such as: com.linkedin.datasetGroup.MembershipDelta.
-  protected String delta;
-
-  // fullValueType of the model, such as: com.linkedin.identity.CorpUserInfo.
-  protected String fullValueType;
-
-  // namespace of the model, such as: com.linkedin.identity.
-  protected String namespace;
-
-  // specType of the model, such as: MetadataChangeEvent.
-  protected String specType;
-
-  // entities leverage the model, such as: com.linkedin.common.CorpuserUrn.
-  protected Set<String> urnSet = new HashSet<>();
-
-  // valueType of the model, such as: CorpUserInfo.
-  protected String valueType;
-
-  public EventSpec() {
-  }
-
-  public boolean hasDelta() {
-    return delta != null && !delta.isEmpty();
-  }
-
-  public void setValueType(@Nonnull String schemaFullName) {
-    fullValueType = schemaFullName;
-    valueType = SchemaGeneratorUtil.stripNamespace(schemaFullName);
-  }
-}
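EventSpec above relies on Lombok's @Data, which generates the getters and setters the rest of the generator calls (getSpecType, setDelta, and so on), plus equals/hashCode/toString. A minimal sketch of that mechanism (requires Lombok on the classpath; class and field names here are illustrative):

import lombok.Data;

public class LombokSketch {
  @Data
  static class SpecSketch {
    private String specType; // getSpecType()/setSpecType() are generated at compile time
  }

  public static void main(String[] args) {
    SpecSketch spec = new SpecSketch();
    spec.setSpecType("MetadataChangeEvent");
    System.out.println(spec); // toString() is generated too
  }
}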
SchemaAnnotationRetriever.java (deleted)
@@ -1,63 +0,0 @@
-package com.linkedin.metadata.generator;
-
-import com.linkedin.data.schema.DataSchema;
-import com.linkedin.data.schema.RecordDataSchema;
-import com.linkedin.pegasus.generator.DataSchemaParser;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nonnull;
-import lombok.extern.slf4j.Slf4j;
-
-import static com.linkedin.metadata.generator.SchemaGeneratorConstants.*;
-
-
-/**
- * Parse the property annotations from the pdl schema.
- */
-@Slf4j
-public class SchemaAnnotationRetriever {
-
-  private final DataSchemaParser _dataSchemaParser;
-
-  public SchemaAnnotationRetriever(@Nonnull String resolverPath) {
-    _dataSchemaParser = new DataSchemaParser(resolverPath);
-  }
-
-  public List<EventSpec> generate(@Nonnull String[] sources) throws IOException {
-
-    final DataSchemaParser.ParseResult parseResult = _dataSchemaParser.parseSources(sources);
-    final List<EventSpec> eventSpecs = new ArrayList<>();
-    for (DataSchema dataSchema : parseResult.getSchemaAndLocations().keySet()) {
-      if (dataSchema.getType() == DataSchema.Type.RECORD) {
-        generate((RecordDataSchema) dataSchema, eventSpecs);
-      }
-    }
-    return eventSpecs;
-  }
-
-  private void generate(@Nonnull RecordDataSchema schema, @Nonnull List<EventSpec> specs) {
-    final Map<String, Object> props = schema.getProperties();
-    EventSpec eventSpec = null;
-    Map<String, Object> annotationInfo = null;
-    if (props.containsKey(ASPECT)) {
-      eventSpec = new EventSpec();
-      eventSpec.setSpecType(METADATA_CHANGE_EVENT);
-      annotationInfo = (Map<String, Object>) props.get(ASPECT);
-    } else {
-      log.debug(String.format("No recognized property annotations are presented in %s.", schema.getFullName()));
-      return;
-    }
-    specs.add(eventSpec);
-    if (annotationInfo != null && annotationInfo.containsKey(ENTITY_URNS)) {
-      eventSpec.setUrnSet(new HashSet<>((List) annotationInfo.get(ENTITY_URNS)));
-    }
-    if (annotationInfo != null && annotationInfo.containsKey(DELTA)) {
-      eventSpec.setDelta((String) annotationInfo.get(DELTA));
-    }
-    eventSpec.setNamespace(schema.getNamespace());
-    eventSpec.setValueType(schema.getFullName());
-  }
-}
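The retriever above reads PDL property annotations as plain maps: schema.getProperties() exposes the @Aspect annotation body as a nested Map with "Delta" and "EntityUrns" entries. A plain-Java sketch of that shape (Java 9+ for Map.of/List.of), matching the @Aspect annotation on the AnnotatedAspectBar test model further below:

import java.util.List;
import java.util.Map;

public class AnnotationShapeSketch {
  public static void main(String[] args) {
    // The shape Pegasus hands back for: @Aspect = { "Delta": ..., "EntityUrns": [...] }
    Map<String, Object> props = Map.of("Aspect", Map.of(
        "Delta", "com.linkedin.testing.AnnotatedAspectBarDelta",
        "EntityUrns", List.of("com.linkedin.testing.FooUrn", "com.linkedin.testing.BarUrn")));
    @SuppressWarnings("unchecked")
    Map<String, Object> aspect = (Map<String, Object>) props.get("Aspect");
    System.out.println(aspect.get("EntityUrns")); // the urns MXEs get generated for
  }
}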
SchemaGenerator.java (deleted)
@@ -1,40 +0,0 @@
-package com.linkedin.metadata.generator;
-
-import com.linkedin.data.schema.generator.AbstractGenerator;
-import java.io.IOException;
-import com.linkedin.pegasus.generator.DataSchemaParser;
-import javax.annotation.Nonnull;
-import org.rythmengine.Rythm;
-
-
-/**
- * Generates MXE schemas.
- */
-public class SchemaGenerator {
-
-  private final DataSchemaParser _dataSchemaParser;
-
-  public SchemaGenerator(@Nonnull String resolverPath) {
-    _dataSchemaParser = new DataSchemaParser(resolverPath);
-  }
-
-  public static void main(String[] args) throws IOException {
-    if (args.length != 2) {
-      System.out.println("Usage: cmd <sourcePathInput> <generatedFileOutput>");
-      System.exit(-1);
-    }
-
-    final String source = args[0];
-    final SchemaAnnotationRetriever schemaAnnotationRetriever =
-        new SchemaAnnotationRetriever(System.getProperty(AbstractGenerator.GENERATOR_RESOLVER_PATH));
-    final String[] sources = {source};
-    schemaAnnotationRetriever.generate(sources);
-
-    final String mainOutput = args[1];
-    final EventSchemaComposer eventSchemaComposer = new EventSchemaComposer();
-    eventSchemaComposer.setupRythmEngine();
-    eventSchemaComposer.render(schemaAnnotationRetriever.generate(sources), mainOutput);
-
-    Rythm.shutdown();
-  }
-}
SchemaGeneratorConstants.java (deleted)
@@ -1,33 +0,0 @@
-package com.linkedin.metadata.generator;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-
-/**
- * Constants used in PDL/Rythm to describe event schemas.
- */
-public class SchemaGeneratorConstants {
-  private SchemaGeneratorConstants() {
-  }
-
-  // used in SchemaAnnotationRetriever
-  static final String ASPECT = "Aspect";
-  static final String DELTA = "Delta";
-  static final String ENTITY_URNS = "EntityUrns";
-
-  // used in EventSchemaComposer
-  static final String FAILED_METADATA_CHANGE_EVENT = "FailedMetadataChangeEvent";
-  static final String FAILED_METADATA_CHANGE_EVENT_PREFIX = "Failed";
-  static final String METADATA_AUDIT_EVENT = "MetadataAuditEvent";
-  static final String METADATA_CHANGE_EVENT = "MetadataChangeEvent";
-  static final String PDL_SUFFIX = ".pdl";
-  static final Map<String, String> EVENT_TEMPLATES = Collections.unmodifiableMap(new HashMap<String, String>() {
-    {
-      put(FAILED_METADATA_CHANGE_EVENT, "FailedMetadataChangeEvent.rythm");
-      put(METADATA_AUDIT_EVENT, "MetadataAuditEvent.rythm");
-      put(METADATA_CHANGE_EVENT, "MetadataChangeEvent.rythm");
-    }
-  });
-}
SchemaGeneratorUtil.java (deleted)
@@ -1,84 +0,0 @@
-package com.linkedin.metadata.generator;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import java.io.File;
-import java.io.IOException;
-import javax.annotation.Nonnull;
-
-
-/**
- * Utility used in generating resource classes.
- */
-public class SchemaGeneratorUtil {
-
-  private SchemaGeneratorUtil() {
-  }
-
-  /**
-   * De-Capitalize the input name.
-   *
-   * @param name the string whose first character will be converted to lowercase.
-   * @return the converted name
-   */
-  @Nonnull
-  public static String deCapitalize(@Nonnull String name) {
-    if (name.isEmpty()) {
-      throw new IllegalArgumentException();
-    } else {
-      return Character.toLowerCase(name.charAt(0)) + name.substring(1);
-    }
-  }
-
-  /**
-   * Strip the urn namespace to the entity name.
-   *
-   * @param urn the namespace of the entityUrn.
-   * @return the entity name.
-   */
-  @Nonnull
-  public static String getEntityName(@Nonnull String urn) {
-    return stripNamespace(urn).substring(0,
-        stripNamespace(urn).length() - 3); // Truncate the urn to entity name such as: FooBarUrn -> FooBar
-  }
-
-  /**
-   * Strip the namespace to the valueType.
-   *
-   * @param namespace the namespace of the entity.
-   * @return the valueType of the namespace.
-   */
-  @Nonnull
-  public static String stripNamespace(@Nonnull String namespace) {
-    final int index = namespace.lastIndexOf('.');
-    if (index < namespace.length() - 1) {
-      // index == -1 (dot not found) || 0 <= index < length -1 (namespace is not ended with dot)
-      return namespace.substring(index + 1);
-    }
-    throw new IllegalArgumentException();
-  }
-
-  /**
-   * Write the content to the file.
-   *
-   * @param file the target file.
-   * @param content the content to be written in the file.
-   */
-  public static void writeToFile(@Nonnull File file, @Nonnull String content) throws IOException {
-    Files.asCharSink(file, Charsets.UTF_8).write(content);
-  }
-
-  /**
-   * Create event schema output folder.
-   *
-   * @param eventSchemaOutput the output path for the rendered schemas.
-   * @return the directory of the output path.
-   */
-  public static File createOutputFolder(@Nonnull String eventSchemaOutput) throws IOException {
-    final File directory = new File(eventSchemaOutput);
-    if (!directory.mkdirs() && !directory.exists()) {
-      throw new IOException(String.format("%s cannot be created.", directory));
-    }
-    return directory;
-  }
-}
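Expected behavior of the helpers above, as pinned by the removed unit tests; stripNamespace is reproduced verbatim from the deleted code so the sketch runs standalone:

public class UtilBehaviorSketch {
  // Copied from the deleted SchemaGeneratorUtil.stripNamespace above.
  static String stripNamespace(String namespace) {
    final int index = namespace.lastIndexOf('.');
    if (index < namespace.length() - 1) {
      return namespace.substring(index + 1);
    }
    throw new IllegalArgumentException();
  }

  public static void main(String[] args) {
    System.out.println(stripNamespace("com.linkedin.testing.BarUrn")); // BarUrn
    System.out.println(stripNamespace("BarUrn"));                      // BarUrn (no dot found)
    // deCapitalize("BarUrn") -> "barUrn"; getEntityName("BarUrn") -> "Bar" (drops trailing "Urn")
  }
}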
RythmGenerator.java (deleted)
@@ -1,22 +0,0 @@
-package com.linkedin.metadata.rythm;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.rythmengine.Rythm;
-import org.rythmengine.conf.RythmConfigurationKey;
-
-
-/**
- * Base class for code generator based on <a href="http://rythmengine.org/">rythm template engine</a>.
- */
-public abstract class RythmGenerator {
-
-  public void setupRythmEngine() {
-    final Map<String, Object> config = new HashMap<>();
-    StreamResourceLoader loader = new StreamResourceLoader();
-    config.put(RythmConfigurationKey.CODEGEN_COMPACT_ENABLED.getKey(), false);
-    config.put(RythmConfigurationKey.RESOURCE_LOADER_IMPLS.getKey(), loader);
-    Rythm.init(config);
-    loader.setEngine(Rythm.engine());
-  }
-}
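RythmGenerator above only configures the engine; rendering itself goes through Rythm's static API. A minimal usage sketch, assuming the rythmengine dependency is on the classpath, using an inline template instead of the generator's classpath-loaded .rythm files:

import org.rythmengine.Rythm;

public class RythmSketch {
  public static void main(String[] args) {
    // Positional arguments bind to the template's @args declaration in order.
    String out = Rythm.render("@args String name\nHello @name!", "MXE");
    System.out.println(out);
  }
}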
StreamResourceLoader.java (deleted)
@@ -1,20 +0,0 @@
-package com.linkedin.metadata.rythm;
-
-import org.rythmengine.resource.ITemplateResource;
-import org.rythmengine.resource.ResourceLoaderBase;
-
-
-/**
- * Rythm template resource loader that can load template from resource stream under root directory `rythm`.
- */
-public class StreamResourceLoader extends ResourceLoaderBase {
-  @Override
-  public String getResourceLoaderRoot() {
-    return "rythm";
-  }
-
-  @Override
-  public ITemplateResource load(String path) {
-    return new StreamTemplateResource(path, this);
-  }
-}
StreamTemplateResource.java (deleted)
@@ -1,75 +0,0 @@
-package com.linkedin.metadata.rythm;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.net.URLConnection;
-import org.apache.commons.io.IOUtils;
-import org.rythmengine.resource.TemplateResourceBase;
-
-
-/**
- * An implementation of {@link TemplateResourceBase} based on templates located
- * at resource stream located under root directory "rythm" at first.
- * Otherwise it will load template based on the relative path.
- */
-public class StreamTemplateResource extends TemplateResourceBase {
-  private final String path;
-  private transient URLConnection connection = null;
-
-  public StreamTemplateResource(String path, StreamResourceLoader loader) {
-    super(loader);
-    this.path = path;
-
-    final URL url = getClass().getClassLoader().getResource(path);
-    if (url != null) {
-      try {
-        connection = url.openConnection();
-      } catch (IOException ex) {
-        throw new RuntimeException("Get template resource failed", ex);
-      }
-    } else {
-      try {
-        final File rythm = new File(path);
-        if (rythm.exists()) {
-          connection = rythm.toURI().toURL().openConnection();
-        }
-      } catch (IOException ex) {
-        throw new RuntimeException("Get template resource failed", ex);
-      }
-    }
-  }
-
-  @Override
-  public Object getKey() {
-    return path;
-  }
-
-  @Override
-  public boolean isValid() {
-    return connection != null;
-  }
-
-  @Override
-  protected long defCheckInterval() {
-    return 1000 * 5;
-  }
-
-  @Override
-  protected long lastModified() {
-    return connection.getLastModified();
-  }
-
-  @Override
-  protected String reload() {
-    if (isValid()) {
-      try {
-        return IOUtils.toString(connection.getInputStream());
-      } catch (IOException e) {
-        return null;
-      }
-    } else {
-      return null;
-    }
-  }
-}
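The resource class above tries the classpath first and falls back to the file system. The same lookup order with standard APIs only (the path is illustrative):

import java.io.File;
import java.net.URL;

public class ResourceLookupSketch {
  public static void main(String[] args) throws Exception {
    final String path = "rythm/MetadataChangeEvent.rythm"; // loader root "rythm", as above
    final URL url = ResourceLookupSketch.class.getClassLoader().getResource(path);
    if (url != null) {
      System.out.println("classpath hit: " + url.openConnection().getLastModified());
    } else {
      final File fallback = new File(path); // relative-path fallback, as in the deleted class
      System.out.println(fallback.exists() ? "file hit: " + fallback.toURI() : "not found");
    }
  }
}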
FailedMetadataChangeEvent.rythm (deleted template)
@@ -1,29 +0,0 @@
-@import com.linkedin.metadata.generator.EventSpec;
-@import com.linkedin.metadata.generator.SchemaGeneratorUtil;
-@args String entityUrn, String nameSpace, EventSpec eventSpec
-@assign (entityName) {@SchemaGeneratorUtil.getEntityName(entityUrn)}
-namespace com.linkedin.mxe@(nameSpace)
-
-import com.linkedin.avro2pegasus.events.KafkaAuditHeader
-
-/**
- * FailedMetadataChangeEvent for the @(entityName)Urn with @(eventSpec.getValueType()) aspect.
- */
-@@FailedMetadataChangeEvent
-record FailedMetadataChangeEvent {
-
-  /**
-   * Kafka event for capturing a failure to process a MetadataChangeEvent.
-   */
-  auditHeader: optional KafkaAuditHeader
-
-  /**
-   * The event that failed to be processed.
-   */
-  metadataChangeEvent: MetadataChangeEvent
-
-  /**
-   * The error message or the stacktrace for the failure.
-   */
-  error: string
-}
MetadataAuditEvent.rythm (deleted template)
@@ -1,36 +0,0 @@
-@import com.linkedin.metadata.generator.EventSpec;
-@import com.linkedin.metadata.generator.SchemaGeneratorUtil;
-@args String entityUrn, String nameSpace, EventSpec eventSpec
-@assign (entityName) {@SchemaGeneratorUtil.getEntityName(entityUrn)}
-namespace com.linkedin.mxe@(nameSpace)
-
-import com.linkedin.avro2pegasus.events.KafkaAuditHeader
-import @entityUrn
-import @eventSpec.getFullValueType()
-
-/**
- * MetadataAuditEvent for the @(entityName)Urn with @(eventSpec.getValueType()) aspect.
- */
-@@MetadataAuditEvent
-record MetadataAuditEvent {
-
-  /**
-   * Kafka audit header for the MetadataAuditEvent.
-   */
-  auditHeader: optional KafkaAuditHeader
-
-  /**
-   * @(entityName)Urn as the key for the MetadataAuditEvent.
-   */
-  urn: @(entityName)Urn
-
-  /**
-   * Aspect of the @eventSpec.getValueType() before the update.
-   */
-  oldValue: optional @eventSpec.getValueType()
-
-  /**
-   * Aspect of the @eventSpec.getValueType() after the update.
-   */
-  newValue: @eventSpec.getValueType()
-}
MetadataChangeEvent.rythm (deleted template)
@@ -1,41 +0,0 @@
-@import com.linkedin.metadata.generator.EventSpec;
-@import com.linkedin.metadata.generator.SchemaGeneratorUtil;
-@args String entityUrn, String nameSpace, EventSpec eventSpec
-@assign (entityName) {@SchemaGeneratorUtil.getEntityName(entityUrn)}
-namespace com.linkedin.mxe@(nameSpace)
-
-import com.linkedin.avro2pegasus.events.KafkaAuditHeader
-import @entityUrn
-import @eventSpec.getFullValueType()
-@if (eventSpec.hasDelta()) {
-import @eventSpec.getDelta()
-}
-
-/**
- * MetadataChangeEvent for the @(entityName)Urn with @(eventSpec.getValueType()) aspect.
- */
-@@MetadataChangeEvent
-record MetadataChangeEvent {
-
-  /**
-   * Kafka audit header. See go/kafkaauditheader for more info.
-   */
-  auditHeader: optional KafkaAuditHeader
-
-  /**
-   * @(entityName)Urn as the key for the MetadataChangeEvent.
-   */
-  urn: @(entityName)Urn
-
-  /**
-   * Value of the proposed @eventSpec.getValueType() change.
-   */
-  proposedValue: optional @eventSpec.getValueType()
-@if (eventSpec.hasDelta()) {
-
-  /**
-   * Delta of the proposed @SchemaGeneratorUtil.stripNamespace(eventSpec.getDelta()) partial update.
-   */
-  proposedDelta: optional @SchemaGeneratorUtil.stripNamespace(eventSpec.getDelta())
-}
-}
TestEventSchemaComposer.java (deleted)
@@ -1,72 +0,0 @@
-package com.linkedin.metadata.generator;
-
-import java.io.File;
-import java.io.FileInputStream;
-import org.apache.commons.io.IOUtils;
-import org.rythmengine.Rythm;
-import org.testng.annotations.Test;
-
-import static com.linkedin.metadata.generator.SchemaGeneratorConstants.*;
-import static com.linkedin.metadata.generator.TestEventSpec.*;
-import static org.testng.Assert.*;
-
-
-public class TestEventSchemaComposer {
-
-  private static final String GENERATED_MXE_PATH = "src/testGeneratedPdl/pegasus/com/linkedin/mxe";
-  private static final String TEST_NAMESPACE = "/bar/annotatedAspectBar";
-  private static final String TEST_GENERATED_PDL = "MetadataChangeEvent.pdl";
-
-  @Test
-  public void testMCESchemaRender() throws Exception {
-    final String testMCE = GENERATED_MXE_PATH + TEST_NAMESPACE + File.separator + TEST_GENERATED_PDL;
-    final File metadataChangeEvent = new File(testMCE);
-
-    populateEvents();
-
-    assertTrue(metadataChangeEvent.exists());
-    assertEquals(IOUtils.toString(new FileInputStream(testMCE)), IOUtils.toString(this.getClass()
-        .getClassLoader()
-        .getResourceAsStream("com/linkedin/mxe" + TEST_NAMESPACE + File.separator + TEST_GENERATED_PDL)));
-  }
-
-  @Test
-  public void testFMCESchemaRender() throws Exception {
-    final String testFMCE =
-        GENERATED_MXE_PATH + TEST_NAMESPACE + File.separator + FAILED_METADATA_CHANGE_EVENT_PREFIX + TEST_GENERATED_PDL;
-    final File failedMetadataChangeEventBar = new File(testFMCE);
-
-    populateEvents();
-
-    assertTrue(failedMetadataChangeEventBar.exists());
-    assertEquals(IOUtils.toString(new FileInputStream(testFMCE)), IOUtils.toString(this.getClass()
-        .getClassLoader()
-        .getResourceAsStream("com/linkedin/mxe" + TEST_NAMESPACE + File.separator + FAILED_METADATA_CHANGE_EVENT_PREFIX
-            + TEST_GENERATED_PDL)));
-  }
-
-  @Test
-  public void testMAESchemaRender() throws Exception {
-    final String testMAE =
-        GENERATED_MXE_PATH + TEST_NAMESPACE + File.separator + METADATA_AUDIT_EVENT + PDL_SUFFIX;
-    final File metadataAuditEventBar = new File(testMAE);
-
-    populateEvents();
-
-    assertTrue(metadataAuditEventBar.exists());
-    assertEquals(IOUtils.toString(new FileInputStream(testMAE)), IOUtils.toString(this.getClass()
-        .getClassLoader()
-        .getResourceAsStream(
-            "com/linkedin/mxe" + TEST_NAMESPACE + File.separator + METADATA_AUDIT_EVENT + PDL_SUFFIX)));
-  }
-
-  private void populateEvents() throws Exception {
-    SchemaAnnotationRetriever schemaAnnotationRetriever =
-        new SchemaAnnotationRetriever(TEST_METADATA_MODELS_RESOLVED_PATH);
-    final String[] sources = {TEST_METADATA_MODELS_SOURCE_PATH};
-    EventSchemaComposer eventSchemaComposer = new EventSchemaComposer();
-    eventSchemaComposer.setupRythmEngine();
-    eventSchemaComposer.render(schemaAnnotationRetriever.generate(sources), GENERATED_MXE_PATH);
-    Rythm.shutdown();
-  }
-}
TestEventSpec.java (deleted)
@@ -1,69 +0,0 @@
-package com.linkedin.metadata.generator;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import javax.annotation.Nonnull;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.*;
-
-
-public class TestEventSpec {
-
-  static final String TEST_METADATA_MODELS_RESOLVED_PATH =
-      "src/test/resources/com/linkedin/testing" + "../../metadata-testing/metadata-test-models/src/main/pegasus/:";
-  static final String TEST_METADATA_MODELS_SOURCE_PATH = "src/test/resources/com/linkedin/testing";
-
-  @Test
-  public void testEventSpecParse() throws Exception {
-
-    SchemaAnnotationRetriever schemaAnnotationRetriever =
-        new SchemaAnnotationRetriever(TEST_METADATA_MODELS_RESOLVED_PATH);
-    final String[] sources = {TEST_METADATA_MODELS_SOURCE_PATH};
-    final List<EventSpec> eventSpecs = schemaAnnotationRetriever.generate(sources);
-
-    // Check if annotations are correctly generated.
-    final ArrayList<String> testList = new ArrayList<>();
-    mapAspectToUrn(eventSpecs).get("com.linkedin.testing.FooUrn")
-        .forEach(eventSpec -> testList.add(eventSpec.getValueType()));
-    assertTrue(testList.containsAll(new ArrayList<>(Arrays.asList("AnnotatedAspectFoo", "AnnotatedAspectBar"))));
-  }
-
-  @Test
-  public void testValidFullSchemaName() throws Exception {
-    EventSpec tesEventSpec = new EventSpec();
-    final String validFullSchemaName = "com.linkedin.testing.BarUrn";
-
-    tesEventSpec.setValueType(validFullSchemaName);
-
-    assertEquals(tesEventSpec.getValueType(), "BarUrn");
-  }
-
-  @Test(expectedExceptions = IllegalArgumentException.class)
-  public void testInValidFullSchemaName() throws Exception {
-    EventSpec tesEventSpec = new EventSpec();
-    final String validFullSchemaName = "com.linkedin.testing.BarUrn.";
-
-    tesEventSpec.setValueType(validFullSchemaName);
-  }
-
-  @Nonnull
-  private Map<String, ArrayList<EventSpec>> mapAspectToUrn(@Nonnull List<EventSpec> eventSpecs) {
-    final Map<String, ArrayList<EventSpec>> eventsMap = new HashMap<>();
-    for (EventSpec eventSpec : eventSpecs) {
-      final Set<String> urnSet = eventSpec.getUrnSet();
-      urnSet.forEach((urn) -> {
-        if (eventsMap.containsKey(urn)) {
-          eventsMap.get(urn).add(eventSpec);
-        } else {
-          eventsMap.put(urn, new ArrayList<>(Arrays.asList(eventSpec)));
-        }
-      });
-    }
-    return eventsMap;
-  }
-}
TestSchemaGeneratorUtil.java (deleted)
@@ -1,32 +0,0 @@
-package com.linkedin.metadata.generator;
-
-import org.testng.annotations.Test;
-
-import static com.linkedin.metadata.generator.SchemaGeneratorUtil.*;
-import static org.testng.Assert.*;
-
-
-public class TestSchemaGeneratorUtil {
-
-  private static final String TEST_NAME = "BarUrn";
-
-  @Test
-  public void testDeCapitalize() {
-    assertEquals(deCapitalize(TEST_NAME), "barUrn");
-  }
-
-  @Test
-  public void testGetEntityName() {
-    assertEquals(getEntityName(TEST_NAME), "Bar");
-  }
-
-  @Test
-  public void testStripNamespace() {
-    assertEquals(stripNamespace(TEST_NAME), "BarUrn");
-  }
-
-  @Test(expectedExceptions = IllegalArgumentException.class)
-  public void testStripIllegalNamespace() {
-    stripNamespace(TEST_NAME + ".");
-  }
-}
FailedMetadataChangeEvent.pdl (deleted expected-output test resource)
@@ -1,25 +0,0 @@
-namespace com.linkedin.mxe.bar.annotatedAspectBar
-
-import com.linkedin.avro2pegasus.events.KafkaAuditHeader
-
-/**
- * FailedMetadataChangeEvent for the BarUrn with AnnotatedAspectBar aspect.
- */
-@FailedMetadataChangeEvent
-record FailedMetadataChangeEvent {
-
-  /**
-   * Kafka event for capturing a failure to process a MetadataChangeEvent.
-   */
-  auditHeader: optional KafkaAuditHeader
-
-  /**
-   * The event that failed to be processed.
-   */
-  metadataChangeEvent: MetadataChangeEvent
-
-  /**
-   * The error message or the stacktrace for the failure.
-   */
-  error: string
-}
MetadataAuditEvent.pdl (deleted expected-output test resource)
@@ -1,32 +0,0 @@
-namespace com.linkedin.mxe.bar.annotatedAspectBar
-
-import com.linkedin.avro2pegasus.events.KafkaAuditHeader
-import com.linkedin.testing.BarUrn
-import com.linkedin.testing.AnnotatedAspectBar
-
-/**
- * MetadataAuditEvent for the BarUrn with AnnotatedAspectBar aspect.
- */
-@MetadataAuditEvent
-record MetadataAuditEvent {
-
-  /**
-   * Kafka audit header for the MetadataAuditEvent.
-   */
-  auditHeader: optional KafkaAuditHeader
-
-  /**
-   * BarUrn as the key for the MetadataAuditEvent.
-   */
-  urn: BarUrn
-
-  /**
-   * Aspect of the AnnotatedAspectBar before the update.
-   */
-  oldValue: optional AnnotatedAspectBar
-
-  /**
-   * Aspect of the AnnotatedAspectBar after the update.
-   */
-  newValue: AnnotatedAspectBar
-}
MetadataChangeEvent.pdl (deleted expected-output test resource)
@@ -1,33 +0,0 @@
-namespace com.linkedin.mxe.bar.annotatedAspectBar
-
-import com.linkedin.avro2pegasus.events.KafkaAuditHeader
-import com.linkedin.testing.BarUrn
-import com.linkedin.testing.AnnotatedAspectBar
-import com.linkedin.testing.AnnotatedAspectBarDelta
-
-/**
- * MetadataChangeEvent for the BarUrn with AnnotatedAspectBar aspect.
- */
-@MetadataChangeEvent
-record MetadataChangeEvent {
-
-  /**
-   * Kafka audit header. See go/kafkaauditheader for more info.
-   */
-  auditHeader: optional KafkaAuditHeader
-
-  /**
-   * BarUrn as the key for the MetadataChangeEvent.
-   */
-  urn: BarUrn
-
-  /**
-   * Value of the proposed AnnotatedAspectBar change.
-   */
-  proposedValue: optional AnnotatedAspectBar
-
-  /**
-   * Delta of the proposed AnnotatedAspectBarDelta partial update.
-   */
-  proposedDelta: optional AnnotatedAspectBarDelta
-}
AnnotatedAspectBar.pdl (deleted test model)
@@ -1,21 +0,0 @@
-namespace com.linkedin.testing
-
-/**
- * For unit tests
- */
-@Aspect = { "Delta": "com.linkedin.testing.AnnotatedAspectBarDelta",
-  "EntityUrns": ["com.linkedin.testing.FooUrn", "com.linkedin.testing.BarUrn"]}
-record AnnotatedAspectBar {
-
-  /** For unit tests */
-  stringField: string
-
-  /** For unit tests */
-  boolField: boolean
-
-  /** For unit tests */
-  longField: long
-
-  /** For unit tests */
-  arrayField: array[string]
-}
AnnotatedAspectBarDelta.pdl (deleted test model)
@@ -1,10 +0,0 @@
-namespace com.linkedin.testing
-
-/**
- * For unit tests
- */
-record AnnotatedAspectBarDelta {
-
-  /** For unit tests */
-  stringFieldToUpdate: string
-}
AnnotatedAspectFoo.pdl (deleted test model)
@@ -1,11 +0,0 @@
-namespace com.linkedin.testing
-
-/**
- * For unit tests
- */
-@Aspect.EntityUrns = ["com.linkedin.testing.FooUrn"]
-record AnnotatedAspectFoo {
-
-  /** For unit tests */
-  value: string
-}
settings.gradle
@@ -24,8 +24,6 @@ include 'metadata-jobs:mce-consumer'
 include 'metadata-jobs:mae-consumer-job'
 include 'metadata-jobs:mce-consumer-job'
 include 'metadata-models'
-include 'metadata-models-ext'
-include 'metadata-models-generator'
 include 'metadata-models-validator'
 include 'metadata-testing:metadata-models-test-utils'
 include 'metadata-testing:metadata-test-utils'