mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-06 16:49:03 +00:00
444 lines
16 KiB
TypeScript
444 lines
16 KiB
TypeScript
![]() |
import React, { useState } from 'react';
|
||
|
import { Button, Empty, Image, message, Modal, Pagination, Tooltip, Typography } from 'antd';
|
||
|
import styled from 'styled-components';
|
||
|
import cronstrue from 'cronstrue';
|
||
|
import { DeleteOutlined, PlusOutlined, RedoOutlined } from '@ant-design/icons';
|
||
|
import {
|
||
|
useCreateIngestionExecutionRequestMutation,
|
||
|
useCreateIngestionSourceMutation,
|
||
|
useDeleteIngestionSourceMutation,
|
||
|
useListIngestionSourcesQuery,
|
||
|
useUpdateIngestionSourceMutation,
|
||
|
} from '../../../graphql/ingestion.generated';
|
||
|
import { Message } from '../../shared/Message';
|
||
|
import TabToolbar from '../../entity/shared/components/styled/TabToolbar';
|
||
|
import { IngestionSourceBuilderModal } from './builder/IngestionSourceBuilderModal';
|
||
|
import { StyledTable } from '../../entity/shared/components/styled/StyledTable';
|
||
|
import { IngestionSourceExecutionList } from './IngestionSourceExecutionList';
|
||
|
import {
|
||
|
getExecutionRequestStatusDisplayColor,
|
||
|
getExecutionRequestStatusDisplayText,
|
||
|
getExecutionRequestStatusIcon,
|
||
|
sourceTypeToIconUrl,
|
||
|
} from './utils';
|
||
|
import { DEFAULT_EXECUTOR_ID, SourceBuilderState } from './builder/types';
|
||
|
import { UpdateIngestionSourceInput } from '../../../types.generated';
|
||
|
import { capitalizeFirstLetter } from '../../shared/textUtil';
|
||
|
|
||
|
const SourceContainer = styled.div``;
|
||
|
|
||
|
const SourcePaginationContainer = styled.div`
|
||
|
display: flex;
|
||
|
justify-content: center;
|
||
|
`;
|
||
|
|
||
|
const PreviewImage = styled(Image)`
|
||
|
max-height: 28px;
|
||
|
width: auto;
|
||
|
object-fit: contain;
|
||
|
margin: 0px;
|
||
|
background-color: transparent;
|
||
|
`;
|
||
|
|
||
|
const StatusContainer = styled.div`
|
||
|
display: flex;
|
||
|
justify-content: left;
|
||
|
align-items: center;
|
||
|
`;
|
||
|
|
||
|
const ActionButtonContainer = styled.div`
|
||
|
display: flex;
|
||
|
justify-content: right;
|
||
|
`;
|
||
|
const DEFAULT_PAGE_SIZE = 25;
|
||
|
|
||
|
const removeExecutionsFromIngestionSource = (source) => {
|
||
|
if (source) {
|
||
|
return {
|
||
|
name: source.name,
|
||
|
type: source.type,
|
||
|
schedule: source.schedule,
|
||
|
config: source.config,
|
||
|
};
|
||
|
}
|
||
|
return undefined;
|
||
|
};
|
||
|
|
||
|
export const IngestionSourceList = () => {
|
||
|
const [page, setPage] = useState(1);
|
||
|
|
||
|
const pageSize = DEFAULT_PAGE_SIZE;
|
||
|
const start = (page - 1) * pageSize;
|
||
|
|
||
|
const [isBuildingSource, setIsBuildingSource] = useState<boolean>(false);
|
||
|
const [focusSourceUrn, setFocusSourceUrn] = useState<undefined | string>(undefined);
|
||
|
const [lastRefresh, setLastRefresh] = useState(0);
|
||
|
// Set of removed urns used to account for eventual consistency
|
||
|
const [removedUrns, setRemovedUrns] = useState<string[]>([]);
|
||
|
|
||
|
// Ingestion Source Queries
|
||
|
const { loading, error, data, refetch } = useListIngestionSourcesQuery({
|
||
|
variables: {
|
||
|
input: {
|
||
|
start,
|
||
|
count: pageSize,
|
||
|
},
|
||
|
},
|
||
|
});
|
||
|
const [createIngestionSource] = useCreateIngestionSourceMutation();
|
||
|
const [updateIngestionSource] = useUpdateIngestionSourceMutation();
|
||
|
|
||
|
// Execution Request queries
|
||
|
const [createExecutionRequestMutation] = useCreateIngestionExecutionRequestMutation();
|
||
|
const [removeIngestionSourceMutation] = useDeleteIngestionSourceMutation();
|
||
|
|
||
|
const totalSources = data?.listIngestionSources?.total || 0;
|
||
|
const sources = data?.listIngestionSources?.ingestionSources || [];
|
||
|
const filteredSources = sources.filter((user) => !removedUrns.includes(user.urn));
|
||
|
const focusSource =
|
||
|
(focusSourceUrn && filteredSources.find((source) => source.urn === focusSourceUrn)) || undefined;
|
||
|
|
||
|
const onCreateOrUpdateIngestionSourceSuccess = () => {
|
||
|
setTimeout(() => refetch(), 2000);
|
||
|
setIsBuildingSource(false);
|
||
|
setFocusSourceUrn(undefined);
|
||
|
};
|
||
|
|
||
|
const createOrUpdateIngestionSource = (input: UpdateIngestionSourceInput, resetState: () => void) => {
|
||
|
if (focusSourceUrn) {
|
||
|
// Update:
|
||
|
updateIngestionSource({ variables: { urn: focusSourceUrn as string, input } })
|
||
|
.then(() => {
|
||
|
message.success({
|
||
|
content: `Successfully updated ingestion source!`,
|
||
|
duration: 3,
|
||
|
});
|
||
|
onCreateOrUpdateIngestionSourceSuccess();
|
||
|
resetState();
|
||
|
})
|
||
|
.catch((e) => {
|
||
|
message.destroy();
|
||
|
message.error({
|
||
|
content: `Failed to update ingestion source!: \n ${e.message || ''}`,
|
||
|
duration: 3,
|
||
|
});
|
||
|
});
|
||
|
} else {
|
||
|
// Create
|
||
|
createIngestionSource({ variables: { input } })
|
||
|
.then(() => {
|
||
|
setTimeout(() => refetch(), 2000);
|
||
|
setIsBuildingSource(false);
|
||
|
setFocusSourceUrn(undefined);
|
||
|
resetState();
|
||
|
message.success({
|
||
|
content: `Successfully created ingestion source!`,
|
||
|
duration: 3,
|
||
|
});
|
||
|
// onCreateOrUpdateIngestionSourceSuccess();
|
||
|
})
|
||
|
.catch((e) => {
|
||
|
message.destroy();
|
||
|
message.error({
|
||
|
content: `Failed to create ingestion source!: \n ${e.message || ''}`,
|
||
|
duration: 3,
|
||
|
});
|
||
|
});
|
||
|
}
|
||
|
};
|
||
|
|
||
|
const onChangePage = (newPage: number) => {
|
||
|
setPage(newPage);
|
||
|
};
|
||
|
|
||
|
const onRefresh = () => {
|
||
|
refetch();
|
||
|
// Used to force a re-render of the child execution request list.
|
||
|
setLastRefresh(new Date().getMilliseconds());
|
||
|
};
|
||
|
|
||
|
const executeIngestionSource = (urn: string) => {
|
||
|
createExecutionRequestMutation({
|
||
|
variables: {
|
||
|
input: {
|
||
|
ingestionSourceUrn: urn,
|
||
|
},
|
||
|
},
|
||
|
})
|
||
|
.then(() => {
|
||
|
message.success({
|
||
|
content: `Successfully submitted ingestion execution request!`,
|
||
|
duration: 3,
|
||
|
});
|
||
|
setInterval(() => onRefresh(), 3000);
|
||
|
})
|
||
|
.catch((e) => {
|
||
|
message.destroy();
|
||
|
message.error({
|
||
|
content: `Failed to submit ingestion execution request!: \n ${e.message || ''}`,
|
||
|
duration: 3,
|
||
|
});
|
||
|
});
|
||
|
};
|
||
|
|
||
|
const deleteIngestionSource = async (urn: string) => {
|
||
|
removeIngestionSourceMutation({
|
||
|
variables: { urn },
|
||
|
})
|
||
|
.then(() => {
|
||
|
message.success({ content: 'Removed ingestion source.', duration: 2 });
|
||
|
const newRemovedUrns = [...removedUrns, urn];
|
||
|
setRemovedUrns(newRemovedUrns);
|
||
|
setTimeout(function () {
|
||
|
refetch?.();
|
||
|
}, 3000);
|
||
|
})
|
||
|
.catch((e: unknown) => {
|
||
|
message.destroy();
|
||
|
if (e instanceof Error) {
|
||
|
message.error({ content: `Failed to remove ingestion source: \n ${e.message || ''}`, duration: 3 });
|
||
|
}
|
||
|
});
|
||
|
};
|
||
|
|
||
|
const onSubmit = (recipeBuilderState: SourceBuilderState, resetState: () => void) => {
|
||
|
createOrUpdateIngestionSource(
|
||
|
{
|
||
|
type: recipeBuilderState.type as string,
|
||
|
name: recipeBuilderState.name as string,
|
||
|
config: {
|
||
|
recipe: recipeBuilderState.config?.recipe as string,
|
||
|
version:
|
||
|
(recipeBuilderState.config?.version?.length &&
|
||
|
(recipeBuilderState.config?.version as string)) ||
|
||
|
undefined,
|
||
|
executorId:
|
||
|
(recipeBuilderState.config?.executorId?.length &&
|
||
|
(recipeBuilderState.config?.executorId as string)) ||
|
||
|
DEFAULT_EXECUTOR_ID,
|
||
|
},
|
||
|
schedule: recipeBuilderState.schedule && {
|
||
|
interval: recipeBuilderState.schedule?.interval as string,
|
||
|
timezone: recipeBuilderState.schedule?.timezone as string,
|
||
|
},
|
||
|
},
|
||
|
resetState,
|
||
|
);
|
||
|
};
|
||
|
|
||
|
const onEdit = (urn: string) => {
|
||
|
setIsBuildingSource(true);
|
||
|
setFocusSourceUrn(urn);
|
||
|
};
|
||
|
|
||
|
const onExecute = (urn: string) => {
|
||
|
Modal.confirm({
|
||
|
title: `Confirm Source Execution`,
|
||
|
content: "Click 'Execute' to run this ingestion source.",
|
||
|
onOk() {
|
||
|
executeIngestionSource(urn);
|
||
|
},
|
||
|
onCancel() {},
|
||
|
okText: 'Execute',
|
||
|
maskClosable: true,
|
||
|
closable: true,
|
||
|
});
|
||
|
};
|
||
|
|
||
|
const onDelete = (urn: string) => {
|
||
|
Modal.confirm({
|
||
|
title: `Confirm Ingestion Source Removal`,
|
||
|
content: `Are you sure you want to remove this ingestion source? Removing will terminate any scheduled ingestion runs.`,
|
||
|
onOk() {
|
||
|
deleteIngestionSource(urn);
|
||
|
},
|
||
|
onCancel() {},
|
||
|
okText: 'Yes',
|
||
|
maskClosable: true,
|
||
|
closable: true,
|
||
|
});
|
||
|
};
|
||
|
|
||
|
const onCancel = () => {
|
||
|
setIsBuildingSource(false);
|
||
|
setFocusSourceUrn(undefined);
|
||
|
};
|
||
|
|
||
|
const tableColumns = [
|
||
|
{
|
||
|
title: 'Type',
|
||
|
dataIndex: 'type',
|
||
|
key: 'type',
|
||
|
render: (type: string) => {
|
||
|
const iconUrl = sourceTypeToIconUrl(type);
|
||
|
const typeDisplayName = capitalizeFirstLetter(type);
|
||
|
return (
|
||
|
(iconUrl && (
|
||
|
<Tooltip overlay={typeDisplayName}>
|
||
|
<PreviewImage preview={false} src={iconUrl} alt={type || ''} />
|
||
|
</Tooltip>
|
||
|
)) || <Typography.Text strong>{typeDisplayName}</Typography.Text>
|
||
|
);
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
title: 'Name',
|
||
|
dataIndex: 'name',
|
||
|
key: 'name',
|
||
|
render: (name: string) => name || '',
|
||
|
},
|
||
|
{
|
||
|
title: 'Schedule',
|
||
|
dataIndex: 'schedule',
|
||
|
key: 'schedule',
|
||
|
render: (schedule: any, record: any) => {
|
||
|
const tooltip = schedule && `Runs ${cronstrue.toString(schedule).toLowerCase()} (${record.timezone})`;
|
||
|
return (
|
||
|
<Tooltip title={tooltip || 'Not scheduled'}>
|
||
|
<Typography.Text code>{schedule || 'None'}</Typography.Text>
|
||
|
</Tooltip>
|
||
|
);
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
title: 'Execution Count',
|
||
|
dataIndex: 'execCount',
|
||
|
key: 'execCount',
|
||
|
render: (execCount: any) => {
|
||
|
return <Typography.Text>{execCount || '0'}</Typography.Text>;
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
title: 'Last Execution',
|
||
|
dataIndex: 'lastExecTime',
|
||
|
key: 'lastExecTime',
|
||
|
render: (time: any) => {
|
||
|
const executionDate = time && new Date(time);
|
||
|
const localTime =
|
||
|
executionDate && `${executionDate.toLocaleDateString()} at ${executionDate.toLocaleTimeString()}`;
|
||
|
return <Typography.Text>{localTime || 'N/A'}</Typography.Text>;
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
title: 'Last Status',
|
||
|
dataIndex: 'lastExecStatus',
|
||
|
key: 'lastExecStatus',
|
||
|
render: (status: any) => {
|
||
|
const Icon = getExecutionRequestStatusIcon(status);
|
||
|
const text = getExecutionRequestStatusDisplayText(status);
|
||
|
const color = getExecutionRequestStatusDisplayColor(status);
|
||
|
return (
|
||
|
<StatusContainer>
|
||
|
{Icon && <Icon style={{ color }} />}
|
||
|
<Typography.Text strong style={{ color, marginLeft: 8 }}>
|
||
|
{text || 'N/A'}
|
||
|
</Typography.Text>
|
||
|
</StatusContainer>
|
||
|
);
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
title: '',
|
||
|
dataIndex: '',
|
||
|
key: 'x',
|
||
|
render: (_, record: any) => (
|
||
|
<ActionButtonContainer>
|
||
|
<Button style={{ marginRight: 16 }} onClick={() => onEdit(record.urn)}>
|
||
|
EDIT
|
||
|
</Button>
|
||
|
<Button
|
||
|
disabled={record.lastExecStatus === 'RUNNING'}
|
||
|
style={{ marginRight: 16 }}
|
||
|
onClick={() => onExecute(record.urn)}
|
||
|
>
|
||
|
EXECUTE
|
||
|
</Button>
|
||
|
|
||
|
<Button onClick={() => onDelete(record.urn)} type="text" shape="circle" danger>
|
||
|
<DeleteOutlined />
|
||
|
</Button>
|
||
|
</ActionButtonContainer>
|
||
|
),
|
||
|
},
|
||
|
];
|
||
|
|
||
|
const tableData = filteredSources?.map((source) => ({
|
||
|
urn: source.urn,
|
||
|
type: source.type,
|
||
|
name: source.name,
|
||
|
schedule: source.schedule?.interval,
|
||
|
timezone: source.schedule?.timezone,
|
||
|
execCount: source.executions?.total || 0,
|
||
|
lastExecTime:
|
||
|
source.executions?.total &&
|
||
|
source.executions?.total > 0 &&
|
||
|
source.executions?.executionRequests[0].result?.startTimeMs,
|
||
|
lastExecStatus:
|
||
|
source.executions?.total &&
|
||
|
source.executions?.total > 0 &&
|
||
|
source.executions?.executionRequests[0].result?.status,
|
||
|
}));
|
||
|
|
||
|
return (
|
||
|
<>
|
||
|
{!data && loading && <Message type="loading" content="Loading ingestion sources..." />}
|
||
|
{error &&
|
||
|
message.error({ content: `Failed to load ingestion sources! \n ${error.message || ''}`, duration: 3 })}
|
||
|
<SourceContainer>
|
||
|
<TabToolbar>
|
||
|
<div>
|
||
|
<Button type="text" onClick={() => setIsBuildingSource(true)}>
|
||
|
<PlusOutlined /> Create new source
|
||
|
</Button>
|
||
|
<Button type="text" onClick={onRefresh}>
|
||
|
<RedoOutlined /> Refresh
|
||
|
</Button>
|
||
|
</div>
|
||
|
</TabToolbar>
|
||
|
<StyledTable
|
||
|
columns={tableColumns}
|
||
|
dataSource={tableData}
|
||
|
rowKey="urn"
|
||
|
locale={{
|
||
|
emptyText: <Empty description="No Ingestion Sources!" image={Empty.PRESENTED_IMAGE_SIMPLE} />,
|
||
|
}}
|
||
|
expandable={{
|
||
|
expandedRowRender: (record) => {
|
||
|
return (
|
||
|
<IngestionSourceExecutionList
|
||
|
urn={record.urn}
|
||
|
lastRefresh={lastRefresh}
|
||
|
onRefresh={onRefresh}
|
||
|
/>
|
||
|
);
|
||
|
},
|
||
|
rowExpandable: (record) => {
|
||
|
return record.execCount > 0;
|
||
|
},
|
||
|
defaultExpandAllRows: false,
|
||
|
indentSize: 0,
|
||
|
}}
|
||
|
pagination={false}
|
||
|
/>
|
||
|
<SourcePaginationContainer>
|
||
|
<Pagination
|
||
|
style={{ margin: 40 }}
|
||
|
current={page}
|
||
|
pageSize={pageSize}
|
||
|
total={totalSources}
|
||
|
showLessItems
|
||
|
onChange={onChangePage}
|
||
|
showSizeChanger={false}
|
||
|
/>
|
||
|
</SourcePaginationContainer>
|
||
|
</SourceContainer>
|
||
|
<IngestionSourceBuilderModal
|
||
|
initialState={removeExecutionsFromIngestionSource(focusSource)}
|
||
|
visible={isBuildingSource}
|
||
|
onSubmit={onSubmit}
|
||
|
onCancel={onCancel}
|
||
|
/>
|
||
|
</>
|
||
|
);
|
||
|
};
|