国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發設計 > 正文

ThreadPoolExecutor機制探索-我們到底能走多遠系列(41)

2019-11-14 23:03:25
字體:
來源:轉載
供稿:網友
ThreadPoolExecutor機制探索-我們到底能走多遠系列(41)我們到底能走多遠系列(41)

扯淡:

  這一年過的不匆忙,也頗多感受,成長的路上難免彎路,這個世界上沒人關心你有沒有變強,只有自己時刻提醒自己,不要忘記最初出發的原因。

  其實這個世界上比我們聰明的人無數,很多人都比我們努力,當我門奇怪為什么他們可以如此輕松的時候,是不會問他們付出過什么。怨天尤人是無用的,使自己變好,哪怕是變好一點點,我覺得生活著就是有意義的。

  未來,太遠。唯有不停的積累,不要著急,抓得住的才能叫機會。

  羊年,一定要不做被動的人。大家加油!

目錄留白:

  * ArrayBlockingQueue

主題:

直接進ThreadPoolExecutor源碼看一看:(版本是1.7.0)首先,這個線程池的狀態是怎么樣的呢?我們看下面的字段定義,ctl作為ThreadPoolExecutor的核心狀態控制字段,包含來兩個信息: 1,工作線程總數 workerCount 2,線程池狀態RUNNING SHUTDOWNSTOPTIDYINGTERMINATED下面代碼解釋一下: COUNT_BITS 是32減去3 就是29,下面的線程池狀態就是-1 到 3 分別向左移動29位。 如此,int的右側29位,代表著線程數量,總數可以達到2的29次,29位后的3位代表線程池的狀態這樣,線程池增加一個線程,只需吧ctl加1即可,而我們也發現實際這個線程池的最高線程數量是2的29次減1。并不是先前我們現象的2的32次減1。這個作者在注釋中也提到了,說如果后續需要增大這個值,可以吧ctl定義成AtomicLong。這個關鍵的控制字段的理解,對閱讀源碼很有幫助。
    PRivate final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));    private static final int COUNT_BITS = Integer.SIZE - 3;    private static final int CAPACITY  = (1 << COUNT_BITS) - 1;    // runState is stored in the high-order bits    private static final int RUNNING    = -1 << COUNT_BITS;// 111 00000000000000000000000000000    private static final int SHUTDOWN  =  0 << COUNT_BITS;// 000 00000000000000000000000000000    private static final int STOP      =  1 << COUNT_BITS;// 001 00000000000000000000000000000    private static final int TIDYING    =  2 << COUNT_BITS;// 010 00000000000000000000000000000    private static final int TERMINATED =  3 << COUNT_BITS;// 100 00000000000000000000000000000    // Packing and unpacking ctl    private static int runStateOf(int c)     { return c & ~CAPACITY; }//最高3位    private static int workerCountOf(int c)  { return c & CAPACITY; }//后29位    private static int ctlOf(int rs, int wc) { return rs | wc; }

代碼里我們可能會這樣使用ThreadPoolExecutor的方法:

Future<?> future = this.threadPoolExecutor.submit(runnable);

那么就從submit方法入手,這個submit的代碼在 AbstractExecutorService,因為 ThreadPoolExecutor繼承了它。

    public Future<?> submit(Runnable task) {        if (task == null) throw new NullPointerException();        RunnableFuture<Void> ftask = newTaskFor(task, null);        execute(ftask);        return ftask;    }
