From af0840f11acdf19b55bb8e27a3cc8cd6f677b2a2 Mon Sep 17 00:00:00 2001 From: "Yi (Alan) Wang" Date: Tue, 12 Jun 2018 16:07:22 -0700 Subject: [PATCH] Add dedupe and modify validation in MLE processor (#1204) --- .../processors/MetadataLineageProcessor.java | 31 +++++--- .../java/wherehows/utils/ProcessorUtil.java | 10 +++ .../MetadataLineageProcessorTest.java | 73 +++++++++---------- .../wherehows/util/ProcessorUtilTest.java | 13 +++- 4 files changed, 77 insertions(+), 50 deletions(-) diff --git a/wherehows-kafka/src/main/java/wherehows/processors/MetadataLineageProcessor.java b/wherehows-kafka/src/main/java/wherehows/processors/MetadataLineageProcessor.java index e57801f29d..8b28d1871c 100644 --- a/wherehows-kafka/src/main/java/wherehows/processors/MetadataLineageProcessor.java +++ b/wherehows-kafka/src/main/java/wherehows/processors/MetadataLineageProcessor.java @@ -13,16 +13,16 @@ */ package wherehows.processors; -import com.google.common.collect.Sets; +import com.google.common.annotations.VisibleForTesting; import com.linkedin.events.metadata.ChangeAuditStamp; +import com.linkedin.events.metadata.DatasetIdentifier; import com.linkedin.events.metadata.DatasetLineage; -import com.linkedin.events.metadata.DeploymentDetail; import com.linkedin.events.metadata.FailedMetadataLineageEvent; import com.linkedin.events.metadata.JobStatus; import com.linkedin.events.metadata.MetadataLineageEvent; import com.linkedin.events.metadata.agent; import com.typesafe.config.Config; -import java.util.HashSet; +import java.util.ArrayList; import java.util.List; import java.util.Set; import lombok.extern.slf4j.Slf4j; @@ -35,6 +35,8 @@ import wherehows.dao.DaoFactory; import wherehows.dao.table.LineageDao; import wherehows.utils.ProcessorUtil; +import static wherehows.utils.ProcessorUtil.*; + @Slf4j public class MetadataLineageProcessor extends KafkaMessageProcessor { @@ -93,12 +95,12 @@ public class MetadataLineageProcessor extends KafkaMessageProcessor { } List lineages = event.lineage; - validateLineages(lineages); - - DeploymentDetail deployments = event.deploymentDetail; + for (DatasetLineage lineage : lineages) { + dedupeAndValidateLineage(lineage); + } // create lineage - _lineageDao.createLineages(actorUrn, lineages, deployments); + _lineageDao.createLineages(actorUrn, lineages, event.deploymentDetail); } /** @@ -126,11 +128,16 @@ public class MetadataLineageProcessor extends KafkaMessageProcessor { return failedEvent; } - private void validateLineages(List lineages) { - for (DatasetLineage lineage : lineages) { - if (Sets.intersection(new HashSet(lineage.sourceDataset), new HashSet(lineage.destinationDataset)).size() > 0) { - throw new SelfLineageException("Source & destination datasets shouldn't overlap"); - } + @VisibleForTesting + void dedupeAndValidateLineage(DatasetLineage lineage) { + lineage.sourceDataset = dedupeDatasets(lineage.sourceDataset); + lineage.destinationDataset = dedupeDatasets(lineage.destinationDataset); + + // check intersection of source and destination + List intersection = new ArrayList<>(lineage.sourceDataset); + intersection.retainAll(lineage.destinationDataset); + if (intersection.size() > 0) { + throw new SelfLineageException("Source & destination datasets shouldn't overlap"); } } } diff --git a/wherehows-kafka/src/main/java/wherehows/utils/ProcessorUtil.java b/wherehows-kafka/src/main/java/wherehows/utils/ProcessorUtil.java index c02766810d..2e92b419fc 100644 --- a/wherehows-kafka/src/main/java/wherehows/utils/ProcessorUtil.java +++ b/wherehows-kafka/src/main/java/wherehows/utils/ProcessorUtil.java @@ -34,6 +34,16 @@ public class ProcessorUtil { private ProcessorUtil() { } + /** + * Remove duplicate datasets in the list + * @param datasets List of DatasetIdentifier + * @return de-duped list + */ + @Nonnull + public static List dedupeDatasets(@Nonnull List datasets) { + return datasets.stream().distinct().collect(Collectors.toList()); + } + /** * Find the diff from existing list to the updated list, with exclusion patterns. * @param existing List diff --git a/wherehows-kafka/src/test/java/wherehows/processors/MetadataLineageProcessorTest.java b/wherehows-kafka/src/test/java/wherehows/processors/MetadataLineageProcessorTest.java index 5098eaad56..51d1e1770b 100644 --- a/wherehows-kafka/src/test/java/wherehows/processors/MetadataLineageProcessorTest.java +++ b/wherehows-kafka/src/test/java/wherehows/processors/MetadataLineageProcessorTest.java @@ -13,9 +13,7 @@ */ package wherehows.processors; -import com.google.common.collect.ImmutableList; import com.linkedin.events.metadata.ChangeAuditStamp; -import com.linkedin.events.metadata.DataOrigin; import com.linkedin.events.metadata.DatasetIdentifier; import com.linkedin.events.metadata.DatasetLineage; import com.linkedin.events.metadata.JobExecution; @@ -23,47 +21,58 @@ import com.linkedin.events.metadata.JobStatus; import com.linkedin.events.metadata.MetadataLineageEvent; import com.linkedin.events.metadata.agent; import com.typesafe.config.Config; +import java.util.Arrays; import java.util.Collections; import java.util.List; -import org.apache.avro.generic.IndexedRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; +import wherehows.common.exceptions.SelfLineageException; import wherehows.dao.DaoFactory; import wherehows.dao.table.LineageDao; -import wherehows.common.exceptions.SelfLineageException; import static org.mockito.Mockito.*; import static org.testng.Assert.*; +import static wherehows.util.ProcessorUtilTest.*; public class MetadataLineageProcessorTest { - private static final String TOPIC = "testTopic"; - private Config _mockConfig; - private DaoFactory _mockDaoFactory; - private LineageDao _mockLineageDao; private MetadataLineageProcessor _processor; - private KafkaProducer _mockProducer; @BeforeTest public void setup() { - _mockConfig = mock(Config.class); - _mockLineageDao = mock(LineageDao.class); - _mockDaoFactory = mock(DaoFactory.class); - when(_mockDaoFactory.getLineageDao()).thenReturn(_mockLineageDao); - _mockProducer = mock(KafkaProducer.class); + Config mockConfig = mock(Config.class); + when(mockConfig.hasPath("whitelist.mle")).thenReturn(false); + DaoFactory mockDaoFactory = mock(DaoFactory.class); + when(mockDaoFactory.getLineageDao()).thenReturn(mock(LineageDao.class)); + + _processor = new MetadataLineageProcessor(mockConfig, mockDaoFactory, "topic", mock(KafkaProducer.class)); + } + + @Test + public void testDedupeDatasets() { + DatasetIdentifier ds1 = makeDataset("foo"); + DatasetIdentifier ds2 = makeDataset("foo"); + DatasetIdentifier ds3 = makeDataset("bar"); + DatasetIdentifier ds4 = makeDataset("bar"); + DatasetIdentifier ds5 = makeDataset("bar"); + DatasetLineage lineage = new DatasetLineage(); + lineage.sourceDataset = Arrays.asList(ds1, ds2); + lineage.destinationDataset = Arrays.asList(ds3, ds4, ds5); + + _processor.dedupeAndValidateLineage(lineage); + assertEquals(lineage.sourceDataset, Collections.singletonList(ds1)); + assertEquals(lineage.destinationDataset, Collections.singletonList(ds3)); } @Test(expectedExceptions = SelfLineageException.class) public void testSelfLineageValid() throws Exception { - when(_mockConfig.hasPath("whitelist.mle")).thenReturn(false); - _processor = new MetadataLineageProcessor(_mockConfig, _mockDaoFactory, TOPIC, _mockProducer); - DatasetIdentifier src1 = newDatasetIdentifier("oracle", "foo", DataOrigin.DEV); - DatasetIdentifier src2 = newDatasetIdentifier("oracle", "bar", DataOrigin.DEV); - DatasetIdentifier dest = newDatasetIdentifier("oracle", "foo", DataOrigin.DEV); + DatasetIdentifier src1 = makeDataset("foo"); + DatasetIdentifier src2 = makeDataset("bar"); + DatasetIdentifier dest = makeDataset("foo"); MetadataLineageEvent mle = - newMetadataLineageEvent("foo", ImmutableList.of(src1, src2), ImmutableList.of(dest), JobStatus.SUCCEEDED); + newMetadataLineageEvent(Arrays.asList(src1, src2), Arrays.asList(dest), JobStatus.SUCCEEDED); _processor.processEvent(mle); } @@ -71,28 +80,26 @@ public class MetadataLineageProcessorTest { // JobStatus RUNNING invalid MLE event @Test public void testLineageInvalid() { - when(_mockConfig.hasPath("whitelist.mle")).thenReturn(false); - _processor = new MetadataLineageProcessor(_mockConfig, _mockDaoFactory, TOPIC, _mockProducer); - DatasetIdentifier src1 = newDatasetIdentifier("oracle", "foo", DataOrigin.DEV); - DatasetIdentifier src2 = newDatasetIdentifier("oracle", "bar", DataOrigin.DEV); - DatasetIdentifier dest = newDatasetIdentifier("oracle", "foo", DataOrigin.DEV); + DatasetIdentifier src1 = makeDataset("foo"); + DatasetIdentifier src2 = makeDataset("bar"); + DatasetIdentifier dest = makeDataset("foo"); MetadataLineageEvent mle = - newMetadataLineageEvent("foo", ImmutableList.of(src1, src2), ImmutableList.of(dest), JobStatus.RUNNING); + newMetadataLineageEvent(Arrays.asList(src1, src2), Arrays.asList(dest), JobStatus.RUNNING); try { _processor.processEvent(mle); - } catch(Exception ex) { + } catch (Exception ex) { assertTrue(ex instanceof UnsupportedOperationException); } } - private MetadataLineageEvent newMetadataLineageEvent(String actorUrn, List sourceDatasets, + private MetadataLineageEvent newMetadataLineageEvent(List sourceDatasets, List destinationDataset, JobStatus jobStatus) { MetadataLineageEvent mle = new MetadataLineageEvent(); mle.type = agent.UNUSED; mle.changeAuditStamp = new ChangeAuditStamp(); - mle.changeAuditStamp.actorUrn = actorUrn; + mle.changeAuditStamp.actorUrn = "tester"; mle.jobExecution = new JobExecution(); mle.jobExecution.status = jobStatus; @@ -103,12 +110,4 @@ public class MetadataLineageProcessorTest { return mle; } - - private DatasetIdentifier newDatasetIdentifier(String platform, String name, DataOrigin dataOrigin) { - DatasetIdentifier datasetIdentifier = new DatasetIdentifier(); - datasetIdentifier.dataPlatformUrn = "urn:li:dataPlatform:" + platform; - datasetIdentifier.nativeName = name; - datasetIdentifier.dataOrigin = dataOrigin; - return datasetIdentifier; - } } diff --git a/wherehows-kafka/src/test/java/wherehows/util/ProcessorUtilTest.java b/wherehows-kafka/src/test/java/wherehows/util/ProcessorUtilTest.java index 6cf1c17acc..3b7aaa210e 100644 --- a/wherehows-kafka/src/test/java/wherehows/util/ProcessorUtilTest.java +++ b/wherehows-kafka/src/test/java/wherehows/util/ProcessorUtilTest.java @@ -34,6 +34,17 @@ import static org.testng.Assert.*; public class ProcessorUtilTest { + @Test + public void testDedupeDatasets() { + DatasetIdentifier ds1 = makeDataset("test1"); + DatasetIdentifier ds2 = makeDataset("test2"); + DatasetIdentifier ds3 = makeDataset("test1"); + List datasets = Arrays.asList(ds1, ds2, ds3); + + List deduped = ProcessorUtil.dedupeDatasets(datasets); + assertEquals(deduped, Arrays.asList(ds1, ds2)); + } + @Test public void testListDiffWithExclusion() { List existing = new ArrayList<>(Arrays.asList("a", "b", "c")); @@ -90,7 +101,7 @@ public class ProcessorUtilTest { assertEquals(mce.changeAuditStamp.actorUrn, actor); } - private DatasetIdentifier makeDataset(String datasetName) { + public static DatasetIdentifier makeDataset(String datasetName) { DatasetIdentifier identifier = new DatasetIdentifier(); identifier.nativeName = datasetName; identifier.dataPlatformUrn = "urn:li:dataPlatform:hive";