微信公众号共有三种,服务号、订阅号、企业号。它们在获取AccessToken上各有不同。
其中订阅号比较坑:它的AccessToken需要定时刷新,而且重复获取会导致上次获取的AccessToken失效。
而企业号就比较好,AccessToken有效期同样为7200秒,但有效期内重复获取返回相同结果。
为兼容这两种方式,因此按照订阅号的方式处理。
处理办法与接口文档中的要求相同:
为了保密appsecret,第三方需要一个负责获取和刷新access_token的中控服务器。
而其他业务逻辑服务器所使用的access_token均来自于该中控服务器,不应该各自去刷新,否则会造成access_token覆盖而影响业务。
下面的代码以企业号为例,将access_token储存在sqlite3数据库中。相比储存在文本文件中,放在数据库里更灵活,也可以为后期存放其他数据提供向后兼容。
设计思路和使用方法:
自动创建sqlite3数据库,包括表结构和数据,并能在数据库表结构不存在或者数据不存在或遭删除的情况下,创建新的可用的数据
尽可能的保证Class中每一个可执行的函数单独调用都能成功。
Class中只将真正能被用到的方法和变量设置为public的。
使用时只需要将此文件中的weixin_qy_CorpID和weixin_qy_Secret改成自己的,然后import此文件,调用WeiXinTokenClass().get()方法即可得到access_token。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
|
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Central access_token store for a WeChat Enterprise (qyapi) account.

The account credentials and the current access_token are persisted in a
SQLite database located next to this file, so that every caller shares one
token instead of refreshing it independently.  Typical use::

    import odbp_getToken
    token = odbp_getToken.WeiXinTokenClass().get()

Set ``weixin_qy_CorpID`` and ``weixin_qy_Secret`` below before first use.
The module runs on both Python 2 and Python 3.
"""
from __future__ import print_function

import datetime
import json
import os
import sqlite3
import sys
import urllib

try:  # Python 2
    import urllib2
    from urllib import urlencode
    urlopen = urllib2.urlopen
except ImportError:  # Python 3
    import urllib.parse
    import urllib.request
    urlencode = urllib.parse.urlencode
    urlopen = urllib.request.urlopen

enable_debug = True


def debug(msg, code=None):
    """Print a diagnostic message when ``enable_debug`` is true."""
    if enable_debug:
        if code is None:
            print("message: %s" % msg)
        else:
            print("message: %s, code: %s " % (msg, code))


AUTHOR_MAIL = "uberurey_ups@163.com"

weixin_qy_CorpID = "your_corpid"
weixin_qy_Secret = "your_secret"

# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Database
# https://docs.djangoproject.com/en/1.9/ref/settings/#databases
DATABASES = {
    'default': {
        'ENGINE': 'db.backends.sqlite3',
        'NAME': os.path.join(BASE_DIR, '.odbp_db.sqlite3'),
    }
}

sqlite3_db_file = str(DATABASES['default']['NAME'])

# Timestamps are written with explicit microseconds; the second format keeps
# rows readable that were written with str(datetime) at microsecond == 0.
_TIME_FORMATS = ('%Y-%m-%d %H:%M:%S.%f', '%Y-%m-%d %H:%M:%S')

_CREATE_TOKEN_SQL = '''CREATE TABLE IF NOT EXISTS "main"."weixin_token" (
    "id" INTEGER,
    "access_token" TEXT,
    "expires_in" TEXT,
    "expires_on" TEXT,
    "is_expired" INTEGER
);'''

_CREATE_ACCOUNT_SQL = '''CREATE TABLE IF NOT EXISTS "main"."weixin_account" (
    "id" INTEGER,
    "name" TEXT,
    "corpid" TEXT,
    "secret" TEXT,
    "current" INTEGER
);'''

_SELECT_CREDENTIAL_SQL = (
    '''SELECT "corpid", "secret" FROM weixin_account WHERE current == 1;''')
# NOTE: despite its name, "is_expired" == 1 marks the row that is in use.
_SELECT_TOKEN_SQL = (
    '''SELECT "access_token", "expires_on" FROM weixin_token '''
    '''WHERE "is_expired" == 1 ;''')


def sqlite3_conn(database):
    """Open *database* and return the connection; exit(1) if that fails."""
    try:
        conn = sqlite3.connect(database)
    except sqlite3.Error as e:
        print("""\
There was a problem connecting to Database:
    %s
The error leading to this problem was:
    %s
It's possible that this database is broken or permission denied.
If you cannot solve this problem yourself, please mail to:
    %s
