Ribbon从入门到源码解析

Ribbon从入门到源码解析,第1张

目录

1、简介

2、案例

2.1 搭建服务注册中心EurekaServer

2.2 搭建order-service服务

2.3 搭建user-service服务

2.4 服务启动

2.5 测试结果

3、Ribbon如何实现负载均衡

3.1 拦截http请求

3.2 解析请求中的服务名

3.3 根据服务名获取服务IP和Port信息

3.4 根据负载均衡策略发起http请求

4、简单源码解析

4.1 ILoadBalancer

4.2 AbstractLoadBalancer

4.3 BaseLoadBalancer

4.4 DynamicServerListLoadBalancer

4.5 ZoneAwareLoadBalancer


1、简介

在微服务架构中,服务拆分成一个个的微服务,并且以集群化的方式进行部署;此时服务与服务之间的调用变得复杂了起来,客户端需要自主选择调用服务端集群中的某个服务,这就是我们经常说到的客户端负载均衡,在Spring Cloud生态中使用的比较广泛的技术是Ribbon。


2、案例

无论是使用Fegin还是RestTemplate发起服务调用,客户端负载均衡均是通过Ribbon来实现,这里使用RestTemplate演示案例。


2.1 搭建服务注册中心EurekaServer
  • pom依赖


    org.springframework.cloud
    spring-cloud-starter-netflix-eureka-server

  • application.yml

server:
  port: 18888

spring:
  application:
    name: eurekaServer

eureka:
  client:
#    fetch-registry: false
#    register-with-eureka: false
    service-url:
      defaultZone: http://127.0.0.1:18888/eureka
  • 启动类

@EnableEurekaServer
@SpringBootApplication
public class EurekaServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(EurekaServerApplication.class, args);
    }

}
2.2 搭建order-service服务
  • pom依赖



      org.springframework.boot
      spring-boot-starter-web



      org.springframework.cloud
      spring-cloud-starter-netflix-eureka-client

  • application.yml

# server port
server:
  port: 18070

# name
spring:
  application:
    name: order-service

# eureka server
eureka:
  client:
    service-url:
      defaultZone: http://127.0.0.1:18888/eureka
  • 模拟业务代码

@RestController
@RequestMapping("order")
public class OrderController {
    
    @Autowired
    private OrderService orderService;
    
    @GetMapping("{orderId}")
    public Order queryOrderByUserId(@PathVariable("orderId") Long orderId) {
        // 根据id查询订单并返回
        return orderService.queryOrderById(orderId);
    }
}
@Service
public class OrderService {

    @Autowired
    private RestTemplate restTemplate;

    @Autowired
    private OrderMapper orderMapper;

    public Order queryOrderById(Long orderId) {
        // 1.查询订单
        Order order = orderMapper.findById(orderId);
        // 2、查询用户信息
        if (Objects.nonNull(order)) {
            String url = String.format("http://user-service/user/%s", order.getUserId());
            User user = restTemplate.getForObject(url, User.class);
            // 3、封装用户信息
            order.setUser(user);
        }
        // 4.返回
        return order;
    }
}
  • 启动类中注入RestTemplate并开启负载均衡

@MapperScan("com.lzb.order.mapper")
@SpringBootApplication
@EnableEurekaClient
public class OrderApplication {

    public static void main(String[] args) {
        SpringApplication.run(OrderApplication.class, args);
    }

    /**
     * RestTemplate bean容器的注入
     * LoadBalanced 负载均衡注解
     * @return
     */
    @Bean
    @LoadBalanced
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

}
2.3 搭建user-service服务
  • pom依赖



      org.springframework.boot
      spring-boot-starter-web



      org.springframework.cloud
      spring-cloud-starter-netflix-eureka-client


  • application.yml

# server port
server:
  port: 18080

# name
spring:
  application:
    name: user-service

