@[TOC] Python 多进程学习
前言研究回调函数中学习了Python的多进程, 记录备忘
常用的两种方式启动多进程
用multiprocessing 库的Process 或 Pool
感觉Pool比较方便
进程池Pool
from multiprocessing import Process, Pool import time import random import os def download(f): print('%s_ID= %s pid=%d,ppid=%d'%(str(time.ctime()),str(f),os.getpid(),os.getppid())) for i in range(3): print(f,'--文件--%d'%i) time.sleep(random.randint(5, 15)) if random.randint(1,9) >= 5: return {"f:": f,"PID:": os.getpid(),"PPID:": os.getppid(),"time:": time.ctime(),"result": 0, "info": '下载失败!'} else: return {"f:": f,"PID:": os.getpid(),"PPID:": os.getppid(),"time:": time.ctime(),"result": 1, "info": '下载完成!'} if __name__ == "__main__": p = Pool(2) result = [] result.append( p.map_async(func=download, iterable=(1111,2222,3333,4444,5555,6666))) #也可以用apply_async()函数 #result.append( p.apply_async(func=download, args=(1111,))) #result.append( p.apply_async(func=download, args=(2222,))) #result.append( p.apply_async(func=download, args=(3333,))) #result.append( p.apply_async(func=download, args=(4444,))) #result.append( p.apply_async(func=download, args=(5555,))) #result.append( p.apply_async(func=download, args=(6666,))) p.close()#关闭进程池,关闭后,p不再接收新的请求 print(str(time.ctime()) + "---start----") intCnt = 0 while intCnt < 5: print("%s: Now intCnt value is: %d sleep for 5s: "%( str(time.ctime()),intCnt)) time.sleep(5) intCnt +=1 print("%sNow print results"%time.ctime()) for i in range(6,2,-1): print(result[i-1].get()) print(result[0].get()) print('main process pid: %d, ppid:%d '%(os.getpid(),os.getppid())) print(str(time.ctime()) + '---end----')
close()函数作用是关闭进程池(关门,不接受新进程加入的请求)而不是关闭进程。关闭进程是terminate(). 进程是在调用apply_async()时就启动了,并不是调用close()函数后才启动的。事实上,没有close()也不影响读取map_async()和apply_async()的返回值,会阻塞主进程。map_async() 更适用于调用的处理程序是同一只,但是调用参数不一样的情况,所以批量调用,写法简单;而apply_async()则是一个一个地调用,适合于每次调用的处理程序可能不太一样的情况。实测两个方法在子进程出现错误后,进程都会被利用。只是apply_async()因为是单独处理的,所以正常运行的子程序返回值都可以取到。但是map_async()只要有一个出问题,会导致所有的返回值都取不到。如果没有其它阻塞主进程的动作,又想确保子进程能执行完毕,可以用join()函数来阻塞主进程。进程之间(包括主进程与子进程) 是独立的,所以不能在子进程中读取或设置主进程中的变量(反之亦然).map_async()和apply_async()都可以有optional的callback参数,可以指定进程结束后的回调函数。也可以理解为上例中的download函数也是回调函数. 注意,map_async()和apply_async()的返回值是由func参数指定的函数返回的,而不是由callback参数返回的。
from multiprocessing import Process,Pool import os import time from datetime import datetime import random def callback_setFlag(f): print('now this thread starts') print('Income parameter is:{}',f) keyboard.add_hotkey('ctrl+shift+a', lambda: print('test')) keyboard.wait('ctrl+alt+q') return {'result':0, 'Information': 'user pressed key to stop!'} def alterUser(msg): print(r"I'm messager: " + msg['Information']) return msg['result'] def setStopFlag(): global blStop print(r"Hahahaha, I'll let you stop!!!") blStop = True if __name__ == '__main__': blStop = False p = Pool(1) result = p.apply_async(func=callback_setFlag, args=('thread1',), callback=alterUser) p.close() keyboard.add_hotkey('ctrl+shift+t',setStopFlag) print('----------Start-----------------') intCnt = 0 while intCnt < 10 and blStop == False: print( "%s Print result from loop in main process, intCnt is: %d"%(time.ctime(),intCnt)) time.sleep(5) print("now wake up!") intCnt +=1 if blStop == False: print( 'Timeout!') else: print( 'End: stopped by user!') print(result.get()) #p.join() print('----------End-----------------')进程间通信
注意, 对于Pool,Queues要用mutiprocessing.Manager().Queue(), 不能直接用mutiprocessing.Queue()
from multiprocessing import Process,Pool,Pipe,Queue,Manager import os import time from datetime import datetime import random import itertools def callback_setFlag(q,f): print('now this thread starts:%d'%f) keyboard.add_hotkey('ctrl+shift+a', lambda: q.put(str(f) + ' send message: hot key pressed!')) keyboard.wait('ctrl+alt+q') q.put('%d send message: user decided to quit! Now try to read queue.') if not(q.empty()): print("%d read queue: %s" %(f, q.get(True, 2))) else: print("%d read queue: no message in queue!"%f) return {'result':0, 'Information': 'user pressed key to stop!'} def download(q,f): print('%s__进程池中的进程——pid=%d,ppid=%d'%(str(time.ctime()),os.getpid(),os.getppid())) for i in range(3): q.put('%d send message: now downloadfile %d'%(f,i)) if not(q.empty()): print("%d read queue: %s" %(f, q.get(True, 2))) print(f,'--文件--%d'%i) time.sleep(random.randint(5, 25)) # time.sleep(1) if random.randint(1,9) >= 5: return {"f:": f,"PID:": os.getpid(),"PPID:": os.getppid(),"time:": time.ctime(),"result": 0, "info": '下载失败!'} else: return {"f:": f,"PID:": os.getpid(),"PPID:": os.getppid(),"time:": time.ctime(),"result": 1, "info": '下载完成!'} def alterUser(msg): print(r"I'm messager: " + msg['Information']) return msg['result'] def setFlag(): global blStop print(r"Hahahaha, I'll let you stop!!!") blStop = True if __name__ == '__main__': blStop = False p = Pool(3) #conn1, conn2 = Pipe() q = Manager().Queue() result = [] result.append( p.apply_async(func=callback_setFlag, args=(q,0000,))) for i in (1111,7777, 1111): result.append(p.apply_async(func = download, args = (q,i,))) p.close() keyboard.add_hotkey('ctrl+shift+t',setFlag,args=None,suppress=True) print('----------Start-----------------') intCnt = 0 while intCnt < 50 and blStop == False: print( "%s Print result from loop in main process, intCnt is: %d"%(time.ctime(),intCnt)) q.put(r"From main process: I'm going to sleep now, wait for me!!") time.sleep(5) print("now wake up! Read Queue") if not(q.empty()): print("Main process read queue: %s" % q.get(True, 2)) else: print("From main process: no message!") intCnt +=1 if blStop == False: print( 'Timeout!') else: print( 'End: stopped by user!') #print(result.get()) #p.join() print('----------End-----------------')
Queue放进去,只能取出来一次,不是广播模式。如果想在各个进程间互相通信,估计需要设计成令牌模式,取出来,判断一下,如果是给自己的,则处理,不是给自己的,再放回去
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)