fix(lineage): lineage incorrect for some entities (#13020)

This commit is contained in:
Chakru 2025-03-30 19:01:19 +05:30 committed by GitHub
parent d2dd54acf1
commit 04a8e305a2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 90 additions and 12 deletions

View File

@ -1,6 +1,5 @@
package com.linkedin.metadata.models.registry;
import com.google.common.collect.Streams;
import com.linkedin.metadata.graph.LineageDirection;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.models.annotation.RelationshipAnnotation;
@ -110,15 +109,30 @@ public class LineageRegistry {
/**
 * Collects the names of every entity type connected to {@code entityType} through lineage
 * edges, either directly or transitively through other entity types.
 *
 * <p>The search starts at {@code entityType} and repeatedly follows both the upstream and the
 * downstream edges of each newly found type until no unseen type remains. Types that have no
 * entry in the lineage spec map are kept in the result but are not expanded further.
 *
 * @param entityType the entity type to start from; it is lower-cased only for the spec lookup
 *     and is added to the result exactly as passed in
 * @return a mutable set containing {@code entityType} plus the registry name of every entity
 *     type reachable from it via lineage edges
 */
public Set<String> getEntitiesWithLineageToEntityType(String entityType) {
  Map<String, EntitySpec> specs = _entityRegistry.getEntitySpecs();

  Set<String> discoveredTypes = new HashSet<>();
  Set<String> typesToProcess = new HashSet<>();
  typesToProcess.add(entityType);

  // Breadth-first expansion: each pass handles one "ring" of types and gathers the next one.
  while (!typesToProcess.isEmpty()) {
    Set<String> nextBatch = new HashSet<>();
    for (String currentType : typesToProcess) {
      // add() returns false for a type that was already expanded, which also ends cycles
      // such as dataset -> dataset.
      if (!discoveredTypes.add(currentType)) {
        continue;
      }
      LineageSpec lineageSpec = _lineageSpecMap.get(currentType.toLowerCase());
      if (lineageSpec == null) {
        // No lineage edges are registered for this type, so there is nothing to follow.
        continue;
      }
      // NOTE(review): assumes every opposing entity type is present in the entity registry;
      // specs.get(...) would return null otherwise -- confirm against the registry loader.
      Stream.concat(
              lineageSpec.getDownstreamEdges().stream(), lineageSpec.getUpstreamEdges().stream())
          .map(EdgeInfo::getOpposingEntityType)
          // Map the edge's type back to the registry's name for that entity.
          .map(entity -> specs.get(entity.toLowerCase()).getName())
          .forEach(nextBatch::add);
    }
    typesToProcess = nextBatch;
  }
  return discoveredTypes;
}
public List<EdgeInfo> getLineageRelationships(String entityName, LineageDirection direction) {

View File

@ -15,6 +15,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.testng.annotations.Test;
public class LineageRegistryTest {
@ -78,6 +79,65 @@ public class LineageRegistryTest {
"Consumes", RelationshipDirection.INCOMING, "dataJob")));
}
/**
 * Builds a registry with three mocked entity specs (dataset, dataJob, chart) and checks that
 * {@code getEntitiesWithLineageToEntityType} returns all three names whichever of them is used
 * as the starting type.
 */
@Test
public void testGetEntitiesWithLineageToEntityType() {
  // dataset: relates to itself via "DownstreamOf" and to dataJob via "Produces".
  List<RelationshipFieldSpec> datasetEdges =
      ImmutableList.of(
          buildSpec("DownstreamOf", ImmutableList.of("dataset"), true, true),
          buildSpec("Produces", ImmutableList.of("dataJob"), false, true));
  EntitySpec datasetSpec = mock(EntitySpec.class);
  when(datasetSpec.getName()).thenReturn("dataset");
  when(datasetSpec.getRelationshipFieldSpecs()).thenReturn(datasetEdges);

  // dataJob: relates to dataset via both "Produces" and "Consumes".
  List<RelationshipFieldSpec> jobEdges =
      ImmutableList.of(
          buildSpec("Produces", ImmutableList.of("dataset"), false, true),
          buildSpec("Consumes", ImmutableList.of("dataset"), true, true));
  EntitySpec jobSpec = mock(EntitySpec.class);
  when(jobSpec.getName()).thenReturn("dataJob");
  when(jobSpec.getRelationshipFieldSpecs()).thenReturn(jobEdges);

  // chart: relates to dataset via "Consumes" only.
  List<RelationshipFieldSpec> chartEdges =
      ImmutableList.of(buildSpec("Consumes", ImmutableList.of("dataset"), true, true));
  EntitySpec chartSpec = mock(EntitySpec.class);
  when(chartSpec.getName()).thenReturn("chart");
  when(chartSpec.getRelationshipFieldSpecs()).thenReturn(chartEdges);

  // The spec map is keyed by lower-cased entity name.
  Map<String, EntitySpec> specsByKey = new HashMap<>();
  specsByKey.put("dataset", datasetSpec);
  specsByKey.put("datajob", jobSpec);
  specsByKey.put("chart", chartSpec);

  EntityRegistry registry = mock(EntityRegistry.class);
  when(registry.getEntitySpecs()).thenReturn(specsByKey);

  LineageRegistry lineageRegistry = new LineageRegistry(registry);

  // Whichever type the query starts from, the same three names are expected back.
  for (String startType : ImmutableList.of("dataset", "dataJob", "chart")) {
    Set<String> reachable = lineageRegistry.getEntitiesWithLineageToEntityType(startType);
    assertEquals(reachable.size(), 3);
    assertTrue(reachable.contains("dataset"));
    assertTrue(reachable.contains("dataJob"));
    assertTrue(reachable.contains("chart"));
  }
}
private RelationshipFieldSpec buildSpec(
String relationshipType,
List<String> destinationEntityTypes,

View File

@ -10,6 +10,7 @@ const second_degree = [
"urn:li:dashboard:(looker,cypress_baz)",
"urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD)",
"urn:li:mlPrimaryKey:(cypress-test-2,some-cypress-feature-2)",
"urn:li:mlModel:(urn:li:dataPlatform:sagemaker,cypress-model,PROD)",
];
const third_degree_plus = [
"urn:li:dataJob:(urn:li:dataFlow:(airflow,cypress_dag_abc,PROD),cypress_task_123)",
@ -18,6 +19,7 @@ const third_degree_plus = [
"urn:li:dataset:(urn:li:dataPlatform:hive,fct_cypress_users_created,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:hive,fct_cypress_users_created_no_tag,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:hive,fct_cypress_users_deleted,PROD)",
"urn:li:mlModelGroup:(urn:li:dataPlatform:sagemaker,cypress-model-package-group,PROD)",
];
const downloadCsvFile = (filename) => {
cy.get('[data-testid="three-dot-menu"]').click();
@ -55,7 +57,7 @@ describe("download lineage results to .csv file", () => {
// Verify 1st and 2nd degree of dependencies
cy.get('[data-testid="facet-degree-2"]').click().wait(5000);
cy.contains(/1 - [7-8] of [7-8]/);
cy.contains(/1 - [8-9] of [8-9]/);
downloadCsvFile("second_degree_results.csv");
const second_degree_csv = cy.readFile(
"cypress/downloads/second_degree_results.csv",

View File

@ -10,6 +10,7 @@ const second_degree = [
"urn:li:dashboard:(looker,cypress_baz)",
"urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD)",
"urn:li:mlPrimaryKey:(cypress-test-2,some-cypress-feature-2)",
"urn:li:mlModel:(urn:li:dataPlatform:sagemaker,cypress-model,PROD)",
];
const third_degree_plus = [
"urn:li:dataJob:(urn:li:dataFlow:(airflow,cypress_dag_abc,PROD),cypress_task_123)",
@ -18,6 +19,7 @@ const third_degree_plus = [
"urn:li:dataset:(urn:li:dataPlatform:hive,fct_cypress_users_created,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:hive,fct_cypress_users_created_no_tag,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:hive,fct_cypress_users_deleted,PROD)",
"urn:li:mlModelGroup:(urn:li:dataPlatform:sagemaker,cypress-model-package-group,PROD)",
];
const downloadCsvFile = (filename) => {
cy.get(".ant-list-items").should("be.visible");
@ -57,7 +59,7 @@ describe("download lineage results to .csv file", () => {
// Verify 1st and 2nd degree of dependencies
cy.get('[data-testid="facet-degree-2"]').click().wait(5000);
cy.contains(/1 - [7-8] of [7-8]/);
cy.contains(/1 - [8-9] of [8-9]/);
downloadCsvFile("second_degree_results.csv");
const second_degree_csv = cy.readFile(
"cypress/downloads/second_degree_results.csv",