国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學(xué)院 > 開發(fā)設(shè)計 > 正文

jetty源碼分析:QueuedThreadPool

2019-11-08 02:14:09
字體:
供稿:網(wǎng)友

概括

我們從幾個問題入手,如果搞清楚這幾個問題,也就理解了這個線程池的原理了

線程池中的線程是否固定的? 答: 不是的,線程池中的線程數(shù)量隨著請求量的變化而動態(tài)變化,不過一直保持在_minThreads到_maxThreads之間。

線程什么時候新增?什么時候會退出? 答: 當(dāng)請求過來,但是又沒有空閑的線程,那么就會新增,當(dāng)然這個也收_maxThreads限制。退出有兩種情況:一是線程被中斷了,另外一種設(shè)置了超時退出時間并且時間真的到了。

重要的成員

線程池常規(guī)成員:

PRivate int _idleTimeout; //線程空閑多久后退出 private int _maxThreads; //最大線程數(shù) private int _minThreads; //最小線程數(shù) private final BlockingQueue<Runnable> _jobs; //任務(wù)隊列,提交的任務(wù)都先放到這里來 private final ConcurrentHashSet<Thread> _threads=new ConcurrentHashSet<>(); //線程池中所有線程

構(gòu)造函數(shù)

線程池啟動

線程池通過doStart函數(shù)啟動

protected void doStart() throws Exception { super.doStart(); _threadsStarted.set(0); startThreads(_minThreads); }

代碼比較簡單,先調(diào)用父類的doStart,然后把線程數(shù)量設(shè)置為0,同時通過startThreads函數(shù)啟動了_minThreads個線程,我們來看下這個線程啟動函數(shù)

private boolean startThreads(int threadsToStart) { while (threadsToStart > 0 && isRunning()) { int threads = _threadsStarted.get(); if (threads >= _maxThreads) return false; if (!_threadsStarted.compareAndSet(threads, threads + 1)) continue; boolean started = false; try { Thread thread = newThread(_runnable); thread.setDaemon(isDaemon()); thread.setPriority(getThreadsPriority()); thread.setName(_name + "-" + thread.getId()); _threads.add(thread); thread.start(); started = true; --threadsToStart; //啟動一個,任務(wù)少了一個 } finally { if (!started) _threadsStarted.decrementAndGet(); } } return true; }

這個函數(shù)的功能是批量啟動線程,所以里面有個循環(huán)的,判斷創(chuàng)建的線程數(shù)量是否到達(dá)了目標(biāo)數(shù)(當(dāng)然線程池一定是要啟動著的)。 具體創(chuàng)建一個線程流程如下: 1. 需要判斷當(dāng)前的線程數(shù)量是否已經(jīng)超過最大值,如果是就直接返回失敗,否則總線程數(shù)+1 2 已啟動線程+1,因為這里的實現(xiàn)中,每個線程都可以再創(chuàng)建線程的,所以這里需要原子操作 3 通過newThread新建一個線程,并啟動,然后放到_threads。如果創(chuàng)建成功,需要創(chuàng)建線程-1,如果創(chuàng)建失敗,那么總線程數(shù)量減-1

上面創(chuàng)建一個線程時候,我們它設(shè)置需要干的活,就是_runnable,我們進(jìn)去看下它在干什么?

private Runnable _runnable = new Runnable() { @Override public void run() { boolean shrink = false; boolean ignore = false; try { Runnable job = _jobs.poll(); if (job != null && _threadsIdle.get() == 0) //隊列中有任務(wù),但是空閑的線程為0,那么需要啟動一個線程 { startThreads(1); } loop: while (isRunning()) { // Job loop while (job != null && isRunning()) { if (LOG.isDebugEnabled()) LOG.debug("run {}",job); runJob(job); if (LOG.isDebugEnabled()) LOG.debug("ran {}",job); if (Thread.interrupted()) { ignore=true; break loop; } job = _jobs.poll(); //繼續(xù)拿一個任務(wù),進(jìn)行執(zhí)行 } // Idle loop try { _threadsIdle.incrementAndGet(); while (isRunning() && job == null) { if (_idleTimeout <= 0) //如果沒有設(shè)置超時退出時間 job = _jobs.take(); //阻塞線程等待有新的任務(wù) else //判斷是否需要退出線程 { // maybe we should shrink? final int size = _threadsStarted.get(); if (size > _minThreads) { long last = _lastShrink.get(); long now = System.nanoTime(); if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(_idleTimeout)) { if (_lastShrink.compareAndSet(last, now) && _threadsStarted.compareAndSet(size, size - 1)) { shrink=true; break loop; } } } job = idleJobPoll(); //如果這個線程不退出,那么繼續(xù)嘗試獲取新任務(wù) } } } finally { if (_threadsIdle.decrementAndGet() == 0) //空閑的線程為0了,那么創(chuàng)建一個 { startThreads(1); } } } } catch (InterruptedException e) { ignore=true; LOG.ignore(e); } catch (Throwable e) { LOG.warn(e); } finally { if (!shrink && isRunning()) { if (!ignore) LOG.warn("Unexpected thread death: {} in {}",this,QueuedThreadPool.this); // This is an unexpected thread death! if (_threadsStarted.decrementAndGet()<getMaxThreads()) startThreads(1); } _threads.remove(Thread.currentThread()); } } };

這個代碼有將近100行,但是其實內(nèi)容也不多,可以分為三塊: 1 首先從任務(wù)隊列中拿到一個任務(wù),如果有需要執(zhí)行的任務(wù),并且空閑的線程為0,通過startThreads啟動一個線程(當(dāng)然啟動線程可能失敗情況) 2 任務(wù)循環(huán):如果1中獲取了任務(wù),那么通過runjob把任務(wù)啟動起來,運行后通過interrupted()查看當(dāng)前線程是否被interrupted,如果是,那么就退出線程。否則從隊列中繼續(xù)獲取一個任務(wù)出來。 3 空閑處理:首先進(jìn)入到這里,就是一個任務(wù)執(zhí)行完成了,那么對應(yīng)的我們把空閑的線程數(shù)量+1,然后進(jìn)行一個循環(huán),但是有條件:線程池服務(wù)還在且任務(wù)隊列中沒有任務(wù)了,執(zhí)行下面的邏輯:首先看是否失職超時退出,如果沒有設(shè)置這個,也就是線程是永遠(yuǎn)不能退出的,那我們就讓線程阻塞在隊列中,一直等待到有新的任務(wù)出現(xiàn);否則的話,就可能需要把當(dāng)前的線程給退出了,我們先看當(dāng)前總的線程數(shù)是否大于_minThreads(否則太少我們也不會讓當(dāng)前線程退出的),如果是的且好久沒有線程退出了( 當(dāng)前時間距離上次有線程退出已經(jīng)超過閾值了),那么就需要把當(dāng)前線程退出并且是否當(dāng)前的時間為最后退出時間。 最后線程不滿足退出條件,那么再嘗試去獲取任務(wù) 4. 正常情況,如果2和3中的線程不退出,那么一直循環(huán)執(zhí)行新的任務(wù)(沒有任務(wù)時候就阻塞在任務(wù)隊列上)。但是如果2和3中退出來了,如果是2導(dǎo)致的(被中斷的),那么看下線程數(shù)是否太少了,如果是,繼續(xù)啟動一個線程。最后不管是什么原因?qū)е碌模?dāng)前線程是要退出的,所有把線程從線程隊列中(_threads)刪除。

任務(wù)接受

當(dāng)一個任務(wù)提交到線程池的時候,調(diào)用了execute函數(shù),我們從這個入口進(jìn)行分析

@Override public void execute(Runnable job) { if (LOG.isDebugEnabled()) LOG.debug("queue {}",job); if (!isRunning() || !_jobs.offer(job)) { LOG.warn("{} rejected {}", this, job); throw new RejectedExecutionException(job.toString()); } else { // Make sure there is at least one thread executing the job. if (getThreads() == 0) startThreads(1); } }

代碼很簡單,首先看線程池是否在運行,沒有運行就直接拋出異常,否則就往任務(wù)隊列中添加一個任務(wù),同樣添加失敗也會拋異常(任務(wù)隊列滿了),然后看下當(dāng)前的線程池中的線程是不是空的,如果空的,通過startThreads啟動一個線程(這個上面已經(jīng)介紹過了,直接看上面就可以了)


發(fā)表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發(fā)表
主站蜘蛛池模板: 满洲里市| 琼海市| 孟州市| 陇南市| 天长市| 雅安市| 阳江市| 谢通门县| 柳林县| 井冈山市| 绥棱县| 明光市| 嘉峪关市| 霍城县| 周口市| 邵东县| 花垣县| 克东县| 田东县| 巴彦淖尔市| 诸暨市| 普兰县| 和顺县| 天水市| 淮南市| 弋阳县| 涟源市| 沅江市| 崇阳县| 剑阁县| 祁东县| 玉屏| 康乐县| 右玉县| 兰坪| 渝北区| 阿拉善右旗| 东丽区| 太和县| 曲周县| 棋牌|