From f015d66fd24ac51fe6396e2008895db8edeaef4c Mon Sep 17 00:00:00 2001 From: "Yi (Alan) Wang" Date: Mon, 11 Sep 2017 18:13:31 -0700 Subject: [PATCH] Remove obsolete kafka code in wherehows-common (#739) --- gradle/scripts/dependency.gradle | 1 - gradle/scripts/repositories.gradle | 8 - wherehows-backend/build.gradle | 2 - wherehows-common/build.gradle | 4 - .../java/wherehows/common/kafka/SchemaId.java | 74 ---- .../kafka/formatter/AvroMessageFormatter.java | 137 ------ .../kafka/formatter/AvroMessageReader.java | 207 --------- .../avro/AvroCompatibilityChecker.java | 71 --- .../avro/AvroCompatibilityLevel.java | 49 -- .../client/CachedSchemaRegistryClient.java | 185 -------- .../client/MockSchemaRegistryClient.java | 240 ---------- .../schemaregistry/client/SchemaMetadata.java | 40 -- .../client/SchemaRegistryClient.java | 43 -- .../client/rest/RestService.java | 419 ------------------ .../schemaregistry/client/rest/Versions.java | 44 -- .../client/rest/entities/Config.java | 70 --- .../client/rest/entities/ErrorMessage.java | 52 --- .../client/rest/entities/Schema.java | 130 ------ .../client/rest/entities/SchemaString.java | 50 --- .../requests/CompatibilityCheckResponse.java | 43 -- .../requests/ConfigUpdateRequest.java | 43 -- .../requests/RegisterSchemaRequest.java | 78 ---- .../requests/RegisterSchemaResponse.java | 42 -- .../rest/exceptions/RestClientException.java | 34 -- .../client/rest/utils/UrlList.java | 81 ---- .../AbstractKafkaAvroDeserializer.java | 248 ----------- .../serializers/AbstractKafkaAvroSerDe.java | 132 ------ .../AbstractKafkaAvroSerDeConfig.java | 60 --- .../AbstractKafkaAvroSerializer.java | 100 ----- .../kafka/serializers/KafkaAvroDecoder.java | 49 -- .../serializers/KafkaAvroDeserializer.java | 68 --- .../KafkaAvroDeserializerConfig.java | 41 -- .../kafka/serializers/KafkaAvroEncoder.java | 48 -- .../serializers/KafkaAvroSerializer.java | 60 --- .../KafkaAvroSerializerConfig.java | 33 -- .../kafka/serializers/NonRecordContainer.java | 59 --- 36 files changed, 3045 deletions(-) delete mode 100644 wherehows-common/src/main/java/wherehows/common/kafka/SchemaId.java delete mode 100644 wherehows-common/src/main/java/wherehows/common/kafka/formatter/AvroMessageFormatter.java delete mode 100644 wherehows-common/src/main/java/wherehows/common/kafka/formatter/AvroMessageReader.java delete mode 100644 wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/avro/AvroCompatibilityChecker.java delete mode 100644 wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/avro/AvroCompatibilityLevel.java delete mode 100644 wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/CachedSchemaRegistryClient.java delete mode 100644 wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/MockSchemaRegistryClient.java delete mode 100644 wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/SchemaMetadata.java delete mode 100644 wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/SchemaRegistryClient.java delete mode 100644 wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/RestService.java delete mode 100644 wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/Versions.java delete mode 100644 wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/entities/Config.java delete mode 100644 wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/entities/ErrorMessage.java delete mode 100644 wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/entities/Schema.java delete mode 100644 wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/entities/SchemaString.java delete mode 100644 wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/entities/requests/CompatibilityCheckResponse.java delete mode 100644 wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/entities/requests/ConfigUpdateRequest.java delete mode 100644 wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/entities/requests/RegisterSchemaRequest.java delete mode 100644 wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/entities/requests/RegisterSchemaResponse.java delete mode 100644 wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/exceptions/RestClientException.java delete mode 100644 wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/utils/UrlList.java delete mode 100644 wherehows-common/src/main/java/wherehows/common/kafka/serializers/AbstractKafkaAvroDeserializer.java delete mode 100644 wherehows-common/src/main/java/wherehows/common/kafka/serializers/AbstractKafkaAvroSerDe.java delete mode 100644 wherehows-common/src/main/java/wherehows/common/kafka/serializers/AbstractKafkaAvroSerDeConfig.java delete mode 100644 wherehows-common/src/main/java/wherehows/common/kafka/serializers/AbstractKafkaAvroSerializer.java delete mode 100644 wherehows-common/src/main/java/wherehows/common/kafka/serializers/KafkaAvroDecoder.java delete mode 100644 wherehows-common/src/main/java/wherehows/common/kafka/serializers/KafkaAvroDeserializer.java delete mode 100644 wherehows-common/src/main/java/wherehows/common/kafka/serializers/KafkaAvroDeserializerConfig.java delete mode 100644 wherehows-common/src/main/java/wherehows/common/kafka/serializers/KafkaAvroEncoder.java delete mode 100644 wherehows-common/src/main/java/wherehows/common/kafka/serializers/KafkaAvroSerializer.java delete mode 100644 wherehows-common/src/main/java/wherehows/common/kafka/serializers/KafkaAvroSerializerConfig.java delete mode 100644 wherehows-common/src/main/java/wherehows/common/kafka/serializers/NonRecordContainer.java diff --git a/gradle/scripts/dependency.gradle b/gradle/scripts/dependency.gradle index 451a55aee4d..4a97139b65d 100644 --- a/gradle/scripts/dependency.gradle +++ b/gradle/scripts/dependency.gradle @@ -39,7 +39,6 @@ ext.externalDependency = [ "akka" : "com.typesafe.akka:akka-actor_2.10:2.3.15", "spring_jdbc" : "org.springframework:spring-jdbc:4.1.6.RELEASE", - "kafka" : "org.apache.kafka:kafka_2.10:0.10.2.1", "kafka_clients" : "org.apache.kafka:kafka-clients:0.10.2.1", "confluent_avro_serde": "io.confluent:kafka-avro-serializer:3.3.0", diff --git a/gradle/scripts/repositories.gradle b/gradle/scripts/repositories.gradle index 782eeb53da7..aa782a94193 100644 --- a/gradle/scripts/repositories.gradle +++ b/gradle/scripts/repositories.gradle @@ -25,14 +25,6 @@ repositories { name "gradle-plugins" url 'http://plugins.gradle.org/m2/' } - /* ivy { // this is required by metadata store Restli client within LinkedIn - url 'http://artifactory.corp.linkedin.com:8081/artifactory/repo' - layout 'pattern', { - ivy '[organisation]/[module]/[revision]/[module]-[revision].ivy' - artifact '[organisation]/[module]/[revision]/[artifact]-[revision](-[classifier]).[ext]' - m2compatible = true - } - } */ } try { diff --git a/wherehows-backend/build.gradle b/wherehows-backend/build.gradle index 1ae10a5795c..ddf22e2a634 100644 --- a/wherehows-backend/build.gradle +++ b/wherehows-backend/build.gradle @@ -13,8 +13,6 @@ dependencies { play externalDependency.play_java_jdbc play externalDependency.slf4j_api play externalDependency.jasypt - play externalDependency.kafka - play externalDependency.kafka_clients playTest 'com.github.stefanbirkner:system-rules:1.16.0' playTest 'org.easytesting:fest-assert-core:2.0M10' diff --git a/wherehows-common/build.gradle b/wherehows-common/build.gradle index f41ec7fd667..d1bce5c5c75 100644 --- a/wherehows-common/build.gradle +++ b/wherehows-common/build.gradle @@ -7,10 +7,6 @@ dependencies { compile externalDependency.jgit compile externalDependency.jsoup compile externalDependency.commons_io - compile externalDependency.avro - compile externalDependency.kafka - compile externalDependency.kafka_clients - compile externalDependency.confluent_avro_serde compile externalDependency.jackson_core compile externalDependency.jackson_databind compile externalDependency.guava diff --git a/wherehows-common/src/main/java/wherehows/common/kafka/SchemaId.java b/wherehows-common/src/main/java/wherehows/common/kafka/SchemaId.java deleted file mode 100644 index 000245e192d..00000000000 --- a/wherehows-common/src/main/java/wherehows/common/kafka/SchemaId.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Copyright 2015 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ -package wherehows.common.kafka; - -import java.nio.ByteBuffer; -import javax.xml.bind.DatatypeConverter; - - -public class SchemaId { - - /** - * Kafka Avro Schema ID type and size - */ - public enum Type { - INT(4), LONG(8), UUID(16); - private int size; - - Type(int size) { - this.size = size; - } - - public int size() { - return size; - } - } - - /** - * get Schema ID from ByteBuffer by the type, move ByteBuffer position by size - * @param byteBuffer - * @return - */ - public static String getIdString(Type type, ByteBuffer byteBuffer) { - if (type == Type.INT) { - return Integer.toString(byteBuffer.getInt()); - } else if (type == Type.LONG) { - return Long.toString(byteBuffer.getLong()); - } else if (type == Type.UUID) { - byte[] bytes = new byte[Type.UUID.size]; - byteBuffer.get(bytes); - return DatatypeConverter.printHexBinary(bytes).toLowerCase(); - } else { - return null; - } - } - - /** - * convert String id into ByteBuffer - * @param type - * @param id String - * @return byte[] - */ - public static byte[] getIdBytes(Type type, String id) { - if (type == Type.INT) { - return ByteBuffer.allocate(Type.INT.size).putInt(Integer.parseInt(id)).array(); - } else if (type == Type.LONG) { - return ByteBuffer.allocate(Type.LONG.size).putLong(Long.parseLong(id)).array(); - } else if (type == Type.UUID) { - return DatatypeConverter.parseHexBinary(id); - } else { - return new byte[0]; - } - } -} diff --git a/wherehows-common/src/main/java/wherehows/common/kafka/formatter/AvroMessageFormatter.java b/wherehows-common/src/main/java/wherehows/common/kafka/formatter/AvroMessageFormatter.java deleted file mode 100644 index 73106132996..00000000000 --- a/wherehows-common/src/main/java/wherehows/common/kafka/formatter/AvroMessageFormatter.java +++ /dev/null @@ -1,137 +0,0 @@ -/** - * Copyright 2015 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ - -package wherehows.common.kafka.formatter; - -import org.apache.avro.AvroRuntimeException; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.io.DatumWriter; -import org.apache.avro.io.EncoderFactory; -import org.apache.avro.io.JsonEncoder; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.common.errors.SerializationException; - -import java.io.IOException; -import java.io.PrintStream; -import java.util.Properties; - -import wherehows.common.kafka.schemaregistry.client.CachedSchemaRegistryClient; -import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient; -import wherehows.common.kafka.serializers.AbstractKafkaAvroDeserializer; -import wherehows.common.kafka.serializers.AbstractKafkaAvroSerDeConfig; -import kafka.common.MessageFormatter; - -/** - * Example - * To use AvroMessageFormatter, first make sure that Zookeeper, Kafka and schema registry server are - * all started. Second, make sure the jar for AvroMessageFormatter and its dependencies are included - * in the classpath of kafka-console-consumer.sh. Then run the following command. - * - * 1. To read only the value of the messages in JSON - * bin/kafka-console-consumer.sh --consumer.config config/consumer.properties --topic t1 \ - * --zookeeper localhost:2181 --formatter io.confluent.kafka.formatter.AvroMessageFormatter \ - * --property schema.registry.url=http://localhost:8081 - * - * 2. To read both the key and the value of the messages in JSON - * bin/kafka-console-consumer.sh --consumer.config config/consumer.properties --topic t1 \ - * --zookeeper localhost:2181 --formatter io.confluent.kafka.formatter.AvroMessageFormatter \ - * --property schema.registry.url=http://localhost:8081 \ - * --property print.key=true - * - */ -public class AvroMessageFormatter extends AbstractKafkaAvroDeserializer - implements MessageFormatter { - - private final EncoderFactory encoderFactory = EncoderFactory.get(); - private boolean printKey = false; - private byte[] keySeparator = "\t".getBytes(); - private byte[] lineSeparator = "\n".getBytes(); - - /** - * Constructor needed by kafka console consumer. - */ - public AvroMessageFormatter() { - } - - /** - * For testing only. - */ - AvroMessageFormatter(SchemaRegistryClient schemaRegistryClient, boolean printKey) { - this.schemaRegistry = schemaRegistryClient; - this.printKey = printKey; - } - - @Override - public void init(Properties props) { - if (props == null) { - throw new ConfigException("Missing schema registry url!"); - } - String url = props.getProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG); - if (url == null) { - throw new ConfigException("Missing schema registry url!"); - } - schemaRegistry = new CachedSchemaRegistryClient( - url, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT); - - if (props.containsKey("print.key")) { - printKey = props.getProperty("print.key").trim().toLowerCase().equals("true"); - } - if (props.containsKey("key.separator")) { - keySeparator = props.getProperty("key.separator").getBytes(); - } - if (props.containsKey("line.separator")) { - lineSeparator = props.getProperty("line.separator").getBytes(); - } - } - - @Override - public void writeTo(ConsumerRecord consumerRecord, PrintStream output) { - if (printKey) { - try { - writeTo(consumerRecord.key(), output); - output.write(keySeparator); - } catch (IOException ioe) { - throw new SerializationException("Error while formatting the key", ioe); - } - } - try { - writeTo(consumerRecord.value(), output); - output.write(lineSeparator); - } catch (IOException ioe) { - throw new SerializationException("Error while formatting the value", ioe); - } - } - - private void writeTo(byte[] data, PrintStream output) throws IOException { - Object object = deserialize(data); - Schema schema = getSchema(object); - - try { - JsonEncoder encoder = encoderFactory.jsonEncoder(schema, output); - DatumWriter writer = new GenericDatumWriter(schema); - writer.write(object, encoder); - encoder.flush(); - } catch (AvroRuntimeException e) { - throw new SerializationException( - String.format("Error serializing Avro data of schema %s to json", schema), e); - } - } - - @Override - public void close() { - // nothing to do - } -} diff --git a/wherehows-common/src/main/java/wherehows/common/kafka/formatter/AvroMessageReader.java b/wherehows-common/src/main/java/wherehows/common/kafka/formatter/AvroMessageReader.java deleted file mode 100644 index d9c659f9b01..00000000000 --- a/wherehows-common/src/main/java/wherehows/common/kafka/formatter/AvroMessageReader.java +++ /dev/null @@ -1,207 +0,0 @@ -/** - * Copyright 2015 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ - -package wherehows.common.kafka.formatter; - -import org.apache.avro.AvroRuntimeException; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.io.DatumReader; -import org.apache.avro.io.DecoderFactory; -import org.apache.avro.util.Utf8; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.common.errors.SerializationException; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; - -import wherehows.common.kafka.schemaregistry.client.CachedSchemaRegistryClient; -import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient; -import wherehows.common.kafka.serializers.AbstractKafkaAvroSerDeConfig; -import wherehows.common.kafka.serializers.AbstractKafkaAvroSerializer; -import kafka.common.KafkaException; -import kafka.producer.KeyedMessage; -import kafka.common.MessageReader; - -/** - * Example - * To use AvroMessageReader, first make sure that Zookeeper, Kafka and schema registry server are - * all started. Second, make sure the jar for AvroMessageReader and its dependencies are included - * in the classpath of kafka-console-producer.sh. Then run the following - * command. - * - * 1. Send Avro string as value. (make sure there is no space in the schema string) - * bin/kafka-console-producer.sh --broker-list localhost:9092 --topic t1 \ - * --line-reader io.confluent.kafka.formatter.AvroMessageReader \ - * --property schema.registry.url=http://localhost:8081 \ - * --property value.schema='{"type":"string"}' - * - * In the shell, type in the following. - * "a" - * "b" - * - * 2. Send Avro record as value. - * bin/kafka-console-producer.sh --broker-list localhost:9092 --topic t1 \ - * --line-reader io.confluent.kafka.formatter.AvroMessageReader \ - * --property schema.registry.url=http://localhost:8081 \ - * --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' - * - * In the shell, type in the following. - * {"f1": "value1"} - * - * 3. Send Avro string as key and Avro record as value. - * bin/kafka-console-producer.sh --broker-list localhost:9092 --topic t1 \ - * --line-reader io.confluent.kafka.formatter.AvroMessageReader \ - * --property schema.registry.url=http://localhost:8081 \ - * --property parse.key=true \ - * --property key.schema='{"type":"string"}' \ - * --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' - * - * In the shell, type in the following. - * "key1" \t {"f1": "value1"} - * - */ -public class AvroMessageReader extends AbstractKafkaAvroSerializer implements MessageReader { - - private String topic = null; - private BufferedReader reader = null; - private Boolean parseKey = false; - private String keySeparator = "\t"; - private boolean ignoreError = false; - private final DecoderFactory decoderFactory = DecoderFactory.get(); - private Schema keySchema = null; - private Schema valueSchema = null; - private String keySubject = null; - private String valueSubject = null; - - /** - * Constructor needed by kafka console producer. - */ - public AvroMessageReader() { - } - - /** - * For testing only. - */ - AvroMessageReader(SchemaRegistryClient schemaRegistryClient, Schema keySchema, Schema valueSchema, - String topic, boolean parseKey, BufferedReader reader) { - this.schemaRegistry = schemaRegistryClient; - this.keySchema = keySchema; - this.valueSchema = valueSchema; - this.topic = topic; - this.keySubject = topic + "-key"; - this.valueSubject = topic + "-value"; - this.parseKey = parseKey; - this.reader = reader; - } - - @Override - public void init(java.io.InputStream inputStream, java.util.Properties props) { - topic = props.getProperty("topic"); - if (props.containsKey("parse.key")) { - parseKey = props.getProperty("parse.key").trim().toLowerCase().equals("true"); - } - if (props.containsKey("key.separator")) { - keySeparator = props.getProperty("key.separator"); - } - if (props.containsKey("ignore.error")) { - ignoreError = props.getProperty("ignore.error").trim().toLowerCase().equals("true"); - } - reader = new BufferedReader(new InputStreamReader(inputStream)); - String url = props.getProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG); - if (url == null) { - throw new ConfigException("Missing schema registry url!"); - } - schemaRegistry = new CachedSchemaRegistryClient( - url, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT); - if (!props.containsKey("value.schema")) { - throw new ConfigException("Must provide the Avro schema string in value.schema"); - } - String valueSchemaString = props.getProperty("value.schema"); - Schema.Parser parser = new Schema.Parser(); - valueSchema = parser.parse(valueSchemaString); - - if (parseKey) { - if (!props.containsKey("key.schema")) { - throw new ConfigException("Must provide the Avro schema string in key.schema"); - } - String keySchemaString = props.getProperty("key.schema"); - keySchema = parser.parse(keySchemaString); - } - keySubject = topic + "-key"; - valueSubject = topic + "-value"; - } - - @Override - public ProducerRecord readMessage() { - try { - String line = reader.readLine(); - if (line == null) { - return null; - } - if (!parseKey) { - Object value = jsonToAvro(line, valueSchema); - byte[] serializedValue = serializeImpl(valueSubject, value); - return new ProducerRecord<>(topic, serializedValue); - } else { - int keyIndex = line.indexOf(keySeparator); - if (keyIndex < 0) { - if (ignoreError) { - Object value = jsonToAvro(line, valueSchema); - byte[] serializedValue = serializeImpl(valueSubject, value); - return new ProducerRecord<>(topic, serializedValue); - } else { - throw new KafkaException("No key found in line " + line); - } - } else { - String keyString = line.substring(0, keyIndex); - String valueString = (keyIndex + keySeparator.length() > line.length()) ? - "" : line.substring(keyIndex + keySeparator.length()); - Object key = jsonToAvro(keyString, keySchema); - byte[] serializedKey = serializeImpl(keySubject, key); - Object value = jsonToAvro(valueString, valueSchema); - byte[] serializedValue = serializeImpl(valueSubject, value); - return new ProducerRecord<>(topic, serializedKey, serializedValue); - } - } - } catch (IOException e) { - throw new KafkaException("Error reading from input", e); - } - } - - private Object jsonToAvro(String jsonString, Schema schema) { - try { - DatumReader reader = new GenericDatumReader(schema); - Object object = reader.read(null, decoderFactory.jsonDecoder(schema, jsonString)); - - if (schema.getType().equals(Schema.Type.STRING)) { - object = ((Utf8) object).toString(); - } - return object; - } catch (IOException e) { - throw new SerializationException( - String.format("Error deserializing json %s to Avro of schema %s", jsonString, schema), e); - } catch (AvroRuntimeException e) { - throw new SerializationException( - String.format("Error deserializing json %s to Avro of schema %s", jsonString, schema), e); - } - } - - @Override - public void close() { - // nothing to do - } -} diff --git a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/avro/AvroCompatibilityChecker.java b/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/avro/AvroCompatibilityChecker.java deleted file mode 100644 index 1b9550416e3..00000000000 --- a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/avro/AvroCompatibilityChecker.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Copyright 2015 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ - -package wherehows.common.kafka.schemaregistry.avro; - -import org.apache.avro.Schema; -import org.apache.avro.SchemaValidationException; -import org.apache.avro.SchemaValidator; -import org.apache.avro.SchemaValidatorBuilder; - -import java.util.ArrayList; -import java.util.List; - -public class AvroCompatibilityChecker { - - // Check if the new schema can be used to read data produced by the latest schema - private static SchemaValidator BACKWARD_VALIDATOR = - new SchemaValidatorBuilder().canReadStrategy().validateLatest(); - public static AvroCompatibilityChecker BACKWARD_CHECKER = new AvroCompatibilityChecker( - BACKWARD_VALIDATOR); - // Check if data produced by the new schema can be read by the latest schema - private static SchemaValidator FORWARD_VALIDATOR = - new SchemaValidatorBuilder().canBeReadStrategy().validateLatest(); - public static AvroCompatibilityChecker FORWARD_CHECKER = new AvroCompatibilityChecker( - FORWARD_VALIDATOR); - // Check if the new schema is both forward and backward compatible with the latest schema - private static SchemaValidator FULL_VALIDATOR = - new SchemaValidatorBuilder().mutualReadStrategy().validateLatest(); - public static AvroCompatibilityChecker FULL_CHECKER = new AvroCompatibilityChecker( - FULL_VALIDATOR); - private static SchemaValidator NO_OP_VALIDATOR = new SchemaValidator() { - @Override - public void validate(Schema schema, Iterable schemas) throws SchemaValidationException { - // do nothing - } - }; - public static AvroCompatibilityChecker NO_OP_CHECKER = new AvroCompatibilityChecker( - NO_OP_VALIDATOR); - private final SchemaValidator validator; - - private AvroCompatibilityChecker(SchemaValidator validator) { - this.validator = validator; - } - - /** - * Check the compatibility between the new schema and the latest schema - */ - public boolean isCompatible(Schema newSchema, Schema latestSchema) { - List schemas = new ArrayList(); - schemas.add(latestSchema); - - try { - validator.validate(newSchema, schemas); - } catch (SchemaValidationException e) { - return false; - } - - return true; - } -} diff --git a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/avro/AvroCompatibilityLevel.java b/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/avro/AvroCompatibilityLevel.java deleted file mode 100644 index 2d262285cee..00000000000 --- a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/avro/AvroCompatibilityLevel.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Copyright 2015 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ - -package wherehows.common.kafka.schemaregistry.avro; - -public enum AvroCompatibilityLevel { - NONE("NONE", AvroCompatibilityChecker.NO_OP_CHECKER), - BACKWARD("BACKWARD", AvroCompatibilityChecker.BACKWARD_CHECKER), - FORWARD("FORWARD", AvroCompatibilityChecker.FORWARD_CHECKER), - FULL("FULL", AvroCompatibilityChecker.FULL_CHECKER); - - public final String name; - public final AvroCompatibilityChecker compatibilityChecker; - - private AvroCompatibilityLevel(String name, AvroCompatibilityChecker compatibilityChecker) { - this.name = name; - this.compatibilityChecker = compatibilityChecker; - } - - public static AvroCompatibilityLevel forName(String name) { - if (name == null) { - return null; - } - - name = name.toUpperCase(); - if (NONE.name.equals(name)) { - return NONE; - } else if (BACKWARD.name.equals(name)) { - return BACKWARD; - } else if (FORWARD.name.equals(name)) { - return FORWARD; - } else if (FULL.name.equals(name)) { - return FULL; - } else { - return null; - } - } -} diff --git a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/CachedSchemaRegistryClient.java b/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/CachedSchemaRegistryClient.java deleted file mode 100644 index 296d87b1d58..00000000000 --- a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/CachedSchemaRegistryClient.java +++ /dev/null @@ -1,185 +0,0 @@ -/** - * Copyright 2015 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ - -package wherehows.common.kafka.schemaregistry.client; - -import wherehows.common.kafka.schemaregistry.client.rest.RestService; -import wherehows.common.kafka.schemaregistry.client.rest.entities.Config; -import wherehows.common.kafka.schemaregistry.client.rest.entities.SchemaString; -import wherehows.common.kafka.schemaregistry.client.rest.entities.requests.ConfigUpdateRequest; -import wherehows.common.kafka.schemaregistry.client.rest.exceptions.RestClientException; -import org.apache.avro.Schema; - -import java.io.IOException; -import java.util.Collection; -import java.util.HashMap; -import java.util.IdentityHashMap; -import java.util.List; -import java.util.Map; - -public class CachedSchemaRegistryClient implements SchemaRegistryClient { - - private final RestService restService; - private final int identityMapCapacity; - private final Map> schemaCache; - private final Map> idCache; - private final Map> versionCache; - - private static final int defaultIdentityMapCapacity = 1000; - - public CachedSchemaRegistryClient(String baseUrl) { - this(baseUrl, defaultIdentityMapCapacity); - } - - public CachedSchemaRegistryClient(String baseUrl, int identityMapCapacity) { - this(new RestService(baseUrl), identityMapCapacity); - } - - public CachedSchemaRegistryClient(List baseUrls, int identityMapCapacity) { - this(new RestService(baseUrls), identityMapCapacity); - } - - public CachedSchemaRegistryClient(RestService restService, int identityMapCapacity) { - this.identityMapCapacity = identityMapCapacity; - this.schemaCache = new HashMap>(); - this.idCache = new HashMap>(); - this.versionCache = new HashMap>(); - this.restService = restService; - this.idCache.put(null, new HashMap()); - } - - private String registerAndGetId(String subject, Schema schema) - throws IOException, RestClientException { - return restService.registerSchema(schema.toString(), subject); - } - - private Schema getSchemaByIdFromRegistry(String id) throws IOException, RestClientException { - SchemaString restSchema = restService.getId(id); - return new Schema.Parser().parse(restSchema.getSchemaString()); - } - - private int getVersionFromRegistry(String subject, Schema schema) - throws IOException, RestClientException{ - wherehows.common.kafka.schemaregistry.client.rest.entities.Schema response = - restService.lookUpSubjectVersion(schema.toString(), subject); - return response.getVersion(); - } - - @Override - public synchronized String register(String subject, Schema schema) - throws IOException, RestClientException { - Map schemaIdMap; - if (schemaCache.containsKey(subject)) { - schemaIdMap = schemaCache.get(subject); - } else { - schemaIdMap = new IdentityHashMap(); - schemaCache.put(subject, schemaIdMap); - } - - if (schemaIdMap.containsKey(schema)) { - return schemaIdMap.get(schema); - } else { - if (schemaIdMap.size() >= identityMapCapacity) { - throw new IllegalStateException("Too many schema objects created for " + subject + "!"); - } - String id = registerAndGetId(subject, schema); - schemaIdMap.put(schema, id); - idCache.get(null).put(id, schema); - return id; - } - } - - @Override - public synchronized Schema getByID(String id) throws IOException, RestClientException { - return getBySubjectAndID(null, id); - } - - @Override - public synchronized Schema getBySubjectAndID(String subject, String id) - throws IOException, RestClientException { - - Map idSchemaMap; - if (idCache.containsKey(subject)) { - idSchemaMap = idCache.get(subject); - } else { - idSchemaMap = new HashMap(); - idCache.put(subject, idSchemaMap); - } - - if (idSchemaMap.containsKey(id)) { - return idSchemaMap.get(id); - } else { - Schema schema = getSchemaByIdFromRegistry(id); - idSchemaMap.put(id, schema); - return schema; - } - } - - @Override - public synchronized SchemaMetadata getLatestSchemaMetadata(String subject) - throws IOException, RestClientException { - wherehows.common.kafka.schemaregistry.client.rest.entities.Schema response - = restService.getLatestVersion(subject); - String id = response.getId(); - int version = response.getVersion(); - String schema = response.getSchema(); - return new SchemaMetadata(id, version, schema); - } - - @Override - public synchronized int getVersion(String subject, Schema schema) - throws IOException, RestClientException{ - Map schemaVersionMap; - if (versionCache.containsKey(subject)) { - schemaVersionMap = versionCache.get(subject); - } else { - schemaVersionMap = new IdentityHashMap(); - versionCache.put(subject, schemaVersionMap); - } - - if (schemaVersionMap.containsKey(schema)) { - return schemaVersionMap.get(schema); - } else { - if (schemaVersionMap.size() >= identityMapCapacity) { - throw new IllegalStateException("Too many schema objects created for " + subject + "!"); - } - int version = getVersionFromRegistry(subject, schema); - schemaVersionMap.put(schema, version); - return version; - } - } - - @Override - public boolean testCompatibility(String subject, Schema schema) throws IOException, RestClientException { - return restService.testCompatibility(schema.toString(), subject, "latest"); - } - - @Override - public String updateCompatibility(String subject, String compatibility) throws IOException, RestClientException { - ConfigUpdateRequest response = restService.updateCompatibility(compatibility, subject); - return response.getCompatibilityLevel(); - } - - @Override - public String getCompatibility(String subject) throws IOException, RestClientException { - Config response = restService.getConfig(subject); - return response.getCompatibilityLevel(); - } - - @Override - public Collection getAllSubjects() throws IOException, RestClientException { - return restService.getAllSubjects(); - } - -} diff --git a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/MockSchemaRegistryClient.java b/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/MockSchemaRegistryClient.java deleted file mode 100644 index 2b60e2e1d5f..00000000000 --- a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/MockSchemaRegistryClient.java +++ /dev/null @@ -1,240 +0,0 @@ -/** - * Copyright 2015 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ - -package wherehows.common.kafka.schemaregistry.client; - -import org.apache.avro.Schema; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.IdentityHashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; - -import wherehows.common.kafka.schemaregistry.avro.AvroCompatibilityLevel; -import wherehows.common.kafka.schemaregistry.client.rest.exceptions.RestClientException; - -/** - * Mock implementation of SchemaRegistryClient that can be used for tests. This version is NOT - * thread safe. Schema data is stored in memory and is not persistent or shared across instances. - */ -public class MockSchemaRegistryClient implements SchemaRegistryClient { - - private String defaultCompatibility = "BACKWARD"; - private final Map> schemaCache; - private final Map> idCache; - private final Map> versionCache; - private final Map compatibilityCache; - private final AtomicInteger ids; - - public MockSchemaRegistryClient() { - schemaCache = new HashMap>(); - idCache = new HashMap>(); - versionCache = new HashMap>(); - compatibilityCache = new HashMap(); - ids = new AtomicInteger(0); - idCache.put(null, new HashMap()); - } - - private String getIdFromRegistry(String subject, Schema schema) throws IOException { - Map idSchemaMap; - if (idCache.containsKey(subject)) { - idSchemaMap = idCache.get(subject); - for (Map.Entry entry: idSchemaMap.entrySet()) { - if (entry.getValue().toString().equals(schema.toString())) { - generateVersion(subject, schema); - return entry.getKey(); - } - } - } else { - idSchemaMap = new HashMap(); - } - String id = Integer.toString(ids.incrementAndGet()); - idSchemaMap.put(id, schema); - idCache.put(subject, idSchemaMap); - generateVersion(subject, schema); - return id; - } - - private void generateVersion(String subject, Schema schema) { - ArrayList versions = getAllVersions(subject); - Map schemaVersionMap; - int currentVersion; - if (versions.isEmpty()) { - schemaVersionMap = new IdentityHashMap(); - currentVersion = 1; - } else { - schemaVersionMap = versionCache.get(subject); - currentVersion = versions.get(versions.size() - 1) + 1; - } - schemaVersionMap.put(schema, currentVersion); - versionCache.put(subject, schemaVersionMap); - } - - private ArrayList getAllVersions(String subject) { - ArrayList versions = new ArrayList(); - if (versionCache.containsKey(subject)) { - versions.addAll(versionCache.get(subject).values()); - Collections.sort(versions); - } - return versions; - } - - private Schema getSchemaBySubjectAndIdFromRegistry(String subject, String id) throws IOException { - if (idCache.containsKey(subject)) { - Map idSchemaMap = idCache.get(subject); - if (idSchemaMap.containsKey(id)) { - return idSchemaMap.get(id); - } - } - throw new IOException("Cannot get schema from schema registry!"); - } - - @Override - public synchronized String register(String subject, Schema schema) - throws IOException, RestClientException { - Map schemaIdMap; - if (schemaCache.containsKey(subject)) { - schemaIdMap = schemaCache.get(subject); - } else { - schemaIdMap = new IdentityHashMap(); - schemaCache.put(subject, schemaIdMap); - } - - if (schemaIdMap.containsKey(schema)) { - return schemaIdMap.get(schema); - } else { - String id = getIdFromRegistry(subject, schema); - schemaIdMap.put(schema, id); - idCache.get(null).put(id, schema); - return id; - } - } - - @Override - public synchronized Schema getByID(String id) throws IOException, RestClientException { - return getBySubjectAndID(null, id); - } - - @Override - public synchronized Schema getBySubjectAndID(String subject, String id) - throws IOException, RestClientException { - Map idSchemaMap; - if (idCache.containsKey(subject)) { - idSchemaMap = idCache.get(subject); - } else { - idSchemaMap = new HashMap(); - idCache.put(subject, idSchemaMap); - } - - if (idSchemaMap.containsKey(id)) { - return idSchemaMap.get(id); - } else { - Schema schema = getSchemaBySubjectAndIdFromRegistry(subject, id); - idSchemaMap.put(id, schema); - return schema; - } - } - - private int getLatestVersion(String subject) - throws IOException, RestClientException { - ArrayList versions = getAllVersions(subject); - if (versions.isEmpty()) { - throw new IOException("No schema registered under subject!"); - } else { - return versions.get(versions.size() - 1); - } - } - - @Override - public synchronized SchemaMetadata getLatestSchemaMetadata(String subject) - throws IOException, RestClientException { - int version = getLatestVersion(subject); - String schemaString = null; - Map schemaVersionMap = versionCache.get(subject); - for (Map.Entry entry: schemaVersionMap.entrySet()) { - if (entry.getValue() == version) { - schemaString = entry.getKey().toString(); - } - } - String id = "-1"; - Map idSchemaMap = idCache.get(subject); - for (Map.Entry entry: idSchemaMap.entrySet()) { - if (entry.getValue().toString().equals(schemaString)) { - id = entry.getKey(); - } - } - return new SchemaMetadata(id, version, schemaString); - } - - @Override - public synchronized int getVersion(String subject, Schema schema) - throws IOException, RestClientException{ - if (versionCache.containsKey(subject)) { - return versionCache.get(subject).get(schema); - } else { - throw new IOException("Cannot get version from schema registry!"); - } - } - - @Override - public boolean testCompatibility(String subject, Schema newSchema) throws IOException, - RestClientException { - SchemaMetadata latestSchemaMetadata = getLatestSchemaMetadata(subject); - Schema latestSchema = getSchemaBySubjectAndIdFromRegistry(subject, latestSchemaMetadata.getId()); - String compatibility = compatibilityCache.get(subject); - if (compatibility == null) { - compatibility = defaultCompatibility; - } - - AvroCompatibilityLevel compatibilityLevel = AvroCompatibilityLevel.forName(compatibility); - if (compatibilityLevel == null) { - return false; - } - - return compatibilityLevel.compatibilityChecker.isCompatible(newSchema, latestSchema); - } - - @Override - public String updateCompatibility(String subject, String compatibility) throws IOException, - RestClientException { - if (subject == null) { - defaultCompatibility = compatibility; - return compatibility; - } - compatibilityCache.put(subject, compatibility); - return compatibility; - } - - @Override - public String getCompatibility(String subject) throws IOException, RestClientException { - String compatibility = compatibilityCache.get(subject); - if (compatibility == null) { - compatibility = defaultCompatibility; - } - return compatibility; - } - - @Override - public Collection getAllSubjects() throws IOException, RestClientException { - List results = new ArrayList<>(); - results.addAll(this.schemaCache.keySet()); - Collections.sort(results, String.CASE_INSENSITIVE_ORDER); - return results; - } -} diff --git a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/SchemaMetadata.java b/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/SchemaMetadata.java deleted file mode 100644 index 068fc468544..00000000000 --- a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/SchemaMetadata.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Copyright 2015 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ - -package wherehows.common.kafka.schemaregistry.client; - -public class SchemaMetadata { - private String id; - private int version; - private String schema; - - public SchemaMetadata(String id, int version, String schema) { - this.id = id; - this.version = version; - this.schema = schema; - - } - - public String getId() { - return id; - } - - public int getVersion() { - return version; - } - - public String getSchema() { - return schema; - } -} diff --git a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/SchemaRegistryClient.java b/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/SchemaRegistryClient.java deleted file mode 100644 index df8dc8e9f40..00000000000 --- a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/SchemaRegistryClient.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Copyright 2015 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ - -package wherehows.common.kafka.schemaregistry.client; - -import org.apache.avro.Schema; - -import java.io.IOException; -import java.util.Collection; - -import wherehows.common.kafka.schemaregistry.client.rest.exceptions.RestClientException; - -public interface SchemaRegistryClient { - - public String register(String subject, Schema schema) throws IOException, RestClientException; - - public Schema getByID(String id) throws IOException, RestClientException; - - public Schema getBySubjectAndID(String subject, String id) throws IOException, RestClientException; - - public SchemaMetadata getLatestSchemaMetadata(String subject) throws IOException, RestClientException; - - public int getVersion(String subject, Schema schema) throws IOException, RestClientException; - - public boolean testCompatibility(String subject, Schema schema) throws IOException, RestClientException; - - public String updateCompatibility(String subject, String compatibility) throws IOException, RestClientException; - - public String getCompatibility(String subject) throws IOException, RestClientException; - - public Collection getAllSubjects() throws IOException, RestClientException; -} diff --git a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/RestService.java b/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/RestService.java deleted file mode 100644 index f340b58afb0..00000000000 --- a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/RestService.java +++ /dev/null @@ -1,419 +0,0 @@ -/** - * Copyright 2015 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ - -package wherehows.common.kafka.schemaregistry.client.rest; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.commons.io.IOUtils; -import wherehows.common.kafka.schemaregistry.client.rest.entities.Config; -import wherehows.common.kafka.schemaregistry.client.rest.entities.ErrorMessage; -import wherehows.common.kafka.schemaregistry.client.rest.entities.Schema; -import wherehows.common.kafka.schemaregistry.client.rest.entities.SchemaString; -import wherehows.common.kafka.schemaregistry.client.rest.entities.requests.CompatibilityCheckResponse; -import wherehows.common.kafka.schemaregistry.client.rest.entities.requests.ConfigUpdateRequest; -import wherehows.common.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest; -import wherehows.common.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse; -import wherehows.common.kafka.schemaregistry.client.rest.exceptions.RestClientException; -import wherehows.common.kafka.schemaregistry.client.rest.utils.UrlList; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.HttpURLConnection; -import java.net.URL; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * Rest access layer for sending requests to the schema registry. - */ -public class RestService { - - private static final Logger log = LoggerFactory.getLogger(RestService.class); - - private final static String PLAIN_STRING = "PLAIN_STRING"; - private final static String TYPED_RESPONSE = "TYPED_RESPONSE"; - private final static TypeReference PLAIN_STRING_RESPONSE_TYPE = null; - private final static TypeReference REGISTER_RESPONSE_TYPE = - new TypeReference() { - }; - private final static TypeReference GET_CONFIG_RESPONSE_TYPE = - new TypeReference() { - }; - private final static TypeReference GET_SCHEMA_BY_ID_RESPONSE_TYPE = - new TypeReference() { - }; - private final static TypeReference GET_SCHEMA_BY_VERSION_RESPONSE_TYPE = - new TypeReference() { - }; - private final static TypeReference> ALL_VERSIONS_RESPONSE_TYPE = - new TypeReference>() { - }; - private final static TypeReference> ALL_TOPICS_RESPONSE_TYPE = - new TypeReference>() { - }; - private final static TypeReference - COMPATIBILITY_CHECK_RESPONSE_TYPE_REFERENCE = - new TypeReference() { - }; - private final static TypeReference - SUBJECT_SCHEMA_VERSION_RESPONSE_TYPE_REFERENCE = - new TypeReference() { - }; - private final static TypeReference - UPDATE_CONFIG_RESPONSE_TYPE_REFERENCE = - new TypeReference() { - }; - - private String SUBJECT_PREFIX_FORMAT = "/subjects/%s"; - private String SUBJECT_ALL_PREFIX_FORMAT = "/subjects"; - private String REGISTER_SCHEMA_PREFIX_FORMAT = "/subjects/%s/versions"; - private String COMPATIBILITY_TEST_PREFIX_FORMAT = "/compatibility/subjects/%s/versions/%s"; - private String CONFIG_PREFIX_FORMAT = "/config/%s"; - private String CONFIG_ALL_PREFIX_FORMAT = "/config"; - private String SCHEMA_ID_PREFIX_FORMAT = "/schemas/id=%s"; // "/schemas/ids/%s" - private String VERSION_PREFIX_FORMAT = "/subjects/%s/versions/%d"; - private String VERSION_LATEST_PREFIX_FORMAT = "/subjects/%s/versions/latest"; - private String VERSION_ALL_PREFIX_FORMAT = "/subjects/%s/versions"; - - private static final int JSON_PARSE_ERROR_CODE = 50005; - private static ObjectMapper jsonDeserializer = new ObjectMapper(); - - public static final Map DEFAULT_REQUEST_PROPERTIES; - - static { - DEFAULT_REQUEST_PROPERTIES = new HashMap(); - DEFAULT_REQUEST_PROPERTIES.put("Content-Type", Versions.SCHEMA_REGISTRY_V1_JSON_WEIGHTED); - DEFAULT_REQUEST_PROPERTIES.put("Response-Type", PLAIN_STRING); - } - - private UrlList baseUrls; - - public RestService(UrlList baseUrls) { - this.baseUrls = baseUrls; - } - - public RestService(List baseUrls) { - this(new UrlList(baseUrls)); - } - - public RestService(String baseUrlConfig) { - this(parseBaseUrl(baseUrlConfig)); - } - - - /** - * @param baseUrl HTTP connection will be established with this url. - * @param method HTTP method ("GET", "POST", "PUT", etc.) - * @param requestBodyData Bytes to be sent in the request body. - * @param requestProperties HTTP header properties. - * @param responseFormat Expected format of the response to the HTTP request. - * @param The type of the deserialized response to the HTTP request. - * @return The deserialized response to the HTTP request, or null if no data is expected. - */ - private T sendHttpRequest(String baseUrl, String method, byte[] requestBodyData, - Map requestProperties, - TypeReference responseFormat) - throws IOException, RestClientException { - log.debug(String.format("Sending %s with input %s to %s", - method, requestBodyData == null ? "null" : new String(requestBodyData), - baseUrl)); - - HttpURLConnection connection = null; - try { - URL url = new URL(baseUrl); - connection = (HttpURLConnection) url.openConnection(); - connection.setRequestMethod(method); - - // connection.getResponseCode() implicitly calls getInputStream, so always set to true. - // On the other hand, leaving this out breaks nothing. - connection.setDoInput(true); - - for (Map.Entry entry : requestProperties.entrySet()) { - connection.setRequestProperty(entry.getKey(), entry.getValue()); - } - - connection.setUseCaches(false); - - if (requestBodyData != null) { - connection.setDoOutput(true); - OutputStream os = null; - try { - os = connection.getOutputStream(); - os.write(requestBodyData); - os.flush(); - } catch (IOException e) { - log.error("Failed to send HTTP request to endpoint: " + url, e); - throw e; - } finally { - if (os != null) os.close(); - } - } - - int responseCode = connection.getResponseCode(); - if (responseCode == HttpURLConnection.HTTP_OK) { - InputStream is = connection.getInputStream(); - T result = responseFormat != null ? jsonDeserializer.readValue(is, responseFormat) - : (T) IOUtils.toString(is); - is.close(); - return result; - } else if (responseCode == HttpURLConnection.HTTP_NO_CONTENT) { - return null; - } else { - InputStream es = connection.getErrorStream(); - ErrorMessage errorMessage; - try { - errorMessage = jsonDeserializer.readValue(es, ErrorMessage.class); - } catch (JsonProcessingException e) { - errorMessage = new ErrorMessage(JSON_PARSE_ERROR_CODE, e.getMessage()); - } - es.close(); - throw new RestClientException(errorMessage.getMessage(), responseCode, - errorMessage.getErrorCode()); - } - - } finally { - if (connection != null) { - connection.disconnect(); - } - } - } - - private T httpRequest(String path, String method, - byte[] requestBodyData, Map requestProperties, - TypeReference responseFormat) throws IOException, RestClientException { - for (int i = 0, n = baseUrls.size(); i < n; i++) { - String baseUrl = baseUrls.current(); - try { - return sendHttpRequest(baseUrl + path, method, requestBodyData, requestProperties, responseFormat); - } catch (IOException e) { - baseUrls.fail(baseUrl); - if (i == n - 1) throw e; // Raise the exception since we have no more urls to try - } - } - throw new IOException("Internal HTTP retry error"); // Can't get here - } - - public Schema lookUpSubjectVersion(String schemaString, String subject) - throws IOException, RestClientException { - RegisterSchemaRequest request = new RegisterSchemaRequest(); - request.setSchema(schemaString); - return lookUpSubjectVersion(request, subject); - } - - public Schema lookUpSubjectVersion(RegisterSchemaRequest registerSchemaRequest, - String subject) - throws IOException, RestClientException { - return lookUpSubjectVersion(DEFAULT_REQUEST_PROPERTIES, registerSchemaRequest, subject); - } - - public Schema lookUpSubjectVersion(Map requestProperties, - RegisterSchemaRequest registerSchemaRequest, - String subject) - throws IOException, RestClientException { - String path = String.format(SUBJECT_PREFIX_FORMAT, subject); - - Schema schema = httpRequest(path, "POST", registerSchemaRequest.toJson().getBytes(), - requestProperties, SUBJECT_SCHEMA_VERSION_RESPONSE_TYPE_REFERENCE); - - return schema; - } - - public String registerSchema(String schemaString, String subject) - throws IOException, RestClientException { - RegisterSchemaRequest request = new RegisterSchemaRequest(); - request.setSchema(schemaString); - return registerSchema(request, subject); - } - - public String registerSchema(RegisterSchemaRequest registerSchemaRequest, String subject) - throws IOException, RestClientException { - return registerSchema(DEFAULT_REQUEST_PROPERTIES, registerSchemaRequest, subject); - } - - public String registerSchema(Map requestProperties, - RegisterSchemaRequest registerSchemaRequest, String subject) - throws IOException, RestClientException { - String path = String.format(REGISTER_SCHEMA_PREFIX_FORMAT, subject); - - RegisterSchemaResponse response = httpRequest(path, "POST", - registerSchemaRequest.toJson().getBytes(), requestProperties, REGISTER_RESPONSE_TYPE); - - return response.getId(); - } - - public boolean testCompatibility(String schemaString, String subject, String version) - throws IOException, RestClientException { - RegisterSchemaRequest request = new RegisterSchemaRequest(); - request.setSchema(schemaString); - return testCompatibility(request, subject, version); - } - - public boolean testCompatibility(RegisterSchemaRequest registerSchemaRequest, - String subject, - String version) - throws IOException, RestClientException { - return testCompatibility(DEFAULT_REQUEST_PROPERTIES, registerSchemaRequest, - subject, version); - } - - public boolean testCompatibility(Map requestProperties, - RegisterSchemaRequest registerSchemaRequest, - String subject, - String version) - throws IOException, RestClientException { - String path = String.format(COMPATIBILITY_TEST_PREFIX_FORMAT, subject, version); - - CompatibilityCheckResponse response = - httpRequest(path, "POST", registerSchemaRequest.toJson().getBytes(), - requestProperties, COMPATIBILITY_CHECK_RESPONSE_TYPE_REFERENCE); - return response.getIsCompatible(); - } - - public ConfigUpdateRequest updateCompatibility(String compatibility, String subject) - throws IOException, RestClientException { - ConfigUpdateRequest request = new ConfigUpdateRequest(); - request.setCompatibilityLevel(compatibility); - return updateConfig(request, subject); - } - - public ConfigUpdateRequest updateConfig(ConfigUpdateRequest configUpdateRequest, - String subject) - throws IOException, RestClientException { - return updateConfig(DEFAULT_REQUEST_PROPERTIES, configUpdateRequest, subject); - } - - /** - * On success, this api simply echoes the request in the response. - */ - public ConfigUpdateRequest updateConfig(Map requestProperties, - ConfigUpdateRequest configUpdateRequest, - String subject) - throws IOException, RestClientException { - String path = subject != null ? String.format(CONFIG_PREFIX_FORMAT, subject) : CONFIG_ALL_PREFIX_FORMAT; - - ConfigUpdateRequest response = - httpRequest(path, "PUT", configUpdateRequest.toJson().getBytes(), - requestProperties, UPDATE_CONFIG_RESPONSE_TYPE_REFERENCE); - return response; - } - - public Config getConfig(String subject) - throws IOException, RestClientException { - return getConfig(DEFAULT_REQUEST_PROPERTIES, subject); - } - - public Config getConfig(Map requestProperties, - String subject) - throws IOException, RestClientException { - String path = subject != null ? String.format(CONFIG_PREFIX_FORMAT, subject) : CONFIG_ALL_PREFIX_FORMAT; - - Config config = - httpRequest(path, "GET", null, requestProperties, GET_CONFIG_RESPONSE_TYPE); - return config; - } - - public SchemaString getId(String id) throws IOException, RestClientException { - return getId(DEFAULT_REQUEST_PROPERTIES, id); - } - - public SchemaString getId(Map requestProperties, - String id) throws IOException, RestClientException { - String path = String.format(SCHEMA_ID_PREFIX_FORMAT, id); - - SchemaString response; - if (requestProperties.containsKey("Response-Type") - && requestProperties.get("Response-Type").equals(PLAIN_STRING)) { - response = new SchemaString(httpRequest(path, "GET", null, requestProperties, PLAIN_STRING_RESPONSE_TYPE)); - } else { - response = httpRequest(path, "GET", null, requestProperties, GET_SCHEMA_BY_ID_RESPONSE_TYPE); - } - return response; - } - - public Schema getVersion(String subject, int version) throws IOException, RestClientException { - return getVersion(DEFAULT_REQUEST_PROPERTIES, subject, version); - } - - public Schema getVersion(Map requestProperties, - String subject, int version) - throws IOException, RestClientException { - String path = String.format(VERSION_PREFIX_FORMAT, subject, version); - - Schema response = httpRequest(path, "GET", null, requestProperties, - GET_SCHEMA_BY_VERSION_RESPONSE_TYPE); - return response; - } - - public Schema getLatestVersion(String subject) - throws IOException, RestClientException { - return getLatestVersion(DEFAULT_REQUEST_PROPERTIES, subject); - } - - public Schema getLatestVersion(Map requestProperties, - String subject) - throws IOException, RestClientException { - String path = String.format(VERSION_LATEST_PREFIX_FORMAT, subject); - - Schema response = httpRequest(path, "GET", null, requestProperties, - GET_SCHEMA_BY_VERSION_RESPONSE_TYPE); - return response; - } - - public List getAllVersions(String subject) - throws IOException, RestClientException { - return getAllVersions(DEFAULT_REQUEST_PROPERTIES, subject); - } - - public List getAllVersions(Map requestProperties, - String subject) - throws IOException, RestClientException { - String path = String.format(VERSION_ALL_PREFIX_FORMAT, subject); - - List response = httpRequest(path, "GET", null, requestProperties, - ALL_VERSIONS_RESPONSE_TYPE); - return response; - } - - public List getAllSubjects() - throws IOException, RestClientException { - return getAllSubjects(DEFAULT_REQUEST_PROPERTIES); - } - - public List getAllSubjects(Map requestProperties) - throws IOException, RestClientException { - List response = httpRequest(SUBJECT_ALL_PREFIX_FORMAT, "GET", null, requestProperties, - ALL_TOPICS_RESPONSE_TYPE); - return response; - } - - private static List parseBaseUrl(String baseUrl) { - List baseUrls = Arrays.asList(baseUrl.split("\\s*,\\s*")); - if (baseUrls.isEmpty()) { - throw new IllegalArgumentException("Missing required schema registry url list"); - } - return baseUrls; - } - - public UrlList getBaseUrls() { - return baseUrls; - } - -} diff --git a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/Versions.java b/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/Versions.java deleted file mode 100644 index a208c1e3e99..00000000000 --- a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/Versions.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Copyright 2015 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ - -package wherehows.common.kafka.schemaregistry.client.rest; - -import java.util.Arrays; -import java.util.List; - -public class Versions { - - public static final String SCHEMA_REGISTRY_V1_JSON = "application/vnd.schemaregistry.v1+json"; - // Default weight = 1 - public static final String SCHEMA_REGISTRY_V1_JSON_WEIGHTED = SCHEMA_REGISTRY_V1_JSON; - // These are defaults that track the most recent API version. These should always be specified - // anywhere the latest version is produced/consumed. - public static final String SCHEMA_REGISTRY_MOST_SPECIFIC_DEFAULT = SCHEMA_REGISTRY_V1_JSON; - public static final String SCHEMA_REGISTRY_DEFAULT_JSON = "application/vnd.schemaregistry+json"; - public static final String SCHEMA_REGISTRY_DEFAULT_JSON_WEIGHTED = - SCHEMA_REGISTRY_DEFAULT_JSON + - "; qs=0.9"; - public static final String JSON = "application/json"; - public static final String JSON_WEIGHTED = JSON + "; qs=0.5"; - - public static final List PREFERRED_RESPONSE_TYPES = Arrays - .asList(Versions.SCHEMA_REGISTRY_V1_JSON, Versions.SCHEMA_REGISTRY_DEFAULT_JSON, - Versions.JSON); - - // This type is completely generic and carries no actual information about the type of data, but - // it is the default for request entities if no content type is specified. Well behaving users - // of the API will always specify the content type, but ad hoc use may omit it. We treat this as - // JSON since that's all we currently support. - public static final String GENERIC_REQUEST = "application/octet-stream"; -} diff --git a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/entities/Config.java b/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/entities/Config.java deleted file mode 100644 index 3a2e62e8df0..00000000000 --- a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/entities/Config.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Copyright 2015 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ - -package wherehows.common.kafka.schemaregistry.client.rest.entities; - -import org.codehaus.jackson.annotate.JsonProperty; - -public class Config { - - private String compatibilityLevel; - - public Config(@JsonProperty("compatibility") String compatibilityLevel) { - this.compatibilityLevel = compatibilityLevel; - } - - public Config() { - compatibilityLevel = null; - } - - @JsonProperty("compatibility") - public String getCompatibilityLevel() { - return compatibilityLevel; - } - - @JsonProperty("compatibility") - public void setCompatibilityLevel(String compatibilityLevel) { - this.compatibilityLevel = compatibilityLevel; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - Config that = (Config) o; - - if (!this.compatibilityLevel.equals(that.compatibilityLevel)) { - return false; - } - return true; - } - - @Override - public int hashCode() { - int result = 31 * compatibilityLevel.hashCode(); - return result; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("{compatibilityLevel=" + this.compatibilityLevel + "}"); - return sb.toString(); - } -} diff --git a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/entities/ErrorMessage.java b/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/entities/ErrorMessage.java deleted file mode 100644 index a9e093bcd6f..00000000000 --- a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/entities/ErrorMessage.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Copyright 2015 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ - -package wherehows.common.kafka.schemaregistry.client.rest.entities; - -import com.fasterxml.jackson.annotation.JsonProperty; - -/** - * Generic JSON error message. - */ -public class ErrorMessage { - - private int errorCode; - private String message; - - public ErrorMessage(@JsonProperty("error_code") int errorCode, - @JsonProperty("message") String message) { - this.errorCode = errorCode; - this.message = message; - } - - @JsonProperty("error_code") - public int getErrorCode() { - return errorCode; - } - - @JsonProperty("error_code") - public void setErrorCode(int error_code) { - this.errorCode = error_code; - } - - @JsonProperty - public String getMessage() { - return message; - } - - @JsonProperty - public void setMessage(String message) { - this.message = message; - } -} diff --git a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/entities/Schema.java b/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/entities/Schema.java deleted file mode 100644 index 825e2ce8ef6..00000000000 --- a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/entities/Schema.java +++ /dev/null @@ -1,130 +0,0 @@ -/** - * Copyright 2015 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ - -package wherehows.common.kafka.schemaregistry.client.rest.entities; - -import com.fasterxml.jackson.annotation.JsonProperty; - -public class Schema implements Comparable { - private String subject; - private Integer version; - private String id; - private String schema; - - public Schema(@JsonProperty("subject") String subject, - @JsonProperty("version") Integer version, - @JsonProperty("id") String id, - @JsonProperty("schema") String schema) { - this.subject = subject; - this.version = version; - this.id = id; - this.schema = schema; - } - - @JsonProperty("subject") - public String getSubject() { - return subject; - } - - @JsonProperty("subject") - public void setSubject(String subject) { - this.subject = subject; - } - - @JsonProperty("version") - public Integer getVersion() { - return this.version; - } - - @JsonProperty("version") - public void setVersion(Integer version) { - this.version = version; - } - - @JsonProperty("id") - public String getId() { - return this.id; - } - - @JsonProperty("id") - public void setId(String id) { - this.id = id; - } - - @JsonProperty("schema") - public String getSchema() { - return this.schema; - } - - @JsonProperty("schema") - public void setSchema(String schema) { - this.schema = schema; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - Schema that = (Schema) o; - - if (!this.subject.equals(that.subject)) { - return false; - } - if (!this.version.equals(that.version)) { - return false; - } - if (!this.id.equals(that.getId())) { - return false; - } - if (!this.schema.equals(that.schema)) { - return false; - } - - return true; - } - - @Override - public int hashCode() { - int result = subject.hashCode(); - result = 31 * result + version; - result = 31 * result + id.hashCode(); - result = 31 * result + schema.hashCode(); - return result; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("{subject=" + this.subject + ","); - sb.append("version=" + this.version + ","); - sb.append("id=" + this.id + ","); - sb.append("schema=" + this.schema + "}"); - return sb.toString(); - } - - @Override - public int compareTo(Schema that) { - int result = this.subject.compareTo(that.subject); - if (result != 0) { - return result; - } - result = this.version - that.version; - return result; - } -} diff --git a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/entities/SchemaString.java b/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/entities/SchemaString.java deleted file mode 100644 index cce57b0f015..00000000000 --- a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/entities/SchemaString.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Copyright 2015 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ - -package wherehows.common.kafka.schemaregistry.client.rest.entities; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.ObjectMapper; - -import java.io.IOException; - -public class SchemaString { - private String schemaString; - - public SchemaString() { - - } - - public SchemaString(String schemaString) { - this.schemaString = schemaString; - } - - public static SchemaString fromJson(String json) throws IOException { - return new ObjectMapper().readValue(json, SchemaString.class); - } - - @JsonProperty("schema") - public String getSchemaString() { - return schemaString; - } - - @JsonProperty("schema") - public void setSchemaString(String schemaString) { - this.schemaString = schemaString; - } - - public String toJson() throws IOException { - return new ObjectMapper().writeValueAsString(this); - } -} diff --git a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/entities/requests/CompatibilityCheckResponse.java b/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/entities/requests/CompatibilityCheckResponse.java deleted file mode 100644 index b96e1bfe45c..00000000000 --- a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/entities/requests/CompatibilityCheckResponse.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Copyright 2015 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ - -package wherehows.common.kafka.schemaregistry.client.rest.entities.requests; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.ObjectMapper; - -import java.io.IOException; - -public class CompatibilityCheckResponse { - private boolean isCompatible; - - public static CompatibilityCheckResponse fromJson(String json) throws IOException { - return new ObjectMapper().readValue(json, CompatibilityCheckResponse.class); - } - - @JsonProperty("is_compatible") - public boolean getIsCompatible() { - return isCompatible; - } - - @JsonProperty("is_compatible") - public void setIsCompatible(boolean isCompatible) { - this.isCompatible = isCompatible; - } - - public String toJson() throws IOException { - return new ObjectMapper().writeValueAsString(this); - } - -} diff --git a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/entities/requests/ConfigUpdateRequest.java b/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/entities/requests/ConfigUpdateRequest.java deleted file mode 100644 index 569037dc0be..00000000000 --- a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/entities/requests/ConfigUpdateRequest.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Copyright 2015 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ - -package wherehows.common.kafka.schemaregistry.client.rest.entities.requests; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.ObjectMapper; - -import java.io.IOException; - -public class ConfigUpdateRequest { - - private String compatibilityLevel; - - public static ConfigUpdateRequest fromJson(String json) throws IOException { - return new ObjectMapper().readValue(json, ConfigUpdateRequest.class); - } - - @JsonProperty("compatibility") - public String getCompatibilityLevel() { - return this.compatibilityLevel; - } - - @JsonProperty("compatibility") - public void setCompatibilityLevel(String compatibilityLevel) { - this.compatibilityLevel = compatibilityLevel; - } - - public String toJson() throws IOException { - return new ObjectMapper().writeValueAsString(this); - } -} diff --git a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/entities/requests/RegisterSchemaRequest.java b/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/entities/requests/RegisterSchemaRequest.java deleted file mode 100644 index d23c63585d9..00000000000 --- a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/entities/requests/RegisterSchemaRequest.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Copyright 2015 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ - -package wherehows.common.kafka.schemaregistry.client.rest.entities.requests; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.ObjectMapper; - -import java.io.IOException; - -public class RegisterSchemaRequest { - private String schema; - - public static RegisterSchemaRequest fromJson(String json) throws IOException { - return new ObjectMapper().readValue(json, RegisterSchemaRequest.class); - } - - @JsonProperty("schema") - public String getSchema() { - return this.schema; - } - - @JsonProperty("schema") - public void setSchema(String schema) { - this.schema = schema; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - if (!super.equals(o)) { - return false; - } - - RegisterSchemaRequest that = (RegisterSchemaRequest) o; - - if (schema != null ? !schema.equals(that.schema) : that.schema != null) { - return false; - } - - return true; - } - - @Override - public int hashCode() { - int result = super.hashCode(); - result = 31 * result + (schema != null ? schema.hashCode() : 0); - return result; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("{schema=" + this.schema + "}"); - return sb.toString(); - } - - public String toJson() throws IOException { - return new ObjectMapper().writeValueAsString(this); - } - -} diff --git a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/entities/requests/RegisterSchemaResponse.java b/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/entities/requests/RegisterSchemaResponse.java deleted file mode 100644 index 81a63644048..00000000000 --- a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/entities/requests/RegisterSchemaResponse.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Copyright 2015 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ - -package wherehows.common.kafka.schemaregistry.client.rest.entities.requests; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.ObjectMapper; - -import java.io.IOException; - -public class RegisterSchemaResponse { - private String id; - - public static RegisterSchemaResponse fromJson(String json) throws IOException { - return new ObjectMapper().readValue(json, RegisterSchemaResponse.class); - } - - @JsonProperty("id") - public String getId() { - return id; - } - - @JsonProperty("id") - public void setId(String id) { - this.id = id; - } - - public String toJson() throws IOException { - return new ObjectMapper().writeValueAsString(this); - } -} diff --git a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/exceptions/RestClientException.java b/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/exceptions/RestClientException.java deleted file mode 100644 index 83c9de1cead..00000000000 --- a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/exceptions/RestClientException.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Copyright 2015 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ - -package wherehows.common.kafka.schemaregistry.client.rest.exceptions; - -public class RestClientException extends Exception { - private final int status; - private final int errorCode; - - public RestClientException(final String message, final int status, final int errorCode) { - super(message + "; error code: " + errorCode); - this.status = status; - this.errorCode = errorCode; - } - - public int getStatus() { - return status; - } - - public int getErrorCode() { - return errorCode; - } -} \ No newline at end of file diff --git a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/utils/UrlList.java b/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/utils/UrlList.java deleted file mode 100644 index 5e6e2722360..00000000000 --- a/wherehows-common/src/main/java/wherehows/common/kafka/schemaregistry/client/rest/utils/UrlList.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Copyright 2015 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ - -package wherehows.common.kafka.schemaregistry.client.rest.utils; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Random; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * This class manages a set of urls for accessing an upstream registry. It basically - * maintains a pointer to a known good url which can be accessed through {@link #current()}. - * When a request against the current url fails, the {@link #fail(String)} method is invoked, - * and we'll move on to the next url (returning back to the start if we have to). - * - */ -public class UrlList { - private final AtomicInteger index; - private final List urls; - - public UrlList(List urls) { - if (urls == null || urls.isEmpty()) { - throw new IllegalArgumentException("Expected at least one URL to be passed in constructor"); - } - - this.urls = new ArrayList(urls); - this.index = new AtomicInteger(new Random().nextInt(urls.size())); - } - - public UrlList(String url) { - this(Arrays.asList(url)); - } - - /** - * Get the current url - * @return the url - */ - public String current() { - return urls.get(index.get()); - } - - /** - * Declare the given url as failed. This will cause the urls to - * rotate, so that the next request will be done against a new url - * (if one exists). - * @param url the url that has failed - */ - public void fail(String url) { - int currentIndex = index.get(); - if (urls.get(currentIndex).equals(url)) { - index.compareAndSet(currentIndex, (currentIndex+1)%urls.size()); - } - } - - /** - * The number of unique urls contained in this collection. - * @return the count of urls - */ - public int size() { - return urls.size(); - } - - @Override - public String toString() { - return urls.toString(); - } - -} diff --git a/wherehows-common/src/main/java/wherehows/common/kafka/serializers/AbstractKafkaAvroDeserializer.java b/wherehows-common/src/main/java/wherehows/common/kafka/serializers/AbstractKafkaAvroDeserializer.java deleted file mode 100644 index 2ea2538fa56..00000000000 --- a/wherehows-common/src/main/java/wherehows/common/kafka/serializers/AbstractKafkaAvroDeserializer.java +++ /dev/null @@ -1,248 +0,0 @@ -/** - * Copyright 2015 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ -package wherehows.common.kafka.serializers; - -import java.util.Arrays; -import javax.xml.bind.DatatypeConverter; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericContainer; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.io.DatumReader; -import org.apache.avro.io.DecoderFactory; -import org.apache.avro.specific.SpecificData; -import org.apache.avro.specific.SpecificDatumReader; -import org.apache.avro.specific.SpecificRecord; -import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.common.errors.SerializationException; -import org.codehaus.jackson.node.JsonNodeFactory; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import wherehows.common.kafka.SchemaId; -import wherehows.common.kafka.schemaregistry.client.rest.exceptions.RestClientException; -import kafka.utils.VerifiableProperties; - -public abstract class AbstractKafkaAvroDeserializer extends AbstractKafkaAvroSerDe { - public static final String SCHEMA_REGISTRY_SCHEMA_VERSION_PROP = - "schema.registry.schema.version"; - - private final DecoderFactory decoderFactory = DecoderFactory.get(); - protected boolean useSpecificAvroReader = false; - private final Map readerSchemaCache = new ConcurrentHashMap(); - - - /** - * Sets properties for this deserializer without overriding the schema registry client itself. - * Useful for testing, where a mock client is injected. - */ - protected void configure(KafkaAvroDeserializerConfig config) { - configureClientProperties(config); - useSpecificAvroReader = config - .getBoolean(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG); - } - - protected KafkaAvroDeserializerConfig deserializerConfig(Map props) { - try { - return new KafkaAvroDeserializerConfig(props); - } catch (io.confluent.common.config.ConfigException e) { - throw new ConfigException(e.getMessage()); - } - } - - protected KafkaAvroDeserializerConfig deserializerConfig(VerifiableProperties props) { - try { - return new KafkaAvroDeserializerConfig(props.props()); - } catch (io.confluent.common.config.ConfigException e) { - throw new ConfigException(e.getMessage()); - } - } - - private ByteBuffer getByteBuffer(byte[] payload) { - ByteBuffer buffer = ByteBuffer.wrap(payload); - if (buffer.get() != MAGIC_BYTE) { - throw new SerializationException("Unknown magic byte!"); - } - return buffer; - } - - /** - * Deserializes the payload without including schema information for primitive types, maps, and arrays. Just the resulting - * deserialized object is returned. - * - * This behavior is the norm for Decoders/Deserializers. - * - * @param payload serialized data - * @return the deserialized object - * @throws SerializationException - */ - protected Object deserialize(byte[] payload) throws SerializationException { - return deserialize(false, null, null, payload, null); - } - - /** - * Just like single-parameter version but take topic string as a parameter - * - * @param includeSchemaAndVersion boolean - * @param topic String - * @param payload serialized data - * @return the deserialized object - * @throws SerializationException - */ - protected Object deserialize(boolean includeSchemaAndVersion, String topic, byte[] payload) throws SerializationException { - return deserialize(includeSchemaAndVersion, topic, null, payload, null); - } - - /** - * Just like single-parameter version but accepts an Avro schema to use for reading - * - * @param payload serialized data - * @param readerSchema schema to use for Avro read (optional, enables Avro projection) - * @return the deserialized object - * @throws SerializationException - */ - protected Object deserialize(byte[] payload, Schema readerSchema) throws SerializationException { - return deserialize(false, null, null, payload, readerSchema); - } - - // The Object return type is a bit messy, but this is the simplest way to have flexible decoding and not duplicate - // deserialization code multiple times for different variants. - protected Object deserialize(boolean includeSchemaAndVersion, String topic, Boolean isKey, byte[] payload, Schema readerSchema) throws SerializationException { - // Even if the caller requests schema & version, if the payload is null we cannot include it. The caller must handle - // this case. - if (payload == null) { - return null; - } - - String id = "-1"; // int id = -1; - try { - ByteBuffer buffer = getByteBuffer(payload); - //id = buffer.getInt(); - id = SchemaId.getIdString(ID_TYPE, buffer); - String subject = includeSchemaAndVersion ? getSubjectName(topic, isKey) : null; - Schema schema = schemaRegistry.getBySubjectAndID(subject, id); - int length = buffer.limit() - 1 - ID_TYPE.size(); - final Object result; - if (schema.getType().equals(Schema.Type.BYTES)) { - byte[] bytes = new byte[length]; - buffer.get(bytes, 0, length); - result = bytes; - } else { - int start = buffer.position() + buffer.arrayOffset(); - DatumReader reader = getDatumReader(schema, readerSchema); - Object object = reader.read(null, decoderFactory.binaryDecoder(buffer.array(), start, length, null)); - - if (schema.getType().equals(Schema.Type.STRING)) { - object = object.toString(); // Utf8 -> String - } - result = object; - } - - if (includeSchemaAndVersion) { - // Annotate the schema with the version. Note that we only do this if the schema + - // version are requested, i.e. in Kafka Connect converters. This is critical because that - // code *will not* rely on exact schema equality. Regular deserializers *must not* include - // this information because it would return schemas which are not equivalent. - // - // Note, however, that we also do not fill in the connect.version field. This allows the - // Converter to let a version provided by a Kafka Connect source take priority over the - // schema registry's ordering (which is implicit by auto-registration time rather than - // explicit from the Connector). - Integer version = schemaRegistry.getVersion(subject, schema); - if (schema.getType() == Schema.Type.UNION) { - // Can't set additional properties on a union schema since it's just a list, so set it - // on the first non-null entry - for (Schema memberSchema : schema.getTypes()) { - if (memberSchema.getType() != Schema.Type.NULL) { - memberSchema.addProp(SCHEMA_REGISTRY_SCHEMA_VERSION_PROP, - JsonNodeFactory.instance.numberNode(version)); - break; - } - } - } else { - schema.addProp(SCHEMA_REGISTRY_SCHEMA_VERSION_PROP, - JsonNodeFactory.instance.numberNode(version)); - } - if (schema.getType().equals(Schema.Type.RECORD)) { - return result; - } else { - return new NonRecordContainer(schema, result); - } - } else { - return result; - } - } catch (IOException | RuntimeException e) { - // avro deserialization may throw AvroRuntimeException, NullPointerException, etc - throw new SerializationException("Error deserializing Avro message for id " + id, e); - } catch (RestClientException e) { - byte[] initialBytes = Arrays.copyOf(payload, 40); - throw new SerializationException("Error retrieving Avro schema for topic " + topic + " id " + id - + ", initial bytes " + DatatypeConverter.printHexBinary(initialBytes).toLowerCase(), e); - } - } - - /** - * Deserializes the payload and includes schema information, with version information from the schema registry embedded - * in the schema. - * - * @param payload the serialized data - * @return a GenericContainer with the schema and data, either as a {@link NonRecordContainer}, - * {@link org.apache.avro.generic.GenericRecord}, or {@link SpecificRecord} - * @throws SerializationException - */ - protected GenericContainer deserializeWithSchemaAndVersion(String topic, boolean isKey, byte[] payload) throws SerializationException { - return (GenericContainer) deserialize(true, topic, isKey, payload, null); - } - - private DatumReader getDatumReader(Schema writerSchema, Schema readerSchema) { - if (useSpecificAvroReader) { - if (readerSchema == null) { - readerSchema = getReaderSchema(writerSchema); - } - return new SpecificDatumReader(writerSchema, readerSchema); - } else { - if (readerSchema == null) { - return new GenericDatumReader(writerSchema); - } - return new GenericDatumReader(writerSchema, readerSchema); - } - } - - @SuppressWarnings("unchecked") - private Schema getReaderSchema(Schema writerSchema) { - Schema readerSchema = readerSchemaCache.get(writerSchema.getFullName()); - if (readerSchema == null) { - Class readerClass = SpecificData.get().getClass(writerSchema); - if (readerClass != null) { - try { - readerSchema = readerClass.newInstance().getSchema(); - } catch (InstantiationException e) { - throw new SerializationException(writerSchema.getFullName() + " specified by the " + - "writers schema could not be instantiated to find the readers schema."); - } catch (IllegalAccessException e) { - throw new SerializationException(writerSchema.getFullName() + " specified by the " + - "writers schema is not allowed to be instantiated to find the readers schema."); - } - readerSchemaCache.put(writerSchema.getFullName(), readerSchema); - } else { - throw new SerializationException("Could not find class " + writerSchema.getFullName() + - " specified in writer's schema whilst finding reader's schema for a SpecificRecord."); - } - } - return readerSchema; - } - -} diff --git a/wherehows-common/src/main/java/wherehows/common/kafka/serializers/AbstractKafkaAvroSerDe.java b/wherehows-common/src/main/java/wherehows/common/kafka/serializers/AbstractKafkaAvroSerDe.java deleted file mode 100644 index f7b533e4d39..00000000000 --- a/wherehows-common/src/main/java/wherehows/common/kafka/serializers/AbstractKafkaAvroSerDe.java +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Copyright 2015 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ -package wherehows.common.kafka.serializers; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericContainer; -import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.common.errors.SerializationException; - -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import wherehows.common.kafka.SchemaId; -import wherehows.common.kafka.schemaregistry.client.CachedSchemaRegistryClient; -import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient; -import wherehows.common.kafka.schemaregistry.client.rest.exceptions.RestClientException; - -/** - * Common fields and helper methods for both the serializer and the deserializer. - */ -public abstract class AbstractKafkaAvroSerDe { - protected static final byte MAGIC_BYTE = 0x0; - //protected static final int idSize = 4; - protected static final SchemaId.Type ID_TYPE = SchemaId.Type.UUID; - - private static final Map primitiveSchemas; - protected SchemaRegistryClient schemaRegistry; - - static { - Schema.Parser parser = new Schema.Parser(); - primitiveSchemas = new HashMap<>(); - primitiveSchemas.put("Null", createPrimitiveSchema(parser, "null")); - primitiveSchemas.put("Boolean", createPrimitiveSchema(parser, "boolean")); - primitiveSchemas.put("Integer", createPrimitiveSchema(parser, "int")); - primitiveSchemas.put("Long", createPrimitiveSchema(parser, "long")); - primitiveSchemas.put("Float", createPrimitiveSchema(parser, "float")); - primitiveSchemas.put("Double", createPrimitiveSchema(parser, "double")); - primitiveSchemas.put("String", createPrimitiveSchema(parser, "string")); - primitiveSchemas.put("Bytes", createPrimitiveSchema(parser, "bytes")); - } - - private static Schema createPrimitiveSchema(Schema.Parser parser, String type) { - String schemaString = String.format("{\"type\" : \"%s\"}", type); - return parser.parse(schemaString); - } - - protected void configureClientProperties(AbstractKafkaAvroSerDeConfig config) { - try { - List urls = config.getSchemaRegistryUrls(); - int maxSchemaObject = config.getMaxSchemasPerSubject(); - - if (schemaRegistry == null) { - schemaRegistry = new CachedSchemaRegistryClient(urls, maxSchemaObject); - } - } catch (io.confluent.common.config.ConfigException e) { - throw new ConfigException(e.getMessage()); - } - } - - /** - * Get the subject name for the given topic and value type. - */ - protected static String getSubjectName(String topic, boolean isKey) { - if (isKey) { - return topic + "-key"; - } else { - return topic + "-value"; - } - } - - /** - * Get the subject name used by the old Encoder interface, which relies only on the value type rather than the topic. - */ - protected static String getOldSubjectName(Object value) { - if (value instanceof GenericContainer) { - return ((GenericContainer) value).getSchema().getName() + "-value"; - } else { - throw new SerializationException("Primitive types are not supported yet"); - } - } - - protected Schema getSchema(Object object) { - if (object == null) { - return primitiveSchemas.get("Null"); - } else if (object instanceof Boolean) { - return primitiveSchemas.get("Boolean"); - } else if (object instanceof Integer) { - return primitiveSchemas.get("Integer"); - } else if (object instanceof Long) { - return primitiveSchemas.get("Long"); - } else if (object instanceof Float) { - return primitiveSchemas.get("Float"); - } else if (object instanceof Double) { - return primitiveSchemas.get("Double"); - } else if (object instanceof CharSequence) { - return primitiveSchemas.get("String"); - } else if (object instanceof byte[]) { - return primitiveSchemas.get("Bytes"); - } else if (object instanceof GenericContainer) { - return ((GenericContainer) object).getSchema(); - } else { - throw new IllegalArgumentException( - "Unsupported Avro type. Supported types are null, Boolean, Integer, Long, " - + "Float, Double, String, byte[] and IndexedRecord"); - } - } - - public String register(String subject, Schema schema) throws IOException, RestClientException { - return schemaRegistry.register(subject, schema); - } - - public Schema getByID(String id) throws IOException, RestClientException { - return schemaRegistry.getByID(id); - } - - public Schema getBySubjectAndID(String subject, String id) throws IOException, RestClientException { - return schemaRegistry.getBySubjectAndID(subject, id); - } -} diff --git a/wherehows-common/src/main/java/wherehows/common/kafka/serializers/AbstractKafkaAvroSerDeConfig.java b/wherehows-common/src/main/java/wherehows/common/kafka/serializers/AbstractKafkaAvroSerDeConfig.java deleted file mode 100644 index 4d7ccd80fb0..00000000000 --- a/wherehows-common/src/main/java/wherehows/common/kafka/serializers/AbstractKafkaAvroSerDeConfig.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Copyright 2015 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ - -package wherehows.common.kafka.serializers; - -import io.confluent.common.config.AbstractConfig; -import io.confluent.common.config.ConfigDef; -import io.confluent.common.config.ConfigDef.Type; -import io.confluent.common.config.ConfigDef.Importance; - -import java.util.List; -import java.util.Map; - -/** - * Base class for configs for serializers and deserializers, defining a few common configs and - * defaults. - */ -public class AbstractKafkaAvroSerDeConfig extends AbstractConfig { - - public static final String SCHEMA_REGISTRY_URL_CONFIG = "schema.registry.url"; - public static final String SCHEMA_REGISTRY_URL_DOC = - "Comma-separated list of URLs for schema registry instances that can be used to register or look up schemas."; - - public static final String MAX_SCHEMAS_PER_SUBJECT_CONFIG = "max.schemas.per.subject"; - public static final int MAX_SCHEMAS_PER_SUBJECT_DEFAULT = 1000; - public static final String MAX_SCHEMAS_PER_SUBJECT_DOC = - "Maximum number of schemas to create or cache locally."; - - public static ConfigDef baseConfigDef() { - return new ConfigDef() - .define(SCHEMA_REGISTRY_URL_CONFIG, Type.LIST, - Importance.HIGH, SCHEMA_REGISTRY_URL_DOC) - .define(MAX_SCHEMAS_PER_SUBJECT_CONFIG, Type.INT, MAX_SCHEMAS_PER_SUBJECT_DEFAULT, - Importance.LOW, MAX_SCHEMAS_PER_SUBJECT_DOC); - } - - public AbstractKafkaAvroSerDeConfig(ConfigDef config, Map props) { - super(config, props); - } - - public int getMaxSchemasPerSubject(){ - return this.getInt(MAX_SCHEMAS_PER_SUBJECT_CONFIG); - } - - public List getSchemaRegistryUrls(){ - return this.getList(SCHEMA_REGISTRY_URL_CONFIG); - } - -} diff --git a/wherehows-common/src/main/java/wherehows/common/kafka/serializers/AbstractKafkaAvroSerializer.java b/wherehows-common/src/main/java/wherehows/common/kafka/serializers/AbstractKafkaAvroSerializer.java deleted file mode 100644 index 65d15d8f06c..00000000000 --- a/wherehows-common/src/main/java/wherehows/common/kafka/serializers/AbstractKafkaAvroSerializer.java +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Copyright 2015 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ -package wherehows.common.kafka.serializers; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.io.BinaryEncoder; -import org.apache.avro.io.DatumWriter; -import org.apache.avro.io.EncoderFactory; -import org.apache.avro.specific.SpecificDatumWriter; -import org.apache.avro.specific.SpecificRecord; -import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.common.errors.SerializationException; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.Map; - -import wherehows.common.kafka.SchemaId; -import wherehows.common.kafka.schemaregistry.client.rest.exceptions.RestClientException; -import kafka.utils.VerifiableProperties; - -public abstract class AbstractKafkaAvroSerializer extends AbstractKafkaAvroSerDe { - private final EncoderFactory encoderFactory = EncoderFactory.get(); - - protected void configure(KafkaAvroSerializerConfig config) { - configureClientProperties(config); - } - - protected KafkaAvroSerializerConfig serializerConfig(Map props) { - try { - return new KafkaAvroSerializerConfig(props); - } catch (io.confluent.common.config.ConfigException e) { - throw new ConfigException(e.getMessage()); - } - } - - protected KafkaAvroSerializerConfig serializerConfig(VerifiableProperties props) { - try { - return new KafkaAvroSerializerConfig(props.props()); - } catch (io.confluent.common.config.ConfigException e) { - throw new ConfigException(e.getMessage()); - } - } - - protected byte[] serializeImpl(String subject, Object object) throws SerializationException { - Schema schema = null; - // null needs to treated specially since the client most likely just wants to send - // an individual null value instead of making the subject a null type. Also, null in - // Kafka has a special meaning for deletion in a topic with the compact retention policy. - // Therefore, we will bypass schema registration and return a null value in Kafka, instead - // of an Avro encoded null. - if (object == null) { - return null; - } - - try { - schema = getSchema(object); - String id = schemaRegistry.register(subject, schema); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - out.write(MAGIC_BYTE); - //out.write(ByteBuffer.allocate(idSize).putInt(id).array()); - out.write(SchemaId.getIdBytes(ID_TYPE, id)); - if (object instanceof byte[]) { - out.write((byte[]) object); - } else { - BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, null); - DatumWriter writer; - Object value = object instanceof NonRecordContainer ? ((NonRecordContainer) object).getValue() : object; - if (value instanceof SpecificRecord) { - writer = new SpecificDatumWriter<>(schema); - } else { - writer = new GenericDatumWriter<>(schema); - } - writer.write(value, encoder); - encoder.flush(); - } - byte[] bytes = out.toByteArray(); - out.close(); - return bytes; - } catch (IOException | RuntimeException e) { - // avro serialization can throw AvroRuntimeException, NullPointerException, - // ClassCastException, etc - throw new SerializationException("Error serializing Avro message", e); - } catch (RestClientException e) { - throw new SerializationException("Error registering Avro schema: " + schema, e); - } - } -} diff --git a/wherehows-common/src/main/java/wherehows/common/kafka/serializers/KafkaAvroDecoder.java b/wherehows-common/src/main/java/wherehows/common/kafka/serializers/KafkaAvroDecoder.java deleted file mode 100644 index d9faa369a00..00000000000 --- a/wherehows-common/src/main/java/wherehows/common/kafka/serializers/KafkaAvroDecoder.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Copyright 2015 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ - -package wherehows.common.kafka.serializers; - -import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient; -import kafka.serializer.Decoder; -import kafka.utils.VerifiableProperties; -import org.apache.avro.Schema; - -public class KafkaAvroDecoder extends AbstractKafkaAvroDeserializer implements Decoder { - - public KafkaAvroDecoder(SchemaRegistryClient schemaRegistry) { - this.schemaRegistry = schemaRegistry; - } - - public KafkaAvroDecoder(SchemaRegistryClient schemaRegistry, VerifiableProperties props) { - this.schemaRegistry = schemaRegistry; - configure(deserializerConfig(props)); - } - - /** - * Constructor used by Kafka consumer. - */ - public KafkaAvroDecoder(VerifiableProperties props) { - configure(new KafkaAvroDeserializerConfig(props.props())); - } - - @Override - public Object fromBytes(byte[] bytes) { - return deserialize(bytes); - } - - /** - * Pass a reader schema to get an Avro projection - */ - public Object fromBytes(byte[] bytes, Schema readerSchema) { return deserialize(bytes, readerSchema); } -} diff --git a/wherehows-common/src/main/java/wherehows/common/kafka/serializers/KafkaAvroDeserializer.java b/wherehows-common/src/main/java/wherehows/common/kafka/serializers/KafkaAvroDeserializer.java deleted file mode 100644 index 2b7c1a6746f..00000000000 --- a/wherehows-common/src/main/java/wherehows/common/kafka/serializers/KafkaAvroDeserializer.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Copyright 2015 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ - -package wherehows.common.kafka.serializers; - -import org.apache.avro.Schema; -import org.apache.kafka.common.serialization.Deserializer; - -import java.util.Map; - -import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient; - -public class KafkaAvroDeserializer extends AbstractKafkaAvroDeserializer - implements Deserializer { - - private boolean isKey; - - /** - * Constructor used by Kafka consumer. - */ - public KafkaAvroDeserializer() { - - } - - public KafkaAvroDeserializer(SchemaRegistryClient client) { - schemaRegistry = client; - } - - public KafkaAvroDeserializer(SchemaRegistryClient client, Map props) { - schemaRegistry = client; - configure(deserializerConfig(props)); - } - - @Override - public void configure(Map configs, boolean isKey) { - this.isKey = isKey; - configure(new KafkaAvroDeserializerConfig(configs)); - } - - @Override - public Object deserialize(String s, byte[] bytes) { - // return deserialize(bytes); - return deserialize(false, s, bytes); - } - - /** - * Pass a reader schema to get an Avro projection - */ - public Object deserialize(String s, byte[] bytes, Schema readerSchema) { - return deserialize(bytes, readerSchema); - } - - @Override - public void close() { - - } -} diff --git a/wherehows-common/src/main/java/wherehows/common/kafka/serializers/KafkaAvroDeserializerConfig.java b/wherehows-common/src/main/java/wherehows/common/kafka/serializers/KafkaAvroDeserializerConfig.java deleted file mode 100644 index 5631384e44c..00000000000 --- a/wherehows-common/src/main/java/wherehows/common/kafka/serializers/KafkaAvroDeserializerConfig.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Copyright 2015 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ - -package wherehows.common.kafka.serializers; - -import io.confluent.common.config.ConfigDef; -import io.confluent.common.config.ConfigDef.Type; -import io.confluent.common.config.ConfigDef.Importance; - -import java.util.Map; - -public class KafkaAvroDeserializerConfig extends AbstractKafkaAvroSerDeConfig { - - public static final String SPECIFIC_AVRO_READER_CONFIG = "specific.avro.reader"; - public static final boolean SPECIFIC_AVRO_READER_DEFAULT = false; - public static final String SPECIFIC_AVRO_READER_DOC = - "If true, tries to look up the SpecificRecord class "; - - private static ConfigDef config; - - static { - config = baseConfigDef() - .define(SPECIFIC_AVRO_READER_CONFIG, Type.BOOLEAN, SPECIFIC_AVRO_READER_DEFAULT, - Importance.LOW, SPECIFIC_AVRO_READER_DOC); - } - - public KafkaAvroDeserializerConfig(Map props) { - super(config, props); - } -} diff --git a/wherehows-common/src/main/java/wherehows/common/kafka/serializers/KafkaAvroEncoder.java b/wherehows-common/src/main/java/wherehows/common/kafka/serializers/KafkaAvroEncoder.java deleted file mode 100644 index b1f06a1a741..00000000000 --- a/wherehows-common/src/main/java/wherehows/common/kafka/serializers/KafkaAvroEncoder.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Copyright 2015 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ - -package wherehows.common.kafka.serializers; - -import org.apache.avro.generic.IndexedRecord; -import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.common.errors.SerializationException; - -import wherehows.common.kafka.schemaregistry.client.CachedSchemaRegistryClient; -import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient; -import kafka.serializer.Encoder; -import kafka.utils.VerifiableProperties; - -/** - * At present, this only works for value not key and it only supports IndexedRecord. To register for - * a topic, user will need to provide a list of record_name:topic_name map as a CSV format. By - * default, the encoder will use record name as topic. - */ -public class KafkaAvroEncoder extends AbstractKafkaAvroSerializer implements Encoder { - - public KafkaAvroEncoder(SchemaRegistryClient schemaRegistry) { - this.schemaRegistry = schemaRegistry; - } - - /** - * Constructor used by Kafka producer. - */ - public KafkaAvroEncoder(VerifiableProperties props) { - configure(serializerConfig(props)); - } - - @Override - public byte[] toBytes(Object object) { - return serializeImpl(getOldSubjectName(object), object); - } -} diff --git a/wherehows-common/src/main/java/wherehows/common/kafka/serializers/KafkaAvroSerializer.java b/wherehows-common/src/main/java/wherehows/common/kafka/serializers/KafkaAvroSerializer.java deleted file mode 100644 index 6781d9d2e55..00000000000 --- a/wherehows-common/src/main/java/wherehows/common/kafka/serializers/KafkaAvroSerializer.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Copyright 2015 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ - -package wherehows.common.kafka.serializers; - -import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.common.serialization.Serializer; - -import java.util.Map; - -import wherehows.common.kafka.schemaregistry.client.CachedSchemaRegistryClient; -import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient; - -public class KafkaAvroSerializer extends AbstractKafkaAvroSerializer implements Serializer { - - private boolean isKey; - - /** - * Constructor used by Kafka producer. - */ - public KafkaAvroSerializer() { - - } - - public KafkaAvroSerializer(SchemaRegistryClient client) { - schemaRegistry = client; - } - - public KafkaAvroSerializer(SchemaRegistryClient client, Map props) { - schemaRegistry = client; - configure(serializerConfig(props)); - } - - @Override - public void configure(Map configs, boolean isKey) { - this.isKey = isKey; - configure(new KafkaAvroSerializerConfig(configs)); - } - - @Override - public byte[] serialize(String topic, Object record) { - return serializeImpl(getSubjectName(topic, isKey), record); - } - - @Override - public void close() { - - } -} diff --git a/wherehows-common/src/main/java/wherehows/common/kafka/serializers/KafkaAvroSerializerConfig.java b/wherehows-common/src/main/java/wherehows/common/kafka/serializers/KafkaAvroSerializerConfig.java deleted file mode 100644 index 60cbc014cde..00000000000 --- a/wherehows-common/src/main/java/wherehows/common/kafka/serializers/KafkaAvroSerializerConfig.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Copyright 2015 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ - -package wherehows.common.kafka.serializers; - -import io.confluent.common.config.ConfigDef; -import io.confluent.common.config.ConfigDef.Importance; -import io.confluent.common.config.ConfigDef.Type; - -import java.util.Map; - -public class KafkaAvroSerializerConfig extends AbstractKafkaAvroSerDeConfig { - private static ConfigDef config; - - static { - config = baseConfigDef(); - } - - public KafkaAvroSerializerConfig(Map props) { - super(config, props); - } -} diff --git a/wherehows-common/src/main/java/wherehows/common/kafka/serializers/NonRecordContainer.java b/wherehows-common/src/main/java/wherehows/common/kafka/serializers/NonRecordContainer.java deleted file mode 100644 index f31c47a2b1a..00000000000 --- a/wherehows-common/src/main/java/wherehows/common/kafka/serializers/NonRecordContainer.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Copyright 2015 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ - -package wherehows.common.kafka.serializers; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericContainer; -import org.apache.kafka.common.errors.SerializationException; - -import java.util.Objects; - -/** - * Wrapper for all non-record types that includes the schema for the data. - */ -public class NonRecordContainer implements GenericContainer { - private final Schema schema; - private final Object value; - - public NonRecordContainer(Schema schema, Object value) { - if (schema == null) - throw new SerializationException("Schema may not be null."); - this.schema = schema; - this.value = value; - } - - @Override - public Schema getSchema() { - return schema; - } - - public Object getValue() { - return value; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - NonRecordContainer that = (NonRecordContainer) o; - return Objects.equals(schema, that.schema) && - Objects.equals(value, that.value); - } - - @Override - public int hashCode() { - return Objects.hash(schema, value); - } -}