唤醒手腕Python全栈工程师学习笔记(并发编程篇)

唤醒手腕Python全栈工程师学习笔记(并发编程篇),第1张

唤醒手腕Python全栈工程师学习笔记(并发编程篇) 01、进程的基本介绍

进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是 *** 作系统结构的基础,进程是线程的容器。

什么是进程?

进程的概念主要有两点:

第一,进程是一个实体。每一个进程都有它自己的地址空间,一般情况下,包括文本区域(text region)、数据区域(data region)和堆栈(stack region)。文本区域存储处理器执行的代码;数据区域存储变量和进程执行期间使用的动态分配的内存;堆栈区域存储着活动过程调用的指令(比如错误信息打印的调用关系)和本地变量。

第二,进程是一个“执行中的程序”。程序是一个没有生命的实体,只有处理器赋予程序生命时,它才能成为一个活动的实体,我们称其为进程。

进程与线程的区别与联系

进程的执行过程是线状的,尽管中间会发生中断或暂停,但该进程所拥有的资源只为该线状执行过程服务。一旦发生进程上下文切换,这些资源都是要被保护起来的。这是进程宏观上的执行过程。

进程又可有单线程进程与多线程进程两种。我们知道,进程有一个进程控制块PCB,相关程序段和该程序段对其进行 *** 作的数据结构集这三部分,单线程进程的执行过程在宏观上是线性的,微观上也只有单一的执行过程;而多线程进程在宏观上的执行过程同样为线性的,但微观上却可以有多个执行 *** 作(线程),如不同代码片段以及相关的数据结构集。线程的改变只代表了CPU执行过程的改变,而没有发生进程所拥有的资源变化。

除了CPU之外,计算机内的软硬件资源的分配与线程无关,线程只能共享它所属进程的资源。与进程控制表和 PCB 相似,每个线程也有自己的线程控制表TCB,而这个TCB中所保存的线程状态信息则要比PCB表少得多,这些信息主要是相关指针用堆栈(系统栈和用户栈),寄存器中的状态数据。

进程拥有一个完整的虚拟地址空间,不依赖于线程而独立存在;反之,线程是进程的一部分,没有自己的地址空间,与进程内的其他线程一起共享分配给该进程的所有资源。

进程:计算机分配资源的最小单位,线程:CPU处理器分配的最小单位。

创建进程就是在内存中申请一块内存空间将需要运行的代码丢进去一个进程
对应在内存中就是一块独立的内存空间
多个进程对应在内存中就是多块独立的内存空间
进程与进程之间数据默认情况下是无法直接交互,如果想交互可以借助于第三方工具、模块


阻塞调用是指调用结果返回之前,当前线程会被挂起(如遇到io *** 作)。函数只有在得到结果之后才会将阻塞的线程激活。有人也许会把阻塞调用和同步调用等同起来,实际上他是不同的。对于同步调用来说,很多时候当前线程还是激活的,只是从逻辑上当前函数没有返回而已。

    同步调用:apply一个累计1亿次的任务,该调用会一直等待,直到任务返回结果为止,但并未阻塞住(即便是被抢走cpu的执行权限,那也是处于就绪态);阻塞调用:当socket工作在阻塞模式的时候,如果没有数据的情况下调用recv函数,则当前线程就会被挂起,直到有数据为止。

非阻塞和阻塞的概念相对应,指在不能立刻得到结果之前也会立刻返回,同时该函数不会阻塞当前线程。

1)子进程是从父进程中使用Process生成的,对子进程使用os.getppid()可以得到父进程的pid

2)全局变量在多个进程中不能共享,在子进程中修改全局变量,对父进程没有影响

02、mutilprocessing模块

仔细说来,multiprocessing不是一个模块而是python中一个 *** 作、管理进程的包。 之所以叫multi是取自multiple的多功能的意思,在这个包中几乎包含了和进程有关的所有子模块。由于提供的子模块非常多,为了方便大家归类记忆,我将这部分大致分为四个部分:创建进程部分,进程同步部分,进程池部分,进程之间数据共享。

python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。Python提供了multiprocessing。 multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。

multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。

需要再次强调的一点是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。

multiprocessing.Process模块

process模块是一个创建进程的模块,借助这个模块,就可以完成进程的创建。

Process([group [, target [, name [, args [, kwargs]]]]])
# 由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)

Process类的注意点强调:

    需要使用关键字的方式来指定参数args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号

Process类参数介绍:

group参数未使用,值始终为Nonetarget表示调用对象,即子进程要执行的任务args表示调用对象的位置参数元组,args=(1,2,‘egon’,)kwargs表示调用对象的字典,kwargs={‘name’:‘egon’,‘age’:18}name为子进程的名称

Process对象方法介绍:

p.start():启动进程,并调用该子进程中的p.run()p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法p.terminate():强制终止进程p,不会进行任何清理 *** 作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁p.is_alive():如果p仍然运行,返回Truep.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程

Process的对象属性介绍:

p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置

p.name:进程的名称

p.pid:进程的pid

p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)

p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)

类继承的方式来使用:开进程的方法

import time
import random
from multiprocessing import Process


