diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restoreindices/SendMAEStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restoreindices/SendMAEStep.java index 57a2ec6956..21e22094c7 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restoreindices/SendMAEStep.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restoreindices/SendMAEStep.java @@ -14,8 +14,6 @@ import com.linkedin.metadata.entity.ebean.EbeanUtils; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.EntitySpec; import com.linkedin.metadata.models.registry.EntityRegistry; -import com.linkedin.metadata.utils.GenericAspectUtils; -import com.linkedin.mxe.MetadataChangeLog; import com.linkedin.mxe.SystemMetadata; import io.ebean.EbeanServer; import io.ebean.PagedList; @@ -108,16 +106,9 @@ public class SendMAEStep implements UpgradeStep { SystemMetadata latestSystemMetadata = EbeanUtils.parseSystemMetadata(aspect.getSystemMetadata()); - final MetadataChangeLog metadataChangeLog = new MetadataChangeLog(); - metadataChangeLog.setEntityType(entityName); - metadataChangeLog.setEntityUrn(urn); - metadataChangeLog.setChangeType(ChangeType.UPSERT); - metadataChangeLog.setAspectName(aspectName); - metadataChangeLog.setAspect(GenericAspectUtils.serializeAspect(aspectRecord)); - metadataChangeLog.setSystemMetadata(latestSystemMetadata); - // 5. Produce MAE events for the aspect record - _entityService.produceMetadataChangeLog(urn, aspectSpec, metadataChangeLog); + _entityService.produceMetadataChangeLog(urn, entityName, aspectName, aspectSpec, null, aspectRecord, null, + latestSystemMetadata, ChangeType.UPSERT); totalRowsMigrated++; } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityService.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityService.java index 85f672159d..256334c862 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityService.java @@ -10,6 +10,7 @@ import com.linkedin.data.schema.TyperefDataSchema; import com.linkedin.data.template.RecordTemplate; import com.linkedin.data.template.UnionTemplate; import com.linkedin.entity.Entity; +import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.aspect.VersionedAspect; import com.linkedin.metadata.dao.exception.ModelConversionException; import com.linkedin.metadata.dao.utils.RecordUtils; @@ -23,6 +24,7 @@ import com.linkedin.metadata.search.utils.BrowsePathUtils; import com.linkedin.metadata.snapshot.Snapshot; import com.linkedin.metadata.utils.DataPlatformInstanceUtils; import com.linkedin.metadata.utils.EntityKeyUtils; +import com.linkedin.metadata.utils.GenericAspectUtils; import com.linkedin.metadata.utils.PegasusUtils; import com.linkedin.mxe.MetadataAuditOperation; import com.linkedin.mxe.MetadataChangeLog; @@ -34,6 +36,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import javax.annotation.Nonnull; @@ -250,6 +253,30 @@ public abstract class EntityService { _producer.produceMetadataChangeLog(urn, aspectSpec, metadataChangeLog); } + public void produceMetadataChangeLog(@Nonnull final Urn urn, @Nonnull String entityName, @Nonnull String aspectName, + @Nonnull final AspectSpec aspectSpec, @Nullable final RecordTemplate oldAspectValue, + @Nullable final RecordTemplate newAspectValue, @Nullable final SystemMetadata oldSystemMetadata, + @Nullable final SystemMetadata newSystemMetadata, @Nonnull final ChangeType changeType) { + final MetadataChangeLog metadataChangeLog = new MetadataChangeLog(); + metadataChangeLog.setEntityType(entityName); + metadataChangeLog.setEntityUrn(urn); + metadataChangeLog.setChangeType(changeType); + metadataChangeLog.setAspectName(aspectName); + if (newAspectValue != null) { + metadataChangeLog.setAspect(GenericAspectUtils.serializeAspect(newAspectValue)); + } + if (newSystemMetadata != null) { + metadataChangeLog.setSystemMetadata(newSystemMetadata); + } + if (oldAspectValue != null) { + metadataChangeLog.setPreviousAspectValue(GenericAspectUtils.serializeAspect(oldAspectValue)); + } + if (oldSystemMetadata != null) { + metadataChangeLog.setPreviousSystemMetadata(oldSystemMetadata); + } + produceMetadataChangeLog(urn, aspectSpec, metadataChangeLog); + } + public void produceMetadataAuditEventForKey(@Nonnull final Urn urn, @Nullable final SystemMetadata newSystemMetadata) { @@ -311,7 +338,8 @@ public abstract class EntityService { .collect(Collectors.toList()))); } - public List> generateDefaultAspectsIfMissing(@Nonnull final Urn urn, Set includedAspects) { + public List> generateDefaultAspectsIfMissing(@Nonnull final Urn urn, + Set includedAspects) { List> aspects = new ArrayList<>(); final String keyAspectName = getKeyAspectName(urn); @@ -343,9 +371,7 @@ public abstract class EntityService { return aspects; } - private void ingestSnapshotUnion( - @Nonnull final Snapshot snapshotUnion, - @Nonnull final AuditStamp auditStamp, + private void ingestSnapshotUnion(@Nonnull final Snapshot snapshotUnion, @Nonnull final AuditStamp auditStamp, SystemMetadata systemMetadata) { final RecordTemplate snapshotRecord = RecordUtils.getSelectedRecordTemplateFromUnion(snapshotUnion); final Urn urn = com.linkedin.metadata.dao.utils.ModelUtils.getUrnFromSnapshot(snapshotRecord); @@ -353,10 +379,8 @@ public abstract class EntityService { NewModelUtils.getAspectsFromSnapshot(snapshotRecord); log.info("INGEST urn {} with system metadata {}", urn.toString(), systemMetadata.toString()); - aspectRecordsToIngest.addAll(generateDefaultAspectsIfMissing( - urn, - aspectRecordsToIngest.stream().map(pair -> pair.getFirst()).collect(Collectors.toSet()) - )); + aspectRecordsToIngest.addAll(generateDefaultAspectsIfMissing(urn, + aspectRecordsToIngest.stream().map(pair -> pair.getFirst()).collect(Collectors.toSet()))); aspectRecordsToIngest.forEach(aspectNamePair -> { ingestAspect(urn, aspectNamePair.getFirst(), aspectNamePair.getSecond(), auditStamp, systemMetadata); @@ -395,6 +419,11 @@ public abstract class EntityService { return spec.getKeyAspectSpec(); } + public Optional getAspectSpec(@Nonnull final String entityName, @Nonnull final String aspectName) { + final EntitySpec entitySpec = _entityRegistry.getEntitySpec(entityName); + return Optional.ofNullable(entitySpec.getAspectSpec(aspectName)); + } + public String getKeyAspectName(@Nonnull final Urn urn) { final EntitySpec spec = _entityRegistry.getEntitySpec(urnToEntityName(urn)); final AspectSpec keySpec = spec.getKeyAspectSpec(); diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/RollbackResult.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/RollbackResult.java index 7f33523d74..76a12a67b3 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/RollbackResult.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/RollbackResult.java @@ -1,8 +1,9 @@ package com.linkedin.metadata.entity; import com.linkedin.common.urn.Urn; + import com.linkedin.data.template.RecordTemplate; -import com.linkedin.mxe.MetadataAuditOperation; +import com.linkedin.events.metadata.ChangeType; import com.linkedin.mxe.SystemMetadata; import lombok.Value; @@ -10,11 +11,13 @@ import lombok.Value; @Value public class RollbackResult { public Urn urn; + public String entityName; + public String aspectName; public RecordTemplate oldValue; public RecordTemplate newValue; public SystemMetadata oldSystemMetadata; public SystemMetadata newSystemMetadata; - public MetadataAuditOperation operation; + public ChangeType changeType; public Boolean keyAffected; public Integer additionalRowsAffected; } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanEntityService.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanEntityService.java index c6de80bb30..492f1a5b36 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanEntityService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanEntityService.java @@ -19,6 +19,7 @@ import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.entity.ListResult; import com.linkedin.metadata.entity.RollbackResult; import com.linkedin.metadata.entity.RollbackRunResult; +import com.linkedin.metadata.entity.ValidationUtils; import com.linkedin.metadata.event.EntityEventProducer; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.EntitySpec; @@ -28,7 +29,6 @@ import com.linkedin.metadata.run.AspectRowSummary; import com.linkedin.metadata.utils.EntityKeyUtils; import com.linkedin.metadata.utils.GenericAspectUtils; import com.linkedin.metadata.utils.metrics.MetricUtils; -import com.linkedin.metadata.entity.ValidationUtils; import com.linkedin.mxe.MetadataAuditOperation; import com.linkedin.mxe.MetadataChangeLog; import com.linkedin.mxe.MetadataChangeProposal; @@ -106,9 +106,8 @@ public class EbeanEntityService extends EntityService { }); Map batchGetResults = new HashMap<>(); - Iterators.partition(dbKeys.iterator(), 500).forEachRemaining( - batch -> batchGetResults.putAll(_entityDao.batchGet(ImmutableSet.copyOf(batch))) - ); + Iterators.partition(dbKeys.iterator(), 500) + .forEachRemaining(batch -> batchGetResults.putAll(_entityDao.batchGet(ImmutableSet.copyOf(batch)))); batchGetResults.forEach((key, aspectEntry) -> { final Urn urn = toUrn(key.getUrn()); @@ -337,20 +336,8 @@ public class EbeanEntityService extends EntityService { if (emitMae) { log.debug(String.format("Producing MetadataAuditEvent for updated aspect %s, urn %s", aspectName, urn)); - final MetadataChangeLog metadataChangeLog = new MetadataChangeLog(); - metadataChangeLog.setEntityType(entityName); - metadataChangeLog.setEntityUrn(urn); - metadataChangeLog.setChangeType(ChangeType.UPSERT); - metadataChangeLog.setAspectName(aspectName); - metadataChangeLog.setAspect(GenericAspectUtils.serializeAspect(newValue)); - metadataChangeLog.setSystemMetadata(result.newSystemMetadata); - if (oldValue != null) { - metadataChangeLog.setPreviousAspectValue(GenericAspectUtils.serializeAspect(oldValue)); - } - if (result.oldSystemMetadata != null) { - metadataChangeLog.setPreviousSystemMetadata(result.oldSystemMetadata); - } - produceMetadataChangeLog(urn, aspectSpec, metadataChangeLog); + produceMetadataChangeLog(urn, entityName, aspectName, aspectSpec, oldValue, newValue, result.oldSystemMetadata, + result.newSystemMetadata, ChangeType.UPSERT); } else { log.debug(String.format("Skipped producing MetadataAuditEvent for updated aspect %s, urn %s. emitMAE is false.", aspectName, urn)); @@ -532,11 +519,11 @@ public class EbeanEntityService extends EntityService { : toAspectRecord(Urn.createFromString(previousAspect.getKey().getUrn()), previousAspect.getKey().getAspect(), previousMetadata, getEntityRegistry()); - return new RollbackResult(Urn.createFromString(urn), latestValue, + final Urn urnObj = Urn.createFromString(urn); + return new RollbackResult(urnObj, urnObj.getEntityType(), latest.getAspect(), latestValue, previousValue == null ? latestValue : previousValue, latestSystemMetadata, previousValue == null ? null : parseSystemMetadata(previousAspect.getSystemMetadata()), - previousAspect == null ? MetadataAuditOperation.DELETE : MetadataAuditOperation.UPDATE, isKeyAspect, - additionalRowsDeleted); + previousAspect == null ? ChangeType.DELETE : ChangeType.UPSERT, isKeyAspect, additionalRowsDeleted); } catch (URISyntaxException e) { e.printStackTrace(); } @@ -555,12 +542,18 @@ public class EbeanEntityService extends EntityService { aspectRows.forEach(aspectToRemove -> { RollbackResult result = deleteAspect(aspectToRemove.getUrn(), aspectToRemove.getAspectName(), runId); - if (result != null) { + Optional aspectSpec = getAspectSpec(result.entityName, result.aspectName); + if (!aspectSpec.isPresent()) { + log.error("Issue while rolling back: unknown aspect {} for entity {}", result.entityName, result.aspectName); + return; + } + rowsDeletedFromEntityDeletion.addAndGet(result.additionalRowsAffected); removedAspects.add(aspectToRemove); - produceMetadataAuditEvent(result.getUrn(), result.getOldValue(), result.getNewValue(), - result.getOldSystemMetadata(), result.getNewSystemMetadata(), result.getOperation()); + produceMetadataChangeLog(result.getUrn(), result.getEntityName(), result.getAspectName(), aspectSpec.get(), + result.getOldValue(), result.getNewValue(), result.getOldSystemMetadata(), result.getNewSystemMetadata(), + result.getChangeType()); } }); @@ -581,8 +574,12 @@ public class EbeanEntityService extends EntityService { SystemMetadata latestKeySystemMetadata = parseSystemMetadata(latestKey.getSystemMetadata()); RollbackResult result = deleteAspect(urn.toString(), keyAspectName, latestKeySystemMetadata.getRunId()); + Optional aspectSpec = getAspectSpec(result.entityName, result.aspectName); + if (!aspectSpec.isPresent()) { + log.error("Issue while rolling back: unknown aspect {} for entity {}", result.entityName, result.aspectName); + } - if (result != null) { + if (result != null && aspectSpec.isPresent()) { AspectRowSummary summary = new AspectRowSummary(); summary.setUrn(urn.toString()); summary.setKeyAspect(true); @@ -592,8 +589,9 @@ public class EbeanEntityService extends EntityService { rowsDeletedFromEntityDeletion = result.additionalRowsAffected; removedAspects.add(summary); - produceMetadataAuditEvent(result.getUrn(), result.getOldValue(), result.getNewValue(), - result.getOldSystemMetadata(), result.getNewSystemMetadata(), result.getOperation()); + produceMetadataChangeLog(result.getUrn(), result.getEntityName(), result.getAspectName(), aspectSpec.get(), + result.getOldValue(), result.getNewValue(), result.getOldSystemMetadata(), result.getNewSystemMetadata(), + result.getChangeType()); } return new RollbackRunResult(removedAspects, rowsDeletedFromEntityDeletion); diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/transformer/SearchDocumentTransformer.java b/metadata-io/src/main/java/com/linkedin/metadata/search/transformer/SearchDocumentTransformer.java index 78674a3782..5d2c1d7891 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/transformer/SearchDocumentTransformer.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/transformer/SearchDocumentTransformer.java @@ -44,7 +44,12 @@ public class SearchDocumentTransformer { return Optional.of(searchDocument.toString()); } - public static Optional transformAspect(final Urn urn, final RecordTemplate aspect, final AspectSpec aspectSpec) { + public static Optional transformAspect( + final Urn urn, + final RecordTemplate aspect, + final AspectSpec aspectSpec, + final Boolean forDelete + ) { final Map> extractedFields = FieldExtractor.extractFields(aspect, aspectSpec.getSearchableFieldSpecs()); if (extractedFields.isEmpty()) { @@ -52,7 +57,7 @@ public class SearchDocumentTransformer { } final ObjectNode searchDocument = JsonNodeFactory.instance.objectNode(); searchDocument.put("urn", urn.toString()); - extractedFields.forEach((key, value) -> setValue(key, value, searchDocument, false)); + extractedFields.forEach((key, value) -> setValue(key, value, searchDocument, forDelete)); return Optional.of(searchDocument.toString()); } diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataAuditEventsProcessor.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataAuditEventsProcessor.java index ad1dd6b580..9e1dcdad8c 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataAuditEventsProcessor.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataAuditEventsProcessor.java @@ -28,7 +28,6 @@ import com.linkedin.metadata.systemmetadata.SystemMetadataService; import com.linkedin.metadata.utils.PegasusUtils; import com.linkedin.metadata.utils.metrics.MetricUtils; import com.linkedin.mxe.MetadataAuditEvent; -import com.linkedin.mxe.MetadataAuditOperation; import com.linkedin.mxe.SystemMetadata; import com.linkedin.mxe.Topics; import java.io.UnsupportedEncodingException; @@ -90,78 +89,25 @@ public class MetadataAuditEventsProcessor { try { final MetadataAuditEvent event = EventUtils.avroToPegasusMAE(record); - final MetadataAuditOperation operation = - event.hasOperation() ? event.getOperation() : MetadataAuditOperation.UPDATE; - - if (operation.equals(MetadataAuditOperation.DELETE)) { - // in this case, we deleted an entity and want to de-index the previous value - - // 1. verify an old snapshot is present-- if not, we cannot process the delete - if (!event.hasOldSnapshot()) { - return; - } - - final RecordTemplate snapshot = RecordUtils.getSelectedRecordTemplateFromUnion(event.getOldSnapshot()); - - log.info("deleting {}", snapshot); - - final EntitySpec entitySpec = - SnapshotEntityRegistry.getInstance().getEntitySpec(PegasusUtils.getEntityNameFromSchema(snapshot.schema())); - final Map aspectsToUpdate = AspectExtractor.extractAspects(snapshot); - boolean deleteEntity = - aspectsToUpdate.containsKey(entitySpec.getKeyAspectName()) && aspectsToUpdate.keySet().size() == 1; - updateSearchService(snapshot, entitySpec, true, deleteEntity); - updateGraphService(snapshot, entitySpec, true, deleteEntity); - updateSystemMetadata(RecordUtils.getSelectedRecordTemplateFromUnion(event.getOldSnapshot()), - event.hasNewSnapshot() ? RecordUtils.getSelectedRecordTemplateFromUnion(event.getNewSnapshot()) : null, - event.hasNewSystemMetadata() ? event.getNewSystemMetadata() : null, operation, entitySpec); - return; - } final RecordTemplate snapshot = RecordUtils.getSelectedRecordTemplateFromUnion(event.getNewSnapshot()); - RecordTemplate oldSnapshot = null; - - if (event.hasOldSnapshot()) { - oldSnapshot = RecordUtils.getSelectedRecordTemplateFromUnion(event.getOldSnapshot()); - } log.info(snapshot.toString()); final EntitySpec entitySpec = SnapshotEntityRegistry.getInstance().getEntitySpec(PegasusUtils.getEntityNameFromSchema(snapshot.schema())); - updateSearchService(snapshot, entitySpec, false, false); - updateGraphService(snapshot, entitySpec, false, false); - updateSystemMetadata(oldSnapshot, snapshot, event.getNewSystemMetadata(), operation, entitySpec); + updateSearchService(snapshot, entitySpec); + updateGraphService(snapshot, entitySpec); + updateSystemMetadata(snapshot, event.getNewSystemMetadata(), entitySpec); } catch (Exception e) { log.error("Error deserializing message: {}", e.toString()); log.error("Message: {}", record.toString()); } } - private void updateSystemMetadata(@Nullable final RecordTemplate oldSnapshot, + private void updateSystemMetadata( @Nullable final RecordTemplate newSnapshot, @Nullable final SystemMetadata newSystemMetadata, - @Nonnull final MetadataAuditOperation operation, @Nonnull final EntitySpec entitySpec) { - - // if we are deleting the aspect, we want to remove it from the index - if (operation.equals(MetadataAuditOperation.DELETE)) { - if (oldSnapshot == null) { - return; - } - - Map oldAspects = AspectExtractor.extractAspects(oldSnapshot); - String oldUrn = oldSnapshot.data().get("urn").toString(); - String finalOldUrn = oldUrn; - // an MAE containing just a key signifies that the entity is being deleted- only then should we delete the key - // run id pair - oldAspects.keySet().forEach(aspect -> { - if (!aspect.equals(entitySpec.getKeyAspectName())) { - _systemMetadataService.delete(finalOldUrn, aspect); - } else if (aspect.equals(entitySpec.getKeyAspectName()) && oldAspects.keySet().size() == 1) { - _systemMetadataService.deleteUrn(finalOldUrn); - } - }); - return; - } + @Nonnull final EntitySpec entitySpec) { // otherwise, we want to update the index with a new run id if (newSnapshot != null) { @@ -184,8 +130,7 @@ public class MetadataAuditEventsProcessor { * * @param snapshot Snapshot */ - private void updateGraphService(final RecordTemplate snapshot, final EntitySpec entitySpec, final boolean delete, - final boolean deleteEntity) { + private void updateGraphService(final RecordTemplate snapshot, final EntitySpec entitySpec) { final Set relationshipTypesBeingAdded = new HashSet<>(); final List edgesToAdd = new ArrayList<>(); final String sourceUrnStr = snapshot.data().get("urn").toString(); @@ -212,19 +157,11 @@ public class MetadataAuditEventsProcessor { } } - if (deleteEntity) { - new Thread(() -> { - _graphService.removeNode(sourceUrn); - }).start(); - } else if (edgesToAdd.size() > 0) { + if (edgesToAdd.size() > 0) { new Thread(() -> { _graphService.removeEdgesFromNode(sourceUrn, new ArrayList<>(relationshipTypesBeingAdded), createRelationshipFilter(new Filter().setOr(new ConjunctiveCriterionArray()), RelationshipDirection.OUTGOING)); - if (!delete) { - edgesToAdd.forEach(edge -> _graphService.addEdge(edge)); - } else if (deleteEntity) { - _graphService.removeNode(sourceUrn); - } + edgesToAdd.forEach(edge -> _graphService.addEdge(edge)); }).start(); } } @@ -234,13 +171,12 @@ public class MetadataAuditEventsProcessor { * * @param snapshot Snapshot */ - private void updateSearchService(final RecordTemplate snapshot, final EntitySpec entitySpec, final boolean delete, - final boolean deleteEntity) { + private void updateSearchService(final RecordTemplate snapshot, final EntitySpec entitySpec) { String urn = snapshot.data().get("urn").toString(); Optional searchDocument; try { - searchDocument = SearchDocumentTransformer.transformSnapshot(snapshot, entitySpec, delete); + searchDocument = SearchDocumentTransformer.transformSnapshot(snapshot, entitySpec, false); } catch (Exception e) { log.error("Error in getting documents from snapshot: {} for snapshot {}", e, snapshot); return; @@ -258,11 +194,6 @@ public class MetadataAuditEventsProcessor { return; } - if (deleteEntity) { - _entitySearchService.deleteDocument(entitySpec.getName(), docId); - return; - } - _entitySearchService.upsertDocument(entitySpec.getName(), searchDocument.get(), docId); } } diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeLogProcessor.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeLogProcessor.java index a5a0beeccc..c240a94e8a 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeLogProcessor.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeLogProcessor.java @@ -1,6 +1,7 @@ package com.linkedin.metadata.kafka; import com.codahale.metrics.Histogram; + import com.codahale.metrics.MetricRegistry; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; @@ -8,6 +9,7 @@ import com.linkedin.common.urn.Urn; import com.linkedin.data.template.RecordTemplate; import com.linkedin.events.metadata.ChangeType; import com.linkedin.gms.factory.common.GraphServiceFactory; +import com.linkedin.gms.factory.common.SystemMetadataServiceFactory; import com.linkedin.gms.factory.entityregistry.EntityRegistryFactory; import com.linkedin.gms.factory.search.EntitySearchServiceFactory; import com.linkedin.gms.factory.timeseries.TimeseriesAspectServiceFactory; @@ -25,6 +27,7 @@ import com.linkedin.metadata.query.filter.Filter; import com.linkedin.metadata.query.filter.RelationshipDirection; import com.linkedin.metadata.search.EntitySearchService; import com.linkedin.metadata.search.transformer.SearchDocumentTransformer; +import com.linkedin.metadata.systemmetadata.SystemMetadataService; import com.linkedin.metadata.timeseries.TimeseriesAspectService; import com.linkedin.metadata.timeseries.transformer.TimeseriesAspectTransformer; import com.linkedin.metadata.utils.EntityKeyUtils; @@ -33,6 +36,7 @@ import com.linkedin.metadata.utils.metrics.MetricUtils; import com.linkedin.mxe.MetadataChangeLog; import com.linkedin.mxe.SystemMetadata; import com.linkedin.mxe.Topics; +import com.linkedin.util.Pair; import java.io.UnsupportedEncodingException; import java.net.URISyntaxException; import java.net.URLEncoder; @@ -52,30 +56,32 @@ import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; -import static com.linkedin.metadata.search.utils.QueryUtils.newRelationshipFilter; +import static com.linkedin.metadata.search.utils.QueryUtils.*; @Slf4j @Component @Conditional(MetadataChangeLogProcessorCondition.class) @Import({GraphServiceFactory.class, EntitySearchServiceFactory.class, TimeseriesAspectServiceFactory.class, - EntityRegistryFactory.class}) + EntityRegistryFactory.class, SystemMetadataServiceFactory.class}) @EnableKafka public class MetadataChangeLogProcessor { private final GraphService _graphService; private final EntitySearchService _entitySearchService; private final TimeseriesAspectService _timeseriesAspectService; + private final SystemMetadataService _systemMetadataService; private final EntityRegistry _entityRegistry; private final Histogram kafkaLagStats = MetricUtils.get().histogram(MetricRegistry.name(this.getClass(), "kafkaLag")); @Autowired public MetadataChangeLogProcessor(GraphService graphService, EntitySearchService entitySearchService, - TimeseriesAspectService timeseriesAspectService, EntityRegistry entityRegistry) { + TimeseriesAspectService timeseriesAspectService, SystemMetadataService systemMetadataService, EntityRegistry entityRegistry) { _graphService = graphService; _entitySearchService = entitySearchService; _timeseriesAspectService = timeseriesAspectService; + _systemMetadataService = systemMetadataService; _entityRegistry = entityRegistry; _timeseriesAspectService.configure(); @@ -99,16 +105,16 @@ public class MetadataChangeLogProcessor { return; } - if (event.getChangeType() == ChangeType.UPSERT) { - EntitySpec entitySpec; - try { - entitySpec = _entityRegistry.getEntitySpec(event.getEntityType()); - } catch (IllegalArgumentException e) { - log.error("Error while processing entity type {}: {}", event.getEntityType(), e.toString()); - return; - } + EntitySpec entitySpec; + try { + entitySpec = _entityRegistry.getEntitySpec(event.getEntityType()); + } catch (IllegalArgumentException e) { + log.error("Error while processing entity type {}: {}", event.getEntityType(), e.toString()); + return; + } + Urn urn = EntityKeyUtils.getUrnFromLog(event, entitySpec.getKeyAspectSpec()); - Urn urn = EntityKeyUtils.getUrnFromLog(event, entitySpec.getKeyAspectSpec()); + if (event.getChangeType() == ChangeType.UPSERT) { if (!event.hasAspectName() || !event.hasAspect()) { log.error("Aspect or aspect name is missing"); @@ -130,14 +136,34 @@ public class MetadataChangeLogProcessor { } else { updateSearchService(entitySpec.getName(), urn, aspectSpec, aspect); updateGraphService(urn, aspectSpec, aspect); + updateSystemMetadata(event.getSystemMetadata(), urn, aspectSpec); + } + } else if (event.getChangeType() == ChangeType.DELETE) { + if (!event.hasAspectName() || !event.hasAspect()) { + log.error("Aspect or aspect name is missing"); + return; + } + + AspectSpec aspectSpec = entitySpec.getAspectSpec(event.getAspectName()); + if (aspectSpec == null) { + log.error("Unrecognized aspect name {} for entity {}", event.getAspectName(), event.getEntityType()); + return; + } + + RecordTemplate aspect = + GenericAspectUtils.deserializeAspect(event.getAspect().getValue(), event.getAspect().getContentType(), + aspectSpec); + Boolean isDeletingKey = event.getAspectName().equals(entitySpec.getKeyAspectName()); + + if (!aspectSpec.isTimeseries()) { + deleteSystemMetadata(urn, aspectSpec, isDeletingKey); + deleteGraphData(urn, aspectSpec, aspect, isDeletingKey); + deleteSearchData(urn, entitySpec.getName(), aspectSpec, aspect, isDeletingKey); } } } - /** - * Process snapshot and update graph index - */ - private void updateGraphService(Urn urn, AspectSpec aspectSpec, RecordTemplate aspect) { + private Pair, Set> getEdgesAndRelationshipTypesFromAspect(Urn urn, AspectSpec aspectSpec, RecordTemplate aspect) { final Set relationshipTypesBeingAdded = new HashSet<>(); final List edgesToAdd = new ArrayList<>(); @@ -155,6 +181,19 @@ public class MetadataChangeLogProcessor { } } } + return Pair.of(edgesToAdd, relationshipTypesBeingAdded); + } + + /** + * Process snapshot and update graph index + */ + private void updateGraphService(Urn urn, AspectSpec aspectSpec, RecordTemplate aspect) { + Pair, Set> edgeAndRelationTypes = + getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect); + + final List edgesToAdd = edgeAndRelationTypes.getFirst(); + final Set relationshipTypesBeingAdded = edgeAndRelationTypes.getSecond(); + log.info(String.format("Here's the relationship types found %s", relationshipTypesBeingAdded)); if (relationshipTypesBeingAdded.size() > 0) { new Thread(() -> { @@ -171,7 +210,7 @@ public class MetadataChangeLogProcessor { private void updateSearchService(String entityName, Urn urn, AspectSpec aspectSpec, RecordTemplate aspect) { Optional searchDocument; try { - searchDocument = SearchDocumentTransformer.transformAspect(urn, aspect, aspectSpec); + searchDocument = SearchDocumentTransformer.transformAspect(urn, aspect, aspectSpec, false); } catch (Exception e) { log.error("Error in getting documents from aspect: {} for aspect {}", e, aspectSpec.getName()); return; @@ -208,4 +247,60 @@ public class MetadataChangeLogProcessor { _timeseriesAspectService.upsertDocument(entityType, aspectName, document.getKey(), document.getValue()); }); } + + private void updateSystemMetadata(SystemMetadata systemMetadata, Urn urn, AspectSpec aspectSpec) { + _systemMetadataService.insert(systemMetadata, urn.toString(), aspectSpec.getName()); + } + + private void deleteSystemMetadata(Urn urn, AspectSpec aspectSpec, Boolean isKeyAspect) { + if (isKeyAspect) { + _systemMetadataService.deleteUrn(urn.toString()); + } + _systemMetadataService.delete(urn.toString(), aspectSpec.getName()); + } + + private void deleteGraphData(Urn urn, AspectSpec aspectSpec, RecordTemplate aspect, Boolean isKeyAspect) { + if (isKeyAspect) { + _graphService.removeNode(urn); + return; + } + + Pair, Set> edgeAndRelationTypes = + getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect); + + final Set relationshipTypesBeingAdded = edgeAndRelationTypes.getSecond(); + if (relationshipTypesBeingAdded.size() > 0) { + _graphService.removeEdgesFromNode(urn, new ArrayList<>(relationshipTypesBeingAdded), + createRelationshipFilter(new Filter().setOr(new ConjunctiveCriterionArray()), RelationshipDirection.OUTGOING)); + } + } + + private void deleteSearchData(Urn urn, String entityName, AspectSpec aspectSpec, RecordTemplate aspect, Boolean isKeyAspect) { + String docId; + try { + docId = URLEncoder.encode(urn.toString(), "UTF-8"); + } catch (UnsupportedEncodingException e) { + log.error("Failed to encode the urn with error: {}", e.toString()); + return; + } + + if (isKeyAspect) { + _entitySearchService.deleteDocument(entityName, docId); + return; + } + + Optional searchDocument; + try { + searchDocument = SearchDocumentTransformer.transformAspect(urn, aspect, aspectSpec, true); + } catch (Exception e) { + log.error("Error in getting documents from aspect: {} for aspect {}", e, aspectSpec.getName()); + return; + } + + if (!searchDocument.isPresent()) { + return; + } + + _entitySearchService.upsertDocument(entityName, searchDocument.get(), docId); + } }