feat(ui): Add ability to view column-level lineage (CLL) through DataJobs in lineage visualization (#8281)

This commit is contained in:
Chris Collins 2023-06-25 13:41:22 -04:00 committed by GitHub
parent a4ab7672ac
commit 1343082535
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 251 additions and 51 deletions

View File

@ -0,0 +1,53 @@
package com.linkedin.datahub.graphql.types.common.mappers;
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.generated.FineGrainedLineage;
import com.linkedin.datahub.graphql.generated.SchemaFieldRef;
import com.linkedin.dataset.FineGrainedLineageArray;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import static com.linkedin.metadata.Constants.SCHEMA_FIELD_ENTITY_NAME;
/**
 * Converts a {@link FineGrainedLineageArray} into a list of GraphQL {@link FineGrainedLineage} objects.
 *
 * <p>For every source entry, the upstream and downstream urn lists are copied over (when present),
 * keeping only urns whose entity type equals {@code SCHEMA_FIELD_ENTITY_NAME}.
 */
public class FineGrainedLineagesMapper {

  public static final FineGrainedLineagesMapper INSTANCE = new FineGrainedLineagesMapper();

  /**
   * Static convenience entry point that delegates to {@link #apply} on the shared {@link #INSTANCE}.
   *
   * @param fineGrainedLineages the source lineage entries to convert
   * @return one GraphQL lineage object per source entry, in the same order
   */
  public static List<FineGrainedLineage> map(@Nonnull final FineGrainedLineageArray fineGrainedLineages) {
    return INSTANCE.apply(fineGrainedLineages);
  }

  /**
   * Converts each source entry into its GraphQL counterpart.
   *
   * @param fineGrainedLineages the source lineage entries to convert
   * @return a new mutable list; empty when the input has no entries
   */
  public List<com.linkedin.datahub.graphql.generated.FineGrainedLineage> apply(@Nonnull final FineGrainedLineageArray fineGrainedLineages) {
    final List<FineGrainedLineage> mapped = new ArrayList<>();
    // Iterating an empty array simply yields the empty list, so no separate size check is needed.
    for (com.linkedin.dataset.FineGrainedLineage source : fineGrainedLineages) {
      mapped.add(mapEntry(source));
    }
    return mapped;
  }

  /**
   * Builds a single GraphQL lineage object. Upstreams/downstreams are only set when the source
   * entry reports having them; otherwise the corresponding field is left untouched.
   */
  private static FineGrainedLineage mapEntry(final com.linkedin.dataset.FineGrainedLineage source) {
    final FineGrainedLineage target = new FineGrainedLineage();
    if (source.hasUpstreams()) {
      target.setUpstreams(toSchemaFieldRefs(source.getUpstreams()));
    }
    if (source.hasDownstreams()) {
      target.setDownstreams(toSchemaFieldRefs(source.getDownstreams()));
    }
    return target;
  }

  /** Keeps only the schema-field urns from the given list and converts each to a {@link SchemaFieldRef}. */
  private static List<SchemaFieldRef> toSchemaFieldRefs(final List<Urn> urns) {
    return urns.stream()
        .filter(urn -> urn.getEntityType().equals(SCHEMA_FIELD_ENTITY_NAME))
        .map(FineGrainedLineagesMapper::mapDatasetSchemaField)
        .collect(Collectors.toList());
  }

  /**
   * Builds a {@link SchemaFieldRef} from the first two parts of the urn's entity key.
   * NOTE(review): presumably part 0 is the parent entity urn and part 1 the field path - confirm
   * against the SchemaFieldRef constructor.
   */
  private static SchemaFieldRef mapDatasetSchemaField(final Urn schemaFieldUrn) {
    final String firstKeyPart = schemaFieldUrn.getEntityKey().get(0);
    final String secondKeyPart = schemaFieldUrn.getEntityKey().get(1);
    return new SchemaFieldRef(firstKeyPart, secondKeyPart);
  }
}

View File

@ -1,11 +1,7 @@
package com.linkedin.datahub.graphql.types.common.mappers;
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.generated.SchemaFieldRef;
import com.linkedin.dataset.FineGrainedLineage;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
@ -23,31 +19,10 @@ public class UpstreamLineagesMapper {
}
public List<com.linkedin.datahub.graphql.generated.FineGrainedLineage> apply(@Nonnull final com.linkedin.dataset.UpstreamLineage upstreamLineage) {
final List<com.linkedin.datahub.graphql.generated.FineGrainedLineage> result = new ArrayList<>();
if (!upstreamLineage.hasFineGrainedLineages()) {
return result;
if (!upstreamLineage.hasFineGrainedLineages() || upstreamLineage.getFineGrainedLineages() == null) {
return new ArrayList<>();
}
for (FineGrainedLineage fineGrainedLineage : upstreamLineage.getFineGrainedLineages()) {
com.linkedin.datahub.graphql.generated.FineGrainedLineage resultEntry = new com.linkedin.datahub.graphql.generated.FineGrainedLineage();
if (fineGrainedLineage.hasUpstreams()) {
resultEntry.setUpstreams(fineGrainedLineage.getUpstreams().stream()
.filter(entry -> entry.getEntityType().equals("schemaField"))
.map(entry -> mapDatasetSchemaField(entry)).collect(
Collectors.toList()));
}
if (fineGrainedLineage.hasDownstreams()) {
resultEntry.setDownstreams(fineGrainedLineage.getDownstreams().stream()
.filter(entry -> entry.getEntityType().equals("schemaField"))
.map(entry -> mapDatasetSchemaField(entry)).collect(
Collectors.toList()));
}
result.add(resultEntry);
}
return result;
}
private static SchemaFieldRef mapDatasetSchemaField(final Urn schemaFieldUrn) {
return new SchemaFieldRef(schemaFieldUrn.getEntityKey().get(0), schemaFieldUrn.getEntityKey().get(1));
return FineGrainedLineagesMapper.map(upstreamLineage.getFineGrainedLineages());
}
}

View File

@ -22,6 +22,7 @@ import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.datahub.graphql.types.common.mappers.BrowsePathsV2Mapper;
import com.linkedin.datahub.graphql.types.common.mappers.DataPlatformInstanceAspectMapper;
import com.linkedin.datahub.graphql.types.common.mappers.DeprecationMapper;
import com.linkedin.datahub.graphql.types.common.mappers.FineGrainedLineagesMapper;
import com.linkedin.datahub.graphql.types.common.mappers.InstitutionalMemoryMapper;
import com.linkedin.datahub.graphql.types.common.mappers.OwnershipMapper;
import com.linkedin.datahub.graphql.types.common.mappers.StatusMapper;
@ -170,6 +171,10 @@ public class DataJobMapper implements ModelMapper<EntityResponse, DataJob> {
result.setInputDatajobs(ImmutableList.of());
}
if (inputOutput.hasFineGrainedLineages() && inputOutput.getFineGrainedLineages() != null) {
result.setFineGrainedLineages(FineGrainedLineagesMapper.map(inputOutput.getFineGrainedLineages()));
}
return result;
}
}