class Piao(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        print('%s piaoing' %self.name)

        time.sleep(random.randrange(1,5))
        print('%s piao end' %self.name)

p=Piao('egon')

p.start() # start会自动调用run

print('主线程')

创建并开启子进程

from multiprocessing import Process
import time
import os


def task(name):
    print("{} tasking starting!".format(name))
    print("{} current process PID = {}".format(name, os.getpid()))
    print("{} parent’s process PID = {}".format(name, os.getppid()))
    # 查看当前进程的父进程的PID
    time.sleep(5)
    print("{} tasking ending!".format(name))


if __name__ == "__main__":
    process_1 = Process(target=task, args=("唤醒手腕",))
    process_2 = Process(target=task, args=("燕小姐", ))
    process_1.start()
    process_2.start()
    process_1.join() # 主进程等待子进程结束
    process_2.join() # 主进程等待子进程结束	
    print("__main__ processing!")

查看进程是否存活:is_alive()

告诉 *** 作系统帮你去杀死当前进程但是需要一定的时间而代码的运行速度极快。

    process_1 = Process(target=task, args=("唤醒手腕",))
    print(process_1.is_alive())
    process_1.start()
    print(process_1.is_alive())
    process_1.terminate()
    time.sleep(0.1) # 杀死进程需要时间,进行时间延迟,保证进程已经杀死
    print(process_1.is_alive())
03、僵尸 / 孤儿 / 守护进程

孤儿进程:一个父进程退出,而它的一个或多个子进程还在运行,那么那些子进程将成为孤儿进程。孤儿进程将被init进程(进程号为1)所收养,并由init进程对它们完成状态收集工作。

僵尸进程:一个进程使用fork创建子进程,如果子进程退出,而父进程并没有调用wait或waitpid获取子进程的状态信息,那么子进程的进程描述符仍然保存在系统中。这种进程称之为僵死进程。

那么什么称为僵尸进程呢?

即子进程先于父进程退出后,子进程的PCB需要其父进程释放,但是父进程并没有释放子进程的PCB,这样的子进程就称为僵尸进程,僵尸进程实际上是一个已经死掉的进程。

#include
#include
#include
#include
#include
#include
 
int main()
{
	pid_t pid = fork();
 
	if(pid == 0)  // 子进程
	{
     	printf("child id is %d n", getpid());
		printf("parent id is %d n", getppid());
	}
	else  // 父进程不退出,使子进程成为僵尸进程
	{
        while(1)
		{}
	}
	exit(0);
}

我们将它挂在后台执行,可以看到结果,用ps可以看到子进程后有一个 ,defunct是已死的,僵尸的意思,可以看出这时的子进程已经是一个僵尸进程了。因为子进程已经结束,而其父进程并未释放其PCB,所以产生了这个僵尸进程。

我们也可以用 ps -aux | grep pid 查看进程状态

一个进程在调用exit命令结束自己的生命的时候,其实它并没有真正的被销毁,而是留下一个称为僵尸进程(Zombie)的数据结构(系统调用exit,它的作用是使进程退出,但也仅仅限于将一个正常的进程变成一个僵尸进程,并不能将其完全销毁)。

在Linux进程的状态中,僵尸进程是非常特殊的一种,它已经放弃了几乎所有内存空间,没有任何可执行代码,也不能被调度,仅仅在进程列表中保留一个位置,记载该进程的退出状态等信息供其他进程收集,除此之外,僵尸进程不再占有任何内存空间。

这个僵尸进程需要它的父进程来为它收尸,如果他的父进程没有处理这个僵尸进程的措施,那么它就一直保持僵尸状态,如果这时父进程结束了,那么init进程自动会接手这个子进程,为它收尸,它还是能被清除的。但是如果如果父进程是一个循环,不会结束,那么子进程就会一直保持僵尸状态,这就是为什么系统中有时会有很多的僵尸进程。

试想一下,如果有大量的僵尸进程驻在系统之中,必然消耗大量的系统资源。但是系统资源是有限的,因此当僵尸进程达到一定数目时,系统因缺乏资源而导致奔溃。所以在实际编程中,避免和防范僵尸进程的产生显得尤为重要。

孤儿进程

一个父进程退出,而它的一个或多个子进程还在运行,那么那些子进程将成为孤儿进程。孤儿进程将被init进程(进程号为1)所收养,并由init进程对它们完成状态收集工作。

子进程死亡需要父进程来处理,那么意味着正常的进程应该是子进程先于父进程死亡。当父进程先于子进程死亡时,子进程死亡时没父进程处理,这个死亡的子进程就是孤儿进程。

但孤儿进程与僵尸进程不同的是,由于父进程已经死亡,系统会帮助父进程回收处理孤儿进程。所以孤儿进程实际上是不占用资源的,因为它终究是被系统回收了。不会像僵尸进程那样占用PID,损害运行系统。

#include
#include
#include
#include
#include
#include
 
int main()
{
	pid_t pid = fork();
 
	if(pid == 0)
	{
		printf("child ppid is %d n", getppid());
		sleep(10);     
		// 为了让父进程先结束
		printf("child ppid is %d n", getppid());
	}
	else
	{
		printf("parent id is %d n", getpid());
	}
 
	exit(0);
}

从执行结果来看,此时由pid == 4168父进程创建的子进程,其输出的父进程pid == 1,说明当其为孤儿进程时被init进程回收,最终并不会占用资源,这就是为什么要将孤儿进程分配给init进程。

孤儿进程、僵尸进程产生的原因:

(1) 一般进程

正常情况下:子进程由父进程创建,子进程再创建新的进程。父子进程是一个异步过程,父进程永远无法预测子进程的结束,所以,当子进程结束后,它的父进程会调用wait()或waitpid()取得子进程的终止状态,回收掉子进程的资源。

(2)孤儿进程

孤儿进程:父进程结束了,而它的一个或多个子进程还在运行,那么这些子进程就成为孤儿进程(father died)。子进程的资源由init进程(进程号PID = 1)回收。

(3)僵尸进程

僵尸进程:子进程退出了,但是父进程没有用wait或waitpid去获取子进程的状态信息,那么子进程的进程描述符仍然保存在系统中,这种进程称为僵死进程。

守护进程的介绍:

主进程创建守护进程

其一:守护进程会在主进程代码执行结束后就终止

其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止

from multiprocessing import Process
import time
import random

class Piao(Process):
    def __init__(self,name):
        self.name=name
        super().__init__()
    def run(self):
        print('%s is piaoing' %self.name)
        time.sleep(random.randrange(1,3))
        print('%s is piao end' %self.name)


p=Piao('egon')
p.daemon=True 
# 一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行
p.start()
print('主')
04、进程间变量的共享
import multiprocessing
import time

# 定义一个全局变量num_list,里面有一个元素100
num_list = [100]


# 写入数据
def write_data(num_list):
    for i in range(5):
        num_list.append(i)
        time.sleep(0.1)
    print("write_data:", num_list)  # 读取写入数据后的num_list


# 读取数据
def read_data(num_list):
    print("read_data:", num_list)  # 读取全局变量num_list


if __name__ == '__main__':
    # 创建写入数据的子进程
    write_process = multiprocessing.Process(target=write_data, args=(num_list,))
    # 创建读取数据的子进程
    read_process = multiprocessing.Process(target=read_data, args=(num_list,))
    # 开启写入子进程
    write_process.start()
    # 等待写入子进程执行完毕再继续
    write_process.join()
    # 开启读取数据子进程
    read_process.start()

运行结果:结果(证明进程间的变量是不共享的)

注:创建子进程其实是对主进程进行拷贝,进程之间相互独立,访问的全局变量不是同一个,所以进程间不共享全局变量。

主进程会等待所有的子进程执行完成程序再退出。

进程之间共享数据(数值型):

import multiprocessing
 
def func(num):
    num.value = 10.78  
    # 子进程改变数值的值,主进程跟着改变
 
if  __name__=="__main__":
    num = multiprocessing.Value("d",10.0) 
    # d表示数值,主进程与子进程共享这个value
    # 主进程与子进程都是用的同一个value
    print(num.value)
 
    p = multiprocessing.Process(target=func,args=(num,))
    p.start()
    p.join()
 
    print(num.value)

进程之间共享数据(dict,list):

import multiprocessing
 
def func(mydict,mylist):
    mydict["index1"]="aaaaaa"   #子进程改变dict,主进程跟着改变
    mydict["index2"]="bbbbbb"
    mylist.append(11)        #子进程改变List,主进程跟着改变
    mylist.append(22)
    mylist.append(33)
 
if __name__=="__main__":
    with multiprocessing.Manager() as MG:   #重命名
        mydict=multiprocessing.Manager().dict()   #主进程与子进程共享这个字典
        mylist=multiprocessing.Manager().list(range(5))   #主进程与子进程共享这个List
 
        p=multiprocessing.Process(target=func,args=(mydict,mylist))
        p.start()
        p.join()
 
        print(mylist)
        print(mydict)
05、socket并发通信

server端

from socket import *
from multiprocessing import Process

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)

