1. 程式人生 > >消息隊列 (2) java實現簡單的RabbtMQ

消息隊列 (2) java實現簡單的RabbtMQ

java實現 java cal png bit close 項目 rri XML

假設有如下問題:

  1.如果消費者連接中斷,這期間我們應該怎麽辦?

  2.如何做到負載均衡?

  3.如何有效的將數據發送到相關的接收者?就是怎麽樣過濾

  4.如何保證消費者收到完整正確的數據

  5.如何讓優先級高的接收者先收到數據

一、"Hello RabbitMQ"

技術分享圖片

如圖:P代表生產者,C代表消費者,紅色部分為消息隊列

二、項目開始

  1.首先創建一個maven項目,然後導入rabbitMQjar包

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>testRabbit</groupId> <artifactId>test</artifactId> <
version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency> </dependencies
> </project>

  2.創建消費者Producer

public class Producer {
    public final static String QUEUE_NAME = "rabbitMQ.test";

    public static void main(String[] args) throws IOException, TimeoutException {

        //創建連接工廠
        ConnectionFactory factory = new ConnectionFactory();

        //設置RabbitMQ相關信息
        factory.setHost("localhost");

        //創建一個新的連接
        Connection connection = factory.newConnection();
        //創建一個通道
        Channel channel = connection.createChannel();
        //聲明一個隊列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //發送消息到隊列中
        String message = "hello rabbitMQ";
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));
        System.out.println("Producer Send : " + message);

        //關閉通道和連接
        channel.close();
        connection.close();
    }
}

queueDeclare第一個參數表示隊列名稱,第二個參數為是否持久化(true表示是,隊列將在服務器重啟時生存),第三個參數為是否獨占隊列(創建者可以使用的私有隊列,斷開後自動刪除),第四個參數為當所有消費者客戶端連接斷開時是否自動刪除隊列,第五個參數為隊列的其他參數。

basicPublish第一個參數為交換機名稱,第二個參數為隊列映射的路由key,第三個參數為消息的其他屬性,第四個參數為發送消息的主體。

  3.創建消費者

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Customer {
    public final static String QUEUE_NAME = "rabbitMQ.test";

    public static void main(String[] args) throws IOException, TimeoutException {
        //創建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置RabbitMQ地址
        factory.setHost("localhost");
        //創建一個新的連接
        Connection connection = factory.newConnection();
        //創建一個通道
        Channel channel = connection.createChannel();
        //聲明要關註的隊列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        System.out.println("客戶端等待接受消息");

        //DefaultConsumer類實現了Consumer接口,通過傳入一個頻道,告訴服務器我們需要哪個頻道的信息,如果頻道中有消息,就會執行回調函數handleDelivery
        Consumer comsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String (body,"UTF-8");
                System.out.println("客戶端接收:"+message);
            }
        };

        //自動回答隊列應答 -- RabbitMQ中的消息確認機制
         channel.basicConsume(QUEUE_NAME,true,comsumer);

    }
}

該方法用於獲取生產者發送的消息,其中envelope主要存放生產者相關的信息(比如交換機、路由key等)body是消息實體。

運行結果如下:

技術分享圖片

技術分享圖片

三、實現任務分發

技術分享圖片

一個隊列的優點就是很容易處理並行化的工作能力,但是如果我們積累了大量的工作,我們就需要更多的工作者來處理,這樣就需要采用分布機制了。

新建一個生產者NewTask

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class NewTask {
    public final static String TASK_QUEUE_NAME = "task_queue";

    public static void main(String [] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);

        //分發消息
        for(int i = 0;i<10;i++){
            String message = "hello RabbitMQ "+ i;
            channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
            System.out.println("NewTask send :" + message);
        }

        channel.close();
        connection.close();

    }
}

然後創建2個工作者work1和work2代碼一樣

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Work1 {
    private static final String TASK_QUEUE_NAME = "task_queue";


    public static void main(String[] args) throws IOException, TimeoutException {

        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
        System.out.println("Work1 等待接受消息");

        //每次從隊列獲取的數量
        channel.basicQos(1);

        final Consumer consumer = new DefaultConsumer(channel){

            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"UTF-8");
                System.out.println("Worker1 接受到消息:"+message);
                try{
                    //throw new Exception();
                    doWork(message);
                }catch (Exception ex){
                    channel.abort();
                }finally {
                    System.out.println("Work1 完成了");
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };

        boolean autoAck=false;
        //消息消費完成確認
        channel.basicConsume(TASK_QUEUE_NAME,autoAck,consumer);

    }
    private static void doWork(String task) {
        try {
            Thread.sleep(1000); // 暫停1秒鐘
        } catch (InterruptedException _ignored) {
            Thread.currentThread().interrupt();
        }
    }
}

channel.basicQos(1);保證一次只分發一個,autoAck是否自動回復,如果為true的話,每次生產者只要發送信息就會從內存中刪除,那麽如果消費者程序異常退出,那麽就無法獲取數據,我們當時不希望出現這樣的情況,所以才去手動回復,每當消費者收到並處理信息然後在通知生產者,最後從隊列中刪除這條信息。如果消費者異常退出,如果還有其他消費者,那麽就會把隊列中的消息發送給其他消費者,如果沒有,等消費者啟動時候再次發送。

兩個都不拋出異常時:

技術分享圖片

技術分享圖片

技術分享圖片

其中一個設置為異常時,會把消息都發送給另一個正常的。等待異常的程序重啟後,才會繼續給它發送。

消息隊列 (2) java實現簡單的RabbtMQ