Update Kafka processors to cast Object to String; also add debug info when a schema can't be fetched from the Registry

Yi Wang 2016-09-26 15:06:33 -07:00
parent bc06c1a882
commit ac34eb683f
8 changed files with 96 additions and 54 deletions

View File

@@ -56,11 +56,11 @@ public class KafkaConsumerWorker extends UntypedActor {
if (message.equals("Start")) {
Logger.info("Starting Thread: " + _threadId + " for topic: " + _topic);
final ConsumerIterator<byte[], byte[]> it = _kafkaStream.iterator();
final Deserializer<Object> avroDeserializer = new KafkaAvroDeserializer(_schemaRegistryRestfulClient);
while (it.hasNext()) { // block for next input
try {
MessageAndMetadata<byte[], byte[]> msg = it.next();
Deserializer<Object> avroDeserializer = new KafkaAvroDeserializer(_schemaRegistryRestfulClient);
GenericData.Record kafkaMsgRecord = (GenericData.Record) avroDeserializer.deserialize(_topic, msg.message());
// Logger.debug("Kafka worker ThreadId " + _threadId + " Topic " + _topic + " record: " + rec);

View File

@@ -41,18 +41,20 @@ public class GobblinTrackingCompactionProcessor extends KafkaConsumerProcessor {
* @throws Exception
*/
@Override
public Record process(GenericData.Record record, String topic) throws Exception {
public Record process(GenericData.Record record, String topic)
throws Exception {
GobblinTrackingCompactionRecord eventRecord = null;
// only handle namespace "compaction.tracking.events"
if (record != null && record.get("namespace").equals("compaction.tracking.events")) {
final String name = (String) record.get("name");
if (record != null && record.get("namespace") != null && record.get("name") != null
&& "compaction.tracking.events".equals(record.get("namespace").toString())) {
final String name = record.get("name").toString();
// for event name "CompactionCompleted" or "CompactionRecordCounts"
if (name.equals("CompactionCompleted") || name.equals("CompactionRecordCounts")) {
// logger.info("Processing Gobblin tracking event record: " + name);
final long timestamp = (long) record.get("timestamp");
final Map<String, String> metadata = (Map<String, String>) record.get("metadata");
final Map<String, String> metadata = convertObjectMapToStringMap(record.get("metadata"));
final String jobContext = "Gobblin:" + name;
final String cluster = ClusterUtil.matchClusterCode(metadata.get("clusterIdentifier"));
@@ -90,8 +92,8 @@ public class GobblinTrackingCompactionProcessor extends KafkaConsumerProcessor {
lateRecordCount = parseLong(metadata.get("LateRecordCount"));
}
eventRecord = new GobblinTrackingCompactionRecord(timestamp, jobContext,
cluster, projectName, flowId, jobId, execId);
eventRecord =
new GobblinTrackingCompactionRecord(timestamp, jobContext, cluster, projectName, flowId, jobId, execId);
eventRecord.setDatasetUrn(dataset, partitionType, partitionName);
eventRecord.setRecordCount(recordCount);
eventRecord.setLateRecordCount(lateRecordCount);
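The core of the change in this processor (and in the DistcpNg and Lumos processors below) is that string fields read out of a GenericData.Record are typically org.apache.avro.util.Utf8 instances at runtime, so the old (String) cast could throw ClassCastException; comparing against toString() side-steps that. A minimal standalone illustration, not part of the commit:

// Illustration only: Avro generic records usually hold Utf8, not java.lang.String.
Object name = new org.apache.avro.util.Utf8("CompactionCompleted");
// String s = (String) name;              // would throw ClassCastException at runtime
String s = name.toString();               // "CompactionCompleted"
boolean match = "CompactionCompleted".equals(s);  // literal-first equals is also null-safe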

View File

@@ -13,12 +13,10 @@
*/
package metadata.etl.kafka;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.avro.generic.GenericData;
import wherehows.common.schemas.ClusterInfo;
import wherehows.common.schemas.GobblinTrackingDistcpNgRecord;
import wherehows.common.schemas.Record;
import wherehows.common.utils.ClusterUtil;
@@ -41,24 +39,26 @@ public class GobblinTrackingDistcpNgProcessor extends KafkaConsumerProcessor {
* @throws Exception
*/
@Override
public Record process(GenericData.Record record, String topic) throws Exception {
public Record process(GenericData.Record record, String topic)
throws Exception {
GobblinTrackingDistcpNgRecord eventRecord = null;
// handle namespace "gobblin.copy.CopyDataPublisher"
if (record != null && record.get("namespace").equals("gobblin.copy.CopyDataPublisher")) {
final String name = (String) record.get("name");
if (record != null && record.get("namespace") != null && record.get("name") != null
&& "gobblin.copy.CopyDataPublisher".equals(record.get("namespace").toString())) {
final String name = record.get("name").toString();
if (name.equals("DatasetPublished")) { // || name.equals("FilePublished")) {
// logger.info("Processing Gobblin tracking event record: " + name);
final long timestamp = (long) record.get("timestamp");
final Map<String, String> metadata = (Map<String, String>) record.get("metadata");
final Map<String, String> metadata = convertObjectMapToStringMap(record.get("metadata"));
final String jobContext = "DistcpNG:" + name;
final String cluster = ClusterUtil.matchClusterCode(metadata.get("clusterIdentifier"));
final String projectName = metadata.get("azkabanProjectName");
final String flowId = metadata.get("azkabanFlowId");
final String jobId = metadata.get("azkabanJobId");
final int execId = Integer.parseInt(metadata.get("azkabanExecId"));
final int execId = parseInteger(metadata.get("azkabanExecId"));
// final String metricContextId = metadata.get("metricContextID");
// final String metricContextName = metadata.get("metricContextName");
@@ -83,8 +83,8 @@ public class GobblinTrackingDistcpNgProcessor extends KafkaConsumerProcessor {
}
}
eventRecord = new GobblinTrackingDistcpNgRecord(timestamp, jobContext,
cluster, projectName, flowId, jobId, execId);
eventRecord =
new GobblinTrackingDistcpNgRecord(timestamp, jobContext, cluster, projectName, flowId, jobId, execId);
eventRecord.setDatasetUrn(dataset, partitionType, partitionName);
eventRecord.setEventInfo(upstreamTimestamp, originTimestamp, sourcePath, targetPath);
}

View File

@@ -40,7 +40,6 @@ public class GobblinTrackingLumosProcessor extends KafkaConsumerProcessor {
private final String PartitionEpochRegex = "(\\d+)-\\w+-\\d+";
private final Pattern PartitionEpochPattern = Pattern.compile(PartitionEpochRegex);
/**
* Process a Gobblin tracking event lumos record
* @param record
@@ -48,15 +47,17 @@ public class GobblinTrackingLumosProcessor extends KafkaConsumerProcessor {
* @throws Exception
*/
@Override
public Record process(GenericData.Record record, String topic) throws Exception {
public Record process(GenericData.Record record, String topic)
throws Exception {
GobblinTrackingLumosRecord eventRecord = null;
if (record != null) {
final String name = (String) record.get("name");
if (record != null && record.get("namespace") != null && record.get("name") != null) {
final String name = record.get("name").toString();
// only handle "DeltaPublished" and "SnapshotPublished"
if (name.equals("DeltaPublished") || name.equals("SnapshotPublished")) {
final long timestamp = (long) record.get("timestamp");
final Map<String, String> metadata = (Map<String, String>) record.get("metadata");
final Map<String, String> metadata = convertObjectMapToStringMap(record.get("metadata"));
// logger.info("Processing Gobblin tracking event record: " + name + ", timestamp: " + timestamp);
final String jobContext = "Lumos:" + name;
@@ -116,10 +117,10 @@ public class GobblinTrackingLumosProcessor extends KafkaConsumerProcessor {
}
}
eventRecord = new GobblinTrackingLumosRecord(timestamp, cluster,
jobContext, projectName, flowId, jobId, execId);
eventRecord.setDatasetUrn(dataset, targetDirectory, partitionType, partitionName,
subpartitionType, subpartitionName);
eventRecord =
new GobblinTrackingLumosRecord(timestamp, cluster, jobContext, projectName, flowId, jobId, execId);
eventRecord.setDatasetUrn(dataset, targetDirectory, partitionType, partitionName, subpartitionType,
subpartitionName);
eventRecord.setMaxDataDate(maxDataDateEpoch3, maxDataKey);
eventRecord.setSource(datacenter, devEnv, sourceDatabase, sourceTable);
eventRecord.setRecordCount(recordCount);

View File

@@ -13,6 +13,8 @@
*/
package metadata.etl.kafka;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.generic.GenericData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,9 +40,9 @@ public abstract class KafkaConsumerProcessor {
/**
* Parse Integer value from a String, if null or exception, return 0
* Parse Long value from a String, if null or exception, return 0
* @param text String
* @return int
* @return long
*/
protected long parseLong(String text) {
try {
@@ -50,4 +52,30 @@ public abstract class KafkaConsumerProcessor {
}
}
/**
* Parse Integer value from a String, if null or exception, return 0
* @param text String
* @return int
*/
protected int parseInteger(String text) {
try {
return Integer.parseInt(text);
} catch (NumberFormatException e) {
return 0;
}
}
/**
* Convert Object with type Map<Object, Object> to Map<String, String>
* @param obj Object with type Map<Object, Object>
* @return Map <String, String>
*/
protected Map<String, String> convertObjectMapToStringMap(Object obj) {
final Map<Object, Object> map = (Map<Object, Object>) obj;
final Map<String, String> metadata = new HashMap<>();
for (Map.Entry<Object, Object> entry : map.entrySet()) {
metadata.put(String.valueOf(entry.getKey()), String.valueOf(entry.getValue()));
}
return metadata;
}
}
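A hedged usage example for the helpers above (the map contents are hypothetical, and java.util.HashMap, java.util.Map and org.apache.avro.util.Utf8 are assumed to be imported): the raw Avro metadata map may carry Utf8 keys and values, so a direct cast to Map<String, String> is unreliable, while converting every entry through String.valueOf gives plain String lookups.

// Hypothetical input resembling an Avro "metadata" map whose keys and values are Utf8.
Map<Object, Object> raw = new HashMap<>();
raw.put(new Utf8("azkabanExecId"), new Utf8("12345"));
raw.put(new Utf8("LateRecordCount"), new Utf8("n/a"));

Map<String, String> metadata = convertObjectMapToStringMap(raw);
int execId = parseInteger(metadata.get("azkabanExecId"));     // 12345
long lateCount = parseLong(metadata.get("LateRecordCount"));  // 0: parse failure falls back to 0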

View File

@@ -36,9 +36,9 @@ public class MetastoreAuditProcessor extends KafkaConsumerProcessor {
// logger.info("Processing Metastore Audit event record.");
final GenericData.Record auditHeader = (GenericData.Record) record.get("auditHeader");
final String server = ClusterUtil.matchClusterCode(utf8ToString(auditHeader.get("server")));
final String instance = utf8ToString(auditHeader.get("instance"));
final String appName = utf8ToString(auditHeader.get("appName"));
final String server = ClusterUtil.matchClusterCode(String.valueOf(auditHeader.get("server")));
final String instance = String.valueOf(auditHeader.get("instance"));
final String appName = String.valueOf(auditHeader.get("appName"));
String eventName;
GenericData.Record content;
@@ -63,20 +63,20 @@ public class MetastoreAuditProcessor extends KafkaConsumerProcessor {
throw new IllegalArgumentException("Unknown Metastore Audit event: " + record);
}
final String eventType = utf8ToString(content.get("eventType"));
final String metastoreThriftUri = utf8ToString(content.get("metastoreThriftUri"));
final String metastoreVersion = utf8ToString(content.get("metastoreVersion"));
final String eventType = String.valueOf(content.get("eventType"));
final String metastoreThriftUri = String.valueOf(content.get("metastoreThriftUri"));
final String metastoreVersion = String.valueOf(content.get("metastoreVersion"));
final long timestamp = (long) content.get("timestamp");
final String isSuccessful = utf8ToString(content.get("isSuccessful"));
final String isDataDeleted = utf8ToString(content.get("isDataDeleted"));
final String isSuccessful = String.valueOf(content.get("isSuccessful"));
final String isDataDeleted = String.valueOf(content.get("isDataDeleted"));
// use newOne, if null, use oldOne
final GenericData.Record rec = newInfo != null ? (GenericData.Record) newInfo : (GenericData.Record) oldInfo;
final String dbName = utf8ToString(rec.get("dbName"));
final String tableName = utf8ToString(rec.get("tableName"));
final String partition = utf8ToString(rec.get("values"));
final String location = utf8ToString(rec.get("location"));
final String owner = utf8ToString(rec.get("owner"));
final String dbName = String.valueOf(rec.get("dbName"));
final String tableName = String.valueOf(rec.get("tableName"));
final String partition = String.valueOf(rec.get("values"));
final String location = String.valueOf(rec.get("location"));
final String owner = String.valueOf(rec.get("owner"));
final long createTime = (long) rec.get("createTime");
final long lastAccessTime = (long) rec.get("lastAccessTime");
@@ -85,18 +85,9 @@ public class MetastoreAuditProcessor extends KafkaConsumerProcessor {
// set null partition to '?' for primary key
eventRecord.setTableInfo(dbName, tableName, (partition != null ? partition : "?"),
location, owner, createTime, lastAccessTime);
eventRecord.setOldInfo(utf8ToString(oldInfo));
eventRecord.setNewInfo(utf8ToString(newInfo));
eventRecord.setOldInfo(String.valueOf(oldInfo));
eventRecord.setNewInfo(String.valueOf(newInfo));
}
return eventRecord;
}
/**
* Cast utf8 text to String, also handle null
* @param text utf8
* @return String
*/
private String utf8ToString(Object text) {
return text == null ? null : text.toString();
}
}
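One behavioral note on the String.valueOf calls used above, shown as a standalone illustration rather than as part of the commit: unlike the removed utf8ToString helper, String.valueOf turns a null field into the four-character string "null" instead of a null reference.

Object present = new org.apache.avro.util.Utf8("dbName");
Object missing = null;
String a = String.valueOf(present);  // "dbName" (Utf8 converted via toString())
String b = String.valueOf(missing);  // "null" as a literal string, not a null reference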

View File

@@ -13,6 +13,8 @@
*/
package wherehows.common.kafka.serializers;
import java.util.Arrays;
import javax.xml.bind.DatatypeConverter;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericContainer;
import org.apache.avro.generic.GenericDatumReader;
@@ -91,6 +93,19 @@ public abstract class AbstractKafkaAvroDeserializer extends AbstractKafkaAvroSer
return deserialize(false, null, null, payload, null);
}
/**
* Just like single-parameter version but take topic string as a parameter
*
* @param includeSchemaAndVersion boolean
* @param topic String
* @param payload serialized data
* @return the deserialized object
* @throws SerializationException
*/
protected Object deserialize(boolean includeSchemaAndVersion, String topic, byte[] payload) throws SerializationException {
return deserialize(includeSchemaAndVersion, topic, null, payload, null);
}
/**
* Just like single-parameter version but accepts an Avro schema to use for reading
*
@@ -173,7 +188,9 @@ public abstract class AbstractKafkaAvroDeserializer extends AbstractKafkaAvroSer
// avro deserialization may throw AvroRuntimeException, NullPointerException, etc
throw new SerializationException("Error deserializing Avro message for id " + id, e);
} catch (RestClientException e) {
throw new SerializationException("Error retrieving Avro schema for id " + id, e);
byte[] initialBytes = Arrays.copyOf(payload, 40);
throw new SerializationException("Error retrieving Avro schema for topic " + topic + " id " + id
+ ", initial bytes " + DatatypeConverter.printHexBinary(initialBytes).toLowerCase(), e);
}
}
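A hedged illustration of the new debug detail (the payload bytes below are made up): when the schema registry lookup fails, the first bytes of the message are copied, zero-padded to 40 bytes by Arrays.copyOf, and hex-encoded with DatatypeConverter.printHexBinary, which makes a wrong magic byte or schema id easy to spot in the log.

// Made-up payload: magic byte 0x00, 4-byte schema id 0x00001234, then Avro data.
byte[] payload = new byte[] {0x00, 0x00, 0x00, 0x12, 0x34, 0x2a};
byte[] initialBytes = Arrays.copyOf(payload, 40);            // shorter payloads are zero-padded
String hex = DatatypeConverter.printHexBinary(initialBytes).toLowerCase();
// hex == "00000012342a" followed by 34 repetitions of "00"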

View File

@@ -50,13 +50,16 @@ public class KafkaAvroDeserializer extends AbstractKafkaAvroDeserializer
@Override
public Object deserialize(String s, byte[] bytes) {
return deserialize(bytes);
// return deserialize(bytes);
return deserialize(false, s, bytes);
}
/**
* Pass a reader schema to get an Avro projection
*/
public Object deserialize(String s, byte[] bytes, Schema readerSchema) { return deserialize(bytes, readerSchema); }
public Object deserialize(String s, byte[] bytes, Schema readerSchema) {
return deserialize(bytes, readerSchema);
}
@Override
public void close() {