feat(async-utils): make mapAsync handle concurrency

This commit is contained in:
nathan-pichon 2022-12-20 12:00:49 +01:00
parent 1f4b7c196e
commit eb83b3c3fa
No known key found for this signature in database
3 changed files with 40 additions and 8 deletions

View File

@ -50,6 +50,26 @@ describe('Async utils', () => {
await mapFunc(() => true);
}).rejects.toThrow('input');
});
test('Should resolve elements two at a time', async () => {
const numberPromiseArray = [1, 2, 3];
const getPromiseDelayed = (speed = 0) =>
new Promise((resolve) => {
setTimeout(resolve, speed);
});
const opOrder = [];
const mapFunc = mapAsync(numberPromiseArray, { concurrency: 2 });
const result = await mapFunc(async (value, index) => {
if (index === 0) {
await getPromiseDelayed(20);
}
opOrder.push(index);
return value;
});
expect(result).toEqual([1, 2, 3]);
expect(opOrder).toEqual([1, 0, 2]);
});
});
describe('reduceAsync', () => {
test('Should return a incremented number', async () => {

View File

@ -5,6 +5,6 @@ type MapIteratee<T, R = T> = (value: T, index: number) => R | Promise<R>;
type ReduceIteratee<P, C = P, R = P> = (previousResult: P, currentValue: C, index: number) => R | Promise<R>;
export declare function mapAsync<T = unknown>(promiseArray: PromiseArray<T>): <R = T>(iteratee: MapIteratee<T, R>) => Promise<R[]>;
export declare function mapAsync<T = unknown>(numberPromiseArray: number[], options: { concurrency: number }): <R = T>(iteratee: MapIteratee<T, R>) => Promise<R[]>;
export declare function reduceAsync<T = unknown>(promiseArray: PromiseArray<T>): <R = unknown, I>(iteratee: ReduceIteratee<I | R, T, R>, initialValue?: I) => Promise<R>;

View File

@ -1,5 +1,7 @@
'use strict';
const { chunk } = require('lodash/fp');
function pipeAsync(...methods) {
return async (data) => {
let res = data;
@ -15,13 +17,23 @@ function pipeAsync(...methods) {
/**
* @type { import('./async').mapAsync }
*/
function mapAsync(promiseArray) {
return (callback) => {
const transformedPromiseArray = promiseArray.map(async (promiseValue, index) => {
const value = await promiseValue;
return callback(value, index);
});
return Promise.all(transformedPromiseArray);
function mapAsync(promiseArray, { concurrency = Infinity } = {}) {
const appliedConcurrency = concurrency > promiseArray.length ? promiseArray.length : concurrency;
const promiseArrayChunks = chunk(appliedConcurrency)(promiseArray);
return async (callback) => {
return promiseArrayChunks.reduce(async (prevChunksPromise, chunk, chunkIndex) => {
// Need to await previous promise in order to respect the concurrency option
const prevChunks = await prevChunksPromise;
// As chunks can contain promises, we need to await the chunk
const awaitedChunk = await Promise.all(chunk);
const transformedPromiseChunk = await Promise.all(
// Calculating the index based on the original array, we do not want to have the index of the element inside the chunk
awaitedChunk.map((value, index) => callback(value, chunkIndex * appliedConcurrency + index))
);
return prevChunks.concat(transformedPromiseChunk);
}, Promise.resolve([]));
};
}