熱點推薦:
您现在的位置: 電腦知識網 >> 編程 >> PHP編程 >> 正文

PHP中使用協同程序實現合作多任務

2022-06-13   來源: PHP編程 

  PHP一個比較好的新功能是實現對生成器和協同程序的支持對於生成器PHP的文檔和各種其他的博客文章(就像這一個或這一個)已經有了非常詳細的講解協同程序相對受到的關注就少了所以協同程序雖然有很強大的功能但也很難被知曉解釋起來也比較困難

  這篇文章指導你通過使用協同程序來實施任務調度通過實例實現對技術的理解我將在前三節做一個簡單的背景介紹如果你已經有了比較好的基礎可以直接跳到“協同多任務處理”一節

  生成器

  生成器最基本的思想也是一個函數這個函數的返回值是依次輸出而不是只返回一個單獨的值或者換句話說生成器使你更方便的實現了迭代器接口下面通過實現一個xrange函數來簡單說明

復制代碼 代碼如下:
<?php
function xrange($start $end $step = ) {
    for ($i = $start; $i <= $end; $i += $step) {
        yield $i;
    }
}

  foreach (xrange( ) as $num) {
    echo $num "n";
}

  上面這個xrange()函數提供了和PHP的內建函數range()一樣的功能但是不同的是range()函數返回的是一個包含屬組值從萬的數組(注請查看手冊)而xrange()函數返回的是依次輸出這些值的一個迭代器而且並不會真正以數組形式計算

  這種方法的優點是顯而易見的它可以讓你在處理大數據集合的時候不用一次性的加載到內存中甚至你可以處理無限大的數據流

  當然也可以不同通過生成器來實現這個功能而是可以通過繼承Iterator接口實現通過使用生成器實現起來會更方便而不用再去實現iterator接口中的個方法了

生成器為可中斷的函數
要從生成器認識協同程序理解它們內部是如何工作的非常重要生成器是可中斷的函數在它裡面yield構成了中斷點 

  緊接著上面的例子如果你調用xrange()的話xrange()函數裡代碼沒有真正地運行相反PHP只是返回了一個實現了迭代器接口的 生成器類實例 
 

復制代碼 代碼如下:
<?php
$range = xrange( );
var_dump($range); // object(Generator)#
var_dump($range instanceof Iterator); // bool(true)

  你對某個對象調用迭代器方法一次其中的代碼運行一次例如如果你調用$range>rewind()那麼xrange()裡的代碼運 行到控制流 第一次出現yield的地方在這種情況下這就意味著當$i=$start時yield $i才運行傳遞給yield語句的值是使用$range>current()獲取的

 為了繼續執行生成器中的代碼你必須 調用$range>next()方法這將再次啟動生成器直到yield語句出現因此連續調用next()和current()方法 你將能從生成器裡獲得所有的值直到某個點沒有再出現yield語句對xrange()來說這種情形出現在$i超過$end時在這中情況下 控制流將到達函數的終點因此將不執行任何代碼一旦這種情況發生vaild()方法將返回假這時迭代結束

協程

  協程給上面功能添加的主要東西是回送數據給生成器的能力這將把生成器到調用者的單向通信轉變為兩者之間的雙向通信
通過調用生成器的send()方法而不是其next()方法傳遞數據給協程下面的logger()協程是這種通信如何運行的例子 

復制代碼 代碼如下:
<?php

  function logger($fileName) {
    $fileHandle = fopen($fileName a);
    while (true) {
        fwrite($fileHandle yield "n");
    }
}

  $logger = logger(__DIR__ /log);
$logger>send(Foo);
$logger>send(Bar)

  正如你能看到這兒yield沒有作為一個語句來使用而是用作一個表達式即它有一個返回值yield的返回值是傳遞給send()方法的值 在這個例子裡yield將首先返回"Foo"然後返回"Bar"

  上面的例子裡yield僅作為接收者混合兩種用法是可能的即既可接收也可發送接收和發送通信如何進行的例子如下

復制代碼 代碼如下:
<?php

  function gen() {
    $ret = (yield yield);
    var_dump($ret);
    $ret = (yield yield);
    var_dump($ret);
}

  $gen = gen();
