diff --git a/backend-service/app/actors/KafkaConsumerMaster.java b/backend-service/app/actors/KafkaConsumerMaster.java index 8d6864eb1e..a3175935d0 100644 --- a/backend-service/app/actors/KafkaConsumerMaster.java +++ b/backend-service/app/actors/KafkaConsumerMaster.java @@ -29,11 +29,11 @@ import metadata.etl.models.EtlJobName; import models.daos.ClusterDao; import models.daos.EtlJobDao; -import msgs.KafkaResponseMsg; +import msgs.KafkaCommMsg; import play.Logger; import play.Play; -import utils.KafkaConfig; -import utils.KafkaConfig.Topic; +import models.kafka.KafkaConfig; +import models.kafka.KafkaConfig.Topic; import wherehows.common.kafka.schemaregistry.client.CachedSchemaRegistryClient; import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient; import wherehows.common.utils.ClusterUtil; @@ -132,9 +132,9 @@ public class KafkaConsumerMaster extends UntypedActor { @Override public void onReceive(Object message) throws Exception { - if (message instanceof KafkaResponseMsg) { - final KafkaResponseMsg kafkaMsg = (KafkaResponseMsg) message; - Logger.debug("Got kafka response message: " + kafkaMsg.toString()); + if (message instanceof KafkaCommMsg) { + final KafkaCommMsg kafkaCommMsg = (KafkaCommMsg) message; + Logger.debug(kafkaCommMsg.toString()); } else { unhandled(message); } diff --git a/backend-service/app/actors/KafkaConsumerWorker.java b/backend-service/app/actors/KafkaConsumerWorker.java index 61be6d8260..d7ffe002f7 100644 --- a/backend-service/app/actors/KafkaConsumerWorker.java +++ b/backend-service/app/actors/KafkaConsumerWorker.java @@ -24,7 +24,7 @@ import kafka.consumer.KafkaStream; import kafka.message.MessageAndMetadata; import akka.actor.UntypedActor; -import msgs.KafkaResponseMsg; +import msgs.KafkaCommMsg; import play.Logger; import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient; import wherehows.common.kafka.serializers.KafkaAvroDeserializer; diff --git a/backend-service/app/controllers/DatasetController.java b/backend-service/app/controllers/DatasetController.java index 15b01a1c3b..13b97498f0 100644 --- a/backend-service/app/controllers/DatasetController.java +++ b/backend-service/app/controllers/DatasetController.java @@ -20,7 +20,7 @@ import java.util.Map; import java.util.List; import models.daos.DatasetDao; import models.daos.UserDao; -import models.utils.Urn; +import utils.Urn; import org.springframework.dao.EmptyResultDataAccessException; import play.Logger; import play.libs.Json; diff --git a/backend-service/app/controllers/DatasetInfoController.java b/backend-service/app/controllers/DatasetInfoController.java index fe0209873c..745d71dfbd 100644 --- a/backend-service/app/controllers/DatasetInfoController.java +++ b/backend-service/app/controllers/DatasetInfoController.java @@ -19,7 +19,7 @@ import java.sql.SQLException; import java.util.List; import java.util.Map; import models.daos.DatasetInfoDao; -import models.utils.Urn; +import utils.Urn; import org.springframework.dao.EmptyResultDataAccessException; import play.Logger; import play.libs.Json; diff --git a/backend-service/app/controllers/LineageController.java b/backend-service/app/controllers/LineageController.java index a289f24114..c752f9dfe0 100644 --- a/backend-service/app/controllers/LineageController.java +++ b/backend-service/app/controllers/LineageController.java @@ -19,7 +19,7 @@ import java.sql.SQLException; import java.util.List; import java.util.Map; import models.daos.LineageDao; -import models.utils.Urn; +import utils.Urn; import play.libs.Json; import play.mvc.BodyParser; import play.mvc.Controller; diff --git a/backend-service/app/models/daos/DatasetInfoDao.java b/backend-service/app/models/daos/DatasetInfoDao.java index 4a7a456a6f..86cdc4afb2 100644 --- a/backend-service/app/models/daos/DatasetInfoDao.java +++ b/backend-service/app/models/daos/DatasetInfoDao.java @@ -22,7 +22,7 @@ import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; -import models.utils.Urn; +import utils.Urn; import org.springframework.dao.DataAccessException; import org.springframework.dao.EmptyResultDataAccessException; import play.Logger; diff --git a/backend-service/app/models/daos/LineageDao.java b/backend-service/app/models/daos/LineageDao.java index d6c58f0975..9c0c713e7f 100644 --- a/backend-service/app/models/daos/LineageDao.java +++ b/backend-service/app/models/daos/LineageDao.java @@ -20,7 +20,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeMap; -import models.utils.Urn; +import utils.Urn; import utils.JdbcUtil; import wherehows.common.schemas.LineageRecord; import wherehows.common.utils.PartitionPatternMatcher; diff --git a/backend-service/app/models/kafka/GobblinTrackingAuditProcessor.java b/backend-service/app/models/kafka/GobblinTrackingAuditProcessor.java index 784ad2d012..047db185ff 100644 --- a/backend-service/app/models/kafka/GobblinTrackingAuditProcessor.java +++ b/backend-service/app/models/kafka/GobblinTrackingAuditProcessor.java @@ -41,8 +41,8 @@ public class GobblinTrackingAuditProcessor extends KafkaConsumerProcessor { public Record process(GenericData.Record record, String topic) throws Exception { - if (record != null) { - String name = (String) record.get("name"); + if (record != null && record.get("name") != null) { + final String name = record.get("name").toString(); // only handle "DaliLimitedRetentionAuditor","DaliAutoPurgeAuditor" and "DsIgnoreIDPCAuditor" if (name.equals(DALI_LIMITED_RETENTION_AUDITOR) || name.equals(DALI_AUTOPURGED_AUDITOR) @@ -52,10 +52,10 @@ public class GobblinTrackingAuditProcessor extends KafkaConsumerProcessor { String hasError = metadata.get("HasError"); if (!hasError.equalsIgnoreCase("true")) { - String datasetUrn = metadata.get("DatasetPath"); + String datasetPath = metadata.get("DatasetPath"); + String datasetUrn = DATASET_URN_PREFIX + (datasetPath.startsWith("/") ? "" : "/") + datasetPath; String ownerUrns = metadata.get("OwnerURNs"); - DatasetInfoDao.updateKafkaDatasetOwner(DATASET_URN_PREFIX + datasetUrn, ownerUrns, DATASET_OWNER_SOURCE, - timestamp); + DatasetInfoDao.updateKafkaDatasetOwner(datasetUrn, ownerUrns, DATASET_OWNER_SOURCE, timestamp); } } } diff --git a/backend-service/app/utils/KafkaConfig.java b/backend-service/app/models/kafka/KafkaConfig.java similarity index 99% rename from backend-service/app/utils/KafkaConfig.java rename to backend-service/app/models/kafka/KafkaConfig.java index 3b8c8062c7..0eaf70bc6a 100644 --- a/backend-service/app/utils/KafkaConfig.java +++ b/backend-service/app/models/kafka/KafkaConfig.java @@ -11,7 +11,7 @@ * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */ -package utils; +package models.kafka; import java.lang.reflect.Method; import java.util.HashMap; @@ -22,6 +22,7 @@ import metadata.etl.models.EtlJobName; import models.daos.EtlJobPropertyDao; import org.apache.avro.generic.GenericData; import play.Logger; +import utils.JdbcUtil; import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient; import wherehows.common.writers.DatabaseWriter; diff --git a/backend-service/app/models/kafka/MetastoreAuditProcessor.java b/backend-service/app/models/kafka/MetastoreAuditProcessor.java index 662bafcd1b..85558487bc 100644 --- a/backend-service/app/models/kafka/MetastoreAuditProcessor.java +++ b/backend-service/app/models/kafka/MetastoreAuditProcessor.java @@ -86,8 +86,8 @@ public class MetastoreAuditProcessor extends KafkaConsumerProcessor { eventRecord = new MetastoreAuditRecord(server, instance, appName, eventName, eventType, timestamp); eventRecord.setEventInfo(metastoreThriftUri, metastoreVersion, isSuccessful, isDataDeleted); eventRecord.setTableInfo(dbName, tableName, partition, location, owner, createTime, lastAccessTime); - eventRecord.setOldInfo(StringUtil.toStringReplaceNull(oldInfo, null)); - eventRecord.setNewInfo(StringUtil.toStringReplaceNull(newInfo, null)); + // eventRecord.setOldInfo(StringUtil.toStringReplaceNull(oldInfo, null)); + // eventRecord.setNewInfo(StringUtil.toStringReplaceNull(newInfo, null)); } return eventRecord; } diff --git a/backend-service/app/msgs/KafkaResponseMsg.java b/backend-service/app/msgs/KafkaCommMsg.java similarity index 66% rename from backend-service/app/msgs/KafkaResponseMsg.java rename to backend-service/app/msgs/KafkaCommMsg.java index 1399d08251..988b90ced4 100644 --- a/backend-service/app/msgs/KafkaResponseMsg.java +++ b/backend-service/app/msgs/KafkaCommMsg.java @@ -20,15 +20,21 @@ import java.util.Map; /** * Generic communication message between KafkaConsumerMaster and KafkaConsumerWorker */ -public class KafkaResponseMsg { +public class KafkaCommMsg { + // Message type: AUDIT, AUDIT_RESPONSE, FLUSH, FLUSH_RESPONSE, HEARTBEAT, etc private String msgType; + // Kafka topic of the worker private String topic; + // Kafka worker thread number + private int thread; + // Message content private Map content; - public KafkaResponseMsg(String msgType, String topic) { + public KafkaCommMsg(String msgType, String topic, int thread) { this.msgType = msgType; this.topic = topic; + this.thread = thread; this.content = new HashMap<>(); } @@ -48,6 +54,14 @@ public class KafkaResponseMsg { this.topic = topic; } + public int getThread() { + return thread; + } + + public void setThread(int thread) { + this.thread = thread; + } + public Map getContent() { return content; } @@ -56,8 +70,13 @@ public class KafkaResponseMsg { this.content = content; } + public void putContent(String key, Object value) { + this.content.put(key, value); + } + @Override public String toString() { - return "KafkaResponseMsg [type=" + msgType + ", topic=" + topic + ", " + content.toString() + "]"; + return "KafkaCommMsg [type=" + msgType + ", topic=" + topic + ", thread=" + thread + ", content=" + + content.toString() + "]"; } } diff --git a/backend-service/app/models/utils/Urn.java b/backend-service/app/utils/Urn.java similarity index 98% rename from backend-service/app/models/utils/Urn.java rename to backend-service/app/utils/Urn.java index 658cba47dd..3740abf2d7 100644 --- a/backend-service/app/models/utils/Urn.java +++ b/backend-service/app/utils/Urn.java @@ -11,7 +11,7 @@ * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */ -package models.utils; +package utils; import java.util.Arrays; import java.util.HashSet; import java.util.Set; @@ -19,7 +19,7 @@ import play.Logger; /** - * Urn class used for urn convertion + * Urn class used for urn conversion * Created by zsun on 1/15/15. */ public class Urn {