var _ = require('lodash');
var Promise = require('./promise');

// The "Runner" constructor takes a "builder" (query, schema, or raw)
// and runs through each of the query statements, calling any additional
// "output" method provided alongside the query and bindings.
function Runner(builder) {
  this.builder = builder;
  this.queries = [];

  // The "connection" object is set on the runner when
  // "run" is called.
  this.connection = void 0;
}

// SQL statements issued to begin, commit and roll back a transaction.
// They live on the prototype, so they can be overridden per runner type.
Runner.prototype._beginTransaction = 'begin;';
Runner.prototype._commitTransaction = 'commit;';
Runner.prototype._rollbackTransaction = 'rollback;';

// "Run" the target, calling "toSQL" on the builder, returning
// an object or array of queries to run, each of which are run on
// a single connection. When the builder is flagged as `_transacting`,
// the work is delegated to `transactionQuery` instead.
Runner.prototype.run = Promise.method(function() {
  if (this.builder._transacting) {
    return this.transactionQuery();
  }
  return Promise.bind(this)
    .then(this.ensureConnection)
    .then(function(connection) {
      this.connection = connection;
      var sql = this.builder.toSQL();
      if (_.isArray(sql)) {
        return this.queryArray(sql);
      }
      return this.query(sql);
    })
    .finally(this.cleanupConnection);
});

// Stream the result set, by passing through to the dialect's streaming
// capabilities (`this._stream`, which is not defined in this file).
//
// - `options`: passed through untouched to `_stream`.
// - `handler`: optional function that receives the stream. When given, the
//   returned promise resolves with whatever `_stream` returns; otherwise it
//   resolves with the stream itself.
//
// Rejects with an Error when the builder compiles to more than one statement.
var PassThrough;
Runner.prototype.stream = Promise.method(function(options, handler) {

  // If we specify stream(handler).then(... the only argument is the handler.
  if (arguments.length === 1 && _.isFunction(options)) {
    handler = options;
    options = {};
  }

  return Promise.bind(this)
    .then(this.ensureConnection)
    .then(function(connection) {
      this.connection = connection;
      var sql = this.builder.toSQL();
      if (_.isArray(sql)) {
        // No stream has been created at this point, so there is nothing to
        // emit the error on; rejecting the promise is the only way to report it.
        throw new Error('The stream may only be used with a single query statement.');
      }
      return sql;
    }).then(function(sql) {
      // Lazy-load the "PassThrough" dependency.
      PassThrough = PassThrough || require('stream').PassThrough;
      var stream = new PassThrough({objectMode: true});

      // If a function is passed to handle the stream, send the stream
      // there and return the promise, otherwise just return the stream
      // and the promise will take care of itself.
      if (_.isFunction(handler)) {
        handler(stream);
        return this._stream(sql, stream, options);
      }
      // NOTE(review): the result of `_stream` is not returned here, so the
      // chain (and the `cleanupConnection` below) does not wait for it -
      // confirm this is intended.
      this._stream(sql, stream, options);
      return stream;
    }).finally(this.cleanupConnection);
});

// "Runs" a query, returning a promise. All queries specified by the builder are guaranteed
// to run in sequence, and on the same connection, especially helpful when schema building
// and dealing with foreign key constraints, etc.
// Emits a "query" event on the builder before handing the statement to
// `this._query`, then passes the result to `this.processResponse` (neither
// of which is defined in this file).
Runner.prototype.query = Promise.method(function(obj) {
  this.builder.emit('query', obj);
  return this._query(obj).bind(this).then(this.processResponse);
});

// In the case of the "schema builder" we call `queryArray`, which runs each
// of the queries in sequence. A single-element array resolves with that
// query's response directly; otherwise the promise resolves with an array
// of responses, in order.
Runner.prototype.queryArray = Promise.method(function(queries) {
  if (queries.length === 1) {
    return this.query(queries[0]);
  }
  return Promise.bind(this)
    .thenReturn(queries)
    .reduce(function(memo, query) {
      return this.query(query).then(function(resp) {
        memo.push(resp);
        return memo;
      });
    }, []);
});

