python多进程,进程池,数据共享,进程通信,分布式进程

python多进程,进程池,数据共享,进程通信,分布式进程,第1张

概述一、 *** 作系统中相关进程的知识 ??Unix/Linux *** 作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为 *** 作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。 ??子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每 一、 *** 作系统中相关进程的知识

??Unix/linux *** 作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为 *** 作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。 ??子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppID()就可以拿到父进程的ID。 ??Python的os模块封装了常见的系统调用,其中就包括fork,可以在Python程序中轻松创建子进程。

??示例如下

import ospID=os.fork()if pID==0:    print('I am child process %s my parents is %s'%(os.getpID(),os.getppID()))else:    print('I (%s) just created a child process (%s).'%(os.getpID(),pID))

??输出如下

I (64225) just created a child process (64226).I am child process 64226 my parents is 64225
二、跨平台模块multiprocessing

multiprocessing模块提供了一个Process类来代表一个进程对象。
??示例1

from multiprocessing import Processimport os# 子进程要执行的代码def run_proc(name):    print('Run child process %s (%s)...' % (name,os.getpID()))if __name__=='__main__':    print('Parent process %s.' % os.getppID())    p = Process(target=run_proc,args=('test',))    print('Child process will start.')    p.start()    p.join()    print('Child process end.')#join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。

??示例2

from multiprocessing import Processimport timeimport osclass P(Process):    def run(self):        print('Run child process %s (%s)...'%(self.name,os.getpID()))  # 默认函数对象有name方法 ,结果为:P-1        time.sleep(3)        print('%s is done' % self.name)if __name__ == '__main__':    print('Parent process %s.' % os.getppID())    p=P()    p.start()    p.join()
三、进程数据隔离

多个进程间的数据是隔离的,也就是说多个进程修改全局变量互不影响
??验证示例

from multiprocessing import Processimport timex=100def task():    global x    print('子进程开启,当前x的值为%d'%x)    time.sleep(3)    x=10    print('子进程结束,当前x的值为%d'%x)if __name__ == '__main__':    print('当前为父进程,准备开启子进程,x的值为%d' % x)    p1=Process(target=task)    p1.start()    p1.join()    print('当前为父进程,准备结束父进程,x的值为%d' % x)

??输出

当前为父进程,准备开启子进程,x的值为100子进程开启,当前x的值为100子进程结束,当前x的值为10当前为父进程,准备结束父进程,x的值为100

==注意:有些情况是需要加锁的情况,如文件读写问题==

四、多进程并行执行

??示例如下

import timefrom multiprocessing import Processdef task(name,n):    print('%s is running'%name)    time.sleep(n)    print('%s is done'%name)if __name__ == '__main__':    p1=Process(target=task,args=("进程1",1)) #用时1s    p2=Process(target=task,args=("进程2",2)) #用时1s    p3=Process(target=task,args=("进程3",3)) #用时1s        start_time=time.time()    p1.start()    p2.start()    p3.start()    # 当第一秒在运行p1时,其实p2、p3也已经在运行,当1s后到p2时只需要再运行1s就到p3了,到p3也是一样。    p1.join()    p2.join()    p3.join()    stop_time=time.time()         print(stop_time-start_time) #3.2848567962646484
五、进程池 1、线性执行( pool.apply() )
from multiprocessing import Pool  # 导入进程池模块poolimport time,osdef foo(i):    time.sleep(2)    print("in process",os.getpID())  # 打印进程号if __name__ == "__main__":    pool = Pool(processes=5)   # 设置允许进程池同时放入5个进程    for i in range(10):        pool.apply(func=foo,args=(i,))   # 同步执行挂起进程    print('end')    pool.close() # 关闭进程池,不再接受新进程    pool.join()  # 进程池中进程执行完毕后再关闭,如果注释掉,那么程序直接关闭。
2、并发执行( pool.apply_async() )
from multiprocessing import Pool  # 导入进程池模块poolimport time,os.getpID())  # 打印进程号if __name__ == "__main__":    pool = Pool(processes=5)   # 设置允许进程池同时放入5个进程,并且将这5个进程交给cpu去运行    for i in range(10):        pool.apply_async(func=foo,))   # 采用异步方式执行foo函数    print('end')    pool.close()    pool.join()  # 进程池中进程执行完毕后再关闭,如果注释掉,那么程序直接关闭。
3、设置回调
from multiprocessing import Process,Poolimport time,os.getpID())  # 打印子进程的进程号def bar(arg):#注意arg参数是必须要有的    print('-->exec done:',arg,os.getpID())   # 打印进程号 if __name__ == "__main__":    pool = Pool(processes=2)    print("主进程",os.getpID())   # 主进程的进程号    for i in range(3):        pool.apply_async(func=foo,),callback=bar)   # 执行回调函数callback=bar    print('end')    pool.close()    pool.join()  # 进程池中进程执行完毕后再关闭,如果注释掉,那么程序直接关闭。

