feat(ingestion): add rollback button (#13677)

This commit is contained in:
v-tarasevich-blitz-brain 2025-06-04 18:50:33 +03:00 committed by GitHub
parent 65ae48e774
commit 93aa64200a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 360 additions and 12 deletions

View File

@ -110,6 +110,7 @@ import com.linkedin.datahub.graphql.resolvers.incident.UpdateIncidentStatusResol
import com.linkedin.datahub.graphql.resolvers.ingest.execution.CancelIngestionExecutionRequestResolver;
import com.linkedin.datahub.graphql.resolvers.ingest.execution.CreateIngestionExecutionRequestResolver;
import com.linkedin.datahub.graphql.resolvers.ingest.execution.CreateTestConnectionRequestResolver;
import com.linkedin.datahub.graphql.resolvers.ingest.execution.GetLatestSuccessfulExecutionRequestResolver;
import com.linkedin.datahub.graphql.resolvers.ingest.execution.IngestionSourceExecutionRequestsResolver;
import com.linkedin.datahub.graphql.resolvers.ingest.execution.ListExecutionRequestsResolver;
import com.linkedin.datahub.graphql.resolvers.ingest.execution.RollbackIngestionResolver;
@ -3194,7 +3195,11 @@ public class GmsGraphQLEngine {
return ingestionSource.getPlatform() != null
? ingestionSource.getPlatform().getUrn()
: null;
})))
}))
.dataFetcher(
"latestSuccessfulExecution",
new GetLatestSuccessfulExecutionRequestResolver(
this.entityClient, this.executionRequestType)))
.type(
"ListIngestionSourcesResult",
typeWiring ->

View File

@ -0,0 +1,112 @@
package com.linkedin.datahub.graphql.resolvers.ingest.execution;
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.generated.ExecutionRequest;
import com.linkedin.datahub.graphql.generated.FacetFilterInput;
import com.linkedin.datahub.graphql.generated.FilterOperator;
import com.linkedin.datahub.graphql.generated.IngestionSource;
import com.linkedin.datahub.graphql.resolvers.ResolverUtils;
import com.linkedin.datahub.graphql.types.ingestion.ExecutionRequestType;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.query.filter.SortCriterion;
import com.linkedin.metadata.query.filter.SortOrder;
import com.linkedin.metadata.search.SearchEntity;
import com.linkedin.metadata.search.SearchResult;
import graphql.execution.DataFetcherResult;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
/** Returns the latest successful execution request by ingestion source */
public class GetLatestSuccessfulExecutionRequestResolver
implements DataFetcher<CompletableFuture<ExecutionRequest>> {
private final EntityClient _entityClient;
private final ExecutionRequestType _executionRequestType;
private final String INGESTION_SOURCE_FIELD = "ingestionSource";
private final String RESULT_STATUS_FIELD = "executionResultStatus";
private final String RESULT_REQUEST_TIME_MS_FIELD = "requestTimeMs";
private final String STATUS_SUCCESS = "SUCCESS";
public GetLatestSuccessfulExecutionRequestResolver(
final EntityClient entityClient, final ExecutionRequestType executionRequestType) {
_entityClient = entityClient;
_executionRequestType = executionRequestType;
}
@Override
public CompletableFuture<ExecutionRequest> get(final DataFetchingEnvironment environment)
throws Exception {
final QueryContext context = environment.getContext();
final String ingestionSourceUrn = ((IngestionSource) environment.getSource()).getUrn();
return GraphQLConcurrencyUtils.supplyAsync(
() -> {
try {
final Urn executionRequestUrn =
getLatestSuccessfulExecutionRequestUrn(ingestionSourceUrn, context);
if (executionRequestUrn == null) {
return null;
}
Optional<DataFetcherResult<ExecutionRequest>> resp =
_executionRequestType
.batchLoad(List.of(executionRequestUrn.toString()), context)
.stream()
.findFirst();
return resp.map(DataFetcherResult::getData).orElse(null);
} catch (Exception e) {
throw new RuntimeException(
String.format(
"Failed to get latest successful execution request by source: %s",
ingestionSourceUrn),
e);
}
},
this.getClass().getSimpleName(),
"get");
}
private Urn getLatestSuccessfulExecutionRequestUrn(
@Nonnull final String ingestionSourceUrn, @Nonnull QueryContext context) throws Exception {
final SearchResult gmsResult =
_entityClient.filter(
context.getOperationContext(),
Constants.EXECUTION_REQUEST_ENTITY_NAME,
Objects.requireNonNull(
ResolverUtils.buildFilter(
List.of(
new FacetFilterInput(
INGESTION_SOURCE_FIELD,
null,
List.of(ingestionSourceUrn),
false,
FilterOperator.EQUAL),
new FacetFilterInput(
RESULT_STATUS_FIELD,
null,
List.of(STATUS_SUCCESS),
false,
FilterOperator.EQUAL)),
Collections.emptyList())),
Collections.singletonList(
new SortCriterion()
.setField(RESULT_REQUEST_TIME_MS_FIELD)
.setOrder(SortOrder.DESCENDING)),
0,
1);
Optional<SearchEntity> optionalSearchEntity = gmsResult.getEntities().stream().findFirst();
return optionalSearchEntity.map(SearchEntity::getEntity).orElse(null);
}
}

View File

@ -503,6 +503,11 @@ type IngestionSource {
Ownership metadata of the ingestion source
"""
ownership: Ownership
"""
The latest successful execution request for this source
"""
latestSuccessfulExecution: ExecutionRequest
}
"""

View File

@ -0,0 +1,149 @@
package com.linkedin.datahub.graphql.resolvers.ingest.execution;
import static com.linkedin.datahub.graphql.resolvers.ingest.IngestTestUtils.getMockAllowContext;
import static org.mockito.ArgumentMatchers.any;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.*;
import com.linkedin.datahub.graphql.resolvers.ResolverUtils;
import com.linkedin.datahub.graphql.types.ingestion.ExecutionRequestType;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.query.filter.SortOrder;
import com.linkedin.metadata.search.SearchEntity;
import com.linkedin.metadata.search.SearchEntityArray;
import com.linkedin.metadata.search.SearchResult;
import graphql.execution.DataFetcherResult;
import graphql.schema.DataFetchingEnvironment;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;
public class GetLatestSuccessfulExecutionRequestResolverTest {
private static final String TEST_EXECUTION_REQUEST_URN = "urn:li:executionRequest:test-execution";
private static final String TEST_INGESTION_SOURCE_URN =
"urn:li:dataHubIngestionSource:test-source";
private static final ListExecutionRequestsInput TEST_INPUT =
new ListExecutionRequestsInput(0, 20, null, null, null, null);
private static final Logger log =
LoggerFactory.getLogger(GetLatestSuccessfulExecutionRequestResolverTest.class);
@Test
public void testGetSuccess() throws Exception {
EntityClient mockEntityClient =
getMockedEntityClient(
new SearchResult()
.setFrom(0)
.setPageSize(1)
.setNumEntities(1)
.setEntities(
new SearchEntityArray(
ImmutableSet.of(
new SearchEntity()
.setEntity(Urn.createFromString(TEST_EXECUTION_REQUEST_URN))))));
ExecutionRequestType mockExecutionRequestType = Mockito.mock(ExecutionRequestType.class);
Mockito.when(
mockExecutionRequestType.batchLoad(
Mockito.eq(List.of(TEST_EXECUTION_REQUEST_URN)), any()))
.thenReturn(
List.of(
DataFetcherResult.<ExecutionRequest>newResult()
.data(getTestExecutionRequest())
.build()));
QueryContext mockContext = getMockAllowContext();
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
Mockito.when(mockEnv.getSource()).thenReturn(getTestIngestionSource());
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);
GetLatestSuccessfulExecutionRequestResolver resolver =
new GetLatestSuccessfulExecutionRequestResolver(mockEntityClient, mockExecutionRequestType);
var result = resolver.get(mockEnv).get();
assertEquals(result.getUrn(), TEST_EXECUTION_REQUEST_URN);
}
@Test
public void testGetSuccessWithEmptyResponse() throws Exception {
EntityClient mockEntityClient =
getMockedEntityClient(
new SearchResult()
.setFrom(0)
.setPageSize(1)
.setNumEntities(0)
.setEntities(new SearchEntityArray(ImmutableSet.of())));
ExecutionRequestType mockExecutionRequestType = Mockito.mock(ExecutionRequestType.class);
QueryContext mockContext = getMockAllowContext();
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
Mockito.when(mockEnv.getSource()).thenReturn(getTestIngestionSource());
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);
GetLatestSuccessfulExecutionRequestResolver resolver =
new GetLatestSuccessfulExecutionRequestResolver(mockEntityClient, mockExecutionRequestType);
var result = resolver.get(mockEnv).get();
assertNull(result);
}
private ExecutionRequest getTestExecutionRequest() {
return new ExecutionRequest(
TEST_EXECUTION_REQUEST_URN,
EntityType.EXECUTION_REQUEST,
"testId",
new ExecutionRequestInput("task", null, null, 0L, null, null, "default"),
null,
null,
null);
}
private IngestionSource getTestIngestionSource() {
IngestionSource ingestionSource = new IngestionSource();
ingestionSource.setUrn(TEST_INGESTION_SOURCE_URN);
return ingestionSource;
}
private EntityClient getMockedEntityClient(SearchResult filterSearchResult) throws Exception {
EntityClient mockClient = Mockito.mock(EntityClient.class);
Mockito.when(
mockClient.filter(
any(),
Mockito.eq(Constants.EXECUTION_REQUEST_ENTITY_NAME),
Mockito.eq(
Objects.requireNonNull(
ResolverUtils.buildFilter(
List.of(
new FacetFilterInput(
"ingestionSource",
null,
List.of(TEST_INGESTION_SOURCE_URN),
false,
FilterOperator.EQUAL),
new FacetFilterInput(
"executionResultStatus",
null,
List.of("SUCCESS"),
false,
FilterOperator.EQUAL)),
Collections.emptyList()))),
Mockito.eq(
Collections.singletonList(
new com.linkedin.metadata.query.filter.SortCriterion()
.setField("requestTimeMs")
.setOrder(SortOrder.DESCENDING))),
Mockito.eq(0),
Mockito.eq(1)))
.thenReturn(filterSearchResult);
return mockClient;
}
}

