<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
host: 192.168.222.26 #MQ地址
port: 5672 #MQ端口
username: admin #MQ用户名
password: admin #MQ密码
publisher-confirms: true #开启消息发送成功监听
publisher-returns: true #开启消息发送失败监听
listener:
simple:
acknowledge-mode: manual #手动提交事务
package cn.my.server.clientdemo.zhifu;
import javax.annotation.PostConstruct;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 配置MQ发送过程监听
* @author twotiger-wxm.
* @date 2020-3-3.
*/
@Configuration
public class RabbmitMqConfig {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
// 设置发送成功回调
rabbitTemplate.setConfirmCallback(initConfirmCallback());
// 设置发送失败回调
rabbitTemplate.setReturnCallback(initReturnCallback());
}
@Bean
public ReturnCallback initReturnCallback(){
return new FailedListener();
}
@Bean
public ConfirmCallback initConfirmCallback(){
return new SuccessListener();
}
}
package cn.my.server.clientdemo.zhifu;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
/**
* 消息发送成功时触发
* 测试发现队列不存在也会触发,但会在FailedListener之后触发
*/
public class SuccessListener implements ConfirmCallback{
@Override
public void confirm(CorrelationData data, boolean success, String result) {
System.out.println(data+","+success+","+result);
if(null!=data){
System.out.println("业务主键:"+data.getId());
}
if(success){
boolean resultT = true;// 获取流水表状态,为fail则失败(因为队列不存在也会触发,且在失败监听之后触发,所以此处优先排除一下失败监听的操作。)
if(resultT){
//更新支付宝账户金额
System.out.println("消息发送到MQ成功");
System.out.println("【支付宝账户扣款】 update A set amount = amount - 10000 where userId = 1");
}else{
System.out.println("消息发送到MQ失败");
System.out.println("【支付宝账户不扣款】根据业务主键更新支付宝流水表状态为失败,update A_MESSAGE set sts = fail where transId=000001");
}
}else{
System.out.println("消息发送到MQ失败原因:"+result);
System.out.println("【支付宝账户不扣款】根据业务主键更新支付宝流水表状态为失败,update A_MESSAGE set sts = fail where transId=000001");
}
}
}
package cn.my.server.clientdemo.zhifu;
import java.util.HashMap;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* 消息发送失败时,监听被触发
* 一般像队列不存在,或者网络中断时会触发,且第一个触发
*/
public class FailedListener implements ReturnCallback{
private ObjectMapper mapper = new ObjectMapper();
@Override
public void returnedMessage(Message message, int state, String result, String arg3, String queneName) {
System.out.println(message+","+state+","+result+","+arg3+","+queneName);
try {
String msg = new String(message.getBody());
HashMap data = mapper.readValue(msg, HashMap.class);// 失败的消息内容
System.out.println("失败的转账记录:"+mapper.writeValueAsString(data));
System.out.println("【支付宝账户不扣款】根据业务主键更新支付宝流水表状态为失败,update A_MESSAGE set sts = fail where transId=000001");
} catch (Exception e) {
e.printStackTrace();
}
}
}
package cn.my.server.clientdemo.zhifu;
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* 支付宝端业务测试
*/
@RestController
@RequestMapping("/api/test")
public class ZhiFuBaoResource {
/**
* 测试接口
* @throws AmqpException
* @throws JsonProcessingException
*/
@GetMapping("/mq")
public void trans1() throws AmqpException, JsonProcessingException {
trans();
}
@Autowired
private RabbitTemplate rabbitTemplate;
private ObjectMapper mapper = new ObjectMapper();
/**
* 转账方法
* @return
* @throws AmqpException
* @throws JsonProcessingException
*/
@Transactional
public boolean trans() throws AmqpException, JsonProcessingException {
Map data = new HashMap<String, Object>();
String transId = "00000001";//生成业务流水号
data.put("transId", transId);
data.put("userId", "1");
data.put("money", 10000);
//A_MESSAGE交易流水表,主键是【交易编号】,重复插入会报异常,类似于幂等操作
System.out.println("【支付宝记账操作】重复插入会报异常,类似于幂等操作 insert into A_MESSAGE(transId,userId,money) values (000001,1,10000)");
// 流水号传入,方便消息发送失败时做操作
CorrelationData correlationData = new CorrelationData(transId);
rabbitTemplate.convertAndSend("","money",mapper.writeValueAsString(data),correlationData);
return true;
}
}
监听支付宝发到消息队列中的消息,做余额宝账号金额更新
package cn.my.server.clientdemo.yuer;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
/**
* 余额宝端消息监听
*/
@Component
public class YuErBaoMessageListeners {
private final Logger log = LoggerFactory.getLogger(this.getClass());
private ObjectMapper mapper = new ObjectMapper();
/**
* 监听消息队列
* @param message 消息内容
* @param channel 消息渠道
* @throws IOException 异常
*/
@RabbitListener(queues = "money")
@RabbitHandler
public void receiveQueue(Message message, Channel channel) throws IOException {
String msg = "";
try {
// 业务处理逻辑
msg = new String(message.getBody());
Map data = mapper.readValue(msg, HashMap.class);
retry(data);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);// 手动应答消息已经处理
} catch (Exception e) {
log.error("MQ接收消息内容[" + msg + "],后处理异常:" + e);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);// 手动应答消息已经处理
}
}
/**
* 更新余额宝账户金额
*
* @param map
*/
@Transactional
public void bizOp(Map<String, Object> map) {
// B_MESSAGE交易流水表,主键是【交易编号】,重复插入会报异常,类似于幂等操作
System.out.println(
"【余额宝记账操作】重复插入会报异常,类似于幂等操作 insert into B_MESSAGE(transId,userId,money) values (000001,1,10000)");
// 更新余额宝账户金额
System.out.println("【余额宝账户入款】 update B set amount = amount +10000 where userId = 1");
}
/**
* 一直报错,重试次数用完了,保存如下信息供人工干预
*
* @param map
* @throws JsonProcessingException
*/
public void failed(Map<String, Object> map) throws JsonProcessingException {
System.out.println("一直报错,重试次数用完了,保存如下信息供人工干预:\n" + mapper.writeValueAsString(map));
}
/**
* 异常时最多重试 3次,成功为止
*
* @param map
* 输入参数
*/
private void retry(Map<String, Object> map) {
// 构建重试模板实例
RetryTemplate retryTemplate = new RetryTemplate();
// 设置重试次数
SimpleRetryPolicy policy = new SimpleRetryPolicy(3,
Collections.<Class<? extends Throwable>, Boolean>singletonMap(Exception.class, true));
// 设置重试间隔时间
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
fixedBackOffPolicy.setBackOffPeriod(100);
retryTemplate.setRetryPolicy(policy);
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
// 编写业务处理代码逻辑
final RetryCallback<Object, Exception> retryCallback = new RetryCallback<Object, Exception>() {
public Object doWithRetry(RetryContext context) throws Exception {
System.out.println("第" + (1 + context.getRetryCount()) + "次处理");
try {
bizOp(map);
} catch (Exception e) {
e.printStackTrace();
throw new Exception("捕捉到业务处理异常,需要抛出");// 这个点特别注意,重试的根源通过Exception返回
}
return null;
}
};
// 重试次数执行完依然报错,走如下逻辑
final RecoveryCallback<Object> recoveryCallback = new RecoveryCallback<Object>() {
public Object recover(RetryContext context) throws Exception {
failed(map);
return null;
}
};
try {
// 由retryTemplate 执行execute方法开始逻辑执行
retryTemplate.execute(retryCallback, recoveryCallback);
} catch (Exception e) {
e.printStackTrace();
}
}
}
>> 浏览器直接访问 /api/test/mq
>> 控制台结果如下:
【支付宝记账操作】重复插入会报异常,类似于幂等操作 insert into A_MESSAGE(transId,userId,money) values (000001,1,10000)
【余额宝记账操作】重复插入会报异常,类似于幂等操作 insert into B_MESSAGE(transId,userId,money) values (000001,1,10000)
【余额宝账户入款】 update B set amount = amount + 10000 where userId = 1
【支付宝账户扣款】 update A set amount = amount - 10000 where userId = 1
欢迎光临 黑马程序员技术交流社区 (http://bbs.itheima.com/) | 黑马程序员IT技术论坛 X3.2 |