Kotlin NIO 实现非阻塞echo服务器

Kotlin NIO 实现非阻塞echo服务器,第1张

Kotlin NIO 实现非阻塞echo服务器

传统的阻塞式方案通常在有新的链接时使用其他线程进行处理,每个线程负责处理一个socket,如

      ServerSocket serverSocket = new ServerSocket(3000);

            while(true){
                Socket socket = serverSocket.accept();
                new Thread(()->{
                    ///socket.getInputStream()
                   /// ...
                }).start();
            }

这种方式的弊端在于,如果连接很多,创建大量线程会有很大的开销,最终导致资源耗尽。当然我们可以用线程池进行优化。比如

         ServerSocket serverSocket = new ServerSocket(3000);
            ExecutorService executorService = Executors.newFixedThreadPool(200);
            while(true){
                Socket socket = serverSocket.accept();
                executorService.execute(()->{
                   // socket.getInputStream()
                    //...
                });
            }

使用线程池进行优化后,可以复用线程,减少了反复创建和销毁线程时的大量开销。然而,如果连接数量超过了线程池拥有的线程,那么连接就无法得到及时的响应。

在Web项目中,大部分等待时间并不是因为cpu需要处理复杂的运算,而是因为需要等待数据库或者其他网络响应而处于阻塞状态。阻塞时, *** 作系统会把线程挂起,等到socket有资源才会恢复线程。
比如 在刚才的 *** 作中,我们需要等待客户端发来的消息

          InputStream inputStream = socket.getInputStream();
                        int read = inputStream.read();  

read()是一个阻塞方法

    
    public abstract int read() throws IOException;

看到这一句注释:This method blocks until input data is available, the end of the stream is detected, or an exception is thrown.
意味着该方法会被阻塞,直到有可用的数据到达。在阻塞的过程中,调用此方法的线程会被cpu挂起。

在挂起时 线程不会占用cpu时间片,但是这并不意味着线程不再有开销。如果大量线程处于挂起状态意味着需要付出大量的内存等资源。

为了解决这个问题,我们可以使用NIO。非阻塞的api来实现用少量线程服务多个连接。
关于NIO的知识不在此赘述。

通过NIO 我们可以通过异步回调的方式来处理代码。
以下服务端代码使用kotlin编写

class MyServer(private val port: Int) {
    val asynchronousChannelGroup = AsynchronousChannelGroup.withThreadPool(Executors.newSingleThreadExecutor())
    fun start() {
        val socket =
            AsynchronousServerSocketChannel.open(asynchronousChannelGroup)
                .bind(InetSocketAddress(port))
        runBlocking {
            while (true) {
                val client = connect(socket)
                launch {
                    while (true) {
                        val message = readFromConnect(client)
                        println("${Thread.currentThread().name}来自客户端${client.remoteAddress} 的消息 $message")
                        if (message == "byen") {
                            println("准备断开连接")
                            writeIntoConnect(client, "bye, toon")
                            break
                        } else {
                            writeIntoConnect(client, "我已经收到$message")
                        }
                    }
                    println("连接已断开${client.remoteAddress}")
                }

            }
        }
    }
    // asynchronousChannelGroup.awaitTermination(0, TimeUnit.DAYS)
}

我们来看下关键的connect方法

suspend fun connect(socket: AsynchronousServerSocketChannel) = suspendCoroutine {
    socket.accept(null, object : CompletionHandler {
        override fun completed(client: AsynchronousSocketChannel?, attachment: Any?) {
            println("${Thread.currentThread().name} 连接到服务端:${client!!.remoteAddress}")
            it.resume(client)
        }

        override fun failed(exc: Throwable?, attachment: Any?) {
            // TODO("Not yet implemented")
        }
    })
}

这是一个suspend方法,意味着调用该方法的协程将被挂起。
注意,这里挂起的是协程,而不是线程。当协程被挂起后,其线程可以转去执行其他任务。
当有一个新的链接时 通过执行回调

 it.resume(client)

来唤醒协程并且传递返回值。