def talk(conn,client_addr):
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == '__main__': #windows下start进程一定要写到这下面
    while True:
        conn,client_addr=server.accept()
        p=Process(target=talk,args=(conn,client_addr))
        p.start()

client端

from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))


while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))

以上解决方案的隐患

每来一个客户端,都在服务端开启一个进程,如果并发来一个万个客户端,要开启一万个进程吗,你自己尝试着在你自己的机器上开启一万个,10万个进程试一试。

解决方法:进程池

# Pool内的进程数默认是cpu核数,假设为4(查看方法os.cpu_count())

# 开启6个客户端,会发现2个客户端处于等待状态

# 在每个进程内查看pid,会发现pid使用为4个,即多个客户端公用4个进程
from socket import *
from multiprocessing import Pool
import os

server = socket(AF_INET, SOCK_STREAM)
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 8080))
server.listen(5)


def talk(conn, client_addr):
    print('进程pid: %s' % os.getpid())
    while True:
        try:
            msg = conn.recv(1024)
            if not msg: break
            conn.send(msg.upper())
        except Exception:
            break


if __name__ == '__main__':
    p = Pool()
    while True:
        conn, client_addr = server.accept()
        p.apply_async(talk, args=(conn, client_addr))
        
        # p.apply(talk,args=(conn,client_addr)) 
        
        # 同步的话,则同一时间只有一个客户端能访问

回掉函数:

需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数

我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。

06、Queue队列库的介绍

进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

创建队列的类(底层就是以管道和锁定的方式实现):

Queue([maxsize]):
# 创建共享的进程队列,Queue是多进程安全的队列,
# 可以使用Queue实现多进程之间的数据传递。

# maxsize是队列中允许最大项数,省略则无大小限制。

Queue对象 主要方法:

q.put
# 该方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。
# 如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。
# 如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。

q.get
# 方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。
# 如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。
# 如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
 
q.get_nowait() # 同q.get(False)
q.put_nowait() # 同q.put(False)

q.empty()
# 调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
q.full()
# 调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
q.qsize()
# 返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样

Queue对象 其他方法(了解):

q.cancel_join_thread():不会在进程退出时自动连接后台线程。可以防止join_thread()方法阻塞
q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,如果某个使用者正在被阻塞在get() *** 作上,关闭生产者中的队列不会导致get()方法返回错误。
q.join_thread():连接队列的后台线程。此方法用于在调用q.close()方法之后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread方法可以禁止这种行为
07、进程池的使用介绍

在利用Python进行系统管理的时候,特别是同时 *** 作多个文件目录,或者远程控制多台主机,并行 *** 作可以节约大量的时间。多进程是实现并发的手段之一,需要注意的问题是:

    很明显需要并发执行的任务通常要远大于CPU核数一个 *** 作系统不可能无限开启进程,通常有几个核就开几个进程进程开启过多,效率反而会下降(开启进程是需要占用系统资源的,而且开启多余核数目的进程也无法做到并行)

在利用Python进行系统管理的时候,特别是同时 *** 作多个文件目录,或者远程控制多台主机,并行 *** 作可以节约大量的时间。当被 *** 作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。

Pool可以提供指定数量的进程供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

创建进程池的类:如果指定numprocess为3,则进程池会从无到有创建三个进程,然后自始至终使用这三个进程去执行所有任务,不会开启其他进程。

Pool([numprocess  [,initializer [, initargs]]]):创建进程池
    numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值initializer:是每个工作进程启动时要执行的可调用对象,默认为Noneinitargs:是要传给initializer的参数组
p.apply(func [, args [, kwargs]])
# 在一个池工作进程中执行func(*args,**kwargs),然后返回结果。需要强调的是:此 *** 作并不会在所有池工作进程中并执行func函数。
# 如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()
p.apply_async(func [, args [, kwargs]])
# 在一个池工作进程中执行func(*args,**kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。
# 当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞 *** 作,否则将接收其他异步 *** 作中的结果。
   
p.close()
# 关闭进程池,防止进一步 *** 作。如果所有 *** 作持续挂起,它们将在工作进程终止前完成
P.join()
# 等待所有工作进程退出。此方法只能在close()或teminate()之后调用

方法apply_async()和map_async()的返回值是AsyncResul的实例obj,实例具有以下方法:

obj.get()
# 返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程 *** 作中引发了异常,它将在调用此方法时再次被引发。
obj.ready()
# 如果调用完成,返回True
obj.successful()
# 如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常
obj.wait([timeout])
# 等待结果变为可用。
obj.terminate()
# 立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数

同步调用apply

同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞,但不管该任务是否存在阻塞,同步调用都会在原地等着,只是等的过程中若是任务发生了阻塞就会被夺走cpu的执行权限。

from multiprocessing import Pool
import os, time


def work(n):
    print('%s run' % os.getpid())
    time.sleep(3)
    return n ** 2


if __name__ == '__main__':
    p = Pool(3)  # 进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
    res_l = []
    for i in range(10):
        res = p.apply(work, args=(i,))
        res_l.append(res)
    print(res_l)

异步调用apply_async

异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果,否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了。

from multiprocessing import Pool
import os, time


def work(n):
    print('%s run' % os.getpid())
    time.sleep(3)
    return n ** 2


if __name__ == '__main__':
    p = Pool(3)
    # 进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
    res_l = []
    for i in range(10):
        res = p.apply_async(work, args=(i,))
        # 同步运行,阻塞、直到本次任务执行完毕拿到res
        res_l.append(res)

    p.close()
    p.join()
    for res in res_l:
        print(res.get())
        # 使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get

详细分析原理:

# 一:使用进程池(异步调用,apply_async)
# coding: utf-8
from multiprocessing import Process, Pool
import time


def func(msg):
    print("msg:", msg)
    time.sleep(1)
    return msg


if __name__ == "__main__":
    pool = Pool(processes=3)
    res_l = []
    for i in range(10):
        msg = "hello %d" % (i)
        res = pool.apply_async(func, (msg,))
        # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
        res_l.append(res)

    # 没有后面的join,或get,则程序整体结束,进程池中的任务还没来得及全部执行完也都跟着主进程一起结束了

    pool.close()
    # 关闭进程池,防止进一步 *** 作。如果所有 *** 作持续挂起,它们将在工作进程终止前完成
    pool.join()
    # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束

    print(res_l)
    # 看到的是
    # 对象组成的列表,而非最终的结果,但这一步是在join后执行的,证明结果已经计算完毕,剩下的事情就是调用每个对象下的get方法去获取结果
    for i in res_l:
        print(i.get())
        # 使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get

Process产生的子进程,默认主进程等待所有子进程执行完毕之后在终止。
Pool进程池,只要主进程跑完了,立刻终止所有程序。

