From a56f0661d5dfc31d1fc9393df94aa0ff2ee199ec Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Thu, 6 Jan 2022 21:37:16 +0530 Subject: [PATCH] perf(ingest): changes to improve ingest performance a bit (#3837) --- .../upgrade/nocode/DataMigrationStep.java | 2 +- docs/advanced/monitoring.md | 2 +- metadata-io/build.gradle | 2 + .../metadata/entity/EntityService.java | 141 ++++++++----- .../metadata/entity/ebean/EbeanAspectDao.java | 44 ++++- .../entity/ebean/EbeanEntityService.java | 185 ++++++++++++------ .../entity/EbeanEntityServiceTest.java | 80 +++++++- .../IngestDataPlatformInstancesStep.java | 2 +- .../boot/steps/IngestDataPlatformsStep.java | 2 +- .../boot/steps/IngestRootUserStep.java | 2 +- 10 files changed, 346 insertions(+), 116 deletions(-) diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/DataMigrationStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/DataMigrationStep.java index 7b03ebd9d5..eb5ad3e307 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/DataMigrationStep.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/DataMigrationStep.java @@ -157,7 +157,7 @@ public class DataMigrationStep implements UpgradeStep { browsePathsStamp.setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)); browsePathsStamp.setTime(System.currentTimeMillis()); - _entityService.ingestAspect(urn, BROWSE_PATHS_ASPECT_NAME, browsePaths, browsePathsStamp); + _entityService.ingestAspect(urn, BROWSE_PATHS_ASPECT_NAME, browsePaths, browsePathsStamp, null); urnsWithBrowsePath.add(urn); } catch (URISyntaxException e) { diff --git a/docs/advanced/monitoring.md b/docs/advanced/monitoring.md index 07b2a11ce6..afd55e47ab 100644 --- a/docs/advanced/monitoring.md +++ b/docs/advanced/monitoring.md @@ -55,7 +55,7 @@ In the JVM dashboard, you can find detailed charts based on JVM metrics like CPU dashboard, you can find charts to monitor each endpoint and the kafka topics. Using the example implementation, go to http://localhost:3001 to find the grafana dashboards! (Username: admin, PW: admin) -To make it easy to track various metrics within the code base, we created MetricsUtil class. This util class creates a +To make it easy to track various metrics within the code base, we created MetricUtils class. This util class creates a central metric registry, sets up the JMX reporter, and provides convenient functions for setting up counters and timers. You can run the following to create a counter and increment. diff --git a/metadata-io/build.gradle b/metadata-io/build.gradle index 3ce4f09470..b760e19394 100644 --- a/metadata-io/build.gradle +++ b/metadata-io/build.gradle @@ -57,6 +57,8 @@ dependencies { test { // https://docs.gradle.org/current/userguide/performance.html maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) ?: 1 + testLogging.showStandardStreams = true + testLogging.exceptionFormat = 'full' } tasks.withType(Test) { diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityService.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityService.java index b3f563e685..8406cd0469 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityService.java @@ -43,6 +43,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.HashSet; import java.util.function.Function; import java.util.stream.Collectors; import javax.annotation.Nonnull; @@ -122,6 +123,9 @@ public abstract class EntityService { @Nonnull final Set urns, @Nonnull final Set aspectNames); + + public abstract Map getLatestAspectsForUrn(@Nonnull final Urn urn, @Nonnull final Set aspectNames); + /** * Retrieves an aspect having a specific {@link Urn}, name, & version. * @@ -232,7 +236,46 @@ public abstract class EntityService { @Nonnull protected abstract UpdateAspectResult ingestAspectToLocalDB(@Nonnull final Urn urn, @Nonnull final String aspectName, @Nonnull final Function, RecordTemplate> updateLambda, - @Nonnull final AuditStamp auditStamp, @Nonnull final SystemMetadata providedSystemMetadata); + @Nonnull final AuditStamp auditStamp, @Nonnull final SystemMetadata systemMetadata); + + /** + * Same as ingestAspectToLocalDB but for multiple aspects + */ + @Nonnull + protected abstract List> ingestAspectsToLocalDB(@Nonnull final Urn urn, + @Nonnull List> aspectRecordsToIngest, + @Nonnull final AuditStamp auditStamp, @Nonnull final SystemMetadata providedSystemMetadata); + + @Nonnull + private SystemMetadata generateSystemMetadataIfEmpty(SystemMetadata systemMetadata) { + if (systemMetadata == null) { + systemMetadata = new SystemMetadata(); + systemMetadata.setRunId(DEFAULT_RUN_ID); + systemMetadata.setLastObserved(System.currentTimeMillis()); + } + return systemMetadata; + } + + private void validateUrn(@Nonnull final Urn urn) { + if (!urn.toString().trim().equals(urn.toString())) { + throw new IllegalArgumentException("Error: cannot provide an URN with leading or trailing whitespace"); + } + } + + public void ingestAspects(@Nonnull final Urn urn, @Nonnull List> aspectRecordsToIngest, + @Nonnull final AuditStamp auditStamp, SystemMetadata systemMetadata) { + + validateUrn(urn); + systemMetadata = generateSystemMetadataIfEmpty(systemMetadata); + + Timer.Context ingestToLocalDBTimer = MetricUtils.timer(this.getClass(), "ingestAspectsToLocalDB").time(); + List> ingestResults = ingestAspectsToLocalDB(urn, aspectRecordsToIngest, auditStamp, systemMetadata); + ingestToLocalDBTimer.stop(); + + for (Pair result: ingestResults) { + sendEventForUpdateAspectResult(urn, result.getFirst(), result.getSecond()); + } + } /** * Ingests (inserts) a new version of an entity aspect & emits a {@link com.linkedin.mxe.MetadataAuditEvent}. @@ -252,14 +295,19 @@ public abstract class EntityService { log.debug("Invoked ingestAspect with urn: {}, aspectName: {}, newValue: {}", urn, aspectName, newValue); - if (!urn.toString().trim().equals(urn.toString())) { - throw new IllegalArgumentException("Error: cannot provide an URN with leading or trailing whitespace"); - } + validateUrn(urn); + systemMetadata = generateSystemMetadataIfEmpty(systemMetadata); Timer.Context ingestToLocalDBTimer = MetricUtils.timer(this.getClass(), "ingestAspectToLocalDB").time(); UpdateAspectResult result = ingestAspectToLocalDB(urn, aspectName, ignored -> newValue, auditStamp, systemMetadata); ingestToLocalDBTimer.stop(); + return sendEventForUpdateAspectResult(urn, aspectName, result); + } + + private RecordTemplate sendEventForUpdateAspectResult(@Nonnull final Urn urn, @Nonnull final String aspectName, + @Nonnull UpdateAspectResult result) { + final RecordTemplate oldValue = result.getOldValue(); final RecordTemplate updatedValue = result.getNewValue(); final SystemMetadata oldSystemMetadata = result.getOldSystemMetadata(); @@ -268,7 +316,7 @@ public abstract class EntityService { // Apply retention policies asynchronously if there was an update to existing aspect value if (oldValue != updatedValue && oldValue != null && retentionService != null) { retentionService.applyRetention(urn, aspectName, - Optional.of(new RetentionService.RetentionContext(Optional.of(result.maxVersion)))); + Optional.of(new RetentionService.RetentionContext(Optional.of(result.maxVersion)))); } // Produce MAE after a successful update @@ -292,23 +340,12 @@ public abstract class EntityService { result.getNewSystemMetadata(), MetadataAuditOperation.UPDATE); produceMAETimer.stop(); } else { - log.debug( - String.format("Skipped producing MetadataAuditEvent for ingested aspect %s, urn %s. Aspect has not changed.", - aspectName, urn)); + log.debug("Skipped producing MetadataAuditEvent for ingested aspect {}, urn {}. Aspect has not changed.", + aspectName, urn); } - return updatedValue; } - public RecordTemplate ingestAspect(@Nonnull final Urn urn, @Nonnull final String aspectName, - @Nonnull final RecordTemplate newValue, @Nonnull final AuditStamp auditStamp) { - - SystemMetadata generatedSystemMetadata = new SystemMetadata(); - generatedSystemMetadata.setLastObserved(System.currentTimeMillis()); - - return ingestAspect(urn, aspectName, newValue, auditStamp, generatedSystemMetadata); - } - public IngestProposalResult ingestProposal(@Nonnull MetadataChangeProposal metadataChangeProposal, AuditStamp auditStamp) { @@ -343,17 +380,12 @@ public abstract class EntityService { ValidationUtils.validateOrThrow(aspect); } catch (ModelConversionException e) { throw new RuntimeException( - String.format("Could not deserialize {} for aspect {}", metadataChangeProposal.getAspect().getValue(), + String.format("Could not deserialize %s for aspect %s", metadataChangeProposal.getAspect().getValue(), metadataChangeProposal.getAspectName())); } log.debug("aspect = {}", aspect); - SystemMetadata systemMetadata = metadataChangeProposal.getSystemMetadata(); - if (systemMetadata == null) { - systemMetadata = new SystemMetadata(); - systemMetadata.setRunId(DEFAULT_RUN_ID); - systemMetadata.setLastObserved(System.currentTimeMillis()); - } + SystemMetadata systemMetadata = generateSystemMetadataIfEmpty(metadataChangeProposal.getSystemMetadata()); systemMetadata.setRegistryName(aspectSpec.getRegistryName()); systemMetadata.setRegistryVersion(aspectSpec.getRegistryVersion().toString()); @@ -380,8 +412,7 @@ public abstract class EntityService { } if (oldAspect != newAspect || getAlwaysEmitAuditEvent()) { - log.debug(String.format("Producing MetadataChangeLog for ingested aspect %s, urn %s", - metadataChangeProposal.getAspectName(), entityUrn)); + log.debug("Producing MetadataChangeLog for ingested aspect {}, urn {}", metadataChangeProposal.getAspectName(), entityUrn); final MetadataChangeLog metadataChangeLog = new MetadataChangeLog(metadataChangeProposal.data()); if (oldAspect != null) { @@ -397,13 +428,13 @@ public abstract class EntityService { metadataChangeLog.setSystemMetadata(newSystemMetadata); } - log.debug(String.format("Serialized MCL event: %s", metadataChangeLog)); + log.debug("Serialized MCL event: {}", metadataChangeLog); // Since only timeseries aspects are ingested as of now, simply produce mae event for it produceMetadataChangeLog(entityUrn, aspectSpec, metadataChangeLog); } else { log.debug( - String.format("Skipped producing MetadataAuditEvent for ingested aspect %s, urn %s. Aspect has not changed.", - metadataChangeProposal.getAspectName(), entityUrn)); + "Skipped producing MetadataChangeLog for ingested aspect {}, urn {}. Aspect has not changed.", + metadataChangeProposal.getAspectName(), entityUrn); } return new IngestProposalResult(entityUrn, oldAspect != newAspect); @@ -454,7 +485,7 @@ public abstract class EntityService { * @return a map of {@link Urn} to {@link Entity} object */ public Map getEntities(@Nonnull final Set urns, @Nonnull Set aspectNames) { - log.debug(String.format("Invoked getEntities with urns %s, aspects %s", urns, aspectNames)); + log.debug("Invoked getEntities with urns {}, aspects {}", urns, aspectNames); if (urns.isEmpty()) { return Collections.emptyMap(); } @@ -532,13 +563,13 @@ public abstract class EntityService { } public RecordTemplate getLatestAspect(@Nonnull final Urn urn, @Nonnull final String aspectName) { - log.debug(String.format("Invoked getLatestAspect with urn %s, aspect %s", urn, aspectName)); + log.debug("Invoked getLatestAspect with urn {}, aspect {}", urn, aspectName); return getAspect(urn, aspectName, ASPECT_LATEST_VERSION); } public void ingestEntities(@Nonnull final List entities, @Nonnull final AuditStamp auditStamp, @Nonnull final List systemMetadata) { - log.debug(String.format("Invoked ingestEntities with entities %s, audit stamp %s", entities, auditStamp)); + log.debug("Invoked ingestEntities with entities {}, audit stamp {}", entities, auditStamp); Streams.zip(entities.stream(), systemMetadata.stream(), (a, b) -> new Pair(a, b)) .forEach(pair -> ingestEntity(pair.getFirst(), auditStamp, pair.getSecond())); } @@ -553,8 +584,7 @@ public abstract class EntityService { public void ingestEntity(@Nonnull Entity entity, @Nonnull AuditStamp auditStamp, @Nonnull SystemMetadata systemMetadata) { - log.debug(String.format("Invoked ingestEntity with entity %s, audit stamp %s systemMetadata %s", entity, auditStamp, - systemMetadata.toString())); + log.debug("Invoked ingestEntity with entity {}, audit stamp {} systemMetadata {}", entity, auditStamp, systemMetadata.toString()); ingestSnapshotUnion(entity.getValue(), auditStamp, systemMetadata); } @@ -585,20 +615,44 @@ public abstract class EntityService { .collect(Collectors.toList()))); } + /** + Returns true if entityType should have some aspect as per its definition + but aspects given does not have that aspect + */ + private boolean isAspectProvided(String entityType, String aspectName, Set aspects) { + return _entityRegistry.getEntitySpec(entityType).getAspectSpecMap().containsKey(aspectName) + && !aspects.contains(aspectName); + } + public List> generateDefaultAspectsIfMissing(@Nonnull final Urn urn, Set includedAspects) { + Set aspectsToGet = new HashSet<>(); + String entityType = urnToEntityName(urn); + + boolean shouldCheckBrowsePath = isAspectProvided(entityType, BROWSE_PATHS, includedAspects); + if (shouldCheckBrowsePath) { + aspectsToGet.add(BROWSE_PATHS); + } + + boolean shouldCheckDataPlatform = isAspectProvided(entityType, DATA_PLATFORM_INSTANCE, includedAspects); + if (shouldCheckDataPlatform) { + aspectsToGet.add(DATA_PLATFORM_INSTANCE); + } + List> aspects = new ArrayList<>(); final String keyAspectName = getKeyAspectName(urn); - RecordTemplate keyAspect = getLatestAspect(urn, keyAspectName); + aspectsToGet.add(keyAspectName); + + Map latestAspects = getLatestAspectsForUrn(urn, aspectsToGet); + + RecordTemplate keyAspect = latestAspects.get(keyAspectName); if (keyAspect == null) { keyAspect = buildKeyAspect(urn); aspects.add(Pair.of(keyAspectName, keyAspect)); } - String entityType = urnToEntityName(urn); - if (_entityRegistry.getEntitySpec(entityType).getAspectSpecMap().containsKey(BROWSE_PATHS) - && getLatestAspect(urn, BROWSE_PATHS) == null && !includedAspects.contains(BROWSE_PATHS)) { + if (shouldCheckBrowsePath && latestAspects.get(BROWSE_PATHS) == null) { try { BrowsePaths generatedBrowsePath = BrowsePathUtils.buildBrowsePath(urn, getEntityRegistry()); if (generatedBrowsePath != null) { @@ -609,8 +663,7 @@ public abstract class EntityService { } } - if (_entityRegistry.getEntitySpec(entityType).getAspectSpecMap().containsKey(DATA_PLATFORM_INSTANCE) - && getLatestAspect(urn, DATA_PLATFORM_INSTANCE) == null && !includedAspects.contains(DATA_PLATFORM_INSTANCE)) { + if (shouldCheckDataPlatform && latestAspects.get(DATA_PLATFORM_INSTANCE) == null) { DataPlatformInstanceUtils.buildDataPlatformInstance(entityType, keyAspect) .ifPresent(aspect -> aspects.add(Pair.of(DATA_PLATFORM_INSTANCE, aspect))); } @@ -629,9 +682,7 @@ public abstract class EntityService { aspectRecordsToIngest.addAll(generateDefaultAspectsIfMissing(urn, aspectRecordsToIngest.stream().map(pair -> pair.getFirst()).collect(Collectors.toSet()))); - aspectRecordsToIngest.forEach(aspectNamePair -> { - ingestAspect(urn, aspectNamePair.getFirst(), aspectNamePair.getSecond(), auditStamp, systemMetadata); - }); + ingestAspects(urn, aspectRecordsToIngest, auditStamp, systemMetadata); } public Snapshot buildSnapshot(@Nonnull final Urn urn, @Nonnull final RecordTemplate aspectValue) { @@ -707,7 +758,7 @@ public abstract class EntityService { try { return Urn.createFromString(urnStr); } catch (URISyntaxException e) { - log.error(String.format("Failed to convert urn string %s into Urn object", urnStr)); + log.error("Failed to convert urn string {} into Urn object", urnStr); throw new ModelConversionException(String.format("Failed to convert urn string %s into Urn object ", urnStr), e); } } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java index 35e6d0ec49..6915504ec9 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java @@ -12,6 +12,8 @@ import com.linkedin.metadata.query.ListResultMetadata; import com.linkedin.metadata.search.utils.QueryUtils; import io.ebean.DuplicateKeyException; import io.ebean.EbeanServer; +import io.ebean.ExpressionList; +import io.ebean.Junction; import io.ebean.PagedList; import io.ebean.Query; import io.ebean.RawSql; @@ -101,7 +103,8 @@ public class EbeanAspectDao { @Nonnull final String newActor, @Nullable final String newImpersonator, @Nonnull final Timestamp newTime, - @Nullable final String newSystemMetadata + @Nullable final String newSystemMetadata, + final Long nextVersion ) { validateConnection(); if (!_canWrite) { @@ -110,7 +113,7 @@ public class EbeanAspectDao { // Save oldValue as the largest version + 1 long largestVersion = 0; if (oldAspectMetadata != null && oldTime != null) { - largestVersion = getNextVersion(urn, aspectName); + largestVersion = nextVersion; saveAspect(urn, aspectName, oldAspectMetadata, oldActor, oldImpersonator, oldTime, oldSystemMetadata, largestVersion, true); } @@ -441,6 +444,7 @@ public class EbeanAspectDao { T result = null; do { try (Transaction transaction = _server.beginTransaction(TxIsolation.REPEATABLE_READ)) { + transaction.setBatchMode(true); result = block.get(); transaction.commit(); lastException = null; @@ -457,7 +461,7 @@ public class EbeanAspectDao { return result; } - private long getNextVersion(@Nonnull final String urn, @Nonnull final String aspectName) { + public long getNextVersion(@Nonnull final String urn, @Nonnull final String aspectName) { validateConnection(); final List result = _server.find(EbeanAspectV2.class) .where() @@ -471,6 +475,40 @@ public class EbeanAspectDao { return result.isEmpty() ? 0 : result.get(0).getVersion() + 1L; } + public Map getNextVersions(@Nonnull final String urn, @Nonnull final Set aspectNames) { + Map result = new HashMap<>(); + Junction queryJunction = _server.find(EbeanAspectV2.class) + .select("aspect, max(version)") + .where() + .eq("urn", urn) + .or(); + + ExpressionList exp = null; + for (String aspectName: aspectNames) { + if (exp == null) { + exp = queryJunction.eq("aspect", aspectName); + } else { + exp = exp.eq("aspect", aspectName); + } + } + if (exp == null) { + return result; + } + List dbResults = exp.endOr().findIds(); + + for (EbeanAspectV2.PrimaryKey key: dbResults) { + result.put(key.getAspect(), key.getVersion()); + } + for (String aspectName: aspectNames) { + long nextVal = 0L; + if (result.containsKey(aspectName)) { + nextVal = result.get(aspectName) + 1L; + } + result.put(aspectName, nextVal); + } + return result; + } + @Nonnull private ListResult toListResult( @Nonnull final List values, diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanEntityService.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanEntityService.java index 12d22e0061..f248c6c6df 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanEntityService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanEntityService.java @@ -30,8 +30,10 @@ import com.linkedin.metadata.utils.EntityKeyUtils; import com.linkedin.metadata.utils.PegasusUtils; import com.linkedin.mxe.MetadataAuditOperation; import com.linkedin.mxe.SystemMetadata; +import com.linkedin.util.Pair; import java.net.URISyntaxException; import java.sql.Timestamp; +import java.util.Arrays; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -73,12 +75,24 @@ public class EbeanEntityService extends EntityService { _entityDao = entityDao; } - @Override + @Nonnull Map getLatestEbeanAspectForUrn(@Nonnull final Urn urn, + @Nonnull final Set aspectNames) { + Set urns = new HashSet<>(); + urns.add(urn); + + Map result = new HashMap<>(); + getLatestAspectEbeans(urns, aspectNames).forEach((key, aspectEntry) -> { + final String aspectName = key.getAspect(); + result.put(aspectName, aspectEntry); + }); + return result; + } + @Nonnull - public Map> getLatestAspects(@Nonnull final Set urns, + private Map getLatestAspectEbeans(@Nonnull final Set urns, @Nonnull final Set aspectNames) { - log.debug(String.format("Invoked getLatestAspects with urns: %s, aspectNames: %s", urns, aspectNames)); + log.debug("Invoked getLatestAspects with urns: {}, aspectNames: {}", urns, aspectNames); // Create DB keys final Set dbKeys = urns.stream().map(urn -> { @@ -88,6 +102,33 @@ public class EbeanEntityService extends EntityService { .collect(Collectors.toList()); }).flatMap(List::stream).collect(Collectors.toSet()); + Map batchGetResults = new HashMap<>(); + Iterators.partition(dbKeys.iterator(), 500) + .forEachRemaining(batch -> batchGetResults.putAll(_entityDao.batchGet(ImmutableSet.copyOf(batch)))); + return batchGetResults; + } + + @Override + @Nonnull + public Map getLatestAspectsForUrn(@Nonnull final Urn urn, @Nonnull final Set aspectNames) { + Map batchGetResults = getLatestAspectEbeans(new HashSet<>(Arrays.asList(urn)), aspectNames); + + final Map result = new HashMap<>(); + batchGetResults.forEach((key, aspectEntry) -> { + final String aspectName = key.getAspect(); + final RecordTemplate aspectRecord = toAspectRecord(urn, aspectName, aspectEntry.getMetadata(), getEntityRegistry()); + result.put(aspectName, aspectRecord); + }); + return result; + } + + @Override + @Nonnull + public Map> getLatestAspects(@Nonnull final Set urns, + @Nonnull final Set aspectNames) { + + Map batchGetResults = getLatestAspectEbeans(urns, aspectNames); + // Fetch from db and populate urn -> aspect map. final Map> urnToAspects = new HashMap<>(); @@ -102,10 +143,6 @@ public class EbeanEntityService extends EntityService { urnToAspects.get(key).add(keyAspect); }); - Map batchGetResults = new HashMap<>(); - Iterators.partition(dbKeys.iterator(), 500) - .forEachRemaining(batch -> batchGetResults.putAll(_entityDao.batchGet(ImmutableSet.copyOf(batch)))); - batchGetResults.forEach((key, aspectEntry) -> { final Urn urn = toUrn(key.getUrn()); final String aspectName = key.getAspect(); @@ -139,7 +176,7 @@ public class EbeanEntityService extends EntityService { @Nullable public RecordTemplate getAspect(@Nonnull final Urn urn, @Nonnull final String aspectName, @Nonnull long version) { - log.debug(String.format("Invoked getAspect with urn: %s, aspectName: %s, version: %s", urn, aspectName, version)); + log.debug("Invoked getAspect with urn: {}, aspectName: {}, version: {}", urn, aspectName, version); version = calculateVersionNumber(urn, aspectName, version); final EbeanAspectV2.PrimaryKey primaryKey = new EbeanAspectV2.PrimaryKey(urn.toString(), aspectName, version); @@ -218,8 +255,7 @@ public class EbeanEntityService extends EntityService { @Override public VersionedAspect getVersionedAspect(@Nonnull Urn urn, @Nonnull String aspectName, long version) { - log.debug(String.format("Invoked getVersionedAspect with urn: %s, aspectName: %s, version: %s", urn, aspectName, - version)); + log.debug("Invoked getVersionedAspect with urn: {}, aspectName: {}, version: {}", urn, aspectName, version); VersionedAspect result = new VersionedAspect(); @@ -249,9 +285,8 @@ public class EbeanEntityService extends EntityService { public ListResult listLatestAspects(@Nonnull final String entityName, @Nonnull final String aspectName, final int start, final int count) { - log.debug( - String.format("Invoked listLatestAspects with entityName: %s, aspectName: %s, start: %s, count: %s", entityName, - aspectName, start, count)); + log.debug("Invoked listLatestAspects with entityName: {}, aspectName: {}, start: {}, count: {}", entityName, + aspectName, start, count); final ListResult aspectMetadataList = _entityDao.listLatestAspectMetadata(entityName, aspectName, start, count); @@ -274,54 +309,92 @@ public class EbeanEntityService extends EntityService { @Nonnull final AuditStamp auditStamp, @Nonnull final SystemMetadata providedSystemMetadata) { return _entityDao.runInTransactionWithRetry(() -> { + final String urnStr = urn.toString(); + final EbeanAspectV2 latest = _entityDao.getLatestAspect(urnStr, aspectName); + long nextVersion = _entityDao.getNextVersion(urnStr, aspectName); - // 1. Fetch the latest existing version of the aspect. - final EbeanAspectV2 latest = _entityDao.getLatestAspect(urn.toString(), aspectName); - final EbeanAspectV2 keyAspect = _entityDao.getLatestAspect(urn.toString(), getKeyAspectName(urn)); - - // 2. Compare the latest existing and new. - final RecordTemplate oldValue = - latest == null ? null : toAspectRecord(urn, aspectName, latest.getMetadata(), getEntityRegistry()); - final RecordTemplate newValue = updateLambda.apply(Optional.ofNullable(oldValue)); - - // 3. If there is no difference between existing and new, we just update - // the lastObserved in system metadata. RunId should stay as the original runId - if (oldValue != null && DataTemplateUtil.areEqual(oldValue, newValue)) { - SystemMetadata latestSystemMetadata = EbeanUtils.parseSystemMetadata(latest.getSystemMetadata()); - latestSystemMetadata.setLastObserved(providedSystemMetadata.getLastObserved()); - - latest.setSystemMetadata(RecordUtils.toJsonString(latestSystemMetadata)); - - _entityDao.saveAspect(latest, false); - - return new UpdateAspectResult(urn, oldValue, oldValue, - EbeanUtils.parseSystemMetadata(latest.getSystemMetadata()), latestSystemMetadata, - MetadataAuditOperation.UPDATE, 0); - } - - // 4. Save the newValue as the latest version - log.debug(String.format("Ingesting aspect with name %s, urn %s", aspectName, urn)); - long versionOfOld = _entityDao.saveLatestAspect(urn.toString(), aspectName, latest == null ? null : toJsonAspect(oldValue), - latest == null ? null : latest.getCreatedBy(), latest == null ? null : latest.getCreatedFor(), - latest == null ? null : latest.getCreatedOn(), latest == null ? null : latest.getSystemMetadata(), - toJsonAspect(newValue), auditStamp.getActor().toString(), - auditStamp.hasImpersonator() ? auditStamp.getImpersonator().toString() : null, - new Timestamp(auditStamp.getTime()), toJsonAspect(providedSystemMetadata)); - - return new UpdateAspectResult(urn, oldValue, newValue, - latest == null ? null : EbeanUtils.parseSystemMetadata(latest.getSystemMetadata()), providedSystemMetadata, - MetadataAuditOperation.UPDATE, versionOfOld); + return ingestAspectToLocalDBNoTransaction(urn, aspectName, updateLambda, auditStamp, providedSystemMetadata, latest, nextVersion); }, DEFAULT_MAX_TRANSACTION_RETRY); } + @Override + @Nonnull + protected List> ingestAspectsToLocalDB(@Nonnull final Urn urn, + @Nonnull List> aspectRecordsToIngest, + @Nonnull final AuditStamp auditStamp, @Nonnull final SystemMetadata systemMetadata) { + + return _entityDao.runInTransactionWithRetry(() -> { + + final Set aspectNames = aspectRecordsToIngest + .stream() + .map(Pair::getFirst) + .collect(Collectors.toSet()); + + Map latestAspects = getLatestEbeanAspectForUrn(urn, aspectNames); + Map nextVersions = _entityDao.getNextVersions(urn.toString(), aspectNames); + + List> result = new ArrayList<>(); + for (Pair aspectRecord: aspectRecordsToIngest) { + String aspectName = aspectRecord.getFirst(); + RecordTemplate newValue = aspectRecord.getSecond(); + EbeanAspectV2 latest = latestAspects.get(aspectName); + long nextVersion = nextVersions.get(aspectName); + UpdateAspectResult updateResult = ingestAspectToLocalDBNoTransaction(urn, aspectName, ignored -> newValue, auditStamp, systemMetadata, + latest, nextVersion); + result.add(new Pair<>(aspectName, updateResult)); + } + return result; + }, DEFAULT_MAX_TRANSACTION_RETRY); + } + + @Nonnull + private UpdateAspectResult ingestAspectToLocalDBNoTransaction(@Nonnull final Urn urn, + @Nonnull final String aspectName, @Nonnull final Function, RecordTemplate> updateLambda, + @Nonnull final AuditStamp auditStamp, @Nonnull final SystemMetadata providedSystemMetadata, @Nullable final EbeanAspectV2 latest, + @Nonnull final Long nextVersion) { + + // 2. Compare the latest existing and new. + final RecordTemplate oldValue = + latest == null ? null : toAspectRecord(urn, aspectName, latest.getMetadata(), getEntityRegistry()); + final RecordTemplate newValue = updateLambda.apply(Optional.ofNullable(oldValue)); + + // 3. If there is no difference between existing and new, we just update + // the lastObserved in system metadata. RunId should stay as the original runId + if (oldValue != null && DataTemplateUtil.areEqual(oldValue, newValue)) { + SystemMetadata latestSystemMetadata = EbeanUtils.parseSystemMetadata(latest.getSystemMetadata()); + latestSystemMetadata.setLastObserved(providedSystemMetadata.getLastObserved()); + + latest.setSystemMetadata(RecordUtils.toJsonString(latestSystemMetadata)); + + _entityDao.saveAspect(latest, false); + + return new UpdateAspectResult(urn, oldValue, oldValue, + EbeanUtils.parseSystemMetadata(latest.getSystemMetadata()), latestSystemMetadata, + MetadataAuditOperation.UPDATE, 0); + } + + // 4. Save the newValue as the latest version + log.debug("Ingesting aspect with name {}, urn {}", aspectName, urn); + long versionOfOld = _entityDao.saveLatestAspect(urn.toString(), aspectName, latest == null ? null : toJsonAspect(oldValue), + latest == null ? null : latest.getCreatedBy(), latest == null ? null : latest.getCreatedFor(), + latest == null ? null : latest.getCreatedOn(), latest == null ? null : latest.getSystemMetadata(), + toJsonAspect(newValue), auditStamp.getActor().toString(), + auditStamp.hasImpersonator() ? auditStamp.getImpersonator().toString() : null, + new Timestamp(auditStamp.getTime()), toJsonAspect(providedSystemMetadata), nextVersion); + + return new UpdateAspectResult(urn, oldValue, newValue, + latest == null ? null : EbeanUtils.parseSystemMetadata(latest.getSystemMetadata()), providedSystemMetadata, + MetadataAuditOperation.UPDATE, versionOfOld); + } + @Override @Nonnull public RecordTemplate updateAspect(@Nonnull final Urn urn, @Nonnull final String entityName, @Nonnull final String aspectName, @Nonnull final AspectSpec aspectSpec, @Nonnull final RecordTemplate newValue, @Nonnull final AuditStamp auditStamp, @Nonnull final long version, @Nonnull final boolean emitMae) { log.debug( - String.format("Invoked updateAspect with urn: %s, aspectName: %s, newValue: %s, version: %s, emitMae: %s", urn, - aspectName, newValue, version, emitMae)); + "Invoked updateAspect with urn: {}, aspectName: {}, newValue: {}, version: {}, emitMae: {}", urn, + aspectName, newValue, version, emitMae); return updateAspect(urn, entityName, aspectName, aspectSpec, newValue, auditStamp, version, emitMae, DEFAULT_MAX_TRANSACTION_RETRY); } @@ -345,7 +418,7 @@ public class EbeanEntityService extends EntityService { oldAspect == null ? new SystemMetadata() : EbeanUtils.parseSystemMetadata(oldAspect.getSystemMetadata()); newSystemMetadata.setLastObserved(System.currentTimeMillis()); - log.debug(String.format("Updating aspect with name %s, urn %s", aspectName, urn)); + log.debug("Updating aspect with name {}, urn {}", aspectName, urn); _entityDao.saveAspect(urn.toString(), aspectName, toJsonAspect(value), auditStamp.getActor().toString(), auditStamp.hasImpersonator() ? auditStamp.getImpersonator().toString() : null, new Timestamp(auditStamp.getTime()), toJsonAspect(newSystemMetadata), version, oldAspect == null); @@ -358,12 +431,12 @@ public class EbeanEntityService extends EntityService { final RecordTemplate newValue = result.getNewValue(); if (emitMae) { - log.debug(String.format("Producing MetadataAuditEvent for updated aspect %s, urn %s", aspectName, urn)); + log.debug("Producing MetadataAuditEvent for updated aspect {}, urn {}", aspectName, urn); produceMetadataChangeLog(urn, entityName, aspectName, aspectSpec, oldValue, newValue, result.getOldSystemMetadata(), result.getNewSystemMetadata(), ChangeType.UPSERT); } else { - log.debug(String.format("Skipped producing MetadataAuditEvent for updated aspect %s, urn %s. emitMAE is false.", - aspectName, urn)); + log.debug("Skipped producing MetadataAuditEvent for updated aspect {}, urn {}. emitMAE is false.", + aspectName, urn); } return newValue; @@ -588,7 +661,7 @@ public class EbeanEntityService extends EntityService { @Override @Nonnull public ListUrnsResult listUrns(@Nonnull final String entityName, final int start, final int count) { - log.debug(String.format("Invoked listUrns with entityName: %s, start: %s, count: %s", entityName, start, count)); + log.debug("Invoked listUrns with entityName: {}, start: {}, count: {}", entityName, start, count); // If a keyAspect exists, the entity exists. final String keyAspectName = getEntityRegistry().getEntitySpec(entityName).getKeyAspectSpec().getName(); diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java index ab970c5653..ebc0d4e5fb 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java @@ -1,5 +1,7 @@ package com.linkedin.metadata.entity; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.linkedin.common.AuditStamp; @@ -11,6 +13,7 @@ import com.linkedin.data.template.DataTemplateUtil; import com.linkedin.data.template.JacksonDataTemplateCodec; import com.linkedin.data.template.RecordTemplate; import com.linkedin.dataset.DatasetProfile; +import com.linkedin.entity.Entity; import com.linkedin.entity.EntityResponse; import com.linkedin.entity.EnvelopedAspect; import com.linkedin.events.metadata.ChangeType; @@ -19,6 +22,7 @@ import com.linkedin.metadata.aspect.Aspect; import com.linkedin.metadata.aspect.CorpUserAspect; import com.linkedin.metadata.aspect.CorpUserAspectArray; import com.linkedin.metadata.aspect.VersionedAspect; +import com.datahub.util.RecordUtils; import com.linkedin.metadata.entity.ebean.EbeanAspectDao; import com.linkedin.metadata.entity.ebean.EbeanAspectV2; import com.linkedin.metadata.entity.ebean.EbeanEntityService; @@ -45,22 +49,36 @@ import com.linkedin.mxe.SystemMetadata; import com.linkedin.retention.DataHubRetentionConfig; import com.linkedin.retention.Retention; import com.linkedin.retention.VersionBasedRetention; +import com.linkedin.util.Pair; import io.ebean.EbeanServer; import io.ebean.EbeanServerFactory; import io.ebean.config.ServerConfig; import io.ebean.datasource.DataSourceConfig; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; + + import javax.annotation.Nonnull; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import static org.mockito.Mockito.*; -import static org.testng.Assert.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; public class EbeanEntityServiceTest { @@ -216,7 +234,7 @@ public class EbeanEntityServiceTest { ImmutableList.of(metadata1, metadata2)); // 2. Retrieve Entities - Map readEntities = + Map readEntities = _entityService.getEntities(ImmutableSet.of(entityUrn1, entityUrn2), Collections.emptySet()); // 3. Compare Entity Objects @@ -338,6 +356,42 @@ public class EbeanEntityServiceTest { verifyNoMoreInteractions(_mockProducer); } + @Test + public void testIngestAspectsGetLatestAspects() throws Exception { + + Urn entityUrn = Urn.createFromString("urn:li:corpuser:test"); + + List> pairToIngest = new ArrayList<>(); + + Status writeAspect1 = new Status().setRemoved(false); + String aspectName1 = getAspectName(writeAspect1); + pairToIngest.add(getAspectRecordPair(writeAspect1, Status.class)); + + CorpUserInfo writeAspect2 = createCorpUserInfo("email@test.com"); + String aspectName2 = getAspectName(writeAspect2); + pairToIngest.add(getAspectRecordPair(writeAspect2, CorpUserInfo.class)); + + SystemMetadata metadata1 = new SystemMetadata(); + metadata1.setLastObserved(1625792689); + metadata1.setRunId("run-123"); + + _entityService.ingestAspects(entityUrn, pairToIngest, TEST_AUDIT_STAMP, metadata1); + + Map latestAspects = _entityService.getLatestAspectsForUrn( + entityUrn, + new HashSet<>(Arrays.asList(aspectName1, aspectName2)) + ); + assertTrue(DataTemplateUtil.areEqual(writeAspect1, latestAspects.get(aspectName1))); + assertTrue(DataTemplateUtil.areEqual(writeAspect2, latestAspects.get(aspectName2))); + + verify(_mockProducer, times(2)).produceMetadataChangeLog(Mockito.eq(entityUrn), + Mockito.any(), Mockito.any()); + verify(_mockProducer, times(2)).produceMetadataAuditEvent(Mockito.eq(entityUrn), + Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + + verifyNoMoreInteractions(_mockProducer); + } + @Test public void testIngestGetLatestAspect() throws Exception { Urn entityUrn = Urn.createFromString("urn:li:corpuser:test"); @@ -925,4 +979,16 @@ public class EbeanEntityServiceTest { corpUserInfo.setActive(true); return corpUserInfo; } + + private String getAspectName(RecordTemplate record) { + return PegasusUtils.getAspectNameFromSchema(record.schema()); + } + + private Pair getAspectRecordPair(T aspect, Class clazz) + throws Exception { + final ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); + RecordTemplate recordTemplate = RecordUtils.toRecordTemplate(clazz, objectMapper.writeValueAsString(aspect)); + return new Pair<>(getAspectName(aspect), recordTemplate); + } } diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDataPlatformInstancesStep.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDataPlatformInstancesStep.java index 603f8e8bc9..79a318810d 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDataPlatformInstancesStep.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDataPlatformInstancesStep.java @@ -85,7 +85,7 @@ public class IngestDataPlatformInstancesStep implements BootstrapStep { final AuditStamp aspectAuditStamp = new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis()); - _entityService.ingestAspect(urn, PLATFORM_INSTANCE_ASPECT_NAME, dataPlatformInstance.get(), aspectAuditStamp); + _entityService.ingestAspect(urn, PLATFORM_INSTANCE_ASPECT_NAME, dataPlatformInstance.get(), aspectAuditStamp, null); } log.info("Finished ingesting DataPlaformInstance for urn {} to {}", start, start + BATCH_SIZE); start += BATCH_SIZE; diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDataPlatformsStep.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDataPlatformsStep.java index 23a43e269c..56910cf24b 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDataPlatformsStep.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDataPlatformsStep.java @@ -59,7 +59,7 @@ public class IngestDataPlatformsStep implements BootstrapStep { final AuditStamp aspectAuditStamp = new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis()); - _entityService.ingestAspect(urn, PLATFORM_ASPECT_NAME, info, aspectAuditStamp); + _entityService.ingestAspect(urn, PLATFORM_ASPECT_NAME, info, aspectAuditStamp, null); } } } diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestRootUserStep.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestRootUserStep.java index 555153130c..4d1883044c 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestRootUserStep.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestRootUserStep.java @@ -57,6 +57,6 @@ public class IngestRootUserStep implements BootstrapStep { RecordUtils.toRecordTemplate(CorpUserInfo.class, userObj.get("info").toString()); final AuditStamp aspectAuditStamp = new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis()); - _entityService.ingestAspect(urn, USER_INFO_ASPECT_NAME, info, aspectAuditStamp); + _entityService.ingestAspect(urn, USER_INFO_ASPECT_NAME, info, aspectAuditStamp, null); } } \ No newline at end of file