??执行结果

主进程 752endin process 2348-->exec done: None 752in process 8364-->exec done: None 752in process 2348-->exec done: None 752#回调函数说明fun=Foo干不完就不执行bar函数,等Foo执行完就去执行bar#这个回调函数是主进程去调用的,而不是每个子进程去调用的。
六、子进程

??1、 很多时候子进程是一个外部进程,如执行一条命令,这和命令行执行效果是一样的 ??示例如下

import subprocessprint('$nslookup https://www.baIDu.com')r = subprocess.call(['nslookup','https://www.baIDu.com'])print('Exit code',r)

??2、 有时候子进程还需要进行输入,可以通过communicate方法来输入 ??示例如下

import subprocessprint('$ nslookup https://www.baIDu.com')p = subprocess.Popen(['nslookup'],stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE)output,err = p.communicate(b'set q=mx\nbaIDu.com\nexit\n')print(output.decode('gbk'))print('Exit code:',p.returncode)

??输出如下

$ nslookup https://www.baIDu.com默认服务器:  bogonAddress:  192.168.111.1> > 服务器:  bogonAddress:  192.168.111.1baIDu.com   MX preference = 10,mail exchanger = mx.maillb.baIDu.combaIDu.com   MX preference = 20,mail exchanger = jpmx.baIDu.combaIDu.com   MX preference = 15,mail exchanger = mx.n.shifen.combaIDu.com   MX preference = 20,mail exchanger = mx50.baIDu.combaIDu.com   MX preference = 20,mail exchanger = mx1.baIDu.com> Exit code: 0
七、守护进程

守护进程在主进程代码执行完毕时立刻挂掉,然后主进程等待非守护进程执行完毕后回收子进程的资源(避免产生僵尸进程),整体才算结束。
示例

from multiprocessing import Processimport osimport timedef task(x):    print('%s is running ' %x)    time.sleep(3)    print('%s is done' %x)if __name__ == '__main__':    p1=Process(target=task,args=('守护进程',))    p2=Process(target=task,args=('子进程',))    p2.start()    p1.daemon=True   # 设置p1为守护进程    p1.start()    print('主进程代码执行完毕')>>:主进程代码执行完毕>>:子进程 is running>>:子进程 is done

==可以从结果看出,主进程代码执行完,守护进程立即挂掉,主进程在等待子进程执行完毕后退出==

八、进程间通信

??如果想要进程间通信可以使用QueuePipe来实现 ??使用Queue示例

from multiprocessing import Queue,Processdef put_ID(q):     q.put([1,2,3,4])if __name__ == '__main__':     q=Queue()     p=Process(target=put_ID,args=(q,))     p.start()     print(q.get())     p.join()# 输出[1,4]

==注意:在这需要从multiprocessing导入Queue模块==

??使用Pipe示例

from multiprocessing import Process,Pipedef put_ID(conn):    conn.send([1,3])    conn.send([4,5,6])    conn.close()    if __name__ == '__main__':    ## 生成管道。 生成时会产生两个返回对象,这两个对象相当于两端的电话,通过管道线路连接。    ## 两个对象分别交给两个变量。    parent_conn,child_conn=Pipe()    p=Process(target=put_ID,args=(child_conn,))#child_conn需要传给对端,用于send数据给parent_conn    p.start()    print(parent_conn.recv())  # parent_conn在这断用于接收数据>>>>[1,3]    print(parent_conn.recv())  # parent_conn在这断用于接收数据>>>>[4,6]    p.join()

