国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發設計 > 正文

《java.util.concurrent 包源碼閱讀》22 Fork/Join框架的初體驗

2019-11-14 21:04:14
字體:
來源:轉載
供稿:網友
java.util.concurrent 包源碼閱讀》22 Fork/Join框架的初體驗

JDK7引入了Fork/Join框架,所謂Fork/Join框架,個人解釋:Fork分解任務成獨立的子任務,用多線程去執行這些子任務,Join合并子任務的結果。這樣就能使用多線程的方式來執行一個任務。

JDK7引入的Fork/Join有三個核心類:

ForkJoinPool,執行任務的線程池

ForkJoinWorkerThread,執行任務的工作線程

ForkJoinTask,一個用于ForkJoinPool的任務抽象類。

因為ForkJoinTask比較復雜,抽象方法比較多,日常使用時一般不會繼承ForkJoinTask來實現自定義的任務,而是繼承ForkJoinTask的兩個子類:

RecursiveTask:子任務帶返回結果時使用

RecursiveAction:子任務不帶返回結果時使用

對于Fork/Join框架的原理,Doug Lea的文章:A Java Fork/Join Framework

在看了網上的很多例子之后,發現在自定義任務類實現compute方法的邏輯一般是這樣的:

if 任務足夠小    直接返回結果else    分割成N個子任務    依次調用每個子任務的fork方法執行子任務    依次調用每個子任務的join方法合并執行結果

而執行該自定義任務的調用的則是ForkJoinPool的execute方法,因此首先來看的就是ForkJoinPool的execute方法,看看和普通線程池執行任務有什么不同:

    public void execute(ForkJoinTask<?> task) {        if (task == null)            throw new NullPointerException();        forkOrSubmit(task);    }

因此forkOrSubmit是真正執行ForkJoinTask的方法:

    PRivate <T> void forkOrSubmit(ForkJoinTask<T> task) {        ForkJoinWorkerThread w;        Thread t = Thread.currentThread();        if (shutdown)            throw new RejectedExecutionException();        if ((t instanceof ForkJoinWorkerThread) &&            (w = (ForkJoinWorkerThread)t).pool == this)            w.pushTask(task);        else            // 正常執行的時候是主線程調用的,因此關注addSubmission            addSubmission(task);    }

那么我們首先要關注的是addSubmission方法,發覺所做的事情和普通線程池很類似,就是把任務加入到隊列中,不同的是直接使用Unsafe操作內存來添加任務對象

    private void addSubmission(ForkJoinTask<?> t) {        final ReentrantLock lock = this.submissionLock;        lock.lock();        try {            // 隊列只是普通的數組而不是普通線程池的BlockingQueue,            // 喚醒worker線程的工作由下面的signalWork來完成            // 使用Unsafe進行內存操作,把任務放置在數組中            ForkJoinTask<?>[] q; int s, m;            if ((q = submissionQueue) != null) {                long u = (((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE;                UNSAFE.putOrderedObject(q, u, t);                queueTop = s + 1;                if (s - queueBase == m)                    // 數組已滿,為數組擴容                    growSubmissionQueue();            }        } finally {            lock.unlock();        }        // 通知有新任務來了:兩種操作,有空閑線程則喚醒該線程        // 否則如果可以新建worker線程則為這個任務新建worker線程        // 如果不可以就返回了,等到有空閑線程來執行這個任務        signalWork();    }

接下來要弄清楚就是在compute中fork時,按道理來說這個動作是和主任務在同一個線程中執行,fork是如果把子任務變成多線程執行的:

    public final ForkJoinTask<V> fork() {        ((ForkJoinWorkerThread) Thread.currentThread())            .pushTask(this);        return this;    }

在上面分析forkOrSubmit的時候同樣見到了ForkJoinWorkerThread的pushTask方法調用,那么來看這個方法:

    final void pushTask(ForkJoinTask<?> t) {        // 代碼的基本邏輯和ForkJoinPool的addSubmission方法基本一致        // 都是把任務加入了任務隊列中,這里是加入到ForkJoinWorkerThread        // 內置的任務隊列中        ForkJoinTask<?>[] q; int s, m;        if ((q = queue) != null) {    // ignore if queue removed            long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;            UNSAFE.putOrderedObject(q, u, t);            queueTop = s + 1;         // or use putOrderedInt            // 這里不太明白            if ((s -= queueBase) <= 2)                pool.signalWork();            else if (s == m)                growQueue();        }    }

看到這里一下子陷入了僵局,為什么ForkJoinWorkerThread要內建一個隊列呢,而且如果子任務仍舊在同一個線程內的話,何以實現并發執行子任務呢?下一篇文章繼續。


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 页游| 苏尼特左旗| 旺苍县| 云阳县| 越西县| 项城市| 阿巴嘎旗| 二连浩特市| 报价| 宜宾市| 甘谷县| 宁蒗| 葫芦岛市| 肃宁县| 扬州市| 峨眉山市| 永泰县| 武乡县| 临夏县| 岳西县| 澜沧| 大厂| 克什克腾旗| 建平县| 沂源县| 尼玛县| 两当县| 应城市| 盐源县| 宜丰县| 上蔡县| 长汀县| 皋兰县| 峨眉山市| 文登市| 香河县| 美姑县| 延长县| 清远市| 商城县| 图木舒克市|