MIE consumer to use platform specific listName API (#1214)

Author: Yi (Alan) Wang, 2018-06-18 15:41:11 -07:00 (committed by GitHub)
Parent: d5dfb38a84
Commit: 2c1e1fa240
4 changed files with 20 additions and 6 deletions

DatasetViewDao.java

@@ -108,12 +108,14 @@ public class DatasetViewDao extends BaseViewDao {
   }
 
   @Nonnull
-  public List<String> listSegments(@Nonnull String platform, @Nullable String origin, @Nonnull String prefix) throws Exception {
+  public List<String> listSegments(@Nonnull String platform, @Nullable String origin, @Nonnull String prefix)
+      throws Exception {
     throw new UnsupportedOperationException("Not implemented yet");
   }
 
   @Nonnull
-  public List<String> listFullNames(@Nonnull String platform, @Nullable String origin, @Nonnull String prefix) throws Exception {
+  public List<String> listFullNames(@Nonnull String platform, @Nullable String origin, @Nonnull String cluster,
+      @Nonnull String prefix) throws Exception {
     throw new UnsupportedOperationException("Not implemented yet");
   }
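
Note: the cluster-aware listFullNames is still a stub here; platform-specific DAOs are expected to override it. As a rough illustration of the kind of cluster-plus-prefix filtering an override might perform (the helper class, map layout, and names below are assumptions for illustration, not code from this commit):

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class ClusterScopedLookupSketch {
  // Hypothetical: dataset full names indexed by cluster; a real DAO would
  // query its metadata store instead of an in-memory map.
  static List<String> listFullNames(Map<String, List<String>> namesByCluster,
      String cluster, String prefix) {
    return namesByCluster.getOrDefault(cluster, Collections.emptyList())
        .stream()
        .filter(name -> name.startsWith(prefix))  // prefix scopes to one namespace
        .collect(Collectors.toList());
  }
}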

MetadataInventoryProcessor.java

@@ -88,6 +88,7 @@ public class MetadataInventoryProcessor extends KafkaMessageProcessor {
     final String platformUrn = event.dataPlatformUrn.toString();
     final String platform = getUrnEntity(platformUrn);
     final DataOrigin origin = event.dataOrigin;
+    final String cluster = event.deployment.cluster.toString(); // if null cluster, throw exception here
     final String namespace = event.namespace.toString();
 
     log.info("Processing MIE for " + platform + " " + origin + " " + namespace);
@@ -98,7 +99,7 @@ public class MetadataInventoryProcessor extends KafkaMessageProcessor {
     final List<String> names = event.nativeNames.stream().map(CharSequence::toString).collect(Collectors.toList());
     log.debug("new datasets: " + names);
 
-    final List<String> existingDatasets = _datasetViewDao.listFullNames(platform, origin.name(), namespace);
+    final List<String> existingDatasets = _datasetViewDao.listFullNames(platform, origin.name(), cluster, namespace);
     log.debug("existing datasets: " + existingDatasets);
 
     // find removed datasets by diff
@@ -109,7 +110,7 @@ public class MetadataInventoryProcessor extends KafkaMessageProcessor {
       identifier.dataOrigin = origin;
       identifier.nativeName = datasetName;
 
-      return mceDelete(identifier, actorUrn);
+      return mceDelete(identifier, event.deployment, actorUrn);
     }).collect(Collectors.toList());
   }
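
Note: per the "find removed datasets by diff" comment, the code around these hunks diffs existingDatasets (now scoped to the event's cluster) against names from the inventory event, then emits a DELETE MCE for each dataset that disappeared. A self-contained sketch of that set difference; only the two input lists correspond to variables in the diff above, everything else is illustrative:

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class RemovedDatasetDiffSketch {
  // Datasets present in the store but absent from the latest inventory are
  // treated as removed and become DELETE MCE candidates.
  static List<String> findRemoved(List<String> existingDatasets, List<String> names) {
    Set<String> current = new HashSet<>(names);
    return existingDatasets.stream()
        .filter(name -> !current.contains(name))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> existing = Arrays.asList("db.a", "db.b", "db.c");
    List<String> inventory = Arrays.asList("db.a", "db.c");
    System.out.println(findRemoved(existing, inventory));  // prints [db.b]
  }
}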

ProcessorUtil.java

@@ -16,9 +16,11 @@ package wherehows.utils;
 import com.linkedin.events.metadata.ChangeAuditStamp;
 import com.linkedin.events.metadata.ChangeType;
 import com.linkedin.events.metadata.DatasetIdentifier;
+import com.linkedin.events.metadata.DeploymentDetail;
 import com.linkedin.events.metadata.MetadataChangeEvent;
 import com.typesafe.config.Config;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -79,7 +81,8 @@ public class ProcessorUtil {
   /**
    * Create MCE to DELETE the dataset
    */
-  public static MetadataChangeEvent mceDelete(@Nonnull DatasetIdentifier dataset, String actor) {
+  public static MetadataChangeEvent mceDelete(@Nonnull DatasetIdentifier dataset, @Nonnull DeploymentDetail deployment,
+      String actor) {
     MetadataChangeEvent mce = new MetadataChangeEvent();
     mce.datasetIdentifier = dataset;
@@ -89,6 +92,8 @@ public class ProcessorUtil {
     auditStamp.type = ChangeType.DELETE;
     mce.changeAuditStamp = auditStamp;
 
+    mce.deploymentInfo = Collections.singletonList(deployment);
+
     return mce;
   }
 }
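
Note: mceDelete now threads the DeploymentDetail through to the emitted event; since MetadataChangeEvent.deploymentInfo is a list, the single detail is wrapped with Collections.singletonList (an immutable one-element list). A usage sketch mirroring the processor's call site; the field values are made up and DataOrigin.PROD is an assumed enum constant:

import com.linkedin.events.metadata.DataOrigin;
import com.linkedin.events.metadata.DatasetIdentifier;
import com.linkedin.events.metadata.DeploymentDetail;
import com.linkedin.events.metadata.MetadataChangeEvent;
import wherehows.utils.ProcessorUtil;

class MceDeleteUsageSketch {
  static MetadataChangeEvent example() {
    DeploymentDetail deployment = new DeploymentDetail();
    deployment.cluster = "holdem";  // illustrative cluster name

    DatasetIdentifier identifier = new DatasetIdentifier();
    identifier.dataOrigin = DataOrigin.PROD;     // assumed enum constant
    identifier.nativeName = "db.removed_table";  // illustrative dataset name

    // Emits a DELETE MCE stamped with the actor and carrying the deployment info.
    return ProcessorUtil.mceDelete(identifier, deployment, "tester");
  }
}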

ProcessorUtilTest.java

@@ -17,6 +17,7 @@ import com.google.common.collect.ImmutableSet;
 import com.linkedin.events.metadata.ChangeType;
 import com.linkedin.events.metadata.DataOrigin;
 import com.linkedin.events.metadata.DatasetIdentifier;
+import com.linkedin.events.metadata.DeploymentDetail;
 import com.linkedin.events.metadata.MetadataChangeEvent;
 import com.typesafe.config.Config;
 import java.util.ArrayList;
@@ -94,9 +95,14 @@ public class ProcessorUtilTest {
   public void testMceDelete() {
     String actor = "tester";
     DatasetIdentifier dataset = makeDataset("test");
-    MetadataChangeEvent mce = ProcessorUtil.mceDelete(dataset, actor);
+    DeploymentDetail deployment = new DeploymentDetail();
+    deployment.cluster = "foo";
+    MetadataChangeEvent mce = ProcessorUtil.mceDelete(dataset, deployment, actor);
 
     assertEquals(mce.datasetIdentifier, dataset);
+    assertEquals(mce.deploymentInfo, Collections.singletonList(deployment));
     assertEquals(mce.changeAuditStamp.type, ChangeType.DELETE);
     assertEquals(mce.changeAuditStamp.actorUrn, actor);
   }