Migrate to classes (#4253)

This commit is contained in:
maximelkin 2021-01-31 13:40:13 +03:00 committed by GitHub
parent 50be910113
commit b9fb5992b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 966 additions and 961 deletions

View File

@ -1,5 +1,8 @@
# Master (Unreleased)
### Test / internal changes:
- Migrate database client code and `Raw` to es6 classes
# 0.21.15 - 26 December, 2020
### New features:

View File

@ -29,6 +29,40 @@ const knexInstance: Knex = knex(config)
* v8 flags are no longer supported in cli. To pass these flags use [`NODE_OPTIONS` environment variable](https://nodejs.org/api/cli.html#cli_node_options_options).
For example `NODE_OPTIONS="--max-old-space-size=1536" npm run knex`
* Clients are now classes instead of new-able functions. Please migrate your custom clients to classes.
```js
const Client = require('knex')
const {inherits} = require('util')
// old
function CustomClient(config) {
Client.call(this, config);
// construction logic
}
inherits(CustomClient, Client);
CustomClient.prototype.methodOverride = function () {
// logic
}
// new
class CustomClient extends Client {
// node 12+
driverName = 'abcd';
constructor(config) {
super(config);
this.driverName = 'abcd'; // bad way, will not work
// construction logic
}
methodOverride() {
// logic
}
}
// alternative to declare driverName
CustomClient.prototype.driverName = 'abcd';
```
### Upgrading to version 0.21.0+
* Node.js older than 10 is no longer supported, make sure to update your environment;

View File

@ -1,6 +1,6 @@
const { Pool, TimeoutError } = require('tarn');
const { EventEmitter } = require('events');
const { promisify, inherits } = require('util');
const { promisify } = require('util');
const { makeEscape } = require('./util/string');
const cloneDeep = require('lodash/cloneDeep');
const defaults = require('lodash/defaults');
@ -33,132 +33,126 @@ const debug = require('debug')('knex:client');
// The base client provides the general structure
// for a dialect specific client object.
function Client(config = {}) {
this.config = config;
this.logger = new Logger(config);
//Client is a required field, so throw error if it's not supplied.
//If 'this.dialect' is set, then this is a 'super()' call, in which case
//'client' does not have to be set as it's already assigned on the client prototype.
class Client extends EventEmitter {
constructor(config = {}) {
super();
this.config = config;
this.logger = new Logger(config);
if (this.dialect && !this.config.client) {
this.logger.warn(
`Using 'this.dialect' to identify the client is deprecated and support for it will be removed in the future. Please use configuration option 'client' instead.`
);
}
const dbClient = this.config.client || this.dialect;
if (!dbClient) {
throw new Error(`knex: Required configuration option 'client' is missing.`);
}
//Client is a required field, so throw error if it's not supplied.
//If 'this.dialect' is set, then this is a 'super()' call, in which case
//'client' does not have to be set as it's already assigned on the client prototype.
if (config.version) {
this.version = config.version;
}
if (this.dialect && !this.config.client) {
this.logger.warn(
`Using 'this.dialect' to identify the client is deprecated and support for it will be removed in the future. Please use configuration option 'client' instead.`
);
}
const dbClient = this.config.client || this.dialect;
if (!dbClient) {
throw new Error(
`knex: Required configuration option 'client' is missing.`
);
}
if (config.connection && config.connection instanceof Function) {
this.connectionConfigProvider = config.connection;
this.connectionConfigExpirationChecker = () => true; // causes the provider to be called on first use
} else {
this.connectionSettings = cloneDeep(config.connection || {});
this.connectionConfigExpirationChecker = null;
}
if (this.driverName && config.connection) {
this.initializeDriver();
if (!config.pool || (config.pool && config.pool.max !== 0)) {
this.initializePool(config);
if (config.version) {
this.version = config.version;
}
if (config.connection && config.connection instanceof Function) {
this.connectionConfigProvider = config.connection;
this.connectionConfigExpirationChecker = () => true; // causes the provider to be called on first use
} else {
this.connectionSettings = cloneDeep(config.connection || {});
this.connectionConfigExpirationChecker = null;
}
if (this.driverName && config.connection) {
this.initializeDriver();
if (!config.pool || (config.pool && config.pool.max !== 0)) {
this.initializePool(config);
}
}
this.valueForUndefined = this.raw('DEFAULT');
if (config.useNullAsDefault) {
this.valueForUndefined = null;
}
}
this.valueForUndefined = this.raw('DEFAULT');
if (config.useNullAsDefault) {
this.valueForUndefined = null;
}
}
inherits(Client, EventEmitter);
Object.assign(Client.prototype, {
formatter(builder) {
return new Formatter(this, builder);
},
}
queryBuilder() {
return new QueryBuilder(this);
},
}
queryCompiler(builder, formatter) {
return new QueryCompiler(this, builder, formatter);
},
}
schemaBuilder() {
return new SchemaBuilder(this);
},
}
schemaCompiler(builder) {
return new SchemaCompiler(this, builder);
},
}
tableBuilder(type, tableName, fn) {
return new TableBuilder(this, type, tableName, fn);
},
}
tableCompiler(tableBuilder) {
return new TableCompiler(this, tableBuilder);
},
}
columnBuilder(tableBuilder, type, args) {
return new ColumnBuilder(this, tableBuilder, type, args);
},
}
columnCompiler(tableBuilder, columnBuilder) {
return new ColumnCompiler(this, tableBuilder, columnBuilder);
},
}
runner(builder) {
return new Runner(this, builder);
},
}
transaction(container, config, outerTx) {
return new Transaction(this, container, config, outerTx);
},
}
raw() {
return new Raw(this).set(...arguments);
},
}
ref() {
return new Ref(this, ...arguments);
},
_escapeBinding: makeEscape({
escapeString(str) {
return `'${str.replace(/'/g, "''")}'`;
},
}),
}
query(connection, queryParam) {
const queryObject = enrichQueryObject(connection, queryParam, this);
return executeQuery(connection, queryObject, this);
},
}
stream(connection, queryParam, stream, options) {
const queryObject = enrichQueryObject(connection, queryParam, this);
return this._stream(connection, queryObject, stream, options);
},
}
prepBindings(bindings) {
return bindings;
},
}
positionBindings(sql) {
return sql;
},
}
postProcessResponse(resp, queryContext) {
if (this.config.postProcessResponse) {
return this.config.postProcessResponse(resp, queryContext);
}
return resp;
},
}
wrapIdentifier(value, queryContext) {
return this.customWrapIdentifier(
@ -166,18 +160,18 @@ Object.assign(Client.prototype, {
this.wrapIdentifierImpl,
queryContext
);
},
}
customWrapIdentifier(value, origImpl, queryContext) {
if (this.config.wrapIdentifier) {
return this.config.wrapIdentifier(value, origImpl, queryContext);
}
return origImpl(value);
},
}
wrapIdentifierImpl(value) {
return value !== '*' ? `"${value.replace(/"/g, '""')}"` : '*';
},
}
initializeDriver() {
try {
@ -187,11 +181,11 @@ Object.assign(Client.prototype, {
this.logger.error(`${message}\n${e.message}\n${e.stack}`);
throw new Error(`${message}\n${e.message}`);
}
},
}
poolDefaults() {
return { min: 2, max: 10, propagateCreateError: true };
},
}
getPoolSettings(poolConfig) {
poolConfig = defaults({}, poolConfig, this.poolDefaults());
@ -263,7 +257,7 @@ Object.assign(Client.prototype, {
return this.validateConnection(connection);
},
});
},
}
initializePool(config = this.config) {
if (this.pool) {
@ -280,11 +274,11 @@ Object.assign(Client.prototype, {
}
this.pool = new Pool(tarnPoolConfig);
},
}
validateConnection(connection) {
return true;
},
}
// Acquire a connection from the pool.
async acquireConnection() {
@ -305,7 +299,7 @@ Object.assign(Client.prototype, {
}
throw convertedError;
}
},
}
// Releases a connection back to the connection pool,
// returning a promise resolved when the connection is released.
@ -318,7 +312,7 @@ Object.assign(Client.prototype, {
}
return Promise.resolve();
},
}
// Destroy the current connection pool for the client.
async destroy(callback) {
@ -337,34 +331,32 @@ Object.assign(Client.prototype, {
}
throw err;
}
},
}
// Return the database being used by this client.
database() {
return this.connectionSettings.database;
},
}
toString() {
return '[object KnexClient]';
},
canCancelQuery: false,
}
assertCanCancelQuery() {
if (!this.canCancelQuery) {
throw new Error('Query cancelling not supported for this dialect');
}
},
}
cancelQuery() {
throw new Error('Query cancelling not supported for this dialect');
},
}
// Formatter part
alias(first, second) {
return first + ' as ' + second;
},
}
// Checks whether a value is a function... if it is, we compile it
// otherwise we check whether it's a raw
@ -378,7 +370,17 @@ Object.assign(Client.prototype, {
);
}
return unwrapRaw(value, true, builder, this, formatter) || '?';
},
}
}
Object.assign(Client.prototype, {
_escapeBinding: makeEscape({
escapeString(str) {
return `'${str.replace(/'/g, "''")}'`;
},
}),
canCancelQuery: false,
});
module.exports = Client;

