国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發設計 > 正文

ThreadPoolExecutor參數解析

2019-11-14 21:16:50
字體:
來源:轉載
供稿:網友
ThreadPoolExecutor參數解析

ThreadPoolExecutor是一個非常重要的類,用來構建帶有線程池的任務執行器,通過配置不同的參數來構造具有不同規格線程池的任務執行器

寫在前面的是:

線程池任務執行器,線程池的定義比較直接,可以看做多個線程的集合。而任務執行器的概念比較的具有針對性,它用來執行任務,通過對線程池的管理實現多任務的并發,是線程池的載體。

線程和任務的區別,線程不是任務,線程是用來執行任務的。

隊列是用來存放任務的,不是用來存放線程的。

(37JUW4~P@]KZ]FYI@8@8`B

主要的幾個參數解析:

  • 核心線程數(core pool sizes)和最大線程數(maxmum pool sizes)

一開始兩者的存在很讓人摸不著頭腦,簡單的想法是用一個線程數(pool size)表示線程池的大小不就完了嗎,不到規定的線程數就創建新的線程來執行新的任務,到了規定的線程數就等待其他線程處理完成,怎么還出現兩個控制線程數的參數?

那這兩個參數是什么意思干什么用的?

核心線程數:這個數與上面那個簡單想法中的數有一個共同點,就是如果當前線程數達不到核心線程數時,不會使用已有的空閑的線程(如果有的話),來了新任務就會創建新的線程。

如果當前線程數達到核心線程數,而且沒有空閑線程,那么來了新任務是否要創建新的線程呢?這取決于兩點:

  1. 當前的任務隊列是否已滿。
  2. 線程池的最大線程數。

通過這個問題可以引出最大線程數的概念

最大線程數 : 最大線程數是和任務隊列匹配使用的,確切的說是和有長度限制的任務隊列(即有界任務隊列)匹配使用的。

補充回答上面的問題,ThreadPoolExecutor的線程池擁有一個任務隊列,這個任務隊列只有在當前線程數>核心線程數的時候才開始使用,如果該線程池使用的任務隊列是有界隊列,比如10,那么當該隊列被新任務填滿時也就是說隊列中有10個新任務時ThreadPoolExecutor才會創建一個新的線程來執行隊列中的一個任務,如果再發生隊列被填滿,而且依舊沒有空閑線程時ThreadPoolExecutor再次創建新的線程,一旦線程的數量等于最大線程數就不再創建新的線程了,如果此時隊列中還有10個任務,那么新來的任務就會被拒絕(reject)。

上述是針對有界隊列,如果這個任務執行器的隊列是無界隊列呢?

由于無界隊列不會被填滿,所以永遠不能達到創建新線程所需要的條件,所以也就不會有新線程被創建,所以最大線程數在這種情況下也就失去了其存在的意義。

  • 線程空閑存活時間(keepAliveTime)

在介紹上面的核心線程數和最大線程數時有提到空閑的線程,所謂空閑的線程就是執行完任務之后閑著的線程。

超過這個時間會使得那么核心線程之外的空閑線程被殺死,如果想把這個時間也作用在核心線程上需要設置allowCoreThreadTimeOut(boolean)為true

這里有必要說一下的是,任務執行器如何實現線程的重復利用,當任務執行器執行execute(task)的時候會創建一個worker,它是一個Runnable類,可以看做task的載體,worker包含一個thread對象,這個thread啟動的時候執行worker本身的run方法,這樣worker和線程就融為一體。當worker的thread start的時候,就會執行worker的run方法,而worker的run會調用任務執行器的runWorker(worker),并將自身傳遞過去,意思是任務執行器啟動了一個worker,而線程重復利用關鍵就在runWorker中,在啟動了一個worker后,worker會從任務執行器中尋找可以運行的任務,而一開始創建worker使用的task就是它的第一個任務。

下面是jdk1.7的源碼

//執行一個任務 taskpublic void execute(Runnable command) {         if (command == null)             throw new NullPointerException();         int c = ctl.get();         if (workerCountOf(c) < corePoolSize) {             if (addWorker(command, true)) // 將task裝配到一個worker中                 return;             c = ctl.get();         }         if (isRunning(c) && workQueue.offer(command)) {             int recheck = ctl.get();             if (! isRunning(recheck) && remove(command))                 reject(command);             else if (workerCountOf(recheck) == 0)                 addWorker(null, false);         }         else if (!addWorker(command, false))             reject(command);     }

 

添加一個worker 

PRivate boolean addWorker(Runnable firstTask, boolean core) {         retry:         for (;;) {             int c = ctl.get();             int rs = runStateOf(c);            // Check if queue empty only if necessary.             if (rs >= SHUTDOWN &&                 ! (rs == SHUTDOWN &&                    firstTask == null &&                    ! workQueue.isEmpty()))                 return false;            for (;;) {                 int wc = workerCountOf(c);                 if (wc >= CAPACITY ||                     wc >= (core ? corePoolSize : maximumPoolSize))                     return false;                 if (compareAndIncrementWorkerCount(c))                     break retry;                 c = ctl.get();  // Re-read ctl                 if (runStateOf(c) != rs)                     continue retry;                 // else CAS failed due to workerCount change; retry inner loop             }         }        boolean workerStarted = false;         boolean workerAdded = false;         Worker w = null;         try {             final ReentrantLock mainLock = this.mainLock;             w = new Worker(firstTask);             final Thread t = w.thread;             if (t != null) {                 mainLock.lock();                 try {                     // Recheck while holding lock.                     // Back out on ThreadFactory failure or if                     // shut down before lock acquired.                     int c = ctl.get();                     int rs = runStateOf(c);                    if (rs < SHUTDOWN ||                         (rs == SHUTDOWN && firstTask == null)) {                         if (t.isAlive()) // precheck that t is startable                             throw new IllegalThreadStateException();                         workers.add(w);                         int s = workers.size();                         if (s > largestPoolSize)                             largestPoolSize = s;                         workerAdded = true;                     }                 } finally {                     mainLock.unlock();                 }                 if (workerAdded) {//如果worker創建成功,就啟動它的對應的thread                     t.start(); //worker中的tread啟動                     workerStarted = true;                 }             }         } finally {             if (! workerStarted)                 addWorkerFailed(w);         }         return workerStarted;     }
運行worker
final void runWorker(Worker w) {         Thread wt = Thread.currentThread();         Runnable task = w.firstTask;         w.firstTask = null;         w.unlock(); // allow interrupts         boolean completedAbruptly = true;         try {             while (task != null || (task = getTask()) != null) {//這里是關鍵,使用一個while來尋找任務執行器中(主要還是從任務隊列中獲取)還未執行的task。                 w.lock();                 // If pool is stopping, ensure thread is interrupted;                 // if not, ensure thread is not interrupted.  This                 // requires a recheck in second case to deal with                 // shutdownNow race while clearing interrupt                 if ((runStateAtLeast(ctl.get(), STOP) ||                      (Thread.interrupted() &&                       runStateAtLeast(ctl.get(), STOP))) &&                     !wt.isInterrupted())                     wt.interrupt();                 try {                     beforeExecute(wt, task);                     Throwable thrown = null;                     try {                         task.run();                     } catch (RuntimeException x) {                         thrown = x; throw x;                     } catch (Error x) {                         thrown = x; throw x;                     } catch (Throwable x) {                         thrown = x; throw new Error(x);                     } finally {                         afterExecute(task, thrown);                     }                 } finally {                     task = null;                     w.completedTasks++;                     w.unlock();                 }             }             completedAbruptly = false;         } finally {             processWorkerExit(w, completedAbruptly);         }     }

  

 


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 新津县| 湘西| 濮阳县| 筠连县| 永嘉县| 宁陵县| 乐清市| 阳信县| 五大连池市| 灵宝市| 钟祥市| 泽库县| 怀仁县| 青冈县| 苗栗市| 沅江市| 宁安市| 荃湾区| 宁海县| 邵武市| 文登市| 海原县| 余姚市| 天长市| 太湖县| 昔阳县| 昌吉市| 太原市| 桑日县| 泰来县| 台湾省| 宜昌市| 莱芜市| 元阳县| 余干县| 浦北县| 江安县| 内乡县| 沧州市| 酒泉市| 基隆市|