熱點推薦:
您现在的位置: 電腦知識網 >> 編程 >> Java編程 >> JSP教程 >> 正文

面向Java開發人員的Scala指南: 深入了解Scala並發性 了解 Scala 如何簡化並發編

2022-06-13   來源: JSP教程 

  了解 Scala 如何簡化並發編程並繞過陷阱

  對於許多(如果不是大多數)Java? 程序員來說Scala 的吸引力在於處理並發性以及編寫線程安全的代碼時非常輕松在本期文章中Ted Neward 將開始深入研究 Scala 語言及環境所提供的各種並發特性和庫

  Herb Sutter 在他的文章 The Free Lunch Is Over 中揭露了行業中最不可告人的一個小秘密他明確論證了處理器在速度上的發展已經走到了盡頭並且將由全新的單芯片上的並行 內核(虛擬 CPU)所取代這一發現對編程社區造成了不小的沖擊因為正確創建線程安全的代碼在理論而非實踐中始終會提高高性能開發人員的身價而讓各公司難以聘用他們看上去僅有少數人充分理解了 Java 的線程模型並發 API 以及 同步 的含義以便能夠編寫同時提供安全性和吞吐量的代碼 —— 並且大多數人已經明白了它的困難所在

  據推測行業的其余部分將自力更生這顯然不是一個理想的結局至少不是 IT 部門努力開發軟件所應得的回報

  與 Scala 在 NET 領域中的姐妹語言 F# 相似Scala 是針對 並發性問題 的解決方案之一在本期文章中我討論了 Scala 的一些屬性這些屬性使它更加勝任於編寫線程安全的代碼比如默認不可修改的對象並討論了一種返回對象副本而不是修改它們內容的首選設計方案Scala 對並發性的支持遠比此深遠現在我們有必要來了解一下 Scala 的各種庫

   關於本系列

  關於本系列 Ted Neward 潛心研究 Scala 編程語言並帶您跟他一起徜徉在這個新的 developerWorks 系列 中您將深入了解 Scala 並看到 Scala 的語言功能的實際效果在進行相關比較時Scala 代碼和 Java 代碼將放在一起展示但(您將發現)Scala 中的許多內容與您在 Java 編程中發現的任何內容都沒有直接關聯而這正是 Scala 的魅力所在!畢竟如果 Java 代碼可以做到的話又何必學習 Scala 呢?

  並發性基礎

  在深入研究 Scala 的並發性支持之前有必要確保您具備了對 Java 基本並發性模型的良好理解因為 Scala 的並發性支持從某種程度上說建立在 JVM 和支持庫所提供的特性和功能的基礎之上為此清單 中的代碼包含了一個已知的 Producer/Consumer 並發性問題(詳見 Sun Java Tutorial 的 Guarded Blocks 小節)注意Java Tutorial 版本並未在其解決方案中使用 ncurrent 類而是擇優使用了 javalangObject 中的較舊的 wait()/notifyAll() 方法

  清單 Producer/Consumer(Java 之前)

   package comtednewardscalaexamplesnotj;

class Producer implements Runnable
{
  private Drop drop;
  private String importantInfo[] = {
    Mares eat oats
    Does eat oats
    Little lambs eat ivy
    A kid will eat ivy too
  };

  public Producer(Drop drop) { thisdrop = drop; }

  public void run()
  {
    for (int i = ; i < importantInfolength; i++)
    {
      dropput(importantInfo[i]);
    }
    dropput(DONE);
  }
}

class Consumer implements Runnable
{
  private Drop drop;

  public Consumer(Drop drop) { thisdrop = drop; }

  public void run()
  {
    for (String message = droptake(); !messageequals(DONE);
         message = droptake())
    {
      Systemoutformat(MESSAGE RECEIVED: %s%n message);
    }
  }
}

class Drop
{
  //Message sent from producer to consumer
  private String message;
 
  //True if consumer should wait for producer to send message
  //false if producer should wait for consumer to retrieve message
  private boolean empty = true;

