Implement backfill API which uses SCSI

This commit is contained in:
Kerem Sahin 2020-08-12 20:46:54 -07:00 committed by John Plaisted
parent bc7a29802d
commit 64e5160365
6 changed files with 250 additions and 20 deletions

View File

@ -476,6 +476,46 @@ public class EbeanLocalDAOTest {
verifyNoMoreInteractions(_mockProducer);
}
@Test
public void testBackfillUsingSCSI() {
EbeanLocalDAO dao = new EbeanLocalDAO(EntityAspectUnion.class, _mockProducer, _server);
dao.enableLocalSecondaryIndex(true);
List<Urn> urns = ImmutableList.of(makeFooUrn(1), makeFooUrn(2), makeFooUrn(3));
Map<Urn, Map<Class<? extends RecordTemplate>, RecordTemplate>> aspects = new HashMap<>();
urns.forEach(urn -> {
AspectFoo aspectFoo = new AspectFoo().setValue("foo");
AspectBar aspectBar = new AspectBar().setValue("bar");
aspects.put(urn, ImmutableMap.of(AspectFoo.class, aspectFoo, AspectBar.class, aspectBar));
addMetadata(urn, AspectFoo.class.getCanonicalName(), 0, aspectFoo);
addMetadata(urn, AspectBar.class.getCanonicalName(), 0, aspectBar);
dao.processAndSaveUrnToLocalSecondaryIndex(urn);
});
// Backfill single aspect
Map<Urn, Optional<AspectFoo>> backfilledAspects1 = dao.backfill(AspectFoo.class, FooUrn.class, null, 3);
for (Urn urn: urns) {
RecordTemplate aspect = aspects.get(urn).get(AspectFoo.class);
assertEquals(backfilledAspects1.get(urn).get(), aspect);
verify(_mockProducer, times(1)).produceMetadataAuditEvent(urn, aspect, aspect);
}
clearInvocations(_mockProducer);
// Backfill set of aspects
Map<Urn, Map<Class<? extends RecordTemplate>, Optional<? extends RecordTemplate>>> backfilledAspects2 =
dao.backfill(ImmutableSet.of(AspectFoo.class, AspectBar.class), FooUrn.class, null, 3);
for (Urn urn: urns) {
for (Class<? extends RecordTemplate> clazz: aspects.get(urn).keySet()) {
RecordTemplate aspect = aspects.get(urn).get(clazz);
assertEquals(backfilledAspects2.get(urn).get(clazz).get(), aspect);
verify(_mockProducer, times(1)).produceMetadataAuditEvent(urn, aspect, aspect);
}
}
verifyNoMoreInteractions(_mockProducer);
}
@Test
public void testListVersions() {
EbeanLocalDAO dao = new EbeanLocalDAO(EntityAspectUnion.class, _mockProducer, _server);
@ -928,6 +968,32 @@ public class EbeanLocalDAOTest {
assertEquals(urns.getTotalCount(), 0);
}
@Test
void testListUrnsFromIndexForAnEntity() {
EbeanLocalDAO dao = new EbeanLocalDAO(EntityAspectUnion.class, _mockProducer, _server);
dao.enableLocalSecondaryIndex(true);
FooUrn urn1 = makeFooUrn(1);
FooUrn urn2 = makeFooUrn(2);
FooUrn urn3 = makeFooUrn(3);
BarUrn urn4 = makeBarUrn(4);
dao.processAndSaveUrnToLocalSecondaryIndex(urn1);
dao.processAndSaveUrnToLocalSecondaryIndex(urn2);
dao.processAndSaveUrnToLocalSecondaryIndex(urn3);
dao.processAndSaveUrnToLocalSecondaryIndex(urn4);
// List foo urns
ListResult<Urn> urns = dao.listUrns(FooUrn.class, null, 2);
assertEquals(urns.getValues(), Arrays.asList(urn1, urn2));
assertEquals(urns.getTotalCount(), 3);
// List bar urns
urns = dao.listUrns(BarUrn.class, null, 1);
assertEquals(urns.getValues(), Arrays.asList(urn4));
assertEquals(urns.getTotalCount(), 1);
}
private void addMetadata(Urn urn, String aspectName, long version, RecordTemplate metadata) {
EbeanMetadataAspect aspect = new EbeanMetadataAspect();
aspect.setKey(new EbeanMetadataAspect.PrimaryKey(urn.toString(), aspectName, version));

View File

@ -20,9 +20,12 @@ import com.linkedin.metadata.dao.retention.TimeBasedRetention;
import com.linkedin.metadata.dao.retention.VersionBasedRetention;
import com.linkedin.metadata.dao.storage.LocalDAOStorageConfig;
import com.linkedin.metadata.query.ExtraInfo;
import com.linkedin.metadata.query.IndexCriterion;
import com.linkedin.metadata.query.IndexCriterionArray;
import com.linkedin.metadata.query.IndexFilter;
import java.time.Clock;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@ -383,6 +386,16 @@ public abstract class BaseLocalDAO<ASPECT_UNION extends UnionTemplate, URN exten
@Nonnull
public abstract ListResult<Urn> listUrns(@Nonnull IndexFilter indexFilter, @Nullable URN lastUrn, int pageSize);
/**
* Similar to {@link #listUrns(IndexFilter, URN, int)}. This is to get all urns with type URN
*/
@Nonnull
public ListResult<Urn> listUrns(@Nonnull Class<URN> urnClazz, @Nullable URN lastUrn, int pageSize) {
final IndexFilter indexFilter = new IndexFilter()
.setCriteria(new IndexCriterionArray(new IndexCriterion().setAspect(urnClazz.getCanonicalName())));
return listUrns(indexFilter, lastUrn, pageSize);
}
/**
* Runs the given lambda expression in a transaction with a limited number of retries.
*
@ -505,6 +518,27 @@ public abstract class BaseLocalDAO<ASPECT_UNION extends UnionTemplate, URN exten
return urnToAspects;
}
/**
* Similar to {@link #backfill(Class, Set)} but fetches the set of URNs to backfill using local secondary index
*/
@Nonnull
public <ASPECT extends RecordTemplate> Map<URN, Optional<ASPECT>> backfill(
@Nonnull Class<ASPECT> aspectClass, @Nonnull Class<URN> urnClazz, @Nullable URN lastUrn, int pageSize) {
final ListResult<Urn> urnList = listUrns(urnClazz, lastUrn, pageSize);
return backfill(aspectClass, new HashSet(urnList.getValues()));
}
/**
* Similar to {@link #backfill(Set, Set)} but fetches the set of URNs to backfill using local secondary index
*/
@Nonnull
public Map<URN, Map<Class<? extends RecordTemplate>, Optional<? extends RecordTemplate>>> backfill(
@Nonnull Set<Class<? extends RecordTemplate>> aspectClasses, @Nonnull Class<URN> urnClazz,
@Nullable URN lastUrn, int pageSize) {
final ListResult<Urn> urnList = listUrns(urnClazz, lastUrn, pageSize);
return backfill(aspectClasses, new HashSet(urnList.getValues()));
}
/**
* Emits backfill MAE for an aspect of an entity and also backfills local secondary index if writes & backfill enabled
*

View File

@ -3,6 +3,7 @@ package com.linkedin.metadata.restli;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.StringArray;
import com.linkedin.data.template.UnionTemplate;
import com.linkedin.metadata.dao.AspectKey;
import com.linkedin.metadata.dao.BaseLocalDAO;
@ -24,9 +25,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
@ -209,10 +212,12 @@ public abstract class BaseEntityResource<
/**
* An action method for emitting MAE backfill messages for an entity.
*
* @deprecated Use {@link #backfill(String[], String[])} instead
*/
@Action(name = ACTION_BACKFILL)
@Action(name = ACTION_BACKFILL_LEGACY)
@Nonnull
public Task<String[]> backfill(@ActionParam(PARAM_URN) @Nonnull String urnString,
public Task<BackfillResult> backfill(@ActionParam(PARAM_URN) @Nonnull String urnString,
@ActionParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames) {
return RestliUtils.toTask(() -> {
@ -222,25 +227,66 @@ public abstract class BaseEntityResource<
.filter(optionalAspect -> optionalAspect.isPresent())
.map(optionalAspect -> ModelUtils.getAspectName(optionalAspect.get().getClass()))
.collect(Collectors.toList());
return backfilledAspects.toArray(new String[0]);
return new BackfillResult().setEntities(new BackfillResultEntityArray(Collections.singleton(
new BackfillResultEntity().setUrn(urn).setAspects(new StringArray(backfilledAspects))
)));
});
}
/**
* An action method for emitting MAE backfill messages for a set of entities.
*/
@Action(name = ACTION_BATCH_BACKFILL)
@Action(name = ACTION_BACKFILL_WITH_URNS)
@Nonnull
public Task<Void> batchBackfill(@ActionParam(PARAM_URNS) @Nonnull String[] urns,
@ActionParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames) {
public Task<BackfillResult> backfill(@ActionParam(PARAM_URNS) @Nonnull String[] urns,
@ActionParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames) {
return RestliUtils.toTask(() -> {
final Set<URN> urnSet = Arrays.stream(urns).map(urnString -> parseUrnParam(urnString)).collect(Collectors.toSet());
getLocalDAO().backfill(parseAspectsParam(aspectNames), urnSet);
return null;
return buildBackfillResult(getLocalDAO().backfill(parseAspectsParam(aspectNames), urnSet));
});
}
/**
* An action method for emitting MAE backfill messages for a set of entities using SCSI.
*/
@Action(name = ACTION_BACKFILL)
@Nonnull
public Task<BackfillResult> backfill(@ActionParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames,
@ActionParam(PARAM_URN) @Optional @Nullable String lastUrn,
@ActionParam(PARAM_LIMIT) int limit) {
return RestliUtils.toTask(() ->
buildBackfillResult(getLocalDAO().backfill(parseAspectsParam(aspectNames),
_urnClass,
parseUrnParam(lastUrn),
limit)));
}
@Nonnull
private BackfillResult buildBackfillResult(@Nonnull Map<URN, Map<Class<? extends RecordTemplate>,
java.util.Optional<? extends RecordTemplate>>> backfilledAspects) {
final Set<URN> urns = new TreeSet<>(Comparator.comparing(Urn::toString));
urns.addAll(backfilledAspects.keySet());
return new BackfillResult().setEntities(new BackfillResultEntityArray(
urns.stream().map(urn -> buildBackfillResultEntity(urn, backfilledAspects.get(urn)))
.collect(Collectors.toList())));
}
@Nonnull
private BackfillResultEntity buildBackfillResultEntity(@Nonnull URN urn, Map<Class<? extends RecordTemplate>,
java.util.Optional<? extends RecordTemplate>> aspectMap) {
return new BackfillResultEntity()
.setUrn(urn)
.setAspects(new StringArray(aspectMap.entrySet().stream()
.filter(aspect -> aspect.getValue().isPresent())
.map(aspect -> aspect.getKey().getCanonicalName())
.collect(Collectors.toList()))
);
}
/**
* For strongly consistent local secondary index, this provides {@link IndexFilter} which uses FQCN of the entity urn to filter
* on the aspect field of the index table. This serves the purpose of returning urns that are of given entity type from index table.
@ -272,7 +318,7 @@ public abstract class BaseEntityResource<
return RestliUtils.toTask(() ->
getLocalDAO()
.listUrns(filter, lastUrn == null ? null : parseUrnParam(lastUrn), limit)
.listUrns(filter, parseUrnParam(lastUrn), limit)
.getValues()
.stream()
.map(Urn::toString)
@ -341,8 +387,12 @@ public abstract class BaseEntityResource<
return ModelUtils.newSnapshot(_snapshotClass, urn, aspects);
}
@Nonnull
private URN parseUrnParam(@Nonnull String urnString) {
@Nullable
private URN parseUrnParam(@Nullable String urnString) {
if (urnString == null) {
return null;
}
try {
return createUrnFromString(urnString);
} catch (Exception e) {

View File

@ -7,7 +7,8 @@ public final class RestliConstants {
public static final String ACTION_AUTOCOMPLETE = "autocomplete";
public static final String ACTION_BACKFILL = "backfill";
public static final String ACTION_BATCH_BACKFILL = "batchBackfill";
public static final String ACTION_BACKFILL_WITH_URNS = "backfillWithUrns";
public static final String ACTION_BACKFILL_LEGACY = "backfillLegacy";
public static final String ACTION_BROWSE = "browse";
public static final String ACTION_GET_BROWSE_PATHS = "getBrowsePaths";
public static final String ACTION_GET_SNAPSHOT = "getSnapshot";

View File

@ -0,0 +1,23 @@
namespace com.linkedin.metadata.restli
import com.linkedin.common.Urn
/**
* The model for the result of a backfill
*/
record BackfillResult {
/**
* List of backfilled entities
*/
entities: array[record BackfillResultEntity {
/**
* Urn of the backfilled entity
*/
urn: Urn
/**
* List of the aspects backfilled for the entity
*/
aspects: array[string]
}]
}

View File

@ -315,9 +315,14 @@ public class BaseEntityResourceTest extends BaseEngineTest {
when(_mockLocalDAO.backfill(AspectFoo.class, urn)).thenReturn(Optional.of(foo));
String[] aspectNames = new String[]{ModelUtils.getAspectName(AspectFoo.class)};
String[] backfilledAspects = runAndWait(_resource.backfill(urn.toString(), aspectNames));
BackfillResult backfillResult = runAndWait(_resource.backfill(urn.toString(), aspectNames));
assertEquals(ImmutableSet.copyOf(backfilledAspects), ImmutableSet.of(ModelUtils.getAspectName(AspectFoo.class)));
assertEquals(backfillResult.getEntities().size(), 1);
BackfillResultEntity backfillResultEntity = backfillResult.getEntities().get(0);
assertEquals(backfillResultEntity.getUrn(), urn);
assertEquals(backfillResultEntity.getAspects().size(), 1);
assertEquals(backfillResultEntity.getAspects().get(0), ModelUtils.getAspectName(AspectFoo.class));
}
@Test
@ -328,9 +333,14 @@ public class BaseEntityResourceTest extends BaseEngineTest {
when(_mockLocalDAO.backfill(AspectFoo.class, urn)).thenReturn(Optional.of(foo));
when(_mockLocalDAO.backfill(AspectBar.class, urn)).thenReturn(Optional.of(bar));
String[] backfilledAspects = runAndWait(_resource.backfill(urn.toString(), null));
BackfillResult backfillResult = runAndWait(_resource.backfill(urn.toString(), null));
assertEquals(ImmutableSet.copyOf(backfilledAspects),
assertEquals(backfillResult.getEntities().size(), 1);
BackfillResultEntity backfillResultEntity = backfillResult.getEntities().get(0);
assertEquals(backfillResultEntity.getUrn(), urn);
assertEquals(backfillResultEntity.getAspects().size(), 2);
assertEquals(ImmutableSet.copyOf(backfillResultEntity.getAspects()),
ImmutableSet.of(ModelUtils.getAspectName(AspectFoo.class), ModelUtils.getAspectName(AspectBar.class)));
}
@ -350,12 +360,58 @@ public class BaseEntityResourceTest extends BaseEngineTest {
public void testBatchBackfill() {
Urn urn1 = makeUrn(1);
Urn urn2 = makeUrn(2);
AspectFoo foo1 = new AspectFoo().setValue("foo1");
AspectBar bar1 = new AspectBar().setValue("bar1");
AspectBar bar2 = new AspectBar().setValue("bar2");
String[] aspects = new String[] {"com.linkedin.testing.AspectFoo", "com.linkedin.testing.AspectBar"};
when(_mockLocalDAO.backfill(_resource.parseAspectsParam(aspects), ImmutableSet.of(urn1, urn2)))
.thenReturn(ImmutableMap.of(urn1, ImmutableMap.of(AspectFoo.class, Optional.of(foo1), AspectBar.class, Optional.of(bar1)),
urn2, ImmutableMap.of(AspectBar.class, Optional.of(bar2))));
runAndWait(_resource.batchBackfill(new String[]{urn1.toString(), urn2.toString()},
new String[] {"com.linkedin.testing.AspectFoo", "com.linkedin.testing.AspectBar"}));
BackfillResult backfillResult = runAndWait(_resource.backfill(new String[]{urn1.toString(), urn2.toString()}, aspects));
assertEquals(backfillResult.getEntities().size(), 2);
verify(_mockLocalDAO, times(1))
.backfill(ImmutableSet.of(AspectFoo.class, AspectBar.class), ImmutableSet.of(urn1, urn2));
// Test first entity
BackfillResultEntity backfillResultEntity = backfillResult.getEntities().get(0);
assertEquals(backfillResultEntity.getUrn(), urn1);
assertEquals(backfillResultEntity.getAspects().size(), 2);
assertTrue(backfillResultEntity.getAspects().contains("com.linkedin.testing.AspectFoo"));
assertTrue(backfillResultEntity.getAspects().contains("com.linkedin.testing.AspectBar"));
// Test second entity
backfillResultEntity = backfillResult.getEntities().get(1);
assertEquals(backfillResultEntity.getUrn(), urn2);
assertEquals(backfillResultEntity.getAspects().size(), 1);
assertTrue(backfillResultEntity.getAspects().contains("com.linkedin.testing.AspectBar"));
}
@Test
public void testBackfillUsingSCSI() {
Urn urn1 = makeUrn(1);
Urn urn2 = makeUrn(2);
AspectFoo foo1 = new AspectFoo().setValue("foo1");
AspectBar bar1 = new AspectBar().setValue("bar1");
AspectBar bar2 = new AspectBar().setValue("bar2");
String[] aspects = new String[] {"com.linkedin.testing.AspectFoo", "com.linkedin.testing.AspectBar"};
when(_mockLocalDAO.backfill(_resource.parseAspectsParam(aspects), Urn.class, null, 10))
.thenReturn(ImmutableMap.of(urn1, ImmutableMap.of(AspectFoo.class, Optional.of(foo1), AspectBar.class, Optional.of(bar1)),
urn2, ImmutableMap.of(AspectBar.class, Optional.of(bar2))));
BackfillResult backfillResult = runAndWait(_resource.backfill(aspects, null, 10));
assertEquals(backfillResult.getEntities().size(), 2);
// Test first entity
BackfillResultEntity backfillResultEntity = backfillResult.getEntities().get(0);
assertEquals(backfillResultEntity.getUrn(), urn1);
assertEquals(backfillResultEntity.getAspects().size(), 2);
assertTrue(backfillResultEntity.getAspects().contains("com.linkedin.testing.AspectFoo"));
assertTrue(backfillResultEntity.getAspects().contains("com.linkedin.testing.AspectBar"));
// Test second entity
backfillResultEntity = backfillResult.getEntities().get(1);
assertEquals(backfillResultEntity.getUrn(), urn2);
assertEquals(backfillResultEntity.getAspects().size(), 1);
assertTrue(backfillResultEntity.getAspects().contains("com.linkedin.testing.AspectBar"));
}
@Test