国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Python > 正文

解決python3 pika之連接斷開的問題

2020-02-16 00:12:03
字體:
供稿:網(wǎng)友

問題描述

在消費(fèi)rabbitMQ隊(duì)列時(shí), 每次進(jìn)入回調(diào)函數(shù)內(nèi)需要進(jìn)行一些比較耗時(shí)的操作;操作完成后給rabbitMQ server發(fā)送ack信號(hào)以dequeue本條消息。

問題就發(fā)生在發(fā)送ack操作時(shí), 程序提示鏈接已被斷開或socket error。

源碼示例

#!/usr/bin#coding: utf-8import pikaimport timeUSER = 'guest'PWD = 'guest'TEST_QUEUE = 'just4test'def callback(ch, method, properties, body): print(body) time.sleep(600) ch.basic_publish('', routing_key=TEST_QUEUE, body="fortest") ch.basic_ack(delivery_tag = method.delivery_tag)def test_main(): s_conn = pika.BlockingConnection(  pika.ConnectionParameters('127.0.0.1',    credentials=pika.PlainCredentials(USER, PWD))) chan = s_conn.channel() chan.queue_declare(queue=TEST_QUEUE) chan.basic_publish('', routing_key=TEST_QUEUE, body="fortest") chan.basic_consume(callback, queue=TEST_QUEUE) chan.start_consuming()if __name__ == "__main__": test_main()

運(yùn)行一段時(shí)間后, 就會(huì)報(bào)錯(cuò):

[ERROR][pika.adapters.base_connection][2017-08-18 12:33:49]Error event 25, None[CRITICAL][pika.adapters.base_connection][2017-08-18 12:33:49]Tried to handle an error where no error existed[ERROR][pika.adapters.base_connection][2017-08-18 12:33:49]Fatal Socket Error: BrokenPipeError(32, 'Broken pipe')

問題排查

猜測:pika客戶端沒有及時(shí)發(fā)送心跳,連接被server斷開

一開始修改了heartbeat_interval參數(shù)值, 示例如下:

def test_main(): s_conn = pika.BlockingConnection(  pika.ConnectionParameters('127.0.0.1',    heartbeat_interval=10,   socket_timeout=5,   credentials=pika.PlainCredentials(USER, PWD))) # ....

修改后運(yùn)行依然報(bào)錯(cuò),后來想想應(yīng)該單線程被一直占用,pika無法發(fā)送心跳;

于是又加了個(gè)心跳線程, 示例如下:

#!/usr/bin#coding: utf-8import pikaimport timeimport loggingimport threadingUSER = 'guest'PWD = 'guest'TEST_QUEUE = 'just4test'class Heartbeat(threading.Thread): def __init__(self, connection):  super(Heartbeat, self).__init__()  self.lock = threading.Lock()  self.connection = connection  self.quitflag = False  self.stopflag = True  self.setDaemon(True) def run(self):  while not self.quitflag:   time.sleep(10)   self.lock.acquire()   if self.stopflag :    self.lock.release()    continue   try:    self.connection.process_data_events()   except Exception as ex:    logging.warn("Error format: %s"%(str(ex)))    self.lock.release()    return   self.lock.release() def startHeartbeat(self):  self.lock.acquire()  if self.quitflag==True:   self.lock.release()   return  self.stopflag=False  self.lock.release()def callback(ch, method, properties, body): logging.info("recv_body:%s" % body) time.sleep(600) ch.basic_ack(delivery_tag = method.delivery_tag)def test_main(): s_conn = pika.BlockingConnection(  pika.ConnectionParameters('127.0.0.1',    heartbeat_interval=10,   socket_timeout=5,   credentials=pika.PlainCredentials(USER, PWD))) chan = s_conn.channel() chan.queue_declare(queue=TEST_QUEUE) chan.basic_consume(callback,      queue=TEST_QUEUE) heartbeat = Heartbeat(s_conn) heartbeat.start()   #開啟心跳線程 heartbeat.startHeartbeat() chan.start_consuming()if __name__ == "__main__": test_main()            
發(fā)表評論 共有條評論
用戶名: 密碼:
驗(yàn)證碼: 匿名發(fā)表
主站蜘蛛池模板: 闵行区| 河池市| 阿拉善盟| 莱西市| 筠连县| 乌恰县| 宣威市| 南投市| 西吉县| 阿克苏市| 四子王旗| 蓝田县| 竹山县| 鄂伦春自治旗| 灌云县| 拉萨市| 遂川县| 板桥市| 大竹县| 稷山县| 奉节县| 府谷县| 怀来县| 湘阴县| 通山县| 桂林市| 沂源县| 米泉市| 句容市| 阿坝县| 昌图县| 济南市| 太湖县| 三亚市| 乌拉特后旗| 游戏| 绥芬河市| 屏山县| 开远市| 广安市| 大丰市|