fix(platform): Ensure time based lineage handles noop changes (#7657)

Co-authored-by: aditya-radhakrishnan <aditya@acryl.io>
Co-authored-by: RyanHolstien <RyanHolstien@users.noreply.github.com>
This commit is contained in:
Shirshanka Das 2023-03-21 08:43:56 -07:00 committed by GitHub
parent 7f5744200b
commit fc9ad49821
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 626 additions and 42 deletions

View File

@ -31,4 +31,8 @@ public class RelationshipFieldSpec implements FieldSpec {
public List<String> getValidDestinationTypes() {
return relationshipAnnotation.getValidDestinationTypes();
}
public boolean isLineageRelationship() {
return relationshipAnnotation.isLineage();
}
}

View File

@ -16,7 +16,12 @@ public class Constants {
public static final Long ASPECT_LATEST_VERSION = 0L;
public static final String UNKNOWN_DATA_PLATFORM = "urn:li:dataPlatform:unknown";
/**
* System Metadata
*/
public static final String DEFAULT_RUN_ID = "no-run-id-provided";
// Forces indexing for no-ops, enabled for restore indices calls. Only considered in the no-op case
public static final String FORCE_INDEXING_KEY = "forceIndexing";
/**
* Entities

View File

@ -27,6 +27,7 @@ import com.linkedin.data.schema.validator.Validator;
import com.linkedin.data.template.DataTemplateUtil;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.StringArray;
import com.linkedin.data.template.StringMap;
import com.linkedin.data.template.UnionTemplate;
import com.linkedin.dataplatform.DataPlatformInfo;
import com.linkedin.entity.AspectType;
@ -49,6 +50,7 @@ import com.linkedin.metadata.entity.validation.ValidationUtils;
import com.linkedin.metadata.event.EventProducer;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.models.RelationshipFieldSpec;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.models.registry.template.AspectTemplateEngine;
import com.linkedin.metadata.query.ListUrnsResult;
@ -781,7 +783,7 @@ public class EntityService {
// Produce MCL after a successful update
boolean isNoOp = oldValue == updatedValue;
if (!isNoOp || _alwaysEmitChangeLog) {
if (!isNoOp || _alwaysEmitChangeLog || shouldAspectEmitChangeLog(urn, aspectName)) {
log.debug(String.format("Producing MetadataChangeLog for ingested aspect %s, urn %s", aspectName, urn));
String entityName = urnToEntityName(urn);
EntitySpec entitySpec = getEntityRegistry().getEntitySpec(entityName);
@ -1028,32 +1030,39 @@ public class EntityService {
RecordTemplate newAspect, SystemMetadata newSystemMetadata,
MetadataChangeProposal mcp, Urn entityUrn,
AuditStamp auditStamp, AspectSpec aspectSpec) {
log.debug("Producing MetadataChangeLog for ingested aspect {}, urn {}", mcp.getAspectName(), entityUrn);
boolean isNoOp = oldAspect == newAspect;
if (!isNoOp || _alwaysEmitChangeLog || shouldAspectEmitChangeLog(aspectSpec)) {
log.debug("Producing MetadataChangeLog for ingested aspect {}, urn {}", mcp.getAspectName(), entityUrn);
final MetadataChangeLog metadataChangeLog = new MetadataChangeLog(mcp.data());
metadataChangeLog.setEntityUrn(entityUrn);
metadataChangeLog.setCreated(auditStamp);
// The change log produced by this method is always an upsert as it contains the entire RecordTemplate update
metadataChangeLog.setChangeType(ChangeType.UPSERT);
final MetadataChangeLog metadataChangeLog = new MetadataChangeLog(mcp.data());
metadataChangeLog.setEntityUrn(entityUrn);
metadataChangeLog.setCreated(auditStamp);
metadataChangeLog.setChangeType(isNoOp ? ChangeType.RESTATE : ChangeType.UPSERT);
if (oldAspect != null) {
metadataChangeLog.setPreviousAspectValue(GenericRecordUtils.serializeAspect(oldAspect));
if (oldAspect != null) {
metadataChangeLog.setPreviousAspectValue(GenericRecordUtils.serializeAspect(oldAspect));
}
if (oldSystemMetadata != null) {
metadataChangeLog.setPreviousSystemMetadata(oldSystemMetadata);
}
if (newAspect != null) {
metadataChangeLog.setAspect(GenericRecordUtils.serializeAspect(newAspect));
}
if (newSystemMetadata != null) {
metadataChangeLog.setSystemMetadata(newSystemMetadata);
}
log.debug("Serialized MCL event: {}", metadataChangeLog);
produceMetadataChangeLog(entityUrn, aspectSpec, metadataChangeLog);
return true;
} else {
log.debug(
"Skipped producing MetadataChangeLog for ingested aspect {}, urn {}. Aspect has not changed.",
mcp.getAspectName(), entityUrn);
return false;
}
if (oldSystemMetadata != null) {
metadataChangeLog.setPreviousSystemMetadata(oldSystemMetadata);
}
if (newAspect != null) {
metadataChangeLog.setAspect(GenericRecordUtils.serializeAspect(newAspect));
}
if (newSystemMetadata != null) {
metadataChangeLog.setSystemMetadata(newSystemMetadata);
}
log.debug("Serialized MCL event: {}", metadataChangeLog);
produceMetadataChangeLog(entityUrn, aspectSpec, metadataChangeLog);
return true;
}
public Integer getCountAspect(@Nonnull String aspectName, @Nullable String urnLike) {
@ -1130,7 +1139,12 @@ public class EntityService {
result.createRecordMs += System.currentTimeMillis() - startTime;
startTime = System.currentTimeMillis();
// Force indexing to skip diff mode and fix error states
SystemMetadata latestSystemMetadata = EntityUtils.parseSystemMetadata(aspect.getSystemMetadata());
StringMap properties = latestSystemMetadata.getProperties() != null ? latestSystemMetadata.getProperties()
: new StringMap();
properties.put(FORCE_INDEXING_KEY, Boolean.TRUE.toString());
latestSystemMetadata.setProperties(properties);
// 5. Produce MAE events for the aspect record
produceMetadataChangeLog(urn, entityName, aspectName, aspectSpec, null, aspectRecord, null,
@ -2050,12 +2064,26 @@ public class EntityService {
urn,
ImmutableSet.of(Constants.DATA_PLATFORM_INFO_ASPECT_NAME)
);
if (entityResponse != null && entityResponse.hasAspects() && entityResponse.getAspects().containsKey(Constants.DATA_PLATFORM_INFO_ASPECT_NAME)) {
return new DataPlatformInfo(entityResponse.getAspects().get(Constants.DATA_PLATFORM_INFO_ASPECT_NAME).getValue().data());
if (entityResponse != null && entityResponse.hasAspects() && entityResponse.getAspects()
.containsKey(Constants.DATA_PLATFORM_INFO_ASPECT_NAME)) {
return new DataPlatformInfo(
entityResponse.getAspects().get(Constants.DATA_PLATFORM_INFO_ASPECT_NAME).getValue().data());
}
} catch (Exception e) {
log.warn(String.format("Failed to find Data Platform Info for urn %s", urn));
}
return null;
}
private boolean shouldAspectEmitChangeLog(@Nonnull final Urn urn, @Nonnull final String aspectName) {
final String entityName = urnToEntityName(urn);
final EntitySpec entitySpec = getEntityRegistry().getEntitySpec(entityName);
final AspectSpec aspectSpec = entitySpec.getAspectSpec(aspectName);
return shouldAspectEmitChangeLog(aspectSpec);
}
private boolean shouldAspectEmitChangeLog(@Nonnull final AspectSpec aspectSpec) {
final List<RelationshipFieldSpec> relationshipFieldSpecs = aspectSpec.getRelationshipFieldSpecs();
return relationshipFieldSpecs.stream().anyMatch(RelationshipFieldSpec::isLineageRelationship);
}
}

View File

@ -6,11 +6,15 @@ import com.linkedin.common.ChangeAuditStamps;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.dataset.Upstream;
import com.linkedin.dataset.UpstreamArray;
import com.linkedin.dataset.UpstreamLineage;
import com.linkedin.identity.CorpUserInfo;
import com.linkedin.metadata.key.CorpUserKey;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.metadata.utils.PegasusUtils;
import com.linkedin.mxe.SystemMetadata;
import java.util.Collections;
import javax.annotation.Nonnull;
@ -62,6 +66,14 @@ public class AspectGenerationUtils {
return chartInfo;
}
@Nonnull
public static UpstreamLineage createUpstreamLineage() {
final UpstreamLineage upstreamLineage = new UpstreamLineage();
final UpstreamArray upstreams = new UpstreamArray();
upstreamLineage.setUpstreams(upstreams);
return upstreamLineage;
}
@Nonnull
public static String getAspectName(RecordTemplate record) {
return PegasusUtils.getAspectNameFromSchema(record.schema());

View File

@ -18,6 +18,7 @@ import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.StringMap;
import com.linkedin.dataset.DatasetProfile;
import com.linkedin.dataset.DatasetProperties;
import com.linkedin.dataset.UpstreamLineage;
import com.linkedin.entity.Entity;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.EnvelopedAspect;
@ -28,6 +29,7 @@ import com.linkedin.metadata.aspect.Aspect;
import com.linkedin.metadata.aspect.CorpUserAspect;
import com.linkedin.metadata.aspect.CorpUserAspectArray;
import com.linkedin.metadata.aspect.VersionedAspect;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.metadata.event.EventProducer;
import com.linkedin.metadata.key.CorpUserKey;
import com.linkedin.metadata.models.AspectSpec;
@ -487,6 +489,133 @@ abstract public class EntityServiceTest<T_AD extends AspectDao, T_RS extends Ret
verifyNoMoreInteractions(_mockProducer);
}
@Test
public void testReingestLineageAspect() throws Exception {
Urn entityUrn = UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:looker,sample_dataset,PROD)");
List<Pair<String, RecordTemplate>> pairToIngest = new ArrayList<>();
final UpstreamLineage upstreamLineage = AspectGenerationUtils.createUpstreamLineage();
String aspectName1 = AspectGenerationUtils.getAspectName(upstreamLineage);
pairToIngest.add(getAspectRecordPair(upstreamLineage, UpstreamLineage.class));
SystemMetadata metadata1 = AspectGenerationUtils.createSystemMetadata();
_entityService.ingestAspects(entityUrn, pairToIngest, TEST_AUDIT_STAMP, metadata1);
final MetadataChangeLog initialChangeLog = new MetadataChangeLog();
initialChangeLog.setEntityType(entityUrn.getEntityType());
initialChangeLog.setEntityUrn(entityUrn);
initialChangeLog.setChangeType(ChangeType.UPSERT);
initialChangeLog.setAspectName(aspectName1);
initialChangeLog.setCreated(TEST_AUDIT_STAMP);
GenericAspect aspect = GenericRecordUtils.serializeAspect(pairToIngest.get(0).getSecond());
initialChangeLog.setAspect(aspect);
initialChangeLog.setSystemMetadata(metadata1);
final MetadataChangeLog restateChangeLog = new MetadataChangeLog();
restateChangeLog.setEntityType(entityUrn.getEntityType());
restateChangeLog.setEntityUrn(entityUrn);
restateChangeLog.setChangeType(ChangeType.RESTATE);
restateChangeLog.setAspectName(aspectName1);
restateChangeLog.setCreated(TEST_AUDIT_STAMP);
restateChangeLog.setAspect(aspect);
restateChangeLog.setSystemMetadata(metadata1);
restateChangeLog.setPreviousAspectValue(aspect);
restateChangeLog.setPreviousSystemMetadata(simulatePullFromDB(metadata1, SystemMetadata.class));
Map<String, RecordTemplate> latestAspects = _entityService.getLatestAspectsForUrn(
entityUrn,
new HashSet<>(List.of(aspectName1))
);
assertTrue(DataTemplateUtil.areEqual(upstreamLineage, latestAspects.get(aspectName1)));
verify(_mockProducer, times(1)).produceMetadataChangeLog(Mockito.eq(entityUrn),
Mockito.any(), Mockito.eq(initialChangeLog));
verify(_mockProducer, times(1)).produceMetadataAuditEvent(Mockito.eq(entityUrn),
Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
// Mockito detects the previous invocation and throws an error in verifying the second call unless invocations are cleared
clearInvocations(_mockProducer);
_entityService.ingestAspects(entityUrn, pairToIngest, TEST_AUDIT_STAMP, metadata1);
verify(_mockProducer, times(1)).produceMetadataChangeLog(Mockito.eq(entityUrn),
Mockito.any(), Mockito.eq(restateChangeLog));
verify(_mockProducer, times(1)).produceMetadataAuditEvent(Mockito.eq(entityUrn),
Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
verifyNoMoreInteractions(_mockProducer);
}
@Test
public void testReingestLineageProposal() throws Exception {
Urn entityUrn = UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:looker,sample_dataset,PROD)");
List<Pair<String, RecordTemplate>> pairToIngest = new ArrayList<>();
final UpstreamLineage upstreamLineage = AspectGenerationUtils.createUpstreamLineage();
String aspectName1 = AspectGenerationUtils.getAspectName(upstreamLineage);
SystemMetadata metadata1 = AspectGenerationUtils.createSystemMetadata();
MetadataChangeProposal mcp1 = new MetadataChangeProposal();
mcp1.setEntityType(entityUrn.getEntityType());
GenericAspect genericAspect = GenericRecordUtils.serializeAspect(upstreamLineage);
mcp1.setAspect(genericAspect);
mcp1.setEntityUrn(entityUrn);
mcp1.setChangeType(ChangeType.UPSERT);
mcp1.setSystemMetadata(metadata1);
mcp1.setAspectName(UPSTREAM_LINEAGE_ASPECT_NAME);
_entityService.ingestProposal(mcp1, TEST_AUDIT_STAMP, false);
final MetadataChangeLog initialChangeLog = new MetadataChangeLog();
initialChangeLog.setEntityType(entityUrn.getEntityType());
initialChangeLog.setEntityUrn(entityUrn);
initialChangeLog.setChangeType(ChangeType.UPSERT);
initialChangeLog.setAspectName(aspectName1);
initialChangeLog.setCreated(TEST_AUDIT_STAMP);
initialChangeLog.setAspect(genericAspect);
initialChangeLog.setSystemMetadata(metadata1);
final MetadataChangeLog restateChangeLog = new MetadataChangeLog();
restateChangeLog.setEntityType(entityUrn.getEntityType());
restateChangeLog.setEntityUrn(entityUrn);
restateChangeLog.setChangeType(ChangeType.RESTATE);
restateChangeLog.setAspectName(aspectName1);
restateChangeLog.setCreated(TEST_AUDIT_STAMP);
restateChangeLog.setAspect(genericAspect);
restateChangeLog.setSystemMetadata(metadata1);
restateChangeLog.setPreviousAspectValue(genericAspect);
restateChangeLog.setPreviousSystemMetadata(simulatePullFromDB(metadata1, SystemMetadata.class));
Map<String, RecordTemplate> latestAspects = _entityService.getLatestAspectsForUrn(
entityUrn,
new HashSet<>(List.of(aspectName1))
);
assertTrue(DataTemplateUtil.areEqual(upstreamLineage, latestAspects.get(aspectName1)));
verify(_mockProducer, times(1)).produceMetadataChangeLog(Mockito.eq(entityUrn),
Mockito.any(), Mockito.eq(initialChangeLog));
// Mockito detects the previous invocation and throws an error in verifying the second call unless invocations are cleared
clearInvocations(_mockProducer);
_entityService.ingestProposal(mcp1, TEST_AUDIT_STAMP, false);
verify(_mockProducer, times(1)).produceMetadataChangeLog(Mockito.eq(entityUrn),
Mockito.any(), Mockito.eq(restateChangeLog));
verifyNoMoreInteractions(_mockProducer);
}
@Test
public void testIngestTimeseriesAspect() throws Exception {
Urn entityUrn = UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:foo,bar,PROD)");
@ -1022,6 +1151,42 @@ abstract public class EntityServiceTest<T_AD extends AspectDao, T_RS extends Ret
return RecordUtils.toRecordTemplate(clazz, objectMapper.writeValueAsString(aspect));
}
@Test
public void testRestoreIndices() throws Exception {
String urnStr = "urn:li:dataset:(urn:li:dataPlatform:looker,sample_dataset_unique,PROD)";
Urn entityUrn = UrnUtils.getUrn(urnStr);
List<Pair<String, RecordTemplate>> pairToIngest = new ArrayList<>();
final UpstreamLineage upstreamLineage = AspectGenerationUtils.createUpstreamLineage();
String aspectName1 = AspectGenerationUtils.getAspectName(upstreamLineage);
pairToIngest.add(getAspectRecordPair(upstreamLineage, UpstreamLineage.class));
SystemMetadata metadata1 = AspectGenerationUtils.createSystemMetadata();
_entityService.ingestAspects(entityUrn, pairToIngest, TEST_AUDIT_STAMP, metadata1);
clearInvocations(_mockProducer);
RestoreIndicesArgs args = new RestoreIndicesArgs();
args.setAspectName(UPSTREAM_LINEAGE_ASPECT_NAME);
args.setBatchSize(1);
args.setStart(0);
args.setBatchDelayMs(1L);
args.setNumThreads(1);
args.setUrn(urnStr);
_entityService.restoreIndices(args, obj -> { });
ArgumentCaptor<MetadataChangeLog> mclCaptor = ArgumentCaptor.forClass(MetadataChangeLog.class);
verify(_mockProducer, times(1)).produceMetadataChangeLog(
Mockito.eq(entityUrn), Mockito.any(), mclCaptor.capture());
MetadataChangeLog mcl = mclCaptor.getValue();
assertEquals(mcl.getEntityType(), "dataset");
assertNull(mcl.getPreviousAspectValue());
assertNull(mcl.getPreviousSystemMetadata());
assertEquals(mcl.getChangeType(), ChangeType.RESTATE);
assertEquals(mcl.getSystemMetadata().getProperties().get(FORCE_INDEXING_KEY), "true");
}
@Nonnull
protected com.linkedin.entity.Entity createCorpUserEntity(Urn entityUrn, String email) throws Exception {
CorpuserUrn corpuserUrn = CorpuserUrn.createFromUrn(entityUrn);

View File

@ -2,6 +2,7 @@ package com.linkedin.metadata.kafka.hook;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.InputField;
import com.linkedin.common.InputFields;
@ -21,6 +22,7 @@ import com.linkedin.gms.factory.timeseries.TimeseriesAspectServiceFactory;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.graph.Edge;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService;
import com.linkedin.metadata.key.SchemaFieldKey;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
@ -61,6 +63,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Import;
import org.springframework.stereotype.Component;
import static com.linkedin.metadata.Constants.*;
import static com.linkedin.metadata.search.utils.QueryUtils.*;
// TODO: Backfill tests for this class in UpdateIndicesHookTest.java
@ -83,8 +86,20 @@ public class UpdateIndicesHook implements MetadataChangeLogHook {
private final EntityRegistry _entityRegistry;
private final SearchDocumentTransformer _searchDocumentTransformer;
@Value("${featureFlags.graphServiceDiffModeEnabled:false}")
private boolean _diffMode;
@Value("${featureFlags.graphServiceDiffModeEnabled:true}")
private boolean _graphDiffMode;
@Value("${featureFlags.searchServiceDiffModeEnabled:true}")
private boolean _searchDiffMode;
@VisibleForTesting
void setGraphDiffMode(boolean graphDiffMode) {
_graphDiffMode = graphDiffMode;
}
@VisibleForTesting
void setSearchDiffMode(boolean searchDiffMode) {
_searchDiffMode = searchDiffMode;
}
@Autowired
public UpdateIndicesHook(
@ -160,11 +175,13 @@ public class UpdateIndicesHook implements MetadataChangeLogHook {
}
// Step 1. For all aspects, attempt to update Search
updateSearchService(entitySpec.getName(), urn, aspectSpec, aspect,
event.hasSystemMetadata() ? event.getSystemMetadata().getRunId() : null);
updateSearchService(entitySpec.getName(), urn, aspectSpec, aspect, event.getSystemMetadata(), previousAspect);
// Step 2. For all aspects, attempt to update Graph
if (_diffMode) {
SystemMetadata systemMetadata = event.getSystemMetadata();
if (_graphDiffMode && _graphService instanceof ElasticSearchGraphService
&& (systemMetadata == null || systemMetadata.getProperties() == null
|| !Boolean.parseBoolean(systemMetadata.getProperties().get(FORCE_INDEXING_KEY)))) {
updateGraphServiceDiff(urn, aspectSpec, previousAspect, aspect, event);
} else {
updateGraphService(urn, aspectSpec, aspect, event);
@ -402,8 +419,9 @@ public class UpdateIndicesHook implements MetadataChangeLogHook {
* Process snapshot and update search index
*/
private void updateSearchService(String entityName, Urn urn, AspectSpec aspectSpec, RecordTemplate aspect,
@Nullable String runId) {
@Nullable SystemMetadata systemMetadata, @Nullable RecordTemplate previousAspect) {
Optional<String> searchDocument;
Optional<String> previousSearchDocument = Optional.empty();
try {
searchDocument = _searchDocumentTransformer.transformAspect(urn, aspect, aspectSpec, false);
} catch (Exception e) {
@ -421,6 +439,28 @@ public class UpdateIndicesHook implements MetadataChangeLogHook {
return;
}
String searchDocumentValue = searchDocument.get();
if (_searchDiffMode && (systemMetadata == null || systemMetadata.getProperties() == null
|| !Boolean.parseBoolean(systemMetadata.getProperties().get(FORCE_INDEXING_KEY)))) {
if (previousAspect != null) {
try {
previousSearchDocument = _searchDocumentTransformer.transformAspect(urn, previousAspect, aspectSpec, false);
} catch (Exception e) {
log.error(
"Error in getting documents from previous aspect state: {} for aspect {}, continuing without diffing.", e,
aspectSpec.getName());
}
}
if (previousSearchDocument.isPresent()) {
String previousSearchDocumentValue = previousSearchDocument.get();
if (searchDocumentValue.equals(previousSearchDocumentValue)) {
// No changes to search document, skip writing no-op update
return;
}
}
}
_entitySearchService.upsertDocument(entityName, searchDocument.get(), docId.get());
}

View File

@ -0,0 +1,131 @@
package com.linkedin.metadata.kafka.hook;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.StringMap;
import com.linkedin.dataset.DatasetLineageType;
import com.linkedin.dataset.Upstream;
import com.linkedin.dataset.UpstreamArray;
import com.linkedin.dataset.UpstreamLineage;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.MetadataChangeLog;
import com.linkedin.mxe.SystemMetadata;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import static com.linkedin.metadata.Constants.*;
import static com.linkedin.metadata.kafka.hook.UpdateIndicesHookTest.*;
public class MCLProcessingTestDataGenerator {
private MCLProcessingTestDataGenerator() {
}
public static MetadataChangeLog createBaseChangeLog() throws URISyntaxException {
MetadataChangeLog event = new MetadataChangeLog();
event.setEntityType(Constants.DATASET_ENTITY_NAME);
event.setAspectName(Constants.UPSTREAM_LINEAGE_ASPECT_NAME);
event.setChangeType(ChangeType.UPSERT);
UpstreamLineage upstreamLineage = createBaseLineageAspect();
event.setAspect(GenericRecordUtils.serializeAspect(upstreamLineage));
event.setEntityUrn(Urn.createFromString(TEST_DATASET_URN));
event.setEntityType(DATASET_ENTITY_NAME);
event.setCreated(new AuditStamp().setActor(UrnUtils.getUrn(TEST_ACTOR_URN)).setTime(EVENT_TIME));
return event;
}
public static MetadataChangeLog setToRestate(MetadataChangeLog changeLog) {
return changeLog.setChangeType(ChangeType.RESTATE);
}
public static MetadataChangeLog setToUpsert(MetadataChangeLog changeLog) {
return changeLog.setChangeType(ChangeType.UPSERT);
}
public static MetadataChangeLog setSystemMetadata(MetadataChangeLog changeLog) {
SystemMetadata systemMetadata = new SystemMetadata();
systemMetadata.setRunId(RUN_ID_1);
systemMetadata.setLastObserved(LAST_OBSERVED_1);
return changeLog.setSystemMetadata(systemMetadata);
}
public static MetadataChangeLog setSystemMetadataWithForceIndexing(MetadataChangeLog changeLog) {
SystemMetadata systemMetadata = new SystemMetadata();
systemMetadata.setRunId(RUN_ID_1);
systemMetadata.setLastObserved(LAST_OBSERVED_1);
StringMap stringMap = new StringMap();
stringMap.put(FORCE_INDEXING_KEY, Boolean.TRUE.toString());
systemMetadata.setProperties(stringMap);
return changeLog.setSystemMetadata(systemMetadata);
}
public static MetadataChangeLog setPreviousData(MetadataChangeLog changeLog, MetadataChangeLog previousState) {
changeLog.setPreviousAspectValue(previousState.getAspect());
return changeLog.setPreviousSystemMetadata(previousState.getSystemMetadata());
}
public static MetadataChangeLog setPreviousDataToEmpty(MetadataChangeLog changeLog) {
changeLog.removePreviousAspectValue();
changeLog.removePreviousSystemMetadata();
return changeLog;
}
public static MetadataChangeLog modifySystemMetadata(MetadataChangeLog changeLog) {
SystemMetadata systemMetadata = new SystemMetadata();
systemMetadata.setRunId(RUN_ID_2);
systemMetadata.setLastObserved(LAST_OBSERVED_2);
return changeLog.setSystemMetadata(systemMetadata);
}
public static MetadataChangeLog modifySystemMetadata2(MetadataChangeLog changeLog) {
SystemMetadata systemMetadata = new SystemMetadata();
systemMetadata.setRunId(RUN_ID_3);
systemMetadata.setLastObserved(LAST_OBSERVED_3);
return changeLog.setSystemMetadata(systemMetadata);
}
public static MetadataChangeLog modifyAspect(MetadataChangeLog changeLog, UpstreamLineage upstreamLineage) {
return changeLog.setAspect(GenericRecordUtils.serializeAspect(upstreamLineage));
}
public static UpstreamLineage createBaseLineageAspect() throws URISyntaxException {
UpstreamLineage upstreamLineage = new UpstreamLineage();
final UpstreamArray upstreamArray = new UpstreamArray();
final Upstream upstream = new Upstream();
upstream.setType(DatasetLineageType.TRANSFORMED);
upstream.setDataset(DatasetUrn.createFromString(TEST_DATASET_URN_2));
upstreamArray.add(upstream);
upstreamLineage.setUpstreams(upstreamArray);
return upstreamLineage;
}
public static UpstreamLineage addLineageEdge(UpstreamLineage upstreamLineage) throws URISyntaxException {
UpstreamArray upstreamArray = upstreamLineage.getUpstreams();
Upstream upstream = new Upstream();
upstream.setType(DatasetLineageType.TRANSFORMED);
upstream.setDataset(DatasetUrn.createFromString(TEST_DATASET_URN_3));
upstreamArray.add(upstream);
return upstreamLineage.setUpstreams(upstreamArray);
}
public static UpstreamLineage modifyNonSearchableField(UpstreamLineage upstreamLineage) {
UpstreamArray upstreamArray = upstreamLineage.getUpstreams();
Upstream upstream = upstreamArray.get(0);
Map<String, String> stringMap = new HashMap<>();
stringMap.put("key", "value");
upstream.setProperties(new StringMap(stringMap));
upstreamArray.set(0, upstream);
return upstreamLineage.setUpstreams(upstreamArray);
}
}

View File

@ -24,6 +24,7 @@ import com.linkedin.metadata.config.SystemUpdateConfiguration;
import com.linkedin.metadata.graph.Edge;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.boot.kafka.DataHubUpgradeKafkaListener;
import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService;
import com.linkedin.metadata.key.ChartKey;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
@ -39,6 +40,9 @@ import com.linkedin.metadata.timeseries.TimeseriesAspectService;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.MetadataChangeLog;
import com.linkedin.schema.SchemaField;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import org.mockito.Mockito;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@ -47,23 +51,32 @@ import java.util.ArrayList;
import java.util.Collections;
import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME;
import static com.linkedin.metadata.kafka.hook.MCLProcessingTestDataGenerator.*;
import static com.linkedin.metadata.search.utils.QueryUtils.newRelationshipFilter;
public class UpdateIndicesHookTest {
// going to want a test where we have an upstreamLineage aspect with finegrained, check that we call _graphService.addEdge for each edge
// as well as _graphService.removeEdgesFromNode for each field and their relationships
private static final long EVENT_TIME = 123L;
private static final String TEST_DATASET_URN = "urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD)";
private static final String TEST_CHART_URN = "urn:li:chart:(looker,dashboard_elements.1)";
private static final String TEST_ACTOR_URN = "urn:li:corpuser:test";
private static final String DOWNSTREAM_OF = "DownstreamOf";
static final long EVENT_TIME = 123L;
static final String TEST_DATASET_URN = "urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD)";
static final String TEST_DATASET_URN_2 = "urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleCypressHdfsDataset,PROD)";
static final String TEST_DATASET_URN_3 = "urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleCypressKafkaDataset,PROD)";
static final String TEST_CHART_URN = "urn:li:chart:(looker,dashboard_elements.1)";
static final String TEST_ACTOR_URN = "urn:li:corpuser:test";
static final String DOWNSTREAM_OF = "DownstreamOf";
static final String RUN_ID_1 = "123";
static final String RUN_ID_2 = "456";
static final String RUN_ID_3 = "789";
static final long LAST_OBSERVED_1 = 123L;
static final long LAST_OBSERVED_2 = 456L;
static final long LAST_OBSERVED_3 = 789L;
private UpdateIndicesHook _updateIndicesHook;
private GraphService _mockGraphService;
private EntitySearchService _mockEntitySearchService;
private TimeseriesAspectService _mockTimeseriesAspectService;
private SystemMetadataService _mockSystemMetadataService;
private SearchDocumentTransformer _mockSearchDocumentTransformer;
private SearchDocumentTransformer _searchDocumentTransformer;
private DataHubUpgradeKafkaListener _mockDataHubUpgradeKafkaListener;
private ConfigurationProvider _mockConfigurationProvider;
private Urn _actorUrn;
@ -73,11 +86,11 @@ public class UpdateIndicesHookTest {
_actorUrn = UrnUtils.getUrn(TEST_ACTOR_URN);
EntityRegistry registry = new ConfigEntityRegistry(
UpdateIndicesHookTest.class.getClassLoader().getResourceAsStream("test-entity-registry.yml"));
_mockGraphService = Mockito.mock(GraphService.class);
_mockGraphService = Mockito.mock(ElasticSearchGraphService.class);
_mockEntitySearchService = Mockito.mock(EntitySearchService.class);
_mockTimeseriesAspectService = Mockito.mock(TimeseriesAspectService.class);
_mockSystemMetadataService = Mockito.mock(SystemMetadataService.class);
_mockSearchDocumentTransformer = Mockito.mock(SearchDocumentTransformer.class);
_searchDocumentTransformer = new SearchDocumentTransformer(1000, 1000);
_mockDataHubUpgradeKafkaListener = Mockito.mock(DataHubUpgradeKafkaListener.class);
_mockConfigurationProvider = Mockito.mock(ConfigurationProvider.class);
ElasticSearchConfiguration elasticSearchConfiguration = new ElasticSearchConfiguration();
@ -90,12 +103,13 @@ public class UpdateIndicesHookTest {
_mockTimeseriesAspectService,
_mockSystemMetadataService,
registry,
_mockSearchDocumentTransformer
_searchDocumentTransformer
);
}
@Test
public void testFineGrainedLineageEdgesAreAdded() throws Exception {
_updateIndicesHook.setGraphDiffMode(false);
Urn upstreamUrn = UrnUtils.getUrn("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleCypressHdfsDataset,PROD),foo_info)");
Urn downstreamUrn = UrnUtils.getUrn("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD),field_foo)");
MetadataChangeLog event = createUpstreamLineageMCL(upstreamUrn, downstreamUrn);
@ -112,6 +126,7 @@ public class UpdateIndicesHookTest {
@Test
public void testFineGrainedLineageEdgesAreAddedRestate() throws Exception {
_updateIndicesHook.setGraphDiffMode(false);
Urn upstreamUrn = UrnUtils.getUrn("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleCypressHdfsDataset,PROD),foo_info)");
Urn downstreamUrn = UrnUtils.getUrn("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD),field_foo)");
MetadataChangeLog event = createUpstreamLineageMCL(upstreamUrn, downstreamUrn, ChangeType.RESTATE);
@ -124,6 +139,10 @@ public class UpdateIndicesHookTest {
Mockito.eq(new ArrayList<>(Collections.singleton(DOWNSTREAM_OF))),
Mockito.eq(newRelationshipFilter(new Filter().setOr(new ConjunctiveCriterionArray()), RelationshipDirection.OUTGOING))
);
Mockito.verify(_mockEntitySearchService, Mockito.times(1))
.upsertDocument(Mockito.eq(DATASET_ENTITY_NAME), Mockito.any(),
Mockito.eq(URLEncoder.encode("urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD)",
StandardCharsets.UTF_8)));
}
@Test
@ -138,7 +157,7 @@ public class UpdateIndicesHookTest {
_mockTimeseriesAspectService,
_mockSystemMetadataService,
mockEntityRegistry,
_mockSearchDocumentTransformer
_searchDocumentTransformer
);
_updateIndicesHook.invoke(event);
@ -154,15 +173,194 @@ public class UpdateIndicesHookTest {
);
}
@Test
public void testMCLProcessExhaustive() throws URISyntaxException {
// Exhaustively walks UPSERT/RESTATE change types against varying previous states to
// verify the graph/search diff-mode code paths: genuinely new lineage writes edges and
// documents, no-op changes only refresh edge timestamps, and the forceIndexing system
// metadata flag bypasses the no-op short-circuit entirely.
// NOTE: phases are order-dependent — each phase mutates changeLog (and currentLineage)
// built by the previous one, so statements must not be reordered.
_updateIndicesHook.setGraphDiffMode(true);
_updateIndicesHook.setSearchDiffMode(true);
/*
* newLineage: fresh upstreamLineage aspect with no previous value.
*/
MetadataChangeLog changeLog = createBaseChangeLog();
_updateIndicesHook.invoke(changeLog);
// One new edge added for the single upstream in the base aspect
Mockito.verify(_mockGraphService, Mockito.times(1)).addEdge(Mockito.any());
// Search document written for the dataset (docId is the URL-encoded urn)
Mockito.verify(_mockEntitySearchService, Mockito.times(1))
.upsertDocument(Mockito.eq(DATASET_ENTITY_NAME), Mockito.any(),
Mockito.eq(URLEncoder.encode(TEST_DATASET_URN, StandardCharsets.UTF_8)));
/*
* restateNewLineage: RESTATE with previous aspect identical to current.
*/
Mockito.clearInvocations(_mockGraphService, _mockEntitySearchService);
changeLog = setSystemMetadata(changeLog);
changeLog = setPreviousData(setToRestate(changeLog), changeLog);
_updateIndicesHook.invoke(changeLog);
// No edges added — the edge already exists
Mockito.verify(_mockGraphService, Mockito.times(0)).addEdge(Mockito.any());
// Existing edge's timestamp is refreshed via upsert instead
Mockito.verify(_mockGraphService, Mockito.times(1)).upsertEdge(Mockito.any());
// No document change — search diff mode detects the no-op
Mockito.verify(_mockEntitySearchService, Mockito.times(0))
.upsertDocument(Mockito.any(), Mockito.any(), Mockito.any());
/*
* addLineage: UPSERT that adds a second upstream edge to the aspect.
*/
Mockito.clearInvocations(_mockGraphService, _mockEntitySearchService);
changeLog = setPreviousData(setToUpsert(changeLog), changeLog);
UpstreamLineage currentLineage = addLineageEdge(createBaseLineageAspect());
changeLog = modifyAspect(changeLog, currentLineage);
_updateIndicesHook.invoke(changeLog);
// Only the newly added upstream produces an addEdge
Mockito.verify(_mockGraphService, Mockito.times(1)).addEdge(Mockito.any());
// The pre-existing edge gets a timestamp refresh
Mockito.verify(_mockGraphService, Mockito.times(1)).upsertEdge(Mockito.any());
// Aspect content changed, so the search document is rewritten
Mockito.verify(_mockEntitySearchService, Mockito.times(1))
.upsertDocument(Mockito.any(), Mockito.any(), Mockito.any());
/*
* restateAddLineage: RESTATE of the now two-edge aspect, previous == current.
*/
Mockito.clearInvocations(_mockGraphService, _mockEntitySearchService);
changeLog = setPreviousData(setToRestate(changeLog), changeLog);
_updateIndicesHook.invoke(changeLog);
// No new edges — both already exist
Mockito.verify(_mockGraphService, Mockito.times(0)).addEdge(Mockito.any());
// Both existing edges get timestamp refreshes
Mockito.verify(_mockGraphService, Mockito.times(2)).upsertEdge(Mockito.any());
// No document update for a no-op restate
Mockito.verify(_mockEntitySearchService, Mockito.times(0))
.upsertDocument(Mockito.any(), Mockito.any(), Mockito.any());
/*
* noOpUpsert: UPSERT where previous aspect equals current aspect.
*/
Mockito.clearInvocations(_mockGraphService, _mockEntitySearchService);
changeLog = setPreviousData(setToUpsert(changeLog), changeLog);
_updateIndicesHook.invoke(changeLog);
// No new edges
Mockito.verify(_mockGraphService, Mockito.times(0)).addEdge(Mockito.any());
// Timestamps of both old edges still refreshed (time-based lineage)
Mockito.verify(_mockGraphService, Mockito.times(2)).upsertEdge(Mockito.any());
// No document update — identical content is skipped
Mockito.verify(_mockEntitySearchService, Mockito.times(0))
.upsertDocument(Mockito.any(), Mockito.any(), Mockito.any());
/*
* restateNoOp: same no-op payload, delivered as a RESTATE.
*/
Mockito.clearInvocations(_mockGraphService, _mockEntitySearchService);
changeLog = setPreviousData(setToRestate(changeLog), changeLog);
_updateIndicesHook.invoke(changeLog);
// No new edges
Mockito.verify(_mockGraphService, Mockito.times(0)).addEdge(Mockito.any());
// Edge timestamps refreshed exactly as in the upsert no-op case
Mockito.verify(_mockGraphService, Mockito.times(2)).upsertEdge(Mockito.any());
// No document update
Mockito.verify(_mockEntitySearchService, Mockito.times(0))
.upsertDocument(Mockito.any(), Mockito.any(), Mockito.any());
/*
* systemMetadataChange: UPSERT where only the system metadata differs;
* the aspect payload itself is unchanged.
*/
Mockito.clearInvocations(_mockGraphService, _mockEntitySearchService);
changeLog = setPreviousData(setToUpsert(changeLog), changeLog);
changeLog = modifySystemMetadata(changeLog);
_updateIndicesHook.invoke(changeLog);
// System metadata alone must not create edges
Mockito.verify(_mockGraphService, Mockito.times(0)).addEdge(Mockito.any());
// Edge timestamps still refreshed
Mockito.verify(_mockGraphService, Mockito.times(2)).upsertEdge(Mockito.any());
// ...and must not rewrite the search document
Mockito.verify(_mockEntitySearchService, Mockito.times(0))
.upsertDocument(Mockito.any(), Mockito.any(), Mockito.any());
/*
* restateSystemMetadataChange: RESTATE variant of the metadata-only change.
*/
Mockito.clearInvocations(_mockGraphService, _mockEntitySearchService);
changeLog = setPreviousData(setToRestate(changeLog), changeLog);
changeLog = modifySystemMetadata2(changeLog);
_updateIndicesHook.invoke(changeLog);
// No new edges
Mockito.verify(_mockGraphService, Mockito.times(0)).addEdge(Mockito.any());
// Edge timestamps refreshed
Mockito.verify(_mockGraphService, Mockito.times(2)).upsertEdge(Mockito.any());
// No document update
Mockito.verify(_mockEntitySearchService, Mockito.times(0))
.upsertDocument(Mockito.any(), Mockito.any(), Mockito.any());
/*
* modifyNonSearchableField: aspect changes, but only in a field that is not
* indexed for search — graph is unaffected and the document is not rewritten.
*/
Mockito.clearInvocations(_mockGraphService, _mockEntitySearchService);
changeLog = setPreviousData(setToUpsert(changeLog), changeLog);
currentLineage = modifyNonSearchableField(currentLineage);
changeLog = modifyAspect(changeLog, currentLineage);
_updateIndicesHook.invoke(changeLog);
// No new edges — lineage relationships did not change
Mockito.verify(_mockGraphService, Mockito.times(0)).addEdge(Mockito.any());
// Edge timestamps refreshed
Mockito.verify(_mockGraphService, Mockito.times(2)).upsertEdge(Mockito.any());
// Search diff mode detects no searchable-field change, so no document write
Mockito.verify(_mockEntitySearchService, Mockito.times(0))
.upsertDocument(Mockito.any(), Mockito.any(), Mockito.any());
/*
* force reindexing: RESTATE with empty previous data and the forceIndexing
* system metadata flag set — diff-mode no-op detection must be bypassed.
*/
Mockito.clearInvocations(_mockGraphService, _mockEntitySearchService);
changeLog = setPreviousDataToEmpty(setToRestate(changeLog));
changeLog = setSystemMetadataWithForceIndexing(changeLog);
_updateIndicesHook.invoke(changeLog);
// Non-diff path: all existing edges for the node are removed first
Mockito.verify(_mockGraphService, Mockito.times(1)).removeEdgesFromNode(Mockito.any(),
Mockito.any(), Mockito.any());
// ...then both edges are re-added from scratch
Mockito.verify(_mockGraphService, Mockito.times(2)).addEdge(Mockito.any());
// Document is rewritten unconditionally
Mockito.verify(_mockEntitySearchService, Mockito.times(1))
.upsertDocument(Mockito.any(), Mockito.any(), Mockito.any());
}
private EntityRegistry createMockEntityRegistry() {
// need to mock this registry instead of using test-entity-registry.yml because inputFields does not work due to a known bug
EntityRegistry mockEntityRegistry = Mockito.mock(EntityRegistry.class);
EntitySpec entitySpec = Mockito.mock(EntitySpec.class);
AspectSpec aspectSpec = createMockAspectSpec(InputFields.class, InputFields.dataSchema());
AspectSpec upstreamLineageAspectSpec = createMockAspectSpec(UpstreamLineage.class, UpstreamLineage.dataSchema());
Mockito.when(mockEntityRegistry.getEntitySpec(Constants.CHART_ENTITY_NAME)).thenReturn(entitySpec);
Mockito.when(mockEntityRegistry.getEntitySpec(Constants.DATASET_ENTITY_NAME)).thenReturn(entitySpec);
Mockito.when(entitySpec.getAspectSpec(Constants.INPUT_FIELDS_ASPECT_NAME)).thenReturn(aspectSpec);
Mockito.when(entitySpec.getAspectSpec(Constants.UPSTREAM_LINEAGE_ASPECT_NAME)).thenReturn(upstreamLineageAspectSpec);
Mockito.when(aspectSpec.isTimeseries()).thenReturn(false);
Mockito.when(aspectSpec.getName()).thenReturn(Constants.INPUT_FIELDS_ASPECT_NAME);
Mockito.when(upstreamLineageAspectSpec.isTimeseries()).thenReturn(false);
Mockito.when(upstreamLineageAspectSpec.getName()).thenReturn(Constants.UPSTREAM_LINEAGE_ASPECT_NAME);
AspectSpec chartKeyAspectSpec = createMockAspectSpec(ChartKey.class, ChartKey.dataSchema());
Mockito.when(entitySpec.getKeyAspectSpec()).thenReturn(chartKeyAspectSpec);
return mockEntityRegistry;

View File

@ -264,6 +264,7 @@ featureFlags:
graphServiceDiffModeEnabled: ${GRAPH_SERVICE_DIFF_MODE_ENABLED:true} # Enables diff mode for graph writes: uses a different code path that computes a diff from the previous to the next aspect and writes only the changed relationships, instead of wholesale deleting and re-adding all edges
pointInTimeCreationEnabled: ${POINT_IN_TIME_CREATION_ENABLED:false} # Enables creation of point in time snapshots for the scroll API, only works with main line ElasticSearch releases after 7.10. OpenSearch is unsupported, plans to eventually target OpenSearch 2.4+ with a divergent client
alwaysEmitChangeLog: ${ALWAYS_EMIT_CHANGE_LOG:false} # Enables always emitting a MCL even when no changes are detected. Used for Time Based Lineage when no changes occur.
searchServiceDiffModeEnabled: ${SEARCH_SERVICE_DIFF_MODE_ENABLED:true} # Enables diff mode for search document writes, reduces amount of writes to ElasticSearch documents for no-ops
entityChangeEvents:
enabled: ${ENABLE_ENTITY_CHANGE_EVENTS_HOOK:true}