黑马程序员技术交流社区

标题: 【西安校区】分布式事务——纯MQ实现 [打印本页]

作者: 逆风TO    时间: 2020-3-4 10:20
标题: 【西安校区】分布式事务——纯MQ实现
本帖最后由 逆风TO 于 2020-3-4 10:22 编辑

一、MQ实现分布式事务,最简单的原理框架:
借助MQ的消息可靠传递,实现业务间解耦、事务强一致

1、>> 生产者发送消息做可靠性检查,确保消息真正投递出去;

2、>> 消费者做幂等,确保业务没有重复执行;

3、>> 消费者做异常重试,反复出错时需要捕捉异常并记录,以便手工干预;

二、场景实践:
场景
以支付宝转账到余额宝为例,在支付宝已经扣款成功的情况下,余额宝一定收到转账

>> 支付宝和余额宝是两个微服务;

>> 用户用支付宝转1万元到余额宝;

支付宝账户先扣除金额,MQ通知余额宝账户添加金额;

>> 支付宝账户表: update A set amount = amount - 10000 where userId = 1;

>> 余额宝账户表: update B set amount = amount + 10000 where userId = 1;

具体操作
1、创建队列
>> 创建一个持久化的队列,名称为money;

Durable(持久化保存),Transient(即时保存)。

2、 maven引入
(余额宝和支付宝两个服务都需要)
[Java] 纯文本查看 复制代码
<dependency>

    <groupId>org.springframework.boot</groupId>

    <artifactId>spring-boot-starter-amqp</artifactId>

</dependency>


AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。



3、  添加配置
(余额宝和支付宝两个服务都需要)
[Java] 纯文本查看 复制代码
spring:    

    rabbitmq:

        host: 192.168.222.26 #MQ地址

        port: 5672 #MQ端口

        username: admin #MQ用户名

        password: admin #MQ密码

        publisher-confirms: true #开启消息发送成功监听

        publisher-returns: true #开启消息发送失败监听

        listener:

            simple:

                acknowledge-mode: manual #手动提交事务


4 支付宝端代码编写
向支付宝账号扣款,同时发消息到队列中,通知余额宝账号金额更新

>>  配置MQ发送过程监听
[Java] 纯文本查看 复制代码
package cn.my.server.clientdemo.zhifu;

import javax.annotation.PostConstruct;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* 配置MQ发送过程监听
* @author twotiger-wxm.
* @date 2020-3-3.
*/
@Configuration
public class RabbmitMqConfig {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        @PostConstruct
        public void init() {
            // 设置发送成功回调
            rabbitTemplate.setConfirmCallback(initConfirmCallback());
            // 设置发送失败回调
            rabbitTemplate.setReturnCallback(initReturnCallback());
        }

        @Bean
        public ReturnCallback initReturnCallback(){
            return new FailedListener();
        }

        @Bean
        public ConfirmCallback initConfirmCallback(){
            return new SuccessListener();
        }

}

[AppleScript] 纯文本查看 复制代码
package cn.my.server.clientdemo.zhifu;

import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;

import org.springframework.amqp.rabbit.support.CorrelationData;

/**

* 消息发送成功时触发

* 测试发现队列不存在也会触发,但会在FailedListener之后触发

*/

public class SuccessListener implements ConfirmCallback{

    @Override

    public void confirm(CorrelationData data, boolean success, String result) {

        System.out.println(data+","+success+","+result);

        if(null!=data){

            System.out.println("业务主键:"+data.getId());

        }

        if(success){   

            boolean resultT = true;// 获取流水表状态,为fail则失败(因为队列不存在也会触发,且在失败监听之后触发,所以此处优先排除一下失败监听的操作。)

            if(resultT){

                //更新支付宝账户金额

                System.out.println("消息发送到MQ成功");

                System.out.println("【支付宝账户扣款】  update A set amount = amount - 10000 where userId = 1");

            }else{

                System.out.println("消息发送到MQ失败");

                System.out.println("【支付宝账户不扣款】根据业务主键更新支付宝流水表状态为失败,update A_MESSAGE set sts = fail where transId=000001");

            }      

        }else{

            System.out.println("消息发送到MQ失败原因:"+result);

            System.out.println("【支付宝账户不扣款】根据业务主键更新支付宝流水表状态为失败,update A_MESSAGE set sts = fail where transId=000001");

        }      

    }

     

}