// Resolve the connection to use: the one explicitly set on the builder
// (`_connection`) if there is one, otherwise a connection acquired
// from the client.
Runner.prototype.ensureConnection = Promise.method(function() {
  if (this.builder._connection) {
    return this.builder._connection;
  }
  return this.client.acquireConnection();
});

// "Debug" the query being run, printing it along with the `__cid`
// of the current connection.
Runner.prototype.debug = function(obj) {
  console.dir(_.extend({__cid: this.connection.__cid}, obj));
};

// Check whether we're "debugging", based on either the client's
// `isDebugging` flag or the builder's `_debug` flag being `true`.
Runner.prototype.isDebugging = function() {
  return (this.client.isDebugging === true || this.builder._debug === true);
};

// Transaction Methods:
// -------

// Run the transaction on the correct "runner" instance.
Runner.prototype.transactionQuery = Promise.method(function() {
  // The runner that drives the transaction is stored on the transaction
  // object referenced by the builder.
  var txRunner = this.builder._transacting._runner;
  if (!(txRunner instanceof Runner)) {
    throw new Error('Invalid transaction object provided.');
  }
  var statements = this.builder.toSQL();
  return _.isArray(statements)
    ? txRunner.queryArray(statements)
    : txRunner.query(statements);
});

// Begins a transaction statement on the instance,
// resolving with this runner once the "begin" statement has run.
Runner.prototype.startTransaction = function() {
  // Remember the connection, flag the runner as transacting,
  // then issue the "begin" statement.
  var beginOnConnection = function(connection) {
    this.connection = connection;
    this.transaction = true;
    return this.query({sql: this._beginTransaction});
  };

  return Promise.bind(this)
    .then(this.ensureConnection)
    .then(beginOnConnection)
    .thenReturn(this);
};

// Finishes the transaction statement and handles disposing of the connection,
// resolving / rejecting the transaction's promise, and ensuring the transaction object's
// `_runner` property is cleared so it cannot continue to be used.
//
// - `action`: 0 to commit, 1 to roll back.
// - `containerObject`: the transaction object; its `_dfd` deferred is settled here.
// - `msg`: optional value to settle the deferred with instead of the query response.
Runner.prototype.finishTransaction = Promise.method(function(action, containerObject, msg) {
  var dfd = containerObject._dfd;
  var pending;

  // Run the query to commit / rollback the transaction.
  if (action === 0) {
    pending = this.commitTransaction();
  } else if (action === 1) {
    pending = this.rollbackTransaction();
  }

  return pending
    .then(function(resp) {
      var outcome = msg || resp;
      if (action === 0) {
        dfd.fulfill(outcome);
      } else if (action === 1) {
        dfd.reject(outcome);
      }
    })
    // If there was a problem committing the transaction,
    // reject the transaction block (to reject the entire transaction block),
    // then re-throw the error for any promises chained off the commit.
    .catch(function(e) {
      dfd.reject(e);
      throw e;
    })
    .bind(this)
    .finally(function() {
      // Kill the "_runner" object on the containerObject,
      // so it's not possible to continue using the transaction object.
      containerObject._runner = void 0;
      return this.cleanupConnection();
    });
});

// Issue the "commit" statement on the current connection.
Runner.prototype.commitTransaction = function() {
  var statement = {sql: this._commitTransaction};
  return this.query(statement);
};

// Issue the "rollback" statement on the current connection.
Runner.prototype.rollbackTransaction = function() {
  var statement = {sql: this._rollbackTransaction};
  return this.query(statement);
};

// Cleanup the connection as necessary, if the `_connection` was
// explicitly set on the query we don't need to do anything here,
// otherwise we release the runner's connection back to the client.
Runner.prototype.cleanupConnection = Promise.method(function() {
  if (this.builder._connection) {
    return;
  }
  return this.client.releaseConnection(this.connection);
});

module.exports = Runner;