1. 程式人生 > >ActiveMQ發訊息、收訊息、持久化,查詢佇列剩餘訊息數、出隊數的實現

ActiveMQ發訊息、收訊息、持久化,查詢佇列剩餘訊息數、出隊數的實現

1.首先啟動activeMQ的服務

  1. public class RunServer {  
  2.    
  3.     /** 啟動activeMQ服務 */  
  4.     public static void main(String[] args) throws Exception {  
  5.         RunServer rs = new
     RunServer();         
  6.         BrokerService broker = rs.startServer();  
  7.     }  
  8.       
  9.     public
     BrokerService startServer() throws Exception{  
  10.         // java程式碼呼叫activemq相關的類來構造並啟動brokerService  
  11.         BrokerService broker = new BrokerService();  
  12.    
  13.         // 以下是持久化的配置  
  14.         // 持久化檔案儲存位置  
  15.         File dataFilterDir = new File("targer/amq-in-action/kahadb");  
  16.         KahaDBStore kaha = new KahaDBStore();  
  17.         kaha.setDirectory(dataFilterDir);  
  18.         // use a bigger journal file  
  19.         kaha.setJournalMaxFileLength(1024*100);  
  20.         // small batch means more frequent and smaller writes  
  21.         kaha.setIndexWriteBatchSize(100);  
  22.         // do the index write in a separate thread  
  23.         kaha.setEnableIndexWriteAsync(true);  
  24.           
  25.         broker.setPersistenceAdapter(kaha);  
  26.         // create a transport connector  
  27.         broker.addConnector("tcp://localhost:61616");  
  28.         broker.setUseJmx(true);  
  29.         //broker.setDataDirectory("data/");  
  30.           
  31.    
  32.         // 以下是ManagementContext的配置,從這個容器中可以取得訊息佇列中未執行的訊息數、消費者數、出隊數等等  
  33.         // 設定ManagementContext  
  34.         ManagementContext context = broker.getManagementContext();  
  35.         context.setConnectorPort(2011);  
  36.         context.setJmxDomainName("my-broker");  
  37.         context.setConnectorPath("/jmxrmi");  
  38.         broker.start();  
  39.         System.in.read();  
  40.         return broker;  
  41.     }  

2.傳送訊息

  1. public class Sender {  
  2.     private static final int SEND_NUMBER = 1;  
  3.    
  4.     public static void main(String[] args) {  
  5.         // ConnectionFactory :連線工廠,JMS 用它建立連線  
  6.         ConnectionFactory connectionFactory;  
  7.         // Connection :JMS 客戶端到JMS Provider 的連線  
  8.         Connection connection = null;  
  9.         // Session: 一個傳送或接收訊息的執行緒  
  10.         Session session;  
  11.         // Destination :訊息的目的地;訊息傳送給誰.  
  12.         Destination destination;  
  13.         // MessageProducer:訊息傳送者  
  14.         MessageProducer producer;  
  15.         // TextMessage message;  
  16.         // 構造ConnectionFactory例項物件,此處採用ActiveMq的實現jar  
  17.    
  18.         connectionFactory = new ActiveMQConnectionFactory(  
  19.                 ActiveMQConnection.DEFAULT_USER,  
  20.                 ActiveMQConnection.DEFAULT_PASSWORD,  
  21.                 "tcp://localhost:61616");  
  22.         try {  
  23.             // 構造從工廠得到連線物件  
  24.             connection = connectionFactory.createConnection();  
  25.             // 啟動  
  26.             connection.start();  
  27.             // 獲取操作連線  
  28.             session = connection.createSession(Boolean.TRUE,  
  29.                     Session.AUTO_ACKNOWLEDGE);  
  30.             // 獲取session注意引數值xingbo.xu-queue是一個伺服器的queue,須在在ActiveMq的console配置  
  31.             destination = session.createQueue("test-persistence");  
  32.             // 得到訊息生成者【傳送者】  
  33.             producer = session.createProducer(destination);  
  34.             // 設定不持久化,可以更改  
  35.             producer.setDeliveryMode(DeliveryMode.PERSISTENT);  
  36.             // 構造訊息  
  37.             sendMessage(session, producer);  
  38.             session.commit();  
  39.    
  40.         } catch (Exception e) {  
  41.             e.printStackTrace();  
  42.         } finally {  
  43.             try {  
  44.                 if (null != connection)  
  45.                     connection.close();  
  46.             } catch (Throwable ignore) {  
  47.             }  
  48.         }  
  49.    
  50.     }  
  51.    
  52.     public static void sendMessage(Session session, MessageProducer producer)  
  53.             throws Exception {  
  54.         for (int i = 1; i <= SEND_NUMBER; i++) {  
  55.             TextMessage message = session  
  56.                     .createTextMessage("ActiveMq 傳送的訊息" + i);  
  57.             // 傳送訊息到目的地方  
  58.             System.out.println("傳送訊息:" + i);  
  59.             producer.send(message);  
  60.         }  
  61.     }  
3.收訊息
  1. public class Receiver {  
  2.     public static void main(String[] args) {  
  3.         // ConnectionFactory :連線工廠,JMS 用它建立連線  
  4.         ConnectionFactory connectionFactory;  
  5.         // Connection :JMS 客戶端到JMS Provider 的連線  
  6.         Connection connection = null;  
  7.         // Session: 一個傳送或接收訊息的執行緒  
  8.         Session session;  
  9.         // Destination :訊息的目的地;訊息傳送給誰.  
  10.         Destination destination;  
  11.         // 消費者,訊息接收者  
  12.         MessageConsumer consumer;  
  13.    
  14.         connectionFactory = new ActiveMQConnectionFactory(  
  15.                 ActiveMQConnection.DEFAULT_USER,  
  16.                 ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");  
  17.         try {  
  18.             // 構造從工廠得到連線物件  
  19.             connection = connectionFactory.createConnection();  
  20.             // 啟動  
  21.             connection.start();  
  22.             // 獲取操作連線  
  23.             session = connection.createSession(Boolean.TRUE,  
  24.                     Session.AUTO_ACKNOWLEDGE);  
  25.             //test-queue跟sender的保持一致,一個建立一個來接收  
  26.             destination = session.createQueue("test-persistence");  
  27.             consumer = session.createConsumer(destination);  
  28.               
  29.             consumer.setMessageListener(new MessageListener() {  
  30.                 public void onMessage(Message arg0) {  
  31.                     try {  
  32.                         Thread.sleep(1000);  
  33.                     } catch (InterruptedException e1) {  
  34.                         e1.printStackTrace();  
  35.                     }  
  36.                     System.out.println("==================");  
  37.                     try {  
  38.                         System.out.println("RECEIVE1第一個獲得者:"  
  39.                                 + ((TextMessage) arg0).getText());  
  40.                     } catch (JMSException e) {  
  41.                         // TODO Auto-generated catch block  
  42.                         e.printStackTrace();  
  43.                     }  
  44.                   
  45.                 }  
  46.             });  
  47.         } catch (Exception e) {  
  48.             e.printStackTrace();  
  49.         }  
  50.         finally {  
  51.             try {  
  52.                 if (null != connection)  
  53.                     connection.close();  
  54.             } catch (Throwable ignore) {  
  55. 相關推薦

    ActiveMQ訊息訊息持久化查詢佇列剩餘訊息實現

    1.首先啟動activeMQ的服務 public class RunServer {           /** 啟動activeMQ服務&

    比較全面的Eclipse配置詳解(包括智慧提示設定智慧提示外掛修改修改空格自動上屏JDK配置各種快捷鍵列表……) - decarl - 部落格園

    Eclipse編輯器基本設定 1、新增行號 在邊緣處右鍵 2、改字型 字型的一般配置 3、去掉拼寫錯誤檢查 4、Java程式碼風格 程式碼格式化 Ctrl + Shift + F 之後點選右邊的New按鈕,新建一個風格。

    比較全面的Eclipse配置詳解(包括智慧提示設定智慧提示外掛修改修改空格自動上屏JDK配置各種快捷鍵列表……)

    Eclipse編輯器基本設定 1、新增行號 在邊緣處右鍵 2、改字型 字型的一般配置 3、去掉拼寫錯誤檢查 4、Java程式碼風格 程式碼格式化 Ctrl + Shift + F 之後點選右邊的Ne

    根據物料號和貨物料憑證查詢對應的序列號

    SELECT   b~sernr c~lbbsa            FROM ser03 AS a INNER JOIN objk AS b               ON a~obknr = b~obknr inner join equi AS d  on      d~sernr = b~sernr

    為什麼使用訊息佇列訊息佇列有什麼優點和缺點?KafkaActiveMQRabbitMQRocketMQ 都有什麼優點和缺點?

    面試題 為什麼使用訊息佇列? 訊息佇列有什麼優點和缺點? Kafka、ActiveMQ、RabbitMQ、RocketMQ 都有什麼區別,以及適合哪些場景? 面試官心理分析 其實面試官主要是想看看: 第一,你知不知道你們系統裡為什麼要用訊息佇列這個東西? 不少候選人,說自己專案裡用了 Redis、M

    關於阿里訊息佇列RocketMQ(安裝使用和坑)你需要知道的事情

    為什麼選擇RocketMQ Apache RocketMQ作為阿里開源的一款高效能、高吞吐量的分散式訊息中介軟體。因為阿里有海量的資料量,無數業務場景的應用,是RocketMQ搶盡風頭風頭,成為不可多得中介軟體專案,加上已經正式加入Apach俱樂部,作為頂級的開源專案! 一、關於

    [藍芽] 6基於nRF51822的藍芽心率計工程訊息流Log分析(詳細)

    轉載自: http://www.cnblogs.com/zjutlitao/p/5051166.html   開機初始化Log   Log編號

    原 Win32 SDK基礎(11)—— 訊息佇列和GetMessage/PeekMessageSendMessage/Postmesage

    版權宣告:本文為博主原創文章,若轉載請註明出處;若有錯誤,歡迎指出;若有問題,歡迎留言。 https://blog.csdn.net/lzhui1987/article/details/70144952 一、訊息佇列 1.1 訊息佇列        

    訊息佇列MQ選型 - KafkaRabbitMQ對比

    image.png 適應場景 非同步處理,應用解耦,流量削鋒和訊息通訊 對比 feature scenario Kafka RabbitMQ 備註 PUB-SUB 釋出訂閱模型

    C#應用WindowsApi實現查詢\列舉(FindWindowEnumChildWindows)窗體控制元件併發送訊息

    轉載:http://www.cnblogs.com/hehexiaoxia/p/4223927.html 首先介紹基本WindowsApi: public static extern IntPtr FindWindow(string lpClassName, strin

    JMS訊息持久化ActiveMQ訊息持久化到mySql資料庫中

    ActiveMQ5.8.0版本採用kahadb作為預設的訊息持久化方式。使用預設的持久化機制,我們不容易直接看到訊息究竟是如何持久的。ActiveMQ提供的JDBC持久化機制,能夠將持久化資訊儲存到資

    Linux c 基於記憶體的程序通訊—共享記憶體共享佇列訊息佇列

    基於記憶體的程序通訊: 1.      核心共享記憶體 程式設計模型:  1.1.建立共享記憶體,得到一個ID  shmget 1.2.把ID影射成虛擬地址(掛載)  shmat        1.3.使用虛擬地址訪問核心共享記憶體使用任何記憶體函式與運算子號       

    Spring+ActiveMQ訊息持久化Topic持久化訂閱

    訊息持久化就是將訊息儲存到磁碟,這樣的好處就是如果服務掛了,則訊息還儲存在磁碟不會丟失,服務起來後還能找到訊息並在此傳送,訊息的持久化和訊息的傳送模型是沒有關係的。 訊息持久化的配置很方便的,所以其他的那些就不寫出來了,可以看看上一篇文章中的同步非同步實現方式。

    Win32 SDK基礎(11)—— 訊息佇列和GetMessage/PeekMessageSendMessage/Postmesage

    一、訊息佇列 1.1 訊息佇列         訊息佇列是用來存放訊息的一個佇列,訊息在佇列中先入先出,所有的視窗程式都具有訊息佇列,程式可以從佇列中獲取訊息。 1.2 訊息佇列的型別         系統訊息佇列:由作業系統維護的訊息佇列,存放系統產生的訊息,如滑鼠、鍵盤

    位元組跳動面試官這樣問訊息佇列:高可用不重複消費可靠傳輸順序消費訊息堆積我整理了下

    ## 寫在前面 又到了年底跳槽高峰季,很多小夥伴出去面試時,不少面試官都會問到訊息佇列的問題,不少小夥伴回答的不是很完美,有些小夥伴是心裡知道答案,嘴上卻沒有很好的表達出來,究其根本原因,還是對相關的知識點理解的不夠透徹。今天,我們就一起來探討下這個話題。注:文章有點長,你說你能一鼓作氣看完,我有點不信!!

    解決持久化據太大單個節點的硬盤無法存儲的問題;解決運算量太大單個節點的內存CPU無法處理的問題

    pro 一致性哈希 普通 .html 價格 str oca 計劃 硬件 需要學習的技術很多,要自學新知識也不是一件容易的事,選擇一個自己比較感興趣的會是一個比較好的開端,於是,打算學一學分布式系統。   帶著問題,有目的的學習,先了解整體架構,在深入感興趣的細節,這是我的

    IIS連接IIS並連接IIS最大並工作線程應用程序池的列長度

    這就是 規範性 初級 展示 約會 第一次 數量 企業 通用 關於並發你真的了解嗎?(一) 前言:對於很多工作時間短或者編程經驗不足的程序員來說,大多數會覺得並發這個詞離自己太遙遠,之所以知道並發也不過是因為受那些技術大佬成天討論並發等問題耳濡目染罷了。更有甚者,一些

    QPS相關的概念收集(吞吐量(TPS)QPS響應時間(RT))

    臺電腦 接受 邏輯 .cn 客戶 lan 頁面 增長 value 一、概念: 1、響應時間(RT) 響應時間是指系統對請求作出響應的時間。直觀上看,這個指標與人對軟件性能的主觀感受是非常一致的,因為它完整地記錄了整個計算機系統處理請求的時間。由於一個系統通常會提供許多

    Nginx並每秒連接下載速度限制防攻擊殺手鐧

    意思 bsp p s address ket agen 將在 hit sun 1.限制IP訪問頻率:HttpLimitZoneModule 限制並發連接數實例limit_zone只能定義在http作用域,limit_conn可以定義在http server location

    ActiveMQ 送和就消息

    ace listen OS factor row conn ack 多個 tar 一、添加 jar 包 <dependency> <groupId>org.apache.activemq</groupId> <ar