fix: Data transfer stream backpressure from providers

This commit is contained in:
Christian 2023-10-30 11:43:20 +01:00 committed by GitHub
parent 9e14912c58
commit c76dc9cb2b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -383,44 +383,37 @@ export const createPushController = handlerControllerFactory<Partial<PushHandler
} }
for (const item of payload) { for (const item of payload) {
/** const { action, assetID } = item;
* This queues the given callback function to be executed in the next iteration
* of the event loop, immediately after the current operation completes.
*/
process.nextTick(async () => {
const { action, assetID } = item;
if (!assetsStream) { if (!assetsStream) {
throw new Error('Stream not defined'); throw new Error('Stream not defined');
} }
if (action === 'start') { if (action === 'start') {
this.assets[assetID] = { ...item.data, stream: new PassThrough() }; this.assets[assetID] = { ...item.data, stream: new PassThrough() };
writeAsync(assetsStream, this.assets[assetID]); writeAsync(assetsStream, this.assets[assetID]);
} }
if (action === 'stream') { if (action === 'stream') {
// The buffer has gone through JSON operations and is now of shape { type: "Buffer"; data: UInt8Array } // The buffer has gone through JSON operations and is now of shape { type: "Buffer"; data: UInt8Array }
// We need to transform it back into a Buffer instance // We need to transform it back into a Buffer instance
const rawBuffer = item.data as unknown as { type: 'Buffer'; data: Uint8Array }; const rawBuffer = item.data as unknown as { type: 'Buffer'; data: Uint8Array };
const chunk = Buffer.from(rawBuffer.data); const chunk = Buffer.from(rawBuffer.data);
await writeAsync(this.assets[assetID].stream, chunk);
}
await writeAsync(this.assets[assetID].stream, chunk); if (action === 'end') {
} await new Promise<void>((resolve, reject) => {
const { stream: assetStream } = this.assets[assetID];
if (action === 'end') { assetStream
await new Promise<void>((resolve, reject) => { .on('close', () => {
const { stream: assetStream } = this.assets[assetID]; delete this.assets[assetID];
assetStream resolve();
.on('close', () => { })
delete this.assets[assetID]; .on('error', reject)
resolve(); .end();
}) });
.on('error', reject) }
.end();
});
}
});
} }
}, },