上篇文章一直追蹤到了ForkJoinWorkerThread的pushTask方法,仍然沒有辦法解釋Fork的原理,那么不妨來看看ForkJoinWorkerThread的run方法:
    public void run() {        Throwable exception = null;        try {            // 初始化任務隊列            onStart();            // 線程運行            pool.work(this);        } catch (Throwable ex) {            exception = ex;        } finally {            // 結束后的工作            onTermination(exception);        }    }因此我們需要再次回到ForkJoinPool,看看work方法:
    final void work(ForkJoinWorkerThread w) {        boolean swept = false;                // 下面scan方法沒有掃描到任務返回true        long c;        // ctl是一個64位長的數據,它的格式如下:        // 48-63:AC,正在運行的worker線程數減去系統的并發數(減去系統的并發得出的實際是在某一瞬間等待并發資源的線程數量)        // 32-47:TC,所有的worker線程數減去系統的并發數        // 31:   ST,1表示線程池正在關閉        // 16-30:EC,第一個等待線程的等待數        // 0- 15:ID,Treiber棧(存儲等待線程)頂的worker線程在線程池的線程隊列中的索引        // (int)(c = ctl) >= 0表示ST位為0,即線程池不是正在關閉的狀態        while (!w.terminate && (int)(c = ctl) >= 0) {            int a; // 正在運行的worker線程數,ctl中的AC部分            // swept為false可能有三種:            // 1. scan返回false            // 2. 首次循環            // 3. tryAwaitWork成功            if (!swept && (a = (int)(c >> AC_SHIFT)) <= 0)                swept = scan(w, a);            else if (tryAwaitWork(w, c))                swept = false;        }    }接下來分析scan方法,我承認我看得有點暈。
PRivate boolean scan(ForkJoinWorkerThread w, int a) { int g = scanGuard; // mask 0 avoids useless scans if only one active int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK; ForkJoinWorkerThread[] ws = workers; if (ws == null || ws.length <= m) // staleness check return false; // 代碼看起來暈啊,看來當前的ForkJoinWorkerThread不一定是運行自己的 // Task,可以運行其他ForkJoinWorkerThread的Task。 // 似乎有點明白了,這樣可以實現Fork出來的任務被多線程執行 // 看起來這是一個較為復雜的算法 for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) { ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i; ForkJoinWorkerThread v = ws[k & m]; if (v != null && (b = v.queueBase) != v.queueTop && (q = v.queue) != null && (i = (q.length - 1) & b) >= 0) { long u = (i << ASHIFT) + ABASE; if ((t = q[i]) != null && v.queueBase == b && UNSAFE.compareAndSwapObject(q, u, t, null)) { int d = (v.queueBase = b + 1) - v.queueTop; v.stealHint = w.poolIndex; if (d != 0) signalWork(); // propagate if nonempty w.execTask(t); } r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5); return false; // store next seed } else if (j < 0) { // xorshift r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5; } else ++k; } if (scanGuard != g) // staleness check return false; else { // try to take submission ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i; if ((b = queueBase) != queueTop && (q = submissionQueue) != null && (i = (q.length - 1) & b) >= 0) { long u = (i << ASHIFT) + ABASE; if ((t = q[i]) != null && queueBase == b && UNSAFE.compareAndSwapObject(q, u, t, null)) { queueBase = b + 1; w.execTask(t); } return false; } return true; // all queues empty } }
但是起碼能看出來,Fork出來的任務是如何被其他線程運行以實現多線程運行的了。面對這么個有點復雜的算法,我只能先去查查,發現原來叫做Work-Stealing,好吧,下一篇來研究這個Work-Stealing。
新聞熱點
疑難解答