mirror of
https://github.com/knex/knex.git
synced 2025-12-27 15:08:47 +00:00
Beginning to use =>
This commit is contained in:
parent
521a6f9e97
commit
e3ef628156
@ -16,21 +16,24 @@ function OracleQueryStream(connection, sql, bindings, options) {
|
||||
inherits(OracleQueryStream, Readable);
|
||||
|
||||
OracleQueryStream.prototype._read = function () {
|
||||
var stream = this;
|
||||
var _this = this;
|
||||
|
||||
function pushNull() {
|
||||
var _this2 = this;
|
||||
|
||||
process.nextTick(function () {
|
||||
stream.push(null);
|
||||
_this2.push(null);
|
||||
});
|
||||
}
|
||||
try {
|
||||
this.oracleReader.nextRows(function (err, rows) {
|
||||
if (err) return stream.emit('error', err);
|
||||
if (err) return _this.emit('error', err);
|
||||
if (rows.length === 0) {
|
||||
pushNull();
|
||||
} else {
|
||||
for (var i = 0; i < rows.length; i++) {
|
||||
if (rows[i]) {
|
||||
stream.push(rows[i]);
|
||||
_this.push(rows[i]);
|
||||
} else {
|
||||
pushNull();
|
||||
}
|
||||
|
||||
@ -15,6 +15,7 @@ var debug = require('debug')('knex:tx');
|
||||
// Acts as a facade for a Promise, keeping the internal state
|
||||
// and managing any child transactions.
|
||||
function Transaction(client, container, config, outerTx) {
|
||||
var _this = this;
|
||||
|
||||
var txid = this.txid = uniqueId('trx');
|
||||
|
||||
@ -25,15 +26,13 @@ function Transaction(client, container, config, outerTx) {
|
||||
|
||||
debug('%s: Starting %s transaction', txid, outerTx ? 'nested' : 'top level');
|
||||
|
||||
var t = this;
|
||||
|
||||
this._promise = Promise.using(this.acquireConnection(client, config, txid), function (connection) {
|
||||
|
||||
var trxClient = t.trxClient = makeTxClient(t, client, connection);
|
||||
var init = client.transacting ? t.savepoint(connection) : t.begin(connection);
|
||||
var trxClient = _this.trxClient = makeTxClient(_this, client, connection);
|
||||
var init = client.transacting ? _this.savepoint(connection) : _this.begin(connection);
|
||||
|
||||
init.then(function () {
|
||||
return makeTransactor(t, connection, trxClient);
|
||||
return makeTransactor(_this, connection, trxClient);
|
||||
}).then(function (transactor) {
|
||||
|
||||
var result = container(transactor);
|
||||
@ -51,8 +50,8 @@ function Transaction(client, container, config, outerTx) {
|
||||
});
|
||||
|
||||
return new Promise(function (resolver, rejecter) {
|
||||
t._resolver = resolver;
|
||||
t._rejecter = rejecter;
|
||||
_this._resolver = resolver;
|
||||
_this._rejecter = rejecter;
|
||||
});
|
||||
});
|
||||
|
||||
@ -115,18 +114,19 @@ assign(Transaction.prototype, {
|
||||
},
|
||||
|
||||
query: function query(conn, sql, status, value) {
|
||||
var t = this;
|
||||
var _this2 = this;
|
||||
|
||||
var q = this.trxClient.query(conn, sql)['catch'](function (err) {
|
||||
status = 2;
|
||||
value = err;
|
||||
t._completed = true;
|
||||
debug('%s error running transaction query', t.txid);
|
||||
_this2._completed = true;
|
||||
debug('%s error running transaction query', _this2.txid);
|
||||
}).tap(function () {
|
||||
if (status === 1) t._resolver(value);
|
||||
if (status === 2) t._rejecter(value);
|
||||
if (status === 1) _this2._resolver(value);
|
||||
if (status === 2) _this2._rejecter(value);
|
||||
});
|
||||
if (status === 1 || status === 2) {
|
||||
t._completed = true;
|
||||
this._completed = true;
|
||||
}
|
||||
return q;
|
||||
},
|
||||
|
||||
@ -14,21 +14,20 @@ function OracleQueryStream(connection, sql, bindings, options) {
|
||||
inherits(OracleQueryStream, Readable)
|
||||
|
||||
OracleQueryStream.prototype._read = function() {
|
||||
var stream = this;
|
||||
function pushNull() {
|
||||
process.nextTick(function() {
|
||||
stream.push(null)
|
||||
process.nextTick(() => {
|
||||
this.push(null)
|
||||
})
|
||||
}
|
||||
try {
|
||||
this.oracleReader.nextRows(function(err, rows) {
|
||||
if (err) return stream.emit('error', err)
|
||||
this.oracleReader.nextRows((err, rows) => {
|
||||
if (err) return this.emit('error', err)
|
||||
if (rows.length === 0) {
|
||||
pushNull()
|
||||
} else {
|
||||
for (var i = 0; i < rows.length; i++) {
|
||||
if (rows[i]) {
|
||||
stream.push(rows[i])
|
||||
this.push(rows[i])
|
||||
} else {
|
||||
pushNull()
|
||||
}
|
||||
|
||||
@ -23,17 +23,15 @@ function Transaction(client, container, config, outerTx) {
|
||||
|
||||
debug('%s: Starting %s transaction', txid, outerTx ? 'nested' : 'top level')
|
||||
|
||||
var t = this
|
||||
|
||||
this._promise = Promise.using(this.acquireConnection(client, config, txid), function(connection) {
|
||||
this._promise = Promise.using(this.acquireConnection(client, config, txid), (connection) => {
|
||||
|
||||
var trxClient = t.trxClient = makeTxClient(t, client, connection)
|
||||
var init = client.transacting ? t.savepoint(connection) : t.begin(connection)
|
||||
var trxClient = this.trxClient = makeTxClient(this, client, connection)
|
||||
var init = client.transacting ? this.savepoint(connection) : this.begin(connection)
|
||||
|
||||
init.then(function() {
|
||||
return makeTransactor(t, connection, trxClient)
|
||||
init.then(() => {
|
||||
return makeTransactor(this, connection, trxClient)
|
||||
})
|
||||
.then(function(transactor) {
|
||||
.then((transactor) => {
|
||||
|
||||
var result = container(transactor)
|
||||
|
||||
@ -41,19 +39,19 @@ function Transaction(client, container, config, outerTx) {
|
||||
// and it's got the transaction object we're running for this, assume
|
||||
// the rollback and commit are chained to this object's success / failure.
|
||||
if (result && result.then && typeof result.then === 'function') {
|
||||
result.then(function(val) {
|
||||
result.then((val) => {
|
||||
transactor.commit(val)
|
||||
})
|
||||
.catch(function(err) {
|
||||
.catch((err) => {
|
||||
transactor.rollback(err)
|
||||
})
|
||||
}
|
||||
|
||||
})
|
||||
|
||||
return new Promise(function(resolver, rejecter) {
|
||||
t._resolver = resolver
|
||||
t._rejecter = rejecter
|
||||
return new Promise((resolver, rejecter) => {
|
||||
this._resolver = resolver
|
||||
this._rejecter = rejecter
|
||||
})
|
||||
})
|
||||
|
||||
@ -118,20 +116,19 @@ assign(Transaction.prototype, {
|
||||
},
|
||||
|
||||
query: function(conn, sql, status, value) {
|
||||
var t = this
|
||||
var q = this.trxClient.query(conn, sql)
|
||||
.catch(function(err) {
|
||||
.catch((err) => {
|
||||
status = 2
|
||||
value = err
|
||||
t._completed = true
|
||||
debug('%s error running transaction query', t.txid)
|
||||
this._completed = true
|
||||
debug('%s error running transaction query', this.txid)
|
||||
})
|
||||
.tap(function() {
|
||||
if (status === 1) t._resolver(value)
|
||||
if (status === 2) t._rejecter(value)
|
||||
.tap(() => {
|
||||
if (status === 1) this._resolver(value)
|
||||
if (status === 2) this._rejecter(value)
|
||||
})
|
||||
if (status === 1 || status === 2) {
|
||||
t._completed = true
|
||||
this._completed = true
|
||||
}
|
||||
return q;
|
||||
},
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user