前言
最近同学让我帮忙写一个测试网络的工具。由于工作上的事情,断断续续地拖了很久才给出一个相对完整的版本。其实,我Python用的比较少,所以基本都是边查资料边写程序。
程序的主要逻辑如下:
读取一个excel文件中的ip列表,然后使用多线程调用ping统计每个ip的网络参数,最后把结果输出到excel文件中。
代码如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
|
#! /usr/bin/env python # -*- coding: UTF-8 -*- # File: pingtest_test.py # Date: 2008-09-28 # Author: Michael Field # Modified By:intheworld # Date: 2017-4-17 import sys import os import getopt import commands import subprocess import re import time import threading import xlrd import xlwt TEST = [ '220.181.57.217' , '166.111.8.28' , '202.114.0.242' , '202.117.0.20' , '202.112.26.34' , '202.203.128.33' , '202.115.64.33' , '202.201.48.2' , '202.114.0.242' , '202.116.160.33' , '202.202.128.33' , ] RESULT = {} def usage(): print "USEAGE:" print "\t%s -n TEST|excel name [-t times of ping] [-c concurrent number(thread nums)]" % sys.argv[ 0 ] print "\t TEST为简单测试的IP列表" print "\t-t times 测试次数;默认为1000;" print "\t-c concurrent number 并行线程数目:默认为10" print "\t-h|-?, 帮助信息" print "\t 输出为当前目录文件ping_result.txt 和 ping_result.xls" print "for example:" print "\t./ping_test.py -n TEST -t 1 -c 10" def div_list(ls,n): if not isinstance (ls, list ) or not isinstance (n, int ): return [] ls_len = len (ls) print 'ls length = %s' % ls_len if n< = 0 or 0 = = ls_len: return [] if n > ls_len: return [] elif n = = ls_len: return [[i] for i in ls] else : j = ls_len / n k = ls_len % n ### j,j,j,...(前面有n-1个j),j+k #步长j,次数n-1 ls_return = [] for i in xrange ( 0 ,(n - 1 ) * j,j): ls_return.append(ls[i:i + j]) #算上末尾的j+k ls_return.append(ls[(n - 1 ) * j:]) return ls_return def pin(IP): try : xpin = subprocess.check_output( "ping -n 1 -w 100 %s" % IP, shell = True ) except Exception: xpin = 'empty' ms = '=[0-9]+ms' .decode( "utf8" ) print "%s" % ms print "%s" % xpin mstime = re.search(ms,xpin) if not mstime: MS = 'timeout' return MS else : MS = mstime.group().split( '=' )[ 1 ] return MS.strip( 'ms' ) def count(total_count,I): global RESULT nowsecond = int (time.time()) nums = 0 oknums = 0 timeout = 0 lostpacket = 0.0 total_ms = 0.0 avgms = 0.0 maxms = - 1 while nums < total_count: nums + = 1 MS = pin(I) print 'pin output = %s' % MS if MS = = 'timeout' : timeout + = 1 lostpacket = timeout * 100.0 / nums else : oknums + = 1 total_ms = total_ms + float (MS) if oknums = = 0 : oknums = 1 maxms = float (MS) avgms = total_ms / oknums else : avgms = total_ms / oknums maxms = max (maxms, float (MS)) RESULT[I] = (I, avgms, maxms, lostpacket) def thread_func(t, ipList): if not isinstance (ipList, list ): return else : for ip in ipList: count(t, ip) def readIpsInFile(excelName): data = xlrd.open_workbook(excelName) table = data.sheets()[ 0 ] nrows = table.nrows print 'nrows %s' % nrows ips = [] for i in range (nrows): ips.append(table.cell_value(i, 0 )) print table.cell_value(i, 0 ) return ips if __name__ = = '__main__' : file = 'ping_result.txt' times = 10 network = '' thread_num = 10 args = sys.argv[ 1 :] try : (opts, getopts) = getopt.getopt(args, 'n:t:c:h?' ) except : print "\nInvalid command line option detected." usage() sys.exit( 1 ) for opt, arg in opts: if opt in ( '-n' ): network = arg if opt in ( '-h' , '-?' ): usage() sys.exit( 0 ) if opt in ( '-t' ): times = int (arg) if opt in ( '-c' ): thread_num = int (arg) f = open ( file , 'w' ) workbook = xlwt.Workbook() sheet1 = workbook.add_sheet( "sheet1" , cell_overwrite_ok = True ) if not isinstance (times, int ): usage() sys.exit( 0 ) if network not in [ 'TEST' ] and not os.path.exists(os.path.join(os.path.dirname(__file__), network)): print "The network is wrong or excel file does not exist. please check it." usage() sys.exit( 0 ) else : if network = = 'TEST' : ips = TEST else : ips = readIpsInFile(network) print 'Starting...' threads = [] nest_list = div_list(ips, thread_num) loops = range ( len (nest_list)) print 'Total %s Threads is working...' % len (nest_list) for ipList in nest_list: t = threading.Thread(target = thread_func,args = (times,ipList)) threads.append(t) for i in loops: threads[i].start() for i in loops: threads[i].join() it = 0 for line in RESULT: value = RESULT[line] sheet1.write(it, 0 , line) sheet1.write(it, 1 , str ( '%.2f' % value[ 1 ])) sheet1.write(it, 2 , str ( '%.2f' % value[ 2 ])) sheet1.write(it, 3 , str ( '%.2f' % value[ 3 ])) it + = 1 f.write(line + '\t' + str ( '%.2f' % value[ 1 ]) + '\t' + str ( '%.2f' % value[ 2 ]) + '\t' + str ( '%.2f' % value[ 3 ]) + '\n' ) f.close() workbook.save( 'ping_result.xls' ) print 'Work Done. please check result %s and ping_result.xls.' % file |
这段代码参照了别人的实现,虽然不是特别复杂,这里还是简单解释一下。
- excel读写使用了xlrd和xlwt,基本都是使用了一些简单的api。
- 使用了threading实现多线程并发,和POSIX标准接口非常相似。thread_func是线程的处理函数,它的输入包含了一个ip的List,所以在函数内部通过循环处理各个ip。
- 此外,Python的commands在Windows下并不兼容,所以使用了subprocess模块。
到目前为止,我对Python里面字符集的理解还不到位,所以正则表达式匹配的代码并不够强壮,不过目前勉强可以工作,以后有必要再改咯!
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流,谢谢大家对服务器之家的支持。
原文链接:http://intheworld.win/2017/04/27/使用python进行网络测试/