""" % (database, e, AUTHOR_MAIL), file=sys.stderr)
        sys.exit(1)
    else:
        return conn


def sqlite3_commit(conn):
    """Commit the open transaction on *conn*."""
    return conn.commit()


def sqlite3_close(conn):
    """Close *conn*."""
    return conn.close()


def sqlite3_execute(database, sql):
    """Run one statement against *database* and commit; exit(1) on error."""
    sql_conn = sqlite3_conn(database)
    try:
        sql_conn.cursor().execute(sql)
        sql_conn.commit()
    except sqlite3.Error as e:
        print(e)
        sys.exit(1)
    finally:
        # Close on the error path too, so the handle is never leaked.
        sql_conn.close()


def _run(*statements):
    """Run ``(sql, parameters)`` pairs in one transaction on the token db.

    Returns the rows fetched by the last statement.  The connection is
    always closed, even when a statement raises ``sqlite3.Error``.
    """
    conn = sqlite3_conn(sqlite3_db_file)
    try:
        rows = None
        for sql, parameters in statements:
            rows = conn.execute(sql, parameters).fetchall()
        conn.commit()
        return rows
    finally:
        conn.close()


def _parse_expire_time(text):
    """Parse a stored ``expires_on`` string into a ``datetime``."""
    for fmt in _TIME_FORMATS:
        try:
            return datetime.datetime.strptime(text, fmt)
        except ValueError:
            continue
    raise ValueError("unrecognised expires_on value: %r" % (text,))


def sqlite3_create_table_token():
    """Create the ``weixin_token`` table if it does not exist yet."""
    _run((_CREATE_TOKEN_SQL, ()))


def sqlite3_create_table_account():
    """Create the ``weixin_account`` table if it does not exist yet."""
    _run((_CREATE_ACCOUNT_SQL, ()))


def sqlite3_create_tables():
    """Create both tables if they do not exist yet."""
    debug("sqlite3_create_tables")
    _run((_CREATE_TOKEN_SQL, ()), (_CREATE_ACCOUNT_SQL, ()))


def sqlite3_set_credential(corpid, secret):
    """Store *corpid* / *secret* as the current account (row id 1)."""
    # Creating the table up front replaces the original unbounded
    # "catch any error, create table, recurse" loop.
    _run(
        (_CREATE_ACCOUNT_SQL, ()),
        ('DELETE FROM "weixin_account" WHERE "id" = 1', ()),
        ('''INSERT INTO "weixin_account" ("id", "name", "corpid", "secret", "current")
            VALUES (1, 'odbp', ?, ?, 1)''', (corpid, secret)),
    )


def sqlite3_set_token(access_token, expires_in, expires_on, is_expired):
    """Store a token as row id 1, replacing any previous row with that id."""
    _run(
        (_CREATE_TOKEN_SQL, ()),
        ('DELETE FROM "weixin_token" WHERE "id" = 1', ()),
        ('''INSERT INTO "weixin_token"
            ("id", "access_token", "expires_in", "expires_on", "is_expired")
            VALUES (1, ?, ?, ?, ?)''',
         (access_token, expires_in, expires_on, is_expired)),
    )


def sqlite3_get_credential():
    """Return ``[(corpid, secret), ...]`` for the current account.

    When the table or the row is missing, the module-level defaults are
    written once and read back.  Exits with status 1 if that still yields
    nothing.
    """
    try:
        rows = _run((_SELECT_CREDENTIAL_SQL, ()))
    except sqlite3.Error:
        rows = []
    if not rows:
        sqlite3_set_credential(weixin_qy_CorpID, weixin_qy_Secret)
        rows = _run((_SELECT_CREDENTIAL_SQL, ()))
    if not rows:
        print("unrecoverable problem, please alter to %s" % AUTHOR_MAIL)
        sys.exit(1)
    return rows


def sqlite3_get_token():
    """Return ``[(access_token, expires_on), ...]`` or ``None`` if absent."""
    try:
        rows = _run((_SELECT_TOKEN_SQL, ()))
    except sqlite3.Error as e:
        # Typically "no such table" before the first token was stored.
        debug(e)
        return None
    return rows or None


def sqlite3_update_token(access_token, expires_on):
    """Overwrite the stored token (row id 1) and its expiry time."""
    _run(('''UPDATE "weixin_token" SET access_token=?, expires_on=?
             WHERE "id" = 1;''', (access_token, expires_on)))


class WeiXinTokenClass(object):
    """Hand out a valid access_token, refreshing it through the API on demand.

    ``get()`` is the only public method.
    """

    def __init__(self):
        self.__use_persistence = True
        self.__access_token = None
        self.__expires_in = None
        self.__expires_on = None
        self.__is_expired = None

        if self.__use_persistence:
            # One query instead of two for corpid and secret.
            self.__corpid, self.__corpsecret = sqlite3_get_credential()[0]
        else:
            self.__corpid = weixin_qy_CorpID
            self.__corpsecret = weixin_qy_Secret

    def __get_token_from_weixin_qy_api(self):
        """Request a token from the qyapi gettoken endpoint and persist it.

        On an error reply the errcode/errmsg are printed and nothing is
        stored.
        """
        parameters = {
            "corpid": self.__corpid,
            "corpsecret": self.__corpsecret,
        }
        url = ("https://qyapi.weixin.qq.com/cgi-bin/gettoken?"
               + urlencode(parameters))
        response = urlopen(url)
        try:
            result = response.read()
        finally:
            response.close()
        token_json = json.loads(result.decode('utf-8'))

        # .get(): an error reply carries no 'access_token' key at all.
        if token_json.get('access_token') is None:
            if token_json.get('errcode') is not None:
                print("errcode is: %s" % token_json['errcode'])
                print("errmsg is: %s" % token_json.get('errmsg'))
            else:
                print(result)
            return

        # TODO(Guodong Ding) token will expired ahead of time or not expired
        # after the time
        expire_time = datetime.datetime.now() + datetime.timedelta(
            seconds=token_json['expires_in'])
        self.__access_token = token_json['access_token']
        self.__expires_in = token_json['expires_in']
        # Explicit format: str(datetime) omits ".%f" when microsecond == 0.
        self.__expires_on = expire_time.strftime(_TIME_FORMATS[0])
        self.__is_expired = 1

        if sqlite3_get_token() is None:
            sqlite3_set_token(self.__access_token, self.__expires_in,
                              self.__expires_on, self.__is_expired)
        elif self.__is_token_expired():
            sqlite3_update_token(self.__access_token, self.__expires_on)
        else:
            debug("pass")

    def __get_token_from_persistence_storage(self):
        """Return the stored token, refreshing it once if missing or expired.

        Raises:
            RuntimeError: the refresh did not produce a usable token.
        """
        if self.__is_token_expired():
            self.__get_token_from_weixin_qy_api()
            if self.__is_token_expired():
                # Fail loudly instead of recursing forever or indexing None.
                raise RuntimeError(
                    "could not obtain a valid access_token from the WeChat API")
        return sqlite3_get_token()[0][0]

    @staticmethod
    def __is_token_expired():
        """Return True when no token is stored or the stored one has expired."""
        rows = sqlite3_get_token()
        if rows is None:
            return True
        return datetime.datetime.now() >= _parse_expire_time(rows[0][1])

    def get(self):
        """Return a currently valid access_token string."""
        return self.__get_token_from_persistence_storage()
Python实现通过微信企业号发送文本消息的Class
编程要点和调用方法:
支持发送中文,核心语句“payload = json.dumps(self.data, encoding='utf-8', ensure_ascii=False)”,关键字“python json 中文”
这个Class只有一个公共方法send()。
使用方法:import这个class,然后调用send方法即可,方法参数只需要两个,给谁(多UserID用"|"隔开),内容是什么,例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
import odbp_sendMessage msg = odbp_sendMessage.WeiXinSendMsgClass() msg.send( "dingguodong" , "Python 大魔王!" ) #!/usr/bin/python # encoding: utf-8 # -*- coding: utf8 -*- """ Created by PyCharm. File: LinuxBashShellScriptForOps:odbp_sendMessage.py User: Guodong Create Date: 2016/8/12 Create Time: 14:49 """ import odbp_getToken class WeiXinSendMsgClass( object ): def __init__( self ): self .access_token = odbp_getToken.WeiXinTokenClass().get() self .to_user = "" self .to_party = "" self .to_tag = "" self .msg_type = "text" self .agent_id = 2 self .content = "" self .safe = 0 self .data = { "touser" : self .to_user, "toparty" : self .to_party, "totag" : self .to_tag, "msgtype" : self .msg_type, "agentid" : self .agent_id, "text" : { "content" : self .content }, "safe" : self .safe } def send( self , to_user, content): if to_user is not None and content is not None : self .data[ 'touser' ] = to_user self .data[ 'text' ][ 'content' ] = content else : print raise RuntimeError import requests import json url = "https://qyapi.weixin.qq.com/cgi-bin/message/send" querystring = { "access_token" : self .access_token} payload = json.dumps( self .data, encoding = 'utf-8' , ensure_ascii = False ) headers = { 'content-type' : "application/json" , 'cache-control' : "no-cache" , } response = requests.request( "POST" , url, data = payload, headers = headers, params = querystring) return_content = json.loads(response.content) if return_content[ "errcode" ] = = 0 and return_content[ "errmsg" ] = = "ok" : print "Send successfully! %s " % return_content else : print "Send failed! %s " % return_content |
python调用mongodb发送微信企业号
python2.x
注意:data变量里, agent_id为刚刚创建的应用id(可在web页面看到)
toparty即为目标部门,或者可以用touser,totag指定目标账户
比较简单的调用,已实测,可以使用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
#coding:utf-8 import sys import requests import json from pymongo import MongoClient reload (sys) sys.setdefaultencoding( 'utf-8' ) class Weixin( object ): def __init__( self , corp_id, corp_secret): self .token_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=%s&corpsecret=%s' % (corp_id, corp_secret) self .send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=' def get_token( self ): try : r = requests.get( self .token_url, timeout = 10 ) except Exception as e: print e sys.exit( 1 ) if r.status_code = = requests.codes.ok: data = r.json() if data.get( 'errcode' ): print data[ 'errmsg' ] sys.exit( 1 ) return data[ 'access_token' ] else : print r.status_code sys.exit( 1 ) def send( self ,message): url = self .send_url + self .get_token() data = { "touser" : "hequan2011" , "msgtype" : "text" , "agentid" : "0" , "text" : { "content" : message }, "safe" : "0" } send_data = json.dumps(data,ensure_ascii = False ) try : r = requests.post(url, send_data) except Exception, e: print e sys.exit( 1 ) if r.status_code = = requests.codes.ok: print r.json() else : print r.code sys.exit( 1 ) corpid = 'xxxxxxxxxxx' corpsecret = 'xxxxxxxxxxxxxxxxx' client = MongoClient( 'mongodb://user:password@127.0.0.1:27017/' ) db = client.ku collection = db.biao a = [] for data in collection.find(): a.append(data) l = a[ 0 ] g = l z = str (g[ "name" ]) z1 = int (g[ "jg" ]) print z msg = "1:{0}\n 2:{1}\n" . format (z,z1) w = Weixin(corpid,corpsecret) w.send(msg) |
ZABBIX 微信报警 插件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
#!/usr/bin/env python # -*- coding:utf-8 -*- # __author__ = '懒懒的天空' import requests import sys import json from conf.INIFILES import read_config, write_config import os import datetime from conf.BLog import Log reload (sys) sys.setdefaultencoding( 'utf-8' ) class WeiXin( object ): def __init__( self , corpid, corpsecret): # 初始化的时候需要获取corpid和corpsecret,需要从管理后台获取 self .__params = { 'corpid' : corpid, 'corpsecret' : corpsecret } self .url_get_token = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken' self .url_send = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?' self .__token = self .__get_token() self .__token_params = { 'access_token' : self .__token } def __raise_error( self , res): raise Exception( 'error code: %s,error message: %s' % (res.json()[ 'errcode' ], res.json()[ 'errmsg' ])) global senderr sendstatus = False senderr = 'error code: %s,error message: %s' % (res.json()[ 'errcode' ], res.json()[ 'errmsg' ]) def __get_token( self ): # print self.url_get_token headers = { 'content-type' : 'application/json' } res = requests.get( self .url_get_token, headers = headers, params = self .__params) try : return res.json()[ 'access_token' ] except : self .__raise_error(res.content) def send_message( self , agentid, messages, userid = ' ', toparty=' '): payload = { 'touser' : userid, 'toparty' : toparty, 'agentid' : agentid, "msgtype" : "news" , "news" : messages } headers = { 'content-type' : 'application/json' } data = json.dumps(payload, ensure_ascii = False ).encode( 'utf-8' ) params = self .__token_params res = requests.post( self .url_send, headers = headers, params = params, data = data) try : return res.json() except : self .__raise_error(res) def main(send_to, subject, content): try : global sendstatus global senderr data = '' messages = {} body = {} config_file_path = get_path() CorpID = read_config(config_file_path, 'wei' , "CorpID" ) CorpSecret = read_config(config_file_path, 'wei' , "CorpSecret" ) agentid = read_config(config_file_path, 'wei' , 
"agentid" ) web = read_config(config_file_path, 'wei' , "web" ) content = json.loads(content) messages[ "message_url" ] = web body[ "url" ] = web + "history.php?action=showgraph&itemids[]=" + content[u '监控ID' ] warn_message = '' if content[u '当前状态' ] = = 'PROBLEM' : body[ "title" ] = "服务器故障" warn_message + = subject + '\n' warn_message + = '详情:\n' warn_message + = '告警等级:' + content[u '告警等级' ] + '\n' warn_message + = '告警时间:' + content[u '告警时间' ] + '\n' warn_message + = '告警地址:' + content[u '告警地址' ] + '\n' warn_message + = '持续时间:' + content[u '持续时间' ] + '\n' warn_message + = '监控项目:' + content[u '监控项目' ] + '\n' warn_message + = content[u '告警主机' ] + '故障(' + content[u '事件ID' ] + ')' else : body[ "title" ] = "服务器恢复" warn_message + = subject + '\n' warn_message + = '详情:\n' warn_message + = '告警等级:' + content[u '告警等级' ] + '\n' warn_message + = '恢复时间:' + content[u '恢复时间' ] + '\n' warn_message + = '告警地址:' + content[u '告警地址' ] + '\n' warn_message + = '持续时间:' + content[u '持续时间' ] + '\n' warn_message + = '监控项目:' + content[u '监控项目' ] + '\n' warn_message + = content[u '告警主机' ] + '恢复(' + content[u '事件ID' ] + ')' body[ 'description' ] = warn_message data = [] data.append(body) messages[ 'articles' ] = data wx = WeiXin(CorpID, CorpSecret) data = wx.send_message(toparty = send_to, agentid = agentid, messages = messages) sendstatus = True except Exception, e: senderr = str (e) sendstatus = False logwrite(sendstatus, data) def get_path(): path = os.path.dirname(os.path.abspath(sys.argv[ 0 ])) config_path = path + '/config.ini' return config_path def logwrite(sendstatus, content): logpath = '/var/log/zabbix/weixin' if not sendstatus: content = senderr t = datetime.datetime.now() daytime = t.strftime( '%Y-%m-%d' ) daylogfile = logpath + '/' + str (daytime) + '.log' logger = Log(daylogfile, level = "info" , is_console = False , mbs = 5 , count = 5 ) os.system( 'chown zabbix.zabbix {0}' . 
format (daylogfile)) logger.info(content) if __name__ = = "__main__" : if len (sys.argv) > 1 : send_to = sys.argv[ 1 ] subject = sys.argv[ 2 ] content = sys.argv[ 3 ] logwrite( True , content) main(send_to, subject, content) |
python实现微信企业号的文本消息推送
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
#!/usr/bin/python # _*_coding:utf-8 _*_ import urllib2 import json import sys reload (sys) sys.setdefaultencoding( 'utf-8' ) def gettoken(corpid, corpsecret): gettoken_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=' + corpid + '&corpsecret=' + corpsecret try : token_file = urllib2.urlopen(gettoken_url) except urllib2.HTTPError as e: print e.code print e.read().decode( "utf8" ) sys.exit() token_data = token_file.read().decode( 'utf-8' ) token_json = json.loads(token_data) token_json.keys() token = token_json[ 'access_token' ] return token def senddata(access_token, user, party, agent, subject, content): send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=' + access_token send_values = "{\"touser\":\"" + user + "\",\"toparty\":\"" + party + "\",\"totag\":\"\",\"msgtype\":\"text\",\"agentid\":\"" + agent + "\",\"text\":{\"content\":\"" + subject + "\n" + content + "\"},\"safe\":\"0\"}" send_request = urllib2.Request(send_url, send_values) response = json.loads(urllib2.urlopen(send_request).read()) print str (response) if __name__ = = '__main__' : user = str (sys.argv[ 1 ]) # 参数1:发送给用户的账号,必须关注企业号,并对企业号有发消息权限 party = str (sys.argv[ 2 ]) # 参数2:发送给组的id号,必须对企业号有权限 agent = str (sys.argv[ 3 ]) # 参数3:企业号中的应用id subject = str (sys.argv[ 4 ]) # 参数4:标题【消息内容的一部分】 content = str (sys.argv[ 5 ]) # 参数5:文本具体内容 corpid = 'CorpID' # CorpID是企业号的标识 corpsecret = 'corpsecretSecret' # corpsecretSecret是管理组凭证密钥 try : accesstoken = gettoken(corpid, corpsecret) senddata(accesstoken, user, party, agent, subject, content) except Exception, e: print str (e) + "Error Please Check \"corpid\" or \"corpsecret\" Config" |
Nagios调用Python程序控制微信公众平台发布报警信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
# ---------------------------------------------------------------------------
# File: Notify-host-by-weixin-party.py   (Python 3)
# ---------------------------------------------------------------------------
"""Nagios host notification pushed to a WeChat Enterprise department.

