当前位置: 首页 > news >正文

南通科技网站建设辽宁省网站制作公司排名

南通科技网站建设,辽宁省网站制作公司排名,如何给网站做快速排名,深圳市万齐创想科技有限公司前言 ConsumerOffsetManager负责管理Broker端的topicConfig元数据信息#xff0c;它继承了ConfigManager组件#xff0c;且定时将内存中维护的topic元数据信息#xff0c;注册到远程NameServer集群#xff0c;并持久化到磁盘文件。 源码版本#xff1a;4.9.3 源码架构图…前言 ConsumerOffsetManager负责管理Broker端的topicConfig元数据信息它继承了ConfigManager组件且定时将内存中维护的topic元数据信息注册到远程NameServer集群并持久化到磁盘文件。 源码版本4.9.3 源码架构图 核心数据结构 topic元数据管理组件最核心的数据结构是它所维护的topicConfigTable topic元数据表key是topicNamevalue是topic元数据对象。 public class TopicConfigManager extends ConfigManager {private static final InternalLogger log InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);// 锁超时时间3秒private static final long LOCK_TIMEOUT_MILLIS 3000;// 调度队列数量 18个队列private static final int SCHEDULE_TOPIC_QUEUE_NUM 18;// topic元数据锁private transient final Lock topicConfigTableLock new ReentrantLock();// 核心topic元数据表key是topicName value是topic元数据private final ConcurrentMapString, TopicConfig topicConfigTable new ConcurrentHashMapString, TopicConfig(1024);// 数据版本private final DataVersion dataVersion new DataVersion();private transient BrokerController brokerController; } 深入看一下TopicConfig元数据对象主要包含了主题名称、队列数量、权限、过滤类型等核心结构。 public class TopicConfig {private static final String SEPARATOR ;// 默认读写队列数量都是16个public static int defaultReadQueueNums 16;public static int defaultWriteQueueNums 16;// 主题名称private String topicName;private int readQueueNums defaultReadQueueNums;private int writeQueueNums defaultWriteQueueNums;// 权限private int perm PermName.PERM_READ | PermName.PERM_WRITE;// 过滤类型private TopicFilterType topicFilterType TopicFilterType.SINGLE_TAG;// 系统标识private int topicSysFlag 0;private boolean order false; } 接下来看一下topic元数据管理组件的核心行为。在创建对象实例阶段初始化了大量系统级topic元数据且继承了ConfigManager的数据持久化与加载磁盘文件能力。并且提供了大量可以自动创建topic的方法在创建和更新内存中的topic元数据后会更新将元数据信息并发注册到远程所有NameServer集群节点使用到了countDownLatch、oneWay通信且持久化到磁盘文件。 下面是topic元数据管理组件所有的方法行为 public class TopicConfigManager extends ConfigManager {private static final InternalLogger log InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);// 锁超时时间3秒private static final long LOCK_TIMEOUT_MILLIS 3000;// 调度队列数量 18个队列private static final int SCHEDULE_TOPIC_QUEUE_NUM 18;// topic元数据锁private transient final Lock topicConfigTableLock new ReentrantLock();// 核心topic元数据表key是topicName value是topic元数据private final ConcurrentMapString, TopicConfig topicConfigTable new ConcurrentHashMapString, TopicConfig(1024);// 数据版本private final DataVersion dataVersion new DataVersion();private transient BrokerController brokerController;public TopicConfigManager() {}public TopicConfigManager(BrokerController brokerController) {this.brokerController brokerController;// 初始化系统元数据{// 系统测试topicString topic TopicValidator.RMQ_SYS_SELF_TEST_TOPIC;TopicConfig topicConfig new TopicConfig(topic);TopicValidator.addSystemTopic(topic);topicConfig.setReadQueueNums(1);topicConfig.setWriteQueueNums(1);this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);}{// 自动创建topicif (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {String topic TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC;TopicConfig topicConfig new TopicConfig(topic);TopicValidator.addSystemTopic(topic);topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig().getDefaultTopicQueueNums());topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig().getDefaultTopicQueueNums());int perm PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;topicConfig.setPerm(perm);this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);}}{// 性能测试topicString topic TopicValidator.RMQ_SYS_BENCHMARK_TOPIC;TopicConfig topicConfig new TopicConfig(topic);TopicValidator.addSystemTopic(topic);topicConfig.setReadQueueNums(1024);topicConfig.setWriteQueueNums(1024);this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);}{// 集群topicString topic this.brokerController.getBrokerConfig().getBrokerClusterName();TopicConfig topicConfig new TopicConfig(topic);TopicValidator.addSystemTopic(topic);int perm PermName.PERM_INHERIT;if (this.brokerController.getBrokerConfig().isClusterTopicEnable()) {perm | PermName.PERM_READ | PermName.PERM_WRITE;}topicConfig.setPerm(perm);this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);}{// broker topicString topic this.brokerController.getBrokerConfig().getBrokerName();TopicConfig topicConfig new TopicConfig(topic);TopicValidator.addSystemTopic(topic);int perm PermName.PERM_INHERIT;if (this.brokerController.getBrokerConfig().isBrokerTopicEnable()) {perm | PermName.PERM_READ | PermName.PERM_WRITE;}topicConfig.setReadQueueNums(1);topicConfig.setWriteQueueNums(1);topicConfig.setPerm(perm);this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);}{// offset moved event topicString topic TopicValidator.RMQ_SYS_OFFSET_MOVED_EVENT;TopicConfig topicConfig new TopicConfig(topic);TopicValidator.addSystemTopic(topic);topicConfig.setReadQueueNums(1);topicConfig.setWriteQueueNums(1);this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);}{// schedule topicString topic TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;TopicConfig topicConfig new TopicConfig(topic);TopicValidator.addSystemTopic(topic);topicConfig.setReadQueueNums(SCHEDULE_TOPIC_QUEUE_NUM);topicConfig.setWriteQueueNums(SCHEDULE_TOPIC_QUEUE_NUM);this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);}{// trace topicif (this.brokerController.getBrokerConfig().isTraceTopicEnable()) {String topic this.brokerController.getBrokerConfig().getMsgTraceTopicName();TopicConfig topicConfig new TopicConfig(topic);TopicValidator.addSystemTopic(topic);topicConfig.setReadQueueNums(1);topicConfig.setWriteQueueNums(1);this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);}}{// reply topicString topic this.brokerController.getBrokerConfig().getBrokerClusterName() _ MixAll.REPLY_TOPIC_POSTFIX;TopicConfig topicConfig new TopicConfig(topic);TopicValidator.addSystemTopic(topic);topicConfig.setReadQueueNums(1);topicConfig.setWriteQueueNums(1);this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);}}// 查询topic元数据public TopicConfig selectTopicConfig(final String topic) {return this.topicConfigTable.get(topic);}// 在发送消息时自动创建topicpublic TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic,final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) {TopicConfig topicConfig null;boolean createNew false;try {// 获取一把全局锁防止并发创建topic超时时间3秒if (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {// 如果存在topic元数据直接返回topic元数据topicConfig this.topicConfigTable.get(topic);if (topicConfig ! null)return topicConfig;// 获取默认topic元数据TopicConfig defaultTopicConfig this.topicConfigTable.get(defaultTopic);if (defaultTopicConfig ! null) {// 如果默认topic元数据是允许自动创建if (defaultTopic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) {// 如果禁止自动创建则默认topic元数据权限为只读if (!this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {defaultTopicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);}}// 如果默认topic元数据是允许继承根据默认topic创建if (PermName.isInherited(defaultTopicConfig.getPerm())) {topicConfig new TopicConfig(topic);int queueNums Math.min(clientDefaultTopicQueueNums, defaultTopicConfig.getWriteQueueNums());if (queueNums 0) {queueNums 0;}topicConfig.setReadQueueNums(queueNums);topicConfig.setWriteQueueNums(queueNums);int perm defaultTopicConfig.getPerm();perm ~PermName.PERM_INHERIT;topicConfig.setPerm(perm);topicConfig.setTopicSysFlag(topicSysFlag);topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());} else {log.warn(Create new topic failed, because the default topic[{}] has no perm [{}] producer:[{}],defaultTopic, defaultTopicConfig.getPerm(), remoteAddress);}} else {log.warn(Create new topic failed, because the default topic[{}] not exist. producer:[{}],defaultTopic, remoteAddress);}if (topicConfig ! null) {log.info(Create new topic by default topic:[{}] config:[{}] producer:[{}],defaultTopic, topicConfig, remoteAddress);// 将topic元数据添加到topicConfigTable中this.topicConfigTable.put(topic, topicConfig);// 升级版本号this.dataVersion.nextVersion();// 创建topiccreateNew true;// 持久化元数据this.persist();}} finally {this.topicConfigTableLock.unlock();}}} catch (InterruptedException e) {log.error(createTopicInSendMessageMethod exception, e);}if (createNew) {this.brokerController.registerBrokerAll(false, true, true);}return topicConfig;}public TopicConfig createTopicInSendMessageBackMethod(final String topic,final int clientDefaultTopicQueueNums,final int perm,final int topicSysFlag) {TopicConfig topicConfig this.topicConfigTable.get(topic);if (topicConfig ! null)return topicConfig;boolean createNew false;try {// 获取一把全局锁防止并发创建topic超时时间3秒if (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {// 如果存在元数据直接返回topicConfig this.topicConfigTable.get(topic);if (topicConfig ! null)return topicConfig;topicConfig new TopicConfig(topic);topicConfig.setReadQueueNums(clientDefaultTopicQueueNums);topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums);topicConfig.setPerm(perm);topicConfig.setTopicSysFlag(topicSysFlag);// 如果不存在则创建log.info(create new topic {}, topicConfig);this.topicConfigTable.put(topic, topicConfig);createNew true;// 升级版本号this.dataVersion.nextVersion();// 持久化元数据this.persist();} finally {// 释放锁this.topicConfigTableLock.unlock();}}} catch (InterruptedException e) {log.error(createTopicInSendMessageBackMethod exception, e);}if (createNew) {// 注册所有broker信息到nameServerthis.brokerController.registerBrokerAll(false, true, true);}return topicConfig;}// 创建事务消息检查最大时间的topicpublic TopicConfig createTopicOfTranCheckMaxTime(final int clientDefaultTopicQueueNums, final int perm) {TopicConfig topicConfig this.topicConfigTable.get(TopicValidator.RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC);if (topicConfig ! null)return topicConfig;boolean createNew false;try {if (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {topicConfig this.topicConfigTable.get(TopicValidator.RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC);if (topicConfig ! null)return topicConfig;topicConfig new TopicConfig(TopicValidator.RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC);topicConfig.setReadQueueNums(clientDefaultTopicQueueNums);topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums);topicConfig.setPerm(perm);topicConfig.setTopicSysFlag(0);log.info(create new topic {}, topicConfig);this.topicConfigTable.put(TopicValidator.RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC, topicConfig);createNew true;this.dataVersion.nextVersion();this.persist();} finally {this.topicConfigTableLock.unlock();}}} catch (InterruptedException e) {log.error(create TRANS_CHECK_MAX_TIME_TOPIC exception, e);}if (createNew) {this.brokerController.registerBrokerAll(false, true, true);}return topicConfig;}// 更新topic的unit flagpublic void updateTopicUnitFlag(final String topic, final boolean unit) {TopicConfig topicConfig this.topicConfigTable.get(topic);if (topicConfig ! null) {// 更新单元标int oldTopicSysFlag topicConfig.getTopicSysFlag();if (unit) {topicConfig.setTopicSysFlag(TopicSysFlag.setUnitFlag(oldTopicSysFlag));} else {topicConfig.setTopicSysFlag(TopicSysFlag.clearUnitFlag(oldTopicSysFlag));}log.info(update topic sys flag. oldTopicSysFlag{}, newTopicSysFlag{}, oldTopicSysFlag,topicConfig.getTopicSysFlag());// 存入内存数元数据tablethis.topicConfigTable.put(topic, topicConfig);// 更新数据版本号this.dataVersion.nextVersion();// 持久化this.persist();// 注册所有broker信息到nameServerthis.brokerController.registerBrokerAll(false, true, true);}}// 更新topic的unit sub flagpublic void updateTopicUnitSubFlag(final String topic, final boolean hasUnitSub) {TopicConfig topicConfig this.topicConfigTable.get(topic);if (topicConfig ! null) {int oldTopicSysFlag topicConfig.getTopicSysFlag();if (hasUnitSub) {topicConfig.setTopicSysFlag(TopicSysFlag.setUnitSubFlag(oldTopicSysFlag));} else {topicConfig.setTopicSysFlag(TopicSysFlag.clearUnitSubFlag(oldTopicSysFlag));}log.info(update topic sys flag. oldTopicSysFlag{}, newTopicSysFlag{}, oldTopicSysFlag,topicConfig.getTopicSysFlag());this.topicConfigTable.put(topic, topicConfig);this.dataVersion.nextVersion();this.persist();this.brokerController.registerBrokerAll(false, true, true);}}// 更新topic元数据public void updateTopicConfig(final TopicConfig topicConfig) {TopicConfig old this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);if (old ! null) {log.info(update topic config, old:[{}] new:[{}], old, topicConfig);} else {log.info(create new topic [{}], topicConfig);}// 更新数据版本号this.dataVersion.nextVersion();// 持久化元数据到磁盘this.persist();}// 更新顺序 topic元数据public void updateOrderTopicConfig(final KVTable orderKVTableFromNs) {if (orderKVTableFromNs ! null orderKVTableFromNs.getTable() ! null) {boolean isChange false;SetString orderTopics orderKVTableFromNs.getTable().keySet();for (String topic : orderTopics) {TopicConfig topicConfig this.topicConfigTable.get(topic);if (topicConfig ! null !topicConfig.isOrder()) {// 在顺序topic列表中更新topic为顺序topictopicConfig.setOrder(true);isChange true;log.info(update order topic config, topic{}, order{}, topic, true);}}for (Map.EntryString, TopicConfig entry : this.topicConfigTable.entrySet()) {String topic entry.getKey();if (!orderTopics.contains(topic)) {// 不在顺序topic列表中更新为非顺序topicTopicConfig topicConfig entry.getValue();if (topicConfig.isOrder()) {topicConfig.setOrder(false);isChange true;log.info(update order topic config, topic{}, order{}, topic, false);}}}if (isChange) {// 更新数据版本号this.dataVersion.nextVersion();// 持久化元数据到磁盘this.persist();}}}public boolean isOrderTopic(final String topic) {TopicConfig topicConfig this.topicConfigTable.get(topic);if (topicConfig null) {return false;} else {return topicConfig.isOrder();}}// 删除topic元数据并持久化public void deleteTopicConfig(final String topic) {TopicConfig old this.topicConfigTable.remove(topic);if (old ! null) {log.info(delete topic config OK, topic: {}, old);this.dataVersion.nextVersion();this.persist();} else {log.warn(delete topic config failed, topic: {} not exists, topic);}}public TopicConfigSerializeWrapper buildTopicConfigSerializeWrapper() {TopicConfigSerializeWrapper topicConfigSerializeWrapper new TopicConfigSerializeWrapper();topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable);topicConfigSerializeWrapper.setDataVersion(this.dataVersion);return topicConfigSerializeWrapper;}Overridepublic String encode() {return encode(false);}Overridepublic String configFilePath() {return BrokerPathConfigHelper.getTopicConfigPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir());}// 解码json字符串解析为topic元数据结构Overridepublic void decode(String jsonString) {if (jsonString ! null) {TopicConfigSerializeWrapper topicConfigSerializeWrapper TopicConfigSerializeWrapper.fromJson(jsonString, TopicConfigSerializeWrapper.class);if (topicConfigSerializeWrapper ! null) {this.topicConfigTable.putAll(topicConfigSerializeWrapper.getTopicConfigTable());this.dataVersion.assignNewOne(topicConfigSerializeWrapper.getDataVersion());this.printLoadDataWhenFirstBoot(topicConfigSerializeWrapper);}}}// 将核心数据结构内容编码为json字符串public String encode(final boolean prettyFormat) {TopicConfigSerializeWrapper topicConfigSerializeWrapper new TopicConfigSerializeWrapper();topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable);topicConfigSerializeWrapper.setDataVersion(this.dataVersion);return topicConfigSerializeWrapper.toJson(prettyFormat);}private void printLoadDataWhenFirstBoot(final TopicConfigSerializeWrapper tcs) {IteratorEntryString, TopicConfig it tcs.getTopicConfigTable().entrySet().iterator();while (it.hasNext()) {EntryString, TopicConfig next it.next();log.info(load exist local topic, {}, next.getValue().toString());}}public DataVersion getDataVersion() {return dataVersion;}public ConcurrentMapString, TopicConfig getTopicConfigTable() {return topicConfigTable;} }
http://www.pierceye.com/news/186671/

