From 564e1ce2d03f9cf52f1f4bde8c09c42864da56e6 Mon Sep 17 00:00:00 2001 From: VincentChanX <710852740@qq.com> Date: Thu, 6 Apr 2017 10:21:58 +0800 Subject: [PATCH] update --- tcprelay.js | 116 +++++++++++++++++++++++++++++++--------------------- 1 file changed, 70 insertions(+), 46 deletions(-) diff --git a/tcprelay.js b/tcprelay.js index ff12e03..455c756 100644 --- a/tcprelay.js +++ b/tcprelay.js @@ -130,7 +130,8 @@ TCPRelay.prototype.init = function() { server = self.server = new WebSocket.Server({ host: address, port: port, - verifyClient: false + perMessageDeflate: false, + backlog: MAX_CONNECTIONS }); server.on('connection', function(connection) { return self.handleConnectionByServer(connection); @@ -160,12 +161,12 @@ TCPRelay.prototype.handleConnectionByServer = function(connection) { var connectionId = (globalConnectionId++) % MAX_CONNECTIONS; var targetConnection, addressHeader; - var canWriteToLocalConnection = true; + var dataCache = []; - logger.info(`accept connection from local[${connectionId}]`); + logger.info(`[${connectionId}]: accept connection from local`); connection.on('message', function(data) { data = encryptor.decrypt(data); - logger.info(`read data[length = ${data.length}] from local connection[${connectionId}] at stage[${STAGE[stage]}]`); + logger.info(`[${connectionId}]: read data[length = ${data.length}] from local connection at stage[${STAGE[stage]}]`); switch (stage) { @@ -180,70 +181,74 @@ TCPRelay.prototype.handleConnectionByServer = function(connection) { return connection.close(); } - logger.info(`connecting to ${addressHeader.dstAddr}:${addressHeader.dstPort}`); + logger.info(`[${connectionId}]: connecting to ${addressHeader.dstAddr}:${addressHeader.dstPort}`); stage = STAGE_CONNECTING; - connection.pause(); targetConnection = net.createConnection({ port: addressHeader.dstPort, host: addressHeader.dstAddr, allowHalfOpen: true }, function() { - logger.info(`connecting to target[${connectionId}]`); - connection.resume(); + logger.info(`[${connectionId}]: connecting to target`); + + dataCache = Buffer.concat(dataCache); + targetConnection.write(dataCache, function() { + logger.info(`[${connectionId}]: write data[length = ${dataCache.length}] to target connection`); + dataCache = null; + }); stage = STAGE_STREAM; }); targetConnection.on('data', function(data) { - logger.info(`read data[length = ${data.length}] from target connection[${connectionId}]`); - canWriteToLocalConnection && connection.send(encryptor.encrypt(data), { - binary: true - }, function() { - logger.info(`write data[length = ${data.length}] to local connection[${connectionId}]`); - }); + logger.info(`[${connectionId}]: read data[length = ${data.length}] from target connection`); + if (connection.readyState == WebSocket.OPEN) { + connection.send(encryptor.encrypt(data), { + binary: true + }, function() { + logger.info(`[${connectionId}]: write data[length = ${data.length}] to local connection`); + }); + } }); targetConnection.setKeepAlive(true, 5000); targetConnection.on('end', function() { connection.close(); }); targetConnection.on('error', function(error) { - logger.error(`an error of target connection[${connectionId}] occured`, error); + logger.error(`[${connectionId}]: an error of target connection occured`, error); stage = STAGE_DESTROYED; targetConnection.destroy(); connection.close(); }); if (data.length > addressHeader.headerLen) { - connection.pause(); - targetConnection.write(data.slice(addressHeader.headerLen), function() { - connection.resume(); - }); + dataCache.push(data.slice(addressHeader.headerLen)); } break; + case STAGE_CONNECTING: + dataCache.push(data); + break; + case STAGE_STREAM: - connection.pause(); - canWriteToLocalConnection && targetConnection.write(data, function() { - logger.info(`write data[length = ${data.length}] to target connection[${connectionId}]`); - connection.resume(); + targetConnection.write(data, function() { + logger.info(`[${connectionId}]: write data[length = ${data.length}] to target connection`); }); break; } }); + connection.on('ping', function() { + return connection.pong('', false, true); + }); connection.on('close', function(hadError) { - logger.info(`close event[had error = ${hadError}] of connection[${connectionId}] has been triggered`); - canWriteToLocalConnection = false; + logger.info(`[${connectionId}]: close event[had error = ${hadError}] of connection has been triggered`); }); connection.on('error', function(error) { - logger.error(`an error of connection[${connectionId}] occured`, error); + logger.error(`[${connectionId}]: an error of connection occured`, error); connection.terminate(); - canWriteToLocalConnection = false; targetConnection && targetConnection.end(); }); } - - //local TCPRelay.prototype.handleConnectionByLocal = function(connection) { var self = this; @@ -258,14 +263,15 @@ TCPRelay.prototype.handleConnectionByLocal = function(connection) { var stage = STAGE_INIT; var connectionId = (globalConnectionId++) % MAX_CONNECTIONS; - var serverConnection, cmd, addressHeader; + var serverConnection, cmd, addressHeader, ping; var canWriteToLocalConnection = true; + var dataCache = []; - logger.info(`accept connection from client[${connectionId}]`); + logger.info(`[${connectionId}]: accept connection from client`); connection.setKeepAlive(true, 10000); connection.on('data', function(data) { - logger.info(`read data[length = ${data.length}] from client connection[${connectionId}] at stage[${STAGE[stage]}]`); + logger.info(`[${connectionId}]: read data[length = ${data.length}] from client connection at stage[${STAGE[stage]}]`); switch (stage) { case STAGE_INIT: @@ -291,66 +297,84 @@ TCPRelay.prototype.handleConnectionByLocal = function(connection) { //only supports connect cmd if (cmd != CMD_CONNECT) { - logger.error('only supports connect cmd'); + logger.error('[${connectionId}]: only supports connect cmd'); stage = STAGE_DESTROYED; return connection.end("\x05\x07\x00\x01\x00\x00\x00\x00\x00\x00"); } - logger.info(`connecting to ${addressHeader.dstAddr}:${addressHeader.dstPort}`); + logger.info(`[${connectionId}]: connecting to ${addressHeader.dstAddr}:${addressHeader.dstPort}`); connection.write("\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00"); stage = STAGE_CONNECTING; - connection.pause(); serverConnection = new WebSocket('ws://' + serverAddress + ':' + serverPort, { perMessageDeflate: false }); serverConnection.on('open', function() { - logger.info(`connecting to websocket server[${connectionId}]`); + logger.info(`[${connectionId}]: connecting to websocket server`); serverConnection.send(encryptor.encrypt(data.slice(3)), function() { stage = STAGE_STREAM; - connection.resume(); + dataCache = Buffer.concat(dataCache); + serverConnection.send(encryptor.encrypt(dataCache), { + binary: true + }, function() { + logger.info(`[${connectionId}]: write data[length = ${dataCache.length}] to client connection`); + dataCache = null; + }); }); + + ping = setInterval(function() { + serverConnection.ping('', false, true); + }, 30000); }); serverConnection.on('message', function(data) { - logger.info(`read data[length = ${data.length}] from websocket server connection[${connectionId}]`); + logger.info(`[${connectionId}]: read data[length = ${data.length}] from websocket server connection`); canWriteToLocalConnection && connection.write(encryptor.decrypt(data), function() { - logger.info(`write data[length = ${data.length}] to client connection[${connectionId}]`); + logger.info(`[${connectionId}]: write data[length = ${data.length}] to client connection`); }); }); serverConnection.on('error', function(error) { - logger.error(`an error of server connection[${connectionId}] occured`, error); + ping && clearInterval(ping); + logger.error(`[${connectionId}]: an error of server connection occured`, error); stage = STAGE_DESTROYED; connection.end(); }); serverConnection.on('close', function() { + logger.info(`[${connectionId}]: server connection isclosed`); + ping && clearInterval(ping); stage = STAGE_DESTROYED; connection.end(); }); + + if (data.length > addressHeader.headerLen + 3) { + dataCache.push(data.slice(addressHeader.headerLen + 3)); + } + break; + + case STAGE_CONNECTING: + dataCache.push(data); break; case STAGE_STREAM: - connection.pause(); canWriteToLocalConnection && serverConnection.send(encryptor.encrypt(data), { binary: true }, function() { - logger.info(`write data[length = ${data.length}] to websocket server connection[${connectionId}]`); - connection.resume(); + logger.info(`[${connectionId}]: write data[length = ${data.length}] to websocket server connection`); }); break; } }); connection.on('end', function() { stage = STAGE_DESTROYED; - logger.info(`end event of client connection[$connectionId] has been triggered`); + logger.info(`[${connectionId}]: end event of client connection has been triggered`); }); connection.on('close', function(hadError) { - logger.info(`close event[had error = ${hadError}] of client connection[${connectionId}] has been triggered`); + logger.info(`[${connectionId}]: close event[had error = ${hadError}] of client connection has been triggered`); stage = STAGE_DESTROYED; canWriteToLocalConnection = false; }); connection.on('error', function(error) { - logger.error(`an error of client connection[${connectionId}] occured`, error); + logger.error(`[${connectionId}]: an error of client connection occured`, error); stage = STAGE_DESTROYED; connection.destroy(); canWriteToLocalConnection = false;