国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發設計 > 正文

《java.util.concurrent 包源碼閱讀》26 Fork/Join框架之Join

2019-11-14 21:10:44
字體:
來源:轉載
供稿:網友
java.util.concurrent 包源碼閱讀》26 Fork/Join框架之Join

接下來看看調用ForkJoinTask的join方法都發生了什么:

    public final V join() {        // doJoin方法返回該任務的狀態,狀態值有三種:        // NORMAL, CANCELLED和EXCEPTIONAL        // join的等待過程在doJoin方法中進行        if (doJoin() != NORMAL)            // reportResult方法針對任務的三種狀態有三種處理方式:            // NORMAL: 直接返回getRawResult()方法的返回值            // CANCELLED: 拋出CancellationException            // EXCEPTIONAL: 如果任務執行過程拋出了異常,則拋出該異常,否則返回getRawResult()            return reportResult();        else            // getRawResult是抽象方法,由子類來實現            return getRawResult();    }

RecursiveAction和RecursiveTask實現了getRawResult方法。

RecursiveAction用于沒有返回值的場合,因此getRawResult方法返回null。

RecursiveTask用于有返回值的場合,因此返回的是抽象方法compute方法的返回值。

接下來繼續看join的核心方法doJoin方法:

    PRivate int doJoin() {        Thread t; ForkJoinWorkerThread w; int s; boolean completed;        // 針對ForkJoinWorkerThread調用join的情況        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {            // status值的初始化值是0,在任務沒有完成以前一直是非負值            // 因此一旦status的值變成負數,表示任務已經完成,直接返回            if ((s = status) < 0)                return s;            // 檢查當前worker線程的任務棧(因為采用LIFO方式,所有這里稱為棧)            // 的棧頂的任務是不是當前任務,如果是,從棧中取走該任務并執行            // 然后返回執行之后任務的狀態            if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {                try {                    completed = exec();                } catch (Throwable rex) {                    return setExceptionalCompletion(rex);                }                if (completed)                    return setCompletion(NORMAL);            }            // 如果不是棧頂任務的情況            return w.joinTask(this);        }        else            // 外部線程等待任務結束的情況            return externalAwaitDone();    }

前面文章中曾經舉了幾個例子來演示如何實現RecursiveTask的子類。在compute方法中會看到了join方法的調用,也就是ForkJoinWorkerThread調用join的情況。

因此首先來看ForkJoinWorkerThread的joinTask方法的實現:

    final int joinTask(ForkJoinTask<?> joinMe) {        ForkJoinTask<?> prevJoin = currentJoin;        currentJoin = joinMe;        for (int s, retries = MAX_HELP;;) {            // 當前任務已經完成,返回到前面一個join的任務            if ((s = joinMe.status) < 0) {                currentJoin = prevJoin;                return s;            }            // 剩余的嘗試次數大于0(MAX_HELP值為16)的情況,繼續做嘗試            if (retries > 0) {                if (queueTop != queueBase) {                    // 檢查當前線程的任務棧,如果任務棧不為空,當前任務處在棧頂位置則                    // 執行該任務返回true,否則返回false,直接認為嘗試失敗                    if (!localHelpJoinTask(joinMe))                        retries = 0;                }                // 嘗試了最大允許次數的一半                else if (retries == MAX_HELP >>> 1) {                    --retries;                    // 檢查當前任務是否在某個worker線程的任務隊列的隊首位置                    // 如果是的話,偷走這個任務并且執行掉該任務。tryDeqAndExec                    // 返回任務的status值,因此大于等于0意味著任務還沒有執行結束,                    // 當前線程讓出控制權以便其他線程執行任務                    if (tryDeqAndExec(joinMe) >= 0)                        Thread.yield();                }                else                    // helpJoinTask方法檢查當前任務是不是被某個Worker線程偷走了,                    // 并且是這個線程最新偷走的任務(currentSteal),如果是的話,                    // 當前線程幫助執行這個任務,這個過程成功則返回true                    retries = helpJoinTask(joinMe) ? MAX_HELP : retries - 1;            }            else {                // 嘗試了最大允許次數還沒有成功,重置以便再次嘗試                retries = MAX_HELP;                // 一輪嘗試失敗,進入進程池等待任務                pool.tryAwaitJoin(joinMe);            }        }    }

來看一輪嘗試失敗之后,調用線程池的tryAwaitJoin方法會發生一些什么:

    final void tryAwaitJoin(ForkJoinTask<?> joinMe) {        int s;        // 檢查任務是否結束之前先清除當前線程的中斷狀態        // 因為tryAwaitDone會調用wait可能產生中斷異常        Thread.interrupted();        // 任務還在執行的情況,否則執行完成就直接返回        if (joinMe.status >= 0) {            // blockedCount加1,把當前線程標記為阻塞            // 成功則返回true,否則返回false            if (tryPreBlock()) {                // 調用wait方法等待任務完成                joinMe.tryAwaitDone(0L);                // blockedCount減1,把當前線程標記為活躍狀態                postBlock();            }            // 線程處于關閉狀態的情況,取消該任務            else if ((ctl & STOP_BIT) != 0L)                joinMe.cancelIgnoringExceptions();        }    }

最后又回歸到了原點,來看task的tryAwaitDone方法:

    final void tryAwaitDone(long millis) {        int s;        try {            // status為0,設為1。成功了然后才會用wait等待            if (((s = status) > 0 ||                 (s == 0 &&                  UNSAFE.compareAndSwapInt(this, statusOffset, 0, SIGNAL))) &&                status > 0) {                synchronized (this) {                    if (status > 0)                        wait(millis);                }            }        } catch (InterruptedException ie) {            // 因為wait被中斷了,不能保證任務被正確執行結束,因此調用該方法時要注意            // 檢查任務是否已經執行結束了    }

走完了Worker線程內的join的流程,最后來看其他線程join等待發生了什么,來看externalAwaitDone方法:

    private int externalAwaitDone() {        int s;        if ((s = status) >= 0) {            boolean interrupted = false;            synchronized (this) {                // 循環等待直到任務執行結束                while ((s = status) >= 0) {                    if (s == 0)                        UNSAFE.compareAndSwapInt(this, statusOffset,                                                 0, SIGNAL);                    else {                        try {                            wait();                        } catch (InterruptedException ie) {                            interrupted = true;                        }                    }                }            }            // 清除中斷狀態            if (interrupted)                Thread.currentThread().interrupt();        }        return s;    }

externalAwaitDone邏輯較為簡單,采用循環的方式,使用wait方法等待直到任務執行結束。

既然使用wait方法等待,那么必然在任務執行結束后需要調用notify或者notifyAll的方法,在setCompletion方法找到了:

    private int setCompletion(int completion) {        for (int s;;) {            if ((s = status) < 0)                return s;            if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {                if (s != 0)                    synchronized (this) { notifyAll(); }                return completion;            }        }    }

到這里把Fork/Join框架簡單地講完了,因為水平所限,遺漏了很多的細節,各位見諒。


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 得荣县| 丹凤县| 镇巴县| 桐柏县| 开平市| 石家庄市| 包头市| 兴宁市| 六枝特区| 交城县| 宝坻区| 南汇区| 孝昌县| 甘谷县| 弥渡县| 安丘市| 连江县| 本溪市| 松原市| 城市| 灵石县| 星座| 大渡口区| 项城市| 灵台县| 通海县| 泽州县| 上饶市| 屯昌县| 迁西县| 铁岭市| 佛教| 建阳市| 芜湖市| 云和县| 玉环县| 桂阳县| 新竹县| 湖南省| 潞西市| 伊吾县|