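- I. Server-side request handling methods
- 1. getContainers: handling full fetches
- 2. getContainerDifferential: handling delta (incremental) fetches
- 3. Where readOnlyCacheMap and readWriteCacheMap are updated
- 3.1 A scheduled task refreshes readOnlyCacheMap
- 3.2 readWriteCacheMap
- 3.3 Full fetch: iterate over registry and return the instance list
- 3.4 Delta fetch: iterate over recentlyChangedQueue and return the instance list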
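I. Server-side request handling methods
Following the request path from the client leads to the server-side methods that handle full and delta fetch requests:
1. getContainers: handling full fetches
Key code: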
    // Build the cache key. Full and delta fetches both read the instance list
    // from the cache; only the cache key differs.
    Key cacheKey = new Key(Key.EntityType.Application,
            ResponseCacheImpl.ALL_APPS,
            keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
    );

    Response response;
    if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
        // Read the instance list from responseCache and return it gzip-compressed
        response = Response.ok(responseCache.getGZIP(cacheKey))
                .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                .header(HEADER_CONTENT_TYPE, returnMediaType)
                .build();
    } else {
        response = Response.ok(responseCache.get(cacheKey))
                .build();
    }
    return response;

2. getContainerDifferential: handling delta fetches
    // Build the delta cache key
    Key cacheKey = new Key(Key.EntityType.Application,
            ResponseCacheImpl.ALL_APPS_DELTA,
            keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
    );

    if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
        return Response.ok(responseCache.getGZIP(cacheKey))
                .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                .header(HEADER_CONTENT_TYPE, returnMediaType)
                .build();
    } else {
        return Response.ok(responseCache.get(cacheKey))
                .build();
    }
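The two handlers differ only in the name placed in the cache key and share the same gzip branch. A minimal sketch of that idea, assuming a simplified key and two plain maps in place of the real response cache; `SketchKey`, `FetchEndpointSketch`, and the `"gzip(...)"` placeholder body are hypothetical names for illustration, not Eureka classes:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified stand-in for the response cache key.
final class SketchKey {
    final String name;    // "ALL_APPS" or "ALL_APPS_DELTA"
    final String version; // e.g. "V2"

    SketchKey(String name, String version) {
        this.name = name;
        this.version = version;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SketchKey)) {
            return false;
        }
        SketchKey other = (SketchKey) o;
        return name.equals(other.name) && version.equals(other.version);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, version);
    }
}

// Hypothetical endpoint: one lookup path, two key names.
final class FetchEndpointSketch {
    static final String ALL_APPS = "ALL_APPS";
    static final String ALL_APPS_DELTA = "ALL_APPS_DELTA";

    private final Map<SketchKey, String> plain = new HashMap<>();
    private final Map<SketchKey, String> gzipped = new HashMap<>();

    void put(SketchKey key, String body) {
        plain.put(key, body);
        gzipped.put(key, "gzip(" + body + ")"); // placeholder, no real compression
    }

    String fetch(boolean delta, String acceptEncoding) {
        // Only the key name changes between a full and a delta fetch.
        SketchKey key = new SketchKey(delta ? ALL_APPS_DELTA : ALL_APPS, "V2");
        boolean wantsGzip = acceptEncoding != null && acceptEncoding.contains("gzip");
        return wantsGzip ? gzipped.get(key) : plain.get(key);
    }
}
```

Because the key implements `equals` and `hashCode`, a key rebuilt on every request still finds the entry stored earlier under an equal key.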
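As the two methods above show, both the full and the delta service list are read from responseCache: through getGZIP when the client accepts gzip, otherwise through get. getGZIP is shown here: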
    public byte[] getGZIP(Key key) {
        Value payload = getValue(key, shouldUseReadOnlyResponseCache);
        if (payload == null) {
            return null;
        }
        return payload.getGzipped();
    }
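getValue:
// Read from readOnlyCacheMap first. If the entry is missing (null), read it from readWriteCacheMap and then copy it into readOnlyCacheMap. The Eureka server caches too, so even calling the server endpoint directly does not necessarily return the latest data.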
    Value getValue(final Key key, boolean useReadOnlyCache) {
        Value payload = null;
        try {
            if (useReadOnlyCache) {
                final Value currentPayload = readOnlyCacheMap.get(key);
                if (currentPayload != null) {
                    payload = currentPayload;
                } else {
                    payload = readWriteCacheMap.get(key);
                    readOnlyCacheMap.put(key, payload);
                }
            } else {
                payload = readWriteCacheMap.get(key);
            }
        } catch (Throwable t) {
            logger.error("Cannot get value for key :" + key, t);
        }
        return payload;
    }
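To make the staleness concrete, here is a minimal sketch of the same two-level read path using only the JDK. It assumes `ConcurrentHashMap.computeIfAbsent` as a stand-in for the loading cache, a `loader` function in place of payload generation, and a hypothetical `invalidate` method representing whatever clears the read-write entry when the registry changes; `syncReadOnly` mimics one run of the periodic refresh. None of these names come from the Eureka source.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical two-level cache: a read-only map in front of a read-write map.
final class TwoLevelCacheSketch {
    private final ConcurrentMap<String, String> readOnly = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, String> readWrite = new ConcurrentHashMap<>();
    private final Function<String, String> loader; // assumed to return non-null values

    TwoLevelCacheSketch(Function<String, String> loader) {
        this.loader = loader;
    }

    String getValue(String key, boolean useReadOnlyCache) {
        if (!useReadOnlyCache) {
            return readWrite.computeIfAbsent(key, loader);
        }
        String current = readOnly.get(key);
        if (current != null) {
            return current; // may be stale until the next sync
        }
        String loaded = readWrite.computeIfAbsent(key, loader);
        if (loaded != null) {
            readOnly.put(key, loaded);
        }
        return loaded;
    }

    // Assumption for illustration: a registry change drops the read-write entry.
    void invalidate(String key) {
        readWrite.remove(key);
    }

    // One run of the periodic refresh: copy changed values into the read-only map.
    void syncReadOnly() {
        for (String key : readOnly.keySet()) {
            String fresh = readWrite.computeIfAbsent(key, loader);
            if (fresh != null && !fresh.equals(readOnly.get(key))) {
                readOnly.put(key, fresh);
            }
        }
    }
}
```

In this sketch, a caller that reads through the read-only map after `invalidate` still sees the old value until `syncReadOnly` runs, which is the behavior the comment above warns about.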
readOnlyCacheMap and readWriteCacheMap are essentially map structures, a ConcurrentHashMap and a LoadingCache:
    private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
    private final LoadingCache<Key, Value> readWriteCacheMap;

3. Where readOnlyCacheMap and readWriteCacheMap are updated
3.1 A scheduled task refreshes readOnlyCacheMap
    // Walk every key and copy the current value from readWriteCacheMap into readOnlyCacheMap
    for (Key key : readOnlyCacheMap.keySet()) {
        try {
            CurrentRequestVersion.set(key.getVersion());
            Value cacheValue = readWriteCacheMap.get(key);
            Value currentCacheValue = readOnlyCacheMap.get(key);
            if (cacheValue != currentCacheValue) {
                readOnlyCacheMap.put(key, cacheValue);
            }
        } catch (Throwable th) {
            logger.error("Error while updating the client cache from response cache", th);
        }
    }

3.2 readWriteCacheMap
    // The cache is built at initialization; its loader generates the value for each key:
    Value value = generatePayload(key);
Key code:
    // Full fetch: build the cache entry from getApplicationsFromMultipleRegions (or getApplications)
    if (ALL_APPS.equals(key.getName())) {
        if (isRemoteRegionRequested) {
            tracer = serializeAllAppsWithRemoteRegionTimer.start();
            payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
        } else {
            tracer = serializeAllAppsTimer.start();
            payload = getPayLoad(key, registry.getApplications());
        }
    // Delta fetch: build the cache entry from getApplicationDeltasFromMultipleRegions (or getApplicationDeltas)
    } else if (ALL_APPS_DELTA.equals(key.getName())) {
        if (isRemoteRegionRequested) {
            tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
            versionDeltaWithRegions.incrementAndGet();
            versionDeltaWithRegionsLegacy.incrementAndGet();
            payload = getPayLoad(key,
                    registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
        } else {
            tracer = serializeDeltaAppsTimer.start();
            versionDelta.incrementAndGet();
            versionDeltaLegacy.incrementAndGet();
            payload = getPayLoad(key, registry.getApplicationDeltas());
        }
    }

3.3 Full fetch: iterate over registry and return the instance list
    for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
        Application app = null;

        if (entry.getValue() != null) {
            for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) {
                Lease<InstanceInfo> lease = stringLeaseEntry.getValue();
                if (app == null) {
                    app = new Application(lease.getHolder().getAppName());
                }
                app.addInstance(decorateInstanceInfo(lease));
            }
        }
        if (app != null) {
            apps.addApplication(app);
        }
    }

3.4 Delta fetch: iterate over recentlyChangedQueue and return the instance list
    Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
    while (iter.hasNext()) {
        Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
        InstanceInfo instanceInfo = lease.getHolder();
        Object[] args = {instanceInfo.getId(),
                instanceInfo.getStatus().name(),
                instanceInfo.getActionType().name()};
        logger.debug("The instance id %s is found with status %s and actiontype %s", args);
        Application app = applicationInstancesMap.get(instanceInfo.getAppName());
        if (app == null) {
            app = new Application(instanceInfo.getAppName());
            applicationInstancesMap.put(instanceInfo.getAppName(), app);
            apps.addApplication(app);
        }
        app.addInstance(decorateInstanceInfo(lease));
    }
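The contrast between the two loops can be sketched with plain collections. This is an illustration under simplifying assumptions, not the registry code: the registry is modeled as app name to (instance id to status), each recent change is an `{appName, instanceId}` pair, and `RegistryViewSketch` with its two methods is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RegistryViewSketch {

    // Full fetch: visit every application and every instance in the registry.
    static Map<String, List<String>> fullView(Map<String, Map<String, String>> registry) {
        Map<String, List<String>> apps = new LinkedHashMap<>();
        for (Map.Entry<String, Map<String, String>> entry : registry.entrySet()) {
            if (entry.getValue() == null || entry.getValue().isEmpty()) {
                continue; // an application without instances is not returned
            }
            apps.put(entry.getKey(), new ArrayList<>(entry.getValue().keySet()));
        }
        return apps;
    }

    // Delta fetch: visit only the recent changes and group them by application.
    static Map<String, List<String>> deltaView(List<String[]> recentChanges) {
        Map<String, List<String>> apps = new LinkedHashMap<>();
        for (String[] change : recentChanges) {
            apps.computeIfAbsent(change[0], k -> new ArrayList<>()).add(change[1]);
        }
        return apps;
    }
}
```

The cost of the full view grows with the size of the whole registry, while the delta view grows only with the number of recent changes, which is why clients normally poll the delta.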
recentlyChangedQueue is a ConcurrentLinkedQueue. A scheduled task evicts entries older than the retention time, which defaults to 180 s.
    private ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue = new ConcurrentLinkedQueue<RecentlyChangedItem>();

    private TimerTask getDeltaRetentionTask() {
        return new TimerTask() {

            @Override
            public void run() {
                Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
                while (it.hasNext()) {
                    if (it.next().getLastUpdateTime() <
                            System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
                        // The entry has expired, so remove it.
                        // getRetentionTimeInMSInDeltaQueue defaults to 180 s.
                        it.remove();
                    } else {
                        // Entries are queued in time order, so the rest are still fresh
                        break;
                    }
                }
            }

        };
    }
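The eviction loop is easier to reason about with the clock passed in explicitly. A minimal sketch, assuming entries are appended in time order; `DeltaQueueSketch`, `Change`, `record`, and `evictExpired` are hypothetical names, and the timestamps in the usage note are arbitrary:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

final class DeltaQueueSketch {

    // Hypothetical stand-in for one entry of the recently-changed queue.
    static final class Change {
        final String instanceId;
        final long lastUpdateTime;

        Change(String instanceId, long lastUpdateTime) {
            this.instanceId = instanceId;
            this.lastUpdateTime = lastUpdateTime;
        }
    }

    private final ConcurrentLinkedQueue<Change> queue = new ConcurrentLinkedQueue<>();
    private final long retentionMs;

    DeltaQueueSketch(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    void record(String instanceId, long nowMs) {
        queue.add(new Change(instanceId, nowMs));
    }

    // Same shape as the timer task body: drop expired entries from the head,
    // stop at the first entry that is still within the retention window.
    int evictExpired(long nowMs) {
        int removed = 0;
        Iterator<Change> it = queue.iterator();
        while (it.hasNext()) {
            if (it.next().lastUpdateTime < nowMs - retentionMs) {
                it.remove();
                removed++;
            } else {
                break;
            }
        }
        return removed;
    }

    // What a delta fetch would see at this moment.
    List<String> delta() {
        List<String> ids = new ArrayList<>();
        for (Change c : queue) {
            ids.add(c.instanceId);
        }
        return ids;
    }
}
```

With a 180 000 ms retention and changes recorded at 0, 100 000, and 200 000 ms, a call to `evictExpired(190_000)` removes only the first entry. A client that goes longer than the retention window between delta fetches can therefore miss changes, which is one reason a full fetch remains necessary as a fallback.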