fix(async-utils): use p-map instead of custom code

This commit is contained in:
nathan-pichon 2023-01-17 15:07:27 +01:00
parent 49a61a93a7
commit bf613fab35
No known key found for this signature in database
4 changed files with 24 additions and 33 deletions

View File

@ -51,24 +51,30 @@ describe('Async utils', () => {
}).rejects.toThrow('input');
});
test('Should resolve elements two at a time', async () => {
const numberPromiseArray = [1, 2, 3];
const numberPromiseArray = [1, 2, 3, 4, 5, 6];
const getPromiseDelayed = (speed = 0) =>
new Promise((resolve) => {
setTimeout(resolve, speed);
});
const opOrder = [];
let maxOperations = 0;
let operationsCounter = 0;
const mapFunc = mapAsync(numberPromiseArray, { concurrency: 2 });
const result = await mapFunc(async (value, index) => {
if (index === 0) {
const mapFunc = mapAsync(numberPromiseArray);
const result = await mapFunc(
async (value) => {
operationsCounter += 1;
if (operationsCounter > maxOperations) {
maxOperations = operationsCounter;
}
await getPromiseDelayed(20);
}
opOrder.push(index);
return value;
});
operationsCounter -= 1;
return value;
},
{ concurrency: 2 }
);
expect(result).toEqual([1, 2, 3]);
expect(opOrder).toEqual([1, 0, 2]);
expect(result).toEqual([1, 2, 3, 4, 5, 6]);
expect(maxOperations).toEqual(2);
});
});
describe('reduceAsync', () => {

View File

@ -1,3 +1,4 @@
import * as pMap from "p-map";
type PromiseArray<T> = (T | Promise<T>)[];
@ -5,6 +6,6 @@ type MapIteratee<T, R = T> = (value: T, index: number) => R | Promise<R>;
type ReduceIteratee<P, C = P, R = P> = (previousResult: P, currentValue: C, index: number) => R | Promise<R>;
export declare function mapAsync<T = unknown>(numberPromiseArray: number[], options: { concurrency: number }): <R = T>(iteratee: MapIteratee<T, R>) => Promise<R[]>;
export type MapAsync<T = any, R = any> = lodash.CurriedFunction3<T[], (element: T, index: number) => R | Promise<R>, { concurrency?: number }, Promise<R[]>>;
export declare function reduceAsync<T = unknown>(promiseArray: PromiseArray<T>): <R = unknown, I>(iteratee: ReduceIteratee<I | R, T, R>, initialValue?: I) => Promise<R>;

View File

@ -1,6 +1,7 @@
'use strict';
const { chunk } = require('lodash/fp');
const pMap = require('p-map');
const { curry } = require('lodash/fp');
function pipeAsync(...methods) {
return async (data) => {
@ -15,27 +16,9 @@ function pipeAsync(...methods) {
}
/**
* @type { import('./async').mapAsync }
* @type { import('./async').MapAsync }
*/
function mapAsync(promiseArray, { concurrency = Infinity } = {}) {
const appliedConcurrency = concurrency > promiseArray.length ? promiseArray.length : concurrency;
const promiseArrayChunks = chunk(appliedConcurrency)(promiseArray);
return async (iteratee) => {
return promiseArrayChunks.reduce(async (prevChunksPromise, chunk, chunkIndex) => {
// Need to await previous promise in order to respect the concurrency option
const prevChunks = await prevChunksPromise;
// As chunks can contain promises, we need to await the chunk
const awaitedChunk = await Promise.all(chunk);
const transformedPromiseChunk = await Promise.all(
// Calculating the index based on the original array, we do not want to have the index of the element inside the chunk
awaitedChunk.map((value, index) => iteratee(value, chunkIndex * appliedConcurrency + index))
);
return prevChunks.concat(transformedPromiseChunk);
}, Promise.resolve([]));
};
}
const mapAsync = curry(pMap);
/**
* @type { import('./async').reduceAsync }

View File

@ -39,6 +39,7 @@
"date-fns": "2.29.3",
"http-errors": "1.8.1",
"lodash": "4.17.21",
"p-map": "4.0.0",
"yup": "0.32.9"
},
"engines": {