1. 程式人生 > >RabbitMQ .NET訊息佇列使用入門(三)【MVC實現RPC例子】

RabbitMQ .NET訊息佇列使用入門(三)【MVC實現RPC例子】

每一個孤獨的靈魂都需要陪伴

RPC(Remote Procedure Call Protocol)——遠端過程呼叫協議,它是一種通過網路從遠端計算機程式上請求服務,而不需要了解底層網路技術的協議。RPC協議假定某些傳輸協議的存在,如TCP或UDP,為通訊程式之間攜帶資訊資料。在OSI網路通訊模型中,RPC跨越了傳輸層和應用層。RPC使得開發包括網路分散式多程式在內的應用程式更加容易。

RPC採用客戶機/伺服器模式。請求程式就是一個客戶機,而服務提供程式就是一個伺服器。首先,客戶機呼叫程序傳送一個有程序引數的呼叫資訊到服務程序,然後等待應答資訊。在伺服器端,程序保持睡眠狀態直到呼叫資訊到達為止。當一個呼叫資訊到達,伺服器獲得程序引數,計算結果,傳送答覆資訊,然後等待下一個呼叫資訊,最後,客戶端呼叫程序接收答覆資訊,獲得程序結果,然後呼叫執行繼續進行。

有多種 RPC模式和執行。最初由 Sun 公司提出。IETF ONC 憲章重新修訂了 Sun 版本,使得 ONC RPC 協議成為 IETF 標準協議。現在使用最普遍的模式和執行是開放式軟體基礎的分散式計算環境(DCE)。

工作原理(以Windows下為例)

執行時,一次客戶機對伺服器的RPC呼叫,其內部操作大致有如下十步:
1.呼叫客戶端控制代碼;執行傳送引數
2.呼叫本地系統核心傳送網路訊息
3.訊息傳送到遠端主機
4.伺服器控制代碼得到訊息並取得引數
5.執行遠端過程
6.執行的過程將結果返回伺服器控制代碼
7.伺服器控制代碼返回結果,呼叫遠端系統核心
8.訊息傳回本地主機
9.客戶控制代碼由核心接收訊息
10.客戶接收控制代碼返回的資料

RPC OVER HTTP

Microsoft RPC-over-HTTP 部署(RPC over HTTP)允許RPC客戶端安全和有效地通過Internet 連線到RPC 伺服器程式並執行遠端過程呼叫。這是在一個名稱為RPC-over-HTTP 代理,或簡稱為RPC 代理的中介軟體的幫助下完成的。
RPC 代理執行在IIS計算機上。它接受來自Internet 的RPC 請求,在這些請求上執行認證,檢驗和訪問檢查,如果請求通過所有的測試,RPC 代理將請求轉發給執行真正處理的RPC 伺服器。通過RPC over HTTP,RPC客戶端不和伺服器直接通訊,它們使用RPC 代理作為中介軟體。

新建一個控制檯程式,Program.cs

using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Common;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;

namespace RPCServer
{
     class Program
    {
        private static IConnection _recvConn; //接收訊息的連線
        private static IConnection _senderConn; //返回結果的連線
        private static IModel _recvChannel; //接收訊息的通道
        private static IModel _sendChannel; //返回結果的通道
        private static bool isExit;

        private static void Main(string[] args)
        {
            Setup();

            Console.WriteLine("開始使用RPC訊息:");

            WaitCommand();
        }

        private static void Setup()
        {
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "guest",
                Password = "guest",
                //VirtualHost = "test",
                AutomaticRecoveryEnabled = true
            };

            try
            {
                _recvConn = factory.CreateConnection();
                _recvChannel = _recvConn.CreateModel();
                _recvChannel.QueueDeclare("rpcQueue", false, false, false, null);
                _recvChannel.BasicQos(0, 10, false);
                var consumer = new EventingBasicConsumer(_recvChannel);
                consumer.Received += consumer_Received;
                _recvChannel.BasicConsume("rpcQueue", false, consumer);

                _senderConn = factory.CreateConnection();
                //_sendChannel = _senderConn.CreateModel();
            }
            catch (BrokerUnreachableException ex)
            {
                Console.WriteLine("RabbitMQ伺服器尚未啟動!");
                Thread.Sleep(2000);
                isExit = true;
            }
        }

        /// <summary>
        ///     等待接收指令
        /// </summary>
        private static void WaitCommand()
        {
            while (!isExit)
            {
                string line = Console.ReadLine().ToLower().Trim();
                switch (line)
                {
                    case "exit":
                        Close();
                        isExit = true;
                        break;
                    case "clear":
                        Console.Clear();
                        break;
                    default:
                        break;
                }
            }

            Console.WriteLine("Goodbye!");
        }

        private static void Close()
        {
            if (_recvChannel != null && _recvChannel.IsOpen)
            {
                _recvChannel.Close();
            }

            if (_recvConn != null && _recvConn.IsOpen)
            {
                _recvConn.Close();
            }

            if (_senderConn != null && _senderConn.IsOpen)
            {
                _senderConn.Close();
            }
        }

        private static void consumer_Received(object sender, BasicDeliverEventArgs e)
        {
            byte[] body = e.Body;

            Task.Run(() => HandlingMessage(body, e));
        }

