1. 程式人生 > >Java訊息佇列--ActiveMq 實戰

Java訊息佇列--ActiveMq 實戰

https://www.cnblogs.com/jaycekon/p/6225058.html

ActiveMQ 提供了Windows 和Linux、Unix 等幾個版本,樓主這裡選擇了Linux 版本下進行開發

從它的目錄來說,還是很簡單的: 

    • bin存放的是指令碼檔案
    • conf存放的是基本配置檔案
    • data存放的是日誌檔案
    • docs存放的是說明文件
    • examples存放的是簡單的例項
    • lib存放的是activemq所需jar包
    • webapps用於存放專案的目錄

進入到ActiveMQ 安裝目錄的Bin 目錄,linux 下輸入 ./activemq start 啟動activeMQ 服務。

   輸入命令之後,會提示我們建立了一個程序IP 號,這時候說明服務已經成功啟動了。

  

  ActiveMQ預設啟動時,啟動了內建的jetty伺服器,提供一個用於監控ActiveMQ的admin應用。 
  admin:http://127.0.0.1:8161/admin/

  我們在瀏覽器開啟連結之後輸入賬號密碼(這裡和tomcat 伺服器類似)

  預設賬號:admin

  密碼:admin

  

   到這裡為止,ActiveMQ 服務端就啟動完畢了。

   ActiveMQ 在linux 下的終止命令是 ./activemq stop

3、建立一個ActiveMQ工程

   專案目錄結構:

  

  上述在官網下載ActiveMq 的時候,我們可以在目錄下看到一個jar包:

  

  這個jar 包就是我們需要在專案中進行開發中使用到的相關依賴。

  3.1 建立生產者

複製程式碼
public class Producter {

    //ActiveMq 的預設使用者名稱
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //ActiveMq 的預設登入密碼
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //ActiveMQ 的連結地址
    private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    AtomicInteger count 
= new AtomicInteger(0); //連結工廠 ConnectionFactory connectionFactory; //連結物件 Connection connection; //事務管理 Session session; ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>(); public void init(){ try { //建立一個連結工廠 connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL); //從工廠中建立一個連結 connection = connectionFactory.createConnection(); //開啟連結 connection.start(); //建立一個事務(這裡通過引數可以設定事務的級別) session = connection.createSession(true,Session.SESSION_TRANSACTED); } catch (JMSException e) { e.printStackTrace(); } } public void sendMessage(String disname){ try { //建立一個訊息佇列 Queue queue = session.createQueue(disname); //訊息生產者 MessageProducer messageProducer = null; if(threadLocal.get()!=null){ messageProducer = threadLocal.get(); }else{ messageProducer = session.createProducer(queue); threadLocal.set(messageProducer); } while(true){ Thread.sleep(1000); int num = count.getAndIncrement(); //建立一條訊息 TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+ "productor:我是大帥哥,我現在正在生產東西!,count:"+num); System.out.println(Thread.currentThread().getName()+ "productor:我是大帥哥,我現在正在生產東西!,count:"+num); //傳送訊息 messageProducer.send(msg); //提交事務 session.commit(); } } catch (JMSException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
複製程式碼

  3.2 建立消費者

複製程式碼
public class Comsumer {

    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;

    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

    private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    ConnectionFactory connectionFactory;

    Connection connection;

    Session session;

    ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();
    AtomicInteger count = new AtomicInteger();

