1. 概述 必须必须必须 前置阅读内容: 2. 事务消息发送2.1 Producer 发送事务消息 1: // ⬇️⬇️⬇️【DefaultMQProducerImpl.java】 2: /** 3: * 发送事务消息 4: * 5: * @param msg 消息 6: * @param tranExecuter 【本地事务】执行器 7: * @param arg 【本地事务】执行器参数 8: * @return 事务发送结果 9: * @throws MQClientException 当 Client 发生异常时 10: */ 11: public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg) 12: throws MQClientException { 13: if (null == tranExecuter) { 14: throw new MQClientException("tranExecutor is null", null); 15: } 16: Validators.checkMessage(msg, this.defaultMQProducer); 17: 18: // 发送【Half消息】 19: SendResult sendResult; 20: MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); 21: MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); 22: try { 23: sendResult = this.send(msg); 24: } catch (Exception e) { 25: throw new MQClientException("send message Exception", e); 26: } 27: 28: // 处理发送【Half消息】结果 29: LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; 30: Throwable localException = null; 31: switch (sendResult.getSendStatus()) { 32: // 发送【Half消息】成功,执行【本地事务】逻辑 33: case SEND_OK: { 34: try { 35: if (sendResult.getTransactionId() != null) { // 事务编号。目前开源版本暂时没用到,猜想ONS在使用。 36: msg.putUserProperty("__transactionId__", sendResult.getTransactionId()); 37: } 38: 39: // 执行【本地事务】逻辑 40: localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg); 41: if (null == localTransactionState) { 42: localTransactionState = LocalTransactionState.UNKNOW; 43: } 44: 45: if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) { 46: log.info("executeLocalTransactionBranch return {}", localTransactionState); 47: log.info(msg.toString()); 48: } 49: } catch (Throwable e) { 50: log.info("executeLocalTransactionBranch exception", e); 51: log.info(msg.toString()); 52: localException = e; 53: } 54: } 55: break; 56: // 发送【Half消息】失败,标记【本地事务】状态为回滚 57: case FLUSH_DISK_TIMEOUT: 58: case FLUSH_SLAVE_TIMEOUT: 59: case SLAVE_NOT_AVAILABLE: 60: localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; 61: break; 62: default: 63: break; 64: } 65: 66: // 结束事务:提交消息 COMMIT / ROLLBACK 67: try { 68: this.endTransaction(sendResult, localTransactionState, localException); 69: } catch (Exception e) { 70: log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e); 71: } 72: 73: // 返回【事务发送结果】 74: TransactionSendResult transactionSendResult = new TransactionSendResult(); 75: transactionSendResult.setSendStatus(sendResult.getSendStatus()); 76: transactionSendResult.setMessageQueue(sendResult.getMessageQueue()); 77: transactionSendResult.setMsgId(sendResult.getMsgId()); 78: transactionSendResult.setQueueOffset(sendResult.getQueueOffset()); 79: transactionSendResult.setTransactionId(sendResult.getTransactionId()); 80: transactionSendResult.setLocalTransactionState(localTransactionState); 81: return transactionSendResult; 82: } 83: 84: /** 85: * 结束事务:提交消息 COMMIT / ROLLBACK 86: * 87: * @param sendResult 发送【Half消息】结果 88: * @param localTransactionState 【本地事务】状态 89: * @param localException 执行【本地事务】逻辑产生的异常 90: * @throws RemotingException 当远程调用发生异常时 91: * @throws MQBrokerException 当 Broker 发生异常时 92: * @throws InterruptedException 当线程中断时 93: * @throws UnknownHostException 当解码消息编号失败是 94: */ 95: public void endTransaction(// 96: final SendResult sendResult, // 97: final LocalTransactionState localTransactionState, // 98: final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException { 99: // 解码消息编号100: final MessageId id;101: if (sendResult.getOffsetMsgId() != null) {102: id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());103: } else {104: id = MessageDecoder.decodeMessageId(sendResult.getMsgId());105: }106: 107: // 创建请求108: String transactionId = sendResult.getTransactionId();109: final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());110: EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();111: requestHeader.setTransactionId(transactionId);112: requestHeader.setCommitLogOffset(id.getOffset());113: switch (localTransactionState) {114: case COMMIT_MESSAGE:115: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);116: break;117: case ROLLBACK_MESSAGE:118: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);119: break;120: case UNKNOW:121: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);122: break;123: default:124: break;125: }126: requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());127: requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());128: requestHeader.setMsgId(sendResult.getMsgId());129: String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;130: 131: // 提交消息 COMMIT / ROLLBACK。!!!通信方式为:Oneway!!!132: this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout());133: }2.2 Broker 处理结束事务请求 |
|