View File

@ -3,7 +3,6 @@
const flatten = require('lodash/flatten');
const map = require('lodash/map');
const values = require('lodash/values');
const { inherits } = require('util');
const Client = require('../../client');
@ -19,31 +18,24 @@ const SQL_BIGINT_SAFE = { MIN: -9007199254740991, MAX: 9007199254740991 };
// Always initialize with the "QueryBuilder" and "QueryCompiler" objects, which
// extend the base 'lib/query/builder' and 'lib/query/compiler', respectively.
function Client_MSSQL(config = {}) {
// #1235 mssql module wants 'server', not 'host'. This is to enforce the same
// options object across all dialects.
if (config && config.connection && config.connection.host) {
config.connection.server = config.connection.host;
class Client_MSSQL extends Client {
constructor(config = {}) {
super(config);
// #1235 mssql module wants 'server', not 'host'. This is to enforce the same
// options object across all dialects.
if (config && config.connection && config.connection.host) {
config.connection.server = config.connection.host;
}
// mssql always creates pool :( lets try to unpool it as much as possible
this.mssqlPoolSettings = {
min: 1,
max: 1,
idleTimeoutMillis: Number.MAX_SAFE_INTEGER,
evictionRunIntervalMillis: 0,
};
}
// mssql always creates pool :( lets try to unpool it as much as possible
this.mssqlPoolSettings = {
min: 1,
max: 1,
idleTimeoutMillis: Number.MAX_SAFE_INTEGER,
evictionRunIntervalMillis: 0,
};
Client.call(this, config);
}
inherits(Client_MSSQL, Client);
Object.assign(Client_MSSQL.prototype, {
dialect: 'mssql',
driverName: 'mssql',
_driver() {
const tds = require('tedious');
const mssqlTedious = require('mssql');
@ -181,31 +173,31 @@ Object.assign(Client_MSSQL.prototype, {
}
return mssqlTedious;
},
}
formatter() {
return new MSSQL_Formatter(this, ...arguments);
},
}
transaction() {
return new Transaction(this, ...arguments);
},
}
queryCompiler(builder, formatter) {
return new QueryCompiler(this, builder, formatter);
},
}
schemaCompiler() {
return new SchemaCompiler(this, ...arguments);
},
}
tableCompiler() {
return new TableCompiler(this, ...arguments);
},
}
columnCompiler() {
return new ColumnCompiler(this, ...arguments);
},
}
wrapIdentifierImpl(value) {
if (value === '*') {
@ -213,7 +205,7 @@ Object.assign(Client_MSSQL.prototype, {
}
return `[${value.replace(/[[\]]+/g, '')}]`;
},
}
// Get a raw connection, called by the `pool` whenever a new
// connection needs to be added to the pool.
@ -233,7 +225,7 @@ Object.assign(Client_MSSQL.prototype, {
resolver(connection);
});
});
},
}
validateConnection(connection) {
if (connection.connected === true) {
@ -241,7 +233,7 @@ Object.assign(Client_MSSQL.prototype, {
}
return false;
},
}
// Used to explicitly close a connection, called internally by the pool
// when a connection times out or the pool is shutdown.
@ -250,7 +242,7 @@ Object.assign(Client_MSSQL.prototype, {
// some times close will reject just because pool has already been destoyed
// internally by the driver there is nothing we can do in this case
});
},
}
// Position the bindings for the query.
positionBindings(sql) {
@ -263,7 +255,7 @@ Object.assign(Client_MSSQL.prototype, {
questionCount += 1;
return `@p${questionCount}`;
});
},
}
// Grab a connection, run the query via the MSSQL streaming interface,
// and pass that through to the stream we've sent back to the client.
@ -288,7 +280,7 @@ Object.assign(Client_MSSQL.prototype, {
req.pipe(stream);
req.query(sql);
});
},
}
// Runs the query on the specified connection, providing the bindings
// and any other necessary prep work.
@ -314,7 +306,7 @@ Object.assign(Client_MSSQL.prototype, {
resolver(obj);
});
});
},
}
// sets a request input parameter. Detects bigints and decimals and sets type appropriately.
_setReqInput(req, i, binding) {
@ -334,7 +326,7 @@ Object.assign(Client_MSSQL.prototype, {
} else {
req.input(`p${i}`, binding);
}
},
}
// Process the response as returned from the query.
processResponse(obj, runner) {
@ -369,7 +361,13 @@ Object.assign(Client_MSSQL.prototype, {
default:
return response;
}
},
}
}
Object.assign(Client_MSSQL.prototype, {
dialect: 'mssql',
driverName: 'mssql',
});
module.exports = Client_MSSQL;

View File

@ -2,7 +2,7 @@
// -------
const defer = require('lodash/defer');
const map = require('lodash/map');
const { promisify, inherits } = require('util');
const { promisify } = require('util');
const Client = require('../../client');
const Transaction = require('./transaction');
@ -16,46 +16,34 @@ const { makeEscape } = require('../../util/string');
// Always initialize with the "QueryBuilder" and "QueryCompiler"
// objects, which extend the base 'lib/query/builder' and
// 'lib/query/compiler', respectively.
function Client_MySQL(config) {
Client.call(this, config);
}
inherits(Client_MySQL, Client);
Object.assign(Client_MySQL.prototype, {
dialect: 'mysql',
driverName: 'mysql',
class Client_MySQL extends Client {
_driver() {
return require('mysql');
},
}
queryCompiler(builder, formatter) {
return new QueryCompiler(this, builder, formatter);
},
}
schemaCompiler() {
return new SchemaCompiler(this, ...arguments);
},
}
tableCompiler() {
return new TableCompiler(this, ...arguments);
},
}
columnCompiler() {
return new ColumnCompiler(this, ...arguments);
},
}
transaction() {
return new Transaction(this, ...arguments);
},
_escapeBinding: makeEscape(),
}
wrapIdentifierImpl(value) {
return value !== '*' ? `\`${value.replace(/`/g, '``')}\`` : '*';
},
}
// Get a raw connection, called by the `pool` whenever a new
// connection needs to be added to the pool.
@ -74,7 +62,7 @@ Object.assign(Client_MySQL.prototype, {
resolver(connection);
});
});
},
}
// Used to explicitly close a connection, called internally by the pool
// when a connection times out or the pool is shutdown.
@ -88,7 +76,7 @@ Object.assign(Client_MySQL.prototype, {
// see discussion https://github.com/knex/knex/pull/3483
defer(() => connection.removeAllListeners());
}
},
}
validateConnection(connection) {
if (
@ -98,7 +86,7 @@ Object.assign(Client_MySQL.prototype, {
return true;
}
return false;
},
}
// Grab a connection, run the query via the MySQL streaming interface,
// and pass that through to the stream we've sent back to the client.
@ -119,7 +107,7 @@ Object.assign(Client_MySQL.prototype, {
queryStream.pipe(stream);
});
},
}
// Runs the query on the specified connection, providing the bindings
// and any other necessary prep work.
@ -141,7 +129,7 @@ Object.assign(Client_MySQL.prototype, {
}
);
});
},
}
// Process the response as returned from the query.
processResponse(obj, runner) {
@ -169,9 +157,7 @@ Object.assign(Client_MySQL.prototype, {
default:
return response;
}
},
canCancelQuery: true,
}
async cancelQuery(connectionToKill) {
const conn = await this.acquireConnection();
@ -185,7 +171,17 @@ Object.assign(Client_MySQL.prototype, {
} finally {
await this.releaseConnection(conn);
}
},
}
}
Object.assign(Client_MySQL.prototype, {
dialect: 'mysql',
driverName: 'mysql',
_escapeBinding: makeEscape(),
canCancelQuery: true,
});
module.exports = Client_MySQL;

View File

