1. 程式人生 > >RabbitMQ 訊息佇列入門

RabbitMQ 訊息佇列入門

文件 入門

主要的內容:one two three four five six seven

前言

中介軟體

訊息佇列

  • 非同步處理,註冊完發簡訊
  • 應用解耦,訂單介面呼叫扣庫存介面,失敗了怎麼辦?
  • 流量削峰,大量請求到達業務介面,這不行!
  • 日誌處理,每個業務程式碼都呼叫一下寫日誌的方法嗎?結合AOP思想,業務程式為什麼要關心寫日誌的事情?
  • 訊息通訊等,ABC處在聊天室裡面,一起聊天?foreach嗎?

官網有7個入門教程,過了一遍,做個筆記。

正文

HelloWorld

概述

RabbitMQ,是個訊息代理人message broker。它接收,儲存,轉發訊息。

幾個常用的術語:

  1. 生產者Producer,生產傳送訊息。
  2. 消費者Consumer,接收訊息。
  3. 佇列Queue,只受系統記憶體和硬碟大小限制。儲存訊息,生產者往佇列裡面傳送,消費者監聽讀取。

這幾個物件可以分佈在不同的機器。

使用Client

P和C的角色。maven倉庫包為amqp-clientslf4j-nop

<dependencies>
    <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.8.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-nop -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-nop</artifactId>
        <version>1.7.30</version>
    </dependency>
<endencies>

傳送

也就是Producer.java

public class Send {
    private static final String QUEUE_NAME = "hello1";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("xxx.xxx.xxx.xxx");
        factory.setPort(5672);
        factory.setUsername("full_access");
        factory.setPassword("111111");
        factory.setVirtualHost("test_host1");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello world";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(Charset.forName("utf-8")));
            System.out.println(" [x] Sent '" + message + "'");
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

ConnectionChannel 都實現了ICloseable介面,所以可以使用try(...)介面自動釋放資源。Channel是我們要經常使用的API物件。channel.queueDeclare是冪等的,只有在沒有的情況下才會建立。然後呼叫basicPublish方法,往佇列傳送位元組陣列訊息。

接收

Consumer,Receiver.java

public class Receiver {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("xxx.xxx.xxx.xxx");
        factory.setPort(5672);
        factory.setUsername("full_access");
        factory.setPassword("111111");
        factory.setVirtualHost("test_host1");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = ((consumerTag, message) -> {
            String msg = new String(message.getBody(), "UTF-8");
            System.out.println(" [x] Received [" + msg + "]");
        });
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
        });
    }
}

connectionchannel 物件沒有使用try-with-resource自動釋放,factory.newConnection()之後程式就會保持執行,呼叫basicConsume方法來消費得到的訊息。該方法第二個autoAck引數寫了false,這樣,訊息就屬於未確認的狀態,每次啟動都會重複收到。

Work Queue 工作佇列

訊息產生的速度大於消費的速度,該怎麼辦?

每個http請求的時間不宜過長,所以可以把內部耗時的方法做成非同步,然後用回撥callback的方式實現。換個角度說就是consumer裡面有比較耗時的任務,可以用thread.sleep()模擬一下。

DeliverCallback deliverCallback = ((consumerTag, message) -> {
    String msg = new String(message.getBody(), "UTF-8");
    int i = new Random().nextInt(5);
    try {
        Thread.sleep(i * 1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println(msg + "休眠了" + i + "秒");
});

不過這不是這節的重點,這裡的重點是幾個引數。

訊息確認

首先設定一下每次接收的訊息數,每次一個channel.basicQos(1);。在客戶端沒有確認之前不會接收新的訊息。channel.basicConsume方法的第二個引數autoAck表示自動確認。訊息有兩種狀態,ready和unacked的。訊息傳送到queue→ready→consumer消費,但不確認→unacked→確認→結束,等待下一個。

public class NewTask {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("x.x.x.x");
        factory.setPort(5672);
        factory.setUsername("full_access");
        factory.setPassword("111111");
        factory.setVirtualHost("test_host1");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        channel.basicQos(1);//一次接收一個訊息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String msg = new String(message.getBody(), "UTF-8");
            int i = new Random().nextInt(2);
            try {
                Thread.sleep(i * 1000);
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);//只確認這個tag對應的訊息
                System.out.println(msg + "執行了" + i + "秒,consumerTag=" + consumerTag + "併發送了確認");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        boolean autoAck = false;//不自動確認
        String str = channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
            System.out.println(consumerTag);
        });
    }
}

