Python 多进程学习

Python 多进程学习,第1张

Python 多进程学习

@[TOC] Python 多进程学习

前言

研究回调函数中学习了Python的多进程, 记录备忘

常用的两种方式启动多进程
用multiprocessing 库的Process 或 Pool
感觉Pool比较方便 进程池Pool

from multiprocessing import Process, Pool
import time
import random
import os

def download(f):
    print('%s_ID= %s pid=%d,ppid=%d'%(str(time.ctime()),str(f),os.getpid(),os.getppid()))
    for i in range(3):
        print(f,'--文件--%d'%i)
        time.sleep(random.randint(5, 15))
    
    if random.randint(1,9) >= 5:
        return {"f:": f,"PID:": os.getpid(),"PPID:": os.getppid(),"time:": time.ctime(),"result": 0, "info": '下载失败!'}
    else:
        return {"f:": f,"PID:": os.getpid(),"PPID:": os.getppid(),"time:": time.ctime(),"result": 1, "info": '下载完成!'}

if __name__ == "__main__":
    p = Pool(2) 
    result = []
    result.append( p.map_async(func=download, iterable=(1111,2222,3333,4444,5555,6666)))
    #也可以用apply_async()函数
    #result.append( p.apply_async(func=download, args=(1111,)))               
    #result.append( p.apply_async(func=download, args=(2222,)))
    #result.append( p.apply_async(func=download, args=(3333,)))
    #result.append( p.apply_async(func=download, args=(4444,)))
    #result.append( p.apply_async(func=download, args=(5555,)))
    #result.append( p.apply_async(func=download, args=(6666,))) 
    p.close()#关闭进程池,关闭后,p不再接收新的请求
    print(str(time.ctime()) + "---start----")

    intCnt = 0
    while intCnt < 5:    
        print("%s: Now intCnt value is: %d sleep for 5s:     "%( str(time.ctime()),intCnt))
        time.sleep(5)
        intCnt +=1
        
    print("%sNow print results"%time.ctime())
    for i in range(6,2,-1):
        print(result[i-1].get())
    print(result[0].get())
    print('main process pid: %d, ppid:%d '%(os.getpid(),os.getppid()))
    print(str(time.ctime()) + '---end----')

close()函数作用是关闭进程池(关门,不接受新进程加入的请求)而不是关闭进程。关闭进程是terminate(). 进程是在调用apply_async()时就启动了,并不是调用close()函数后才启动的。事实上,没有close()也不影响读取map_async()和apply_async()的返回值,会阻塞主进程。map_async() 更适用于调用的处理程序是同一只,但是调用参数不一样的情况,所以批量调用,写法简单;而apply_async()则是一个一个地调用,适合于每次调用的处理程序可能不太一样的情况。实测两个方法在子进程出现错误后,进程都会被利用。只是apply_async()因为是单独处理的,所以正常运行的子程序返回值都可以取到。但是map_async()只要有一个出问题,会导致所有的返回值都取不到。如果没有其它阻塞主进程的动作,又想确保子进程能执行完毕,可以用join()函数来阻塞主进程。进程之间(包括主进程与子进程) 是独立的,所以不能在子进程中读取或设置主进程中的变量(反之亦然).map_async()和apply_async()都可以有optional的callback参数,可以指定进程结束后的回调函数。也可以理解为上例中的download函数也是回调函数. 注意,map_async()和apply_async()的返回值是由func参数指定的函数返回的,而不是由callback参数返回的。

from multiprocessing import Process,Pool
import os
import time
from datetime import datetime
import random

def callback_setFlag(f):
    print('now this thread starts')
    print('Income parameter is:{}',f)
   
    keyboard.add_hotkey('ctrl+shift+a', lambda: print('test'))
    keyboard.wait('ctrl+alt+q')
    return {'result':0, 'Information': 'user pressed key to stop!'}
    

def alterUser(msg):
    print(r"I'm messager: " + msg['Information'])
    return msg['result']

def setStopFlag():
    global blStop
    print(r"Hahahaha, I'll let you stop!!!")
    blStop = True
    