package cn.my.server.clientdemo.zhifu;

import java.util.HashMap;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;

import com.fasterxml.jackson.databind.ObjectMapper;

/**

* 消息发送失败时,监听被触发

* 一般像队列不存在,或者网络中断时会触发,且第一个触发

*/

public class FailedListener implements ReturnCallback{

    private ObjectMapper mapper = new ObjectMapper();

    @Override

    public void returnedMessage(Message message, int state, String result, String arg3, String queneName) {

        System.out.println(message+","+state+","+result+","+arg3+","+queneName);

         

        try {

            String msg = new String(message.getBody());

            HashMap data = mapper.readValue(msg, HashMap.class);// 失败的消息内容

            System.out.println("失败的转账记录:"+mapper.writeValueAsString(data));

            System.out.println("【支付宝账户不扣款】根据业务主键更新支付宝流水表状态为失败,update A_MESSAGE set sts = fail where transId=000001");

        } catch (Exception e) {

            e.printStackTrace();

        }

    }

     

}


>>  转账接口编写
[Java] 纯文本查看 复制代码
package cn.my.server.clientdemo.zhifu;

import java.util.HashMap;

import java.util.Map;

import org.springframework.amqp.AmqpException;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.amqp.rabbit.support.CorrelationData;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.transaction.annotation.Transactional;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

import com.fasterxml.jackson.core.JsonProcessingException;

import com.fasterxml.jackson.databind.ObjectMapper;

/**

* 支付宝端业务测试

*/

@RestController

@RequestMapping("/api/test")

public class ZhiFuBaoResource {

    /**

     * 测试接口

     * @throws AmqpException

     * @throws JsonProcessingException

     */

    @GetMapping("/mq")

    public void trans1() throws AmqpException, JsonProcessingException {

        trans();

    }

    @Autowired

    private RabbitTemplate rabbitTemplate;

    private ObjectMapper mapper = new ObjectMapper();

    /**

     * 转账方法

     * @return

     * @throws AmqpException

     * @throws JsonProcessingException

     */

    @Transactional

    public boolean trans() throws AmqpException, JsonProcessingException {

        Map data = new HashMap<String, Object>();

        String transId = "00000001";//生成业务流水号

        data.put("transId", transId);

        data.put("userId", "1");

        data.put("money", 10000);

         

        //A_MESSAGE交易流水表,主键是【交易编号】,重复插入会报异常,类似于幂等操作

        System.out.println("【支付宝记账操作】重复插入会报异常,类似于幂等操作  insert into A_MESSAGE(transId,userId,money) values (000001,1,10000)");

            

        // 流水号传入,方便消息发送失败时做操作

        CorrelationData correlationData = new CorrelationData(transId);

        rabbitTemplate.convertAndSend("","money",mapper.writeValueAsString(data),correlationData);

        return true;

    }

}

5 余额宝端代码编写

监听支付宝发到消息队列中的消息,做余额宝账号金额更新

[Java] 纯文本查看 复制代码
package cn.my.server.clientdemo.yuer;

import java.io.IOException;

import java.util.Collections;

import java.util.HashMap;

import java.util.Map;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.retry.RecoveryCallback;

import org.springframework.retry.RetryCallback;

import org.springframework.retry.RetryContext;

import org.springframework.retry.backoff.FixedBackOffPolicy;

import org.springframework.retry.policy.SimpleRetryPolicy;

import org.springframework.retry.support.RetryTemplate;

import org.springframework.stereotype.Component;

import org.springframework.transaction.annotation.Transactional;

import com.fasterxml.jackson.core.JsonProcessingException;

import com.fasterxml.jackson.databind.ObjectMapper;

import com.rabbitmq.client.Channel;

/**

* 余额宝端消息监听

*/

@Component

