diff --git a/backend-service/app/models/daos/DatasetInfoDao.java b/backend-service/app/models/daos/DatasetInfoDao.java
index 674c0f0884..89a1e66828 100644
--- a/backend-service/app/models/daos/DatasetInfoDao.java
+++ b/backend-service/app/models/daos/DatasetInfoDao.java
@@ -698,6 +698,13 @@ public class DatasetInfoDao {
     final JsonNode idNode = root.path("datasetId");
     final JsonNode urnNode = root.path("urn");
     final JsonNode owners = root.path("owners");
+    final JsonNode ownerSourceNode = root.path("source");
+    String ownerSource = null;
+
+    if (ownerSourceNode != null && !ownerSourceNode.isMissingNode()) {
+      ownerSource = ownerSourceNode.asText();
+    }
+
     if ((idNode.isMissingNode() && urnNode.isMissingNode()) || owners.isMissingNode() || !owners.isArray()) {
       throw new IllegalArgumentException(
@@ -719,7 +726,6 @@ public class DatasetInfoDao {
 
     ObjectMapper om = new ObjectMapper();
     List<DatasetOwnerRecord> ownerList = new ArrayList<>();
-    int sortId = 0;
     for (final JsonNode owner : owners) {
       DatasetOwnerRecord record = om.convertValue(owner, DatasetOwnerRecord.class);
       record.setDatasetId(datasetId);
@@ -747,8 +753,6 @@ public class DatasetInfoDao {
       record.setIsActive(isActive);
       String ownerTypeString = record.getOwnerType();
       record.setIsGroup(ownerTypeString != null && ownerTypeString.equalsIgnoreCase("group") ? "Y" : "N");
-      sortId++;
-      record.setSortId(sortId);
 
       if (datasetId == 0 || appId == 0) {
         String sql = PreparedStatementUtil.prepareInsertTemplateWithColumn(DATASET_OWNER_UNMATCHED_TABLE,
@@ -759,29 +763,141 @@ public class DatasetInfoDao {
       }
     }
 
-    List<DatasetOwnerRecord> oldOwnerList = getDatasetOwnerByDatasetUrn(urn);
-    // merge old owner info into updated owner list
-    for (DatasetOwnerRecord rec : ownerList) {
-      for (DatasetOwnerRecord old : oldOwnerList) {
-        if (rec.getDatasetId().equals(old.getDatasetId()) && rec.getOwner()
-            .equals(old.getOwner()) && rec.getAppId().equals(old.getAppId())) {
-          rec.setDbIds(old.getDbIds());
-          rec.setCreatedTime(StringUtil.toLong(old.getCreatedTime()));
-
-          // take the higher priority owner category
-          rec.setOwnerCategory(OwnerType.chooseOwnerType(rec.getOwnerCategory(), old.getOwnerCategory()));
-
-          // merge owner source as comma separated list
-          rec.setOwnerSource(mergeOwnerSource(rec.getOwnerSource(), old.getOwnerSource()));
-
-          // remove from owner source?
-        }
-      }
-    }
+    mergeDatasetOwners(ownerList, datasetId, urn, ownerSource);
+  }
+
+  public static void updateKafkaDatasetOwner(String datasetUrn, String owners, String ownerSource,
+      Long sourceUnixTime) throws Exception {
+    if (datasetUrn == null) {
+      return;
+    }
+
+    Integer datasetId = 0;
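+    // Resolve the urn to a WhereHows dataset id; events for datasets that are
+    // not registered yet are dropped (datasetId stays 0 and we return below).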
+    try {
+      datasetId = Integer.parseInt(DatasetDao.getDatasetByUrn(datasetUrn).get("id").toString());
+    } catch (Exception e) {
+      Logger.error("Exception in updateKafkaDatasetOwner: " + e.getMessage());
+    }
+    if (datasetId == 0) {
+      return;
+    }
+
+    List<DatasetOwnerRecord> ownerList = new ArrayList<>();
+    if (owners != null) {
+      for (String owner : owners.split(",")) {
+        if (owner == null || owner.isEmpty()) {
+          continue;
+        }
+        String ownerName = null;
+        String namespace = null;
+        String ownerIdType = null;
+        String isGroup = "N";
+        int lastIndex = owner.lastIndexOf(':');
+        if (lastIndex != -1) {
+          ownerName = owner.substring(lastIndex + 1);
+          namespace = owner.substring(0, lastIndex);
+          if (namespace.equalsIgnoreCase("urn:li:griduser")) {
+            isGroup = "Y";
+            ownerIdType = "GROUP";
+          } else {
+            ownerIdType = "PERSON";
+          }
+        }
+        DatasetOwnerRecord record = new DatasetOwnerRecord();
+        record.setDatasetId(datasetId);
+        record.setDatasetUrn(datasetUrn);
+        record.setOwnerType("Producer");
+        record.setOwner(ownerName);
+        record.setOwnerIdType(ownerIdType);
+        record.setIsGroup(isGroup);
+        record.setIsActive("Y");
+        record.setNamespace(namespace);
+        record.setOwnerSource(ownerSource);
+        record.setSourceTime(sourceUnixTime);
+        record.setCreatedTime(sourceUnixTime);
+        record.setModifiedTime(System.currentTimeMillis() / 1000);
+        ownerList.add(record);
+      }
+    }
+
+    mergeDatasetOwners(ownerList, datasetId, datasetUrn, ownerSource);
+  }
+
+  public static void mergeDatasetOwners(List<DatasetOwnerRecord> newOwnerList, Integer datasetId,
+      String datasetUrn, String source) throws Exception {
+    List<DatasetOwnerRecord> oldOwnerList = getDatasetOwnerByDatasetUrn(datasetUrn);
+    int sortId = 0;
+    Map<String, DatasetOwnerRecord> uniqueRecords = new HashMap<>();
+    List<DatasetOwnerRecord> combinedList = new ArrayList<>();
+    if (newOwnerList != null) {
+      for (DatasetOwnerRecord owner : newOwnerList) {
+        owner.setSortId(sortId++);
+        uniqueRecords.put(owner.getOwner(), owner);
+        combinedList.add(owner);
+      }
+    }
+
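+    // Fold in owners already stored for this dataset: keep their db ids, created
+    // time and confirmation info, take the higher-priority owner category, and
+    // union the owner sources. Owners that exist only in the old list are kept
+    // unless they came from the same source that this update is refreshing.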
+    if (oldOwnerList != null) {
+      for (DatasetOwnerRecord owner : oldOwnerList) {
+        DatasetOwnerRecord exist = uniqueRecords.get(owner.getOwner());
+        if (exist != null) {
+          exist.setDbIds(owner.getDbIds());
+          exist.setCreatedTime(StringUtil.toLong(owner.getCreatedTime()));
+
+          // take the higher priority owner category
+          exist.setOwnerCategory(OwnerType.chooseOwnerType(exist.getOwnerCategory(), owner.getOwnerCategory()));
+
+          // merge owner source as comma separated list
+          exist.setOwnerSource(mergeOwnerSource(exist.getOwnerSource(), owner.getOwnerSource()));
+          exist.setConfirmedBy(owner.getConfirmedBy());
+          exist.setConfirmedOn(owner.getConfirmedOn());
+        } else if (source == null || !source.equalsIgnoreCase(owner.getOwnerSource())) {
+          owner.setSortId(sortId++);
+          uniqueRecords.put(owner.getOwner(), owner);
+          combinedList.add(owner);
+        }
+      }
+    }
+
     // remove old info then insert new info
     OWNER_WRITER.execute(DELETE_DATASET_OWNER_BY_DATASET_ID, new Object[]{datasetId});
-    for (DatasetOwnerRecord record : ownerList) {
+    for (DatasetOwnerRecord record : combinedList) {
       OWNER_WRITER.append(record);
     }
     OWNER_WRITER.insert();
diff --git a/backend-service/app/utils/GobblinTrackingAuditProcessor.java b/backend-service/app/utils/GobblinTrackingAuditProcessor.java
new file mode 100644
index 0000000000..06f03cb2fa
--- /dev/null
+++ b/backend-service/app/utils/GobblinTrackingAuditProcessor.java
@@ -0,0 +1,68 @@
+/**
+ * Copyright 2015 LinkedIn Corp. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
+package utils;
+
+import java.util.Map;
+
+import models.daos.DatasetInfoDao;
+import org.apache.avro.generic.GenericData;
+import wherehows.common.schemas.Record;
+import wherehows.common.utils.StringUtil;
+
+
+public class GobblinTrackingAuditProcessor {
+
+  private static final String DALI_LIMITED_RETENTION_AUDITOR = "DaliLimitedRetentionAuditor";
+  private static final String DALI_AUTOPURGED_AUDITOR = "DaliAutoPurgeAuditor";
+  private static final String DS_IGNORE_IDPC_AUDITOR = "DsIgnoreIDPCAuditor";
+  private static final String DATASET_URN_PREFIX = "hdfs://";
+  private static final String DATASET_OWNER_SOURCE = "IDPC";
+
+  /**
+   * Process a Gobblin tracking event audit record
+   * @param record Gobblin tracking event
+   * @param topic Kafka topic the event came from
+   * @throws Exception
+   */
+  public Record process(GenericData.Record record, String topic) throws Exception {
+
+    if (record != null) {
+      String name = (String) record.get("name");
+      // only handle "DaliLimitedRetentionAuditor", "DaliAutoPurgeAuditor" and "DsIgnoreIDPCAuditor"
+      if (name.equals(DALI_LIMITED_RETENTION_AUDITOR)
+          || name.equals(DALI_AUTOPURGED_AUDITOR)
+          || name.equals(DS_IGNORE_IDPC_AUDITOR)) {
+        Long timestamp = (Long) record.get("timestamp");
+        Map<String, String> metadata = StringUtil.convertObjectMapToStringMap(record.get("metadata"));
+
+        // skip events that report an error ("HasError" may be absent)
+        String hasError = metadata.get("HasError");
+        if (!"true".equalsIgnoreCase(hasError)) {
+          String datasetUrn = metadata.get("DatasetPath");
+          String ownerUrns = metadata.get("OwnerURNs");
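+          // DatasetPath is a bare HDFS path in the event metadata; prefix it so it
+          // matches the "hdfs://..." urn format WhereHows stores for datasets.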
+          DatasetInfoDao.updateKafkaDatasetOwner(DATASET_URN_PREFIX + datasetUrn, ownerUrns,
+              DATASET_OWNER_SOURCE, timestamp);
+        }
+      }
+    }
+    return null;
+  }
+}
diff --git a/metadata-etl/src/main/java/metadata/etl/kafka/GobblinTrackingCompactionProcessor.java b/metadata-etl/src/main/java/metadata/etl/kafka/GobblinTrackingCompactionProcessor.java
index 15f8ca31b6..4e76614008 100644
--- a/metadata-etl/src/main/java/metadata/etl/kafka/GobblinTrackingCompactionProcessor.java
+++ b/metadata-etl/src/main/java/metadata/etl/kafka/GobblinTrackingCompactionProcessor.java
@@ -22,6 +22,7 @@ import wherehows.common.schemas.ClusterInfo;
 import wherehows.common.schemas.GobblinTrackingCompactionRecord;
 import wherehows.common.schemas.Record;
 import wherehows.common.utils.ClusterUtil;
+import wherehows.common.utils.StringUtil;
 
 
 /**
@@ -54,7 +55,7 @@ public class GobblinTrackingCompactionProcessor extends KafkaConsumerProcessor {
     if (name.equals("CompactionCompleted") || name.equals("CompactionRecordCounts")) {
       // logger.info("Processing Gobblin tracking event record: " + name);
       final long timestamp = (long) record.get("timestamp");
-      final Map<String, String> metadata = convertObjectMapToStringMap(record.get("metadata"));
+      final Map<String, String> metadata = StringUtil.convertObjectMapToStringMap(record.get("metadata"));
       final String jobContext = "Gobblin:" + name;
       final String cluster = ClusterUtil.matchClusterCode(metadata.get("clusterIdentifier"));
diff --git a/metadata-etl/src/main/java/metadata/etl/kafka/GobblinTrackingDistcpNgProcessor.java b/metadata-etl/src/main/java/metadata/etl/kafka/GobblinTrackingDistcpNgProcessor.java
index 2920d8d9da..730df9ea14 100644
--- a/metadata-etl/src/main/java/metadata/etl/kafka/GobblinTrackingDistcpNgProcessor.java
+++ b/metadata-etl/src/main/java/metadata/etl/kafka/GobblinTrackingDistcpNgProcessor.java
@@ -20,6 +20,7 @@ import org.apache.avro.generic.GenericData;
 import wherehows.common.schemas.GobblinTrackingDistcpNgRecord;
 import wherehows.common.schemas.Record;
 import wherehows.common.utils.ClusterUtil;
+import wherehows.common.utils.StringUtil;
 
 
 /**
@@ -51,7 +52,7 @@ public class GobblinTrackingDistcpNgProcessor extends KafkaConsumerProcessor {
     if (name.equals("DatasetPublished")) { // || name.equals("FilePublished")) {
       // logger.info("Processing Gobblin tracking event record: " + name);
       final long timestamp = (long) record.get("timestamp");
-      final Map<String, String> metadata = convertObjectMapToStringMap(record.get("metadata"));
+      final Map<String, String> metadata = StringUtil.convertObjectMapToStringMap(record.get("metadata"));
       final String jobContext = "DistcpNG:" + name;
       final String cluster = ClusterUtil.matchClusterCode(metadata.get("clusterIdentifier"));
diff --git a/metadata-etl/src/main/java/metadata/etl/kafka/GobblinTrackingLumosProcessor.java b/metadata-etl/src/main/java/metadata/etl/kafka/GobblinTrackingLumosProcessor.java
index 305a732dac..461d030907 100644
--- a/metadata-etl/src/main/java/metadata/etl/kafka/GobblinTrackingLumosProcessor.java
+++ b/metadata-etl/src/main/java/metadata/etl/kafka/GobblinTrackingLumosProcessor.java
@@ -20,6 +20,7 @@ import org.apache.avro.generic.GenericData;
 import wherehows.common.schemas.GobblinTrackingLumosRecord;
 import wherehows.common.schemas.Record;
 import wherehows.common.utils.ClusterUtil;
+import wherehows.common.utils.StringUtil;
 
 
 public class GobblinTrackingLumosProcessor extends KafkaConsumerProcessor {
@@ -57,7 +58,7 @@ public class GobblinTrackingLumosProcessor extends KafkaConsumerProcessor {
     // only handle "DeltaPublished" and "SnapshotPublished"
     if (name.equals("DeltaPublished") || name.equals("SnapshotPublished")) {
       final long timestamp = (long) record.get("timestamp");
-      final Map<String, String> metadata = convertObjectMapToStringMap(record.get("metadata"));
+      final Map<String, String> metadata = StringUtil.convertObjectMapToStringMap(record.get("metadata"));
       // logger.info("Processing Gobblin tracking event record: " + name + ", timestamp: " + timestamp);
       final String jobContext = "Lumos:" + name;
diff --git a/metadata-etl/src/main/java/metadata/etl/kafka/KafkaConsumerProcessor.java b/metadata-etl/src/main/java/metadata/etl/kafka/KafkaConsumerProcessor.java
index 1ed07e24df..eb6a2c0e69 100644
--- a/metadata-etl/src/main/java/metadata/etl/kafka/KafkaConsumerProcessor.java
+++ b/metadata-etl/src/main/java/metadata/etl/kafka/KafkaConsumerProcessor.java
@@ -64,18 +64,4 @@ public abstract class KafkaConsumerProcessor {
       return 0;
     }
   }
-
-  /**
-   * Convert Object with type Map<Object, Object> to Map<String, String>
-   * @param obj Object with type Map<Object, Object>
-   * @return Map<String, String>
-   */
-  protected Map<String, String> convertObjectMapToStringMap(Object obj) {
-    final Map<Object, Object> map = (Map<Object, Object>) obj;
-    final Map<String, String> metadata = new HashMap<>();
-    for (Map.Entry<Object, Object> entry : map.entrySet()) {
-      metadata.put(String.valueOf(entry.getKey()), String.valueOf(entry.getValue()));
-    }
-    return metadata;
-  }
 }
diff --git a/wherehows-common/src/main/java/wherehows/common/schemas/DatasetOwnerRecord.java b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetOwnerRecord.java
index b9c4bc1363..cacca867ef 100644
--- a/wherehows-common/src/main/java/wherehows/common/schemas/DatasetOwnerRecord.java
+++ b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetOwnerRecord.java
@@ -36,12 +36,14 @@ public class DatasetOwnerRecord extends AbstractRecord {
   Long sourceTime;
   Long createdTime;
   Long modifiedTime;
+  String confirmedBy;
+  Long confirmedOn;
 
   @Override
   public String[] getDbColumnNames() {
     return new String[]{"dataset_id", "dataset_urn", "app_id", "owner_type", "owner_sub_type", "owner_id",
         "owner_id_type", "is_group", "is_active", "sort_id", "namespace", "owner_source", "db_ids",
-        "source_time", "created_time", "modified_time"};
+        "source_time", "created_time", "modified_time", "confirmed_by", "confirmed_on"};
   }
 
   @Override
@@ -208,4 +210,20 @@ public class DatasetOwnerRecord extends AbstractRecord {
   public void setModifiedTime(Long modifiedTime) {
     this.modifiedTime = modifiedTime;
   }
+
+  public Long getConfirmedOn() {
+    return confirmedOn;
+  }
+
+  public void setConfirmedOn(Long confirmedOn) {
+    this.confirmedOn = confirmedOn;
+  }
+
+  public String getConfirmedBy() {
+    return confirmedBy;
+  }
+
+  public void setConfirmedBy(String confirmedBy) {
+    this.confirmedBy = confirmedBy;
+  }
 }
diff --git a/wherehows-common/src/main/java/wherehows/common/utils/StringUtil.java b/wherehows-common/src/main/java/wherehows/common/utils/StringUtil.java
index 336519874b..a3de188687 100644
--- a/wherehows-common/src/main/java/wherehows/common/utils/StringUtil.java
+++ b/wherehows-common/src/main/java/wherehows/common/utils/StringUtil.java
@@ -17,6 +17,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Map;
 import wherehows.common.schemas.Record;
 
@@ -76,4 +77,18 @@ public class StringUtil {
   public static Boolean toBoolean(Object obj) {
     return obj != null ? Boolean.valueOf(obj.toString()) : null;
   }
+
+  /**
+   * Convert an Object whose runtime type is Map<Object, Object> to a Map<String, String>
+   * @param obj Object with type Map<Object, Object>
+   * @return Map<String, String>
+   */
+  public static Map<String, String> convertObjectMapToStringMap(Object obj) {
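+    // Avro generic records typically hand back Utf8 (not String) keys and values
+    // in map fields; String.valueOf normalizes both sides to java.lang.String.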
+    final Map<Object, Object> map = (Map<Object, Object>) obj;
+    final Map<String, String> metadata = new HashMap<>();
+    for (Map.Entry<Object, Object> entry : map.entrySet()) {
+      metadata.put(String.valueOf(entry.getKey()), String.valueOf(entry.getValue()));
+    }
+    return metadata;
+  }
 }