// knex/src/dialects/mysql/index.js
// MySQL Client
// -------
import inherits from 'inherits';
import Client from '../../client';
import Promise from '../../promise';
import * as helpers from '../../helpers';
import Transaction from './transaction';
import QueryCompiler from './query/compiler';
import SchemaCompiler from './schema/compiler';
import TableCompiler from './schema/tablecompiler';
import ColumnCompiler from './schema/columncompiler';
import { assign, map } from 'lodash'
// Always initialize with the "QueryBuilder" and "QueryCompiler"
// objects, which extend the base 'lib/query/builder' and
// 'lib/query/compiler', respectively.
/**
 * MySQL dialect client.
 *
 * Construction is delegated entirely to the base `Client` constructor,
 * which receives the configuration unchanged.
 *
 * @param {Object} config - Client configuration, forwarded to `Client`.
 */
function Client_MySQL(config) {
  Client.apply(this, [config]);
}

// Make Client_MySQL inherit from the base Client.
inherits(Client_MySQL, Client);
assign(Client_MySQL.prototype, {

  dialect: 'mysql',

  driverName: 'mysql',

  // Loads the `mysql` driver on demand, i.e. only when this method is called.
  _driver() {
    return require('mysql');
  },

  QueryCompiler,

  SchemaCompiler,

  TableCompiler,

  ColumnCompiler,

  Transaction,

  // Quotes an identifier with backticks, doubling any backtick embedded in
  // it. The wildcard `*` is returned unquoted.
  wrapIdentifier(value) {
    return (value !== '*' ? `\`${value.replace(/`/g, '``')}\`` : '*');
  },

  // Get a raw connection, called by the `pool` whenever a new
  // connection needs to be added to the pool.
  // Resolves with the connected driver connection, or rejects with the
  // error reported by `connection.connect`.
  acquireRawConnection() {
    const client = this;
    const connection = this.driver.createConnection(this.connectionSettings);
    return new Promise(function(resolver, rejecter) {
      connection.connect(function(err) {
        if (err) return rejecter(err);
        // After a successful connect, both `error` and `end` events are
        // routed to the shared handler (bound to this client/connection).
        connection.on('error', client._connectionErrorHandler.bind(null, client, connection));
        connection.on('end', client._connectionErrorHandler.bind(null, client, connection));
        resolver(connection);
      });
    });
  },

  // Used to explicitly close a connection, called internally by the pool
  // when a connection times out or the pool is shutdown.
  destroyRawConnection(connection, cb) {
    connection.end(cb);
  },

  // Grab a connection, run the query via the MySQL streaming interface,
  // and pass that through to the stream we've sent back to the client.
  // Resolves when `stream` emits `end`; rejects when either `stream` or the
  // underlying query stream emits `error`.
  _stream(connection, obj, stream, options) {
    options = options || {};
    // Mirror `_query`: when per-query options are present, hand the driver
    // an options object instead of the bare SQL string.
    const query = obj.options ? assign({sql: obj.sql}, obj.options) : obj.sql;
    return new Promise(function(resolver, rejecter) {
      stream.on('error', rejecter);
      stream.on('end', resolver);
      const queryStream = connection.query(query, obj.bindings).stream(options);
      // `pipe` does not forward errors from the source, so a failing query
      // would otherwise leave this promise pending forever. Reject here and
      // surface the error on the destination stream as well.
      queryStream.on('error', function(err) {
        rejecter(err);
        stream.emit('error', err);
      });
      queryStream.pipe(stream);
    });
  },

  // Runs the query on the specified connection, providing the bindings
  // and any other necessary prep work.
  // `obj` may be a query object or a plain SQL string. Resolves with the
  // query object (with `response` set to `[rows, fields]`), or with
  // `undefined` when there is no SQL to run.
  _query(connection, obj) {
    if (!obj || typeof obj === 'string') obj = {sql: obj};
    return new Promise(function(resolver, rejecter) {
      let { sql } = obj;
      if (!sql) return resolver();
      // Per-query options are merged with the SQL into a driver options object.
      if (obj.options) sql = assign({sql}, obj.options);
      connection.query(sql, obj.bindings, function(err, rows, fields) {
        if (err) return rejecter(err);
        obj.response = [rows, fields];
        resolver(obj);
      });
    });
  },

  // Process the response as returned from the query.
  // The shape of the result depends on `obj.method`; a custom `obj.output`
  // function, when present, takes precedence and is invoked with `runner`
  // as `this`.
  processResponse(obj, runner) {
    if (obj == null) return;
    const { response, method } = obj;
    const rows = response[0];
    const fields = response[1];
    if (obj.output) return obj.output.call(runner, rows, fields);
    switch (method) {
      case 'select':
      case 'pluck':
      case 'first': {
        const resp = helpers.skim(rows);
        if (method === 'pluck') return map(resp, obj.pluck);
        return method === 'first' ? resp[0] : resp;
      }
      case 'insert':
        return [rows.insertId];
      case 'del':
      case 'update':
      case 'counter':
        return rows.affectedRows;
      default:
        return response;
    }
  },

  // MySQL Specific error handler
  // Destroys the connection through the pool the first time a fatal error
  // is reported for it; the `__knex__disposed` flag prevents a second
  // destroy when several events arrive for the same connection.
  _connectionErrorHandler: (client, connection, err) => {
    if (connection && err && err.fatal && !connection.__knex__disposed) {
      connection.__knex__disposed = true;
      client.pool.destroy(connection);
    }
  },

  // Runs `SELECT 1` on the given resource and reports through `callback`.
  ping(resource, callback) {
    resource.query('SELECT 1', callback);
  },

  canCancelQuery: true,

  // Issues `KILL <threadId>` for `connectionToKill` on a separately acquired
  // connection, which is released again whether or not the KILL succeeds.
  cancelQuery(connectionToKill) {
    return this.acquireConnection().completed
      .then((conn) => {
        return this.query(conn, {
          method: 'raw',
          sql: 'KILL ?',
          bindings: [connectionToKill.threadId],
          options: {},
        })
          .finally(() => {
            this.releaseConnection(conn);
          });
      });
  }

});

export default Client_MySQL;