Refactor MIE consumer to be used in Samza (#1203)

Yi (Alan) Wang authored 2018-06-12 15:58:33 -07:00, committed by GitHub
parent 8655c407c9
commit fdc3ba03d1
4 changed files with 66 additions and 29 deletions
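For context, below is a minimal sketch of how the refactored processor might be hooked into a Samza job. Nothing in this sketch is part of the diff: the task class, its placement alongside MetadataInventoryProcessor, the constructor wiring, and the assumption that the MIE stream is deserialized to an Avro record are all illustrative guesses; only the Samza StreamTask interface itself is standard API.

import org.apache.avro.generic.IndexedRecord;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

// Hypothetical task class, assumed to live in the same package as MetadataInventoryProcessor
// so no extra import is needed.
public class MetadataInventoryStreamTask implements StreamTask {

  private final MetadataInventoryProcessor processor;

  // In a real Samza job the task is created reflectively, so the processor would more likely be
  // built in an InitableTask#init from config; constructor injection keeps the sketch short.
  public MetadataInventoryStreamTask(MetadataInventoryProcessor processor) {
    this.processor = processor;
  }

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)
      throws Exception {
    // Assumes the stream's serde yields Avro records; the processor's process(IndexedRecord)
    // entry point is inferred from the cast in the hunk below, not shown in this diff.
    IndexedRecord record = (IndexedRecord) envelope.getMessage();
    processor.process(record);
  }
}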

File 1 of 4: Gradle external dependency versions

@@ -42,8 +42,8 @@ ext.externalDependency = [
   "parquet_avro"    : "org.apache.parquet:parquet-avro:1.8.1",
-  "hibernate_core"  : "org.hibernate:hibernate-core:5.2.5.Final",
-  "hikaricp"        : "com.zaxxer:HikariCP:2.6.3",
+  "hibernate_core"  : "org.hibernate:hibernate-core:5.2.16.Final",
+  "hikaricp"        : "com.zaxxer:HikariCP:2.7.8",
   "lombok"          : "org.projectlombok:lombok:1.16.20",
   "guava"           : "com.google.guava:guava:19.0",

File 2 of 4: MetadataInventoryProcessor.java

@@ -17,6 +17,7 @@ import com.linkedin.events.metadata.ChangeAuditStamp;
 import com.linkedin.events.metadata.DataOrigin;
 import com.linkedin.events.metadata.DatasetIdentifier;
 import com.linkedin.events.metadata.FailedMetadataInventoryEvent;
+import com.linkedin.events.metadata.MetadataChangeEvent;
 import com.linkedin.events.metadata.MetadataInventoryEvent;
 import com.typesafe.config.Config;
 import java.util.List;
@@ -27,13 +28,13 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.kafka.clients.producer.KafkaProducer;
-import wherehows.dao.DaoFactory;
-import wherehows.dao.table.DictDatasetDao;
-import wherehows.dao.view.DatasetViewDao;
+import wherehows.common.exceptions.UnauthorizedException;
+import wherehows.dao.DaoFactory;
+import wherehows.dao.view.DatasetViewDao;
 import wherehows.utils.ProcessorUtil;

 import static wherehows.util.UrnUtil.*;
 import static wherehows.utils.ProcessorUtil.*;

 @Slf4j
@@ -43,14 +44,11 @@ public class MetadataInventoryProcessor extends KafkaMessageProcessor {
   private final DatasetViewDao _datasetViewDao;
-  private final DictDatasetDao _dictDatasetDao;

   public MetadataInventoryProcessor(Config config, DaoFactory daoFactory, String producerTopic,
       KafkaProducer<String, IndexedRecord> producer) {
     super(producerTopic, producer);
     _datasetViewDao = daoFactory.getDatasetViewDao();
-    _dictDatasetDao = daoFactory.getDictDatasetDao();
     _whitelistActors = ProcessorUtil.getWhitelistedActors(config, "whitelist.mie");
@@ -70,27 +68,26 @@ public class MetadataInventoryProcessor extends KafkaMessageProcessor {
     final MetadataInventoryEvent event = (MetadataInventoryEvent) indexedRecord;

     try {
-      processEvent(event);
+      for (MetadataChangeEvent mce : processEvent(event)) {
+        sendMessage(mce);
+        log.info("set " + mce.datasetIdentifier + " removed");
+      }
     } catch (Exception exception) {
       log.error("MIE Processor Error:", exception);
       log.error("Message content: {}", event.toString());
       sendMessage(newFailedEvent(event, exception));
     }
   }

-  public void processEvent(MetadataInventoryEvent event) throws Exception {
+  public List<MetadataChangeEvent> processEvent(MetadataInventoryEvent event) throws Exception {
     final ChangeAuditStamp changeAuditStamp = event.changeAuditStamp;
-    String actorUrn = changeAuditStamp.actorUrn == null ? null : changeAuditStamp.actorUrn.toString();
+    final String actorUrn = changeAuditStamp.actorUrn == null ? null : changeAuditStamp.actorUrn.toString();

     if (_whitelistActors != null && !_whitelistActors.contains(actorUrn)) {
+      throw new UnauthorizedException("Actor " + actorUrn + " not in whitelist, skip processing");
     }

     final String platformUrn = event.dataPlatformUrn.toString();
     final String platform = getUrnEntity(platformUrn);
     final DataOrigin origin = event.dataOrigin;
     final String namespace = event.namespace.toString();

     log.info("Processing MIE for " + platform + " " + origin + " " + namespace);
@@ -99,24 +96,21 @@ public class MetadataInventoryProcessor extends KafkaMessageProcessor {
         event.exclusionPatterns.stream().map(s -> Pattern.compile(s.toString())).collect(Collectors.toList());
     final List<String> names = event.nativeNames.stream().map(CharSequence::toString).collect(Collectors.toList());
-    log.info("new datasets: " + names);
+    log.debug("new datasets: " + names);

     final List<String> existingDatasets = _datasetViewDao.listFullNames(platform, origin.name(), namespace);
-    log.info("existing datasets: " + existingDatasets);
+    log.debug("existing datasets: " + existingDatasets);

-    for (String removedDataset : ProcessorUtil.listDiffWithExclusion(existingDatasets, names, exclusions)) {
-      try {
-        DatasetIdentifier identifier = new DatasetIdentifier();
-        identifier.dataPlatformUrn = platformUrn;
-        identifier.dataOrigin = origin;
-        identifier.nativeName = removedDataset;
+    // find removed datasets by diff
+    return ProcessorUtil.listDiffWithExclusion(existingDatasets, names, exclusions).stream().map(datasetName -> {
+      // send MCE to DELETE dataset
+      DatasetIdentifier identifier = new DatasetIdentifier();
+      identifier.dataPlatformUrn = platformUrn;
+      identifier.dataOrigin = origin;
+      identifier.nativeName = datasetName;
-        _dictDatasetDao.setDatasetRemoved(identifier, true, changeAuditStamp);
-        log.info("set " + removedDataset + " removed");
-      } catch (Exception e) {
-        log.error("Fail to mark dataset " + removedDataset + " as removed", e);
-      }
-    }
+      return mceDelete(identifier, actorUrn);
+    }).collect(Collectors.toList());
   }

   public FailedMetadataInventoryEvent newFailedEvent(MetadataInventoryEvent event, Throwable throwable) {

File 3 of 4: ProcessorUtil.java

@@ -13,6 +13,10 @@
  */
 package wherehows.utils;

+import com.linkedin.events.metadata.ChangeAuditStamp;
+import com.linkedin.events.metadata.ChangeType;
+import com.linkedin.events.metadata.DatasetIdentifier;
+import com.linkedin.events.metadata.MetadataChangeEvent;
 import com.typesafe.config.Config;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -61,4 +65,20 @@ public class ProcessorUtil {
     return new HashSet<>(Arrays.asList(actors.split(";")));
   }
+
+  /**
+   * Create MCE to DELETE the dataset
+   */
+  public static MetadataChangeEvent mceDelete(@Nonnull DatasetIdentifier dataset, String actor) {
+    MetadataChangeEvent mce = new MetadataChangeEvent();
+    mce.datasetIdentifier = dataset;
+
+    ChangeAuditStamp auditStamp = new ChangeAuditStamp();
+    auditStamp.actorUrn = actor;
+    auditStamp.time = System.currentTimeMillis();
+    auditStamp.type = ChangeType.DELETE;
+
+    mce.changeAuditStamp = auditStamp;
+    return mce;
+  }
 }

File 4 of 4: ProcessorUtilTest.java

@@ -14,6 +14,10 @@
 package wherehows.util;

 import com.google.common.collect.ImmutableSet;
+import com.linkedin.events.metadata.ChangeType;
+import com.linkedin.events.metadata.DataOrigin;
+import com.linkedin.events.metadata.DatasetIdentifier;
+import com.linkedin.events.metadata.MetadataChangeEvent;
 import com.typesafe.config.Config;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -74,4 +78,23 @@ public class ProcessorUtilTest {
     assertEquals(actors, null);
   }
+
+  @Test
+  public void testMceDelete() {
+    String actor = "tester";
+    DatasetIdentifier dataset = makeDataset("test");
+
+    MetadataChangeEvent mce = ProcessorUtil.mceDelete(dataset, actor);
+    assertEquals(mce.datasetIdentifier, dataset);
+    assertEquals(mce.changeAuditStamp.type, ChangeType.DELETE);
+    assertEquals(mce.changeAuditStamp.actorUrn, actor);
+  }
+
+  private DatasetIdentifier makeDataset(String datasetName) {
+    DatasetIdentifier identifier = new DatasetIdentifier();
+    identifier.nativeName = datasetName;
+    identifier.dataPlatformUrn = "urn:li:dataPlatform:hive";
+    identifier.dataOrigin = DataOrigin.DEV;
+    return identifier;
+  }
 }