龙之向导外贸官方网站,商城网站的搜索记录代码怎么做,百度做的网站字体侵权,图书网站开发的实践意义消息中间件 RocketMQ 高级功能和源码分析#xff08;八#xff09;
一、消息中间件 RocketMQ 源码分析#xff1a;实时更新消息消费队列与索引文件流程说明
1、实时更新消息消费队列与索引文件
消息消费队文件、消息属性索引文件都是基于 CommitLog 文件构建的#xff0…消息中间件 RocketMQ 高级功能和源码分析八
一、消息中间件 RocketMQ 源码分析实时更新消息消费队列与索引文件流程说明
1、实时更新消息消费队列与索引文件
消息消费队文件、消息属性索引文件都是基于 CommitLog 文件构建的当消息生产者提交的消息存储在 CommitLog 文件中ConsumerQueue、IndexFile 需要及时更新否则消息无法及时被消费根据消息属性查找消息也会出现较大延迟。RocketMQ 通过开启一个线程 ReputMessageService 来准实时转发 CommitLog 文件更新事件相应的任务处理器根据转发的消息及时更新 ConsumerQueue、IndexFile 文件。
2、消息存储结构 示例图 3、构建消息消费队列和索引文件 示例图 4、 代码DefaultMessageStorestart //设置CommitLog内存中最大偏移量
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
//启动
this.reputMessageService.start();5、 代码DefaultMessageStorerun public void run() {DefaultMessageStore.log.info(this.getServiceName() service started);//每隔1毫秒就继续尝试推送消息到消息消费队列和索引文件while (!this.isStopped()) {try {Thread.sleep(1);this.doReput();} catch (Exception e) {DefaultMessageStore.log.warn(this.getServiceName() service has exception. , e);}}DefaultMessageStore.log.info(this.getServiceName() service end);
}6、 代码DefaultMessageStoredeReput //从result中循环遍历消息,一次读一条,创建DispatherRequest对象。
for (int readSize 0; readSize result.getSize() doNext; ) {DispatchRequest dispatchRequest DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);int size dispatchRequest.getBufferSize() -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();if (dispatchRequest.isSuccess()) {if (size 0) {DefaultMessageStore.this.doDispatch(dispatchRequest);}}
}7、 DispatchRequest String topic; //消息主题名称
int queueId; //消息队列ID
long commitLogOffset; //消息物理偏移量
int msgSize; //消息长度
long tagsCode; //消息过滤tag hashCode
long storeTimestamp; //消息存储时间戳
long consumeQueueOffset; //消息队列偏移量
String keys; //消息索引key
boolean success; //是否成功解析到完整的消息
String uniqKey; //消息唯一键
int sysFlag; //消息系统标记
long preparedTransactionOffset; //消息预处理事务偏移量
MapString, String propertiesMap; //消息属性
byte[] bitMap; //位图二、消息中间件 RocketMQ 源码分析转发数据到 ConsumerQueue 文件
1、转发到 ConsumerQueue 消息分发到消息消费队列 示例图 2、 代码 CommitLogDispatcherBuildConsumeQueue 类 class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {Overridepublic void dispatch(DispatchRequest request) {final int tranType MessageSysFlag.getTransactionValue(request.getSysFlag());switch (tranType) {case MessageSysFlag.TRANSACTION_NOT_TYPE:case MessageSysFlag.TRANSACTION_COMMIT_TYPE://消息分发DefaultMessageStore.this.putMessagePositionInfo(request);break;case MessageSysFlag.TRANSACTION_PREPARED_TYPE:case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:break;}}
}3、 代码DefaultMessageStore#putMessagePositionInfo public void putMessagePositionInfo(DispatchRequest dispatchRequest) {//获得消费队列ConsumeQueue cq this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());//消费队列分发消息cq.putMessagePositionInfoWrapper(dispatchRequest);
}4、 代码DefaultMessageStore#putMessagePositionInfo //依次将消息偏移量、消息长度、tag写入到ByteBuffer中
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);
//获得内存映射文件
MappedFile mappedFile this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
if (mappedFile ! null) {//将消息追加到内存映射文件,异步输盘return mappedFile.appendMessage(this.byteBufferIndex.array());
}三、消息中间件 RocketMQ 源码分析转发 IndexFile 文件
1、转发到 Index 消息分发到索引文件 示例图 2、 代码 CommitLogDispatcherBuildIndex 类 class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {Overridepublic void dispatch(DispatchRequest request) {if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {DefaultMessageStore.this.indexService.buildIndex(request);}}
}3、 代码DefaultMessageStore#buildIndex public void buildIndex(DispatchRequest req) {//获得索引文件IndexFile indexFile retryGetAndCreateIndexFile();if (indexFile ! null) {//获得文件最大物理偏移量long endPhyOffset indexFile.getEndPhyOffset();DispatchRequest msg req;String topic msg.getTopic();String keys msg.getKeys();//如果该消息的物理偏移量小于索引文件中的最大物理偏移量,则说明是重复数据,忽略本次索引构建if (msg.getCommitLogOffset() endPhyOffset) {return;}final int tranType MessageSysFlag.getTransactionValue(msg.getSysFlag());switch (tranType) {case MessageSysFlag.TRANSACTION_NOT_TYPE:case MessageSysFlag.TRANSACTION_PREPARED_TYPE:case MessageSysFlag.TRANSACTION_COMMIT_TYPE:break;case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:return;}//如果消息ID不为空,则添加到Hash索引中if (req.getUniqKey() ! null) {indexFile putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));if (indexFile null) {return;}}//构建索引key,RocketMQ支持为同一个消息建立多个索引,多个索引键空格隔开.if (keys ! null keys.length() 0) {String[] keyset keys.split(MessageConst.KEY_SEPARATOR);for (int i 0; i keyset.length; i) {String key keyset[i];if (key.length() 0) {indexFile putKey(indexFile, msg, buildKey(topic, key));if (indexFile null) {return;}}}}} else {log.error(build index error, stop building index);}
}四、消息中间件 RocketMQ 源码分析消息队列和索引文件恢复
1、消息队列和索引文件恢复
由于 RocketMQ 存储首先将消息全量存储在 CommitLog 文件中然后异步生成转发任务更新 ConsumerQueue 和 Index 文件。如果消息成功存储到 CommitLog 文件中转发任务未成功执行此时消息服务器 Broker 由于某个愿意宕机导致CommitLog、ConsumerQueue、IndexFile 文件数据不一致。如果不加以人工修复的话会有一部分消息即便在 CommitLog 中文件中存在但由于没有转发到 ConsumerQueue这部分消息将永远复发被消费者消费。
2、文件恢复总体流程 示例图 3、存储文件加载
代码DefaultMessageStore#load
判断上一次是否异常退出。实现机制是 Broker 在启动时创建 abort 文件在退出时通过 JVM 钩子函数删除 abort 文件。如果下次启动时存在 abort 文件。说明 Broker 时异常退出的CommitLog 与 ConsumerQueue 数据有可能不一致需要进行修复。 //判断临时文件是否存在
boolean lastExitOK !this.isTempFileExist();
//根据临时文件判断当前Broker是否异常退出
private boolean isTempFileExist() {String fileName StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());File file new File(fileName);return file.exists();
}4、 代码DefaultMessageStore#load //加载延时队列
if (null ! scheduleMessageService) {result result this.scheduleMessageService.load();
}// 加载CommitLog文件
result result this.commitLog.load();// 加载消费队列文件
result result this.loadConsumeQueue();if (result) {//加载存储监测点,监测点主要记录CommitLog文件、ConsumerQueue文件、Index索引文件的刷盘点this.storeCheckpoint new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));//加载index文件this.indexService.load(lastExitOK);//根据Broker是否异常退出,执行不同的恢复策略this.recover(lastExitOK);
}5、 代码MappedFileQueue#load
加载 CommitLog 到映射文件 //指向CommitLog文件目录
File dir new File(this.storePath);
//获得文件数组
File[] files dir.listFiles();
if (files ! null) {// 文件排序Arrays.sort(files);//遍历文件for (File file : files) {//如果文件大小和配置文件不一致,退出if (file.length() ! this.mappedFileSize) {return false;}try {//创建映射文件MappedFile mappedFile new MappedFile(file.getPath(), mappedFileSize);mappedFile.setWrotePosition(this.mappedFileSize);mappedFile.setFlushedPosition(this.mappedFileSize);mappedFile.setCommittedPosition(this.mappedFileSize);//将映射文件添加到队列this.mappedFiles.add(mappedFile);log.info(load file.getPath() OK);} catch (IOException e) {log.error(load file file error, e);return false;}}
}return true;6、 代码DefaultMessageStore#loadConsumeQueue
加载消息消费队列 //执行消费队列目录
File dirLogic new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
//遍历消费队列目录
File[] fileTopicList dirLogic.listFiles();
if (fileTopicList ! null) {for (File fileTopic : fileTopicList) {//获得子目录名称,即topic名称String topic fileTopic.getName();//遍历子目录下的消费队列文件File[] fileQueueIdList fileTopic.listFiles();if (fileQueueIdList ! null) {//遍历文件for (File fileQueueId : fileQueueIdList) {//文件名称即队列IDint queueId;try {queueId Integer.parseInt(fileQueueId.getName());} catch (NumberFormatException e) {continue;}//创建消费队列并加载到内存ConsumeQueue logic new ConsumeQueue(topic,queueId,StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),this);this.putConsumeQueue(topic, queueId, logic);if (!logic.load()) {return false;}}}}
}log.info(load logics queue all over, OK);return true;7、 代码IndexService#load
加载索引文件 public boolean load(final boolean lastExitOK) {//索引文件目录File dir new File(this.storePath);//遍历索引文件File[] files dir.listFiles();if (files ! null) {//文件排序Arrays.sort(files);//遍历文件for (File file : files) {try {//加载索引文件IndexFile f new IndexFile(file.getPath(), this.hashSlotNum, this.indexNum, 0, 0);f.load();if (!lastExitOK) {//索引文件上次的刷盘时间小于该索引文件的消息时间戳,该文件将立即删除if (f.getEndTimestamp() this.defaultMessageStore.getStoreCheckpoint().getIndexMsgTimestamp()) {f.destroy(0);continue;}}//将索引文件添加到队列log.info(load index file OK, f.getFileName());this.indexFileList.add(f);} catch (IOException e) {log.error(load file {} error, file, e);return false;} catch (NumberFormatException e) {log.error(load file {} error, file, e);}}}return true;
}8、 代码DefaultMessageStore#recover
文件恢复根据 Broker 是否正常退出执行不同的恢复策略 private void recover(final boolean lastExitOK) {//获得最大的物理便宜消费队列long maxPhyOffsetOfConsumeQueue this.recoverConsumeQueue();if (lastExitOK) {//正常恢复this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);} else {//异常恢复this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);}//在CommitLog中保存每个消息消费队列当前的存储逻辑偏移量this.recoverTopicQueueTable();
}9、 代码DefaultMessageStore#recoverTopicQueueTable
恢复 ConsumerQueue 后将在 CommitLog 实例中保存每隔消息队列当前的存储逻辑偏移量这也是消息中不仅存储主题、消息队列 ID、还存储了消息队列的关键所在。 public void recoverTopicQueueTable() {HashMapString/* topic-queueid */, Long/* offset */ table new HashMapString, Long(1024);//CommitLog最小偏移量long minPhyOffset this.commitLog.getMinOffset();//遍历消费队列,将消费队列保存在CommitLog中for (ConcurrentMapInteger, ConsumeQueue maps : this.consumeQueueTable.values()) {for (ConsumeQueue logic : maps.values()) {String key logic.getTopic() - logic.getQueueId();table.put(key, logic.getMaxOffsetInQueue());logic.correctMinOffset(minPhyOffset);}}this.commitLog.setTopicQueueTable(table);
}五、消息中间件 RocketMQ 源码分析正常恢复和异常恢复
1、正常恢复
代码CommitLog#recoverNormally public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {final ListMappedFile mappedFiles this.mappedFileQueue.getMappedFiles();if (!mappedFiles.isEmpty()) {//Broker正常停止再重启时,从倒数第三个开始恢复,如果不足3个文件,则从第一个文件开始恢复。int index mappedFiles.size() - 3;if (index 0)index 0;MappedFile mappedFile mappedFiles.get(index);ByteBuffer byteBuffer mappedFile.sliceByteBuffer();long processOffset mappedFile.getFileFromOffset();//代表当前已校验通过的offsetlong mappedFileOffset 0;while (true) {//查找消息DispatchRequest dispatchRequest this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);//消息长度int size dispatchRequest.getMsgSize();//查找结果为true,并且消息长度大于0,表示消息正确.mappedFileOffset向前移动本消息长度if (dispatchRequest.isSuccess() size 0) {mappedFileOffset size;}//如果查找结果为true且消息长度等于0,表示已到该文件末尾,如果还有下一个文件,则重置processOffset和MappedFileOffset重复查找下一个文件,否则跳出循环。else if (dispatchRequest.isSuccess() size 0) {index;if (index mappedFiles.size()) {// Current branch can not happenbreak;} else {//取出每个文件mappedFile mappedFiles.get(index);byteBuffer mappedFile.sliceByteBuffer();processOffset mappedFile.getFileFromOffset();mappedFileOffset 0;}}// 查找结果为false表明该文件未填满所有消息跳出循环结束循环else if (!dispatchRequest.isSuccess()) {log.info(recover physics file end, mappedFile.getFileName());break;}}//更新MappedFileQueue的flushedWhere和committedWhere指针processOffset mappedFileOffset;this.mappedFileQueue.setFlushedWhere(processOffset);this.mappedFileQueue.setCommittedWhere(processOffset);//删除offset之后的所有文件this.mappedFileQueue.truncateDirtyFiles(processOffset);if (maxPhyOffsetOfConsumeQueue processOffset) {this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);}} else {this.mappedFileQueue.setFlushedWhere(0);this.mappedFileQueue.setCommittedWhere(0);this.defaultMessageStore.destroyLogics();}
}2、 代码MappedFileQueue#truncateDirtyFiles public void truncateDirtyFiles(long offset) {ListMappedFile willRemoveFiles new ArrayListMappedFile();//遍历目录下文件for (MappedFile file : this.mappedFiles) {//文件尾部的偏移量long fileTailOffset file.getFileFromOffset() this.mappedFileSize;//文件尾部的偏移量大于offsetif (fileTailOffset offset) {//offset大于文件的起始偏移量if (offset file.getFileFromOffset()) {//更新wrotePosition、committedPosition、flushedPosistionfile.setWrotePosition((int) (offset % this.mappedFileSize));file.setCommittedPosition((int) (offset % this.mappedFileSize));file.setFlushedPosition((int) (offset % this.mappedFileSize));} else {//offset小于文件的起始偏移量,说明该文件是有效文件后面创建的,释放mappedFile占用内存,删除文件file.destroy(1000);willRemoveFiles.add(file);}}}this.deleteExpiredFile(willRemoveFiles);
}3、异常恢复
Broker 异常停止文件恢复的实现为 CommitLog#recoverAbnormally。异常文件恢复步骤与正常停止文件恢复流程基本相同其主要差别有两个。首先正常停止默认从倒数第三个文件开始进行恢复而异常停止则需要从最后一个文件往前走找到第一个消息存储正常的文件。其次如果 CommitLog 目录没有消息文件如果消息消费队列目录下存在文件则需要销毁。
代码CommitLog#recoverAbnormally if (!mappedFiles.isEmpty()) {// Looking beginning to recover from which fileint index mappedFiles.size() - 1;MappedFile mappedFile null;for (; index 0; index--) {mappedFile mappedFiles.get(index);//判断消息文件是否是一个正确的文件if (this.isMappedFileMatchedRecover(mappedFile)) {log.info(recover from this mapped file mappedFile.getFileName());break;}}//根据索引取出mappedFile文件if (index 0) {index 0;mappedFile mappedFiles.get(index);}//...验证消息的合法性,并将消息转发到消息消费队列和索引文件}else{//未找到mappedFile,重置flushWhere、committedWhere都为0销毁消息队列文件this.mappedFileQueue.setFlushedWhere(0);this.mappedFileQueue.setCommittedWhere(0);this.defaultMessageStore.destroyLogics();
}上一节关联链接请点击 # 消息中间件 RocketMQ 高级功能和源码分析七