var_dump($gen>current());    // string() "yield"
var_dump($gen>send(ret)); // string() "ret"   (the first var_dump in gen)
                              // string() "yield" (the var_dump of the >send() return value)
var_dump($gen>send(ret)); // string() "ret"   (again from within gen)
                              // NULL               (the return value of >send())

  馬上理解輸出的精確順序有點困難因此確定你知道為什按照這種方式輸出我願意特別指出的有兩點第一點yield表達式兩邊使用 圓括號不是偶然由於技術原因(雖然我已經考慮為賦值增加一個異常就像Python那樣)圓括號是必須的第二點你可能已經注意到 調用current()之前沒有調用rewind()如果是這麼做的那麼已經隱含地執行了rewind操作 

  多任務協作

  如果閱讀了上面的logger()例子那麼你認為“為了雙向通信我為什麼要使用協程呢? 為什麼我不能只用常見的類呢?”你這麼問完全正確上面的例子演示了基本用法然而上下文中沒有真正的展示出使用協程的優點這就是列舉許多協程例子的 理由正如上面介紹裡提到的協程是非常強大的概念不過這樣的應用很稀少而且常常十分復雜給出一些簡單而真實的例子很難

  在這篇文章裡我決定去做的是使用協程實現多任務協作我們盡力解決的問題是你想並發地運行多任務(或者“程序”)不過處理器在一個時刻只能運行 一個任務(這篇文章的目標是不考慮多核的)因此處理器需要在不同的任務之間進行切換而且總是讓每個任務運行 “一小會兒” 

  多任務協作這個術語中的“協作”說明了如何進行這種切換的它要求當前正在運行的任務自動把控制傳回給調度器這樣它就可以運行其他任務了這與 “搶占”多任務相反搶占多任務是這樣的調度器可以中斷運行了一段時間的任務不管它喜歡還是不喜歡協作多任務在Windows的早期版本 (windows)和Mac OS中有使用不過它們後來都切換到使用搶先多任務了理由相當明確如果你依靠程序自動傳回 控制的話那麼壞行為的軟件將很容易為自身占用整個CPU不與其他任務共享 

  這個時候你應當明白協程和任務調度之間的聯系yield指令提供了任務中斷自身的一種方法然後把控制傳遞給調度器因此協程可以運行多個其他任務更進一步來說yield可以用來在任務和調度器之間進行通信

我們的目的是 對 “任務”用更輕量級的包裝的協程函數:
 

復制代碼 代碼如下:
<?php

  class Task {
    protected $taskId;
    protected $coroutine;
    protected $sendValue = null;
    protected $beforeFirstYield = true;

  public function __construct($taskId Generator $coroutine) {
        $this>taskId = $taskId;
        $this>coroutine = $coroutine;
    }

  public function getTaskId() {
        return $this>taskId;
    }

  public function setSendValue($sendValue) {
        $this>sendValue = $sendValue;
    }

  public function run() {
        if ($this>beforeFirstYield) {
            $this>beforeFirstYield = false;
            return $this>coroutine>current();
        } else {
            $retval = $this>coroutine>send($this>sendValue);
            $this>sendValue = null;
            return $retval;
        }
    }

  public function isFinished() {
        return !$this>coroutine>valid();
    }
}

  一個任務是用 任務ID標記一個協程使用setSendValue()方法你可以指定哪些值將被發送到下次的恢復(在之後你會了解到我們需要這個) run()函數確實沒有做什麼除了調用send()方法的協同程序要理解為什麼添加beforeFirstYieldflag需要考慮下面的代碼片 段

復制代碼 代碼如下:
<?php

  function gen() {
    yield foo;
    yield bar;
}

  $gen = gen();
var_dump($gen>send(something));

  // As the send() happens before the first yield there is an implicit rewind() call
// so what really happens is this:
$gen>rewind();
var_dump($gen>send(something));

  // The rewind() will advance to the first yield (and ignore its value) the send() will
// advance to the second yield (and dump its value) Thus we loose the first yielded value!

  通過添加 beforeFirstYieldcondition 我們可以確定 first yield 的值 被返回 

  調度器現在不得不比多任務循環要做稍微多點了然後才運行多任務
 