Nagios passes a single argument: seven fields joined by the literal word
``separator`` (party, type, name, state, address, output, time).
"""
import json
import sys
import urllib.error
import urllib.request


def gettoken(corp_id, corp_secret):
    """Return an access_token; print the HTTP error and exit(1) on failure."""
    gettoken_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=' + corp_id + '&corpsecret=' + corp_secret
    try:
        token_file = urllib.request.urlopen(gettoken_url)
    except urllib.error.HTTPError as e:
        print(e.code)
        print(e.read().decode("utf8"))
        # The original fell through here and crashed on an unbound name.
        sys.exit(1)
    with token_file:
        token_data = token_file.read().decode('utf-8')
    token_json = json.loads(token_data)
    token = token_json['access_token']
    return token


def senddata(access_token, notify_str):
    """Format the host notification in *notify_str*, send it, return the raw reply."""
    send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=' + access_token
    # All fields arrive in one string joined by the word "separator".
    notifydata = notify_str.split("separator")
    if len(notifydata) < 7:
        raise ValueError('expected 7 "separator"-joined fields, got %d' % len(notifydata))
    party, cationtype, name, state, address, output, datatime = notifydata[:7]
    # Recovery and problem messages differ only in the emoticon tag.
    tag = '[嘘]' if cationtype == "RECOVERY" else '[擦汗]'
    content = tag + address + ' is ' + state + tag + '\n\nIP地址: ' + address + '\n主要用途: ' + name + '\n当前状态: ' + state + '\n\n日志摘要: ' + output + '\n检测时间: ' + datatime + '\n'
    send_values = {
        "toparty": party,
        "totag": "2",
        "msgtype": "text",
        "agentid": "15",
        "text": {
            "content": content
        },
        "safe": "0"
    }
    # ensure_ascii=False keeps the Chinese text as UTF-8 instead of \u escapes.
    send_data = json.dumps(send_values, ensure_ascii=False).encode(encoding='UTF8')
    send_request = urllib.request.Request(send_url, send_data)
    with urllib.request.urlopen(send_request) as response:
        # Raw reply from the WeChat API; useful when debugging.
        msg = response.read()
    return msg


if __name__ == '__main__':
    if len(sys.argv) < 2:
        print('usage: %s <notification string>' % sys.argv[0])
        sys.exit(1)
    # Nagios hands everything over as a single argument.
    notifystr = str(sys.argv[1])
    # NOTE(review): credentials are hard-coded in the script; move them to a
    # protected config file or environment variables before real use.
    corpid = 'wxb6162862801114c9da602'
    corpsecret = '2nCsNcHxepBCV4U9Lcf-23By1RGzU1Zs422tdJpKTQzqjQ1b26IFxP76ydG2rKkchGN6E'
    accesstoken = gettoken(corpid, corpsecret)
    msg = senddata(accesstoken, notifystr)
    print(msg)


# ---------------------------------------------------------------------------
# File: Notify-service-by-weixin-party.py   (Python 3, separate script)
# ---------------------------------------------------------------------------
# Nagios service notification; the single argument carries eight fields
# joined by "separator" (party, type, desc, alias, address, state, time,
# output).
import json
import sys
import urllib.error
import urllib.request


def gettoken(corp_id, corp_secret):
    """Return an access_token; print the HTTP error and exit(1) on failure."""
    gettoken_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=' + corp_id + '&corpsecret=' + corp_secret
    try:
        token_file = urllib.request.urlopen(gettoken_url)
    except urllib.error.HTTPError as e:
        print(e.code)
        print(e.read().decode("utf8"))
        sys.exit(1)
    with token_file:
        token_data = token_file.read().decode('utf-8')
    token_json = json.loads(token_data)
    token = token_json['access_token']
    return token


def senddata(access_token, notify_str):
    """Format the service notification in *notify_str*, send it, return the raw reply."""
    send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=' + access_token
    notifydata = notify_str.split("separator")
    if len(notifydata) < 8:
        raise ValueError('expected 8 "separator"-joined fields, got %d' % len(notifydata))
    party, cationtype, desc, alias, address, state, datatime, output = notifydata[:8]
    tag = '[鼓掌]' if cationtype == "RECOVERY" else '[擦汗]'
    content = tag + desc + ' is ' + state + tag + '\n\nIP地址: ' + address + '\n主要用途: ' + alias + '\n服务状态: ' + desc + ' is ' + state + '\n检测时间: ' + datatime + '\n日志摘要: \n' + output + '\n'
    send_values = {
        "toparty": party,
        "totag": "2",
        "msgtype": "text",
        "agentid": "15",
        "text": {
            "content": content
        },
        "safe": "0"
    }
    send_data = json.dumps(send_values, ensure_ascii=False).encode(encoding='UTF8')
    send_request = urllib.request.Request(send_url, send_data)
    with urllib.request.urlopen(send_request) as response:
        msg = response.read()
    return msg


if __name__ == '__main__':
    if len(sys.argv) < 2:
        print('usage: %s <notification string>' % sys.argv[0])
        sys.exit(1)
    notifystr = str(sys.argv[1])
    # NOTE(review): hard-coded credentials; see the note in the host script.
    corpid = 'wxb616286d28ds01114c9da602'
    corpsecret = '2nCsNcHxepBCdtgV4U9Lcf-23By1RGzUgh1Zs422tdJpKTQzqjQ1b26IFxP76ydG2rKkchGN6E'
    accesstoken = gettoken(corpid, corpsecret)
    msg = senddata(accesstoken, notifystr)
    print(msg)
shell和Python调用企业微信服务号进行报警
1
2
3
4
5
|
#!/bin/bash
# Send a test text message through the WeChat Enterprise API.
# Requires curl and jq.

corpid="wxd6b3"
corpsecret="aJTaPaGjW"

# Fetch the access token; jq -r prints the bare string without quotes.
access_token=$(curl -s "https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=${corpid}&corpsecret=${corpsecret}" | jq -r '.access_token')

# The target URL was cut off in the original listing; the message/send
# endpoint below is the one the Python variants of this script use.
curl -s -H "Content-type: application/json" -X POST \
    -d '{"touser":"@all","msgtype":"text","toparty":"14","agentid":"14","text":{"content":"测试"} , "safe":"0"}' \
    "https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=${access_token}"
Python脚本如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
# coding:utf-8 import sys import urllib2 import time import json import requests reload (sys) sys.setdefaultencoding( 'utf-8' ) #title = sys.argv[2] # 位置参数获取title 适用于zabbix #content = sys.argv[3] # 位置参数获取content 适用于zabbix title = "title 测试" # 位置参数获取title 适用于zabbix content = "content 测试" # 位置参数获取content 适用于zabbix class Token( object ): # 获取token def __init__( self , corpid, corpsecret): self .baseurl = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={0}&corpsecret={1}' . format ( corpid, corpsecret) self .expire_time = sys.maxint def get_token( self ): if self .expire_time > time.time(): request = urllib2.Request( self .baseurl) response = urllib2.urlopen(request) ret = response.read().strip() ret = json.loads(ret) if 'errcode' in ret.keys(): print >> ret[ 'errmsg' ], sys.stderr sys.exit( 1 ) self .expire_time = time.time() + ret[ 'expires_in' ] self .access_token = ret[ 'access_token' ] return self .access_token def send_msg(title, content): # 发送消息 corpid = "" # 填写自己应用的 corpsecret = "" # 填写自己应用的 qs_token = Token(corpid = corpid, corpsecret = corpsecret).get_token() url = "https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={0}" . format ( qs_token) payload = { "touser" : "@all" , "msgtype" : "text" , "agentid" : "14" , "text" : { "content" : "标题:{0}\n 内容:{1}" . format (title, content) }, "safe" : "0" } ret = requests.post(url, data = json.dumps(payload, ensure_ascii = False )) print ret.json() if __name__ = = '__main__' : # print title, content send_msg(title, content) |
python利用微信订阅号报警
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
# coding=utf-8 import urllib import urllib2 import cookielib import json import sys data = { 'username' : 'yaokuaile-99' , 'pwd' : 'f4bb2d8abe7a799ad62495a912ae3363' , 'imgcode' :'', 'f' : 'json' } cj = cookielib.LWPCookieJar() opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj)) urllib2.install_opener(opener) def getToken(): headers = { 'Accept' : 'application/json, text/javascript, */*; q=0.01' , 'Accept-Encoding' : 'gzip,deflate,sdch' , 'Accept-Language' : 'zh-CN,zh;q=0.8' , 'Connection' : 'keep-alive' , 'Content-Type' : 'application/x-www-form-urlencoded; charset=UTF-8' , 'Content-Length' : '74' , 'Content-Type' : 'application/x-www-form-urlencoded; charset=UTF-8' , 'Host' : 'mp.weixin.qq.com' , 'Origin' : 'https://mp.weixin.qq.com' , 'Referer' : 'https://mp.weixin.qq.com/cgi-bin/loginpage?t=wxm2-login&lang=zh_CN' , 'User-Agent' : 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/33.0.1750.154 Safari/537.36' , 'X-Requested-With' : 'XMLHttpRequest' , } req = urllib2.Request( 'https://mp.weixin.qq.com/cgi-bin/login?lang=zh_CN' ,urllib.urlencode(data),headers) ret = urllib2.urlopen(req) retread = ret.read() token = json.loads(retread) token = token[ 'redirect_url' ][ 44 :] return token ### send msg def sendWeixin(msg,token,tofakeid): msg = msg token = token data1 = { 'type' : '1' , 'content' :msg, 'imgcode' :' ',' imgcode ':' ',' tofakeid ':tofakeid,' f ':' json ',' token ':token,' ajax ':' 1 '} headers = { 'Accept' : '*/*' , 'Accept-Encoding' : 'gzip,deflate,sdch' , 'Accept-Language' : 'zh-CN,zh;q=0.8' , 'Connection' : 'keep-alive' , 'Content-Type' : 'application/x-www-form-urlencoded; charset=UTF-8' , 'Host' : 'mp.weixin.qq.com' , 'Origin' : 'https://mp.weixin.qq.com' , 'Referer' : 'https://mp.weixin.qq.com/cgi-bin/singlemsgpage?msgid=&source=&count=20&t=wxm-singlechat&fromfakeid=150890&token=%s&lang=zh_CN' , 'User-Agent' : 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/33.0.1750.154 
Safari/537.36' , 'X-Requested-With' : 'XMLHttpRequest' , } req2 = urllib2.Request( 'https://mp.weixin.qq.com/cgi-bin/singlesend?t=ajax-response&f=json&lang=zh_CN' ,urllib.urlencode(data1),headers) ret2 = urllib2.urlopen(req2) if __name__ = = '__main__' : ''' useage: ./send_wx.py msg ''' token = getToken() msg = sys.argv[ 1 :] msg = '\n' .join(msg) tofakeid = '2443746922' sendWeixin(msg, token, tofakeid) |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Send alarm and recovery notifications as WeChat template messages.

Command line (argument counts are what the script dispatches on):

    alarm:    script TITLE HOSTNAME TIMESTAMP LEVEL MESSAGE STATE TAIL GROUP
    recovery: script TITLE MESSAGE ALARM_TIME RECOVER_TIME DURATION TAIL GROUP

GROUP selects the recipient list ``group_<GROUP>`` from the ``config``
module; every member of ``config.group_super`` is notified as well.
``config`` is also expected to provide ``appid``, ``secrect``, ``file_name``,
``alarm_id`` and ``recover_id``.

The syntax used here is valid on both Python 2 and Python 3.
"""
import datetime
import json
import sys
import time

