rabbitmq
rabbitmq模式
Direct模式按routing key精确匹配:发送者发出的消息会被转发到binding key与routing key完全相同的消息队列中,然后被该队列的接收者接收(最常见的用法就是一对一)。
topic和direct没有本质区别,只是topic模式下binding key可以使用通配符来匹配routing key;也就是说,路由(direct)模式是topic模式的一个特例。
Fanout Exchange又叫广播模式:发送到该交换机的消息会被绑定到该交换机的每一个Queue接收到;这个时候就算指定了Key或者规则(即convertAndSend方法的参数2),也会被忽略!
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
流程
application.yml配置文件
spring:
  rabbitmq:
    host: rabbitmq.qfjava.cn
    port: 8800
    virtual-host: /test
    username: nore
    password: 2090
MessageListener 消息监听器
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Demo consumer for the {@code "jla"} queue that shows both ways of declaring a
 * Spring AMQP listener side by side: the class-level {@code @RabbitListener}
 * combined with {@code @RabbitHandler} methods, and the method-level
 * {@code @RabbitListener}.
 *
 * <p>NOTE(review): both forms subscribe to the same queue, so presumably each
 * message is delivered to only one of the two methods — confirm.
 */
@Component
@RabbitListener(queues = "jla") // Class-level form: marks this class as a receiver for the queue; handler methods must carry @RabbitHandler.
public class MessageListener {

    /**
     * Prints a received message to standard error.
     *
     * @param message the message payload as a string
     */
    @RabbitListener(queues = "jla", autoStartup = "true") // Method-level form: names the queue this method handles; no class-level annotation is needed for it.
    public void laowangkaishiyuehuileyebuzhidaohuiquyueshui(String message) {
        final String line = "收到了消息====>" + message;
        System.err.println(line);
    }

    /**
     * Prints a received message to standard error, prefixed to show it came
     * through the {@code @RabbitHandler} path.
     *
     * @param message the message payload as a string
     */
    @RabbitHandler // Only takes effect together with a class-level @RabbitListener(queues = ...).
    public void laowangyouyueleyigeren(String message) {
        final String line = "handler收到的消息".concat(String.valueOf(message));
        System.err.println(line);
    }
}
RabbitMQConfig 配置文件
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
/**
 * Spring configuration for the RabbitMQ demo.
 *
 * <p>Declares one queue, a hand-built connection factory, and one exchange of
 * each kind (fanout, direct, topic), each bound to that queue.
 */
@SpringBootApplication
public class RabbitMQConfig {

    private static final String QUEUE_NAME = "jla";

    private static final String FANOUT_EXCHANGE_NAME = "springbootfanout";
    private static final String DIRECT_EXCHANGE_NAME = "springbootdirect";
    private static final String TOPIC_EXCHANGE_NAME = "springboottopic";

    private static final String DIRECT_BINDING_KEY = "chifan";
    private static final String TOPIC_BINDING_PATTERN = "*.chifan";

    /**
     * Declares the demo queue.
     *
     * @return a queue named {@code "jla"}
     */
    @Bean
    public Queue queue1() {
        return new Queue(QUEUE_NAME);
    }

    // Optional second queue, kept disabled:
    // @Bean
    // public Queue queue2() {
    // return new Queue("shenmemingzi2");
    // }

    /**
     * Builds the broker connection factory from hard-coded settings.
     *
     * <p>NOTE(review): the application.yml shown alongside this class uses host
     * {@code rabbitmq.qfjava.cn}, while this method uses
     * {@code baseservice.qfjava.cn} — confirm which one is intended.
     * NOTE(review): credentials are embedded in source; consider externalizing.
     *
     * @return a configured {@link CachingConnectionFactory}
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost("baseservice.qfjava.cn");
        factory.setPort(8800);
        factory.setUsername("nore");
        factory.setPassword("2090");
        factory.setVirtualHost("/test");
        return factory;
    }

    /**
     * Declares the fanout exchange.
     *
     * @return an exchange named {@code "springbootfanout"}
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE_NAME);
    }

    /**
     * Binds the queue to the fanout exchange; no key is specified.
     *
     * @param exchange the fanout exchange bean
     * @param queue    the queue bean
     * @return the binding
     */
    @Bean
    public Binding bindFanout(FanoutExchange exchange, Queue queue) {
        return BindingBuilder
                .bind(queue)
                .to(exchange);
    }

    /**
     * Declares the direct exchange.
     *
     * @return an exchange named {@code "springbootdirect"}
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(DIRECT_EXCHANGE_NAME);
    }

    /**
     * Binds the queue to the direct exchange with the key {@code "chifan"}.
     *
     * @param exchange the direct exchange bean
     * @param queue    the queue bean
     * @return the binding
     */
    @Bean
    public Binding bindDirect(DirectExchange exchange, Queue queue) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(DIRECT_BINDING_KEY);
    }

    // Optional second direct binding, kept disabled:
    // @Bean
    // public Binding bindDirect2(DirectExchange exchange, Queue queue) {
    // return BindingBuilder.bind(queue).to(exchange).with("chifan2"); // bind and specify the key
    // }

    /**
     * Declares the topic exchange.
     *
     * @return an exchange named {@code "springboottopic"}
     */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE_NAME);
    }

    /**
     * Binds the queue to the topic exchange with the wildcard pattern
     * {@code "*.chifan"}.
     *
     * @param exchange the topic exchange bean
     * @param queue    the queue bean
     * @return the binding
     */
    @Bean
    public Binding bindTopic(TopicExchange exchange, Queue queue) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(TOPIC_BINDING_PATTERN);
    }
}
Sender 发送消息
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Thin Spring component that publishes text messages through the injected
 * {@link AmqpTemplate}.
 */
@Component
public class Sender {

    /** AMQP template supplied by Spring via field injection. */
    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Sends {@code message} using the two-argument
     * {@code AmqpTemplate.convertAndSend} overload, passing {@code queue} as its
     * first argument.
     *
     * @param queue   value forwarded as the first argument of
     *                {@code convertAndSend} (per the Spring AMQP API this is the
     *                routing key)
     * @param message the text payload to send
     */
    public void sendMessage(String queue, String message) {
        final AmqpTemplate sender = this.amqpTemplate;
        sender.convertAndSend(queue, message);
    }
}
TestMain 启动测试
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
/**
 * Manual smoke test: boots the Spring context from {@link RabbitMQConfig} and
 * sends one message through {@link Sender}.
 */
public class TestMain {

    /**
     * Creates the application context, looks up the {@link Sender} bean and
     * sends a single message with {@code "jla"} as the target.
     *
     * <p>NOTE(review): the context is never closed; presumably this keeps the
     * listener running so it can receive the message — confirm.
     *
     * @param args unused command-line arguments
     */
    public static void main(String[] args) {
        final String target = "jla";
        final String payload = "一起吃饭去";

        ApplicationContext springContext =
                new AnnotationConfigApplicationContext(RabbitMQConfig.class);
        springContext.getBean(Sender.class).sendMessage(target, payload);
    }
}
|
|