復制代碼 代碼如下:
<?php

  class Scheduler {
    protected $maxTaskId = ;
    protected $taskMap = []; // taskId => task
    protected $taskQueue;

  public function __construct() {
        $this>taskQueue = new SplQueue();
    }

  public function newTask(Generator $coroutine) {
        $tid = ++$this>maxTaskId;
        $task = new Task($tid $coroutine);
        $this>taskMap[$tid] = $task;
        $this>schedule($task);
        return $tid;
    }

  public function schedule(Task $task) {
        $this>taskQueue>enqueue($task);
    }

  public function run() {
        while (!$this>taskQueue>isEmpty()) {
            $task = $this>taskQueue>dequeue();
            $task>run();

  if ($task>isFinished()) {
                unset($this>taskMap[$task>getTaskId()]);
            } else {
                $this>schedule($task);
            }
        }
    }
}

  newTask()方法(使用下一個空閒的任務id)創建一個新任務然後把這個任務放入任務映射數組裡接著它通過把任務放入任務隊列裡來實現 對任務的調度接著run()方法掃描任務隊列運行任務如果一個任務結束了那麼它將從隊列裡刪除否則它將在隊列的末尾再次被調度
 讓我們看看下面具有兩個簡單(並且沒有什麼意義)任務的調度器 

復制代碼 代碼如下:
<?php

  function task() {
    for ($i = ; $i <= ; ++$i) {
        echo "This is task iteration $in";
        yield;
    }
}

  function task() {
    for ($i = ; $i <= ; ++$i) {
        echo "This is task iteration $in";
        yield;
    }
}

  $scheduler = new Scheduler;

  $scheduler>newTask(task());
$scheduler>newTask(task());

  $scheduler>run();

  兩個任務都僅僅回顯一條信息然後使用yield把控制回傳給調度器輸出結果如下

復制代碼 代碼如下:
 This is task iteration
This is task iteration
This is task iteration
This is task iteration
This is task iteration
This is task iteration
This is task iteration
This is task iteration
This is task iteration
This is task iteration
This is task iteration
This is task iteration
This is task iteration
This is task iteration
This is task iteration
 

  輸出確實如我們所期望的對前五個迭代來說兩個任務是交替運行的接著第二個任務結束後只有第一個任務繼續運行  

與調度器之間通信

  既然調度器已經運行了那麼我們就轉向日程表的下一項任務和調度器之間的通信我們將使用進程用來和操作系統會話的同樣的方式來通信系統調用 我們需要系統調用的理由是操作系統與進程相比它處在不同的權限級別上因此為了執行特權級別的操作(如殺死另一個進程)就不得不以某種方式把控制傳回給 內核這樣內核就可以執行所說的操作了再說一遍這種行為在內部是通過使用中斷指令來實現的過去使用的是通用的int指令如今使用的是更特殊並且更 快速的syscall/sysenter指令

  我們的任務調度系統將反映這種設計不是簡單地把調度器傳遞給任務(這樣久允許它做它想做的任何事)我們將通過給yield表達式傳遞信息來與系統調用通信這兒yield即是中斷也是傳遞信息給調度器(和從調度器傳遞出信息)的方法 

  為了說明系統調用我將對可調用的系統調用做一個小小的封裝
 

復制代碼 代碼如下:
<?php

  class SystemCall {
    protected $callback;

  public function __construct(callable $callback) {
        $this>callback = $callback;
    }

  public function __invoke(Task $task Scheduler $scheduler) {
        $callback = $this>callback; // Cant call it directly in PHP :/
        return $callback($task $scheduler);
    }
}

  它將像其他任何可調用那樣(使用_invoke)運行不過它要求調度器把正在調用的任務和自身傳遞給這個函數為了解決這個問題 我們不得不微微的修改調度器的run方法

復制代碼 代碼如下:
<?php
public function run() {
    while (!$this>taskQueue>isEmpty()) {
        $task = $this>taskQueue>dequeue();
        $retval = $task>run();

  if ($retval instanceof SystemCall) {
            $retval($task $this);
            continue;
        }

  if ($task>isFinished()) {
            unset($this>taskMap[$task>getTaskId()]);
        } else {
            $this>schedule($task);
        }
    }
}

  第一個系統調用除了返回任務ID外什麼都沒有做

