当前位置: 首页 > news >正文

大连市网站推广公司开发网站嵌入广告

大连市网站推广公司,开发网站嵌入广告,视频网站是怎么做的,快速网站备案多少钱前言 原理#xff1a;使用普通消息和订单事务无法保证一致的原因#xff0c;本质上是由于普通消息无法像单机数据库事务一样#xff0c;具备提交、回滚和统一协调的能力。 而基于 RocketMQ 的分布式事务消息功能#xff0c;在普通消息基础上#xff0c;支持二阶段的提交能…前言 原理使用普通消息和订单事务无法保证一致的原因本质上是由于普通消息无法像单机数据库事务一样具备提交、回滚和统一协调的能力。 而基于 RocketMQ 的分布式事务消息功能在普通消息基础上支持二阶段的提交能力。将二阶段提交和本地事务绑定实现全局提交结果的一致性。 解决问题事务消息主要解决生产方和消费方的数据最终一致性问题。 实现方式二阶段消息 反查机制 源码版本4.9.3 源码架构图 源码解析 接下来将会按照上面提到的事务消息原理和源码架构图中的顺序进行源码实现分析。 1. 发送半事务消息 源码入口 org.apache.rocketmq.client.producer.TransactionMQProducer#sendMessageInTransaction(org.apache.rocketmq.common.message.Message, java.lang.Object) public TransactionSendResult sendMessageInTransaction(final Message msg,final Object arg) throws MQClientException {if (null this.transactionListener) {throw new MQClientException(TransactionListener is null, null);}msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);} 接下来对消息添加PROPERTY_TRANSACTION_PREPARED半事务标记发送事务消息发送成功后执行本地事务。 // 发送事务消息public TransactionSendResult sendMessageInTransaction(final Message msg,final LocalTransactionExecuter localTransactionExecuter, final Object arg)throws MQClientException {// 检查事务监听器不能为空TransactionListener transactionListener getCheckListener();if (null localTransactionExecuter null transactionListener) {throw new MQClientException(tranExecutor is null, null);}// ignore DelayTimeLevel parameterif (msg.getDelayTimeLevel() ! 0) {MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);}Validators.checkMessage(msg, this.defaultMQProducer);SendResult sendResult null;// 添加事务消息标记预处理事务消息半half消息MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, true);MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());try {// 发送消息sendResult this.send(msg);} catch (Exception e) {throw new MQClientException(send message Exception, e);}LocalTransactionState localTransactionState LocalTransactionState.UNKNOW;Throwable localException null;switch (sendResult.getSendStatus()) {case SEND_OK: {try {if (sendResult.getTransactionId() ! null) {msg.putUserProperty(__transactionId__, sendResult.getTransactionId());}String transactionId msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (null ! transactionId !.equals(transactionId)) {msg.setTransactionId(transactionId);}if (null ! localTransactionExecuter) {localTransactionState localTransactionExecuter.executeLocalTransactionBranch(msg, arg);} else if (transactionListener ! null) {log.debug(Used new transaction API);// 重要发送消息到broker后mq客户端内部执行本地事务localTransactionState transactionListener.executeLocalTransaction(msg, arg);}if (null localTransactionState) {localTransactionState LocalTransactionState.UNKNOW;}if (localTransactionState ! LocalTransactionState.COMMIT_MESSAGE) {log.info(executeLocalTransactionBranch return {}, localTransactionState);log.info(msg.toString());}} catch (Throwable e) {log.info(executeLocalTransactionBranch exception, e);log.info(msg.toString());localException e;}}break;case FLUSH_DISK_TIMEOUT:case FLUSH_SLAVE_TIMEOUT:case SLAVE_NOT_AVAILABLE:localTransactionState LocalTransactionState.ROLLBACK_MESSAGE;break;default:break;}try {this.endTransaction(msg, sendResult, localTransactionState, localException);} catch (Exception e) {log.warn(local transaction execute localTransactionState , but end broker transaction failed, e);}TransactionSendResult transactionSendResult new TransactionSendResult();transactionSendResult.setSendStatus(sendResult.getSendStatus());transactionSendResult.setMessageQueue(sendResult.getMessageQueue());transactionSendResult.setMsgId(sendResult.getMsgId());transactionSendResult.setQueueOffset(sendResult.getQueueOffset());transactionSendResult.setTransactionId(sendResult.getTransactionId());transactionSendResult.setLocalTransactionState(localTransactionState);return transactionSendResult;} ... 接下来的发送流程和发送普通消息几乎一致broker接受到事务消息后的处理流程也和普通消息几乎一致不一致的地方在于broker的ReputMessageService重分发消息服务线程在后台读取 CommitLog 数据分发到 ConsumeQueue时不会分发半Half事务消息和回滚事务消息对应的CommitLog。因为半事务消息需要等待本地事务执行完才能投递到消费者队列。 消费分发代码org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildConsumeQueue class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {Overridepublic void dispatch(DispatchRequest request) {// 事务类型final int tranType MessageSysFlag.getTransactionValue(request.getSysFlag());switch (tranType) {// 非事务和提交事务写入消息到消费队列case MessageSysFlag.TRANSACTION_NOT_TYPE:case MessageSysFlag.TRANSACTION_COMMIT_TYPE:DefaultMessageStore.this.putMessagePositionInfo(request);break;// 预处理事务和回滚事务不做处理case MessageSysFlag.TRANSACTION_PREPARED_TYPE:case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:break;}}}2. 执行本地事务 与 提交本地事务状态 接着上面的发送消息往后看发送事务消息成功后就会执行本地事务。 org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction switch (sendResult.getSendStatus()) {case SEND_OK: {try {if (sendResult.getTransactionId() ! null) {msg.putUserProperty(__transactionId__, sendResult.getTransactionId());}String transactionId msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (null ! transactionId !.equals(transactionId)) {msg.setTransactionId(transactionId);}if (null ! localTransactionExecuter) {localTransactionState localTransactionExecuter.executeLocalTransactionBranch(msg, arg);} else if (transactionListener ! null) {log.debug(Used new transaction API);// 重要发送消息到broker后mq客户端内部执行本地事务localTransactionState transactionListener.executeLocalTransaction(msg, arg);}if (null localTransactionState) {localTransactionState LocalTransactionState.UNKNOW;}if (localTransactionState ! LocalTransactionState.COMMIT_MESSAGE) {log.info(executeLocalTransactionBranch return {}, localTransactionState);log.info(msg.toString());}} catch (Throwable e) {log.info(executeLocalTransactionBranch exception, e);log.info(msg.toString());localException e;}}break;case FLUSH_DISK_TIMEOUT:case FLUSH_SLAVE_TIMEOUT:case SLAVE_NOT_AVAILABLE:localTransactionState LocalTransactionState.ROLLBACK_MESSAGE;break;default:break;} 这里使用的transactionListener 事务监听器使我们在发起本地事务的代码里写的。如example工程中给的 public class TransactionListenerImpl implements TransactionListener {private AtomicInteger transactionIndex new AtomicInteger(0);private ConcurrentHashMapString, Integer localTrans new ConcurrentHashMap();Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {int value transactionIndex.getAndIncrement();int status value % 3;localTrans.put(msg.getTransactionId(), status);return LocalTransactionState.UNKNOW;}Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {Integer status localTrans.get(msg.getTransactionId());if (null ! status) {switch (status) {case 0:return LocalTransactionState.UNKNOW;case 1:return LocalTransactionState.COMMIT_MESSAGE;case 2:return LocalTransactionState.ROLLBACK_MESSAGE;default:return LocalTransactionState.COMMIT_MESSAGE;}}return LocalTransactionState.COMMIT_MESSAGE;} } 本地事务执行成功后会发送提交消息或回滚消息到broker master节点。 broker收到消息后由EndTransactionProcessor 结束事务消息处理器进行处理。针对 CommitType类型消息去除半事务消息标记发送最终形态的消息且删除原 CommitLog中的消息。针对 RollBackType类型的消息删除原CommitLog中的消息。 org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest Overridepublic RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throwsRemotingCommandException {// 封装响应final RemotingCommand response RemotingCommand.createResponseCommand(null);// 提取请求头final EndTransactionRequestHeader requestHeader (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);LOGGER.debug(Transaction request:{}, requestHeader);// 处理结束事务消息的broker只能是master节点否者直接返回if (BrokerRole.SLAVE brokerController.getMessageStoreConfig().getBrokerRole()) {response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);LOGGER.warn(Message store is slave mode, so end transaction is forbidden. );return response;}// 默认falseif (requestHeader.getFromTransactionCheck()) {switch (requestHeader.getCommitOrRollback()) {case MessageSysFlag.TRANSACTION_NOT_TYPE: {LOGGER.warn(Check producer[{}] transaction state, but its pending status. RequestHeader: {} Remark: {},RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());return null;}case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {LOGGER.warn(Check producer[{}] transaction state, the producer commit the message. RequestHeader: {} Remark: {},RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());break;}case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {LOGGER.warn(Check producer[{}] transaction state, the producer rollback the message. RequestHeader: {} Remark: {},RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());break;}default:return null;}} else {// 分类处理switch (requestHeader.getCommitOrRollback()) {// 非事务类型打印日志返回nullcase MessageSysFlag.TRANSACTION_NOT_TYPE: {LOGGER.warn(The producer[{}] end transaction in sending message, and its pending status. RequestHeader: {} Remark: {},RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());return null;}// 提交事物类型break继续处理case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {break;}// 回滚事物类型break继续处理case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {LOGGER.warn(The producer[{}] end transaction in sending message, rollback the message. RequestHeader: {} Remark: {},RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());break;}default:return null;}}OperationResult result new OperationResult();// 提交事物if (MessageSysFlag.TRANSACTION_COMMIT_TYPE requestHeader.getCommitOrRollback()) {// 调用事物消息提交服务实现是从消息存储组件中拿到预处理消息result this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);if (result.getResponseCode() ResponseCode.SUCCESS) {// 检查预处理消息封装命令协议RemotingCommand res checkPrepareMessage(result.getPrepareMessage(), requestHeader);if (res.getCode() ResponseCode.SUCCESS) {// 预处理消息转换为broker内部消息MessageExtBrokerInner msgInner endMessageTransaction(result.getPrepareMessage());msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());// 删除预处理事务消息标记MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);// 发送最终消息RemotingCommand sendResult sendFinalMessage(msgInner);if (sendResult.getCode() ResponseCode.SUCCESS) {this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());}return sendResult;}return res;}}// 回滚事物else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE requestHeader.getCommitOrRollback()) {// 回滚消息从消息存储组件中拿到预处理消息result this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);if (result.getResponseCode() ResponseCode.SUCCESS) {// 检查消息封装命令协议RemotingCommand res checkPrepareMessage(result.getPrepareMessage(), requestHeader);if (res.getCode() ResponseCode.SUCCESS) {// 删除预处理事务消息this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());}return res;}}response.setCode(result.getResponseCode());response.setRemark(result.getResponseRemark());return response;} 3. 未收到事务提交状态时回查本地事务状态 可以看到源码架构图中broker 内部有一个后台线程TransactionalMessageCheckService 事务消息检查服务它会定时扫描没有提交事物状态的事务向 mqClient 发起检查本地事务状态请求。 org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest // 事务消息检查服务 public class TransactionalMessageCheckService extends ServiceThread {Overrideprotected void onWaitEnd() {long timeout brokerController.getBrokerConfig().getTransactionTimeOut(); // 6sint checkMax brokerController.getBrokerConfig().getTransactionCheckMax(); // 15long begin System.currentTimeMillis();log.info(Begin to check prepare message, begin time:{}, begin);this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());log.info(End to check prepare message, consumed time:{}, System.currentTimeMillis() - begin);}} ... 通过事务消息服务 TransactionalMessageService扫描到需要检查的事务后会调用 Broker2Client组件发送请求到mq客户端。 org.apache.rocketmq.broker.client.net.Broker2Client#checkProducerTransactionState // 检查生产者事务状态public void checkProducerTransactionState(final String group, // 生产者组final Channel channel, // 生产者长连接final CheckTransactionStateRequestHeader requestHeader, // 检查事务状态请求头final MessageExt messageExt // 消息扩展) throws Exception {// 构造检查事务状态请求RemotingCommand request RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);request.setBody(MessageDecoder.encode(messageExt, false));try {// 通过网络通信服务器向生产者发送检查事务状态请求单向发送this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);} catch (Exception e) {log.error(Check transaction failed because invoke producer exception. group{}, msgId{}, error{},group, messageExt.getMsgId(), e.toString());}} mqClient接受请求 Overridepublic RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {switch (request.getCode()) {// 检查事务状态case RequestCode.CHECK_TRANSACTION_STATE:return this.checkTransactionState(ctx, request);} 接着调用生产者实例执行检查 producer.checkTransactionState(addr, messageExt, requestHeader); public RemotingCommand checkTransactionState(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {// 解码检查事务状态请求头final CheckTransactionStateRequestHeader requestHeader (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);final ByteBuffer byteBuffer ByteBuffer.wrap(request.getBody());// 解码消息体final MessageExt messageExt MessageDecoder.decode(byteBuffer);if (messageExt ! null) {if (StringUtils.isNotEmpty(this.mqClientFactory.getClientConfig().getNamespace())) {messageExt.setTopic(NamespaceUtil.withoutNamespace(messageExt.getTopic(), this.mqClientFactory.getClientConfig().getNamespace()));}String transactionId messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (null ! transactionId !.equals(transactionId)) {messageExt.setTransactionId(transactionId);}// 事务消息必须有生产分组名称final String group messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);if (group ! null) {// 获取生产者实例MQProducerInner producer this.mqClientFactory.selectProducer(group);if (producer ! null) {// 从netty网络通道中获取客户端地址final String addr RemotingHelper.parseChannelRemoteAddr(ctx.channel());// 检查事务状态producer.checkTransactionState(addr, messageExt, requestHeader);} else {log.debug(checkTransactionState, pick producer by group[{}] failed, group);}} else {log.warn(checkTransactionState, pick producer group failed);}} else {log.warn(checkTransactionState, decode message failed);}return null;} 生产者实例在本地执行我们发送事务消息时设置的transactionCheckListener 事务检查监听器。然后将本地事务执行状态重新提交到 broker接下来的流程就和发送事务消息后执行本地事务提交本地事务状态的流程一样了。 org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#checkTransactionState // 检查事务状态未收到half消息时进行事务回查Overridepublic void checkTransactionState(final String addr, final MessageExt msg,final CheckTransactionStateRequestHeader header) {// 封装一步任务放进线程池异步执行Runnable request new Runnable() {private final String brokerAddr addr;private final MessageExt message msg;private final CheckTransactionStateRequestHeader checkRequestHeader header;private final String group DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();Overridepublic void run() {TransactionCheckListener transactionCheckListener DefaultMQProducerImpl.this.checkListener();TransactionListener transactionListener getCheckListener();if (transactionCheckListener ! null || transactionListener ! null) {LocalTransactionState localTransactionState LocalTransactionState.UNKNOW;Throwable exception null;try {// 调用本地事务监听器检查本地事务状态if (transactionCheckListener ! null) {localTransactionState transactionCheckListener.checkLocalTransactionState(message);} else if (transactionListener ! null) {log.debug(Used new check API in transaction message);// 检查本地事务状态localTransactionState transactionListener.checkLocalTransaction(message);} else {log.warn(CheckTransactionState, pick transactionListener by group[{}] failed, group);}} catch (Throwable e) {log.error(Broker call checkTransactionState, but checkLocalTransactionState exception, e);exception e;}this.processTransactionState(localTransactionState,group,exception);} else {log.warn(CheckTransactionState, pick transactionCheckListener by group[{}] failed, group);}}// 处理事务状态private void processTransactionState(// 调用本地事务监听器查询到的本地事务状态final LocalTransactionState localTransactionState,// 生产者分组final String producerGroup,// 异常final Throwable exception) {// 封装事务状态请求final EndTransactionRequestHeader thisHeader new EndTransactionRequestHeader();thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());thisHeader.setProducerGroup(producerGroup);thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());thisHeader.setFromTransactionCheck(true);String uniqueKey message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (uniqueKey null) {uniqueKey message.getMsgId();}thisHeader.setMsgId(uniqueKey);thisHeader.setTransactionId(checkRequestHeader.getTransactionId());switch (localTransactionState) {// 提交消息状态case COMMIT_MESSAGE:thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);break;// 回滚消息状态case ROLLBACK_MESSAGE:thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);log.warn(when broker check, client rollback this transaction, {}, thisHeader);break;// 未知消息状态case UNKNOW:thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);log.warn(when broker check, client does not know this transaction state, {}, thisHeader);break;default:break;}String remark null;if (exception ! null) {remark checkLocalTransactionState Exception: RemotingHelper.exceptionSimpleDesc(exception);}// 执行钩子函数doExecuteEndTransactionHook(msg, uniqueKey, brokerAddr, localTransactionState, true);try {// 通过netty网络通信客户端组件向broker发送事务状态请求oneway模式不需要感知发送结果DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,3000);} catch (Exception e) {log.error(endTransactionOneway exception, e);}}};this.checkExecutor.submit(request);}
http://www.pierceye.com/news/100704/

