newFixedThreadPoolnewFixedThreadPool內部有個任務隊列,假設線程池里有3個線程,提交了5個任務,那么后兩個任務就放在任務隊列了,即使前3個任務sleep或者堵塞了,也不會執行后兩個任務,除非前三個任務有執行完的
newFixedThreadPool使用范例:
java代碼
- importjava.io.IOException;
- importjava.util.concurrent.ExecutorService;
- importjava.util.concurrent.Executors;
- publicclassTest{
- publicstaticvoidmain(String[]args)throwsIOException,InterruptedException{
- ExecutorServiceservice=Executors.newFixedThreadPool(2);
- for(inti=0;i<6;i++){
- finalintindex=i;
- System.out.
- }
- };
- service.execute(run);
- }
- }
- }
輸出:task: 1task: 2thread start0task: 3task: 4task: 5task: 6task: 7thread start1task: 8task: 9task: 10task: 11task: 12task: 13task: 14task: 15 從實例可以看到for循環并沒有被固定的線程池阻塞住,也就是說所有的線程task都被提交到了ExecutorService中,查看Executors.newFixedThreadPool()如下:
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());} 可以看到task被提交都了LinkedBlockingQueue中。這里有個問題,如果任務列表很大,一定會把內存撐爆,如何解決?看下面:
Java代碼
- importjava.io.IOException;
- importjava.util.concurrent.ArrayBlockingQueue;
- importjava.util.concurrent.BlockingQueue;
- importjava.util.concurrent.ThreadPoolExecutor;
- importjava.util.concurrent.TimeUnit;
- publicclassTest{
- publicstaticvoidmain(String[]args)throwsIOException,InterruptedException{
- BlockingQueue<Runnable>queue=newArrayBlockingQueue<Runnable>(3);
- ThreadPoolExecutorexecutor=newThreadPoolExecutor(3,3,1,TimeUnit.HOURS,queue,newThreadPoolExecutor.CallerRunsPolicy());
- for(inti=0;i<10;i++){
- finalintindex=i;
- System.out.println("task:"+(index+1));
- Runnablerun=newRunnable(){
- @Override
- publicvoidrun(){
- System.out.println("threadstart"+(index+1));
- try{
- Thread.sleep(Long.MAX_VALUE);
- }catch(InterruptedExceptione){
- e.printStackTrace();
- }
- System.out.println("threadend"+(index+1));
- }
- };
- executor.execute(run);
- }
- }
- }
輸出:task: 1task: 2thread start1task: 3task: 4task: 5task: 6task: 7thread start2thread start7thread start6 線程池最大值為4(??這里我不明白為什么是設置值+1,即3+1,而不是3),準備執行的任務隊列為3。可以看到for循環先處理4個task,然后把3個放到隊列。這樣就實現了自動阻塞隊列的效果。記得要使用ArrayBlockingQueue這個隊列,然后設置容量就OK了。
一、簡介線程池類為 java.util.concurrent.ThreadPoolExecutor,常用構造方法為:ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit,BlockingQueueworkQueue,RejectedExecutionHandler handler)corePoolSize: 線程池維護線程的最少數量maximumPoolSize:線程池維護線程的最大數量keepAliveTime: 線程池維護線程所允許的空閑時間unit: 線程池維護線程所允許的空閑時間的單位workQueue: 線程池所使用的緩沖隊列handler: 線程池對拒絕任務的處理策略一個任務通過 execute(Runnable)方法被添加到線程池,任務就是一個 Runnable類型的對象,任務的執行方法就是 Runnable類型對象的run()方法。當一個任務通過execute(Runnable)方法欲添加到線程池時:如果此時線程池中的數量小于corePoolSize,即使線程池中的線程都處于空閑狀態,也要創建新的線程來處理被添加的任務。如果此時線程池中的數量等于 corePoolSize,但是緩沖隊列 workQueue未滿,那么任務被放入緩沖隊列。如果此時線程池中的數量大于corePoolSize,緩沖隊列workQueue滿,并且線程池中的數量小于maximumPoolSize,建新的線程來處理被添加的任務。如果此時線程池中的數量大于corePoolSize,緩沖隊列workQueue滿,并且線程池中的數量等于maximumPoolSize,那么通過 handler所指定的策略來處理此任務。也就是:處理任務的優先級為:核心線程corePoolSize、任務隊列workQueue、最大線程maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。當線程池中的線程數量大于 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止。這樣,線程池可以動態的調整池中的線程數。unit可選的參數為java.util.concurrent.TimeUnit中的幾個靜態屬性:NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。workQueue我常用的是:java.util.concurrent.ArrayBlockingQueuehandler有四個選擇:ThreadPoolExecutor.AbortPolicy()拋出java.util.concurrent.RejectedExecutionException異常ThreadPoolExecutor.CallerRunsPolicy()重試添加當前的任務,他會自動重復調用execute()方法ThreadPoolExecutor.DiscardOldestPolicy()拋棄舊的任務ThreadPoolExecutor.DiscardPolicy()拋棄當前的任務二、一般用法舉例
點擊(此處)折疊或打開
- packagedemo;
- importjava.io.Serializable;
- importjava.util.concurrent.ArrayBlockingQueue;
- importjava.util.concurrent.ThreadPoolExecutor;
- importjava.util.concurrent.TimeUnit;
- publicclassTestThreadPool2
- {
- privatestaticintproduceTaskSleepTime=2;
- privatestaticintproduceTaskMaxNumber=10;
- publicstaticvoidmain(String[]args)
- {
- // 構造一個線程池
- ThreadPoolExecutorthreadPool=newThreadPoolExecutor(2,4,3,TimeUnit.SECONDS,newArrayBlockingQueue<Runnable>(3),
- newThreadPoolExecutor.DiscardOldestPolicy());
- for(inti=1;i<=produceTaskMaxNumber;i++)
- {
- try
- {
- // 產生一個任務,并將其加入到線程池
- Stringtask="task@ "+i;
- System.out.println("put "+task);
- threadPool.execute(newThreadPoolTask(task));
- // 便于觀察,等待一段時間
- Thread.sleep(produceTaskSleepTime);
- }
- catch(Exceptione)
- {
- e.printStackTrace();
- }
- }
- }
- }
- /**
- * 線程池執行的任務
- */
- classThreadPoolTaskimplementsRunnable,Serializable
- {
- privatestaticfinallongserialVersionUID=0;
- privatestaticintconsumeTaskSleepTime=2000;
- // 保存任務所需要的數據
- privateObjectthreadPoolTaskData;
- ThreadPoolTask(Objecttasks)
- {
- this.threadPoolTaskData=tasks;
- }
- publicvoidrun()
- {
- // 處理一個任務,這里的處理方式太簡單了,僅僅是一個打印語句
- System.out.println(Thread.currentThread().getName());
- System.out.println("start .."+threadPoolTaskData);
- try
- {
- // //便于觀察,等待一段時間
- Thread.sleep(consumeTaskSleepTime);
- }
- catch(Exceptione)
- {
- e.printStackTrace();
- }
- threadPoolTaskData=null;
- }
- publicObjectgetTask()
- {
- returnthis.threadPoolTaskData;
- }
- }
說明:1、在這段程序中,一個任務就是一個Runnable類型的對象,也就是一個ThreadPoolTask類型的對象。2、一般來說任務除了處理方式外,還需要處理的數據,處理的數據通過構造方法傳給任務。3、在這段程序中,main()方法相當于一個殘忍的領導,他派發出許多任務,丟給一個叫 threadPool的任勞任怨的小組來做。這個小組里面隊員至少有兩個,如果他們兩個忙不過來,任務就被放到任務列表里面。如果積壓的任務過多,多到任務列表都裝不下(超過3個)的時候,就雇傭新的隊員來幫忙。但是基于成本的考慮,不能雇傭太多的隊員,至多只能雇傭 4個。如果四個隊員都在忙時,再有新的任務,這個小組就處理不了了,任務就會被通過一種策略來處理,我們的處理方式是不停的派發,直到接受這個任務為止(更殘忍!呵呵)。因為隊員工作是需要成本的,如果工作很閑,閑到 3SECONDS都沒有新的任務了,那么有的隊員就會被解雇了,但是,為了小組的正常運轉,即使工作再閑,小組的隊員也不能少于兩個。4、通過調整 produceTaskSleepTime和 consumeTaskSleepTime的大小來實現對派發任務和處理任務的速度的控制,改變這兩個值就可以觀察不同速率下程序的工作情況。5、通過調整4中所指的數據,再加上調整任務丟棄策略,換上其他三種策略,就可以看出不同策略下的不同處理方式。6、對于其他的使用方法,參看jdk的幫助,很容易理解和使用。另一個例子:
點擊(此處)折疊或打開
- packagedemo;
- importjava.util.Queue;
- importjava.util.concurrent.ArrayBlockingQueue;
- importjava.util.concurrent.ThreadPoolExecutor;
- importjava.util.concurrent.TimeUnit;
- publicclassThreadPoolExecutorTest
- {
- privatestaticintqueueDeep=4;
- publicvoidcreateThreadPool()
- {
- /*
- * 創建線程池,最小線程數為2,最大線程數為4,線程池維護線程的空閑時間為3秒,
- * 使用隊列深度為4的有界隊列,如果執行程序尚未關閉,則位于工作隊列頭部的任務將被刪除,
- * 然后重試執行程序(如果再次失敗,則重復此過程),里面已經根據隊列深度對任務加載進行了控制。
- */
- ThreadPoolExecutortpe=newThreadPoolExecutor(2,4,3,TimeUnit.SECONDS,newArrayBlockingQueue<Runnable>(queueDeep),
- newThreadPoolExecutor.DiscardOldestPolicy());
- // 向線程池中添加 10 個任務
- for(inti=0;i<10;i++)
- {
- try
- {
- Thread.sleep(1);
- }
- catch(InterruptedExceptione)
- {
- e.printStackTrace();
- }
- while(getQueueSize(tpe.getQueue())>=queueDeep)
- {
- System.out.println("隊列已滿,等3秒再添加任務");
- try
- {
- Thread.sleep(3000);
- }
- catch(InterruptedExceptione)
- {
- e.printStackTrace();
- }
- }
- TaskThreadPool ttp=newTaskThreadPool(i);
- System.out.println("put i:"+i);
- tpe.execute(ttp);
- }
- tpe.shutdown();
- }
- privatesynchronizedintgetQueueSize(Queuequeue)
- {
- returnqueue.size();
- }
- publicstaticvoidmain(String[]args)
- {
- ThreadPoolExecutorTest test=newThreadPoolExecutorTest();
- test.createThreadPool();
- }
- classTaskThreadPoolimplementsRunnable
- {
- privateintindex;
- publicTaskThreadPool(intindex)
- {
- this.index=index;
- }
- publicvoidrun()
- {
- System.out.println(Thread.currentThread()+" index:"+index);
- try
- {
- Thread.sleep(3000);
- }
- catch(InterruptedExceptione)
- {
- e.printStackTrace();
- }
- }
- }
- }