復制代碼 代碼如下:
<?php
function getTaskId() {
    return new SystemCall(function(Task $task Scheduler $scheduler) {
        $task>setSendValue($task>getTaskId());
        $scheduler>schedule($task);
    });
}

  這個函數確實設置任務id為下一次發送的值並再次調度了這個任務由於使用了系統調用所以調度器不能自動調用任務我們需要手工調度任務(稍後你將明白為什麼這麼做)要使用這個新的系統調用的話我們要重新編寫以前的例子

復制代碼 代碼如下:
<?php

  function task($max) {
    $tid = (yield getTaskId()); // < heres the syscall!
    for ($i = ; $i <= $max; ++$i) {
        echo "This is task $tid iteration $in";
        yield;
    }
}

  $scheduler = new Scheduler;

  $scheduler>newTask(task());
$scheduler>newTask(task());

  $scheduler>run();

  這段代碼將給出與前一個例子相同的輸出注意系統調用同其他任何調用一樣正常地運行不過預先增加了yield要創建新的任務然後再殺死它們的話需要兩個以上的系統調用  

復制代碼 代碼如下:
<?php

  function newTask(Generator $coroutine) {
    return new SystemCall(
        function(Task $task Scheduler $scheduler) use ($coroutine) {
            $task>setSendValue($scheduler>newTask($coroutine));
            $scheduler>schedule($task);
        }
    );
}

  function killTask($tid) {
    return new SystemCall(
        function(Task $task Scheduler $scheduler) use ($tid) {
            $task>setSendValue($scheduler>killTask($tid));
            $scheduler>schedule($task);
        }
    );
}

  killTask函數需要在調度器裡增加一個方法

復制代碼 代碼如下:
<?php

  public function killTask($tid) {
    if (!isset($this>taskMap[$tid])) {
        return false;
    }

  unset($this>taskMap[$tid]);

  // This is a bit ugly and could be optimized so it does not have to walk the queue
    // but assuming that killing tasks is rather rare I wont bother with it now
    foreach ($this>taskQueue as $i => $task) {
        if ($task>getTaskId() === $tid) {
            unset($this>taskQueue[$i]);
            break;
        }
    }

  return true;
}

  用來測試新功能的微腳本  

復制代碼 代碼如下:
<?php

  function childTask() {
    $tid = (yield getTaskId());
    while (true) {
        echo "Child task $tid still alive!n";
        yield;
    }
}

  function task() {
    $tid = (yield getTaskId());
    $childTid = (yield newTask(childTask()));

  for ($i = ; $i <= ; ++$i) {
        echo "Parent task $tid iteration $in";
        yield;

  if ($i == ) yield killTask($childTid);
    }
}

  $scheduler = new Scheduler;
$scheduler>newTask(task());
$scheduler>run();

  這段代碼將打印以下信息

復制代碼 代碼如下:
Parent task iteration
Child task still alive!
Parent task iteration
Child task still alive!
Parent task iteration
Child task still alive!
Parent task iteration
Parent task iteration
Parent task iteration

  經過三次迭代以後子任務將被殺死因此這就是"Child is still alive"消息結束的時候可能應當指出的是這不是真正的父子關系 因為甚至在父任務結束後子任務仍然可以運行或者子任務可以殺死父任務可以修改調度器使它具有更層級化的任務結構不過 在這篇文章裡我沒有這麼做 

  你可以實現許多進程管理調用例如 wait(它一直等待到任務結束運行時)exec(它替代當前任務)和fork(它創建一個 當前任務的克隆)fork非常酷而且你可以使用PHP的協程真正地實現它因為它們都支持克隆 

  然而讓我們把這些留給有興趣的讀者吧我們去看下一個議題

幾點人
翻譯於 天前
人頂
頂 翻譯的不錯哦!
 

  非阻塞IO
很明顯我們的任務管理系統的真正很酷的應用是web服務器它有一個任務是在套接字上偵聽是否有新連接當有新連接要建立的時候  它創建一個新任務來處理新連接
 web服務器最難的部分通常是像讀數據這樣的套接字操作是阻塞的例如PHP將等待到客戶端完成發送為止對一個WEB服務器來說這 根本不行這就意味著服務器在一個時間點上只能處理一個連接

