国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > JavaScript > 正文

深入理解Java線程編程中的阻塞隊列容器

2019-11-20 11:04:21
字體:
來源:轉載
供稿:網友

1. 什么是阻塞隊列?

阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:在隊列為空時,獲取元素的線程會等待隊列變為非空。當隊列滿時,存儲元素的線程會等待隊列可用。阻塞隊列常用于生產者和消費者的場景,生產者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器里拿元素。

阻塞隊列提供了四種處理方法:

2015127142052051.png (522×105)

拋出異常:是指當阻塞隊列滿時候,再往隊列里插入元素,會拋出IllegalStateException("Queue full")異常。當隊列為空時,從隊列里獲取元素時會拋出NoSuchElementException異常 。
返回特殊值:插入方法會返回是否成功,成功則返回true。移除方法,則是從隊列里拿出一個元素,如果沒有則返回null
一直阻塞:當阻塞隊列滿時,如果生產者線程往隊列里put元素,隊列會一直阻塞生產者線程,直到拿到數據,或者響應中斷退出。當隊列空時,消費者線程試圖從隊列里take元素,隊列也會阻塞消費者線程,直到隊列可用。
超時退出:當阻塞隊列滿時,隊列會阻塞生產者線程一段時間,如果超過一定的時間,生產者線程就會退出。
2. Java里的阻塞隊列

JDK7提供了7個阻塞隊列。分別是

  1. ArrayBlockingQueue :一個由數組結構組成的有界阻塞隊列。
  2. LinkedBlockingQueue :一個由鏈表結構組成的有界阻塞隊列。
  3. PriorityBlockingQueue :一個支持優先級排序的無界阻塞隊列。
  4. DelayQueue:一個使用優先級隊列實現的無界阻塞隊列。
  5. SynchronousQueue:一個不存儲元素的阻塞隊列。
  6. LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。
  7. LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。

ArrayBlockingQueue是一個用數組實現的有界阻塞隊列。此隊列按照先進先出(FIFO)的原則對元素進行排序。默認情況下不保證訪問者公平的訪問隊列,所謂公平訪問隊列是指阻塞的所有生產者線程或消費者線程,當隊列可用時,可以按照阻塞的先后順序訪問隊列,即先阻塞的生產者線程,可以先往隊列里插入元素,先阻塞的消費者線程,可以先從隊列里獲取元素。通常情況下為了保證公平性會降低吞吐量。我們可以使用以下代碼創建一個公平的阻塞隊列:

ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);

訪問者的公平性是使用可重入鎖實現的,代碼如下:

public ArrayBlockingQueue(int capacity, boolean fair) {    if (capacity <= 0)      throw new IllegalArgumentException();    this.items = new Object[capacity];    lock = new ReentrantLock(fair);    notEmpty = lock.newCondition();    notFull = lock.newCondition();}

LinkedBlockingQueue是一個用鏈表實現的有界阻塞隊列。此隊列的默認和最大長度為Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序。

PriorityBlockingQueue是一個支持優先級的無界隊列。默認情況下元素采取自然順序排列,也可以通過比較器comparator來指定元素的排序規則。元素按照升序排列。

DelayQueue是一個支持延時獲取元素的無界阻塞隊列。隊列使用PriorityQueue來實現。隊列中的元素必須實現Delayed接口,在創建元素時可以指定多久才能從隊列中獲取當前元素。只有在延遲期滿時才能從隊列中提取元素。我們可以將DelayQueue運用在以下應用場景:

緩存系統的設計:可以用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。
定時任務調度。使用DelayQueue保存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,從比如TimerQueue就是使用DelayQueue實現的。
隊列中的Delayed必須實現compareTo來指定元素的順序。比如讓延時時間最長的放在隊列的末尾。實現代碼如下:

public int compareTo(Delayed other) {      if (other == this) // compare zero ONLY if same object        return 0;      if (other instanceof ScheduledFutureTask) {        ScheduledFutureTask x = (ScheduledFutureTask)other;        long diff = time - x.time;        if (diff < 0)          return -1;        else if (diff > 0)          return 1;  else if (sequenceNumber < x.sequenceNumber)          return -1;        else          return 1;      }      long d = (getDelay(TimeUnit.NANOSECONDS) -           other.getDelay(TimeUnit.NANOSECONDS));      return (d == 0) ? 0 : ((d < 0) ? -1 : 1);    }

3.如何實現Delayed接口

我們可以參考ScheduledThreadPoolExecutor里ScheduledFutureTask類。這個類實現了Delayed接口。首先:在對象創建的時候,使用time記錄前對象什么時候可以使用,代碼如下:


ScheduledFutureTask(Runnable r, V result, long ns, long period) {      super(r, result);      this.time = ns;      this.period = period;      this.sequenceNumber = sequencer.getAndIncrement();}

然后使用getDelay可以查詢當前元素還需要延時多久,代碼如下:

public long getDelay(TimeUnit unit) {      return unit.convert(time - now(), TimeUnit.NANOSECONDS);    }

通過構造函數可以看出延遲時間參數ns的單位是納秒,自己設計的時候最好使用納秒,因為getDelay時可以指定任意單位,一旦以納秒作為單位,而延時的時間又精確不到納秒就麻煩了。使用時請注意當time小于當前時間時,getDelay會返回負數。

4.如何實現延時隊列

延時隊列的實現很簡單,當消費者從隊列里獲取元素時,如果元素沒有達到延時時間,就阻塞當前線程。

long delay = first.getDelay(TimeUnit.NANOSECONDS);          if (delay <= 0)            return q.poll();          else if (leader != null)            available.await();

SynchronousQueue是一個不存儲元素的阻塞隊列。每一個put操作必須等待一個take操作,否則不能繼續添加元素。SynchronousQueue可以看成是一個傳球手,負責把生產者線程處理的數據直接傳遞給消費者線程。隊列本身并不存儲任何元素,非常適合于傳遞性場景,比如在一個線程中使用的數據,傳遞給另外一個線程使用,SynchronousQueue的吞吐量高于LinkedBlockingQueue 和 ArrayBlockingQueue。

LinkedTransferQueue是一個由鏈表結構組成的無界阻塞TransferQueue隊列。相對于其他阻塞隊列,LinkedTransferQueue多了tryTransfer和transfer方法。

transfer方法。如果當前有消費者正在等待接收元素(消費者使用take()方法或帶時間限制的poll()方法時),transfer方法可以把生產者傳入的元素立刻transfer(傳輸)給消費者。如果沒有消費者在等待接收元素,transfer方法會將元素存放在隊列的tail節點,并等到該元素被消費者消費了才返回。transfer方法的關鍵代碼如下:

Node pred = tryAppend(s, haveData);return awaitMatch(s, pred, e, (how == TIMED), nanos);

第一行代碼是試圖把存放當前元素的s節點作為tail節點。第二行代碼是讓CPU自旋等待消費者消費元素。因為自旋會消耗CPU,所以自旋一定的次數后使用Thread.yield()方法來暫停當前正在執行的線程,并執行其他線程。

tryTransfer方法。則是用來試探下生產者傳入的元素是否能直接傳給消費者。如果沒有消費者等待接收元素,則返回false。和transfer方法的區別是tryTransfer方法無論消費者是否接收,方法立即返回。而transfer方法是必須等到消費者消費了才返回。

對于帶有時間限制的tryTransfer(E e, long timeout, TimeUnit unit)方法,則是試圖把生產者傳入的元素直接傳給消費者,但是如果沒有消費者消費該元素則等待指定的時間再返回,如果超時還沒消費元素,則返回false,如果在超時時間內消費了元素,則返回true。

LinkedBlockingDeque是一個由鏈表結構組成的雙向阻塞隊列。所謂雙向隊列指的你可以從隊列的兩端插入和移出元素。雙端隊列因為多了一個操作隊列的入口,在多線程同時入隊時,也就減少了一半的競爭。相比其他的阻塞隊列,LinkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等方法,以First單詞結尾的方法,表示插入,獲取(peek)或移除雙端隊列的第一個元素。以Last單詞結尾的方法,表示插入,獲取或移除雙端隊列的最后一個元素。另外插入方法add等同于addLast,移除方法remove等效于removeFirst。但是take方法卻等同于takeFirst,不知道是不是Jdk的bug,使用時還是用帶有First和Last后綴的方法更清楚。

在初始化LinkedBlockingDeque時可以設置容量防止其過渡膨脹。另外雙向阻塞隊列可以運用在“工作竊取”模式中。

5.阻塞隊列的實現原理
本文以ArrayBlockingQueue為例,其他阻塞隊列實現原理可能和ArrayBlockingQueue有一些差別,但是大體思路應該類似,有興趣的朋友可自行查看其他阻塞隊列的實現源碼。

  首先看一下ArrayBlockingQueue類中的幾個成員變量:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = -817911632652898426L; /** The queued items */private final E[] items;/** items index for next take, poll or remove */private int takeIndex;/** items index for next put, offer, or add. */private int putIndex;/** Number of items in the queue */private int count; /** Concurrency control uses the classic two-condition algorithm* found in any textbook.*/ /** Main lock guarding all access */private final ReentrantLock lock;/** Condition for waiting takes */private final Condition notEmpty;/** Condition for waiting puts */private final Condition notFull;}

   可以看出,ArrayBlockingQueue中用來存儲元素的實際上是一個數組,takeIndex和putIndex分別表示隊首元素和隊尾元素的下標,count表示隊列中元素的個數。

  lock是一個可重入鎖,notEmpty和notFull是等待條件。

  下面看一下ArrayBlockingQueue的構造器,構造器有三個重載版本:

