mirror of
https://github.com/datahub-project/datahub.git
synced 2025-10-29 17:59:24 +00:00
Merge branch 'datahub' of https://github.com/linkedin/WhereHows into datahub
This commit is contained in:
commit
cb60ad8d45
@ -1,6 +1,5 @@
|
||||
package com.linkedin.identity.client;
|
||||
|
||||
import com.linkedin.avro2pegasus.events.search.PaginationContext;
|
||||
import com.linkedin.common.urn.CorpuserUrn;
|
||||
import com.linkedin.identity.CorpUser;
|
||||
import com.linkedin.identity.CorpUserEditableInfo;
|
||||
@ -85,15 +84,16 @@ public class CorpUsers extends BaseClient implements SearchableClient<CorpUser>
|
||||
/**
|
||||
* Get all {@link CorpUser} models of the corp users
|
||||
*
|
||||
* @param context pagination param
|
||||
* @param start offset to start
|
||||
* @param count number of max {@link CorpUser}s to return
|
||||
* @return {@link CorpUser} models of the corp user
|
||||
* @throws RemoteInvocationException
|
||||
*/
|
||||
@Nonnull
|
||||
public List<CorpUser> getAll(@Nonnull PaginationContext context)
|
||||
public List<CorpUser> getAll(int start, int count)
|
||||
throws RemoteInvocationException {
|
||||
final GetAllRequest<CorpUser> getAllRequest = CORP_USERS_REQUEST_BUILDERS.getAll()
|
||||
.paginate(context.getStart(), context.getNumToReturn())
|
||||
.paginate(start, count)
|
||||
.build();
|
||||
return _client.sendRequest(getAllRequest).getResponseEntity().getElements();
|
||||
}
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
package com.linkedin.dataset.factory;
|
||||
|
||||
import com.linkedin.metadata.dao.browse.DatasetBrowseConfig;
|
||||
import com.linkedin.metadata.configs.DatasetBrowseConfig;
|
||||
import com.linkedin.metadata.dao.browse.ESBrowseDAO;
|
||||
import org.elasticsearch.client.RestHighLevelClient;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
package com.linkedin.metadata.dao.browse;
|
||||
package com.linkedin.metadata.configs;
|
||||
|
||||
import com.linkedin.metadata.dao.browse.BaseBrowseConfig;
|
||||
import com.linkedin.metadata.search.DatasetDocument;
|
||||
|
||||
public class DatasetBrowseConfig extends BaseBrowseConfig<DatasetDocument> {
|
||||
@ -1,35 +0,0 @@
|
||||
{
|
||||
"name": "PaginationContext",
|
||||
"namespace": "com.linkedin.avro2pegasus.events.search",
|
||||
"doc": "Data about pagination",
|
||||
"type": "record",
|
||||
"fields": [
|
||||
{
|
||||
"name": "numToReturn",
|
||||
"doc": "The number of entities to return after applying the filters described by entityFilter",
|
||||
"optional": true,
|
||||
"type": "int"
|
||||
},
|
||||
{
|
||||
"name": "entityFilter",
|
||||
"doc": "List of URNs of specific entities to be filtered (eg. urn:li:member:1)",
|
||||
"optional": true,
|
||||
"type": {
|
||||
"type": "array",
|
||||
"items": "string"
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "start",
|
||||
"doc": "This field has been deprecated in favor of entityFilter and startFilter.",
|
||||
"optional": true,
|
||||
"type": "int"
|
||||
},
|
||||
{
|
||||
"name": "startFilter",
|
||||
"doc": "The number of top scoring results to filter after applying entityFilter.",
|
||||
"optional": true,
|
||||
"type": "int"
|
||||
}
|
||||
]
|
||||
}
|
||||
@ -3,8 +3,6 @@ package com.linkedin.metadata.dao.utils;
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
import javax.annotation.Nonnull;
|
||||
|
||||
import com.linkedin.common.urn.Urn;
|
||||
import org.elasticsearch.index.query.BoolQueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
|
||||
@ -28,8 +26,7 @@ public class ESUtils {
|
||||
BoolQueryBuilder boolFilter = new BoolQueryBuilder();
|
||||
for (Map.Entry<String, String> entry : requestMap.entrySet()) {
|
||||
BoolQueryBuilder filters = new BoolQueryBuilder();
|
||||
// TODO: Remove checking for urn after solving META-10102
|
||||
Arrays.stream(Urn.isUrn(entry.getValue()) ? new String[]{entry.getValue()} : entry.getValue().split(","))
|
||||
Arrays.stream(entry.getValue().split(","))
|
||||
.forEach(elem -> filters.should(QueryBuilders.matchQuery(entry.getKey(), elem)));
|
||||
boolFilter.must(filters);
|
||||
}
|
||||
|
||||
@ -45,7 +45,7 @@ public class SearchUtils {
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a requestMap to a filter
|
||||
* Converts a requestMap to a filter
|
||||
*
|
||||
* @param requestMap a map of fields and values
|
||||
* @return the search filter
|
||||
|
||||
@ -256,7 +256,7 @@ public class Neo4jUtil {
|
||||
*/
|
||||
@Nonnull
|
||||
public static RelationshipFilter createRelationshipFilter(@Nonnull Filter filter,
|
||||
@Nonnull RelationshipDirection relationshipDirection) {
|
||||
@Nonnull RelationshipDirection relationshipDirection) {
|
||||
return new RelationshipFilter().setCriteria(filter.getCriteria()).setDirection(relationshipDirection);
|
||||
}
|
||||
|
||||
@ -270,7 +270,7 @@ public class Neo4jUtil {
|
||||
*/
|
||||
@Nonnull
|
||||
public static RelationshipFilter createRelationshipFilter(@Nonnull String field, @Nonnull String value,
|
||||
@Nonnull RelationshipDirection relationshipDirection) {
|
||||
@Nonnull RelationshipDirection relationshipDirection) {
|
||||
return createRelationshipFilter(createFilter(field, value), relationshipDirection);
|
||||
}
|
||||
|
||||
@ -284,10 +284,10 @@ public class Neo4jUtil {
|
||||
@Nonnull
|
||||
public static Filter createFilter(@Nonnull String field, @Nonnull String value) {
|
||||
return new Filter()
|
||||
.setCriteria(
|
||||
new CriterionArray(Collections.singletonList(
|
||||
new Criterion().setField(field).setValue(value).setCondition(Condition.EQUAL)
|
||||
))
|
||||
);
|
||||
.setCriteria(
|
||||
new CriterionArray(Collections.singletonList(
|
||||
new Criterion().setField(field).setValue(value).setCondition(Condition.EQUAL)
|
||||
))
|
||||
);
|
||||
}
|
||||
}
|
||||
@ -1,63 +1 @@
|
||||
apply plugin: 'java'
|
||||
|
||||
configurations {
|
||||
avro17
|
||||
|
||||
validationCompile
|
||||
validationCompile.extendsFrom testCompile
|
||||
|
||||
validationRuntime
|
||||
validationRuntime.extendsFrom testRuntime
|
||||
}
|
||||
|
||||
dependencies {
|
||||
compile externalDependency.kafkaSchemaRegistry
|
||||
|
||||
compileOnly project(':metadata-events:mxe-avro-1.7')
|
||||
|
||||
validationCompile externalDependency.avroMigrationHelper
|
||||
validationCompile externalDependency.testng
|
||||
|
||||
avro17 project(':metadata-events:mxe-avro-1.7')
|
||||
}
|
||||
|
||||
//////////////////////////////////////
|
||||
// Schema compatibility validation
|
||||
//////////////////////////////////////
|
||||
sourceSets {
|
||||
validation {
|
||||
java {
|
||||
compileClasspath += main.output + test.output
|
||||
runtimeClasspath += main.output + test.output
|
||||
srcDir file('src/validation/java')
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
idea {
|
||||
module {
|
||||
testSourceDirs += file("src/validation/java")
|
||||
}
|
||||
}
|
||||
|
||||
task validate17(type: Test, dependsOn: configurations.avro17) {
|
||||
testLogging.events 'standard_out'
|
||||
testClassesDirs = sourceSets.validation.output.classesDirs
|
||||
classpath = files(sourceSets.validation.runtimeClasspath, configurations.avro17)
|
||||
|
||||
// Prevent gradle from caching each validation run
|
||||
outputs.upToDateWhen { false }
|
||||
}
|
||||
|
||||
// Make sure validate17 are run as part of gradle build
|
||||
check.dependsOn += [validate17]
|
||||
|
||||
//////////////////////////////////////
|
||||
// Schema registration
|
||||
//////////////////////////////////////
|
||||
def mainClassName = 'com.linkedin.mxe.RegisterSchemas'
|
||||
|
||||
task register17(type: JavaExec, dependsOn: [configurations.compile, configurations.avro17]) {
|
||||
main = mainClassName
|
||||
classpath = files(sourceSets.main.runtimeClasspath, configurations.avro17)
|
||||
}
|
||||
|
||||
@ -1,41 +0,0 @@
|
||||
package com.linkedin.mxe;
|
||||
|
||||
import com.linkedin.pegasus2avro.mxe.FailedMetadataChangeEvent;
|
||||
import com.linkedin.pegasus2avro.mxe.MetadataAuditEvent;
|
||||
import com.linkedin.pegasus2avro.mxe.MetadataChangeEvent;
|
||||
import com.linkedin.pegasus2avro.mxe.MetadataGraphEvent;
|
||||
import com.linkedin.pegasus2avro.mxe.MetadataSearchEvent;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import org.apache.avro.Schema;
|
||||
|
||||
|
||||
public class Configs {
|
||||
|
||||
public static final Map<String, String> FABRIC_SCHEMA_REGISTRY_MAP =
|
||||
Collections.unmodifiableMap(new HashMap<String, String>() {
|
||||
{
|
||||
put("ei", "http://ei4-schemaregistry-vip-1.int.linkedin.com:10252");
|
||||
put("corp", "http://lca1-schema-registry-vip-1.corp.linkedin.com:10252");
|
||||
}
|
||||
});
|
||||
|
||||
public static final Map<String, Schema> TOPIC_SCHEMA_MAP = Collections.unmodifiableMap(new HashMap<String, Schema>() {
|
||||
{
|
||||
put(Topics.METADATA_AUDIT_EVENT, MetadataAuditEvent.SCHEMA$);
|
||||
put(Topics.METADATA_CHANGE_EVENT, MetadataChangeEvent.SCHEMA$);
|
||||
put(Topics.FAILED_METADATA_CHANGE_EVENT, FailedMetadataChangeEvent.SCHEMA$);
|
||||
put(Topics.METADATA_GRAPH_EVENT, MetadataGraphEvent.SCHEMA$);
|
||||
put(Topics.METADATA_SEARCH_EVENT, MetadataSearchEvent.SCHEMA$);
|
||||
|
||||
put(Topics.DEV_METADATA_AUDIT_EVENT, MetadataAuditEvent.SCHEMA$);
|
||||
put(Topics.DEV_METADATA_CHANGE_EVENT, MetadataChangeEvent.SCHEMA$);
|
||||
put(Topics.DEV_FAILED_METADATA_CHANGE_EVENT, FailedMetadataChangeEvent.SCHEMA$);
|
||||
}
|
||||
});
|
||||
|
||||
private Configs() {
|
||||
// Util class
|
||||
}
|
||||
}
|
||||
@ -1,42 +0,0 @@
|
||||
package com.linkedin.mxe;
|
||||
|
||||
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
|
||||
import org.apache.avro.Schema;
|
||||
|
||||
|
||||
public class RegisterSchemas {
|
||||
|
||||
public static final String DEFAULT_SCHEMA_REGISTRY_URL = "http://localhost:8081";
|
||||
|
||||
private RegisterSchemas() {
|
||||
}
|
||||
|
||||
static CachedSchemaRegistryClient createClient(String url) {
|
||||
return new CachedSchemaRegistryClient(url, Integer.MAX_VALUE);
|
||||
}
|
||||
|
||||
private static void registerSchema(String topic, Schema schema, CachedSchemaRegistryClient client) {
|
||||
try {
|
||||
client.register(topic, schema);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static void registerSchema(String topic, String schemaRegistryUrl) {
|
||||
CachedSchemaRegistryClient client = createClient(schemaRegistryUrl);
|
||||
Schema schema = Configs.TOPIC_SCHEMA_MAP.get(topic);
|
||||
System.out.println(String.format("Registering %s using registry %s (size: %d)", topic, schemaRegistryUrl,
|
||||
schema.toString(false).length()));
|
||||
registerSchema(topic, schema, client);
|
||||
}
|
||||
|
||||
public static void main(final String[] args) {
|
||||
final String url = args.length == 1 ? args[0] : DEFAULT_SCHEMA_REGISTRY_URL;
|
||||
Configs.TOPIC_SCHEMA_MAP.forEach((topic, schema) -> {
|
||||
System.out.println(String.format("Registering %s using registry %s (size: %d)", topic,
|
||||
url, schema.toString(false).length()));
|
||||
registerSchema(topic, url);
|
||||
});
|
||||
}
|
||||
}
|
||||
@ -7,10 +7,6 @@ public class Topics {
|
||||
public static final String METADATA_GRAPH_EVENT = "MetadataGraphEvent";
|
||||
public static final String METADATA_SEARCH_EVENT = "MetadataSearchEvent";
|
||||
|
||||
public static final String DEV_METADATA_AUDIT_EVENT = "MetadataAuditEvent_dev";
|
||||
public static final String DEV_METADATA_CHANGE_EVENT = "MetadataChangeEvent_dev";
|
||||
public static final String DEV_FAILED_METADATA_CHANGE_EVENT = "FailedMetadataChangeEvent_dev";
|
||||
|
||||
private Topics() {
|
||||
// Util class
|
||||
}
|
||||
|
||||
@ -1,66 +0,0 @@
|
||||
package com.linkedin.mxe;
|
||||
|
||||
import com.linkedin.avro.legacy.LegacyAvroSchema;
|
||||
import io.confluent.kafka.schemaregistry.avro.AvroCompatibilityChecker;
|
||||
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
|
||||
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
|
||||
import org.apache.avro.*;
|
||||
import org.testng.Reporter;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import java.util.Random;
|
||||
|
||||
import static com.linkedin.mxe.RegisterSchemas.DEFAULT_SCHEMA_REGISTRY_URL;
|
||||
import static org.testng.Assert.*;
|
||||
|
||||
|
||||
public class SchemaCompatibilityTests {
|
||||
|
||||
private static final String FORWARD_INCOMPATIBILITY_WARNING = "" // format
|
||||
+ "************************************************************************\n" //format
|
||||
+ " New schema is forward incompatible with the current schema.\n" // format
|
||||
+ " Consider bumping up the major version.\n" // format
|
||||
+ "************************************************************************\n";
|
||||
|
||||
private static final AvroCompatibilityChecker AVRO_BACKWARD_COMPATIBILITY_CHECKER =
|
||||
AvroCompatibilityChecker.BACKWARD_CHECKER;
|
||||
|
||||
private static final AvroCompatibilityChecker AVRO_FORWARD_COMPATIBILITY_CHECKER =
|
||||
AvroCompatibilityChecker.FORWARD_CHECKER;
|
||||
|
||||
@Test
|
||||
public void testBackwardCompatibility() {
|
||||
final CachedSchemaRegistryClient client = RegisterSchemas.createClient(DEFAULT_SCHEMA_REGISTRY_URL);
|
||||
|
||||
Configs.TOPIC_SCHEMA_MAP.forEach((topic, schema) -> {
|
||||
Schema olderSchema = findLastRegisteredSchemaMetadata(topic, client);
|
||||
if (olderSchema == null) {
|
||||
Reporter.log("Unable to find registered schema for " + topic + true);
|
||||
return;
|
||||
}
|
||||
|
||||
// Check backward compatibility, i.e. can new schema fits old data
|
||||
assertTrue(AVRO_BACKWARD_COMPATIBILITY_CHECKER.isCompatible(schema, olderSchema),
|
||||
"New schema is backward incompatible with the current schema \n\n");
|
||||
|
||||
// Check forward compatibility, i.e. can new data fits old schema
|
||||
assertTrue(AVRO_FORWARD_COMPATIBILITY_CHECKER.isCompatible(schema, olderSchema),
|
||||
FORWARD_INCOMPATIBILITY_WARNING + "\n\n");
|
||||
});
|
||||
}
|
||||
|
||||
private Schema findLastRegisteredSchemaMetadata(String topic, CachedSchemaRegistryClient client) {
|
||||
try {
|
||||
SchemaMetadata metadata = client.getLatestSchemaMetadata(topic);
|
||||
return client.getBySubjectAndID(topic, metadata.getId());
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private SchemaMetadata makeSchemaMetadata(String topic, Schema schema) {
|
||||
LegacyAvroSchema legacyAvroSchema = new LegacyAvroSchema(topic, schema.toString(false));
|
||||
Random rand = new Random();
|
||||
return new SchemaMetadata(rand.nextInt(), rand.nextInt(), legacyAvroSchema.toString());
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user