refactoring and adding check to pull as well

This commit is contained in:
Bassel 2023-05-23 03:49:04 +03:00
parent b581df8424
commit 6058822ad0
5 changed files with 37 additions and 30 deletions

View File

@ -35,14 +35,12 @@ export const createDispatcher = (ws: WebSocket) => {
const uuid = randomUUID(); const uuid = randomUUID();
const payload = { ...message, uuid }; const payload = { ...message, uuid };
let numberOfTimesMessageWasSent = 0; let numberOfTimesMessageWasSent = 0;
let responseWasReceived = false;
if (options.attachTransfer) { if (options.attachTransfer) {
Object.assign(payload, { transferID: state.transfer?.id }); Object.assign(payload, { transferID: state.transfer?.id });
} }
const stringifiedPayload = JSON.stringify(payload); const stringifiedPayload = JSON.stringify(payload);
ws.send(stringifiedPayload, (error) => { ws.send(stringifiedPayload, (error) => {
if (error) { if (error) {
reject(error); reject(error);
@ -50,28 +48,26 @@ export const createDispatcher = (ws: WebSocket) => {
}); });
const sendPeriodically = () => { const sendPeriodically = () => {
setTimeout(() => {
if (!responseWasReceived) {
if (numberOfTimesMessageWasSent < 5) { if (numberOfTimesMessageWasSent < 5) {
numberOfTimesMessageWasSent += 1; numberOfTimesMessageWasSent += 1;
ws.send(stringifiedPayload); ws.send(stringifiedPayload, (error) => {
sendPeriodically(); if (error) {
reject(error);
}
});
} else { } else {
reject(new ProviderError('error', 'Request timed out')); reject(new ProviderError('error', 'Request timed out'));
} }
}
}, 50000);
}; };
const interval = setInterval(sendPeriodically, 30000);
const onResponse = (raw: RawData) => { const onResponse = (raw: RawData) => {
responseWasReceived = true;
numberOfTimesMessageWasSent = 0;
const response: server.Message<U> = JSON.parse(raw.toString()); const response: server.Message<U> = JSON.parse(raw.toString());
if (response.uuid === uuid) { if (response.uuid === uuid) {
clearInterval(interval);
if (response.error) { if (response.error) {
return reject(new ProviderError('error', response.error.message)); return reject(new ProviderError('error', response.error.message));
} }
resolve(response.data ?? null); resolve(response.data ?? null);
} else { } else {
ws.once('message', onResponse); ws.once('message', onResponse);
@ -79,7 +75,6 @@ export const createDispatcher = (ws: WebSocket) => {
}; };
ws.once('message', onResponse); ws.once('message', onResponse);
sendPeriodically();
}); });
}; };

View File

@ -8,6 +8,11 @@ type BufferLike = Parameters<WebSocket['send']>[0];
export interface TransferState { export interface TransferState {
id?: string; id?: string;
startedAt?: number; startedAt?: number;
response?: {
uuid?: string;
e?: Error | null;
data?: unknown;
};
} }
export interface Handler { export interface Handler {
@ -19,11 +24,8 @@ export interface Handler {
get startedAt(): TransferState['startedAt']; get startedAt(): TransferState['startedAt'];
set startedAt(id: TransferState['startedAt']); set startedAt(id: TransferState['startedAt']);
// Add message UUIDs get response(): TransferState['response'];
addUUID(uuid: string): void; set response(response: TransferState['response']);
// Check if a message UUID exists
hasUUID(uuid: string): boolean;
/** /**
* Returns whether a transfer is currently in progress or not * Returns whether a transfer is currently in progress or not

View File

@ -70,6 +70,12 @@ export const createPullController = handlerControllerFactory<Partial<PullHandler
await this.respond(undefined, new Error('Missing uuid in message')); await this.respond(undefined, new Error('Missing uuid in message'));
} }
const previousResponse = proto.response;
if (previousResponse?.uuid === msg.uuid) {
await this.respond(previousResponse?.uuid, previousResponse.e, previousResponse.data);
return;
}
const { uuid, type } = msg; const { uuid, type } = msg;
// Regular command message (init, end, status) // Regular command message (init, end, status)

View File

@ -205,13 +205,13 @@ export const createPushController = handlerControllerFactory<Partial<PushHandler
await this.respond(undefined, new Error('Missing uuid in message')); await this.respond(undefined, new Error('Missing uuid in message'));
} }
if (proto.hasUUID(msg.uuid)) { const previousResponse = proto.response;
await this.respond(msg.uuid); if (previousResponse?.uuid === msg.uuid) {
await this.respond(previousResponse?.uuid, previousResponse.e, previousResponse.data);
return; return;
} }
const { uuid, type } = msg; const { uuid, type } = msg;
proto.addUUID(uuid);
// Regular command message (init, end, status) // Regular command message (init, end, status)
if (type === 'command') { if (type === 'command') {
const { command } = msg; const { command } = msg;

View File

@ -79,7 +79,6 @@ export const handlerControllerFactory =
return async (ctx: Context) => { return async (ctx: Context) => {
handleWSUpgrade(wss, ctx, (ws) => { handleWSUpgrade(wss, ctx, (ws) => {
const state: TransferState = { id: undefined }; const state: TransferState = { id: undefined };
const messageuuids = new Set<string>();
const prototype: Handler = { const prototype: Handler = {
// Transfer ID // Transfer ID
@ -100,12 +99,12 @@ export const handlerControllerFactory =
state.startedAt = timestamp; state.startedAt = timestamp;
}, },
addUUID(uuid) { get response() {
messageuuids.add(uuid); return state.response;
}, },
hasUUID(uuid) { set response(response) {
return messageuuids.has(uuid); state.response = response;
}, },
isTransferStarted() { isTransferStarted() {
@ -135,7 +134,11 @@ export const handlerControllerFactory =
reject(new Error('Missing uuid for this message')); reject(new Error('Missing uuid for this message'));
return; return;
} }
this.response = {
uuid,
data,
e,
};
const payload = JSON.stringify({ const payload = JSON.stringify({
uuid, uuid,
data: data ?? null, data: data ?? null,
@ -208,6 +211,7 @@ export const handlerControllerFactory =
cleanup() { cleanup() {
this.transferID = undefined; this.transferID = undefined;
this.startedAt = undefined; this.startedAt = undefined;
this.response = undefined;
}, },
teardown() { teardown() {