A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

1. 概述
必须必须必须 前置阅读内容:
2. 事务消息发送2.1 Producer 发送事务消息
  • 活动图如下(结合 核心代码 理解):
  • 实现代码如下:
  1: // ⬇️⬇️⬇️【DefaultMQProducerImpl.java】  2: /**  3:  * 发送事务消息  4:  *  5:  * @param msg 消息  6:  * @param tranExecuter 【本地事务】执行器  7:  * @param arg 【本地事务】执行器参数  8:  * @return 事务发送结果  9:  * @throws MQClientException 当 Client 发生异常时 10:  */ 11: public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg) 12:     throws MQClientException { 13:     if (null == tranExecuter) { 14:         throw new MQClientException("tranExecutor is null", null); 15:     } 16:     Validators.checkMessage(msg, this.defaultMQProducer); 17:  18:     // 发送【Half消息】 19:     SendResult sendResult; 20:     MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); 21:     MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); 22:     try { 23:         sendResult = this.send(msg); 24:     } catch (Exception e) { 25:         throw new MQClientException("send message Exception", e); 26:     } 27:  28:     // 处理发送【Half消息】结果 29:     LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; 30:     Throwable localException = null; 31:     switch (sendResult.getSendStatus()) { 32:         // 发送【Half消息】成功,执行【本地事务】逻辑 33:         case SEND_OK: { 34:             try { 35:                 if (sendResult.getTransactionId() != null) { // 事务编号。目前开源版本暂时没用到,猜想ONS在使用。 36:                     msg.putUserProperty("__transactionId__", sendResult.getTransactionId()); 37:                 } 38:  39:                 // 执行【本地事务】逻辑 40:                 localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg); 41:                 if (null == localTransactionState) { 42:                     localTransactionState = LocalTransactionState.UNKNOW; 43:                 } 44:  45:                 if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) { 46:                     log.info("executeLocalTransactionBranch return {}", localTransactionState); 47:                     log.info(msg.toString()); 48:                 } 49:             } catch (Throwable e) { 50:                 log.info("executeLocalTransactionBranch exception", e); 51:                 log.info(msg.toString()); 52:                 localException = e; 53:             } 54:         } 55:         break; 56:         // 发送【Half消息】失败,标记【本地事务】状态为回滚 57:         case FLUSH_DISK_TIMEOUT: 58:         case FLUSH_SLAVE_TIMEOUT: 59:         case SLAVE_NOT_AVAILABLE: 60:             localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; 61:             break; 62:         default: 63:             break; 64:     } 65:  66:     // 结束事务:提交消息 COMMIT / ROLLBACK 67:     try { 68:         this.endTransaction(sendResult, localTransactionState, localException); 69:     } catch (Exception e) { 70:         log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e); 71:     } 72:  73:     // 返回【事务发送结果】 74:     TransactionSendResult transactionSendResult = new TransactionSendResult(); 75:     transactionSendResult.setSendStatus(sendResult.getSendStatus()); 76:     transactionSendResult.setMessageQueue(sendResult.getMessageQueue()); 77:     transactionSendResult.setMsgId(sendResult.getMsgId()); 78:     transactionSendResult.setQueueOffset(sendResult.getQueueOffset()); 79:     transactionSendResult.setTransactionId(sendResult.getTransactionId()); 80:     transactionSendResult.setLocalTransactionState(localTransactionState); 81:     return transactionSendResult; 82: } 83:  84: /** 85:  * 结束事务:提交消息 COMMIT / ROLLBACK 86:  * 87:  * @param sendResult 发送【Half消息】结果 88:  * @param localTransactionState 【本地事务】状态 89:  * @param localException 执行【本地事务】逻辑产生的异常 90:  * @throws RemotingException 当远程调用发生异常时 91:  * @throws MQBrokerException 当 Broker 发生异常时 92:  * @throws InterruptedException 当线程中断时 93:  * @throws UnknownHostException 当解码消息编号失败是 94:  */ 95: public void endTransaction(// 96:     final SendResult sendResult, // 97:     final LocalTransactionState localTransactionState, // 98:     final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException { 99:     // 解码消息编号100:     final MessageId id;101:     if (sendResult.getOffsetMsgId() != null) {102:         id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());103:     } else {104:         id = MessageDecoder.decodeMessageId(sendResult.getMsgId());105:     }106: 107:     // 创建请求108:     String transactionId = sendResult.getTransactionId();109:     final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());110:     EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();111:     requestHeader.setTransactionId(transactionId);112:     requestHeader.setCommitLogOffset(id.getOffset());113:     switch (localTransactionState) {114:         case COMMIT_MESSAGE:115:             requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);116:             break;117:         case ROLLBACK_MESSAGE:118:             requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);119:             break;120:         case UNKNOW:121:             requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);122:             break;123:         default:124:             break;125:     }126:     requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());127:     requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());128:     requestHeader.setMsgId(sendResult.getMsgId());129:     String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;130: 131:     // 提交消息 COMMIT / ROLLBACK。!!!通信方式为:Oneway!!!132:     this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout());133: }2.2 Broker 处理结束事务请求

3 个回复

倒序浏览
回复 使用道具 举报
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马