一、rpc生产端
自定义注解为
@Target({ElementType.METHOD,ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Component public @interface RpcService { String ServerName() default "" ; }
1、核心思路是实现 BeanPostProcessor,并自定义注解,注解用于定义实现类
public class RpcServerRegistrarBeanPostProcesser implements BeanPostProcessor { private RpcServerContext rpcServerContext; public RpcServerRegistrarBeanPostProcesser(RpcServerContext rpcServerContext) { this.rpcServerContext = rpcServerContext; } @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { RpcService annotation = bean.getClass().getAnnotation(RpcService.class); if(annotation!=null) { String serverName = annotation.ServerName(); if (serverName == null || "".equalsIgnoreCase(serverName.trim())) { serverName = bean.getClass().getInterfaces()[0].getName(); } rpcServerContext.addRpcServer(serverName, beanName); } return BeanPostProcessor.super.postProcessAfterInitialization(bean, beanName); } }
2、注册中心直接使用redis,将扫描到的带有自定义rpc服务生产者的类对应的netty的ip+端口号注册到redis,并在服务本地缓存一份
3、启动netty服务端监听对应端口
4、使用方式
@RpcService public class MyServiceServerImpl implements MyServiceServer { @Override public String hello() { return "hello"; } }
二、消费端
自定义注解为
@Target({ElementType.FIELD}) @Retention(RetentionPolicy.RUNTIME) public @interface RpcAutowired { String ServerName() default "" ; }
1、核心思路是实现InstantiationAwareBeanPostProcessor接口,并实现
postProcessProperties方法,这个方法是在spring生命周期中填充属性的时候执行的回调,这个时候可以对这个bean的属性参数进行修改,这边主要是判断Bean的属性中有没有自定义注解为RpcAutowired的参数,这块代码可以参考jetcache的写法
package com.example.hzzrpcclient.config; import com.example.hzzrpcclient.config.annotations.RpcAutowired; import com.example.hzzrpcclient.config.netty.NettyMainClient; import com.example.hzzrpcclient.config.registercenter.RedisRegisterCenter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeansException; import org.springframework.beans.PropertyValues; import org.springframework.beans.factory.BeanCreationException; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Injectionmetadata; import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; import org.springframework.beans.factory.config.InstantiationAwareBeanPostProcessor; import org.springframework.context.annotation.Configuration; import org.springframework.util.ReflectionUtils; import org.springframework.util.StringUtils; import java.lang.reflect.Field; import java.lang.reflect.Modifier; import java.lang.reflect.Proxy; import java.util.linkedList; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @Slf4j @Configuration public class RpcClientBeanPostProcesser implements InstantiationAwareBeanPostProcessor, BeanFactoryAware { @Autowired private RedisRegisterCenter redisRegisterCenter; @Autowired private NettyMainClient nettyMainClient; private ConfigurableListableBeanFactory beanFactory; private final MapinjectionmetadataCache = new ConcurrentHashMap (); @Override public PropertyValues postProcessProperties(PropertyValues pvs, Object bean, String beanName) throws BeanCreationException { Injectionmetadata metadata = findAutowiringmetadata(beanName, bean.getClass(), pvs); try { metadata.inject(bean, beanName, pvs); } catch (BeanCreationException ex) { throw ex; } catch (Throwable ex) { throw new BeanCreationException(beanName, "Injection of autowired dependencies failed", ex); } return pvs; } @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { if (!(beanFactory instanceof ConfigurableListableBeanFactory)) { throw new IllegalArgumentException( "AutowiredAnnotationBeanPostProcessor requires a ConfigurableListableBeanFactory"); } this.beanFactory = (ConfigurableListableBeanFactory) beanFactory; } private Injectionmetadata findAutowiringmetadata(String beanName, Class> clazz, PropertyValues pvs) { // Fall back to class name as cache key, for backwards compatibility with custom callers. String cacheKey = (StringUtils.hasLength(beanName) ? beanName : clazz.getName()); // Quick check on the concurrent map first, with minimal locking. Injectionmetadata metadata = this.injectionmetadataCache.get(cacheKey); if (Injectionmetadata.needsRefresh(metadata, clazz)) { synchronized (this.injectionmetadataCache) { metadata = this.injectionmetadataCache.get(cacheKey); if (Injectionmetadata.needsRefresh(metadata, clazz)) { try { metadata = buildAutowiringmetadata(clazz); this.injectionmetadataCache.put(cacheKey, metadata); } catch (NoClassDefFoundError err) { throw new IllegalStateException("Failed to introspect bean class [" + clazz.getName() + "] for autowiring metadata: could not find class that it depends on", err); } } } } return metadata; } private Injectionmetadata buildAutowiringmetadata(final Class> clazz) { linkedList elements = new linkedList (); Class> targetClass = clazz; do { final linkedList currElements = new linkedList (); doWithLocalFields(targetClass, new ReflectionUtils.FieldCallback() { @Override public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException { RpcAutowired ann = field.getAnnotation(RpcAutowired.class); if (ann != null) { if (Modifier.isStatic(field.getModifiers())) { if (log.isWarnEnabled()) { log.warn("RpcClient Autowired annotation is not supported on static fields: " + field); } return; } currElements.add(new AutowiredFieldElement(field, ann)); } } }); elements.addAll(0, currElements); targetClass = targetClass.getSuperclass(); } while (targetClass != null && targetClass != Object.class); return new Injectionmetadata(clazz, elements); } private void doWithLocalFields(Class clazz, ReflectionUtils.FieldCallback fieldCallback) { Field fs[] = clazz.getDeclaredFields(); for (Field field : fs) { try { fieldCallback.doWith(field); } catch (IllegalAccessException ex) { throw new IllegalStateException("Not allowed to access field '" + field.getName() + "': " + ex); } } } private class AutowiredFieldElement extends Injectionmetadata.InjectedElement { private Field field; private RpcAutowired ann; public AutowiredFieldElement(Field field, RpcAutowired ann) { super(field, null); this.field = field; this.ann = ann; } @Override protected void inject(Object bean, String beanName, PropertyValues pvs) throws Throwable { RpcClientProxy rpcClientProxy=new RpcClientProxy(redisRegisterCenter,nettyMainClient); Object o = Proxy.newProxyInstance(field.getType().getClassLoader(), new Class[]{field.getType()}, rpcClientProxy); field.setAccessible(true); field.set(bean, o); } } }
2、注入的属性应该是一个代理类,代理类里面使用netty客户端进行链接并发送数据,并使用雪花算法创建一个唯一id,在发送完数据之后进行线程自旋并阻塞等待服务端的消息发送过来
@Slf4j public class RpcClientProxy implements InvocationHandler { private RedisRegisterCenter redisRegisterCenter; private NettyMainClient nettyMainClient; public RpcClientProxy(RedisRegisterCenter redisRegisterCenter, NettyMainClient nettyMainClient) { this.redisRegisterCenter = redisRegisterCenter; this.nettyMainClient = nettyMainClient; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName();//方法名,hello() String serverName = method.getDeclaringClass().getName();//接口名:MyServiceServer,用来寻找服务 RpcServerDto rpcServerDto = redisRegisterCenter.searchRpcServer(serverName); Channel connect = nettyMainClient.connect(rpcServerDto.getIp(), rpcServerDto.getPort()); long start = System.currentTimeMillis(); String msgId = UUID.randomUUID().toString(); connect.writeAndFlush(new String( msgId ).getBytes()).addListener( future -> { long time = System.currentTimeMillis() - start; if (!future.isSuccess()) { log.error("下发失败,msgId=[{}],耗时[{}]ms", msgId, time); return; } if (time > 20000) { log.warn("指令 *** 作=[{}],下发异常耗时,msgId=[{}],耗时[{}]ms", 20000, msgId, time); return; } log.info("下发成功,msgId=[{}],耗时[{}]ms", msgId, time); }); // 阻塞 GuardedObjectgo = GuardedObject.create(msgId); String resPayloadPacket = go.getAndThrow(res ->{ return (res.indexOf(msgId)>-1); }, 50000, TimeUnit.MILLISECONDS); return resPayloadPacket; } }
3、使用方式
@Service public class RpcClientServiceimpl { @RpcAutowired private MyServiceServer myServiceServer; public String hello(){ long s = System.currentTimeMillis(); //获取开始时间 String hello = myServiceServer.hello(); long e = System.currentTimeMillis(); //获取结束时间 System.out.println(hello+"通过rpc远程调用获取到的值"+"用时:" + (e - s) + "ms"); return hello; } }
三、接口服务
gitee地址,写了2个多小时,后面直接偷懒累了,主要目的是学习spring。。。。所以代码很烂,所以就没有地址了
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)