1. 程式人生 > >Rust入坑指南:齊頭並進(下)

Rust入坑指南:齊頭並進(下)

前文中我們聊了Rust如何管理執行緒以及如何利用Rust中的鎖進行程式設計。今天我們繼續學習併發程式設計,

原子型別

許多程式語言都會提供原子型別,Rust也不例外,在前文中我們聊了Rust中鎖的使用,有了鎖,就要小心死鎖的問題,Rust雖然聲稱是安全併發,但是仍然無法幫助我們解決死鎖的問題。原子型別就是程式語言為我們提供的無鎖併發程式設計的最佳手段。熟悉Java的同學應該知道,Java的編譯器並不能保證程式碼的執行順序,編譯器會對我們的程式碼的執行順序進行優化,這一操作成為指令重排。而Rust的多執行緒記憶體模型不會進行指令重排,它可以保證指令的執行順序。

通常來講原子型別會提供以下操作:

  • Load:從原子型別讀取值
  • Store:為一個原子型別寫入值
  • CAS(Compare-And-Swap):比較並交換
  • Swap:交換
  • Fetch-add(sub/and/or):表示一系列的原子的加減或邏輯運算

Ok,這些基礎的概念聊完以後,我們就來看看Rust為我們提供了哪些原子型別。Rust的原子型別定義在標準庫std::sync::atomic中,目前它提供了12種原子型別。

下面這段程式碼是Rust演示瞭如何用原子型別實現一個自旋鎖。

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

fn main() {
    let spinlock = Arc::new(AtomicUsize::new(1));
    let spinlock_clone = spinlock.clone();
    let thread = thread::spawn(move|| {
        spinlock_clone.store(0, Ordering::SeqCst);
    });
    while spinlock.load(Ordering::SeqCst) != 0 {}
    if let Err(panic) = thread.join() {
        println!("Thread had an error: {:?}", panic);
    }
}

我們利用AtomicUsize的store方法將它的值設定為0,然後用load方法獲取到它的值,如果不是0,則程式一直空轉。在store和load方法中,我們都用到了一個引數:Ordering::SeqCst,在宣告中能看出來它也是屬於atomic包。

我們在文件中發現它是一個列舉。其定義為

pub enum Ordering {
    Relaxed,
    Release,
    Acquire,
    AcqRel,
    SeqCst,
}

它的作用是將記憶體順序的控制權交給開發者,我們可以自己定義底層的記憶體排序。下面我們一起來看一下這5種排序分別代表什麼意思

  • Relaxed:表示「沒有順序」,也就是開發者不會干預執行緒順序,執行緒只進行原子操作
  • Release:對於使用Release的store操作,在它之前所有使用Acquire的load操作都是可見的
  • Acquire:對於使用Acquire的load操作,在它之前的所有使用Release的store操作也都是可見的
  • AcqRel:它代表讀時使用Acquire順序的load操作,寫時使用Release順序的store操作
  • SeqCst:使用了SeqCst的原子操作都必須先儲存,再載入。

一般情況下建議使用SeqCst,而不推薦使用Relaxed。

執行緒間通訊

Go語言文件中有這樣一句話:不要使用共享記憶體來通訊,應該使用通訊實現共享記憶體。

Rust標準庫選擇了CSP併發模型,也就是依賴channel來進行執行緒間的通訊。它的定義是在標準庫std::sync::mpsc中,裡面定義了三種類型的CSP程序:

  • Sender:傳送非同步訊息
  • SyncSender:傳送同步訊息
  • Receiver:用於接收訊息

我們通過一個栗子來看一下channel是如何建立並收發訊息的。

use std::thread;
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

首先,我們先是使用了channel()函式來建立一個channel,它會返回一個(Sender, Receiver)元組。它的緩衝區是無界的。此外,我們還可以使用sync_channel()來建立channel,它返回的則是(SyncSender, Receiver)元組,這樣的channel傳送訊息是同步的,並且可以設定緩衝區大小。

接著,在子執行緒中,我們定義了一個字串變數,並使用send()函式向channel中傳送訊息。這裡send返回的是一個Result型別,所以使用unwrap來傳播錯誤。

在main函式最後,我們又用recv()函式來接收訊息。

這裡需要注意的是,send()函式會轉移所有權,所以,如果你在傳送訊息之後再使用val變數時,程式就會報錯。

現在我們已經掌握了使用Channel進行執行緒間通訊的方法了,這裡還有一段程式碼,感興趣的同學可以自己執行一下這段程式碼看是否能夠順利執行。如果不能,應該怎麼修改這段程式碼呢?

use std::thread;
use std::sync::mpsc;
fn main() {
    let (tx, rx) = mpsc::channel();
    for i in 0..5 {
        let tx = tx.clone();
        thread::spawn(move || {
            tx.send(i).unwrap();
        });
    }

    for rx in rx.iter() {
        println!("{:?}", j);
    }
}

執行緒池

在實際工作中,如果每次都要建立新的執行緒,每次建立、銷燬執行緒的開銷就會變得非常可觀,甚至會成為系統性能的瓶頸。對於這種問題,我們通常使用執行緒池來解決。

Rust的標準庫中沒有現成的執行緒池給我們使用,不過還是有一些第三方庫來支援的。這裡我使用的是threadpool。

首先需要在Cargo.toml中增加依賴threadpool = "1.7.1"。然後就可以使用use threadpool::ThreadPool;將ThreadPool引入我們的程式中了。

use threadpool::ThreadPool;
use std::sync::mpsc::channel;

fn main() {
    let n_workers = 4;
    let n_jobs = 8;
    let pool = ThreadPool::new(n_workers);

    let (tx, rx) = channel();
    for _ in 0..n_jobs {
        let tx = tx.clone();
        pool.execute(move|| {
            tx.send(1).expect("channel will be there waiting for the pool");
        });
    }

    assert_eq!(rx.iter().take(n_jobs).fold(0, |a, b| a + b), 8);
}

這裡我們使用ThreadPool::new()來建立一個執行緒池,初始化4個工作執行緒。使用時用execute()方法就可以拿出一個執行緒來進行具體的工作。

總結

今天我們介紹了Rust併發程式設計的三種特性:原子型別、執行緒間通訊和執行緒池的使用。

原子型別是我們進行無鎖併發的重要手段,執行緒間通訊和執行緒池也都是工作中所必須使用的。當然併發程式設計的知識遠不止於此,大家有興趣的可以自行學習也可以與我交流討論