1. 程式人生 > >activimq消息隊列基本配置

activimq消息隊列基本配置

http bsp trace jms admin knowledge 設定 ring throw

背景:

在了解一個分布式框架的時候,偶然接觸到activimq消息隊列,於是就決定寫一個小demo

首先是linux配置activimq

去官網下載一個activimq的linux安裝包,直接解壓,到bin目錄下執行:

activemq start

這樣就啟動了,很簡單

然後可以查看是否有新的端口61616被監聽

技術分享

接著查看是否有相關進程

技術分享

同時activimq還有一個後臺監控頁面, 但是這個默認端口是8161,訪問地址為:

http://10.10.10.30:8161/admin/

默認用戶名、密碼均為admin

技術分享

接下來是測試代碼,maven庫地址:

1 <dependency>
2 <groupId>org.apache.activemq</groupId> 3 <artifactId>activemq-core</artifactId> 4 <version>5.7.0</version> 5 </dependency>
 1 package com.asen.activimq;
 2 
 3 import javax.jms.Connection;
 4 import javax.jms.ConnectionFactory;
5 import javax.jms.DeliveryMode; 6 import javax.jms.Destination; 7 import javax.jms.MessageProducer; 8 import javax.jms.Session; 9 import javax.jms.TextMessage; 10 11 import org.apache.activemq.ActiveMQConnection; 12 import org.apache.activemq.ActiveMQConnectionFactory; 13 14 public class Sender {
15 private static final int SEND_NUMBER = 5; 16 17 public static void main(String[] args) { 18 // ConnectionFactory :連接工廠,JMS 用它創建連接 19 ConnectionFactory connectionFactory; 20 // Connection :JMS 客戶端到JMS 21 // Provider 的連接 22 Connection connection = null; 23 // Session: 一個發送或接收消息的線程 24 Session session; 25 // Destination :消息的目的地;消息發送給誰. 26 Destination destination; 27 // MessageProducer:消息發送者 28 MessageProducer producer; 29 // 構造ConnectionFactory實例對象,此處采用ActiveMq的實現jar 30 connectionFactory = new ActiveMQConnectionFactory( 31 ActiveMQConnection.DEFAULT_USER, 32 ActiveMQConnection.DEFAULT_PASSWORD, "tcp://10.10.10.30:61616"); 33 try { 34 // 從工廠獲取連接對象 35 connection = connectionFactory.createConnection(); 36 // 啟動 37 connection.start(); 38 // 獲取操作連接 39 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); 40 // 獲取session註意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置 41 // 獲取消息目的地 42 destination = session.createQueue("FirstQueue"); 43 // 獲取發送者 44 producer = session.createProducer(destination); 45 // 設置不持久化,此處學習,實際根據項目決定 46 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 47 // 構造消息,此處寫死,項目就是參數,或者方法獲取 48 sendMessage(session, producer); 49 session.commit(); 50 } catch (Exception e) { 51 e.printStackTrace(); 52 } finally { 53 try { 54 if (null != connection) 55 connection.close(); 56 } catch (Exception e) { 57 e.printStackTrace(); 58 } 59 } 60 } 61 62 public static void sendMessage(Session session, MessageProducer producer) 63 throws Exception { 64 65 for (int i = 1; i <= SEND_NUMBER; i++) { 66 TextMessage message = session.createTextMessage("ActiveMq 發送消息" + i); 67 // 發送消息到目的地方 68 System.out.println("發送消息:" + "ActiveMq 發送消息" + i); 69 producer.send(message); 70 } 71 } 72 }
 1 package com.asen.activimq;
 2 
 3 import javax.jms.Connection;
 4 import javax.jms.ConnectionFactory;
 5 import javax.jms.Destination;
 6 import javax.jms.MessageConsumer;
 7 import javax.jms.Session;
 8 import javax.jms.TextMessage;
 9 
10 import org.apache.activemq.ActiveMQConnection;
11 import org.apache.activemq.ActiveMQConnectionFactory;
12 
13 public class Receiver {
14     public static void main(String[] args) {
15         // ConnectionFactory :連接工廠,JMS 用它創建連接
16         ConnectionFactory connectionFactory;
17         // Connection :JMS 客戶端到JMS Provider 的連接
18         Connection connection = null;
19         // Session: 一個發送或接收消息的線程
20         Session session ;
21         // Destination :消息的目的地;消息發送給誰.
22         Destination destination;
23         // 消費者,消息接收者
24         MessageConsumer consumer;
25         // 初始化工廠
26         connectionFactory = new ActiveMQConnectionFactory(
27                 ActiveMQConnection.DEFAULT_USER,
28                 ActiveMQConnection.DEFAULT_PASSWORD, "tcp://10.10.10.30:61616");
29         try {
30             // 從工廠獲取連接對象
31             connection = connectionFactory.createConnection();
32             // 啟動
33             connection.start();
34             // 獲取操作連接
35             session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
36             // 獲取session註意參數值xingbo.xu-queue是一個服務器的queue,須在ActiveMq的console配置  
37             // 獲取消息目的地:隊列
38             destination = session.createQueue("FirstQueue");
39             // 接收消息
40             consumer = session.createConsumer(destination);
41             while(true){
42                 // 設置接收者接收消息的時間,為了便於測試,這裏設定為100s
43                 TextMessage message = (TextMessage)consumer.receive(100000);
44                 System.out.println("message:" + message);
45                 if(null != message){
46                     System.out.println("接收到消息:" + message.getText());
47                 }else{
48                     break;
49                 }
50             }
51         } catch (Exception e) {
52             e.printStackTrace();
53         } finally {
54             try {
55                 if (null != connection) {
56                     connection.close();
57                 }
58             } catch (Exception e) {
59                 e.printStackTrace();
60             }
61         }
62 
63     }
64 }

運行之後,後臺監控頁面可以看見:

技術分享

技術分享

activimq持久化常用的有三種方式:1、文件持久化 2、mysql持久化 3、oracle持久化

在activimq的配置文件中默認開啟了文件持久化

技術分享

同時我們需要修改一行代碼:

技術分享

這樣在activimq重啟之後就不會有消息丟失了

activimq消息隊列基本配置