try:
    import urllib2
except ImportError:
    # Python 3 merged urllib2 into urllib.request; urlopen() is used the
    # same way in both.
    import urllib.request as urllib2

if sys.version_info[0] == 2:
    # Python 2 only: make implicit str/unicode conversions use UTF-8 so that
    # non-ASCII alert text can be concatenated and written without errors.
    reload(sys)  # noqa: F821 - builtin on Python 2
    sys.setdefaultencoding("utf-8")

# Timestamp layout stored in the token cache file. Zero-padded fields make
# plain string comparison equivalent to chronological comparison.
_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'


def _read_url(url, data=None):
    """Open *url* (POSTing *data* when given), return the body, always close."""
    response = urllib2.urlopen(url, data)
    try:
        return response.read()
    finally:
        response.close()


class WechatPush(object):
    """Client that caches an access token in a file and pushes template messages."""

    def __init__(self, appid, secrect, file_name):
        # Application id used when requesting a token.
        self.appid = appid
        # Application secret used when requesting a token.
        self.secrect = secrect
        # Path of the file holding "<token>,<expiry timestamp>".
        self.file_name = file_name

    def build_timestamp(self, interval):
        """Return the local time *interval* seconds from now.

        The result is formatted like "2015-07-01 14:41:40".
        """
        moment = datetime.datetime.now() + datetime.timedelta(seconds=interval)
        return moment.strftime(_TIME_FORMAT)

    def check_token_expires(self):
        """Read the cached token and report whether it is still usable.

        Returns:
            A ``(token, flag)`` tuple. ``flag`` is the string "true" when the
            cached token has not expired yet and "false" when a new token
            must be fetched. A missing, empty or malformed cache file yields
            ``("", "false")`` so that the caller fetches a fresh token.
        """
        try:
            with open(self.file_name, 'r') as f:
                cached = f.read().strip()
        except IOError:
            # First run (no cache yet) or unreadable cache: force a refresh.
            return "", "false"

        fields = cached.split(",")
        if len(fields) < 2 or not fields[0].strip() or not fields[1].strip():
            return "", "false"
        token = fields[0].strip()
        expires_time = fields[1].strip()

        curr_time = time.strftime(_TIME_FORMAT)
        if curr_time > expires_time:
            return token, "false"
        return token, "true"

    def getToken(self):
        """Fetch a new access token and store it in the cache file.

        Returns:
            The new access token.

        Raises:
            RuntimeError: the request failed or the reply carried no
                ``access_token`` (for example an error reply).
        """
        url = ('https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid='
               + self.appid + "&secret=" + self.secrect)
        try:
            reply = json.loads(_read_url(url).decode("utf-8"))
        except (IOError, ValueError) as e:
            raise RuntimeError("failed to obtain WeChat access_token: %s" % e)
        if 'access_token' not in reply or 'expires_in' not in reply:
            raise RuntimeError("WeChat token request was rejected: %s" % reply)

        token = reply['access_token']
        # Record an expiry 300 seconds earlier than the server's so the
        # token is renewed before it actually lapses.
        write_expires = self.build_timestamp(int(reply['expires_in']) - 300)
        try:
            with open(self.file_name, 'w') as f:
                f.write("%s,%s" % (token, write_expires))
        except IOError as e:
            # The token itself is valid; only caching failed, so keep going.
            sys.stderr.write("could not cache access_token in %s: %s\n"
                             % (self.file_name, e))
        return token

    def post_data(self, url, para_dct):
        """POST the already-serialised payload *para_dct* to *url*.

        Returns the raw response body.
        """
        payload = para_dct
        if not isinstance(payload, bytes):
            # urlopen() needs bytes on Python 3 (and rejects unicode cleanly
            # on neither version), so encode text payloads here.
            payload = payload.encode("utf-8")
        return _read_url(url, payload)

    def do_push(self, touser, template_id, url, topcolor, data):
        """Send one template message to *touser*.

        Args:
            touser: recipient identifier placed in the payload.
            template_id: id of the message template.
            url: link placed in the payload.
            topcolor: header colour; blank selects "#7B68EE".
            data: template field values (see ``alarm`` / ``recover``).

        Returns:
            The decoded JSON reply of the send API.
        """
        # Use the cached token unless it is missing or expired.
        token, if_token_expires = self.check_token_expires()
        if if_token_expires == "false":
            token = self.getToken()

        if topcolor.strip() == '':
            topcolor = "#7B68EE"

        dict_arr = {
            'touser': touser,
            'template_id': template_id,
            'url': url,
            'topcolor': topcolor,
            'data': data,
        }
        json_template = json.dumps(dict_arr)
        requst_url = "https://api.weixin.qq.com/cgi-bin/message/template/send?access_token=" + token
        content = self.post_data(requst_url, json_template)

        result = json.loads(content.decode("utf-8"))
        if result.get('errcode'):
            # Surface delivery failures instead of discarding them.
            sys.stderr.write("WeChat push to %s failed: %s %s\n"
                             % (touser, result.get('errcode'), result.get('errmsg')))
        return result


