本帖最后由 小李哥 于 2018-5-3 15:45 编辑
分布式事务解决方案一:消息一致性
主动方应用流程图解析:
1.发送消息到消息中间件,2.消息中间件接受消息,存储消息。将存储消息发回主动方。
3.主动方接受消息,执行业务操作。4.主动方发送成功处理结果到中间件。
5.根据主动方的成功失败信息,进行删除或者投送信息到被动方应用。
业务上的逻辑,A系统在处理事务时候,先发送预处理消息到消息服务系统,在消息系统里面存储预处理消息,修改状态为WAITING_CONFIRM。
等到A系统处理完成之后,再修改消息服务系统的消息状态为SENDING。
roncoo-pay-app-message项目的RoncooPayAppMessageTask类,是用来启动定时任务的,启动main方法出发两个子线程,第一个是处理状态为“待确认”但已超时的消息。
log.info("执行(处理[waiting_confirm]状态的消息)任务开始");
settScheduled.handleWaitingConfirmTimeOutMessages();
log.info("执行(处理[waiting_confirm]状态的消息)任务结束");
跟踪源码发现执行MessageScheduledImpl的handleWaitingConfirmTimeOutMessages()。其作用是查询A系统执行业务方法执行的状态(所以A系统需要提供查询接口给消息系统查询),A系统业务执行成功,修改状态为SENDING。
如果A系统业务执行失败,通过messageID删除该记录。
对下面代码解析:
-------------------------------------------------------------------------------------------
roncoo-pay-app-message 消息系统。
类------RoncooPayAppMessageTask 测试类。入口
------- MessageBiz 业务处理类
配置文件---------RpTransactionMessageMapper.xml消息记录表
---------------------------------------------------------------------------------------------
使用流程:主服务 调用业务方法前,先调用message确认发送业务消息。执行业务之后,如果成功在主服务修改message系统的状态。如果不成功修改message的记录为失败。
1)roncoo-pay-service-trade - 主服务操作系统。先完成支付,然后查询支付结果,如果成功再调用确认方法completeFailOrder
-----------RpTradePaymentManagerServiceImpl
-----------预发送消息:279行 rpTransactionMessageService.saveMessageWaitingConfirm(rpTransactionMessage);
-----------处理业务方法。错误会抛异常。不执行下面的代码。
----------- 执行成功确认方法:292行 rpTransactionMessageService.confirmAndSendMessage(rpTransactionMessage.getMessageId());
2)roncoo-pay-service-message - 中间件确认系统。
-----------RpTransactionMessageServiceImpl
/**
* 预存储消息.
*/
public int saveMessageWaitingConfirm(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
/**
* 确认并发送消息.
*/
public void confirmAndSendMessage(String messageId) throws MessageBizException;
/**
* 存储并发送消息.
*/
public int saveAndSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
/**
* 直接发送消息.
*/
public void directSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
/**
* 重发消息.
*/
public void reSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
/**
* 根据messageId重发某条消息.
*/
public void reSendMessageByMessageId(String messageId) throws MessageBizException;
/**
* 将消息标记为死亡消息.
*/
public void setMessageToAreadlyDead(String messageId) throws MessageBizException;
/**
* 根据消息ID获取消息
*/
public RpTransactionMessage getMessageByMessageId(String messageId) throws MessageBizException;
/**
* 根据消息ID删除消息
*/
public void deleteMessageByMessageId(String messageId) throws MessageBizException;
/**
* 重发某个消息队列中的全部已死亡的消息.
*/
public void reSendAllDeadMessageByQueueName(String queueName, int batchSize) throws MessageBizException;
/**
* 获取分页数据
*/
PageBean listPage(PageParam pageParam, Map<String, Object> paramMap) throws MessageBizException;
3)记录数据:RpTransactionMessageDaoImpl 先保存saveMessageWaitingConfirm,等主业务完成JmsTemplate发送到消费端。
4)实体类:调用RpTradePaymentManagerServiceImpl类的方法sealRpTransactionMessage实体类RpTradePaymentRecord转换成RpTransactionMessage消费记录对象
public class RpTransactionMessage extends BaseEntity {
private static final long serialVersionUID = 1757377457814546156L;
private String messageId;
private String messageBody;
private String messageDataType;
private String consumerQueue;
private Integer messageSendTimes;
private String areadlyDead;
private String field1;
private String field2;
private String field3;
}
public class BaseEntity implements Serializable {
/**
*
*/
private static final long serialVersionUID = 1L;
private String id = StringUtil.get32UUID();// 主键ID.
private Integer version = 0;// 版本号默认为0
private String status;// 状态 PublicStatusEnum
private String creater;// 创建人.
private Date createTime = new Date();// 创建时间.
private String editor;// 修改人.
private Date editTime;// 修改时间.
private String remark;// 描述
}
------------------------------------定时调度,重新发送---------------------------
roncoo-pay-app-message
--------------MessageScheduledImpl 调度 MessageBiz 执行handleSendingTimeOutMessage方法
订单成功,待发送消息。主动方发送消息支付成功消息到mq失败情况----- 确认消息或者删除消息 handleWaitingConfirmTimeOutMessages
(MessageBiz需要实现主动方任务接口,查询成功,修改状态。失败超时,删除数据)
这样就可以确保主动方提供消息到消息系统的消息一致性了。 按照这个思路笔记,亲测是可以完成的。
https://github.com/qiudxx/roncoo-pay-dubbo.git
|
|