国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 開發 > PHP > 正文

源碼分析 Laravel 重復執行同一個隊列任務的原因

2024-05-04 21:50:18
字體:
來源:轉載
供稿:網友

前言:

laravel 的隊列服務對各種不同的后臺隊列服務提供了統一的 API。隊列允許你延遲執行消耗時間的任務,比如發送一封郵件。這樣可以有效的降低請求響應的時間。

發現問題:

在 Laravel 中使用 Redis 處理隊列任務,框架提供的功能非常強大,但是最近遇到一個問題,就是發現一個任務被多次執行,這是為什么呢?

源碼分析 Laravel 重復執行同一個隊列任務的原因

先說原因:

因為在 Laravel 中如果一個隊列(任務)執行時間大于 60 秒,就會被認為執行失敗并重新加入隊列中,這樣就會導致重復執行同一個任務。

這個任務的邏輯就是給用戶推送內容,需要根據隊列內容取出用戶并遍歷,通過請求后端 HTTP 接口發送。比如有 10000 個用戶,在用戶數量多或接口處理速度沒那么快的情況下,執行時間肯定會大于 60 秒,于是這個任務就被重新加入隊列。情況更糟糕一點,前面的任務如果都沒有在 60 秒執行完,就都會重新加入隊列,這樣同一個任務就不止重復執行一次了,而是多次。

下面從 Laravel 源代碼找一下罪魁禍首。

源代碼文件:vendor/laravel/framework/src/Illuminate/Queue/RedisQueue.php

  1. /** 
  2.  * The expiration time of a job. 
  3.  * 
  4.  * @var int|null 
  5.  */ 
  6. protected $expire = 60; 

這個 $expire 成員變量是一個固定的值,Laravel 認為一個隊列再怎么 60 秒也該執行完了吧。取隊列方法:

  1. public function pop($queue = null) 
  2.  $original = $queue ?: $this->default;  
  3.  $queue = $this->getQueue($queue);  
  4.  $this->migrateExpiredJobs($queue.':delayed'$queue);  
  5.  if (! is_null($this->expire)) { 
  6.   $this->migrateExpiredJobs($queue.':reserved'$queue); 
  7.  }  
  8.  list($job$reserved) = $this->getConnection()->eval
  9.   LuaScripts::pop(), 2, $queue$queue.':reserved'$this->getTime() + $this->expire 
  10.  ); //Vevb.com 
  11.  if ($reserved) { 
  12.   return new RedisJob($this->container, $this$job$reserved$original); 
  13.  } 

取隊列有幾步操作,因為隊列執行失敗,或執行超時等都會放入另外的集合保存起來,以便重試,過程如下:

1.把因執行失敗的隊列從 delayed 集合重新 rpush 到當前執行的隊列中。

2.把因執行超時的隊列從 reserved 集合重新 rpush 到當前執行的隊列中。

3.然后才是從隊列中取任務開始執行,同時把隊列放入 reserved 的有序集合。

這里使用了 eval 命令執行這個過程,用到了幾個 lua 腳本。

從要執行的隊列中取任務:

  1. local job = redis.call('lpop', KEYS[1]) 
  2. local reserved = false 
  3. if(job ~= false) then 
  4.  reserved = cjson.decode(job) 
  5.  reserved['attempts'] = reserved['attempts'] + 1 
  6.  reserved = cjson.encode(reserved) 
  7.  redis.call('zadd', KEYS[2], ARGV[1], reserved) 
  8. end 
  9. return {job, reserved} 

可以看到 Laravel 在取 Redis 要執行的隊列的時候,同時會放一份到一個有序集合中,并使用過期時間戳作為分值。

只有當這個任務完成后,再把有序集合中這個任務移除。從這個有序集合移除隊列的代碼就省略,我們看一下 Laravel 如何處理執行時間大于 60 秒的隊列。

也就是這段 lua 腳本執行的操作:

  1. local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1]) 
  2. if(next(val) ~= nil) then 
  3.  redis.call('zremrangebyrank', KEYS[1], 0, #val - 1) 
  4.  for i = 1, #val, 100 do 
  5.   redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val))) 
  6.  end 
  7. end 
  8. return true 

這里 zrangebyscore 找出分值從無限小到當前時間戳的元素,也就是 60 秒之前加入到集合的任務,然后通過 zremrangebyrank 從集合移除這些元素并 rpush 到隊列中。

看到這里應該就恍然大悟了。

如果一個隊列 60 秒沒執行完,那么進程在取隊列的時候從 reserved 集合中把這些任務又重新 rpush 到隊列中。

發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 黔江区| 武城县| 军事| 武宁县| 通榆县| 综艺| 淅川县| 邢台县| 六枝特区| 电白县| 探索| 古交市| 江津市| 盐亭县| 芦山县| 石嘴山市| 沧源| 清河县| 揭西县| 万宁市| 桦甸市| 奎屯市| 沾益县| 农安县| 德江县| 分宜县| 景洪市| 岚皋县| 临朐县| 张掖市| 永年县| 舞钢市| 沐川县| 汕头市| 宜州市| 衡山县| 溧水县| 安岳县| 周口市| 玉屏| 江阴市|