一個thread + 隊列 == 一個單線程線程池 =====> 線程安全的,任務是線性串行執行的
線程安全,不會產生阻塞效應 ,使用對象組 下圖是原理圖

線程不安全,會產生阻塞效應, 使用對象池,下圖是對象池的原理圖

再看下線程池的原理圖

下面是netty5的服務端代碼參考
package com.server;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;/** * netty5服務端 * */public class Server { public static void main(String[] args) { //服務類 ServerBootstrap bootstrap = new ServerBootstrap(); //boss和worker EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); try { //設置線程池 bootstrap.group(boss, worker); //設置socket工廠、 bootstrap.channel(NioServerSocketChannel.class); //設置管道工廠 bootstrap.childHandler(new ChannelInitializer<Channel>() { @Override PRotected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new ServerHandler()); } }); //netty3中對應設置如下 //bootstrap.setOption("backlog", 1024); //bootstrap.setOption("tcpNoDelay", true); //bootstrap.setOption("keepAlive", true); //設置參數,TCP參數 bootstrap.option(ChannelOption.SO_BACKLOG, 1);//serverSocketchannel的設置,鏈接緩沖池的大小 bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);//socketchannel的設置,維持鏈接的活躍,清除死鏈接 bootstrap.childOption(ChannelOption.TCP_NODELAY, true);//socketchannel的設置,關閉延遲發送 //綁定端口 ChannelFuture future = bootstrap.bind(10101); System.out.println("start"); //等待服務端關閉 future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally{ //釋放資源 boss.shutdownGracefully(); worker.shutdownGracefully(); } }}package com.server;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;/** * 服務端消息處理 * */public class ServerHandler extends SimpleChannelInboundHandler<String> { @Override protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(msg); ctx.channel().writeAndFlush("hi"); ctx.writeAndFlush("hi"); } /** * 新客戶端接入 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelActive"); } /** * 客戶端斷開 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelInactive"); } /** * 異常 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); } }再看看普通客戶端的代碼package com.client;import java.io.BufferedReader;import java.io.InputStreamReader;import io.netty.bootstrap.Bootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;/** * netty5的客戶端 * */public class Client { public static void main(String[] args) { //服務類 Bootstrap bootstrap = new Bootstrap(); //worker EventLoopGroup worker = new NioEventLoopGroup(); try { //設置線程池 bootstrap.group(worker); //設置socket工廠、 bootstrap.channel(NioSocketChannel.class); //設置管道 bootstrap.handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new ClientHandler()); } }); ChannelFuture connect = bootstrap.connect("127.0.0.1", 10101); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in)); while(true){ System.out.println("請輸入:"); String msg = bufferedReader.readLine(); connect.channel().writeAndFlush(msg); } } catch (Exception e) { e.printStackTrace(); } finally{ worker.shutdownGracefully(); } }}package com.client;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;/** * 客戶端消息處理 * */public class ClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println("客戶端收到消息:"+msg); }}再看看單客戶端多連接程序package com.client;import java.io.BufferedReader;import java.io.InputStreamReader;/** * 啟動類 * */public class Start { public static void main(String[] args) { MultClient client = new MultClient(); client.init(5); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in)); while(true){ try { System.out.println("請輸入:"); String msg = bufferedReader.readLine(); client.nextChannel().writeAndFlush(msg); } catch (Exception e) { e.printStackTrace(); } } }}package com.client;import java.util.ArrayList;import java.util.List;import java.util.concurrent.atomic.AtomicInteger;import io.netty.bootstrap.Bootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;/** * 多連接客戶端 * */public class MultClient { /** * 服務類 */ private Bootstrap bootstrap = new Bootstrap(); /** * 會話 */ private List<Channel> channels = new ArrayList<>(); /** * 引用計數 */ private final AtomicInteger index = new AtomicInteger(); /** * 初始化 * @param count */ public void init(int count){ //worker EventLoopGroup worker = new NioEventLoopGroup(); //設置線程池 bootstrap.group(worker); //設置socket工廠、 bootstrap.channel(NioSocketChannel.class); //設置管道 bootstrap.handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new ClientHandler()); } }); for(int i=1; i<=count; i++){ ChannelFuture future = bootstrap.connect("127.0.0.1", 10101); channels.add(future.channel()); } } /** * 獲取會話 * @return */ public Channel nextChannel(){ return getFirstActiveChannel(0); } private Channel getFirstActiveChannel(int count){ Channel channel = channels.get(Math.abs(index.getAndIncrement() % channels.size())); if(!channel.isActive()){ //重連 reconnect(channel); if(count >= channels.size()){ throw new RuntimeException("no can use channel"); } return getFirstActiveChannel(count + 1); } return channel; } /** * 重連 * @param channel */ private void reconnect(Channel channel){ synchronized(channel){ if(channels.indexOf(channel) == -1){ return ; } Channel newChannel = bootstrap.connect("127.0.0.1", 10101).channel(); channels.set(channels.indexOf(channel), newChannel); } }}
新聞熱點
疑難解答