【并发编程】Python并发编程多线程、多进程、多协程进阶篇

【并发编程】Python并发编程多线程、多进程、多协程进阶篇,第1张

文章目录
  • 1. 多线程
    • 1.1 基础
    • 1.2 线程安全
    • 1.3 线程锁
    • 1.4 死锁
    • 1.5 线程池
    • 1.6 单例模式&多线程
  • 2. 多进程
    • 2.1 基础
    • 2.2 常见功能
    • 2.3 数据共享
    • 2.4 进程锁
    • 2.5 进程池
  • 3. 协程(了解向)

b站学习视频: 彻底搞懂python进程和线程
并发编程基础: Python并发编程基础

一个程序,至少有一个进程,一个进程中至少有一个线程,最终是线程在工作。


线程,是计算机中可以被cpu调度的最小单元(真正在工作)
进程,是计算机资源分配的最小单元(进程为线程提供资源)

GIL(Global Interpreter Lock,全局解释器锁),是CPython(Python解释器有很多,如:CPython-基于C语言、JPython-基于Java、PyPy等,详见:Python解释器有哪些)解释器特有的,一个进程中同一时刻,只有一个线程能够被调用,防止对同一对象进行 *** 作时引发bug。


因此如果想要利用CPU多核的优势,则可以创建多个进程,多个进程可以同时进行运算(计算开销较大的任务,计算密集型,需进行大量计算 *** 作)。


不需要用到计算机的多核优势,采用多线程即可(IO密集型,如数据读写、网页访问并获取结果等)。

1. 多线程 1.1 基础
import threading

def task(arg):
	pass

# 创建一个thread对象(线程),并封装线程被CPU调度是应该执行的任务和相关参数
t = threading.Thread(target=task, args=("xxx", ))
# 线程准备就绪(等待CPU调度),代码继续向下执行
t.start()
print("继续执行...")
# 主线程执行完所有代码,不结束(等待子线程)
# 线程的常见方法
# t.start(),当前线程准备就绪(等待CPU调度,具体时间由CPU来决定)
import threading
loop = 100000000
number = 0
def _add(count):
	global number
	for i in range(count):
		number += 1

t = threading.Thread(target=_add, args=(loop,))
t.start()
print(number)
# t.join()等待当前线程的任务执行完毕后再向下继续执行
import threading
number = 0
def _add():
	global number
	for i in range(1000000000):
		number += 1

t = threading.Thread(target=_add))
t.start()
t.join() # 主线程等待中...
print(number)

import threading
number = 0
def _add():
	global number
	for i in range(1000000000):
		number += 1

def _sub():
	global number
	for i in range(1000000000):
		number -= 1

t1 = threading.Thread(target=_add))
t2 = threading.Thread(target=_sub))
t1.start()
t1.join() # t1线程执行完毕,才继续往后走
t2.start()
t2.join() # t2线程执行完毕,才继续往后走
print(number)

import threading
number = 0
def _add():
	global number
	for i in range(1000000000):
		number += 1

def _sub():
	global number
	for i in range(1000000000):
		number -= 1

t1 = threading.Thread(target=_add))
t2 = threading.Thread(target=_sub))
# 过程中存在线程切换(加、减都是对number对象进行的 *** 作,所以在线程切换过程中,会导致无法获取预期结果,会造成数据混乱),从而导致数据错误
t1.start()
t2.start()
t1.join() # t1线程执行完毕,才继续往后走
t2.join() # t2线程执行完毕,才继续往后走
print(number)
# t.setDaemon(布尔值),守护线程(必须放在start之前)
# t.setDaemon(True),设置为守护线程,主线程执行完毕后,子线程也自动关闭
# t.setDaemon(False),设置为非守护线程,主线程等待子线程,子线程执行完毕后,主线程才结束。

