在应用启动后,节点们将会向Eureka Server发送心跳,默认周期为30秒,如果Eureka Server在多个心跳周期内没有接收到某个节点的心跳,Eureka Server将会从服务注册表中把这个服务节点移除(默认90秒)。
Eureka Server在运行期间会去统计心跳成功的比例在15分钟之内是否低于85% , 如果低于85%, Eureka Server会认为当前实例的客户端与自己的心跳连接出现了网络故障,那么Eureka Server会把这些实例保护起来,让这些实例不会过期导致实例剔除。这样做就是为了防止EurekaClient可以正常运行, 但是与EurekaServer网络不通情况下, EurekaSerJer不会立刻将EurekaClient服务剔除
这样做的目的是为了减少网络不稳定或者网络分区的情况下,Eureka Server将健康服务剔除下线的问题。 使用自我保护机制可以使得Eureka 集群更加健壮和稳定的运行。进入自我保护状态后,会出现以下几种情况:Eureka Server不再从注册列表中移除因为长时间没有收到心跳而应该剔除的过期服务;Eureka Server仍然能够接受新服务的注册和查询请求,但是不会被同步到其他节点上,保证当前节点依然可用。
自我保护模式:默认情况下,如果EurekaServer在一定时间内没有接收到某个微服务实例的心跳,EurekaServer将会注销该实例 (默认90秒)。但是当网络分区故障发生(延时、卡顿、拥挤)时,微服务与EurekaServer之间无法正常通信,以上行为可能变得非常危险了——因为微服务本身其实是健康的,此时本不应该注销这个微服务。Eureka通过" 自我保护模式”来解决这个问题——当EurekaServer节点在短时间内丢失过多客户端时(可能发生了网络分区故障),那么这个节点就会进入自我保护模式。
一旦进入该模式,Eureka Server就会保护服务注册表中的信息,不再删除服务注册表中的数据(也就是不会注销任何微服务)。当网络故障恢复后,该Eureka Server节点会自动退出自我保护模式。综上,自我保护模式是一种应对网络异常的安全保护措施。它的架构哲学是宁可同时保留所有微服务(健康的微服务和不健康的微服务都会保留),也不盲目注销任何健康的微服务。使用自我保护模式,可以让Eureka集群更加的健壮、稳定。在Spring Cloud中,可以使用eureka.server.enable-self-preservation = false 禁用自我保护模式。
开启自我保护机制:通过配置将判定时间改为10s,接着启动Eureka Server,等待10s之后,就会出现以上提示信息,表示自我保护被激活了。
# 设置 eureka server同步失败的等待时间默认5分钟
#在这期间,它不向客户端提供服务注册信息
eureka.server.wait-time-in-ms-when-sync-empty=10000
重要的变量
在Eureka的自我保护机制中,有两个很重要的变量,Eureka的自我保护机制,都是围绕这两个变量来实现的,在AbstractInstanceRegistry这个类中定义的:
**numberOfRenewsPerMinThreshold**
protected volatile int numberOfRenewsPerMinThreshold;
//每分钟最小续约数量就是Eureka Server期望每分钟收到客户端实例续约的总数的阈值。如果小于这个阈值,就会触发自我保护机制。
它的赋值方法:
protected void updateRenewsPerMinThreshold() { this.numberOfRenewsPerMinThreshold = (int)((double)this.expectedNumberOfClientsSendingRenews * (60.0D / (double)this.serverConfig.getExpectedClientRenewalIntervalSeconds()) * this.serverConfig.getRenewalPercentThreshold()); }
getExpectedClientRenewalIntervalSeconds,客户端的续约间隔,默认为30s;
getRenewalPercentThreshold,自我保护续约百分比阈值因子,默认0.85,即每分钟的续约数量要大于85%。
需要注意的是,这两个变量是动态更新的,有四个地方来更新这两个值。
# Eureka-Server的初始化
EurekaBootstrap类中的initEurekaServerContext方法会对Eureka-Server进行初始化:
protected void initEurekaServerContext() throws Exception { EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig(); //... registry.openForTraffic(applicationInfoManager, registryCount); }
PeerAwareInstanceRegistryImpl类中的openForTraffic方法:
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) { // Renewals happen every 30 seconds and for a minute it should be a factor of 2. this.expectedNumberOfClientsSendingRenews = count; //初始化 this.updateRenewsPerMinThreshold(); //更新numberOfRenewsPerMinThreshold logger.info("Got {} instances from neighboring DS node", count); logger.info("Renew threshold is: {}", this.numberOfRenewsPerMinThreshold); this.startupTime = System.currentTimeMillis(); if (count > 0) { this.peerInstancesTransferEmptyonStartup = false; } Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName(); boolean isAws = Name.Amazon == selfName; if (isAws && this.serverConfig.shouldPrimeAwsReplicaConnections()) { logger.info("Priming AWS connections for all replicas.."); this.primeAwsReplicas(applicationInfoManager); } logger.info("Changing status to UP"); applicationInfoManager.setInstanceStatus(InstanceStatus.UP); super.postInit(); }
# 服务的主动下线
PeerAwareInstanceRegistryImpl类中的cancel方法:
当服务提供者主动下线时,表示这个时候Eureka-Server要剔除这个服务提供者的地址,同时也代表这这个心跳续约的阈值要发生变化。所以在PeerAwareInstanceRegistryImpl.cancel 中可以看到数据的更新
调用路径
PeerAwareInstanceRegistryImpl.cancel -> AbstractInstanceRegistry.cancel->internalCancel
服务下线之后,意味着需要发送续约的客户端数量递减了,所以在这里进行修改
protected boolean internalCancel(String appName, String id, boolean isReplication) { //.... synchronized (lock) { if (this.expectedNumberOfClientsSendingRenews > 0) { // Since the client wants to cancel it, reduce the number of clients to send renews. this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1; updateRenewsPerMinThreshold(); } } }
# 服务的注册
PeerAwareInstanceRegistryImpl类中的register方法:
当有新的服务提供者注册到eureka-server上时,需要增加续约的客户端数量,所以在register方法中会进行处理
register ->super.register(AbstractInstanceRegistry)
public void register(InstanceInfo info, boolean isReplication) { int leaseDuration = 90; if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) { leaseDuration = info.getLeaseInfo().getDurationInSecs(); } super.register(info, leaseDuration, isReplication); this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.Register, info.getAppName(), info.getId(), info, (InstanceStatus)null, isReplication); }
父类AbstractInstanceRegistry中的register方法:
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { //.... // The lease does not exist and hence it is a new registration synchronized (lock) { if (this.expectedNumberOfClientsSendingRenews > 0) { // Since the client wants to register it, increase the number of clients sending renews this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1; updateRenewsPerMinThreshold(); } } }
PeerAwareInstanceRegistryImpl类中的scheduleRenewalThresholdUpdateTask方法(更新心跳):
15分钟运行一次,判断在15分钟之内心跳失败比例是否低于85%。在
DefaultEurekaServerContext -> @PostConstruct修饰的initialize()方法 -> init()
private void scheduleRenewalThresholdUpdateTask() { this.timer.schedule(new TimerTask() { public void run() { PeerAwareInstanceRegistryImpl.this.updateRenewalThreshold(); } }, (long)this.serverConfig.getRenewalThresholdUpdateIntervalMs(), (long)this.serverConfig.getRenewalThresholdUpdateIntervalMs()); }
private void updateRenewalThreshold() { try { Applications apps = this.eurekaClient.getApplications(); int count = 0; Iterator var3 = apps.getRegisteredApplications().iterator(); while(var3.hasNext()) { Application app = (Application)var3.next(); Iterator var5 = app.getInstances().iterator(); while(var5.hasNext()) { InstanceInfo instance = (InstanceInfo)var5.next(); if (this.isRegisterable(instance)) { ++count; } } } // Update threshold only if the threshold is greater than the // current expected threshold or if self preservation is disabled. synchronized(this.lock) { if ((double)count > this.serverConfig.getRenewalPercentThreshold() * (double)this.expectedNumberOfClientsSendingRenews || !this.isSelfPreservationModeEnabled()) { this.expectedNumberOfClientsSendingRenews = count; this.updateRenewsPerMinThreshold(); } } logger.info("Current renewal threshold is : {}", this.numberOfRenewsPerMinThreshold); } catch (Throwable var9) { logger.error("Cannot update renewal threshold", var9); } }
# 自我保护机制触发任务
在AbstractInstanceRegistry的postInit方法中,会开启一个EvictionTask的任务,这个任务用来检测是否需要开启自我保护机制:
protected void postInit() { this.renewsLastMin.start(); if (this.evictionTaskRef.get() != null) { ((AbstractInstanceRegistry.EvictionTask)this.evictionTaskRef.get()).cancel(); } this.evictionTaskRef.set(new AbstractInstanceRegistry.EvictionTask()); this.evictionTimer.schedule((TimerTask)this.evictionTaskRef.get(), this.serverConfig.getEvictionIntervalTimerInMs(), this.serverConfig.getEvictionIntervalTimerInMs()); }
其中,EvictionTask表示最终执行的任务:
class EvictionTask extends TimerTask { private final AtomicLong lastExecutionNanosRef = new AtomicLong(0L); EvictionTask() { } public void run() { try { long compensationTimeMs = this.getCompensationTimeMs(); AbstractInstanceRegistry.logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs); AbstractInstanceRegistry.this.evict(compensationTimeMs); } catch (Throwable var3) { AbstractInstanceRegistry.logger.error("Could not run the evict task", var3); } } long getCompensationTimeMs() { long currNanos = this.getCurrentTimeNano(); long lastNanos = this.lastExecutionNanosRef.getAndSet(currNanos); if (lastNanos == 0L) { return 0L; } else { long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos); long compensationTime = elapsedMs - AbstractInstanceRegistry.this.serverConfig.getEvictionIntervalTimerInMs(); return compensationTime <= 0L ? 0L : compensationTime; } } long getCurrentTimeNano() { return System.nanoTime(); } } @Override public void run() { try { long compensationTimeMs = getCompensationTimeMs(); logger.info("Running the evict task with compensationTime {}ms",compensationTimeMs); evict(compensationTimeMs); } catch (Throwable e) { logger.error("Could not run the evict task", e); } }
evict
public void evict(long additionalLeaseMs) { logger.debug("Running the evict task"); // 是否需要开启自我保护机制,如果需要,那么直接return,不需要继续往下执行了 if (!isLeaseExpirationEnabled()) { logger.debug("DS: lease expiration is currently disabled."); return; }
//下面主要是做服务自动下线的 *** 作的
isLeaseExpirationEnabled方法:判断是否开启了自我保护机制,如果没有,则跳过,默认是开启。计算是否需要开启自我保护,判断最后一分钟收到的续约数量是否大于numberOfRenewsPerMinThreshold
public boolean isLeaseExpirationEnabled() { if (!isSelfPreservationModeEnabled()) { // The self preservation mode is disabled, hence allowing the instances to expire. return true; } return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold; }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)