国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學(xué)院 > 開發(fā)設(shè)計(jì) > 正文

基于Netty的RPC架構(gòu)筆記3之線程模型源碼分析(1)

2019-11-11 06:25:11
字體:
供稿:網(wǎng)友

      隨著用戶量上升,項(xiàng)目的架構(gòu)也在不斷的升級,由最開始的MVC的垂直架構(gòu)(傳統(tǒng)項(xiàng)目)到RPC架構(gòu)(webservice,rest,netty,mina),再到SOA模型(dubbo),再到最近的微服務(wù),又比如Tomcat6之前的IO模型都是BIO 也就是阻塞IO,到后來變成多路復(fù)用,也是阻塞IO。到非阻塞NIO,再到異步非阻塞AIO,

     言歸正傳,接著談netty,傳統(tǒng)IO是一個(gè)線程服務(wù)一個(gè)客戶,后來通過netty,可以一個(gè)線程服務(wù)多個(gè)客戶,下面的那個(gè)圖展示的是netty的NIO通過引入多線程來提高性能,既一個(gè)線程負(fù)責(zé)一片用戶

直接上代碼

package com.cn;import java.net.InetSocketAddress;import java.util.concurrent.Executors;import com.cn.pool.NioSelectorRunnablePool;/** * 啟動函數(shù) * */public class Start {	public static void main(String[] args) {				//初始化線程		NioSelectorRunnablePool nioSelectorRunnablePool = new NioSelectorRunnablePool(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());				//獲取服務(wù)類		ServerBootstrap bootstrap = new ServerBootstrap(nioSelectorRunnablePool);				//綁定端口		bootstrap.bind(new InetSocketAddress(10101));				System.out.PRintln("start");	}}
package com.cn.pool;import java.util.concurrent.Executor;import java.util.concurrent.atomic.AtomicInteger;import com.cn.NioServerBoss;import com.cn.NioServerWorker;/** * selector線程管理者 * */public class NioSelectorRunnablePool {	/**	 * boss線程數(shù)組	 */	private final AtomicInteger bossIndex = new AtomicInteger();	private Boss[] bosses;	/**	 * worker線程數(shù)組	 */	private final AtomicInteger workerIndex = new AtomicInteger();	private Worker[] workeres;		public NioSelectorRunnablePool(Executor boss, Executor worker) {		initBoss(boss, 1);		initWorker(worker, Runtime.getRuntime().availableProcessors() * 2);	}	/**	 * 初始化boss線程	 * @param boss	 * @param count	 */	private void initBoss(Executor boss, int count) {		this.bosses = new NioServerBoss[count];		for (int i = 0; i < bosses.length; i++) {			bosses[i] = new NioServerBoss(boss, "boss thread " + (i+1), this);		}	}	/**	 * 初始化worker線程	 * @param worker	 * @param count	 */	private void initWorker(Executor worker, int count) {		this.workeres = new NioServerWorker[count];		for (int i = 0; i < workeres.length; i++) {			workeres[i] = new NioServerWorker(worker, "worker thread " + (i+1), this);		}	}	/**	 * 獲取一個(gè)worker	 * @return	 */	public Worker nextWorker() {		 return workeres[Math.abs(workerIndex.getAndIncrement() % workeres.length)];	}	/**	 * 獲取一個(gè)boss	 * @return	 */	public Boss nextBoss() {		 return bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)];	}}
package com.cn;import java.net.SocketAddress;import java.nio.channels.ServerSocketChannel;import com.cn.pool.Boss;import com.cn.pool.NioSelectorRunnablePool;/** * 服務(wù)類 * */public class ServerBootstrap {private NioSelectorRunnablePool selectorRunnablePool;		public ServerBootstrap(NioSelectorRunnablePool selectorRunnablePool) {		this.selectorRunnablePool = selectorRunnablePool;	}		/**	 * 綁定端口	 * @param localAddress	 */	public void bind(final SocketAddress localAddress){		try {			// 獲得一個(gè)ServerSocket通道			ServerSocketChannel serverChannel = ServerSocketChannel.open();			// 設(shè)置通道為非阻塞			serverChannel.configureBlocking(false);			// 將該通道對應(yīng)的ServerSocket綁定到port端口			serverChannel.socket().bind(localAddress);						//獲取一個(gè)boss線程			Boss nextBoss = selectorRunnablePool.nextBoss();			//向boss注冊一個(gè)ServerSocket通道			nextBoss.registerAcceptChannelTask(serverChannel);		} catch (Exception e) {			e.printStackTrace();		}	}}
package com.cn.pool;import java.nio.channels.SocketChannel;/** * worker接口 * */public interface Worker {		/**	 * 加入一個(gè)新的客戶端會話	 * @param channel	 */	public void registerNewChannelTask(SocketChannel channel);}
package com.cn.pool;import java.nio.channels.ServerSocketChannel;/** * boss接口 * */public interface Boss {		/**	 * 加入一個(gè)新的ServerSocket	 * @param serverChannel	 */	public void registerAcceptChannelTask(ServerSocketChannel serverChannel);}
package com.cn;import java.io.IOException;import java.nio.channels.Selector;import java.util.Queue;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.Executor;import java.util.concurrent.atomic.AtomicBoolean;import com.cn.pool.NioSelectorRunnablePool;/** * 抽象selector線程類 *  *  */public abstract class AbstractNioSelector implements Runnable {	/**	 * 線程池	 */	private final Executor executor;	/**	 * 選擇器	 */	protected Selector selector;	/**	 * 選擇器wakenUp狀態(tài)標(biāo)記	 */	protected final AtomicBoolean wakenUp = new AtomicBoolean();	/**	 * 任務(wù)隊(duì)列	 */	private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();	/**	 * 線程名稱	 */	private String threadName;		/**	 * 線程管理對象	 */	protected NioSelectorRunnablePool selectorRunnablePool;	AbstractNioSelector(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {		this.executor = executor;		this.threadName = threadName;		this.selectorRunnablePool = selectorRunnablePool;		openSelector();	}	/**	 * 獲取selector并啟動線程	 */	private void openSelector() {		try {			this.selector = Selector.open();		} catch (IOException e) {			throw new RuntimeException("Failed to create a selector.");		}		executor.execute(this);	}	@Override	public void run() {				Thread.currentThread().setName(this.threadName);		while (true) {			try {				wakenUp.set(false);				select(selector);				processTaskQueue();				process(selector);			} catch (Exception e) {				// ignore			}		}	}	/**	 * 注冊一個(gè)任務(wù)并激活selector	 * 	 * @param task	 */	protected final void registerTask(Runnable task) {		taskQueue.add(task);		Selector selector = this.selector;		if (selector != null) {			if (wakenUp.compareAndSet(false, true)) {				selector.wakeup();			}		} else {			taskQueue.remove(task);		}	}	/**	 * 執(zhí)行隊(duì)列里的任務(wù)	 */	private void processTaskQueue() {		for (;;) {			final Runnable task = taskQueue.poll();			if (task == null) {				break;			}			task.run();		}	}		/**	 * 獲取線程管理對象	 * @return	 */	public NioSelectorRunnablePool getSelectorRunnablePool() {		return selectorRunnablePool;	}	/**	 * select抽象方法	 * 	 * @param selector	 * @return	 * @throws IOException	 */	protected abstract int select(Selector selector) throws IOException;	/**	 * selector的業(yè)務(wù)處理	 * 	 * @param selector	 * @throws IOException	 */	protected abstract void process(Selector selector) throws IOException;}
package com.cn;import java.io.IOException;import java.nio.channels.ClosedChannelException;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;import java.util.concurrent.Executor;import com.cn.pool.Boss;import com.cn.pool.NioSelectorRunnablePool;import com.cn.pool.Worker;/** * boss實(shí)現(xiàn)類 * */public class NioServerBoss extends AbstractNioSelector implements Boss{	public NioServerBoss(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {		super(executor, threadName, selectorRunnablePool);	}	@Override	protected void process(Selector selector) throws IOException {		Set<SelectionKey> selectedKeys = selector.selectedKeys();        if (selectedKeys.isEmpty()) {            return;        }                for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {            SelectionKey key = i.next();            i.remove();            ServerSocketChannel server = (ServerSocketChannel) key.channel();    		// 新客戶端    		SocketChannel channel = server.accept();    		// 設(shè)置為非阻塞    		channel.configureBlocking(false);    		// 獲取一個(gè)worker    		Worker nextworker = getSelectorRunnablePool().nextWorker();    		// 注冊新客戶端接入任務(wù)    		nextworker.registerNewChannelTask(channel);    		    		System.out.println("新客戶端鏈接");        }	}			public void registerAcceptChannelTask(final ServerSocketChannel serverChannel){		 final Selector selector = this.selector;		 registerTask(new Runnable() {			@Override			public void run() {				try {					//注冊serverChannel到selector					serverChannel.register(selector, SelectionKey.OP_ACCEPT);				} catch (ClosedChannelException e) {					e.printStackTrace();				}			}		});	}		@Override	protected int select(Selector selector) throws IOException {		return selector.select();	}}
package com.cn;import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.ClosedChannelException;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;import java.util.concurrent.Executor;import com.cn.pool.NioSelectorRunnablePool;import com.cn.pool.Worker;/** * worker實(shí)現(xiàn)類 * */public class NioServerWorker extends AbstractNioSelector implements Worker{	public NioServerWorker(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {		super(executor, threadName, selectorRunnablePool);	}	@Override	protected void process(Selector selector) throws IOException {		Set<SelectionKey> selectedKeys = selector.selectedKeys();        if (selectedKeys.isEmpty()) {            return;        }        Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();		while (ite.hasNext()) {			SelectionKey key = (SelectionKey) ite.next();			// 移除,防止重復(fù)處理			ite.remove();						// 得到事件發(fā)生的Socket通道			SocketChannel channel = (SocketChannel) key.channel();						// 數(shù)據(jù)總長度			int ret = 0;			boolean failure = true;			ByteBuffer buffer = ByteBuffer.allocate(1024);			//讀取數(shù)據(jù)			try {				ret = channel.read(buffer);				failure = false;			} catch (Exception e) {				// ignore			}			//判斷是否連接已斷開			if (ret <= 0 || failure) {				key.cancel();				System.out.println("客戶端斷開連接");	        }else{	        	 System.out.println("收到數(shù)據(jù):" + new String(buffer.array()));	        	 	     		//回寫數(shù)據(jù)	     		ByteBuffer outBuffer = ByteBuffer.wrap("收到/n".getBytes());	     		channel.write(outBuffer);// 將消息回送給客戶端	        }		}	}	/**	 * 加入一個(gè)新的socket客戶端	 */	public void registerNewChannelTask(final SocketChannel channel){		 final Selector selector = this.selector;		 registerTask(new Runnable() {			@Override			public void run() {				try {					//將客戶端注冊到selector中					channel.register(selector, SelectionKey.OP_READ);				} catch (ClosedChannelException e) {					e.printStackTrace();				}			}		});	}	@Override	protected int select(Selector selector) throws IOException {		return selector.select(500);	}	}


發(fā)表評論 共有條評論
用戶名: 密碼:
驗(yàn)證碼: 匿名發(fā)表
主站蜘蛛池模板: 拉孜县| 双桥区| 临沧市| 临潭县| 九龙县| 丰原市| 沂南县| 富蕴县| 华坪县| 洛南县| 许昌市| 黄龙县| 遂溪县| 凭祥市| 巴楚县| 清镇市| 高阳县| 北流市| 海林市| 玛曲县| 肇州县| 连山| 罗山县| 娄烦县| 阳东县| 巫溪县| 桐庐县| 临西县| 安远县| 卫辉市| 平果县| 高密市| 文山县| 连城县| 隆化县| 张掖市| 湟中县| 平顶山市| 北京市| 沂南县| 鱼台县|