Python多进程与多线程编程

Python多进程与多线程编程,第1张

概述Python的多进程编程与multiprocess模块 python的多进程编程主要依靠multiprocess模块。我们先对比两段代码,看看多进程编程的优势。我们模拟了一个非常耗时的任务,计算8的2 Python的多进程编程与multiprocess模块

python的多进程编程主要依靠multiprocess模块。我们先对比两段代码,看看多进程编程的优势。我们模拟了一个非常耗时的任务,计算8的20次方,为了使这个任务显得更耗时,我们还让它sleep 2秒。第一段代码是单进程计算(代码如下所示),我们按顺序执行代码,重复计算2次,并打印出总共耗时。

import time osdef long_time_task():    print('当前进程: {}'.format(os.getpID()))    time.sleep(2)    "结果: {}".format(8 ** 20))if __name__ == __main__":    当前母进程: {}.format(os.getpID()))    start = time.time()    for i in range(2):        long_time_task()    end =用时{}秒".format((end-start)))

输出结果如下,总共耗时4秒,至始至终只有一个进程14236。看来电脑计算8的20次方基本不费时。

当前母进程: 14236当前进程: 14236结果: 1152921504606846976用时4.01080060005188秒

第2段代码是多进程计算代码。我们利用multiprocess模块的Process方法创建了两个新的进程p1和p2来进行并行计算。Process方法接收两个参数,第一个是target,一般指向函数名,第二个时args,需要向函数传递的参数。对于创建的新进程,调用start()方法即可让其开始。我们可以使用os.getpID()打印出当前进程的名字。

from multiprocessing  Process os time long_time_task(i):    子进程: {} - 任务{}.format(os.getpID(),i))    time.sleep(2))__name__== time.time()    p1 = Process(target=long_time_task,args=(1,))    p2 = Process(target=long_time_task,args=(2等待所有子进程完成。)    p1.start()    p2.start()    p1.join()    p2.join()    end =总共用时{}秒".format((end - start)))

输出结果如下所示,耗时变为2秒,时间减了一半,可见并发执行的时间明显比顺序执行要快很多。你还可以看到尽管我们只创建了两个进程,可实际运行中却包含里1个母进程2个子进程。之所以我们使用join()方法就是为了让母进程阻塞,等待子进程都完成后才打印出总共耗时,否则输出时间只是母进程执行的时间。

当前母进程: 6920等待所有子进程完成。子进程: 17020 - 任务1子进程: 5904 - 任务2结果: 1152921504606846976总共用时2.131091356277466秒

知识点:

新创建的进程与进程的切换都是要耗资源的,所以平时工作中进程数不能开太大。同时可以运行的进程数一般受制于cpu的核数。除了使用Process方法,我们还可以使用Pool类创建多进程。
利用multiprocess模块的Pool类创建多进程

很多时候系统都需要创建多个进程以提高cpu的利用率,当数量较少时,可以手动生成一个个Process实例。当进程数量很多时,或许可以利用循环,但是这需要程序员手动管理系统中并发进程的数量,有时会很麻烦。这时进程池Pool就可以发挥其功效了。可以通过传递参数限制并发进程的数量,默认值为cpu的核数。

Pool类可以提供指定数量的进程供用户调用,当有新的请求提交到Pool中时,如果进程池还没有满,就会创建一个新的进程来执行请求。如果池满,请求就会告知先等待,直到池中有进程结束,才会创建新的进程来执行这些请求。

下面介绍一下multiprocessing 模块下的Pool类的几个方法:

1.apply_async

函数原型:apply_async(func[,args=()[,kwds={}[,callback=None]]])

其作用是向进程池提交需要执行的函数及参数, 各个进程采用非阻塞(异步)的调用方式,即每个子进程只管运行自己的,不管其它进程是否已经完成。这是默认方式。

2.map()

函数原型:map(func,1)"> iterable[,chunksize=None])

Pool类中的map方法,与内置的map函数用法行为基本一致,它会使进程阻塞直到结果返回。 注意:虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程

3.map_async()

函数原型:map_async(func,iterable[,chunksize[,callback]])
与map用法一致,但是它是非阻塞的。其有关事项见apply_async。

4.close()

关闭进程池(pool),使其不在接受新的任务。

5. terminate()

结束工作进程,不在处理未处理的任务。

6.join()

主进程阻塞等待子进程的退出, join方法要在close或terminate之后使用。

下例是一个简单的multiprocessing.Pool类的实例。因为小编我的cpu是4核的,一次最多可以同时运行4个进程,所以我开启了一个容量为4的进程池。4个进程需要计算5次,你可以想象4个进程并行4次计算任务后,还剩一次计算任务(任务4)没有完成,系统会等待4个进程完成后重新安排一个进程来计算。

 Pool,cpu_countcpu内核数:{}.format(cpu_count()))     time.time()    p = Pool(4in range(5):        p.apply_async(long_time_task,args=(i,1)">)    p.close()    p.join()    end =".format((end - start)))

知识点:

对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close()或terminate()方法,让其不再接受新的Process了。

输出结果如下所示,5个任务(每个任务大约耗时2秒)使用多进程并行计算只需4.37秒,耗时减少了60%,可见并行计算优势还是很明显的。

cpu内核数:4当前母进程: 2556等待所有子进程完成。子进程: 16480 - 任务0子进程: 15216 - 任务1子进程: 15764 - 任务2子进程: 10176 - 任务3结果: 1152921504606846976子进程: 15216 - 任务4结果: 1152921504606846976总共用时4.377134561538696秒

相信大家都知道python解释器中存在GIL(全局解释器锁),它的作用就是保证同一时刻只有一个线程可以执行代码。由于GIL的存在,很多人认为python中的多线程其实并不是真正的多线程,如果想要充分地使用多核cpu的资源,在python中大部分情况需要使用多进程。然而这并意味着python多线程编程没有意义哦,请继续阅读下文。

多进程间的数据共享与通信

通常,进程之间是相互独立的,每个进程都有独立的内存。通过共享内存(nmap模块),进程之间可以共享对象,使多个进程可以访问同一个变量(地址相同,变量名可能不同)。多进程共享资源必然会导致进程间相互竞争,所以应该尽最大可能防止使用共享状态。还有一种方式就是使用队列queue来实现不同进程间的通信或数据共享,这一点和多线程编程类似。

下例这段代码中中创建了2个独立进程,一个负责写(pw),1)">一个负责读(pr),实现了共享一个队列queue

 Process,Queue os,time,random# 写数据进程执行的代码: write(q):    Process to write: {}.format(os.getpID()))    for value in [A',BC]:        Put %s to queue...' % value)        q.put(value)        time.sleep(random.random()) 读数据进程执行的代码: read(q):    Process to read:{}while True:        value = q.get(True)        Get %s from queue. value) 父进程创建Queue,并传给各个子进程:    q = Queue()    pw = Process(target=write,args=(q,))    pr = Process(target=read,1)"> 启动子进程pw,写入:    pw.start()     启动子进程pr,读取:    pr.start()     等待pw结束:    pw.join()     pr进程里是死循环,无法等待其结束,只能强行终止:    pr.terminate()

