A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

本帖最后由 longxf_sjz 于 2019-8-11 10:08 编辑

1.       Spring整合JMS
1.1发布订阅模式
1.1.1消息生产者
    创建maven工程,添加SpringJms 、activeMQ依赖
[XML] 纯文本查看 复制代码
<dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-jms</artifactId>                
        </dependency>
<dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-client</artifactId>
                <version>5.13.4</version>
        </dependency>
创建spring配置文件
[XML] 纯文本查看 复制代码
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->  
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
            <property name="brokerURL" value="tcp://192.168.25.135:61616"/>  
        </bean>
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">  
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
            <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
        </bean> 
    <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->  
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
            <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
            <property name="connectionFactory" ref="connectionFactory"/>  
        </bean>      
    <!--这个是队列目的地,导入索引库-->  
        <bean id="queueSolrDestination" class="org.apache.activemq.command.ActiveMQQueue">  
            <constructor-arg value="pinyougou_queue_solr"/>  
        </bean> 
1.1.2 消息生产者自定义类发送消息
[Java] 纯文本查看 复制代码
jmsTemplate.send(queueSolrDestination, new MessageCreator() {
    @Override
    public Message createMessage(Session session) throws JMSException {
        return session.createTextMessage(jsonString);
    }
                                });
1.1.3消息消费者
    创建maven工程,添加SpringJms 、activeMQ依赖
[XML] 纯文本查看 复制代码
<dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-jms</artifactId>                
        </dependency>
<dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-client</artifactId>
                <version>5.13.4</version>
        </dependency>
创建spring配置文件
[XML] 纯文本查看 复制代码
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->  
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
            <property name="brokerURL" value="tcp://192.168.25.135:61616"/>  
        </bean>
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">  
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
            <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
        </bean> 
    <!--这个是队列目的地,导入索引库-->  
        <bean id="queueSolrDestination" class="org.apache.activemq.command.ActiveMQQueue">  
            <constructor-arg value="pinyougou_queue_solr"/>  
        </bean> 
        <!-- 消息监听容器  导入索引库-->
        <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
                <property name="connectionFactory" ref="connectionFactory" />
                <property name="destination" ref="queueSolrDestination" />
                <property name="messageListener" ref="itemSearchListener" />
        </bean>
1.1.4   消息消费方自定义监听类
[Java] 纯文本查看 复制代码
@Component
public class ItemSearchListener implements MessageListener {
        @Autowired
        private ItemSearchService itemSearchService;
        @Override
        public void onMessage(Message message) {
                TextMessage textMessage=(TextMessage)message;
                try {
                        String text = textMessage.getText();//json字符串
                        System.out.println("监听到消息:"+text);
                        List<TbItem> itemList = JSON.parseArray(text, TbItem.class);
                        itemSearchService.importList(itemList);
                        System.out.println("导入到solr索引库");
                } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                }
        }
}
2 ActiveMQ管理平台
列表各列信息含义如下:
Number Of Pending Messages当前未出队列的数量。
Number Of Consumers这个是正在监听的消费者数量
Messages Enqueued进入队列的消息  进入队列的总数量,包括出队列的。
MessagesDequeued出了队列的消息  可以理解为是消费这消费掉的数量。

0 个回复

您需要登录后才可以回帖 登录 | 加入黑马