国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁(yè) > 學(xué)院 > 開發(fā)設(shè)計(jì) > 正文

《java.util.concurrent 包源碼閱讀》10 線程池系列之AbstractExecutorService

2019-11-14 20:52:12
字體:
來(lái)源:轉(zhuǎn)載
供稿:網(wǎng)友
java.util.concurrent 包源碼閱讀》10 線程池系列之AbstractExecutorService

AbstractExecutorService對(duì)ExecutorService的執(zhí)行任務(wù)類型的方法提供了一個(gè)默認(rèn)實(shí)現(xiàn)。這些方法包括submit,invokeAny和InvokeAll。

注意的是來(lái)自Executor接口的execute方法是未被實(shí)現(xiàn),execute方法是整個(gè)體系的核心,所有的任務(wù)都是在這個(gè)方法里被真正執(zhí)行的,因此該方法的不同實(shí)現(xiàn)會(huì)帶來(lái)不同的執(zhí)行策略。這個(gè)在后面分析ThreadPoolExecutor和ScheduledThreadPoolExecutor就能看出來(lái)。

首先來(lái)看submit方法,它的基本邏輯是這樣的:

1. 生成一個(gè)任務(wù)類型和Future接口的包裝接口RunnableFuture的對(duì)象

2. 執(zhí)行任務(wù)

3. 返回future。

    public Future<?> submit(Runnable task) {        if (task == null) throw new NullPointerException();        RunnableFuture<Void> ftask = newTaskFor(task, null);        execute(ftask);        return ftask;    }    public <T> Future<T> submit(Callable<T> task) {        if (task == null) throw new NullPointerException();        RunnableFuture<T> ftask = newTaskFor(task);        execute(ftask);        return ftask;    }

因?yàn)閟ubmit支持Callable和Runnable兩種類型的任務(wù),因此newTaskFor方法有兩個(gè)重載方法:

    PRotected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {        return new FutureTask<T>(callable);    }    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {        return new FutureTask<T>(runnable, value);    }

上一篇文章里曾經(jīng)說(shuō)過(guò)Callable和Runnable的區(qū)別在于前者帶返回值,也就是說(shuō)Callable=Runnable+返回值。因此java中提供了一種adapter,把Runnable+返回值轉(zhuǎn)換成Callable類型。這點(diǎn)可以在newTaskFor中的FutureTask類型的構(gòu)造函數(shù)的代碼中看到:

    public FutureTask(Callable<V> callable) {        if (callable == null)            throw new NullPointerException();        sync = new Sync(callable);    }    public FutureTask(Runnable runnable, V result) {        sync = new Sync(Executors.callable(runnable, result));    }

以下是Executors.callable方法的代碼:

    public static <T> Callable<T> callable(Runnable task, T result) {        if (task == null)            throw new NullPointerException();        return new RunnableAdapter<T>(task, result);    }

那么RunnableAdapter的代碼就很好理解了,它是一個(gè)Callable的實(shí)現(xiàn),call方法的實(shí)現(xiàn)就是執(zhí)行Runnable的run方法,然后返回那個(gè)value。

    static final class RunnableAdapter<T> implements Callable<T> {        final Runnable task;        final T result;        RunnableAdapter(Runnable task, T result) {            this.task = task;            this.result = result;        }        public T call() {            task.run();            return result;        }    }

接下來(lái)先說(shuō)說(shuō)較為簡(jiǎn)單的invokeAll:

1. 為每個(gè)task調(diào)用newTaskFor方法生成得到一個(gè)既是Task也是Future的包裝類對(duì)象的List

2. 循環(huán)調(diào)用execute執(zhí)行每個(gè)任務(wù)

3. 再次循環(huán)調(diào)用每個(gè)Future的get方法等待每個(gè)task執(zhí)行完成

