knex/src/client.js

275 lines
6.8 KiB
JavaScript
Raw Normal View History

2016-03-02 17:07:05 +01:00
import Promise from './promise';
import * as helpers from './helpers';
2016-03-02 17:07:05 +01:00
import Raw from './raw';
import Runner from './runner';
import Formatter from './formatter';
import Transaction from './transaction';
2016-03-02 17:07:05 +01:00
import QueryBuilder from './query/builder';
import QueryCompiler from './query/compiler';
2016-03-02 17:07:05 +01:00
import SchemaBuilder from './schema/builder';
import SchemaCompiler from './schema/compiler';
import TableBuilder from './schema/tablebuilder';
import TableCompiler from './schema/tablecompiler';
import ColumnBuilder from './schema/columnbuilder';
import ColumnCompiler from './schema/columncompiler';
2016-03-02 17:07:05 +01:00
import Pool2 from 'pool2';
import inherits from 'inherits';
import { EventEmitter } from 'events';
import SqlString from './query/string';
2016-03-02 17:07:05 +01:00
import { assign, uniqueId, cloneDeep } from 'lodash'
2016-03-02 17:07:05 +01:00
const debug = require('debug')('knex:client')
const debugQuery = require('debug')('knex:query')
2016-03-02 17:07:05 +01:00
// The base client provides the general structure
// for a dialect specific client object.
function Client(config = {}) {
this.config = config
this.connectionSettings = cloneDeep(config.connection || {})
if (this.driverName && config.connection) {
this.initializeDriver()
if (!config.pool || (config.pool && config.pool.max !== 0)) {
this.initializePool(config)
}
}
this.valueForUndefined = this.raw('DEFAULT');
if (config.useNullAsDefault) {
this.valueForUndefined = null
}
}
inherits(Client, EventEmitter)
assign(Client.prototype, {
Formatter,
2016-03-02 17:07:05 +01:00
formatter() {
2016-03-02 17:07:05 +01:00
return new this.Formatter(this)
},
QueryBuilder,
2016-03-02 17:07:05 +01:00
queryBuilder() {
2016-03-02 17:07:05 +01:00
return new this.QueryBuilder(this)
},
QueryCompiler,
2016-03-02 17:07:05 +01:00
queryCompiler(builder) {
2016-03-02 17:07:05 +01:00
return new this.QueryCompiler(this, builder)
},
SchemaBuilder,
2016-03-02 17:07:05 +01:00
schemaBuilder() {
2016-03-02 17:07:05 +01:00
return new this.SchemaBuilder(this)
},
SchemaCompiler,
2016-03-02 17:07:05 +01:00
schemaCompiler(builder) {
2016-03-02 17:07:05 +01:00
return new this.SchemaCompiler(this, builder)
},
TableBuilder,
2016-03-02 17:07:05 +01:00
tableBuilder(type, tableName, fn) {
2016-03-02 17:07:05 +01:00
return new this.TableBuilder(this, type, tableName, fn)
},
TableCompiler,
2016-03-02 17:07:05 +01:00
tableCompiler(tableBuilder) {
2016-03-02 17:07:05 +01:00
return new this.TableCompiler(this, tableBuilder)
},
ColumnBuilder,
2016-03-02 17:07:05 +01:00
columnBuilder(tableBuilder, type, args) {
2016-03-02 17:07:05 +01:00
return new this.ColumnBuilder(this, tableBuilder, type, args)
},
ColumnCompiler,
2016-03-02 17:07:05 +01:00
columnCompiler(tableBuilder, columnBuilder) {
2016-03-02 17:07:05 +01:00
return new this.ColumnCompiler(this, tableBuilder, columnBuilder)
},
Runner,
2016-03-02 17:07:05 +01:00
runner(connection) {
2016-03-02 17:07:05 +01:00
return new this.Runner(this, connection)
},
SqlString,
2016-03-02 17:07:05 +01:00
Transaction,
2016-03-02 17:07:05 +01:00
transaction(container, config, outerTx) {
2016-03-02 17:07:05 +01:00
return new this.Transaction(this, container, config, outerTx)
},
Raw,
2016-03-02 17:07:05 +01:00
raw() {
const raw = new this.Raw(this)
2016-03-02 17:07:05 +01:00
return raw.set.apply(raw, arguments)
},
query(connection, obj) {
2016-03-02 17:07:05 +01:00
if (typeof obj === 'string') obj = {sql: obj}
this.emit('query', assign({__knexUid: connection.__knexUid}, obj))
debugQuery(obj.sql)
return this._query.call(this, connection, obj).catch((err) => {
err.message = SqlString.format(obj.sql, obj.bindings) + ' - ' + err.message
this.emit('query-error', err, assign({__knexUid: connection.__knexUid}, obj))
2016-03-02 17:07:05 +01:00
throw err
})
},
stream(connection, obj, stream, options) {
2016-03-02 17:07:05 +01:00
if (typeof obj === 'string') obj = {sql: obj}
this.emit('query', assign({__knexUid: connection.__knexUid}, obj))
debugQuery(obj.sql)
return this._stream.call(this, connection, obj, stream, options)
},
prepBindings(bindings) {
return bindings;
2016-03-02 17:07:05 +01:00
},
wrapIdentifier(value) {
return (value !== '*' ? `"${value.replace(/"/g, '""')}"` : '*')
2016-03-02 17:07:05 +01:00
},
initializeDriver() {
2016-03-02 17:07:05 +01:00
try {
this.driver = this._driver()
} catch (e) {
helpers.exit(`Knex: run\n$ npm install ${this.driverName} --save\n${e.stack}`)
2016-03-02 17:07:05 +01:00
}
},
Pool: Pool2,
initializePool(config) {
2016-03-02 17:07:05 +01:00
if (this.pool) this.destroy()
this.pool = new this.Pool(assign(this.poolDefaults(config.pool || {}), config.pool))
this.pool.on('error', function(err) {
helpers.error(`Pool2 - ${err}`)
2016-03-02 17:07:05 +01:00
})
this.pool.on('warn', function(msg) {
helpers.warn(`Pool2 - ${msg}`)
2016-03-02 17:07:05 +01:00
})
},
poolDefaults(poolConfig) {
const client = this
2016-03-02 17:07:05 +01:00
return {
min: 2,
max: 10,
acquire(callback) {
2016-03-02 17:07:05 +01:00
client.acquireRawConnection()
.tap(function(connection) {
connection.__knexUid = uniqueId('__knexUid')
if (poolConfig.afterCreate) {
return Promise.promisify(poolConfig.afterCreate)(connection)
}
})
.asCallback(callback)
2016-03-02 17:07:05 +01:00
},
dispose(connection, callback) {
2016-03-02 17:07:05 +01:00
if (poolConfig.beforeDestroy) {
poolConfig.beforeDestroy(connection, function() {
if (connection !== undefined) {
client.destroyRawConnection(connection, callback)
}
})
} else if (connection !== void 0) {
client.destroyRawConnection(connection, callback)
}
},
ping(resource, callback) {
return client.ping(resource, callback);
2016-03-02 17:07:05 +01:00
}
}
},
// Acquire a connection from the pool.
acquireConnection() {
const client = this
let request = null
const completed = new Promise(function(resolver, rejecter) {
2016-03-02 17:07:05 +01:00
if (!client.pool) {
return rejecter(new Error('There is no pool defined on the current client'))
}
request = client.pool.acquire(function(err, connection) {
2016-03-02 17:07:05 +01:00
if (err) return rejecter(err)
debug('acquired connection from pool: %s', connection.__knexUid)
2016-03-02 17:07:05 +01:00
resolver(connection)
})
})
const abort = function(reason) {
if (request && !request.fulfilled) {
request.abort(reason)
}
}
return {
completed: completed,
abort: abort
}
2016-03-02 17:07:05 +01:00
},
// Releases a connection back to the connection pool,
// returning a promise resolved when the connection is released.
releaseConnection(connection) {
const { pool } = this
2016-03-02 17:07:05 +01:00
return new Promise(function(resolver) {
debug('releasing connection to pool: %s', connection.__knexUid)
pool.release(connection)
resolver()
})
},
// Destroy the current connection pool for the client.
destroy(callback) {
const client = this
const promise = new Promise(function(resolver) {
2016-03-02 17:07:05 +01:00
if (!client.pool) return resolver()
client.pool.end(function() {
client.pool = undefined
resolver()
})
})
// Allow either a callback or promise interface for destruction.
if (typeof callback === 'function') {
promise.asCallback(callback)
2016-03-02 17:07:05 +01:00
} else {
return promise
}
},
// Return the database being used by this client.
database() {
2016-03-02 17:07:05 +01:00
return this.connectionSettings.database
},
toString() {
2016-03-02 17:07:05 +01:00
return '[object KnexClient]'
2016-05-26 11:06:33 -07:00
},
canCancelQuery: false,
cancelQuery() {
throw new Error("Query cancelling not supported for this dialect")
2016-03-02 17:07:05 +01:00
}
})
export default Client