什麼是Actor模式

Actors 為最低級別的“計算單元”

以上解釋來自官方文件,看起來“晦澀難懂”。大白話就是說Actors模式是一段需要單執行緒執行的程式碼塊。

實際開發中我們經常會有一些邏輯不能併發執行,我們常用的做法就是加鎖,例如:

lock(obj)
{
//dosomething...
}

或者用Redis等中介軟體,為分散式應用加一些分散式鎖。遺憾的是,使用顯式鎖定機制容易出錯。 它們很容易導致死鎖,並可能對效能產生嚴重影響。Actors模式為單執行緒邏輯提供了一種更好的選擇。

什麼時候用Actors

  • 需要單執行緒執行,比如需要加lock
  • 邏輯可以被劃分為小的執行單元

工作原理

Dapr啟動app時,Sidecar呼叫Actors獲取配置資訊,之後Sidecar將Actors的資訊傳送到安置服務(Placement Service),安置服務會將不同的Actor型別根據其Id和Actor型別分割槽,並將Actor資訊廣播到所有dapr例項

在客戶端呼叫某個Actor時,安置服務會根據其Id和Actor型別,找到其所在的dapr例項,並執行其方法。

呼叫Actor方法

POST/GET/PUT/DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/method/<method>
  • <actorType>:執行元件型別。
  • <actorId>:要呼叫的特定參與者的 ID。
  • <method>:要呼叫的方法

計時器Timers和提醒器Reminders

Actor可以設定timer和reminder設定執行Actor的時間,有點像我們常用的定時任務。但是timer和reminder也存在不同。

  • timer只作用於啟用狀態的Actor。一個Actor長期不被呼叫,其自己的空閒計時器會逐漸累積,到一定時間後會被Dapr銷燬,timer沒法作用於已銷燬的Actor
  • reminder則可以作用於所有狀態的Actor。主要方式是重置空閒計時器,使其處於活躍狀態

操作timer

POST/PUT http://localhost:3500/v1.0/actors/<actorType>/<actorId>/timers/<name>

到期時間(due time)表示註冊後 timer 將首次觸發的時間。 period 表示timer在此之後觸發的頻率。 到期時間為0表示立即執行。 負 due times 和負 periods 都是無效。

下面的請求體配置了一個 timer, dueTime 9秒, period 3秒。 這意味著它將在9秒後首次觸發,然後每3秒觸發一次。

{
"dueTime":"0h0m9s0ms",
"period":"0h0m3s0ms"
}

下面的請求體配置了一個 timer, dueTime 0秒, period 3秒。 這意味著它將在註冊之後立即觸發,然後每3秒觸發一次。

{
"dueTime":"0h0m0s0ms",
"period":"0h0m3s0ms"
}

操作reminder

POST/PUT/GET/DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/reminders/<name>

到期時間(due time)表示註冊後 reminders將首次觸發的時間。 period 表示在此之後 reminders 將觸發的頻率。 到期時間為0表示立即執行。 負 due times 和負 periods 都是無效。 若要註冊僅觸發一次的 reminders ,請將 period 設定為空字串。

下面的請求體配置了一個 reminders, dueTime 9秒, period 3秒。 這意味著它將在9秒後首次觸發,然後每3秒觸發一次。

{
"dueTime":"0h0m9s0ms",
"period":"0h0m3s0ms"
}

下面的請求體配置了一個 reminders, dueTime 0秒, period 3秒。 這意味著它將在註冊之後立即觸發,然後每3秒觸發一次。

{
"dueTime":"0h0m0s0ms",
"period":"0h0m3s0ms"
}

下面的請求體配置了一個 reminders, dueTime 15秒, period 空字串。 這意味著它將在15秒後首次觸發,之後就不再被觸發。

{
"dueTime":"0h0m15s0ms",
"period":""
}

資料持久化

使用 Dapr 狀態管理構建塊儲存執行元件狀態。 由於執行元件可以一輪執行多個狀態操作,因此狀態儲存元件必須支援多項事務。 撰寫本文時,以下狀態儲存支援多項事務:

  • Azure Cosmos DB
  • MongoDB
  • MySQL
  • PostgreSQL
  • Redis
  • RethinkDB
  • SQL Server

