從本文開始,我將用兩篇文章的篇幅,為各位讀者呈現java中原生的線程池技術。第一篇文章,我將講解JAVA原生線程池的基本使用,并由此延伸出JAVA中和線程管理相關的類結構體系,然后我們詳細描述JAVA原生線程池的結構和工作方式;第二篇文章,我們將繼續深入,講解JAVA原生線程池的高級特性,包括Thread工廠、隊列、拒絕原則、鉤子和相關工具類。
如果您是JAVA語言的初學者,請從本篇文章看起;如果您對線程池技術已有一定的了解,那么可以只看下一篇文章;如果您是高手,請繞行;如果您對我的觀點有任何意見和建議,請留言,謝謝。^-^

前文我們已經講到,線程是一個操作系統概念。操作系統負責這個線程的創建、掛起、運行、阻塞和終結操作。而操作系統創建線程、切換線程狀態、終結線程都要進行CPU調度——這是一個耗費時間和系統資源的事情(《操作系統知識回顧—進程線程模型》)
另一方面,目前大多數生產環境我們所面臨問題的技術背景一般是:處理某一次請求的時間是非常短暫的,但是請求數量是巨大的。這種技術背景下,如果我們為每一個請求都單獨創建一個線程,那么物理機的所有資源基本上都被操作系統創建線程、切換線程狀態、銷毀線程這些操作所占用,用于業務請求處理的資源反而減少了。所以最理想的處理方式是,將處理請求的線程數量控制在一個范圍,既保證后續的請求不會等待太長時間,又保證物理機將足夠的資源用于請求處理本身。
另外,一些操作系統是有最大線程數量限制的。當運行的線程數量逼近這個值的時候,操作系統會變得不穩定。這也是我們要限制線程數量的原因。
JAVA語言為我們提供了兩種基礎線程池的選擇:ScheduledThreadPoolExecutor和ThreadPoolExecutor。它們都實現了ExecutorService接口(注意,ExecutorService接口本身和“線程池”并沒有直接關系,它的定義更接近“執行器”,而“使用線程管理的方式進行實現”只是其中的一種實現方式)。這篇文章中,我們主要圍繞ThreadPoolExecutor類進行講解。
首先我們來看看ThreadPoolExecutor類的最簡單使用方式:
package test.thread.pool;import java.util.concurrent.SynchronousQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.log4j.BasicConfigurator;public class PoolThreadSimple { static { BasicConfigurator.configure(); } public static void main(String[] args) throws Throwable { /* * corePoolSize:核心大小,線程池初始化的時候,就會有這么大 * maximumPoolSize:線程池最大線程數 * keepAliveTime:如果當前線程池中線程數大于corePoolSize。 * 多余的線程,在等待keepAliveTime時間后如果還沒有新的線程任務指派給它,它就會被回收 * * unit:等待時間keepAliveTime的單位 * * workQueue:等待隊列。這個對象的設置是本文將重點介紹的內容 * */ ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, new SynchronousQueue<Runnable>()); for(int index = 0 ; index < 10 ; index ++) { poolExecutor.submit(new PoolThreadSimple.TestRunnable(index)); } // 沒有特殊含義,只是為了保證main線程不會退出 synchronized (poolExecutor) { poolExecutor.wait(); } } /** * 這個就是測試用的線程 * @author yinwenjie */ PRivate static class TestRunnable implements Runnable { /** * 日志 */ private static Log LOGGER = LogFactory.getLog(TestRunnable.class); /** * 記錄任務的唯一編號,這樣在日志中好做識別 */ private Integer index; public TestRunnable(int index) { this.index = index; } /** * @return the index */ public Integer getIndex() { return index; } @Override public void run() { /* * 線程中,就只做一件事情: * 等待60秒鐘的事件,以便模擬業務操作過程 * */ Thread currentThread = Thread.currentThread(); TestRunnable.LOGGER.info("線程:" + currentThread.getId() + " 中的任務(" + this.getIndex() + ")開始執行==="); synchronized (currentThread) { try { currentThread.wait(60000); } catch (InterruptedException e) { TestRunnable.LOGGER.error(e.getMessage(), e); } } TestRunnable.LOGGER.info("線程:" + currentThread.getId() + " 中的任務(" + this.getIndex() + ")執行完成"); } }}123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687隨后的文章中我們將對線程池中的corePoolSize、maximumPoolSize、keepAliveTime、timeUnit、workQueue、threadFactory、handler參數和一些常用/不常用的設置項進行逐一講解。
在上面的代碼中,我們創建線程池的時候使用了ThreadPoolExecutor中最簡單的一個構造函數:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)12345
12345構造函數中需要傳入的參數包括corePoolSize、maximumPoolSize、keepAliveTime、timeUnit和workQueue。要明確理解這些參數(和后續將要介紹的參數)的含義,就首先要搞清楚ThreadPoolExecutor線程池的邏輯結構。

