问题描述
在消费rabbitMQ队列时, 每次进入回调函数内需要进行一些比较耗时的操作;操作完成后给rabbitMQ server发送ack信号以dequeue本条消息。
问题就发生在发送ack操作时, 程序提示链接已被断开或socket error。
源码示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
#!/usr/bin #coding: utf-8 import pika import time USER = 'guest' PWD = 'guest' TEST_QUEUE = 'just4test' def callback(ch, method, properties, body): print (body) time.sleep( 600 ) ch.basic_publish('', routing_key = TEST_QUEUE, body = "fortest" ) ch.basic_ack(delivery_tag = method.delivery_tag) def test_main(): s_conn = pika.BlockingConnection( pika.ConnectionParameters( '127.0.0.1' , credentials = pika.PlainCredentials(USER, PWD))) chan = s_conn.channel() chan.queue_declare(queue = TEST_QUEUE) chan.basic_publish('', routing_key = TEST_QUEUE, body = "fortest" ) chan.basic_consume(callback, queue = TEST_QUEUE) chan.start_consuming() if __name__ = = "__main__" : test_main() |
运行一段时间后, 就会报错:
1
2
3
|
[ERROR][pika.adapters.base_connection][ 2017 - 08 - 18 12 : 33 : 49 ]Error event 25 , None [CRITICAL][pika.adapters.base_connection][ 2017 - 08 - 18 12 : 33 : 49 ]Tried to handle an error where no error existed [ERROR][pika.adapters.base_connection][ 2017 - 08 - 18 12 : 33 : 49 ]Fatal Socket Error: BrokenPipeError( 32 , 'Broken pipe' ) |
问题排查
猜测:pika客户端没有及时发送心跳,连接被server断开
一开始修改了heartbeat_interval参数值, 示例如下:
1
2
3
4
5
6
7
|
def test_main(): s_conn = pika.BlockingConnection( pika.ConnectionParameters( '127.0.0.1' , heartbeat_interval = 10 , socket_timeout = 5 , credentials = pika.PlainCredentials(USER, PWD))) # .... |
修改后运行依然报错,后来想想应该单线程被一直占用,pika无法发送心跳;
于是又加了个心跳线程, 示例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
#!/usr/bin #coding: utf-8 import pika import time import logging import threading USER = 'guest' PWD = 'guest' TEST_QUEUE = 'just4test' class Heartbeat(threading.Thread): def __init__( self , connection): super (Heartbeat, self ).__init__() self .lock = threading.Lock() self .connection = connection self .quitflag = False self .stopflag = True self .setDaemon( True ) def run( self ): while not self .quitflag: time.sleep( 10 ) self .lock.acquire() if self .stopflag : self .lock.release() continue try : self .connection.process_data_events() except Exception as ex: logging.warn( "Error format: %s" % ( str (ex))) self .lock.release() return self .lock.release() def startHeartbeat( self ): self .lock.acquire() if self .quitflag = = True : self .lock.release() return self .stopflag = False self .lock.release() def callback(ch, method, properties, body): logging.info( "recv_body:%s" % body) time.sleep( 600 ) ch.basic_ack(delivery_tag = method.delivery_tag) def test_main(): s_conn = pika.BlockingConnection( pika.ConnectionParameters( '127.0.0.1' , heartbeat_interval = 10 , socket_timeout = 5 , credentials = pika.PlainCredentials(USER, PWD))) chan = s_conn.channel() chan.queue_declare(queue = TEST_QUEUE) chan.basic_consume(callback, queue = TEST_QUEUE) heartbeat = Heartbeat(s_conn) heartbeat.start() #开启心跳线程 heartbeat.startHeartbeat() chan.start_consuming() if __name__ = = "__main__" : test_main() |
尝试运行,结果还是不行,不得不安静下来思考自己是不是想错了。
去看它的api,看到heartbeat_interval的解析:
1
2
3
4
|
:param int heartbeat_interval: How often to send heartbeats. Min between this value and server's proposal will be used. Use 0 to deactivate heartbeats and None to accept server's proposal. |
按这样说法,应该还是没有把心跳值给设置好。上面的程序期望是10秒发一次心跳,但是理论上发送心跳的间隔会比10秒多一点。所以艾玛,我应该是把heartbeat_interval的作用搞错了, 它是指超过这个时间间隔不发心跳或不给server任何信息,server就会断开连接, 而不是说pika会按这个间隔来发心跳。 结果我把heartbeat_interval值设置高一点(比实际发送心跳/信息的间隔更长),比如上面设置成60秒,就正常运行了。
如果不指定heartbeat_interval, 它默认为None, 意味着按rabbitMQ server的配置来检测心跳是否正常。
如果设置heartbeat_interval=0, 意味着不检测心跳,server端将不会主动断开连接。
以上这篇解决python3 pika之连接断开的问题就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持服务器之家。
原文链接:https://blog.csdn.net/moxiaomomo/article/details/77414831