diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java index 9fc214968c..c0618dc154 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java @@ -110,6 +110,7 @@ import com.linkedin.datahub.graphql.resolvers.incident.UpdateIncidentStatusResol import com.linkedin.datahub.graphql.resolvers.ingest.execution.CancelIngestionExecutionRequestResolver; import com.linkedin.datahub.graphql.resolvers.ingest.execution.CreateIngestionExecutionRequestResolver; import com.linkedin.datahub.graphql.resolvers.ingest.execution.CreateTestConnectionRequestResolver; +import com.linkedin.datahub.graphql.resolvers.ingest.execution.GetLatestSuccessfulExecutionRequestResolver; import com.linkedin.datahub.graphql.resolvers.ingest.execution.IngestionSourceExecutionRequestsResolver; import com.linkedin.datahub.graphql.resolvers.ingest.execution.ListExecutionRequestsResolver; import com.linkedin.datahub.graphql.resolvers.ingest.execution.RollbackIngestionResolver; @@ -3194,7 +3195,11 @@ public class GmsGraphQLEngine { return ingestionSource.getPlatform() != null ? ingestionSource.getPlatform().getUrn() : null; - }))) + })) + .dataFetcher( + "latestSuccessfulExecution", + new GetLatestSuccessfulExecutionRequestResolver( + this.entityClient, this.executionRequestType))) .type( "ListIngestionSourcesResult", typeWiring -> diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/execution/GetLatestSuccessfulExecutionRequestResolver.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/execution/GetLatestSuccessfulExecutionRequestResolver.java new file mode 100644 index 0000000000..2d42bdd33a --- /dev/null +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/execution/GetLatestSuccessfulExecutionRequestResolver.java @@ -0,0 +1,112 @@ +package com.linkedin.datahub.graphql.resolvers.ingest.execution; + +import com.linkedin.common.urn.Urn; +import com.linkedin.datahub.graphql.QueryContext; +import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils; +import com.linkedin.datahub.graphql.generated.ExecutionRequest; +import com.linkedin.datahub.graphql.generated.FacetFilterInput; +import com.linkedin.datahub.graphql.generated.FilterOperator; +import com.linkedin.datahub.graphql.generated.IngestionSource; +import com.linkedin.datahub.graphql.resolvers.ResolverUtils; +import com.linkedin.datahub.graphql.types.ingestion.ExecutionRequestType; +import com.linkedin.entity.client.EntityClient; +import com.linkedin.metadata.Constants; +import com.linkedin.metadata.query.filter.SortCriterion; +import com.linkedin.metadata.query.filter.SortOrder; +import com.linkedin.metadata.search.SearchEntity; +import com.linkedin.metadata.search.SearchResult; +import graphql.execution.DataFetcherResult; +import graphql.schema.DataFetcher; +import graphql.schema.DataFetchingEnvironment; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import javax.annotation.Nonnull; + +/** Returns the latest successful execution request by ingestion source */ +public class GetLatestSuccessfulExecutionRequestResolver + implements DataFetcher> { + + private final EntityClient _entityClient; + private final ExecutionRequestType _executionRequestType; + + private final String INGESTION_SOURCE_FIELD = "ingestionSource"; + private final String RESULT_STATUS_FIELD = "executionResultStatus"; + private final String RESULT_REQUEST_TIME_MS_FIELD = "requestTimeMs"; + private final String STATUS_SUCCESS = "SUCCESS"; + + public GetLatestSuccessfulExecutionRequestResolver( + final EntityClient entityClient, final ExecutionRequestType executionRequestType) { + _entityClient = entityClient; + _executionRequestType = executionRequestType; + } + + @Override + public CompletableFuture get(final DataFetchingEnvironment environment) + throws Exception { + final QueryContext context = environment.getContext(); + final String ingestionSourceUrn = ((IngestionSource) environment.getSource()).getUrn(); + + return GraphQLConcurrencyUtils.supplyAsync( + () -> { + try { + final Urn executionRequestUrn = + getLatestSuccessfulExecutionRequestUrn(ingestionSourceUrn, context); + + if (executionRequestUrn == null) { + return null; + } + + Optional> resp = + _executionRequestType + .batchLoad(List.of(executionRequestUrn.toString()), context) + .stream() + .findFirst(); + return resp.map(DataFetcherResult::getData).orElse(null); + } catch (Exception e) { + throw new RuntimeException( + String.format( + "Failed to get latest successful execution request by source: %s", + ingestionSourceUrn), + e); + } + }, + this.getClass().getSimpleName(), + "get"); + } + + private Urn getLatestSuccessfulExecutionRequestUrn( + @Nonnull final String ingestionSourceUrn, @Nonnull QueryContext context) throws Exception { + final SearchResult gmsResult = + _entityClient.filter( + context.getOperationContext(), + Constants.EXECUTION_REQUEST_ENTITY_NAME, + Objects.requireNonNull( + ResolverUtils.buildFilter( + List.of( + new FacetFilterInput( + INGESTION_SOURCE_FIELD, + null, + List.of(ingestionSourceUrn), + false, + FilterOperator.EQUAL), + new FacetFilterInput( + RESULT_STATUS_FIELD, + null, + List.of(STATUS_SUCCESS), + false, + FilterOperator.EQUAL)), + Collections.emptyList())), + Collections.singletonList( + new SortCriterion() + .setField(RESULT_REQUEST_TIME_MS_FIELD) + .setOrder(SortOrder.DESCENDING)), + 0, + 1); + Optional optionalSearchEntity = gmsResult.getEntities().stream().findFirst(); + + return optionalSearchEntity.map(SearchEntity::getEntity).orElse(null); + } +} diff --git a/datahub-graphql-core/src/main/resources/ingestion.graphql b/datahub-graphql-core/src/main/resources/ingestion.graphql index 4df455c3f8..2620a04409 100644 --- a/datahub-graphql-core/src/main/resources/ingestion.graphql +++ b/datahub-graphql-core/src/main/resources/ingestion.graphql @@ -503,6 +503,11 @@ type IngestionSource { Ownership metadata of the ingestion source """ ownership: Ownership + + """ + The latest successful execution request for this source + """ + latestSuccessfulExecution: ExecutionRequest } """ diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/ingest/execution/GetLatestSuccessfulExecutionRequestResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/ingest/execution/GetLatestSuccessfulExecutionRequestResolverTest.java new file mode 100644 index 0000000000..46f9e3cea1 --- /dev/null +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/ingest/execution/GetLatestSuccessfulExecutionRequestResolverTest.java @@ -0,0 +1,149 @@ +package com.linkedin.datahub.graphql.resolvers.ingest.execution; + +import static com.linkedin.datahub.graphql.resolvers.ingest.IngestTestUtils.getMockAllowContext; +import static org.mockito.ArgumentMatchers.any; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; + +import com.google.common.collect.ImmutableSet; +import com.linkedin.common.urn.Urn; +import com.linkedin.datahub.graphql.QueryContext; +import com.linkedin.datahub.graphql.generated.*; +import com.linkedin.datahub.graphql.resolvers.ResolverUtils; +import com.linkedin.datahub.graphql.types.ingestion.ExecutionRequestType; +import com.linkedin.entity.client.EntityClient; +import com.linkedin.metadata.Constants; +import com.linkedin.metadata.query.filter.SortOrder; +import com.linkedin.metadata.search.SearchEntity; +import com.linkedin.metadata.search.SearchEntityArray; +import com.linkedin.metadata.search.SearchResult; +import graphql.execution.DataFetcherResult; +import graphql.schema.DataFetchingEnvironment; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.Test; + +public class GetLatestSuccessfulExecutionRequestResolverTest { + + private static final String TEST_EXECUTION_REQUEST_URN = "urn:li:executionRequest:test-execution"; + private static final String TEST_INGESTION_SOURCE_URN = + "urn:li:dataHubIngestionSource:test-source"; + + private static final ListExecutionRequestsInput TEST_INPUT = + new ListExecutionRequestsInput(0, 20, null, null, null, null); + private static final Logger log = + LoggerFactory.getLogger(GetLatestSuccessfulExecutionRequestResolverTest.class); + + @Test + public void testGetSuccess() throws Exception { + EntityClient mockEntityClient = + getMockedEntityClient( + new SearchResult() + .setFrom(0) + .setPageSize(1) + .setNumEntities(1) + .setEntities( + new SearchEntityArray( + ImmutableSet.of( + new SearchEntity() + .setEntity(Urn.createFromString(TEST_EXECUTION_REQUEST_URN)))))); + ExecutionRequestType mockExecutionRequestType = Mockito.mock(ExecutionRequestType.class); + Mockito.when( + mockExecutionRequestType.batchLoad( + Mockito.eq(List.of(TEST_EXECUTION_REQUEST_URN)), any())) + .thenReturn( + List.of( + DataFetcherResult.newResult() + .data(getTestExecutionRequest()) + .build())); + QueryContext mockContext = getMockAllowContext(); + DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class); + Mockito.when(mockEnv.getSource()).thenReturn(getTestIngestionSource()); + Mockito.when(mockEnv.getContext()).thenReturn(mockContext); + + GetLatestSuccessfulExecutionRequestResolver resolver = + new GetLatestSuccessfulExecutionRequestResolver(mockEntityClient, mockExecutionRequestType); + var result = resolver.get(mockEnv).get(); + + assertEquals(result.getUrn(), TEST_EXECUTION_REQUEST_URN); + } + + @Test + public void testGetSuccessWithEmptyResponse() throws Exception { + EntityClient mockEntityClient = + getMockedEntityClient( + new SearchResult() + .setFrom(0) + .setPageSize(1) + .setNumEntities(0) + .setEntities(new SearchEntityArray(ImmutableSet.of()))); + ExecutionRequestType mockExecutionRequestType = Mockito.mock(ExecutionRequestType.class); + QueryContext mockContext = getMockAllowContext(); + DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class); + Mockito.when(mockEnv.getSource()).thenReturn(getTestIngestionSource()); + Mockito.when(mockEnv.getContext()).thenReturn(mockContext); + + GetLatestSuccessfulExecutionRequestResolver resolver = + new GetLatestSuccessfulExecutionRequestResolver(mockEntityClient, mockExecutionRequestType); + var result = resolver.get(mockEnv).get(); + + assertNull(result); + } + + private ExecutionRequest getTestExecutionRequest() { + return new ExecutionRequest( + TEST_EXECUTION_REQUEST_URN, + EntityType.EXECUTION_REQUEST, + "testId", + new ExecutionRequestInput("task", null, null, 0L, null, null, "default"), + null, + null, + null); + } + + private IngestionSource getTestIngestionSource() { + IngestionSource ingestionSource = new IngestionSource(); + ingestionSource.setUrn(TEST_INGESTION_SOURCE_URN); + return ingestionSource; + } + + private EntityClient getMockedEntityClient(SearchResult filterSearchResult) throws Exception { + EntityClient mockClient = Mockito.mock(EntityClient.class); + + Mockito.when( + mockClient.filter( + any(), + Mockito.eq(Constants.EXECUTION_REQUEST_ENTITY_NAME), + Mockito.eq( + Objects.requireNonNull( + ResolverUtils.buildFilter( + List.of( + new FacetFilterInput( + "ingestionSource", + null, + List.of(TEST_INGESTION_SOURCE_URN), + false, + FilterOperator.EQUAL), + new FacetFilterInput( + "executionResultStatus", + null, + List.of("SUCCESS"), + false, + FilterOperator.EQUAL)), + Collections.emptyList()))), + Mockito.eq( + Collections.singletonList( + new com.linkedin.metadata.query.filter.SortCriterion() + .setField("requestTimeMs") + .setOrder(SortOrder.DESCENDING))), + Mockito.eq(0), + Mockito.eq(1))) + .thenReturn(filterSearchResult); + + return mockClient; + } +} diff --git a/datahub-web-react/src/app/ingestV2/executions/ExecutionsTab.tsx b/datahub-web-react/src/app/ingestV2/executions/ExecutionsTab.tsx index 2e808be03c..be8b80600a 100644 --- a/datahub-web-react/src/app/ingestV2/executions/ExecutionsTab.tsx +++ b/datahub-web-react/src/app/ingestV2/executions/ExecutionsTab.tsx @@ -1,4 +1,5 @@ import { Pagination } from '@components'; +import { message } from 'antd'; import React, { useCallback, useEffect, useState } from 'react'; import styled from 'styled-components'; @@ -14,7 +15,7 @@ import { Message } from '@app/shared/Message'; import { scrollToTop } from '@app/shared/searchUtils'; import usePagination from '@app/sharedV2/pagination/usePagination'; -import { useListIngestionExecutionRequestsQuery } from '@graphql/ingestion.generated'; +import { useListIngestionExecutionRequestsQuery, useRollbackIngestionMutation } from '@graphql/ingestion.generated'; import { ExecutionRequest } from '@types'; const SourceContainer = styled.div` @@ -78,6 +79,27 @@ export const ExecutionsTab = () => { }, }); + const [rollbackIngestion] = useRollbackIngestionMutation(); + + const handleRollbackExecution = useCallback( + (runId: string) => { + message.loading('Requesting rollback...'); + + rollbackIngestion({ variables: { input: { runId } } }) + .then(() => { + setTimeout(() => { + message.destroy(); + refetch(); + message.success('Successfully requested ingestion rollback'); + }, 2000); + }) + .catch(() => { + message.error('Error requesting ingestion rollback'); + }); + }, + [refetch, rollbackIngestion], + ); + const totalExecutionRequests = data?.listExecutionRequests?.total || 0; const executionRequests: ExecutionRequest[] = data?.listExecutionRequests?.executionRequests || []; @@ -117,6 +139,7 @@ export const ExecutionsTab = () => { diff --git a/datahub-web-react/src/app/ingestV2/executions/components/ExecutionsTable.tsx b/datahub-web-react/src/app/ingestV2/executions/components/ExecutionsTable.tsx index 9db071fa98..07aa7cc9eb 100644 --- a/datahub-web-react/src/app/ingestV2/executions/components/ExecutionsTable.tsx +++ b/datahub-web-react/src/app/ingestV2/executions/components/ExecutionsTable.tsx @@ -1,5 +1,5 @@ import { Column, Table } from '@components'; -import React from 'react'; +import React, { useCallback, useState } from 'react'; import styled from 'styled-components/macro'; import { CLI_EXECUTOR_ID } from '@app/ingestV2/constants'; @@ -11,6 +11,7 @@ import DateTimeColumn from '@app/ingestV2/shared/components/columns/DateTimeColu import DurationColumn from '@app/ingestV2/shared/components/columns/DurationColumn'; import { StatusColumn } from '@app/ingestV2/shared/components/columns/StatusColumn'; import { getIngestionSourceStatus } from '@app/ingestV2/source/utils'; +import { ConfirmationModal } from '@app/sharedV2/modals/ConfirmationModal'; import { ExecutionRequest } from '@types'; @@ -22,9 +23,12 @@ interface Props { executionRequests: ExecutionRequest[]; setFocusExecutionUrn: (urn: string) => void; loading?: boolean; + handleRollback: (executionUrn: string) => void; } -export default function ExecutionsTable({ executionRequests, setFocusExecutionUrn, loading }: Props) { +export default function ExecutionsTable({ executionRequests, setFocusExecutionUrn, loading, handleRollback }: Props) { + const [runIdOfRollbackConfirmation, setRunIdOfRollbackConfirmation] = useState(); + const tableData: ExecutionRequestRecord[] = executionRequests.map((execution) => ({ urn: execution.urn, name: execution?.source?.name, @@ -35,11 +39,15 @@ export default function ExecutionsTable({ executionRequests, setFocusExecutionUr startedAt: execution.result?.startTimeMs, duration: execution.result?.durationMs, status: getIngestionSourceStatus(execution.result), - // TODO:: getting this field form backend is not implemented yet - showRollback: false, + showRollback: execution.source?.latestSuccessfulExecution?.urn === execution.urn, cliIngestion: execution.input.executorId === CLI_EXECUTOR_ID, })); + const handleConfirmRollback = useCallback(() => { + if (runIdOfRollbackConfirmation) handleRollback(runIdOfRollbackConfirmation); + setRunIdOfRollbackConfirmation(undefined); + }, [handleRollback, runIdOfRollbackConfirmation]); + const tableColumns: Column[] = [ { title: 'Name', @@ -76,10 +84,29 @@ export default function ExecutionsTable({ executionRequests, setFocusExecutionUr { title: '', key: 'actions', - render: (record) => , - width: '50px', + render: (record) => ( + setRunIdOfRollbackConfirmation(record.id)} + /> + ), + width: '100px', }, ]; - return ; + return ( + <> + + handleConfirmRollback()} + handleClose={() => setRunIdOfRollbackConfirmation(undefined)} + /> + + ); } diff --git a/datahub-web-react/src/app/ingestV2/executions/components/columns/ActionsColumn.tsx b/datahub-web-react/src/app/ingestV2/executions/components/columns/ActionsColumn.tsx index 5d76f6df69..3f646fdebc 100644 --- a/datahub-web-react/src/app/ingestV2/executions/components/columns/ActionsColumn.tsx +++ b/datahub-web-react/src/app/ingestV2/executions/components/columns/ActionsColumn.tsx @@ -1,15 +1,17 @@ +import { Icon } from '@components'; import React from 'react'; -import { EXECUTION_REQUEST_STATUS_RUNNING } from '@app/ingestV2/executions/constants'; +import { EXECUTION_REQUEST_STATUS_RUNNING, EXECUTION_REQUEST_STATUS_SUCCESS } from '@app/ingestV2/executions/constants'; import { ExecutionRequestRecord } from '@app/ingestV2/executions/types'; import BaseActionsColumn, { MenuItem } from '@app/ingestV2/shared/components/columns/BaseActionsColumn'; interface ActionsColumnProps { record: ExecutionRequestRecord; setFocusExecutionUrn: (urn: string) => void; + handleRollback: (urn: string) => void; } -export function ActionsColumn({ record, setFocusExecutionUrn }: ActionsColumnProps) { +export function ActionsColumn({ record, setFocusExecutionUrn, handleRollback }: ActionsColumnProps) { const items = [ { key: '0', @@ -42,5 +44,14 @@ export function ActionsColumn({ record, setFocusExecutionUrn }: ActionsColumnPro label: {}}>Cancel, }, ]; - return ; + return ( + handleRollback(record.id)} /> + ) : null + } + /> + ); } diff --git a/datahub-web-react/src/graphql/fragments.graphql b/datahub-web-react/src/graphql/fragments.graphql index 58c834143c..a55da2f584 100644 --- a/datahub-web-react/src/graphql/fragments.graphql +++ b/datahub-web-react/src/graphql/fragments.graphql @@ -1855,6 +1855,19 @@ fragment ingestionSourceFields on IngestionSource { } } +fragment minimalIngestionExecutionRequestFields on ExecutionRequest { + urn + id + type + input { + requestedAt + source { + type + } + task + } +} + fragment ingestionExecutionRequestFields on ExecutionRequest { urn id diff --git a/datahub-web-react/src/graphql/ingestion.graphql b/datahub-web-react/src/graphql/ingestion.graphql index 63d5c1fe26..8502ed04fd 100644 --- a/datahub-web-react/src/graphql/ingestion.graphql +++ b/datahub-web-react/src/graphql/ingestion.graphql @@ -52,6 +52,9 @@ query listIngestionExecutionRequests($input: ListExecutionRequestsInput!) { ...ingestionExecutionRequestFields source { ...ingestionSourceFields + latestSuccessfulExecution { + ...minimalIngestionExecutionRequestFields + } } } }