本帖最后由 执迷不悟 于 2019-3-26 17:08 编辑
rabbitmq入门学习五
1. 主题模型(Topics)
主题模式: 1、每个消费者监听自己的队列,并且设置带统配符的routingkey。 2、生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队 看下图例子: 解释: 红色Queue:绑定的是usa.# ,因此凡是以 usa.开头的routing key 都会被匹配到 黄色Queue:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配到
业务需求: 最常见的增删改,需要不同的系统去处理, 1.1 生产者代码使用topic类型的Exchange,发送消息的routing key有3种: item.isnert、item.update、item.delete [Java] 纯文本查看 复制代码 public class TopicsProducer {
//队列名称
private static final String QUEUE_INFORM_1 = "topic_exchange_queue_1";
private static final String QUEUE_INFORM_2 = "topic_exchange_queue_2";
private static final String QUEUE_INFORM_3 = "topic_exchange_queue_3";
private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
private static final String ROUTINGKEY_UPDATE="item.update";
private static final String ROUTINGKEY_DELETE="item.delete";
private static final String ROUTINGKEY_INSTALL="item.install";
private static final String ROUTINGKEY_ALL="item.#";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//建立新连接
connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
channel = connection.createChannel();
channel.queueDeclare(QUEUE_INFORM_1, true, false, false, null);
channel.queueDeclare(QUEUE_INFORM_2, true, false, false, null);
channel.queueDeclare(QUEUE_INFORM_3, true, false, false, null);
//声明一个交换机
channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
//进行交换机和队列绑定
channel.queueBind(QUEUE_INFORM_1, EXCHANGE_TOPICS_INFORM, ROUTINGKEY_UPDATE);
channel.queueBind(QUEUE_INFORM_1, EXCHANGE_TOPICS_INFORM, ROUTINGKEY_DELETE);
channel.queueBind(QUEUE_INFORM_2, EXCHANGE_TOPICS_INFORM, ROUTINGKEY_INSTALL);
channel.queueBind(QUEUE_INFORM_3, EXCHANGE_TOPICS_INFORM, ROUTINGKEY_ALL);
//发送消息
//发送消息的时候指定routingKey
String message = "新增商品";
channel.basicPublish(EXCHANGE_TOPICS_INFORM, "item.install", null, message.getBytes());
// String message = "删除商品";
// channel.basicPublish(EXCHANGE_TOPICS_INFORM, "item.delete", null, message.getBytes());
// String message = "修改商品";
// channel.basicPublish(EXCHANGE_TOPICS_INFORM, "item.update", null, message.getBytes());
System.out.println("send to mq " + message);
} catch (Exception e) {
e.printStackTrace();
} finally {
//关闭连接
//先关闭通道
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
1.2 消费者代码1消费者1代码,接收两种路由模式item.update和item.delete [Java] 纯文本查看 复制代码 public class TopicsComsumer1 {
//队列名称
private static final String QUEUE_INFORM_1 = "topic_exchange_queue_1";;
private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
private static final String ROUTINGKEY_UPDATE="item.update";
private static final String ROUTINGKEY_DELETE="item.delete";
public static void main(String[] args) throws Exception {
//建立新连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_INFORM_1,true,false,false,null);
//声明一个交换机
channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
//进行交换机和队列绑定
channel.queueBind(QUEUE_INFORM_1, EXCHANGE_TOPICS_INFORM,ROUTINGKEY_UPDATE);
channel.queueBind(QUEUE_INFORM_1, EXCHANGE_TOPICS_INFORM,ROUTINGKEY_DELETE);
//实现消费方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交换机
String exchange = envelope.getExchange();
//消息内容
String message= new String(body,"utf-8");
System.out.println("【消费者1】:"+message);
}
};
//监听队列
channel.basicConsume(QUEUE_INFORM_1,true,defaultConsumer);
}
}
1.3 消费者代码2 消费者2代码,只接收item.Install [Java] 纯文本查看 复制代码 public class TopicsComsumer2 {
//队列名称
private static final String QUEUE_INFORM_2 = "topic_exchange_queue_2";
private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
private static final String ROUTINGKEY_INSTALL="item.install";
public static void main(String[] args) throws Exception {
//建立新连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_INFORM_2,true,false,false,null);
//声明一个交换机
channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
//进行交换机和队列绑定
channel.queueBind(QUEUE_INFORM_2, EXCHANGE_TOPICS_INFORM,ROUTINGKEY_INSTALL);
//实现消费方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交换机
String exchange = envelope.getExchange();
//消息内容
String message= new String(body,"utf-8");
System.out.println("【消费者2】:"+message);
}
};
//监听队列
channel.basicConsume(QUEUE_INFORM_2,true,defaultConsumer);
}
} 1.4 消费者代码3消费者代码3,只接收item.#,只要以item.开头的路由key该消费者都可以接收 [Java] 纯文本查看 复制代码 public class TopicsComsumer3 {
//队列名称
private static final String QUEUE_INFORM_3 = "topic_exchange_queue_3";
private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
private static final String ROUTINGKEY_ALL="item.#";
public static void main(String[] args) throws Exception {
//建立新连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_INFORM_3,true,false,false,null);
//声明一个交换机
channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
//进行交换机和队列绑定
channel.queueBind(QUEUE_INFORM_3, EXCHANGE_TOPICS_INFORM,ROUTINGKEY_ALL);
//实现消费方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交换机
String exchange = envelope.getExchange();
//消息内容
String message= new String(body,"utf-8");
System.out.println("【消费者3】:"+message);
}
};
//监听队列
channel.basicConsume(QUEUE_INFORM_3,true,defaultConsumer);
}
}
1.5 测试运行生产者代码测试,我们发现交换机绑定的队列中routingkey有值了,分别为: Item.delete,item.update,item.install,item.#。
然后我们继续运行三个消费者查看消息接收情况:通过下面三张消费者截图,我们可以看出,只有绑定了item.install和item.#的队列可以接收消息,而队列1绑定的是item.delete和item.update无法接收消息。 消费者1: 消费者2: 消费者3:
|