国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁(yè) > 學(xué)院 > 開發(fā)設(shè)計(jì) > 正文

基于隊(duì)列多任務(wù)處理

2019-11-06 06:23:34
字體:
來(lái)源:轉(zhuǎn)載
供稿:網(wǎng)友

概述

最近學(xué)習(xí)了線程池和隊(duì)列相關(guān)知識(shí),基于隊(duì)列和線程池實(shí)現(xiàn)的多任務(wù)、多處理器的任務(wù)處理器。

具體實(shí)現(xiàn)

任務(wù)隊(duì)列的接口

public interface TaskQueue { /** * 設(shè)置任務(wù)隊(duì)列大小,默認(rèn)2000,最大10000 * @param size */ void setQueueSize(int size); /** * 設(shè)置核心處理線程 * @param coreThreadSize */ void setCoreThreadSize(int coreThreadSize); /** * 提交任務(wù) * @param task */ boolean submitTask(Task task); /** * 設(shè)置任務(wù)處理器 * @param taskPRocessors */ void setTaskProcessors(List<TaskProcessor> taskProcessors);}

任務(wù)Task

*/public class Task<T> { private T data; private String key; private long submitTime; private long processTime; public T getData() { return data; } public void setData(T data) { this.data = data; } public String getKey() { return key; } public void setKey(String key) { this.key = key; } public long getSubmitTime() { return submitTime; } public void setSubmitTime(long submitTime) { this.submitTime = submitTime; } public long getProcessTime() { return processTime; } public void setProcessTime(long processTime) { this.processTime = processTime; }}

TaskProcessor 任務(wù)處理器

public interface TaskProcessor<T> { /** * 處理任務(wù) * @param task */ void process(Task<T> task);}

任務(wù)隊(duì)列實(shí)現(xiàn)類(bean 對(duì)象生成)

public class LinkedBlockTaskQueue implements TaskQueue, InitializingBean, DisposableBean { private static final Logger logger = LoggerFactory.getLogger(LinkedBlockTaskQueue.class); /** * 最大的隊(duì)列大小 */ private final int maxQueueSize = 10000; private final int maxCoreThreadSize = 20; /** * 默認(rèn)隊(duì)列大小 */ private int queueSize = 2000; /** * 核心線程數(shù) */ private int coreThreadSize = 10; private LinkedBlockingQueue<Task<?>> queue = null; private ExecutorService executorService = null; private List<TaskProcessor> taskProcessors; private volatile boolean breakLoop = false; @Override public void setQueueSize(int size) { size = size <= 0 ? queueSize : size; this.queueSize = size > this.maxQueueSize ? this.maxQueueSize : size; } @Override public void setCoreThreadSize(int coreThreadSize) { coreThreadSize = coreThreadSize <= 0 ? this.coreThreadSize : coreThreadSize; coreThreadSize = coreThreadSize > this.maxCoreThreadSize ? this.maxCoreThreadSize : coreThreadSize; this.coreThreadSize = coreThreadSize; } @Override public boolean submitTask(Task task) { return queue.offer(task); } @Override public void setTaskProcessors(List<TaskProcessor> taskProcessors) { this.taskProcessors = taskProcessors; } @Override public void destroy() throws Exception { this.breakLoop = true; } @Override public void afterPropertiesSet() throws Exception { this.queue = new LinkedBlockingQueue<>(this.queueSize); this.executorService = Executors.newFixedThreadPool(this.coreThreadSize); start(); } private void start() { if (taskProcessors == null || taskProcessors.isEmpty()) { logger.error("taskProcessors is null or empty"); return; } for (int i = 0; i < coreThreadSize; i++) { executorService.submit(new MultiConsumer()); } } private class MultiConsumer implements Runnable { @Override public void run() { while (true) { Task task = null; try { task = queue.take(); } catch (InterruptedException e) { continue; } for (TaskProcessor processor : taskProcessors) { try { processor.process(task); } catch (Exception e) { logger.error("process task exception", e); } } if (breakLoop) { break; } } } }}

測(cè)試

簡(jiǎn)單寫了測(cè)試類(這里為了簡(jiǎn)便,沒(méi)有使用spring作為容器),直接通過(guò)new 實(shí)例化任務(wù)隊(duì)列的。

public class StringTaskProcessor implements TaskProcessor<String> { @Override public void process(Task<String> task) { System.out.println("thread Id:"+Thread.currentThread().getId()+",data:"+task.getData().toString()); }}public class Test { public static void main(String[] args) throws Exception { LinkedBlockTaskQueue linkedBlockTaskQueue=new LinkedBlockTaskQueue(); List<TaskProcessor> taskProcessors=new ArrayList<>(); taskProcessors.add(new StringTaskProcessor()); linkedBlockTaskQueue.setTaskProcessors(taskProcessors); linkedBlockTaskQueue.afterPropertiesSet(); for (int i=0;i<100;i++) { Task<String> task=new Task<>(); task.setData(i+""); linkedBlockTaskQueue.submitTask(task); } }}
發(fā)表評(píng)論 共有條評(píng)論
用戶名: 密碼:
驗(yàn)證碼: 匿名發(fā)表
主站蜘蛛池模板: 淮安市| 玉田县| 扬中市| 昌黎县| 虞城县| 巴中市| 曲靖市| 那曲县| 宁南县| 永春县| 诸城市| 宜君县| 兴隆县| 繁峙县| 馆陶县| 遂溪县| 泗水县| 丰台区| 巴彦淖尔市| 北流市| 盐边县| 寻乌县| 锡林郭勒盟| 商都县| 夹江县| 丰都县| 长子县| 江永县| 重庆市| 同心县| 怀来县| 东乡| 哈巴河县| 枣强县| 新龙县| 平湖市| 桐柏县| 茂名市| 胶南市| 神农架林区| 德惠市|