本帖最后由 大山哥哥 于 2019-1-27 23:49 编辑
spring boot在企业应用中越来越广泛,其整合各种优秀开源框架的方式也及其简单,下面我们使用Spring Boot来整合ActiveMQ,完成产出Queue类型和Topic类型消息的功能。 需求描述如下:
发送一条消息内容为"QUEUE消息内容"的文本消息到名字为"queue_msg"的Queue中,使用两个监听者监听该Queue,查看消息处理效果;
发送一条消息内容为"TOPIC消息内容"的文本消息到名字为"topic_msg"的Topic中,使用两个监听者监听该Topic,查看消息处理效果。
开发顺序如下:
1.搭建spring boot项目
在pom.xml中继承springboot的起步父依赖,引入springboot整合activemq的起步依赖、测试起步依赖
[XML] 纯文本查看 复制代码 <parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.2.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
2.编写入口main方法
[Java] 纯文本查看 复制代码 @SpringBootApplication
public class SpringbootMqApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootMqApplication.class, args);
}
}
3.开发配置类
3.1 连接工厂配置类
[Java] 纯文本查看 复制代码 /**
* Destination bean配置类
*/
@Configuration
public class ProducerConfig {
//创建名为queue_msg的QUEUE
@Bean
public Queue createQueue(){
return new ActiveMQQueue("queue_msg");
}
//创建名为topic_msg的TOPIC
@Bean
public Topic createTopic(){
return new ActiveMQTopic("topic_msg");
}
}
3.2 Queue和Topic配置类
[Java] 纯文本查看 复制代码 @Configuration
public class ProducerConfig {
@Bean
public Queue createQueue(){
return new ActiveMQQueue("queue_msg");
}
@Bean
public Topic createTopic(){
return new ActiveMQTopic("topic_msg");
}
}
3.3 监听Queue和Topic的监听器容器配置类
[Java] 纯文本查看 复制代码 @Configuration
public class ListenerContainerConfig {
//配置监听Queue的监听容器工程
@Bean("queueListenerContainer")
public JmsListenerContainerFactory queueListenerContainer(ActiveMQConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setPubSubDomain(false);//监听的是Queue
bean.setConnectionFactory(connectionFactory);
return bean;
}
//配置监听Topic的监听容器工程
@Bean("topicListenerContainer")
public JmsListenerContainerFactory topicListenerContainer(ActiveMQConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setPubSubDomain(true);//监听的是Topic
bean.setConnectionFactory(connectionFactory);
return bean;
}
}
4.开发消息消费者的监听者
[AppleScript] 纯文本查看 复制代码 /**
* 消费者监听器
*/
@Component
public class Listener {
//监听名为queue_msg Queue的消息,监听类型由queueListenerContainer决定
@JmsListener(destination = "queue_msg",containerFactory = "queueListenerContainer")
public void consumeQueue1(String msg){
System.out.println("queue_msg1获得的消息是" + msg);
}
//监听名为queue_msg Queue的消息,监听类型由queueListenerContainer决定
@JmsListener(destination = "queue_msg",containerFactory = "queueListenerContainer")
public void consumeQueue2(String msg){
System.out.println("queue_msg2获得的消息是" + msg);
}
//监听名为topic_msg Topic的消息,监听类型由topicListenerContainer决定
@JmsListener(destination = "topic_msg",containerFactory = "topicListenerContainer")
public void consumeTopic1(String msg){
System.out.println("topic_msg1获得的消息是" + msg);
}
//监听名为topic_msg Topic的消息,监听类型由topicListenerContainer决定
@JmsListener(destination = "topic_msg",containerFactory = "topicListenerContainer")
public void consumeTopic2(String msg){
System.out.println("topic_msg2获得的消息是" + msg);
}
}
以上配置完毕整合,即可以开发一个测试类来向Queue和Topic中发送消息,并查看消息的消费情况
5.开发测试类
[Java] 纯文本查看 复制代码 @RunWith(SpringRunner.class)
@SpringBootTest
public class SpringbootMqApplicationTests {
@Autowired
private Queue queue;//注入配置好的Queue
@Autowired
private Topic topic;//注入配置好的Topic
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;//使用该模板类发送消息
@Test
public void sendMessage() {
//向Queue中发送文本消息
jmsMessagingTemplate.convertAndSend(queue,"\"QUEUE消息内容\"");
//向Topic中发送文本消息
jmsMessagingTemplate.convertAndSend(topic,"\"TOPIC消息内容\"");
}
}
6.测试结果:
[Java] 纯文本查看 复制代码 queue_msg1获得的消息是"QUEUE消息内容"
topic_msg1获得的消息是"TOPIC消息内容"
topic_msg2获得的消息是"TOPIC消息内容" 7.总结:
可以通过结果打印看出Queue消息被接收到了,并且只有一个消费者进行消费,而Topic消息被接受到了,两个消费者同时处理了。
由于springboot整合ActiveMQ之后默认只能处理Queue中的消息,虽然可以通过配置application.properties中的spring.jms.pub-sub-domain=true来改变消息类型为Topic但是也只是单一的一种,所以需要通过自定义的方式来完成两种类型的使用。
详细代码参见附件
springboot_mq.zip
(54.44 KB, 下载次数: 282)
|