1. 程式人生 > >rabbitmq(4)訊息佇列(公平分發)

rabbitmq(4)訊息佇列(公平分發)

1、目標

實現rabbitmq將訊息優先發給閒置的消費者。

2、程式碼

2.1、生產者

private final static String QUEUE_NAME = "hello";

@Test
public void testSend() throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.168.1.7");
    factory.setUsername("root");
    factory.setPassword("123456"
); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); for(int i=0;i<20;i++) { String message = "Hello "+i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [1] Sent '"
+ message + "'"); } channel.close(); connection.close(); }

2.2、消費者1

private final static String QUEUE_NAME = "hello";

public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.168.1.7");
    factory.setUsername("root"
); factory.setPassword("123456"); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); // 消費者最多接受1條訊息,直到應答後接受新訊息。保證rabbitmq每次將訊息傳送給閒置的消費者 int prefetchCount = 1; channel.basicQos(prefetchCount); channel.queueDeclare(QUEUE_NAME, true, false, false, null); System.out.println(" [1] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [1] Received '" + message + "'"); try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } // 手動應答 channel.basicAck(envelope.getDeliveryTag(), false); } }; /* * 使用公平分發,必須關閉自動應答,使用手動應答 * 當消費者應答rabbitmq後,rabbitmq將刪除該訊息 * 保證即時消費者接受訊息後(未應答)中斷,rabbitmq也會將該訊息傳送給其他消費者而不會出現訊息丟失的問題 */ boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, consumer); }

2.2、消費者2

private final static String QUEUE_NAME = "hello";

public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.168.1.7");
    factory.setUsername("root");
    factory.setPassword("123456");

    Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    int prefetchCount = 1;
    channel.basicQos(prefetchCount);

    channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    System.out.println(" [2] Waiting for messages. To exit press CTRL+C");
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println(" [2] Received '" + message + "'");
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    };
    boolean autoAck = false;
    channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}

相關推薦

rabbitmq(4)訊息佇列(公平分發)

1、目標 實現rabbitmq將訊息優先發給閒置的消費者。 2、程式碼 2.1、生產者 private final static String QUEUE_NAME = "hello";

叢集與負載均衡系列(4)——訊息佇列Rabbitmq的搭建

        前面的三篇文章介紹了共享session,從這篇文章開始介紹訊息佇列,這裡用的是Rabbitmq。對於Rabbitmq的一些基本概念,不打算在這裡總結了。因為網上有大把總結的不錯的文章,比如點選開啟連結         這篇文章介紹Rabbitmq的安裝。  

RabbitMQ訊息佇列)叢集配置與使用篇

介紹 MQ全稱為Message Queue, 訊息佇列(MQ)是一種應用程式對應用程式的通訊方法。應用程式通過讀寫出入佇列的訊息(針對應用程式的資料)來通訊,而無需專用連線來連結它們。訊息傳遞指的是程式之間通過在訊息中傳送資料進行通訊,而不是通過直接呼叫彼此來通訊,直接呼叫通常是用於諸如遠端過程呼叫的

Redis與RabbitMQ作為訊息佇列的對比

RabbitMQ RabbitMQ是實現AMQP(高階訊息佇列協議)的訊息中介軟體的一種,最初起源於金融系統,用於在分散式系統中儲存轉發訊息,在易用性、擴充套件性、高可用性等方面表現不俗。訊息中介軟體主要用於元件之間的解耦,訊息的傳送者無需知道

三分鐘學會在spring boot 專案中使用RabbitMq訊息佇列

第一步:在spring boot專案中新增RabbitMq的maven依賴 <dependency> <groupId>org.springframework.boot</groupId>

RabbitMQ .NET訊息佇列使用入門(三)【MVC實現RPC例子】

每一個孤獨的靈魂都需要陪伴 RPC(Remote Procedure Call Protocol)——遠端過程呼叫協議,它是一種通過網路從遠端計算機程式上請求服務,而不需要了解底層網路技術的協議。RPC協議假定某些傳輸協議的存在,如TCP或UDP,為通訊程式之

rabbitMQ模擬訊息佇列群發郵件

首先我們寫一個傳送頁面,第一步就建立與rabbitmq的建立,然後建立通道,接下來建立通道內的交換機,之後進行定義交換機的名稱和型別,交換機進行持久化,以後無論何時重新啟動rabbitmq,資料都不會

RabbitMQ .NET訊息佇列使用入門(二)【多個佇列訊息傳輸】