08、死锁 / 递归锁 / 互斥锁
from threading import Thread,Lock
import time
noodle_lock = Lock()
kuaizi_lock = Lock()

def eat1(name):
	noodle_lock.acquire()
	print("%s拿到面条" % (name))
	kuaizi_lock.acquire()
	print("%s拿到筷子" % (name))
	
	print("开始吃")
	time.sleep(0.7)
	
	kuaizi_lock.release()
	print("%s放下筷子" % (name))
	noodle_lock.release()
	print("%s放下面条" % (name))
	
def eat2(name):
	kuaizi_lock.acquire()
	print("%s拿到筷子" % (name))
	noodle_lock.acquire()
	print("%s拿到面条" % (name))

	print("开始吃")
	time.sleep(0.7)
	
	noodle_lock.release()
	print("%s放下面条" % (name))
	kuaizi_lock.release()
	print("%s放下筷子" % (name))

if __name__ == "__main__":
	name_list1 = ["马具强","熊卫华"]
	name_list2 = ["黄熊大","黄将用"]
	for name in  name_list1:
		Thread(target=eat1,args=(name,)).start()
		
	for name in name_list2:
		Thread(target=eat2,args=(name,)).start()

运行可以发现,以上的代码,发生了死锁的情况。

递归锁

递归锁专门用来解决死锁现象,临时用于快速解决服务器崩溃异常现象,用递归锁应急,解决应急问题的。

到底什么是递归锁,为什么会存在?

递归锁原理

原理其实很简单的:就是递归锁,每开一把门,在字典里面存一份数据,退出的时候去到door1或者door2里面找到这个钥匙退出就OK了。

递归锁用于多重锁的情况,如果只是一层锁,我们不用。在实际情况下,递归锁场景用的不是特别多,所以知道就行了。

from threading import Thread,RLock

rlock = RLock()
def func(name):
	rlock.acquire()
	print(name,1)
	rlock.acquire()
	print(name,2)
	rlock.acquire()
	print(name,3)
	
	rlock.release()
	rlock.release()
	rlock.release()
lst = []
for i in range(10):
	t1 = Thread(target=func,args=("name%s" % (i) , ))
	t1.start()
	lst.append(t1)
	
for i in lst:
	i.join()
	
print("程序结束了")

用递归锁应急解决死锁现象

noodle_lock = kuaizi_lock = RLock()
def eat1(name):
	noodle_lock.acquire()
	print("%s拿到面条" % (name))
	kuaizi_lock.acquire()
	print("%s拿到筷子" % (name))
	
	print("开始吃")
	time.sleep(0.7)
	
	kuaizi_lock.release()
	print("%s放下筷子" % (name))
	noodle_lock.release()
	print("%s放下面条" % (name))
	
def eat2(name):
	kuaizi_lock.acquire()
	print("%s拿到筷子" % (name))
	noodle_lock.acquire()
	print("%s拿到面条" % (name))

	print("开始吃")
	time.sleep(0.7)
	
	noodle_lock.release()
	print("%s放下面条" % (name))
	kuaizi_lock.release()
	print("%s放下筷子" % (name))

if __name__ == "__main__":
	name_list1 = ["马具强","熊卫华"]
	name_list2 = ["黄熊大","黄将用"]
	for name in  name_list1:
		Thread(target=eat1,args=(name,)).start()
		
	for name in name_list2:
		Thread(target=eat2,args=(name,)).start()

互斥锁

从语法上来看,锁是可以互相嵌套的,但是不要使用。上一次锁,就对应解开一把锁,形成互斥锁。吃面条和拿筷子是同时的,上一次锁就够了,不要分别上锁。尽量不要形成锁的嵌套,容易死锁。

mylock = Lock()
def eat1(name):
	mylock.acquire()
	print("%s拿到面条" % (name))
	print("%s拿到筷子" % (name))
	
	print("开始吃")
	time.sleep(0.7)	

	print("%s放下筷子" % (name))	
	print("%s放下面条" % (name))
	mylock.release()
	
def eat2(name):
	mylock.acquire()
	print("%s拿到筷子" % (name))
	print("%s拿到面条" % (name))

	print("开始吃")
	time.sleep(0.7)	

	print("%s放下面条" % (name))	
	print("%s放下筷子" % (name))
	mylock.release()

if __name__ == "__main__":
	name_list1 = ["马具强","熊卫华"]
	name_list2 = ["黄熊大","黄将用"]
	for name in  name_list1:
		Thread(target=eat1,args=(name,)).start()
		
	for name in name_list2:
		Thread(target=eat2,args=(name,)).start()
09、threading线程模块

multiprocess模块的完全模仿了threading模块的接口,二者在使用层面,有很大的相似性。

这个是模块在较低级的模块 _thread 基础上建立较高级的线程接口,大多数情况我们使用threading就够用了。多线程的应用场景是进行多个任务处理时。由于线程是 *** 作系统直接支持的执行单元,我们可以通过建立多个线程来实现多个任务的处理,使它们同步进行(宏观看起来是这样的,实际上是各个线程交替工作)。

创建线程的两种方式:

方式一:

from threading import Thread
import time


def sayhi(name):
    time.sleep(2)
    print('%s say hello' % name)


if __name__ == '__main__':
    t = Thread(target=sayhi, args=('egon',))
    t.start()
    print('主线程')

方式二:

from threading import Thread
import time


class SayHello(Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        time.sleep(2)
        print('%s say hello' % self.name)


if __name__ == '__main__':
    t = SayHello('唤醒手腕')
    t.start()
    print('主线程')

有关threading的参数介绍

    threading.active_count() 返回当前存活着的Tread对象个数

    threading.current_thread() 返回当前正在运行的线程的Tread对象

    threading.enumerate() 返回一个列表,列表里面是还存活的Tread对象

    threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)创建线程,直接使用Tread类这是一种方法,另一种方法思新建一个类然后继承threading.Thread

    group 应该为 None;为了日后扩展 ThreadGroup 类实现而保留。

    target 是用于run()方法调用的可调用对象。默认是 None,表示不需要调用任何方法。

    name 是线程名称。默认情况下,由 “Thread-N” 格式构成一个唯一的名称,其中 N 是小的十进制数。

    args 是用于调用目标函数的参数元组。默认是 ()。

    kwargs 是用于调用目标函数的关键字参数字典。默认是 {}。 如果不是 None,daemon 参数将显式地设置该线程是否为守护模式。 如果是None (默认值),线程将继承当前线程的守护模式属性。

    Thread类的start()方法用来开始一个线程。

    hread类的join(timeout=None)方法会让开启线程的线程(一般指主线程)等待,阻塞这个线程,直到这个线程运行结束才结束等待。timeout的参数值为浮点数,用于设置 *** 作超时的时间。

    threading.Lock 锁对象,可以通过它来创建锁被创建时为非锁定状态,原始锁有两种状态锁定和非锁定。

    Lock对象acquire(blocking=True, timeout=-1)方法,获得锁。

    当锁的状态为非锁定时, acquire() 将锁状态改为锁定并立即返回(即执行下面的程序)。

    当状态是锁定时, acquire() 将阻塞(将发起获得锁的线程挂起直到锁被释放获得锁),当其他线程调用 release() 将锁改为非锁定状态后(即锁被释放后), 被挂起线程的acquire() 将获得锁且重置其为锁定状态并返回(与1一致)。

    blocking 参数为bool值(默认True),可以阻塞或非阻塞地获得锁(即无法获得锁时是否阻塞线程)

    timeout 参数为浮点数(默认-1),当无法获得锁时,timeout为正决定阻塞的时间,为负数时为无限等待。blocking为False时timeout无作用(不阻塞当然涉及不到阻塞的时间)

    Lock对象release()方法,释放锁。

    当锁被锁定,将它重置为未锁定,并返回。如果其他线程正在等待这个锁解锁而被阻塞,只允许其中一个允许。

    在未锁定的锁调用时,会引发 RuntimeError 异常。

    Lock对象的locked()方法,用来判断是否获得了锁。

