python – ZeroMQ: load-balancing many workers and one master


Overview: suppose I have a master process that divides up data to be processed in parallel. Let's say there are 1000 chunks of data and 100 nodes available to run the computations.

Is there a way to keep all the workers busy with REQ/REP? I tried to use the load-balancer pattern from the guide, but with a single client, sock.recv() blocks until it receives a response from a worker.
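That blocking is inherent to the REQ socket's strict send/recv lockstep: once a request has been sent, the socket refuses another send() until the reply has been received. A minimal sketch of this behavior (the inproc endpoint name and messages are made up for the demo; the REP socket just stands in as a peer):

```python
import zmq

# A REQ socket enforces a strict send -> recv lockstep: a second send()
# before the reply arrives fails immediately with a state (EFSM) error.
ctx = zmq.Context()
rep = ctx.socket(zmq.REP)          # stand-in peer so send() has somewhere to go
rep.bind("inproc://req-demo")
req = ctx.socket(zmq.REQ)
req.connect("inproc://req-demo")

req.send(b"first request")         # fine: the REQ socket is in its send state
state_error = None
try:
    req.send(b"second request")    # refused: a reply is still pending
except zmq.ZMQError as exc:
    state_error = exc
print("second send refused:", state_error)

msg = rep.recv()                   # serve the pending request...
rep.send(b"reply to " + msg)
reply = req.recv()                 # ...only now can the client send again
req.close()
rep.close()
ctx.term()
```

So a single REQ client serializes its requests one reply at a time, no matter how many workers sit behind the broker.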

Here is the code, slightly modified from the zmq guide's load balancer. It starts one client, 10 workers, and a load-balancing broker in between. How can I get all the workers working at the same time?

from __future__ import print_function

from multiprocessing import Process
import random
import time
import uuid

import zmq


def client_task():
    """Basic request-reply client using a REQ socket."""
    socket = zmq.Context().socket(zmq.REQ)
    socket.identity = str(uuid.uuid4()).encode()
    socket.connect("ipc://frontend.ipc")
    # Send request, get reply
    for i in range(100):
        print("SENDING: ", i)
        socket.send(b"WORK")
        msg = socket.recv()
        print(msg)


def worker_task():
    """Worker task, using a REQ socket to do load-balancing."""
    socket = zmq.Context().socket(zmq.REQ)
    socket.identity = str(uuid.uuid4()).encode()
    socket.connect("ipc://backend.ipc")
    # Tell broker we're ready for work
    socket.send(b"READY")
    while True:
        address, empty, request = socket.recv_multipart()
        time.sleep(random.randint(1, 4))
        socket.send_multipart([address, b"", b"OK : " + socket.identity])


def broker():
    context = zmq.Context()
    frontend = context.socket(zmq.ROUTER)
    frontend.bind("ipc://frontend.ipc")
    backend = context.socket(zmq.ROUTER)
    backend.bind("ipc://backend.ipc")

    # Initialize main loop state
    workers = []
    poller = zmq.Poller()
    # Only poll for requests from backend until workers are available
    poller.register(backend, zmq.POLLIN)

    while True:
        sockets = dict(poller.poll())

        if backend in sockets:
            # Handle worker activity on the backend
            request = backend.recv_multipart()
            worker, empty, client = request[:3]
            if not workers:
                # Poll for clients now that a worker is available
                poller.register(frontend, zmq.POLLIN)
            workers.append(worker)
            if client != b"READY" and len(request) > 3:
                # If client reply, send rest back to frontend
                empty, reply = request[3:]
                frontend.send_multipart([client, b"", reply])

        if frontend in sockets:
            # Get next client request, route to last-used worker
            client, empty, request = frontend.recv_multipart()
            worker = workers.pop(0)
            backend.send_multipart([worker, b"", client, b"", request])
            if not workers:
                # Don't poll clients if no workers are available
                poller.unregister(frontend)

    # Clean up (unreachable as written; the loop runs forever)
    backend.close()
    frontend.close()
    context.term()


def main():
    NUM_CLIENTS = 1
    NUM_WORKERS = 10

    # Start background tasks
    def start(task, *args):
        process = Process(target=task, args=args)
        process.start()

    start(broker)
    for i in range(NUM_CLIENTS):
        start(client_task)
    for i in range(NUM_WORKERS):
        start(worker_task)


if __name__ == "__main__":
    main()
Solution: I guess there are different ways to do this:

– For example, you could use the threading module to launch all your requests from your single client, e.g.:

import threading

import zmq

result_list = []  # Collect the results in a list for the example
rlock = threading.RLock()


def client_thread(client_url, request, i):
    context = zmq.Context.instance()
    socket = context.socket(zmq.REQ)
    socket.setsockopt_string(zmq.IDENTITY, '{}'.format(i))
    socket.connect(client_url)
    socket.send(request.encode())
    reply = socket.recv()
    with rlock:
        result_list.append((i, reply))


def client_task():
    # tasks = list with all your tasks
    url_client = "ipc://frontend.ipc"
    threads = []
    for i in range(len(tasks)):
        thread = threading.Thread(target=client_thread,
                                  args=(url_client, tasks[i], i))
        thread.start()
        threads.append(thread)
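To make the fan-out concrete, here is a self-contained, runnable sketch of the same pattern. A single REP echo socket stands in for the broker plus workers, and the endpoint name, chunk count, and message format are invented for the demo:

```python
import threading

import zmq

ctx = zmq.Context.instance()
bound = threading.Event()  # signals that the inproc endpoint exists
results = []
lock = threading.Lock()


def echo_server(n_requests):
    # Stand-in for the broker + workers: one REP socket answering n requests.
    rep = ctx.socket(zmq.REP)
    rep.bind("inproc://thread-demo")
    bound.set()
    for _ in range(n_requests):
        msg = rep.recv()
        rep.send(b"done:" + msg)
    rep.close()


def client_thread(i):
    # One REQ socket per thread, so each thread has its own send/recv lockstep.
    req = ctx.socket(zmq.REQ)
    req.connect("inproc://thread-demo")
    req.send(b"chunk-%d" % i)
    reply = req.recv()
    with lock:
        results.append((i, reply))
    req.close()


server = threading.Thread(target=echo_server, args=(3,))
server.start()
bound.wait()  # connect only after bind() has happened
clients = [threading.Thread(target=client_thread, args=(i,)) for i in range(3)]
for t in clients:
    t.start()
for t in clients:
    t.join()
server.join()
print(sorted(results))
```

Each thread owns its own REQ socket (zmq sockets are not thread-safe, but one context can be shared), so the per-socket lockstep no longer serializes the whole client.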

– Or you could take advantage of a library like asyncio (there is a submodule zmq.asyncio and another library, aiozmq, the latter offering a higher-level abstraction). In this case you also send your requests to the workers sequentially, but without blocking on each response (and therefore without keeping the main loop busy), and you get the results back once control returns to the main loop. It could look like this:

import asyncio

import zmq
import zmq.asyncio


async def client_async(request, context, i, client_url):
    """Basic client sending a request (REQ) to a ROUTER (the broker)."""
    socket = context.socket(zmq.REQ)
    socket.setsockopt_string(zmq.IDENTITY, '{}'.format(i))
    socket.connect(client_url)
    await socket.send(request.encode())
    reply = await socket.recv()
    socket.close()
    return reply


async def run(loop):
    # tasks = list full of tasks
    url_client = "ipc://frontend.ipc"
    asyncio_tasks = []
    ctx = zmq.asyncio.Context()
    for i in range(len(tasks)):
        # Pass i explicitly so each client gets its own identity
        task = asyncio.ensure_future(client_async(tasks[i], ctx, i, url_client))
        asyncio_tasks.append(task)
    responses = await asyncio.gather(*asyncio_tasks)
    return responses


zmq.asyncio.install()  # only needed on older pyzmq versions
loop = asyncio.get_event_loop()
results = loop.run_until_complete(run(loop))

I didn't test these two snippets, but they both come (with modifications to fit the question) from code I use with zmq in a configuration similar to yours.
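As a cross-check, the asyncio idea can be exercised end to end with a toy REP echo coroutine standing in for the broker. The endpoint name and message format here are invented for the demo, and it uses the modern zmq.asyncio API (no install() call needed on current pyzmq):

```python
import asyncio

import zmq
import zmq.asyncio


async def echo_server(ctx, n_requests):
    # Stand-in for the broker + workers: answers n requests, then exits.
    rep = ctx.socket(zmq.REP)
    rep.bind("inproc://aio-demo")
    for _ in range(n_requests):
        msg = await rep.recv()
        await rep.send(b"done:" + msg)
    rep.close()


async def client_async(ctx, i):
    # One REQ socket per coroutine; awaiting recv() yields to the event
    # loop instead of blocking it, so all clients run concurrently.
    req = ctx.socket(zmq.REQ)
    req.connect("inproc://aio-demo")
    await req.send(b"chunk-%d" % i)
    reply = await req.recv()
    req.close()
    return (i, reply)


async def main():
    ctx = zmq.asyncio.Context()
    server = asyncio.ensure_future(echo_server(ctx, 3))
    replies = await asyncio.gather(*(client_async(ctx, i) for i in range(3)))
    await server
    ctx.term()
    return replies


replies = sorted(asyncio.run(main()))
print(replies)
```

The design mirrors the threading version, but a single OS thread interleaves all the clients: each await on recv() parks that coroutine until its reply arrives.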


Original address: https://outofmemory.cn/langs/1197728.html
