多線程的難點(diǎn)主要就是多線程通信協(xié)作這一塊了,前面筆記二中提到了常見(jiàn)的同步方法,這里主要是進(jìn)行實(shí)例學(xué)習(xí)了,今天總結(jié)了一下3個(gè)實(shí)例:
1、銀行存款與提款多線程實(shí)現(xiàn),使用Lock鎖和條件Condition。 附加 : 用監(jiān)視器進(jìn)行線程間通信
2、生產(chǎn)者消費(fèi)者實(shí)現(xiàn),使用LinkedList自寫緩沖區(qū)。
3、多線程之阻塞隊(duì)列學(xué)習(xí),用阻塞隊(duì)列快速實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型。 附加:用布爾變量關(guān)閉線程
在三種線程同步方法中,我們這里的實(shí)例用Lock鎖來(lái)實(shí)現(xiàn)變量同步,因?yàn)樗容^靈活直觀。
實(shí)現(xiàn)了變量的同步,我們還要讓多個(gè)線程之間進(jìn)行“通話”,就是一個(gè)線程完成了某個(gè)條件之后,告訴其他線程我完成了這個(gè)條件,你們可以行動(dòng)了。下面就是java提供的條件接口Condition定義的同步方法:

很方便的是,java的Lock鎖里面提供了newConditon()方法可以,該方法返回:一個(gè)綁定了lock鎖的Condition實(shí)例,有點(diǎn)抽象,其實(shí)把它看作一個(gè)可以發(fā)信息的鎖就可以了,看后面的代碼,應(yīng)該就能理解了。
1、銀行存款與提款多線程實(shí)現(xiàn)。我們模擬ATM機(jī)器存款與提款,創(chuàng)建一個(gè)賬戶類Account(),該類包含同步方法:
存款方法:deposit()
提款方法:withdraw()
以及一個(gè)普通的查詢余額的方法getbalance().
我們創(chuàng)建兩個(gè)任務(wù)線程,分別調(diào)用兩個(gè)同步方法,進(jìn)行模擬操作,看代碼:
import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class ThreadCoOperation {PRivate static Account account = new Account(); public static void main(String[] args){//創(chuàng)建線程池ExecutorService executor = Executors.newFixedThreadPool(2);executor.execute(new DepositTask());executor.execute(new WithdrawTask());}//存錢public static class DepositTask implements Runnable{@Overridepublic void run() {try {while(true){account.deposit((int)(Math.random()*1000)+1);Thread.sleep(1000);}} catch (InterruptedException e) {e.printStackTrace();}}}public static class WithdrawTask implements Runnable{@Overridepublic void run() {try{while(true){account.withdraw((int)(Math.random()*1000)+1);Thread.sleep(500);}} catch (InterruptedException e) {e.printStackTrace();}}}public static class Account{//一個(gè)鎖是一個(gè)Lock接口的實(shí)例 它定義了加鎖和釋放鎖的方法 ReentrantLock是為創(chuàng)建相互排斥的鎖的Lock的具體實(shí)現(xiàn)private static Lock lock = new ReentrantLock();//創(chuàng)建一個(gè)condition,具有發(fā)通知功能的鎖,前提是要實(shí)現(xiàn)了lock接口private static Condition newDeposit = lock.newCondition();private int balance = 0;public int getBalance(){return balance;}public void withdraw(int amount){lock.lock();try {while(balance < amount){System.out.println("/t/t錢不夠,等待存錢");newDeposit.await();}balance -= amount;System.out.println("/t/t取出"+amount+"塊錢/t剩余"+getBalance());} catch (InterruptedException e) {e.printStackTrace();}finally{lock.unlock();}}public void deposit(int amount){lock.lock();try{balance+=amount;System.out.println("存入"+amount+"塊錢");newDeposit.signalAll(); //發(fā)信息喚醒所有的線程}finally{lock.unlock();}}}}
運(yùn)行截圖![C1]I1L)1D0(F)I4MCX3)4$I C1]I1L)1D0(F)I4MCX3)4$I](http://s1.VeVb.com/20150728/5nfkgc0akn505.png)
分析:
1、程序中需要注意的:創(chuàng)建一個(gè)condition,具有發(fā)通知功能的鎖,前提是要實(shí)現(xiàn)了lock接口。
2、while(balance < amount)不能改用if判斷,用if會(huì)使得線程不安全,使用if會(huì)不會(huì)進(jìn)行循環(huán)驗(yàn)證,而while會(huì),我們經(jīng)常看到while(true),但是不會(huì)經(jīng)常看到if(true).
3、調(diào)用了await方法后,要記得使用signalAll()或者signal()將線程喚醒,否則線程永久等待。
最后再來(lái)分析一下這個(gè)類的結(jié)構(gòu),有3個(gè)類,兩個(gè)靜態(tài)任務(wù)類實(shí)現(xiàn)了Runnable接口,是線程類,而另外一個(gè)類則是普通的任務(wù)類,包含了線程類所用到的方法。我們的主類在main方法前面就實(shí)例化一個(gè)Account類,以供線程類調(diào)用該類里面的同步方法。
這種構(gòu)造方式是多線程常用到的一種構(gòu)造方式吧。不難發(fā)現(xiàn)后面要手寫的生產(chǎn)者消費(fèi)者模型也是這樣子構(gòu)造的。這相當(dāng)于是一個(gè)多線程模板。也是我們學(xué)習(xí)這個(gè)例子最重要的收獲吧。
用監(jiān)視器進(jìn)行線程之間的通信
還有一點(diǎn),接口Lock與Condition都是在java5之后出現(xiàn)的,在這之前,線程通信是通過(guò)內(nèi)置的監(jiān)視器(monitor)實(shí)現(xiàn)的。
監(jiān)視器是一個(gè)相互排斥且具有同步能力的對(duì)象,任意對(duì)象都有可能成為一個(gè)monitor。監(jiān)視器是通過(guò)synchronized關(guān)鍵字來(lái)對(duì)自己加鎖(加鎖解鎖是解決線程同步最基本的思想),使用wait()方法時(shí)線程暫停并 等待條件發(fā)生,發(fā)通知?jiǎng)t是通過(guò)notify()和notifyAll()方法。大體的模板是這樣子的:

不難看出await()、signal()、signally()是wait()、notify()、notifyAll()的進(jìn)化形態(tài),所以不建議使用監(jiān)視器。
2、生產(chǎn)者消費(fèi)者實(shí)現(xiàn),使用LinkedList自寫緩沖區(qū)
這個(gè)模型一直很經(jīng)典,學(xué)操作系統(tǒng)的時(shí)候還學(xué)過(guò),記得linux還用PV操作去實(shí)現(xiàn)它,不過(guò)這東西是跨學(xué)科的。
考慮緩存區(qū)buffer的使用者,生產(chǎn)者和消費(fèi)者,他們都能識(shí)別緩沖區(qū)是否滿的,且兩種各只能發(fā)出一種信號(hào):
生產(chǎn)者:它能發(fā)出notEmpty()信號(hào),即緩沖區(qū)非空信號(hào),當(dāng)它看到緩沖區(qū)滿的時(shí)候,它就調(diào)用await等待。
消費(fèi)者:它能發(fā)出notFull()信號(hào),即緩沖區(qū)未滿的信號(hào),當(dāng)它看到緩沖區(qū)空的時(shí)候,它也調(diào)用await等待。
看代碼:
import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;//生產(chǎn)者消費(fèi)者public class ConsumerProducer {private static Buffer buffer= new Buffer();public static void main(String[] args){ExecutorService executor = Executors.newFixedThreadPool(2);executor.execute(new ProducerTask());executor.execute(new ConsumerTask());executor.shutdown();}public static class ProducerTask implements Runnable{@Overridepublic void run() {int i=1;try {while(true){System.out.println("生產(chǎn)者寫入數(shù)據(jù)"+i);buffer.write(i++);Thread.sleep((int)(Math.random()*80));} }catch (InterruptedException e) {e.printStackTrace();}}}public static class ConsumerTask implements Runnable{public void run() {try {while(true){System.out.println("/t/t消費(fèi)讀出數(shù)據(jù)"+buffer.read());Thread.sleep((int)(Math.random()*100));}} catch (InterruptedException e) {e.printStackTrace();}}}public static class Buffer{private static final int CAPACTIY = 4; //緩沖區(qū)容量private java.util.LinkedList<Integer> queue = new java.util.LinkedList<Integer>();private static Lock lock = new ReentrantLock();private static Condition notEmpty = lock.newCondition();private static Condition notFull = lock.newCondition();public void write(int value){lock.lock();try{while(queue.size()==CAPACTIY){System.out.println("緩沖區(qū)爆滿");notFull.await();}queue.offer(value);notEmpty.signalAll(); //通知所有的緩沖區(qū)未空的情況}catch(InterruptedException ex){ex.printStackTrace();}finally{lock.unlock();}}@SuppressWarnings("finally")public int read(){int value = 0;lock.lock();try{while(queue.isEmpty()){System.out.println("/t/t緩沖區(qū)是空的,等待緩沖區(qū)非空的情況");notEmpty.await();}value = queue.remove();notFull.signal();}catch(InterruptedException ex){ex.printStackTrace();}finally{lock.unlock();return value;}}}}
運(yùn)行截圖
程序運(yùn)行正常,不過(guò)稍微延長(zhǎng)一下讀取時(shí)間,就會(huì)出現(xiàn)這樣的情況
程序里面設(shè)置的容量是4,可是這里卻可以存入最多5個(gè)數(shù)據(jù),而且更合理的情況應(yīng)該是初始緩沖區(qū)是空的,后面找了下這個(gè)小bug,原來(lái)是調(diào)用offer()函數(shù)應(yīng)該放在檢測(cè)語(yǔ)句之前,如果希望一開(kāi)始就調(diào)用ConsumerTask,在main方法里面調(diào)換兩者的順序即可。
3、用阻塞隊(duì)列快速實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型java的強(qiáng)大之處是它有著豐富的類庫(kù),我們學(xué)習(xí)java在某種程度上就是學(xué)習(xí)這些類庫(kù)。
阻塞隊(duì)列是這樣的一種隊(duì)列:當(dāng)試圖向一個(gè)滿隊(duì)列里添加元素 或者 從空隊(duì)列里刪除元素時(shí),隊(duì)列會(huì)讓線程自動(dòng)阻塞,且當(dāng)隊(duì)列滿時(shí),隊(duì)列會(huì)繼續(xù)存儲(chǔ)元素,供喚醒后的線程使用。這應(yīng)該說(shuō)是專門為消費(fèi)者生產(chǎn)者模型而設(shè)計(jì)的一種隊(duì)列吧,它實(shí)現(xiàn)了Queue接口,主要方法是put()和take()方法。

