1. 程式人生 > >學習之路-RabbitMQ(三):RabbitMQ的入門程式

學習之路-RabbitMQ(三):RabbitMQ的入門程式

1.建立maven工程
在這裡插入圖片描述
2.分別在兩個工程中匯入依賴

<dependency>
	<groupId>com.rabbitmq</groupId> 
	<artifactId>amqp‐client</artifactId> 
	<version>4.0.3</version>
	<!‐‐此版本與spring boot 1.5.9版本匹配‐‐>
</dependency>
<dependency>
    <groupId>org.springframework.boot</
groupId
>
<artifactId>spring‐boot‐starter‐logging</artifactId> </dependency>

3.建立生產著

package com.xuecheng.rabbitmq.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException; /** * @Auther: 星仔 * @Date: 2018/12/24 21:31 * @Description: */ public class ProducerTest01 { //佇列名稱 private static final String QUEUE = "helloworld"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection =
null; Channel channel = null; try { //建立連線工廠,建立連線 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); //建立虛擬主機,rabbitmq預設虛擬機器名稱為“/”,虛擬機器相當於一個獨立的mq伺服器 factory.setVirtualHost("/"); //創建於MQ服務的TCP連線 connection = factory.newConnection(); //創建於EXchange的通道,每一個通道都相當於一個會話事務 channel = connection.createChannel(); //宣告佇列,如果RabbitMQ中沒有該佇列,則會建立 /** *引數:String queue,boolean durable,boolean exclusive,boolean autoDelete,Map<String,Object> params *引數明細: * 1. queue:佇列名稱 * 2. durable:是否持久化,如果持久化,將MQ重啟之後佇列還在 * 3. exclusive: 是否獨佔連線,佇列只允許在該佇列中訪問,一旦連線關閉,該佇列將自動刪除,如果將此引數設定為true,那麼可用於臨時佇列的建立 * 4. autoDelete:自動刪除,佇列不再使用時是否關閉,如果將此引數設定為true將exclusive設定為true,可用於建立臨時佇列 * 5. params: 可以設定佇列的一些擴充套件引數,比如設定存活時間等等 */ channel.queueDeclare(QUEUE,true,false,false,null); //傳送訊息 /** * 引數:String exchange,String routingKey,String props,Byte[] body * 引數明細: * 1、exchange: 交換機,如果不使用,將使用MQ的預設交換機 * 2、routingKey: 路由key,交換機根據路由key將訊息轉發到指定的佇列,如果使用預設交換機,routingKey設定為佇列的名稱 * 3、props:訊息的屬性 * 4、body: 訊息內容 */ String msg = "helllo world"; channel.basicPublish("",QUEUE,null,msg.getBytes()); } catch (IOException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); }finally{ //關閉連線,先關閉通道,在關閉連線 if(channel!=null){ channel.close(); } if(connection!=null){ connection.close(); } } } }

可以登入RabbitMQ的客戶端進行檢視,已經多了一個helloworld的佇列,其中有一條訊息待銷費
4.建立消費者來消費訊息

package com.xuecheng.rabbitmq.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Auther: 星仔
 * @Date: 2018/12/24 22:43
 * @Description:
 */
public class ConsumerTest01 {

    //佇列名稱
    private static final String QUEUE = "helloworld";

    public static void main(String[] args) throws IOException, TimeoutException {
        //建立連線工廠,建立連線
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        //建立虛擬主機,rabbitmq預設虛擬機器名稱為“/”,虛擬機器相當於一個獨立的mq伺服器
        factory.setVirtualHost("/");
        //創建於MQ服務的TCP連線
        Connection connection = factory.newConnection();
        //創建於EXchange的通道,每一個通道都相當於一個會話事務
        Channel channel = connection.createChannel();
        //宣告佇列
        channel.queueDeclare(QUEUE,true,false,false,null);
        //宣告消費訊息的方法
        DefaultConsumer consumer = new DefaultConsumer(channel){
            /**
             * 消費者接收訊息呼叫此方法
             * @param consumerTag 消費者的標籤,在channel.basicConsume()去指定
             * @param envelope 訊息包的內容,可從中獲取訊息id,訊息routingkey,交換機,訊息和重傳標誌(收到訊息失敗後是否需要重新發送)
             * @param properties 訊息屬性
             * @param body 訊息內容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException{
                //交換機
                String exchange = envelope.getExchange();
                //路由key
                String routingKey = envelope.getRoutingKey();
                //訊息id,mq在channel中用來標識訊息的id,用於確認訊息已接收
                long deliveryTag = envelope.getDeliveryTag();
                //訊息內容
                String msg = new String(body,"utf-8"); System.out.println("receive message.." + msg);
            }
        };
        /**
         * 監聽佇列
         * 引數:String queue, boolean autoAck,Consumer callback
         * 引數明細:
         * 1.queue: 佇列名稱
         * 2.autoAck: 自動回覆,設定為true為表示訊息接收到自動向mq回覆接收到了,mq接收到回覆會刪除訊息,設定為false則需要手動回覆
         * 3.callback: 消費訊息的方法,消費者接收到訊息後呼叫此方法
         */
        channel.basicConsume(QUEUE,true,consumer);
    }
}

總結:
1、傳送端操作流程
1)建立連線
2)建立通道
3)宣告佇列
4)傳送訊息
2、接收端
1)建立連線
2)建立通道
3)宣告佇列
4)監聽佇列
5)接收訊息
6)ack回覆