把task包裝成RunnableFuture,然后執行execute,下面是ThreadPoolExecutor的execute方法:這個方法就是我們把任務提交給線程池去完成,至于線程池按照怎樣的一個管理機制來完成這個task我們不關心,task關系的是run方法中的邏輯。如此,對于開發來說是極其方便的,配置一個線程池,只需一句代碼,然后專心完成task的邏輯。那么,了解這個線程池的機制,我感覺只需要看下這個execute方法大概也明白了。特別是方法中的注釋。1,當一個task被安排進來的時候,再確定不是空值后,直接判斷在池中已經有工作的線程是否小于corePoolSize,小于則增加一個線程來負責這個task。2,如果池中已經工作的線程大于等于corePoolSize,就向隊列里存task,而不是繼續增加線程。3,當workQueue.offer失敗時,也就是說task不能再向隊列里放的時候,而此時工作線程大于等于corePoolSize,那么新進的task,就要新開一個線程來接待了。根據代碼分析諸多判斷和邏輯,而對于使用這個線程池的外部來說,機制是這樣:a、如果正在運行的線程數 < corePoolSize,那就馬上創建線程并運行這個任務,而不會進行排隊。b、如果正在運行的線程數 >= corePoolSize,那就把這個任務放入隊列。c、如果隊列滿了,并且正在運行的線程數 < maximumPoolSize,那么還是要創建線程并運行這個任務。d、如果隊列滿了,并且正在運行的線程數 >= maximumPoolSize,那么線程池就會調用handler里方法。(采用LinkedBlockingDeque就不會出現隊列滿情況)
/**     * Executes the given task sometime in the future.  The task     * may execute in a new thread or in an existing pooled thread.     *     * If the task cannot be submitted for execution, either because this     * executor has been shutdown or because its capacity has been reached,     * the task is handled by the current {@code RejectedExecutionHandler}.     *     * @param command the task to execute     * @throws RejectedExecutionException at discretion of     *         {@code RejectedExecutionHandler}, if the task     *         cannot be accepted for execution     * @throws NullPointerException if {@code command} is null     */    public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();        /*         * Proceed in 3 steps:         *         * 1. If fewer than corePoolSize threads are running, try to         * start a new thread with the given command as its first         * task.  The call to addWorker atomically checks runState and         * workerCount, and so prevents false alarms that would add         * threads when it shouldn't, by returning false.         *         * 2. If a task can be successfully queued, then we still need         * to double-check whether we should have added a thread         * (because existing ones died since last checking) or that         * the pool shut down since entry into this method. So we         * recheck state and if necessary roll back the enqueuing if         * stopped, or start a new thread if there are none.         *         * 3. If we cannot queue task, then we try to add a new         * thread.  If it fails, we know we are shut down or saturated         * and so reject the task.         */        int c = ctl.get();        if (workerCountOf(c) < corePoolSize) {            if (addWorker(command, true))                return;            c = ctl.get();        }        if (isRunning(c) && workQueue.offer(command)) {            int recheck = ctl.get();            if (! isRunning(recheck) && remove(command))                reject(command);            else if (workerCountOf(recheck) == 0)                addWorker(null, false);        }        else if (!addWorker(command, false))            reject(command);    }
單從execute方法,大概能了解整個線程池的工作機制。那么,全局的觀看以下,我們一定明白這個ThreadPoolExecutor維護著一個池:
    /**     * Set containing all worker threads in pool. accessed only when     * holding mainLock.     */    private final HashSet<Worker> workers = new HashSet<Worker>();
