Use changeAuditStamp for MCE/MLE whitelist validation (#928)

This commit is contained in:
Yi (Alan) Wang 2018-01-05 11:21:56 -08:00 committed by GitHub
parent 7274eaf0ae
commit 0cd9d68258
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 68 additions and 24 deletions

View File

@ -15,7 +15,6 @@ package wherehows.dao.table;
import com.linkedin.events.metadata.DatasetLineage;
import com.linkedin.events.metadata.DeploymentDetail;
import com.linkedin.events.metadata.agent;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@ -27,13 +26,13 @@ public class LineageDao {
/**
* Create lineage dataset that requested the lineage via Kafka lineage event.
* @param appType agent that created the lineage info
* @param actorUrn String actor Urn
* @param datasetLineages List of lineages
* @param deployment deployment environment i.e. PROD, DEV, EI and etc
* @return return process result as true/false
*/
public Boolean createLineages(@Nonnull agent appType, @Nonnull List<DatasetLineage> datasetLineages,
@Nullable DeploymentDetail deployment) {
public Boolean createLineages(@Nullable String actorUrn, @Nonnull List<DatasetLineage> datasetLineages,
@Nullable DeploymentDetail deployment) throws Exception {
// TODO: write lineage Dao to DB
throw new UnsupportedOperationException("Lineage not implemented yet.");
}

View File

@ -7,7 +7,7 @@
{
"name": "actorUrn",
"type": [ "null", "string" ],
"doc": "urn:li:corpuser:jsmith, urn:li:team:xyz, urn:li:service:money"
"doc": "urn:li:corpuser:ldap, urn:li:corpGroup:abc, and urn:li:multiProduct:mp_name"
},
{
"name": "impersonatorUrn",

View File

@ -9,6 +9,12 @@
"type": "com.linkedin.events.KafkaAuditHeader",
"doc": "This header records information about the context of an event as it is emitted into kafka and is intended to be used by the kafka audit application. For more information see go/kafkaauditheader"
},
{
"name": "changeAuditStamp",
"type": ["null", "ChangeAuditStamp"],
"doc": "Required, marked as optional for backward compatibility. The change auditing information, including actor, change type and timestamp, etc",
"default": null
},
{
"name": "type",
"type": {
@ -30,7 +36,8 @@
"TERADATA",
"UMP",
"VENICE",
"VOLDEMORT_BNP"
"VOLDEMORT_BNP",
"UNUSED"
],
"symbolDocs": {
"AZKABAN": "Azkaban - batch workflow job scheduler",
@ -48,10 +55,11 @@
"TERADATA": "Teradata - Data Warehouse",
"UMP": "UMP - Unified Metrics Pipeline",
"VENICE": "Venice - Derived Data Platform",
"VOLDEMORT": "Voldemort - Distributed Key Value Storage"
"VOLDEMORT": "Voldemort - Distributed Key Value Storage",
"UNUSED": "Always use this value as the field is now deprecated"
}
},
"doc": "The type name of the application, such as Azkaban, Gobblin, etc."
"doc": "Deprecated. New clients should set this to UNUSED and use actor field in changeAuditStamp instead."
},
{
"name": "name",

View File

@ -13,6 +13,12 @@
"name": "SuggestedFieldClassification",
"namespace": "com.linkedin.events.metadata",
"fields": [
{
"name": "uid",
"type": [ "null", "string" ],
"doc": "Unique identifier for the suggestion.",
"default": null
},
{
"name": "suggestion",
"type": "ComplianceEntity",
@ -37,6 +43,12 @@
"name": "SuggestedDatasetClassification",
"namespace": "com.linkedin.events.metadata",
"fields": [
{
"name": "uid",
"type": [ "null", "string" ],
"doc": "Unique identifier for the suggestion.",
"default": null
},
{
"name": "isContaining",
"type": "boolean",

View File

@ -27,6 +27,7 @@ import java.util.HashSet;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.IndexedRecord;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
@ -47,7 +48,8 @@ public class MetadataChangeProcessor extends KafkaMessageProcessor {
private final String whitelistStr = config.hasPath("whitelist.mce") ? config.getString("whitelist.mce") : "";
private final Set<String> whitelistActors = new HashSet<>(Arrays.asList(whitelistStr.split(";")));
private final Set<String> whitelistActors =
StringUtils.isBlank(whitelistStr) ? null : new HashSet<>(Arrays.asList(whitelistStr.split(";")));
private final DictDatasetDao _dictDatasetDao = DAO_FACTORY.getDictDatasetDao();
@ -62,6 +64,7 @@ public class MetadataChangeProcessor extends KafkaMessageProcessor {
public MetadataChangeProcessor(DaoFactory daoFactory, String producerTopic,
KafkaProducer<String, IndexedRecord> producer) {
super(daoFactory, producerTopic, producer);
log.info("MCE whitelist: " + whitelistActors);
}
/**
@ -87,9 +90,11 @@ public class MetadataChangeProcessor extends KafkaMessageProcessor {
private void processEvent(MetadataChangeEvent event) throws Exception {
final ChangeAuditStamp changeAuditStamp = event.changeAuditStamp;
if (whitelistActors.size() > 0 && !whitelistActors.contains(changeAuditStamp.actorUrn.toString())) {
throw new RuntimeException("Actor not in whitelist, skip processing");
String actorUrn = changeAuditStamp.actorUrn == null ? null : changeAuditStamp.actorUrn.toString();
if (whitelistActors != null && !whitelistActors.contains(actorUrn)) {
throw new RuntimeException("Actor " + actorUrn + " not in whitelist, skip processing");
}
final ChangeType changeType = changeAuditStamp.type;
final DatasetIdentifier identifier = event.datasetIdentifier;

View File

@ -13,6 +13,7 @@
*/
package wherehows.processors;
import com.linkedin.events.metadata.ChangeAuditStamp;
import com.linkedin.events.metadata.DatasetLineage;
import com.linkedin.events.metadata.DeploymentDetail;
import com.linkedin.events.metadata.FailedMetadataLineageEvent;
@ -21,12 +22,13 @@ import com.linkedin.events.metadata.agent;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.IndexedRecord;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import wherehows.dao.DaoFactory;
@ -40,14 +42,15 @@ public class MetadataLineageProcessor extends KafkaMessageProcessor {
private final String whitelistStr = config.hasPath("whitelist.mle") ? config.getString("whitelist.mle") : "";
private final Set<agent> whitelistAppTypes =
Arrays.stream(whitelistStr.split(";")).map(agent::valueOf).collect(Collectors.toSet());
private final Set<String> whitelistActors =
StringUtils.isBlank(whitelistStr) ? null : new HashSet<>(Arrays.asList(whitelistStr.split(";")));
private final LineageDao _lineageDao = DAO_FACTORY.getLineageDao();
public MetadataLineageProcessor(DaoFactory daoFactory, String producerTopic,
KafkaProducer<String, IndexedRecord> producer) {
super(daoFactory, producerTopic, producer);
log.info("MLE whitelist: " + whitelistActors);
}
/**
@ -76,26 +79,43 @@ public class MetadataLineageProcessor extends KafkaMessageProcessor {
if (event.lineage == null || event.lineage.size() == 0) {
throw new IllegalArgumentException("No Lineage info in record");
}
log.debug("MLE: " + event.lineage.toString());
agent appType = event.type;
if (whitelistAppTypes.size() > 0 && !whitelistAppTypes.contains(appType)) {
throw new RuntimeException("App Type not in whitelist, skip processing");
String actorUrn = getActorUrn(event);
if (whitelistActors != null && !whitelistActors.contains(actorUrn)) {
throw new RuntimeException("Actor " + actorUrn + " not in whitelist, skip processing");
}
List<DatasetLineage> lineages = event.lineage;
DeploymentDetail deployments = event.deploymentDetail;
// create lineage
_lineageDao.createLineages(appType, lineages, deployments);
_lineageDao.createLineages(actorUrn, lineages, deployments);
}
/**
* Get actor Urn string from app type or change audit stamp
*/
private String getActorUrn(MetadataLineageEvent event) {
// use app type first
if (event.type != agent.UNUSED) {
return "urn:li:multiProduct:" + event.type.name().toLowerCase();
}
// if app type = UNUSED, use actorUrn in ChangeAuditStamp
ChangeAuditStamp auditStamp = event.changeAuditStamp;
if (auditStamp == null || auditStamp.actorUrn == null) {
throw new IllegalArgumentException("Requires ChangeAuditStamp actorUrn if MLE agent is UNUSED");
}
return auditStamp.actorUrn.toString();
}
private FailedMetadataLineageEvent newFailedEvent(MetadataLineageEvent event, Throwable throwable) {
FailedMetadataLineageEvent faileEvent = new FailedMetadataLineageEvent();
faileEvent.time = System.currentTimeMillis();
faileEvent.error = ExceptionUtils.getStackTrace(throwable);
faileEvent.metadataLineageEvent = event;
return faileEvent;
FailedMetadataLineageEvent failedEvent = new FailedMetadataLineageEvent();
failedEvent.time = System.currentTimeMillis();
failedEvent.error = ExceptionUtils.getStackTrace(throwable);
failedEvent.metadataLineageEvent = event;
return failedEvent;
}
}