什么网站做简历最好,电销名单渠道在哪里找,网站后台有安全狗,做网站不错的公司无论是生产者还是消费者#xff0c;在底层都要和Broker打交道#xff0c;进行消息收发。在源码层面#xff0c;底层的功能被抽象成同一个类#xff0c;负责和Broker打交道#xff0c;下面详细介绍这个类的情况。
1 MQClientInstance类的创建规则
MQClientInstance是客户… 无论是生产者还是消费者在底层都要和Broker打交道进行消息收发。在源码层面底层的功能被抽象成同一个类负责和Broker打交道下面详细介绍这个类的情况。
1 MQClientInstance类的创建规则
MQClientInstance是客户端各种类型的Consumer和Producer的底层类。这个类首先从NameServer获取并保存各种配置信息比如Topic的Route信息。同时MQClientInstance还会通过MQClientAPIImpl类实现消息的收发也就是从Broker获取消息或者发送消息到Broker。
既然MQClientInstance实现的是底层通信功能和获取并保存元数据的功能就没必要每个Consumer或Producer都创建一个对象一个MQClientInstance对象可以被多个Consumer或Producer公用。RocketMQ通过一个工厂类达到共用MQClientInstance的目的。MQClientInstance的创建如代码清单11-12所示。
代码清单11-12 创建MQClientInstance MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook); 注意MQClientInstance是通过工厂类被创建的并不是一个单例模式有些情况下需要创建多个实例。首先来看看MQClientInstance的创建规则如代码清单11-13所示。
代码清单11-13 MQClientInstance创建规则 public MQClientInstance getAndCreateMQClientInstance( final ClientConfig clientConfig, RPCHook rpcHook) { String clientId clientConfig.buildMQClientId(); MQClientInstance instance this.factoryTable.get(clientId); if (null instance) { instance new MQClientInstance(clientConfig.cloneClientConfig(), this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook); MQClientInstance prev this.factoryTable.putIfAbsent(clientId, instance); if (prev ! null) { instance prev; log.warn(Returned Previous MQClientInstance for clientId:[{}], clientId); } else { log.info(Created new MQClientInstance for clientId:[{}], clientId); } } return instance; } 系统中维护了ConcurrentMapString/*clientId*/MQClientInstancefactoryTable这个Map对象每创建一个新的MQClientInstance都会以clientId作为Key放入Map结构中。clientId的格式是“clientIp”“InstanceName”其中clientIp是客户端机器的IP地址一般不会变instancename有默认值也可以被手动设置。
普通情况下一个用到RocketMQ客户端的Java程序或者说一个JVM进程只要有一个MQClientInstance实例就够了。这时候创建一个或多个Consumer或者Producer底层使用的是同一个MQClientInstance实例。
在quick start文档中创建一个DefaultMQPushConsumer来接收消息没有设置这个Consumer的InstanceName参数通过setInstanceName函数进行设置这个时候InstanceName的值是默认的“DEFAULT”。实际创建的MQClientInstance个数由设定的逻辑进行控制。InstanceName的生成逻辑如代码清单11-14所示。
代码清单11-14 InstanceName生成逻辑 if (this.defaultMQPushConsumer.getMessageModel() MessageModel.CLUSTERING) { this.defaultMQPushConsumer.changeInstanceNameToPID(); } public void changeInstanceNameToPID() { if (this.instanceName.equals(DEFAULT)) { this.instanceName String.valueOf(UtilAll.getPid()); } } 从InstanceName的创建逻辑就可以看出如果创建Consumer或者Producer类型的时候不手动指定InstanceName进程中只会有一个MQClientInstance对象。
有些情况下只有一个MQClientInstance对象是不够的比如一个Java程序需要连接两个RoceketMQ集群从一个集群读取消息发送到另一个集群一个MQClientInstance对象无法支持这种场景。这种情况下一定要手动指定不同的InstanceName底层会创建两个MQClientInstance对象。
2 MQClientInstance类的功能
首先来看一下MQClientInstance类的Start函数从Start函数中的逻辑能大致了解MQClientInstance类的功能如代码清单11-15所示。
代码清单11-15 MQClientInstance类Start函数 public void start() throws MQClientException { synchronized (this) { switch (this.serviceState) { case CREATE_JUST: this.serviceState ServiceState.START_FAILED; // If not specified,looking address from name server if (null this.clientConfig.getNamesrvAddr()) { this.mQClientAPIImpl.fetchNameServerAddr(); } // Start request-response channel this.mQClientAPIImpl.start(); // Start various schedule tasks this.startScheduledTask(); // Start pull service this.pullMessageService.start(); // Start rebalance service this.rebalanceService.start(); // Start push service this.defaultMQProducer.getDefaultMQProducerImpl().start (false); log.info(the client factory [{}] start OK, this.clientId); this.serviceState ServiceState.RUNNING; break; case RUNNING: break; case SHUTDOWN_ALREADY: break; case START_FAILED: throw new MQClientException(The Factory object[ this.getClientId() ] has been created before, and failed., null); default: break; } } } Start函数中的MQClientAPIImpl对象用来负责底层消息通信然后启动pullMessageService和rebalanceService。在类的成员变量中用topicRouteTable、brokerAddrTable等来存储从NameServer中获得的集群状态信息并通过一个ScheduledTask来维护这些信息。MQClientInstance中定时执行的任务如代码清单11-16所示。
代码清单11-16 MQClientInstance中定时执行的任务 private void startScheduledTask() { if (null this.clientConfig.getNamesrvAddr()) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { Override public void run() { try { MQClientInstance.this.mQClientAPIImpl .fetchNameServerAddr(); } catch (Exception e) { log.error(ScheduledTask fetchNameServerAddr exception, e); } } }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS); } this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { Override public void run() { try { MQClientInstance.this.updateTopicRouteInfoFromNameServer(); } catch (Exception e) { log.error(ScheduledTask updateTopicRouteInfoFromNameServer exception, e); } } }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit .MILLISECONDS); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { Override public void run() { try { MQClientInstance.this.cleanOfflineBroker(); MQClientInstance.this.sendHeartbeatToAllBrokerWithLock(); } catch (Exception e) { log.error(ScheduledTask sendHeartbeatToAllBroker exception, e); } } }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit .MILLISECONDS); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { Override public void run() { try { MQClientInstance.this.persistAllConsumerOffset(); } catch (Exception e) { log.error(ScheduledTask persistAllConsumerOffset exception, e); } } }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { Override public void run() { try { MQClientInstance.this.adjustThreadPool(); } catch (Exception e) { log.error(ScheduledTask adjustThreadPool exception, e); } } }, 1, 1, TimeUnit.MINUTES); } 从代码中可以看出MQClientInstance会定时进行如下几个操作获取NameServer地址、更新TopicRoute信息、清理离线的Broker和保存消费者的Offset。