1. 程式人生 > >JMS-訊息中介軟體

JMS-訊息中介軟體

一、簡介 1、JMS(Java Message Service),即:java訊息服務應用程式介面。 2、是Java平臺面向訊息中介軟體(MOM)的API/技術規範。 3、場景:應用與兩個應用程式之間,或者分散式系統架構中分發訊息,可進行非同步/同步方式的通訊,和平臺API無關,          基本多數的MOM都提供對JMS的支援

二、訊息模型

在JMS標準中,有兩種訊息模型P2P(Point to Point),Publish/Subscribe(Pub/Sub)。

Queue與Topic 主要區別

三、JMS 體系架構

(1) ConnectionFactory

建立Connection物件的工廠,針對兩種不同的jms訊息模型,分別有QueueConnectionFactory和TopicConnectionFactory兩種。可以通過JNDI來查詢ConnectionFactory物件。

(2) Destination

Destination的意思是訊息生產者的訊息傳送目標或者說訊息消費者的訊息來源。對於訊息生產者來說,它的Destination是某個佇列(Queue)或某個主題(Topic);對於訊息消費者來說,它的Destination也是某個佇列或主題(即訊息來源)。

所以,Destination實際上就是兩種型別的物件:Queue、Topic可以通過JNDI來查詢Destination。

(3) Connection

Connection表示在客戶端和JMS系統之間建立的連結(對TCP/IP socket的包裝)。Connection可以產生一個或多個Session。跟ConnectionFactory一樣,Connection也有兩種型別:QueueConnection和TopicConnection。

(4) Session

Session是操作訊息的介面。可以通過session建立生產者、消費者、訊息等。Session提供了事務的功能。當需要使用session傳送/接收多個訊息時,可以將這些傳送/接收動作放到一個事務中。同樣,也分QueueSession和TopicSession。

(5) 訊息的生產者

訊息生產者由Session建立,並用於將訊息傳送到Destination。同樣,訊息生產者分兩種型別:QueueSender和TopicPublisher。可以呼叫訊息生產者的方法(send或publish方法)傳送訊息。

(6) 訊息消費者

訊息消費者由Session建立,用於接收被髮送到Destination的訊息。兩種型別:QueueReceiver和TopicSubscriber。可分別通過session的createReceiver(Queue)或createSubscriber(Topic)來建立。當然,也可以session的creatDurableSubscriber方法來建立持久化的訂閱者。

(7) MessageListener

訊息監聽器。如果註冊了訊息監聽器,一旦訊息到達,將自動呼叫監聽器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一種MessageListener。

四、常用訊息佇列

Active MQ,Rabbit MQ,Zero MQ,Kafka等是常見的JMS實現方式,一般實現的方法基本差不多。

功能描述:生產者將訊息傳送到佇列(佇列的名字為hello)中,消費者從佇列中獲取訊息。

1、Rabbit MQ 實現

(1)新建maven工程 ,引入依賴

<!-- RabbiteMQ Java客戶端 -->         <dependency>             <groupId>com.rabbitmq</groupId>             <artifactId>amqp-client</artifactId>             <version>4.0.3</version>         </dependency>

        <!--測試 -->         <dependency>             <groupId>junit</groupId>             <artifactId>junit</artifactId>             <version>4.7</version>             <scope>test</scope>         </dependency>

(2)編寫生產者、消費者

生產者:

package com.xdl.test;

import java.io.IOException; import java.util.concurrent.TimeoutException;

import org.junit.Test;

import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;

/**  * 生產者  */ public class Provider {

    @Test     public void testBasicPublish() throws IOException, TimeoutException {         // 建立連線工廠         ConnectionFactory factory = new ConnectionFactory();

        // 設定相關引數,地址,埠,賬號,密碼         factory.setHost("127.0.0.1");         factory.setPort(AMQP.PROTOCOL.PORT); // 5672         factory.setUsername("guest");         factory.setPassword("guest");

        // 新建一個長連線         Connection connection = factory.newConnection();

        // 建立一個通道(一個輕量級的連線)         Channel channel = connection.createChannel();

        // 宣告一個佇列         String QUEUE_NAME = "hello";

        // 1-佇列名稱 2-佇列是否持久化 3-是否是排他佇列 4-使用完之後是否刪除此佇列 5-其他屬性         channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 傳送訊息到佇列中         String message = "Hello RabbitMQ!";

//        // 建立路由 1-路由名稱 2-路由型別   //        channel.exchangeDeclare("myexchange", "topic"); //        // 繫結路由佇列 1-佇列名稱 2-路由名稱 3-routing key //        channel.queueBind("heelo", "myexchange", "shensha"); //        // 傳送訊息 1-路由名稱 2-routing key3-其他資訊 4-訊息位元組陣列 //        channel.basicPublish("myexchange", "shensha", null, "HelloWorld".getBytes());                  // 注意:exchange如果不需要寫成空字串,routingKey和佇列名稱保持一致         channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));         System.out.println("Producer Send a message:" + message);

        // 關閉資源         channel.close();         connection.close();     }

} 消費者:

package com.xdl.test;

import java.io.UnsupportedEncodingException; import org.junit.Test;

import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope;

/**  * 消費者  * */ public class Consumer {

         @Test     public  void testBasicConsumer() throws Exception{         ConnectionFactory factory = new ConnectionFactory();         factory.setHost("127.0.0.1");         factory.setPort(AMQP.PROTOCOL.PORT);    // 5672         factory.setUsername("guest");          factory.setPassword("guest");

        // 新建一個長連線         Connection connection = factory.newConnection();

        // 建立一個通道(一個輕量級的連線)         Channel channel = connection.createChannel();                  // 宣告一個佇列         String QUEUE_NAME = "hello";         channel.queueDeclare(QUEUE_NAME, false, false, false, null);         System.out.println("Consumer Wating Receive Message");

        //消費訊息.    1-消費佇列    2-是否自動傳送訊息回執    3-回撥函式        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel){             @Override             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException {                 String message = new String(body, "UTF-8");                 System.out.println(" [C] Received '" + message + "'");             }         };                 // 訂閱訊息         channel.basicConsume(QUEUE_NAME, true, consumer);     } } (3)安裝RabbitMQ,並啟動訪問

第一步:找到mq的安裝目錄sbin下輸入cmd

輸入:命令rabbitmq-plugins enable rabbitmq_management,訪問地址:http://127.0.0.1:15672/ 預設賬號:guest/guest

ActiveMQ :

主要實現: producer.send(message); //傳送者     consumer.receive(100000); //接受中