传统的阻塞式方案通常在有新的链接时使用其他线程进行处理,每个线程负责处理一个socket,如
ServerSocket serverSocket = new ServerSocket(3000); while(true){ Socket socket = serverSocket.accept(); new Thread(()->{ ///socket.getInputStream() /// ... }).start(); }
这种方式的弊端在于,如果连接很多,创建大量线程会有很大的开销,最终导致资源耗尽。当然我们可以用线程池进行优化。比如
ServerSocket serverSocket = new ServerSocket(3000); ExecutorService executorService = Executors.newFixedThreadPool(200); while(true){ Socket socket = serverSocket.accept(); executorService.execute(()->{ // socket.getInputStream() //... }); }
使用线程池进行优化后,可以复用线程,减少了反复创建和销毁线程时的大量开销。然而,如果连接数量超过了线程池拥有的线程,那么连接就无法得到及时的响应。
在Web项目中,大部分等待时间并不是因为cpu需要处理复杂的运算,而是因为需要等待数据库或者其他网络响应而处于阻塞状态。阻塞时, *** 作系统会把线程挂起,等到socket有资源才会恢复线程。
比如 在刚才的 *** 作中,我们需要等待客户端发来的消息
InputStream inputStream = socket.getInputStream(); int read = inputStream.read();
read()是一个阻塞方法
public abstract int read() throws IOException;
看到这一句注释:This method blocks until input data is available, the end of the stream is detected, or an exception is thrown.
意味着该方法会被阻塞,直到有可用的数据到达。在阻塞的过程中,调用此方法的线程会被cpu挂起。
在挂起时 线程不会占用cpu时间片,但是这并不意味着线程不再有开销。如果大量线程处于挂起状态意味着需要付出大量的内存等资源。
为了解决这个问题,我们可以使用NIO。非阻塞的api来实现用少量线程服务多个连接。
关于NIO的知识不在此赘述。
通过NIO 我们可以通过异步回调的方式来处理代码。
以下服务端代码使用kotlin编写
class MyServer(private val port: Int) { val asynchronousChannelGroup = AsynchronousChannelGroup.withThreadPool(Executors.newSingleThreadExecutor()) fun start() { val socket = AsynchronousServerSocketChannel.open(asynchronousChannelGroup) .bind(InetSocketAddress(port)) runBlocking { while (true) { val client = connect(socket) launch { while (true) { val message = readFromConnect(client) println("${Thread.currentThread().name}来自客户端${client.remoteAddress} 的消息 $message") if (message == "byen") { println("准备断开连接") writeIntoConnect(client, "bye, toon") break } else { writeIntoConnect(client, "我已经收到$message") } } println("连接已断开${client.remoteAddress}") } } } } // asynchronousChannelGroup.awaitTermination(0, TimeUnit.DAYS) }
我们来看下关键的connect方法
suspend fun connect(socket: AsynchronousServerSocketChannel) = suspendCoroutine { socket.accept(null, object : CompletionHandler { override fun completed(client: AsynchronousSocketChannel?, attachment: Any?) { println("${Thread.currentThread().name} 连接到服务端:${client!!.remoteAddress}") it.resume(client) } override fun failed(exc: Throwable?, attachment: Any?) { // TODO("Not yet implemented") } }) }
这是一个suspend方法,意味着调用该方法的协程将被挂起。
注意,这里挂起的是协程,而不是线程。当协程被挂起后,其线程可以转去执行其他任务。
当有一个新的链接时 通过执行回调
it.resume(client)
来唤醒协程并且传递返回值。
在launch中 我们开启了一个新的协程用来处理这个连接。
为了方便理解,我们用java和thread模拟一下这个思路
while(true){ Socket socket = serverSocket.accept(); executorService.execute(()->{ try { InputStream inputStream = socket.getInputStream(); while(true){ int read = inputStream.read(); //do something } } catch (IOException e) { e.printStackTrace(); } //... }); }
可以看到有两个while 其中外面一层while用于不断接收连接,里面一层while用于不断处理连接中接收到的数据。只不过这里我们把线程换成了协程。
在不断处理接收到的数据过程中,我们仍然需要进行等待,因为客户端不一定会不断地发送消息,如果用传统的BIO和线程处理,我们可能需要阻塞住来等待客户端的消息,每当有消息时,会恢复线程进行处理,线程调度的过程本身带来了一定的开销。
我们可以用协程解决这个问题。
while (true) { val message = readFromConnect(client) println("${Thread.currentThread().name}来自客户端${client.remoteAddress} 的消息 $message")
我们使用一个挂起方法来处理请求,如前一个挂起方法一样,这并不会阻塞线程。
suspend fun readFromConnect(client: AsynchronousSocketChannel) = suspendCoroutine{ val byteBuffer1: ByteBuffer = ByteBuffer.allocateDirect(2048) client.read(byteBuffer1, Attachment(byteBuffer1), object : CompletionHandler { override fun completed(result: Int?, attachment: Attachment?) { val byteBuffer = attachment!!.byteBuffer //此时里面已经有数据了 是之前写入的 现在要转换为读取模式 //此 *** 作把limit放到当前位置 来保证不会多读 byteBuffer.flip() val byteArray = ByteArray(byteBuffer.limit()) byteBuffer.get(byteArray) byteBuffer.clear() val message = String(byteArray) it.resume(message) } override fun failed(exc: Throwable?, attachment: Attachment?) { // TODO("Not yet implemented") } } ) }
在接收到消息后,我们通过ByteBuffer进行读取。
接下来,我们对读取到的消息进行一些处理
println("${Thread.currentThread().name}来自客户端${client.remoteAddress} 的消息 $message") if (message == "byen") { println("准备断开连接") writeIntoConnect(client, "bye, toon") break } else { writeIntoConnect(client, "我已经收到$message") }
NIO是非阻塞的, write方法也会立即返回
fun writeIntoConnect(client: AsynchronousSocketChannel, echoMessage: String) { val byteBuffer1: ByteBuffer = ByteBuffer.allocateDirect(2048) byteBuffer1.put(echoMessage.toByteArray()) byteBuffer1.flip() client.write(byteBuffer1) }
这样 我们就实现了通过少量——极端情况下一个线程就能处理很多的请求。
接下来进行以下测试,客户端代码是以前写的,仍然是BIO
class MyClint(val port: Int) { fun start() = runBlocking { val clientSocket = Socket("127.0.0.1", port) val scanner = Scanner(System.`in`) println("已与服务器建立连接") val outputStream = clientSocket.getOutputStream() val bufferedWriter = BufferedWriter(OutputStreamWriter(outputStream)) println("获取输出流成功") while (scanner.hasNext()) { val nextLine = scanner.nextLine() bufferedWriter.write(nextLine + "n") bufferedWriter.flush() val bufferedReader = BufferedReader(InputStreamReader(clientSocket.getInputStream())) //readLine接收到的没有换行符n val message = bufferedReader.readLine() ?: break println("来自服务端消息:${message}") if(message=="bye, too"){ clientSocket.close() println("连接已断开") break } } } } fun main() { MyClint(5000).start() }
我们来测试一下处理多个连接是否可以只用一个线程
注意 我们服务端使用的线程池里只有一个线程
val asynchronousChannelGroup = AsynchronousChannelGroup.withThreadPool(Executors.newSingleThreadExecutor())
启动两个客户端进行连接
接下来处理客户端的消息
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)