From 1343082535c829f93eab32d256b58e5693123dcf Mon Sep 17 00:00:00 2001 From: Chris Collins Date: Sun, 25 Jun 2023 13:41:22 -0400 Subject: [PATCH] feat(ui) Add ability to view CLL through DataJobs in lineage visualization (#8281) --- .../mappers/FineGrainedLineagesMapper.java | 53 +++++++++++++++++++ .../mappers/UpstreamLineagesMapper.java | 31 ++--------- .../types/datajob/mappers/DataJobMapper.java | 5 ++ .../src/main/resources/entity.graphql | 17 ++++-- .../src/app/entity/shared/types.ts | 2 + .../src/app/entity/shared/utils.ts | 4 +- datahub-web-react/src/app/lineage/types.ts | 2 +- .../__tests__/columnLineageUtils.test.tsx | 44 +++++++++++++++ .../__tests__/extendAsyncEntities.test.ts | 39 ++++++++++++++ .../app/lineage/utils/columnLineageUtils.ts | 39 ++++++++++++-- .../app/lineage/utils/extendAsyncEntities.ts | 40 +++++++++++--- .../src/app/lineage/utils/layoutTree.ts | 14 +++-- datahub-web-react/src/graphql/lineage.graphql | 12 +++++ 13 files changed, 251 insertions(+), 51 deletions(-) create mode 100644 datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/common/mappers/FineGrainedLineagesMapper.java create mode 100644 datahub-web-react/src/app/lineage/utils/__tests__/extendAsyncEntities.test.ts diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/common/mappers/FineGrainedLineagesMapper.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/common/mappers/FineGrainedLineagesMapper.java new file mode 100644 index 0000000000..9f4517c89a --- /dev/null +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/common/mappers/FineGrainedLineagesMapper.java @@ -0,0 +1,53 @@ +package com.linkedin.datahub.graphql.types.common.mappers; + +import com.linkedin.common.urn.Urn; +import com.linkedin.datahub.graphql.generated.FineGrainedLineage; +import com.linkedin.datahub.graphql.generated.SchemaFieldRef; +import com.linkedin.dataset.FineGrainedLineageArray; + +import javax.annotation.Nonnull; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static com.linkedin.metadata.Constants.SCHEMA_FIELD_ENTITY_NAME; + +public class FineGrainedLineagesMapper { + + public static final FineGrainedLineagesMapper INSTANCE = new FineGrainedLineagesMapper(); + + public static List map(@Nonnull final FineGrainedLineageArray fineGrainedLineages) { + return INSTANCE.apply(fineGrainedLineages); + } + + public List apply(@Nonnull final FineGrainedLineageArray fineGrainedLineages) { + final List result = new ArrayList<>(); + if (fineGrainedLineages.size() == 0) { + return result; + } + + for (com.linkedin.dataset.FineGrainedLineage fineGrainedLineage : fineGrainedLineages) { + com.linkedin.datahub.graphql.generated.FineGrainedLineage resultEntry = new com.linkedin.datahub.graphql.generated.FineGrainedLineage(); + if (fineGrainedLineage.hasUpstreams()) { + resultEntry.setUpstreams(fineGrainedLineage.getUpstreams().stream() + .filter(entry -> entry.getEntityType().equals(SCHEMA_FIELD_ENTITY_NAME)) + .map(FineGrainedLineagesMapper::mapDatasetSchemaField).collect( + Collectors.toList())); + } + if (fineGrainedLineage.hasDownstreams()) { + resultEntry.setDownstreams(fineGrainedLineage.getDownstreams().stream() + .filter(entry -> entry.getEntityType().equals(SCHEMA_FIELD_ENTITY_NAME)) + .map(FineGrainedLineagesMapper::mapDatasetSchemaField).collect( + Collectors.toList())); + } + result.add(resultEntry); + } + return result; + } + + private static SchemaFieldRef mapDatasetSchemaField(final Urn schemaFieldUrn) { + return new SchemaFieldRef(schemaFieldUrn.getEntityKey().get(0), schemaFieldUrn.getEntityKey().get(1)); + } +} + + diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/common/mappers/UpstreamLineagesMapper.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/common/mappers/UpstreamLineagesMapper.java index 40f0ca90b0..8359f1ec86 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/common/mappers/UpstreamLineagesMapper.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/common/mappers/UpstreamLineagesMapper.java @@ -1,11 +1,7 @@ package com.linkedin.datahub.graphql.types.common.mappers; -import com.linkedin.common.urn.Urn; -import com.linkedin.datahub.graphql.generated.SchemaFieldRef; -import com.linkedin.dataset.FineGrainedLineage; import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; import javax.annotation.Nonnull; @@ -23,31 +19,10 @@ public class UpstreamLineagesMapper { } public List apply(@Nonnull final com.linkedin.dataset.UpstreamLineage upstreamLineage) { - final List result = new ArrayList<>(); - if (!upstreamLineage.hasFineGrainedLineages()) { - return result; + if (!upstreamLineage.hasFineGrainedLineages() || upstreamLineage.getFineGrainedLineages() == null) { + return new ArrayList<>(); } - for (FineGrainedLineage fineGrainedLineage : upstreamLineage.getFineGrainedLineages()) { - com.linkedin.datahub.graphql.generated.FineGrainedLineage resultEntry = new com.linkedin.datahub.graphql.generated.FineGrainedLineage(); - if (fineGrainedLineage.hasUpstreams()) { - resultEntry.setUpstreams(fineGrainedLineage.getUpstreams().stream() - .filter(entry -> entry.getEntityType().equals("schemaField")) - .map(entry -> mapDatasetSchemaField(entry)).collect( - Collectors.toList())); - } - if (fineGrainedLineage.hasDownstreams()) { - resultEntry.setDownstreams(fineGrainedLineage.getDownstreams().stream() - .filter(entry -> entry.getEntityType().equals("schemaField")) - .map(entry -> mapDatasetSchemaField(entry)).collect( - Collectors.toList())); - } - result.add(resultEntry); - } - return result; - } - - private static SchemaFieldRef mapDatasetSchemaField(final Urn schemaFieldUrn) { - return new SchemaFieldRef(schemaFieldUrn.getEntityKey().get(0), schemaFieldUrn.getEntityKey().get(1)); + return FineGrainedLineagesMapper.map(upstreamLineage.getFineGrainedLineages()); } } diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/datajob/mappers/DataJobMapper.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/datajob/mappers/DataJobMapper.java index e592ce2cdc..88fa137c96 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/datajob/mappers/DataJobMapper.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/datajob/mappers/DataJobMapper.java @@ -22,6 +22,7 @@ import com.linkedin.datahub.graphql.generated.EntityType; import com.linkedin.datahub.graphql.types.common.mappers.BrowsePathsV2Mapper; import com.linkedin.datahub.graphql.types.common.mappers.DataPlatformInstanceAspectMapper; import com.linkedin.datahub.graphql.types.common.mappers.DeprecationMapper; +import com.linkedin.datahub.graphql.types.common.mappers.FineGrainedLineagesMapper; import com.linkedin.datahub.graphql.types.common.mappers.InstitutionalMemoryMapper; import com.linkedin.datahub.graphql.types.common.mappers.OwnershipMapper; import com.linkedin.datahub.graphql.types.common.mappers.StatusMapper; @@ -170,6 +171,10 @@ public class DataJobMapper implements ModelMapper { result.setInputDatajobs(ImmutableList.of()); } + if (inputOutput.hasFineGrainedLineages() && inputOutput.getFineGrainedLineages() != null) { + result.setFineGrainedLineages(FineGrainedLineagesMapper.map(inputOutput.getFineGrainedLineages())); + } + return result; } } diff --git a/datahub-graphql-core/src/main/resources/entity.graphql b/datahub-graphql-core/src/main/resources/entity.graphql index 2dff8c0f27..18fb524746 100644 --- a/datahub-graphql-core/src/main/resources/entity.graphql +++ b/datahub-graphql-core/src/main/resources/entity.graphql @@ -1382,7 +1382,9 @@ type Dataset implements EntityWithRelationships & Entity & BrowsableEntity { siblings: SiblingProperties """ - fine grained lineage + Lineage information for the column-level. Includes a list of objects + detailing which columns are upstream and which are downstream of each other. + The upstream and downstream columns are from datasets. """ fineGrainedLineages: [FineGrainedLineage!] @@ -5648,11 +5650,9 @@ type DataJob implements EntityWithRelationships & Entity & BrowsableEntity { info: DataJobInfo @deprecated """ - Deprecated, use relationship Produces, Consumes, DownstreamOf instead - Information about the inputs and outputs of a Data processing job + Information about the inputs and outputs of a Data processing job including column-level lineage. """ - inputOutput: DataJobInputOutput @deprecated - + inputOutput: DataJobInputOutput """ Deprecated, use the tags field instead @@ -5877,6 +5877,13 @@ type DataJobInputOutput { Input datajobs that this data job depends on """ inputDatajobs: [DataJob!] @deprecated + + """ + Lineage information for the column-level. Includes a list of objects + detailing which columns are upstream and which are downstream of each other. + The upstream and downstream columns are from datasets. + """ + fineGrainedLineages: [FineGrainedLineage!] } """ diff --git a/datahub-web-react/src/app/entity/shared/types.ts b/datahub-web-react/src/app/entity/shared/types.ts index d1fbfe41fc..9b7c794840 100644 --- a/datahub-web-react/src/app/entity/shared/types.ts +++ b/datahub-web-react/src/app/entity/shared/types.ts @@ -36,6 +36,7 @@ import { Embed, FabricType, BrowsePathV2, + DataJobInputOutput, } from '../../../types.generated'; import { FetchedEntity } from '../../lineage/types'; @@ -109,6 +110,7 @@ export type GenericEntityProperties = { exists?: boolean; origin?: Maybe; browsePathV2?: Maybe; + inputOutput?: Maybe; }; export type GenericEntityUpdate = { diff --git a/datahub-web-react/src/app/entity/shared/utils.ts b/datahub-web-react/src/app/entity/shared/utils.ts index 2180e36d41..7ec604785d 100644 --- a/datahub-web-react/src/app/entity/shared/utils.ts +++ b/datahub-web-react/src/app/entity/shared/utils.ts @@ -153,7 +153,9 @@ export function getFineGrainedLineageWithSiblings( entityData: GenericEntityProperties | null, getGenericEntityProperties: (type: EntityType, data: Entity) => GenericEntityProperties | null, ) { - const fineGrainedLineages = [...(entityData?.fineGrainedLineages || [])]; + const fineGrainedLineages = [ + ...(entityData?.fineGrainedLineages || entityData?.inputOutput?.fineGrainedLineages || []), + ]; entityData?.siblings?.siblings?.forEach((sibling) => { if (sibling) { const genericSiblingProps = getGenericEntityProperties(sibling.type, sibling); diff --git a/datahub-web-react/src/app/lineage/types.ts b/datahub-web-react/src/app/lineage/types.ts index e5f932de9a..4c2c88d1c0 100644 --- a/datahub-web-react/src/app/lineage/types.ts +++ b/datahub-web-react/src/app/lineage/types.ts @@ -51,7 +51,7 @@ export type FetchedEntity = { platform?: DataPlatform; status?: Maybe; siblingPlatforms?: Maybe; - fineGrainedLineages?: [FineGrainedLineage]; + fineGrainedLineages?: FineGrainedLineage[]; siblings?: Maybe; schemaMetadata?: SchemaMetadata; inputFields?: InputFields; diff --git a/datahub-web-react/src/app/lineage/utils/__tests__/columnLineageUtils.test.tsx b/datahub-web-react/src/app/lineage/utils/__tests__/columnLineageUtils.test.tsx index 0b3d6886ec..cd0a5f1385 100644 --- a/datahub-web-react/src/app/lineage/utils/__tests__/columnLineageUtils.test.tsx +++ b/datahub-web-react/src/app/lineage/utils/__tests__/columnLineageUtils.test.tsx @@ -2,8 +2,12 @@ import { decodeSchemaField, encodeSchemaField, getFieldPathFromSchemaFieldUrn, + getPopulatedColumnsByUrn, getSourceUrnFromSchemaFieldUrn, } from '../columnLineageUtils'; +import { dataJob1, dataset1, dataset2 } from '../../../../Mocks'; +import { FetchedEntity } from '../../types'; +import { FineGrainedLineage, SchemaFieldDataType } from '../../../../types.generated'; describe('getSourceUrnFromSchemaFieldUrn', () => { it('should get the source urn for a chart schemaField', () => { @@ -82,3 +86,43 @@ describe('encodeSchemaField', () => { expect(decodedSchemaField).toBe(schemaField); }); }); + +describe('getPopulatedColumnsByUrn', () => { + it('should update columns by urn with data job fine grained data so that the data job appears to have the upstream columns', () => { + const dataJobWithCLL = { + ...dataJob1, + name: '', + fineGrainedLineages: [ + { + upstreams: [{ urn: dataset1.urn, path: 'test1' }], + downstreams: [{ urn: dataset2.urn, path: 'test2' }], + }, + { + upstreams: [{ urn: dataset1.urn, path: 'test3' }], + downstreams: [{ urn: dataset2.urn, path: 'test4' }], + }, + ] as FineGrainedLineage[], + }; + const fetchedEntities = { + [dataJobWithCLL.urn]: dataJobWithCLL as FetchedEntity, + }; + const columnsByUrn = getPopulatedColumnsByUrn({}, fetchedEntities); + + expect(columnsByUrn).toMatchObject({ + [dataJobWithCLL.urn]: [ + { + fieldPath: 'test1', + nullable: false, + recursive: false, + type: SchemaFieldDataType.String, + }, + { + fieldPath: 'test3', + nullable: false, + recursive: false, + type: SchemaFieldDataType.String, + }, + ], + }); + }); +}); diff --git a/datahub-web-react/src/app/lineage/utils/__tests__/extendAsyncEntities.test.ts b/datahub-web-react/src/app/lineage/utils/__tests__/extendAsyncEntities.test.ts new file mode 100644 index 0000000000..ad28bccbbd --- /dev/null +++ b/datahub-web-react/src/app/lineage/utils/__tests__/extendAsyncEntities.test.ts @@ -0,0 +1,39 @@ +import { dataJob1, dataset1, dataset2 } from '../../../../Mocks'; +import { FetchedEntity } from '../../types'; +import { FineGrainedLineage } from '../../../../types.generated'; +import { extendColumnLineage } from '../extendAsyncEntities'; + +describe('extendColumnLineage', () => { + it('should update fineGrainedMap to draw lines from downstream and upstream datasets with a datajob in between', () => { + const dataJobWithCLL = { + ...dataJob1, + name: '', + fineGrainedLineages: [ + { + upstreams: [{ urn: dataset1.urn, path: 'test1' }], + downstreams: [{ urn: dataset2.urn, path: 'test2' }], + }, + { + upstreams: [{ urn: dataset1.urn, path: 'test3' }], + downstreams: [{ urn: dataset2.urn, path: 'test4' }], + }, + ] as FineGrainedLineage[], + }; + const fetchedEntities = { + [dataJobWithCLL.urn]: dataJobWithCLL as FetchedEntity, + }; + const fineGrainedMap = { forward: {}, reverse: {} }; + extendColumnLineage(dataJobWithCLL, fineGrainedMap, {}, fetchedEntities); + + expect(fineGrainedMap).toMatchObject({ + forward: { + [dataJob1.urn]: { test1: { [dataset2.urn]: ['test2'] }, test3: { [dataset2.urn]: ['test4'] } }, + [dataset1.urn]: { test1: { [dataJob1.urn]: ['test1'] }, test3: { [dataJob1.urn]: ['test3'] } }, + }, + reverse: { + [dataJob1.urn]: { test1: { [dataset1.urn]: ['test1'] }, test3: { [dataset1.urn]: ['test3'] } }, + [dataset2.urn]: { test4: { [dataJob1.urn]: ['test3'] }, test2: { [dataJob1.urn]: ['test1'] } }, + }, + }); + }); +}); diff --git a/datahub-web-react/src/app/lineage/utils/columnLineageUtils.ts b/datahub-web-react/src/app/lineage/utils/columnLineageUtils.ts index 29437395ca..505b3d9453 100644 --- a/datahub-web-react/src/app/lineage/utils/columnLineageUtils.ts +++ b/datahub-web-react/src/app/lineage/utils/columnLineageUtils.ts @@ -1,5 +1,5 @@ import { ColumnEdge, FetchedEntity, NodeData } from '../types'; -import { InputFields, SchemaField } from '../../../types.generated'; +import { EntityType, InputFields, SchemaField, SchemaFieldDataType } from '../../../types.generated'; import { downgradeV2FieldPath } from '../../entity/dataset/profile/schema/utils/utils'; export function getHighlightedColumnsForNode(highlightedEdges: ColumnEdge[], fields: SchemaField[], nodeUrn: string) { @@ -63,10 +63,15 @@ export function convertInputFieldsToSchemaFields(inputFields?: InputFields) { return inputFields?.fields?.map((field) => field?.schemaField) as SchemaField[] | undefined; } -export function populateColumnsByUrn( +/* + * Populate a columnsByUrn map with a list of columns per entity in the order that they will appear. + * We need columnsByUrn in order to ensure that an entity does have a column that lineage data is + * pointing to and to know where to draw column arrows in and out of the entity. DataJobs won't show columns + * underneath them, but we need this populated for validating that this column "exists" on the entity. + */ +export function getPopulatedColumnsByUrn( columnsByUrn: Record, fetchedEntities: { [x: string]: FetchedEntity }, - setColumnsByUrn: (colsByUrn: Record) => void, ) { let populatedColumnsByUrn = { ...columnsByUrn }; Object.entries(fetchedEntities).forEach(([urn, fetchedEntity]) => { @@ -82,9 +87,35 @@ export function populateColumnsByUrn( convertInputFieldsToSchemaFields(fetchedEntity.inputFields) as SchemaField[], ), }; + } else if (fetchedEntity.type === EntityType.DataJob && fetchedEntity.fineGrainedLineages) { + // Add upstream fields from fineGrainedLineage onto DataJob to mimic upstream dataset fields. + // DataJobs will virtually "have" these fields so we can draw full column paths + // from upstream dataset fields to downstream dataset fields. + const fields: SchemaField[] = []; + fetchedEntity.fineGrainedLineages.forEach((fineGrainedLineage) => { + fineGrainedLineage.upstreams?.forEach((upstream) => { + if (!fields.some((field) => field.fieldPath === upstream.path)) { + fields.push({ + fieldPath: downgradeV2FieldPath(upstream.path) || '', + nullable: false, + recursive: false, + type: SchemaFieldDataType.String, + }); + } + }); + }); + populatedColumnsByUrn = { ...populatedColumnsByUrn, [urn]: fields }; } }); - setColumnsByUrn(populatedColumnsByUrn); + return populatedColumnsByUrn; +} + +export function populateColumnsByUrn( + columnsByUrn: Record, + fetchedEntities: { [x: string]: FetchedEntity }, + setColumnsByUrn: (colsByUrn: Record) => void, +) { + setColumnsByUrn(getPopulatedColumnsByUrn(columnsByUrn, fetchedEntities)); } export function haveDisplayedFieldsChanged(displayedFields: SchemaField[], previousDisplayedFields?: SchemaField[]) { diff --git a/datahub-web-react/src/app/lineage/utils/extendAsyncEntities.ts b/datahub-web-react/src/app/lineage/utils/extendAsyncEntities.ts index 34ed875f79..860b5715f3 100644 --- a/datahub-web-react/src/app/lineage/utils/extendAsyncEntities.ts +++ b/datahub-web-react/src/app/lineage/utils/extendAsyncEntities.ts @@ -1,4 +1,4 @@ -import { SchemaFieldRef } from '../../../types.generated'; +import { EntityType, SchemaFieldRef } from '../../../types.generated'; import EntityRegistry from '../../entity/EntityRegistry'; import { EntityAndType, FetchedEntities, FetchedEntity } from '../types'; import { @@ -54,7 +54,7 @@ function updateFineGrainedMap( mapForFieldReverse[upstreamEntityUrn] = listForDownstreamReverse; } -function extendColumnLineage( +export function extendColumnLineage( lineageVizConfig: FetchedEntity, fineGrainedMap: any, fineGrainedMapForSiblings: any, @@ -64,18 +64,42 @@ function extendColumnLineage( lineageVizConfig.fineGrainedLineages.forEach((fineGrainedLineage) => { fineGrainedLineage.upstreams?.forEach((upstream) => { const [upstreamEntityUrn, upstreamField] = breakFieldUrn(upstream); - fineGrainedLineage.downstreams?.forEach((downstream) => { - const downstreamField = breakFieldUrn(downstream)[1]; - // fineGrainedLineage always belongs on the downstream urn with upstreams pointing to another entity - // pass in the visualized node's urn and not the urn from the schema field as the downstream urn, - // as they will either be the same or if they are different, it belongs to a "hidden" sibling + + if (lineageVizConfig.type === EntityType.DataJob) { + // draw a line from upstream dataset field to datajob updateFineGrainedMap( fineGrainedMap, upstreamEntityUrn, upstreamField, lineageVizConfig.urn, - downstreamField, + upstreamField, ); + } + + fineGrainedLineage.downstreams?.forEach((downstream) => { + const [downstreamEntityUrn, downstreamField] = breakFieldUrn(downstream); + + if (lineageVizConfig.type === EntityType.DataJob) { + // draw line from datajob upstream field to downstream fields + updateFineGrainedMap( + fineGrainedMap, + lineageVizConfig.urn, + upstreamField, + downstreamEntityUrn, + downstreamField, + ); + } else { + // fineGrainedLineage always belongs on the downstream urn with upstreams pointing to another entity + // pass in the visualized node's urn and not the urn from the schema field as the downstream urn, + // as they will either be the same or if they are different, it belongs to a "hidden" sibling + updateFineGrainedMap( + fineGrainedMap, + upstreamEntityUrn, + upstreamField, + lineageVizConfig.urn, + downstreamField, + ); + } // upstreamEntityUrn could belong to a sibling we don't "render", so store its inputs to updateFineGrainedMap // and update the fine grained map later when we see the entity with these siblings diff --git a/datahub-web-react/src/app/lineage/utils/layoutTree.ts b/datahub-web-react/src/app/lineage/utils/layoutTree.ts index fa82d89f50..cc70400704 100644 --- a/datahub-web-react/src/app/lineage/utils/layoutTree.ts +++ b/datahub-web-react/src/app/lineage/utils/layoutTree.ts @@ -1,4 +1,4 @@ -import { SchemaField } from '../../../types.generated'; +import { EntityType, SchemaField } from '../../../types.generated'; import { COLUMN_HEIGHT, CURVE_PADDING, @@ -203,9 +203,12 @@ function drawColumnEdge({ visibleColumnsByUrn, }: DrawColumnEdgeProps) { const targetFieldIndex = targetFields.findIndex((candidate) => candidate.fieldPath === targetField) || 0; - const targetFieldY = targetNode?.y || 0 + 1; + const targetFieldY = targetNode?.y || 0 + 3; let targetFieldX = (targetNode?.x || 0) + 35 + targetTitleHeight; - if (!collapsedColumnsNodes[targetNode?.data.urn || 'no-op']) { + // if currentNode is a dataJob, draw line to center of data job + if (targetNode?.data.type === EntityType.DataJob) { + targetFieldX = targetNode?.x || 0; + } else if (!collapsedColumnsNodes[targetNode?.data.urn || 'no-op']) { if (!visibleColumnsByUrn[targetUrn]?.has(targetField)) { targetFieldX = (targetNode?.x || 0) + @@ -292,7 +295,10 @@ function layoutColumnTree( const sourceFieldY = currentNode?.y || 0 + 1; let sourceFieldX = (currentNode?.x || 0) + 30 + sourceTitleHeight; - if (!collapsedColumnsNodes[currentNode?.data.urn || 'no-op']) { + // if currentNode is a dataJob, draw line from center of data job + if (currentNode?.data.type === EntityType.DataJob) { + sourceFieldX = currentNode?.x || 0; + } else if (!collapsedColumnsNodes[currentNode?.data.urn || 'no-op']) { if (!visibleColumnsByUrn[entityUrn]?.has(sourceField)) { sourceFieldX = (currentNode?.x || 0) + diff --git a/datahub-web-react/src/graphql/lineage.graphql b/datahub-web-react/src/graphql/lineage.graphql index d8c07674a9..fa19c57844 100644 --- a/datahub-web-react/src/graphql/lineage.graphql +++ b/datahub-web-react/src/graphql/lineage.graphql @@ -42,6 +42,18 @@ fragment lineageNodeProperties on EntityWithRelationships { status { removed } + inputOutput { + fineGrainedLineages { + upstreams { + urn + path + } + downstreams { + urn + path + } + } + } } ... on DataFlow { orchestrator