本项目所有代码可见:https://github.com/weiyu-zeng/SimpleRPC
前言本次改进我们将引入zookeeper作为RPC框架的注册中心,服务端在zookeeper上注册自己的服务,而客户端调用服务,回去zookeeper上根据服务名寻找调用的服务器地址,使得我们RPC支持集群调度通信的能力。
实现 zookeeper安装与使用zookeeper安装请见:
【zookeeper】windows版zookeeper安装与启动 可能遇到的各种问题
安装好之后,我们打开zookeeper的server:
server启动如下:
开启zookeeper的client:
如下,说明成功启动了
按回车:
输入ls /我们查看目录:
到此为止,先放在这不要关,我们写代码去。
项目创建创建一个名为simpleRPC-06的module:
创建com.rpc的package:
依赖配置pom.xml
SimpleRPC org.example 1.0-SNAPSHOT 4.0.0 simpleRPC-068 8 org.projectlombok lombok1.18.12 provided io.netty netty-all4.1.51.Final com.alibaba fastjson1.2.67 org.apache.curator curator-recipes2.13.0 org.slf4j slf4j-nop1.7.30
请注意一下,curator必须要和zookeeper版本适配,如果curator版本太高,项目将无法运行。
我们在resources目录下配置一下 log4j的配置,文件名为 log4j.properties:
log4j.properties
log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%nregister
我们创建一个名为register的package:
创建注册中心的注册服务接口ServiceRegister.java:
package com.rpc.register; import java.net.InetSocketAddress; public interface ServiceRegister { void register(String serviceName, InetSocketAddress serverAddress); InetSocketAddress serviceDiscovery(String serviceName); }
然后创建服务注册实现类 ZkServiceRegister.java:
package com.rpc.register; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.Curatorframework; import org.apache.curator.framework.CuratorframeworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; import java.net.InetSocketAddress; import java.util.List; public class ZkServiceRegister implements ServiceRegister { // curator 提供的zookeeper客户端 private Curatorframework client; // zookeeper根路径结点 private static final String ROOT_PATH = "MyRPC"; // 构造方法 // 这里负责zookeeper客户端的初始化,并与zookeeper服务端建立连接。 // 初始化包括指定重连策略,指定连接zookeeper的端口,指定超时时间,指定命名空间 // 初始化完成之后start()开启zookeeper客户端。 public ZkServiceRegister() { // 重连策略:指数时间重试 RetryPolicy policy = new ExponentialBackoffRetry(1000, 3); // zookeeper的地址固定,不管是服务提供者还是消费者,都要与之建立连接 // sessionTimeoutMs 与 zoo.cfg中的tickTime 有关系, // zk还会根据minSessionTimeout与maxSessionTimeout两个参数重新调整最后的超时值。默认分别为tickTime 的2倍和20倍 // 使用心跳监听状态 this.client = CuratorframeworkFactory.builder().connectString("127.0.0.1:2181") .sessionTimeoutMs(40000) .retryPolicy(policy) .namespace(ROOT_PATH) .build(); this.client.start(); System.out.println("zookeeper 连接成功"); } // 注册:传入服务方法名(String),传入主机名和端口号的套接字地址(InetSocketAddress) @Override public void register(String serviceName, InetSocketAddress serverAddress) { try { // serviceName创建成永久节点,服务提供者下线时,不删服务名,只删地址 Stat stat = client.checkExists().forPath("/" + serviceName); if (stat == null) { client.create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .forPath("/" + serviceName); } // 路径地址,一个/代表一个节点 String path = "/" + serviceName + "/" + getServiceAddress(serverAddress); // 临时节点,服务器下线就删除节点 client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path); } catch (Exception e) { System.out.println("此服务已存在"); } } // 根据服务名返回地址 @Override public InetSocketAddress serviceDiscovery(String serviceName) { try { Liststrings = client.getChildren().forPath("/" + serviceName); // 这里默认用的第一个,后面加负载均衡 String string = strings.get(0); return parseAddress(string); } catch (Exception e) { e.printStackTrace(); } return null; } // 地址 -> XXX.XXX.XXX.XXX:port 字符串 private String getServiceAddress(InetSocketAddress serverAddress) { return serverAddress.getHostName() + ":" + serverAddress.getPort(); } // 字符串解析为地址:按照":"切分开,前半是host(String),后半是port(int) private InetSocketAddress parseAddress(String address) { String[] result = address.split(":"); return new InetSocketAddress(result[0], Integer.parseInt(result[1])); } }
接下来可以对service,client和server进行修改。
clientNettyRPCClient.java 做一点修改:
package com.rpc.client; import com.rpc.common.RPCRequest; import com.rpc.common.RPCResponse; import com.rpc.register.ServiceRegister; import com.rpc.register.ZkServiceRegister; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.AttributeKey; import java.net.InetSocketAddress; public class NettyRPCClient implements RPCClient { private static final Bootstrap bootstrap; private static final EventLoopGroup eventLoopGroup; private String host; private int port; private ServiceRegister serviceRegister; // ServiceRegister接口类class // 构造函数:初始化zookeeper public NettyRPCClient() { this.serviceRegister = new ZkServiceRegister(); } // netty客户端初始化,重复使用 static { eventLoopGroup = new NioEventLoopGroup(); bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) .handler(new NettyClientInitializer()); } @Override public RPCResponse sendRequest(RPCRequest request) { InetSocketAddress address = serviceRegister.serviceDiscovery(request.getInterfaceName()); host = address.getHostName(); port = address.getPort(); try { ChannelFuture channelFuture = bootstrap.connect(host, port).sync(); Channel channel = channelFuture.channel(); // 发送数据 channel.writeAndFlush(request); channel.closeFuture().sync(); // 阻塞的获得结果,通过给channel设计别名,获取特定名字下的channel中的内容(这个在hanlder中设置) // AttributeKey是,线程隔离的,不会由线程安全问题。 // 实际上不应通过阻塞,可通过回调函数,后面可以再进行优化 AttributeKeykey = AttributeKey.valueOf("RPCResponse"); RPCResponse response = channel.attr(key).get(); System.out.println(response); return response; } catch (InterruptedException e) { e.printStackTrace(); } return null; } }
TestClient.java 也做相应的修改:
package com.rpc.client; import com.rpc.common.Blog; import com.rpc.common.User; import com.rpc.service.BlogService; import com.rpc.service.UserService; public class TestClient { public static void main(String[] args) { // 不需传host,port RPCClient rpcClient = new NettyRPCClient(); // 把这个客户端传入代理客户端 RPCClientProxy rpcClientProxy = new RPCClientProxy(rpcClient); // 代理客户端根据不同的服务,获得一个代理类, 并且这个代理类的方法以或者增强(封装数据,发送请求) UserService userService = rpcClientProxy.getProxy(UserService.class); // 服务的方法1 User userByUserId = userService.getUserByUserId(10); System.out.println("从服务器端得到的user为:" + userByUserId); // 服务的方法2 User user = User.builder().userName("张三").id(100).sex(true).build(); Integer integer = userService.insertUserId(user); System.out.println("向服务器端插入数据" + integer); // 服务的方法3 BlogService blogService = rpcClientProxy.getProxy(BlogService.class); Blog blogById = blogService.getBlogById(10000); System.out.println("从服务端得到的blog为:" + blogById); } }
client中的其他代码和simpleRPC-05一样,可以直接从simpleRPC-05复制粘贴过来,为了完整,我还是把代码放下面:
RPCClient.java
package com.rpc.client; import com.rpc.common.RPCRequest; import com.rpc.common.RPCResponse; public interface RPCClient { RPCResponse sendRequest(RPCRequest request); }
RPCClientProxy.java
package com.rpc.client; import com.rpc.common.RPCRequest; import com.rpc.common.RPCResponse; import lombok.AllArgsConstructor; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; @AllArgsConstructor public class RPCClientProxy implements InvocationHandler { private RPCClient client; // jdk动态代理,每一次代理对象调用方法,会经过此方法增强(反射获取request对象,socket发送至客户端) @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // request的构建,使用了lombok中的builder,更加简洁 RPCRequest request = RPCRequest.builder().interfaceName(method.getDeclaringClass().getName()) .methodName(method.getName()) .params(args) .paramsTypes(method.getParameterTypes()) .build(); // 数据传输 RPCResponse response = client.sendRequest(request); // System.out.println(response); return response.getData(); }T getProxy(Class clazz) { Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this); return (T)o; } }
NettyClientInitializer.java
package com.rpc.client; import com.rpc.codec.JsonSerializer; import com.rpc.codec.MyDecode; import com.rpc.codec.MyEncode; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; public class NettyClientInitializer extends ChannelInitializer{ @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 使用自定义的编解码器 pipeline.addLast(new MyDecode()); // 编码需要传入序列化器,这里是json,还支持ObjectSerializer,也可以自己实现其他的 pipeline.addLast(new MyEncode(new JsonSerializer())); pipeline.addLast(new NettyClientHandler()); } }
NettyClientHandler.java
package com.rpc.client; import com.rpc.common.RPCResponse; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.AttributeKey; public class NettyClientHandler extends SimpleChannelInboundHandlerservice{ @Override protected void channelRead0(ChannelHandlerContext ctx, RPCResponse msg) throws Exception { // 接收到response, 给channel设计别名,让sendRequest里读取response AttributeKey key = AttributeKey.valueOf("RPCResponse"); ctx.channel().attr(key).set(msg); ctx.channel().close(); } // 跟NettyRPCServerHandler一样 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
服务暴露类加入注册的功能,ServiceProvider.java 做相应的修改:
package com.rpc.service; import com.rpc.register.ServiceRegister; import com.rpc.register.ZkServiceRegister; import java.net.InetSocketAddress; import java.util.HashMap; import java.util.Map; public class ServiceProvider { private MapinterfaceProvider; private ServiceRegister serviceRegister; private String host; private int port; public ServiceProvider(String host, int port){ // 需要传入服务端自身的服务的网络地址 this.host = host; this.port = port; this.interfaceProvider = new HashMap<>(); this.serviceRegister = new ZkServiceRegister(); } public void provideServiceInterface(Object service) throws Exception { Class>[] interfaces = service.getClass().getInterfaces(); for(Class clazz : interfaces){ // 本机的映射表 interfaceProvider.put(clazz.getName(),service); // 在注册中心注册服务 serviceRegister.register(clazz.getName(), new InetSocketAddress(host, port)); } } public Object getService(String interfaceName){ return interfaceProvider.get(interfaceName); } }
service中的其他代码和simpleRPC-05一样,可以直接从simpleRPC-05复制粘贴过来,为了完整,我还是把代码放下面:
BlogService.java
package com.rpc.service; import com.rpc.common.Blog; public interface BlogService { Blog getBlogById(Integer id); }
BlogServiceImpl.java
package com.rpc.service; import com.rpc.common.Blog; public class BlogServiceImpl implements BlogService { @Override public Blog getBlogById(Integer id) { Blog blog = Blog.builder() .id(id) .title("我的博客") .useId(22).build(); System.out.println("客户端查询了" + id + "博客"); return blog; } }
UserService.java
package com.rpc.service; import com.rpc.common.User; public interface UserService { // 客户端通过这个接口调用服务端的实现类 User getUserByUserId(Integer id); // 给这个服务增加一个功能 Integer insertUserId(User user); }
UserServiceImpl.java
package com.rpc.service; import com.rpc.common.User; public class UserServiceImpl implements UserService { @Override public User getUserByUserId(Integer id) { // 模拟从数据库中取用户的行为 User user = User.builder() .id(id) .userName("he2121") .sex(true).build(); System.out.println("客户端查询了" + id + "的用户"); return user; } @Override public Integer insertUserId(User user) { System.out.println("插入数据成功: " + user); return 1; } }server
TestServer.java 做相应的修改:
package com.rpc.server; import com.rpc.service.*; public class TestServer { public static void main(String[] args) throws Exception { UserService userService = new UserServiceImpl(); BlogService blogService = new BlogServiceImpl(); // 这里重用了服务暴露类,顺便在注册中心注册,实际上应分开,每个类做各自独立的事 ServiceProvider serviceProvider = new ServiceProvider("127.0.0.1", 8899); // 8899 serviceProvider.provideServiceInterface(userService); serviceProvider.provideServiceInterface(blogService); RPCServer RPCServer = new NettyRPCServer(serviceProvider); RPCServer.start(8899); } }
server中的其他代码和simpleRPC-05一样,可以直接从simpleRPC-05复制粘贴过来,为了完整,我还是把代码放下面:
RPCServer.java
package com.rpc.server; public interface RPCServer { void start(int port); void stop(); }
NettyRPCServer.java
package com.rpc.server; import com.rpc.service.ServiceProvider; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import lombok.AllArgsConstructor; @AllArgsConstructor public class NettyRPCServer implements RPCServer { private ServiceProvider serviceProvider; @Override public void start(int port) { // netty服务线程组负责建立连接(TCP/IP连接),work负责具体的请求 NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workGroup = new NioEventLoopGroup(); System.out.println("Netty服务端启动了"); try { // 启动Netty服务器 ServerBootstrap serverBootstrap = new ServerBootstrap(); // 初始化 serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class) .childHandler(new NettyServerInitializer(serviceProvider)); // 同步阻塞 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); // 死循环监听 channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } @Override public void stop() { } }
NettyServerInitializer.java
package com.rpc.server; import com.rpc.codec.JsonSerializer; import com.rpc.codec.MyDecode; import com.rpc.codec.MyEncode; import com.rpc.service.ServiceProvider; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import lombok.AllArgsConstructor; @AllArgsConstructor public class NettyServerInitializer extends ChannelInitializer{ private ServiceProvider serviceProvider; @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 使用自定义的解码器 pipeline.addLast(new MyDecode()); // 使用自定义的编码器,而且解码器需要传入序列化器,这里是json,还支持ObjectSerializer,也可以自己实现其他的 pipeline.addLast(new MyEncode(new JsonSerializer())); pipeline.addLast(new NettyRPCServerHandler(serviceProvider)); } }
NettyRPCServerHandler.java
package com.rpc.server; import com.rpc.common.RPCRequest; import com.rpc.common.RPCResponse; import com.rpc.service.ServiceProvider; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import lombok.AllArgsConstructor; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @AllArgsConstructor public class NettyRPCServerHandler extends SimpleChannelInboundHandlercommon{ private ServiceProvider serviceProvider; @Override protected void channelRead0(ChannelHandlerContext ctx, RPCRequest msg) throws Exception { // System.out.println(msg); RPCResponse response = getResponse(msg); ctx.writeAndFlush(response); ctx.close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } // 这里和WorkThread里的getResponse差不多 RPCResponse getResponse(RPCRequest request) { // 得到服务名 String interfaceName = request.getInterfaceName(); // 得到服务器相应类 Object service = serviceProvider.getService(interfaceName); // 反射调用方法 Method method = null; try { method = service.getClass().getMethod(request.getMethodName(), request.getParamsTypes()); Object invoke = method.invoke(service, request.getParams()); return RPCResponse.success(invoke); } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { e.printStackTrace(); System.out.println("方法执行错误"); return RPCResponse.fail(); } } }
和simpleRPC-05一样,可以直接复制过来。
codec和simpleRPC-05一样,可以直接复制过来。
文件结构simpleRPC-06的文件结构如下:
运行启动TestServer.java :
然后启动TestClient.java:
我们来看看我们最开始开的zookeeper客户端:
现在输入ls /
发现我们多了一个结点 MyRPC:
输入ls /MyRPC:
可以看到我们注册的服务都在这里,成功!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)