def alarm(title, hostname, timestap, level, message, state, tail):
    """Build the template data for an alarm message (fields shown in red)."""
    color = "#FF0000"
    return {
        "first": {"value": title},
        "keyword1": {"value": hostname, "color": color},
        "keyword2": {"value": timestap, "color": color},
        "keyword3": {"value": level, "color": color},
        "keyword4": {"value": message, "color": color},
        "keyword5": {"value": state, "color": color},
        "remark": {"value": tail},
    }


def recover(title, message, alarm_time, recover_time, continue_time, tail):
    """Build the template data for a recovery message (fields shown in green)."""
    re_color = "#228B22"
    return {
        "first": {"value": title},
        "content": {"value": message, "color": re_color},
        "occurtime": {"value": alarm_time, "color": re_color},
        "recovertime": {"value": recover_time, "color": re_color},
        "lasttime": {"value": continue_time, "color": re_color},
        "remark": {"value": tail},
    }


def _resolve_group(settings, suffix):
    """Return the recipient list named ``group_<suffix>`` from *settings*.

    The list is looked up by name rather than eval()'d: the suffix comes from
    the command line, and evaluating it would execute arbitrary expressions.
    """
    name = "%s_%s" % ("group", suffix)
    try:
        return settings[name]
    except KeyError:
        raise ValueError("no recipient list named %r in config" % name)


