国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁(yè) > 編程 > Python > 正文

解決python3 pika之連接斷開(kāi)的問(wèn)題

2020-01-04 13:46:52
字體:
來(lái)源:轉(zhuǎn)載
供稿:網(wǎng)友

問(wèn)題描述

在消費(fèi)rabbitMQ隊(duì)列時(shí), 每次進(jìn)入回調(diào)函數(shù)內(nèi)需要進(jìn)行一些比較耗時(shí)的操作;操作完成后給rabbitMQ server發(fā)送ack信號(hào)以dequeue本條消息。

問(wèn)題就發(fā)生在發(fā)送ack操作時(shí), 程序提示鏈接已被斷開(kāi)或socket error。

源碼示例

#!/usr/bin#coding: utf-8import pikaimport timeUSER = 'guest'PWD = 'guest'TEST_QUEUE = 'just4test'def callback(ch, method, properties, body): print(body) time.sleep(600) ch.basic_publish('', routing_key=TEST_QUEUE, body="fortest") ch.basic_ack(delivery_tag = method.delivery_tag)def test_main(): s_conn = pika.BlockingConnection(  pika.ConnectionParameters('127.0.0.1',    credentials=pika.PlainCredentials(USER, PWD))) chan = s_conn.channel() chan.queue_declare(queue=TEST_QUEUE) chan.basic_publish('', routing_key=TEST_QUEUE, body="fortest") chan.basic_consume(callback, queue=TEST_QUEUE) chan.start_consuming()if __name__ == "__main__": test_main()

運(yùn)行一段時(shí)間后, 就會(huì)報(bào)錯(cuò):

[ERROR][pika.adapters.base_connection][2017-08-18 12:33:49]Error event 25, None[CRITICAL][pika.adapters.base_connection][2017-08-18 12:33:49]Tried to handle an error where no error existed[ERROR][pika.adapters.base_connection][2017-08-18 12:33:49]Fatal Socket Error: BrokenPipeError(32, 'Broken pipe')

問(wèn)題排查

猜測(cè):pika客戶端沒(méi)有及時(shí)發(fā)送心跳,連接被server斷開(kāi)

一開(kāi)始修改了heartbeat_interval參數(shù)值, 示例如下:

def test_main(): s_conn = pika.BlockingConnection(  pika.ConnectionParameters('127.0.0.1',    heartbeat_interval=10,   socket_timeout=5,   credentials=pika.PlainCredentials(USER, PWD))) # ....

修改后運(yùn)行依然報(bào)錯(cuò),后來(lái)想想應(yīng)該單線程被一直占用,pika無(wú)法發(fā)送心跳;

于是又加了個(gè)心跳線程, 示例如下:

#!/usr/bin#coding: utf-8import pikaimport timeimport loggingimport threadingUSER = 'guest'PWD = 'guest'TEST_QUEUE = 'just4test'class Heartbeat(threading.Thread): def __init__(self, connection):  super(Heartbeat, self).__init__()  self.lock = threading.Lock()  self.connection = connection  self.quitflag = False  self.stopflag = True  self.setDaemon(True) def run(self):  while not self.quitflag:   time.sleep(10)   self.lock.acquire()   if self.stopflag :    self.lock.release()    continue   try:    self.connection.process_data_events()   except Exception as ex:    logging.warn("Error format: %s"%(str(ex)))    self.lock.release()    return   self.lock.release() def startHeartbeat(self):  self.lock.acquire()  if self.quitflag==True:   self.lock.release()   return  self.stopflag=False  self.lock.release()def callback(ch, method, properties, body): logging.info("recv_body:%s" % body) time.sleep(600) ch.basic_ack(delivery_tag = method.delivery_tag)def test_main(): s_conn = pika.BlockingConnection(  pika.ConnectionParameters('127.0.0.1',    heartbeat_interval=10,   socket_timeout=5,   credentials=pika.PlainCredentials(USER, PWD))) chan = s_conn.channel() chan.queue_declare(queue=TEST_QUEUE) chan.basic_consume(callback,      queue=TEST_QUEUE) heartbeat = Heartbeat(s_conn) heartbeat.start()   #開(kāi)啟心跳線程 heartbeat.startHeartbeat() chan.start_consuming()if __name__ == "__main__": test_main()

嘗試運(yùn)行,結(jié)果還是不行,不得不安靜下來(lái)思考自己是不是想錯(cuò)了。

去看它的api,看到heartbeat_interval的解析:

:param int heartbeat_interval: How often to send heartbeats.         Min between this value and server's proposal         will be used. Use 0 to deactivate heartbeats         and None to accept server's proposal.

按這樣說(shuō)法,應(yīng)該還是沒(méi)有把心跳值給設(shè)置好。上面的程序期望是10秒發(fā)一次心跳,但是理論上發(fā)送心跳的間隔會(huì)比10秒多一點(diǎn)。所以艾瑪,我應(yīng)該是把heartbeat_interval的作用搞錯(cuò)了, 它是指超過(guò)這個(gè)時(shí)間間隔不發(fā)心跳或不給server任何信息,server就會(huì)斷開(kāi)連接, 而不是說(shuō)pika會(huì)按這個(gè)間隔來(lái)發(fā)心跳。 結(jié)果我把heartbeat_interval值設(shè)置高一點(diǎn)(比實(shí)際發(fā)送心跳/信息的間隔更長(zhǎng)),比如上面設(shè)置成60秒,就正常運(yùn)行了。

如果不指定heartbeat_interval, 它默認(rèn)為None, 意味著按rabbitMQ server的配置來(lái)檢測(cè)心跳是否正常。

如果設(shè)置heartbeat_interval=0, 意味著不檢測(cè)心跳,server端將不會(huì)主動(dòng)斷開(kāi)連接。

以上這篇解決python3 pika之連接斷開(kāi)的問(wèn)題就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持VEVB武林網(wǎng)。


注:相關(guān)教程知識(shí)閱讀請(qǐng)移步到python教程頻道。
發(fā)表評(píng)論 共有條評(píng)論
用戶名: 密碼:
驗(yàn)證碼: 匿名發(fā)表
主站蜘蛛池模板: 南开区| 静宁县| 威宁| 紫金县| 阿克陶县| 重庆市| 阿城市| 新余市| 湖北省| 资阳市| 印江| 潮州市| 宁波市| 广东省| 二连浩特市| 水城县| 独山县| 陕西省| 丹阳市| 封开县| 五峰| 长泰县| 抚顺市| 关岭| 苏尼特左旗| 中超| 武清区| 喀喇沁旗| 邳州市| 嵊泗县| 乌兰县| 北京市| 大渡口区| 治多县| 桐乡市| 于都县| 成安县| 德江县| 综艺| 龙州县| 柘荣县|