mirror of
https://github.com/knex/knex.git
synced 2026-01-05 11:38:53 +00:00
Implement "skipLocked()" and "noWait()" (#2961)
This commit is contained in:
parent
c60d2b5f1a
commit
bc1ddcad01
@ -48,6 +48,16 @@ assign(QueryCompiler_MySQL.prototype, {
|
||||
return 'lock in share mode';
|
||||
},
|
||||
|
||||
// Only supported on MySQL 8.0+
|
||||
skipLocked() {
|
||||
return 'skip locked';
|
||||
},
|
||||
|
||||
// Supported on MySQL 8.0+ and MariaDB 10.3.0+
|
||||
noWait() {
|
||||
return 'nowait';
|
||||
},
|
||||
|
||||
// Compiles a `columnInfo` query.
|
||||
columnInfo() {
|
||||
const column = this.single.columnInfo;
|
||||
|
||||
@ -103,6 +103,14 @@ assign(QueryCompiler_PG.prototype, {
|
||||
);
|
||||
},
|
||||
|
||||
skipLocked() {
|
||||
return 'skip locked';
|
||||
},
|
||||
|
||||
noWait() {
|
||||
return 'nowait';
|
||||
},
|
||||
|
||||
// Compiles a columnInfo query
|
||||
columnInfo() {
|
||||
const column = this.single.columnInfo;
|
||||
|
||||
@ -25,6 +25,8 @@ const {
|
||||
} = require('lodash');
|
||||
const saveAsyncStack = require('../util/save-async-stack');
|
||||
|
||||
const { lockMode, waitMode } = require('./constants');
|
||||
|
||||
// Typically called from `knex.builder`,
|
||||
// start a new query building chain.
|
||||
function Builder(client) {
|
||||
@ -950,10 +952,8 @@ assign(Builder.prototype, {
|
||||
// Sets the values for a `select` query, informing that only the first
|
||||
// row should be returned (limit 1).
|
||||
first() {
|
||||
const { _method } = this;
|
||||
|
||||
if (!includes(['pluck', 'first', 'select'], _method)) {
|
||||
throw new Error(`Cannot chain .first() on "${_method}" query!`);
|
||||
if (!this._isSelectQuery()) {
|
||||
throw new Error(`Cannot chain .first() on "${this._method}" query!`);
|
||||
}
|
||||
|
||||
const args = new Array(arguments.length);
|
||||
@ -1081,18 +1081,52 @@ assign(Builder.prototype, {
|
||||
|
||||
// Set a lock for update constraint.
|
||||
forUpdate() {
|
||||
this._single.lock = 'forUpdate';
|
||||
this._single.lock = lockMode.forUpdate;
|
||||
this._single.lockTables = helpers.normalizeArr.apply(null, arguments);
|
||||
return this;
|
||||
},
|
||||
|
||||
// Set a lock for share constraint.
|
||||
forShare() {
|
||||
this._single.lock = 'forShare';
|
||||
this._single.lock = lockMode.forShare;
|
||||
this._single.lockTables = helpers.normalizeArr.apply(null, arguments);
|
||||
return this;
|
||||
},
|
||||
|
||||
// Skips locked rows when using a lock constraint.
|
||||
skipLocked() {
|
||||
if (!this._isSelectQuery()) {
|
||||
throw new Error(`Cannot chain .skipLocked() on "${this._method}" query!`);
|
||||
}
|
||||
if (!this._hasLockMode()) {
|
||||
throw new Error(
|
||||
'.skipLocked() can only be used after a call to .forShare() or .forUpdate()!'
|
||||
);
|
||||
}
|
||||
if (this._single.waitMode === waitMode.noWait) {
|
||||
throw new Error('.skipLocked() cannot be used together with .noWait()!');
|
||||
}
|
||||
this._single.waitMode = waitMode.skipLocked;
|
||||
return this;
|
||||
},
|
||||
|
||||
// Causes error when acessing a locked row instead of waiting for it to be released.
|
||||
noWait() {
|
||||
if (!this._isSelectQuery()) {
|
||||
throw new Error(`Cannot chain .noWait() on "${this._method}" query!`);
|
||||
}
|
||||
if (!this._hasLockMode()) {
|
||||
throw new Error(
|
||||
'.noWait() can only be used after a call to .forShare() or .forUpdate()!'
|
||||
);
|
||||
}
|
||||
if (this._single.waitMode === waitMode.skipLocked) {
|
||||
throw new Error('.noWait() cannot be used together with .skipLocked()!');
|
||||
}
|
||||
this._single.waitMode = waitMode.noWait;
|
||||
return this;
|
||||
},
|
||||
|
||||
// Takes a JS object of methods to call and calls them
|
||||
fromJS(obj) {
|
||||
each(obj, (val, key) => {
|
||||
@ -1179,6 +1213,16 @@ assign(Builder.prototype, {
|
||||
_clearGrouping(grouping) {
|
||||
this._statements = reject(this._statements, { grouping });
|
||||
},
|
||||
|
||||
// Helper function that checks if the builder will emit a select query
|
||||
_isSelectQuery() {
|
||||
return includes(['pluck', 'first', 'select'], this._method);
|
||||
},
|
||||
|
||||
// Helper function that checks if the query has a lock mode set
|
||||
_hasLockMode() {
|
||||
return includes([lockMode.forShare, lockMode.forUpdate], this._single.lock);
|
||||
},
|
||||
});
|
||||
|
||||
Object.defineProperty(Builder.prototype, 'or', {
|
||||
|
||||
@ -48,6 +48,7 @@ const components = [
|
||||
'limit',
|
||||
'offset',
|
||||
'lock',
|
||||
'waitMode',
|
||||
];
|
||||
|
||||
assign(QueryCompiler.prototype, {
|
||||
@ -542,6 +543,27 @@ assign(QueryCompiler.prototype, {
|
||||
}
|
||||
},
|
||||
|
||||
// Compiles the wait mode on the locks.
|
||||
waitMode() {
|
||||
if (this.single.waitMode) {
|
||||
return this[this.single.waitMode]();
|
||||
}
|
||||
},
|
||||
|
||||
// Fail on unsupported databases
|
||||
skipLocked() {
|
||||
throw new Error(
|
||||
'.skipLocked() is currently only supported on MySQL 8.0+ and PostgreSQL 9.5+'
|
||||
);
|
||||
},
|
||||
|
||||
// Fail on unsupported databases
|
||||
noWait() {
|
||||
throw new Error(
|
||||
'.noWait() is currently only supported on MySQL 8.0+, MariaDB 10.3.0+ and PostgreSQL 9.5+'
|
||||
);
|
||||
},
|
||||
|
||||
// On Clause
|
||||
// ------
|
||||
|
||||
|
||||
13
src/query/constants.js
Normal file
13
src/query/constants.js
Normal file
@ -0,0 +1,13 @@
|
||||
/**
|
||||
* internal constants, do not use in application code
|
||||
*/
|
||||
module.exports = {
|
||||
lockMode: {
|
||||
forShare: 'forShare',
|
||||
forUpdate: 'forUpdate',
|
||||
},
|
||||
waitMode: {
|
||||
skipLocked: 'skipLocked',
|
||||
noWait: 'noWait',
|
||||
},
|
||||
};
|
||||
@ -1247,5 +1247,131 @@ module.exports = function(knex) {
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it('forUpdate().skipLocked() with order by should return the first non-locked row', async function() {
|
||||
// Note: this test doesn't work properly on MySQL - see https://bugs.mysql.com/bug.php?id=67745
|
||||
if (knex.client.driverName !== 'pg') {
|
||||
return;
|
||||
}
|
||||
|
||||
const rowName = 'row for skipLocked() test #1';
|
||||
await knex('test_default_table').insert([
|
||||
{ string: rowName, tinyint: 1 },
|
||||
{ string: rowName, tinyint: 2 },
|
||||
]);
|
||||
|
||||
const res = await knex.transaction(async (trx) => {
|
||||
// lock the first row in the test
|
||||
await trx('test_default_table')
|
||||
.where({ string: rowName })
|
||||
.orderBy('tinyint', 'asc')
|
||||
.first()
|
||||
.forUpdate();
|
||||
|
||||
// try to lock the next available row
|
||||
return await knex('test_default_table')
|
||||
.where({ string: rowName })
|
||||
.orderBy('tinyint', 'asc')
|
||||
.forUpdate()
|
||||
.skipLocked()
|
||||
.first();
|
||||
});
|
||||
|
||||
// assert that we got the second row because the first one was locked
|
||||
expect(res.tinyint).to.equal(2);
|
||||
});
|
||||
|
||||
it('forUpdate().skipLocked() should return an empty set when all rows are locked', async function() {
|
||||
if (
|
||||
knex.client.driverName !== 'pg' &&
|
||||
knex.client.driverName !== 'mysql'
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
const rowName = 'row for skipLocked() test #2';
|
||||
await knex('test_default_table').insert([
|
||||
{ string: rowName, tinyint: 1 },
|
||||
{ string: rowName, tinyint: 2 },
|
||||
]);
|
||||
|
||||
const res = await knex.transaction(async (trx) => {
|
||||
// lock all of the test rows
|
||||
await trx('test_default_table')
|
||||
.where({ string: rowName })
|
||||
.forUpdate();
|
||||
|
||||
// try to aquire the lock on one more row (which isn't available)
|
||||
return await knex('test_default_table')
|
||||
.where({ string: rowName })
|
||||
.forUpdate()
|
||||
.skipLocked()
|
||||
.limit(1);
|
||||
});
|
||||
|
||||
expect(res).to.be.empty;
|
||||
});
|
||||
|
||||
it('forUpdate().noWait() should throw immediately when a row is locked', async function() {
|
||||
if (
|
||||
knex.client.driverName !== 'pg' &&
|
||||
knex.client.driverName !== 'mysql'
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
const rowName = 'row for noWait() test';
|
||||
await knex('test_default_table').insert([
|
||||
{ string: rowName, tinyint: 1 },
|
||||
{ string: rowName, tinyint: 2 },
|
||||
]);
|
||||
|
||||
const promise = knex.transaction(async (trx) => {
|
||||
// select and lock only the first row from this test
|
||||
// note: MySQL may lock both rows depending on how the results are fetched
|
||||
await trx('test_default_table')
|
||||
.where({ string: rowName })
|
||||
.orderBy('tinyint', 'asc')
|
||||
.first()
|
||||
.forUpdate();
|
||||
|
||||
// try to lock it again (and fail)
|
||||
await trx('test_default_table')
|
||||
.where({ string: rowName })
|
||||
.orderBy('tinyint', 'asc')
|
||||
.forUpdate()
|
||||
.noWait()
|
||||
.first();
|
||||
});
|
||||
|
||||
// catch the expected errors
|
||||
promise.catch((err) => {
|
||||
switch (knex.client.driverName) {
|
||||
case 'pg':
|
||||
expect(err.message).to.contain('could not obtain lock on row');
|
||||
break;
|
||||
case 'mysql':
|
||||
case 'mysql2':
|
||||
// mysql
|
||||
expect(err.message).to.contain(
|
||||
'lock(s) could not be acquired immediately'
|
||||
);
|
||||
// mariadb
|
||||
// TODO: detect if test is being run on mysql or mariadb to check for the correct error message
|
||||
// expect(err.message).to.contain('Lock wait timeout exceeded');
|
||||
break;
|
||||
default:
|
||||
// unsupported database
|
||||
throw err;
|
||||
}
|
||||
});
|
||||
|
||||
// fail the test if the transaction succeeds
|
||||
promise.then(() => {
|
||||
expect(
|
||||
'The query should have been cancelled when trying to select a locked row with .noWait()'
|
||||
).to.be.false;
|
||||
});
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
@ -6998,6 +6998,86 @@ describe('QueryBuilder', function() {
|
||||
);
|
||||
});
|
||||
|
||||
it('lock for update with skip locked #1937', function() {
|
||||
testsql(
|
||||
qb()
|
||||
.select('*')
|
||||
.from('foo')
|
||||
.first()
|
||||
.forUpdate()
|
||||
.skipLocked(),
|
||||
{
|
||||
mysql: {
|
||||
sql: 'select * from `foo` limit ? for update skip locked',
|
||||
bindings: [1],
|
||||
},
|
||||
pg: {
|
||||
sql: 'select * from "foo" limit ? for update skip locked',
|
||||
bindings: [1],
|
||||
},
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
it('lock for update with nowait #1937', function() {
|
||||
testsql(
|
||||
qb()
|
||||
.select('*')
|
||||
.from('foo')
|
||||
.first()
|
||||
.forUpdate()
|
||||
.noWait(),
|
||||
{
|
||||
mysql: {
|
||||
sql: 'select * from `foo` limit ? for update nowait',
|
||||
bindings: [1],
|
||||
},
|
||||
pg: {
|
||||
sql: 'select * from "foo" limit ? for update nowait',
|
||||
bindings: [1],
|
||||
},
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
it('noWait and skipLocked require a lock mode to be set', function() {
|
||||
expect(function() {
|
||||
qb()
|
||||
.select('*')
|
||||
.noWait()
|
||||
.toString();
|
||||
}).to.throw(
|
||||
'.noWait() can only be used after a call to .forShare() or .forUpdate()!'
|
||||
);
|
||||
expect(function() {
|
||||
qb()
|
||||
.select('*')
|
||||
.skipLocked()
|
||||
.toString();
|
||||
}).to.throw(
|
||||
'.skipLocked() can only be used after a call to .forShare() or .forUpdate()!'
|
||||
);
|
||||
});
|
||||
|
||||
it('skipLocked conflicts with noWait and vice-versa', function() {
|
||||
expect(function() {
|
||||
qb()
|
||||
.select('*')
|
||||
.forUpdate()
|
||||
.noWait()
|
||||
.skipLocked()
|
||||
.toString();
|
||||
}).to.throw('.skipLocked() cannot be used together with .noWait()!');
|
||||
expect(function() {
|
||||
qb()
|
||||
.select('*')
|
||||
.forUpdate()
|
||||
.skipLocked()
|
||||
.noWait()
|
||||
.toString();
|
||||
}).to.throw('.noWait() cannot be used together with .skipLocked()!');
|
||||
});
|
||||
|
||||
it('allows insert values of sub-select, #121', function() {
|
||||
testsql(
|
||||
qb()
|
||||
|
||||
3
types/index.d.ts
vendored
3
types/index.d.ts
vendored
@ -1356,6 +1356,9 @@ declare namespace Knex {
|
||||
forShare(...tableNames: string[]): QueryBuilder<TRecord, TResult>;
|
||||
forShare(tableNames: string[]): QueryBuilder<TRecord, TResult>;
|
||||
|
||||
skipLocked(): QueryBuilder<TRecord, TResult>;
|
||||
noWait(): QueryBuilder<TRecord, TResult>;
|
||||
|
||||
toSQL(): Sql;
|
||||
|
||||
on(event: string, callback: Function): QueryBuilder<TRecord, TResult>;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user