(默认) import threading import time def task(arg): time.sleep(5) print("任务") t1 = threading.Thread(target=task, args=(11,))) t.setDaemon(True) # True/False t.start() print("END") # 线程名称的设置和获取 import threading def task(arg): # 获取当前执行此代码的线程 name = threading.current_thread().getName() print(name) for i in range(10): t = threading.Thread(target=task, args=(11, )) t.setName("日魔-{}".format(i)) t.start() # 自定义线程类,直接将线程需要做的事写到run方法中 import threading class MyThread(threading.Thread): def run(self): print("执行此线程", self._args) t = MyThread(args=(100,))) t.start() import requests import threading class DouYinThread(threading.Thread): def run(self): file_name, video_url = self._args res = requests.get(video_url) with open(file_name, mode="wb") as f: f.write(res.content) url_list = [ ("东北F4模仿秀.mp4", "https://aweme.snssdk.com/aweme/v1/playwm/?video_id=v0300f570000bvbmace0gvch7lo53oog"), ("卡特扣篮.mp4", "https://aweme.snssdk.com/aweme/v1/playwm/?video_id=v0200f3e0000bv52fpn5t6p007e34qlg"), ("罗斯mvp.mp4", "https://aweme.snssdk.com/aweme/v1/playwm/?video_id=v0200f240000buuer5aa4tij4gv6ajqg") ] for item in url_list: t = DouYinThread(args=(item[0], item[1])) t.start()

1.2 线程安全

一个进程中可以有多个线程,且线程共享所有进程中的资源
多个线程同时去 *** 作一个对象,会导致数据混乱的现象,如1.1中提到的包含+-运算的两个多线程任务。

# 1. 加锁保证线程安全,申请到锁,释放后,别的线程才能申请释放锁,都为同一把锁
# 案例1:
import threading

loop = 10000000
number = 0

# 全局定义一个锁,用同一个锁才能起到安全保护的↔作用
lock_obj = threading.Lock()


def _add(count):
    # 申请锁
    lock_obj.acquire()
    global number
    for i in range(count):
        number += 1
    # 释放锁
    lock_obj.release()


def _sub(count):
    # 申请锁
    lock_obj.acquire()
    global number
    for i in range(count):
        number -= 1
    # 释放锁
    lock_obj.release()

t1 = threading.Thread(target=_add, args=(loop,))
t2 = threading.Thread(target=_sub, args=(loop,))
t1.start()
t2.start()

t1.join()
t2.join()

print(number)
# 案例2:
import threading

number = 0

# 全局定义一个锁,用同一个锁才能起到安全保护的↔作用
lock_obj = threading.Lock()


def task():
    # 申请锁
    lock_obj.acquire()
    global number
    for i in range(100000000):
        number += 1
    # 释放锁
    lock_obj.release()
    print(number)


for i in range(2):
    t = threading.Thread(target=task)
    t.start()
# 锁申请释放的写法还可以是with lock_obj:
import threading

number = 0

# 全局定义一个锁,用同一个锁才能起到安全保护的↔作用
lock_obj = threading.Lock()


def task():
    # 申请锁,并在执行完之后释放锁
    with lock_obj:
        global number
        for i in range(100000000):
            number += 1
    print(number)


for i in range(2):
    t = threading.Thread(target=task)
    t.start()
# 在python中许多数据类型集成了锁的机制,在使用时不需要进行自行设置锁
# 例如:列表的append *** 作
import threading

data_list = []


def task():
    print("开始")
    for i in range(1000000):
        data_list.append(i)
    print(len(data_list))


for i in range(2):
    t = threading.Thread(target=task)
    t.start()
"""
1. 默认线程安全的 *** 作
List1.apend(x)
List1.extend(List2)
x = List1[i]
x = List1.pop()
List1[i:j] = L2
List1.sort()
x = y
x.field = y
dict1[x] = y
dict1.update(dict2)
dict1.keys()
2. 默认不安全的 *** 作(需要手动加锁)
i = i + 1
List1.append(List1[-1])
List1[i] = List1[j]
dict1[x] = dict1[x] + 1

需要通过官方开发文档查询,那些 *** 作不会造成数据混乱
"""
1.3 线程锁

