熱點推薦:
您现在的位置: 電腦知識網 >> 編程 >> Java編程 >> Java高級技術 >> 正文

Java多線程同步設計中使用Metux

2013-11-23 19:44:40  來源: Java高級技術 

  Mutex是互斥體廣泛地應用在多線程編程中本文以廣為流程的Doug Lea的concurrent工具包的Mutex實現為例進行一點探討在Doug Lea的concurrent工具包中Mutex實現了Sync接口該接口是concurrent工具包中所有鎖(lock)門(gate)和條件變量(condition)的公共接口Sync的實現類主要有MutexSemaphore及其子類LatchCountDownReentrantLock等這也體現了面向抽象編程的思想使我們可以在不改變代碼或者改變少量代碼的情況下選擇使用Sync的不同實現下面是Sync接口的定義

  

  public interface Sync
{
 public void acquire() throws InterruptedException;
 //獲取許可
 public boolean attempt(long msecs) throws InterruptedException;
 //嘗試獲取許可
 public void release();
 //釋放許可
}

  通過使用Sync可以替代Java synchronized關鍵字並提供更加靈活的同步控制當然並不是說 concurrent工具包是和Java synchronized獨立的技術其實concurrent工具包也是在synchronized的基礎上搭建的從下面對Mutex源碼的解析即可以看到這一點synchronized關鍵字僅在方法內或者代碼塊內有效而使用Sync卻可以跨越方法甚至通過在對象之間傳遞跨越對象進行同步這是Sync及concurrent工具包比直接使用synchronized更加強大的地方

  注意Sync中的acquire()和attempt()都會拋出InterruptedException所以使用Sync及其子類時調用這些方法一定要捕獲InterruptedException而release()方法並不會拋出InterruptedException這是因為在acquire()和attempt()方法中可能會調用wait()等待其它線程釋放鎖而release()在實現上進行了簡化直接釋放鎖不管是否真的持有所以你可以對一個並沒有acquire()的線程調用release()這也不會有什麼問題而由於release()不會拋出InterruptedException所以我們可以在catch或finally子句中調用release()以保證獲得的鎖能夠被正確釋放比如

  

  class X
{
 Sync gate; //
 public void m()
 {
  try
  {
   gateacquire();
   // block until condition holds
   try
   {
    // method body
   }
   finally { gaterelease(); }
  }
  catch (InterruptedException ex) { // evasive action }
 }
}

  Mutex是一個非重入的互斥鎖Mutex廣泛地用在需要跨越方法的before/after類型的同步環境中下面是Doug Lea的concurrent工具包中的Mutex的實現

  

  public class Mutex implements Sync
{
 /** The lock status **/
 protected boolean inuse_ = false;
 public void acquire() throws InterruptedException
 {
  if (Threadinterrupted()) throw new InterruptedException();//()
  synchronized(this)
  {
   try
   {
    while (inuse_) wait();
    inuse_ = true;
   }
   catch (InterruptedException ex)
   {
    //()
    notify();
    throw ex;
   }
  }
 }

  public synchronized void release()
 {
  inuse_ = false;
  notify();
 }

  public boolean attempt(long msecs) throws InterruptedException
 {
  if (Threadinterrupted()) throw new InterruptedException();
  synchronized(this)
  {
   if (!inuse_)
   {
    inuse_ = true;
    return true;
   }
   else if (msecs <= )
    return false;
   else
   {
    long waitTime = msecs;
    long start = SystemcurrentTimeMillis();
    try
    {
     for (;;)
     {
      wait(waitTime);
      if (!inuse_)
      {
       inuse_ = true;
       return true;
      }
      else
      {
       waitTime = msecs (SystemcurrentTimeMillis() start);
       if (waitTime <= ) // ()
        return false;
       }
     }
    }
    catch (InterruptedException ex)
    {
     notify();
     throw ex;
    }
   }
  }
 }
}

  為什麼要在acquire()和attempt(方法的開始都要檢查當前線程的中斷標志呢?這是為了在當前線程已經被打斷時可以立即返回而不會仍然在鎖標志上等待調用一個線程的interrupt()方法根據當前線程所處的狀態可能產生兩種不同的結果當線程在運行過程中被打斷則設置當前線程的中斷標志為true如果當前線程阻塞於wait()sleep()join()則當前線程的中斷標志被清空同時拋出InterruptedException所以在上面代碼的位置()也捕獲了InterruptedException然後再次拋出InterruptedException

  release()方法簡單地重置inuse_標志並通知其它線程

  attempt()方法是利用Java的Objectwait(long)進行計時的由於Objectwait(long)不是一個精確的時鐘所以attempt(long)方法也是一個粗略的計時注意代碼中位置(在超時時返回

  Mutex是Sync的一個基本實現除了實現了Sync接口中的方法外並沒有添加新的方法所以Mutex的使用和Sync的完全一樣在concurrent包的API中Doug給出了一個精細鎖定的List的實現示例我們這兒也給出作為對Mutex和Sync使用的一個例子

  

  class Node
{
 Object item; Node next;
 Mutex lock = new Mutex();
 // 每一個節點都持有一個鎖
 Node(Object x Node n)
 {
  item = x;
  next = n;
 }
}

  class List
{
 protected Node head;
 // 指向列表的頭
 // 使用Java的synchronized保護head域
 // (我們當然可以使用Mutex但是這兒似乎沒有這樣做的必要
 
 protected synchronized Node getHead()
 { return head; }
 boolean search(Object x) throws InterruptedException
 {
  Node p = getHead();
  if (p == null) return false;
  // (這兒可以更加緊湊但是為了演示的清楚各種情況都分別進行處理)
  plockacquire();
  // Prime loop by acquiring first lock
  // (If the acquire fails due to
  // interrupt the method will throw
  // InterruptedException now
  // so there is no need for any
  // further cleanup)
  for (;;)
  {
   if (xequals(em))
   {
    plockrelease();
    // 釋放當前節點的鎖
    return true;
   }
   else
   {
    Node nextp = pnext;
    if (nextp == null)
    {
     plockrelease();
     // 釋放最後持有的鎖
     return false;
    }
    else
    {
     try
     {
      nextplockacquire();
      // 在釋放當前鎖之前獲取下一個節點的鎖
     }
     catch (InterruptedException ex)
     {
      plockrelease();
      // 如果獲取失敗也釋放當前的鎖 throw ex;
     }
     plockrelease();
     // 釋放上個節點的鎖現在已經持有新的鎖了
     p = nextp;
    }
   }
  }
 }
 synchronized void add(Object x)
 {
  // 使用synchronized保護head域
  head = new Node(x head);
 }
 // other similar traversal and update methods
}


From:http://tw.wingwit.com/Article/program/Java/gj/201311/27377.html
    推薦文章
    Copyright © 2005-2013 電腦知識網 Computer Knowledge   All rights reserved.