使用Autobahn WebSocket进行单元测试

使用Autobahn WebSocket进行单元测试,第1张

使用Autobahn WebSocket进行单元测试

如果不去

MyProtocol
上课,很难确切地说出正在发生的事情。这个问题听起来很像是由于您直接弄乱了底层函数,因此还有类的
state
属性(
WebSocket
实际上是WebSocket连接的内部状态的表示)引起的。

根据高速公路参考文档,

WebSicketProtocol
您可以直接使用和覆盖的API来自:

  • onOpen
  • onMessage
  • onClose
  • 发信息
  • sendClose

您使用

StringTransport
来测试协议的方法并不理想。问题在于这样
MyProtocol
一个事实,即
WebSocketProtocol
高速公路上的框架上有一小层,无论好坏,它隐藏了有关管理连接,传输和内部协议状态的细节。

如果考虑一下,就不要测试

WebSocketProtocol
,因此,如果不想嵌入虚拟服务器或客户端,最好的选择是直接测试
MyProtocol
覆盖的方法。

我说的一个例子如下

class MyPublisher(object):    cbk=None    def publish(self, msg):        if self.cbk: self.cbk(msg)class MyProtocol(WebSocketServerProtocol):    def __init__(self, publisher):        WebSocketServerProtocol.__init__(self)        #Defining callback for publisher        publisher.cbk = self.sendMessage    def onMessage(self, msg, binary)        #Stupid echo        self.sendMessage(msg)class NotificationTest(unittest.TestCase):    class MyProtocolFactory(WebSocketServerFactory):        def __init__(self, publisher): WebSocketServerFactory.__init__(self, "ws://127.0.0.1:8081") self.publisher = publisher self.openHandshakeTimeout = None        def buildProtocol(self, addr): protocol =  MyProtocol(self.listener) protocol.factory = self protocol.websocket_version = 13 #Hybi version 13 is supported by pretty much everyone (apart from IE <8 and android browsers) return protocol    def setUp(self):        publisher = task.LoopingCall(self.send_stuff, "Hi there")     factory = NotificationTest.MyProtocolFactory(listener)        protocol = factory.buildProtocol(None)        transport = proto_helpers.StringTransport()        def play_dumb(*args): pass        setattr(transport, "setTcpNoDelay", play_dumb)        protocol.makeConnection(transport)        self.protocol, self.transport, self.publisher, self.fingerprint_handler =  protocol, transport, publisher, fingerprint_handler    def test_onMessage(self):        #Following 2 lines are the problematic part. Here you are manipulating explicitly a hidden state which your implementation should not be concerned with!        self.protocol.state = WebSocketProtocol.STATE_OPEN        self.protocol.websocket_version = 13        self.protocol.onMessage("Whatever")        self.assertEqual(self.transport.value()[2:], 'Whatever')    def test_push(self):#Following 2 lines are the problematic part. Here you are manipulating explicitly a hidden state which your implementation should not be concerned with!        self.protocol.state = WebSocketProtocol.STATE_OPEN        self.protocol.websocket_version = 13        self.publisher.publish("Hi there")        self.assertEqual(self.transport.value()[2:], 'Hi There')

您可能已经注意到,使用

StringTransport
此处非常麻烦。您必须了解下划线框架并绕过其状态管理,而这并不是您真正想要做的。不幸的是,高速公路没有提供允许立即进行状态 *** 作的即用型测试对象,因此我对使用虚拟服务器和客户端的建议仍然有效


使用网络测试服务器

所提供的测试显示了如何测试服务器推送,断言所获得的就是您的期望,并且还使用了如何确定何时完成的挂钩。

服务器协议
from twisted.trial.unittest import TestCase as TrialTestfrom autobahn.websocket import WebSocketServerProtocol, WebSocketServerFactory, WebSocketClientProtocol, WebSocketClientFactory, connectWS, listenWSfrom twisted.internet.defer import Deferredfrom twisted.internet import taskSTART="START"class TestServerProtocol(WebSocketServerProtocol):    def __init__(self):        #The publisher task simulates an event that triggers a message push        self.publisher = task.LoopingCall(self.send_stuff, "Hi there")    def send_stuff(self, msg):        #this method sends a message to the client        self.sendMessage(msg)    def _on_start(self):        #here we trigger the task to execute every second        self.publisher.start(1.0)    def onMessage(self, message, binary):        #According to this stupid protocol, the server starts sending stuff when the client sends a "START" message        #You can plug other commands in here        {START : self._on_start#Put other keys here        }[message]()    def onClose(self, wasClean, pre, reason):        #After closing the connection, we tell the task to stop sending messages        self.publisher.stop()
客户端协议和工厂

下一类是客户端协议。它基本上告诉服务器开始推送消息。它调用

close_condition
它们以查看是否该关闭连接,最后,
assertion
它对收到的消息调用该函数以查看测试是否成功。

class TestClientProtocol(WebSocketClientProtocol):    def __init__(self, assertion, close_condition, timeout, *args, **kwargs):        self.assertion = assertion        self.close_condition = close_condition        self._received_msgs = []         from twisted.internet import reactor        #This is a way to set a timeout for your test         #in case you never meet the conditions dictated by close_condition        self.damocle_sword = reactor.callLater(timeout, self.sendClose)    def onOpen(self):        #After the connection has been established,         #you can tell the server to send its stuff        self.sendMessage(START)    def onMessage(self, msg, binary):        #Here you get the messages pushed from the server        self._received_msgs.append(msg)        #If it is time to close the connection        if self.close_condition(msg): self.damocle_sword.cancel() self.sendClose()    def onClose(self, wasClean, pre, reason):        #Now it is the right time to check our test assertions        self.assertion.callback(self._received_msgs)class TestClientProtocolFactory(WebSocketClientFactory):    def __init__(self, assertion, close_condition, timeout, **kwargs):        WebSocketClientFactory.__init__(self, **kwargs)        self.assertion = assertion        self.close_condition = close_condition        self.timeout = timeout        #This parameter needs to be forced to None to not leave the reactor dirty        self.openHandshakeTimeout = None    def buildProtocol(self, addr):        protocol = TestClientProtocol(self.assertion, self.close_condition, self.timeout)        protocol.factory = self        return protocol
基于试验的测试
class WebSocketTest(TrialTest):    def setUp(self):        port = 8088        factory = WebSocketServerFactory("ws://localhost:{}".format(port))        factory.protocol = TestServerProtocol        self.listening_port = listenWS(factory)        self.factory, self.port = factory, port    def tearDown(self):        #cleaning up stuff otherwise the reactor complains        self.listening_port.stopListening()    def test_message_reception(self):         #This is the test assertion, we are testing that the messages received were 3        def assertion(msgs): self.assertEquals(len(msgs), 3)        #This class says when the connection with the server should be finalized.         #In this case the condition to close the connectionis for the client to get 3 messages        class CommunicationHandler(object): msg_count = 0 def close_condition(self, msg):     self.msg_count += 1     return self.msg_count == 3        d = Deferred()        d.addCallback(assertion)        #Here we create the client...        client_factory = TestClientProtocolFactory(d, CommunicationHandler().close_condition, 5, url="ws://localhost:{}".format(self.port))        #...and we connect it to the server        connectWS(client_factory)        #returning the assertion as a deferred purely for demonstration        return d

显然,这只是一个示例,但是正如您所看到的,我不必弄乱

makeConnection
transport
明确地



欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5642918.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存