1. 程式人生 > >ActiveMQ(三):ActiveMQ的安全機制、api及訂閱模式demo

ActiveMQ(三):ActiveMQ的安全機制、api及訂閱模式demo

一、ActiveMQ安全機制

ActiveMQ是使用jetty部署的,修改密碼需要到相應的配置檔案 
配置檔案是這個:

配置檔案

在其第123行新增使用者名稱和密碼,新增配置如下:

    <plugins>
            <simpleAuthenticationPlugin>
                <users>
                    <authenticationUser username="bhz" password="bhz" groups="users,admins"/>
                </users>
            </simpleAuthenticationPlugin>
        </plugins>
  •  

這時候這個需要改為這樣才能進行連線:

//第一步:建立ConnectionFactory工廠物件
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("bhz",
                "bhz","tcp://localhost:61616");
  •  

二、ActiveMQ的api的使用

1. Connection的使用

省略

2. Session方法的使用

一旦從ConnectionFactory中獲得一個Connection,必須從Connection中建立一個或多個Session.Session是一個傳送或接收訊息的執行緒 
session可以被事務化,也可以不被事務化 
如果使用事務的話,那麼需要commit,如下

package test.mq.helloworld;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {

    public static void main(String[] agrs) throws Exception{

        //第一步:建立ConnectionFactory工廠物件
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("bhz",
                "bhz","tcp://localhost:61616");

        //第二步:通過ConnectionFactory工廠物件我們建立一個Connection物件
        Connection connection = connectionFactory.createConnection();
        connection.start();

        //第三步:通過connection物件建立Session會話,第一個引數為是否開啟事務,第二個引數為簽收模式,一般設定為自動簽收
        Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);

        //第四步:通過Session建立Destination物件,queue1可看作是放入佇列的訊息名稱,可以自定義
        Destination desiDestination = session.createQueue("queue1");

        //第五步:通過session建立訊息的生產者或消費者,下面是建立生產者
        MessageProducer messageProducer = session.createProducer(desiDestination);

        //第六步:使用MessageProducer的set方法為其設定持久化特性和非持久化特性,後面再詳細介紹
        messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//這裡先設定為非持久化

        //第七步:通過JMS規範的TextMessage形式建立資料(通過session物件),並用MessageProducer的send方法傳送資料
        for(int i =0; i<5;i++){
            TextMessage textMessage =  session.createTextMessage("這是訊息內容,id為"+i);
            messageProducer.send(textMessage);
        }
        session.commit();
        if(connection!=null){
            connection.close();
        }

    }

}
  •  

注意session.commit(); 
session的簽收模式有三種情況: 
1. Session.AUTO_ACKNOWLEDGE :自動簽收 
2. Session.CLIENT_ACKNOWLEDGE:客戶端通過呼叫訊息(Message)的acknowledge方法簽收訊息,在這種情況下,簽收發生在Session層面:簽收一個已消費的訊息會自動簽收這個Session所有已消費訊息的收條 
3. Session.DUPS_OK_ACKNOWLEDGE:此選項指示Session不必確保對傳送訊息的簽收。它可能引起訊息的重複,但是降低了Session的開銷,所以只有客戶端能容忍重複的訊息,才可使用 
上面三種模式,推薦使用的是Session.CLIENT_ACKNOWLEDGE 
它的操作如下:

package test.mq.helloworld;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver {

    public static void main(String[] agrs) throws Exception{

        //第一步:建立ConnectionFactory工廠物件
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("bhz",
                "bhz","tcp://localhost:61616");

        //第二步:通過ConnectionFactory工廠物件我們建立一個Connection物件
        Connection connection = connectionFactory.createConnection();
        connection.start();

        //第三步:通過connection物件建立Session會話,第一個引數為是否開啟事務,第二個引數為簽收模式,一般設定為自動簽收
        Session session = connection.createSession(Boolean.FALSE,Session.CLIENT_ACKNOWLEDGE);

        //第四步:通過Session建立Destination物件,queue1可看作是放入佇列的訊息名稱,可以自定義
        Destination desiDestination = session.createQueue("queue1");

        //第五步:通過session建立訊息的生產者或消費者,下面是建立消費者
        MessageConsumer messageConsumer = session.createConsumer(desiDestination);

        while(true){
            TextMessage msg = (TextMessage) messageConsumer.receive();
            msg.acknowledge();
            if(msg==null) {
                break;
            }
            System.out.println("收到的內容為"+msg.getText());
        }
        if(connection!=null){
            connection.close();
        }

    }

}
  •  