public class YuErBaoMessageListeners {

    private final Logger log = LoggerFactory.getLogger(this.getClass());

    private ObjectMapper mapper = new ObjectMapper();



    /**

     * 监听消息队列

     * @param message  消息内容

     * @param channel  消息渠道

     * @throws IOException  异常

     */

    @RabbitListener(queues = "money")

    @RabbitHandler

    public void receiveQueue(Message message, Channel channel) throws IOException {

        String msg = "";

        try {

            // 业务处理逻辑

            msg = new String(message.getBody());

            Map data = mapper.readValue(msg, HashMap.class);

            retry(data);

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);// 手动应答消息已经处理

        } catch (Exception e) {

            log.error("MQ接收消息内容[" + msg + "],后处理异常:" + e);

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);// 手动应答消息已经处理

        }

    }



    /**

     * 更新余额宝账户金额

     *

     * @param map

     */

    @Transactional

    public void bizOp(Map<String, Object> map) {

        // B_MESSAGE交易流水表,主键是【交易编号】,重复插入会报异常,类似于幂等操作

        System.out.println(

            "【余额宝记账操作】重复插入会报异常,类似于幂等操作 insert into B_MESSAGE(transId,userId,money) values (000001,1,10000)");

        // 更新余额宝账户金额

        System.out.println("【余额宝账户入款】 update B set amount = amount +10000 where userId = 1");



    }



    /**

     * 一直报错,重试次数用完了,保存如下信息供人工干预

     *

     * @param map

     * @throws JsonProcessingException

     */

    public void failed(Map<String, Object> map) throws JsonProcessingException {

        System.out.println("一直报错,重试次数用完了,保存如下信息供人工干预:\n" + mapper.writeValueAsString(map));

    }



    /**

     * 异常时最多重试 3次,成功为止

     *

     * @param map

     *            输入参数

     */

    private void retry(Map<String, Object> map) {

        // 构建重试模板实例

        RetryTemplate retryTemplate = new RetryTemplate();

        // 设置重试次数

        SimpleRetryPolicy policy = new SimpleRetryPolicy(3,

                Collections.<Class<? extends Throwable>, Boolean>singletonMap(Exception.class, true));

        // 设置重试间隔时间

        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();

        fixedBackOffPolicy.setBackOffPeriod(100);

        retryTemplate.setRetryPolicy(policy);

        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

        // 编写业务处理代码逻辑

        final RetryCallback<Object, Exception> retryCallback = new RetryCallback<Object, Exception>() {

            public Object doWithRetry(RetryContext context) throws Exception {

                System.out.println("第" + (1 + context.getRetryCount()) + "次处理");

                try {

                    bizOp(map);

                } catch (Exception e) {

                    e.printStackTrace();

                    throw new Exception("捕捉到业务处理异常,需要抛出");// 这个点特别注意,重试的根源通过Exception返回

                }

                return null;

            }

        };

        // 重试次数执行完依然报错,走如下逻辑

        final RecoveryCallback<Object> recoveryCallback = new RecoveryCallback<Object>() {

            public Object recover(RetryContext context) throws Exception {

                failed(map);

                return null;

            }

        };

        try {

            // 由retryTemplate 执行execute方法开始逻辑执行

            retryTemplate.execute(retryCallback, recoveryCallback);

        } catch (Exception e) {

            e.printStackTrace();

        }

    }

}

6 测试

>> 浏览器直接访问 /api/test/mq

>> 控制台结果如下:

[Java] 纯文本查看 复制代码
【支付宝记账操作】重复插入会报异常,类似于幂等操作  insert into A_MESSAGE(transId,userId,money) values (000001,1,10000)

【余额宝记账操作】重复插入会报异常,类似于幂等操作 insert into B_MESSAGE(transId,userId,money) values (000001,1,10000)

【余额宝账户入款】 update B set amount = amount + 10000 where userId = 1

【支付宝账户扣款】 update A set amount = amount - 10000 where userId = 1







欢迎光临 黑马程序员技术交流社区 (http://bbs.itheima.com/) 黑马程序员IT技术论坛 X3.2