Message Durablity

Redis類似。

確保訊息不丟。也就是確保未消費的訊息在伺服器意外宕機重啟之後訊息不丟。RabbitMQ會以一定間隔把訊息寫入磁碟,但不是實時【所以還是有一個短的時間間隔會產生訊息的丟失情況】。為了解決這個問題,需要兩個配置

  1. 定義queue的時候設定durable引數為true。rabbitmq不允許queue name相同其他引數不同的兩個佇列,所以可以先刪以前的。

    boolean durable = true;
    channel.queueDeclare("hello", durable, false, false, null);
    
  2. 傳送的時候設定MessageProperties屬性。

    channel.basicPublish("", "hello",
                MessageProperties.PERSISTENT_TEXT_PLAIN,//持久化為文字
                message.getBytes());
    

Fair Dispatch 公平分發

RabbitMQ的預設推送策略是把第N個訊息推送給第N個客戶端,他不會管一個客戶端是否還有沒確認的訊息,所以可能會導致某個客戶端非常的忙。解決方案:

呼叫basicOps設定prefetchCount為1,這樣一個客戶端在沒有確認當前訊息之前不會收到下一個訊息。

Publish/Subscribe 釋出/訂閱

一次性給所有的Consumer傳送訊息

回顧一下前面的例子,基本的程式碼流程是

  1. 建立ConnectionFactory→設定引數→建立Connection→建立Channel
  2. Producer宣告QueueName,往Exchanges=”“傳送訊息
  3. Consumer指定相同的QueueName,設定訊息處理函式,讀取資料,傳送確認。

Exchanges

RabbitMQ中有Exchange的概念。訊息實際上不會直接傳送給Queue,而是給Exchange,然後通過exchange轉發給queue,然後給Consumer消費。exchange為空字元表示系統內部預設的exchange。

[root@test]~# rabbitmqctl list_exchanges -p test_host1
Listing exchanges for vhost test_host1 ...
name	type
amq.fanout	fanout
amq.direct	direct
amq.match	headers
amq.rabbitmq.trace	topic direct
amq.headers	headers
amq.topic	topic

amp.* 為系統自帶的exchange。

管理介面檢視

ExchangeType

Type決定一個Exchange怎麼處理接收到的訊息,廣播到所有佇列或者推送到特定的佇列或者直接丟棄訊息。

內建的ExchangeType列舉

public enum BuiltinExchangeType {
    DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers");
  	//...省略其他
}

fanout

顧名思義,是一種廣播的處理方式,會發送到所有的queue。看個demo。

先看Send

//Send.java
public class Send {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("x.x.x.x");
        factory.setPort(5672);
        factory.setUsername("full_access");
        factory.setPassword("111111");
        factory.setVirtualHost("test_host1");

        try (final Connection connection = factory.newConnection();
             final Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//Exchange宣告
            final String msg = String.valueOf(LocalDateTime.now());
            channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes(Charset.defaultCharset()));//第二個routingKey留空待定
            System.out.println("傳送" + msg);
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

定義一個FANOUT型別的Exchange,沒有定義Queue。呼叫basicPublish傳送訊息。

再看Receiver.java

//Receiver.java
public class Receiver {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("x.x.x.x");
        factory.setPort(5672);
        factory.setUsername("full_access");
        factory.setPassword("111111");
        factory.setVirtualHost("test_host1");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        final String queueName = channel.queueDeclare().getQueue();//獲取自動生成的queue,
        channel.queueBind(queueName, EXCHANGE_NAME, "");//繫結,最後一個引數待定

        channel.basicConsume(queueName, true, (consumerTag, message) -> {
            final String s = new String(message.getBody(), Charset.defaultCharset());
            System.out.println("收到" + s);
        }, consumerTag -> { });

    }
}

定義一個和Send相同的Exchange。獲取建立的channel對應的系統自動生成的queue(結束之後會自動刪除,避免系統有太多佇列)。繫結queue和exchange。RabbitMQ會丟棄訊息如果這個exchange下面沒有繫結queue的話。

