1. 程式人生 > >RabbitMQ學習筆記(四)-----------------RPC

RabbitMQ學習筆記(四)-----------------RPC

    專案地址:https://github.com/gongxianshengjiadexiaohuihui/RabbitMQ/tree/master/RPC_RabbitMQ

    RPC遠端服務呼叫,舉個例子就是客戶端遠端呼叫服務端的方法幫自己運算,並把結果返回

    流程圖:

專案結構

Client

  • 建立一個反饋佇列,這個佇列的作用是等待伺服器返回處理結果
  • 傳送請求,請求的內容包含,待處理的資訊,correlaitonId(用來檢查反饋內容是否是自己需要 的),反饋佇列的名字
  • 從反饋佇列中取出資訊,對資訊進行驗證(correlationId),驗證通過,返回結果
package com.ggp;


import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName Client
 * @Description TODO
 * @Author Mr.G
 * @Date 2018/11/21 16:39
 * @Version 1.0
 */
public class Client {
    private Connection connection;
    private Channel channel;
    /**
     * 請求佇列名
     */
    private String requestQueueName = "my_queue";
    /**
     * 等待響應佇列名
     */
    private String replyQueueName;
    private QueueingConsumer consumer;

    public Client() throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connection = connectionFactory.newConnection();
        channel = connection.createChannel();

        /**
         * 監聽反饋佇列的狀態
         */
        replyQueueName = channel.queueDeclare().getQueue();
        consumer = new QueueingConsumer(channel);
        channel.basicConsume(replyQueueName,true,consumer);

    }
    public String call(String message) throws IOException, InterruptedException{
        String result;
        String corrID = UUID.randomUUID().toString();
        /**
         * 建造者模式,建立配置檔案,把corrID和需要反饋資訊的佇列名放進去
         */
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().correlationId(corrID).replyTo(replyQueueName).build();
        channel.basicPublish("",requestQueueName,properties,message.getBytes());
        while (true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            if(delivery.getProperties().getCorrelationId().equals(corrID)){
                result = new String(delivery.getBody(), "UTF-8");
                break;
            }
        }
        return result;
    }

    public void close() throws  Exception{
        connection.close();
    }
    public static void main(String[] args)throws Exception{
        Client client = new Client();
        String result = client.call("20");
        System.out.println(result);
        client.close();
    }
}

Server

  • 接收客戶端資訊,處理內容,計算結果
  • 通過接受的資訊,得到反饋佇列的名字,給該佇列傳送資訊,資訊內容包括correlationId和處理結果
package com.ggp;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.net.ConnectException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName Server
 * @Description TODO
 * @Author Mr.G
 * @Date 2018/11/21 17:10
 * @Version 1.0
 */
public class Server {
    private static final String RPC_QUEUE_NAME = "my_queue";
    private static int fib(int n){
        if(n == 0){
            return 0;
        }
        if(n == 1){
            return 1;
        }
        return fib(n-1) + fib(n -1);
    }
    public Server() throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RPC_QUEUE_NAME,false,false, false,null);
        channel.basicQos(1);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(RPC_QUEUE_NAME,false,consumer);
        System.out.println("Server is waiting request");
        while (true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().correlationId(delivery.getProperties().getCorrelationId()).build();
            String message = new String(delivery.getBody(),"UTF-8");
            int n = Integer.parseInt(message);
            System.out.println("receive the message : "+n);

            String response = ""+fib(n);
            channel.basicPublish("",delivery.getProperties().getReplyTo(),properties,response.getBytes());
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        }
    }
}

測試方法

package com.ggp.test;

import com.ggp.Client;

/**
 * @ClassName ClintTest
 * @Description TODO
 * @Author Mr.G
 * @Date 2018/11/22 8:41
 * @Version 1.0
 */
public class ClintTest {
    public static void main(String[] args)throws Exception{
        Client client = new Client();
        String result = client.call("20");
        System.out.println(result);

    }
}


package com.ggp.test;

import com.ggp.Client;
import com.ggp.Server;

/**
 * @ClassName Test
 * @Description TODO
 * @Author Mr.G
 * @Date 2018/11/22 8:39
 * @Version 1.0
 */
public class ServerTest {
    public static void main(String[] args)throws Exception{
        new Server();
    }
}

測試結果