什麼是Actor模式
Actors 為最低級別的“計算單元”
以上解釋來自官方文件,看起來“晦澀難懂”。大白話就是說Actors模式是一段需要單執行緒執行的程式碼塊。
實際開發中我們經常會有一些邏輯不能併發執行,我們常用的做法就是加鎖,例如:
lock(obj)
{
//dosomething...
}
或者用Redis等中介軟體,為分散式應用加一些分散式鎖。遺憾的是,使用顯式鎖定機制容易出錯。 它們很容易導致死鎖,並可能對效能產生嚴重影響。Actors模式為單執行緒邏輯提供了一種更好的選擇。
什麼時候用Actors
- 需要單執行緒執行,比如需要加lock
- 邏輯可以被劃分為小的執行單元
工作原理
Dapr啟動app時,Sidecar呼叫Actors獲取配置資訊,之後Sidecar將Actors的資訊傳送到安置服務(Placement Service),安置服務會將不同的Actor型別根據其Id和Actor型別分割槽,並將Actor資訊廣播到所有dapr例項。
在客戶端呼叫某個Actor時,安置服務會根據其Id和Actor型別,找到其所在的dapr例項,並執行其方法。
呼叫Actor方法
POST/GET/PUT/DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/method/<method>
<actorType>
:執行元件型別。<actorId>
:要呼叫的特定參與者的 ID。<method>
:要呼叫的方法
計時器Timers和提醒器Reminders
Actor可以設定timer和reminder設定執行Actor的時間,有點像我們常用的定時任務。但是timer和reminder也存在不同。
- timer只作用於啟用狀態的Actor。一個Actor長期不被呼叫,其自己的空閒計時器會逐漸累積,到一定時間後會被Dapr銷燬,timer沒法作用於已銷燬的Actor。
- reminder則可以作用於所有狀態的Actor。主要方式是重置空閒計時器,使其處於活躍狀態
操作timer
POST/PUT http://localhost:3500/v1.0/actors/<actorType>/<actorId>/timers/<name>
到期時間(due time)表示註冊後 timer 將首次觸發的時間。 period
表示timer在此之後觸發的頻率。 到期時間為0表示立即執行。 負 due times 和負 periods 都是無效。
下面的請求體配置了一個 timer, dueTime
9秒, period
3秒。 這意味著它將在9秒後首次觸發,然後每3秒觸發一次。
{
"dueTime":"0h0m9s0ms",
"period":"0h0m3s0ms"
}
下面的請求體配置了一個 timer, dueTime
0秒, period
3秒。 這意味著它將在註冊之後立即觸發,然後每3秒觸發一次。
{
"dueTime":"0h0m0s0ms",
"period":"0h0m3s0ms"
}
操作reminder
POST/PUT/GET/DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/reminders/<name>
到期時間(due time)表示註冊後 reminders將首次觸發的時間。 period
表示在此之後 reminders 將觸發的頻率。 到期時間為0表示立即執行。 負 due times 和負 periods 都是無效。 若要註冊僅觸發一次的 reminders ,請將 period 設定為空字串。
下面的請求體配置了一個 reminders, dueTime
9秒, period
3秒。 這意味著它將在9秒後首次觸發,然後每3秒觸發一次。
{
"dueTime":"0h0m9s0ms",
"period":"0h0m3s0ms"
}
下面的請求體配置了一個 reminders, dueTime
0秒, period
3秒。 這意味著它將在註冊之後立即觸發,然後每3秒觸發一次。
{
"dueTime":"0h0m0s0ms",
"period":"0h0m3s0ms"
}
下面的請求體配置了一個 reminders, dueTime
15秒, period
空字串。 這意味著它將在15秒後首次觸發,之後就不再被觸發。
{
"dueTime":"0h0m15s0ms",
"period":""
}
資料持久化
使用 Dapr 狀態管理構建塊儲存執行元件狀態。 由於執行元件可以一輪執行多個狀態操作,因此狀態儲存元件必須支援多項事務。 撰寫本文時,以下狀態儲存支援多項事務:
- Azure Cosmos DB
- MongoDB
- MySQL
- PostgreSQL
- Redis
- RethinkDB
- SQL Server
若要配置要與執行元件一起使用的狀態儲存元件,需要將以下元資料附加到狀態儲存配置:
- name: actorStateStore
value: "true"
win10自承載模式下已預設設定此項 C:\Users\<username>\.dapr\components\statestore.yaml
專案例項
Actor操作
下面將通過一個稽核流程的例子來演示。
還是用前面的FrontEnd專案,引入nuget包Dapr.Actors和Dapr.Actors.AspNetCore。
定義IOrderStatusActor介面,需要繼承自IActor
using Dapr.Actors; using System.Threading.Tasks; namespace FrontEnd.ActorDefine
{
public interface IOrderStatusActor : IActor
{
Task<string> Paid(string orderId);
Task<string> GetStatus(string orderId);
}
}
定義OrderStatusActor實現IOrderStatusActor,並繼承自Actor
using Dapr.Actors.Runtime; using System.Threading.Tasks; namespace FrontEnd.ActorDefine
{
public class OrderStatusActor : Actor, IOrderStatusActor
{
public OrderStatusActor(ActorHost host) : base(host)
{
} public async Task<string> Paid(string orderId)
{
// change order status to paid
await StateManager.AddOrUpdateStateAsync(orderId, "init", (key, currentStatus) => "paid");
return orderId;
} public async Task<string> GetStatus(string orderId)
{
return await StateManager.GetStateAsync<string>(orderId);
} }
}
需要注意的是,執行元件方法的返回型別必須為 Task 或 Task<T> 。 此外,執行元件方法最多隻能有一個引數。 返回型別和引數都必須可 System.Text.Json 序列化。
Actor的api是必需的,因為 Dapr 挎鬥呼叫應用程式來承載和與執行元件例項進行互動,所以在Startup的Configure中配置
app.UseEndpoints(endpoints =>
{
endpoints.MapActorsHandlers();
// .......
});
Startup
類也是用於註冊特定執行元件型別的位置。 在ConfigureServices
ScoreActor
使用註冊 services.AddActors
:
services.AddActors(options =>
{
options.Actors.RegisterActor<OrderStatusActor>();
});
為測試這個Actor,需要定義一個介面呼叫,新增ActorController
using Dapr.Actors;
using Dapr.Actors.Client; using FrontEnd.ActorDefine; using Microsoft.AspNetCore.Mvc; using System.Threading.Tasks; namespace FrontEnd.Controllers
{
[Route("[controller]")]
[ApiController]
public class ActorController : ControllerBase
{
[HttpGet("paid/{orderId}")]
public async Task<ActionResult> PaidAsync(string orderId)
{
var actorId = new ActorId(orderId);
var proxy = ActorProxy.Create<IOrderStatusActor>(actorId, "OrderStatusActor");
var result = await proxy.Paid(orderId);
return Ok(result);
}
}
}
ActorProxy.Create
為建立代理例項。 Create
方法採用兩個引數:標識特定執行元件和執行元件 ActorId
型別。 它還具有一個泛型型別引數,用於指定執行元件型別所實現的執行元件介面。 由於伺服器和客戶端應用程式都需要使用執行元件介面,它們通常儲存在單獨的共享專案中。
下面通過postman測試下,呼叫成功
檢視redis中的資料
127.0.0.1:6379> keys *
1) "test_topic"
2) "frontend||guid"
3) "frontend||name"
5) "newOrder"
6) "frontend||OrderStatusActor||myid-123||123"
7) "myapp2||key2"
8) "myapp2||key1"
9) "deathStarStatus"
10) "myapp||name"
127.0.0.1:6379> hgetall frontend||OrderStatusActor||myid-123||123
1) "data"
2) "\"init\""
3) "version"
4) "1"
可以發現actor資料的命名規則是appName||ActorName||ActorId||key
同樣可以使用注入的方式建立proxy,ActorController中注入IActorProxyFactory
private readonly IActorProxyFactory _actorProxyFactory; public ActorController(IActorProxyFactory actorProxyFactory)
{
_actorProxyFactory = actorProxyFactory;
}
新增獲取資料介面
[HttpGet("get/{orderId}")]
public async Task<ActionResult> GetAsync(string orderId)
{
var proxy = _actorProxyFactory.CreateActorProxy<IOrderStatusActor>(
new ActorId("myid-" + orderId),
"OrderStatusActor"); return Ok(await proxy.GetStatus(orderId));
}
postman測試
Timer操作
使用Actor基類的 RegisterTimerAsync
方法計劃計時器。在OrderStatusActor類中新增方法
public Task StartTimerAsync(string name, string text)
{
return RegisterTimerAsync(
name,
nameof(TimerCallbackAsync),
Encoding.UTF8.GetBytes(text),
TimeSpan.Zero,
TimeSpan.FromSeconds(3));
} public Task TimerCallbackAsync(byte[] state)
{
var text = Encoding.UTF8.GetString(state); _logger.LogInformation($"Timer fired: {text}"); return Task.CompletedTask;
}
StartTimerAsync
方法呼叫 RegisterTimerAsync
來計劃計時器。 RegisterTimerAsync
採用五個引數:
- 計時器的名稱。
- 觸發計時器時要呼叫的方法的名稱。
- 要傳遞給回撥方法的狀態。
- 首次呼叫回撥方法之前要等待的時間。
- 回撥方法呼叫之間的時間間隔。 可以指定 以
TimeSpan.FromMilliseconds(-1)
禁用定期訊號。
在OrderStatusActor構造方法中呼叫StartTimerAsync
StartTimerAsync("test-timer", "this is a test timer").ConfigureAwait(false).GetAwaiter().GetResult();
通過呼叫paid介面例項化一個Actor,即可開啟timer
檢視控制檯,timer觸發成功
== APP == info: FrontEnd.ActorDefine.OrderStatusActor[0]
== APP == Timer fired: this is a test timer
TimerCallbackAsync
方法以二進位制形式接收使用者狀態。 在示例中,回撥在將狀態寫入日誌之前將狀態 string
解碼回 。
可以通過呼叫 來停止計時器 UnregisterTimerAsync
:
public Task StopTimerAsync(string name)
{
return UnregisterTimerAsync(name);
}
Reminder操作
使用Actor基類的 RegisterReminderAsync 方法計劃計時器。在OrderStatusActor類中新增方法
public Task SetReminderAsync(string text)
{
return RegisterReminderAsync(
"test-reminder",
Encoding.UTF8.GetBytes(text),
TimeSpan.Zero,
TimeSpan.FromSeconds(1));
} public Task ReceiveReminderAsync(
string reminderName, byte[] state,
TimeSpan dueTime, TimeSpan period)
{
if (reminderName == "test-reminder")
{
var text = Encoding.UTF8.GetString(state); Logger.LogWarning($"reminder fired: {text}");
} return Task.CompletedTask;
}
RegisterReminderAsync
方法類似於 RegisterTimerAsync
,但不必顯式指定回撥方法。 如上面的示例所示,實現 IRemindable.ReceiveReminderAsync
以處理觸發的提醒。
public class OrderStatusActor : Actor, IOrderStatusActor, IRemindable
ReceiveReminderAsync
觸發提醒時呼叫 方法。 它採用 4 個引數:
- 提醒的名稱。
- 註冊期間提供的使用者狀態。
- 註冊期間提供的呼叫到期時間。
- 註冊期間提供的呼叫週期。
在OrderStatusActor構造方法中呼叫SetReminderAsync
SetReminderAsync("this is a test reminder").ConfigureAwait(false).GetAwaiter().GetResult();
通過呼叫paid介面例項化一個Actor,即可開啟reminder
檢視控制檯,reminder觸發成功
== APP == warn: FrontEnd.ActorDefine.OrderStatusActor[0]
== APP == reminder fired: this is a test reminder