Controlling parallel test execution on multiple devices with Python and Appium
- 2021-10-11 18:58:13
- OfStack
Foreword:
How to control multiple devices to execute test cases in parallel.
Thoughts
Let's think it through. We can already collect the capability parameters and the list of connected devices, so we can also start a separate Appium service, on its own port, for each device. Each service then corresponds to one port, and each entry in the device list corresponds to one service. With that in place, we open a process pool and drive the devices from it, with each worker process controlling a different device.
Implementation chapter
First, we build the capability parameters for each device and assign each device its own port:
def startdevicesApp():
    """Build Appium desired capabilities and a server port for each device.

    Returns:
        A ``(l_devices_list, port_list)`` tuple. ``l_devices_list`` holds one
        desired-capabilities dict per device reported by ``get_devices()``;
        ``port_list`` holds the port assigned to the device at the same
        index (the same value is stored under the ``"port"`` key of its
        dict). Both lists are empty when no device is attached.
    """
    alldevices = get_devices()
    if not alldevices:
        return [], []

    # The APK is the same for every device, so parse it once up front
    # instead of twice per device.
    app_package = get_apkname(apk_path)
    app_activity = get_apk_lautc(apk_path)

    # random.sample draws without replacement, so two devices can never be
    # given the same port (independent randint calls could collide). The
    # range starts at 1024 because lower ports need elevated privileges on
    # Unix-like systems.
    port_list = random.sample(range(1024, 6001), len(alldevices))

    l_devices_list = [
        {
            'platformName': 'Android',
            'deviceName': item,
            'platformVersion': getPlatForm(item),
            'appPackage': app_package,  # Package name
            'appActivity': app_activity,  # Launcher activity of the APK
            'skipServerInstallation': True,
            "port": port,
        }
        for item, port in zip(alldevices, port_list)
    ]
    return l_devices_list, port_list
Next, let's write the code that starts an Appium service on each port.
class RunServer(threading.Thread):
    """Thread that runs one shell command (used here to launch a server).

    Attributes:
        cmd: Command line handed to ``os.system`` when the thread runs.
        exit_status: Value returned by ``os.system`` once the command has
            finished; ``None`` until then.
    """

    def __init__(self, cmd):
        super().__init__()
        self.cmd = cmd
        self.exit_status = None

    def run(self):
        """Execute ``self.cmd`` in a shell, blocking until it exits."""
        # Keep the status instead of discarding it so that a failed launch
        # can be detected by whoever holds the thread object.
        self.exit_status = os.system(self.cmd)
def start(port_list: list, timeout: float = 120.0):
    """Start one Appium server per port and wait until each one answers.

    Args:
        port_list: Ports to launch Appium servers on, one server per entry.
        timeout: Maximum number of seconds to wait for each server to
            answer its status URL with a 2xx code.

    Raises:
        TimeoutError: If a server does not answer within ``timeout`` seconds.
    """

    def _is_ready(url):
        """Return True if ``url`` answers with a 2xx status, else False."""
        try:
            with urllib.request.urlopen(url, timeout=5) as response:
                return str(response.getcode()).startswith("2")
        except OSError:
            # URLError/HTTPError, refused connections and socket timeouts
            # are all OSError subclasses; they mean "not up yet", so the
            # caller should keep polling rather than crash.
            return False

    for port in port_list:
        cmd = "appium -p %s " % port
        # RunServer is already a thread, so starting it is enough to run
        # the command in the background. The same command line is used on
        # every operating system, so there is no platform check.
        RunServer(cmd).start()

        # %-formatting accepts the int ports produced by startdevicesApp;
        # plain string concatenation would raise TypeError.
        status_url = "http://127.0.0.1:%s/wd/hub/status" % port
        deadline = time.monotonic() + timeout
        while not _is_ready(status_url):
            if time.monotonic() >= deadline:
                raise TimeoutError(
                    "Appium server on port %s did not become ready within "
                    "%s seconds" % (port, timeout)
                )
            time.sleep(4)
