springcloud-eureka服务端接收客户端全量更新的增量更新请求处理

springcloud-eureka服务端接收客户端全量更新的增量更新请求处理,第1张

springcloud-eureka服务端接收客户端全量更新增量更新请求处理

文章目录
  • 一、服端接收请求方法
    • 1、getContainers处理全量更新方法
    • 2、getContainerDifferential处理新量更新方法
    • 3、readonlyCacheMap 、readWriteCacheMap在哪里更新的
      • 3.1、定时任务更新readOnlyCacheMap
      • 3.2、readWriteCacheMap
      • 3.3、如果全量直接遍历registry返回实例列表
      • 3.4、如果增量直接遍历recentlyChangedQueue返回实例列表

一、服端接收请求方法

从客户端发送请求的路径可以找到,服务端接收全量和增量请求的方法如下:

1、getContainers处理全量更新方法

主要代码

//构造缓存key,不管全量还是增量 获取实例列表的信息 都是从缓存中获取的,只是构造的缓存key不同
Key cacheKey = new Key(Key.EntityType.Application,
                ResponseCacheImpl.ALL_APPS,
                keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
 );

 Response response;
 if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
 //从responseCache中获取实例列表 压缩后返回
     response = Response.ok(responseCache.getGZIP(cacheKey))
             .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
             .header(HEADER_CONTENT_TYPE, returnMediaType)
             .build();
 } else {
     response = Response.ok(responseCache.get(cacheKey))
             .build();
 }
 return response;
2、getContainerDifferential处理新量更新方法
//构造增量缓存key
Key cacheKey = new Key(Key.EntityType.Application,
      ResponseCacheImpl.ALL_APPS_DELTA,
       keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
if (acceptEncoding != null
         && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
     return Response.ok(responseCache.getGZIP(cacheKey))
             .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
             .header(HEADER_CONTENT_TYPE, returnMediaType)
             .build();
 } else {
     return Response.ok(responseCache.get(cacheKey))
             .build();
 }

从上面2个方法可以看出,不管是获取全量服务列、还是获取增量服务列表都是从responseCache.getGZIP这个方法

public byte[] getGZIP(Key key) {
        Value payload = getValue(key, shouldUseReadOnlyResponseCache);
        if (payload == null) {
            return null;
        }
        return payload.getGzipped();
    }

getValue:
//先从readOnlyCacheMap获取,如果不为空,那么再从readWriteCacheMap获取,然后再更新readOnlyCacheMap。eureka服务端也用了缓存。即使调用接口从服务端获取最新服务,也不一定是最新的。

Value getValue(final Key key, boolean useReadOnlyCache) {
    Value payload = null;
    try {
        if (useReadOnlyCache) {
            final Value currentPayload = readOnlyCacheMap.get(key);
            if (currentPayload != null) {
                payload = currentPayload;
            } else {
                payload = readWriteCacheMap.get(key);
                readOnlyCacheMap.put(key, payload);
            }
        } else {
            payload = readWriteCacheMap.get(key);
        }
    } catch (Throwable t) {
        logger.error("Cannot get value for key :" + key, t);
    }
    return payload;
}

readonlyCacheMap 、readWriteCacheMap实际上就是map结构

 private final ConcurrentMap readonlyCacheMap = new ConcurrentHashMap();

    private final LoadingCache readWriteCacheMap;
3、readonlyCacheMap 、readWriteCacheMap在哪里更新的 3.1、定时任务更新readonlyCacheMap

//遍历所有key,从readWriteCacheMap中更新到readOnlyCacheMap中
for (Key key : readOnlyCacheMap.keySet()) {
      try {
          CurrentRequestVersion.set(key.getVersion());
          
          Value cachevalue = readWriteCacheMap.get(key);
          Value currentCachevalue = readOnlyCacheMap.get(key);
          if (cachevalue != currentCachevalue) {
              readOnlyCacheMap.put(key, cachevalue);
          }
      } catch (Throwable th) {
          logger.error("Error while updating the client cache from response cache", th);
      }
  }
3.2、readWriteCacheMap
//初始化时构造缓存:
  Value value = generatePayload(key);

关键代码:

//如果是全量,调用getApplicationsFromMultipleRegions构造缓存
 if (ALL_APPS.equals(key.getName())) {
    if (isRemoteRegionRequested) {
         tracer = serializeAllAppsWithRemoteRegionTimer.start();
         payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
     } else {
         tracer = serializeAllAppsTimer.start();
         payload = getPayLoad(key, registry.getApplications());
     }
     //如果增量调用getApplicationDeltasFromMultipleRegions 构造缓存
 } else if (ALL_APPS_DELTA.equals(key.getName())) {
     if (isRemoteRegionRequested) {
         tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
         versionDeltaWithRegions.incrementAndGet();
         versionDeltaWithRegionsLegacy.incrementAndGet();
         payload = getPayLoad(key,
                 registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
     } else {
         tracer = serializeDeltaAppsTimer.start();
         versionDelta.incrementAndGet();
         versionDeltaLegacy.incrementAndGet();
         payload = getPayLoad(key, registry.getApplicationDeltas());
     }
 }
3.3、如果全量直接遍历registry返回实例列表
for (Entry>> entry : registry.entrySet()) {
    Application app = null;

    if (entry.getValue() != null) {
        for (Entry> stringLeaseEntry : entry.getValue().entrySet()) {
            Lease lease = stringLeaseEntry.getValue();
            if (app == null) {
                app = new Application(lease.getHolder().getAppName());
            }
            app.addInstance(decorateInstanceInfo(lease));
        }
    }
    if (app != null) {
        apps.addApplication(app);
    }
}
3.4、如果增量直接遍历recentlyChangedQueue返回实例列表
Iterator iter = this.recentlyChangedQueue.iterator();
while (iter.hasNext()) {
   Lease lease = iter.next().getLeaseInfo();
   InstanceInfo instanceInfo = lease.getHolder();
   Object[] args = {instanceInfo.getId(),
           instanceInfo.getStatus().name(),
           instanceInfo.getActionType().name()};
   logger.debug("The instance id %s is found with status %s and actiontype %s", args);
   Application app = applicationInstancesMap.get(instanceInfo.getAppName());
   if (app == null) {
       app = new Application(instanceInfo.getAppName());
       applicationInstancesMap.put(instanceInfo.getAppName(), app);
       apps.addApplication(app);
   }
   app.addInstance(decorateInstanceInfo(lease));
}

recentlyChangedQueue:它是一个ConcurrentlinkedQueue,通过定时任务 默认180过期数据

private ConcurrentlinkedQueue recentlyChangedQueue = new ConcurrentlinkedQueue();


private TimerTask getDeltaRetentionTask() {
  return new TimerTask() {
      @Override
      public void run() {
          Iterator it = recentlyChangedQueue.iterator();
          while (it.hasNext()) {
              if (it.next().getLastUpdateTime() <
                      System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
                   //数据过期,那么移除getRetentionTimeInMSInDeltaQueue:默认180s
                  it.remove();
              } else {
                  break;
              }
          }
      }

  };
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5697269.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存