基于springboot+netty自定义rpc框架(starter组件方式)

基于springboot+netty自定义rpc框架(starter组件方式),第1张

基于springboot+netty自定义rpc框架(starter组件方式)

一、rpc生产端
自定义注解

@Target({ElementType.METHOD,ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {

    
    String ServerName() default "" ;
}

1、核心思路是实现 BeanPostProcessor,并自定义注解,注解用于定义实现类

public class RpcServerRegistrarBeanPostProcesser  implements BeanPostProcessor {




    private RpcServerContext    rpcServerContext;


    public RpcServerRegistrarBeanPostProcesser(RpcServerContext rpcServerContext) {
        this.rpcServerContext = rpcServerContext;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        RpcService annotation = bean.getClass().getAnnotation(RpcService.class);
        if(annotation!=null) {
            String serverName = annotation.ServerName();
            if (serverName == null || "".equalsIgnoreCase(serverName.trim())) {
                serverName = bean.getClass().getInterfaces()[0].getName();
            }
            rpcServerContext.addRpcServer(serverName, beanName);
        }
        return BeanPostProcessor.super.postProcessAfterInitialization(bean, beanName);
    }


}

2、注册中心直接使用redis,将扫描到的带有自定义rpc服务生产者的类对应的netty的ip+端口号注册到redis,并在服务本地缓存一份
3、启动netty服务端监听对应端口
4、使用方式

@RpcService
public  class MyServiceServerImpl implements MyServiceServer {
    @Override
    public String hello() {
        return "hello";
    }
}

二、消费端
自定义注解为

@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcAutowired {

    
    String ServerName() default "" ;
}

1、核心思路是实现InstantiationAwareBeanPostProcessor接口,并实现
postProcessProperties方法,这个方法是在spring生命周期中填充属性的时候执行的回调,这个时候可以对这个bean的属性参数进行修改,这边主要是判断Bean的属性中有没有自定义注解为RpcAutowired的参数,这块代码可以参考jetcache的写法

package com.example.hzzrpcclient.config;

import com.example.hzzrpcclient.config.annotations.RpcAutowired;
import com.example.hzzrpcclient.config.netty.NettyMainClient;
import com.example.hzzrpcclient.config.registercenter.RedisRegisterCenter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.PropertyValues;
import org.springframework.beans.factory.BeanCreationException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Injectionmetadata;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.config.InstantiationAwareBeanPostProcessor;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.lang.reflect.Proxy;
import java.util.linkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;


@Slf4j
@Configuration
public class RpcClientBeanPostProcesser implements InstantiationAwareBeanPostProcessor, BeanFactoryAware {

    @Autowired
    private RedisRegisterCenter redisRegisterCenter;

    @Autowired
    private NettyMainClient nettyMainClient;

    private ConfigurableListableBeanFactory beanFactory;

    private final Map injectionmetadataCache = new ConcurrentHashMap();

    @Override
    public PropertyValues postProcessProperties(PropertyValues pvs, Object bean, String beanName)
            throws BeanCreationException {
        Injectionmetadata metadata = findAutowiringmetadata(beanName, bean.getClass(), pvs);
        try {
            metadata.inject(bean, beanName, pvs);
        } catch (BeanCreationException ex) {
            throw ex;
        } catch (Throwable ex) {
            throw new BeanCreationException(beanName, "Injection of autowired dependencies failed", ex);
        }
        return pvs;
    }

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        if (!(beanFactory instanceof ConfigurableListableBeanFactory)) {
            throw new IllegalArgumentException(
                    "AutowiredAnnotationBeanPostProcessor requires a ConfigurableListableBeanFactory");
        }
        this.beanFactory = (ConfigurableListableBeanFactory) beanFactory;
    }


    private Injectionmetadata findAutowiringmetadata(String beanName, Class clazz, PropertyValues pvs) {
        // Fall back to class name as cache key, for backwards compatibility with custom callers.
        String cacheKey = (StringUtils.hasLength(beanName) ? beanName : clazz.getName());
        // Quick check on the concurrent map first, with minimal locking.
        Injectionmetadata metadata = this.injectionmetadataCache.get(cacheKey);
        if (Injectionmetadata.needsRefresh(metadata, clazz)) {
            synchronized (this.injectionmetadataCache) {
                metadata = this.injectionmetadataCache.get(cacheKey);
                if (Injectionmetadata.needsRefresh(metadata, clazz)) {
                    try {
                        metadata = buildAutowiringmetadata(clazz);
                        this.injectionmetadataCache.put(cacheKey, metadata);
                    } catch (NoClassDefFoundError err) {
                        throw new IllegalStateException("Failed to introspect bean class [" + clazz.getName() +
                                "] for autowiring metadata: could not find class that it depends on", err);
                    }
                }
            }
        }
        return metadata;
    }


    private Injectionmetadata buildAutowiringmetadata(final Class clazz) {
        linkedList elements = new linkedList();
        Class targetClass = clazz;

        do {
            final linkedList currElements =
                    new linkedList();

            doWithLocalFields(targetClass, new ReflectionUtils.FieldCallback() {
                @Override
                public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
                    RpcAutowired ann = field.getAnnotation(RpcAutowired.class);
                    if (ann != null) {
                        if (Modifier.isStatic(field.getModifiers())) {
                            if (log.isWarnEnabled()) {
                                log.warn("RpcClient Autowired annotation is not supported on static fields: " + field);
                            }
                            return;
                        }
                        currElements.add(new AutowiredFieldElement(field, ann));
                    }
                }
            });

            elements.addAll(0, currElements);
            targetClass = targetClass.getSuperclass();
        }
        while (targetClass != null && targetClass != Object.class);

        return new Injectionmetadata(clazz, elements);
    }


    private void doWithLocalFields(Class clazz, ReflectionUtils.FieldCallback fieldCallback) {
        Field fs[] = clazz.getDeclaredFields();
        for (Field field : fs) {
            try {
                fieldCallback.doWith(field);
            } catch (IllegalAccessException ex) {
                throw new IllegalStateException("Not allowed to access field '" + field.getName() + "': " + ex);
            }
        }
    }


    private class AutowiredFieldElement extends Injectionmetadata.InjectedElement {

        private Field field;
        private RpcAutowired ann;

        public AutowiredFieldElement(Field field, RpcAutowired ann) {
            super(field, null);
            this.field = field;
            this.ann = ann;
        }

        @Override
        protected void inject(Object bean, String beanName, PropertyValues pvs) throws Throwable {
            RpcClientProxy rpcClientProxy=new RpcClientProxy(redisRegisterCenter,nettyMainClient);
            Object o = Proxy.newProxyInstance(field.getType().getClassLoader(),
                    new Class[]{field.getType()}, rpcClientProxy);
            field.setAccessible(true);
            field.set(bean, o);
        }
    }
}

