ActiveMQ學習-Api介紹 (4)
Session
第一節提到了訊息提供者在建立session的時候第一個引數是事務的意思,如果將其改成 true ,那麼當傳送完訊息後就要 commit() 一下,訊息才能發出
public class Producer { public static void main(String[] args) throws JMSException { // 省略了的程式碼... // 如果session不支援事件,就是FALSE,支援事務就是true Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 省略了的程式碼... // 上面開啟了事務,這裡就要提交一下 session.commit(); // 當然如果上面傳送訊息出問題了,也可以通過 rollback() 回滾掉 // session.rollback(); } }
訊息消費者在消費訊息的時候 session 的事務就沒有意義了,true, false 都一樣的消費,不過為了保持一致,還是建議都寫成一樣的,既:提供者開啟事務,消費者也開啟事務
建立session的時候,第二個引數是簽收模式,有以下幾種
TextMessage.acknowledge()
MessageProducer
MessageProducer的send方法最多有5個引數
void send( Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException;
- destination 目的地,訊息要傳送到哪去,通過session建立
Destination destination = session.createQueue("amq-demo");
- message 訊息內容,它是一個介面,實現有以下幾種訊息型別
- deliveryMode 訊息的傳輸模式
- DeliveryMode.NON_PERSISTENT 訊息不做持久化
- DeliveryMode.PERSISTENT 訊息持久化
- priority 訊息優先順序,有0-9十個級別,0-4為普通訊息,5-9是加急訊息,預設為4
- 優先順序開啟要在
activemq.xml
裡進行配置才能生效,找到policyEntries
標籤,在裡面加上<policyEntry queue="amq-demo" prioritizedMessages="true" useCache="false" expireMessagesPeriod="0" queuePrefetch="1"/>
即可
- 優先順序開啟要在
- timeToLive 訊息存活時間,預設一直存活
MessageConsumer
MessageConsumer是session建立的,一般給一個Destination即可,不過它還有兩個引數
// 建立佇列消費者 MessageConsumer createConsumer( Destination destination, java.lang.String messageSelector, boolean NoLocal) throws JMSException; // 建立主題消費者 TopicSubscriber createDurableSubscriber( Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException;
MapMessages
這裡說一下MessageSelector
舉見個例子
// 選擇名字為 z3 的訊息 String selector1 = "name = 'z3'"; // 選擇年齡大於20的訊息 String selector2 = "age > 20"; // 選擇名字為 z3 和 年齡大於20的訊息 String selector3 = "name = 'z3' AND age > 20"; MessageConsumer consumer = session.createConsumer(destination, selector1);
訊息的接收方式,通過 receive()
方法來接收,該方法是阻塞的,一般會用一個監聽來實現
HelloWorld裡接收方式是通過 while(true){}
來實現的,這種程式碼不推薦
while(true) { TextMessage msg = (TextMessage) consumer.receive(); System.out.println("消費資料:" + msg.getText()); }
下面是通過監聽的方式來接收訊息
public class Consumer { public static void main(String[] args) throws JMSException { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616" ); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("amq-demo"); MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MsgListener()); //while(true) { //TextMessage msg = (TextMessage) consumer.receive(); //System.out.println("消費資料:" + msg.getText()); //} } static class MsgListener implements MessageListener { public void onMessage(Message message) { try { System.out.println("消費資料:" + ((TextMessage) message).getText()); } catch (JMSException e) { e.printStackTrace(); } } } }
看起來是不是優雅多了 : )
建立臨時訊息
訊息提供者還可以建立臨時訊息,通過 session.createTemporaryQueue()
方法建立,這種方式建立的訊息,當connection關閉之後,訊息也就沒有了
釋出訂閱
除了佇列,ActiveMQ還可以建立主題,消費者可以通過訂閱主題來消費訊息,用法跟佇列基本一致,下面上程式碼
提供者
public class Producer { public static void main(String[] args) throws JMSException { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616" ); Connection connection = connectionFactory.createConnection(); connection.start(); // 如果session不支援事件,就是FALSE,支援事務就是true Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic("amq-topic-demo"); MessageProducer producer = session.createProducer(null); for (int i = 0; i < 100; i++) { TextMessage msg = session.createTextMessage("生產訊息" + i); producer.send(destination, msg); } //session.commit(); //session.rollback(); connection.close(); } }
消費者1 與 消費者2 是一樣的,把日誌內容改一下就可以了
public class Consumer1 { public static void main(String[] args) throws JMSException { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616" ); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic("amq-topic-demo"); MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MsgListener()); } static class MsgListener implements MessageListener { public void onMessage(Message message) { try { System.out.println("消費者1 消費資料:" + ((TextMessage) message).getText()); } catch (JMSException e) { e.printStackTrace(); } } } }
這個啟動就要先啟動所有的消費者,然後再啟動提供者,這樣提供者釋出的主題訊息,消費者才能進行消費,執行一下,可以看到消費者1跟消費者2都消費了提供者提供的訊息
原文連結: