2021-06-03 13:24:33 -07:00
|
|
|
package com.linkedin.metadata.entity;
|
|
|
|
|
2021-12-14 11:18:02 +09:00
|
|
|
import com.codahale.metrics.Timer;
|
2021-06-03 13:24:33 -07:00
|
|
|
import com.google.common.collect.ImmutableList;
|
2021-07-29 20:04:40 -07:00
|
|
|
import com.google.common.collect.Streams;
|
2021-06-03 13:24:33 -07:00
|
|
|
import com.linkedin.common.AuditStamp;
|
2021-10-21 11:15:10 -07:00
|
|
|
import com.linkedin.common.BrowsePaths;
|
2021-06-03 13:24:33 -07:00
|
|
|
import com.linkedin.common.urn.Urn;
|
|
|
|
import com.linkedin.data.schema.RecordDataSchema;
|
2021-07-30 17:41:03 -07:00
|
|
|
import com.linkedin.data.schema.TyperefDataSchema;
|
2021-06-03 13:24:33 -07:00
|
|
|
import com.linkedin.data.template.RecordTemplate;
|
|
|
|
import com.linkedin.data.template.UnionTemplate;
|
|
|
|
import com.linkedin.entity.Entity;
|
2022-01-04 00:19:46 +09:00
|
|
|
import com.linkedin.entity.EntityResponse;
|
|
|
|
import com.linkedin.entity.EnvelopedAspect;
|
|
|
|
import com.linkedin.entity.EnvelopedAspectMap;
|
|
|
|
import com.linkedin.metadata.utils.PegasusUtils;
|
2021-11-08 16:22:24 -08:00
|
|
|
import com.linkedin.events.metadata.ChangeType;
|
2021-06-16 10:03:21 -07:00
|
|
|
import com.linkedin.metadata.aspect.VersionedAspect;
|
2021-06-03 13:24:33 -07:00
|
|
|
import com.linkedin.metadata.dao.exception.ModelConversionException;
|
|
|
|
import com.linkedin.metadata.dao.utils.RecordUtils;
|
|
|
|
import com.linkedin.metadata.event.EntityEventProducer;
|
|
|
|
import com.linkedin.metadata.models.AspectSpec;
|
|
|
|
import com.linkedin.metadata.models.EntitySpec;
|
|
|
|
import com.linkedin.metadata.models.registry.EntityRegistry;
|
2021-09-02 19:05:13 -07:00
|
|
|
import com.linkedin.metadata.query.ListUrnsResult;
|
2021-07-29 20:04:40 -07:00
|
|
|
import com.linkedin.metadata.run.AspectRowSummary;
|
2021-10-07 11:41:29 -07:00
|
|
|
import com.linkedin.metadata.search.utils.BrowsePathUtils;
|
2021-06-03 13:24:33 -07:00
|
|
|
import com.linkedin.metadata.snapshot.Snapshot;
|
2021-10-07 11:41:29 -07:00
|
|
|
import com.linkedin.metadata.utils.DataPlatformInstanceUtils;
|
|
|
|
import com.linkedin.metadata.utils.EntityKeyUtils;
|
2021-11-08 16:22:24 -08:00
|
|
|
import com.linkedin.metadata.utils.GenericAspectUtils;
|
2021-12-14 11:18:02 +09:00
|
|
|
import com.linkedin.metadata.utils.metrics.MetricUtils;
|
2021-07-29 20:04:40 -07:00
|
|
|
import com.linkedin.mxe.MetadataAuditOperation;
|
2021-07-30 17:41:03 -07:00
|
|
|
import com.linkedin.mxe.MetadataChangeLog;
|
|
|
|
import com.linkedin.mxe.MetadataChangeProposal;
|
2021-07-29 20:04:40 -07:00
|
|
|
import com.linkedin.mxe.SystemMetadata;
|
2021-07-30 17:41:03 -07:00
|
|
|
import com.linkedin.util.Pair;
|
2021-06-03 13:24:33 -07:00
|
|
|
import java.net.URISyntaxException;
|
2021-10-21 11:15:10 -07:00
|
|
|
import java.util.ArrayList;
|
2021-06-03 13:24:33 -07:00
|
|
|
import java.util.Collections;
|
|
|
|
import java.util.List;
|
|
|
|
import java.util.Map;
|
2021-11-08 16:22:24 -08:00
|
|
|
import java.util.Optional;
|
2021-06-03 13:24:33 -07:00
|
|
|
import java.util.Set;
|
2021-12-14 11:18:02 +09:00
|
|
|
import java.util.function.Function;
|
2021-06-03 13:24:33 -07:00
|
|
|
import java.util.stream.Collectors;
|
|
|
|
import javax.annotation.Nonnull;
|
|
|
|
import javax.annotation.Nullable;
|
2021-12-14 11:18:02 +09:00
|
|
|
import lombok.Getter;
|
|
|
|
import lombok.Setter;
|
|
|
|
import lombok.Value;
|
2021-06-25 10:56:45 -07:00
|
|
|
import lombok.extern.slf4j.Slf4j;
|
2021-06-03 13:24:33 -07:00
|
|
|
|
2021-10-07 11:41:29 -07:00
|
|
|
import static com.linkedin.metadata.Constants.ASPECT_LATEST_VERSION;
|
2021-09-02 19:05:13 -07:00
|
|
|
import static com.linkedin.metadata.utils.PegasusUtils.getDataTemplateClassFromSchema;
|
|
|
|
import static com.linkedin.metadata.utils.PegasusUtils.urnToEntityName;
|
2021-06-03 13:24:33 -07:00
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
* An abstract base class specifying create, update, and read operations against metadata entities and aspects
|
|
|
|
* by primary key (urn).
|
|
|
|
*
|
|
|
|
* This class is meant to abstract away the storage concerns of these pieces of metadata, permitting
|
|
|
|
* any underlying storage system to be used in materializing GMS domain objects, which are implemented using Pegasus
|
|
|
|
* {@link RecordTemplate}s.
|
|
|
|
*
|
|
|
|
* A key requirement of any implementation is being able to bind what is persisted in storage to an aspect
|
|
|
|
* {@link RecordTemplate}, using help from the {@link EntityRegistry}.
|
|
|
|
*
|
|
|
|
* Another requirement is that any implementation honors the internal versioning semantics. The latest version of any aspect
|
|
|
|
* is set to 0 for efficient retrieval; in most cases the latest state of an aspect will be the only fetched.
|
|
|
|
*
|
|
|
|
* As such, 0 is treated as a special number. Once an aspect is no longer the latest, versions will increment
|
|
|
|
* monotonically, starting from 1. Thus, the second-to-last version of an aspect will be equal to total # versions
|
|
|
|
* of the aspect - 1.
|
|
|
|
*
|
|
|
|
* For example, if there are 5 instances of a single aspect, the latest will have version 0, and the second-to-last
|
|
|
|
* will have version 4. The "true" latest version of an aspect is always equal to the highest stored version
|
|
|
|
* of a given aspect + 1.
|
|
|
|
*
|
|
|
|
* Note that currently, implementations of this interface are responsible for producing Metadata Audit Events on
|
2021-07-29 20:04:40 -07:00
|
|
|
* ingestion using {@link #produceMetadataAuditEvent(
|
2021-07-30 17:41:03 -07:00
|
|
|
*Urn, RecordTemplate, RecordTemplate, SystemMetadata, SystemMetadata, MetadataAuditOperation)}.
|
2021-06-03 13:24:33 -07:00
|
|
|
*
|
|
|
|
* TODO: Consider whether we can abstract away virtual versioning semantics to subclasses of this class.
|
|
|
|
* TODO: Extract out a nested 'AspectService'.
|
|
|
|
*/
|
2021-06-25 10:56:45 -07:00
|
|
|
@Slf4j
|
2021-06-03 13:24:33 -07:00
|
|
|
public abstract class EntityService {
|
|
|
|
|
|
|
|
/**
|
|
|
|
* As described above, the latest version of an aspect should <b>always</b> take the value 0, with
|
|
|
|
* monotonically increasing version incrementing as usual once the latest version is replaced.
|
|
|
|
*/
|
|
|
|
|
|
|
|
private final EntityEventProducer _producer;
|
|
|
|
private final EntityRegistry _entityRegistry;
|
|
|
|
private final Map<String, Set<String>> _entityToValidAspects;
|
2021-12-14 11:18:02 +09:00
|
|
|
@Getter
|
|
|
|
@Setter
|
|
|
|
private RetentionService retentionService;
|
|
|
|
private Boolean _alwaysEmitAuditEvent = false;
|
2021-07-29 20:04:40 -07:00
|
|
|
public static final String DEFAULT_RUN_ID = "no-run-id-provided";
|
2021-10-07 11:41:29 -07:00
|
|
|
public static final String BROWSE_PATHS = "browsePaths";
|
|
|
|
public static final String DATA_PLATFORM_INSTANCE = "dataPlatformInstance";
|
2021-06-03 13:24:33 -07:00
|
|
|
|
|
|
|
/**
 * Creates the service around its two collaborators.
 *
 * @param producer producer used to publish metadata audit events and metadata change logs
 * @param entityRegistry registry used to look up entity and aspect specs
 */
protected EntityService(@Nonnull final EntityEventProducer producer, @Nonnull final EntityRegistry entityRegistry) {
  this._producer = producer;
  this._entityRegistry = entityRegistry;
  // Computed once up front from the registry.
  this._entityToValidAspects = buildEntityToValidAspects(entityRegistry);
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Retrieves the latest aspects corresponding to a batch of {@link Urn}s based on a provided
|
|
|
|
* set of aspect names.
|
|
|
|
*
|
|
|
|
* @param urns set of urns to fetch aspects for
|
|
|
|
* @param aspectNames aspects to fetch for each urn in urns set
|
|
|
|
* @return a map of provided {@link Urn} to a List containing the requested aspects.
|
|
|
|
*/
|
2022-01-04 00:19:46 +09:00
|
|
|
public abstract Map<Urn, List<RecordTemplate>> getLatestAspects(
|
|
|
|
@Nonnull final Set<Urn> urns,
|
2021-06-03 13:24:33 -07:00
|
|
|
@Nonnull final Set<String> aspectNames);
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Retrieves an aspect having a specific {@link Urn}, name, & version.
|
|
|
|
*
|
|
|
|
* Note that once we drop support for legacy aspect-specific resources,
|
|
|
|
* we should make this a protected method. Only visible for backwards compatibility.
|
|
|
|
*
|
|
|
|
* @param urn an urn associated with the requested aspect
|
|
|
|
* @param aspectName name of the aspect requested
|
|
|
|
* @param version specific version of the aspect being requests
|
2021-11-22 16:33:14 -08:00
|
|
|
* @return the {@link RecordTemplate} representation of the requested aspect object, or null if one cannot be found
|
2021-06-03 13:24:33 -07:00
|
|
|
*/
|
2021-11-22 16:33:14 -08:00
|
|
|
@Nullable
|
2021-07-30 17:41:03 -07:00
|
|
|
public abstract RecordTemplate getAspect(@Nonnull final Urn urn, @Nonnull final String aspectName, long version);
|
2021-06-03 13:24:33 -07:00
|
|
|
|
2022-01-04 00:19:46 +09:00
|
|
|
/**
|
|
|
|
* Retrieves the latest aspects for the given set of urns as dynamic aspect objects
|
|
|
|
* (Without having to define union objects)
|
|
|
|
*
|
|
|
|
* @param entityName name of the entity to fetch
|
|
|
|
* @param urns set of urns to fetch
|
|
|
|
* @param aspectNames set of aspects to fetch
|
|
|
|
* @return a map of {@link Urn} to {@link Entity} object
|
|
|
|
*/
|
|
|
|
public Map<Urn, EntityResponse> getEntitiesV2(
|
|
|
|
@Nonnull final String entityName,
|
|
|
|
@Nonnull final Set<Urn> urns,
|
|
|
|
@Nonnull final Set<String> aspectNames) throws Exception {
|
|
|
|
return getLatestEnvelopedAspects(entityName, urns, aspectNames)
|
|
|
|
.entrySet()
|
|
|
|
.stream()
|
|
|
|
.collect(Collectors.toMap(Map.Entry::getKey, entry -> toEntityResponse(entry.getKey(), entry.getValue())));
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Retrieves the latest aspects for the given set of urns as a list of enveloped aspects
|
|
|
|
*
|
|
|
|
* @param entityName name of the entity to fetch
|
|
|
|
* @param urns set of urns to fetch
|
|
|
|
* @param aspectNames set of aspects to fetch
|
|
|
|
* @return a map of {@link Urn} to {@link EnvelopedAspect} object
|
|
|
|
*/
|
|
|
|
public abstract Map<Urn, List<EnvelopedAspect>> getLatestEnvelopedAspects(
|
|
|
|
@Nonnull final String entityName,
|
|
|
|
@Nonnull final Set<Urn> urns,
|
|
|
|
@Nonnull final Set<String> aspectNames) throws Exception;
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Retrieves the latest aspect for the given urn as a list of enveloped aspects
|
|
|
|
*
|
|
|
|
* @param entityName name of the entity to fetch
|
|
|
|
* @param urn urn to fetch
|
|
|
|
* @param aspectName name of the aspect to fetch
|
|
|
|
* @return {@link EnvelopedAspect} object, or null if one cannot be found
|
|
|
|
*/
|
|
|
|
public abstract EnvelopedAspect getLatestEnvelopedAspect(
|
|
|
|
@Nonnull final String entityName,
|
|
|
|
@Nonnull final Urn urn,
|
|
|
|
@Nonnull final String aspectName) throws Exception;
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Retrieves the specific version of the aspect for the given urn
|
|
|
|
*
|
|
|
|
* @param entityName name of the entity to fetch
|
|
|
|
* @param urn urn to fetch
|
|
|
|
* @param aspectName name of the aspect to fetch
|
|
|
|
* @param version version to fetch
|
|
|
|
* @return {@link EnvelopedAspect} object, or null if one cannot be found
|
|
|
|
*/
|
|
|
|
public abstract EnvelopedAspect getEnvelopedAspect(
|
|
|
|
@Nonnull final String entityName,
|
|
|
|
@Nonnull final Urn urn,
|
|
|
|
@Nonnull final String aspectName,
|
|
|
|
long version) throws Exception;
|
|
|
|
|
2021-11-22 16:33:14 -08:00
|
|
|
/**
|
|
|
|
* Retrieves an {@link VersionedAspect}, or null if one cannot be found.
|
|
|
|
*/
|
|
|
|
@Nullable
|
2021-07-30 17:41:03 -07:00
|
|
|
public abstract VersionedAspect getVersionedAspect(@Nonnull final Urn urn, @Nonnull final String aspectName,
|
2021-06-16 10:03:21 -07:00
|
|
|
long version);
|
|
|
|
|
2021-06-03 13:24:33 -07:00
|
|
|
/**
|
2021-06-17 19:52:50 -07:00
|
|
|
* Retrieves a list of all aspects belonging to an entity of a particular type, sorted by urn.
|
2021-06-03 13:24:33 -07:00
|
|
|
*
|
|
|
|
* Note that once we drop support for legacy 'getAllDataPlatforms' endpoint,
|
|
|
|
* we can drop support for this unless otherwise required. Only visible for backwards compatibility.
|
|
|
|
*
|
2021-06-17 19:52:50 -07:00
|
|
|
* @param entityName name of the entity type the aspect belongs to, e.g. 'dataset'
|
|
|
|
* @param aspectName name of the aspect requested, e.g. 'ownership'
|
2021-06-03 13:24:33 -07:00
|
|
|
* @param start the starting index of the returned aspects, used in pagination
|
|
|
|
* @param count the count of the aspects to be returned, used in pagination
|
|
|
|
* @return a {@link ListResult} of {@link RecordTemplate}s representing the requested aspect.
|
|
|
|
*/
|
2021-07-30 17:41:03 -07:00
|
|
|
public abstract ListResult<RecordTemplate> listLatestAspects(@Nonnull final String entityName,
|
|
|
|
@Nonnull final String aspectName, final int start, int count);
|
2021-06-03 13:24:33 -07:00
|
|
|
|
2021-12-14 11:18:02 +09:00
|
|
|
/**
|
|
|
|
* Checks whether there is an actual update to the aspect by applying the updateLambda
|
|
|
|
* If there is an update, push the new version into the local DB.
|
|
|
|
* Otherwise, do not push the new version, but just update the system metadata.
|
|
|
|
*
|
|
|
|
* @param urn an urn associated with the new aspect
|
|
|
|
* @param aspectName name of the aspect being inserted
|
|
|
|
* @param updateLambda Function to apply to the latest version of the aspect to get the updated version
|
|
|
|
* @param auditStamp an {@link AuditStamp} containing metadata about the writer & current time * @param providedSystemMetadata
|
|
|
|
* @return Details about the new and old version of the aspect
|
|
|
|
*/
|
|
|
|
@Nonnull
|
|
|
|
protected abstract UpdateAspectResult ingestAspectToLocalDB(@Nonnull final Urn urn, @Nonnull final String aspectName,
|
|
|
|
@Nonnull final Function<Optional<RecordTemplate>, RecordTemplate> updateLambda,
|
|
|
|
@Nonnull final AuditStamp auditStamp, @Nonnull final SystemMetadata providedSystemMetadata);
|
|
|
|
|
2021-06-03 13:24:33 -07:00
|
|
|
/**
|
|
|
|
* Ingests (inserts) a new version of an entity aspect & emits a {@link com.linkedin.mxe.MetadataAuditEvent}.
|
|
|
|
*
|
|
|
|
* Note that in general, this should not be used externally. It is currently serving upgrade scripts and
|
|
|
|
* is as such public.
|
|
|
|
*
|
|
|
|
* @param urn an urn associated with the new aspect
|
|
|
|
* @param aspectName name of the aspect being inserted
|
|
|
|
* @param newValue value of the aspect being inserted
|
|
|
|
* @param auditStamp an {@link AuditStamp} containing metadata about the writer & current time
|
2021-07-29 20:04:40 -07:00
|
|
|
* @param systemMetadata
|
2021-06-03 13:24:33 -07:00
|
|
|
* @return the {@link RecordTemplate} representation of the written aspect object
|
|
|
|
*/
|
2021-12-14 11:18:02 +09:00
|
|
|
public RecordTemplate ingestAspect(@Nonnull final Urn urn, @Nonnull final String aspectName,
|
|
|
|
@Nonnull final RecordTemplate newValue, @Nonnull final AuditStamp auditStamp, SystemMetadata systemMetadata) {
|
|
|
|
|
|
|
|
log.debug("Invoked ingestAspect with urn: {}, aspectName: {}, newValue: {}", urn, aspectName, newValue);
|
|
|
|
|
|
|
|
if (!urn.toString().trim().equals(urn.toString())) {
|
|
|
|
throw new IllegalArgumentException("Error: cannot provide an URN with leading or trailing whitespace");
|
|
|
|
}
|
|
|
|
|
|
|
|
Timer.Context ingestToLocalDBTimer = MetricUtils.timer(this.getClass(), "ingestAspectToLocalDB").time();
|
|
|
|
UpdateAspectResult result = ingestAspectToLocalDB(urn, aspectName, ignored -> newValue, auditStamp, systemMetadata);
|
|
|
|
ingestToLocalDBTimer.stop();
|
|
|
|
|
|
|
|
final RecordTemplate oldValue = result.getOldValue();
|
|
|
|
final RecordTemplate updatedValue = result.getNewValue();
|
|
|
|
|
|
|
|
// Apply retention policies asynchronously if there was an update to existing aspect value
|
|
|
|
if (oldValue != updatedValue && oldValue != null && retentionService != null) {
|
|
|
|
retentionService.applyRetention(urn, aspectName,
|
|
|
|
Optional.of(new RetentionService.RetentionContext(Optional.of(result.maxVersion))));
|
|
|
|
}
|
|
|
|
|
|
|
|
// Produce MAE after a successful update
|
|
|
|
if (oldValue != updatedValue || _alwaysEmitAuditEvent) {
|
|
|
|
log.debug(String.format("Producing MetadataAuditEvent for ingested aspect %s, urn %s", aspectName, urn));
|
|
|
|
Timer.Context produceMAETimer = MetricUtils.timer(this.getClass(), "produceMAE").time();
|
|
|
|
if (aspectName.equals(getKeyAspectName(urn))) {
|
|
|
|
produceMetadataAuditEventForKey(urn, result.getNewSystemMetadata());
|
|
|
|
} else {
|
|
|
|
produceMetadataAuditEvent(urn, oldValue, updatedValue, result.getOldSystemMetadata(),
|
|
|
|
result.getNewSystemMetadata(), MetadataAuditOperation.UPDATE);
|
|
|
|
}
|
|
|
|
produceMAETimer.stop();
|
|
|
|
} else {
|
|
|
|
log.debug(
|
|
|
|
String.format("Skipped producing MetadataAuditEvent for ingested aspect %s, urn %s. Aspect has not changed.",
|
|
|
|
aspectName, urn));
|
|
|
|
}
|
|
|
|
|
|
|
|
return updatedValue;
|
|
|
|
}
|
2021-07-29 20:04:40 -07:00
|
|
|
|
|
|
|
public RecordTemplate ingestAspect(@Nonnull final Urn urn, @Nonnull final String aspectName,
|
|
|
|
@Nonnull final RecordTemplate newValue, @Nonnull final AuditStamp auditStamp) {
|
|
|
|
|
|
|
|
SystemMetadata generatedSystemMetadata = new SystemMetadata();
|
|
|
|
generatedSystemMetadata.setLastObserved(System.currentTimeMillis());
|
|
|
|
|
|
|
|
return ingestAspect(urn, aspectName, newValue, auditStamp, generatedSystemMetadata);
|
|
|
|
}
|
2021-06-03 13:24:33 -07:00
|
|
|
|
2021-12-14 11:18:02 +09:00
|
|
|
public IngestProposalResult ingestProposal(@Nonnull MetadataChangeProposal metadataChangeProposal, AuditStamp auditStamp) {
|
|
|
|
|
|
|
|
log.debug("entity type = {}", metadataChangeProposal.getEntityType());
|
|
|
|
EntitySpec entitySpec = getEntityRegistry().getEntitySpec(metadataChangeProposal.getEntityType());
|
|
|
|
log.debug("entity spec = {}", entitySpec);
|
|
|
|
|
|
|
|
Urn entityUrn = EntityKeyUtils.getUrnFromProposal(metadataChangeProposal, entitySpec.getKeyAspectSpec());
|
|
|
|
|
|
|
|
if (metadataChangeProposal.getChangeType() != ChangeType.UPSERT) {
|
|
|
|
throw new UnsupportedOperationException("Only upsert operation is supported");
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!metadataChangeProposal.hasAspectName() || !metadataChangeProposal.hasAspect()) {
|
|
|
|
throw new UnsupportedOperationException("Aspect and aspect name is required for create and update operations");
|
|
|
|
}
|
|
|
|
|
|
|
|
AspectSpec aspectSpec = entitySpec.getAspectSpec(metadataChangeProposal.getAspectName());
|
|
|
|
|
|
|
|
if (aspectSpec == null) {
|
|
|
|
throw new RuntimeException(
|
|
|
|
String.format("Unknown aspect %s for entity %s", metadataChangeProposal.getAspectName(),
|
|
|
|
metadataChangeProposal.getEntityType()));
|
|
|
|
}
|
|
|
|
|
|
|
|
log.debug("aspect spec = {}", aspectSpec);
|
|
|
|
|
|
|
|
RecordTemplate aspect;
|
|
|
|
try {
|
|
|
|
aspect = GenericAspectUtils.deserializeAspect(metadataChangeProposal.getAspect().getValue(),
|
|
|
|
metadataChangeProposal.getAspect().getContentType(), aspectSpec);
|
|
|
|
ValidationUtils.validateOrThrow(aspect);
|
|
|
|
} catch (ModelConversionException e) {
|
|
|
|
throw new RuntimeException(
|
|
|
|
String.format("Could not deserialize {} for aspect {}", metadataChangeProposal.getAspect().getValue(),
|
|
|
|
metadataChangeProposal.getAspectName()));
|
|
|
|
}
|
|
|
|
log.debug("aspect = {}", aspect);
|
|
|
|
|
|
|
|
SystemMetadata systemMetadata = metadataChangeProposal.getSystemMetadata();
|
|
|
|
if (systemMetadata == null) {
|
|
|
|
systemMetadata = new SystemMetadata();
|
|
|
|
systemMetadata.setRunId(DEFAULT_RUN_ID);
|
|
|
|
systemMetadata.setLastObserved(System.currentTimeMillis());
|
|
|
|
}
|
|
|
|
systemMetadata.setRegistryName(aspectSpec.getRegistryName());
|
|
|
|
systemMetadata.setRegistryVersion(aspectSpec.getRegistryVersion().toString());
|
|
|
|
|
|
|
|
RecordTemplate oldAspect = null;
|
|
|
|
SystemMetadata oldSystemMetadata = null;
|
|
|
|
RecordTemplate newAspect = aspect;
|
|
|
|
SystemMetadata newSystemMetadata = systemMetadata;
|
|
|
|
|
|
|
|
if (!aspectSpec.isTimeseries()) {
|
|
|
|
Timer.Context ingestToLocalDBTimer = MetricUtils.timer(this.getClass(), "ingestProposalToLocalDB").time();
|
|
|
|
UpdateAspectResult result =
|
|
|
|
ingestAspectToLocalDB(entityUrn, metadataChangeProposal.getAspectName(), ignored -> aspect, auditStamp,
|
|
|
|
systemMetadata);
|
|
|
|
ingestToLocalDBTimer.stop();
|
|
|
|
oldAspect = result.getOldValue();
|
|
|
|
oldSystemMetadata = result.getOldSystemMetadata();
|
|
|
|
newAspect = result.getNewValue();
|
|
|
|
newSystemMetadata = result.getNewSystemMetadata();
|
|
|
|
// Apply retention policies asynchronously if there was an update to existing aspect value
|
|
|
|
if (oldAspect != newAspect && oldAspect != null && retentionService != null) {
|
|
|
|
retentionService.applyRetention(entityUrn, aspectSpec.getName(),
|
|
|
|
Optional.of(new RetentionService.RetentionContext(Optional.of(result.maxVersion))));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (oldAspect != newAspect || getAlwaysEmitAuditEvent()) {
|
|
|
|
log.debug(String.format("Producing MetadataChangeLog for ingested aspect %s, urn %s",
|
|
|
|
metadataChangeProposal.getAspectName(), entityUrn));
|
|
|
|
|
|
|
|
final MetadataChangeLog metadataChangeLog = new MetadataChangeLog(metadataChangeProposal.data());
|
|
|
|
if (oldAspect != null) {
|
|
|
|
metadataChangeLog.setPreviousAspectValue(GenericAspectUtils.serializeAspect(oldAspect));
|
|
|
|
}
|
|
|
|
if (oldSystemMetadata != null) {
|
|
|
|
metadataChangeLog.setPreviousSystemMetadata(oldSystemMetadata);
|
|
|
|
}
|
|
|
|
if (newAspect != null) {
|
|
|
|
metadataChangeLog.setAspect(GenericAspectUtils.serializeAspect(newAspect));
|
|
|
|
}
|
|
|
|
if (newSystemMetadata != null) {
|
|
|
|
metadataChangeLog.setSystemMetadata(newSystemMetadata);
|
|
|
|
}
|
|
|
|
|
|
|
|
log.debug(String.format("Serialized MCL event: %s", metadataChangeLog));
|
|
|
|
// Since only timeseries aspects are ingested as of now, simply produce mae event for it
|
|
|
|
produceMetadataChangeLog(entityUrn, aspectSpec, metadataChangeLog);
|
|
|
|
} else {
|
|
|
|
log.debug(
|
|
|
|
String.format("Skipped producing MetadataAuditEvent for ingested aspect %s, urn %s. Aspect has not changed.",
|
|
|
|
metadataChangeProposal.getAspectName(), entityUrn));
|
|
|
|
}
|
|
|
|
|
|
|
|
return new IngestProposalResult(entityUrn, oldAspect != newAspect);
|
|
|
|
}
|
|
|
|
|
2021-06-03 13:24:33 -07:00
|
|
|
/**
|
|
|
|
* Updates a particular version of an aspect & optionally emits a {@link com.linkedin.mxe.MetadataAuditEvent}.
|
|
|
|
*
|
|
|
|
* Note that in general, this should not be used externally. It is currently serving upgrade scripts and
|
|
|
|
* is as such public.
|
|
|
|
*
|
|
|
|
* @param urn an urn associated with the aspect to update
|
2021-10-18 22:52:48 -07:00
|
|
|
* @param entityName name of the entity being updated
|
2021-06-03 13:24:33 -07:00
|
|
|
* @param aspectName name of the aspect being updated
|
2021-10-18 22:52:48 -07:00
|
|
|
* @param aspectSpec spec of the aspect being updated
|
2021-06-03 13:24:33 -07:00
|
|
|
* @param newValue new value of the aspect being updated
|
|
|
|
* @param auditStamp an {@link AuditStamp} containing metadata about the writer & current time
|
|
|
|
* @param version specific version of the aspect being requests
|
|
|
|
* @param emitMae whether a {@link com.linkedin.mxe.MetadataAuditEvent} should be emitted in correspondence upon
|
|
|
|
* successful update
|
|
|
|
* @return the {@link RecordTemplate} representation of the requested aspect object
|
|
|
|
*/
|
2021-10-18 22:52:48 -07:00
|
|
|
public abstract RecordTemplate updateAspect(@Nonnull final Urn urn, @Nonnull final String entityName,
|
|
|
|
@Nonnull final String aspectName, @Nonnull final AspectSpec aspectSpec, @Nonnull final RecordTemplate newValue,
|
|
|
|
@Nonnull final AuditStamp auditStamp, @Nonnull final long version, @Nonnull final boolean emitMae);
|
2021-06-03 13:24:33 -07:00
|
|
|
|
2021-09-02 19:05:13 -07:00
|
|
|
/**
|
|
|
|
* Lists the entity URNs found in storage.
|
|
|
|
*
|
|
|
|
* @param entityName the name associated with the entity
|
|
|
|
* @param start the start offset
|
|
|
|
* @param count the count
|
|
|
|
*/
|
|
|
|
public abstract ListUrnsResult listUrns(@Nonnull final String entityName, final int start, final int count);
|
|
|
|
|
2021-06-03 13:24:33 -07:00
|
|
|
/**
|
|
|
|
* Default implementations. Subclasses should feel free to override if it's more efficient to do so.
|
|
|
|
*/
|
|
|
|
public Entity getEntity(@Nonnull final Urn urn, @Nonnull final Set<String> aspectNames) {
|
2021-07-30 17:41:03 -07:00
|
|
|
return getEntities(Collections.singleton(urn), aspectNames).values().stream().findFirst().orElse(null);
|
2021-06-03 13:24:33 -07:00
|
|
|
}
|
|
|
|
|
2021-06-04 11:28:53 -07:00
|
|
|
/**
|
|
|
|
* Retrieves multiple entities.
|
|
|
|
*
|
|
|
|
* @param urns set of urns to fetch
|
|
|
|
* @param aspectNames set of aspects to fetch
|
|
|
|
* @return a map of {@link Urn} to {@link Entity} object
|
|
|
|
*/
|
|
|
|
public Map<Urn, Entity> getEntities(@Nonnull final Set<Urn> urns, @Nonnull Set<String> aspectNames) {
|
2021-06-25 10:56:45 -07:00
|
|
|
log.debug(String.format("Invoked getEntities with urns %s, aspects %s", urns, aspectNames));
|
2021-06-04 11:28:53 -07:00
|
|
|
if (urns.isEmpty()) {
|
|
|
|
return Collections.emptyMap();
|
|
|
|
}
|
2021-07-30 17:41:03 -07:00
|
|
|
return getSnapshotUnions(urns, aspectNames).entrySet()
|
|
|
|
.stream()
|
2021-06-03 13:24:33 -07:00
|
|
|
.collect(Collectors.toMap(Map.Entry::getKey, entry -> toEntity(entry.getValue())));
|
|
|
|
}
|
|
|
|
|
2021-06-30 16:49:02 -07:00
|
|
|
/**
|
|
|
|
* Produce metadata audit event and push.
|
|
|
|
*
|
|
|
|
* @param urn Urn to push
|
|
|
|
* @param oldAspectValue Value of aspect before the update.
|
|
|
|
* @param newAspectValue Value of aspect after the update
|
|
|
|
*/
|
2021-07-30 17:41:03 -07:00
|
|
|
public void produceMetadataAuditEvent(@Nonnull final Urn urn, @Nullable final RecordTemplate oldAspectValue,
|
|
|
|
@Nullable final RecordTemplate newAspectValue, @Nullable final SystemMetadata oldSystemMetadata,
|
|
|
|
@Nullable final SystemMetadata newSystemMetadata, @Nullable final MetadataAuditOperation operation) {
|
2021-06-30 16:49:02 -07:00
|
|
|
|
|
|
|
final Snapshot newSnapshot = buildSnapshot(urn, newAspectValue);
|
|
|
|
Snapshot oldSnapshot = null;
|
|
|
|
if (oldAspectValue != null) {
|
|
|
|
oldSnapshot = buildSnapshot(urn, oldAspectValue);
|
|
|
|
}
|
|
|
|
|
2021-07-29 20:04:40 -07:00
|
|
|
_producer.produceMetadataAuditEvent(urn, oldSnapshot, newSnapshot, oldSystemMetadata, newSystemMetadata, operation);
|
2021-06-30 16:49:02 -07:00
|
|
|
}
|
|
|
|
|
2021-07-30 17:41:03 -07:00
|
|
|
/**
|
|
|
|
* Produces a {@link com.linkedin.mxe.MetadataChangeLog} from a
|
|
|
|
* new & previous aspect.
|
|
|
|
*
|
|
|
|
* @param urn the urn associated with the entity changed
|
|
|
|
* @param aspectSpec AspectSpec of the aspect being updated
|
|
|
|
* @param metadataChangeLog metadata change log to push into MCL kafka topic
|
|
|
|
*/
|
|
|
|
public void produceMetadataChangeLog(@Nonnull final Urn urn, AspectSpec aspectSpec,
|
|
|
|
@Nonnull final MetadataChangeLog metadataChangeLog) {
|
|
|
|
_producer.produceMetadataChangeLog(urn, aspectSpec, metadataChangeLog);
|
|
|
|
}
|
|
|
|
|
2021-11-08 16:22:24 -08:00
|
|
|
public void produceMetadataChangeLog(@Nonnull final Urn urn, @Nonnull String entityName, @Nonnull String aspectName,
|
|
|
|
@Nonnull final AspectSpec aspectSpec, @Nullable final RecordTemplate oldAspectValue,
|
|
|
|
@Nullable final RecordTemplate newAspectValue, @Nullable final SystemMetadata oldSystemMetadata,
|
|
|
|
@Nullable final SystemMetadata newSystemMetadata, @Nonnull final ChangeType changeType) {
|
|
|
|
final MetadataChangeLog metadataChangeLog = new MetadataChangeLog();
|
|
|
|
metadataChangeLog.setEntityType(entityName);
|
|
|
|
metadataChangeLog.setEntityUrn(urn);
|
|
|
|
metadataChangeLog.setChangeType(changeType);
|
|
|
|
metadataChangeLog.setAspectName(aspectName);
|
|
|
|
if (newAspectValue != null) {
|
|
|
|
metadataChangeLog.setAspect(GenericAspectUtils.serializeAspect(newAspectValue));
|
|
|
|
}
|
|
|
|
if (newSystemMetadata != null) {
|
|
|
|
metadataChangeLog.setSystemMetadata(newSystemMetadata);
|
|
|
|
}
|
|
|
|
if (oldAspectValue != null) {
|
|
|
|
metadataChangeLog.setPreviousAspectValue(GenericAspectUtils.serializeAspect(oldAspectValue));
|
|
|
|
}
|
|
|
|
if (oldSystemMetadata != null) {
|
|
|
|
metadataChangeLog.setPreviousSystemMetadata(oldSystemMetadata);
|
|
|
|
}
|
|
|
|
produceMetadataChangeLog(urn, aspectSpec, metadataChangeLog);
|
|
|
|
}
|
|
|
|
|
2021-07-30 17:41:03 -07:00
|
|
|
public void produceMetadataAuditEventForKey(@Nonnull final Urn urn,
|
|
|
|
@Nullable final SystemMetadata newSystemMetadata) {
|
2021-07-29 20:04:40 -07:00
|
|
|
|
|
|
|
final Snapshot newSnapshot = buildKeySnapshot(urn);
|
|
|
|
|
2021-07-30 17:41:03 -07:00
|
|
|
_producer.produceMetadataAuditEvent(urn, null, newSnapshot, null, newSystemMetadata, MetadataAuditOperation.UPDATE);
|
2021-07-29 20:04:40 -07:00
|
|
|
}
|
|
|
|
|
2021-06-03 13:24:33 -07:00
|
|
|
public RecordTemplate getLatestAspect(@Nonnull final Urn urn, @Nonnull final String aspectName) {
|
2021-06-25 10:56:45 -07:00
|
|
|
log.debug(String.format("Invoked getLatestAspect with urn %s, aspect %s", urn, aspectName));
|
2021-09-02 19:05:13 -07:00
|
|
|
return getAspect(urn, aspectName, ASPECT_LATEST_VERSION);
|
2021-06-03 13:24:33 -07:00
|
|
|
}
|
|
|
|
|
2021-07-30 17:41:03 -07:00
|
|
|
public void ingestEntities(@Nonnull final List<Entity> entities, @Nonnull final AuditStamp auditStamp,
|
2021-07-29 20:04:40 -07:00
|
|
|
@Nonnull final List<SystemMetadata> systemMetadata) {
|
2021-06-25 10:56:45 -07:00
|
|
|
log.debug(String.format("Invoked ingestEntities with entities %s, audit stamp %s", entities, auditStamp));
|
2021-07-30 17:41:03 -07:00
|
|
|
Streams.zip(entities.stream(), systemMetadata.stream(), (a, b) -> new Pair<Entity, SystemMetadata>(a, b))
|
|
|
|
.forEach(pair -> ingestEntity(pair.getFirst(), auditStamp, pair.getSecond()));
|
2021-07-29 20:04:40 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
public void ingestEntity(Entity entity, AuditStamp auditStamp) {
|
|
|
|
SystemMetadata generatedSystemMetadata = new SystemMetadata();
|
|
|
|
generatedSystemMetadata.setRunId(DEFAULT_RUN_ID);
|
|
|
|
generatedSystemMetadata.setLastObserved(System.currentTimeMillis());
|
|
|
|
|
|
|
|
ingestEntity(entity, auditStamp, generatedSystemMetadata);
|
2021-06-03 13:24:33 -07:00
|
|
|
}
|
|
|
|
|
2021-07-30 17:41:03 -07:00
|
|
|
public void ingestEntity(@Nonnull Entity entity, @Nonnull AuditStamp auditStamp,
|
|
|
|
@Nonnull SystemMetadata systemMetadata) {
|
|
|
|
log.debug(String.format("Invoked ingestEntity with entity %s, audit stamp %s systemMetadata %s", entity, auditStamp,
|
|
|
|
systemMetadata.toString()));
|
2021-07-29 20:04:40 -07:00
|
|
|
ingestSnapshotUnion(entity.getValue(), auditStamp, systemMetadata);
|
2021-06-03 13:24:33 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
@Nonnull
|
|
|
|
protected Map<Urn, Snapshot> getSnapshotUnions(@Nonnull final Set<Urn> urns, @Nonnull final Set<String> aspectNames) {
|
|
|
|
return getSnapshotRecords(urns, aspectNames).entrySet()
|
|
|
|
.stream()
|
|
|
|
.collect(Collectors.toMap(Map.Entry::getKey, entry -> toSnapshotUnion(entry.getValue())));
|
|
|
|
}
|
|
|
|
|
|
|
|
@Nonnull
|
2021-07-30 17:41:03 -07:00
|
|
|
protected Map<Urn, RecordTemplate> getSnapshotRecords(@Nonnull final Set<Urn> urns,
|
|
|
|
@Nonnull final Set<String> aspectNames) {
|
2021-06-03 13:24:33 -07:00
|
|
|
return getLatestAspectUnions(urns, aspectNames).entrySet()
|
|
|
|
.stream()
|
|
|
|
.collect(Collectors.toMap(Map.Entry::getKey, entry -> toSnapshotRecord(entry.getKey(), entry.getValue())));
|
|
|
|
}
|
|
|
|
|
|
|
|
@Nonnull
|
2022-01-04 00:19:46 +09:00
|
|
|
protected Map<Urn, List<UnionTemplate>> getLatestAspectUnions(
|
|
|
|
@Nonnull final Set<Urn> urns,
|
2021-07-30 17:41:03 -07:00
|
|
|
@Nonnull final Set<String> aspectNames) {
|
|
|
|
return getLatestAspects(urns, aspectNames).entrySet()
|
|
|
|
.stream()
|
|
|
|
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue()
|
|
|
|
.stream()
|
|
|
|
.map(aspectRecord -> toAspectUnion(entry.getKey(), aspectRecord))
|
|
|
|
.collect(Collectors.toList())));
|
2021-06-03 13:24:33 -07:00
|
|
|
}
|
|
|
|
|
2021-11-08 16:22:24 -08:00
|
|
|
public List<Pair<String, RecordTemplate>> generateDefaultAspectsIfMissing(@Nonnull final Urn urn,
|
|
|
|
Set<String> includedAspects) {
|
2021-10-21 11:15:10 -07:00
|
|
|
|
|
|
|
List<Pair<String, RecordTemplate>> aspects = new ArrayList<>();
|
2021-10-07 11:41:29 -07:00
|
|
|
final String keyAspectName = getKeyAspectName(urn);
|
|
|
|
RecordTemplate keyAspect = getLatestAspect(urn, keyAspectName);
|
|
|
|
if (keyAspect == null) {
|
|
|
|
keyAspect = buildKeyAspect(urn);
|
2021-10-21 11:15:10 -07:00
|
|
|
aspects.add(Pair.of(keyAspectName, keyAspect));
|
2021-10-07 11:41:29 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
String entityType = urnToEntityName(urn);
|
|
|
|
if (_entityRegistry.getEntitySpec(entityType).getAspectSpecMap().containsKey(BROWSE_PATHS)
|
2021-10-21 11:15:10 -07:00
|
|
|
&& getLatestAspect(urn, BROWSE_PATHS) == null && !includedAspects.contains(BROWSE_PATHS)) {
|
2021-10-07 11:41:29 -07:00
|
|
|
try {
|
2021-10-31 22:06:36 -07:00
|
|
|
BrowsePaths generatedBrowsePath = BrowsePathUtils.buildBrowsePath(urn, getEntityRegistry());
|
2021-10-21 11:15:10 -07:00
|
|
|
if (generatedBrowsePath != null) {
|
|
|
|
aspects.add(Pair.of(BROWSE_PATHS, generatedBrowsePath));
|
|
|
|
}
|
2021-10-07 11:41:29 -07:00
|
|
|
} catch (URISyntaxException e) {
|
|
|
|
log.error("Failed to parse urn: {}", urn);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (_entityRegistry.getEntitySpec(entityType).getAspectSpecMap().containsKey(DATA_PLATFORM_INSTANCE)
|
2021-10-21 11:15:10 -07:00
|
|
|
&& getLatestAspect(urn, DATA_PLATFORM_INSTANCE) == null && !includedAspects.contains(DATA_PLATFORM_INSTANCE)) {
|
2021-10-07 11:41:29 -07:00
|
|
|
DataPlatformInstanceUtils.buildDataPlatformInstance(entityType, keyAspect)
|
2021-10-21 11:15:10 -07:00
|
|
|
.ifPresent(aspect -> aspects.add(Pair.of(DATA_PLATFORM_INSTANCE, aspect)));
|
2021-10-07 11:41:29 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
return aspects;
|
|
|
|
}
|
|
|
|
|
2021-11-08 16:22:24 -08:00
|
|
|
private void ingestSnapshotUnion(@Nonnull final Snapshot snapshotUnion, @Nonnull final AuditStamp auditStamp,
|
2021-07-30 17:41:03 -07:00
|
|
|
SystemMetadata systemMetadata) {
|
2021-06-03 13:24:33 -07:00
|
|
|
final RecordTemplate snapshotRecord = RecordUtils.getSelectedRecordTemplateFromUnion(snapshotUnion);
|
|
|
|
final Urn urn = com.linkedin.metadata.dao.utils.ModelUtils.getUrnFromSnapshot(snapshotRecord);
|
2021-10-21 11:15:10 -07:00
|
|
|
final List<Pair<String, RecordTemplate>> aspectRecordsToIngest =
|
|
|
|
NewModelUtils.getAspectsFromSnapshot(snapshotRecord);
|
2021-06-03 13:24:33 -07:00
|
|
|
|
2021-10-07 11:41:29 -07:00
|
|
|
log.info("INGEST urn {} with system metadata {}", urn.toString(), systemMetadata.toString());
|
2021-11-08 16:22:24 -08:00
|
|
|
aspectRecordsToIngest.addAll(generateDefaultAspectsIfMissing(urn,
|
|
|
|
aspectRecordsToIngest.stream().map(pair -> pair.getFirst()).collect(Collectors.toSet())));
|
2021-07-29 20:04:40 -07:00
|
|
|
|
2021-10-21 11:15:10 -07:00
|
|
|
aspectRecordsToIngest.forEach(aspectNamePair -> {
|
|
|
|
ingestAspect(urn, aspectNamePair.getFirst(), aspectNamePair.getSecond(), auditStamp, systemMetadata);
|
2021-06-03 13:24:33 -07:00
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2021-09-13 09:16:37 -07:00
|
|
|
public Snapshot buildSnapshot(@Nonnull final Urn urn, @Nonnull final RecordTemplate aspectValue) {
|
2021-07-29 20:04:40 -07:00
|
|
|
// if the aspect value is the key, we do not need to include the key a second time
|
|
|
|
if (PegasusUtils.getAspectNameFromSchema(aspectValue.schema()).equals(getKeyAspectName(urn))) {
|
2021-07-30 17:41:03 -07:00
|
|
|
return toSnapshotUnion(toSnapshotRecord(urn, ImmutableList.of(toAspectUnion(urn, aspectValue))));
|
2021-07-29 20:04:40 -07:00
|
|
|
}
|
|
|
|
|
2021-06-03 13:24:33 -07:00
|
|
|
final RecordTemplate keyAspectValue = buildKeyAspect(urn);
|
|
|
|
return toSnapshotUnion(
|
2021-07-30 17:41:03 -07:00
|
|
|
toSnapshotRecord(urn, ImmutableList.of(toAspectUnion(urn, keyAspectValue), toAspectUnion(urn, aspectValue))));
|
2021-06-03 13:24:33 -07:00
|
|
|
}
|
|
|
|
|
2021-07-29 20:04:40 -07:00
|
|
|
protected Snapshot buildKeySnapshot(@Nonnull final Urn urn) {
|
|
|
|
final RecordTemplate keyAspectValue = buildKeyAspect(urn);
|
2021-07-30 17:41:03 -07:00
|
|
|
return toSnapshotUnion(toSnapshotRecord(urn, ImmutableList.of(toAspectUnion(urn, keyAspectValue))));
|
2021-07-29 20:04:40 -07:00
|
|
|
}
|
|
|
|
|
2021-06-03 13:24:33 -07:00
|
|
|
protected RecordTemplate buildKeyAspect(@Nonnull final Urn urn) {
|
|
|
|
final EntitySpec spec = _entityRegistry.getEntitySpec(urnToEntityName(urn));
|
|
|
|
final AspectSpec keySpec = spec.getKeyAspectSpec();
|
|
|
|
final RecordDataSchema keySchema = keySpec.getPegasusSchema();
|
|
|
|
return EntityKeyUtils.convertUrnToEntityKey(urn, keySchema);
|
|
|
|
}
|
|
|
|
|
2021-09-02 19:05:13 -07:00
|
|
|
public AspectSpec getKeyAspectSpec(@Nonnull final Urn urn) {
|
|
|
|
return getKeyAspectSpec(urnToEntityName(urn));
|
|
|
|
}
|
|
|
|
|
|
|
|
public AspectSpec getKeyAspectSpec(@Nonnull final String entityName) {
|
|
|
|
final EntitySpec spec = _entityRegistry.getEntitySpec(entityName);
|
|
|
|
return spec.getKeyAspectSpec();
|
|
|
|
}
|
|
|
|
|
2021-11-08 16:22:24 -08:00
|
|
|
public Optional<AspectSpec> getAspectSpec(@Nonnull final String entityName, @Nonnull final String aspectName) {
|
|
|
|
final EntitySpec entitySpec = _entityRegistry.getEntitySpec(entityName);
|
|
|
|
return Optional.ofNullable(entitySpec.getAspectSpec(aspectName));
|
|
|
|
}
|
|
|
|
|
2021-07-29 20:04:40 -07:00
|
|
|
public String getKeyAspectName(@Nonnull final Urn urn) {
|
|
|
|
final EntitySpec spec = _entityRegistry.getEntitySpec(urnToEntityName(urn));
|
|
|
|
final AspectSpec keySpec = spec.getKeyAspectSpec();
|
|
|
|
return keySpec.getName();
|
|
|
|
}
|
|
|
|
|
2021-06-03 13:24:33 -07:00
|
|
|
protected Entity toEntity(@Nonnull final Snapshot snapshot) {
|
|
|
|
return new Entity().setValue(snapshot);
|
|
|
|
}
|
|
|
|
|
|
|
|
protected Snapshot toSnapshotUnion(@Nonnull final RecordTemplate snapshotRecord) {
|
|
|
|
final Snapshot snapshot = new Snapshot();
|
2021-07-30 17:41:03 -07:00
|
|
|
RecordUtils.setSelectedRecordTemplateInUnion(snapshot, snapshotRecord);
|
2021-06-03 13:24:33 -07:00
|
|
|
return snapshot;
|
|
|
|
}
|
|
|
|
|
2021-07-30 17:41:03 -07:00
|
|
|
protected RecordTemplate toSnapshotRecord(@Nonnull final Urn urn,
|
2021-06-03 13:24:33 -07:00
|
|
|
@Nonnull final List<UnionTemplate> aspectUnionTemplates) {
|
|
|
|
final String entityName = urnToEntityName(urn);
|
|
|
|
final EntitySpec entitySpec = _entityRegistry.getEntitySpec(entityName);
|
|
|
|
return com.linkedin.metadata.dao.utils.ModelUtils.newSnapshot(
|
2021-07-30 17:41:03 -07:00
|
|
|
getDataTemplateClassFromSchema(entitySpec.getSnapshotSchema(), RecordTemplate.class), urn,
|
2021-06-03 13:24:33 -07:00
|
|
|
aspectUnionTemplates);
|
|
|
|
}
|
|
|
|
|
2021-07-30 17:41:03 -07:00
|
|
|
protected UnionTemplate toAspectUnion(@Nonnull final Urn urn, @Nonnull final RecordTemplate aspectRecord) {
|
2021-06-03 13:24:33 -07:00
|
|
|
final EntitySpec entitySpec = _entityRegistry.getEntitySpec(urnToEntityName(urn));
|
2021-07-30 17:41:03 -07:00
|
|
|
final TyperefDataSchema aspectSchema = entitySpec.getAspectTyperefSchema();
|
|
|
|
if (aspectSchema == null) {
|
|
|
|
throw new RuntimeException(
|
|
|
|
String.format("Aspect schema for %s is null: v4 operation is not supported on this entity registry",
|
|
|
|
entitySpec.getName()));
|
|
|
|
}
|
2021-06-03 13:24:33 -07:00
|
|
|
return com.linkedin.metadata.dao.utils.ModelUtils.newAspectUnion(
|
2021-07-30 17:41:03 -07:00
|
|
|
getDataTemplateClassFromSchema(entitySpec.getAspectTyperefSchema(), UnionTemplate.class), aspectRecord);
|
2021-06-03 13:24:33 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
protected Urn toUrn(final String urnStr) {
|
|
|
|
try {
|
|
|
|
return Urn.createFromString(urnStr);
|
|
|
|
} catch (URISyntaxException e) {
|
2021-06-25 10:56:45 -07:00
|
|
|
log.error(String.format("Failed to convert urn string %s into Urn object", urnStr));
|
2021-06-03 13:24:33 -07:00
|
|
|
throw new ModelConversionException(String.format("Failed to convert urn string %s into Urn object ", urnStr), e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-01-04 00:19:46 +09:00
|
|
|
private EntityResponse toEntityResponse(final Urn urn, final List<EnvelopedAspect> envelopedAspects) {
|
|
|
|
final EntityResponse response = new EntityResponse();
|
|
|
|
response.setUrn(urn);
|
|
|
|
response.setEntityName(urnToEntityName(urn));
|
|
|
|
response.setAspects(new EnvelopedAspectMap(
|
|
|
|
envelopedAspects.stream().collect(Collectors.toMap(EnvelopedAspect::getName, aspect -> aspect))
|
|
|
|
));
|
|
|
|
return response;
|
|
|
|
}
|
|
|
|
|
2021-06-03 13:24:33 -07:00
|
|
|
private Map<String, Set<String>> buildEntityToValidAspects(final EntityRegistry entityRegistry) {
|
|
|
|
return entityRegistry.getEntitySpecs()
|
2021-07-30 17:41:03 -07:00
|
|
|
.values()
|
2021-06-03 13:24:33 -07:00
|
|
|
.stream()
|
|
|
|
.collect(Collectors.toMap(EntitySpec::getName,
|
2021-07-30 17:41:03 -07:00
|
|
|
entry -> entry.getAspectSpecs().stream().map(AspectSpec::getName).collect(Collectors.toSet())));
|
2021-06-03 13:24:33 -07:00
|
|
|
}
|
|
|
|
|
2021-12-14 11:18:02 +09:00
|
|
|
public Boolean getAlwaysEmitAuditEvent() {
|
|
|
|
return _alwaysEmitAuditEvent;
|
2021-06-03 13:24:33 -07:00
|
|
|
}
|
|
|
|
|
2021-12-14 11:18:02 +09:00
|
|
|
public void setAlwaysEmitAuditEvent(Boolean alwaysEmitAuditEvent) {
|
|
|
|
_alwaysEmitAuditEvent = alwaysEmitAuditEvent;
|
2021-07-30 17:41:03 -07:00
|
|
|
}
|
|
|
|
|
2021-09-02 19:05:13 -07:00
|
|
|
public EntityRegistry getEntityRegistry() {
|
2021-06-03 13:24:33 -07:00
|
|
|
return _entityRegistry;
|
|
|
|
}
|
|
|
|
|
|
|
|
protected Set<String> getEntityAspectNames(final Urn entityUrn) {
|
|
|
|
return getEntityAspectNames(urnToEntityName(entityUrn));
|
|
|
|
}
|
|
|
|
|
2022-01-04 00:19:46 +09:00
|
|
|
public Set<String> getEntityAspectNames(final String entityName) {
|
2021-06-03 13:24:33 -07:00
|
|
|
return _entityToValidAspects.get(entityName);
|
|
|
|
}
|
|
|
|
|
2021-06-30 16:49:02 -07:00
|
|
|
/**
 * Sets the writable flag of this service; the effect is defined by the concrete implementation.
 *
 * <p>NOTE(review): presumably enables or disables write operations — confirm against subclasses.
 *
 * @param canWrite the new writable state
 */
public abstract void setWritable(boolean canWrite);
|
2021-07-29 20:04:40 -07:00
|
|
|
|
2021-11-28 21:06:27 -08:00
|
|
|
public RollbackRunResult rollbackRun(List<AspectRowSummary> aspectRows, String runId) {
|
|
|
|
return rollbackWithConditions(aspectRows, Collections.singletonMap("runId", runId));
|
|
|
|
}
|
|
|
|
|
|
|
|
public abstract RollbackRunResult rollbackWithConditions(List<AspectRowSummary> aspectRows,
|
|
|
|
Map<String, String> conditions);
|
2021-07-29 20:04:40 -07:00
|
|
|
|
|
|
|
/**
 * Deletes the entity identified by {@code urn}; behavior is defined by the concrete implementation.
 *
 * @param urn the urn to delete
 * @return the result of the deletion, reported as a {@code RollbackRunResult}
 */
public abstract RollbackRunResult deleteUrn(Urn urn);
|
2021-09-20 11:58:31 -07:00
|
|
|
|
|
|
|
/**
 * Reports whether the entity identified by {@code urn} exists; the exact criterion is defined by
 * the concrete implementation.
 *
 * @param urn the urn to check
 * @return the existence flag (boxed {@link Boolean})
 */
public abstract Boolean exists(Urn urn);
|
2021-12-14 11:18:02 +09:00
|
|
|
|
|
|
|
@Value
|
|
|
|
public static class UpdateAspectResult {
|
|
|
|
Urn urn;
|
|
|
|
RecordTemplate oldValue;
|
|
|
|
RecordTemplate newValue;
|
|
|
|
SystemMetadata oldSystemMetadata;
|
|
|
|
SystemMetadata newSystemMetadata;
|
|
|
|
MetadataAuditOperation operation;
|
|
|
|
long maxVersion;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Value
|
|
|
|
public static class IngestProposalResult {
|
|
|
|
Urn urn;
|
|
|
|
boolean didUpdate;
|
|
|
|
}
|
2021-06-03 13:24:33 -07:00
|
|
|
}
|