add extra check for messages and new configs for remote providers

This commit is contained in:
Bassel 2023-05-25 15:08:45 +03:00
parent 182bfd5819
commit 5ce621adc7
8 changed files with 47 additions and 10 deletions

View File

@ -20,6 +20,8 @@ export interface IRemoteStrapiDestinationProviderOptions
extends Pick<ILocalStrapiDestinationProviderOptions, 'restore' | 'strategy'> {
url: URL;
auth?: ITransferTokenAuth;
retryMessageTimeout?: number;
retryMessageMaxRetries?: number;
}
const jsonLength = (obj: object) => Buffer.byteLength(JSON.stringify(obj));
@ -213,7 +215,8 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider {
}
this.ws = ws;
this.dispatcher = createDispatcher(this.ws);
const { retryMessageMaxRetries, retryMessageTimeout } = this.options;
this.dispatcher = createDispatcher(this.ws, { retryMessageMaxRetries, retryMessageTimeout });
this.transferID = await this.initTransfer();

View File

@ -24,6 +24,8 @@ interface ITransferTokenAuth {
export interface IRemoteStrapiSourceProviderOptions extends ILocalStrapiSourceProviderOptions {
url: URL;
auth?: ITransferTokenAuth;
retryMessageTimeout?: number;
retryMessageMaxRetries?: number;
}
class RemoteStrapiSourceProvider implements ISourceProvider {
@ -210,7 +212,8 @@ class RemoteStrapiSourceProvider implements ISourceProvider {
}
this.ws = ws;
this.dispatcher = createDispatcher(this.ws);
const { retryMessageMaxRetries, retryMessageTimeout } = this.options;
this.dispatcher = createDispatcher(this.ws, { retryMessageMaxRetries, retryMessageTimeout });
const transferID = await this.initTransfer();
this.dispatcher.setTransferProperties({ id: transferID, kind: 'pull' });

View File

@ -18,7 +18,13 @@ interface IDispatchOptions {
type Dispatch<T> = Omit<T, 'transferID' | 'uuid'>;
export const createDispatcher = (ws: WebSocket) => {
export const createDispatcher = (
ws: WebSocket,
{
retryMessageMaxRetries = 5,
retryMessageTimeout = 10000,
}: { retryMessageMaxRetries?: number; retryMessageTimeout?: number }
) => {
const state: IDispatcherState = {};
type DispatchMessage = Dispatch<client.Message>;
@ -48,7 +54,8 @@ export const createDispatcher = (ws: WebSocket) => {
});
const sendPeriodically = () => {
if (numberOfTimesMessageWasSent < 5) {
if (numberOfTimesMessageWasSent <= retryMessageMaxRetries) {
console.log('Retrying message', payload, numberOfTimesMessageWasSent);
numberOfTimesMessageWasSent += 1;
ws.send(stringifiedPayload, (error) => {
if (error) {
@ -59,7 +66,7 @@ export const createDispatcher = (ws: WebSocket) => {
reject(new ProviderError('error', 'Request timed out'));
}
};
const interval = setInterval(sendPeriodically, 30000);
const interval = setInterval(sendPeriodically, retryMessageTimeout);
const onResponse = (raw: RawData) => {
const response: server.Message<U> = JSON.parse(raw.toString());

View File

@ -27,6 +27,12 @@ export interface Handler {
get response(): TransferState['response'];
set response(response: TransferState['response']);
// Add message UUIDs
addUUID(uuid: string): void;
// Check if a message UUID exists
hasUUID(uuid: string): boolean;
/**
* Returns whether a transfer is currently in progress or not
*/

View File

@ -71,13 +71,15 @@ export const createPullController = handlerControllerFactory<Partial<PullHandler
}
const previousResponse = proto.response;
if (previousResponse?.uuid === msg.uuid) {
await this.respond(previousResponse?.uuid, previousResponse.e, previousResponse.data);
if (proto.hasUUID(msg.uuid)) {
if (previousResponse?.uuid === msg.uuid) {
await this.respond(previousResponse?.uuid, previousResponse.e, previousResponse.data);
}
return;
}
const { uuid, type } = msg;
proto.addUUID(uuid);
// Regular command message (init, end, status)
if (type === 'command') {
const { command } = msg;

View File

@ -206,12 +206,15 @@ export const createPushController = handlerControllerFactory<Partial<PushHandler
}
const previousResponse = proto.response;
if (previousResponse?.uuid === msg.uuid) {
await this.respond(previousResponse?.uuid, previousResponse.e, previousResponse.data);
if (proto.hasUUID(msg.uuid)) {
if (previousResponse?.uuid === msg.uuid) {
await this.respond(previousResponse?.uuid, previousResponse.e, previousResponse.data);
}
return;
}
const { uuid, type } = msg;
proto.addUUID(uuid);
// Regular command message (init, end, status)
if (type === 'command') {
const { command } = msg;

View File

@ -79,6 +79,7 @@ export const handlerControllerFactory =
return async (ctx: Context) => {
handleWSUpgrade(wss, ctx, (ws) => {
const state: TransferState = { id: undefined };
const messageuuids = new Set<string>();
const prototype: Handler = {
// Transfer ID
@ -107,6 +108,14 @@ export const handlerControllerFactory =
state.response = response;
},
addUUID(uuid) {
messageuuids.add(uuid);
},
hasUUID(uuid) {
return messageuuids.has(uuid);
},
isTransferStarted() {
return this.transferID !== undefined && this.startedAt !== undefined;
},

View File

@ -79,6 +79,8 @@ module.exports = async (opts) => {
type: 'token',
token: opts.fromToken,
},
retryMessageMaxRetries: 5,
retryMessageTimeout: 15000,
});
}
@ -108,6 +110,8 @@ module.exports = async (opts) => {
restore: {
entities: { exclude: DEFAULT_IGNORED_CONTENT_TYPES },
},
retryMessageMaxRetries: 5,
retryMessageTimeout: 15000,
});
}