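Without a look at the MyProtocol class, it is hard to say exactly what is going on. The problem sounds very much like it is caused by you messing directly with low-level functions, and therefore also with the state attribute of the WebSocket class, which is the representation of the internal state of the WebSocket connection.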
According to the autobahn reference documentation, the APIs of WebSocketProtocol that you can use and override directly are:
- onOpen
- onMessage
- onClose
- sendMessage
- sendClose
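Your approach of using StringTransport to test your protocol is not ideal. The problem lies in the fact that MyProtocol is a thin layer on top of the WebSocketProtocol framework provided by autobahn which, for better or worse, hides the details of managing the connection, the transport and the internal protocol state.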
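If you think about it, you want to test your own code, not WebSocketProtocol. So if you do not want to embed a dummy server or client, your best bet is to test directly the methods that MyProtocol overrides.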
An example of what I mean is the following:

    # Imports assumed from the legacy autobahn.websocket module used later in this answer
    from twisted.trial import unittest
    from twisted.test import proto_helpers
    from autobahn.websocket import (WebSocketProtocol, WebSocketServerProtocol,
                                    WebSocketServerFactory)


    class MyPublisher(object):
        cbk = None

        def publish(self, msg):
            if self.cbk:
                self.cbk(msg)


    class MyProtocol(WebSocketServerProtocol):

        def __init__(self, publisher):
            WebSocketServerProtocol.__init__(self)
            # Defining callback for publisher
            publisher.cbk = self.sendMessage

        def onMessage(self, msg, binary):
            # Stupid echo
            self.sendMessage(msg)


    class NotificationTest(unittest.TestCase):

        class MyProtocolFactory(WebSocketServerFactory):
            def __init__(self, publisher):
                WebSocketServerFactory.__init__(self, "ws://127.0.0.1:8081")
                self.publisher = publisher
                self.openHandshakeTimeout = None

            def buildProtocol(self, addr):
                protocol = MyProtocol(self.publisher)
                protocol.factory = self
                # Hybi version 13 is supported by pretty much everyone
                # (apart from IE <8 and android browsers)
                protocol.websocket_version = 13
                return protocol

        def setUp(self):
            publisher = MyPublisher()
            factory = NotificationTest.MyProtocolFactory(publisher)
            protocol = factory.buildProtocol(None)
            transport = proto_helpers.StringTransport()

            def play_dumb(*args):
                pass
            setattr(transport, "setTcpNoDelay", play_dumb)
            protocol.makeConnection(transport)
            self.protocol, self.transport, self.publisher = protocol, transport, publisher

        def test_onMessage(self):
            # The following 2 lines are the problematic part.
            # Here you are explicitly manipulating a hidden state which your
            # implementation should not be concerned with!
            self.protocol.state = WebSocketProtocol.STATE_OPEN
            self.protocol.websocket_version = 13
            self.protocol.onMessage("Whatever", False)
            self.assertEqual(self.transport.value()[2:], 'Whatever')

        def test_push(self):
            # The following 2 lines are the problematic part.
            # Here you are explicitly manipulating a hidden state which your
            # implementation should not be concerned with!
            self.protocol.state = WebSocketProtocol.STATE_OPEN
            self.protocol.websocket_version = 13
            self.publisher.publish("Hi there")
            self.assertEqual(self.transport.value()[2:], 'Hi there')
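
As you have probably noticed, using StringTransport here is very cumbersome. You must know the underlying framework and bypass its state management, which is not really what you want to do. Unfortunately autobahn does not provide a ready-to-use test object that would allow easy state manipulation, so my suggestion of using a dummy server and client still stands.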
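
If a dummy server feels too heavy for a given unit test, one lighter option (not part of the original example) is to replace sendMessage with a recording stub, so that the overridden callbacks can be exercised without touching state or the transport. The following is only a minimal sketch: FakeServerProtocol and EchoProtocol are hypothetical stand-ins so that the snippet runs without autobahn installed; with the real library you would patch sendMessage on your own MyProtocol instance instead.

```python
from unittest import mock


class FakeServerProtocol(object):
    """Hypothetical stand-in for WebSocketServerProtocol (assumption:
    only sendMessage matters for this kind of test)."""

    def sendMessage(self, payload, binary=False):
        raise RuntimeError("no real connection in a unit test")


class EchoProtocol(FakeServerProtocol):
    def onMessage(self, msg, binary):
        # Same "stupid echo" behaviour as MyProtocol
        self.sendMessage(msg)


def run_echo_check():
    protocol = EchoProtocol()
    # Replace sendMessage on the instance so nothing reaches a transport
    with mock.patch.object(protocol, "sendMessage") as fake_send:
        protocol.onMessage("Whatever", False)
    return fake_send.call_args_list


calls = run_echo_check()
```

The test then asserts on the recorded calls rather than on the bytes written to a transport, which keeps it independent of the framing and of the protocol's internal state.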
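
Testing your server WITH the network

The test below shows how you can test server push, assert that what you receive is what you expect, and use a hook to decide when the test is finished.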

The server protocol

    from twisted.trial.unittest import TestCase as TrialTest
    from autobahn.websocket import (WebSocketServerProtocol, WebSocketServerFactory,
                                    WebSocketClientProtocol, WebSocketClientFactory,
                                    connectWS, listenWS)
    from twisted.internet.defer import Deferred
    from twisted.internet import task

    START = "START"


    class TestServerProtocol(WebSocketServerProtocol):

        def __init__(self):
            # The publisher task simulates an event that triggers a message push
            self.publisher = task.LoopingCall(self.send_stuff, "Hi there")

        def send_stuff(self, msg):
            # This method sends a message to the client
            self.sendMessage(msg)

        def _on_start(self):
            # Here we trigger the task to execute every second
            self.publisher.start(1.0)

        def onMessage(self, message, binary):
            # According to this stupid protocol, the server starts sending stuff
            # when the client sends a "START" message.
            # You can plug other commands in here
            {
                START: self._on_start
                # Put other keys here
            }[message]()

        def onClose(self, wasClean, pre, reason):
            # After closing the connection, we tell the task to stop sending messages.
            # Guard added: stop() fails if the task was never started.
            if self.publisher.running:
                self.publisher.stop()

The client protocol and factory
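The next class is the client protocol. It basically tells the server to start pushing messages. It calls close_condition on each of them to see whether it is time to close the connection and, finally, it calls the assertion function on the messages it received to see whether the test succeeded.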

    class TestClientProtocol(WebSocketClientProtocol):

        def __init__(self, assertion, close_condition, timeout, *args, **kwargs):
            self.assertion = assertion
            self.close_condition = close_condition
            self._received_msgs = []
            from twisted.internet import reactor
            # This is a way to set a timeout for your test
            # in case you never meet the conditions dictated by close_condition
            self.damocle_sword = reactor.callLater(timeout, self.sendClose)

        def onOpen(self):
            # After the connection has been established,
            # you can tell the server to send its stuff
            self.sendMessage(START)

        def onMessage(self, msg, binary):
            # Here you get the messages pushed from the server
            self._received_msgs.append(msg)
            # If it is time to close the connection
            if self.close_condition(msg):
                self.damocle_sword.cancel()
                self.sendClose()

        def onClose(self, wasClean, pre, reason):
            # Now it is the right time to check our test assertions
            self.assertion.callback(self._received_msgs)


    class TestClientProtocolFactory(WebSocketClientFactory):

        def __init__(self, assertion, close_condition, timeout, **kwargs):
            WebSocketClientFactory.__init__(self, **kwargs)
            self.assertion = assertion
            self.close_condition = close_condition
            self.timeout = timeout
            # This parameter needs to be forced to None to not leave the reactor dirty
            self.openHandshakeTimeout = None

        def buildProtocol(self, addr):
            protocol = TestClientProtocol(self.assertion, self.close_condition, self.timeout)
            protocol.factory = self
            return protocol

The trial based test

    class WebSocketTest(TrialTest):

        def setUp(self):
            port = 8088
            factory = WebSocketServerFactory("ws://localhost:{}".format(port))
            factory.protocol = TestServerProtocol
            self.listening_port = listenWS(factory)
            self.factory, self.port = factory, port

        def tearDown(self):
            # Cleaning up stuff, otherwise the reactor complains.
            # Returning the Deferred lets trial wait until the port is closed.
            return self.listening_port.stopListening()

        def test_message_reception(self):
            # This is the test assertion: we are testing that 3 messages were received
            def assertion(msgs):
                self.assertEqual(len(msgs), 3)

            # This class says when the connection with the server should be finalized.
            # In this case the condition to close the connection is for the client
            # to get 3 messages
            class CommunicationHandler(object):
                msg_count = 0

                def close_condition(self, msg):
                    self.msg_count += 1
                    return self.msg_count == 3

            d = Deferred()
            d.addCallback(assertion)
            # Here we create the client...
            client_factory = TestClientProtocolFactory(
                d, CommunicationHandler().close_condition, 5,
                url="ws://localhost:{}".format(self.port))
            # ...and we connect it to the server
            connectWS(client_factory)
            # Returning the assertion as a deferred purely for demonstration
            return d

Obviously this is just an example, but as you can see I did not have to mess around with makeConnection or any transport explicitly.
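
Since close_condition is just a callable that receives each message and returns a boolean, it could also be written as a small closure instead of a class. The sketch below is an illustration only: close_after and feed are hypothetical helper names, and feed merely imitates what the client's onMessage does with the hook, without any networking.

```python
def close_after(n):
    """Return a close_condition that becomes true on the n-th message."""
    count = 0

    def close_condition(msg):
        nonlocal count
        count += 1
        return count == n

    return close_condition


def feed(messages, close_condition):
    """Imitate the client's onMessage: collect messages until told to close."""
    received = []
    for msg in messages:
        received.append(msg)
        if close_condition(msg):
            break
    return received


# The server would keep pushing, but the hook stops collection at 3 messages
received = feed(["Hi there"] * 5, close_after(3))
```

A closure like this can be passed to the client factory in place of CommunicationHandler().close_condition, and each test gets a fresh counter simply by calling close_after again.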