当前位置: 首页 > news >正文

专业建站推广服务平面设计专业学校排名

专业建站推广服务,平面设计专业学校排名,seo短视频网页入口引流怎么做,行业类网站应如何建设RocketMQ源码阅读-九-自定义过滤规则Flitersrv 什么是FiltersrvFiltersrv注册到Broker过滤类Consumer发起订阅设置过滤类代码Consumer上传过滤类代码Flitersrv编译过滤类代码 过滤消息Consumer 从 Filtersrv 拉取消息Flitersrv从Broker拉取消息 Flitersrv的高可用总结 什么是Fi… RocketMQ源码阅读-九-自定义过滤规则Flitersrv 什么是FiltersrvFiltersrv注册到Broker过滤类Consumer发起订阅设置过滤类代码Consumer上传过滤类代码Flitersrv编译过滤类代码 过滤消息Consumer 从 Filtersrv 拉取消息Flitersrv从Broker拉取消息 Flitersrv的高可用总结 什么是Filtersrv Filtersrv 负责自定义规则过滤 Consumer 从 Broker 拉取的消息。其在系统中的位置如下图: Filtersrv优点 减少了 Broker 的负担减少了 Consumer 接收无用的消息 缺点 多了一层 Filtersrv 网络开销 Filtersrv注册到Broker Flitersrv与Broker的对应关系为 一个Flitersrv对应一个Broker一个Broker对应多个Flitersrv Flitersrv的高可用 启动多个FlitersrvFlitersrv注册失败会自动退出 Flitersrv注册到Broker的核心代码在FiltersrvController类中其初始化源码如下 public boolean initialize() {MixAll.printObjectProperties(log, this.filtersrvConfig);this.remotingServer new NettyRemotingServer(this.nettyServerConfig);this.remotingExecutor Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(),new ThreadFactoryImpl(RemotingExecutorThread_));this.registerProcessor();// 固定间隔注册到Brokerthis.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {FiltersrvController.this.registerFilterServerToBroker();}}, 15, 10, TimeUnit.SECONDS);this.defaultMQPullConsumer.setBrokerSuspendMaxTimeMillis(this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis() - 1000);this.defaultMQPullConsumer.setConsumerTimeoutMillisWhenSuspend(this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() - 1000);this.defaultMQPullConsumer.setNamesrvAddr(this.filtersrvConfig.getNamesrvAddr());this.defaultMQPullConsumer.setInstanceName(String.valueOf(UtilAll.getPid()));return true; }调用了FiltersrvController#registerFilterServerToBroker public void registerFilterServerToBroker() {try {RegisterFilterServerResponseHeader responseHeader this.filterServerOuterAPI.registerFilterServerToBroker(this.filtersrvConfig.getConnectWhichBroker(), this.localAddr());this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().setDefaultBrokerId(responseHeader.getBrokerId());if (null this.brokerName) {this.brokerName responseHeader.getBrokerName();}log.info(register filter server{} to broker{} OK, Return: {} {},this.localAddr(),this.filtersrvConfig.getConnectWhichBroker(),responseHeader.getBrokerName(),responseHeader.getBrokerId());} catch (Exception e) {log.warn(register filter server Exception, e);log.warn(access broker failed, kill oneself);System.exit(-1); // 异常退出} }此方法会去 注册Filtersrv 到 Broker如果注册失败会关闭Filtersrv。其中注册Flitersrv到Broker调用的FilterServerOuterAPI的registerFilterServerToBroker方法 public RegisterFilterServerResponseHeader registerFilterServerToBroker(final String brokerAddr,final String filterServerAddr ) throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {RegisterFilterServerRequestHeader requestHeader new RegisterFilterServerRequestHeader();requestHeader.setFilterServerAddr(filterServerAddr);RemotingCommand request RemotingCommand.createRequestCommand(RequestCode.REGISTER_FILTER_SERVER, requestHeader);RemotingCommand response this.remotingClient.invokeSync(brokerAddr, request, 3000);assert response ! null;switch (response.getCode()) {case ResponseCode.SUCCESS: {RegisterFilterServerResponseHeader responseHeader (RegisterFilterServerResponseHeader) response.decodeCommandCustomHeader(RegisterFilterServerResponseHeader.class);return responseHeader;}default:break;}throw new MQBrokerException(response.getCode(), response.getRemark()); }此方法构建注册请求将filterServerAddr注册到Broker。 过滤类 Consumer会上传过滤类代码给Flitersrv然后Flitersrv编译过滤类代码使用。其关系如下图 Consumer发起订阅设置过滤类代码 Consumer在发起订阅时可以进行过滤类代码的设置DefaultMQPushConsumer#subscribe // 将主题订阅到消费订阅。 // 参数 // topic – 要消费的主题。 // fullClassName – 全类名必须扩展 org.apache.rocketmq.common.filter。消息过滤器 // filterClassSource – 类源代码采用UTF-8文件编码必须对你的代码安全负责 Override public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {this.defaultMQPushConsumerImpl.subscribe(topic, fullClassName, filterClassSource); }Consumer上传过滤类代码 在 Consumer 心跳注册到 Broker 的同时上传 过滤类代码 到 Broker 对应的所有 Filtersrv。类MQClientInstance核心代码如下 public void sendHeartbeatToAllBrokerWithLock() {if (this.lockHeartbeat.tryLock()) {try {// 发送心跳到Brokerthis.sendHeartbeatToAllBroker();// 上传过滤类源码到Filtersrvthis.uploadFilterClassSource();} catch (final Exception e) {log.error(sendHeartbeatToAllBroker exception, e);} finally {this.lockHeartbeat.unlock();}} else {log.warn(lock heartBeat, but failed.);} }上传过滤类源码到Filtersrv调用方法MQClientInstance#uploadFilterClassSource /*** 上传过滤类到Filtersrv*/ private void uploadFilterClassSource() {IteratorEntryString, MQConsumerInner it this.consumerTable.entrySet().iterator();while (it.hasNext()) {EntryString, MQConsumerInner next it.next();MQConsumerInner consumer next.getValue();if (ConsumeType.CONSUME_PASSIVELY consumer.consumeType()) {SetSubscriptionData subscriptions consumer.subscriptions();for (SubscriptionData sub : subscriptions) {if (sub.isClassFilterMode() sub.getFilterClassSource() ! null) {final String consumerGroup consumer.groupName();final String className sub.getSubString();final String topic sub.getTopic();final String filterClassSource sub.getFilterClassSource();try {this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic, filterClassSource);} catch (Exception e) {log.error(uploadFilterClassToAllFilterServer Exception, e);}}}}} }Flitersrv编译过滤类代码 Filtersrv 编译使用 Consumer 上传的 过滤类代码。核心代码在FilterClassManager#registerFilterClass /*** 注册过滤类** param consumerGroup 消费分组* param topic Topic* param className 过滤类名* param classCRC 过滤类源码CRC* param filterSourceBinary 过滤类源码* return 是否注册成功*/ public boolean registerFilterClass(final String consumerGroup, final String topic,final String className, final int classCRC, final byte[] filterSourceBinary) {final String key buildKey(consumerGroup, topic);// 判断是否要注册新的过滤类boolean registerNew false;FilterClassInfo filterClassInfoPrev this.filterClassTable.get(key);if (null filterClassInfoPrev) {registerNew true;} else {if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) {if (filterClassInfoPrev.getClassCRC() ! classCRC classCRC ! 0) { // 类有变化registerNew true;}}}// 注册新的过滤类if (registerNew) {synchronized (this.compileLock) {filterClassInfoPrev this.filterClassTable.get(key);if (null ! filterClassInfoPrev filterClassInfoPrev.getClassCRC() classCRC) {return true;}try {FilterClassInfo filterClassInfoNew new FilterClassInfo();filterClassInfoNew.setClassName(className);filterClassInfoNew.setClassCRC(0);filterClassInfoNew.setMessageFilter(null);if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) {String javaSource new String(filterSourceBinary, MixAll.DEFAULT_CHARSET);// 编译新的过滤类Class? newClass DynaCode.compileAndLoadClass(className, javaSource);// 创建新的过滤类对象Object newInstance newClass.newInstance();filterClassInfoNew.setMessageFilter((MessageFilter) newInstance);filterClassInfoNew.setClassCRC(classCRC);}this.filterClassTable.put(key, filterClassInfoNew);} catch (Throwable e) {String info String.format(FilterServer, registerFilterClass Exception, consumerGroup: %s topic: %s className: %s,consumerGroup, topic, className);log.error(info, e);return false;}}}return true; }此方法将传进来的源代码编译生成一个MessageFilter示例保存在filterClassTable中。 过滤消息 再次回顾Flitersrv在整体中的位置Consumer从Broker拉取消息的时候会经过Flitersrv过滤消息。实际上是从Flitersrv拉取消息。 Consumer 从 Filtersrv 拉取消息 Consumer 拉取 使用过滤类方式订阅 的消费消息时从 Broker 对应的 Filtersrv 列表随机选择一个拉取消息。如果选择不到 Filtersrv则无法拉取消息。因此Filtersrv 一定要做高可用。拉取消息的核心代码在类PullAPIWrapper#pullKernelImpl方法中 /*** 拉取消息核心方法** param mq 消息队列* param subExpression 订阅表达式* param subVersion 订阅版本号* param offset 拉取队列开始位置* param maxNums 批量拉取消息数量* param sysFlag 拉取系统标识* param commitOffset 提交消费进度* param brokerSuspendMaxTimeMillis broker挂起请求最大时间* param timeoutMillis 请求broker超时时间* param communicationMode 通讯模式* param pullCallback 拉取回调* return 拉取消息结果。只有通讯模式为同步时才返回结果否则返回null。* throws MQClientException 当寻找不到 broker 时或发生其他client异常* throws RemotingException 当远程调用发生异常时* throws MQBrokerException 当 broker 发生异常时。只有通讯模式为同步时才会发生该异常。* throws InterruptedException 当发生中断异常时*/ protected PullResult pullKernelImpl(final MessageQueue mq,final String subExpression,final long subVersion,final long offset,final int maxNums,final int sysFlag,final long commitOffset,final long brokerSuspendMaxTimeMillis,final long timeoutMillis,final CommunicationMode communicationMode,final PullCallback pullCallback ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// 获取Broker信息FindBrokerResult findBrokerResult this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false);if (null findBrokerResult) {this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());findBrokerResult this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false);}// 请求拉取消息if (findBrokerResult ! null) {int sysFlagInner sysFlag;if (findBrokerResult.isSlave()) {sysFlagInner PullSysFlag.clearCommitOffsetFlag(sysFlagInner);}PullMessageRequestHeader requestHeader new PullMessageRequestHeader();requestHeader.setConsumerGroup(this.consumerGroup);requestHeader.setTopic(mq.getTopic());requestHeader.setQueueId(mq.getQueueId());requestHeader.setQueueOffset(offset);requestHeader.setMaxMsgNums(maxNums);requestHeader.setSysFlag(sysFlagInner);requestHeader.setCommitOffset(commitOffset);requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);requestHeader.setSubscription(subExpression);requestHeader.setSubVersion(subVersion);// 若订阅topic使用过滤类使用filtersrv获取消息String brokerAddr findBrokerResult.getBrokerAddr();if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {brokerAddr computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);}PullResult pullResult this.mQClientFactory.getMQClientAPIImpl().pullMessage(brokerAddr,requestHeader,timeoutMillis,communicationMode,pullCallback);return pullResult;}// Broker信息不存在则抛出异常throw new MQClientException(The broker[ mq.getBrokerName() ] not exist, null); }代码第68行计算从哪个Flitersrv拉取消息PullAPIWrapper#computPullFromWhichFilterServer /*** 计算filtersrv地址。如果有多个filtersrv随机选择一个。** param topic Topic* param brokerAddr broker地址* return filtersrv地址* throws MQClientException 当filtersrv不存在时*/ private String computPullFromWhichFilterServer(final String topic, final String brokerAddr) throws MQClientException {ConcurrentHashMapString, TopicRouteData topicRouteTable this.mQClientFactory.getTopicRouteTable();if (topicRouteTable ! null) {TopicRouteData topicRouteData topicRouteTable.get(topic);ListString list topicRouteData.getFilterServerTable().get(brokerAddr);if (list ! null !list.isEmpty()) {return list.get(randomNum() % list.size());}}throw new MQClientException(Find Filter Server Failed, Broker Addr: brokerAddr topic: topic, null); }Flitersrv从Broker拉取消息 Flitersrv 向 Broker 拉取消息时实际使用的 DefaultMQPullConsumer.java 的方法和逻辑DefaultRequestProcessor#pullMessageForward /*** 拉取消息** param ctx 拉取消息context* param request 拉取消息请求* return 响应* throws Exception 当发生异常时*/ private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx, final RemotingCommand request) throws Exception {final RemotingCommand response RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);final PullMessageResponseHeader responseHeader (PullMessageResponseHeader) response.readCustomHeader();final PullMessageRequestHeader requestHeader (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);final FilterContext filterContext new FilterContext();filterContext.setConsumerGroup(requestHeader.getConsumerGroup());response.setOpaque(request.getOpaque());DefaultMQPullConsumer pullConsumer this.filtersrvController.getDefaultMQPullConsumer();// 校验Topic过滤类是否完整final FilterClassInfo findFilterClass this.filtersrvController.getFilterClassManager().findFilterClass(requestHeader.getConsumerGroup(), requestHeader.getTopic());if (null findFilterClass) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(Find Filter class failed, not registered);return response;}if (null findFilterClass.getMessageFilter()) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(Find Filter class failed, registered but no class);return response;}// 设置下次请求从 Broker主节点。responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);MessageQueue mq new MessageQueue();mq.setTopic(requestHeader.getTopic());mq.setQueueId(requestHeader.getQueueId());mq.setBrokerName(this.filtersrvController.getBrokerName());long offset requestHeader.getQueueOffset();int maxNums requestHeader.getMaxMsgNums();final PullCallback pullCallback new PullCallback() {Overridepublic void onSuccess(PullResult pullResult) {responseHeader.setMaxOffset(pullResult.getMaxOffset());responseHeader.setMinOffset(pullResult.getMinOffset());responseHeader.setNextBeginOffset(pullResult.getNextBeginOffset());response.setRemark(null);switch (pullResult.getPullStatus()) {case FOUND:response.setCode(ResponseCode.SUCCESS);ListMessageExt msgListOK new ArrayListMessageExt();try {for (MessageExt msg : pullResult.getMsgFoundList()) {// 使用过滤类过滤消息boolean match findFilterClass.getMessageFilter().match(msg, filterContext);if (match) {msgListOK.add(msg);}}if (!msgListOK.isEmpty()) {returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, msgListOK);return;} else {response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);}} catch (Throwable e) {final String error String.format(do Message Filter Exception, ConsumerGroup: %s Topic: %s ,requestHeader.getConsumerGroup(), requestHeader.getTopic());log.error(error, e);response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(error RemotingHelper.exceptionSimpleDesc(e));returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);return;}break;case NO_MATCHED_MSG:response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);break;case NO_NEW_MSG:response.setCode(ResponseCode.PULL_NOT_FOUND);break;case OFFSET_ILLEGAL:response.setCode(ResponseCode.PULL_OFFSET_MOVED);break;default:break;}returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);}Overridepublic void onException(Throwable e) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(Pull Callback Exception, RemotingHelper.exceptionSimpleDesc(e));returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);return;}};// 拉取消息pullConsumer.pullBlockIfNotFound(mq, null, offset, maxNums, pullCallback);return null; }此方法通过DefaultMQPullConsumer执行消息拉取在回调方法PullCallback中上述代码62行执行过滤逻辑没被过滤的才保存。 Flitersrv的高可用 通过多个Flitersrv完成高可用多个Flitersrv部署的示意图如下 一个Consumer对应多个Flitersrv一个Flitersrv对应一个Broker一个Broker对应多个Flitersrv Consumer会从所有Broker对应的Flitersrv中随即选择一个进行消息拉取。 总结 Flitersrv为了减少Broker的负担且减少Consumer接收无用消息而生。Flitersrv作为中间层Consumer订阅时传过滤类给BrokerBroker将过滤类传给FlitersrvFlitersrv处理并实例化过滤类。消息拉取时Consumer向Flitersrv拉取消息Flitersrv先向Broker拉取消息然后经过过滤类的过滤再将满足条件的消息传给Consumer。
http://www.pierceye.com/news/763211/

