Script sharing for network testing using Python
- 2020-06-01 10:17:08
- OfStack
preface
Recently, a classmate asked me to help write a tool for testing the network. Because of work commitments, it took me a long time to produce a reasonably complete version. To be honest, I rarely use Python, so I basically wrote the program while looking things up.
The main logic of the program is as follows:
It reads a list of IP addresses from an Excel file, pings each address from multiple threads to collect its network statistics (average latency, maximum latency and packet-loss rate), and finally writes the results to an Excel file.
The code is as follows:
#! /usr/bin/env python
# -*- coding: UTF-8 -*-
# File: pingtest_test.py
# Date: 2008-09-28
# Author: Michael Field
# Modified By : intheworld
# Date: 2017-4-17
import sys
import os
import getopt
import commands
import subprocess
import re
import time
import threading
import xlrd
import xlwt
# Built-in sample target list; it is selected when the script is run with
# "-n TEST" instead of an Excel file name.
# NOTE(review): '202.114.0.242' appears twice below. It is therefore pinged
# twice, but RESULT is keyed by IP so only one entry is reported for it --
# confirm whether the duplicate is intended.
TEST = [
'220.181.57.217',
'166.111.8.28',
'202.114.0.242',
'202.117.0.20',
'202.112.26.34',
'202.203.128.33',
'202.115.64.33',
'202.201.48.2',
'202.114.0.242',
'202.116.160.33',
'202.202.128.33',
]
# Shared result table filled in by count(): maps each IP to the tuple
# (ip, average ms, max ms, packet-loss percent).
RESULT={}
def usage():
    """Print the command-line help for this script to standard output.

    The script name shown in the synopsis is taken from ``sys.argv[0]``.
    """
    # Single-argument print(...) calls produce the same output on Python 2
    # and Python 3.
    print("USAGE:")
    print("\t%s -n TEST|excel name [-t times of ping] [-c concurrent number(thread nums)]" % sys.argv[0])
    print("\t TEST For simple testing IP The list of ")
    # The advertised default has to match the value the script really uses
    # when -t is omitted (times = 10), not 1000.
    print("\t-t times Test times ; The default is 10;")
    print("\t-c concurrent number Number of parallel threads : The default is 10")
    print("\t-h|-?, Help information ")
    print("\t The output is the current directory file ping_result.txt and ping_result.xls")
    print("for example:")
    print("\t./ping_test.py -n TEST -t 1 -c 10")
def div_list(ls, n):
    """Split ``ls`` into consecutive chunks, one per worker.

    The first ``n - 1`` chunks each hold ``len(ls) // n`` items and the last
    chunk additionally takes the remainder. When ``n`` is larger than the
    number of items it is reduced to ``len(ls)``, so every item still lands
    in a chunk of its own instead of the whole list being dropped.

    Args:
        ls: The list to split.
        n: The requested number of chunks.

    Returns:
        A list of lists. An empty list is returned when ``ls`` is not a
        list, ``n`` is not an int, ``n <= 0`` or ``ls`` is empty.
    """
    if not isinstance(ls, list) or not isinstance(n, int):
        return []
    ls_len = len(ls)
    # Diagnostic output kept from the original implementation.
    print('ls length = %s' % ls_len)
    if n <= 0 or ls_len == 0:
        return []
    # Never ask for more chunks than there are items; otherwise a short
    # list combined with a large thread count would produce no work at all.
    n = min(n, ls_len)
    # Floor division keeps the chunk size an int on both Python 2 and 3.
    size = ls_len // n
    chunks = [ls[start:start + size] for start in range(0, (n - 1) * size, size)]
    # The final chunk absorbs the remainder (size + ls_len % n items).
    chunks.append(ls[(n - 1) * size:])
    return chunks
# Round-trip time as printed by Windows ping: "time=12ms", or "time<1ms" for
# sub-millisecond replies. Only the digits are captured.
_PING_MS_RE = re.compile(r'[=<]([0-9]+)ms')


def pin(IP):
    """Send one ping to ``IP`` and return its round-trip time.

    The command uses the Windows ping flags ``-n 1`` (one echo request) and
    ``-w 100`` (100 ms reply timeout).

    Args:
        IP: The host address to ping.

    Returns:
        The round-trip time in milliseconds as a string of digits, or the
        string ``'timeout'`` when ping fails, cannot be started, or prints
        no recognizable time. A reply reported as ``<1ms`` is returned as
        ``'1'`` rather than being treated as a timeout.
    """
    try:
        # An argument list without a shell keeps a crafted address taken
        # from the spreadsheet from being interpreted as shell syntax.
        raw = subprocess.check_output(
            ['ping', '-n', '1', '-w', '100', '%s' % (IP,)])
    except (subprocess.CalledProcessError, OSError):
        # Non-zero exit status (no reply) or ping could not be executed.
        return 'timeout'
    if isinstance(raw, bytes):
        # The output may be in a local code page; the digits and "ms" are
        # ASCII, so undecodable bytes can safely be replaced.
        raw = raw.decode('utf-8', 'replace')
    found = _PING_MS_RE.search(raw)
    if not found:
        return 'timeout'
    return found.group(1)
