【ActiveMQ】二 基礎知識
activeMQ
一 ActiveMQ簡介
1.1 什麼是ActiveMQ
ActiveMQ是Apache推出的,一款開源的,完全支援JMS1.1和J2EE 1.4規範的JMS Provider實現的訊息中介軟體(Message Oriented Middleware,MOM),實際上為什麼把MQ叫做訊息中介軟體。它最初的來源當然是由於系統A與系統B之間有訊息的傳遞。這個時候我們把系統A與系統B之間訊息傳遞的過程打斷。A與B通過MQ來間接通訊的過程。所以這個時候的MQ就叫做訊息中介軟體。
1.2 ActiveMQ的作用
最主要的功能就是:實現JMS Provider,用來幫助實現高可用、高效能、可伸縮、 易用和安全的企業級面向訊息服務的系統。
1.3 ActiveMQ特點
完全支援JMS1.1和J2EE 1.4規範(持久化,XA訊息,事務)
支援多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
可插拔的體系結構,可以靈活定製,如:訊息儲存方式、安全管理等
很容易和Application Server整合使用
多種語言和協議編寫客戶端。語言: Java,C,C++,C#,Ruby,Perl,Python,PHP
從設計上保證了高效能的叢集,客戶端-伺服器,點對點
可以很容易的和Spring結合使用
支援通過JDBC和journal提供高速的訊息持久化
支援與Axis的整合。
1.5 ActiveMQ的主要功能
將資訊以訊息的形式,從一個應用程式傳送到另一個或多個應用程式。
1.6 ActiveMQ的主要特點
1:訊息非同步接受,類似手機簡訊的行為,訊息傳送者不需要等待訊息接受者的響應,減少軟體多系統整合的耦合度。
2:訊息可靠接收,確保訊息在中介軟體可靠儲存,只有接收方收到後才刪除訊息,多個訊息也可以組成原子事務。
1.7 ActiveMQ的主要應用場景
在多個系統間進行整合和通訊的時候,通常會要求:
1:可靠傳輸,資料不能丟失,有的時候,也會要求不能重複傳輸;
2:非同步傳輸,否則各個系統同步傳送接受資料,互相等待,造成系統瓶頸
1.8 比較知名的訊息中介軟體
IBM MQSeries
BEA WebLogicJMS Server
Oracle AQ
Tibco
SwiftMQ
AcitveMQ:是免費的java實現的訊息中介軟體
二 ActiveMQ安裝與基本使用
注意:安裝gcc,jdk等
2.1 安裝解壓
ActiveMQ伺服器端 1:從http://activemq.apache.org/download.html下載最新的ActiveMQ
2.2 啟動執行
1:普通啟動:到ActiveMQ/bin下面,./activemq start
2:啟動並指定日誌檔案 ./activemq start > /tmp/activemqlog
2.3 啟動檢查
ActiveMQ預設採用61616埠提供JMS服務,使用8161埠提供管理控制檯服 務,執行以下命令以便檢驗是否已經成功啟動ActiveMQ服務:
1:比如檢視61616埠是否開啟: netstat -an | grep 61616
2:也可以直接檢視控制檯輸出或者日誌檔案
3:還可以直接訪問ActiveMQ的管理頁面:
預設的使用者名稱和密碼是admin/admin
2.4 停止ActiveMQ
可以用./activemq stop
暴力點的可以用ps aux| grep activemq 來得到程序號,然後kill掉
2.5 生產者
public class MsgSendder {
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory ConnectionFactoryconnectionFactory = new ActiveMQConnectionFactory("tcp://192.168.232.128:61616");
Connection connection = ConnectionFactoryconnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("my-queue");
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 3; i++) {
TextMessage message = session.createTextMessage("message--" + i);
Thread.sleep(1000);
producer.send(message);
}
session.commit();
session.close();
connection.close();
}
}
2.6 消費者
public class MsgReceiver {
public static void main(String[] args) throws Exception {
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://192.168.232.128:61616");
Connection connection = cf.createConnection();
connection.start();
final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("my-queue");
MessageConsumer consumer = session.createConsumer(destination);
int i = 0;
while (i < 3) {
i++;
TextMessage message = (TextMessage) consumer.receive();
session.commit();
System.out.println("收到訊息:" + message.getText());
}
session.close();
connection.close();
}
}
三 activeMQ訊息通訊
3.1 p2p的訊息通訊
3.1.1 producer
public class MsgSendder {
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory ConnectionFactoryconnectionFactory = new ActiveMQConnectionFactory("tcp://192.168.232.128:61616");
Connection connection = ConnectionFactoryconnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("my-queue");
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 3; i++) {
TextMessage message = session.createTextMessage("message--" + i);
Thread.sleep(1000);
producer.send(message);
}
session.commit();
session.close();
connection.close();
}
}
3.1.2 consumer
public class MsgReceiver {
public static void main(String[] args) throws Exception {
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://192.168.232.128:61616");
Connection connection = cf.createConnection();
connection.start();
final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("my-queue");
MessageConsumer consumer = session.createConsumer(destination);
int i = 0;
while (i < 3) {
i++;
TextMessage message = (TextMessage) consumer.receive();
session.commit();
System.out.println("收到訊息:" + message.getText());
}
session.close();
connection.close();
}
}
3.1.3 管理平臺
3.2 非持久化訊息
3.2.1 producer
public class NonPersisiTopicSender {
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory conFactory = new ActiveMQConnectionFactory("tcp://192.168.232.128:61616");
Connection createConnection = conFactory.createConnection();
createConnection.start();
Session createSession = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Topic createTopic = createSession.createTopic("蜘蛛俠");
MessageProducer createProducer = createSession.createProducer(createTopic);
for(int i=0;i<3;i++){
TextMessage createTextMessage = createSession.createTextMessage("message"+i);
createProducer.send(createTextMessage);
}
createSession.commit();
createSession.close();
createConnection.close();
}
}
3.2.2 consumer
public class NonPersisiTopicReceiver {
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory("tcp://192.168.232.128:61616");
Connection createConnection = activeMQConnectionFactory.createConnection();
createConnection.start();
Session createSession = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Topic createTopic = createSession.createTopic("蜘蛛俠");
MessageConsumer createConsumer = createSession.createConsumer(createTopic);
TextMessage message = (TextMessage)createConsumer.receive();
while(message!=null){
System.out.println(message.getText());
message = (TextMessage)createConsumer.receive();
}
createSession.commit();
createSession.close();
createConnection.close();
}
}
3.2.3 管理平臺
3.3 持久化訊息
3.3.1 producer
public class PersisiTopicSender {
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory conFactory = new ActiveMQConnectionFactory("tcp://192.168.232.128:61616");
Connection createConnection = conFactory.createConnection();
Session createSession = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Topic createTopic = createSession.createTopic("persisitent");
MessageProducer createProducer = createSession.createProducer(createTopic);
createProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
createConnection.start();
for(int i=0;i<3;i++){
TextMessage createTextMessage = createSession.createTextMessage("message"+i);
createProducer.send(createTextMessage);
}
createSession.commit();
createSession.close();
createConnection.close();
}
}
3.3.2 consumer
public class PersisiTopicReceiver {
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory("tcp://192.168.232.128:61616");
Connection createConnection = activeMQConnectionFactory.createConnection();
createConnection.setClientID("訂閱者B_ID");
createConnection.start();
Session createSession = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Topic createTopic = createSession.createTopic("persisitent");
TopicSubscriber createDurableSubscriber = createSession.createDurableSubscriber(createTopic, "T1");
TextMessage message = (TextMessage)createDurableSubscriber.receive();
while(message!=null){
System.out.println(message.getText());
message = (TextMessage)createDurableSubscriber.receive();
}
createSession.commit();
createSession.close();
createConnection.close();
}
}
3.3.3 管理平臺
3.4 總結
3.4.1持久化訊息
這是 ActiveMQ 的預設傳送模式,此模式保證這些訊息只被傳送一次和成功使用一次。對於這些訊息,可靠性是優先考慮的因素。可靠性的另一個重要方面是確保永續性訊息傳送至目標後,訊息服務在向消費者傳送它們之前不會丟失這些訊息。 這意味著在永續性訊息傳送至目標時,訊息服務將其放入永續性資料儲存。如果訊息服務由於某種原因導致失敗,它可以恢復此訊息並將此訊息傳送至相應的消費者。雖然這樣增加了訊息傳送的開銷,但卻增加了可靠性。
3.4.2非持久化訊息
保證這些訊息最多被傳送一次。對於這些訊息,可靠性並非主要的考慮因素。 此模式並不要求永續性的資料儲存,也不保證訊息服務由於某種原因導致失敗後訊息不會丟失。有兩種方法指定傳送模式:
1.使用setDeliveryMode 方法,這樣所有的訊息都採用此傳送模式; 如: producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
2.使用send 方法為每一條訊息設定傳送模式