xv6學習筆記(5) : 鎖與管道與多cpu
1. xv6鎖結構
1. xv6作業系統要求在核心臨界區操作時中斷必須關閉。
如果此時中斷開啟,那麼可能會出現以下死鎖情況:
- 程序A在核心態執行並拿下了p鎖時,觸發中斷進入中斷處理程式。
- 中斷處理程式也在核心態中請求p鎖,由於鎖在A程序手裡,且只有A程序執行時才能釋放p鎖,因此中斷處理程式必須返回,p鎖才能被釋放。
- 那麼此時中斷處理程式會永遠拿不到鎖,陷入無限迴圈,進入死鎖。
2. xv6中的自旋鎖
Xv6中實現了自旋鎖(Spinlock)用於核心臨界區訪問的同步和互斥。自旋鎖最大的特徵是當程序拿不到鎖時會進入無限迴圈,直到拿到鎖退出迴圈。Xv6使用100ms一次的時鐘中斷和Round-Robin排程演算法來避免陷入自旋鎖的程序一直無限迴圈下去。Xv6允許同時執行多個CPU核,多核CPU上的等待佇列實現相當複雜,因此使用自旋鎖是相對比較簡單且能正確執行的實現方案。
v6中鎖的定義如下
// Mutual exclusion lock.
struct spinlock {
uint locked; // Is the lock held?
// For debugging:
char *name; // Name of lock.
struct cpu *cpu; // The cpu holding the lock.
uint pcs[10]; // The call stack (an array of program counters)
// that locked the lock.
};
核心的變數只有一個locked
,當locked
為1時代表鎖已被佔用,反之未被佔用,初始值為0。
在呼叫鎖之前,必須對鎖進行初始化。
void initlock(struct spinlock *lk, char *name) {
lk->name = name;
lk->locked = 0;
lk->cpu = 0;
}
這裡的需要注意的就是如何對locked
變數進行原子操作佔用鎖和釋放鎖。
這兩步具體被實現為acquire()
和release()
函式。
1. acquire函式
- 首先禁止中斷防止死鎖(如果這裡不禁止中斷就可能會出現上述的死鎖問題
- 然後利用
xchg
來上鎖
void
acquire(struct spinlock *lk)
{
pushcli(); // disable interrupts to avoid deadlock.
if(holding(lk))
panic("acquire");
// The xchg is atomic.
while(xchg(&lk->locked, 1) != 0)
;
// Tell the C compiler and the processor to not move loads or stores
// past this point, to ensure that the critical section's memory
// references happen after the lock is acquired.
__sync_synchronize();
// Record info about lock acquisition for debugging.
lk->cpu = mycpu();
getcallerpcs(&lk, lk->pcs);
}
xchg
這裡採用內聯彙編實現
最前面的lock表示這是一條指令字首,它保證了這條指令對匯流排和快取的獨佔權,也就是這條指令的執行過程中不會有其他CPU或同CPU內的指令訪問快取和記憶體。
static inline uint xchg(volatile uint *addr, uint newval) {
uint result;
// The + in "+m" denotes a read-modify-write operand.
asm volatile("lock; xchgl %0, %1" :
"+m" (*addr), "=a" (result) :
"1" (newval) :
"cc");
return result;
}
最後,acquire()
函式使用__sync_synchronize
為了避免編譯器對這段程式碼進行指令順序調整的話和避免CPU在這塊程式碼採用亂序執行的優化。
這樣就保證了原子操作的加鎖。也就是把lk->locked
設定為1
2. release函式
// Release the lock.
void release(struct spinlock *lk) {
if(!holding(lk))
panic("release");
lk->pcs[0] = 0;
lk->cpu = 0;
// Tell the C compiler and the processor to not move loads or stores
// past this point, to ensure that all the stores in the critical
// section are visible to other cores before the lock is released.
// Both the C compiler and the hardware may re-order loads and
// stores; __sync_synchronize() tells them both not to.
__sync_synchronize();
// Release the lock, equivalent to lk->locked = 0.
// This code can't use a C assignment, since it might
// not be atomic. A real OS would use C atomics here.
asm volatile("movl $0, %0" : "+m" (lk->locked) : );
popcli();
}
release
函式為了保證設定locked為0的操作的原子性,同樣使用了內聯彙編。
2. 訊號量
在xv6中沒有訊號量,但是我看部落格有大佬利用xv6中提供的介面實現了訊號量
3. 喚醒與睡眠
1. 先看有問題的版本
struct q {
struct spinlock lock;
void *ptr;
};
void *
send(struct q *q, void *p)
{
acquire(&q->lock);
while(q->ptr != 0)
;
q->ptr = p;
wakeup(q);
release(&q->lock);
}
void*
recv(struct q *q)
{
void *p;
acquire(&q->lock);
while((p = q->ptr) == 0)
sleep(q);
q->ptr = 0;
release(&q->lock;
return p;
}
這個程式碼乍一看是沒什麼問題,發出訊息之後,呼叫wakeup喚醒程序來接受。而且在傳送的過程中保持鎖。反正出現問題。在傳送完成後釋放鎖。同時wakeup會讓程序從seleep狀態返回,這裡的返回就會到recv函式來進行接受。
但是這裡的問題在於噹噹 recv
帶著鎖 q->lock
進入睡眠後,傳送者就會在希望獲得鎖時一直阻塞。
所以想要解決問題,我們必須要改變 sleep
的介面。sleep
必須將鎖作為一個引數,然後在進入睡眠狀態後釋放之;這樣就能避免上面提到的“遺失的喚醒”問題。一旦程序被喚醒了,sleep
在返回之前還需要重新獲得鎖。於是我們應該使用下面的程式碼:
2. 上面版本的改進
struct q {
struct spinlock lock;
void *ptr;
};
void *
send(struct q *q, void *p)
{
acquire(&q->lock);
while(q->ptr != 0)
;
q->ptr = p;
wakeup(q);
release(&q->lock);
}
void*
recv(struct q *q)
{
void *p;
acquire(&q->lock);
while((p = q->ptr) == 0)
sleep(q, &q->lock);
q->ptr = 0;
release(&q->lock;
return p;
}
recv
持有 q->lock
就能防止 send
在 recv
檢查 q->ptr
與呼叫 sleep
之間呼叫 wakeup
了。當然,為了避免死鎖,接收程序最好別在睡眠時仍持有鎖。所以我們希望 sleep
能用原子操作釋放 q->lock
並讓接收程序進入休眠狀態。
完整的傳送者/接收者的實現還應該讓傳送者在等待接收者拿出前一個 send
放入的值時處於休眠狀態。
3. xv6的wakeup和sleep
sleep(chan)
讓程序在任意的 chan
上休眠,稱之為等待佇列(wait channel)。sleep
讓呼叫程序休眠,釋放所佔 CPU。wakeup(chan)
則喚醒在 chan
上休眠的所有程序,讓他們的 sleep
呼叫返回。如果沒有程序在 chan
上等待喚醒,wakeup
就什麼也不做。
總體思路是希望
sleep
將當前程序轉化為SLEEPING
狀態並呼叫sched
以釋放 CPU,而wakeup
則尋找一個睡眠狀態的程序並把它標記為RUNNABLE
。
1. sleep函式
- 首先sleep會做幾個檢查:必須存在當前程序、並且
sleep
必須持有鎖(2558-2559)。接著sleep
要求持有ptable.lock
- 於是該程序就會同時持有鎖
ptable.lock
和lk
了。呼叫者(例如recv
)是必須持有lk
的,這樣可以保證其他程序(例如一個正在執行的send
)無法呼叫wakeup(chan)
。而如今sleep
已經持有了ptable.lock
,那麼它現在就能安全地釋放lk
了:這樣即使別的程序呼叫了wakeup(chan)
,wakeup
也不可能在沒有持有ptable.lock
的情況下執行,所以wakeup
必須等待sleep
讓程序睡眠後才能執行。這樣一來,wakeup
就不會錯過sleep
了。 - 這裡要注意如果
lk == ptable.lock
的話則就會跳過對於if語句的判斷 - 後面做的就是把當前程序放入chan中(等待佇列)
- 隨後進行排程交出cpu
void
sleep(void *chan, struct spinlock *lk)
{
struct proc *p = myproc();
if(p == 0)
panic("sleep");
if(lk == 0)
panic("sleep without lk");
// Must acquire ptable.lock in order to
// change p->state and then call sched.
// Once we hold ptable.lock, we can be
// guaranteed that we won't miss any wakeup
// (wakeup runs with ptable.lock locked),
// so it's okay to release lk.
if(lk != &ptable.lock){ //DOC: sleeplock0
acquire(&ptable.lock); //DOC: sleeplock1
release(lk);
}
// Go to sleep.
p->chan = chan;
p->state = SLEEPING;
sched();
// Tidy up.
p->chan = 0;
// Reacquire original lock.
if(lk != &ptable.lock){ //DOC: sleeplock2
release(&ptable.lock);
acquire(lk);
}
}
2. wakeup函式
- 遍歷所有的程序表
- 判斷p->chan == chan 設定為runable
// Wake up all processes sleeping on chan.
void
wakeup(void *chan)
{
acquire(&ptable.lock);
wakeup1(chan);
release(&ptable.lock);
}
// wakeup --> wakeup1
// Wake up all processes sleeping on chan.
// The ptable lock must be held.
//
static void
wakeup1(void *chan)
{
struct proc *p;
for(p = ptable.proc; p < &ptable.proc[NPROC]; p++)
if(p->state == SLEEPING && p->chan == chan)
p->state = RUNNABLE;
}
4.管道
在xv6中管道是比較複雜的使用 sleep
/wakeup
來同步讀者寫者佇列的例子。
1. 管道結構體
struct pipe {
struct spinlock lock;
char data[PIPESIZE];
uint nread; // number of bytes read
uint nwrite; // number of bytes written
int readopen; // read fd is still open
int writeopen; // write fd is still open
};
我們從管道的一端寫入資料位元組,然後資料被拷貝到核心緩衝區中,接著就能從管道的另一端讀取資料了。
中有一個鎖 lock
和記憶體緩衝區。其中的域 nread
和 nwrite
表示從緩衝區讀出和寫入的位元組數。pipe
對緩衝區做了包裝,使得雖然計數器繼續增長,但實際上在 data[PIPESIZE - 1]
之後寫入的位元組存放在 data[0]
。這樣就讓我們可以區分一個滿的緩衝區(nwrite == nread + PIPESIZE
)和一個空的緩衝區(nwrite == nread
),但這也意味著我們必須使用 buf[nread % PIPESIZE]
而不是 buf[nread]
來讀出/寫入資料。
2. Piperead()函式
- 當緩衝資料區為空的時候(並且writeopen被打開了)這個時候就要等。因為寫操作還沒完成所以主動sleep
- 否則就可以讀了
- 讀完了就喚醒寫
int
piperead(struct pipe *p, char *addr, int n)
{
int i;
acquire(&p->lock);
while(p->nread == p->nwrite && p->writeopen){ //DOC: pipe-empty
if(myproc()->killed){
release(&p->lock);
return -1;
}
sleep(&p->nread, &p->lock); //DOC: piperead-sleep
}
for(i = 0; i < n; i++){ //DOC: piperead-copy
if(p->nread == p->nwrite)
break;
addr[i] = p->data[p->nread++ % PIPESIZE];
}
wakeup(&p->nwrite); //DOC: piperead-wakeup
release(&p->lock);
return i;
}
3. pipewrite()函式
- 當緩衝區已經滿了,但是readopen被打開了,這個時候就要喚醒讀緩衝區的程序去讀操作。
- 否則就是簡單的寫操作了
- 寫完了就喚醒讀
int
pipewrite(struct pipe *p, char *addr, int n)
{
int i;
acquire(&p->lock);
for(i = 0; i < n; i++){
while(p->nwrite == p->nread + PIPESIZE){ //DOC: pipewrite-full
if(p->readopen == 0 || myproc()->killed){
release(&p->lock);
return -1;
}
wakeup(&p->nread);
sleep(&p->nwrite, &p->lock); //DOC: pipewrite-sleep
}
p->data[p->nwrite++ % PIPESIZE] = addr[i];
}
wakeup(&p->nread); //DOC: pipewrite-wakeup1
release(&p->lock);
return n;
}
5. xv6下的多cpu啟動
這裡多cpu主要結合828的lab來一起分析。要是有不同的也會做一下對比
在xv6中和828中對於多cpu的支援都是SMP體系架構下的。
在SMP下所有的cpu的地位是一模一樣的,他們具有相同的許可權,相同的資源。所有CPU在SMP中在功能上相同,但在引導過程中,它們可以分為兩種型別:
- 引導處理器BSP(bootstrap processor)負責初始化系統並且引導作業系統。
- 應用處理器AP(application processor)在作業系統啟動之後被BSP啟用。
由哪個(些)處理器來擔任BSP的功能是由BIOS和硬體決定的,之前的所有程式碼都是在BSP上實現的。
總的步驟如下參考
BIOS 啟動 BSP,完成讀取核心balabala的操作一直到main.c中
BSP 從
MP Configuration Table
中獲取多處理器的的配置資訊BSP 啟動 APs,通過傳送
INIT-SIPI-SIPI
訊息給 APsAPs 啟動,各個 APs 處理器要像 BSP 一樣建立自己的一些機制,比如保護模式,分頁,中斷等等
關於多核啟動在網上看見了一個很好的部落格我這裡也是大多參考於它
1. mpinit函式
mpinit
函式負責是檢測CPU個數並將檢測到的CPU存入一個全域性的陣列中。
這裡我們需要解析MP Configuration Table。而在xv6中這個資訊使用mp結構體來儲存的。下面是要注意的點!!
這個結構只可能出現在三個位置,尋找 floating pointer 的時候就按下面的循序查詢:
- EBDA(Extended BIOS Data Area)最開始的 1KB
- 系統基本記憶體的最後 1KB (對於 640 KB 的基本記憶體來說就是 639KB-640KB,對於 512KB 的基本記憶體來說就是 511KB-512KB)
- BIOS 的 ROM 區域,在
到
之間
第一步要呼叫
mpconfig
函式來解析出mpvoid
mpinit(void)
{
uchar *p, *e;
int ismp;
struct mp *mp;
struct mpconf *conf;
struct mpproc *proc;
struct mpioapic *ioapic; if((conf = mpconfig(&mp)) == 0)
panic("Expect to run on an SMP");
而
mpconfig
函式最重要的就是利用mpserach
函式關於對照關係上面給了一個表格
這裡就是完全按照上面說的三個順序進行測試的,只不過對應的地址變化(我也不會,也懶得看了。)就是知道他是依此去這三個地方find mp就行了
static struct mp* mpsearch(void) //尋找mp floating pointer 結構
{
uchar *bda;
uint p;
struct mp *mp; bda = (uchar *) P2V(0x400); //BIOS Data Area地址 if((p = ((bda[0x0F]<<8)| bda[0x0E]) << 4)){ //在EBDA中最開始1K中尋找
if((mp = mpsearch1(p, 1024)))
return mp;
} else { //在基本記憶體的最後1K中查詢
p = ((bda[0x14]<<8)|bda[0x13])*1024;
if((mp = mpsearch1(p-1024, 1024)))
return mp;
}
return mpsearch1(0xF0000, 0x10000); //在0xf0000~0xfffff中查詢
}
下面就是根據讀到的conf來解析cpu資訊
ismp = 1;
lapic = (uint*)conf->lapicaddr;
for(p=(uchar*)(conf+1), e=(uchar*)conf+conf->length; p<e; ){
switch(*p){
case MPPROC: // 如果是處理器
proc = (struct mpproc*)p;
if(ncpu < NCPU) {
cpus[ncpu].apicid = proc->apicid; // 為cpu分配apicid 也就是說apicid標識了唯一的cpu
ncpu++;
}
p += sizeof(struct mpproc);
continue;
case MPIOAPIC:
ioapic = (struct mpioapic*)p;
ioapicid = ioapic->apicno;
p += sizeof(struct mpioapic);
continue;
case MPBUS:
case MPIOINTR:
case MPLINTR:
p += 8;
continue;
default:
ismp = 0;
break;
}
}
if(!ismp)
panic("Didn't find a suitable machine");
if(mp->imcrp){
// Bochs doesn't support IMCR, so this doesn't run on Bochs.
// But it would on real hardware.
outb(0x22, 0x70); // Select IMCR
outb(0x23, inb(0x23) | 1); // Mask external interrupts.
}
}
2. lapicinit(void)
這是在啟動多個aps之間要做的一些初始化操作。
下面用得最多的lapicw
操作其實非常簡單
static void
lapicw(int index, int value)
{
lapic[index] = value;
lapic[ID]; // wait for write to finish, by reading
}
中斷流程:
- 一個 CPU 給其他 CPU 傳送中斷的時候, 就在自己的 ICR 中, 放中斷向量和目標LAPIC ID, 然後通過匯流排傳送到對應 LAPIC,
- 目標 LAPIC 根據自己的 LVT(Local Vector Table) 來對不同的中斷進行處理.
- 處理完了寫EOI 表示處理完了.
所以下面的操作其實就是在初始化LVT表
void
lapicinit(void)
{
if(!lapic)
return;
// Enable local APIC; set spurious interrupt vector.
lapicw(SVR, ENABLE | (T_IRQ0 + IRQ_SPURIOUS));
// The timer repeatedly counts down at bus frequency
// from lapic[TICR] and then issues an interrupt.
// If xv6 cared more about precise timekeeping,
// TICR would be calibrated using an external time source.
lapicw(TDCR, X1);
lapicw(TIMER, PERIODIC | (T_IRQ0 + IRQ_TIMER));
lapicw(TICR, 10000000);
// Disable logical interrupt lines.
lapicw(LINT0, MASKED);
lapicw(LINT1, MASKED);
// Disable performance counter overflow interrupts
// on machines that provide that interrupt entry.
if(((lapic[VER]>>16) & 0xFF) >= 4)
lapicw(PCINT, MASKED);
// Map error interrupt to IRQ_ERROR.
lapicw(ERROR, T_IRQ0 + IRQ_ERROR);
// Clear error status register (requires back-to-back writes).
lapicw(ESR, 0);
lapicw(ESR, 0);
// Ack any outstanding interrupts.
lapicw(EOI, 0);
// Send an Init Level De-Assert to synchronise arbitration ID's.
lapicw(ICRHI, 0);
lapicw(ICRLO, BCAST | INIT | LEVEL);
while(lapic[ICRLO] & DELIVS)
;
// Enable interrupts on the APIC (but not on the processor).
lapicw(TPR, 0);
}
3. startothers函式
尋到了有多少個 CPU,而且也有了每個 CPU 的標識資訊,就可以去啟動它們了,直接來看 startothers 的程式碼:
entryother.S 是APs啟動時要執行的程式碼,連結器將映像放在
_binary_entryother_start
然後將其移動到0x7000然後利用for迴圈,迴圈啟動APs
最後呼叫
lapicstartap()
函式來啟動 APs
static void
startothers(void)
{
extern uchar _binary_entryother_start[], _binary_entryother_size[];
uchar *code;
struct cpu *c;
char *stack;
// Write entry code to unused memory at 0x7000.
// The linker has placed the image of entryother.S in
// _binary_entryother_start.
code = P2V(0x7000);
memmove(code, _binary_entryother_start, (uint)_binary_entryother_size);
for(c = cpus; c < cpus+ncpu; c++){
if(c == mycpu()) // We've started already.
continue;
// Tell entryother.S what stack to use, where to enter, and what
// pgdir to use. We cannot use kpgdir yet, because the AP processor
// is running in low memory, so we use entrypgdir for the APs too.
stack = kalloc();
*(void**)(code-4) = stack + KSTACKSIZE;
*(void(**)(void))(code-8) = mpenter;
*(int**)(code-12) = (void *) V2P(entrypgdir);
lapicstartap(c->apicid, V2P(code));
// wait for cpu to finish mpmain()
while(c->started == 0)
;
}
}
4. lapicstartap函式
前面提到過BSP是通過傳送訊號給APs來啟動其他cpu的,簡單來說就是一個 CPU 通過寫 LAPIC 的 ICR 暫存器來與其他 CPU 進行通訊
其實下面一大堆我覺得看不懂也沒差。。太低層了。只要知道下面這一點就行了
BSP通過向AP逐個傳送中斷來啟動AP,首先發送INIT中斷來初始化AP,然後傳送SIPI中斷來啟動AP,傳送中斷使用的是寫ICR暫存器的方式
void
lapicstartap(uchar apicid, uint addr)
{
int i;
ushort *wrv;
// "The BSP must initialize CMOS shutdown code to 0AH
// and the warm reset vector (DWORD based at 40:67) to point at
// the AP startup code prior to the [universal startup algorithm]."
outb(CMOS_PORT, 0xF); // offset 0xF is shutdown code
outb(CMOS_PORT+1, 0x0A);
wrv = (ushort*)P2V((0x40<<4 | 0x67)); // Warm reset vector
wrv[0] = 0;
wrv[1] = addr >> 4;
// "Universal startup algorithm."
// Send INIT (level-triggered) interrupt to reset other CPU.
lapicw(ICRHI, apicid<<24);
lapicw(ICRLO, INIT | LEVEL | ASSERT);
microdelay(200);
lapicw(ICRLO, INIT | LEVEL);
microdelay(100); // should be 10ms, but too slow in Bochs!
// Send startup IPI (twice!) to enter code.
// Regular hardware is supposed to only accept a STARTUP
// when it is in the halted state due to an INIT. So the second
// should be ignored, but it is part of the official Intel algorithm.
// Bochs complains about the second one. Too bad for Bochs.
for(i = 0; i < 2; i++){
lapicw(ICRHI, apicid<<24);
lapicw(ICRLO, STARTUP | (addr>>12));
microdelay(200);
}
}
5. 看一下entryother.S
上面的程式碼總共就幹了兩件事,1.找到所有的cpu資訊、2. 利用IPI
中斷,也就是寫ICR
暫存器來啟動其他的APS。
而啟動APs是要執行boot程式碼的。對應的地址是0x7000。也就是entryother.S
的程式碼
這個程式碼前面乾的事情和entry.S
基本一樣的。需要注意的就是下面這兩行
# Switch to the stack allocated by startothers()
movl (start-4), %esp
# Call mpenter()
call *(start-8)
// Other CPUs jump here from entryother.S.
static void
mpenter(void)
{
switchkvm(); //切換到核心頁表
seginit(); // 初始化gdt
lapicinit(); // 初始化apic
mpmain();
}
static void mpmain(void)
{
cprintf("cpu%d: starting %d\n", cpuid(), cpuid());
idtinit(); // 載入GDT
xchg(&(mycpu()->started), 1); // 將started置1表啟動完成了
scheduler(); // 開始排程程序執行程式了
}
可以看到,這裡面所做的工作主要還是初始化建立環境,最後 CPU 這個結構體中的元素 started 置 1 表示這個 CPU 已經啟動好了,這裡就會通知 startothers 函式,可以啟動下一個 AP 了。最後就是呼叫 scheduler() 可以開始排程執行程式了。
下面是startothers
中的一段while迴圈。
// startothers
// wait for cpu to finish mpmain()
while(c->started == 0)
;