1. 程式人生 > >在.NET Core中使用Channel(二)

在.NET Core中使用Channel(二)

在我們之前的文章中,看了一些非常簡單的例子來說明Channel是如何工作的,我們看到了一些非常漂亮的特性,但大多數情況下它與其他某某Queue實現非常相似。讓我們進入一些更高階的話題。我說的是高階,但其中很多都非常簡單。

讀/寫分離

如果你曾經在兩個類之間共享佇列,你就會知道任何一個類都可以讀/寫,即使它們本不應該這樣做。例如:

class MyProducer
{
    private readonly Queue<int> _queue;

    public MyProducer(Queue<int> queue)
    {
        _queue = queue;
    }
}

class MyConsumer
{
    private readonly Queue<int> _queue;

    public MyConsumer(Queue<int> queue)
    {
        _queue = queue;
    }
}

因此,生產者應該只寫佇列,消費者應該只讀佇列,在這兩種情況下,它們可以對佇列執行所有操作。雖然你可能在自己的腦海中希望消費者只讀取,但另一個開發人員可能會出現呼叫Enqueue,除了程式碼審查之外,沒有什麼可以阻止他們犯這個錯誤。

但是有了Channel,我們可以做不同的事情。

class Program
{
    static async Task Main(string[] args)
    {
        var myChannel = Channel.CreateUnbounded<int>();
        var producer = new MyProducer(myChannel.Writer);
        var consumer = new MyConsumer(myChannel.Reader);
    }
}

class MyProducer
{
    private readonly ChannelWriter<int> _channelWriter;

    public MyProducer(ChannelWriter<int> channelWriter)
    {
        _channelWriter = channelWriter;
    }
}

class MyConsumer
{
    private readonly ChannelReader<int> _channelReader;

    public MyConsumer(ChannelReader<int> channelReader)
    {
        _channelReader = channelReader;
    }
}

在這個例子中,我添加了一個main方法來向你展示如何建立writer/reader,但它非常簡單。這裡我們可以看到,對於我們的生產者,我只傳遞給它一個ChannelWriter,所以它只能做寫操作。對於我們的消費者,我們傳遞給它一個ChannelReader,所以它只能讀取。

當然,這並不意味著其他開發人員不能修改程式碼並開始注入根Channel物件,或者同時傳入ChannelWriter/ChannelReader,但這至少比之前的情況要好得多。

完成一個Channel

我們在前面看到,當在通道上呼叫ReadAsync()時,它實際上會在那裡等待訊息,但是如果沒有更多的訊息到來呢?對於.net中的其他佇列,我們通常需要傳遞某種共享的布林值或一個CancellationToken。但有了Channel,就更容易了。

考慮以下幾點:

static async Task Main(string[] args)
{
    var myChannel = Channel.CreateUnbounded<int>();

    _ = Task.Factory.StartNew(async () =>
    {
        for (int i = 0; i < 10; i++)
        {
            await myChannel.Writer.WriteAsync(i);
        }

        myChannel.Writer.Complete();
    });

    try
    {
        while (true)
        {
            var item = await myChannel.Reader.ReadAsync();
            Console.WriteLine(item);
            await Task.Delay(1000);
        }
    }catch(ChannelClosedException e)
    {
        Console.WriteLine("Channel was closed!");
    }
}

我讓第二個執行緒儘可能快地寫入我們的Channel,然後完成它。然後我們的讀取器緩慢讀取,每次讀取之間有1秒的延遲。注意,我們捕獲了ChannelClosedExecption,當你嘗試從關閉通道讀取最後訊息之後時將呼叫它。

我只是想說清楚。在Channel上呼叫Complete()不會立即關閉通道並殺死讀取該通道的所有人。而是通知所有服務,一旦最後一條訊息被讀取,我們就完成了。這很重要,因為這意味著當我們等待新條目時,當佇列是空的時,當佇列是滿的時,是否呼叫Complete()都無關緊要。我們可以肯定,我們將完成所有可得到的工作。

在Channel中使用IAsyncEnumerable

以我們試圖關閉一個Channel為例,有兩件事引起了我的注意。

我們有一個while(true)迴圈。這並不是真的那麼糟糕,但它有點礙眼。

為了打破這個迴圈,並知道Channel已經完成,我們必須捕獲異常並將其吞下。

使用命令“ReadAllAsync()”來解決這些問題,它返回一個IAsyncEnumerable。程式碼看起來有點像這樣:

static async Task Main(string[] args)
{
    var myChannel = Channel.CreateUnbounded<int>();

    _ = Task.Factory.StartNew(async () =>
    {
        for (int i = 0; i < 10; i++)
        {
            await myChannel.Writer.WriteAsync(i);
        }

        myChannel.Writer.Complete();
    });

    await foreach(var item in myChannel.Reader.ReadAllAsync())
    {
        Console.WriteLine(item);
        await Task.Delay(1000);
    }
}

現在程式碼讀起來好多了,並且去掉了捕獲異常的一些多餘的東西。因為我們使用的是IAsyncEnumerable,所以我們仍然可以像以前那樣等待每一項,但是我們不再需要捕獲異常,因為當Channel完成時,它只是簡單地說沒有其他東西了,然後迴圈退出。

同樣,這消除了在處理佇列時必須編寫的一些混亂程式碼。以前你必須編寫某種無限迴圈,而現在它只是一個真正整潔的迴圈,可以處理底層的所有東西。

接下來是什麼

到目前為止,我們一直在使用“無限的”通道。你可能已經猜到了,當然也可以選擇使用BoundedChannel。檢視本系列的下一部分,更好地理解這些東西。

 歡迎關注我的公眾號,如果你有喜歡的外文技術文章,可以通過公眾號留言推薦給我。

 

原文連結:https://dotnetcoretutorials.com/2020/11/24/using-channels-in-net-core-part-2-advanced-channels/