锁,一般用在两个线程同时使用一个公共变量的情况下。为了防止两个线程同时修改变量导致的混乱。

线程抢的是GIL锁,GIL锁相当于执行权限,拿到执行权限后才能拿到互斥锁Lock,其他线程也可以抢到GIL,但如果发现Lock仍然没有被释放则阻塞,即便是拿到执行权限GIL也要立刻交出来。

join是等待所有,即整体串行,而锁只是锁住修改共享数据的部分,即部分串行,要想保证数据安全的根本原理在于让并发变成串行,join与互斥锁都可以实现,毫无疑问,互斥锁的部分串行效率要更高。

开启新的子进程必须在__main__进程下创建,否则会出现以下的报错内容。

RuntimeError: 
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.
import time
from threading import Thread
from multiprocessing import Process
# 开启线程不需要在main下面 执行代码直接书写就可以
# 但是我们还是习惯性的将启动命令写在main下面

def task(name):
    print("%s is running" % name)
    time.sleep(1)
    print("%s is over" % name)

# t = Thread(target=task, args=('Wrist',))
p = Process(target=task, args=('Wrist',))

p.start()
print("__main__ is running")

为什么开启子进程 一定要放在 if __name__ == '__main__' 下面

windows 平台有特殊要求Since Windows has no fork, the multiprocessing module starts a new Python process and imports the calling module. If Process() gets called upon import, then this sets off an infinite succession of new processes (or until your machine runs out of resources). This is the reason for hiding calls to Process() inside

平台有特殊要求Since窗口没有分叉,多处理模块启动一个新的Python进程并导入调用模块。如果在导入时调用进程(),那么这将引发无限连续的新进程(或者直到您的机器耗尽资源耗尽)。


探究import导入模块时候,会开辟线程,还是子进程?

# import_file.py
import os

print("son process is {}".format(os.getpid()))
print("son father process is {}".format(os.getppid()))

# main_test.py
import import_file
import os

print("main process is {}".format(os.getpid()))

运行结果如下:PID是一样的,说明没有开辟子进程

son process is 8752
son father process is 14080
main process is 8752
# import_file.py
import os
import threading

length = len(threading.enumerate())
print('当前运行的线程数为:%d' % length)


# main_test.py
import threading

import import_file
import os

length = len(threading.enumerate())
print('当前运行的线程数为:%d' % length)


if __name__ == '__main__':
    length = len(threading.enumerate())
    print('当前运行的线程数为:%d' % length)

运行结果如下:当前运行的线程数都是1,说明也没有开辟新的线程。

当前运行的线程数为:1
当前运行的线程数为:1
当前运行的线程数为:1
10、父线程与子线程介绍


在线程的生命周期中,从创建到执行以及最终终止,线程通常处于四种状态之一:开始态、可调度状态、阻塞态和终止态。

父线程和子线程

当一个新的进程或程序开始运行时,它将以一个线程开始,这个线程被称为主线程。然后主线程可以启动或生成其他线程,这被称为子线程,它们同样是进程的一部分,但独立执行其他任务。如果需要,这些线程还可以生成自己的子线程,当每个线程完成执行时,将通知它们的父线程,最后主线程终止整个任务。父线程和子线程关系如下图所示:

线程的四种基本状态

不同的编程语言可能会使用不同的名称,并且还有一些额外的名称,但通常在线程的生命周期中,从创建到执行以及最终终止,线程通常处于四种状态:开始态、可调度态、阻塞态和终止态。

开始态:主线程需要产生或创建另一个线程来辅助完成整个任务,子线程将以新状态开始,Python语言要求在创建线程后显式启动它。可调度态:处于可运行状态,这意味着 *** 作系统可以安排资源调度执行。通过上下文与其他线程交换,以便在处理器上运行。阻塞态:当线程需要等待事件发生时,如外部输入或计时器,或者调用子线程的join()方法进入阻塞状态,当进入阻塞态时,线程不会使用任何CPU资源。终止态:线程在完成执行或异常中止时进入终止状态。

线程的状态转换如下图所示:

在Python语言中Python线程可以从这里开始与主线程对GIL的竞争,在t_bootstrap中,申请完了GIL,也就是说子线程也就获得了GIL,使其始终保存着活动线程的状态对象。

我们知道,在 *** 作系统从进程A切换到进程B时,首先会保存进程A的上下文环境,再进行切换;当从进程B切换回进程A时,又会恢复进程A的上下文环境,这样就保证了进程A始终是在属于自己的上下文环境中运行。

这里的线程状态对象就等同于进程的上下文,Python同样会有一套存储、恢复线程状态对象的机制。同时,在Python内部,维护着一个全局变量:PyThreadState * _PyThread- State_Current

当前活动线程所对应的线程状态对象就保存在这个变量里,当Python调度线程时,会将被激活的线程所对应的线程状态对象赋给_PyThreadState_Current,使其始终保存着活动线程的状态对象。

这就引出了这样的一个问题:Python如何在调度进程时,获得被激活线程对应的状态对象?Python内部会通过一个单向链表来管理所有的Python线程的状态对象,当需要寻找一个线程对应的状态对象时。

主线程结束,子线程是否进行执行?

import threading
import time


def t_task():
    print("son thread start running!")
    time.sleep(3)
    print("son thread end")


if __name__ == "__main__":
    t = threading.Thread(target=t_task)
    t.start()
    print("main_thread end")

可见父线程结束后,子线程仍在运行,此时结束进程,子线程才会被终止

主线程结束后进程不等待守护线程完成,立即结束

当设置一个线程为守护线程时,此线程所属进程不会等待此线程运行结束,进程将立即结束。

11、Process和Tread区别

解释下运行的周期存活问题:

Process进程:

Process非进程池:父进程等待所有非守护进程结束后,然后再结束。

Process进程池Pool:父进程结束,所有非守护进程也会自动结束,更Pool底层的实现有关。

