2017-02-24 10:55:13 -06:00
|
|
|
import { assign, isArray, noop } from 'lodash'
|
2016-08-09 17:23:07 -04:00
|
|
|
import Promise from 'bluebird';
|
2016-05-17 01:01:34 +10:00
|
|
|
import * as helpers from './helpers';
|
2015-05-09 13:58:18 -04:00
|
|
|
|
2016-05-17 01:01:34 +10:00
|
|
|
let PassThrough;
|
2015-05-09 13:58:18 -04:00
|
|
|
|
|
|
|
// The "Runner" constructor takes a "builder" (query, schema, or raw)
|
|
|
|
// and runs through each of the query statements, calling any additional
|
|
|
|
// "output" method provided alongside the query and bindings.
|
|
|
|
function Runner(client, builder) {
|
2016-05-18 19:59:24 +10:00
|
|
|
this.client = client
|
2015-05-09 13:58:18 -04:00
|
|
|
this.builder = builder
|
|
|
|
this.queries = []
|
|
|
|
|
|
|
|
// The "connection" object is set on the runner when
|
|
|
|
// "run" is called.
|
|
|
|
this.connection = void 0
|
|
|
|
}
|
|
|
|
|
|
|
|
assign(Runner.prototype, {
|
|
|
|
|
|
|
|
// "Run" the target, calling "toSQL" on the builder, returning
|
|
|
|
// an object or array of queries to run, each of which are run on
|
|
|
|
// a single connection.
|
2016-05-17 01:01:34 +10:00
|
|
|
run() {
|
|
|
|
const runner = this
|
2015-05-09 13:58:18 -04:00
|
|
|
return Promise.using(this.ensureConnection(), function(connection) {
|
|
|
|
runner.connection = connection;
|
|
|
|
|
|
|
|
runner.client.emit('start', runner.builder)
|
|
|
|
runner.builder.emit('start', runner.builder)
|
2016-05-17 01:01:34 +10:00
|
|
|
const sql = runner.builder.toSQL();
|
2015-05-09 13:58:18 -04:00
|
|
|
|
|
|
|
if (runner.builder._debug) {
|
2016-05-17 01:01:34 +10:00
|
|
|
helpers.debugLog(sql)
|
2015-05-09 13:58:18 -04:00
|
|
|
}
|
|
|
|
|
2016-03-02 16:52:32 +01:00
|
|
|
if (isArray(sql)) {
|
2015-05-09 13:58:18 -04:00
|
|
|
return runner.queryArray(sql);
|
|
|
|
}
|
|
|
|
return runner.query(sql);
|
|
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
// If there are any "error" listeners, we fire an error event
|
|
|
|
// and then re-throw the error to be eventually handled by
|
|
|
|
// the promise chain. Useful if you're wrapping in a custom `Promise`.
|
|
|
|
.catch(function(err) {
|
|
|
|
if (runner.builder._events && runner.builder._events.error) {
|
|
|
|
runner.builder.emit('error', err);
|
|
|
|
}
|
|
|
|
throw err;
|
|
|
|
})
|
|
|
|
|
|
|
|
// Fire a single "end" event on the builder when
|
|
|
|
// all queries have successfully completed.
|
|
|
|
.tap(function() {
|
|
|
|
runner.builder.emit('end');
|
|
|
|
})
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
// Stream the result set, by passing through to the dialect's streaming
|
|
|
|
// capabilities. If the options are
|
2016-05-17 01:01:34 +10:00
|
|
|
stream(options, handler) {
|
2016-03-08 08:41:13 +01:00
|
|
|
|
2015-05-09 13:58:18 -04:00
|
|
|
// If we specify stream(handler).then(...
|
|
|
|
if (arguments.length === 1) {
|
|
|
|
if (typeof options === 'function') {
|
|
|
|
handler = options;
|
|
|
|
options = {};
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Determines whether we emit an error or throw here.
|
2016-05-17 01:01:34 +10:00
|
|
|
const hasHandler = typeof handler === 'function';
|
2015-05-09 13:58:18 -04:00
|
|
|
|
|
|
|
// Lazy-load the "PassThrough" dependency.
|
|
|
|
PassThrough = PassThrough || require('readable-stream').PassThrough;
|
2016-03-08 08:41:13 +01:00
|
|
|
|
2016-05-17 01:01:34 +10:00
|
|
|
const runner = this;
|
2016-05-18 19:59:24 +10:00
|
|
|
const stream = new PassThrough({objectMode: true});
|
2016-05-17 01:01:34 +10:00
|
|
|
const promise = Promise.using(this.ensureConnection(), function(connection) {
|
2015-05-09 13:58:18 -04:00
|
|
|
runner.connection = connection;
|
2016-05-17 01:01:34 +10:00
|
|
|
const sql = runner.builder.toSQL()
|
|
|
|
const err = new Error('The stream may only be used with a single query statement.');
|
2016-03-02 16:52:32 +01:00
|
|
|
if (isArray(sql)) {
|
2015-05-09 13:58:18 -04:00
|
|
|
if (hasHandler) throw err;
|
|
|
|
stream.emit('error', err);
|
|
|
|
}
|
|
|
|
return runner.client.stream(runner.connection, sql, stream, options);
|
|
|
|
})
|
|
|
|
|
|
|
|
// If a function is passed to handle the stream, send the stream
|
|
|
|
// there and return the promise, otherwise just return the stream
|
|
|
|
// and the promise will take care of itsself.
|
|
|
|
if (hasHandler) {
|
|
|
|
handler(stream);
|
|
|
|
return promise;
|
|
|
|
}
|
2017-02-24 10:55:13 -06:00
|
|
|
|
|
|
|
// This promise is unreachable since no handler was given, so noop any
|
|
|
|
// exceptions. Errors should be handled in the stream's 'error' event.
|
|
|
|
promise.catch(noop);
|
2015-05-09 13:58:18 -04:00
|
|
|
return stream;
|
|
|
|
},
|
|
|
|
|
|
|
|
// Allow you to pipe the stream to a writable stream.
|
2016-05-17 01:01:34 +10:00
|
|
|
pipe(writable, options) {
|
2015-06-01 15:45:02 -04:00
|
|
|
return this.stream(options).pipe(writable);
|
2015-05-09 13:58:18 -04:00
|
|
|
},
|
|
|
|
|
|
|
|
// "Runs" a query, returning a promise. All queries specified by the builder are guaranteed
|
|
|
|
// to run in sequence, and on the same connection, especially helpful when schema building
|
|
|
|
// and dealing with foreign key constraints, etc.
|
|
|
|
query: Promise.method(function(obj) {
|
|
|
|
this.builder.emit('query', assign({__knexUid: this.connection.__knexUid}, obj))
|
2016-05-17 01:01:34 +10:00
|
|
|
const runner = this
|
|
|
|
let queryPromise = this.client.query(this.connection, obj)
|
2016-02-15 17:06:08 +01:00
|
|
|
|
|
|
|
if(obj.timeout) {
|
|
|
|
queryPromise = queryPromise.timeout(obj.timeout)
|
|
|
|
}
|
|
|
|
|
|
|
|
return queryPromise
|
2016-02-26 19:51:35 +01:00
|
|
|
.then((resp) => {
|
2016-05-17 01:01:34 +10:00
|
|
|
const processedResponse = this.client.processResponse(resp, runner);
|
|
|
|
this.builder.emit(
|
|
|
|
'query-response',
|
|
|
|
processedResponse,
|
|
|
|
assign({__knexUid: this.connection.__knexUid}, obj),
|
|
|
|
this.builder
|
|
|
|
);
|
|
|
|
this.client.emit(
|
|
|
|
'query-response',
|
|
|
|
processedResponse,
|
|
|
|
assign({__knexUid: this.connection.__knexUid}, obj),
|
|
|
|
this.builder
|
|
|
|
);
|
2016-02-26 19:51:35 +01:00
|
|
|
return processedResponse;
|
2016-02-15 17:06:08 +01:00
|
|
|
}).catch(Promise.TimeoutError, error => {
|
2016-05-17 01:01:34 +10:00
|
|
|
const { timeout, sql, bindings } = obj;
|
2016-05-26 11:06:33 -07:00
|
|
|
|
|
|
|
let cancelQuery;
|
|
|
|
if (obj.cancelOnTimeout) {
|
|
|
|
cancelQuery = this.client.cancelQuery(this.connection);
|
|
|
|
} else {
|
|
|
|
cancelQuery = Promise.resolve();
|
|
|
|
}
|
|
|
|
|
|
|
|
return cancelQuery
|
2016-05-26 16:56:15 -07:00
|
|
|
.catch((cancelError) => {
|
|
|
|
// cancellation failed
|
|
|
|
throw assign(cancelError, {
|
|
|
|
message: `After query timeout of ${timeout}ms exceeded, cancelling of query failed.`,
|
|
|
|
sql, bindings, timeout
|
|
|
|
});
|
|
|
|
})
|
2016-05-26 11:06:33 -07:00
|
|
|
.then(() => {
|
2016-05-26 16:56:15 -07:00
|
|
|
// cancellation succeeded, rethrow timeout error
|
2016-05-26 11:06:33 -07:00
|
|
|
throw assign(error, {
|
|
|
|
message: `Defined query timeout of ${timeout}ms exceeded when running query.`,
|
|
|
|
sql, bindings, timeout
|
|
|
|
});
|
|
|
|
});
|
2016-02-15 17:06:08 +01:00
|
|
|
})
|
2016-03-24 18:59:37 +01:00
|
|
|
.catch((error) => {
|
|
|
|
this.builder.emit('query-error', error, assign({__knexUid: this.connection.__knexUid}, obj))
|
|
|
|
throw error;
|
|
|
|
});
|
2015-05-09 13:58:18 -04:00
|
|
|
}),
|
|
|
|
|
|
|
|
// In the case of the "schema builder" we call `queryArray`, which runs each
|
|
|
|
// of the queries in sequence.
|
2016-05-17 01:01:34 +10:00
|
|
|
queryArray(queries) {
|
2015-05-09 13:58:18 -04:00
|
|
|
return queries.length === 1 ? this.query(queries[0]) : Promise.bind(this)
|
|
|
|
.return(queries)
|
|
|
|
.reduce(function(memo, query) {
|
|
|
|
return this.query(query).then(function(resp) {
|
|
|
|
memo.push(resp)
|
|
|
|
return memo;
|
|
|
|
});
|
|
|
|
}, [])
|
|
|
|
},
|
|
|
|
|
|
|
|
// Check whether there's a transaction flag, and that it has a connection.
|
2016-05-17 01:01:34 +10:00
|
|
|
ensureConnection() {
|
2016-05-18 21:30:59 +10:00
|
|
|
return Promise.try(() => {
|
2016-10-09 14:00:55 -04:00
|
|
|
return this.connection || new Promise((resolver, rejecter) => {
|
2016-10-19 10:30:08 -05:00
|
|
|
// need to return promise or null from handler to prevent warning from bluebird
|
|
|
|
return this.client.acquireConnection()
|
2016-05-18 21:30:59 +10:00
|
|
|
.then(resolver)
|
|
|
|
.catch(Promise.TimeoutError, (error) => {
|
2016-10-09 14:00:55 -04:00
|
|
|
if (this.builder) {
|
|
|
|
error.sql = this.builder.sql;
|
|
|
|
error.bindings = this.builder.bindings;
|
2016-05-18 21:30:59 +10:00
|
|
|
}
|
2016-10-09 14:00:55 -04:00
|
|
|
throw error
|
2016-02-01 20:40:06 +01:00
|
|
|
})
|
2016-05-18 21:30:59 +10:00
|
|
|
.catch(rejecter)
|
2016-05-17 01:01:34 +10:00
|
|
|
})
|
2016-10-09 14:00:55 -04:00
|
|
|
}).disposer(() => {
|
2016-10-19 10:30:08 -05:00
|
|
|
// need to return promise or null from handler to prevent warning from bluebird
|
|
|
|
return this.client.releaseConnection(this.connection)
|
2015-05-09 13:58:18 -04:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
})
|
|
|
|
|
2016-05-17 01:01:34 +10:00
|
|
|
export default Runner;
|