A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

本帖最后由 执迷不悟 于 2019-3-26 17:08 编辑

              rabbitmq入门学习五


1. 主题模型(Topics)
               
主题模式:
1、每个消费者监听自己的队列,并且设置带统配符的routingkey
2、生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队
看下图例子:
解释:
红色Queue:绑定的是usa.# ,因此凡是以 usa.开头的routing key 都会被匹配到
黄色Queue:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配到

业务需求:
  最常见的增删改,需要不同的系统去处理,
1.1 生产者代码
使用topic类型的Exchange,发送消息的routing key3种: item.isnertitem.updateitem.delete
[Java] 纯文本查看 复制代码
public class TopicsProducer {
    //队列名称
    private static final String QUEUE_INFORM_1 = "topic_exchange_queue_1";
    private static final String QUEUE_INFORM_2 = "topic_exchange_queue_2";
    private static final String QUEUE_INFORM_3 = "topic_exchange_queue_3";
    private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
    private static final String ROUTINGKEY_UPDATE="item.update";
    private static final String ROUTINGKEY_DELETE="item.delete";
    private static final String ROUTINGKEY_INSTALL="item.install";
    private static final String ROUTINGKEY_ALL="item.#";

    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            //建立新连接
            connection = ConnectionUtil.getConnection();
            //创建会话通道,生产者和mq服务所有通信都在channel通道中完成
            channel = connection.createChannel();
            channel.queueDeclare(QUEUE_INFORM_1, true, false, false, null);
            channel.queueDeclare(QUEUE_INFORM_2, true, false, false, null);
            channel.queueDeclare(QUEUE_INFORM_3, true, false, false, null);
            //声明一个交换机
            channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
            //进行交换机和队列绑定
            channel.queueBind(QUEUE_INFORM_1, EXCHANGE_TOPICS_INFORM, ROUTINGKEY_UPDATE);
            channel.queueBind(QUEUE_INFORM_1, EXCHANGE_TOPICS_INFORM, ROUTINGKEY_DELETE);
            channel.queueBind(QUEUE_INFORM_2, EXCHANGE_TOPICS_INFORM, ROUTINGKEY_INSTALL);
            channel.queueBind(QUEUE_INFORM_3, EXCHANGE_TOPICS_INFORM, ROUTINGKEY_ALL);
            //发送消息
            //发送消息的时候指定routingKey
            String message = "新增商品";
            channel.basicPublish(EXCHANGE_TOPICS_INFORM, "item.install", null, message.getBytes());
//                String message = "删除商品";
//                channel.basicPublish(EXCHANGE_TOPICS_INFORM, "item.delete", null, message.getBytes());
//                String message = "修改商品";
//                channel.basicPublish(EXCHANGE_TOPICS_INFORM, "item.update", null, message.getBytes());
            System.out.println("send to mq " + message);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //关闭连接
            //先关闭通道
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

1.2 消费者代码1
消费者1代码,接收两种路由模式item.updateitem.delete
[Java] 纯文本查看 复制代码
public class TopicsComsumer1 {
    //队列名称
    private static final String QUEUE_INFORM_1 = "topic_exchange_queue_1";;
    private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
    private static final String ROUTINGKEY_UPDATE="item.update";
    private static final String ROUTINGKEY_DELETE="item.delete";


    public static void main(String[] args) throws Exception {
        //建立新连接
        Connection connection = ConnectionUtil.getConnection();
        //创建会话通道,生产者和mq服务所有通信都在channel通道中完成
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_INFORM_1,true,false,false,null);
        //声明一个交换机
        channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
        //进行交换机和队列绑定
        channel.queueBind(QUEUE_INFORM_1, EXCHANGE_TOPICS_INFORM,ROUTINGKEY_UPDATE);
        channel.queueBind(QUEUE_INFORM_1, EXCHANGE_TOPICS_INFORM,ROUTINGKEY_DELETE);
        //实现消费方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交换机
                String exchange = envelope.getExchange();
                //消息内容
                String message= new String(body,"utf-8");
                System.out.println("【消费者1】:"+message);
            }
        };
        //监听队列
        channel.basicConsume(QUEUE_INFORM_1,true,defaultConsumer);

    }
}

