网站建设怎么下载代码,百度快照官网登录,ppt模板大全免费简约,换ip对网站有影响吗
1 Directory目录概述
Directory代表多个invoker，其内部维护了一个list，并且这个list的内容是动态变化的（对于消费端来说，每个invoker代表一个服务提供者）。
在Dubbo中，RegistryDirectory和StaticDirector…
1 Directory目录概述
Directory代表多个invoker，其内部维护了一个list，并且这个list的内容是动态变化的（对于消费端来说，每个invoker代表一个服务提供者）。
在Dubbo中，RegistryDirectory和StaticDirectory都是Directory的实现类。
RegistryDirectory是一个动态服务目录，可以感知注册中心配置的变化，其持有的Invoker列表会随着注册中心内容的变化而变化。每次变化后，RegistryDirectory都会动态地增删Invoker，并调用Router的route方法进行路由，过滤掉不符合路由规则的Invoker。相反，StaticDirectory是一个静态服务目录，它内部存放的Invoker是不会变动的。
RegistryDirectory是Dubbo中默认使用的Directory，适用于服务提供者和消费者都动态变化的情况。而StaticDirectory适用于服务提供者和消费者相对固定、不需要频繁变动的场景。
2 RegistryDirectory的创建
RegistryDirectory是在服务消费端启动时创建的。
消费端启动时通过 ReferenceConfig#get() 创建对服务提供方的远程调用代理类。最终在通过RegistryProtocol#refer() 创建invoker时创建了RegistryDirectory。具体实现细节如下所示。 public T get(boolean check) {// ...return ref;} protected synchronized void init(boolean check) {// ...// 创建对服务提供方的远程调用代理类ref createProxy(referenceParameters);// ...}private T createProxy(MapString, String referenceParameters) {// ...// 创建invokercreateInvoker();// ...// create service proxyreturn (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));}private void createInvoker() {if (urls.size() 1) {URL curUrl urls.get(0);invoker protocolSPI.refer(interfaceClass, curUrl);// ...} else {ListInvoker? invokers new ArrayList();URL registryUrl null;for (URL url : urls) {invokers.add(protocolSPI.refer(interfaceClass, url));if (UrlUtils.isRegistry(url)) {// use last registry urlregistryUrl url;}}// ...}}
创建invoker的核心方法为：
// Core call: refer the service interface through the protocol SPI and keep the resulting invoker.
invoker = protocolSPI.refer(interfaceClass, curUrl);
服务注册和发现使用是register协议因此上述方法实际上将调用 RegistryProtocol#refer方法实现如下所示。 public T InvokerT refer(ClassT type, URL url) throws RpcException {url getRegistryUrl(url);Registry registry getRegistry(url);if (RegistryService.class.equals(type)) {return proxyFactory.getInvoker((T) registry, type, url);}// groupa,b or group*MapString, String qs (MapString, String) url.getAttribute(REFER_KEY);String group qs.get(GROUP_KEY);if (StringUtils.isNotEmpty(group)) {if ((COMMA_SPLIT_PATTERN.split(group)).length 1 || *.equals(group)) {return doRefer(Cluster.getCluster(url.getScopeModel(), MergeableCluster.NAME), registry, type, url, qs);}}Cluster cluster Cluster.getCluster(url.getScopeModel(), qs.get(CLUSTER_KEY));// 创建invokerreturn doRefer(cluster, registry, type, url, qs);}protected T InvokerT doRefer(Cluster cluster, Registry registry, ClassT type, URL url, MapString, String parameters) {// ...// 创建invokerClusterInvokerT migrationInvoker getMigrationInvoker(this, cluster, registry, type, url, consumerUrl);return interceptInvoker(migrationInvoker, url, consumerUrl);}最终将调用 InterfaceCompatibleRegistryProtocol#getInvoker() 方法创建RegistryDirectory。
// InterfaceCompatibleRegistryProtocol.getInvoker
public T ClusterInvokerT getInvoker(Cluster cluster, Registry registry, ClassT type, URL url) {DynamicDirectoryT directory new RegistryDirectory(type, url);return doCreateInvoker(directory, cluster, registry, type);
}// RegistryProtocol#doCreateInvoker
protected T ClusterInvokerT doCreateInvoker(DynamicDirectoryT directory, Cluster cluster, Registry registry, ClassT type) {directory.setRegistry(registry);directory.setProtocol(protocol);// all attributes of REFER_KEYMapString, String parameters new HashMapString, String(directory.getConsumerUrl().getParameters());URL urlToRegistry new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);if (directory.isShouldRegister()) {directory.setRegisteredConsumerUrl(urlToRegistry);registry.register(directory.getRegisteredConsumerUrl());}// 1、建立路由规则链directory.buildRouterChain(urlToRegistry);// 2、订阅服务提供者地址生成invokerdirectory.subscribe(toSubscribeUrl(urlToRegistry));// 3、包装机器容错策略到invokerreturn (ClusterInvokerT) cluster.join(directory);
} 3 RegistryDirectory中invoker列表的更新
创建完RegistryDirectory后，会调用subscribe()方法订阅需要调用的服务提供者的地址列表。主要操作如下。
（1）假设使用的服务注册中心为Zookeeper，则会调用Zookeeper的subscribe()方法，去Zookeeper订阅服务提供者的地址列表，并且创建一个监听器。
（2）当Zookeeper服务端发现服务提供者的地址列表发生变化后，zkClient会回调该监听器的notify()方法，推送服务提供者的地址列表，刷新RegistryDirectory中的invoker列表。
（3）服务消费端启动时，则是创建完监听器后，同步调用notify()方法，刷新RegistryDirectory中的invoker列表。
具体实现细节如下所示。
// org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doSubscribe
/**
 * Subscribes {@code listener} to the ZooKeeper paths derived from {@code url}.
 *
 * <p>For a concrete service interface, a child listener is registered on every category path,
 * the current children are collected as URLs, and {@code notify} is called synchronously with
 * them. The latch is released in {@code finally}, after that synchronous notification.
 *
 * @throws RpcException wrapping any {@code Throwable} raised while subscribing
 */
