feat(lineage) Add column-level impact analysis feature (#6272)

This commit is contained in:
Chris Collins 2022-10-26 16:43:39 -04:00 committed by GitHub
parent 228c10de43
commit cd1331fffe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
41 changed files with 989 additions and 109 deletions

View File

@ -44,6 +44,7 @@ import com.linkedin.datahub.graphql.generated.DataPlatformInstance;
import com.linkedin.datahub.graphql.generated.Dataset;
import com.linkedin.datahub.graphql.generated.DatasetStatsSummary;
import com.linkedin.datahub.graphql.generated.Domain;
import com.linkedin.datahub.graphql.generated.EntityPath;
import com.linkedin.datahub.graphql.generated.EntityRelationship;
import com.linkedin.datahub.graphql.generated.EntityRelationshipLegacy;
import com.linkedin.datahub.graphql.generated.ForeignKeyConstraint;
@ -71,6 +72,7 @@ import com.linkedin.datahub.graphql.generated.Notebook;
import com.linkedin.datahub.graphql.generated.Owner;
import com.linkedin.datahub.graphql.generated.PolicyMatchCriterionValue;
import com.linkedin.datahub.graphql.generated.RecommendationContent;
import com.linkedin.datahub.graphql.generated.SchemaFieldEntity;
import com.linkedin.datahub.graphql.generated.SearchAcrossLineageResult;
import com.linkedin.datahub.graphql.generated.SearchResult;
import com.linkedin.datahub.graphql.generated.SiblingProperties;
@ -242,6 +244,7 @@ import com.linkedin.datahub.graphql.types.mlmodel.MLPrimaryKeyType;
import com.linkedin.datahub.graphql.types.notebook.NotebookType;
import com.linkedin.datahub.graphql.types.policy.DataHubPolicyType;
import com.linkedin.datahub.graphql.types.role.DataHubRoleType;
import com.linkedin.datahub.graphql.types.schemafield.SchemaFieldType;
import com.linkedin.datahub.graphql.types.tag.TagType;
import com.linkedin.datahub.graphql.types.test.TestType;
import com.linkedin.entity.client.EntityClient;
@ -352,6 +355,7 @@ public class GmsGraphQLEngine {
private final TestType testType;
private final DataHubPolicyType dataHubPolicyType;
private final DataHubRoleType dataHubRoleType;
private final SchemaFieldType schemaFieldType;
/**
* Configures the graph objects that can be fetched primary key.
@ -449,6 +453,7 @@ public class GmsGraphQLEngine {
this.testType = new TestType(entityClient);
this.dataHubPolicyType = new DataHubPolicyType(entityClient);
this.dataHubRoleType = new DataHubRoleType(entityClient);
this.schemaFieldType = new SchemaFieldType();
// Init Lists
this.entityTypes = ImmutableList.of(
datasetType,
@ -476,7 +481,8 @@ public class GmsGraphQLEngine {
accessTokenMetadataType,
testType,
dataHubPolicyType,
dataHubRoleType
dataHubRoleType,
schemaFieldType
);
this.loadableTypes = new ArrayList<>(entityTypes);
this.ownerTypes = ImmutableList.of(corpUserType, corpGroupType);
@ -535,6 +541,8 @@ public class GmsGraphQLEngine {
configureAccessAccessTokenMetadataResolvers(builder);
configureTestResultResolvers(builder);
configureRoleResolvers(builder);
configureSchemaFieldResolvers(builder);
configureEntityPathResolvers(builder);
}
public GraphQLEngine.Builder builder() {
@ -1008,6 +1016,20 @@ public class GmsGraphQLEngine {
);
}
private void configureSchemaFieldResolvers(final RuntimeWiring.Builder builder) {
builder.type("SchemaFieldEntity", typeWiring -> typeWiring
.dataFetcher("parent", new EntityTypeResolver(entityTypes,
(env) -> ((SchemaFieldEntity) env.getSource()).getParent()))
);
}
private void configureEntityPathResolvers(final RuntimeWiring.Builder builder) {
builder.type("EntityPath", typeWiring -> typeWiring
.dataFetcher("path", new BatchGetEntitiesResolver(entityTypes,
(env) -> ((EntityPath) env.getSource()).getPath()))
);
}
/**
* Configures resolvers responsible for resolving the {@link com.linkedin.datahub.graphql.generated.CorpUser} type.
*/

View File

@ -25,6 +25,7 @@ import com.linkedin.datahub.graphql.generated.MLModel;
import com.linkedin.datahub.graphql.generated.MLModelGroup;
import com.linkedin.datahub.graphql.generated.MLPrimaryKey;
import com.linkedin.datahub.graphql.generated.Notebook;
import com.linkedin.datahub.graphql.generated.SchemaFieldEntity;
import com.linkedin.datahub.graphql.generated.Tag;
import com.linkedin.datahub.graphql.generated.Test;
import com.linkedin.datahub.graphql.types.mappers.ModelMapper;
@ -163,6 +164,11 @@ public class UrnToEntityMapper implements ModelMapper<com.linkedin.common.urn.Ur
((DataHubPolicy) partialEntity).setUrn(input.toString());
((DataHubPolicy) partialEntity).setType(EntityType.DATAHUB_POLICY);
}
if (input.getEntityType().equals(SCHEMA_FIELD_ENTITY_NAME)) {
partialEntity = new SchemaFieldEntity();
((SchemaFieldEntity) partialEntity).setUrn(input.toString());
((SchemaFieldEntity) partialEntity).setType(EntityType.SCHEMA_FIELD);
}
return partialEntity;
}
}

View File