Thread线程:

Thread非线程池:主线程等待所有非守护线程结束后,然后再结束。

进程Process和线程threading区别

1.一个线程只能属于一个进程,而一个进程可以有多个线程,但至少有一个线程(线程是计算机的最小单位);

2.资源分配给进程,同一进程的所有线程共享该进程的所有资源,进程与进程之间资源相互独立,互不影响(类似深拷贝);

3.多进程模式最大的优点就是稳定性高,因为一个子进程崩溃了,不会影响主进程和其他子进程,多进程模式的缺点是在Windows下创建进程开销巨大。另外, *** 作系统能同时运行的进程数也是有限的,在内存和CPU的限制下,如果有几千个进程同时运行, *** 作系统连调度都会成问题(进程的创建比线程的创建更加占用计算机资源);

4.多线程模式致命的缺点就是任何一个线程挂掉都可能直接造成整个进程崩溃,因为所有线程共享进程的内存;

5.由于GIL锁的缘故,python 中线程实际上是并发运行(即便有多个cpu,线程会在其中一个cpu来回切换,只占用一个cpu资源),而进程才是真正的并行(同时执行多个任务,占用多个cpu资源)

12、全局解释器锁 (GIL)

首先需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL

为什么会有GIL?

由于物理上得限制,各CPU厂商在核心频率上的比赛已经被多核所取代。为了更有效的利用多核处理器的性能,就出现了多线程的编程方式,而随之带来的就是线程间数据一致性和状态同步的困难。即使在CPU内部的Cache也不例外,为了有效解决多份缓存之间的数据同步时各厂商花费了不少心思,也不可避免的带来了一定的性能损失。

Python当然也逃不开,为了利用多核,Python开始支持多线程。而解决多线程之间数据完整性和状态同步的最简单方法自然就是加锁。 于是有了GIL这把超级大锁,而当越来越多的代码库开发者接受了这种设定后,他们开始大量依赖这种特性(即默认python内部对象是thread-safe的,无需在实现时考虑额外的内存锁和同步 *** 作)。

慢慢的这种实现方式被发现是蛋疼且低效的。但当大家试图去拆分和去除GIL的时候,发现大量库代码开发者已经重度依赖GIL而非常难以去除了。有多难?做个类比,像MySQL这样的“小项目”为了把Buffer Pool Mutex这把大锁拆分成各个小锁也花了从5.5到5.6再到5.7多个大版为期近5年的时间,本且仍在继续。MySQL这个背后有公司支持且有固定开发团队的产品走的如此艰难,那又更何况Python这样核心开发和代码贡献者高度社区化的团队呢?

所以简单的说GIL的存在更多的是历史原因。如果推到重来,多线程的问题依然还是要面对,但是至少会比目前GIL这种方式会更优雅。

GIL的影响

从上文的介绍和官方的定义来看,GIL无疑就是一把全局排他锁。毫无疑问全局锁的存在会对多线程的效率有不小影响。甚至就几乎等于Python是个单线程的程序。

那么读者就会说了,全局锁只要释放的勤快效率也不会差啊。只要在进行耗时的IO *** 作的时候,能释放GIL,这样也还是可以提升运行效率的嘛。或者说再差也不会比单线程的效率差吧。理论上是这样,而实际上呢?Python比你想的更糟。

下面我们就对比下Python在多线程和单线程下得效率对比。测试方法很简单,一个循环1亿次的计数器函数。一个通过单线程执行两次,一个多线程执行。最后比较执行总时间。测试环境为双核的Mac pro。

注:为了减少线程库本身性能损耗对测试结果带来的影响,这里单线程的代码同样使用了线程。只是顺序的执行两次,模拟单线程。

顺序执行的单线程(single_thread.py)

#! /usr/bin/python

from threading import Thread
import time

def my_counter():
    i = 0
    for _ in range(100000000):
        i = i + 1
    return True


def main():
    start_time = time.time()
    for tid in range(2):
        t = Thread(target=my_counter)
        t.start()
        t.join()
    end_time = time.time()
    print("Total time: {}".format(end_time - start_time))


if __name__ == '__main__':
    main()

同时执行的两个并发线程(multi_thread.py)

#! /usr/bin/python

from threading import Thread
import time

def my_counter():
    i = 0
    for _ in range(100000000):
        i = i + 1
    return True


def main():
    thread_array = {}
    start_time = time.time()

    for tid in range(2):
        t = Thread(target=my_counter)
        t.start()
        thread_array[tid] = t

    for i in range(2):
        thread_array[i].join()
    end_time = time.time()
    print("Total time: {}".format(end_time - start_time))


if __name__ == "__main__":
    main()

可以看到python在多线程的情况下居然比单线程整整慢了45%。按照之前的分析,即使是有GIL全局锁的存在,串行化的多线程也应该和单线程有一样的效率才对。那么怎么会有这么糟糕的结果呢?

在 intel i7 的 4CPU-8核 处理器的 win10系统 测试,多线程的情况比单线程整整快了45%

计算密集型:多进程效率高

from multiprocessing import Process
from threading import Thread
import os, time


def work():
    res = 0
    for i in range(100000000):
        res *= i


if __name__ == '__main__':
    l = []
    print(os.cpu_count())  # 本机为4核
    start = time.time()
    for i in range(4):
        p = Process(target=work)
        # 耗时 15.626161098480225
        p = Thread(target=work)
        # 耗时 40.876734256744385
        l.append(p)
        p.start()
    for p in l:
        p.join()
    stop = time.time()
    print('run time is %s' % (stop - start))

IO密集型:多线程效率高

from multiprocessing import Process
from threading import Thread
import threading
import os, time


def work():
    time.sleep(2)
    print('haha ~~~')


if __name__ == '__main__':
    l = []
    print(os.cpu_count())  # 本机为4核
    start = time.time()
    for i in range(4):
        p = Process(target=work)
        # 耗时 3.9347474575042725多, 大部分时间耗费在创建进程上
        p = Thread(target=work)
        # 耗时 2.004810333251953
        l.append(p)
        p.start()
    for p in l:
        p.join()
    stop = time.time()
    print('run time is %s' % (stop - start))

测试实验分析:

我们有四个任务需要处理,处理方式肯定是要玩出并发的效果,解决方案可以是:
方案一:开启四个进程
方案二:一个进程下,开启四个线程

单核情况下,分析结果:

如果四个任务是计算密集型,没有多核来并行计算,方案一徒增了创建进程的开销,方案二胜
如果四个任务是I/O密集型,方案一创建进程的开销大,且进程的切换速度远不如线程,方案二胜

多核情况下,分析结果:

如果四个任务是计算密集型,多核意味着并行计算,在python中一个进程中同一时刻只有一个线程执行用不上多核,方案一胜
如果四个任务是I/O密集型,再多的核也解决不了I/O问题,方案二胜

得出结论:

现在的计算机基本上都是多核,python对于计算密集型的任务开多线程的效率并不能带来多大性能上的提升,甚至不如串行(没有大量切换)
但是,对于IO密集型的任务效率还是有显著提升的。

那么CPython实现中的GIL又是什么呢?GIL全称Global Interpreter Lock为了避免误导,我们还是来看一下官方给出的解释:

Python的解释器的版本:CPython、JPython、pypyPython

但是普遍使用的都是cPython解释器

在CPython解释器中GIL是一把互斥锁,用来阻止同一个进程下的多个线程的同时执行同一个进程下的多个线程无法利用多核优势。

疑问:python的多线程是不是一点用都没有???无法利用多核优势!

数据结构和GIL

Queue

    标准库queue模块,提供FIFO的Queue、LIFO的队列、优先队列。Queue类是线程安全的,适用于多线程间安全的交换数据。内部使用了Lock和Condition。在自定义容器类中,如果不加锁,是不可能获得准确的大小的,因为你刚读取到了一个大小,还没有取走数据,就有可能被其他线程改 了。Queue类的size虽然加了锁,但是,依然不能保证立即get、put就能成功,因为读取大小和get、put方法是分开的。
import queue

q = queue.Queue(8)

if q.qsize() == 7:
    q.put()  #上下两句可能会被打断

if q.qsize() == 1:
    q.get() #未必会成功,同样上下两句会被打断

GIL全局解释器锁

CPython 在解释器进程级别有一把锁,叫做GIL,即全局解释器锁。

GIL 保证CPython进程中,只有一个线程执行字节码。甚至是在多核CPU的情况下,也只允许同时只能有一个CPU 上运行该进程的一个线程。

CPython中

IO密集型,某个线程阻塞,就会调度其他就绪线程;

CPU密集型,当前线程可能会连续的获得GIL,导致其它线程几乎无法使用CPU。

在CPython中由于有GIL存在,IO密集型,使用多线程较为合算;CPU密集型,使用多进程,要绕开GIL。

新版CPython正在努力优化GIL的问题,但不是移除。

如果在意多线程的效率问题,请绕行,选择其它语言erlang、Go等。

GIL保护的是解释器级的数据,保护用户自己的数据则需要自己加锁处理,如下图

Python中绝大多数内置数据结构的读、写 *** 作都是原子 *** 作。由于GIL的存在,Python的内置数据类型在多线程编程的时候就变成了安全的了,但是实际上它们本身不是线程安全类型

    GIL不是python的特点而是cPython解释器的特点GIL是保证解释器级别的数据的安全GIL会导致同一个进程下的多个线程的无法同时执行针对不同的数据还是需要加不同的锁处理

保留GIL的原因:

Guido坚持的简单哲学,对于初学者门槛低,不需要高深的系统知识也能安全、简单的使用Python。

而且移除GIL,会降低CPython单线程的执行效率。

Python的原始解释器CPython中存在着GIL,因此再解释执行Python代码时,会产生互斥锁来限制线程对共享资源的访问,知道解释器遇到I / O *** 作或者 *** 作次数达到一定数目。

因此,由于GIL的存在,在进行多线程 *** 作时,不能调用多个CPU内核,所有在CPU密集型 *** 作时更倾向于使用多进程。

对于IO密集型 *** 作,使用多线程则可以提高效率,例如Python爬虫的开发。

测试下面两个计算密集型代码:运行时长差不多

单线程计算:

import datetime

def calc():
    sum = 0
    for _ in range(1000000000): #10亿
        sum += 1

start = datetime.datetime.now()
for i in range(4):
    calc()
dalta = (datetime.datetime.now() - start).total_seconds()
print("耗时:{}".format(dalta))

多线程计算:

import datetime
import threading

def calc():
    sum = 0
    for _ in range(1000000000): #10亿
        sum += 1

start = datetime.datetime.now()
for i in range(4):
    threading.Thread(target=calc).start()

for i in threading.enumerate():
    if i.name != "MainThread":
        i.join()
dalta = (datetime.datetime.now() - start).total_seconds()
print("耗时:{}".format(dalta))

注意,不要在代码中出现print等访问IO的语句。访问IO,线程阻塞,会释放GIL锁,其他线程被调度。

程序1是单线程程序,所有calc()依次执行,根本就不是并发。在主线程内,函数串行执行。

程序2是多线程程序,calc()执行在不同的线程中,但是由于GIL的存在,线程的执行变成了假并发。但是这些线程 可以被调度到不同的CPU核心上执行,只不过GIL让同一时间该进程只有一个线程被执行。

从两段程序测试的结果来看,CPython中多线程根本没有任何优势,和一个线程执行时间相当。因为GIL的存在,尤其是像上面的计算密集型程序,和单线程串行效果相当。这样,实际上就没有用上CPU多核心的优势。

13、Windows / Linux 进程

查看进程、杀进程(tasklist、taskkill)

1)PID:进程的唯一标识。如果一个进程含有多个线程,所有线程调用 getpid 函数会返回相同的值。

2)PGID:进程组 ID。每个进程都会有进程组 ID,表示该进程所属的进程组。默认情况下新创建的进程会继承父进程的进程组 ID。

3)SID:会话 ID。每个进程也都有会话 ID。默认情况下,新创建的进程会继承父进程的会话 ID。

4)PPID:是程序的父进程号。

会话(session)是一个或多个进程组的集合

通常是由shell的管道线将几个进程变成一组的。

查看进程列表

tasklist | findstr “java”

通过进程号终止进程

taskkill /F /PID 2328

通过映像名称终止进程(支持通配符*模糊处理)

taskkill /F /IM powershell.exe
 
taskkill /F /IM power*.exe

在Python中查看进程、杀进程

import os
import time
 
# 2种方式打开应用

# (1)阻塞方式打开应用
os.system('"C:\Program Files (x86)\bd\infoflow\infoflow.exe"')
# (2)非阻塞方式打开应用
os.startfile('"C:\Program Files (x86)\bd\infoflow\infoflow.exe"')
 
# 等待10s
time.sleep(10)
 
# 关闭应用,2个进程

# 关闭进程1
os.system('"taskkill /F /IM infoflow.exe"')
# 关闭进程2
os.system('"taskkill /F /IM hiwebhelper.exe"')

Linux 查看进程之PS命令

要对进程进行检测和控制,首先必须要了解当前进程的情况,也就是需要查看当前进程运行状态。Linux 系统中我们可以使用 ps 命令查看进程。

ps(process status) 命令是 Linux 下最常用的进程查看工具,使用该命令可以确定哪些进程正在运行和运行的状态、进程是否结束、进程有没有僵尸、哪些进程占用了过多的资源等等。

注意:ps 命令工具显示的是进程的瞬间状态,并不是动态连续显示,如果想对进程状态进行实时监控应该用 top 命令。

使用标准语法 (Unix 风格) 查看各个进程

-e :显示系统内所有进程的信息。与 -A 选项功能相同

