import { PlusOutlined, RedoOutlined } from '@ant-design/icons'; import React, { useCallback, useEffect, useState } from 'react'; import * as QueryString from 'query-string'; import { useLocation } from 'react-router'; import { Button, message, Modal, Pagination, Select } from 'antd'; import styled from 'styled-components'; import { useCreateIngestionExecutionRequestMutation, useCreateIngestionSourceMutation, useDeleteIngestionSourceMutation, useListIngestionSourcesQuery, useUpdateIngestionSourceMutation, } from '../../../graphql/ingestion.generated'; import { Message } from '../../shared/Message'; import TabToolbar from '../../entity/shared/components/styled/TabToolbar'; import { IngestionSourceBuilderModal } from './builder/IngestionSourceBuilderModal'; import { CLI_EXECUTOR_ID, RUNNING } from './utils'; import { DEFAULT_EXECUTOR_ID, SourceBuilderState } from './builder/types'; import { IngestionSource, UpdateIngestionSourceInput } from '../../../types.generated'; import { SearchBar } from '../../search/SearchBar'; import { useEntityRegistry } from '../../useEntityRegistry'; import { ExecutionDetailsModal } from './ExecutionRequestDetailsModal'; import RecipeViewerModal from './RecipeViewerModal'; import IngestionSourceTable from './IngestionSourceTable'; const SourceContainer = styled.div``; const SourcePaginationContainer = styled.div` display: flex; justify-content: center; `; const StyledSelect = styled(Select)` margin-right: 15px; min-width: 75px; `; const FilterWrapper = styled.div` display: flex; `; export enum IngestionSourceType { ALL, UI, CLI, } export function shouldIncludeSource(source: any, sourceFilter: IngestionSourceType) { if (sourceFilter === IngestionSourceType.CLI) { return source.config.executorId === CLI_EXECUTOR_ID; } if (sourceFilter === IngestionSourceType.UI) { return source.config.executorId !== CLI_EXECUTOR_ID; } return true; } const DEFAULT_PAGE_SIZE = 25; const removeExecutionsFromIngestionSource = (source) => { if (source) { return { name: source.name, type: source.type, schedule: source.schedule, config: source.config, }; } return undefined; }; export const IngestionSourceList = () => { const entityRegistry = useEntityRegistry(); const location = useLocation(); const params = QueryString.parse(location.search, { arrayFormat: 'comma' }); const paramsQuery = (params?.query as string) || undefined; const [query, setQuery] = useState(undefined); useEffect(() => setQuery(paramsQuery), [paramsQuery]); const [page, setPage] = useState(1); const pageSize = DEFAULT_PAGE_SIZE; const start = (page - 1) * pageSize; const [isBuildingSource, setIsBuildingSource] = useState(false); const [isViewingRecipe, setIsViewingRecipe] = useState(false); const [focusSourceUrn, setFocusSourceUrn] = useState(undefined); const [focusExecutionUrn, setFocusExecutionUrn] = useState(undefined); const [lastRefresh, setLastRefresh] = useState(0); // Set of removed urns used to account for eventual consistency const [removedUrns, setRemovedUrns] = useState([]); const [refreshInterval, setRefreshInterval] = useState(null); const [sourceFilter, setSourceFilter] = useState(IngestionSourceType.ALL); // Ingestion Source Queries const { loading, error, data, refetch } = useListIngestionSourcesQuery({ variables: { input: { start, count: pageSize, query, }, }, }); const [createIngestionSource] = useCreateIngestionSourceMutation(); const [updateIngestionSource] = useUpdateIngestionSourceMutation(); // Execution Request queries const [createExecutionRequestMutation] = useCreateIngestionExecutionRequestMutation(); const [removeIngestionSourceMutation] = useDeleteIngestionSourceMutation(); const totalSources = data?.listIngestionSources?.total || 0; const sources = data?.listIngestionSources?.ingestionSources || []; const filteredSources = sources.filter( (source) => !removedUrns.includes(source.urn) && shouldIncludeSource(source, sourceFilter), ) as IngestionSource[]; const focusSource = (focusSourceUrn && filteredSources.find((source) => source.urn === focusSourceUrn)) || undefined; const onRefresh = useCallback(() => { refetch(); // Used to force a re-render of the child execution request list. setLastRefresh(new Date().getMilliseconds()); }, [refetch]); useEffect(() => { const runningSource = filteredSources.find((source) => source.executions?.executionRequests.find((request) => request.result?.status === RUNNING), ); if (runningSource) { if (!refreshInterval) { const interval = setInterval(onRefresh, 3000); setRefreshInterval(interval); } } else if (refreshInterval) { clearInterval(refreshInterval); setRefreshInterval(null); } }, [filteredSources, refreshInterval, onRefresh]); const executeIngestionSource = (urn: string) => { createExecutionRequestMutation({ variables: { input: { ingestionSourceUrn: urn, }, }, }) .then(() => { message.success({ content: `Successfully submitted ingestion execution request!`, duration: 3, }); setTimeout(() => onRefresh(), 3000); }) .catch((e) => { message.destroy(); message.error({ content: `Failed to submit ingestion execution request!: \n ${e.message || ''}`, duration: 3, }); }); }; const onCreateOrUpdateIngestionSourceSuccess = () => { setTimeout(() => refetch(), 2000); setIsBuildingSource(false); setFocusSourceUrn(undefined); }; const createOrUpdateIngestionSource = ( input: UpdateIngestionSourceInput, resetState: () => void, shouldRun?: boolean, ) => { if (focusSourceUrn) { // Update: updateIngestionSource({ variables: { urn: focusSourceUrn as string, input } }) .then(() => { message.success({ content: `Successfully updated ingestion source!`, duration: 3, }); onCreateOrUpdateIngestionSourceSuccess(); resetState(); if (shouldRun) executeIngestionSource(focusSourceUrn); }) .catch((e) => { message.destroy(); message.error({ content: `Failed to update ingestion source!: \n ${e.message || ''}`, duration: 3, }); }); } else { // Create createIngestionSource({ variables: { input } }) .then((result) => { message.loading({ content: 'Loading...', duration: 2 }); setTimeout(() => { refetch(); message.success({ content: `Successfully created ingestion source!`, duration: 3, }); if (shouldRun && result.data?.createIngestionSource) { executeIngestionSource(result.data.createIngestionSource); } }, 2000); setIsBuildingSource(false); setFocusSourceUrn(undefined); resetState(); // onCreateOrUpdateIngestionSourceSuccess(); }) .catch((e) => { message.destroy(); message.error({ content: `Failed to create ingestion source!: \n ${e.message || ''}`, duration: 3, }); }); } }; const onChangePage = (newPage: number) => { setPage(newPage); }; const deleteIngestionSource = async (urn: string) => { removeIngestionSourceMutation({ variables: { urn }, }) .then(() => { message.success({ content: 'Removed ingestion source.', duration: 2 }); const newRemovedUrns = [...removedUrns, urn]; setRemovedUrns(newRemovedUrns); setTimeout(function () { refetch?.(); }, 3000); }) .catch((e: unknown) => { message.destroy(); if (e instanceof Error) { message.error({ content: `Failed to remove ingestion source: \n ${e.message || ''}`, duration: 3 }); } }); }; const onSubmit = (recipeBuilderState: SourceBuilderState, resetState: () => void, shouldRun?: boolean) => { createOrUpdateIngestionSource( { type: recipeBuilderState.type as string, name: recipeBuilderState.name as string, config: { recipe: recipeBuilderState.config?.recipe as string, version: (recipeBuilderState.config?.version?.length && (recipeBuilderState.config?.version as string)) || undefined, executorId: (recipeBuilderState.config?.executorId?.length && (recipeBuilderState.config?.executorId as string)) || DEFAULT_EXECUTOR_ID, }, schedule: recipeBuilderState.schedule && { interval: recipeBuilderState.schedule?.interval as string, timezone: recipeBuilderState.schedule?.timezone as string, }, }, resetState, shouldRun, ); }; const onEdit = (urn: string) => { setIsBuildingSource(true); setFocusSourceUrn(urn); }; const onView = (urn: string) => { setIsViewingRecipe(true); setFocusSourceUrn(urn); }; const onExecute = (urn: string) => { Modal.confirm({ title: `Confirm Source Execution`, content: "Click 'Execute' to run this ingestion source.", onOk() { executeIngestionSource(urn); }, onCancel() {}, okText: 'Execute', maskClosable: true, closable: true, }); }; const onDelete = (urn: string) => { Modal.confirm({ title: `Confirm Ingestion Source Removal`, content: `Are you sure you want to remove this ingestion source? Removing will terminate any scheduled ingestion runs.`, onOk() { deleteIngestionSource(urn); }, onCancel() {}, okText: 'Yes', maskClosable: true, closable: true, }); }; const onCancel = () => { setIsBuildingSource(false); setIsViewingRecipe(false); setFocusSourceUrn(undefined); }; return ( <> {!data && loading && } {error && message.error({ content: `Failed to load ingestion sources! \n ${error.message || ''}`, duration: 3 })}
setSourceFilter(selection as IngestionSourceType)} > All UI CLI null} onQueryChange={(q) => setQuery(q)} entityRegistry={entityRegistry} />
{isViewingRecipe && } {focusExecutionUrn && ( setFocusExecutionUrn(undefined)} /> )} ); };