RabbitMQ .NET訊息佇列使用入門(三)【MVC實現RPC例子】
每一個孤獨的靈魂都需要陪伴
RPC(Remote Procedure Call Protocol)——遠端過程呼叫協議,它是一種通過網路從遠端計算機程式上請求服務,而不需要了解底層網路技術的協議。RPC協議假定某些傳輸協議的存在,如TCP或UDP,為通訊程式之間攜帶資訊資料。在OSI網路通訊模型中,RPC跨越了傳輸層和應用層。RPC使得開發包括網路分散式多程式在內的應用程式更加容易。
RPC採用客戶機/伺服器模式。請求程式就是一個客戶機,而服務提供程式就是一個伺服器。首先,客戶機呼叫程序傳送一個有程序引數的呼叫資訊到服務程序,然後等待應答資訊。在伺服器端,程序保持睡眠狀態直到呼叫資訊到達為止。當一個呼叫資訊到達,伺服器獲得程序引數,計算結果,傳送答覆資訊,然後等待下一個呼叫資訊,最後,客戶端呼叫程序接收答覆資訊,獲得程序結果,然後呼叫執行繼續進行。
有多種 RPC模式和執行。最初由 Sun 公司提出。IETF ONC 憲章重新修訂了 Sun 版本,使得 ONC RPC 協議成為 IETF 標準協議。現在使用最普遍的模式和執行是開放式軟體基礎的分散式計算環境(DCE)。
工作原理(以Windows下為例)
執行時,一次客戶機對伺服器的RPC呼叫,其內部操作大致有如下十步:
1.呼叫客戶端控制代碼;執行傳送引數
2.呼叫本地系統核心傳送網路訊息
3.訊息傳送到遠端主機
4.伺服器控制代碼得到訊息並取得引數
5.執行遠端過程
6.執行的過程將結果返回伺服器控制代碼
7.伺服器控制代碼返回結果,呼叫遠端系統核心
8.訊息傳回本地主機
9.客戶控制代碼由核心接收訊息
10.客戶接收控制代碼返回的資料
RPC OVER HTTP
Microsoft RPC-over-HTTP 部署(RPC over HTTP)允許RPC客戶端安全和有效地通過Internet 連線到RPC 伺服器程式並執行遠端過程呼叫。這是在一個名稱為RPC-over-HTTP 代理,或簡稱為RPC 代理的中介軟體的幫助下完成的。
RPC 代理執行在IIS計算機上。它接受來自Internet 的RPC 請求,在這些請求上執行認證,檢驗和訪問檢查,如果請求通過所有的測試,RPC 代理將請求轉發給執行真正處理的RPC 伺服器。通過RPC over HTTP,RPC客戶端不和伺服器直接通訊,它們使用RPC 代理作為中介軟體。
新建一個控制檯程式,Program.cs
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Common;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
namespace RPCServer
{
class Program
{
private static IConnection _recvConn; //接收訊息的連線
private static IConnection _senderConn; //返回結果的連線
private static IModel _recvChannel; //接收訊息的通道
private static IModel _sendChannel; //返回結果的通道
private static bool isExit;
private static void Main(string[] args)
{
Setup();
Console.WriteLine("開始使用RPC訊息:");
WaitCommand();
}
private static void Setup()
{
var factory = new ConnectionFactory
{
HostName = "localhost",
UserName = "guest",
Password = "guest",
//VirtualHost = "test",
AutomaticRecoveryEnabled = true
};
try
{
_recvConn = factory.CreateConnection();
_recvChannel = _recvConn.CreateModel();
_recvChannel.QueueDeclare("rpcQueue", false, false, false, null);
_recvChannel.BasicQos(0, 10, false);
var consumer = new EventingBasicConsumer(_recvChannel);
consumer.Received += consumer_Received;
_recvChannel.BasicConsume("rpcQueue", false, consumer);
_senderConn = factory.CreateConnection();
//_sendChannel = _senderConn.CreateModel();
}
catch (BrokerUnreachableException ex)
{
Console.WriteLine("RabbitMQ伺服器尚未啟動!");
Thread.Sleep(2000);
isExit = true;
}
}
/// <summary>
/// 等待接收指令
/// </summary>
private static void WaitCommand()
{
while (!isExit)
{
string line = Console.ReadLine().ToLower().Trim();
switch (line)
{
case "exit":
Close();
isExit = true;
break;
case "clear":
Console.Clear();
break;
default:
break;
}
}
Console.WriteLine("Goodbye!");
}
private static void Close()
{
if (_recvChannel != null && _recvChannel.IsOpen)
{
_recvChannel.Close();
}
if (_recvConn != null && _recvConn.IsOpen)
{
_recvConn.Close();
}
if (_senderConn != null && _senderConn.IsOpen)
{
_senderConn.Close();
}
}
private static void consumer_Received(object sender, BasicDeliverEventArgs e)
{
byte[] body = e.Body;
Task.Run(() => HandlingMessage(body, e));
}
/// <summary>
/// 訊息處理
/// </summary>
/// <param name="msgModel"></param>
/// <param name="e"></param>
private static async void HandlingMessage(byte[] body, BasicDeliverEventArgs e)
{
bool isSuccess = false;
bool hasRejected = false;
string message = Encoding.UTF8.GetString(body);
string replyMsg = "";
IModel _senderChannel = null;
try
{
_senderChannel = _senderConn.CreateModel(); //多執行緒中每個執行緒使用獨立的通道
replyMsg = message + " 處理成功";
var random = new Random();
int num = random.Next(0, 4);
//模擬處理失敗
/*if (random.Next(0, 11) == 4)
{
throw new Exception("處理失敗", null);
}*/
//模擬解析失敗
if (random.Next(0, 11) == 8)
{
throw new MessageException("訊息解析失敗");
}
//await Task.Delay(num * 1000); //模擬訊息處理
//這裡簡單處理,僅格式化輸出訊息內容
Console.WriteLine("Time:" + DateTime.Now + " ThreadID:" + Thread.CurrentThread.ManagedThreadId +
" Used: " + num + "s MSG:" + message);
isSuccess = true;
}
catch (MessageException msgEx)
{
Console.WriteLine("Time:" + DateTime.Now + " ThreadID:" + Thread.CurrentThread.ManagedThreadId +
" ERROR:" + msgEx.Message + " MSG:" + message);
_recvChannel.BasicReject(e.DeliveryTag, false); //不再重新分發
hasRejected = true;
replyMsg = message + "解析失敗";
isSuccess = true;
}
catch (Exception ex)
{
Console.WriteLine("Time:" + DateTime.Now + " ThreadID:" + Thread.CurrentThread.ManagedThreadId +
" ERROR:" + ex.Message + " MSG:" + message);
replyMsg = "處理失敗";
}
if (isSuccess)
{
try
{
IBasicProperties props = e.BasicProperties;
IBasicProperties replyProps = _senderChannel.CreateBasicProperties();
replyProps.CorrelationId = props.CorrelationId;
_senderChannel.BasicPublish("", e.BasicProperties.ReplyTo, replyProps, Encoding.UTF8.GetBytes(replyMsg)); //傳送訊息到內容檢查佇列
if (!hasRejected)
{
_recvChannel.BasicAck(e.DeliveryTag, false); //確認處理成功 此處與不再重新分發,只能出現一次
}
}
catch (AlreadyClosedException acEx)
{
Console.WriteLine("ERROR:連線已關閉");
}
}
else
{
_recvChannel.BasicReject(e.DeliveryTag, true); //處理失敗,重新分發
}
_senderChannel.Close();
}
}
}
MVC站點
TestViewModel.cs程式碼:
public class TestViewModel
{
public string ReplyMessage { get; set; }
}
TestController.cs程式碼:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Web;
using System.Web.Mvc;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;
using Common;
using WebApp.Models;
using System.Threading.Tasks;
namespace WebApp.Controllers
{
public class TestController : AsyncController
{
// GET: /Test/
public async Task<ActionResult> Index()
{
TestViewModel viewModel = new TestViewModel() { ReplyMessage = "交換機關閉著." };
return View(viewModel);
}
[HttpPost]
public async Task<MvcHtmlString> DoWork()
{
string message = "訊息:" + new Random().Next(1, 10000).ToString();
string replyMsg = "";
RPCClient client = null;
try
{
client = RPCClient.GetInstance();
replyMsg = await client.CallAsync(message);
}
catch (AlreadyClosedException e)
{
replyMsg = "交換機關閉著.";
}
catch (NoRPCConsumeException ex)
{
replyMsg = ex.Message;
}
catch (Exception e)
{
replyMsg = e.Message;
}
return new MvcHtmlString("<p>" + replyMsg + "</p>");
}
public async Task<ActionResult> Serv()
{
string message = "訊息:" + new Random().Next(1, 10000).ToString();
var client = new ServiceReference1.WebService1SoapClient();
string reply = (await client.HandlerMessageAsync(message)).Body.HandlerMessageResult;
TestViewModel viewModel = new TestViewModel() { ReplyMessage = reply };
return View("Index", viewModel);
}
}
}
Index.cshtml
@model WebApp.Models.TestViewModel
@{
Layout = null;
}
<!DOCTYPE html>
<html>
<head>
<meta name="viewport" content="width=device-width" />
<title>Index</title>
</head>
<body>
<input type="button" name="sendBtn" id="sendBtn" value="發一條訊息到佇列" />
<div id="receiveArea">
<p>@Model.ReplyMessage</p>
</div>
<script src="~/Scripts/jquery-1.10.2.min.js"></script>
<script>
$("#sendBtn").on("click", function () {
$.ajax({
url: "/Test/DoWork",
dataType: "html",
type: "POST",
success: function (result) {
console.log(result);
$("#receiveArea").append($(result));
}
});
});
</script>
</body>
</html>