CompletionService接口定義為Interface CompletionService<V>,它在java7中只有一個實現ExecutorCompletionService。這個接口內部集成了一個BlockingQueue,因此可以實現對多線程運行結果的收集工作。為了更好地測試該接口,我使用了兩個測試:第一個測試是自己定義一個外部BlockingQueue來接收callable返回的數據;第二個測試是用CompletionService對executor進行裝飾,使得返回的CompletionService對象能直接submit任務。
但是我發現它submit之後並沒有馬上調用executor的submit,而是對它進行了封裝,因此出現了一點點延遲。如果在submit之後使用shutdown()命令結束的話,實際上該task可能還沒有放到executor的taskpool中。所以這一點值得注意。
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;

/**
 * Demonstrates two ways of collecting results from several {@link Callable} tasks:
 * a hand-managed queue of {@link Future}s, and an {@link ExecutorCompletionService}.
 */
public class testCallable {

    /**
     * Runs both demos in sequence and prints any failure's stack trace.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            futureCount();
            completionServiceCount();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    /**
     * Collects task results through a caller-managed blocking queue of futures.
     *
     * <p>Submits five tasks, stores each returned future in the queue, then drains the queue,
     * printing every result followed by a tab and finally the total.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if one of the tasks threw an exception
     */
    public static void futureCount() throws InterruptedException, ExecutionException {
        BlockingQueue<Future<Integer>> queue = new LinkedBlockingDeque<Future<Integer>>();
        ExecutorService executorService = Executors.newCachedThreadPool();
        try {
            int threadNum = 5;
            for (int i = 0; i < threadNum; i++) {
                Future<Integer> future = executorService.submit(getTask());
                queue.put(future);
            }
            int sum = 0;
            while (!queue.isEmpty()) {
                // Futures are consumed in submission order; get() blocks until each one is done.
                int temp = queue.take().get();
                sum += temp;
                System.out.print(temp + "\t");
            }
            System.out.println("BlockingQueue all is : " + sum);
        } finally {
            // Always release the pool, even when a task failed or the wait was interrupted.
            executorService.shutdown();
        }
    }

    /**
     * Collects callable results through a {@link CompletionService}.
     *
     * <p>Submits five tasks, then takes exactly five completed futures from the service,
     * printing every result followed by a tab and finally the total.
     *
     * @throws ExecutionException if one of the tasks threw an exception
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static void completionServiceCount() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        try {
            CompletionService<Integer> completionService =
                    new ExecutorCompletionService<Integer>(executorService);
            int threadNum = 5;
            for (int i = 0; i < threadNum; i++) {
                completionService.submit(getTask());
            }
            int sum = 0;
            // take() blocks, so the loop is bounded by the number of submitted tasks.
            for (int i = 0; i < threadNum; i++) {
                int temp = completionService.take().get();
                sum += temp;
                System.out.print(temp + "\t");
            }
            System.out.println("CompletionService all is : " + sum);
        } finally {
            // Always release the pool, even when a task failed or the wait was interrupted.
            executorService.shutdown();
        }
    }

    /**
     * Creates a task that sums ten random integers, each drawn from {@code [0, 10)}.
     *
     * @return a new callable whose result lies in {@code [0, 90]}
     */
    public static Callable<Integer> getTask() {
        final Random rand = new Random();
        return new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                int num = 0;
                for (int i = 0; i < 10; i++) {
                    num = num + rand.nextInt(10);
                }
                return num;
            }
        };
    }
}
// Trailing text from the original page: "新聞熱點"
疑難解答