很多朋友对异步编程都处于“听说很强大”的认知状态。鲜有在生产项目中使用它。而使用它的同学,则大多数都停留在知道如何使用 Tornado、Twisted、Gevent 这类异步框架上,出现各种古怪的问题难以解决。而且使用了异步框架的部分同学,由于用法不对,感觉它并没牛逼到哪里去,所以很多同学做 Web 后端服务时还是采用 Flask、Django等传统的非异步框架。
从上两届 PyCon 技术大会看来,异步编程已经成了 Python 生态下一阶段的主旋律。如新兴的 Go、Rust、Elixir 等编程语言都将其支持异步和高并发作为主要“卖点”,技术变化趋势如此。Python 生态为不落人后,从2013年起由 Python 之父 Guido 亲自操刀主持了Tulip(asyncio)项目的开发。
异步io的好处在于避免的线程的开销和切换,而且我们都知道python其实是没有多线程的,只是通过底层线层锁实现的多线程。另一个好处在于避免io操作(包含网络传输)的堵塞时间。
asyncio可以实现单线程并发IO操作。如果仅用在客户端,发挥的威力不大。如果把asyncio用在服务器端,例如Web服务器,由于HTTP连接就是IO操作,因此可以用单线程+coroutine实现多用户的高并发支持。
asyncio实现了TCP、UDP、SSL等协议,aiohttp则是基于asyncio实现的HTTP框架。
- 对于异步io你需要知道的重点,要注意的是,await语法只能出现在通过async修饰的函数中,否则会报SyntaxError错误。而且await后面的对象需要是一个Awaitable,或者实现了相关的协议。
注意:
- 所有需要异步执行的函数,都需要asyncio中的轮训器去轮训执行,如果函数阻塞,轮训器就会去执行下一个函数。所以所有需要异步执行的函数都需要加入到这个轮训器中。
asyncio
asyncio的基本概念asyncio是在python3.4中被引进的异步IO库。你也可以通过python3.3的pypi来安装它。它相当的复杂,而且我不会介绍太多的细节。相反,我将会解释你需要知道些什么,以利用它来写异步的代码。简而言之,有两件事情你需要知道:协同程序和事件循环。协同程序像是方法,但是它们可以在代码中的特定点暂停和继续。当在等待一个IO(比如一个HTTP请求),同时执行另一个请求的时候,可以用来暂停一个协同程序。
例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
import requests import time import asyncio # 创建一个异步函数 async def task_func(): await asyncio.sleep( 1 ) resp = requests.get( 'http://192.168.2.177:5002/' ) print ( '2222222' ,time.time(),resp.text) async def main(loop): loop = asyncio.get_event_loop() # 获取全局轮训器 task = loop.create_task(task_func()) # 在全局轮训器加入协成,只有加入全局轮训器才能被监督执行 await asyncio.sleep( 2 ) # 等待两秒为了不要立即执行event_loop.close(),项目中event_loop应该是永不停歇的 print ( '11111111111' ,time.time()) event_loop = asyncio.get_event_loop() try : event_loop.run_until_complete(main(event_loop)) finally : event_loop.close() # 当轮训器关闭以后,所有没有执行完成的协成将全部关闭 |
aiohttp服务器
下面是aiohttp作为服务器端的一个简单的demo。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
#!/usr/bin/env python3 import argparse from aiohttp import web import asyncio import base64 import logging import uvloop import time,datetime import json import requests asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) routes = web.RouteTableDef() @routes .get( '/' ) async def hello(request): return web.Response(text = "Hello, world" ) # 定义一个路由映射,接收网址参数,post方式 @routes .post( '/demo1/{name}' ) async def demo1(request): # 异步监听,只要一有握手就开始触发,此时网址参数中的name就已经知道了,但是前端可能还没有完全post完数据。 name = request.match_info.get( 'name' , "Anonymous" ) # 获取name print (datetime.datetime.now()) # 触发视图函数的时间 data = await request.post() # 等待post数据完成接收,只有接收完成才能进行后续操作.data['key']获取参数 print (datetime.datetime.now()) # 接收post数据完成的时间 logging.info( 'safety dect request start %s' % datetime.datetime.now()) result = { 'name' :name, 'key' :data[ 'key' ]} logging.info( 'safety dect request finish %s, %s' % (datetime.datetime.now(),json.dumps(result))) return web.json_response(result) # 定义一个路由映射,设计到io操作 @routes .post( '/demo2' ) async def demo2(request): # 异步监听,只要一有握手就开始触发,此时网址参数中的name就已经知道了,但是前端可能还没有完全post完数据。 data = await request.post() # 等待post数据完成接收,只有接收完成才能进行后续操作.data['key']获取参数 logging.info( 'safety dect request start %s' % datetime.datetime.now()) res = requests.post( 'http://www.baidu.com' ) # 网路id,会自动切换到其他协成上 logging.info( 'safety dect request finish %s' % res.test) return web.Response(text = "welcome" ) if __name__ = = '__main__' : logging.info( 'server start' ) app = web.Application() app.add_routes(routes) web.run_app(app,host = '0.0.0.0' ,port = 8080 ) logging.info( 'server close' ) |
aiohttp客户端
aiohttp的另一个主要作用是作为异步客户端,用来解决高并发请求的情况。比如现在我要模拟一个高并发请求来测试我的服务器负载情况。所以需要在python里模拟高并发。高并发可以有多种方式,比如多线程,但是由于python本质上是没有多线程的,通过底层线程锁实现的多线程。在模型高并发时,具有线程切换和线程开销的损耗。所以我们就可以使用多协成来实现高并发。
我们就可以使用aiohttp来模拟高并发客户端。demo如下,用来模拟多个客户端向指定服务器post图片。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
# 异步并发客户端 class Asyncio_Client( object ): def __init__( self ): self .loop = asyncio.get_event_loop() self .tasks = [] # 将异步函数介入任务列表。后续参数直接传给异步函数 def set_task( self ,task_fun,num, * args): for i in range (num): self .tasks.append(task_fun( * args)) # 运行,获取返回结果 def run( self ): back = [] try : f = asyncio.wait( self .tasks) # 创建future self .loop.run_until_complete(f) # 等待future完成 finally : pass # 服务器高并发压力测试 class Test_Load(): total_time = 0 # 总耗时 total_payload = 0 # 总负载 total_num = 0 # 总并发数 all_time = [] # 创建一个异步任务,本地测试,所以post和接收几乎不损耗时间,可以等待完成,主要耗时为算法模块 async def task_func1( self ,session): begin = time.time() # print('开始发送:', begin) file = open ( self .image, 'rb' ) fsize = os.path.getsize( self .image) self .total_payload + = fsize / ( 1024 * 1024 ) data = { "image_id" : "2" , 'image' : file } r = await session.post( self .url,data = data) #只post,不接收 result = await r.json() self .total_num + = 1 # print(result) end = time.time() # print('接收完成:', end,',index=',self.total_num) self .all_time.append(end - begin) # 负载测试 def test_safety( self ): print ( 'test begin' ) async_client = Asyncio_Client() # 创建客户端 session = aiohttp.ClientSession() for i in range ( 10 ): # 执行10次 self .all_time = [] self .total_num = 0 self .total_payload = 0 self .image = 'xxxx.jpg' # 设置测试nayizhang print ( '测试图片:' , self .image) begin = time.time() async_client.set_task( self .task_func1, self .num,session) # 设置并发任务 async_client.run() # 执行任务 end = time.time() self .all_time.sort(reverse = True ) print ( self .all_time) print ( '并发数量(个):' , self .total_num) print ( '总耗时(s):' ,end - begin) print ( '最大时延(s):' , self .all_time[ 0 ]) print ( '最小时延(s):' , self .all_time[ len ( self .all_time) - 1 ]) print ( 'top-90%时延(s):' , self .all_time[ int ( len ( self .all_time) * 0.1 )]) print ( '平均耗时(s/个):' , sum ( self .all_time) / self .total_num) print ( '支持并发率(个/s):' , self .total_num / (end - begin)) print ( '总负载(MB):' , self .total_payload) print ( '吞吐率(MB/S):' , self .total_payload / (end - begin)) # 吞吐率受上行下行带宽,服务器带宽,服务器算法性能诸多影响 time.sleep( 3 ) session.close() print ( 'test finish' ) |
aiohttp服务器mvc(静态网页,模板,数据库,log)
aiohttp之添加静态资源路径
所谓静态资源,是指图片、js、css等文件。
以一个小项目来说明,下面是项目的目录结构:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
. ├── static │ ├── css │ │ ├── base.css │ │ ├── bootstrap.min.css │ │ └── font-awesome.min.css │ ├── font │ │ ├── FontAwesome.otf │ │ ├── fontawesome-webfont.eot │ │ ├── fontawesome-webfont.svg │ │ ├── fontawesome-webfont.ttf │ │ └── fontawesome-webfont.woff │ └── index.html └── proxy_server.py |
在proxy_server.py给2个静态文件目录static/css和static/font添加路由:
1
2
3
4
5
6
|
app.router.add_static( '/css/' , path= 'static/css' , name= 'css' ) app.router.add_static( '/font/' , path= 'static/font' , name= 'font' ) |
必需的2个参数:
prefix:是静态文件的url的前缀,以/开始,在浏览器地址栏上显示在网站host之后,也用于index.html静态页面进行引用
path:静态文件目录的路径,可以是相对路径,上面代码使用的static/css就是相对路径——相对于proxy_server.py所在路径。
加载的是index.html,下面是它引用静态资源的代码:
1
2
3
4
5
6
|
<!-- Bootstrap CSS --> < link href = "css/bootstrap.min.css" rel = "external nofollow" rel = "stylesheet" > <!-- Base CSS --> < link href = "css/base.css" rel = "external nofollow" rel = "stylesheet" > <!-- FA CSS --> < link href = "css/font-awesome.min.css" rel = "external nofollow" rel = "stylesheet" > |
添加font的路径是因为/font-awesome.min.css需要使用:
如果修改前缀:
1
2
3
|
app.router.add_static( '/css2017/' , path= 'static/css' , name= 'css' ) |
虽然目录本身还是css,但通过add_static已经将它视为了css2017,在文件和浏览器中要想链接到css下的文件,必须使用css2017/xx.css来链接。
此外,如果加上show_index=True,就可以显示静态资源的目录索引了——默认是禁止访问的:
1
2
3
4
|
app.router.add_static( '/css2017/' , path= 'static/css' , name= 'css' , show_index=True) |
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对服务器之家的支持。如果你想了解更多相关内容请查看下面相关链接
原文链接:https://blog.csdn.net/luanpeng825485697/article/details/81461277