解決方案是確保在真正對套接字讀寫之前該套接字已經“准備就緒”為了查找哪個套接字已經准備好讀或者寫了可以使用 流選擇函數 

  首先讓我們添加兩個新的 syscall它們將等待直到指定 socket 准備好

復制代碼 代碼如下:
 <?php     
 function waitForRead($socket) { 
     return new SystemCall( 
         function(Task $task Scheduler $scheduler) use ($socket) { 
             $scheduler>waitForRead($socket $task); 
         } 
     ); 
 } 

 function waitForWrite($socket) { 
     return new SystemCall( 
         function(Task $task Scheduler $scheduler) use ($socket) { 
             $scheduler>waitForWrite($socket $task); 
         } 
     ); 
 }
這些 syscall 只是在調度器中代理其各自的方法  復制代碼 代碼如下:
 <?php 

 // resourceID => [socket tasks] 
 protected $waitingForRead = []; 
 protected $waitingForWrite = []; 

 public function waitForRead($socket Task $task) { 
     if (isset($this>waitingForRead[(int) $socket])) { 
         $this>waitingForRead[(int) $socket][][] = $task; 
     } else { 
         $this>waitingForRead[(int) $socket] = [$socket [$task]]; 
     } 
 } 

 public function waitForWrite($socket Task $task) { 
     if (isset($this>waitingForWrite[(int) $socket])) { 
         $this>waitingForWrite[(int) $socket][][] = $task; 
     } else { 
         $this>waitingForWrite[(int) $socket] = [$socket [$task]]; 
     } 
 }
 
waitingForRead 及 waitingForWrite 屬性是兩個承載等待的socket 及等待它們的任務的數組有趣的部分在於下面的方法它將檢查 socket 是否可用並重新安排各自任務 復制代碼 代碼如下:
 <?php 

 protected function ioPoll($timeout) { 
     $rSocks = []; 
     foreach ($this>waitingForRead as list($socket)) { 
         $rSocks[] = $socket; 
     } 

     $wSocks = []; 
     foreach ($this>waitingForWrite as list($socket)) { 
         $wSocks[] = $socket; 
     } 

     $eSocks = []; // dummy 

     if (!stream_select($rSocks $wSocks $eSocks $timeout)) { 
         return; 
     } 

     foreach ($rSocks as $socket) { 
         list( $tasks) = $this>waitingForRead[(int) $socket]; 
         unset($this>waitingForRead[(int) $socket]); 

         foreach ($tasks as $task) { 
             $this>schedule($task); 
         } 
     } 

     foreach ($wSocks as $socket) { 
         list( $tasks) = $this>waitingForWrite[(int) $socket]; 
         unset($this>waitingForWrite[(int) $socket]); 

         foreach ($tasks as $task) { 
             $this>schedule($task); 
         } 
     } 
 }
 
stream_select 函數接受承載讀取寫入以及待檢查的socket的數組(我們無需考慮最後一類)數組將按引用傳遞函數只會保留那些狀態改變了的數組元素我們可以遍歷這些數組並重新安排與之相關的任務

為了正常地執行上面的輪詢動作我們將在調度器裡增加一個特殊的任務 復制代碼 代碼如下:
 <?php 
 protected function ioPollTask() { 
     while (true) { 
         if ($this>taskQueue>isEmpty()) { 
             $this>ioPoll(null); 
         } else { 
             $this>ioPoll(); 
         } 
         yield; 
     } 
 }
 

  需要在某個地方注冊這個任務例如你可以在run()方法的開始增 加$this>newTask($this>ioPollTask())然後就像其他 任務一樣每執行完整任務循環一次就執行輪詢操作一次(這麼做一定不是最好的方法)ioPollTask將使用秒的超時來調用ioPoll 這意味著stream_select將立即返回(而不是等待) 
只有任務隊列為空時我們才使用null超時這意味著它一直等到某個套接口准備就緒如果我們沒有這麼做那麼輪詢任務將一而再 再而三的循環運行直到有新的連接建立這將導致%的CPU利用率相反讓操作系統做這種等待會更有效

