原文地址
但凡是一個框架(“服務”框架),基本都會涉及線程池問題。雖然你可能沒有直接使用它,但這是因為框架幫你完成了這部分工作。
說,為什么需要線程池呢?試想,現在但凡是寫一個服務程序,如果不采用并發或并行的方式,都有點對不起4核、8核,甚至更多的CPU內核(物理內核,邏輯內核)。如果每次需要線程,都創建一次,使用完后,銷毀,對系統性能的消耗比較大,更加合適的做法是,在程序初始化時,一次性把所有線程都創建好,這樣,當需要的時候,直接用就行~
雖然框架提供線程池,不需要自己寫,但了解線程池,至少能大幅度提高你的編程能力。可能,不是每個項目都要搞一套框架,如果項目沒那么復雜,自己寫個簡單的線程池還是需要的。
在閱讀研究線程池的源碼之前,一直感覺線程池是一個框架中最高深的技術。研究后才發現,線程池的實現是如此精巧。本文從技術角度分析了線程池的本質原理和組成,同時分析了JDK、Jetty6、Jetty8、Tomcat的源碼實現,對于想了解線程池本質、更好的使用線程池或者定制實現自己的線程池的業務場景具有一定指導意義。
從內部實現上看,線程池技術可主要劃分為如下6個要點實現:

圖1 線程池技術要點
線程池中的線程可以重復利用執行任務,一個worker在生命周期內會不停的處理多個任務job,本質上,就是復用一個worker去處理多個job。
“流控“本質是通過對worker數量的控制實現并發數控制。通過設置參數控制worker的數量,實現線程池的容量伸縮。
工作者線程workers的數量是有限的,同一時間最多只能處理workers數量個job。對于來不及處理的job需要保存到等待隊列,空閑的工作者worker會不停的讀取隊列里的job進行處理。
基于不同的隊列,可以實現多種線程池,如定制隊列出隊順序實現帶處理優先級的線程池、定制隊列為阻塞有界隊列實現可阻塞能力的線程池等。
流控一方面通過控制worker數控制并發數和處理能力,一方面可基于隊列控制線程池處理能力的上限。
線程池參數的設定和多個工作者workers的初始化。通常有一開始就初始化指定數量的workers或者有請求時逐步初始化工作者兩種方式。前者線程池啟動初期響應會比較快但造成了空載時的少量性能浪費,后者是基于請求量靈活擴容但犧牲了線程池啟動初期性能達不到最優。
業務給線程池添加任務job時線程池的處理算法。有的線程池基于算法識別直接處理job,還是增加工作者數量來處理job,或者放入待處理隊列,也有的線程池會直接將job放入待處理隊列,等待工作者worker去取出執行。
業務線程數不是持久不變的,有高低峰期。線程池要有自己的算法根據業務請求頻率高低調節自身工作者workers的數量,從而調節線程池大小,實現業務高峰期增加工作者數量提高響應速度,而業務低峰期減少工作者數來節省服務器資源。增加算法通常基于幾個維度進行:待處理工作job數、線程池定義的最大最小工作者數、工作者閑置時間。
停止時線程池要有自身的停止邏輯,保證所有job都得到執行或者拋棄。
結合上面的技術點,列舉幾種線程池實現方式。
表 1
|           實現  |                   工作者workers結構與并發保護  |                   待處理工作隊列結構  |      
|           JDK  |                   使用了HashSet來存儲工作者workers,通過可重入鎖ReentrantLock對其進行并發保護。每個worker都是一個Runnable接口。  |                   使用了實現接口BlockingQueue的阻塞隊列來存儲待處理工作job,并把隊列作為構造函數參數,從而實現業務可以靈活的擴展定制線程池的隊列。 業務也可使用JDK自身的同步阻塞隊列SynchronousQueue、有界隊列ArrayBlockingQueue、無界隊列LinkedBlockingQueue、優先級隊列PRiorityBlockingQueue。  |      
|           Jetty6  |                   同樣使用了HashSet存儲工作者workers,通過synchronized一個對象進行HashSet的并發保護。每個工作者實際上是一個Thread的擴展。  |                   使用了數組存儲待處理的job對象Runnable。數組初始化容量為_maxThreads個,使用變量_queued計算保存當前內部待處理job的個數即數組length。超過數組最大值時,擴大_maxThreads個容量,因此數組永遠夠用夠大,容量無界。同樣是用synchronized一個對象的方式實現同步。  |      
|           Jetty8  |                   使用了ConcurrentLinkedQueue存儲工作者workers,利用JDK基于CAS算法的實現提高了并發效率,同時也降低了線程池并發保護的復雜程度。 針對隊列ConcurrentLinkedQueue無法保證size()實時性問題引入原子變量AtomicInteger統計工作者數量。  |                   與JDK相同實現,使用了基于接口BlockingQueue的阻塞隊列來存儲待處理工作job,也支持在線程池構造函數的參數中傳入隊列類型。同時,Jetty8內部默認未設置隊列類型場景可自動設置使用2種隊列:有界無法擴容的ArrayBlockingQueue,以及Jetty自身定制擴展實現的可擴容隊列BlockingArrayQueue。  |      
|           Tomcat  |                   基于JDK的ThreadPoolExecutors實現,復用JDK業務  |                   復用JDK業務  |      
表 2
|           實現  |                   線程池構造與工作者初始化  |                   處理業務job的算法  |      
|           JDK  |                   1. 基于多個構造參數實現靈活初始化,幾個核心參數如下: corePoolSize:核心工作者數 maximumPoolSize:最大工作者數 keepAliveTime:超過核心工作者數時閑置工作者的存活時間。 workQueue:待處理job隊列,即前面提到的BlockingQueue接口。 2. 默認初始化后,不啟動工作者,等待有請求時才啟動。可以通過調用線程池接口提前啟動核心工作數指定的工作者線程,也可以啟動業務期望的多個工作者線程。  |                   1. 工作者workers數量低于核心工作者數corePoolSize時,會優先創建一個工作者worker處理job,處理成功則返回。 2. 工作者workers數量高于核心工作者數時,會優先把job放入到待處理隊列,放入隊列成功時處理結束。 3. 步驟2中入隊失敗會識別工作者數是否還小于最大工作者數maximumPoolsize,小于的話也會新創建一個工作者worker處理job。 4. 拒絕處理  |      
|           Jetty6  |                   1. 支持設置多個參數: _spawnOrShrinkAt:擴容/縮容閥值 _minThreads:最小工作者數 _maxThreads:最大工作者數 _maxIdleTimeMs:閑置工作者最大閑置超時時間 2. 初始化后直接啟動_minThreads個工作者線程  |                   1. 查找閑置的工作者worker,找到則分配job。 2. 沒有閑置的工作者,將job存入待處理數組。 3. 當識別到數組中待處理job超過擴容閥值參數時,擴容增加工作者處理job 4. 否則不處理  |      
|           Jetty8  |                   1. 配置參數類似Jetty6,但去除了_spawnOrShrinkAt閥值參數。 2. 初始化后直接啟動_minThreads個工作者線程  |                   直接將待處理job入隊。  |      
|           Tomcat  |                   1. 基于JDK線程池的構造方法 2. 來請求時啟動工作者  |                   處理方法復用JDK的,但是在開始提交前擴展了JDK的功能,實現了可以統計提交數submittedCount的能力  |      
表 3
|           實現  |                   工作者增加算法  |                   工作者減少算法  |      
|           JDK  |                   1. 待處理job來時,工作者workers數量低于核心工作者數corePoolSize時。 2. 待處理job來時,workers數超過核心數小于最大工作者數且入待處理隊列失敗場景。 3. 業務調用線程池的更新核心工作者數接口時,若發現擴容,會增加工作者數。  |                   1. 待處理任務隊列里沒有job并且工作者workers數量超過了核心工作者數corePoolSize。 2. 待處理任務隊列里沒有job并且允許工作者數量小于核心工作者參數為true,此場景會至少保留一個工作者線程。  |      
|           Jetty6  |                   1. 啟動線程池時會啟動_minThreads個工作者線程 2. 待處理的job數量高于了閥值參數且工作者數沒有達到最大值時會增加工作者。 3. 調用線程池接口setMinThreads更新最小工作者數時會根據需要增加工作者。  |                   如下三個條件同時滿足時會減少工作者: 1. 待處理任務數組中沒有待處理job。 2. 工作者workers數量超過了最小工作者數_minThreads。 3. 閑置工作者線程數高于了閥值參數。  |      
|           Jetty8  |                   1. 啟動線程池時啟動最小工作者參數個工作者線程。 2. 已經沒有閑置工作者或者閑置工作者的數量已經小于待處理的job的總數。 3. 調用線程池接口setMinThreads更新最小工作者數時。  |                   如下三個條件同時滿足時會減少工作者: 1. 待處理任務隊列里沒有待處理的job。 2. 工作者workers總數超過了最小工作者參數配置_minThreads。 3. 工作者線程的閑置時間超時。  |      
|           Tomcat  |                   同JDK增加工作者算法  |                   復用JDK減少算法,同時定制擴展延遲參數,超過參數時,直接拋出異常到外面來終止線程池工作者。  |      
對比幾種線程池實現,JDK的實現是最為靈活、功能最強且擴展性最好的。Tomcat即基于JDK線程池功能擴展實現,復用原有業務的同時擴充了自己的業務。Jetty6是完全自己定制的線程池業務,耦合線程池眾多復雜的業務邏輯到線程池類里面,邏輯相對最為復雜,擴展性也非常差。Jetty8相對Jetty6的實現簡化了很多,其中利用了JDK中的同步容器和原子變量,同時實現方式也越來越接近JDK。
新聞熱點
疑難解答