在程序中手动加锁,一般有两种:Lock和RLock

# Lock(同步锁)就是上面介绍的锁,RLock(递归锁),和Lock用法类似
# RLock
import threading

number = 0

# 全局定义一个锁,用同一个锁才能起到安全保护的↔作用
lock_obj = threading.RLock()


def task():
    # 申请锁
    lock_obj.acquire()
    global number
    for i in range(100000000):
        number += 1
    # 释放锁
    lock_obj.release()
    print(number)


for i in range(2):
    t = threading.Thread(target=task)
    t.start()
# Lock不支持锁的嵌套,RLock支持锁的嵌套,锁的嵌套如下
import threading

lock_obj = threading.RLock()


def task():
    print("开始")
    # 申请锁
    lock_obj.acquire()
    lock_obj.acquire()
    print(123)
    # 释放锁
    lock_obj.release()
    lock_obj.release()


for i in range(3):
    t = threading.Thread(target=task)
    t.start()
# 若开发过程中,存在函数嵌套,则需要用RLock,虽然Lock单次申请释放快于RLOCK,但是容易因为嵌套问题导致死锁,因此在开发过程中常用RLock
import threading
lock_obj = threading.RLock()
import threading

lock_obj = threading.RLock()


# 程序员A开发了一个函数,函数可以被其他开发者调用,
def func():
    with lock_obj:
        pass


# 程序员B开发了一个函数,可以直接调用这个函数
def run():
    print("其他功能")
    func()
    print("其他功能")


# 程序员C开发了一个函数,自己需要加锁,同时也需要调用func函数
def Process():
    with lock_obj:
        print("其他功能")
        func()
        print("其他功能")
1.4 死锁
# 除了上述死锁现象,还存在以下容易发生死锁的现象
import threading
import time

lock1 = threading.RLock()
lock2 = threading.RLock()


def task1():
    lock1.acquire()
    time.sleep(1)
    lock2.acquire()
    print(11)
    lock2.release()
    print(111)
    lock1.release()
    print(111)


def task2():
    lock2.acquire()
    time.sleep(1)
    lock1.acquire()
    print(11)
    lock1.release()
    print(111)
    lock2.release()
    print(111)

t1 = threading.Thread(target=task1)
t1.start()

t2 = threading.Thread(target=task1)
t2.start()
1.5 线程池
# 线程开得过多,也会导致程序运行速度降低
# 因此需要引入线程池,重复利用子线程,避免大量线程的开启和释放
# 示例1
import time
from concurrent.futures import ThreadPoolExecutor


def task(video_url):
    print("开始执行任务", video_url)
    time.sleep(5)

# 创建线程池,最多维护10个线程
pool = ThreadPoolExecutor(10)

url_list = ["www.xxx-{}.com".format(i) for i in range(300)]

for url in url_list:
    # 在线程池中提交一个任务,线程池中如果有空闲线程,则分配一个线程执行任务
    pool.submit(task, url, 2)

print("END")
# 示例2:等待线程池的任务执行完毕
import time
from concurrent.futures import ThreadPoolExecutor


def task(video_url):
    print("开始执行任务", video_url)
    time.sleep(5)

# 创建线程池,最多维护10个线程
pool = ThreadPoolExecutor(10)

url_list = ["www.xxx-{}.com".format(i) for i in range(300)]

for url in url_list:
    # 在线程池中提交一个任务,线程池中如果有空闲线程,则分配一个线程执行任务
    pool.submit(task, url)

print("执行中......")
pool.shutdown(True) # 等待线程池中的任务执行完毕后,再继续执行后面的任务
print("继续往下走")

# 示例3:任务执行完后,再干点其他事
import time
import random
from concurrent.futures import ThreadPoolExecutor, Future


def task(video_url):
    print("开始执行任务", video_url)
    time.sleep(2)
    return random.randint(0, 10)


def done(response):
    print("任务执行后的返回值", response.result())