public void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        checkDestroyed();
        if (ANY_VALUE.equals(url.getServiceInterface())) {
            // ...
            zkClient.create(root, false, true);
            List<String> services = zkClient.addChildListener(root, zkListener);
            // ...
        } else {
            CountDownLatch latch = new CountDownLatch(1);
            try {
                List<URL> urls = new ArrayList<>();
                // Create the listeners.
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners =
                            ConcurrentHashMapUtils.computeIfAbsent(zkListeners, url, k -> new ConcurrentHashMap<>());
                    ChildListener zkListener =
                            ConcurrentHashMapUtils.computeIfAbsent(listeners, listener, k -> new RegistryChildListenerImpl(url, k, latch));
                    if (zkListener instanceof RegistryChildListenerImpl) {
                        ((RegistryChildListenerImpl) zkListener).setLatch(latch);
                    }
                    // create directories.
                    zkClient.create(path, false, true);
                    // Add children (i.e. service items).
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        // The invocation point that may cause 1-1.
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                // Callback with the collected URLs.
                notify(url, listener, urls);
            } finally {
                // tells the listener to run only after the sync notification of main thread finishes.
                latch.countDown();
            }
        }
    } catch (Throwable e) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
// org.apache.dubbo.registry.support.FailbackRegistry#notify
protected void notify(URL url, NotifyListener listener, ListURL urls) {if (url null) {throw new IllegalArgumentException(notify url null);}if (listener null) {throw new IllegalArgumentException(notify listener null);}try {doNotify(url, listener, urls);} catch (Exception t) {// Record a failed registration request to a failed listlogger.error(REGISTRY_FAILED_NOTIFY_EVENT, , , Failed to notify addresses for subscribe url , cause: t.getMessage(), t);}
}protected void doNotify(URL url, NotifyListener listener, ListURL urls) {super.notify(url, listener, urls);
}// org.apache.dubbo.registry.support.AbstractRegistry#notify
/**
 * Records the notified URLs per category for {@code url} and passes each category's list to the
 * listener. The code that builds {@code result} is elided in this excerpt.
 */
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    // ...
    Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
        String category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        categoryNotified.put(category, categoryList);
        // The key call: hand this category's URLs to the listener.
        listener.notify(categoryList);
        if (localCacheEnabled) {
            saveProperties(url);
        }
    }
}
// org.apache.dubbo.registry.integration.RegistryDirectory#notify
public synchronized void notify(ListURL urls) {// ...refreshOverrideAndInvoker(providerURLs);
}protected synchronized void refreshOverrideAndInvoker(ListURL urls) {// mock zookeeper://xxx?mockreturn nullthis.directoryUrl overrideWithConfigurator(getOriginalConsumerUrl());refreshInvoker(urls);
}刷新invoker列表缓存urlInvokerMap的最终实现细节如下所示
protected volatile MapURL, InvokerT urlInvokerMap;// RegistryDirectory#refreshInvoker
private void refreshInvoker(ListURL invokerUrls) {// ...// use local reference to avoid NPE as this.urlInvokerMap will be set null concurrently at destroyAllInvokers().MapURL, InvokerT localUrlInvokerMap this.urlInvokerMap;// cant use local reference as oldUrlInvokerMaps mappings might be removed directly at toInvokers().MapURL, InvokerT oldUrlInvokerMap null;if (localUrlInvokerMap ! null) {// the initial capacity should be set greater than the maximum number of entries divided by the load factor to avoid resizing.oldUrlInvokerMap new LinkedHashMap(Math.round(1 localUrlInvokerMap.size() / DEFAULT_HASHMAP_LOAD_FACTOR));localUrlInvokerMap.forEach(oldUrlInvokerMap::put);}// 刷新invoker列表缓存-urlInvokerMapMapURL, InvokerT newUrlInvokerMap toInvokers(oldUrlInvokerMap, invokerUrls);// Translate url list to Invoker map// ...
}private MapURL, InvokerT toInvokers(MapURL, InvokerT oldUrlInvokerMap, ListURL urls) {MapURL, InvokerT newUrlInvokerMap new ConcurrentHashMap(urls null ? 1 : (int) (urls.size() / 0.75f 1));if (urls null || urls.isEmpty()) {return newUrlInvokerMap;}String queryProtocols this.queryMap.get(PROTOCOL_KEY);for (URL providerUrl : urls) {if (!checkProtocolValid(queryProtocols, providerUrl)) {continue;}URL url mergeUrl(providerUrl);// Cache key is url that does not merge with consumer side parameters,// regardless of how the consumer combines parameters,// if the server url changes, then refer againInvokerT invoker oldUrlInvokerMap null ? null : oldUrlInvokerMap.remove(url);if (invoker null) { // Not in the cache, refer againtry {boolean enabled true;if (url.hasParameter(DISABLED_KEY)) {enabled !url.getParameter(DISABLED_KEY, false);} else {enabled url.getParameter(ENABLED_KEY, true);}if (enabled) {invoker protocol.refer(serviceType, url);}} catch (Throwable t) {// Thrown by AbstractProtocol.optimizeSerialization()if (t instanceof RpcException t.getMessage().contains(serialization optimizer)) {// 4-2 - serialization optimizer class initialization failed.logger.error(PROTOCOL_FAILED_INIT_SERIALIZATION_OPTIMIZER, typo in optimizer class, ,Failed to refer invoker for interface: serviceType ,url:( url ) t.getMessage(), t);} else {// 4-3 - Failed to refer invoker by other reason.logger.error(PROTOCOL_FAILED_REFER_INVOKER, , ,Failed to refer invoker for interface: serviceType ,url:( url ) t.getMessage(), t);}}if (invoker ! null) { // Put new invoker in cachenewUrlInvokerMap.put(url, invoker);}} else {newUrlInvokerMap.put(url, invoker);}}return newUrlInvokerMap;
}