-f :使用完整 (full) 的格式显示进程信息,如果只有 ps -e 则输出进程信息的格式和只使用 ps 一样(都只有PID TTY TIME CMD这几项,但是输出信息的内容和ps的不一样)

各字段含义如下:

  UID:启动该进程的用户的 ID 号

  PPID:代表该进程的父进程的 ID 号

  C:进程的 CPU 处理器利用率

  STIME/START:表示进程的启动时间
14、互斥锁 / 事件 / 信号量

互斥锁 Lock

在之前我们了解到python多线程中的全局变量改变问题在线程中改变一个全局变量,多线程开发的时候共享全局变量会带来资源竞争效果。也就是数据不安全。所以为了掌控进程合理对变量的改变,我们用线程锁来控制。

互斥锁:每个线程几乎同时修改一个共享数据的时候,需要进行同步控制线程同步能够保证多个线程安全的访问竞争资源(全局内容),简单的同步机制就是使用互斥锁。

某个线程要更改共享数据时,先将其锁定,此时资源的状态为锁定状态,其他线程就能更改,直到该线程将资源状态改为非锁定状态,也就是释放资源,其他的线程才能再次锁定资源。互斥锁保证了每一次只有一个线程进入写入 *** 作。从而保证了多线程下数据的安全性。

死锁

死锁:在多个线程共享资源的时候,如果两个线程分别占有一部分资源,并且同时等待对方的资源,就会造成死锁现象。如果锁之间相互嵌套,就有可能出现死锁。因此尽量不要出现锁之间的嵌套。

import time
from threading import Thread
value = 100
def task():
    global value
    temp = value
    time.sleep(0.1) # 遇到IO,GIL自动释放
    value = temp - 1

if __name__  == "__main__":
    t_list = []
    for i in range(100):
        t = Thread(target=task)
        t.start()
        t_list.append(t)

        # 100个线程启动,自动去抢GIL锁

    for i in range(100):
        t_list[i].join()

    print(value)

如果task函数中加time.sleep(0.1),那么最终的结果是99,如果不加sleep,那么最终的结果是0,为什么会产生这种所谓的情况呢?

如果不加sleep,当开启100个线程以后,这100线程会自动去争抢全局解释器锁(Global interpreter lock),当 Thread - A 线程,执行完,Thread - B 抢到 GIL 开启执行,但是此时的Temp 已经是 99了。

如果加sleep,当开启100个线程以后,这100线程会自动去争抢全局解释器锁(Global interpreter lock),当 Thread - A 线程,执行到 sleep(遇到IO,自动放弃全局解释器锁),Thread - B 抢到 GIL 开启执行,但是此时的Temp 仍然是100。


可见,在多线程中 *** 作全局变量是危险的,不可控制的。GIL只是用了保护解释器级别的数据,自定义的全局变量需要自己加锁进行保护。

解决方案:调用Lock设置互斥锁。

import time
from threading import Thread, Lock
value = 100
mutex = Lock()
def task():
    global value
    mutex.acquire()
    temp = value
    time.sleep(0.01) # 遇到IO,GIL自动释放
    value = temp - 1
    """
    mutex.release()
        with mutex:
        temp = value
        time.sleep(0.01) # 遇到IO,GIL自动释放
        value = temp - 1
    """

if __name__  == "__main__":
    t_list = []
    for i in range(100):
        t = Thread(target=task)
        t.start()
        t_list.append(t)

        # 100个线程启动,自动去抢GIL锁

    for i in range(100):
        t_list[i].join()

    print(value)

Python线程之threading.Event

python使用threading.Event可以使一个线程等待其他线程的通知,我们把这个Event传递到线程对象中,Event默认内置了一个标志,初始值为False。一旦该线程通过wait()方法进入等待状态,直到另一个线程调用该Event的set()方法将内置标志设置为True时,该Event会通知所有等待状态的线程恢复运行。

python线程的事件Event用于主线程控制其他线程的执行,事件主要提供了三个方法wait、clear、set。

事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。

clear:将“Flag”设置为Falseset:将“Flag”设置为True

import threading
import time


class MyThread(threading.Thread):
    def __init__(self, signal):
        threading.Thread.__init__(self)
        # 初始化
        self.signal = signal

    def run(self):
        print("%s:I am %s,I will sleep ..." % (time.ctime(), self.name))

        # 进入等待状态
        self.signal.wait()
        print("%s:I am %s, I awake..." % (time.ctime(), self.name))


if __name__ == "__main__":
    # 初始 为 False
    event_obj = threading.Event()
    for t in range(0, 3):
        thread = MyThread(event_obj)
        thread.start()

    print("%s:main thread sleep 3 seconds... " % time.ctime())

    time.sleep(3)
    # 唤醒含有event_obj, 处于等待状态的线程
    event_obj.set()

运行展示如下所示:

Mon Jan 24 13:19:56 2022:I am Thread-1,I will sleep ...
Mon Jan 24 13:19:56 2022:I am Thread-2,I will sleep ...
Mon Jan 24 13:19:56 2022:I am Thread-3,I will sleep ...
Mon Jan 24 13:19:56 2022:main thread sleep 3 seconds... 
Mon Jan 24 13:19:59 2022:I am Thread-2, I awake...
Mon Jan 24 13:19:59 2022:I am Thread-1, I awake...
Mon Jan 24 13:19:59 2022:I am Thread-3, I awake...

Python Threading Semaphore信号量(子线程的数量)

原因:主要用在保护有限的资源。

假设当前数据库支持最大连接数为3,将信号量初始值设为3,那么同时最大可以有三个线程连接数据库,其他线程若再想连接数据库,则只有等待,直到某一个线程释放数据库连接。

import logging
import threading
import time
import random
from threading import Semaphore

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [*] %(message)s"
)
threads = []
lock_sm = Semaphore(3)


class connectdb(threading.Thread):
    def run(self):
        while True:
            lock_sm.acquire()
            logging.info(f"{self.name} connecting to db... ")
            logging.info(f"{self.name} released db...")
            time.sleep(2)
            lock_sm.release()


if __name__ == '__main__':
    for i in range(5):
        threads.append(connectdb())

    # 从五个线程取出三个
    random_threads = random.sample(threads, 3)

    # 阻塞启动线程 ---------------------------
    for t in random_threads:
        t.start()
    for t in random_threads:
        t.join()

注意:输出结果是每两秒打印的,跟数据库连接 *** 作差的远呢

15、Pool线程池原理介绍

通过多线程改善传输层TCP协议套接字的会话案例:

import socket
import threading


def communication(conn):
    while True:
        try:
            data = conn.recv(1024)
            if len(data) == 0:
                break
            print(threading.enumerate())
            conn.send(data.upper())
        except ConnectionResetError as e:
            print(e)
            break
    conn.close()

def server():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    server.bind(('127.0.0.1', 8080))
    server.listen(5)
    while True:
        conn, cli_address = server.accept()
        t_conn = threading.Thread(target=communication, args=(conn, ))
        t_conn.start()

if __name__ == "__main__":
    server()

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5711193.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存