現在編寫服務器相對容易了

復制代碼 代碼如下:
 <?php 

 function server($port) { 
     echo "Starting server at port $portn"; 

     $socket = @stream_socket_server("tcp://localhost:$port" $errNo $errStr); 
     if (!$socket) throw new Exception($errStr $errNo); 

     stream_set_blocking($socket ); 

     while (true) { 
         yield waitForRead($socket); 
         $clientSocket = stream_socket_accept($socket ); 
         yield newTask(handleClient($clientSocket)); 
     } 
 } 

 function handleClient($socket) { 
     yield waitForRead($socket); 
     $data = fread($socket ); 

     $msg = "Received following request:nn$data"; 
     $msgLength = strlen($msg); 

     $response = <<<RES 
 HTTP/ OKr 
 ContentType: text/plainr 
 ContentLength: $msgLengthr 
 Connection: closer 
 r 
 $msg
 RES; 

     yield waitForWrite($socket); 
     fwrite($socket $response); 

     fclose($socket); 
 } 

 $scheduler = new Scheduler; 
 $scheduler>newTask(server()); 
 $scheduler>run();
 
這段代碼將接收到localhost:上的連接然後僅僅返回發送來的內容作為HTTP響應要做“實際”的事情的話就愛哪個非常復雜(處理 HTTP請求可能已經超出了這篇文章的范圍)上面的代碼片段只是演示了一般性的概念

你 可以使用類似於ab n c localhost:/這樣命令來測試服務器這條命令將向服務器發送個請求並且其中個請求將同時到達使用這樣的數目我得 到了處於中間的毫秒的響應時間不過還有一個問題有少數幾個請求真正處理的很慢(如秒) 這就是為什麼總吞吐量只有請求/秒(如果是毫秒的響應時間的話總的吞吐量應該更像是請求/秒)調高並發數(比如 c )服務器大多數運行良好不過某些連接將拋出“連接被對方重置”的錯誤由於我對低級別的socket資料了解的非常少所以 我不能指出問題出在哪兒

協程堆棧
如果你試圖用我們的調度系統建立更大的系統的話你將很快遇到問題我們習慣了把代碼分解為更小的函數然後調用它們然而 如果使用了協程的話就不能這麼做了例如看下面代碼 復制代碼 代碼如下:
 <?php 

 function echoTimes($msg $max) { 
     for ($i = ; $i <= $max; ++$i) { 
         echo "$msg iteration $in"; 
         yield; 
     } 
 } 

 function task() { 
     echoTimes(foo ); // print foo ten times 
     echo "n"; 
     echoTimes(bar ); // print bar five times 
     yield; // force it to be a coroutine 
 } 

 $scheduler = new Scheduler; 
 $scheduler>newTask(task()); 
 $scheduler>run();
 

  這段代碼試圖把重復循環“輸出n次“的代碼嵌入到一個獨立的協程裡然後從主任務裡調用它然而它無法運行正如在這篇文章的開始  所提到的調用生成器(或者協程)將沒有真正地做任何事情它僅僅返回一個對象這也出現在上面的例子裡echoTimes調用除了放回一個(無用的) 協程對象外不做任何事情 

為了仍然允許這麼做我們需要在這個裸協程上寫一個小小的封裝我們將調用它“協程堆棧”因為它將管理嵌套的協程調用堆棧 這將是通過生成協程來調用子協程成為可能

 $retval = (yield someCoroutine($foo $bar));

  使用yield子協程也能再次返回值

  yield retval("Im a return value!");

  retval函數除了返回一個值的封裝外沒有做任何其他事情這個封裝將表示它是一個返回值

復制代碼 代碼如下:
 <?php 

 class CoroutineReturnValue { 
     protected $value; 

     public function __construct($value) { 
         $this>value = $value; 
     } 

     public function getValue() { 
         return $this>value; 
     } 
 } 

 function retval($value) { 
     return new CoroutineReturnValue($value); 
 }

  為了把協程轉變為協程堆棧(它支持子調用)我們將不得不編寫另外一個函數(很明顯它是另一個協程)

