Track the GobblinTrackingEvent_audit topic to get owner information

jbai 2016-09-29 15:01:32 -07:00
parent 137d48a9b0
commit a11e4908dc
8 changed files with 241 additions and 35 deletions

View File

@ -698,6 +698,13 @@ public class DatasetInfoDao {
final JsonNode idNode = root.path("datasetId");
final JsonNode urnNode = root.path("urn");
final JsonNode owners = root.path("owners");
final JsonNode ownerSourceNode = root.path("source");
String ownerSource = null;
if (ownerSourceNode != null && !ownerSourceNode.isMissingNode()) {
  ownerSource = ownerSourceNode.asText();
}
if ((idNode.isMissingNode() && urnNode.isMissingNode()) || owners.isMissingNode() || !owners.isArray()) {
throw new IllegalArgumentException(
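For reference, a minimal sketch of the owner payload this hunk parses; the field names come from the paths read above, while the urn and the owner entry are purely illustrative:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class OwnerPayloadSketch {
  public static void main(String[] args) throws Exception {
    // illustrative payload; "source" is the optional field this change starts reading
    String json = "{\"datasetId\": 0, \"urn\": \"hdfs:///data/tracking/PageViewEvent\","
        + " \"source\": \"IDPC\","
        + " \"owners\": [{\"owner\": \"jsmith\", \"ownerType\": \"Producer\", \"namespace\": \"urn:li:corpuser\"}]}";
    JsonNode root = new ObjectMapper().readTree(json);
    JsonNode ownerSourceNode = root.path("source");
    String ownerSource = ownerSourceNode.isMissingNode() ? null : ownerSourceNode.asText();
    System.out.println(ownerSource);  // prints IDPC
  }
}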
@ -719,7 +726,6 @@ public class DatasetInfoDao {
ObjectMapper om = new ObjectMapper();
List<DatasetOwnerRecord> ownerList = new ArrayList<>();
int sortId = 0;
for (final JsonNode owner : owners) {
DatasetOwnerRecord record = om.convertValue(owner, DatasetOwnerRecord.class);
record.setDatasetId(datasetId);
@ -747,8 +753,6 @@ public class DatasetInfoDao {
record.setIsActive(isActive);
String ownerTypeString = record.getOwnerType();
record.setIsGroup(ownerTypeString != null && ownerTypeString.equalsIgnoreCase("group") ? "Y" : "N");
sortId++;
record.setSortId(sortId);
if (datasetId == 0 || appId == 0) {
String sql = PreparedStatementUtil.prepareInsertTemplateWithColumn(DATASET_OWNER_UNMATCHED_TABLE,
@ -759,29 +763,141 @@ public class DatasetInfoDao {
}
}
List<DatasetOwnerRecord> oldOwnerList = getDatasetOwnerByDatasetUrn(urn);
// merge old owner info into updated owner list
for (DatasetOwnerRecord rec : ownerList) {
  for (DatasetOwnerRecord old : oldOwnerList) {
    if (rec.getDatasetId().equals(old.getDatasetId()) && rec.getOwner().equals(old.getOwner())
        && rec.getAppId().equals(old.getAppId())) {
      rec.setDbIds(old.getDbIds());
      rec.setCreatedTime(StringUtil.toLong(old.getCreatedTime()));
      // take the higher priority owner category
      rec.setOwnerCategory(OwnerType.chooseOwnerType(rec.getOwnerCategory(), old.getOwnerCategory()));
      // merge owner source as comma separated list
      rec.setOwnerSource(mergeOwnerSource(rec.getOwnerSource(), old.getOwnerSource()));
      // remove from owner source?

mergeDatasetOwners(ownerList, datasetId, urn, ownerSource);
}

public static void updateKafkaDatasetOwner(String datasetUrn, String owners, String ownerSource, Long sourceUnixTime)
    throws Exception {
  if (datasetUrn == null) {
    return;
  }
  Integer datasetId = 0;
  try {
    datasetId = Integer.parseInt(DatasetDao.getDatasetByUrn(datasetUrn).get("id").toString());
  } catch (Exception e) {
    Logger.error("Exception in updateKafkaDatasetOwner: " + e.getMessage());
  }
  if (datasetId == 0) {
    return;
  }
  List<DatasetOwnerRecord> ownerList = new ArrayList<>();
  if (owners != null) {
    for (String owner : owners.split(",")) {
      String ownerName = null;
      String namespace = null;
      String ownerIdType = null;
      String isGroup = "N";
      int lastIndex = owner.lastIndexOf(':');
      if (lastIndex != -1) {
        ownerName = owner.substring(lastIndex + 1);
        namespace = owner.substring(0, lastIndex);
        // owners in the urn:li:griduser namespace are group accounts
        if (namespace.equalsIgnoreCase("urn:li:griduser")) {
          isGroup = "Y";
          ownerIdType = "GROUP";
        } else {
          ownerIdType = "PERSON";
        }
      }
      DatasetOwnerRecord record = new DatasetOwnerRecord();
      record.setDatasetId(datasetId);
      record.setDatasetUrn(datasetUrn);
      record.setOwnerType("Producer");
      record.setOwner(ownerName);
      // note: this second call overwrites the "Producer" value set two lines above;
      // the GROUP/PERSON distinction most likely belongs in the owner_id_type column instead
      record.setOwnerType(ownerIdType);
      record.setIsGroup(isGroup);
      record.setIsActive("Y");
      record.setNamespace(namespace);
      record.setOwnerSource(ownerSource);
      record.setSourceTime(sourceUnixTime);
      record.setCreatedTime(sourceUnixTime);
      record.setModifiedTime(System.currentTimeMillis() / 1000);
      ownerList.add(record);
    }
  }
  mergeDatasetOwners(ownerList, datasetId, datasetUrn, ownerSource);
}
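To make the owner-string handling above concrete, a small standalone sketch; the OwnerURNs value is made up, and only the urn:li:griduser namespace marks a group, mirroring the check in updateKafkaDatasetOwner:

public class OwnerUrnParseSketch {
  public static void main(String[] args) {
    String owners = "urn:li:corpuser:jsmith,urn:li:griduser:data-svc";  // illustrative OwnerURNs value
    for (String owner : owners.split(",")) {
      int lastIndex = owner.lastIndexOf(':');
      String namespace = owner.substring(0, lastIndex);   // e.g. urn:li:corpuser
      String ownerName = owner.substring(lastIndex + 1);  // e.g. jsmith
      boolean isGroup = namespace.equalsIgnoreCase("urn:li:griduser");
      System.out.println(ownerName + " " + namespace + " group=" + isGroup);
    }
  }
}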
public static void mergeDatasetOwners(List<DatasetOwnerRecord> newOwnerList, Integer datasetId, String datasetUrn,
    String source) throws Exception {
  List<DatasetOwnerRecord> oldOwnerList = getDatasetOwnerByDatasetUrn(datasetUrn);
  Integer sortId = 0;
  Map<String, DatasetOwnerRecord> uniqueRecords = new HashMap<>();
  List<DatasetOwnerRecord> combinedList = new ArrayList<>();
  if (newOwnerList != null) {
    for (DatasetOwnerRecord owner : newOwnerList) {
      owner.setSortId(sortId++);
      uniqueRecords.put(owner.getOwner(), owner);
      combinedList.add(owner);
    }
  }
  if (oldOwnerList != null) {
    for (DatasetOwnerRecord owner : oldOwnerList) {
      DatasetOwnerRecord exist = uniqueRecords.get(owner.getOwner());
      if (exist != null) {
        // owner is also in the new list: carry over db ids, created time and confirmation info
        exist.setDbIds(owner.getDbIds());
        exist.setCreatedTime(StringUtil.toLong(owner.getCreatedTime()));
        // take the higher priority owner category
        exist.setOwnerCategory(OwnerType.chooseOwnerType(exist.getOwnerCategory(), owner.getOwnerCategory()));
        // merge owner source as comma separated list
        exist.setOwnerSource(mergeOwnerSource(exist.getOwnerSource(), owner.getOwnerSource()));
        exist.setConfirmedBy(owner.getConfirmedBy());
        exist.setConfirmedOn(owner.getConfirmedOn());
      } else if (source == null || !source.equalsIgnoreCase(owner.getOwnerSource())) {
        // keep old owners that came from a different source; old owners from the same source
        // that are no longer reported are dropped
        owner.setSortId(sortId++);
        uniqueRecords.put(owner.getOwner(), owner);
        combinedList.add(owner);
      }
    }
  }
  // remove old info then insert new info
  OWNER_WRITER.execute(DELETE_DATASET_OWNER_BY_DATASET_ID, new Object[]{datasetId});
  for (DatasetOwnerRecord record : ownerList) {
  for (DatasetOwnerRecord record : combinedList) {
    OWNER_WRITER.append(record);
  }
  OWNER_WRITER.insert();
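The merge rule implemented above can be illustrated with a rough sketch using plain strings in place of DatasetOwnerRecord (names and sources are invented): new owners always survive, matching old entries only contribute their created/confirmed info, and old owners are kept only if they came from a different source than the incoming one.

import java.util.*;

public class OwnerMergeSketch {
  public static void main(String[] args) {
    List<String> newOwners = Arrays.asList("jsmith", "data-svc");  // from the Kafka audit event (source IDPC)
    Map<String, String> oldOwners = new LinkedHashMap<>();         // owner -> source already in the DB
    oldOwners.put("jsmith", "IDPC");  // also in the new list: merged, keeps its created/confirmed info
    oldOwners.put("adoe", "UI");      // different source: kept
    oldOwners.put("ghost", "IDPC");   // same source but no longer reported: dropped
    String source = "IDPC";

    LinkedHashSet<String> combined = new LinkedHashSet<>(newOwners);
    for (Map.Entry<String, String> old : oldOwners.entrySet()) {
      if (!combined.contains(old.getKey()) && !source.equalsIgnoreCase(old.getValue())) {
        combined.add(old.getKey());
      }
    }
    System.out.println(combined);  // [jsmith, data-svc, adoe]
  }
}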

View File

@ -0,0 +1,68 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package utils;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import models.daos.DatasetInfoDao;
import org.apache.avro.generic.GenericData;
import wherehows.common.schemas.Record;
import wherehows.common.utils.StringUtil;
public class GobblinTrackingAuditProcessor {

  private static final String DALI_LIMITED_RETENTION_AUDITOR = "DaliLimitedRetentionAuditor";
  private static final String DALI_AUTOPURGED_AUDITOR = "DaliAutoPurgeAuditor";
  private static final String DS_IGNORE_IDPC_AUDITOR = "DsIgnoreIDPCAuditor";
  private static final String DATASET_URN_PREFIX = "hdfs://";
  private static final String DATASET_OWNER_SOURCE = "IDPC";

  /**
   * Process a Gobblin tracking event audit record
   * @param record
   * @param topic
   * @return always null; the event only triggers a dataset owner update
   * @throws Exception
   */
  public Record process(GenericData.Record record, String topic) throws Exception {
    if (record != null) {
      String name = (String) record.get("name");
      // only handle "DaliLimitedRetentionAuditor", "DaliAutoPurgeAuditor" and "DsIgnoreIDPCAuditor"
      if (name.equals(DALI_LIMITED_RETENTION_AUDITOR)
          || name.equals(DALI_AUTOPURGED_AUDITOR)
          || name.equals(DS_IGNORE_IDPC_AUDITOR)) {
        Long timestamp = (Long) record.get("timestamp");
        Map<String, String> metadata = StringUtil.convertObjectMapToStringMap(record.get("metadata"));
        String hasError = metadata.get("HasError");
        // null-safe: proceed unless the event explicitly reports an error
        if (!"true".equalsIgnoreCase(hasError)) {
          String datasetUrn = metadata.get("DatasetPath");
          String ownerUrns = metadata.get("OwnerURNs");
          DatasetInfoDao.updateKafkaDatasetOwner(DATASET_URN_PREFIX + datasetUrn, ownerUrns,
              DATASET_OWNER_SOURCE, timestamp);
        }
      }
    }
    return null;
  }
}
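A rough sketch of what the processor sees once the Avro metadata map is converted to strings; the event name and metadata keys match the ones read above, while the values are invented for illustration:

import java.util.HashMap;
import java.util.Map;

public class AuditEventSketch {
  public static void main(String[] args) {
    String name = "DaliLimitedRetentionAuditor";  // one of the three handled event names
    Map<String, String> metadata = new HashMap<>();
    metadata.put("HasError", "false");
    metadata.put("DatasetPath", "/data/tracking/PageViewEvent");
    metadata.put("OwnerURNs", "urn:li:corpuser:jsmith,urn:li:griduser:data-svc");

    if (!"true".equalsIgnoreCase(metadata.get("HasError"))) {
      // the processor would call DatasetInfoDao.updateKafkaDatasetOwner with
      // "hdfs://" + DatasetPath, OwnerURNs, "IDPC" and the event timestamp
      System.out.println("hdfs://" + metadata.get("DatasetPath"));
    }
  }
}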

View File

@ -22,6 +22,7 @@ import wherehows.common.schemas.ClusterInfo;
import wherehows.common.schemas.GobblinTrackingCompactionRecord;
import wherehows.common.schemas.Record;
import wherehows.common.utils.ClusterUtil;
import wherehows.common.utils.StringUtil;
/**
@ -54,7 +55,7 @@ public class GobblinTrackingCompactionProcessor extends KafkaConsumerProcessor {
if (name.equals("CompactionCompleted") || name.equals("CompactionRecordCounts")) {
// logger.info("Processing Gobblin tracking event record: " + name);
final long timestamp = (long) record.get("timestamp");
final Map<String, String> metadata = convertObjectMapToStringMap(record.get("metadata"));
final Map<String, String> metadata = StringUtil.convertObjectMapToStringMap(record.get("metadata"));
final String jobContext = "Gobblin:" + name;
final String cluster = ClusterUtil.matchClusterCode(metadata.get("clusterIdentifier"));

View File

@ -20,6 +20,7 @@ import org.apache.avro.generic.GenericData;
import wherehows.common.schemas.GobblinTrackingDistcpNgRecord;
import wherehows.common.schemas.Record;
import wherehows.common.utils.ClusterUtil;
import wherehows.common.utils.StringUtil;
/**
@ -51,7 +52,7 @@ public class GobblinTrackingDistcpNgProcessor extends KafkaConsumerProcessor {
if (name.equals("DatasetPublished")) { // || name.equals("FilePublished")) {
// logger.info("Processing Gobblin tracking event record: " + name);
final long timestamp = (long) record.get("timestamp");
final Map<String, String> metadata = convertObjectMapToStringMap(record.get("metadata"));
final Map<String, String> metadata = StringUtil.convertObjectMapToStringMap(record.get("metadata"));
final String jobContext = "DistcpNG:" + name;
final String cluster = ClusterUtil.matchClusterCode(metadata.get("clusterIdentifier"));

View File

@ -20,6 +20,7 @@ import org.apache.avro.generic.GenericData;
import wherehows.common.schemas.GobblinTrackingLumosRecord;
import wherehows.common.schemas.Record;
import wherehows.common.utils.ClusterUtil;
import wherehows.common.utils.StringUtil;
public class GobblinTrackingLumosProcessor extends KafkaConsumerProcessor {
@ -57,7 +58,7 @@ public class GobblinTrackingLumosProcessor extends KafkaConsumerProcessor {
// only handle "DeltaPublished" and "SnapshotPublished"
if (name.equals("DeltaPublished") || name.equals("SnapshotPublished")) {
final long timestamp = (long) record.get("timestamp");
final Map<String, String> metadata = convertObjectMapToStringMap(record.get("metadata"));
final Map<String, String> metadata = StringUtil.convertObjectMapToStringMap(record.get("metadata"));
// logger.info("Processing Gobblin tracking event record: " + name + ", timestamp: " + timestamp);
final String jobContext = "Lumos:" + name;

View File

@ -64,18 +64,4 @@ public abstract class KafkaConsumerProcessor {
return 0;
}
}
/**
* Convert Object with type Map<Object, Object> to Map<String, String>
* @param obj Object with type Map<Object, Object>
* @return Map <String, String>
*/
protected Map<String, String> convertObjectMapToStringMap(Object obj) {
final Map<Object, Object> map = (Map<Object, Object>) obj;
final Map<String, String> metadata = new HashMap<>();
for (Map.Entry<Object, Object> entry : map.entrySet()) {
metadata.put(String.valueOf(entry.getKey()), String.valueOf(entry.getValue()));
}
return metadata;
}
}

View File

@ -36,12 +36,14 @@ public class DatasetOwnerRecord extends AbstractRecord {
Long sourceTime;
Long createdTime;
Long modifiedTime;
String confirmedBy;
Long confirmedOn;
@Override
public String[] getDbColumnNames() {
return new String[]{"dataset_id", "dataset_urn", "app_id", "owner_type", "owner_sub_type", "owner_id",
"owner_id_type", "is_group", "is_active", "sort_id", "namespace", "owner_source", "db_ids",
"source_time", "created_time", "modified_time"};
"source_time", "created_time", "modified_time", "confirmed_by", "confirmed_on"};
}
@Override
@ -208,4 +210,20 @@ public class DatasetOwnerRecord extends AbstractRecord {
public void setModifiedTime(Long modifiedTime) {
this.modifiedTime = modifiedTime;
}
public Long getConfirmedOn() {
return confirmedOn;
}
public void setConfirmedOn(Long confirmedOn) {
this.confirmedOn = confirmedOn;
}
public String getConfirmedBy() {
return confirmedBy;
}
public void setConfirmedBy(String confirmedBy) {
this.confirmedBy = confirmedBy;
}
}

View File

@ -17,6 +17,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import wherehows.common.schemas.Record;
@ -76,4 +77,18 @@ public class StringUtil {
public static Boolean toBoolean(Object obj) {
return obj != null ? Boolean.valueOf(obj.toString()) : null;
}
/**
* Convert Object with type Map<Object, Object> to Map<String, String>
* @param obj Object with type Map<Object, Object>
* @return Map <String, String>
*/
public static Map<String, String> convertObjectMapToStringMap(Object obj) {
final Map<Object, Object> map = (Map<Object, Object>) obj;
final Map<String, String> metadata = new HashMap<>();
for (Map.Entry<Object, Object> entry : map.entrySet()) {
metadata.put(String.valueOf(entry.getKey()), String.valueOf(entry.getValue()));
}
return metadata;
}
}
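A small usage sketch for the relocated helper; the Avro metadata map is simulated with plain Objects, and it assumes wherehows.common.utils.StringUtil is on the classpath:

import java.util.HashMap;
import java.util.Map;
import wherehows.common.utils.StringUtil;

public class StringMapSketch {
  public static void main(String[] args) {
    Map<Object, Object> raw = new HashMap<>();
    raw.put("clusterIdentifier", 42);     // non-String keys and values are converted with String.valueOf
    raw.put("HasError", Boolean.FALSE);
    Map<String, String> metadata = StringUtil.convertObjectMapToStringMap(raw);
    System.out.println(metadata.get("HasError"));  // prints "false"
  }
}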