def _append_debug_log(lines):
    """Append *lines* to the debug log; a logging failure must not stop the alert."""
    try:
        with open("/etc/zabbix/moniter_scripts/test.log", 'a+') as f:
            for line in lines:
                f.write(line + "\n")
    except IOError as e:
        sys.stderr.write("could not write debug log: %s\n" % e)


def main(argv=None):
    """Dispatch on the argument count: 9 sends an alarm, 8 sends a recovery."""
    argv = sys.argv if argv is None else argv

    # Deployment settings are imported only when the script actually runs,
    # so the WechatPush class can be imported without a config module.
    import config
    settings = vars(config)

    webchart = WechatPush(config.appid, config.secrect, config.file_name)
    url = "http://www.xiaoniu88.com"
    print(len(argv))

    if len(argv) == 9:
        # Alarm message.
        title, hostname, timestap, level, message, state, tail, group = argv[1:9]
        for index in range(1, 9):
            print("sys.argv[%d]" % index + argv[index])
        _append_debug_log([title, hostname, timestap, level, message, state,
                           tail, "%s_%s" % ("group", group)])
        data = alarm(title, hostname, timestap, level, message, state, tail)
        for touser in _resolve_group(settings, group):
            webchart.do_push(touser, config.alarm_id, url, "", data)
        for touser in config.group_super:
            webchart.do_push(touser, config.alarm_id, url, "", data)
    elif len(argv) == 8:
        # Recovery message.
        title, message, alarm_time, recover_time, continue_time, tail, group = argv[1:8]
        for index in range(1, 8):
            print("sys.argv[%d]" % index + argv[index])
        data = recover(title, message, alarm_time, recover_time, continue_time, tail)
        for touser in _resolve_group(settings, group):
            webchart.do_push(touser, config.recover_id, url, "", data)
        for touser in config.group_super:
            webchart.do_push(touser, config.recover_id, url, "", data)


