diff --git a/packages/core/data-transfer/src/strapi/providers/remote-destination/__tests__/utils.test.ts b/packages/core/data-transfer/src/strapi/providers/remote-destination/__tests__/utils.test.ts index ac31b0f872..25c8c53992 100644 --- a/packages/core/data-transfer/src/strapi/providers/remote-destination/__tests__/utils.test.ts +++ b/packages/core/data-transfer/src/strapi/providers/remote-destination/__tests__/utils.test.ts @@ -3,6 +3,7 @@ import { TRANSFER_PATH } from '../../../remote/constants'; import { CommandMessage } from '../../../../../types/remote/protocol/client'; import { createDispatcher } from '../../utils'; +jest.useFakeTimers(); jest.mock('ws', () => ({ WebSocket: jest.fn().mockImplementation(() => { return { diff --git a/packages/core/data-transfer/src/strapi/providers/remote-destination/index.ts b/packages/core/data-transfer/src/strapi/providers/remote-destination/index.ts index 13fef83cde..374bf09c5f 100644 --- a/packages/core/data-transfer/src/strapi/providers/remote-destination/index.ts +++ b/packages/core/data-transfer/src/strapi/providers/remote-destination/index.ts @@ -20,6 +20,7 @@ export interface IRemoteStrapiDestinationProviderOptions extends Pick { url: URL; auth?: ITransferTokenAuth; + retryMessageOptions?: { retryMessageTimeout: number; retryMessageMaxRetries: number }; } const jsonLength = (obj: object) => Buffer.byteLength(JSON.stringify(obj)); @@ -213,7 +214,8 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider { } this.ws = ws; - this.dispatcher = createDispatcher(this.ws); + const { retryMessageOptions } = this.options; + this.dispatcher = createDispatcher(this.ws, retryMessageOptions); this.transferID = await this.initTransfer(); @@ -315,8 +317,6 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider { } if (hasStarted) { - await this.#streamStep('assets', null); - const endStepError = await this.#endStep('assets'); if (endStepError) { diff --git a/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts b/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts index 96667aafdc..9c515dca0b 100644 --- a/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts +++ b/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts @@ -24,6 +24,7 @@ interface ITransferTokenAuth { export interface IRemoteStrapiSourceProviderOptions extends ILocalStrapiSourceProviderOptions { url: URL; auth?: ITransferTokenAuth; + retryMessageOptions?: { retryMessageTimeout: number; retryMessageMaxRetries: number }; } class RemoteStrapiSourceProvider implements ISourceProvider { @@ -210,7 +211,8 @@ class RemoteStrapiSourceProvider implements ISourceProvider { } this.ws = ws; - this.dispatcher = createDispatcher(this.ws); + const { retryMessageOptions } = this.options; + this.dispatcher = createDispatcher(this.ws, retryMessageOptions); const transferID = await this.initTransfer(); this.dispatcher.setTransferProperties({ id: transferID, kind: 'pull' }); diff --git a/packages/core/data-transfer/src/strapi/providers/utils.ts b/packages/core/data-transfer/src/strapi/providers/utils.ts index 11396b41cd..fbec438fed 100644 --- a/packages/core/data-transfer/src/strapi/providers/utils.ts +++ b/packages/core/data-transfer/src/strapi/providers/utils.ts @@ -18,7 +18,13 @@ interface IDispatchOptions { type Dispatch = Omit; -export const createDispatcher = (ws: WebSocket) => { +export const createDispatcher = ( + ws: WebSocket, + retryMessageOptions = { + retryMessageMaxRetries: 5, + retryMessageTimeout: 15000, + } +) => { const state: IDispatcherState = {}; type DispatchMessage = Dispatch; @@ -34,26 +40,40 @@ export const createDispatcher = (ws: WebSocket) => { return new Promise((resolve, reject) => { const uuid = randomUUID(); const payload = { ...message, uuid }; + let numberOfTimesMessageWasSent = 0; if (options.attachTransfer) { Object.assign(payload, { transferID: state.transfer?.id }); } const stringifiedPayload = JSON.stringify(payload); - ws.send(stringifiedPayload, (error) => { if (error) { reject(error); } }); + const { retryMessageMaxRetries, retryMessageTimeout } = retryMessageOptions; + const sendPeriodically = () => { + if (numberOfTimesMessageWasSent <= retryMessageMaxRetries) { + numberOfTimesMessageWasSent += 1; + ws.send(stringifiedPayload, (error) => { + if (error) { + reject(error); + } + }); + } else { + reject(new ProviderError('error', 'Request timed out')); + } + }; + const interval = setInterval(sendPeriodically, retryMessageTimeout); const onResponse = (raw: RawData) => { const response: server.Message = JSON.parse(raw.toString()); if (response.uuid === uuid) { + clearInterval(interval); if (response.error) { return reject(new ProviderError('error', response.error.message)); } - resolve(response.data ?? null); } else { ws.once('message', onResponse); diff --git a/packages/core/data-transfer/src/strapi/remote/handlers/abstract.ts b/packages/core/data-transfer/src/strapi/remote/handlers/abstract.ts index 14a8e910ed..05244fce08 100644 --- a/packages/core/data-transfer/src/strapi/remote/handlers/abstract.ts +++ b/packages/core/data-transfer/src/strapi/remote/handlers/abstract.ts @@ -8,6 +8,11 @@ type BufferLike = Parameters[0]; export interface TransferState { id?: string; startedAt?: number; + response?: { + uuid?: string; + e?: Error | null; + data?: unknown; + }; } export interface Handler { @@ -19,6 +24,15 @@ export interface Handler { get startedAt(): TransferState['startedAt']; set startedAt(id: TransferState['startedAt']); + get response(): TransferState['response']; + set response(response: TransferState['response']); + + // Add message UUIDs + addUUID(uuid: string): void; + + // Check if a message UUID exists + hasUUID(uuid: string): boolean; + /** * Returns whether a transfer is currently in progress or not */ diff --git a/packages/core/data-transfer/src/strapi/remote/handlers/pull.ts b/packages/core/data-transfer/src/strapi/remote/handlers/pull.ts index ce1a2ce3b7..e7d26a4671 100644 --- a/packages/core/data-transfer/src/strapi/remote/handlers/pull.ts +++ b/packages/core/data-transfer/src/strapi/remote/handlers/pull.ts @@ -70,8 +70,16 @@ export const createPullController = handlerControllerFactory { handleWSUpgrade(wss, ctx, (ws) => { const state: TransferState = { id: undefined }; + const messageUUIDs = new Set(); const prototype: Handler = { // Transfer ID @@ -99,6 +100,22 @@ export const handlerControllerFactory = state.startedAt = timestamp; }, + get response() { + return state.response; + }, + + set response(response) { + state.response = response; + }, + + addUUID(uuid) { + messageUUIDs.add(uuid); + }, + + hasUUID(uuid) { + return messageUUIDs.has(uuid); + }, + isTransferStarted() { return this.transferID !== undefined && this.startedAt !== undefined; }, @@ -126,7 +143,11 @@ export const handlerControllerFactory = reject(new Error('Missing uuid for this message')); return; } - + this.response = { + uuid, + data, + e, + }; const payload = JSON.stringify({ uuid, data: data ?? null, @@ -199,6 +220,7 @@ export const handlerControllerFactory = cleanup() { this.transferID = undefined; this.startedAt = undefined; + this.response = undefined; }, teardown() {