国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發設計 > 正文

生產者消費者模式之工作竊取算法

2019-11-08 19:42:26
字體:
來源:轉載
供稿:網友

生產者消費者模式之工作竊取算法 1、一個通道只有一個隊列,多個消費者共享一個隊列實例,導致鎖的競爭,如果一個通道擁有對個隊列,則消費者可以從通道中獲取各自隊列獲取數據。 2、如要服務有高性能和可靠性的要求,Consumer-PRoducer模式請使用 kafka等開源工具

public interface WorkStealingEnableChannel<P> extends Chanel<P> { P take(BlockingDeque<P> preferredQueue) throws InterruptedException;}public class WorkStealingChannel<P> implements WorkStealingEnableChannel<P> { //雙端隊列,可以從2端插入值或獲取值,繼承了BlockingQueue private final BlockingDeque<P>[] managedQueues; public WorkStealingChannel(BlockingDeque<P>[] managedQueues) { super(); this.managedQueues = managedQueues; } @Override public P take() throws InterruptedException { return take(null); } @Override public void put(P product) throws InterruptedException { int targetIndex = (product.hashCode() % managedQueues.length); BlockingQueue<P> targetQueue = managedQueues[targetIndex]; targetQueue.put(product); } @Override public P take(BlockingDeque<P> preferredQueue) throws InterruptedException { BlockingDeque<P> targetQueue = preferredQueue; P product = null; //優先從指定的隊列獲取值 if(null != targetQueue){ product = targetQueue.poll(); } int queueIndex = -1; while(null != product){ queueIndex = (queueIndex +1) % managedQueues.length; targetQueue = managedQueues[queueIndex]; //試圖從其他受管隊列的隊尾“竊取”“產品” product = targetQueue.pollLast(); if(preferredQueue == targetQueue){ break; } } if(null == product){ //隨機竊取 其他受管隊列的產品 queueIndex = (int) (System.currentTimeMillis() % managedQueues.length); targetQueue = managedQueues[queueIndex]; product = targetQueue.pollLast(); System.out.println("stealed from " + queueIndex + ": " + product); } return product; }}public class WorkStealingExample { private final WorkStealingEnableChannel<String> channel; private final TerminationToken token = new TerminationToken(); public static void main(String[] args) throws InterruptedException { WorkStealingExample wse = new WorkStealingExample(); //Thread.sleep(3500); } public WorkStealingExample(){ int nCPU = Runtime.getRuntime().availableProcessors(); int consumerCount = nCPU/2 + 1; BlockingDeque<String>[] managedQueues = new LinkedBlockingDeque[consumerCount]; channel = new WorkStealingChannel<String>(managedQueues); Consumer[] consumers = new Consumer[consumerCount]; for(int i=0; i<consumerCount; i++){ managedQueues[i] = new LinkedBlockingDeque<String>(); consumers[i] = new Consumer(token, managedQueues[i]); } for(int i=0; i<nCPU; i++){ new Producer().start(); } for(int i=0; i<consumerCount; i++){ consumers[i].start(); } } private class Producer extends AbstractTerminatableThread{ private int i = 0; @Override protected void doRun() throws Exception { channel.put(String.valueOf(i++)); Thread.sleep(10); token.reservations.incrementAndGet(); } } private class Consumer extends AbstractTerminatableThread{ private final BlockingDeque<String> workQueue; public Consumer(TerminationToken token, BlockingDeque<String> workQueue) { super(token); this.workQueue = workQueue; } @Override protected void doRun() throws Exception { /** * 實現了工作竊取算法 */ String product = channel.take(); if(product != null){ } System.out.println("Processing product:" + product); try { Thread.sleep(new Random().nextInt(50)); } catch (Exception e) { }finally{ token.reservations.decrementAndGet(); } } }}
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 双牌县| 静乐县| 北京市| 永善县| 辉县市| 特克斯县| 进贤县| 左权县| 彭州市| 溆浦县| 东光县| 新昌县| 阳东县| 大英县| 凭祥市| 邛崃市| 阜南县| 枣强县| 晴隆县| 灵台县| 行唐县| 页游| 元氏县| 曲靖市| 台前县| 萨嘎县| 修武县| 高密市| 沙洋县| 星座| 三都| 玉树县| 龙泉市| 绍兴市| 仪征市| 上虞市| 伊春市| 昌乐县| 凤翔县| 正镶白旗| 清原|