import { CopyOutlined, DeleteOutlined, PlusOutlined, RedoOutlined } from '@ant-design/icons'; import React, { useEffect, useState } from 'react'; import * as QueryString from 'query-string'; import { useLocation } from 'react-router'; import { Button, Empty, Image, message, Modal, Pagination, Tooltip, Typography } from 'antd'; import styled from 'styled-components'; import cronstrue from 'cronstrue'; import { useCreateIngestionExecutionRequestMutation, useCreateIngestionSourceMutation, useDeleteIngestionSourceMutation, useListIngestionSourcesQuery, useUpdateIngestionSourceMutation, } from '../../../graphql/ingestion.generated'; import { Message } from '../../shared/Message'; import TabToolbar from '../../entity/shared/components/styled/TabToolbar'; import { IngestionSourceBuilderModal } from './builder/IngestionSourceBuilderModal'; import { StyledTable } from '../../entity/shared/components/styled/StyledTable'; import { IngestionSourceExecutionList } from './IngestionSourceExecutionList'; import { getExecutionRequestStatusDisplayColor, getExecutionRequestStatusDisplayText, getExecutionRequestStatusIcon, sourceTypeToIconUrl, } from './utils'; import { DEFAULT_EXECUTOR_ID, SourceBuilderState } from './builder/types'; import { UpdateIngestionSourceInput } from '../../../types.generated'; import { capitalizeFirstLetter } from '../../shared/textUtil'; import { SearchBar } from '../../search/SearchBar'; import { useEntityRegistry } from '../../useEntityRegistry'; const SourceContainer = styled.div``; const SourcePaginationContainer = styled.div` display: flex; justify-content: center; `; const PreviewImage = styled(Image)` max-height: 28px; width: auto; object-fit: contain; margin: 0px; background-color: transparent; `; const StatusContainer = styled.div` display: flex; justify-content: left; align-items: center; `; const ActionButtonContainer = styled.div` display: flex; justify-content: right; `; const DEFAULT_PAGE_SIZE = 25; const removeExecutionsFromIngestionSource = (source) => { if (source) { return { name: source.name, type: source.type, schedule: source.schedule, config: source.config, }; } return undefined; }; export const IngestionSourceList = () => { const entityRegistry = useEntityRegistry(); const location = useLocation(); const params = QueryString.parse(location.search, { arrayFormat: 'comma' }); const paramsQuery = (params?.query as string) || undefined; const [query, setQuery] = useState(undefined); useEffect(() => setQuery(paramsQuery), [paramsQuery]); const [page, setPage] = useState(1); const pageSize = DEFAULT_PAGE_SIZE; const start = (page - 1) * pageSize; const [isBuildingSource, setIsBuildingSource] = useState(false); const [focusSourceUrn, setFocusSourceUrn] = useState(undefined); const [lastRefresh, setLastRefresh] = useState(0); // Set of removed urns used to account for eventual consistency const [removedUrns, setRemovedUrns] = useState([]); // Ingestion Source Queries const { loading, error, data, refetch } = useListIngestionSourcesQuery({ variables: { input: { start, count: pageSize, query, }, }, }); const [createIngestionSource] = useCreateIngestionSourceMutation(); const [updateIngestionSource] = useUpdateIngestionSourceMutation(); // Execution Request queries const [createExecutionRequestMutation] = useCreateIngestionExecutionRequestMutation(); const [removeIngestionSourceMutation] = useDeleteIngestionSourceMutation(); const totalSources = data?.listIngestionSources?.total || 0; const sources = data?.listIngestionSources?.ingestionSources || []; const filteredSources = sources.filter((user) => !removedUrns.includes(user.urn)); const focusSource = (focusSourceUrn && filteredSources.find((source) => source.urn === focusSourceUrn)) || undefined; const onCreateOrUpdateIngestionSourceSuccess = () => { setTimeout(() => refetch(), 2000); setIsBuildingSource(false); setFocusSourceUrn(undefined); }; const createOrUpdateIngestionSource = (input: UpdateIngestionSourceInput, resetState: () => void) => { if (focusSourceUrn) { // Update: updateIngestionSource({ variables: { urn: focusSourceUrn as string, input } }) .then(() => { message.success({ content: `Successfully updated ingestion source!`, duration: 3, }); onCreateOrUpdateIngestionSourceSuccess(); resetState(); }) .catch((e) => { message.destroy(); message.error({ content: `Failed to update ingestion source!: \n ${e.message || ''}`, duration: 3, }); }); } else { // Create createIngestionSource({ variables: { input } }) .then(() => { setTimeout(() => refetch(), 2000); setIsBuildingSource(false); setFocusSourceUrn(undefined); resetState(); message.success({ content: `Successfully created ingestion source!`, duration: 3, }); // onCreateOrUpdateIngestionSourceSuccess(); }) .catch((e) => { message.destroy(); message.error({ content: `Failed to create ingestion source!: \n ${e.message || ''}`, duration: 3, }); }); } }; const onChangePage = (newPage: number) => { setPage(newPage); }; const onRefresh = () => { refetch(); // Used to force a re-render of the child execution request list. setLastRefresh(new Date().getMilliseconds()); }; const executeIngestionSource = (urn: string) => { createExecutionRequestMutation({ variables: { input: { ingestionSourceUrn: urn, }, }, }) .then(() => { message.success({ content: `Successfully submitted ingestion execution request!`, duration: 3, }); setInterval(() => onRefresh(), 3000); }) .catch((e) => { message.destroy(); message.error({ content: `Failed to submit ingestion execution request!: \n ${e.message || ''}`, duration: 3, }); }); }; const deleteIngestionSource = async (urn: string) => { removeIngestionSourceMutation({ variables: { urn }, }) .then(() => { message.success({ content: 'Removed ingestion source.', duration: 2 }); const newRemovedUrns = [...removedUrns, urn]; setRemovedUrns(newRemovedUrns); setTimeout(function () { refetch?.(); }, 3000); }) .catch((e: unknown) => { message.destroy(); if (e instanceof Error) { message.error({ content: `Failed to remove ingestion source: \n ${e.message || ''}`, duration: 3 }); } }); }; const onSubmit = (recipeBuilderState: SourceBuilderState, resetState: () => void) => { createOrUpdateIngestionSource( { type: recipeBuilderState.type as string, name: recipeBuilderState.name as string, config: { recipe: recipeBuilderState.config?.recipe as string, version: (recipeBuilderState.config?.version?.length && (recipeBuilderState.config?.version as string)) || undefined, executorId: (recipeBuilderState.config?.executorId?.length && (recipeBuilderState.config?.executorId as string)) || DEFAULT_EXECUTOR_ID, }, schedule: recipeBuilderState.schedule && { interval: recipeBuilderState.schedule?.interval as string, timezone: recipeBuilderState.schedule?.timezone as string, }, }, resetState, ); }; const onEdit = (urn: string) => { setIsBuildingSource(true); setFocusSourceUrn(urn); }; const onExecute = (urn: string) => { Modal.confirm({ title: `Confirm Source Execution`, content: "Click 'Execute' to run this ingestion source.", onOk() { executeIngestionSource(urn); }, onCancel() {}, okText: 'Execute', maskClosable: true, closable: true, }); }; const onDelete = (urn: string) => { Modal.confirm({ title: `Confirm Ingestion Source Removal`, content: `Are you sure you want to remove this ingestion source? Removing will terminate any scheduled ingestion runs.`, onOk() { deleteIngestionSource(urn); }, onCancel() {}, okText: 'Yes', maskClosable: true, closable: true, }); }; const onCancel = () => { setIsBuildingSource(false); setFocusSourceUrn(undefined); }; const tableColumns = [ { title: 'Type', dataIndex: 'type', key: 'type', render: (type: string) => { const iconUrl = sourceTypeToIconUrl(type); const typeDisplayName = capitalizeFirstLetter(type); return ( (iconUrl && ( )) || {typeDisplayName} ); }, }, { title: 'Name', dataIndex: 'name', key: 'name', render: (name: string) => name || '', }, { title: 'Schedule', dataIndex: 'schedule', key: 'schedule', render: (schedule: any, record: any) => { const tooltip = schedule && `Runs ${cronstrue.toString(schedule).toLowerCase()} (${record.timezone})`; return ( {schedule || 'None'} ); }, }, { title: 'Execution Count', dataIndex: 'execCount', key: 'execCount', render: (execCount: any) => { return {execCount || '0'}; }, }, { title: 'Last Execution', dataIndex: 'lastExecTime', key: 'lastExecTime', render: (time: any) => { const executionDate = time && new Date(time); const localTime = executionDate && `${executionDate.toLocaleDateString()} at ${executionDate.toLocaleTimeString()}`; return {localTime || 'N/A'}; }, }, { title: 'Last Status', dataIndex: 'lastExecStatus', key: 'lastExecStatus', render: (status: any) => { const Icon = getExecutionRequestStatusIcon(status); const text = getExecutionRequestStatusDisplayText(status); const color = getExecutionRequestStatusDisplayColor(status); return ( {Icon && } {text || 'N/A'} ); }, }, { title: '', dataIndex: '', key: 'x', render: (_, record: any) => ( ), }, ]; const tableData = filteredSources?.map((source) => ({ urn: source.urn, type: source.type, name: source.name, schedule: source.schedule?.interval, timezone: source.schedule?.timezone, execCount: source.executions?.total || 0, lastExecTime: source.executions?.total && source.executions?.total > 0 && source.executions?.executionRequests[0].result?.startTimeMs, lastExecStatus: source.executions?.total && source.executions?.total > 0 && source.executions?.executionRequests[0].result?.status, })); return ( <> {!data && loading && } {error && message.error({ content: `Failed to load ingestion sources! \n ${error.message || ''}`, duration: 3 })}
null} onQueryChange={(q) => setQuery(q)} entityRegistry={entityRegistry} />
, }} expandable={{ expandedRowRender: (record) => { return ( ); }, rowExpandable: (record) => { return record.execCount > 0; }, defaultExpandAllRows: false, indentSize: 0, }} pagination={false} />
); };