def outer(file_name):
	def save(response):
		print('存储 *** 作')
	return save
	
# 创建线程池,最多维护10个线程
pool = ThreadPoolExecutor(10)

url_list = ["www.xxx-{}.com".format(i) for i in range(300)]

for i in range(len(url_list)):
    # 在线程池中提交任务
    future = pool.submit(task, url[i])
    # 获取线程池的结果,并进行其他 *** 作
    future.add_done_callback(done)
    file_name = "{}.png".format(i)
    # 以文件存储为例,以闭包的形式构建存储函数
    future.add_done_callback(outer(file_name))

# 可以做分工,如:task专门下载,done专门将下载的数据写入本地

# 示例4:先执行任务,然后最终统一获取结果
import time
import random
from concurrent.futures import ThreadPoolExecutor, Future


def task(video_url):
    print("开始执行任务", video_url)
    time.sleep(2)
    return random.randint(0, 10)
    
# 创建线程池,最多维护10个线程
pool = ThreadPoolExecutor(10)

future_list = []

url_list = ["www.xxx-{}.com".format(i) for i in range(300)]

for url in url_list:
    # 在线程池中提交任务
    future = pool.submit(task, url)
    # 获取线程池的结果,并进行其他 *** 作
    future_list.append(future)

pool.shutdown()
for fu in future_list:
    print(fu.result())
1.6 单例模式&多线程

单例模式的类,每次申明用的都是同一个地址,也会覆盖更新类对象的属性。

# 简单地实现单例模式
class Singleton:
    instance = None

    def __init__(self, name):
        self.name = name

    def __new__(cls, *args, **kwargs):
        # 返回空对象
        if cls.instance:
            return cls.instance
        cls.instance = object.__new__(cls)  # 创建空对象
        return cls.instance

obj1 = Singleton("alex")
obj2 = Singleton("SB")

# 会发现二者的地址是一样的
print(obj1, obj2)

# 多线程中的单例模式存在隐患
import threading
import time


class Singleton:
    instance = None

    def __init__(self, name):
        self.name = name

    def __new__(cls, *args, **kwargs):
        # 返回空对象
        if cls.instance:
            return cls.instance
        time.sleep(0.1)
        cls.instance = object.__new__(cls)
        return cls.instance


def task():
    obj = Singleton("x")
    print(obj)


for i in range(10):
    # 会无法保证单例模式的特性,有的对象地址会不一样
    t = threading.Thread(target=task)
    t.start()

# 解决方案,采用锁的方式进行控制
import threading
import time


class Singleton:
    instance = None
    lock = threading.RLock()

    def __init__(self, name):
        self.name = name

    def __new__(cls, *args, **kwargs):
    	if cls.instance:
                return cls.instance
        # 返回空对象
        with cls.lock:
            if cls.instance:
                return cls.instance
            cls.instance = object.__new__(cls)
            return cls.instance


def task():
    obj = Singleton("x")
    print(obj)


for i in range(10):
    # 会无法保证单例模式的特性,有的对象地址会不一样
    t = threading.Thread(target=task)
    t.start()
2. 多进程 2.1 基础

进程和进程之间是相互隔离的,利用CPU多核优势采用多进程完成任务。

import multiprocessing


def task():
    pass


# 在windows或mac中,使用多进程必须要在一下条件if __name__ == "__main__"下才能正确运行,在linux系统中就不需要,若想要在mac中能够正常运行,也可以将运行设置改为fork
if __name__ == "__main__":
    p1 = multiprocessing.Process(target=task)
    p1.start()

# 创建进程的三种模式
import multiprocessing

multiprocessing.set_start_method("fork")
# 模式1:fork,拷贝几乎所有的资源,复制对象进行 *** 作,不会改变原来的对象,可以支持对特殊对象的传递,如文件对象、锁对象等。

