salt分发后,主动将已完成的任务数据推送到redis中,使用redis的生产者模式,进行消息传送
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
#coding=utf-8 import fnmatch,json,logging import salt.config import salt.utils.event from salt.utils.redis import RedisPool import sys,os,datetime,random import multiprocessing,threading from joi.utils.gobsAPI import PostWeb logger = logging.getLogger(__name__) opts = salt.config.client_config( '/data/salt/saltstack/etc/salt/master' ) r_conn = RedisPool(opts.get( 'redis_db' )).getConn() lock = threading.Lock() class RedisQueueDaemon( object ): ''' redis 队列监听器 ''' def __init__( self ,r_conn): self .r_conn = r_conn #redis 连接实例 self .task_queue = 'task:prod:queue' #任务消息队列 def listen_task( self ): ''' 监听主函数 ''' while True : queue_item = self .r_conn.blpop( self .task_queue, 0 )[ 1 ] print "queue get" ,queue_item #self.run_task(queue_item) t = threading.Thread(target = self .run_task,args = (queue_item,)) t.start() def run_task( self ,info): ''' 执行操作函数 ''' lock.acquire() info = json.loads(info) if info[ 'type' ] = = 'pushTaskData' : task_data = self .getTaskData(info[ 'jid' ]) task_data = json.loads(task_data) if task_data else [] logger.info( '获取缓存数据:%s' % task_data) if task_data: if self .sendTaskData2bs(task_data): task_data = [] self .setTaskData(info[ 'jid' ], task_data) elif info[ 'type' ] = = 'setTaskState' : self .setTaskState(info[ 'jid' ],info[ 'state' ],info[ 'message' ]) elif info[ 'type' ] = = 'setTaskData' : self .setTaskData(info[ 'jid' ], info[ 'data' ]) lock.release() def getTaskData( self ,jid): return self .r_conn.hget( 'task:' + jid, 'data' ) def setTaskData( self ,jid,data): self .r_conn.hset( 'task:' + jid, 'data' ,json.dumps(data)) def sendTaskData2bs( self ,task_data): logger.info( '发送任务数据到后端...' ) logger.info(task_data) if task_data: p = PostWeb( '/jgapi/verify' ,task_data, 'pushFlowTaskData' ) result = p.postRes() print result if result[ 'code' ]: logger.info( '发送成功!' ) return True else : logger.error( '发送失败!' ) return False else : return True def setTaskState( self ,jid,state,message = ''): logger.info( '到后端设置任务【%s】状态' % str (jid)) p = PostWeb( '/jgapi/verify' ,{ 'code' :jid, 'state' : 'success' , 'message' :message}, 'setTaskState' ) result = p.postRes() if result[ 'code' ]: logger.info( '设置任务【%s】状态成功!' % str (jid)) return True ,result else : logger.error( '设置任务【%s】状态失败!' % str (jid)) return result def salt_job_listener(): ''' salt job 监听器 ''' sevent = salt.utils.event.get_event( 'master' , sock_dir = opts[ 'sock_dir' ], transport = opts[ 'transport' ], opts = opts) while True : ret = sevent.get_event(full = True ) if ret is None : continue if fnmatch.fnmatch(ret[ 'tag' ], 'salt/job/*/ret/*' ): task_key = 'task:' + ret[ 'data' ][ 'jid' ] task_state = r_conn.hget(task_key, 'state' ) task_data = r_conn.hget(task_key, 'data' ) if task_state: jid_data = { 'code' :ret[ 'data' ][ 'jid' ], 'project_id' :settings.SALT_MASTER_OPTS[ 'project_id' ], 'serverip' :ret[ 'data' ][ 'id' ], 'returns' :ret[ 'data' ][ 'return' ], 'name' :ret[ 'data' ][ 'id' ], 'state' : 'success' if ret[ 'data' ][ 'success' ] else 'failed' , } task_data = json.loads(task_data) if task_data else [] task_data.append(jid_data) logger.info( "新增数据:%s" % json.dumps(task_data)) r_conn.lpush( 'task:prod:queue' ,json.dumps({ 'type' : 'setTaskData' , 'jid' :ret[ 'data' ][ 'jid' ], 'data' :task_data})) #r_conn.hset(task_key,'data',json.dumps(task_data)) if task_state = = 'running' : if len (task_data)> = 1 : logger.info( '新增消息到队列:pushTaskData' ) r_conn.lpush( 'task:prod:queue' ,json.dumps({ 'jid' :ret[ 'data' ][ 'jid' ], 'type' : 'pushTaskData' })) else : logger.info( '任务{0}完成,发送剩下的数据到后端...' . format (task_key)) logger.info( '新增消息到队列:pushTaskData' ) r_conn.lpush( 'task:prod:queue' ,json.dumps({ 'jid' :ret[ 'data' ][ 'jid' ], 'type' : 'pushTaskData' })) print datetime.datetime.now() def run(): print 'start redis product queue listerner...' logger.info( 'start redis product queue listerner...' ) multiprocessing.Process(target = RedisQueueDaemon(r_conn).listen_task,args = ()).start() print 'start salt job listerner...' logger.info( 'start salt job listerner...' ) multiprocessing.Process(target = salt_job_listener,args = ()).start() ''' p=multiprocessing.Pool(2) print 'start redis product queue listerner...' p.apply_async(redis_queue_listenr,()) print 'start salt job listerner...' p.apply_async(salt_job_listener,()) p.close() p.join() ''' |
以上这篇python 监听salt job状态,并任务数据推送到redis中的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持服务器之家。
原文链接:https://blog.csdn.net/u011085172/article/details/81228450