From fdc3ba03d1e41f09c3c82b2469e48bf727bd99c9 Mon Sep 17 00:00:00 2001
From: "Yi (Alan) Wang"
Date: Tue, 12 Jun 2018 15:58:33 -0700
Subject: [PATCH] Refactor MIE consumer to be used in Samza (#1203)

---
 gradle/scripts/dependency.gradle             |  4 +-
 .../MetadataInventoryProcessor.java          | 48 ++++++++-----------
 .../java/wherehows/utils/ProcessorUtil.java  | 20 ++++++++
 .../wherehows/util/ProcessorUtilTest.java    | 23 +++++++++
 4 files changed, 66 insertions(+), 29 deletions(-)

diff --git a/gradle/scripts/dependency.gradle b/gradle/scripts/dependency.gradle
index 77c10b3f32..58bebb5bb5 100644
--- a/gradle/scripts/dependency.gradle
+++ b/gradle/scripts/dependency.gradle
@@ -42,8 +42,8 @@ ext.externalDependency = [
   "parquet_avro"   : "org.apache.parquet:parquet-avro:1.8.1",
 
-  "hibernate_core" : "org.hibernate:hibernate-core:5.2.5.Final",
-  "hikaricp"       : "com.zaxxer:HikariCP:2.6.3",
+  "hibernate_core" : "org.hibernate:hibernate-core:5.2.16.Final",
+  "hikaricp"       : "com.zaxxer:HikariCP:2.7.8",
   "lombok"         : "org.projectlombok:lombok:1.16.20",
   "guava"          : "com.google.guava:guava:19.0",
 
diff --git a/wherehows-kafka/src/main/java/wherehows/processors/MetadataInventoryProcessor.java b/wherehows-kafka/src/main/java/wherehows/processors/MetadataInventoryProcessor.java
index 8befd295de..c08e2ec346 100644
--- a/wherehows-kafka/src/main/java/wherehows/processors/MetadataInventoryProcessor.java
+++ b/wherehows-kafka/src/main/java/wherehows/processors/MetadataInventoryProcessor.java
@@ -17,6 +17,7 @@ import com.linkedin.events.metadata.ChangeAuditStamp;
 import com.linkedin.events.metadata.DataOrigin;
 import com.linkedin.events.metadata.DatasetIdentifier;
 import com.linkedin.events.metadata.FailedMetadataInventoryEvent;
+import com.linkedin.events.metadata.MetadataChangeEvent;
 import com.linkedin.events.metadata.MetadataInventoryEvent;
 import com.typesafe.config.Config;
 import java.util.List;
@@ -27,13 +28,13 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.kafka.clients.producer.KafkaProducer;
-import wherehows.dao.DaoFactory;
-import wherehows.dao.table.DictDatasetDao;
-import wherehows.dao.view.DatasetViewDao;
 import wherehows.common.exceptions.UnauthorizedException;
+import wherehows.dao.DaoFactory;
+import wherehows.dao.view.DatasetViewDao;
 import wherehows.utils.ProcessorUtil;
 
 import static wherehows.util.UrnUtil.*;
+import static wherehows.utils.ProcessorUtil.*;
 
 
 @Slf4j
@@ -43,14 +44,11 @@ public class MetadataInventoryProcessor extends KafkaMessageProcessor {
 
   private final DatasetViewDao _datasetViewDao;
 
-  private final DictDatasetDao _dictDatasetDao;
-
   public MetadataInventoryProcessor(Config config, DaoFactory daoFactory, String producerTopic,
       KafkaProducer<String, IndexedRecord> producer) {
     super(producerTopic, producer);
 
     _datasetViewDao = daoFactory.getDatasetViewDao();
-    _dictDatasetDao = daoFactory.getDictDatasetDao();
 
     _whitelistActors = ProcessorUtil.getWhitelistedActors(config, "whitelist.mie");
 
@@ -70,27 +68,26 @@ public class MetadataInventoryProcessor extends KafkaMessageProcessor {
     final MetadataInventoryEvent event = (MetadataInventoryEvent) indexedRecord;
 
     try {
-      processEvent(event);
+      for (MetadataChangeEvent mce : processEvent(event)) {
+        sendMessage(mce);
+        log.info("set " + mce.datasetIdentifier + " removed");
+      }
     } catch (Exception exception) {
       log.error("MIE Processor Error:", exception);
       log.error("Message content: {}", event.toString());
-
       sendMessage(newFailedEvent(event, exception));
     }
   }
 
-  public void processEvent(MetadataInventoryEvent event) throws Exception {
+  public List<MetadataChangeEvent> processEvent(MetadataInventoryEvent event) throws Exception {
     final ChangeAuditStamp changeAuditStamp = event.changeAuditStamp;
-    String actorUrn = changeAuditStamp.actorUrn == null ? null : changeAuditStamp.actorUrn.toString();
+    final String actorUrn = changeAuditStamp.actorUrn == null ? null : changeAuditStamp.actorUrn.toString();
 
     if (_whitelistActors != null && !_whitelistActors.contains(actorUrn)) {
       throw new UnauthorizedException("Actor " + actorUrn + " not in whitelist, skip processing");
     }
 
     final String platformUrn = event.dataPlatformUrn.toString();
-
     final String platform = getUrnEntity(platformUrn);
-
     final DataOrigin origin = event.dataOrigin;
-
     final String namespace = event.namespace.toString();
 
     log.info("Processing MIE for " + platform + " " + origin + " " + namespace);
@@ -99,24 +96,21 @@ public class MetadataInventoryProcessor extends KafkaMessageProcessor {
         event.exclusionPatterns.stream().map(s -> Pattern.compile(s.toString())).collect(Collectors.toList());
 
     final List<String> names = event.nativeNames.stream().map(CharSequence::toString).collect(Collectors.toList());
-    log.info("new datasets: " + names);
+    log.debug("new datasets: " + names);
 
     final List<String> existingDatasets = _datasetViewDao.listFullNames(platform, origin.name(), namespace);
-    log.info("existing datasets: " + existingDatasets);
+    log.debug("existing datasets: " + existingDatasets);
 
-    for (String removedDataset : ProcessorUtil.listDiffWithExclusion(existingDatasets, names, exclusions)) {
-      try {
-        DatasetIdentifier identifier = new DatasetIdentifier();
-        identifier.dataPlatformUrn = platformUrn;
-        identifier.dataOrigin = origin;
-        identifier.nativeName = removedDataset;
+    // find removed datasets by diff
+    return ProcessorUtil.listDiffWithExclusion(existingDatasets, names, exclusions).stream().map(datasetName -> {
+      // send MCE to DELETE dataset
+      DatasetIdentifier identifier = new DatasetIdentifier();
+      identifier.dataPlatformUrn = platformUrn;
+      identifier.dataOrigin = origin;
+      identifier.nativeName = datasetName;
 
-        _dictDatasetDao.setDatasetRemoved(identifier, true, changeAuditStamp);
-        log.info("set " + removedDataset + " removed");
-      } catch (Exception e) {
-        log.error("Fail to mark dataset " + removedDataset + " as removed", e);
-      }
-    }
+      return mceDelete(identifier, actorUrn);
+    }).collect(Collectors.toList());
   }
 
   public FailedMetadataInventoryEvent newFailedEvent(MetadataInventoryEvent event, Throwable throwable) {
diff --git a/wherehows-kafka/src/main/java/wherehows/utils/ProcessorUtil.java b/wherehows-kafka/src/main/java/wherehows/utils/ProcessorUtil.java
index c5c6aa9d32..c02766810d 100644
--- a/wherehows-kafka/src/main/java/wherehows/utils/ProcessorUtil.java
+++ b/wherehows-kafka/src/main/java/wherehows/utils/ProcessorUtil.java
@@ -13,6 +13,10 @@
  */
 package wherehows.utils;
 
+import com.linkedin.events.metadata.ChangeAuditStamp;
+import com.linkedin.events.metadata.ChangeType;
+import com.linkedin.events.metadata.DatasetIdentifier;
+import com.linkedin.events.metadata.MetadataChangeEvent;
 import com.typesafe.config.Config;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -61,4 +65,20 @@ public class ProcessorUtil {
 
     return new HashSet<>(Arrays.asList(actors.split(";")));
   }
+
+  /**
+   * Create MCE to DELETE the dataset
+   */
+  public static MetadataChangeEvent mceDelete(@Nonnull DatasetIdentifier dataset, String actor) {
+    MetadataChangeEvent mce = new MetadataChangeEvent();
+    mce.datasetIdentifier = dataset;
+
+    ChangeAuditStamp auditStamp = new ChangeAuditStamp();
+    auditStamp.actorUrn = actor;
+    auditStamp.time = System.currentTimeMillis();
+    auditStamp.type = ChangeType.DELETE;
+    mce.changeAuditStamp = auditStamp;
+
+    return mce;
+  }
 }
diff --git a/wherehows-kafka/src/test/java/wherehows/util/ProcessorUtilTest.java b/wherehows-kafka/src/test/java/wherehows/util/ProcessorUtilTest.java
index 0b7e37c789..6cf1c17acc 100644
--- a/wherehows-kafka/src/test/java/wherehows/util/ProcessorUtilTest.java
+++ b/wherehows-kafka/src/test/java/wherehows/util/ProcessorUtilTest.java
@@ -14,6 +14,10 @@
 package wherehows.util;
 
 import com.google.common.collect.ImmutableSet;
+import com.linkedin.events.metadata.ChangeType;
+import com.linkedin.events.metadata.DataOrigin;
+import com.linkedin.events.metadata.DatasetIdentifier;
+import com.linkedin.events.metadata.MetadataChangeEvent;
 import com.typesafe.config.Config;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -74,4 +78,23 @@ public class ProcessorUtilTest {
 
     assertEquals(actors, null);
   }
+
+  @Test
+  public void testMceDelete() {
+    String actor = "tester";
+    DatasetIdentifier dataset = makeDataset("test");
+    MetadataChangeEvent mce = ProcessorUtil.mceDelete(dataset, actor);
+
+    assertEquals(mce.datasetIdentifier, dataset);
+    assertEquals(mce.changeAuditStamp.type, ChangeType.DELETE);
+    assertEquals(mce.changeAuditStamp.actorUrn, actor);
+  }
+
+  private DatasetIdentifier makeDataset(String datasetName) {
+    DatasetIdentifier identifier = new DatasetIdentifier();
+    identifier.nativeName = datasetName;
+    identifier.dataPlatformUrn = "urn:li:dataPlatform:hive";
+    identifier.dataOrigin = DataOrigin.DEV;
+    return identifier;
+  }
 }
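
---

Note on the refactor (commentary, not part of the patch): processEvent() no
longer writes removals through DictDatasetDao. It now returns the list of
MetadataChangeEvents to emit, and run() forwards each one via sendMessage().
Because the transport is chosen by the caller, the same logic can sit behind a
Samza task as well as the Kafka consumer. The sketch below is one way a Samza
StreamTask might wrap it, assuming the classic low-level Samza API; the task
class, package, output stream name, and processor wiring are hypothetical
illustrations; only processEvent() comes from this patch.

    package wherehows.samza; // hypothetical package, not in this patch

    import com.linkedin.events.metadata.MetadataChangeEvent;
    import com.linkedin.events.metadata.MetadataInventoryEvent;
    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.system.OutgoingMessageEnvelope;
    import org.apache.samza.system.SystemStream;
    import org.apache.samza.task.MessageCollector;
    import org.apache.samza.task.StreamTask;
    import org.apache.samza.task.TaskCoordinator;
    import wherehows.processors.MetadataInventoryProcessor;

    public class MetadataInventoryStreamTask implements StreamTask {

      // Output stream name is an assumption; use whatever topic the job owns.
      private static final SystemStream OUTPUT =
          new SystemStream("kafka", "MetadataChangeEvent");

      private final MetadataInventoryProcessor _processor;

      // Constructed through a custom StreamTaskFactory so the processor can be
      // injected; wiring of config, DAOs, and producer is out of scope here.
      public MetadataInventoryStreamTask(MetadataInventoryProcessor processor) {
        _processor = processor;
      }

      @Override
      public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
          TaskCoordinator coordinator) throws Exception {
        MetadataInventoryEvent event = (MetadataInventoryEvent) envelope.getMessage();
        // processEvent() returns one delete-MCE per dataset that disappeared
        // from the inventory; the task only decides where to send them.
        // A production task would catch UnauthorizedException like run() does.
        for (MetadataChangeEvent mce : _processor.processEvent(event)) {
          collector.send(new OutgoingMessageEnvelope(OUTPUT, mce));
        }
      }
    }

The same separation is what makes testMceDelete() possible: mceDelete() can be
asserted on directly, with no Kafka producer or database in the loop.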