目录
一、线程创建的开销与线程池
1.线程创建的开销
2.线程池的设计思路
二、Python线程池及其原理和使用
1.线程池的使用
2.示例代码
一、线程创建的开销与线程池 1.线程创建的开销
对 *** 作系统来说,创建一个线程的代价是十分昂贵的,需要给它分配内存、列入调度,同时在线程切换的时候还要执行内存换页,CPU 的缓存被清空,切换回来的时候还要重新从内存中读取信息,破坏了数据的局部性。【分配内存、列入调度、内存换页、清空缓存和重新读取】
关于内存开销
Java线程的线程栈区别于堆,它是不受Java程序控制的,只受系统资源限制。默认一个线程的线程栈大小是1M,别小看这1M的空间,如果每个用户请求都新建线程的话,1024个用户光线程就占用了1个G的内存,如果系统比较大的话,一下子系统资源就不够用了,最后程序就崩溃了。
如果不对线程的创建进行有效监控、管理,风险巨大:
- 频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。
- 对资源无限申请缺少抑制手段,将引发系统资源耗尽的风险。
- 系统无法合理管理内部的资源分布,将降低系统的稳定性。
正是因为如此大的开销,所以几乎所有的servlet容器、web框架、rpc框架都会采用线程池化技术去提高资源复用,限制线程的随意创建。
2.线程池的设计思路首先,我们先自己想想,如果是我们自己来解决线程创建开销大的问题,会去怎么解决?下面的分析思路是比较合理,符合逻辑的:
- 资源复用:尽可能的复用资源,这里的资源当然是线程资源,提前预热全部或部分线程,由一个组件负责线程的分配和调度执行。
- 限制线程数:限制线程数是为了保护系统,避免线程创建过多导致内存不足。
这里有一个问题是,当线程不足分配时怎么办?为此有几个解决思路:
- 由提交者自行执行任务。
- 抛异常。
- 丢弃任务。可以是丢弃最老的任务或最新提交的任务。
实际上,我们上面的分析思路正好是池化管理思想的提现,线程池就是依据以上的思路进行设计的。采用这种思路有几个好处:
- 降低资源开销:由于线程是反复利用的,降低了创建线程分配资源和销毁线程的开销。
- 提高响应速度:由于线程是提前预热的,因此减少了创建线程这段时间的开销。
- 提高系统资源可管理性:使用线程池统一分配、管理和监控。
线程池在系统启动时即创建大量空闲的线程,程序只要将一个函数提交给线程池,线程池就会启动一个空闲的线程来执行它。当该函数执行结束后,该线程并不会死亡,而是再次返回到线程池中变成空闲状态,等待执行下一个函数。
此外,使用线程池可以有效地控制系统中并发线程的数量。当系统中包含有大量的并发线程时,会导致系统性能急剧下降,甚至导致 Python 解释器崩溃,而线程池的最大线程数参数可以控制系统中并发线程的数量不超过此数。
线程池的基类是 concurrent.futures 模块中的 Executor,Executor 提供了两个子类,即 ThreadPoolExecutor 和 ProcessPoolExecutor,其中 ThreadPoolExecutor 用于创建线程池,而 ProcessPoolExecutor 用于创建进程池。
如果使用线程池/进程池来管理并发编程,那么只要将相应的 task 函数提交给线程池/进程池,剩下的事情就由线程池/进程池来搞定。
Exectuor 提供了如下常用方法:
- submit(fn, *args, **kwargs):将 fn 函数提交给线程池。*args 代表传给 fn 函数的参数,*kwargs 代表以关键字参数的形式为 fn 函数传入参数。
- map(func, *iterables, timeout=None, chunksize=1):该函数类似于全局函数 map(func, *iterables),只是该函数将会启动多个线程,以异步方式立即对 iterables 执行 map 处理。
- shutdown(wait=True):关闭线程池。
程序将 task 函数提交(submit)给线程池后,submit 方法会返回一个 Future 对象,Future 类主要用于获取线程任务函数的返回值。由于线程任务会在新线程中以异步方式执行,因此,线程执行的函数相当于一个“将来完成”的任务,所以 Python 使用 Future 来代表。
Future 提供了如下方法:
- cancel():取消该 Future 代表的线程任务。如果该任务正在执行,不可取消,则该方法返回 False;否则,程序会取消该任务,并返回 True。
- cancelled():返回 Future 代表的线程任务是否被成功取消。
- running():如果该 Future 代表的线程任务正在执行、不可被取消,该方法返回 True。
- done():如果该 Funture 代表的线程任务被成功取消或执行完成,则该方法返回 True。
- result(timeout=None):获取该 Future 代表的线程任务最后返回的结果。如果 Future 代表的线程任务还未完成,该方法将会阻塞当前线程,其中 timeout 参数指定最多阻塞多少秒。
- exception(timeout=None):获取该 Future 代表的线程任务所引发的异常。如果该任务成功完成,没有异常,则该方法返回 None。
- add_done_callback(fn):为该 Future 代表的线程任务注册一个“回调函数”,当该任务成功完成时,程序会自动触发该 fn 函数。
在用完一个线程池后,应该调用该线程池的 shutdown() 方法,该方法将启动线程池的关闭序列。调用 shutdown() 方法后的线程池不再接收新任务,但会将以前所有的已提交任务执行完成。当线程池中的所有任务都执行完成后,该线程池中的所有线程都会死亡。
使用线程池来执行线程任务的步骤如下:
- 调用 ThreadPoolExecutor 类的构造器创建一个线程池。
- 定义一个普通函数作为线程任务。
- 调用 ThreadPoolExecutor 对象的 submit() 方法来提交线程任务。
- 当不想提交任何任务时,调用 ThreadPoolExecutor 对象的 shutdown() 方法来关闭线程池。
下面程序示范了如何使用线程池来执行线程任务:
# -*- coding: UTF-8 -*-
"""
@Time : 2022/5/7 14:32
@Auth : rs
"""
import time
import threading
from concurrent.futures import ThreadPoolExecutor
from logzero import logger
# 定义一个准备作为线程任务的函数
def action(max_num):
my_sum = 0
for i in range(max_num):
logger.info(threading.current_thread().name + ' ' + str(i))
my_sum += i
return my_sum
# 创建一个包含2条线程的线程池
pool = ThreadPoolExecutor(max_workers=2)
# 向线程池提交一个task, 50会作为action()函数的参数
future1 = pool.submit(action, 50)
# 向线程池再提交一个task, 100会作为action()函数的参数
future2 = pool.submit(action, 100)
# 判断future1代表的任务是否结束
logger.info(future1.done())
time.sleep(3)
# 判断future2代表的任务是否结束
logger.info(future2.done())
# 查看future1代表的任务返回的结果
logger.info(future1.result())
# 查看future2代表的任务返回的结果
logger.info(future2.result())
# 关闭线程池
pool.shutdown()
参考:
线程创建的开销与线程池 - 知乎
Python线程池及其原理和使用(超级详细)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)