软件测试知识持续更新整理不易,希望对各位学习软件测试能带来帮助
- 第八章 自动化测试高级应用
- 第一节、自动发邮件功能
- 8.1.1、文件形式的邮件
- 8.1.2、HTML 形式的邮件
- 8.1.3、获取测试报告
- 8.1.4、整合自动发邮件功能
- 第二节、python 多进程/线程基础
- 8.2.1、单线程
- 8.2.2、thread 模块
- 8.2.3、threading 模块
- 8.2.4、multiprocessing 模块
- 8.2.5、Pipe 和 queue
- 第三节、多进程执行测试用例
- 第四节、定时任务
- 8.4.1、程序控制时间执行
- 8.4.2、windows 添加任务计划
- 8.4.3、linux 实现定时任务
- 一、通过 at 命令创建任务
- at 命令指定时间的方式
- 绝对计时方法:
- 相对计时方法:
- 用法:
- 二、通过 crontab 命令创建任务
- 第五节、WebDriver 方法二次封装
- 未完待续
既然要做自动,就得对得起自动化的这个名字。这一章我们将进一步的增加自动化测试的实用,增加自动发邮件功能、多线程 和定时任务,让我们的自动化工作真正变得高效而又强大起来。
第一节、自动发邮件功能我们自动化脚本运行完成之后生成了测试报告,如果能将结果自动的发到邮箱就不用每次打开阅读,而且随着脚本的不段运行,生成的报告会越来越多,找到最近的报告也是一个比较麻烦的事件;如果能自动的将结果发到 boss 邮箱,也是个不错的选择。
python 的 smtplib 模块提供了一种很方便的途径发送电子邮件。它对 smtp 协议进行了简单的封装。
smtp 协议的基本命令包括:
HELO 向服务器标识用户身份
MAIL 初始化邮件传输 mail from:
RCPT 标识单个的邮件接收人;常在 MAIL 命令后面,可有多个 rcpt to:
DATA 在单个或多个 RCPT 命令后,表示所有的邮件接收人已标识,并初始化数据传输,以.结束
VRFY 用于验证指定的用户/邮箱是否存在;由于安全方面的原因,服务器常禁止此命令
EXPN 验证给定的邮箱列表是否存在,扩充邮箱列表,也常被禁用
HELP 查询服务器支持什么命令
NOOP 无 *** 作,服务器应响应 OK
QUIT 结束会话
RSET 重置会话,当前传输被取消
MAIL FROM 指定发送者地址
RCPT TO 指明的接收者地址
一般 smtp 会话有两种方式,一种是邮件直接投递,就是说,比如你要发邮件給 zzz@163.com,那
就直接连接 163.com 的邮件服务器,把信投給 zzz@163.com; 另一种是验证过后的发信,它的过程是,比如你要发邮件給 zzz@163.com,你不是直接投到 163.com,而是通过自己在 sina.com 的另一个邮箱来发。这样就要先连接 sina.com 的 smtp 服务器,然后认证,之后在把要发到 163.com 的信件投到sina.com上,sina.com 会帮你把信投递到 163.com。
下面解析几种发邮件的实例,让我们深入理解发邮件的实现。
8.1.1、文件形式的邮件#coding=utf-8
import smtplib
from email.mime.text import MIMEText
from email.header import Header
#发送邮箱
sender = 'abc@126.com'
#接收邮箱
receiver = '123456@qq.com'
#发送邮件主题
subject = 'python email test'
#发送邮箱服务器
smtpserver = 'smtp.126.com'
#发送邮箱用户/密码
username = 'abc@126.com'
password = '123456'
#中文需参数‘utf-8’,单字节字符不需要
msg = MIMEText('你好!','text','utf-8')
msg['Subject'] = Header(subject, 'utf-8')
smtp = smtplib.SMTP()
smtp.connect('smtp.126.com')
smtp.login(username, password)
smtp.sendmail(sender, receiver, msg.as_string())
smtp.quit()
import smtplib
导入 smtplib 发邮件模块,从面的脚本,邮件的发送、接收等相关服务,全部由 smtplib.SMTP 方法来完成。
from email.mime.text import MIMEText
from email.header import Header
导入 email 模块,MIMEText 和 Header 主要用来完邮件内容与邮件标题的定义。
smtp.connect()
用于链接邮件服务器
smtp.login()
配置发送邮箱的用户名密码
smtp.sendmail()
配置发送邮箱,接收邮箱,以及发送内容
smtp.quit()
关闭发邮件服务
运行程序,登录接收邮件的邮箱,会看一封发来的邮件。如图 :
文件标题为“python email test” ,邮件内容在生成的一个 DAT000000.text 文件里,打开文件可阅读文件内容。注意:不同邮件服务器对邮件文件的解析会有差异。
8.1.2、HTML 形式的邮件#coding=utf-8
import smtplib
from email.mime.text import MIMEText
from email.header import Header
#邮件信息配置
sender = 'abc@126.com'
receiver = '123456@qq.com'
subject = 'python email test'
smtpserver = 'smtp.126.com'
username = 'abc@126.com'
password = '123456'
#HTML 形式的文件内容
msg = MIMEText('你好!','html','utf-8')
msg['Subject'] = subject
smtp = smtplib.SMTP()
smtp.connect('smtp.126.com')
smtp.login(username, password)
smtp.sendmail(sender, receiver, msg.as_string())
smtp.quit()
相比上面的例子,我们唯一修改的就是文件内容的形式。这次的文件主题直接显示在邮件的正文,不
需要通过附件打开进行阅读。
运行程序,登录接收邮件的邮箱查看,如图
我们已经可以通过 python 编写发邮件程序了,现在要解决的问题如何在 report\目录下找到最新生成
的报告,只有找到了才能把发邮件功能集成到我们的自动化应用中。
newfile.py
# coding:utf-8
import os,datetime,time
result_dir = 'D:\selenium_python\report'
lists=os.listdir(result_dir)
lists.sort(key=lambda fn: os.path.getmtime(result_dir+"\"+fn) if not
os.path.isdir(result_dir+"\"+fn) else 0)
print ('最新的文件为: '+lists[-1])
file = os.path.join(result_dir,lists[-1])
print file
os.listdir()
用于获取目录下的所有文件列表
lists.sort(key=lambda fn: os.path.getmtime(result_dir+“\”+fn) if not os.path.isdir(result_dir+“\”+fn)
else 0)
在这段代码中,这一条语句是比较复杂难懂的,我们将其拆开来分析。
lists.sort()
Python 列表有一个内置的列表。sort()方法用于改变列表中元素的位置。还有一个 sorted()内置函数,建立了一种新的迭代排序列表。
key=lambda fn:
key 是带一个参数的函数,用来为每个元素提取比较值. 默认为 None, 即直接比较每个元素
lambda 提供了一个运行时动态创建函数的方法。我这里创建了 fn 函数。
下面一个小例子来演示通过 sort()方法对一数组进排序:
#定位一个数组
>>> lists=['c.txt','b.txt','d.txt','a.txt']
>>> lists
['c.txt', 'b.txt', 'd.txt', 'a.txt']
#取数组中的 key 做排序
>>> lists.sort(key=lambda lists:lists[0])
>>> print lists
['a.txt', 'b.txt', 'c.txt', 'd.txt']
>>> lists.sort(key=lambda lists:lists[1])
>>> lists
['c.txt', 'b.txt', 'd.txt', 'a.txt']
lists:lists[0] 表示取的是每个元组中的前半部分,即为:c、b、d、a ,所以可进行排序。
lists:lists[1] 表示取的是每个元组中的后半部分,即为:txt ,不能有效的进行排序规律,所以按照数组的原样输出。
os.path.getmtime()
getmtime()返回文件列表中最新文件的时间(最新文件的时间最大,所以我们会得到一个最大时间)
os.path.isdir()
isdir()函数判断某一路径是否为目录。
经过 sort()复杂的运算,我们获得了最新的 lists 文件列表,这个文件列表是根据文件的创建时间进行
排序的。
lists[-1]
-1 表示取文件列表中的最大值,也就是最新被创建的文件。
os.path.join()
join()方法用来连接字符串,通过路径与文件名的拼接,我们将得到目录下最新被创建的的文件名的完
整路径。运行 newfile.py 得到如下结果:
>>> ================================ RESTART ================================
>>>
最新的文件为: 2013-12-17-10_23_32result.html
D:\selenium_python\report\2013-12-17-10_23_32result.html
8.1.4、整合自动发邮件功能
前提基本功已经练得差不多了,我们已经学会了如何通过 python 现一个发邮件的脚本,学会了如何在一个目录下找到最新的文件。下面就将他们整合到 all_tests.py 文件中。
#coding=utf-8
import unittest
import HTMLTestRunner
import os ,time,datetime
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.image import MIMEImage
#定义发送邮件
def sentmail(file_new):
#发信邮箱
mail_from='fnngj@126.com'
#收信邮箱
mail_to='123456@qq.com'
#定义正文
f = open(file_new, 'rb')
mail_body = f.read()
f.close()
msg=MIMEText(mail_body,_subtype='html',_charset='utf-8')
#定义标题
msg['Subject']=u"私有云测试报告"
#定义发送时间(不定义的可能有的邮件客户端会不显示发送时间)
msg['date']=time.strftime('%a, %d %b %Y %H:%M:%S %z')
smtp=smtplib.SMTP()
#连接 SMTP 服务器,此处用的126的 SMTP 服务器
smtp.connect('smtp.126.com')
#用户名密码
smtp.login('fnngj@126.com','123456')
smtp.sendmail(mail_from,mail_to,msg.as_string())
smtp.quit()
print 'email has send out !'
#查找测试报告,调用发邮件功能
def sendreport():
result_dir = 'D:\selenium_python\report'
lists=os.listdir(result_dir)
lists.sort(key=lambda fn: os.path.getmtime(result_dir+"\"+fn) if not
os.path.isdir(result_dir+"\"+fn) else 0)
print (u'最新测试生成的报告: '+lists[-1])
#找到最新生成的文件
file_new = os.path.join(result_dir,lists[-1])
print file_new
#调用发邮件模块
sentmail(file_new) ……
if __name__ == "__main__":
#执行测试用例
runner.run(alltestnames)
#执行发邮件
sendreport()
sentmail(file_new)
定义一个 sentmail()发邮件函数,接收一个参数 file_new,表示接收最新生成的测试报告文件。
open(file_new, ‘rb’)
以读写(rb)方式打开最新生成的测试报告文件。
mail_body = f.read()
读取文件内容,将内容传递给 mail_body。
MIMEText(mail_body,_subtype=‘html’,_charset=‘utf-8’)
文件内容写入到邮件正文中。html 格式,编码为 utf-8。
sendreport()
定义 sendreport()用于找最新生成的测试报告文件 file_new。调用并将 file_new 传给 sentmail()函数。
程序执行过程:
执行 all_tests.py 文件,通过执行 runner.run(alltestnames)方法开始执行测试用例,所有用例执行完成执行 sendreport()方法,用于找到最新生成的报告,并将报告内容发送到指定的邮箱。
问题:
指定的邮箱可以正常收到邮件,但所得到的邮件内容是空的,这是由于 HTMLTestRunner 报告文件的机制所引起的。在测试用例运行之前生成报告文件,在整个程序没有彻底运行结束前,程序并没有把运行的结果写入到文件中,所以,在用例运行完成后发邮件,造成邮件内容是空的。
所以,我们不能在整个程序未运行结束时发送当前的测试报告,我们可以选择上一次运行结果的报告
进行发送。对代码做如下修改:
……
def sendreport():
result_dir = 'D:\selenium_python\report'
lists=os.listdir(result_dir)
lists.sort(key=lambda fn: os.path.getmtime(result_dir+"\"+fn) if not
os.path.isdir(result_dir+"\"+fn) else 0)
print (u'上一次测试生成的报告: '+lists[-2])
#找到上一次测试生成的文件
file_new = os.path.join(result_dir,lists[-2])
print file_new
#调用发邮件模块
sentmail(file_new)
……
lists[-2]
对文件列表中取的值做修改,-2 表示前一次生成的测试结果。
再次运行程序,邮件就正常收到了测试报告的内容。如图
在使用多线程之前,我们首页要理解什么是进程和线程。
什么是进程?
计算机程序只不过是磁盘中可执行的,二进制(或其它类型)的数据。它们只有在被读取到内存中,
被 *** 作系统调用的时候才开始它们的生命期。进程(有时被称为重量级进程)是程序的一次执行。每个进程都有自己的地址空间,内存,数据栈以及其它记录其运行轨迹的辅助数据。 *** 作系统管理在其上运行的所有进程,并为这些进程公平地分配时间。
什么是线程?
线程(有时被称为轻量级进程)跟进程有些相似,不同的是,所有的线程运行在同一个进程中,共享相同的运行环境。我们可以想像成是在主进程或“主线程”中并行运行的“迷你进程”。
在单线程中顺序执行两个循环。一定要一个循环结束后,另一个才能开始。总时间是各个循环
运行时间之和。
onetherad.py
from time import sleep, ctime
def loop0():
print 'start loop 0 at:', ctime()
sleep(4)
print 'loop 0 done at:', ctime()
def loop1():
print 'start loop 1 at:', ctime()
sleep(2)
print 'loop 1 done at:', ctime()
def main():
print 'start:', ctime()
loop0()
loop1()
print 'all end:', ctime()
if __name__ == '__main__':
main()
运行结果:
start loop 0 at: Mon Dec 23 09:59:44 2013
loop 0 done at: Mon Dec 23 09:59:48 2013
start loop 1 at: Mon Dec 23 09:59:48 2013
loop 1 done at: Mon Dec 23 09:59:50 2013
all end: Mon Dec 23 09:59:50 2013
Python 通过两个标准库 thread 和 threading 提供对线程的支持。thread 提供了低级别的、原始的线程以及一个简单的锁。threading 基于 Java 的线程模型设计。锁(Lock)和条件变量(Condition)在 Java 中是对象的基本行为(每一个对象都自带了锁和条件变量),而在 Python 中则是独立的对象。
8.2.2、thread 模块mtsleep1.py
import thread
from time import sleep, ctime
loops = [4,2]
def loop0():
print 'start loop 0 at:', ctime()
sleep(4)
print 'loop 0 done at:', ctime()
def loop1():
print 'start loop 1 at:', ctime()
sleep(2)
print 'loop 1 done at:', ctime()
def main():
print 'start:', ctime()
thread.start_new_thread(loop0, ())
thread.start_new_thread(loop1, ())
sleep(6)
print 'all end:', ctime()
if __name__ == '__main__':
main()
start_new_thread()要求一定要有前两个参数。所以,就算我们想要运行的函数不要参数,我们也要传一个空的元组。
这个程序的输出与之前的输出大不相同,之前是运行了 6,7 秒,而现在则是 4 秒,是最长的循环的运行时间与其它的代码的时间总和。
start: Mon Dec 23 10:05:09 2013
start loop 0 at: Mon Dec 23 10:05:09 2013
start loop 1 at: Mon Dec 23 10:05:09 2013
loop 1 done at: Mon Dec 23 10:05:11 2013
loop 0 done at: Mon Dec 23 10:05:13 2013
all end: Mon Dec 23 10:05:15 2013
睡眠 4 秒和 2 秒的代码现在是并发执行的。这样,就使得总的运行时间被缩短了。你可以看到,loop1 甚至在 loop0 前面就结束了。
程序的一大不同之处就是多了一个“sleep(6)”的函数调用。如果我们没有让主线程停下来,那主线程就会运行下一条语句,显示“all end”,然后就关闭运行着 loop0()和 loop1()的两个线程并退出了。我们使用 6 秒是因为我们已经知道,两个线程(你知道,一个要 4 秒,一个要 2 秒)在主线程等待 6
秒后应该已经结束了。
你也许在想,应该有什么好的管理线程的方法,而不是在主线程里做一个额外的延时 6 秒的 *** 作。因为这样一来,我们的总的运行时间并不比单线程的版本来得少。而且,像这样使用 sleep()函数做线程的同步 *** 作是不可靠的。如果我们的循环的执行时间不能事先确定的话,那怎么办呢?这可能造成主线程过早或过晚退出。这就是锁的用武之地了。
mtsleep2.py
#coding=utf-8
import thread
from time import sleep, ctime
loops = [4,2]
def loop(nloop, nsec, lock):
print 'start loop', nloop, 'at:', ctime()
sleep(nsec)
print 'loop', nloop, 'done at:', ctime()
#解锁
lock.release()
def main():
print 'starting at:', ctime()
locks =[]
#以 loops 数组创建列表,并赋值给 nloops
nloops = range(len(loops))
for i in nloops:
lock = thread.allocate_lock()
#锁定
lock.acquire()
#追加到 locks[]数组中
locks.append(lock)
#执行多线程
for i in nloops:
thread.start_new_thread(loop,(i,loops[i],locks[i]))
for i in nloops:
while locks[i].locked():
pass
print 'all end:', ctime()
if __name__ == '__main__':
main()
thread.allocate_lock()
返回一个新的锁定对象。
acquire() /release()
一个原始的锁有两种状态,锁定与解锁,分别对应 acquire()和 release() 方法。
range()
range()函数来创建列表包含算术级数。
range(len(loops))函数理解:
>>> aa= "hello"
#长度计算
>>> len(aa)
5
#创建列表
>>> range(len(aa))
[0, 1, 2, 3, 4]
#循环输出列表元素
>>> for a in range(len(aa)):
print a
0
1
2
3
4
我们先调用 thread.allocate_lock()函数创建一个锁的列表,并分别调用各个锁的 acquire()函数获得锁。获得锁表示“把锁锁上”。锁上后,我们就把锁放到锁列表 locks 中。
下一个循环创建线程,每个线程都用各自的循环号,睡眠时间和锁为参数去调用 loop()函数。为什么我们不在创建锁的循环里创建线程呢?有以下几个原因:(1) 我们想到实现线程的同步,所以要让“所有的马同时冲出栅栏”。(2) 获取锁要花一些时间,如果你的线程退出得“太快”,可能会导致还没有获得锁,线程就已经结束了的情况。
在线程结束的时候,线程要自己去做解锁 *** 作。最后一个循环只是坐在那一直等(达到暂停主线程的目的),直到两个锁都被解锁为止才继续运行。
mtsleep2.py 运行结果:
starting at: Mon Dec 23 20:57:26 2013
start loop start loop0 1at: at:Mon Dec 23 20:57:26 2013
Mon Dec 23 20:57:26 2013
loop 1 done at: Mon Dec 23 20:57:28 2013
loop 0 done at: Mon Dec 23 20:57:30 2013
all end: Mon Dec 23 20:57:30 2013
8.2.3、threading 模块
我们应该避免使用 thread 模块,原因是它不支持守护线程。当主线程退出时,所有的子线程不论它
们是否还在工作,都会被强行退出。有时我们并不期望这种行为,这时就引入了守护线程的概念。threading
模块则支持守护线程。
mtsleep3.py
#coding=utf-8
import threading
from time import sleep, ctime
loops = [4,2]
def loop(nloop, nsec):
print 'start loop', nloop, 'at:', ctime()
sleep(nsec)
print 'loop', nloop, 'done at:', ctime()
def main():
print 'starting at:', ctime()
threads = []
nloops = range(len(loops))
#创建线程
for i in nloops:
t = threading.Thread(target=loop,args=(i,loops[i]))
threads.append(t)
#开始线程
for i in nloops:
threads[i].start()
#等待所有结束线程
for i in nloops:
threads[i].join()
print 'all end:', ctime()
if __name__ == '__main__':
main()
运行时长:
starting at: Mon Dec 23 22:58:55 2013
start loop 0 at: Mon Dec 23 22:58:55 2013
start loop 1 at: Mon Dec 23 22:58:55 2013
loop 1 done at: Mon Dec 23 22:58:57 2013
loop 0 done at: Mon Dec 23 22:58:59 2013
all end: Mon Dec 23 22:58:59 2013
start()
开始线程活动
join()
等待线程终止
所有的线程都创建了之后,再一起调用 start()函数启动,而不是创建一个启动一个。而且,不用再管理一堆锁(分配锁,获得锁,释放锁,检查锁的状态等),只要简单地对每个线程调用 join()函数就可以了。
join()会等到线程结束,或者在给了 timeout 参数的时候,等到超时为止。join()的另一个比较重
要的方面是它可以完全不用调用。一旦线程启动后,就会一直运行,直到线程的函数结束,退出为止。
使用可调用的类
mtsleep4.py
#coding=utf-8
import threading
from time import sleep, ctime
loops = [4,2]
class ThreadFunc(object):
def __init__(self,func,args,name=''):
self.name=name
self.func=func
self.args=args
def __call__(self):
apply(self.func,self.args)
def loop(nloop,nsec):
print "seart loop",nloop,'at:',ctime()
sleep(nsec)
print 'loop',nloop,'done at:',ctime()
def main():
print 'starting at:',ctime()
threads=[]
nloops = range(len(loops))
for i in nloops:
#调用 ThreadFunc 实例化的对象,创建所有线程
t = threading.Thread(
target=ThreadFunc(loop,(i,loops[i]),loop.__name__))
threads.append(t)
#开始线程
for i in nloops:
threads[i].start()
#等待所有结束线程
for i in nloops:
threads[i].join()
print 'all end:', ctime()
if __name__ == '__main__':
main()
运行结果:
starting at: Tue Dec 24 16:39:16 2013
seart loop 0 at: Tue Dec 24 16:39:16 2013
seart loop 1 at: Tue Dec 24 16:39:16 2013
loop 1 done at: Tue Dec 24 16:39:18 2013
loop 0 done at: Tue Dec 24 16:39:20 2013
all end: Tue Dec 24 16:39:20 2013
创建新线程的时候,Thread 对象会调用我们的 ThreadFunc 对象,这时会用到一个特殊函数__call__()。
由于我们已经有了要用的参数,所以就不用再传到 Thread()的构造函数中。由于我们有一个参数的元组,这时要在代码中使用 apply()函数。
我们传了一个可调用的类(的实例),而不是仅传一个函数。
__ init__()
方法在类的一个对象被建立时运行。这个方法可以用来对你的对象做一些初始化。
apply()
apply(func [, args [, kwargs ]]) 函数用于当函数参数已经存在于一个元组或字典中时,间接地调用函数。
args 是一个包含将要提供给函数的按位置传递的参数的元组。如果省略了 args,任何参数都不会被传递,kwargs 是一个包含关键字参数的字典。
apply() 用法:
#不带参数的方法
>>> def say():
print 'say in'
>>> apply(say)
say in
#函数只带元组的参数
>>> def say(a,b):
print a,b
>>> apply(say,('hello','虫师'))
hello 虫师
#函数带关键字参数
>>> def say(a=1,b=2):
print a,b
>>> def haha(**kw):
apply(say,(),kw)>>> haha(a='a',b='b')
a b
8.2.4、multiprocessing 模块
multiprocessing 使用类似于 threading 模块的 API ,multiprocessing 提供了本地和远程的并发性,有
效的通过全局解释锁(Global Interceptor Lock, GIL)来使用进程(而不是线程)。由于 GIL 的存在,在 CPU 密集型的程序当中,使用多线程并不能有效地利用多核 CPU 的优势,因为一个解释器在同一时刻只会有一个线程在执行。所以,multiprocessing 模块可以充分的利用硬件的多处理器来进行工作。它支持 Unix 和Windows 系统上的运行。
process1.py
from multiprocessing import Process
def f(name):
print 'hello', name
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
运行结果:
hello bob
与 threading.Thread 类似,它可以利用 multiprocessing.Process 对象来创建一个进程。Process 对象与
Thread 对象的用法相同,也有 start(), run(), join()的方法。
multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={})
target 表示调用对象,args 表示调用对象的位置参数元组。kwargs 表示调用对象的字典。Name 为别名。
Group 实质上不使用。
扩展理解:
在nix 上面创建的新的进程使用的是 fork:
.
一个进程,包括代码、数据和分配给进程的资源。fork()函数通过系统调用创建一个与原来进程几乎完全相同的进程,也就是两个进程可以做完全相同的事,但如果初始参数或者传入的变量不同,两个进程也可以做不同的事。
.
这意味着子进程开始执行的时候具有与父进程相同的全部内容。请记住这点,这个将是下面我们讨论基于继承的对象共享的基础。所谓基于继承的对象共享,是说在创建子进程之前由父进程初始化的某些对象可以在子进程当中直接访问到。在 Windows平台上,因为没有 fork 语义的系统调用,基于继承的共享对象比nix 有更多的限制,最主要就是体现在要求 Process的__init__当中的参数必须可以 Pickle。
.
但是,并不是所有的对象都是可以通过继承来共享,只有 multiprocessing库当中的某些对象才可以。 例如 Queue,同步对象,共享变量,Manager 等等。
.
在一个 multiprocessing库的典型使用场景下,所有的子进程都是由一个父进程启动起来的,这个父 进程称为 master进程。这个父进程非常重要,它会管理一系列的对象状态,一旦这个进程退出,子进程很可能会处于一个很不稳定的状态,因为它们共享的状态也许已经被损坏掉了。因此,这个进程最好尽可能做最少的事情,以便保持其稳定性。
获取进程 ID
process2.py
```c
from multiprocessing import Process
import os
def info(title):
print title
print 'module name:', __name__
if hasattr(os, 'getppid'):
print 'parent process:', os.getppid()
print 'process id:', os.getpid()
def f(name):
info('function f')
print 'hello', name
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
运行结果:
main line
module name: __main__
process id: 8972
function f
module name: __main__
process id: 10648
hello bob
multiprocessing 提供了 threading 包中没有的 IPC(比如 Pipe 和 Queue),效率上更高。应优先
考虑 Pipe 和 Queue,避免使用 Lock/Event/Semaphore/Condition 等同步方式 (因为它们占据的不
是用户进程的资源)。
hasattr(object, name)
判断对象 object 是否包含名为 name 的特性(hasattr 是通过调用 getattr(ojbect, name)是否抛出异常来实
现的)。hasattr(os, ‘getppid’) 用于判断系统是否包含 getppid。
getpid()得到本身进程 id,getppid()得到父进程进程 id,如果已经是父进程,得到系统进程
id。
Process.PID 中保存有 PID,如果进程还没有 start(),则 PID 为 None。
Pipe 和 Queue
multiprocessing 包中有 Pipe 类和 Queue 类来分别支持这两种 IPC 机制。Pipe 和 Queue 可以
用来传送常见的对象。
( 1 ) Pipe 可 以 是 单 向 (half-duplex) , 也 可 以 是 双 向 (duplex) 。 我 们 通 过mutiprocessing.Pipe(duplex=False)创建单向管道 (默认为双向)。一个进程从 PIPE 一端输入对象,然后被 PIPE 另一端的进程接收,单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。
pipe.py
#coding=utf-8
import multiprocessing
def proc1(pipe):
pipe.send('hello')
print('proc1 rec:',pipe.recv())
def proc2(pipe):
print('proc2 rec:',pipe.recv())
pipe.send('hello, too')
pipe = multiprocessing.Pipe()
p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))
p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))
p1.start()
p2.start()
p1.join()
p2.join()
注:本程序只能在 linux/Unix 上运行,运行结果:
‘proc2 rec:’,‘hello’
‘proc2 rec:’,‘hello,too’
这里的 Pipe 是双向的。Pipe 对象建立的时候,返回一个含有两个元素的表,每个元素代表 Pipe的一端(Connection 对象)。我们对 Pipe 的某一端调用 send()方法来传送对象,在另一端使用 recv()来接收。
(2) Queue 与 Pipe 相类似,都是先进先出的结构。但 Queue 允许多个进程放入,多个进程从队列取出对象。Queue 使用 mutiprocessing.Queue(maxsize)创建,maxsize 表示队列中可以存放对象的最大数量。
queue.py
#coding=utf-8
import os
import multiprocessing
import time
# input worker
def inputQ(queue):
info = str(os.getpid()) + '(put):' + str(time.time())
queue.put(info)
# output worker
def outputQ(queue,lock):
info = queue.get()
lock.acquire()
print (str(os.getpid()) + '(get):' + info)
lock.release()
# Main
record1 = [] # store input processes
record2 = [] # store output processes
lock = multiprocessing.Lock() # 加锁,为防止散乱的打印
queue = multiprocessing.Queue(3)
# input processes
for i in range(10):
process = multiprocessing.Process(target=inputQ,args=(queue,))
process.start()
record1.append(process)
# output processes
for i in range(10):
process = multiprocessing.Process(target=outputQ,args=(queue,lock))
process.start()
record2.append(process)
for p in record1:
p.join()
queue.close() # 没有更多的对象进来,关闭 queue
for p in record2:
p.join()
注:本程序只能在 linux/Unix 上运行,运行结果
2702(get):2689(put):1387947815.56
2704(get):2691(put):1387947815.58
2706(get):2690(put):1387947815.56
2707(get):2694(put):1387947815.59
2708(get):2692(put):1387947815.61
2709(get):2697(put):1387947815.6
2703(get):2698(put):1387947815.61
2713(get):2701(put):1387947815.65
2716(get):2699(put):1387947815.63
2717(get):2700(put):1387947815.62
第三节、多进程执行测试用例
因为多进程与多线程特性在 python 中当中也属于比较高级的应用,对于初学者来说比理解起来有一定难度,所以在介绍 python 的多进程与多线程时我们通过相当的篇幅和实例来进行讲解,目的是为了让读者深入的理解 python 的多进程与多线程。
为实现多进程运行测试用例,我需要对文件结构进行调整:
我们创建了 thread1 和 thread2 两个文件夹,分别放入了两个测试用例;下面我们编写 all_tests_process.py文件来通过多进程来执行测试用例。
#coding=utf-8
import unittest, time, os, multiprocessing
import commands
from email.mime.text import MIMEText
import HTMLTestRunner
import sys
sys.path.append('\selenium_proces')
def EEEcreatsuite1():
casedir=[]
listaa=os.listdir('D:\selenium_proces\')
for xx in listaa:
if "thread" in xx:
casedir.append(xx)
print casedir
suite=[]
for n in casedir:
testunit=unittest.TestSuite()
discover=unittest.defaultTestLoader.discover(str(n),pattern
='start_*.py',top_level_dir=r'E:\')
for test_suite in discover:
for test_case in test_suite:
testunit.addTests(test_case)
#print testunit
suite.append(testunit)
return suite,casedir
def EEEEEmultiRunCase(suite,casedir):
now = time.strftime('%Y-%m-%d-%H_%M_%S',time.localtime(time.time()))
filename = 'D:\selenium_python\report\'+now+'result.html'
fp = file(filename, 'wb')
proclist=[]
s=0
for i in suite:
runner = HTMLTestRunner.HTMLTestRunner(
stream=fp,
title=str(casedir[s])+u'测试报告',
description=u'用例执行情况:'
)
proc = multiprocessing.Process(target=runner.run,args=(i,))
proclist.append(proc)
s=s+1
for proc in proclist: proc.start()
for proc in proclist: proc.join()
fp.close()
if __name__ == "__main__":
runtmp=EEEcreatsuite1()
EEEEEmultiRunCase(runtmp[0],runtmp[1])
all_tests_process.py 程序稍微复杂,我们分段来进行讲解。
在 EEEcreatsuite1()函数中:
……
casedir=[]
listaa=os.listdir('D:\selenium_proces\')
for xx in listaa:
if "thread" in xx:
casedir.append(xx)
print casedir
……
定义 casedir 数组,读取 selenium_proces 目录下的文件/文件夹,找到文件/文件夹的名包含
“thread”的文件/文件夹添加到 casedir 数组中(即 thread1 和 thread2 两个文件夹)。
……
suite=[]
for n in casedir:
testunit=unittest.TestSuite()
discover=unittest.defaultTestLoader.discover(str(n),pattern
='start_*.py',top_level_dir=r'E:\')
for test_suite in discover:
for test_case in test_suite:
testunit.addTests(test_case)
#print testunit
suite.append(testunit)
return suite,casedir
定位 suite 数组,for 循环读取 casedir 数组中的数据(即 thread1 和 thread2 两个文件夹)。通过 discover 分别读取文件夹下匹配 start_*.py 规则的用例文件,将所有用例文件添加到testunit 测试条件中,再将测试套件追加到定义的 suite 数组中。
在整个 EEEcreatsuite1()函数中 返回 suite 和 casedir 两个数组的值。
在 EEEEEmultiRunCase 函数中:
……
proclist=[]
s=0
for i in suite:
runner = HTMLTestRunner.HTMLTestRunner(
stream=fp,
title=str(casedir[s])+u'测试报告',
description=u'用例执行情况:'
)
proc = multiprocessing.Process(target=runner.run,args=(i,))
proclist.append(proc)
s=s+1
for proc in proclist: proc.start()
for proc in proclist: proc.join()
……
定义 proclist()函数,for 循环把 suite 数组中的用例执行结果写入 HTMLTestRunner 测试报告。multiprocessing.Process 创建用例执行的多进程,把创建的多进程追加到 proclist 数组中,
for 循环 proc.start()开始进程活动,proc.join()等待线程终止。
小结:
笔者虽然花费了不少力气介绍 python 的多进程/多线程,以及如何用多进程执行测试用例,但笔者并不推荐在 window 以及低配置的电脑上使用,另外一台电脑上开的进程不要超过三个,并行执行用例时相互之间会发生一定的干扰。
为了更对得起“自动化测试”的名号,我们可以设置定时任务,使我们自动化脚本在某个时间点自动运行脚本。实现这个需求的方式很多。
8.4.1、程序控制时间执行通过程序来控制用例在什么时候执行的执行最简单的方式了。创建 all_tests_time.py 文件:
#coding=utf-8
import unittest
import HTMLTestRunner
import os ,time
listaa='D:\selenium_python\test_case'
def creatsuitel():
testunit=unittest.TestSuite()
discover=unittest.defaultTestLoader.discover(listaa,
pattern ='start_*.py',
top_level_dir=None)
for test_suite in discover:
for test_case in test_suite:
testunit.addTests(test_case)
print testunit
return testunit
alltestnames = creatsuitel()
now = time.strftime('%Y-%m-%M-%H_%M_%S',time.localtime(time.time()))
filename = 'D:\selenium_python\report\'+now+'result.html'
fp = file(filename, 'wb')
runner =HTMLTestRunner.HTMLTestRunner(
stream=fp,
title=u'百度搜索测试报告',
description=u'用例执行情况:')
#########控制什么时间脚本执行######
k=1
while k <2:
timing=time.strftime('%H_%M',time.localtime(time.time()))
if timing == '12_00':
print u"开始运行脚本:"
runner.run(alltestnames) #执行测试用例
print u"运行完成退出"
break
else:
time.sleep(5)
print timing
本程序是在 all_tests.py 文件基础上做的修改,为了便于读者对完整程序的阅读,所以这里贴出了整个程序代码,下面和读者重点分析最后一段代码。
k=1,while 循环中判断 k<2 ,我们在整个循环体中并没有对 k 的值做任何修改,所以,单从这个循环条件来判断,是一个死循环。但是,我们可以循环的执行过程中通过 break 语句跳出循环。
在循环体中,我们通过 timing 来获取当前时间,只获取当前的小时和分钟,来判断是否为 12:00 ,如果当前时间为 12 点,开始执行测试脚本,并通过 break 语句跳出循环。
如果当前时间不等于 12:00 那么修改 5 秒后,输出当前时间。当然这个休眠时间可以自由控制,但不能大于 60 秒,不然,有可能匹配不到 12 点 00 分。
为了运行方便,我们在命令提示符下运行(linux 可以在终端下运行),运行结果如图
注意:
假如,我们要运行的时间不是今天的12:00 ,而是某年某月某天的一个时间,那么在获取当前时间时要获取年月日进行比较。
假如我们会在(每天、每周、每月)一个固定的时间运行测试用例,那么可以通过 windows 创建任务计划,这种方式更加方便,我们不用一直打开一个程序来判断当前的时间。
下面以 windows XP 为例:
打开“控制面版”—>“任务计划”—>“添加任务计划” 双击打开任务计划向导
点击“下一步”,可以在程序列表选择要执行的程序。
我这里要选择的是一个 python 脚本,点击“浏览”找到要运行的 python 脚本;再次点击“下一步”。
对任务进行命名,选择执行脚本的频率(每天,每周,每月,一次性,计算机启动时,登录时),然
后,点击“下一步”。
选择任务执行起始时间和日期;点击“下一步”。
输入管理员系统用户的密码。点击“下一步”。
确认任务的创建信息,点击“完成”任务计划创建成功。
任务创建完成,会在当前目录下显示创建完成的任务计划。右键单击创建的任务计划,选择“运行”
将会立刻执行程序。
运行效果如下:
当然,我们也可以右键创建的任务计划,选择“属性”,对任务计划进行调整和修改。
在 linux 下相对实现定时任务的方式比较灵活。我们可以通过 at 命令实现一次性计划任务,也可以通过 batch 实现周期性计划任务。
一、通过 at 命令创建任务at 命令主要用于创建临时的任务,创建的任务只能被执行一次。
以下面以 ubuntu 为例演示 at 命令的使用:
/home/fnngj/test/目录下创建 file.py 文件
#!/usr/bin/python
#coding=utf-8
f=open('f.txt','w')
f.write('hello world!!')
f.close()
以写(‘w’) *** 作打开当前目录下的 f.txt 文件(没有此文件会自动创建),向文件中写入‘hello world!’,
然后 close()关闭文件。
通过 at 执行创建的 file.py 程序。
fnngj@fnngj-VirtualBox:~/test$ at now+5 minutes
warning: commands will be executed using /bin/sh
at> python /home/fnngj/test/fiele.py
at> <EOT> Ctrl+d 保存退出
job 17 at Wed Jan 8 17:56:00 2014
now+5 minutes 表示当前时间,5分钟之后执行
python /home/fnngj/test/fiele.py 要执行的文件,python 命令,文件要完整的路径
Ctrl+d 保存退出
查看创建的任务
fnngj@fnngj-VirtualBox:~/test$ at -l
18 Wed Jan 8 17:56:00 2014 a fnngj
fnngj@fnngj-VirtualBox:~/test$ atq
10 Wed Jan 8 12:57:00 2014 a fnngj
at -l / atq 两个命令查看 at 创建的任务。
删除已经设置的任务
fnngj@fnngj-VirtualBox:~/test$ at -l
10 Wed Jan 8 12:57:00 2014 a fnngj
fnngj@fnngj-VirtualBox:~/test$ atrm 10
10 是任务的“编号”,atrm 用于删除已经创建的任务
启动 atd 进程
linux 一般默认会启动 atd 进行
fnngj@fnngj-VirtualBox:~/test$ ps -ef | grep atd
daemon 806 1 0 16:33 ? 00:00:00 atd
fnngj 3300 1882 0 19:07 pts/1 00:00:00 grep --color=auto atd
上面表示 atd 进程已经启动
fnngj@fnngj-VirtualBox:~/test$ /etc/init.d/atd status --启动 atd 进程
at 命令指定时间的方式
绝对计时方法:
midnight noon teatime
hh:mm [today]
hh:mm tomorrow
hh:mm 星期
hh:mm MM/DD/YY
now+n minutes
mow+n hours
now+n days
指定在今天下午17:30执行某命令(假设现在时间是下午14:30,2014年1月11日)
命令格式:
at 5:30pm
at 17:30
at 17:20 today
at now+3 hours
at now+180 minutes
at 17:30 14.1.11
at 17:30 1.11.14
查看 f.txt 文件内容
fnngj@fnngj-VirtualBox:~/test$ ls
f.txt~ f.txt open.py open.py~
fnngj@fnngj-VirtualBox:~/test$ cat f.txt
hello world!!
二、通过 crontab 命令创建任务
crontab 可以方便的用来创建周期性任务,也许你想每天某个时间执行 python 程序,或每周五的某个
时间执行。crontab 像 windows 的计划任务一样方便,或者更加灵活。
file_time.py
#!/usr/bin/python
#coding=utf-8
import time
f=open('123.txt','a')
now = time.strftime('%Y-%m-%d-%H_%M_%S',time.localtime(time.time()))
f.write('file run time:'+now+'\n')
f.close()
这次,我们以追加的方式,获取当前时间写入到123.txt 文件中。也就是说程序每运行一次,获取一次当前时间追加(不是替换)写入到123.txt 文件中。
运行一次 file_time.py
fnngj@fnngj-VirtualBox:~/test$ python file_time.py
查看123.txt 文件内容
fnngj@fnngj-VirtualBox:~/test$ cat 123.txt
file run time:2014-01-09-17_53_17
下面通过 crontab 来创建任务:
为更快的看到任务是否被多次执行的效果,我们要求 file_time.py 每小时过5分钟执行一次。
fnngj@fnngj-VirtualBox:~/test$ crontab -e
crontab: installing new crontab
输入 crontab - e 命令进入 crontab 文件:
按键盘 i、o、a 任意一个键进入编辑状态,可以对文件进行修改。
按照上面的格式写入内空。
ctrl+x 离开,提示是否保存任务?按 y 保存任务退出。(不同版本 linux 对 crontab 文件的编辑/退出会有差异。)
完成 crontab 任务创建后,会有如下提示:
crontab: installing new crontab
启动 crontab 服务:
注意:在完成编辑以后,要重新启动 cron 进程,crontab 服务 *** 作说明:
~# /etc/init.d/cron restart //重启服务
~# /etc/init.d/cron start //启动服务
~# /etc/init.d/cron stop //关闭服务
~# /etc/init.d/cron reload //重新载入配置
查看 crontab 任务计划:
root@fnngj-VirtualBox:~# cd /var/spool/crontabs/
root@fnngj-VirtualBox:/var/spool/cron/crontabs# ls
fnngj root
root@fnngj-VirtualBox:/var/spool/cron/crontabs# cat fnngj ---因为我们的计划是用 fnngj 用户创建
……
# m h dom mon dow command
5 * * * * python /home/fnngj/test/file_time.py
查看123.txt 文件:
root@fnngj-VirtualBox:/home/fnngj# cat 123.txt
file run time:2014-01-10-10_05_01
file run time:2014-01-10-11_05_01
file run time:2014-01-10-12_05_01
file run time:2014-01-10-13_05_02
在创建完任务后,你可能需要等上一段时间才能看到文件中被写入的内容。
crontab 格式说明:
crontab 的命令格式
crontab {-l|-r|-e}
-l 显示当前的 crontab
-r 删除当前的 crontab
-e 使用编辑器编辑当前 crontab 文件
好多人都觉得周期计划任务设置起来比较麻烦,其实我们只要掌握规律就很好设置。
在以上各个字段中,还可以使用以下特殊字符:
星号( **):代表所有可能的值,例如 month 字段如果是星号,则表示在满足其它字段的制约条件后每
月都执行该命令 *** 作。
逗号(,):可以用逗号隔开的值指定一个列表范围,例如,“1,2,5,7,8,9”
中杠(-):可以用整数之间的中杠表示一个整数范围,例如“2-6”表示“2,3,4,5,6”
正斜线(/):可以用正斜线指定时间的间隔频率,例如“0-23/2”表示每两小时执行一次。同时正斜线可以和星号一起使用,例如/10,如果用在 minute 字段,表示每十分钟执行一次。
实列:
规则: 把知道的具体的时间添上,不知道的都添加上 *
分钟 小时 天 月 星期 命令/脚本
假如,我们每天早上4点要做一下 *** 作,以下面方式表示:
假如,我们每周一和三下午的6点要做一下 *** 作,以下面方式表示:
在上学的时候都有上机课,周一到周五,下午5点30上课结果。我们需要在5点30发一个通知,5点45
自动关机。设定计划任务需要分两步完成,第一步提醒,第二步关机
在 linux 下设置定时任务,本书默认读者是具备 linux *** 作使用的能力的,如文件的基本 *** 作,vi/vim
编辑器的基本使用。
在自动化脚本越写越多的时候,发现 WebDriver 的 API 提供给我们的方法并不好用,例如 webdriver
所提供的元素定位方法又长又难写。这个时候我们就可以对这些方法做个简易的二次封装,使其更易于被使用。
这里我们抛弃之前的测试结构,从一个最简单的脚本来分析如何对 WebDriver 的方法做二次封装。这
里依然使用百度搜索的例子。
# coding = utf-8
from selenium import webdriver
browser = webdriver.Firefox()
browser.get("http://www.baidu.com")
browser.find_element_by_id("kw").send_keys("selenium")
browser.find_element_by_id("su").click()
browser.quit()
在一个脚本中被最频繁使用的应该是 webdriver 的定位方法了,如 find_element_by_id() 定位方法又长
又难写,我们就可以考虑把这样的方法再次封装到一个方法里调用。封装后的脚本如下:
# coding = utf-8
from selenium import webdriver
driver = webdriver.Firefox()
driver.get("http://www.baidu.com")
#封装 find_element_by_id()方法
def findId(id):
f = driver.find_element_by_id(id)
return f
#调用 findId()方法
findId("kw").send_keys("selenium")
findId("su").click()
driver.quit()
被二次封装后的 findId()方法最原始的调用还是 find_element_by_id() 方法,但 findId()的写法确实更加
简便,从而并没有降低对方法的理解。
我们现在希望把封装的方法给所有的脚本所调用,所以,我们需要把这些方法封装到一个单独的文件
中,在些之些我们先找出 Element 类中所提供的所有定位方法。
定位单个元素:
find_element_by_id()
find_element_by_name()
find_element_by_class_name()
find_element_by_tag_name()
find_element_by_link_text()
find_element_by_partial_link_text()
find_element_by_xpath()
find_element_by_css_selector()
定位一组元素:
find_elements_by_id()
find_elements_by_name()
find_elements_by_class_name()
find_elements_by_tag_name()
find_elements_by_link_text()
find_elements_by_partial_link_text()
find_elements_by_xpath()
find_elements_by_css_selector()
找出这些方法之后,创建 package 目录,在目录下创建 location.py 文件:
#coding=utf-8
from selenium import webdriver
''' 本文件简易的封装定位单个元素和定位一组元素的方法
'''
'''定位单个元素封装'''
def findId(driver,id):
f = driver.find_element_by_id(id)
return f
def findName(driver,name):
f = driver.find_element_by_name(name)
return f
def findClassName(driver,name):
f = driver.find_element_by_class_name(name)
return f
def findTagName(driver,name):
f = driver.find_element_by_tag_name(name)
return f
def findLinkText(driver,text):
f = driver.find_element_by_link_text(text)
return f
def findPLinkText(driver,text):
f = driver.find_element_by_partial_link_text(text)
return f
def findXpath(driver,xpath):
f = driverfind_element_by_xpath(xpath)
return f
def findCss(driver,css):
f = driver.find_element_by_css_selector(css)
return f
'''定位一组元素封装'''
def findsId(driver,id):
f = driver.find_elements_by_id(id)
return f
def findsName(driver,name):
f = driver.find_elements_by_name(name)
return f
def findsClassName(driver,name):
f = driver.find_elements_by_class_name(name)
return f
def findsTagName(driver,name):
f = driver.find_elements_by_tag_name(name)
return f
def findsLinkText(driver,text):
f = driver.find_elements_by_link_text(text)
return f
def findsPLinkText(driver,text):
f = driver.find_elements_by_partial_link_text(text)
return f
def findsXpath(driver,xpath):
f = driver.find_elements_by_xpath(xpath)
return f
def findsCss(driver,css):
f = driver.find_elements_by_css_selector(css)
return f
方法封装完成下面就可以直接在具体的测试脚本中调用了。
baidu.py
#coding=utf-8
from selenium import webdriver
import time
import sys
sys.path.append("\package")
from package import location
#调用 location.py 文件的定位方法
we = location
dr = webdriver.Chrome()
dr.get('http://www.baidu.com')
#调用封装的方法
we.findId(dr,"kw").send_keys('selenium')
time.sleep(2)
we.findId(dr,"su").click()
time.sleep(2)
dr.quit()
findId(dr,”kw”)
这定位方法的传参与上面例子稍有差别,上面的例子中我们只传输入了定位的元素属性 findId(”kw”),而这里却转输入一个参数 dr (浏览器驱动),因为我们同样的脚本需要在不同的浏览器下运行,那么在封装的方法里就不能把驱动写死,所以,我们需要对调用的定位方法传入不同的浏览器驱动。
下面以本节的内容为基础,来封装你的 webdriver api
未完待续关注我,每天持续更新软件测试小知识
软件测试打卡交流聚集地点我,每天都有教学直播【暗号:CSDN】
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)