mirror of
https://github.com/knex/knex.git
synced 2026-01-06 03:57:29 +00:00
Split and clean-up runner (#4177)
This commit is contained in:
parent
5f34fc170a
commit
5d3801cf51
@ -12,8 +12,6 @@ matrix:
|
||||
include:
|
||||
- node_js: '14'
|
||||
env: DB="oracledb" KNEX_TEST_TIMEOUT=60000
|
||||
- node_js: '12'
|
||||
env: DB="oracledb" KNEX_TEST_TIMEOUT=60000
|
||||
- node_js: '10'
|
||||
env: DB="oracledb" KNEX_TEST_TIMEOUT=60000
|
||||
install:
|
||||
|
||||
41
lib/internal/ensure-connection-callback.js
Normal file
41
lib/internal/ensure-connection-callback.js
Normal file
@ -0,0 +1,41 @@
|
||||
function ensureConnectionCallback(runner) {
|
||||
runner.client.emit('start', runner.builder);
|
||||
runner.builder.emit('start', runner.builder);
|
||||
const sql = runner.builder.toSQL();
|
||||
|
||||
if (runner.builder._debug) {
|
||||
runner.client.logger.debug(sql);
|
||||
}
|
||||
|
||||
if (Array.isArray(sql)) {
|
||||
return runner.queryArray(sql);
|
||||
}
|
||||
return runner.query(sql);
|
||||
}
|
||||
|
||||
function ensureConnectionStreamCallback(runner, params) {
|
||||
try {
|
||||
const sql = runner.builder.toSQL();
|
||||
|
||||
if (Array.isArray(sql) && params.hasHandler) {
|
||||
throw new Error(
|
||||
'The stream may only be used with a single query statement.'
|
||||
);
|
||||
}
|
||||
|
||||
return runner.client.stream(
|
||||
runner.connection,
|
||||
sql,
|
||||
params.stream,
|
||||
params.options
|
||||
);
|
||||
} catch (e) {
|
||||
params.stream.emit('error', e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
ensureConnectionCallback,
|
||||
ensureConnectionStreamCallback,
|
||||
};
|
||||
192
lib/runner.js
192
lib/runner.js
@ -1,74 +1,58 @@
|
||||
const { KnexTimeoutError } = require('./util/timeout');
|
||||
const { timeout } = require('./util/timeout');
|
||||
const {
|
||||
ensureConnectionCallback,
|
||||
ensureConnectionStreamCallback,
|
||||
} = require('./internal/ensure-connection-callback');
|
||||
|
||||
let Transform;
|
||||
|
||||
// The "Runner" constructor takes a "builder" (query, schema, or raw)
|
||||
// and runs through each of the query statements, calling any additional
|
||||
// "output" method provided alongside the query and bindings.
|
||||
function Runner(client, builder) {
|
||||
this.client = client;
|
||||
this.builder = builder;
|
||||
this.queries = [];
|
||||
class Runner {
|
||||
constructor(client, builder) {
|
||||
this.client = client;
|
||||
this.builder = builder;
|
||||
this.queries = [];
|
||||
|
||||
// The "connection" object is set on the runner when
|
||||
// "run" is called.
|
||||
this.connection = void 0;
|
||||
}
|
||||
// The "connection" object is set on the runner when
|
||||
// "run" is called.
|
||||
this.connection = undefined;
|
||||
}
|
||||
|
||||
Object.assign(Runner.prototype, {
|
||||
// "Run" the target, calling "toSQL" on the builder, returning
|
||||
// an object or array of queries to run, each of which are run on
|
||||
// a single connection.
|
||||
run() {
|
||||
async run() {
|
||||
const runner = this;
|
||||
return (
|
||||
this.ensureConnection(function (connection) {
|
||||
runner.connection = connection;
|
||||
try {
|
||||
const res = await this.ensureConnection(ensureConnectionCallback);
|
||||
|
||||
runner.client.emit('start', runner.builder);
|
||||
runner.builder.emit('start', runner.builder);
|
||||
const sql = runner.builder.toSQL();
|
||||
// Fire a single "end" event on the builder when
|
||||
// all queries have successfully completed.
|
||||
runner.builder.emit('end');
|
||||
return res;
|
||||
|
||||
if (runner.builder._debug) {
|
||||
runner.client.logger.debug(sql);
|
||||
}
|
||||
|
||||
if (Array.isArray(sql)) {
|
||||
return runner.queryArray(sql);
|
||||
}
|
||||
return runner.query(sql);
|
||||
})
|
||||
|
||||
// If there are any "error" listeners, we fire an error event
|
||||
// and then re-throw the error to be eventually handled by
|
||||
// the promise chain. Useful if you're wrapping in a custom `Promise`.
|
||||
.catch(function (err) {
|
||||
if (runner.builder._events && runner.builder._events.error) {
|
||||
runner.builder.emit('error', err);
|
||||
}
|
||||
throw err;
|
||||
})
|
||||
|
||||
// Fire a single "end" event on the builder when
|
||||
// all queries have successfully completed.
|
||||
.then(function (res) {
|
||||
runner.builder.emit('end');
|
||||
return res;
|
||||
})
|
||||
);
|
||||
},
|
||||
// If there are any "error" listeners, we fire an error event
|
||||
// and then re-throw the error to be eventually handled by
|
||||
// the promise chain. Useful if you're wrapping in a custom `Promise`.
|
||||
} catch (err) {
|
||||
if (runner.builder._events && runner.builder._events.error) {
|
||||
runner.builder.emit('error', err);
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
// Stream the result set, by passing through to the dialect's streaming
|
||||
// capabilities. If the options are
|
||||
stream(options, handler) {
|
||||
// If we specify stream(handler).then(...
|
||||
if (arguments.length === 1) {
|
||||
if (typeof options === 'function') {
|
||||
handler = options;
|
||||
options = {};
|
||||
}
|
||||
}
|
||||
stream(optionsOrHandler, handlerOrNil) {
|
||||
const firstOptionIsHandler =
|
||||
typeof optionsOrHandler === 'function' && arguments.length === 1;
|
||||
|
||||
const options = firstOptionIsHandler ? {} : optionsOrHandler;
|
||||
const handler = firstOptionIsHandler ? optionsOrHandler : handlerOrNil;
|
||||
|
||||
// Determines whether we emit an error or throw here.
|
||||
const hasHandler = typeof handler === 'function';
|
||||
@ -76,7 +60,6 @@ Object.assign(Runner.prototype, {
|
||||
// Lazy-load the "Transform" dependency.
|
||||
Transform = Transform || require('stream').Transform;
|
||||
|
||||
const runner = this;
|
||||
const queryContext = this.builder.queryContext();
|
||||
let queryStream;
|
||||
|
||||
@ -96,53 +79,43 @@ Object.assign(Runner.prototype, {
|
||||
queryStream = qs;
|
||||
});
|
||||
|
||||
let hasConnection = false;
|
||||
const promise = this.ensureConnection(function (connection) {
|
||||
hasConnection = true;
|
||||
runner.connection = connection;
|
||||
try {
|
||||
const sql = runner.builder.toSQL();
|
||||
|
||||
if (Array.isArray(sql) && hasHandler) {
|
||||
throw new Error(
|
||||
'The stream may only be used with a single query statement.'
|
||||
);
|
||||
}
|
||||
|
||||
return runner.client.stream(runner.connection, sql, stream, options);
|
||||
} catch (e) {
|
||||
stream.emit('error', e);
|
||||
throw e;
|
||||
const connectionAcquirePromise = this.ensureConnection(
|
||||
ensureConnectionStreamCallback,
|
||||
{
|
||||
options,
|
||||
hasHandler,
|
||||
stream,
|
||||
}
|
||||
});
|
||||
)
|
||||
// Emit errors on the stream if the error occurred before a connection
|
||||
// could be acquired.
|
||||
// If the connection was acquired, assume the error occurred in the client
|
||||
// code and has already been emitted on the stream. Don't emit it twice.
|
||||
.catch((err) => {
|
||||
if (!this.connection) {
|
||||
stream.emit('error', err);
|
||||
}
|
||||
});
|
||||
|
||||
// If a function is passed to handle the stream, send the stream
|
||||
// there and return the promise, otherwise just return the stream
|
||||
// and the promise will take care of itself.
|
||||
if (hasHandler) {
|
||||
handler(stream);
|
||||
return promise;
|
||||
return connectionAcquirePromise;
|
||||
}
|
||||
|
||||
// Emit errors on the stream if the error occurred before a connection
|
||||
// could be acquired.
|
||||
// If the connection was acquired, assume the error occurred in the client
|
||||
// code and has already been emitted on the stream. Don't emit it twice.
|
||||
promise.catch(function (err) {
|
||||
if (!hasConnection) stream.emit('error', err);
|
||||
});
|
||||
return stream;
|
||||
},
|
||||
}
|
||||
|
||||
// Allow you to pipe the stream to a writable stream.
|
||||
pipe(writable, options) {
|
||||
return this.stream(options).pipe(writable);
|
||||
},
|
||||
}
|
||||
|
||||
// "Runs" a query, returning a promise. All queries specified by the builder are guaranteed
|
||||
// to run in sequence, and on the same connection, especially helpful when schema building
|
||||
// and dealing with foreign key constraints, etc.
|
||||
query: async function (obj) {
|
||||
async query(obj) {
|
||||
const { __knexUid, __knexTxId } = this.connection;
|
||||
|
||||
this.builder.emit('query', Object.assign({ __knexUid, __knexTxId }, obj));
|
||||
@ -234,7 +207,7 @@ Object.assign(Runner.prototype, {
|
||||
);
|
||||
throw error;
|
||||
});
|
||||
},
|
||||
}
|
||||
|
||||
// In the case of the "schema builder" we call `queryArray`, which runs each
|
||||
// of the queries in sequence.
|
||||
@ -248,38 +221,39 @@ Object.assign(Runner.prototype, {
|
||||
results.push(await this.query(query));
|
||||
}
|
||||
return results;
|
||||
},
|
||||
}
|
||||
|
||||
// Check whether there's a transaction flag, and that it has a connection.
|
||||
async ensureConnection(cb) {
|
||||
async ensureConnection(cb, cbParams) {
|
||||
// Use override from a builder if passed
|
||||
if (this.builder._connection) {
|
||||
return cb(this.builder._connection);
|
||||
this.connection = this.builder._connection;
|
||||
}
|
||||
|
||||
if (this.connection) {
|
||||
return cb(this.connection);
|
||||
return cb(this, cbParams);
|
||||
}
|
||||
return this.client
|
||||
.acquireConnection()
|
||||
.catch((error) => {
|
||||
if (!(error instanceof KnexTimeoutError)) {
|
||||
return Promise.reject(error);
|
||||
}
|
||||
if (this.builder) {
|
||||
error.sql = this.builder.sql;
|
||||
error.bindings = this.builder.bindings;
|
||||
}
|
||||
throw error;
|
||||
})
|
||||
.then(async (connection) => {
|
||||
try {
|
||||
return await cb(connection);
|
||||
} finally {
|
||||
await this.client.releaseConnection(this.connection);
|
||||
}
|
||||
});
|
||||
},
|
||||
});
|
||||
|
||||
let acquiredConnection;
|
||||
try {
|
||||
acquiredConnection = await this.client.acquireConnection();
|
||||
} catch (error) {
|
||||
if (!(error instanceof KnexTimeoutError)) {
|
||||
return Promise.reject(error);
|
||||
}
|
||||
if (this.builder) {
|
||||
error.sql = this.builder.sql;
|
||||
error.bindings = this.builder.bindings;
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
try {
|
||||
this.connection = acquiredConnection;
|
||||
return await cb(this, cbParams);
|
||||
} finally {
|
||||
await this.client.releaseConnection(acquiredConnection);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = Runner;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user