if __name__ == "__main__":
    main()
zabbix使用微信报警python脚本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
|
# ---------------------------------------------------------------------------
# Listing 1: Python 2 version (save as its own script)
# ---------------------------------------------------------------------------
#!/usr/bin/python
# coding: utf-8
# Python 2: send Zabbix alert messages to WeChat.
# by linwangyi
# 2016-01-18
import urllib
import urllib2
import json
import sys


def gettoken(corpid, corpsecret):
    """Request an access token for the given corp id and secret.

    Exits the process with status 1 when the HTTP request fails or the reply
    contains no ``access_token``.
    """
    gettoken_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=' + corpid + '&corpsecret=' + corpsecret
    try:
        token_file = urllib2.urlopen(gettoken_url)
    except urllib2.HTTPError as e:
        print(e.code)
        print(e.read().decode("utf8"))
        # Non-zero status so the calling system can see the failure.
        sys.exit(1)
    token_data = token_file.read().decode('utf-8')
    token_json = json.loads(token_data)
    if 'access_token' not in token_json:
        # Error replies carry errcode/errmsg instead of a token.
        print(token_data)
        sys.exit(1)
    return token_json['access_token']


def senddata(access_token, user, content):
    """Send *content* as a text message to *user* and print the API reply."""
    send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=' + access_token
    send_values = {
        # User account in the enterprise account, configured in the Zabbix
        # user Media; per the author, a misconfigured account means the
        # message is delivered by department instead.
        "touser": user,
        "toparty": "1",      # department id in the enterprise account
        "msgtype": "text",   # message type
        "agentid": "1",      # application id in the enterprise account
        "text": {
            "content": content
        },
        "safe": "0"
    }
    send_data = json.dumps(send_values, ensure_ascii=False)
    send_request = urllib2.Request(send_url, send_data)
    response = json.loads(urllib2.urlopen(send_request).read())
    print(str(response))


