Update MetastoreAuditProcessor to reduce storage; also refactor some code

commit fcd6cf149e
parent 5049c847fa
Author: Yi Wang
Date:   2016-10-11 11:26:36 -07:00
12 changed files with 45 additions and 25 deletions

View File

@@ -29,11 +29,11 @@ import metadata.etl.models.EtlJobName;
 import models.daos.ClusterDao;
 import models.daos.EtlJobDao;
-import msgs.KafkaResponseMsg;
+import msgs.KafkaCommMsg;
 import play.Logger;
 import play.Play;
-import utils.KafkaConfig;
-import utils.KafkaConfig.Topic;
+import models.kafka.KafkaConfig;
+import models.kafka.KafkaConfig.Topic;
 import wherehows.common.kafka.schemaregistry.client.CachedSchemaRegistryClient;
 import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient;
 import wherehows.common.utils.ClusterUtil;
@@ -132,9 +132,9 @@ public class KafkaConsumerMaster extends UntypedActor {
   @Override
   public void onReceive(Object message)
       throws Exception {
-    if (message instanceof KafkaResponseMsg) {
-      final KafkaResponseMsg kafkaMsg = (KafkaResponseMsg) message;
-      Logger.debug("Got kafka response message: " + kafkaMsg.toString());
+    if (message instanceof KafkaCommMsg) {
+      final KafkaCommMsg kafkaCommMsg = (KafkaCommMsg) message;
+      Logger.debug(kafkaCommMsg.toString());
     } else {
       unhandled(message);
     }
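
For context, this is the receiving side of the renamed message; the sending side lives in the consumer workers. A minimal sketch of a worker replying to the master, assuming an Akka UntypedActor holding a reference to the master actor — the class name, message type, and payload below are illustrative, not part of this commit:

import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import msgs.KafkaCommMsg;

// Illustrative worker; the real KafkaConsumerWorker in this repo may differ.
public class ExampleWorker extends UntypedActor {
  private final ActorRef master;  // reference to the KafkaConsumerMaster actor
  private final String topic;     // Kafka topic this worker consumes
  private final int thread;       // worker thread number, now carried in every message

  public ExampleWorker(ActorRef master, String topic, int thread) {
    this.master = master;
    this.topic = topic;
    this.thread = thread;
  }

  @Override
  public void onReceive(Object message) throws Exception {
    // Report progress to the master using the new three-argument constructor.
    KafkaCommMsg reply = new KafkaCommMsg("AUDIT_RESPONSE", topic, thread);
    reply.putContent("offset", 12345L);  // illustrative payload
    master.tell(reply, getSelf());
  }
}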

View File

@@ -24,7 +24,7 @@ import kafka.consumer.KafkaStream;
 import kafka.message.MessageAndMetadata;
 import akka.actor.UntypedActor;
-import msgs.KafkaResponseMsg;
+import msgs.KafkaCommMsg;
 import play.Logger;
 import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient;
 import wherehows.common.kafka.serializers.KafkaAvroDeserializer;

View File

@@ -20,7 +20,7 @@ import java.util.Map;
 import java.util.List;
 import models.daos.DatasetDao;
 import models.daos.UserDao;
-import models.utils.Urn;
+import utils.Urn;
 import org.springframework.dao.EmptyResultDataAccessException;
 import play.Logger;
 import play.libs.Json;

View File

@@ -19,7 +19,7 @@ import java.sql.SQLException;
 import java.util.List;
 import java.util.Map;
 import models.daos.DatasetInfoDao;
-import models.utils.Urn;
+import utils.Urn;
 import org.springframework.dao.EmptyResultDataAccessException;
 import play.Logger;
 import play.libs.Json;

View File

@@ -19,7 +19,7 @@ import java.sql.SQLException;
 import java.util.List;
 import java.util.Map;
 import models.daos.LineageDao;
-import models.utils.Urn;
+import utils.Urn;
 import play.libs.Json;
 import play.mvc.BodyParser;
 import play.mvc.Controller;

View File

@@ -22,7 +22,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import models.utils.Urn;
+import utils.Urn;
 import org.springframework.dao.DataAccessException;
 import org.springframework.dao.EmptyResultDataAccessException;
 import play.Logger;

View File

@@ -20,7 +20,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
-import models.utils.Urn;
+import utils.Urn;
 import utils.JdbcUtil;
 import wherehows.common.schemas.LineageRecord;
 import wherehows.common.utils.PartitionPatternMatcher;

View File

@@ -41,8 +41,8 @@ public class GobblinTrackingAuditProcessor extends KafkaConsumerProcessor {
   public Record process(GenericData.Record record, String topic)
       throws Exception {
-    if (record != null) {
-      String name = (String) record.get("name");
+    if (record != null && record.get("name") != null) {
+      final String name = record.get("name").toString();
       // only handle "DaliLimitedRetentionAuditor","DaliAutoPurgeAuditor" and "DsIgnoreIDPCAuditor"
       if (name.equals(DALI_LIMITED_RETENTION_AUDITOR)
           || name.equals(DALI_AUTOPURGED_AUDITOR)
@@ -52,10 +52,10 @@ public class GobblinTrackingAuditProcessor extends KafkaConsumerProcessor {
         String hasError = metadata.get("HasError");
         if (!hasError.equalsIgnoreCase("true")) {
-          String datasetUrn = metadata.get("DatasetPath");
+          String datasetPath = metadata.get("DatasetPath");
+          String datasetUrn = DATASET_URN_PREFIX + (datasetPath.startsWith("/") ? "" : "/") + datasetPath;
           String ownerUrns = metadata.get("OwnerURNs");
-          DatasetInfoDao.updateKafkaDatasetOwner(DATASET_URN_PREFIX + datasetUrn, ownerUrns, DATASET_OWNER_SOURCE,
-              timestamp);
+          DatasetInfoDao.updateKafkaDatasetOwner(datasetUrn, ownerUrns, DATASET_OWNER_SOURCE, timestamp);
         }
       }
     }
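
The added line builds the dataset URN from DatasetPath, inserting a slash only when the path lacks one, so both path forms map to the same URN. A standalone sketch of that normalization; the prefix value "hdfs://" here is an assumed placeholder, not the actual DATASET_URN_PREFIX constant from the codebase:

public class UrnPrefixExample {
  // Assumed placeholder; the real DATASET_URN_PREFIX is defined in the processor.
  private static final String DATASET_URN_PREFIX = "hdfs://";

  static String toDatasetUrn(String datasetPath) {
    // Prepend the prefix, adding a "/" only if the path does not already start with one.
    return DATASET_URN_PREFIX + (datasetPath.startsWith("/") ? "" : "/") + datasetPath;
  }

  public static void main(String[] args) {
    System.out.println(toDatasetUrn("/data/tracking/abc"));  // hdfs:///data/tracking/abc
    System.out.println(toDatasetUrn("data/tracking/abc"));   // hdfs:///data/tracking/abc
  }
}

Normalizing up front is what lets updateKafkaDatasetOwner take the prebuilt datasetUrn directly instead of concatenating the prefix at the call site.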

View File

@@ -11,7 +11,7 @@
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  */
-package utils;
+package models.kafka;

 import java.lang.reflect.Method;
 import java.util.HashMap;
@@ -22,6 +22,7 @@ import metadata.etl.models.EtlJobName;
 import models.daos.EtlJobPropertyDao;
 import org.apache.avro.generic.GenericData;
 import play.Logger;
+import utils.JdbcUtil;
 import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient;
 import wherehows.common.writers.DatabaseWriter;

View File

@@ -86,8 +86,8 @@ public class MetastoreAuditProcessor extends KafkaConsumerProcessor {
       eventRecord = new MetastoreAuditRecord(server, instance, appName, eventName, eventType, timestamp);
       eventRecord.setEventInfo(metastoreThriftUri, metastoreVersion, isSuccessful, isDataDeleted);
       eventRecord.setTableInfo(dbName, tableName, partition, location, owner, createTime, lastAccessTime);
-      eventRecord.setOldInfo(StringUtil.toStringReplaceNull(oldInfo, null));
-      eventRecord.setNewInfo(StringUtil.toStringReplaceNull(newInfo, null));
+      // eventRecord.setOldInfo(StringUtil.toStringReplaceNull(oldInfo, null));
+      // eventRecord.setNewInfo(StringUtil.toStringReplaceNull(newInfo, null));
     }
     return eventRecord;
   }

View File

@@ -20,15 +20,21 @@ import java.util.Map;

 /**
  * Generic communication message between KafkaConsumerMaster and KafkaConsumerWorker
  */
-public class KafkaResponseMsg {
+public class KafkaCommMsg {

+  // Message type: AUDIT, AUDIT_RESPONSE, FLUSH, FLUSH_RESPONSE, HEARTBEAT, etc
   private String msgType;
+  // Kafka topic of the worker
   private String topic;
+  // Kafka worker thread number
+  private int thread;
+  // Message content
   private Map<String, Object> content;

-  public KafkaResponseMsg(String msgType, String topic) {
+  public KafkaCommMsg(String msgType, String topic, int thread) {
     this.msgType = msgType;
     this.topic = topic;
+    this.thread = thread;
     this.content = new HashMap<>();
   }
@@ -48,6 +54,14 @@
     this.topic = topic;
   }

+  public int getThread() {
+    return thread;
+  }
+
+  public void setThread(int thread) {
+    this.thread = thread;
+  }
+
   public Map<String, Object> getContent() {
     return content;
   }
@@ -56,8 +70,13 @@
     this.content = content;
   }

+  public void putContent(String key, Object value) {
+    this.content.put(key, value);
+  }
+
   @Override
   public String toString() {
-    return "KafkaResponseMsg [type=" + msgType + ", topic=" + topic + ", " + content.toString() + "]";
+    return "KafkaCommMsg [type=" + msgType + ", topic=" + topic + ", thread=" + thread + ", content="
+        + content.toString() + "]";
   }
 }
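
A minimal usage sketch of the renamed class as it now stands; the topic name and payload below are illustrative:

import msgs.KafkaCommMsg;

public class KafkaCommMsgDemo {
  public static void main(String[] args) {
    // Heartbeat from worker thread 3 of an example topic.
    KafkaCommMsg msg = new KafkaCommMsg("HEARTBEAT", "MetastoreAuditEvent", 3);
    msg.putContent("processedCount", 1024);

    // Prints: KafkaCommMsg [type=HEARTBEAT, topic=MetastoreAuditEvent, thread=3, content={processedCount=1024}]
    System.out.println(msg);
  }
}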

View File

@@ -11,7 +11,7 @@
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  */
-package models.utils;
+package utils;

 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
@@ -19,7 +19,7 @@ import play.Logger;

 /**
- * Urn class used for urn convertion
+ * Urn class used for urn conversion
  * Created by zsun on 1/15/15.
  */
 public class Urn {