java支持三個(gè)具體的阻塞隊(duì)列ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue。都在java.util.concurrent包中。
簡(jiǎn)單描述上面三個(gè)阻塞隊(duì)列:
ArrayBlockingQueue: 該阻塞用數(shù)組實(shí)現(xiàn),按照FIFO,即先進(jìn)先出的原則對(duì)數(shù)據(jù)進(jìn)行排序,和數(shù)組的使用有點(diǎn)相似,它事先需要指定一個(gè)容量,不過(guò)即便隊(duì)列超出這個(gè)容量,也是不會(huì)報(bào)錯(cuò)滴。
LinkeddBlockingQueue:用鏈表實(shí)現(xiàn),默認(rèn)隊(duì)列大小是Integer.MAX_VALUE,也是按照先進(jìn)先出的方法對(duì)數(shù)據(jù)排序,性能可能比ArrayBlockingQueue,有待研究。
PriorityBlockingQueue:用優(yōu)先隊(duì)列實(shí)現(xiàn)的阻塞隊(duì)列,會(huì)對(duì)元素按照大小進(jìn)行排序,也可以創(chuàng)建不受限制的隊(duì)列,put方法永不阻塞。
ok,看代碼:
import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class ConsumerProducerUsingBlockQueue {private static ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<Integer>(2);public static void main(String[] args){ExecutorService executor = Executors.newFixedThreadPool(2);executor.execute(new Consumer());executor.execute(new Producer());try {Thread.sleep(100);executor.shutdownNow(); //暴力關(guān)閉,會(huì)報(bào)錯(cuò),不推薦} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}public static class Consumer implements Runnable{@Overridepublic void run() {try{int i=1;while(true){System.out.println("生成者寫入:"+i);buffer.put(i++);Thread.sleep((int)(Math.random())*1000);}}catch(InterruptedException ex){ex.printStackTrace();}}}public static class Producer implements Runnable{@Overridepublic void run() {try{while(true){System.out.println("/t/t消費(fèi)者取出"+buffer.take());Thread.sleep((int)(Math.random())*10000);}}catch(InterruptedException ex){ex.printStackTrace();}}}}
運(yùn)行截圖:
沒(méi)啥大的問(wèn)題,就是在關(guān)閉線程的時(shí)候太過(guò)暴力了,會(huì)報(bào)錯(cuò),線程里面的每一個(gè)函數(shù)都似乎值得研究,之前想通過(guò)Interrupt暫停,不過(guò)失敗了,就直接使用線程池執(zhí)行器的shoutdownNow方法來(lái)的。后面自己又用了另外一種關(guān)閉線程的方法,見(jiàn)下面代碼
使用LinkedBlockingQueue實(shí)現(xiàn)消費(fèi)者生產(chǎn)者且使用布爾變量控制線程關(guān)閉
import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue;public class A_Control_stop {private static LinkedBlockingQueue<String> buffer = new LinkedBlockingQueue<String>();public static void main(String[] args){ExecutorService executor = Executors.newFixedThreadPool(2);executor.execute(new Consumer());executor.execute(new Producer());executor.shutdown();while(!executor.isTerminated()){}System.out.println("所有的的線程都正常結(jié)束");}public static class Consumer implements Runnable{private volatile boolean exit = false;@Overridepublic void run() {try{int i=0;String[] str ={"as","d","sd","ew","sdfg","esfr"};while(!exit){System.out.println("生成者寫入:"+str[i]);buffer.put(str[i++]);Thread.sleep((int)(Math.random())*10);if(5==i){exit=true;}}}catch(InterruptedException ex){ex.printStackTrace();}}}public static class Producer implements Runnable{private volatile boolean exit = false;@Overridepublic void run() {try{int i=0;while(!exit){System.out.println("/t/t消費(fèi)者取出"+buffer.take());i++;Thread.sleep((int)(Math.random())*10);if(5==i){exit=true;}}}catch(InterruptedException ex){ex.printStackTrace();}}}}
截圖
關(guān)于阻塞隊(duì)列,覺(jué)得這篇文章講的不錯(cuò),推薦大家看看 聊聊并發(fā)----Java中的阻塞隊(duì)列
用了幾天,多線程算是學(xué)了點(diǎn)皮毛,附注一下:這幾天文章主要是參考了《java程序語(yǔ)言設(shè)計(jì)進(jìn)階篇第8版》,老外寫的書講的真心不錯(cuò),只不過(guò)現(xiàn)在java都已經(jīng)更新到j(luò)ava8了。在其他一些網(wǎng)站上看到自己的文章,沒(méi)有說(shuō)明轉(zhuǎn)載什么的,估計(jì)是直接“被采集”過(guò)去了。
本文出自于博客園蘭幽,轉(zhuǎn)載請(qǐng)說(shuō)明出處。
新聞熱點(diǎn)
疑難解答
圖片精選
網(wǎng)友關(guān)注