若要配置要與執行元件一起使用的狀態儲存元件,需要將以下元資料附加到狀態儲存配置

- name: actorStateStore
value: "true"

win10自承載模式下已預設設定此項 C:\Users\<username>\.dapr\components\statestore.yaml

專案例項

Actor操作

下面將通過一個稽核流程的例子來演示。

還是用前面的FrontEnd專案,引入nuget包Dapr.Actors和Dapr.Actors.AspNetCore

定義IOrderStatusActor介面,需要繼承自IActor

using Dapr.Actors;

using System.Threading.Tasks;

namespace FrontEnd.ActorDefine
{
public interface IOrderStatusActor : IActor
{
Task<string> Paid(string orderId);
Task<string> GetStatus(string orderId);
}
}

定義OrderStatusActor實現IOrderStatusActor,並繼承自Actor

using Dapr.Actors.Runtime;

using System.Threading.Tasks;

namespace FrontEnd.ActorDefine
{
public class OrderStatusActor : Actor, IOrderStatusActor
{
public OrderStatusActor(ActorHost host) : base(host)
{
} public async Task<string> Paid(string orderId)
{
// change order status to paid
await StateManager.AddOrUpdateStateAsync(orderId, "init", (key, currentStatus) => "paid");
return orderId;
} public async Task<string> GetStatus(string orderId)
{
return await StateManager.GetStateAsync<string>(orderId);
} }
}

需要注意的是,執行元件方法的返回型別必須為 Task 或 Task<T> 。 此外,執行元件方法最多隻能有一個引數。 返回型別和引數都必須可 System.Text.Json 序列化

Actor的api是必需的,因為 Dapr 挎鬥呼叫應用程式來承載和與執行元件例項進行互動,所以在Startup的Configure中配置

            app.UseEndpoints(endpoints =>
{
endpoints.MapActorsHandlers();
// .......
});

Startup類也是用於註冊特定執行元件型別的位置。 在ConfigureServices ScoreActor 使用註冊 services.AddActors :

            services.AddActors(options =>
{
options.Actors.RegisterActor<OrderStatusActor>();
});

為測試這個Actor,需要定義一個介面呼叫,新增ActorController

using Dapr.Actors;
using Dapr.Actors.Client; using FrontEnd.ActorDefine; using Microsoft.AspNetCore.Mvc; using System.Threading.Tasks; namespace FrontEnd.Controllers
{
[Route("[controller]")]
[ApiController]
public class ActorController : ControllerBase
{
[HttpGet("paid/{orderId}")]
public async Task<ActionResult> PaidAsync(string orderId)
{
var actorId = new ActorId(orderId);
var proxy = ActorProxy.Create<IOrderStatusActor>(actorId, "OrderStatusActor");
var result = await proxy.Paid(orderId);
return Ok(result);
}
}
}

ActorProxy.Create 為建立代理例項。 Create方法採用兩個引數:標識特定執行元件和執行元件 ActorId 型別。 它還具有一個泛型型別引數,用於指定執行元件型別所實現的執行元件介面。 由於伺服器和客戶端應用程式都需要使用執行元件介面,它們通常儲存在單獨的共享專案中。

下面通過postman測試下,呼叫成功

檢視redis中的資料

127.0.0.1:6379> keys *
1) "test_topic"
2) "frontend||guid"
3) "frontend||name"
5) "newOrder"
6) "frontend||OrderStatusActor||myid-123||123"
7) "myapp2||key2"
8) "myapp2||key1"
9) "deathStarStatus"
10) "myapp||name"
127.0.0.1:6379> hgetall frontend||OrderStatusActor||myid-123||123
1) "data"
2) "\"init\""
3) "version"
4) "1"

可以發現actor資料的命名規則是appName||ActorName||ActorId||key

