在上篇《Netty(一)引題》中,分別對AIO,BIO,PIO,NIO進行了簡單的闡述,并寫了簡單的demo。但是這里說的簡單,我也只能呵呵了,特別是NIO、AIO(我全手打的,好麻煩)。
在開始netty開發TimeServer之前,先回顧下NIO進行服務端開發的步驟:
一個簡單的NIO程序,需要經過繁瑣的十多步操作才能完成最基本的消息讀取和發送,這也是我學netty的原因,下面就看看使用netty是如何輕松寫服務器的。
在這里,我使用IDEA 14 + Maven用netty寫上篇中TimeServer的程序。這里我直接用Maven的pom.xml來直接下載netty的包(Maven是對依賴進行管理,支持自動化的測試、編譯、構建的項目管理工具,具體的Maven請讀者自行百度、google搜索)。
/* TimeServer */
1 public class TimeServer { 2 public void bind(int port)throws Exception{ 3 /* 配置服務端的NIO線程組 */ 4 // NioEventLoopGroup類 是個線程組,包含一組NIO線程,用于網絡事件的處理 5 // (實際上它就是Reactor線程組)。 6 // 創建的2個線程組,1個是服務端接收客戶端的連接,另一個是進行SocketChannel的 7 // 網絡讀寫 8 EventLoopGroup bossGroup = new NioEventLoopGroup(); 9 EventLoopGroup WorkerGroup = new NioEventLoopGroup();10 11 try {12 // ServerBootstrap 類,是啟動NIO服務器的輔助啟動類13 ServerBootstrap b = new ServerBootstrap();14 b.group(bossGroup,WorkerGroup)15 .channel(NioServerSocketChannel.class)16 .option(ChannelOption.SO_BACKLOG,1024)17 .childHandler(new ChildChannelHandler());18 19 // 綁定端口,同步等待成功20 ChannelFuture f= b.bind(port).sync();21 22 // 等待服務端監聽端口關閉23 f.channel().closeFuture().sync();24 }finally {25 // 釋放線程池資源26 bossGroup.shutdownGracefully();27 WorkerGroup.shutdownGracefully();28 }29 }30 31 PRivate class ChildChannelHandler extends ChannelInitializer<SocketChannel>{32 @Override33 protected void initChannel(SocketChannel arg0)throws Exception{34 arg0.pipeline().addLast(new TimeServerHandler());35 }36 }37 38 public static void main(String[]args)throws Exception{39 int port = 8080;40 if(args!=null && args.length>0){41 try {42 port = Integer.valueOf(args[0]);43 }44 catch (NumberFormatException ex){}45 }46 new TimeServer().bind(port);47 }48 }
1 public class TimeServerHandler extends ChannelHandlerAdapter{ 2 // 用于網絡的讀寫操作 3 @Override 4 public void channelRead(ChannelHandlerContext ctx,Object msg) 5 throws Exception{ 6 ByteBuf buf = (ByteBuf)msg; 7 byte[]req = new byte[buf.readableBytes()]; 8 buf.readBytes(req); 9 String body = new String(req,"UTF-8");10 System.out.println("the time server order : " + body);11 12 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)?new Date(13 System.currentTimeMillis()).toString():"BAD ORDER";14 ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());15 ctx.write(resp);16 }17 18 @Override19 public void channelReadComplete(ChannelHandlerContext ctx)throws Exception{20 ctx.flush(); // 它的作用是把消息發送隊列中的消息寫入SocketChannel中發送給對方21 // 為了防止頻繁的喚醒Selector進行消息發送,Netty的write方法,并不直接將消息寫入SocketChannel中22 // 調用write方法只是把待發送的消息發到緩沖區中,再調用flush,將發送緩沖區中的消息23 // 全部寫到SocketChannel中。24 }25 26 @Override27 public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){28 ctx.close();29 }30 }
/* TimeClient */
1 public class TimeClient { 2 public void connect(String host,int port)throws Exception{ 3 // 配置服務端的NIO線程組 4 EventLoopGroup group = new NioEventLoopGroup(); 5 6 try { 7 // Bootstrap 類,是啟動NIO服務器的輔助啟動類 8 Bootstrap b = new Bootstrap(); 9 b.group(group).channel(NioSocketChannel.class)10 .option(ChannelOption.TCP_NODELAY,true)11 .handler(new ChannelInitializer<SocketChannel>() {12 @Override13 public void initChannel(SocketChannel ch)14 throws Exception{15 ch.pipeline().addLast(new TimeClientHandler());16 }17 });18 19 // 發起異步連接操作20 ChannelFuture f= b.connect(host,port).sync();21 22 // 等待客服端鏈路關閉23 f.channel().closeFuture().sync();24 }finally {25 group.shutdownGracefully();26 }27 }28 29 public static void main(String[]args)throws Exception{30 int port = 8080;31 if(args!=null && args.length>0){32 try {33 port = Integer.valueOf(args[0]);34 }35 catch (NumberFormatException ex){}36 }37 new TimeClient().connect("127.0.0.1",port);38 }39 }
1 public class TimeClientHandler extends ChannelHandlerAdapter{ 2 3 // 寫日志 4 private static final Logger logger = 5 Logger.getLogger(TimeClientHandler.class.getName()); 6 7 private final ByteBuf firstMessage; 8 9 public TimeClientHandler(){10 byte[] req = "QUERY TIME ORDER".getBytes();11 firstMessage = Unpooled.buffer(req.length);12 firstMessage.writeBytes(req);13 }14 15 @Override16 public void channelRead(ChannelHandlerContext ctx,Object msg)17 throws Exception{18 ByteBuf buf = (ByteBuf)msg;19 byte[]req = new byte[buf.readableBytes()];20 buf.readBytes(req);21 String body = new String(req,"UTF-8");22 System.out.println("Now is : " + body);23 }24 25 @Override26 public void channelActive(ChannelHandlerContext ctx){27 // 當客戶端和服務端建立tcp成功之后,Netty的NIO線程會調用channelActive28 // 發送查詢時間的指令給服務端。29 // 調用ChannelHandlerContext的writeAndFlush方法,將請求消息發送給服務端30 // 當服務端應答時,channelRead方法被調用31 ctx.writeAndFlush(firstMessage);32 }33 34 @Override35 public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){36 logger.warning("message from:"+cause.getMessage());37 ctx.close();38 }39 }
本例子沒有考慮讀半包的處理,對于功能演示和測試,本例子沒問題,但是如果進行性能或者壓力測試,就不能正常工作了。在下一節會弄正確處理半包消息的例子。
項目在源碼在src/main/java/Netty/下,分為客戶端和服務端。
源碼下載:GitHub地址:https://github.com/orange1438/Netty_Course
題外話:雖然文章全是我純手打,沒任何復制,但是文章大多數內容來自《Netty權威指南》,我也是順便學習的。之前我做C++服務端,因為狗血的面試C++,結果公司系統居然是java的,無耐我所在的重慶,C++少得可憐,所以只有在公司里學java了。當然,有epoll,select,事件驅動,TCP/IP概念的小伙伴來說,學這個netty,還是挺簡單的。
新聞熱點
疑難解答