從用法上來說,CyclicBarrier可能看出是CountDownLatch的高級版本,增加了重置的功能,對于多個線程的中斷提供了通知的功能。
具體的用法通過api就有比較詳細的介紹。
首先CyclicBarrier內部有一個內部靜態類Generation。當然在每個CyclicBarrier實例中也有一個Generation域
這個類只有一個內部域broken用來表示當前的屏障是否被打破了。
PRivate static class Generation { boolean broken = false; }Generation只在線程不中斷的情況下用來判斷CyclicBarrier的狀態的。 是由于有count個線程調用了await來正常中斷的——即所謂的開閘狀態。 還是由于其他特殊原因打破了CyclicBarrier(也就是當前CyclicBarrier無效了)——即所謂的打破狀態。
而如果需要重置也就是講CyclicBarrier實例中的域來重新構建一個新的Generation就可以了。
鎖、條件隊列、狀態變量、條件謂詞之間的關系。
最主要的就是await()方法。
實現的功能:
調用await()的線程會等待直到有足夠數量的線程調用await——也就是開閘狀態,
當最后一個線程到達或者出現下面的情況——也就是打破狀態。
有其他線程中斷當前線程。則拋出interruptException指定了限時操作,并到達線程,則拋出TimeoutException如果barrier被重置,或者屏障處于打破狀態,則拋出BrokenBarrierException什么樣的情況會出現打破狀態?當任意等待線程拋出BrokenBarrierException的時候會使得當前屏障處于打破狀態。
await方法是通過一個內部方法dowait來實現的。
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken)//如果當前Generation是處于打破狀態則傳播這個BrokenBarrierExcption throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier();//如果當前線程被中斷則使得當前generation處于打破狀態,重置剩余count。并且喚醒狀態變量。這時候其他線程會傳播BrokenBarrierException. throw new InterruptedException(); } int index = --count;//嘗試降低當前count if (index == 0) { // tripped//如果當前狀態將為0,則Generation處于開閘狀態。運行可能存在的command,設置下一個Generation。相當于每次開閘之后都進行了一次reset。 boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction)//如果運行command失敗也會導致當前屏障被打破。 breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed)//阻塞在當前的狀態變量。 trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) {//如果當前線程被中斷了則使得屏障被打破。并拋出異常。 breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt();//這種捕獲了InterruptException之后調用Thread.currentThread().interrupt()是一種通用的方式。但是之前源碼中好像都沒有體現。我第一次見這個好像是java并發實踐中。這樣做的目的是什么?其實就是為了保存中斷狀態,從而讓其他更高層次的代碼注意到這個中斷。但是需要注意的是這里需要其他代碼予以配合才行否則這樣做其實是比較危險的一種方式,因為這相當于吞了這個異常。 } } //從阻塞恢復之后,需要重新判斷當前的狀態。 if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }此外再看下兩個小過程:
這兩個小過程當然是需要鎖的,但是由于這兩個方法只是通過其他方法調用,所以依然是在持有鎖的范圍內運行的。這兩個方法都是對域進行操作。
nextGeneration實際上在屏障開閘之后重置狀態。以待下一次調用。 breakBarrier實際上是在屏障打破之后設定打破狀態,以喚醒其他線程并通知。
private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); } private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); }reset reset方法比較簡單。但是這里還是要注意一下要先打破當前屏蔽,然后再重建一個新的屏蔽。否則的話可能會導致信號丟失。
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } }新聞熱點
疑難解答