mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-10-17 11:43:54 +00:00
get pipeline status from elastic search (#16089)
* get pipeline status from elastic search * fix cypress
This commit is contained in:
parent
7cc6015574
commit
818030b65d
@ -65,6 +65,7 @@ const EdgeInfoDrawer = ({
|
||||
const getEdgeInfo = () => {
|
||||
const { source, target, data } = edge;
|
||||
const { sourceHandle, targetHandle } = getColumnSourceTargetHandles(edge);
|
||||
const { pipeline, pipelineEntityType } = data?.edge ?? {};
|
||||
|
||||
let sourceData: Node | undefined, targetData: Node | undefined;
|
||||
nodes.forEach((node) => {
|
||||
@ -110,14 +111,12 @@ const EdgeInfoDrawer = ({
|
||||
},
|
||||
pipeline: {
|
||||
key: t('label.edge'),
|
||||
value: data?.edge?.pipeline
|
||||
? getEntityName(data?.edge?.pipeline)
|
||||
: undefined,
|
||||
value: pipeline ? getEntityName(pipeline) : undefined,
|
||||
link:
|
||||
data?.edge?.pipeline &&
|
||||
pipeline &&
|
||||
entityUtilClassBase.getEntityLink(
|
||||
data?.edge?.pipeline.type,
|
||||
data?.edge?.pipeline.fullyQualifiedName
|
||||
pipelineEntityType,
|
||||
pipeline.fullyQualifiedName
|
||||
),
|
||||
},
|
||||
functionInfo: {
|
||||
|
@ -14,13 +14,7 @@
|
||||
import Icon from '@ant-design/icons/lib/components/Icon';
|
||||
import { Button, Tag } from 'antd';
|
||||
import classNames from 'classnames';
|
||||
import React, {
|
||||
Fragment,
|
||||
useCallback,
|
||||
useEffect,
|
||||
useMemo,
|
||||
useRef,
|
||||
} from 'react';
|
||||
import React, { Fragment, useCallback, useMemo } from 'react';
|
||||
import { EdgeProps, getBezierPath } from 'reactflow';
|
||||
import { ReactComponent as FunctionIcon } from '../../../assets/svg/ic-function.svg';
|
||||
import { ReactComponent as IconTimesCircle } from '../../../assets/svg/ic-times-circle.svg';
|
||||
@ -61,15 +55,6 @@ export const LineageEdgeIcon = ({
|
||||
);
|
||||
};
|
||||
|
||||
function usePrevious<T>(value: T) {
|
||||
const ref = useRef<T>(value);
|
||||
useEffect(() => {
|
||||
ref.current = value;
|
||||
}, [value]);
|
||||
|
||||
return ref.current;
|
||||
}
|
||||
|
||||
export const CustomEdge = ({
|
||||
id,
|
||||
sourceX,
|
||||
@ -93,19 +78,18 @@ export const CustomEdge = ({
|
||||
} = data;
|
||||
const offset = 4;
|
||||
|
||||
const { fromEntity, toEntity, pipeline, pipelineEntityType } =
|
||||
data?.edge ?? {};
|
||||
|
||||
const {
|
||||
tracedNodes,
|
||||
tracedColumns,
|
||||
isEditMode,
|
||||
pipelineStatus,
|
||||
activeLayer,
|
||||
onAddPipelineClick,
|
||||
onColumnEdgeRemove,
|
||||
fetchPipelineStatus,
|
||||
} = useLineageProvider();
|
||||
|
||||
const prevActiveLayer = usePrevious(activeLayer);
|
||||
|
||||
const { theme } = useApplicationStore();
|
||||
|
||||
const isColumnHighlighted = useMemo(() => {
|
||||
@ -179,19 +163,18 @@ export const CustomEdge = ({
|
||||
};
|
||||
|
||||
const isColumnLineageAllowed =
|
||||
!isColumnLineage &&
|
||||
isPipelineEdgeAllowed(data.edge.fromEntity.type, data.edge.toEntity.type);
|
||||
!isColumnLineage && isPipelineEdgeAllowed(fromEntity.type, toEntity.type);
|
||||
|
||||
const hasLabel = useMemo(() => {
|
||||
if (isColumnLineage) {
|
||||
return false;
|
||||
}
|
||||
if (data.edge?.pipeline) {
|
||||
return getEntityName(data.edge?.pipeline);
|
||||
if (pipeline) {
|
||||
return getEntityName(pipeline);
|
||||
}
|
||||
|
||||
return false;
|
||||
}, [isColumnLineage, data]);
|
||||
}, [isColumnLineage, pipeline]);
|
||||
|
||||
const isSelectedEditMode = selected && isEditMode;
|
||||
const isSelected = selected;
|
||||
@ -210,8 +193,9 @@ export const CustomEdge = ({
|
||||
};
|
||||
|
||||
const currentPipelineStatus = useMemo(() => {
|
||||
const pipelineData = pipelineStatus[data.edge.pipeline?.fullyQualifiedName];
|
||||
if (pipelineData) {
|
||||
const isPipelineActiveNow = activeLayer.includes(LineageLayerView.PIPELINE);
|
||||
const pipelineData = pipeline?.pipelineStatus;
|
||||
if (pipelineData && isPipelineActiveNow) {
|
||||
switch (pipelineData.executionStatus) {
|
||||
case StatusType.Failed:
|
||||
return 'red';
|
||||
@ -220,11 +204,13 @@ export const CustomEdge = ({
|
||||
return 'amber';
|
||||
case StatusType.Successful:
|
||||
return 'green';
|
||||
default:
|
||||
return '';
|
||||
}
|
||||
} else {
|
||||
return '';
|
||||
}
|
||||
}, [data, pipelineStatus]);
|
||||
|
||||
return '';
|
||||
}, [pipeline, activeLayer]);
|
||||
|
||||
const blinkingClass = useMemo(() => {
|
||||
if (isPipelineRootNode && currentPipelineStatus) {
|
||||
@ -238,8 +224,7 @@ export const CustomEdge = ({
|
||||
|
||||
const getLineageEdgeIcon = useCallback(
|
||||
(icon: React.ReactNode, dataTestId: string, pipelineClass?: string) => {
|
||||
const pipelineData =
|
||||
pipelineStatus[data.edge.pipeline?.fullyQualifiedName];
|
||||
const pipelineData = pipeline?.pipelineStatus;
|
||||
|
||||
return (
|
||||
<LineageEdgeIcon offset={3} x={edgeCenterX} y={edgeCenterY}>
|
||||
@ -256,8 +241,8 @@ export const CustomEdge = ({
|
||||
/>
|
||||
) : (
|
||||
<EntityPopOverCard
|
||||
entityFQN={data.edge.pipeline?.fullyQualifiedName}
|
||||
entityType={data.edge.pipeline?.type}
|
||||
entityFQN={pipeline?.fullyQualifiedName}
|
||||
entityType={pipelineEntityType}
|
||||
extraInfo={
|
||||
pipelineData && (
|
||||
<Tag className={pipelineClass}>
|
||||
@ -280,15 +265,7 @@ export const CustomEdge = ({
|
||||
</LineageEdgeIcon>
|
||||
);
|
||||
},
|
||||
[
|
||||
edgeCenterX,
|
||||
edgeCenterY,
|
||||
rest,
|
||||
data,
|
||||
pipelineStatus,
|
||||
blinkingClass,
|
||||
isEditMode,
|
||||
]
|
||||
[edgeCenterX, edgeCenterY, rest, pipeline, blinkingClass, isEditMode]
|
||||
);
|
||||
|
||||
const getEditLineageIcon = useCallback(
|
||||
@ -335,22 +312,6 @@ export const CustomEdge = ({
|
||||
}
|
||||
}, [edge, isColumnLineage, sourceHandle, targetHandle]);
|
||||
|
||||
useEffect(() => {
|
||||
const wasPipelineAlreadyActive = prevActiveLayer?.includes(
|
||||
LineageLayerView.PIPELINE
|
||||
);
|
||||
const isPipelineActiveNow = activeLayer.includes(LineageLayerView.PIPELINE);
|
||||
|
||||
if (
|
||||
data.edge.pipeline &&
|
||||
data.edge.pipeline.type === EntityType.PIPELINE &&
|
||||
!wasPipelineAlreadyActive &&
|
||||
isPipelineActiveNow
|
||||
) {
|
||||
fetchPipelineStatus(data.edge.pipeline.fullyQualifiedName);
|
||||
}
|
||||
}, [data.edge.pipeline, activeLayer]);
|
||||
|
||||
return (
|
||||
<Fragment>
|
||||
<path
|
||||
|
@ -51,4 +51,5 @@ export interface EdgeDetails {
|
||||
sqlQuery?: string;
|
||||
columns?: ColumnLineage[];
|
||||
description?: string;
|
||||
pipelineEntityType?: EntityType.PIPELINE | EntityType.STORED_PROCEDURE;
|
||||
}
|
||||
|
@ -31,7 +31,6 @@ import {
|
||||
} from '../../components/Lineage/Lineage.interface';
|
||||
import { SourceType } from '../../components/SearchedData/SearchedData.interface';
|
||||
import { EntityType } from '../../enums/entity.enum';
|
||||
import { PipelineStatus } from '../../generated/entity/data/pipeline';
|
||||
import { EntityReference } from '../../generated/entity/type';
|
||||
|
||||
export interface LineageProviderProps {
|
||||
@ -70,7 +69,6 @@ export interface LineageContextType {
|
||||
upstreamDownstreamData: UpstreamDownstreamData;
|
||||
selectedColumn: string;
|
||||
expandAllColumns: boolean;
|
||||
pipelineStatus: Record<string, PipelineStatus>;
|
||||
activeLayer: LineageLayerView[];
|
||||
onInitReactFlow: (reactFlowInstance: ReactFlowInstance) => void;
|
||||
onPaneClick: () => void;
|
||||
@ -97,7 +95,6 @@ export interface LineageContextType {
|
||||
lineageConfig: LineageConfig
|
||||
) => void;
|
||||
onExportClick: () => void;
|
||||
fetchPipelineStatus: (pipelineFqn: string) => void;
|
||||
removeNodeHandler: (node: Node | NodeProps) => void;
|
||||
onColumnEdgeRemove: () => void;
|
||||
onAddPipelineClick: () => void;
|
||||
|
@ -69,7 +69,6 @@ import {
|
||||
EntityType,
|
||||
} from '../../enums/entity.enum';
|
||||
import { AddLineage } from '../../generated/api/lineage/addLineage';
|
||||
import { PipelineStatus } from '../../generated/entity/data/pipeline';
|
||||
import {
|
||||
ColumnLineage,
|
||||
EntityReference,
|
||||
@ -81,8 +80,6 @@ import {
|
||||
getLineageDataByFQN,
|
||||
updateLineageEdge,
|
||||
} from '../../rest/lineageAPI';
|
||||
import { getPipelineStatus } from '../../rest/pipelineAPI';
|
||||
import { getEpochMillisForPastDays } from '../../utils/date-time/DateTimeUtils';
|
||||
import {
|
||||
addLineageHandler,
|
||||
createEdges,
|
||||
@ -106,6 +103,7 @@ import {
|
||||
onLoad,
|
||||
removeLineageHandler,
|
||||
} from '../../utils/EntityLineageUtils';
|
||||
import { getEntityReferenceFromEntity } from '../../utils/EntityUtils';
|
||||
import { showErrorToast } from '../../utils/ToastUtils';
|
||||
import { useTourProvider } from '../TourProvider/TourProvider';
|
||||
import {
|
||||
@ -172,9 +170,6 @@ const LineageProvider = ({ children }: LineageProviderProps) => {
|
||||
nodesPerLayer: 50,
|
||||
});
|
||||
const [queryFilter, setQueryFilter] = useState<string>('');
|
||||
const [pipelineStatus, setPipelineStatus] = useState<
|
||||
Record<string, PipelineStatus>
|
||||
>({});
|
||||
const [entityType, setEntityType] = useState('');
|
||||
const queryParams = new URLSearchParams(location.search);
|
||||
const isFullScreen = queryParams.get('fullscreen') === 'true';
|
||||
@ -364,26 +359,6 @@ const LineageProvider = ({ children }: LineageProviderProps) => {
|
||||
[nodes, edges, lineageConfig, entityLineage, setEntityLineage, queryFilter]
|
||||
);
|
||||
|
||||
const fetchPipelineStatus = useCallback(async (pipelineFQN: string) => {
|
||||
try {
|
||||
const currentTime = Date.now();
|
||||
// past 1 day
|
||||
const startDay = getEpochMillisForPastDays(1);
|
||||
const response = await getPipelineStatus(pipelineFQN, {
|
||||
startTs: startDay,
|
||||
endTs: currentTime,
|
||||
});
|
||||
setPipelineStatus((prev) => {
|
||||
return {
|
||||
...prev,
|
||||
[pipelineFQN]: response.data[0],
|
||||
};
|
||||
});
|
||||
} catch (error) {
|
||||
// do not show toast error
|
||||
}
|
||||
}, []);
|
||||
|
||||
const handleLineageTracing = useCallback(
|
||||
(selectedNode: Node) => {
|
||||
const { normalEdge } = getClassifiedEdge(edges);
|
||||
@ -837,7 +812,12 @@ const LineageProvider = ({ children }: LineageProviderProps) => {
|
||||
updatedColumns = getUpdatedColumnsFromEdge(params, currentEdge);
|
||||
|
||||
const lineageDetails: LineageDetails = {
|
||||
pipeline: currentEdge.pipeline,
|
||||
pipeline: currentEdge.pipeline
|
||||
? getEntityReferenceFromEntity(
|
||||
currentEdge.pipeline,
|
||||
currentEdge.pipelineEntityType ?? EntityType.PIPELINE
|
||||
)
|
||||
: undefined,
|
||||
columnsLineage: [],
|
||||
description: currentEdge?.description ?? '',
|
||||
sqlQuery: currentEdge?.sqlQuery,
|
||||
@ -871,7 +851,10 @@ const LineageProvider = ({ children }: LineageProviderProps) => {
|
||||
setEntityLineage((pre) => {
|
||||
const newData = {
|
||||
...pre,
|
||||
nodes: uniqWith([pre.entity, ...allNodes], isEqual),
|
||||
nodes: uniqWith(
|
||||
[...(pre.entity ? [pre.entity] : []), ...allNodes],
|
||||
isEqual
|
||||
),
|
||||
edges: uniqWith(allEdges, isEqual),
|
||||
};
|
||||
|
||||
@ -963,6 +946,9 @@ const LineageProvider = ({ children }: LineageProviderProps) => {
|
||||
|
||||
if (pipelineData) {
|
||||
existingEdge.pipeline = pipelineData;
|
||||
existingEdge.pipelineEntityType = pipelineData.type as
|
||||
| EntityType.PIPELINE
|
||||
| EntityType.STORED_PROCEDURE;
|
||||
}
|
||||
}
|
||||
|
||||
@ -1101,9 +1087,13 @@ const LineageProvider = ({ children }: LineageProviderProps) => {
|
||||
const redrawLineage = useCallback(
|
||||
(lineageData: EntityLineageResponse) => {
|
||||
const allNodes = uniqWith(
|
||||
[...(lineageData.nodes ?? []), lineageData.entity],
|
||||
[
|
||||
...(lineageData.nodes ?? []),
|
||||
...(lineageData.entity ? [lineageData.entity] : []),
|
||||
],
|
||||
isEqual
|
||||
);
|
||||
|
||||
const updatedNodes = createNodes(
|
||||
allNodes,
|
||||
lineageData.edges ?? [],
|
||||
@ -1215,7 +1205,6 @@ const LineageProvider = ({ children }: LineageProviderProps) => {
|
||||
tracedNodes,
|
||||
tracedColumns,
|
||||
expandAllColumns,
|
||||
pipelineStatus,
|
||||
upstreamDownstreamData,
|
||||
init,
|
||||
activeLayer,
|
||||
@ -1234,7 +1223,6 @@ const LineageProvider = ({ children }: LineageProviderProps) => {
|
||||
toggleColumnView,
|
||||
loadChildNodesHandler,
|
||||
fetchLineageData,
|
||||
fetchPipelineStatus,
|
||||
removeNodeHandler,
|
||||
onNodeClick,
|
||||
onEdgeClick,
|
||||
@ -1262,7 +1250,6 @@ const LineageProvider = ({ children }: LineageProviderProps) => {
|
||||
tracedNodes,
|
||||
tracedColumns,
|
||||
expandAllColumns,
|
||||
pipelineStatus,
|
||||
upstreamDownstreamData,
|
||||
init,
|
||||
activeLayer,
|
||||
@ -1280,7 +1267,6 @@ const LineageProvider = ({ children }: LineageProviderProps) => {
|
||||
updateEntityType,
|
||||
loadChildNodesHandler,
|
||||
fetchLineageData,
|
||||
fetchPipelineStatus,
|
||||
toggleColumnView,
|
||||
removeNodeHandler,
|
||||
onNodeClick,
|
||||
|
Loading…
x
Reference in New Issue
Block a user