PHP
這篇文章指導你通過使用協同程序來實施任務調度
生成器
生成器最基本的思想也是一個函數
<?php
function xrange($start
for ($i = $start; $i <= $end; $i += $step) {
yield $i;
}
}
foreach (xrange(
echo $num
}
上面這個xrange()函數提供了和PHP的內建函數range()一樣的功能
這種方法的優點是顯而易見的
當然
生成器為可中斷的函數
要從生成器認識協同程序
緊接著上面的例子
<?php
$range = xrange(
var_dump($range); // object(Generator)#
var_dump($range instanceof Iterator); // bool(true)
你對某個對象調用迭代器方法一次
為了繼續執行生成器中的代碼
協程
協程給上面功能添加的主要東西是回送數據給生成器的能力
通過調用生成器的send()方法而不是其next()方法傳遞數據給協程
<?php
function logger($fileName) {
$fileHandle = fopen($fileName
while (true) {
fwrite($fileHandle
}
}
$logger = logger(__DIR__
$logger
$logger
正如你能看到
上面的例子裡yield僅作為接收者
<?php
function gen() {
$ret = (yield
var_dump($ret);
$ret = (yield
var_dump($ret);
}
$gen = gen();
var_dump($gen
var_dump($gen
// string(
var_dump($gen
// NULL (the return value of
馬上理解輸出的精確順序有點困難
多任務協作
如果閱讀了上面的logger()例子
在這篇文章裡
多任務協作這個術語中的“協作”說明了如何進行這種切換的
這個時候你應當明白協程和任務調度之間的聯系
我們的目的是 對 “任務”用更輕量級的包裝的協程函數:
<?php
class Task {
protected $taskId;
protected $coroutine;
protected $sendValue = null;
protected $beforeFirstYield = true;
public function __construct($taskId
$this
$this
}
public function getTaskId() {
return $this
}
public function setSendValue($sendValue) {
$this
}
public function run() {
if ($this
$this
return $this
} else {
$retval = $this
$this
return $retval;
}
}
public function isFinished() {
return !$this
}
}
一個任務是用 任務ID標記一個協程
<?php
function gen() {
yield
yield
}
$gen = gen();
var_dump($gen
// As the send() happens before the first yield there is an implicit rewind() call
// so what really happens is this:
$gen
var_dump($gen
// The rewind() will advance to the first yield (and ignore its value)
// advance to the second yield (and dump its value)
通過添加 beforeFirstYieldcondition 我們可以確定 first yield 的值 被返回
調度器現在不得不比多任務循環要做稍微多點了
<?php
class Scheduler {
protected $maxTaskId =
protected $taskMap = []; // taskId => task
protected $taskQueue;
public function __construct() {
$this
}
public function newTask(Generator $coroutine) {
$tid = ++$this
$task = new Task($tid
$this
$this
return $tid;
}
public function schedule(Task $task) {
$this
}
public function run() {
while (!$this
$task = $this
$task
if ($task
unset($this
} else {
$this
}
}
}
}
newTask()方法(使用下一個空閒的任務id)創建一個新任務
讓我們看看下面具有兩個簡單(並且沒有什麼意義)任務的調度器
<?php
function task
for ($i =
echo "This is task
yield;
}
}
function task
for ($i =
echo "This is task
yield;
}
}
$scheduler = new Scheduler;
$scheduler
$scheduler
$scheduler
兩個任務都僅僅回顯一條信息
This is task
This is task
This is task
This is task
This is task
This is task
This is task
This is task
This is task
This is task
This is task
This is task
This is task
This is task
This is task
輸出確實如我們所期望的
與調度器之間通信
既然調度器已經運行了
我們的任務調度系統將反映這種設計
為了說明系統調用
<?php
class SystemCall {
protected $callback;
public function __construct(callable $callback) {
$this
}
public function __invoke(Task $task
$callback = $this
return $callback($task
}
}
它將像其他任何可調用那樣(使用_invoke)運行
<?php
public function run() {
while (!$this
$task = $this
$retval = $task
if ($retval instanceof SystemCall) {
$retval($task
continue;
}
if ($task
unset($this
} else {
$this
}
}
}
第一個系統調用除了返回任務ID外什麼都沒有做
<?php
function getTaskId() {
return new SystemCall(function(Task $task
$task
$scheduler
});
}
這個函數確實設置任務id為下一次發送的值
<?php
function task($max) {
$tid = (yield getTaskId()); // <
for ($i =
echo "This is task $tid iteration $i
yield;
}
}
$scheduler = new Scheduler;
$scheduler
$scheduler
$scheduler
這段代碼將給出與前一個例子相同的輸出
<?php
function newTask(Generator $coroutine) {
return new SystemCall(
function(Task $task
$task
$scheduler
}
);
}
function killTask($tid) {
return new SystemCall(
function(Task $task
$task
$scheduler
}
);
}
killTask函數需要在調度器裡增加一個方法
<?php
public function killTask($tid) {
if (!isset($this
return false;
}
unset($this
// This is a bit ugly and could be optimized so it does not have to walk the queue
// but assuming that killing tasks is rather rare I won
foreach ($this
if ($task
unset($this
break;
}
}
return true;
}
用來測試新功能的微腳本
<?php
function childTask() {
$tid = (yield getTaskId());
while (true) {
echo "Child task $tid still alive!n";
yield;
}
}
function task() {
$tid = (yield getTaskId());
$childTid = (yield newTask(childTask()));
for ($i =
echo "Parent task $tid iteration $i
yield;
if ($i ==
}
}
$scheduler = new Scheduler;
$scheduler
$scheduler
這段代碼將打印以下信息
Parent task
Child task
Parent task
Child task
Parent task
Child task
Parent task
Parent task
Parent task
經過三次迭代以後子任務將被殺死
你可以實現許多進程管理調用
然而讓我們把這些留給有興趣的讀者吧
幾點人
翻譯於
頂 翻譯的不錯哦!
非阻塞IO
很明顯
web服務器最難的部分通常是像讀數據這樣的套接字操作是阻塞的
解決方案是確保在真正對套接字讀寫之前該套接字已經“准備就緒”
首先
<?php
function waitForRead($socket) {
return new SystemCall(
function(Task $task
$scheduler
}
);
}
function waitForWrite($socket) {
return new SystemCall(
function(Task $task
$scheduler
}
);
}
這些 syscall 只是在調度器中代理其各自的方法
<?php
// resourceID => [socket
protected $waitingForRead = [];
protected $waitingForWrite = [];
public function waitForRead($socket
if (isset($this
$this
} else {
$this
}
}
public function waitForWrite($socket
if (isset($this
$this
} else {
$this
}
}
waitingForRead 及 waitingForWrite 屬性是兩個承載等待的socket 及等待它們的任務的數組
<?php
protected function ioPoll($timeout) {
$rSocks = [];
foreach ($this
$rSocks[] = $socket;
}
$wSocks = [];
foreach ($this
$wSocks[] = $socket;
}
$eSocks = []; // dummy
if (!stream_select($rSocks
return;
}
foreach ($rSocks as $socket) {
list(
unset($this
foreach ($tasks as $task) {
$this
}
}
foreach ($wSocks as $socket) {
list(
unset($this
foreach ($tasks as $task) {
$this
}
}
}
stream_select 函數接受承載讀取
為了正常地執行上面的輪詢動作
<?php
protected function ioPollTask() {
while (true) {
if ($this
$this
} else {
$this
}
yield;
}
}
需要在某個地方注冊這個任務
只有任務隊列為空時
現在編寫服務器相對容易了
<?php
function server($port) {
echo "Starting server at port $port
$socket = @stream_socket_server("tcp://localhost:$port"
if (!$socket) throw new Exception($errStr
stream_set_blocking($socket
while (true) {
yield waitForRead($socket);
$clientSocket = stream_socket_accept($socket
yield newTask(handleClient($clientSocket));
}
}
function handleClient($socket) {
yield waitForRead($socket);
$data = fread($socket
$msg = "Received following request:nn$data";
$msgLength = strlen($msg);
$response = <<<RES
HTTP/
Content
Content
Connection: closer
r
$msg
RES;
yield waitForWrite($socket);
fwrite($socket
fclose($socket);
}
$scheduler = new Scheduler;
$scheduler
$scheduler
這段代碼將接收到localhost:
你 可以使用類似於ab
協程堆棧
如果你試圖用我們的調度系統建立更大的系統的話
<?php
function echoTimes($msg
for ($i =
echo "$msg iteration $in";
yield;
}
}
function task() {
echoTimes(
echo "
echoTimes(
yield; // force it to be a coroutine
}
$scheduler = new Scheduler;
$scheduler
$scheduler
這段代碼試圖把重復循環“輸出n次“的代碼嵌入到一個獨立的協程裡
為了仍然允許這麼做
$retval = (yield someCoroutine($foo
使用yield
yield retval("I
retval函數除了返回一個值的封裝外沒有做任何其他事情
<?php
class CoroutineReturnValue {
protected $value;
public function __construct($value) {
$this
}
public function getValue() {
return $this
}
}
function retval($value) {
return new CoroutineReturnValue($value);
}
為了把協程轉變為協程堆棧(它支持子調用)
<?php
function stackedCoroutine(Generator $gen) {
$stack = new SplStack;
for (;;) {
$value = $gen
if ($value instanceof Generator) {
$stack
$gen = $value;
continue;
}
$isReturnValue = $value instanceof CoroutineReturnValue;
if (!$gen
if ($stack
return;
}
$gen = $stack
$gen
continue;
}
$gen
}
}
這 個函數在調用者和當前正在運行的子協程之間扮演著簡單代理的角色
為了使協程堆棧在任務裡可用
現在我們可以稍微改進上面web服務器例子
<?php
class CoSocket {
protected $socket;
public function __construct($socket) {
$this
}
public function accept() {
yield waitForRead($this
yield retval(new CoSocket(stream_socket_accept($this
}
public function read($size) {
yield waitForRead($this
yield retval(fread($this
}
public function write($string) {
yield waitForWrite($this
fwrite($this
}
public function close() {
@fclose($this
}
}
現在服務器可以編寫的稍微簡潔點了
<?php
function server($port) {
echo "Starting server at port $port
$socket = @stream_socket_server("tcp://localhost:$port"
if (!$socket) throw new Exception($errStr
stream_set_blocking($socket
$socket = new CoSocket($socket);
while (true) {
yield newTask(
handleClient(yield $socket
);
}
}
function handleClient($socket) {
$data = (yield $socket
$msg = "Received following request:nn$data";
$msgLength = strlen($msg);
$response = <<<RES
HTTP/
Content
Content
Connection: closer
r
$msg
RES;
yield $socket
yield $socket
}
錯誤處理
作為一個優秀的程序員
不過
throw() 方法接受一個 Exception
<?php
function gen() {
echo "Foon";
try {
yield;
} catch (Exception $e) {
echo "Exception: {$e
}
echo "Barn";
}
$gen = gen();
$gen
$gen
// and "Bar"
這非常棒
<?php
if ($retval instanceof SystemCall) {
try {
$retval($task
} catch (Exception $e) {
$task
$this
}
continue;
}
Task 類也許要添加 throw 調用處理
<?php
class Task {
//
protected $exception = null;
public function setException($exception) {
$this
}
public function run() {
if ($this
$this
return $this
} elseif ($this
$retval = $this
$this
return $retval;
} else {
$retval = $this
$this
return $retval;
}
}
//
}
現在
<?php
function killTask($tid) {
return new SystemCall(
function(Task $task
if ($scheduler
$scheduler
} else {
throw new InvalidArgumentException(
}
}
);
}
試試看
<?php
function task() {
try {
yield killTask(
} catch (Exception $e) {
echo
}
}
這些代碼現在尚不能正常運作
<?php
function stackedCoroutine(Generator $gen) {
$stack = new SplStack;
$exception = null;
for (;;) {
try {
if ($exception) {
$gen
$exception = null;
continue;
}
$value = $gen
if ($value instanceof Generator) {
$stack
$gen = $value;
continue;
}
$isReturnValue = $value instanceof CoroutineReturnValue;
if (!$gen
if ($stack
return;
}
$gen = $stack
$gen
continue;
}
try {
$sendValue = (yield $gen
} catch (Exception $e) {
$gen
continue;
}
$gen
} catch (Exception $e) {
if ($stack
throw $e;
}
$gen = $stack
$exception = $e;
}
}
}
結束語
在 這篇文章裡
當我第一次 聽到所有這一切的時候
無論如何
From:http://tw.wingwit.com/Article/program/PHP/201311/21125.html