diff --git a/packages/core/utils/lib/__tests__/async.test.js b/packages/core/utils/lib/__tests__/async.test.js index a89151f2ec..1cae0186ee 100644 --- a/packages/core/utils/lib/__tests__/async.test.js +++ b/packages/core/utils/lib/__tests__/async.test.js @@ -51,24 +51,30 @@ describe('Async utils', () => { }).rejects.toThrow('input'); }); test('Should resolve elements two at a time', async () => { - const numberPromiseArray = [1, 2, 3]; + const numberPromiseArray = [1, 2, 3, 4, 5, 6]; const getPromiseDelayed = (speed = 0) => new Promise((resolve) => { setTimeout(resolve, speed); }); - const opOrder = []; + let maxOperations = 0; + let operationsCounter = 0; - const mapFunc = mapAsync(numberPromiseArray, { concurrency: 2 }); - const result = await mapFunc(async (value, index) => { - if (index === 0) { + const mapFunc = mapAsync(numberPromiseArray); + const result = await mapFunc( + async (value) => { + operationsCounter += 1; + if (operationsCounter > maxOperations) { + maxOperations = operationsCounter; + } await getPromiseDelayed(20); - } - opOrder.push(index); - return value; - }); + operationsCounter -= 1; + return value; + }, + { concurrency: 2 } + ); - expect(result).toEqual([1, 2, 3]); - expect(opOrder).toEqual([1, 0, 2]); + expect(result).toEqual([1, 2, 3, 4, 5, 6]); + expect(maxOperations).toEqual(2); }); }); describe('reduceAsync', () => { diff --git a/packages/core/utils/lib/async.d.ts b/packages/core/utils/lib/async.d.ts index 158e85c11b..d1e52caf6d 100644 --- a/packages/core/utils/lib/async.d.ts +++ b/packages/core/utils/lib/async.d.ts @@ -1,3 +1,4 @@ +import * as pMap from "p-map"; type PromiseArray = (T | Promise)[]; @@ -5,6 +6,6 @@ type MapIteratee = (value: T, index: number) => R | Promise; type ReduceIteratee = (previousResult: P, currentValue: C, index: number) => R | Promise; -export declare function mapAsync(numberPromiseArray: number[], options: { concurrency: number }): (iteratee: MapIteratee) => Promise; +export type MapAsync = lodash.CurriedFunction3 R | Promise, { concurrency?: number }, Promise>; export declare function reduceAsync(promiseArray: PromiseArray): (iteratee: ReduceIteratee, initialValue?: I) => Promise; diff --git a/packages/core/utils/lib/async.js b/packages/core/utils/lib/async.js index c6e3041323..9c14998297 100644 --- a/packages/core/utils/lib/async.js +++ b/packages/core/utils/lib/async.js @@ -1,6 +1,7 @@ 'use strict'; -const { chunk } = require('lodash/fp'); +const pMap = require('p-map'); +const { curry } = require('lodash/fp'); function pipeAsync(...methods) { return async (data) => { @@ -15,27 +16,9 @@ function pipeAsync(...methods) { } /** - * @type { import('./async').mapAsync } + * @type { import('./async').MapAsync } */ -function mapAsync(promiseArray, { concurrency = Infinity } = {}) { - const appliedConcurrency = concurrency > promiseArray.length ? promiseArray.length : concurrency; - const promiseArrayChunks = chunk(appliedConcurrency)(promiseArray); - - return async (iteratee) => { - return promiseArrayChunks.reduce(async (prevChunksPromise, chunk, chunkIndex) => { - // Need to await previous promise in order to respect the concurrency option - const prevChunks = await prevChunksPromise; - // As chunks can contain promises, we need to await the chunk - const awaitedChunk = await Promise.all(chunk); - const transformedPromiseChunk = await Promise.all( - // Calculating the index based on the original array, we do not want to have the index of the element inside the chunk - awaitedChunk.map((value, index) => iteratee(value, chunkIndex * appliedConcurrency + index)) - ); - - return prevChunks.concat(transformedPromiseChunk); - }, Promise.resolve([])); - }; -} +const mapAsync = curry(pMap); /** * @type { import('./async').reduceAsync } diff --git a/packages/core/utils/package.json b/packages/core/utils/package.json index 57bd2352eb..997a2e58dd 100644 --- a/packages/core/utils/package.json +++ b/packages/core/utils/package.json @@ -39,6 +39,7 @@ "date-fns": "2.29.3", "http-errors": "1.8.1", "lodash": "4.17.21", + "p-map": "4.0.0", "yup": "0.32.9" }, "engines": {