国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 語言 > PHP > 正文

PHP使用Beanstalkd實例詳解

2024-09-04 11:49:50
字體:
供稿:網(wǎng)友

有關(guān)Beanstalkd的基本概念,編譯和yum的安裝方法已經(jīng)在上篇文章《Beanstalkd消息/任務(wù)隊列的詳解》中介紹了,今天練習(xí)下PHP使用Beanstalkd的過程,我選擇的是使用Pheanstalk類來連接Beanstalkd

1.使用Composer安裝Pheanstalk

composer require pda/pheanstalk

2.實現(xiàn)代碼

php查看beanstalkd狀態(tài)腳本Status.php

  1. <?php 
  2.  
  3. /** 
  4.  
  5.  * Created by PhpStorm. 
  6.  
  7.  * User: jmsite.cn 
  8.  
  9.  * Date: 2019/1/21 
  10.  
  11.  * Time: 10:32 
  12.  
  13.  */ 
  14.  
  15. require "../vendor/autoload.php"
  16.  
  17. use Pheanstalk/Pheanstalk; 
  18.  
  19. $pheanstalk = new Pheanstalk('192.168.75.135',11300); 
  20.  
  21. print_r($pheanstalk->stats()); 

生產(chǎn)者代碼Producter.php

  1. <?php 
  2.  
  3. /** 
  4.  
  5.  * Created by PhpStorm. 
  6.  
  7.  * User: jmsite.cn 
  8.  
  9.  * Date: 2019/1/20 
  10.  
  11.  * Time: 16:30 
  12.  
  13.  */ 
  14.  
  15. require "../vendor/autoload.php"
  16.  
  17. use Pheanstalk/Pheanstalk; 
  18.  
  19. $pheanstalk = new Pheanstalk('192.168.75.135',11300); 
  20.  
  21. for ($i=0;$i<50;$i++){ 
  22.  
  23.     $data = array
  24.  
  25.         'key' => 'testkey'.$i
  26.  
  27.         'value' => 'testvalue'
  28.  
  29.         'time' => time(), 
  30. //Vevb.com 
  31.     ); 
  32.  
  33.     $ret = $pheanstalk->putInTube('test-tube', json_encode($data), Pheanstalk::DEFAULT_PRIORITY, Pheanstalk::DEFAULT_DELAY, Pheanstalk::DEFAULT_TTR); 
  34.  
  35.     var_dump($ret); 
  36.  

