原子(atomic),本意是指“不能被進一步分割的粒子”。原子操作意味著“不可被中斷的一個或一系列操作”。在java中通過鎖和循環CAS的方式實現原子操作。
鎖機制保證了只有獲得鎖的線程才能操作鎖定的內存區域,在JDK 5之前Java語言是靠synchronized關鍵字保證同步的,synchronized可以保證方法或代碼塊在運行時,同一時刻只有一個線程可以進入到臨界區(互斥性),同時它還保證了共享變量的內存可見性。
Java中的每個對象都可以作為鎖。
普通同步方法,鎖是當前實例對象。
靜態同步方法,鎖是當前類的class對象。
同步代碼塊,鎖是括號中的對象。
我們先來看一下等待/通知機制:
import java.util.concurrent.TimeUnit;/** * Created by aires on 2017/3/4. */public class WaitNotify { static boolean flag = true; static Object lock = new Object(); public static void main(String[] args) throws InterruptedException { Thread A = new Thread(new Wait(), "wait thread"); A.start(); TimeUnit.SECONDS.sleep(2); Thread B = new Thread(new Notify(), "notify thread"); B.start(); } static class Wait implements Runnable { @Override public void run() { synchronized (lock) { while (flag) { try { System.out.PRintln(Thread.currentThread() + " flag is true"); lock.wait(); } catch (InterruptedException e) { } } System.out.println(Thread.currentThread() + " flag is false"); } } } static class Notify implements Runnable { @Override public void run() { synchronized (lock) { flag = false; lock.notifyAll(); try { TimeUnit.SECONDS.sleep(7); } catch (InterruptedException e) { e.printStackTrace(); } } } }}等待/通知機制相關方法在java.lang.Object上定義,線程A在獲取鎖后調用了對象lock的wait方法進入了等待狀態,線程B調用對象lock的notifyAll()方法,線程A收到通知后從wait方法處返回繼續執行,線程B對共享變量flag的修改對線程A來說是可見的。
整個運行過程需要注意一下幾點:
使用wait()、notify()和notifyAll()時需要先對調用對象加鎖,調用wait()方法后會釋放鎖。
調用wait()方法之后,線程狀態由RUNNING變為WAITING,并將當前線程放置到對象的等待隊列中。
notify()或notifyAll()方法調用后,等待線程不會立刻從wait()中返回,需要等該線程釋放鎖之后,才有機會獲取鎖,之后從wait()返回。
notify()方法將等待隊列中的一個等待線程從等待隊列中移動到同步隊列中;
notifyAll()方法則是把等待隊列中的所有線程都移動到同步隊列中,被移動的線程狀態從WAITING變為BLOCKED。
從wait()方法返回的前提是,該線程獲得了調用對象的鎖。
那么,它是如何實現線程之間的互斥性和可見性?
互斥性 我們通過一段代碼解釋:
/** * Created by aires on 2017/3/4. */public class SynchronizedDemo { private static Object object = new Object(); public static void main(String[] args) throws Exception{ synchronized(object) { //同步代碼塊 } } public static synchronized void m() {} //同步方法}上這段代碼中,使用了同步代碼塊和同步方法,
同步代碼塊使用了 monitorenter 和 monitorexit 指令實現。
同步方法中依靠方法修飾符上的 ACC_SYNCHRONIZED 實現。
無論哪種實現,本質上都是對指定對象相關聯的monitor的獲取,這個過程是互斥性的,也就是說同一時刻只有一個線程能夠成功,其它失敗的線程會被阻塞,并放入到同步隊列中,進入BLOCKED狀態。
鎖的內部機制
通常情況下鎖有4種狀態:無鎖狀態,偏向鎖狀態,輕量級鎖狀態,重量級鎖狀態。 在進一步了解鎖之前,我們需要了解兩個概念:對象頭和monitor。
什么是對象頭?
在hotspot虛擬機中,對象在內存的分布分為3個部分:對象頭,實例數據,和對齊填充。 mark Word 被分成兩部分,lock word和標志位。 Klass ptr指向Class字節碼在虛擬機內部的對象表示的地址。 Fields表示連續的對象實例字段。

mark word 被設計為非固定的數據結構,以便在極小的空間內存儲更多的信息。比如:在32位的hotspot虛擬機中:如果對象處于未被鎖定的情況下。mark word 的32bit空間中有25bit存儲對象的哈希碼、4bit存儲對象的分代年齡、2bit存儲鎖的標記位、1bit固定為0。而在其他的狀態下(輕量級鎖、重量級鎖、GC標記、可偏向)下對象的存儲結構為:

monitor
monitor是線程私有的數據結構,每一個線程都有一個可用monitor列表,同時還有一個全局的可用列表,先來看monitor的內部: 
Owner:初始時為NULL表示當前沒有任何線程擁有該monitor,當線程成功擁有該鎖后保存線程唯一標識,當鎖被釋放時又設置為NULL;
EntryQ:關聯一個系統互斥鎖(semaphore),阻塞所有試圖鎖住monitor而失敗的線程。
RcThis:表示blocked或waiting在該monitor上的所有線程的個數。
Nest:用來實現重入鎖的計數。
HashCode:保存從對象頭拷貝過來的HashCode值(可能還包含GC age)。
Candidate:用來避免不必要的阻塞或等待線程喚醒,因為每一次只有一個線程能夠成功擁有鎖,如果每次前一個釋放鎖的線程喚醒所有正在阻塞或等待的線程,會引起不必要的上下文切換(從阻塞到就緒然后因為競爭鎖失敗又被阻塞)從而導致性能嚴重下降。Candidate只有兩種可能的值:0表示沒有需要喚醒的線程,1表示要喚醒一個繼任線程來競爭鎖。
那么Monitor和對象頭又是如何工作的呢?
在 java 虛擬機中,線程一旦進入到被synchronized修飾的方法或代碼塊時,指定的鎖對象通過某些操作將對象頭中的LockWord指向monitor 的起始地址與之關聯,同時monitor 中的Owner存放擁有該鎖的線程的唯一標識,確保一次只能有一個線程執行該部分的代碼,線程在獲取鎖之前不允許執行該部分的代碼。
接下去,我們可以深入了解下在鎖各個狀態下,底層是如何處理多線程之間對鎖的競爭。
偏向鎖
下述代碼中,當線程訪問同步方法method1時,會在對象頭(SynchronizedDemo.class對象的對象頭)和棧幀的鎖記錄中存儲鎖偏向的線程ID,下次該線程在進入method2,只需要判斷對象頭存儲的線程ID是否為當前線程,而不需要進行CAS操作進行加鎖和解鎖(因為CAS原子指令雖然相對于重量級鎖來說開銷比較小但還是存在非??捎^的本地延遲)。
public class SynchronizedDemo { private static Object lock = new Object(); public static void main(String[] args) { method1(); method2(); } synchronized static void method1() {} synchronized static void method2() {}}輕量級鎖
利用了CPU原語Compare-And-Swap(CAS,匯編指令CMPXCHG)。線程可以通過兩種方式鎖住一個對象:
通過膨脹一個處于無鎖狀態(狀態位001)的對象獲得該對象的鎖;
對象處于膨脹狀態(狀態位00),但LockWord指向的monitor的Owner字段為NULL,則可以直接通過CAS原子指令嘗試將Owner設置為自己的標識來獲得鎖。
獲取鎖(monitorenter)的大概過程:
對象處于無鎖狀態時(LockWord的值為hashCode等,狀態位為001),線程首先從monitor列表中取得一個空閑的monitor,初始化Nest和Owner值為1和線程標識,一旦monitor準備好,通過CAS替換monitor起始地址到LockWord進行膨脹。如果存在其它線程競爭鎖的情況而導致CAS失敗,則回到monitorenter重新開始獲取鎖的過程即可。
對象已經膨脹,monitor中的Owner指向當前線程,這是重入鎖的情況(reentrant),將Nest加1,不需要CAS操作,效率高。
對象已經膨脹,monitor中的Owner為NULL,此時多個線程通過CAS指令試圖將Owner設置為自己的標識獲得鎖,競爭失敗的線程則進入第4種情況。
對象已經膨脹,同時Owner指向別的線程,在調用操作系統的重量級的互斥鎖之前自旋一定的次數,當達到一定的次數如果仍然沒有獲得鎖,則開始準備進入阻塞狀態,將rfThis值原子加1,由于在加1的過程中可能被其它線程破壞對象和monitor之間的聯系,所以在加1后需要再進行一次比較確保lock word的值沒有被改變,當發現被改變后則要重新進行monitorenter過程。同時再一次觀察Owner是否為NULL,如果是則調用CAS參與競爭鎖,鎖競爭失敗則進入到阻塞狀態。
釋放鎖(monitorexit)的大概過程:
檢查該對象是否處于膨脹狀態并且該線程是這個鎖的擁有者,如果發現不對則拋出異常。
檢查Nest字段是否大于1,如果大于1則簡單的將Nest減1并繼續擁有鎖,如果等于1,則進入到步驟3。
檢查rfThis是否大于0,設置Owner為NULL然后喚醒一個正在阻塞或等待的線程再一次試圖獲取鎖,如果等于0則進入到步驟4。
縮?。╠eflate)一個對象,通過將對象的LockWord置換回原來的HashCode等值來解除和monitor之間的關聯來釋放鎖,同時將monitor放回到線程私有的可用monitor列表。
public class SynchronizedDemo implements Runnable { private static Object lock = new Object(); public static void main(String[] args) { Thread A = new Thread(new SynchronizedDemo(), "A"); A.start(); Thread B = new Thread(new SynchronizedDemo(), "B"); B.start(); } synchronized static void method1() { } synchronized static void method2() { } @Override public void run() { method1(); method2(); }}重量級鎖
當鎖處于這個狀態下,其他線程試圖獲取鎖都會被阻塞住,當持有鎖的線程釋放鎖之后會喚醒這些線程。
內存可見性
線程釋放鎖時,JMM會把該線程對應的本地內存中的共享變量刷新到主內存中。
線程獲取鎖時,JMM會把該線程對應的本地內存置為無效,從而使得被監視器保護的臨界區代碼必須從主內存中讀取共享變量。
鎖機制存在以下問題:
(1)在多線程競爭下,加鎖、釋放鎖會導致比較多的上下文切換和調度延時,引起性能問題。
(2)一個線程持有鎖會導致其它所有需要此鎖的線程掛起。
(3)如果一個優先級高的線程等待一個優先級低的線程釋放鎖會導致優先級倒置,引起性能風險。
volatile是不錯的機制,但是volatile不能保證原子性。因此對于同步最終還是要回到鎖機制上來。
獨占鎖是一種悲觀鎖,synchronized就是一種獨占鎖,會導致其它所有需要鎖的線程掛起,等待持有鎖的線程釋放鎖。而另一個更加有效的鎖就是樂觀鎖。所謂樂觀鎖就是,每次不加鎖而是假設沒有沖突而去完成某項操作,如果因為沖突失敗就重試,直到成功為止。樂觀鎖用到的機制就是CAS,Compare and Swap。
什么是CAS
CAS,Compare and Swap。
在java語言之前,并發就已經廣泛存在并在服務器領域得到了大量的應用。所以硬件廠商在很早之前就在芯片中加入了大量支持并發操作的原語,從而在硬件層面提升效率。比如:intel在其CPU中,使用cmpxchg指令。
在Java發展初期,java語言是不能夠利用硬件提供的這些便利來提升系統的性能的。但隨著java不斷的發展,Java本地方法(JNI)的出現,使得java程序能夠越過JVM直接調用本地方法。CAS也成為java.util.concurrent的基石,CAS實現了區別于synchronouse同步鎖的一種樂觀鎖。
CAS 操作包含三個操作數 —— 內存位置(V)、預期原值(A)和新值(B)。 如果內存位置的值與預期原值相匹配,那么處理器會自動將該位置值更新為新值 。否則,處理器不做任何操作。無論哪種情況,它都會在 CAS 指令之前返回該 位置的值。(在 CAS 的一些特殊情況下將僅返回 CAS 是否成功,而不提取當前值。)CAS 有效地說明了“我認為位置 V 應該包含值 A;如果包含該值,則將 B 放到這個位置;否則,不要更改該位置,只告訴我這個位置現在的值即可。”
通常將 CAS 用于同步的方式是從地址 V 讀取值 A,執行多步計算來獲得新值 B,然后使用 CAS 將 V 的值從 A 改為 B。如果 V 處的值尚未同時更改,則 CAS 操作成功。
類似于 CAS 的指令允許算法執行讀-修改-寫操作,而無需害怕其他線程同時 修改變量,因為如果其他線程修改變量,那么 CAS 會檢測它(并失?。?,算法 可以對該操作重新計算。
CAS的目的
利用CPU的CAS指令,同時借助JNI來完成Java的非阻塞算法。其它原子操作都是利用類似的特性完成的。而整個java.util.concurrent都是建立在CAS之上的,因此對于synchronized阻塞算法,java.util.concurrent在性能上有了很大的提升。
下面我們通過java.util.concurrent中的AtomicInteger來理解基于CAS實現的非阻塞算法 :
import java.io.Serializable;import java.util.function.IntBinaryOperator;import java.util.function.IntUnaryOperator;import sun.misc.Unsafe;public class AtomicInteger extends Number implements Serializable { private static final long serialVersionUID = 6214790243416807050L; private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; private volatile int value; public AtomicInteger(int var1) { this.value = var1; } public AtomicInteger() { } public final int get() { return this.value; } public final void set(int var1) { this.value = var1; } public final void lazySet(int var1) { unsafe.putOrderedInt(this, valueOffset, var1); } public final int getAndSet(int var1) { return unsafe.getAndSetInt(this, valueOffset, var1); } public final boolean compareAndSet(int var1, int var2) { return unsafe.compareAndSwapInt(this, valueOffset, var1, var2); } public final boolean weakCompareAndSet(int var1, int var2) { return unsafe.compareAndSwapInt(this, valueOffset, var1, var2); } public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); } public final int getAndDecrement() { return unsafe.getAndAddInt(this, valueOffset, -1); } public final int getAndAdd(int var1) { return unsafe.getAndAddInt(this, valueOffset, var1); } public final int incrementAndGet() { return unsafe.getAndAddInt(this, valueOffset, 1) + 1; } public final int decrementAndGet() { return unsafe.getAndAddInt(this, valueOffset, -1) - 1; } public final int addAndGet(int var1) { return unsafe.getAndAddInt(this, valueOffset, var1) + var1; } public final int getAndUpdate(IntUnaryOperator var1) { int var2; int var3; do { var2 = this.get(); var3 = var1.applyAsInt(var2); } while(!this.compareAndSet(var2, var3)); return var2; } public final int updateAndGet(IntUnaryOperator var1) { int var2; int var3; do { var2 = this.get(); var3 = var1.applyAsInt(var2); } while(!this.compareAndSet(var2, var3)); return var3; } public final int getAndAccumulate(int var1, IntBinaryOperator var2) { int var3; int var4; do { var3 = this.get(); var4 = var2.applyAsInt(var3, var1); } while(!this.compareAndSet(var3, var4)); return var3; } public final int accumulateAndGet(int var1, IntBinaryOperator var2) { int var3; int var4; do { var3 = this.get(); var4 = var2.applyAsInt(var3, var1); } while(!this.compareAndSet(var3, var4)); return var4; } public String toString() { return Integer.toString(this.get()); } public int intValue() { return this.get(); } public long longValue() { return (long)this.get(); } public float floatValue() { return (float)this.get(); } public double doubleValue() { return (double)this.get(); } static { try { valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value")); } catch (Exception var1) { throw new Error(var1); } }}一個線程的失敗或者掛起不應該影響其他線程的失敗或掛起的算法。
現代的CPU提供了特殊的指令,可以自動更新共享數據,而且能夠檢測到其他線程的干擾,而 compareAndSet() 就用這些代替了鎖定。
private volatile int value;首先毫無以為,在沒有鎖的機制下可能需要借助volatile原語,保證線程間的數據是可見的(共享的)。
這樣才獲取變量的值的時候才能直接讀取。
public final int get() { return value; }然后來看看++i是怎么做到的。
public final int incrementAndGet() { for (;;) { int current = get(); int next = current + 1; if (compareAndSet(current, next)) return next; } }在這里采用了CAS操作,每次從內存中讀取數據然后將此數據和+1后的結果進行CAS操作,如果成功就返回結果,否則重試直到成功為止。
而compareAndSet利用JNI來完成CPU指令的操作。
public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); }整體的過程就是這樣子的,利用CPU的CAS指令,同時借助JNI來完成Java的非阻塞算法。其它原子操作都是利用類似的特性完成的。
CAS的原理
CAS通過調用JNI的代碼實現的。JNI:Java Native Interface為JAVA本地調用,允許java調用其他語言。而compareAndSwapInt就是借助C來調用CPU底層指令實現的。
下面從分析比較常用的CPU(intel x86)來解釋CAS的實現原理。
下面是sun.misc.Unsafe類的compareAndSwapInt()方法的源代碼:
public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);可以看到這是個本地方法調用。這個本地方法在openjdk中依次調用的c++代碼為:unsafe.cpp,atomic.cpp和atomicwindowsx86.inline.hpp。這個本地方法的最終實現在openjdk的如下位置:/openjdk/hotspot/src/oscpu/windowsx86/vm/ atomicwindowsx86.inline.hpp(對應于windows操作系統,X86處理器)。下面是對應于intel x86處理器的源代碼的片段:
// Adding a lock prefix to an instruction on MP machine// VC++ doesn't like the lock prefix to be on a single line// so we can't insert a label after the lock prefix.// By emitting a lock prefix, we can define a label after it.#define LOCK_IF_MP(mp) __asm cmp mp, 0 / __asm je L0 / __asm _emit 0xF0 / __asm L0:inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) { // alternative for InterlockedCompareExchange int mp = os::is_MP(); __asm { mov edx, dest mov ecx, exchange_value mov eax, compare_value LOCK_IF_MP(mp) cmpxchg dword ptr [edx], ecx }}如上面源代碼所示,程序會根據當前處理器的類型來決定是否為cmpxchg指令添加lock前綴。如果程序是在多處理器上運行,就為cmpxchg指令加上lock前綴(lock cmpxchg)。反之,如果程序是在單處理器上運行,就省略lock前綴(單處理器自身會維護單處理器內的順序一致性,不需要lock前綴提供的內存屏障效果)。
三、CAS存在的問題
CAS雖然很高效的解決原子操作,但是CAS仍然存在三大問題。ABA問題,循環時間長開銷大和只能保證一個共享變量的原子操作
ABA問題。因為CAS需要在操作值的時候檢查下值有沒有發生變化,如果沒有發生變化則更新,但是如果一個值原來是A,變成了B,又變成了A,那么使用CAS進行檢查時會發現它的值沒有發生變化,但是實際上卻變化了。ABA問題的解決思路就是使用版本號。在變量前面追加上版本號,每次變量更新的時候把版本號加一,那么A-B-A 就會變成1A-2B-3A。
從Java1.5開始JDK的atomic包里提供了一個類AtomicStampedReference來解決ABA問題。這個類的compareAndSet方法作用是首先檢查當前引用是否等于預期引用,并且當前標志是否等于預期標志,如果全部相等,則以原子方式將該引用和該標志的值設置為給定的更新值。
循環時間長開銷大。自旋CAS如果長時間不成功,會給CPU帶來非常大的執行開銷。如果JVM能支持處理器提供的pause指令那么效率會有一定的提升,pause指令有兩個作用,第一它可以延遲流水線執行指令(de-pipeline),使CPU不會消耗過多的執行資源,延遲的時間取決于具體實現的版本,在一些處理器上延遲時間是零。第二它可以避免在退出循環的時候因內存順序沖突(memory order violation)而引起CPU流水線被清空(CPU pipeline flush),從而提高CPU的執行效率。
只能保證一個共享變量的原子操作。當對一個共享變量執行操作時,我們可以使用循環CAS的方式來保證原子操作,但是對多個共享變量操作時,循環CAS就無法保證操作的原子性,這個時候就可以用鎖,或者有一個取巧的辦法,就是把多個共享變量合并成一個共享變量來操作。比如有兩個共享變量i=2,j=a,合并一下ij=2a,然后用CAS來操作ij。從Java1.5開始JDK提供了AtomicReference類來保證引用對象之間的原子性,你可以把多個變量放在一個對象里來進行CAS操作。
concurrent包的實現
由于java的CAS同時具有 volatile 讀和volatile寫的內存語義,因此Java線程之間的通信現在有了下面四種方式:
A線程寫volatile變量,隨后B線程讀這個volatile變量.A線程寫volatile變量,隨后B線程用CAS更新這個volatile變量。A線程用CAS更新一個volatile變量,隨后B線程用CAS更新這個volatile變量.A線程用CAS更新一個volatile變量,隨后B線程讀這個volatile變量。Java的CAS會使用現代處理器上提供的高效機器級別原子指令,這些原子指令以原子方式對內存執行讀-改-寫操作,這是在多處理器中實現同步的關鍵(從本質上來說,能夠支持原子性讀-改-寫指令的計算機器,是順序計算圖靈機的異步等價機器,因此任何現代的多處理器都會去支持某種能對內存執行原子性讀-改-寫操作的原子指令)。同時,volatile變量的讀/寫和CAS可以實現線程之間的通信。把這些特性整合在一起,就形成了整個concurrent包得以實現的基石。如果我們仔細分析concurrent包的源代碼實現,會發現一個通用化的實現模式:
首先,聲明共享變量為volatile;然后,使用CAS的原子條件更新來實現線程之間的同步;同時,配合以volatile的讀/寫和CAS所具有的volatile讀和寫的內存語義來實現線程之間的通信。AQS,非阻塞數據結構和原子變量類(java.util.concurrent.atomic包中的類),這些concurrent包中的基礎類都是使用這種模式來實現的,而concurrent包中的高層類又是依賴于這些基礎類來實現的。從整體來看,concurrent包的實現示意圖如下:

新聞熱點
疑難解答