復制代碼 代碼如下:
 <?php 

 function stackedCoroutine(Generator $gen) { 
     $stack = new SplStack; 

     for (;;) { 
         $value = $gen>current(); 

         if ($value instanceof Generator) { 
             $stack>push($gen); 
             $gen = $value; 
             continue; 
         } 

         $isReturnValue = $value instanceof CoroutineReturnValue; 
         if (!$gen>valid() || $isReturnValue) { 
             if ($stack>isEmpty()) { 
                 return; 
             } 

             $gen = $stack>pop(); 
             $gen>send($isReturnValue ? $value>getValue() : NULL); 
             continue; 
         } 

         $gen>send(yield $gen>key() => $value); 
     } 
 }
 
 這 個函數在調用者和當前正在運行的子協程之間扮演著簡單代理的角色在$gen>send(yield $gen>key()=>$value)這行完成了代理功能另外它檢查返回值是否是生成器萬一是生成器的話它將開始運行這個生成 器並把前一個協程壓入堆棧裡一旦它獲得了CoroutineReturnValue的話它將再次請求堆棧彈出然後繼續執行前一個協程 

為了使協程堆棧在任務裡可用任務構造器裡的$thiscoroutine =$coroutine;這行需要替代為$this>coroutine = StackedCoroutine($coroutine); 
現在我們可以稍微改進上面web服務器例子把wait+read(和wait+write和warit+accept)這樣的動作分組為函數為了分組相關的 功能我將使用下面類 復制代碼 代碼如下:
 <?php 

 class CoSocket { 
     protected $socket; 

     public function __construct($socket) { 
         $this>socket = $socket; 
     } 

     public function accept() { 
         yield waitForRead($this>socket); 
         yield retval(new CoSocket(stream_socket_accept($this>socket ))); 
     } 

     public function read($size) { 
         yield waitForRead($this>socket); 
         yield retval(fread($this>socket $size)); 
     } 

     public function write($string) { 
         yield waitForWrite($this>socket); 
         fwrite($this>socket $string); 
     } 

     public function close() { 
         @fclose($this>socket); 
     } 
 }
 
 現在服務器可以編寫的稍微簡潔點了 復制代碼 代碼如下:
 <?php 

 function server($port) { 
     echo "Starting server at port $portn"; 

     $socket = @stream_socket_server("tcp://localhost:$port" $errNo $errStr); 
     if (!$socket) throw new Exception($errStr $errNo); 

     stream_set_blocking($socket ); 

     $socket = new CoSocket($socket); 
     while (true) { 
         yield newTask( 
             handleClient(yield $socket>accept()) 
         ); 
     } 
 } 

 function handleClient($socket) { 
     $data = (yield $socket>read()); 

     $msg = "Received following request:nn$data"; 
     $msgLength = strlen($msg); 

     $response = <<<RES 
 HTTP/ OKr 
 ContentType: text/plainr 
 ContentLength: $msgLengthr 
 Connection: closer 
 r 
 $msg
 RES; 

     yield $socket>write($response); 
     yield $socket>close(); 
 }

錯誤處理
作為一個優秀的程序員相信你已經察覺到上面的例子缺少錯誤處理幾乎所有的 socket 都是易出錯的我這樣做的原因一方面固然是因為錯誤處理的乏味(特別是 socket!)另一方面也在於它很容易使代碼體積膨脹
不過我仍然了一講一下常見的協程錯誤處理協程允許使用 throw() 方法在其內部拋出一個錯誤盡管此方法還未在 PHP 中實現但我很快就會提交它就在今天
throw() 方法接受一個 Exception並將其拋出到協程的當前懸掛點看看下面代碼 復制代碼 代碼如下:
 <?php 

 function gen() { 
     echo "Foon"; 
     try { 
         yield; 
     } catch (Exception $e) { 
         echo "Exception: {$e>getMessage()}n"; 
     } 
     echo "Barn"; 
 } 

 $gen = gen(); 
 $gen>rewind();                     // echos "Foo" 
 $gen>throw(new Exception(Test)); // echos "Exception: Test" 
                                     // and "Bar"
這非常棒因為我們可以使用系統調用以及子協程調用異常拋出對與系統調用Scheduler::run() 方法需要一些小調整 復制代碼 代碼如下:
 <?php 

 if ($retval instanceof SystemCall) { 
     try { 
         $retval($task $this); 
     } catch (Exception $e) { 
         $task>setException($e); 
         $this>schedule($task); 
     } 
     continue; 
 }
 
