熱點推薦:
您现在的位置: 電腦知識網 >> 編程 >> Java編程 >> Java高級技術 >> 正文

由生產者/消費者問題看JAVA多線程

2013-11-23 19:52:46  來源: Java高級技術 

  生產者消費者問題是研究多線程程序時繞不開的問題它的描述是有一塊生產者和消費者共享的有界緩沖區生產者往緩沖區放入產品消費者從緩沖區取走產品這個過程可以無休止的執行不能因緩沖區滿生產者放不進產品而終止也不能因緩沖區空消費者無產品可取而終止

  解決生產者消費者問題的方法有兩種一種是采用某種機制保持生產者和消費者之間的同步一種是在生產者和消費者之間建立一個管道前一種有較高的效率並且可控制性較好比較常用後一種由於管道緩沖區不易控制及被傳輸數據對象不易封裝等原因比較少用

  同步問題的核心在於CPU是按時間片輪詢的方式執行程序我們無法知道某一個線程是否被執行是否被搶占是否結束等因此生產者完全可能當緩沖區已滿的時候還在放入產品消費者也完全可能當緩沖區為空時還在取出產品

  現在同步問題的解決方法一般是采用信號或者加鎖機制即生產者線程當緩沖區已滿時放棄自己的執行權進入等待狀態並通知消費者線程執行消費者線程當緩沖區已空時放棄自己的執行權進入等待狀態並通知生產者線程執行這樣一來就保持了線程的同步並避免了線程間互相等待而進入死鎖狀態

  JAVA語言提供了獨立於平台的線程機制保持了write once run anywhere的特色同時也提供了對同步機制的良好支持

  在JAVA中一共有四種方法支持同步其中三個是同步方法一個是管道方法

   方法wait()/notify()

   方法await()/signal()

   阻塞隊列方法BlockingQueue

   管道方法PipedInputStream/PipedOutputStream

  下面我們看各個方法的實現

   方法wait()/notify()

  wait()和notify()是根類Object的兩個方法也就意味著所有的JAVA類都會具有這個兩個方法為什麼會被這樣設計呢?我們可以認為所有的對象默認都具有一個鎖雖然我們看不到也沒有辦法直接操作但它是存在的

  wait()方法表示當緩沖區已滿或空時生產者或消費者線程停止自己的執行放棄鎖使自己處於等待狀態讓另一個線程開始執行

  notify()方法表示當生產者或消費者對緩沖區放入或取出一個產品時向另一個線程發出可執行通知同時放棄鎖使自己處於等待狀態

  下面是一個例子代碼

  import javautilLinkedList;

  public class Sycn{

  private LinkedList<Object> myList =new LinkedList<Object>();

  private int MAX = ;

  public Sycn(){

  }

  public void start(){

  new Producer()start();

  new Consumer()start();

  }

  public static void main(String[] args) throws Exception{

  Sycn s = new Sycn();

  sstart();

  }

  class Producer extends Thread{

  public void run(){

  while(true){

  synchronized(myList){

  try{

  while(myListsize() == MAX){

  Systemoutprintln(warning: its full!);

  myListwait();

  }

  Object o = new Object();

  if(myListadd(o)){

  Systemoutprintln(Producer: + o);

  myListnotify();

  }

  }catch(InterruptedException ie){

  Systemoutprintln(producer is interrupted!);

  }

  }

  }

  }

  }

  class Consumer extends Thread{

  public void run(){

  while(true){

  synchronized(myList){

  try{

  while(myListsize() == ){

  Systemoutprintln(warning: its empty!);

  myListwait();

  }

  Object o = myListremoveLast();

  Systemoutprintln(Consumer: + o);

  myListnotify();

  }catch(InterruptedException ie){

  Systemoutprintln(consumer is interrupted!);

  }

  }

  }

  }

  }

  }

   方法await()/signal()

  在JDK以後JAVA提供了新的更加健壯的線程處理機制包括了同步鎖定線程池等等它們可以實現更小粒度上的控制await()和signal()就是其中用來做同步的兩種方法它們的功能基本上和wait()/notify()相同完全可以取代它們但是它們和新引入的鎖定機制Lock直接掛鉤具有更大的靈活性

  下面是一個例子代碼

  import javautilLinkedList;

  import ncurrentlocks*;

  public class Sycn{

  private LinkedList<Object> myList = new LinkedList<Object>();

  private int MAX = ;

  private final Lock lock = new ReentrantLock();

  private final Condition full = locknewCondition();

  private final Condition empty = locknewCondition();

  public Sycn(){

  }

  public void start(){

  new Producer()start();

  new Consumer()start();

  }

  public static void main(String[] args) throws Exception{

  Sycn s = new Sycn();

  sstart();

  }

  class Producer extends Thread{

  public void run(){

  while(true){

  locklock();

  try{

  while(myListsize() == MAX){

  Systemoutprintln(warning: its full!);

  fullawait();

  }

  Object o = new Object();

  if(myListadd(o)){

  Systemoutprintln(Producer: + o);

  emptysignal();

  }

  }catch(InterruptedException ie){

  Systemoutprintln(producer is interrupted!);

  }finally{

  lockunlock();

  }

  }

  }

  }

  class Consumer extends Thread{

  public void run(){

  while(true){

  locklock();

  try{

  while(myListsize() == ){

  Systemoutprintln(warning: its empty!);

  emptyawait();

  }

  Object o = myListremoveLast();

  Systemoutprintln(Consumer: + o);

  fullsignal();

  }catch(InterruptedException ie){

  Systemoutprintln(consumer is interrupted!);

  }finally{

  lockunlock();

  }

  }

  }

  }

  }

   阻塞隊列方法BlockingQueue

  BlockingQueue也是JDK的一部分它是一個已經在內部實現了同步的隊列實現方式采用的是我們的第種await()/signal()方法它可以在生成對象時指定容量大小

  它用於阻塞操作的是put()和take()方法

  put()方法類似於我們上面的生產者線程容量最大時自動阻塞

  take()方法類似於我們上面的消費者線程容量為自動阻塞

  下面是一個例子代碼

  import ncurrent*;

  public class Sycn{

  private LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();

  private int MAX = ;

  public Sycn(){

  }

  public void start(){

  new Producer()start();

  new Consumer()start();

  }

  public static void main(String[] args) throws Exception{

  Sycn s = new Sycn();

  sstart();

  }

  class Producer extends Thread{

  public void run(){

  while(true){

  //synchronized(this){

  try{

  if(queuesize() == MAX)

  Systemoutprintln(warning: its full!);

  Object o = new Object();

  queueput(o);

  Systemoutprintln(Producer: + o);

  }catch(InterruptedException e){

  Systemoutprintln(producer is interrupted!);

  }

  //}

  }

  }

  }

  class Consumer extends Thread{

  public void run(){

  while(true){

  //synchronized(this){

  try{

  if(queuesize() == )

  Systemoutprintln(warning: its empty!);

  Object o = queuetake();

  Systemoutprintln(Consumer: + o);

  }catch(InterruptedException e){

  Systemoutprintln(producer is interrupted!);

  }

  //}

  }

  }

  }

  }

  你發現這個例子中的問題了嗎?

  如果沒有我建議你運行一下這段代碼仔細觀察它的輸出是不是有下面這個樣子的?為什麼會這樣呢?

  …

  warning: its full!

  Producer: javalangobject@ea

  …

  你可能會說這是因為put()和Systemoutprintln()之間沒有同步造成的我也這樣認為我也這樣認為但是你把run()中的synchronized前面的注釋去掉重新編譯運行有改觀嗎?沒有為什麼?

  這是因為當緩沖區已滿生產者在put()操作時put()內部調用了await()方法放棄了線程的執行然後消費者線程執行調用take()方法take()內部調用了signal()方法通知生產者線程可以執行致使在消費者的println()還沒運行的情況下生產者的println()先被執行所以有了上面的輸出run()中的synchronized其實並沒有起什麼作用

  對於BlockingQueue大家可以放心使用這可不是它的問題只是在它和別的對象之間的同步有問題

  對於這種多重嵌套同步的問題以後再談吧歡迎大家討論啊!

   管道方法PipedInputStream/PipedOutputStream

  這個類位於javaio包中是解決同步問題的最簡單的辦法一個線程將數據寫入管道另一個線程從管道讀取數據這樣便構成了一種生產者/消費者的緩沖區編程模式

  下面是一個例子代碼在這個代碼我沒有使用Object對象而是簡單的讀寫字節值這是因為PipedInputStream/PipedOutputStream不允許傳輸對象這是JAVA本身的一個bug具體的大家可以看sun的解釋_bugdo?bug_id=

  import javaio*;

  public class Sycn{

  private PipedOutputStream pos;

  private PipedInputStream pis;

  //private ObjectOutputStream oos;

  //private ObjectInputStream ois;

  public Sycn(){

  try{

  pos = new PipedOutputStream();

  pis = new PipedInputStream(pos);

  //oos = new ObjectOutputStream(pos);

  //ois = new ObjectInputStream(pis);

  }catch(IOException e){

  Systemoutprintln(e);

  }

  }

  public void start(){

  new Producer()start();

  new Consumer()start();

  }

  public static void main(String[] args) throws Exception{

  Sycn s = new Sycn();

  sstart();

  }

  class Producer extends Thread{

  public void run() {

  try{

  while(true){

  int b = (int) (Mathrandom() * );

  Systemoutprintln(Producer: a byte the value is + b);

  poswrite(b);

  posflush();

  //Object o = new MyObject();

  //ooswriteObject(o);

  //oosflush();

  //Systemoutprintln(Producer: + o);

  }

  }catch(Exception e){

  //Systemoutprintln(e);

  eprintStackTrace();

  }finally{

  try{

  posclose();

  pisclose();

  //oosclose();

  //oisclose();

  }catch(IOException e){

  Systemoutprintln(e);

  }

  }

  }

  }

  class Consumer extends Thread{

  public void run(){

  try{

  while(true){

  int b = pisread();

  Systemoutprintln(Consumer: a byte the value is + StringvalueOf(b));

  //Object o = oisreadObject();

  //if(o != null)

  //Systemoutprintln(Consumer: + o);

  }

  }catch(Exception e){

  //Systemoutprintln(e);

  eprintStackTrace();

  }finally{

  try{

  posclose();

  pisclose();

  //oosclose();

  //oisclose();

  }catch(IOException e){

  Systemoutprintln(e);

  }

  }

  }

  }

  //class MyObject implements Serializable {

  //}

  }


From:http://tw.wingwit.com/Article/program/Java/gj/201311/27617.html
    推薦文章
    Copyright © 2005-2013 電腦知識網 Computer Knowledge   All rights reserved.