    public void init(){
        try {
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
            connection  = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }


    public void getMessage(String disname){
        try {
            Queue queue = session.createQueue(disname);
            MessageConsumer consumer = null;

            if(threadLocal.get()!=null){
                consumer = threadLocal.get();
            }else{
                consumer = session.createConsumer(queue);
                threadLocal.set(consumer);
            }
            while(true){
                Thread.sleep(1000);
                TextMessage msg = (TextMessage) consumer.receive();
                if(msg!=null) {
                    msg.acknowledge();
                    System.out.println(Thread.currentThread().getName()+": Consumer:我是消費者,我正在消費Msg"+msg.getText()+"--->"+count.getAndIncrement());
                }else {
                    break;
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
複製程式碼

4、執行ActiveMQ專案

  4.1 生產者開始生產訊息

複製程式碼
public class TestMq {
    public static void main(String[] args){
        Producter producter = new Producter();
        producter.init();
        TestMq testMq = new TestMq();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //Thread 1
        new Thread(testMq.new ProductorMq(producter)).start();
        //Thread 2
        new Thread(testMq.new ProductorMq(producter)).start();
        //Thread 3
        new Thread(testMq.new ProductorMq(producter)).start();
        //Thread 4
        new Thread(testMq.new ProductorMq(producter)).start();
        //Thread 5
        new Thread(testMq.new ProductorMq(producter)).start();
    }

    private class ProductorMq implements Runnable{
        Producter producter;
        public ProductorMq(Producter producter){
            this.producter = producter;
        }

        @Override
        public void run() {
            while(true){
                try {
                    producter.sendMessage("Jaycekon-MQ");
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
複製程式碼

   執行結果:

複製程式碼
 INFO | Successfully connected to tcp://localhost:61616
Thread-6productor:我是大帥哥,我現在正在生產東西!,count:0
Thread-4productor:我是大帥哥,我現在正在生產東西!,count:1
Thread-2productor:我是大帥哥,我現在正在生產東西!,count:3
Thread-5productor:我是大帥哥,我現在正在生產東西!,count:2
Thread-3productor:我是大帥哥,我現在正在生產東西!,count:4
Thread-6productor:我是大帥哥,我現在正在生產東西!,count:5
Thread-3productor:我是大帥哥,我現在正在生產東西!,count:6
Thread-5productor:我是大帥哥,我現在正在生產東西!,count:7
Thread-2productor:我是大帥哥,我現在正在生產東西!,count:8
Thread-4productor:我是大帥哥,我現在正在生產東西!,count:9
Thread-6productor:我是大帥哥,我現在正在生產東西!,count:10
Thread-3productor:我是大帥哥,我現在正在生產東西!,count:11
Thread-5productor:我是大帥哥,我現在正在生產東西!,count:12
Thread-2productor:我是大帥哥,我現在正在生產東西!,count:13
Thread-4productor:我是大帥哥,我現在正在生產東西!,count:14
Thread-6productor:我是大帥哥,我現在正在生產東西!,count:15
Thread-3productor:我是大帥哥,我現在正在生產東西!,count:16
Thread-5productor:我是大帥哥,我現在正在生產東西!,count:17
Thread-2productor:我是大帥哥,我現在正在生產東西!,count:18
Thread-4productor:我是大帥哥,我現在正在生產東西!,count:19
複製程式碼

  

  4.2 消費者開始消費訊息

複製程式碼
public class TestConsumer {
    public static void main(String[] args){
        Comsumer comsumer = new Comsumer();
        comsumer.init();
        TestConsumer testConsumer = new TestConsumer();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
    }

    private class ConsumerMq implements Runnable{
        Comsumer comsumer;
        public ConsumerMq(Comsumer comsumer){
            this.comsumer = comsumer;
        }

        @Override
        public void run() {
            while(true){
                try {
                    comsumer.getMessage("Jaycekon-MQ");
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
複製程式碼

  執行結果:

12345678910111213141516171819202122INFO | Successfully connected to tcp://localhost:61616Thread-2: Consumer:我是消費者,我正在消費MsgThread-5productor:我是大帥哥,我現在正在生產東西!,count:4--->0Thread-3: Consumer:我是消費者,我正在消費MsgThread-4productor:我是大帥哥,我現在正在生產東西!,count:36--->1Thread-4: Consumer:我是消費者,我正在消費MsgThread-3productor:我是大帥哥,我現在正在生產東西!,count:38--->2Thread-5: Consumer:我是消費者,我正在消費MsgThread-6productor:我是大帥哥,我現在正在生產東西!,count:37--->3Thread-2: Consumer:我是消費者,我正在消費MsgThread-6productor:我是大帥哥,我現在正在生產東西!,count:2--->4Thread-3: Consumer:我是消費者,我正在消費MsgThread-5productor:我是大帥哥,我現在正在生產東西!,count:40--->5Thread-4: Consumer:我是消費者,我正在消費MsgThread-6productor:我是大帥哥,我現在正在生產東西!,count:42--->6Thread-5: Consumer:我是消費者,我正在消費MsgThread-4productor:我是大帥哥,我現在正在生產東西!,count:41--->7Thread-2: Consumer:我是消費者,我正在消費MsgThread-3productor:我是大帥哥,我現在正在生產東西!,count:1--->8Thread-3: Consumer:我是消費者,我正在消費MsgThread-2productor:我是大帥哥,我現在正在生產東西!,count:44--->9Thread-4: Consumer:我是消費者,我正在消費MsgThread-4productor:我是大帥哥,我現在正在生產東西!,count:46--->10Thread-5: Consumer:我是消費者,我正在消費MsgThread-5productor:我是大帥哥,我現在正在生產東西!,count:45--->11Thread-2: Consumer:我是消費者,我正在消費MsgThread-2productor:我是大帥哥,我現在正在生產東西!,count:3--->12Thread-3: Consumer:我是消費者,我正在消費MsgThread-3productor:我是大帥哥,我現在正在生產東西!,count:48--->13Thread-4: Consumer:我是消費者,我正在消費MsgThread-5productor:我是大帥哥,我現在正在生產東西!,count:50--->14Thread-5: Consumer:我是消費者,我正在消費MsgThread-2productor:我是大帥哥,我現在正在生產東西!,count:49--->15Thread-4: Consumer:我是消費者,我正在消費MsgThread-2productor:我是大帥哥,我現在正在生產東西!,count:54--->16Thread-2: Consumer:我是消費者,我正在消費MsgThread-5productor:我是大帥哥,我現在正在生產東西!,count:6--->17Thread-3: Consumer:我是消費者,我正在消費MsgThread-6productor:我是大帥哥,我現在正在生產東西!,count:52--->18Thread-5: Consumer:我是消費者,我正在消費MsgThread-3productor:我是大帥哥,我現在正在生產東西!,count:53--->19Thread-4: Consumer:我是消費者,我正在消費MsgThread-3productor:我是大帥哥,我現在正在生產東西!,count:58--->20

  檢視執行結果,我們可以做ActiveMQ 服務端:http://127.0.0.1:8161/admin/ 裡面的Queues 中檢視我們生產的訊息。

 

5、ActiveMQ的特性

 5.1 ActiveMq 的特性 

  1. 多種語言和協議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
  2. 完全支援JMS1.1和J2EE 1.4規範 (持久化,XA訊息,事務)
  3. 對Spring的支援,ActiveMQ可以很容易內嵌到使用Spring的系統裡面去,而且也支援Spring2.0的特性
  4. 通過了常見J2EE伺服器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中通過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署到任何相容J2EE 1.4 商業伺服器上
  5. 支援多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
  6. 支援通過JDBC和journal提供高速的訊息持久化
  7. 從設計上保證了高效能的叢集,客戶端-伺服器,點對點
  8. 支援Ajax
  9. 支援與Axis的整合
  10. 可以很容易得呼叫內嵌JMS provider,進行測試

 5.2 什麼情況下使用ActiveMQ?

  1. 多個專案之間整合 
    (1) 跨平臺 
    (2) 多語言 
    (3) 多專案
  2. 降低系統間模組的耦合度,解耦 
    (1) 軟體擴充套件性
  3. 系統前後端隔離 
    (1) 前後端隔離,遮蔽高安全區