if __name__ == '__main__':
    blStop = False
    p = Pool(1)
    result = p.apply_async(func=callback_setFlag, args=('thread1',), callback=alterUser)
    p.close()
    keyboard.add_hotkey('ctrl+shift+t',setStopFlag)
    print('----------Start-----------------')
    
    intCnt = 0    
    while intCnt < 10 and blStop == False:
        print( "%s  Print result from loop in main process, intCnt is: %d"%(time.ctime(),intCnt))
        time.sleep(5)
        print("now wake up!")
        intCnt +=1
        
    if blStop == False:
        print( 'Timeout!')
    else:
        print( 'End: stopped by user!')
    print(result.get())
    #p.join()
    print('----------End-----------------')
进程间通信

注意, 对于Pool,Queues要用mutiprocessing.Manager().Queue(), 不能直接用mutiprocessing.Queue()

from multiprocessing import Process,Pool,Pipe,Queue,Manager
import os
import time
from datetime import datetime
import random
import itertools

def callback_setFlag(q,f):
    print('now this thread starts:%d'%f)

    keyboard.add_hotkey('ctrl+shift+a', lambda: q.put(str(f) + ' send message: hot key pressed!'))
    keyboard.wait('ctrl+alt+q')
    q.put('%d send message: user decided to quit! Now try to read queue.')
    if not(q.empty()):
        print("%d read queue: %s" %(f, q.get(True, 2)))
    else:
        print("%d read queue: no message in queue!"%f)
        
    return {'result':0, 'Information': 'user pressed key to stop!'}

def download(q,f):
    print('%s__进程池中的进程——pid=%d,ppid=%d'%(str(time.ctime()),os.getpid(),os.getppid()))
    for i in range(3):
        q.put('%d send message: now downloadfile %d'%(f,i))
        if not(q.empty()):
            print("%d read queue: %s" %(f, q.get(True, 2)))
        print(f,'--文件--%d'%i)
        time.sleep(random.randint(5, 25))
        # time.sleep(1)

    
    if random.randint(1,9) >= 5:
        return {"f:": f,"PID:": os.getpid(),"PPID:": os.getppid(),"time:": time.ctime(),"result": 0, "info": '下载失败!'}
    else:
        return {"f:": f,"PID:": os.getpid(),"PPID:": os.getppid(),"time:": time.ctime(),"result": 1, "info": '下载完成!'}    

def alterUser(msg):
    print(r"I'm messager: " + msg['Information'])
    return msg['result']

def setFlag():
    global blStop
    print(r"Hahahaha, I'll let you stop!!!")
    blStop = True
    
if __name__ == '__main__':
    blStop = False
    p = Pool(3)
    #conn1, conn2 = Pipe()
    q = Manager().Queue()
    result = []
    result.append( p.apply_async(func=callback_setFlag, args=(q,0000,)))
    for i in (1111,7777, 1111):
        result.append(p.apply_async(func = download, args = (q,i,)))

    p.close()
    keyboard.add_hotkey('ctrl+shift+t',setFlag,args=None,suppress=True)
    print('----------Start-----------------')
    
    intCnt = 0    
    while intCnt < 50 and blStop == False:
        print( "%s  Print result from loop in main process, intCnt is: %d"%(time.ctime(),intCnt))
        q.put(r"From main process: I'm going to sleep now, wait for me!!")
        time.sleep(5)
        print("now wake up! Read Queue")
        if not(q.empty()):
            print("Main process read queue: %s" % q.get(True, 2))
        else:
            print("From main process: no message!")
        intCnt +=1
        

    if blStop == False:
        print( 'Timeout!')
    else:
        print( 'End: stopped by user!')
    #print(result.get())
    #p.join()
    print('----------End-----------------')

Queue放进去,只能取出来一次,不是广播模式。如果想在各个进程间互相通信,估计需要设计成令牌模式,取出来,判断一下,如果是给自己的,则处理,不是给自己的,再放回去

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5720474.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存