From c76dc9cb2bc31c698cbe466fd809d947233ff01b Mon Sep 17 00:00:00 2001 From: Christian Date: Mon, 30 Oct 2023 11:43:20 +0100 Subject: [PATCH] fix: Data transfer stream backpressure from providers --- .../src/strapi/remote/handlers/push.ts | 61 ++++++++----------- 1 file changed, 27 insertions(+), 34 deletions(-) diff --git a/packages/core/data-transfer/src/strapi/remote/handlers/push.ts b/packages/core/data-transfer/src/strapi/remote/handlers/push.ts index 9c26d2d3ff..8245a895a2 100644 --- a/packages/core/data-transfer/src/strapi/remote/handlers/push.ts +++ b/packages/core/data-transfer/src/strapi/remote/handlers/push.ts @@ -383,44 +383,37 @@ export const createPushController = handlerControllerFactory { - const { action, assetID } = item; + const { action, assetID } = item; - if (!assetsStream) { - throw new Error('Stream not defined'); - } + if (!assetsStream) { + throw new Error('Stream not defined'); + } - if (action === 'start') { - this.assets[assetID] = { ...item.data, stream: new PassThrough() }; - writeAsync(assetsStream, this.assets[assetID]); - } + if (action === 'start') { + this.assets[assetID] = { ...item.data, stream: new PassThrough() }; + writeAsync(assetsStream, this.assets[assetID]); + } - if (action === 'stream') { - // The buffer has gone through JSON operations and is now of shape { type: "Buffer"; data: UInt8Array } - // We need to transform it back into a Buffer instance - const rawBuffer = item.data as unknown as { type: 'Buffer'; data: Uint8Array }; - const chunk = Buffer.from(rawBuffer.data); + if (action === 'stream') { + // The buffer has gone through JSON operations and is now of shape { type: "Buffer"; data: UInt8Array } + // We need to transform it back into a Buffer instance + const rawBuffer = item.data as unknown as { type: 'Buffer'; data: Uint8Array }; + const chunk = Buffer.from(rawBuffer.data); + await writeAsync(this.assets[assetID].stream, chunk); + } - await writeAsync(this.assets[assetID].stream, chunk); - } - - if (action === 'end') { - await new Promise((resolve, reject) => { - const { stream: assetStream } = this.assets[assetID]; - assetStream - .on('close', () => { - delete this.assets[assetID]; - resolve(); - }) - .on('error', reject) - .end(); - }); - } - }); + if (action === 'end') { + await new Promise((resolve, reject) => { + const { stream: assetStream } = this.assets[assetID]; + assetStream + .on('close', () => { + delete this.assets[assetID]; + resolve(); + }) + .on('error', reject) + .end(); + }); + } } },