feat(deletes): support deletion of non-snapshot aspects (#3518)

Co-authored-by: Dexter Lee <dexter@acryl.io>
This commit is contained in:
Gabe Lyons 2021-11-08 16:22:24 -08:00 committed by GitHub
parent c7d3f8b930
commit b87efa43b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 198 additions and 146 deletions

View File

@ -14,8 +14,6 @@ import com.linkedin.metadata.entity.ebean.EbeanUtils;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.utils.GenericAspectUtils;
import com.linkedin.mxe.MetadataChangeLog;
import com.linkedin.mxe.SystemMetadata;
import io.ebean.EbeanServer;
import io.ebean.PagedList;
@ -108,16 +106,9 @@ public class SendMAEStep implements UpgradeStep {
SystemMetadata latestSystemMetadata = EbeanUtils.parseSystemMetadata(aspect.getSystemMetadata());
final MetadataChangeLog metadataChangeLog = new MetadataChangeLog();
metadataChangeLog.setEntityType(entityName);
metadataChangeLog.setEntityUrn(urn);
metadataChangeLog.setChangeType(ChangeType.UPSERT);
metadataChangeLog.setAspectName(aspectName);
metadataChangeLog.setAspect(GenericAspectUtils.serializeAspect(aspectRecord));
metadataChangeLog.setSystemMetadata(latestSystemMetadata);
// 5. Produce MAE events for the aspect record
_entityService.produceMetadataChangeLog(urn, aspectSpec, metadataChangeLog);
_entityService.produceMetadataChangeLog(urn, entityName, aspectName, aspectSpec, null, aspectRecord, null,
latestSystemMetadata, ChangeType.UPSERT);
totalRowsMigrated++;
}

View File

@ -10,6 +10,7 @@ import com.linkedin.data.schema.TyperefDataSchema;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.UnionTemplate;
import com.linkedin.entity.Entity;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.VersionedAspect;
import com.linkedin.metadata.dao.exception.ModelConversionException;
import com.linkedin.metadata.dao.utils.RecordUtils;
@ -23,6 +24,7 @@ import com.linkedin.metadata.search.utils.BrowsePathUtils;
import com.linkedin.metadata.snapshot.Snapshot;
import com.linkedin.metadata.utils.DataPlatformInstanceUtils;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.metadata.utils.GenericAspectUtils;
import com.linkedin.metadata.utils.PegasusUtils;
import com.linkedin.mxe.MetadataAuditOperation;
import com.linkedin.mxe.MetadataChangeLog;
@ -34,6 +36,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
@ -250,6 +253,30 @@ public abstract class EntityService {
_producer.produceMetadataChangeLog(urn, aspectSpec, metadataChangeLog);
}
public void produceMetadataChangeLog(@Nonnull final Urn urn, @Nonnull String entityName, @Nonnull String aspectName,
@Nonnull final AspectSpec aspectSpec, @Nullable final RecordTemplate oldAspectValue,
@Nullable final RecordTemplate newAspectValue, @Nullable final SystemMetadata oldSystemMetadata,
@Nullable final SystemMetadata newSystemMetadata, @Nonnull final ChangeType changeType) {
final MetadataChangeLog metadataChangeLog = new MetadataChangeLog();
metadataChangeLog.setEntityType(entityName);
metadataChangeLog.setEntityUrn(urn);
metadataChangeLog.setChangeType(changeType);
metadataChangeLog.setAspectName(aspectName);
if (newAspectValue != null) {
metadataChangeLog.setAspect(GenericAspectUtils.serializeAspect(newAspectValue));
}
if (newSystemMetadata != null) {
metadataChangeLog.setSystemMetadata(newSystemMetadata);
}
if (oldAspectValue != null) {
metadataChangeLog.setPreviousAspectValue(GenericAspectUtils.serializeAspect(oldAspectValue));
}
if (oldSystemMetadata != null) {
metadataChangeLog.setPreviousSystemMetadata(oldSystemMetadata);
}
produceMetadataChangeLog(urn, aspectSpec, metadataChangeLog);
}
public void produceMetadataAuditEventForKey(@Nonnull final Urn urn,
@Nullable final SystemMetadata newSystemMetadata) {
@ -311,7 +338,8 @@ public abstract class EntityService {
.collect(Collectors.toList())));
}
public List<Pair<String, RecordTemplate>> generateDefaultAspectsIfMissing(@Nonnull final Urn urn, Set<String> includedAspects) {
public List<Pair<String, RecordTemplate>> generateDefaultAspectsIfMissing(@Nonnull final Urn urn,
Set<String> includedAspects) {
List<Pair<String, RecordTemplate>> aspects = new ArrayList<>();
final String keyAspectName = getKeyAspectName(urn);
@ -343,9 +371,7 @@ public abstract class EntityService {
return aspects;
}
private void ingestSnapshotUnion(
@Nonnull final Snapshot snapshotUnion,
@Nonnull final AuditStamp auditStamp,
private void ingestSnapshotUnion(@Nonnull final Snapshot snapshotUnion, @Nonnull final AuditStamp auditStamp,
SystemMetadata systemMetadata) {
final RecordTemplate snapshotRecord = RecordUtils.getSelectedRecordTemplateFromUnion(snapshotUnion);
final Urn urn = com.linkedin.metadata.dao.utils.ModelUtils.getUrnFromSnapshot(snapshotRecord);
@ -353,10 +379,8 @@ public abstract class EntityService {
NewModelUtils.getAspectsFromSnapshot(snapshotRecord);
log.info("INGEST urn {} with system metadata {}", urn.toString(), systemMetadata.toString());
aspectRecordsToIngest.addAll(generateDefaultAspectsIfMissing(
urn,
aspectRecordsToIngest.stream().map(pair -> pair.getFirst()).collect(Collectors.toSet())
));
aspectRecordsToIngest.addAll(generateDefaultAspectsIfMissing(urn,
aspectRecordsToIngest.stream().map(pair -> pair.getFirst()).collect(Collectors.toSet())));
aspectRecordsToIngest.forEach(aspectNamePair -> {
ingestAspect(urn, aspectNamePair.getFirst(), aspectNamePair.getSecond(), auditStamp, systemMetadata);
@ -395,6 +419,11 @@ public abstract class EntityService {
return spec.getKeyAspectSpec();
}
public Optional<AspectSpec> getAspectSpec(@Nonnull final String entityName, @Nonnull final String aspectName) {
final EntitySpec entitySpec = _entityRegistry.getEntitySpec(entityName);
return Optional.ofNullable(entitySpec.getAspectSpec(aspectName));
}
public String getKeyAspectName(@Nonnull final Urn urn) {
final EntitySpec spec = _entityRegistry.getEntitySpec(urnToEntityName(urn));
final AspectSpec keySpec = spec.getKeyAspectSpec();

View File

@ -1,8 +1,9 @@
package com.linkedin.metadata.entity;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.mxe.MetadataAuditOperation;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.mxe.SystemMetadata;
import lombok.Value;
@ -10,11 +11,13 @@ import lombok.Value;
@Value
public class RollbackResult {
public Urn urn;
public String entityName;
public String aspectName;
public RecordTemplate oldValue;
public RecordTemplate newValue;
public SystemMetadata oldSystemMetadata;
public SystemMetadata newSystemMetadata;
public MetadataAuditOperation operation;
public ChangeType changeType;
public Boolean keyAffected;
public Integer additionalRowsAffected;
}

View File

@ -19,6 +19,7 @@ import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.ListResult;
import com.linkedin.metadata.entity.RollbackResult;
import com.linkedin.metadata.entity.RollbackRunResult;
import com.linkedin.metadata.entity.ValidationUtils;
import com.linkedin.metadata.event.EntityEventProducer;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
@ -28,7 +29,6 @@ import com.linkedin.metadata.run.AspectRowSummary;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.metadata.utils.GenericAspectUtils;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.metadata.entity.ValidationUtils;
import com.linkedin.mxe.MetadataAuditOperation;
import com.linkedin.mxe.MetadataChangeLog;
import com.linkedin.mxe.MetadataChangeProposal;
@ -106,9 +106,8 @@ public class EbeanEntityService extends EntityService {
});
Map<EbeanAspectV2.PrimaryKey, EbeanAspectV2> batchGetResults = new HashMap<>();
Iterators.partition(dbKeys.iterator(), 500).forEachRemaining(
batch -> batchGetResults.putAll(_entityDao.batchGet(ImmutableSet.copyOf(batch)))
);
Iterators.partition(dbKeys.iterator(), 500)
.forEachRemaining(batch -> batchGetResults.putAll(_entityDao.batchGet(ImmutableSet.copyOf(batch))));
batchGetResults.forEach((key, aspectEntry) -> {
final Urn urn = toUrn(key.getUrn());
@ -337,20 +336,8 @@ public class EbeanEntityService extends EntityService {
if (emitMae) {
log.debug(String.format("Producing MetadataAuditEvent for updated aspect %s, urn %s", aspectName, urn));
final MetadataChangeLog metadataChangeLog = new MetadataChangeLog();
metadataChangeLog.setEntityType(entityName);
metadataChangeLog.setEntityUrn(urn);
metadataChangeLog.setChangeType(ChangeType.UPSERT);
metadataChangeLog.setAspectName(aspectName);
metadataChangeLog.setAspect(GenericAspectUtils.serializeAspect(newValue));
metadataChangeLog.setSystemMetadata(result.newSystemMetadata);
if (oldValue != null) {
metadataChangeLog.setPreviousAspectValue(GenericAspectUtils.serializeAspect(oldValue));
}
if (result.oldSystemMetadata != null) {
metadataChangeLog.setPreviousSystemMetadata(result.oldSystemMetadata);
}
produceMetadataChangeLog(urn, aspectSpec, metadataChangeLog);
produceMetadataChangeLog(urn, entityName, aspectName, aspectSpec, oldValue, newValue, result.oldSystemMetadata,
result.newSystemMetadata, ChangeType.UPSERT);
} else {
log.debug(String.format("Skipped producing MetadataAuditEvent for updated aspect %s, urn %s. emitMAE is false.",
aspectName, urn));
@ -532,11 +519,11 @@ public class EbeanEntityService extends EntityService {
: toAspectRecord(Urn.createFromString(previousAspect.getKey().getUrn()),
previousAspect.getKey().getAspect(), previousMetadata, getEntityRegistry());
return new RollbackResult(Urn.createFromString(urn), latestValue,
final Urn urnObj = Urn.createFromString(urn);
return new RollbackResult(urnObj, urnObj.getEntityType(), latest.getAspect(), latestValue,
previousValue == null ? latestValue : previousValue, latestSystemMetadata,
previousValue == null ? null : parseSystemMetadata(previousAspect.getSystemMetadata()),
previousAspect == null ? MetadataAuditOperation.DELETE : MetadataAuditOperation.UPDATE, isKeyAspect,
additionalRowsDeleted);
previousAspect == null ? ChangeType.DELETE : ChangeType.UPSERT, isKeyAspect, additionalRowsDeleted);
} catch (URISyntaxException e) {
e.printStackTrace();
}
@ -555,12 +542,18 @@ public class EbeanEntityService extends EntityService {
aspectRows.forEach(aspectToRemove -> {
RollbackResult result = deleteAspect(aspectToRemove.getUrn(), aspectToRemove.getAspectName(), runId);
if (result != null) {
Optional<AspectSpec> aspectSpec = getAspectSpec(result.entityName, result.aspectName);
if (!aspectSpec.isPresent()) {
log.error("Issue while rolling back: unknown aspect {} for entity {}", result.entityName, result.aspectName);
return;
}
rowsDeletedFromEntityDeletion.addAndGet(result.additionalRowsAffected);
removedAspects.add(aspectToRemove);
produceMetadataAuditEvent(result.getUrn(), result.getOldValue(), result.getNewValue(),
result.getOldSystemMetadata(), result.getNewSystemMetadata(), result.getOperation());
produceMetadataChangeLog(result.getUrn(), result.getEntityName(), result.getAspectName(), aspectSpec.get(),
result.getOldValue(), result.getNewValue(), result.getOldSystemMetadata(), result.getNewSystemMetadata(),
result.getChangeType());
}
});
@ -581,8 +574,12 @@ public class EbeanEntityService extends EntityService {
SystemMetadata latestKeySystemMetadata = parseSystemMetadata(latestKey.getSystemMetadata());
RollbackResult result = deleteAspect(urn.toString(), keyAspectName, latestKeySystemMetadata.getRunId());
Optional<AspectSpec> aspectSpec = getAspectSpec(result.entityName, result.aspectName);
if (!aspectSpec.isPresent()) {
log.error("Issue while rolling back: unknown aspect {} for entity {}", result.entityName, result.aspectName);
}
if (result != null) {
if (result != null && aspectSpec.isPresent()) {
AspectRowSummary summary = new AspectRowSummary();
summary.setUrn(urn.toString());
summary.setKeyAspect(true);
@ -592,8 +589,9 @@ public class EbeanEntityService extends EntityService {
rowsDeletedFromEntityDeletion = result.additionalRowsAffected;
removedAspects.add(summary);
produceMetadataAuditEvent(result.getUrn(), result.getOldValue(), result.getNewValue(),
result.getOldSystemMetadata(), result.getNewSystemMetadata(), result.getOperation());
produceMetadataChangeLog(result.getUrn(), result.getEntityName(), result.getAspectName(), aspectSpec.get(),
result.getOldValue(), result.getNewValue(), result.getOldSystemMetadata(), result.getNewSystemMetadata(),
result.getChangeType());
}
return new RollbackRunResult(removedAspects, rowsDeletedFromEntityDeletion);

View File

@ -44,7 +44,12 @@ public class SearchDocumentTransformer {
return Optional.of(searchDocument.toString());
}
public static Optional<String> transformAspect(final Urn urn, final RecordTemplate aspect, final AspectSpec aspectSpec) {
public static Optional<String> transformAspect(
final Urn urn,
final RecordTemplate aspect,
final AspectSpec aspectSpec,
final Boolean forDelete
) {
final Map<SearchableFieldSpec, List<Object>> extractedFields =
FieldExtractor.extractFields(aspect, aspectSpec.getSearchableFieldSpecs());
if (extractedFields.isEmpty()) {
@ -52,7 +57,7 @@ public class SearchDocumentTransformer {
}
final ObjectNode searchDocument = JsonNodeFactory.instance.objectNode();
searchDocument.put("urn", urn.toString());
extractedFields.forEach((key, value) -> setValue(key, value, searchDocument, false));
extractedFields.forEach((key, value) -> setValue(key, value, searchDocument, forDelete));
return Optional.of(searchDocument.toString());
}

View File

@ -28,7 +28,6 @@ import com.linkedin.metadata.systemmetadata.SystemMetadataService;
import com.linkedin.metadata.utils.PegasusUtils;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.mxe.MetadataAuditEvent;
import com.linkedin.mxe.MetadataAuditOperation;
import com.linkedin.mxe.SystemMetadata;
import com.linkedin.mxe.Topics;
import java.io.UnsupportedEncodingException;
@ -90,78 +89,25 @@ public class MetadataAuditEventsProcessor {
try {
final MetadataAuditEvent event = EventUtils.avroToPegasusMAE(record);
final MetadataAuditOperation operation =
event.hasOperation() ? event.getOperation() : MetadataAuditOperation.UPDATE;
if (operation.equals(MetadataAuditOperation.DELETE)) {
// in this case, we deleted an entity and want to de-index the previous value
// 1. verify an old snapshot is present-- if not, we cannot process the delete
if (!event.hasOldSnapshot()) {
return;
}
final RecordTemplate snapshot = RecordUtils.getSelectedRecordTemplateFromUnion(event.getOldSnapshot());
log.info("deleting {}", snapshot);
final EntitySpec entitySpec =
SnapshotEntityRegistry.getInstance().getEntitySpec(PegasusUtils.getEntityNameFromSchema(snapshot.schema()));
final Map<String, DataElement> aspectsToUpdate = AspectExtractor.extractAspects(snapshot);
boolean deleteEntity =
aspectsToUpdate.containsKey(entitySpec.getKeyAspectName()) && aspectsToUpdate.keySet().size() == 1;
updateSearchService(snapshot, entitySpec, true, deleteEntity);
updateGraphService(snapshot, entitySpec, true, deleteEntity);
updateSystemMetadata(RecordUtils.getSelectedRecordTemplateFromUnion(event.getOldSnapshot()),
event.hasNewSnapshot() ? RecordUtils.getSelectedRecordTemplateFromUnion(event.getNewSnapshot()) : null,
event.hasNewSystemMetadata() ? event.getNewSystemMetadata() : null, operation, entitySpec);
return;
}
final RecordTemplate snapshot = RecordUtils.getSelectedRecordTemplateFromUnion(event.getNewSnapshot());
RecordTemplate oldSnapshot = null;
if (event.hasOldSnapshot()) {
oldSnapshot = RecordUtils.getSelectedRecordTemplateFromUnion(event.getOldSnapshot());
}
log.info(snapshot.toString());
final EntitySpec entitySpec =
SnapshotEntityRegistry.getInstance().getEntitySpec(PegasusUtils.getEntityNameFromSchema(snapshot.schema()));
updateSearchService(snapshot, entitySpec, false, false);
updateGraphService(snapshot, entitySpec, false, false);
updateSystemMetadata(oldSnapshot, snapshot, event.getNewSystemMetadata(), operation, entitySpec);
updateSearchService(snapshot, entitySpec);
updateGraphService(snapshot, entitySpec);
updateSystemMetadata(snapshot, event.getNewSystemMetadata(), entitySpec);
} catch (Exception e) {
log.error("Error deserializing message: {}", e.toString());
log.error("Message: {}", record.toString());
}
}
private void updateSystemMetadata(@Nullable final RecordTemplate oldSnapshot,
private void updateSystemMetadata(
@Nullable final RecordTemplate newSnapshot, @Nullable final SystemMetadata newSystemMetadata,
@Nonnull final MetadataAuditOperation operation, @Nonnull final EntitySpec entitySpec) {
// if we are deleting the aspect, we want to remove it from the index
if (operation.equals(MetadataAuditOperation.DELETE)) {
if (oldSnapshot == null) {
return;
}
Map<String, DataElement> oldAspects = AspectExtractor.extractAspects(oldSnapshot);
String oldUrn = oldSnapshot.data().get("urn").toString();
String finalOldUrn = oldUrn;
// an MAE containing just a key signifies that the entity is being deleted- only then should we delete the key
// run id pair
oldAspects.keySet().forEach(aspect -> {
if (!aspect.equals(entitySpec.getKeyAspectName())) {
_systemMetadataService.delete(finalOldUrn, aspect);
} else if (aspect.equals(entitySpec.getKeyAspectName()) && oldAspects.keySet().size() == 1) {
_systemMetadataService.deleteUrn(finalOldUrn);
}
});
return;
}
@Nonnull final EntitySpec entitySpec) {
// otherwise, we want to update the index with a new run id
if (newSnapshot != null) {
@ -184,8 +130,7 @@ public class MetadataAuditEventsProcessor {
*
* @param snapshot Snapshot
*/
private void updateGraphService(final RecordTemplate snapshot, final EntitySpec entitySpec, final boolean delete,
final boolean deleteEntity) {
private void updateGraphService(final RecordTemplate snapshot, final EntitySpec entitySpec) {
final Set<String> relationshipTypesBeingAdded = new HashSet<>();
final List<Edge> edgesToAdd = new ArrayList<>();
final String sourceUrnStr = snapshot.data().get("urn").toString();
@ -212,19 +157,11 @@ public class MetadataAuditEventsProcessor {
}
}
if (deleteEntity) {
new Thread(() -> {
_graphService.removeNode(sourceUrn);
}).start();
} else if (edgesToAdd.size() > 0) {
if (edgesToAdd.size() > 0) {
new Thread(() -> {
_graphService.removeEdgesFromNode(sourceUrn, new ArrayList<>(relationshipTypesBeingAdded),
createRelationshipFilter(new Filter().setOr(new ConjunctiveCriterionArray()), RelationshipDirection.OUTGOING));
if (!delete) {
edgesToAdd.forEach(edge -> _graphService.addEdge(edge));
} else if (deleteEntity) {
_graphService.removeNode(sourceUrn);
}
edgesToAdd.forEach(edge -> _graphService.addEdge(edge));
}).start();
}
}
@ -234,13 +171,12 @@ public class MetadataAuditEventsProcessor {
*
* @param snapshot Snapshot
*/
private void updateSearchService(final RecordTemplate snapshot, final EntitySpec entitySpec, final boolean delete,
final boolean deleteEntity) {
private void updateSearchService(final RecordTemplate snapshot, final EntitySpec entitySpec) {
String urn = snapshot.data().get("urn").toString();
Optional<String> searchDocument;
try {
searchDocument = SearchDocumentTransformer.transformSnapshot(snapshot, entitySpec, delete);
searchDocument = SearchDocumentTransformer.transformSnapshot(snapshot, entitySpec, false);
} catch (Exception e) {
log.error("Error in getting documents from snapshot: {} for snapshot {}", e, snapshot);
return;
@ -258,11 +194,6 @@ public class MetadataAuditEventsProcessor {
return;
}
if (deleteEntity) {
_entitySearchService.deleteDocument(entitySpec.getName(), docId);
return;
}
_entitySearchService.upsertDocument(entitySpec.getName(), searchDocument.get(), docId);
}
}

View File

@ -1,6 +1,7 @@
package com.linkedin.metadata.kafka;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
@ -8,6 +9,7 @@ import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.gms.factory.common.GraphServiceFactory;
import com.linkedin.gms.factory.common.SystemMetadataServiceFactory;
import com.linkedin.gms.factory.entityregistry.EntityRegistryFactory;
import com.linkedin.gms.factory.search.EntitySearchServiceFactory;
import com.linkedin.gms.factory.timeseries.TimeseriesAspectServiceFactory;
@ -25,6 +27,7 @@ import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.query.filter.RelationshipDirection;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.search.transformer.SearchDocumentTransformer;
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
import com.linkedin.metadata.timeseries.transformer.TimeseriesAspectTransformer;
import com.linkedin.metadata.utils.EntityKeyUtils;
@ -33,6 +36,7 @@ import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.mxe.MetadataChangeLog;
import com.linkedin.mxe.SystemMetadata;
import com.linkedin.mxe.Topics;
import com.linkedin.util.Pair;
import java.io.UnsupportedEncodingException;
import java.net.URISyntaxException;
import java.net.URLEncoder;
@ -52,30 +56,32 @@ import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import static com.linkedin.metadata.search.utils.QueryUtils.newRelationshipFilter;
import static com.linkedin.metadata.search.utils.QueryUtils.*;
@Slf4j
@Component
@Conditional(MetadataChangeLogProcessorCondition.class)
@Import({GraphServiceFactory.class, EntitySearchServiceFactory.class, TimeseriesAspectServiceFactory.class,
EntityRegistryFactory.class})
EntityRegistryFactory.class, SystemMetadataServiceFactory.class})
@EnableKafka
public class MetadataChangeLogProcessor {
private final GraphService _graphService;
private final EntitySearchService _entitySearchService;
private final TimeseriesAspectService _timeseriesAspectService;
private final SystemMetadataService _systemMetadataService;
private final EntityRegistry _entityRegistry;
private final Histogram kafkaLagStats = MetricUtils.get().histogram(MetricRegistry.name(this.getClass(), "kafkaLag"));
@Autowired
public MetadataChangeLogProcessor(GraphService graphService, EntitySearchService entitySearchService,
TimeseriesAspectService timeseriesAspectService, EntityRegistry entityRegistry) {
TimeseriesAspectService timeseriesAspectService, SystemMetadataService systemMetadataService, EntityRegistry entityRegistry) {
_graphService = graphService;
_entitySearchService = entitySearchService;
_timeseriesAspectService = timeseriesAspectService;
_systemMetadataService = systemMetadataService;
_entityRegistry = entityRegistry;
_timeseriesAspectService.configure();
@ -99,16 +105,16 @@ public class MetadataChangeLogProcessor {
return;
}
if (event.getChangeType() == ChangeType.UPSERT) {
EntitySpec entitySpec;
try {
entitySpec = _entityRegistry.getEntitySpec(event.getEntityType());
} catch (IllegalArgumentException e) {
log.error("Error while processing entity type {}: {}", event.getEntityType(), e.toString());
return;
}
EntitySpec entitySpec;
try {
entitySpec = _entityRegistry.getEntitySpec(event.getEntityType());
} catch (IllegalArgumentException e) {
log.error("Error while processing entity type {}: {}", event.getEntityType(), e.toString());
return;
}
Urn urn = EntityKeyUtils.getUrnFromLog(event, entitySpec.getKeyAspectSpec());
Urn urn = EntityKeyUtils.getUrnFromLog(event, entitySpec.getKeyAspectSpec());
if (event.getChangeType() == ChangeType.UPSERT) {
if (!event.hasAspectName() || !event.hasAspect()) {
log.error("Aspect or aspect name is missing");
@ -130,14 +136,34 @@ public class MetadataChangeLogProcessor {
} else {
updateSearchService(entitySpec.getName(), urn, aspectSpec, aspect);
updateGraphService(urn, aspectSpec, aspect);
updateSystemMetadata(event.getSystemMetadata(), urn, aspectSpec);
}
} else if (event.getChangeType() == ChangeType.DELETE) {
if (!event.hasAspectName() || !event.hasAspect()) {
log.error("Aspect or aspect name is missing");
return;
}
AspectSpec aspectSpec = entitySpec.getAspectSpec(event.getAspectName());
if (aspectSpec == null) {
log.error("Unrecognized aspect name {} for entity {}", event.getAspectName(), event.getEntityType());
return;
}
RecordTemplate aspect =
GenericAspectUtils.deserializeAspect(event.getAspect().getValue(), event.getAspect().getContentType(),
aspectSpec);
Boolean isDeletingKey = event.getAspectName().equals(entitySpec.getKeyAspectName());
if (!aspectSpec.isTimeseries()) {
deleteSystemMetadata(urn, aspectSpec, isDeletingKey);
deleteGraphData(urn, aspectSpec, aspect, isDeletingKey);
deleteSearchData(urn, entitySpec.getName(), aspectSpec, aspect, isDeletingKey);
}
}
}
/**
* Process snapshot and update graph index
*/
private void updateGraphService(Urn urn, AspectSpec aspectSpec, RecordTemplate aspect) {
private Pair<List<Edge>, Set<String>> getEdgesAndRelationshipTypesFromAspect(Urn urn, AspectSpec aspectSpec, RecordTemplate aspect) {
final Set<String> relationshipTypesBeingAdded = new HashSet<>();
final List<Edge> edgesToAdd = new ArrayList<>();
@ -155,6 +181,19 @@ public class MetadataChangeLogProcessor {
}
}
}
return Pair.of(edgesToAdd, relationshipTypesBeingAdded);
}
/**
* Process snapshot and update graph index
*/
private void updateGraphService(Urn urn, AspectSpec aspectSpec, RecordTemplate aspect) {
Pair<List<Edge>, Set<String>> edgeAndRelationTypes =
getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect);
final List<Edge> edgesToAdd = edgeAndRelationTypes.getFirst();
final Set<String> relationshipTypesBeingAdded = edgeAndRelationTypes.getSecond();
log.info(String.format("Here's the relationship types found %s", relationshipTypesBeingAdded));
if (relationshipTypesBeingAdded.size() > 0) {
new Thread(() -> {
@ -171,7 +210,7 @@ public class MetadataChangeLogProcessor {
private void updateSearchService(String entityName, Urn urn, AspectSpec aspectSpec, RecordTemplate aspect) {
Optional<String> searchDocument;
try {
searchDocument = SearchDocumentTransformer.transformAspect(urn, aspect, aspectSpec);
searchDocument = SearchDocumentTransformer.transformAspect(urn, aspect, aspectSpec, false);
} catch (Exception e) {
log.error("Error in getting documents from aspect: {} for aspect {}", e, aspectSpec.getName());
return;
@ -208,4 +247,60 @@ public class MetadataChangeLogProcessor {
_timeseriesAspectService.upsertDocument(entityType, aspectName, document.getKey(), document.getValue());
});
}
private void updateSystemMetadata(SystemMetadata systemMetadata, Urn urn, AspectSpec aspectSpec) {
_systemMetadataService.insert(systemMetadata, urn.toString(), aspectSpec.getName());
}
private void deleteSystemMetadata(Urn urn, AspectSpec aspectSpec, Boolean isKeyAspect) {
if (isKeyAspect) {
_systemMetadataService.deleteUrn(urn.toString());
}
_systemMetadataService.delete(urn.toString(), aspectSpec.getName());
}
private void deleteGraphData(Urn urn, AspectSpec aspectSpec, RecordTemplate aspect, Boolean isKeyAspect) {
if (isKeyAspect) {
_graphService.removeNode(urn);
return;
}
Pair<List<Edge>, Set<String>> edgeAndRelationTypes =
getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect);
final Set<String> relationshipTypesBeingAdded = edgeAndRelationTypes.getSecond();
if (relationshipTypesBeingAdded.size() > 0) {
_graphService.removeEdgesFromNode(urn, new ArrayList<>(relationshipTypesBeingAdded),
createRelationshipFilter(new Filter().setOr(new ConjunctiveCriterionArray()), RelationshipDirection.OUTGOING));
}
}
private void deleteSearchData(Urn urn, String entityName, AspectSpec aspectSpec, RecordTemplate aspect, Boolean isKeyAspect) {
String docId;
try {
docId = URLEncoder.encode(urn.toString(), "UTF-8");
} catch (UnsupportedEncodingException e) {
log.error("Failed to encode the urn with error: {}", e.toString());
return;
}
if (isKeyAspect) {
_entitySearchService.deleteDocument(entityName, docId);
return;
}
Optional<String> searchDocument;
try {
searchDocument = SearchDocumentTransformer.transformAspect(urn, aspect, aspectSpec, true);
} catch (Exception e) {
log.error("Error in getting documents from aspect: {} for aspect {}", e, aspectSpec.getName());
return;
}
if (!searchDocument.isPresent()) {
return;
}
_entitySearchService.upsertDocument(entityName, searchDocument.get(), docId);
}
}