perf(ingest): changes to improve ingest performance a bit (#3837)

This commit is contained in:
Aseem Bansal 2022-01-06 21:37:16 +05:30 committed by GitHub
parent 34c27f076b
commit a56f0661d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 346 additions and 116 deletions

View File

@ -157,7 +157,7 @@ public class DataMigrationStep implements UpgradeStep {
browsePathsStamp.setActor(Urn.createFromString(Constants.SYSTEM_ACTOR));
browsePathsStamp.setTime(System.currentTimeMillis());
_entityService.ingestAspect(urn, BROWSE_PATHS_ASPECT_NAME, browsePaths, browsePathsStamp);
_entityService.ingestAspect(urn, BROWSE_PATHS_ASPECT_NAME, browsePaths, browsePathsStamp, null);
urnsWithBrowsePath.add(urn);
} catch (URISyntaxException e) {

View File

@ -55,7 +55,7 @@ In the JVM dashboard, you can find detailed charts based on JVM metrics like CPU
dashboard, you can find charts to monitor each endpoint and the kafka topics. Using the example implementation, go
to http://localhost:3001 to find the grafana dashboards! (Username: admin, PW: admin)
To make it easy to track various metrics within the code base, we created MetricsUtil class. This util class creates a
To make it easy to track various metrics within the code base, we created MetricUtils class. This util class creates a
central metric registry, sets up the JMX reporter, and provides convenient functions for setting up counters and timers.
You can run the following to create a counter and increment.

View File

@ -57,6 +57,8 @@ dependencies {
test {
// https://docs.gradle.org/current/userguide/performance.html
maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) ?: 1
testLogging.showStandardStreams = true
testLogging.exceptionFormat = 'full'
}
tasks.withType(Test) {

View File

@ -43,6 +43,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.HashSet;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
@ -122,6 +123,9 @@ public abstract class EntityService {
@Nonnull final Set<Urn> urns,
@Nonnull final Set<String> aspectNames);
public abstract Map<String, RecordTemplate> getLatestAspectsForUrn(@Nonnull final Urn urn, @Nonnull final Set<String> aspectNames);
/**
* Retrieves an aspect having a specific {@link Urn}, name, & version.
*
@ -232,7 +236,46 @@ public abstract class EntityService {
@Nonnull
protected abstract UpdateAspectResult ingestAspectToLocalDB(@Nonnull final Urn urn, @Nonnull final String aspectName,
@Nonnull final Function<Optional<RecordTemplate>, RecordTemplate> updateLambda,
@Nonnull final AuditStamp auditStamp, @Nonnull final SystemMetadata providedSystemMetadata);
@Nonnull final AuditStamp auditStamp, @Nonnull final SystemMetadata systemMetadata);
/**
* Same as ingestAspectToLocalDB but for multiple aspects
*/
@Nonnull
protected abstract List<Pair<String, UpdateAspectResult>> ingestAspectsToLocalDB(@Nonnull final Urn urn,
@Nonnull List<Pair<String, RecordTemplate>> aspectRecordsToIngest,
@Nonnull final AuditStamp auditStamp, @Nonnull final SystemMetadata providedSystemMetadata);
@Nonnull
private SystemMetadata generateSystemMetadataIfEmpty(SystemMetadata systemMetadata) {
if (systemMetadata == null) {
systemMetadata = new SystemMetadata();
systemMetadata.setRunId(DEFAULT_RUN_ID);
systemMetadata.setLastObserved(System.currentTimeMillis());
}
return systemMetadata;
}
private void validateUrn(@Nonnull final Urn urn) {
if (!urn.toString().trim().equals(urn.toString())) {
throw new IllegalArgumentException("Error: cannot provide an URN with leading or trailing whitespace");
}
}
public void ingestAspects(@Nonnull final Urn urn, @Nonnull List<Pair<String, RecordTemplate>> aspectRecordsToIngest,
@Nonnull final AuditStamp auditStamp, SystemMetadata systemMetadata) {
validateUrn(urn);
systemMetadata = generateSystemMetadataIfEmpty(systemMetadata);
Timer.Context ingestToLocalDBTimer = MetricUtils.timer(this.getClass(), "ingestAspectsToLocalDB").time();
List<Pair<String, UpdateAspectResult>> ingestResults = ingestAspectsToLocalDB(urn, aspectRecordsToIngest, auditStamp, systemMetadata);
ingestToLocalDBTimer.stop();
for (Pair<String, UpdateAspectResult> result: ingestResults) {
sendEventForUpdateAspectResult(urn, result.getFirst(), result.getSecond());
}
}
/**
* Ingests (inserts) a new version of an entity aspect & emits a {@link com.linkedin.mxe.MetadataAuditEvent}.
@ -252,14 +295,19 @@ public abstract class EntityService {
log.debug("Invoked ingestAspect with urn: {}, aspectName: {}, newValue: {}", urn, aspectName, newValue);
if (!urn.toString().trim().equals(urn.toString())) {
throw new IllegalArgumentException("Error: cannot provide an URN with leading or trailing whitespace");
}
validateUrn(urn);
systemMetadata = generateSystemMetadataIfEmpty(systemMetadata);
Timer.Context ingestToLocalDBTimer = MetricUtils.timer(this.getClass(), "ingestAspectToLocalDB").time();
UpdateAspectResult result = ingestAspectToLocalDB(urn, aspectName, ignored -> newValue, auditStamp, systemMetadata);
ingestToLocalDBTimer.stop();
return sendEventForUpdateAspectResult(urn, aspectName, result);
}
private RecordTemplate sendEventForUpdateAspectResult(@Nonnull final Urn urn, @Nonnull final String aspectName,
@Nonnull UpdateAspectResult result) {
final RecordTemplate oldValue = result.getOldValue();
final RecordTemplate updatedValue = result.getNewValue();
final SystemMetadata oldSystemMetadata = result.getOldSystemMetadata();
@ -268,7 +316,7 @@ public abstract class EntityService {
// Apply retention policies asynchronously if there was an update to existing aspect value
if (oldValue != updatedValue && oldValue != null && retentionService != null) {
retentionService.applyRetention(urn, aspectName,
Optional.of(new RetentionService.RetentionContext(Optional.of(result.maxVersion))));
Optional.of(new RetentionService.RetentionContext(Optional.of(result.maxVersion))));
}
// Produce MAE after a successful update
@ -292,23 +340,12 @@ public abstract class EntityService {
result.getNewSystemMetadata(), MetadataAuditOperation.UPDATE);
produceMAETimer.stop();
} else {
log.debug(
String.format("Skipped producing MetadataAuditEvent for ingested aspect %s, urn %s. Aspect has not changed.",
aspectName, urn));
log.debug("Skipped producing MetadataAuditEvent for ingested aspect {}, urn {}. Aspect has not changed.",
aspectName, urn);
}
return updatedValue;
}
public RecordTemplate ingestAspect(@Nonnull final Urn urn, @Nonnull final String aspectName,
@Nonnull final RecordTemplate newValue, @Nonnull final AuditStamp auditStamp) {
SystemMetadata generatedSystemMetadata = new SystemMetadata();
generatedSystemMetadata.setLastObserved(System.currentTimeMillis());
return ingestAspect(urn, aspectName, newValue, auditStamp, generatedSystemMetadata);
}
public IngestProposalResult ingestProposal(@Nonnull MetadataChangeProposal metadataChangeProposal,
AuditStamp auditStamp) {
@ -343,17 +380,12 @@ public abstract class EntityService {
ValidationUtils.validateOrThrow(aspect);
} catch (ModelConversionException e) {
throw new RuntimeException(
String.format("Could not deserialize {} for aspect {}", metadataChangeProposal.getAspect().getValue(),
String.format("Could not deserialize %s for aspect %s", metadataChangeProposal.getAspect().getValue(),
metadataChangeProposal.getAspectName()));
}
log.debug("aspect = {}", aspect);
SystemMetadata systemMetadata = metadataChangeProposal.getSystemMetadata();
if (systemMetadata == null) {
systemMetadata = new SystemMetadata();
systemMetadata.setRunId(DEFAULT_RUN_ID);
systemMetadata.setLastObserved(System.currentTimeMillis());
}
SystemMetadata systemMetadata = generateSystemMetadataIfEmpty(metadataChangeProposal.getSystemMetadata());
systemMetadata.setRegistryName(aspectSpec.getRegistryName());
systemMetadata.setRegistryVersion(aspectSpec.getRegistryVersion().toString());
@ -380,8 +412,7 @@ public abstract class EntityService {
}
if (oldAspect != newAspect || getAlwaysEmitAuditEvent()) {
log.debug(String.format("Producing MetadataChangeLog for ingested aspect %s, urn %s",
metadataChangeProposal.getAspectName(), entityUrn));
log.debug("Producing MetadataChangeLog for ingested aspect {}, urn {}", metadataChangeProposal.getAspectName(), entityUrn);
final MetadataChangeLog metadataChangeLog = new MetadataChangeLog(metadataChangeProposal.data());
if (oldAspect != null) {
@ -397,13 +428,13 @@ public abstract class EntityService {
metadataChangeLog.setSystemMetadata(newSystemMetadata);
}
log.debug(String.format("Serialized MCL event: %s", metadataChangeLog));
log.debug("Serialized MCL event: {}", metadataChangeLog);
// Since only timeseries aspects are ingested as of now, simply produce mae event for it
produceMetadataChangeLog(entityUrn, aspectSpec, metadataChangeLog);
} else {
log.debug(
String.format("Skipped producing MetadataAuditEvent for ingested aspect %s, urn %s. Aspect has not changed.",
metadataChangeProposal.getAspectName(), entityUrn));
"Skipped producing MetadataChangeLog for ingested aspect {}, urn {}. Aspect has not changed.",
metadataChangeProposal.getAspectName(), entityUrn);
}
return new IngestProposalResult(entityUrn, oldAspect != newAspect);
@ -454,7 +485,7 @@ public abstract class EntityService {
* @return a map of {@link Urn} to {@link Entity} object
*/
public Map<Urn, Entity> getEntities(@Nonnull final Set<Urn> urns, @Nonnull Set<String> aspectNames) {
log.debug(String.format("Invoked getEntities with urns %s, aspects %s", urns, aspectNames));
log.debug("Invoked getEntities with urns {}, aspects {}", urns, aspectNames);
if (urns.isEmpty()) {
return Collections.emptyMap();
}
@ -532,13 +563,13 @@ public abstract class EntityService {
}
public RecordTemplate getLatestAspect(@Nonnull final Urn urn, @Nonnull final String aspectName) {
log.debug(String.format("Invoked getLatestAspect with urn %s, aspect %s", urn, aspectName));
log.debug("Invoked getLatestAspect with urn {}, aspect {}", urn, aspectName);
return getAspect(urn, aspectName, ASPECT_LATEST_VERSION);
}
public void ingestEntities(@Nonnull final List<Entity> entities, @Nonnull final AuditStamp auditStamp,
@Nonnull final List<SystemMetadata> systemMetadata) {
log.debug(String.format("Invoked ingestEntities with entities %s, audit stamp %s", entities, auditStamp));
log.debug("Invoked ingestEntities with entities {}, audit stamp {}", entities, auditStamp);
Streams.zip(entities.stream(), systemMetadata.stream(), (a, b) -> new Pair<Entity, SystemMetadata>(a, b))
.forEach(pair -> ingestEntity(pair.getFirst(), auditStamp, pair.getSecond()));
}
@ -553,8 +584,7 @@ public abstract class EntityService {
public void ingestEntity(@Nonnull Entity entity, @Nonnull AuditStamp auditStamp,
@Nonnull SystemMetadata systemMetadata) {
log.debug(String.format("Invoked ingestEntity with entity %s, audit stamp %s systemMetadata %s", entity, auditStamp,
systemMetadata.toString()));
log.debug("Invoked ingestEntity with entity {}, audit stamp {} systemMetadata {}", entity, auditStamp, systemMetadata.toString());
ingestSnapshotUnion(entity.getValue(), auditStamp, systemMetadata);
}
@ -585,20 +615,44 @@ public abstract class EntityService {
.collect(Collectors.toList())));
}
/**
Returns true if entityType should have some aspect as per its definition
but aspects given does not have that aspect
*/
private boolean isAspectProvided(String entityType, String aspectName, Set<String> aspects) {
return _entityRegistry.getEntitySpec(entityType).getAspectSpecMap().containsKey(aspectName)
&& !aspects.contains(aspectName);
}
public List<Pair<String, RecordTemplate>> generateDefaultAspectsIfMissing(@Nonnull final Urn urn,
Set<String> includedAspects) {
Set<String> aspectsToGet = new HashSet<>();
String entityType = urnToEntityName(urn);
boolean shouldCheckBrowsePath = isAspectProvided(entityType, BROWSE_PATHS, includedAspects);
if (shouldCheckBrowsePath) {
aspectsToGet.add(BROWSE_PATHS);
}
boolean shouldCheckDataPlatform = isAspectProvided(entityType, DATA_PLATFORM_INSTANCE, includedAspects);
if (shouldCheckDataPlatform) {
aspectsToGet.add(DATA_PLATFORM_INSTANCE);
}
List<Pair<String, RecordTemplate>> aspects = new ArrayList<>();
final String keyAspectName = getKeyAspectName(urn);
RecordTemplate keyAspect = getLatestAspect(urn, keyAspectName);
aspectsToGet.add(keyAspectName);
Map<String, RecordTemplate> latestAspects = getLatestAspectsForUrn(urn, aspectsToGet);
RecordTemplate keyAspect = latestAspects.get(keyAspectName);
if (keyAspect == null) {
keyAspect = buildKeyAspect(urn);
aspects.add(Pair.of(keyAspectName, keyAspect));
}
String entityType = urnToEntityName(urn);
if (_entityRegistry.getEntitySpec(entityType).getAspectSpecMap().containsKey(BROWSE_PATHS)
&& getLatestAspect(urn, BROWSE_PATHS) == null && !includedAspects.contains(BROWSE_PATHS)) {
if (shouldCheckBrowsePath && latestAspects.get(BROWSE_PATHS) == null) {
try {
BrowsePaths generatedBrowsePath = BrowsePathUtils.buildBrowsePath(urn, getEntityRegistry());
if (generatedBrowsePath != null) {
@ -609,8 +663,7 @@ public abstract class EntityService {
}
}
if (_entityRegistry.getEntitySpec(entityType).getAspectSpecMap().containsKey(DATA_PLATFORM_INSTANCE)
&& getLatestAspect(urn, DATA_PLATFORM_INSTANCE) == null && !includedAspects.contains(DATA_PLATFORM_INSTANCE)) {
if (shouldCheckDataPlatform && latestAspects.get(DATA_PLATFORM_INSTANCE) == null) {
DataPlatformInstanceUtils.buildDataPlatformInstance(entityType, keyAspect)
.ifPresent(aspect -> aspects.add(Pair.of(DATA_PLATFORM_INSTANCE, aspect)));
}
@ -629,9 +682,7 @@ public abstract class EntityService {
aspectRecordsToIngest.addAll(generateDefaultAspectsIfMissing(urn,
aspectRecordsToIngest.stream().map(pair -> pair.getFirst()).collect(Collectors.toSet())));
aspectRecordsToIngest.forEach(aspectNamePair -> {
ingestAspect(urn, aspectNamePair.getFirst(), aspectNamePair.getSecond(), auditStamp, systemMetadata);
});
ingestAspects(urn, aspectRecordsToIngest, auditStamp, systemMetadata);
}
public Snapshot buildSnapshot(@Nonnull final Urn urn, @Nonnull final RecordTemplate aspectValue) {
@ -707,7 +758,7 @@ public abstract class EntityService {
try {
return Urn.createFromString(urnStr);
} catch (URISyntaxException e) {
log.error(String.format("Failed to convert urn string %s into Urn object", urnStr));
log.error("Failed to convert urn string {} into Urn object", urnStr);
throw new ModelConversionException(String.format("Failed to convert urn string %s into Urn object ", urnStr), e);
}
}

View File

@ -12,6 +12,8 @@ import com.linkedin.metadata.query.ListResultMetadata;
import com.linkedin.metadata.search.utils.QueryUtils;
import io.ebean.DuplicateKeyException;
import io.ebean.EbeanServer;
import io.ebean.ExpressionList;
import io.ebean.Junction;
import io.ebean.PagedList;
import io.ebean.Query;
import io.ebean.RawSql;
@ -101,7 +103,8 @@ public class EbeanAspectDao {
@Nonnull final String newActor,
@Nullable final String newImpersonator,
@Nonnull final Timestamp newTime,
@Nullable final String newSystemMetadata
@Nullable final String newSystemMetadata,
final Long nextVersion
) {
validateConnection();
if (!_canWrite) {
@ -110,7 +113,7 @@ public class EbeanAspectDao {
// Save oldValue as the largest version + 1
long largestVersion = 0;
if (oldAspectMetadata != null && oldTime != null) {
largestVersion = getNextVersion(urn, aspectName);
largestVersion = nextVersion;
saveAspect(urn, aspectName, oldAspectMetadata, oldActor, oldImpersonator, oldTime, oldSystemMetadata, largestVersion, true);
}
@ -441,6 +444,7 @@ public class EbeanAspectDao {
T result = null;
do {
try (Transaction transaction = _server.beginTransaction(TxIsolation.REPEATABLE_READ)) {
transaction.setBatchMode(true);
result = block.get();
transaction.commit();
lastException = null;
@ -457,7 +461,7 @@ public class EbeanAspectDao {
return result;
}
private long getNextVersion(@Nonnull final String urn, @Nonnull final String aspectName) {
public long getNextVersion(@Nonnull final String urn, @Nonnull final String aspectName) {
validateConnection();
final List<EbeanAspectV2.PrimaryKey> result = _server.find(EbeanAspectV2.class)
.where()
@ -471,6 +475,40 @@ public class EbeanAspectDao {
return result.isEmpty() ? 0 : result.get(0).getVersion() + 1L;
}
public Map<String, Long> getNextVersions(@Nonnull final String urn, @Nonnull final Set<String> aspectNames) {
Map<String, Long> result = new HashMap<>();
Junction<EbeanAspectV2> queryJunction = _server.find(EbeanAspectV2.class)
.select("aspect, max(version)")
.where()
.eq("urn", urn)
.or();
ExpressionList<EbeanAspectV2> exp = null;
for (String aspectName: aspectNames) {
if (exp == null) {
exp = queryJunction.eq("aspect", aspectName);
} else {
exp = exp.eq("aspect", aspectName);
}
}
if (exp == null) {
return result;
}
List<EbeanAspectV2.PrimaryKey> dbResults = exp.endOr().findIds();
for (EbeanAspectV2.PrimaryKey key: dbResults) {
result.put(key.getAspect(), key.getVersion());
}
for (String aspectName: aspectNames) {
long nextVal = 0L;
if (result.containsKey(aspectName)) {
nextVal = result.get(aspectName) + 1L;
}
result.put(aspectName, nextVal);
}
return result;
}
@Nonnull
private <T> ListResult<T> toListResult(
@Nonnull final List<T> values,

View File

@ -30,8 +30,10 @@ import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.metadata.utils.PegasusUtils;
import com.linkedin.mxe.MetadataAuditOperation;
import com.linkedin.mxe.SystemMetadata;
import com.linkedin.util.Pair;
import java.net.URISyntaxException;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -73,12 +75,24 @@ public class EbeanEntityService extends EntityService {
_entityDao = entityDao;
}
@Override
@Nonnull Map<String, EbeanAspectV2> getLatestEbeanAspectForUrn(@Nonnull final Urn urn,
@Nonnull final Set<String> aspectNames) {
Set<Urn> urns = new HashSet<>();
urns.add(urn);
Map<String, EbeanAspectV2> result = new HashMap<>();
getLatestAspectEbeans(urns, aspectNames).forEach((key, aspectEntry) -> {
final String aspectName = key.getAspect();
result.put(aspectName, aspectEntry);
});
return result;
}
@Nonnull
public Map<Urn, List<RecordTemplate>> getLatestAspects(@Nonnull final Set<Urn> urns,
private Map<EbeanAspectV2.PrimaryKey, EbeanAspectV2> getLatestAspectEbeans(@Nonnull final Set<Urn> urns,
@Nonnull final Set<String> aspectNames) {
log.debug(String.format("Invoked getLatestAspects with urns: %s, aspectNames: %s", urns, aspectNames));
log.debug("Invoked getLatestAspects with urns: {}, aspectNames: {}", urns, aspectNames);
// Create DB keys
final Set<EbeanAspectV2.PrimaryKey> dbKeys = urns.stream().map(urn -> {
@ -88,6 +102,33 @@ public class EbeanEntityService extends EntityService {
.collect(Collectors.toList());
}).flatMap(List::stream).collect(Collectors.toSet());
Map<EbeanAspectV2.PrimaryKey, EbeanAspectV2> batchGetResults = new HashMap<>();
Iterators.partition(dbKeys.iterator(), 500)
.forEachRemaining(batch -> batchGetResults.putAll(_entityDao.batchGet(ImmutableSet.copyOf(batch))));
return batchGetResults;
}
@Override
@Nonnull
public Map<String, RecordTemplate> getLatestAspectsForUrn(@Nonnull final Urn urn, @Nonnull final Set<String> aspectNames) {
Map<EbeanAspectV2.PrimaryKey, EbeanAspectV2> batchGetResults = getLatestAspectEbeans(new HashSet<>(Arrays.asList(urn)), aspectNames);
final Map<String, RecordTemplate> result = new HashMap<>();
batchGetResults.forEach((key, aspectEntry) -> {
final String aspectName = key.getAspect();
final RecordTemplate aspectRecord = toAspectRecord(urn, aspectName, aspectEntry.getMetadata(), getEntityRegistry());
result.put(aspectName, aspectRecord);
});
return result;
}
@Override
@Nonnull
public Map<Urn, List<RecordTemplate>> getLatestAspects(@Nonnull final Set<Urn> urns,
@Nonnull final Set<String> aspectNames) {
Map<EbeanAspectV2.PrimaryKey, EbeanAspectV2> batchGetResults = getLatestAspectEbeans(urns, aspectNames);
// Fetch from db and populate urn -> aspect map.
final Map<Urn, List<RecordTemplate>> urnToAspects = new HashMap<>();
@ -102,10 +143,6 @@ public class EbeanEntityService extends EntityService {
urnToAspects.get(key).add(keyAspect);
});
Map<EbeanAspectV2.PrimaryKey, EbeanAspectV2> batchGetResults = new HashMap<>();
Iterators.partition(dbKeys.iterator(), 500)
.forEachRemaining(batch -> batchGetResults.putAll(_entityDao.batchGet(ImmutableSet.copyOf(batch))));
batchGetResults.forEach((key, aspectEntry) -> {
final Urn urn = toUrn(key.getUrn());
final String aspectName = key.getAspect();
@ -139,7 +176,7 @@ public class EbeanEntityService extends EntityService {
@Nullable
public RecordTemplate getAspect(@Nonnull final Urn urn, @Nonnull final String aspectName, @Nonnull long version) {
log.debug(String.format("Invoked getAspect with urn: %s, aspectName: %s, version: %s", urn, aspectName, version));
log.debug("Invoked getAspect with urn: {}, aspectName: {}, version: {}", urn, aspectName, version);
version = calculateVersionNumber(urn, aspectName, version);
final EbeanAspectV2.PrimaryKey primaryKey = new EbeanAspectV2.PrimaryKey(urn.toString(), aspectName, version);
@ -218,8 +255,7 @@ public class EbeanEntityService extends EntityService {
@Override
public VersionedAspect getVersionedAspect(@Nonnull Urn urn, @Nonnull String aspectName, long version) {
log.debug(String.format("Invoked getVersionedAspect with urn: %s, aspectName: %s, version: %s", urn, aspectName,
version));
log.debug("Invoked getVersionedAspect with urn: {}, aspectName: {}, version: {}", urn, aspectName, version);
VersionedAspect result = new VersionedAspect();
@ -249,9 +285,8 @@ public class EbeanEntityService extends EntityService {
public ListResult<RecordTemplate> listLatestAspects(@Nonnull final String entityName,
@Nonnull final String aspectName, final int start, final int count) {
log.debug(
String.format("Invoked listLatestAspects with entityName: %s, aspectName: %s, start: %s, count: %s", entityName,
aspectName, start, count));
log.debug("Invoked listLatestAspects with entityName: {}, aspectName: {}, start: {}, count: {}", entityName,
aspectName, start, count);
final ListResult<String> aspectMetadataList =
_entityDao.listLatestAspectMetadata(entityName, aspectName, start, count);
@ -274,54 +309,92 @@ public class EbeanEntityService extends EntityService {
@Nonnull final AuditStamp auditStamp, @Nonnull final SystemMetadata providedSystemMetadata) {
return _entityDao.runInTransactionWithRetry(() -> {
final String urnStr = urn.toString();
final EbeanAspectV2 latest = _entityDao.getLatestAspect(urnStr, aspectName);
long nextVersion = _entityDao.getNextVersion(urnStr, aspectName);
// 1. Fetch the latest existing version of the aspect.
final EbeanAspectV2 latest = _entityDao.getLatestAspect(urn.toString(), aspectName);
final EbeanAspectV2 keyAspect = _entityDao.getLatestAspect(urn.toString(), getKeyAspectName(urn));
// 2. Compare the latest existing and new.
final RecordTemplate oldValue =
latest == null ? null : toAspectRecord(urn, aspectName, latest.getMetadata(), getEntityRegistry());
final RecordTemplate newValue = updateLambda.apply(Optional.ofNullable(oldValue));
// 3. If there is no difference between existing and new, we just update
// the lastObserved in system metadata. RunId should stay as the original runId
if (oldValue != null && DataTemplateUtil.areEqual(oldValue, newValue)) {
SystemMetadata latestSystemMetadata = EbeanUtils.parseSystemMetadata(latest.getSystemMetadata());
latestSystemMetadata.setLastObserved(providedSystemMetadata.getLastObserved());
latest.setSystemMetadata(RecordUtils.toJsonString(latestSystemMetadata));
_entityDao.saveAspect(latest, false);
return new UpdateAspectResult(urn, oldValue, oldValue,
EbeanUtils.parseSystemMetadata(latest.getSystemMetadata()), latestSystemMetadata,
MetadataAuditOperation.UPDATE, 0);
}
// 4. Save the newValue as the latest version
log.debug(String.format("Ingesting aspect with name %s, urn %s", aspectName, urn));
long versionOfOld = _entityDao.saveLatestAspect(urn.toString(), aspectName, latest == null ? null : toJsonAspect(oldValue),
latest == null ? null : latest.getCreatedBy(), latest == null ? null : latest.getCreatedFor(),
latest == null ? null : latest.getCreatedOn(), latest == null ? null : latest.getSystemMetadata(),
toJsonAspect(newValue), auditStamp.getActor().toString(),
auditStamp.hasImpersonator() ? auditStamp.getImpersonator().toString() : null,
new Timestamp(auditStamp.getTime()), toJsonAspect(providedSystemMetadata));
return new UpdateAspectResult(urn, oldValue, newValue,
latest == null ? null : EbeanUtils.parseSystemMetadata(latest.getSystemMetadata()), providedSystemMetadata,
MetadataAuditOperation.UPDATE, versionOfOld);
return ingestAspectToLocalDBNoTransaction(urn, aspectName, updateLambda, auditStamp, providedSystemMetadata, latest, nextVersion);
}, DEFAULT_MAX_TRANSACTION_RETRY);
}
@Override
@Nonnull
protected List<Pair<String, UpdateAspectResult>> ingestAspectsToLocalDB(@Nonnull final Urn urn,
@Nonnull List<Pair<String, RecordTemplate>> aspectRecordsToIngest,
@Nonnull final AuditStamp auditStamp, @Nonnull final SystemMetadata systemMetadata) {
return _entityDao.runInTransactionWithRetry(() -> {
final Set<String> aspectNames = aspectRecordsToIngest
.stream()
.map(Pair::getFirst)
.collect(Collectors.toSet());
Map<String, EbeanAspectV2> latestAspects = getLatestEbeanAspectForUrn(urn, aspectNames);
Map<String, Long> nextVersions = _entityDao.getNextVersions(urn.toString(), aspectNames);
List<Pair<String, UpdateAspectResult>> result = new ArrayList<>();
for (Pair<String, RecordTemplate> aspectRecord: aspectRecordsToIngest) {
String aspectName = aspectRecord.getFirst();
RecordTemplate newValue = aspectRecord.getSecond();
EbeanAspectV2 latest = latestAspects.get(aspectName);
long nextVersion = nextVersions.get(aspectName);
UpdateAspectResult updateResult = ingestAspectToLocalDBNoTransaction(urn, aspectName, ignored -> newValue, auditStamp, systemMetadata,
latest, nextVersion);
result.add(new Pair<>(aspectName, updateResult));
}
return result;
}, DEFAULT_MAX_TRANSACTION_RETRY);
}
@Nonnull
private UpdateAspectResult ingestAspectToLocalDBNoTransaction(@Nonnull final Urn urn,
@Nonnull final String aspectName, @Nonnull final Function<Optional<RecordTemplate>, RecordTemplate> updateLambda,
@Nonnull final AuditStamp auditStamp, @Nonnull final SystemMetadata providedSystemMetadata, @Nullable final EbeanAspectV2 latest,
@Nonnull final Long nextVersion) {
// 2. Compare the latest existing and new.
final RecordTemplate oldValue =
latest == null ? null : toAspectRecord(urn, aspectName, latest.getMetadata(), getEntityRegistry());
final RecordTemplate newValue = updateLambda.apply(Optional.ofNullable(oldValue));
// 3. If there is no difference between existing and new, we just update
// the lastObserved in system metadata. RunId should stay as the original runId
if (oldValue != null && DataTemplateUtil.areEqual(oldValue, newValue)) {
SystemMetadata latestSystemMetadata = EbeanUtils.parseSystemMetadata(latest.getSystemMetadata());
latestSystemMetadata.setLastObserved(providedSystemMetadata.getLastObserved());
latest.setSystemMetadata(RecordUtils.toJsonString(latestSystemMetadata));
_entityDao.saveAspect(latest, false);
return new UpdateAspectResult(urn, oldValue, oldValue,
EbeanUtils.parseSystemMetadata(latest.getSystemMetadata()), latestSystemMetadata,
MetadataAuditOperation.UPDATE, 0);
}
// 4. Save the newValue as the latest version
log.debug("Ingesting aspect with name {}, urn {}", aspectName, urn);
long versionOfOld = _entityDao.saveLatestAspect(urn.toString(), aspectName, latest == null ? null : toJsonAspect(oldValue),
latest == null ? null : latest.getCreatedBy(), latest == null ? null : latest.getCreatedFor(),
latest == null ? null : latest.getCreatedOn(), latest == null ? null : latest.getSystemMetadata(),
toJsonAspect(newValue), auditStamp.getActor().toString(),
auditStamp.hasImpersonator() ? auditStamp.getImpersonator().toString() : null,
new Timestamp(auditStamp.getTime()), toJsonAspect(providedSystemMetadata), nextVersion);
return new UpdateAspectResult(urn, oldValue, newValue,
latest == null ? null : EbeanUtils.parseSystemMetadata(latest.getSystemMetadata()), providedSystemMetadata,
MetadataAuditOperation.UPDATE, versionOfOld);
}
@Override
@Nonnull
public RecordTemplate updateAspect(@Nonnull final Urn urn, @Nonnull final String entityName,
@Nonnull final String aspectName, @Nonnull final AspectSpec aspectSpec, @Nonnull final RecordTemplate newValue,
@Nonnull final AuditStamp auditStamp, @Nonnull final long version, @Nonnull final boolean emitMae) {
log.debug(
String.format("Invoked updateAspect with urn: %s, aspectName: %s, newValue: %s, version: %s, emitMae: %s", urn,
aspectName, newValue, version, emitMae));
"Invoked updateAspect with urn: {}, aspectName: {}, newValue: {}, version: {}, emitMae: {}", urn,
aspectName, newValue, version, emitMae);
return updateAspect(urn, entityName, aspectName, aspectSpec, newValue, auditStamp, version, emitMae,
DEFAULT_MAX_TRANSACTION_RETRY);
}
@ -345,7 +418,7 @@ public class EbeanEntityService extends EntityService {
oldAspect == null ? new SystemMetadata() : EbeanUtils.parseSystemMetadata(oldAspect.getSystemMetadata());
newSystemMetadata.setLastObserved(System.currentTimeMillis());
log.debug(String.format("Updating aspect with name %s, urn %s", aspectName, urn));
log.debug("Updating aspect with name {}, urn {}", aspectName, urn);
_entityDao.saveAspect(urn.toString(), aspectName, toJsonAspect(value), auditStamp.getActor().toString(),
auditStamp.hasImpersonator() ? auditStamp.getImpersonator().toString() : null,
new Timestamp(auditStamp.getTime()), toJsonAspect(newSystemMetadata), version, oldAspect == null);
@ -358,12 +431,12 @@ public class EbeanEntityService extends EntityService {
final RecordTemplate newValue = result.getNewValue();
if (emitMae) {
log.debug(String.format("Producing MetadataAuditEvent for updated aspect %s, urn %s", aspectName, urn));
log.debug("Producing MetadataAuditEvent for updated aspect {}, urn {}", aspectName, urn);
produceMetadataChangeLog(urn, entityName, aspectName, aspectSpec, oldValue, newValue,
result.getOldSystemMetadata(), result.getNewSystemMetadata(), ChangeType.UPSERT);
} else {
log.debug(String.format("Skipped producing MetadataAuditEvent for updated aspect %s, urn %s. emitMAE is false.",
aspectName, urn));
log.debug("Skipped producing MetadataAuditEvent for updated aspect {}, urn {}. emitMAE is false.",
aspectName, urn);
}
return newValue;
@ -588,7 +661,7 @@ public class EbeanEntityService extends EntityService {
@Override
@Nonnull
public ListUrnsResult listUrns(@Nonnull final String entityName, final int start, final int count) {
log.debug(String.format("Invoked listUrns with entityName: %s, start: %s, count: %s", entityName, start, count));
log.debug("Invoked listUrns with entityName: {}, start: {}, count: {}", entityName, start, count);
// If a keyAspect exists, the entity exists.
final String keyAspectName = getEntityRegistry().getEntitySpec(entityName).getKeyAspectSpec().getName();

View File

@ -1,5 +1,7 @@
package com.linkedin.metadata.entity;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.AuditStamp;
@ -11,6 +13,7 @@ import com.linkedin.data.template.DataTemplateUtil;
import com.linkedin.data.template.JacksonDataTemplateCodec;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.dataset.DatasetProfile;
import com.linkedin.entity.Entity;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.EnvelopedAspect;
import com.linkedin.events.metadata.ChangeType;
@ -19,6 +22,7 @@ import com.linkedin.metadata.aspect.Aspect;
import com.linkedin.metadata.aspect.CorpUserAspect;
import com.linkedin.metadata.aspect.CorpUserAspectArray;
import com.linkedin.metadata.aspect.VersionedAspect;
import com.datahub.util.RecordUtils;
import com.linkedin.metadata.entity.ebean.EbeanAspectDao;
import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
import com.linkedin.metadata.entity.ebean.EbeanEntityService;
@ -45,22 +49,36 @@ import com.linkedin.mxe.SystemMetadata;
import com.linkedin.retention.DataHubRetentionConfig;
import com.linkedin.retention.Retention;
import com.linkedin.retention.VersionBasedRetention;
import com.linkedin.util.Pair;
import io.ebean.EbeanServer;
import io.ebean.EbeanServerFactory;
import io.ebean.config.ServerConfig;
import io.ebean.datasource.DataSourceConfig;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.annotation.Nonnull;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.mockito.Mockito.*;
import static org.testng.Assert.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
public class EbeanEntityServiceTest {
@ -216,7 +234,7 @@ public class EbeanEntityServiceTest {
ImmutableList.of(metadata1, metadata2));
// 2. Retrieve Entities
Map<Urn, com.linkedin.entity.Entity> readEntities =
Map<Urn, Entity> readEntities =
_entityService.getEntities(ImmutableSet.of(entityUrn1, entityUrn2), Collections.emptySet());
// 3. Compare Entity Objects
@ -338,6 +356,42 @@ public class EbeanEntityServiceTest {
verifyNoMoreInteractions(_mockProducer);
}
@Test
public void testIngestAspectsGetLatestAspects() throws Exception {
Urn entityUrn = Urn.createFromString("urn:li:corpuser:test");
List<Pair<String, RecordTemplate>> pairToIngest = new ArrayList<>();
Status writeAspect1 = new Status().setRemoved(false);
String aspectName1 = getAspectName(writeAspect1);
pairToIngest.add(getAspectRecordPair(writeAspect1, Status.class));
CorpUserInfo writeAspect2 = createCorpUserInfo("email@test.com");
String aspectName2 = getAspectName(writeAspect2);
pairToIngest.add(getAspectRecordPair(writeAspect2, CorpUserInfo.class));
SystemMetadata metadata1 = new SystemMetadata();
metadata1.setLastObserved(1625792689);
metadata1.setRunId("run-123");
_entityService.ingestAspects(entityUrn, pairToIngest, TEST_AUDIT_STAMP, metadata1);
Map<String, RecordTemplate> latestAspects = _entityService.getLatestAspectsForUrn(
entityUrn,
new HashSet<>(Arrays.asList(aspectName1, aspectName2))
);
assertTrue(DataTemplateUtil.areEqual(writeAspect1, latestAspects.get(aspectName1)));
assertTrue(DataTemplateUtil.areEqual(writeAspect2, latestAspects.get(aspectName2)));
verify(_mockProducer, times(2)).produceMetadataChangeLog(Mockito.eq(entityUrn),
Mockito.any(), Mockito.any());
verify(_mockProducer, times(2)).produceMetadataAuditEvent(Mockito.eq(entityUrn),
Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
verifyNoMoreInteractions(_mockProducer);
}
@Test
public void testIngestGetLatestAspect() throws Exception {
Urn entityUrn = Urn.createFromString("urn:li:corpuser:test");
@ -925,4 +979,16 @@ public class EbeanEntityServiceTest {
corpUserInfo.setActive(true);
return corpUserInfo;
}
private String getAspectName(RecordTemplate record) {
return PegasusUtils.getAspectNameFromSchema(record.schema());
}
private <T extends RecordTemplate> Pair<String, RecordTemplate> getAspectRecordPair(T aspect, Class<T> clazz)
throws Exception {
final ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
RecordTemplate recordTemplate = RecordUtils.toRecordTemplate(clazz, objectMapper.writeValueAsString(aspect));
return new Pair<>(getAspectName(aspect), recordTemplate);
}
}

View File

@ -85,7 +85,7 @@ public class IngestDataPlatformInstancesStep implements BootstrapStep {
final AuditStamp aspectAuditStamp =
new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis());
_entityService.ingestAspect(urn, PLATFORM_INSTANCE_ASPECT_NAME, dataPlatformInstance.get(), aspectAuditStamp);
_entityService.ingestAspect(urn, PLATFORM_INSTANCE_ASPECT_NAME, dataPlatformInstance.get(), aspectAuditStamp, null);
}
log.info("Finished ingesting DataPlaformInstance for urn {} to {}", start, start + BATCH_SIZE);
start += BATCH_SIZE;

View File

@ -59,7 +59,7 @@ public class IngestDataPlatformsStep implements BootstrapStep {
final AuditStamp aspectAuditStamp =
new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis());
_entityService.ingestAspect(urn, PLATFORM_ASPECT_NAME, info, aspectAuditStamp);
_entityService.ingestAspect(urn, PLATFORM_ASPECT_NAME, info, aspectAuditStamp, null);
}
}
}

View File

@ -57,6 +57,6 @@ public class IngestRootUserStep implements BootstrapStep {
RecordUtils.toRecordTemplate(CorpUserInfo.class, userObj.get("info").toString());
final AuditStamp aspectAuditStamp =
new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis());
_entityService.ingestAspect(urn, USER_INFO_ASPECT_NAME, info, aspectAuditStamp);
_entityService.ingestAspect(urn, USER_INFO_ASPECT_NAME, info, aspectAuditStamp, null);
}
}