Get owners for Espresso and Oracle, and fix a bug for Teradata

Author: Na Zhang, 2016-10-13 16:07:00 -07:00
Commit: 043dc25e89
58 changed files with 3003 additions and 1128 deletions

View File

@ -16,8 +16,6 @@ package actors;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import java.io.IOException;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -31,16 +29,14 @@ import metadata.etl.models.EtlJobName;
import models.daos.ClusterDao;
import models.daos.EtlJobDao;
import msgs.KafkaResponseMsg;
import msgs.KafkaCommMsg;
import play.Logger;
import play.Play;
import utils.KafkaConfig;
import utils.KafkaConfig.Topic;
import models.kafka.KafkaConfig;
import models.kafka.KafkaConfig.Topic;
import wherehows.common.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient;
import wherehows.common.schemas.AbstractRecord;
import wherehows.common.utils.ClusterUtil;
import wherehows.common.writers.DatabaseWriter;
/**
@ -52,8 +48,6 @@ public class KafkaConsumerMaster extends UntypedActor {
private static List<Integer> _kafkaJobList;
// map of kafka job id to configs
private static Map<Integer, KafkaConfig> _kafkaConfigs = new HashMap<>();
// map of topic name to DB writer
private static Map<String, DatabaseWriter> _topicDbWriters = new HashMap<>();
@Override
public void preStart()
@ -80,7 +74,6 @@ public class KafkaConsumerMaster extends UntypedActor {
final Map<String, Topic> kafkaTopics = kafkaConfig.getTopics();
kafkaConfig.updateTopicProcessor();
_topicDbWriters.putAll(kafkaConfig.getTopicDbWriters());
// create Kafka consumer connector
Logger.info("Create Kafka Consumer with config: " + kafkaProps.toString());
@ -111,7 +104,8 @@ public class KafkaConsumerMaster extends UntypedActor {
for (final KafkaStream<byte[], byte[]> stream : streams) {
ActorRef childActor = getContext().actorOf(
Props.create(KafkaConsumerWorker.class, topic, threadNumber, stream, schemaRegistryClient,
kafkaConfig.getProcessorClass(topic), kafkaConfig.getProcessorMethod(topic)));
kafkaConfig.getProcessorClass(topic), kafkaConfig.getProcessorMethod(topic),
kafkaConfig.getDbWriter(topic)));
childActor.tell("Start", getSelf());
threadNumber++;
@ -134,25 +128,9 @@ public class KafkaConsumerMaster extends UntypedActor {
@Override
public void onReceive(Object message)
throws Exception {
if (message instanceof KafkaResponseMsg) {
final KafkaResponseMsg kafkaMsg = (KafkaResponseMsg) message;
final String topic = kafkaMsg.getTopic();
final AbstractRecord record = kafkaMsg.getRecord();
if (record != null && _topicDbWriters.containsKey(topic)) {
Logger.debug("Writing to DB kafka event record: " + topic);
final DatabaseWriter dbWriter = _topicDbWriters.get(topic);
try {
dbWriter.append(record);
// dbWriter.close();
dbWriter.insert();
} catch (SQLException | IOException e) {
Logger.error("Error while inserting event record: " + record, e);
}
} else {
Logger.error("Unknown kafka response " + kafkaMsg);
}
if (message instanceof KafkaCommMsg) {
final KafkaCommMsg kafkaCommMsg = (KafkaCommMsg) message;
Logger.debug(kafkaCommMsg.toString());
} else {
unhandled(message);
}

View File

@ -13,8 +13,10 @@
*/
package actors;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.sql.SQLException;
import org.apache.avro.generic.GenericData;
import org.apache.kafka.common.serialization.Deserializer;
import kafka.consumer.ConsumerIterator;
@ -22,11 +24,12 @@ import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
import akka.actor.UntypedActor;
import msgs.KafkaResponseMsg;
import msgs.KafkaCommMsg;
import play.Logger;
import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient;
import wherehows.common.kafka.serializers.KafkaAvroDeserializer;
import wherehows.common.schemas.AbstractRecord;
import wherehows.common.writers.DatabaseWriter;
/**
@ -39,16 +42,22 @@ public class KafkaConsumerWorker extends UntypedActor {
private final SchemaRegistryClient _schemaRegistryRestfulClient;
private final Object _processorClass;
private final Method _processorMethod;
private final DatabaseWriter _dbWriter;
private int _receivedRecordCount;
private int _processedRecordCount;
public KafkaConsumerWorker(String topic, int threadNumber,
KafkaStream<byte[], byte[]> stream, SchemaRegistryClient schemaRegistryRestfulClient,
Object processorClass, Method processorMethod) {
Object processorClass, Method processorMethod, DatabaseWriter dbWriter) {
this._topic = topic;
this._threadId = threadNumber;
this._kafkaStream = stream;
this._schemaRegistryRestfulClient = schemaRegistryRestfulClient;
this._processorClass = processorClass;
this._processorMethod = processorMethod;
this._dbWriter = dbWriter;
this._receivedRecordCount = 0; // number of received kafka messages
this._processedRecordCount = 0; // number of processed records
}
@Override
@ -59,23 +68,36 @@ public class KafkaConsumerWorker extends UntypedActor {
final Deserializer<Object> avroDeserializer = new KafkaAvroDeserializer(_schemaRegistryRestfulClient);
while (it.hasNext()) { // block for next input
_receivedRecordCount++;
try {
MessageAndMetadata<byte[], byte[]> msg = it.next();
GenericData.Record kafkaMsgRecord = (GenericData.Record) avroDeserializer.deserialize(_topic, msg.message());
// Logger.debug("Kafka worker ThreadId " + _threadId + " Topic " + _topic + " record: " + rec);
// invoke processor
final AbstractRecord record = (AbstractRecord) _processorMethod.invoke(
_processorClass, kafkaMsgRecord, _topic);
// send processed record to master
// save record to database
if (record != null) {
getSender().tell(new KafkaResponseMsg(record, _topic), getSelf());
_dbWriter.append(record);
// _dbWriter.close();
_dbWriter.insert();
_processedRecordCount++;
}
} catch (InvocationTargetException ite) {
Logger.error("Processing topic " + _topic + " record error: " + ite.getCause()
+ " - " + ite.getTargetException());
} catch (SQLException | IOException e) {
Logger.error("Error while inserting event record: ", e);
} catch (Throwable ex) {
Logger.error("Error in notify order. ", ex);
}
if (_receivedRecordCount % 1000 == 0) {
Logger.debug(_topic + " received " + _receivedRecordCount + " processed " + _processedRecordCount);
}
}
Logger.info("Shutting down Thread: " + _threadId + " for topic: " + _topic);
} else {
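The worker above invokes the topic-specific processor through a pre-resolved java.lang.reflect.Method rather than a shared interface. Below is a minimal sketch of that wiring, assuming the process(GenericData.Record, String) signature shown in the processors later in this commit; the class-name lookup here is hypothetical, and the real resolution appears to happen inside models.kafka.KafkaConfig when updateTopicProcessor() is called.

package models.kafka;

import java.lang.reflect.Method;
import org.apache.avro.generic.GenericData;
import wherehows.common.schemas.Record;

/**
 * Hypothetical sketch of the reflection wiring used by the consumer worker; not part of this commit.
 */
public class ProcessorInvocationExample {

  public static Record invokeProcessor(GenericData.Record avroRecord, String topic)
      throws Exception {
    // resolve the processor instance and its 'process' method once, e.g. from job configuration
    Object processorClass = Class.forName("models.kafka.GobblinTrackingAuditProcessor").newInstance();
    Method processorMethod =
        processorClass.getClass().getMethod("process", GenericData.Record.class, String.class);

    // the worker then calls this (instance, method) pair for every deserialized Kafka message
    return (Record) processorMethod.invoke(processorClass, avroRecord, topic);
  }
}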

View File

@ -20,7 +20,7 @@ import java.util.Map;
import java.util.List;
import models.daos.DatasetDao;
import models.daos.UserDao;
import models.utils.Urn;
import utils.Urn;
import org.springframework.dao.EmptyResultDataAccessException;
import play.Logger;
import play.libs.Json;

View File

@ -19,7 +19,7 @@ import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import models.daos.DatasetInfoDao;
import models.utils.Urn;
import utils.Urn;
import org.springframework.dao.EmptyResultDataAccessException;
import play.Logger;
import play.libs.Json;

View File

@ -19,7 +19,7 @@ import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import models.daos.LineageDao;
import models.utils.Urn;
import utils.Urn;
import play.libs.Json;
import play.mvc.BodyParser;
import play.mvc.Controller;

View File

@ -22,7 +22,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import models.utils.Urn;
import utils.Urn;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.EmptyResultDataAccessException;
import play.Logger;
@ -57,7 +57,7 @@ public class DatasetInfoDao {
private static final String DATASET_CASE_SENSITIVE_TABLE = "dataset_case_sensitivity";
private static final String DATASET_REFERENCE_TABLE = "dataset_reference";
private static final String DATASET_PARTITION_TABLE = "dataset_partition";
private static final String DATASET_SECURITY_TABLE = "dataset_security";
private static final String DATASET_SECURITY_TABLE = "dataset_security_info";
private static final String DATASET_OWNER_TABLE = "dataset_owner";
private static final String DATASET_OWNER_UNMATCHED_TABLE = "stg_dataset_owner_unmatched";
private static final String DATASET_CONSTRAINT_TABLE = "dataset_constraint";
@ -214,20 +214,61 @@ public class DatasetInfoDao {
DatasetInventoryItemRecord.getInventoryItemColumns());
private static Object[] findIdAndUrn(JsonNode idNode, JsonNode urnNode)
private static Object[] findIdAndUrn(Integer datasetId)
throws SQLException {
final Integer datasetId;
final String urn;
if (!idNode.isMissingNode()) {
datasetId = idNode.asInt();
urn = DatasetDao.getDatasetById(datasetId).get("urn").toString();
} else {
urn = urnNode.asText();
datasetId = Integer.valueOf(DatasetDao.getDatasetByUrn(urn).get("id").toString());
}
final String urn = datasetId != null ? DatasetDao.getDatasetById(datasetId).get("urn").toString() : null;
return new Object[]{datasetId, urn};
}
private static Object[] findIdAndUrn(String urn)
throws SQLException {
final Integer datasetId = urn != null ? Integer.valueOf(DatasetDao.getDatasetByUrn(urn).get("id").toString()) : null;
return new Object[]{datasetId, urn};
}
private static Object[] findDataset(JsonNode root) {
// use dataset id to find dataset first
final JsonNode idNode = root.path("datasetId");
if (!idNode.isMissingNode() && !idNode.isNull()) {
try {
final Object[] idUrn = findIdAndUrn(idNode.asInt());
if (idUrn[0] != null && idUrn[1] != null) {
return idUrn;
}
} catch (Exception ex) {
}
}
// use dataset uri to find dataset
final JsonNode properties = root.path("datasetProperties");
if (!properties.isMissingNode() && !properties.isNull()) {
final JsonNode uri = properties.path("uri");
if (!uri.isMissingNode() && !uri.isNull()) {
try {
final Object[] idUrn = findIdAndUrn(uri.asText());
if (idUrn[0] != null && idUrn[1] != null) {
return idUrn;
}
} catch (Exception ex) {
}
}
}
// use dataset urn to find dataset
final JsonNode urnNode = root.path("urn");
if (!urnNode.isMissingNode() && !urnNode.isNull()) {
try {
final Object[] idUrn = findIdAndUrn(urnNode.asText());
if (idUrn[0] != null && idUrn[1] != null) {
return idUrn;
}
} catch (Exception ex) {
}
}
return new Object[]{null, null};
}
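For reference, these are the three payload shapes that findDataset(JsonNode) above can resolve, in its lookup order. The illustration below is hypothetical: the id and uri values are made up, and only the field names come from the code above.

package models.daos;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Hypothetical illustration of the inputs accepted by findDataset(JsonNode); not part of this commit.
 */
public class FindDatasetPayloadExample {

  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();

    // 1. resolved through the numeric dataset id
    JsonNode byId = mapper.readTree("{\"datasetId\": 12345}");

    // 2. resolved through the dataset uri carried in datasetProperties
    JsonNode byUri = mapper.readTree("{\"datasetProperties\": {\"uri\": \"hdfs:///data/example\"}}");

    // 3. resolved through the dataset urn
    JsonNode byUrn = mapper.readTree("{\"urn\": \"hdfs:///data/example\"}");

    // findDataset(root) checks datasetId first, then datasetProperties.uri, then urn,
    // and returns {datasetId, urn} on success or {null, null} if none can be matched.
    System.out.println(byId + "\n" + byUri + "\n" + byUrn);
  }
}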
public static List<DatasetDeploymentRecord> getDatasetDeploymentByDatasetId(int datasetId)
throws DataAccessException {
Map<String, Object> params = new HashMap<>();
@ -262,16 +303,16 @@ public class DatasetInfoDao {
public static void updateDatasetDeployment(JsonNode root)
throws Exception {
final JsonNode idNode = root.path("datasetId");
final JsonNode urnNode = root.path("urn");
final JsonNode deployment = root.path("deploymentInfo");
if ((idNode.isMissingNode() && urnNode.isMissingNode()) || deployment.isMissingNode() || !deployment.isArray()) {
if (deployment.isMissingNode() || !deployment.isArray()) {
throw new IllegalArgumentException(
"Dataset deployment info update fail, " + "Json missing necessary fields: " + root.toString());
"Dataset deployment info update error, missing necessary fields: " + root.toString());
}
final Object[] idUrn = findIdAndUrn(idNode, urnNode);
final Object[] idUrn = findDataset(root);
if (idUrn[0] == null || idUrn[1] == null) {
throw new IllegalArgumentException("Cannot identify dataset from id/uri/urn: " + root.toString());
}
final Integer datasetId = (Integer) idUrn[0];
final String urn = (String) idUrn[1];
@ -325,16 +366,16 @@ public class DatasetInfoDao {
public static void updateDatasetCapacity(JsonNode root)
throws Exception {
final JsonNode idNode = root.path("datasetId");
final JsonNode urnNode = root.path("urn");
final JsonNode capacity = root.path("capacity");
if ((idNode.isMissingNode() && urnNode.isMissingNode()) || capacity.isMissingNode() || !capacity.isArray()) {
if (capacity.isMissingNode() || !capacity.isArray()) {
throw new IllegalArgumentException(
"Dataset capacity info update fail, " + "Json missing necessary fields: " + root.toString());
"Dataset capacity info update error, missing necessary fields: " + root.toString());
}
final Object[] idUrn = findIdAndUrn(idNode, urnNode);
final Object[] idUrn = findDataset(root);
if (idUrn[0] == null || idUrn[1] == null) {
throw new IllegalArgumentException("Cannot identify dataset from id/uri/urn: " + root.toString());
}
final Integer datasetId = (Integer) idUrn[0];
final String urn = (String) idUrn[1];
@ -387,16 +428,16 @@ public class DatasetInfoDao {
public static void updateDatasetTags(JsonNode root)
throws Exception {
final JsonNode idNode = root.path("datasetId");
final JsonNode urnNode = root.path("urn");
final JsonNode tags = root.path("tags");
if ((idNode.isMissingNode() && urnNode.isMissingNode()) || tags.isMissingNode() || !tags.isArray()) {
if (tags.isMissingNode() || !tags.isArray()) {
throw new IllegalArgumentException(
"Dataset tag info update fail, " + "Json missing necessary fields: " + root.toString());
"Dataset tag info update error, missing necessary fields: " + root.toString());
}
final Object[] idUrn = findIdAndUrn(idNode, urnNode);
final Object[] idUrn = findDataset(root);
if (idUrn[0] == null || idUrn[1] == null) {
throw new IllegalArgumentException("Cannot identify dataset from id/uri/urn: " + root.toString());
}
final Integer datasetId = (Integer) idUrn[0];
final String urn = (String) idUrn[1];
@ -440,16 +481,21 @@ public class DatasetInfoDao {
public static void updateDatasetCaseSensitivity(JsonNode root)
throws Exception {
final JsonNode idNode = root.path("datasetId");
final JsonNode urnNode = root.path("urn");
final JsonNode caseSensitivity = root.path("caseSensitivity");
if ((idNode.isMissingNode() && urnNode.isMissingNode()) || caseSensitivity.isMissingNode()) {
final JsonNode properties = root.path("datasetProperties");
if (properties.isMissingNode() || properties.isNull()) {
throw new IllegalArgumentException(
"Dataset case_sensitivity info update fail, " + "Json missing necessary fields: " + root.toString());
"Dataset properties update fail, missing necessary fields: " + root.toString());
}
final JsonNode caseSensitivity = properties.path("caseSensitivity");
if (caseSensitivity.isMissingNode() || caseSensitivity.isNull()) {
throw new IllegalArgumentException(
"Dataset properties update fail, missing necessary fields: " + root.toString());
}
final Object[] idUrn = findIdAndUrn(idNode, urnNode);
final Object[] idUrn = findDataset(root);
if (idUrn[0] == null || idUrn[1] == null) {
throw new IllegalArgumentException("Cannot identify dataset from id/uri/urn: " + root.toString());
}
final Integer datasetId = (Integer) idUrn[0];
final String urn = (String) idUrn[1];
@ -506,16 +552,16 @@ public class DatasetInfoDao {
public static void updateDatasetReference(JsonNode root)
throws Exception {
final JsonNode idNode = root.path("datasetId");
final JsonNode urnNode = root.path("urn");
final JsonNode references = root.path("references");
if ((idNode.isMissingNode() && urnNode.isMissingNode()) || references.isMissingNode() || !references.isArray()) {
if (references.isMissingNode() || !references.isArray()) {
throw new IllegalArgumentException(
"Dataset reference info update fail, " + "Json missing necessary fields: " + root.toString());
"Dataset reference info update fail, missing necessary fields: " + root.toString());
}
final Object[] idUrn = findIdAndUrn(idNode, urnNode);
final Object[] idUrn = findDataset(root);
if (idUrn[0] == null || idUrn[1] == null) {
throw new IllegalArgumentException("Cannot identify dataset from id/uri/urn: " + root.toString());
}
final Integer datasetId = (Integer) idUrn[0];
final String urn = (String) idUrn[1];
@ -560,16 +606,16 @@ public class DatasetInfoDao {
public static void updateDatasetPartition(JsonNode root)
throws Exception {
final JsonNode idNode = root.path("datasetId");
final JsonNode urnNode = root.path("urn");
final JsonNode partition = root.path("partitionSpec");
if ((idNode.isMissingNode() && urnNode.isMissingNode()) || partition.isMissingNode()) {
if (partition.isMissingNode() || partition.isNull()) {
throw new IllegalArgumentException(
"Dataset reference info update fail, " + "Json missing necessary fields: " + root.toString());
"Dataset reference info update fail, missing necessary fields: " + root.toString());
}
final Object[] idUrn = findIdAndUrn(idNode, urnNode);
final Object[] idUrn = findDataset(root);
if (idUrn[0] == null || idUrn[1] == null) {
throw new IllegalArgumentException("Cannot identify dataset from id/uri/urn: " + root.toString());
}
final Integer datasetId = (Integer) idUrn[0];
final String urn = (String) idUrn[1];
@ -629,16 +675,16 @@ public class DatasetInfoDao {
public static void updateDatasetSecurity(JsonNode root)
throws Exception {
final JsonNode urnNode = root.path("urn");
final JsonNode idNode = root.path("datasetId");
final JsonNode security = root.path("securitySpec");
if ((idNode.isMissingNode() && urnNode.isMissingNode()) || security.isMissingNode()) {
if (security.isMissingNode() || security.isNull()) {
throw new IllegalArgumentException(
"Dataset security info update fail, " + "Json missing necessary fields: " + root.toString());
"Dataset security info update fail, missing necessary fields: " + root.toString());
}
final Object[] idUrn = findIdAndUrn(idNode, urnNode);
final Object[] idUrn = findDataset(root);
if (idUrn[0] == null || idUrn[1] == null) {
throw new IllegalArgumentException("Cannot identify dataset from id/uri/urn: " + root.toString());
}
final Integer datasetId = (Integer) idUrn[0];
final String urn = (String) idUrn[1];
@ -695,29 +741,27 @@ public class DatasetInfoDao {
public static void updateDatasetOwner(JsonNode root)
throws Exception {
final JsonNode idNode = root.path("datasetId");
final JsonNode urnNode = root.path("urn");
final JsonNode owners = root.path("owners");
if (owners.isMissingNode() || !owners.isArray()) {
throw new IllegalArgumentException(
"Dataset owner info update fail, missing necessary fields: " + root.toString());
}
final JsonNode ownerSourceNode = root.path("source");
String ownerSource = null;
if (ownerSourceNode != null && (!ownerSourceNode.isMissingNode()))
{
if (!ownerSourceNode.isNull() && !ownerSourceNode.isMissingNode()) {
ownerSource = ownerSourceNode.asText();
}
if ((idNode.isMissingNode() && urnNode.isMissingNode()) || owners.isMissingNode() || !owners.isArray()) {
throw new IllegalArgumentException(
"Dataset owner info update fail, " + "Json missing necessary fields: " + root.toString());
}
Integer datasetId = 0;
String urn = null;
try {
final Object[] idUrn = findIdAndUrn(idNode, urnNode);
final Integer datasetId;
final String urn;
final Object[] idUrn = findDataset(root);
if (idUrn[0] == null || idUrn[1] == null) {
datasetId = 0;
urn = root.path("datasetProperties").path("uri").asText();
} else {
datasetId = (Integer) idUrn[0];
urn = (String) idUrn[1];
} catch (Exception ex) {
}
final JsonNode auditHeader = root.path("auditHeader");
@ -734,15 +778,15 @@ public class DatasetInfoDao {
record.setCreatedTime(eventTime);
record.setModifiedTime(System.currentTimeMillis() / 1000);
final String ownerString = record.getOwner();
final String ownerString = record.getOwnerUrn();
int lastIndex = ownerString.lastIndexOf(':');
if (lastIndex >= 0) {
record.setOwner(ownerString.substring(lastIndex + 1));
record.setOwnerUrn(ownerString.substring(lastIndex + 1));
record.setNamespace(ownerString.substring(0, lastIndex));
} else {
record.setNamespace("");
}
Map<String, Object> ownerInfo = getOwnerByOwnerId(record.getOwner());
Map<String, Object> ownerInfo = getOwnerByOwnerId(record.getOwnerUrn());
Integer appId = 0;
String isActive = "N";
if (ownerInfo.containsKey("app_id")) {
@ -766,78 +810,56 @@ public class DatasetInfoDao {
mergeDatasetOwners(ownerList, datasetId, urn, ownerSource);
}
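The owner strings handled above are colon-delimited URNs: the split on the last colon yields the namespace and the bare owner id, and a urn:li:griduser namespace marks a group owner. A tiny standalone illustration of that split follows, with a made-up owner id.

package models.daos;

/**
 * Hypothetical illustration of the owner URN split used above; not part of this commit.
 */
public class OwnerUrnSplitExample {

  // returns {namespace, ownerId}, mirroring the substring logic in updateDatasetOwner
  public static String[] splitOwnerUrn(String ownerString) {
    int lastIndex = ownerString.lastIndexOf(':');
    if (lastIndex >= 0) {
      return new String[]{ownerString.substring(0, lastIndex), ownerString.substring(lastIndex + 1)};
    }
    return new String[]{"", ownerString};
  }

  public static void main(String[] args) {
    String[] parts = splitOwnerUrn("urn:li:griduser:data-team");
    // prints: namespace=urn:li:griduser, owner=data-team (a griduser namespace is treated as a GROUP owner)
    System.out.println("namespace=" + parts[0] + ", owner=" + parts[1]);
  }
}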
public static void updateKafkaDatasetOwner(
String datasetUrn,
String owners,
String ownerSource,
Long sourceUnixTime)
throws Exception
{
if (datasetUrn == null)
{
public static void updateKafkaDatasetOwner(String datasetUrn, String owners, String ownerSource, Long sourceUnixTime)
throws Exception {
if (datasetUrn == null) {
return;
}
Integer datasetId = 0;
try
{
try {
datasetId = Integer.parseInt(DatasetDao.getDatasetByUrn(datasetUrn).get("id").toString());
}
catch(Exception e)
{
} catch (Exception e) {
Logger.error("Exception in updateKafkaDatasetOwner: " + e.getMessage());
}
if (datasetId == 0)
{
if (datasetId == 0) {
return;
}
List<DatasetOwnerRecord> ownerList = new ArrayList<DatasetOwnerRecord>();
if (owners != null)
{
List<DatasetOwnerRecord> ownerList = new ArrayList<>();
if (owners != null) {
String[] ownerArray = owners.split(",");
if (ownerArray != null && ownerArray.length > 0)
{
for(int i = 0; i < ownerArray.length; i++)
{
String ownerName = null;
String namespace = null;
String ownerIdType = null;
String isGroup = "N";
String owner = ownerArray[i];
if (owner != null)
{
int lastIndex = owner.lastIndexOf(':');
if (lastIndex != -1)
{
ownerName = owner.substring(lastIndex+1);
namespace = owner.substring(0, lastIndex);
if (namespace != null && namespace.equalsIgnoreCase("urn:li:griduser"))
{
isGroup = "Y";
ownerIdType = "GROUP";
}
else
{
ownerIdType = "PERSON";
}
for (String owner : ownerArray) {
String ownerName = null;
String namespace = null;
String ownerIdType = null;
String isGroup = "N";
if (owner != null) {
int lastIndex = owner.lastIndexOf(':');
if (lastIndex != -1) {
ownerName = owner.substring(lastIndex + 1);
namespace = owner.substring(0, lastIndex);
if (namespace != null && namespace.equalsIgnoreCase("urn:li:griduser")) {
isGroup = "Y";
ownerIdType = "GROUP";
} else {
ownerIdType = "PERSON";
}
DatasetOwnerRecord record = new DatasetOwnerRecord();
record.setDatasetId(datasetId);
record.setDatasetUrn(datasetUrn);
record.setOwnerType("Producer");
record.setOwner(ownerName);
record.setOwnerType(ownerIdType);
record.setIsGroup(isGroup);
record.setIsActive("Y");
record.setNamespace(namespace);
record.setOwnerSource(ownerSource);
record.setSourceTime(sourceUnixTime);
record.setCreatedTime(sourceUnixTime);
record.setModifiedTime(System.currentTimeMillis() / 1000);
ownerList.add(record);
}
DatasetOwnerRecord record = new DatasetOwnerRecord();
record.setDatasetId(datasetId);
record.setDatasetUrn(datasetUrn);
record.setOwnerType("Producer");
record.setOwnerUrn(ownerName);
record.setOwnerType(ownerIdType);
record.setIsGroup(isGroup);
record.setIsActive("Y");
record.setNamespace(namespace);
record.setOwnerSource(ownerSource);
record.setSourceTime(sourceUnixTime);
record.setCreatedTime(sourceUnixTime);
record.setModifiedTime(System.currentTimeMillis() / 1000);
ownerList.add(record);
}
}
}
@ -845,54 +867,46 @@ public class DatasetInfoDao {
mergeDatasetOwners(ownerList, datasetId, datasetUrn, ownerSource);
}
public static void mergeDatasetOwners(
List<DatasetOwnerRecord> newOwnerList,
Integer datasetId,
String datasetUrn,
String source)
throws Exception{
List<DatasetOwnerRecord> oldOwnerList = getDatasetOwnerByDatasetUrn(datasetUrn);
Integer sortId = 0;
Map<String, DatasetOwnerRecord> uniqueRecords = new HashMap<String, DatasetOwnerRecord>();
List<DatasetOwnerRecord> combinedList = new ArrayList<DatasetOwnerRecord>();
if (newOwnerList != null)
{
for(DatasetOwnerRecord owner: newOwnerList)
{
owner.setSortId(sortId++);
uniqueRecords.put(owner.getOwner(), owner);
combinedList.add(owner);
}
private static void mergeDatasetOwners(List<DatasetOwnerRecord> newOwnerList, Integer datasetId, String datasetUrn,
String source)
throws Exception {
List<DatasetOwnerRecord> oldOwnerList = new ArrayList<>();
try {
oldOwnerList.addAll(getDatasetOwnerByDatasetUrn(datasetUrn));
} catch (Exception ex) {
}
if (oldOwnerList != null)
{
for(DatasetOwnerRecord owner: newOwnerList)
{
DatasetOwnerRecord exist = uniqueRecords.get(owner.getOwner());
if (exist != null)
{
exist.setDbIds(owner.getDbIds());
exist.setCreatedTime(StringUtil.toLong(owner.getCreatedTime()));
Integer sortId = 0;
Map<String, DatasetOwnerRecord> uniqueRecords = new HashMap<>();
List<DatasetOwnerRecord> combinedList = new ArrayList<>();
if (newOwnerList != null) {
for (DatasetOwnerRecord owner : newOwnerList) {
owner.setSortId(sortId++);
uniqueRecords.put(owner.getOwnerUrn(), owner);
combinedList.add(owner);
}
}
// take the higher priority owner category
exist.setOwnerCategory(OwnerType.chooseOwnerType(exist.getOwnerCategory(), owner.getOwnerCategory()));
for (DatasetOwnerRecord owner : oldOwnerList) {
DatasetOwnerRecord exist = uniqueRecords.get(owner.getOwnerUrn());
if (exist != null) {
exist.setDbIds(owner.getDbIds());
exist.setCreatedTime(StringUtil.toLong(owner.getCreatedTime()));
// merge owner source as comma separated list
exist.setOwnerSource(mergeOwnerSource(exist.getOwnerSource(), owner.getOwnerSource()));
exist.setConfirmedBy(owner.getConfirmedBy());
exist.setConfirmedOn(owner.getConfirmedOn());
}
else
{
if(!(source != null && source.equalsIgnoreCase(owner.getOwnerSource())))
{
owner.setSortId(sortId++);
uniqueRecords.put(owner.getOwner(), owner);
combinedList.add(owner);
}
}
// take the higher priority owner category
exist.setOwnerCategory(OwnerType.chooseOwnerType(exist.getOwnerCategory(), owner.getOwnerCategory()));
// merge owner source as comma separated list
exist.setOwnerSource(mergeOwnerSource(exist.getOwnerSource(), owner.getOwnerSource()));
exist.setConfirmedBy(owner.getConfirmedBy());
exist.setConfirmedOn(owner.getConfirmedOn());
} else {
if (!(source != null && source.equalsIgnoreCase(owner.getOwnerSource()))) {
owner.setSortId(sortId++);
uniqueRecords.put(owner.getOwnerUrn(), owner);
combinedList.add(owner);
}
}
}
// remove old info then insert new info
@ -979,17 +993,16 @@ public class DatasetInfoDao {
public static void updateDatasetConstraint(JsonNode root)
throws Exception {
final JsonNode idNode = root.path("datasetId");
final JsonNode urnNode = root.path("urn");
final JsonNode constraints = root.path("constraints");
if ((idNode.isMissingNode() && urnNode.isMissingNode()) || constraints.isMissingNode()
|| !constraints.isArray()) {
if (constraints.isMissingNode() || !constraints.isArray()) {
throw new IllegalArgumentException(
"Dataset constraints info update fail, " + "Json missing necessary fields: " + root.toString());
"Dataset constraints info update fail, missing necessary fields: " + root.toString());
}
final Object[] idUrn = findIdAndUrn(idNode, urnNode);
final Object[] idUrn = findDataset(root);
if (idUrn[0] == null || idUrn[1] == null) {
throw new IllegalArgumentException("Cannot identify dataset from id/uri/urn: " + root.toString());
}
final Integer datasetId = (Integer) idUrn[0];
final String urn = (String) idUrn[1];
@ -1042,16 +1055,16 @@ public class DatasetInfoDao {
public static void updateDatasetIndex(JsonNode root)
throws Exception {
final JsonNode idNode = root.path("datasetId");
final JsonNode urnNode = root.path("urn");
final JsonNode indices = root.path("indices");
if ((idNode.isMissingNode() && urnNode.isMissingNode()) || indices.isMissingNode() || !indices.isArray()) {
if (indices.isMissingNode() || !indices.isArray()) {
throw new IllegalArgumentException(
"Dataset indices info update fail, " + "Json missing necessary fields: " + root.toString());
"Dataset indices info update fail, missing necessary fields: " + root.toString());
}
final Object[] idUrn = findIdAndUrn(idNode, urnNode);
final Object[] idUrn = findDataset(root);
if (idUrn[0] == null || idUrn[1] == null) {
throw new IllegalArgumentException("Cannot identify dataset from id/uri/urn: " + root.toString());
}
final Integer datasetId = (Integer) idUrn[0];
final String urn = (String) idUrn[1];
@ -1104,28 +1117,25 @@ public class DatasetInfoDao {
public static void updateDatasetSchema(JsonNode root)
throws Exception {
final JsonNode idNode = root.path("datasetId");
final JsonNode urnNode = root.path("urn");
final JsonNode schemas = root.path("schemas");
if ((idNode.isMissingNode() && urnNode.isMissingNode()) || schemas.isMissingNode()) {
final JsonNode schema = root.path("schema");
if (schema.isMissingNode() || schema.isNull()) {
throw new IllegalArgumentException(
"Dataset schemas update fail, " + "Json missing necessary fields: " + root.toString());
"Dataset schema update fail, missing necessary fields: " + root.toString());
}
Integer datasetId = 0;
String urn = null;
try {
final Object[] idUrn = findIdAndUrn(idNode, urnNode);
final Object[] idUrn = findDataset(root);
if (idUrn[0] != null && idUrn[1] != null) {
datasetId = (Integer) idUrn[0];
urn = (String) idUrn[1];
} catch (Exception ex) {
urn = urnNode.asText();
} else {
urn = root.path("datasetProperties").path("uri").asText();
}
ObjectMapper om = new ObjectMapper();
DatasetSchemaInfoRecord rec = om.convertValue(schemas, DatasetSchemaInfoRecord.class);
DatasetSchemaInfoRecord rec = om.convertValue(schema, DatasetSchemaInfoRecord.class);
rec.setDatasetId(datasetId);
rec.setDatasetUrn(urn);
rec.setModifiedTime(System.currentTimeMillis() / 1000);
@ -1135,8 +1145,8 @@ public class DatasetInfoDao {
DatasetRecord record = new DatasetRecord();
record.setUrn(urn);
record.setSourceCreatedTime("" + rec.getCreateTime() / 1000);
record.setSchema(rec.getOriginalSchema());
record.setSchemaType(rec.getFormat());
record.setSchema(rec.getOriginalSchema().getText());
record.setSchemaType(rec.getOriginalSchema().getFormat());
record.setFields((String) StringUtil.objectToJsonString(rec.getFieldSchema()));
record.setSource("API");
@ -1154,8 +1164,8 @@ public class DatasetInfoDao {
// if dataset already exist in dict_dataset, update info
else {
DICT_DATASET_WRITER.execute(UPDATE_DICT_DATASET_WITH_SCHEMA_CHANGE,
new Object[]{rec.getOriginalSchema(), rec.getFormat(), StringUtil.objectToJsonString(rec.getFieldSchema()),
"API", System.currentTimeMillis() / 1000, datasetId});
new Object[]{rec.getOriginalSchema().getText(), rec.getOriginalSchema().getFormat(),
StringUtil.objectToJsonString(rec.getFieldSchema()), "API", System.currentTimeMillis() / 1000, datasetId});
}
// get old dataset fields info

View File

@ -20,7 +20,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import models.utils.Urn;
import utils.Urn;
import utils.JdbcUtil;
import wherehows.common.schemas.LineageRecord;
import wherehows.common.utils.PartitionPatternMatcher;

View File

@ -11,7 +11,7 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package utils;
package models.kafka;
import java.util.Map;
import java.util.regex.Matcher;
@ -23,15 +23,7 @@ import wherehows.common.schemas.Record;
import wherehows.common.utils.StringUtil;
public class GobblinTrackingAuditProcessor{
/**
* Process a Gobblin tracking event audit record
* @param record
* @param topic
* @throws Exception
*/
public class GobblinTrackingAuditProcessor extends KafkaConsumerProcessor {
final private static String DALI_LIMITED_RETENTION_AUDITOR = "DaliLimitedRetentionAuditor";
final private static String DALI_AUTOPURGED_AUDITOR = "DaliAutoPurgeAuditor";
@ -39,27 +31,31 @@ public class GobblinTrackingAuditProcessor{
final private static String DATASET_URN_PREFIX = "hdfs://";
final private static String DATASET_OWNER_SOURCE = "IDPC";
public Record process(GenericData.Record record, String topic) throws Exception {
/**
* Process a Gobblin tracking event audit record
* @param record
* @param topic
* @return null
* @throws Exception
*/
public Record process(GenericData.Record record, String topic)
throws Exception {
if (record != null) {
String name = (String) record.get("name");
if (record != null && record.get("name") != null) {
final String name = record.get("name").toString();
// only handle "DaliLimitedRetentionAuditor","DaliAutoPurgeAuditor" and "DsIgnoreIDPCAuditor"
if (name.equals(DALI_LIMITED_RETENTION_AUDITOR) ||
name.equals(DALI_AUTOPURGED_AUDITOR) ||
name.equals(DS_IGNORE_IDPC_AUDITOR))
{
if (name.equals(DALI_LIMITED_RETENTION_AUDITOR)
|| name.equals(DALI_AUTOPURGED_AUDITOR)
|| name.equals(DS_IGNORE_IDPC_AUDITOR)) {
Long timestamp = (Long) record.get("timestamp");
Map<String, String> metadata = StringUtil.convertObjectMapToStringMap(record.get("metadata"));
String hasError = metadata.get("HasError");
if (!hasError.equalsIgnoreCase("true"))
{
String datasetUrn = metadata.get("DatasetPath");
if (!hasError.equalsIgnoreCase("true")) {
String datasetPath = metadata.get("DatasetPath");
String datasetUrn = DATASET_URN_PREFIX + (datasetPath.startsWith("/") ? "" : "/") + datasetPath;
String ownerUrns = metadata.get("OwnerURNs");
DatasetInfoDao.updateKafkaDatasetOwner(
DATASET_URN_PREFIX + datasetUrn,ownerUrns,
DATASET_OWNER_SOURCE,
timestamp);
DatasetInfoDao.updateKafkaDatasetOwner(datasetUrn, ownerUrns, DATASET_OWNER_SOURCE, timestamp);
}
}
}

View File

@ -11,14 +11,12 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package metadata.etl.kafka;
package models.kafka;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.avro.generic.GenericData;
import wherehows.common.schemas.ClusterInfo;
import wherehows.common.schemas.GobblinTrackingCompactionRecord;
import wherehows.common.schemas.Record;
import wherehows.common.utils.ClusterUtil;
@ -39,6 +37,7 @@ public class GobblinTrackingCompactionProcessor extends KafkaConsumerProcessor {
* Process a Gobblin tracking event compaction record
* @param record
* @param topic
* @return Record
* @throws Exception
*/
@Override
@ -78,7 +77,7 @@ public class GobblinTrackingCompactionProcessor extends KafkaConsumerProcessor {
if (name.equals("CompactionCompleted")) {
dataset = metadata.get("datasetUrn");
partitionName = metadata.get("partition");
recordCount = parseLong(metadata.get("recordCount"));
recordCount = StringUtil.parseLong(metadata.get("recordCount"));
}
// name = "CompactionRecordCounts"
else {
@ -89,8 +88,8 @@ public class GobblinTrackingCompactionProcessor extends KafkaConsumerProcessor {
partitionName = m.group(3);
}
recordCount = parseLong(metadata.get("RegularRecordCount"));
lateRecordCount = parseLong(metadata.get("LateRecordCount"));
recordCount = StringUtil.parseLong(metadata.get("RegularRecordCount"));
lateRecordCount = StringUtil.parseLong(metadata.get("LateRecordCount"));
}
eventRecord =

View File

@ -11,7 +11,7 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package metadata.etl.kafka;
package models.kafka;
import java.util.Map;
import java.util.regex.Matcher;
@ -35,8 +35,9 @@ public class GobblinTrackingDistcpNgProcessor extends KafkaConsumerProcessor {
/**
* Process a Gobblin tracking event distcp_ng event record
* @param record
* @param record GenericData.Record
* @param topic
* @return Record
* @throws Exception
*/
@Override
@ -59,12 +60,12 @@ public class GobblinTrackingDistcpNgProcessor extends KafkaConsumerProcessor {
final String projectName = metadata.get("azkabanProjectName");
final String flowId = metadata.get("azkabanFlowId");
final String jobId = metadata.get("azkabanJobId");
final int execId = parseInteger(metadata.get("azkabanExecId"));
final int execId = StringUtil.parseInteger(metadata.get("azkabanExecId"));
// final String metricContextId = metadata.get("metricContextID");
// final String metricContextName = metadata.get("metricContextName");
final long upstreamTimestamp = parseLong(metadata.get("upstreamTimestamp"));
final long originTimestamp = parseLong(metadata.get("originTimestamp"));
final long upstreamTimestamp = StringUtil.parseLong(metadata.get("upstreamTimestamp"));
final long originTimestamp = StringUtil.parseLong(metadata.get("originTimestamp"));
final String sourcePath = metadata.get("SourcePath");
final String targetPath = metadata.get("TargetPath");

View File

@ -11,7 +11,7 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package metadata.etl.kafka;
package models.kafka;
import java.util.Map;
import java.util.regex.Matcher;
@ -45,6 +45,7 @@ public class GobblinTrackingLumosProcessor extends KafkaConsumerProcessor {
* Process a Gobblin tracking event lumos record
* @param record
* @param topic
* @return Record
* @throws Exception
*/
@Override
@ -86,7 +87,7 @@ public class GobblinTrackingLumosProcessor extends KafkaConsumerProcessor {
datacenter = datasourceColo;
}
final long recordCount = parseLong(metadata.get("recordCount"));
final long recordCount = StringUtil.parseLong(metadata.get("recordCount"));
final String partitionType = "snapshot";
final String partition = metadata.get("partition");
@ -94,18 +95,18 @@ public class GobblinTrackingLumosProcessor extends KafkaConsumerProcessor {
String subpartitionType = null;
String subpartitionName = null;
final long dropdate = parseLong(metadata.get("Dropdate"));
final long dropdate = StringUtil.parseLong(metadata.get("Dropdate"));
long maxDataDateEpoch3 = dropdate;
long maxDataKey = 0; // if field is null, default value 0
if (!isPartitionRegular(partition)) {
maxDataKey = parseLong(getPartitionEpoch(partition));
maxDataKey = StringUtil.parseLong(getPartitionEpoch(partition));
}
// handle name 'SnapshotPublished'
if (name.equals("SnapshotPublished")) {
partitionName = partition;
if (dropdate < 1460000000000L) {
maxDataDateEpoch3 = parseLong(getPartitionEpoch(targetDirectory));
maxDataDateEpoch3 = StringUtil.parseLong(getPartitionEpoch(targetDirectory));
}
}
// handle name 'DeltaPublished'
@ -114,7 +115,7 @@ public class GobblinTrackingLumosProcessor extends KafkaConsumerProcessor {
subpartitionType = "_delta";
subpartitionName = partition;
if (dropdate < 1460000000000L) {
maxDataDateEpoch3 = parseLong(getPartitionEpoch(subpartitionName));
maxDataDateEpoch3 = StringUtil.parseLong(getPartitionEpoch(subpartitionName));
}
}

View File

@ -11,7 +11,7 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package utils;
package models.kafka;
import java.lang.reflect.Method;
import java.util.HashMap;
@ -22,6 +22,7 @@ import metadata.etl.models.EtlJobName;
import models.daos.EtlJobPropertyDao;
import org.apache.avro.generic.GenericData;
import play.Logger;
import utils.JdbcUtil;
import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient;
import wherehows.common.writers.DatabaseWriter;

View File

@ -11,10 +11,8 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package metadata.etl.kafka;
package models.kafka;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.generic.GenericData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -32,36 +30,11 @@ public abstract class KafkaConsumerProcessor {
/**
* Abstract method 'process' to be implemented by specific processor
* input Kafka record, process information and write to DB.
* @param record
* @param record GenericData.Record
* @param topic
* @return wherehows.common.schemas.Record
* @throws Exception
*/
public abstract Record process(GenericData.Record record, String topic) throws Exception;
/**
* Parse Long value from a String, if null or exception, return 0
* @param text String
* @return long
*/
protected long parseLong(String text) {
try {
return Long.parseLong(text);
} catch (NumberFormatException e) {
return 0;
}
}
/**
* Parse Integer value from a String, if null or exception, return 0
* @param text String
* @return int
*/
protected int parseInteger(String text) {
try {
return Integer.parseInt(text);
} catch (NumberFormatException e) {
return 0;
}
}
}
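To make the contract concrete, here is a minimal sketch of a processor built on this base class, using the StringUtil helpers that replaced the removed parseLong/parseInteger methods. The class itself and its field names are hypothetical; only the signature and the StringUtil calls mirror the processors in this commit.

package models.kafka;

import java.util.Map;
import org.apache.avro.generic.GenericData;
import wherehows.common.schemas.Record;
import wherehows.common.utils.StringUtil;

/**
 * Hypothetical example processor; not part of this commit.
 */
public class ExampleTrackingProcessor extends KafkaConsumerProcessor {

  @Override
  public Record process(GenericData.Record record, String topic)
      throws Exception {
    if (record == null) {
      return null; // returning null skips the database write in KafkaConsumerWorker
    }
    // convert the Avro metadata map into a String map, as the Gobblin processors do
    Map<String, String> metadata = StringUtil.convertObjectMapToStringMap(record.get("metadata"));
    // numeric parsing helpers now live in wherehows.common.utils.StringUtil
    long recordCount = StringUtil.parseLong(metadata.get("recordCount"));
    // ... build and return a wherehows.common.schemas.Record subclass to have it persisted
    return null;
  }
}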

View File

@ -11,13 +11,10 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package utils;
package models.kafka;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import models.daos.DatasetInfoDao;
import org.apache.avro.generic.GenericData;
import play.Logger;
@ -26,6 +23,10 @@ import wherehows.common.schemas.Record;
public class MetadataChangeProcessor {
private final String[] CHANGE_ITEMS =
{"schema", "owners", "datasetProperties", "references", "partitionSpec", "deploymentInfo", "tags",
"constraints", "indices", "capacity", "securitySpec"};
/**
* Process a MetadataChangeEvent record
* @param record GenericData.Record
@ -36,97 +37,34 @@ public class MetadataChangeProcessor {
public Record process(GenericData.Record record, String topic)
throws Exception {
if (record != null) {
// Logger.info("Processing Metadata Change Event record. ");
Logger.debug("Processing Metadata Change Event record. ");
final GenericData.Record auditHeader = (GenericData.Record) record.get("auditHeader");
final GenericData.Array<GenericData.Record> changedItems =
(GenericData.Array<GenericData.Record>) record.get("changedItems");
if (auditHeader == null || auditHeader.get("time") == null) {
Logger.info("MetadataChangeEvent without auditHeader, abort process. " + record.toString());
return null;
}
// list change items, schema first
List<String> changes = new ArrayList<>();
for (GenericData.Record item : changedItems) {
switch (item.get("changeScope").toString().toLowerCase()) { // can't be null
case "*":
Collections.addAll(changes, "schemas", "owners", "references", "partitionSpec", "deploymentInfo",
"caseSensitivity", "tags", "constraints", "indices", "capacity", "securitySpec");
break;
case "schema":
if (!changes.contains("schemas")) {
changes.add(0, "schemas"); // add to front
}
break;
case "owner":
if (!changes.contains("owners")) {
changes.add("owners");
}
break;
case "reference":
if (!changes.contains("references")) {
changes.add("references");
}
break;
case "partition":
if (!changes.contains("partitionSpec")) {
changes.add("partitionSpec");
}
break;
case "deployment":
if (!changes.contains("deploymentInfo")) {
changes.add("deploymentInfo");
}
break;
case "casesensitivity":
if (!changes.contains("caseSensitivity")) {
changes.add("caseSensitivity");
}
break;
case "tag":
if (!changes.contains("tags")) {
changes.add("tags");
}
break;
case "constraint":
if (!changes.contains("constraints")) {
changes.add("constraints");
}
break;
case "index":
if (!changes.contains("indices")) {
changes.add("indices");
}
break;
case "capacity":
if (!changes.contains("capacity")) {
changes.add("capacity");
}
break;
case "security":
if (!changes.contains("securitySpec")) {
changes.add("securitySpec");
}
break;
default:
break;
}
final GenericData.Record datasetIdentifier = (GenericData.Record) record.get("datasetIdentifier");
final GenericData.Record datasetProperties = (GenericData.Record) record.get("datasetProperties");
final String urn = String.valueOf(record.get("urn"));
if (urn == null && (datasetProperties == null || datasetProperties.get("uri") == null)
&& datasetIdentifier == null) {
Logger.info("Can't identify dataset from uri/urn/datasetIdentifier, abort process. " + record.toString());
return null;
}
Logger.debug("Updating dataset " + changedItems.toString());
ObjectMapper mapper = new ObjectMapper();
JsonNode rootNode = mapper.readTree(record.toString());
final JsonNode rootNode = new ObjectMapper().readTree(record.toString());
for (String itemName : changes) {
for (String itemName : CHANGE_ITEMS) {
// check if the corresponding change field has content
if (record.get(itemName) == null) {
continue;
}
switch (itemName) {
case "schemas":
case "schema":
try {
DatasetInfoDao.updateDatasetSchema(rootNode);
} catch (Exception ex) {
@ -140,6 +78,13 @@ public class MetadataChangeProcessor {
Logger.debug("Metadata change exception: owner ", ex);
}
break;
case "datasetProperties":
try {
DatasetInfoDao.updateDatasetCaseSensitivity(rootNode);
} catch (Exception ex) {
Logger.debug("Metadata change exception: case sensitivity ", ex);
}
break;
case "references":
try {
DatasetInfoDao.updateDatasetReference(rootNode);
@ -161,13 +106,6 @@ public class MetadataChangeProcessor {
Logger.debug("Metadata change exception: deployment ", ex);
}
break;
case "caseSensitivity":
try {
DatasetInfoDao.updateDatasetCaseSensitivity(rootNode);
} catch (Exception ex) {
Logger.debug("Metadata change exception: case sensitivity ", ex);
}
break;
case "tags":
try {
DatasetInfoDao.updateDatasetTags(rootNode);

View File

@ -11,7 +11,7 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package utils;
package models.kafka;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@ -20,7 +20,7 @@ import org.apache.avro.generic.GenericData;
import wherehows.common.schemas.Record;
public class MetadataInventoryProcessor {
public class MetadataInventoryProcessor extends KafkaConsumerProcessor {
/**
* Process a MetadataInventoryEvent record

View File

@ -11,12 +11,13 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package metadata.etl.kafka;
package models.kafka;
import org.apache.avro.generic.GenericData;
import wherehows.common.schemas.MetastoreAuditRecord;
import wherehows.common.schemas.Record;
import wherehows.common.utils.ClusterUtil;
import wherehows.common.utils.StringUtil;
public class MetastoreAuditProcessor extends KafkaConsumerProcessor {
@ -25,6 +26,7 @@ public class MetastoreAuditProcessor extends KafkaConsumerProcessor {
* Process a Metastore Table/Partition Audit event record
* @param record
* @param topic
* @return Record
* @throws Exception
*/
@Override
@ -65,7 +67,7 @@ public class MetastoreAuditProcessor extends KafkaConsumerProcessor {
final String eventType = String.valueOf(content.get("eventType"));
final String metastoreThriftUri = String.valueOf(content.get("metastoreThriftUri"));
final String metastoreVersion = String.valueOf(content.get("metastoreVersion"));
final String metastoreVersion = StringUtil.toStringReplaceNull(content.get("metastoreVersion"), null);
final long timestamp = (long) content.get("timestamp");
final String isSuccessful = String.valueOf(content.get("isSuccessful"));
final String isDataDeleted = String.valueOf(content.get("isDataDeleted"));
@ -74,19 +76,18 @@ public class MetastoreAuditProcessor extends KafkaConsumerProcessor {
final GenericData.Record rec = newInfo != null ? (GenericData.Record) newInfo : (GenericData.Record) oldInfo;
final String dbName = String.valueOf(rec.get("dbName"));
final String tableName = String.valueOf(rec.get("tableName"));
final String partition = String.valueOf(rec.get("values"));
final String location = String.valueOf(rec.get("location"));
final String owner = String.valueOf(rec.get("owner"));
// set null / "null" partition to '?' for primary key
final String partition = StringUtil.toStringReplaceNull(rec.get("values"), "?");
final String location = StringUtil.toStringReplaceNull(rec.get("location"), null);
final String owner = StringUtil.toStringReplaceNull(rec.get("owner"), null);
final long createTime = (long) rec.get("createTime");
final long lastAccessTime = (long) rec.get("lastAccessTime");
eventRecord = new MetastoreAuditRecord(server, instance, appName, eventName, eventType, timestamp);
eventRecord.setEventInfo(metastoreThriftUri, metastoreVersion, isSuccessful, isDataDeleted);
// set null partition to '?' for primary key
eventRecord.setTableInfo(dbName, tableName, (partition != null ? partition : "?"),
location, owner, createTime, lastAccessTime);
eventRecord.setOldInfo(String.valueOf(oldInfo));
eventRecord.setNewInfo(String.valueOf(newInfo));
eventRecord.setTableInfo(dbName, tableName, partition, location, owner, createTime, lastAccessTime);
// eventRecord.setOldInfo(StringUtil.toStringReplaceNull(oldInfo, null));
// eventRecord.setNewInfo(StringUtil.toStringReplaceNull(newInfo, null));
}
return eventRecord;
}

View File

@ -0,0 +1,82 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package msgs;
import java.util.HashMap;
import java.util.Map;
/**
* Generic communication message between KafkaConsumerMaster and KafkaConsumerWorker
*/
public class KafkaCommMsg {
// Message type: AUDIT, AUDIT_RESPONSE, FLUSH, FLUSH_RESPONSE, HEARTBEAT, etc
private String msgType;
// Kafka topic of the worker
private String topic;
// Kafka worker thread number
private int thread;
// Message content
private Map<String, Object> content;
public KafkaCommMsg(String msgType, String topic, int thread) {
this.msgType = msgType;
this.topic = topic;
this.thread = thread;
this.content = new HashMap<>();
}
public String getMsgType() {
return msgType;
}
public void setMsgType(String msgType) {
this.msgType = msgType;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public int getThread() {
return thread;
}
public void setThread(int thread) {
this.thread = thread;
}
public Map<String, Object> getContent() {
return content;
}
public void setContent(Map<String, Object> content) {
this.content = content;
}
public void putContent(String key, Object value) {
this.content.put(key, value);
}
@Override
public String toString() {
return "KafkaCommMsg [type=" + msgType + ", topic=" + topic + ", thread=" + thread + ", content="
+ content.toString() + "]";
}
}
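A short usage sketch for this message type: a worker can report progress back to the master, which currently just logs the message in onReceive. The FLUSH_RESPONSE type and the content key below are illustrative assumptions taken from the comment above, not code from this commit.

package msgs;

/**
 * Hypothetical usage sketch for KafkaCommMsg; not part of this commit.
 */
public class KafkaCommMsgExample {

  public static KafkaCommMsg buildFlushResponse(String topic, int threadId, int processedRecordCount) {
    // msgType values suggested above: AUDIT, AUDIT_RESPONSE, FLUSH, FLUSH_RESPONSE, HEARTBEAT
    KafkaCommMsg msg = new KafkaCommMsg("FLUSH_RESPONSE", topic, threadId);
    msg.putContent("processedRecordCount", processedRecordCount);
    return msg;
  }

  public static void main(String[] args) {
    // prints: KafkaCommMsg [type=FLUSH_RESPONSE, topic=ExampleTopic, thread=1, content={processedRecordCount=42}]
    System.out.println(buildFlushResponse("ExampleTopic", 1, 42));
  }
}

A worker could then hand such a message to its master with getSender().tell(msg, getSelf()), the same pattern the removed KafkaResponseMsg path used.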

View File

@ -1,52 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package msgs;
import wherehows.common.schemas.AbstractRecord;
/**
* Message wrapper for communication between KafkaConsumerMaster and KafkaConsumerWorker
*/
public class KafkaResponseMsg {
private AbstractRecord record;
private String topic;
public KafkaResponseMsg(AbstractRecord record, String topic) {
this.record = record;
this.topic = topic;
}
public AbstractRecord getRecord() {
return record;
}
public void setRecord(AbstractRecord record) {
this.record = record;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
@Override
public String toString() {
return "KafkaResponseMsg [record=" + record + ", topic=" + topic + "]";
}
}

View File

@ -11,7 +11,7 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package models.utils;
package utils;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
@ -19,7 +19,7 @@ import play.Logger;
/**
* Urn class used for urn convertion
* Urn class used for urn conversion
* Created by zsun on 1/15/15.
*/
public class Urn {

View File

@ -107,9 +107,9 @@ subprojects {
"play_java_jdbc" : "com.typesafe.play:play-java-jdbc_2.10:2.2.4",
"play_cache" : "com.typesafe.play:play-cache_2.10:2.2.4",
"kafka" : "org.apache.kafka:kafka_2.10:0.10.0.0",
"kafka_clients" : "org.apache.kafka:kafka-clients:0.10.0.0",
"confluent_common_cfg" : "io.confluent:common-config:3.0.0"
"kafka" : "org.apache.kafka:kafka_2.10:0.10.0.1",
"kafka_clients" : "org.apache.kafka:kafka-clients:0.10.0.1",
"confluent_common_cfg" : "io.confluent:common-config:3.0.1"
]
task buildWithWarning(type: JavaCompile, dependsOn: build) {

View File

@ -17,10 +17,11 @@ CREATE TABLE dataset_deployment (
`dataset_id` INT UNSIGNED NOT NULL,
`dataset_urn` VARCHAR(200) NOT NULL,
`deployment_tier` VARCHAR(20) NOT NULL,
`datacenter` VARCHAR(20) NOT NULL,
`datacenter` VARCHAR(20) DEFAULT NULL,
`region` VARCHAR(50) DEFAULT NULL,
`zone` VARCHAR(50) DEFAULT NULL,
`cluster` VARCHAR(100) DEFAULT NULL,
`container` VARCHAR(100) DEFAULT NULL,
`enabled` BOOLEAN NOT NULL,
`additional_info` TEXT CHAR SET utf8 DEFAULT NULL,
`modified_time` INT UNSIGNED DEFAULT NULL
@ -104,22 +105,27 @@ CREATE TABLE dataset_partition (
ENGINE = InnoDB
DEFAULT CHARSET = latin1;
CREATE TABLE dataset_security (
`dataset_id` INT UNSIGNED NOT NULL,
`dataset_urn` VARCHAR(200) NOT NULL,
`classification` VARCHAR(200) DEFAULT NULL,
`record_owner_type` VARCHAR(20) DEFAULT NULL,
`record_owner` VARCHAR(200) DEFAULT NULL,
`compliance_type` VARCHAR(30) DEFAULT NULL,
`retention_policy` VARCHAR(200) DEFAULT NULL,
`geographic_affinity` VARCHAR(200) DEFAULT NULL,
`modified_time` INT UNSIGNED DEFAULT NULL
CREATE TABLE `dataset_security_info` (
`dataset_id` INT(10) UNSIGNED NOT NULL,
`dataset_urn` VARCHAR(200) NOT NULL,
`classification` VARCHAR(500) DEFAULT NULL
COMMENT 'JSON: confidential fields',
`retention_policy` VARCHAR(200) DEFAULT NULL
COMMENT 'JSON: specification of retention',
`geographic_affinity` VARCHAR(200) DEFAULT NULL
COMMENT 'JSON: must be stored in the geo region',
`record_owner_type` VARCHAR(50) DEFAULT NULL
COMMENT 'MEMBER,CUSTOMER,INTERNAL,COMPANY,GROUP',
`compliance_purge_type` VARCHAR(30) DEFAULT NULL
COMMENT 'AUTO_PURGE,CUSTOM_PURGE,LIMITED_RETENTION,PURGE_NOT_APPLICABLE',
`compliance_purge_entities` VARCHAR(200) DEFAULT NULL,
`modified_time` INT(10) UNSIGNED DEFAULT NULL
COMMENT 'the modified time in epoch',
PRIMARY KEY (`dataset_id`),
UNIQUE KEY (`dataset_urn`)
UNIQUE KEY `dataset_urn` (`dataset_urn`)
)
ENGINE = InnoDB
DEFAULT CHARSET = latin1;
DEFAULT CHARSET = utf8;
CREATE TABLE dataset_constraint (
`dataset_id` INT UNSIGNED NOT NULL,
@ -163,11 +169,7 @@ CREATE TABLE dataset_schema_info (
`version` VARCHAR(20) DEFAULT NULL,
`name` VARCHAR(100) DEFAULT NULL,
`description` TEXT CHAR SET utf8 DEFAULT NULL,
`format` VARCHAR(20) DEFAULT NULL,
`original_schema` TEXT DEFAULT NULL,
`original_schema_checksum` VARCHAR(100) DEFAULT NULL,
`key_schema_type` VARCHAR(20) DEFAULT NULL,
`key_schema_format` VARCHAR(100) DEFAULT NULL,
`original_schema` MEDIUMTEXT CHAR SET utf8 DEFAULT NULL,
`key_schema` MEDIUMTEXT CHAR SET utf8 DEFAULT NULL,
`is_field_name_case_sensitive` BOOLEAN DEFAULT NULL,
`field_schema` MEDIUMTEXT CHAR SET utf8 DEFAULT NULL,

View File

@ -95,8 +95,8 @@ CREATE TABLE flow_job (
job_type_id SMALLINT COMMENT 'type id of the job',
job_type VARCHAR(63) COMMENT 'type of the job',
ref_flow_id INT UNSIGNED NULL COMMENT 'the reference flow id of the job if the job is a subflow',
pre_jobs VARCHAR(4096) COMMENT 'comma separated job ids that run before this job',
post_jobs VARCHAR(4096) COMMENT 'comma separated job ids that run after this job',
pre_jobs VARCHAR(20000) CHAR SET latin1 COMMENT 'comma separated job ids that run before this job',
post_jobs VARCHAR(20000) CHAR SET latin1 COMMENT 'comma separated job ids that run after this job',
is_current CHAR(1) COMMENT 'determine if it is a current job',
is_first CHAR(1) COMMENT 'determine if it is the first job',
is_last CHAR(1) COMMENT 'determine if it is the last job',
@ -126,8 +126,8 @@ CREATE TABLE stg_flow_job (
job_type VARCHAR(63) COMMENT 'type of the job',
ref_flow_id INT UNSIGNED NULL COMMENT 'the reference flow id of the job if the job is a subflow',
ref_flow_path VARCHAR(1024) COMMENT 'the reference flow path of the job if the job is a subflow',
pre_jobs VARCHAR(4096) COMMENT 'comma separated job ids that run before this job',
post_jobs VARCHAR(4096) COMMENT 'comma separated job ids that run after this job',
pre_jobs VARCHAR(20000) CHAR SET latin1 COMMENT 'comma separated job ids that run before this job',
post_jobs VARCHAR(20000) CHAR SET latin1 COMMENT 'comma separated job ids that run after this job',
is_current CHAR(1) COMMENT 'determine if it is a current job',
is_first CHAR(1) COMMENT 'determine if it is the first job',
is_last CHAR(1) COMMENT 'determine if it is the last job',
@ -366,6 +366,7 @@ CREATE TABLE flow_schedule (
COMMENT 'flow id',
unit VARCHAR(31) COMMENT 'unit of time',
frequency INT COMMENT 'frequency of the unit',
cron_expression VARCHAR(127) COMMENT 'cron expression',
is_active CHAR(1) COMMENT 'determine if it is an active schedule',
included_instances VARCHAR(127) COMMENT 'included instance',
excluded_instances VARCHAR(127) COMMENT 'excluded instance',
@ -389,6 +390,7 @@ CREATE TABLE stg_flow_schedule (
flow_path VARCHAR(1024) COMMENT 'flow path from top level',
unit VARCHAR(31) COMMENT 'unit of time',
frequency INT COMMENT 'frequency of the unit',
cron_expression VARCHAR(127) COMMENT 'cron expression',
included_instances VARCHAR(127) COMMENT 'included instance',
excluded_instances VARCHAR(127) COMMENT 'excluded instance',
effective_start_time INT UNSIGNED COMMENT 'effective start time of the flow execution',
@ -438,7 +440,7 @@ CREATE TABLE stg_flow_owner_permission (
DEFAULT CHARSET = utf8
COMMENT = 'Scheduler owner table' PARTITION BY HASH (app_id) PARTITIONS 8;
CREATE TABLE job_execution_ext_reference (
CREATE TABLE job_execution_ext_reference (
app_id smallint(5) UNSIGNED COMMENT 'application id of the flow' NOT NULL,
job_exec_id bigint(20) UNSIGNED COMMENT 'job execution id either inherit or generated' NOT NULL,
attempt_id smallint(6) COMMENT 'job execution attempt id' DEFAULT '0',
@ -463,11 +465,11 @@ PARTITION BY HASH(app_id)
PARTITION p7)
;
CREATE INDEX idx_job_execution_ext_ref__ext_ref_id USING BTREE
CREATE INDEX idx_job_execution_ext_ref__ext_ref_id USING BTREE
ON job_execution_ext_reference(ext_ref_id);
CREATE TABLE stg_job_execution_ext_reference (
CREATE TABLE stg_job_execution_ext_reference (
app_id smallint(5) UNSIGNED COMMENT 'application id of the flow' NOT NULL,
job_exec_id bigint(20) UNSIGNED COMMENT 'job execution id either inherit or generated' NOT NULL,
attempt_id smallint(6) COMMENT 'job execution attempt id' DEFAULT '0',

View File

@ -94,3 +94,18 @@ CREATE TABLE `stg_repo_owner` (
`paths` TEXT CHAR SET utf8 DEFAULT NULL COMMENT 'covered paths by this acl',
PRIMARY KEY (`scm_repo_fullname`, `scm_type`, `owner_type`, `owner_name`, `app_id`)
) ENGINE = InnoDB DEFAULT CHARSET = latin1;
CREATE TABLE stg_database_scm_map (
`database_name` VARCHAR(100) COMMENT 'database name',
`database_type` VARCHAR(50) COMMENT 'database type',
`app_name` VARCHAR(127) COMMENT 'the name of application',
`scm_type` VARCHAR(50) COMMENT 'scm type',
`scm_url` VARCHAR(127) COMMENT 'scm url',
`committers` VARCHAR(500) COMMENT 'committers',
`filepath` VARCHAR(200) COMMENT 'filepath',
`app_id` INT COMMENT 'application id of the namespace',
`wh_etl_exec_id` BIGINT COMMENT 'wherehows etl execution id that modified this record',
PRIMARY KEY (`database_type`,`database_name`,`scm_type`,`app_name`)
) ENGINE = InnoDB DEFAULT CHARSET = latin1;

View File

@ -64,3 +64,15 @@ CREATE TABLE favorites (
)
ENGINE = InnoDB
DEFAULT CHARSET = utf8;
CREATE TABLE user_login_history (
log_id INT(11) AUTO_INCREMENT NOT NULL,
username VARCHAR(20) NOT NULL,
authentication_type VARCHAR(20) NOT NULL,
`status` VARCHAR(20) NOT NULL,
message TEXT DEFAULT NULL,
login_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (log_id)
)
ENGINE = InnoDB
DEFAULT CHARSET = utf8;

View File

@ -24,6 +24,7 @@ source ETL_DDL/metric_metadata.sql;
source ETL_DDL/owner_metadata.sql;
source ETL_DDL/patterns.sql;
source ETL_DDL/kafka_tracking.sql;
source ETL_DDL/dataset_info_metadata.sql;
source WEB_DDL/track.sql;
source WEB_DDL/users.sql;

File diff suppressed because it is too large

View File

@ -0,0 +1,216 @@
{
"type": "record",
"name": "MetadataInventoryEvent",
"namespace": "com.linkedin.events.wherehows",
"fields": [
{
"name": "auditHeader",
"type": {
"type": "record",
"name": "KafkaAuditHeader",
"namespace": "com.linkedin.events",
"fields": [
{
"name": "time",
"type": "long",
"doc": "The time at which the event was emitted into Kafka.",
"logicalType": "timestamp-millis"
},
{
"name": "server",
"type": "string",
"doc": "The fully-qualified name of the host of this event."
},
{
"name": "instance",
"type": [
"null",
"string"
],
"doc": "The name of the application instance on the server. e.g. i002,stg05,group003"
},
{
"name": "appName",
"type": "string",
"doc": "The name of the application/service from which the event is emitted."
},
{
"name": "messageId",
"type": {
"type": "fixed",
"name": "UUID",
"namespace": "com.linkedin.events",
"size": 16
},
"doc": "A unique identifier for the message."
}
]
}
},
{
"name": "dataPlatformUrn",
"type": "string",
"doc": "The platform or type of the metadata object: espresso,kafka,oracle,voldemort,hdfs,hive,dalids,teradata,... for example, urn:li:dataPlatform:espresso, urn:li:dataPlatform:dalids"
},
{
"name": "datasetList",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "DatasetIventoryItem",
"fields": [
{
"name": "nativeName",
"type": "string",
"doc": "The native name: <db>.<table>, /dir/subdir/<name>, or <name>"
},
{
"name": "dataOrigin",
"type": {
"type": "enum",
"name": "DeploymentTier",
"symbols": [
"PROD",
"CORP",
"GRID",
"PREPROD",
"CANARY",
"DMZ",
"STG",
"UAT",
"UAT1",
"UAT2",
"UAT3",
"QA",
"QA1",
"QA2",
"QA3",
"EI",
"EI1",
"EI2",
"EI3",
"QEI",
"QEI1",
"QEI2",
"QEI3",
"TEST",
"LIT",
"SIT",
"INT",
"DEV",
"LOCAL",
"ARCHIVE",
"DROPBOX",
"SANDBOX",
"POC"
]
},
"doc": "Origin/Source Tier where the record is generated? This can be different from Deployment. For example, PROD data can be copied to a TEST server, then DataOrigin=PROD while the dataset instance belongs to TEST",
"default": "PROD"
},
{
"name": "datasetProperties",
"type": [
"null",
{
"type": "record",
"name": "DatasetProperties",
"fields": [
{
"name": "changeAuditStamp",
"type": {
"type": "record",
"name": "ChangeAuditStamp",
"fields": [
{
"name": "actorUrn",
"type": "string",
"doc": "urn:li:corpuser:jsmith, urn:li:team:xyz, urn:li:service:money"
},
{
"name": "type",
"type": "string",
"doc": "CREATE, UPDATE, DELETE"
},
{
"name": "time",
"type": "long",
"doc": "Epoch",
"logicalType": "timestamp-millis"
},
{
"name": "note",
"type": "string",
"doc": "Extra detail about the changes"
}
]
}
},
{
"name": "nativeType",
"type": {
"type": "enum",
"name": "PlatformNativeType",
"symbols": [
"TABLE",
"VIEW",
"DIRECTORY",
"FILE",
"INDEX",
"STREAM",
"BLOB",
"FUNCTION",
"OTHER"
]
},
"doc": "The native type about how the dataset is stored in the platform"
},
{
"name": "uri",
"type": [
"string",
"null"
],
"doc": "The abstracted such as hdfs:///data/tracking/PageViewEvent, file:///dir/file_name. This is often used in codes and scripts."
},
{
"name": "caseSensitivity",
"type": [
"null",
{
"type": "record",
"name": "CaseSensitivityInfo",
"fields": [
{
"name": "datasetName",
"type": "boolean",
"doc": "Is native object name CS?",
"default": true
},
{
"name": "fieldName",
"type": "boolean",
"doc": "Is field name CS?",
"default": true
},
{
"name": "dataContent",
"type": "boolean",
"doc": "Is data content CS?",
"default": true
}
]
}
]
}
]
}
]
}
]
}
}
}
]
}
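The new MetadataInventoryEvent schema nests the Kafka audit header, the DeploymentTier enum, and the per-dataset properties. A quick way to sanity-check the definition is to parse it with the standard Avro library and walk the top-level fields. A minimal sketch, assuming org.apache.avro is on the classpath and the schema is saved locally as MetadataInventoryEvent.avsc (the path is a placeholder):

import java.io.File;
import java.io.IOException;
import org.apache.avro.Schema;

public class InspectInventorySchema {
  public static void main(String[] args) throws IOException {
    // Parse the .avsc file; the path is a placeholder for wherever the schema is checked in.
    Schema schema = new Schema.Parser().parse(new File("MetadataInventoryEvent.avsc"));
    System.out.println("Record: " + schema.getFullName());
    // List the top-level fields (auditHeader, dataPlatformUrn, datasetList).
    for (Schema.Field field : schema.getFields()) {
      System.out.println(field.name() + " : " + field.schema().getType());
    }
  }
}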

View File

@ -0,0 +1,56 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package metadata.etl.git;
import java.io.InputStream;
import java.util.Properties;
import metadata.etl.EtlJob;
public class CodeSearchMetadataEtl extends EtlJob {
@Deprecated
public CodeSearchMetadataEtl(int appId, long whExecId) {
super(appId, null, whExecId);
}
public CodeSearchMetadataEtl(int appId, long whExecId, Properties prop) {
super(appId, null, whExecId, prop);
}
@Override
public void extract()
throws Exception {
logger.info("In Code Search metadata ETL, launch extract jython scripts");
InputStream inputStream = classLoader.getResourceAsStream("jython/CodeSearchExtract.py");
// logger.info("call scripts with args: " + interpreter.getSystemState().argv);
interpreter.execfile(inputStream);
inputStream.close();
}
@Override
public void transform()
throws Exception {
}
@Override
public void load()
throws Exception {
logger.info("In Code Search metadata ETL, launch load jython scripts");
InputStream inputStream = classLoader.getResourceAsStream("jython/CodeSearchLoad.py");
interpreter.execfile(inputStream);
inputStream.close();
}
}

View File

@ -28,6 +28,7 @@ import metadata.etl.ldap.LdapEtl;
import metadata.etl.scheduler.azkaban.AzkabanExecEtl;
import metadata.etl.scheduler.oozie.OozieExecEtl;
import metadata.etl.models.EtlJobName;
import metadata.etl.git.CodeSearchMetadataEtl;
/**
@ -63,6 +64,8 @@ public class EtlJobFactory {
return new OracleMetadataEtl(refId, whExecId, properties);
case PRODUCT_REPO_METADATA_ETL:
return new MultiproductMetadataEtl(refId, whExecId, properties);
case DATABASE_SCM_METADATA_ETL:
return new CodeSearchMetadataEtl(refId, whExecId, properties);
default:
throw new UnsupportedOperationException("Unsupported job type: " + etlJobName);
}

View File

@ -31,6 +31,7 @@ public enum EtlJobName {
ORACLE_DATASET_METADATA_ETL(EtlType.DATASET, RefIdType.DB),
PRODUCT_REPO_METADATA_ETL(EtlType.OPERATION, RefIdType.APP),
KAFKA_CONSUMER_ETL(EtlType.OPERATION, RefIdType.DB),
DATABASE_SCM_METADATA_ETL(EtlType.OPERATION, RefIdType.APP),
;
EtlType etlType;

View File

@ -224,13 +224,18 @@ class AzkabanExtract:
# print json.dumps(row[json_column], indent=4)
if row[json_column]["triggerCondition"]["checkers"][0]["checkerJson"]["isRecurring"] == 'true':
unit = row[json_column]["triggerCondition"]["checkers"][0]["checkerJson"]["period"][-1:]
unit = self._period_unit_table[unit]
frequency = int(row[json_column]["triggerCondition"]["checkers"][0]["checkerJson"]["period"][:-1])
unit, frequency, cron_expr = None, None, None
period = row[json_column]["triggerCondition"]["checkers"][0]["checkerJson"]["period"]
if period is not None and period != "null" and period[-1:] in self._period_unit_table:
unit = self._period_unit_table[period[-1:]]
frequency = int(row[json_column]["triggerCondition"]["checkers"][0]["checkerJson"]["period"][:-1])
if "cronExpression" in row[json_column]["triggerCondition"]["checkers"][0]["checkerJson"]:
cron_expr = row[json_column]["triggerCondition"]["checkers"][0]["checkerJson"]["cronExpression"]
schedule_record = AzkabanFlowScheduleRecord(self.app_id,
row[json_column]["actions"][0]["actionJson"]["projectName"] + ':' + row[json_column]["actions"][0]["actionJson"]["flowName"],
unit,
frequency,
cron_expr,
long(row[json_column]["triggerCondition"]["checkers"][0]["checkerJson"]["firstCheckTime"]) / 1000,
int(time.mktime(datetime.date(2099,12,31).timetuple())),
'0',

View File

@ -0,0 +1,190 @@
#
# Copyright 2015 LinkedIn Corp. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
import sys,os,re
import requests
import subprocess
from wherehows.common import Constant
from wherehows.common.schemas import SCMOwnerRecord
from wherehows.common.writers import FileWriter
from org.slf4j import LoggerFactory
class CodeSearchExtract:
"""
Lists all repos for oracle & espresso databases. Since this feature is not
available through the UI, we need to use http://go/codesearch to discover
the multiproduct repos that use 'li-db' plugin.
"""
# verbose = False
limit_search_result = 500
# limit_multiproduct = None
# limit_plugin = None
def __init__(self):
self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__)
self.base_url = args[Constant.BASE_URL_KEY]
self.code_search_committer_writer = FileWriter(args[Constant.DATABASE_SCM_REPO_OUTPUT_KEY])
def run(self):
offset_min = 1
offset_max = 100
databases = []
search_request = \
{"request":
{
"other":{"CurrentResult":str(offset_min),"requestTimeout":"200000000"},
"queryContext":{"numToScore":1000,"docDataSet":"results","rawQuery":"type:gradle plugin:*'li-db'"},
"paginationContext":{"numToReturn":offset_max}
}
}
while True:
resp = requests.post(self.base_url + '/galene-codesearch?action=search',
json=search_request,
verify=False)
if resp.status_code != 200:
# This means something went wrong.
d = resp.json()
self.logger.info("Request Error! Stack trace {}".format(d['stackTrace']))
# raise Exception('Request Error', 'POST /galene-codesearch?action=search %s' % (resp.status_code))
break
result = resp.json()['value']
self.logger.debug("Pagination offset = {}".format(result['total']))
for element in result['elements']:
fpath = element['docData']['filepath']
ri = fpath.rindex('/')
prop_file = fpath[:ri] + '/database.properties'
# e.g. identity-mt/database/Identity/database.properties
# network/database/externmembermap/database.properties
# cap-backend/database/campaigns-db/database.properties
databases.append( {'filepath': prop_file, 'app_name': element['docData']['mp']} )
if result['total'] < 100:
break
offset_min += int(result['total'])
offset_max += 100 # if result['total'] < 100 else result['total']
search_request['request']['other']['CurrentResult'] = str(offset_min)
search_request['request']['paginationContext']['numToReturn'] = offset_max
self.logger.debug("Property file path {}".format(search_request))
self.logger.debug(" length of databases is {}".format(len(databases)))
owner_count = 0
committers_count = 0
for db in databases:
prop_file = db['filepath']
file_request = \
{"request":{
"other":{"filepath":prop_file,
"TextTokenize":"True",
"CurrentResult":"1",
"requestTimeout":"2000000000"
},
"queryContext":{"numToScore":10,"docDataSet":"result"},
"paginationContext":{"numToReturn":1}
}
}
resp = requests.post(self.base_url + '/galene-codesearch?action=search',
json=file_request,
verify=False)
if resp.status_code != 200:
# This means something went wrong.
d = resp.json()
self.logger.info("Request Error! Stack trace {}".format(d['stackTrace']))
continue
result = resp.json()['value']
if result['total'] < 1:
self.logger.info("Nothing found for {}".format(prop_file))
continue
if "repoUrl" in result['elements'][0]['docData']:
db['scm_url'] = result['elements'][0]['docData']['repoUrl']
db['scm_type'] = result['elements'][0]['docData']['repotype']
db['committers'] = ''
if db['scm_type'] == 'SVN':
schema_in_repo = re.sub(r"http://(\w+)\.([\w\.\-/].*)database.properties\?view=markup",
"http://svn." + r"\2" + "schema", db['scm_url'])
db['committers'] = self.get_svn_committers(schema_in_repo)
committers_count +=1
self.logger.info("Committers for {} => {}".format(schema_in_repo,db['committers']))
else:
self.logger.info("Search request {}".format(prop_file))
code = result['elements'][0]['docData']['code']
code_dict = dict(line.split("=", 1) for line in code.strip().splitlines())
if "database.name" in code_dict:
db['database_name'] = code_dict['database.name']
if "database.type" in code_dict:
db['database_type'] = code_dict['database.type']
owner_record = SCMOwnerRecord(
db['scm_url'],
db['database_name'],
db['database_type'],
db['app_name'],
db['filepath'],
db['committers'],
db['scm_type']
)
owner_count += 1
self.code_search_committer_writer.append(owner_record)
self.code_search_committer_writer.close()
self.logger.info('Finish Fetching committers, total {} committers entries'.format(committers_count))
self.logger.info('Finish Fetching SVN owners, total {} records'.format(owner_count))
def get_svn_committers(self, svn_repo_path):
"""Collect recent committers from the cmd
svn log %s | grep '^\(A=\|r[0-9]* \)' | head -10
e.g.
r1617887 | htang | 2016-09-21 14:27:40 -0700 (Wed, 21 Sep 2016) | 12 lines
A=shanda,pravi
r1600397 | llu | 2016-08-08 17:14:22 -0700 (Mon, 08 Aug 2016) | 3 lines
A=rramakri,htang
"""
#svn_cmd = """svn log %s | grep '^\(A=\|r[0-9]* \)' | head -10"""
committers = []
possible_svn_paths = [svn_repo_path, svn_repo_path + "ta"]
for svn_repo_path in possible_svn_paths:
p = subprocess.Popen('svn log ' + svn_repo_path + " |grep '^\(A=\|r[0-9]* \)' |head -10",
shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
svn_log_output, svn_log_err = p.communicate()
if svn_log_err[:12] == 'svn: E160013':
continue # try the next possible path
for line in svn_log_output.split('\n'):
if re.match(r"r[0-9]+", line):
committer = line.split('|')[1].strip()
if committer not in committers:
committers.append(committer)
elif line[:2] == 'A=':
for apvr in line[2:].split(','):
if apvr not in committers:
committers.append(apvr)
if len(committers) > 0:
self.logger.debug(" {}, ' => ', {}".format(svn_repo_path,committers))
break
return ','.join(committers)
if __name__ == "__main__":
args = sys.argv[1]
e = CodeSearchExtract()
e.run()

View File

@ -0,0 +1,153 @@
#
# Copyright 2015 LinkedIn Corp. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
from com.ziclix.python.sql import zxJDBC
from wherehows.common import Constant
from org.slf4j import LoggerFactory
import sys, os, datetime
class CodeSearchLoad:
def __init__(self, args):
self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__)
username = args[Constant.WH_DB_USERNAME_KEY]
password = args[Constant.WH_DB_PASSWORD_KEY]
JDBC_DRIVER = args[Constant.WH_DB_DRIVER_KEY]
JDBC_URL = args[Constant.WH_DB_URL_KEY]
self.database_scm_repo_file = args[Constant.DATABASE_SCM_REPO_OUTPUT_KEY]
self.app_id = args[Constant.APP_ID_KEY]
self.wh_etl_exec_id = args[Constant.WH_EXEC_ID_KEY]
self.conn_mysql = zxJDBC.connect(JDBC_URL, username, password, JDBC_DRIVER)
self.conn_cursor = self.conn_mysql.cursor()
if Constant.INNODB_LOCK_WAIT_TIMEOUT in args:
lock_wait_time = args[Constant.INNODB_LOCK_WAIT_TIMEOUT]
self.conn_cursor.execute("SET innodb_lock_wait_timeout = %s;" % lock_wait_time)
self.logger.info("Load Code Search CSV into {}, app_id {}, wh_exec_id {}"
.format(JDBC_URL, self.app_id, self.wh_etl_exec_id))
def load_database_scm_repo(self):
load_database_scm_repos_cmd = '''
DELETE FROM stg_database_scm_map WHERE app_id = {app_id};
-- load into stg table
LOAD DATA LOCAL INFILE '{source_file}'
INTO TABLE stg_database_scm_map
FIELDS TERMINATED BY '\Z' ESCAPED BY '\0'
LINES TERMINATED BY '\n'
(`scm_url`, `database_name`, `database_type`, `app_name`, `filepath`, `committers`, `scm_type`)
'''.format(source_file=self.database_scm_repo_file, app_id=self.app_id)
self.executeCommands(load_database_scm_repos_cmd)
self.logger.info("finish loading SCM metadata.")
def merge_repo_owners_into_dataset_owners(self):
merge_repo_owners_into_dataset_owners_cmd = '''
UPDATE stg_database_scm_map stg
SET stg.app_id = {app_id};
UPDATE stg_database_scm_map stg
SET stg.wh_etl_exec_id = {wh_etl_exec_id};
-- find owner app_id, 300 for USER, 301 for GROUP
UPDATE stg_database_scm_map stg
JOIN (select app_id, user_id from dir_external_user_info) ldap
ON FIND_IN_SET(ldap.user_id,stg.committers)
SET stg.app_id = ldap.app_id;
UPDATE stg_database_scm_map stg
JOIN (select distinct app_id, group_id from dir_external_group_user_map) ldap
ON FIND_IN_SET(ldap.group_id,stg.committers)
SET stg.app_id = ldap.app_id;
-- INSERT/UPDATE into dataset_owner
INSERT INTO dataset_owner (
dataset_id, dataset_urn, owner_id, sort_id, namespace, app_id, owner_type, owner_sub_type, owner_id_type,
owner_source, db_ids, is_group, is_active, source_time, created_time, wh_etl_exec_id
)
SELECT * FROM (
SELECT ds.id, ds.urn, u.user_id n_owner_id, '0' n_sort_id,
'urn:li:corpuser' n_namespace, r.app_id,
'Owner' n_owner_type,
null n_owner_sub_type,
case when r.app_id = 300 then 'USER' when r.app_id = 301 then 'GROUP' else null end n_owner_id_type,
'SCM' n_owner_source, null db_ids,
IF(r.app_id = 301, 'Y', 'N') is_group,
'Y' is_active, 0 source_time, unix_timestamp(NOW()) created_time, r.wh_etl_exec_id
FROM dict_dataset ds
JOIN stg_database_scm_map r
ON ds.urn LIKE concat(r.database_type, ':///', r.database_name,'/%')
JOIN dir_external_user_info u
ON FIND_IN_SET(u.user_id,r.committers)
) n
ON DUPLICATE KEY UPDATE
dataset_urn = n.urn,
sort_id = COALESCE(n.n_sort_id, sort_id),
owner_type = CASE WHEN n.n_owner_type IS NULL OR owner_type >= n.n_owner_type
THEN owner_type ELSE n.n_owner_type END,
owner_sub_type = COALESCE(owner_sub_type, n.n_owner_sub_type),
owner_id_type = COALESCE(owner_id_type, n.n_owner_id_type),
owner_source = CASE WHEN owner_source is null THEN 'SCM'
WHEN owner_source LIKE '%SCM%' THEN owner_source ELSE CONCAT(owner_source, ',SCM') END,
namespace = COALESCE(namespace, n.n_namespace),
wh_etl_exec_id = n.wh_etl_exec_id,
modified_time = unix_timestamp(NOW());
-- reset dataset owner sort id
UPDATE dataset_owner d
JOIN (
select dataset_urn, dataset_id, owner_type, owner_id, sort_id,
@owner_rank := IF(@current_dataset_id = dataset_id, @owner_rank + 1, 0) rank,
@current_dataset_id := dataset_id
from dataset_owner, (select @current_dataset_id := 0, @owner_rank := 0) t
where dataset_urn like 'espresso:///%' or dataset_urn like 'oracle:///%'
order by dataset_id asc, owner_type desc, sort_id asc, owner_id asc
) s
ON d.dataset_id = s.dataset_id AND d.owner_id = s.owner_id
SET d.sort_id = s.rank;
'''.format(app_id=self.app_id,wh_etl_exec_id = self.wh_etl_exec_id)
self.executeCommands(merge_repo_owners_into_dataset_owners_cmd)
self.logger.info("finish merging repo and dataset owners")
def executeCommands(self, commands):
for cmd in commands.split(";"):
self.logger.debug(cmd)
self.conn_cursor.execute(cmd)
self.conn_mysql.commit()
def run(self):
try:
begin = datetime.datetime.now().strftime("%H:%M:%S")
self.load_database_scm_repo()
self.merge_repo_owners_into_dataset_owners()
end = datetime.datetime.now().strftime("%H:%M:%S")
self.logger.info("Load Code Search metadata [%s -> %s]" % (str(begin), str(end)))
finally:
self.conn_cursor.close()
self.conn_mysql.close()
if __name__ == "__main__":
args = sys.argv[1]
l = CodeSearchLoad(args)
l.run()

View File

@ -129,8 +129,13 @@ class MultiproductLoad:
ON DUPLICATE KEY UPDATE
dataset_urn = n.urn,
sort_id = COALESCE(n.n_sort_id, sort_id),
owner_type = CASE WHEN n.n_owner_type IS NULL OR owner_type >= n.n_owner_type
THEN owner_type ELSE n.n_owner_type END,
-- the Owner_type precedence (from high to low) is: OWNER, PRODUCER, DELEGATE, STAKEHOLDER
owner_type = CASE WHEN (
case owner_type when 'OWNER' then 20 when 'PRODUCER' then 40 when 'DELEGATE' then 60 when 'STAKEHOLDER' then 80 else 100 end
) <= (
case n.n_owner_type when 'OWNER' then 20 when 'PRODUCER' then 40 when 'DELEGATE' then 60 when 'STAKEHOLDER' then 80 else 100 end
)
THEN owner_type ELSE n.n_owner_type END,
owner_sub_type = COALESCE(owner_sub_type, n.n_owner_sub_type),
owner_id_type = COALESCE(owner_id_type, n.n_owner_id_type),
owner_source = CASE WHEN owner_source is null THEN 'SCM'

View File

@ -163,6 +163,7 @@ class OozieExtract:
row['app_path'],
row['time_unit'],
int(row['frequency']),
None,
row['start_time'],
row['end_time'],
row['ref_id'],

View File

@ -136,15 +136,16 @@ class SchedulerLoad:
self.wh_con.commit()
cmd = """
INSERT INTO flow_schedule (app_id, flow_id, unit, frequency, included_instances, excluded_instances, effective_start_time, effective_end_time, is_active, ref_id,
INSERT INTO flow_schedule (app_id, flow_id, unit, frequency, cron_expression, included_instances, excluded_instances, effective_start_time, effective_end_time, is_active, ref_id,
created_time, modified_time, wh_etl_exec_id)
SELECT app_id, flow_id, unit, frequency, included_instances, excluded_instances, effective_start_time, effective_end_time, 'Y', ref_id,
SELECT app_id, flow_id, unit, frequency, cron_expression, included_instances, excluded_instances, effective_start_time, effective_end_time, 'Y', ref_id,
unix_timestamp(NOW()) created_time, NULL modified_time, wh_etl_exec_id
FROM stg_flow_schedule s
WHERE s.app_id = {app_id} AND s.flow_id IS NOT NULL
ON DUPLICATE KEY UPDATE
unit = s.unit,
frequency = s.frequency,
cron_expression = s.cron_expression,
is_active = 'Y',
ref_id = s.ref_id,
included_instances = s.included_instances,

View File

@ -33,7 +33,7 @@ class SchedulerTransform:
"owners": {"columns": "app_id, flow_path, owner_id, wh_etl_exec_id",
"file": "owner.csv",
"table": "stg_flow_owner_permission"},
"schedules": {"columns": "app_id, flow_path, unit, frequency, effective_start_time, effective_end_time, ref_id, wh_etl_exec_id",
"schedules": {"columns": "app_id, flow_path, unit, frequency, cron_expression, effective_start_time, effective_end_time, ref_id, wh_etl_exec_id",
"file": "schedule.csv",
"table": "stg_flow_schedule"},
"flow_execs": {"columns": "app_id, flow_name, flow_path, flow_exec_uuid, source_version, flow_exec_status, attempt_id, executed_by, start_time, end_time, wh_etl_exec_id",

View File

@ -20,6 +20,7 @@ from wherehows.common.schemas import SampleDataRecord
from wherehows.common.writers import FileWriter
from wherehows.common import Constant
from org.slf4j import LoggerFactory
from distutils.util import strtobool
class TeradataExtract:
@ -505,6 +506,7 @@ class TeradataExtract:
scaned_dict = {} # a cache of {name : {urn : _, data : _}} to avoid repeat computing
if sample:
self.logger.info("Start collecting sample data.")
open(sample_output_file, 'wb')
os.chmod(sample_output_file, 0666)
sample_file_writer = FileWriter(sample_output_file)
@ -549,7 +551,15 @@ if __name__ == "__main__":
e.conn_td = zxJDBC.connect(JDBC_URL, username, password, JDBC_DRIVER)
do_sample = True
if Constant.TD_LOAD_SAMPLE in args:
do_sample = bool(args[Constant.TD_LOAD_SAMPLE])
do_sample = strtobool(args[Constant.TD_LOAD_SAMPLE])
# if value error from strtobool, do_sample remains as default value which is True
if do_sample:
if datetime.datetime.now().strftime('%a') in args[Constant.TD_COLLECT_SAMPLE_DATA_DAYS]:
do_sample = True
else:
do_sample = False
try:
e.conn_td.cursor().execute(
"SET QUERY_BAND = 'script=%s; pid=%d; ' FOR SESSION;" % ('TeradataExtract.py', os.getpid()))

View File

@ -0,0 +1,51 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package metadata.etl.git;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
public class CodeSearchMetadataEtlTest {
CodeSearchMetadataEtl _etl;
@BeforeTest
public void setUp()
throws Exception {
_etl = new CodeSearchMetadataEtl(800, 0L);
}
@Test
public void extractTest()
throws Exception {
_etl.extract();
// check the csv file
}
@Test
public void loadTest()
throws Exception {
_etl.load();
// check in database
}
@Test
public void runTest()
throws Exception {
extractTest();
//transformTest();
loadTest();
}
}

View File

@ -36,9 +36,8 @@ public class UserDAO extends AbstractMySQLOpenSourceDAO
"(name, username, password_digest, email, password_digest_type, authentication_type) " +
"VALUES(?, ?, SHA1(?), ? , 'SHA1', 'default')";
private final static String CREATE_LDAP_USER = "INSERT INTO users " +
"(name, username, email, department_number, password_digest_type) " +
"VALUES(?, ?, ?, ?, 'SHA1')";
private final static String CREATE_LDAP_USER =
"INSERT INTO users (name, username, email, department_number, authentication_type) VALUES(?, ?, ?, ?, 'LDAP')";
private final static String GET_USER_COUNT = "SELECT COUNT(*) FROM users WHERE username = ?";
@ -55,6 +54,9 @@ public class UserDAO extends AbstractMySQLOpenSourceDAO
"display_name as name, 'indiviual person' as category FROM dir_external_user_info " +
"UNION SELECT DISTINCT group_id as id, NULL as name, 'group' as category FROM dir_external_group_user_map";
private final static String INSERT_USER_LOGIN_HISTORY =
"INSERT INTO user_login_history (username, authentication_type, `status`, message) VALUES (?, ?, ?, ?)";
private final static String PASSWORD_COLUMN = "password_digest";
private final static String DEFAULT_DETAIL_VIEW = "accordion";
@ -336,4 +338,10 @@ public class UserDAO extends AbstractMySQLOpenSourceDAO
}
return userEntities;
}
public static void insertLoginHistory(String username, String loginType, String status, String message) {
if (username != null && loginType != null && status != null) {
getJdbcTemplate().update(INSERT_USER_LOGIN_HISTORY, username, loginType, status, message);
}
}
}
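The new insertLoginHistory helper is a static, fire-and-forget audit call backed by the user_login_history table added above; AuthenticationManager below records both successes and failures through it. A minimal usage sketch, assuming UserDAO lives in the web app's dao package (the package declaration is not shown in this diff) and the JDBC template is configured:

import dao.UserDAO;  // hypothetical import path; adjust to the actual package

public class LoginAuditExample {
  public static void main(String[] args) {
    // Record a successful password login and a failed LDAP attempt (values are hypothetical).
    UserDAO.insertLoginHistory("jsmith", "default", "SUCCESS", null);
    UserDAO.insertLoginHistory("jsmith", "LDAP", "FAILURE", "ldap://ldap1.abc.com connection refused");
  }
}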

View File

@ -39,154 +39,143 @@ import javax.naming.directory.SearchResult;
public class AuthenticationManager {
public static String MASTER_LDAPURL_URL_KEY = "authentication.ldap.url";
public static String MASTER_LDAP_URL_KEY = "authentication.ldap.url";
public static String MASTER_PRINCIPAL_DOMAIN_KEY = "authentication.principal.domain";
public static String LDAP_CONTEXT_FACTORY_CLASS_KEY = "authentication.ldap.context_factory_class";
public static String LDAP_SEARCH_BASE_KEY = "authentication.ldap.search.base";
public static String MASTER_PRICIPAL_DOMAIN_KEY = "authentication.principal.domain";
public static String LDAP_CONTEXT_FACTORY_CLASS_KEY = "authentication.ldap.context_factory_class";
public static String LDAP_DISPLAYNAME_KEY = "displayName";
public static String LDAP_MAIL_KEY = "mail";
public static String LDAP_DEPARTMENT_NUMBER_KEY = "departmentNumber";
public static void authenticateUser(String userName, String password) throws NamingException, SQLException {
if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) {
throw new IllegalArgumentException("Username and password can not be blank.");
}
if (UserDAO.authenticate(userName, password))
{
return;
}
else
{
DirContext ctx = null;
try
{
Hashtable<String, String> env = buildEnvContext(userName, password);
ctx = new InitialDirContext(env);
if (!UserDAO.userExist(userName))
{
User user = getAttributes(userName);
UserDAO.addLdapUser(user);
}
}
catch(NamingException e)
{
Logger.error("Ldap authentication failed for user: " + userName);
Logger.error(e.getMessage());
throw(e);
}
catch(SQLException e)
{
Logger.error("Ldap authentication failed for user: " + userName);
Logger.error(e.getMessage());
throw(e);
}
finally {
if (ctx != null)
ctx.close();
}
}
public static String LDAP_DISPLAY_NAME_KEY = "displayName";
public static String LDAP_MAIL_KEY = "mail";
public static String LDAP_DEPARTMENT_NUMBER_KEY = "departmentNumber";
public static void authenticateUser(String userName, String password)
throws NamingException, SQLException {
if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) {
throw new IllegalArgumentException("Username and password can not be blank.");
}
private static Hashtable<String, String> buildEnvContext(String username,
String password) {
Hashtable<String, String> env = new Hashtable<String, String>(11);
env.put(Context.INITIAL_CONTEXT_FACTORY, Play.application().configuration().getString(LDAP_CONTEXT_FACTORY_CLASS_KEY));
env.put(Context.PROVIDER_URL, Play.application().configuration().getString(MASTER_LDAPURL_URL_KEY));
env.put(Context.SECURITY_PRINCIPAL,
username + Play.application().configuration().getString(MASTER_PRICIPAL_DOMAIN_KEY));
env.put(Context.SECURITY_CREDENTIALS, password);
return env;
if (UserDAO.authenticate(userName, password)) {
UserDAO.insertLoginHistory(userName, "default", "SUCCESS", null);
return;
}
public static Map<String, String> getUserAttributes(String userName,
String... attributeNames) throws NamingException
{
if (StringUtils.isBlank(userName))
{
throw new IllegalArgumentException("Username and password can not be blank.");
final String contextFactories = Play.application().configuration().getString(LDAP_CONTEXT_FACTORY_CLASS_KEY);
/* Three LDAP properties, each a '|'-separated string with the same number of tokens, e.g.
Url: "ldaps://ldap1.abc.com:1234|ldap://ldap2.abc.com:5678"
Principal Domain: "@abc.com|@abc.cn"
Search Base: "ou=Staff Users,dc=abc,dc=com|ou=Staff Users,dc=abc,dc=cn"
*/
final String[] ldapUrls = Play.application().configuration().getString(MASTER_LDAP_URL_KEY).split("\\s*\\|\\s*");
final String[] principalDomains =
Play.application().configuration().getString(MASTER_PRINCIPAL_DOMAIN_KEY).split("\\s*\\|\\s*");
final String[] ldapSearchBase =
Play.application().configuration().getString(LDAP_SEARCH_BASE_KEY).split("\\s*\\|\\s*");
DirContext ctx = null;
for (int i = 0; i < ldapUrls.length; i++) {
try {
Hashtable<String, String> env =
buildEnvContext(userName, password, contextFactories, ldapUrls[i], principalDomains[i]);
ctx = new InitialDirContext(env);
if (!UserDAO.userExist(userName)) {
User user = getAttributes(ctx, ldapSearchBase[i], userName, principalDomains[i]);
UserDAO.addLdapUser(user);
}
if (attributeNames.length == 0)
{
return Collections.emptyMap();
break;
} catch (NamingException e) {
// Logger.error("Ldap authentication failed for user " + userName + " - " + principalDomains[i] + " - " + ldapUrls[i], e);
UserDAO.insertLoginHistory(userName, "LDAP", "FAILURE", ldapUrls[i] + e.getMessage());
// if exhausted all ldap options and can't authenticate user
if (i >= ldapUrls.length - 1) {
throw e;
}
Hashtable<String, String> env = buildEnvContext("elabldap", "2authISg00d");
DirContext ctx = new InitialDirContext(env);
Attributes matchAttr = new BasicAttributes(true);
BasicAttribute basicAttr =
new BasicAttribute("userPrincipalName", userName + "@linkedin.biz");
matchAttr.put(basicAttr);
NamingEnumeration<? extends SearchResult> searchResult =
ctx.search("ou=Staff Users,dc=linkedin,dc=biz", matchAttr, attributeNames);
if (ctx != null)
{
ctx.close();
} catch (SQLException e) {
// Logger.error("Ldap authentication SQL error for user: " + userName, e);
UserDAO.insertLoginHistory(userName, "LDAP", "FAILURE", ldapUrls[i] + e.getMessage());
throw e;
} finally {
if (ctx != null) {
ctx.close();
}
}
}
UserDAO.insertLoginHistory(userName, "LDAP", "SUCCESS", null);
}
Map<String, String> result = new HashMap<String, String>();
private static Hashtable<String, String> buildEnvContext(String username, String password, String contextFactory,
String ldapUrl, String principalDomain) {
Hashtable<String, String> env = new Hashtable<>(11);
env.put(Context.INITIAL_CONTEXT_FACTORY, contextFactory);
env.put(Context.PROVIDER_URL, ldapUrl);
env.put(Context.SECURITY_PRINCIPAL, username + principalDomain);
env.put(Context.SECURITY_CREDENTIALS, password);
return env;
}
if (searchResult.hasMore())
{
NamingEnumeration<? extends Attribute> attributes =
searchResult.next().getAttributes().getAll();
while (attributes.hasMore())
{
Attribute attr = attributes.next();
String attrId = attr.getID();
String attrValue = (String) attr.get();
result.put(attrId, attrValue);
}
}
return result;
public static Map<String, String> getUserAttributes(DirContext ctx, String searchBase, String userName,
String principalDomain, String... attributeNames)
throws NamingException {
if (StringUtils.isBlank(userName)) {
throw new IllegalArgumentException("Username and password can not be blank.");
}
public static User getAttributes(String userName) throws NamingException, SQLException
{
Map<String, String> userDetailMap = getUserAttributes(userName,
LDAP_DISPLAYNAME_KEY,
LDAP_MAIL_KEY,
LDAP_DEPARTMENT_NUMBER_KEY);
String displayName = userDetailMap.get(LDAP_DISPLAYNAME_KEY);
displayName = displayName.trim().replaceAll(" +", " ");
String displaNameTokens[] = displayName.split(" ");
String firstName = displaNameTokens[0];
String lastName = displaNameTokens[1];
String email = userDetailMap.get(LDAP_MAIL_KEY);
String department = userDetailMap.get(LDAP_DEPARTMENT_NUMBER_KEY);
int departmentNum = 0;
if (StringUtils.isNotBlank(department))
{
try
{
departmentNum = Integer.parseInt(department);
}
catch(NumberFormatException e)
{
Logger.error("Convert department number failed. Error message: " + e.getMessage());
departmentNum = 0;
}
}
User user = new User();
user.email = email;
user.userName = userName;
user.name = firstName + " " + lastName;
user.departmentNum = departmentNum;
return user;
if (attributeNames.length == 0) {
return Collections.emptyMap();
}
}
Attributes matchAttr = new BasicAttributes(true);
BasicAttribute basicAttr = new BasicAttribute("userPrincipalName", userName + principalDomain);
matchAttr.put(basicAttr);
NamingEnumeration<? extends SearchResult> searchResult = ctx.search(searchBase, matchAttr, attributeNames);
if (ctx != null) {
ctx.close();
}
Map<String, String> result = new HashMap<>();
if (searchResult.hasMore()) {
NamingEnumeration<? extends Attribute> attributes = searchResult.next().getAttributes().getAll();
while (attributes.hasMore()) {
Attribute attr = attributes.next();
String attrId = attr.getID();
String attrValue = (String) attr.get();
result.put(attrId, attrValue);
}
}
return result;
}
public static User getAttributes(DirContext ctx, String searchBase, String userName, String principalDomain)
throws NamingException, SQLException {
Map<String, String> userDetailMap =
getUserAttributes(ctx, searchBase, userName, principalDomain, LDAP_DISPLAY_NAME_KEY, LDAP_MAIL_KEY,
LDAP_DEPARTMENT_NUMBER_KEY);
String displayName = userDetailMap.get(LDAP_DISPLAY_NAME_KEY);
String[] displayNameTokens = displayName.trim().replaceAll(" +", " ").split(" ");
String firstName = displayNameTokens[0];
String lastName = displayNameTokens[1];
String email = userDetailMap.get(LDAP_MAIL_KEY);
String department = userDetailMap.get(LDAP_DEPARTMENT_NUMBER_KEY);
int departmentNum = 0;
if (StringUtils.isNotBlank(department)) {
try {
departmentNum = Integer.parseInt(department);
} catch (NumberFormatException e) {
Logger.error("Convert department number failed. Error message: " + e.getMessage());
departmentNum = 0;
}
}
User user = new User();
user.email = email;
user.userName = userName;
user.name = firstName + " " + lastName;
user.departmentNum = departmentNum;
return user;
}
}
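The rewritten AuthenticationManager reads pipe-separated lists for the LDAP URL, principal domain, and search base, then walks them in order until one bind succeeds, only rethrowing after the last server has been tried. A minimal sketch of just that splitting-and-failover loop, with placeholder values and a hypothetical tryBind helper standing in for the real InitialDirContext construction:

public class LdapFailoverSketch {
  public static void main(String[] args) throws Exception {
    // Pipe-separated settings, mirroring the comment in the diff; values here are placeholders.
    String urls = "ldaps://ldap1.abc.com:1234|ldap://ldap2.abc.com:5678";
    String domains = "@abc.com|@abc.cn";
    String[] ldapUrls = urls.split("\\s*\\|\\s*");
    String[] principalDomains = domains.split("\\s*\\|\\s*");

    String userName = "jsmith";  // hypothetical user
    for (int i = 0; i < ldapUrls.length; i++) {
      try {
        tryBind(ldapUrls[i], userName + principalDomains[i]);
        System.out.println("Authenticated against " + ldapUrls[i]);
        break;  // stop at the first server that accepts the credentials
      } catch (Exception e) {
        // Re-throw only after the last URL has been tried, as the diff does.
        if (i >= ldapUrls.length - 1) {
          throw e;
        }
      }
    }
  }

  // Hypothetical stand-in for new InitialDirContext(env) in the real code.
  static void tryBind(String url, String principal) throws Exception {
    if (!url.startsWith("ldaps://")) {
      throw new Exception("bind failed for " + url);
    }
  }
}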

View File

@ -84,6 +84,7 @@ backend.service.url = "$YOUR_BACKEND_SERVICE_URL"
authentication.ldap.url = "$YOUR_LDAP_SERVER"
authentication.ldap.context_factory_class = "com.sun.jndi.ldap.LdapCtxFactory"
authentication.principal.domain = "$YOUR_LDAP_DOMAIN"
authentication.ldap.search.base = "$YOUR_LDAP_SEARCH_BASE"
dataset.hdfs_browser.link = "https://localhost:8888/filebrowser/#"

View File

@ -93,6 +93,8 @@ public class Constant {
public static final String TD_DEFAULT_DATABASE_KEY = "teradata.default_database";
/** Optional. The property_name field in wh_etl_job_property table. Decide whether load sample data or not */
public static final String TD_LOAD_SAMPLE = "teradata.load_sample";
/** The property_name field in wh_etl_job_property table. Collect sample data only on certain weekdays */
public static final String TD_COLLECT_SAMPLE_DATA_DAYS = "teradata.collect.sample.data.days";
// Hdfs
/** The property_name field in wh_etl_job_property table. Whether using remote mode or not */
@ -206,4 +208,9 @@ public class Constant {
public static final String GIT_PROJECT_OUTPUT_KEY = "git.project.metadata";
public static final String PRODUCT_REPO_OUTPUT_KEY = "product.repo.metadata";
public static final String PRODUCT_REPO_OWNER_OUTPUT_KEY = "product.repo.owner";
// code search
public static final String DATABASE_SCM_REPO_OUTPUT_KEY = "database.scm.repo";
public static final String BASE_URL_KEY = "base.url.key";
}

View File

@ -15,12 +15,11 @@ package wherehows.common.enums;
public enum OwnerType {
PRODUCER(10),
DELEGATE(20),
CONSUMER(30),
AUDITOR(40),
STAKEHOLDER(50);
// the precedence (from high to low) is: OWNER, PRODUCER, DELEGATE, STAKEHOLDER
OWNER(20),
PRODUCER(40),
DELEGATE(60),
STAKEHOLDER(80);
private int numVal;
@ -45,12 +44,12 @@ public enum OwnerType {
} catch (NullPointerException | IllegalArgumentException ex) {
}
int type2value = 110;
int type2value = 100;
try {
type2value = OwnerType.valueOf(type2.toUpperCase()).value();
} catch (NullPointerException | IllegalArgumentException ex) {
}
return type1value < type2value ? type1 : type2;
return type1value <= type2value ? type1 : type2;
}
}
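The OwnerType change encodes precedence as ascending numbers (OWNER=20 through STAKEHOLDER=80, with 100 as the fallback for unknown types) and now keeps the first argument on ties. An illustrative, self-contained sketch of that comparison rule (not the library class itself):

public class OwnerPrecedenceSketch {
  // Same numeric ranking as the diff: the lower value wins.
  static int rank(String type) {
    switch (type == null ? "" : type.toUpperCase()) {
      case "OWNER":       return 20;
      case "PRODUCER":    return 40;
      case "DELEGATE":    return 60;
      case "STAKEHOLDER": return 80;
      default:            return 100;  // unknown types lose to any known type
    }
  }

  static String choose(String type1, String type2) {
    // <= keeps type1 on a tie, matching the updated comparison in the diff.
    return rank(type1) <= rank(type2) ? type1 : type2;
  }

  public static void main(String[] args) {
    System.out.println(choose("PRODUCER", "OWNER"));       // OWNER
    System.out.println(choose("DELEGATE", "DELEGATE"));    // DELEGATE (tie keeps the first)
    System.out.println(choose("CONSUMER", "STAKEHOLDER")); // STAKEHOLDER (unknown loses)
  }
}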

View File

@ -25,17 +25,19 @@ public class AzkabanFlowScheduleRecord extends AbstractRecord {
String flowPath;
String unit;
Integer frequency;
String cronExpression;
Long effectiveStartTime;
Long effectiveEndTime;
String refId;
Long whExecId;
public AzkabanFlowScheduleRecord(Integer appId, String flowPath, String unit, Integer frequency,
Long effectiveStartTime, Long effectiveEndTime, String refId, Long whExecId) {
String cronExpression, Long effectiveStartTime, Long effectiveEndTime, String refId, Long whExecId) {
this.appId = appId;
this.flowPath = flowPath;
this.unit = unit;
this.frequency = frequency;
this.cronExpression = cronExpression;
this.effectiveStartTime = effectiveStartTime;
this.effectiveEndTime = effectiveEndTime;
this.refId = refId;
@ -49,6 +51,7 @@ public class AzkabanFlowScheduleRecord extends AbstractRecord {
allFields.add(flowPath);
allFields.add(unit);
allFields.add(frequency);
allFields.add(cronExpression);
allFields.add(effectiveStartTime);
allFields.add(effectiveEndTime);
allFields.add(refId);
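With the new field, a schedule record carries the raw cron expression alongside the unit/frequency pair, and either side may be null when the scheduler only supplies one form. A small usage sketch with hypothetical values, matching the constructor shown above and assuming wherehows-common is on the classpath:

import wherehows.common.schemas.AzkabanFlowScheduleRecord;

public class ScheduleRecordExample {
  public static void main(String[] args) {
    // Periodic schedule expressed as unit/frequency, no cron expression.
    AzkabanFlowScheduleRecord daily =
        new AzkabanFlowScheduleRecord(31, "myProject:myFlow", "DAY", 1, null,
            1476400000L, 4102444800L, "42", 1001L);

    // Cron-based schedule: unit and frequency are unknown, cron expression is set.
    AzkabanFlowScheduleRecord cron =
        new AzkabanFlowScheduleRecord(31, "myProject:myFlow", null, null, "0 0 3 * * ?",
            1476400000L, 4102444800L, "42", 1001L);

    System.out.println(daily.fillAllFields());
    System.out.println(cron.fillAllFields());
  }
}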

View File

@ -26,6 +26,7 @@ public class DatasetDeploymentRecord extends AbstractRecord {
String region;
String zone;
String cluster;
String container;
Boolean enabled;
Map<String, String> additionalDeploymentInfo;
Long modifiedTime;
@ -33,7 +34,7 @@ public class DatasetDeploymentRecord extends AbstractRecord {
@Override
public String[] getDbColumnNames() {
return new String[]{"dataset_id", "dataset_urn", "deployment_tier", "datacenter", "region", "zone", "cluster",
"enabled", "additional_info", "modified_time"};
"container", "enabled", "additional_info", "modified_time"};
}
@Override
@ -100,6 +101,14 @@ public class DatasetDeploymentRecord extends AbstractRecord {
this.cluster = cluster;
}
public String getContainer() {
return container;
}
public void setContainer(String container) {
this.container = container;
}
public Boolean getEnabled() {
return enabled;
}

View File

@ -0,0 +1,47 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.schemas;
import java.util.List;
public class DatasetEntityRecord extends AbstractRecord {
String identifierType;
String identifierField;
@Override
public List<Object> fillAllFields() {
return null;
}
public DatasetEntityRecord() {
}
public String getIdentifierType() {
return identifierType;
}
public void setIdentifierType(String identifierType) {
this.identifierType = identifierType;
}
public String getIdentifierField() {
return identifierField;
}
public void setIdentifierField(String identifierField) {
this.identifierField = identifierField;
}
}

View File

@ -42,7 +42,6 @@ public class DatasetFieldSchemaRecord extends AbstractRecord {
String charType;
Integer precision;
Integer scale;
String confidentialFlags;
Boolean isRecursive;
Boolean partitioned;
Boolean indexed;
@ -55,7 +54,7 @@ public class DatasetFieldSchemaRecord extends AbstractRecord {
return new String[]{"dataset_id", "position", "parent_field_position", "field_json_path", "field_path", "parent_path",
"field_name", "label", "aliases", "type", "logical_type", "semantic_type", "abstract_type", "description",
"nullable", "default_value", "max_byte_length", "max_char_length", "char_type", "precision", "scale",
"confidential_flags", "is_recursive", "partitioned", "indexed", "namespace", "default_comment_id", "comment_ids"};
"is_recursive", "partitioned", "indexed", "namespace", "default_comment_id", "comment_ids"};
}
@Override
@ -67,8 +66,7 @@ public class DatasetFieldSchemaRecord extends AbstractRecord {
public String[] getFieldDetailColumns() {
return new String[]{"dataset_id", "sort_id", "parent_sort_id", "parent_path", "field_name", "fields_layout_id",
"field_label", "data_type", "data_size", "data_precision", "data_fraction", "is_nullable", "is_indexed",
"is_partitioned", "is_recursive", "confidential_flags", "default_value", "namespace", "default_comment_id",
"comment_ids"};
"is_partitioned", "is_recursive", "default_value", "namespace", "default_comment_id", "comment_ids"};
}
@JsonIgnore
@ -76,7 +74,7 @@ public class DatasetFieldSchemaRecord extends AbstractRecord {
return new Object[]{datasetId, position, parentFieldPosition, parentPath, fieldName, 0, label, type, maxCharLength,
precision, scale, nullable != null && nullable ? "Y" : "N", indexed != null && indexed ? "Y" : "N",
partitioned != null && partitioned ? "Y" : "N", isRecursive != null && isRecursive ? "Y" : "N",
confidentialFlags, defaultValue, namespace, defaultCommentId, commentIds};
defaultValue, namespace, defaultCommentId, commentIds};
}
public DatasetFieldSchemaRecord() {
@ -250,14 +248,6 @@ public class DatasetFieldSchemaRecord extends AbstractRecord {
this.scale = scale;
}
public String getConfidentialFlags() {
return confidentialFlags;
}
public void setConfidentialFlags(String confidentialFlags) {
this.confidentialFlags = confidentialFlags;
}
public Boolean getRecursive() {
return isRecursive;
}

View File

@ -0,0 +1,56 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.schemas;
import java.util.List;
public class DatasetKeySchemaRecord extends AbstractRecord {
String keyType;
String format;
String text;
@Override
public List<Object> fillAllFields() {
return null;
}
public DatasetKeySchemaRecord() {
}
public String getKeyType() {
return keyType;
}
public void setKeyType(String keyType) {
this.keyType = keyType;
}
public String getFormat() {
return format;
}
public void setFormat(String format) {
this.format = format;
}
public String getText() {
return text;
}
public void setText(String text) {
this.text = text;
}
}

View File

@ -0,0 +1,66 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.schemas;
import java.util.List;
import java.util.Map;
public class DatasetOriginalSchemaRecord extends AbstractRecord {
String format;
String name;
String text;
Map<String, String> checksum;
@Override
public List<Object> fillAllFields() {
return null;
}
public DatasetOriginalSchemaRecord() {
}
public String getFormat() {
return format;
}
public void setFormat(String format) {
this.format = format;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getText() {
return text;
}
public void setText(String text) {
this.text = text;
}
public Map<String, String> getChecksum() {
return checksum;
}
public void setChecksum(Map<String, String> checksum) {
this.checksum = checksum;
}
}

View File

@ -24,8 +24,7 @@ public class DatasetOwnerRecord extends AbstractRecord {
String datasetUrn;
Integer appId;
String ownerCategory;
String ownerSubCategory;
String owner;
String ownerUrn;
String ownerType;
String isGroup;
String isActive;
@ -41,7 +40,7 @@ public class DatasetOwnerRecord extends AbstractRecord {
@Override
public String[] getDbColumnNames() {
return new String[]{"dataset_id", "dataset_urn", "app_id", "owner_type", "owner_sub_type", "owner_id",
return new String[]{"dataset_id", "dataset_urn", "app_id", "owner_type", "owner_id",
"owner_id_type", "is_group", "is_active", "sort_id", "namespace", "owner_source", "db_ids",
"source_time", "created_time", "modified_time", "confirmed_by", "confirmed_on"};
}
@ -50,7 +49,7 @@ public class DatasetOwnerRecord extends AbstractRecord {
public List<Object> fillAllFields() {
List<Object> allFields = new ArrayList<>();
allFields.add(datasetUrn);
allFields.add(owner);
allFields.add(ownerUrn);
allFields.add(sortId);
allFields.add(namespace);
allFields.add(dbIds);
@ -60,13 +59,13 @@ public class DatasetOwnerRecord extends AbstractRecord {
@JsonIgnore
public String[] getDbColumnForUnmatchedOwner() {
return new String[]{"dataset_urn", "app_id", "owner_type", "owner_sub_type", "owner_id", "owner_id_type",
return new String[]{"dataset_urn", "app_id", "owner_type", "owner_id", "owner_id_type",
"is_group", "is_active", "sort_id", "namespace", "owner_source", "db_name", "db_id", "source_time"};
}
@JsonIgnore
public Object[] getValuesForUnmatchedOwner() {
return new Object[]{datasetUrn, appId, ownerCategory, ownerSubCategory, owner, ownerType, isGroup,
return new Object[]{datasetUrn, appId, ownerCategory, ownerUrn, ownerType, isGroup,
isActive, sortId, namespace, ownerSource, "N/A", dbIds, sourceTime};
}
@ -76,7 +75,7 @@ public class DatasetOwnerRecord extends AbstractRecord {
public DatasetOwnerRecord(String datasetUrn, String ownerId, Integer sortId, String namespace, String dbName,
Long sourceTime) {
this.datasetUrn = datasetUrn;
this.owner = ownerId;
this.ownerUrn = ownerId;
this.sortId = sortId;
this.namespace = namespace;
this.dbIds = dbName;
@ -115,20 +114,12 @@ public class DatasetOwnerRecord extends AbstractRecord {
this.ownerCategory = ownerCategory;
}
public String getOwnerSubCategory() {
return ownerSubCategory;
public String getOwnerUrn() {
return ownerUrn;
}
public void setOwnerSubCategory(String ownerSubCategory) {
this.ownerSubCategory = ownerSubCategory;
}
public String getOwner() {
return owner;
}
public void setOwner(String owner) {
this.owner = owner;
public void setOwnerUrn(String ownerUrn) {
this.ownerUrn = ownerUrn;
}
public String getOwnerType() {

View File

@ -23,7 +23,7 @@ public class DatasetPartitionKeyRecord {
String partitionLevel;
String partitionType;
String timeFormat;
String granularity;
String timeGranularity;
List<String> fieldNames;
List<String> partitionValues;
Integer numberOfHashBuckets;
@ -55,12 +55,12 @@ public class DatasetPartitionKeyRecord {
this.timeFormat = timeFormat;
}
public String getGranularity() {
return granularity;
public String getTimeGranularity() {
return timeGranularity;
}
public void setGranularity(String granularity) {
this.granularity = granularity;
public void setTimeGranularity(String timeGranularity) {
this.timeGranularity = timeGranularity;
}
public List<String> getFieldNames() {

View File

@ -20,7 +20,7 @@ public class DatasetPartitionRecord extends AbstractRecord {
Integer datasetId;
String datasetUrn;
String totalPartitionLevel;
Integer totalPartitionLevel;
String partitionSpecText;
Boolean hasTimePartition;
Boolean hasHashPartition;
@ -58,11 +58,11 @@ public class DatasetPartitionRecord extends AbstractRecord {
this.datasetUrn = datasetUrn;
}
public String getTotalPartitionLevel() {
public Integer getTotalPartitionLevel() {
return totalPartitionLevel;
}
public void setTotalPartitionLevel(String totalPartitionLevel) {
public void setTotalPartitionLevel(Integer totalPartitionLevel) {
this.totalPartitionLevel = totalPartitionLevel;
}

View File

@ -27,12 +27,8 @@ public class DatasetSchemaInfoRecord extends AbstractRecord {
String version;
String name;
String description;
String format;
String originalSchema;
Map<String, String> originalSchemaChecksum;
String keySchemaType;
String keySchemaFormat;
String keySchema;
DatasetOriginalSchemaRecord originalSchema;
DatasetKeySchemaRecord keySchema;
Boolean isFieldNameCaseSensitive;
List<DatasetFieldSchemaRecord> fieldSchema;
List<DatasetFieldPathRecord> changeDataCaptureFields;
@ -42,9 +38,8 @@ public class DatasetSchemaInfoRecord extends AbstractRecord {
@Override
public String[] getDbColumnNames() {
return new String[]{"dataset_id", "dataset_urn", "is_latest_revision", "create_time", "revision", "version", "name",
"description", "format", "original_schema", "original_schema_checksum", "key_schema_type", "key_schema_format",
"key_schema", "is_field_name_case_sensitive", "field_schema", "change_data_capture_fields", "audit_fields",
"modified_time"};
"description", "original_schema", "key_schema", "is_field_name_case_sensitive", "field_schema",
"change_data_capture_fields", "audit_fields", "modified_time"};
}
@Override
@ -119,51 +114,19 @@ public class DatasetSchemaInfoRecord extends AbstractRecord {
this.description = description;
}
public String getFormat() {
return format;
}
public void setFormat(String format) {
this.format = format;
}
public String getOriginalSchema() {
public DatasetOriginalSchemaRecord getOriginalSchema() {
return originalSchema;
}
public void setOriginalSchema(String originalSchema) {
public void setOriginalSchema(DatasetOriginalSchemaRecord originalSchema) {
this.originalSchema = originalSchema;
}
public Map<String, String> getOriginalSchemaChecksum() {
return originalSchemaChecksum;
}
public void setOriginalSchemaChecksum(Map<String, String> originalSchemaChecksum) {
this.originalSchemaChecksum = originalSchemaChecksum;
}
public String getKeySchemaType() {
return keySchemaType;
}
public void setKeySchemaType(String keySchemaType) {
this.keySchemaType = keySchemaType;
}
public String getKeySchemaFormat() {
return keySchemaFormat;
}
public void setKeySchemaFormat(String keySchemaFormat) {
this.keySchemaFormat = keySchemaFormat;
}
public String getKeySchema() {
public DatasetKeySchemaRecord getKeySchema() {
return keySchema;
}
public void setKeySchema(String keySchema) {
public void setKeySchema(DatasetKeySchemaRecord keySchema) {
this.keySchema = keySchema;
}

View File

@ -21,18 +21,18 @@ public class DatasetSecurityRecord extends AbstractRecord {
Integer datasetId;
String datasetUrn;
Map<String, Integer> classification;
Map<String, List<String>> classification;
String recordOwnerType;
List<String> recordOwner;
String complianceType;
List<DatasetEntityRecord> compliancePurgeEntities;
DatasetRetentionRecord retentionPolicy;
DatasetGeographicAffinityRecord geographicAffinity;
Long modifiedTime;
@Override
public String[] getDbColumnNames() {
return new String[]{"dataset_id", "dataset_urn", "classification", "record_owner_type", "record_owner",
"compliance_type", "retention_policy", "geographic_affinity", "modified_time"};
return new String[]{"dataset_id", "dataset_urn", "classification", "record_owner_type", "compliance_purge_type",
"compliance_purge_entities", "retention_policy", "geographic_affinity", "modified_time"};
}
@Override
@ -59,11 +59,11 @@ public class DatasetSecurityRecord extends AbstractRecord {
this.datasetUrn = datasetUrn;
}
public Map<String, Integer> getClassification() {
public Map<String, List<String>> getClassification() {
return classification;
}
public void setClassification(Map<String, Integer> classification) {
public void setClassification(Map<String, List<String>> classification) {
this.classification = classification;
}
@ -75,12 +75,12 @@ public class DatasetSecurityRecord extends AbstractRecord {
this.recordOwnerType = recordOwnerType;
}
public List<String> getRecordOwner() {
return recordOwner;
public List<DatasetEntityRecord> getCompliancePurgeEntities() {
return compliancePurgeEntities;
}
public void setRecordOwner(List<String> recordOwner) {
this.recordOwner = recordOwner;
public void setCompliancePurgeEntities(List<DatasetEntityRecord> compliancePurgeEntities) {
this.compliancePurgeEntities = compliancePurgeEntities;
}
public String getComplianceType() {

View File

@ -18,7 +18,7 @@ package wherehows.common.schemas;
*/
public class OozieFlowScheduleRecord extends AzkabanFlowScheduleRecord {
public OozieFlowScheduleRecord(Integer appId, String flowPath, String frequency, Integer interval,
Long effectiveStartTime, Long effectiveEndTime, String refId, Long whExecId) {
super(appId, flowPath, frequency, interval, effectiveStartTime, effectiveEndTime, refId, whExecId);
String cronExpression, Long effectiveStartTime, Long effectiveEndTime, String refId, Long whExecId) {
super(appId, flowPath, frequency, interval, cronExpression, effectiveStartTime, effectiveEndTime, refId, whExecId);
}
}

View File

@ -0,0 +1,53 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.schemas;
import java.util.ArrayList;
import java.util.List;
public class SCMOwnerRecord extends AbstractRecord {
String scmUrl;
String databaseName;
String databaseType;
String appName;
String filePath;
String committers;
String scmType;
public SCMOwnerRecord(String scmUrl, String databaseName, String databaseType, String appName, String filePath,
String committers, String scmType) {
this.scmUrl = scmUrl;
this.databaseName = databaseName;
this.databaseType = databaseType;
this.appName = appName;
this.filePath = filePath;
this.committers = committers;
this.scmType = scmType;
}
@Override
public List<Object> fillAllFields() {
List<Object> allFields = new ArrayList<>();
allFields.add(scmUrl);
allFields.add(databaseName);
allFields.add(databaseType);
allFields.add(appName);
allFields.add(filePath);
allFields.add(committers);
allFields.add(scmType);
return allFields;
}
}

View File

@ -15,7 +15,6 @@ package wherehows.common.utils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@ -51,6 +50,11 @@ public class StringUtil {
}
}
/**
* Convert objects of type Collection, Map, Array and AbstractRecord to Json string
* @param obj
* @return
*/
public static Object objectToJsonString(Object obj) {
if (obj instanceof Collection || obj instanceof Map || obj instanceof Object[] || obj instanceof Record) {
try {
@ -62,10 +66,6 @@ public class StringUtil {
return obj;
}
public static String toStr(Object obj) {
return obj != null ? obj.toString() : null;
}
public static Long toLong(Object obj) {
return obj != null ? Long.valueOf(obj.toString()) : null;
}
@ -91,4 +91,40 @@ public class StringUtil {
}
return metadata;
}
/**
* Parse Long value from a String, if null or exception, return 0
* @param text String
* @return long
*/
public static long parseLong(String text) {
try {
return Long.parseLong(text);
} catch (NumberFormatException e) {
return 0;
}
}
/**
* Parse Integer value from a String, if null or exception, return 0
* @param text String
* @return int
*/
public static int parseInteger(String text) {
try {
return Integer.parseInt(text);
} catch (NumberFormatException e) {
return 0;
}
}
/**
* Object to string, replace null/"null" with replacement string
* @param obj Object
* @return String
*/
public static String toStringReplaceNull(Object obj, String replacement) {
String string = String.valueOf(obj);
return string == null || string.equals("null") ? replacement : string;
}
}
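The new helpers give non-throwing numeric parsing and a null-safe string conversion. A short usage sketch, assuming the wherehows-common module is on the classpath:

import wherehows.common.utils.StringUtil;

public class StringUtilExample {
  public static void main(String[] args) {
    // Non-throwing parses: malformed or null input falls back to 0.
    long size = StringUtil.parseLong("12345");        // 12345
    long bad  = StringUtil.parseLong("not-a-number"); // 0
    int level = StringUtil.parseInteger(null);        // 0

    // Null (or the literal string "null") is replaced with the supplied default.
    String owner = StringUtil.toStringReplaceNull(null, "unknown");   // "unknown"
    String name  = StringUtil.toStringReplaceNull("espresso", "n/a"); // "espresso"

    System.out.println(size + " " + bad + " " + level + " " + owner + " " + name);
  }
}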