上大学的时候,对微信公众号开发浅尝辄止的玩了一下,感觉还是挺有意思的。后来服务器到期了,也就搁置了。由于发布web程序,使用PHP很顺手,就使用了PHP作为开发语言。但是其实微信公众号的开发和语言关联并不大,流程,原理上都是一致的。
快要做毕设了,想着到时候应该会部署一些代码到服务器上,进行长期的系统构建。所以趁着还是学生,就买了阿里云的学生机。买了之后,就想着玩点什么,于是微信公众号的开发,就又提上了日程。但是这次,我不打算使用PHP了,感觉局限性相对于Python而言,稍微有点大。
使用Python的话,可以灵活的部署一些爬虫类程序,和用户交互起来也会比较方便。可拓展性感觉也比较的高,于是就选它了。
服务器配置这部分属于是比较基础的,不太明白的可以看看我之前的那个博客,还算是比较的详细。今天就只是对核心代码做下介绍好了。
项目目录
1
2
3
|
root@aliyun: / var / www / html / wx / py # ls *.py api.py dispatcher.py robot.py root@aliyun: / var / www / html / wx / py # |
api.py
这个文件相当于是一个关卡,涉及token的验证,和服务的支持。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
# -*- coding:utf-8 -*- #中文编码 import sys reload (sys) # 不加这部分处理中文还是会出问题 sys.setdefaultencoding( 'utf-8' ) import time from flask import Flask, request, make_response import hashlib import json import xml.etree.ElementTree as ET from dispatcher import * app = Flask(__name__) app.debug = True @app .route( '/' ) # 默认网址 def index(): return 'Index Page' @app .route( '/wx' , methods = [ 'GET' , 'POST' ]) def wechat_auth(): # 处理微信请求的处理函数,get方法用于认证,post方法取得微信转发的数据 if request.method = = 'GET' : token = '你自己设置好的token' data = request.args signature = data.get( 'signature' , '') timestamp = data.get( 'timestamp' , '') nonce = data.get( 'nonce' , '') echostr = data.get( 'echostr' , '') s = [timestamp, nonce, token] s.sort() s = ''.join(s) if (hashlib.sha1(s).hexdigest() = = signature): return make_response(echostr) else : rec = request.stream.read() # 接收消息 dispatcher = MsgDispatcher(rec) data = dispatcher.dispatch() with open ( "./debug.log" , "a" ) as file : file .write(data) file .close() response = make_response(data) response.content_type = 'application/xml' return response if __name__ = = '__main__' : app.run(host = "0.0.0.0" , port = 80 ) |
dispatcher.py
这个文件是整个服务的核心,用于识别用户发来的消息类型,然后交给不同的handler来处理,并将运行的结果反馈给前台,发送给用户。消息类型这块,在微信的开发文档上有详细的介绍,因此这里就不再过多的赘述了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
|
#! /usr/bin python # coding: utf8 import sys reload (sys) sys.setdefaultencoding( "utf8" ) import time import json import xml.etree.ElementTree as ET from robot import * class MsgParser( object ): """ 用于解析从微信公众平台传递过来的参数,并进行解析 """ def __init__( self , data): self .data = data def parse( self ): self .et = ET.fromstring( self .data) self .user = self .et.find( "FromUserName" ).text self .master = self .et.find( "ToUserName" ).text self .msgtype = self .et.find( "MsgType" ).text # 纯文字信息字段 self .content = self .et.find( "Content" ).text if self .et.find( "Content" ) is not None else "" # 语音信息字段 self .recognition = self .et.find( "Recognition" ).text if self .et.find( "Recognition" ) is not None else "" self . format = self .et.find( "Format" ).text if self .et.find( "Format" ) is not None else "" self .msgid = self .et.find( "MsgId" ).text if self .et.find( "MsgId" ) is not None else "" # 图片 self .picurl = self .et.find( "PicUrl" ).text if self .et.find( "PicUrl" ) is not None else "" self .mediaid = self .et.find( "MediaId" ).text if self .et.find( "MediaId" ) is not None else "" # 事件 self .event = self .et.find( "Event" ).text if self .et.find( "Event" ) is not None else "" return self class MsgDispatcher( object ): """ 根据消息的类型,获取不同的处理返回值 """ def __init__( self , data): parser = MsgParser(data).parse() self .msg = parser self .handler = MsgHandler(parser) def dispatch( self ): self .result = "" # 统一的公众号出口数据 if self .msg.msgtype = = "text" : self .result = self .handler.textHandle() elif self .msg.msgtype = = "voice" : self .result = self .handler.voiceHandle() elif self .msg.msgtype = = 'image' : self .result = self .handler.imageHandle() elif self .msg.msgtype = = 'video' : self .result = self .handler.videoHandle() elif self .msg.msgtype = = 'shortvideo' : self .result = self .handler.shortVideoHandle() elif self .msg.msgtype = = 'location' : self .result = self .handler.locationHandle() elif self .msg.msgtype = = 'link' : self .result = self .handler.linkHandle() elif self .msg.msgtype = = 'event' : self .result = self .handler.eventHandle() return self .result class MsgHandler( object ): """ 针对type不同,转交给不同的处理函数。直接处理即可 """ def __init__( self , msg): self .msg = msg self .time = int (time.time()) def textHandle( self , user = ' ', master=' ', time=' ', content=' '): template = """ <xml> <ToUserName><![CDATA[{}]]></ToUserName> <FromUserName><![CDATA[{}]]></FromUserName> <CreateTime>{}</CreateTime> <MsgType><![CDATA[text]]></MsgType> <Content><![CDATA[{}]]></Content> </xml> """ # 对用户发过来的数据进行解析,并执行不同的路径 try : response = get_response_by_keyword( self .msg.content) if response[ 'type' ] = = "image" : result = self .imageHandle( self .msg.user, self .msg.master, self .time, response[ 'content' ]) elif response[ 'type' ] = = "music" : data = response[ 'content' ] result = self .musicHandle(data[ 'title' ], data[ 'description' ], data[ 'url' ], data[ 'hqurl' ]) elif response[ 'type' ] = = "news" : items = response[ 'content' ] result = self .newsHandle(items) # 这里还可以添加更多的拓展内容 else : response = get_turing_response( self .msg.content) result = template. format ( self .msg.user, self .msg.master, self .time, response) #with open("./debug.log", 'a') as f: # f.write(response['content'] + '~~' + result) # f.close() except Exception as e: with open ( "./debug.log" , 'a' ) as f: f.write( "text handler:" + str (e.message)) f.close() return result def musicHandle( self , title = ' ', description=' ', url=' ', hqurl=' '): template = """ <xml> <ToUserName><![CDATA[{}]]></ToUserName> <FromUserName><![CDATA[{}]]></FromUserName> <CreateTime>{}</CreateTime> <MsgType><![CDATA[music]]></MsgType> <Music> <Title><![CDATA[{}]]></Title> <Description><![CDATA[{}]]></Description> <MusicUrl><![CDATA[{}]]></MusicUrl> <HQMusicUrl><![CDATA[{}]]></HQMusicUrl> </Music> <FuncFlag>0</FuncFlag> </xml> """ response = template. format ( self .msg.user, self .msg.master, self .time, title, description, url, hqurl) return response def voiceHandle( self ): response = get_turing_response( self .msg.recognition) result = self .textHandle( self .msg.user, self .msg.master, self .time, response) return result def imageHandle( self , user = ' ', master=' ', time=' ', mediaid=' '): template = """ <xml> <ToUserName><![CDATA[{}]]></ToUserName> <FromUserName><![CDATA[{}]]></FromUserName> <CreateTime>{}</CreateTime> <MsgType><![CDATA[image]]></MsgType> <Image> <MediaId><![CDATA[{}]]></MediaId> </Image> </xml> """ if mediaid = = '': response = self .msg.mediaid else : response = mediaid result = template. format ( self .msg.user, self .msg.master, self .time, response) return result def videoHandle( self ): return 'video' def shortVideoHandle( self ): return 'shortvideo' def locationHandle( self ): return 'location' def linkHandle( self ): return 'link' def eventHandle( self ): return 'event' def newsHandle( self , items): # 图文消息这块真的好多坑,尤其是<![CDATA[]]>中间不可以有空格,可怕极了 articlestr = """ <item> <Title><![CDATA[{}]]></Title> <Description><![CDATA[{}]]></Description> <PicUrl><![CDATA[{}]]></PicUrl> <Url><![CDATA[{}]]></Url> </item> """ itemstr = "" for item in items: itemstr + = str (articlestr. format (item[ 'title' ], item[ 'description' ], item[ 'picurl' ], item[ 'url' ])) template = """ <xml> <ToUserName><![CDATA[{}]]></ToUserName> <FromUserName><![CDATA[{}]]></FromUserName> <CreateTime>{}</CreateTime> <MsgType><![CDATA[news]]></MsgType> <ArticleCount>{}</ArticleCount> <Articles>{}</Articles> </xml> """ result = template. format ( self .msg.user, self .msg.master, self .time, len (items), itemstr) return result |
robot.py
这个文件属于那种画龙点睛性质的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
#!/usr/bin python #coding: utf8 import requests import json def get_turing_response(req = ""): url = "http://www.tuling123.com/openapi/api" secretcode = "嘿嘿,这个就不说啦" response = requests.post(url = url, json = { "key" : secretcode, "info" : req, "userid" : 12345678 }) return json.loads(response.text)[ 'text' ] if response.status_code = = 200 else "" def get_qingyunke_response(req = ""): url = "http://api.qingyunke.com/api.php?key=free&appid=0&msg={}" . format (req) response = requests.get(url = url) return json.loads(response.text)[ 'content' ] if response.status_code = = 200 else "" # 简单做下。后面慢慢来 def get_response_by_keyword(keyword): if '团建' in keyword: result = { "type" : "image" , "content" : "3s9Dh5rYdP9QruoJ_M6tIYDnxLLdsQNCMxkY0L2FMi6HhMlNPlkA1-50xaE_imL7" } elif 'music' in keyword or '音乐' in keyword: musicurl = 'http://204.11.1.34:9999/dl.stream.qqmusic.qq.com/C400001oO7TM2DE1OE.m4a?vkey=3DFC73D67AF14C36FD1128A7ABB7247D421A482EBEDA17DE43FF0F68420032B5A2D6818E364CB0BD4EAAD44E3E6DA00F5632859BEB687344&guid=5024663952&uin=1064319632&fromtag=66' result = { "type" : "music" , "content" : { "title" : "80000" , "description" : "有个男歌手姓巴,他的女朋友姓万,于是这首歌叫80000" , "url" : musicurl, "hqurl" : musicurl}} elif '关于' in keyword: items = [{ "title" : "关于我" , "description" : "喜欢瞎搞一些脚本" , "picurl" : "https://avatars1.githubusercontent.com/u/12973402?s=460&v=4" , "url" : "https://github.com/guoruibiao" }, { "title" : "我的博客" , "description" : "收集到的,瞎写的一些博客" , "picurl" : "http://avatar.csdn.net/0/8/F/1_marksinoberg.jpg" , "url" : "http://blog.csdn.net/marksinoberg" }, { "title" : "薛定谔的:dog:" , "description" : "副标题有点奇怪,不知道要怎么设置比较好" , "picurl" : "https://www.baidu.com/img/bd_logo1.png" , "url" : "http://www.baidu.com" } ] result = { "type" : "news" , "content" : items} else : result = { "type" : "text" , "content" : "可以自由进行拓展" } return result |
其实这看起来是一个文件,其实可以拓展为很多的方面。
如果想通过公众号来监控服务器的运行情况,就可以添加一个对服务器负载的监控的脚本;
如果想做一些爬虫,每天抓取一些高质量的文章,然后通过公众号进行展示。
不方便使用电脑的情况下,让公众号调用一些命令也可以算是曲线救国的一种方式。
等等吧,其实有多少想法,就可以用Python进行事先。然后通过公众号这个平台进行展示。
易错点
在从PHP重构为Python的过程中,我其实也是遇到了一些坑的。下面总结下,如果恰好能帮助到遇到同样问题的你,那我这篇文章也算是没有白写了。
微信公众号的开发,其实关键就在于理解这个工作的模式。大致有这么两条路。
用户把消息发送到微信公众平台上,平台把信息拼接组装成XML发到我们自己的服务器。(通过一系列的认证,校验,让平台知道,我们的服务是合法的),然后服务器将XML进行解析,处理。
我们的服务器解析处理完成后,将数据再次拼接组装成XML,发给微信公众平台,平台帮我们把数据反馈给对应的用户。
这样,一个交互就算是完成了。在这个过程中,有下面几个容易出错的地方。
token校验: token的校验是一个get方式的请求。通过代码我们也可以看到,就是对singature的校验,具体看代码就明白了。
XML数据的解析,对于不同的消息,记得使用不同的格式。其中很容易出错的就是格式不规范。 <!CDATA[[]]> 中括号之间最好不要有空格,不然定位起错误还是很麻烦的。
服务的稳定性。这里用的web框架是flask,小巧精良。但是对并发的支持性不是很好,对此可以使用uwsgi和Nginx来实现一个更稳定的服务。如果就是打算自己玩一玩,通过命令行启用(如python api.py)就不是很保险了,因为很有可能会因为用户的一个奇怪的输入导致整个服务垮掉,建议使用nohup的方式,来在一定程度上保证服务的质量。
结果演示
目前这个公众号支持文字,语音,图片,图文等消息类型。示例如下。
总结
在将公众号从PHP重构为Python的过程中,遇到了一些问题,然后通过不断的摸索,慢慢的也把问题解决了。其实有时候就是这样,只有不断的发现问题,才能不断的提升自己。
这里其实并没有深入的去完善,重构后的微信公众号其实能做的还有很多,毕竟就看敢不敢想嘛。好了,就先扯这么多了,后面如果有好的思路和实现,再回来更新好了。
以上所述是小编给大家介绍的Python微信公众号开发平台,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对服务器之家网站的支持!
原文链接:https://juejin.im/post/5a68966951882573256bfb8e