# eureka server
eureka:
  client:
    service-url:
      defaultZone: http://127.0.0.1:18888/eureka

  • 模拟业务代码

@RestController
@RequestMapping("/user")
public class UserController {

    @Autowired
    private UserService userService;

    @GetMapping("/{id}")
    public User queryById(@PathVariable("id") Long id) {
        return userService.queryById(id);
    }
}

  • 启动类

@MapperScan("com.lzb.user.mapper")
@SpringBootApplication
@EnableEurekaClient
public class UserApplication {
    public static void main(String[] args) {
        SpringApplication.run(UserApplication.class, args);
    }
}

2.4 服务启动

在上述服务搭建之后,可以看出order-service服务调用了user-service服务,因此我将user-service服务集群部署,并且在order-service注入了RestTemplate且标注了LoadBalanced注解;启动顺序如下所示:

  • 启动EurekaServer

  • 启动user-service

  • 启动user-service2

  • 启动order-service

关于IDEA 如何集群启动某个服务,方式比较多,我这里介绍一种常用的方法,步骤如下:

  • 首先启动该服务,直至服务启动成功

  • 右键启动的服务,选择Copy Configuration

  • Edit Configuration中修改服务Name;传入端口参数,在Environment中的VM options键入-Dserver.port=xxxx;点击Apply;点击OK即可;

  • 启动服务,右上角选择刚刚编辑的服务信息,DEBUG启动即可。


  • 服务启动后Eureka Server中服务注册信息如下所示

2.5 测试结果

清空user-service和user-service2的控制台日志,在浏览器中请求四次order-service,order-service中会通过RestTemplate调用order-service,由于RestTemplate使用了LoadBlanced注解修饰,因此Ribbon托管了RestTemplate,在发起调用之前会解析服务名获取服务Ip和port,然后根据负载均衡策略选择服务进行调用!

可以在console打印的日志中看出,第一次请求大到了user-service,第二次请求打到了user-service1,第三次请求大到了user-service,第四次请求打到了user-service1

3、Ribbon如何实现负载均衡

可以试想一下,如果是你本人去实现一个Ribbon的功能你会怎么做?我想大家的思路应该都差不多如下:

  • 拦截Http请求

  • 解析请求中的服务名

  • 在Eureka Client拉取的Eureka Server中注册的可用服务信息中,根据服务名获取服务IP和Port信息

  • 根据负载均衡策略选择服务提供者发起http请求

3.1 拦截http请求

在springboot中常用的拦截器有三个:

  • org.springframework.web.servlet.HandlerInterceptor

  • org.springframework.http.client.ClientHttpRequestInterceptor

  • feign.RequestInterceptor

三者均是对http请求进行拦截,但是3个拦截器应用的项目不同,HandlerInterceptor主要是处理http servlet请求;ClientHttpRequestInterceptor主要是处理HttpTemplate请求或者Ribbon请求;RequestInterceptor用于处理Fegin请求,Fegin本质上是http请求;因此很明显,Ribbon实现的是ClientHttpRequestInterceptor拦截器。


3.2 解析请求中的服务名

org.springframework.http.client.ClientHttpRequestInterceptor接口中只有一个方法intercept(),其子类均会重写该方法org.springframework.cloud.client.loadbalancer.LoadBalancerInterceptor,在该方法入口处打上断点。


并且在浏览器中访问order-service,order-service中会使用RestTemplate请求user-service

此时可以看到request.getURI()得到的是http://user-service/user/4 通过final URI originalUri = request.getURI(); String serviceName = originalUri.getHost();解析获得服务名

3.3 根据服务名获取服务IP和Port信息

在org.springframework.cloud.client.loadbalancer.LoadBalancerInterceptor类中重写的intercept()方法,最后一行代码至关重要,this.requestFactory.createRequest(request, body, execution)为包装http请求,不是很重要,最终的是org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient类中execute()方法。


