本帖最后由 执迷不悟 于 2019-2-14 15:08 编辑
RabbitMQ入门学习篇一
RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此不予学习。那么也就剩下5种。 一、RabbitMQ的5中消息模型 这五种模型分别为如下5中对应图片的1 2 3 4 5: 1.基本消息模型, “Hello World” 2.竞争消费者模式, Work queues 3.发布/订阅模型, Publish/Subscribe 4.路由模型, Routing 5.主题模型, Topics
1.基本消息模型官方文档说明: RabbitMQ是一个消息的代理者(Message Broker):它接收消息并且传递消息。你可以认为它是一个邮局:当你投递邮件到一个邮箱,你很肯定邮递员会终究会将邮件递交给你的收件人。与此类似,RabbitMQ 可以是一个邮箱、邮局、同时还有邮递员。不同之处在于:RabbitMQ不是传递纸质邮件,而是二进制的数据。 基本消息模型图:
在上图的模型中,有以下概念: - P:生产者,也就是要发送消息的程序 - C:消费者:消息的接受者,会一直等待消息到来。 - queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。 1.1 生产者生产消息[Java] 纯文本查看 复制代码 public class Producer1 {
//队列名称
private static final String QUEUE="helloWord";
public static void main(String[] args) throws IOException,TimeoutException {
Connection connection=null;
Channel channel=null;
try {
//创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
//rabbitmq服务所在ip
connectionFactory.setHost("192.168.56.130");
//rabbitmq的连接端口
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");//账户名
connectionFactory.setPassword("guest");//密码
//当前账户绑定的虚拟主机
connectionFactory.setVirtualHost("/");
//获取连接
connection = connectionFactory.newConnection();
//创建与交换机的通道,可以创建多个,每一个通道代表一个会话任务
channel = connection.createChannel();
//绑定队列
channel.queueDeclare(QUEUE,true,false,false,null);
String message="hello: "+System.currentTimeMillis();
//发送消息
channel.basicPublish("",QUEUE,null,message.getBytes());
System.out.println("send+++++++++++++++++");
} catch (Exception e) {
e.printStackTrace();
} finally {
channel.close();
connection.close();
}
}
}
1.1.1 绑定队列(queueDeclare)方法参数详解:声明队列: channel.queueDeclare(QUEUE,true,false,false,null);
param1(queue): 队列名称
param2(durable): 是否持久化,如果持久化,mq重启后队列还在
param3(exclusive): 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
param4(autoDelete): 队列不在使用的时候是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
param5(arguments): 队列参数,可以设置一个队列的扩展参数,比如:可设置存活时间 1.1.2 发送消息(basicPublish)方法参数详解:发送消息: channel.basicPublish("",QUEUE,null,message.getBytes()); param1: Exchange(交换机)的名称,如果没有指定使用默认的,default exchange交换机
param2: routingkey,消息的路由key, 交换机根据路由绑定的队列,发送消息
param3: 消息包含的属性
param4: 消息体
如果不指定交换机,消息会发送给默认交换机,队列也会绑定默认交换机,但是不能显示绑定或者解除绑定。如果使用默认交换机,routingkey等于队列名称。 1.1.1 运行生产者控制台结果:
Rabbitmq的web管理界面查看消息: 我们可以看到helloWord队列已经创建成功,并且有一条未读消息
消息预览: 我们可以通过管理界面进行消息预览,点击队列进入队列详情页面: 先后点击标注1和标注2,进行消息预览,Payload后面就是消息
1.2 消费者消费消息[Java] 纯文本查看 复制代码 public class consumer1 {
//队列名称
private static final String QUEUE="helloWord";
public static void main(String[] args) throws IOException,TimeoutException {
//创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.56.130");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
final Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE, true, false, false, null);
//定义消费方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,Envelope envelope,
AMQP.BasicProperties properties,byte[] body)throws IOException
{
//交换机
String exchange = envelope.getExchange();
//路由key
String routingKey = envelope.getRoutingKey();
//消息id
long deliveryTag = envelope.getDeliveryTag();
//消息内容
String msg = new String(body, "utf-8");
System.out.println("receive message.." + msg);
}
};
/**
* 监听队列:
* 1.队列名称
* 2.是否自动回复,设置为true自动进行回复,mq接收到回复会删除消息
* 设置为false需要手动进行确认消息
* 3. 消费消息的方法,消费者接收到消息后调用此方法,获取消息
*/
boolean autoAck = true;
channel.basicConsume(QUEUE, autoAck, defaultConsumer);
}
}
1.2.1 运行消费者消费消息控制台结果:
这个时候rabbitmq管理界面的消息也会消失:
|