knex/lib/runner.js

262 lines
8.2 KiB
JavaScript
Raw Normal View History

const Bluebird = require('bluebird');
2015-05-09 13:58:18 -04:00
let PassThrough;
2015-05-09 13:58:18 -04:00
// The "Runner" constructor takes a "builder" (query, schema, or raw)
// and runs through each of the query statements, calling any additional
// "output" method provided alongside the query and bindings.
function Runner(client, builder) {
this.client = client;
this.builder = builder;
this.queries = [];
2015-05-09 13:58:18 -04:00
// The "connection" object is set on the runner when
// "run" is called.
this.connection = void 0;
2015-05-09 13:58:18 -04:00
}
Object.assign(Runner.prototype, {
2015-05-09 13:58:18 -04:00
// "Run" the target, calling "toSQL" on the builder, returning
// an object or array of queries to run, each of which are run on
// a single connection.
run() {
const runner = this;
return (
2019-11-28 22:44:18 +03:00
this.ensureConnection(function(connection) {
runner.connection = connection;
2015-05-09 13:58:18 -04:00
runner.client.emit('start', runner.builder);
runner.builder.emit('start', runner.builder);
const sql = runner.builder.toSQL();
2015-05-09 13:58:18 -04:00
if (runner.builder._debug) {
runner.client.logger.debug(sql);
}
2015-05-09 13:58:18 -04:00
if (Array.isArray(sql)) {
return runner.queryArray(sql);
}
return runner.query(sql);
})
2015-05-09 13:58:18 -04:00
// If there are any "error" listeners, we fire an error event
// and then re-throw the error to be eventually handled by
// the promise chain. Useful if you're wrapping in a custom `Promise`.
.catch(function(err) {
if (runner.builder._events && runner.builder._events.error) {
runner.builder.emit('error', err);
}
throw err;
})
// Fire a single "end" event on the builder when
// all queries have successfully completed.
.then(function(res) {
runner.builder.emit('end');
return res;
})
);
2015-05-09 13:58:18 -04:00
},
// Stream the result set, by passing through to the dialect's streaming
// capabilities. If the options are
stream(options, handler) {
2015-05-09 13:58:18 -04:00
// If we specify stream(handler).then(...
if (arguments.length === 1) {
if (typeof options === 'function') {
handler = options;
options = {};
}
}
// Determines whether we emit an error or throw here.
const hasHandler = typeof handler === 'function';
2015-05-09 13:58:18 -04:00
// Lazy-load the "PassThrough" dependency.
PassThrough = PassThrough || require('stream').PassThrough;
const runner = this;
const stream = new PassThrough({ objectMode: true });
let hasConnection = false;
2019-11-28 22:44:18 +03:00
const promise = this.ensureConnection(function(connection) {
hasConnection = true;
2015-05-09 13:58:18 -04:00
runner.connection = connection;
try {
const sql = runner.builder.toSQL();
if (Array.isArray(sql) && hasHandler) {
throw new Error(
'The stream may only be used with a single query statement.'
);
}
return runner.client.stream(runner.connection, sql, stream, options);
} catch (e) {
stream.emit('error', e);
throw e;
2015-05-09 13:58:18 -04:00
}
});
2015-05-09 13:58:18 -04:00
// If a function is passed to handle the stream, send the stream
// there and return the promise, otherwise just return the stream
// and the promise will take care of itself.
2015-05-09 13:58:18 -04:00
if (hasHandler) {
handler(stream);
2019-11-28 22:44:18 +03:00
return Bluebird.resolve(promise);
2015-05-09 13:58:18 -04:00
}
// Emit errors on the stream if the error occurred before a connection
// could be acquired.
// If the connection was acquired, assume the error occurred in the client
// code and has already been emitted on the stream. Don't emit it twice.
promise.catch(function(err) {
if (!hasConnection) stream.emit('error', err);
});
2015-05-09 13:58:18 -04:00
return stream;
},
// Allow you to pipe the stream to a writable stream.
pipe(writable, options) {
2015-06-01 15:45:02 -04:00
return this.stream(options).pipe(writable);
2015-05-09 13:58:18 -04:00
},
// "Runs" a query, returning a promise. All queries specified by the builder are guaranteed
// to run in sequence, and on the same connection, especially helpful when schema building
// and dealing with foreign key constraints, etc.
query: async function(obj) {
const { __knexUid, __knexTxId } = this.connection;
this.builder.emit('query', Object.assign({ __knexUid, __knexTxId }, obj));
const runner = this;
let queryPromise = this.client.query(this.connection, obj);
2016-02-15 17:06:08 +01:00
if (obj.timeout) {
queryPromise = queryPromise.timeout(obj.timeout);
2016-02-15 17:06:08 +01:00
}
// Await the return value of client.processResponse; in the case of sqlite3's
// dropColumn()/renameColumn(), it will be a Promise for the transaction
// containing the complete rename procedure.
2016-02-15 17:06:08 +01:00
return queryPromise
.then((resp) => this.client.processResponse(resp, runner))
.then((processedResponse) => {
Add queryContext to schema and query builders (#2314) * feat(query-builder): add hookContext for wrapIdentifier * refactor: use isUndefined * test(transaction): test passing of hookContext * feat(runnner): pass context to postProcessResponse * test(runner): test postProcessResponse for raw responses * test(raw): test passing of hookContext * feat: add hookContext to Raw and SchemaBuilder * test(transaction): fix test for hookContext * chore: fix lint error * fix: check for hookContext before calling it * test(transaction): fix hookContext test * chore: remove whitespace * test(hookContext): test cloning of context object * refactor: hookContext -> queryContext * minor: use more descriptive variable name i.e. refactor: `context` => `queryContext` * fix: remove unnecessary checks for query builder * fix(Raw): pass query builder to formatter * fix(SchemaCompiler): pass schema builder to formatter * refactor: add addQueryContext helper * feat: add queryContext to TableBuilder and ColumnBuilder * fix(TableCompiler): pass table builder to formatter * fix(ColumnCompiler): pass column builder to formatter * fix(pushQuery): fix passing builder to formatter * test(Schema|Table|ColumnCompiler): test passing queryContext * fix(SchemaCompiler): pass queryContext to TableCompiler * fix(TableCompiler): pass queryContext to ColumnCompiler * test: add queryContext tests for all schema dialects * test(TableCompiler): test overwriting queryContext from SchemaCompiler * test(Raw): test passing queryContext to wrapIdentifier * tests: run all the tests
2018-02-01 23:41:01 +01:00
const queryContext = this.builder.queryContext();
const postProcessedResponse = this.client.postProcessResponse(
processedResponse,
queryContext
);
this.builder.emit(
'query-response',
postProcessedResponse,
Object.assign({ __knexUid: this.connection.__knexUid }, obj),
this.builder
);
this.client.emit(
'query-response',
postProcessedResponse,
Object.assign({ __knexUid: this.connection.__knexUid }, obj),
this.builder
);
return postProcessedResponse;
})
.catch(Bluebird.TimeoutError, (error) => {
const { timeout, sql, bindings } = obj;
2016-05-26 11:06:33 -07:00
let cancelQuery;
if (obj.cancelOnTimeout) {
cancelQuery = this.client.cancelQuery(this.connection);
} else {
// If we don't cancel the query, we need to mark the connection as disposed so that
// it gets destroyed by the pool and is never used again. If we don't do this and
// return the connection to the pool, it will be useless until the current operation
// that timed out, finally finishes.
this.connection.__knex__disposed = error;
cancelQuery = Bluebird.resolve();
2016-05-26 11:06:33 -07:00
}
return cancelQuery
.catch((cancelError) => {
// If the cancellation failed, we need to mark the connection as disposed so that
// it gets destroyed by the pool and is never used again. If we don't do this and
// return the connection to the pool, it will be useless until the current operation
// that timed out, finally finishes.
this.connection.__knex__disposed = error;
// cancellation failed
throw Object.assign(cancelError, {
message: `After query timeout of ${timeout}ms exceeded, cancelling of query failed.`,
sql,
bindings,
timeout,
});
})
2016-05-26 11:06:33 -07:00
.then(() => {
// cancellation succeeded, rethrow timeout error
throw Object.assign(error, {
2016-05-26 11:06:33 -07:00
message: `Defined query timeout of ${timeout}ms exceeded when running query.`,
sql,
bindings,
timeout,
2016-05-26 11:06:33 -07:00
});
});
2016-02-15 17:06:08 +01:00
})
.catch((error) => {
this.builder.emit(
'query-error',
error,
Object.assign({ __knexUid: this.connection.__knexUid }, obj)
);
throw error;
});
},
2015-05-09 13:58:18 -04:00
// In the case of the "schema builder" we call `queryArray`, which runs each
// of the queries in sequence.
async queryArray(queries) {
if (queries.length === 1) {
return this.query(queries[0]);
}
const results = [];
for (const query of queries) {
results.push(await this.query(query));
}
return results;
2015-05-09 13:58:18 -04:00
},
// Check whether there's a transaction flag, and that it has a connection.
2019-11-28 22:44:18 +03:00
async ensureConnection(cb) {
// Use override from a builder if passed
if (this.builder._connection) {
2019-11-28 22:44:18 +03:00
return cb(this.builder._connection);
}
if (this.connection) {
2019-11-28 22:44:18 +03:00
return cb(this.connection);
}
return this.client
.acquireConnection()
.catch(Bluebird.TimeoutError, (error) => {
if (this.builder) {
error.sql = this.builder.sql;
error.bindings = this.builder.bindings;
}
throw error;
})
2019-11-28 22:44:18 +03:00
.then(async (connection) => {
try {
return await cb(connection);
} finally {
await this.client.releaseConnection(this.connection);
}
});
},
});
2015-05-09 13:58:18 -04:00
module.exports = Runner;