feat(lineage): restrict the creation of finegrained lineage in graphstore (#14199)

This commit is contained in:
Deepak Garg 2025-07-25 20:35:52 +05:30 committed by GitHub
parent 3cf7e6353f
commit f13f0fd83c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 602 additions and 67 deletions

View File

@ -1,7 +1,9 @@
package com.linkedin.metadata.service;
import static com.linkedin.metadata.Constants.FORCE_INDEXING_KEY;
import static com.linkedin.metadata.Constants.SCHEMA_FIELD_ENTITY_NAME;
import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME;
import static com.linkedin.metadata.Constants.UPSTREAM_LINEAGE_ASPECT_NAME;
import static com.linkedin.metadata.search.utils.QueryUtils.createRelationshipFilter;
import static com.linkedin.metadata.search.utils.QueryUtils.newRelationshipFilter;
@ -25,13 +27,17 @@ import com.linkedin.metadata.entity.SearchIndicesService;
import com.linkedin.metadata.entity.ebean.batch.MCLItemImpl;
import com.linkedin.metadata.graph.GraphIndexUtils;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.key.DataPlatformKey;
import com.linkedin.metadata.key.DatasetKey;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.models.RelationshipFieldSpec;
import com.linkedin.metadata.models.extractor.FieldExtractor;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.query.filter.RelationshipDirection;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.metadata.utils.SchemaFieldUtils;
import com.linkedin.mxe.MetadataChangeLog;
import com.linkedin.mxe.SystemMetadata;
@ -44,6 +50,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
@ -51,6 +58,7 @@ import javax.annotation.Nullable;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
@Slf4j
public class UpdateGraphIndicesService implements SearchIndicesService {
@ -58,6 +66,8 @@ public class UpdateGraphIndicesService implements SearchIndicesService {
private static final String GRAPH_DIFF_MODE_REMOVE_METRIC = "diff_remove_edge";
private static final String GRAPH_DIFF_MODE_ADD_METRIC = "diff_add_edge";
private static final String GRAPH_DIFF_MODE_UPDATE_METRIC = "diff_update_edge";
// Platform names for which schemaField (fine-grained) lineage edges are not written to the graph.
// Assigned by the constructor and replaceable through the test-only setter.
private List<String> fineGrainedLineageNotAllowedForPlatforms;
// Relationship field path of the fine-grained lineage upstreams in the upstreamLineage aspect.
// A compile-time constant, so it is shared by all instances rather than stored per instance.
private static final String FINE_GRAINED_LINEAGE_PATH = "/fineGrainedLineages/*/upstreams/*";
public static UpdateGraphIndicesService withService(GraphService graphService) {
return new UpdateGraphIndicesService(graphService);
@ -69,6 +79,11 @@ public class UpdateGraphIndicesService implements SearchIndicesService {
@Getter @Setter @VisibleForTesting private boolean graphDiffMode;
/**
 * Replaces the list of platform names for which fine-grained lineage is restricted.
 *
 * <p>The given list is stored as-is (no defensive copy). A {@code null} or empty list makes
 * {@code isFineGrainedLineageNotAllowedForPlatforms} return {@code false} for every URN pair.
 *
 * @param platforms platform names (for example {@code "hdfs"}) to restrict
 */
@VisibleForTesting
public void setFineGrainedLineageNotAllowedForPlatforms(List<String> platforms) {
this.fineGrainedLineageNotAllowedForPlatforms = platforms;
}
private static final Set<ChangeType> UPDATE_CHANGE_TYPES =
ImmutableSet.of(
ChangeType.CREATE,
@ -78,14 +93,18 @@ public class UpdateGraphIndicesService implements SearchIndicesService {
ChangeType.PATCH);
public UpdateGraphIndicesService(GraphService graphService) {
this(graphService, true, true);
this(graphService, true, true, Collections.emptyList());
}
public UpdateGraphIndicesService(
GraphService graphService, boolean graphDiffMode, boolean graphStatusEnabled) {
GraphService graphService,
boolean graphDiffMode,
boolean graphStatusEnabled,
List<String> fineGrainedLineageNotAllowedForPlatforms) {
this.graphService = graphService;
this.graphDiffMode = graphDiffMode;
this.graphStatusEnabled = graphStatusEnabled;
this.fineGrainedLineageNotAllowedForPlatforms = fineGrainedLineageNotAllowedForPlatforms;
}
@Override
@ -204,7 +223,8 @@ public class UpdateGraphIndicesService implements SearchIndicesService {
Urn entity,
FineGrainedLineageArray fineGrainedLineageArray,
List<Edge> edgesToAdd,
HashMap<Urn, Set<String>> urnToRelationshipTypesBeingAdded) {
HashMap<Urn, Set<String>> urnToRelationshipTypesBeingAdded,
final EntityRegistry entityRegistry) {
if (fineGrainedLineageArray != null) {
for (FineGrainedLineage fineGrainedLineage : fineGrainedLineageArray) {
if (!fineGrainedLineage.hasDownstreams() || !fineGrainedLineage.hasUpstreams()) {
@ -219,6 +239,17 @@ public class UpdateGraphIndicesService implements SearchIndicesService {
for (Urn downstream : fineGrainedLineage.getDownstreams()) {
for (Urn upstream : fineGrainedLineage.getUpstreams()) {
// TODO: add edges uniformly across aspects
// restrict the creation of schemafield nodes and their relationships
// especially for platforms like hdfs
if (isFineGrainedLineageNotAllowedForPlatforms(downstream, upstream, entityRegistry)) {
log.debug(
"Skipping fine grained lineage for downstream {} and upstream {}",
downstream,
upstream);
continue;
}
edgesToAdd.add(
new Edge(
downstream,
@ -279,29 +310,42 @@ public class UpdateGraphIndicesService implements SearchIndicesService {
@Nonnull final AspectSpec aspectSpec,
@Nonnull final RecordTemplate aspect,
@Nonnull final MetadataChangeLog event,
final boolean isNewAspectVersion) {
final boolean isNewAspectVersion,
final EntityRegistry entityRegistry) {
final List<Edge> edges = new ArrayList<>();
final HashMap<Urn, Set<String>> urnToRelationshipTypes = new HashMap<>();
// we need to manually set schemaField <-> schemaField edges for fineGrainedLineage and
// inputFields
// since @Relationship only links between the parent entity urn and something else.
if (aspectSpec.getName().equals(Constants.UPSTREAM_LINEAGE_ASPECT_NAME)) {
if (aspectSpec.getName().equals(UPSTREAM_LINEAGE_ASPECT_NAME)) {
UpstreamLineage upstreamLineage = new UpstreamLineage(aspect.data());
updateFineGrainedEdgesAndRelationships(
urn, upstreamLineage.getFineGrainedLineages(), edges, urnToRelationshipTypes);
urn,
upstreamLineage.getFineGrainedLineages(),
edges,
urnToRelationshipTypes,
entityRegistry);
} else if (aspectSpec.getName().equals(Constants.INPUT_FIELDS_ASPECT_NAME)) {
final InputFields inputFields = new InputFields(aspect.data());
updateInputFieldEdgesAndRelationships(urn, inputFields, edges, urnToRelationshipTypes);
} else if (aspectSpec.getName().equals(Constants.DATA_JOB_INPUT_OUTPUT_ASPECT_NAME)) {
DataJobInputOutput dataJobInputOutput = new DataJobInputOutput(aspect.data());
updateFineGrainedEdgesAndRelationships(
urn, dataJobInputOutput.getFineGrainedLineages(), edges, urnToRelationshipTypes);
urn,
dataJobInputOutput.getFineGrainedLineages(),
edges,
urnToRelationshipTypes,
entityRegistry);
}
Map<RelationshipFieldSpec, List<Object>> extractedFields =
FieldExtractor.extractFields(aspect, aspectSpec.getRelationshipFieldSpecs(), true);
// restrict the creation of schema field nodes and their relationships especially for
// platforms like hdfs
removeFineGrainedLineageForNotAllowedPlatforms(extractedFields, aspectSpec, entityRegistry);
for (Map.Entry<RelationshipFieldSpec, List<Object>> entry : extractedFields.entrySet()) {
Set<String> relationshipTypes = urnToRelationshipTypes.getOrDefault(urn, new HashSet<>());
relationshipTypes.add(entry.getKey().getRelationshipName());
@ -321,7 +365,8 @@ public class UpdateGraphIndicesService implements SearchIndicesService {
@Nonnull final RecordTemplate aspect,
@Nonnull final MetadataChangeLog event) {
Pair<List<Edge>, HashMap<Urn, Set<String>>> edgeAndRelationTypes =
getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect, event, true);
getEdgesAndRelationshipTypesFromAspect(
urn, aspectSpec, aspect, event, true, opContext.getEntityRegistry());
final List<Edge> edgesToAdd = edgeAndRelationTypes.getFirst();
final HashMap<Urn, Set<String>> urnToRelationshipTypesBeingAdded =
@ -352,7 +397,8 @@ public class UpdateGraphIndicesService implements SearchIndicesService {
Pair<List<Edge>, HashMap<Urn, Set<String>>> oldEdgeAndRelationTypes = null;
if (oldAspect != null) {
oldEdgeAndRelationTypes =
getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, oldAspect, event, false);
getEdgesAndRelationshipTypesFromAspect(
urn, aspectSpec, oldAspect, event, false, opContext.getEntityRegistry());
}
final List<Edge> oldEdges =
@ -362,7 +408,8 @@ public class UpdateGraphIndicesService implements SearchIndicesService {
final Set<Edge> oldEdgeSet = new HashSet<>(oldEdges);
Pair<List<Edge>, HashMap<Urn, Set<String>>> newEdgeAndRelationTypes =
getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, newAspect, event, true);
getEdgesAndRelationshipTypesFromAspect(
urn, aspectSpec, newAspect, event, true, opContext.getEntityRegistry());
final List<Edge> newEdges = newEdgeAndRelationTypes.getFirst();
final Set<Edge> newEdgeSet = new HashSet<>(newEdges);
@ -451,7 +498,8 @@ public class UpdateGraphIndicesService implements SearchIndicesService {
if (aspect != null) {
Pair<List<Edge>, HashMap<Urn, Set<String>>> edgeAndRelationTypes =
getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect, event, true);
getEdgesAndRelationshipTypesFromAspect(
urn, aspectSpec, aspect, event, true, opContext.getEntityRegistry());
final HashMap<Urn, Set<String>> urnToRelationshipTypesBeingRemoved =
edgeAndRelationTypes.getSecond();
@ -473,4 +521,55 @@ public class UpdateGraphIndicesService implements SearchIndicesService {
urn);
}
}
/**
 * Removes, in place, the fine-grained lineage upstream URNs that belong to a restricted platform
 * from the relationship fields extracted out of an upstreamLineage aspect.
 *
 * <p>Does nothing when the aspect is not upstreamLineage, when no platform is restricted, when
 * the aspect spec has no relationship field at the fine-grained lineage path, or when nothing
 * was extracted for that field.
 *
 * @param extractedFields relationship field values extracted from the aspect; the list stored
 *     for the fine-grained lineage field is mutated
 * @param aspectSpec spec of the aspect the fields were extracted from
 * @param entityRegistry registry used to resolve the platform of each schema field's parent
 */
private void removeFineGrainedLineageForNotAllowedPlatforms(
    Map<RelationshipFieldSpec, List<Object>> extractedFields,
    AspectSpec aspectSpec,
    EntityRegistry entityRegistry) {
  if (!aspectSpec.getName().equals(UPSTREAM_LINEAGE_ASPECT_NAME)) {
    return;
  }
  // Nothing can match when no platform is restricted, so leave the extracted list untouched.
  if (CollectionUtils.isEmpty(fineGrainedLineageNotAllowedForPlatforms)) {
    return;
  }

  RelationshipFieldSpec fineGrainedLineageFieldSpec =
      aspectSpec.getRelationshipFieldSpecMap().get(FINE_GRAINED_LINEAGE_PATH);
  if (fineGrainedLineageFieldSpec == null) {
    return;
  }

  // The map has no entry when the aspect carries no fine-grained upstreams for this field.
  List<Object> fineGrainedLineageUrnsList = extractedFields.get(fineGrainedLineageFieldSpec);
  if (fineGrainedLineageUrnsList == null) {
    return;
  }

  fineGrainedLineageUrnsList.removeIf(
      fineGrainedLineageUrn -> {
        if (fineGrainedLineageUrn instanceof Urn) {
          Urn upstreamSchemaFieldUrn = (Urn) fineGrainedLineageUrn;
          // Only the upstream side is known here, so the downstream argument is null.
          return isFineGrainedLineageNotAllowedForPlatforms(
              null, upstreamSchemaFieldUrn, entityRegistry);
        }
        return false;
      });
}
/**
 * Tells whether fine-grained lineage between the two URNs must be skipped because at least one
 * of them is a schema field whose parent dataset lives on a restricted platform.
 *
 * @param downstream downstream URN, may be {@code null} when only the upstream is known
 * @param upstream upstream URN, may be {@code null}
 * @param entityRegistry registry used to resolve the dataset and platform key aspects
 * @return {@code true} when either URN is a schema field on a restricted platform; {@code false}
 *     otherwise, including when no platform is restricted
 */
private boolean isFineGrainedLineageNotAllowedForPlatforms(
    Urn downstream, Urn upstream, EntityRegistry entityRegistry) {
  if (CollectionUtils.isEmpty(fineGrainedLineageNotAllowedForPlatforms)) {
    return false;
  }
  return isSchemaFieldOnRestrictedPlatform(downstream, entityRegistry)
      || isSchemaFieldOnRestrictedPlatform(upstream, entityRegistry);
}

/** Checks a single URN: it must be a schema field whose parent platform is restricted. */
private boolean isSchemaFieldOnRestrictedPlatform(Urn urn, EntityRegistry entityRegistry) {
  if (Objects.isNull(urn) || !urn.getEntityType().equals(SCHEMA_FIELD_ENTITY_NAME)) {
    return false;
  }
  final String platformName = getDatasetPlatformName(entityRegistry, urn.getIdAsUrn());
  if (platformName == null) {
    return false;
  }
  // Entries come from splitting a comma-separated setting, so "hdfs, s3" yields " s3";
  // compare against the trimmed entry instead of relying on an exact contains().
  for (String restrictedPlatform : fineGrainedLineageNotAllowedForPlatforms) {
    if (restrictedPlatform != null && platformName.equals(restrictedPlatform.trim())) {
      return true;
    }
  }
  return false;
}
/**
 * Resolves the platform name of a dataset URN by decoding the dataset key aspect and then the
 * key aspect of the platform URN it references.
 *
 * @param entityRegistry registry supplying the key aspect specs for both entity types
 * @param datasetUrn URN whose key aspect is decoded as a {@link DatasetKey}
 * @return the platform name held by the platform's {@link DataPlatformKey}
 */
private String getDatasetPlatformName(EntityRegistry entityRegistry, Urn datasetUrn) {
  final AspectSpec datasetKeySpec =
      entityRegistry.getEntitySpec(datasetUrn.getEntityType()).getKeyAspectSpec();
  final DatasetKey datasetKey =
      (DatasetKey) EntityKeyUtils.convertUrnToEntityKey(datasetUrn, datasetKeySpec);

  final Urn platformUrn = datasetKey.getPlatform();
  final AspectSpec platformKeySpec =
      entityRegistry.getEntitySpec(datasetKey.getPlatform().getEntityType()).getKeyAspectSpec();
  final DataPlatformKey platformKey =
      (DataPlatformKey) EntityKeyUtils.convertUrnToEntityKey(platformUrn, platformKeySpec);

  return platformKey.getPlatformName();
}
}

View File

@ -62,6 +62,8 @@ import io.datahubproject.test.metadata.context.TestOperationContexts;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Value;
@ -80,6 +82,13 @@ public class UpdateIndicesHookTest {
"urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleCypressHdfsDataset,PROD)";
static final String TEST_DATASET_URN_3 =
"urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleCypressKafkaDataset,PROD)";
static final String TEST_DATASET_URN_4 =
"urn:li:dataset:(urn:li:dataPlatform:hdfs,CypressHdfsDataset,PROD)";
static final String TEST_SCHEMA_FIELD_HDFS_FIELD_INFO =
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleCypressHdfsDataset,PROD),field_foo)";
static final String TEST_SCHEMA_FIELD_HIVE_FIELD_INFO =
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHdfsDataset,PROD),field_foo)";
static final String TEST_HDFS_PLATFORM = "hdfs";
static final String TEST_CHART_URN = "urn:li:chart:(looker,dashboard_elements.1)";
static final String TEST_ACTOR_URN = "urn:li:corpuser:test";
static final String DOWNSTREAM_OF = "DownstreamOf";
@ -140,15 +149,12 @@ public class UpdateIndicesHookTest {
@Test
public void testFineGrainedLineageEdgesAreAdded() throws Exception {
updateIndicesService.getUpdateGraphIndicesService().setGraphDiffMode(false);
Urn upstreamUrn =
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleCypressHdfsDataset,PROD),foo_info)");
Urn downstreamUrn =
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD),field_foo)");
Urn lifeCycleOwner =
UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD)");
MetadataChangeLog event = createUpstreamLineageMCL(upstreamUrn, downstreamUrn);
Urn upstreamUrn = UrnUtils.getUrn(TEST_SCHEMA_FIELD_HDFS_FIELD_INFO);
Urn downstreamUrn = UrnUtils.getUrn(TEST_SCHEMA_FIELD_HIVE_FIELD_INFO);
Urn lifeCycleOwner = UrnUtils.getUrn(TEST_DATASET_URN);
MetadataChangeLog event =
createUpstreamLineageMCL(
List.of(upstreamUrn), downstreamUrn, List.of(TEST_DATASET_URN_2), TEST_DATASET_URN);
updateIndicesHook.invoke(event);
Edge edge =
@ -178,16 +184,16 @@ public class UpdateIndicesHookTest {
@Test
public void testFineGrainedLineageEdgesAreAddedRestate() throws Exception {
updateIndicesService.getUpdateGraphIndicesService().setGraphDiffMode(false);
Urn upstreamUrn =
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleCypressHdfsDataset,PROD),foo_info)");
Urn downstreamUrn =
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD),field_foo)");
Urn lifeCycleOwner =
UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD)");
Urn upstreamUrn = UrnUtils.getUrn(TEST_SCHEMA_FIELD_HDFS_FIELD_INFO);
Urn downstreamUrn = UrnUtils.getUrn(TEST_SCHEMA_FIELD_HIVE_FIELD_INFO);
Urn lifeCycleOwner = UrnUtils.getUrn(TEST_DATASET_URN);
MetadataChangeLog event =
createUpstreamLineageMCL(upstreamUrn, downstreamUrn, ChangeType.RESTATE);
createUpstreamLineageMCL(
List.of(upstreamUrn),
downstreamUrn,
ChangeType.RESTATE,
List.of(TEST_DATASET_URN_2),
TEST_DATASET_URN);
updateIndicesHook.invoke(event);
Edge edge =
@ -217,10 +223,7 @@ public class UpdateIndicesHookTest {
any(OperationContext.class),
Mockito.eq(DATASET_ENTITY_NAME),
Mockito.any(),
Mockito.eq(
URLEncoder.encode(
"urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD)",
StandardCharsets.UTF_8)));
Mockito.eq(URLEncoder.encode(TEST_DATASET_URN, StandardCharsets.UTF_8)));
}
@Test
@ -233,7 +236,7 @@ public class UpdateIndicesHookTest {
EntityRegistry mockEntityRegistry = createMockEntityRegistry();
updateIndicesService =
new UpdateIndicesService(
new UpdateGraphIndicesService(mockGraphService, false, true),
new UpdateGraphIndicesService(mockGraphService, false, true, Collections.emptyList()),
mockEntitySearchService,
mockTimeseriesAspectService,
mockSystemMetadataService,
@ -438,15 +441,16 @@ public class UpdateIndicesHookTest {
@Test
public void testMCLUIPreProcessed() throws Exception {
Urn upstreamUrn =
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleCypressHdfsDataset,PROD),foo_info)");
Urn downstreamUrn =
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD),field_foo)");
Urn upstreamUrn = UrnUtils.getUrn(TEST_SCHEMA_FIELD_HDFS_FIELD_INFO);
Urn downstreamUrn = UrnUtils.getUrn(TEST_SCHEMA_FIELD_HIVE_FIELD_INFO);
MetadataChangeLog changeLog =
createUpstreamLineageMCLUIPreProcessed(upstreamUrn, downstreamUrn, ChangeType.UPSERT);
createUpstreamLineageMCLUIPreProcessed(
List.of(upstreamUrn),
downstreamUrn,
ChangeType.UPSERT,
List.of(TEST_DATASET_URN_2),
TEST_DATASET_URN);
updateIndicesHook.invoke(changeLog);
Mockito.verifyNoInteractions(
mockEntitySearchService,
@ -457,21 +461,385 @@ public class UpdateIndicesHookTest {
@Test
public void testMCLUIPreProcessedReprocess() throws Exception {
Urn upstreamUrn =
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleCypressHdfsDataset,PROD),foo_info2)");
Urn downstreamUrn =
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD),field_foo2)");
Urn upstreamUrn = UrnUtils.getUrn(TEST_SCHEMA_FIELD_HDFS_FIELD_INFO);
Urn downstreamUrn = UrnUtils.getUrn(TEST_SCHEMA_FIELD_HIVE_FIELD_INFO);
MetadataChangeLog changeLog =
createUpstreamLineageMCLUIPreProcessed(upstreamUrn, downstreamUrn, ChangeType.UPSERT);
createUpstreamLineageMCLUIPreProcessed(
List.of(upstreamUrn),
downstreamUrn,
ChangeType.UPSERT,
List.of(TEST_DATASET_URN_2),
TEST_DATASET_URN);
reprocessUIHook.invoke(changeLog);
Mockito.verify(mockGraphService, Mockito.times(3)).addEdge(Mockito.any());
Mockito.verify(mockEntitySearchService, Mockito.times(1))
.upsertDocument(any(OperationContext.class), Mockito.any(), Mockito.any(), Mockito.any());
}
/**
 * With hdfs restricted and the downstream schema field on hdfs, the field-to-field edge must not
 * be added and no outgoing DownstreamOf edges are removed from the downstream field.
 */
@Test
public void testFineGrainedLineageNotAllowed_When_downstream_type_hdfs() throws Exception {
  final UpdateGraphIndicesService graphIndices =
      updateIndicesService.getUpdateGraphIndicesService();
  graphIndices.setGraphDiffMode(false);
  graphIndices.setFineGrainedLineageNotAllowedForPlatforms(List.of(TEST_HDFS_PLATFORM));

  final Urn hiveUpstreamField = UrnUtils.getUrn(TEST_SCHEMA_FIELD_HIVE_FIELD_INFO);
  final Urn hdfsDownstreamField = UrnUtils.getUrn(TEST_SCHEMA_FIELD_HDFS_FIELD_INFO);
  final Urn owner = UrnUtils.getUrn(TEST_DATASET_URN_2);

  final MetadataChangeLog changeLog =
      createUpstreamLineageMCL(
          List.of(hiveUpstreamField),
          hdfsDownstreamField,
          List.of(TEST_DATASET_URN),
          TEST_DATASET_URN_2);
  updateIndicesHook.invoke(changeLog);

  // The schemaField -> schemaField edge that would exist without the restriction.
  final Edge fieldEdge =
      new Edge(
          hdfsDownstreamField,
          hiveUpstreamField,
          DOWNSTREAM_OF,
          null,
          null,
          null,
          null,
          null,
          owner,
          null);

  Mockito.verify(mockGraphService, Mockito.never()).addEdge(Mockito.eq(fieldEdge));
  Mockito.verify(mockGraphService, Mockito.never())
      .removeEdgesFromNode(
          any(OperationContext.class),
          Mockito.eq(hdfsDownstreamField),
          Mockito.eq(Set.of(DOWNSTREAM_OF)),
          Mockito.eq(
              newRelationshipFilter(
                  new Filter().setOr(new ConjunctiveCriterionArray()),
                  RelationshipDirection.OUTGOING)));
}
/**
 * With hdfs restricted and the upstream schema field on hdfs, the field-to-field edge must not be
 * added and no outgoing DownstreamOf edges are removed from the downstream field.
 */
@Test
public void testFineGrainedLineageNotAllowed_When_upstream_type_hdfs() throws Exception {
  final UpdateGraphIndicesService graphIndices =
      updateIndicesService.getUpdateGraphIndicesService();
  graphIndices.setGraphDiffMode(false);
  graphIndices.setFineGrainedLineageNotAllowedForPlatforms(List.of(TEST_HDFS_PLATFORM));

  final Urn hdfsUpstreamField = UrnUtils.getUrn(TEST_SCHEMA_FIELD_HDFS_FIELD_INFO);
  final Urn hiveDownstreamField = UrnUtils.getUrn(TEST_SCHEMA_FIELD_HIVE_FIELD_INFO);
  final Urn owner = UrnUtils.getUrn(TEST_DATASET_URN);

  final MetadataChangeLog changeLog =
      createUpstreamLineageMCL(
          List.of(hdfsUpstreamField),
          hiveDownstreamField,
          List.of(TEST_DATASET_URN_2),
          TEST_DATASET_URN);
  updateIndicesHook.invoke(changeLog);

  // The schemaField -> schemaField edge that would exist without the restriction.
  final Edge fieldEdge =
      new Edge(
          hiveDownstreamField,
          hdfsUpstreamField,
          DOWNSTREAM_OF,
          null,
          null,
          null,
          null,
          null,
          owner,
          null);

  Mockito.verify(mockGraphService, Mockito.never()).addEdge(Mockito.eq(fieldEdge));
  Mockito.verify(mockGraphService, Mockito.never())
      .removeEdgesFromNode(
          any(OperationContext.class),
          Mockito.eq(hiveDownstreamField),
          Mockito.eq(Set.of(DOWNSTREAM_OF)),
          Mockito.eq(
              newRelationshipFilter(
                  new Filter().setOr(new ConjunctiveCriterionArray()),
                  RelationshipDirection.OUTGOING)));
}
/**
 * With hdfs restricted and both upstream fields and the downstream field on hdfs, neither the
 * field-to-field edges nor the dataset-to-field edges are added, and no outgoing DownstreamOf
 * edges are removed from the downstream field.
 */
@Test
public void testFineGrainedLineageNotAllowed_When_upstream_downstream_type_hdfs()
    throws Exception {
  final UpdateGraphIndicesService graphIndices =
      updateIndicesService.getUpdateGraphIndicesService();
  graphIndices.setGraphDiffMode(false);
  graphIndices.setFineGrainedLineageNotAllowedForPlatforms(List.of(TEST_HDFS_PLATFORM));

  final Urn firstUpstreamField =
      UrnUtils.getUrn(
          "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleCypressHdfsDataset,PROD),field_foo_1)");
  final Urn secondUpstreamField =
      UrnUtils.getUrn(
          "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleCypressHdfsDataset,PROD),field_foo_2)");
  final Urn downstreamField =
      UrnUtils.getUrn(
          "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hdfs,CypressHdfsDataset,PROD),field_foo)");
  final Urn owner =
      UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hdfs,CypressHdfsDataset,PROD)");

  final MetadataChangeLog changeLog =
      createUpstreamLineageMCL(
          List.of(firstUpstreamField, secondUpstreamField),
          downstreamField,
          List.of(TEST_DATASET_URN_2),
          TEST_DATASET_URN_4);
  updateIndicesHook.invoke(changeLog);

  // schemaField -> schemaField edges, one per upstream field.
  final Edge firstFieldEdge =
      new Edge(
          downstreamField,
          firstUpstreamField,
          DOWNSTREAM_OF,
          null,
          null,
          null,
          null,
          null,
          owner,
          null);
  final Edge secondFieldEdge =
      new Edge(
          downstreamField,
          secondUpstreamField,
          DOWNSTREAM_OF,
          null,
          null,
          null,
          null,
          null,
          owner,
          null);

  // dataset -> schemaField edges, one per upstream field.
  final Urn downstreamDataset = Urn.createFromString(TEST_DATASET_URN_4);
  final Urn testActor = Urn.createFromString(TEST_ACTOR_URN);
  final Edge firstDatasetEdge =
      new Edge(
          downstreamDataset,
          firstUpstreamField,
          DOWNSTREAM_OF,
          0L,
          testActor,
          0L,
          testActor,
          null,
          null,
          null);
  final Edge secondDatasetEdge =
      new Edge(
          downstreamDataset,
          secondUpstreamField,
          DOWNSTREAM_OF,
          0L,
          testActor,
          0L,
          testActor,
          null,
          null,
          null);

  Mockito.verify(mockGraphService, Mockito.never()).addEdge(Mockito.eq(firstFieldEdge));
  Mockito.verify(mockGraphService, Mockito.never()).addEdge(Mockito.eq(secondFieldEdge));
  Mockito.verify(mockGraphService, Mockito.never()).addEdge(Mockito.eq(firstDatasetEdge));
  Mockito.verify(mockGraphService, Mockito.never()).addEdge(Mockito.eq(secondDatasetEdge));
  Mockito.verify(mockGraphService, Mockito.never())
      .removeEdgesFromNode(
          any(OperationContext.class),
          Mockito.eq(downstreamField),
          Mockito.eq(Set.of(DOWNSTREAM_OF)),
          Mockito.eq(
              newRelationshipFilter(
                  new Filter().setOr(new ConjunctiveCriterionArray()),
                  RelationshipDirection.OUTGOING)));
}
/**
 * With hdfs restricted, an hdfs downstream field and one hive plus one hdfs upstream field: no
 * field-to-field edge is added, the dataset-to-field edge is added exactly once for the hive
 * upstream and never for the hdfs upstream, and no outgoing DownstreamOf edges are removed from
 * the downstream field.
 */