執行多個Receiver例項。因為ExchangeType是fanout,所以,每個例項都會收到廣播的訊息。

對比前面例子中的預設Exchange,一個訊息,傳送到一個Exchange(預設的空字串),因為queuename指定了是同一個,所以,只會有一個client收到訊息。

而這個例子中,queue是自動生成的,所以會有多個自動刪除的queue,一個queue對應一個client。ExchangeType是fanout,所以,每個client都會收到。

Routing

有選擇性的接收訊息

前面例子使用了fanout廣播的方式來發布訊息,一條訊息會被推送到所有的佇列,又因為佇列是自動生成的,一個佇列對應一個consumer,所以所有的consumer都會收到所有的訊息。這無法實現某個consumer只關心某種型別的訊息的需求。所以,這裡引入exchangetype=direct的例子。

name 相同,type不同的exchange不合法,可以先在rabbitmq的管理平臺介面刪除原先的exchange。

Binding

回顧前面的程式碼。publish和queue繫結的時候都留空了routingKey引數。

Send.java

Consumer.java

Consumer的queueBind 和 Producer的basicPublish中routingKey需要匹配。fanout型別的exchange會忽略routingKey引數,所以我們直接留空。

direct Exchange

fanout的訊息分發不太靈活,所以這裡使用direct的Exchange。看下圖,如果Producer產生的routingKey為orange,那麼只會傳送給Q1,那麼只有C1會收到訊息。如果routingKey為black或者green,那麼C2會收到訊息。

Multiple Bindings

多個佇列繫結同一個routingKey也是合理。下面的例子Q1和Q2都會收到black的訊息,這種繫結本質上就退化成了一種前面的fanout Exchange。

Demo

場景:Producer產生3種routingKey的message,Info,Error,Fault。定義兩個Consumer,C1接收Info的message,C2接收Error和Fault。