The services are now running. Next, how do we execute the test cases in separate processes?
def runcase(devics):
    """Execute the test cases for one device (placeholder, does nothing yet).

    Args:
        devics: The per-device value handed to this function by the
            process pool.
    """
    return None
def run(deviceslist: list):
    """Run ``runcase`` once per device, with all devices in parallel.

    Args:
        deviceslist: One desired-capabilities dict per device. Each dict is
            passed whole to ``runcase`` in a worker process.
    """
    if not deviceslist:
        # Pool(0) raises ValueError, and there is nothing to run anyway.
        return

    pool = Pool(len(deviceslist))
    try:
        # A single map over the list gives every worker one complete dict.
        # Mapping over an individual dict would iterate its keys instead,
        # and doing so in a loop would handle the devices one after another.
        pool.map(runcase, deviceslist)
    finally:
        pool.close()
        pool.join()
Next, we combine the code to form the final execution.
Final code presentation
from appium import webdriver
from androguard.core.bytecodes.apk import APK
import os
import random
# Path of the APK under test; the package name and launcher activity used in
# the desired capabilities are read from this file. It is a hard-coded
# absolute path, so adjust it before running on another machine.
apk_path = "/Users/lileilei/Downloads/com.tencent.mobileqq_8.5.0_1596.apk"
def get_devices() -> list:
    """Return the serial numbers of attached devices that are ready for use.

    Runs ``adb devices`` and keeps only the entries whose state column is
    ``device``. Header and daemon start-up lines (which contain no tab) and
    devices in any other state, such as ``offline`` or ``unauthorized``,
    are skipped.

    Returns:
        A list of serial-number strings; empty when nothing usable is
        attached or adb produces no output.
    """
    # The context manager closes the pipe once the output has been read.
    with os.popen("adb devices") as pipe:
        lines = pipe.read().splitlines()

    all_devices = []
    for line in lines:
        serial, tab, state = line.partition("\t")
        if tab and state.strip() == "device":
            all_devices.append(serial)
    return all_devices
def getPlatForm(dev: str) -> str:
    """Return the Android release version reported by device ``dev``.

    Args:
        dev: Device serial number passed to ``adb -s``.

    Returns:
        The first line of the ``ro.build.version.release`` property with
        surrounding whitespace removed, or ``""`` when adb prints nothing
        (for example, when the device cannot be reached).
    """
    cmd = 'adb -s {} shell getprop ro.build.version.release'.format(dev)
    # The context manager closes the pipe once the output has been read.
    with os.popen(cmd) as pipe:
        lines = pipe.read().splitlines()
    # Guard against empty output instead of indexing blindly, and strip so a
    # trailing carriage return does not end up in the version string.
    return lines[0].strip() if lines else ""
def get_apkname(apk):
    """Return the package name declared by the APK file at path ``apk``."""
    parsed_apk = APK(apk, False, "r")
    package_name = parsed_apk.get_package()
    return package_name
def get_apk_lautc(apk):
    """Return the main (launcher) activity of the APK file at path ``apk``."""
    parsed_apk = APK(apk, False, "r")
    main_activity = parsed_apk.get_main_activity()
    return main_activity
import platform
from multiprocessing import Process,Pool
import time,urllib.request
import threading
class RunServer(threading.Thread):
    """Thread that runs one shell command (used here to launch a server).

    Attributes:
        cmd: Command line handed to ``os.system`` when the thread runs.
        exit_status: Value returned by ``os.system`` once the command has
            finished; ``None`` until then.
    """

    def __init__(self, cmd):
        super().__init__()
        self.cmd = cmd
        self.exit_status = None

    def run(self):
        """Execute ``self.cmd`` in a shell, blocking until it exits."""
        # Keep the status instead of discarding it so that a failed launch
        # can be detected by whoever holds the thread object.
        self.exit_status = os.system(self.cmd)