import multiprocessing import time def task(): print(name) name.append(123) if __name__ == "__main__": # 复制对象进行运算 multiprocessing.set_start_method("fork") name = [] p1 = multiprocessing.Process(target=task) p1.start() time.sleep(2) print(name) # [] import multiprocessing import time def task(): print(name) # 子进程是复制了一个文件对象,已经有了zZz这个内容,再更新alex file_object.write("alex\n") # 将zZz和alex写入文件 file_object.flush() if __name__ == "__main__": # 复制对象进行运算 multiprocessing.set_start_method("fork") name = [] file_object = open("x1.txt", "a+", encoding="utf-8") # 主进程中,在缓存中先写入了一个zZz,还未写入文件 file_object.write("zZz\n") p1 = multiprocessing.Process(target=task) # 主进程等待子进程执行完毕后,将内容写入文件 p1.start() # 文件中被写入的内容为 """ zZz alex zZz """ # window只支持spawn模式 # 模式2:spawn,不支持主进程特殊对象的传递,需要在子进程中进行创建 import multiprocessing import time def task(name): print(name) name.append(123) if __name__ == "__main__": # 通过参数传递必备资源,也是拷贝对象 multiprocessing.set_start_method("spawn") name = [] p1 = multiprocessing.Process(target=task, args=(name, )) p1.start() time.sleep(2) print(name) # [] # 模式3: forkserver import multiprocessing import time def task(name): print(name) name.append(123) if __name__ == "__main__": # 通过参数传递必备资源,也是拷贝对象 multiprocessing.set_start_method("forkserver") name = [] p1 = multiprocessing.Process(target=task, args=(name, )) p1.start() time.sleep(2) print(name) # []

2.2 常见功能

进程中的方法和线程都是相似的

# p.start(),当前进程准备就绪,等待被CPU调度()
# p.join(),等待当前进程的任务执行完毕,再向下继续执行
# p.daemon=True/False 守护进程
# 进程名称的设置和获取
import multiprocessing
import time


def task(arg):
    time.sleep(2)
    print("当前进程名称为: ", multiprocessing.current_process().name)


if __name__ == "__main__":
    # 复制对象进行运算
    multiprocessing.set_start_method("spawn")

    p = multiprocessing.Process(target=task, args=("xxx",))
    p.name = "hhhh"
    p.start()
    
    print("继续执行")
# 自定义进程类
import multiprocessing
import threading
import time
import os

class MyProcess(multiprocessing.Process):
	# 获取进程pid和父进程PID
	print(os.getpid(), os.getppid())
	# 获取进程中的所有线程总数
	print(len(threading.enumerate()))
    def run(self) -> None:
        print("执行此进程", self._args)



if __name__ == "__main__":
    # 复制对象进行运算
    multiprocessing.set_start_method("spawn")

    p = MyProcess(args=("xxx",))
    p.name = "hhhh"
    p.start()

    print("继续执行")
    
# 判断CPU个数
import multiprocessing
print(multiprocessing.cpu_count())
2.3 数据共享
# 方式一:基于Value和Array(基本不用这种方式),基于C语言开发的基础
from multiprocessing import Process, Value, Array


def func(n, m1, m2):
    n.value = 888
    m1.value = "a".encode("utf-8")
    m2.value = "舞"


if __name__ == "__main__":
	# i表示整数,c表示字符,u表示中文字符
    num = Value("i", 666)
    v1 = Value("c")
    v2 = Value("u")

    p = Process(target=func, args=(num, v1, v2))
    p.start()
    p.join()

    print(num.value)  # 888
    print(v1.value)  # a
    print(v2.value)  # 舞

Value不同字符的含义,C语言中的数据类型,规则也需要跟C语言相对应:

# 方式二:基于Manager(用的较多)
from multiprocessing import Process, Manager


def f(d, l):
    d[1] = "1"
    d["2"] = 2
    d[0.25] = None
    l.append(666)


if __name__ == "__main__":
    with Manager() as manager:
        d = manager.dict()
        l = manager.list()

    p = Process(target=f, args=(d, l))
    p.start()
    p.join()

    print(d)
    print(l)