一定要注意一個概念,即存在于線程池中容器的一定是Thread對象,而不是您要求運行的任務(所以叫線程池而不叫任務池也不叫對象池,更不叫游泳池);您要求運行的任務將被線程池分配給某一個空閑的Thread運行。
從上圖中,我們可以看到構成線程池的幾個重要元素:
等待隊列:顧名思義,就是您調用線程池對象的submit()方法或者execute()方法,要求線程池運行的任務(這些任務必須實現Runnable接口或者Callable接口)。但是出于某些原因線程池并沒有馬上運行這些任務,而是送入一個隊列等待執行(這些原因后文馬上講解)。
核心線程:線程池主要用于執行任務的是“核心線程”,“核心線程”的數量是您創建線程時所設置的corePoolSize參數決定的。如果不進行特別的設定,線程池中始終會保持corePoolSize數量的線程數(不包括創建階段)。
非核心線程:一旦任務數量過多(由等待隊列的特性決定),線程池將創建“非核心線程”臨時幫助運行任務。您設置的大于corePoolSize參數小于maximumPoolSize參數的部分,就是線程池可以臨時創建的“非核心線程”的最大數量。這種情況下如果某個線程沒有運行任何任務,在等待keepAliveTime時間后,這個線程將會被銷毀,直到線程池的線程數量重新達到corePoolSize。
要重點理解上一條描述中黑體字部分的內容。也就是說,并不是所謂的“非核心線程”才會被回收;而是誰的空閑時間達到keepAliveTime這個閥值,就會被回收。直到線程池中線程數量等于corePoolSize為止。
maximumPoolSize參數也是當前線程池允許創建的最大線程數量。那么如果您設置的corePoolSize參數和您設置的maximumPoolSize參數一致時,線程池在任何情況下都不會回收空閑線程。keepAliveTime和timeUnit也就失去了意義。
keepAliveTime參數和timeUnit參數也是配合使用的。keepAliveTime參數指明等待時間的量化值,timeUnit指明量化值單位。例如keepAliveTime=1,timeUnit為TimeUnit.MINUTES,代表空閑線程的回收閥值為1分鐘。
說完了線程池的邏輯結構,下面我們討論一下線程池是怎樣處理某一個運行任務的。下圖描述了一個完整的任務處理過程:

1、首先您可以通過線程池提供的submit()方法或者execute()方法,要求線程池執行某個任務。線程池收到這個要求執行的任務后,會有幾種處理情況:
1.1、如果當前線程池中運行的線程數量還沒有達到corePoolSize大小時,線程池會創建一個新的線程運行您的任務,無論之前已經創建的線程是否處于空閑狀態。
1.2、如果當前線程池中運行的線程數量已經達到設置的corePoolSize大小,線程池會把您的這個任務加入到等待隊列中。直到某一個的線程空閑了,線程池會根據您設置的等待隊列規則,從隊列中取出一個新的任務執行。
1.3、如果根據隊列規則,這個任務無法加入等待隊列。這時線程池就會創建一個“非核心線程”直接運行這個任務。注意,如果這種情況下任務執行成功,那么當前線程池中的線程數量一定大于corePoolSize。
1.4、如果這個任務,無法被“核心線程”直接執行,又無法加入等待隊列,又無法創建“非核心線程”直接執行,且您沒有為線程池設置RejectedExecutionHandler。這時線程池會拋出RejectedExecutionException異常,即線程池拒絕接受這個任務。(實際上拋出RejectedExecutionException異常的操作,是ThreadPoolExecutor線程池中一個默認的RejectedExecutionHandler實現:AbortPolicy,這在后文會提到)
2、一旦線程池中某個線程完成了任務的執行,它就會試圖到任務等待隊列中拿去下一個等待任務(所有的等待任務都實現了BlockingQueue接口,按照接口字面上的理解,這是一個可阻塞的隊列接口),它會調用等待隊列的poll()方法,并停留在哪里。
3、當線程池中的線程超過您設置的corePoolSize參數,說明當前線程池中有所謂的“非核心線程”。那么當某個線程處理完任務后,如果等待keepAliveTime時間后仍然沒有新的任務分配給它,那么這個線程將會被回收。線程池回收線程時,對所謂的“核心線程”和“非核心線程”是一視同仁的,直到線程池中線程的數量等于您設置的corePoolSize參數時,回收過程才會停止。
在ThreadPoolExecutor線程池中,有一些不常用的設置。我建議如果您在應用場景中沒有特殊的要求,就不需要使用這些設置:
前文我們討論到,線程池回收線程只會發生在當前線程池中線程數量大于corePoolSize參數的時候;當線程池中線程數量小于等于corePoolSize參數的時候,回收過程就會停止。
allowCoreThreadTimeOut設置項可以要求線程池:將包括“核心線程”在內的,沒有任務分配的任何線程,在等待keepAliveTime時間后全部進行回收:
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(1));poolExecutor.allowCoreThreadTimeOut(true);123
123以下是設置前的效果:

以下是設置后的效果:

前文我們還討論到,當線程池中的線程還沒有達到您設置的corePoolSize參數值的時候,如果有新的任務到來,線程池將創建新的線程運行這個任務,無論之前已經創建的線程是否處于空閑狀態。這個描述可以用下面的示意圖表示出來:

prestartAllCoreThreads設置項,可以在線程池創建,但還沒有接收到任何任務的情況下,先行創建符合corePoolSize參數值的線程數:
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(1));poolExecutor.prestartAllCoreThreads();123
123================================
(后文預告:線程基礎:線程池(6)——高級特性(下))
ThreadPoolExecutor類結構體系使用ThreadFactory線程池任務隊列(重點講解)拒絕任務擴展ThreadPoolExecutor線程池 Hook methods Queue maintenance 工具類和后記 Executors Apache中的擴展 與spring結合新聞熱點
疑難解答