A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

本帖最后由 大山哥哥 于 2019-1-27 23:49 编辑

        spring boot在企业应用中越来越广泛,其整合各种优秀开源框架的方式也及其简单,下面我们使用Spring Boot来整合ActiveMQ,完成产出Queue类型和Topic类型消息的功能。        需求描述如下
                发送一条消息内容为"QUEUE消息内容"的文本消息到名字为"queue_msg"的Queue中,使用两个监听者监听该Queue,查看消息处理效果;
                发送一条消息内容为"TOPIC消息内容"的文本消息到名字为"topic_msg"的Topic中,使用两个监听者监听该Topic,查看消息处理效果。
        开发顺序如下
                1.搭建spring boot项目
                   在pom.xml中继承springboot的起步父依赖,引入springboot整合activemq的起步依赖、测试起步依赖
[XML] 纯文本查看 复制代码
<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.2.RELEASE</version>
    </parent>
<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

                2.编写入口main方法
[Java] 纯文本查看 复制代码
@SpringBootApplication
public class SpringbootMqApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringbootMqApplication.class, args);
    }
}

                3.开发配置类
                    3.1 连接工厂配置类
[Java] 纯文本查看 复制代码
/**
 * Destination bean配置类
 */
@Configuration
public class ProducerConfig {
    //创建名为queue_msg的QUEUE
    @Bean
    public Queue createQueue(){
        return new ActiveMQQueue("queue_msg");
    }

    //创建名为topic_msg的TOPIC
    @Bean
    public Topic createTopic(){
        return new ActiveMQTopic("topic_msg");
    }
}

                    3.2 Queue和Topic配置类
[Java] 纯文本查看 复制代码
@Configuration
public class ProducerConfig {
    @Bean
    public Queue createQueue(){
        return new ActiveMQQueue("queue_msg");
    }

    @Bean
    public Topic createTopic(){
        return new ActiveMQTopic("topic_msg");
    }
}

                    3.3 监听Queue和Topic的监听器容器配置类
[Java] 纯文本查看 复制代码
@Configuration
public class ListenerContainerConfig {

    //配置监听Queue的监听容器工程
    @Bean("queueListenerContainer")
    public JmsListenerContainerFactory queueListenerContainer(ActiveMQConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setPubSubDomain(false);//监听的是Queue
        bean.setConnectionFactory(connectionFactory);
        return bean;
    }

    //配置监听Topic的监听容器工程
    @Bean("topicListenerContainer")
    public JmsListenerContainerFactory topicListenerContainer(ActiveMQConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setPubSubDomain(true);//监听的是Topic
        bean.setConnectionFactory(connectionFactory);
        return bean;
    }
}

                4.开发消息消费者的监听者
[AppleScript] 纯文本查看 复制代码
/**
 * 消费者监听器
 */
@Component
public class Listener {

    //监听名为queue_msg Queue的消息,监听类型由queueListenerContainer决定
    @JmsListener(destination = "queue_msg",containerFactory = "queueListenerContainer")
    public void consumeQueue1(String msg){
        System.out.println("queue_msg1获得的消息是" + msg);
    }
    //监听名为queue_msg Queue的消息,监听类型由queueListenerContainer决定
    @JmsListener(destination = "queue_msg",containerFactory = "queueListenerContainer")
    public void consumeQueue2(String msg){
        System.out.println("queue_msg2获得的消息是" + msg);
    }


    //监听名为topic_msg Topic的消息,监听类型由topicListenerContainer决定
    @JmsListener(destination = "topic_msg",containerFactory = "topicListenerContainer")
    public void consumeTopic1(String msg){
        System.out.println("topic_msg1获得的消息是" + msg);
    }
    //监听名为topic_msg Topic的消息,监听类型由topicListenerContainer决定
    @JmsListener(destination = "topic_msg",containerFactory = "topicListenerContainer")
    public void consumeTopic2(String msg){
        System.out.println("topic_msg2获得的消息是" + msg);
    }
}

以上配置完毕整合,即可以开发一个测试类来向Queue和Topic中发送消息,并查看消息的消费情况
                5.开发测试类
[Java] 纯文本查看 复制代码
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringbootMqApplicationTests {


    @Autowired
    private Queue queue;//注入配置好的Queue

    @Autowired
    private Topic topic;//注入配置好的Topic

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;//使用该模板类发送消息

    @Test
    public void sendMessage() {
        //向Queue中发送文本消息
        jmsMessagingTemplate.convertAndSend(queue,"\"QUEUE消息内容\"");
        //向Topic中发送文本消息
        jmsMessagingTemplate.convertAndSend(topic,"\"TOPIC消息内容\"");
    }
}

                6.测试结果
[Java] 纯文本查看 复制代码
queue_msg1获得的消息是"QUEUE消息内容"
topic_msg1获得的消息是"TOPIC消息内容"
topic_msg2获得的消息是"TOPIC消息内容"
                7.总结
         可以通过结果打印看出Queue消息被接收到了,并且只有一个消费者进行消费,而Topic消息被接受到了,两个消费者同时处理了。
         由于springboot整合ActiveMQ之后默认只能处理Queue中的消息,虽然可以通过配置application.properties中的spring.jms.pub-sub-domain=true来改变消息类型为Topic但是也只是单一的一种,所以需要通过自定义的方式来完成两种类型的使用。
详细代码参见附件
springboot_mq.zip (54.44 KB, 下载次数: 284)





1 个回复

倒序浏览
一个人一座城0.0 来自手机 中级黑马 2019-1-28 09:52:14
沙发
看一看。
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马