訊息佇列入門(四)ActiveMQ的應用例項
阿新 • • 發佈:2019-02-04
>>部署和啟動ActiveMQ
我下載的是apache-activemq-5.12.0-bin.tar.gz,
解壓到本地目錄,進入到bin路徑下,
執行activemq啟動ActiveMQ。
執行方式:
啟動 ./activemq start
ActiveMQ預設使用的TCP連線埠是61616,
5.0以上版本預設啟動時,開啟了內建的Jetty伺服器,可以進入控制檯檢視管理。
預設使用者名稱admin/admin。
這裡我在虛擬機器裡啟動,訪問地址:
http://192.168.106.128:8161/admin/
ActiveMQ的控制檯功能十分強大,管理起來也很直觀。
>>使用Java連線
建立POM檔案
在Eclipse中新建Java工程,這裡使用Maven管理依賴,
下面是pom.xml:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
< project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> < modelVersion >4.0.0</ modelVersion >
< groupId >activemq-sample</ groupId >
< artifactId >activemq-sample</ artifactId >
< version >0.0.1-SNAPSHOT</ version >
< name >activemq-sample</ name >
< description >an activemq practice</ description >
< build >
< sourceDirectory >src</ sourceDirectory >
< plugins >
< plugin >
< artifactId >maven-compiler-plugin</ artifactId >
< version >3.1</ version >
< configuration >
< source >1.7</ source >
< target >1.7</ target >
</ configuration >
</ plugin >
<!-- activemq-core 5.7.0 使用bunble打包,需要新增相關外掛 -->
< plugin >
< groupId >org.apache.felix</ groupId >
< artifactId >maven-bundle-plugin</ artifactId >
< extensions >true</ extensions >
</ plugin >
</ plugins >
</ build >
< dependencies >
<!-- activemq的maven依賴 -->
< dependency >
< groupId >org.apache.activemq</ groupId >
< artifactId >activemq-core</ artifactId >
< version >5.7.0</ version >
< type >bundle</ type >
</ dependency >
</ dependencies >
</ project >
|
在第一次新增activemq的maven依賴時報錯,後來發現activemq-core 5.7.0採用了bundle的打包方式,
必須在pom中配置maven-bundle-plugin。
建立訊息建立者 MsgProducer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
import
javax.jms.Connection;
import
javax.jms.ConnectionFactory;
import
javax.jms.Destination;
import
javax.jms.JMSException;
import
javax.jms.MessageProducer;
import
javax.jms.Session;
import
javax.jms.TextMessage;
import
org.apache.activemq.ActiveMQConnectionFactory;
/**
* @Description: Message Producer
* @author: Bing Yue
*/
public
class MsgProducer {
//如果你在本地啟動,可以直接使用空的ActiveMQConnectionFactory建構函式
private static
final String BROKER_URL= "failover://tcp://192.168.106.128:61616" ;
public
static void main(String[] args) throws
JMSException, InterruptedException{
//建立連線工廠
ConnectionFactory connectionFactory= new ActiveMQConnectionFactory(BROKER_URL);
//獲得連線
Connection conn = connectionFactory.createConnection();
//start
conn.start();
//建立Session,此方法第一個引數表示會話是否在事務中執行,第二個引數設定會話的應答模式
Session session = conn.createSession( false , Session.AUTO_ACKNOWLEDGE);
//建立佇列
Destination dest = session.createQueue( "test-queue" );
//建立訊息生產者
MessageProducer producer = session.createProducer(dest);
for
( int i= 0 ;i< 100 ;i++) {
//初始化一個mq訊息
TextMessage message = session.createTextMessage( "這是第 " + i+ " 條訊息!" );
//傳送訊息
producer.send(message);
System.out.println( "send message:訊息" +i);
//暫停3秒
Thread.sleep( 3000 );
}
//關閉mq連線
conn.close();
}
}
|
建立訊息接收者 MsgProducer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
import
javax.jms.Connection;
import
javax.jms.ConnectionFactory;
import
javax.jms.Destination;
import
javax.jms.JMSException;
import
javax.jms.Message;
import
javax.jms.MessageConsumer;
import
javax.jms.MessageListener;
import
javax.jms.Session;
import
javax.jms.TextMessage;
import
org.apache.activemq.ActiveMQConnectionFactory;
/**
*
* @Description: Message Consumer
* @author: Bing Yue
*/
public
class MsgConsumer implements MessageListener {
private static
final String BROKER_URL= "failover://tcp://192.168.106.128:61616" ;
public
static void main(String[] args) throws
JMSException{
//建立連線工廠
ConnectionFactory connectionFactory= new ActiveMQConnectionFactory(BROKER_URL);
//獲得連線
Connection conn = connectionFactory.createConnection();
//start
conn.start();
//建立Session,此方法第一個引數表示會話是否在事務中執行,第二個引數設定會話的應答模式
Session session = conn.createSession( false , Session.AUTO_ACKNOWLEDGE);
//建立佇列
Destination dest = session.createQueue( "test-queue" );
//建立訊息生產者
MessageConsumer consumer = session.createConsumer(dest);
//初始化MessageListener
MsgConsumer msgConsumer = new
MsgConsumer();
//給消費者設定監聽物件
consumer.setMessageListener(msgConsumer);
}
/**
* 消費者需要實現MessageListener介面
* 介面有一個onMessage(Message message)需要在此方法中做訊息的處理
*/
@Override
public
void onMessage(Message msg) {
TextMessage txtMessage = (TextMessage)msg;
try
{
System.out.println( "get message:" + txtMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
|
執行MsgProducer,
登入後臺檢視test-queue佇列,可以看到發出的訊息正在等待被處理:
執行MsgConsumer,接收訊息並在控制檯列印:
通過這個例項可以對ActiveMQ的應用有一個簡單的瞭解。
在實際開發中,通常還需要設定優先順序處理,大部分情況下,訊息的傳送和接收方都會啟用多執行緒,
通過執行緒池來提高處理效率,解耦的同時保持業務處理能力。