1. 程式人生 > >01-訊息中介軟體概述和ActiveMq入門

01-訊息中介軟體概述和ActiveMq入門

1.mq解決的問題

  • 系統非同步處理
  • 應用解耦
  • 流量削峰
  • 日誌處理
  • 訊息通訊

2.訊息中介軟體的2中模型

2.1 Point-to-Point(P2P) / 點對點 / 類比:送快遞

image

特點:
+ 一個消費生產者必須有一個訊息消費者。一對一的關係
+ 一個訊息傳送到queue中,如果mqserver重啟,訊息不會丟失(當然也可以設定為丟失。預設是不會丟失的)

2.2 Topic/ 主題(釋出訂閱(Pub/Sub) )/類比:廣播

image

特點:
+ 一個生產者生產的訊息可以同時被多個訊息消費者消費。一對多。
+ 一個消費者可以消費來自不同生產者的訊息。

3. Java Messaging Service規範

3.1 JMS規範模型包含如下幾個要素

  • 連線工廠
  • 獲取連線
  • 建立會話
  • JMS的目的/broker
  • 建立生產者
  • 建立消費者

5. Hello ActiveMQ

5.1 原生api


package com.hs.gz.hellodemo.mq.demo;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMessage;

/**
 * 生產者
 * @author hasee
 *
 */
public class Producer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    private static final String BROKEURL = "tcp://139.199.158.112:61616";
    private static final int SENDNUM = 3;
    public static void main(String[] args) throws Exception {
        //工廠
        ConnectionFactory factory;
        //連線
        Connection connection;
        //會話
        Session session;
        //目的地
        Destination destination;
        //消費者
        MessageProducer producer;
        //指定使用者名稱,密碼和url來建立連線工廠
        factory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);
        //從連線工廠中獲取麗連線
        connection = factory.createConnection();
        //從連線中建立session
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         /* 建立一個名為HelloWorld訊息佇列*/
        destination = session.createQueue("HelloWorld");
        /*往佇列裡面註冊生產者*/
        producer = session.createProducer(destination);
        for (int i = 0; i < SENDNUM; i++) {
             String msg = "傳送訊息"+i+" "+System.currentTimeMillis();
             TextMessage textMessage = session.createTextMessage(msg);
             producer.send(textMessage);
        }
        System.out.println("生成者生產ok.....");
        producer.close();
        session.close();
        connection.close();
    }
}

package com.hs.gz.hellodemo.mq.demo;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * 消費者
 * @author hasee
 *
 */
public class Consumer {
    /* 預設連線使用者名稱 */
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    /* 預設連線密碼 */
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    /* 預設連線地址 */
    private static final String BROKEURL = "tcp://139.199.158.112:61616";
    
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory;
        Connection connection;
        Session session;
        Destination destination = null;
        MessageConsumer consumer;
        factory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);
        connection = factory.createConnection();
         /* 啟動連線*/
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue("HelloWorld");
        consumer = session.createConsumer(destination);
        //一直監聽mqserver,如果有待消費的訊息就進行消費
        Message message;
        while((message = consumer.receive()) != null ) {
            System.out.println("consumer..." + ((TextMessage)message).getText());
        }
        
        consumer.close();
        session.close();
        connection.close();
    }
}