Python并发和并行方案
在Python世界有3种并发和并行方案,如下:
多线程(threading)
多进程(mulTIprocessing)
异步IO(asyncio)
注: 并发和并行的区别先不提,最后会借着例子更好的解释,另外稍后也会提到 concurrent.futures,不过它不是一种独立的方案,所以在这里没有列出来。
这些方案是为了解决不同特点的性能瓶颈。性能问题主要有2种:
CPU密集型(CPU-bound)。这也就是指计算密集型任务,它的特点是需要要进行大量的计算。例如Python内置对象的各种方法的执行,科学计算,视频转码等等。
I/O密集型(I/O-bound)。凡是涉及到网络、内存访问、磁盘I/O等的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待I/O *** 作完成。例如数据库连接、Web服务、文件读写等等。
如果你不知道一个任务哪种类型,我的经验是你问问自己,如果给你一个更好更快的CPU它可以更快,那么这就是一个CPU密集的任务,否则就是I/O密集的任务。
这三个方案中对于CPU密集型的任务,优化方案只有一种,就是使用多进程充分利用多核CPU一起完成任务,达到提速的目的。而对于I/O密集型的任务,则这三种方案都可以。
接着借着一个抓取网页并写入本地(典型的I/O密集型任务)小例子来挨个拆解对比一下这些方案。先看例子:
import requests
url
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36' # noqa
}
def fetch(session, page):
with (session.get(f'{url}{page*25}', headers=headers) as r,
open(f'top250-{page}.html', 'w') as f):
f.write(r.text)
def main():
with requests.Session() as session:
for p in range(25):
fetch(session, p)
if __name__ == '__main__':
main()
在这个例子中会抓取豆瓣电影Top250的25个页面(每页显示10个电影),使用requests库,不同页面按顺序请求,一共花了3.9秒:
➜ TIme python io_non_concurrent.py
python io_non_concurrent.py 0.23s user 0.05s system 7% cpu 3.911 total
这个速度虽然看起来还是很好的,一方面是豆瓣做了很好的优化,一方面我家的带宽网速也比较好。接着用上面三种方案优化看看效果。
多进程版本
Python解释器使用单进程,如果服务器或者你的电脑是多核的,这么用其实是很浪费的,所以可以通过多进程提速:
from mulTIprocessing import Pool
def main():
with (Pool() as pool,
requests.Session() as session):
pool.starmap(fetch, [(session, p) for p in range(25)])
注: 这里省略到了那些上面已经出现的了代码,只展示改变了的那部分。
使用多进程池,但没指定进程数量,所以会按着Macbook的核数启动10个进程一起工作,耗时如下:
➜ TIme python use_multiprocessing.py
python use_multiprocessing.py 2.15s user 0.30s system 232% cpu 1.023 total
多进程理论上可以有十倍效率的提升,因为10个进程在一起执行任务。当然由于任务数量是25,不是整数倍,是无法达到10倍的降低耗时,而且由于抓取太快了,没有充分显示多进程方案下的效率提升,所以用时1秒,也就是大约4倍的效率提升。
多进程方案下没有明显的缺点,只要机器够强悍,就可以更快。
多线程版本
Python解释器不是线程安全的,为此Python设计了GIL: 获得GIL锁才可以访问线程中的Python对象。所以在任何一个时间,只有一个线程可以执行代码,这样就不会引发竞态条件(Race Condition) ,虽然GIL的问题很多,但是GIL却是还有它存在的优点,例如简化了内存管理等等,这些不是本文重点所以就不展开了,有兴趣的可以专门去了解。
那么有同学会问,既然同一时间永远只有一个线程在工作,那么多线程可以提高并发效率的原因是什么呢?
解释这个问题还是要提GIL。延伸阅读链接1《Understanding the Python GIL》中做了很好的解释(这里要注意,我们提的方案是Python 3.2新的GIL,而不是Python2的旧版GIL,现在网上有很多针对旧的GIL的描述,其实是过时的,这部分也可以看看延伸阅读链接2的文章帮助理解它们的区别),我截其中几张PPT来说明:
在上图里,本来只有1个线程,所以不需要释放或者获得GIL,但是接着出现了第二个线程,这样就是多个线程,一开始线程2是挂起状态,因为它没有GIL。
线程1在一个 cv_wait周期内会自愿的放弃GIL,例如出现了I/O阻塞,或者超时了(线程不能一直拿着不放,即便在一个周期内没有出现I/O阻塞也要强制释放执行权,这个默认时间是5毫秒,可以通过 sys.setswitchinterval设置,当然设置前你得知道你在做什么)都会触发这个释放GIL的 *** 作。
这里演示了常规的例子(非超时被迫释放),在 cv_wait阶段,线程1由于遇到了I/O阻塞,会发送信号给线程2,此时线程1让出GIL并挂起,而线程2获得GIL,如此循环,之后线程2会释放GIL给线程1。这个PPT在业界非常知名,建议大家多看看。之后的PPT还列举了超时的处理,由于和我们这篇文章关系稍远也不展开了,有兴趣的接着看。btw,我第一次看这个PPT觉得这个超时时间好可怕,也就是说1秒钟要最少切换200次,这也太浪费了,所以你可以尝试在代码中调大这个超时时间哟。
通过上面的内容,多线程通过GIL的控制,每个线程都得到了更好的执行时机,所以不会出现被某个线程任务一直阻塞,因为如果线程遇到阻塞会自愿让出GIL让自己挂起,把机会让给其他线程,这样就提高了执行任务总体的效率。多线程模式下最完美的场景就是任何时间点对应的线程都在做事,而不是有的线程其实等着被执行,但是实际上却被阻塞着。
我们看一下多线程的方案:
from multiprocessing.pool import ThreadPool
def main():
with (ThreadPool(processes=5) as pool,
requests.Session() as session):
pool.starmap(fetch, [(session, p) for p in range(25)])
这里说明2点:
多进程和多线程例子中我都使用了【池】,这是一个好的习惯,因为线(进)程过多会带来额外的开销,其中包括创建销毁的开销、调度开销等等,同时也降低了计算机的整体性能。使用线(进)程池维护多个线(进)程,等待监督管理者分配可并发执行的任务。这样一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。另外用标准库里的进程池和线程池的实现写额外代码极少,而且代码结构还很像,特别适合写对比的例子。
processes如果不指定也是和CPU核数一致的10,但是并不是线程越多越好,因为线程多了,反而出现本来正常有效的执行却被GIL强制释放,这就造成多余上下文切换反而是一个负担了。
在这个例子中,线程数为5,这个其实一方面是经验,一方面是多次调试值的结果,所以这也暴露了多线程编程中如果稍有不慎会让优化变差,也会存在没有找到最优值得问题,因为GIL控制线程是一个黑盒 *** 作,开发者无法直接控制,这哪怕对一些相对有经验的Python开发也非常不友好。
我们看一下时间:
➜ time python use_threading.py
python use_threading.py 0.62s user 0.24s system 74% cpu 1.157 total
可以看到,多线程方案下比原始方案速度快了一倍以上,但是比多进程方案差一点(事实上我认为在真实的例子中会差很多)。这是因为在多进程方案下多核CPU都在独立工作,而多线程方案一方面由于效率问题下不能使用那么多数量的线程,而且由于GIL的限制,在不需要被释放GIL的时候依然被强制释放,就这么不断的切换的过程中反而降低了效率,让效果大打折扣。
concurrent.futures版本
这里也顺便提一下 concurrent.futures的方案。其实它不是一个全新的方案,这是在其他语言(例如Java)里早就出现的一种框架,可以通过它控制线(进)程的启动、执行和关闭。我把它理解为抽象了多进程池和多线程池的代码,让开发者不需要关注多线程和多进程模块的具体细节和用法。其实理解起来也不难,你可以这么拆解:
其实理解起来也不难,例如ThreadPoolExecutor可以这么拆解: ThreadPoolExecutor=Thread+Pool+Executor,其实就是线程+池+执行器。就是预先创建一个线程池用来被重复使用,Executor将任务提交和任务执行进行解耦,它完成线程的调配(如何以及何时)和任务的执行部分。
如果你想了解它的细节,我推荐直接看它的源码文件头部的注释,里面对于数据流有非常详细的说明,可以说比任何技术文章写的都要深入准确了。
这里只演示一下ThreadPoolExecutor的用法:
from functools import partial
from concurrent.futures import ThreadPoolExecutor
def main():
with (ThreadPoolExecutor(max_workers=5) as pool,
requests.Session() as session):
list(pool.map(partial(fetch, session), range(25)))
是不是很熟悉的配方?接口和上面用的进程池线程池都很像,但是要注意 max_workers如果不指定的话数量是CPU个数+4,最大为32。它和多线程的用法问题一样,这个 max_workers需要调优(这里为了对比,所以用了相同的数值)。
➜ time python use_executor.py
python use_executor.py 0.63s user 0.32s system 82% cpu 1.153 total
虽然 concurrent.futures是现在更主流的方案,但是在我使用的体验里,它的效率要略低于直接使用进程池或者线程池的代码,因为它高度抽象,却把事情搞得复杂了,例如用到了对应的queue(queue模块)和信号量(Semaphore),这些反而限制了性能的提升。所以我的建议是,Python初学者可以用它,但高级开发者应该自己控制并发实现。
asyncio版本
前面的多线程相关的方案中,需要开发者根据经验或者去实验,找到一个(或者多个)最优的线程数量,不同的场景这个值区别是很大的,这对于初学者很不友好,非常容易陷入【在用多线程,但是用错了或者用的不够好】这么一种境地。
后来Python引入了新的并发模型: aysncio,本小节给大家解释下最新的asyncio方案为什么是一个更优的选择。首先还是看《Understanding the Python GIL》里面的一页PPT:
我们回忆一下,它提到当只有单个线程时,实际上不会触发GIL,这个独立的线程可以一直执行下去。这也是asyncio找到的切入点: 因为是单进程单线程的,所以理论上不受GIL的限制。在事件驱动的机制下,可以更好的利用单线程的性能,尤其是通过await关键词可以让开发者自己决定调度方案,而不是多线程那种由GIL来控制。
那设想一下,在最美好的情况下,所有await的地方都是可能的I/O阻塞的。那么在执行时,遇到I/O阻塞就可以切换协程,执行其他可以继续执行的任务,所以,这个线程一直都在工作而不会阻塞,可以说利用率达到100%!这是多线程方案下永远不可及的。
讲到这个,我们再回去重新整理和理解一遍,先出基本理论开始。
协程
协程是一种特殊函数,这个函数在本来的def关键字前面加了async关键字,本质上它是生成器函数,可以生成值或者接收外面发送(通过send方法)来的值,但是它最重要的特点是它可以在需要时保存上下文(或者说状态),挂起自己并将控制权交给调用者,由于它保存了挂起时的上下文,在未来可以接着被执行。
其实在调用协程是,它并不会立刻执行:
In : async def a():
...: print('Called')
...:
In : a() # 并未执行,只是返回了协程对象
Out:
In : await a() # 使用await才会真的执行
Called
异步和并发
异步(asynchronous)、非阻塞(non-blocking)、并发(concurrent)是很容易让人产生迷惑的词。结合asyncio场景,我的理解是:
协程是异步执行的,在asyncio中,协程可以在等待执行结果时把自己【暂停】,以便让其他协程同时运行。
异步让执行不需要等待阻塞的逻辑完成就可以先让其他代码同时运行,所以这样就不会【阻塞】其他代码,那么这就是【非阻塞】的代码
使用异步代码编写的程序执行时,看起来其中的任务都在同时执行和完成(因为会在等待中切换),所以看起来是【并发】的
事件循环(EventLoop)
Event Loop这个概念其实我理解了很多年,从Twisted时代开始。我一直觉得它非常神秘复杂,现在看来其实想多了。对于初学者,不如换个思路,它的重点就是事件+循环: Loop是一个环,每个任务作为一个事件放到这个环上,事件会不断地循环,在符合条件的情况下触发执行事件。它的特点如下:
一个事件循环运行在一个线程中
Awaitables对象(协程、Task、Future下面都会提到)都可以注册到事件循环上
如果协程中调用了另外一个协程(通过await),这个协程会挂起,发生上下文切换转而去执行另外这个协程,如此循环
如果协程执行时遇到I/O阻塞,这个协程会带着上下文挂起,然后把控制权交还给EventLoop
既然是loop。注册的全部事件执行完毕后,循环会重新开始
Future/Task
asyncio.Future我觉得像Javascript里面的 Promise, 它是一个占位对象,代表一件还没有做完的事情,在未来才会实现或者完成(当然还可能由于内部出错而抛出异常)。它和上面提的 concurrent.futures方案中实现的 concurrent.futures.Futures很像,但是针对asyncio的事件循环做了很多定制。asyncio.Future它仅仅是一个数据的容器。
asyncio.Task是 asyncio.Future的子类,它用于在事件循环中运行协程。
在官方文档中提到了一个非常直观的例子,我这里改写它在IPython里面执行并说明:
In : async def set_after(fut): # 创建一个协程,他会异步的sleep3秒,然后给future对象设置结果
...: await asyncio.sleep(3)
...: fut.set_result('Done')
...:
In : loop = asyncio.get_event_loop() # 获取当前的事件循环
In : fut = loop.create_future() # 在事件循环中创建一个Future
In : fut # 此时它还是默认的pending状态,因为没有调用它
Out:
In : task = loop.create_task(set_after(fut)) # 在事件循环中创建(或者说注册)了一个任务
In : task # 马上输入它,此时刚创建任务,还在执行中
Out:
In : fut # 马上输入它,此时刚创建任务,还没有执行完所以future没有变化
Out:
In : task # 过了三秒,任务执行完成了
Out:
In : fut # Future也已经设置了结果,所以状态是finished
Out:
可以感受到:
Future对象不是任务,就是存放状态的一个容器
create_task会让事件循环调度协程的执行
创建任务可以用 ensure_future和 create_task, ensure_future是一个更高级封装的函数,但是Python3.7以上版本应该使用 create_task
接着是了解await的作用。如果协程中await一个Future对象,Task会暂停协程的执行并等待Future的完成。而当Future完成后,包装协程的执行将继续:
In : async def a():
...: print('IN a')
...: await b()
...: await c()
...: print('OUT a')
...:
In : async def b():
...: print('IN b')
...: await d()
...: print('OUT b')
...:
...:
In : async def c():
...: print('IN c')
...: await asyncio.sleep(1)
...: print('OUT c')
...:
...:
In : async def d():
...: print('IN d')
...: await asyncio.sleep(1)
...: print('OUT d')
...:
In : asyncio.run(a())
IN a
IN b
IN d
OUT d
OUT b
IN c
OUT c
OUT a
这个例子中,a的入口函数,其中调用b和c,b又会调用d。await会让对应的协程获取执行权限,协程内await的其他协程都执行完毕才会释放权限,所以注意这个更像DFS(深度优先搜索),所以执行顺序是a->b->d->c。
所以这里就得出结论:
事件循环负责协程的协作调度:事件循环一次运行一个任务。当一个任务等待一个Awaitables对象完成时,事件循环会运行其他任务、回调或执行 IO *** 作。
asyncio方案
在asyncio方案里,凡是涉及I/O阻塞 *** 作的库都要使用aio生态中的库,所以已经不能再使用requests库,而是需要使用aiohttp,另外文件 *** 作需要使用aiofiles。最终代码如下(这个2个包需要下载再使用):
import aiofiles
import asyncio
import aiohttp
async def fetch(session, page):
r = await session.get(f'{url}{page*25}', headers=headers)
async with aiofiles.open(f'top250-{page}.html', mode='w') as f:
await f.write(await r.text())
async def main():
loop = asyncio.get_event_loop()
async with aiohttp.ClientSession(loop=loop) as session:
tasks = [asyncio.ensure_future(fetch(session, p)) for p in range(25)]
await asyncio.gather(*tasks)
if __name__ == '__main__':
asyncio.run(main())
看一下效率:
➜ time python use_asyncio.py
python use_asyncio.py 0.20s user 0.04s system 34% cpu 0.684 total
所以asyncio的优点如下:
asyncio用好了,是这些并发方案中最快的
它支持数千级别的活动连接,这对于websockets和MQTT之类的场景下性能可以表现的很好,而多线程方案中在这个规模的线程数量下会出现严重的性能问题。
多线程方案下线程切换是隐式的,我们无法确认它何时会切换线程的执行权,所以非常容易出现竞态条件(Race Condition)。而asyncio方案里协程的切换是显式、明确的,开发者可以明确地获知或者指定执行的顺序
并发和并行
我之前翻到了一个对比这些方案的说法(延伸链接4),其中也提到了并发和并行,说的特别形象,我加以说明:
多进程。10个厨房,10个厨子,10道菜。也是1个厨房1厨子做1道菜。
多线程。1个厨房,10个厨子,10道菜。因为厨房比较小,只能大家一起挤在里面,事实上是轮着做,而且一个厨师在做的时候其他人只能等着轮到自己。
asyncio。1个厨房,1个厨子,10道菜。听起来好像这就是一个顺序执行,但事实上,当某道菜需要炖或者其他什么耗时的烹饪方法时,可以同时做其他的菜或者做准备,最美好的场景是这个厨师一直在忙着做。
对于并发和并行我推荐看一下延伸阅读连接3的文章。并发(Concurrency)允许同时执行多个任务,这些任务可能访问相同的共享资源,例如硬盘、网络以及对应的那个单核CPU。既然会出现访问共享资源,就可能出现竞态条件,所以某个时间点事实上只有一个任务在执行,在本质上目标是当一个任务被迫等待外部资源时,通过在它们之间切换来防止任务相互阻塞,系统会有机制保证这些任务都在推进。并行(Parallelism)是指多个任务在独立分区的资源(如多个CPU内核)上并行运行,这样可以最大限度地利用硬件资源。
审核编辑:刘清
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)