@ -3,9 +3,7 @@ const Debug = require('debug');
const debug = Debug('knex:tx');
class Transaction_MySQL extends Transaction {}
Object.assign(Transaction_MySQL.prototype, {
class Transaction_MySQL extends Transaction {
query(conn, sql, status, value) {
const t = this;
const q = this.trxClient
@ -42,7 +40,7 @@ Object.assign(Transaction_MySQL.prototype, {
t._completed = true;
}
return q;
},
});
}
}
module.exports = Transaction_MySQL;

View File

@ -1,35 +1,30 @@
// MySQL2 Client
// -------
const { inherits } = require('util');
const Client_MySQL = require('../mysql');
const Transaction = require('./transaction');
// Always initialize with the "QueryBuilder" and "QueryCompiler"
// objects, which extend the base 'lib/query/builder' and
// 'lib/query/compiler', respectively.
function Client_MySQL2(config) {
Client_MySQL.call(this, config);
}
inherits(Client_MySQL2, Client_MySQL);
Object.assign(Client_MySQL2.prototype, {
// The "dialect", for reference elsewhere.
driverName: 'mysql2',
class Client_MySQL2 extends Client_MySQL {
transaction() {
return new Transaction(this, ...arguments);
},
}
_driver() {
return require('mysql2');
},
}
validateConnection(connection) {
if (connection._fatalError) {
return false;
}
return true;
},
}
}
Object.assign(Client_MySQL2.prototype, {
// The "dialect", for reference elsewhere.
driverName: 'mysql2',
});
module.exports = Client_MySQL2;

View File

@ -1,9 +1,7 @@
const Transaction = require('../../execution/transaction');
const debug = require('debug')('knex:tx');
class Transaction_MySQL2 extends Transaction {}
Object.assign(Transaction_MySQL2.prototype, {
class Transaction_MySQL2 extends Transaction {
query(conn, sql, status, value) {
const t = this;
const q = this.trxClient
@ -40,7 +38,7 @@ Object.assign(Transaction_MySQL2.prototype, {
t._completed = true;
}
return q;
},
});
}
}
module.exports = Transaction_MySQL2;

View File

@ -1,7 +1,5 @@
// Oracle Client
// -------
const { inherits } = require('util');
const { ReturningHelper } = require('./utils');
const { isConnectionError } = require('./utils');
const Client = require('../../client');
@ -13,37 +11,27 @@ const TableCompiler = require('./schema/oracle-tablecompiler');
// Always initialize with the "QueryBuilder" and "QueryCompiler"
// objects, which extend the base 'lib/query/builder' and
// 'lib/query/compiler', respectively.
function Client_Oracle(config) {
Client.call(this, config);
}
inherits(Client_Oracle, Client);
Object.assign(Client_Oracle.prototype, {
dialect: 'oracle',
driverName: 'oracle',
class Client_Oracle extends Client {
schemaCompiler() {
return new SchemaCompiler(this, ...arguments);
},
}
columnBuilder() {
return new ColumnBuilder(this, ...arguments);
},
}
columnCompiler() {
return new ColumnCompiler(this, ...arguments);
},
}
tableCompiler() {
return new TableCompiler(this, ...arguments);
},
}
// Return the database for the Oracle client.
database() {
return this.connectionSettings.database;
},
}
// Position the bindings for the query.
positionBindings(sql) {
@ -52,7 +40,7 @@ Object.assign(Client_Oracle.prototype, {
questionCount += 1;
return `:${questionCount}`;
});
},
}
_stream(connection, obj, stream, options) {
return new Promise(function (resolver, rejecter) {
@ -74,13 +62,13 @@ Object.assign(Client_Oracle.prototype, {
stream.emit('error', error);
});
});
},
}
// Formatter part
alias(first, second) {
return first + ' ' + second;
},
}
parameter(value, builder, formatter) {
// Returning helper uses always ROWID as string
@ -90,7 +78,13 @@ Object.assign(Client_Oracle.prototype, {
value = value ? 1 : 0;
}
return super.parameter(value, builder, formatter);
},
}
}
Object.assign(Client_Oracle.prototype, {
dialect: 'oracle',
driverName: 'oracle',
});
module.exports = Client_Oracle;

View File

