- 1. 多线程
- 1.1 基础
- 1.2 线程安全
- 1.3 线程锁
- 1.4 死锁
- 1.5 线程池
- 1.6 单例模式&多线程
- 2. 多进程
- 2.1 基础
- 2.2 常见功能
- 2.3 数据共享
- 2.4 进程锁
- 2.5 进程池
- 3. 协程(了解向)
b站学习视频: 彻底搞懂python进程和线程
并发编程基础: Python并发编程基础
一个程序,至少有一个进程,一个进程中至少有一个线程,最终是线程在工作。
线程,是计算机中可以被cpu调度的最小单元(真正在工作)
进程,是计算机资源分配的最小单元(进程为线程提供资源)
GIL(Global Interpreter Lock,全局解释器锁),是CPython(Python解释器有很多,如:CPython-基于C语言、JPython-基于Java、PyPy等,详见:Python解释器有哪些)解释器特有的,一个进程中同一时刻,只有一个线程能够被调用,防止对同一对象进行 *** 作时引发bug。
因此如果想要利用CPU多核的优势,则可以创建多个进程,多个进程可以同时进行运算(计算开销较大的任务,计算密集型,需进行大量计算 *** 作)。
不需要用到计算机的多核优势,采用多线程即可(IO密集型,如数据读写、网页访问并获取结果等)。
import threading
def task(arg):
pass
# 创建一个thread对象(线程),并封装线程被CPU调度是应该执行的任务和相关参数
t = threading.Thread(target=task, args=("xxx", ))
# 线程准备就绪(等待CPU调度),代码继续向下执行
t.start()
print("继续执行...")
# 主线程执行完所有代码,不结束(等待子线程)
# 线程的常见方法
# t.start(),当前线程准备就绪(等待CPU调度,具体时间由CPU来决定)
import threading
loop = 100000000
number = 0
def _add(count):
global number
for i in range(count):
number += 1
t = threading.Thread(target=_add, args=(loop,))
t.start()
print(number)
# t.join()等待当前线程的任务执行完毕后再向下继续执行
import threading
number = 0
def _add():
global number
for i in range(1000000000):
number += 1
t = threading.Thread(target=_add))
t.start()
t.join() # 主线程等待中...
print(number)
import threading
number = 0
def _add():
global number
for i in range(1000000000):
number += 1
def _sub():
global number
for i in range(1000000000):
number -= 1
t1 = threading.Thread(target=_add))
t2 = threading.Thread(target=_sub))
t1.start()
t1.join() # t1线程执行完毕,才继续往后走
t2.start()
t2.join() # t2线程执行完毕,才继续往后走
print(number)
import threading
number = 0
def _add():
global number
for i in range(1000000000):
number += 1
def _sub():
global number
for i in range(1000000000):
number -= 1
t1 = threading.Thread(target=_add))
t2 = threading.Thread(target=_sub))
# 过程中存在线程切换(加、减都是对number对象进行的 *** 作,所以在线程切换过程中,会导致无法获取预期结果,会造成数据混乱),从而导致数据错误
t1.start()
t2.start()
t1.join() # t1线程执行完毕,才继续往后走
t2.join() # t2线程执行完毕,才继续往后走
print(number)
# t.setDaemon(布尔值),守护线程(必须放在start之前)
# t.setDaemon(True),设置为守护线程,主线程执行完毕后,子线程也自动关闭
# t.setDaemon(False),设置为非守护线程,主线程等待子线程,子线程执行完毕后,主线程才结束。
(默认)
import threading
import time
def task(arg):
time.sleep(5)
print("任务")
t1 = threading.Thread(target=task, args=(11,)))
t.setDaemon(True) # True/False
t.start()
print("END")
# 线程名称的设置和获取
import threading
def task(arg):
# 获取当前执行此代码的线程
name = threading.current_thread().getName()
print(name)
for i in range(10):
t = threading.Thread(target=task, args=(11, ))
t.setName("日魔-{}".format(i))
t.start()
# 自定义线程类,直接将线程需要做的事写到run方法中
import threading
class MyThread(threading.Thread):
def run(self):
print("执行此线程", self._args)
t = MyThread(args=(100,)))
t.start()
import requests
import threading
class DouYinThread(threading.Thread):
def run(self):
file_name, video_url = self._args
res = requests.get(video_url)
with open(file_name, mode="wb") as f:
f.write(res.content)
url_list = [
("东北F4模仿秀.mp4", "https://aweme.snssdk.com/aweme/v1/playwm/?video_id=v0300f570000bvbmace0gvch7lo53oog"),
("卡特扣篮.mp4", "https://aweme.snssdk.com/aweme/v1/playwm/?video_id=v0200f3e0000bv52fpn5t6p007e34qlg"),
("罗斯mvp.mp4", "https://aweme.snssdk.com/aweme/v1/playwm/?video_id=v0200f240000buuer5aa4tij4gv6ajqg")
]
for item in url_list:
t = DouYinThread(args=(item[0], item[1]))
t.start()
1.2 线程安全
一个进程中可以有多个线程,且线程共享所有进程中的资源
多个线程同时去 *** 作一个对象,会导致数据混乱的现象,如1.1中提到的包含+-运算的两个多线程任务。
# 1. 加锁保证线程安全,申请到锁,释放后,别的线程才能申请释放锁,都为同一把锁
# 案例1:
import threading
loop = 10000000
number = 0
# 全局定义一个锁,用同一个锁才能起到安全保护的↔作用
lock_obj = threading.Lock()
def _add(count):
# 申请锁
lock_obj.acquire()
global number
for i in range(count):
number += 1
# 释放锁
lock_obj.release()
def _sub(count):
# 申请锁
lock_obj.acquire()
global number
for i in range(count):
number -= 1
# 释放锁
lock_obj.release()
t1 = threading.Thread(target=_add, args=(loop,))
t2 = threading.Thread(target=_sub, args=(loop,))
t1.start()
t2.start()
t1.join()
t2.join()
print(number)
# 案例2:
import threading
number = 0
# 全局定义一个锁,用同一个锁才能起到安全保护的↔作用
lock_obj = threading.Lock()
def task():
# 申请锁
lock_obj.acquire()
global number
for i in range(100000000):
number += 1
# 释放锁
lock_obj.release()
print(number)
for i in range(2):
t = threading.Thread(target=task)
t.start()
# 锁申请释放的写法还可以是with lock_obj:
import threading
number = 0
# 全局定义一个锁,用同一个锁才能起到安全保护的↔作用
lock_obj = threading.Lock()
def task():
# 申请锁,并在执行完之后释放锁
with lock_obj:
global number
for i in range(100000000):
number += 1
print(number)
for i in range(2):
t = threading.Thread(target=task)
t.start()
# 在python中许多数据类型集成了锁的机制,在使用时不需要进行自行设置锁
# 例如:列表的append *** 作
import threading
data_list = []
def task():
print("开始")
for i in range(1000000):
data_list.append(i)
print(len(data_list))
for i in range(2):
t = threading.Thread(target=task)
t.start()
"""
1. 默认线程安全的 *** 作
List1.apend(x)
List1.extend(List2)
x = List1[i]
x = List1.pop()
List1[i:j] = L2
List1.sort()
x = y
x.field = y
dict1[x] = y
dict1.update(dict2)
dict1.keys()
2. 默认不安全的 *** 作(需要手动加锁)
i = i + 1
List1.append(List1[-1])
List1[i] = List1[j]
dict1[x] = dict1[x] + 1
需要通过官方开发文档查询,那些 *** 作不会造成数据混乱
"""
1.3 线程锁
在程序中手动加锁,一般有两种:Lock和RLock
# Lock(同步锁)就是上面介绍的锁,RLock(递归锁),和Lock用法类似
# RLock
import threading
number = 0
# 全局定义一个锁,用同一个锁才能起到安全保护的↔作用
lock_obj = threading.RLock()
def task():
# 申请锁
lock_obj.acquire()
global number
for i in range(100000000):
number += 1
# 释放锁
lock_obj.release()
print(number)
for i in range(2):
t = threading.Thread(target=task)
t.start()
# Lock不支持锁的嵌套,RLock支持锁的嵌套,锁的嵌套如下
import threading
lock_obj = threading.RLock()
def task():
print("开始")
# 申请锁
lock_obj.acquire()
lock_obj.acquire()
print(123)
# 释放锁
lock_obj.release()
lock_obj.release()
for i in range(3):
t = threading.Thread(target=task)
t.start()
# 若开发过程中,存在函数嵌套,则需要用RLock,虽然Lock单次申请释放快于RLOCK,但是容易因为嵌套问题导致死锁,因此在开发过程中常用RLock
import threading
lock_obj = threading.RLock()
import threading
lock_obj = threading.RLock()
# 程序员A开发了一个函数,函数可以被其他开发者调用,
def func():
with lock_obj:
pass
# 程序员B开发了一个函数,可以直接调用这个函数
def run():
print("其他功能")
func()
print("其他功能")
# 程序员C开发了一个函数,自己需要加锁,同时也需要调用func函数
def Process():
with lock_obj:
print("其他功能")
func()
print("其他功能")
1.4 死锁
# 除了上述死锁现象,还存在以下容易发生死锁的现象
import threading
import time
lock1 = threading.RLock()
lock2 = threading.RLock()
def task1():
lock1.acquire()
time.sleep(1)
lock2.acquire()
print(11)
lock2.release()
print(111)
lock1.release()
print(111)
def task2():
lock2.acquire()
time.sleep(1)
lock1.acquire()
print(11)
lock1.release()
print(111)
lock2.release()
print(111)
t1 = threading.Thread(target=task1)
t1.start()
t2 = threading.Thread(target=task1)
t2.start()
1.5 线程池
# 线程开得过多,也会导致程序运行速度降低
# 因此需要引入线程池,重复利用子线程,避免大量线程的开启和释放
# 示例1
import time
from concurrent.futures import ThreadPoolExecutor
def task(video_url):
print("开始执行任务", video_url)
time.sleep(5)
# 创建线程池,最多维护10个线程
pool = ThreadPoolExecutor(10)
url_list = ["www.xxx-{}.com".format(i) for i in range(300)]
for url in url_list:
# 在线程池中提交一个任务,线程池中如果有空闲线程,则分配一个线程执行任务
pool.submit(task, url, 2)
print("END")
# 示例2:等待线程池的任务执行完毕
import time
from concurrent.futures import ThreadPoolExecutor
def task(video_url):
print("开始执行任务", video_url)
time.sleep(5)
# 创建线程池,最多维护10个线程
pool = ThreadPoolExecutor(10)
url_list = ["www.xxx-{}.com".format(i) for i in range(300)]
for url in url_list:
# 在线程池中提交一个任务,线程池中如果有空闲线程,则分配一个线程执行任务
pool.submit(task, url)
print("执行中......")
pool.shutdown(True) # 等待线程池中的任务执行完毕后,再继续执行后面的任务
print("继续往下走")
# 示例3:任务执行完后,再干点其他事
import time
import random
from concurrent.futures import ThreadPoolExecutor, Future
def task(video_url):
print("开始执行任务", video_url)
time.sleep(2)
return random.randint(0, 10)
def done(response):
print("任务执行后的返回值", response.result())
def outer(file_name):
def save(response):
print('存储 *** 作')
return save
# 创建线程池,最多维护10个线程
pool = ThreadPoolExecutor(10)
url_list = ["www.xxx-{}.com".format(i) for i in range(300)]
for i in range(len(url_list)):
# 在线程池中提交任务
future = pool.submit(task, url[i])
# 获取线程池的结果,并进行其他 *** 作
future.add_done_callback(done)
file_name = "{}.png".format(i)
# 以文件存储为例,以闭包的形式构建存储函数
future.add_done_callback(outer(file_name))
# 可以做分工,如:task专门下载,done专门将下载的数据写入本地
# 示例4:先执行任务,然后最终统一获取结果
import time
import random
from concurrent.futures import ThreadPoolExecutor, Future
def task(video_url):
print("开始执行任务", video_url)
time.sleep(2)
return random.randint(0, 10)
# 创建线程池,最多维护10个线程
pool = ThreadPoolExecutor(10)
future_list = []
url_list = ["www.xxx-{}.com".format(i) for i in range(300)]
for url in url_list:
# 在线程池中提交任务
future = pool.submit(task, url)
# 获取线程池的结果,并进行其他 *** 作
future_list.append(future)
pool.shutdown()
for fu in future_list:
print(fu.result())
1.6 单例模式&多线程
单例模式的类,每次申明用的都是同一个地址,也会覆盖更新类对象的属性。
# 简单地实现单例模式
class Singleton:
instance = None
def __init__(self, name):
self.name = name
def __new__(cls, *args, **kwargs):
# 返回空对象
if cls.instance:
return cls.instance
cls.instance = object.__new__(cls) # 创建空对象
return cls.instance
obj1 = Singleton("alex")
obj2 = Singleton("SB")
# 会发现二者的地址是一样的
print(obj1, obj2)
# 多线程中的单例模式存在隐患
import threading
import time
class Singleton:
instance = None
def __init__(self, name):
self.name = name
def __new__(cls, *args, **kwargs):
# 返回空对象
if cls.instance:
return cls.instance
time.sleep(0.1)
cls.instance = object.__new__(cls)
return cls.instance
def task():
obj = Singleton("x")
print(obj)
for i in range(10):
# 会无法保证单例模式的特性,有的对象地址会不一样
t = threading.Thread(target=task)
t.start()
# 解决方案,采用锁的方式进行控制
import threading
import time
class Singleton:
instance = None
lock = threading.RLock()
def __init__(self, name):
self.name = name
def __new__(cls, *args, **kwargs):
if cls.instance:
return cls.instance
# 返回空对象
with cls.lock:
if cls.instance:
return cls.instance
cls.instance = object.__new__(cls)
return cls.instance
def task():
obj = Singleton("x")
print(obj)
for i in range(10):
# 会无法保证单例模式的特性,有的对象地址会不一样
t = threading.Thread(target=task)
t.start()
2. 多进程
2.1 基础
进程和进程之间是相互隔离的,利用CPU多核优势采用多进程完成任务。
import multiprocessing
def task():
pass
# 在windows或mac中,使用多进程必须要在一下条件if __name__ == "__main__"下才能正确运行,在linux系统中就不需要,若想要在mac中能够正常运行,也可以将运行设置改为fork
if __name__ == "__main__":
p1 = multiprocessing.Process(target=task)
p1.start()
# 创建进程的三种模式
import multiprocessing
multiprocessing.set_start_method("fork")
# 模式1:fork,拷贝几乎所有的资源,复制对象进行 *** 作,不会改变原来的对象,可以支持对特殊对象的传递,如文件对象、锁对象等。
import multiprocessing
import time
def task():
print(name)
name.append(123)
if __name__ == "__main__":
# 复制对象进行运算
multiprocessing.set_start_method("fork")
name = []
p1 = multiprocessing.Process(target=task)
p1.start()
time.sleep(2)
print(name) # []
import multiprocessing
import time
def task():
print(name)
# 子进程是复制了一个文件对象,已经有了zZz这个内容,再更新alex
file_object.write("alex\n")
# 将zZz和alex写入文件
file_object.flush()
if __name__ == "__main__":
# 复制对象进行运算
multiprocessing.set_start_method("fork")
name = []
file_object = open("x1.txt", "a+", encoding="utf-8")
# 主进程中,在缓存中先写入了一个zZz,还未写入文件
file_object.write("zZz\n")
p1 = multiprocessing.Process(target=task)
# 主进程等待子进程执行完毕后,将内容写入文件
p1.start()
# 文件中被写入的内容为
"""
zZz
alex
zZz
"""
# window只支持spawn模式
# 模式2:spawn,不支持主进程特殊对象的传递,需要在子进程中进行创建
import multiprocessing
import time
def task(name):
print(name)
name.append(123)
if __name__ == "__main__":
# 通过参数传递必备资源,也是拷贝对象
multiprocessing.set_start_method("spawn")
name = []
p1 = multiprocessing.Process(target=task, args=(name, ))
p1.start()
time.sleep(2)
print(name) # []
# 模式3: forkserver
import multiprocessing
import time
def task(name):
print(name)
name.append(123)
if __name__ == "__main__":
# 通过参数传递必备资源,也是拷贝对象
multiprocessing.set_start_method("forkserver")
name = []
p1 = multiprocessing.Process(target=task, args=(name, ))
p1.start()
time.sleep(2)
print(name) # []
2.2 常见功能
进程中的方法和线程都是相似的
# p.start(),当前进程准备就绪,等待被CPU调度()
# p.join(),等待当前进程的任务执行完毕,再向下继续执行
# p.daemon=True/False 守护进程
# 进程名称的设置和获取
import multiprocessing
import time
def task(arg):
time.sleep(2)
print("当前进程名称为: ", multiprocessing.current_process().name)
if __name__ == "__main__":
# 复制对象进行运算
multiprocessing.set_start_method("spawn")
p = multiprocessing.Process(target=task, args=("xxx",))
p.name = "hhhh"
p.start()
print("继续执行")
# 自定义进程类
import multiprocessing
import threading
import time
import os
class MyProcess(multiprocessing.Process):
# 获取进程pid和父进程PID
print(os.getpid(), os.getppid())
# 获取进程中的所有线程总数
print(len(threading.enumerate()))
def run(self) -> None:
print("执行此进程", self._args)
if __name__ == "__main__":
# 复制对象进行运算
multiprocessing.set_start_method("spawn")
p = MyProcess(args=("xxx",))
p.name = "hhhh"
p.start()
print("继续执行")
# 判断CPU个数
import multiprocessing
print(multiprocessing.cpu_count())
2.3 数据共享
# 方式一:基于Value和Array(基本不用这种方式),基于C语言开发的基础
from multiprocessing import Process, Value, Array
def func(n, m1, m2):
n.value = 888
m1.value = "a".encode("utf-8")
m2.value = "舞"
if __name__ == "__main__":
# i表示整数,c表示字符,u表示中文字符
num = Value("i", 666)
v1 = Value("c")
v2 = Value("u")
p = Process(target=func, args=(num, v1, v2))
p.start()
p.join()
print(num.value) # 888
print(v1.value) # a
print(v2.value) # 舞
Value不同字符的含义,C语言中的数据类型,规则也需要跟C语言相对应:
# 方式二:基于Manager(用的较多)
from multiprocessing import Process, Manager
def f(d, l):
d[1] = "1"
d["2"] = 2
d[0.25] = None
l.append(666)
if __name__ == "__main__":
with Manager() as manager:
d = manager.dict()
l = manager.list()
p = Process(target=f, args=(d, l))
p.start()
p.join()
print(d)
print(l)
"""
交换的方式
"""
# 方式三:基于队列(用的较多)
import multiprocessing
def task(q):
for i in range(10):
q.put(i)
if __name__ == "__main__":
queue = multiprocessing.Queue()
p = multiprocessing.Process(target=task, args=(queue, ))
p.start()
p.join()
print("主进程")
print(queue.get()) # 0
print(queue.get()) # 1
print(queue.get()) # 2
print(queue.get()) # 3
print(queue.get()) # 4
# 方式四:基于pipe
import multiprocessing
import time
def task(conn):
time.sleep(1)
conn.send([111, 22, 33, 44])
data = conn.recv() # 阻塞
print("子进程接收: ", data)
time.sleep(2)
if __name__ == "__main__":
parent_conn, child_conn = multiprocessing.Pipe()
p = multiprocessing.Process(target=task, args=(child_conn, ))
p.start()
info = parent_conn.recv() # 阻塞
print("主进程接收: ", info)
parent_conn.send(666)
# 还有很多其他的方式,如redis、数据库等
2.4 进程锁
多个进程共享资源时、对同一个文件进行读写 *** 作时,需要设置进程锁
# spawn模式中,进程锁可以被当作参数传入,线程锁不可以
import multiprocessing
import time
def task(lock):
print("开始")
lock.acquire()
with open("f1.txt", mode="r", encoding="utf-8") as f:
current_num = int(f.read())
print("排队抢票了")
time.sleep(0.5)
current_num -= 1
with open("f1.txt", mode="w", encoding="utf-8") as f:
f.write(str(current_num))
lock.release()
if __name__ == "__main__":
multiprocessing.set_start_method("spawn")
# 进程锁
lock = multiprocessing.RLock()
for i in range(10):
p = multiprocessing.Process(target=task, args=(lock,))
p.start()
# spawn模式,需要特殊处理
time.sleep(7)
2.5 进程池
from concurrent.futures import ProcessPoolExecutor
import time
def task(num):
print("执行", num)
time.sleep(2)
if __name__ == "__main__":
pool = ProcessPoolExecutor(4)
for i in range(10):
pool.submit(task, i)
pool.shutdown(True)
print(1)
from concurrent.futures import ProcessPoolExecutor
import time
def task(num):
print("执行", num)
time.sleep(2)
def done(res):
print(multiprocessing.current_process())
print(res.result())
if __name__ == "__main__":
pool = ProcessPoolExecutor(4)
for i in range(10):
fur = pool.submit(task, i)
# 在线程池中是子线程执行后续函数,而在进程中是主进程来完成
fur.add_done_callback(done)
print(multiprocessing.current_process())
pool.shutdown(True)
print(1)
"""
在线程池中使用锁,必须使用manager.RLock()
"""
3. 协程(了解向)
python中有多种方式可以实现协程
- greenlet
pip3 install greenlet
from greenlet import greenlet
def func1():
print(1)
gr2.switch() # 切换到func2
print(2)
gr2.switch() # 切换到func2,从上一次的位置继续执行
def func2():
print(3)
gr1.switch() # 切换到func1,从上一次的位置继续执行
print(4)
gr1 = greenlet(func1)
gr2 = greenlet(func2)
gr1.switch() # 执行func1
- yield
def func1():
yield 1
yield from func2()
yield 2
def func2():
yield 3
yield 4
f1 = func1()
for item in f1:
print(item)
- asyncio
import asyncio
async def func1():
print(1)
await asyncio.sleep(2)
print(2)
async def func2():
print(3)
await asyncio.sleep(2)
print(4)
tasks = [
asyncio.ensure_future(func1()),
asyncio.ensure_future(func2())
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)