Remove obsolete kafka code in wherehows-common (#739)

Yi (Alan) Wang authored 2017-09-11 18:13:31 -07:00, committed by GitHub
parent 1f0ffc71dc
commit f015d66fd2
36 changed files with 0 additions and 3045 deletions

View File

@@ -39,7 +39,6 @@ ext.externalDependency = [
"akka" : "com.typesafe.akka:akka-actor_2.10:2.3.15",
"spring_jdbc" : "org.springframework:spring-jdbc:4.1.6.RELEASE",
"kafka" : "org.apache.kafka:kafka_2.10:0.10.2.1",
"kafka_clients" : "org.apache.kafka:kafka-clients:0.10.2.1",
"confluent_avro_serde": "io.confluent:kafka-avro-serializer:3.3.0",

View File

@@ -25,14 +25,6 @@ repositories {
name "gradle-plugins"
url 'http://plugins.gradle.org/m2/'
}
/* ivy { // this is required by metadata store Restli client within LinkedIn
url 'http://artifactory.corp.linkedin.com:8081/artifactory/repo'
layout 'pattern', {
ivy '[organisation]/[module]/[revision]/[module]-[revision].ivy'
artifact '[organisation]/[module]/[revision]/[artifact]-[revision](-[classifier]).[ext]'
m2compatible = true
}
} */
}
try {

View File

@@ -13,8 +13,6 @@ dependencies {
play externalDependency.play_java_jdbc
play externalDependency.slf4j_api
play externalDependency.jasypt
play externalDependency.kafka
play externalDependency.kafka_clients
playTest 'com.github.stefanbirkner:system-rules:1.16.0'
playTest 'org.easytesting:fest-assert-core:2.0M10'

View File

@@ -7,10 +7,6 @@ dependencies {
compile externalDependency.jgit
compile externalDependency.jsoup
compile externalDependency.commons_io
compile externalDependency.avro
compile externalDependency.kafka
compile externalDependency.kafka_clients
compile externalDependency.confluent_avro_serde
compile externalDependency.jackson_core
compile externalDependency.jackson_databind
compile externalDependency.guava

View File

@@ -1,74 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.kafka;
import java.nio.ByteBuffer;
import javax.xml.bind.DatatypeConverter;
public class SchemaId {
/**
* Kafka Avro Schema ID type and size
*/
public enum Type {
INT(4), LONG(8), UUID(16);
private int size;
Type(int size) {
this.size = size;
}
public int size() {
return size;
}
}
/**
 * Get the schema ID from the ByteBuffer according to the given type,
 * advancing the buffer position by the type's size.
 * @param type the schema ID type
 * @param byteBuffer buffer positioned at the start of the ID bytes
 * @return the ID as a string, or null if the type is unknown
 */
public static String getIdString(Type type, ByteBuffer byteBuffer) {
if (type == Type.INT) {
return Integer.toString(byteBuffer.getInt());
} else if (type == Type.LONG) {
return Long.toString(byteBuffer.getLong());
} else if (type == Type.UUID) {
byte[] bytes = new byte[Type.UUID.size];
byteBuffer.get(bytes);
return DatatypeConverter.printHexBinary(bytes).toLowerCase();
} else {
return null;
}
}
/**
 * Convert a String id into its byte representation for the given type.
 * @param type the schema ID type
 * @param id the ID string to encode
 * @return the encoded ID as byte[], or an empty array if the type is unknown
 */
public static byte[] getIdBytes(Type type, String id) {
if (type == Type.INT) {
return ByteBuffer.allocate(Type.INT.size).putInt(Integer.parseInt(id)).array();
} else if (type == Type.LONG) {
return ByteBuffer.allocate(Type.LONG.size).putLong(Long.parseLong(id)).array();
} else if (type == Type.UUID) {
return DatatypeConverter.parseHexBinary(id);
} else {
return new byte[0];
}
}
}
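
For context, the removed SchemaId helper round-trips schema IDs as fixed-size byte prefixes; a minimal usage sketch (SchemaIdExample is an illustrative name, not from this repo):

import java.nio.ByteBuffer;
import wherehows.common.kafka.SchemaId;

public class SchemaIdExample {
  public static void main(String[] args) {
    // Encode the ID "42" as a 4-byte INT prefix...
    byte[] encoded = SchemaId.getIdBytes(SchemaId.Type.INT, "42");
    // ...and decode it back; getIdString advances the buffer past the ID bytes.
    String decoded = SchemaId.getIdString(SchemaId.Type.INT, ByteBuffer.wrap(encoded));
    System.out.println(decoded); // 42
  }
}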

View File

@@ -1,137 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.kafka.formatter;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.SerializationException;
import java.io.IOException;
import java.io.PrintStream;
import java.util.Properties;
import wherehows.common.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient;
import wherehows.common.kafka.serializers.AbstractKafkaAvroDeserializer;
import wherehows.common.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import kafka.common.MessageFormatter;
/**
* Example
* To use AvroMessageFormatter, first make sure that Zookeeper, Kafka and schema registry server are
* all started. Second, make sure the jar for AvroMessageFormatter and its dependencies are included
* in the classpath of kafka-console-consumer.sh. Then run the following command.
*
* 1. To read only the value of the messages in JSON
* bin/kafka-console-consumer.sh --consumer.config config/consumer.properties --topic t1 \
* --zookeeper localhost:2181 --formatter wherehows.common.kafka.formatter.AvroMessageFormatter \
* --property schema.registry.url=http://localhost:8081
*
* 2. To read both the key and the value of the messages in JSON
* bin/kafka-console-consumer.sh --consumer.config config/consumer.properties --topic t1 \
* --zookeeper localhost:2181 --formatter wherehows.common.kafka.formatter.AvroMessageFormatter \
* --property schema.registry.url=http://localhost:8081 \
* --property print.key=true
*
*/
public class AvroMessageFormatter extends AbstractKafkaAvroDeserializer
implements MessageFormatter {
private final EncoderFactory encoderFactory = EncoderFactory.get();
private boolean printKey = false;
private byte[] keySeparator = "\t".getBytes();
private byte[] lineSeparator = "\n".getBytes();
/**
* Constructor needed by kafka console consumer.
*/
public AvroMessageFormatter() {
}
/**
* For testing only.
*/
AvroMessageFormatter(SchemaRegistryClient schemaRegistryClient, boolean printKey) {
this.schemaRegistry = schemaRegistryClient;
this.printKey = printKey;
}
@Override
public void init(Properties props) {
if (props == null) {
throw new ConfigException("Missing schema registry url!");
}
String url = props.getProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG);
if (url == null) {
throw new ConfigException("Missing schema registry url!");
}
schemaRegistry = new CachedSchemaRegistryClient(
url, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
if (props.containsKey("print.key")) {
printKey = props.getProperty("print.key").trim().toLowerCase().equals("true");
}
if (props.containsKey("key.separator")) {
keySeparator = props.getProperty("key.separator").getBytes();
}
if (props.containsKey("line.separator")) {
lineSeparator = props.getProperty("line.separator").getBytes();
}
}
@Override
public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
if (printKey) {
try {
writeTo(consumerRecord.key(), output);
output.write(keySeparator);
} catch (IOException ioe) {
throw new SerializationException("Error while formatting the key", ioe);
}
}
try {
writeTo(consumerRecord.value(), output);
output.write(lineSeparator);
} catch (IOException ioe) {
throw new SerializationException("Error while formatting the value", ioe);
}
}
private void writeTo(byte[] data, PrintStream output) throws IOException {
Object object = deserialize(data);
Schema schema = getSchema(object);
try {
JsonEncoder encoder = encoderFactory.jsonEncoder(schema, output);
DatumWriter<Object> writer = new GenericDatumWriter<Object>(schema);
writer.write(object, encoder);
encoder.flush();
} catch (AvroRuntimeException e) {
throw new SerializationException(
String.format("Error serializing Avro data of schema %s to json", schema), e);
}
}
@Override
public void close() {
// nothing to do
}
}
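
Besides the console-consumer flow in the Javadoc above, the removed formatter could be driven directly; a minimal sketch, assuming a registry at http://localhost:8081 and that the SCHEMA_REGISTRY_URL_CONFIG key is "schema.registry.url" as in the CLI examples:

import java.util.Properties;
import wherehows.common.kafka.formatter.AvroMessageFormatter;

public class FormatterExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("schema.registry.url", "http://localhost:8081"); // required by init()
    props.setProperty("print.key", "true");  // also emit the key before each value
    props.setProperty("key.separator", "\t");
    AvroMessageFormatter formatter = new AvroMessageFormatter();
    formatter.init(props);
    // formatter.writeTo(consumerRecord, System.out) then prints key \t value as JSON.
  }
}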

View File

@@ -1,207 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.kafka.formatter;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.util.Utf8;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.SerializationException;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import wherehows.common.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient;
import wherehows.common.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import wherehows.common.kafka.serializers.AbstractKafkaAvroSerializer;
import kafka.common.KafkaException;
import kafka.producer.KeyedMessage;
import kafka.common.MessageReader;
/**
* Example
* To use AvroMessageReader, first make sure that Zookeeper, Kafka and schema registry server are
* all started. Second, make sure the jar for AvroMessageReader and its dependencies are included
* in the classpath of kafka-console-producer.sh. Then run the following
* command.
*
* 1. Send Avro string as value. (make sure there is no space in the schema string)
* bin/kafka-console-producer.sh --broker-list localhost:9092 --topic t1 \
* --line-reader wherehows.common.kafka.formatter.AvroMessageReader \
* --property schema.registry.url=http://localhost:8081 \
* --property value.schema='{"type":"string"}'
*
* In the shell, type in the following.
* "a"
* "b"
*
* 2. Send Avro record as value.
* bin/kafka-console-producer.sh --broker-list localhost:9092 --topic t1 \
* --line-reader wherehows.common.kafka.formatter.AvroMessageReader \
* --property schema.registry.url=http://localhost:8081 \
* --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
*
* In the shell, type in the following.
* {"f1": "value1"}
*
* 3. Send Avro string as key and Avro record as value.
* bin/kafka-console-producer.sh --broker-list localhost:9092 --topic t1 \
* --line-reader wherehows.common.kafka.formatter.AvroMessageReader \
* --property schema.registry.url=http://localhost:8081 \
* --property parse.key=true \
* --property key.schema='{"type":"string"}' \
* --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
*
* In the shell, type in the following.
* "key1" \t {"f1": "value1"}
*
*/
public class AvroMessageReader extends AbstractKafkaAvroSerializer implements MessageReader {
private String topic = null;
private BufferedReader reader = null;
private Boolean parseKey = false;
private String keySeparator = "\t";
private boolean ignoreError = false;
private final DecoderFactory decoderFactory = DecoderFactory.get();
private Schema keySchema = null;
private Schema valueSchema = null;
private String keySubject = null;
private String valueSubject = null;
/**
* Constructor needed by kafka console producer.
*/
public AvroMessageReader() {
}
/**
* For testing only.
*/
AvroMessageReader(SchemaRegistryClient schemaRegistryClient, Schema keySchema, Schema valueSchema,
String topic, boolean parseKey, BufferedReader reader) {
this.schemaRegistry = schemaRegistryClient;
this.keySchema = keySchema;
this.valueSchema = valueSchema;
this.topic = topic;
this.keySubject = topic + "-key";
this.valueSubject = topic + "-value";
this.parseKey = parseKey;
this.reader = reader;
}
@Override
public void init(java.io.InputStream inputStream, java.util.Properties props) {
topic = props.getProperty("topic");
if (props.containsKey("parse.key")) {
parseKey = props.getProperty("parse.key").trim().toLowerCase().equals("true");
}
if (props.containsKey("key.separator")) {
keySeparator = props.getProperty("key.separator");
}
if (props.containsKey("ignore.error")) {
ignoreError = props.getProperty("ignore.error").trim().toLowerCase().equals("true");
}
reader = new BufferedReader(new InputStreamReader(inputStream));
String url = props.getProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG);
if (url == null) {
throw new ConfigException("Missing schema registry url!");
}
schemaRegistry = new CachedSchemaRegistryClient(
url, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
if (!props.containsKey("value.schema")) {
throw new ConfigException("Must provide the Avro schema string in value.schema");
}
String valueSchemaString = props.getProperty("value.schema");
Schema.Parser parser = new Schema.Parser();
valueSchema = parser.parse(valueSchemaString);
if (parseKey) {
if (!props.containsKey("key.schema")) {
throw new ConfigException("Must provide the Avro schema string in key.schema");
}
String keySchemaString = props.getProperty("key.schema");
keySchema = parser.parse(keySchemaString);
}
keySubject = topic + "-key";
valueSubject = topic + "-value";
}
@Override
public ProducerRecord<byte[], byte[]> readMessage() {
try {
String line = reader.readLine();
if (line == null) {
return null;
}
if (!parseKey) {
Object value = jsonToAvro(line, valueSchema);
byte[] serializedValue = serializeImpl(valueSubject, value);
return new ProducerRecord<>(topic, serializedValue);
} else {
int keyIndex = line.indexOf(keySeparator);
if (keyIndex < 0) {
if (ignoreError) {
Object value = jsonToAvro(line, valueSchema);
byte[] serializedValue = serializeImpl(valueSubject, value);
return new ProducerRecord<>(topic, serializedValue);
} else {
throw new KafkaException("No key found in line " + line);
}
} else {
String keyString = line.substring(0, keyIndex);
String valueString = (keyIndex + keySeparator.length() > line.length()) ?
"" : line.substring(keyIndex + keySeparator.length());
Object key = jsonToAvro(keyString, keySchema);
byte[] serializedKey = serializeImpl(keySubject, key);
Object value = jsonToAvro(valueString, valueSchema);
byte[] serializedValue = serializeImpl(valueSubject, value);
return new ProducerRecord<>(topic, serializedKey, serializedValue);
}
}
} catch (IOException e) {
throw new KafkaException("Error reading from input", e);
}
}
private Object jsonToAvro(String jsonString, Schema schema) {
try {
DatumReader<Object> reader = new GenericDatumReader<Object>(schema);
Object object = reader.read(null, decoderFactory.jsonDecoder(schema, jsonString));
if (schema.getType().equals(Schema.Type.STRING)) {
object = ((Utf8) object).toString();
}
return object;
} catch (IOException e) {
throw new SerializationException(
String.format("Error deserializing json %s to Avro of schema %s", jsonString, schema), e);
} catch (AvroRuntimeException e) {
throw new SerializationException(
String.format("Error deserializing json %s to Avro of schema %s", jsonString, schema), e);
}
}
@Override
public void close() {
// nothing to do
}
}
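
The removed reader can likewise be fed programmatically rather than through kafka-console-producer.sh; a minimal sketch, assuming a running schema registry (readMessage() registers schemas via serializeImpl, so the URL below is an assumption):

import java.io.ByteArrayInputStream;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;
import wherehows.common.kafka.formatter.AvroMessageReader;

public class ReaderExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("topic", "t1");
    props.setProperty("schema.registry.url", "http://localhost:8081"); // assumed live registry
    props.setProperty("value.schema", "{\"type\":\"string\"}");
    AvroMessageReader reader = new AvroMessageReader();
    // Each input line is one JSON-encoded Avro value.
    reader.init(new ByteArrayInputStream("\"a\"\n\"b\"\n".getBytes()), props);
    ProducerRecord<byte[], byte[]> record;
    while ((record = reader.readMessage()) != null) {
      System.out.println(record.value().length + " bytes serialized");
    }
  }
}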

View File

@@ -1,71 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.kafka.schemaregistry.avro;
import org.apache.avro.Schema;
import org.apache.avro.SchemaValidationException;
import org.apache.avro.SchemaValidator;
import org.apache.avro.SchemaValidatorBuilder;
import java.util.ArrayList;
import java.util.List;
public class AvroCompatibilityChecker {
// Check if the new schema can be used to read data produced by the latest schema
private static SchemaValidator BACKWARD_VALIDATOR =
new SchemaValidatorBuilder().canReadStrategy().validateLatest();
public static AvroCompatibilityChecker BACKWARD_CHECKER = new AvroCompatibilityChecker(
BACKWARD_VALIDATOR);
// Check if data produced by the new schema can be read by the latest schema
private static SchemaValidator FORWARD_VALIDATOR =
new SchemaValidatorBuilder().canBeReadStrategy().validateLatest();
public static AvroCompatibilityChecker FORWARD_CHECKER = new AvroCompatibilityChecker(
FORWARD_VALIDATOR);
// Check if the new schema is both forward and backward compatible with the latest schema
private static SchemaValidator FULL_VALIDATOR =
new SchemaValidatorBuilder().mutualReadStrategy().validateLatest();
public static AvroCompatibilityChecker FULL_CHECKER = new AvroCompatibilityChecker(
FULL_VALIDATOR);
private static SchemaValidator NO_OP_VALIDATOR = new SchemaValidator() {
@Override
public void validate(Schema schema, Iterable<Schema> schemas) throws SchemaValidationException {
// do nothing
}
};
public static AvroCompatibilityChecker NO_OP_CHECKER = new AvroCompatibilityChecker(
NO_OP_VALIDATOR);
private final SchemaValidator validator;
private AvroCompatibilityChecker(SchemaValidator validator) {
this.validator = validator;
}
/**
* Check the compatibility between the new schema and the latest schema
*/
public boolean isCompatible(Schema newSchema, Schema latestSchema) {
List<Schema> schemas = new ArrayList<Schema>();
schemas.add(latestSchema);
try {
validator.validate(newSchema, schemas);
} catch (SchemaValidationException e) {
return false;
}
return true;
}
}
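
A minimal sketch of the removed checker validating a backward-compatible field addition (both schemas are illustrative):

import org.apache.avro.Schema;
import wherehows.common.kafka.schemaregistry.avro.AvroCompatibilityChecker;

public class CompatibilityExample {
  public static void main(String[] args) {
    // Separate parsers: a single Schema.Parser refuses to redefine the same record name.
    Schema v1 = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"r\",\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}");
    Schema v2 = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"r\",\"fields\":[{\"name\":\"f1\",\"type\":\"string\"},"
        + "{\"name\":\"f2\",\"type\":\"string\",\"default\":\"\"}]}");
    // v2 can read data written with v1 because the new field has a default.
    System.out.println(AvroCompatibilityChecker.BACKWARD_CHECKER.isCompatible(v2, v1)); // true
  }
}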

View File

@@ -1,49 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.kafka.schemaregistry.avro;
public enum AvroCompatibilityLevel {
NONE("NONE", AvroCompatibilityChecker.NO_OP_CHECKER),
BACKWARD("BACKWARD", AvroCompatibilityChecker.BACKWARD_CHECKER),
FORWARD("FORWARD", AvroCompatibilityChecker.FORWARD_CHECKER),
FULL("FULL", AvroCompatibilityChecker.FULL_CHECKER);
public final String name;
public final AvroCompatibilityChecker compatibilityChecker;
private AvroCompatibilityLevel(String name, AvroCompatibilityChecker compatibilityChecker) {
this.name = name;
this.compatibilityChecker = compatibilityChecker;
}
public static AvroCompatibilityLevel forName(String name) {
if (name == null) {
return null;
}
name = name.toUpperCase();
if (NONE.name.equals(name)) {
return NONE;
} else if (BACKWARD.name.equals(name)) {
return BACKWARD;
} else if (FORWARD.name.equals(name)) {
return FORWARD;
} else if (FULL.name.equals(name)) {
return FULL;
} else {
return null;
}
}
}

View File

@@ -1,185 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.kafka.schemaregistry.client;
import wherehows.common.kafka.schemaregistry.client.rest.RestService;
import wherehows.common.kafka.schemaregistry.client.rest.entities.Config;
import wherehows.common.kafka.schemaregistry.client.rest.entities.SchemaString;
import wherehows.common.kafka.schemaregistry.client.rest.entities.requests.ConfigUpdateRequest;
import wherehows.common.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import org.apache.avro.Schema;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
public class CachedSchemaRegistryClient implements SchemaRegistryClient {
private final RestService restService;
private final int identityMapCapacity;
private final Map<String, Map<Schema, String>> schemaCache;
private final Map<String, Map<String, Schema>> idCache;
private final Map<String, Map<Schema, Integer>> versionCache;
private static final int defaultIdentityMapCapacity = 1000;
public CachedSchemaRegistryClient(String baseUrl) {
this(baseUrl, defaultIdentityMapCapacity);
}
public CachedSchemaRegistryClient(String baseUrl, int identityMapCapacity) {
this(new RestService(baseUrl), identityMapCapacity);
}
public CachedSchemaRegistryClient(List<String> baseUrls, int identityMapCapacity) {
this(new RestService(baseUrls), identityMapCapacity);
}
public CachedSchemaRegistryClient(RestService restService, int identityMapCapacity) {
this.identityMapCapacity = identityMapCapacity;
this.schemaCache = new HashMap<String, Map<Schema, String>>();
this.idCache = new HashMap<String, Map<String, Schema>>();
this.versionCache = new HashMap<String, Map<Schema, Integer>>();
this.restService = restService;
this.idCache.put(null, new HashMap<String, Schema>());
}
private String registerAndGetId(String subject, Schema schema)
throws IOException, RestClientException {
return restService.registerSchema(schema.toString(), subject);
}
private Schema getSchemaByIdFromRegistry(String id) throws IOException, RestClientException {
SchemaString restSchema = restService.getId(id);
return new Schema.Parser().parse(restSchema.getSchemaString());
}
private int getVersionFromRegistry(String subject, Schema schema)
throws IOException, RestClientException{
wherehows.common.kafka.schemaregistry.client.rest.entities.Schema response =
restService.lookUpSubjectVersion(schema.toString(), subject);
return response.getVersion();
}
@Override
public synchronized String register(String subject, Schema schema)
throws IOException, RestClientException {
Map<Schema, String> schemaIdMap;
if (schemaCache.containsKey(subject)) {
schemaIdMap = schemaCache.get(subject);
} else {
schemaIdMap = new IdentityHashMap<Schema, String>();
schemaCache.put(subject, schemaIdMap);
}
if (schemaIdMap.containsKey(schema)) {
return schemaIdMap.get(schema);
} else {
if (schemaIdMap.size() >= identityMapCapacity) {
throw new IllegalStateException("Too many schema objects created for " + subject + "!");
}
String id = registerAndGetId(subject, schema);
schemaIdMap.put(schema, id);
idCache.get(null).put(id, schema);
return id;
}
}
@Override
public synchronized Schema getByID(String id) throws IOException, RestClientException {
return getBySubjectAndID(null, id);
}
@Override
public synchronized Schema getBySubjectAndID(String subject, String id)
throws IOException, RestClientException {
Map<String, Schema> idSchemaMap;
if (idCache.containsKey(subject)) {
idSchemaMap = idCache.get(subject);
} else {
idSchemaMap = new HashMap<String, Schema>();
idCache.put(subject, idSchemaMap);
}
if (idSchemaMap.containsKey(id)) {
return idSchemaMap.get(id);
} else {
Schema schema = getSchemaByIdFromRegistry(id);
idSchemaMap.put(id, schema);
return schema;
}
}
@Override
public synchronized SchemaMetadata getLatestSchemaMetadata(String subject)
throws IOException, RestClientException {
wherehows.common.kafka.schemaregistry.client.rest.entities.Schema response
= restService.getLatestVersion(subject);
String id = response.getId();
int version = response.getVersion();
String schema = response.getSchema();
return new SchemaMetadata(id, version, schema);
}
@Override
public synchronized int getVersion(String subject, Schema schema)
throws IOException, RestClientException{
Map<Schema, Integer> schemaVersionMap;
if (versionCache.containsKey(subject)) {
schemaVersionMap = versionCache.get(subject);
} else {
schemaVersionMap = new IdentityHashMap<Schema, Integer>();
versionCache.put(subject, schemaVersionMap);
}
if (schemaVersionMap.containsKey(schema)) {
return schemaVersionMap.get(schema);
} else {
if (schemaVersionMap.size() >= identityMapCapacity) {
throw new IllegalStateException("Too many schema objects created for " + subject + "!");
}
int version = getVersionFromRegistry(subject, schema);
schemaVersionMap.put(schema, version);
return version;
}
}
@Override
public boolean testCompatibility(String subject, Schema schema) throws IOException, RestClientException {
return restService.testCompatibility(schema.toString(), subject, "latest");
}
@Override
public String updateCompatibility(String subject, String compatibility) throws IOException, RestClientException {
ConfigUpdateRequest response = restService.updateCompatibility(compatibility, subject);
return response.getCompatibilityLevel();
}
@Override
public String getCompatibility(String subject) throws IOException, RestClientException {
Config response = restService.getConfig(subject);
return response.getCompatibilityLevel();
}
@Override
public Collection<String> getAllSubjects() throws IOException, RestClientException {
return restService.getAllSubjects();
}
}
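
A minimal sketch of the removed caching client; note register() returns a String id in this fork (not Confluent's int), and the registry URL is an assumption:

import org.apache.avro.Schema;
import wherehows.common.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient;

public class RegistryClientExample {
  public static void main(String[] args) throws Exception {
    SchemaRegistryClient client = new CachedSchemaRegistryClient("http://localhost:8081");
    Schema schema = new Schema.Parser().parse("{\"type\":\"string\"}");
    String id = client.register("t1-value", schema); // one REST call, then served from cache
    Schema fetched = client.getByID(id);             // hits the id cache populated by register
    System.out.println(id + " -> " + fetched);
  }
}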

View File

@@ -1,240 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.kafka.schemaregistry.client;
import org.apache.avro.Schema;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import wherehows.common.kafka.schemaregistry.avro.AvroCompatibilityLevel;
import wherehows.common.kafka.schemaregistry.client.rest.exceptions.RestClientException;
/**
* Mock implementation of SchemaRegistryClient that can be used for tests. This version is NOT
* thread safe. Schema data is stored in memory and is not persistent or shared across instances.
*/
public class MockSchemaRegistryClient implements SchemaRegistryClient {
private String defaultCompatibility = "BACKWARD";
private final Map<String, Map<Schema, String>> schemaCache;
private final Map<String, Map<String, Schema>> idCache;
private final Map<String, Map<Schema, Integer>> versionCache;
private final Map<String, String> compatibilityCache;
private final AtomicInteger ids;
public MockSchemaRegistryClient() {
schemaCache = new HashMap<String, Map<Schema, String>>();
idCache = new HashMap<String, Map<String, Schema>>();
versionCache = new HashMap<String, Map<Schema, Integer>>();
compatibilityCache = new HashMap<String, String>();
ids = new AtomicInteger(0);
idCache.put(null, new HashMap<String, Schema>());
}
private String getIdFromRegistry(String subject, Schema schema) throws IOException {
Map<String, Schema> idSchemaMap;
if (idCache.containsKey(subject)) {
idSchemaMap = idCache.get(subject);
for (Map.Entry<String, Schema> entry: idSchemaMap.entrySet()) {
if (entry.getValue().toString().equals(schema.toString())) {
generateVersion(subject, schema);
return entry.getKey();
}
}
} else {
idSchemaMap = new HashMap<String, Schema>();
}
String id = Integer.toString(ids.incrementAndGet());
idSchemaMap.put(id, schema);
idCache.put(subject, idSchemaMap);
generateVersion(subject, schema);
return id;
}
private void generateVersion(String subject, Schema schema) {
ArrayList<Integer> versions = getAllVersions(subject);
Map<Schema, Integer> schemaVersionMap;
int currentVersion;
if (versions.isEmpty()) {
schemaVersionMap = new IdentityHashMap<Schema, Integer>();
currentVersion = 1;
} else {
schemaVersionMap = versionCache.get(subject);
currentVersion = versions.get(versions.size() - 1) + 1;
}
schemaVersionMap.put(schema, currentVersion);
versionCache.put(subject, schemaVersionMap);
}
private ArrayList<Integer> getAllVersions(String subject) {
ArrayList<Integer> versions = new ArrayList<Integer>();
if (versionCache.containsKey(subject)) {
versions.addAll(versionCache.get(subject).values());
Collections.sort(versions);
}
return versions;
}
private Schema getSchemaBySubjectAndIdFromRegistry(String subject, String id) throws IOException {
if (idCache.containsKey(subject)) {
Map<String, Schema> idSchemaMap = idCache.get(subject);
if (idSchemaMap.containsKey(id)) {
return idSchemaMap.get(id);
}
}
throw new IOException("Cannot get schema from schema registry!");
}
@Override
public synchronized String register(String subject, Schema schema)
throws IOException, RestClientException {
Map<Schema, String> schemaIdMap;
if (schemaCache.containsKey(subject)) {
schemaIdMap = schemaCache.get(subject);
} else {
schemaIdMap = new IdentityHashMap<Schema, String>();
schemaCache.put(subject, schemaIdMap);
}
if (schemaIdMap.containsKey(schema)) {
return schemaIdMap.get(schema);
} else {
String id = getIdFromRegistry(subject, schema);
schemaIdMap.put(schema, id);
idCache.get(null).put(id, schema);
return id;
}
}
@Override
public synchronized Schema getByID(String id) throws IOException, RestClientException {
return getBySubjectAndID(null, id);
}
@Override
public synchronized Schema getBySubjectAndID(String subject, String id)
throws IOException, RestClientException {
Map<String, Schema> idSchemaMap;
if (idCache.containsKey(subject)) {
idSchemaMap = idCache.get(subject);
} else {
idSchemaMap = new HashMap<String, Schema>();
idCache.put(subject, idSchemaMap);
}
if (idSchemaMap.containsKey(id)) {
return idSchemaMap.get(id);
} else {
Schema schema = getSchemaBySubjectAndIdFromRegistry(subject, id);
idSchemaMap.put(id, schema);
return schema;
}
}
private int getLatestVersion(String subject)
throws IOException, RestClientException {
ArrayList<Integer> versions = getAllVersions(subject);
if (versions.isEmpty()) {
throw new IOException("No schema registered under subject!");
} else {
return versions.get(versions.size() - 1);
}
}
@Override
public synchronized SchemaMetadata getLatestSchemaMetadata(String subject)
throws IOException, RestClientException {
int version = getLatestVersion(subject);
String schemaString = null;
Map<Schema, Integer> schemaVersionMap = versionCache.get(subject);
for (Map.Entry<Schema, Integer> entry: schemaVersionMap.entrySet()) {
if (entry.getValue() == version) {
schemaString = entry.getKey().toString();
}
}
String id = "-1";
Map<String, Schema> idSchemaMap = idCache.get(subject);
for (Map.Entry<String, Schema> entry: idSchemaMap.entrySet()) {
if (entry.getValue().toString().equals(schemaString)) {
id = entry.getKey();
}
}
return new SchemaMetadata(id, version, schemaString);
}
@Override
public synchronized int getVersion(String subject, Schema schema)
throws IOException, RestClientException{
if (versionCache.containsKey(subject)) {
return versionCache.get(subject).get(schema);
} else {
throw new IOException("Cannot get version from schema registry!");
}
}
@Override
public boolean testCompatibility(String subject, Schema newSchema) throws IOException,
RestClientException {
SchemaMetadata latestSchemaMetadata = getLatestSchemaMetadata(subject);
Schema latestSchema = getSchemaBySubjectAndIdFromRegistry(subject, latestSchemaMetadata.getId());
String compatibility = compatibilityCache.get(subject);
if (compatibility == null) {
compatibility = defaultCompatibility;
}
AvroCompatibilityLevel compatibilityLevel = AvroCompatibilityLevel.forName(compatibility);
if (compatibilityLevel == null) {
return false;
}
return compatibilityLevel.compatibilityChecker.isCompatible(newSchema, latestSchema);
}
@Override
public String updateCompatibility(String subject, String compatibility) throws IOException,
RestClientException {
if (subject == null) {
defaultCompatibility = compatibility;
return compatibility;
}
compatibilityCache.put(subject, compatibility);
return compatibility;
}
@Override
public String getCompatibility(String subject) throws IOException, RestClientException {
String compatibility = compatibilityCache.get(subject);
if (compatibility == null) {
compatibility = defaultCompatibility;
}
return compatibility;
}
@Override
public Collection<String> getAllSubjects() throws IOException, RestClientException {
List<String> results = new ArrayList<>();
results.addAll(this.schemaCache.keySet());
Collections.sort(results, String.CASE_INSENSITIVE_ORDER);
return results;
}
}
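
In tests, the removed mock could stand in for a live registry; a minimal sketch:

import org.apache.avro.Schema;
import wherehows.common.kafka.schemaregistry.client.MockSchemaRegistryClient;

public class MockRegistryExample {
  public static void main(String[] args) throws Exception {
    MockSchemaRegistryClient client = new MockSchemaRegistryClient();
    Schema schema = new Schema.Parser().parse("{\"type\":\"string\"}");
    String id = client.register("t1-value", schema);
    System.out.println(client.getVersion("t1-value", schema));              // 1
    System.out.println(client.getLatestSchemaMetadata("t1-value").getId()); // same id
  }
}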

View File

@@ -1,40 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.kafka.schemaregistry.client;
public class SchemaMetadata {
private String id;
private int version;
private String schema;
public SchemaMetadata(String id, int version, String schema) {
this.id = id;
this.version = version;
this.schema = schema;
}
public String getId() {
return id;
}
public int getVersion() {
return version;
}
public String getSchema() {
return schema;
}
}

View File

@@ -1,43 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.kafka.schemaregistry.client;
import org.apache.avro.Schema;
import java.io.IOException;
import java.util.Collection;
import wherehows.common.kafka.schemaregistry.client.rest.exceptions.RestClientException;
public interface SchemaRegistryClient {
public String register(String subject, Schema schema) throws IOException, RestClientException;
public Schema getByID(String id) throws IOException, RestClientException;
public Schema getBySubjectAndID(String subject, String id) throws IOException, RestClientException;
public SchemaMetadata getLatestSchemaMetadata(String subject) throws IOException, RestClientException;
public int getVersion(String subject, Schema schema) throws IOException, RestClientException;
public boolean testCompatibility(String subject, Schema schema) throws IOException, RestClientException;
public String updateCompatibility(String subject, String compatibility) throws IOException, RestClientException;
public String getCompatibility(String subject) throws IOException, RestClientException;
public Collection<String> getAllSubjects() throws IOException, RestClientException;
}

View File

@@ -1,419 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.kafka.schemaregistry.client.rest;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.IOUtils;
import wherehows.common.kafka.schemaregistry.client.rest.entities.Config;
import wherehows.common.kafka.schemaregistry.client.rest.entities.ErrorMessage;
import wherehows.common.kafka.schemaregistry.client.rest.entities.Schema;
import wherehows.common.kafka.schemaregistry.client.rest.entities.SchemaString;
import wherehows.common.kafka.schemaregistry.client.rest.entities.requests.CompatibilityCheckResponse;
import wherehows.common.kafka.schemaregistry.client.rest.entities.requests.ConfigUpdateRequest;
import wherehows.common.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest;
import wherehows.common.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse;
import wherehows.common.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import wherehows.common.kafka.schemaregistry.client.rest.utils.UrlList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Rest access layer for sending requests to the schema registry.
*/
public class RestService {
private static final Logger log = LoggerFactory.getLogger(RestService.class);
private final static String PLAIN_STRING = "PLAIN_STRING";
private final static String TYPED_RESPONSE = "TYPED_RESPONSE";
private final static TypeReference<String> PLAIN_STRING_RESPONSE_TYPE = null;
private final static TypeReference<RegisterSchemaResponse> REGISTER_RESPONSE_TYPE =
new TypeReference<RegisterSchemaResponse>() {
};
private final static TypeReference<Config> GET_CONFIG_RESPONSE_TYPE =
new TypeReference<Config>() {
};
private final static TypeReference<SchemaString> GET_SCHEMA_BY_ID_RESPONSE_TYPE =
new TypeReference<SchemaString>() {
};
private final static TypeReference<Schema> GET_SCHEMA_BY_VERSION_RESPONSE_TYPE =
new TypeReference<Schema>() {
};
private final static TypeReference<List<Integer>> ALL_VERSIONS_RESPONSE_TYPE =
new TypeReference<List<Integer>>() {
};
private final static TypeReference<List<String>> ALL_TOPICS_RESPONSE_TYPE =
new TypeReference<List<String>>() {
};
private final static TypeReference<CompatibilityCheckResponse>
COMPATIBILITY_CHECK_RESPONSE_TYPE_REFERENCE =
new TypeReference<CompatibilityCheckResponse>() {
};
private final static TypeReference<Schema>
SUBJECT_SCHEMA_VERSION_RESPONSE_TYPE_REFERENCE =
new TypeReference<Schema>() {
};
private final static TypeReference<ConfigUpdateRequest>
UPDATE_CONFIG_RESPONSE_TYPE_REFERENCE =
new TypeReference<ConfigUpdateRequest>() {
};
private String SUBJECT_PREFIX_FORMAT = "/subjects/%s";
private String SUBJECT_ALL_PREFIX_FORMAT = "/subjects";
private String REGISTER_SCHEMA_PREFIX_FORMAT = "/subjects/%s/versions";
private String COMPATIBILITY_TEST_PREFIX_FORMAT = "/compatibility/subjects/%s/versions/%s";
private String CONFIG_PREFIX_FORMAT = "/config/%s";
private String CONFIG_ALL_PREFIX_FORMAT = "/config";
private String SCHEMA_ID_PREFIX_FORMAT = "/schemas/id=%s"; // "/schemas/ids/%s"
private String VERSION_PREFIX_FORMAT = "/subjects/%s/versions/%d";
private String VERSION_LATEST_PREFIX_FORMAT = "/subjects/%s/versions/latest";
private String VERSION_ALL_PREFIX_FORMAT = "/subjects/%s/versions";
private static final int JSON_PARSE_ERROR_CODE = 50005;
private static ObjectMapper jsonDeserializer = new ObjectMapper();
public static final Map<String, String> DEFAULT_REQUEST_PROPERTIES;
static {
DEFAULT_REQUEST_PROPERTIES = new HashMap<String, String>();
DEFAULT_REQUEST_PROPERTIES.put("Content-Type", Versions.SCHEMA_REGISTRY_V1_JSON_WEIGHTED);
DEFAULT_REQUEST_PROPERTIES.put("Response-Type", PLAIN_STRING);
}
private UrlList baseUrls;
public RestService(UrlList baseUrls) {
this.baseUrls = baseUrls;
}
public RestService(List<String> baseUrls) {
this(new UrlList(baseUrls));
}
public RestService(String baseUrlConfig) {
this(parseBaseUrl(baseUrlConfig));
}
/**
* @param baseUrl HTTP connection will be established with this url.
* @param method HTTP method ("GET", "POST", "PUT", etc.)
* @param requestBodyData Bytes to be sent in the request body.
* @param requestProperties HTTP header properties.
* @param responseFormat Expected format of the response to the HTTP request.
* @param <T> The type of the deserialized response to the HTTP request.
* @return The deserialized response to the HTTP request, or null if no data is expected.
*/
private <T> T sendHttpRequest(String baseUrl, String method, byte[] requestBodyData,
Map<String, String> requestProperties,
TypeReference<T> responseFormat)
throws IOException, RestClientException {
log.debug(String.format("Sending %s with input %s to %s",
method, requestBodyData == null ? "null" : new String(requestBodyData),
baseUrl));
HttpURLConnection connection = null;
try {
URL url = new URL(baseUrl);
connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod(method);
// connection.getResponseCode() implicitly calls getInputStream, so always set to true.
// On the other hand, leaving this out breaks nothing.
connection.setDoInput(true);
for (Map.Entry<String, String> entry : requestProperties.entrySet()) {
connection.setRequestProperty(entry.getKey(), entry.getValue());
}
connection.setUseCaches(false);
if (requestBodyData != null) {
connection.setDoOutput(true);
OutputStream os = null;
try {
os = connection.getOutputStream();
os.write(requestBodyData);
os.flush();
} catch (IOException e) {
log.error("Failed to send HTTP request to endpoint: " + url, e);
throw e;
} finally {
if (os != null) os.close();
}
}
int responseCode = connection.getResponseCode();
if (responseCode == HttpURLConnection.HTTP_OK) {
InputStream is = connection.getInputStream();
T result = responseFormat != null ? jsonDeserializer.readValue(is, responseFormat)
: (T) IOUtils.toString(is);
is.close();
return result;
} else if (responseCode == HttpURLConnection.HTTP_NO_CONTENT) {
return null;
} else {
InputStream es = connection.getErrorStream();
ErrorMessage errorMessage;
try {
errorMessage = jsonDeserializer.readValue(es, ErrorMessage.class);
} catch (JsonProcessingException e) {
errorMessage = new ErrorMessage(JSON_PARSE_ERROR_CODE, e.getMessage());
}
es.close();
throw new RestClientException(errorMessage.getMessage(), responseCode,
errorMessage.getErrorCode());
}
} finally {
if (connection != null) {
connection.disconnect();
}
}
}
private <T> T httpRequest(String path, String method,
byte[] requestBodyData, Map<String, String> requestProperties,
TypeReference<T> responseFormat) throws IOException, RestClientException {
for (int i = 0, n = baseUrls.size(); i < n; i++) {
String baseUrl = baseUrls.current();
try {
return sendHttpRequest(baseUrl + path, method, requestBodyData, requestProperties, responseFormat);
} catch (IOException e) {
baseUrls.fail(baseUrl);
if (i == n - 1) throw e; // Raise the exception since we have no more urls to try
}
}
throw new IOException("Internal HTTP retry error"); // Can't get here
}
public Schema lookUpSubjectVersion(String schemaString, String subject)
throws IOException, RestClientException {
RegisterSchemaRequest request = new RegisterSchemaRequest();
request.setSchema(schemaString);
return lookUpSubjectVersion(request, subject);
}
public Schema lookUpSubjectVersion(RegisterSchemaRequest registerSchemaRequest,
String subject)
throws IOException, RestClientException {
return lookUpSubjectVersion(DEFAULT_REQUEST_PROPERTIES, registerSchemaRequest, subject);
}
public Schema lookUpSubjectVersion(Map<String, String> requestProperties,
RegisterSchemaRequest registerSchemaRequest,
String subject)
throws IOException, RestClientException {
String path = String.format(SUBJECT_PREFIX_FORMAT, subject);
Schema schema = httpRequest(path, "POST", registerSchemaRequest.toJson().getBytes(),
requestProperties, SUBJECT_SCHEMA_VERSION_RESPONSE_TYPE_REFERENCE);
return schema;
}
public String registerSchema(String schemaString, String subject)
throws IOException, RestClientException {
RegisterSchemaRequest request = new RegisterSchemaRequest();
request.setSchema(schemaString);
return registerSchema(request, subject);
}
public String registerSchema(RegisterSchemaRequest registerSchemaRequest, String subject)
throws IOException, RestClientException {
return registerSchema(DEFAULT_REQUEST_PROPERTIES, registerSchemaRequest, subject);
}
public String registerSchema(Map<String, String> requestProperties,
RegisterSchemaRequest registerSchemaRequest, String subject)
throws IOException, RestClientException {
String path = String.format(REGISTER_SCHEMA_PREFIX_FORMAT, subject);
RegisterSchemaResponse response = httpRequest(path, "POST",
registerSchemaRequest.toJson().getBytes(), requestProperties, REGISTER_RESPONSE_TYPE);
return response.getId();
}
public boolean testCompatibility(String schemaString, String subject, String version)
throws IOException, RestClientException {
RegisterSchemaRequest request = new RegisterSchemaRequest();
request.setSchema(schemaString);
return testCompatibility(request, subject, version);
}
public boolean testCompatibility(RegisterSchemaRequest registerSchemaRequest,
String subject,
String version)
throws IOException, RestClientException {
return testCompatibility(DEFAULT_REQUEST_PROPERTIES, registerSchemaRequest,
subject, version);
}
public boolean testCompatibility(Map<String, String> requestProperties,
RegisterSchemaRequest registerSchemaRequest,
String subject,
String version)
throws IOException, RestClientException {
String path = String.format(COMPATIBILITY_TEST_PREFIX_FORMAT, subject, version);
CompatibilityCheckResponse response =
httpRequest(path, "POST", registerSchemaRequest.toJson().getBytes(),
requestProperties, COMPATIBILITY_CHECK_RESPONSE_TYPE_REFERENCE);
return response.getIsCompatible();
}
public ConfigUpdateRequest updateCompatibility(String compatibility, String subject)
throws IOException, RestClientException {
ConfigUpdateRequest request = new ConfigUpdateRequest();
request.setCompatibilityLevel(compatibility);
return updateConfig(request, subject);
}
public ConfigUpdateRequest updateConfig(ConfigUpdateRequest configUpdateRequest,
String subject)
throws IOException, RestClientException {
return updateConfig(DEFAULT_REQUEST_PROPERTIES, configUpdateRequest, subject);
}
/**
* On success, this API simply echoes the request in the response.
*/
public ConfigUpdateRequest updateConfig(Map<String, String> requestProperties,
ConfigUpdateRequest configUpdateRequest,
String subject)
throws IOException, RestClientException {
String path = subject != null ? String.format(CONFIG_PREFIX_FORMAT, subject) : CONFIG_ALL_PREFIX_FORMAT;
ConfigUpdateRequest response =
httpRequest(path, "PUT", configUpdateRequest.toJson().getBytes(),
requestProperties, UPDATE_CONFIG_RESPONSE_TYPE_REFERENCE);
return response;
}
public Config getConfig(String subject)
throws IOException, RestClientException {
return getConfig(DEFAULT_REQUEST_PROPERTIES, subject);
}
public Config getConfig(Map<String, String> requestProperties,
String subject)
throws IOException, RestClientException {
String path = subject != null ? String.format(CONFIG_PREFIX_FORMAT, subject) : CONFIG_ALL_PREFIX_FORMAT;
Config config =
httpRequest(path, "GET", null, requestProperties, GET_CONFIG_RESPONSE_TYPE);
return config;
}
public SchemaString getId(String id) throws IOException, RestClientException {
return getId(DEFAULT_REQUEST_PROPERTIES, id);
}
public SchemaString getId(Map<String, String> requestProperties,
String id) throws IOException, RestClientException {
String path = String.format(SCHEMA_ID_PREFIX_FORMAT, id);
SchemaString response;
if (requestProperties.containsKey("Response-Type")
&& requestProperties.get("Response-Type").equals(PLAIN_STRING)) {
response = new SchemaString(httpRequest(path, "GET", null, requestProperties, PLAIN_STRING_RESPONSE_TYPE));
} else {
response = httpRequest(path, "GET", null, requestProperties, GET_SCHEMA_BY_ID_RESPONSE_TYPE);
}
return response;
}
public Schema getVersion(String subject, int version) throws IOException, RestClientException {
return getVersion(DEFAULT_REQUEST_PROPERTIES, subject, version);
}
public Schema getVersion(Map<String, String> requestProperties,
String subject, int version)
throws IOException, RestClientException {
String path = String.format(VERSION_PREFIX_FORMAT, subject, version);
Schema response = httpRequest(path, "GET", null, requestProperties,
GET_SCHEMA_BY_VERSION_RESPONSE_TYPE);
return response;
}
public Schema getLatestVersion(String subject)
throws IOException, RestClientException {
return getLatestVersion(DEFAULT_REQUEST_PROPERTIES, subject);
}
public Schema getLatestVersion(Map<String, String> requestProperties,
String subject)
throws IOException, RestClientException {
String path = String.format(VERSION_LATEST_PREFIX_FORMAT, subject);
Schema response = httpRequest(path, "GET", null, requestProperties,
GET_SCHEMA_BY_VERSION_RESPONSE_TYPE);
return response;
}
public List<Integer> getAllVersions(String subject)
throws IOException, RestClientException {
return getAllVersions(DEFAULT_REQUEST_PROPERTIES, subject);
}
public List<Integer> getAllVersions(Map<String, String> requestProperties,
String subject)
throws IOException, RestClientException {
String path = String.format(VERSION_ALL_PREFIX_FORMAT, subject);
List<Integer> response = httpRequest(path, "GET", null, requestProperties,
ALL_VERSIONS_RESPONSE_TYPE);
return response;
}
public List<String> getAllSubjects()
throws IOException, RestClientException {
return getAllSubjects(DEFAULT_REQUEST_PROPERTIES);
}
public List<String> getAllSubjects(Map<String, String> requestProperties)
throws IOException, RestClientException {
List<String> response = httpRequest(SUBJECT_ALL_PREFIX_FORMAT, "GET", null, requestProperties,
ALL_TOPICS_RESPONSE_TYPE);
return response;
}
private static List<String> parseBaseUrl(String baseUrl) {
List<String> baseUrls = Arrays.asList(baseUrl.split("\\s*,\\s*"));
if (baseUrls.isEmpty()) {
throw new IllegalArgumentException("Missing required schema registry url list");
}
return baseUrls;
}
public UrlList getBaseUrls() {
return baseUrls;
}
}
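
Finally, a minimal sketch of the removed raw REST layer (the registry URL is again an assumption):

import wherehows.common.kafka.schemaregistry.client.rest.RestService;

public class RestServiceExample {
  public static void main(String[] args) throws Exception {
    RestService rest = new RestService("http://localhost:8081");
    // POST /subjects/t1-value/versions; the response body carries the assigned id.
    String id = rest.registerSchema("{\"type\":\"string\"}", "t1-value");
    System.out.println("registered id: " + id);
    System.out.println("subjects: " + rest.getAllSubjects()); // GET /subjects
  }
}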

View File

@@ -1,44 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.kafka.schemaregistry.client.rest;
import java.util.Arrays;
import java.util.List;
public class Versions {
public static final String SCHEMA_REGISTRY_V1_JSON = "application/vnd.schemaregistry.v1+json";
// Default weight = 1
public static final String SCHEMA_REGISTRY_V1_JSON_WEIGHTED = SCHEMA_REGISTRY_V1_JSON;
// These are defaults that track the most recent API version. These should always be specified
// anywhere the latest version is produced/consumed.
public static final String SCHEMA_REGISTRY_MOST_SPECIFIC_DEFAULT = SCHEMA_REGISTRY_V1_JSON;
public static final String SCHEMA_REGISTRY_DEFAULT_JSON = "application/vnd.schemaregistry+json";
public static final String SCHEMA_REGISTRY_DEFAULT_JSON_WEIGHTED =
SCHEMA_REGISTRY_DEFAULT_JSON +
"; qs=0.9";
public static final String JSON = "application/json";
public static final String JSON_WEIGHTED = JSON + "; qs=0.5";
public static final List<String> PREFERRED_RESPONSE_TYPES = Arrays
.asList(Versions.SCHEMA_REGISTRY_V1_JSON, Versions.SCHEMA_REGISTRY_DEFAULT_JSON,
Versions.JSON);
// This type is completely generic and carries no actual information about the type of data, but
// it is the default for request entities if no content type is specified. Well-behaved users
// of the API will always specify the content type, but ad hoc use may omit it. We treat this as
// JSON since that's all we currently support.
public static final String GENERIC_REQUEST = "application/octet-stream";
}

View File

@@ -1,70 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.kafka.schemaregistry.client.rest.entities;
import org.codehaus.jackson.annotate.JsonProperty;
public class Config {
private String compatibilityLevel;
public Config(@JsonProperty("compatibility") String compatibilityLevel) {
this.compatibilityLevel = compatibilityLevel;
}
public Config() {
compatibilityLevel = null;
}
@JsonProperty("compatibility")
public String getCompatibilityLevel() {
return compatibilityLevel;
}
@JsonProperty("compatibility")
public void setCompatibilityLevel(String compatibilityLevel) {
this.compatibilityLevel = compatibilityLevel;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Config that = (Config) o;
if (!this.compatibilityLevel.equals(that.compatibilityLevel)) {
return false;
}
return true;
}
@Override
public int hashCode() {
int result = 31 * compatibilityLevel.hashCode();
return result;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("{compatibilityLevel=" + this.compatibilityLevel + "}");
return sb.toString();
}
}

View File

@@ -1,52 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.kafka.schemaregistry.client.rest.entities;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
* Generic JSON error message.
*/
public class ErrorMessage {
private int errorCode;
private String message;
public ErrorMessage(@JsonProperty("error_code") int errorCode,
@JsonProperty("message") String message) {
this.errorCode = errorCode;
this.message = message;
}
@JsonProperty("error_code")
public int getErrorCode() {
return errorCode;
}
@JsonProperty("error_code")
public void setErrorCode(int error_code) {
this.errorCode = error_code;
}
@JsonProperty
public String getMessage() {
return message;
}
@JsonProperty
public void setMessage(String message) {
this.message = message;
}
}

View File

@@ -1,130 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.kafka.schemaregistry.client.rest.entities;
import com.fasterxml.jackson.annotation.JsonProperty;
public class Schema implements Comparable<Schema> {
private String subject;
private Integer version;
private String id;
private String schema;
public Schema(@JsonProperty("subject") String subject,
@JsonProperty("version") Integer version,
@JsonProperty("id") String id,
@JsonProperty("schema") String schema) {
this.subject = subject;
this.version = version;
this.id = id;
this.schema = schema;
}
@JsonProperty("subject")
public String getSubject() {
return subject;
}
@JsonProperty("subject")
public void setSubject(String subject) {
this.subject = subject;
}
@JsonProperty("version")
public Integer getVersion() {
return this.version;
}
@JsonProperty("version")
public void setVersion(Integer version) {
this.version = version;
}
@JsonProperty("id")
public String getId() {
return this.id;
}
@JsonProperty("id")
public void setId(String id) {
this.id = id;
}
@JsonProperty("schema")
public String getSchema() {
return this.schema;
}
@JsonProperty("schema")
public void setSchema(String schema) {
this.schema = schema;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Schema that = (Schema) o;
if (!this.subject.equals(that.subject)) {
return false;
}
if (!this.version.equals(that.version)) {
return false;
}
    if (!this.id.equals(that.id)) {
return false;
}
if (!this.schema.equals(that.schema)) {
return false;
}
return true;
}
@Override
public int hashCode() {
int result = subject.hashCode();
result = 31 * result + version;
result = 31 * result + id.hashCode();
result = 31 * result + schema.hashCode();
return result;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("{subject=" + this.subject + ",");
sb.append("version=" + this.version + ",");
sb.append("id=" + this.id + ",");
sb.append("schema=" + this.schema + "}");
return sb.toString();
}
@Override
public int compareTo(Schema that) {
int result = this.subject.compareTo(that.subject);
if (result != 0) {
return result;
}
result = this.version - that.version;
return result;
}
}
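Since the natural ordering above sorts by subject first and then by version, sorting a list groups a subject's versions in order; a small sketch with made-up values, assuming the entity class above:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SchemaOrderingExample {
  public static void main(String[] args) {
    // Subjects, ids, and schema strings below are made up.
    List<Schema> schemas = new ArrayList<>();
    schemas.add(new Schema("topic-value", 2, "id-2", "\"string\""));
    schemas.add(new Schema("topic-value", 1, "id-1", "\"string\""));
    Collections.sort(schemas);  // natural order: subject first, then version
    System.out.println(schemas.get(0).getVersion());  // prints 1
  }
}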

View File

@ -1,50 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.kafka.schemaregistry.client.rest.entities;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
public class SchemaString {
private String schemaString;
public SchemaString() {
}
public SchemaString(String schemaString) {
this.schemaString = schemaString;
}
public static SchemaString fromJson(String json) throws IOException {
return new ObjectMapper().readValue(json, SchemaString.class);
}
@JsonProperty("schema")
public String getSchemaString() {
return schemaString;
}
@JsonProperty("schema")
public void setSchemaString(String schemaString) {
this.schemaString = schemaString;
}
public String toJson() throws IOException {
return new ObjectMapper().writeValueAsString(this);
}
}
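A minimal round trip through the toJson/fromJson helpers above (the Avro schema string is illustrative):

import java.io.IOException;

public class SchemaStringExample {
  public static void main(String[] args) throws IOException {
    SchemaString original = new SchemaString("{\"type\":\"string\"}");
    String json = original.toJson();  // {"schema":"{\"type\":\"string\"}"}
    SchemaString parsed = SchemaString.fromJson(json);
    System.out.println(parsed.getSchemaString());
  }
}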

View File

@ -1,43 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.kafka.schemaregistry.client.rest.entities.requests;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
public class CompatibilityCheckResponse {
private boolean isCompatible;
public static CompatibilityCheckResponse fromJson(String json) throws IOException {
return new ObjectMapper().readValue(json, CompatibilityCheckResponse.class);
}
@JsonProperty("is_compatible")
public boolean getIsCompatible() {
return isCompatible;
}
@JsonProperty("is_compatible")
public void setIsCompatible(boolean isCompatible) {
this.isCompatible = isCompatible;
}
public String toJson() throws IOException {
return new ObjectMapper().writeValueAsString(this);
}
}

View File

@ -1,43 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.kafka.schemaregistry.client.rest.entities.requests;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
public class ConfigUpdateRequest {
private String compatibilityLevel;
public static ConfigUpdateRequest fromJson(String json) throws IOException {
return new ObjectMapper().readValue(json, ConfigUpdateRequest.class);
}
@JsonProperty("compatibility")
public String getCompatibilityLevel() {
return this.compatibilityLevel;
}
@JsonProperty("compatibility")
public void setCompatibilityLevel(String compatibilityLevel) {
this.compatibilityLevel = compatibilityLevel;
}
public String toJson() throws IOException {
return new ObjectMapper().writeValueAsString(this);
}
}

View File

@ -1,78 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.kafka.schemaregistry.client.rest.entities.requests;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
public class RegisterSchemaRequest {
private String schema;
public static RegisterSchemaRequest fromJson(String json) throws IOException {
return new ObjectMapper().readValue(json, RegisterSchemaRequest.class);
}
@JsonProperty("schema")
public String getSchema() {
return this.schema;
}
@JsonProperty("schema")
public void setSchema(String schema) {
this.schema = schema;
}
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    // Note: this class extends Object, so delegating to super.equals()/super.hashCode()
    // would reduce both methods to identity checks; compare the schema field directly.
    RegisterSchemaRequest that = (RegisterSchemaRequest) o;
    if (schema != null ? !schema.equals(that.schema) : that.schema != null) {
      return false;
    }
    return true;
  }
  @Override
  public int hashCode() {
    return schema != null ? schema.hashCode() : 0;
  }
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("{schema=" + this.schema + "}");
return sb.toString();
}
public String toJson() throws IOException {
return new ObjectMapper().writeValueAsString(this);
}
}
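The request serializes to the same one-field shape; a short sketch with an illustrative schema string:

import java.io.IOException;

public class RegisterRequestExample {
  public static void main(String[] args) throws IOException {
    RegisterSchemaRequest request = new RegisterSchemaRequest();
    request.setSchema("{\"type\":\"string\"}");  // illustrative Avro schema
    System.out.println(request.toJson());        // {"schema":"{\"type\":\"string\"}"}
  }
}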

View File

@ -1,42 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.kafka.schemaregistry.client.rest.entities.requests;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
public class RegisterSchemaResponse {
private String id;
public static RegisterSchemaResponse fromJson(String json) throws IOException {
return new ObjectMapper().readValue(json, RegisterSchemaResponse.class);
}
@JsonProperty("id")
public String getId() {
return id;
}
@JsonProperty("id")
public void setId(String id) {
this.id = id;
}
public String toJson() throws IOException {
return new ObjectMapper().writeValueAsString(this);
}
}

View File

@ -1,34 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.kafka.schemaregistry.client.rest.exceptions;
public class RestClientException extends Exception {
private final int status;
private final int errorCode;
public RestClientException(final String message, final int status, final int errorCode) {
super(message + "; error code: " + errorCode);
this.status = status;
this.errorCode = errorCode;
}
public int getStatus() {
return status;
}
public int getErrorCode() {
return errorCode;
}
}

View File

@ -1,81 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.kafka.schemaregistry.client.rest.utils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * This class manages a set of urls for accessing an upstream registry. It maintains
 * a pointer to a known-good url, which can be accessed through {@link #current()}.
 * When a request against the current url fails, the {@link #fail(String)} method is
 * invoked and the pointer moves on to the next url, wrapping around to the start
 * when necessary.
 */
public class UrlList {
private final AtomicInteger index;
private final List<String> urls;
public UrlList(List<String> urls) {
if (urls == null || urls.isEmpty()) {
throw new IllegalArgumentException("Expected at least one URL to be passed in constructor");
}
this.urls = new ArrayList<String>(urls);
this.index = new AtomicInteger(new Random().nextInt(urls.size()));
}
public UrlList(String url) {
this(Arrays.asList(url));
}
/**
* Get the current url
* @return the url
*/
public String current() {
return urls.get(index.get());
}
/**
* Declare the given url as failed. This will cause the urls to
* rotate, so that the next request will be done against a new url
* (if one exists).
* @param url the url that has failed
*/
public void fail(String url) {
int currentIndex = index.get();
if (urls.get(currentIndex).equals(url)) {
index.compareAndSet(currentIndex, (currentIndex+1)%urls.size());
}
}
/**
* The number of unique urls contained in this collection.
* @return the count of urls
*/
public int size() {
return urls.size();
}
@Override
public String toString() {
return urls.toString();
}
}
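A short usage sketch of the failover behavior, with hypothetical endpoint urls:

import java.util.Arrays;

public class UrlListExample {
  public static void main(String[] args) {
    // Hypothetical registry endpoints.
    UrlList urls = new UrlList(Arrays.asList("http://sr-1:8081", "http://sr-2:8081"));
    String first = urls.current();
    urls.fail(first);                     // rotate away from the failed url
    System.out.println(urls.current());   // now the other endpoint
  }
}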

View File

@ -1,248 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.kafka.serializers;
import java.util.Arrays;
import javax.xml.bind.DatatypeConverter;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericContainer;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.SerializationException;
import org.codehaus.jackson.node.JsonNodeFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import wherehows.common.kafka.SchemaId;
import wherehows.common.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import kafka.utils.VerifiableProperties;
public abstract class AbstractKafkaAvroDeserializer extends AbstractKafkaAvroSerDe {
public static final String SCHEMA_REGISTRY_SCHEMA_VERSION_PROP =
"schema.registry.schema.version";
private final DecoderFactory decoderFactory = DecoderFactory.get();
protected boolean useSpecificAvroReader = false;
private final Map<String, Schema> readerSchemaCache = new ConcurrentHashMap<String, Schema>();
/**
* Sets properties for this deserializer without overriding the schema registry client itself.
* Useful for testing, where a mock client is injected.
*/
protected void configure(KafkaAvroDeserializerConfig config) {
configureClientProperties(config);
useSpecificAvroReader = config
.getBoolean(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG);
}
protected KafkaAvroDeserializerConfig deserializerConfig(Map<String, ?> props) {
try {
return new KafkaAvroDeserializerConfig(props);
} catch (io.confluent.common.config.ConfigException e) {
throw new ConfigException(e.getMessage());
}
}
protected KafkaAvroDeserializerConfig deserializerConfig(VerifiableProperties props) {
try {
return new KafkaAvroDeserializerConfig(props.props());
} catch (io.confluent.common.config.ConfigException e) {
throw new ConfigException(e.getMessage());
}
}
private ByteBuffer getByteBuffer(byte[] payload) {
ByteBuffer buffer = ByteBuffer.wrap(payload);
if (buffer.get() != MAGIC_BYTE) {
throw new SerializationException("Unknown magic byte!");
}
return buffer;
}
/**
 * Deserializes the payload without attaching any schema information; for primitive
 * types, maps, and arrays, just the resulting deserialized object is returned.
 *
 * This is the normal behavior for Decoders/Deserializers.
*
* @param payload serialized data
* @return the deserialized object
* @throws SerializationException
*/
protected Object deserialize(byte[] payload) throws SerializationException {
return deserialize(false, null, null, payload, null);
}
/**
 * Just like the single-parameter version, but takes the topic string as a parameter
*
* @param includeSchemaAndVersion boolean
* @param topic String
* @param payload serialized data
* @return the deserialized object
* @throws SerializationException
*/
protected Object deserialize(boolean includeSchemaAndVersion, String topic, byte[] payload) throws SerializationException {
return deserialize(includeSchemaAndVersion, topic, null, payload, null);
}
/**
 * Just like the single-parameter version, but accepts an Avro schema to use for reading
*
* @param payload serialized data
* @param readerSchema schema to use for Avro read (optional, enables Avro projection)
* @return the deserialized object
* @throws SerializationException
*/
protected Object deserialize(byte[] payload, Schema readerSchema) throws SerializationException {
return deserialize(false, null, null, payload, readerSchema);
}
// The Object return type is a bit messy, but this is the simplest way to have flexible decoding and not duplicate
// deserialization code multiple times for different variants.
protected Object deserialize(boolean includeSchemaAndVersion, String topic, Boolean isKey, byte[] payload, Schema readerSchema) throws SerializationException {
// Even if the caller requests schema & version, if the payload is null we cannot include it. The caller must handle
// this case.
if (payload == null) {
return null;
}
String id = "-1"; // int id = -1;
try {
ByteBuffer buffer = getByteBuffer(payload);
//id = buffer.getInt();
id = SchemaId.getIdString(ID_TYPE, buffer);
String subject = includeSchemaAndVersion ? getSubjectName(topic, isKey) : null;
Schema schema = schemaRegistry.getBySubjectAndID(subject, id);
int length = buffer.limit() - 1 - ID_TYPE.size();
final Object result;
if (schema.getType().equals(Schema.Type.BYTES)) {
byte[] bytes = new byte[length];
buffer.get(bytes, 0, length);
result = bytes;
} else {
int start = buffer.position() + buffer.arrayOffset();
DatumReader reader = getDatumReader(schema, readerSchema);
Object object = reader.read(null, decoderFactory.binaryDecoder(buffer.array(), start, length, null));
if (schema.getType().equals(Schema.Type.STRING)) {
object = object.toString(); // Utf8 -> String
}
result = object;
}
if (includeSchemaAndVersion) {
// Annotate the schema with the version. Note that we only do this if the schema +
// version are requested, i.e. in Kafka Connect converters. This is critical because that
// code *will not* rely on exact schema equality. Regular deserializers *must not* include
// this information because it would return schemas which are not equivalent.
//
// Note, however, that we also do not fill in the connect.version field. This allows the
// Converter to let a version provided by a Kafka Connect source take priority over the
// schema registry's ordering (which is implicit by auto-registration time rather than
// explicit from the Connector).
Integer version = schemaRegistry.getVersion(subject, schema);
if (schema.getType() == Schema.Type.UNION) {
// Can't set additional properties on a union schema since it's just a list, so set it
// on the first non-null entry
for (Schema memberSchema : schema.getTypes()) {
if (memberSchema.getType() != Schema.Type.NULL) {
memberSchema.addProp(SCHEMA_REGISTRY_SCHEMA_VERSION_PROP,
JsonNodeFactory.instance.numberNode(version));
break;
}
}
} else {
schema.addProp(SCHEMA_REGISTRY_SCHEMA_VERSION_PROP,
JsonNodeFactory.instance.numberNode(version));
}
if (schema.getType().equals(Schema.Type.RECORD)) {
return result;
} else {
return new NonRecordContainer(schema, result);
}
} else {
return result;
}
} catch (IOException | RuntimeException e) {
// avro deserialization may throw AvroRuntimeException, NullPointerException, etc
throw new SerializationException("Error deserializing Avro message for id " + id, e);
} catch (RestClientException e) {
byte[] initialBytes = Arrays.copyOf(payload, 40);
throw new SerializationException("Error retrieving Avro schema for topic " + topic + " id " + id
+ ", initial bytes " + DatatypeConverter.printHexBinary(initialBytes).toLowerCase(), e);
}
}
/**
* Deserializes the payload and includes schema information, with version information from the schema registry embedded
* in the schema.
*
* @param payload the serialized data
* @return a GenericContainer with the schema and data, either as a {@link NonRecordContainer},
* {@link org.apache.avro.generic.GenericRecord}, or {@link SpecificRecord}
* @throws SerializationException
*/
protected GenericContainer deserializeWithSchemaAndVersion(String topic, boolean isKey, byte[] payload) throws SerializationException {
return (GenericContainer) deserialize(true, topic, isKey, payload, null);
}
private DatumReader getDatumReader(Schema writerSchema, Schema readerSchema) {
if (useSpecificAvroReader) {
if (readerSchema == null) {
readerSchema = getReaderSchema(writerSchema);
}
return new SpecificDatumReader(writerSchema, readerSchema);
} else {
if (readerSchema == null) {
return new GenericDatumReader(writerSchema);
}
return new GenericDatumReader(writerSchema, readerSchema);
}
}
@SuppressWarnings("unchecked")
private Schema getReaderSchema(Schema writerSchema) {
Schema readerSchema = readerSchemaCache.get(writerSchema.getFullName());
if (readerSchema == null) {
Class<SpecificRecord> readerClass = SpecificData.get().getClass(writerSchema);
if (readerClass != null) {
try {
readerSchema = readerClass.newInstance().getSchema();
} catch (InstantiationException e) {
throw new SerializationException(writerSchema.getFullName() + " specified by the " +
"writers schema could not be instantiated to find the readers schema.");
} catch (IllegalAccessException e) {
throw new SerializationException(writerSchema.getFullName() + " specified by the " +
"writers schema is not allowed to be instantiated to find the readers schema.");
}
readerSchemaCache.put(writerSchema.getFullName(), readerSchema);
} else {
throw new SerializationException("Could not find class " + writerSchema.getFullName() +
" specified in writer's schema whilst finding reader's schema for a SpecificRecord.");
}
}
return readerSchema;
}
}
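Putting the header layout together: a serialized payload starts with the magic byte 0x0, followed by a 16-byte schema id (ID_TYPE is SchemaId.Type.UUID in this fork), followed by the Avro-encoded body. A sketch of parsing just the header, mirroring the logic above:

import java.nio.ByteBuffer;
import javax.xml.bind.DatatypeConverter;

public class WireFormatSketch {
  // Returns the schema id (hex) of a payload, mirroring the header parsing above.
  static String schemaIdOf(byte[] payload) {
    ByteBuffer buffer = ByteBuffer.wrap(payload);
    if (buffer.get() != 0x0) {              // MAGIC_BYTE
      throw new IllegalArgumentException("Unknown magic byte");
    }
    byte[] id = new byte[16];               // SchemaId.Type.UUID occupies 16 bytes
    buffer.get(id);
    // buffer.position() now marks the start of the Avro-encoded body
    return DatatypeConverter.printHexBinary(id).toLowerCase();
  }
}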

View File

@ -1,132 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.kafka.serializers;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericContainer;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.SerializationException;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import wherehows.common.kafka.SchemaId;
import wherehows.common.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient;
import wherehows.common.kafka.schemaregistry.client.rest.exceptions.RestClientException;
/**
* Common fields and helper methods for both the serializer and the deserializer.
*/
public abstract class AbstractKafkaAvroSerDe {
protected static final byte MAGIC_BYTE = 0x0;
//protected static final int idSize = 4;
protected static final SchemaId.Type ID_TYPE = SchemaId.Type.UUID;
private static final Map<String, Schema> primitiveSchemas;
protected SchemaRegistryClient schemaRegistry;
static {
Schema.Parser parser = new Schema.Parser();
primitiveSchemas = new HashMap<>();
primitiveSchemas.put("Null", createPrimitiveSchema(parser, "null"));
primitiveSchemas.put("Boolean", createPrimitiveSchema(parser, "boolean"));
primitiveSchemas.put("Integer", createPrimitiveSchema(parser, "int"));
primitiveSchemas.put("Long", createPrimitiveSchema(parser, "long"));
primitiveSchemas.put("Float", createPrimitiveSchema(parser, "float"));
primitiveSchemas.put("Double", createPrimitiveSchema(parser, "double"));
primitiveSchemas.put("String", createPrimitiveSchema(parser, "string"));
primitiveSchemas.put("Bytes", createPrimitiveSchema(parser, "bytes"));
}
private static Schema createPrimitiveSchema(Schema.Parser parser, String type) {
String schemaString = String.format("{\"type\" : \"%s\"}", type);
return parser.parse(schemaString);
}
protected void configureClientProperties(AbstractKafkaAvroSerDeConfig config) {
try {
List<String> urls = config.getSchemaRegistryUrls();
int maxSchemaObject = config.getMaxSchemasPerSubject();
if (schemaRegistry == null) {
schemaRegistry = new CachedSchemaRegistryClient(urls, maxSchemaObject);
}
} catch (io.confluent.common.config.ConfigException e) {
throw new ConfigException(e.getMessage());
}
}
/**
* Get the subject name for the given topic and value type.
*/
protected static String getSubjectName(String topic, boolean isKey) {
if (isKey) {
return topic + "-key";
} else {
return topic + "-value";
}
}
/**
* Get the subject name used by the old Encoder interface, which relies only on the value type rather than the topic.
*/
protected static String getOldSubjectName(Object value) {
if (value instanceof GenericContainer) {
return ((GenericContainer) value).getSchema().getName() + "-value";
} else {
throw new SerializationException("Primitive types are not supported yet");
}
}
protected Schema getSchema(Object object) {
if (object == null) {
return primitiveSchemas.get("Null");
} else if (object instanceof Boolean) {
return primitiveSchemas.get("Boolean");
} else if (object instanceof Integer) {
return primitiveSchemas.get("Integer");
} else if (object instanceof Long) {
return primitiveSchemas.get("Long");
} else if (object instanceof Float) {
return primitiveSchemas.get("Float");
} else if (object instanceof Double) {
return primitiveSchemas.get("Double");
} else if (object instanceof CharSequence) {
return primitiveSchemas.get("String");
} else if (object instanceof byte[]) {
return primitiveSchemas.get("Bytes");
} else if (object instanceof GenericContainer) {
return ((GenericContainer) object).getSchema();
} else {
throw new IllegalArgumentException(
"Unsupported Avro type. Supported types are null, Boolean, Integer, Long, "
+ "Float, Double, String, byte[] and IndexedRecord");
}
}
public String register(String subject, Schema schema) throws IOException, RestClientException {
return schemaRegistry.register(subject, schema);
}
public Schema getByID(String id) throws IOException, RestClientException {
return schemaRegistry.getByID(id);
}
public Schema getBySubjectAndID(String subject, String id) throws IOException, RestClientException {
return schemaRegistry.getBySubjectAndID(subject, id);
}
}

View File

@ -1,60 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.kafka.serializers;
import io.confluent.common.config.AbstractConfig;
import io.confluent.common.config.ConfigDef;
import io.confluent.common.config.ConfigDef.Type;
import io.confluent.common.config.ConfigDef.Importance;
import java.util.List;
import java.util.Map;
/**
* Base class for configs for serializers and deserializers, defining a few common configs and
* defaults.
*/
public class AbstractKafkaAvroSerDeConfig extends AbstractConfig {
public static final String SCHEMA_REGISTRY_URL_CONFIG = "schema.registry.url";
public static final String SCHEMA_REGISTRY_URL_DOC =
"Comma-separated list of URLs for schema registry instances that can be used to register or look up schemas.";
public static final String MAX_SCHEMAS_PER_SUBJECT_CONFIG = "max.schemas.per.subject";
public static final int MAX_SCHEMAS_PER_SUBJECT_DEFAULT = 1000;
public static final String MAX_SCHEMAS_PER_SUBJECT_DOC =
"Maximum number of schemas to create or cache locally.";
public static ConfigDef baseConfigDef() {
return new ConfigDef()
.define(SCHEMA_REGISTRY_URL_CONFIG, Type.LIST,
Importance.HIGH, SCHEMA_REGISTRY_URL_DOC)
.define(MAX_SCHEMAS_PER_SUBJECT_CONFIG, Type.INT, MAX_SCHEMAS_PER_SUBJECT_DEFAULT,
Importance.LOW, MAX_SCHEMAS_PER_SUBJECT_DOC);
}
public AbstractKafkaAvroSerDeConfig(ConfigDef config, Map<?, ?> props) {
super(config, props);
}
public int getMaxSchemasPerSubject(){
return this.getInt(MAX_SCHEMAS_PER_SUBJECT_CONFIG);
}
public List<String> getSchemaRegistryUrls(){
return this.getList(SCHEMA_REGISTRY_URL_CONFIG);
}
}
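A sketch of wiring these configs up (the registry url is illustrative; KafkaAvroSerializerConfig, defined further down, is one concrete subclass):

import java.util.HashMap;
import java.util.Map;

public class SerDeConfigExample {
  public static void main(String[] args) {
    Map<String, Object> props = new HashMap<>();
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
    props.put(AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_CONFIG, 500);
    KafkaAvroSerializerConfig config = new KafkaAvroSerializerConfig(props);
    System.out.println(config.getSchemaRegistryUrls());  // [http://localhost:8081]
  }
}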

View File

@ -1,100 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.kafka.serializers;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.SerializationException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
import wherehows.common.kafka.SchemaId;
import wherehows.common.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import kafka.utils.VerifiableProperties;
public abstract class AbstractKafkaAvroSerializer extends AbstractKafkaAvroSerDe {
private final EncoderFactory encoderFactory = EncoderFactory.get();
protected void configure(KafkaAvroSerializerConfig config) {
configureClientProperties(config);
}
protected KafkaAvroSerializerConfig serializerConfig(Map<String, ?> props) {
try {
return new KafkaAvroSerializerConfig(props);
} catch (io.confluent.common.config.ConfigException e) {
throw new ConfigException(e.getMessage());
}
}
protected KafkaAvroSerializerConfig serializerConfig(VerifiableProperties props) {
try {
return new KafkaAvroSerializerConfig(props.props());
} catch (io.confluent.common.config.ConfigException e) {
throw new ConfigException(e.getMessage());
}
}
protected byte[] serializeImpl(String subject, Object object) throws SerializationException {
Schema schema = null;
    // null needs to be treated specially since the client most likely just wants to send
// an individual null value instead of making the subject a null type. Also, null in
// Kafka has a special meaning for deletion in a topic with the compact retention policy.
// Therefore, we will bypass schema registration and return a null value in Kafka, instead
// of an Avro encoded null.
if (object == null) {
return null;
}
try {
schema = getSchema(object);
String id = schemaRegistry.register(subject, schema);
ByteArrayOutputStream out = new ByteArrayOutputStream();
out.write(MAGIC_BYTE);
//out.write(ByteBuffer.allocate(idSize).putInt(id).array());
out.write(SchemaId.getIdBytes(ID_TYPE, id));
if (object instanceof byte[]) {
out.write((byte[]) object);
} else {
BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, null);
DatumWriter<Object> writer;
Object value = object instanceof NonRecordContainer ? ((NonRecordContainer) object).getValue() : object;
if (value instanceof SpecificRecord) {
writer = new SpecificDatumWriter<>(schema);
} else {
writer = new GenericDatumWriter<>(schema);
}
writer.write(value, encoder);
encoder.flush();
}
byte[] bytes = out.toByteArray();
out.close();
return bytes;
} catch (IOException | RuntimeException e) {
// avro serialization can throw AvroRuntimeException, NullPointerException,
// ClassCastException, etc
throw new SerializationException("Error serializing Avro message", e);
} catch (RestClientException e) {
throw new SerializationException("Error registering Avro schema: " + schema, e);
}
}
}

View File

@ -1,49 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.kafka.serializers;
import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient;
import kafka.serializer.Decoder;
import kafka.utils.VerifiableProperties;
import org.apache.avro.Schema;
public class KafkaAvroDecoder extends AbstractKafkaAvroDeserializer implements Decoder<Object> {
public KafkaAvroDecoder(SchemaRegistryClient schemaRegistry) {
this.schemaRegistry = schemaRegistry;
}
public KafkaAvroDecoder(SchemaRegistryClient schemaRegistry, VerifiableProperties props) {
this.schemaRegistry = schemaRegistry;
configure(deserializerConfig(props));
}
/**
* Constructor used by Kafka consumer.
*/
public KafkaAvroDecoder(VerifiableProperties props) {
configure(new KafkaAvroDeserializerConfig(props.props()));
}
@Override
public Object fromBytes(byte[] bytes) {
return deserialize(bytes);
}
/**
* Pass a reader schema to get an Avro projection
*/
  public Object fromBytes(byte[] bytes, Schema readerSchema) {
    return deserialize(bytes, readerSchema);
  }
}
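As a sketch of the reader-schema projection mentioned above (the registry client and payload are supplied by the caller; the schema JSON is whatever projection is wanted):

import org.apache.avro.Schema;
import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient;

public class DecoderProjectionSketch {
  // Projects a payload onto a (possibly narrower) reader schema; the caller
  // supplies the registry client and a payload written with this wire format.
  static Object project(SchemaRegistryClient registry, byte[] payload, String readerSchemaJson) {
    Schema readerSchema = new Schema.Parser().parse(readerSchemaJson);
    return new KafkaAvroDecoder(registry).fromBytes(payload, readerSchema);
  }
}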

View File

@ -1,68 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.kafka.serializers;
import org.apache.avro.Schema;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;
import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient;
public class KafkaAvroDeserializer extends AbstractKafkaAvroDeserializer
implements Deserializer<Object> {
private boolean isKey;
/**
* Constructor used by Kafka consumer.
*/
public KafkaAvroDeserializer() {
}
public KafkaAvroDeserializer(SchemaRegistryClient client) {
schemaRegistry = client;
}
public KafkaAvroDeserializer(SchemaRegistryClient client, Map<String, ?> props) {
schemaRegistry = client;
configure(deserializerConfig(props));
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
this.isKey = isKey;
configure(new KafkaAvroDeserializerConfig(configs));
}
@Override
public Object deserialize(String s, byte[] bytes) {
// return deserialize(bytes);
return deserialize(false, s, bytes);
}
/**
* Pass a reader schema to get an Avro projection
*/
public Object deserialize(String s, byte[] bytes, Schema readerSchema) {
return deserialize(bytes, readerSchema);
}
@Override
public void close() {
}
}
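A sketch of the mock-client wiring that the configure() javadoc further up alludes to; the registry url is a placeholder that config parsing requires but never uses here, since an injected client bypasses CachedSchemaRegistryClient creation:

import java.util.HashMap;
import java.util.Map;
import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient;

public class DeserializerTestWiring {
  // Builds a deserializer around an injected (e.g. mock) registry client.
  static KafkaAvroDeserializer forTest(SchemaRegistryClient mockClient) {
    Map<String, Object> props = new HashMap<>();
    // Required by config parsing, but unused here since a client is injected.
    props.put("schema.registry.url", "http://unused:8081");
    return new KafkaAvroDeserializer(mockClient, props);
  }
}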

View File

@ -1,41 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.kafka.serializers;
import io.confluent.common.config.ConfigDef;
import io.confluent.common.config.ConfigDef.Type;
import io.confluent.common.config.ConfigDef.Importance;
import java.util.Map;
public class KafkaAvroDeserializerConfig extends AbstractKafkaAvroSerDeConfig {
public static final String SPECIFIC_AVRO_READER_CONFIG = "specific.avro.reader";
public static final boolean SPECIFIC_AVRO_READER_DEFAULT = false;
  public static final String SPECIFIC_AVRO_READER_DOC =
      "If true, tries to look up the generated SpecificRecord class for the writer's schema and use it as the reader's schema.";
private static ConfigDef config;
static {
config = baseConfigDef()
.define(SPECIFIC_AVRO_READER_CONFIG, Type.BOOLEAN, SPECIFIC_AVRO_READER_DEFAULT,
Importance.LOW, SPECIFIC_AVRO_READER_DOC);
}
public KafkaAvroDeserializerConfig(Map<?, ?> props) {
super(config, props);
}
}

View File

@ -1,48 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.kafka.serializers;
import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient;
import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;
/**
 * At present, this only works for values, not keys, and it only supports IndexedRecord. To
 * register for a topic, the user needs to provide a record_name:topic_name mapping in CSV
 * format. By default, the encoder uses the record name as the topic.
 */
public class KafkaAvroEncoder extends AbstractKafkaAvroSerializer implements Encoder<Object> {
public KafkaAvroEncoder(SchemaRegistryClient schemaRegistry) {
this.schemaRegistry = schemaRegistry;
}
/**
* Constructor used by Kafka producer.
*/
public KafkaAvroEncoder(VerifiableProperties props) {
configure(serializerConfig(props));
}
@Override
public byte[] toBytes(Object object) {
return serializeImpl(getOldSubjectName(object), object);
}
}
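A sketch of producer-side wiring through VerifiableProperties (the registry url is illustrative):

import java.util.Properties;
import kafka.utils.VerifiableProperties;

public class EncoderWiring {
  // Old-style Encoder wiring; the registry url is illustrative.
  static KafkaAvroEncoder newEncoder() {
    Properties props = new Properties();
    props.put("schema.registry.url", "http://localhost:8081");
    return new KafkaAvroEncoder(new VerifiableProperties(props));
  }
}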

View File

@ -1,60 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.kafka.serializers;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Map;
import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient;
public class KafkaAvroSerializer extends AbstractKafkaAvroSerializer implements Serializer<Object> {
private boolean isKey;
/**
* Constructor used by Kafka producer.
*/
public KafkaAvroSerializer() {
}
public KafkaAvroSerializer(SchemaRegistryClient client) {
schemaRegistry = client;
}
public KafkaAvroSerializer(SchemaRegistryClient client, Map<String, ?> props) {
schemaRegistry = client;
configure(serializerConfig(props));
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
this.isKey = isKey;
configure(new KafkaAvroSerializerConfig(configs));
}
@Override
public byte[] serialize(String topic, Object record) {
return serializeImpl(getSubjectName(topic, isKey), record);
}
@Override
public void close() {
}
}
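For the new-style API, a sketch of plugging the serializer into a producer (broker and registry urls are illustrative):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class ProducerWiring {
  // Broker and registry urls are illustrative.
  static KafkaProducer<Object, Object> newProducer() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", KafkaAvroSerializer.class.getName());
    props.put("value.serializer", KafkaAvroSerializer.class.getName());
    props.put("schema.registry.url", "http://localhost:8081");
    return new KafkaProducer<>(props);
  }
}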

View File

@ -1,33 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.kafka.serializers;
import io.confluent.common.config.ConfigDef;
import java.util.Map;
public class KafkaAvroSerializerConfig extends AbstractKafkaAvroSerDeConfig {
private static ConfigDef config;
static {
config = baseConfigDef();
}
public KafkaAvroSerializerConfig(Map<?, ?> props) {
super(config, props);
}
}

View File

@ -1,59 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.kafka.serializers;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericContainer;
import org.apache.kafka.common.errors.SerializationException;
import java.util.Objects;
/**
* Wrapper for all non-record types that includes the schema for the data.
*/
public class NonRecordContainer implements GenericContainer {
private final Schema schema;
private final Object value;
public NonRecordContainer(Schema schema, Object value) {
if (schema == null)
throw new SerializationException("Schema may not be null.");
this.schema = schema;
this.value = value;
}
@Override
public Schema getSchema() {
return schema;
}
public Object getValue() {
return value;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
NonRecordContainer that = (NonRecordContainer) o;
return Objects.equals(schema, that.schema) &&
Objects.equals(value, that.value);
}
@Override
public int hashCode() {
return Objects.hash(schema, value);
}
}
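A short sketch of wrapping a primitive value, as the deserializer does for non-record results:

import org.apache.avro.Schema;

public class NonRecordExample {
  public static void main(String[] args) {
    // Pair a primitive value with its Avro schema, as the deserializer does
    // for non-record results.
    Schema stringSchema = new Schema.Parser().parse("{\"type\":\"string\"}");
    NonRecordContainer container = new NonRecordContainer(stringSchema, "hello");
    System.out.println(container.getSchema().getType());  // STRING
  }
}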