ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue, RejectedExecutionHandler handler) corePoolSize: 線程池維護線程的最少數量 maximumPoolSize:線程池維護線程的最大數量 keepAliveTime: 線程池維護線程所允許的空閑時間 unit: 線程池維護線程所允許的空閑時間的單位 (unit可選的參數為java.util.concurrent.TimeUnit中的幾個靜態屬性:NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS
MICROSECONDS 微秒 一百萬分之一秒(就是毫秒/1000)MILLISECONDS 毫秒 千分之一秒 NANOSECONDS 毫微秒 十億分之一秒(就是微秒/1000)SECONDS 秒
)
workQueue: 線程池所使用的緩沖隊列 (workQueue我常用的是:java.util.concurrent.ArrayBlockingQueue )handler: 線程池對拒絕任務的處理策略 (
handler有四個選擇: ThreadPoolExecutor.AbortPolicy() 拋出java.util.concurrent.RejectedExecutionException異常 ThreadPoolExecutor.CallerRunsPolicy() 重試添加當前的任務,他會自動重復調用execute()方法 ThreadPoolExecutor.DiscardOldestPolicy() 拋棄舊的任務 ThreadPoolExecutor.DiscardPolicy() 拋棄當前的任務
)
public class ThreadPool2 {PRivate static int produceTaskSleepTime = 2;private static int produceTaskMaxNumber = 10;public static void main(String[] args) {// 構造一個線程池ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 10, 3, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(3),new ThreadPoolExecutor.DiscardOldestPolicy());for (int i = 1; i <= produceTaskMaxNumber; i++) {try {// 產生一個任務,并將其加入到線程池String task = "task@ " + i;//System.out.println("put " + task);threadPool.execute(new ThreadPoolTask(task));// 便于觀察,等待一段時間Thread.sleep(produceTaskSleepTime);} catch (Exception e) {e.printStackTrace();}}}}/** * 線程池執行的任務 */class ThreadPoolTask implements Runnable, Serializable {private static final long serialVersionUID = 0; //序列化private static int consumeTaskSleepTime = 2000; //睡眠時間// 保存任務所需要的數據private Object threadPoolTaskData;ThreadPoolTask(Object tasks) {this.threadPoolTaskData = tasks;}public void run() {// 處理一個任務,這里的處理方式太簡單了,僅僅是一個打印語句System.out.println(Thread.currentThread().getName()+"--> "+threadPoolTaskData);//System.out.println("start .." + threadPoolTaskData);try {// //便于觀察,等待一段時間Thread.sleep(consumeTaskSleepTime);} catch (Exception e) {e.printStackTrace();}threadPoolTaskData = null;}}
說明: 1、在這段程序中,一個任務就是一個Runnable類型的對象,也就是一個ThreadPoolTask類型的對象。 2、一般來說任務除了處理方式外,還需要處理的數據,處理的數據通過構造方法傳給任務。 3、在這段程序中,main()方法相當于一個殘忍的領導,他派發出許多任務,丟給一個叫 threadPool的任勞任怨的小組來做。 這個小組里面隊員至少有兩個,如果他們兩個忙不過來,任務就被放到任務列表里面。 如果積壓的任務過多,多到任務列表都裝不下(超過3個)的時候,就雇傭新的隊員來幫忙。但是基于成本的考慮,不能雇傭太多的隊員,至多只能雇傭 4個。 如果四個隊員都在忙時,再有新的任務,這個小組就處理不了了,任務就會被通過一種策略來處理,我們的處理方式是不停的派發,直到接受這個任務為止(更殘忍!呵呵)。 因為隊員工作是需要成本的,如果工作很閑,閑到 3SECONDS都沒有新的任務了,那么有的隊員就會被解雇了,但是,為了小組的正常運轉,即使工作再閑,小組的隊員也不能少于兩個。 4、通過調整 produceTaskSleepTime和 consumeTaskSleepTime的大小來實現對派發任務和處理任務的速度的控制,改變這兩個值就可以觀察不同速率下程序的工作情況。 5、通過調整4中所指的數據,再加上調整任務丟棄策略,換上其他三種策略,就可以看出不同策略下的不同處理方式。 6、對于其他的使用方法,參看jdk的幫助,很容易理解和使用。
public class ThreadPoolExecutorTest {private static int queueDeep = 4;public void createThreadPool() {/** 創建線程池,最小線程數為2,最大線程數為4,線程池維護線程的空閑時間為3秒,* 使用隊列深度為4的有界隊列,如果執行程序尚未關閉,則位于工作隊列頭部的任務將被刪除,* 然后重試執行程序(如果再次失敗,則重復此過程),里面已經根據隊列深度對任務加載進行了控制。*/ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 4, 3, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(queueDeep), new ThreadPoolExecutor.DiscardOldestPolicy());// 向線程池中添加 10 個任務for (int i = 0; i < 10; i++) {try {Thread.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}while (getQueueSize(tpe.getQueue()) >= queueDeep) {System.out.println("隊列已滿,等3秒再添加任務");try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("put i:" + i);tpe.execute(new TaskThreadPool(i));}tpe.shutdown();}private synchronized int getQueueSize(Queue queue) {return queue.size();}class TaskThreadPool implements Runnable {private int index;public TaskThreadPool(int index) {this.index = index;}public void run() {System.out.println(Thread.currentThread() + " :" + index);try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) {ThreadPoolExecutorTest test = new ThreadPoolExecutorTest();test.createThreadPool();}}
新聞熱點
疑難解答