熱點推薦:
您现在的位置: 電腦知識網 >> 編程 >> Java編程 >> JSP教程 >> 正文

面向Java開發人員的Scala指南: 深入了解Scala並發性 了解 actor 如何提供新的應

2013-11-15 11:45:44  來源: JSP教程 

  了解 actor 如何提供新的應用程序代碼建模方法

  主要芯片廠商已經開始提供同時運行兩個或更多個核的芯片(雖然不一定更快)在這種情況下並發性很快成為每個軟件開發人員都關心的熱門主題本文延續 Ted Neward 的另一篇文章 深入了解 Scala 並發性在本文中Ted Neward 通過研究 actor 深入討論並發性這個熱門主題actor 是通過傳遞消息相互協作的執行實體

   關於本系列

  Ted Neward 潛心研究 Scala 編程語言並帶您跟他一起徜徉在這個新的 developerWorks 系列 中您將深入了解 Scala 並看到 Scala 的語言功能的實際效果在進行相關比較時Scala 代碼和 Java&#; 代碼將放在一起展示但(您將發現)Scala 中的許多內容與您在 Java 編程中發現的任何內容都沒有直接關聯而這正是 Scala 的魅力所在!畢竟如果 Java 代碼可以做到的話又何必學習 Scala 呢?

  在 前一篇文章 中我討論了構建並發代碼的重要性(無論是否是 Scala 代碼)還討論了在編寫並發代碼時開發人員面對的一些問題包括不要鎖住太多東西不要鎖住太少東西避免死鎖避免生成太多線程等等

  這些理論問題太沉悶了為了避免讀者覺得失望我與您一起研究了 Scala 的一些並發構造首先是在 Scala 中直接使用 Java 語言的並發庫的基本方法然後討論 Scala API 中的 MailBox 類型盡管這兩種方法都是可行的但是它們並不是 Scala 實現並發性的主要機制

  真正提供並發性的是 Scala 的 actor

  什麼是 actor

  actor 實現在稱為 actor 的執行實體之間使用消息傳遞進行協作(注意這裡有意避免使用 進程線程機器 等詞匯)盡管它聽起來與 RPC 機制有點兒相似但是它們是有區別的RPC 調用(比如 Java RMI 調用)會在調用者端阻塞直到服務器端完成處理並發送回某種響應(返回值或異常)而消息傳遞方法不會阻塞調用者因此可以巧妙地避免死鎖

  僅僅傳遞消息並不能避免錯誤的並發代碼的所有問題另外這種方法還有助於使用 不共享任何東西 編程風格也就是說不同的 actor 並不訪問共享的數據結構(這有助於促進封裝 actor無論 actor 是 JVM 本地的還是位於其他地方) — 這樣就完全不需要同步了畢竟如果不共享任何東西並發執行就不涉及任何需要同步的東西

  這不算是對 actor 模型的正規描述而且毫無疑問具有更正規的計算機科學背景的人會找到各種更嚴謹的描述方法能夠描述 actor 的所有細節但是對於本文來說這個描述已經夠了在網上可以找到更詳細更正規的描述還有一些學術文章詳細討論了 actor 背後的概念(請您自己決定是否要深入學習這些概念)現在我們來看看 Scala actors API

  Scala actor

  使用 actor 根本不困難只需使用 Actor 類的 actor 方法創建一個 actor見清單

  清單 開拍!

   import scalaactors_ Actor_

