RocketMQ Source Code Reading (9): Custom Filtering Rules with Filtersrv

What is Filtersrv
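Filtersrv applies custom filtering rules to the messages a Consumer pulls from a Broker. Its position in the system is shown in the figure below:
Advantages of Filtersrv
Reduces the load on the Broker
Reduces the number of useless messages the Consumer receives
Disadvantage
Adds one more network hop through Filtersrv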
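Registering Filtersrv with the Broker
The relationship between Filtersrv and Broker is:
One Filtersrv corresponds to one Broker
One Broker corresponds to multiple Filtersrv instances
High availability of Filtersrv:
Start multiple Filtersrv instances
A Filtersrv exits automatically if its registration fails
The core code for registering a Filtersrv with the Broker is in the FiltersrvController class. Its initialization source is as follows: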
public boolean initialize() {
    MixAll.printObjectProperties(log, this.filtersrvConfig);

    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig);

    this.remotingExecutor = Executors.newFixedThreadPool(
        nettyServerConfig.getServerWorkerThreads(),
        new ThreadFactoryImpl("RemotingExecutorThread_"));

    this.registerProcessor();

    // Register with the Broker at a fixed interval
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            FiltersrvController.this.registerFilterServerToBroker();
        }
    }, 15, 10, TimeUnit.SECONDS);

    this.defaultMQPullConsumer.setBrokerSuspendMaxTimeMillis(
        this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis() - 1000);
    this.defaultMQPullConsumer.setConsumerTimeoutMillisWhenSuspend(
        this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() - 1000);

    this.defaultMQPullConsumer.setNamesrvAddr(this.filtersrvConfig.getNamesrvAddr());
    this.defaultMQPullConsumer.setInstanceName(String.valueOf(UtilAll.getPid()));

    return true;
}

The scheduled task calls FiltersrvController#registerFilterServerToBroker:
public void registerFilterServerToBroker() {
    try {
        RegisterFilterServerResponseHeader responseHeader =
            this.filterServerOuterAPI.registerFilterServerToBroker(
                this.filtersrvConfig.getConnectWhichBroker(), this.localAddr());
        this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper()
            .setDefaultBrokerId(responseHeader.getBrokerId());

        if (null == this.brokerName) {
            this.brokerName = responseHeader.getBrokerName();
        }

        log.info("register filter server{} to broker{} OK, Return: {} {}",
            this.localAddr(),
            this.filtersrvConfig.getConnectWhichBroker(),
            responseHeader.getBrokerName(),
            responseHeader.getBrokerId());
    } catch (Exception e) {
        log.warn("register filter server Exception", e);
        log.warn("access broker failed, kill oneself");
        System.exit(-1); // exit abnormally
    }
}

This method registers the Filtersrv with the Broker; if registration fails, the Filtersrv shuts itself down. The registration itself is performed by FilterServerOuterAPI#registerFilterServerToBroker:
public RegisterFilterServerResponseHeader registerFilterServerToBroker(
    final String brokerAddr,
    final String filterServerAddr
) throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException,
    RemotingTimeoutException, InterruptedException, MQBrokerException {
    RegisterFilterServerRequestHeader requestHeader = new RegisterFilterServerRequestHeader();
    requestHeader.setFilterServerAddr(filterServerAddr);
    RemotingCommand request =
        RemotingCommand.createRequestCommand(RequestCode.REGISTER_FILTER_SERVER, requestHeader);

    RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, 3000);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            RegisterFilterServerResponseHeader responseHeader =
                (RegisterFilterServerResponseHeader) response
                    .decodeCommandCustomHeader(RegisterFilterServerResponseHeader.class);
            return responseHeader;
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}

This method builds the registration request and registers filterServerAddr with the Broker, using a synchronous call with a 3000 ms timeout. Any response code other than SUCCESS is turned into an MQBrokerException.
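The register-periodically, give-up-on-failure pattern described above can be sketched without any RocketMQ dependency. This is a minimal illustration, not the actual implementation: Registrar and PeriodicRegistration are hypothetical names, and the failure handler is passed in as a Runnable where the real Filtersrv calls System.exit(-1).

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the remote call that registers with a Broker. */
interface Registrar {
    void register() throws Exception;
}

final class PeriodicRegistration {
    /**
     * Wraps one registration attempt. Any exception from the registrar is
     * routed to onFailure instead of escaping from the scheduled task.
     */
    static Runnable registrationTask(Registrar registrar, Runnable onFailure) {
        return () -> {
            try {
                registrar.register();
            } catch (Exception e) {
                onFailure.run();
            }
        };
    }

    /** Runs the task at a fixed rate on a single scheduler thread. */
    static ScheduledExecutorService schedule(Runnable task, long initialDelay,
                                             long period, TimeUnit unit) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(task, initialDelay, period, unit);
        return scheduler;
    }
}
```

With the values from initialize() above, the call would look like PeriodicRegistration.schedule(task, 15, 10, TimeUnit.SECONDS). Catching the exception inside the task matters because scheduleAtFixedRate suppresses all later runs once a run throws.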
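Filter class
The Consumer uploads the filter class source code to Filtersrv, which then compiles and uses it. Their relationship is shown in the figure below:
Consumer subscribes and sets the filter class code
When subscribing, the Consumer can supply the filter class code through DefaultMQPushConsumer#subscribe: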
// Subscribe to a topic.
// Parameters:
//   topic - the topic to consume.
//   fullClassName - fully qualified class name; the class must implement org.apache.rocketmq.common.filter.MessageFilter
//   filterClassSource - the class source code, UTF-8 encoded; you are responsible for the safety of this code
@Override
public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {
    this.defaultMQPushConsumerImpl.subscribe(topic, fullClassName, filterClassSource);
}

Consumer uploads the filter class code
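While sending its heartbeat to the Broker, the Consumer also uploads the filter class code to all Filtersrv instances attached to that Broker. The core code in MQClientInstance is as follows: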
public void sendHeartbeatToAllBrokerWithLock() {
    if (this.lockHeartbeat.tryLock()) {
        try {
            // Send the heartbeat to the Broker
            this.sendHeartbeatToAllBroker();
            // Upload the filter class source to Filtersrv
            this.uploadFilterClassSource();
        } catch (final Exception e) {
            log.error("sendHeartbeatToAllBroker exception", e);
        } finally {
            this.lockHeartbeat.unlock();
        }
    } else {
        log.warn("lock heartBeat, but failed.");
    }
}

The upload itself is done by MQClientInstance#uploadFilterClassSource:
/**
 * Upload filter classes to Filtersrv
 */
private void uploadFilterClassSource() {
    Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, MQConsumerInner> next = it.next();
        MQConsumerInner consumer = next.getValue();
        // Only push consumers upload filter classes
        if (ConsumeType.CONSUME_PASSIVELY == consumer.consumeType()) {
            Set<SubscriptionData> subscriptions = consumer.subscriptions();
            for (SubscriptionData sub : subscriptions) {
                if (sub.isClassFilterMode() && sub.getFilterClassSource() != null) {
                    final String consumerGroup = consumer.groupName();
                    final String className = sub.getSubString();
                    final String topic = sub.getTopic();
                    final String filterClassSource = sub.getFilterClassSource();
                    try {
                        this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic, filterClassSource);
                    } catch (Exception e) {
                        log.error("uploadFilterClassToAllFilterServer Exception", e);
                    }
                }
            }
        }
    }
}

Filtersrv compiles the filter class code
Filtersrv compiles and uses the filter class code uploaded by the Consumer. The core code is in FilterClassManager#registerFilterClass:
/**
 * Register a filter class.
 *
 * @param consumerGroup consumer group
 * @param topic topic
 * @param className filter class name
 * @param classCRC CRC of the filter class source
 * @param filterSourceBinary filter class source
 * @return whether registration succeeded
 */
public boolean registerFilterClass(final String consumerGroup, final String topic,
    final String className, final int classCRC, final byte[] filterSourceBinary) {
    final String key = buildKey(consumerGroup, topic);

    // Decide whether a new filter class has to be registered
    boolean registerNew = false;
    FilterClassInfo filterClassInfoPrev = this.filterClassTable.get(key);
    if (null == filterClassInfoPrev) {
        registerNew = true;
    } else {
        if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) {
            if (filterClassInfoPrev.getClassCRC() != classCRC && classCRC != 0) { // the class has changed
                registerNew = true;
            }
        }
    }

    // Register the new filter class
    if (registerNew) {
        synchronized (this.compileLock) {
            // Re-check under the lock: another thread may already have registered it
            filterClassInfoPrev = this.filterClassTable.get(key);
            if (null != filterClassInfoPrev && filterClassInfoPrev.getClassCRC() == classCRC) {
                return true;
            }

            try {
                FilterClassInfo filterClassInfoNew = new FilterClassInfo();
                filterClassInfoNew.setClassName(className);
                filterClassInfoNew.setClassCRC(0);
                filterClassInfoNew.setMessageFilter(null);

                if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) {
                    String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET);
                    // Compile the new filter class
                    Class<?> newClass = DynaCode.compileAndLoadClass(className, javaSource);
                    // Create an instance of the new filter class
                    Object newInstance = newClass.newInstance();
                    filterClassInfoNew.setMessageFilter((MessageFilter) newInstance);
                    filterClassInfoNew.setClassCRC(classCRC);
                }

                this.filterClassTable.put(key, filterClassInfoNew);
            } catch (Throwable e) {
                String info = String.format(
                    "FilterServer, registerFilterClass Exception, consumerGroup: %s topic: %s className: %s",
                    consumerGroup, topic, className);
                log.error(info, e);
                return false;
            }
        }
    }

    return true;
}

This method compiles the supplied source code into a MessageFilter instance and stores it in filterClassTable. Recompilation only happens when no entry exists yet for the consumer group and topic, or when the uploaded CRC differs from the stored one.
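The CRC comparison and the re-check under the compile lock can be illustrated with a small standalone sketch. Everything here is an assumption made for illustration: FilterRegistry is a hypothetical class, the key format is made up, and the "compiler" is an arbitrary function from source text to a filter object rather than a real Java compiler.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.zip.CRC32;

final class FilterRegistry<F> {
    /** A compiled filter together with the CRC of the source it came from. */
    private static final class Registered<F> {
        final int crc;
        final F filter;

        Registered(int crc, F filter) {
            this.crc = crc;
            this.filter = filter;
        }
    }

    private final Map<String, Registered<F>> table = new ConcurrentHashMap<>();
    private final Object compileLock = new Object();
    private final Function<String, F> compiler; // hypothetical: source text -> filter
    private int compileCount;

    FilterRegistry(Function<String, F> compiler) {
        this.compiler = compiler;
    }

    /** CRC32 of the UTF-8 bytes of the source, masked to a non-negative int. */
    static int crc32(String source) {
        CRC32 crc = new CRC32();
        crc.update(source.getBytes(StandardCharsets.UTF_8));
        return (int) (crc.getValue() & 0x7FFFFFFF);
    }

    /** Returns true if the source was compiled, false if it was unchanged. */
    boolean register(String group, String topic, String source) {
        String key = topic + "@" + group; // illustrative key format
        int crc = crc32(source);
        Registered<F> prev = table.get(key);
        if (prev != null && prev.crc == crc) {
            return false; // fast path: same source as before
        }
        synchronized (compileLock) {
            prev = table.get(key); // re-check: another thread may have won the race
            if (prev != null && prev.crc == crc) {
                return false;
            }
            table.put(key, new Registered<>(crc, compiler.apply(source)));
            compileCount++;
            return true;
        }
    }

    /** Returns the registered filter, or null if none exists for the key. */
    F find(String group, String topic) {
        Registered<F> r = table.get(topic + "@" + group);
        return r == null ? null : r.filter;
    }

    int compileCount() {
        synchronized (compileLock) {
            return compileCount;
        }
    }
}
```

Because the Consumer re-uploads its source with every heartbeat, the cheap CRC check is what keeps the expensive compile step from running on each upload.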
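Filtering messages
Recall where Filtersrv sits in the overall architecture: when the Consumer pulls messages from the Broker, they pass through Filtersrv for filtering. In practice, the Consumer pulls its messages from Filtersrv.
Consumer pulls messages from Filtersrv
When the Consumer pulls messages for a subscription that uses a filter class, it randomly picks one entry from the list of Filtersrv instances attached to the Broker and pulls from it. If no Filtersrv can be selected, the messages cannot be pulled, so Filtersrv must be deployed for high availability. The core pull logic is in PullAPIWrapper#pullKernelImpl: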
/**
 * Core method for pulling messages.
 *
 * @param mq message queue
 * @param subExpression subscription expression
 * @param subVersion subscription version
 * @param offset start offset in the queue
 * @param maxNums maximum number of messages per pull
 * @param sysFlag pull system flag
 * @param commitOffset consume progress to commit
 * @param brokerSuspendMaxTimeMillis maximum time the broker may suspend the request
 * @param timeoutMillis timeout for the request to the broker
 * @param communicationMode communication mode
 * @param pullCallback pull callback
 * @return the pull result; only returned in synchronous mode, otherwise null
 * @throws MQClientException if the broker cannot be found or another client error occurs
 * @throws RemotingException if the remote call fails
 * @throws MQBrokerException if the broker reports an error; only thrown in synchronous mode
 * @throws InterruptedException if the thread is interrupted
 */
protected PullResult pullKernelImpl(
    final MessageQueue mq,
    final String subExpression,
    final long subVersion,
    final long offset,
    final int maxNums,
    final int sysFlag,
    final long commitOffset,
    final long brokerSuspendMaxTimeMillis,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // Look up the Broker
    FindBrokerResult findBrokerResult =
        this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
            this.recalculatePullFromWhichNode(mq), false);
    if (null == findBrokerResult) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                this.recalculatePullFromWhichNode(mq), false);
    }

    // Send the pull request
    if (findBrokerResult != null) {
        int sysFlagInner = sysFlag;

        if (findBrokerResult.isSlave()) {
            sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
        }

        PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
        requestHeader.setConsumerGroup(this.consumerGroup);
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setQueueId(mq.getQueueId());
        requestHeader.setQueueOffset(offset);
        requestHeader.setMaxMsgNums(maxNums);
        requestHeader.setSysFlag(sysFlagInner);
        requestHeader.setCommitOffset(commitOffset);
        requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
        requestHeader.setSubscription(subExpression);
        requestHeader.setSubVersion(subVersion);

        // If the subscription uses a filter class, pull through a Filtersrv instead of the Broker
        String brokerAddr = findBrokerResult.getBrokerAddr();
        if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
            brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
        }

        PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
            brokerAddr,
            requestHeader,
            timeoutMillis,
            communicationMode,
            pullCallback);

        return pullResult;
    }

    // The Broker is unknown: throw
    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

When the class filter flag is set, the call to computPullFromWhichFilterServer decides which Filtersrv to pull from. See PullAPIWrapper#computPullFromWhichFilterServer:
/**
 * Compute the Filtersrv address. If there are several Filtersrv instances, pick one at random.
 *
 * @param topic topic
 * @param brokerAddr broker address
 * @return Filtersrv address
 * @throws MQClientException if no Filtersrv exists
 */
private String computPullFromWhichFilterServer(final String topic, final String brokerAddr)
    throws MQClientException {
    ConcurrentHashMap<String, TopicRouteData> topicRouteTable = this.mQClientFactory.getTopicRouteTable();
    if (topicRouteTable != null) {
        TopicRouteData topicRouteData = topicRouteTable.get(topic);
        List<String> list = topicRouteData.getFilterServerTable().get(brokerAddr);
        if (list != null && !list.isEmpty()) {
            return list.get(randomNum() % list.size());
        }
    }

    throw new MQClientException("Find Filter Server Failed, Broker Addr: " + brokerAddr + " topic: " + topic, null);
}

Filtersrv pulls messages from the Broker
When Filtersrv pulls messages from the Broker, it reuses the methods and logic of DefaultMQPullConsumer. See DefaultRequestProcessor#pullMessageForward:
/**
 * Pull messages.
 *
 * @param ctx context of the pull request
 * @param request pull request
 * @return response
 * @throws Exception if an error occurs
 */
private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx, final RemotingCommand request) throws Exception {
    final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
    final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
    final PullMessageRequestHeader requestHeader =
        (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);

    final FilterContext filterContext = new FilterContext();
    filterContext.setConsumerGroup(requestHeader.getConsumerGroup());

    response.setOpaque(request.getOpaque());

    DefaultMQPullConsumer pullConsumer = this.filtersrvController.getDefaultMQPullConsumer();

    // Check that a usable filter class exists for this consumer group and topic
    final FilterClassInfo findFilterClass = this.filtersrvController.getFilterClassManager()
        .findFilterClass(requestHeader.getConsumerGroup(), requestHeader.getTopic());
    if (null == findFilterClass) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("Find Filter class failed, not registered");
        return response;
    }
    if (null == findFilterClass.getMessageFilter()) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("Find Filter class failed, registered but no class");
        return response;
    }

    // Suggest that the next request go to the Broker master
    responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);

    MessageQueue mq = new MessageQueue();
    mq.setTopic(requestHeader.getTopic());
    mq.setQueueId(requestHeader.getQueueId());
    mq.setBrokerName(this.filtersrvController.getBrokerName());
    long offset = requestHeader.getQueueOffset();
    int maxNums = requestHeader.getMaxMsgNums();

    final PullCallback pullCallback = new PullCallback() {
        @Override
        public void onSuccess(PullResult pullResult) {
            responseHeader.setMaxOffset(pullResult.getMaxOffset());
            responseHeader.setMinOffset(pullResult.getMinOffset());
            responseHeader.setNextBeginOffset(pullResult.getNextBeginOffset());
            response.setRemark(null);

            switch (pullResult.getPullStatus()) {
                case FOUND:
                    response.setCode(ResponseCode.SUCCESS);

                    List<MessageExt> msgListOK = new ArrayList<MessageExt>();
                    try {
                        for (MessageExt msg : pullResult.getMsgFoundList()) {
                            // Apply the filter class to each message
                            boolean match = findFilterClass.getMessageFilter().match(msg, filterContext);
                            if (match) {
                                msgListOK.add(msg);
                            }
                        }

                        if (!msgListOK.isEmpty()) {
                            returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, msgListOK);
                            return;
                        } else {
                            // Nothing in this batch matched: ask the Consumer to pull again right away
                            response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                        }
                    } catch (Throwable e) {
                        final String error = String.format("do Message Filter Exception, ConsumerGroup: %s Topic: %s ",
                            requestHeader.getConsumerGroup(), requestHeader.getTopic());
                        log.error(error, e);

                        response.setCode(ResponseCode.SYSTEM_ERROR);
                        response.setRemark(error + RemotingHelper.exceptionSimpleDesc(e));
                        returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
                        return;
                    }

                    break;
                case NO_MATCHED_MSG:
                    response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                    break;
                case NO_NEW_MSG:
                    response.setCode(ResponseCode.PULL_NOT_FOUND);
                    break;
                case OFFSET_ILLEGAL:
                    response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                    break;
                default:
                    break;
            }

            returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
        }

        @Override
        public void onException(Throwable e) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("Pull Callback Exception, " + RemotingHelper.exceptionSimpleDesc(e));
            returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
            return;
        }
    };

    // Pull the messages
    pullConsumer.pullBlockIfNotFound(mq, null, offset, maxNums, pullCallback);

    return null;
}

This method pulls messages through DefaultMQPullConsumer. The filtering logic runs inside the PullCallback (the call to findFilterClass.getMessageFilter().match), and only messages that pass the filter are kept and returned to the Consumer.

High availability of Filtersrv
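High availability is achieved by running multiple Filtersrv instances. A multi-instance deployment is shown in the figure below:
One Consumer corresponds to multiple Filtersrv instances
One Filtersrv corresponds to one Broker
One Broker corresponds to multiple Filtersrv instances
The Consumer randomly selects one of the Filtersrv instances attached to the Broker and pulls messages from it.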
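The random selection can be sketched as follows. This is a standalone illustration under stated assumptions: FilterServerPicker is a hypothetical class, the table maps a Broker address to its Filtersrv addresses, and the addresses in the usage note are made up.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

final class FilterServerPicker {
    /**
     * Picks one filter server address at random for the given broker.
     * Throws if the broker has no filter server, mirroring the rule that
     * messages cannot be pulled when no Filtersrv can be selected.
     */
    static String pick(Map<String, List<String>> filterServerTable, String brokerAddr) {
        List<String> servers = filterServerTable.get(brokerAddr);
        if (servers == null || servers.isEmpty()) {
            throw new IllegalStateException("no filter server for broker " + brokerAddr);
        }
        // nextInt(bound) is always in [0, bound), so the index is never negative
        return servers.get(ThreadLocalRandom.current().nextInt(servers.size()));
    }
}
```

For a table that maps "10.0.0.1:10911" to two Filtersrv addresses, repeated calls to pick spread the pulls across both instances, and a failed instance only matters once it is removed from the list or the list becomes empty.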
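Summary
Filtersrv exists to reduce the load on the Broker and to keep the Consumer from receiving useless messages. It acts as an intermediate layer. When subscribing, the Consumer supplies a filter class; along with its heartbeat to the Broker, it uploads the filter class source to every Filtersrv attached to that Broker, and Filtersrv compiles and instantiates the class. When pulling, the Consumer pulls from Filtersrv; Filtersrv first pulls the messages from the Broker, runs them through the filter class, and passes only the matching messages on to the Consumer.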
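The pull path summarized above can be reduced to a few lines. This is a rough sketch rather than RocketMQ code: messages are plain strings, the Broker is a list, the filter is a java.util.function.Predicate standing in for a compiled filter class, and FilterPipelineSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class FilterPipelineSketch {
    /** Stand-in for the Broker: returns up to maxNums messages starting at offset. */
    static List<String> pullFromBroker(List<String> queue, int offset, int maxNums) {
        int from = Math.min(Math.max(offset, 0), queue.size());
        int to = (int) Math.min((long) queue.size(), (long) from + Math.max(maxNums, 0));
        return new ArrayList<>(queue.subList(from, to));
    }

    /** Filter-server step: pull a batch from the Broker and keep only the matches. */
    static List<String> pullThroughFilter(List<String> queue, int offset, int maxNums,
                                          Predicate<String> filter) {
        List<String> matched = new ArrayList<>();
        for (String msg : pullFromBroker(queue, offset, maxNums)) {
            if (filter.test(msg)) {
                matched.add(msg);
            }
        }
        return matched;
    }
}
```

An empty result from pullThroughFilter corresponds to the case where Filtersrv found messages on the Broker but none of them matched, which pullMessageForward answers with PULL_RETRY_IMMEDIATELY so that the Consumer pulls the next batch right away.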