A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

本帖最后由 执迷不悟 于 2019-2-20 18:18 编辑

                       rabbitmq入门学习二

1. 竞争消费者模式(Work queues)
竞争消费者模型图:
                       
  work queues与基本消息模型相比,多了一个消费端,两个消费端共同消费同一个队列中的消息。
应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
1.1 编写工具类
rabbitmq连接代码提取成工具类,减少代码冗余
[Java] 纯文本查看 复制代码
public class ConnectionUtil {[/align]    /**
     * 建立与RabbitMQ的连接
     * @return
     * @throws Exception
     */
    public static Connection getConnection() throws Exception {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost("192.168.56.130");
        //端口
        factory.setPort(5672);
        //设置账号信息,用户名、密码、vhost
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 通过工程获取连接
        Connection connection = factory.newConnection();
        return connection;
    }
}
1.2 生产者
[Java] 纯文本查看 复制代码
public class WorkProducer {

    //队列名称
    private static final String QUEUE="work";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection=null;
        Channel channel=null;
        try {
            connection = ConnectionUtil.getConnection();
            channel  = connection.createChannel();
            channel.queueDeclare(QUEUE,true,false,false,null);
            //循环发送多条消息
            for (int i = 0; i < 50; i++) {
                // 消息内容
                String message = "task .. " + i;
                channel.basicPublish("", QUEUE, null, message.getBytes());
                System.out.println(" [x] send '" + message + "'");
                Thread.sleep(i * 2);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            channel.close();
            connection.close();
        }
    }
}

1.3 消费者1
[Java] 纯文本查看 复制代码
public class WorkConsumer1 {
    //队列名称
    private static final String QUEUE="work";
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE, true, false, false, null);
        //定义消费方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("receive message.." + msg);
            }
        };
        boolean autoAck = true;
        channel.basicConsume(QUEUE, autoAck, defaultConsumer);
    }
}


1.4 消费者2
消费者2和消费者1基本一样,唯一不同的是在获取消息后加了一个Thread.sleep(1000);模拟消费耗时
[Java] 纯文本查看 复制代码
public class WorkConsumer2 {
    //队列名称
    private static final String QUEUE="work";
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE, true, false, false, null);
        //定义消费方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("receive message.." + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        boolean autoAck = true;
        channel.basicConsume(QUEUE, autoAck, defaultConsumer);
    }
}

1.5测试和问题解决:
   1、分别启动消费者1和消费者2
   2、启动生产者发送多个消息。
结果:
  1、一条消息只会被一个消费者接收;
  2rabbit采用轮询的方式将消息是平均发送给消费者的;
  3、消费者在处理完某条消息后,才会收到下一条消息。
结果:
存在问题?
  1. 消费者2比消费者1的效率要低,一次任务的耗时较长,然而两人最终消费的消息数量是一样的
  2. 消费者1大量时间处于空闲状态,消费者2一直忙碌
  3. 这种情况它不是一个竞争关系,而是采用轮询的方式将消息是平均发送给消费者的。
问题解决:
  我们可以通过channel.basicQos(1);设置rabbitmq消费信息的条数来处理完成竞争消费模式,rabbitmq一次只会发送一条消息给该消费者,只有等消费者确认消费后才会发送第二条消息

修改代码如下:
[Java] 纯文本查看 复制代码
public class WorkConsumer2 {
    //队列名称
    private static final String QUEUE="work";
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE, true, false, false, null);

//设置每次消费消息条数
        channel.basicQos(1);
        //定义消费方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("【消费者2】receive: " + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE, autoAck, defaultConsumer);
    }
}

重新运行测试效果:
  从下图我们看以看出消费者2只消费了3条消息,消费者1消费了余下的所有消息,这才是竞争消费模式,处理快的多消费,处理慢的少消费。


1 个回复

倒序浏览
一个人一座城0.0 来自手机 中级黑马 2019-2-23 19:15:31
沙发
看一看。
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马