package comtednewardscalaexamplesscalaV
{
  object Actor
  {
    def main(args : Array[String]) =
    {
      val badActor =
        actor
        {
          receive
          {
            case msg => Systemoutprintln(msg)
          }
        }
     
      badActor ! Do ya feel lucky punk?
    }
  }
}

  這裡同時做了兩件事

  首先我們從 Scala Actors 庫的包中導入了這個庫然後從庫中直接導入了 Actor 類的成員第二步並不是完全必要的因為在後面的代碼中可以使用 Actoractor 替代 actor但是這麼做能夠表明 actor 是語言的內置結構並(在一定程度上)提高代碼的可讀性

  下一步是使用 actor 方法創建 actor 本身這個方法通過參數接收一個代碼塊在這裡代碼塊執行一個簡單的 receive(稍後討論)結果是一個 actor它被存儲在一個值引用中供以後使用

  請記住除了消息之外actor 不使用其他通信方法使用 ! 的代碼行實際上是一個向 badActor 發送消息的方法這可能不太直觀Actor 內部還包含另一個 MailBox 元素(已討論)! 方法接收傳遞過來的參數(在這裡是一個字符串)把它發送給郵箱然後立即返回

  消息交付給 actor 之後actor 通過調用它的 receive 方法來處理消息這個方法從郵箱中取出第一個可用的消息把它交付給一個模式匹配塊注意因為這裡沒有指定模式匹配的類型所以任何消息都是匹配的而且消息被綁定到 msg 名稱(為了打印它)

  一定要注意一點對於可以發送的類型沒有任何限制 — 不一定要像前面的示例那樣發送字符串實際上基於 actor 的設計常常使用 Scala case 類攜帶實際消息本身這樣就可以根據 case 類的參數/成員的類型提供隱式的 命令動作或者向動作提供數據

  例如假設希望 actor 用兩個不同的動作來響應發送的消息新的實現可能與清單 相似

  清單 我是導演!

     object Actor
  {
    case class Speak(line : String);
    case class Gesture(bodyPart : String action : String);
    case class NegotiateNewContract;
 
    def main(args : Array[String]) =
    {
      val badActor =
        actor
        {
          receive
          {
            case NegotiateNewContract =>
              Systemoutprintln(I wont do it for less than $ million!)
            case Speak(line) =>
              Systemoutprintln(line)
            case Gesture(bodyPart action) =>
              Systemoutprintln(( + action + s + bodyPart + ))
            case _ =>
              Systemoutprintln(Huh? Ill be in my trailer)
          }
        }
     
      badActor ! NegotiateNewContract
      badActor ! Speak(Do ya feel lucky punk?)
      badActor ! Gesture(face grimaces)
      badActor ! Speak(Well do ya?)
    }
  }

  到目前為止看起來似乎沒問題但是在運行時只協商了新合同在此之後JVM 終止了初看上去似乎是生成的線程無法足夠快地響應消息但是要記住在 actor 模型中並不處理線程只處理消息傳遞這裡的問題其實非常簡單一次接收使用一個消息所以無論隊列中有多少個消息正在等待處理都無所謂因為只有一次接收所以只交付一個消息

  糾正這個問題需要對代碼做以下修改見清單

  把 receive 塊放在一個接近無限的循環中

  創建一個新的 case 類來表示什麼時候處理全部完成了

  清單 現在我是一個更好的導演!

     object Actor
  {
    case class Speak(line : String);
    case class Gesture(bodyPart : String action : String);
    case class NegotiateNewContract;
    case class ThatsAWrap;
 
    def main(args : Array[String]) =
    {
      val badActor =
        actor
        {
          var done = false
          while (! done)
          {
            receive
            {
              case NegotiateNewContract =>
                Systemoutprintln(I wont do it for less than $ million!)
              case Speak(line) =>
                Systemoutprintln(line)
              case Gesture(bodyPart action) =>
                Systemoutprintln(( + action + s + bodyPart + ))
              case ThatsAWrap =>
                Systemoutprintln(Great cast party everybody! See ya!)
                done = true
              case _ =>
                Systemoutprintln(Huh? Ill be in my trailer)
            }
          }
        }
     
      badActor ! NegotiateNewContract
      badActor ! Speak(Do ya feel lucky punk?)
      badActor ! Gesture(face grimaces)
      badActor ! Speak(Well do ya?)
      badActor ! ThatsAWrap
    }
  }

  這下行了!使用 Scala actor 就這麼容易

  並發地執行動作

  上面的代碼沒有反映出並發性 — 到目前為止給出的代碼更像是另一種異步的方法調用形式您看不出區別(從技術上說在第二個示例中引入接近無限循環之前的代碼中可以猜出有一定的並發性存在但這只是偶然的證據不是明確的證明)

  為了證明在幕後確實有多個線程存在我們深入研究一下前一個示例

  清單 我要拍特寫了

     object Actor
  {
    case class Speak(line : String);
    case class Gesture(bodyPart : String action : String);
    case class NegotiateNewContract;
    case class ThatsAWrap;
 
    def main(args : Array[String]) =
    {
      def ct =
        Thread + ThreadcurrentThread()getName() + :
      val badActor =
        actor
        {
          var done = false
          while (! done)
          {
            receive
            {
              case NegotiateNewContract =>
                Systemoutprintln(ct + I wont do it for less than $ million!)
              case Speak(line) =>
                Systemoutprintln(ct + line)
              case Gesture(bodyPart action) =>
                Systemoutprintln(ct + ( + action + s + bodyPart + ))
              case ThatsAWrap =>
                Systemoutprintln(ct + Great cast party everybody! See ya!)
                done = true
              case _ =>
                Systemoutprintln(ct + Huh? Ill be in my trailer)
            }
          }
        }
     
      Systemoutprintln(ct + Negotiating)
      badActor ! NegotiateNewContract
      Systemoutprintln(ct + Speaking)
      badActor ! Speak(Do ya feel lucky punk?)
      Systemoutprintln(ct + Gesturing)
      badActor ! Gesture(face grimaces)
      Systemoutprintln(ct + Speaking again)
      badActor ! Speak(Well do ya?)
      Systemoutprintln(ct + Wrapping up)
      badActor ! ThatsAWrap
    }
  }

  運行這個新示例就會非常明確地發現確實有兩個不同的線程

  main 線程(所有 Java 程序都以它開始)

  Thread 線程它是 Scala Actors 庫在幕後生成的

  因此在啟動第一個 actor 時本質上已經開始了多線程執行

  但是習慣這種新的執行模型可能有點兒困難因為這是一種全新的並發性考慮方式例如請考慮 前一篇文章 中的 Producer/Consumer 模型那裡有大量代碼尤其是在 Drop 類中我們可以清楚地看到線程之間以及線程與保證所有東西同步的監視器之間有哪些交互活動為了便於參考我在這裡給出前一篇文章中的 V 代碼

  清單 ProdConSamplev (Scala)

   package comtednewardscalaexamplesscalaV
{
  import concurrentMailBox
  import concurrentops_

  object ProdConSample
  {
    class Drop
    {
      private val m = new MailBox()
     
      private case class Empty()
      private case class Full(x : String)
     
      m send Empty()  // initialization
     
      def put(msg : String) : Unit =
      {
        m receive
        {
          case Empty() =>
            m send Full(msg)
        }
      }
     
      def take() : String =
      {
        m receive
        {
          case Full(msg) =>
            m send Empty(); msg
        }
      }
    }
 
    def main(args : Array[String]) : Unit =
    {
      // Create Drop
      val drop = new Drop()
     
      // Spawn Producer
      spawn
      {
        val importantInfo : Array[String] = Array(
          Mares eat oats
          Does eat oats
          Little lambs eat ivy
          A kid will eat ivy too
        );
       
        importantInfoforeach((msg) => dropput(msg))
        dropput(DONE)
      }
     
      // Spawn Consumer
      spawn
      {
        var message = droptake()
        while (message != DONE)
        {
          Systemoutformat(MESSAGE RECEIVED: %s%n message)
          message = droptake()
        }
      }
    }
  }
}

  盡管看到 Scala 如何簡化這些代碼很有意思但是它實際上與原來的 Java 版本沒有概念性差異現在看看如果把 Producer/Consumer 示例的基於 actor 的版本縮減到最基本的形式它會是什麼樣子

  清單 Take 開拍!生產!消費!

     object ProdConSample
  {
    case class Message(msg : String)
   
    def main(args : Array[String]) : Unit =
    {
      val consumer =
        actor
        {
          var done = false
          while (! done)
          {
            receive
            {
              case msg =>
                Systemoutprintln(Received message! > + msg)
                done = (msg == DONE)
            }
          }
        }
     
      consumer ! Mares eat oats
      consumer ! Does eat oats
      consumer ! Little lambs eat ivy
      consumer ! Kids eat ivy too
      consumer ! DONE     
    }
  }

  第一個版本確實簡短多了而且在某些情況下可能能夠完成所需的所有工作但是如果運行這段代碼並與以前的版本做比較就會發現一個重要的差異 — 基於 actor 的版本是一個多位置緩沖區而不是我們以前使用的單位置緩沖這看起來是一項改進而不是缺陷但是我們要通過對比確認這一點我們來創建 Drop 的基於 actor 的版本在這個版本中所有對 put() 的調用必須由對 take() 的調用進行平衡

  幸運的是Scala Actors 庫很容易模擬這種功能希望讓 Producer 一直阻塞直到 Consumer 接收了消息實現的方法很簡單讓 Producer 一直阻塞直到它從 Consumer 收到已經接收消息的確認從某種意義上說這就是以前的基於監視器的代碼所做的那個版本通過對鎖對象使用監視器發送這種信號

  在 Scala Actors 庫中最容易的實現方法是使用 !? 方法而不是 ! 方法(這樣就會一直阻塞到收到確認時)(在 Scala Actors 實現中每個 Java 線程都是一個 actor所以回復會發送到與 main 線程隱式關聯的郵箱)這意味著 Consumer 需要發送某種確認這要使用隱式繼承的 reply(它還繼承 receive 方法)見清單

  清單 Take 開拍!

  
  object ProdConSample
  {
    case class Message(msg : String)
   
    def main(args : Array[String]) : Unit =
    {
      val consumer =
        actor
        {
          var done = false
          while (! done)
          {
            receive
            {
              case msg =>
                Systemoutprintln(Received message! > + msg)
                done = (msg == DONE)
                reply(RECEIVED)
            }
          }
        }
     
      Systemoutprintln(Sending)
      consumer !? Mares eat oats
      Systemoutprintln(Sending)
      consumer !? Does eat oats
      Systemoutprintln(Sending)
      consumer !? Little lambs eat ivy
      Systemoutprintln(Sending)
      consumer !? Kids eat ivy too
      Systemoutprintln(Sending)
      consumer !? DONE     
    }
  }

  如果喜歡使用 spawn 把 Producer 放在 main() 之外的另一個線程中(這非常接近最初的代碼)那麼代碼可能像清單 這樣

  清單 Take 開拍!

     object ProdConSampleUsingSpawn
  {
    import concurrentops_
 
    def main(args : Array[String]) : Unit =
    {
      // Spawn Consumer
      val consumer =
        actor
        {
          var done = false
          while (! done)
          {
            receive
            {
              case msg =>
                Systemoutprintln(MESSAGE RECEIVED: + msg)
                done = (msg == DONE)
                reply(RECEIVED)
            }
          }
        }
   
      // Spawn Producer
      spawn
      {
        val importantInfo : Array[String] = Array(
          Mares eat oats
          Does eat oats
          Little lambs eat ivy
          A kid will eat ivy too
          DONE
        );
       
        importantInfoforeach((msg) => consumer !? msg)
      }
    }
  }

  無論從哪個角度來看基於 actor 的版本都比原來的版本簡單多了讀者只要讓 actor 和隱含的郵箱自己發揮作用即可

  但是這並不簡單actor 模型完全顛覆了考慮並發性和線程安全的整個過程在以前的模型中我們主要關注共享的數據結構(數據並發性)而現在主要關注操作數據的代碼本身的結構(任務並發性)盡可能少共享數據請注意 Producer/Consumer 示例的不同版本的差異在以前的示例中並發功能是圍繞 Drop 類(有界限的緩沖區)顯式編寫的在本文中的版本中Drop 甚至沒有出現重點在於兩個 actor(線程)以及它們之間的交互(通過不共享任何東西的消息)

  當然仍然可以用 actor 構建以數據為中心的並發構造只是必須采用稍有差異的方式請考慮一個簡單的 計數器 對象它使用 actor 消息傳達 incrementget 操作見清單

  清單 Take 計數!

    object CountingSample
  {
    case class Incr
    case class Value(sender : Actor)
    case class Lock(sender : Actor)
    case class UnLock(value : Int)
 
    class Counter extends Actor
    {
      override def act(): Unit = loop()

      def loop(value: int): Unit = {
        receive {
          case Incr()   => loop(value + )
          case Value(a) => a ! value; loop(value)
          case Lock(a)  => a ! value
                           receive { case UnLock(v) => loop(v) }
          case _        => loop(value)
        }
      }
    }
   
    def main(args : Array[String]) : Unit =
    {
      val counter = new Counter
      counterstart()
      counter ! Incr()
      counter ! Incr()
      counter ! Incr()
      counter ! Value(self)
      receive { case cvalue => Consoleprintln(cvalue) }   
      counter ! Incr()
      counter ! Incr()
      counter ! Value(self)
      receive { case cvalue => Consoleprintln(cvalue) }   
    }
  }

  為了進一步擴展 Producer/Consumer 示例清單 給出一個在內部使用 actor 的 Drop 版本(這樣其他 Java 類就可以使用這個 Drop而不需要直接調用 actor 的方法)

  清單 在內部使用 actor 的 Drop

     object ActorDropSample
  {
    class Drop
    {
      private case class Put(x: String)
      private case object Take
      private case object Stop
 
      private val buffer =
        actor
        {
          var data =
          loop
          {
            react
            {
              case Put(x) if data == =>
                data = x; reply()
              case Take if data != =>
                val r = data; data = ; reply(r)
              case Stop =>
                reply(); exit(stopped)
            }
          }
        }
 
      def put(x: String) { buffer !? Put(x) }
      def take() : String = (buffer !? Take)asInstanceOf[String]
      def stop() { buffer !? Stop }
    }
   
    def main(args : Array[String]) : Unit =
    {
      import concurrentops_
   
      // Create Drop
      val drop = new Drop()
     
      // Spawn Producer
      spawn
      {
        val importantInfo : Array[String] = Array(
          Mares eat oats
          Does eat oats
          Little lambs eat ivy
          A kid will eat ivy too
        );
       
        importantInfoforeach((msg) => { dropput(msg) })
        dropput(DONE)
      }
     
      // Spawn Consumer
      spawn
      {
        var message = droptake()
        while (message != DONE)
        {
          Systemoutformat(MESSAGE RECEIVED: %s%n message)
          message = droptake()
        }
        dropstop()
      }
    }
  }

  可以看到這需要更多代碼(和更多的線程因為每個 actor 都在一個線程池內部起作用)但是這個版本的 API 與以前的版本相同它把所有與並發性相關的代碼都放在 Drop 內部這正是 Java 開發人員所期望的

  actor 還有更多特性

  在規模很大的系統中讓每個 actor 都由一個 Java 線程支持是非常浪費資源的尤其是在 actor 的等待時間比處理時間長的情況下在這些情況下基於事件的 actor 可能更合適這種 actor 實際上放在一個閉包中閉包捕捉 actor 的其他動作也就是說現在並不通過線程狀態和寄存器表示代碼塊(函數)當一個消息到達 actor 時(這時顯然需要活動的線程)觸發閉包閉包在它的活動期間借用一個活動的線程然後通過回調本身終止或進入 等待 狀態這樣就會釋放線程(請參見 參考資料 中 Haller/Odersky 的文章)

  在 Scala Actors 庫中這要使用 react 方法而不是前面使用的 receive使用 react 的關鍵是在形式上 react 不能返回所以 react 中的實現必須重復調用包含 react 塊的代碼塊簡便方法是使用 loop 結構創建一個接近無限的循環這意味著 清單 中的 Drop 實現實際上只通過借用調用者的線程執行操作這會減少執行所有操作所需的線程數(在實踐中我還沒有見過在簡單的示例中出現這種效果所以我想我們只能暫且相信 Scala 設計者的說法)

  在某些情況下可能選擇通過派生基本的 Actor 類(在這種情況下必須定義 act 方法否則類仍然是抽象的)創建一個新類它隱式地作為 actor 執行盡管這是可行的但是這種思想在 Scala 社區中不受歡迎在一般情況下我在這裡描述的方法(使用 Actor 對象中的 actor 方法)是創建 actor 的首選方法

  結束語

  因為 actor 編程需要與 傳統 對象編程不同的風格所以在使用 actor 時要記住幾點

  首先actor 的主要能力來源於消息傳遞風格而不采用阻塞調用風格這是它的主要特點(有意思的是也有使用消息傳遞作為核心機制的面向對象語言最知名的兩個例子是 ObjectiveC 和 Smalltalk還有 ThoughtWorker 的 Ola Bini 新創建的 Ioke)如果創建直接或間接擴展 Actor 的類那麼要確保對對象的所有調用都通過消息傳遞進行

  第二因為可以在任何時候交付消息而且更重要的是在發送和接收之間可能有相當長的延遲所以一定要確保消息攜帶正確地處理它們所需的所有狀態這種方式會

  讓代碼更容易理解(因為消息攜帶處理所需的所有狀態)

  減少 actor 訪問某些地方的共享狀態的可能性從而減少發生死鎖或其他並發性問題的機會

  第三actor 應該不會阻塞您從前面的內容應該能夠看出這一點從本質上說阻塞是導致死鎖的原因代碼可能產生的阻塞越少發生死鎖的可能性就越低

  很有意思的是如果您熟悉 Java Message Service (JMS) API就會發現我給出的這些建議在很大程度上也適用於 JMS — 畢竟actor 消息傳遞風格只是在實體之間傳遞消息JMS 消息傳遞也是在實體之間傳遞消息它們的差異在於JMS 消息往往比較大在層和進程級別上操作而 actor 消息往往比較小在對象和線程級別上操作如果您掌握了 JMSactor 也不難掌握

  actor 並不是解決所有並發性問題的萬靈藥但是它們為應用程序或庫代碼的建模提供了一種新的方式所用的構造相當簡單明了盡管它們的工作方式有時與您預期的不一樣但是一些行為正是我們所熟悉的 — 畢竟我們在最初使用對象時也有點不習慣只要經過努力您也會掌握並喜歡上 actor


From:http://tw.wingwit.com/Article/program/Java/JSP/201311/19572.html
    推薦文章
    Copyright © 2005-2013 電腦知識網 Computer Knowledge   All rights reserved.