1、为什么要写代码实现接口自动化
大家知道很多接口测试工具可以实现对接口的测试,如postman、jmeter、fiddler等等,而且使用方便,那么为什么还要写代码实现接口自动化呢?工具虽然方便,但也不足之处:
测试数据不可控制
接口测试本质是对数据的测试,调用接口,输入一些数据,随后,接口返回一些数据。验证接口返回数据的正确性。在用工具运行测试用例之前不得不手动向数据库中插入测试数据。这样我们的接口测试是不是就没有那么“自动化了”。
无法测试加密接口
这是接口测试工具的一大硬伤,如我们前面开发的接口用工具测试完全没有问题,但遇到需要对接口参 数进行加密/解密的接口,例如 md5、base64、aes 等常见加密方式。本书第十一章会对加密接口进行介绍。 又或者接口的参数需要使用时间戳,也是工具很难模拟的。
扩展能力不足
当我们在享受工具所带来的便利的同时,往往也会受制于工具所带来的局限。例如,我想将测试结果生 成 hmtl 格式测试报告,我想将测试报告发送到指定邮箱。我想对接口测试做定时任务。我想对接口测试做持续集成。这些需求都是工具难以实现的。
2、接口自动化测试设计
接口测试调用过程可以用下图概括,增加了测试数据库
一般的 接口工具 测试过程:
1、接口工具调用被测系统的接口(传参 username="zhangsan")。
2、系统接口根据传参(username="zhangsan")向 正式数据库 中查询数据。
3、将查询结果组装成一定格式的数据,并返回给被调用者。
4、人工或通过工具的断言功能检查接口测试的正确性。
接口自动化测试项目,为了使接口测试对数据变得可控,测试过程如下:
1、接口测试项目先向 测试数据库 中插入测试数据(zhangsan 的个人信息)。
2、调用被测系统接口(传参 username="zhangsan")。
3、系统接口根据传参(username="zhangsan")向测试数据库中进行查询并得到 zhangsan 个人信息。
4、将查询结果组装成一定格式的数据,并返回给被调用者。
5、通过单元测试框架断言接口返回的数据(zhangsan 的个人信息),并生成测试报告。
为了使正式数据库的数据不被污染,建议使用独立的 测试数据库 。
2、requests库
requests 使用的是 urllib3,因此继承了它的所有特性。requests 支持 http 连接保持和连接池 ,支持 使用cookie保持会话 ,支持 文件上传 ,支持 自动确定响应内容的编码。 对request库的更详细的介绍可以看我之前接口测试基础的文章:
http://www.zzvips.com/article/120675.html
http://www.zzvips.com/article/106355.html
3、接口测试代码示例
下面以之前用 python+django 开发的用户签到系统为背景,展示接口测试的代码。
为什么开发接口?开发的接口主要给谁来用?
前端和后端分离是近年来 web 应用开发的一个发展趋势。这种模式将带来以下优势:
1、后端可以不用必须精通前端技术(html/javascript/css),只专注于数据的处理,对外提供 api 接口。
2、前端的专业性越来越高,通过 api 接口获取数据,从而专注于页面的设计。
3、前后端分离增加接口的应用范围,开发的接口可以应用到 web 页面上,也可以应用到移动 app 上。
在这种开发模式下,接口测试工作就会变得尤为重要了。
开发实现的接口代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
# 添加发布会接口实现 def add_event(request): eid = request.post.get( 'eid' ,'') # 发布会id name = request.post.get( 'name' ,'') # 发布会标题 limit = request.post.get( 'limit' ,'') # 限制人数 status = request.post.get( 'status' ,'') # 状态 address = request.post.get( 'address' ,'') # 地址 start_time = request.post.get( 'start_time' ,'') # 发布会时间 if eid = = ' ' or name == ' ' or limit == ' ' or address == ' ' or start_time == ' ': return jsonresponse({ 'status' : 10021 , 'message' : 'parameter error' }) result = event.objects. filter ( id = eid) if result: return jsonresponse({ 'status' : 10022 , 'message' : 'event id already exists' }) result = event.objects. filter (name = name) if result: return jsonresponse({ 'status' : 10023 , 'message' : 'event name already exists' }) if status = = '': status = 1 try : event.objects.create( id = eid,name = name,limit = limit,address = address,status = int (status),start_time = start_time) except validationerror: error = 'start_time format error. it must be in yyyy-mm-dd hh:mm:ss format.' return jsonresponse({ 'status' : 10024 , 'message' :error}) return jsonresponse({ 'status' : 200 , 'message' : 'add event success' }) |
通过post请求接收发布会参数:发布会id、标题、人数、状态、地址和时间等参数。
首先,判断eid、name、limit、address、start_time等字段均不能为空,否则jsonresponse()返回相应的状态码和提示。jsonresponse()是一个非常有用的方法,它可以直接将字典转化成json格式返回到客户端。
接下来,判断发布会id是否存在,以及发布会名称(name)是否存在;如果存在将返回相应的状态码和 提示信息。
再接下来,判断发布会状态是否为空,如果为空,将状态设置为1(true)。
最后,将数据插入到 event 表,在插入的过程中如果日期格式错误,将抛出 validationerror 异常,接收 该异常并返回相应的状态和提示,否则,插入成功,返回状态码200和“add event success”的提示。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
# 发布会查询接口实现 def get_event_list(request): eid = request.get.get( "eid" , "") # 发布会id name = request.get.get( "name" , "") # 发布会名称 if eid = = ' ' and name == ' ': return jsonresponse({ 'status' : 10021 , 'message' : 'parameter error' }) if eid ! = '': event = {} try : result = event.objects.get( id = eid) except objectdoesnotexist: return jsonresponse({ 'status' : 10022 , 'message' : 'query result is empty' }) else : event[ 'eid' ] = result. id event[ 'name' ] = result.name event[ 'limit' ] = result.limit event[ 'status' ] = result.status event[ 'address' ] = result.address event[ 'start_time' ] = result.start_time return jsonresponse({ 'status' : 200 , 'message' : 'success' , 'data' :event}) if name ! = '': datas = [] results = event.objects. filter (name__contains = name) if results: for r in results: event = {} event[ 'eid' ] = r. id event[ 'name' ] = r.name event[ 'limit' ] = r.limit event[ 'status' ] = r.status event[ 'address' ] = r.address event[ 'start_time' ] = r.start_time datas.append(event) return jsonresponse({ 'status' : 200 , 'message' : 'success' , 'data' :datas}) else : return jsonresponse({ 'status' : 10022 , 'message' : 'query result is empty' }) |
通过get请求接收发布会id和name 参数。两个参数都是可选的。首先,判断当两个参数同时为空,接口返回状态码10021,参数错误。
如果发布会id不为空,优先通过id查询,因为id的唯一性,所以,查询结果只会有一条,将查询结果 以 key:value 对的方式存放到定义的event字典中,并将数据字典作为整个返回字典中data对应的值返回。
name查询为模糊查询,查询数据可能会有多条,返回的数据稍显复杂;首先将查询的每一条数据放到一 个字典event中,再把每一个字典再放到数组datas中,最后再将整个数组做为返回字典中data对应的值返回。
接口测试代码示例
1
2
3
4
5
6
7
8
9
10
11
12
|
#查询发布会接口测试代码 import requests url = "http://127.0.0.1:8000/api/get_event_list/" r = requests.get(url, params = { 'eid' : '1' }) result = r.json() print (result) assert result[ 'status' ] = = 200 assert result[ 'message' ] = = "success" assert result[ 'data' ][ 'name' ] = = "xx 产品发布会" assert result[ 'data' ][ 'address' ] = = "北京林匹克公园水立方" assert result[ 'data' ][ 'start_time' ] = = "2016-10-15t18:00:00" |
因为“发布会查询接口”是get类型,所以,通过requests库的get()方法调用,第一个参数为调用接口的url地址,params设置接口的参数,参数以字典形式组织。
json()方法可以将接口返回的json格式的数据转化为字典。
接下来就是通过 assert 语句对接字典中的数据进行断言。分别断言status、message 和data的相关数据等。
使用unittest单元测试框架开发接口测试用例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
#发布会查询接口测试代码 import unittest import requests class geteventlisttest(unittest.testcase): def setup( self ): self .base_url = "http://127.0.0.1:8000/api/get_event_list/" def test_get_event_list_eid_null( self ): ''' eid 参数为空 ''' r = requests.get( self .base_url, params = { 'eid' :''}) result = r.json() self .assertequal(result[ 'status' ], 10021 ) self .assertequal(result[ 'message' ], 'parameter error' ) def test_get_event_list_eid_error( self ): ''' eid=901 查询结果为空 ''' r = requests.get( self .base_url, params = { 'eid' : 901 }) result = r.json() self .assertequal(result[ 'status' ], 10022 ) self .assertequal(result[ 'message' ], 'query result is empty' ) def test_get_event_list_eid_success( self ): ''' 根据 eid 查询结果成功 ''' r = requests.get( self .base_url, params = { 'eid' : 1 }) result = r.json() self .assertequal(result[ 'status' ], 200 ) self .assertequal(result[ 'message' ], 'success' ) self .assertequal(result[ 'data' ][ 'name' ],u 'mx6发布会' ) self .assertequal(result[ 'data' ][ 'address' ],u '北京国家会议中心' ) def test_get_event_list_nam_result_null( self ): ''' 关键字‘abc'查询 ''' r = requests.get( self .base_url, params = { 'name' : 'abc' }) result = r.json() self .assertequal(result[ 'status' ], 10022 ) self .assertequal(result[ 'message' ], 'query result is empty' ) def test_get_event_list_name_find( self ): ''' 关键字‘发布会'模糊查询 ''' r = requests.get( self .base_url, params = { 'name' : '发布会' }) result = r.json() self .assertequal(result[ 'status' ], 200 ) self .assertequal(result[ 'message' ], 'success' ) self .assertequal(result[ 'data' ][ 0 ][ 'name' ],u 'mx6发布会' ) self .assertequal(result[ 'data' ][ 0 ][ 'address' ],u '北京国家会议中心' ) 49if __name__ = = '__main__' : unittest.main() |
unittest单元测试框架可以帮助 组织和运行接口测试用例。
4、接口自动化测试框架实现
关于接口自动化测试,unittest 已经帮我们做了大部分工作,接下来只需要 集成数据库操作 ,以及 htmltestrunner测试报告生成 扩展即可。
框架结构如下图:
pyrequests 框架:
db_fixture/: 初始化接口测试数据。
interface/: 用于编写接口自动化测试用例。
report/: 生成接口自动化测试报告。
db_config.ini : 数据库配置文件。
htmltestrunner.py unittest 单元测试框架扩展,生成 html 格式的测试报告。
run_tests.py : 执行所有接口测试用例。
4.1、数据库配置
首先,需要修改被测系统将数据库指向测试数据库。以 mysql数据库为例,针对 django 项目而言,修改.../guest/settings.py 文件。可以在系统测试环境单独创建一个测试库。 这样做的目的是让接口测试的数据不会清空或污染到功能测试库的数据。 其他框架开发的项目与django项目类似,这个工作一般由开发同学完成,我们测试同学更多关注的是测试框架的代码。
4.2、框架代码实现
4.2.1、首先,创 建数据库配置文件.../db_config.ini
4.2.2、接下来, 简单封装数据库操作,数据库表数据的插入和清除 ,.../db_fixture/ mysql_db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
import pymysql.cursors import os import configparser as cparser # ======== reading db_config.ini setting =========== base_dir = str (os.path.dirname(os.path.dirname(__file__))) base_dir = base_dir.replace( '\\', ' / ') file_path = base_dir + "/db_config.ini" cf = cparser.configparser() cf.read(file_path) host = cf.get( "mysqlconf" , "host" ) port = cf.get( "mysqlconf" , "port" ) db = cf.get( "mysqlconf" , "db_name" ) user = cf.get( "mysqlconf" , "user" ) password = cf.get( "mysqlconf" , "password" ) # ======== mysql base operating =================== class db: def __init__( self ): try : # connect to the database self .connection = pymysql.connect(host = host, port = int (port), user = user, password = password, db = db, charset = 'utf8mb4' , cursorclass = pymysql.cursors.dictcursor) except pymysql.err.operationalerror as e: print ( "mysql error %d: %s" % (e.args[ 0 ], e.args[ 1 ])) # clear table data def clear( self , table_name): # real_sql = "truncate table " + table_name + ";" real_sql = "delete from " + table_name + ";" with self .connection.cursor() as cursor: cursor.execute( "set foreign_key_checks=0;" ) cursor.execute(real_sql) self .connection.commit() # insert sql statement def insert( self , table_name, table_data): for key in table_data: table_data[key] = "'"+str(table_data[key])+"'" key = ',' .join(table_data.keys()) value = ',' .join(table_data.values()) real_sql = "insert into " + table_name + " (" + key + ") values (" + value + ")" #print(real_sql) with self .connection.cursor() as cursor: cursor.execute(real_sql) self .connection.commit() # close database def close( self ): self .connection.close() # init data def init_data( self , datas): for table, data in datas.items(): self .clear(table) for d in data: self .insert(table, d) self .close() if __name__ = = '__main__' : db = db() table_name = "sign_event" data = { 'id' : 1 , 'name' : '红米' , '`limit`' : 2000 , 'status' : 1 , 'address' : '北京会展中心' , 'start_time' : '2016-08-20 00:25:42' } table_name2 = "sign_guest" data2 = { 'realname' : 'alen' , 'phone' : 12312341234 , 'email' : 'alen@mail.com' , 'sign' : 0 , 'event_id' : 1 } db.clear(table_name) db.insert(table_name, data) db.close() |
首先,读取 db_config.ini 配置文件。 创建 db 类,__init__()方法初始化,通过 pymysql.connect()连接数据库。
因为这里只用到数据库表的清除和插入,所以只创建 clear()和 insert()两个方法。其中,insert()方法对数 据的插入做了简单的格式转化,可将字典转化成 sql 插入语句,这样格式转化了方便了数据库表数据的创建。
最后,通过 close()方法用于关闭数据库连接。
4.2.3、接下来接下来 创建测试数据 ,.../db_fixture/ test_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
import sys sys.path.append( '../db_fixture' ) try : from mysql_db import db except importerror: from .mysql_db import db # create data datas = { 'sign_event' :[ { 'id' : 1 , 'name' : '红米pro发布会' , '`limit`' : 2000 , 'status' : 1 , 'address' : '北京会展中心' , 'start_time' : '2017-08-20 14:00:00' }, { 'id' : 2 , 'name' : '可参加人数为0' , '`limit`' : 0 , 'status' : 1 , 'address' : '北京会展中心' , 'start_time' : '2017-08-20 14:00:00' }, { 'id' : 3 , 'name' : '当前状态为0关闭' , '`limit`' : 2000 , 'status' : 0 , 'address' : '北京会展中心' , 'start_time' : '2017-08-20 14:00:00' }, { 'id' : 4 , 'name' : '发布会已结束' , '`limit`' : 2000 , 'status' : 1 , 'address' : '北京会展中心' , 'start_time' : '2001-08-20 14:00:00' }, { 'id' : 5 , 'name' : '小米5发布会' , '`limit`' : 2000 , 'status' : 1 , 'address' : '北京国家会议中心' , 'start_time' : '2017-08-20 14:00:00' }, ], 'sign_guest' :[ { 'id' : 1 , 'realname' : 'alen' , 'phone' : 13511001100 , 'email' : 'alen@mail.com' , 'sign' : 0 , 'event_id' : 1 }, { 'id' : 2 , 'realname' : 'has sign' , 'phone' : 13511001101 , 'email' : 'sign@mail.com' , 'sign' : 1 , 'event_id' : 1 }, { 'id' : 3 , 'realname' : 'tom' , 'phone' : 13511001102 , 'email' : 'tom@mail.com' , 'sign' : 0 , 'event_id' : 5 }, ], } # inster table datas def init_data(): db().init_data(datas) if __name__ = = '__main__' : init_data() |
init_data()函数用于读取 datas 字典中的数据,调用 db 类中的 clear()方法清除数据库,然后,调用 insert() 方法插入表数据。
4.2.4、编写 接口测试用例 。创建添加发布会接口测试文件.../interface/ add_event_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
import unittest import requests import os, sys parentdir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert( 0 , parentdir) from db_fixture import test_data class addeventtest(unittest.testcase): ''' 添加发布会 ''' def setup( self ): self .base_url = "http://127.0.0.1:8000/api/add_event/" def teardown( self ): print ( self .result) def test_add_event_all_null( self ): ''' 所有参数为空 ''' payload = { 'eid' :' ',' ':' ',' limit ':' ',' address ':"",' start_time ':' '} r = requests.post( self .base_url, data = payload) self .result = r.json() self .assertequal( self .result[ 'status' ], 10021 ) self .assertequal( self .result[ 'message' ], 'parameter error' ) def test_add_event_eid_exist( self ): ''' id已经存在 ''' payload = { 'eid' : 1 , 'name' : '一加4发布会' , 'limit' : 2000 , 'address' : "深圳宝体" , 'start_time' : '2017' } r = requests.post( self .base_url, data = payload) self .result = r.json() self .assertequal( self .result[ 'status' ], 10022 ) self .assertequal( self .result[ 'message' ], 'event id already exists' ) def test_add_event_name_exist( self ): ''' 名称已经存在 ''' payload = { 'eid' : 11 , 'name' : '红米pro发布会' , 'limit' : 2000 , 'address' : "深圳宝体" , 'start_time' : '2017' } r = requests.post( self .base_url,data = payload) self .result = r.json() self .assertequal( self .result[ 'status' ], 10023 ) self .assertequal( self .result[ 'message' ], 'event name already exists' ) def test_add_event_data_type_error( self ): ''' 日期格式错误 ''' payload = { 'eid' : 11 , 'name' : '一加4手机发布会' , 'limit' : 2000 , 'address' : "深圳宝体" , 'start_time' : '2017' } r = requests.post( self .base_url,data = payload) self .result = r.json() self .assertequal( self .result[ 'status' ], 10024 ) self .assertin( 'start_time format error.' , self .result[ 'message' ]) def test_add_event_success( self ): ''' 添加成功 ''' payload = { 'eid' : 11 , 'name' : '一加4手机发布会' , 'limit' : 2000 , 'address' : "深圳宝体" , 'start_time' : '2017-05-10 12:00:00' } r = requests.post( self .base_url,data = payload) self .result = r.json() self .assertequal( self .result[ 'status' ], 200 ) self .assertequal( self .result[ 'message' ], 'add event success' ) if __name__ = = '__main__' : test_data.init_data() # 初始化接口测试数据 unittest.main() |
在测试接口之前,调用test_data.py文件中的init_data()方法初始化数据库中的测试数据。
创建addeventtest测试类继承 unittest.testcase 类,通过创建测试用例,调用相关接口,并验证接口返回 的数据。
4.2.5、创建 run_tests.py 文件
当开发的接口达到一定数量后,就需要考虑 分文件分目录 的来 划分 接口测试用例,如何批量的执行不同文件目录下的用例呢?unittest单元测试框架提供的 discover() 方法可以帮助我们做到这一点。并使用 htmltestrunner 扩展生成 html 格式的测试报告。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
import time, sys sys.path.append( './interface' ) sys.path.append( './db_fixture' ) from htmltestrunner import htmltestrunner import unittest from db_fixture import test_data # 指定测试用例为当前文件夹下的 interface 目录 test_dir = './interface' discover = unittest.defaulttestloader.discover(test_dir, pattern = '*_test.py' ) if __name__ = = "__main__" : test_data.init_data() # 初始化接口测试数据 now = time.strftime( "%y-%m-%d %h_%m_%s" ) filename = './report/' + now + '_result.html' fp = open (filename, 'wb' ) runner = htmltestrunner(stream = fp, title = 'guest manage system interface test report' , description = 'implementation example with: ' ) runner.run(discover) fp.close() |
首先,通过调用test_data.py文件中的init_data()函数来初始化接口测试数据。
使用unittest框架所提供的discover()方法,查找 interface/ 目录下,所有匹配*_test.py 的测试文件(*星 号匹配任意字符)。
htmltestrunner 为unittest单元测试框架的扩展,利用它所提供的htmltestrunner()类来替换unittest单元测试框架的texttestrunner()类,从而生成html格式的测试报告。
遗憾的是htmltestrunner并不支持python3.x,大家可以在网上找到适用于python3.x的htmltestrunner.py文件,使用在自己的接口自动化工程中。
通过 time 的 strftime()方法获取当前时间,并且转化成一定的时间格式。作为测试报告的名称。这样做目的是是为了避免因为生成的报告的名称重名而造成报告的覆盖。最终,将测试报告存放于report/目录下面。如下图,一张完整的接口自动化测试报告。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://www.cnblogs.com/ailiailan/p/8535293.html