ThreadPoolExecutor是一個非常重要的類,用來構建帶有線程池的任務執行器,通過配置不同的參數來構造具有不同規格線程池的任務執行器。
寫在前面的是:
線程池和任務執行器,線程池的定義比較直接,可以看做多個線程的集合。而任務執行器的概念比較的具有針對性,它用來執行任務,通過對線程池的管理實現多任務的并發,是線程池的載體。
線程和任務的區別,線程不是任務,線程是用來執行任務的。
隊列是用來存放任務的,不是用來存放線程的。
![(37JUW4~P@]KZ]FYI@8@8`B (37JUW4~P@]KZ]FYI@8@8`B](http://s1.VeVb.com/20150728/qdzyot5v3qb47.jpg)
主要的幾個參數解析:
一開始兩者的存在很讓人摸不著頭腦,簡單的想法是用一個線程數(pool size)表示線程池的大小不就完了嗎,不到規定的線程數就創建新的線程來執行新的任務,到了規定的線程數就等待其他線程處理完成,怎么還出現兩個控制線程數的參數?
那這兩個參數是什么意思干什么用的?
核心線程數:這個數與上面那個簡單想法中的數有一個共同點,就是如果當前線程數達不到核心線程數時,不會使用已有的空閑的線程(如果有的話),來了新任務就會創建新的線程。
如果當前線程數達到核心線程數,而且沒有空閑線程,那么來了新任務是否要創建新的線程呢?這取決于兩點:
- 當前的任務隊列是否已滿。
- 線程池的最大線程數。
通過這個問題可以引出最大線程數的概念
最大線程數 : 最大線程數是和任務隊列匹配使用的,確切的說是和有長度限制的任務隊列(即有界任務隊列)匹配使用的。
補充回答上面的問題,ThreadPoolExecutor的線程池擁有一個任務隊列,這個任務隊列只有在當前線程數>核心線程數的時候才開始使用,如果該線程池使用的任務隊列是有界隊列,比如10,那么當該隊列被新任務填滿時也就是說隊列中有10個新任務時ThreadPoolExecutor才會創建一個新的線程來執行隊列中的一個任務,如果再發生隊列被填滿,而且依舊沒有空閑線程時ThreadPoolExecutor再次創建新的線程,一旦線程的數量等于最大線程數就不再創建新的線程了,如果此時隊列中還有10個任務,那么新來的任務就會被拒絕(reject)。
上述是針對有界隊列,如果這個任務執行器的隊列是無界隊列呢?
由于無界隊列不會被填滿,所以永遠不能達到創建新線程所需要的條件,所以也就不會有新線程被創建,所以最大線程數在這種情況下也就失去了其存在的意義。
在介紹上面的核心線程數和最大線程數時有提到空閑的線程,所謂空閑的線程就是執行完任務之后閑著的線程。
超過這個時間會使得那么核心線程之外的空閑線程被殺死,如果想把這個時間也作用在核心線程上需要設置
allowCoreThreadTimeOut(boolean)為true
這里有必要說一下的是,任務執行器如何實現線程的重復利用,當任務執行器執行execute(task)的時候會創建一個worker,它是一個Runnable類,可以看做task的載體,worker包含一個thread對象,這個thread啟動的時候執行worker本身的run方法,這樣worker和線程就融為一體。當worker的thread start的時候,就會執行worker的run方法,而worker的run會調用任務執行器的runWorker(worker),并將自身傳遞過去,意思是任務執行器啟動了一個worker,而線程重復利用關鍵就在runWorker中,在啟動了一個worker后,worker會從任務執行器中尋找可以運行的任務,而一開始創建worker使用的task就是它的第一個任務。
下面是jdk1.7的源碼
//執行一個任務 taskpublic void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) // 將task裝配到一個worker中 return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
添加一個worker
PRivate boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); int rs = runStateOf(c); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) {//如果worker創建成功,就啟動它的對應的thread t.start(); //worker中的tread啟動 workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }運行workerfinal void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) {//這里是關鍵,使用一個while來尋找任務執行器中(主要還是從任務隊列中獲取)還未執行的task。 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
新聞熱點
疑難解答