相关文章:

  • 大淘客网站建设婚庆网页设计作品dw
  • 嘉兴网站关键词优化后端开发流程
  • 有网络网站打不开怎么回事培训机构推广
  • 淄博网站建设优化珍云网站可信图标
  • 大连外贸网站建设江门营销网站建设
  • 县网站建设方案怎么做付费的小说网站
  • 企业公众号以及网站建设我想做个网站
  • 网站设为主页功能怎么做怎样制作h5
  • 网站的内容与功能设计微信公众平台小程序二维码怎么生成
  • 西安网站快速优化重庆明建网络科技有限公司干啥的
  • 广州市天河区门户网站软件制作公司
  • 做网站前期创建文件夹博罗高端网站建设价格
  • 襄阳网站建设价格淄博网站推广价格
  • 网站推广的软件六安网站制作哪里有
  • 大型门户网站模板wordpress有哪些小工具
  • 有flash的网站新闻资讯app制作公司
  • 网站和平台有什么区别黄页88怎么发信息质量高
  • 阿里建站价格小户型室内装修设计公司网站
  • 建设银行网站安全性分析网络推广服务平台
  • 大型购物网站建设福建微网站建设公司
  • 做网站软件j程序员找工作网站
  • 济南网站建设系统画册设计公司宣传册
  • 上海网站设计方案家纺网站建设
  • 衡水精品网站建设游戏广告推广平台
  • 响应式企业网站建设营销战略
  • wordpress离线浏览搜索引擎优化包括
  • 门户网站建设需要多少呼伦贝尔市住房和城乡建设局网站
  • 静海集团网站建设住房城乡建设网站
  • 个人备案挂企业网站网站开发公司照片
  • 网站建设课程体会国内最新新闻简短