1、線程池技術(shù)的簡(jiǎn)單概括
對(duì)于服務(wù)端的程序,經(jīng)常面對(duì)的是客戶端傳入的短小(執(zhí)行時(shí)間短、工作內(nèi)容較為單一)任務(wù),需要服務(wù)端快速處理并返回結(jié)果。如果服務(wù)端每次接受到一個(gè)任務(wù),創(chuàng)建一個(gè)線程,然后進(jìn)行執(zhí)行,這在原型階段是個(gè)不錯(cuò)的選擇,但是面對(duì)成千上萬(wàn)的任務(wù)遞交進(jìn)服務(wù)器時(shí),如果還是采用一個(gè)任務(wù)一個(gè)線程的方式,那么將會(huì)創(chuàng)建數(shù)以萬(wàn)記的線程,這不是一個(gè)好的選擇。因?yàn)檫@會(huì)使操作系統(tǒng)頻繁的進(jìn)行線程上下文切換,無(wú)故增加系統(tǒng)的負(fù)載,而線程的創(chuàng)建和消亡都是需要耗費(fèi)系統(tǒng)資源的,也無(wú)疑浪費(fèi)了系統(tǒng)資源。
線程池技術(shù)能夠很好地解決這個(gè)問(wèn)題,它預(yù)先創(chuàng)建了若干數(shù)量的線程,并且不能由用戶直接對(duì)線程的創(chuàng)建進(jìn)行控制,在這個(gè)前提下重復(fù)使用固定或較為固定數(shù)目的線程來(lái)完成任務(wù)的執(zhí)行。這樣做的好處是,一方面,消除了頻繁創(chuàng)建和消亡線程的系統(tǒng)資源開(kāi)銷,另一方面,面對(duì)過(guò)量任務(wù)的提交能夠平緩的劣化。
2、簡(jiǎn)單的線程池接口定義
客戶端可以通過(guò)execute(Job)方法將Job提交入線程池執(zhí)行,而客戶端自身不用等待Job的執(zhí)行完成。除了execute(Job)方法以外,線程池接口提供了增大/減少工作者線程以及關(guān)閉線程池的方法。這里工作者線程代表著一個(gè)重復(fù)執(zhí)行Job的線程,而每個(gè)由客戶端提交的Job都將進(jìn)入到一個(gè)工作隊(duì)列中等待工作者線程的處理。
public interface ThreadPool<Job extends Runnable> { // 執(zhí)行一個(gè)Job,這個(gè)Job需要實(shí)現(xiàn)Runnable void execute(Job job); // 關(guān)閉線程池銷毀線程池,該方法保證在所有任務(wù)都完成的情況下才銷毀所有線程,否則等待任務(wù)完成才銷毀 void shutdown(); // 增加工作者線程 void addWorkers(int num); // 減少工作者線程 void removeWorker(int num); // 得到正在等待執(zhí)行的任務(wù)數(shù)量 int getJobSize(); // 獲得當(dāng)前線程池中線程的個(gè)數(shù) int getThreadSize();}3、線程池接口的默認(rèn)實(shí)現(xiàn)
import java.util.LinkedList;import java.util.List;import java.util.concurrent.atomic.AtomicLong;public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> { // 線程池最大限制數(shù) PRivate static final int MAX_WORKER_NUMBERS = 10; // 線程池默認(rèn)的數(shù)量 private static final int DEFAULT_WORKER_NUMBERS = 5; // 線程池最小的數(shù)量 private static final int MIN_WORKER_NUMBERS = 1; // 這是一個(gè)工作列表,將會(huì)向里面插入工作 private final LinkedList<Job> jobs = new LinkedList<Job>(); // 工作者列表 private List<Worker> workers = new LinkedList<Worker>(); // 工作者線程的數(shù)量 private int workerNum = DEFAULT_WORKER_NUMBERS; // 線程編號(hào)生成 private AtomicLong threadNum = new AtomicLong(); public DefaultThreadPool() { initializeWokers(DEFAULT_WORKER_NUMBERS); } public DefaultThreadPool(int num) { workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num; initializeWokers(workerNum); } public void execute(Job job) { if (job != null) { // 添加一個(gè)工作,然后進(jìn)行通知 synchronized (jobs) { jobs.addLast(job); jobs.notify(); } } } public void shutdown() { // 如果還有任務(wù)沒(méi)執(zhí)行完成,就先睡會(huì)吧 while (!jobs.isEmpty()) { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } for (Worker worker : workers) { worker.shutdown(); worker = null; } workers = null; // for (int i = 0; i < workers.size(); i++) { // workers.get(i).shutdown(); // } // for (int i = 0; i < workers.size() + 1; i++) { // workers.get(i) = null; // } jobs.clear();// 清空任務(wù)隊(duì)列 } public void addWorkers(int num) { synchronized (jobs) { // 限制新增的Worker數(shù)量不能超過(guò)最大值 if (num + this.workerNum > MAX_WORKER_NUMBERS) { num = MAX_WORKER_NUMBERS - this.workerNum; } initializeWokers(num); this.workerNum += num; } } public void removeWorker(int num) { synchronized (jobs) { if (num >= this.workerNum) { throw new IllegalArgumentException("beyond workNum"); } // 按照給定的數(shù)量停止Worker int count = 0; while (count < num) { Worker worker = workers.get(count); if (workers.remove(worker)) { worker.shutdown(); count++; } } this.workerNum -= count; } } public int getJobSize() { return jobs.size(); } // 初始化線程工作者 private void initializeWokers(int num) { for (int i = 0; i < num; i++) { Worker worker = new Worker(); workers.add(worker); Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.incrementAndGet()); thread.start(); } } // 工作者,負(fù)責(zé)消費(fèi)任務(wù) class Worker implements Runnable { // 該工作線程是否有效,用于結(jié)束該工作線程 private volatile boolean running = true; /* * 關(guān)鍵所在啊,如果任務(wù)隊(duì)列不空,則取出任務(wù)執(zhí)行,若任務(wù)隊(duì)列空,則等待 */ @Override public void run() { Runnable job = null; while (running) { synchronized (jobs) { // 線程還在運(yùn)行 // 并且,如果工作者列表是空的,那么就wait while (running && jobs.isEmpty()) { // 隊(duì)列為空 try { jobs.wait(20); } catch (InterruptedException ex) { // 感知到外部對(duì)WorkerThread的中斷操作,返回 Thread.currentThread().interrupt(); return; } } if (!jobs.isEmpty()) job = jobs.remove(0);// 取出任務(wù) } if (job != null) { job.run(); // 執(zhí)行任務(wù) } // 釋放資源 job = null; } } // 停止工作,讓該線程自然執(zhí)行完run方法,自然結(jié)束 public void shutdown() { running = false; } } @Override public int getThreadSize() { return workerNum; }}4、線程池的實(shí)現(xiàn)分析
從線程池的實(shí)現(xiàn)可以看到,當(dāng)客戶端調(diào)用execute(Job)方法時(shí),會(huì)不斷地向任務(wù)列表jobs中添加Job,而每個(gè)工作者線程會(huì)不斷地從jobs上取出一個(gè)Job進(jìn)行執(zhí)行,當(dāng)jobs為空時(shí),工作者線程進(jìn)入等待狀態(tài)。添加一個(gè)Job后,對(duì)工作隊(duì)列jobs調(diào)用了其notify()方法,而不是notifyAll()方法,因?yàn)槟軌虼_定有工作者線程被喚醒,這時(shí)使用notify()方法將會(huì)比notifyAll()方法獲得更小的開(kāi)銷(避免將等待隊(duì)列中的線程全部移動(dòng)到阻塞隊(duì)列中)。 可以看到,線程池的本質(zhì)就是使用了一個(gè)線程安全的工作隊(duì)列連接工作者線程和客戶端線程,客戶端線程將任務(wù)放入工作隊(duì)列后便返回,而工作者線程則不斷地從工作隊(duì)列上取出工作并執(zhí)行。當(dāng)工作隊(duì)列為空時(shí),所有的工作者線程均等待在工作隊(duì)列上,當(dāng)有客戶端提交了一個(gè)任務(wù)之后會(huì)通知任意一個(gè)工作者線程,隨著大量的任務(wù)被提交,更多的工作者線程會(huì)被喚醒。
5、測(cè)試代碼
5.1 要執(zhí)行的任務(wù)
public class MyJob implements Runnable { private int age; public MyJob(int age) { this.age = age; } @Override public void run() { System.out.println(age); }} 5.2 測(cè)試代碼
public class TestThreadPool { public static void main(String[] args) { // 初始化線程池個(gè)數(shù) ThreadPool<Runnable> threadPool = new DefaultThreadPool<Runnable>(2); // 提交任務(wù) threadPool.execute(new MyJob(3)); threadPool.execute(new MyJob(4)); threadPool.execute(new MyJob(5)); // 銷毀線程池 threadPool.shutdown(); }}6、參考的書(shū)籍
Java并發(fā)編程的藝術(shù)
7、代碼參考地址
https://git.oschina.net/baowei/MyThreadPool
新聞熱點(diǎn)
疑難解答
圖片精選
網(wǎng)友關(guān)注