knex/lib/dialects/oracle/stream.js

51 lines
1.3 KiB
JavaScript
Raw Normal View History

'use strict';
2014-08-11 12:25:39 +02:00
/*jslint node:true, nomen: true*/
var inherits = require('inherits')
var merge = require('lodash/object/merge')
var Readable = require('stream').Readable
2014-08-11 12:25:39 +02:00
function OracleQueryStream(connection, sql, bindings, options) {
Readable.call(this, merge({}, {
objectMode: true,
highWaterMark: 1000
}, options))
this.oracleReader = connection.reader(sql, bindings || [])
2014-08-11 12:25:39 +02:00
}
inherits(OracleQueryStream, Readable)
2014-08-11 12:25:39 +02:00
2014-11-26 17:45:11 -06:00
OracleQueryStream.prototype._read = function() {
var stream = this;
2014-11-26 17:45:11 -06:00
function pushNull() {
process.nextTick(function() {
stream.push(null)
})
2014-11-26 17:45:11 -06:00
}
try {
this.oracleReader.nextRows(function(err, rows) {
if (err) return stream.emit('error', err)
2014-11-26 17:45:11 -06:00
if (rows.length === 0) {
pushNull()
2014-11-26 17:45:11 -06:00
} else {
for (var i = 0; i < rows.length; i++) {
if (rows[i]) {
stream.push(rows[i])
2014-11-26 17:45:11 -06:00
} else {
pushNull()
2014-11-26 17:45:11 -06:00
}
}
}
})
2014-11-26 17:45:11 -06:00
} catch (e) {
// Catch Error: invalid state: reader is busy with another nextRows call
// and return false to rate limit stream.
if (e.message ===
'invalid state: reader is busy with another nextRows call') {
return false
2014-11-26 17:45:11 -06:00
} else {
this.emit('error', e)
2014-11-26 17:45:11 -06:00
}
}
}
2014-11-26 17:45:11 -06:00
module.exports = OracleQueryStream