knex/lib/runner.js

211 lines
6.7 KiB
JavaScript
Raw Normal View History

var _ = require('lodash');
var Promise = require('./promise');
// The "Runner" constructor takes a "builder" (query, schema, or raw)
// and runs through each of the query statements, calling any additional
// "output" method provided alongside the query and bindings.
function Runner(builder) {
2014-04-09 10:11:41 -04:00
this.builder = builder;
this.queries = [];
// The "connection" object is set on the runner when
// "run" is called.
this.connection = void 0;
}
2014-04-16 01:23:50 -04:00
Runner.prototype._beginTransaction = 'begin;';
Runner.prototype._commitTransaction = 'commit;';
Runner.prototype._rollbackTransaction = 'rollback;';
2014-04-09 10:11:41 -04:00
// "Run" the target, calling "toSQL" on the builder, returning
// an object or array of queries to run, each of which are run on
// a single connection.
Runner.prototype.run = Promise.method(function() {
if (this.builder._transacting) {
return this.transactionQuery();
}
return Promise.bind(this)
.then(this.ensureConnection)
2014-04-16 01:23:50 -04:00
.then(function(connection) {
this.connection = connection;
2014-04-09 10:11:41 -04:00
var sql = this.builder.toSQL();
if (_.isArray(sql)) {
return this.queryArray(sql);
}
return this.query(sql);
})
2014-04-09 10:11:41 -04:00
.finally(this.cleanupConnection);
});
2014-04-09 10:11:41 -04:00
// Stream the result set, by passing through to the dialect's streaming
// capabilities. If the options are
var PassThrough;
2014-05-05 19:48:12 -04:00
Runner.prototype.stream = Promise.method(function(options, handler) {
2014-04-09 10:11:41 -04:00
// If we specify stream(handler).then(...
if (arguments.length === 1) {
if (_.isFunction(options)) {
handler = options;
options = {};
}
}
// Lazy-load the "PassThrough" dependency.
PassThrough = PassThrough || require('readable-stream').PassThrough;
var stream = new PassThrough({objectMode: true});
var promise = Promise.bind(this)
2014-05-05 19:48:12 -04:00
.then(this.ensureConnection)
.then(function(connection) {
this.connection = connection;
var sql = this.builder.toSQL();
var err = new Error('The stream may only be used with a single query statement.');
if (_.isArray(sql)) {
2014-05-05 22:00:29 -04:00
stream.emit('error', err);
throw err;
2014-05-05 19:48:12 -04:00
}
return sql;
}).then(function(sql) {
return this._stream(sql, stream, options);
2014-05-05 19:48:12 -04:00
}).finally(this.cleanupConnection);
// If a function is passed to handle the stream, send the stream
// there and return the promise, otherwise just return the stream
// and the promise will take care of itsself.
if (_.isFunction(handler)) {
handler(stream);
return promise;
}
return stream;
2014-05-05 19:48:12 -04:00
});
2014-04-09 10:11:41 -04:00
// Allow you to pipe the stream to a writable stream.
Runner.prototype.pipe = function(writable) {
return this.stream().pipe(writable);
};
// "Runs" a query, returning a promise. All queries specified by the builder are guaranteed
// to run in sequence, and on the same connection, especially helpful when schema building
// and dealing with foreign key constraints, etc.
Runner.prototype.query = Promise.method(function(obj) {
obj.__cid = this.connection.__cid;
this.builder.emit('query', obj);
2014-04-16 01:23:50 -04:00
return this._query(obj).bind(this).then(this.processResponse);
});
// In the case of the "schema builder" we call `queryArray`, which runs each
// of the queries in sequence.
Runner.prototype.queryArray = Promise.method(function(queries) {
2014-04-16 01:23:50 -04:00
return queries.length === 1 ? this.query(queries[0]) : Promise.bind(this)
.thenReturn(queries)
.reduce(function(memo, query) {
return this.query(query).then(function(resp) {
memo.push(resp);
return memo;
});
}, []);
});
// Check whether there's a transaction flag, and that it has a connection.
Runner.prototype.ensureConnection = Promise.method(function() {
if (this.builder._connection) {
return this.builder._connection;
}
return this.client.acquireConnection();
});
2014-04-09 10:11:41 -04:00
// "Debug" the query being run.
Runner.prototype.debug = function(obj) {
2014-04-09 10:11:41 -04:00
console.dir(_.extend({__cid: this.connection.__cid}, obj));
};
2014-04-09 10:11:41 -04:00
// Check whether we're "debugging", based on either calling `debug` on the query.
Runner.prototype.isDebugging = function() {
return (this.client.isDebugging === true || this.builder._debug === true);
};
2014-04-09 10:11:41 -04:00
// Transaction Methods:
// -------
// Run the transaction on the correct "runner" instance.
Runner.prototype.transactionQuery = Promise.method(function() {
var runner = this.builder._transacting._runner;
if (!(runner instanceof Runner)) {
throw new Error('Invalid transaction object provided.');
}
2014-04-16 01:23:50 -04:00
var sql = this.builder.toSQL();
if (_.isArray(sql)) {
return runner.queryArray(sql);
}
return runner.query(sql);
});
// Begins a transaction statement on the instance,
// resolving with the connection of the current transaction.
Runner.prototype.startTransaction = Promise.method(function() {
return Promise.bind(this)
.then(this.ensureConnection)
.then(function(connection) {
this.connection = connection;
this.transaction = true;
return this.query({sql: this._beginTransaction});
}).thenReturn(this);
});
// Finishes the transaction statement and handles disposing of the connection,
// resolving / rejecting the transaction's promise, and ensuring the transaction object's
// `_runner` property is `null`'ed out so it cannot continue to be used.
Runner.prototype.finishTransaction = Promise.method(function(action, containerObject, msg) {
var query, dfd = containerObject.__dfd__;
// Run the query to commit / rollback the transaction.
switch (action) {
case 0:
2014-04-16 01:23:50 -04:00
query = this.commitTransaction();
break;
case 1:
2014-04-16 01:23:50 -04:00
query = this.rollbackTransaction();
break;
}
2014-04-09 10:11:41 -04:00
return query.then(function(resp) {
switch (action) {
case 0:
dfd.fulfill(msg || resp);
break;
case 1:
dfd.reject(msg || resp);
break;
}
2014-04-09 10:11:41 -04:00
// If there was a problem committing the transaction,
// reject the transaction block (to reject the entire transaction block),
// then re-throw the error for any promises chained off the commit.
}).catch(function(e) {
dfd.reject(e);
throw e;
}).bind(this).finally(function() {
// Kill the "_runner" object on the containerObject,
// so it's not possible to continue using the transaction object.
containerObject._runner = void 0;
2014-04-09 10:11:41 -04:00
return this.cleanupConnection();
});
});
2014-04-16 01:23:50 -04:00
Runner.prototype.commitTransaction = function() {
return this.query({sql: this._commitTransaction});
};
Runner.prototype.rollbackTransaction = function() {
return this.query({sql: this._rollbackTransaction});
};
2014-04-09 10:11:41 -04:00
// Cleanup the connection as necessary, if the `_connection` was
// explicitly set on the query we don't need to do anything here,
// otherwise we
Runner.prototype.cleanupConnection = Promise.method(function() {
if (!this.builder._connection) {
return this.client.releaseConnection(this.connection);
}
});
module.exports = Runner;