This commit is contained in:
VincentChanX
2017-04-06 10:21:58 +08:00
parent 8d31c899b1
commit 564e1ce2d0

View File

@@ -130,7 +130,8 @@ TCPRelay.prototype.init = function() {
server = self.server = new WebSocket.Server({ server = self.server = new WebSocket.Server({
host: address, host: address,
port: port, port: port,
verifyClient: false perMessageDeflate: false,
backlog: MAX_CONNECTIONS
}); });
server.on('connection', function(connection) { server.on('connection', function(connection) {
return self.handleConnectionByServer(connection); return self.handleConnectionByServer(connection);
@@ -160,12 +161,12 @@ TCPRelay.prototype.handleConnectionByServer = function(connection) {
var connectionId = (globalConnectionId++) % MAX_CONNECTIONS; var connectionId = (globalConnectionId++) % MAX_CONNECTIONS;
var targetConnection, addressHeader; var targetConnection, addressHeader;
var canWriteToLocalConnection = true; var dataCache = [];
logger.info(`accept connection from local[${connectionId}]`); logger.info(`[${connectionId}]: accept connection from local`);
connection.on('message', function(data) { connection.on('message', function(data) {
data = encryptor.decrypt(data); data = encryptor.decrypt(data);
logger.info(`read data[length = ${data.length}] from local connection[${connectionId}] at stage[${STAGE[stage]}]`); logger.info(`[${connectionId}]: read data[length = ${data.length}] from local connection at stage[${STAGE[stage]}]`);
switch (stage) { switch (stage) {
@@ -180,70 +181,74 @@ TCPRelay.prototype.handleConnectionByServer = function(connection) {
return connection.close(); return connection.close();
} }
logger.info(`connecting to ${addressHeader.dstAddr}:${addressHeader.dstPort}`); logger.info(`[${connectionId}]: connecting to ${addressHeader.dstAddr}:${addressHeader.dstPort}`);
stage = STAGE_CONNECTING; stage = STAGE_CONNECTING;
connection.pause();
targetConnection = net.createConnection({ targetConnection = net.createConnection({
port: addressHeader.dstPort, port: addressHeader.dstPort,
host: addressHeader.dstAddr, host: addressHeader.dstAddr,
allowHalfOpen: true allowHalfOpen: true
}, function() { }, function() {
logger.info(`connecting to target[${connectionId}]`); logger.info(`[${connectionId}]: connecting to target`);
connection.resume();
dataCache = Buffer.concat(dataCache);
targetConnection.write(dataCache, function() {
logger.info(`[${connectionId}]: write data[length = ${dataCache.length}] to target connection`);
dataCache = null;
});
stage = STAGE_STREAM; stage = STAGE_STREAM;
}); });
targetConnection.on('data', function(data) { targetConnection.on('data', function(data) {
logger.info(`read data[length = ${data.length}] from target connection[${connectionId}]`); logger.info(`[${connectionId}]: read data[length = ${data.length}] from target connection`);
canWriteToLocalConnection && connection.send(encryptor.encrypt(data), { if (connection.readyState == WebSocket.OPEN) {
connection.send(encryptor.encrypt(data), {
binary: true binary: true
}, function() { }, function() {
logger.info(`write data[length = ${data.length}] to local connection[${connectionId}]`); logger.info(`[${connectionId}]: write data[length = ${data.length}] to local connection`);
}); });
}
}); });
targetConnection.setKeepAlive(true, 5000); targetConnection.setKeepAlive(true, 5000);
targetConnection.on('end', function() { targetConnection.on('end', function() {
connection.close(); connection.close();
}); });
targetConnection.on('error', function(error) { targetConnection.on('error', function(error) {
logger.error(`an error of target connection[${connectionId}] occured`, error); logger.error(`[${connectionId}]: an error of target connection occured`, error);
stage = STAGE_DESTROYED; stage = STAGE_DESTROYED;
targetConnection.destroy(); targetConnection.destroy();
connection.close(); connection.close();
}); });
if (data.length > addressHeader.headerLen) { if (data.length > addressHeader.headerLen) {
connection.pause(); dataCache.push(data.slice(addressHeader.headerLen));
targetConnection.write(data.slice(addressHeader.headerLen), function() {
connection.resume();
});
} }
break; break;
case STAGE_CONNECTING:
dataCache.push(data);
break;
case STAGE_STREAM: case STAGE_STREAM:
connection.pause(); targetConnection.write(data, function() {
canWriteToLocalConnection && targetConnection.write(data, function() { logger.info(`[${connectionId}]: write data[length = ${data.length}] to target connection`);
logger.info(`write data[length = ${data.length}] to target connection[${connectionId}]`);
connection.resume();
}); });
break; break;
} }
}); });
connection.on('ping', function() {
return connection.pong('', false, true);
});
connection.on('close', function(hadError) { connection.on('close', function(hadError) {
logger.info(`close event[had error = ${hadError}] of connection[${connectionId}] has been triggered`); logger.info(`[${connectionId}]: close event[had error = ${hadError}] of connection has been triggered`);
canWriteToLocalConnection = false;
}); });
connection.on('error', function(error) { connection.on('error', function(error) {
logger.error(`an error of connection[${connectionId}] occured`, error); logger.error(`[${connectionId}]: an error of connection occured`, error);
connection.terminate(); connection.terminate();
canWriteToLocalConnection = false;
targetConnection && targetConnection.end(); targetConnection && targetConnection.end();
}); });
} }
//local //local
TCPRelay.prototype.handleConnectionByLocal = function(connection) { TCPRelay.prototype.handleConnectionByLocal = function(connection) {
var self = this; var self = this;
@@ -258,14 +263,15 @@ TCPRelay.prototype.handleConnectionByLocal = function(connection) {
var stage = STAGE_INIT; var stage = STAGE_INIT;
var connectionId = (globalConnectionId++) % MAX_CONNECTIONS; var connectionId = (globalConnectionId++) % MAX_CONNECTIONS;
var serverConnection, cmd, addressHeader; var serverConnection, cmd, addressHeader, ping;
var canWriteToLocalConnection = true; var canWriteToLocalConnection = true;
var dataCache = [];
logger.info(`accept connection from client[${connectionId}]`); logger.info(`[${connectionId}]: accept connection from client`);
connection.setKeepAlive(true, 10000); connection.setKeepAlive(true, 10000);
connection.on('data', function(data) { connection.on('data', function(data) {
logger.info(`read data[length = ${data.length}] from client connection[${connectionId}] at stage[${STAGE[stage]}]`); logger.info(`[${connectionId}]: read data[length = ${data.length}] from client connection at stage[${STAGE[stage]}]`);
switch (stage) { switch (stage) {
case STAGE_INIT: case STAGE_INIT:
@@ -291,66 +297,84 @@ TCPRelay.prototype.handleConnectionByLocal = function(connection) {
//only supports connect cmd //only supports connect cmd
if (cmd != CMD_CONNECT) { if (cmd != CMD_CONNECT) {
logger.error('only supports connect cmd'); logger.error('[${connectionId}]: only supports connect cmd');
stage = STAGE_DESTROYED; stage = STAGE_DESTROYED;
return connection.end("\x05\x07\x00\x01\x00\x00\x00\x00\x00\x00"); return connection.end("\x05\x07\x00\x01\x00\x00\x00\x00\x00\x00");
} }
logger.info(`connecting to ${addressHeader.dstAddr}:${addressHeader.dstPort}`); logger.info(`[${connectionId}]: connecting to ${addressHeader.dstAddr}:${addressHeader.dstPort}`);
connection.write("\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00"); connection.write("\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00");
stage = STAGE_CONNECTING; stage = STAGE_CONNECTING;
connection.pause();
serverConnection = new WebSocket('ws://' + serverAddress + ':' + serverPort, { serverConnection = new WebSocket('ws://' + serverAddress + ':' + serverPort, {
perMessageDeflate: false perMessageDeflate: false
}); });
serverConnection.on('open', function() { serverConnection.on('open', function() {
logger.info(`connecting to websocket server[${connectionId}]`); logger.info(`[${connectionId}]: connecting to websocket server`);
serverConnection.send(encryptor.encrypt(data.slice(3)), function() { serverConnection.send(encryptor.encrypt(data.slice(3)), function() {
stage = STAGE_STREAM; stage = STAGE_STREAM;
connection.resume(); dataCache = Buffer.concat(dataCache);
serverConnection.send(encryptor.encrypt(dataCache), {
binary: true
}, function() {
logger.info(`[${connectionId}]: write data[length = ${dataCache.length}] to client connection`);
dataCache = null;
}); });
}); });
ping = setInterval(function() {
serverConnection.ping('', false, true);
}, 30000);
});
serverConnection.on('message', function(data) { serverConnection.on('message', function(data) {
logger.info(`read data[length = ${data.length}] from websocket server connection[${connectionId}]`); logger.info(`[${connectionId}]: read data[length = ${data.length}] from websocket server connection`);
canWriteToLocalConnection && connection.write(encryptor.decrypt(data), function() { canWriteToLocalConnection && connection.write(encryptor.decrypt(data), function() {
logger.info(`write data[length = ${data.length}] to client connection[${connectionId}]`); logger.info(`[${connectionId}]: write data[length = ${data.length}] to client connection`);
}); });
}); });
serverConnection.on('error', function(error) { serverConnection.on('error', function(error) {
logger.error(`an error of server connection[${connectionId}] occured`, error); ping && clearInterval(ping);
logger.error(`[${connectionId}]: an error of server connection occured`, error);
stage = STAGE_DESTROYED; stage = STAGE_DESTROYED;
connection.end(); connection.end();
}); });
serverConnection.on('close', function() { serverConnection.on('close', function() {
logger.info(`[${connectionId}]: server connection isclosed`);
ping && clearInterval(ping);
stage = STAGE_DESTROYED; stage = STAGE_DESTROYED;
connection.end(); connection.end();
}); });
if (data.length > addressHeader.headerLen + 3) {
dataCache.push(data.slice(addressHeader.headerLen + 3));
}
break;
case STAGE_CONNECTING:
dataCache.push(data);
break; break;
case STAGE_STREAM: case STAGE_STREAM:
connection.pause();
canWriteToLocalConnection && serverConnection.send(encryptor.encrypt(data), { canWriteToLocalConnection && serverConnection.send(encryptor.encrypt(data), {
binary: true binary: true
}, function() { }, function() {
logger.info(`write data[length = ${data.length}] to websocket server connection[${connectionId}]`); logger.info(`[${connectionId}]: write data[length = ${data.length}] to websocket server connection`);
connection.resume();
}); });
break; break;
} }
}); });
connection.on('end', function() { connection.on('end', function() {
stage = STAGE_DESTROYED; stage = STAGE_DESTROYED;
logger.info(`end event of client connection[$connectionId] has been triggered`); logger.info(`[${connectionId}]: end event of client connection has been triggered`);
}); });
connection.on('close', function(hadError) { connection.on('close', function(hadError) {
logger.info(`close event[had error = ${hadError}] of client connection[${connectionId}] has been triggered`); logger.info(`[${connectionId}]: close event[had error = ${hadError}] of client connection has been triggered`);
stage = STAGE_DESTROYED; stage = STAGE_DESTROYED;
canWriteToLocalConnection = false; canWriteToLocalConnection = false;
}); });
connection.on('error', function(error) { connection.on('error', function(error) {
logger.error(`an error of client connection[${connectionId}] occured`, error); logger.error(`[${connectionId}]: an error of client connection occured`, error);
stage = STAGE_DESTROYED; stage = STAGE_DESTROYED;
connection.destroy(); connection.destroy();
canWriteToLocalConnection = false; canWriteToLocalConnection = false;