mirror of
https://github.com/datahub-project/datahub.git
synced 2026-01-08 15:56:13 +00:00
Removing unnecessary classes for mxe-registration
This commit is contained in:
parent
250ced5eeb
commit
6b137a862b
@ -28,4 +28,4 @@ pegasus.main.idlOptions.addIdlItem([
|
||||
'com.linkedin.identity.rest'
|
||||
])
|
||||
|
||||
ext.apiProject = project(':gms:api')
|
||||
ext.apiProject = project(':gms:api')
|
||||
@ -3,8 +3,6 @@ package com.linkedin.metadata.dao.utils;
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
import javax.annotation.Nonnull;
|
||||
|
||||
import com.linkedin.common.urn.Urn;
|
||||
import org.elasticsearch.index.query.BoolQueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
|
||||
@ -28,11 +26,10 @@ public class ESUtils {
|
||||
BoolQueryBuilder boolFilter = new BoolQueryBuilder();
|
||||
for (Map.Entry<String, String> entry : requestMap.entrySet()) {
|
||||
BoolQueryBuilder filters = new BoolQueryBuilder();
|
||||
// TODO: Remove checking for urn after solving META-10102
|
||||
Arrays.stream(Urn.isUrn(entry.getValue()) ? new String[]{entry.getValue()} : entry.getValue().split(","))
|
||||
Arrays.stream(entry.getValue().split(","))
|
||||
.forEach(elem -> filters.should(QueryBuilders.matchQuery(entry.getKey(), elem)));
|
||||
boolFilter.must(filters);
|
||||
}
|
||||
return boolFilter;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -45,7 +45,7 @@ public class SearchUtils {
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a requestMap to a filter
|
||||
* Converts a requestMap to a filter
|
||||
*
|
||||
* @param requestMap a map of fields and values
|
||||
* @return the search filter
|
||||
|
||||
@ -256,7 +256,7 @@ public class Neo4jUtil {
|
||||
*/
|
||||
@Nonnull
|
||||
public static RelationshipFilter createRelationshipFilter(@Nonnull Filter filter,
|
||||
@Nonnull RelationshipDirection relationshipDirection) {
|
||||
@Nonnull RelationshipDirection relationshipDirection) {
|
||||
return new RelationshipFilter().setCriteria(filter.getCriteria()).setDirection(relationshipDirection);
|
||||
}
|
||||
|
||||
@ -270,7 +270,7 @@ public class Neo4jUtil {
|
||||
*/
|
||||
@Nonnull
|
||||
public static RelationshipFilter createRelationshipFilter(@Nonnull String field, @Nonnull String value,
|
||||
@Nonnull RelationshipDirection relationshipDirection) {
|
||||
@Nonnull RelationshipDirection relationshipDirection) {
|
||||
return createRelationshipFilter(createFilter(field, value), relationshipDirection);
|
||||
}
|
||||
|
||||
@ -284,10 +284,10 @@ public class Neo4jUtil {
|
||||
@Nonnull
|
||||
public static Filter createFilter(@Nonnull String field, @Nonnull String value) {
|
||||
return new Filter()
|
||||
.setCriteria(
|
||||
new CriterionArray(Collections.singletonList(
|
||||
new Criterion().setField(field).setValue(value).setCondition(Condition.EQUAL)
|
||||
))
|
||||
);
|
||||
.setCriteria(
|
||||
new CriterionArray(Collections.singletonList(
|
||||
new Criterion().setField(field).setValue(value).setCondition(Condition.EQUAL)
|
||||
))
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -24,4 +24,4 @@ public class DatasetSnapshotRequestBuilderTest extends BaseSnapshotRequestBuilde
|
||||
|
||||
assertEquals(builder.urnClass(), DatasetUrn.class);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -87,4 +87,4 @@ public class QueryUtilTest {
|
||||
private AspectVersion makeAspectVersion(String aspectName, long version) {
|
||||
return new AspectVersion().setAspect(aspectName).setVersion(version);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,63 +1 @@
|
||||
apply plugin: 'java'
|
||||
|
||||
configurations {
|
||||
avro17
|
||||
|
||||
validationCompile
|
||||
validationCompile.extendsFrom testCompile
|
||||
|
||||
validationRuntime
|
||||
validationRuntime.extendsFrom testRuntime
|
||||
}
|
||||
|
||||
dependencies {
|
||||
compile externalDependency.kafkaSchemaRegistry
|
||||
|
||||
compileOnly project(':metadata-events:mxe-avro-1.7')
|
||||
|
||||
validationCompile externalDependency.avroMigrationHelper
|
||||
validationCompile externalDependency.testng
|
||||
|
||||
avro17 project(':metadata-events:mxe-avro-1.7')
|
||||
}
|
||||
|
||||
//////////////////////////////////////
|
||||
// Schema compatibility validation
|
||||
//////////////////////////////////////
|
||||
sourceSets {
|
||||
validation {
|
||||
java {
|
||||
compileClasspath += main.output + test.output
|
||||
runtimeClasspath += main.output + test.output
|
||||
srcDir file('src/validation/java')
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
idea {
|
||||
module {
|
||||
testSourceDirs += file("src/validation/java")
|
||||
}
|
||||
}
|
||||
|
||||
task validate17(type: Test, dependsOn: configurations.avro17) {
|
||||
testLogging.events 'standard_out'
|
||||
testClassesDirs = sourceSets.validation.output.classesDirs
|
||||
classpath = files(sourceSets.validation.runtimeClasspath, configurations.avro17)
|
||||
|
||||
// Prevent gradle from caching each validation run
|
||||
outputs.upToDateWhen { false }
|
||||
}
|
||||
|
||||
// Make sure validate17 are run as part of gradle build
|
||||
check.dependsOn += [validate17]
|
||||
|
||||
//////////////////////////////////////
|
||||
// Schema registration
|
||||
//////////////////////////////////////
|
||||
def mainClassName = 'com.linkedin.mxe.RegisterSchemas'
|
||||
|
||||
task register17(type: JavaExec, dependsOn: [configurations.compile, configurations.avro17]) {
|
||||
main = mainClassName
|
||||
classpath = files(sourceSets.main.runtimeClasspath, configurations.avro17)
|
||||
}
|
||||
|
||||
@ -1,41 +0,0 @@
|
||||
package com.linkedin.mxe;
|
||||
|
||||
import com.linkedin.pegasus2avro.mxe.FailedMetadataChangeEvent;
|
||||
import com.linkedin.pegasus2avro.mxe.MetadataAuditEvent;
|
||||
import com.linkedin.pegasus2avro.mxe.MetadataChangeEvent;
|
||||
import com.linkedin.pegasus2avro.mxe.MetadataGraphEvent;
|
||||
import com.linkedin.pegasus2avro.mxe.MetadataSearchEvent;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import org.apache.avro.Schema;
|
||||
|
||||
|
||||
public class Configs {
|
||||
|
||||
public static final Map<String, String> FABRIC_SCHEMA_REGISTRY_MAP =
|
||||
Collections.unmodifiableMap(new HashMap<String, String>() {
|
||||
{
|
||||
put("ei", "http://ei4-schemaregistry-vip-1.int.linkedin.com:10252");
|
||||
put("corp", "http://lca1-schema-registry-vip-1.corp.linkedin.com:10252");
|
||||
}
|
||||
});
|
||||
|
||||
public static final Map<String, Schema> TOPIC_SCHEMA_MAP = Collections.unmodifiableMap(new HashMap<String, Schema>() {
|
||||
{
|
||||
put(Topics.METADATA_AUDIT_EVENT, MetadataAuditEvent.SCHEMA$);
|
||||
put(Topics.METADATA_CHANGE_EVENT, MetadataChangeEvent.SCHEMA$);
|
||||
put(Topics.FAILED_METADATA_CHANGE_EVENT, FailedMetadataChangeEvent.SCHEMA$);
|
||||
put(Topics.METADATA_GRAPH_EVENT, MetadataGraphEvent.SCHEMA$);
|
||||
put(Topics.METADATA_SEARCH_EVENT, MetadataSearchEvent.SCHEMA$);
|
||||
|
||||
put(Topics.DEV_METADATA_AUDIT_EVENT, MetadataAuditEvent.SCHEMA$);
|
||||
put(Topics.DEV_METADATA_CHANGE_EVENT, MetadataChangeEvent.SCHEMA$);
|
||||
put(Topics.DEV_FAILED_METADATA_CHANGE_EVENT, FailedMetadataChangeEvent.SCHEMA$);
|
||||
}
|
||||
});
|
||||
|
||||
private Configs() {
|
||||
// Util class
|
||||
}
|
||||
}
|
||||
@ -1,42 +0,0 @@
|
||||
package com.linkedin.mxe;
|
||||
|
||||
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
|
||||
import org.apache.avro.Schema;
|
||||
|
||||
|
||||
public class RegisterSchemas {
|
||||
|
||||
public static final String DEFAULT_SCHEMA_REGISTRY_URL = "http://localhost:8081";
|
||||
|
||||
private RegisterSchemas() {
|
||||
}
|
||||
|
||||
static CachedSchemaRegistryClient createClient(String url) {
|
||||
return new CachedSchemaRegistryClient(url, Integer.MAX_VALUE);
|
||||
}
|
||||
|
||||
private static void registerSchema(String topic, Schema schema, CachedSchemaRegistryClient client) {
|
||||
try {
|
||||
client.register(topic, schema);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static void registerSchema(String topic, String schemaRegistryUrl) {
|
||||
CachedSchemaRegistryClient client = createClient(schemaRegistryUrl);
|
||||
Schema schema = Configs.TOPIC_SCHEMA_MAP.get(topic);
|
||||
System.out.println(String.format("Registering %s using registry %s (size: %d)", topic, schemaRegistryUrl,
|
||||
schema.toString(false).length()));
|
||||
registerSchema(topic, schema, client);
|
||||
}
|
||||
|
||||
public static void main(final String[] args) {
|
||||
final String url = args.length == 1 ? args[0] : DEFAULT_SCHEMA_REGISTRY_URL;
|
||||
Configs.TOPIC_SCHEMA_MAP.forEach((topic, schema) -> {
|
||||
System.out.println(String.format("Registering %s using registry %s (size: %d)", topic,
|
||||
url, schema.toString(false).length()));
|
||||
registerSchema(topic, url);
|
||||
});
|
||||
}
|
||||
}
|
||||
@ -7,10 +7,6 @@ public class Topics {
|
||||
public static final String METADATA_GRAPH_EVENT = "MetadataGraphEvent";
|
||||
public static final String METADATA_SEARCH_EVENT = "MetadataSearchEvent";
|
||||
|
||||
public static final String DEV_METADATA_AUDIT_EVENT = "MetadataAuditEvent_dev";
|
||||
public static final String DEV_METADATA_CHANGE_EVENT = "MetadataChangeEvent_dev";
|
||||
public static final String DEV_FAILED_METADATA_CHANGE_EVENT = "FailedMetadataChangeEvent_dev";
|
||||
|
||||
private Topics() {
|
||||
// Util class
|
||||
}
|
||||
|
||||
@ -1,66 +0,0 @@
|
||||
package com.linkedin.mxe;
|
||||
|
||||
import com.linkedin.avro.legacy.LegacyAvroSchema;
|
||||
import io.confluent.kafka.schemaregistry.avro.AvroCompatibilityChecker;
|
||||
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
|
||||
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
|
||||
import org.apache.avro.*;
|
||||
import org.testng.Reporter;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import java.util.Random;
|
||||
|
||||
import static com.linkedin.mxe.RegisterSchemas.DEFAULT_SCHEMA_REGISTRY_URL;
|
||||
import static org.testng.Assert.*;
|
||||
|
||||
|
||||
public class SchemaCompatibilityTests {
|
||||
|
||||
private static final String FORWARD_INCOMPATIBILITY_WARNING = "" // format
|
||||
+ "************************************************************************\n" //format
|
||||
+ " New schema is forward incompatible with the current schema.\n" // format
|
||||
+ " Consider bumping up the major version.\n" // format
|
||||
+ "************************************************************************\n";
|
||||
|
||||
private static final AvroCompatibilityChecker AVRO_BACKWARD_COMPATIBILITY_CHECKER =
|
||||
AvroCompatibilityChecker.BACKWARD_CHECKER;
|
||||
|
||||
private static final AvroCompatibilityChecker AVRO_FORWARD_COMPATIBILITY_CHECKER =
|
||||
AvroCompatibilityChecker.FORWARD_CHECKER;
|
||||
|
||||
@Test
|
||||
public void testBackwardCompatibility() {
|
||||
final CachedSchemaRegistryClient client = RegisterSchemas.createClient(DEFAULT_SCHEMA_REGISTRY_URL);
|
||||
|
||||
Configs.TOPIC_SCHEMA_MAP.forEach((topic, schema) -> {
|
||||
Schema olderSchema = findLastRegisteredSchemaMetadata(topic, client);
|
||||
if (olderSchema == null) {
|
||||
Reporter.log("Unable to find registered schema for " + topic + true);
|
||||
return;
|
||||
}
|
||||
|
||||
// Check backward compatibility, i.e. can new schema fits old data
|
||||
assertTrue(AVRO_BACKWARD_COMPATIBILITY_CHECKER.isCompatible(schema, olderSchema),
|
||||
"New schema is backward incompatible with the current schema \n\n");
|
||||
|
||||
// Check forward compatibility, i.e. can new data fits old schema
|
||||
assertTrue(AVRO_FORWARD_COMPATIBILITY_CHECKER.isCompatible(schema, olderSchema),
|
||||
FORWARD_INCOMPATIBILITY_WARNING + "\n\n");
|
||||
});
|
||||
}
|
||||
|
||||
private Schema findLastRegisteredSchemaMetadata(String topic, CachedSchemaRegistryClient client) {
|
||||
try {
|
||||
SchemaMetadata metadata = client.getLatestSchemaMetadata(topic);
|
||||
return client.getBySubjectAndID(topic, metadata.getId());
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private SchemaMetadata makeSchemaMetadata(String topic, Schema schema) {
|
||||
LegacyAvroSchema legacyAvroSchema = new LegacyAvroSchema(topic, schema.toString(false));
|
||||
Random rand = new Random();
|
||||
return new SchemaMetadata(rand.nextInt(), rand.nextInt(), legacyAvroSchema.toString());
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user