1. 程式人生 > >RabbitMq初探——用隊列實現RPC

RabbitMq初探——用隊列實現RPC

await 生產 通過 empty 分享 qos load lose ima

rabbitmq構造rpc

前言


RPC(Remote Procedure Call)即遠程過程調用。我接觸過的實現方式有 HTTP 協議和 Thrift 框架;其實消息隊列 RabbitMQ 也可以用來實現遠程調用。

原理


我們稱調用遠程服務者為Client,遠程服務提供者為Server。

Client 充當生產者,將請求發送到 RabbitMQ 隊列中;Server 作為消費者,處理 Client 的請求並產生結果數據 result。此刻 Server 轉而作為生產者,將 result 通過 RabbitMQ 隊列傳遞回 Client;Client 作為結果數據的消費者,得到 result。

技術分享

代碼


rpc_client.php

<?php
/**
 * Created by PhpStorm.
 * User: 王大西
 * Date: 2017/10/23
 * Time: 16:36
 */
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Blocking RPC client built on RabbitMQ queues.
 *
 * Requests are published to the 'rpc_queue1' queue. Replies arrive on a
 * server-named exclusive queue declared in the constructor and are matched
 * to the pending request by correlation id.
 */
class RpcClient
{
    private $connection = null;
    private $channel = null;
    private $callbackQueue = null;
    private $response = null;
    private $corrId = null;

    /**
     * Connects to the broker on 127.0.0.1:5672 as guest/guest, declares the
     * exclusive reply queue and starts consuming from it.
     */
    public function __construct()
    {
        $this->connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
        $this->channel = $this->connection->channel();

        // An empty queue name asks the broker to generate one; it is the
        // first element of the returned tuple.
        list($this->callbackQueue, ,) = $this->channel->queue_declare("", false, false, true, false);

        // Fourth argument is no_ack. Replies are never acknowledged by this
        // client, so consume in auto-ack mode instead of leaving every
        // reply unacknowledged on the broker.
        $this->channel->basic_consume(
            $this->callbackQueue,
            '',
            false,
            true,
            false,
            false,
            array($this, 'onResponse')
        );
    }

    /**
     * Consumer callback for the reply queue.
     *
     * Stores the message body as the response only when its correlation id
     * matches the request currently in flight; other messages are ignored.
     *
     * @param AMQPMessage $rep Message delivered on the reply queue.
     */
    public function onResponse($rep)
    {
        if ($rep->get('correlation_id') === $this->corrId) {
            $this->response = $rep->body;
        }
    }

    /**
     * Sends $n to the RPC server and blocks until the matching reply arrives.
     *
     * @param mixed $n Value to send; it is cast to string for the message body.
     * @return int The reply body converted with intval().
     */
    public function call($n)
    {
        $this->response = null;
        $this->corrId = uniqid();

        $msg = new AMQPMessage((string) $n, array(
            'correlation_id' => $this->corrId,
            'reply_to' => $this->callbackQueue
        ));
        $this->channel->basic_publish($msg, '', 'rpc_queue1');

        // Compare against null explicitly: a legitimate reply of "0"
        // (e.g. fib(0)) is falsy in PHP and would otherwise keep this loop
        // waiting forever.
        while ($this->response === null) {
            $this->channel->wait();
        }

        return intval($this->response);
    }
}

// Command-line driver: the first argument is the number to send (default 30).
$number = isset($argv[1]) ? $argv[1] : 30;
$objRpcClient = new RpcClient();
$response = $objRpcClient->call($number);
echo " RPC result $response\n";

rpc_server.php

<?php
/**
 * rpc server
 * Created by PhpStorm.
 * User: 王大西
 * Date: 2017/10/23
 * Time: 16:36
 */
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// Open a connection to the broker on 127.0.0.1:5672 as guest/guest and
// obtain a channel for all subsequent operations.
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();

// Declare the request queue that clients publish to. The four flags after
// the name are all false (passive, durable, exclusive, auto_delete in
// php-amqplib's signature).
$channel->queue_declare('rpc_queue1', false, false, false, false);

/**
 * Returns the n-th Fibonacci number (fib(0) = 0, fib(1) = 1).
 *
 * Computed with a linear loop rather than naive double recursion, so the
 * running time grows with $n instead of exponentially.
 *
 * @param int $n Zero-based index into the Fibonacci sequence.
 * @return int|float The Fibonacci number; PHP promotes the sum to float once
 *                   it exceeds PHP_INT_MAX.
 * @throws InvalidArgumentException When $n is negative (the recursive
 *                                  definition never terminates for it).
 */
function fib($n)
{
    if ($n < 0) {
        throw new InvalidArgumentException('fib() expects a non-negative integer, got ' . $n);
    }

    $prev = 0;
    $curr = 1;
    for ($i = 0; $i < $n; $i++) {
        $next = $prev + $curr;
        $prev = $curr;
        $curr = $next;
    }

    return $prev;
}

echo " [x] Awaiting RPC requests\n";

/**
 * Handles one RPC request: parses the body as an integer, computes fib() of
 * it, publishes the result to the queue named in the request's reply_to
 * property (carrying the same correlation id), then acknowledges the request.
 *
 * @param AMQPMessage $req Request delivered from 'rpc_queue1'.
 */
$callback = function ($req) {
    $replyChannel = $req->delivery_info['channel'];
    $deliveryTag = $req->delivery_info['delivery_tag'];

    $n = intval($req->body);

    // fib() is undefined for negative input. Reject the request without
    // requeueing so that a bad message neither brings the worker down nor
    // gets redelivered indefinitely. No reply is sent in this case.
    if ($n < 0) {
        echo " [!] rejected negative argument ", $n, "\n";
        $replyChannel->basic_reject($deliveryTag, false);
        return;
    }

    echo " [.] fib(", $n, ")\n";

    $msg = new AMQPMessage(
        (string) fib($n),
        array('correlation_id' => $req->get('correlation_id'))
    );
    // Default exchange ('') routes directly to the queue named by reply_to.
    $replyChannel->basic_publish($msg, '', $req->get('reply_to'));

    // Acknowledge only after the reply has been published.
    $replyChannel->basic_ack($deliveryTag);
};

// Prefetch count of 1: the broker hands this worker one unacknowledged
// message at a time.
$channel->basic_qos(null, 1, null);
// Fourth argument (no_ack) is false, so the callback must ack explicitly.
$channel->basic_consume('rpc_queue1', '', false, false, false, false, $callback);

// Keep processing deliveries for as long as a consumer is registered.
while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

測試


server

技術分享

client

技術分享

RabbitMq初探——用隊列實現RPC