activimq消息隊列基本配置
阿新 • • 發佈:2017-05-23
http bsp trace jms admin knowledge 設定 ring throw
背景:
在了解一個分布式框架的時候,偶然接觸到activimq消息隊列,於是就決定寫一個小demo
首先是linux配置activimq
去官網下載一個activimq的linux安裝包,直接解壓,到bin目錄下執行:
activemq start
這樣就啟動了,很簡單
然後可以查看是否有新的端口61616被監聽
接著查看是否有相關進程
同時activimq還有一個後臺監控頁面, 但是這個默認端口是8161,訪問地址為:
http://10.10.10.30:8161/admin/
默認用戶名、密碼均為admin
接下來是測試代碼,maven庫地址:
1 <dependency>2 <groupId>org.apache.activemq</groupId> 3 <artifactId>activemq-core</artifactId> 4 <version>5.7.0</version> 5 </dependency>
1 package com.asen.activimq; 2 3 import javax.jms.Connection; 4 import javax.jms.ConnectionFactory;5 import javax.jms.DeliveryMode; 6 import javax.jms.Destination; 7 import javax.jms.MessageProducer; 8 import javax.jms.Session; 9 import javax.jms.TextMessage; 10 11 import org.apache.activemq.ActiveMQConnection; 12 import org.apache.activemq.ActiveMQConnectionFactory; 13 14 public class Sender {15 private static final int SEND_NUMBER = 5; 16 17 public static void main(String[] args) { 18 // ConnectionFactory :連接工廠,JMS 用它創建連接 19 ConnectionFactory connectionFactory; 20 // Connection :JMS 客戶端到JMS 21 // Provider 的連接 22 Connection connection = null; 23 // Session: 一個發送或接收消息的線程 24 Session session; 25 // Destination :消息的目的地;消息發送給誰. 26 Destination destination; 27 // MessageProducer:消息發送者 28 MessageProducer producer; 29 // 構造ConnectionFactory實例對象,此處采用ActiveMq的實現jar 30 connectionFactory = new ActiveMQConnectionFactory( 31 ActiveMQConnection.DEFAULT_USER, 32 ActiveMQConnection.DEFAULT_PASSWORD, "tcp://10.10.10.30:61616"); 33 try { 34 // 從工廠獲取連接對象 35 connection = connectionFactory.createConnection(); 36 // 啟動 37 connection.start(); 38 // 獲取操作連接 39 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); 40 // 獲取session註意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置 41 // 獲取消息目的地 42 destination = session.createQueue("FirstQueue"); 43 // 獲取發送者 44 producer = session.createProducer(destination); 45 // 設置不持久化,此處學習,實際根據項目決定 46 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 47 // 構造消息,此處寫死,項目就是參數,或者方法獲取 48 sendMessage(session, producer); 49 session.commit(); 50 } catch (Exception e) { 51 e.printStackTrace(); 52 } finally { 53 try { 54 if (null != connection) 55 connection.close(); 56 } catch (Exception e) { 57 e.printStackTrace(); 58 } 59 } 60 } 61 62 public static void sendMessage(Session session, MessageProducer producer) 63 throws Exception { 64 65 for (int i = 1; i <= SEND_NUMBER; i++) { 66 TextMessage message = session.createTextMessage("ActiveMq 發送消息" + i); 67 // 發送消息到目的地方 68 System.out.println("發送消息:" + "ActiveMq 發送消息" + i); 69 producer.send(message); 70 } 71 } 72 }
1 package com.asen.activimq; 2 3 import javax.jms.Connection; 4 import javax.jms.ConnectionFactory; 5 import javax.jms.Destination; 6 import javax.jms.MessageConsumer; 7 import javax.jms.Session; 8 import javax.jms.TextMessage; 9 10 import org.apache.activemq.ActiveMQConnection; 11 import org.apache.activemq.ActiveMQConnectionFactory; 12 13 public class Receiver { 14 public static void main(String[] args) { 15 // ConnectionFactory :連接工廠,JMS 用它創建連接 16 ConnectionFactory connectionFactory; 17 // Connection :JMS 客戶端到JMS Provider 的連接 18 Connection connection = null; 19 // Session: 一個發送或接收消息的線程 20 Session session ; 21 // Destination :消息的目的地;消息發送給誰. 22 Destination destination; 23 // 消費者,消息接收者 24 MessageConsumer consumer; 25 // 初始化工廠 26 connectionFactory = new ActiveMQConnectionFactory( 27 ActiveMQConnection.DEFAULT_USER, 28 ActiveMQConnection.DEFAULT_PASSWORD, "tcp://10.10.10.30:61616"); 29 try { 30 // 從工廠獲取連接對象 31 connection = connectionFactory.createConnection(); 32 // 啟動 33 connection.start(); 34 // 獲取操作連接 35 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); 36 // 獲取session註意參數值xingbo.xu-queue是一個服務器的queue,須在ActiveMq的console配置 37 // 獲取消息目的地:隊列 38 destination = session.createQueue("FirstQueue"); 39 // 接收消息 40 consumer = session.createConsumer(destination); 41 while(true){ 42 // 設置接收者接收消息的時間,為了便於測試,這裏設定為100s 43 TextMessage message = (TextMessage)consumer.receive(100000); 44 System.out.println("message:" + message); 45 if(null != message){ 46 System.out.println("接收到消息:" + message.getText()); 47 }else{ 48 break; 49 } 50 } 51 } catch (Exception e) { 52 e.printStackTrace(); 53 } finally { 54 try { 55 if (null != connection) { 56 connection.close(); 57 } 58 } catch (Exception e) { 59 e.printStackTrace(); 60 } 61 } 62 63 } 64 }
運行之後,後臺監控頁面可以看見:
activimq持久化常用的有三種方式:1、文件持久化 2、mysql持久化 3、oracle持久化
在activimq的配置文件中默認開啟了文件持久化
同時我們需要修改一行代碼:
這樣在activimq重啟之後就不會有消息丟失了
activimq消息隊列基本配置