def start(port_list: list, timeout: float = 120.0):
    """Start one Appium server per port and wait until each one answers.

    Args:
        port_list: Ports to launch Appium servers on, one server per entry.
        timeout: Maximum number of seconds to wait for each server to
            answer its status URL with a 2xx code.

    Raises:
        TimeoutError: If a server does not answer within ``timeout`` seconds.
    """

    def _is_ready(url):
        """Return True if ``url`` answers with a 2xx status, else False."""
        try:
            with urllib.request.urlopen(url, timeout=5) as response:
                return str(response.getcode()).startswith("2")
        except OSError:
            # URLError/HTTPError, refused connections and socket timeouts
            # are all OSError subclasses; they mean "not up yet", so the
            # caller should keep polling rather than crash.
            return False

    for port in port_list:
        cmd = "appium -p %s " % port
        # RunServer is already a thread, so starting it is enough to run
        # the command in the background. The same command line is used on
        # every operating system, so there is no platform check.
        RunServer(cmd).start()

        # %-formatting accepts the int ports produced by startdevicesApp;
        # plain string concatenation would raise TypeError.
        status_url = "http://127.0.0.1:%s/wd/hub/status" % port
        deadline = time.monotonic() + timeout
        while not _is_ready(status_url):
            if time.monotonic() >= deadline:
                raise TimeoutError(
                    "Appium server on port %s did not become ready within "
                    "%s seconds" % (port, timeout)
                )
            time.sleep(4)
def startdevicesApp():
    """Build Appium desired capabilities and a server port for each device.

    Returns:
        A ``(l_devices_list, port_list)`` tuple. ``l_devices_list`` holds one
        desired-capabilities dict per device reported by ``get_devices()``;
        ``port_list`` holds the port assigned to the device at the same
        index (the same value is stored under the ``"port"`` key of its
        dict). Both lists are empty when no device is attached.
    """
    alldevices = get_devices()
    if not alldevices:
        return [], []

    # The APK is the same for every device, so parse it once up front
    # instead of twice per device.
    app_package = get_apkname(apk_path)
    app_activity = get_apk_lautc(apk_path)

    # random.sample draws without replacement, so two devices can never be
    # given the same port (independent randint calls could collide). The
    # range starts at 1024 because lower ports need elevated privileges on
    # Unix-like systems.
    port_list = random.sample(range(1024, 6001), len(alldevices))

    l_devices_list = [
        {
            'platformName': 'Android',
            'deviceName': item,
            'platformVersion': getPlatForm(item),
            'appPackage': app_package,  # Package name
            'appActivity': app_activity,  # Launcher activity of the APK
            'skipServerInstallation': True,
            "port": port,
        }
        for item, port in zip(alldevices, port_list)
    ]
    return l_devices_list, port_list
def runcase(devics):
    """Execute the test cases for one device (placeholder, does nothing yet).

    Args:
        devics: The per-device value handed to this function by the
            process pool.
    """
    return None
def run(deviceslist: list):
    """Run ``runcase`` once per device, with all devices in parallel.

    Args:
        deviceslist: One desired-capabilities dict per device. Each dict is
            passed whole to ``runcase`` in a worker process.
    """
    if not deviceslist:
        # Pool(0) raises ValueError, and there is nothing to run anyway.
        return

    pool = Pool(len(deviceslist))
    try:
        # A single map over the list gives every worker one complete dict.
        # Mapping over an individual dict would iterate its keys instead,
        # and doing so in a loop would handle the devices one after another.
        pool.map(runcase, deviceslist)
    finally:
        pool.close()
        pool.join()
if __name__ == "__main__":
    # Script entry point: collect per-device capabilities and ports, bring
    # up one server per port, then run the test cases on every device.
    l_devices_list, port_list = startdevicesApp()
    if not l_devices_list:
        # Stop with a clear message rather than letting a later step fail
        # on an empty device list.
        raise SystemExit("No Android devices found; nothing to run.")
    start(port_list)
    run(l_devices_list)
That covers the details of using Python and Appium to control multiple devices in parallel. For more on running Appium against multiple devices in parallel, please see the other related articles on this site!