if __name__ == '__main__':
    user = str(sys.argv[1])      # first argument passed by Zabbix
    content = str(sys.argv[3])   # third argument passed by Zabbix
    corpid = 'XXXX'              # CorpID identifies the enterprise account
    corpsecret = 'XXXXX'         # corpsecret is the management group's credential secret
    accesstoken = gettoken(corpid, corpsecret)
    senddata(accesstoken, user, content)


# ---------------------------------------------------------------------------
# Listing 2: Python 3 version (save as its own script)
# ---------------------------------------------------------------------------
#!/usr/bin/python3
# coding: utf-8
# Python 3: send Zabbix alert messages to WeChat.
# by http://sunday208.blog.51cto.com/
# 2016-01-18
import urllib.error
import urllib.request
import json
import sys


def gettoken(corp_id, corp_secret):
    """Request an access token for the given corp id and secret.

    Exits the process with status 1 when the HTTP request fails or the reply
    contains no ``access_token``.
    """
    gettoken_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=' + corp_id + '&corpsecret=' + corp_secret
    try:
        token_file = urllib.request.urlopen(gettoken_url)
    except urllib.error.HTTPError as e:
        print(e.code)
        print(e.read().decode("utf8"))
        # Stop here: there is no response to read a token from.
        sys.exit(1)
    with token_file:
        token_data = token_file.read().decode('utf-8')
    token_json = json.loads(token_data)
    if 'access_token' not in token_json:
        # Error replies carry errcode/errmsg instead of a token.
        print(token_data)
        sys.exit(1)
    return token_json['access_token']


def senddata(access_token, user, content):
    """Send *content* as a text message to *user* and print the API reply."""
    send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=' + access_token
    send_values = {
        # User account in the enterprise account, configured in the Zabbix
        # user Media; per the author, a misconfigured account means the
        # message is delivered by department instead.
        "touser": user,
        "toparty": "1",      # department id in the enterprise account
        "msgtype": "text",   # message type
        "agentid": "1",      # application id in the enterprise account
        "text": {
            "content": content
        },
        "safe": "0"
    }
    send_data = json.dumps(send_values, ensure_ascii=False).encode(encoding='UTF8')
    send_request = urllib.request.Request(send_url, send_data)
    try:
        with urllib.request.urlopen(send_request) as response:
            msg = response.read().decode('utf-8')
    except urllib.error.URLError as e:
        # Report the actual failure; no reply body exists at this point.
        print("returned value : " + str(e))
        return
    print("returned value : " + msg)


if __name__ == '__main__':
    # Python 3 already uses UTF-8 as its default string encoding, so no
    # sys.setdefaultencoding() workaround is needed here.
    user = str(sys.argv[1])      # first argument passed by Zabbix
    content = str(sys.argv[3])   # third argument passed by Zabbix
    corpid = 'XXXX'              # CorpID identifies the enterprise account
    corpsecret = 'XXXXX'         # corpsecret is the management group's credential secret
    accesstoken = gettoken(corpid, corpsecret)
    senddata(accesstoken, user, content)
总结
以上所述是小编给大家介绍的 Python 利用微信公众号实现报警功能,希望对大家有所帮助。如果大家有任何疑问,欢迎给我留言,小编会及时回复大家的!
原文链接:https://www.cnblogs.com/huohuohuo1/archive/2018/06/09/9161592.html