2、注入的属性应该是一个代理类,代理类里面使用netty客户端进行链接并发送数据,并使用雪花算法创建一个唯一id,在发送完数据之后进行线程自旋并阻塞等待服务端的消息发送过来

@Slf4j
public class RpcClientProxy implements InvocationHandler {

    private RedisRegisterCenter redisRegisterCenter;

    private NettyMainClient nettyMainClient;

    public RpcClientProxy(RedisRegisterCenter redisRegisterCenter, NettyMainClient nettyMainClient) {
        this.redisRegisterCenter = redisRegisterCenter;
        this.nettyMainClient = nettyMainClient;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();//方法名,hello()
        String serverName = method.getDeclaringClass().getName();//接口名:MyServiceServer,用来寻找服务
        RpcServerDto rpcServerDto = redisRegisterCenter.searchRpcServer(serverName);
        Channel connect = nettyMainClient.connect(rpcServerDto.getIp(), rpcServerDto.getPort());
        long start = System.currentTimeMillis();

        String msgId = UUID.randomUUID().toString();
        connect.writeAndFlush(new String( msgId ).getBytes()).addListener(
                future -> {
                    long time = System.currentTimeMillis() - start;
                    if (!future.isSuccess()) {
                        log.error("下发失败,msgId=[{}],耗时[{}]ms", msgId, time);
                        return;
                    }
                    if (time > 20000) {
                        log.warn("指令 *** 作=[{}],下发异常耗时,msgId=[{}],耗时[{}]ms", 20000, msgId, time);
                        return;
                    }
                    log.info("下发成功,msgId=[{}],耗时[{}]ms", msgId, time);
                });

        // 阻塞
        GuardedObject go = GuardedObject.create(msgId);
        String resPayloadPacket = go.getAndThrow(res ->{
            return (res.indexOf(msgId)>-1);
        }, 50000, TimeUnit.MILLISECONDS);

        return resPayloadPacket;
    }

}

3、使用方式

@Service
public class RpcClientServiceimpl {

    @RpcAutowired
    private MyServiceServer myServiceServer;



    public String hello(){
          long s = System.currentTimeMillis();   //获取开始时间

        String hello = myServiceServer.hello();
        long e = System.currentTimeMillis(); //获取结束时间

        System.out.println(hello+"通过rpc远程调用获取到的值"+"用时:" + (e - s) + "ms");
        return hello;
    }
}

三、接口服务

gitee地址,写了2个多小时,后面直接偷懒累了,主要目的是学习spring。。。。所以代码很烂,所以就没有地址了

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5697306.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存