使用該模式,需要手動的設定接收完畢的訊號msg.acknowledge();,這可以當做告訴佇列,這個訊息已經接收完畢,可以銷燬了。

3.MessageProducer

MessageProducer詳解 
下面是改造後的程式碼:

package test.mq.helloworld;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {

    public static void main(String[] agrs) throws Exception{

        //第一步:建立ConnectionFactory工廠物件
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("bhz",
                "bhz","tcp://localhost:61616");

        //第二步:通過ConnectionFactory工廠物件我們建立一個Connection物件
        Connection connection = connectionFactory.createConnection();
        connection.start();

        //第三步:通過connection物件建立Session會話,第一個引數為是否開啟事務,第二個引數為簽收模式,一般設定為自動簽收
        Session session = connection.createSession(Boolean.TRUE,Session.CLIENT_ACKNOWLEDGE);

        //第四步:通過Session建立Destination物件,queue1可看作是放入佇列的訊息名稱,可以自定義
        Destination desiDestination = session.createQueue("queue1");

        //第五步:通過session建立訊息的生產者或消費者,下面是建立生產者
        MessageProducer messageProducer = session.createProducer(null);

        //第六步:使用MessageProducer的set方法為其設定持久化特性和非持久化特性,後面再詳細介紹
    //  messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//這裡先設定為非持久化

        //第七步:通過JMS規範的TextMessage形式建立資料(通過session物件),並用MessageProducer的send方法傳送資料
        for(int i =0; i<5;i++){
            //第一個引數:目的地
            //第二個引數:訊息文字
            //第三個引數:接收模式
            //第四個引數:優先順序
            //第五個引數:訊息在訊息佇列儲存的時間,這裡指儲存2分鐘
            TextMessage textMessage =  session.createTextMessage("這是訊息內容,id為"+i);
            messageProducer.send(desiDestination,textMessage,DeliveryMode.NON_PERSISTENT, i,1000*60*2);
        }
        session.commit();
        if(connection!=null){
            connection.close();
        }

    }

}
  •  

需要注意以下內容: 
1. 優先順序並不是嚴格遵循的 
2. 訊息在佇列中超過儲存事件後,還沒有人取,那麼就會自動消失

4.MessageConsumer

MessageConsumer詳解

建立臨時訊息

三、訂閱模式的使用

上面及之前的文章的是p2p模式(點對點模式) 
那麼釋出訂閱模式又是如何處理的?下面先看釋出/訂閱模式相關的含義: 
釋出訂閱模式 
下面是demo案例 
訊息生產者

package bhz.mq.pd;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Publish {

    private ConnectionFactory factory;
    private Connection connection;
    private Session session;
    private MessageProducer producer;

    public Publish(){
        try {
            factory = new ActiveMQConnectionFactory("bhz",
                    "bhz","tcp://localhost:61616");
            connection = factory.createConnection();
            connection.start();
            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
            producer = session.createProducer(null);
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }

    public void sendMessage() throws Exception{
        Destination destination = session.createTopic("topic");
        TextMessage textMessage = session.createTextMessage("我是內容");
        producer.send(destination, textMessage);
    }

    public static void main(String[] agrs) throws Exception{
        Publish p = new Publish();
        p.sendMessage();
    }
}
  •  

上面的主題是:topic 
當釋出一條訊息的時候,便會在後臺topics上顯示 
後臺多了一條主題

下面是消費者的程式碼,消費者訂閱了topic主題

package bhz.mq.pd;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer1 {

    private ConnectionFactory factory;
    private Connection connection;
    private Session session;
    private MessageConsumer consumer;

    public Consumer1(){
        try {
            factory = new ActiveMQConnectionFactory("bhz",
                    "bhz","tcp://localhost:61616");
            connection = factory.createConnection();
            connection.start();
            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }
    public void receive() throws Exception{
        Destination destionation = session.createTopic("topic");
        consumer = session.createConsumer(destionation);
        consumer.setMessageListener(new Listener());
    }

    public static void main(String[] agrs) throws Exception{
        Consumer1 consumer1 = new Consumer1();
        consumer1.receive();
    }
    class Listener implements MessageListener{
        @Override
        public void onMessage(Message msg) {
            // TODO Auto-generated method stub
            System.out.println("這是接收的內容:"+msg);
        }

    }
}

這時候只要生產者釋出訊息,那麼消費者就會接收到訊息