1. 程式人生 > >【RabbitMq】rabbitMq訊息確認機制

【RabbitMq】rabbitMq訊息確認機制

 

一、提出問題

生產者將訊息傳送出去後,訊息是否到達RabbitMq伺服器呢?預設的情況下,是不知道的

二、引入訊息確認機制

兩種方式:

          1.AMQP實現事務機制

          2.confirm模式

三、AMQP實現事務機制

  3.1 簡單示例

txSelect():使用者將當前channel設定成transaction
txCommit():提交事務
txRollback():回滾事務
package com.wj.transation.config;


import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @建立人 wj
 * @建立時間 2018/11/2
 * @描述
 */
public class MqConfig {
    public static Connection getConnection() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection= factory.newConnection();
        return connection;
    }

}

生產者:

package com.wj.transation;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wj.transation.config.MqConfig;

/**
 * @建立人 wj
 * @建立時間 2018/11/5
 * @描述 用事務模式,進行訊息確認機制
 */
public class TxSend {

    private static final String QUEUE_NAME = "test_queue_tx";

    public static void send() throws Exception {
        Connection connection = MqConfig.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String msg = "hello tx!";
        try {
            // 使用者將當前channel設定成transaction
            channel.txSelect();

            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

            // 異常,使回滾
            int x=1/0;


            System.out.println("【傳送端】訊息:"+msg);
            //提交事務
            channel.txCommit();
        } catch (Exception e) {
            channel.txRollback();
            System.out.println("發生異常,回滾");
        }
        channel.close();
        connection.close();
    }
}

 

消費者

package com.wj.transation;

import com.rabbitmq.client.*;
import com.wj.transation.config.MqConfig;

import java.io.IOException;

/**
 * @建立人 wj
 * @建立時間 2018/11/5
 * @描述
 */
public class TxRecv {
    private static final String QUEUE_NAME = "test_queue_tx";

    public static void recv() throws Exception {
        Connection connection = MqConfig.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("[接收端]訊息:" + new String(body, "utf-8"));
            }
        });
    }
}

執行結果:

註釋異常,執行結果:

  3.2 缺點

使用事務機制的話會降低RabbitMQ的效能,降低了吞吐量,增加了channel的請求數,耗時

四、Confirm模式

confirm模式分為三種:(差異在於生產者,前面二者序列

普通模式(一個)

批處理 (多個)

非同步confirm模式

4.1普通模式

package com.wj.confirm.general;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wj.confirm.util.ConnectionUtil;



/**
 * @建立人 wj
 * @建立時間 2018/11/5
 * @描述 普通模式
 */
public class Send {

    private static final String QUEUE_NAME="general_queue";

    public static void send() throws Exception {
        Connection connection=ConnectionUtil.getConnection();
        Channel channel=connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //生產者呼叫confirmSelect 將channel設定為confirm模式
        channel.confirmSelect();

        String msg="hello confirm!";
        channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());

        if (!channel.waitForConfirms()){
            System.out.println("訊息傳送失敗!");
        }else {
            System.out.println("訊息傳送成功!");
        }

    }
}

4.2批量模式

package com.wj.confirm.batch;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wj.confirm.util.ConnectionUtil;


/**
 * @建立人 wj
 * @建立時間 2018/11/5
 * @描述 批量模式
 */
public class Send {

    private static final String QUEUE_NAME = "batch_queue";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //生產者呼叫confirmSelect 將channel設定為confirm模式
        channel.confirmSelect();

        //        批量
        for (int i = 0; i < 10; i++) {
            String msg = "hello confirm! [" + i + "]";
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        }

        //確認
        if (!channel.waitForConfirms()) {
            System.out.println("訊息傳送失敗!");
        } else {
            System.out.println("訊息傳送成功!");
        }

    }
}

4.3非同步confirm模式

package com.wj.confirm.asynchronization;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.wj.confirm.util.ConnectionUtil;

import java.io.IOException;
import java.util.*;


/**
 * @建立人 wj
 * @建立時間 2018/11/5
 * @描述 批量模式
 */
public class Send {

    private static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //生產者呼叫confirmSelect 將channel設定為confirm模式
        channel.confirmSelect();
        //        未確認的訊息標識
        final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
        //        通道新增監聽
        channel.addConfirmListener(new ConfirmListener() {
            //            沒有問題的handleAck
            @Override
            public void handleAck(long deliverTag, boolean multiple) throws IOException {
                if (multiple) {
                    System.out.println("---handleNack----multiple");
                    confirmSet.headSet(deliverTag + 1).clear();
                } else {
                    System.out.println("false");
                    confirmSet.remove(deliverTag);
                }
            }

            @Override
            public void handleNack(long deliverTag, boolean multiple) throws IOException {
                if (multiple) {
                    System.out.println("---handleNack----multiple");
                    confirmSet.headSet(deliverTag + 1).clear();
                } else {
                    System.out.println("false");
                    confirmSet.remove(deliverTag);
                }
            }
        });


        String msg = "hello confirm——ack! ";
        while (true) {
            long seqNo = channel.getNextPublishSeqNo();
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            confirmSet.add(seqNo);
        }
    }

}

1,2的順序,2,1也可以,推薦是2,1

消費者:

package com.wj.confirm.asynchronization;

import com.rabbitmq.client.*;
import com.wj.confirm.util.ConnectionUtil;

import java.io.IOException;

/**
 * @建立人 wj
 * @建立時間 2018/11/5
 * @描述
 */
public class Recv {

    private static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("[接收端]訊息:"+new String(body,"utf-8"));
            }
        });
    }

}

gitHub:

https://github.com/InnocenceWj/rabbitmq_msg__confirm.git