熱點推薦:
您现在的位置: 電腦知識網 >> 編程 >> Java編程 >> Java高級技術 >> 正文

Java多線程(五)之BlockingQueue深入分析

2013-11-23 19:50:37  來源: Java高級技術 

  概述

  BlockingQueue作為線程容器可以為線程同步提供有力的保障

  BlockingQueue定義的常用方法

  BlockingQueue定義的常用方法如下

  拋出異常 特殊值 阻塞 超時

  插入 add(e) offer(e) put(e) offer(e time unit)

  移除 remove() poll() take() poll(time unit)

  檢查 element() peek() 不可用 不可用

  )add(anObject)把anObject加到BlockingQueue裡即如果BlockingQueue可以容納則返回true否則招聘異常

  )offer(anObject)表示如果可能的話將anObject加到BlockingQueue裡即如果BlockingQueue可以容納則返回true否則返回false

  )put(anObject)把anObject加到BlockingQueue裡如果BlockQueue沒有空間則調用此方法的線程被阻斷直到BlockingQueue裡面有空間再繼續

  )poll(time)取走BlockingQueue裡排在首位的對象若不能立即取出則可以等time參數規定的時間取不到時返回null

  )take()取走BlockingQueue裡排在首位的對象若BlockingQueue為空阻斷進入等待狀態直到Blocking有新的對象被加入為止

  其中BlockingQueue 不接受null 元素試圖addput 或offer 一個null 元素時某些實現會拋出NullPointerExceptionnull 被用作指示poll 操作失敗的警戒值

  BlockingQueue的幾個注意點

  【】BlockingQueue 可以是限定容量的它在任意給定時間都可以有一個remainingCapacity超出此容量便無法無阻塞地put 附加元素沒有任何內部容量約束的BlockingQueue 總是報告IntegerMAX_VALUE 的剩余容量

  【】BlockingQueue 實現主要用於生產者使用者隊列但它另外還支持Collection 接口因此舉例來說使用remove(x) 從隊列中移除任意一個元素是有可能的然而這種操作通常不 會有效執行只能有計劃地偶爾使用比如在取消排隊信息時

  【】BlockingQueue 實現是線程安全的所有排隊方法都可以使用內部鎖或其他形式的並發控制來自動達到它們的目的然而大量的 Collection 操作(addAllcontainsAllretainAll 和removeAll)沒有 必要自動執行除非在實現中特別說明因此舉例來說在只添加了c 中的一些元素後addAll(c) 有可能失敗(拋出一個異常)

  【】BlockingQueue 實質上不 支持使用任何一種closeshutdown操作來指示不再添加任何項這種功能的需求和使用有依賴於實現的傾向例如一種常用的策略是對於生產者插入特殊的endofstream 或poison 對象並根據使用者獲取這些對象的時間來對它們進行解釋

  簡要概述BlockingQueue常用的四個實現類

  

  )ArrayBlockingQueue:規定大小的BlockingQueue其構造函數必須帶一個int參數來指明其大小其所含的對象是以FIFO(先入先出)順序排序的

  

  )LinkedBlockingQueue:大小不定的BlockingQueue若其構造函數帶一個規定大小的參數生成的BlockingQueue有大小限制若不帶大小參數所生成的BlockingQueue的大小由IntegerMAX_VALUE來決定其所含的對象是以FIFO(先入先出)順序排序的

  )PriorityBlockingQueue:類似於LinkedBlockQueue但其所含對象的排序不是FIFO而是依據對象的自然排序順序或者是構造函數的Comparator決定的順序

  )SynchronousQueue:特殊的BlockingQueue對其的操作必須是放和取交替完成的

  其中LinkedBlockingQueue和ArrayBlockingQueue比較起來它們背後所用的數據結構不一樣導致LinkedBlockingQueue的數據吞吐量要大於ArrayBlockingQueue但在線程數量很大時其性能的可預見性低於ArrayBlockingQueue

  具體BlockingQueue的實現類的內部細節

  有耐心的同學請看具體實現類細節

  ArrayBlockingQueue

  ArrayBlockingQueue是一個由數組支持的有界阻塞隊列此隊列按 FIFO(先進先出)原則對元素進行排序隊列的頭部 是在隊列中存在時間最長的元素隊列的尾部 是在隊列中存在時間最短的元素新元素插入到隊列的尾部隊列檢索操作則是從隊列頭部開始獲得元素

  這是一個典型的有界緩存區固定大小的數組在其中保持生產者插入的元素和使用者提取的元素一旦創建了這樣的緩存區就不能再增加其容量試圖向已滿隊列中放入元素會導致放入操作受阻塞試圖從空隊列中檢索元素將導致類似阻塞

  ArrayBlockingQueue創建的時候需要指定容量capacity(可以存儲的最大的元素個數因為它不會自動擴容)以及是否為公平鎖(fair參數)

  在創建ArrayBlockingQueue的時候默認創建的是非公平鎖不過我們可以在它的構造函數裡指定這裡調用ReentrantLock的構造函數創建鎖的時候調用了

  public ReentrantLock(boolean fair) {

  sync = (fair)? new FairSync() : new NonfairSync()

  }

  FairSync/ NonfairSync是ReentrantLock的內部類

  線程按順序請求獲得公平鎖而一個非公平鎖可以闖入且當它尚未進入等待隊列就會和等待隊列head結點的線程發生競爭如果鎖的狀態可用請求非公平鎖的線程可在等待隊列中向前跳躍獲得該鎖內部鎖synchronized沒有提供確定的公平性保證

  分三點來講這個類

   添加新元素的方法add/put/offer

   該類的幾個實例變量takeIndex/putIndex/count/

   Condition實現

   添加新元素的方法add/put/offer

  首先談到添加元素的方法首先得分析以下該類同步機制中用到的鎖

  Java代碼

  [java]

  lock = new ReentrantLock(fair)

  notEmpty = locknewCondition()//Condition Variable

  notFull =  locknewCondition()//Condition Variable

  這三個都是該類的實例變量只有一個鎖lock然後lock實例化出兩個ConditionnotEmpty/noFull分別用來協調多線程的讀寫操作

  Java代碼

  [java]

  public boolean offer(E e) {

  if (e == null) throw new NullPointerException()

  final ReentrantLock lock = thislock;//每個對象對應一個顯示的鎖

  locklock()//請求鎖直到獲得鎖(不可以被interrupte)

  try {

  if (count == itemslength)//如果隊列已經滿了

  return false;

  else {

  insert(e)

  return true;

  }

  } finally {

  lockunlock()//

  }

  }

  看insert方法

  private void insert(E x) {

  items[putIndex] = x;

  //增加全局index的值

  /*

  Inc方法體內部

  final int inc(int i) {

  return (++i == itemslength)? : i;

  }

  這裡可以看出ArrayBlockingQueue采用從前到後向內部數組插入的方式插入新元素的如果插完了putIndex可能重新變為(在已經執行了移除操作的前提下否則在之前的判斷中隊列為滿)

  */

  putIndex = inc(putIndex)

  ++count;

  notEmptysignal()//wake up one waiting thread

  }

  Java代碼

  [java]

  public void put(E e) throws InterruptedException {

  if (e == null) throw new NullPointerException()

  final E[] items = ems;

  final ReentrantLock lock = thislock;

  locklockInterruptibly()//請求鎖直到得到鎖或者變為interrupted

  try {

  try {

  while (count == itemslength)//如果滿了當前線程進入noFull對應的等waiting狀態

  notFullawait()

  } catch (InterruptedException ie) {

  notFullsignal() // propagate to noninterrupted thread

  throw ie;

  }

  insert(e)

  } finally {

  lockunlock()

  }

  }

  Java代碼

  [java]

  public boolean offer(E e long timeout TimeUnit unit)

  throws InterruptedException {

  if (e == null) throw new NullPointerException()

  long nanos = unittoNanos(timeout)

  final ReentrantLock lock = thislock;

  locklockInterruptibly()

  try {

  for () {

  if (count != itemslength) {

  insert(e)

  return true;

  }

  if (nanos <=

  return false;

  try {

  //如果沒有被 signal/interruptes需要等待nanos時間才返回

  nanos = notFullawaitNanos(nanos)

  } catch (InterruptedException ie) {

  notFullsignal() // propagate to noninterrupted thread

  throw ie;

  }

  }

  } finally {

  lockunlock()

  }

  }

  Java代碼

  [java]

  public boolean add(E e) {

  return superadd(e)

  }

  父類

  public boolean add(E e) {

  if (offer(e))

  return true;

  else

  throw new IllegalStateException(Queue full

  }

   該類的幾個實例變量takeIndex/putIndex/count

  Java代碼

  [java]

  用三個數字來維護這個隊列中的數據變更

  /** items index for next take poll or remove */

  private int takeIndex;

  /** items index for next put offer or add */

  private int putIndex;

  /** Number of items in the queue */

  private int count;

  提取元素的三個方法take/poll/remove內部都調用了這個方法

  Java代碼

  [java]

  private E extract() {

  final E[] items = ems;

  E x = items[takeIndex];

  items[takeIndex] = null;//移除已經被提取出的元素

  takeIndex = inc(takeIndex)//策略和添加元素時相同

  count;

  notFullsignal()//提醒其他在notFull這個Condition上waiting的線程可以嘗試工作了

  return x;

  }

  從這個方法裡可見tabkeIndex維護一個可以提取/移除元素的索引位置因為takeIndex是從遞增的所以這個類是FIFO隊列

  putIndex維護一個可以插入的元素的位置索引

  count顯然是維護隊列中已經存在的元素總數

   Condition實現

  Condition現在的實現只有ncurrentlocksAbstractQueueSynchoronizer內部的ConditionObject並且通過ReentranLock的newCondition()方法暴露出來這是因為Condition的await()/sinal()一般在locklock()與lockunlock()之間執行當執行conditionawait()方法時它會首先釋放掉本線程持有的鎖然後自己進入等待隊列直到sinal()喚醒後又會重新試圖去拿到鎖拿到後執行await()下的代碼其中釋放當前鎖和得到當前鎖都需要ReentranLock的tryAcquire(int arg)方法來判定並且享受ReentranLock的重進入特性

  Java代碼

  [java]

  public final void await() throws InterruptedException {

  if (Threadinterrupted())

  throw new InterruptedException()

  //加一個新的condition等待節點

  Node node = addConditionWaiter()

  //釋放自己的鎖

  int savedState = fullyRelease(node)

  int interruptMode = ;

  while (!isOnSyncQueue(node)) {

  //如果當前線程 等待狀態時CONDITIONpark住當前線程等待condition的signal來解除

  LockSupportpark(this)

  if ((interruptMode = checkInterruptWhileWaiting(node)) !=

  break;

  }

  if (acquireQueued(node savedState) && interruptMode != THROW_IE)

  interruptMode = REINTERRUPT;

  if (nodenextWaiter != null)

  unlinkCancelledWaiters()

  if (interruptMode !=

  reportInterruptAfterWait(interruptMode)

  }

  SynchronousQueue

  一種阻塞隊列其中每個 put 必須等待一個 take反之亦然同步隊列沒有任何內部容量甚至連一個隊列的容量都沒有不能在同步隊列上進行 peek因為僅在試圖要取得元素時該元素才存在除非另一個線程試圖移除某個元素否則也不能(使用任何方法)添加元素也不能迭代隊列因為其中沒有元素可用於迭代隊列的頭 是嘗試添加到隊列中的首個已排隊線程元素如果沒有已排隊線程則不添加元素並且頭為 null對於其他Collection 方法(例如 contains)SynchronousQueue 作為一個空集合此隊列不允許 null 元素

  同步隊列類似於 CSP 和 Ada 中使用的 rendezvous 信道它非常適合於傳遞性設計在這種設計中在一個線程中運行的對象要將某些信息事件或任務傳遞給在另一個線程中運行的對象它就必須與該對象同步

  對於正在等待的生產者和使用者線程而言此類支持可選的公平排序策略默認情況下不保證這種排序但是使用公平設置為 true 所構造的隊列可保證線程以 FIFO 的順序進行訪問公平通常會降低吞吐量但是可以減小可變性並避免得不到服務

  LinkedBlockingQueue

  一個基於已鏈接節點的范圍任意的 blocking queue此隊列按 FIFO(先進先出)排序元素隊列的頭部 是在隊列中時間最長的元素隊列的尾部 是在隊列中時間最短的元素新元素插入到隊列的尾部並且隊列檢索操作會獲得位於隊列頭部的元素鏈接隊列的吞吐量通常要高於基於數組的隊列但是在大多數並發應用程序中其可預知的性能要低

  單向鏈表結構的隊列如果不指定容量默認為IntegerMAX_VALUE通過putLock和takeLock兩個鎖進行同步兩個鎖分別實例化notFull和notEmpty兩個Condtion用來協調多線程的存取動作其中某些方法(如removetoArraytoStringclear等)的同步需要同時獲得這兩個鎖並且總是先putLocklock緊接著takeLocklock(在同一方法fullyLock中)這樣的順序是為了避免可能出現的死鎖情況(我也想不明白為什麼會是這樣?)

  PriorityBlockingQueue

  一個無界的阻塞隊列它使用與類 PriorityQueue 相同的順序規則並且提供了阻塞檢索的操作雖然此隊列邏輯上是無界的但是由於資源被耗盡所以試圖執行添加操作可能會失敗(導致 OutOfMemoryError)此類不允許使用 null 元素依賴自然順序的優先級隊列也不允許插入不可比較的對象(因為這樣做會拋出ClassCastException)

  看它的三個屬性就基本能看懂這個類了

  Java代碼

  [java]

  private final PriorityQueue q;

  private final ReentrantLock lock = new ReentrantLock(true)

  private final Condition notEmpty = locknewCondition()

  lock說明本類使用一個lock來同步讀寫等操作

  notEmpty協調隊列是否有新元素提供而隊列滿了以後會調用PriorityQueue的grow方法來擴容

  DelayQueue

  Delayed 元素的一個無界阻塞隊列只有在延遲期滿時才能從中提取元素該隊列的頭部 是延遲期滿後保存時間最長的 Delayed 元素如果延遲都還沒有期滿則隊列沒有頭部並且 poll 將返回 null當一個元素的getDelay(TimeUnitNANOSECONDS) 方法返回一個小於或等於零的值時則出現期滿此隊列不允許使用 null 元素

  Delayed接口繼承自Comparable我們插入的E元素都要實現這個接口

  DelayQueue的設計目的間API文檔

  An unbounded blocking queue of Delayed elements in which an element can only be taken when its delay has expired The head of the queue is that Delayed element whose delay expired furthest in the past If no delay has expired there is no head and poll will returnnull Expiration occurs when an elements getDelay(TimeUnitNANOSECONDS) method returns a value less than or equal to zero Even though unexpired elements cannot be removed using take or poll they are otherwise treated as normal elements For example the size method returns the count of both expired and unexpired elements This queue does not permit null elements

  因為DelayQueue構造函數了裡限定死不允許傳入comparator(之前的PriorityBlockingQueue中沒有限定死)即只能在compare方法裡定義優先級的比較規則再看上面這段英文The head of the queue is that Delayed element whose delay expired furthest in the past說明compare方法實現的時候要保證最先加入的元素最早結束延時Expiration occurs when an elements getDelay(TimeUnitNANOSECONDS) method returns a value less than or equal to zero說明getDelay方法的實現必須保證延時到了返回的值變為<=的int

  上面這段英文中還說明了在poll/take的時候隊列中元素會判定這個elment有沒有達到超時時間如果沒有達到poll返回null而take進入等待狀態但是除了這兩個方法隊列中的元素會被當做正常的元素來對待例如size方法返回所有元素的數量而不管它們有沒有達到超時時間而協調的Condition available只對take和poll是有意義的

  另外需要補充的是在ScheduledThreadPoolExecutor中工作隊列類型是它的內部類DelayedWorkQueue而DelayedWorkQueue的Task容器是DelayQueue類型而ScheduledFutureTask作為Delay的實現類作為Runnable的封裝後的Task類也就是說ScheduledThreadPoolExecutor是通過DelayQueue優先級判定規則來執行任務的

  BlockingDque+LinkedBlockingQueue

  BlockingDque為阻塞雙端隊列接口實現類有LinkedBlockingDque雙端隊列特別之處是它首尾都可以操作LinkedBlockingDque不同於LinkedBlockingQueue它只用一個lock來維護讀寫操作並由這個lock實例化出兩個Condition notEmpty及notFull而LinkedBlockingQueue讀和寫分別維護一個lock


From:http://tw.wingwit.com/Article/program/Java/gj/201311/27544.html
    Copyright © 2005-2013 電腦知識網 Computer Knowledge   All rights reserved.