国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁(yè) > 編程 > JavaScript > 正文

Node.js + Redis Sorted Set實(shí)現(xiàn)任務(wù)隊(duì)列

2019-11-20 08:57:21
字體:
來(lái)源:轉(zhuǎn)載
供稿:網(wǎng)友

需求:功能 A 需要調(diào)用第三方 API 獲取數(shù)據(jù),而第三方 API 自身是異步處理方式,在調(diào)用后會(huì)返回?cái)?shù)據(jù)與狀態(tài) { data: "查詢結(jié)果", "status": "正在異步處理中" } ,這樣就需要間隔一段時(shí)間后再去調(diào)用第三方 API 獲取數(shù)據(jù)。為了用戶在使用功能 A 時(shí)不會(huì)因?yàn)榈谌?API 正在異步處理中而必須等待,將用戶請(qǐng)求加入任務(wù)隊(duì)列中,返回部分?jǐn)?shù)據(jù)并關(guān)閉請(qǐng)求。然后定時(shí)從任務(wù)隊(duì)列里中取出任務(wù)調(diào)用第三方 API,若返回狀態(tài)為”異步處理中“,將該任務(wù)再次加入任務(wù)隊(duì)列,若返回狀態(tài)為”已處理完畢“,將返回?cái)?shù)據(jù)入庫(kù)。

根據(jù)以上問(wèn)題,想到使用 Node.js + Redis sorted set 來(lái)實(shí)現(xiàn)任務(wù)隊(duì)列。Node.js 實(shí)現(xiàn)自身應(yīng)用 API 用來(lái)接受用戶請(qǐng)求,合并數(shù)據(jù)庫(kù)已存數(shù)據(jù)與 API 返回的部分?jǐn)?shù)據(jù)返回給用戶,并將任務(wù)加入到任務(wù)隊(duì)列中。利用 Node.js child process 與 cron 定時(shí)從任務(wù)隊(duì)列中取出任務(wù)執(zhí)行。

在設(shè)計(jì)任務(wù)隊(duì)列的過(guò)程中需要考慮到的幾個(gè)問(wèn)題

  • 并行執(zhí)行多個(gè)任務(wù)
  • 任務(wù)唯一性
  • 任務(wù)成功或失敗后的處理

針對(duì)以上問(wèn)題的解決方案

  • 并行執(zhí)行多個(gè)任務(wù)利用 Promise.all 來(lái)實(shí)現(xiàn)
  • 任務(wù)唯一性利用 Redis sorted set 來(lái)實(shí)現(xiàn)。使用時(shí)間戳作為分值可以實(shí)現(xiàn)將 sorted set 作為 list 來(lái)使用,在加入任務(wù)時(shí)判斷任務(wù)是否已經(jīng)存在,在取出任務(wù)執(zhí)行時(shí)將該任務(wù)分值設(shè)置為 0,每次取出分值大于 0 的任務(wù)來(lái)執(zhí)行,可以避免重復(fù)執(zhí)行任務(wù)。
  • 執(zhí)行任務(wù)成功后刪除任務(wù),執(zhí)行任務(wù)失敗后將任務(wù)分值更新為當(dāng)前時(shí)間時(shí)間戳,這樣就可以將失敗的任務(wù)重新加入任務(wù)隊(duì)列尾部

示例代碼

