1. 程式人生 > >rabbitmq4-工作佇列及公平分發模式

rabbitmq4-工作佇列及公平分發模式

建議大家如果沒有看前一篇文章的時候,還是看一看第一篇文章,因為上篇文章的確把很多的概念都講解的比較清楚。我發現有很多東西在單獨使用rabbitmq是做不了的,例如自定義message投遞的id,所以我希望快速的把這幾篇介紹的博文寫完,然後進入springboot的整合篇,但是我不建議新手一上來就開始使用springboot的整合,就想我在群裡面聽到的,不知道channel為何物更別提其他的概念了,只有一個穩紮穩打的基礎在往高階的地方學習的時候才不費力。

一、簡單工作佇列

image.png
我想大概這種模式的應用場景也就剩下了應用層面的解耦了吧,話不多話,下面直接用程式碼展示

二、生產者程式碼:

public class Producer {

    public static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws IOException, TimeoutException{

        final Connection conn = ConnUtils.getConn();
        final Channel channel = conn.createChannel();
        boolean
durable = true; boolean exclusive = false; boolean autoDelete = false; channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, null); channel.confirmSelect(); channel.addConfirmListener(new ConfirmListener() { @Override public
void handleAck(long deliveryTag, boolean multiple) throws IOException { // 這個目前在單獨使用rabbitmq的時候沒有辦法找到自定義這個訊息標識的辦法,但是在和springboot整合之後會提供這樣的方法 System.out.println(multiple); System.out.println("wtf 需要這麼熱嗎:::::"+deliveryTag); } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("啊哈哈,你被拒絕了……"); } }); // 這個地方也可以搞一個執行緒來進行傳送 channel.basicPublish("",QUEUE_NAME,null,"fuck 真他媽的熱 ".getBytes()); channel.basicPublish("",QUEUE_NAME,null,"fuck 真他媽的熱 +1".getBytes()); channel.basicPublish("",QUEUE_NAME,null,"fuck 真他媽的熱 +2".getBytes()); channel.basicPublish("",QUEUE_NAME,null,"fuck 真他媽的熱 +3".getBytes()); channel.basicPublish("",QUEUE_NAME,null,"fuck 真他媽的熱 +4".getBytes()); channel.basicPublish("",QUEUE_NAME,null,"fuck 真他媽的熱 +5".getBytes()); channel.close(); conn.close(); } }

三、兩個消費者(只需要把程式碼拷貝一份就可以了)

public class Consumer01 {
    public static final String QUEUE_NAME = "work_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conn = ConnUtils.getConn();
        final Channel channel = conn.createChannel();
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                long deliveryTag = envelope.getDeliveryTag();
                System.out.println("Recv001"+"message == "+new String(body,"utf-8"));
                channel.basicAck(deliveryTag,false);
            }
        };
        channel.basicConsume(QUEUE_NAME,false,consumer);
    }
}

先啟動兩個消費者,因為訊息太少,如果先啟動生產者,在啟動消費者,一個消費者立馬就消費完了。

四、結果分析

image.png
image.png
我們發現兩個消費者總是已奇偶的形式出現的,加入兩個消費者的消費能力不一樣,消費者1消費能力比較高,但是以這種模式的話,那麼整個系統的消費能力的上線就有比較弱的消費者2來決定了。所以下面介紹一種公平分發模式:公平指的是能者多勞

我們在channel申明的下面加一行程式碼:我們分別設定consumer1的消費能力為3,consumer2的消費者能力為1

 /**
  * prefetchCount:告訴MQ不要同時給一個消費者推送超過prefetchCount個訊息,
   * 即一點prefetchCount個訊息沒有應答,該消費者就會發生阻塞
   * global:指的是該設定是針對該consumer還是針對channel級別
   */
channel.basicQos(3false);

下面我們在觀察結果:
image.png
image.png
我們可以看到奇偶的模式不見了,而且消費者1的吞吐量是大於消費者2的

本節到這裡就結束了,有很多的介紹希望大家多去看看前面的文章。