ActiveMQ(三):ActiveMQ的安全機制、api及訂閱模式demo
一、ActiveMQ安全機制
ActiveMQ是使用jetty部署的,修改密碼需要到相應的配置檔案
配置檔案是這個:
在其第123行新增使用者名稱和密碼,新增配置如下:
<plugins> <simpleAuthenticationPlugin> <users> <authenticationUser username="bhz" password="bhz" groups="users,admins"/> </users> </simpleAuthenticationPlugin> </plugins>
這時候這個需要改為這樣才能進行連線:
//第一步:建立ConnectionFactory工廠物件
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("bhz",
"bhz","tcp://localhost:61616");
二、ActiveMQ的api的使用
1. Connection的使用
省略
2. Session方法的使用
一旦從ConnectionFactory中獲得一個Connection,必須從Connection中建立一個或多個Session.Session是一個傳送或接收訊息的執行緒
session可以被事務化,也可以不被事務化
如果使用事務的話,那麼需要commit,如下
package test.mq.helloworld; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Sender { public static void main(String[] agrs) throws Exception{ //第一步:建立ConnectionFactory工廠物件 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("bhz", "bhz","tcp://localhost:61616"); //第二步:通過ConnectionFactory工廠物件我們建立一個Connection物件 Connection connection = connectionFactory.createConnection(); connection.start(); //第三步:通過connection物件建立Session會話,第一個引數為是否開啟事務,第二個引數為簽收模式,一般設定為自動簽收 Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); //第四步:通過Session建立Destination物件,queue1可看作是放入佇列的訊息名稱,可以自定義 Destination desiDestination = session.createQueue("queue1"); //第五步:通過session建立訊息的生產者或消費者,下面是建立生產者 MessageProducer messageProducer = session.createProducer(desiDestination); //第六步:使用MessageProducer的set方法為其設定持久化特性和非持久化特性,後面再詳細介紹 messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//這裡先設定為非持久化 //第七步:通過JMS規範的TextMessage形式建立資料(通過session物件),並用MessageProducer的send方法傳送資料 for(int i =0; i<5;i++){ TextMessage textMessage = session.createTextMessage("這是訊息內容,id為"+i); messageProducer.send(textMessage); } session.commit(); if(connection!=null){ connection.close(); } } }
注意session.commit();
session的簽收模式有三種情況:
1. Session.AUTO_ACKNOWLEDGE :自動簽收
2. Session.CLIENT_ACKNOWLEDGE:客戶端通過呼叫訊息(Message)的acknowledge方法簽收訊息,在這種情況下,簽收發生在Session層面:簽收一個已消費的訊息會自動簽收這個Session所有已消費訊息的收條
3. Session.DUPS_OK_ACKNOWLEDGE:此選項指示Session不必確保對傳送訊息的簽收。它可能引起訊息的重複,但是降低了Session的開銷,所以只有客戶端能容忍重複的訊息,才可使用
上面三種模式,推薦使用的是Session.CLIENT_ACKNOWLEDGE
它的操作如下:
package test.mq.helloworld;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Receiver {
public static void main(String[] agrs) throws Exception{
//第一步:建立ConnectionFactory工廠物件
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("bhz",
"bhz","tcp://localhost:61616");
//第二步:通過ConnectionFactory工廠物件我們建立一個Connection物件
Connection connection = connectionFactory.createConnection();
connection.start();
//第三步:通過connection物件建立Session會話,第一個引數為是否開啟事務,第二個引數為簽收模式,一般設定為自動簽收
Session session = connection.createSession(Boolean.FALSE,Session.CLIENT_ACKNOWLEDGE);
//第四步:通過Session建立Destination物件,queue1可看作是放入佇列的訊息名稱,可以自定義
Destination desiDestination = session.createQueue("queue1");
//第五步:通過session建立訊息的生產者或消費者,下面是建立消費者
MessageConsumer messageConsumer = session.createConsumer(desiDestination);
while(true){
TextMessage msg = (TextMessage) messageConsumer.receive();
msg.acknowledge();
if(msg==null) {
break;
}
System.out.println("收到的內容為"+msg.getText());
}
if(connection!=null){
connection.close();
}
}
}
使用該模式,需要手動的設定接收完畢的訊號msg.acknowledge();,這可以當做告訴佇列,這個訊息已經接收完畢,可以銷燬了。
3.MessageProducer
下面是改造後的程式碼:
package test.mq.helloworld;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Sender {
public static void main(String[] agrs) throws Exception{
//第一步:建立ConnectionFactory工廠物件
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("bhz",
"bhz","tcp://localhost:61616");
//第二步:通過ConnectionFactory工廠物件我們建立一個Connection物件
Connection connection = connectionFactory.createConnection();
connection.start();
//第三步:通過connection物件建立Session會話,第一個引數為是否開啟事務,第二個引數為簽收模式,一般設定為自動簽收
Session session = connection.createSession(Boolean.TRUE,Session.CLIENT_ACKNOWLEDGE);
//第四步:通過Session建立Destination物件,queue1可看作是放入佇列的訊息名稱,可以自定義
Destination desiDestination = session.createQueue("queue1");
//第五步:通過session建立訊息的生產者或消費者,下面是建立生產者
MessageProducer messageProducer = session.createProducer(null);
//第六步:使用MessageProducer的set方法為其設定持久化特性和非持久化特性,後面再詳細介紹
// messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//這裡先設定為非持久化
//第七步:通過JMS規範的TextMessage形式建立資料(通過session物件),並用MessageProducer的send方法傳送資料
for(int i =0; i<5;i++){
//第一個引數:目的地
//第二個引數:訊息文字
//第三個引數:接收模式
//第四個引數:優先順序
//第五個引數:訊息在訊息佇列儲存的時間,這裡指儲存2分鐘
TextMessage textMessage = session.createTextMessage("這是訊息內容,id為"+i);
messageProducer.send(desiDestination,textMessage,DeliveryMode.NON_PERSISTENT, i,1000*60*2);
}
session.commit();
if(connection!=null){
connection.close();
}
}
}
需要注意以下內容:
1. 優先順序並不是嚴格遵循的
2. 訊息在佇列中超過儲存事件後,還沒有人取,那麼就會自動消失
4.MessageConsumer
三、訂閱模式的使用
上面及之前的文章的是p2p模式(點對點模式)
那麼釋出訂閱模式又是如何處理的?下面先看釋出/訂閱模式相關的含義:
下面是demo案例
訊息生產者
package bhz.mq.pd;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Publish {
private ConnectionFactory factory;
private Connection connection;
private Session session;
private MessageProducer producer;
public Publish(){
try {
factory = new ActiveMQConnectionFactory("bhz",
"bhz","tcp://localhost:61616");
connection = factory.createConnection();
connection.start();
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(null);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void sendMessage() throws Exception{
Destination destination = session.createTopic("topic");
TextMessage textMessage = session.createTextMessage("我是內容");
producer.send(destination, textMessage);
}
public static void main(String[] agrs) throws Exception{
Publish p = new Publish();
p.sendMessage();
}
}
上面的主題是:topic
當釋出一條訊息的時候,便會在後臺topics上顯示
下面是消費者的程式碼,消費者訂閱了topic主題
package bhz.mq.pd;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Consumer1 {
private ConnectionFactory factory;
private Connection connection;
private Session session;
private MessageConsumer consumer;
public Consumer1(){
try {
factory = new ActiveMQConnectionFactory("bhz",
"bhz","tcp://localhost:61616");
connection = factory.createConnection();
connection.start();
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void receive() throws Exception{
Destination destionation = session.createTopic("topic");
consumer = session.createConsumer(destionation);
consumer.setMessageListener(new Listener());
}
public static void main(String[] agrs) throws Exception{
Consumer1 consumer1 = new Consumer1();
consumer1.receive();
}
class Listener implements MessageListener{
@Override
public void onMessage(Message msg) {
// TODO Auto-generated method stub
System.out.println("這是接收的內容:"+msg);
}
}
}
這時候只要生產者釋出訊息,那麼消費者就會接收到訊息