public ArrayBlockingQueue(int capacity) {}public ArrayBlockingQueue(int capacity, boolean fair) { }public ArrayBlockingQueue(int capacity, boolean fair,             Collection<? extends E> c) {}

   第一個構造器只有一個參數用來指定容量,第二個構造器可以指定容量和公平性,第三個構造器可以指定容量、公平性以及用另外一個集合進行初始化。

  然后看它的兩個關鍵方法的實現:put()和take():

public void put(E e) throws InterruptedException {  if (e == null) throw new NullPointerException();  final E[] items = this.items;  final ReentrantLock lock = this.lock;  lock.lockInterruptibly();  try {    try {      while (count == items.length)        notFull.await();    } catch (InterruptedException ie) {      notFull.signal(); // propagate to non-interrupted thread      throw ie;    }    insert(e);  } finally {    lock.unlock();  }}

   從put方法的實現可以看出,它先獲取了鎖,并且獲取的是可中斷鎖,然后判斷當前元素個數是否等于數組的長度,如果相等,則調用notFull.await()進行等待,如果捕獲到中斷異常,則喚醒線程并拋出異常。

  當被其他線程喚醒時,通過insert(e)方法插入元素,最后解鎖。

  我們看一下insert方法的實現:

private void insert(E x) {  items[putIndex] = x;  putIndex = inc(putIndex);  ++count;  notEmpty.signal();}

   它是一個private方法,插入成功后,通過notEmpty喚醒正在等待取元素的線程。

  下面是take()方法的實現:

public E take() throws InterruptedException {  final ReentrantLock lock = this.lock;  lock.lockInterruptibly();  try {    try {      while (count == 0)        notEmpty.await();    } catch (InterruptedException ie) {      notEmpty.signal(); // propagate to non-interrupted thread      throw ie;    }    E x = extract();    return x;  } finally {    lock.unlock();  }}


   跟put方法實現很類似,只不過put方法等待的是notFull信號,而take方法等待的是notEmpty信號。在take方法中,如果可以取元素,則通過extract方法取得元素,下面是extract方法的實現:


private E extract() {  final E[] items = this.items;  E x = items[takeIndex];  items[takeIndex] = null;  takeIndex = inc(takeIndex);  --count;  notFull.signal();  return x;}

   跟insert方法也很類似。

  其實從這里大家應該明白了阻塞隊列的實現原理,事實它和我們用Object.wait()、Object.notify()和非阻塞隊列實現生產者-消費者的思路類似,只不過它把這些工作一起集成到了阻塞隊列中實現。

發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 祁门县| 台北县| 河北省| 延边| 安泽县| 韶山市| 丰都县| 宜兴市| 小金县| 安图县| 泗洪县| 河曲县| 高台县| 邵东县| 景洪市| 汝城县| 东台市| 沂南县| 八宿县| 河源市| 商洛市| 宜君县| 旌德县| 彰武县| 永新县| 安国市| 开江县| 廉江市| 西和县| 红安县| 临高县| 广宁县| 大悟县| 崇州市| 鄂伦春自治旗| 临颍县| 瑞安市| 津南区| 保亭| 郓城县| 宜丰县|