此处的serviceId即为服务名user-service,this.getLoadBalancer(serviceId);会根据服务名从eureka中解析中对应的服务地址和端口。


this.getLoadBalancer(serviceId)方法调用了org.springframework.cloud.netflix.ribbon.SpringClientFactory类中的getLoadBalancer()方法,随后调用了org.springframework.cloud.netflix.ribbon.SpringClientFactory.getInstance()方法,之后调用了其父类org.springframework.cloud.context.named.NamedContextFactory.getInstance()方法,最终返回org.springframework.context.annotation.AnnotationConfigApplicationContext,可以看到其实获取的是spring 容器中的ILoadBalancer.class实现类com.netflix.loadbalancer.DynamicServerListLoadBalancer实例。


那现在还有最后一个问题,DynamicServerListLoadBalancer实例中的服务信息是怎么来的呢?这里其实是Eureka Clinet从Eureka Server中拉取的服务列表。


3.4 根据负载均衡策略发起http请求

最后一步就是根据负载均衡策略选择服务提供者发起http请求,负载均衡策略的选择在com.netflix.loadbalancer.ZoneAwareLoadBalancer的chooseServer()方法中实现。


在选择发起请求的服务之后执行org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient中的execute()方法即完成整个Ribbon负载均衡过程。


4、简单源码解析

在Ribbon整个源码体系中,ILoadBalancer接口的类关系图十分重要,因此源码解析也会根据这张图的类关系图来。


4.1 ILoadBalancer

com.netflix.loadbalancer.ILoadBalancer是一个顶层接口类,该类中定义了几个未实现的方法,具体实现在子类中完成。


方法作用如下所示:

方法名作用
addServers1、服务器列表初始化
2、添加新的服务
chooseServer从负载均衡器中选择服务器
markServerDown负载均衡客户端主动通知下机,否则不可用的服务将会存活到下一个ping周期
getServerList@Deprecated
getReachableServers获取能正常访问的服务器
getAllServers获取所有已知的服务器,包括可访问的和不可访问的
4.2 AbstractLoadBalancer

com.netflix.loadbalancer.AbstractLoadBalancer是一个抽象类,它实现了com.netflix.loadbalancer.ILoadBalancer接口;其源码非常少,如下所示:

public abstract class AbstractLoadBalancer implements ILoadBalancer {
    
    public enum ServerGroup{
        ALL,
        STATUS_UP,
        STATUS_NOT_UP        
    }

    public Server chooseServer() {
     return chooseServer(null);
    }

    public abstract List getServerList(ServerGroup serverGroup);

    public abstract LoadBalancerStats getLoadBalancerStats();    
}

AbstractLoadBalancer抽象类中定义类一个ServerGroup内部枚举类,ServerGroup用于标志服务实例的分组类型:

  • ALL 表示所有服务

  • STATUS_UP 表示正常服务

  • STATUS_NOT_UP 表示下线服务

4.3 BaseLoadBalancer

com.netflix.loadbalancer.BaseLoadBalancer类继承了com.netflix.loadbalancer.AbstractLoadBalancer,BaseLoadBalancer类源码比较复杂,但是有几个点是比较重要的。


  • allServerList 用于保存所有服务实例

  • upServerList用于保存所有在线服务实例

@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List allServerList = Collections
        .synchronizedList(new ArrayList());
@Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List upServerList = Collections
        .synchronizedList(new ArrayList());

  • 定义负载均衡默认策略为轮询

private final static IRule DEFAULT_RULE = new RoundRobinRule(); 
protected IRule rule = DEFAULT_RULE;

  • IPingStrategy表示服务检查策略,用于检查服务是否可用;默认的服务检查策略为SerialPingStrategy,SerialPingStrategy中的pingServers方法就是遍历所有服务实例,一个个发送ping请求,查看服务是否有效。


private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy();
protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;

  • BaseLoadBalancer构造函数中启动了一个PingTask,PingTask每隔10秒钟会ping一次服务列表中的服务是否可用,PingTask中干的事情就是pingStrategy服务检查策略。