输出结果如下所示:

Process to write: 3036Put A to queue...Process to read:9408Get A from queue.Put B to queue...Get B  queue.Put C to queue...Get C from queue.
Python的多线程编程与threading模块

python 3中的多进程编程主要依靠threading模块。创建新线程与创建新进程的方法非常类似。threading.Thread方法可以接收两个参数,1)">第一个是target,一般指向函数名,第二个时args,需要向函数传递的参数。对于创建的新线程,调用start()方法即可让其开始。我们还可以使用current_thread().name打印出当前线程的名字。 下例中我们使用多线程技术重构之前的计算代码。

 threading当前子线程: {} - 任务{}.format(threading.current_thread().name,1)">:    start =这是主线程:{}.format(threading.current_thread().name))    t1 = threading.Thread(target=long_time_task,args=(1".format((end - start)))

下面是输出结果。为什么总耗时居然是0秒? 我们可以明显看到主线程和子线程其实是独立运行的,主线程根本没有等子线程完成,而是自己结束后就打印了消耗时间。主线程结束后,子线程仍在独立运行,这显然不是我们想要的。

这是主线程:MainThread当前子线程: Thread-1 - 任务1当前子线程: Thread-2 - 任务2总共用时0.0017192363739013672秒结果: 1152921504606846976结果: 1152921504606846976

如果要实现主线程和子线程的同步,我们必需使用join方法(代码如下所示)。

当前子线程: {} 任务{}.format(threading.current_thread().name))    thread_List = []    in range(1,3):        t = threading.Thread(target=long_time_task,))        thread_List.append(t)    for t in thread_List:        t.start()     thread_List:        t.join()    end =".format((end - start)))