//Send.java
public class Send {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) {

        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("x.x.x.x");//省略其他設定
        try (final Connection connection = factory.newConnection();
             final Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//指定exchange的型別
            String messageType = args[0];//傳入routingKey
            String msg = messageType + " message" + LocalDateTime.now();
            channel.basicPublish(EXCHANGE_NAME, messageType, null, msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("傳送了" + msg);
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
//Receiver.java
public class Receive {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {

        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("x.x.x.x");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        final String queueName = channel.queueDeclare().getQueue();
        for (String arg : args) {//遍歷所有的routingKey,繫結所有當前queue
            System.out.println("繫結routingKey:" + arg);
            channel.queueBind(queueName, EXCHANGE_NAME, arg);
        }

        channel.basicConsume(queueName, true, ((consumerTag, message) -> {
            final String msg = new String(message.getBody(), StandardCharsets.UTF_8);
            System.out.println("收到" + msg);
        }), consumerTag -> {
        });
    }
}

建立5個啟動配置,3個為Send,分別傳送Info,Fault,Error訊息;2個Receiv,第一個接收Info,第二個接收Error和Fault。

最終效果

Topic

通過pattern模式來指定接收的訊息。

前面例子使用的ExchangeType為direct,相對於fanout,是靈活了一些,但是還是有一些缺點,比如無法組合條件。比如有個consumer關心所有的error訊息以及和a相關的info訊息。這裡就可以使用Topic的Exchange。然後都是通過routingKey引數來指定。

萬用字元

* 星號,代表一個詞

# 井號,代表0個或多個詞(包括一個)

以點分隔,組成routingKey。比如*.a.b.#

如果設定BuiltinExchangeType.TOPIC的exchangeType,但是沒有使用萬用字元,那麼就和BuiltinExchangeType.DIRECT是一樣的。

未匹配任何模式的訊息會被丟棄。

關鍵程式碼

宣告exchange

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

傳送訊息

if (args[0].equals("info")) {
    s = "a's info message";
    channel.basicPublish(EXCHANGE_NAME, "a.info", null, s.getBytes(StandardCharsets.UTF_8));
} else {
    s = "xxx.yyy's error message";
    channel.basicPublish(EXCHANGE_NAME, "xxx.yyy.error", null, s.getBytes(StandardCharsets.UTF_8));
}

使用萬用字元接收訊息

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
final String queueName = channel.queueDeclare().getQueue();//臨時的queue
String routingKey = args[0];//傳入的引數 比如*.info 或 #.error來匹配
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
System.out.println("繫結" + routingKey + "的佇列");
channel.basicConsume(queueName, true, ((consumerTag, message) -> {
    String msg = new String(message.getBody(), StandardCharsets.UTF_8);
    System.out.println("收到訊息" + msg);
}), consumerTag -> {});

RPC 遠端呼叫

遠端過程呼叫。

Client 呼叫 Server的服務。Client傳送訊息,Server消費訊息。Server計算結果,釋出一個訊息到對應的佇列。Client消費佇列裡面的訊息。這一個過程Client和Server都是雙重身份。這個是和其他最主要的區別。

關於RPC

RPC是一種常見的模式,但也存在一些爭議,這主要體現在如果開發者有意或無意的不去注意這是一個本地的方法還是比較耗時的遠端方法。RPC也增加了系統的除錯複雜度。

開發RPC的幾個建議:

  1. 確保方法容易辨識是遠端還是本地
  2. 做好文件
  3. 處理呼叫時候的異常

回撥佇列

Client需要Server的計算結果,所以需要在訊息裡面帶上CallbackQueueName。根據AMQP 0-9-1協議,定義了14個屬性,除了4個比較常用,其他都很少用。

  • deliveryMode 設定訊息的持久化,第二個例子中用過。
  • contentType 設定內容的mime-type ,建議application/json
  • replyTo 回覆佇列名
  • correlationId 關聯id 因為訊息是非同步的,所以可以給每個訊息帶上個id,用來關聯傳送的訊息。
final AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
                .correlationId("uuid")
                .replyTo("xxx")
                .build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());

簡易版Client

public class Client {
    private static final String RPC_QUEUE_NAME = "rpc_queue";//rpc呼叫的queue,往裡面發rpc呼叫引數

    public static void main(String[] args) throws IOException, TimeoutException {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("x.xx.x.x");
        factory.setPort(5672);
        factory.setUsername("full_access");
        factory.setPassword("111111");
        factory.setVirtualHost("test_host1");

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        String msg = String.valueOf(3);//模擬呼叫引數

        final String replyQueueName = channel.queueDeclare().getQueue();
        String corrId = UUID.randomUUID().toString();
        final AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .replyTo(replyQueueName)//回覆的佇列
                .correlationId(corrId)//當前訊息的uuid
                .build();
        channel.basicPublish("",
                RPC_QUEUE_NAME,
                properties,
                msg.getBytes(StandardCharsets.UTF_8));//廣播的方式往rpc queue釋出訊息
        System.out.println("傳送計算[" + msg + "]的訊息");

        //等待訊息回覆
        channel.basicConsume(replyQueueName, true, (consumerTag, message) -> {
            String revCorrId = message.getProperties().getCorrelationId();
            if (corrId.equals(revCorrId)) {//拿到了回覆
                final String result = new String(message.getBody(), StandardCharsets.UTF_8);
                System.out.println("傳送" + msg + "得到回覆" + result);
            } else {
                System.out.println("收到correlationId:" + revCorrId);
            }
        }, consumerTag -> {
        });
    }
}

channel.queueDeclare()用來宣告一個臨時佇列,為接收返回結果的佇列。程式碼中只發布了一個計算請求,所以basicConsume中corrId判斷其實沒有必要。正常情況下可以在當前臨時佇列釋出多個計算請求,每個的計算結果都傳入到當前的臨時佇列,所以需要判斷corrId的匹配情況。

簡易版Server

public class Server {
    private static final String RPC_QUEUE_NAME = "rpc_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("hello from server");
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("x.x.x.x");
        factory.setPort(5672);
        factory.setUsername("full_access");
        factory.setPassword("111111");
        factory.setVirtualHost("test_host1");

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RPC_QUEUE_NAME,false,false,false,null);//宣告非排他的佇列,用來消費rpc_queue裡面的計算請求。
        channel.basicConsume(RPC_QUEUE_NAME,
                true,//自動回覆,後面就不需要手動ack
                (consumerTag, message) -> {
                    String msg = new String(message.getBody(), StandardCharsets.UTF_8);
                    String replyMsg = new String(msg + "Result");//簡單模擬計算結果。
                    System.out.println("收到" + msg + "開始計算,計算完成結果為:[" + replyMsg + "]");
                  	//拿到需要回復properties
                    String replyQueueName = message.getProperties().getReplyTo();
                    String correlationId = message.getProperties().getCorrelationId();
                    AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                            .correlationId(correlationId)//correlationId返回去。
                            .build();
                    // 把計算結果發回去
                    channel.basicPublish("", replyQueueName, replyProps, replyMsg.getBytes(StandardCharsets.UTF_8));
                }, consumerTag -> {

                });
    }
}

