Multiprocessing: Pipe() vs. Queue()
  • A Pipe() can only have two endpoints.

  • A Queue() can have multiple producers and consumers (see the sketch below).
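As a quick illustration of the many-producers/many-consumers point, here is a minimal sketch (not part of the original benchmark; the producer/consumer names and the 'DONE' sentinels are invented for the demo) in which two producer processes and two consumer processes share a single Queue():

from multiprocessing import Process, Queue

def producer(queue, ident, count):
    for ii in range(count):
        queue.put((ident, ii))   # Many writers can put() on the same queue

def consumer(queue):
    while True:
        msg = queue.get()        # Many readers can get() from the same queue
        if msg == 'DONE':
            break

if __name__ == '__main__':
    queue = Queue()
    producers = [Process(target=producer, args=(queue, ident, 1000)) for ident in range(2)]
    consumers = [Process(target=consumer, args=(queue,)) for _ in range(2)]
    for proc in producers + consumers:
        proc.start()
    for proc in producers:
        proc.join()           # Wait until both producers have queued everything
    for _ in consumers:
        queue.put('DONE')     # One sentinel per consumer; each consumer swallows exactly one
    for proc in consumers:
        proc.join()
    print('All producers and consumers finished')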

When to use them

If you need more than two points to communicate, use a Queue().

If you need absolute performance, a Pipe() is much faster, because Queue() is built on top of Pipe().
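For completeness, here is a minimal sketch (hypothetical names, not from the benchmark code) showing a Pipe()'s exactly two endpoints. Note that a default Pipe() is bidirectional; the benchmark code below simply uses it in one direction, and Pipe(duplex=False) would make it truly one-way:

from multiprocessing import Process, Pipe

def child(conn):
    msg = conn.recv()          # Receive on this endpoint...
    conn.send(msg.upper())     # ...and reply over the very same endpoint
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()   # Exactly two endpoints, no more
    proc = Process(target=child, args=(child_conn,))
    proc.start()
    parent_conn.send('ping')
    print(parent_conn.recv())          # Prints 'PING'
    proc.join()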

Performance benchmarking

Suppose you want to spawn two processes and send messages between them as quickly as possible. These are the timing results of a drag race between similar tests using Pipe() and Queue(). They were taken on a ThinkPad T61 running Ubuntu 11.10 and Python 2.7.2.

FYI, results for JoinableQueue() are thrown in as a bonus; a JoinableQueue() accounts for tasks when queue.task_done() is called (it doesn't even know about the specific task, it just counts unfinished tasks in the queue), so that queue.join() knows the work is finished.
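The following minimal sketch (hypothetical, not one of the benchmark scripts below) shows just that accounting: queue.join() returns only after task_done() has been called once for every put():

from multiprocessing import Process, JoinableQueue

def worker(queue):
    while True:
        msg = queue.get()
        # ... pretend to process msg here ...
        queue.task_done()    # Decrement the unfinished-task counter

if __name__ == '__main__':
    queue = JoinableQueue()
    proc = Process(target=worker, args=(queue,))
    proc.daemon = True       # Dies with the parent, so no sentinel is needed
    proc.start()
    for ii in range(100):
        queue.put(ii)        # Each put() increments the counter
    queue.join()             # Blocks until the counter is back down to zero
    print('All 100 tasks were acknowledged via task_done()')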

The code for each is at the bottom of this answer…

mpenning@mpenning-T61:~$ python multi_pipe.py
Sending 10000 numbers to Pipe() took 0.0369849205017 seconds
Sending 100000 numbers to Pipe() took 0.328398942947 seconds
Sending 1000000 numbers to Pipe() took 3.17266988754 seconds
mpenning@mpenning-T61:~$ python multi_queue.py
Sending 10000 numbers to Queue() took 0.105256080627 seconds
Sending 100000 numbers to Queue() took 0.980564117432 seconds
Sending 1000000 numbers to Queue() took 10.1611330509 seconds
mpenning@mpenning-T61:~$ python multi_joinablequeue.py
Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds
Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds
Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds
mpenning@mpenning-T61:~$

In summary, Pipe() is about three times faster than Queue(). Don't even think about JoinableQueue() unless you really must have its benefits.

Bonus material 2

Multiprocessing introduces subtle changes in information flow that make debugging hard unless you know some shortcuts. For instance, you might have a script that works fine when indexing through a dictionary under many conditions, but infrequently fails with certain inputs.

Normally we get clues to the failure when the entire Python process crashes; however, you don't get unsolicited crash tracebacks printed to the console when a multiprocessed function crashes. Tracking down unknown multiprocessing crashes is hard without a clue to what crashed the process.

The simplest way I have found to track down multiprocessing crash information is to wrap the entire multiprocessed function in a try / except and use traceback.print_exc():

import traceback

def run(self, args):
    try:
        # Insert stuff to be multiprocessed here
        return args[0]['that']
    except:
        print "FATAL: reader({0}) exited while multiprocessing".format(args)
        traceback.print_exc()

Now, when you find a crash, you see something like:

FATAL: reader([{'crash': 'this'}]) exited while multiprocessing
Traceback (most recent call last):
  File "foo.py", line 19, in __init__
    self.run(args)
  File "foo.py", line 46, in run
    KeyError: 'that'
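The run() fragment above is a method taken out of context; as a self-contained version of the same pattern, this hypothetical demo (the reader() worker and the deliberately missing 'that' key are invented here) spawns a child Process that prints its own traceback instead of dying silently:

import traceback
from multiprocessing import Process

def reader(args):
    try:
        # Insert stuff to be multiprocessed here
        return args[0]['that']    # KeyError: this dict has no 'that' key
    except Exception:
        print("FATAL: reader({0}) exited while multiprocessing".format(args))
        traceback.print_exc()     # The child prints its own traceback

if __name__ == '__main__':
    proc = Process(target=reader, args=([{'crash': 'this'}],))
    proc.start()
    proc.join()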

Source code:


"""multi_pipe.py"""from multiprocessing import Process, Pipeimport timedef reader_proc(pipe):    ## Read from the pipe; this will be spawned as a separate Process    p_output, p_input = pipe    p_input.close()    # We are only reading    while True:        msg = p_output.recv()    # Read from the output pipe and do nothing        if msg=='DONE': breakdef writer(count, p_input):    for ii in xrange(0, count):        p_input.send(ii)  # Write 'count' numbers into the input pipe    p_input.send('DONE')if __name__=='__main__':    for count in [10**4, 10**5, 10**6]:        # Pipes are unidirectional with two endpoints:  p_input ------> p_output        p_output, p_input = Pipe()  # writer() writes to p_input from _this_ process        reader_p = Process(target=reader_proc, args=((p_output, p_input),))        reader_p.daemon = True        reader_p.start()     # Launch the reader process        p_output.close()       # We no longer need this part of the Pipe()        _start = time.time()        writer(count, p_input) # Send a lot of stuff to reader_proc()        p_input.close()        reader_p.join()        print("Sending {0} numbers to Pipe() took {1} seconds".format(count, (time.time() - _start)))

"""multi_queue.py"""from multiprocessing import Process, Queueimport timeimport sysdef reader_proc(queue):    ## Read from the queue; this will be spawned as a separate Process    while True:        msg = queue.get()         # Read from the queue and do nothing        if (msg == 'DONE'): breakdef writer(count, queue):    ## Write to the queue    for ii in range(0, count):        queue.put(ii)  # Write 'count' numbers into the queue    queue.put('DONE')if __name__=='__main__':    pqueue = Queue() # writer() writes to pqueue from _this_ process    for count in [10**4, 10**5, 10**6]:          ### reader_proc() reads from pqueue as a separate process        reader_p = Process(target=reader_proc, args=((pqueue),))        reader_p.daemon = True        reader_p.start()        # Launch reader_proc() as a separate python process        _start = time.time()        writer(count, pqueue)    # Send a lot of stuff to reader()        reader_p.join()         # Wait for the reader to finish        print("Sending {0} numbers to Queue() took {1} seconds".format(count,  (time.time() - _start)))

"""multi_joinablequeue.py"""from multiprocessing import Process, JoinableQueueimport timedef reader_proc(queue):    ## Read from the queue; this will be spawned as a separate Process    while True:        msg = queue.get()         # Read from the queue and do nothing        queue.task_done()def writer(count, queue):    for ii in xrange(0, count):        queue.put(ii)  # Write 'count' numbers into the queueif __name__=='__main__':    for count in [10**4, 10**5, 10**6]:        jqueue = JoinableQueue() # writer() writes to jqueue from _this_ process        # reader_proc() reads from jqueue as a different process...        reader_p = Process(target=reader_proc, args=((jqueue),))        reader_p.daemon = True        reader_p.start()     # Launch the reader process        _start = time.time()        writer(count, jqueue) # Send a lot of stuff to reader_proc() (in different process)        jqueue.join()         # Wait for the reader to finish        print("Sending {0} numbers to JoinableQueue() took {1} seconds".format(count,  (time.time() - _start)))