孤獨將會是人生中遇見的最大困難。 實體類: DocumentType.cs public enum DocumentType { //日誌 Journal = 1, //論文

Redis與RabbitMQ作為訊息佇列的比較

本文僅針對RabbitMQ與Redis做佇列應用時的情況進行對比 具體採用什麼方式實現,還需要取決於系統的實際需求 簡要介紹 RabbitMQ RabbitMQ是實現AMQP(高階訊息佇列協議)的訊息中介軟體的一種,最初起源於金融系統,用於在分散式系統中儲存轉發訊

spring整合rabbitmq實現訊息佇列

RabbitTemplate概念瞭解 RabbitMQ是由Erlang(愛立信公司)語言開發,實現Advanced Message Queuing Protocol (AMQP高階訊息佇列協議)的訊息中介軟體。訊息中介軟體主要用於元件之間的解耦,訊息的傳送者無需知道

RabbitMQ訊息佇列)專題學習04 Publish/Subscribe(釋出者/訂閱者)

(使用Java客戶端) 一、概述 在前面的專題學習中,我們建立了一個工作佇列,在工作佇列中假如每個任務交給一個確定的工作者,不管是生產者還是消費者都必須知道一個指定的佇列名稱才能傳送和接收訊息,而RabbitMQ訊息模型的核心思想就是生產者不會將訊息直接傳送給佇列。 因為生

RabbitMQ訊息佇列)專題學習03 Work Queues(工作佇列)

一、概述 工作佇列(Work queues) (使用Java客戶端) 在前面的專題學習中,我們使用Java語言實現了一個簡單的從名為"hello"的佇列中傳送和接收訊息的程式,在這部內容中我們將建立一個工作佇列被用來分配定時訊息任務,而且通過多個接收者(工作者)實現。 工作

C#呼叫RabbitMQ實現訊息佇列

前言 我在剛接觸使用中介軟體的時候,發現,中介軟體的使用並不是最難的,反而是中介軟體的下載,安裝,配置才是最難的。 所以,這篇文章我們從頭開始學習RabbitMq,真正的從頭開始。 關於訊息佇列 其實訊息佇列沒有那麼神祕,我們這樣想一下,使用者訪問網站,最終是要將資料以HTTP的協議的方式,通過網路傳

RabbitMQ 訊息佇列 - topic 模式分發訊息

推薦閱讀 https://blog.csdn.net/column/details/15500.html topic 模式 根據 Binding 指定的 RoutingKey, Exchange 對 key 進行模式匹配後投遞到相應的 Queue, 模式匹配時符號

RabbitMQ 訊息佇列 - fanout 模式分發訊息

推薦閱讀 https://blog.csdn.net/column/details/15500.html fanout 模式 將同一個 message 傳送到所有同該 Exchange 繫結的 queue, 只要 RoutingKey 是一樣, 這條訊息都會被投遞

RabbitMQ 訊息佇列 - direct 模式分發訊息

推薦閱讀 https://blog.csdn.net/column/details/15500.html direct 模式 根據 Binding 指定的 Routing Key, 將符合Key的訊息傳送到 Binding 的 Queue p_direc

工作佇列work queues 公平分發(fair dispatch) And 訊息應答與訊息持久化

生產者 1 package cn.wh.work; 2 3 import cn.wh.util.RabbitMqConnectionUtil; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Co

(九)RabbitMQ訊息佇列-通過Headers模式分發訊息

Headers型別的exchange使用的比較少,以至於官方文件貌似都沒提到,它是忽略routingKey的一種路由方式。是使用Headers來匹配的。Headers是一個鍵值對,可以定義成Hashtable。傳送者在傳送的時候定義一些鍵值對,接收者也可以再繫結時候傳入一些鍵值對,兩者匹配的

(八)RabbitMQ訊息佇列-通過Topic主題模式分發訊息

前兩章我們講了RabbitMQ的direct模式和fanout模式,本章介紹topic主題模式的應用。如果對direct模式下通過routingkey來匹配訊息的模式已經有一定了解那fanout也很好理解。簡單的可以理解成direct是通過routingkey精準匹配的,而topic是通過r

(六)RabbitMQ訊息佇列-訊息任務分發訊息ACK確認機制(PHP版)

在前面一章介紹了在PHP中如何使用RabbitMQ,至此入門的的部分就完成了,我們內心中一定還有很多疑問:如果多個消費者消費同一個佇列怎麼辦?如果這幾個消費者分任務的權重不同怎麼辦?怎麼把同一個佇列不同級別的任務分發給不同的消費者?如果消費者異常離線怎麼辦?不要著急,後面將慢慢解開面紗。我們