php5.5一個(gè)比較好的新功能是加入了對迭代生成器和協(xié)程的支持。對于生成器,PHP的文檔和各種其他的博客文章已經(jīng)有了非常詳細(xì)的講解。協(xié)程相對受到的關(guān)注就少了,因?yàn)閰f(xié)程雖然有很強(qiáng)大的功能但相對比較復(fù)雜, 也比較難被理解,解釋起來也比較困難。 這篇文章將嘗試通過介紹如何使用協(xié)程來實(shí)施任務(wù)調(diào)度, 來解釋在PHP中的協(xié)程。
迭代生成器 生成器也是一個(gè)函數(shù),不同的是這個(gè)函數(shù)的返回值是依次輸出,而不是只返回一個(gè)單獨(dú)的值。或者,換句話說,生成器使你更方便的實(shí)現(xiàn)了迭代器接口。下面通過實(shí)現(xiàn)一個(gè)xrange函數(shù)來簡單說明:
<?phpfunction xrange($start, $end, $step = 1) { for ($i = $start; $i <= $end; $i += $step) { yield $i; }}foreach (xrange(1, 1000000) as $num) { echo $num, "/n";}上面這個(gè)xrange()函數(shù)提供了和PHP的內(nèi)建函數(shù)range()一樣的功能。但是不同的是range()函數(shù)返回的是一個(gè)包含屬組值從1到100萬的數(shù)組(注:請查看手冊)。而xrange()函數(shù)返回的是依次輸出這些值的一個(gè)迭代器,而且并不會真正以數(shù)組形式返回. 這種方法的優(yōu)點(diǎn)是顯而易見的。它可以讓你在處理大數(shù)據(jù)集合的時(shí)候不用一次性的加載到內(nèi)存中。甚至你可以處理無限大的數(shù)據(jù)流。當(dāng)然,也可以不同通過生成器來實(shí)現(xiàn)這個(gè)功能,而是可以通過繼承Iterator接口實(shí)現(xiàn)。但通過使用生成器實(shí)現(xiàn)起來會更方便,不用再去實(shí)現(xiàn)iterator接口中的5個(gè)方法了。生成器為可中斷的函數(shù) 要從生成器認(rèn)識協(xié)程,理解它們內(nèi)部是如何工作是非常重要的: 生成器是一種可中斷的函數(shù),在它里面,yield構(gòu)成了中斷點(diǎn)。 緊接著上面的例子,如果你調(diào)用xrange(1,1000000)的話,xrange()函數(shù)里代碼其實(shí)并沒有真正地運(yùn)行。相反,PHP只是返回了一個(gè)實(shí)現(xiàn)了迭代器接口的生成器類實(shí)例:
<?php $range = xrange(1, 1000000); var_dump($range); // object(Generator)#1 var_dump($range instanceof Iterator); // bool(true)你對對象調(diào)用迭代器方法一次,其中的代碼運(yùn)行一次。例如,如果你調(diào)用$range->rewind(), 那么xrange()里的代碼就會運(yùn)行到控制流第一次出現(xiàn)yield的地方。而函數(shù)內(nèi)傳遞給yield語句的返回值可以通過$range->current()獲取。 為了繼續(xù)執(zhí)行生成器中的代碼,你必須調(diào)用$range->next()方法。這將再次啟動(dòng)生成器,直到下一次yield語句出現(xiàn)。因此,連續(xù)調(diào)用next()和current()方法 你將能從生成器里獲得所有的值,直到再?zèng)]有yield語句出現(xiàn)。對xrange()來說,這種情形出現(xiàn)在$i超過$end時(shí)。在這中情況下, 控制流將到達(dá)函數(shù)的終點(diǎn),因此將不執(zhí)行任何代碼。一旦這種情況發(fā)生,vaild()方法將返回假,這時(shí)迭代結(jié)束。 協(xié)程 協(xié)程給上面功能添加的主要功能就是回送數(shù)據(jù)給生成器的能力(調(diào)用者發(fā)送數(shù)據(jù)給被調(diào)用的生成器函數(shù))。這就把生成器到調(diào)用者的單向通信轉(zhuǎn)變?yōu)閮烧咧g的雙向通信。 你可以調(diào)用生成器的send()方法傳遞數(shù)據(jù)給協(xié)程。下面的logger()協(xié)程是這種通信如何運(yùn)行的例子:
<?phpfunction logger($fileName) { $fileHandle = fopen($fileName, 'a'); while (true) { fwrite($fileHandle, yield . "/n"); }}$logger = logger(__DIR__ . '/log');$logger->send('Foo');$logger->send('Bar')正如你能看到,這兒yield沒有作為一個(gè)語句來使用,而是用作一個(gè)表達(dá)式, 即它能被演變成一個(gè)值。這個(gè)值就是調(diào)用者傳遞給send()方法的值。 在這個(gè)例子里,yield表達(dá)式將首先被”Foo”替代寫入Log, 然后被”Bar”替代寫入Log。 上面的例子里yield僅作為接收者。但它其實(shí)既可接收也可發(fā)送。接收和發(fā)送通信如何進(jìn)行的例子如下:
<?phpfunction gen() { $ret = (yield 'yield1'); var_dump($ret); $ret = (yield 'yield2'); var_dump($ret);}$gen = gen();var_dump($gen->current()); // string(6) "yield1"var_dump($gen->send('ret1')); // string(4) "ret1" (the first var_dump in gen) // string(6) "yield2" (the var_dump of the ->send() return value)var_dump($gen->send('ret2')); // string(4) "ret2" (again from within gen) // NULL (the return value of ->send())要馬上理解輸出的精確順序有點(diǎn)困難,因此確定你知道為什按照這種方式輸出。我要特別指出的有兩點(diǎn): 第一點(diǎn),yield表達(dá)式兩邊的括號在PHP7以前不是可選的, 也就是說在PHP5.5和PHP5.6中圓括號是必須的。 第二點(diǎn),你可能已經(jīng)注意到調(diào)用current()之前沒有調(diào)用rewind()。這是因?yàn)樯傻鷮ο蟮臅r(shí)候已經(jīng)隱含地執(zhí)行了rewind操作。 多任務(wù)協(xié)作 如果閱讀了上面的logger()例子,你也許會疑惑“為了雙向通信我為什么要使用協(xié)程呢?我完全可以使用過程的方法實(shí)現(xiàn)同樣的功能啊?”, 是的, 你是對的, 但上面的例子只是為了演示了基本用法,這個(gè)例子其實(shí)并沒有真正的展示出使用協(xié)程的優(yōu)點(diǎn)。 正如上面介紹里提到的,協(xié)程是非常強(qiáng)大的概念,不過卻應(yīng)用的很稀少而且常常十分復(fù)雜。要給出一些簡單而真實(shí)的例子很難。 在這篇文章里,我決定去做的是使用協(xié)程實(shí)現(xiàn)多任務(wù)協(xié)作。我們要解決的問題是你想并發(fā)地運(yùn)行多任務(wù)(或者“程序”)。不過我們都知道CPU在一個(gè)時(shí)刻只能運(yùn)行一個(gè)任務(wù)(不考慮多核的情況)。因此處理器需要在不同的任務(wù)之間進(jìn)行切換,而且總是讓每個(gè)任務(wù)運(yùn)行 “一小會兒”。 多任務(wù)協(xié)作這個(gè)術(shù)語中的“協(xié)作”說明了如何進(jìn)行這種切換的:它要求當(dāng)前正在運(yùn)行的任務(wù)自動(dòng)把控制傳回給調(diào)度器,這樣就可以運(yùn)行其他任務(wù)了。這與“搶占”多任務(wù)相反,搶占多任務(wù)是這樣的:調(diào)度器可以中斷運(yùn)行了一段時(shí)間的任務(wù),不管它喜歡還是不喜歡。協(xié)作多任務(wù)在Windows的早期版本(windows95)和Mac OS中有使用,不過它們后來都切換到使用搶先多任務(wù)了。理由相當(dāng)明確:如果你依靠程序自動(dòng)交出控制的話,那么一些設(shè)計(jì)有問題的軟件將很容易為自身占用整個(gè)CPU,不與其他任務(wù)共享。 現(xiàn)在你應(yīng)當(dāng)明白協(xié)程和任務(wù)調(diào)度之間的聯(lián)系:yield指令提供了任務(wù)中斷自身的一種方法,然后把控制交回給任務(wù)調(diào)度器。因此協(xié)程可以運(yùn)行多個(gè)其他任務(wù)。更進(jìn)一步來說,yield可以用來在任務(wù)和調(diào)度器之間進(jìn)行通信。 我們的目的是 對 “任務(wù)”用更輕量級的包裝的協(xié)程函數(shù):
<?phpclass Task { PRotected $taskId; protected $coroutine; protected $sendValue = null; protected $beforeFirstYield = true; public function __construct($taskId, Generator $coroutine) { $this->taskId = $taskId; $this->coroutine = $coroutine; } public function getTaskId() { return $this->taskId; } public function setSendValue($sendValue) { $this->sendValue = $sendValue; } public function run() { if ($this->beforeFirstYield) { $this->beforeFirstYield = false; return $this->coroutine->current(); } else { $retval = $this->coroutine->send($this->sendValue); $this->sendValue = null; return $retval; } } public function isFinished() { return !$this->coroutine->valid(); }}一個(gè)任務(wù)是用任務(wù)ID標(biāo)記的一個(gè)協(xié)程。使用setSendValue()方法,你可以指定哪些值將被發(fā)送到下次的恢復(fù)(在之后你會了解到我們需要這個(gè))。 run()函數(shù)確實(shí)沒有做什么,除了調(diào)用send()方法的協(xié)同程序。要理解為什么添加beforeFirstYieldflag,需要考慮下面的代碼片段:
<?phpfunction gen() { yield 'foo'; yield 'bar';}$gen = gen();var_dump($gen->send('something'));// As the send() happens before the first yield there is an implicit rewind() call,// so what really happens is this:$gen->rewind();var_dump($gen->send('something'));// The rewind() will advance to the first yield (and ignore its value), the send() will// advance to the second yield (and dump its value). Thus we loose the first yielded value!通過添加 beforeFirstYieldcondition 我們可以確定 first yield 的值 被返回。 調(diào)度器現(xiàn)在不得不比多任務(wù)循環(huán)要做稍微多點(diǎn)了,然后才運(yùn)行多任務(wù):
<?phpclass Scheduler { protected $maxTaskId = 0; protected $taskMap = []; // taskId => task protected $taskQueue; public function __construct() { $this->taskQueue = new SplQueue(); } public function newTask(Generator $coroutine) { $tid = ++$this->maxTaskId; $task = new Task($tid, $coroutine); $this->taskMap[$tid] = $task; $this->schedule($task); return $tid; } public function schedule(Task $task) { $this->taskQueue->enqueue($task); } public function run() { while (!$this->taskQueue->isEmpty()) { $task = $this->taskQueue->dequeue(); $task->run(); if ($task->isFinished()) { unset($this->taskMap[$task->getTaskId()]); } else { $this->schedule($task); } } }}?>newTask()方法(使用下一個(gè)空閑的任務(wù)id)創(chuàng)建一個(gè)新任務(wù),然后把這個(gè)任務(wù)放入任務(wù)map數(shù)組里。接著它通過把任務(wù)放入任務(wù)隊(duì)列里來實(shí)現(xiàn)對任務(wù)的調(diào)度。接著run()方法掃描任務(wù)隊(duì)列,運(yùn)行任務(wù)。如果一個(gè)任務(wù)結(jié)束了,那么它將從隊(duì)列里刪除,否則它將在隊(duì)列的末尾再次被調(diào)度。 讓我們看看下面具有兩個(gè)簡單(并且沒有什么意義)任務(wù)的調(diào)度器:
<?phpfunction task1() { for ($i = 1; $i <= 10; ++$i) { echo "This is task 1 iteration $i./n"; yield; }}function task2() { for ($i = 1; $i <= 5; ++$i) { echo "This is task 2 iteration $i./n"; yield; }}$scheduler = new Scheduler;$scheduler->newTask(task1());$scheduler->newTask(task2());$scheduler->run();兩個(gè)任務(wù)都僅僅回顯一條信息,然后使用yield把控制回傳給調(diào)度器。輸出結(jié)果如下: This is task 1 iteration 1. This is task 2 iteration 1. This is task 1 iteration 2. This is task 2 iteration 2. This is task 1 iteration 3. This is task 2 iteration 3. This is task 1 iteration 4. This is task 2 iteration 4. This is task 1 iteration 5. This is task 2 iteration 5. This is task 1 iteration 6. This is task 1 iteration 7. This is task 1 iteration 8. This is task 1 iteration 9. This is task 1 iteration 10. 輸出確實(shí)如我們所期望的:對前五個(gè)迭代來說,兩個(gè)任務(wù)是交替運(yùn)行的,接著第二個(gè)任務(wù)結(jié)束后,只有第一個(gè)任務(wù)繼續(xù)運(yùn)行。 與調(diào)度器之間通信 既然調(diào)度器已經(jīng)運(yùn)行了,那么我們來看下一項(xiàng):任務(wù)和調(diào)度器之間的通信。 我們將使用進(jìn)程用來和操作系統(tǒng)會話的同樣的方式來通信:系統(tǒng)調(diào)用。我們需要系統(tǒng)調(diào)用的理由是操作系統(tǒng)與進(jìn)程相比它處在不同的權(quán)限級別上。因此為了執(zhí)行特權(quán)級別的操作(如殺死另一個(gè)進(jìn)程),就不得不以某種方式把控制傳回給內(nèi)核,這樣內(nèi)核就可以執(zhí)行所說的操作了。再說一遍,這種行為在內(nèi)部是通過使用中斷指令來實(shí)現(xiàn)的。過去使用的是通用的int指令,如今使用的是更特殊并且更快速的syscall/sysenter指令。 我們的任務(wù)調(diào)度系統(tǒng)將反映這種設(shè)計(jì):不是簡單地把調(diào)度器傳遞給任務(wù)(這樣就允許它做它想做的任何事),我們將通過給yield表達(dá)式傳遞信息來與系統(tǒng)調(diào)用通信。這兒yield即是中斷,也是傳遞信息給調(diào)度器(和從調(diào)度器傳遞出信息)的方法。 為了說明系統(tǒng)調(diào)用,我將對可調(diào)用的系統(tǒng)調(diào)用做一個(gè)小小的封裝:
<?phpclass SystemCall { protected $callback; public function __construct(callable $callback) { $this->callback = $callback; } public function __invoke(Task $task, Scheduler $scheduler) { $callback = $this->callback; // Can't call it directly in PHP :/ return $callback($task, $scheduler); }}它將像其他任何可調(diào)用那樣(使用_invoke)運(yùn)行,不過它要求調(diào)度器把正在調(diào)用的任務(wù)和自身傳遞給這個(gè)函數(shù)。為了解決這個(gè)問題我們不得不微微的修改調(diào)度器的run方法:
<?phppublic function run() { while (!$this->taskQueue->isEmpty()) { $task = $this->taskQueue->dequeue(); $retval = $task->run(); if ($retval instanceof SystemCall) { $retval($task, $this); continue; } if ($task->isFinished()) { unset($this->taskMap[$task->getTaskId()]); } else { $this->schedule($task); } }}第一個(gè)系統(tǒng)調(diào)用除了返回任務(wù)ID外什么都沒有做:
<?phpfunction getTaskId() { return new SystemCall(function(Task $task, Scheduler $scheduler) { $task->setSendValue($task->getTaskId()); $scheduler->schedule($task); });}這個(gè)函數(shù)設(shè)置任務(wù)id為下一次發(fā)送的值,并再次調(diào)度了這個(gè)任務(wù)。由于使用了系統(tǒng)調(diào)用,所以調(diào)度器不能自動(dòng)調(diào)用任務(wù),我們需要手工調(diào)度任務(wù)(稍后你將明白為什么這么做)。要使用這個(gè)新的系統(tǒng)調(diào)用的話,我們要重新編寫以前的例子:
<?phpfunction task($max) { $tid = (yield getTaskId()); // <-- here's the syscall! for ($i = 1; $i <= $max; ++$i) { echo "This is task $tid iteration $i./n"; yield; }}$scheduler = new Scheduler;$scheduler->newTask(task(10));$scheduler->newTask(task(5));$scheduler->run();這段代碼將給出與前一個(gè)例子相同的輸出。注意系統(tǒng)調(diào)用同其他任何調(diào)用一樣正常地運(yùn)行,不過預(yù)先增加了yield。要?jiǎng)?chuàng)建新的任務(wù),然后再殺死它們的話,需要兩個(gè)以上的系統(tǒng)調(diào)用:
<?phpfunction newTask(Generator $coroutine) { return new SystemCall( function(Task $task, Scheduler $scheduler) use ($coroutine) { $task->setSendValue($scheduler->newTask($coroutine)); $scheduler->schedule($task); } );}function killTask($tid) { return new SystemCall( function(Task $task, Scheduler $scheduler) use ($tid) { $task->setSendValue($scheduler->killTask($tid)); $scheduler->schedule($task); } );}killTask函數(shù)需要在調(diào)度器里增加一個(gè)方法:
<?phppublic function killTask($tid) { if (!isset($this->taskMap[$tid])) { return false; } unset($this->taskMap[$tid]); // This is a bit ugly and could be optimized so it does not have to walk the queue, // but assuming that killing tasks is rather rare I won't bother with it now foreach ($this->taskQueue as $i => $task) { if ($task->getTaskId() === $tid) { unset($this->taskQueue[$i]); break; } } return true;}用來測試新功能的微腳本:
<?phpfunction childTask() { $tid = (yield getTaskId()); while (true) { echo "Child task $tid still alive!/n"; yield; }}function task() { $tid = (yield getTaskId()); $childTid = (yield newTask(childTask())); for ($i = 1; $i <= 6; ++$i) { echo "Parent task $tid iteration $i./n"; yield; if ($i == 3) yield killTask($childTid); }}$scheduler = new Scheduler;$scheduler->newTask(task());$scheduler->run();?>這段代碼將打印以下信息: Parent task 1 iteration 1. Child task 2 still alive! Parent task 1 iteration 2. Child task 2 still alive! Parent task 1 iteration 3. Child task 2 still alive! Parent task 1 iteration 4. Parent task 1 iteration 5. Parent task 1 iteration 6. 經(jīng)過三次迭代以后子任務(wù)將被殺死,因此這就是”Child is still alive”消息結(jié)束的時(shí)候。可能應(yīng)當(dāng)指出的是這不是真正的父子關(guān)系。 因?yàn)樯踔猎诟溉蝿?wù)結(jié)束后子任務(wù)仍然可以運(yùn)行。或者子任務(wù)可以殺死父任務(wù)。可以修改調(diào)度器使它具有更層級化的任務(wù)結(jié)構(gòu),不過 在這篇文章里我沒有這么做。 你可以實(shí)現(xiàn)許多進(jìn)程管理調(diào)用。例如 wait(它一直等待到任務(wù)結(jié)束運(yùn)行時(shí)),exec(它替代當(dāng)前任務(wù))和fork(它創(chuàng)建一個(gè) 當(dāng)前任務(wù)的克隆)。fork非常酷,而且你可以使用PHP的協(xié)程真正地實(shí)現(xiàn)它,因?yàn)樗鼈兌贾С挚寺 ?然而讓我們把這些留給有興趣的讀者吧,我們來看下一個(gè)議題。 非阻塞IO 很明顯,我們的任務(wù)管理系統(tǒng)的真正很酷的應(yīng)用是web服務(wù)器。它有一個(gè)任務(wù)是在套接字上偵聽是否有新連接,當(dāng)有新連接要建立的時(shí)候 ,它創(chuàng)建一個(gè)新任務(wù)來處理新連接。 web服務(wù)器最難的部分通常是像讀數(shù)據(jù)這樣的套接字操作是阻塞的。例如PHP將等待到客戶端完成發(fā)送為止。對一個(gè)WEB服務(wù)器來說,這 根本不行;這就意味著服務(wù)器在一個(gè)時(shí)間點(diǎn)上只能處理一個(gè)連接。 解決方案是確保在真正對套接字讀寫之前該套接字已經(jīng)“準(zhǔn)備就緒”。為了查找哪個(gè)套接字已經(jīng)準(zhǔn)備好讀或者寫了,可以使用 流選擇函數(shù)。 首先,讓我們添加兩個(gè)新的 syscall,它們將等待直到指定 socket 準(zhǔn)備好:
<?phpfunction waitForRead($socket) { return new SystemCall( function(Task $task, Scheduler $scheduler) use ($socket) { $scheduler->waitForRead($socket, $task); } );}function waitForWrite($socket) { return new SystemCall( function(Task $task, Scheduler $scheduler) use ($socket) { $scheduler->waitForWrite($socket, $task); } );}這些 syscall 只是在調(diào)度器中代理其各自的方法:
<?php// resourceID => [socket, tasks]protected $waitingForRead = [];protected $waitingForWrite = [];public function waitForRead($socket, Task $task) { if (isset($this->waitingForRead[(int) $socket])) { $this->waitingForRead[(int) $socket][1][] = $task; } else { $this->waitingForRead[(int) $socket] = [$socket, [$task]]; }}public function waitForWrite($socket, Task $task) { if (isset($this->waitingForWrite[(int) $socket])) { $this->waitingForWrite[(int) $socket][1][] = $task; } else { $this->waitingForWrite[(int) $socket] = [$socket, [$task]]; }}waitingForRead 及 waitingForWrite 屬性是兩個(gè)承載等待的socket 及等待它們的任務(wù)的數(shù)組。有趣的部分在于下面的方法,它將檢查 socket 是否可用,并重新安排各自任務(wù):
<?phpprotected function ioPoll($timeout) { $rSocks = []; foreach ($this->waitingForRead as list($socket)) { $rSocks[] = $socket; } $wSocks = []; foreach ($this->waitingForWrite as list($socket)) { $wSocks[] = $socket; } $eSocks = []; // dummy if (!stream_select($rSocks, $wSocks, $eSocks, $timeout)) { return; } foreach ($rSocks as $socket) { list(, $tasks) = $this->waitingForRead[(int) $socket]; unset($this->waitingForRead[(int) $socket]); foreach ($tasks as $task) { $this->schedule($task); } } foreach ($wSocks as $socket) { list(, $tasks) = $this->waitingForWrite[(int) $socket]; unset($this->waitingForWrite[(int) $socket]); foreach ($tasks as $task) { $this->schedule($task); } }}stream_select 函數(shù)接受承載讀取、寫入以及待檢查的socket的數(shù)組(我們無需考慮最后一類)。數(shù)組將按引用傳遞,函數(shù)只會保留那些狀態(tài)改變了的數(shù)組元素。我們可以遍歷這些數(shù)組,并重新安排與之相關(guān)的任務(wù)。 為了正常地執(zhí)行上面的輪詢動(dòng)作,我們將在調(diào)度器里增加一個(gè)特殊的任務(wù):
<?phpprotected function ioPollTask() { while (true) { if ($this->taskQueue->isEmpty()) { $this->ioPoll(null); } else { $this->ioPoll(0); } yield; }}需要在某個(gè)地方注冊這個(gè)任務(wù),例如,你可以在run()方法的開始增加
;
這段代碼將接收到localhost:8000上的連接,然后僅僅返回發(fā)送來的內(nèi)容作為HTTP響應(yīng)。要做“實(shí)際”的事情的話就愛哪個(gè)非常復(fù)雜(處理 HTTP請求可能已經(jīng)超出了這篇文章的范圍)。上面的代碼片段只是演示了一般性的概念。 你可以使用類似于ab -n 10000 -c 100 localhost:8000/這樣命令來測試服務(wù)器。這條命令將向服務(wù)器發(fā)送10000個(gè)請求,并且其中100個(gè)請求將同時(shí)到達(dá)。使用這樣的數(shù)目,我得到了處于中間的10毫秒的響應(yīng)時(shí)間。不過還有一個(gè)問題:有少數(shù)幾個(gè)請求真正處理的很慢(如5秒), 這就是為什么總吞吐量只有2000請求/秒(如果是10毫秒的響應(yīng)時(shí)間的話,總的吞吐量應(yīng)該更像是10000請求/秒 協(xié)程堆棧 如果你試圖用我們的調(diào)度系統(tǒng)建立更大的系統(tǒng)的話,你將很快遇到問題:我們習(xí)慣了把代碼分解為更小的函數(shù),然后調(diào)用它們。然而, 如果使用了協(xié)程的話,就不能這么做了。例如,看下面代碼:
<?phpfunction echoTimes($msg, $max) { for ($i = 1; $i <= $max; ++$i) { echo "$msg iteration $i/n"; yield; }}function task() { echoTimes('foo', 10); // print foo ten times echo "---/n"; echoTimes('bar', 5); // print bar five times yield; // force it to be a coroutine}$scheduler = new Scheduler;$scheduler->newTask(task());$scheduler->run();這段代碼試圖把重復(fù)循環(huán)“輸出n次“的代碼嵌入到一個(gè)獨(dú)立的協(xié)程里,然后從主任務(wù)里調(diào)用它。然而它無法運(yùn)行。正如在這篇文章的開始 所提到的,調(diào)用生成器(或者協(xié)程)將沒有真正地做任何事情,它僅僅返回一個(gè)對象。這也出現(xiàn)在上面的例子里。echoTimes調(diào)用除了放回一個(gè)(無用的)協(xié)程對象外不做任何事情。 為了仍然允許這么做,我們需要在這個(gè)裸協(xié)程上寫一個(gè)小小的封裝。我們將調(diào)用它:“協(xié)程堆棧”。因?yàn)樗鼘⒐芾砬短椎膮f(xié)程調(diào)用堆棧。 這將是通過生成協(xié)程來調(diào)用子協(xié)程成為可能:
為了把協(xié)程轉(zhuǎn)變?yōu)閰f(xié)程堆棧(它支持子調(diào)用),我們將不得不編寫另外一個(gè)函數(shù)(很明顯,它是另一個(gè)協(xié)程):
<?phpfunction stackedCoroutine(Generator $gen) { $stack = new SplStack; for (;;) { $value = $gen->current(); if ($value instanceof Generator) { $stack->push($gen); $gen = $value; continue; } $isReturnValue = $value instanceof CoroutineReturnValue; if (!$gen->valid() || $isReturnValue) { if ($stack->isEmpty()) { return; } $gen = $stack->pop(); $gen->send($isReturnValue ? $value->getValue() : NULL); continue; } $gen->send(yield $gen->key() => $value); }}這個(gè)函數(shù)在調(diào)用者和當(dāng)前正在運(yùn)行的子協(xié)程之間扮演著簡單代理的角色。在
現(xiàn)在服務(wù)器可以編寫的稍微簡潔點(diǎn)了:
<?phpfunction server($port) { echo "Starting server at port $port.../n"; $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr); if (!$socket) throw new Exception($errStr, $errNo); stream_set_blocking($socket, 0); $socket = new CoSocket($socket); while (true) { yield newTask( handleClient(yield $socket->accept()) ); }}function handleClient($socket) { $data = (yield $socket->read(8192)); $msg = "Received following request:/n/n$data"; $msgLength = strlen($msg); $response = <<<RESHTTP/1.1 200 OK/rContent-Type: text/plain/rContent-Length: $msgLength/rConnection: close/r/r$msgRES; yield $socket->write($response); yield $socket->close();}錯(cuò)誤處理 作為一個(gè)優(yōu)秀的程序員,相信你已經(jīng)察覺到上面的例子缺少錯(cuò)誤處理。幾乎所有的 socket 都是易出錯(cuò)的。我這樣做的原因一方面固然是因?yàn)殄e(cuò)誤處理的乏味(特別是 socket!),另一方面也在于它很容易使代碼體積膨脹。 不過,我仍然了一講一下常見的協(xié)程錯(cuò)誤處理:協(xié)程允許使用 throw() 方法在其內(nèi)部拋出一個(gè)錯(cuò)誤。盡管此方法還未在 PHP 中實(shí)現(xiàn),但我很快就會提交它,就在今天。 throw() 方法接受一個(gè) Exception,并將其拋出到協(xié)程的當(dāng)前懸掛點(diǎn),看看下面代碼:
<?phpfunction gen() { echo "Foo/n"; try { yield; } catch (Exception $e) { echo "Exception: {$e->getMessage()}/n"; } echo "Bar/n";}$gen = gen();$gen->rewind(); // echos "Foo"$gen->throw(new Exception('Test')); // echos "Exception: Test" // and "Bar"這非常棒,因?yàn)槲覀兛梢允褂孟到y(tǒng)調(diào)用以及子協(xié)程調(diào)用異常拋出。對與系統(tǒng)調(diào)用,Scheduler::run() 方法需要一些小調(diào)整:
<?phpif ($retval instanceof SystemCall) { try { $retval($task, $this); } catch (Exception $e) { $task->setException($e); $this->schedule($task); } continue;}Task 類也許要添加 throw 調(diào)用處理:
<?phpclass Task { // ... protected $exception = null; public function setException($exception) { $this->exception = $exception; } public function run() { if ($this->beforeFirstYield) { $this->beforeFirstYield = false; return $this->coroutine->current(); } elseif ($this->exception) { $retval = $this->coroutine->throw($this->exception); $this->exception = null; return $retval; } else { $retval = $this->coroutine->send($this->sendValue); $this->sendValue = null; return $retval; } } // ...}現(xiàn)在,我們已經(jīng)可以在系統(tǒng)調(diào)用中使用異常拋出了!例如,要調(diào)用 killTask,讓我們在傳遞 ID 不可用時(shí)拋出一個(gè)異常:
<?phpfunction killTask($tid) { return new SystemCall( function(Task $task, Scheduler $scheduler) use ($tid) { if ($scheduler->killTask($tid)) { $scheduler->schedule($task); } else { throw new InvalidArgumentException('Invalid task ID!'); } } );}試試看:
<?phpfunction task() { try { yield killTask(500); } catch (Exception $e) { echo 'Tried to kill task 500 but failed: ', $e->getMessage(), "/n"; }}這些代碼現(xiàn)在尚不能正常運(yùn)作,因?yàn)?stackedCoroutine 函數(shù)無法正確處理異常。要修復(fù)需要做些調(diào)整:
<?phpfunction stackedCoroutine(Generator $gen) { $stack = new SplStack; $exception = null; for (;;) { try { if ($exception) { $gen->throw($exception); $exception = null; continue; } $value = $gen->current(); if ($value instanceof Generator) { $stack->push($gen); $gen = $value; continue; } $isReturnValue = $value instanceof CoroutineReturnValue; if (!$gen->valid() || $isReturnValue) { if ($stack->isEmpty()) { return; } $gen = $stack->pop(); $gen->send($isReturnValue ? $value->getValue() : NULL); continue; } try { $sendValue = (yield $gen->key() => $value); } catch (Exception $e) { $gen->throw($e); continue; } $gen->send($sendValue); } catch (Exception $e) { if ($stack->isEmpty()) { throw $e; } $gen = $stack->pop(); $exception = $e; } }}結(jié)束語 在這篇文章里,我使用多任務(wù)協(xié)作構(gòu)建了一個(gè)任務(wù)調(diào)度器,其中包括執(zhí)行“系統(tǒng)調(diào)用”,做非阻塞操作和處理錯(cuò)誤。所有這些里真正很酷的事情是任務(wù)的結(jié)果代碼看起來完全同步,甚至任務(wù)正在執(zhí)行大量的異步操作的時(shí)候也是這樣。如果你打算從套接口讀取數(shù)據(jù)的話,你將不需要傳遞某個(gè)回調(diào)函數(shù)或者注冊一個(gè)事件偵聽器。相反,你只要書寫yield $socket->read()。這兒大部分都是你常常也要編寫的,只在它的前面增加yield。 當(dāng)我第一次聽到所有這一切的時(shí)候,我發(fā)現(xiàn)這個(gè)概念完全令人折服,而且正是這個(gè)激勵(lì)我在PHP中實(shí)現(xiàn)了它。同時(shí)我發(fā)現(xiàn)協(xié)程真正令人心慌。在令人敬畏的代碼和很大一堆代碼之間只有單薄的一行,我認(rèn)為協(xié)程正好處在這一行上。講講使用上面所述的方法書寫異步代碼是否真的有益對我來說很難。
新聞熱點(diǎn)
疑難解答