如果传递
zmq.NOBLOCKflag参数,zmq.Socket.recv将不会阻塞。
文档说:
If NOBLOCK is set, this method will raise a ZMQError with EAGAIN if a message is not ready.
zmq会将接收到的消息放入队列中,并且每次recv()调用都会返回一条消息,直到此队列耗尽为止,之后引发ZMQError。
下面示例中使用的zmq.Again是一个
wrapperfor zmq.EAGAIN。
例如:
while True: try: #check for a message, this will not block message = socket.recv(flags=zmq.NOBLOCK) #a message has been received print "Message received:", message except zmq.Again as e: print "No message received yet" # perform other important stuff time.sleep(10)
该
sub_client.py示例可能被编写为使用如下非阻塞行为:
import sys, timeimport zmqport = "5556"if len(sys.argv) > 1: port = sys.argv[1] int(port)# Socket to talk to servercontext = zmq.Context()socket = context.socket(zmq.SUB)print "Collecting updates from weather server..."socket.connect ("tcp://localhost:%s" % port)# Subscribe to zippre, default is NYC, 10001topicfilter = "10001"socket.setsockopt(zmq.SUBSCRIBE, topicfilter)# Process 5 updatestotal_value = 0received_value_count = 0do_receive_loop = Truewhile do_receive_loop: try: #process all messages waiting on subscribe socket while True: #check for a message, this will not block string = socket.recv(flags=zmq.NOBLOCK) #message received, process it topic, messagedata = string.split() total_value += int(messagedata) print ('{} {}'.format(topic, messagedata)) #check if we have all the messages we want received_value_count += 1 if received_value_count > 4: do_receive_loop = False break except zmq.Again as e: #No messages waiting to be processed pass #Here we can do other stuff while waiting for messages #contemplate answer to 'The Last Question' time.sleep(15) print "INSUFFICIENT DATA FOR MEANINGFUL ANSWER"print('Avg data value for topic {} was {}'.format(topicfilter, (total_value/5)))
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)