4. 最后返回Future的list。

    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,                                         long timeout, TimeUnit unit)        throws InterruptedException {        if (tasks == null || unit == null)            throw new NullPointerException();        long nanos = unit.toNanos(timeout);        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());        boolean done = false;        try {            // 為每個(gè)task生成包裝對(duì)象            for (Callable<T> t : tasks)                futures.add(newTaskFor(t));            long lastTime = System.nanoTime();            // 循環(huán)調(diào)用execute執(zhí)行每個(gè)方法            // 這里因?yàn)樵O(shè)置了超時(shí)時(shí)間,所以每次執(zhí)行完成后            // 檢查是否超時(shí),超時(shí)了就直接返回future集合            Iterator<Future<T>> it = futures.iterator();            while (it.hasNext()) {                execute((Runnable)(it.next()));                long now = System.nanoTime();                nanos -= now - lastTime;                lastTime = now;                if (nanos <= 0)                    return futures;            }            // 等待每個(gè)任務(wù)執(zhí)行完成            for (Future<T> f : futures) {                if (!f.isDone()) {                    if (nanos <= 0)                        return futures;                    try {                        f.get(nanos, TimeUnit.NANOSECONDS);                    } catch (CancellationException ignore) {                    } catch (ExecutionException ignore) {                    } catch (TimeoutException toe) {                        return futures;                    }                    long now = System.nanoTime();                    nanos -= now - lastTime;                    lastTime = now;                }            }            done = true;            return futures;        } finally {            if (!done)                for (Future<T> f : futures)                    f.cancel(true);        }    }

最后說(shuō)說(shuō)invokeAny,它的難點(diǎn)在于只要一個(gè)任務(wù)執(zhí)行成功就要返回,并且會(huì)取消其他任務(wù),也就是說(shuō)重點(diǎn)在于找到第一個(gè)執(zhí)行成功的任務(wù)。

這里我想到了BlockingQueue,當(dāng)所有的任務(wù)被提交后,任務(wù)執(zhí)行返回的Future會(huì)被依次添加到一個(gè)BlockingQueue中,然后找到第一個(gè)執(zhí)行成功任務(wù)的方法就是從BlockingQueue取出第一個(gè)元素,這個(gè)就是doInvokeAny方法用到的ExecutorCompletionService的基本原理。

因?yàn)閮蓚€(gè)invokeAny方法都是調(diào)用doInvokeAny方法,下面是doInvokeAny的代碼分析:

    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,                            boolean timed, long nanos)        throws InterruptedException, ExecutionException, TimeoutException {        if (tasks == null)            throw new NullPointerException();        int ntasks = tasks.size();        if (ntasks == 0)            throw new IllegalArgumentException();        List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);        // ExecutorCompletionService負(fù)責(zé)執(zhí)行任務(wù),后面調(diào)用用poll返回第一個(gè)執(zhí)行結(jié)果        ExecutorCompletionService<T> ecs =            new ExecutorCompletionService<T>(this);        // 這里出于效率的考慮,每次提交一個(gè)任務(wù)之后,就檢查一下有沒有執(zhí)行完成的任務(wù)        try {            ExecutionException ee = null;            long lastTime = timed ? System.nanoTime() : 0;            Iterator<? extends Callable<T>> it = tasks.iterator();            // 先提交一個(gè)任務(wù)            futures.add(ecs.submit(it.next()));            --ntasks;            int active = 1;            for (;;) {                // 嘗試獲取有沒有執(zhí)行結(jié)果(這個(gè)結(jié)果是立刻返回的)                Future<T> f = ecs.poll();                // 沒有執(zhí)行結(jié)果                if (f == null) {                    // 如果還有任務(wù)沒有被提交執(zhí)行的,就再提交一個(gè)任務(wù)                    if (ntasks > 0) {                        --ntasks;                        futures.add(ecs.submit(it.next()));                        ++active;                    }                    // 沒有任務(wù)在執(zhí)行了,而且沒有拿到一個(gè)成功的結(jié)果。                    else if (active == 0)                        break;                    // 如果設(shè)置了超時(shí)情況                    else if (timed) {                        // 等待執(zhí)行結(jié)果直到有結(jié)果或者超時(shí)                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);                        if (f == null)                            throw new TimeoutException();                        // 這里的更新不可少,因?yàn)檫@個(gè)Future可能是執(zhí)行失敗的情況,那么還需要再次等待下一個(gè)結(jié)果,超時(shí)的設(shè)置還是需要用到。                        long now = System.nanoTime();                        nanos -= now - lastTime;                        lastTime = now;                    }                    // 沒有設(shè)置超時(shí),并且所有任務(wù)都被提交了,則一直等到第一個(gè)執(zhí)行結(jié)果出來(lái)                    else                        f = ecs.take();                }                // 有返回結(jié)果了,嘗試從future中獲取結(jié)果,如果失敗了,那么需要接著等待下一個(gè)執(zhí)行結(jié)果                if (f != null) {                    --active;                    try {                        return f.get();                    } catch (ExecutionException eex) {                        ee = eex;                    } catch (RuntimeException rex) {                        ee = new ExecutionException(rex);                    }                }            }            // ExecutorCompletionService執(zhí)行時(shí)發(fā)生錯(cuò)誤返回了全是null的future            if (ee == null)                ee = new ExecutionException();            throw ee;        } finally {            // 嘗試取消所有的任務(wù)(對(duì)于已經(jīng)完成的任務(wù)沒有影響)            for (Future<T> f : futures)                f.cancel(true);        }    }

后面接著分析ThreadPoolExecutor和ScheduledThreadPoolExecutor。


發(fā)表評(píng)論 共有條評(píng)論
用戶名: 密碼:
驗(yàn)證碼: 匿名發(fā)表
主站蜘蛛池模板: 铜川市| 凤山市| 饶平县| 临泉县| 宁城县| 漳浦县| 芷江| 佛山市| 鸡西市| 东丰县| 太和县| 秦安县| 马山县| 同心县| 石渠县| 宁城县| 吉安县| 赤城县| 尖扎县| 乐陵市| 奉节县| 玛沁县| 玛纳斯县| 武夷山市| 封丘县| 延吉市| 松原市| 嘉黎县| 甘孜县| 商河县| 聂荣县| 依安县| 天柱县| 河北区| 莒南县| 栖霞市| 于都县| 昭通市| 比如县| 栖霞市| 剑川县|