上个项目中用到了 ActiveMQ,只是简单应用,安装完成后直接使用就可以了。由于新项目中一些硬件的限制,需要把消息队列换成 RabbitMQ。
RabbitMQ 提供的模式和机制比 ActiveMQ 丰富得多。根据业务需要,这里使用 RPC 模式实现功能,其中踩过的一些坑有必要记录一下。
上代码,目录结构分为 c_server、c_client、c_handler:
c_server:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""RPC server side of the RabbitMQ example (c_server).

Prompts for an exchange name and a queue name, binds the queue to a direct
exchange, and then answers every request delivered to that queue. Each reply
is published to the ``reply_to`` queue named by the request and carries the
request's ``correlation_id`` so the caller can match it.
"""
import pika
import time
import json
import io
import yaml


def _prompt(message):
    """Show *message*, read one line from stdin and return it stripped.

    On Python 2 ``input()`` evaluates what the user types, and on Python 3
    ``str`` has no ``decode`` method, so the raw line reader is selected
    explicitly and bytes are decoded only when bytes were actually returned.
    """
    try:
        read_line = raw_input  # noqa: F821 - only defined on Python 2
    except NameError:
        read_line = input
    text = read_line(message)
    if isinstance(text, bytes):
        text = text.decode('utf-8')
    return text.strip()


s_exchange = _prompt("请输入交换机名称->>")
s_queue = _prompt("输入消息队列名称->>")

credentials = pika.PlainCredentials('system', 'manager')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='xxx.xxx.xxx.xxx', credentials=credentials))

# Declare the exchange and the request queue, then bind them together.
channel = connection.channel()
channel.exchange_declare(exchange=s_exchange, exchange_type='direct')
channel.queue_declare(queue=s_queue, exclusive=True)
# The routing key is spelled out because the client publishes with the
# queue name as its routing key.
channel.queue_bind(queue=s_queue, exchange=s_exchange, routing_key=s_queue)


def s_manage(content):
    """Build the JSON reply for one request body.

    Args:
        content: Request body (bytes or text). It must be a JSON document
            whose decoded value is itself a JSON/YAML text describing a
            mapping with a ``cmd`` key.

    Returns:
        A JSON string with ``errorid`` 0, ``resp`` set to the request's
        ``cmd`` value and ``errorcont`` set to the success message.

    Raises:
        ValueError: If *content* is not valid JSON.
        KeyError: If the decoded request has no ``cmd`` entry.
    """
    # Decode once at the boundary so the JSON parser always receives text.
    if isinstance(content, bytes):
        content = content.decode('utf-8')
    # The original author ran the inner document through YAML to work around
    # a unicode conversion problem; that second parsing step is kept.
    str_content = yaml.safe_load(json.loads(content))
    str_res = {
        "errorid": 0,
        "resp": str_content['cmd'],
        "errorcont": "成功",
    }
    return json.dumps(str_res)


def on_request(ch, method, props, body):
    """Handle one delivered request: reply to the caller, then acknowledge.

    Args:
        ch: Channel the request was delivered on; also used for the reply.
        method: Delivery metadata; supplies the ``delivery_tag`` to ack.
        props: Request properties; supplies ``reply_to`` and
            ``correlation_id``.
        body: Raw request body, passed to ``s_manage``.
    """
    response = s_manage(body)
    # Publish through the default exchange ('') straight to the reply queue.
    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,
        properties=pika.BasicProperties(correlation_id=props.correlation_id),
        body=response)
    ch.basic_ack(delivery_tag=method.delivery_tag)


# Hand this consumer at most one unacknowledged request at a time.
channel.basic_qos(prefetch_count=1)
# NOTE(review): callback-first is the pika < 1.0 signature; pika >= 1.0
# expects basic_consume(queue=s_queue, on_message_callback=on_request).
channel.basic_consume(on_request, queue=s_queue)
print(" [x] awaiting rpc requests")
channel.start_consuming()
c_client:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""RPC client side of the RabbitMQ example (c_client)."""
import pika
import uuid
import json
import io


class rpcclient(object):
    """Publishes RPC requests and fetches their replies from a reply queue.

    Each instance opens its own blocking connection and one channel on it.
    """

    def __init__(self):
        """Connect to the broker and open a channel."""
        self.credentials = pika.PlainCredentials('guest', 'guest')
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='xxx.xxx.xxx.xxx',
                                      credentials=self.credentials))
        self.channel = self.connection.channel()

    def on_response(self, ch, method, props, body):
        """Consumer callback: keep the reply whose correlation id matches.

        Only the matching reply is stored and acknowledged; any other
        delivery is left unacknowledged.
        """
        # NOTE(review): the listing's indentation was lost; the ack is placed
        # inside the match branch so that replies belonging to other requests
        # are not consumed. Confirm against the author's original file.
        if self.callback_id == props.correlation_id:
            self.response = body
            ch.basic_ack(delivery_tag=method.delivery_tag)

    def get_response(self, callback_queue, callback_id):
        """Wait for and return the reply identified by *callback_id*.

        Args:
            callback_queue: Name of the queue to consume replies from.
            callback_id: Correlation id returned by ``call`` for the request.

        Returns:
            The body of the matching reply message.
        """
        self.callback_id = callback_id
        self.response = None
        # NOTE(review): this declares the fixed name 'q_manager' rather than
        # callback_queue; the two coincide for queue names returned by call().
        self.channel.queue_declare('q_manager', durable=True)
        # on_response runs for every message delivered from the queue.
        # NOTE(review): callback-first is the pika < 1.0 signature; pika >= 1.0
        # expects basic_consume(queue=..., on_message_callback=...).
        self.channel.basic_consume(self.on_response, queue=callback_queue)
        # Pump connection events until on_response has stored the reply
        # (used here instead of the blocking start_consuming()).
        while self.response is None:
            self.connection.process_data_events()
        return self.response

    def call(self, queue_name, command, exchange, rout_key):
        """Publish *command* as an RPC request.

        Args:
            queue_name: Routing key to publish with.
            command: Message body to send.
            exchange: Exchange to publish to.
            rout_key: Accepted for interface compatibility; not used.

        Returns:
            Tuple ``(callback_queue, corr_id)`` needed later by
            ``get_response`` to collect the reply.
        """
        # A fixed, named reply queue is used instead of a broker-generated
        # exclusive one (the original author's note says the queue must not
        # be exclusive).
        self.callback_queue = 'q_manager'
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange=exchange,
            routing_key=queue_name,
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,  # queue the reply is sent to
                correlation_id=self.corr_id,   # uuid used to match the reply
            ),
            body=command)
        return self.callback_queue, self.corr_id
