mirror of
https://github.com/datahub-project/datahub.git
synced 2025-08-31 12:52:13 +00:00
fix(lineage) Filter dataset -> dataset lineage edges if data is transformed (#4732)
Co-authored-by: Chris Collins <chriscollins@Chriss-MBP-73.lan>
This commit is contained in:
parent
9d1168317c
commit
0e6dc9edae
@ -32,6 +32,7 @@ import { GetGlossaryTermDocument, GetGlossaryTermQuery } from './graphql/glossar
|
||||
import { GetEntityCountsDocument } from './graphql/app.generated';
|
||||
import { GetMeDocument } from './graphql/me.generated';
|
||||
import { ListRecommendationsDocument } from './graphql/recommendations.generated';
|
||||
import { FetchedEntity } from './app/lineage/types';
|
||||
|
||||
const user1 = {
|
||||
username: 'sdas',
|
||||
@ -115,7 +116,7 @@ const dataPlatform = {
|
||||
},
|
||||
};
|
||||
|
||||
const dataset1 = {
|
||||
export const dataset1 = {
|
||||
urn: 'urn:li:dataset:1',
|
||||
type: EntityType.Dataset,
|
||||
platform: {
|
||||
@ -208,7 +209,7 @@ const dataset1 = {
|
||||
deprecation: null,
|
||||
};
|
||||
|
||||
const dataset2 = {
|
||||
export const dataset2 = {
|
||||
urn: 'urn:li:dataset:2',
|
||||
type: EntityType.Dataset,
|
||||
platform: {
|
||||
@ -231,6 +232,7 @@ const dataset2 = {
|
||||
name: 'Some Other Dataset',
|
||||
description: 'This is some other dataset, so who cares!',
|
||||
customProperties: [],
|
||||
origin: 'PROD',
|
||||
},
|
||||
editableProperties: null,
|
||||
created: {
|
||||
@ -1055,8 +1057,8 @@ export const dataJob1 = {
|
||||
editableProperties: null,
|
||||
inputOutput: {
|
||||
__typename: 'DataJobInputOutput',
|
||||
inputDatasets: [dataset3],
|
||||
outputDatasets: [dataset3],
|
||||
inputDatasets: [dataset5],
|
||||
outputDatasets: [dataset6],
|
||||
inputDatajobs: [],
|
||||
},
|
||||
globalTags: {
|
||||
@ -1298,6 +1300,28 @@ export const mlModel = {
|
||||
deprecation: null,
|
||||
} as MlModel;
|
||||
|
||||
export const dataset1FetchedEntity = {
|
||||
urn: dataset1.urn,
|
||||
name: dataset1.name,
|
||||
type: dataset1.type,
|
||||
upstreamChildren: [],
|
||||
downstreamChildren: [
|
||||
{ type: EntityType.Dataset, entity: dataset2 },
|
||||
{ type: EntityType.DataJob, entity: dataJob1 },
|
||||
],
|
||||
} as FetchedEntity;
|
||||
|
||||
export const dataset2FetchedEntity = {
|
||||
urn: dataset2.urn,
|
||||
name: 'test name',
|
||||
type: dataset2.type,
|
||||
upstreamChildren: [
|
||||
{ type: EntityType.Dataset, entity: dataset1 },
|
||||
{ type: EntityType.DataJob, entity: dataJob1 },
|
||||
],
|
||||
downstreamChildren: [],
|
||||
} as FetchedEntity;
|
||||
|
||||
export const mlModelGroup = {
|
||||
__typename: 'MLModelGroup',
|
||||
urn: 'urn:li:mlModelGroup:(urn:li:dataPlatform:sagemaker,another-group,PROD)',
|
||||
|
@ -0,0 +1,70 @@
|
||||
import { dataset1, dataset2, dataJob1, dataset1FetchedEntity, dataset2FetchedEntity } from '../../../Mocks';
|
||||
import { EntityType } from '../../../types.generated';
|
||||
import { Direction, EntityAndType, FetchedEntity } from '../types';
|
||||
import { shouldIncludeChildEntity } from '../utils/constructFetchedNode';
|
||||
|
||||
describe('shouldIncludeChildEntity', () => {
|
||||
const parentChildren = [
|
||||
{ entity: dataset1, type: dataset1.type },
|
||||
{ entity: dataJob1, type: dataJob1.type },
|
||||
] as EntityAndType[];
|
||||
|
||||
it('should return false if parent and child are datasets and the child has a datajob child that belongs to the parent children', () => {
|
||||
const shouldBeIncluded = shouldIncludeChildEntity(
|
||||
Direction.Upstream,
|
||||
parentChildren,
|
||||
dataset1FetchedEntity,
|
||||
dataset2FetchedEntity,
|
||||
);
|
||||
|
||||
expect(shouldBeIncluded).toBe(false);
|
||||
});
|
||||
|
||||
it('should return true if the datajob is not a child of the parent', () => {
|
||||
const parentChild = [{ entity: dataset1, type: dataset1.type }] as EntityAndType[];
|
||||
const shouldBeIncluded = shouldIncludeChildEntity(
|
||||
Direction.Upstream,
|
||||
parentChild,
|
||||
dataset1FetchedEntity,
|
||||
dataset2FetchedEntity,
|
||||
);
|
||||
|
||||
expect(shouldBeIncluded).toBe(true);
|
||||
});
|
||||
|
||||
it('should return true if either parent or child is not a dataset', () => {
|
||||
const fetchedDatajobEntity = { ...dataset1FetchedEntity, type: EntityType.DataJob };
|
||||
let shouldBeIncluded = shouldIncludeChildEntity(
|
||||
Direction.Upstream,
|
||||
parentChildren,
|
||||
fetchedDatajobEntity,
|
||||
dataset2FetchedEntity,
|
||||
);
|
||||
expect(shouldBeIncluded).toBe(true);
|
||||
|
||||
const fetchedDashboardEntity = { ...dataset2FetchedEntity, type: EntityType.Dashboard };
|
||||
shouldBeIncluded = shouldIncludeChildEntity(
|
||||
Direction.Upstream,
|
||||
parentChildren,
|
||||
dataset1FetchedEntity,
|
||||
fetchedDashboardEntity,
|
||||
);
|
||||
expect(shouldBeIncluded).toBe(true);
|
||||
});
|
||||
|
||||
it('should return true if the parent has a datajob child that is not a child of the dataset child', () => {
|
||||
const updatedDataset1FetchedEntity = {
|
||||
...dataset1FetchedEntity,
|
||||
downstreamChildren: [{ type: EntityType.Dataset, entity: dataset2 }],
|
||||
} as FetchedEntity;
|
||||
|
||||
const shouldBeIncluded = shouldIncludeChildEntity(
|
||||
Direction.Upstream,
|
||||
parentChildren,
|
||||
updatedDataset1FetchedEntity,
|
||||
dataset2FetchedEntity,
|
||||
);
|
||||
|
||||
expect(shouldBeIncluded).toBe(true);
|
||||
});
|
||||
});
|
@ -1,4 +1,5 @@
|
||||
import {
|
||||
dataJob1,
|
||||
dataset3,
|
||||
dataset3WithLineage,
|
||||
dataset4,
|
||||
@ -7,7 +8,7 @@ import {
|
||||
dataset5WithLineage,
|
||||
dataset6WithLineage,
|
||||
} from '../../../Mocks';
|
||||
import { EntityType } from '../../../types.generated';
|
||||
import { EntityType, RelationshipDirection } from '../../../types.generated';
|
||||
import { getTestEntityRegistry } from '../../../utils/test-utils/TestPageContainer';
|
||||
import { Direction, FetchedEntities } from '../types';
|
||||
import constructTree from '../utils/constructTree';
|
||||
@ -299,4 +300,89 @@ describe('constructTree', () => {
|
||||
],
|
||||
});
|
||||
});
|
||||
|
||||
it('should not include a Dataset as a child if that Dataset has a Datajob child which points to the parent', () => {
|
||||
// dataset6 is downstream of dataset5 and datajob1, datajob 1 is downstream of dataset 5
|
||||
const updatedDataset6WithLineage = {
|
||||
...dataset6WithLineage,
|
||||
downstream: null,
|
||||
upstream: {
|
||||
start: 0,
|
||||
count: 2,
|
||||
total: 2,
|
||||
relationships: [
|
||||
{
|
||||
type: 'DownstreamOf',
|
||||
direction: RelationshipDirection.Incoming,
|
||||
entity: dataset5,
|
||||
},
|
||||
{
|
||||
type: 'DownstreamOf',
|
||||
direction: RelationshipDirection.Incoming,
|
||||
entity: dataJob1,
|
||||
},
|
||||
],
|
||||
},
|
||||
};
|
||||
const updatedDataset5WithLineage = {
|
||||
...dataset5WithLineage,
|
||||
downstream: {
|
||||
...dataset5WithLineage.downstream,
|
||||
relationships: [
|
||||
...dataset5WithLineage.downstream.relationships,
|
||||
{
|
||||
type: 'DownstreamOf',
|
||||
direction: RelationshipDirection.Outgoing,
|
||||
entity: dataJob1,
|
||||
},
|
||||
],
|
||||
},
|
||||
};
|
||||
const fetchedEntities = [
|
||||
{ entity: updatedDataset5WithLineage, direction: Direction.Upstream, fullyFetched: true },
|
||||
{ entity: dataJob1, direction: Direction.Upstream, fullyFetched: true },
|
||||
];
|
||||
const mockFetchedEntities = fetchedEntities.reduce(
|
||||
(acc, entry) =>
|
||||
extendAsyncEntities(
|
||||
acc,
|
||||
testEntityRegistry,
|
||||
{ entity: entry.entity, type: entry.entity.type },
|
||||
entry.fullyFetched,
|
||||
),
|
||||
{} as FetchedEntities,
|
||||
);
|
||||
expect(
|
||||
constructTree(
|
||||
{ entity: updatedDataset6WithLineage, type: EntityType.Dataset },
|
||||
mockFetchedEntities,
|
||||
Direction.Upstream,
|
||||
testEntityRegistry,
|
||||
),
|
||||
).toEqual({
|
||||
name: 'Display Name of Sixth',
|
||||
expandedName: 'Fully Qualified Name of Sixth Test Dataset',
|
||||
urn: 'urn:li:dataset:6',
|
||||
type: EntityType.Dataset,
|
||||
unexploredChildren: 0,
|
||||
icon: undefined,
|
||||
platform: 'Kafka',
|
||||
subtype: undefined,
|
||||
children: [
|
||||
{
|
||||
name: 'DataJobInfoName',
|
||||
expandedName: undefined,
|
||||
type: EntityType.DataJob,
|
||||
unexploredChildren: 0,
|
||||
urn: dataJob1.urn,
|
||||
children: [],
|
||||
countercurrentChildrenUrns: [],
|
||||
icon: '',
|
||||
status: null,
|
||||
platform: 'Airflow',
|
||||
subtype: undefined,
|
||||
},
|
||||
],
|
||||
});
|
||||
});
|
||||
});
|
||||
|
@ -1,4 +1,28 @@
|
||||
import { Direction, FetchedEntities, NodeData } from '../types';
|
||||
import { EntityType } from '../../../types.generated';
|
||||
import { Direction, EntityAndType, FetchedEntities, FetchedEntity, NodeData } from '../types';
|
||||
|
||||
// If there are nodes A, B, C and A -> B, B -> C, A -> C, where A and C are Datasets and B is a DataJob, we don't want to show edge A -> C
|
||||
export function shouldIncludeChildEntity(
|
||||
direction: Direction,
|
||||
parentChildren?: EntityAndType[],
|
||||
childEntity?: FetchedEntity | null,
|
||||
parentEntity?: FetchedEntity,
|
||||
) {
|
||||
if (
|
||||
parentEntity?.type === EntityType.Dataset &&
|
||||
childEntity?.type === EntityType.Dataset &&
|
||||
childEntity &&
|
||||
parentChildren
|
||||
) {
|
||||
// we want the children of this child entity in the opposite direction of the parent to see if we connect back to the parent
|
||||
const childrenKey = direction === Direction.Upstream ? 'downstreamChildren' : 'upstreamChildren';
|
||||
return !childEntity[childrenKey]?.some(
|
||||
(child) =>
|
||||
child.type === EntityType.DataJob && parentChildren.some((c) => c.entity.urn === child.entity.urn),
|
||||
);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
export default function constructFetchedNode(
|
||||
urn: string,
|
||||
@ -18,6 +42,8 @@ export default function constructFetchedNode(
|
||||
return constructedNodes[urn];
|
||||
}
|
||||
|
||||
const childrenKey = direction === Direction.Upstream ? 'upstreamChildren' : 'downstreamChildren';
|
||||
|
||||
if (fetchedNode && !constructedNodes[urn]) {
|
||||
const node: NodeData = {
|
||||
name: fetchedNode.name,
|
||||
@ -27,9 +53,7 @@ export default function constructFetchedNode(
|
||||
subtype: fetchedNode.subtype,
|
||||
icon: fetchedNode.icon,
|
||||
unexploredChildren:
|
||||
fetchedNode?.[direction === Direction.Upstream ? 'upstreamChildren' : 'downstreamChildren']?.filter(
|
||||
(childUrn) => !(childUrn.entity.urn in fetchedEntities),
|
||||
).length || 0,
|
||||
fetchedNode?.[childrenKey]?.filter((childUrn) => !(childUrn.entity.urn in fetchedEntities)).length || 0,
|
||||
countercurrentChildrenUrns:
|
||||
fetchedNode?.[direction === Direction.Downstream ? 'upstreamChildren' : 'downstreamChildren']?.map(
|
||||
(child) => child.entity.urn,
|
||||
@ -43,7 +67,7 @@ export default function constructFetchedNode(
|
||||
constructedNodes[urn] = node;
|
||||
|
||||
node.children =
|
||||
(fetchedNode?.[direction === Direction.Upstream ? 'upstreamChildren' : 'downstreamChildren']
|
||||
(fetchedNode?.[childrenKey]
|
||||
?.map((child) => {
|
||||
if (child.entity.urn === node.urn) {
|
||||
return null;
|
||||
@ -56,6 +80,11 @@ export default function constructFetchedNode(
|
||||
newConstructionPath,
|
||||
);
|
||||
})
|
||||
?.filter((child) => {
|
||||
const childEntity = fetchedEntities[child?.urn || ''];
|
||||
const parentChildren = fetchedNode[childrenKey];
|
||||
return shouldIncludeChildEntity(direction, parentChildren, childEntity, fetchedNode);
|
||||
})
|
||||
.filter(Boolean) as Array<NodeData>) || [];
|
||||
|
||||
return node;
|
||||
|
@ -1,6 +1,6 @@
|
||||
import EntityRegistry from '../../entity/EntityRegistry';
|
||||
import { Direction, EntityAndType, FetchedEntities, NodeData } from '../types';
|
||||
import constructFetchedNode from './constructFetchedNode';
|
||||
import constructFetchedNode, { shouldIncludeChildEntity } from './constructFetchedNode';
|
||||
|
||||
export default function constructTree(
|
||||
entityAndType: EntityAndType | null | undefined,
|
||||
@ -41,6 +41,10 @@ export default function constructTree(
|
||||
root.urn || '',
|
||||
]);
|
||||
})
|
||||
?.filter((child) => {
|
||||
const childEntity = fetchedEntities[child?.urn || ''];
|
||||
return shouldIncludeChildEntity(direction, children, childEntity, fetchedEntity);
|
||||
})
|
||||
?.filter(Boolean) as Array<NodeData>;
|
||||
return root;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user