學(xué)習(xí)java的同學(xué)注意了!!! 學(xué)習(xí)過程中遇到什么問題或者想獲取學(xué)習(xí)資源的話,歡迎加入Java學(xué)習(xí)交流群,群號碼:183993990 我們一起學(xué)Java!
計算機(jī)用戶想當(dāng)然地認(rèn)為他們的系統(tǒng)在一個時間可以做多件事。他們認(rèn)為,他們可以工作在一個字處理器,而其他應(yīng)用程序在下載文件,管理打印隊列和音頻流。即使是單一的應(yīng)用程序通常也是被期望在一個時間來做多件事。例如,音頻流應(yīng)用程序必須同時讀取數(shù)字音頻,解壓,管理播放,并更新顯示。即使字處理器應(yīng)該隨時準(zhǔn)備響應(yīng)鍵盤和鼠標(biāo)事件,不管多么繁忙,它總是能格式化文本或更新顯示。可以做這樣的事情的軟件稱為并發(fā)軟件(concurrent software)。
在 Java 平臺是完全支持并發(fā)編程。自從 5.0 版本以來,這個平臺還包括高級并發(fā) API, 主要集中在 java.util.concurrent 包。
進(jìn)程(PRocesses )和線程(Threads)
進(jìn)程和線程是并發(fā)編程的兩個基本的執(zhí)行單元。在 Java 中,并發(fā)編程主要涉及線程。
一個計算機(jī)系統(tǒng)通常有許多活動的進(jìn)程和線程。在給定的時間內(nèi),每個處理器只能有一個線程得到真正的運行。對于單核處理器來說,處理時間是通過時間切片來在進(jìn)程和線程之間進(jìn)行共享的。
現(xiàn)在多核處理器或多進(jìn)程的電腦系統(tǒng)越來越流行。這大大增強(qiáng)了系統(tǒng)的進(jìn)程和線程的并發(fā)執(zhí)行能力。但即便是沒有多處理器或多進(jìn)程的系統(tǒng)中,并發(fā)仍然是可能的。
進(jìn)程
進(jìn)程有一個獨立的執(zhí)行環(huán)境。進(jìn)程通常有一個完整的、私人的基本運行時資源;特別是,每個進(jìn)程都有其自己的內(nèi)存空間。
進(jìn)程往往被視為等同于程序或應(yīng)用程序。然而,用戶將看到一個單獨的應(yīng)用程序可能實際上是一組合作的進(jìn)程。大多數(shù)操作系統(tǒng)都支持進(jìn)程間通信( Inter Process Communication,簡稱 ipC)資源,如管道和套接字。IPC 不僅用于同個系統(tǒng)的進(jìn)程之間的通信,也可以用在不同系統(tǒng)的進(jìn)程。
大多數(shù) Java 虛擬機(jī)的實現(xiàn)作為一個進(jìn)程運行。Java 應(yīng)用程序可以使用 ProcessBuilder 對象創(chuàng)建額外的進(jìn)程。多進(jìn)程應(yīng)用程序超出了本書的講解范圍。
線程
線程有時被稱為輕量級進(jìn)程。進(jìn)程和線程都提供一個執(zhí)行環(huán)境,但創(chuàng)建一個新的線程比創(chuàng)建一個新的進(jìn)程需要更少的資源。
線程中存在于進(jìn)程中,每個進(jìn)程都至少一個線程。線程共享進(jìn)程的資源,包括內(nèi)存和打開的文件。這使得工作變得高效,但也存在了一個潛在的問題——通信。
多線程執(zhí)行是 Java 平臺的一個重要特點。每個應(yīng)用程序都至少有一個線程,或者幾個,如果算上“系統(tǒng)”的線程(負(fù)責(zé)內(nèi)存管理和信號處理)那就更多。但從程序員的角度來看,你啟動只有一個線程,稱為主線程。這個線程有能力創(chuàng)建額外的線程。
線程對象
每個線程都與 Thread 類的一個實例相關(guān)聯(lián)。有兩種使用線程對象來創(chuàng)建并發(fā)應(yīng)用程序的基本策略:
為了直接控制線程的創(chuàng)建和管理,簡單地初始化線程,應(yīng)用程序每次需要啟動一個異步任務(wù)。通過傳遞給應(yīng)用程序任務(wù)給一個 Executor,從而從應(yīng)用程序的其他部分抽象出線程管理。定義和啟動一個線程
有兩種方式穿件 Thread 的實例:
提供 Runnable 對象。Runnable 接口定義了一個方法 run ,用來包含線程要執(zhí)行的代碼。如 HelloRunnable 所示:public class HelloRunnable implements Runnable { /* (non-Javadoc) * @see java.lang.Runnable#run() */ @Override public void run() { System.out.println("Hello from a thread!"); } /** * @param args */ public static void main(String[] args) { (new Thread(new HelloRunnable())).start(); }}繼承 Thread。Thread 類本身是實現(xiàn) Runnable,雖然它的 run 方法啥都沒干。HelloThread 示例如下:public class HelloThread extends Thread { public void run() { System.out.println("Hello from a thread!"); } /** * @param args */ public static void main(String[] args) { (new HelloThread()).start(); }}請注意,這兩個例子調(diào)用 start 來啟動線程。
第一種方式,它使用 Runnable 對象,在實際應(yīng)用中更普遍,因為 Runnable 對象可以繼承 Thread 以外的類。第二種方式,在簡單的應(yīng)用程序更容易使用,但受限于你的任務(wù)類必須是一個 Thread 的后代。本書推薦使用第一種方法,將 Runnable 任務(wù)從 Thread 對象分離來執(zhí)行任務(wù)。這不僅更靈活,而且它適用于高級線程管理 API。
Thread 類定義了大量的方法用于線程管理。
Sleep 來暫停執(zhí)行
Thread.sleep 可以當(dāng)前線程執(zhí)行暫停一個時間段,這樣處理器時間就可以給其他線程使用。
sleep 有兩種重載形式:一個是指定睡眠時間到毫秒,另外一個是指定的睡眠時間為納秒級。然而,這些睡眠時間不能保證是精確的,因為它們是通過由基礎(chǔ) OS 提供的,并受其限制。此外,睡眠周期也可以通過中斷終止,我們將在后面的章節(jié)中看到。在任何情況下,你不能假設(shè)調(diào)用 sleep 會掛起線程用于指定精確的時間段。
SleepMessages 示例使用 sleep 每隔4秒打印一次消息:
public class SleepMessages { /** * @param args */ public static void main(String[] args) throws InterruptedException { String importantInfo[] = { "Mares eat oats", "Does eat oats", "Little lambs eat ivy", "A kid will eat ivy too" }; for (int i = 0; i < importantInfo.length; i++) { // Pause for 4 seconds Thread.sleep(4000); // Print a message System.out.println(importantInfo[i]); } }}請注意 main 聲明拋出 InterruptedException。當(dāng) sleep 是激活的時候,若有另一個線程中斷當(dāng)前線程時,則 sleep 拋出異常。由于該應(yīng)用程序還沒有定義的另一個線程來引起的中斷,所以考慮捕捉 InterruptedException。
中斷(interrupt)
中斷是表明一個線程,它應(yīng)該停止它正在做和將要做事的時。線程通過在 Thread 對象調(diào)用 interrupt 來實現(xiàn)線程的中斷。為了中斷機(jī)制能正常工作,被中斷的線程必須支持自己的中斷。
支持中斷
如何實現(xiàn)線程支持自己的中斷?這要看是什么它目前正在做。如果線程頻繁調(diào)用拋出InterruptedException 的方法,它只要在 run 方法捕獲了異常之后返回即可。例如 :
for (int i = 0; i < importantInfo.length; i++) { // Pause for 4 seconds try { Thread.sleep(4000); } catch (InterruptedException e) { // We've been interrupted: no more messages. return; } // Print a message System.out.println(importantInfo[i]);}很多方法都會拋出 InterruptedException,如 sleep,被設(shè)計成在收到中斷時立即取消他們當(dāng)前的操作并返回。
若線程長時間沒有調(diào)用方法拋出 InterruptedException 的話,那么它必須定期調(diào)用 Thread.interrupted ,在接收到中斷后返回 true。
for (int i = 0; i < inputs.length; i++) { heavyCrunch(inputs[i]); if (Thread.interrupted()) { // We've been interrupted: no more crunching. return; }}在這個簡單的例子中,代碼簡單地測試該中斷,如果已接收到中斷線程就退出。在更復(fù)雜的應(yīng)用程序,它可能會更有意義拋出一個 InterruptedException:
if (Thread.interrupted()) { throw new InterruptedException();}中斷狀態(tài)標(biāo)志
中斷機(jī)制是使用被稱為中斷狀態(tài)的內(nèi)部標(biāo)志實現(xiàn)的。調(diào)用 Thread.interrupt 可以設(shè)置該標(biāo)志。當(dāng)一個線程通過調(diào)用靜態(tài)方法 Thread.interrupted 檢查中斷,中斷狀態(tài)被清除。非靜態(tài) isInterrupted 方法,它是用于線程來查詢另一個線程的中斷狀態(tài),不會改變中斷狀態(tài)標(biāo)志。
按照慣例,任何方法因拋出一個 InterruptedException 退出都會清除中斷狀態(tài)。當(dāng)然,它可能因為另一個線程調(diào)用 interrupt 而讓那個中斷狀態(tài)立即被重新設(shè)置。
join 方法
join 方法允許一個線程等待另一個完成。假設(shè) t 是一個 Thread 對象,
t.join();它會導(dǎo)致當(dāng)前線程暫停執(zhí)行直到 t 線程終止。join 允許程序員指定一個等待周期。與 sleep 一樣,等待時間是依賴于操作系統(tǒng)的時間,不能假設(shè) join 等待時間是精確的。
像 sleep 一樣,join 響應(yīng)中斷并通過 InterruptedException 退出。
SimpleThreads 示例
SimpleThreads 示例,有兩個線程,第一個線程是每個 Java 應(yīng)用程序都有主線程。主線程創(chuàng)建的 Runnable 對象 MessageLoop,并等待它完成。如果 MessageLoop 需要很長時間才能完成,主線程就中斷它。
該 MessageLoop 線程打印出一系列消息。如果中斷之前就已經(jīng)打印了所有消息,則 MessageLoop 線程打印一條消息并退出。
public class SimpleThreads { // Display a message, preceded by // the name of the current thread static void threadMessage(String message) { String threadName = Thread.currentThread().getName(); System.out.format("%s: %s%n", threadName, message); } private static class MessageLoop implements Runnable { public void run() { String importantInfo[] = { "Mares eat oats", "Does eat oats", "Little lambs eat ivy", "A kid will eat ivy too" }; try { for (int i = 0; i < importantInfo.length; i++) { // Pause for 4 seconds Thread.sleep(4000); // Print a message threadMessage(importantInfo[i]); } } catch (InterruptedException e) { threadMessage("I wasn't done!"); } } } public static void main(String args[]) throws InterruptedException { // Delay, in milliseconds before // we interrupt MessageLoop // thread (default one hour). long patience = 1000 * 60 * 60; // If command line argument // present, gives patience // in seconds. if (args.length > 0) { try { patience = Long.parseLong(args[0]) * 1000; } catch (NumberFormatException e) { System.err.println("Argument must be an integer."); System.exit(1); } } threadMessage("Starting MessageLoop thread"); long startTime = System.currentTimeMillis(); Thread t = new Thread(new MessageLoop()); t.start(); threadMessage("Waiting for MessageLoop thread to finish"); // loop until MessageLoop // thread exits while (t.isAlive()) { threadMessage("Still waiting..."); // Wait maximum of 1 second // for MessageLoop thread // to finish. t.join(1000); if (((System.currentTimeMillis() - startTime) > patience) && t.isAlive()) { threadMessage("Tired of waiting!"); t.interrupt(); // Shouldn't be long now // -- wait indefinitely t.join(); } } threadMessage("Finally!"); }}同步(Synchronization)
線程間的通信主要是通過共享訪問字段以及其字段所引用的對象來實現(xiàn)的。這種形式的通信是非常有效的,但可能導(dǎo)致2種可能的錯誤:線程干擾(thread interference)和內(nèi)存一致性錯誤(memory consistency errors)。同步就是要需要避免這些錯誤的工具。
但是,同步可以引入線程競爭(thread contention),當(dāng)兩個或多個線程試圖同時訪問相同的資源時,并導(dǎo)致了 Java 運行時執(zhí)行一個或多個線程更慢,或甚至?xí)和K麄兊膱?zhí)行。饑餓(Starvation)和活鎖 (livelock) 是線程競爭的表現(xiàn)形式。
線程干擾
描述當(dāng)多個線程訪問共享數(shù)據(jù)時是錯誤如何出現(xiàn)。
考慮下面的一個簡單的類 Counter:
public class Counter { private int c = 0; public void increment() { c++; } public void decrement() { c--; } public int value() { return c; }}其中的 increment 方法用來對 c 加1;decrement 方法用來對 c 減 1。然而,有多個線程中都存在對某個 Counter 對象的引用,那么線程間的干擾就可能導(dǎo)致出現(xiàn)我們不想要的結(jié)果。
線程間的干擾出現(xiàn)在多個線程對同一個數(shù)據(jù)進(jìn)行多個操作的時候,也就是出現(xiàn)了“交錯”。這就意味著操作是由多個步驟構(gòu)成的,而此時,在這多個步驟的執(zhí)行上出現(xiàn)了疊加。
Counter類對象的操作貌似不可能出現(xiàn)這種“交錯(interleave)”,因為其中的兩個關(guān)于c 的操作都很簡單,只有一條語句。然而,即使是一條語句也是會被虛擬機(jī)翻譯成多個步驟的。在這里,我們不深究虛擬機(jī)具體上上面的操作翻譯成了什么樣的步驟。只需要知道即使簡單的 c++ 這樣的表達(dá)式也是會被翻譯成三個步驟的:
獲取 c 的當(dāng)前值。對其當(dāng)前值加 1。將增加后的值存儲到 c 中。表達(dá)式 c– 也是會被按照同樣的方式進(jìn)行翻譯,只不過第二步變成了減1,而不是加1。
假定線程 A 中調(diào)用 increment 方法,線程 B 中調(diào)用 decrement 方法,而調(diào)用時間基本上相同。如果 c 的初始值為 0,那么這兩個操作的“交錯”順序可能如下:
線程A:獲取 c 的值。線程B:獲取 c 的值。線程A:對獲取到的值加1;其結(jié)果是1。線程B:對獲取到的值減1;其結(jié)果是-1。線程A:將結(jié)果存儲到 c 中;此時c的值是1。線程B:將結(jié)果存儲到 c 中;此時c的值是-1。這樣線程 A 計算的值就丟失了,也就是被線程 B 的值覆蓋了。上面的這種“交錯”只是其中的一種可能性。在不同的系統(tǒng)環(huán)境中,有可能是 B 線程的結(jié)果丟失了,或者是根本就不會出現(xiàn)錯誤。由于這種“交錯”是不可預(yù)測的,線程間相互干擾造成的 bug 是很難定位和修改的。
內(nèi)存一致性錯誤
介紹了通過共享內(nèi)存出現(xiàn)的不一致的錯誤。
內(nèi)存一致性錯誤(Memory consistency errors)發(fā)生在不同線程對同一數(shù)據(jù)產(chǎn)生不同的“看法”。導(dǎo)致內(nèi)存一致性錯誤的原因很復(fù)雜,超出了本書的描述范圍。慶幸的是,程序員并不需要知道出現(xiàn)這些原因的細(xì)節(jié)。我們需要的是一種可以避免這種錯誤的方法。
避免出現(xiàn)內(nèi)存一致性錯誤的關(guān)鍵在于理解 happens-before 關(guān)系。這種關(guān)系是一種簡單的方法,能夠確保一條語句對內(nèi)存的寫操作對于其它特定的語句都是可見的。為了理解這點,我們可以考慮如下的示例。假定定義了一個簡單的 int 類型的字段并對其進(jìn)行了初始化:
int counter = 0;該字段由兩個線程共享:A 和 B。假定線程 A 對 counter 進(jìn)行了自增操作:
counter++;然后,線程 B 打印 counter 的值:
System.out.println(counter);如果以上兩條語句是在同一個線程中執(zhí)行的,那么輸出的結(jié)果自然是1。但是如果這兩條語句是在兩個不同的線程中,那么輸出的結(jié)構(gòu)有可能是0。這是因為沒有保證線程 A 對 counter 的修改對線程 B 來說是可見的。除非程序員在這兩條語句間建立了一定的 happens-before 關(guān)系。
我們可以采取多種方式建立這種 happens-before 關(guān)系。使用同步就是其中之一,這點我們將會在下面的小節(jié)中看到。
到目前為止,我們已經(jīng)看到了兩種建立這種 happens-before 的方式:
當(dāng)一條語句中調(diào)用了 Thread.start 方法,那么每一條和該語句已經(jīng)建立了 happens-before 的語句都和新線程中的每一條語句有著這種 happens-before。引入并創(chuàng)建這個新線程的代碼產(chǎn)生的結(jié)果對該新線程來說都是可見的。當(dāng)一個線程終止了并導(dǎo)致另外的線程中調(diào)用 Thread.join 的語句返回,那么此時這個終止了的線程中執(zhí)行了的所有語句都與隨后的 join 語句隨后的所有語句建立了這種 happens-before 。也就是說終止了的線程中的代碼效果對調(diào)用 join 方法的線程來說是可見。關(guān)于哪些操作可以建立這種 happens-before,更多的信息請參閱“java.util.concurrent 包的概要說明”。
同步方法
描述了一個簡單的做法,可以有效防止線程干擾和內(nèi)存一致性錯誤。
Java 編程語言中提供了兩種基本的同步用語:同步方法(synchronized methods)和同步語句(synchronized statements)。同步語句相對而言更為復(fù)雜一些,我們將在下一小節(jié)中進(jìn)行描述。本節(jié)重點討論同步方法。
我們只需要在聲明方法的時候增加關(guān)鍵字 synchronized 即可:
public class SynchronizedCounter { private int c = 0; public synchronized void increment() { c++; } public synchronized void decrement() { c--; } public synchronized int value() { return c; }}如果 count 是 SynchronizedCounter 類的實例,設(shè)置其方法為同步方法將有兩個效果:
首先,不可能出現(xiàn)對同一對象的同步方法的兩個調(diào)用的“交錯”。當(dāng)一個線程在執(zhí)行一個對象的同步方式的時候,其他所有的調(diào)用該對象的同步方法的線程都會被掛起,直到第一個線程對該對象操作完畢。其次,當(dāng)一個同步方法退出時,會自動與該對象的同步方法的后續(xù)調(diào)用建立 happens-before 關(guān)系。這就確保了對該對象的修改對其他線程是可見的。注意:構(gòu)造函數(shù)不能是 synchronized ——在構(gòu)造函數(shù)前使用 synchronized 關(guān)鍵字將導(dǎo)致語義錯誤。同步構(gòu)造函數(shù)是沒有意義的。這是因為只有創(chuàng)建該對象的線程才能調(diào)用其構(gòu)造函數(shù)。
警告:在創(chuàng)建多個線程共享的對象時,要特別小心對該對象的引用不能過早地“泄露”。例如,假定我們想要維護(hù)一個保存類的所有實例的列表 instances。我們可能會在構(gòu)造函數(shù)中這樣寫到:
instances.add(this);但是,其他線程可會在該對象的構(gòu)造完成之前就訪問該對象。
同步方法是一種簡單的可以避免線程相互干擾和內(nèi)存一致性錯誤的策略:如果一個對象對多個線程都是可見的,那么所有對該對象的變量的讀寫都應(yīng)該是通過同步方法完成的(一個例外就是 final 字段,他在對象創(chuàng)建完成后是不能被修改的,因此,在對象創(chuàng)建完畢后,可以通過非同步的方法對其進(jìn)行安全的讀取)。這種策略是有效的,但是可能導(dǎo)致“活躍度(liveness)”問題。這點我們會在本課程的后面進(jìn)行描述。
內(nèi)部鎖和同步
描述了一個更通用的同步方法,并介紹了同步是如何基于內(nèi)部鎖的。
同步是構(gòu)建在被稱為“內(nèi)部鎖(intrinsic lock)”或者是“監(jiān)視鎖(monitor lock)”的內(nèi)部實體上的。(在 API 中通常被稱為是“監(jiān)視器(monitor)”。)內(nèi)部鎖在兩個方面都扮演著重要的角色:保證對對象狀態(tài)訪問的排他性和建立也對象可見性相關(guān)的重要的“ happens-before。
每一個對象都有一個與之相關(guān)聯(lián)動的內(nèi)部鎖。按照傳統(tǒng)的做法,當(dāng)一個線程需要對一個對象的字段進(jìn)行排他性訪問并保持訪問的一致性時,他必須在訪問前先獲取該對象的內(nèi)部鎖,然后才能訪問之,最后釋放該內(nèi)部鎖。在線程獲取對象的內(nèi)部鎖到釋放對象的內(nèi)部鎖的這段時間,我們說該線程擁有該對象的內(nèi)部鎖。只要有一個線程已經(jīng)擁有了一個內(nèi)部鎖,其他線程就不能再擁有該鎖了。其他線程將會在試圖獲取該鎖的時候被阻塞了。
當(dāng)一個線程釋放了一個內(nèi)部鎖,那么就會建立起該動作和后續(xù)獲取該鎖之間的 happens-before 關(guān)系。
同步方法中的鎖
當(dāng)一個線程調(diào)用一個同步方法的時候,他就自動地獲得了該方法所屬對象的內(nèi)部鎖,并在方法返回的時候釋放該鎖。即使是由于出現(xiàn)了沒有被捕獲的異常而導(dǎo)致方法返回,該鎖也會被釋放。
我們可能會感到疑惑:當(dāng)調(diào)用一個靜態(tài)的同步方法的時候會怎樣了,靜態(tài)方法是和類相關(guān)的,而不是和對象相關(guān)的。在這種情況下,線程獲取的是該類的類對象的內(nèi)部鎖。這樣對于靜態(tài)字段的方法是通過一個和類的實例的鎖相區(qū)分的另外的鎖來進(jìn)行的。
同步語句
另外一種創(chuàng)建同步代碼的方式就是使用同步語句。和同步方法不同,使用同步語句是必須指明是要使用哪個對象的內(nèi)部鎖:
public void addName(String name) { synchronized(this) { lastName = name; nameCount++; } nameList.add(name);}在上面的示例中,方法 addName 需要對 lastName 和 nameCount 的修改進(jìn)行同步,還要避免同步調(diào)用其他對象的方法(在同步代碼段中調(diào)用其他對象的方法可能導(dǎo)致“活躍度(Liveness)”中描述的問題)。如果沒有使用同步語句,那么將不得不使用一個單獨的,未同步的方法來完成對 nameList.add 的調(diào)用。
在改善并發(fā)性時,巧妙地使用同步語句能起到很大的幫助作用。例如,我們假定類 MsLunch 有兩個實例字段,c1 和 c2,這兩個變量絕不會一起使用。所有對這兩個變量的更新都需要進(jìn)行同步。但是沒有理由阻止對 c1 的更新和對 c2 的更新出現(xiàn)交錯——這樣做會創(chuàng)建不必要的阻塞,進(jìn)而降低并發(fā)性。此時,我們沒有使用同步方法或者使用和this 相關(guān)的鎖,而是創(chuàng)建了兩個單獨的對象來提供鎖。
public class MsLunch { private long c1 = 0; private long c2 = 0; private Object lock1 = new Object(); private Object lock2 = new Object(); public void inc1() { synchronized(lock1) { c1++; } } public void inc2() { synchronized(lock2) { c2++; } }}采用這種方式時需要特別的小心。我們必須絕對確保相關(guān)字段的訪問交錯是完全安全的。
重入同步(Reentrant Synchronization)
回憶前面提到的:線程不能獲取已經(jīng)被別的線程獲取的鎖。但是線程可以獲取自身已經(jīng)擁有的鎖。允許一個線程能重復(fù)獲得同一個鎖就稱為重入同步(reentrant synchronization)。它是這樣的一種情況:在同步代碼中直接或者間接地調(diào)用了還有同步代碼的方法,兩個同步代碼段中使用的是同一個鎖。如果沒有重入同步,在編寫同步代碼時需要額外的小心,以避免線程將自己阻塞。
原子訪問
介紹了不會被其他線程干擾的做法的總體思路。
在編程中,原子性動作就是指一次性有效完成的動作。原子性動作是不能在中間停止的:要么一次性完全執(zhí)行完畢,要么就不執(zhí)行。在動作沒有執(zhí)行完畢之前,是不會產(chǎn)生可見結(jié)果的。
通過前面的示例,我們已經(jīng)發(fā)現(xiàn)了諸如 c++ 這樣的自增表達(dá)式并不屬于原子操作。即使是非常簡單的表達(dá)式也包含了復(fù)雜的動作,這些動作可以被解釋成許多別的動作。然而,的確存在一些原子操作的:
對幾乎所有的原生數(shù)據(jù)類型變量(除了 long he double)的讀寫以及引用變量的讀寫都是原子的。對所有聲明為 Volatile 的變量的讀寫都是原子的,包括 long 和 double 類型。原子性動作是不會出現(xiàn)交錯的,因此,使用這些原子性動作時不用考慮線程間的干擾。然而,這并不意味著可以移除對原子操作的同步。因為內(nèi)存一致性錯誤還是有可能出現(xiàn)的。使用 volatile 變量可以減少內(nèi)存一致性錯誤的風(fēng)險,因為任何對 volatile 變 量的寫操作都和后續(xù)對該變量的讀操作建立了 happens-before 關(guān)系。這就意味著對 volatile 類型變量的修改對于別的線程來說是可見的。更重要的是,這意味著當(dāng)一個線程讀取一個 volatile 類型的變量時,他看到的不僅僅是對該變量的最后一次修改,還看到了導(dǎo)致這種修改的代碼帶來的其他影響。
使用簡單的原子變量訪問比通過同步代碼來訪問變量更高效,但是需要程序員的更多細(xì)心考慮,以避免內(nèi)存一致性錯誤。這種額外的付出是否值得完全取決于應(yīng)用程序的大小和復(fù)雜度。
活躍度(Liveness)
一個并行應(yīng)用程序的及時執(zhí)行能力被稱為它的活躍度(liveness)。本節(jié)將介紹最常見的一種活躍度的問題——死鎖,以及另外兩個活躍度的問題——饑餓和活鎖。
死鎖(Deadlock)
死鎖是指兩個或兩個以上的線程永遠(yuǎn)被阻塞,一直等待對方的資源。
下面是一個例子。
Alphonse 和 Gaston 是朋友,都很有礼貌。礼貌的一個嚴(yán)格的規(guī)則是,當(dāng)你給一個朋友鞠躬時,你必須保持鞠躬,直到你的朋友鞠躬回給你。不幸的是,這條規(guī)則有個缺陷,那就是如果兩個朋友同一時間向?qū)Ψ骄瞎蔷陀肋h(yuǎn)不會完了。這個示例應(yīng)用程序中,死鎖模型是這樣的:
public class Deadlock { static class Friend { private final String name; public Friend(String name) { this.name = name; } public String getName() { return this.name; } public synchronized void bow(Friend bower) { System.out.format("%s: %s" + " has bowed to me!%n", this.name, bower.getName()); bower.bowBack(this); } public synchronized void bowBack(Friend bower) { System.out.format("%s: %s" + " has bowed back to me!%n", this.name, bower.getName()); } } public static void main(String[] args) { final Friend alphonse = new Friend("Alphonse"); final Friend gaston = new Friend("Gaston"); new Thread(new Runnable() { public void run() { alphonse.bow(gaston); } }).start(); new Thread(new Runnable() { public void run() { gaston.bow(alphonse); } }).start(); }}當(dāng)他們嘗試調(diào)用 bowBack 兩個線程將被阻塞。無論是哪個線程永遠(yuǎn)不會結(jié)束,因為每個線程都在等待對方鞠躬。這就是死鎖了。
饑餓和活鎖(Starvation and Livelock)
饑餓和活鎖雖比死鎖問題稍微不常見點,但這些是在并發(fā)軟件種每一個設(shè)計師仍然可能會遇到的問題。
饑餓(Starvation)
饑餓描述了這樣一個情況,一個線程不能獲得定期訪問共享資源,于是無法繼續(xù)執(zhí)行。這種情況一般出現(xiàn)在共享資源被某些“貪婪”線程占用,而導(dǎo)致資源長時間不被其他線程可用。例如,假設(shè)一個對象提供一個同步的方法,往往需要很長時間返回。如果一個線程頻繁調(diào)用該方法,其他線程若也需要頻繁的同步訪問同一個對象通常會被阻塞。
活鎖(Livelock)
一個線程常常處于響應(yīng)另一個線程的動作,如果其他線程也常常處于該線程的動作,那么就可能出現(xiàn)活鎖。與死鎖、活鎖的線程一樣,程序無法進(jìn)一步執(zhí)行。然而,線程是不會阻塞的,他們只是會忙于應(yīng)對彼此的恢復(fù)工作。現(xiàn)實種的例子是,兩人面對面試圖通過一條走廊: Alphonse 移動到他的左則讓路給 Gaston ,而 Gaston 移動到他的右側(cè)想讓 Alphonse 過去,兩個人同時讓路,但其實兩人都擋住了對方?jīng)]辦法過去,他們?nèi)匀槐舜俗枞?/p>
Guarded Blocks
多線程之間經(jīng)常需要協(xié)同工作,最常見的方式是使用 Guarded Blocks,它循環(huán)檢查一個條件(通常初始值為 true),直到條件發(fā)生變化才跳出循環(huán)繼續(xù)執(zhí)行。在使用 Guarded Blocks 時有以下幾個步驟需要注意:
假設(shè) guardedJoy 方法必須要等待另一線程為共享變量 joy 設(shè)值才能繼續(xù)執(zhí)行。那么理論上可以用一個簡單的條件循環(huán)來實現(xiàn),但在等待過程中 guardedJoy 方法不停的檢查循環(huán)條件實際上是一種資源浪費。
public void guardedJoy() { // Simple loop guard. Wastes // processor time. Don't do this! while(!joy) {} System.out.println("Joy has been achieved!");}更加高效的保護(hù)方法是調(diào)用 Object.wait 將當(dāng)前線程掛起,直到有另一線程發(fā)起事件通知(盡管通知的事件不一定是當(dāng)前線程等待的事件)。
public synchronized void guardedJoy() { // This guard only loops once for each special event, which may not // be the event we're waiting for. while(!joy) { try { wait(); } catch (InterruptedException e) {} } System.out.println("Joy and efficiency have been achieved!");}注意:一定要在循環(huán)里面調(diào)用 wait 方法,不要想當(dāng)然的認(rèn)為線程喚醒后循環(huán)條件一定發(fā)生了改變。
和其他可以暫停線程執(zhí)行的方法一樣,wait 方法會拋出 InterruptedException,在上面的例子中,因為我們關(guān)心的是 joy 的值,所以忽略了 InterruptedException。
為什么 guardedJoy 是 synchronized 的?假設(shè) d 是用來調(diào)用 wait 的對象,當(dāng)一個線程調(diào)用 d.wait,它必須要擁有 d的內(nèi)部鎖(否則會拋出異常),獲得 d 的內(nèi)部鎖的最簡單方法是在一個 synchronized 方法里面調(diào)用 wait。
當(dāng)一個線程調(diào)用 wait 方法時,它釋放鎖并掛起。然后另一個線程請求并獲得這個鎖并調(diào)用 Object.notifyAll 通知所有等待該鎖的線程。
public synchronized notifyJoy() { joy = true; notifyAll();}當(dāng)?shù)诙€線程釋放這個該鎖后,第一個線程再次請求該鎖,從 wait 方法返回并繼續(xù)執(zhí)行。
注意:還有另外一個通知方法,notify(),它只會喚醒一個線程。但由于它并不允許指定哪一個線程被喚醒,所以一般只在大規(guī)模并發(fā)應(yīng)用(即系統(tǒng)有大量相似任務(wù)的線程)中使用。因為對于大規(guī)模并發(fā)應(yīng)用,我們其實并不關(guān)心哪一個線程被喚醒。
現(xiàn)在我們使用 Guarded blocks 創(chuàng)建一個生產(chǎn)者/消費者應(yīng)用。這類應(yīng)用需要在兩個線程之間共享數(shù)據(jù):生產(chǎn)者生產(chǎn)數(shù)據(jù),消費者使用數(shù)據(jù)。兩個線程通過共享對象通信。在這里,線程協(xié)同工作的關(guān)鍵是:生產(chǎn)者發(fā)布數(shù)據(jù)之前,消費者不能夠去讀取數(shù)據(jù);消費者沒有讀取舊數(shù)據(jù)前,生產(chǎn)者不能發(fā)布新數(shù)據(jù)。
在下面的例子中,數(shù)據(jù)通過 Drop 對象共享的一系列文本消息:
public class Drop { // Message sent from producer // to consumer. private String message; // True if consumer should wait // for producer to send message, // false if producer should wait for // consumer to retrieve message. private boolean empty = true; public synchronized String take() { // Wait until message is // available. while (empty) { try { wait(); } catch (InterruptedException e) {} } // Toggle status. empty = true; // Notify producer that // status has changed. notifyAll(); return message; } public synchronized void put(String message) { // Wait until message has // been retrieved. while (!empty) { try { wait(); } catch (InterruptedException e) {} } // Toggle status. empty = false; // Store message. this.message = message; // Notify consumer that status // has changed. notifyAll(); }}Producer 是生產(chǎn)者線程,發(fā)送一組消息,字符串 DONE 表示所有消息都已經(jīng)發(fā)送完成。為了模擬現(xiàn)實情況,生產(chǎn)者線程還會在消息發(fā)送時隨機(jī)的暫停。
public class Producer implements Runnable { private Drop drop; public Producer(Drop drop) { this.drop = drop; } public void run() { String importantInfo[] = { "Mares eat oats", "Does eat oats", "Little lambs eat ivy", "A kid will eat ivy too" }; Random random = new Random(); for (int i = 0; i < importantInfo.length; i++) { drop.put(importantInfo[i]); try { Thread.sleep(random.nextInt(5000)); } catch (InterruptedException e) { } } drop.put("DONE"); }}Consumer 是消費者線程,讀取消息并打印出來,直到讀取到字符串 DONE 為止。消費者線程在消息讀取時也會隨機(jī)的暫停。
public class Consumer implements Runnable { private Drop drop; public Consumer(Drop drop) { this.drop = drop; } public void run() { Random random = new Random(); for (String message = drop.take(); !message.equals("DONE"); message = drop.take()) { System.out.format("MESSAGE RECEIVED: %s%n", message); try { Thread.sleep(random.nextInt(5000)); } catch (InterruptedException e) { } } }}ProducerConsumerExample 是主線程,它啟動生產(chǎn)者線程和消費者線程。
public class ProducerConsumerExample { public static void main(String[] args) { Drop drop = new Drop(); (new Thread(new Producer(drop))).start(); (new Thread(new Consumer(drop))).start(); }}不可變對象(Immutable Objects)
如果一個對象它被構(gòu)造后其,狀態(tài)不能改變,則這個對象被認(rèn)為是不可變的(immutable )。不可變對象的好處是可以創(chuàng)建簡單的、可靠的代碼。
不可變對象在并發(fā)應(yīng)用種特別有用。因為他們不能改變狀態(tài),它們不能被線程干擾所中斷或者被其他線程觀察到內(nèi)部不一致的狀態(tài)。
程序員往往不愿使用不可變對象,因為他們擔(dān)心創(chuàng)建一個新的對象要比更新對象的成本要高。實際上這種開銷常常被過分高估,而且使用不可變對象所帶來的一些效率提升也抵消了這種開銷。例如:使用不可變對象降低了垃圾回收所產(chǎn)生的額外開銷,也減少了用來確保使用可變對象不出現(xiàn)并發(fā)錯誤的一些額外代碼。
接下來看一個可變對象的類,然后轉(zhuǎn)化為一個不可變對象的類。通過這個例子說明轉(zhuǎn)化的原則以及使用不可變對象的好處。
一個同步類的例子
SynchronizedRGB 是表示顏色的類,每一個對象代表一種顏色,使用三個整形數(shù)表示顏色的三基色,字符串表示顏色名稱。
public class SynchronizedRGB { // Values must be between 0 and 255. private int red; private int green; private int blue; private String name; private void check(int red, int green, int blue) { if (red < 0 || red > 255 || green < 0 || green > 255 || blue < 0 || blue > 255) { throw new IllegalArgumentException(); } } public SynchronizedRGB(int red, int green, int blue, String name) { check(red, green, blue); this.red = red; this.green = green; this.blue = blue; this.name = name; } public void set(int red, int green, int blue, String name) { check(red, green, blue); synchronized (this) { this.red = red; this.green = green; this.blue = blue; this.name = name; } } public synchronized int getRGB() { return ((red << 16) | (green << 8) | blue); } public synchronized String getName() { return name; } public synchronized void invert() { red = 255 - red; green = 255 - green; blue = 255 - blue; name = "Inverse of " + name; }}使用 SynchronizedRGB 時需要小心,避免其處于不一致的狀態(tài)。例如一個線程執(zhí)行了以下代碼:
SynchronizedRGB color = new SynchronizedRGB(0, 0, 0, "Pitch Black");...int myColorInt = color.getRGB(); //Statement 1String myColorName = color.getName(); //Statement 2如果有另外一個線程在 Statement 1 之后、Statement 2 之前調(diào)用了 color.set 方法,那么 myColorInt 的值和 myColorName 的值就會不匹配。為了避免出現(xiàn)這樣的結(jié)果,必須要像下面這樣把這兩條語句綁定到一塊執(zhí)行:
synchronized (color) { int myColorInt = color.getRGB(); String myColorName = color.getName();}這種不一致的問題只可能發(fā)生在可變對象上。
定義不可變對象的策略
以下的一些創(chuàng)建不可變對象的簡單策略。并非所有不可變類都完全遵守這些規(guī)則,不過這不是編寫這些類的程序員們粗心大意造成的,很可能的是他們有充分的理由確保這些對象在創(chuàng)建后不會被修改。但這需要非常復(fù)雜細(xì)致的分析,并不適用于初學(xué)者。
不要提供 setter 方法。(包括修改字段的方法和修改字段引用對象的方法)將類的所有字段定義為 final、private 的。不允許子類重寫方法。簡單的辦法是將類聲明為 final,更好的方法是將構(gòu)造函數(shù)聲明為私有的,通過工廠方法創(chuàng)建對象。如果類的字段是對可變對象的引用,不允許修改被引用對象。不提供修改可變對象的方法。不共享可變對象的引用。當(dāng)一個引用被當(dāng)做參數(shù)傳遞給構(gòu)造函數(shù),而這個引用指向的是一個外部的可變對象時,一定不要保存這個引用。如果必須要保存,那么創(chuàng)建可變對象的拷貝,然后保存拷貝對象的引用。同樣如果需要返回內(nèi)部的可變對象時,不要返回可變對象本身,而是返回其拷貝。將這一策略應(yīng)用到 SynchronizedRGB 有以下幾步:
SynchronizedRGB 類有兩個 setter 方法。第一個 set 方法只是簡單的為字段設(shè)值,第二個 invert 方法修改為創(chuàng)建一個新對象,而不是在原有對象上修改。所有的字段都已經(jīng)是私有的,加上 final 即可。將類聲明為 final 的只有一個字段是對象引用,并且被引用的對象也是不可變對象。經(jīng)過以上這些修改后,我們得到了 ImmutableRGB:
public class ImmutableRGB { // Values must be between 0 and 255. final private int red; final private int green; final private int blue; final private String name; private void check(int red, int green, int blue) { if (red < 0 || red > 255 || green < 0 || green > 255 || blue < 0 || blue > 255) { throw new IllegalArgumentException(); } } public ImmutableRGB(int red, int green, int blue, String name) { check(red, green, blue); this.red = red; this.green = green; this.blue = blue; this.name = name; } public int getRGB() { return ((red << 16) | (green << 8) | blue); } public String getName() { return name; } public ImmutableRGB invert() { return new ImmutableRGB(255 - red, 255 - green, 255 - blue, "Inverse of " + name); }}高級并發(fā)對象
目前為止,之前的教程都是重點講述了最初作為 Java 平臺一部分的低級別 API。這些API 對于非常基本的任務(wù)來說已經(jīng)足夠,但是對于更高級的任務(wù)就需要更高級的 API。特別是針對充分利用了當(dāng)今多處理器和多核系統(tǒng)的大規(guī)模并發(fā)應(yīng)用程序。 本章,我們將著眼于 Java 5.0 新增的一些高級并發(fā)特征。大多數(shù)功能已經(jīng)在新的java.util.concurrent 包中實現(xiàn)。Java 集合框架中也定義了新的并發(fā)數(shù)據(jù)結(jié)構(gòu)。
鎖對象
提供了可以簡化許多并發(fā)應(yīng)用的鎖的慣用法。
同步代碼依賴于一種簡單的可重入鎖。這種鎖使用簡單,但也有諸多限制。java.util.concurrent.locks 包提供了更復(fù)雜的鎖。這里會重點關(guān)注其最基本的接口 Lock。 Lock 對象作用非常類似同步代碼使用的內(nèi)部鎖。如同內(nèi)部鎖,每次只有一個線程可以獲得 Lock 對象。通過關(guān)聯(lián) Condition 對象,Lock 對象也支持 wait/notify 機(jī)制。
Lock 對象之于隱式鎖最大的優(yōu)勢在于,它們有能力收回獲得鎖的嘗試。如果當(dāng)前鎖對象不可用,或者鎖請求超時(如果超時時間已指定),tryLock 方法會收回獲取鎖的請求。如果在鎖獲取前,另一個線程發(fā)送了一個中斷,lockInterruptibly 方法也會收回獲取鎖的請求。
讓我們使用 Lock 對象來解決我們在活躍度中見到的死鎖問題。Alphonse 和 Gaston 已經(jīng)把自己訓(xùn)練成能注意到朋友何時要鞠躬。我們通過要求 Friend 對象在雙方鞠躬前必須先獲得鎖來模擬這次改善。下面是改善后模型的源代碼 Safelock :
public class Safelock { static class Friend { private final String name; private final Lock lock = new ReentrantLock(); public Friend(String name) { this.name = name; } public String getName() { return this.name; } public boolean impendingBow(Friend bower) { Boolean myLock = false; Boolean yourLock = false; try { myLock = lock.tryLock(); yourLock = bower.lock.tryLock(); } finally { if (!(myLock && yourLock)) { if (myLock) { lock.unlock(); } if (yourLock) { bower.lock.unlock(); } } } return myLock && yourLock; } public void bow(Friend bower) { if (impendingBow(bower)) { try { System.out.format("%s: %s has" + " bowed to me!%n", this.name, bower.getName()); bower.bowBack(this); } finally { lock.unlock(); bower.lock.unlock(); } } else { System.out.format( "%s: %s started" + " to bow to me, but saw that" + " I was already bowing to" + " him.%n", this.name, bower.getName()); } } public void bowBack(Friend bower) { System.out.format("%s: %s has" + " bowed back to me!%n", this.name, bower.getName()); } } static class BowLoop implements Runnable { private Friend bower; private Friend bowee; public BowLoop(Friend bower, Friend bowee) { this.bower = bower; this.bowee = bowee; } public void run() { Random random = new Random(); for (;;) { try { Thread.sleep(random.nextInt(10)); } catch (InterruptedException e) { } bowee.bow(bower); } } } public static void main(String[] args) { final Friend alphonse = new Friend("Alphonse"); final Friend gaston = new Friend("Gaston"); new Thread(new BowLoop(alphonse, gaston)).start(); new Thread(new BowLoop(gaston, alphonse)).start(); }}執(zhí)行器(Executors)
為加載和管理線程定義了高級 API。Executors 的實現(xiàn)由 java.util.concurrent 包提供,提供了適合大規(guī)模應(yīng)用的線程池管理。
在之前所有的例子中,Thread 對象表示的線程和 Runnable 對象表示的線程所執(zhí)行的任務(wù)之間是緊耦合的。這對于小型應(yīng)用程序來說沒問題,但對于大規(guī)模并發(fā)應(yīng)用來說,合理的做法是將線程的創(chuàng)建與管理和程序的其他部分分離開。封裝這些功能的對象就是執(zhí)行器,接下來的部分將講詳細(xì)描述執(zhí)行器。
執(zhí)行器接口
在 java.util.concurrent 中包括三個執(zhí)行器接口:
Executor,一個運行新任務(wù)的簡單接口。ExecutorService,擴(kuò)展了 Executor 接口。添加了一些用來管理執(zhí)行器生命周期和任務(wù)生命周期的方法。ScheduledExecutorService,擴(kuò)展了 ExecutorService。支持 future 和(或)定期執(zhí)行任務(wù)。通常來說,指向 executor 對象的變量應(yīng)被聲明為以上三種接口之一,而不是具體的實現(xiàn)類
Executor 接口
Executor 接口只有一個 execute 方法,用來替代通常創(chuàng)建(啟動)線程的方法。例如:r 是一個 Runnable 對象,e 是一個 Executor 對象。可以使用
e.execute(r);代替
(new Thread(r)).start();但 execute 方法沒有定義具體的實現(xiàn)方式。對于不同的 Executor 實現(xiàn),execute 方法可能是創(chuàng)建一個新線程并立即啟動,但更有可能是使用已有的工作線程運行r,或者將 r放入到隊列中等待可用的工作線程。(我們將在線程池一節(jié)中描述工作線程。)
ExecutorService 接口
ExecutorService 接口在提供了 execute 方法的同時,新加了更加通用的 submit 方法。submit 方法除了和 execute 方法一樣可以接受 Runnable 對象作為參數(shù),還可以接受 Callable 對象作為參數(shù)。使用 Callable對象可以能使任務(wù)返還執(zhí)行的結(jié)果。通過 submit 方法返回的Future 對象可以讀取 Callable 任務(wù)的執(zhí)行結(jié)果,或是管理 Callable 任務(wù)和 Runnable 任務(wù)的狀態(tài)。 ExecutorService 也提供了批量運行 Callable 任務(wù)的方法。最后,ExecutorService 還提供了一些關(guān)閉執(zhí)行器的方法。如果需要支持即時關(guān)閉,執(zhí)行器所執(zhí)行的任務(wù)需要正確處理中斷。
ScheduledExecutorService 接口
ScheduledExecutorService 擴(kuò)展 ExecutorService接口并添加了 schedule 方法。調(diào)用 schedule 方法可以在指定的延時后執(zhí)行一個Runnable 或者 Callable 任務(wù)。ScheduledExecutorService 接口還定義了按照指定時間間隔定期執(zhí)行任務(wù)的 scheduleAtFixedRate 方法和 scheduleWithFixedDelay 方法。
線程池
線程池是最常見的一種執(zhí)行器的實現(xiàn)。
在 java.util.concurrent 包中多數(shù)的執(zhí)行器實現(xiàn)都使用了由工作線程組成的線程池,工作線程獨立于所它所執(zhí)行的 Runnable 任務(wù)和 Callable 任務(wù),并且常用來執(zhí)行多個任務(wù)。
使用工作線程可以使創(chuàng)建線程的開銷最小化。在大規(guī)模并發(fā)應(yīng)用中,創(chuàng)建大量的 Thread 對象會占用占用大量系統(tǒng)內(nèi)存,分配和回收這些對象會產(chǎn)生很大的開銷。
一種最常見的線程池是固定大小的線程池。這種線程池始終有一定數(shù)量的線程在運行,如果一個線程由于某種原因終止運行了,線程池會自動創(chuàng)建一個新的線程來代替它。需要執(zhí)行的任務(wù)通過一個內(nèi)部隊列提交給線程,當(dāng)沒有更多的工作線程可以用來執(zhí)行任務(wù)時,隊列保存額外的任務(wù)。
使用固定大小的線程池一個很重要的好處是可以實現(xiàn)優(yōu)雅退化(degrade gracefully)。例如一個 Web 服務(wù)器,每一個 HTTP 請求都是由一個單獨的線程來處理的,如果為每一個 HTTP 都創(chuàng)建一個新線程,那么當(dāng)系統(tǒng)的開銷超出其能力時,會突然地對所有請求都停止響應(yīng)。如果限制 Web 服務(wù)器可以創(chuàng)建的線程數(shù)量,那么它就不必立即處理所有收到的請求,而是在有能力處理請求時才處理。
創(chuàng)建一個使用線程池的執(zhí)行器最簡單的方法是調(diào)用 java.util.concurrent.Executors 的 newFixedThreadPool 方法。Executors 類還提供了下列一下方法:
newCachedThreadPool 方法創(chuàng)建了一個可擴(kuò)展的線程池。適合用來啟動很多短任務(wù)的應(yīng)用程序。newSingleThreadExecutor 方法創(chuàng)建了每次執(zhí)行一個任務(wù)的執(zhí)行器。還有一些 ScheduledExecutorService 執(zhí)行器創(chuàng)建的工廠方法。如果上面的方法都不滿足需要,可以嘗試 java.util.concurrent.ThreadPoolExecutor 或者java.util.concurrent.ScheduledThreadPoolExecutor。
Fork/Join
該框架是 JDK 7 中引入的并發(fā)框架。
fork/join 框架是 ExecutorService 接口的一種具體實現(xiàn),目的是為了幫助你更好地利用多處理器帶來的好處。它是為那些能夠被遞歸地拆解成子任務(wù)的工作類型量身設(shè)計的。其目的在于能夠使用所有可用的運算能力來提升你的應(yīng)用的性能。
類似于 ExecutorService 接口的其他實現(xiàn),fork/join 框架會將任務(wù)分發(fā)給線程池中的工作線程。fork/join 框架的獨特之處在與它使用工作竊取(work-stealing)算法。完成自己的工作而處于空閑的工作線程能夠從其他仍然處于忙碌(busy)狀態(tài)的工作線程處竊取等待執(zhí)行的任務(wù)。
fork/join 框架的核心是 ForkJoinPool 類,它是對 AbstractExecutorService 類的擴(kuò)展。ForkJoinPool 實現(xiàn)了工作竊取算法,并可以執(zhí)行ForkJoinTask 任務(wù)。
基本使用方法
使用 fork/join 框架的第一步是編寫執(zhí)行一部分工作的代碼。你的代碼結(jié)構(gòu)看起來應(yīng)該與下面所示的偽代碼類似:
if (my portion of the work is small enough) do the work directlyelse split my work into two pieces invoke the two pieces and wait for the results翻譯為中文為:
if (當(dāng)前這個任務(wù)工作量足夠小) 直接完成這個任務(wù)else 將這個任務(wù)或這部分工作分解成兩個部分 分別觸發(fā)(invoke)這兩個子任務(wù)的執(zhí)行,并等待結(jié)果你需要將這段代碼包裹在一個 ForkJoinTask 的子類中。不過,通常情況下會使用一種更為具體的的類型,或者是 RecursiveTask(會返回一個結(jié)果),或者是 RecursiveAction。 當(dāng)你的 ForkJoinTask 子類準(zhǔn)備好了,創(chuàng)建一個代表所有需要完成工作的對象,然后將其作為參數(shù)傳遞給一個ForkJoinPool 實例的 invoke() 方法即可。
模糊圖片的例子
想要了解 fork/join 框架的基本工作原理,接下來的這個例子會有所幫助。假設(shè)你想要模糊一張圖片。原始的 source 圖片由一個整數(shù)的數(shù)組表示,每個整數(shù)表示一個像素點的顏色數(shù)值。與 source 圖片相同,模糊之后的 destination 圖片也由一個整數(shù)數(shù)組表示。 對圖片的模糊操作是通過對 source 數(shù)組中的每一個像素點進(jìn)行處理完成的。處理的過程是這樣的:將每個像素點的色值取出,與周圍像素的色值(紅、黃、藍(lán)三個組成部分)放在一起取平均值,得到的結(jié)果被放入 destination 數(shù)組。因為一張圖片會由一個很大的數(shù)組來表示,這個流程會花費一段較長的時間。如果使用 fork/join 框架來實現(xiàn)這個模糊算法,你就能夠借助多處理器系統(tǒng)的并行處理能力。下面是上述算法結(jié)合 fork/join 框架的一種簡單實現(xiàn):
public class ForkBlur extends RecursiveAction { private int[] mSource; private int mStart; private int mLength; private int[] mDestination; // Processing window size; should be odd. private int mBlurWidth = 15; public ForkBlur(int[] src, int start, int length, int[] dst) { mSource = src; mStart = start; mLength = length; mDestination = dst; } protected void computeDirectly() { int sidePixels = (mBlurWidth - 1) / 2; for (int index = mStart; index < mStart + mLength; index++) { // Calculate average. float rt = 0, gt = 0, bt = 0; for (int mi = -sidePixels; mi <= sidePixels; mi++) { int mindex = Math.min(Math.max(mi + index, 0), mSource.length - 1); int pixel = mSource[mindex]; rt += (float)((pixel & 0x00ff0000) >> 16) / mBlurWidth; gt += (float)((pixel & 0x0000ff00) >> 8) / mBlurWidth; bt += (float)((pixel & 0x000000ff) >> 0) / mBlurWidth; } // Reassemble destination pixel. int dpixel = (0xff000000 ) | (((int)rt) << 16) | (((int)gt) << 8) | (((int)bt) << 0); mDestination[index] = dpixel; } } ...接下來你需要實現(xiàn)父類中的 compute() 方法,它會直接執(zhí)行模糊處理,或者將當(dāng)前的工作拆分成兩個更小的任務(wù)。數(shù)組的長度可以作為一個簡單的閥值來判斷任務(wù)是應(yīng)該直接完成還是應(yīng)該被拆分。
protected static int sThreshold = 100000;protected void compute() { if (mLength < sThreshold) { computeDirectly(); return; } int split = mLength / 2; invokeAll(new ForkBlur(mSource, mStart, split, mDestination), new ForkBlur(mSource, mStart + split, mLength - split, mDestination));}如果前面這個方法是在一個 RecursiveAction 的子類中,那么設(shè)置任務(wù)在ForkJoinPool 中執(zhí)行就再直觀不過了。通常會包含以下一些步驟:
創(chuàng)建一個表示所有需要完成工作的任務(wù)。// source image pixels are in src // destination image pixels are in dst ForkBlur fb = new ForkBlur(src, 0, src.length, dst);創(chuàng)建將要用來執(zhí)行任務(wù)的 ForkJoinPool。ForkJoinPool pool = new ForkJoinPool();執(zhí)行任務(wù)。pool.invoke(fb);想要瀏覽完成的源代碼,請查看 ForkBlur示例,其中還包含一些創(chuàng)建 destination 圖片文件的額外代碼。
標(biāo)準(zhǔn)實現(xiàn)
除了能夠使用 fork/join 框架來實現(xiàn)能夠在多處理系統(tǒng)中被并行執(zhí)行的定制化算法(如前文中的 ForkBlur.java 例子),在 Java SE 中一些比較常用的功能點也已經(jīng)使用 fork/join 框架來實現(xiàn)了。在 Java SE 8 中,java.util.Arrays 類的一系列parallelSort() 方法就使用了 fork/join 來實現(xiàn)。這些方法與 sort() 方法很類似,但是通過使用 fork/join框 架,借助了并發(fā)來完成相關(guān)工作。在多處理器系統(tǒng)中,對大數(shù)組的并行排序會比串行排序更快。這些方法究竟是如何運用 fork/join 框架并不在本教程的討論范圍內(nèi)。想要了解更多的信息,請參見 Java API 文檔。 其他采用了 fork/join 框架的方法還包括java.util.streams包中的一些方法,此包是作為 Java SE 8 發(fā)行版中 Project Lambda 的一部分。想要了解更多信息,請參見 Lambda 表達(dá)式一節(jié)。
并發(fā)集合
并發(fā)集合簡化了大型數(shù)據(jù)集合管理,且極大的減少了同步的需求。
java.util.concurrent 包囊括了 Java 集合框架的一些附加類。它們也最容易按照集合類所提供的接口來進(jìn)行分類:
BlockingQueue 定義了一個先進(jìn)先出的數(shù)據(jù)結(jié)構(gòu),當(dāng)你嘗試往滿隊列中添加元素,或者從空隊列中獲取元素時,將會阻塞或者超時。ConcurrentMap 是 java.util.Map 的子接口,定義了一些有用的原子操作。移除或者替換鍵值對的操作只有當(dāng) key 存在時才能進(jìn)行,而新增操作只有當(dāng) key 不存在時。使這些操作原子化,可以避免同步。ConcurrentMap 的標(biāo)準(zhǔn)實現(xiàn)是 ConcurrentHashMap,它是 HashMap 的并發(fā)模式。ConcurrentNavigableMap 是 ConcurrentMap 的子接口,支持近似匹配。ConcurrentNavigableMap 的標(biāo)準(zhǔn)實現(xiàn)是 ConcurrentSkipListMap,它是 TreeMap 的并發(fā)模式。所有這些集合,通過在集合里新增對象和訪問或移除對象的操作之間,定義一個happens-before 的關(guān)系,來幫助程序員避免內(nèi)存一致性錯誤。
原子變量
java.util.concurrent.atomic 包定義了對單一變量進(jìn)行原子操作的類。所有的類都提供了 get 和 set 方法,可以使用它們像讀寫 volatile 變量一樣讀寫原子類。就是說,同一變量上的一個 set 操作對于任意后續(xù)的 get 操作存在 happens-before 關(guān)系。原子的 compareAndSet 方法也有內(nèi)存一致性特點,就像應(yīng)用到整型原子變量中的簡單原子算法。
為了看看這個包如何使用,讓我們返回到最初用于演示線程干擾的 Counter 類:
class Counter { private int c = 0; public void increment() { c++; } public void decrement() { c--; } public int value() { return c; }}使用同步是一種使 Counter 類變得線程安全的方法,如 SynchronizedCounter:
class SynchronizedCounter { private int c = 0; public synchronized void increment() { c++; } public synchronized void decrement() { c--; } public synchronized int value() { return c; }}對于這個簡單的類,同步是一種可接受的解決方案。但是對于更復(fù)雜的類,我們可能想要避免不必要同步所帶來的活躍度影響。將 int 替換為 AtomicInteger 允許我們在不進(jìn)行同步的情況下阻止線程干擾,如 AtomicCounter:
import java.util.concurrent.atomic.AtomicInteger;class AtomicCounter { private AtomicInteger c = new AtomicInteger(0); public void increment() { c.incrementAndGet(); } public void decrement() { c.decrementAndGet(); } public int value() { return c.get(); }}并發(fā)隨機(jī)數(shù)
并發(fā)隨機(jī)數(shù)(JDK7)提供了高效的多線程生成偽隨機(jī)數(shù)的方法。
在 JDK7 中,java.util.concurrent 包含了一個相當(dāng)便利的類 ThreadLocalRandom,可以在當(dāng)應(yīng)用程序期望在多個線程或 ForkJoinTasks 中使用隨機(jī)數(shù)時使用。
對于并發(fā)訪問,使用 TheadLocalRandom 代替 Math.random() 可以減少競爭,從而獲得更好的性能。
你只需調(diào)用 ThreadLocalRandom.current(), 然后調(diào)用它的其中一個方法去獲取一個隨機(jī)數(shù)即可。下面是一個例子:
int r = ThreadLocalRandom.current() .nextInt(4, 77);學(xué)習(xí)Java的同學(xué)注意了!!! 學(xué)習(xí)過程中遇到什么問題或者想獲取學(xué)習(xí)資源的話,歡迎加入Java學(xué)習(xí)交流群,群號碼:183993990 我們一起學(xué)Java!
新聞熱點
疑難解答
圖片精選