"""
交换的方式
"""

# 方式三:基于队列(用的较多)
import multiprocessing


def task(q):
    for i in range(10):
        q.put(i)


if __name__ == "__main__":
    queue = multiprocessing.Queue()

    p = multiprocessing.Process(target=task, args=(queue, ))
    p.start()
    p.join()

    print("主进程")
    print(queue.get())  # 0
    print(queue.get())  # 1
    print(queue.get())  # 2
    print(queue.get())  # 3
    print(queue.get())  # 4

# 方式四:基于pipe
import multiprocessing
import time


def task(conn):
    time.sleep(1)
    conn.send([111, 22, 33, 44])
    data = conn.recv()  # 阻塞
    print("子进程接收: ", data)
    time.sleep(2)


if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()

    p = multiprocessing.Process(target=task, args=(child_conn, ))
    p.start()

    info = parent_conn.recv()  # 阻塞
    print("主进程接收: ", info)
    parent_conn.send(666)

# 还有很多其他的方式,如redis、数据库等
2.4 进程锁

多个进程共享资源时、对同一个文件进行读写 *** 作时,需要设置进程锁

# spawn模式中,进程锁可以被当作参数传入,线程锁不可以
import multiprocessing
import time


def task(lock):
    print("开始")
    lock.acquire()
    with open("f1.txt", mode="r", encoding="utf-8") as f:
        current_num = int(f.read())

    print("排队抢票了")
    time.sleep(0.5)
    current_num -= 1

    with open("f1.txt", mode="w", encoding="utf-8") as f:
        f.write(str(current_num))
    lock.release()


if __name__ == "__main__":
    multiprocessing.set_start_method("spawn")

    # 进程锁
    lock = multiprocessing.RLock()

    for i in range(10):
        p = multiprocessing.Process(target=task, args=(lock,))
        p.start()

    # spawn模式,需要特殊处理
    time.sleep(7)
2.5 进程池
from concurrent.futures import ProcessPoolExecutor
import time


def task(num):
    print("执行", num)
    time.sleep(2)


if __name__ == "__main__":
    pool = ProcessPoolExecutor(4)
    for i in range(10):
        pool.submit(task, i)

    pool.shutdown(True)
    print(1)

from concurrent.futures import ProcessPoolExecutor
import time


def task(num):
    print("执行", num)
    time.sleep(2)

def done(res):
	print(multiprocessing.current_process())
	print(res.result())


if __name__ == "__main__":
    pool = ProcessPoolExecutor(4)
    for i in range(10):
        fur = pool.submit(task, i)
        # 在线程池中是子线程执行后续函数,而在进程中是主进程来完成
		fur.add_done_callback(done)

	print(multiprocessing.current_process())
    pool.shutdown(True)
    print(1)

"""
在线程池中使用锁,必须使用manager.RLock()
"""
3. 协程(了解向)

python中有多种方式可以实现协程

  1. greenlet
    pip3 install greenlet
from greenlet import greenlet

def func1():
    print(1)
    gr2.switch()  # 切换到func2
    print(2)
    gr2.switch()  # 切换到func2,从上一次的位置继续执行


def func2():
    print(3)
    gr1.switch()  # 切换到func1,从上一次的位置继续执行
    print(4)

gr1 = greenlet(func1)
gr2 = greenlet(func2)
gr1.switch() # 执行func1
  1. yield
def func1():
    yield 1
    yield from func2()
    yield 2


def func2():
    yield 3
    yield 4

f1 = func1()
for item in f1:
    print(item)
  1. asyncio
import asyncio


async def func1():
    print(1)
    await asyncio.sleep(2)
    print(2)


async def func2():
    print(3)
    await asyncio.sleep(2)
    print(4)

tasks = [
    asyncio.ensure_future(func1()),
    asyncio.ensure_future(func2())
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/674855.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-19
下一篇 2022-04-19

发表评论

登录后才能评论

评论列表(0条)

保存