最近學習了線程池和隊列相關知識,基于隊列和線程池實現了一個多任務、多處理器的任務處理器。
任務隊列的接口
public interface TaskQueue { /** * 設(shè)置任務(wù)隊(duì)列大小,默認(rèn)2000,最大10000 * @param size */ void setQueueSize(int size); /** * 設(shè)置核心處理線程 * @param coreThreadSize */ void setCoreThreadSize(int coreThreadSize); /** * 提交任務(wù) * @param task */ boolean submitTask(Task task); /** * 設(shè)置任務(wù)處理器 * @param taskPRocessors */ void setTaskProcessors(List<TaskProcessor> taskProcessors);}任務(wù)Task
/**
 * A unit of work: a payload of type {@code T} together with a key and two
 * timestamps.
 *
 * <p>This class is a plain mutable holder. It only stores the values handed to
 * its setters; it never computes or validates them.
 *
 * @param <T> type of the payload carried by the task
 */
public class Task<T> {

    /** Payload of the task; {@code null} until {@link #setData(Object)} is called. */
    private T data;

    /** Caller-supplied key; {@code null} until {@link #setKey(String)} is called. */
    private String key;

    /** Submission timestamp; 0 until set. NOTE(review): unit is presumably epoch millis - confirm. */
    private long submitTime;

    /** Processing timestamp; 0 until set. NOTE(review): unit is presumably epoch millis - confirm. */
    private long processTime;

    /**
     * @return the payload, or {@code null} if none has been set
     */
    public T getData() {
        return data;
    }

    /**
     * @param data the payload to carry
     */
    public void setData(T data) {
        this.data = data;
    }

    /**
     * @return the key, or {@code null} if none has been set
     */
    public String getKey() {
        return key;
    }

    /**
     * @param key the key to associate with this task
     */
    public void setKey(String key) {
        this.key = key;
    }

    /**
     * @return the stored submission timestamp (0 if never set)
     */
    public long getSubmitTime() {
        return submitTime;
    }

    /**
     * @param submitTime the submission timestamp to store
     */
    public void setSubmitTime(long submitTime) {
        this.submitTime = submitTime;
    }

    /**
     * @return the stored processing timestamp (0 if never set)
     */
    public long getProcessTime() {
        return processTime;
    }

    /**
     * @param processTime the processing timestamp to store
     */
    public void setProcessTime(long processTime) {
        this.processTime = processTime;
    }
}
// Next: TaskProcessor, the task processor.
public interface TaskProcessor<T> { /** * 處理任務(wù) * @param task */ void process(Task<T> task);}任務(wù)隊(duì)列實(shí)現(xiàn)類(bean 對(duì)象生成)
public class LinkedBlockTaskQueue implements TaskQueue, InitializingBean, DisposableBean { private static final Logger logger = LoggerFactory.getLogger(LinkedBlockTaskQueue.class); /** * 最大的隊(duì)列大小 */ private final int maxQueueSize = 10000; private final int maxCoreThreadSize = 20; /** * 默認(rèn)隊(duì)列大小 */ private int queueSize = 2000; /** * 核心線程數(shù) */ private int coreThreadSize = 10; private LinkedBlockingQueue<Task<?>> queue = null; private ExecutorService executorService = null; private List<TaskProcessor> taskProcessors; private volatile boolean breakLoop = false; @Override public void setQueueSize(int size) { size = size <= 0 ? queueSize : size; this.queueSize = size > this.maxQueueSize ? this.maxQueueSize : size; } @Override public void setCoreThreadSize(int coreThreadSize) { coreThreadSize = coreThreadSize <= 0 ? this.coreThreadSize : coreThreadSize; coreThreadSize = coreThreadSize > this.maxCoreThreadSize ? this.maxCoreThreadSize : coreThreadSize; this.coreThreadSize = coreThreadSize; } @Override public boolean submitTask(Task task) { return queue.offer(task); } @Override public void setTaskProcessors(List<TaskProcessor> taskProcessors) { this.taskProcessors = taskProcessors; } @Override public void destroy() throws Exception { this.breakLoop = true; } @Override public void afterPropertiesSet() throws Exception { this.queue = new LinkedBlockingQueue<>(this.queueSize); this.executorService = Executors.newFixedThreadPool(this.coreThreadSize); start(); } private void start() { if (taskProcessors == null || taskProcessors.isEmpty()) { logger.error("taskProcessors is null or empty"); return; } for (int i = 0; i < coreThreadSize; i++) { executorService.submit(new MultiConsumer()); } } private class MultiConsumer implements Runnable { @Override public void run() { while (true) { Task task = null; try { task = queue.take(); } catch (InterruptedException e) { continue; } for (TaskProcessor processor : taskProcessors) { try { processor.process(task); } catch 
(Exception e) { logger.error("process task exception", e); } } if (breakLoop) { break; } } } }}簡(jiǎn)單寫了測(cè)試類(這里為了簡(jiǎn)便,沒(méi)有使用spring作為容器),直接通過(guò)new 實(shí)例化任務(wù)隊(duì)列的。
public class StringTaskProcessor implements TaskProcessor<String> { @Override public void process(Task<String> task) { System.out.println("thread Id:"+Thread.currentThread().getId()+",data:"+task.getData().toString()); }}public class Test { public static void main(String[] args) throws Exception { LinkedBlockTaskQueue linkedBlockTaskQueue=new LinkedBlockTaskQueue(); List<TaskProcessor> taskProcessors=new ArrayList<>(); taskProcessors.add(new StringTaskProcessor()); linkedBlockTaskQueue.setTaskProcessors(taskProcessors); linkedBlockTaskQueue.afterPropertiesSet(); for (int i=0;i<100;i++) { Task<String> task=new Task<>(); task.setData(i+""); linkedBlockTaskQueue.submitTask(task); } }}新聞熱點(diǎn)
疑難解答
圖片精選
網友關注