線程池的源碼及原理[JDK1.6實現]
1.線程池的包含的內容
2.線程池的數據結構【核心類ThreadPoolExecutor】:
worker:工作類,一個worker代表啟動了一個線程,它啟動后會循環執行workQueue里面的所有任務workQueue:任務隊列,用于存放待執行的任務keepAliveTime:線程活動保持時間,線程池的工作線程空閑后,保持存活的時間。線程池原理:預先啟動一些線程,線程無限循環從任務隊列中獲取一個任務進行執行,直到線程池被關閉。如果某個線程因為執行某個任務發生異常而終止,那么重新創建一個新的線程而已。如此反復。3.線程池任務submit及執行流程
a.一個任務提交,如果線程池大小沒達到corePoolSize,則每次都啟動一個worker也就是一個線程來立即執行b.如果來不及執行,則把多余的線程放到workQueue,等待已啟動的worker來循環執行c.如果隊列workQueue都放滿了還沒有執行,則在maximumPoolSize下面啟動新的worker來循環執行workQueued.如果啟動到maximumPoolSize還有任務進來,線程池已達到滿負載,此時就執行任務拒絕RejectedExecutionHandlerjava Code 線程池核心的代碼| 123456789101112131415161718192021222324 | //流程就是:沒達到corePoolSize,創建worker執行,達到corePoolSize加入workQueue//workQueue滿了且在maximumPoolSize下,創建新worker,達到maximumPoolSize,執行rejectpublicvoidexecute(Runnablecommand){if(command==null)thrownewNullPointerException();//1:poolSize達到corePoolSize,執行3把任務加入workQueue//2:poolSize沒達到,執行addIfUnderCorePoolSize()在corePoolSize內創建新worker立即執行任務//如果達到corePoolSize,則同上執行3if(poolSize>=corePoolSize||!addIfUnderCorePoolSize(command)){//3:workQueue滿了,執行5if(runState==RUNNING&&workQueue.offer(command)){if(runState!=RUNNING||poolSize==0){//4:如果線程池關閉,執行拒絕策略//如果poolSize==0,新啟動一個線程執行隊列內任務ensureQueuedTaskHandled(command);}//5:在maximumPoolSize內創建新worker立即執行任務//如果達到maximumPoolSize,執行6拒絕策略}elseif(!addIfUnderMaximumPoolSize(command))//6:拒絕策略reject(command);//isshutdownorsaturated}} |
| 1234567891011121314 | publicvoidrun(){try{Runnabletask=firstTask;firstTask=null;//getTask()是從workQueue里面阻塞獲取任務,如果getTask()返回null則終結本線程while(task!=null||(task=getTask())!=null){runTask(task);task=null;}}finally{//走到這里代表這個worker或者說這個線程由于線程池關閉或超過aliveTime需要關閉了workerDone(this);}} |
| 123456789101112131415161718192021222324252627282930 | RunnablegetTask(){for(;;){try{intstate=runState;if(state>SHUTDOWN)returnnull;Runnabler;if(state==SHUTDOWN)//Helpdrainqueuer=workQueue.poll();elseif(poolSize>corePoolSize||allowCoreThreadTimeOut)//在poolSize大于corePoolSize或允許核心線程超時時//阻塞超時獲取有可能獲取到null,此時worker線程銷毀r=workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS);elser=workQueue.take();if(r!=null)returnr;//這里是是否運行worker線程銷毀的判斷if(workerCanExit()){if(runState>=SHUTDOWN)//STOP或TERMINATED狀態,終止空閑workerinterruptIdleWorkers();returnnull;// 這里返回null,代表工作線程worker銷毀}//其他:retry,繼續循環}catch(InterruptedExceptionie){//Oninterruption,re-checkrunState}}} |
taskCount:線程池需要執行的任務數量。completedTaskCount:線程池在運行過程中已完成的任務數量。小于或等于taskCount。largestPoolSize:線程池曾經創建過的最大線程數量。通過這個數據可以知道線程池是否滿過。如等于線程池的最大大小,則表示線程池曾經滿了。getPoolSize:線程池的線程數量。如果線程池不銷毀的話,池里的線程不會自動銷毀,所以這個大小只增不+ getActiveCount:獲取活動的線程數。通過擴展線程池進行監控。通過繼承線程池并重寫線程池的beforeExecute,afterExecute和terminated方法,我們可以在任務執行前,執行后和線程池關閉前干一些事情。如監控任務的平均執行時間,最大執行時間和最小執行時間等。這幾個方法在線程池里是空方法。8.線程池調優[更多可參考:線程池與工作隊列]調整線程池的大小 - 線程池的最佳大小取決于可用處理器的數目以及工作隊列中的任務的性質。調整線程池的大小基本上就是避免兩類錯誤:線程太少或線程太多。a.CPU限制的任務,提高CPU利用率。 在運行于具有 N 個處理器機器上的計算限制的應用程序中,在線程數目接近 N 時添加額外的線程可能會改善總處理能力,而在線程數目超過 N 時添加額外的線程將不起作用。事實上,太多的線程甚至會降低性能,因為它會導致額外的環境切換開銷。 若在一個具有 N 個處理器的系統上只有一個工作隊列,其中全部是計算性質的任務,在線程池具有N 或 N+1個線程時一般會獲得最大的 CPU 利用率。b.I/O限制的任務(例如,從套接字讀取 HTTP 請求的任務) 需要讓池的大小超過可用處理器的數目,因為并不是所有線程都一直在工作。通過使用概要分析,您可以或得一些數據,并計算出大概的線程池大小。 Amdahl 法則提供很好的近似公式。用 WT 表示每項任務的平均等待時間,ST 表示每項任務的平均服務時間(計算時間)。則 WT/ST 是每項任務等待所用時間的百分比。對于 N 處理器系統,池中可以近似有N*(1+WT/ST) 個線程。c.綜合考慮線程池性能瓶頸 a.處理器利用率 b.隨著線程池的增長,您可能會碰到調度程序、可用內存方面的限制,或者其它系統資源方面的限制,例如套接字、打開的文件句柄或數據庫連接等的數目。9.線程池擴展 - 延時線程池ScheduledThreadPoolExecutor ScheduledThreadPoolExecutor是在普通線程池的基礎上增加了兩個功能,一是延時執行+定時執行,二是重復執行 定時Executor的流程在大體上與普通線程池一致,因此它繼承于ThreadPoolExecutor,對于問題1,它采用了DelayedQueue來實現此功能。對于問題2,定時Executor每次執行完調用ThreadPoolExecutor.runAndReset()重置狀態,然后重新把任務加入到Delayed隊列中 定時Executor在外部Runnable的基礎上套了一個ScheduledFutureTask,其核心源碼如下:Java Code 普通任務的外部封裝Future
| 1234567891011121314151617181920212223242526272829303132 | //加入的任務外部封裝了ScheduledFutureTask,繼承于FutureTask,因此也可以獲取任務結果PRivateclassScheduledFutureTask<V>extendsFutureTask<V>implementsRunnableScheduledFuture<V>{//省略部分代碼//周期性運行,執行完成就把任務加入到delay隊列中privatevoidrunPeriodic(){//這里重置線程池狀態booleanok=ScheduledFutureTask.super.runAndReset();booleandown=isShutdown();//Rescheduleifnotcancelledandnotshutdownorpolicyallowsif(ok&&(!down||(getContinueExistingPeriodicTasksAfterShutdownPolicy()&&!isTerminating()))){longp=period;if(p>0)time+=p;elsetime=now()-p;//重復把任務加入到線程池delay隊列中ScheduledThreadPoolExecutor.super.getQueue().add(this);}elseif(down)interruptIdleWorkers();}//線程池調用的run方法publicvoidrun(){if(isPeriodic())runPeriodic();elseScheduledFutureTask.super.run();}} |
| 1234567891011121314151617181920212223242526272829 | publicclassExecutorCompletionService<V>implementsCompletionService<V>{//部分代碼省略//外部Future的封裝類privateclassQueueingFutureextendsFutureTask<Void>{QueueingFuture(RunnableFuture<V>task){super(task,null);this.task=task;}//這里把Future加入到completionQueueprotectedvoiddone(){completionQueue.add(task);}privatefinalFuture<V>task;}publicFuture<V>submit(Callable<V>task){if(task==null)thrownewNullPointerException();RunnableFuture<V>f=newTaskFor(task);//對f外層又包了一層QueueingFutureexecutor.execute(newQueueingFuture(f));returnf;}//外部則可通過completionQueue來獲取已完成的任務FuturepublicFuture<V>take()throwsInterruptedException{returncompletionQueue.take();}} |
新聞熱點
疑難解答