您将需要一个
multiprocessing.Pipe或一个
multiprocessing.Queue将结果发送回您的父流程。如果仅执行I /
0,则应使用a
Thread而不是a
Process,因为它更轻巧,并且大多数时间都花在等待上。我向您展示了一般情况下进程和线程是如何完成的。
队列处理
多处理队列建立在管道的顶部,访问与锁/信号量同步。队列是线程安全和进程安全的,这意味着您可以将一个队列用于多个生产者/消费者进程,甚至这些进程中的多个线程。在队列中添加第一项也会在调用过程中启动一个供料器线程。在
multiprocessing.Queue单个生产者/单个消费者的情况下使用管道的制造商的额外开销更可取,并且性能更高。
以下是发送和检索结果的方法
multiprocessing.Queue:
from multiprocessing import Process, QueueSENTINEL = 'SENTINEL'def sim_busy(out_queue, x): for _ in range(int(x)): assert 1 == 1 result = x out_queue.put(result) # If all results are enqueued, send a sentinel-value to let the parent know # no more results will come. out_queue.put(SENTINEL)if __name__ == '__main__': out_queue = Queue() p = Process(target=sim_busy, args=(out_queue, 150e6)) # 150e6 == 150000000.0 p.start() for result in iter(out_queue.get, SENTINEL): # sentinel breaks the loop print(result)
队列作为参数传递到函数中,结果
.put()在队列中,而父对象
get.()在队列中。
.get()是一个阻塞调用,执行不恢复,直到事情 是
得到(指定超时参数是可能的)。请注意,
sim_busy这里所做的工作是CPU密集型的,那就是您选择进程而不是线程。
工艺与管道
对于一对一连接,管道就足够了。设置几乎相同,只是方法的名称不同,并且调用
Pipe()返回两个连接对象。在双工模式下,两个对象都是读写端,
duplex=False(简单)第一个连接对象是管道的读端,第二个对象是写端。在这种基本情况下,我们只需要一个单纯形管道:
from multiprocessing import Process, PipeSENTINEL = 'SENTINEL'def sim_busy(write_conn, x): for _ in range(int(x)): assert 1 == 1 result = x write_conn.send(result) # If all results are send, send a sentinel-value to let the parent know # no more results will come. write_conn.send(SENTINEL)if __name__ == '__main__': # duplex=False because we just need one-way communication in this case. read_conn, write_conn = Pipe(duplex=False) p = Process(target=sim_busy, args=(write_conn, 150e6)) # 150e6 == 150000000.0 p.start() for result in iter(read_conn.recv, SENTINEL): # sentinel breaks the loop print(result)
线程和队列
要与线程一起使用,您想要切换到
queue.Queue。
queue.Queue是在之上构建的
collections.deque,添加了一些锁以使其具有线程安全性。与多重处理的队列和管道不同,放置在
queue.Queue容器上的对象不会被腌制。由于线程共享相同的内存地址空间,因此无需进行用于内存复制的序列化,因此仅传输指针。
from threading import Threadfrom queue import Queueimport timeSENTINEL = 'SENTINEL'def sim_io(out_queue, query): time.sleep(1) result = query + '_result' out_queue.put(result) # If all results are enqueued, send a sentinel-value to let the parent know # no more results will come. out_queue.put(SENTINEL)if __name__ == '__main__': out_queue = Queue() p = Thread(target=sim_io, args=(out_queue, 'my_query')) p.start() for result in iter(out_queue.get, SENTINEL): # sentinel-value breaks the loop print(result)
- 如果可能的话,请在这里阅读为什么
for result in iter(out_queue.get, SENTINEL):
比while True...break
设置更受欢迎。 - 在这里阅读为什么要
if __name__ == '__main__':
在所有脚本中使用,尤其是在多处理中。 - 更多关于-
get()
用法的信息。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)