  //Object to use to synchronize against so as to not leak the
  //this monitor
  private Object lock = new Object();

  public String take()
  {
    synchronized(lock)
    {
      //Wait until message is available
      while (empty)
      {
        try
        {
          lockwait();
        }
        catch (InterruptedException e) {}
      }
      //Toggle status
      empty = true;
      //Notify producer that status has changed
      locknotifyAll();
      return message;
    }
  }

  public void put(String message)
  {
    synchronized(lock)
    {
      //Wait until message has been retrieved
      while (!empty)
      {
        try
        {
          lockwait();
        } catch (InterruptedException e) {}
      }
      //Toggle status
      empty = false;
      //Store message
      ssage = message;
      //Notify consumer that status has changed
      locknotifyAll();
    }
  }
}

public class ProdConSample
{
  public static void main(String[] args)
  {
    Drop drop = new Drop();
    (new Thread(new Producer(drop)))start();
    (new Thread(new Consumer(drop)))start();
  }
}

  注意 我在此處展示的代碼對 Sun 教程解決方案做了少許修改它們提供的代碼存在一個很小的設計缺陷(參見 Java 教程 缺陷

   Java 教程 缺陷
好奇的讀者可能會將此處的代碼與 Java Tutorial 中的代碼進行比較尋找它們之間有哪些不同他們會發現我並未 同步 puttake 方法而是使用了存儲在 Drop 中的 lock 對象其原因非常簡單對象的監測程序永遠都不會封裝在類的內部因此 Java Tutorial 版本允許此代碼打破此規則(顯然很瘋狂)

  public class ProdConSample { public static void main(String[] args) { Drop drop = new Drop(); (new Thread(new Producer(drop)))start(); (new Thread(new Consumer(drop)))start(); synchronized(drop) { Threadsleep( * * * * ); // sleep for years?!? } } }




通過使用私有對象作為鎖定所依托的監測程序此代碼將不會有任何效果從本質上說現在已經封裝了線程安全的實現然後它才能依賴客戶機的優勢正常運行

  Producer/Consumer 問題的核心非常容易理解一個(或多個)生產者實體希望將數據提供給一個(或多個)使用者實體供它們使用和操作(在本例中它包括將數據打印到控制台)Producer 和 Consumer 類是相應直觀的 Runnable實現類Producer 從數組中獲取 String並通過 put 將它們放置到 Consumer 的緩沖區中並根據需要執行 take

  問題的難點在於如果 Producer 運行過快則數據在覆蓋時可能會丟失如果 Consumer 運行過快則當 Consumer 讀取相同的數據兩次時數據可能會得到重復處理緩沖區(在 Java Tutorial 代碼中稱作 Drop)將確保不會出現這兩種情況數據破壞的可能性就更不用提了(在 String 引用的例子中很困難但仍然值得注意)因為數據會由 put 放入緩沖區並由 take 取出

  關於此主題的全面討論請閱讀 Brian Goetz 的 Java Concurrency in Practice 或 Doug Lea 的 Concurrent Programming in Java(參見 參考資料)但是在應用 Scala 之前有必要快速了解一下此代碼的運行原理

  當 Java 編譯器看到 synchronized 關鍵字時它會在同步塊的位置生成一個 try/finally 塊其頂部包括一個 monitorenter 操作碼並且 finally 塊中包括一個 monitorexit 操作碼以確保監控程序(Java 的原子性基礎)已經發布而與代碼退出的方式無關因此Drop 中的 put 代碼將被重寫如清單 所示

  清單 編譯器失效後的 Dropput

     // This is pseudocode
  public void put(String message)
  {
    try
    {
  monitorenter(lock)

      //Wait until message has been retrieved
      while (!empty)
      {
        try
        {
          lockwait();
        } catch (InterruptedException e) {}
      }
      //Toggle status
      empty = false;
      //Store message
      ssage = message;
      //Notify consumer that status has changed
      locknotifyAll();
    }
finally
{
  monitorexit(lock)
}
  }

  wait() 方法將通知當前線程進入非活動狀態並等待另一個線對該對象調用 notifyAll()然後通知的線程必須在能夠繼續執行的時候嘗試再次獲取監控程序從本質上說wait() 和 notify()/notifyAll() 允許一種簡單的信令機制它允許 Drop 在 Producer 和 Consumer 線程之間進行協調每個 put 都有相應的 take

  本文的 代碼下載 部分使用 Java 並發性增強(Lock 和 Condition 接口以及 ReentrantLock 鎖定實現)提供 清單 的基於超時的版本但基本代碼模式仍然相同這就是問題所在編寫清單 這樣的代碼的開發人員需要過度專注於線程和鎖定的細節以及低級實現代碼以便讓它們能夠正確運行此外開發人員需要對每一行代碼刨根知底以確定是否需要保護它們因為過度同步與過少同步同樣有害

  現在我們來看到 Scala 替代方案

  良好的 Scala 並發性 (v

  開始應用 Scala 並發性的一種方法是將 Java 代碼直接轉換為 Scala以便利用 Scala 的語法優勢來簡化代碼(至少能簡化一點)

  清單 ProdConSample (Scala)

   object ProdConSample
{
  class Producer(drop : Drop)
    extends Runnable
  {
    val importantInfo : Array[String] = Array(
      Mares eat oats
      Does eat oats
      Little lambs eat ivy
      A kid will eat ivy too
    );
 
    override def run() : Unit =
    {
      importantInfoforeach((msg) => dropput(msg))
      dropput(DONE)
    }
  }
 
  class Consumer(drop : Drop)
    extends Runnable
  {
    override def run() : Unit =
    {
      var message = droptake()
      while (message != DONE)
      {
        Systemoutformat(MESSAGE RECEIVED: %s%n message)
        message = droptake()
      }
    }
  }
 
  class Drop
  {
    var message : String =
    var empty : Boolean = true
    var lock : AnyRef = new Object()
 
    def put(x: String) : Unit =
      locksynchronized
      {
        // Wait until message has been retrieved
        await (empty == true)
        // Toggle status
        empty = false
        // Store message
        message = x
        // Notify consumer that status has changed
        locknotifyAll()
      }

    def take() : String =
      locksynchronized
      {
        // Wait until message is available
        await (empty == false)
        // Toggle status
        empty=true
        // Notify producer that staus has changed
        locknotifyAll()
        // Return the message
        message
      }

    private def await(cond: => Boolean) =
      while (!cond) { lockwait() }
  }

  def main(args : Array[String]) : Unit =
  {
    // Create Drop
    val drop = new Drop();
 
    // Spawn Producer
    new Thread(new Producer(drop))start();
   
    // Spawn Consumer
    new Thread(new Consumer(drop))start();
  }
}

  Producer 和 Consumer 類幾乎與它們的 Java 同類相同再一次擴展(實現)了 Runnable 接口並覆蓋了 run() 方法並且 — 對於 Producer 的情況 — 分別使用了內置迭代方法來遍歷 importantInfo 數組的內容(實際上為了讓它更像 ScalaimportantInfo 可能應該是一個 List 而不是 Array但在第一次嘗試時我希望盡可能保證它們與原始 Java 代碼一致

  Drop 類同樣類似於它的 Java 版本但 Scala 中有一些例外synchronized 並不是關鍵字它是針對 AnyRef 類定義的一個方法即 Scala 所有引用類型的根這意味著要同步某個特定的對象您只需要對該對象調用同步方法在本例中對 Drop 上的 lock 字段中所保存的對象調用同步方法

  注意我們在 await() 方法定義的 Drop 類中還利用了一種 Scala 機制cond 參數是等待計算的代碼塊而不是在傳遞給該方法之前進行計算在 Scala 中這被稱作 callbyname此處它是一種實用的方法可以捕獲需要在 Java 版本中表示兩次的條件等待邏輯(分別用於 put 和 take)

  最後在 main() 中創建 Drop 實例實例化兩個線程使用 start() 啟動它們然後在 main() 的結束部分退出相信 JVM 會在 main() 結束之前啟動這兩個線程(在生產代碼中可能無法保證這種情況但對於這樣的簡單的例子% 沒有問題

  但是已經說過仍然存在相同的基本問題程序員仍然需要過分擔心兩個線程之間的通信和協調問題雖然一些 Scala 機制可以簡化語法但這目前為止並沒有相當大的吸引力

  Scala 並發性 v

  Scala Library Reference 中有一個有趣的包ncurrency這個包包含許多不同的並發性結構包括我們即將利用的 MailBox 類

  顧名思義MailBox 從本質上說就是 Drop用於在檢測之前保存數據塊的單槽緩沖區但是MailBox 最大的優勢在於它將發送和接收數據的細節完全封裝到模式匹配和 case 類中這使它比簡單的 Drop(或 Drop 的多槽數據保存類 ncurrentBoundedBuffer)更加靈活

  清單 ProdConSample v (Scala)

   package comtednewardscalaexamplesscalaV
{
  import concurrent{MailBox ops}

  object ProdConSample
  {
    class Producer(drop : Drop)
      extends Runnable
    {
      val importantInfo : Array[String] = Array(
        Mares eat oats
        Does eat oats
        Little lambs eat ivy
        A kid will eat ivy too
      );
   
      override def run() : Unit =
      {
        importantInfoforeach((msg) => dropput(msg))
        dropput(DONE)
      }
    }
   
    class Consumer(drop : Drop)
      extends Runnable
    {
      override def run() : Unit =
      {
        var message = droptake()
        while (message != DONE)
        {
          Systemoutformat(MESSAGE RECEIVED: %s%n message)
          message = droptake()
        }
      }
    }

    class Drop
    {
      private val m = new MailBox()
     
      private case class Empty()
      private case class Full(x : String)
     
      m send Empty()  // initialization
     
      def put(msg : String) : Unit =
      {
        m receive
        {
          case Empty() =>
            m send Full(msg)
        }
      }
     
      def take() : String =
      {
        m receive
        {
          case Full(msg) =>
            m send Empty(); msg
        }
      }
    }
 
    def main(args : Array[String]) : Unit =
    {
      // Create Drop
      val drop = new Drop()
     
      // Spawn Producer
      new Thread(new Producer(drop))start();
     
      // Spawn Consumer
      new Thread(new Consumer(drop))start();
    }
  }
}

  此處v 和 v 之間的惟一區別在於 Drop 的實現它現在利用 MailBox 類處理傳入以及從 Drop 中刪除的消息的阻塞和信號事務(我們可以重寫 Producer 和 Consumer讓它們直接使用 MailBox但考慮到簡單性我們假定希望保持所有示例中的 Drop API 相一致)使用 MailBox 與使用典型的 BoundedBuffer(Drop)稍有不同因此我們來仔細看看其代碼

  MailBox 有兩個基本操作send 和 receivereceiveWithin 方法僅僅是基於超時的 receiveMailBox 接收任何類型的消息send() 方法將消息放置到郵箱中並立即通知任何關心該類型消息的等待接收者並將它附加到一個消息鏈表中以便稍後檢索receive() 方法將阻塞直到接收到對於功能塊合適的消息

  因此在這種情況下我們將創建兩個 case 類一個不包含任何內容(Empty)這表示 MailBox 為空另一個包含消息數據(Full

  put 方法由於它會將數據放置在 Drop 中對 MailBox 調用 receive() 以查找 Empty 實例因此會阻塞直到發送 Empty此時它發送一個 Full 實例給包含新數據的 MailBox

  take 方法由於它會從 Drop 中刪除數據對 MailBox 調用 receive() 以查找 Full 實例提取消息(再次得益於模式匹配從 case 類內部提取值並將它們綁到本地變量的能力)並發送一個 Empty 實例給 MailBox不需要明確的鎖定並且不需要考慮監控程序

  Scala 並發性 v

  事實上我們可以顯著縮短代碼只要 Producer 和 Consumer 不需要功能全面的類(此處便是如此) — 兩者從本質上說都是 Runnablerun() 方法的瘦包裝器Scala 可以使用 ncurrentops 對象的 spawn 方法來實現如清單 所示

  清單 ProdConSample v (Scala)

   package comtednewardscalaexamplesscalaV
{
  import concurrentMailBox
  import concurrentops_

  object ProdConSample
  {
    class Drop
    {
      private val m = new MailBox()
     
      private case class Empty()
      private case class Full(x : String)
     
      m send Empty()  // initialization
     
      def put(msg : String) : Unit =
      {
        m receive
        {
          case Empty() =>
            m send Full(msg)
        }
      }
     
      def take() : String =
      {
        m receive
        {
          case Full(msg) =>
            m send Empty(); msg
        }
      }
    }
 
    def main(args : Array[String]) : Unit =
    {
      // Create Drop
      val drop = new Drop()
     
      // Spawn Producer
      spawn
      {
        val importantInfo : Array[String] = Array(
          Mares eat oats
          Does eat oats
          Little lambs eat ivy
          A kid will eat ivy too
        );
       
        importantInfoforeach((msg) => dropput(msg))
        dropput(DONE)
      }
     
      // Spawn Consumer
      spawn
      {
        var message = droptake()
        while (message != DONE)
        {
          Systemoutformat(MESSAGE RECEIVED: %s%n message)
          message = droptake()
        }
      }
    }
  }
}

  spawn 方法(通過包塊頂部的 ops 對象導入)接收一個代碼塊(另一個 byname 參數示例)並將它包裝在匿名構造的線程對象的 run() 方法內部事實上並不難理解 spawn 的定義在 ops 類的內部是什麼樣的

  清單 ncurrentopsspawn()

     def spawn(p: => Unit) = {
    val t = new Thread() { override def run() = p }
    tstart()
  }

  ……這再一次強調了 byname 參數的強大之處

  opsspawn 方法的一個缺點在於它是在 年 Java concurrency 類還不可用的時候編寫的特別是ncurrentExecutor 及其同類的作用是讓開發人員更加輕松地生成線程而不需要實際處理直接創建線程對象的細節幸運的是在您自己的自定義庫中重新創建 spawn 的定義是相當簡單的這需要利用 Executor(或 ExecutorService 或 ScheduledExecutorService)來執行線程的實際啟動任務

  事實上Scala 的並發性支持超越了 MailBox 和 ops 類Scala 還支持一個類似的 Actors 概念它使用了與 MailBox 所采用的方法相類似的消息傳遞方法但應用更加全面並且靈活性也更好但是這部分內容將在下期討論

  結束語

  Scala 為並發性提供了兩種級別的支持這與其他與 Java 相關的主題極為類似

  首先對底層庫的完全訪問(比如說 ncurrent)以及對 傳統 Java 並發性語義的支持(比如說監控程序和 wait()/notifyAll())

  其次這些基本機制上面有一個抽象層詳見本文所討論的 MailBox 類以及將在本系列下一篇文章中討論的 Actors 庫

  兩個例子中的目標是相同的讓開發人員能夠更加輕松地專注於問題的實質而不用考慮並發編程的低級細節(顯然第二種方法更好地實現了這一目標至少對於沒有過多考慮低級細節的人來說是這樣的

  但是當前 Scala 庫的一個明顯的缺陷就是缺乏 Java 支持ncurrentops 類應該具有 spawn 這樣的利用新的 Executor 接口的方法它還應該支持利用新的 Lock 接口的各種版本的 synchronized幸運的是這些都是可以在 Scala 生命周期中實現的庫增強而不會破壞已有代碼它們甚至可以由 Scala 開發人員自己完成而不需要等待 Scala 的核心開發團隊提供給他們(只需要花費少量時間)

   描述 名字 大小 下載方法 本文的示例 Scala 代碼 jscalazip KB HTTP


From:http://tw.wingwit.com/Article/program/Java/JSP/201311/19570.html
    推薦文章
    Copyright © 2005-2022 電腦知識網 Computer Knowledge   All rights reserved.