相关文章:

  • 大型网站建设的难点是什么物联网技术
  • 怎么免费建个免费的站点写作网站5妙不写就删除
  • 深圳网站建设软件开发公司排名网站做301的坏处
  • ai网站制作的图片
  • 自己想开个网站怎么弄移动端网站设计欣赏
  • 国外网站建站上海品牌策划设计
  • 郑州网站制作选择乐云seo网站建设误区图
  • 湖南智能网站建设多少钱会声会影免费模板网站
  • 社区网站建设方案书建站之星官方网站
  • 过时的网站什么公司做企业网站
  • 最新企业网站搜索引擎优化是做什么
  • 提高网站公信力 单仁手机设计培训网站建设
  • asp.net网站管理系统域名注册报备
  • 买了个网站后怎么做如何提高 网站的点击量
  • 哪些行业网站推广做的多o2o商城源码
  • 北京seo站内优化电商网站前端页面响应式设计
  • 贵港seo关键词整站优化网站恶意攻击
  • 王磊网络网站建设公关
  • 怎么建网站做推广win网站建设
  • 在线做英语题的网站wordpress被设置不录入
  • 桃花岛网站是什么翻硬币网站怎么做
  • 做海报的网站有哪些内容windows同步wordpress
  • 制作网页的网站费用属于资本性支出吗安徽区块链虚拟币网站开发方案
  • 做网站前产品经理要了解什么搜索引擎优化免费
  • 广州网站建设技术方案营销网站推广策略
  • 郑州网站建设、中国菲律宾铁路项目
  • 潜江网站开发学校网站建设领导小组
  • 桂林临桂区建设局网站厦门 微网站建设公司哪家好
  • 如何用云服务器搭建个人网站有些人做网站不用钱的,对吗?
  • 月嫂网站建设方案建设网站询价对比表模板