diff --git a/entity-registry/src/main/java/com/linkedin/metadata/models/RelationshipFieldSpec.java b/entity-registry/src/main/java/com/linkedin/metadata/models/RelationshipFieldSpec.java index 8ff559bde5..76454850aa 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/models/RelationshipFieldSpec.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/models/RelationshipFieldSpec.java @@ -31,4 +31,8 @@ public class RelationshipFieldSpec implements FieldSpec { public List getValidDestinationTypes() { return relationshipAnnotation.getValidDestinationTypes(); } + + public boolean isLineageRelationship() { + return relationshipAnnotation.isLineage(); + } } diff --git a/li-utils/src/main/java/com/linkedin/metadata/Constants.java b/li-utils/src/main/java/com/linkedin/metadata/Constants.java index 1ce8173455..327f623122 100644 --- a/li-utils/src/main/java/com/linkedin/metadata/Constants.java +++ b/li-utils/src/main/java/com/linkedin/metadata/Constants.java @@ -16,7 +16,12 @@ public class Constants { public static final Long ASPECT_LATEST_VERSION = 0L; public static final String UNKNOWN_DATA_PLATFORM = "urn:li:dataPlatform:unknown"; + /** + * System Metadata + */ public static final String DEFAULT_RUN_ID = "no-run-id-provided"; + // Forces indexing for no-ops, enabled for restore indices calls. Only considered in the no-op case + public static final String FORCE_INDEXING_KEY = "forceIndexing"; /** * Entities diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityService.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityService.java index d0e2876cd1..66db564d17 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityService.java @@ -27,6 +27,7 @@ import com.linkedin.data.schema.validator.Validator; import com.linkedin.data.template.DataTemplateUtil; import com.linkedin.data.template.RecordTemplate; import com.linkedin.data.template.StringArray; +import com.linkedin.data.template.StringMap; import com.linkedin.data.template.UnionTemplate; import com.linkedin.dataplatform.DataPlatformInfo; import com.linkedin.entity.AspectType; @@ -49,6 +50,7 @@ import com.linkedin.metadata.entity.validation.ValidationUtils; import com.linkedin.metadata.event.EventProducer; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.EntitySpec; +import com.linkedin.metadata.models.RelationshipFieldSpec; import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.models.registry.template.AspectTemplateEngine; import com.linkedin.metadata.query.ListUrnsResult; @@ -781,7 +783,7 @@ public class EntityService { // Produce MCL after a successful update boolean isNoOp = oldValue == updatedValue; - if (!isNoOp || _alwaysEmitChangeLog) { + if (!isNoOp || _alwaysEmitChangeLog || shouldAspectEmitChangeLog(urn, aspectName)) { log.debug(String.format("Producing MetadataChangeLog for ingested aspect %s, urn %s", aspectName, urn)); String entityName = urnToEntityName(urn); EntitySpec entitySpec = getEntityRegistry().getEntitySpec(entityName); @@ -1028,32 +1030,39 @@ public class EntityService { RecordTemplate newAspect, SystemMetadata newSystemMetadata, MetadataChangeProposal mcp, Urn entityUrn, AuditStamp auditStamp, AspectSpec aspectSpec) { - log.debug("Producing MetadataChangeLog for ingested aspect {}, urn {}", mcp.getAspectName(), entityUrn); + boolean isNoOp = oldAspect == newAspect; + if (!isNoOp || _alwaysEmitChangeLog || shouldAspectEmitChangeLog(aspectSpec)) { + log.debug("Producing MetadataChangeLog for ingested aspect {}, urn {}", mcp.getAspectName(), entityUrn); - final MetadataChangeLog metadataChangeLog = new MetadataChangeLog(mcp.data()); - metadataChangeLog.setEntityUrn(entityUrn); - metadataChangeLog.setCreated(auditStamp); - // The change log produced by this method is always an upsert as it contains the entire RecordTemplate update - metadataChangeLog.setChangeType(ChangeType.UPSERT); + final MetadataChangeLog metadataChangeLog = new MetadataChangeLog(mcp.data()); + metadataChangeLog.setEntityUrn(entityUrn); + metadataChangeLog.setCreated(auditStamp); + metadataChangeLog.setChangeType(isNoOp ? ChangeType.RESTATE : ChangeType.UPSERT); - if (oldAspect != null) { - metadataChangeLog.setPreviousAspectValue(GenericRecordUtils.serializeAspect(oldAspect)); + if (oldAspect != null) { + metadataChangeLog.setPreviousAspectValue(GenericRecordUtils.serializeAspect(oldAspect)); + } + if (oldSystemMetadata != null) { + metadataChangeLog.setPreviousSystemMetadata(oldSystemMetadata); + } + if (newAspect != null) { + metadataChangeLog.setAspect(GenericRecordUtils.serializeAspect(newAspect)); + } + if (newSystemMetadata != null) { + metadataChangeLog.setSystemMetadata(newSystemMetadata); + } + + log.debug("Serialized MCL event: {}", metadataChangeLog); + + produceMetadataChangeLog(entityUrn, aspectSpec, metadataChangeLog); + + return true; + } else { + log.debug( + "Skipped producing MetadataChangeLog for ingested aspect {}, urn {}. Aspect has not changed.", + mcp.getAspectName(), entityUrn); + return false; } - if (oldSystemMetadata != null) { - metadataChangeLog.setPreviousSystemMetadata(oldSystemMetadata); - } - if (newAspect != null) { - metadataChangeLog.setAspect(GenericRecordUtils.serializeAspect(newAspect)); - } - if (newSystemMetadata != null) { - metadataChangeLog.setSystemMetadata(newSystemMetadata); - } - - log.debug("Serialized MCL event: {}", metadataChangeLog); - - produceMetadataChangeLog(entityUrn, aspectSpec, metadataChangeLog); - - return true; } public Integer getCountAspect(@Nonnull String aspectName, @Nullable String urnLike) { @@ -1130,7 +1139,12 @@ public class EntityService { result.createRecordMs += System.currentTimeMillis() - startTime; startTime = System.currentTimeMillis(); + // Force indexing to skip diff mode and fix error states SystemMetadata latestSystemMetadata = EntityUtils.parseSystemMetadata(aspect.getSystemMetadata()); + StringMap properties = latestSystemMetadata.getProperties() != null ? latestSystemMetadata.getProperties() + : new StringMap(); + properties.put(FORCE_INDEXING_KEY, Boolean.TRUE.toString()); + latestSystemMetadata.setProperties(properties); // 5. Produce MAE events for the aspect record produceMetadataChangeLog(urn, entityName, aspectName, aspectSpec, null, aspectRecord, null, @@ -2050,12 +2064,26 @@ public class EntityService { urn, ImmutableSet.of(Constants.DATA_PLATFORM_INFO_ASPECT_NAME) ); - if (entityResponse != null && entityResponse.hasAspects() && entityResponse.getAspects().containsKey(Constants.DATA_PLATFORM_INFO_ASPECT_NAME)) { - return new DataPlatformInfo(entityResponse.getAspects().get(Constants.DATA_PLATFORM_INFO_ASPECT_NAME).getValue().data()); + if (entityResponse != null && entityResponse.hasAspects() && entityResponse.getAspects() + .containsKey(Constants.DATA_PLATFORM_INFO_ASPECT_NAME)) { + return new DataPlatformInfo( + entityResponse.getAspects().get(Constants.DATA_PLATFORM_INFO_ASPECT_NAME).getValue().data()); } } catch (Exception e) { log.warn(String.format("Failed to find Data Platform Info for urn %s", urn)); } return null; } + + private boolean shouldAspectEmitChangeLog(@Nonnull final Urn urn, @Nonnull final String aspectName) { + final String entityName = urnToEntityName(urn); + final EntitySpec entitySpec = getEntityRegistry().getEntitySpec(entityName); + final AspectSpec aspectSpec = entitySpec.getAspectSpec(aspectName); + return shouldAspectEmitChangeLog(aspectSpec); + } + + private boolean shouldAspectEmitChangeLog(@Nonnull final AspectSpec aspectSpec) { + final List relationshipFieldSpecs = aspectSpec.getRelationshipFieldSpecs(); + return relationshipFieldSpecs.stream().anyMatch(RelationshipFieldSpec::isLineageRelationship); + } } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/AspectGenerationUtils.java b/metadata-io/src/test/java/com/linkedin/metadata/AspectGenerationUtils.java index 3e82e43abd..a7f4b1a34a 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/AspectGenerationUtils.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/AspectGenerationUtils.java @@ -6,11 +6,15 @@ import com.linkedin.common.ChangeAuditStamps; import com.linkedin.common.urn.Urn; import com.linkedin.common.urn.UrnUtils; import com.linkedin.data.template.RecordTemplate; +import com.linkedin.dataset.Upstream; +import com.linkedin.dataset.UpstreamArray; +import com.linkedin.dataset.UpstreamLineage; import com.linkedin.identity.CorpUserInfo; import com.linkedin.metadata.key.CorpUserKey; import com.linkedin.metadata.utils.EntityKeyUtils; import com.linkedin.metadata.utils.PegasusUtils; import com.linkedin.mxe.SystemMetadata; +import java.util.Collections; import javax.annotation.Nonnull; @@ -62,6 +66,14 @@ public class AspectGenerationUtils { return chartInfo; } + @Nonnull + public static UpstreamLineage createUpstreamLineage() { + final UpstreamLineage upstreamLineage = new UpstreamLineage(); + final UpstreamArray upstreams = new UpstreamArray(); + upstreamLineage.setUpstreams(upstreams); + return upstreamLineage; + } + @Nonnull public static String getAspectName(RecordTemplate record) { return PegasusUtils.getAspectNameFromSchema(record.schema()); diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java index 90bcca0af8..0ea62a487e 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java @@ -18,6 +18,7 @@ import com.linkedin.data.template.RecordTemplate; import com.linkedin.data.template.StringMap; import com.linkedin.dataset.DatasetProfile; import com.linkedin.dataset.DatasetProperties; +import com.linkedin.dataset.UpstreamLineage; import com.linkedin.entity.Entity; import com.linkedin.entity.EntityResponse; import com.linkedin.entity.EnvelopedAspect; @@ -28,6 +29,7 @@ import com.linkedin.metadata.aspect.Aspect; import com.linkedin.metadata.aspect.CorpUserAspect; import com.linkedin.metadata.aspect.CorpUserAspectArray; import com.linkedin.metadata.aspect.VersionedAspect; +import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs; import com.linkedin.metadata.event.EventProducer; import com.linkedin.metadata.key.CorpUserKey; import com.linkedin.metadata.models.AspectSpec; @@ -487,6 +489,133 @@ abstract public class EntityServiceTest> pairToIngest = new ArrayList<>(); + + final UpstreamLineage upstreamLineage = AspectGenerationUtils.createUpstreamLineage(); + String aspectName1 = AspectGenerationUtils.getAspectName(upstreamLineage); + pairToIngest.add(getAspectRecordPair(upstreamLineage, UpstreamLineage.class)); + + SystemMetadata metadata1 = AspectGenerationUtils.createSystemMetadata(); + + _entityService.ingestAspects(entityUrn, pairToIngest, TEST_AUDIT_STAMP, metadata1); + + final MetadataChangeLog initialChangeLog = new MetadataChangeLog(); + initialChangeLog.setEntityType(entityUrn.getEntityType()); + initialChangeLog.setEntityUrn(entityUrn); + initialChangeLog.setChangeType(ChangeType.UPSERT); + initialChangeLog.setAspectName(aspectName1); + initialChangeLog.setCreated(TEST_AUDIT_STAMP); + + GenericAspect aspect = GenericRecordUtils.serializeAspect(pairToIngest.get(0).getSecond()); + + initialChangeLog.setAspect(aspect); + initialChangeLog.setSystemMetadata(metadata1); + + final MetadataChangeLog restateChangeLog = new MetadataChangeLog(); + restateChangeLog.setEntityType(entityUrn.getEntityType()); + restateChangeLog.setEntityUrn(entityUrn); + restateChangeLog.setChangeType(ChangeType.RESTATE); + restateChangeLog.setAspectName(aspectName1); + restateChangeLog.setCreated(TEST_AUDIT_STAMP); + restateChangeLog.setAspect(aspect); + restateChangeLog.setSystemMetadata(metadata1); + restateChangeLog.setPreviousAspectValue(aspect); + restateChangeLog.setPreviousSystemMetadata(simulatePullFromDB(metadata1, SystemMetadata.class)); + + Map latestAspects = _entityService.getLatestAspectsForUrn( + entityUrn, + new HashSet<>(List.of(aspectName1)) + ); + assertTrue(DataTemplateUtil.areEqual(upstreamLineage, latestAspects.get(aspectName1))); + + verify(_mockProducer, times(1)).produceMetadataChangeLog(Mockito.eq(entityUrn), + Mockito.any(), Mockito.eq(initialChangeLog)); + verify(_mockProducer, times(1)).produceMetadataAuditEvent(Mockito.eq(entityUrn), + Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + + // Mockito detects the previous invocation and throws an error in verifying the second call unless invocations are cleared + clearInvocations(_mockProducer); + + _entityService.ingestAspects(entityUrn, pairToIngest, TEST_AUDIT_STAMP, metadata1); + + verify(_mockProducer, times(1)).produceMetadataChangeLog(Mockito.eq(entityUrn), + Mockito.any(), Mockito.eq(restateChangeLog)); + verify(_mockProducer, times(1)).produceMetadataAuditEvent(Mockito.eq(entityUrn), + Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + + + verifyNoMoreInteractions(_mockProducer); + } + + @Test + public void testReingestLineageProposal() throws Exception { + + Urn entityUrn = UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:looker,sample_dataset,PROD)"); + + List> pairToIngest = new ArrayList<>(); + + final UpstreamLineage upstreamLineage = AspectGenerationUtils.createUpstreamLineage(); + String aspectName1 = AspectGenerationUtils.getAspectName(upstreamLineage); + + SystemMetadata metadata1 = AspectGenerationUtils.createSystemMetadata(); + MetadataChangeProposal mcp1 = new MetadataChangeProposal(); + mcp1.setEntityType(entityUrn.getEntityType()); + GenericAspect genericAspect = GenericRecordUtils.serializeAspect(upstreamLineage); + mcp1.setAspect(genericAspect); + mcp1.setEntityUrn(entityUrn); + mcp1.setChangeType(ChangeType.UPSERT); + mcp1.setSystemMetadata(metadata1); + mcp1.setAspectName(UPSTREAM_LINEAGE_ASPECT_NAME); + + _entityService.ingestProposal(mcp1, TEST_AUDIT_STAMP, false); + + final MetadataChangeLog initialChangeLog = new MetadataChangeLog(); + initialChangeLog.setEntityType(entityUrn.getEntityType()); + initialChangeLog.setEntityUrn(entityUrn); + initialChangeLog.setChangeType(ChangeType.UPSERT); + initialChangeLog.setAspectName(aspectName1); + initialChangeLog.setCreated(TEST_AUDIT_STAMP); + + initialChangeLog.setAspect(genericAspect); + initialChangeLog.setSystemMetadata(metadata1); + + final MetadataChangeLog restateChangeLog = new MetadataChangeLog(); + restateChangeLog.setEntityType(entityUrn.getEntityType()); + restateChangeLog.setEntityUrn(entityUrn); + restateChangeLog.setChangeType(ChangeType.RESTATE); + restateChangeLog.setAspectName(aspectName1); + restateChangeLog.setCreated(TEST_AUDIT_STAMP); + restateChangeLog.setAspect(genericAspect); + restateChangeLog.setSystemMetadata(metadata1); + restateChangeLog.setPreviousAspectValue(genericAspect); + restateChangeLog.setPreviousSystemMetadata(simulatePullFromDB(metadata1, SystemMetadata.class)); + + Map latestAspects = _entityService.getLatestAspectsForUrn( + entityUrn, + new HashSet<>(List.of(aspectName1)) + ); + assertTrue(DataTemplateUtil.areEqual(upstreamLineage, latestAspects.get(aspectName1))); + + verify(_mockProducer, times(1)).produceMetadataChangeLog(Mockito.eq(entityUrn), + Mockito.any(), Mockito.eq(initialChangeLog)); + + // Mockito detects the previous invocation and throws an error in verifying the second call unless invocations are cleared + clearInvocations(_mockProducer); + + _entityService.ingestProposal(mcp1, TEST_AUDIT_STAMP, false); + + verify(_mockProducer, times(1)).produceMetadataChangeLog(Mockito.eq(entityUrn), + Mockito.any(), Mockito.eq(restateChangeLog)); + + + verifyNoMoreInteractions(_mockProducer); + } + @Test public void testIngestTimeseriesAspect() throws Exception { Urn entityUrn = UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:foo,bar,PROD)"); @@ -1021,6 +1150,42 @@ abstract public class EntityServiceTest> pairToIngest = new ArrayList<>(); + + final UpstreamLineage upstreamLineage = AspectGenerationUtils.createUpstreamLineage(); + String aspectName1 = AspectGenerationUtils.getAspectName(upstreamLineage); + pairToIngest.add(getAspectRecordPair(upstreamLineage, UpstreamLineage.class)); + + SystemMetadata metadata1 = AspectGenerationUtils.createSystemMetadata(); + + _entityService.ingestAspects(entityUrn, pairToIngest, TEST_AUDIT_STAMP, metadata1); + + clearInvocations(_mockProducer); + + RestoreIndicesArgs args = new RestoreIndicesArgs(); + args.setAspectName(UPSTREAM_LINEAGE_ASPECT_NAME); + args.setBatchSize(1); + args.setStart(0); + args.setBatchDelayMs(1L); + args.setNumThreads(1); + args.setUrn(urnStr); + _entityService.restoreIndices(args, obj -> { }); + + ArgumentCaptor mclCaptor = ArgumentCaptor.forClass(MetadataChangeLog.class); + verify(_mockProducer, times(1)).produceMetadataChangeLog( + Mockito.eq(entityUrn), Mockito.any(), mclCaptor.capture()); + MetadataChangeLog mcl = mclCaptor.getValue(); + assertEquals(mcl.getEntityType(), "dataset"); + assertNull(mcl.getPreviousAspectValue()); + assertNull(mcl.getPreviousSystemMetadata()); + assertEquals(mcl.getChangeType(), ChangeType.RESTATE); + assertEquals(mcl.getSystemMetadata().getProperties().get(FORCE_INDEXING_KEY), "true"); + } @Nonnull protected com.linkedin.entity.Entity createCorpUserEntity(Urn entityUrn, String email) throws Exception { diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java index 4a9505d332..c7d3ae7fb7 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java @@ -2,6 +2,7 @@ package com.linkedin.metadata.kafka.hook; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; import com.linkedin.common.InputField; import com.linkedin.common.InputFields; @@ -21,6 +22,7 @@ import com.linkedin.gms.factory.timeseries.TimeseriesAspectServiceFactory; import com.linkedin.metadata.Constants; import com.linkedin.metadata.graph.Edge; import com.linkedin.metadata.graph.GraphService; +import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService; import com.linkedin.metadata.key.SchemaFieldKey; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.EntitySpec; @@ -61,6 +63,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Import; import org.springframework.stereotype.Component; +import static com.linkedin.metadata.Constants.*; import static com.linkedin.metadata.search.utils.QueryUtils.*; // TODO: Backfill tests for this class in UpdateIndicesHookTest.java @@ -83,8 +86,20 @@ public class UpdateIndicesHook implements MetadataChangeLogHook { private final EntityRegistry _entityRegistry; private final SearchDocumentTransformer _searchDocumentTransformer; - @Value("${featureFlags.graphServiceDiffModeEnabled:false}") - private boolean _diffMode; + @Value("${featureFlags.graphServiceDiffModeEnabled:true}") + private boolean _graphDiffMode; + @Value("${featureFlags.searchServiceDiffModeEnabled:true}") + private boolean _searchDiffMode; + + @VisibleForTesting + void setGraphDiffMode(boolean graphDiffMode) { + _graphDiffMode = graphDiffMode; + } + + @VisibleForTesting + void setSearchDiffMode(boolean searchDiffMode) { + _searchDiffMode = searchDiffMode; + } @Autowired public UpdateIndicesHook( @@ -160,11 +175,13 @@ public class UpdateIndicesHook implements MetadataChangeLogHook { } // Step 1. For all aspects, attempt to update Search - updateSearchService(entitySpec.getName(), urn, aspectSpec, aspect, - event.hasSystemMetadata() ? event.getSystemMetadata().getRunId() : null); + updateSearchService(entitySpec.getName(), urn, aspectSpec, aspect, event.getSystemMetadata(), previousAspect); // Step 2. For all aspects, attempt to update Graph - if (_diffMode) { + SystemMetadata systemMetadata = event.getSystemMetadata(); + if (_graphDiffMode && _graphService instanceof ElasticSearchGraphService + && (systemMetadata == null || systemMetadata.getProperties() == null + || !Boolean.parseBoolean(systemMetadata.getProperties().get(FORCE_INDEXING_KEY)))) { updateGraphServiceDiff(urn, aspectSpec, previousAspect, aspect, event); } else { updateGraphService(urn, aspectSpec, aspect, event); @@ -402,8 +419,9 @@ public class UpdateIndicesHook implements MetadataChangeLogHook { * Process snapshot and update search index */ private void updateSearchService(String entityName, Urn urn, AspectSpec aspectSpec, RecordTemplate aspect, - @Nullable String runId) { + @Nullable SystemMetadata systemMetadata, @Nullable RecordTemplate previousAspect) { Optional searchDocument; + Optional previousSearchDocument = Optional.empty(); try { searchDocument = _searchDocumentTransformer.transformAspect(urn, aspect, aspectSpec, false); } catch (Exception e) { @@ -421,6 +439,28 @@ public class UpdateIndicesHook implements MetadataChangeLogHook { return; } + String searchDocumentValue = searchDocument.get(); + if (_searchDiffMode && (systemMetadata == null || systemMetadata.getProperties() == null + || !Boolean.parseBoolean(systemMetadata.getProperties().get(FORCE_INDEXING_KEY)))) { + if (previousAspect != null) { + try { + previousSearchDocument = _searchDocumentTransformer.transformAspect(urn, previousAspect, aspectSpec, false); + } catch (Exception e) { + log.error( + "Error in getting documents from previous aspect state: {} for aspect {}, continuing without diffing.", e, + aspectSpec.getName()); + } + } + + if (previousSearchDocument.isPresent()) { + String previousSearchDocumentValue = previousSearchDocument.get(); + if (searchDocumentValue.equals(previousSearchDocumentValue)) { + // No changes to search document, skip writing no-op update + return; + } + } + } + _entitySearchService.upsertDocument(entityName, searchDocument.get(), docId.get()); } diff --git a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/MCLProcessingTestDataGenerator.java b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/MCLProcessingTestDataGenerator.java new file mode 100644 index 0000000000..0897cfa010 --- /dev/null +++ b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/MCLProcessingTestDataGenerator.java @@ -0,0 +1,131 @@ +package com.linkedin.metadata.kafka.hook; + +import com.linkedin.common.AuditStamp; +import com.linkedin.common.urn.DatasetUrn; +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.data.template.StringMap; +import com.linkedin.dataset.DatasetLineageType; +import com.linkedin.dataset.Upstream; +import com.linkedin.dataset.UpstreamArray; +import com.linkedin.dataset.UpstreamLineage; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.Constants; +import com.linkedin.metadata.utils.GenericRecordUtils; +import com.linkedin.mxe.MetadataChangeLog; +import com.linkedin.mxe.SystemMetadata; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Map; + +import static com.linkedin.metadata.Constants.*; +import static com.linkedin.metadata.kafka.hook.UpdateIndicesHookTest.*; + + +public class MCLProcessingTestDataGenerator { + + private MCLProcessingTestDataGenerator() { + + } + + public static MetadataChangeLog createBaseChangeLog() throws URISyntaxException { + MetadataChangeLog event = new MetadataChangeLog(); + event.setEntityType(Constants.DATASET_ENTITY_NAME); + event.setAspectName(Constants.UPSTREAM_LINEAGE_ASPECT_NAME); + event.setChangeType(ChangeType.UPSERT); + + UpstreamLineage upstreamLineage = createBaseLineageAspect(); + + event.setAspect(GenericRecordUtils.serializeAspect(upstreamLineage)); + event.setEntityUrn(Urn.createFromString(TEST_DATASET_URN)); + event.setEntityType(DATASET_ENTITY_NAME); + event.setCreated(new AuditStamp().setActor(UrnUtils.getUrn(TEST_ACTOR_URN)).setTime(EVENT_TIME)); + return event; + } + + public static MetadataChangeLog setToRestate(MetadataChangeLog changeLog) { + return changeLog.setChangeType(ChangeType.RESTATE); + } + + public static MetadataChangeLog setToUpsert(MetadataChangeLog changeLog) { + return changeLog.setChangeType(ChangeType.UPSERT); + } + + public static MetadataChangeLog setSystemMetadata(MetadataChangeLog changeLog) { + SystemMetadata systemMetadata = new SystemMetadata(); + systemMetadata.setRunId(RUN_ID_1); + systemMetadata.setLastObserved(LAST_OBSERVED_1); + return changeLog.setSystemMetadata(systemMetadata); + } + + public static MetadataChangeLog setSystemMetadataWithForceIndexing(MetadataChangeLog changeLog) { + SystemMetadata systemMetadata = new SystemMetadata(); + systemMetadata.setRunId(RUN_ID_1); + systemMetadata.setLastObserved(LAST_OBSERVED_1); + StringMap stringMap = new StringMap(); + stringMap.put(FORCE_INDEXING_KEY, Boolean.TRUE.toString()); + systemMetadata.setProperties(stringMap); + return changeLog.setSystemMetadata(systemMetadata); + } + + public static MetadataChangeLog setPreviousData(MetadataChangeLog changeLog, MetadataChangeLog previousState) { + changeLog.setPreviousAspectValue(previousState.getAspect()); + return changeLog.setPreviousSystemMetadata(previousState.getSystemMetadata()); + } + + public static MetadataChangeLog setPreviousDataToEmpty(MetadataChangeLog changeLog) { + changeLog.removePreviousAspectValue(); + changeLog.removePreviousSystemMetadata(); + return changeLog; + } + + public static MetadataChangeLog modifySystemMetadata(MetadataChangeLog changeLog) { + SystemMetadata systemMetadata = new SystemMetadata(); + systemMetadata.setRunId(RUN_ID_2); + systemMetadata.setLastObserved(LAST_OBSERVED_2); + return changeLog.setSystemMetadata(systemMetadata); + } + + public static MetadataChangeLog modifySystemMetadata2(MetadataChangeLog changeLog) { + SystemMetadata systemMetadata = new SystemMetadata(); + systemMetadata.setRunId(RUN_ID_3); + systemMetadata.setLastObserved(LAST_OBSERVED_3); + return changeLog.setSystemMetadata(systemMetadata); + } + + public static MetadataChangeLog modifyAspect(MetadataChangeLog changeLog, UpstreamLineage upstreamLineage) { + return changeLog.setAspect(GenericRecordUtils.serializeAspect(upstreamLineage)); + } + + public static UpstreamLineage createBaseLineageAspect() throws URISyntaxException { + UpstreamLineage upstreamLineage = new UpstreamLineage(); + final UpstreamArray upstreamArray = new UpstreamArray(); + final Upstream upstream = new Upstream(); + upstream.setType(DatasetLineageType.TRANSFORMED); + upstream.setDataset(DatasetUrn.createFromString(TEST_DATASET_URN_2)); + upstreamArray.add(upstream); + upstreamLineage.setUpstreams(upstreamArray); + + return upstreamLineage; + } + + public static UpstreamLineage addLineageEdge(UpstreamLineage upstreamLineage) throws URISyntaxException { + UpstreamArray upstreamArray = upstreamLineage.getUpstreams(); + Upstream upstream = new Upstream(); + upstream.setType(DatasetLineageType.TRANSFORMED); + upstream.setDataset(DatasetUrn.createFromString(TEST_DATASET_URN_3)); + upstreamArray.add(upstream); + return upstreamLineage.setUpstreams(upstreamArray); + } + + public static UpstreamLineage modifyNonSearchableField(UpstreamLineage upstreamLineage) { + UpstreamArray upstreamArray = upstreamLineage.getUpstreams(); + Upstream upstream = upstreamArray.get(0); + Map stringMap = new HashMap<>(); + stringMap.put("key", "value"); + upstream.setProperties(new StringMap(stringMap)); + upstreamArray.set(0, upstream); + return upstreamLineage.setUpstreams(upstreamArray); + } + +} diff --git a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHookTest.java b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHookTest.java index 9859682e41..b4079386ef 100644 --- a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHookTest.java +++ b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHookTest.java @@ -24,6 +24,7 @@ import com.linkedin.metadata.config.SystemUpdateConfiguration; import com.linkedin.metadata.graph.Edge; import com.linkedin.metadata.graph.GraphService; import com.linkedin.metadata.boot.kafka.DataHubUpgradeKafkaListener; +import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService; import com.linkedin.metadata.key.ChartKey; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.EntitySpec; @@ -39,6 +40,9 @@ import com.linkedin.metadata.timeseries.TimeseriesAspectService; import com.linkedin.metadata.utils.GenericRecordUtils; import com.linkedin.mxe.MetadataChangeLog; import com.linkedin.schema.SchemaField; +import java.net.URISyntaxException; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; import org.mockito.Mockito; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -47,23 +51,32 @@ import java.util.ArrayList; import java.util.Collections; import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME; +import static com.linkedin.metadata.kafka.hook.MCLProcessingTestDataGenerator.*; import static com.linkedin.metadata.search.utils.QueryUtils.newRelationshipFilter; public class UpdateIndicesHookTest { // going to want a test where we have an upstreamLineage aspect with finegrained, check that we call _graphService.addEdge for each edge // as well as _graphService.removeEdgesFromNode for each field and their relationships - private static final long EVENT_TIME = 123L; - private static final String TEST_DATASET_URN = "urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD)"; - private static final String TEST_CHART_URN = "urn:li:chart:(looker,dashboard_elements.1)"; - private static final String TEST_ACTOR_URN = "urn:li:corpuser:test"; - private static final String DOWNSTREAM_OF = "DownstreamOf"; + static final long EVENT_TIME = 123L; + static final String TEST_DATASET_URN = "urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD)"; + static final String TEST_DATASET_URN_2 = "urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleCypressHdfsDataset,PROD)"; + static final String TEST_DATASET_URN_3 = "urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleCypressKafkaDataset,PROD)"; + static final String TEST_CHART_URN = "urn:li:chart:(looker,dashboard_elements.1)"; + static final String TEST_ACTOR_URN = "urn:li:corpuser:test"; + static final String DOWNSTREAM_OF = "DownstreamOf"; + static final String RUN_ID_1 = "123"; + static final String RUN_ID_2 = "456"; + static final String RUN_ID_3 = "789"; + static final long LAST_OBSERVED_1 = 123L; + static final long LAST_OBSERVED_2 = 456L; + static final long LAST_OBSERVED_3 = 789L; private UpdateIndicesHook _updateIndicesHook; private GraphService _mockGraphService; private EntitySearchService _mockEntitySearchService; private TimeseriesAspectService _mockTimeseriesAspectService; private SystemMetadataService _mockSystemMetadataService; - private SearchDocumentTransformer _mockSearchDocumentTransformer; + private SearchDocumentTransformer _searchDocumentTransformer; private DataHubUpgradeKafkaListener _mockDataHubUpgradeKafkaListener; private ConfigurationProvider _mockConfigurationProvider; private Urn _actorUrn; @@ -73,11 +86,11 @@ public class UpdateIndicesHookTest { _actorUrn = UrnUtils.getUrn(TEST_ACTOR_URN); EntityRegistry registry = new ConfigEntityRegistry( UpdateIndicesHookTest.class.getClassLoader().getResourceAsStream("test-entity-registry.yml")); - _mockGraphService = Mockito.mock(GraphService.class); + _mockGraphService = Mockito.mock(ElasticSearchGraphService.class); _mockEntitySearchService = Mockito.mock(EntitySearchService.class); _mockTimeseriesAspectService = Mockito.mock(TimeseriesAspectService.class); _mockSystemMetadataService = Mockito.mock(SystemMetadataService.class); - _mockSearchDocumentTransformer = Mockito.mock(SearchDocumentTransformer.class); + _searchDocumentTransformer = new SearchDocumentTransformer(1000, 1000); _mockDataHubUpgradeKafkaListener = Mockito.mock(DataHubUpgradeKafkaListener.class); _mockConfigurationProvider = Mockito.mock(ConfigurationProvider.class); ElasticSearchConfiguration elasticSearchConfiguration = new ElasticSearchConfiguration(); @@ -90,12 +103,13 @@ public class UpdateIndicesHookTest { _mockTimeseriesAspectService, _mockSystemMetadataService, registry, - _mockSearchDocumentTransformer + _searchDocumentTransformer ); } @Test public void testFineGrainedLineageEdgesAreAdded() throws Exception { + _updateIndicesHook.setGraphDiffMode(false); Urn upstreamUrn = UrnUtils.getUrn("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleCypressHdfsDataset,PROD),foo_info)"); Urn downstreamUrn = UrnUtils.getUrn("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD),field_foo)"); MetadataChangeLog event = createUpstreamLineageMCL(upstreamUrn, downstreamUrn); @@ -112,6 +126,7 @@ public class UpdateIndicesHookTest { @Test public void testFineGrainedLineageEdgesAreAddedRestate() throws Exception { + _updateIndicesHook.setGraphDiffMode(false); Urn upstreamUrn = UrnUtils.getUrn("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleCypressHdfsDataset,PROD),foo_info)"); Urn downstreamUrn = UrnUtils.getUrn("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD),field_foo)"); MetadataChangeLog event = createUpstreamLineageMCL(upstreamUrn, downstreamUrn, ChangeType.RESTATE); @@ -124,6 +139,10 @@ public class UpdateIndicesHookTest { Mockito.eq(new ArrayList<>(Collections.singleton(DOWNSTREAM_OF))), Mockito.eq(newRelationshipFilter(new Filter().setOr(new ConjunctiveCriterionArray()), RelationshipDirection.OUTGOING)) ); + Mockito.verify(_mockEntitySearchService, Mockito.times(1)) + .upsertDocument(Mockito.eq(DATASET_ENTITY_NAME), Mockito.any(), + Mockito.eq(URLEncoder.encode("urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD)", + StandardCharsets.UTF_8))); } @Test @@ -138,7 +157,7 @@ public class UpdateIndicesHookTest { _mockTimeseriesAspectService, _mockSystemMetadataService, mockEntityRegistry, - _mockSearchDocumentTransformer + _searchDocumentTransformer ); _updateIndicesHook.invoke(event); @@ -154,15 +173,194 @@ public class UpdateIndicesHookTest { ); } + @Test + public void testMCLProcessExhaustive() throws URISyntaxException { + + _updateIndicesHook.setGraphDiffMode(true); + _updateIndicesHook.setSearchDiffMode(true); + /* + * newLineage + */ + MetadataChangeLog changeLog = createBaseChangeLog(); + + _updateIndicesHook.invoke(changeLog); + + // One new edge added + Mockito.verify(_mockGraphService, Mockito.times(1)).addEdge(Mockito.any()); + // Update document + Mockito.verify(_mockEntitySearchService, Mockito.times(1)) + .upsertDocument(Mockito.eq(DATASET_ENTITY_NAME), Mockito.any(), + Mockito.eq(URLEncoder.encode(TEST_DATASET_URN, StandardCharsets.UTF_8))); + + /* + * restateNewLineage + */ + Mockito.clearInvocations(_mockGraphService, _mockEntitySearchService); + changeLog = setSystemMetadata(changeLog); + changeLog = setPreviousData(setToRestate(changeLog), changeLog); + + _updateIndicesHook.invoke(changeLog); + + // No edges added + Mockito.verify(_mockGraphService, Mockito.times(0)).addEdge(Mockito.any()); + // Timestamp updated + Mockito.verify(_mockGraphService, Mockito.times(1)).upsertEdge(Mockito.any()); + // No document change + Mockito.verify(_mockEntitySearchService, Mockito.times(0)) + .upsertDocument(Mockito.any(), Mockito.any(), Mockito.any()); + + /* + * addLineage + */ + Mockito.clearInvocations(_mockGraphService, _mockEntitySearchService); + changeLog = setPreviousData(setToUpsert(changeLog), changeLog); + UpstreamLineage currentLineage = addLineageEdge(createBaseLineageAspect()); + changeLog = modifyAspect(changeLog, currentLineage); + + _updateIndicesHook.invoke(changeLog); + + // New edge added + Mockito.verify(_mockGraphService, Mockito.times(1)).addEdge(Mockito.any()); + // Update timestamp of old edge + Mockito.verify(_mockGraphService, Mockito.times(1)).upsertEdge(Mockito.any()); + // Document update for new upstream + Mockito.verify(_mockEntitySearchService, Mockito.times(1)) + .upsertDocument(Mockito.any(), Mockito.any(), Mockito.any()); + + /* + * restateAddLineage + */ + Mockito.clearInvocations(_mockGraphService, _mockEntitySearchService); + changeLog = setPreviousData(setToRestate(changeLog), changeLog); + + _updateIndicesHook.invoke(changeLog); + + // No new edges + Mockito.verify(_mockGraphService, Mockito.times(0)).addEdge(Mockito.any()); + // Update timestamps of old edges + Mockito.verify(_mockGraphService, Mockito.times(2)).upsertEdge(Mockito.any()); + // No document update + Mockito.verify(_mockEntitySearchService, Mockito.times(0)) + .upsertDocument(Mockito.any(), Mockito.any(), Mockito.any()); + + + /* + * noOpUpsert + */ + Mockito.clearInvocations(_mockGraphService, _mockEntitySearchService); + changeLog = setPreviousData(setToUpsert(changeLog), changeLog); + + _updateIndicesHook.invoke(changeLog); + + // No new edges + Mockito.verify(_mockGraphService, Mockito.times(0)).addEdge(Mockito.any()); + // Update timestamps of old edges + Mockito.verify(_mockGraphService, Mockito.times(2)).upsertEdge(Mockito.any()); + // No document update + Mockito.verify(_mockEntitySearchService, Mockito.times(0)) + .upsertDocument(Mockito.any(), Mockito.any(), Mockito.any()); + + /* + * restateNoOp + */ + Mockito.clearInvocations(_mockGraphService, _mockEntitySearchService); + changeLog = setPreviousData(setToRestate(changeLog), changeLog); + + _updateIndicesHook.invoke(changeLog); + + // No new edges + Mockito.verify(_mockGraphService, Mockito.times(0)).addEdge(Mockito.any()); + // Update timestamps of old edges + Mockito.verify(_mockGraphService, Mockito.times(2)).upsertEdge(Mockito.any()); + // No document update + Mockito.verify(_mockEntitySearchService, Mockito.times(0)) + .upsertDocument(Mockito.any(), Mockito.any(), Mockito.any()); + + /* + * systemMetadataChange + */ + Mockito.clearInvocations(_mockGraphService, _mockEntitySearchService); + changeLog = setPreviousData(setToUpsert(changeLog), changeLog); + changeLog = modifySystemMetadata(changeLog); + + _updateIndicesHook.invoke(changeLog); + + // No new edges + Mockito.verify(_mockGraphService, Mockito.times(0)).addEdge(Mockito.any()); + // Update timestamps of old edges + Mockito.verify(_mockGraphService, Mockito.times(2)).upsertEdge(Mockito.any()); + // No document update + Mockito.verify(_mockEntitySearchService, Mockito.times(0)) + .upsertDocument(Mockito.any(), Mockito.any(), Mockito.any()); + + /* + * restateSystemMetadataChange + */ + Mockito.clearInvocations(_mockGraphService, _mockEntitySearchService); + changeLog = setPreviousData(setToRestate(changeLog), changeLog); + changeLog = modifySystemMetadata2(changeLog); + + _updateIndicesHook.invoke(changeLog); + + // No new edges + Mockito.verify(_mockGraphService, Mockito.times(0)).addEdge(Mockito.any()); + // Update timestamps of old edges + Mockito.verify(_mockGraphService, Mockito.times(2)).upsertEdge(Mockito.any()); + // No document update + Mockito.verify(_mockEntitySearchService, Mockito.times(0)) + .upsertDocument(Mockito.any(), Mockito.any(), Mockito.any()); + + /* + * modifyNonSearchableField + */ + Mockito.clearInvocations(_mockGraphService, _mockEntitySearchService); + changeLog = setPreviousData(setToUpsert(changeLog), changeLog); + currentLineage = modifyNonSearchableField(currentLineage); + changeLog = modifyAspect(changeLog, currentLineage); + + _updateIndicesHook.invoke(changeLog); + + // No new edges + Mockito.verify(_mockGraphService, Mockito.times(0)).addEdge(Mockito.any()); + // Update timestamps of old edges + Mockito.verify(_mockGraphService, Mockito.times(2)).upsertEdge(Mockito.any()); + // No document update + Mockito.verify(_mockEntitySearchService, Mockito.times(0)) + .upsertDocument(Mockito.any(), Mockito.any(), Mockito.any()); + + /* + * force reindexing + */ + Mockito.clearInvocations(_mockGraphService, _mockEntitySearchService); + changeLog = setPreviousDataToEmpty(setToRestate(changeLog)); + changeLog = setSystemMetadataWithForceIndexing(changeLog); + + _updateIndicesHook.invoke(changeLog); + + // Forced removal of all edges + Mockito.verify(_mockGraphService, Mockito.times(1)).removeEdgesFromNode(Mockito.any(), + Mockito.any(), Mockito.any()); + // Forced add of edges + Mockito.verify(_mockGraphService, Mockito.times(2)).addEdge(Mockito.any()); + // Forced document update + Mockito.verify(_mockEntitySearchService, Mockito.times(1)) + .upsertDocument(Mockito.any(), Mockito.any(), Mockito.any()); + } + private EntityRegistry createMockEntityRegistry() { // need to mock this registry instead of using test-entity-registry.yml because inputFields does not work due to a known bug EntityRegistry mockEntityRegistry = Mockito.mock(EntityRegistry.class); EntitySpec entitySpec = Mockito.mock(EntitySpec.class); AspectSpec aspectSpec = createMockAspectSpec(InputFields.class, InputFields.dataSchema()); + AspectSpec upstreamLineageAspectSpec = createMockAspectSpec(UpstreamLineage.class, UpstreamLineage.dataSchema()); Mockito.when(mockEntityRegistry.getEntitySpec(Constants.CHART_ENTITY_NAME)).thenReturn(entitySpec); + Mockito.when(mockEntityRegistry.getEntitySpec(Constants.DATASET_ENTITY_NAME)).thenReturn(entitySpec); Mockito.when(entitySpec.getAspectSpec(Constants.INPUT_FIELDS_ASPECT_NAME)).thenReturn(aspectSpec); + Mockito.when(entitySpec.getAspectSpec(Constants.UPSTREAM_LINEAGE_ASPECT_NAME)).thenReturn(upstreamLineageAspectSpec); Mockito.when(aspectSpec.isTimeseries()).thenReturn(false); Mockito.when(aspectSpec.getName()).thenReturn(Constants.INPUT_FIELDS_ASPECT_NAME); + Mockito.when(upstreamLineageAspectSpec.isTimeseries()).thenReturn(false); + Mockito.when(upstreamLineageAspectSpec.getName()).thenReturn(Constants.UPSTREAM_LINEAGE_ASPECT_NAME); AspectSpec chartKeyAspectSpec = createMockAspectSpec(ChartKey.class, ChartKey.dataSchema()); Mockito.when(entitySpec.getKeyAspectSpec()).thenReturn(chartKeyAspectSpec); return mockEntityRegistry; diff --git a/metadata-service/factories/src/main/resources/application.yml b/metadata-service/factories/src/main/resources/application.yml index 5fed04af05..90f3fafaf5 100644 --- a/metadata-service/factories/src/main/resources/application.yml +++ b/metadata-service/factories/src/main/resources/application.yml @@ -264,6 +264,7 @@ featureFlags: graphServiceDiffModeEnabled: ${GRAPH_SERVICE_DIFF_MODE_ENABLED:true} # Enables diff mode for graph writes, uses a different code path that produces a diff from previous to next to write relationships instead of wholesale deleting edges and reading pointInTimeCreationEnabled: ${POINT_IN_TIME_CREATION_ENABLED:false} # Enables creation of point in time snapshots for the scroll API, only works with main line ElasticSearch releases after 7.10. OpenSearch is unsupported, plans to eventually target OpenSearch 2.4+ with a divergent client alwaysEmitChangeLog: ${ALWAYS_EMIT_CHANGE_LOG:false} # Enables always emitting a MCL even when no changes are detected. Used for Time Based Lineage when no changes occur. + searchServiceDiffModeEnabled: ${SEARCH_SERVICE_DIFF_MODE_ENABLED:true} # Enables diff mode for search document writes, reduces amount of writes to ElasticSearch documents for no-ops entityChangeEvents: enabled: ${ENABLE_ENTITY_CHANGE_EVENTS_HOOK:true}