@ -1,6 +1,6 @@
// Oracledb Client
// -------
const { promisify, inherits } = require('util');
const { promisify } = require('util');
const stream = require('stream');
const each = require('lodash/each');
const flatten = require('lodash/flatten');
@ -18,87 +18,410 @@ const { isString } = require('../../util/is');
const { outputQuery, unwrapRaw } = require('../../formatter/wrappingFormatter');
const { compileCallback } = require('../../formatter/formatterUtils');
function Client_Oracledb() {
Client_Oracle.apply(this, arguments);
// Node.js only have 4 background threads by default, oracledb needs one by connection
if (this.driver) {
process.env.UV_THREADPOOL_SIZE = process.env.UV_THREADPOOL_SIZE || 1;
process.env.UV_THREADPOOL_SIZE =
parseInt(process.env.UV_THREADPOOL_SIZE) + this.driver.poolMax;
class Client_Oracledb extends Client_Oracle {
constructor(config) {
super(config);
if (this.driver) {
process.env.UV_THREADPOOL_SIZE = process.env.UV_THREADPOOL_SIZE || 1;
process.env.UV_THREADPOOL_SIZE =
parseInt(process.env.UV_THREADPOOL_SIZE) + this.driver.poolMax;
}
}
}
inherits(Client_Oracledb, Client_Oracle);
Client_Oracledb.prototype.driverName = 'oracledb';
Client_Oracledb.prototype._driver = function () {
const client = this;
const oracledb = require('oracledb');
client.fetchAsString = [];
if (this.config.fetchAsString && Array.isArray(this.config.fetchAsString)) {
this.config.fetchAsString.forEach(function (type) {
if (!isString(type)) return;
type = type.toUpperCase();
if (oracledb[type]) {
if (
type !== 'NUMBER' &&
type !== 'DATE' &&
type !== 'CLOB' &&
type !== 'BUFFER'
) {
this.logger.warn(
'Only "date", "number", "clob" and "buffer" are supported for fetchAsString'
);
_driver() {
const client = this;
const oracledb = require('oracledb');
client.fetchAsString = [];
if (this.config.fetchAsString && Array.isArray(this.config.fetchAsString)) {
this.config.fetchAsString.forEach(function (type) {
if (!isString(type)) return;
type = type.toUpperCase();
if (oracledb[type]) {
if (
type !== 'NUMBER' &&
type !== 'DATE' &&
type !== 'CLOB' &&
type !== 'BUFFER'
) {
this.logger.warn(
'Only "date", "number", "clob" and "buffer" are supported for fetchAsString'
);
}
client.fetchAsString.push(oracledb[type]);
}
client.fetchAsString.push(oracledb[type]);
});
}
return oracledb;
}
queryCompiler(builder, formatter) {
return new QueryCompiler(this, builder, formatter);
}
columnCompiler() {
return new ColumnCompiler(this, ...arguments);
}
formatter(builder) {
return new Formatter(this, builder);
}
transaction() {
return new Transaction(this, ...arguments);
}
prepBindings(bindings) {
return map(bindings, (value) => {
if (value instanceof BlobHelper && this.driver) {
return { type: this.driver.BLOB, dir: this.driver.BIND_OUT };
// Returning helper always use ROWID as string
} else if (value instanceof ReturningHelper && this.driver) {
return { type: this.driver.STRING, dir: this.driver.BIND_OUT };
} else if (typeof value === 'boolean') {
return value ? 1 : 0;
}
return value;
});
}
return oracledb;
};
Client_Oracledb.prototype.queryCompiler = function (builder, formatter) {
return new QueryCompiler(this, builder, formatter);
};
Client_Oracledb.prototype.columnCompiler = function () {
return new ColumnCompiler(this, ...arguments);
};
Client_Oracledb.prototype.formatter = function (builder) {
return new Formatter(this, builder);
};
Client_Oracledb.prototype.transaction = function () {
return new Transaction(this, ...arguments);
};
Client_Oracledb.prototype.prepBindings = function (bindings) {
return map(bindings, (value) => {
if (value instanceof BlobHelper && this.driver) {
return { type: this.driver.BLOB, dir: this.driver.BIND_OUT };
// Returning helper always use ROWID as string
} else if (value instanceof ReturningHelper && this.driver) {
return { type: this.driver.STRING, dir: this.driver.BIND_OUT };
} else if (typeof value === 'boolean') {
return value ? 1 : 0;
// Checks whether a value is a function... if it is, we compile it
// otherwise we check whether it's a raw
parameter(value, builder, formatter) {
if (typeof value === 'function') {
return outputQuery(
compileCallback(value, undefined, this, formatter),
true,
builder,
this
);
} else if (value instanceof BlobHelper) {
return 'EMPTY_BLOB()';
}
return value;
});
};
// Checks whether a value is a function... if it is, we compile it
// otherwise we check whether it's a raw
Client_Oracledb.prototype.parameter = function (value, builder, formatter) {
if (typeof value === 'function') {
return outputQuery(
compileCallback(value, undefined, this, formatter),
true,
builder,
this
);
} else if (value instanceof BlobHelper) {
return 'EMPTY_BLOB()';
return unwrapRaw(value, true, builder, this, formatter) || '?';
}
return unwrapRaw(value, true, builder, this, formatter) || '?';
};
// Get a raw connection, called by the `pool` whenever a new
// connection needs to be added to the pool.
acquireRawConnection() {
const client = this;
const asyncConnection = new Promise(function (resolver, rejecter) {
// If external authentication don't have to worry about username/password and
// if not need to set the username and password
const oracleDbConfig = client.connectionSettings.externalAuth
? { externalAuth: client.connectionSettings.externalAuth }
: {
user: client.connectionSettings.user,
password: client.connectionSettings.password,
};
// In the case of external authentication connection string will be given
oracleDbConfig.connectString = resolveConnectString(
client.connectionSettings
);
if (client.connectionSettings.prefetchRowCount) {
oracleDbConfig.prefetchRows =
client.connectionSettings.prefetchRowCount;
}
if (client.connectionSettings.stmtCacheSize !== undefined) {
oracleDbConfig.stmtCacheSize = client.connectionSettings.stmtCacheSize;
}
client.driver.fetchAsString = client.fetchAsString;
client.driver.getConnection(oracleDbConfig, function (err, connection) {
if (err) {
return rejecter(err);
}
connection.commitAsync = function () {
return new Promise((commitResolve, commitReject) => {
this.commit(function (err) {
if (err) {
return commitReject(err);
}
commitResolve();
});
});
};
connection.rollbackAsync = function () {
return new Promise((rollbackResolve, rollbackReject) => {
this.rollback(function (err) {
if (err) {
return rollbackReject(err);
}
rollbackResolve();
});
});
};
const fetchAsync = promisify(function (sql, bindParams, options, cb) {
options = options || {};
options.outFormat =
client.driver.OUT_FORMAT_OBJECT || client.driver.OBJECT;
if (!options.outFormat) {
throw new Error('not found oracledb.outFormat constants');
}
if (options.resultSet) {
connection.execute(
sql,
bindParams || [],
options,
function (err, result) {
if (err) {
if (isConnectionError(err)) {
connection.close().catch(function (err) {});
connection.__knex__disposed = err;
}
return cb(err);
}
const fetchResult = { rows: [], resultSet: result.resultSet };
const numRows = 100;
const fetchRowsFromRS = function (
connection,
resultSet,
numRows
) {
resultSet.getRows(numRows, function (err, rows) {
if (err) {
if (isConnectionError(err)) {
connection.close().catch(function (err) {});
connection.__knex__disposed = err;
}
resultSet.close(function () {
return cb(err);
});
} else if (rows.length === 0) {
return cb(null, fetchResult);
} else if (rows.length > 0) {
if (rows.length === numRows) {
fetchResult.rows = fetchResult.rows.concat(rows);
fetchRowsFromRS(connection, resultSet, numRows);
} else {
fetchResult.rows = fetchResult.rows.concat(rows);
return cb(null, fetchResult);
}
}
});
};
fetchRowsFromRS(connection, result.resultSet, numRows);
}
);
} else {
connection.execute(
sql,
bindParams || [],
options,
function (err, result) {
if (err) {
// dispose the connection on connection error
if (isConnectionError(err)) {
connection.close().catch(function (err) {});
connection.__knex__disposed = err;
}
return cb(err);
}
return cb(null, result);
}
);
}
});
connection.executeAsync = function (sql, bindParams, options) {
// Read all lob
return fetchAsync(sql, bindParams, options).then(async (results) => {
const closeResultSet = () => {
return results.resultSet
? promisify(results.resultSet.close).call(results.resultSet)
: Promise.resolve();
};
// Collect LOBs to read
const lobs = [];
if (results.rows) {
if (Array.isArray(results.rows)) {
for (let i = 0; i < results.rows.length; i++) {
// Iterate through the rows
const row = results.rows[i];
for (const column in row) {
if (row[column] instanceof stream.Readable) {
lobs.push({ index: i, key: column, stream: row[column] });
}
}
}
}
}
try {
for (const lob of lobs) {
// todo should be fetchAsString/fetchAsBuffer polyfill only
results.rows[lob.index][lob.key] = await lobProcessing(
lob.stream
);
}
} catch (e) {
await closeResultSet().catch(() => {});
throw e;
}
await closeResultSet();
return results;
});
};
resolver(connection);
});
});
return asyncConnection;
}
// Used to explicitly close a connection, called internally by the pool
// when a connection times out or the pool is shutdown.
destroyRawConnection(connection) {
return connection.release();
}
// Runs the query on the specified connection, providing the bindings
// and any other necessary prep work.
_query(connection, obj) {
if (!obj.sql) throw new Error('The query is empty');
const options = { autoCommit: false };
if (obj.method === 'select') {
options.resultSet = true;
}
return connection
.executeAsync(obj.sql, obj.bindings, options)
.then(async function (response) {
// Flatten outBinds
let outBinds = flatten(response.outBinds);
obj.response = response.rows || [];
obj.rowsAffected = response.rows
? response.rows.rowsAffected
: response.rowsAffected;
//added for outBind parameter
if (obj.method === 'raw' && outBinds.length > 0) {
return {
response: outBinds,
};
}
if (obj.method === 'update') {
const modifiedRowsCount = obj.rowsAffected.length || obj.rowsAffected;
const updatedObjOutBinding = [];
const updatedOutBinds = [];
const updateOutBinds = (i) =>
function (value, index) {
const OutBindsOffset = index * modifiedRowsCount;
updatedOutBinds.push(outBinds[i + OutBindsOffset]);
};
for (let i = 0; i < modifiedRowsCount; i++) {
updatedObjOutBinding.push(obj.outBinding[0]);
each(obj.outBinding[0], updateOutBinds(i));
}
outBinds = updatedOutBinds;
obj.outBinding = updatedObjOutBinding;
}
if (!obj.returning && outBinds.length === 0) {
if (!connection.isTransaction) {
await connection.commitAsync();
}
return obj;
}
const rowIds = [];
let offset = 0;
for (let line = 0; line < obj.outBinding.length; line++) {
const ret = obj.outBinding[line];
offset =
offset +
(obj.outBinding[line - 1] ? obj.outBinding[line - 1].length : 0);
for (let index = 0; index < ret.length; index++) {
const out = ret[index];
await new Promise(function (bindResolver, bindRejecter) {
if (out instanceof BlobHelper) {
const blob = outBinds[index + offset];
if (out.returning) {
obj.response[line] = obj.response[line] || {};
obj.response[line][out.columnName] = out.value;
}
blob.on('error', function (err) {
bindRejecter(err);
});
blob.on('finish', function () {
bindResolver();
});
blob.write(out.value);
blob.end();
} else if (obj.outBinding[line][index] === 'ROWID') {
rowIds.push(outBinds[index + offset]);
bindResolver();
} else {
obj.response[line] = obj.response[line] || {};
obj.response[line][out] = outBinds[index + offset];
bindResolver();
}
});
}
}
if (connection.isTransaction) {
return obj;
}
await connection.commitAsync();
if (obj.returningSql) {
const response = await connection.executeAsync(
obj.returningSql(),
rowIds,
{ resultSet: true }
);
obj.response = response.rows;
}
return obj;
});
}
// Process the response as returned from the query.
processResponse(obj, runner) {
let response = obj.response;
const method = obj.method;
if (obj.output) {
return obj.output.call(runner, response);
}
switch (method) {
case 'select':
case 'pluck':
case 'first':
if (obj.method === 'pluck') {
response = map(response, obj.pluck);
}
return obj.method === 'first' ? response[0] : response;
case 'insert':
case 'del':
case 'update':
case 'counter':
if (obj.returning && !isEmpty(obj.returning)) {
if (obj.returning.length === 1 && obj.returning[0] !== '*') {
return flatten(map(response, values));
}
return response;
} else if (obj.rowsAffected !== undefined) {
return obj.rowsAffected;
} else {
return 1;
}
default:
return response;
}
}
}
Client_Oracledb.prototype.driverName = 'oracledb';
function resolveConnectString(connectionSettings) {
if (connectionSettings.connectString) {
@ -118,291 +441,6 @@ function resolveConnectString(connectionSettings) {
);
}
// Get a raw connection, called by the `pool` whenever a new
// connection needs to be added to the pool.
Client_Oracledb.prototype.acquireRawConnection = function () {
const client = this;
const asyncConnection = new Promise(function (resolver, rejecter) {
// If external authentication don't have to worry about username/password and
// if not need to set the username and password
const oracleDbConfig = client.connectionSettings.externalAuth
? { externalAuth: client.connectionSettings.externalAuth }
: {
user: client.connectionSettings.user,
password: client.connectionSettings.password,
};
// In the case of external authentication connection string will be given
oracleDbConfig.connectString = resolveConnectString(
client.connectionSettings
);
if (client.connectionSettings.prefetchRowCount) {
oracleDbConfig.prefetchRows = client.connectionSettings.prefetchRowCount;
}
if (client.connectionSettings.stmtCacheSize !== undefined) {
oracleDbConfig.stmtCacheSize = client.connectionSettings.stmtCacheSize;
}
client.driver.fetchAsString = client.fetchAsString;
client.driver.getConnection(oracleDbConfig, function (err, connection) {
if (err) {
return rejecter(err);
}
connection.commitAsync = function () {
return new Promise((commitResolve, commitReject) => {
this.commit(function (err) {
if (err) {
return commitReject(err);
}
commitResolve();
});
});
};
connection.rollbackAsync = function () {
return new Promise((rollbackResolve, rollbackReject) => {
this.rollback(function (err) {
if (err) {
return rollbackReject(err);
}
rollbackResolve();
});
});
};
const fetchAsync = promisify(function (sql, bindParams, options, cb) {
options = options || {};
options.outFormat =
client.driver.OUT_FORMAT_OBJECT || client.driver.OBJECT;
if (!options.outFormat) {
throw new Error('not found oracledb.outFormat constants');
}
if (options.resultSet) {
connection.execute(
sql,
bindParams || [],
options,
function (err, result) {
if (err) {
if (isConnectionError(err)) {
connection.close().catch(function (err) {});
connection.__knex__disposed = err;
}
return cb(err);
}
const fetchResult = { rows: [], resultSet: result.resultSet };
const numRows = 100;
const fetchRowsFromRS = function (
connection,
resultSet,
numRows
) {
resultSet.getRows(numRows, function (err, rows) {
if (err) {
if (isConnectionError(err)) {
connection.close().catch(function (err) {});
connection.__knex__disposed = err;
}
resultSet.close(function () {
return cb(err);
});
} else if (rows.length === 0) {
return cb(null, fetchResult);
} else if (rows.length > 0) {
if (rows.length === numRows) {
fetchResult.rows = fetchResult.rows.concat(rows);
fetchRowsFromRS(connection, resultSet, numRows);
} else {
fetchResult.rows = fetchResult.rows.concat(rows);
return cb(null, fetchResult);
}
}
});
};
fetchRowsFromRS(connection, result.resultSet, numRows);
}
);
} else {
connection.execute(
sql,
bindParams || [],
options,
function (err, result) {
if (err) {
// dispose the connection on connection error
if (isConnectionError(err)) {
connection.close().catch(function (err) {});
connection.__knex__disposed = err;
}
return cb(err);
}
return cb(null, result);
}
);
}
});
connection.executeAsync = function (sql, bindParams, options) {
// Read all lob
return fetchAsync(sql, bindParams, options).then(async (results) => {
const closeResultSet = () => {
return results.resultSet
? promisify(results.resultSet.close).call(results.resultSet)
: Promise.resolve();
};
// Collect LOBs to read
const lobs = [];
if (results.rows) {
if (Array.isArray(results.rows)) {
for (let i = 0; i < results.rows.length; i++) {
// Iterate through the rows
const row = results.rows[i];
for (const column in row) {
if (row[column] instanceof stream.Readable) {
lobs.push({ index: i, key: column, stream: row[column] });
}
}
}
}
}
try {
for (const lob of lobs) {
// todo should be fetchAsString/fetchAsBuffer polyfill only
results.rows[lob.index][lob.key] = await lobProcessing(
lob.stream
);
}
} catch (e) {
await closeResultSet().catch(() => {});
throw e;
}
await closeResultSet();
return results;
});
};
resolver(connection);
});
});
return asyncConnection;
};
// Used to explicitly close a connection, called internally by the pool
// when a connection times out or the pool is shutdown.
Client_Oracledb.prototype.destroyRawConnection = function (connection) {
return connection.release();
};
// Runs the query on the specified connection, providing the bindings
// and any other necessary prep work.
Client_Oracledb.prototype._query = function (connection, obj) {
if (!obj.sql) throw new Error('The query is empty');
const options = { autoCommit: false };
if (obj.method === 'select') {
options.resultSet = true;
}
return connection
.executeAsync(obj.sql, obj.bindings, options)
.then(async function (response) {
// Flatten outBinds
let outBinds = flatten(response.outBinds);
obj.response = response.rows || [];
obj.rowsAffected = response.rows
? response.rows.rowsAffected
: response.rowsAffected;
//added for outBind parameter
if (obj.method === 'raw' && outBinds.length > 0) {
return {
response: outBinds,
};
}
if (obj.method === 'update') {
const modifiedRowsCount = obj.rowsAffected.length || obj.rowsAffected;
const updatedObjOutBinding = [];
const updatedOutBinds = [];
const updateOutBinds = (i) =>
function (value, index) {
const OutBindsOffset = index * modifiedRowsCount;
updatedOutBinds.push(outBinds[i + OutBindsOffset]);
};
for (let i = 0; i < modifiedRowsCount; i++) {
updatedObjOutBinding.push(obj.outBinding[0]);
each(obj.outBinding[0], updateOutBinds(i));
}
outBinds = updatedOutBinds;
obj.outBinding = updatedObjOutBinding;
}
if (!obj.returning && outBinds.length === 0) {
if (!connection.isTransaction) {
await connection.commitAsync();
}
return obj;
}
const rowIds = [];
let offset = 0;
for (let line = 0; line < obj.outBinding.length; line++) {
const ret = obj.outBinding[line];
offset =
offset +
(obj.outBinding[line - 1] ? obj.outBinding[line - 1].length : 0);
for (let index = 0; index < ret.length; index++) {
const out = ret[index];
await new Promise(function (bindResolver, bindRejecter) {
if (out instanceof BlobHelper) {
const blob = outBinds[index + offset];
if (out.returning) {
obj.response[line] = obj.response[line] || {};
obj.response[line][out.columnName] = out.value;
}
blob.on('error', function (err) {
bindRejecter(err);
});
blob.on('finish', function () {
bindResolver();
});
blob.write(out.value);
blob.end();
} else if (obj.outBinding[line][index] === 'ROWID') {
rowIds.push(outBinds[index + offset]);
bindResolver();
} else {
obj.response[line] = obj.response[line] || {};
obj.response[line][out] = outBinds[index + offset];
bindResolver();
}
});
}
}
if (connection.isTransaction) {
return obj;
}
await connection.commitAsync();
if (obj.returningSql) {
const response = await connection.executeAsync(
obj.returningSql(),
rowIds,
{ resultSet: true }
);
obj.response = response.rows;
}
return obj;
});
};
/**
* @param stream
* @param {'string' | 'buffer'} type
@ -427,40 +465,6 @@ function readStream(stream, type) {
});
}
// Process the response as returned from the query.
Client_Oracledb.prototype.processResponse = function (obj, runner) {
let response = obj.response;
const method = obj.method;
if (obj.output) {
return obj.output.call(runner, response);
}
switch (method) {
case 'select':
case 'pluck':
case 'first':
if (obj.method === 'pluck') {
response = map(response, obj.pluck);
}
return obj.method === 'first' ? response[0] : response;
case 'insert':
case 'del':
case 'update':
case 'counter':
if (obj.returning && !isEmpty(obj.returning)) {
if (obj.returning.length === 1 && obj.returning[0] !== '*') {
return flatten(map(response, values));
}
return response;
} else if (obj.rowsAffected !== undefined) {
return obj.rowsAffected;
} else {
return 1;
}
default:
return response;
}
};
const lobProcessing = function (stream) {
const oracledb = require('oracledb');

View File

@ -2,7 +2,7 @@
// -------
const extend = require('lodash/extend');
const map = require('lodash/map');
const { promisify, inherits } = require('util');
const { promisify } = require('util');
const Client = require('../../client');
const Transaction = require('./execution/pg-transaction');
@ -13,46 +13,255 @@ const SchemaCompiler = require('./schema/pg-compiler');
const { makeEscape } = require('../../util/string');
const { isString } = require('../../util/is');
function Client_PG(config) {
Client.apply(this, arguments);
if (config.returning) {
this.defaultReturning = config.returning;
}
class Client_PG extends Client {
constructor(config) {
super(config);
if (config.returning) {
this.defaultReturning = config.returning;
}
if (config.searchPath) {
this.searchPath = config.searchPath;
if (config.searchPath) {
this.searchPath = config.searchPath;
}
}
}
inherits(Client_PG, Client);
Object.assign(Client_PG.prototype, {
transaction() {
return new Transaction(this, ...arguments);
},
}
queryCompiler(builder, formatter) {
return new QueryCompiler(this, builder, formatter);
},
}
columnCompiler() {
return new ColumnCompiler(this, ...arguments);
},
}
schemaCompiler() {
return new SchemaCompiler(this, ...arguments);
},
}
tableCompiler() {
return new TableCompiler(this, ...arguments);
},
dialect: 'postgresql',
driverName: 'pg',
}
_driver() {
return require('pg');
},
}
wrapIdentifierImpl(value) {
if (value === '*') return value;
let arrayAccessor = '';
const arrayAccessorMatch = value.match(/(.*?)(\[[0-9]+\])/);
if (arrayAccessorMatch) {
value = arrayAccessorMatch[1];
arrayAccessor = arrayAccessorMatch[2];
}
return `"${value.replace(/"/g, '""')}"${arrayAccessor}`;
}
// Get a raw connection, called by the `pool` whenever a new
// connection needs to be added to the pool.
acquireRawConnection() {
const client = this;
return new Promise(function (resolver, rejecter) {
const connection = new client.driver.Client(client.connectionSettings);
connection.connect(function (err, connection) {
if (err) {
return rejecter(err);
}
connection.on('error', (err) => {
connection.__knex__disposed = err;
});
connection.on('end', (err) => {
connection.__knex__disposed = err || 'Connection ended unexpectedly';
});
if (!client.version) {
return client.checkVersion(connection).then(function (version) {
client.version = version;
resolver(connection);
});
}
resolver(connection);
});
}).then(function setSearchPath(connection) {
client.setSchemaSearchPath(connection);
return connection;
});
}
// Used to explicitly close a connection, called internally by the pool
// when a connection times out or the pool is shutdown.
async destroyRawConnection(connection) {
const end = promisify((cb) => connection.end(cb));
return end();
}
// In PostgreSQL, we need to do a version check to do some feature
// checking on the database.
checkVersion(connection) {
return new Promise(function (resolver, rejecter) {
connection.query('select version();', function (err, resp) {
if (err) return rejecter(err);
resolver(/^PostgreSQL (.*?)( |$)/.exec(resp.rows[0].version)[1]);
});
});
}
// Position the bindings for the query. The escape sequence for question mark
// is \? (e.g. knex.raw("\\?") since javascript requires '\' to be escaped too...)
positionBindings(sql) {
let questionCount = 0;
return sql.replace(/(\\*)(\?)/g, function (match, escapes) {
if (escapes.length % 2) {
return '?';
} else {
questionCount++;
return `$${questionCount}`;
}
});
}
setSchemaSearchPath(connection, searchPath) {
let path = searchPath || this.searchPath;
if (!path) return Promise.resolve(true);
if (!Array.isArray(path) && !isString(path)) {
throw new TypeError(
`knex: Expected searchPath to be Array/String, got: ${typeof path}`
);
}
if (isString(path)) {
if (path.includes(',')) {
const parts = path.split(',');
const arraySyntax = `[${parts
.map((searchPath) => `'${searchPath}'`)
.join(', ')}]`;
this.logger.warn(
`Detected comma in searchPath "${path}".` +
`If you are trying to specify multiple schemas, use Array syntax: ${arraySyntax}`
);
}
path = [path];
}
path = path.map((schemaName) => `"${schemaName}"`).join(',');
return new Promise(function (resolver, rejecter) {
connection.query(`set search_path to ${path}`, function (err) {
if (err) return rejecter(err);
resolver(true);
});
});
}
_stream(connection, obj, stream, options) {
const PGQueryStream = process.browser
? undefined
: require('pg-query-stream');
const sql = obj.sql;
return new Promise(function (resolver, rejecter) {
const queryStream = connection.query(
new PGQueryStream(sql, obj.bindings, options)
);
queryStream.on('error', function (error) {
rejecter(error);
stream.emit('error', error);
});
// 'end' IS propagated by .pipe, by default
stream.on('end', resolver);
queryStream.pipe(stream);
});
}
// Runs the query on the specified connection, providing the bindings
// and any other necessary prep work.
_query(connection, obj) {
let queryConfig = {
text: obj.sql,
values: obj.bindings || [],
};
if (obj.options) {
queryConfig = extend(queryConfig, obj.options);
}
return new Promise(function (resolver, rejecter) {
connection.query(queryConfig, function (err, response) {
if (err) return rejecter(err);
obj.response = response;
resolver(obj);
});
});
}
// Ensures the response is returned in the same format as other clients.
processResponse(obj, runner) {
const resp = obj.response;
if (obj.output) return obj.output.call(runner, resp);
if (obj.method === 'raw') return resp;
const { returning } = obj;
if (resp.command === 'SELECT') {
if (obj.method === 'first') return resp.rows[0];
if (obj.method === 'pluck') return map(resp.rows, obj.pluck);
return resp.rows;
}
if (returning) {
const returns = [];
for (let i = 0, l = resp.rows.length; i < l; i++) {
const row = resp.rows[i];
if (returning === '*' || Array.isArray(returning)) {
returns[i] = row;
} else {
// Pluck the only column in the row.
returns[i] = row[Object.keys(row)[0]];
}
}
return returns;
}
if (resp.command === 'UPDATE' || resp.command === 'DELETE') {
return resp.rowCount;
}
return resp;
}
async cancelQuery(connectionToKill) {
// Error out if we can't acquire connection in time.
// Purposely not putting timeout on `pg_cancel_backend` execution because erroring
// early there would release the `connectionToKill` back to the pool with
// a `KILL QUERY` command yet to finish.
const conn = await this.acquireConnection();
try {
return await this._wrappedCancelQueryCall(conn, connectionToKill);
} finally {
// NOT returning this promise because we want to release the connection
// in a non-blocking fashion
this.releaseConnection(conn);
}
}
_wrappedCancelQueryCall(conn, connectionToKill) {
return this.query(conn, {
method: 'raw',
sql: 'SELECT pg_cancel_backend(?);',
bindings: [connectionToKill.processID],
options: {},
});
}
}
Object.assign(Client_PG.prototype, {
dialect: 'postgresql',
driverName: 'pg',
canCancelQuery: true,
_escapeBinding: makeEscape({
escapeArray(val, esc) {
@ -92,215 +301,6 @@ Object.assign(Client_PG.prototype, {
return JSON.stringify(val);
},
}),
wrapIdentifierImpl(value) {
if (value === '*') return value;
let arrayAccessor = '';
const arrayAccessorMatch = value.match(/(.*?)(\[[0-9]+\])/);
if (arrayAccessorMatch) {
value = arrayAccessorMatch[1];
arrayAccessor = arrayAccessorMatch[2];
}
return `"${value.replace(/"/g, '""')}"${arrayAccessor}`;
},
// Get a raw connection, called by the `pool` whenever a new
// connection needs to be added to the pool.
acquireRawConnection() {
const client = this;
return new Promise(function (resolver, rejecter) {
const connection = new client.driver.Client(client.connectionSettings);
connection.connect(function (err, connection) {
if (err) {
return rejecter(err);
}
connection.on('error', (err) => {
connection.__knex__disposed = err;
});
connection.on('end', (err) => {
connection.__knex__disposed = err || 'Connection ended unexpectedly';
});
if (!client.version) {
return client.checkVersion(connection).then(function (version) {
client.version = version;
resolver(connection);
});
}
resolver(connection);
});
}).then(function setSearchPath(connection) {
client.setSchemaSearchPath(connection);
return connection;
});
},
// Used to explicitly close a connection, called internally by the pool
// when a connection times out or the pool is shutdown.
async destroyRawConnection(connection) {
const end = promisify((cb) => connection.end(cb));
return end();
},
// In PostgreSQL, we need to do a version check to do some feature
// checking on the database.
checkVersion(connection) {
return new Promise(function (resolver, rejecter) {
connection.query('select version();', function (err, resp) {
if (err) return rejecter(err);
resolver(/^PostgreSQL (.*?)( |$)/.exec(resp.rows[0].version)[1]);
});
});
},
// Position the bindings for the query. The escape sequence for question mark
// is \? (e.g. knex.raw("\\?") since javascript requires '\' to be escaped too...)
positionBindings(sql) {
let questionCount = 0;
return sql.replace(/(\\*)(\?)/g, function (match, escapes) {
if (escapes.length % 2) {
return '?';
} else {
questionCount++;
return `$${questionCount}`;
}
});
},
setSchemaSearchPath(connection, searchPath) {
let path = searchPath || this.searchPath;
if (!path) return Promise.resolve(true);
if (!Array.isArray(path) && !isString(path)) {
throw new TypeError(
`knex: Expected searchPath to be Array/String, got: ${typeof path}`
);
}
if (isString(path)) {
if (path.includes(',')) {
const parts = path.split(',');
const arraySyntax = `[${parts
.map((searchPath) => `'${searchPath}'`)
.join(', ')}]`;
this.logger.warn(
`Detected comma in searchPath "${path}".` +
`If you are trying to specify multiple schemas, use Array syntax: ${arraySyntax}`
);
}
path = [path];
}
path = path.map((schemaName) => `"${schemaName}"`).join(',');
return new Promise(function (resolver, rejecter) {
connection.query(`set search_path to ${path}`, function (err) {
if (err) return rejecter(err);
resolver(true);
});
});
},
_stream(connection, obj, stream, options) {
const PGQueryStream = process.browser
? undefined
: require('pg-query-stream');
const sql = obj.sql;
return new Promise(function (resolver, rejecter) {
const queryStream = connection.query(
new PGQueryStream(sql, obj.bindings, options)
);
queryStream.on('error', function (error) {
rejecter(error);
stream.emit('error', error);
});
// 'end' IS propagated by .pipe, by default
stream.on('end', resolver);
queryStream.pipe(stream);
});
},
// Runs the query on the specified connection, providing the bindings
// and any other necessary prep work.
_query(connection, obj) {
let queryConfig = {
text: obj.sql,
values: obj.bindings || [],
};
if (obj.options) {
queryConfig = extend(queryConfig, obj.options);
}
return new Promise(function (resolver, rejecter) {
connection.query(queryConfig, function (err, response) {
if (err) return rejecter(err);
obj.response = response;
resolver(obj);
});
});
},
// Ensures the response is returned in the same format as other clients.
processResponse(obj, runner) {
const resp = obj.response;
if (obj.output) return obj.output.call(runner, resp);
if (obj.method === 'raw') return resp;
const { returning } = obj;
if (resp.command === 'SELECT') {
if (obj.method === 'first') return resp.rows[0];
if (obj.method === 'pluck') return map(resp.rows, obj.pluck);
return resp.rows;
}
if (returning) {
const returns = [];
for (let i = 0, l = resp.rows.length; i < l; i++) {
const row = resp.rows[i];
if (returning === '*' || Array.isArray(returning)) {
returns[i] = row;
} else {
// Pluck the only column in the row.
returns[i] = row[Object.keys(row)[0]];
}
}
return returns;
}
if (resp.command === 'UPDATE' || resp.command === 'DELETE') {
return resp.rowCount;
}
return resp;
},
canCancelQuery: true,
async cancelQuery(connectionToKill) {
// Error out if we can't acquire connection in time.
// Purposely not putting timeout on `pg_cancel_backend` execution because erroring
// early there would release the `connectionToKill` back to the pool with
// a `KILL QUERY` command yet to finish.
const conn = await this.acquireConnection();
try {
return await this._wrappedCancelQueryCall(conn, connectionToKill);
} finally {
// NOT returning this promise because we want to release the connection
// in a non-blocking fashion
this.releaseConnection(conn);
}
},
_wrappedCancelQueryCall(conn, connectionToKill) {
return this.query(conn, {
method: 'raw',
sql: 'SELECT pg_cancel_backend(?);',
bindings: [connectionToKill.processID],
options: {},
});
},
});
function arrayString(arr, esc) {

View File

@ -97,7 +97,7 @@ class TableCompiler_PG extends TableCompiler {
}
} else {
// base class implementation for normal add
TableCompiler.prototype.addColumns.call(this, columns, prefix);
super.addColumns(columns, prefix);
}
}

View File

@ -1,6 +1,5 @@
// Redshift
// -------
const { inherits } = require('util');
const Client_PG = require('../postgres');
const map = require('lodash/map');
@ -11,43 +10,34 @@ const ColumnCompiler = require('./schema/redshift-columncompiler');
const TableCompiler = require('./schema/redshift-tablecompiler');
const SchemaCompiler = require('./schema/redshift-compiler');
function Client_Redshift(config) {
Client_PG.apply(this, arguments);
}
inherits(Client_Redshift, Client_PG);
Object.assign(Client_Redshift.prototype, {
class Client_Redshift extends Client_PG {
transaction() {
return new Transaction(this, ...arguments);
},
}
queryCompiler(builder, formatter) {
return new QueryCompiler(this, builder, formatter);
},
}
columnBuilder() {
return new ColumnBuilder(this, ...arguments);
},
}
columnCompiler() {
return new ColumnCompiler(this, ...arguments);
},
}
tableCompiler() {
return new TableCompiler(this, ...arguments);
},
}
schemaCompiler() {
return new SchemaCompiler(this, ...arguments);
},
dialect: 'redshift',
driverName: 'pg-redshift',
}
_driver() {
return require('pg');
},
}
// Ensures the response is returned in the same format as other clients.
processResponse(obj, runner) {
@ -67,7 +57,13 @@ Object.assign(Client_Redshift.prototype, {
return resp.rowCount;
}
return resp;
},
}
}
Object.assign(Client_Redshift.prototype, {
dialect: 'redshift',
driverName: 'pg-redshift',
});
module.exports = Client_Redshift;

View File

@ -8,7 +8,7 @@ class ColumnBuilder_Redshift extends ColumnBuilder {
// primary needs to set not null on non-preexisting columns, or fail
primary() {
this.notNullable();
return ColumnBuilder.prototype.primary.apply(this, arguments);
return super.primary(...arguments);
}
index() {

View File

@ -2,7 +2,7 @@
// -------
const defaults = require('lodash/defaults');
const map = require('lodash/map');
const { promisify, inherits } = require('util');
const { promisify } = require('util');
const Client = require('../../client');
@ -14,55 +14,49 @@ const TableCompiler = require('./schema/sqlite-tablecompiler');
const SQLite3_DDL = require('./schema/ddl');
const SQLite3_Formatter = require('./sqlite-formatter');
function Client_SQLite3(config) {
Client.call(this, config);
if (config.useNullAsDefault === undefined) {
this.logger.warn(
'sqlite does not support inserting default values. Set the ' +
'`useNullAsDefault` flag to hide this warning. ' +
'(see docs http://knexjs.org/#Builder-insert).'
);
class Client_SQLite3 extends Client {
constructor(config) {
super(config);
if (config.useNullAsDefault === undefined) {
this.logger.warn(
'sqlite does not support inserting default values. Set the ' +
'`useNullAsDefault` flag to hide this warning. ' +
'(see docs http://knexjs.org/#Builder-insert).'
);
}
}
}
inherits(Client_SQLite3, Client);
Object.assign(Client_SQLite3.prototype, {
dialect: 'sqlite3',
driverName: 'sqlite3',
_driver() {
return require('sqlite3');
},
}
schemaCompiler() {
return new SchemaCompiler(this, ...arguments);
},
}
transaction() {
return new Transaction(this, ...arguments);
},
}
queryCompiler(builder, formatter) {
return new SqliteQueryCompiler(this, builder, formatter);
},
}
columnCompiler() {
return new ColumnCompiler(this, ...arguments);
},
}
tableCompiler() {
return new TableCompiler(this, ...arguments);
},
}
ddl(compiler, pragma, connection) {
return new SQLite3_DDL(this, compiler, pragma, connection);
},
}
wrapIdentifierImpl(value) {
return value !== '*' ? `\`${value.replace(/`/g, '``')}\`` : '*';
},
}
// Get a raw connection from the database, returning a promise with the connection object.
acquireRawConnection() {
@ -77,14 +71,14 @@ Object.assign(Client_SQLite3.prototype, {
}
);
});
},
}
// Used to explicitly close a connection, called internally by the pool when
// a connection times out or the pool is shutdown.
async destroyRawConnection(connection) {
const close = promisify((cb) => connection.close(cb));
return close();
},
}
// Runs the query on the specified connection, providing the bindings and any
// other necessary prep work.
@ -117,7 +111,7 @@ Object.assign(Client_SQLite3.prototype, {
return resolver(obj);
});
});
},
}
_stream(connection, sql, stream) {
const client = this;
@ -135,7 +129,7 @@ Object.assign(Client_SQLite3.prototype, {
stream.end();
});
});
},
}
// Ensures the response is returned in the same format as other clients.
processResponse(obj, runner) {
@ -157,18 +151,21 @@ Object.assign(Client_SQLite3.prototype, {
default:
return response;
}
},
}
poolDefaults() {
return defaults(
{ min: 1, max: 1 },
Client.prototype.poolDefaults.call(this)
);
},
return defaults({ min: 1, max: 1 }, super.poolDefaults());
}
formatter() {
return new SQLite3_Formatter(this, ...arguments);
},
}
}
Object.assign(Client_SQLite3.prototype, {
dialect: 'sqlite3',
driverName: 'sqlite3',
});
module.exports = Client_SQLite3;

View File

@ -3,16 +3,6 @@ const assert = require('assert');
// JoinClause
// -------
// The "JoinClause" is an object holding any necessary info about a join,
// including the type, and any associated tables & columns being joined.
function JoinClause(table, type, schema) {
this.schema = schema;
this.table = table;
this.joinType = type;
this.and = this;
this.clauses = [];
}
function getClauseFromArguments(compilerType, bool, first, operator, second) {
let data = null;
@ -51,8 +41,20 @@ function getClauseFromArguments(compilerType, bool, first, operator, second) {
return data;
}
Object.assign(JoinClause.prototype, {
grouping: 'join',
// The "JoinClause" is an object holding any necessary info about a join,
// including the type, and any associated tables & columns being joined.
class JoinClause {
constructor(table, type, schema) {
this.schema = schema;
this.table = table;
this.joinType = type;
this.and = this;
this.clauses = [];
}
get or() {
return this._bool('or');
}
// Adds an "on" clause to the current join object.
on(first) {
@ -73,17 +75,17 @@ Object.assign(JoinClause.prototype, {
}
return this;
},
}
// Adds a "using" clause to the current join.
using(column) {
return this.clauses.push({ type: 'onUsing', column, bool: this._bool() });
},
}
// Adds an "or on" clause to the current join object.
orOn(first, operator, second) {
return this._bool('or').on.apply(this, arguments);
},
}
onVal(first) {
if (typeof first === 'object' && typeof first.toSQL !== 'function') {
@ -103,15 +105,15 @@ Object.assign(JoinClause.prototype, {
}
return this;
},
}
andOnVal() {
return this.onVal(...arguments);
},
}
orOnVal() {
return this._bool('or').onVal(...arguments);
},
}
onBetween(column, values) {
assert(
@ -130,19 +132,19 @@ Object.assign(JoinClause.prototype, {
not: this._not(),
});
return this;
},
}
onNotBetween(column, values) {
return this._not(true).onBetween(column, values);
},
}
orOnBetween(column, values) {
return this._bool('or').onBetween(column, values);
},
}
orOnNotBetween(column, values) {
return this._bool('or')._not(true).onBetween(column, values);
},
}
onIn(column, values) {
if (Array.isArray(values) && values.length === 0) return this.on(1, '=', 0);
@ -154,19 +156,19 @@ Object.assign(JoinClause.prototype, {
bool: this._bool(),
});
return this;
},
}
onNotIn(column, values) {
return this._not(true).onIn(column, values);
},
}
orOnIn(column, values) {
return this._bool('or').onIn(column, values);
},
}
orOnNotIn(column, values) {
return this._bool('or')._not(true).onIn(column, values);
},
}
onNull(column) {
this.clauses.push({
@ -176,19 +178,19 @@ Object.assign(JoinClause.prototype, {
bool: this._bool(),
});
return this;
},
}
orOnNull(callback) {
return this._bool('or').onNull(callback);
},
}
onNotNull(callback) {
return this._not(true).onNull(callback);
},
}
orOnNotNull(callback) {
return this._not(true)._bool('or').onNull(callback);
},
}
onExists(callback) {
this.clauses.push({
@ -198,25 +200,25 @@ Object.assign(JoinClause.prototype, {
bool: this._bool(),
});
return this;
},
}
orOnExists(callback) {
return this._bool('or').onExists(callback);
},
}
onNotExists(callback) {
return this._not(true).onExists(callback);
},
}
orOnNotExists(callback) {
return this._not(true)._bool('or').onExists(callback);
},
}
// Explicitly set the type of join, useful within a function when creating a grouped join.
type(type) {
this.joinType = type;
return this;
},
}
_bool(bool) {
if (arguments.length === 1) {
@ -226,7 +228,7 @@ Object.assign(JoinClause.prototype, {
const ret = this._boolFlag || 'and';
this._boolFlag = 'and';
return ret;
},
}
_not(val) {
if (arguments.length === 1) {
@ -236,13 +238,11 @@ Object.assign(JoinClause.prototype, {
const ret = this._notFlag;
this._notFlag = false;
return ret;
},
});
}
}
Object.defineProperty(JoinClause.prototype, 'or', {
get() {
return this._bool('or');
},
Object.assign(JoinClause.prototype, {
grouping: 'join',
});
JoinClause.prototype.andOn = JoinClause.prototype.on;

View File

@ -271,6 +271,15 @@ class Builder extends EventEmitter {
return this._joinType('raw').join.apply(this, arguments);
}
// Where modifiers:
get or() {
return this._bool('or');
}
get not() {
return this._not(true);
}
// The where function can be used in several ways:
// The most basic is `where(key, value)`, which expands to
// where key = value.
@ -1397,18 +1406,6 @@ const validateWithArgs = function (alias, statement, method) {
);
};
Object.defineProperty(Builder.prototype, 'or', {
get() {
return this._bool('or');
},
});
Object.defineProperty(Builder.prototype, 'not', {
get() {
return this._not(true);
},
});
Builder.prototype.select = Builder.prototype.columns;
Builder.prototype.column = Builder.prototype.columns;
Builder.prototype.andWhereNot = Builder.prototype.whereNot;

View File

@ -1,6 +1,5 @@
// Raw
// -------
const { inherits } = require('util');
const { EventEmitter } = require('events');
const debug = require('debug');
const assign = require('lodash/assign');
@ -17,24 +16,23 @@ const {
const debugBindings = debug('knex:bindings');
function Raw(client) {
this.client = client;
class Raw extends EventEmitter {
constructor(client) {
super();
this.sql = '';
this.bindings = [];
this.client = client;
// Todo: Deprecate
this._wrappedBefore = undefined;
this._wrappedAfter = undefined;
if (client && client.config) {
this._debug = client.config.debug;
saveAsyncStack(this, 4);
this.sql = '';
this.bindings = [];
// Todo: Deprecate
this._wrappedBefore = undefined;
this._wrappedAfter = undefined;
if (client && client.config) {
this._debug = client.config.debug;
saveAsyncStack(this, 4);
}
}
}
inherits(Raw, EventEmitter);
assign(Raw.prototype, {
set(sql, bindings) {
this.sql = sql;
this.bindings =
@ -43,7 +41,7 @@ assign(Raw.prototype, {
: [bindings];
return this;
},
}
timeout(ms, { cancel } = {}) {
if (isNumber(ms) && ms > 0) {
@ -54,19 +52,19 @@ assign(Raw.prototype, {
}
}
return this;
},
}
// Wraps the current sql with `before` and `after`.
wrap(before, after) {
this._wrappedBefore = before;
this._wrappedAfter = after;
return this;
},
}
// Calls `toString` on the Knex object.
toString() {
return this.toQuery();
},
}
// Returns the raw sql for the query.
toSQL(method, tz) {
@ -125,8 +123,8 @@ assign(Raw.prototype, {
});
return obj;
},
});
}
}
function replaceRawArrBindings(raw, formatter) {
const expectedBindings = raw.bindings.length;

View File

@ -1,4 +1,3 @@
const { inherits } = require('util');
const Knex = require('../../lib/index');
const QueryBuilder = require('../../lib/query/querybuilder');
const { expect } = require('chai');
@ -353,11 +352,7 @@ describe('knex', () => {
it('throws if client module has not been installed', () => {
// create dummy dialect which always fails when trying to load driver
const SqliteClient = require(`../../lib/dialects/sqlite3/index.js`);
function ClientFoobar(config) {
SqliteClient.call(this, config);
}
inherits(ClientFoobar, SqliteClient);
class ClientFoobar extends SqliteClient {}
ClientFoobar.prototype._driver = () => {
throw new Error('Cannot require...');