1. 程式人生 > >rabbitmq學習(四):利用rabbitmq實現遠端rpc呼叫

rabbitmq學習(四):利用rabbitmq實現遠端rpc呼叫

一、rabbitmq實現rpc呼叫的原理

·rabbitmq實現rpc的原理是:客戶端向一個佇列中傳送訊息,並註冊一個回撥的佇列用於接收服務端返回的訊息,該訊息需要宣告一個叫做correaltionId的屬性,該屬性將是該次請求的唯一標識。服務端在接受到訊息(在需要時可以驗證correaltionId)後,處理訊息,並將訊息傳送到客戶端註冊的回撥佇列中。原理圖如下:  

  

二、程式碼實現

  下面我們將模擬實現一個rpc客戶端和rpc服務端。客戶端給服務端傳送message,服務端收到後處理message,再將處理後的訊息返給客戶端

  rpc客戶端

  

/**
 * rpc客戶端
 
*/ public class RpcClient { //傳送訊息的佇列名稱 private static final String RPC_QUEUE_NAME = "rpc_queue"; public static void main(String[] args) { ConnectionFactory connectionFactory = new ConnectionFactory(); Connection connection = null; Channel channel = null; try
{ connection = connectionFactory.newConnection(); channel = connection.createChannel(); //建立回撥佇列 String callbackQueue = channel.queueDeclare().getQueue(); //建立回撥佇列,消費者從回撥佇列中接收服務端傳送的訊息 QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(callbackQueue,
true,consumer); //建立訊息帶有correlationId的訊息屬性 String correlationId = UUID.randomUUID().toString(); AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().correlationId(correlationId).replyTo(callbackQueue).build(); String message = "hello rabbitmq"; channel.basicPublish("",RPC_QUEUE_NAME,basicProperties,message.getBytes()); System.out.println("RpcClient send message " + message + ", correaltionId = " + correlationId); //接收回調訊息 while (true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String receivedCorrelationId = delivery.getProperties().getCorrelationId(); if(correlationId.equals(receivedCorrelationId)){ System.out.println("RpcClient receive format message " + new String(delivery.getBody(), "UTF-8") + ", correaltionId = " + correlationId); break; } } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } finally { try { channel.close(); connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }

 

   rpc服務端

  

/**
 * rpc伺服器
 */
public class RpcServer {
    private static final String RPC_QUEUE_NAME = "rpc_queue";

    private static String format(String message){
        return "......" + message + "......";
    }

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        Connection connection = null;
        try {
            connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(RPC_QUEUE_NAME,false,false,false,null);
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //宣告消費者預取的訊息數量
            channel.basicQos(1);
            channel.basicConsume(RPC_QUEUE_NAME,false,consumer);//採用手動回覆訊息
            System.out.println("RpcServer waitting for receive message");

            while (true){
                //接收並處理訊息
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("RpcServer receive message " + message);
                String response = format(message);
                //確認收到訊息
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

                //取出訊息的correlationId
                AMQP.BasicProperties properties = delivery.getProperties();
                String correlationId = properties.getCorrelationId();

                //建立具有與接收訊息相同的correlationId的訊息屬性
                AMQP.BasicProperties replyProperties = new AMQP.BasicProperties().builder().correlationId(correlationId).build();
                channel.basicPublish("",properties.getReplyTo(),replyProperties,response.getBytes());
            }

        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

   先執行服務端,再執行客戶端,結果如下:

  RpcClient

  

  RpcServer