Fix timeout method (#4324)

This commit is contained in:
martinmacko47 2021-03-22 00:33:59 +01:00 committed by GitHub
parent 910c009870
commit 1744c8c265
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 160 additions and 22 deletions

View File

@ -162,16 +162,18 @@ class Client_MySQL extends Client {
}
async cancelQuery(connectionToKill) {
const conn = await this.acquireConnection();
const conn = await this.acquireRawConnection();
try {
return await this.query(conn, {
method: 'raw',
return await this._query(conn, {
sql: 'KILL QUERY ?',
bindings: [connectionToKill.threadId],
options: {},
});
} finally {
await this.releaseConnection(conn);
await this.destroyRawConnection(conn);
if (conn.__knex__disposed) {
this.logger.warn(`Connection Error: ${conn.__knex__disposed}`);
}
}
}
}

View File

@ -237,24 +237,19 @@ class Client_PG extends Client {
}
async cancelQuery(connectionToKill) {
// Error out if we can't acquire connection in time.
// Purposely not putting timeout on `pg_cancel_backend` execution because erroring
// early there would release the `connectionToKill` back to the pool with
// a `KILL QUERY` command yet to finish.
const conn = await this.acquireConnection();
const conn = await this.acquireRawConnection();
try {
return await this._wrappedCancelQueryCall(conn, connectionToKill);
} finally {
// NOT returning this promise because we want to release the connection
// in a non-blocking fashion
this.releaseConnection(conn);
await this.destroyRawConnection(conn).catch((err) => {
this.logger.warn(`Connection Error: ${err}`);
});
}
}
_wrappedCancelQueryCall(conn, connectionToKill) {
return this.query(conn, {
method: 'raw',
sql: 'SELECT pg_cancel_backend(?);',
return this._query(conn, {
sql: 'SELECT pg_cancel_backend($1);',
bindings: [connectionToKill.processID],
options: {},
});

View File

@ -867,7 +867,118 @@ module.exports = function (knex) {
});
});
it('.timeout(ms, {cancel: true}) should throw error if cancellation cannot acquire connection', async function () {
it('.timeout(ms, {cancel: true}) should throw TimeoutError and cancel slow query in transaction', function () {
const driverName = knex.client.driverName;
if (driverName === 'sqlite3') {
return this.skip();
} //TODO -- No built-in support for sleeps
if (/redshift/.test(driverName)) {
return this.skip();
}
// There's unexpected behavior caused by knex releasing a connection back
// to the pool because of a timeout when a long query is still running.
// A subsequent query will acquire the connection (still in-use) and hang
// until the first query finishes. Setting a sleep time longer than the
// mocha timeout exposes this behavior.
const testQueries = {
pg: function () {
return knex.raw('SELECT pg_sleep(10)');
},
mysql: function () {
return knex.raw('SELECT SLEEP(10)');
},
mysql2: function () {
return knex.raw('SELECT SLEEP(10)');
},
mssql: function () {
return knex.raw("WAITFOR DELAY '00:00:10'");
},
oracledb: function () {
return knex.raw('begin dbms_lock.sleep(10); end;');
},
};
if (!Object.prototype.hasOwnProperty.call(testQueries, driverName)) {
throw new Error('Missing test query for driverName: ' + driverName);
}
const query = testQueries[driverName]();
function addTimeout() {
return query.timeout(200, { cancel: true });
}
// Only mysql/postgres query cancelling supported for now
if (
!_.startsWith(driverName, 'mysql') &&
!_.startsWith(driverName, 'pg')
) {
expect(addTimeout).to.throw(
'Query cancelling not supported for this dialect'
);
return; // TODO: Use `this.skip()` here?
}
const getProcessesQueries = {
pg: function () {
return knex.raw('SELECT * from pg_stat_activity');
},
mysql: function () {
return knex.raw('SHOW PROCESSLIST');
},
mysql2: function () {
return knex.raw('SHOW PROCESSLIST');
},
};
if (
!Object.prototype.hasOwnProperty.call(getProcessesQueries, driverName)
) {
throw new Error('Missing test query for driverName: ' + driverName);
}
const getProcessesQuery = getProcessesQueries[driverName]();
return knex.transaction((trx) => addTimeout().transacting(trx))
.then(function () {
expect(true).to.equal(false);
})
.catch(function (error) {
expect(_.pick(error, 'timeout', 'name', 'message')).to.deep.equal({
timeout: 200,
name: 'KnexTimeoutError',
message:
'Defined query timeout of 200ms exceeded when running query.',
});
// Ensure sleep command is removed.
// This query will hang if a connection gets released back to the pool
// too early.
// 50ms delay since killing query doesn't seem to have immediate effect to the process listing
return delay(50)
.then(function () {
return getProcessesQuery;
})
.then(function (results) {
let processes;
let sleepProcess;
if (_.startsWith(driverName, 'pg')) {
processes = results.rows;
sleepProcess = _.find(processes, { query: query.toString() });
} else {
processes = results[0];
sleepProcess = _.find(processes, {
Info: 'SELECT SLEEP(10)',
});
}
expect(sleepProcess).to.equal(undefined);
});
});
});
it('.timeout(ms, {cancel: true}) should cancel slow query even if connection pool is exhausted', async function () {
// Only mysql/postgres query cancelling supported for now
if (!isMysql(knex) && !isPostgreSQL(knex)) {
return this.skip();
@ -907,15 +1018,45 @@ module.exports = function (knex) {
const query = testQueries[driverName]();
// We must use the original knex instance without the exhausted pool to list running queries
const getProcessesForDriver = {
pg: async () => {
const results = await knex.raw('SELECT * from pg_stat_activity');
return _.map(_.filter(results.rows, {state: 'active'}), 'query');
},
mysql: async () => {
const results = await knex.raw('SHOW PROCESSLIST');
return _.map(results[0], 'Info');
},
mysql2: async () => {
const results = await knex.raw('SHOW PROCESSLIST');
return _.map(results[0], 'Info');
},
};
if (
!Object.prototype.hasOwnProperty.call(getProcessesForDriver, driverName)
) {
throw new Error('Missing test query for driverName: ' + driverName);
}
const getProcesses = getProcessesForDriver[driverName];
try {
await expect(
query.timeout(1, { cancel: true })
).to.eventually.be.rejected.and.to.deep.include({
timeout: 1,
const promise = query.timeout(50, { cancel: true }).then(_.identity)
await delay(10)
const processesBeforeTimeout = await getProcesses();
expect(processesBeforeTimeout).to.include(query.toString())
await expect(promise).to.eventually.be.rejected.and.to.deep.include({
timeout: 50,
name: 'KnexTimeoutError',
message:
'After query timeout of 1ms exceeded, cancelling of query failed.',
message: 'Defined query timeout of 50ms exceeded when running query.',
});
const processesAfterTimeout = await getProcesses();
expect(processesAfterTimeout).to.not.include(query.toString())
} finally {
await knexDb.destroy();
}