@ -5,9 +5,12 @@ import com.linkedin.datahub.graphql.generated.Dataset;
import com.linkedin.datahub.graphql.generated.ForeignKeyConstraint;
import com.linkedin.datahub.graphql.generated.SchemaFieldEntity;
import com.linkedin.datahub.graphql.types.common.mappers.UrnToEntityMapper;
import lombok.extern.slf4j.Slf4j;
import java.util.stream.Collectors;
@Slf4j
public class ForeignKeyConstraintMapper {
private ForeignKeyConstraintMapper() { }
@ -34,7 +37,12 @@ public class ForeignKeyConstraintMapper {
private static SchemaFieldEntity mapSchemaFieldEntity(Urn schemaFieldUrn) {
SchemaFieldEntity result = new SchemaFieldEntity();
result.setParent(schemaFieldUrn.getEntityKey().get(0));
try {
Urn resourceUrn = Urn.createFromString(schemaFieldUrn.getEntityKey().get(0));
result.setParent(UrnToEntityMapper.map(resourceUrn));
} catch (Exception e) {
throw new RuntimeException("Error converting schemaField parent urn string to Urn", e);
}
result.setFieldPath(schemaFieldUrn.getEntityKey().get(1));
return result;
}

View File

@ -1,9 +1,11 @@
package com.linkedin.datahub.graphql.types.mappers;
import com.linkedin.common.UrnArray;
import com.linkedin.data.template.DoubleMap;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.datahub.graphql.generated.AggregationMetadata;
import com.linkedin.datahub.graphql.generated.Entity;
import com.linkedin.datahub.graphql.generated.EntityPath;
import com.linkedin.datahub.graphql.generated.FacetMetadata;
import com.linkedin.datahub.graphql.generated.MatchedField;
import com.linkedin.datahub.graphql.generated.SearchAcrossLineageResult;
@ -46,11 +48,17 @@ public class UrnSearchAcrossLineageResultsMapper<T extends RecordTemplate, E ext
.setEntity(UrnToEntityMapper.map(searchEntity.getEntity()))
.setInsights(getInsightsFromFeatures(searchEntity.getFeatures()))
.setMatchedFields(getMatchedFieldEntry(searchEntity.getMatchedFields()))
.setPath(searchEntity.getPath().stream().map(UrnToEntityMapper::map).collect(Collectors.toList()))
.setPaths(searchEntity.getPaths().stream().map(this::mapPath).collect(Collectors.toList()))
.setDegree(searchEntity.getDegree())
.build();
}
private EntityPath mapPath(UrnArray path) {
EntityPath entityPath = new EntityPath();
entityPath.setPath(path.stream().map(UrnToEntityMapper::map).collect(Collectors.toList()));
return entityPath;
}
private FacetMetadata mapFacet(com.linkedin.metadata.search.AggregationMetadata aggregationMetadata) {
final FacetMetadata facetMetadata = new FacetMetadata();
boolean isEntityTypeFilter = aggregationMetadata.getName().equals("entity");

View File

@ -0,0 +1,71 @@
package com.linkedin.datahub.graphql.types.schemafield;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.Entity;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.datahub.graphql.generated.SchemaFieldEntity;
import com.linkedin.datahub.graphql.types.common.mappers.UrnToEntityMapper;
import graphql.execution.DataFetcherResult;
import javax.annotation.Nonnull;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
public class SchemaFieldType implements com.linkedin.datahub.graphql.types.EntityType<SchemaFieldEntity, String> {
public SchemaFieldType() { }
@Override
public EntityType type() {
return EntityType.SCHEMA_FIELD;
}
@Override
public Function<Entity, String> getKeyProvider() {
return Entity::getUrn;
}
@Override
public Class<SchemaFieldEntity> objectClass() {
return SchemaFieldEntity.class;
}
@Override
public List<DataFetcherResult<SchemaFieldEntity>> batchLoad(@Nonnull List<String> urns, @Nonnull QueryContext context) throws Exception {
final List<Urn> schemaFieldUrns = urns.stream()
.map(UrnUtils::getUrn)
.collect(Collectors.toList());
try {
return schemaFieldUrns.stream()
.map(this::mapSchemaFieldUrn)
.map(schemaFieldEntity -> DataFetcherResult.<SchemaFieldEntity>newResult()
.data(schemaFieldEntity)
.build()
)
.collect(Collectors.toList());
} catch (Exception e) {
throw new RuntimeException("Failed to load schemaField entity", e);
}
}
private SchemaFieldEntity mapSchemaFieldUrn(Urn urn) {
try {
SchemaFieldEntity result = new SchemaFieldEntity();
result.setUrn(urn.toString());
result.setType(EntityType.SCHEMA_FIELD);
result.setFieldPath(urn.getEntityKey().get(1));
Urn parentUrn = Urn.createFromString(urn.getEntityKey().get(0));
result.setParent(UrnToEntityMapper.map(parentUrn));
return result;
} catch (Exception e) {
throw new RuntimeException("Failed to load schemaField entity", e);
}
}
}

View File

@ -2343,21 +2343,31 @@ type KeyValueSchema {
Standalone schema field entity. Differs from the SchemaField struct because it is not directly nested inside a
schema field
"""
type SchemaFieldEntity {
type SchemaFieldEntity implements Entity {
"""
Primary key of the schema field
"""
urn: String!
"""
A standard Entity Type
"""
type: EntityType!
"""
Field path identifying the field in its dataset
"""
fieldPath: String!
"""
The primary key of the field's parent.
The field's parent.
"""
parent: String!
parent: Entity!
"""
Granular API for querying edges extending from this entity
"""
relationships(input: RelationshipsInput!): EntityRelationshipsResult
}
"""

View File

@ -318,7 +318,7 @@ type SearchAcrossLineageResult {
"""
Optional list of entities between the source and destination node
"""
path: [Entity!]
paths: [EntityPath]
"""
Degree of relationship (number of hops to get to entity)
@ -326,6 +326,17 @@ type SearchAcrossLineageResult {
degree: Int!
}
"""
An overview of the field that was matched in the entity search document
"""
type EntityPath {
"""
Path of entities between source and destination nodes
"""
path: [Entity]
}
"""
An overview of the field that was matched in the entity search document
"""

View File

@ -204,14 +204,16 @@ export const sampleSchemaWithPkFk: SchemaMetadata = {
sourceFields: [
{
urn: 'datasetUrn',
parent: 'dataset',
type: EntityType.Dataset,
parent: { urn: 'test', type: EntityType.Dataset },
fieldPath: 'shipping_address',
},
],
foreignFields: [
{
urn: dataset3.urn,
parent: dataset3.name,
type: EntityType.Dataset,
parent: { urn: dataset3.name, type: EntityType.Dataset },
fieldPath: 'address',
},
],

View File

@ -31,7 +31,7 @@ export enum EntityMenuItems {
MOVE,
}
const MenuIcon = styled(MoreOutlined)<{ fontSize?: number }>`
export const MenuIcon = styled(MoreOutlined)<{ fontSize?: number }>`
display: flex;
justify-content: center;
align-items: center;

View File

@ -11,7 +11,7 @@ import { SearchFiltersSection } from '../../../../../search/SearchFiltersSection
const SearchBody = styled.div`
height: 100%;
overflow-y: scroll;
overflow-y: auto;
display: flex;
`;
@ -127,6 +127,8 @@ export const EmbeddedListSearchResults = ({
// when we add impact analysis, we will want to pipe the path to each element to the result this
// eslint-disable-next-line @typescript-eslint/dot-notation
degree: searchResult['degree'],
// eslint-disable-next-line @typescript-eslint/dot-notation
paths: searchResult['paths'],
})) || []
}
isSelectMode={isSelectMode}

View File

@ -40,9 +40,10 @@ export const EmbeddedListSearchSection = ({
}: Props) => {
const history = useHistory();
const location = useLocation();
const baseParams = useEntityQueryParams();
const entityQueryParams = useEntityQueryParams();
const params = QueryString.parse(location.search, { arrayFormat: 'comma' });
const baseParams = { ...params, ...entityQueryParams };
const query: string = params?.query as string;
const page: number = params.page && Number(params.page as string) > 0 ? Number(params.page as string) : 1;
const unionType: UnionType = Number(params.unionType as any as UnionType) || UnionType.AND;

View File

@ -30,11 +30,11 @@ export const navigateToEntitySearchUrl = ({
const search = QueryString.stringify(
{
...baseParams,
...filtersToQueryStringParams(constructedFilters),
query: newQuery,
page: newPage,
unionType,
...baseParams,
},
{ arrayFormat: 'comma' },
);

View File

@ -21,6 +21,7 @@ import { SchemaRow } from './components/SchemaRow';
import { FkContext } from './utils/selectedFkContext';
import useSchemaBlameRenderer from './utils/useSchemaBlameRenderer';
import { ANTD_GRAY } from '../../../constants';
import MenuColumn from './components/MenuColumn';
const TableContainer = styled.div`
&&& .ant-table-tbody > tr > .ant-table-cell-with-append {
@ -163,6 +164,14 @@ export default function SchemaTable({
render: usageStatsRenderer,
};
const menuColumn = {
width: '5%',
title: '',
dataIndex: '',
key: 'menu',
render: (field: SchemaField) => <MenuColumn field={field} />,
};
let allColumns: ColumnsType<ExtendedSchemaFields> = [fieldColumn, descriptionColumn, tagColumn, termColumn];
if (hasUsageStats) {
@ -173,6 +182,8 @@ export default function SchemaTable({
allColumns = [...allColumns, blameColumn];
}
allColumns = [...allColumns, menuColumn];
const [expandedRows, setExpandedRows] = useState<Set<string>>(new Set());
useEffect(() => {

View File

@ -0,0 +1,47 @@
import React from 'react';
import { VscGraphLeft } from 'react-icons/vsc';
import styled from 'styled-components/macro';
import { Dropdown, Menu } from 'antd';
import { MenuIcon } from '../../../../EntityDropdown/EntityDropdown';
import { useRouteToTab } from '../../../../EntityContext';
import { SchemaField } from '../../../../../../../types.generated';
export const ImpactAnalysisIcon = styled(VscGraphLeft)`
transform: scaleX(-1);
font-size: 18px;
`;
const MenuItem = styled.div`
align-items: center;
display: flex;
font-size: 12px;
padding: 0 4px;
color: #262626;
`;
interface Props {
field: SchemaField;
}
export default function MenuColumn({ field }: Props) {
const routeToTab = useRouteToTab();
return (
<Dropdown
overlay={
<Menu>
<Menu.Item key="0">
<MenuItem
onClick={() => routeToTab({ tabName: 'Lineage', tabParams: { column: field.fieldPath } })}
>
<ImpactAnalysisIcon /> &nbsp; See Column Lineage
</MenuItem>
</Menu.Item>
</Menu>
}
trigger={['click']}
>
<MenuIcon fontSize={16} />
</Dropdown>
);
}

View File

@ -0,0 +1,88 @@
import { Button, Select, Tooltip } from 'antd';
import * as React from 'react';
import styled from 'styled-components/macro';
import { blue } from '@ant-design/colors';
import { useHistory, useLocation } from 'react-router';
import { ImpactAnalysisIcon } from '../Dataset/Schema/components/MenuColumn';
import updateQueryParams from '../../../../shared/updateQueryParams';
import { downgradeV2FieldPath } from '../../../dataset/profile/schema/utils/utils';
import { useEntityData } from '../../EntityContext';
const StyledSelect = styled(Select)`
margin-right: 5px;
min-width: 140px;
max-width: 200px;
`;
const StyledButton = styled(Button)<{ isSelected: boolean }>`
transition: color 0s;
display: flex;
align-items: center;
${(props) =>
props.isSelected &&
`
color: ${blue[5]};
&:focus, &:hover {
color: ${blue[5]};
}
`};
`;
const TextWrapper = styled.span`
margin-left: 8px;
`;
interface Props {
selectedColumn?: string;
isColumnLevelLineage: boolean;
setSelectedColumn: (column: any) => void;
setIsColumnLevelLineage: (isShowing: boolean) => void;
}
export default function ColumnsLineageSelect({
selectedColumn,
isColumnLevelLineage,
setSelectedColumn,
setIsColumnLevelLineage,
}: Props) {
const { entityData } = useEntityData();
const location = useLocation();
const history = useHistory();
function selectColumn(column: any) {
updateQueryParams({ column }, location, history);
setSelectedColumn(column);
}
const columnButtonTooltip = isColumnLevelLineage ? 'Hide column level lineage' : 'Show column level lineage';
return (
<>
{isColumnLevelLineage && (
<StyledSelect
value={selectedColumn}
onChange={selectColumn}
showSearch
allowClear
placeholder="Select column"
>
{entityData?.schemaMetadata?.fields.map((field) => (
<Select.Option value={field.fieldPath}>{downgradeV2FieldPath(field.fieldPath)}</Select.Option>
))}
</StyledSelect>
)}
<Tooltip title={columnButtonTooltip}>
<StyledButton
type="text"
onClick={() => setIsColumnLevelLineage(!isColumnLevelLineage)}
data-testid="column-lineage-toggle"
isSelected={isColumnLevelLineage}
>
<ImpactAnalysisIcon />
<TextWrapper>Column Lineage</TextWrapper>
</StyledButton>
</Tooltip>
</>
);
}

View File

@ -1,8 +1,6 @@
import React, { useEffect } from 'react';
import * as QueryString from 'query-string';
import { useLocation } from 'react-router';
import styled from 'styled-components';
import { useSearchAcrossLineageQuery } from '../../../../../graphql/search.generated';
import { EntityType, FacetFilterInput, LineageDirection } from '../../../../../types.generated';
import { ENTITY_FILTER_NAME } from '../../../../search/utils/constants';
@ -12,10 +10,6 @@ import analytics, { EventType } from '../../../../analytics';
import generateUseSearchResultsViaRelationshipHook from './generateUseSearchResultsViaRelationshipHook';
import { EmbeddedListSearchSection } from '../../components/styled/search/EmbeddedListSearchSection';
const ImpactAnalysisWrapper = styled.div`
flex: 1;
`;
type Props = {
urn: string;
direction: LineageDirection;
@ -60,15 +54,13 @@ export const ImpactAnalysis = ({ urn, direction }: Props) => {
}, [query, data, loading]);
return (
<ImpactAnalysisWrapper>
<EmbeddedListSearchSection
useGetSearchResults={generateUseSearchResultsViaRelationshipHook({
urn,
direction,
})}
defaultShowFilters
defaultFilters={[{ field: 'degree', values: ['1'] }]}
/>
</ImpactAnalysisWrapper>
<EmbeddedListSearchSection
useGetSearchResults={generateUseSearchResultsViaRelationshipHook({
urn,
direction,
})}
defaultShowFilters
defaultFilters={[{ field: 'degree', values: ['1'] }]}
/>
);
};

View File

@ -1,6 +1,7 @@
import React, { useCallback, useState } from 'react';
import { Button } from 'antd';
import { useHistory } from 'react-router';
import * as QueryString from 'query-string';
import { useHistory, useLocation } from 'react-router';
import { ArrowDownOutlined, ArrowUpOutlined, PartitionOutlined } from '@ant-design/icons';
import styled from 'styled-components/macro';
@ -10,6 +11,10 @@ import { getEntityPath } from '../../containers/profile/utils';
import { useEntityRegistry } from '../../../../useEntityRegistry';
import { ImpactAnalysis } from './ImpactAnalysis';
import { LineageDirection } from '../../../../../types.generated';
import { generateSchemaFieldUrn } from './utils';
import { downgradeV2FieldPath } from '../../../dataset/profile/schema/utils/utils';
import ColumnsLineageSelect from './ColumnLineageSelect';
import { LineageTabContext } from './LineageTabContext';
const StyledTabToolbar = styled(TabToolbar)`
justify-content: space-between;
@ -26,6 +31,11 @@ const StyledButton = styled(Button)<{ isSelected: boolean }>`
`}
`;
const RightButtonsWrapper = styled.div`
align-items: center;
display: flex;
`;
export const LineageTab = ({
properties = { defaultDirection: LineageDirection.Downstream },
}: {
@ -34,12 +44,20 @@ export const LineageTab = ({
const { urn, entityType } = useEntityData();
const history = useHistory();
const entityRegistry = useEntityRegistry();
const [lineageDirection, setLineageDirection] = useState<string>(properties.defaultDirection);
const location = useLocation();
const params = QueryString.parse(location.search, { arrayFormat: 'comma' });
const [lineageDirection, setLineageDirection] = useState<LineageDirection>(properties.defaultDirection);
const [selectedColumn, setSelectedColumn] = useState<string | undefined>(params?.column as string);
const [isColumnLevelLineage, setIsColumnLevelLineage] = useState(!!params?.column);
const routeToLineage = useCallback(() => {
history.push(getEntityPath(entityType, urn, entityRegistry, true, false));
}, [history, entityType, urn, entityRegistry]);
const selectedV1FieldPath = downgradeV2FieldPath(selectedColumn) || '';
const selectedColumnUrn = generateSchemaFieldUrn(selectedV1FieldPath, urn);
const impactAnalysisUrn = isColumnLevelLineage && selectedColumnUrn ? selectedColumnUrn : urn;
return (
<>
<StyledTabToolbar>
@ -59,12 +77,22 @@ export const LineageTab = ({
<ArrowUpOutlined /> Upstream
</StyledButton>
</div>
<Button type="text" onClick={routeToLineage}>
<PartitionOutlined />
Visualize Lineage
</Button>
<RightButtonsWrapper>
<ColumnsLineageSelect
selectedColumn={selectedColumn}
isColumnLevelLineage={isColumnLevelLineage}
setSelectedColumn={setSelectedColumn}
setIsColumnLevelLineage={setIsColumnLevelLineage}
/>
<Button type="text" onClick={routeToLineage}>
<PartitionOutlined />
Visualize Lineage
</Button>
</RightButtonsWrapper>
</StyledTabToolbar>
<ImpactAnalysis urn={urn} direction={lineageDirection as LineageDirection} />
<LineageTabContext.Provider value={{ isColumnLevelLineage, selectedColumn, lineageDirection }}>
<ImpactAnalysis urn={impactAnalysisUrn} direction={lineageDirection as LineageDirection} />
</LineageTabContext.Provider>
</>
);
};

View File

@ -0,0 +1,14 @@
import React from 'react';
import { LineageDirection } from '../../../../../types.generated';
export const LineageTabContext = React.createContext<LineageTabContextType>({
isColumnLevelLineage: false,
lineageDirection: LineageDirection.Downstream,
selectedColumn: undefined,
});
type LineageTabContextType = {
isColumnLevelLineage: boolean;
lineageDirection: LineageDirection;
selectedColumn?: string;
};

View File

@ -0,0 +1,4 @@
export function generateSchemaFieldUrn(fieldPath: string | undefined, resourceUrn: string) {
if (!fieldPath) return null;
return `urn:li:schemaField:(${resourceUrn},${fieldPath})`;
}

View File

@ -15,6 +15,7 @@ import {
Deprecation,
Domain,
ParentNodesResult,
EntityPath,
} from '../../types.generated';
import TagTermGroup from '../shared/tags/TagTermGroup';
import { ANTD_GRAY } from '../entity/shared/constants';
@ -28,6 +29,7 @@ import { ExpandedActorGroup } from '../entity/shared/components/styled/ExpandedA
import { DeprecationPill } from '../entity/shared/components/styled/DeprecationPill';
import { PreviewType } from '../entity/Entity';
import ExternalUrlButton from '../entity/shared/ExternalUrlButton';
import EntityPaths from './EntityPaths/EntityPaths';
const PreviewContainer = styled.div`
display: flex;
@ -182,6 +184,7 @@ interface Props {
parentContainers?: ParentContainersResult | null;
parentNodes?: ParentNodesResult | null;
previewType?: Maybe<PreviewType>;
paths?: EntityPath[];
}
export default function DefaultPreviewCard({
@ -219,6 +222,7 @@ export default function DefaultPreviewCard({
platforms,
logoUrls,
previewType,
paths,
}: Props) {
// sometimes these lists will be rendered inside an entity container (for example, in the case of impact analysis)
// in those cases, we may want to enrich the preview w/ context about the container entity
@ -302,6 +306,7 @@ export default function DefaultPreviewCard({
{!!degree && entityCount && <PlatformDivider />}
<EntityCount entityCount={entityCount} />
</TitleContainer>
{paths && paths.length > 0 && <EntityPaths paths={paths} resultEntityUrn={urn || ''} />}
{description && description.length > 0 && (
<DescriptionContainer>
<NoMarkdownViewer
@ -345,27 +350,29 @@ export default function DefaultPreviewCard({
</InsightContainer>
)}
</LeftColumn>
<RightColumn>
{topUsers && topUsers?.length > 0 && (
<>
{shouldShowRightColumn && (
<RightColumn>
{topUsers && topUsers?.length > 0 && (
<>
<UserListContainer>
<UserListTitle strong>Top Users</UserListTitle>
<div>
<ExpandedActorGroup actors={topUsers} max={2} />
</div>
</UserListContainer>
</>
)}
{(topUsers?.length || 0) > 0 && (owners?.length || 0) > 0 && <UserListDivider type="vertical" />}
{owners && owners?.length > 0 && (
<UserListContainer>
<UserListTitle strong>Top Users</UserListTitle>
<UserListTitle strong>Owners</UserListTitle>
<div>
<ExpandedActorGroup actors={topUsers} max={2} />
<ExpandedActorGroup actors={owners.map((owner) => owner.owner)} max={2} />
</div>
</UserListContainer>
</>
)}
{(topUsers?.length || 0) > 0 && (owners?.length || 0) > 0 && <UserListDivider type="vertical" />}
{owners && owners?.length > 0 && (
<UserListContainer>
<UserListTitle strong>Owners</UserListTitle>
<div>
<ExpandedActorGroup actors={owners.map((owner) => owner.owner)} max={2} />
</div>
</UserListContainer>
)}
</RightColumn>
)}
</RightColumn>
)}
</PreviewContainer>
);
}

View File

@ -0,0 +1,67 @@
import { Tooltip } from 'antd';
import React, { useContext } from 'react';
import styled from 'styled-components/macro';
import { EntityPath, EntityType, LineageDirection, SchemaFieldEntity } from '../../../types.generated';
import { ANTD_GRAY } from '../../entity/shared/constants';
import { LineageTabContext } from '../../entity/shared/tabs/Lineage/LineageTabContext';
import ColumnsRelationshipText from './ColumnsRelationshipText';
import DisplayedColumns from './DisplayedColumns';
export const ResultText = styled.span`
&:hover {
border-bottom: 1px solid black;
cursor: pointer;
}
`;
const DescriptionWrapper = styled.span`
color: ${ANTD_GRAY[8]};
`;
export function getDisplayedColumns(paths: EntityPath[], resultEntityUrn: string) {
return paths
.map((path) =>
path.path?.filter(
(entity) =>
entity?.type === EntityType.SchemaField &&
(entity as SchemaFieldEntity).parent.urn === resultEntityUrn,
),
)
.flat();
}
interface Props {
paths: EntityPath[];
resultEntityUrn: string;
openModal: () => void;
}
export default function ColumnPathsText({ paths, resultEntityUrn, openModal }: Props) {
const { lineageDirection } = useContext(LineageTabContext);
const displayedColumns = getDisplayedColumns(paths, resultEntityUrn);
return (
<>
<DescriptionWrapper>
{lineageDirection === LineageDirection.Downstream ? 'Downstream' : 'Upstream'} column
{displayedColumns.length > 1 && 's'}
</DescriptionWrapper>
: &nbsp;
<ResultText onClick={openModal}>
<Tooltip
title={
<span>
Click to see column path{paths.length > 1 && 's'} from{' '}
<ColumnsRelationshipText displayedColumns={displayedColumns} />
</span>
}
>
<span>
<DisplayedColumns displayedColumns={displayedColumns} />
</span>
</Tooltip>
</ResultText>
</>
);
}

View File

@ -0,0 +1,39 @@
import { Maybe } from 'graphql/jsutils/Maybe';
import React, { useContext } from 'react';
import styled from 'styled-components/macro';
import { Entity, LineageDirection } from '../../../types.generated';
import { downgradeV2FieldPath } from '../../entity/dataset/profile/schema/utils/utils';
import { LineageTabContext } from '../../entity/shared/tabs/Lineage/LineageTabContext';
import DisplayedColumns from './DisplayedColumns';
const ColumnNameWrapper = styled.span<{ isBlack?: boolean }>`
font-family: 'Roboto Mono', monospace;
font-weight: bold;
${(props) => props.isBlack && 'color: black;'}
`;
interface Props {
displayedColumns: (Maybe<Entity> | undefined)[];
}
export default function ColumnsRelationshipText({ displayedColumns }: Props) {
const { selectedColumn, lineageDirection } = useContext(LineageTabContext);
const displayedFieldPath = downgradeV2FieldPath(selectedColumn);
return (
<>
{lineageDirection === LineageDirection.Downstream ? (
<span>
<ColumnNameWrapper>{displayedFieldPath}</ColumnNameWrapper> to&nbsp;
<DisplayedColumns displayedColumns={displayedColumns} />
</span>
) : (
<span>
<DisplayedColumns displayedColumns={displayedColumns} /> to{' '}
<ColumnNameWrapper>{displayedFieldPath}</ColumnNameWrapper>
</span>
)}
</>
);
}

View File

@ -0,0 +1,38 @@
import { Maybe } from 'graphql/jsutils/Maybe';
import React from 'react';
import styled from 'styled-components/macro';
import { Entity, EntityType, SchemaFieldEntity } from '../../../types.generated';
import { downgradeV2FieldPath } from '../../entity/dataset/profile/schema/utils/utils';
import { useEntityRegistry } from '../../useEntityRegistry';
const ColumnNameWrapper = styled.span<{ isBlack?: boolean }>`
font-family: 'Roboto Mono', monospace;
font-weight: bold;
${(props) => props.isBlack && 'color: black;'}
`;
interface Props {
displayedColumns: (Maybe<Entity> | undefined)[];
}
export default function DisplayedColumns({ displayedColumns }: Props) {
const entityRegistry = useEntityRegistry();
return (
<span>
{displayedColumns.map((entity, index) => {
if (entity) {
return (
<ColumnNameWrapper>
{entity.type === EntityType.SchemaField
? downgradeV2FieldPath((entity as SchemaFieldEntity).fieldPath)
: entityRegistry.getDisplayName(entity.type, entity)}
{index !== displayedColumns.length - 1 && ', '}
</ColumnNameWrapper>
);
}
return null;
})}
</span>
);
}

View File

@ -0,0 +1,41 @@
import React, { useContext, useState } from 'react';
import styled from 'styled-components/macro';
import { EntityPath } from '../../../types.generated';
import { LineageTabContext } from '../../entity/shared/tabs/Lineage/LineageTabContext';
import ColumnPathsText from './ColumnPathsText';
import EntityPathsModal from './EntityPathsModal';
const EntityPathsWrapper = styled.div`
margin-bottom: 5px;
`;
interface Props {
paths: EntityPath[];
resultEntityUrn: string;
}
export default function EntityPaths({ paths, resultEntityUrn }: Props) {
const { isColumnLevelLineage, selectedColumn } = useContext(LineageTabContext);
const [isPathsModalVisible, setIsPathsModalVisible] = useState(false);
if (!isColumnLevelLineage || !selectedColumn) return null;
return (
<>
<EntityPathsWrapper>
<ColumnPathsText
paths={paths}
resultEntityUrn={resultEntityUrn}
openModal={() => setIsPathsModalVisible(true)}
/>
</EntityPathsWrapper>
{isPathsModalVisible && (
<EntityPathsModal
paths={paths}
resultEntityUrn={resultEntityUrn}
hideModal={() => setIsPathsModalVisible(false)}
/>
)}
</>
);
}

View File

@ -0,0 +1,62 @@
import { Modal } from 'antd';
import React from 'react';
import styled from 'styled-components/macro';
import { Entity, EntityPath } from '../../../types.generated';
import { ANTD_GRAY } from '../../entity/shared/constants';
import { CompactEntityNameList } from '../../recommendations/renderer/component/CompactEntityNameList';
import { getDisplayedColumns } from './ColumnPathsText';
import ColumnsRelationshipText from './ColumnsRelationshipText';
const StyledModal = styled(Modal)`
width: 70vw;
max-width: 850px;
`;
const PathWrapper = styled.div`
display: inline-block;
margin: 15px 0 15px -4px;
padding: 20px;
border: 1px solid ${ANTD_GRAY[4]};
border-radius: 8px;
box-shadow: 1px 1px 12px 4px #0000000d;
width: 100%;
`;
const Header = styled.div`
color: ${ANTD_GRAY[8]};
font-size: 16px;
padding-top: 8px;
`;
interface Props {
paths: EntityPath[];
resultEntityUrn: string;
hideModal: () => void;
}
export default function EntityPathsModal({ paths, resultEntityUrn, hideModal }: Props) {
const displayedColumns = getDisplayedColumns(paths, resultEntityUrn);
return (
<StyledModal
title={
<Header>
Column path{paths.length > 1 && 's'} from{' '}
<ColumnsRelationshipText displayedColumns={displayedColumns} />
</Header>
}
width="75vw"
visible
onCancel={hideModal}
onOk={hideModal}
footer={null}
bodyStyle={{ padding: '16px 24px' }}
>
{paths.map((path) => (
<PathWrapper>
<CompactEntityNameList entities={path.path as Entity[]} showArrows />
</PathWrapper>
))}{' '}
</StyledModal>
);
}

View File

@ -1,54 +1,82 @@
import { ArrowRightOutlined } from '@ant-design/icons';
import React from 'react';
import styled from 'styled-components/macro';
import { useHistory } from 'react-router';
import { Entity } from '../../../../types.generated';
import { Entity, EntityType, SchemaFieldEntity } from '../../../../types.generated';
import { IconStyleType } from '../../../entity/Entity';
import { useEntityRegistry } from '../../../useEntityRegistry';
import { EntityPreviewTag } from './EntityPreviewTag';
import { HoverEntityTooltip } from './HoverEntityTooltip';
import { ANTD_GRAY } from '../../../entity/shared/constants';
const NameWrapper = styled.span<{ addMargin }>`
display: inline-flex;
align-items: center;
${(props) => props.addMargin && 'margin: 2px 0;'}
`;
const StyledArrow = styled(ArrowRightOutlined)`
color: ${ANTD_GRAY[8]};
margin: 0 4px;
`;
type Props = {
entities: Array<Entity>;
onClick?: (index: number) => void;
linkUrlParams?: Record<string, string | boolean>;
showTooltips?: boolean;
showArrows?: boolean;
};
export const CompactEntityNameList = ({ entities, onClick, linkUrlParams, showTooltips = true }: Props) => {
export const CompactEntityNameList = ({ entities, onClick, linkUrlParams, showTooltips = true, showArrows }: Props) => {
const entityRegistry = useEntityRegistry();
const history = useHistory();
return (
<>
{entities.map((entity, index) => {
if (!entity) return <></>;
{entities.map((mappedEntity, index) => {
if (!mappedEntity) return <></>;
let entity = mappedEntity;
let columnName;
if (entity.type === EntityType.SchemaField) {
const { parent, fieldPath } = entity as SchemaFieldEntity;
entity = parent;
columnName = fieldPath;
}
const isLastEntityInList = index === entities.length - 1;
const showArrow = showArrows && !isLastEntityInList;
const genericProps = entityRegistry.getGenericEntityProperties(entity.type, entity);
const platformLogoUrl = genericProps?.platform?.properties?.logoUrl;
const displayName = entityRegistry.getDisplayName(entity.type, entity);
const fallbackIcon = entityRegistry.getIcon(entity.type, 12, IconStyleType.ACCENT);
const url = entityRegistry.getEntityUrl(entity.type, entity.urn, linkUrlParams);
return (
<span
onClickCapture={(e) => {
// prevents the search links from taking over
e.preventDefault();
history.push(url);
}}
>
<HoverEntityTooltip entity={entity} canOpen={showTooltips}>
<span data-testid={`compact-entity-link-${entity.urn}`}>
<EntityPreviewTag
displayName={displayName}
url={url}
platformLogoUrl={platformLogoUrl || undefined}
platformLogoUrls={genericProps?.siblingPlatforms?.map(
(platform) => platform.properties?.logoUrl,
)}
logoComponent={fallbackIcon}
onClick={() => onClick?.(index)}
/>
</span>
</HoverEntityTooltip>
</span>
<NameWrapper addMargin={showArrow}>
<span
onClickCapture={(e) => {
// prevents the search links from taking over
e.preventDefault();
history.push(url);
}}
>
<HoverEntityTooltip entity={entity} canOpen={showTooltips}>
<span data-testid={`compact-entity-link-${entity.urn}`}>
<EntityPreviewTag
displayName={displayName}
url={url}
platformLogoUrl={platformLogoUrl || undefined}
platformLogoUrls={genericProps?.siblingPlatforms?.map(
(platform) => platform.properties?.logoUrl,
)}
logoComponent={fallbackIcon}
onClick={() => onClick?.(index)}
columnName={columnName}
/>
</span>
</HoverEntityTooltip>
</span>
{showArrow && <StyledArrow />}
</NameWrapper>
);
})}
</>

View File

@ -1,7 +1,7 @@
import React from 'react';
import { Divider, List, Checkbox } from 'antd';
import styled from 'styled-components';
import { Entity } from '../../../../types.generated';
import { Entity, EntityPath } from '../../../../types.generated';
import { useEntityRegistry } from '../../../useEntityRegistry';
import DefaultPreviewCard from '../../../preview/DefaultPreviewCard';
import { IconStyleType } from '../../../entity/Entity';
@ -60,6 +60,7 @@ const ThinDivider = styled(Divider)`
type AdditionalProperties = {
degree?: number;
paths?: EntityPath[];
};
type Props = {
@ -154,6 +155,7 @@ export const EntityNameList = ({
entityCount={entityCount}
degree={additionalProperties?.degree}
deprecation={deprecation}
paths={additionalProperties?.paths}
/>
</ListItem>
<ThinDivider />

View File

@ -1,8 +1,9 @@
import React from 'react';
import { Image, Tag } from 'antd';
import { Divider, Image, Tag } from 'antd';
import styled from 'styled-components';
import { Link } from 'react-router-dom';
import { Maybe } from 'graphql/jsutils/Maybe';
import { ANTD_GRAY } from '../../../entity/shared/constants';
const EntityTag = styled(Tag)`
margin: 4px;
@ -34,6 +35,16 @@ const DisplayNameContainer = styled.span`
padding-right: 4px;
`;
const ColumnName = styled.span`
font-family: 'Roboto Mono', monospace;
font-weight: bold;
`;
export const StyledDivider = styled(Divider)`
background-color: ${ANTD_GRAY[6]};
margin: 0 7px;
`;
type Props = {
displayName: string;
url: string;
@ -41,6 +52,7 @@ type Props = {
platformLogoUrls?: Maybe<string>[];
logoComponent?: React.ReactNode;
onClick?: () => void;
columnName?: string;
};
export const EntityPreviewTag = ({
@ -50,6 +62,7 @@ export const EntityPreviewTag = ({
platformLogoUrls,
logoComponent,
onClick,
columnName,
}: Props) => {
return (
<Link to={url} onClick={onClick}>
@ -69,6 +82,12 @@ export const EntityPreviewTag = ({
</IconContainer>
<DisplayNameContainer>
<span className="test-mini-preview-class">{displayName}</span>
{columnName && (
<>
<StyledDivider type="vertical" />
<ColumnName>{columnName}</ColumnName>
</>
)}
</DisplayNameContainer>
</TitleContainer>
</EntityTag>

View File

@ -0,0 +1,20 @@
import * as QueryString from 'query-string';
import { Location, History } from 'history';
type QueryParam = {
[key: string]: string | undefined;
};
export default function updateQueryParams(newParams: QueryParam, location: Location, history: History) {
const parsedParams = QueryString.parse(location.search, { arrayFormat: 'comma' });
const updatedParams = {
...parsedParams,
...newParams,
};
const stringifiedParams = QueryString.stringify(updatedParams, { arrayFormat: 'comma' });
history.push({
pathname: location.pathname,
search: stringifiedParams,
});
}

View File

@ -760,6 +760,15 @@ fragment searchResults on SearchResults {
}
}
fragment schemaFieldEntityFields on SchemaFieldEntity {
urn
type
fieldPath
parent {
...searchResultFields
}
}
fragment searchAcrossRelationshipResults on SearchAcrossLineageResults {
start
count
@ -776,6 +785,14 @@ fragment searchAcrossRelationshipResults on SearchAcrossLineageResults {
text
icon
}
paths {
path {
...searchResultFields
... on SchemaFieldEntity {
...schemaFieldEntityFields
}
}
}
degree
}
facets {

View File

@ -107,12 +107,26 @@ public class LineageRegistry {
return Collections.emptyList();
}
if (entityName.equals("schemaField")) {
return getSchemaFieldRelationships(direction);
}
if (direction == LineageDirection.UPSTREAM) {
return spec.getUpstreamEdges();
}
return spec.getDownstreamEdges();
}
private List<EdgeInfo> getSchemaFieldRelationships(LineageDirection direction) {
List<EdgeInfo> schemaFieldEdges = new ArrayList<>();
if (direction == LineageDirection.UPSTREAM) {
schemaFieldEdges.add(new EdgeInfo("DownstreamOf", RelationshipDirection.OUTGOING, "schemafield"));
} else {
schemaFieldEdges.add(new EdgeInfo("DownstreamOf", RelationshipDirection.INCOMING, "schemafield"));
}
return schemaFieldEdges;
}
@Value
private static class LineageEdge {
String sourceEntity;

View File

@ -48,6 +48,7 @@ public class Constants {
public static final String INVITE_TOKEN_ENTITY_NAME = "inviteToken";
public static final String DATAHUB_ROLE_ENTITY_NAME = "dataHubRole";
public static final String POST_ENTITY_NAME = "post";
public static final String SCHEMA_FIELD_ENTITY_NAME = "schemaField";
/**

View File

@ -4,6 +4,8 @@ import com.codahale.metrics.Timer;
import com.datahub.util.exception.ESQueryException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.linkedin.common.UrnArray;
import com.linkedin.common.UrnArrayArray;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.metadata.graph.GraphFilters;
@ -161,6 +163,7 @@ public class ESGraphQueryDAO {
// Do a Level-order BFS
Set<Urn> visitedEntities = ConcurrentHashMap.newKeySet();
visitedEntities.add(entityUrn);
UrnArrayArray existingPaths = new UrnArrayArray();
List<Urn> currentLevel = ImmutableList.of(entityUrn);
for (int i = 0; i < maxHops; i++) {
@ -176,7 +179,7 @@ public class ESGraphQueryDAO {
// Do one hop on the lineage graph
List<LineageRelationship> oneHopRelationships =
getLineageRelationshipsInBatches(currentLevel, direction, graphFilters, visitedEntities, i + 1, remainingTime);
getLineageRelationshipsInBatches(currentLevel, direction, graphFilters, visitedEntities, i + 1, remainingTime, existingPaths);
result.addAll(oneHopRelationships);
currentLevel = oneHopRelationships.stream().map(LineageRelationship::getEntity).collect(Collectors.toList());
currentTime = System.currentTimeMillis();
@ -197,11 +200,11 @@ public class ESGraphQueryDAO {
// Get 1-hop lineage relationships asynchronously in batches with timeout
@WithSpan
public List<LineageRelationship> getLineageRelationshipsInBatches(@Nonnull List<Urn> entityUrns,
@Nonnull LineageDirection direction, GraphFilters graphFilters, Set<Urn> visitedEntities, int numHops, long remainingTime) {
@Nonnull LineageDirection direction, GraphFilters graphFilters, Set<Urn> visitedEntities, int numHops, long remainingTime, UrnArrayArray existingPaths) {
List<List<Urn>> batches = Lists.partition(entityUrns, BATCH_SIZE);
return ConcurrencyUtils.getAllCompleted(batches.stream()
.map(batchUrns -> CompletableFuture.supplyAsync(
() -> getLineageRelationships(batchUrns, direction, graphFilters, visitedEntities, numHops)))
() -> getLineageRelationships(batchUrns, direction, graphFilters, visitedEntities, numHops, existingPaths)))
.collect(Collectors.toList()), remainingTime, TimeUnit.MILLISECONDS)
.stream()
.flatMap(List::stream)
@ -211,7 +214,7 @@ public class ESGraphQueryDAO {
// Get 1-hop lineage relationships
@WithSpan
private List<LineageRelationship> getLineageRelationships(@Nonnull List<Urn> entityUrns,
@Nonnull LineageDirection direction, GraphFilters graphFilters, Set<Urn> visitedEntities, int numHops) {
@Nonnull LineageDirection direction, GraphFilters graphFilters, Set<Urn> visitedEntities, int numHops, UrnArrayArray existingPaths) {
Map<String, List<Urn>> urnsPerEntityType = entityUrns.stream().collect(Collectors.groupingBy(Urn::getEntityType));
Map<String, List<EdgeInfo>> edgesPerEntityType = urnsPerEntityType.keySet()
.stream()
@ -228,14 +231,48 @@ public class ESGraphQueryDAO {
.stream()
.flatMap(entry -> entry.getValue().stream().map(edgeInfo -> Pair.of(entry.getKey(), edgeInfo)))
.collect(Collectors.toSet());
return extractRelationships(entityUrnSet, response, validEdges, visitedEntities, numHops);
return extractRelationships(entityUrnSet, response, validEdges, visitedEntities, numHops, existingPaths);
}
private UrnArrayArray getAndUpdatePaths(UrnArrayArray existingPaths, Urn parentUrn, Urn childUrn, RelationshipDirection direction) {
try {
UrnArrayArray currentPaths = existingPaths.stream()
.filter(path -> path.get(direction == RelationshipDirection.OUTGOING ? 0 : path.size() - 1).equals(parentUrn))
.collect(Collectors.toCollection(UrnArrayArray::new));
UrnArrayArray resultPaths = new UrnArrayArray();
if (currentPaths.size() > 0) {
for (UrnArray path : currentPaths) {
UrnArray copyOfPath = path.clone();
if (direction == RelationshipDirection.OUTGOING) {
copyOfPath.add(0, childUrn);
} else {
copyOfPath.add(childUrn);
}
resultPaths.add(copyOfPath);
existingPaths.add(copyOfPath);
}
} else {
UrnArray path = new UrnArray();
if (direction == RelationshipDirection.OUTGOING) {
path.addAll(ImmutableList.of(childUrn, parentUrn));
} else {
path.addAll(ImmutableList.of(parentUrn, childUrn));
}
resultPaths.add(path);
existingPaths.add(path);
}
return resultPaths;
} catch (CloneNotSupportedException e) {
log.error(String.format("Failed to create paths for parentUrn %s and childUrn %s", parentUrn, childUrn), e);
throw new RuntimeException(e);
}
}
// Given set of edges and the search response, extract all valid edges that originate from the input entityUrns
@WithSpan
private List<LineageRelationship> extractRelationships(@Nonnull Set<Urn> entityUrns,
@Nonnull SearchResponse searchResponse, Set<Pair<String, EdgeInfo>> validEdges, Set<Urn> visitedEntities,
int numHops) {
int numHops, UrnArrayArray existingPaths) {
List<LineageRelationship> result = new LinkedList<>();
for (SearchHit hit : searchResponse.getHits().getHits()) {
Map<String, Object> document = hit.getSourceAsMap();
@ -251,7 +288,8 @@ public class ESGraphQueryDAO {
if (!visitedEntities.contains(destinationUrn) && validEdges.contains(
Pair.of(sourceUrn.getEntityType(), new EdgeInfo(type, RelationshipDirection.OUTGOING, destinationUrn.getEntityType().toLowerCase())))) {
visitedEntities.add(destinationUrn);
result.add(new LineageRelationship().setType(type).setEntity(destinationUrn).setDegree(numHops));
final UrnArrayArray paths = getAndUpdatePaths(existingPaths, sourceUrn, destinationUrn, RelationshipDirection.OUTGOING);
result.add(new LineageRelationship().setType(type).setEntity(destinationUrn).setDegree(numHops).setPaths(paths));
}
}
@ -262,7 +300,8 @@ public class ESGraphQueryDAO {
if (!visitedEntities.contains(sourceUrn) && validEdges.contains(
Pair.of(destinationUrn.getEntityType(), new EdgeInfo(type, RelationshipDirection.INCOMING, sourceUrn.getEntityType().toLowerCase())))) {
visitedEntities.add(sourceUrn);
result.add(new LineageRelationship().setType(type).setEntity(sourceUrn).setDegree(numHops));
final UrnArrayArray paths = getAndUpdatePaths(existingPaths, destinationUrn, sourceUrn, RelationshipDirection.INCOMING);
result.add(new LineageRelationship().setType(type).setEntity(sourceUrn).setDegree(numHops).setPaths(paths));
}
}
}

View File

@ -2,12 +2,15 @@ package com.linkedin.metadata.search;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.linkedin.common.UrnArrayArray;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.StringArray;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.graph.EntityLineageResult;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.graph.LineageDirection;
import com.linkedin.metadata.graph.LineageRelationship;
import com.linkedin.metadata.graph.LineageRelationshipArray;
import com.linkedin.metadata.query.SearchFlags;
import com.linkedin.metadata.query.filter.ConjunctiveCriterion;
import com.linkedin.metadata.query.filter.Criterion;
@ -19,6 +22,7 @@ import com.linkedin.metadata.search.utils.QueryUtils;
import com.linkedin.metadata.search.utils.SearchUtils;
import io.opentelemetry.extension.annotations.WithSpan;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -93,6 +97,10 @@ public class LineageSearchService {
}
}
// set schemaField relationship entity to be its reference urn
LineageRelationshipArray updatedRelationships = convertSchemaFieldRelationships(lineageResult);
lineageResult.setRelationships(updatedRelationships);
// Filter hopped result based on the set of entities to return and inputFilters before sending to search
List<LineageRelationship> lineageRelationships =
filterRelationships(lineageResult, new HashSet<>(entities), inputFilters);
@ -101,6 +109,32 @@ public class LineageSearchService {
from, size);
}
// Necessary so we don't filter out schemaField entities and so that we search to get the parent reference entity
private LineageRelationshipArray convertSchemaFieldRelationships(EntityLineageResult lineageResult) {
return lineageResult.getRelationships().stream().map(relationship -> {
if (relationship.getEntity().getEntityType().equals("schemaField")) {
Urn entity = getSchemaFieldReferenceUrn(relationship.getEntity());
relationship.setEntity(entity);
}
return relationship;
}).collect(Collectors.toCollection(LineageRelationshipArray::new));
}
private Map<Urn, LineageRelationship> generateUrnToRelationshipMap(List<LineageRelationship> lineageRelationships) {
Map<Urn, LineageRelationship> urnToRelationship = new HashMap<>();
for (LineageRelationship relationship : lineageRelationships) {
LineageRelationship existingRelationship = urnToRelationship.get(relationship.getEntity());
if (existingRelationship == null) {
urnToRelationship.put(relationship.getEntity(), relationship);
} else {
UrnArrayArray paths = existingRelationship.getPaths();
paths.addAll(relationship.getPaths());
existingRelationship.setPaths(paths);
}
}
return urnToRelationship;
}
// Search service can only take up to 50K term filter, so query search service in batches
private LineageSearchResult getSearchResultInBatches(List<LineageRelationship> lineageRelationships,
@Nonnull String input, @Nullable Filter inputFilters, @Nullable SortCriterion sortCriterion, int from, int size) {
@ -118,8 +152,7 @@ public class LineageSearchService {
.map(relationship -> relationship.getEntity().getEntityType())
.distinct()
.collect(Collectors.toList());
Map<Urn, LineageRelationship> urnToRelationship =
batch.stream().collect(Collectors.toMap(LineageRelationship::getEntity, Function.identity()));
Map<Urn, LineageRelationship> urnToRelationship = generateUrnToRelationshipMap(batch);
Filter finalFilter = buildFilter(urnToRelationship.keySet(), inputFilters);
LineageSearchResult resultForBatch = buildLineageSearchResult(
_searchService.searchAcrossEntities(entitiesToQuery, input, finalFilter, sortCriterion, queryFrom, querySize,
@ -169,6 +202,18 @@ public class LineageSearchService {
}).reduce(x -> false, Predicate::or);
}
private Urn getSchemaFieldReferenceUrn(Urn urn) {
if (urn.getEntityType().equals(Constants.SCHEMA_FIELD_ENTITY_NAME)) {
try {
// Get the dataset urn referenced inside the schemaField urn
return Urn.createFromString(urn.getId());
} catch (Exception e) {
log.error("Invalid destination urn: {}", urn.getId(), e);
}
}
return urn;
}
private List<LineageRelationship> filterRelationships(@Nonnull EntityLineageResult lineageResult,
@Nonnull Set<String> entities, @Nullable Filter inputFilters) {
Stream<LineageRelationship> relationshipsFilteredByEntities = lineageResult.getRelationships().stream();
@ -231,7 +276,7 @@ public class LineageSearchService {
@Nullable LineageRelationship lineageRelationship) {
LineageSearchEntity entity = new LineageSearchEntity(searchEntity.data());
if (lineageRelationship != null) {
entity.setPath(lineageRelationship.getPath());
entity.setPaths(lineageRelationship.getPaths());
entity.setDegree(lineageRelationship.getDegree());
}
return entity;

View File

@ -5,6 +5,8 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.linkedin.common.Status;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.dataset.FineGrainedLineage;
import com.linkedin.dataset.UpstreamLineage;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.gms.factory.common.GraphServiceFactory;
import com.linkedin.gms.factory.common.SystemMetadataServiceFactory;
@ -38,6 +40,7 @@ import java.io.UnsupportedEncodingException;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -67,6 +70,7 @@ public class UpdateIndicesHook implements MetadataChangeLogHook {
private final EntityRegistry _entityRegistry;
private final SearchDocumentTransformer _searchDocumentTransformer;
public static final String DOWNSTREAM_OF = "DownstreamOf";
private static final Set<ChangeType> VALID_CHANGE_TYPES =
Stream.of(
ChangeType.UPSERT,
@ -152,15 +156,47 @@ public class UpdateIndicesHook implements MetadataChangeLogHook {
}
}
private Pair<List<Edge>, Set<String>> getEdgesAndRelationshipTypesFromAspect(Urn urn, AspectSpec aspectSpec, RecordTemplate aspect) {
final Set<String> relationshipTypesBeingAdded = new HashSet<>();
private void updateFineGrainedEdgesAndRelationships(
RecordTemplate aspect,
List<Edge> edgesToAdd,
HashMap<Urn, Set<String>> urnToRelationshipTypesBeingAdded
) {
UpstreamLineage upstreamLineage = new UpstreamLineage(aspect.data());
if (upstreamLineage.getFineGrainedLineages() != null) {
for (FineGrainedLineage fineGrainedLineage : upstreamLineage.getFineGrainedLineages()) {
if (!fineGrainedLineage.hasDownstreams() || !fineGrainedLineage.hasUpstreams()) {
break;
}
// for every downstream, create an edge with each of the upstreams
for (Urn downstream : fineGrainedLineage.getDownstreams()) {
for (Urn upstream : fineGrainedLineage.getUpstreams()) {
edgesToAdd.add(new Edge(downstream, upstream, DOWNSTREAM_OF));
Set<String> relationshipTypes = urnToRelationshipTypesBeingAdded.getOrDefault(downstream, new HashSet<>());
relationshipTypes.add(DOWNSTREAM_OF);
urnToRelationshipTypesBeingAdded.put(downstream, relationshipTypes);
}
}
}
}
}
private Pair<List<Edge>, HashMap<Urn, Set<String>>> getEdgesAndRelationshipTypesFromAspect(Urn urn, AspectSpec aspectSpec, RecordTemplate aspect) {
final List<Edge> edgesToAdd = new ArrayList<>();
final HashMap<Urn, Set<String>> urnToRelationshipTypesBeingAdded = new HashMap<>();
if (aspectSpec.getName().equals(Constants.UPSTREAM_LINEAGE_ASPECT_NAME)) {
// we need to manually set schemaField <-> schemaField edges for fineGrainedLineage since
// @Relationship only links between the parent entity urn and something else.
updateFineGrainedEdgesAndRelationships(aspect, edgesToAdd, urnToRelationshipTypesBeingAdded);
}
Map<RelationshipFieldSpec, List<Object>> extractedFields =
FieldExtractor.extractFields(aspect, aspectSpec.getRelationshipFieldSpecs());
for (Map.Entry<RelationshipFieldSpec, List<Object>> entry : extractedFields.entrySet()) {
relationshipTypesBeingAdded.add(entry.getKey().getRelationshipName());
Set<String> relationshipTypes = urnToRelationshipTypesBeingAdded.getOrDefault(urn, new HashSet<>());
relationshipTypes.add(entry.getKey().getRelationshipName());
urnToRelationshipTypesBeingAdded.put(urn, relationshipTypes);
for (Object fieldValue : entry.getValue()) {
try {
edgesToAdd.add(
@ -170,23 +206,25 @@ public class UpdateIndicesHook implements MetadataChangeLogHook {
}
}
}
return Pair.of(edgesToAdd, relationshipTypesBeingAdded);
return Pair.of(edgesToAdd, urnToRelationshipTypesBeingAdded);
}
/**
* Process snapshot and update graph index
*/
private void updateGraphService(Urn urn, AspectSpec aspectSpec, RecordTemplate aspect) {
Pair<List<Edge>, Set<String>> edgeAndRelationTypes =
Pair<List<Edge>, HashMap<Urn, Set<String>>> edgeAndRelationTypes =
getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect);
final List<Edge> edgesToAdd = edgeAndRelationTypes.getFirst();
final Set<String> relationshipTypesBeingAdded = edgeAndRelationTypes.getSecond();
final HashMap<Urn, Set<String>> urnToRelationshipTypesBeingAdded = edgeAndRelationTypes.getSecond();
log.debug("Here's the relationship types found {}", relationshipTypesBeingAdded);
if (relationshipTypesBeingAdded.size() > 0) {
_graphService.removeEdgesFromNode(urn, new ArrayList<>(relationshipTypesBeingAdded),
newRelationshipFilter(new Filter().setOr(new ConjunctiveCriterionArray()), RelationshipDirection.OUTGOING));
log.debug("Here's the relationship types found {}", urnToRelationshipTypesBeingAdded);
if (urnToRelationshipTypesBeingAdded.size() > 0) {
for (Map.Entry<Urn, Set<String>> entry : urnToRelationshipTypesBeingAdded.entrySet()) {
_graphService.removeEdgesFromNode(entry.getKey(), new ArrayList<>(entry.getValue()),
newRelationshipFilter(new Filter().setOr(new ConjunctiveCriterionArray()), RelationshipDirection.OUTGOING));
}
edgesToAdd.forEach(edge -> _graphService.addEdge(edge));
}
}
@ -260,13 +298,15 @@ public class UpdateIndicesHook implements MetadataChangeLogHook {
return;
}
Pair<List<Edge>, Set<String>> edgeAndRelationTypes =
Pair<List<Edge>, HashMap<Urn, Set<String>>> edgeAndRelationTypes =
getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect);
final Set<String> relationshipTypesBeingAdded = edgeAndRelationTypes.getSecond();
if (relationshipTypesBeingAdded.size() > 0) {
_graphService.removeEdgesFromNode(urn, new ArrayList<>(relationshipTypesBeingAdded),
createRelationshipFilter(new Filter().setOr(new ConjunctiveCriterionArray()), RelationshipDirection.OUTGOING));
final HashMap<Urn, Set<String>> urnToRelationshipTypesBeingAdded = edgeAndRelationTypes.getSecond();
if (urnToRelationshipTypesBeingAdded.size() > 0) {
for (Map.Entry<Urn, Set<String>> entry : urnToRelationshipTypesBeingAdded.entrySet()) {
_graphService.removeEdgesFromNode(entry.getKey(), new ArrayList<>(entry.getValue()),
createRelationshipFilter(new Filter().setOr(new ConjunctiveCriterionArray()), RelationshipDirection.OUTGOING));
}
}
}

View File

@ -17,9 +17,16 @@ record LineageRelationship {
*/
entity: Urn
/**
* Optional list of entities between the source and destination node.
* There can be multiple paths from the source to the destination.
*/
paths: array[array[Urn]] = []
/**
* Optional list of entities between the source and destination node
*/
@deprecated
path: array[Urn] = []
/**

View File

@ -7,9 +7,16 @@ import com.linkedin.common.Urn
*/
record LineageSearchEntity includes SearchEntity {
/**
* Optional list of entities between the source and destination node.
* There can be multiple paths from the source to the destination.
*/
paths: array[array[Urn]] = []
/**
* Optional list of entities between the source and destination node
*/
@deprecated
path: array[Urn] = []
/**

View File

@ -5515,13 +5515,25 @@
} ]
} ],
"fields" : [ {
"name" : "paths",
"type" : {
"type" : "array",
"items" : {
"type" : "array",
"items" : "com.linkedin.common.Urn"
}
},
"doc" : "Optional list of entities between the source and destination node.\nThere can be multiple paths from the source to the destination.",
"default" : [ [ ] ]
}, {
"name" : "path",
"type" : {
"type" : "array",
"items" : "com.linkedin.common.Urn"
},
"doc" : "Optional list of entities between the source and destination node",
"default" : [ ]
"default" : [ ],
"deprecated" : true
}, {
"name" : "degree",
"type" : "int",

View File

@ -112,6 +112,17 @@
"name" : "entity",
"type" : "com.linkedin.common.Urn",
"doc" : "Entity that is related via lineage"
}, {
"name" : "paths",
"type" : {
"type" : "array",
"items" : {
"type" : "array",
"items" : "com.linkedin.common.Urn"
}
},
"doc" : "Optional list of entities between the source and destination node.\nThere can be multiple paths from the source to the destination.",
"default" : [ [ ] ]
}, {
"name" : "path",
"type" : {
@ -119,7 +130,8 @@
"items" : "com.linkedin.common.Urn"
},
"doc" : "Optional list of entities between the source and destination node",
"default" : [ ]
"default" : [ ],
"deprecated" : true
}, {
"name" : "degree",
"type" : "int",

View File

@ -55,4 +55,29 @@ describe("impact analysis", () => {
cy.contains("User Creations").should("not.exist");
cy.contains("User Deletions");
});
it("can view column level impact analysis and turn it off", () => {
cy.login();
cy.visit(
"/dataset/urn:li:dataset:(urn:li:dataPlatform:kafka,SampleCypressKafkaDataset,PROD)/Lineage?column=%5Bversion%3D2.0%5D.%5Btype%3Dboolean%5D.field_bar&is_lineage_mode=false"
);
// impact analysis can take a beat- don't want to time out here
cy.wait(5000);
cy.contains("SampleCypressHdfsDataset");
cy.contains("Downstream column: shipment_info");
cy.contains("some-cypress-feature-1").should("not.exist");
cy.contains("Baz Chart 1").should("not.exist");
// find button to turn off column-level impact analysis
cy.get('[data-testid="column-lineage-toggle"]').click({ force: true });
cy.wait(2000);
cy.contains("SampleCypressHdfsDataset");
cy.contains("Downstream column: shipment_info").should("not.exist");
cy.contains("some-cypress-feature-1");
cy.contains("Baz Chart 1");
});
});

View File

@ -189,6 +189,19 @@
"dataset": "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleCypressKafkaDataset,PROD)",
"type": "TRANSFORMED"
}
],
"fineGrainedLineages": [
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:kafka,SampleCypressKafkaDataset,PROD),field_bar)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleCypressHdfsDataset,PROD),shipment_info)"
],
"confidenceScore": 1.0
}
]
}
},