01-訊息中介軟體概述和ActiveMq入門
阿新 • • 發佈:2018-12-02
1.mq解決的問題
- 系統非同步處理
- 應用解耦
- 流量削峰
- 日誌處理
- 訊息通訊
2.訊息中介軟體的2中模型
2.1 Point-to-Point(P2P) / 點對點 / 類比:送快遞
特點:
+ 一個消費生產者必須有一個訊息消費者。一對一的關係
+ 一個訊息傳送到queue中,如果mqserver重啟,訊息不會丟失(當然也可以設定為丟失。預設是不會丟失的)
2.2 Topic/ 主題(釋出訂閱(Pub/Sub) )/類比:廣播
特點:
+ 一個生產者生產的訊息可以同時被多個訊息消費者消費。一對多。
+ 一個消費者可以消費來自不同生產者的訊息。
3. Java Messaging Service規範
3.1 JMS規範模型包含如下幾個要素
- 連線工廠
- 獲取連線
- 建立會話
- JMS的目的/broker
- 建立生產者
- 建立消費者
5. Hello ActiveMQ
5.1 原生api
package com.hs.gz.hellodemo.mq.demo; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQMessage; /** * 生產者 * @author hasee * */ public class Producer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static final String BROKEURL = "tcp://139.199.158.112:61616"; private static final int SENDNUM = 3; public static void main(String[] args) throws Exception { //工廠 ConnectionFactory factory; //連線 Connection connection; //會話 Session session; //目的地 Destination destination; //消費者 MessageProducer producer; //指定使用者名稱,密碼和url來建立連線工廠 factory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL); //從連線工廠中獲取麗連線 connection = factory.createConnection(); //從連線中建立session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); /* 建立一個名為HelloWorld訊息佇列*/ destination = session.createQueue("HelloWorld"); /*往佇列裡面註冊生產者*/ producer = session.createProducer(destination); for (int i = 0; i < SENDNUM; i++) { String msg = "傳送訊息"+i+" "+System.currentTimeMillis(); TextMessage textMessage = session.createTextMessage(msg); producer.send(textMessage); } System.out.println("生成者生產ok....."); producer.close(); session.close(); connection.close(); } } package com.hs.gz.hellodemo.mq.demo; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消費者 * @author hasee * */ public class Consumer { /* 預設連線使用者名稱 */ private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; /* 預設連線密碼 */ private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; /* 預設連線地址 */ private static final String BROKEURL = "tcp://139.199.158.112:61616"; public static void main(String[] args) throws Exception { ConnectionFactory factory; Connection connection; Session session; Destination destination = null; MessageConsumer consumer; factory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL); connection = factory.createConnection(); /* 啟動連線*/ connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("HelloWorld"); consumer = session.createConsumer(destination); //一直監聽mqserver,如果有待消費的訊息就進行消費 Message message; while((message = consumer.receive()) != null ) { System.out.println("consumer..." + ((TextMessage)message).getText()); } consumer.close(); session.close(); connection.close(); } }