熱點推薦:
您现在的位置: 電腦知識網 >> 編程 >> Java編程 >> Java核心技術 >> 正文

java阻塞隊列 線程同步合作

2022-06-13   來源: Java核心技術 

  Queue接口與ListSet同一級別都是繼承了Collection接口LinkedList實現了Queue接口Queue接口窄化了對LinkedList的方法的訪問權限(即在方法中的參數類型如果是Queue時就完全只能訪問Queue接口所定義的方法了而不能直接訪問 LinkedList的非Queue的方法)以使得只有恰當的方法才可以使用BlockingQueue 繼承了Queue接口

  隊列是一種數據結構.它有兩個基本操作在隊列尾部加人一個元素和從隊列頭部移除一個元素就是說隊列以一種先進先出的方式管理數據如果你試圖向一個已經滿了的阻塞隊列中添加一個元素或者是從一個空的阻塞隊列中移除一個元索將導致線程阻塞.在多線程進行合作時阻塞隊列是很有用的工具工作者線程可以定期地把中間結果存到阻塞隊列中而其他工作者線線程把中間結果取出並在將來修改它們隊列會自動平衡負載如果第一個線程集運行得比第二個慢則第二個線程集在等待結果時就會阻塞如果第一個線程集運行得快那麼它將等待第二個線程集趕上來下表顯示了jdk中的阻塞隊列的操作

  add        增加一個元索                     如果隊列已滿則拋出一個IIIegaISlabEepeplian異常

  remove   移除並返回隊列頭部的元素    如果隊列為空則拋出一個NoSuchElementException異常

  element  返回隊列頭部的元素             如果隊列為空則拋出一個NoSuchElementException異常

  offer       添加一個元素並返回true       如果隊列已滿則返回false

  poll         移除並返問隊列頭部的元素    如果隊列為空則返回null

  peek       返回隊列頭部的元素             如果隊列為空則返回null

  put         添加一個元素                      如果隊列滿則阻塞

  take        移除並返回隊列頭部的元素     如果隊列為空則阻塞

  removeelementoffer pollpeek 其實是屬於Queue接口

  阻塞隊列的操作可以根據它們的響應方式分為以下三類aadremovee和element操作在你試圖為一個已滿的隊列增加元素或從空隊列取得元素時拋出異常當然在多線程程序中隊列在任何時間都可能變成滿的或空的所以你可能想使用offerpollpeek方法這些方法在無法完成任務時只是給出一個出錯示而不會拋出異常

  注意poll和peek方法出錯進返回null因此向隊列中插入null值是不合法的

  還有帶超時的offer和poll方法變種例如下面的調用

  boolean success = qoffer(xTimeUnitMILLISECONDS);

  嘗試在毫秒內向隊列尾部插入一個元素如果成功立即返回true否則當到達超時進返回false同樣地調用

  Object head = qpoll( TimeUnitMILLISECONDS);

  如果在毫秒內成功地移除了隊列頭元素則立即返回頭元素否則在到達超時時返回null

  最後我們有阻塞操作put和takeput方法在隊列滿時阻塞take方法在隊列空時阻塞

  ncurrent包提供了阻塞隊列的個變種默認情況下LinkedBlockingQueue的容量是沒有上限的(說的不准確在不指定時容量為IntegerMAX_VALUE不要然的話在put時怎麼會受阻呢)但是也可以選擇指定其最大容量它是基於鏈表的隊列此隊列按 FIFO(先進先出)排序元素

  ArrayBlockingQueue在構造時需要指定容量並可以選擇是否需要公平性如果公平參數被設置true等待時間最長的線程會優先得到處理(其實就是通過將ReentrantLock設置為true來達到這種公平性的即等待時間最長的線程會先操作)通常公平性會使你在性能上付出代價只有在的確非常需要的時候再使用它它是基於數組的阻塞循環隊列此隊列按 FIFO(先進先出)原則對元素進行排序

  PriorityBlockingQueue是一個帶優先級的隊列而不是先進先出隊列元素按優先級順序被移除該隊列也沒有上限(看了一下源碼PriorityBlockingQueue是對PriorityQueue的再次包裝是基於堆數據結構的而PriorityQueue是沒有容量限制的與ArrayList一樣所以在優先阻塞隊列上put時是不會受阻的雖然此隊列邏輯上是無界的但是由於資源被耗盡所以試圖執行添加操作可能會導致 OutOfMemoryError)但是如果隊列為空那麼取元素的操作take就會阻塞所以它的檢索操作take是受阻的另外往入該隊列中的元素要具有比較能力

  最後DelayQueue(基於PriorityQueue來實現的)是一個存放Delayed 元素的無界阻塞隊列只有在延遲期滿時才能從中提取元素該隊列的頭部是延遲期滿後保存時間最長的 Delayed 元素如果延遲都還沒有期滿則隊列沒有頭部並且poll將返回null當一個元素的 getDelay(TimeUnitNANOSECONDS) 方法返回一個小於或等於零的值時則出現期滿poll就以移除這個元素了此隊列不允許使用 null 元素 下面是延遲接口

  Java代碼

  public interface Delayed extends Comparable<Delayed> {

  long getDelay(TimeUnit unit);

  }

  public interface Delayed extends Comparable<Delayed> {

  long getDelay(TimeUnit unit);

  }

  放入DelayQueue的元素還將要實現compareTo方法DelayQueue使用這個來為元素排序

  下面的實例展示了如何使用阻塞隊列來控制線程集程序在一個目錄及它的所有子目錄下搜索所有文件打印出包含指定關鍵字的文件列表從下面實例可以看出使用阻塞隊列兩個顯著的好處就是多線程操作共同的隊列時不需要額外的同步另外就是隊列會自動平衡負載即那邊(生產與消費兩邊)處理快了就會被阻塞掉從而減少兩邊的處理速度差距下面是具體實現

  Java代碼

  public class BlockingQueueTest {

  public static void main(String[] args) {

  Scanner in = new Scanner(Systemin);

  Systemoutprint(Enter base directory (eg /usr/local/jdk/src): );

  String directory = innextLine();

  Systemoutprint(Enter keyword (eg volatile): );

  String keyword = innextLine();

  final int FILE_QUEUE_SIZE = ;// 阻塞隊列大小

  final int SEARCH_THREADS = ;// 關鍵字搜索線程個數

  // 基於ArrayBlockingQueue的阻塞隊列

  BlockingQueue<File> queue = new ArrayBlockingQueue<File>(

  FILE_QUEUE_SIZE);

  //只啟動一個線程來搜索目錄

  FileEnumerationTask enumerator = new FileEnumerationTask(queue

  new File(directory));

  new Thread(enumerator)start();

  //啟動個線程用來在文件中搜索指定的關鍵字

  for (int i = ; i <= SEARCH_THREADS; i++)

  new Thread(new SearchTask(queue keyword))start();

  }

  }

  class FileEnumerationTask implements Runnable {

  //啞元文件對象放在阻塞隊列最後用來標示文件已被遍歷完

  public static File DUMMY = new File();

  private BlockingQueue<File> queue;

  private File startingDirectory;

  public FileEnumerationTask(BlockingQueue<File> queue File startingDirectory) {

  thisqueue = queue;

  thisstartingDirectory = startingDirectory;

  }

  public void run() {

  try {

  enumerate(startingDirectory);

  queueput(DUMMY);//執行到這裡說明指定的目錄下文件已被遍歷完

  } catch (InterruptedException e) {

  }

  }

  // 將指定目錄下的所有文件以File對象的形式放入阻塞隊列中

  public void enumerate(File directory) throws InterruptedException {

  File[] files = directorylistFiles();

  for (File file : files) {

  if (fileisDirectory())

  enumerate(file);

  else

  //將元素放入隊尾如果隊列滿則阻塞

  queueput(file);

  }

  }

  }

  class SearchTask implements Runnable {

  private BlockingQueue<File> queue;

  private String keyword;

  public SearchTask(BlockingQueue<File> queue String keyword) {

  thisqueue = queue;

  thiskeyword = keyword;

  }

  public void run() {

  try {

  boolean done = false;

  while (!done) {

  //取出隊首元素如果隊列為空則阻塞

  File file = queuetake();

  if (file == FileEnumerationTaskDUMMY) {

  //取出來後重新放入好讓其他線程讀到它時也很快的結束

  queueput(file);

  done = true;

  } else

  search(file);

  }

  } catch (IOException e) {

  eprintStackTrace();

  } catch (InterruptedException e) {

  }

  }

  public void search(File file) throws IOException {

  Scanner in = new Scanner(new FileInputStream(file));

  int lineNumber = ;

  while (inhasNextLine()) {

  lineNumber++;

  String line = innextLine();

  if (ntains(keyword))

  Systemoutprintf(%s:%d:%s%n filegetPath() lineNumber

  line);

  }

  inclose();

  }

  }


From:http://tw.wingwit.com/Article/program/Java/hx/201311/26657.html
    推薦文章
    Copyright © 2005-2022 電腦知識網 Computer Knowledge   All rights reserved.