Pipeline lineage edge UI (#5891)

Pipeline lineage edge UI (#5891)
This commit is contained in:
Onkar Ravgan 2022-07-17 21:55:37 +05:30 committed by GitHub
parent 98304683c9
commit 4d898a120f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 685 additions and 173 deletions

View File

@ -13,8 +13,6 @@
package org.openmetadata.catalog.jdbi3;
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@ -65,6 +63,16 @@ public class LineageRepository {
EntityReference to = addLineage.getEdge().getToEntity();
to = Entity.getEntityReferenceById(to.getType(), to.getId(), Include.NON_DELETED);
if (addLineage.getEdge().getLineageDetails() != null
&& addLineage.getEdge().getLineageDetails().getPipeline() != null) {
// Validate pipeline entity
EntityReference pipeline = addLineage.getEdge().getLineageDetails().getPipeline();
pipeline = Entity.getEntityReferenceById(pipeline.getType(), pipeline.getId(), Include.NON_DELETED);
// Add pipeline entity details to lineage details
addLineage.getEdge().getLineageDetails().withPipeline(pipeline);
}
// Validate lineage details
String detailsJson = validateLineageDetails(from, to, addLineage.getEdge().getLineageDetails());
@ -75,7 +83,7 @@ public class LineageRepository {
private String validateLineageDetails(EntityReference from, EntityReference to, LineageDetails details)
throws IOException {
if (details == null || listOrEmpty(details.getColumnsLineage()).isEmpty()) {
if (details == null) {
return null;
}
@ -86,17 +94,19 @@ public class LineageRepository {
Table fromTable = dao.tableDAO().findEntityById(from.getId());
Table toTable = dao.tableDAO().findEntityById(to.getId());
for (ColumnLineage columnLineage : columnsLineage) {
for (String fromColumn : columnLineage.getFromColumns()) {
// From column belongs to the fromNode
if (fromColumn.startsWith(fromTable.getFullyQualifiedName())) {
TableRepository.validateColumnFQN(fromTable, fromColumn);
} else {
Table otherTable = dao.tableDAO().findEntityByName(FullyQualifiedName.getTableFQN(fromColumn));
TableRepository.validateColumnFQN(otherTable, fromColumn);
if (columnsLineage != null) {
for (ColumnLineage columnLineage : columnsLineage) {
for (String fromColumn : columnLineage.getFromColumns()) {
// From column belongs to the fromNode
if (fromColumn.startsWith(fromTable.getFullyQualifiedName())) {
TableRepository.validateColumnFQN(fromTable, fromColumn);
} else {
Table otherTable = dao.tableDAO().findEntityByName(FullyQualifiedName.getTableFQN(fromColumn));
TableRepository.validateColumnFQN(otherTable, fromColumn);
}
}
TableRepository.validateColumnFQN(toTable, columnLineage.getToColumn());
}
TableRepository.validateColumnFQN(toTable, columnLineage.getToColumn());
}
return JsonUtils.pojoToJson(details);
}

View File

@ -7,46 +7,45 @@
"javaType": "org.openmetadata.catalog.type.EntityLineage",
"definitions": {
"columnLineage": {
"type" : "object",
"type": "object",
"properties": {
"fromColumns" : {
"fromColumns": {
"description": "One or more source columns identified by fully qualified column name used by transformation function to create destination column.",
"type" : "array",
"items" : {
"$ref" : "../type/basic.json#/definitions/fullyQualifiedEntityName"
"type": "array",
"items": {
"$ref": "../type/basic.json#/definitions/fullyQualifiedEntityName"
}
},
"toColumn" : {
"toColumn": {
"description": "Destination column identified by fully qualified column name created by the transformation of source columns.",
"$ref" : "../type/basic.json#/definitions/fullyQualifiedEntityName"
"$ref": "../type/basic.json#/definitions/fullyQualifiedEntityName"
},
"function" : {
"function": {
"description": "Transformation function applied to source columns to create destination column. That is `function(fromColumns) -> toColumn`.",
"$ref" : "../type/basic.json#/definitions/sqlFunction"
"$ref": "../type/basic.json#/definitions/sqlFunction"
}
}
},
"lineageDetails" : {
"description" : "Lineage details including sqlQuery + pipeline + columnLineage.",
"type" : "object",
"lineageDetails": {
"description": "Lineage details including sqlQuery + pipeline + columnLineage.",
"type": "object",
"properties": {
"sqlQuery" : {
"sqlQuery": {
"description": "SQL used for transformation.",
"$ref" : "../type/basic.json#/definitions/sqlQuery"
"$ref": "../type/basic.json#/definitions/sqlQuery"
},
"columnsLineage" : {
"description" : "Lineage information of how upstream columns were combined to get downstream column.",
"type" : "array",
"items" : {
"$ref" : "#/definitions/columnLineage"
"columnsLineage": {
"description": "Lineage information of how upstream columns were combined to get downstream column.",
"type": "array",
"items": {
"$ref": "#/definitions/columnLineage"
}
},
"pipeline" : {
"pipeline": {
"description": "Pipeline where the sqlQuery is periodically run.",
"$ref" : "../type/entityReference.json"
"$ref": "../type/entityReference.json"
}
},
"required": ["sqlQuery", "columnsLineage"]
}
},
"edge": {
"description": "Edge in the lineage graph from one entity to another by entity IDs.",

View File

@ -1,25 +1,26 @@
[{
"from": { "fqn":"sample_data.ecommerce_db.shopify.raw_customer", "type": "table"},
"to": { "fqn":"sample_airflow.dim_address_etl", "type": "pipeline"}
},
{
"from": {"fqn":"sample_airflow.dim_address_etl", "type": "pipeline"},
"to": {"fqn":"sample_data.ecommerce_db.shopify.dim_address", "type": "table"}
"to": {"fqn":"sample_data.ecommerce_db.shopify.dim_address", "type": "table"},
"edge_meta": { "fqn":"sample_airflow.dim_address_etl", "type": "pipeline"}
},
{
"from": {"fqn":"sample_data.ecommerce_db.shopify.raw_order", "type": "table"},
"to": {"fqn":"sample_airflow.dim_product_etl", "type": "pipeline"}
"to": {"fqn":"sample_data.ecommerce_db.shopify.\"dim.product\"", "type": "table"},
"edge_meta": {"fqn":"sample_airflow.dim_product_etl", "type": "pipeline"}
},
{
"from": {"fqn":"sample_data.ecommerce_db.shopify.raw_order", "type": "table"},
"to": {"fqn":"sample_data.ecommerce_db.shopify.\"dim.product.variant\"", "type": "table"},
"edge_meta": {"fqn":"sample_airflow.dim_product_etl", "type": "pipeline"}
},
{
"from": {"fqn":"sample_data.ecommerce_db.shopify.raw_customer", "type": "table"},
"to": {"fqn":"sample_airflow.dim_product_etl", "type": "pipeline"}
"to": {"fqn":"sample_data.ecommerce_db.shopify.\"dim.product\"", "type": "table"},
"edge_meta": {"fqn":"sample_airflow.dim_product_etl", "type": "pipeline"}
},
{
"from": {"fqn":"sample_airflow.dim_product_etl", "type": "pipeline"},
"to": {"fqn":"sample_data.ecommerce_db.shopify.\"dim.product\"", "type": "table"}
},
{
"from": {"fqn": "sample_airflow.dim_product_etl", "type": "pipeline"},
"to": {"fqn":"sample_data.ecommerce_db.shopify.\"dim.product.variant\"", "type": "table"}
"from": {"fqn":"sample_data.ecommerce_db.shopify.raw_customer", "type": "table"},
"to": {"fqn":"sample_data.ecommerce_db.shopify.\"dim.product.variant\"", "type": "table"},
"edge_meta": {"fqn":"sample_airflow.dim_product_etl", "type": "pipeline"}
}
]

View File

@ -43,7 +43,7 @@ from metadata.generated.schema.entity.services.pipelineService import (
PipelineService,
PipelineServiceType,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.helpers import datetime_to_ts
@ -351,31 +351,27 @@ def parse_lineage(
airflow_service_entity=airflow_service_entity,
metadata=metadata,
)
lineage_details = LineageDetails(
pipeline=EntityReference(id=pipeline.id, type="pipeline")
)
operator.log.info("Parsing Lineage")
for table in inlets if inlets else []:
table_entity = metadata.get_by_name(entity=Table, fqn=table)
operator.log.debug(f"from entity {table_entity}")
lineage = AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(id=table_entity.id, type="table"),
toEntity=EntityReference(id=pipeline.id, type="pipeline"),
for from_table in inlets if inlets else []:
from_entity = metadata.get_by_name(entity=Table, fqn=from_table)
operator.log.debug(f"from entity {from_entity}")
for to_table in outlets if outlets else []:
to_entity = metadata.get_by_name(entity=Table, fqn=to_table)
operator.log.debug(f"To entity {to_entity}")
lineage = AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(id=from_entity.id, type="table"),
toEntity=EntityReference(id=to_entity.id, type="table"),
)
)
)
operator.log.debug(f"From lineage {lineage}")
metadata.add_lineage(lineage)
for table in outlets if outlets else []:
table_entity = metadata.get_by_name(entity=Table, fqn=table)
operator.log.debug(f"To entity {table_entity}")
lineage = AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(id=pipeline.id, type="pipeline"),
toEntity=EntityReference(id=table_entity.id, type="table"),
)
)
operator.log.debug(f"To lineage {lineage}")
metadata.add_lineage(lineage)
if lineage_details:
lineage.edge.lineageDetails = lineage_details
operator.log.debug(f"Lineage {lineage}")
metadata.add_lineage(lineage)
return pipeline

View File

@ -63,7 +63,7 @@ from metadata.generated.schema.metadataIngestion.workflow import (
from metadata.generated.schema.tests.basic import TestCaseResult
from metadata.generated.schema.tests.columnTest import ColumnTestCase
from metadata.generated.schema.tests.tableTest import TableTestCase
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
@ -552,8 +552,16 @@ class SampleDataSource(Source[Entity]):
for edge in self.lineage:
from_entity_ref = get_lineage_entity_ref(edge["from"], self.metadata_config)
to_entity_ref = get_lineage_entity_ref(edge["to"], self.metadata_config)
edge_entity_ref = get_lineage_entity_ref(
edge["edge_meta"], self.metadata_config
)
lineage_details = LineageDetails(pipeline=edge_entity_ref)
lineage = AddLineageRequest(
edge=EntitiesEdge(fromEntity=from_entity_ref, toEntity=to_entity_ref)
edge=EntitiesEdge(
fromEntity=from_entity_ref,
toEntity=to_entity_ref,
lineageDetails=lineage_details,
)
)
yield lineage

View File

@ -39,7 +39,7 @@ from metadata.generated.schema.metadataIngestion.pipelineServiceMetadataPipeline
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
@ -211,6 +211,10 @@ class AirbyteSource(Source[CreatePipelineRequest]):
if not source_service or not destination_service:
return
lineage_details = LineageDetails(
pipeline=EntityReference(id=pipeline_entity.id, type="pipeline")
)
for task in connection.get("syncCatalog", {}).get("streams") or []:
stream = task.get("stream")
from_fqn = fqn.build(
@ -231,23 +235,21 @@ class AirbyteSource(Source[CreatePipelineRequest]):
service_name=destination_connection.get("name"),
)
if not from_fqn and not to_fqn:
continue
from_entity = self.metadata.get_by_name(entity=Table, fqn=from_fqn)
to_entity = self.metadata.get_by_name(entity=Table, fqn=to_fqn)
yield AddLineageRequest(
if not from_entity or not to_entity:
continue
lineage = AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(id=from_entity.id, type="table"),
toEntity=EntityReference(id=pipeline_entity.id, type="pipeline"),
)
)
yield AddLineageRequest(
edge=EntitiesEdge(
toEntity=EntityReference(id=to_entity.id, type="table"),
fromEntity=EntityReference(id=pipeline_entity.id, type="pipeline"),
)
)
if lineage_details:
lineage.edge.lineageDetails = lineage_details
yield lineage
def next_record(self) -> Iterable[Entity]:
"""

View File

@ -47,7 +47,7 @@ from metadata.generated.schema.metadataIngestion.pipelineServiceMetadataPipeline
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
@ -290,48 +290,40 @@ class AirflowSource(Source[CreatePipelineRequest]):
:return: Lineage from inlets and outlets
"""
dag: SerializedDAG = serialized_dag.dag
lineage_details = LineageDetails(
pipeline=EntityReference(id=pipeline_entity.id, type="pipeline")
)
for task in dag.tasks:
for table_fqn in self.get_inlets(task) or []:
table_entity: Table = self.metadata.get_by_name(
entity=Table, fqn=table_fqn
)
if table_entity:
yield AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id=table_entity.id, type="table"
),
toEntity=EntityReference(
id=pipeline_entity.id, type="pipeline"
),
)
)
for from_fqn in self.get_inlets(task) or []:
from_entity = self.metadata.get_by_name(entity=Table, fqn=from_fqn)
if from_entity:
for to_fqn in self.get_outlets(task) or []:
to_entity = self.metadata.get_by_name(entity=Table, fqn=to_fqn)
if to_entity:
lineage = AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id=from_entity.id, type="table"
),
toEntity=EntityReference(
id=to_entity.id, type="table"
),
)
)
if lineage_details:
lineage.edge.lineageDetails = lineage_details
yield lineage
else:
logger.warn(
f"Could not find Table [{to_fqn}] from "
f"[{pipeline_entity.fullyQualifiedName.__root__}] outlets"
)
else:
logger.warn(
f"Could not find Table [{table_fqn}] from "
f"Could not find Table [{from_fqn}] from "
f"[{pipeline_entity.fullyQualifiedName.__root__}] inlets"
)
for table_fqn in self.get_outlets(task) or []:
table_entity: Table = self.metadata.get_by_name(
entity=Table, fqn=table_fqn
)
if table_entity:
yield AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id=pipeline_entity.id, type="pipeline"
),
toEntity=EntityReference(id=table_entity.id, type="table"),
)
)
else:
logger.warn(
f"Could not find Table [{table_fqn}] from "
f"[{pipeline_entity.fullyQualifiedName.__root__}] outlets"
)
def next_record(self) -> Iterable[Entity]:
"""
Extract metadata information to create Pipelines with Tasks

View File

@ -0,0 +1,71 @@
/*
* Copyright 2021 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { render, screen } from '@testing-library/react';
import userEvent from '@testing-library/user-event';
import React from 'react';
import AddPipeLineModal from './AddPipeLineModal';
const mockProps = {
showAddPipelineModal: true,
pipelineSearchValue: '',
selectedPipelineId: undefined,
pipelineOptions: [
{
displayName: 'Pipeline 1',
name: 'Pipeline 1',
id: 'test-pipeline-1',
type: 'pipeline',
},
],
handleModalCancel: jest.fn(),
handleModalSave: jest.fn(),
onClear: jest.fn(),
handleRemoveEdgeClick: jest.fn(),
onSearch: jest.fn(),
onSelect: jest.fn(),
};
describe('Test CustomEdge Component', () => {
it('AddPipeLineModal should render properly', async () => {
render(<AddPipeLineModal {...mockProps} />);
const pipelineModal = await screen.findByTestId('add-pipeline-modal');
const fieldSelect = await screen.findByTestId('field-select');
const removeEdge = await screen.findByTestId('remove-edge-button');
const saveButton = await screen.findByTestId('save-button');
expect(pipelineModal).toBeInTheDocument();
expect(fieldSelect).toBeInTheDocument();
expect(removeEdge).toBeInTheDocument();
expect(saveButton).toBeInTheDocument();
});
it('CTA should work properly', async () => {
render(
<AddPipeLineModal {...mockProps} selectedPipelineId="test-pipeline-1" />
);
const removeEdge = await screen.findByTestId('remove-edge-button');
const saveButton = await screen.findByTestId('save-button');
expect(removeEdge).toBeInTheDocument();
expect(saveButton).toBeInTheDocument();
userEvent.click(removeEdge);
userEvent.click(saveButton);
expect(mockProps.handleRemoveEdgeClick).toHaveBeenCalled();
expect(mockProps.handleModalSave).toHaveBeenCalled();
});
});

View File

@ -0,0 +1,104 @@
/*
* Copyright 2021 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Modal, Select } from 'antd';
import { isUndefined } from 'lodash';
import React from 'react';
import { EntityReference } from '../../generated/api/services/createPipelineService';
import { getEntityName } from '../../utils/CommonUtils';
import { Button } from '../buttons/Button/Button';
interface AddPipeLineModalType {
showAddPipelineModal: boolean;
pipelineSearchValue: string;
selectedPipelineId: string | undefined;
pipelineOptions: EntityReference[];
handleModalCancel: () => void;
handleModalSave: () => void;
onClear: () => void;
handleRemoveEdgeClick: (evt: React.MouseEvent<HTMLButtonElement>) => void;
onSearch: (value: string) => void;
onSelect: (value: string) => void;
}
const AddPipeLineModal = ({
showAddPipelineModal,
pipelineOptions,
pipelineSearchValue,
selectedPipelineId,
handleRemoveEdgeClick,
handleModalCancel,
handleModalSave,
onClear,
onSearch,
onSelect,
}: AddPipeLineModalType) => {
const Footer = () => {
return (
<div className="tw-justify-end" data-testid="footer">
<Button
className="tw-mr-2"
data-testid="remove-edge-button"
size="regular"
theme="primary"
variant="text"
onClick={handleRemoveEdgeClick}>
Remove Edge
</Button>
<Button
className="tw-h-8 tw-px-3 tw-py-2 tw-rounded-md"
data-testid="save-button"
size="custom"
theme="primary"
variant="contained"
onClick={handleModalSave}>
Save
</Button>
</div>
);
};
return (
<Modal
destroyOnClose
data-testid="add-pipeline-modal"
footer={<Footer />}
title={isUndefined(selectedPipelineId) ? 'Add Pipeline' : 'Edit Pipeline'}
visible={showAddPipelineModal}
onCancel={handleModalCancel}>
<Select
allowClear
showSearch
className="tw-w-full"
data-testid="field-select"
defaultActiveFirstOption={false}
filterOption={false}
notFoundContent={false}
options={pipelineOptions.map((option) => ({
label: getEntityName(option),
value: option.id,
}))}
placeholder="Search to Select Pipeline"
searchValue={pipelineSearchValue}
showArrow={false}
value={selectedPipelineId}
onClear={onClear}
onSearch={onSearch}
onSelect={onSelect}
/>
</Modal>
);
};
export default AddPipeLineModal;

View File

@ -11,15 +11,11 @@
* limitations under the License.
*/
import {
findAllByTestId,
findByTestId,
queryByTestId,
render,
} from '@testing-library/react';
import { render, screen } from '@testing-library/react';
import React from 'react';
import { EdgeProps, Position } from 'react-flow-renderer';
import { MemoryRouter } from 'react-router-dom';
import { EntityType } from '../../enums/entity.enum';
import { CustomEdge } from './CustomEdge.component';
jest.mock('../../constants/Lineage.constants', () => ({
@ -39,46 +35,74 @@ const mockCustomEdgeProp = {
data: {
source: 'node1',
target: 'node2',
sourceType: EntityType.TABLE,
targetType: EntityType.DASHBOARD,
onEdgeClick: jest.fn(),
selectedNode: {
id: 'node1',
},
isColumnLineage: false,
isEditMode: true,
},
selected: true,
} as EdgeProps;
describe('Test CustomEdge Component', () => {
it('Check if CustomEdge has all child elements', async () => {
const { container } = render(<CustomEdge {...mockCustomEdgeProp} />, {
render(<CustomEdge {...mockCustomEdgeProp} />, {
wrapper: MemoryRouter,
});
const deleteButton = await findByTestId(container, 'delete-button');
const edgePathElement = await findAllByTestId(
container,
const deleteButton = await screen.findByTestId('delete-button');
const edgePathElement = await screen.findAllByTestId(
'react-flow-edge-path'
);
const pipelineLabelAsEdge = screen.queryByTestId('pipeline-label');
expect(deleteButton).toBeInTheDocument();
expect(pipelineLabelAsEdge).not.toBeInTheDocument();
expect(edgePathElement).toHaveLength(edgePathElement.length);
});
it('Check if CustomEdge has selected as false', async () => {
const { container } = render(
<CustomEdge {...mockCustomEdgeProp} selected={false} />,
render(<CustomEdge {...mockCustomEdgeProp} selected={false} />, {
wrapper: MemoryRouter,
});
const edgePathElement = await screen.findAllByTestId(
'react-flow-edge-path'
);
const deleteButton = screen.queryByTestId('delete-button');
expect(deleteButton).not.toBeInTheDocument();
expect(edgePathElement).toHaveLength(edgePathElement.length);
});
it('Pipeline as edge should be visible', async () => {
render(
<CustomEdge
{...mockCustomEdgeProp}
data={{
...mockCustomEdgeProp.data,
targetType: EntityType.TABLE,
label: 'Pipeline',
pipeline: {
id: 'pipeline1',
type: 'pipeline-id',
},
}}
/>,
{
wrapper: MemoryRouter,
}
);
const edgePathElement = await findAllByTestId(
container,
'react-flow-edge-path'
);
const pipelineLabelAsEdge = await screen.findByTestId('pipeline-label');
const pipelineName = await screen.findByTestId('pipeline-name');
const deleteButton = queryByTestId(container, 'delete-button');
expect(deleteButton).not.toBeInTheDocument();
expect(edgePathElement).toHaveLength(edgePathElement.length);
expect(pipelineLabelAsEdge).toBeInTheDocument();
expect(pipelineName).toBeInTheDocument();
expect(pipelineName.textContent).toEqual('Pipeline');
});
});

View File

@ -13,8 +13,12 @@
import React, { Fragment } from 'react';
import { EdgeProps, getBezierPath, getEdgeCenter } from 'react-flow-renderer';
import { foreignObjectSize } from '../../constants/Lineage.constants';
import SVGIcons from '../../utils/SvgUtils';
import {
foreignObjectSize,
pipelineEdgeWidth,
} from '../../constants/Lineage.constants';
import { EntityType } from '../../enums/entity.enum';
import SVGIcons, { Icons } from '../../utils/SvgUtils';
import { CustomEdgeData } from './EntityLineage.interface';
export const CustomEdge = ({
@ -30,7 +34,7 @@ export const CustomEdge = ({
data,
selected,
}: EdgeProps) => {
const { onEdgeClick, ...rest } = data;
const { onEdgeClick, addPipelineClick, ...rest } = data;
const offset = 4;
const edgePath = getBezierPath({
@ -65,6 +69,12 @@ export const CustomEdge = ({
targetY,
});
const isTableToTableEdge = () => {
const { sourceType, targetType } = data;
return sourceType === EntityType.TABLE && targetType === EntityType.TABLE;
};
const getInvisiblePath = (path: string) => {
return (
<path
@ -90,26 +100,90 @@ export const CustomEdge = ({
/>
{getInvisiblePath(invisibleEdgePath)}
{getInvisiblePath(invisibleEdgePath1)}
{selected ? (
<foreignObject
data-testid="delete-button"
height={foreignObjectSize}
requiredExtensions="http://www.w3.org/1999/xhtml"
width={foreignObjectSize}
x={edgeCenterX - foreignObjectSize / offset}
y={edgeCenterY - foreignObjectSize / offset}>
<button
className="tw-cursor-pointer tw-flex tw-z-9999"
onClick={(event) => onEdgeClick?.(event, rest as CustomEdgeData)}>
<SVGIcons
alt="times-circle"
icon="icon-times-circle"
width="16px"
/>
</button>
</foreignObject>
) : null}
{!data.isColumnLineage && isTableToTableEdge() ? (
data.label ? (
<foreignObject
data-testid="pipeline-label"
height={foreignObjectSize}
requiredExtensions="http://www.w3.org/1999/xhtml"
width={pipelineEdgeWidth}
x={edgeCenterX - pipelineEdgeWidth / 2}
y={edgeCenterY - foreignObjectSize / 2}>
<body
onClick={(event) =>
data.isEditMode &&
addPipelineClick?.(event, rest as CustomEdgeData)
}>
<div className="tw-flex-center tw-bg-body-main tw-gap-2 tw-border tw-rounded tw-p-2">
<div className="tw-flex tw-items-center tw-gap-2">
<SVGIcons
alt="times-circle"
icon={Icons.PIPELINE_GREY}
width="14px"
/>
<span data-testid="pipeline-name">{data.label}</span>
</div>
{data.isEditMode && (
<button className="tw-cursor-pointer tw-flex tw-z-9999">
<SVGIcons
alt="times-circle"
icon={Icons.EDIT_OUTLINE_PRIMARY}
width="16px"
/>
</button>
)}
</div>
</body>
</foreignObject>
) : (
selected &&
data.isEditMode && (
<foreignObject
data-testid="add-pipeline"
height={foreignObjectSize}
requiredExtensions="http://www.w3.org/1999/xhtml"
width={foreignObjectSize}
x={edgeCenterX - foreignObjectSize / offset}
y={edgeCenterY - foreignObjectSize / offset}>
<button
className="tw-cursor-pointer tw-flex tw-z-9999"
style={{
transform: 'rotate(45deg)',
}}
onClick={(event) =>
addPipelineClick?.(event, rest as CustomEdgeData)
}>
<SVGIcons
alt="times-circle"
icon="icon-times-circle"
width="16px"
/>
</button>
</foreignObject>
)
)
) : (
selected &&
data.isEditMode && (
<foreignObject
data-testid="delete-button"
height={foreignObjectSize}
requiredExtensions="http://www.w3.org/1999/xhtml"
width={foreignObjectSize}
x={edgeCenterX - foreignObjectSize / offset}
y={edgeCenterY - foreignObjectSize / offset}>
<button
className="tw-cursor-pointer tw-flex tw-z-9999"
onClick={(event) => onEdgeClick?.(event, rest as CustomEdgeData)}>
<SVGIcons
alt="times-circle"
icon="icon-times-circle"
width="16px"
/>
</button>
</foreignObject>
)
)}
</Fragment>
);
};

View File

@ -49,18 +49,27 @@ import ReactFlow, {
useNodesState,
} from 'react-flow-renderer';
import { useAuthContext } from '../../authentication/auth-provider/AuthProvider';
import { getSuggestions } from '../../axiosAPIs/miscAPI';
import { getTableDetails } from '../../axiosAPIs/tableAPI';
import { ELEMENT_DELETE_STATE } from '../../constants/Lineage.constants';
import { EntityType } from '../../enums/entity.enum';
import { SearchIndex } from '../../enums/search.enum';
import {
AddLineage,
ColumnLineage,
} from '../../generated/api/lineage/addLineage';
import { Column } from '../../generated/entity/data/table';
import { Operation } from '../../generated/entity/policies/accessControl/rule';
import { EntityLineage } from '../../generated/type/entityLineage';
import {
EntityLineage,
LineageDetails,
} from '../../generated/type/entityLineage';
import { EntityReference } from '../../generated/type/entityReference';
import { withLoader } from '../../hoc/withLoader';
import { useAuth } from '../../hooks/authHooks';
import jsonData from '../../jsons/en';
import { formatDataResponse as formatPipelineData } from '../../utils/APIUtils';
import { getEntityName } from '../../utils/CommonUtils';
import {
dragHandle,
getColumnType,
@ -84,6 +93,7 @@ import NonAdminAction from '../common/non-admin-action/NonAdminAction';
import EntityInfoDrawer from '../EntityInfoDrawer/EntityInfoDrawer.component';
import Loader from '../Loader/Loader';
import ConfirmationModal from '../Modals/ConfirmationModal/ConfirmationModal';
import AddPipeLineModal from './AddPipeLineModal';
import CustomControls, { ControlButton } from './CustomControls.component';
import { CustomEdge } from './CustomEdge.component';
import CustomNode from './CustomNode.component';
@ -133,10 +143,17 @@ const Entitylineage: FunctionComponent<EntityLineageProp> = ({
const [confirmDelete, setConfirmDelete] = useState<boolean>(false);
const [showdeleteModal, setShowDeleteModal] = useState<boolean>(false);
const [showAddPipelineModal, setShowAddPipelineModal] =
useState<boolean>(false);
const [pipelineSearchValue, setPipelineSearchValue] = useState<string>('');
const [pipelineOptions, setPipelineOptions] = useState<EntityReference[]>([]);
const [selectedEdge, setSelectedEdge] = useState<SelectedEdge>(
{} as SelectedEdge
);
const [selectedPipelineId, setSelectedPipelineId] = useState<
string | undefined
>();
const [loading, setLoading] = useState<boolean>(false);
const [status, setStatus] = useState<LoadingState>('initial');
@ -454,6 +471,32 @@ const Entitylineage: FunctionComponent<EntityLineageProp> = ({
});
};
const addPipelineClick = (
evt: React.MouseEvent<HTMLButtonElement>,
data: CustomEdgeData
) => {
setShowAddPipelineModal(true);
evt.stopPropagation();
if (!isUndefined(data.pipeline)) {
setSelectedPipelineId(data.pipeline.id);
setPipelineOptions([data.pipeline]);
}
setSelectedEdge({
id: data.id,
source: {} as EntityReference,
target: {} as EntityReference,
data,
});
};
const handleRemoveEdgeClick = (evt: React.MouseEvent<HTMLButtonElement>) => {
setShowAddPipelineModal(false);
if (selectedEdge.data) {
onEdgeClick(evt, selectedEdge.data);
}
};
/**
* Reset State between view and edit mode toggle
*/
@ -484,7 +527,8 @@ const Entitylineage: FunctionComponent<EntityLineageProp> = ({
// eslint-disable-next-line @typescript-eslint/no-use-before-define
removeNodeHandler,
tableColumnsRef.current,
currentData
currentData,
addPipelineClick
) as CustomeElement;
uniqueElements = {
@ -658,6 +702,7 @@ const Entitylineage: FunctionComponent<EntityLineageProp> = ({
});
}
newEdge.edge.lineageDetails = {
...currentEdge,
sqlQuery: currentEdge.sqlQuery || '',
columnsLineage: updatedColumnsLineage,
};
@ -670,7 +715,7 @@ const Entitylineage: FunctionComponent<EntityLineageProp> = ({
target: target || '',
sourceHandle: sourceHandle,
targetHandle: targetHandle,
type: isEditMode ? 'buttonedge' : 'custom',
type: 'buttonedge',
markerEnd: {
type: MarkerType.ArrowClosed,
},
@ -683,6 +728,7 @@ const Entitylineage: FunctionComponent<EntityLineageProp> = ({
sourceType: sourceNode?.type,
targetType: targetNode?.type,
isColumnLineage: true,
isEditMode,
onEdgeClick,
},
};
@ -696,7 +742,7 @@ const Entitylineage: FunctionComponent<EntityLineageProp> = ({
id: `edge-${params.source}-${params.target}`,
source: `${params.source}`,
target: `${params.target}`,
type: isEditMode ? 'buttonedge' : 'custom',
type: 'buttonedge',
style: { strokeWidth: '2px' },
markerEnd: {
type: MarkerType.ArrowClosed,
@ -708,7 +754,9 @@ const Entitylineage: FunctionComponent<EntityLineageProp> = ({
sourceType: sourceNode?.type,
targetType: targetNode?.type,
isColumnLineage: false,
isEditMode,
onEdgeClick,
addPipelineClick,
},
};
@ -1286,6 +1334,140 @@ const Entitylineage: FunctionComponent<EntityLineageProp> = ({
});
};
const handlePipelineSelection = (value: string) => {
setSelectedPipelineId(value);
};
const handleModalCancel = () => {
setSelectedPipelineId(undefined);
setShowAddPipelineModal(false);
setSelectedEdge({} as SelectedEdge);
setPipelineOptions([]);
};
const onPipelineSelectionClear = () => {
setSelectedPipelineId(undefined);
setPipelineSearchValue('');
};
const handleModalSave = () => {
if (selectedEdge.data) {
setStatus('waiting');
setLoading(true);
const { source, sourceType, target, targetType } = selectedEdge.data;
const allEdge = [
...(updatedLineageData.upstreamEdges || []),
...(updatedLineageData.downstreamEdges || []),
];
const selectedEdgeValue = allEdge.find(
(ed) => ed.fromEntity === source && ed.toEntity === target
);
const pipelineDetail = pipelineOptions.find(
(d) => d.id === selectedPipelineId
);
const updatedLineageDetails: LineageDetails = {
...selectedEdgeValue?.lineageDetails,
sqlQuery: selectedEdgeValue?.lineageDetails?.sqlQuery || '',
columnsLineage: selectedEdgeValue?.lineageDetails?.columnsLineage || [],
pipeline: isUndefined(selectedPipelineId)
? undefined
: {
id: selectedPipelineId,
type: EntityType.PIPELINE,
},
};
const newEdge: AddLineage = {
edge: {
fromEntity: {
id: source,
type: sourceType,
},
toEntity: {
id: target,
type: targetType,
},
lineageDetails: updatedLineageDetails,
},
};
const getUpdatedStreamEdge = (
streamEdges: EntityLineage['downstreamEdges']
) => {
if (isUndefined(streamEdges)) {
return [];
}
return streamEdges.map((ed) => {
if (ed.fromEntity === source && ed.toEntity === target) {
return {
...ed,
lineageDetails: {
...updatedLineageDetails,
pipeline: !isUndefined(updatedLineageDetails.pipeline)
? {
displayName: pipelineDetail?.displayName,
name: pipelineDetail?.name,
...updatedLineageDetails.pipeline,
}
: undefined,
},
};
}
return ed;
});
};
addLineageHandler(newEdge)
.then(() => {
setStatus('success');
setLoading(false);
setUpdatedLineageData((pre) => {
const newData = {
...pre,
downstreamEdges: getUpdatedStreamEdge(pre.downstreamEdges),
upstreamEdges: getUpdatedStreamEdge(pre.upstreamEdges),
};
return newData;
});
setEdges((pre) => {
return pre.map((edge) => {
if (edge.id === selectedEdge.id) {
return {
...edge,
animated: true,
data: {
...edge.data,
label: getEntityName(pipelineDetail),
pipeline: updatedLineageDetails.pipeline,
},
};
}
return edge;
});
});
setTimeout(() => {
setStatus('initial');
}, 100);
setNewAddedNode({} as Node);
setSelectedEntity({} as EntityReference);
})
.catch(() => {
setStatus('initial');
setLoading(false);
})
.finally(() => {
handleModalCancel();
});
}
};
useEffect(() => {
if (!deleted && !isEmpty(updatedLineageData)) {
setElementsHandle(updatedLineageData);
@ -1313,6 +1495,28 @@ const Entitylineage: FunctionComponent<EntityLineageProp> = ({
onEntitySelect();
}, [selectedEntity]);
useEffect(() => {
if (pipelineSearchValue) {
getSuggestions(pipelineSearchValue, SearchIndex.PIPELINE)
.then((res: AxiosResponse) => {
if (res.data) {
const data: EntityReference[] = formatPipelineData(
res.data.suggest['metadata-suggest'][0].options
);
setPipelineOptions(data);
} else {
throw jsonData['api-error-messages']['unexpected-server-response'];
}
})
.catch((err: AxiosError) => {
showErrorToast(
err,
jsonData['api-error-messages']['fetch-suggestions-error']
);
});
}
}, [pipelineSearchValue]);
useEffect(() => {
if (selectedEdge.data?.isColumnLineage) {
removeColumnEdge(selectedEdge, confirmDelete);
@ -1381,6 +1585,18 @@ const Entitylineage: FunctionComponent<EntityLineageProp> = ({
{getEntityDrawer()}
<EntityLineageSidebar newAddedNode={newAddedNode} show={isEditMode} />
{getConfirmationModal()}
<AddPipeLineModal
handleModalCancel={handleModalCancel}
handleModalSave={handleModalSave}
handleRemoveEdgeClick={handleRemoveEdgeClick}
pipelineOptions={pipelineOptions}
pipelineSearchValue={pipelineSearchValue}
selectedPipelineId={selectedPipelineId}
showAddPipelineModal={showAddPipelineModal}
onClear={onPipelineSelectionClear}
onSearch={(value) => setPipelineSearchValue(value)}
onSelect={handlePipelineSelection}
/>
</div>
);
};

View File

@ -64,6 +64,8 @@ export interface EdgeData {
export interface CustomEdgeData {
id: string;
source: string;
label?: string;
pipeline?: EntityReference;
target: string;
sourceType: string;
targetType: string;

View File

@ -5,6 +5,8 @@ import { EntityType } from '../enums/entity.enum';
export const foreignObjectSize = 40;
export const zoomValue = 1;
export const pipelineEdgeWidth = 200;
export const entityData = [
{
type: EntityType.TABLE,
@ -19,8 +21,8 @@ export const entityData = [
export const positionX = 150;
export const positionY = 60;
export const nodeWidth = 400;
export const nodeHeight = 50;
export const nodeWidth = 600;
export const nodeHeight = 70;
export const ELEMENT_DELETE_STATE = {
loading: false,

View File

@ -53,6 +53,7 @@ import { Column } from '../generated/entity/data/table';
import { EntityLineage } from '../generated/type/entityLineage';
import { EntityReference } from '../generated/type/entityReference';
import {
getEntityName,
getPartialNameFromFQN,
getPartialNameFromTableFQN,
prepareLabel,
@ -172,7 +173,11 @@ export const getLineageData = (
) => void,
removeNodeHandler: (node: Node) => void,
columns: { [key: string]: Column[] },
currentData: { nodes: Node[]; edges: Edge[] }
currentData: { nodes: Node[]; edges: Edge[] },
addPipelineClick?: (
evt: React.MouseEvent<HTMLButtonElement>,
data: CustomEdgeData
) => void
) => {
const [x, y] = [0, 0];
const nodes = [...(entityLineage['nodes'] || []), entityLineage['entity']];
@ -199,7 +204,7 @@ export const getLineageData = (
target: edge.toEntity,
targetHandle: toColumn,
sourceHandle: fromColumn,
type: isEditMode ? edgeType : 'custom',
type: edgeType,
markerEnd: {
type: MarkerType.ArrowClosed,
},
@ -209,6 +214,7 @@ export const getLineageData = (
target: edge.toEntity,
targetHandle: toColumn,
sourceHandle: fromColumn,
isEditMode,
onEdgeClick,
isColumnLineage: true,
},
@ -222,18 +228,23 @@ export const getLineageData = (
id: `edge-${edge.fromEntity}-${edge.toEntity}`,
source: `${edge.fromEntity}`,
target: `${edge.toEntity}`,
type: isEditMode ? edgeType : 'custom',
type: edgeType,
animated: !isUndefined(edge.lineageDetails?.pipeline),
style: { strokeWidth: '2px' },
markerEnd: {
type: MarkerType.ArrowClosed,
},
data: {
id: `edge-${edge.fromEntity}-${edge.toEntity}`,
label: getEntityName(edge.lineageDetails?.pipeline),
pipeline: edge.lineageDetails?.pipeline,
source: `${edge.fromEntity}`,
target: `${edge.toEntity}`,
sourceType: sourceType?.type,
targetType: targetType?.type,
isEditMode,
onEdgeClick,
addPipelineClick,
isColumnLineage: false,
},
});