start server.js in cluster mode

This commit is contained in:
VincentChanX
2017-04-07 16:04:38 +08:00
parent 0a8481de48
commit 3bf624f397
12 changed files with 1300 additions and 70 deletions

View File

@@ -1,4 +1,5 @@
const net = require('net');
const path = require('path');
const log4js = require('log4js');
const WebSocket = require('ws');
const Encryptor = require('shadowsocks/lib/shadowsocks/encrypt').Encryptor;
@@ -93,7 +94,7 @@ function parseAddressHeader(data, offset) {
};
}
function TCPRelay(config, isLocal, logLevel) {
function TCPRelay(config, isLocal) {
this.isLocal = isLocal;
this.server = null;
this.status = SERVER_STATUS_INIT;
@@ -101,46 +102,59 @@ function TCPRelay(config, isLocal, logLevel) {
if (config) {
this.config = Object.assign(this.config, config);
}
this.logger = log4js.getLogger(isLocal ? 'sslocal' : 'ssserver');
this.logger.setLevel(logLevel ? logLevel : 'error');
this.logger = null;
this.logLevel = 'error';
this.logFile = null;
this.serverName = null;
}
TCPRelay.prototype.getServerName = function() {
return this.isLocal ? 'sslocal' : 'ssserver';
};
TCPRelay.prototype.bootstrap = function() {
return this.init();
};
TCPRelay.prototype.stop = function() {
var self = this;
var connId = null;
return new Promise(function(resolve, reject) {
if (self.server) {
self.server.close(function() {
resolve();
});
for (connId in connections) {
if (connections[connId]) {
self.isLocal ? connections[connId].destroy() : connections[connId].terminate();
}
}
} else {
resolve();
}
});
};
TCPRelay.prototype.getStatus = function() {
return this.status;
};
TCPRelay.prototype.init = function() {
TCPRelay.prototype.setServerName = function(serverName) {
this.serverName = serverName;
return this;
};
TCPRelay.prototype.getServerName = function() {
if (!this.serverName) {
this.serverName = this.isLocal ? 'local' : 'server';
}
return this.serverName;
};
TCPRelay.prototype.setLogLevel = function(logLevel) {
this.logLevel = logLevel;
return this;
};
TCPRelay.prototype.getLogLevel = function() {
return this.logLevel;
};
TCPRelay.prototype.setLogFile = function(logFile) {
if (logFile && !path.isAbsolute(logFile)) {
logFile = process.cwd() + '/' + logFile;
}
this.logFile = logFile;
return this;
};
TCPRelay.prototype.getLogFile = function() {
return this.logFile;
};
TCPRelay.prototype.initLogger = function() {
if (this.logFile) {
log4js.loadAppender('file');
log4js.addAppender(log4js.appenders.file(this.logFile), this.getServerName());
}
this.logger = log4js.getLogger(this.getServerName());
this.logger.setLevel(this.logLevel);
};
TCPRelay.prototype.initServer = function() {
var self = this;
return new Promise(function(resolve, reject) {
var config = self.config;
@@ -173,7 +187,7 @@ TCPRelay.prototype.init = function() {
});
}
server.on('error', function(error) {
self.logger.error('an error of', self.getServerName(), 'occured', error);
self.logger.fatal('an error of', self.getServerName(), 'occured', error);
self.status = SERVER_STATUS_STOPPED;
reject(error);
});
@@ -207,7 +221,7 @@ TCPRelay.prototype.handleConnectionByServer = function(connection) {
connections[connectionId] = connection;
connection.on('message', function(data) {
data = encryptor.decrypt(data);
logger.info(`[${connectionId}]: read data[length = ${data.length}] from local connection at stage[${STAGE[stage]}]`);
logger.debug(`[${connectionId}]: read data[length = ${data.length}] from local connection at stage[${STAGE[stage]}]`);
switch (stage) {
@@ -234,19 +248,19 @@ TCPRelay.prototype.handleConnectionByServer = function(connection) {
dataCache = Buffer.concat(dataCache);
targetConnection.write(dataCache, function() {
logger.info(`[${connectionId}]: write data[length = ${dataCache.length}] to target connection`);
logger.debug(`[${connectionId}]: write data[length = ${dataCache.length}] to target connection`);
dataCache = null;
});
stage = STAGE_STREAM;
});
targetConnection.on('data', function(data) {
logger.info(`[${connectionId}]: read data[length = ${data.length}] from target connection`);
logger.debug(`[${connectionId}]: read data[length = ${data.length}] from target connection`);
if (connection.readyState == WebSocket.OPEN) {
connection.send(encryptor.encrypt(data), {
binary: true
}, function() {
logger.info(`[${connectionId}]: write data[length = ${data.length}] to local connection`);
logger.debug(`[${connectionId}]: write data[length = ${data.length}] to local connection`);
});
}
});
@@ -272,7 +286,7 @@ TCPRelay.prototype.handleConnectionByServer = function(connection) {
case STAGE_STREAM:
targetConnection.write(data, function() {
logger.info(`[${connectionId}]: write data[length = ${data.length}] to target connection`);
logger.debug(`[${connectionId}]: write data[length = ${data.length}] to target connection`);
});
break;
}
@@ -291,7 +305,7 @@ TCPRelay.prototype.handleConnectionByServer = function(connection) {
connections[connectionId] = null;
targetConnection && targetConnection.end();
});
}
};
//local
TCPRelay.prototype.handleConnectionByLocal = function(connection) {
@@ -316,7 +330,7 @@ TCPRelay.prototype.handleConnectionByLocal = function(connection) {
connections[connectionId] = connection;
connection.setKeepAlive(true, 10000);
connection.on('data', function(data) {
logger.info(`[${connectionId}]: read data[length = ${data.length}] from client connection at stage[${STAGE[stage]}]`);
logger.debug(`[${connectionId}]: read data[length = ${data.length}] from client connection at stage[${STAGE[stage]}]`);
switch (stage) {
case STAGE_INIT:
@@ -363,7 +377,7 @@ TCPRelay.prototype.handleConnectionByLocal = function(connection) {
serverConnection.send(encryptor.encrypt(dataCache), {
binary: true
}, function() {
logger.info(`[${connectionId}]: write data[length = ${dataCache.length}] to client connection`);
logger.debug(`[${connectionId}]: write data[length = ${dataCache.length}] to client connection`);
dataCache = null;
});
});
@@ -373,9 +387,9 @@ TCPRelay.prototype.handleConnectionByLocal = function(connection) {
}, 30000);
});
serverConnection.on('message', function(data) {
logger.info(`[${connectionId}]: read data[length = ${data.length}] from websocket server connection`);
logger.debug(`[${connectionId}]: read data[length = ${data.length}] from websocket server connection`);
canWriteToLocalConnection && connection.write(encryptor.decrypt(data), function() {
logger.info(`[${connectionId}]: write data[length = ${data.length}] to client connection`);
logger.debug(`[${connectionId}]: write data[length = ${data.length}] to client connection`);
});
});
serverConnection.on('error', function(error) {
@@ -385,7 +399,7 @@ TCPRelay.prototype.handleConnectionByLocal = function(connection) {
connection.end();
});
serverConnection.on('close', function() {
logger.info(`[${connectionId}]: server connection isclosed`);
logger.info(`[${connectionId}]: server connection is closed`);
ping && clearInterval(ping);
stage = STAGE_DESTROYED;
connection.end();
@@ -404,7 +418,7 @@ TCPRelay.prototype.handleConnectionByLocal = function(connection) {
canWriteToLocalConnection && serverConnection.send(encryptor.encrypt(data), {
binary: true
}, function() {
logger.info(`[${connectionId}]: write data[length = ${data.length}] to websocket server connection`);
logger.debug(`[${connectionId}]: write data[length = ${data.length}] to websocket server connection`);
});
break;
}
@@ -428,6 +442,33 @@ TCPRelay.prototype.handleConnectionByLocal = function(connection) {
connections[connectionId] = null;
serverConnection && serverConnection.close();
});
}
};
TCPRelay.prototype.bootstrap = function() {
this.initLogger();
return this.initServer();
};
TCPRelay.prototype.stop = function() {
var self = this;
var connId = null;
return new Promise(function(resolve, reject) {
if (self.server) {
self.server.close(function() {
resolve();
});
for (connId in connections) {
if (connections[connId]) {
self.isLocal ? connections[connId].destroy() : connections[connId].terminate();
}
}
} else {
resolve();
}
});
};
module.exports.TCPRelay = TCPRelay;