1. 程式人生 > >Active MQ延時傳送訊息的Java示例程式碼段

Active MQ延時傳送訊息的Java示例程式碼段

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.command.ActiveMQDestination;
 
/**
 * <b>function:</b> 訊息傳送者
 * @author hoojo
 * @createDate 2013-6-19 上午11:26:43
 * @file MessageSender.java
 * @package com.hoo.mq.jms
 * @project ActiveMQ-5.8
 * @blog http://blog.csdn.net/IBM_hoojo
 * @email 
[email protected]
* @version 1.0 */ public class TestActiveMQ { // tcp 地址 public static final String BROKER_URL = "failover:(tcp://localhost:61616)"; // 目標,在ActiveMQ管理員控制檯建立 http://localhost:8161/admin/queues.jsp public static final String DESTINATION = "kagula.queue"; /** * <b>function:</b> 傳送訊息 * @author hoojo * @createDate 2013-6-19 下午12:05:42 * @param session * @param producer * @throws Exception */ public static void sendMessage(Session session, MessageProducer producer) throws Exception { //例一: String message = "直接傳送資料"; TextMessage tm = session.createTextMessage(message); producer.send(tm); //例二: //需要修改activemq.xml才能生效,在<broker>裡新增屬性schedulerSupport="true" message = "延時10秒傳送資料"; TextMessage tm2 = session.createTextMessage("Send Message After 10 seconds!"); long delayTime = 1 * 10 * 1000; tm2.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delayTime); producer.send(tm2); } public static void run() throws Exception { Connection connection = null; Session session = null; try { // 建立連結工廠 ConnectionFactory factory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL); // 通過工廠建立一個連線 connection = factory.createConnection(); // 啟動連線 connection.start(); // 建立一個session會話 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 建立一個訊息佇列 Destination destination = session.createQueue(DESTINATION); // 建立訊息製作者 MessageProducer producer = session.createProducer(destination); // 設定持久化模式 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); sendMessage(session, producer); // 提交會話 session.commit(); } catch (Exception e) { throw e; } finally { // 關閉釋放資源 if (session != null) { session.close(); } if (connection != null) { connection.close(); } } } public static void ClearMessage() { // ConnectionFactory :連線工廠,JMS 用它建立連線 ConnectionFactory connectionFactory; // Connection :JMS 客戶端到JMS Provider 的連線 Connection connection = null; // Session: 一個傳送或接收訊息的執行緒 Session session; // Destination :訊息的目的地;訊息傳送給誰. Destination destination; // 消費者,訊息接收者 MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL); try { // 構造從工廠得到連線物件 connection = connectionFactory.createConnection(); // 啟動 connection.start(); // 獲取操作連線 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 獲取session注意引數值xingbo.xu-queue是一個伺服器的queue,須在在ActiveMq的console配置 destination = session.createQueue(DESTINATION); consumer = session.createConsumer(destination); while (true) { TextMessage message = (TextMessage) consumer.receive(10); if (null != message) { System.out.println("把訊息拿下來但不處理!"); } else { break; } } } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } /** * 刪除佇列 * 如果有consumer連在佇列上,刪除佇列會失敗! * 所以不建議使用 * 這個函式有程序阻塞問題 * @param url * @param queueName */ public static void ClearQueue(String url,String queueName) { ActiveMQConnection con = null; try { con = (ActiveMQConnection) new ActiveMQConnectionFactory(url).createConnection(); Destination queue=null; queue = con.createSession(false, Session.AUTO_ACKNOWLEDGE).createQueue(queueName); con.start(); con.destroyDestination((ActiveMQDestination) queue); } catch (JMSException e) { e.printStackTrace(); } finally { try { con.stop(); } catch(Exception e) { e.printStackTrace(); } }//end finally }//end func /* * Active MQ後臺管理頁面,使用者名稱密碼分別為admin,admin * http://127.0.0.1:8161/admin/ * */ public static void main(String[] args) throws Exception { System.out.println("program begin!"); //刪除佇列 //ClearQueue(BROKER_URL,DESTINATION); //通過取佇列中的訊息,把佇列中的訊息刪除 ClearMessage(); //傳送訊息示例 TestActiveMQ.run(); System.out.println("program end!"); }//end function }//end class


相關推薦

Active MQ傳送訊息Java示例程式碼

import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import java

springboot整合rabbitMq實現訊息傳送

實現思路:利用mq的ttl設定訊息失效時間 當達到設定時間後通過交換機到達死信佇列中,消費者端繫結讀取死信佇列中資訊來達到延時傳送訊息的功能。 demo 如下: (1)在pom.xml 中引入rabbitMq相關包 <dependency>

Redis實現可靠低訊息佇列

不過因為使用的是decrBy會導致一種情況出現,當前庫存還剩1個,2個執行緒同時請求,一個請求減去1個庫存,另一個請求減去2個庫存,,如果減去2個庫存的先執行,他會返回一個-1,然後我會加回去,但是在加回去的之前,減1庫存的執行緒執行了,會返回-2,依然沒有辦法減庫存成功,所以在這中情況下,我採用當減庫存返回

執行儲存過程,返回引數示例程式碼

//引數 SqlParameter[] fparams = new SqlParameter[7]; fparams[0] = new SqlParameter("@ID", ID); fparams[

kafka學習(1)linux下的安裝和啟動,以及Java示例程式碼

1. 安裝 1.1 下載kafka並解壓 wget http://mirror.bit.edu.cn/apache/kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz tar -zxvf kafka_2.11-0.11.0.0.tgz 1.2 修改

【語音處理】wav轉pcm mp3轉pcm Java示例程式碼

【語音處理】wav轉pcmJava示例程式碼 程式碼地址:https://gitee.com/xshuai/ai/blob/master/AIDemo/src/main/java/com/xs/au

微信公眾平臺接入java示例程式碼

第一步:申請訊息介面 需要申請訊息介面,很簡單隻需要在微信公眾平臺後臺填寫Servlet地址即可,這裡不多說。 第二步:驗證URL有效性 需要編寫URL有效性驗證程式碼,這裡以Java程式碼做示例,官網已給出PHP示例 開發者提交資訊後,微信伺服器將傳送GET請求

阿里MQ普通+順序+訊息 整合Spring

前言 由於公司專案需要,研究了下AliWareMQ。阿里mq的普通訊息和延時訊息還是挺簡單的。不過在順序訊息的時候出現了一些瓶頸。後來查閱原始碼和依據demo整理了一版融合Spring的版本。 例項 mq配置檔案(Spring) 主要是順序訊息的

Java關於DelayQueue做訊息推送

最近比較閒,看某專案原始碼時看到有用DelayQueue類來做延時的訊息推送。 DelayQueue是Delayed元素的一個無界阻塞佇列,只有在延遲期滿時才能從中提取元素。該佇列的頭部 是延遲期滿後儲存時間最長的Delayed元素。如果延遲都還沒有期滿,則佇列沒有頭部,並

Spring boot實戰專案整合阿里雲RocketMQ (非開源版)訊息佇列實現傳送普通訊息訊息 --附程式碼

一.為什麼選擇RocketMQ訊息佇列? 首先RocketMQ是阿里巴巴自研出來的,也已開源。其效能和穩定性從雙11就能看出來,借用阿里的一句官方介紹:歷年雙 11 購物狂歡節零點千萬級 TPS、萬億級資料洪峰,創造了全球最大的業務訊息併發以及流轉紀錄(日誌類訊息除外);  在始終保證高效能前提下

java的單例設計模式(對象的加載)考試專用

clas pac ack 延時加載 專用 public get private static java的單例設計模式(對象的延時加載) ,稱為:懶漢式 考試專用例:package day6;public class Single2 { //考試專用 ,對象的延時加

JAVA實現過期MAP 支持自定義過期觸發事件

keys 算法 public 寫入 hash pty static 實現 ssa 如題,直接上代碼: 1 import java.util.Iterator; 2 import java.util.concurrent.ConcurrentHashMap; 3

Java程式碼向指定的WebHook傳送訊息

1.我們通過Java來向某個WebHook地址傳送POST請求,並攜帶我們需要傳送的訊息 2.程式碼示例 搭建Maven專案,在pom.xml檔案裡引入httpclient依賴 <dependency> <groupId>org.apache.httpcompo

Redis 非同步訊息佇列與佇列

        訊息中介軟體,大家都會想到  Rabbitmq 和 Kafka 作為訊息佇列中介軟體,來給應用程式之間增加非同步訊息傳遞功能。這兩個中介軟體都是專業的訊息佇列中介軟體,特性之多超出了大多數人的理解能力。但是這種屬於重量級的應

訊息佇列

下面程式碼按需要填寫 @Bean public Queue delayQueuePerMessageTTL() { Map<String, Object> argument = new HashMap<>(); argument.put(“x-message-ttl

Java模擬壓測裝置傳送訊息到伺服器(Rabbitmq) python模擬上報訊息到rabbitMQ(protobuf)

進入idea,新建一個maven專案 主要是模擬150個裝置同時併發,併發時間持續15min 1.建立客戶端,構造請求傳送到對應的rabbitmq的佇列,用的protobuf協議。 import com.google.protobuf.ByteString; import com.

ccf歷年第四題java解答之-201503-4-網路(90分)

使用bfs求樹的直徑,執行超時,90分 import java.util.LinkedList; import java.util.Queue; import java.util.Scanner; class Node{ public int no; public int

IBM MQMQ傳送訊息

1、  檢視目前已建立的佇列管理器及執行狀態:dspmq 2、  在同一臺機器上模擬,建立並啟動兩個佇列管理器 分別用於SEND傳送和RE接收訊息 建立:crtmqm  SEND、crtmqm  RE 啟動:strmqm  SEND、strmqm  RE 3、  執行傳

java整合WebSocket向所有使用者傳送訊息

package com.reading.controller.library; import org.springframework.stereotype.Controller; import org.springframework.web.socket.server.st

效能提升五十倍:訊息佇列聚合通知的重要性

前言 這個話題對我而言,是影響很久的事情。從第一次使用訊息佇列開始,業務背景是報名系統通知到我們的系統。正常流量下資料都能正常通知過來,但遇到匯入報名人時,採用了Task非同步通知,資料量一大,佇列就死了。當時是儘量採用同步方式,減少併發量。  後來業務上有了專門的營銷系統