改变启动方式,解决多次连接报错问题

This commit is contained in:
zhangzc@shuhua.com
2017-06-21 19:03:53 +08:00
parent 896e851cf0
commit 172c55fe5c
9 changed files with 101 additions and 215 deletions

View File

@@ -4,7 +4,7 @@
<groupId>cn.94zichao</groupId>
<artifactId>server</artifactId>
<version>1.0.1-SNAPSHOT</version>
<version>1.0.0</version>
<packaging>jar</packaging>
<name>zzcserver</name>
<url>https://github.com/mmx1960/zzc-server</url>

View File

@@ -6,7 +6,7 @@ import cn._94zichao.server.entity.SocketModel;
/**
* Created by Administrator on 2017/6/17 0017.
*/
@ZzcService("TestService")
public class TestServiceImpl implements TestService {
@Override

View File

@@ -4,12 +4,21 @@ import cn._94zichao.server.annotation.ZzcService;
import cn._94zichao.server.decoder.EndBasedDecoder;
import cn._94zichao.server.encoder.ToModelEncoder;
import cn._94zichao.server.handler.BarrierServerHandler;
import cn._94zichao.server.tcpServer.BaseServer;
import cn._94zichao.server.util.Content;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.lang.reflect.Method;
import java.util.HashMap;
@@ -18,8 +27,8 @@ import java.util.Map;
/**
* Created by Administrator on 2017/6/17 0017.
*/
public class ZzcServer implements ApplicationContextAware {
public static Map<Object,Method[]> methodsMap;
public class ZzcServer implements ApplicationContextAware,InitializingBean {
public Map<Object,Method[]> methodsMap;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String,Object> map =applicationContext.getBeansWithAnnotation(ZzcService.class);
@@ -31,16 +40,52 @@ public class ZzcServer implements ApplicationContextAware {
//拿到类下面的所有方法
Method[] methods = serviceBean.getClass().getDeclaredMethods();
methodsMap = new HashMap();
methodsMap.put(serviceBean,methods);
//启动服务器
BaseServer.create().port(9999).decoder(new EndBasedDecoder(Content.END,true)).encoder(new ToModelEncoder()).in(new BarrierServerHandler()).start();
methodsMap.put(serviceBean, methods);
} catch (Exception e) {
e.printStackTrace();
}
}
}
@Override
public void afterPropertiesSet() throws Exception {
final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
public static void main(String[] args) {
ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("spring.xml");
ch.pipeline().addLast(new EndBasedDecoder(Content.END,true));
//添加编码器
ch.pipeline().addLast(new ToModelEncoder());
//添加业务处理器
ch.pipeline().addLast(group,new BarrierServerHandler(methodsMap));
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(9999).sync(); // (7)
System.out.println(5);
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}

View File

@@ -2,6 +2,7 @@ package cn._94zichao.server.decoder;
import cn._94zichao.server.util.ByteUtil;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
@@ -12,6 +13,7 @@ import java.util.List;
* 以结束符判断一帧是否传输结束的解码器
*
**/
public class EndBasedDecoder extends ByteToMessageDecoder {
private byte end;
private boolean skip;

View File

@@ -3,6 +3,7 @@ package cn._94zichao.server.encoder;
import cn._94zichao.server.util.ByteUtil;
import cn._94zichao.server.util.Content;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
@@ -10,6 +11,7 @@ import io.netty.handler.codec.MessageToByteEncoder;
* Created by zzc on 2017/5/16.
*
*/
public class ToModelEncoder extends MessageToByteEncoder<byte[]> {
/**
* Encode a message into a {@link ByteBuf}. This method will be called for each written message that can be handled

View File

@@ -1,6 +1,7 @@
package cn._94zichao.server.factory;
import cn._94zichao.server.entity.SocketModel;
import cn._94zichao.server.factory.executor.Executor;
import cn._94zichao.server.util.CacheUtil;
import io.netty.channel.ChannelHandlerContext;
@@ -11,35 +12,11 @@ import io.netty.channel.ChannelHandlerContext;
public class ChannelExecutorFactory {
public static Executor getExecutor(SocketModel sk){
return new Executor(sk);
}
/**
* 执行类向channel写入信息
*/
static class Executor {
private ChannelHandlerContext ctx;
byte[] data;
Executor(SocketModel sk){
this.data = sk.getData();
this.ctx = CacheUtil.getChannelCache(sk.getChannelId());
}
public void exec(){
if (ctx.executor().inEventLoop()) {
ctx.writeAndFlush(data);
} else {
ctx.executor().execute(new Runnable() {
public void run() {
ctx.pipeline().writeAndFlush(data);
}
});
}
ctx = null;
}
}
}

View File

@@ -0,0 +1,29 @@
package cn._94zichao.server.factory.executor;
import cn._94zichao.server.entity.SocketModel;
import cn._94zichao.server.util.CacheUtil;
import io.netty.channel.ChannelHandlerContext;
/**
* 执行类向channel写入信息
*/
public class Executor {
private ChannelHandlerContext ctx;
byte[] data;
public Executor(SocketModel sk){
this.data = sk.getData();
this.ctx = CacheUtil.getChannelCache(sk.getChannelId());
}
public void exec(){
if (ctx.executor().inEventLoop()) {
ctx.writeAndFlush(data);
} else {
ctx.executor().execute(new Runnable() {
public void run() {
ctx.pipeline().writeAndFlush(data);
}
});
}
ctx = null;
}
}

View File

@@ -1,20 +1,27 @@
package cn._94zichao.server.handler;
import cn._94zichao.server.bootstrap.ZzcServer;
import cn._94zichao.server.entity.SocketModel;
import cn._94zichao.server.util.CacheUtil;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.lang.reflect.Method;
import java.util.Map;
/**
* Created by zzc on 2017/5/16.
*根据类型调用不同的业务方法,并发送返回包
*/
public class BarrierServerHandler extends ChannelInboundHandlerAdapter { // (1)
public Map<Object,Method[]> methodsMap;
public BarrierServerHandler(Map<Object, Method[]> methodsMap) {
this.methodsMap = methodsMap;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
@@ -28,11 +35,11 @@ public class BarrierServerHandler extends ChannelInboundHandlerAdapter { // (1)
SocketModel sk = new SocketModel();
sk.setChannelId(ctx.channel().id().asShortText());
sk.setData((byte[])msg);
if (ZzcServer.methodsMap.size() == 0){
if (methodsMap.size() == 0){
}
for (Object serviceBean:ZzcServer.methodsMap.keySet()){
Method[] methods = ZzcServer.methodsMap.get(serviceBean);
for (Object serviceBean:methodsMap.keySet()){
Method[] methods = methodsMap.get(serviceBean);
for (Method method:methods){
try {
method.invoke(serviceBean,sk);

View File

@@ -1,176 +0,0 @@
package cn._94zichao.server.tcpServer;
import cn._94zichao.server.decoder.EndBasedDecoder;
import cn._94zichao.server.encoder.ToModelEncoder;
import cn._94zichao.server.handler.BarrierServerHandler;
import cn._94zichao.server.util.Content;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Created by SHUA on 2017/5/25.
*/
public class BaseServer implements Runnable {
public static Map map = new ConcurrentHashMap();
static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
private int port;
//编码器
private List<ChannelInboundHandlerAdapter> decoders;
//解码器
private List<ChannelOutboundHandlerAdapter> encoders;
//入站业务处理器
private List<ChannelInboundHandlerAdapter> inHandler;
//出站业务处理器
private List<ChannelOutboundHandlerAdapter> outHandler;
public BaseServer() {
decoders = new ArrayList<>();
encoders = new ArrayList<>();
inHandler = new ArrayList<>();
outHandler = new ArrayList<>();
this.port = 8888;
}
/**
* 创建一个新类
* @return
*/
public static BaseServer create() {
return new BaseServer();
}
/**
*设置端口
* @param port
* @return
*/
public BaseServer port(int port) {
this.port = port;
return this;
}
/**
*解码器
* @param list
* @return
*/
public BaseServer decoder(ChannelInboundHandlerAdapter... list) {
for (int i = 0 ;i< list.length;i++) {
decoders.add(list[i]);
}
return this;
}
/**
*编码器
* @param list
* @return
*/
public BaseServer encoder(ChannelOutboundHandlerAdapter... list) {
for (int i = 0 ;i< list.length;i++) {
encoders.add(list[i]);
}
return this;
}
/**入站处理类
*
* @param list
* @return
*/
public BaseServer in(ChannelInboundHandlerAdapter... list) {
for (int i = 0 ;i< list.length;i++) {
inHandler.add(list[i]);
}
return this;
}
/**
*出站处理类
* @param list
* @return
*/
public BaseServer out(ChannelOutboundHandlerAdapter... list) {
for (int i = 0 ;i< list.length;i++) {
outHandler.add(list[i]);
}
return this;
}
/**
* 主方法
*/
public void run() {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
//添加解码器
if (decoders.size()>0){
for (ChannelInboundHandlerAdapter cha:decoders){
ch.pipeline().addLast(cha.getClass().getName(),cha);
}
}
//添加编码器
if (encoders.size() > 0) {
for (ChannelOutboundHandlerAdapter cha:encoders){
ch.pipeline().addLast(cha.getClass().getName(),cha);
}
}
//添加业务处理器
if (inHandler.size()>0){
for (ChannelInboundHandlerAdapter cha:inHandler){
ch.pipeline().addLast(group,cha.getClass().getName(),cha);
}
}
//添加业务处理器
if (outHandler.size()>0){
for (ChannelOutboundHandlerAdapter cha:outHandler){
ch.pipeline().addLast(group,cha.getClass().getName(),cha);
}
}
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // (7)
System.out.println(5);
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public void start(){
new Thread(this).start();
}
public static void main(String[] args) {
BaseServer.create().port(9999).decoder(new EndBasedDecoder(Content.END,true)).encoder(new ToModelEncoder()).in(new BarrierServerHandler()).start();
}
}