消費者代碼Consumer.php

  1. <?php 
  2.  
  3. /** 
  4.  
  5.  * Created by PhpStorm. 
  6.  
  7.  * User: jmsite.cn 
  8.  
  9.  * Date: 2019/1/20 
  10.  
  11.  * Time: 16:31 
  12.  
  13.  */ 
  14.  
  15. set_time_limit(0); 
  16.  
  17. ini_set('default_socket_timeout', 900); 
  18.  
  19. require "../vendor/autoload.php"
  20.  
  21. use Pheanstalk/Pheanstalk; 
  22.  
  23. $pheanstalk = new Pheanstalk('192.168.75.135',11300); 
  24.  
  25. while (true){ 
  26.  
  27.     $job = $pheanstalk 
  28.  
  29.         ->watch('test-tube'
  30.  
  31.         ->ignore('default'
  32.  
  33.         ->reserve(); 
  34.  
  35.     if ($job){ 
  36.  
  37.         sleep(2); 
  38.  
  39.         echo $job->getData(); 
  40.  
  41.         echo "/n"
  42.  
  43.         $pheanstalk->delete($job); 
  44.  
  45.     } 
  46.  

打開命令行/終端窗口,執(zhí)行生產(chǎn)者,會向tube寫入50條任務(wù)

  1. PS E:/repository/work/beanstalk> php ./Producter.php 
  2.  
  3. int(101) 
  4.  
  5. int(102) 
  6.  
  7. int(103) 
  8.  
  9. int(104) 
  10.  
  11. int(105) 
  12.  
  13. int(106) 
  14.  
  15. int(107) 
  16.  
  17. int(108) 
  18.  
  19. int(109) 
  20.  
  21. int(110) 
  22.  
  23. int(111) 
  24.  
  25. int(112) 
  26.  
  27. int(113) 
  28.  
  29. int(114) 
  30.  
  31. ...... 

由此可見,$pheanstalk->putInTube成功后返回的是job的id

查看狀態(tài)

  1. PS E:/repository/work/beanstalk> php Status.php 
  2.  
  3. Pheanstalk/Response/ArrayResponse Object 
  4.  
  5.  
  6.     [_name:Pheanstalk/Response/ArrayResponse:private] => OK 
  7.  
  8.     [storage:ArrayObject:private] => Array 
  9.  
  10.         ( 
  11.  
  12.             [current-jobs-urgent] => 0 
  13.  
  14.             [current-jobs-ready] => 50 
  15.  
  16.             [current-jobs-reserved] => 0 
  17.  
  18.             [current-jobs-delayed] => 0 
  19.  
  20.             [current-jobs-buried] => 0 
  21.  
  22.             ...... 

結(jié)果中顯示處于ready待讀取狀態(tài)的job是50個

打開兩個或以上命令行/終端窗口,執(zhí)行消費者,模擬多消費者競爭

消費者1

  1. PS E:/repository/work/beanstalk> php ./Consumer.php 
  2.  
  3. {"key":"testkey0","value":"testvalue","time":1548039103} 
  4.  
  5. {"key":"testkey1","value":"testvalue","time":1548039103} 
  6.  
  7. {"key":"testkey2","value":"testvalue","time":1548039103} 
  8.  
  9. {"key":"testkey4","value":"testvalue","time":1548039103} 
  10.  
  11. {"key":"testkey6","value":"testvalue","time":1548039103} 
  12.  
  13. {"key":"testkey8","value":"testvalue","time":1548039103} 
  14.  
  15. {"key":"testkey10","value":"testvalue","time":1548039103} 
  16.  
  17. {"key":"testkey12","value":"testvalue","time":1548039103} 
  18.  
  19. {"key":"testkey14","value":"testvalue","time":1548039103} 
  20.  
  21. {"key":"testkey16","value":"testvalue","time":1548039103} 
  22.  
  23. {"key":"testkey18","value":"testvalue","time":1548039103} 
  24.  
  25. {"key":"testkey20","value":"testvalue","time":1548039103} 
  26.  
  27. {"key":"testkey22","value":"testvalue","time":1548039103} 
  28.  
  29. {"key":"testkey24","value":"testvalue","time":1548039103} 
  30.  
  31. {"key":"testkey26","value":"testvalue","time":1548039103} 
  32.  
  33. {"key":"testkey28","value":"testvalue","time":1548039103} 
  34.  
  35. {"key":"testkey30","value":"testvalue","time":1548039103} 
  36.  
  37. {"key":"testkey32","value":"testvalue","time":1548039103} 
  38.  
  39. {"key":"testkey34","value":"testvalue","time":1548039103} 
  40.  
  41. {"key":"testkey36","value":"testvalue","time":1548039103} 
  42.  
  43. {"key":"testkey38","value":"testvalue","time":1548039103} 
  44.  
  45. {"key":"testkey40","value":"testvalue","time":1548039103} 
  46.  
  47. {"key":"testkey42","value":"testvalue","time":1548039103} 
  48.  
  49. {"key":"testkey44","value":"testvalue","time":1548039103} 
  50.  
  51. {"key":"testkey46","value":"testvalue","time":1548039103} 
  52.  
  53. {"key":"testkey48","value":"testvalue","time":1548039103} 

消費者2

  1. PS E:/repository/work/beanstalk> php ./Consumer.php 
  2.  
  3. {"key":"testkey3","value":"testvalue","time":1548039103} 
  4.  
  5. {"key":"testkey5","value":"testvalue","time":1548039103} 
  6.  
  7. {"key":"testkey7","value":"testvalue","time":1548039103} 
  8.  
  9. {"key":"testkey9","value":"testvalue","time":1548039103} 
  10.  
  11. {"key":"testkey11","value":"testvalue","time":1548039103} 
  12.  
  13. {"key":"testkey13","value":"testvalue","time":1548039103} 
  14.  
  15. {"key":"testkey15","value":"testvalue","time":1548039103} 
  16.  
  17. {"key":"testkey17","value":"testvalue","time":1548039103} 
  18.  
  19. {"key":"testkey19","value":"testvalue","time":1548039103} 
  20.  
  21. {"key":"testkey21","value":"testvalue","time":1548039103} 
  22.  
  23. {"key":"testkey23","value":"testvalue","time":1548039103} 
  24.  
  25. {"key":"testkey25","value":"testvalue","time":1548039103} 
  26.  
  27. {"key":"testkey27","value":"testvalue","time":1548039103} 
  28.  
  29. {"key":"testkey29","value":"testvalue","time":1548039103} 
  30.  
  31. {"key":"testkey31","value":"testvalue","time":1548039103} 
  32.  
  33. {"key":"testkey33","value":"testvalue","time":1548039103} 
  34.  
  35. {"key":"testkey35","value":"testvalue","time":1548039103} 
  36.  
  37. {"key":"testkey37","value":"testvalue","time":1548039103} 
  38.  
  39. {"key":"testkey39","value":"testvalue","time":1548039103} 
  40.  
  41. {"key":"testkey41","value":"testvalue","time":1548039103} 
  42.  
  43. {"key":"testkey43","value":"testvalue","time":1548039103} 
  44.  
  45. {"key":"testkey45","value":"testvalue","time":1548039103} 
  46.  
  47. {"key":"testkey47","value":"testvalue","time":1548039103} 
  48.  
  49. {"key":"testkey49","value":"testvalue","time":1548039103} 

兩個消費者競爭著完成了全部任務(wù),由于我的beanstalkd啟動時開啟了binlog持久,所以beanstalkd重啟后任務(wù)也不會丟失

3.需要注意的事項

1.創(chuàng)建job時,設(shè)置的超時時間Pheanstalk::DEFAULT_TTR一定要比消費者處理一個job的時間要長,否則job在超時之后會被tube更改為ready狀態(tài),被其他消費者獲取,而此時當前消費者還在處理該job,這就出現(xiàn)了一個job被多個消費者重復(fù)執(zhí)行的可怕現(xiàn)象

2.Pheanstalk的維護者發(fā)生了變化,在新版的Pheanstalk中是不支持長連接的,當客戶端socket連接服務(wù)器時間超過php.ini中設(shè)置的default_socket_timeout時,如果未能從服務(wù)端tube獲得job,連接將會被斷開,所以消費者進程需要維護,以便在退出后可以重新開啟進程,推薦使用supervisord維護消費者進程。

判斷socket超時的代碼

  1. public function getLine($length = null) 
  2.  
  3.     { 
  4.  
  5.         $timeout = ini_get('default_socket_timeout'); 
  6.  
  7.         $timer   = microtime(true); 
  8.  
  9.         do { 
  10.  
  11.             $data = isset($length) ? 
  12.  
  13.                 $this->_wrapper()->fgets($this->_socket, $length) : 
  14.  
  15.                 $this->_wrapper()->fgets($this->_socket); 
  16.  
  17.             if ($this->_wrapper()->feof($this->_socket)) { 
  18.  
  19.                 throw new Exception/SocketException('Socket closed by server!'); 
  20.  
  21.             } 
  22.  
  23.             if (($data === false) && microtime(true) - $timer > $timeout) { 
  24.  
  25.                 $this->disconnect(); 
  26.  
  27.                 throw new Exception/SocketException('Socket timed out!'); 
  28.  
  29.             } 
  30.  
  31.         } while ($data === false); 
  32.  
  33.         return rtrim($data); 
  34.  
  35.     } 

發(fā)表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發(fā)表
主站蜘蛛池模板: 顺昌县| 元朗区| 邯郸市| 石门县| 临潭县| 涪陵区| 新巴尔虎右旗| 泸水县| 苏尼特左旗| 元谋县| 宜兰县| 玛曲县| 荣昌县| 明溪县| 昂仁县| 从化市| 如东县| 波密县| 南京市| 长沙市| 鞍山市| 康保县| 丰顺县| 淮南市| 宜丰县| 新巴尔虎左旗| 水城县| 搜索| 安新县| 桦甸市| 吉木萨尔县| 万安县| 温州市| 隆尧县| 铁力市| 西宁市| 台湾省| 简阳市| 台湾省| 克拉玛依市| 奈曼旗|