protected int pingIntervalSeconds = 10;

public BaseLoadBalancer() {
    this.name = DEFAULT_NAME;
    this.ping = null;
    setRule(DEFAULT_RULE);
    setupPingTask();
    lbStats = new LoadBalancerStats(DEFAULT_NAME);
}

void setupPingTask() {
    if (canSkipPing()) {
        return;
    }
    if (lbTimer != null) {
        lbTimer.cancel();
    }
    lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
            true);
    lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
    forceQuickPing();
}

4.4 DynamicServerListLoadBalancer

com.netflix.loadbalancer.DynamicServerListLoadBalancer类继承了com.netflix.loadbalancer.BaseLoadBalancer,因此DynamicServerListLoadBalancer类主要是对BaseLoadBalancer类功能进行扩展,DynamicServerListLoadBalancer类源码比较复杂,但是有几个点是比较重要的。


  • serverListImpl是DynamicServerListLoadBalancer中声明的ServerList类型的变量,ServerList接口中定义了两个方法

volatile ServerList serverListImpl;

  • getInitialListOfServers方法用于获取所有初始化服务列表

  • getUpdatedListOfServers方法用于获取更新的服务实例列表

public interface ServerList {

    public List getInitialListOfServers();
    
    public List getUpdatedListOfServers();   

}

  • ServerList接口有5个实现类,DynamicServerListLoadBalancer默认实现是DomainExtractingServerList,但是DomainExtractingServerList构造函数中传入的是DiscoveryEnabledNIWSServerList(可以看我下面Debug的图),因此可以看出重点类其实是DiscoveryEnabledNIWSServerList

  • DiscoveryEnabledNIWSServerList类中一个比较重要的方法是obtainServersViaDiscovery方法,可以从名字看出这是通过注册中心获取服务列表,代码中可以看出依赖 EurekaClient从服务注册中心中获取具体的服务实例InstanceInfo

private List obtainServersViaDiscovery() {
        List serverList = new ArrayList();

        if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
            logger.warn("EurekaClient has not been initialized yet, returning an empty list");
            return new ArrayList();
        }

        EurekaClient eurekaClient = eurekaClientProvider.get();
        if (vipAddresses!=null){
            for (String vipAddress : vipAddresses.split(",")) {
                // if targetRegion is null, it will be interpreted as the same region of client
                List listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
                for (InstanceInfo ii : listOfInstanceInfo) {
                    if (ii.getStatus().equals(InstanceStatus.UP)) {

                        if(shouldUseOverridePort){
                            if(logger.isDebugEnabled()){
                                logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
                            }

                            // copy is necessary since the InstanceInfo builder just uses the original reference,
                            // and we don't want to corrupt the global eureka copy of the object which may be
                            // used by other clients in our system
                            InstanceInfo copy = new InstanceInfo(ii);

                            if(isSecure){
                                ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
                            }else{
                                ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
                            }
                        }

                        DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);
                        serverList.add(des);
                    }
                }
                if (serverList.size()>0 && prioritizeVipAddressBasedServers){
                    break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
                }
            }
        }
        return serverList;
    }

  • DiscoveryEnabledNIWSServerList类中另一个比较重要点是定义了一个ServerListUpdater.UpdateAction更新器,该更新器用于更新服务信息。


    ServerListUpdater提供两个实现类com.netflix.niws.loadbalancer.EurekaNotificationServerListUpdater和com.netflix.loadbalancer.PollingServerListUpdater;其中EurekaNotificationServerListUpdater通过Eureka的事件监听机制来更新服务信息;而此处默认的是PollingServerListUpdater定时任务更新机制。


  • PollingServerListUpdater代码中可以看出定时任务延迟启动initialDelayMs为1秒,刷新频率refreshIntervalMs为30秒

private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs;
private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;  