@Test
public void testFineGrainedLineageNotAllowed_When_multiple_upstreams() throws Exception {
  final UpdateGraphIndicesService graphIndices =
      updateIndicesService.getUpdateGraphIndicesService();
  graphIndices.setGraphDiffMode(false);
  graphIndices.setFineGrainedLineageNotAllowedForPlatforms(List.of(TEST_HDFS_PLATFORM));

  final Urn hiveUpstreamField = UrnUtils.getUrn(TEST_SCHEMA_FIELD_HIVE_FIELD_INFO);
  final Urn hdfsUpstreamField = UrnUtils.getUrn(TEST_SCHEMA_FIELD_HDFS_FIELD_INFO);
  final Urn downstreamField =
      UrnUtils.getUrn(
          "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hdfs,CypressHdfsDataset,PROD),field_foo)");
  final Urn owner =
      UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hdfs,CypressHdfsDataset,PROD)");

  final MetadataChangeLog changeLog =
      createUpstreamLineageMCL(
          List.of(hiveUpstreamField, hdfsUpstreamField),
          downstreamField,
          List.of(TEST_DATASET_URN_2, TEST_DATASET_URN),
          TEST_DATASET_URN_4);
  updateIndicesHook.invoke(changeLog);

  // schemaField -> schemaField edges, one per upstream field.
  final Edge hiveFieldEdge =
      new Edge(
          downstreamField,
          hiveUpstreamField,
          DOWNSTREAM_OF,
          null,
          null,
          null,
          null,
          null,
          owner,
          null);
  final Edge hdfsFieldEdge =
      new Edge(
          downstreamField,
          hdfsUpstreamField,
          DOWNSTREAM_OF,
          null,
          null,
          null,
          null,
          null,
          owner,
          null);

  // dataset -> schemaField edges, one per upstream field.
  final Urn downstreamDataset = Urn.createFromString(TEST_DATASET_URN_4);
  final Urn testActor = Urn.createFromString(TEST_ACTOR_URN);
  final Edge hiveDatasetEdge =
      new Edge(
          downstreamDataset,
          hiveUpstreamField,
          DOWNSTREAM_OF,
          0L,
          testActor,
          0L,
          testActor,
          null,
          null,
          null);
  final Edge hdfsDatasetEdge =
      new Edge(
          downstreamDataset,
          hdfsUpstreamField,
          DOWNSTREAM_OF,
          0L,
          testActor,
          0L,
          testActor,
          null,
          null,
          null);

  Mockito.verify(mockGraphService, Mockito.never()).addEdge(Mockito.eq(hiveFieldEdge));
  Mockito.verify(mockGraphService, Mockito.never()).addEdge(Mockito.eq(hdfsFieldEdge));
  Mockito.verify(mockGraphService).addEdge(Mockito.eq(hiveDatasetEdge));
  Mockito.verify(mockGraphService, Mockito.never()).addEdge(Mockito.eq(hdfsDatasetEdge));
  Mockito.verify(mockGraphService, Mockito.never())
      .removeEdgesFromNode(
          any(OperationContext.class),
          Mockito.eq(downstreamField),
          Mockito.eq(Set.of(DOWNSTREAM_OF)),
          Mockito.eq(
              newRelationshipFilter(
                  new Filter().setOr(new ConjunctiveCriterionArray()),
                  RelationshipDirection.OUTGOING)));
}
/**
 * Graph diff mode with a previous aspect value and hdfs restricted: no field-to-field edge is
 * added for the new upstreams, the dataset-to-field edge is added exactly once for the new hive
 * upstream and never for the new hdfs upstream, and no outgoing DownstreamOf edges are removed
 * from the downstream field.
 */
