本文实例为大家分享了Python threading模块对单个接口进行并发测试的具体代码,供大家参考,具体内容如下
本文知识点
通过在threading.Thread继承类中重写run()方法实现定制输出结果
代码如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
import requests import threading import sys, io # 解决console显示乱码的编码问题 sys.stdout = io.TextIOWrapper(sys.stdout. buffer , encoding = 'utf-8' ) class Mythread(threading.Thread): """This class customizes the output thu overriding the run() method""" def __init__( self , obj): super (Mythread, self ).__init__() self .obj = obj def run( self ): ret = self .obj.test_search_tags_movie() print ( 'result--%s:\n%s' % ( self .getName(), ret)) class Douban( object ): """A class containing interface test method of Douban object""" def __init__( self ): self .host = 'movie.douban.com' self .headers = { 'User-Agent' : 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:61.0) Gecko/20100101 Firefox/61.0' , 'Referer' : 'https://movie.douban.com/' , } def get_response( self , url, data): resp = requests.post(url = url, data = data, headers = self .headers).content.decode( 'utf-8' ) return resp def test_search_tags_movie( self ): method = 'search_tags' url = 'https://%s/j/%s' % ( self .host, method) post_data = { 'type' : 'movie' , 'source' : 'index' } resp = self .get_response(url = url, data = post_data) return resp if __name__ = = '__main__' : douban = Douban() thds = [] for i in range ( 9 ): thd = Mythread(douban) thd.start() thds.append(thd) for thd in thds: thd.join() |
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://blog.csdn.net/lylfv/article/details/82043487