A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

本帖最后由 小李哥 于 2018-5-3 15:45 编辑

分布式事务解决方案一:消息一致性


主动方应用流程图解析:
1.发送消息到消息中间件,2.消息中间件接受消息,存储消息。将存储消息发回主动方。
3.主动方接受消息,执行业务操作。4.主动方发送成功处理结果到中间件。
5.根据主动方的成功失败信息,进行删除或者投送信息到被动方应用。

业务上的逻辑,A系统在处理事务时候,先发送预处理消息到消息服务系统,在消息系统里面存储预处理消息,修改状态为WAITING_CONFIRM。
等到A系统处理完成之后,再修改消息服务系统的消息状态为SENDING。


roncoo-pay-app-message项目的RoncooPayAppMessageTask类,是用来启动定时任务的,启动main方法出发两个子线程,第一个是处理状态为“待确认”但已超时的消息。
log.info("执行(处理[waiting_confirm]状态的消息)任务开始");
settScheduled.handleWaitingConfirmTimeOutMessages();
log.info("执行(处理[waiting_confirm]状态的消息)任务结束");
跟踪源码发现执行MessageScheduledImpl的handleWaitingConfirmTimeOutMessages()。其作用是查询A系统执行业务方法执行的状态(所以A系统需要提供查询接口给消息系统查询),A系统业务执行成功,修改状态为SENDING。
如果A系统业务执行失败,通过messageID删除该记录。

对下面代码解析:
-------------------------------------------------------------------------------------------
roncoo-pay-app-message  消息系统。

                        类------RoncooPayAppMessageTask      测试类。入口

                          ------- MessageBiz               业务处理类


                        配置文件---------RpTransactionMessageMapper.xml消息记录表




---------------------------------------------------------------------------------------------
使用流程:主服务  调用业务方法前,先调用message确认发送业务消息。执行业务之后,如果成功在主服务修改message系统的状态。如果不成功修改message的记录为失败。

1)roncoo-pay-service-trade - 主服务操作系统。先完成支付,然后查询支付结果,如果成功再调用确认方法completeFailOrder  
        -----------RpTradePaymentManagerServiceImpl   
        -----------预发送消息:279行      rpTransactionMessageService.saveMessageWaitingConfirm(rpTransactionMessage);   
        -----------处理业务方法。错误会抛异常。不执行下面的代码。
        ----------- 执行成功确认方法:292行    rpTransactionMessageService.confirmAndSendMessage(rpTransactionMessage.getMessageId());

   2)roncoo-pay-service-message -  中间件确认系统。
        -----------RpTransactionMessageServiceImpl  

        /**
         * 预存储消息.
         */
        public int saveMessageWaitingConfirm(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
        
        
        /**
         * 确认并发送消息.
         */
        public void confirmAndSendMessage(String messageId) throws MessageBizException;

        
        /**
         * 存储并发送消息.
         */
        public int saveAndSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;

        
        /**
         * 直接发送消息.
         */
        public void directSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
        
        
        /**
         * 重发消息.
         */
        public void reSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
        
        
        /**
         * 根据messageId重发某条消息.
         */
        public void reSendMessageByMessageId(String messageId) throws MessageBizException;
        
        
        /**
         * 将消息标记为死亡消息.
         */
        public void setMessageToAreadlyDead(String messageId) throws MessageBizException;


        /**
         * 根据消息ID获取消息
         */
        public RpTransactionMessage getMessageByMessageId(String messageId) throws MessageBizException;

        /**
         * 根据消息ID删除消息
         */
        public void deleteMessageByMessageId(String messageId) throws MessageBizException;
        
        
        /**
         * 重发某个消息队列中的全部已死亡的消息.
         */
        public void reSendAllDeadMessageByQueueName(String queueName, int batchSize) throws MessageBizException;
        
        /**
         * 获取分页数据
         */
        PageBean listPage(PageParam pageParam, Map<String, Object> paramMap) throws MessageBizException;


     3)记录数据:RpTransactionMessageDaoImpl       先保存saveMessageWaitingConfirm,等主业务完成JmsTemplate发送到消费端。

     4)实体类:调用RpTradePaymentManagerServiceImpl类的方法sealRpTransactionMessage实体类RpTradePaymentRecord转换成RpTransactionMessage消费记录对象
   
   public class RpTransactionMessage extends BaseEntity {

        private static final long serialVersionUID = 1757377457814546156L;

        private String messageId;

        private String messageBody;

        private String messageDataType;

        private String consumerQueue;

        private Integer messageSendTimes;

        private String areadlyDead;

        private String field1;

        private String field2;

        private String field3;

}

public class BaseEntity implements Serializable {

        /**
         *
         */
        private static final long serialVersionUID = 1L;
        
        private String id = StringUtil.get32UUID();// 主键ID.
        private Integer version = 0;// 版本号默认为0
        private String status;// 状态 PublicStatusEnum
        private String creater;// 创建人.
        private Date createTime = new Date();// 创建时间.
        private String editor;// 修改人.
        private Date editTime;// 修改时间.
        private String remark;// 描述
}



------------------------------------定时调度,重新发送---------------------------
roncoo-pay-app-message
        --------------MessageScheduledImpl   调度 MessageBiz 执行handleSendingTimeOutMessage方法
                                             订单成功,待发送消息。主动方发送消息支付成功消息到mq失败情况----- 确认消息或者删除消息 handleWaitingConfirmTimeOutMessages
                      (MessageBiz需要实现主动方任务接口,查询成功,修改状态。失败超时,删除数据)


这样就可以确保主动方提供消息到消息系统的消息一致性了。  按照这个思路笔记,亲测是可以完成的。


https://github.com/qiudxx/roncoo-pay-dubbo.git

11.png (360.87 KB, 下载次数: 10)

11.png

22.png (23.76 KB, 下载次数: 9)

22.png

0 个回复

您需要登录后才可以回帖 登录 | 加入黑马