1. 程式人生 > >ActiveMQ入門例項,以及類封裝

ActiveMQ入門例項,以及類封裝

這篇文章適合已經搭建好了activeMQ環境的人,需要封裝下activeMQ基本功能的人。封裝的不好,僅作參考。
package com.quhuhu.sync.util;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * 
 * @author liang.he
 *
 */
public class ActiveMqCreater {
    //queue的名字,這個對應你在activeMQ上面建立的Queue的名字
    public static String subject = "FirstQueue";
    // ConnectionFactory :連線工廠,JMS 用它建立連線
    public static ConnectionFactory connectionFactory = null;
    // Connection :JMS 客戶端到JMS Provider 的連線
    public static Connection connection = null;
    // Session: 一個傳送或接收訊息的執行緒
    private Session session = null;
    // Destination :訊息的目的地;訊息傳送給誰.
    public static Destination destination = null;
    // MessageProducer:訊息傳送者
    private MessageProducer producer = null;
    //MessageConsumer : 訊息接收者
    private MessageConsumer receiver = null;
    
    static {
        connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 
     * @return 初始化訊息傳送者
     * @throws Exception
     */
    public MessageProducer initMessageProducer() throws Exception{
        session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        //訊息目的地
        destination = session.createQueue(subject);
        // 得到訊息【傳送者】
        producer = session.createProducer(destination);
        // 設定不持久化
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        return producer;
    }
    
    /**
     * 
     * @return 初始化訊息接收者
     * @throws JMSException
     */
    public MessageConsumer initMessageConsumer() throws JMSException {
        session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        //訊息目的地
        destination = session.createQueue(subject);
        return session.createConsumer(destination);
    }
     
    /**
     * 訊息傳送方法
     * @param msg 訊息
     * @throws JMSException
     */
    public void sendMsg(String msg) throws JMSException {
        try {
            System.out.println("fasongle ");
            initMessageProducer().send(session.createTextMessage(msg));
            session.commit();
            System.out.println("傳送訊息:" + msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 訊息接受方法
     * @return msg 訊息
     * @throws JMSException
     */
    public String receiveMsg(MessageConsumer consumer) throws JMSException {
        TextMessage message = (TextMessage)consumer.receive();
        if (null != message) {
            return message.getText();
        }
        return null;
    }
    
    /**
     * 訊息接受方法
     * @return msg 訊息
     * @throws JMSException
     */
    public String receiveMsg(MessageConsumer consumer, long time) throws JMSException {
        TextMessage message = (TextMessage)consumer.receive(time);
        if (null != message) {
            return message.getText();
        }
        return null;
    }
    
    /**
     * 釋放MessageConsumer資源
     * @throws JMSException
     */
    public void close(MessageConsumer receiver) throws JMSException {
        System.out.println("close JMSconnection!");
        if(null != receiver) {
            receiver.close();
        }
    }
    
    /**
     * 釋放MessageProducer資源
     * @throws JMSException
     */
    public void close(MessageProducer producer) throws JMSException {
        System.out.println("close JMSconnection!");
        if(null != producer) {
            producer.close();
        }
    }
    
    /**
     * 釋放Session資源
     * @throws JMSException
     */
    public void close(Session session) throws JMSException {
        System.out.println("close JMSconnection!");
        if(null != session) {
            session.close();
        }
    }
    
    /**
     * 釋放Connection資源
     * @throws JMSException
     */
    public void close(Connection connection) throws JMSException {
        System.out.println("close JMSconnection!");
        if(null != connection) {
            connection.close();
        }
    }
    
}
封裝的ActiveMQ類
下面是測試接受類:
import javax.jms.JMSException;
import javax.jms.MessageConsumer;

public class TestReceive {
    public static void main(String[] args) throws JMSException {
        JMSUtil js = new JMSUtil();
        MessageConsumer consumer = js.initMessageConsumer();
        while (true) {
            //設定接收者接收訊息的時間,為了便於測試,這裡誰定為100s
            System.out.println("收到訊息" + js.receiveMsg(consumer, 50000));
        }
    }
}

下面是測試傳送類:

import javax.jms.JMSException;

public class TestSend {
    public static void main(String[] args) throws JMSException {
        JMSUtil js = new JMSUtil();
        js.sendMsg("11111111111111ssssssssssssssssssssssassssssssssss");
        js.sendMsg("2222222222sssssssssssssssssss2ssssssssss");
        js.sendMsg("333333333333sssssssssssssssssssdssssssssss");
        js.sendMsg("444444444sssssssssssssssssssssssssssss");
        js.sendMsg("55555555555ssssssssss2sssssssssssssssssss");
        js.sendMsg("666666666666sssssssssssss3ssssssssssssssss");
    }
}

執行的時候先執行接收類,在執行傳送類;


activemq的環境自行百度,很簡單,我這裡是使用的activeMQ的QUEUE訊息佇列,activeMQ還提供topic,感興趣的可以自己研究下他們的區別與用法。