From c2ed9e85aacefc38c3676bc9347b6cef671e0b29 Mon Sep 17 00:00:00 2001 From: he-is-harry Date: Wed, 16 Apr 2025 10:36:52 -0400 Subject: [PATCH 1/2] Added isValid connection health check - Added initial isValid(timeout) check which will return true if the client can send a request to the server and receive a reply within the timeout - timeout is given in seconds - Updated Connection.send so that it can accept a communicationTimeout option to timeout a request to the server and return a timeout error - Added unit and integration tests for isValid --- lib/Client.js | 5 +++ lib/protocol/Connection.js | 82 ++++++++++++++++++++++++++---------- lib/protocol/ExecuteTask.js | 20 +++++---- test/acceptance/db.Events.js | 50 ++++++++++++++++++++++ test/hdb.Client.js | 16 +++---- test/lib.Connection.js | 74 +++++++++++++++++++++++++++----- test/lib.ExecuteTask.js | 2 +- 7 files changed, 200 insertions(+), 49 deletions(-) diff --git a/lib/Client.js b/lib/Client.js index 3f367cc..547083a 100644 --- a/lib/Client.js +++ b/lib/Client.js @@ -260,6 +260,11 @@ Client.prototype.setClientInfo = function setClientInfo(key, val) { } }; +Client.prototype.isValid = function isValid(timeout, cb) { + // Timeout is inputted in seconds, convert to milliseconds + this._connection.isValid(timeout * 1000, cb); +} + Client.prototype._execute = function _execute(command, options, cb) { var result = this._createResult(this._connection, options); this._connection.executeDirect({ diff --git a/lib/protocol/Connection.js b/lib/protocol/Connection.js index 1b998e1..2cfd87e 100644 --- a/lib/protocol/Connection.js +++ b/lib/protocol/Connection.js @@ -250,6 +250,13 @@ Connection.prototype._addListeners = function _addListeners(socket) { socket.removeListener('close', onclose); } + function clearStateTimeout() { + if (self._state.timeoutObject) { + clearTimeout(self._state.timeoutObject); + self._state.timeoutObject = undefined; + } + } + // register listerners on socket function ondata(chunk) { packet.push(chunk); @@ -258,17 +265,24 @@ Connection.prototype._addListeners = function _addListeners(socket) { self._state.sessionId = packet.header.sessionId; self._state.packetCount = -1; } - var buffer = packet.getData(); - packet.clear(); - var cb = self._state.receive; - self._state.receive = undefined; - self._state.messageType = undefined; - self.receive(buffer, cb); + // Ensure the packet corresponds to the current request + if (self._state.packetCount === -1 || self._state.packetCount === packet.header.packetCount) { + var buffer = packet.getData(); + packet.clear(); + var cb = self._state.receive; + self._state.receive = undefined; + self._state.messageType = undefined; + clearStateTimeout(); + self.receive(buffer, cb); + } else { + packet.clear(); + } } } socket.on('data', ondata); function onerror(err) { + clearStateTimeout(); var cb = self._state && self._state.receive; if (cb) { self._state.receive = null; // a callback should be called only once @@ -313,7 +327,7 @@ Connection.prototype._clearQueue = function _clearQueue(err) { } }; -Connection.prototype.send = function send(message, receive) { +Connection.prototype.send = function send(message, options, receive) { if (this._statementContext) { message.unshift(PartKind.STATEMENT_CONTEXT, this._statementContext.getOptions()); } @@ -354,6 +368,14 @@ Connection.prototype.send = function send(message, receive) { if (this._socket) { this._socket.write(packet); } + var self = this; + function onTimeout() { + self._state.receive = undefined; + receive(new Error('Socket receive timeout (receive took longer than ' + options.communicationTimeout + ' ms)')); + } + if (options.communicationTimeout) { + state.timeoutObject = setTimeout(onTimeout, options.communicationTimeout); + } }; @@ -422,7 +444,7 @@ Connection.prototype.receive = function receive(buffer, cb) { } }; -Connection.prototype.enqueue = function enqueue(task, cb) { +Connection.prototype.enqueue = function enqueue(task, options, cb) { var queueable; if (!this._socket || !this._queue || this.readyState === 'closed') { @@ -439,7 +461,7 @@ Connection.prototype.enqueue = function enqueue(task, cb) { queueable.name = task.name; } else if (util.isObject(task)) { if (task instanceof request.Segment) { - queueable = this._queue.createTask(this.send.bind(this, task), cb); + queueable = this._queue.createTask(this.send.bind(this, task, options), cb); queueable.name = MessageTypeName[task.type]; } else if (util.isFunction(task.run)) { queueable = task; @@ -574,7 +596,7 @@ Connection.prototype.connect = function connect(options, cb) { clientId: self.clientId, connectOptions: self.connectOptions.getOptions(), useCesu8: self.useCesu8 - }), connReceive); + }), options, connReceive); }); } @@ -588,7 +610,7 @@ Connection.prototype.connect = function connect(options, cb) { authOptions.dbConnectInfo = true; } - this.send(request.authenticate(authOptions), authReceive); + this.send(request.authenticate(authOptions), options, authReceive); }; Connection.prototype.disconnect = function disconnect(cb) { @@ -600,7 +622,7 @@ Connection.prototype.disconnect = function disconnect(cb) { } function enqueueDisconnect() { - self.enqueue(request.disconnect(), done); + self.enqueue(request.disconnect(), {}, done); } if (this.isIdle()) { @@ -616,7 +638,7 @@ Connection.prototype.executeDirect = function executeDirect(options, cb) { scrollableCursor: this.scrollableCursor, useCesu8: this.useCesu8 }, options); - this.enqueue(request.executeDirect(options), cb); + this.enqueue(request.executeDirect(options), options, cb); }; Connection.prototype.prepare = function prepare(options, cb) { @@ -625,7 +647,7 @@ Connection.prototype.prepare = function prepare(options, cb) { scrollableCursor: this.scrollableCursor, useCesu8: this.useCesu8 }, options); - this.enqueue(request.prepare(options), cb); + this.enqueue(request.prepare(options), options, cb); }; Connection.prototype.readLob = function readLob(options, cb) { @@ -635,7 +657,7 @@ Connection.prototype.readLob = function readLob(options, cb) { }; } options.autoCommit = this.autoCommit; - this.enqueue(request.readLob(options), cb); + this.enqueue(request.readLob(options), options, cb); }; Connection.prototype.execute = function execute(options, cb) { @@ -646,7 +668,7 @@ Connection.prototype.execute = function execute(options, cb) { parameters: EMPTY_BUFFER }, options); if (options.parameters === EMPTY_BUFFER) { - return this.enqueue(request.execute(options), cb); + return this.enqueue(request.execute(options), options, cb); } this.enqueue(createExecuteTask(this, options, cb)); }; @@ -654,16 +676,16 @@ Connection.prototype.execute = function execute(options, cb) { Connection.prototype.fetchNext = function fetchNext(options, cb) { options.autoCommit = this.autoCommit; options.useCesu8 = this.useCesu8; - this.enqueue(request.fetchNext(options), cb); + this.enqueue(request.fetchNext(options), options, cb); }; Connection.prototype.closeResultSet = function closeResultSet(options, cb) { - this.enqueue(request.closeResultSet(options), cb); + this.enqueue(request.closeResultSet(options), options, cb); }; Connection.prototype.dropStatement = function dropStatement(options, cb) { options.useCesu8 = this.useCesu8; - this.enqueue(request.dropStatementId(options), cb); + this.enqueue(request.dropStatementId(options), options, cb); }; Connection.prototype.commit = function commit(options, cb) { @@ -671,7 +693,7 @@ Connection.prototype.commit = function commit(options, cb) { cb = options; options = {}; } - this.enqueue(request.commit(options), cb); + this.enqueue(request.commit(options), options, cb); }; Connection.prototype.rollback = function rollback(options, cb) { @@ -679,7 +701,7 @@ Connection.prototype.rollback = function rollback(options, cb) { cb = options; options = {}; } - this.enqueue(request.rollback(options), cb); + this.enqueue(request.rollback(options), options, cb); }; // The function doesn't use the queue. It's used before the queue starts running @@ -689,7 +711,7 @@ Connection.prototype.fetchDbConnectInfo = function (options, cb) { err.code = 'EHDBCLOSE'; return cb(err) } - this.send(request.dbConnectInfo(options), function(err, reply) { + this.send(request.dbConnectInfo(options), options, function(err, reply) { if (err) { return cb(err); } @@ -728,6 +750,21 @@ Connection.prototype.destroy = function destroy(err) { } }; +Connection.prototype.isValid = function isValid(timeout, cb) { + var options = { + command: 'SELECT 1 FROM DUMMY WHERE 1 = 0', + communicationTimeout: timeout > 0 ? timeout : undefined, + } + this.executeDirect(options, function (err, reply) { + if (err) { + // Currently, isValid will swallow all errors and indicate invalid + cb(null, false); + } else { + cb(null, true); + } + }); +} + Connection.prototype.isIdle = function isIdle() { return this._queue.empty && !this._queue.busy; }; @@ -759,6 +796,7 @@ function ConnectionState() { this.packetCount = -1; this.messageType = undefined; this.receive = undefined; + this.timeoutObject = undefined; } function Version(major, minor) { diff --git a/lib/protocol/ExecuteTask.js b/lib/protocol/ExecuteTask.js index 4df380e..163272f 100644 --- a/lib/protocol/ExecuteTask.js +++ b/lib/protocol/ExecuteTask.js @@ -203,14 +203,15 @@ ExecuteTask.prototype.sendExecute = function sendExecute(cb) { if (err) { return cb(err); } - self.connection.send(request.execute({ + var options = { autoCommit: self.autoCommit, holdCursorsOverCommit: self.holdCursorsOverCommit, scrollableCursor: self.scrollableCursor, statementId: self.statementId, parameters: parameters, useCesu8: self.connection.useCesu8 - }), cb); + }; + self.connection.send(request.execute(options), options, cb); }); }; @@ -221,24 +222,27 @@ ExecuteTask.prototype.sendWriteLobRequest = function sendWriteLobRequest(cb) { if (err) { return cb(err); } - self.connection.send(request.writeLob({ + var options = { writeLobRequest: buffer - }), cb); + }; + self.connection.send(request.writeLob(options), options, cb); }); }; ExecuteTask.prototype.sendCommit = function sendCommit(cb) { var self = this; - self.connection.send(request.commit({ + var options = { holdCursorsOverCommit: self.holdCursorsOverCommit - }), cb); + }; + self.connection.send(request.commit(options), options, cb); }; ExecuteTask.prototype.sendRollback = function sendRollback(cb) { var self = this; - self.connection.send(request.rollback({ + var options = { holdCursorsOverCommit: self.holdCursorsOverCommit - }), cb); + }; + self.connection.send(request.rollback(options), options, cb); }; function createInvalidFunctionCodeError() { diff --git a/test/acceptance/db.Events.js b/test/acceptance/db.Events.js index 03650e5..6960b5c 100644 --- a/test/acceptance/db.Events.js +++ b/test/acceptance/db.Events.js @@ -14,6 +14,8 @@ 'use strict'; var db = require('../db')(); +var RemoteDB = require('../db/RemoteDB'); +var describeRemoteDB = db instanceof RemoteDB ? describe : describe.skip; describe('db', function () { @@ -59,4 +61,52 @@ describe('db', function () { }); }); + describeRemoteDB('isValid', function () { + before(db.init.bind(db)); + after(function (done) { + if (client.readyState !== 'closed') { + db.end(done); + } else { + done(); + } + }); + var client = db.client; + + it('should be valid when connected', function (done) { + client.isValid(0, function (err, ret) { // no timeout + if (err) done(err); + ret.should.be.true(); + done(); + }) + }); + + it('should be valid with timeout when connected', function (done) { + client.isValid(1, function (err, ret) { // 1 second timeout + if (err) done(err); + ret.should.be.true(); + done(); + }); + }); + + it('should be invalid when disconnected', function (done) { + client.exec('SELECT CURRENT_CONNECTION FROM DUMMY', function (err, res) { + var connId = res[0].CURRENT_CONNECTION; + var adminDB = require('../db')(); + adminDB.init(function (err) { + if (err) done(err); + var adminClient = adminDB.client; + var disconnectSQL = "ALTER SYSTEM DISCONNECT SESSION '" + connId + "'"; + adminClient.exec(disconnectSQL, function (err) { + if (err) done(err); + client.isValid(0, function (err, ret) { // disconnected + if (err) done(err); + ret.should.be.false(); + adminDB.end(done); + }); + }); + }); + }); + }); + }); + }); diff --git a/test/hdb.Client.js b/test/hdb.Client.js index 705388c..8d6a88e 100644 --- a/test/hdb.Client.js +++ b/test/hdb.Client.js @@ -173,7 +173,7 @@ describe('hdb', function () { client._connection.getClientInfo().shouldSend(lib.common.MessageType.EXECUTE).should.eql(true); client._connection.getClientInfo().getProperty("SESSVAR1").should.equal("TESTVAR1"); client._connection.getClientInfo().getProperty("SESSVAR2").should.equal("TESTVAR2"); - client._connection.send(new lib.request.Segment(lib.common.MessageType.EXECUTE), null); + client._connection.send(new lib.request.Segment(lib.common.MessageType.EXECUTE), {}, null); client._connection.getClientInfo().shouldSend(lib.common.MessageType.EXECUTE).should.eql(false); client._connection.getClientInfo().getProperty("SESSVAR1").should.equal("TESTVAR1"); client._connection.getClientInfo().getProperty("SESSVAR2").should.equal("TESTVAR2"); @@ -188,14 +188,14 @@ describe('hdb', function () { client.setClientInfo("VARKEY1", "VARVAL1"); client._connection.getClientInfo().shouldSend(lib.common.MessageType.EXECUTE).should.eql(true); client._connection.getClientInfo().getProperty("VARKEY1").should.equal("VARVAL1"); - client._connection.send(new lib.request.Segment(lib.common.MessageType.EXECUTE), null); + client._connection.send(new lib.request.Segment(lib.common.MessageType.EXECUTE), {}, null); client._connection.getClientInfo().shouldSend(lib.common.MessageType.EXECUTE).should.eql(false); client.setClientInfo("VARKEY2", "VARVAL2"); client._connection.getClientInfo().shouldSend(lib.common.MessageType.EXECUTE).should.eql(true); client._connection.getClientInfo().getProperty("VARKEY1").should.equal("VARVAL1"); client._connection.getClientInfo().getProperty("VARKEY2").should.equal("VARVAL2"); - client._connection.send(new lib.request.Segment(lib.common.MessageType.EXECUTE), null); + client._connection.send(new lib.request.Segment(lib.common.MessageType.EXECUTE), {}, null); client._connection.getClientInfo().shouldSend(lib.common.MessageType.EXECUTE).should.eql(false); done(); }); @@ -211,7 +211,7 @@ describe('hdb', function () { } var sendCount = 0; - var mock_send = function(data, cb) { + var mock_send = function(data, options, cb) { sendCount += 1; if (sendCount === 1) { data.parts.forEach(function(part) { @@ -263,7 +263,7 @@ describe('hdb', function () { } var sendCount = 0; - var mock_send = function(data, cb) { + var mock_send = function(data, options, cb) { sendCount += 1; if (sendCount === 1) { data.parts.forEach(function(part) { @@ -313,7 +313,7 @@ describe('hdb', function () { } var sendCount = 0; - var mock_send = function(data, cb) { + var mock_send = function(data, options, cb) { sendCount += 1; if (sendCount === 1) { cb(undefined, mock_auth_reply); @@ -1104,7 +1104,7 @@ describe('hdb', function () { { name: 3, type: 3, value: 30041 } ] }; - var mock_send = function (data, cb) { + var mock_send = function (data, options, cb) { ++sendCount; if (sendCount == 1) { cb(new Error(), reply1); @@ -1224,7 +1224,7 @@ describe('hdb', function () { connectOptions: [] }; - var mock_send = function (data, cb) { + var mock_send = function (data, options, cb) { ++sendCount; if (sendCount == 1) { cb(undefined, reply1); diff --git a/test/lib.Connection.js b/test/lib.Connection.js index 3754d56..81ae210 100644 --- a/test/lib.Connection.js +++ b/test/lib.Connection.js @@ -46,7 +46,7 @@ function getAuthenticationPart(req) { }).shift().args; } -function sendAuthenticationRequest(req, done) { +function sendAuthenticationRequest(req, options, done) { var reply = { authentication: getAuthenticationPart(req) }; @@ -77,7 +77,7 @@ describe('Lib', function () { connection.open({}, function () { connection.getClientInfo().setProperty('LOCALE', 'en_US'); connection.getClientInfo().shouldSend(MessageType.EXECUTE).should.eql(true); - connection.send(new lib.request.Segment(MessageType.EXECUTE), null); + connection.send(new lib.request.Segment(MessageType.EXECUTE), {}, null); connection.getClientInfo().shouldSend(MessageType.EXECUTE).should.eql(false); connection.getClientInfo().getProperty('LOCALE').should.eql('en_US'); done(); @@ -221,7 +221,7 @@ describe('Lib', function () { it('should destroy socket after disconnect', function (done) { var connection = createConnection(); - connection.enqueue = function enqueue(msg, cb) { + connection.enqueue = function enqueue(msg, options, cb) { msg.type.should.equal(MessageType.DISCONNECT); setImmediate(function () { cb(); @@ -405,7 +405,8 @@ describe('Lib', function () { it('should report error in enqueue when connection is invalid', function (done) { var connection = createConnection(); connection._queue.pause(); - connection.enqueue(function firstTask() { }, function (err) { + var options = {}; + connection.enqueue(function firstTask() { }, options, function (err) { err.code.should.equal('EHDBCLOSE'); done(); }); @@ -413,7 +414,7 @@ describe('Lib', function () { it('should rollback a transaction', function () { var connection = createConnection(); - connection.enqueue = function enqueue(msg, done) { + connection.enqueue = function enqueue(msg, options, done) { done(msg); }; @@ -426,7 +427,7 @@ describe('Lib', function () { it('should commit a transaction', function () { var connection = createConnection(); - connection.enqueue = function enqueue(msg, done) { + connection.enqueue = function enqueue(msg, options, done) { done(msg); }; @@ -439,7 +440,7 @@ describe('Lib', function () { it('should execute a statement', function () { var connection = createConnection(); - connection.enqueue = function enqueue(msg, done) { + connection.enqueue = function enqueue(msg, options, done) { done(msg); }; @@ -457,7 +458,7 @@ describe('Lib', function () { connection._socket = { readyState: 'open' }; - connection.send = function (msg, cb) { + connection.send = function (msg, options, cb) { var segment = new lib.reply.Segment(segmentData.kind, segmentData.functionCode); var part = segmentData.parts[0]; segment.push(new lib.reply.Part(part.kind, part.attributes, part.argumentCount, part.buffer)); @@ -481,7 +482,7 @@ describe('Lib', function () { connection._socket = { readyState: 'open' }; - connection.send = function (msg, cb) { + connection.send = function (msg, options, cb) { cb(new Error('Request was not successful')); }; @@ -577,7 +578,7 @@ describe('Lib', function () { it('should fail to disconnect from the database', function (done) { var connection = createConnection(); var error = new Error('DISCONNECT_ERROR'); - connection.enqueue = function enqueue(msg, cb) { + connection.enqueue = function enqueue(msg, options, cb) { msg.type.should.equal(MessageType.DISCONNECT); setImmediate(function () { cb(error); @@ -617,6 +618,59 @@ describe('Lib', function () { }) }); + it('should receive the correct packet after timeout', function (done) { + var connection = createConnection(); + connection._parseReplySegment = function parseReplySegment(reply) { + return reply; + } + + var messageHeader = Buffer.from( + '0000000000000000' + // Session id + '00000000' + // Packet count + '01000000' + // Varpart length + '21000000' + // Varpart size + '0100' + // Number of segments + '00000000000000000000', // Extra options + 'hex'); + // Note that for simplicity the packet body is not valid + var firstReply = Buffer.concat([messageHeader, Buffer.from('01', 'hex')]); + messageHeader[8] = 1; // Update packet count + var secondReply = Buffer.concat([messageHeader, Buffer.from('02', 'hex')]); + + connection.open({}, function (err) { + (!!err).should.be.not.ok; + + // Overwrite MockSocket write + var firstWrite = true; + connection._socket.write = function write() { + if (firstWrite) { + firstWrite = false; + } else { + var self = this; + setImmediate(function () { + self.emit('data', firstReply); + self.emit('data', secondReply); + }); + } + } + + // Mimic connect + connection._queue.resume(); + connection._state.sessionId = 0; + + // 1 ms timeout + connection.isValid(1, function (err, ret) { + (!!err).should.be.not.ok; + ret.should.be.false(); // MockSocket does not write back yet + connection.executeDirect({ command: 'second sql' }, function (err, reply) { + if (err) done(err); + reply.should.eql(Buffer.from('02', 'hex')); + done(); + }); + }); + }); + }); + context('cesu-8 support', function() { it('should create a connection with a useCesu8 set correctly', function () { diff --git a/test/lib.ExecuteTask.js b/test/lib.ExecuteTask.js index c8074ad..e59bc6d 100644 --- a/test/lib.ExecuteTask.js +++ b/test/lib.ExecuteTask.js @@ -647,7 +647,7 @@ function Connection(size, sizeForLobs, replies) { this.useCesu8 = true; } -Connection.prototype.send = function (msg, cb) { +Connection.prototype.send = function (msg, options, cb) { var reply = this.replies.shift(); msg.type.should.equal(reply.type); if (typeof reply.checkMessage === 'function') { From d532e30d11d24a057d072ab17baa7ecffe745b6a Mon Sep 17 00:00:00 2001 From: he-is-harry Date: Mon, 21 Apr 2025 11:12:00 -0400 Subject: [PATCH 2/2] Added communicationTimeout options - Added the connect option communicationTimeout which indicates in milliseconds how long the client will wait for a response to individual packet requests before timing out and producing an error - The connect option will affect all packet requests to the server including packets sent when connecting, fetches, write lobs, commits and rollbacks - Note that although the client may timeout, the packet can still reach the server, i.e. commit packets can commit on the server despite a timeout on the client side - Added the communicationTimeout to the options field of Client.exec, Client.execute, Client.prepare, Statement.exec and Statement.execute - Will timeout packet requests associated with the function after the given milliseconds - Updated the unit and integration tests to test and include the communicationTimeout option --- lib/Client.js | 6 +- lib/protocol/Connection.js | 21 ++- lib/protocol/ExecuteTask.js | 7 +- lib/protocol/Statement.js | 3 +- test/acceptance/db.Events.js | 255 +++++++++++++++++++++++++++++++++++ test/hdb.Client.js | 11 +- test/lib.Connection.js | 18 +++ test/lib.Statement.js | 1 + 8 files changed, 311 insertions(+), 11 deletions(-) diff --git a/lib/Client.js b/lib/Client.js index 547083a..2e7b2ec 100644 --- a/lib/Client.js +++ b/lib/Client.js @@ -268,7 +268,8 @@ Client.prototype.isValid = function isValid(timeout, cb) { Client.prototype._execute = function _execute(command, options, cb) { var result = this._createResult(this._connection, options); this._connection.executeDirect({ - command: command + command: command, + communicationTimeout: options.communicationTimeout }, function handleReply(err, reply) { result.handle(err, reply, cb); }); @@ -278,7 +279,8 @@ Client.prototype._execute = function _execute(command, options, cb) { Client.prototype._prepare = function _prepare(command, options, cb) { var statement = this._createStatement(this._connection, options); this._connection.prepare({ - command: command + command: command, + communicationTimeout: options.communicationTimeout }, function handleReply(err, reply) { statement.handle(err, reply, cb); }); diff --git a/lib/protocol/Connection.js b/lib/protocol/Connection.js index 2cfd87e..db7162f 100644 --- a/lib/protocol/Connection.js +++ b/lib/protocol/Connection.js @@ -369,12 +369,19 @@ Connection.prototype.send = function send(message, options, receive) { this._socket.write(packet); } var self = this; + var communicationTimeout; function onTimeout() { self._state.receive = undefined; - receive(new Error('Socket receive timeout (receive took longer than ' + options.communicationTimeout + ' ms)')); + var err = new Error('Socket receive timeout (receive took longer than ' + communicationTimeout + ' ms)'); + err.code = 'EHDBTIMEOUT'; + receive(err); } if (options.communicationTimeout) { - state.timeoutObject = setTimeout(onTimeout, options.communicationTimeout); + communicationTimeout = options.communicationTimeout; + state.timeoutObject = setTimeout(onTimeout, communicationTimeout); + } else if (this._settings.communicationTimeout) { + communicationTimeout = this._settings.communicationTimeout; + state.timeoutObject = setTimeout(onTimeout, communicationTimeout); } }; @@ -529,6 +536,12 @@ Connection.prototype.connect = function connect(options, cb) { function connReceive(err, reply) { if (err) { + if (err.code === 'EHDBTIMEOUT') { + // With connect timeouts, the connection should be reset to a closed state + self.destroy(); + // Convert to connect timeout to match hana-client + return cb(new Error("Connect failed (connect timeout expired)")); + } return cb(err); } if (Array.isArray(reply.connectOptions)) { @@ -545,6 +558,10 @@ Connection.prototype.connect = function connect(options, cb) { function authReceive(err, reply) { if (err) { + if (err.code === 'EHDBTIMEOUT') { + self.destroy(); + return cb(new Error("Connect failed (connect timeout expired)")); + } return cb(err, reply); } manager.initialize(reply.authentication, function(err) { diff --git a/lib/protocol/ExecuteTask.js b/lib/protocol/ExecuteTask.js index 163272f..4279abb 100644 --- a/lib/protocol/ExecuteTask.js +++ b/lib/protocol/ExecuteTask.js @@ -30,6 +30,7 @@ function ExecuteTask(connection, options, callback) { this.scrollableCursor = options.scrollableCursor; this.statementId = options.statementId; this.functionCode = options.functionCode; + this.communicationTimeout = options.communicationTimeout; this.writer = new Writer(options.parameters.types, connection.useCesu8, connection.spatialTypes); var values = options.parameters.values; if (values.length && Array.isArray(values[0])) { @@ -209,7 +210,8 @@ ExecuteTask.prototype.sendExecute = function sendExecute(cb) { scrollableCursor: self.scrollableCursor, statementId: self.statementId, parameters: parameters, - useCesu8: self.connection.useCesu8 + useCesu8: self.connection.useCesu8, + communicationTimeout: self.communicationTimeout }; self.connection.send(request.execute(options), options, cb); }); @@ -223,7 +225,8 @@ ExecuteTask.prototype.sendWriteLobRequest = function sendWriteLobRequest(cb) { return cb(err); } var options = { - writeLobRequest: buffer + writeLobRequest: buffer, + communicationTimeout: self.communicationTimeout }; self.connection.send(request.writeLob(options), options, cb); }); diff --git a/lib/protocol/Statement.js b/lib/protocol/Statement.js index b16e7d4..9a83e2d 100644 --- a/lib/protocol/Statement.js +++ b/lib/protocol/Statement.js @@ -148,7 +148,8 @@ Statement.prototype._execute = function _execute(values, options, cb) { this._connection.execute({ functionCode: this.functionCode, statementId: this.id, - parameters: inputParams + parameters: inputParams, + communicationTimeout: options.communicationTimeout }, function handleReply(err, reply) { result.handle(err, reply, cb); }); diff --git a/test/acceptance/db.Events.js b/test/acceptance/db.Events.js index 6960b5c..f9ca75e 100644 --- a/test/acceptance/db.Events.js +++ b/test/acceptance/db.Events.js @@ -13,9 +13,12 @@ // language governing permissions and limitations under the License. 'use strict'; +var fs = require('fs'); +var path = require('path'); var db = require('../db')(); var RemoteDB = require('../db/RemoteDB'); var describeRemoteDB = db instanceof RemoteDB ? describe : describe.skip; +var isRemoteDB = db instanceof RemoteDB; describe('db', function () { @@ -109,4 +112,256 @@ describe('db', function () { }); }); + describeRemoteDB('communicationTimeout', function () { + var client = db.client; + + describeRemoteDB('no timeout', function () { + describeRemoteDB('execute option', function () { + before(db.init.bind(db)); + after(db.end.bind(db)); + + it('should client exec with no timeout set', function (done) { + client.exec('SELECT * FROM DUMMY', {communicationTimeout: 0}, function (err, rows) { + if (err) done(err); + rows.should.have.length(1); + done(); + }); + }); + + it('should prepare and exec with a timeout set', function (done) { + client.prepare('SELECT * FROM DUMMY', {communicationTimeout: 2000}, function (err, stmt) { + if (err) done(err); + stmt.exec([], {communicationTimeout: 2000}, function (err, rows) { + if (err) done(err); + rows.should.have.length(1); + done(); + }); + }); + }); + }); + + describeRemoteDB('connect option', function () { + before(function (done) { + client.set('communicationTimeout', 2000); + db.init(done); + }); + after(db.end.bind(db)); + + it('should prepare and execute with a timeout set', function (done) { + client.prepare('SELECT 1 AS A FROM DUMMY', function (err, stmt) { + if (err) done(err); + stmt.execute([], function (err, rs) { + if (err) done(err); + rs.fetch(function (err, rows) { + if (err) done(err); + rows.should.eql([{A: 1}]) + done(); + }); + }); + }); + }); + }); + }); + + describeRemoteDB('timeout', function () { + // Setup a db connection which will delay packet sends when delayCountdown equals 0. + // When delayCountdown < 0, there is no delay. + var delayCountdown = -1; + var delayedDB, delayedClient; + + // When the client disconnects, reconnecting will create a new connection which will + // overwrite the _connect we made to delay, so we have to make a new client + function createDelayedDB(packetDelay) { + delayedDB = require('../db')(); + delayedClient = delayedDB.client; + var originalConnect = delayedClient._connection._connect; + delayedClient._connection._connect = function (options, cb) { + var socket = originalConnect(options, cb); + var originalWrite = socket.write; + + socket.write = function (data) { + if (delayCountdown == 0) { + setTimeout(function () { + originalWrite.call(socket, data); + }, packetDelay); + } else { + if (delayCountdown > 0) { + delayCountdown--; + } + originalWrite.call(socket, data); + } + } + return socket; + } + } + + // Since delayedDB is undefined to begin, we encapsulate the end in a function + function endDelayedDB(done) { + delayedDB.end(done); + } + + // Tests that a timeout error was raised, currently timeouts are 25ms + function validateTimeout(timeout, done) { + // Return a closure which is the callback to check the timeout error + return function (err) { + err.should.be.an.instanceOf(Error); + err.message.should.equal("Socket receive timeout (receive took longer than " + timeout + " ms)"); + done(); + } + } + + // Reset the delay countdown after each test + afterEach(function () { + delayCountdown = -1; + }); + + it('should timeout a connect', function (done) { + createDelayedDB(1000); // 1000ms packet delay + // Initialization request timeout is set by initializationTimeout, communicationTimeout + // will trigger a timeout in the authentication packet and connect packet (3rd packet) + // Here we test the timeout case on the 3rd packet, unit test will test 2nd packet + delayCountdown = 2; + delayedClient.set('communicationTimeout', 500); + delayedDB.init(function (err) { // Connect errors are overwritten to match hana-client + err.should.be.an.instanceOf(Error); + err.message.should.equal("Connect failed (connect timeout expired)"); + delayedClient.readyState.should.equal('closed'); + done(); + }); + }); + + describeRemoteDB('execute option', function () { + before(function (done) { + createDelayedDB(120); + delayedClient.set('communicationTimeout', 2000); + delayedDB.init(done); + }); + after(endDelayedDB); + + + it('should timeout a client exec', function (done) { + delayCountdown = 0; // Delay immediately + delayedClient.exec('SELECT * FROM DUMMY', {communicationTimeout: 100}, validateTimeout(100, done)); + }); + + it('should timeout a client execute', function (done) { + delayCountdown = 0; + delayedClient.execute('SELECT * FROM DUMMY', {communicationTimeout: 100}, validateTimeout(100, done)); + }); + + it('should timeout a prepare', function (done) { + delayCountdown = 0; + delayedClient.prepare('SELECT * FROM DUMMY', {communicationTimeout: 100}, validateTimeout(100, done)); + }); + + it('should timeout a statement exec', function (done) { + delayedClient.prepare('SELECT * FROM DUMMY', function (err, stmt) { + if (err) done(err); + delayCountdown = 0; + stmt.exec([], {communicationTimeout: 100}, validateTimeout(100, done)); + }); + }); + + it('should timeout a statement execute', function (done) { + delayedClient.prepare('SELECT * FROM DUMMY', function (err, stmt) { + if (err) done(err); + delayCountdown = 0; + stmt.execute([], {communicationTimeout: 100}, validateTimeout(100, done)); + }); + }); + + it('should give the correct query result after a timeout', function (done) { + delayedClient.exec('SELECT 1 AS A FROM DUMMY', {communicationTimeout: 25}, validateTimeout(25, function () { + setTimeout(function () { + delayedClient.exec('SELECT 2 AS A FROM DUMMY', function (err, rows) { + if (err) done(err); + rows.should.eql([{A: 2}]); + done(); + }); + }, 25); // Wait the rest of the time so that the first request is sent + })); + }); + }); + + describeRemoteDB('write lob', function () { + before(function (done) { + if (isRemoteDB) { + createDelayedDB(300); + delayedClient.set('communicationTimeout', 2000); + delayedDB.init(function (err) { + if (err) done(err); + delayedDB.createTable.bind(delayedDB)('BLOB_TABLE', ['A BLOB'], null, done); + }); + } else { + done(); + } + }); + after(function (done) { + if (isRemoteDB) { + delayedDB.dropTable.bind(delayedDB)('BLOB_TABLE', function () { + endDelayedDB(done); + }); + } else { + done(); + } + }); + + var dirname = path.join(__dirname, '..', 'fixtures', 'img'); + + function testWriteLob(lobPacketDelay, done) { + delayedClient.prepare('INSERT INTO BLOB_TABLE VALUES (?)', function (err, stmt) { + if (err) done(err); + delayCountdown = lobPacketDelay; + stmt.execute([fs.createReadStream(path.join(dirname, 'lobby.jpg'))], + {communicationTimeout: 250}, validateTimeout(250, function () { + delayCountdown = -1; + delayedClient.rollback(done); + })); + }); + } + + it('should timeout a lob initial execute request', function (done) { + testWriteLob(0, done); + }); + + it('should timeout a write lob request', function (done) { + // Wait for first execute packet to be sent before delaying write lob + testWriteLob(1, done); + }); + }); + + describeRemoteDB('connect option', function () { + before(function (done) { + createDelayedDB(1000); + // Packets should receive replies within 750ms or these test can fail. + // The timeout can be increased but the packetDelay passed into createDelayedDB + // in the line above should also be increased to a number at least as large. + // 750ms was chosen to prevent this test from waiting too long for the timeout + delayedClient.set('communicationTimeout', 750); + delayedDB.init(done); + }); + after(function (done) { + endDelayedDB(done); + }); + + it('should timeout a client exec', function (done) { + delayCountdown = 0; + delayedClient.exec('SELECT * FROM DUMMY', validateTimeout(750, done)); + }); + + it('should timeout fetch packets', function (done) { + delayedClient.execute('SELECT TOP 33 * FROM OBJECTS', function (err, rs) { + if (err) done(err); + delayCountdown = 0; + rs.fetch(validateTimeout(750, done)); + }); + }); + + it('should timeout commit packets', function (done) { + delayCountdown = 0; + delayedClient.commit(validateTimeout(750, done)); + }) + }); + }); + }); }); diff --git a/test/hdb.Client.js b/test/hdb.Client.js index 8d6a88e..ad560df 100644 --- a/test/hdb.Client.js +++ b/test/hdb.Client.js @@ -630,7 +630,8 @@ describe('hdb', function () { client.exec('sql', options, function (err, reply) { should(err === null).be.ok; client._connection.options.should.eql({ - command: 'sql' + command: 'sql', + communicationTimeout: undefined }); reply.should.equal(connection.replies.executeDirect); done(); @@ -661,7 +662,8 @@ describe('hdb', function () { }, function (err, reply) { should(err === null).be.ok; connection.options.should.eql({ - command: command + command: command, + communicationTimeout: undefined }); client._result.options.autoFetch.should.be.true; reply.should.equal(connection.replies.executeDirect); @@ -684,7 +686,8 @@ describe('hdb', function () { var client = new TestClient(); var connection = client._connection; var options = { - command: 'sql' + command: 'sql', + communicationTimeout: undefined }; client.prepare(options.command, function (err, statement) { should(err === null).be.ok; @@ -703,7 +706,7 @@ describe('hdb', function () { }; client.prepare(options, function (err, statement) { (err === null).should.be.ok; - connection.options.should.eql(options); + connection.options.should.eql(util.extend({communicationTimeout: undefined}, options)); statement.parameterMetadata.should.equal('parameterMetadata'); statement.resultSetMetadata.should.equal('metadata'); done(); diff --git a/test/lib.Connection.js b/test/lib.Connection.js index 81ae210..fb836b3 100644 --- a/test/lib.Connection.js +++ b/test/lib.Connection.js @@ -671,6 +671,24 @@ describe('Lib', function () { }); }); + it('should result in a closed connection readyState after timeout', function (done) { + var connection = createConnection(); + + connection.open({}, function (err) { + (!!err).should.be.not.ok; + + // Overwrite MockSocket write + connection._socket.write = function write() { /* Don't send back a packet */ } + + connection.connect({ user: 'user', password: 'pass', communicationTimeout: 1 }, function (err) { + err.should.be.instanceOf(Error); + err.message.should.equal("Connect failed (connect timeout expired)"); + connection.readyState.should.equal('closed'); + done(); + }); + }); + }); + context('cesu-8 support', function() { it('should create a connection with a useCesu8 set correctly', function () { diff --git a/test/lib.Statement.js b/test/lib.Statement.js index 4719c3e..abe9d2e 100644 --- a/test/lib.Statement.js +++ b/test/lib.Statement.js @@ -65,6 +65,7 @@ describe('Lib', function () { var values = [1]; statement.execute(values, function (err, rowsAffected) { connection.options.should.eql({ + communicationTimeout: undefined, functionCode: statement.functionCode, statementId: statement.id, parameters: {