View File

@ -1,4 +1,5 @@
import { Pagination } from '@components';
import { message } from 'antd';
import React, { useCallback, useEffect, useState } from 'react';
import styled from 'styled-components';
@ -14,7 +15,7 @@ import { Message } from '@app/shared/Message';
import { scrollToTop } from '@app/shared/searchUtils';
import usePagination from '@app/sharedV2/pagination/usePagination';
import { useListIngestionExecutionRequestsQuery } from '@graphql/ingestion.generated';
import { useListIngestionExecutionRequestsQuery, useRollbackIngestionMutation } from '@graphql/ingestion.generated';
import { ExecutionRequest } from '@types';
const SourceContainer = styled.div`
@ -78,6 +79,27 @@ export const ExecutionsTab = () => {
},
});
const [rollbackIngestion] = useRollbackIngestionMutation();
const handleRollbackExecution = useCallback(
(runId: string) => {
message.loading('Requesting rollback...');
rollbackIngestion({ variables: { input: { runId } } })
.then(() => {
setTimeout(() => {
message.destroy();
refetch();
message.success('Successfully requested ingestion rollback');
}, 2000);
})
.catch(() => {
message.error('Error requesting ingestion rollback');
});
},
[refetch, rollbackIngestion],
);
const totalExecutionRequests = data?.listExecutionRequests?.total || 0;
const executionRequests: ExecutionRequest[] = data?.listExecutionRequests?.executionRequests || [];
@ -117,6 +139,7 @@ export const ExecutionsTab = () => {
<ExecutionsTable
executionRequests={executionRequests || []}
setFocusExecutionUrn={setExecutionRequestUrnToView}
handleRollback={handleRollbackExecution}
loading={loading}
/>
</TableContainer>

View File

@ -1,5 +1,5 @@
import { Column, Table } from '@components';
import React from 'react';
import React, { useCallback, useState } from 'react';
import styled from 'styled-components/macro';
import { CLI_EXECUTOR_ID } from '@app/ingestV2/constants';
@ -11,6 +11,7 @@ import DateTimeColumn from '@app/ingestV2/shared/components/columns/DateTimeColu
import DurationColumn from '@app/ingestV2/shared/components/columns/DurationColumn';
import { StatusColumn } from '@app/ingestV2/shared/components/columns/StatusColumn';
import { getIngestionSourceStatus } from '@app/ingestV2/source/utils';
import { ConfirmationModal } from '@app/sharedV2/modals/ConfirmationModal';
import { ExecutionRequest } from '@types';
@ -22,9 +23,12 @@ interface Props {
executionRequests: ExecutionRequest[];
setFocusExecutionUrn: (urn: string) => void;
loading?: boolean;
handleRollback: (executionUrn: string) => void;
}
export default function ExecutionsTable({ executionRequests, setFocusExecutionUrn, loading }: Props) {
export default function ExecutionsTable({ executionRequests, setFocusExecutionUrn, loading, handleRollback }: Props) {
const [runIdOfRollbackConfirmation, setRunIdOfRollbackConfirmation] = useState<string | undefined>();
const tableData: ExecutionRequestRecord[] = executionRequests.map((execution) => ({
urn: execution.urn,
name: execution?.source?.name,
@ -35,11 +39,15 @@ export default function ExecutionsTable({ executionRequests, setFocusExecutionUr
startedAt: execution.result?.startTimeMs,
duration: execution.result?.durationMs,
status: getIngestionSourceStatus(execution.result),
// TODO:: getting this field form backend is not implemented yet
showRollback: false,
showRollback: execution.source?.latestSuccessfulExecution?.urn === execution.urn,
cliIngestion: execution.input.executorId === CLI_EXECUTOR_ID,
}));
const handleConfirmRollback = useCallback(() => {
if (runIdOfRollbackConfirmation) handleRollback(runIdOfRollbackConfirmation);
setRunIdOfRollbackConfirmation(undefined);
}, [handleRollback, runIdOfRollbackConfirmation]);
const tableColumns: Column<ExecutionRequestRecord>[] = [
{
title: 'Name',
@ -76,10 +84,29 @@ export default function ExecutionsTable({ executionRequests, setFocusExecutionUr
{
title: '',
key: 'actions',
render: (record) => <ActionsColumn record={record} setFocusExecutionUrn={setFocusExecutionUrn} />,
width: '50px',
render: (record) => (
<ActionsColumn
record={record}
setFocusExecutionUrn={setFocusExecutionUrn}
handleRollback={() => setRunIdOfRollbackConfirmation(record.id)}
/>
),
width: '100px',
},
];
return <StyledTable columns={tableColumns} data={tableData} isScrollable isLoading={loading} />;
return (
<>
<StyledTable columns={tableColumns} data={tableData} isScrollable isLoading={loading} />
<ConfirmationModal
isOpen={!!runIdOfRollbackConfirmation}
modalTitle="Confirm Rollback"
modalText="Are you sure you want to continue?
Rolling back this ingestion run will remove any new data ingested during the run. This may
exclude data that was previously extracted, but did not change during this run."
handleConfirm={() => handleConfirmRollback()}
handleClose={() => setRunIdOfRollbackConfirmation(undefined)}
/>
</>
);
}

View File

@ -1,15 +1,17 @@
import { Icon } from '@components';
import React from 'react';
import { EXECUTION_REQUEST_STATUS_RUNNING } from '@app/ingestV2/executions/constants';
import { EXECUTION_REQUEST_STATUS_RUNNING, EXECUTION_REQUEST_STATUS_SUCCESS } from '@app/ingestV2/executions/constants';
import { ExecutionRequestRecord } from '@app/ingestV2/executions/types';
import BaseActionsColumn, { MenuItem } from '@app/ingestV2/shared/components/columns/BaseActionsColumn';
interface ActionsColumnProps {
record: ExecutionRequestRecord;
setFocusExecutionUrn: (urn: string) => void;
handleRollback: (urn: string) => void;
}
export function ActionsColumn({ record, setFocusExecutionUrn }: ActionsColumnProps) {
export function ActionsColumn({ record, setFocusExecutionUrn, handleRollback }: ActionsColumnProps) {
const items = [
{
key: '0',
@ -42,5 +44,14 @@ export function ActionsColumn({ record, setFocusExecutionUrn }: ActionsColumnPro
label: <MenuItem onClick={() => {}}>Cancel</MenuItem>,
},
];
return <BaseActionsColumn dropdownItems={items} />;
return (
<BaseActionsColumn
dropdownItems={items}
extraActions={
record.status === EXECUTION_REQUEST_STATUS_SUCCESS && record.showRollback ? (
<Icon icon="ArrowUUpLeft" source="phosphor" onClick={() => handleRollback(record.id)} />
) : null
}
/>
);
}

View File

@ -1855,6 +1855,19 @@ fragment ingestionSourceFields on IngestionSource {
}
}
fragment minimalIngestionExecutionRequestFields on ExecutionRequest {
urn
id
type
input {
requestedAt
source {
type
}
task
}
}
fragment ingestionExecutionRequestFields on ExecutionRequest {
urn
id

View File

@ -52,6 +52,9 @@ query listIngestionExecutionRequests($input: ListExecutionRequestsInput!) {
...ingestionExecutionRequestFields
source {
...ingestionSourceFields
latestSuccessfulExecution {
...minimalIngestionExecutionRequestFields
}
}
}
}