西安市建设局网站,wordpress 全文,高端建筑铝型材,网站建设策划案目录
一、Nacos客户端服务订阅的事件机制
1、监听事件的注册
2、ServiceInfo处理
serviceInfoHolder.processServiceInfo 一、Nacos客户端服务订阅的事件机制
Nacos客户端订阅的核心流程#xff1a;Nacos客户端通过一个定时任务#xff0c;每6秒从注册中心获取实例列表Nacos客户端通过一个定时任务每6秒从注册中心获取实例列表当发现实例发生变化时发布变更事件订阅者进行业务处理然后更新内存中和本地的缓存中的实例。 在第一步调用subscribe方法时会订阅一个EventListener事件。而在定时任务UpdateTask定时获取实例列表之后会调用ServiceInfoHolder.processServiceInfo方法对ServiceInfo进行本地处理这其中就包括和事件处理。
1、监听事件的注册
在subscribe方法中通过了下面的源码进行了监听事件的注册
public class NacosNamingService implements NamingService {
...Overridepublic void subscribe(String serviceName, String groupName, ListString clusters, EventListener listener)throws NacosException {if (null listener) {return;}String clusterString StringUtils.join(clusters, ,);changeNotifier.registerListener(groupName, serviceName, clusterString, listener);clientProxy.subscribe(serviceName, groupName, clusterString);} 在这其中我们主要要关注的就是changeNotifier.registerListener此监听就是进行具体事件注册逻辑的我们来看一下源码 可以看出事件的注册便是将EventListener存储在InstancesChangeNotifier的listenerMap属性当中了。同时这里的数据结构为ConcurrentHashMapkey为服务实例的信息的拼接value为监听事件的集合。
public class InstancesChangeNotifier extends SubscriberInstancesChangeEvent {private final String eventScope;private final MapString, ConcurrentHashSetEventListener listenerMap new ConcurrentHashMap();
...public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) {String key ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);ConcurrentHashSetEventListener eventListeners listenerMap.computeIfAbsent(key, keyInner - new ConcurrentHashSet());eventListeners.add(listener);}
2、ServiceInfo处理 上面的源码中已经完成了事件的注册现在就来追溯触发事件的来源UpdateTask中获取到最新的实例会进行本地化处理部分源码如下
// ServiceInfoUpdateServiceUpdateTaskrun()
ServiceInfo serviceObj serviceInfoHolder.getServiceInfoMap().get(serviceKey);
if (serviceObj null) {serviceObj namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);// 本地缓存处理serviceInfoHolder.processServiceInfo(serviceObj);lastRefTime serviceObj.getLastRefTime();return;
}
serviceInfoHolder.processServiceInfo
这个逻辑简单来说判断新的ServiceInfo数据是否正确是否发生了变化。如果数据格式正确且发生变化那就发布一个InstancesChangeEvent事件同时将ServiceInfo写入本地缓存。
public class ServiceInfoHolder implements Closeable {
...public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {String serviceKey serviceInfo.getKey();if (serviceKey null) {return null;}ServiceInfo oldService serviceInfoMap.get(serviceInfo.getKey());if (isEmptyOrErrorPush(serviceInfo)) {//empty or error push, just ignorereturn oldService;}// 缓存服务信息serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);// 判断注册的实例信息是否已变更boolean changed isChangedServiceInfo(oldService, serviceInfo);if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));}// 监控服务监控缓存Map的大小MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());// 服务实例已更变if (changed) {NAMING_LOGGER.info(current ips:({}) service: {} - {}, serviceInfo.ipCount(), serviceInfo.getKey(),JacksonUtils.toJson(serviceInfo.getHosts()));// 添加实例变更事件会被订阅者执行NotifyCenter.publishEvent(new InstancesChangeEvent(notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),serviceInfo.getClusters(), serviceInfo.getHosts()));// 记录Service本地文件DiskCache.write(serviceInfo, cacheDir);}return serviceInfo;}
分析到这里我们发现其实这个重点应该在服务信息变更之后发布的InstancesChangeEvent事件这个事件是NotifyCenter进行发布的我们来追踪一下源码 Spring Cloud Alibaba - Nacos 干我们这行啥时候懈怠就意味着长进的停止长进的停止就意味着被淘汰只能往前冲直到凤凰涅槃的一天