需要导入的包
io.netty netty-all4.1.39.Final org.projectlombok lombok1.16.18 com.google.code.gson gson2.8.5 com.google.guava guava19.0 ch.qos.logback logback-classic1.2.3
@Slf4j public class ThreadServerTest { public static void main(String[] args) { try (ServerSocketChannel ssc = ServerSocketChannel.open()) { ssc.bind(new InetSocketAddress(8080)); //false为非阻塞模式 ssc.configureBlocking(false); Selector master = Selector.open(); //注册到selector ssc.register(master, SelectionKey.OP_ACCEPT); //负载均衡 Worker[] workers=new Worker[3]; for (int i = 0; i <3 ; i++) { workers[i]= new Worker("worker-"+i); } AtomicInteger ai=new AtomicInteger(1); while(true){ master.select(); SeteventKeys = master.selectedKeys(); Iterator iterator = eventKeys.iterator(); while (iterator.hasNext()){ log.info("connect success"); SelectionKey event = iterator.next(); //不移除,下次还会获取相同的key,导致空指针错误 iterator.remove(); if(event.isAcceptable()){ log.info("connect create"); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); log.info("address:{}",sc.getRemoteAddress()); workers[ai.getAndIncrement()%3].exe(sc); // sc.register(worker.worker,SelectionKey.OP_READ,buffer); log.info("after connect:{}",sc.getRemoteAddress()); } // else if(event.isReadable()){ // log.info("get into read"); // try { // SocketChannel sc = (SocketChannel)event.channel(); // ByteBuffer buffer = (ByteBuffer)event.attachment(); // int read = sc.read(buffer); // log.info("read count {}",read); // if(read==-1){ // //event.cancel(); // } // // log.info("limit {},position {}",buffer.limit(),buffer.position()); // if(buffer.limit()==buffer.position()){ // ByteBuffer newBuffer=ByteBuffer.allocate(buffer.capacity()<<1); // buffer.flip(); // newBuffer.put(buffer); // event.attach(newBuffer); // } // buffer.flip(); // debugRead(buffer); // }catch (Exception e){ // //event.cancel(); // System.out.println("connectint shut down"); // } // } } } } catch (IOException e) { e.printStackTrace(); } } } @Slf4j class Worker implements Runnable{ private Thread thread; private volatile Boolean start=false; private ConcurrentlinkedQueue queue; private Selector worker; private String name; public Worker(String name)throws IOException { this.name=name; this.worker=Selector.open(); //保证顺序,可以不用,通过代码顺序来实现 queue=new ConcurrentlinkedQueue<>(); thread=new Thread(this,name); } public void exe(SocketChannel socketChannel)throws IOException{ // ByteBuffer buffer = ByteBuffer.allocate(16); // socketChannel.register(worker,SelectionKey.OP_READ,buffer); if(!start){ thread.start(); } start=true; queue.add(()->{ ByteBuffer buffer = ByteBuffer.allocate(16); try { socketChannel.register(worker,SelectionKey.OP_READ,buffer); } catch (ClosedChannelException e) { e.printStackTrace(); } }); worker.wakeup(); } @Override public void run() { while (true){ try { log.info("go into worker "); worker.select(); Runnable task = queue.poll(); if(task!=null){ task.run(); } log.info("worker running"); Set events = worker.selectedKeys(); Iterator iterator = events.iterator(); while (iterator.hasNext()){ SelectionKey event = iterator.next(); iterator.remove(); if(event.isReadable()){ SocketChannel channel = (SocketChannel)event.channel(); log.info("address:{}",channel.getRemoteAddress()); ByteBuffer buffer = (ByteBuffer)event.attachment(); channel.read(buffer); buffer.flip(); debugRead(buffer); } } }catch (IOException e){ e.printStackTrace(); } } } }
worker.wakeup(); 这个方法是必须使用的,类似与lockSupport中的park和unpark的使用.如果不使用这个方法,进行阻塞唤醒,会造成,该线程处理后续其他机器的请求,不能注册到selector,需要等待
worker.select()的放行.
客户端代码,加断点调试
public static void main(String[] args) { try (SocketChannel channel = SocketChannel.open()) { channel.connect(new InetSocketAddress("localhost",8080)); // channel.write(Charset.defaultCharset().encode("hello4")); System.out.println("waiting");//此行加断点 } catch (IOException e) { e.printStackTrace(); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)