def count(total_count, I):
    """Ping ``I`` ``total_count`` times and record summary statistics.

    The summary is stored in the module-level ``RESULT`` dict under key
    ``I`` as the tuple ``(I, average ms, max ms, packet-loss percent)``.
    The average and maximum only consider pings that were answered; when
    none was answered the average is ``0.0`` and the maximum is ``-1``.
    The loss percentage is computed once from the final totals, so it stays
    correct when timeouts are followed by successful replies.

    Args:
        total_count: Number of pings to send.
        I: The address to ping.
    """
    sent = 0
    answered = 0
    timeouts = 0
    total_ms = 0.0
    maxms = -1
    while sent < total_count:
        sent += 1
        MS = pin(I)
        print('pin output = %s' % MS)
        if MS == 'timeout':
            timeouts += 1
            continue
        elapsed = float(MS)
        answered += 1
        total_ms += elapsed
        maxms = max(maxms, elapsed)
    avgms = total_ms / answered if answered else 0.0
    # Derive the loss from the totals after the loop; updating it only when
    # a timeout happens would leave a stale (too high) percentage behind.
    lostpacket = timeouts * 100.0 / sent if sent else 0.0
    RESULT[I] = (I, avgms, maxms, lostpacket)
def thread_func(t, ipList):
    """Thread worker: run ``count`` for every address in ``ipList``.

    Args:
        t: Number of pings to send to each address.
        ipList: List of addresses handled by this thread. Anything that is
            not a list is ignored and the function returns immediately.
    """
    if isinstance(ipList, list):
        for address in ipList:
            count(t, address)
def readIpsInFile(excelName):
    """Return the first-column values of the first sheet of a workbook.

    The row count and every value read are echoed to standard output.

    Args:
        excelName: Path of the Excel file to open with ``xlrd``.

    Returns:
        A list with the value of column 0 for each row, in row order.
    """
    first_sheet = xlrd.open_workbook(excelName).sheets()[0]
    row_total = first_sheet.nrows
    print('nrows %s' % row_total)
    ips = []
    for row in range(row_total):
        cell = first_sheet.cell_value(row, 0)
        ips.append(cell)
        print(cell)
    return ips
def main(argv):
    """Parse the options, ping every target concurrently, write the reports.

    Options: ``-n`` selects the built-in ``TEST`` list or names an Excel
    file, ``-t`` sets the pings per address (default 10), ``-c`` sets the
    number of threads (default 10), ``-h``/``-?`` prints the help.

    The results collected in ``RESULT`` are written to ``ping_result.txt``
    (tab separated) and ``ping_result.xls`` in the current directory.

    Args:
        argv: Command-line arguments without the program name.

    Returns:
        The process exit status: 0 on success or after printing the help,
        1 on invalid options or a missing input file.
    """
    out_txt = 'ping_result.txt'
    out_xls = 'ping_result.xls'
    times = 10
    network = ''
    thread_num = 10

    try:
        opts, _ = getopt.getopt(argv, 'n:t:c:h?')
    except getopt.GetoptError:
        print("\nInvalid command line option detected.")
        usage()
        return 1

    try:
        for opt, arg in opts:
            if opt == '-n':
                network = arg
            elif opt in ('-h', '-?'):
                usage()
                return 0
            elif opt == '-t':
                times = int(arg)
            elif opt == '-c':
                thread_num = int(arg)
    except ValueError:
        # -t / -c received something that is not an integer.
        print("\nInvalid command line option detected.")
        usage()
        return 1
    if times < 1 or thread_num < 1:
        print("\nInvalid command line option detected.")
        usage()
        return 1

    if network == 'TEST':
        ips = TEST
    else:
        # Accept the file name as given (absolute or relative to the
        # current directory) or relative to the script's directory, and
        # open exactly the path that was found to exist.
        excel_path = None
        if network:
            script_dir = os.path.dirname(os.path.abspath(__file__))
            for candidate in (network, os.path.join(script_dir, network)):
                if os.path.isfile(candidate):
                    excel_path = candidate
                    break
        if excel_path is None:
            print("The network is wrong or excel file does not exist. please check it.")
            usage()
            return 1
        ips = readIpsInFile(excel_path)

    if not ips:
        print("No IP addresses to test.")
        return 1

    print('Starting...')
    # Never request more threads than there are addresses to ping.
    nest_list = div_list(ips, min(thread_num, len(ips)))
    print('Total %s Threads is working...' % len(nest_list))
    threads = [threading.Thread(target=thread_func, args=(times, ipList))
               for ipList in nest_list]
    for worker in threads:
        worker.start()
    for worker in threads:
        worker.join()

    workbook = xlwt.Workbook()
    sheet1 = workbook.add_sheet("sheet1", cell_overwrite_ok=True)
    # Open the text report only once there is something to write, and let
    # the context manager close it on every path.
    with open(out_txt, 'w') as report:
        for row, ip in enumerate(RESULT):
            # Average ms, max ms and loss percent, two decimals each.
            fields = ['%.2f' % number for number in RESULT[ip][1:4]]
            sheet1.write(row, 0, ip)
            for col, text in enumerate(fields, 1):
                sheet1.write(row, col, text)
            report.write('%s\t%s\n' % (ip, '\t'.join(fields)))
    workbook.save(out_xls)
    print('Work Done. please check result %s and ping_result.xls.' % out_txt)
    return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))
This code builds on other people's implementations. Although it is not particularly complex, here is a brief explanation.
Excel files are read and written with xlrd and xlwt, using only their basic APIs. Multithreading is done with the threading module, whose interface is very similar to the standard POSIX thread interface. thread_func is the thread handler; its input includes a list of IPs, and each IP is processed in a loop inside the function. In addition, Python's commands module is not compatible with Windows, so the subprocess module is used instead. So far, my understanding of character sets in Python is not good enough, so the regular-expression matching code is not very robust, but it works for now. I will have to improve it later.
conclusion