fix(graph-edge): fix graph edge delete exception (#12025)

This commit is contained in:
david-leifker 2024-12-05 07:44:20 -06:00 committed by GitHub
parent 3c388a56a5
commit 3f3f777c06
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 267 additions and 38 deletions

View File

@ -75,6 +75,8 @@ jobs:
path: |
~/.cache/uv
key: ${{ runner.os }}-uv-${{ hashFiles('**/requirements.txt') }}
- name: Install dependencies
run: ./metadata-ingestion/scripts/install_deps.sh
- name: Set up JDK 17
uses: actions/setup-java@v4
with:

View File

@ -14,7 +14,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
/** Extracts fields from a RecordTemplate based on the appropriate {@link FieldSpec}. */
public class FieldExtractor {
@ -30,15 +30,34 @@ public class FieldExtractor {
// Extract the value of each field in the field specs from the input record
public static <T extends FieldSpec> Map<T, List<Object>> extractFields(
@Nonnull RecordTemplate record, List<T> fieldSpecs) {
return extractFields(record, fieldSpecs, MAX_VALUE_LENGTH);
@Nullable RecordTemplate record, List<T> fieldSpecs) {
return extractFields(record, fieldSpecs, false);
}
public static <T extends FieldSpec> Map<T, List<Object>> extractFields(
@Nonnull RecordTemplate record, List<T> fieldSpecs, int maxValueLength) {
@Nullable RecordTemplate record, List<T> fieldSpecs, boolean requiredFieldExtract) {
return extractFields(record, fieldSpecs, MAX_VALUE_LENGTH, requiredFieldExtract);
}
/**
 * Extracts the value of each field spec from the record using a caller-supplied maximum value
 * length.
 *
 * <p>Delegates to the four-argument overload with {@code requiredFieldExtract} set to {@code
 * false}. With that flag off the delegate does not reject a {@code null} record; it maps every
 * field spec to an empty list instead.
 *
 * @param record the record to read from; may be {@code null}
 * @param fieldSpecs the field specs whose paths are looked up in the record
 * @param maxValueLength forwarded unchanged to the four-argument overload
 * @return a map from each field spec to the values extracted for it
 */
public static <T extends FieldSpec> Map<T, List<Object>> extractFields(
@Nullable RecordTemplate record, List<T> fieldSpecs, int maxValueLength) {
return extractFields(record, fieldSpecs, maxValueLength, false);
}
public static <T extends FieldSpec> Map<T, List<Object>> extractFields(
@Nullable RecordTemplate record,
List<T> fieldSpecs,
int maxValueLength,
boolean requiredFieldExtract) {
final Map<T, List<Object>> extractedFields = new HashMap<>();
for (T fieldSpec : fieldSpecs) {
Optional<Object> value = RecordUtils.getFieldValue(record, fieldSpec.getPath());
if (requiredFieldExtract && record == null) {
throw new IllegalArgumentException(
"Field extraction is required and the RecordTemplate is null");
}
Optional<Object> value =
Optional.ofNullable(record)
.flatMap(maybeRecord -> RecordUtils.getFieldValue(maybeRecord, fieldSpec.getPath()));
if (!value.isPresent()) {
extractedFields.put(fieldSpec, Collections.emptyList());
} else {

View File

@ -190,7 +190,10 @@ public class UpdateGraphIndicesService implements SearchIndicesService {
urn.getEntityType(), event.getAspectName()));
}
RecordTemplate aspect = event.getRecordTemplate();
final RecordTemplate aspect =
event.getPreviousRecordTemplate() != null
? event.getPreviousRecordTemplate()
: event.getRecordTemplate();
Boolean isDeletingKey = event.getAspectName().equals(entitySpec.getKeyAspectName());
if (!aspectSpec.isTimeseries()) {
@ -280,8 +283,8 @@ public class UpdateGraphIndicesService implements SearchIndicesService {
@Nonnull final RecordTemplate aspect,
@Nonnull final MetadataChangeLog event,
final boolean isNewAspectVersion) {
final List<Edge> edgesToAdd = new ArrayList<>();
final HashMap<Urn, Set<String>> urnToRelationshipTypesBeingAdded = new HashMap<>();
final List<Edge> edges = new ArrayList<>();
final HashMap<Urn, Set<String>> urnToRelationshipTypes = new HashMap<>();
// we need to manually set schemaField <-> schemaField edges for fineGrainedLineage and
// inputFields
@ -289,36 +292,28 @@ public class UpdateGraphIndicesService implements SearchIndicesService {
if (aspectSpec.getName().equals(Constants.UPSTREAM_LINEAGE_ASPECT_NAME)) {
UpstreamLineage upstreamLineage = new UpstreamLineage(aspect.data());
updateFineGrainedEdgesAndRelationships(
urn,
upstreamLineage.getFineGrainedLineages(),
edgesToAdd,
urnToRelationshipTypesBeingAdded);
urn, upstreamLineage.getFineGrainedLineages(), edges, urnToRelationshipTypes);
} else if (aspectSpec.getName().equals(Constants.INPUT_FIELDS_ASPECT_NAME)) {
final InputFields inputFields = new InputFields(aspect.data());
updateInputFieldEdgesAndRelationships(
urn, inputFields, edgesToAdd, urnToRelationshipTypesBeingAdded);
updateInputFieldEdgesAndRelationships(urn, inputFields, edges, urnToRelationshipTypes);
} else if (aspectSpec.getName().equals(Constants.DATA_JOB_INPUT_OUTPUT_ASPECT_NAME)) {
DataJobInputOutput dataJobInputOutput = new DataJobInputOutput(aspect.data());
updateFineGrainedEdgesAndRelationships(
urn,
dataJobInputOutput.getFineGrainedLineages(),
edgesToAdd,
urnToRelationshipTypesBeingAdded);
urn, dataJobInputOutput.getFineGrainedLineages(), edges, urnToRelationshipTypes);
}
Map<RelationshipFieldSpec, List<Object>> extractedFields =
FieldExtractor.extractFields(aspect, aspectSpec.getRelationshipFieldSpecs());
FieldExtractor.extractFields(aspect, aspectSpec.getRelationshipFieldSpecs(), true);
for (Map.Entry<RelationshipFieldSpec, List<Object>> entry : extractedFields.entrySet()) {
Set<String> relationshipTypes =
urnToRelationshipTypesBeingAdded.getOrDefault(urn, new HashSet<>());
Set<String> relationshipTypes = urnToRelationshipTypes.getOrDefault(urn, new HashSet<>());
relationshipTypes.add(entry.getKey().getRelationshipName());
urnToRelationshipTypesBeingAdded.put(urn, relationshipTypes);
urnToRelationshipTypes.put(urn, relationshipTypes);
final List<Edge> newEdges =
GraphIndexUtils.extractGraphEdges(entry, aspect, urn, event, isNewAspectVersion);
edgesToAdd.addAll(newEdges);
edges.addAll(newEdges);
}
return Pair.of(edgesToAdd, urnToRelationshipTypesBeingAdded);
return Pair.of(edges, urnToRelationshipTypes);
}
/** Process snapshot and update graph index */
@ -433,7 +428,7 @@ public class UpdateGraphIndicesService implements SearchIndicesService {
@Nonnull final OperationContext opContext,
@Nonnull final Urn urn,
@Nonnull final AspectSpec aspectSpec,
@Nonnull final RecordTemplate aspect,
@Nullable final RecordTemplate aspect,
@Nonnull final Boolean isKeyAspect,
@Nonnull final MetadataChangeLog event) {
if (isKeyAspect) {
@ -441,21 +436,28 @@ public class UpdateGraphIndicesService implements SearchIndicesService {
return;
}
Pair<List<Edge>, HashMap<Urn, Set<String>>> edgeAndRelationTypes =
getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect, event, true);
if (aspect != null) {
Pair<List<Edge>, HashMap<Urn, Set<String>>> edgeAndRelationTypes =
getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect, event, true);
final HashMap<Urn, Set<String>> urnToRelationshipTypesBeingAdded =
edgeAndRelationTypes.getSecond();
if (!urnToRelationshipTypesBeingAdded.isEmpty()) {
for (Map.Entry<Urn, Set<String>> entry : urnToRelationshipTypesBeingAdded.entrySet()) {
graphService.removeEdgesFromNode(
opContext,
entry.getKey(),
new ArrayList<>(entry.getValue()),
createRelationshipFilter(
new Filter().setOr(new ConjunctiveCriterionArray()),
RelationshipDirection.OUTGOING));
final HashMap<Urn, Set<String>> urnToRelationshipTypesBeingRemoved =
edgeAndRelationTypes.getSecond();
if (!urnToRelationshipTypesBeingRemoved.isEmpty()) {
for (Map.Entry<Urn, Set<String>> entry : urnToRelationshipTypesBeingRemoved.entrySet()) {
graphService.removeEdgesFromNode(
opContext,
entry.getKey(),
new ArrayList<>(entry.getValue()),
createRelationshipFilter(
new Filter().setOr(new ConjunctiveCriterionArray()),
RelationshipDirection.OUTGOING));
}
}
} else {
log.warn(
"Insufficient information to perform graph delete. Missing deleted aspect {} for entity {}",
aspectSpec.getName(),
urn);
}
}
}

View File

@ -1,6 +1,11 @@
package com.linkedin.metadata.service;
import static com.linkedin.metadata.Constants.CONTAINER_ENTITY_NAME;
import static com.linkedin.metadata.search.utils.QueryUtils.createRelationshipFilter;
import static com.linkedin.metadata.utils.CriterionUtils.buildCriterion;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
@ -8,9 +13,11 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.testng.Assert.assertEquals;
import com.google.common.collect.ImmutableList;
import com.linkedin.common.Status;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.container.Container;
import com.linkedin.dataset.DatasetProperties;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
@ -21,6 +28,14 @@ import com.linkedin.metadata.graph.elastic.ESGraphWriteDAO;
import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.models.registry.LineageRegistry;
import com.linkedin.metadata.query.filter.Condition;
import com.linkedin.metadata.query.filter.ConjunctiveCriterion;
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
import com.linkedin.metadata.query.filter.Criterion;
import com.linkedin.metadata.query.filter.CriterionArray;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.query.filter.RelationshipDirection;
import com.linkedin.metadata.query.filter.RelationshipFilter;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder;
import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor;
import com.linkedin.metadata.utils.GenericRecordUtils;
@ -29,6 +44,8 @@ import com.linkedin.mxe.GenericAspect;
import com.linkedin.mxe.MetadataChangeLog;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.test.metadata.context.TestOperationContexts;
import java.util.List;
import javax.annotation.Nonnull;
import org.mockito.ArgumentCaptor;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.script.Script;
@ -180,4 +197,109 @@ public class UpdateGraphIndicesServiceTest {
verifyNoInteractions(mockWriteDAO);
}
/**
 * A DELETE event for the container aspect that carries no aspect value must leave {@code
 * mockWriteDAO} untouched.
 */
@Test
public void testMissingAspectGraphDelete() {
  // Delete event with neither a current nor a previous aspect value attached
  final MetadataChangeLog deleteWithoutAspect =
      new MetadataChangeLog()
          .setChangeType(ChangeType.DELETE)
          .setEntityType(TEST_URN.getEntityType())
          .setEntityUrn(TEST_URN)
          .setAspectName(Constants.CONTAINER_ASPECT_NAME);

  test.handleChangeEvent(TEST_OP_CONTEXT, deleteWithoutAspect);

  // With the aspect missing, no write of any kind is expected
  verifyNoInteractions(mockWriteDAO);
}
/**
 * A DELETE event for a container's key aspect must issue one delete for the entity's outgoing
 * edges, one for its incoming edges, and one for edges it owns as lifecycle owner.
 */
@Test
public void testNodeGraphDelete() {
  final Urn containerUrn = UrnUtils.getUrn("urn:li:container:foo");
  final Filter containerUrnFilter = createUrnFilter(containerUrn);
  final Filter emptyFilter = new Filter().setOr(new ConjunctiveCriterionArray());

  // Deleting the key aspect stands for deleting the container entity itself
  final MetadataChangeLog keyAspectDelete =
      new MetadataChangeLog()
          .setChangeType(ChangeType.DELETE)
          .setEntityType(CONTAINER_ENTITY_NAME)
          .setEntityUrn(containerUrn)
          .setAspectName(Constants.CONTAINER_KEY_ASPECT_NAME);
  test.handleChangeEvent(TEST_OP_CONTEXT, keyAspectDelete);

  // Exactly one delete per direction for the edges attached to this entity
  for (RelationshipDirection direction :
      List.of(RelationshipDirection.OUTGOING, RelationshipDirection.INCOMING)) {
    verify(mockWriteDAO, times(1))
        .deleteByQuery(
            eq(TEST_OP_CONTEXT),
            nullable(String.class),
            eq(containerUrnFilter),
            nullable(String.class),
            eq(emptyFilter),
            eq(List.of()),
            eq(new RelationshipFilter().setDirection(direction)));
  }

  // Exactly one delete for the edges where this entity is the lifecycle owner
  verify(mockWriteDAO, times(1))
      .deleteByQuery(
          eq(TEST_OP_CONTEXT),
          nullable(String.class),
          eq(emptyFilter),
          nullable(String.class),
          eq(emptyFilter),
          eq(List.of()),
          eq(new RelationshipFilter().setDirection(RelationshipDirection.INCOMING)),
          eq(containerUrn.toString()));
}
/**
 * A DELETE event for the container aspect that carries the previous aspect value must remove
 * the outgoing {@code IsPartOf} edges of {@code TEST_URN}.
 */
@Test
public void testContainerDelete() {
  final Urn containerUrn = UrnUtils.getUrn("urn:li:container:foo");
  final List<String> expectedRelationshipTypes = List.of("IsPartOf");

  // The deleted aspect is only available through the event's previous aspect value
  final MetadataChangeLog containerAspectDelete =
      new MetadataChangeLog()
          .setChangeType(ChangeType.DELETE)
          .setEntityType(TEST_URN.getEntityType())
          .setEntityUrn(TEST_URN)
          .setAspectName(Constants.CONTAINER_ASPECT_NAME)
          .setPreviousAspectValue(
              GenericRecordUtils.serializeAspect(new Container().setContainer(containerUrn)));
  test.handleChangeEvent(TEST_OP_CONTEXT, containerAspectDelete);

  // Expect a single delete scoped to TEST_URN, the IsPartOf type and the outgoing direction
  verify(mockWriteDAO, times(1))
      .deleteByQuery(
          eq(TEST_OP_CONTEXT),
          nullable(String.class),
          eq(createUrnFilter(TEST_URN)),
          nullable(String.class),
          eq(new Filter().setOr(new ConjunctiveCriterionArray())),
          eq(expectedRelationshipTypes),
          eq(
              createRelationshipFilter(
                  new Filter().setOr(new ConjunctiveCriterionArray()),
                  RelationshipDirection.OUTGOING)));
}
/**
 * Builds a filter whose single OR clause holds one criterion: field {@code "urn"} EQUAL to the
 * string form of the given urn.
 *
 * @param urn the urn to match
 * @return the filter used as the expected argument in the delete verifications
 */
private static Filter createUrnFilter(@Nonnull final Urn urn) {
  final CriterionArray urnCriteria = new CriterionArray();
  urnCriteria.add(buildCriterion("urn", Condition.EQUAL, urn.toString()));

  final ConjunctiveCriterion urnClause = new ConjunctiveCriterion().setAnd(urnCriteria);
  return new Filter().setOr(new ConjunctiveCriterionArray(ImmutableList.of(urnClause)));
}
}

View File

@ -0,0 +1,84 @@
package com.linkedin.metadata.service;
import static com.linkedin.metadata.Constants.CONTAINER_ASPECT_NAME;
import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.verify;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders;
import com.linkedin.metadata.search.transformer.SearchDocumentTransformer;
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
import com.linkedin.metadata.utils.SystemMetadataUtils;
import com.linkedin.mxe.MetadataChangeLog;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.test.metadata.context.TestOperationContexts;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
/**
 * Tests for {@link UpdateIndicesService} with every collaborator mocked, checking which
 * collaborators are invoked when a change event is handled.
 */
public class UpdateIndicesServiceTest {
  @Mock private UpdateGraphIndicesService updateGraphIndicesService;
  @Mock private EntitySearchService entitySearchService;
  @Mock private TimeseriesAspectService timeseriesAspectService;
  @Mock private SystemMetadataService systemMetadataService;
  @Mock private SearchDocumentTransformer searchDocumentTransformer;
  @Mock private EntityIndexBuilders entityIndexBuilders;
  private OperationContext operationContext;
  private UpdateIndicesService updateIndicesService;

  // Handle returned by openMocks; kept so the mock session can be released after each test
  private AutoCloseable mocks;

  /** Creates fresh mocks and a service instance wired to them before every test method. */
  @BeforeMethod
  public void setup() {
    mocks = MockitoAnnotations.openMocks(this);
    operationContext = TestOperationContexts.systemContextNoSearchAuthorization();
    updateIndicesService =
        new UpdateIndicesService(
            updateGraphIndicesService,
            entitySearchService,
            timeseriesAspectService,
            systemMetadataService,
            searchDocumentTransformer,
            entityIndexBuilders,
            "MD5");
  }

  /**
   * Releases the mock session opened in {@link #setup()}.
   *
   * @throws Exception if closing the mock session fails
   */
  @AfterMethod(alwaysRun = true)
  public void tearDown() throws Exception {
    // setup() may have failed before assigning the handle
    if (mocks != null) {
      mocks.close();
    }
  }

  /**
   * A DELETE event for the container aspect that carries no aspect value must still delete the
   * system metadata row, request a search-document transform for the deletion, and be forwarded
   * to the graph indices service.
   */
  @Test
  public void testContainerHandleDeleteEvent() throws Exception {
    Urn urn = UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)");
    EntitySpec entitySpec = operationContext.getEntityRegistry().getEntitySpec(DATASET_ENTITY_NAME);
    AspectSpec aspectSpec = entitySpec.getAspectSpec(CONTAINER_ASPECT_NAME);

    // Create test data
    MetadataChangeLog event = new MetadataChangeLog();
    event.setChangeType(ChangeType.DELETE);
    event.setEntityUrn(urn);
    event.setAspectName(CONTAINER_ASPECT_NAME);
    event.setEntityType(urn.getEntityType());
    event.setSystemMetadata(SystemMetadataUtils.createDefaultSystemMetadata());

    // Execute Delete
    updateIndicesService.handleChangeEvent(operationContext, event);

    // Verify
    verify(systemMetadataService).deleteAspect(urn.toString(), CONTAINER_ASPECT_NAME);
    // The aspect argument may be null because the event carries no aspect value
    verify(searchDocumentTransformer)
        .transformAspect(
            eq(operationContext),
            eq(urn),
            nullable(RecordTemplate.class),
            eq(aspectSpec),
            eq(true));
    verify(updateGraphIndicesService).handleChangeEvent(operationContext, event);
  }
}