1.3 消费者代码2
  消费者2代码,只接收item.Install
[Java] 纯文本查看 复制代码
public class TopicsComsumer2 {
    //队列名称
    private static final String QUEUE_INFORM_2 = "topic_exchange_queue_2";
    private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
    private static final String ROUTINGKEY_INSTALL="item.install";

    public static void main(String[] args) throws Exception {
        //建立新连接
        Connection connection = ConnectionUtil.getConnection();
        //创建会话通道,生产者和mq服务所有通信都在channel通道中完成
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_INFORM_2,true,false,false,null);
        //声明一个交换机
        channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
        //进行交换机和队列绑定
        channel.queueBind(QUEUE_INFORM_2, EXCHANGE_TOPICS_INFORM,ROUTINGKEY_INSTALL);

        //实现消费方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交换机
                String exchange = envelope.getExchange();
                //消息内容
                String message= new String(body,"utf-8");
                System.out.println("【消费者2】:"+message);
            }
        };
        //监听队列
        channel.basicConsume(QUEUE_INFORM_2,true,defaultConsumer);

    }
}
1.4 消费者代码3
消费者代码3,只接收item.#,只要以item.开头的路由key该消费者都可以接收
[Java] 纯文本查看 复制代码
public class TopicsComsumer3 {
    //队列名称
    private static final String QUEUE_INFORM_3 = "topic_exchange_queue_3";
    private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
    private static final String ROUTINGKEY_ALL="item.#";

    public static void main(String[] args) throws Exception {
        //建立新连接
        Connection connection = ConnectionUtil.getConnection();
        //创建会话通道,生产者和mq服务所有通信都在channel通道中完成
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_INFORM_3,true,false,false,null);
        //声明一个交换机
        channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
        //进行交换机和队列绑定
        channel.queueBind(QUEUE_INFORM_3, EXCHANGE_TOPICS_INFORM,ROUTINGKEY_ALL);
        //实现消费方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交换机
                String exchange = envelope.getExchange();
                //消息内容
                String message= new String(body,"utf-8");
                System.out.println("【消费者3】:"+message);
            }
        };
        //监听队列
        channel.basicConsume(QUEUE_INFORM_3,true,defaultConsumer);

    }
}

1.5 测试
运行生产者代码测试,我们发现交换机绑定的队列中routingkey有值了,分别为:
Item.deleteitem.updateitem.installitem.#
   

然后我们继续运行三个消费者查看消息接收情况:通过下面三张消费者截图,我们可以看出,只有绑定了item.installitem.#的队列可以接收消息,而队列1绑定的是item.deleteitem.update无法接收消息。
消费者1
消费者2
消费者3
   

2 个回复

倒序浏览
香港作为世界三大金融中心之一,经济形势已经经过爆发式发展的时期,如今的香港,经济贸易依然处于领先位置,并且在不断地平稳发展。而之所以有那么多人认为香港的经济形势不乐观,主要是因为近几年国内的发展势头太过迅猛,尤其是房地产这一块,两相对比之下,香港平稳的发展才不那么显眼。为吸引相关世界各地人才来港,我们将通过“优秀人才入境计划”按现时年度1000的配额,为人才清单下合资格人士提供入境便利。在“优秀人才入境计划”下,获批准的申请者无须在来港定居前先获得本地僱主聘任。符合人才清单相关专业资格的申请者,可在“优秀人才入境计划”的“综合计分制”下获得额外分数。香港公布人才清单,多项优惠就等你来!香港优才计划申请条件评估网址(http://www.galaxy-immi.com/obscure/assessment/1.html?pla=sq&spreadword=heim)
回复 使用道具 举报
奈斯,感谢分享
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马