From eb83b3c3fac47b576e3120adcdadeea900e891bd Mon Sep 17 00:00:00 2001 From: nathan-pichon Date: Tue, 20 Dec 2022 12:00:49 +0100 Subject: [PATCH] feat(async-utils): make mapAsync handle concurrency --- .../core/utils/lib/__tests__/async.test.js | 20 ++++++++++++++ packages/core/utils/lib/async.d.ts | 2 +- packages/core/utils/lib/async.js | 26 ++++++++++++++----- 3 files changed, 40 insertions(+), 8 deletions(-) diff --git a/packages/core/utils/lib/__tests__/async.test.js b/packages/core/utils/lib/__tests__/async.test.js index 54c7fce20c..a89151f2ec 100644 --- a/packages/core/utils/lib/__tests__/async.test.js +++ b/packages/core/utils/lib/__tests__/async.test.js @@ -50,6 +50,26 @@ describe('Async utils', () => { await mapFunc(() => true); }).rejects.toThrow('input'); }); + test('Should resolve elements two at a time', async () => { + const numberPromiseArray = [1, 2, 3]; + const getPromiseDelayed = (speed = 0) => + new Promise((resolve) => { + setTimeout(resolve, speed); + }); + const opOrder = []; + + const mapFunc = mapAsync(numberPromiseArray, { concurrency: 2 }); + const result = await mapFunc(async (value, index) => { + if (index === 0) { + await getPromiseDelayed(20); + } + opOrder.push(index); + return value; + }); + + expect(result).toEqual([1, 2, 3]); + expect(opOrder).toEqual([1, 0, 2]); + }); }); describe('reduceAsync', () => { test('Should return a incremented number', async () => { diff --git a/packages/core/utils/lib/async.d.ts b/packages/core/utils/lib/async.d.ts index c991c131d7..158e85c11b 100644 --- a/packages/core/utils/lib/async.d.ts +++ b/packages/core/utils/lib/async.d.ts @@ -5,6 +5,6 @@ type MapIteratee = (value: T, index: number) => R | Promise; type ReduceIteratee = (previousResult: P, currentValue: C, index: number) => R | Promise; -export declare function mapAsync(promiseArray: PromiseArray): (iteratee: MapIteratee) => Promise; +export declare function mapAsync(numberPromiseArray: number[], options: { concurrency: number }): (iteratee: MapIteratee) => Promise; export declare function reduceAsync(promiseArray: PromiseArray): (iteratee: ReduceIteratee, initialValue?: I) => Promise; diff --git a/packages/core/utils/lib/async.js b/packages/core/utils/lib/async.js index 24178ec3ce..058544fc12 100644 --- a/packages/core/utils/lib/async.js +++ b/packages/core/utils/lib/async.js @@ -1,5 +1,7 @@ 'use strict'; +const { chunk } = require('lodash/fp'); + function pipeAsync(...methods) { return async (data) => { let res = data; @@ -15,13 +17,23 @@ function pipeAsync(...methods) { /** * @type { import('./async').mapAsync } */ -function mapAsync(promiseArray) { - return (callback) => { - const transformedPromiseArray = promiseArray.map(async (promiseValue, index) => { - const value = await promiseValue; - return callback(value, index); - }); - return Promise.all(transformedPromiseArray); +function mapAsync(promiseArray, { concurrency = Infinity } = {}) { + const appliedConcurrency = concurrency > promiseArray.length ? promiseArray.length : concurrency; + const promiseArrayChunks = chunk(appliedConcurrency)(promiseArray); + + return async (callback) => { + return promiseArrayChunks.reduce(async (prevChunksPromise, chunk, chunkIndex) => { + // Need to await previous promise in order to respect the concurrency option + const prevChunks = await prevChunksPromise; + // As chunks can contain promises, we need to await the chunk + const awaitedChunk = await Promise.all(chunk); + const transformedPromiseChunk = await Promise.all( + // Calculating the index based on the original array, we do not want to have the index of the element inside the chunk + awaitedChunk.map((value, index) => callback(value, chunkIndex * appliedConcurrency + index)) + ); + + return prevChunks.concat(transformedPromiseChunk); + }, Promise.resolve([])); }; }