在launch中 我们开启了一个新的协程用来处理这个连接。
为了方便理解,我们用java和thread模拟一下这个思路

 while(true){
                Socket socket = serverSocket.accept();
                executorService.execute(()->{
                    try {
                        InputStream inputStream = socket.getInputStream();
                        while(true){
                            int read = inputStream.read();
                            //do something
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    //...
                });
            }

可以看到有两个while 其中外面一层while用于不断接收连接,里面一层while用于不断处理连接中接收到的数据。只不过这里我们把线程换成了协程。

在不断处理接收到的数据过程中,我们仍然需要进行等待,因为客户端不一定会不断地发送消息,如果用传统的BIO和线程处理,我们可能需要阻塞住来等待客户端的消息,每当有消息时,会恢复线程进行处理,线程调度的过程本身带来了一定的开销。
我们可以用协程解决这个问题。

        while (true) {
                        val message = readFromConnect(client)
                        println("${Thread.currentThread().name}来自客户端${client.remoteAddress} 的消息 $message")

我们使用一个挂起方法来处理请求,如前一个挂起方法一样,这并不会阻塞线程。

suspend fun readFromConnect(client: AsynchronousSocketChannel) = suspendCoroutine {
    val byteBuffer1: ByteBuffer = ByteBuffer.allocateDirect(2048)
    client.read(byteBuffer1, Attachment(byteBuffer1), object : CompletionHandler {
        override fun completed(result: Int?, attachment: Attachment?) {
            val byteBuffer = attachment!!.byteBuffer
            //此时里面已经有数据了 是之前写入的 现在要转换为读取模式
            //此 *** 作把limit放到当前位置 来保证不会多读
            byteBuffer.flip()
            val byteArray = ByteArray(byteBuffer.limit())
            byteBuffer.get(byteArray)
            byteBuffer.clear()
            val message = String(byteArray)
            it.resume(message)
        }

        override fun failed(exc: Throwable?, attachment: Attachment?) {
            // TODO("Not yet implemented")
        }
    }
    )
}

在接收到消息后,我们通过ByteBuffer进行读取。
接下来,我们对读取到的消息进行一些处理

                    println("${Thread.currentThread().name}来自客户端${client.remoteAddress} 的消息 $message")
                        if (message == "byen") {
                            println("准备断开连接")
                            writeIntoConnect(client, "bye, toon")
                            break
                        } else {
                            writeIntoConnect(client, "我已经收到$message")
                        }

NIO是非阻塞的, write方法也会立即返回

fun writeIntoConnect(client: AsynchronousSocketChannel, echoMessage: String) {
    val byteBuffer1: ByteBuffer = ByteBuffer.allocateDirect(2048)
    byteBuffer1.put(echoMessage.toByteArray())
    byteBuffer1.flip()
    client.write(byteBuffer1)
}

这样 我们就实现了通过少量——极端情况下一个线程就能处理很多的请求。

接下来进行以下测试,客户端代码是以前写的,仍然是BIO

class MyClint(val port: Int) {
    fun start() = runBlocking {
        val clientSocket = Socket("127.0.0.1", port)
        val scanner = Scanner(System.`in`)
        println("已与服务器建立连接")
        val outputStream = clientSocket.getOutputStream()
        val bufferedWriter = BufferedWriter(OutputStreamWriter(outputStream))
        println("获取输出流成功")
        while (scanner.hasNext()) {
            val nextLine = scanner.nextLine()
            bufferedWriter.write(nextLine + "n")
            bufferedWriter.flush()
            val bufferedReader = BufferedReader(InputStreamReader(clientSocket.getInputStream()))
            //readLine接收到的没有换行符n
            val message = bufferedReader.readLine() ?: break

            println("来自服务端消息:${message}")
            if(message=="bye, too"){
                clientSocket.close()
                println("连接已断开")
                break
            }
        }
    }
}

fun main() {
    MyClint(5000).start()
}

我们来测试一下处理多个连接是否可以只用一个线程
注意 我们服务端使用的线程池里只有一个线程

 val asynchronousChannelGroup = AsynchronousChannelGroup.withThreadPool(Executors.newSingleThreadExecutor())

启动两个客户端进行连接


接下来处理客户端的消息


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5722605.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存