From 5ce621adc7d3330e488dc46f2152369f7b7af94e Mon Sep 17 00:00:00 2001 From: Bassel Date: Thu, 25 May 2023 15:08:45 +0300 Subject: [PATCH] add extra check for messages and new configs for remote providers --- .../strapi/providers/remote-destination/index.ts | 5 ++++- .../src/strapi/providers/remote-source/index.ts | 5 ++++- .../data-transfer/src/strapi/providers/utils.ts | 13 ++++++++++--- .../src/strapi/remote/handlers/abstract.ts | 6 ++++++ .../src/strapi/remote/handlers/pull.ts | 8 +++++--- .../src/strapi/remote/handlers/push.ts | 7 +++++-- .../src/strapi/remote/handlers/utils.ts | 9 +++++++++ .../strapi/lib/commands/actions/transfer/action.js | 4 ++++ 8 files changed, 47 insertions(+), 10 deletions(-) diff --git a/packages/core/data-transfer/src/strapi/providers/remote-destination/index.ts b/packages/core/data-transfer/src/strapi/providers/remote-destination/index.ts index 13fef83cde..dc3b5c8bda 100644 --- a/packages/core/data-transfer/src/strapi/providers/remote-destination/index.ts +++ b/packages/core/data-transfer/src/strapi/providers/remote-destination/index.ts @@ -20,6 +20,8 @@ export interface IRemoteStrapiDestinationProviderOptions extends Pick { url: URL; auth?: ITransferTokenAuth; + retryMessageTimeout?: number; + retryMessageMaxRetries?: number; } const jsonLength = (obj: object) => Buffer.byteLength(JSON.stringify(obj)); @@ -213,7 +215,8 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider { } this.ws = ws; - this.dispatcher = createDispatcher(this.ws); + const { retryMessageMaxRetries, retryMessageTimeout } = this.options; + this.dispatcher = createDispatcher(this.ws, { retryMessageMaxRetries, retryMessageTimeout }); this.transferID = await this.initTransfer(); diff --git a/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts b/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts index 96667aafdc..15f331b2f7 100644 --- a/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts +++ b/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts @@ -24,6 +24,8 @@ interface ITransferTokenAuth { export interface IRemoteStrapiSourceProviderOptions extends ILocalStrapiSourceProviderOptions { url: URL; auth?: ITransferTokenAuth; + retryMessageTimeout?: number; + retryMessageMaxRetries?: number; } class RemoteStrapiSourceProvider implements ISourceProvider { @@ -210,7 +212,8 @@ class RemoteStrapiSourceProvider implements ISourceProvider { } this.ws = ws; - this.dispatcher = createDispatcher(this.ws); + const { retryMessageMaxRetries, retryMessageTimeout } = this.options; + this.dispatcher = createDispatcher(this.ws, { retryMessageMaxRetries, retryMessageTimeout }); const transferID = await this.initTransfer(); this.dispatcher.setTransferProperties({ id: transferID, kind: 'pull' }); diff --git a/packages/core/data-transfer/src/strapi/providers/utils.ts b/packages/core/data-transfer/src/strapi/providers/utils.ts index c421a94d96..a037935d7a 100644 --- a/packages/core/data-transfer/src/strapi/providers/utils.ts +++ b/packages/core/data-transfer/src/strapi/providers/utils.ts @@ -18,7 +18,13 @@ interface IDispatchOptions { type Dispatch = Omit; -export const createDispatcher = (ws: WebSocket) => { +export const createDispatcher = ( + ws: WebSocket, + { + retryMessageMaxRetries = 5, + retryMessageTimeout = 10000, + }: { retryMessageMaxRetries?: number; retryMessageTimeout?: number } +) => { const state: IDispatcherState = {}; type DispatchMessage = Dispatch; @@ -48,7 +54,8 @@ export const createDispatcher = (ws: WebSocket) => { }); const sendPeriodically = () => { - if (numberOfTimesMessageWasSent < 5) { + if (numberOfTimesMessageWasSent <= retryMessageMaxRetries) { + console.log('Retrying message', payload, numberOfTimesMessageWasSent); numberOfTimesMessageWasSent += 1; ws.send(stringifiedPayload, (error) => { if (error) { @@ -59,7 +66,7 @@ export const createDispatcher = (ws: WebSocket) => { reject(new ProviderError('error', 'Request timed out')); } }; - const interval = setInterval(sendPeriodically, 30000); + const interval = setInterval(sendPeriodically, retryMessageTimeout); const onResponse = (raw: RawData) => { const response: server.Message = JSON.parse(raw.toString()); diff --git a/packages/core/data-transfer/src/strapi/remote/handlers/abstract.ts b/packages/core/data-transfer/src/strapi/remote/handlers/abstract.ts index 59f5b1e8d0..05244fce08 100644 --- a/packages/core/data-transfer/src/strapi/remote/handlers/abstract.ts +++ b/packages/core/data-transfer/src/strapi/remote/handlers/abstract.ts @@ -27,6 +27,12 @@ export interface Handler { get response(): TransferState['response']; set response(response: TransferState['response']); + // Add message UUIDs + addUUID(uuid: string): void; + + // Check if a message UUID exists + hasUUID(uuid: string): boolean; + /** * Returns whether a transfer is currently in progress or not */ diff --git a/packages/core/data-transfer/src/strapi/remote/handlers/pull.ts b/packages/core/data-transfer/src/strapi/remote/handlers/pull.ts index cb95283caf..0ab84dd657 100644 --- a/packages/core/data-transfer/src/strapi/remote/handlers/pull.ts +++ b/packages/core/data-transfer/src/strapi/remote/handlers/pull.ts @@ -71,13 +71,15 @@ export const createPullController = handlerControllerFactory { handleWSUpgrade(wss, ctx, (ws) => { const state: TransferState = { id: undefined }; + const messageuuids = new Set(); const prototype: Handler = { // Transfer ID @@ -107,6 +108,14 @@ export const handlerControllerFactory = state.response = response; }, + addUUID(uuid) { + messageuuids.add(uuid); + }, + + hasUUID(uuid) { + return messageuuids.has(uuid); + }, + isTransferStarted() { return this.transferID !== undefined && this.startedAt !== undefined; }, diff --git a/packages/core/strapi/lib/commands/actions/transfer/action.js b/packages/core/strapi/lib/commands/actions/transfer/action.js index a238878e69..fe62b9ec18 100644 --- a/packages/core/strapi/lib/commands/actions/transfer/action.js +++ b/packages/core/strapi/lib/commands/actions/transfer/action.js @@ -79,6 +79,8 @@ module.exports = async (opts) => { type: 'token', token: opts.fromToken, }, + retryMessageMaxRetries: 5, + retryMessageTimeout: 15000, }); } @@ -108,6 +110,8 @@ module.exports = async (opts) => { restore: { entities: { exclude: DEFAULT_IGNORED_CONTENT_TYPES }, }, + retryMessageMaxRetries: 5, + retryMessageTimeout: 15000, }); }