Merge pull request #16773 from strapi/fix/transfer-push-stuck

[DTS] Retry sending messages if no response was recieved
This commit is contained in:
Bassel Kanso 2023-06-09 18:23:29 +03:00 committed by GitHub
commit 60c5f18f6d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 85 additions and 10 deletions

View File

@ -3,6 +3,7 @@ import { TRANSFER_PATH } from '../../../remote/constants';
import { CommandMessage } from '../../../../../types/remote/protocol/client';
import { createDispatcher } from '../../utils';
jest.useFakeTimers();
jest.mock('ws', () => ({
WebSocket: jest.fn().mockImplementation(() => {
return {

View File

@ -20,6 +20,7 @@ export interface IRemoteStrapiDestinationProviderOptions
extends Pick<ILocalStrapiDestinationProviderOptions, 'restore' | 'strategy'> {
url: URL;
auth?: ITransferTokenAuth;
retryMessageOptions?: { retryMessageTimeout: number; retryMessageMaxRetries: number };
}
const jsonLength = (obj: object) => Buffer.byteLength(JSON.stringify(obj));
@ -213,7 +214,8 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider {
}
this.ws = ws;
this.dispatcher = createDispatcher(this.ws);
const { retryMessageOptions } = this.options;
this.dispatcher = createDispatcher(this.ws, retryMessageOptions);
this.transferID = await this.initTransfer();
@ -315,8 +317,6 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider {
}
if (hasStarted) {
await this.#streamStep('assets', null);
const endStepError = await this.#endStep('assets');
if (endStepError) {

View File

@ -24,6 +24,7 @@ interface ITransferTokenAuth {
export interface IRemoteStrapiSourceProviderOptions extends ILocalStrapiSourceProviderOptions {
url: URL;
auth?: ITransferTokenAuth;
retryMessageOptions?: { retryMessageTimeout: number; retryMessageMaxRetries: number };
}
class RemoteStrapiSourceProvider implements ISourceProvider {
@ -210,7 +211,8 @@ class RemoteStrapiSourceProvider implements ISourceProvider {
}
this.ws = ws;
this.dispatcher = createDispatcher(this.ws);
const { retryMessageOptions } = this.options;
this.dispatcher = createDispatcher(this.ws, retryMessageOptions);
const transferID = await this.initTransfer();
this.dispatcher.setTransferProperties({ id: transferID, kind: 'pull' });

View File

@ -18,7 +18,13 @@ interface IDispatchOptions {
type Dispatch<T> = Omit<T, 'transferID' | 'uuid'>;
export const createDispatcher = (ws: WebSocket) => {
export const createDispatcher = (
ws: WebSocket,
retryMessageOptions = {
retryMessageMaxRetries: 5,
retryMessageTimeout: 15000,
}
) => {
const state: IDispatcherState = {};
type DispatchMessage = Dispatch<client.Message>;
@ -34,26 +40,40 @@ export const createDispatcher = (ws: WebSocket) => {
return new Promise<U | null>((resolve, reject) => {
const uuid = randomUUID();
const payload = { ...message, uuid };
let numberOfTimesMessageWasSent = 0;
if (options.attachTransfer) {
Object.assign(payload, { transferID: state.transfer?.id });
}
const stringifiedPayload = JSON.stringify(payload);
ws.send(stringifiedPayload, (error) => {
if (error) {
reject(error);
}
});
const { retryMessageMaxRetries, retryMessageTimeout } = retryMessageOptions;
const sendPeriodically = () => {
if (numberOfTimesMessageWasSent <= retryMessageMaxRetries) {
numberOfTimesMessageWasSent += 1;
ws.send(stringifiedPayload, (error) => {
if (error) {
reject(error);
}
});
} else {
reject(new ProviderError('error', 'Request timed out'));
}
};
const interval = setInterval(sendPeriodically, retryMessageTimeout);
const onResponse = (raw: RawData) => {
const response: server.Message<U> = JSON.parse(raw.toString());
if (response.uuid === uuid) {
clearInterval(interval);
if (response.error) {
return reject(new ProviderError('error', response.error.message));
}
resolve(response.data ?? null);
} else {
ws.once('message', onResponse);

View File

@ -8,6 +8,11 @@ type BufferLike = Parameters<WebSocket['send']>[0];
export interface TransferState {
id?: string;
startedAt?: number;
response?: {
uuid?: string;
e?: Error | null;
data?: unknown;
};
}
export interface Handler {
@ -19,6 +24,15 @@ export interface Handler {
get startedAt(): TransferState['startedAt'];
set startedAt(id: TransferState['startedAt']);
get response(): TransferState['response'];
set response(response: TransferState['response']);
// Add message UUIDs
addUUID(uuid: string): void;
// Check if a message UUID exists
hasUUID(uuid: string): boolean;
/**
* Returns whether a transfer is currently in progress or not
*/

View File

@ -70,8 +70,16 @@ export const createPullController = handlerControllerFactory<Partial<PullHandler
await this.respond(undefined, new Error('Missing uuid in message'));
}
const { uuid, type } = msg;
if (proto.hasUUID(msg.uuid)) {
const previousResponse = proto.response;
if (previousResponse?.uuid === msg.uuid) {
await this.respond(previousResponse?.uuid, previousResponse.e, previousResponse.data);
}
return;
}
const { uuid, type } = msg;
proto.addUUID(uuid);
// Regular command message (init, end, status)
if (type === 'command') {
const { command } = msg;

View File

@ -205,8 +205,16 @@ export const createPushController = handlerControllerFactory<Partial<PushHandler
await this.respond(undefined, new Error('Missing uuid in message'));
}
const { uuid, type } = msg;
if (proto.hasUUID(msg.uuid)) {
const previousResponse = proto.response;
if (previousResponse?.uuid === msg.uuid) {
await this.respond(previousResponse?.uuid, previousResponse.e, previousResponse.data);
}
return;
}
const { uuid, type } = msg;
proto.addUUID(uuid);
// Regular command message (init, end, status)
if (type === 'command') {
const { command } = msg;

View File

@ -79,6 +79,7 @@ export const handlerControllerFactory =
return async (ctx: Context) => {
handleWSUpgrade(wss, ctx, (ws) => {
const state: TransferState = { id: undefined };
const messageUUIDs = new Set<string>();
const prototype: Handler = {
// Transfer ID
@ -99,6 +100,22 @@ export const handlerControllerFactory =
state.startedAt = timestamp;
},
get response() {
return state.response;
},
set response(response) {
state.response = response;
},
addUUID(uuid) {
messageUUIDs.add(uuid);
},
hasUUID(uuid) {
return messageUUIDs.has(uuid);
},
isTransferStarted() {
return this.transferID !== undefined && this.startedAt !== undefined;
},
@ -126,7 +143,11 @@ export const handlerControllerFactory =
reject(new Error('Missing uuid for this message'));
return;
}
this.response = {
uuid,
data,
e,
};
const payload = JSON.stringify({
uuid,
data: data ?? null,
@ -199,6 +220,7 @@ export const handlerControllerFactory =
cleanup() {
this.transferID = undefined;
this.startedAt = undefined;
this.response = undefined;
},
teardown() {