public PollingServerListUpdater() {
    this(LISTOFSERVERS_CACHE_UPDATE_DELAY, LISTOFSERVERS_CACHE_REPEAT_INTERVAL);
}

public PollingServerListUpdater(final long initialDelayMs, final long refreshIntervalMs) {
    this.initialDelayMs = initialDelayMs;
    this.refreshIntervalMs = refreshIntervalMs;
}

public synchronized void start(final UpdateAction updateAction) {
    if (isActive.compareAndSet(false, true)) {
        final Runnable wrapperRunnable = new Runnable() {
            @Override
            public void run() {
                if (!isActive.get()) {
                    if (scheduledFuture != null) {
                        scheduledFuture.cancel(true);
                    }
                    return;
                }
                try {
                    updateAction.doUpdate();
                    lastUpdated = System.currentTimeMillis();
                } catch (Exception e) {
                    logger.warn("Failed one update cycle", e);
                }
            }
        };

        scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                wrapperRunnable,
                initialDelayMs,
                refreshIntervalMs,
                TimeUnit.MILLISECONDS
        );
    } else {
        logger.info("Already active, no-op");
    }
}

  • 在DynamicServerListLoadBalancer定义了一个变量ServerListFilter,可以看到在updateListOfServers方法中,会判断filter是否为空,然后对getUpdatedListOfServers获取到的服务列表servers执行getFilteredListOfServers方法,其实就是对服务列表根据ServerListFilter接口的实现类逻辑进行过滤。


volatile ServerListFilter filter;

public void updateListOfServers() {
    List servers = new ArrayList();
    if (serverListImpl != null) {
        servers = serverListImpl.getUpdatedListOfServers();
        LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                getIdentifier(), servers);

        if (filter != null) {
            servers = filter.getFilteredListOfServers(servers);
            LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);
        }
    }
    updateAllServerList(servers);
}

  • ServerListFilter的实现类如下所示,默认的实现类是DefaultNIWSServerListFilter,但是DefaultNIWSServerListFilter啥也没有,仅仅继承了ZoneAffinityServerListFilter;因此具体的功能还是在ZoneAffinityServerListFilter中实现,而ZoneAffinityServerListFilter主要提供的是对服务提供者所处的Zone和服务消费者所在的Zone进行比较,过滤掉不在一个Zone的实例。


4.5 ZoneAwareLoadBalancer

com.netflix.loadbalancer.ZoneAwareLoadBalancer是com.netflix.loadbalancer.DynamicServerListLoadBalancer的唯一子类,在DynamicServerListLoadBalancer中还有一个非常重要的方法没有实现,那就是chooseServer方法。


chooseServer用于负载均衡器选择服务器进行调用,因此ZoneAwareLoadBalancer的出现就是解决这个问题。


此外ZoneAwareLoadBalancer重写了setServerListForZones方法,setServerListForZones方法getLoadBalancer(zone)用于创建负载均衡器; existingLBEntry.getValue().setServersList(Collections.emptyList())用于清除不包含server的zone

protected void setServerListForZones(Map> zoneServersMap) {
    super.setServerListForZones(zoneServersMap);
    if (balancers == null) {
        balancers = new ConcurrentHashMap();
    }
    for (Map.Entry> entry: zoneServersMap.entrySet()) {
        String zone = entry.getKey().toLowerCase();
        getLoadBalancer(zone).setServersList(entry.getValue());
    }
    // check if there is any zone that no longer has a server
    // and set the list to empty so that the zone related metrics does not
    // contain stale data
    for (Map.Entry existingLBEntry: balancers.entrySet()) {
        if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {
            existingLBEntry.getValue().setServersList(Collections.emptyList());
        }
    }
} 

 👇🏻 关注公众号 我们一起进大厂👇🏻     

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/563459.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-02
下一篇 2022-04-02

发表评论

登录后才能评论

评论列表(0条)

保存