Task 類也許要添加 throw 調用處理 復制代碼 代碼如下:
 <?php 

 class Task { 
     //  
     protected $exception = null; 

     public function setException($exception) { 
         $this>exception = $exception; 
     } 

     public function run() { 
         if ($this>beforeFirstYield) { 
             $this>beforeFirstYield = false; 
             return $this>coroutine>current(); 
         } elseif ($this>exception) { 
             $retval = $this>coroutine>throw($this>exception); 
             $this>exception = null; 
             return $retval; 
         } else { 
             $retval = $this>coroutine>send($this>sendValue); 
             $this>sendValue = null; 
             return $retval; 
         } 
     } 

     //  
 }
 
現在我們已經可以在系統調用中使用異常拋出了!例如要調用 killTask讓我們在傳遞 ID 不可用時拋出一個異常 復制代碼 代碼如下:
 <?php 

 function killTask($tid) { 
     return new SystemCall( 
         function(Task $task Scheduler $scheduler) use ($tid) { 
             if ($scheduler>killTask($tid)) { 
                 $scheduler>schedule($task); 
             } else { 
                 throw new InvalidArgumentException(Invalid task ID!); 
             } 
         } 
     ); 
 }
試試看 復制代碼 代碼如下:
 <?php     
 function task() { 
     try { 
         yield killTask(); 
     } catch (Exception $e) { 
         echo Tried to kill task but failed: $e>getMessage() "n"; 
     } 
 }
 
這些代碼現在尚不能正常運作因為 stackedCoroutine 函數無法正確處理異常要修復需要做些調整 復制代碼 代碼如下:
 <?php     
 function stackedCoroutine(Generator $gen) { 
     $stack = new SplStack; 
     $exception = null; 

     for (;;) { 
         try { 
             if ($exception) { 
                 $gen>throw($exception); 
                 $exception = null; 
                 continue; 
             } 

             $value = $gen>current(); 

             if ($value instanceof Generator) { 
                 $stack>push($gen); 
                 $gen = $value; 
                 continue; 
             } 

             $isReturnValue = $value instanceof CoroutineReturnValue; 
             if (!$gen>valid() || $isReturnValue) { 
                 if ($stack>isEmpty()) { 
                     return; 
                 } 

                 $gen = $stack>pop(); 
                 $gen>send($isReturnValue ? $value>getValue() : NULL); 
                 continue; 
             } 

             try { 
                 $sendValue = (yield $gen>key() => $value); 
             } catch (Exception $e) { 
                 $gen>throw($e); 
                 continue; 
             } 

             $gen>send($sendValue); 
         } catch (Exception $e) { 
             if ($stack>isEmpty()) { 
                 throw $e; 
             } 

             $gen = $stack>pop(); 
             $exception = $e; 
         } 
     } 
 }
 

結束語

在 這篇文章裡我使用多任務協作構建了一個任務調度器其中包括執行“系統調用”做非阻塞操作和處理錯誤所有這些裡真正很酷的事情是任務的結果代碼看起 來完全同步甚至任務正在執行大量的異步操作的時候也是這樣如果你打算從套接口讀取數據的話你將不需要傳遞某個回調函數或者注冊一個事件偵聽器相 反你只要書寫yield $socket>read()這兒大部分都是你常常也要編寫的只在它的前面增加yield
當我第一次 聽到所有這一切的時候我發現這個概念完全令人折服而且正是這個激勵我在PHP中實現了它同時我發現協程真正令人心慌在令人敬畏的代碼和很大一堆代 碼之間只有單薄的一行我認為協程正好處在這一行上講講使用上面所述的方法書寫異步代碼是否真的有益對我來說很難
無論如何我認為這是一個有趣的話題而且我希望你也能找到它的樂趣歡迎評論:)
From:http://tw.wingwit.com/Article/program/PHP/201311/21125.html
  • 上一篇文章:

  • 下一篇文章:
  • 推薦文章
    Copyright © 2005-2022 電腦知識網 Computer Knowledge   All rights reserved.