1. 程式人生 > >【ActiveMQ】二 基礎知識

【ActiveMQ】二 基礎知識

activeMQ

一 ActiveMQ簡介

1.1 什麼是ActiveMQ

ActiveMQ是Apache推出的,一款開源的,完全支援JMS1.1和J2EE 1.4規範的JMS Provider實現的訊息中介軟體(Message Oriented Middleware,MOM),實際上為什麼把MQ叫做訊息中介軟體。它最初的來源當然是由於系統A與系統B之間有訊息的傳遞。這個時候我們把系統A與系統B之間訊息傳遞的過程打斷。A與B通過MQ來間接通訊的過程。所以這個時候的MQ就叫做訊息中介軟體。

1.2 ActiveMQ的作用

最主要的功能就是:實現JMS Provider,用來幫助實現高可用、高效能、可伸縮、 易用和安全的企業級面向訊息服務的系統。

1.3 ActiveMQ特點

完全支援JMS1.1和J2EE 1.4規範(持久化,XA訊息,事務)

支援多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA

可插拔的體系結構,可以靈活定製,如:訊息儲存方式、安全管理等

很容易和Application Server整合使用

多種語言和協議編寫客戶端。語言: Java,C,C++,C#,Ruby,Perl,Python,PHP

從設計上保證了高效能的叢集,客戶端-伺服器,點對點

可以很容易的和Spring結合使用

支援通過JDBC和journal提供高速的訊息持久化

支援與Axis的整合。

1.5 ActiveMQ的主要功能

將資訊以訊息的形式,從一個應用程式傳送到另一個或多個應用程式。

1.6 ActiveMQ的主要特點

         1:訊息非同步接受,類似手機簡訊的行為,訊息傳送者不需要等待訊息接受者的響應,減少軟體多系統整合的耦合度。

2:訊息可靠接收,確保訊息在中介軟體可靠儲存,只有接收方收到後才刪除訊息,多個訊息也可以組成原子事務。

1.7 ActiveMQ的主要應用場景

在多個系統間進行整合和通訊的時候,通常會要求:

 1:可靠傳輸,資料不能丟失,有的時候,也會要求不能重複傳輸;

2:非同步傳輸,否則各個系統同步傳送接受資料,互相等待,造成系統瓶頸

1.8 比較知名的訊息中介軟體

IBM MQSeries

BEA WebLogicJMS Server

Oracle AQ 

Tibco

SwiftMQ

AcitveMQ:是免費的java實現的訊息中介軟體

二 ActiveMQ安裝與基本使用

注意:安裝gcc,jdk

2.1 安裝解壓

ActiveMQ伺服器端 1:從http://activemq.apache.org/download.html下載最新的ActiveMQ

2.2 啟動執行

 1:普通啟動:到ActiveMQ/bin下面,./activemq start

2:啟動並指定日誌檔案 ./activemq start > /tmp/activemqlog

2.3 啟動檢查

 ActiveMQ預設採用61616埠提供JMS服務,使用8161埠提供管理控制檯服 務,執行以下命令以便檢驗是否已經成功啟動ActiveMQ服務:

 1:比如檢視61616埠是否開啟: netstat -an | grep 61616

 2:也可以直接檢視控制檯輸出或者日誌檔案

3:還可以直接訪問ActiveMQ的管理頁面:

預設的使用者名稱和密碼是admin/admin

2.4 停止ActiveMQ

可以用./activemq stop

暴力點的可以用ps aux| grep activemq 來得到程序號,然後kill掉

2.5 生產者

public class MsgSendder {

         public static void main(String[] args) throws Exception {

                  ActiveMQConnectionFactory ConnectionFactoryconnectionFactory = new ActiveMQConnectionFactory("tcp://192.168.232.128:61616");

                   Connection connection = ConnectionFactoryconnectionFactory.createConnection();

                   connection.start();

                   Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

                   Destination destination = session.createQueue("my-queue");

                   MessageProducer producer = session.createProducer(destination);

                            for (int i = 0; i < 3; i++) {

                                     TextMessage message = session.createTextMessage("message--" + i);

                                               Thread.sleep(1000);

                                               producer.send(message);

                            }

                            session.commit();

                            session.close();

                            connection.close();

                   }

}

2.6 消費者

public class MsgReceiver {

    public static void main(String[] args) throws Exception {

        ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://192.168.232.128:61616");

        Connection connection = cf.createConnection();

        connection.start();

        final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

        Destination destination = session.createQueue("my-queue");

        MessageConsumer consumer = session.createConsumer(destination);

        int i = 0;

        while (i < 3) {

            i++;

            TextMessage message = (TextMessage) consumer.receive();

            session.commit();

            System.out.println("收到訊息:" + message.getText());

        }

        session.close();

        connection.close();

    }

}

三 activeMQ訊息通訊

3.1 p2p的訊息通訊

3.1.1 producer

public class MsgSendder {