        /// <summary>
        ///     訊息處理
        /// </summary>
        /// <param name="msgModel"></param>
        /// <param name="e"></param>
        private static async void HandlingMessage(byte[] body, BasicDeliverEventArgs e)
        {
            bool isSuccess = false;
            bool hasRejected = false;
            string message = Encoding.UTF8.GetString(body);
            string replyMsg = "";
            IModel _senderChannel = null;

            try
            {
                _senderChannel = _senderConn.CreateModel(); //多執行緒中每個執行緒使用獨立的通道
                replyMsg = message + "   處理成功";

                var random = new Random();
                int num = random.Next(0, 4);

                //模擬處理失敗
                /*if (random.Next(0, 11) == 4)
                {
                    throw new Exception("處理失敗", null);
                }*/

                //模擬解析失敗
                if (random.Next(0, 11) == 8)
                {
                    throw new MessageException("訊息解析失敗");
                }
                //await Task.Delay(num * 1000);   //模擬訊息處理

                //這裡簡單處理,僅格式化輸出訊息內容
                Console.WriteLine("Time:" + DateTime.Now + " ThreadID:" + Thread.CurrentThread.ManagedThreadId +
                                  " Used: " + num + "s MSG:" + message);

                isSuccess = true;
            }
            catch (MessageException msgEx)
            {
                Console.WriteLine("Time:" + DateTime.Now + " ThreadID:" + Thread.CurrentThread.ManagedThreadId +
                                  " ERROR:" + msgEx.Message + " MSG:" + message);
                _recvChannel.BasicReject(e.DeliveryTag, false); //不再重新分發
                hasRejected = true;
                replyMsg = message + "解析失敗";
                isSuccess = true;
            }
            catch (Exception ex)
            {
                Console.WriteLine("Time:" + DateTime.Now + " ThreadID:" + Thread.CurrentThread.ManagedThreadId +
                                  " ERROR:" + ex.Message + " MSG:" + message);
                replyMsg = "處理失敗";
            }

            if (isSuccess)
            {
                try
                {
                    IBasicProperties props = e.BasicProperties;
                    IBasicProperties replyProps = _senderChannel.CreateBasicProperties();
                    replyProps.CorrelationId = props.CorrelationId;
                    _senderChannel.BasicPublish("", e.BasicProperties.ReplyTo, replyProps, Encoding.UTF8.GetBytes(replyMsg)); //傳送訊息到內容檢查佇列
                    if (!hasRejected)
                    {
                        _recvChannel.BasicAck(e.DeliveryTag, false); //確認處理成功  此處與不再重新分發,只能出現一次
                    }
                }
                catch (AlreadyClosedException acEx)
                {
                    Console.WriteLine("ERROR:連線已關閉");
                }
            }
            else
            {
                _recvChannel.BasicReject(e.DeliveryTag, true); //處理失敗,重新分發
            }
            _senderChannel.Close();
        }
    }
}

MVC站點

TestViewModel.cs程式碼:

 public class TestViewModel
 {
        public string ReplyMessage { get; set; }
 }

TestController.cs程式碼:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Web;
using System.Web.Mvc;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;
using Common;
using WebApp.Models;
using System.Threading.Tasks;

namespace WebApp.Controllers
{
    public class TestController : AsyncController
    { 
        // GET: /Test/
        public async Task<ActionResult> Index()
        { 
                TestViewModel viewModel = new TestViewModel() { ReplyMessage = "交換機關閉著." }; 
                return View(viewModel); 
        }

        [HttpPost]
        public async Task<MvcHtmlString> DoWork()
        {
            string message = "訊息:" + new Random().Next(1, 10000).ToString();
            string replyMsg = "";
            RPCClient client = null;
            try
            { 
                client = RPCClient.GetInstance();
                replyMsg = await client.CallAsync(message);
            }
            catch (AlreadyClosedException e)
            {
                replyMsg = "交換機關閉著.";
            }
            catch (NoRPCConsumeException ex)
            {
                replyMsg = ex.Message;
            }
            catch (Exception e)
            {
                replyMsg = e.Message;
            } 

            return new MvcHtmlString("<p>" + replyMsg + "</p>");
        }

        public async Task<ActionResult> Serv()
        {
            string message = "訊息:" + new Random().Next(1, 10000).ToString();
            var client = new ServiceReference1.WebService1SoapClient();
            string reply = (await client.HandlerMessageAsync(message)).Body.HandlerMessageResult;
            TestViewModel viewModel = new TestViewModel() { ReplyMessage = reply };
            return View("Index", viewModel);
        }
     }
}

Index.cshtml

@model WebApp.Models.TestViewModel

@{
    Layout = null;
}

<!DOCTYPE html>

<html>
<head>
    <meta name="viewport" content="width=device-width" />
    <title>Index</title>
</head>
<body>
    <input type="button" name="sendBtn" id="sendBtn" value="發一條訊息到佇列" />
    <div id="receiveArea"> 
        <p>@Model.ReplyMessage</p>
    </div>
    <script src="~/Scripts/jquery-1.10.2.min.js"></script>
    <script>
        $("#sendBtn").on("click", function () {
            $.ajax({
                url: "/Test/DoWork",
                dataType: "html",
                type: "POST",
                success: function (result) {
                    console.log(result);
                    $("#receiveArea").append($(result));
                }
            });
        });
    </script>
</body>
</html>

執行結果如圖:

這裡寫圖片描述

這裡寫圖片描述