本文由 ImportNew - paddx 翻譯自 tobyhobson。歡迎加入翻譯小組。轉(zhuǎn)載請見文末要求。
java 8 并行流(parallel stream)采用共享線程池,對性能造成了嚴(yán)重影響。可以包裝流來調(diào)用自己的線程池解決性能問題。
問題
Java 8 的并行流可以讓我們相對輕松地執(zhí)行并行任務(wù)。
1 | myList.parallelStream.map(obj -> longRunningOperation()) |
但是這樣存在一個嚴(yán)重的問題:在 JVM 的后臺,使用通用的 fork/join 池來完成上述功能,該池是所有并行流共享的。默認(rèn)情況,fork/join 池會為每個處理器分配一個線程。假設(shè)你有一臺16核的機(jī)器,這樣你就只能創(chuàng)建16個線程。對 CPU 密集型的任務(wù)來說,這樣是有意義的,因?yàn)槟愕臋C(jī)器確實(shí)只能執(zhí)行16個線程。但是真實(shí)情況下,不是所有的任務(wù)都是 CPU 密集型的。例如:
1 2 3 4 5 6 7 8 9 | myList.parallelStream .map(this::retrieveFromA) .forEach(this::saveToC)myList.parallelStream .map(this::retrieveFromD) .map(this::processUsingE) .forEach(this::saveToD) |
這兩個流很大程度上是受限于IO操作,所以會等待其他系統(tǒng)。但這兩個流使用相同的(小)線程池,因此會相互等待而被阻塞。這個非常不好,可以改進(jìn)。我們以一個流為例:
1 2 3 4 5 6 7 | final List<Integer> firstRange = buildIntRange(); firstRange.parallelStream().forEach((number) -> { try { // do something slow Thread.sleep(5); } catch (InterruptedException e) { }}); |
完整的代碼可以在gist上查看。
在執(zhí)行期間,我獲取了一份線程dump的文件。這是相關(guān)的線程(在我的Macbook上):
1 2 3 4 | ForkJoinPool.commonPool-worker-1 ForkJoinPool.commonPool-worker-2 ForkJoinPool.commonPool-worker-3 ForkJoinPool.commonPool-worker-4 |
現(xiàn)在,我要并行的執(zhí)行這兩個并行流(對于那些不是以英語為母語的人士,我感到非常抱歉!)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | Runnable firstTask = () -> { firstRange.parallelStream().forEach((number) -> { try { // do something slow Thread.sleep(5); } catch (InterruptedException e) { } });};Runnable secondTask = () -> { secondRange.parallelStream().forEach((number) -> { try { // do something slow Thread.sleep(5); } catch (InterruptedException e) { } });};// run threads |
完整的代碼可以在gist上查看。
這次我們再看一下線程dump文件:
1 2 3 4 | ForkJoinPool.commonPool-worker-1 ForkJoinPool.commonPool-worker-2 ForkJoinPool.commonPool-worker-3 ForkJoinPool.commonPool-worker-4 |
正如你所見,結(jié)果是一樣的。我們只使用了4個線程。
一種變通方案
正如我所提到的,JVM 后臺使用 fork/join 池,在 ForkJoinTask 的文檔中,我們可以看到:
如果合適,安排一個異步執(zhí)行的任務(wù)到當(dāng)前正在運(yùn)行的池中。如果任務(wù)不在inForkJoinPool()中,也可以調(diào)用ForkJoinPool.commonPool()獲取新的池來執(zhí)行。
讓我試一試……
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | ForkJoinPool forkJoinPool = new ForkJoinPool(3); forkJoinPool.submit(() -> { firstRange.parallelStream().forEach((number) -> { try { Thread.sleep(5); } catch (InterruptedException e) { } });});ForkJoinPool forkJoinPool2 = new ForkJoinPool(3); forkJoinPool2.submit(() -> { secondRange.parallelStream().forEach((number) -> { try { Thread.sleep(5); } catch (InterruptedException e) { } });}); |
完整的代碼可以在gist上查看。
現(xiàn)在,我們再次查看線程池:
1 2 3 4 5 6 7 8 | ForkJoinPool-1-worker-1 ForkJoinPool-1-worker-2 ForkJoinPool-1-worker-3 ForkJoinPool-1-worker-4 ForkJoinPool-2-worker-1 ForkJoinPool-2-worker-2 ForkJoinPool-2-worker-3 ForkJoinPool-1-worker-4 |
因?yàn)槲覀儎?chuàng)建自己的線程池,所以可以避免共享線程池,如果有需要,甚至可以分配比處理機(jī)數(shù)量更多的線程。
1 | ForkJoinPool forkJoinPool = new ForkJoinPool(<numThreads>); |
原文鏈接: tobyhobson 翻譯: ImportNew.com - paddx
譯文鏈接: http://www.importnew.com/16801.html
[ 轉(zhuǎn)載請保留原文出處、譯者和譯文鏈接。]
新聞熱點(diǎn)
疑難解答