netty是基于NIO(同步非阻塞)开发的网络通信框架;对比传统BIO(阻塞IO),其并发性能有很大提升。而dubbo的底层就是使用netty作为网络框架,本文就手写简单的基于netty的RPC框架。
1 设计步骤定义一个通用接口,作为服务提供者(provider)和消费者(consumer)之间的 *** 作纽带 创建一个服务提供者,实现通用接口,并返回处理结果;网络方面监听消费者请求 创建一个服务消费者,通过代理模式调用远程服务接口1.1 程序目录 1.2 定义一个通用接口
public interface TestService { String hello(String msg); }2 服务提供者模块 2.1 接口实现
public class TestServiceImpl implements TestService { @Override public String hello(String msg) { System.out.println("TestServiceImpl中hello被调用,参数:" + msg); return "你好客户端,我已经收到你的消息:" + msg; } }2.2 定义一个服务启动类
public class ServerBootStrap { public static void main(String[] args) { NettyServer nettyServer = new NettyServer(40004); nettyServer.init(); } }2.3 创建netty服务端
**此步骤是netty常规服务端创建方式**
public class NettyServer { private int port; public NettyServer(int port) { this.port = port; } public void init() { //创建一个用于接收连接的线程组,参数代表线程个数 EventLoopGroup boss = new NioEventLoopGroup(1); //创建处理 *** 作时间的线程组,没有参数netty会默认线程为内核数*2 EventLoopGroup worker = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap .group(boss, worker) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer2.4 服务端业务处理Handler() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(new NettyServerHandler()); } }); ChannelFuture ch = serverBootstrap.bind(port).sync(); ch.channel().closeFuture().sync(); } catch (Exception ex) { } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } }
channelRead0方法用于接收客户端传来的信息,同时对数据进行校验 校验成功后,截取有效参数调用服务接口
public class NettyServerHandler extends SimpleChannelInboundHandler { private static String head = "dubbo#TestServie#"; @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("收到客户端数据:" + msg); if (msg.toString().startsWith(head)) { TestService testService = new TestServiceImpl(); String result = testService.hello(msg.toString().substring(head.length())); ctx.writeAndFlush(result); } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("接收到连接请,channelActive被调用:" + ctx.channel().remoteAddress()); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("读取完成"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("断开连接"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("抛出异常"); ctx.channel().close(); } }3 消费者模块 3.1 创建消费者启动程序
public class ClientBootStrap { public static void main(String[] args) { NettyClient nettyClient = new NettyClient("127.0.0.1", 40004); String head = "dubbo#TestServie#"; // nettyClient.init(); TestService service = (TestService) nettyClient.getBean(TestService.class, head); String result = service.hello("你好,我是服务消费者"); System.out.println("调用返回了结果:" + result); } }3.2 创建消费者网络通信模块
通过代理模式调用
public class NettyClient { private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); private static NettyClientHandler nettyClientHandler; private String host; private int port; public NettyClient(String host, int port) { this.host = host; this.port = port; } //编写一个代理 请求服务提供者接口 public Object getBean(final Class> serviceClass, final String providerName) { return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class>[]{serviceClass}, ((proxy, method, args) -> { System.out.println("开始执行代理"); if (nettyClientHandler == null) init(); System.out.println("设置代理参数"); nettyClientHandler.setPara(providerName + args[0].toString()); return executorService.submit(nettyClientHandler).get(); })); } private static void init() { System.out.println("开始执行init方法"); nettyClientHandler = new NettyClientHandler(); NioEventLoopGroup worker = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(worker) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer3.3 创建消费者业务处理handler() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(nettyClientHandler); } }); bootstrap.connect("127.0.0.1", 40004).sync(); // future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { // worker.shutdownGracefully(); // System.out.println("执行结束"); } } }
成员变量para: 为调用远程接口服务的参数 成员变量result::为调用远程服务器接口返回结果 需要注意的是该handller实现了Callable接口中call()方法; 执行步骤为: 1、连接建立成功后执行channelActive方法 2、执行call方法发送数据到服务端,同时阻塞线程 3、服务端返回结果后执行channelRead0方法,唤醒线程, 4、执行call方法中wait()后面的步骤,返回结果
public class NettyClientHandler extends SimpleChannelInboundHandler implements Callable { private ChannelHandlerContext context; private String para; private String result; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { this.context = ctx; } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("断开连接"); } @Override public synchronized Object call() throws Exception { System.out.println("发送call消息:" + para); context.writeAndFlush(para); wait(); return result; } @Override protected synchronized void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception { result = o.toString(); System.out.println("收到服务端的返回消息:" + o); notify(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("客户端发生异常"); } void setPara(String str) { this.para = str; } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)