有没有办法让REQ / REP让所有工人忙碌?我试图在指南中使用负载均衡器模式,但是对于单个客户端,sock.recv()将阻塞,直到它收到来自worker的响应.
这是代码,稍微修改了负载均衡器的zmq指南.启动一个客户端,10个工作人员和中间的负载均衡器/代理.我怎样才能让所有工人同时工作?
from __future__ import print_functionfrom multiprocessing import Processimport zmqimport timeimport uuIDimport randomdef clIEnt_task(): """Basic request-reply clIEnt using REQ socket.""" socket = zmq.Context().socket(zmq.REQ) socket.IDentity = str(uuID.uuID4()) socket.connect("ipc://frontend.ipc") # Send request,get reply for i in range(100): print("SENDING: ",i) socket.send('WORK') msg = socket.recv() print(msg)def worker_task(): """Worker task,using a REQ socket to do load-balancing.""" socket = zmq.Context().socket(zmq.REQ) socket.IDentity = str(uuID.uuID4()) socket.connect("ipc://backend.ipc") # Tell broker we're ready for work socket.send(b"READY") while True: address,empty,request = socket.recv_multipart() time.sleep(random.randint(1,4)) socket.send_multipart([address,b"",b"OK : " + str(socket.IDentity)])def broker(): context = zmq.Context() frontend = context.socket(zmq.ROUTER) frontend.bind("ipc://frontend.ipc") backend = context.socket(zmq.ROUTER) backend.bind("ipc://backend.ipc") # Initialize main loop state workers = [] poller = zmq.Poller() # Only poll for requests from backend until workers are available poller.register(backend,zmq.PolliN) while True: sockets = dict(poller.poll()) if backend in sockets: # Handle worker activity on the backend request = backend.recv_multipart() worker,clIEnt = request[:3] if not workers: # Poll for clIEnts Now that a worker is available poller.register(frontend,zmq.PolliN) workers.append(worker) if clIEnt != b"READY" and len(request) > 3: # If clIEnt reply,send rest back to frontend empty,reply = request[3:] frontend.send_multipart([clIEnt,reply]) if frontend in sockets: # Get next clIEnt request,route to last-used worker clIEnt,request = frontend.recv_multipart() worker = workers.pop(0) backend.send_multipart([worker,clIEnt,request]) if not workers: # Don't poll clIEnts if no workers are available poller.unregister(frontend) # Clean up backend.close() frontend.close() context.term()def main(): NUM_CLIENTS = 1 NUM_WORKERS = 10 # Start background tasks def start(task,*args): process = Process(target=task,args=args) process.start() start(broker) for i in range(NUM_CLIENTS): start(clIEnt_task) for i in range(NUM_WORKERS): start(worker_task) # Process(target=broker).start()if __name__ == "__main__": main()解决方法 我猜有不同的方法可以做到这一点:
例如,您可以使用线程模块从您的单个客户端启动所有请求,例如:
result_List = [] # Add the result to a List for the example rlock = threading.RLock()def clIEnt_thread(clIEnt_url,request,i): context = zmq.Context.instance() socket = context.socket(zmq.REQ) socket.setsockopt_string(zmq.IDENTITY,'{}'.format(i)) socket.connect(clIEnt_url) socket.send(request.encode()) reply = socket.recv() with rlock: result_List.append((i,reply)) returndef clIEnt_task(): # tasks = List with all your tasks url_clIEnt = "ipc://frontend.ipc" threads = [] for i in range(len(tasks)): thread = threading.Thread(target=clIEnt_thread,args=(url_clIEnt,tasks[i],i,)) thread.start() threads.append(thread)
– 你可以利用像asyncio这样的公平库(有一个子模块zmq.asyncio和另一个库aiozmq,最后一个提供更高级别的抽象).在这种情况下,您也会按顺序向工作人员发送请求,但不会阻止每个响应(因此不会保持主循环忙)并在返回主循环时获得结果.这看起来像这样:
import asyncioimport zmq.asyncioasync def clIEnt_async(request,context,clIEnt_url): """Basic clIEnt sending a request (REQ) to a ROUTER (the broker)""" socket = context.socket(zmq.REQ) socket.setsockopt_string(zmq.IDENTITY,'{}'.format(i)) socket.connect(clIEnt_url) await socket.send(request.encode()) reply = await socket.recv() socket.close() return replyasync def run(loop): # tasks = List full of tasks url_clIEnt = "ipc://frontend.ipc" asyncio_tasks = [] ctx = zmq.asyncio.Context() for i in range(len(tasks)): task = asyncio.ensure_future(clIEnt_async(tasks[i],ctx,url_clIEnt)) asyncio_tasks.append(task) responses = await asyncio.gather(*asyncio_tasks) return responseszmq.asyncio.install()loop = asyncio.get_event_loop()results = loop.run_until_complete(run(loop))
我没有测试这两个片段,但他们都来了(修改以适应问题)代码我使用zmq在类似的配置而不是你的问题.
总结以上是内存溢出为你收集整理的python – ZeroMQ:负载平衡许多工人和一个主人全部内容,希望文章能够帮你解决python – ZeroMQ:负载平衡许多工人和一个主人所遇到的程序开发问题。
如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)