同樣可以使用注入的方式建立proxy,ActorController中注入IActorProxyFactory

        private readonly IActorProxyFactory _actorProxyFactory;

        public ActorController(IActorProxyFactory actorProxyFactory)
{
_actorProxyFactory = actorProxyFactory;
}

新增獲取資料介面

        [HttpGet("get/{orderId}")]
public async Task<ActionResult> GetAsync(string orderId)
{
var proxy = _actorProxyFactory.CreateActorProxy<IOrderStatusActor>(
new ActorId("myid-" + orderId),
"OrderStatusActor"); return Ok(await proxy.GetStatus(orderId));
}

postman測試

Timer操作

使用Actor基類的 RegisterTimerAsync 方法計劃計時器。在OrderStatusActor類中新增方法

        public Task StartTimerAsync(string name, string text)
{
return RegisterTimerAsync(
name,
nameof(TimerCallbackAsync),
Encoding.UTF8.GetBytes(text),
TimeSpan.Zero,
TimeSpan.FromSeconds(3));
} public Task TimerCallbackAsync(byte[] state)
{
var text = Encoding.UTF8.GetString(state); _logger.LogInformation($"Timer fired: {text}"); return Task.CompletedTask;
}

StartTimerAsync方法呼叫 RegisterTimerAsync 來計劃計時器。 RegisterTimerAsync 採用五個引數:

  1. 計時器的名稱。
  2. 觸發計時器時要呼叫的方法的名稱。
  3. 要傳遞給回撥方法的狀態。
  4. 首次呼叫回撥方法之前要等待的時間。
  5. 回撥方法呼叫之間的時間間隔。 可以指定 以 TimeSpan.FromMilliseconds(-1) 禁用定期訊號。

在OrderStatusActor構造方法中呼叫StartTimerAsync

StartTimerAsync("test-timer", "this is a test timer").ConfigureAwait(false).GetAwaiter().GetResult();

通過呼叫paid介面例項化一個Actor,即可開啟timer

檢視控制檯,timer觸發成功

== APP == info: FrontEnd.ActorDefine.OrderStatusActor[0]
== APP == Timer fired: this is a test timer

TimerCallbackAsync方法以二進位制形式接收使用者狀態。 在示例中,回撥在將狀態寫入日誌之前將狀態 string 解碼回 。

可以通過呼叫 來停止計時器 UnregisterTimerAsync 

    public Task StopTimerAsync(string name)
{
return UnregisterTimerAsync(name);
}

Reminder操作

使用Actor基類的 RegisterReminderAsync 方法計劃計時器。在OrderStatusActor類中新增方法

        public Task SetReminderAsync(string text)
{
return RegisterReminderAsync(
"test-reminder",
Encoding.UTF8.GetBytes(text),
TimeSpan.Zero,
TimeSpan.FromSeconds(1));
} public Task ReceiveReminderAsync(
string reminderName, byte[] state,
TimeSpan dueTime, TimeSpan period)
{
if (reminderName == "test-reminder")
{
var text = Encoding.UTF8.GetString(state); Logger.LogWarning($"reminder fired: {text}");
} return Task.CompletedTask;
}

RegisterReminderAsync方法類似於 RegisterTimerAsync ,但不必顯式指定回撥方法。 如上面的示例所示,實現 IRemindable.ReceiveReminderAsync 以處理觸發的提醒。

    public class OrderStatusActor : Actor, IOrderStatusActor, IRemindable

ReceiveReminderAsync觸發提醒時呼叫 方法。 它採用 4 個引數:

  1. 提醒的名稱。
  2. 註冊期間提供的使用者狀態。
  3. 註冊期間提供的呼叫到期時間。
  4. 註冊期間提供的呼叫週期。

在OrderStatusActor構造方法中呼叫SetReminderAsync

            SetReminderAsync("this is a test reminder").ConfigureAwait(false).GetAwaiter().GetResult();

通過呼叫paid介面例項化一個Actor,即可開啟reminder

檢視控制檯,reminder觸發成功

== APP == warn: FrontEnd.ActorDefine.OrderStatusActor[0]
== APP == reminder fired: this is a test reminder