c_handler:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""Interactive driver for the RabbitMQ RPC example (c_handler).

Prompts for an exchange, a queue and a JSON command, then loops: ask for a
task id, print that task's reply if the id is known, and send the command
again on a background thread.
"""
from c_client import *  # provides rpcclient
import random
import time
import threading
import json
import sys


def _prompt(message):
    """Show *message*, read one line from stdin and return it stripped.

    On Python 2 ``input()`` evaluates what the user types, and on Python 3
    ``str`` has no ``decode`` method, so the raw line reader is selected
    explicitly and bytes are decoded only when bytes were actually returned.
    """
    try:
        read_line = raw_input  # noqa: F821 - only defined on Python 2
    except NameError:
        read_line = input
    text = read_line(message)
    if isinstance(text, bytes):
        text = text.decode('utf-8')
    return text.strip()


class handler(object):
    """Tracks sent RPC requests and retrieves their replies by task id."""

    def __init__(self):
        # task_id -> [queue, command, reply queue name, correlation id]
        self.information = {}

    def check_all(self, *args):
        """Print every pending task with its queue and command."""
        time.sleep(2)
        print('获取消息')
        for key in self.information:
            print("cid【%s】\t 队列【%s】\t 命令【%s】"
                  % (key, self.information[key][0], self.information[key][1]))

    def check_task(self, cmd):
        """Fetch and print the reply for the task id given in *cmd*.

        Args:
            cmd: Task id as typed by the user. Input that is not an integer
                or not a known task id is reported and otherwise ignored.
        """
        time.sleep(2)
        try:
            task_id = int(cmd)
            print(task_id)
            callback_queue = self.information[task_id][2]
            callback_id = self.information[task_id][3]
        except (ValueError, KeyError, IndexError) as e:
            # ValueError covers empty or non-numeric input, which would
            # otherwise end the interactive loop with a traceback.
            print("error: [%s]" % e)
            return
        client = rpcclient()
        try:
            response = client.get_response(callback_queue, callback_id)
        finally:
            # Each lookup opens its own connection; close it so connections
            # do not pile up and so the broker can requeue replies this
            # consumer received but did not acknowledge.
            client.connection.close()
        print(response)
        del self.information[task_id]

    # NOTE(review): the ' ' (single space) defaults are reproduced from the
    # listing as-is; the author may have meant '' — confirm.
    def run(self, user_cmd, host, exchange=' ', rout_key=' ', que=' '):
        """Send *user_cmd* as an RPC request and record it under a new id.

        Args:
            user_cmd: Message body to send.
            host: Queue name; used as the routing key.
            exchange: Exchange to publish to.
            rout_key: Not used by this method.
            que: Forwarded to ``rpcclient.call`` as ``rout_key``.
        """
        try:
            time.sleep(2)
            command = user_cmd
            task_id = random.randint(10000, 99999)
            client = rpcclient()
            try:
                response = client.call(queue_name=host, command=command,
                                       exchange=exchange, rout_key=que)
            finally:
                # The request has been published; release the connection.
                client.connection.close()
            self.information[task_id] = [host, command,
                                         response[0], response[1]]
        except IndexError as e:
            print("[error]:%s" % e)

    def reflect(self, str, cmd, host, exchange, que):
        """Call the method named *str* on this object, if it exists."""
        if hasattr(self, str):
            getattr(self, str)(cmd, host, exchange, que)

    def start(self, m, cmd, host, exchange, que):
        """Run the interactive loop; does not return.

        Each round asks for a task id, prints that task's reply, prints the
        pending-task table, then invokes method *m* on a new thread.
        """
        while True:
            user_resp = _prompt("输入处理消息内容id->>")
            self.check_task(user_resp)
            print(self.information)
            # Dispatch on a separate thread so the prompt is not blocked.
            worker = threading.Thread(target=self.reflect,
                                      args=(m, cmd, host, exchange, que))
            worker.start()


s_exchange = _prompt("请输入交换机名称->>")
s_queue = _prompt("输入消息队列名称->>")
d_cmd_state = _prompt("输入json命令->>")
# The typed JSON text is JSON-encoded once more before sending.
s_cmd = json.dumps(d_cmd_state)
# A separate name keeps the class `handler` from being rebound to its instance.
task_handler = handler()
task_handler.start('run', s_cmd, s_queue, s_exchange, s_queue)
注意要点:1、c_client 发布消息到 RabbitMQ 时,需要携带供服务端回复使用的队列名称(reply_to)以及 corr_id(correlation_id)。
2、c_handler 做了处理:每次发送的内容都会记录到任务表(information)中;待显示出任务 id 后,输入该 id 即可查询返回的内容。调用如下:
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:http://www.cnblogs.com/dugufei/p/9105581.html