// remote_api.js 模擬第三方 API'use strict';const app = require('express')();app.get('/', (req, res) => {  setTimeout(() => {    let arr = [200, 300]; // 200 代表成功,300 代表失敗需要重新請(qǐng)求    res.status(200).send({ 'status': arr[parseInt(Math.random() * 2)] });  }, 3000);});app.listen('9001', () => {  console.log('API 服務(wù)監(jiān)聽(tīng)端口:9001');});// producer.js 自身應(yīng)用 API,用來(lái)接受用戶請(qǐng)求并將任務(wù)加入任務(wù)隊(duì)列'use strict';const app = require('express')();const redisClient = require('redis').createClient();const QUEUE_NAME = 'queue:example';function addTaskToQueue(taskName, callback) {  // 先判斷任務(wù)是否已經(jīng)存在,存在:跳過(guò),不存在:加入任務(wù)隊(duì)列  redisClient.zscore(QUEUE_NAME, taskName, (error, task) => {    if (error) {      console.log(error);    } else {      if (task) {        console.log('任務(wù)已存在,不新增相同任務(wù)');        callback(null, task);      } else {        redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => {          if (error) {            callback(error);          } else {            callback(null, result);          }        });      }    }  });}app.get('/', (req, res) => {  let taskName = req.query['task-name'];  addTaskToQueue(taskName, (error, result) => {    if (error) {      console.log(error);    } else {      res.status(200).send('正在查詢中......');    }  });});app.listen(9002, () => {  console.log('生產(chǎn)者服務(wù)監(jiān)聽(tīng)端口:9002');});// consumer.js 定時(shí)獲取任務(wù)并執(zhí)行'use strict';const redisClient = require('redis').createClient();const request = require('request');const schedule = require('node-schedule');const QUEUE_NAME = 'queue:expmple';const PARALLEL_TASK_NUMBER = 2; // 并行執(zhí)行任務(wù)數(shù)量function getTasksFromQueue(callback) {  // 獲取多個(gè)任務(wù)  redisClient.zrangebyscore([QUEUE_NAME, 1, new Date().getTime(), 'LIMIT', 0, PARALLEL_TASK_NUMBER], (error, tasks) => {    if (error) {      callback(error);    } else {      // 將任務(wù)分值設(shè)置為 0,表示正在處理      if (tasks.length > 0) {        let tmp = [];        tasks.forEach((task) => {          tmp.push(0);          tmp.push(task);        });        redisClient.zadd([QUEUE_NAME].concat(tmp), (error, result) => {          if (error) {            callback(error);          } else {            callback(null, tasks)          }        });      }    }  });}function addFailedTaskToQueue(taskName, callback) {  redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => {    if (error) {      callback(error);    } else {      callback(null, result);    }  });}function removeSucceedTaskFromQueue(taskName, callback) {  redisClient.zrem(QUEUE_NAME, taskName, (error, result) => {    if (error) {      callback(error);    } else {      callback(null, result);    }  })}function execTask(taskName) {  return new Promise((resolve, reject) => {    let requestOptions = {      'url': 'http://127.0.0.1:9001',      'method': 'GET',      'timeout': 5000    };    request(requestOptions, (error, response, body) => {      if (error) {        resolve('failed');        console.log(error);        addFailedTaskToQueue(taskName, (error) => {          if (error) {            console.log(error);          } else {          }        });      } else {        try {          body = typeof body !== 'object' ? JSON.parse(body) : body;        } catch (error) {          resolve('failed');          console.log(error);          addFailedTaskToQueue(taskName, (error, result) => {            if (error) {              console.log(error);            } else {            }          });          return;        }        if (body.status !== 200) {          resolve('failed');          addFailedTaskToQueue(taskName, (error, result) => {            if (error) {              console.log(error);            } else {            }          });        } else {          resolve('succeed');          removeSucceedTaskFromQueue(taskName, (error, result) => {            if (error) {              console.log(error);            } else {            }          });        }      }    });  });}// 定時(shí),每隔 5 秒獲取新的任務(wù)來(lái)執(zhí)行l(wèi)et job = schedule.scheduleJob('*/5 * * * * *', () => {  console.log('獲取新任務(wù)');  getTasksFromQueue((error, tasks) => {    if (error) {      console.log(error);    } else {      if (tasks.length > 0) {        console.log(tasks);        Promise.all(tasks.map(execTask))        .then((results) => {          console.log(results);        })        .catch((error) => {          console.log(error);        });              }    }  });});

發(fā)表評(píng)論 共有條評(píng)論
用戶名: 密碼:
驗(yàn)證碼: 匿名發(fā)表
主站蜘蛛池模板: 马公市| 巴青县| 从江县| 泗水县| 海城市| 新源县| 临沭县| 杭锦后旗| 自贡市| 健康| 嘉祥县| 镇江市| 富宁县| 潼南县| 台江县| 漳浦县| 仙居县| 黑山县| 师宗县| 怀集县| 宁安市| 邮箱| 宿州市| 呈贡县| 抚宁县| 宁化县| 志丹县| 太和县| 嘉兴市| 北碚区| 中宁县| 越西县| 横山县| 建水县| 诸城市| 平果县| 扎鲁特旗| 嘉峪关市| 蓬莱市| 富蕴县| 达尔|