// [Java] Producer listing (header left over from the source page's "view as plain text / copy code" widget)
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setVirtualHost("/");
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_dlx_exchange";
String routingkey = "dlx.dlx";
String msg = "test dlx message";
for (int i = 0; i < 3; i++) {
// deliveryMode=2 持久化,expiration 消息有效时间
AMQP.BasicProperties properties=new AMQP.BasicProperties().builder()
.deliveryMode(2)
.contentEncoding("utf-8")
.expiration("7000")
.build();
channel.basicPublish(exchangeName, routingkey, true, properties, msg.getBytes());
}
}
}
// [Java] Consumer listing (header left over from the source page's "view as plain text / copy code" widget)
/**
 * Demo consumer for the dead-letter-exchange (DLX) example.
 *
 * <p>Declares the working exchange and queue ({@code test_dlx_exchange} /
 * {@code test_dlx_queueName}), attaches the {@code x-dead-letter-exchange} argument
 * to the working queue, declares the dead-letter exchange and queue
 * ({@code dlx.exchange} / {@code dlx.queue}), and then starts consuming from the
 * working queue with a {@code TestConsumer} (defined elsewhere).
 */
public class Consumer {

    /**
     * Sets up the topology and starts the consumer. The connection is left open
     * on success so deliveries keep arriving; it is closed only if setup fails.
     *
     * @param args ignored
     * @throws Exception if connecting, declaring, binding, or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);

        Connection connection = connectionFactory.newConnection();
        try {
            Channel channel = connection.createChannel();

            String exchangeName = "test_dlx_exchange";
            String routingkey = "dlx.#";
            String queueName = "test_dlx_queueName";
            // Single source of truth for the dead-letter names, so the queue
            // argument and the declarations below cannot drift apart.
            String deadLetterExchange = "dlx.exchange";
            String deadLetterQueue = "dlx.queue";

            // Note: the key "x-dead-letter-exchange" is fixed; the value is the
            // name of whatever exchange you choose as the dead-letter exchange.
            Map<String, Object> queueArguments = new HashMap<>();
            queueArguments.put("x-dead-letter-exchange", deadLetterExchange);

            // Declare the working exchange.
            channel.exchangeDeclare(exchangeName, "topic", true, false, null);
            // Note: the arguments must be declared on the queue; declaring them
            // on the exchange has no effect.
            channel.queueDeclare(queueName, true, false, false, queueArguments);
            channel.queueBind(queueName, exchangeName, routingkey);

            // Declare the dead-letter exchange and queue.
            channel.exchangeDeclare(deadLetterExchange, "topic", true, false, null);
            channel.queueDeclare(deadLetterQueue, true, false, false, null);
            // Binding key "#" matches every routing key, so the dead-letter queue
            // receives everything routed to the dead-letter exchange.
            channel.queueBind(deadLetterQueue, deadLetterExchange, "#");

            // Second argument true = automatic acknowledgement.
            channel.basicConsume(queueName, true, new TestConsumer(channel));
        } catch (Exception setupFailure) {
            // A failed declaration or bind would otherwise leave the connection
            // open, and the process would hang instead of exiting with the error.
            try {
                connection.close();
            } catch (Exception closeFailure) {
                setupFailure.addSuppressed(closeFailure);
            }
            throw setupFailure;
        }
    }
}