一、概念
生產(chǎn)者與消費(fèi)者問(wèn)題是一個(gè)金典的多線程協(xié)作的問(wèn)題.生產(chǎn)者負(fù)責(zé)生產(chǎn)產(chǎn)品,并將產(chǎn)品存放到倉(cāng)庫(kù);消費(fèi)者從倉(cāng)庫(kù)中獲取產(chǎn)品并消費(fèi)。當(dāng)倉(cāng)庫(kù)滿時(shí),生產(chǎn)者必須停止生產(chǎn),直到倉(cāng)庫(kù)有位置存放產(chǎn)品;當(dāng)倉(cāng)庫(kù)空時(shí),消費(fèi)者必須停止消費(fèi),直到倉(cāng)庫(kù)中有產(chǎn)品。
解決生產(chǎn)者/消費(fèi)者問(wèn)題主要用到如下幾個(gè)技術(shù):1.用線程模擬生產(chǎn)者,在run方法中不斷地往倉(cāng)庫(kù)中存放產(chǎn)品。2.用線程模擬消費(fèi)者,在run方法中不斷地從倉(cāng)庫(kù)中獲取產(chǎn)品。3
. 倉(cāng)庫(kù)類保存產(chǎn)品,當(dāng)產(chǎn)品數(shù)量為0時(shí),調(diào)用wait方法,使得當(dāng)前消費(fèi)者線程進(jìn)入等待狀態(tài),當(dāng)有新產(chǎn)品存入時(shí),調(diào)用notify方法,喚醒等待的消費(fèi)者線程。當(dāng)倉(cāng)庫(kù)滿時(shí),調(diào)用wait方法,使得當(dāng)前生產(chǎn)者線程進(jìn)入等待狀態(tài),當(dāng)有消費(fèi)者獲取產(chǎn)品時(shí),調(diào)用notify方法,喚醒等待的生產(chǎn)者線程。
二、實(shí)例
package book.thread.product;public class Consumer extends Thread{  private Warehouse warehouse;//消費(fèi)者獲取產(chǎn)品的倉(cāng)庫(kù)  private boolean running = false;//是否需要結(jié)束線程的標(biāo)志位  public Consumer(Warehouse warehouse,String name){    super(name);    this.warehouse = warehouse;  }  public void start(){    this.running = true;    super.start();  }  public void run(){    Product product;    try {      while(running){        //從倉(cāng)庫(kù)中獲取產(chǎn)品        product = warehouse.getProduct();        sleep(500);      }    } catch (InterruptedException e) {      e.printStackTrace();    }  }   //停止消費(fèi)者線程  public void stopConsumer(){    synchronized(warehouse){      this.running = false;      warehouse.notifyAll();//通知等待倉(cāng)庫(kù)的線程    }  }  //消費(fèi)者線程是否在運(yùn)行  public boolean isRunning(){    return running;  }} package book.thread.product;public class Producer extends Thread{   private Warehouse warehouse;//生產(chǎn)者存儲(chǔ)產(chǎn)品的倉(cāng)庫(kù)  private static int produceName = 0;//產(chǎn)品的名字  private boolean running = false;//是否需要結(jié)束線程的標(biāo)志位  public Producer(Warehouse warehouse,String name){    super(name);    this.warehouse = warehouse;  }  public void start(){    this.running = true;    super.start();  }  public void run(){    Product product;    //生產(chǎn)并存儲(chǔ)產(chǎn)品    try {    while(running){      product = new Product((++produceName)+"");      this.warehouse.storageProduct(product);      sleep(300);      }    } catch (InterruptedException e) {      e.printStackTrace();    }  }  //停止生產(chǎn)者線程  public void stopProducer(){    synchronized(warehouse){      this.running = false;      //通知等待倉(cāng)庫(kù)的線程      warehouse.notifyAll();    }  }  //生產(chǎn)者線程是否在運(yùn)行  public boolean isRunning(){    return running;  }} package book.thread.product;public class Product {  private String name;//產(chǎn)品名  public Product(String name){    this.name = name;  }  public String toString(){    return "Product-"+name;  }} package book.thread.product;//產(chǎn)品的倉(cāng)庫(kù)類,內(nèi)部采用數(shù)組來(lái)表示循環(huán)隊(duì)列,以存放產(chǎn)品public class Warehouse {  private static int CAPACITY = 11;//倉(cāng)庫(kù)的容量  private Product[] products;//倉(cāng)庫(kù)里的產(chǎn)品  //[front,rear]區(qū)間的產(chǎn)品未被消費(fèi)  private int front = 0;//當(dāng)前倉(cāng)庫(kù)中第一個(gè)未被消費(fèi)的產(chǎn)品的下標(biāo)  private int rear = 0;//倉(cāng)庫(kù)中最后一個(gè)未被消費(fèi)的產(chǎn)品下標(biāo)加1  public Warehouse(){    this.products = new Product[CAPACITY];  }  public Warehouse(int capacity){    this();    if(capacity > 0){      CAPACITY = capacity +1;      this.products = new Product[CAPACITY];    }  }  //從倉(cāng)庫(kù)獲取一個(gè)產(chǎn)品  public Product getProduct() throws InterruptedException{    synchronized(this){      boolean consumerRunning = true;//標(biāo)志消費(fèi)者線程是否還在運(yùn)行      Thread currentThread = Thread.currentThread();//獲取當(dāng)前線程      if(currentThread instanceof Consumer){        consumerRunning = ((Consumer)currentThread).isRunning();      }else{        return null;//非消費(fèi)者不能獲取產(chǎn)品      }      //若消費(fèi)者線程在運(yùn)行中,但倉(cāng)庫(kù)中沒(méi)有產(chǎn)品了,則消費(fèi)者線程繼續(xù)等待      while((front==rear) && consumerRunning){        wait();        consumerRunning = ((Consumer)currentThread).isRunning();      }      //如果消費(fèi)者線程已經(jīng)停止運(yùn)行,則退出該方法,取消獲取產(chǎn)品      if(!consumerRunning){        return null;      }      //獲取當(dāng)前未被消費(fèi)的第一個(gè)產(chǎn)品      Product product = products[front];      System.out.println("Consumer[" + currentThread.getName()+"] getProduct:"+product);      //將當(dāng)前未被消費(fèi)產(chǎn)品的下標(biāo)后移一位,如果到了數(shù)組末尾,則移動(dòng)到首部      front = (front+1+CAPACITY)%CAPACITY;      System.out.println("倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:"+(rear+CAPACITY-front)%CAPACITY);      //通知其他等待線程      notify();      return product;    }  }  //向倉(cāng)庫(kù)存儲(chǔ)一個(gè)產(chǎn)品  public void storageProduct(Product product) throws InterruptedException{  synchronized(this){    boolean producerRunning = true;//標(biāo)志生產(chǎn)者線程是否在運(yùn)行    Thread currentThread = Thread.currentThread();    if(currentThread instanceof Producer){      producerRunning = ((Producer)currentThread).isRunning();    }else{      return;    }    //如果最后一個(gè)未被消費(fèi)的產(chǎn)品與第一個(gè)未被消費(fèi)的產(chǎn)品的下標(biāo)緊挨著,則說(shuō)明沒(méi)有存儲(chǔ)空間了。    //如果沒(méi)有存儲(chǔ)空間了,而生產(chǎn)者線程還在運(yùn)行,則生產(chǎn)者線程等待倉(cāng)庫(kù)釋放產(chǎn)品    while(((rear+1)%CAPACITY == front) && producerRunning){      wait();      producerRunning = ((Producer)currentThread).isRunning();    }    //如果生產(chǎn)線程已經(jīng)停止運(yùn)行了,則停止產(chǎn)品的存儲(chǔ)    if(!producerRunning){      return;    }    //保存產(chǎn)品到倉(cāng)庫(kù)    products[rear] = product;    System.out.println("Producer[" + Thread.currentThread().getName()+"] storageProduct:" + product);    //將rear下標(biāo)循環(huán)后移一位    rear = (rear + 1)%CAPACITY;    System.out.println("倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:"+(rear + CAPACITY -front)%CAPACITY);    notify();    }  }} package book.thread.product;public class TestProduct {  public static void main(String[] args) {    Warehouse warehouse = new Warehouse(10);//建立一個(gè)倉(cāng)庫(kù),容量為10    //建立生產(chǎn)者線程和消費(fèi)者    Producer producers1 = new Producer(warehouse,"producer-1");    Producer producers2 = new Producer(warehouse,"producer-2");    Producer producers3 = new Producer(warehouse,"producer-3");    Consumer consumer1 = new Consumer(warehouse,"consumer-1");    Consumer consumer2 = new Consumer(warehouse,"consumer-2");    Consumer consumer3 = new Consumer(warehouse,"consumer-3");    Consumer consumer4 = new Consumer(warehouse,"consumer-4");    //啟動(dòng)生產(chǎn)者線程和消費(fèi)者線程    producers1.start();    producers2.start();    consumer1.start();    producers3.start();    consumer2.start();    consumer3.start();    consumer4.start();    //讓生產(chǎn)者/消費(fèi)者程序運(yùn)行1600ms    try {      Thread.sleep(1600);    } catch (InterruptedException e) {      e.printStackTrace();    }    //停止消費(fèi)者線程    producers1.stopProducer();    consumer1.stopConsumer();    producers2.stopProducer();    consumer2.stopConsumer();    producers3.stopProducer();    consumer3.stopConsumer();    consumer4.stopConsumer();  }}輸出結(jié)果:
Producer[producer-1] storageProduct:Product-1倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:1Consumer[consumer-2] getProduct:Product-1倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:0Producer[producer-3] storageProduct:Product-3倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:1Producer[producer-2] storageProduct:Product-2倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:2Consumer[consumer-3] getProduct:Product-3倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:1Consumer[consumer-1] getProduct:Product-2倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:0Producer[producer-1] storageProduct:Product-4倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:1Consumer[consumer-4] getProduct:Product-4倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:0Producer[producer-3] storageProduct:Product-6倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:1Producer[producer-2] storageProduct:Product-5倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:2Consumer[consumer-1] getProduct:Product-6倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:1Consumer[consumer-2] getProduct:Product-5倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:0Producer[producer-1] storageProduct:Product-7倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:1Consumer[consumer-3] getProduct:Product-7倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:0Producer[producer-3] storageProduct:Product-8倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:1Producer[producer-2] storageProduct:Product-9倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:2Consumer[consumer-4] getProduct:Product-8倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:1Producer[producer-1] storageProduct:Product-10倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:2Producer[producer-3] storageProduct:Product-11倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:3Producer[producer-2] storageProduct:Product-12倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:4Consumer[consumer-1] getProduct:Product-9倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:3Consumer[consumer-2] getProduct:Product-10倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:2Consumer[consumer-3] getProduct:Product-11倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:1Producer[producer-3] storageProduct:Product-13倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:2Producer[producer-1] storageProduct:Product-14倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:3Producer[producer-2] storageProduct:Product-15倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:4Consumer[consumer-4] getProduct:Product-12倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:3Consumer[consumer-1] getProduct:Product-13倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:2Consumer[consumer-2] getProduct:Product-14倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:1Producer[producer-1] storageProduct:Product-16倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:2Producer[producer-3] storageProduct:Product-17倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:3Producer[producer-2] storageProduct:Product-18倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:4
分析:在main方法中建立了一個(gè)產(chǎn)品倉(cāng)庫(kù),并未該倉(cāng)庫(kù)關(guān)聯(lián)了3個(gè)生產(chǎn)者線程和4個(gè)消費(fèi)者線程,啟動(dòng)這些線程,使生產(chǎn) 者/消費(fèi)者模型運(yùn)作起來(lái),當(dāng)程序運(yùn)行1600ms時(shí),所有的生產(chǎn)者停止生產(chǎn)產(chǎn)品,消費(fèi)者停止消費(fèi)產(chǎn)品。
生產(chǎn)者線程Product在run方法中沒(méi)300ms便生產(chǎn)一個(gè)產(chǎn)品,并存入倉(cāng)庫(kù);消費(fèi)者線程Consumer在run方法中沒(méi)500ms便從倉(cāng)庫(kù)中取一個(gè)產(chǎn)品。
倉(cāng)庫(kù)類Warehouse負(fù)責(zé)存放產(chǎn)品和發(fā)放產(chǎn)品。storageProduct方法負(fù)責(zé)存儲(chǔ)產(chǎn)品,當(dāng)倉(cāng)庫(kù)滿時(shí),當(dāng)前線程進(jìn)入等待狀態(tài),即如果生產(chǎn)者線程A在調(diào)用storageProduct方法以存儲(chǔ)產(chǎn)品時(shí),發(fā)現(xiàn)倉(cāng)庫(kù)已滿,無(wú)法存儲(chǔ)時(shí),便會(huì)進(jìn)入等待狀態(tài)。當(dāng)存儲(chǔ)產(chǎn)品成功時(shí),調(diào)用notify方法,喚醒等待的消費(fèi)者線程。
getProduct方法負(fù)責(zé)提前產(chǎn)品,當(dāng)倉(cāng)庫(kù)空時(shí),當(dāng)前線程進(jìn)入等待狀態(tài),即如果消費(fèi)者線程B在調(diào)用getProduct方法以獲取產(chǎn)品時(shí),發(fā)現(xiàn)倉(cāng)庫(kù)空了,便會(huì)進(jìn)入等待狀態(tài)。當(dāng)提取產(chǎn)品成功時(shí),調(diào)用notify方法,喚醒等待的生產(chǎn)者線程。
以上這篇淺談java線程中生產(chǎn)者與消費(fèi)者的問(wèn)題就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持武林網(wǎng)。
新聞熱點(diǎn)
疑難解答
圖片精選