@Test
public void testFineGrainedLineageNotAllowed_with_Old_aspect_and_graphDiffMode_true()
    throws Exception {
  final UpdateGraphIndicesService graphIndices =
      updateIndicesService.getUpdateGraphIndicesService();
  graphIndices.setGraphDiffMode(true);
  graphIndices.setFineGrainedLineageNotAllowedForPlatforms(List.of(TEST_HDFS_PLATFORM));

  final Urn newHiveUpstreamField =
      UrnUtils.getUrn(
          "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD),field_foo)");
  final Urn newHdfsUpstreamField =
      UrnUtils.getUrn(
          "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleCypressHdfsDataset,PROD),field_foo)");
  final Urn oldHiveUpstreamField =
      UrnUtils.getUrn(
          "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD),field_foo_1)");
  final Urn oldHdfsUpstreamField =
      UrnUtils.getUrn(
          "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleCypressHdfsDataset,PROD),field_foo_1)");
  final Urn downstreamField =
      UrnUtils.getUrn(
          "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hdfs,CypressHdfsDataset,PROD),field_foo)");
  final Urn owner =
      UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hdfs,CypressHdfsDataset,PROD)");

  final MetadataChangeLog changeLog =
      createUpstreamLineageMCLWithOldAspect(
          List.of(newHiveUpstreamField, newHdfsUpstreamField),
          List.of(oldHiveUpstreamField, oldHdfsUpstreamField),
          downstreamField,
          List.of(TEST_DATASET_URN_2, TEST_DATASET_URN),
          TEST_DATASET_URN_4);
  updateIndicesHook.invoke(changeLog);

  // schemaField -> schemaField edges for the new upstream fields.
  final Edge hiveFieldEdge =
      new Edge(
          downstreamField,
          newHiveUpstreamField,
          DOWNSTREAM_OF,
          null,
          null,
          null,
          null,
          null,
          owner,
          null);
  final Edge hdfsFieldEdge =
      new Edge(
          downstreamField,
          newHdfsUpstreamField,
          DOWNSTREAM_OF,
          null,
          null,
          null,
          null,
          null,
          owner,
          null);

  // dataset -> schemaField edges for the new upstream fields.
  final Urn downstreamDataset = Urn.createFromString(TEST_DATASET_URN_4);
  final Urn testActor = Urn.createFromString(TEST_ACTOR_URN);
  final Edge hiveDatasetEdge =
      new Edge(
          downstreamDataset,
          newHiveUpstreamField,
          DOWNSTREAM_OF,
          0L,
          testActor,
          0L,
          testActor,
          null,
          null,
          null);
  final Edge hdfsDatasetEdge =
      new Edge(
          downstreamDataset,
          newHdfsUpstreamField,
          DOWNSTREAM_OF,
          0L,
          testActor,
          0L,
          testActor,
          null,
          null,
          null);

  Mockito.verify(mockGraphService, Mockito.never()).addEdge(Mockito.eq(hiveFieldEdge));
  Mockito.verify(mockGraphService, Mockito.never()).addEdge(Mockito.eq(hdfsFieldEdge));
  Mockito.verify(mockGraphService).addEdge(Mockito.eq(hiveDatasetEdge));
  Mockito.verify(mockGraphService, Mockito.never()).addEdge(Mockito.eq(hdfsDatasetEdge));
  Mockito.verify(mockGraphService, Mockito.never())
      .removeEdgesFromNode(
          any(OperationContext.class),
          Mockito.eq(downstreamField),
          Mockito.eq(Set.of(DOWNSTREAM_OF)),
          Mockito.eq(
              newRelationshipFilter(
                  new Filter().setOr(new ConjunctiveCriterionArray()),
                  RelationshipDirection.OUTGOING)));
}
@Test
public void testUpdateIndexMappings() throws CloneNotSupportedException {
// ensure no mutation
@ -536,13 +904,23 @@ public class UpdateIndicesHookTest {
return mockSpec;
}
private MetadataChangeLog createUpstreamLineageMCL(Urn upstreamUrn, Urn downstreamUrn)
private MetadataChangeLog createUpstreamLineageMCL(
List<Urn> upstreamUrn,
Urn downstreamUrn,
List<String> upstreamDatasets,
String downstreamDataset)
throws Exception {
return createUpstreamLineageMCL(upstreamUrn, downstreamUrn, ChangeType.UPSERT);
return createUpstreamLineageMCL(
upstreamUrn, downstreamUrn, ChangeType.UPSERT, upstreamDatasets, downstreamDataset);
}
private MetadataChangeLog createUpstreamLineageMCL(
Urn upstreamUrn, Urn downstreamUrn, ChangeType changeType) throws Exception {
List<Urn> upstreamUrn,
Urn downstreamUrn,
ChangeType changeType,
List<String> upstreamDatasets,
String downstreamDataset)
throws Exception {
MetadataChangeLog event = new MetadataChangeLog();
event.setEntityType(Constants.DATASET_ENTITY_NAME);
event.setAspectName(Constants.UPSTREAM_LINEAGE_ASPECT_NAME);
@ -551,10 +929,10 @@ public class UpdateIndicesHookTest {
UpstreamLineage upstreamLineage = new UpstreamLineage();
FineGrainedLineageArray fineGrainedLineages = new FineGrainedLineageArray();
FineGrainedLineage fineGrainedLineage = new FineGrainedLineage();
fineGrainedLineage.setDownstreamType(FineGrainedLineageDownstreamType.FIELD);
fineGrainedLineage.setUpstreamType(FineGrainedLineageUpstreamType.DATASET);
fineGrainedLineage.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET);
fineGrainedLineage.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET);
UrnArray upstreamUrns = new UrnArray();
upstreamUrns.add(upstreamUrn);
upstreamUrns.addAll(upstreamUrn);
fineGrainedLineage.setUpstreams(upstreamUrns);
UrnArray downstreamUrns = new UrnArray();
downstreamUrns.add(downstreamUrn);
@ -562,25 +940,69 @@ public class UpdateIndicesHookTest {
fineGrainedLineages.add(fineGrainedLineage);
upstreamLineage.setFineGrainedLineages(fineGrainedLineages);
final UpstreamArray upstreamArray = new UpstreamArray();
final Upstream upstream = new Upstream();
upstream.setType(DatasetLineageType.TRANSFORMED);
upstream.setDataset(
DatasetUrn.createFromString(
"urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleCypressHdfsDataset,PROD)"));
upstreamArray.add(upstream);
for (String upstreamDataset : upstreamDatasets) {
final Upstream upstream = new Upstream();
upstream.setType(DatasetLineageType.TRANSFORMED);
upstream.setDataset(DatasetUrn.createFromString(upstreamDataset));
upstreamArray.add(upstream);
}
upstreamLineage.setUpstreams(upstreamArray);
event.setAspect(GenericRecordUtils.serializeAspect(upstreamLineage));
event.setEntityUrn(Urn.createFromString(TEST_DATASET_URN));
event.setEntityUrn(Urn.createFromString(downstreamDataset));
event.setEntityType(DATASET_ENTITY_NAME);
event.setCreated(new AuditStamp().setActor(actorUrn).setTime(EVENT_TIME));
return event;
}
/**
 * Creates an upstream-lineage change log through {@code createUpstreamLineageMCL} and sets its
 * previous aspect value to a second upstream lineage built from the old upstream schema fields.
 *
 * @param newUpstreamSchemaFieldUrn upstream schema fields for the new aspect
 * @param oldUpstreamSchemaFieldUrn upstream schema fields for the previous aspect value
 * @param downstreamUrn downstream schema field used in both aspects
 * @param upstreamDatasets dataset URN strings used as dataset-level upstreams in both aspects
 * @param downstreamDataset dataset URN string passed on to {@code createUpstreamLineageMCL}
 * @return the change log with the previous aspect value set
 */
private MetadataChangeLog createUpstreamLineageMCLWithOldAspect(
    List<Urn> newUpstreamSchemaFieldUrn,
    List<Urn> oldUpstreamSchemaFieldUrn,
    Urn downstreamUrn,
    List<String> upstreamDatasets,
    String downstreamDataset)
    throws Exception {
  final MetadataChangeLog changeLog =
      createUpstreamLineageMCL(
          newUpstreamSchemaFieldUrn, downstreamUrn, upstreamDatasets, downstreamDataset);

  // Previous fine-grained lineage: old upstream fields -> the same downstream field.
  final UrnArray previousUpstreamFields = new UrnArray();
  previousUpstreamFields.addAll(oldUpstreamSchemaFieldUrn);
  final UrnArray downstreamFields = new UrnArray();
  downstreamFields.add(downstreamUrn);

  final FineGrainedLineage previousFieldLineage = new FineGrainedLineage();
  previousFieldLineage.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET);
  previousFieldLineage.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET);
  previousFieldLineage.setUpstreams(previousUpstreamFields);
  previousFieldLineage.setDownstreams(downstreamFields);

  final FineGrainedLineageArray previousFieldLineages = new FineGrainedLineageArray();
  previousFieldLineages.add(previousFieldLineage);

  // Dataset-level upstreams, one TRANSFORMED entry per dataset URN string.
  final UpstreamArray datasetUpstreams = new UpstreamArray();
  for (String datasetUrn : upstreamDatasets) {
    final Upstream datasetUpstream = new Upstream();
    datasetUpstream.setType(DatasetLineageType.TRANSFORMED);
    datasetUpstream.setDataset(DatasetUrn.createFromString(datasetUrn));
    datasetUpstreams.add(datasetUpstream);
  }

  final UpstreamLineage previousLineage = new UpstreamLineage();
  previousLineage.setFineGrainedLineages(previousFieldLineages);
  previousLineage.setUpstreams(datasetUpstreams);

  changeLog.setPreviousAspectValue(GenericRecordUtils.serializeAspect(previousLineage));
  return changeLog;
}
private MetadataChangeLog createUpstreamLineageMCLUIPreProcessed(
Urn upstreamUrn, Urn downstreamUrn, ChangeType changeType) throws Exception {
List<Urn> upstreamUrn,
Urn downstreamUrn,
ChangeType changeType,
List<String> upstreamDataset,
String downstreamDataset)
throws Exception {
final MetadataChangeLog metadataChangeLog =
createUpstreamLineageMCL(upstreamUrn, downstreamUrn, changeType);
createUpstreamLineageMCL(
upstreamUrn, downstreamUrn, changeType, upstreamDataset, downstreamDataset);
final StringMap properties = new StringMap();
properties.put(APP_SOURCE, UI_SOURCE);
final SystemMetadata systemMetadata = new SystemMetadata().setProperties(properties);

View File

@ -658,6 +658,7 @@ featureFlags:
showProductUpdates: ${SHOW_PRODUCT_UPDATES:true} # Whether to show in-product update popover on new updates.
logicalModelsEnabled: ${LOGICAL_MODELS_ENABLED:false} # Enables logical models feature
showHomepageUserRole: ${SHOW_HOMEPAGE_USER_ROLE:false} # Enables displaying the homepage user role underneath the name. Only relevant for custom home page
fineGrainedLineageNotAllowedForPlatforms: ${FINE_GRAINED_LINEAGE_NOT_ALLOWED_FOR_PLATFORMS:} # Comma-separated list of platforms for which schemaField entity edges will not be created. For example: "hdfs,s3" (avoid spaces after the commas).
entityChangeEvents:
enabled: ${ENABLE_ENTITY_CHANGE_EVENTS_HOOK:true}

View File

@ -9,6 +9,7 @@ import com.linkedin.metadata.service.UpdateGraphIndicesService;
import com.linkedin.metadata.service.UpdateIndicesService;
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
import java.util.List;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
@ -46,10 +47,16 @@ public class UpdateIndicesServiceFactory {
TimeseriesAspectService timeseriesAspectService,
SystemMetadataService systemMetadataService,
SearchDocumentTransformer searchDocumentTransformer,
@Value("${elasticsearch.idHashAlgo}") final String idHashAlgo) {
@Value("${elasticsearch.idHashAlgo}") final String idHashAlgo,
@Value("#{'${featureFlags.fineGrainedLineageNotAllowedForPlatforms}'.split(',')}")
final List<String> fineGrainedLineageNotAllowedForPlatforms) {
return new UpdateIndicesService(
new UpdateGraphIndicesService(graphService, graphDiffMode, graphStatusEnabled),
new UpdateGraphIndicesService(
graphService,
graphDiffMode,
graphStatusEnabled,
fineGrainedLineageNotAllowedForPlatforms),
entitySearchService,
timeseriesAspectService,
systemMetadataService,
@ -69,11 +76,17 @@ public class UpdateIndicesServiceFactory {
final SystemMetadataService systemMetadataService,
final SearchDocumentTransformer searchDocumentTransformer,
final EntityService<?> entityService,
@Value("${elasticsearch.idHashAlgo}") final String idHashAlgo) {
@Value("${elasticsearch.idHashAlgo}") final String idHashAlgo,
@Value("#{'${featureFlags.fineGrainedLineageNotAllowedForPlatforms}'.split(',')}")
final List<String> fineGrainedLineageNotAllowedForPlatforms) {
UpdateIndicesService updateIndicesService =
new UpdateIndicesService(
new UpdateGraphIndicesService(graphService, graphDiffMode, graphStatusEnabled),
new UpdateGraphIndicesService(
graphService,
graphDiffMode,
graphStatusEnabled,
fineGrainedLineageNotAllowedForPlatforms),
entitySearchService,
timeseriesAspectService,
systemMetadataService,