黑马程序员技术交流社区

标题: 五、ActiveMQ添加了mysql的持久化后,发了消息,但是MSGS表中没有记录. [打印本页]

作者: y6814365    时间: 2018-5-11 10:58
标题: 五、ActiveMQ添加了mysql的持久化后,发了消息,但是MSGS表中没有记录.
1.持久化以后  activemq数据库 会创建3张表
<bean id="derby-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="root"/>
<property name="poolPreparedStatements" value="true"/>
</bean>


在 producer 生产者 中需要设置

            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
//            producer.setTimeToLive(10);

//            发送
            producer.send(textMessage);
//            producer.send(textMessage, DeliveryMode.PERSISTENT, 1, 60 * 60 * 24);

在consumer 消费者中 添加 注意 设置客户端id 需要在开启链接 前面
//设置客户端id
//设置客户端id
connection.setClientID("client-1");

connection.start();
//            final MessageConsumer messageConsumer = session.createConsumer(topic);//普通订阅
         MessageConsumer messageConsumer = session.createDurableSubscriber(topic,"bb"); //持久订阅



代码:

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
* @author Yang
* @create 2018-04-19 17:11
* 描述: 生产者
*/
public class TopicProducer {

    /**
     * 用户名
     */
    private static final String userName = ActiveMQConnection.DEFAULT_USER;
    /**
     * 密码
     */
    private static final String passWord = ActiveMQConnection.DEFAULT_PASSWORD;
    /**
     * url
     */
    private static final String brokerUrl = ActiveMQConnection.DEFAULT_BROKER_URL;

    public void send(String message) {


        try {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName, passWord, brokerUrl);

            final Connection connection = connectionFactory.createConnection();

            connection.start();

            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);


            //创建队列
            final Topic topic = session.createTopic("topic");
            final MessageProducer producer = session.createProducer(topic);

            TextMessage textMessage = session.createTextMessage(message);

            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
//            producer.setTimeToLive(10);

//            发送
            producer.send(textMessage);
//            producer.send(textMessage, DeliveryMode.PERSISTENT, 1, 60 * 60 * 24);

//            session.commit();

            producer.close();


            session.close();

            connection.close();

        } catch (JMSException e) {
            e.printStackTrace();
        }

    }

    public static void main(String[] args) {
        TopicProducer producer = new TopicProducer();
        producer.send("hello world");

    }


}

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
* @author Yang
* @create 2018-04-19 17:10
* 描述: 消费者
*/


public class TopicConsumer {

    /**
     * 用户名
     */
    private static final String userName = ActiveMQConnection.DEFAULT_USER;
    /**
     * 密码
     */
    private static final String passWord = ActiveMQConnection.DEFAULT_PASSWORD;
    /**
     * url
     */
    private static final String brokerUrl = ActiveMQConnection.DEFAULT_BROKER_URL;

    public void receive() {


        try {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName, passWord, brokerUrl);

            final Connection connection = connectionFactory.createConnection();
             //设置客户端id
            connection.setClientID("client-1");

            connection.start();

            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

            //创建队列
            final Topic topic = session.createTopic("topic");

//            final MessageConsumer messageConsumer = session.createConsumer(topic);//普通订阅
            MessageConsumer messageConsumer = session.createDurableSubscriber(topic,"bb"); //持久订阅

            messageConsumer.setMessageListener(n -> {

                try {
                    TextMessage msg = (TextMessage) n;

                    final String text = msg.getText();

                    if (text.equalsIgnoreCase("hello world")) {
                        System.out.println("               接受信息:         " + msg.getText());
                    } else {
                        System.out.println("     测试重发次数 ");
                        int i = 1 / 0;
                    }


                } catch (JMSException e) {
//                    e.printStackTrace();
                }

            });

        } catch (JMSException e) {
//            e.printStackTrace();
        }

    }

    public static void main(String[] args) {
        TopicConsumer consumer = new TopicConsumer();
        consumer.receive();
    }


}





欢迎光临 黑马程序员技术交流社区 (http://bbs.itheima.com/) 黑马程序员IT技术论坛 X3.2