Bluebird remove map mapSeries (#3474)

This commit is contained in:
maximelkin 2019-10-11 11:12:56 +03:00 committed by Igor Savin
parent f782348da7
commit f56eaf5cfc
6 changed files with 51 additions and 68 deletions

View File

@ -122,9 +122,7 @@ Object.assign(Client_SQLite3.prototype, {
return client
._query(connection, sql)
.then((obj) => obj.response)
.map(function(row) {
stream.write(row);
})
.then((rows) => rows.forEach((row) => stream.write(row)))
.catch(function(err) {
stream.emit('error', err);
})

View File

@ -436,14 +436,18 @@ class Migrator {
qb.max('batch').from(getTableName(tableName, schemaName));
})
.orderBy('id', 'desc')
.map((migration) => {
return allMigrations.find((entry) => {
return (
this.config.migrationSource.getMigrationName(entry) ===
migration.name
);
});
});
.then((migrations) =>
Promise.all(
migrations.map((migration) => {
return allMigrations.find((entry) => {
return (
this.config.migrationSource.getMigrationName(entry) ===
migration.name
);
});
})
)
);
}
// Returns the latest batch number.

View File

@ -1,5 +1,5 @@
const { isNumber, chunk, flatten } = require('lodash');
const Bluebird = require('bluebird');
const delay = require('./delay');
module.exports = function batchInsert(
client,
@ -8,63 +8,38 @@ module.exports = function batchInsert(
chunkSize = 1000
) {
let returning = void 0;
let autoTransaction = true;
let transaction = null;
const getTransaction = () =>
new Bluebird((resolve, reject) => {
if (transaction) {
autoTransaction = false;
return resolve(transaction);
}
autoTransaction = true;
client.transaction(resolve).catch(reject);
});
const wrapper = Object.assign(
new Bluebird((resolve, reject) => {
const chunks = chunk(batch, chunkSize);
const runInTransaction = (cb) => {
if (transaction) {
return cb(transaction);
}
return client.transaction(cb);
};
return Object.assign(
Promise.resolve().then(async () => {
if (!isNumber(chunkSize) || chunkSize < 1) {
return reject(new TypeError(`Invalid chunkSize: ${chunkSize}`));
throw new TypeError(`Invalid chunkSize: ${chunkSize}`);
}
if (!Array.isArray(batch)) {
return reject(
new TypeError(`Invalid batch: Expected array, got ${typeof batch}`)
throw new TypeError(
`Invalid batch: Expected array, got ${typeof batch}`
);
}
const chunks = chunk(batch, chunkSize);
//Next tick to ensure wrapper functions are called if needed
return Bluebird.delay(1)
.then(getTransaction)
.then((tr) => {
return Bluebird.mapSeries(chunks, (items) =>
tr(tableName).insert(items, returning)
)
.then((result) => {
result = flatten(result || []);
if (autoTransaction) {
//TODO: -- Oracle tr.commit() does not return a 'thenable' !? Ugly hack for now.
return (tr.commit(result) || Bluebird.resolve()).then(
() => result
);
}
return result;
})
.catch((error) => {
if (autoTransaction) {
return tr.rollback(error).then(() => Bluebird.reject(error));
}
return Bluebird.reject(error);
});
})
.then(resolve)
.catch(reject);
await delay(1);
return runInTransaction(async (tr) => {
const chunksResults = [];
for (const items of chunks) {
chunksResults.push(await tr(tableName).insert(items, returning));
}
return flatten(chunksResults);
});
}),
{
returning(columns) {
@ -79,6 +54,4 @@ module.exports = function batchInsert(
},
}
);
return wrapper;
};

3
lib/util/delay.js Normal file
View File

@ -0,0 +1,3 @@
const { promisify } = require('util');
module.exports = promisify(setTimeout);

View File

@ -5,6 +5,7 @@
const uuid = require('uuid');
const _ = require('lodash');
const bluebird = require('bluebird');
const sinon = require('sinon');
module.exports = function(knex) {
describe('Inserts', function() {
@ -1147,11 +1148,13 @@ module.exports = function(knex) {
});
});
it('#1880 - Duplicate keys in batchInsert should not throw unhandled exception', function() {
it('#1880 - Duplicate keys in batchInsert should not throw unhandled exception', async function() {
if (/redshift/i.test(knex.client.driverName)) {
return;
}
return new bluebird(function(resolve, reject) {
const fn = sinon.stub();
process.on('unhandledRejection', fn);
await new bluebird(function(resolve, reject) {
return knex.schema
.dropTableIfExists('batchInsertDuplicateKey')
.then(function() {
@ -1182,6 +1185,8 @@ module.exports = function(knex) {
resolve(error);
});
}).timeout(10000);
expect(fn).have.not.been.called;
process.removeListener('unhandledRejection', fn);
});
it('knex.batchInsert with specified transaction', function() {

View File

@ -28,7 +28,7 @@ module.exports = function(knex) {
.where('expiry', unsafeBigint)
.select('*');
})
.map(function(row) {
.then(function(row) {
// triggers request execution
})
.then(function() {
@ -36,7 +36,7 @@ module.exports = function(knex) {
.where('expiry', negativeUnsafeBigint)
.select('*');
})
.map(function(row) {
.then(function(row) {
// triggers request execution
})
.catch(function(err) {
@ -79,16 +79,16 @@ module.exports = function(knex) {
.where('expiry', bigintTimestamp)
.select('*');
})
.map(function(row) {
expect(row.id).to.equal('positive');
.then(function(rows) {
rows.forEach((row) => expect(row.id).to.equal('positive'));
})
.then(function() {
return knex(tableName)
.where('expiry', negativeBigintTimestamp)
.select('*');
})
.map(function(row) {
expect(row.id).to.equal('negative');
.then(function(rows) {
rows.forEach((row) => expect(row.id).to.equal('negative'));
})
.catch(function(err) {
expect(err).to.be.undefined;