2016-06-20 17:03:52 +02:00
|
|
|
const Utils = require('../oracle/utils');
|
2021-10-20 23:35:51 +03:00
|
|
|
const { promisify } = require('util');
|
|
|
|
const stream = require('stream');
|
2016-06-20 17:03:52 +02:00
|
|
|
|
|
|
|
function BlobHelper(columnName, value) {
|
|
|
|
this.columnName = columnName;
|
|
|
|
this.value = value;
|
|
|
|
this.returning = false;
|
|
|
|
}
|
|
|
|
|
2020-04-19 00:40:23 +02:00
|
|
|
BlobHelper.prototype.toString = function () {
|
2016-06-20 17:03:52 +02:00
|
|
|
return '[object BlobHelper:' + this.columnName + ']';
|
|
|
|
};
|
|
|
|
|
2021-10-20 23:35:51 +03:00
|
|
|
/**
|
|
|
|
* @param stream
|
|
|
|
* @param {'string' | 'buffer'} type
|
|
|
|
*/
|
|
|
|
function readStream(stream, type) {
|
|
|
|
return new Promise((resolve, reject) => {
|
|
|
|
let data = type === 'string' ? '' : Buffer.alloc(0);
|
|
|
|
|
|
|
|
stream.on('error', function (err) {
|
|
|
|
reject(err);
|
|
|
|
});
|
|
|
|
stream.on('data', function (chunk) {
|
|
|
|
if (type === 'string') {
|
|
|
|
data += chunk;
|
|
|
|
} else {
|
|
|
|
data = Buffer.concat([data, chunk]);
|
|
|
|
}
|
|
|
|
});
|
|
|
|
stream.on('end', function () {
|
|
|
|
resolve(data);
|
|
|
|
});
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
const lobProcessing = function (stream) {
|
|
|
|
const oracledb = require('oracledb');
|
|
|
|
|
|
|
|
/**
|
|
|
|
* @type 'string' | 'buffer'
|
|
|
|
*/
|
|
|
|
let type;
|
|
|
|
|
|
|
|
if (stream.type) {
|
|
|
|
// v1.2-v4
|
|
|
|
if (stream.type === oracledb.BLOB) {
|
|
|
|
type = 'buffer';
|
|
|
|
} else if (stream.type === oracledb.CLOB) {
|
|
|
|
type = 'string';
|
|
|
|
}
|
|
|
|
} else if (stream.iLob) {
|
|
|
|
// v1
|
|
|
|
if (stream.iLob.type === oracledb.CLOB) {
|
|
|
|
type = 'string';
|
|
|
|
} else if (stream.iLob.type === oracledb.BLOB) {
|
|
|
|
type = 'buffer';
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
throw new Error('Unrecognized oracledb lob stream type');
|
|
|
|
}
|
|
|
|
if (type === 'string') {
|
|
|
|
stream.setEncoding('utf-8');
|
|
|
|
}
|
|
|
|
return readStream(stream, type);
|
|
|
|
};
|
|
|
|
|
|
|
|
function monkeyPatchConnection(connection, client) {
|
|
|
|
// Connection is already monkey-patched
|
|
|
|
if (connection.executeAsync) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
connection.commitAsync = function () {
|
|
|
|
return new Promise((commitResolve, commitReject) => {
|
|
|
|
this.commit(function (err) {
|
|
|
|
if (err) {
|
|
|
|
return commitReject(err);
|
|
|
|
}
|
|
|
|
commitResolve();
|
|
|
|
});
|
|
|
|
});
|
|
|
|
};
|
|
|
|
connection.rollbackAsync = function () {
|
|
|
|
return new Promise((rollbackResolve, rollbackReject) => {
|
|
|
|
this.rollback(function (err) {
|
|
|
|
if (err) {
|
|
|
|
return rollbackReject(err);
|
|
|
|
}
|
|
|
|
rollbackResolve();
|
|
|
|
});
|
|
|
|
});
|
|
|
|
};
|
|
|
|
const fetchAsync = promisify(function (sql, bindParams, options, cb) {
|
|
|
|
options = options || {};
|
|
|
|
options.outFormat = client.driver.OUT_FORMAT_OBJECT || client.driver.OBJECT;
|
|
|
|
if (!options.outFormat) {
|
|
|
|
throw new Error('not found oracledb.outFormat constants');
|
|
|
|
}
|
|
|
|
if (options.resultSet) {
|
|
|
|
connection.execute(
|
|
|
|
sql,
|
|
|
|
bindParams || [],
|
|
|
|
options,
|
|
|
|
function (err, result) {
|
|
|
|
if (err) {
|
|
|
|
if (Utils.isConnectionError(err)) {
|
|
|
|
connection.close().catch(function (err) {});
|
|
|
|
connection.__knex__disposed = err;
|
|
|
|
}
|
|
|
|
return cb(err);
|
|
|
|
}
|
|
|
|
const fetchResult = { rows: [], resultSet: result.resultSet };
|
|
|
|
const numRows = 100;
|
|
|
|
const fetchRowsFromRS = function (connection, resultSet, numRows) {
|
|
|
|
resultSet.getRows(numRows, function (err, rows) {
|
|
|
|
if (err) {
|
|
|
|
if (Utils.isConnectionError(err)) {
|
|
|
|
connection.close().catch(function (err) {});
|
|
|
|
connection.__knex__disposed = err;
|
|
|
|
}
|
|
|
|
resultSet.close(function () {
|
|
|
|
return cb(err);
|
|
|
|
});
|
|
|
|
} else if (rows.length === 0) {
|
|
|
|
return cb(null, fetchResult);
|
|
|
|
} else if (rows.length > 0) {
|
|
|
|
if (rows.length === numRows) {
|
|
|
|
fetchResult.rows = fetchResult.rows.concat(rows);
|
|
|
|
fetchRowsFromRS(connection, resultSet, numRows);
|
|
|
|
} else {
|
|
|
|
fetchResult.rows = fetchResult.rows.concat(rows);
|
|
|
|
return cb(null, fetchResult);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
|
|
|
};
|
|
|
|
fetchRowsFromRS(connection, result.resultSet, numRows);
|
|
|
|
}
|
|
|
|
);
|
|
|
|
} else {
|
|
|
|
connection.execute(
|
|
|
|
sql,
|
|
|
|
bindParams || [],
|
|
|
|
options,
|
|
|
|
function (err, result) {
|
|
|
|
if (err) {
|
|
|
|
// dispose the connection on connection error
|
|
|
|
if (Utils.isConnectionError(err)) {
|
|
|
|
connection.close().catch(function (err) {});
|
|
|
|
connection.__knex__disposed = err;
|
|
|
|
}
|
|
|
|
return cb(err);
|
|
|
|
}
|
|
|
|
|
|
|
|
return cb(null, result);
|
|
|
|
}
|
|
|
|
);
|
|
|
|
}
|
|
|
|
});
|
|
|
|
connection.executeAsync = function (sql, bindParams, options) {
|
|
|
|
// Read all lob
|
|
|
|
return fetchAsync(sql, bindParams, options).then(async (results) => {
|
|
|
|
const closeResultSet = () => {
|
|
|
|
return results.resultSet
|
|
|
|
? promisify(results.resultSet.close).call(results.resultSet)
|
|
|
|
: Promise.resolve();
|
|
|
|
};
|
|
|
|
|
|
|
|
// Collect LOBs to read
|
|
|
|
const lobs = [];
|
|
|
|
if (results.rows) {
|
|
|
|
if (Array.isArray(results.rows)) {
|
|
|
|
for (let i = 0; i < results.rows.length; i++) {
|
|
|
|
// Iterate through the rows
|
|
|
|
const row = results.rows[i];
|
|
|
|
for (const column in row) {
|
|
|
|
if (row[column] instanceof stream.Readable) {
|
|
|
|
lobs.push({ index: i, key: column, stream: row[column] });
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
try {
|
|
|
|
for (const lob of lobs) {
|
|
|
|
// todo should be fetchAsString/fetchAsBuffer polyfill only
|
|
|
|
results.rows[lob.index][lob.key] = await lobProcessing(lob.stream);
|
|
|
|
}
|
|
|
|
} catch (e) {
|
|
|
|
await closeResultSet().catch(() => {});
|
|
|
|
|
|
|
|
throw e;
|
|
|
|
}
|
|
|
|
|
|
|
|
await closeResultSet();
|
|
|
|
|
|
|
|
return results;
|
|
|
|
});
|
|
|
|
};
|
|
|
|
}
|
|
|
|
|
2016-06-20 17:03:52 +02:00
|
|
|
Utils.BlobHelper = BlobHelper;
|
2021-10-20 23:35:51 +03:00
|
|
|
Utils.monkeyPatchConnection = monkeyPatchConnection;
|
2016-06-20 17:03:52 +02:00
|
|
|
module.exports = Utils;
|