View File

@ -1382,7 +1382,9 @@ type Dataset implements EntityWithRelationships & Entity & BrowsableEntity {
siblings: SiblingProperties
"""
fine grained lineage
Column-level lineage information. Includes a list of objects detailing
which columns are upstream and which are downstream of each other.
Both the upstream and downstream columns belong to datasets.
"""
fineGrainedLineages: [FineGrainedLineage!]
@ -5648,11 +5650,9 @@ type DataJob implements EntityWithRelationships & Entity & BrowsableEntity {
info: DataJobInfo @deprecated
"""
Deprecated, use relationship Produces, Consumes, DownstreamOf instead
Information about the inputs and outputs of a Data processing job
Information about the inputs and outputs of a Data processing job including column-level lineage.
"""
inputOutput: DataJobInputOutput @deprecated
inputOutput: DataJobInputOutput
"""
Deprecated, use the tags field instead
@ -5877,6 +5877,13 @@ type DataJobInputOutput {
Input datajobs that this data job depends on
"""
inputDatajobs: [DataJob!] @deprecated
"""
Column-level lineage information. Includes a list of objects detailing
which columns are upstream and which are downstream of each other.
Both the upstream and downstream columns belong to datasets.
"""
fineGrainedLineages: [FineGrainedLineage!]
}
"""

View File

@ -36,6 +36,7 @@ import {
Embed,
FabricType,
BrowsePathV2,
DataJobInputOutput,
} from '../../../types.generated';
import { FetchedEntity } from '../../lineage/types';
@ -109,6 +110,7 @@ export type GenericEntityProperties = {
exists?: boolean;
origin?: Maybe<FabricType>;
browsePathV2?: Maybe<BrowsePathV2>;
inputOutput?: Maybe<DataJobInputOutput>;
};
export type GenericEntityUpdate = {

View File

@ -153,7 +153,9 @@ export function getFineGrainedLineageWithSiblings(
entityData: GenericEntityProperties | null,
getGenericEntityProperties: (type: EntityType, data: Entity) => GenericEntityProperties | null,
) {
const fineGrainedLineages = [...(entityData?.fineGrainedLineages || [])];
const fineGrainedLineages = [
...(entityData?.fineGrainedLineages || entityData?.inputOutput?.fineGrainedLineages || []),
];
entityData?.siblings?.siblings?.forEach((sibling) => {
if (sibling) {
const genericSiblingProps = getGenericEntityProperties(sibling.type, sibling);

View File

@ -51,7 +51,7 @@ export type FetchedEntity = {
platform?: DataPlatform;
status?: Maybe<Status>;
siblingPlatforms?: Maybe<DataPlatform[]>;
fineGrainedLineages?: [FineGrainedLineage];
fineGrainedLineages?: FineGrainedLineage[];
siblings?: Maybe<SiblingProperties>;
schemaMetadata?: SchemaMetadata;
inputFields?: InputFields;

View File

@ -2,8 +2,12 @@ import {
decodeSchemaField,
encodeSchemaField,
getFieldPathFromSchemaFieldUrn,
getPopulatedColumnsByUrn,
getSourceUrnFromSchemaFieldUrn,
} from '../columnLineageUtils';
import { dataJob1, dataset1, dataset2 } from '../../../../Mocks';
import { FetchedEntity } from '../../types';
import { FineGrainedLineage, SchemaFieldDataType } from '../../../../types.generated';
describe('getSourceUrnFromSchemaFieldUrn', () => {
it('should get the source urn for a chart schemaField', () => {
@ -82,3 +86,43 @@ describe('encodeSchemaField', () => {
expect(decodedSchemaField).toBe(schemaField);
});
});
describe('getPopulatedColumnsByUrn', () => {
    it('should update columns by urn with data job fine grained data so that the data job appears to have the upstream columns', () => {
        // Two lineage entries, each pointing from a dataset1 column to a dataset2 column.
        const fineGrainedLineages = [
            {
                upstreams: [{ urn: dataset1.urn, path: 'test1' }],
                downstreams: [{ urn: dataset2.urn, path: 'test2' }],
            },
            {
                upstreams: [{ urn: dataset1.urn, path: 'test3' }],
                downstreams: [{ urn: dataset2.urn, path: 'test4' }],
            },
        ] as FineGrainedLineage[];
        const dataJobWithCLL = { ...dataJob1, name: '', fineGrainedLineages };
        const fetchedEntities = { [dataJobWithCLL.urn]: dataJobWithCLL as FetchedEntity };

        const columnsByUrn = getPopulatedColumnsByUrn({}, fetchedEntities);

        // Shape expected for every field listed under the data job.
        const expectedField = (fieldPath: string) => ({
            fieldPath,
            nullable: false,
            recursive: false,
            type: SchemaFieldDataType.String,
        });
        // Only the upstream paths should be listed under the data job's urn.
        expect(columnsByUrn).toMatchObject({
            [dataJobWithCLL.urn]: [expectedField('test1'), expectedField('test3')],
        });
    });
});

View File

@ -0,0 +1,39 @@
import { dataJob1, dataset1, dataset2 } from '../../../../Mocks';
import { FetchedEntity } from '../../types';
import { FineGrainedLineage } from '../../../../types.generated';
import { extendColumnLineage } from '../extendAsyncEntities';
describe('extendColumnLineage', () => {
    it('should update fineGrainedMap to draw lines from downstream and upstream datasets with a datajob in between', () => {
        // Two lineage entries, each pointing from a dataset1 column to a dataset2 column.
        const fineGrainedLineages = [
            {
                upstreams: [{ urn: dataset1.urn, path: 'test1' }],
                downstreams: [{ urn: dataset2.urn, path: 'test2' }],
            },
            {
                upstreams: [{ urn: dataset1.urn, path: 'test3' }],
                downstreams: [{ urn: dataset2.urn, path: 'test4' }],
            },
        ] as FineGrainedLineage[];
        const dataJobWithCLL = { ...dataJob1, name: '', fineGrainedLineages };
        const fetchedEntities = { [dataJobWithCLL.urn]: dataJobWithCLL as FetchedEntity };
        const fineGrainedMap = { forward: {}, reverse: {} };

        // extendColumnLineage fills fineGrainedMap in place.
        extendColumnLineage(dataJobWithCLL, fineGrainedMap, {}, fetchedEntities);

        // forward: source urn -> source field -> target urn -> target fields
        const expectedForward = {
            [dataJob1.urn]: { test1: { [dataset2.urn]: ['test2'] }, test3: { [dataset2.urn]: ['test4'] } },
            [dataset1.urn]: { test1: { [dataJob1.urn]: ['test1'] }, test3: { [dataJob1.urn]: ['test3'] } },
        };
        // reverse: target urn -> target field -> source urn -> source fields
        const expectedReverse = {
            [dataJob1.urn]: { test1: { [dataset1.urn]: ['test1'] }, test3: { [dataset1.urn]: ['test3'] } },
            [dataset2.urn]: { test4: { [dataJob1.urn]: ['test3'] }, test2: { [dataJob1.urn]: ['test1'] } },
        };
        expect(fineGrainedMap).toMatchObject({ forward: expectedForward, reverse: expectedReverse });
    });
});

View File

@ -1,5 +1,5 @@
import { ColumnEdge, FetchedEntity, NodeData } from '../types';
import { InputFields, SchemaField } from '../../../types.generated';
import { EntityType, InputFields, SchemaField, SchemaFieldDataType } from '../../../types.generated';
import { downgradeV2FieldPath } from '../../entity/dataset/profile/schema/utils/utils';
export function getHighlightedColumnsForNode(highlightedEdges: ColumnEdge[], fields: SchemaField[], nodeUrn: string) {
@ -63,10 +63,15 @@ export function convertInputFieldsToSchemaFields(inputFields?: InputFields) {
return inputFields?.fields?.map((field) => field?.schemaField) as SchemaField[] | undefined;
}
export function populateColumnsByUrn(
/*
* Populate a columnsByUrn map with a list of columns per entity in the order that they will appear.
* We need columnsByUrn in order to ensure that an entity does have a column that lineage data is
* pointing to and to know where to draw column arrows in and out of the entity. DataJobs won't show columns
* underneath them, but we need this populated for validating that this column "exists" on the entity.
*/
export function getPopulatedColumnsByUrn(
columnsByUrn: Record<string, SchemaField[]>,
fetchedEntities: { [x: string]: FetchedEntity },
setColumnsByUrn: (colsByUrn: Record<string, SchemaField[]>) => void,
) {
let populatedColumnsByUrn = { ...columnsByUrn };
Object.entries(fetchedEntities).forEach(([urn, fetchedEntity]) => {
@ -82,9 +87,35 @@ export function populateColumnsByUrn(
convertInputFieldsToSchemaFields(fetchedEntity.inputFields) as SchemaField[],
),
};
} else if (fetchedEntity.type === EntityType.DataJob && fetchedEntity.fineGrainedLineages) {
// Add upstream fields from fineGrainedLineage onto DataJob to mimic upstream dataset fields.
// DataJobs will virtually "have" these fields so we can draw full column paths
// from upstream dataset fields to downstream dataset fields.
const fields: SchemaField[] = [];
fetchedEntity.fineGrainedLineages.forEach((fineGrainedLineage) => {
fineGrainedLineage.upstreams?.forEach((upstream) => {
if (!fields.some((field) => field.fieldPath === upstream.path)) {
fields.push({
fieldPath: downgradeV2FieldPath(upstream.path) || '',
nullable: false,
recursive: false,
type: SchemaFieldDataType.String,
});
}
});
});
populatedColumnsByUrn = { ...populatedColumnsByUrn, [urn]: fields };
}
});
setColumnsByUrn(populatedColumnsByUrn);
return populatedColumnsByUrn;
}
/*
 * Computes the populated columnsByUrn map with getPopulatedColumnsByUrn and passes the result
 * to the supplied setter. Returns nothing.
 */
export function populateColumnsByUrn(
    columnsByUrn: Record<string, SchemaField[]>,
    fetchedEntities: { [x: string]: FetchedEntity },
    setColumnsByUrn: (colsByUrn: Record<string, SchemaField[]>) => void,
) {
    const populatedColumnsByUrn = getPopulatedColumnsByUrn(columnsByUrn, fetchedEntities);
    setColumnsByUrn(populatedColumnsByUrn);
}
export function haveDisplayedFieldsChanged(displayedFields: SchemaField[], previousDisplayedFields?: SchemaField[]) {

View File

@ -1,4 +1,4 @@
import { SchemaFieldRef } from '../../../types.generated';
import { EntityType, SchemaFieldRef } from '../../../types.generated';
import EntityRegistry from '../../entity/EntityRegistry';
import { EntityAndType, FetchedEntities, FetchedEntity } from '../types';
import {
@ -54,7 +54,7 @@ function updateFineGrainedMap(
mapForFieldReverse[upstreamEntityUrn] = listForDownstreamReverse;
}
function extendColumnLineage(
export function extendColumnLineage(
lineageVizConfig: FetchedEntity,
fineGrainedMap: any,
fineGrainedMapForSiblings: any,
@ -64,18 +64,42 @@ function extendColumnLineage(
lineageVizConfig.fineGrainedLineages.forEach((fineGrainedLineage) => {
fineGrainedLineage.upstreams?.forEach((upstream) => {
const [upstreamEntityUrn, upstreamField] = breakFieldUrn(upstream);
fineGrainedLineage.downstreams?.forEach((downstream) => {
const downstreamField = breakFieldUrn(downstream)[1];
// fineGrainedLineage always belongs on the downstream urn with upstreams pointing to another entity
// pass in the visualized node's urn and not the urn from the schema field as the downstream urn,
// as they will either be the same or if they are different, it belongs to a "hidden" sibling
if (lineageVizConfig.type === EntityType.DataJob) {
// draw a line from upstream dataset field to datajob
updateFineGrainedMap(
fineGrainedMap,
upstreamEntityUrn,
upstreamField,
lineageVizConfig.urn,
downstreamField,
upstreamField,
);
}
fineGrainedLineage.downstreams?.forEach((downstream) => {
const [downstreamEntityUrn, downstreamField] = breakFieldUrn(downstream);
if (lineageVizConfig.type === EntityType.DataJob) {
// draw line from datajob upstream field to downstream fields
updateFineGrainedMap(
fineGrainedMap,
lineageVizConfig.urn,
upstreamField,
downstreamEntityUrn,
downstreamField,
);
} else {
// fineGrainedLineage always belongs on the downstream urn with upstreams pointing to another entity
// pass in the visualized node's urn and not the urn from the schema field as the downstream urn,
// as they will either be the same or if they are different, it belongs to a "hidden" sibling
updateFineGrainedMap(
fineGrainedMap,
upstreamEntityUrn,
upstreamField,
lineageVizConfig.urn,
downstreamField,
);
}
// upstreamEntityUrn could belong to a sibling we don't "render", so store its inputs to updateFineGrainedMap
// and update the fine grained map later when we see the entity with these siblings

View File

@ -1,4 +1,4 @@
import { SchemaField } from '../../../types.generated';
import { EntityType, SchemaField } from '../../../types.generated';
import {
COLUMN_HEIGHT,
CURVE_PADDING,
@ -203,9 +203,12 @@ function drawColumnEdge({
visibleColumnsByUrn,
}: DrawColumnEdgeProps) {
const targetFieldIndex = targetFields.findIndex((candidate) => candidate.fieldPath === targetField) || 0;
const targetFieldY = targetNode?.y || 0 + 1;
const targetFieldY = targetNode?.y || 0 + 3;
let targetFieldX = (targetNode?.x || 0) + 35 + targetTitleHeight;
if (!collapsedColumnsNodes[targetNode?.data.urn || 'no-op']) {
// if currentNode is a dataJob, draw line to center of data job
if (targetNode?.data.type === EntityType.DataJob) {
targetFieldX = targetNode?.x || 0;
} else if (!collapsedColumnsNodes[targetNode?.data.urn || 'no-op']) {
if (!visibleColumnsByUrn[targetUrn]?.has(targetField)) {
targetFieldX =
(targetNode?.x || 0) +
@ -292,7 +295,10 @@ function layoutColumnTree(
const sourceFieldY = currentNode?.y || 0 + 1;
let sourceFieldX = (currentNode?.x || 0) + 30 + sourceTitleHeight;
if (!collapsedColumnsNodes[currentNode?.data.urn || 'no-op']) {
// if currentNode is a dataJob, draw line from center of data job
if (currentNode?.data.type === EntityType.DataJob) {
sourceFieldX = currentNode?.x || 0;
} else if (!collapsedColumnsNodes[currentNode?.data.urn || 'no-op']) {
if (!visibleColumnsByUrn[entityUrn]?.has(sourceField)) {
sourceFieldX =
(currentNode?.x || 0) +

View File

@ -42,6 +42,18 @@ fragment lineageNodeProperties on EntityWithRelationships {
status {
removed
}
inputOutput {
fineGrainedLineages {
upstreams {
urn
path
}
downstreams {
urn
path
}
}
}
}
... on DataFlow {
orchestrator