猜測execute方法中的addWorker應該是向這個set中add一個worker,而這里面的worker里有一個線程,這個線程執行完成時,就會從這個set中remove掉。看一下開進程開始工作的addWorker方法:
  /*     * Methods for creating, running and cleaning up after workers     */    /**     * Checks if a new worker can be added with respect to current     * pool state and the given bound (either core or maximum). If so,     * the worker count is adjusted accordingly, and, if possible, a     * new worker is created and started, running firstTask as its     * first task. This method returns false if the pool is stopped or     * eligible to shut down. It also returns false if the thread     * factory fails to create a thread when asked.  If the thread     * creation fails, either due to the thread factory returning     * null, or due to an exception (typically OutOfMemoryError in     * Thread#start), we roll back cleanly.     *     * @param firstTask the task the new thread should run first (or     * null if none). Workers are created with an initial first task     * (in method execute()) to bypass queuing when there are fewer     * than corePoolSize threads (in which case we always start one),     * or when the queue is full (in which case we must bypass queue).     * Initially idle threads are usually created via     * prestartCoreThread or to replace other dying workers.     *     * @param core if true use corePoolSize as bound, else     * maximumPoolSize. (A boolean indicator is used here rather than a     * value to ensure reads of fresh values after checking other pool     * state).     * @return true if successful     */    private boolean addWorker(Runnable firstTask, boolean core) {        retry:        for (;;) {            int c = ctl.get();            int rs = runStateOf(c);            // Check if queue empty only if necessary.            if (rs >= SHUTDOWN &&                ! (rs == SHUTDOWN &&                   firstTask == null &&                   ! workQueue.isEmpty()))                return false;            for (;;) {                int wc = workerCountOf(c);                if (wc >= CAPACITY ||                    wc >= (core ? corePoolSize : maximumPoolSize))                    return false;                if (compareAndIncrementWorkerCount(c))                    break retry;                c = ctl.get();  // Re-read ctl                if (runStateOf(c) != rs)                    continue retry;                // else CAS failed due to workerCount change; retry inner loop            }        }        boolean workerStarted = false;        boolean workerAdded = false;        Worker w = null;        try {            final ReentrantLock mainLock = this.mainLock;            w = new Worker(firstTask);            final Thread t = w.thread;            if (t != null) {                mainLock.lock();                try {                    // Recheck while holding lock.                    // Back out on ThreadFactory failure or if                    // shut down before lock acquired.                    int c = ctl.get();                    int rs = runStateOf(c);                    if (rs < SHUTDOWN ||                        (rs == SHUTDOWN && firstTask == null)) {                        if (t.isAlive()) // precheck that t is startable                            throw new IllegalThreadStateException();                        workers.add(w);                        int s = workers.size();                        if (s > largestPoolSize)                            largestPoolSize = s;                        workerAdded = true;                    }                } finally {                    mainLock.unlock();                }                if (workerAdded) {                    t.start();                    workerStarted = true;                }            }        } finally {            if (! workerStarted)                addWorkerFailed(w);        }        return workerStarted;    }
View Code

方法前面的retry循環,最終break的時候,執行compareAndIncrementWorkerCount(c),是的,最前面提到的ctl加1啦!這里利用CAS原則,可以參考先前的文章:摸我

    /**     * Attempt to CAS-increment the workerCount field of ctl.     */    private boolean compareAndIncrementWorkerCount(int expect) {        return ctl.compareAndSet(expect, expect + 1);    }
retry循環break之后,就是做核心的事,new一個worker出來然后add進set,然后啟動worker里的thread。 我們注意到做把worker放入set這個操作前,先獲取了鎖,這個mainLock是類靜態成員變量,是一個公用的可重入鎖:
    /**     * Lock held on access to workers set and related bookkeeping.     * While we could use a concurrent set of some sort, it turns out     * to be generally preferable to use a lock. Among the reasons is     * that this serializes interruptIdleWorkers, which avoids     * unnecessary interrupt storms, especially during shutdown.     * Otherwise exiting threads would concurrently interrupt those     * that have not yet interrupted. It also simplifies some of the     * associated statistics bookkeeping of largestPoolSize etc. We     * also hold mainLock on shutdown and shutdownNow, for the sake of     * ensuring workers set is stable while separately checking     * permission to interrupt and actually interrupting.     */    private final ReentrantLock mainLock = new ReentrantLock();

其實調用這個addWorker方法有4種傳參的方式:  1, addWorker(command, true);  2, addWorker(command, false);  3, addWorker(null, false);  4, addWorker(null, true);在execute方法中就使用了前3種,結合這個核心方法我們先進行一下分析。第一個:線程數小于corePoolSize時,放一個需要處理的task進worker set。如果worker set長度超過corePoolSize,就返回false。 第二個:當隊列被放滿時,就嘗試將這個新來的task直接放入workerset,而此時workerset 的長度限制是maximumPoolSize。如果線程池也滿了的話就返回false。 第三個:放入一個空的task進set,比較的的長度限制是maximumPoolSize。這樣一個task為空的worker在線程執行的時候會判斷出后去任務隊列里拿任務,這樣就相當于世創建了一個新的線程,只是沒有馬上分配任務。 第四個:這個方法就是放一個null的task進set,而且是在小于corePoolSize時。實際使用中是在 prestartCoreThread() 方法。這個方法用來為線程池先啟動一個worker等待在那邊,如果此時set中的數量已經達到corePoolSize那就返回false,什么也不干。還有是 prestartAllCoreThreads()方法,準備corePoolSize個worker:
   /**     * Starts all core threads, causing them to idly wait for work. This     * overrides the default policy of starting core threads only when     * new tasks are executed.     *     * @return the number of threads started     */    public int prestartAllCoreThreads() {        int n = 0;        while (addWorker(null, true))            ++n;        return n;    }
在addWorker中 t.start()使線程就緒,thread是怎么來的,就看下Worker的代碼Worker類的源碼:
/**     * Class Worker mainly maintains interrupt control state for     * threads running tasks, along with other minor bookkeeping.     * This class opportunistically extends AbstractQueuedSynchronizer     * to simplify acquiring and releasing a lock surrounding each     * task execution.  This protects against interrupts that are     * intended to wake up a worker thread waiting for a task from     * instead interrupting a task being run.  We implement a simple     * non-reentrant mutual exclusion lock rather than use     * ReentrantLock because we do not want worker tasks to be able to     * reacquire the lock when they invoke pool control methods like     * setCorePoolSize.  Additionally, to suppress interrupts until     * the thread actually starts running tasks, we initialize lock     * state to a negative value, and clear it upon start (in     * runWorker).     */    private final class Worker        extends AbstractQueuedSynchronizer        implements Runnable    {        /**         * This class will never be serialized, but we provide a         * serialVersionUID to suppress a javac warning.         */        private static final long serialVersionUID = 6138294804551838833L;        /** Thread this worker is running in.  Null if factory fails. */        final Thread thread;        /** Initial task to run.  Possibly null. */        Runnable firstTask;        /** Per-thread task counter */        volatile long completedTasks;        /**         * Creates with given first task and thread from ThreadFactory.         * @param firstTask the first task (null if none)         */        Worker(Runnable firstTask) {            setState(-1); // inhibit interrupts until runWorker            this.firstTask = firstTask;            this.thread = getThreadFactory().newThread(this);        }        /** Delegates main run loop to outer runWorker  */        public void run() {            runWorker(this);        }        // Lock methods        //        // The value 0 represents the unlocked state.        // The value 1 represents the locked state.        protected boolean isHeldExclusively() {            return getState() != 0;        }        protected boolean tryAcquire(int unused) {            if (compareAndSetState(0, 1)) {                setExclusiveOwnerThread(Thread.currentThread());                return true;            }            return false;        }        protected boolean tryRelease(int unused) {            setExclusiveOwnerThread(null);            setState(0);            return true;        }        public void lock()        { acquire(1); }        public boolean tryLock()  { return tryAcquire(1); }        public void unlock()      { release(1); }        public boolean isLocked() { return isHeldExclusively(); }        void interruptIfStarted() {            Thread t;            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {                try {                    t.interrupt();                } catch (SecurityException ignore) {                }            }        }    }
View Code線程啟動后就會調用run方法,也就是調用runWorker(Worker w),核心代碼了,英文注釋十分詳細。在執行task之前會先執行beforeExecute,task結束后執行afterExecute,pool的擴展性利用:摸我
/**     * Main worker run loop.  Repeatedly gets tasks from queue and     * executes them, while coping with a number of issues:     *     * 1. We may start out with an initial task, in which case we     * don't need to get the first one. Otherwise, as long as pool is     * running, we get tasks from getTask. If it returns null then the     * worker exits due to changed pool state or configuration     * parameters.  Other exits result from exception throws in     * external code, in which case completedAbruptly holds, which     * usually leads processWorkerExit to replace this thread.     *     * 2. Before running any task, the lock is acquired to prevent     * other pool interrupts while the task is executing, and     * clearInterruptsForTaskRun called to ensure that unless pool is     * stopping, this thread does not have its interrupt set.     *     * 3. Each task run is preceded by a call to beforeExecute, which     * might throw an exception, in which case we cause thread to die     * (breaking loop with completedAbruptly true) without processing     * the task.     *     * 4. Assuming beforeExecute completes normally, we run the task,     * gathering any of its thrown exceptions to send to     * afterExecute. We separately handle RuntimeException, Error     * (both of which the specs guarantee that we trap) and arbitrary     * Throwables.  Because we cannot rethrow Throwables within     * Runnable.run, we wrap them within Errors on the way out (to the     * thread's UncaughtExceptionHandler).  Any thrown exception also     * conservatively causes thread to die.     *     * 5. After task.run completes, we call afterExecute, which may     * also throw an exception, which will also cause thread to     * die. According to JLS Sec 14.20, this exception is the one that     * will be in effect even if task.run throws.     *     * The net effect of the exception mechanics is that afterExecute     * and the thread's UncaughtExceptionHandler have as accurate     * information as we can provide about any problems encountered by     * user code.     *     * @param w the worker     */    final void runWorker(Worker w) {        Thread wt = Thread.currentThread();        Runnable task = w.firstTask;        w.firstTask = null;        w.unlock(); // allow interrupts        boolean completedAbruptly = true;        try {            while (task != null || (task = getTask()) != null) {                w.lock();                // If pool is stopping, ensure thread is interrupted;                // if not, ensure thread is not interrupted.  This                // requires a recheck in second case to deal with                // shutdownNow race while clearing interrupt                if ((runStateAtLeast(ctl.get(), STOP) ||                     (Thread.interrupted() &&                      runStateAtLeast(ctl.get(), STOP))) &&                    !wt.isInterrupted())                    wt.interrupt();                try {                    beforeExecute(wt, task);                    Throwable thrown = null;                    try {                        task.run();                    } catch (RuntimeException x) {                        thrown = x; throw x;                    } catch (Error x) {                        thrown = x; throw x;                    } catch (Throwable x) {                        thrown = x; throw new Error(x);                    } finally {                        afterExecute(task, thrown);                    }                } finally {                    task = null;                    w.completedTasks++;                    w.unlock();                }            }            completedAbruptly = false;        } finally {            processWorkerExit(w, completedAbruptly);        }    }
View Code

while循環條件:先取worker自己的task,如果沒有,就是上面提到addWorker時task放null的那種,就調用getTask方法。

 /**     * Performs blocking or timed wait for a task, depending on     * current configuration settings, or returns null if this worker     * must exit because of any of:     * 1. There are more than maximumPoolSize workers (due to     *    a call to setMaximumPoolSize).     * 2. The pool is stopped.     * 3. The pool is shutdown and the queue is empty.     * 4. This worker timed out waiting for a task, and timed-out     *    workers are subject to termination (that is,     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})     *    both before and after the timed wait.     *     * @return task, or null if the worker must exit, in which case     *         workerCount is decremented     */    private Runnable getTask() {        boolean timedOut = false; // Did the last poll() time out?        retry:        for (;;) {            int c = ctl.get();            int rs = runStateOf(c);            // Check if queue empty only if necessary.            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {                decrementWorkerCount();                return null;            }            boolean timed;      // Are workers subject to culling?            for (;;) {                int wc = workerCountOf(c);                timed = allowCoreThreadTimeOut || wc > corePoolSize;//如果線程池允許線程  timeout或者當前線程數大于核心線程數,則會進行timeout的處理                if (wc <= maximumPoolSize && ! (timedOut && timed))//如果線程小于最大值,也不需要timeout判斷的,就直接退出                    break;                if (compareAndDecrementWorkerCount(c))//削減線程                    return null;                c = ctl.get();  // Re-read ctl                if (runStateOf(c) != rs)//狀態再判斷是否變化,發生變化需要重新再來                    continue retry;                // else CAS failed due to workerCount change; retry inner loop            }            try {               //keepAliveTime來控制獲取queue中元素時的等待時間                Runnable r = timed ?                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                    workQueue.take();                if (r != null)                    return r;                timedOut = true;            } catch (InterruptedException retry) {                timedOut = false;            }        }    }

至此基本了解了ThreadPoolExecutor源碼。在使用是也會更明了一些。

讓我們繼續前行

----------------------------------------------------------------------

努力不一定成功,但不努力肯定不會成功。


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 化隆| 兴海县| 龙岩市| 威远县| 凌云县| 合肥市| 周至县| 潍坊市| 罗田县| 永年县| 平武县| 建始县| 米易县| 霞浦县| 九龙县| 剑川县| 株洲市| 海盐县| 察雅县| 调兵山市| 巴彦县| 观塘区| 三门县| 会昌县| 闻喜县| 重庆市| 玉屏| 石柱| 淳化县| 沾化县| 民县| 靖边县| 两当县| 宁波市| 武鸣县| 桑植县| 梨树县| 盈江县| 嵊泗县| 四会市| 孟津县|