package com.linkedin.metadata.dao; import com.linkedin.common.AuditStamp; import com.linkedin.common.urn.Urn; import com.linkedin.data.schema.validation.CoercionMode; import com.linkedin.data.schema.validation.RequiredMode; import com.linkedin.data.schema.validation.UnrecognizedFieldMode; import com.linkedin.data.schema.validation.ValidateDataAgainstSchema; import com.linkedin.data.schema.validation.ValidationOptions; import com.linkedin.data.schema.validation.ValidationResult; import com.linkedin.data.template.RecordTemplate; import com.linkedin.data.template.UnionTemplate; import com.linkedin.metadata.dao.equality.DefaultEqualityTester; import com.linkedin.metadata.dao.equality.EqualityTester; import com.linkedin.metadata.dao.exception.ModelValidationException; import com.linkedin.metadata.dao.producer.BaseMetadataEventProducer; import com.linkedin.metadata.dao.retention.IndefiniteRetention; import com.linkedin.metadata.dao.retention.Retention; import com.linkedin.metadata.dao.retention.TimeBasedRetention; import com.linkedin.metadata.dao.retention.VersionBasedRetention; import com.linkedin.metadata.query.ExtraInfo; import java.time.Clock; import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.function.Function; import java.util.function.Supplier; import javax.annotation.Nonnull; import javax.annotation.Nullable; import lombok.Value; /** * A base class for all Local DAOs. * * Local DAO is a standardized interface to store and retrieve aspects from a document store. * See http://go/gma for more details. * * @param must be a valid aspect union type defined in com.linkedin.metadata.aspect * @param must be the entity URN type in {@code ASPECT_UNION} */ public abstract class BaseLocalDAO extends BaseReadDAO { @Value static class AspectEntry { RecordTemplate aspect; ExtraInfo extraInfo; } @Value static class AddResult { RecordTemplate oldValue; RecordTemplate newValue; } private static final String DEFAULT_ID_NAMESPACE = "global"; private static final IndefiniteRetention INDEFINITE_RETENTION = new IndefiniteRetention(); private static final int DEFAULT_MAX_TRANSACTION_RETRY = 3; protected final BaseMetadataEventProducer _producer; // Maps an aspect class to the corresponding retention policy private final Map, Retention> _aspectRetentionMap = new HashMap<>(); // Maps an aspect class to the corresponding equality tester private final Map, EqualityTester> _aspectEqualityTesterMap = new HashMap<>(); private boolean _modelValidationOnWrite = true; private Clock _clock = Clock.systemUTC(); public BaseLocalDAO(@Nonnull Class aspectUnionClass, @Nonnull BaseMetadataEventProducer producer) { super(aspectUnionClass); _producer = producer; } /** * For tests to override the internal clock */ public void setClock(@Nonnull Clock clock) { _clock = clock; } /** * Sets {@link Retention} for a specific aspect type. */ public void setRetention(@Nonnull Class aspectClass, @Nonnull Retention retention) { checkValidAspect(aspectClass); _aspectRetentionMap.put(aspectClass, retention); } /** * Gets the {@link Retention} for an aspect type, or {@link IndefiniteRetention} if none is registered. */ @Nonnull public Retention getRetention(@Nonnull Class aspectClass) { checkValidAspect(aspectClass); return _aspectRetentionMap.getOrDefault(aspectClass, INDEFINITE_RETENTION); } /** * Sets the {@link EqualityTester} for a specific aspect type. */ public void setEqualityTester(@Nonnull Class aspectClass, @Nonnull EqualityTester tester) { checkValidAspect(aspectClass); _aspectEqualityTesterMap.put(aspectClass, tester); } /** * Gets the {@link EqualityTester} for an aspect, or {@link DefaultEqualityTester} if none is registered. */ @Nonnull public EqualityTester getEqualityTester(@Nonnull Class aspectClass) { checkValidAspect(aspectClass); return (EqualityTester) _aspectEqualityTesterMap.computeIfAbsent(aspectClass, key -> new DefaultEqualityTester()); } /** * Enables or disables model validation before persisting. */ public void enableModelValidationOnWrite(boolean enabled) { _modelValidationOnWrite = enabled; } /** * Adds a new version of aspect for an entity. * * The new aspect will have an automatically assigned version number, which is guaranteed to be positive and * monotonically increasing. Older versions of aspect will be purged automatically based on the retention setting. * A MetadataAuditEvent is also emitted if there's an actual update. * * @param urn the URN for the entity the aspect is attached to * @param auditStamp the audit stamp for the operation * @param updateLambda a lambda expression that takes the previous version of aspect and returns the new version * @return {@link RecordTemplate} of the new value of aspect, empty if the transaction fails */ @Nonnull public Optional add(@Nonnull URN urn, @Nonnull Class aspectClass, @Nonnull Function, RecordTemplate> updateLambda, @Nonnull AuditStamp auditStamp, int maxTransactionRetry) { checkValidAspect(aspectClass); final EqualityTester equalityTester = getEqualityTester(aspectClass); AddResult result = runInTransactionWithRetry(() -> { // 1. Compute newValue based on oldValue AspectEntry latest = getLatest(urn, aspectClass); final ASPECT oldValue = latest == null ? null : (ASPECT) latest.getAspect(); final ASPECT newValue = (ASPECT) updateLambda.apply(Optional.ofNullable(oldValue)); checkValidAspect(newValue.getClass()); if (_modelValidationOnWrite) { validateAgainstSchema(newValue); } // 2. Skip saving if there's no actual change if (oldValue != null && equalityTester.equals(oldValue, newValue)) { return new AddResult(oldValue, oldValue); } // 3. Save the newValue as the latest version long largestVersion = saveLatest(urn, aspectClass, oldValue, latest == null ? null : latest.getExtraInfo().getAudit(), newValue, auditStamp); // 4. Apply retention policy applyRetention(urn, aspectClass, getRetention(aspectClass), largestVersion); return new AddResult(oldValue, newValue); }, maxTransactionRetry); // 5. Produce MAE after a successful update if (result != null) { _producer.produceMetadataAuditEvent(urn, result.getOldValue(), result.getNewValue()); } return Optional.ofNullable(result).map(r -> result.getNewValue()); } /** * Similar to {@link #add(Urn, Class, Function, AuditStamp, int)} but uses the default maximum transaction retry. */ @Nonnull public Optional add(@Nonnull URN urn, @Nonnull Class aspectClass, @Nonnull Function, RecordTemplate> updateLambda, @Nonnull AuditStamp auditStamp) { return add(urn, aspectClass, updateLambda, auditStamp, DEFAULT_MAX_TRANSACTION_RETRY); } /** * Similar to {@link #add(Urn, Class, Function, AuditStamp)} but takes the new value directly. */ @Nonnull public Optional add(@Nonnull URN urn, @Nonnull ASPECT newValue, @Nonnull AuditStamp auditStamp) { return add(urn, newValue.getClass(), ignored -> newValue, auditStamp); } private void applyRetention(@Nonnull URN urn, @Nonnull Class aspectClass, @Nonnull Retention retention, long largestVersion) { if (retention instanceof IndefiniteRetention) { return; } if (retention instanceof VersionBasedRetention) { applyVersionBasedRetention(aspectClass, urn, (VersionBasedRetention) retention, largestVersion); return; } if (retention instanceof TimeBasedRetention) { applyTimeBasedRetention(aspectClass, urn, (TimeBasedRetention) retention, _clock.millis()); return; } } /** * Saves the latest aspect * * @param urn the URN for the entity the aspect is attached to * @param aspectClass the aspectClass of the aspect being saved * @param oldEntry {@link RecordTemplate} of the previous latest value of aspect, null if new value is the first version * @param oldAuditStamp the audit stamp of the previous latest aspect, null if new value is the first version * @param newEntry {@link RecordTemplate} of the new latest value of aspect * @param newAuditStamp the audit stamp for the operation * @return the largestVersion */ protected abstract long saveLatest(@Nonnull URN urn, @Nonnull Class aspectClass, @Nullable ASPECT oldEntry, @Nullable AuditStamp oldAuditStamp, @Nonnull ASPECT newEntry, @Nonnull AuditStamp newAuditStamp); /** * Runs the given lambda expression in a transaction with a limited number of retries. * * @param block the lambda expression to run * @param maxTransactionRetry maximum number of transaction retries before throwing an exception * @param type for the result object * @return the result object from a successfully committed transaction */ protected abstract T runInTransactionWithRetry(@Nonnull Supplier block, int maxTransactionRetry); /** * Gets the latest version of a specific aspect type for an entity * * @param urn {@link Urn} for the entity * @param aspectClass the type of aspect to get * @return the latest version for the aspect type, or null if there's none */ @Nullable protected abstract AspectEntry getLatest(@Nonnull URN urn, @Nonnull Class aspectClass); /** * Gets the next version to use for an entity's specific aspect type. * * @param urn {@link Urn} for the entity * @param aspectClass the type of aspect to get * @return the next version number to use, or {@link #LATEST_VERSION} if there's no previous versions */ protected abstract long getNextVersion(@Nonnull URN urn, @Nonnull Class aspectClass); /** * Saves an aspect for an entity with specific version & {@link AuditStamp}. * * @param urn {@link Urn} for the entity * @param value the aspect to save * @param auditStamp the {@link AuditStamp} for the aspect * @param version the version for the aspect * @param insert use insert, instead of update, operation to save */ protected abstract void save(@Nonnull URN urn, @Nonnull RecordTemplate value, @Nonnull AuditStamp auditStamp, long version, boolean insert); /** * Applies version-based retention against a specific aspect type for an entity * * @param aspectClass the type of aspect to apply retention to * @param urn {@link Urn} for the entity * @param retention the retention configuration * @param largestVersion the largest version number for the aspect type */ protected abstract void applyVersionBasedRetention(@Nonnull Class aspectClass, @Nonnull URN urn, @Nonnull VersionBasedRetention retention, long largestVersion); /** * Applies time-based retention against a specific aspect type for an entity * * @param aspectClass the type of aspect to apply retention to * @param urn {@link Urn} for the entity * @param retention the retention configuration * @param currentTime the current timestamp */ protected abstract void applyTimeBasedRetention(@Nonnull Class aspectClass, @Nonnull URN urn, @Nonnull TimeBasedRetention retention, long currentTime); /** * Emits backfill MetadataAuditEvent for the latest version of an aspect for an entity. * * @param aspectClass the type of aspect to backfill * @param urn {@link Urn} for the entity * @param must be a supported aspect type in {@code ASPECT_UNION}. * @return the aspect emitted in the backfill message */ @Nonnull public Optional backfill(@Nonnull Class aspectClass, @Nonnull URN urn) { checkValidAspect(aspectClass); Optional aspect = get(aspectClass, urn, LATEST_VERSION); aspect.ifPresent(value -> _producer.produceMetadataAuditEvent(urn, value, value)); return aspect; } /** * Paginates over all available versions of an aspect for an entity. * * @param aspectClass the type of the aspect to query * @param urn {@link Urn} for the entity * @param start the starting offset of the page * @param pageSize the size of the page * @param must be a supported aspect type in {@code ASPECT_UNION}. * @return a {@link ListResult} containing a list of version numbers and other pagination information */ @Nonnull public abstract ListResult listVersions(@Nonnull Class aspectClass, @Nonnull URN urn, int start, int pageSize); /** * Paginates over all URNs for entities that have a specific aspect. * * @param aspectClass the type of the aspect to query * @param start the starting offset of the page * @param pageSize the size of the page * @param must be a supported aspect type in {@code ASPECT_UNION}. * @return a {@link ListResult} containing a list of URN and other pagination information */ @Nonnull public abstract ListResult listUrns(@Nonnull Class aspectClass, int start, int pageSize); /** * Paginates over all versions of an aspect for a specific Urn. * * @param aspectClass the type of the aspect to query * @param urn {@link Urn} for the entity * @param start the starting offset of the page * @param pageSize the size of the page * @param must be a supported aspect type in {@code ASPECT_UNION}. * @return a {@link ListResult} containing a list of aspects and other pagination information */ @Nonnull public abstract ListResult list(@Nonnull Class aspectClass, @Nonnull URN urn, int start, int pageSize); /** * Paginates over a specific version of a specific aspect for all Urns * * @param aspectClass the type of the aspect to query * @param version the version of the aspect * @param start the starting offset of the page * @param pageSize the size of the page * @param must be a supported aspect type in {@code ASPECT_UNION}. * @return a {@link ListResult} containing a list of aspects and other pagination information */ @Nonnull public abstract ListResult list(@Nonnull Class aspectClass, long version, int start, int pageSize); /** * Paginates over the latest version of a specific aspect for all Urns * * @param aspectClass the type of the aspect to query * @param start the starting offset of the page * @param pageSize the size of the page * @param must be a supported aspect type in {@code ASPECT_UNION}. * @return a {@link ListResult} containing a list of aspects and other pagination information */ @Nonnull public abstract ListResult list(@Nonnull Class aspectClass, int start, int pageSize); /** * Generates a new string ID that's guaranteed to be globally unique. */ @Nonnull public String newStringId() { return UUID.randomUUID().toString(); } /** * Generates a new numeric ID that's guaranteed to increase monotonically within the given namespace. */ public abstract long newNumericId(@Nonnull String namespace, int maxTransactionRetry); /** * Similar to {@link #newNumericId(String, int)} but uses default maximum transaction retry count. */ public long newNumericId(@Nonnull String namespace) { return newNumericId(namespace, DEFAULT_MAX_TRANSACTION_RETRY); } /** * Similar to {@link #newNumericId(String, int)} but uses a single global namespace */ public long newNumericId() { return newNumericId(DEFAULT_ID_NAMESPACE); } /** * Validates a model against its schema. */ protected void validateAgainstSchema(@Nonnull RecordTemplate model) { ValidationResult result = ValidateDataAgainstSchema.validate(model, new ValidationOptions(RequiredMode.CAN_BE_ABSENT_IF_HAS_DEFAULT, CoercionMode.NORMAL, UnrecognizedFieldMode.DISALLOW)); if (!result.isValid()) { throw new ModelValidationException(result.getMessages().toString()); } } }