    public static void main(String[] args) throws Exception {

        ActiveMQConnectionFactory ConnectionFactoryconnectionFactory = new ActiveMQConnectionFactory("tcp://192.168.232.128:61616");

        Connection connection = ConnectionFactoryconnectionFactory.createConnection();

        connection.start();

        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

        Destination destination = session.createQueue("my-queue");

        MessageProducer producer = session.createProducer(destination);

            for (int i = 0; i < 3; i++) {

                TextMessage message = session.createTextMessage("message--" + i);

                    Thread.sleep(1000);

                    producer.send(message);

            }

            session.commit();

            session.close();

            connection.close();

        }

}

3.1.2 consumer

public class MsgReceiver {

    public static void main(String[] args) throws Exception {

       ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://192.168.232.128:61616");

       Connection connection = cf.createConnection();

       connection.start();

       final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

       Destination destination = session.createQueue("my-queue");

       MessageConsumer consumer = session.createConsumer(destination);

       int i = 0;

       while (i < 3) {

           i++;

           TextMessage message = (TextMessage) consumer.receive();

           session.commit();

           System.out.println("收到訊息:" + message.getText());

       }

       session.close();

       connection.close();

    }

}

3.1.3 管理平臺

3.2 非持久化訊息

3.2.1  producer

   public class NonPersisiTopicSender {

     public static void main(String[] args) throws JMSException {

                       ActiveMQConnectionFactory conFactory = new ActiveMQConnectionFactory("tcp://192.168.232.128:61616");

                      Connection createConnection = conFactory.createConnection();

                       createConnection.start();

                       Session createSession = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

                       Topic createTopic = createSession.createTopic("蜘蛛俠");

                       MessageProducer createProducer = createSession.createProducer(createTopic);

                       for(int i=0;i<3;i++){

                                TextMessage createTextMessage = createSession.createTextMessage("message"+i);

                                createProducer.send(createTextMessage);

                       }

                       createSession.commit();

                       createSession.close();

                       createConnection.close();

          }

}

3.2.2  consumer

public class NonPersisiTopicReceiver {

    public static void main(String[] args) throws JMSException {

       ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory("tcp://192.168.232.128:61616");

       Connection createConnection = activeMQConnectionFactory.createConnection();

       createConnection.start();

       Session createSession = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

       Topic createTopic = createSession.createTopic("蜘蛛俠");

       MessageConsumer createConsumer = createSession.createConsumer(createTopic);

       TextMessage message = (TextMessage)createConsumer.receive();

       while(message!=null){

           System.out.println(message.getText());

           message = (TextMessage)createConsumer.receive();

       }

       createSession.commit();

       createSession.close();

       createConnection.close();

    }

}

3.2.3  管理平臺

3.3 持久化訊息

3.3.1  producer

public class PersisiTopicSender {

     public static void main(String[] args) throws JMSException {

         ActiveMQConnectionFactory conFactory = new ActiveMQConnectionFactory("tcp://192.168.232.128:61616");

         Connection createConnection = conFactory.createConnection();

         Session createSession = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

         Topic createTopic = createSession.createTopic("persisitent");

         MessageProducer createProducer = createSession.createProducer(createTopic);

         createProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

         createConnection.start();

         for(int i=0;i<3;i++){

             TextMessage createTextMessage = createSession.createTextMessage("message"+i);

             createProducer.send(createTextMessage);

         }

         createSession.commit();

         createSession.close();

         createConnection.close();

     }

}

3.3.2  consumer

public class PersisiTopicReceiver {

    public static void main(String[] args) throws JMSException {

        ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory("tcp://192.168.232.128:61616");

        Connection createConnection = activeMQConnectionFactory.createConnection();

        createConnection.setClientID("訂閱者B_ID");

        createConnection.start();

        Session createSession = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

        Topic createTopic = createSession.createTopic("persisitent");

        TopicSubscriber createDurableSubscriber = createSession.createDurableSubscriber(createTopic, "T1");

        TextMessage message = (TextMessage)createDurableSubscriber.receive();

        while(message!=null){

            System.out.println(message.getText());

            message = (TextMessage)createDurableSubscriber.receive();

        }

        createSession.commit();

        createSession.close();

        createConnection.close();

    }

}

3.3.3  管理平臺

3.4 總結

3.4.1持久化訊息

這是 ActiveMQ 的預設傳送模式,此模式保證這些訊息只被傳送一次和成功使用一次。對於這些訊息,可靠性是優先考慮的因素。可靠性的另一個重要方面是確保永續性訊息傳送至目標後,訊息服務在向消費者傳送它們之前不會丟失這些訊息。 這意味著在永續性訊息傳送至目標時,訊息服務將其放入永續性資料儲存。如果訊息服務由於某種原因導致失敗,它可以恢復此訊息並將此訊息傳送至相應的消費者。雖然這樣增加了訊息傳送的開銷,但卻增加了可靠性。

3.4.2非持久化訊息

保證這些訊息最多被傳送一次。對於這些訊息,可靠性並非主要的考慮因素。 此模式並不要求永續性的資料儲存,也不保證訊息服務由於某種原因導致失敗後訊息不會丟失。有兩種方法指定傳送模式:

1.使用setDeliveryMode 方法,這樣所有的訊息都採用此傳送模式; 如: producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

2.使用send 方法為每一條訊息設定傳送模式