==注意两端要发送次数和接受次数要对等,不然会卡住直到对等==

九、进程间数据共享(字典和列表型)

??前面说过,进程间数据是隔离的,如果想要进程间数据共享可以通过Manager来实现 ??示例如下

from multiprocessing import Manager,Processfrom random import randintimport osdef run(d,l):    d[randint(1,50)]=randint(51,100)#生成一个可在多个进程之间传递和共享的字典    l.append(os.getpID())    print(l)if __name__ == '__main__':    with Manager() as manage: #做一个别名,此时manager就相当于Manager()        d=manage.dict()#生成一个可在多个进程之间传递和共享的字典        l=manage.List(range(5))#生成一个可在多个进程之间传递和共享的列表        p_List=[]        for i in range(10):#生成10个进程            p=Process(target=run,args=(d,l))            p_List.append(p)# 将每个进程放入空列表中            p.start()        for i in p_List:            i.join()        print(d)#所有进程都执行完毕后打印字典        print(l)#所有进程都执行完毕后打印列表
十、分布式进程

??在做分布式计算时显然进程比线程各合适,一来进程更稳定,二来线程最多只能在同一台机器的多个cpu上运行; ??multiprocessingmanagers子模块支持把多进程分布到多个机器上,一个服务进程用作调度者,依靠网络将任务分布到其它多个进程中。 ??假设有一个需求,拥有两台机器,一台机器用来做发送任务的服务进程,一台用来做处理任务的服务进程; ??示例如下

# task_master.pyfrom multiprocessing.managers import BaseManagerfrom queue import Queueimport randomimport timetask_queue = Queue()result_queue = Queue()class QueueManager(BaseManager):        passdef get_task_queue():    global task_queue    return task_queuedef get_result_queue():    global result_queue    return result_queueif __name__ == '__main__':    # 将两个队列注册到网络上,calltable参数关联Queue对象    QueueManager.register('get_task_queue',callable=get_task_queue)    QueueManager.register('get_result_queue',callable=get_result_queue)    # 创建一个队列管理器,绑定端口5000,设定密码为abc    manager = QueueManager(address=('127.0.0.1',5000),authkey=b'abc')    manager.start()    # 通过网络获取Queue对象    task = manager.get_task_queue()    result = manager.get_result_queue()    # 放任务进去    for i in range(10):        n = random.randint(0,1000)        print('Put Task %d'%n)        task.put(n)    # 从结果队列获取结果    print('Try get results')    for i in range(10):        r = result.get()        print('Result: %s' % r)    manager.shutdown()    print('master exit')

==注意:一定要用注册过的Queue对象,另外在linux/unix/mac等系统上注册可直接使用QueueManager.register(‘get_result_queue‘,callable=lambda : result_queue)==

# task_worker.pyfrom multiprocessing.managers import BaseManagerfrom queue import Queuefrom queue import Emptyimport timeclass QueueManager(BaseManager):    passif __name__ == '__main__':    # 从服务器上获取,所以注册时只需要提供名字,也就是接口名字    QueueManager.register('get_task_queue')    QueueManager.register('get_result_queue')    # 连接到服务器,也就是task_master.py的机器    server_addr = '127.0.0.1'    manager = QueueManager(address=(server_addr,authkey=b'abc')    manager.connect()    # 获取Queue对象    task = manager.get_task_queue()    result = manager.get_result_queue()    # 从队列提取任务,将处理结果插入result队列    for i in range(10):        try:            n = task.get(timeout=1)            print('run task %d*%d'%(n,n))            r = '%d * %d = %d'%(n,n,n*n)            time.sleep(1)            result.put(r)        except Empty:            print('task queue is empty')    print('worker exit')
总结

以上是内存溢出为你收集整理的python多进程,进程池,数据共享,进程通信,分布式进程全部内容,希望文章能够帮你解决python多进程,进程池,数据共享,进程通信,分布式进程所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/1191285.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-03
下一篇 2022-06-03

发表评论

登录后才能评论

评论列表(0条)

保存