相关文章:

  • 网站不备案不能访问吗wordpress主题开发404页面
  • 工作总结个人总结自动app优化下载
  • 网站开发推荐书籍比较大的外贸网站
  • 上饶建设网站郑州网
  • 做淘宝客网站一定要备案吗没有网站域名备案
  • 用QQ群做网站排名慈溪网站制作哪家最好
  • 兴宁市网站建设手工艺品网站建设策划书
  • flash做网站导航网站品牌建设流程
  • 公司建设网站属于什么费用网站打模块
  • 网站建设应注意的问题网站备案验证码错误
  • 网站核验点网站自己怎么做的
  • 购物网站建设平台canvas可画网页版
  • 企业信息平台系统网站推广优化建设
  • 免费网站模板制作自助建站上建的网站免费吗
  • 深圳市网站建设外包公司门户网站代码结构
  • 昆明做网站建设找谁最新版在线 网
  • 东昌府聊城网站建设网站广告做的好的企业案例分析
  • asp三层架构做网站网站开发前端基础
  • 医院网站建设方案策划书把网站做成app的软件下载
  • 网站建设实践报告3000字wordpress消息提示插件
  • 网站制作的评价标准做网站后台需要什么
  • 学院网站建设服务宗旨实惠的网站建设产品
  • 网站改名 备案影视制作
  • 网站开发亿码酷技术网站建设选谋者
  • 智能家居网站模板怎样做网站标题优化
  • 深圳制作网站制作公司哪家好最简洁 wordpress主题
  • 重庆忠县网站建设公司推荐国内公关公司
  • 给彩票网站做代理违法吗wordpress文章与页面关联
  • 网站标题加后缀模拟ip访问网站
  • 临清网站建设费用什么是网络营销的基础