Future接口是Java標準API的一部分,在java.util.concurrent包中。Future接口是Java線程Future模式的實現,可以來進行異步計算。
有了Future就可以進行三段式的編程了,1.啟動多線程任務2.處理其他事3.收集多線程任務結果。從而實現了非阻塞的任務調用。在途中遇到一個問題,那就是雖然能異步獲取結果,但是Future的結果需要通過isdone來判斷是否有結果,或者使用get()函數來阻塞式獲取執行結果。這樣就不能實時跟蹤其他線程的結果狀態了,所以直接使用get還是要慎用,最好配合isdone來使用。
這里有一種更好的方式來實現對任意一個線程運行完成后的結果都能及時獲取的辦法:使用CompletionService,它內部添加了阻塞隊列,從而獲取future中的值,然后根據返回值做對應的處理。一般future使用和CompletionService使用的兩個測試案例如下:
import java.util.ArrayList;import java.util.List;import java.util.Random;import java.util.concurrent.Callable;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;/** * 多線程執行,異步獲取結果 * * @author i-clarechen * */public class AsyncThread { public static void main(String[] args) { AsyncThread t = new AsyncThread(); List<Future<String>> futureList = new ArrayList<Future<String>>(); t.generate(3, futureList); t.doOtherThings(); t.getResult(futureList); } /** * 生成指定數量的線程,都放入future數組 * * @param threadNum * @param fList */ public void generate(int threadNum, List<Future<String>> fList) { ExecutorService service = Executors.newFixedThreadPool(threadNum); for (int i = 0; i < threadNum; i++) { Future<String> f = service.submit(getJob(i)); fList.add(f); } service.shutdown(); } /** * other things */ public void doOtherThings() { try { for (int i = 0; i < 3; i++) { System.out.運行結果打印和future放入列表時的順序一致,為0,1,2:
do thing no:0do thing no:1do thing no:2Future:java.util.concurrent.FutureTask@68e1ca74,Result:thread-0Future:java.util.concurrent.FutureTask@3fb2bb77,Result:thread-1Future:java.util.concurrent.FutureTask@6f31a24c,Result:thread-2
下面是先執行完的線程先處理的方案:
import java.util.Random;import java.util.concurrent.BlockingQueue;import java.util.concurrent.Callable;import java.util.concurrent.CompletionService;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorCompletionService;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurrent.LinkedBlockingDeque;public class testCallable { public static void main(String[] args) { try { completionServiceCount(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } /** * 使用completionService收集callable結果 * @throws ExecutionException * @throws InterruptedException */ public static void completionServiceCount() throws InterruptedException, ExecutionException { ExecutorService executorService = Executors.newCachedThreadPool(); CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>( executorService); int threadNum = 5; for (int i = 0; i < threadNum; i++) { completionService.submit(getTask(i)); } int sum = 0; int temp = 0; for(int i=0;i<threadNum;i++){ temp = completionService.take().get(); sum += temp; System.out.print(temp + "/t"); } System.out.println("CompletionService all is : " + sum); executorService.shutdown(); } public static Callable<Integer> getTask(final int no) { final Random rand = new Random(); Callable<Integer> task = new Callable<Integer>() { @Override public Integer call() throws Exception { int time = rand.nextInt(100)*100; System.out.println("thead:"+no+" time is:"+time); Thread.sleep(time); return no; } }; return task; }}運行結果為最先結束的線程結果先被處理:
thead:0 time is:4200thead:1 time is:6900thead:2 time is:2900thead:3 time is:9000thead:4 time is:71002 0 1 4 3 CompletionService all is : 10
新聞熱點
疑難解答