服务端代码:
import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; import java.util.concurrent.ConcurrentlinkedDeque; import java.util.concurrent.atomic.AtomicInteger; import static com.netty.c1.ByteBufferUtil.debugAll; @Slf4j public class MultiThreadServer { public static void main(String[] args) throws IOException { Thread.currentThread().setName("boss"); Selector boss = Selector.open(); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); ssc.register(boss, SelectionKey.OP_ACCEPT, null); ssc.bind(new InetSocketAddress(8080)); // 创建固定数量的 worker 并初始化 // 注意如果代码跑在docker中,不是获取docker中的cpu数量,还是物理机的cpu数量 // 该bug在jdk10以后修复了 Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()]; for (int i = 0; i < workers.length; ++i) { workers[i] = new Worker("worker-" + i); } AtomicInteger atomicInteger = new AtomicInteger(); while (true) { boss.select(); Iteratoriter = boss.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); if (key.isAcceptable()) { SocketChannel sc = ssc.accept(); sc.configureBlocking(false); log.debug("connected...{}", sc.getRemoteAddress()); log.debug("before register...{}", sc.getRemoteAddress()); workers[atomicInteger.getAndIncrement()% workers.length].register(sc); log.debug("after register...{}", sc.getRemoteAddress()); } } } } static class Worker implements Runnable { private Thread thread; private Selector selector; private String name; private volatile boolean start = false; // 用来控制一个work对象只跟一个线程关联 private ConcurrentlinkedDeque queue = new ConcurrentlinkedDeque<>(); // 用一个队列构建两个线程直接信息传递的通道 public Worker(String name) { this.name = name; } public void register(SocketChannel sc) throws IOException { if(!start) { thread = new Thread(this, name); selector = Selector.open(); thread.start(); start = true; } // 向队列中添加了任务,但是任务还没有被执行 queue.add(() ->{ try { sc.register(selector, SelectionKey.OP_READ, null); } catch (ClosedChannelException e) { e.printStackTrace(); } }); selector.wakeup(); // 为了唤醒worker线程中的select()方法 } @Override public void run() { while (true) { try { selector.select(); // 有事件发生或者主动调用selector的wakeup方法,解除阻塞 Runnable task = queue.poll(); if (task != null) { task.run(); } Iterator iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); if (key.isReadable()) { ByteBuffer buffer = ByteBuffer.allocate(16); SocketChannel channel = (SocketChannel) key.channel(); channel.read(buffer); buffer.flip(); debugAll(buffer); } } } catch (IOException e) { e.printStackTrace(); } } } } }
- boos线程处理accept事件
- worker线程处理 read 和 write事件
- 事件发生在channel上
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)