修改代码后的输出如下所示。这时你可以看到主线程在等子线程完成后才答应出总消耗时间(2秒),比正常顺序执行代码(4秒)还是节省了不少时间。

这是主线程:MainThread当前子线程: Thread - 1 任务1当前子线程: Thread - 2总共用时2.0166890621185303秒

当我们设置多线程时,主线程会创建多个子线程,在python中,默认情况下主线程和子线程独立运行互不干涉。如果希望让主线程等待子线程实现线程的同步,我们需要使用join()方法。如果我们希望一个主线程结束时不再执行子线程,我们应该怎么办呢? 我们可以使用t.setDaemon(True),代码如下所示。

当子线程: {}.format(threading.current_thread().name))    time.sleep(2.format(threading.current_thread().name))    ())        t.setDaemon(True)        t.start()    end =".format((end - start)))
通过继承Thread类重写run方法创建新进程

除了使用Thread()方法创建新的线程外,我们还可以通过继承Thread类重写run方法创建新的线程,这种方法更灵活。下例中我们自定义的类为MyThread,随后我们通过该类的实例化创建了2个子线程。

-*- enCoding:utf-8 -*- long_time_task(i):    time.sleep(2return 8**20class MyThread(threading.Thread):    def __init__(self,func,args,name=''__init__(self)        self.func = func        self.args = args        self.name = name        self.result = None     run(self):        开始子进程{}.format(self.name))        self.result = self.func(self.args[0],)        .format(self.result))        结束子进程{}.format(self.name)) time.time()    threads =):        t = MyThread(long_time_task,(i,),str(i))        threads.append(t)     threads:        t.start()     threads:        t.join()    end =".format((end - start)))

输出结果如下所示:

开始子进程1开始子进程2结果: 1152921504606846976结束子进程1结束子进程2总共用时2.005445718765259秒
不同线程间的数据共享

一个进程所含的不同线程间共享内存,这就意味着任何一个变量都可以被任何一个线程修改,因此线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。如果不同线程间有共享的变量,其中一个方法就是在修改前给其上一把锁lock,确保一次只有一个线程能修改它。threading.lock()方法可以轻易实现对一个共享变量的锁定,修改完后release供其它线程使用。比如下例中账户余额balance是一个共享变量,使用lock可以使其不被改乱。

 -*- Coding: utf-8 -* threading Account:    (self):        self.balance = 0     add(self,lock):         获得锁        lock.acquire()        in range(0,100000):            self.balance += 1         释放锁        lock.release()     delete(self,1)">):            self.balance -= 1                    lock.release():    account = Account()    lock = threading.Lock()     创建线程   thread_add = threading.Thread(target=account.add,args=(lock,1)">Add)    thread_delete = threading.Thread(target=account.delete,1)">Delete)     启动线程   thread_add.start()    thread_delete.start()     等待线程结束   thread_add.join()    thread_delete.join()    The final balance is: {}'.format(account.balance))
使用queue队列通信-经典的生产者和消费者模型

下例中创建了两个线程,一个负责生成,一个负责消费,所生成的产品存放在queue里,实现了不同线程间沟通。

from queue  Queue random,threading,time 生产者类 Producer(threading.Thread):    (self,name,queue):        threading.Thread.name)        self.queue = queue    ):            {} is producing {} to the queue!.format(self.getname(),i))            self.queue.put(i)            time.sleep(random.randrange(10) / 5)        %s finished!" % self.getname()) 消费者类 Consumer(threading.Thread):    ):            val = self.queue.get()            {} is consuming {} in the queue.))         main():    queue = Queue()    producer = Producer(ProducerConsumerAll threads finished!):    main()

队列queue的put方法可以将一个对象obj放入队列中。如果队列已满,此方法将阻塞至队列有空间可用为止。queue的get方法一次返回队列中的一个成员。如果队列为空,此方法将阻塞至队列中有成员可用为止。queue同时还自带emtpy(),full()等方法来判断一个队列是否为空或已满,但是这些方法并不可靠,因为多线程和多进程,在返回结果和使用结果之间,队列中可能添加/删除了成员。

 

总结

以上是内存溢出为你收集整理的Python多进程与多线程编程全部内容,希望文章能够帮你解决Python多进程与多线程编程所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/1189244.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-03
下一篇 2022-06-03

发表评论

登录后才能评论

评论列表(0条)

保存