消費RPC_QUEUE_NAME的計算請求,然後根據訊息裡面帶的getReplyTo()的值返回給客戶端。

Publisher Confirm 釋出確認

可靠釋出

啟用生產者確認

根據AMQP 0.9.1協議,這個確認預設是沒有啟用的,可以通過confirmSelect方法啟用。

Channel channel = connection.createChannel();
channel.confirmSelect();

這個方法是針對channel的,不是針對每個訊息,所以,只要 在開啟channel之後呼叫一次就好。

確認每個訊息

先是一個簡單的例子,每發完一個訊息,都讓系統確認等待一下。

while (thereAreMessagesToPublish()) {
    byte[] body = ...;
    BasicProperties properties = ...;
    channel.basicPublish(exchange, queue, properties, body);
    // 5秒超時
    channel.waitForConfirmsOrDie(5_000);
}

每次發完一個訊息,都等待最多5秒鐘的一遍確認。這個很明顯會極大的影響系統的吞吐率。

批量確認

傳送一個確認一個明顯會比較low,所以這裡引入一種批量確認的方式。不過這只是一種自己業務程式碼的確認機制,不是rabbitmq提供的。

int batchSize = 100;//
int outstandingMessageCount = 0;
while (thereAreMessagesToPublish()) {
    byte[] body = ...;
    BasicProperties properties = ...;
    channel.basicPublish(exchange, queue, properties, body);
    outstandingMessageCount++;//傳送一個加1
    if (outstandingMessageCount == batchSize) {
        ch.waitForConfirmsOrDie(5_000);//到達batchSize之後確認
        outstandingMessageCount = 0;
    }
}
if (outstandingMessageCount > 0) {
    ch.waitForConfirmsOrDie(5_000);//確認剩下的
}

這種確認吞吐量是上來了,不過最大的問題是當confirm出問題了之後是無法定位到具體哪個有問題。

ConcurrentSkipListMap 和 channel.getNextPublishSeqNo()

channel.getNextPublishSeqNo可以獲取釋出的訊息的下一個序號,有序遞增。ConcurrentSkipListMap有一個heapMap方法,可以返回key小於等於param的map子集。在釋出訊息之前先獲取序號,作為key放到map裡面。

map.put(nextPublishSeqNo, byteMsg);
channel.basicPublish("", queueName, null, msgStr.substring(i, i + 1).getBytes(StandardCharsets.UTF_8));

非同步確認

Producer只管發訊息,然後註冊一個非同步回撥函式。rabbitmq提供了兩個回撥函式。一個是傳送成功的回撥,一個是傳送失敗的回撥。兩個函式的引數是一樣的,兩個。

  • sequence number 序號。表示成功/失敗的訊息編號
  • multiple 布林值。false表示只有一個被確認。true表示小於等於當前序號的訊息傳送成功/失敗
channel.confirmSelect();//啟用訊息確認
channel.addConfirmListener(
        (deliveryTag, multiple) -> {
            if (multiple) {
                System.out.println("序號" + deliveryTag + "的資訊傳送成功");
                map.remove(deliveryTag);
            } else {
                System.out.println("序號小於" + deliveryTag + "的資訊傳送成功");
                final ConcurrentNavigableMap<Long, Byte> confirmed = map.headMap(deliveryTag, true);
                confirmed.clear();
            }
        },
        (deliveryTag, multiple) -> {
            if (!multiple) {
                System.out.println("傳送失敗的資訊sequence number:" + deliveryTag);
            } else {
                System.out.println("序號小於" + deliveryTag + "的訊息傳送失敗");
            }
        });