Add dedupe and modify validation in MLE processor (#1204)

This commit is contained in:
Yi (Alan) Wang 2018-06-12 16:07:22 -07:00 committed by GitHub
parent fdc3ba03d1
commit af0840f11a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 77 additions and 50 deletions

View File

@@ -13,16 +13,16 @@
*/
package wherehows.processors;
import com.google.common.collect.Sets;
import com.google.common.annotations.VisibleForTesting;
import com.linkedin.events.metadata.ChangeAuditStamp;
import com.linkedin.events.metadata.DatasetIdentifier;
import com.linkedin.events.metadata.DatasetLineage;
import com.linkedin.events.metadata.DeploymentDetail;
import com.linkedin.events.metadata.FailedMetadataLineageEvent;
import com.linkedin.events.metadata.JobStatus;
import com.linkedin.events.metadata.MetadataLineageEvent;
import com.linkedin.events.metadata.agent;
import com.typesafe.config.Config;
import java.util.HashSet;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
@@ -35,6 +35,8 @@ import wherehows.dao.DaoFactory;
import wherehows.dao.table.LineageDao;
import wherehows.utils.ProcessorUtil;
import static wherehows.utils.ProcessorUtil.*;
@Slf4j
public class MetadataLineageProcessor extends KafkaMessageProcessor {
@@ -93,12 +95,12 @@ public class MetadataLineageProcessor extends KafkaMessageProcessor {
}
List<DatasetLineage> lineages = event.lineage;
validateLineages(lineages);
DeploymentDetail deployments = event.deploymentDetail;
for (DatasetLineage lineage : lineages) {
dedupeAndValidateLineage(lineage);
}
// create lineage
_lineageDao.createLineages(actorUrn, lineages, deployments);
_lineageDao.createLineages(actorUrn, lineages, event.deploymentDetail);
}
/**
@@ -126,11 +128,16 @@ public class MetadataLineageProcessor extends KafkaMessageProcessor {
return failedEvent;
}
private void validateLineages(List<DatasetLineage> lineages) {
for (DatasetLineage lineage : lineages) {
if (Sets.intersection(new HashSet(lineage.sourceDataset), new HashSet(lineage.destinationDataset)).size() > 0) {
throw new SelfLineageException("Source & destination datasets shouldn't overlap");
}
@VisibleForTesting
void dedupeAndValidateLineage(DatasetLineage lineage) {
  // Collapse duplicate entries on both sides of the lineage edge first,
  // so the overlap check below runs against the de-duped lists.
  lineage.sourceDataset = dedupeDatasets(lineage.sourceDataset);
  lineage.destinationDataset = dedupeDatasets(lineage.destinationDataset);

  // A dataset must not appear as both a source and a destination of the
  // same lineage record; reject the whole lineage on the first overlap.
  for (DatasetIdentifier source : lineage.sourceDataset) {
    if (lineage.destinationDataset.contains(source)) {
      throw new SelfLineageException("Source & destination datasets shouldn't overlap");
    }
  }
}
}

View File

@@ -34,6 +34,16 @@ public class ProcessorUtil {
// Static-utility holder: private constructor prevents instantiation.
private ProcessorUtil() {
}
/**
 * Returns a new list containing each distinct dataset from {@code datasets}
 * exactly once, keeping the first occurrence and preserving encounter order.
 *
 * @param datasets list of DatasetIdentifier, possibly containing duplicates
 * @return de-duplicated copy of the input list
 */
@Nonnull
public static List<DatasetIdentifier> dedupeDatasets(@Nonnull List<DatasetIdentifier> datasets) {
  // Stream.distinct() on an ordered stream retains first occurrences in order.
  return datasets.stream()
      .distinct()
      .collect(Collectors.toList());
}
/**
* Find the diff from existing list to the updated list, with exclusion patterns.
* @param existing List<String>

View File

@@ -13,9 +13,7 @@
*/
package wherehows.processors;
import com.google.common.collect.ImmutableList;
import com.linkedin.events.metadata.ChangeAuditStamp;
import com.linkedin.events.metadata.DataOrigin;
import com.linkedin.events.metadata.DatasetIdentifier;
import com.linkedin.events.metadata.DatasetLineage;
import com.linkedin.events.metadata.JobExecution;
@@ -23,47 +21,58 @@ import com.linkedin.events.metadata.JobStatus;
import com.linkedin.events.metadata.MetadataLineageEvent;
import com.linkedin.events.metadata.agent;
import com.typesafe.config.Config;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.avro.generic.IndexedRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import wherehows.common.exceptions.SelfLineageException;
import wherehows.dao.DaoFactory;
import wherehows.dao.table.LineageDao;
import wherehows.common.exceptions.SelfLineageException;
import static org.mockito.Mockito.*;
import static org.testng.Assert.*;
import static wherehows.util.ProcessorUtilTest.*;
public class MetadataLineageProcessorTest {
private static final String TOPIC = "testTopic";
private Config _mockConfig;
private DaoFactory _mockDaoFactory;
private LineageDao _mockLineageDao;
private MetadataLineageProcessor _processor;
private KafkaProducer<String, IndexedRecord> _mockProducer;
@BeforeTest
public void setup() {
// NOTE(review): the five statements below assign the _mock* fields, but nothing
// later in this method reads them; they look like pre-refactor lines left
// interleaved by the diff rendering — reconcile against the committed file.
_mockConfig = mock(Config.class);
_mockLineageDao = mock(LineageDao.class);
_mockDaoFactory = mock(DaoFactory.class);
when(_mockDaoFactory.getLineageDao()).thenReturn(_mockLineageDao);
_mockProducer = mock(KafkaProducer.class);
// Build the processor under test from locally-scoped mocks: whitelist check
// disabled, and a DaoFactory that hands out a mock LineageDao.
Config mockConfig = mock(Config.class);
when(mockConfig.hasPath("whitelist.mle")).thenReturn(false);
DaoFactory mockDaoFactory = mock(DaoFactory.class);
when(mockDaoFactory.getLineageDao()).thenReturn(mock(LineageDao.class));
_processor = new MetadataLineageProcessor(mockConfig, mockDaoFactory, "topic", mock(KafkaProducer.class));
}
@Test
public void testDedupeDatasets() {
  // Two logically-equal "foo" sources and three equal "bar" destinations
  // should each collapse to a single entry, keeping the first occurrence.
  DatasetIdentifier fooA = makeDataset("foo");
  DatasetIdentifier fooB = makeDataset("foo");
  DatasetIdentifier barA = makeDataset("bar");
  DatasetIdentifier barB = makeDataset("bar");
  DatasetIdentifier barC = makeDataset("bar");

  DatasetLineage lineage = new DatasetLineage();
  lineage.sourceDataset = Arrays.asList(fooA, fooB);
  lineage.destinationDataset = Arrays.asList(barA, barB, barC);

  _processor.dedupeAndValidateLineage(lineage);

  assertEquals(lineage.sourceDataset, Collections.singletonList(fooA));
  assertEquals(lineage.destinationDataset, Collections.singletonList(barA));
}
@Test(expectedExceptions = SelfLineageException.class)
public void testSelfLineageValid() throws Exception {
// NOTE(review): this method mixes removed and added diff lines — src1/src2/dest
// are declared twice and the event is built twice, which would not compile as
// shown; reconcile against the committed file.
when(_mockConfig.hasPath("whitelist.mle")).thenReturn(false);
_processor = new MetadataLineageProcessor(_mockConfig, _mockDaoFactory, TOPIC, _mockProducer);
DatasetIdentifier src1 = newDatasetIdentifier("oracle", "foo", DataOrigin.DEV);
DatasetIdentifier src2 = newDatasetIdentifier("oracle", "bar", DataOrigin.DEV);
DatasetIdentifier dest = newDatasetIdentifier("oracle", "foo", DataOrigin.DEV);
// "foo" appears as both a source and the destination, so processEvent is
// expected to throw SelfLineageException (declared on the @Test annotation).
DatasetIdentifier src1 = makeDataset("foo");
DatasetIdentifier src2 = makeDataset("bar");
DatasetIdentifier dest = makeDataset("foo");
MetadataLineageEvent mle =
newMetadataLineageEvent("foo", ImmutableList.of(src1, src2), ImmutableList.of(dest), JobStatus.SUCCEEDED);
newMetadataLineageEvent(Arrays.asList(src1, src2), Arrays.asList(dest), JobStatus.SUCCEEDED);
_processor.processEvent(mle);
}
@@ -71,28 +80,26 @@ public class MetadataLineageProcessorTest {
// JobStatus RUNNING invalid MLE event
@Test
public void testLineageInvalid() {
// NOTE(review): like testSelfLineageValid, this method interleaves removed and
// added diff lines — src1/src2/dest are declared twice and the event is built
// twice, which would not compile as shown; reconcile against the committed file.
when(_mockConfig.hasPath("whitelist.mle")).thenReturn(false);
_processor = new MetadataLineageProcessor(_mockConfig, _mockDaoFactory, TOPIC, _mockProducer);
DatasetIdentifier src1 = newDatasetIdentifier("oracle", "foo", DataOrigin.DEV);
DatasetIdentifier src2 = newDatasetIdentifier("oracle", "bar", DataOrigin.DEV);
DatasetIdentifier dest = newDatasetIdentifier("oracle", "foo", DataOrigin.DEV);
DatasetIdentifier src1 = makeDataset("foo");
DatasetIdentifier src2 = makeDataset("bar");
DatasetIdentifier dest = makeDataset("foo");
MetadataLineageEvent mle =
newMetadataLineageEvent("foo", ImmutableList.of(src1, src2), ImmutableList.of(dest), JobStatus.RUNNING);
newMetadataLineageEvent(Arrays.asList(src1, src2), Arrays.asList(dest), JobStatus.RUNNING);
// A RUNNING-status event is expected to be rejected; the catch asserts the
// rejection surfaces as UnsupportedOperationException.
try {
_processor.processEvent(mle);
} catch(Exception ex) {
} catch (Exception ex) {
assertTrue(ex instanceof UnsupportedOperationException);
}
}
private MetadataLineageEvent newMetadataLineageEvent(String actorUrn, List<DatasetIdentifier> sourceDatasets,
private MetadataLineageEvent newMetadataLineageEvent(List<DatasetIdentifier> sourceDatasets,
List<DatasetIdentifier> destinationDataset, JobStatus jobStatus) {
MetadataLineageEvent mle = new MetadataLineageEvent();
mle.type = agent.UNUSED;
mle.changeAuditStamp = new ChangeAuditStamp();
mle.changeAuditStamp.actorUrn = actorUrn;
mle.changeAuditStamp.actorUrn = "tester";
mle.jobExecution = new JobExecution();
mle.jobExecution.status = jobStatus;
@@ -103,12 +110,4 @@ public class MetadataLineageProcessorTest {
return mle;
}
// Builds a DatasetIdentifier for the given platform, native name, and origin.
private DatasetIdentifier newDatasetIdentifier(String platform, String name, DataOrigin dataOrigin) {
  DatasetIdentifier id = new DatasetIdentifier();
  id.dataOrigin = dataOrigin;
  id.nativeName = name;
  id.dataPlatformUrn = "urn:li:dataPlatform:" + platform;
  return id;
}
}

View File

@@ -34,6 +34,17 @@ import static org.testng.Assert.*;
public class ProcessorUtilTest {
@Test
public void testDedupeDatasets() {
  // "test1" appears twice; dedupe keeps only its first occurrence,
  // preserving the original order of the remaining elements.
  DatasetIdentifier first = makeDataset("test1");
  DatasetIdentifier second = makeDataset("test2");
  DatasetIdentifier duplicateOfFirst = makeDataset("test1");

  List<DatasetIdentifier> deduped =
      ProcessorUtil.dedupeDatasets(Arrays.asList(first, second, duplicateOfFirst));

  assertEquals(deduped, Arrays.asList(first, second));
}
@Test
public void testListDiffWithExclusion() {
List<String> existing = new ArrayList<>(Arrays.asList("a", "b", "c"));
@@ -90,7 +101,7 @@ public class ProcessorUtilTest {
assertEquals(mce.changeAuditStamp.actorUrn, actor);
}
private DatasetIdentifier makeDataset(String datasetName) {
public static DatasetIdentifier makeDataset(String datasetName) {
DatasetIdentifier identifier = new DatasetIdentifier();
identifier.nativeName = datasetName;
identifier.dataPlatformUrn = "urn:li:dataPlatform:hive";