Read-Write Lock Implementation

Most of the time, a data structure is used without being modified; read access to a region is all that is needed to get the work done. If multiple threads need to read the same piece of data, there is no reason not to let them read it concurrently. A spinlock cannot distinguish read-only access from mixed reads and writes, so it cannot exploit this potential parallelism. To exploit it, we need read-write locks.

typedef struct dumbrwlock dumbrwlock;
struct dumbrwlock
{
    spinlock lock;
    unsigned readers;
};

static void dumb_wrlock(dumbrwlock *l)
{
    /* Get the write lock; new readers must now wait */
    spin_lock(&l->lock);

    /* Wait for the readers that got in earlier to finish */
    while (l->readers) cpu_relax();
}

static void dumb_wrunlock(dumbrwlock *l)
{
    spin_unlock(&l->lock);
}

static int dumb_wrtrylock(dumbrwlock *l)
{
    /* Want no readers */
    if (l->readers) return EBUSY;

    /* Try to get write lock */
    if (spin_trylock(&l->lock)) return EBUSY;

    if (l->readers)
    {
        /* Oops, a reader started */
        spin_unlock(&l->lock);
        return EBUSY;
    }

    /* Success! */
    return 0;
}

static void dumb_rdlock(dumbrwlock *l)
{
    while (1)
    {
        /* Grab the read lock by incrementing the reader count */
        atomic_inc(&l->readers);

        /* Success? */
        if (!l->lock) return;

        /* A writer holds the lock - undo, and wait until it is gone */
        atomic_dec(&l->readers);
        while (l->lock) cpu_relax();
    }
}

static void dumb_rdunlock(dumbrwlock *l)
{
    atomic_dec(&l->readers);
}

static int dumb_rdtrylock(dumbrwlock *l)
{
    /* Speculatively take read lock */
    atomic_inc(&l->readers);

    /* Success? */
    if (!l->lock) return 0;

    /* Failure - undo */
    atomic_dec(&l->readers);

    return EBUSY;
}

static int dumb_rdupgradelock(dumbrwlock *l)
{
    /* Try to convert into a write lock */
    if (spin_trylock(&l->lock)) return EBUSY;

    /* I'm no longer a reader */
    atomic_dec(&l->readers);

    /* Wait for all other readers to finish */
    while (l->readers) cpu_relax();

    return 0;
}
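For illustration, a reader and a writer using this interface might look like the following sketch; the zero-initialized global lock and the shared counter are assumptions for the example, not part of the original benchmark:

static dumbrwlock rw;          /* all-zero state: unlocked, no readers */
static unsigned shared_value;

static void reader_step(void)
{
    dumb_rdlock(&rw);
    unsigned seen = shared_value;   /* any number of readers can be here */
    dumb_rdunlock(&rw);
    (void) seen;
}

static void writer_step(void)
{
    dumb_wrlock(&rw);
    shared_value++;                 /* exclusive: no readers, no writers */
    dumb_wrunlock(&rw);
}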

To evaluate the code above we need some extra information beyond what we used for plain spinlocks. The fraction of requests that are reads is an important factor: the more reads, the more threads we should be able to run concurrently, and the faster the code should go. The random interleaving of reads and writes also matters, to mimic real workloads, so we use a parallel random number generator and pick 1, 25, 128 or 250 out of every 256 operations to be writes, simulating everything from read-mostly to write-mostly workloads. Finally, we want to observe how contention is handled. Contended situations are usually where a lock matters most, so we look only at the case where the number of threads equals the number of processor cores.
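A sketch of the sort of per-thread benchmark loop implied above; the xorshift generator and the threshold test are assumptions, since the article does not show its harness:

#include <stdint.h>

/* Hypothetical benchmark loop: `writers` out of every 256 operations
   are writes (1, 25, 128 or 250 in the tables below). */
static uint32_t xorshift32(uint32_t *state)
{
    uint32_t x = *state;
    x ^= x << 13;
    x ^= x >> 17;
    x ^= x << 5;
    return *state = x;
}

static void bench_thread(dumbrwlock *l, unsigned writers, long iters)
{
    uint32_t rng = 0x12345678;  /* per-thread seed in a real harness */

    while (iters--)
    {
        if ((xorshift32(&rng) & 255) < writers)
        {
            dumb_wrlock(l);
            /* ... write-side critical section ... */
            dumb_wrunlock(l);
        }
        else
        {
            dumb_rdlock(l);
            /* ... read-side critical section ... */
            dumb_rdunlock(l);
        }
    }
}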
The dumb algorithm above performs quite poorly when uncontended. Using a single thread we get:

Writers per 256:   1     25    128   250
Time (s):          3.7   3.8   4.6   5.4

As expected, as the proportion of writes rises, the read-write lock's performance approaches that of a plain spinlock. Under contention, however, the dumb algorithm actually does quite well. With 4 threads:

Writers per 256:   1     25    128   250
Time (s):          1.1   1.9   4.4   5.7

An obvious improvement is to replace the slow spinlock with the ticketlock algorithm. That gives us:

typedef struct dumbtrwlock dumbtrwlock;
struct dumbtrwlock
{
    ticketlock lock;
    unsigned readers;
};

static void dumbt_wrlock(dumbtrwlock *l)
{
    /* Get lock */
    ticket_lock(&l->lock);

    /* Wait for readers to finish */
    while (l->readers) cpu_relax();
}

static void dumbt_wrunlock(dumbtrwlock *l)
{
    ticket_unlock(&l->lock);
}

static int dumbt_wrtrylock(dumbtrwlock *l)
{
    /* Want no readers */
    if (l->readers) return EBUSY;

    /* Try to get write lock */
    if (ticket_trylock(&l->lock)) return EBUSY;

    if (l->readers)
    {
        /* Oops, a reader started */
        ticket_unlock(&l->lock);
        return EBUSY;
    }

    /* Success! */
    return 0;
}

static void dumbt_rdlock(dumbtrwlock *l)
{
    while (1)
    {
        /* Success? */
        if (ticket_lockable(&l->lock))
        {
            /* Speculatively take read lock */
            atomic_inc(&l->readers);

            /* Success? */
            if (ticket_lockable(&l->lock)) return;

            /* Failure - undo, and wait until we can try again */
            atomic_dec(&l->readers);
        }

        while (!ticket_lockable(&l->lock)) cpu_relax();
    }
}

static void dumbt_rdunlock(dumbtrwlock *l)
{
    atomic_dec(&l->readers);
}

static int dumbt_rdtrylock(dumbtrwlock *l)
{
    /* Speculatively take read lock */
    atomic_inc(&l->readers);

    /* Success? */
    if (ticket_lockable(&l->lock)) return 0;

    /* Failure - undo */
    atomic_dec(&l->readers);

    return EBUSY;
}

static int dumbt_rdupgradelock(dumbtrwlock *l)
{
    /* Try to convert into a write lock */
    if (ticket_trylock(&l->lock)) return EBUSY;

    /* I'm no longer a reader */
    atomic_dec(&l->readers);

    /* Wait for all other readers to finish */
    while (l->readers) cpu_relax();

    return 0;
}
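The dumbt_* functions above assume the ticket-lock primitives from the spinlock part of this series. For reference, here is a minimal sketch of that interface, under the assumption that it matches the earlier article's design (the atomic_xadd, cmpxchg, barrier and cpu_relax helpers are likewise taken as given):

typedef union ticketlock ticketlock;
union ticketlock
{
    unsigned u;
    struct
    {
        unsigned short ticket;  /* ticket currently being served */
        unsigned short users;   /* next ticket to hand out */
    } s;
};

static void ticket_lock(ticketlock *t)
{
    unsigned short me = atomic_xadd(&t->s.users, 1);

    while (t->s.ticket != me) cpu_relax();
}

static void ticket_unlock(ticketlock *t)
{
    barrier();
    t->s.ticket++;
}

static int ticket_trylock(ticketlock *t)
{
    unsigned short me = t->s.users;
    unsigned short menew = me + 1;
    unsigned cmp = ((unsigned) me << 16) + me;
    unsigned cmpnew = ((unsigned) menew << 16) + me;

    if (cmpxchg(&t->u, cmp, cmpnew) == cmp) return 0;

    return EBUSY;
}

static int ticket_lockable(ticketlock *t)
{
    ticketlock u = *t;
    barrier();
    return (u.s.ticket == u.s.users);
}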

This algorithm does much better under heavy contention, taking 3.7s in the all-writes case. In the uncontended case, however, it has no advantage:

Writers per 256:   1     25    128   250
Time (s):          2.0   2.5   3.7   4.5

At low write fractions this algorithm is slower, and at high write fractions it is faster. Since most situations that call for a read-write lock are read-mostly, this is the wrong trade-off: it can be twice as slow as its competitor.
To see what reducing contention buys us in speed, let us examine a far more intricate implementation: the algorithm used in ReactOS to emulate Microsoft Windows' slim read-write (SRW) locks. It uses a wait queue, together with a bit-lock that guards updates to that queue, and it is designed so that waiters spin on separate memory locations, to cope with larger numbers of threads.

/* Have a wait block */
#define SRWLOCK_WAIT                    1

/* Users are readers */
#define SRWLOCK_SHARED                  2

/* Bit-lock for editing the wait block */
#define SRWLOCK_LOCK                    4
#define SRWLOCK_LOCK_BIT                2

/* Mask for the above bits */
#define SRWLOCK_MASK                    7

/* Number of current users * 8 */
#define SRWLOCK_USERS                   8

typedef struct srwlock srwlock;
struct srwlock
{
    uintptr_t p;
};

typedef struct srw_sw srw_sw;
struct srw_sw
{
    uintptr_t spin;
    srw_sw *next;
};

typedef struct srw_wb srw_wb;
struct srw_wb
{
    /* s_count is the number of shared acquirers * SRWLOCK_USERS. */
    uintptr_t s_count;

    /* Last points to the last wait block in the chain. The value
       is only valid when read from the first wait block. */
    srw_wb *last;

    /* Next points to the next wait block in the chain. */
    srw_wb *next;

    /* The wake chain is only valid for shared wait blocks */
    srw_sw *wake;
    srw_sw *last_shared;

    int ex;
};
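Everything hinges on the single word p, which is overloaded: the low three bits are the flags masked by SRWLOCK_MASK, and the bits above them hold either the count of current users (in multiples of SRWLOCK_USERS) or, once SRWLOCK_WAIT is set, a pointer to the first wait block (which is why wait blocks must be aligned to at least 8 bytes). A few hypothetical helpers, not part of the original code, that spell the packing out:

/* Hypothetical decoding helpers for srwlock.p (illustration only) */
static int srw_has_waiters(uintptr_t p)
{
    return (p & SRWLOCK_WAIT) != 0;   /* wait-block chain present? */
}

static uintptr_t srw_user_count(uintptr_t p)
{
    return p / SRWLOCK_USERS;         /* count lives above the flag bits */
}

static srw_wb *srw_first_wait_block(uintptr_t p)
{
    return (srw_wb *)(p & ~(uintptr_t)SRWLOCK_MASK);  /* strip the flags */
}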

/* Wait for control of wait block */
static srw_wb *lock_wb(srwlock *l)
{
    uintptr_t p;

    /* Spin on the wait block bit lock */
    while (atomic_bitsetandtest(&l->p, SRWLOCK_LOCK_BIT)) cpu_relax();

    p = l->p;
    barrier();

    if (!(p & SRWLOCK_WAIT))
    {
        /* Oops, looks like the wait block was removed. */
        atomic_clear_bit(&l->p, SRWLOCK_LOCK_BIT);
        return NULL;
    }

    return (srw_wb *)(p & ~SRWLOCK_MASK);
}

static void srwlock_init(srwlock *l)
{
    l->p = 0;
}

static void srwlock_rdlock(srwlock *l)
{
    srw_wb swblock;
    srw_sw sw;
    uintptr_t p;
    srw_wb *wb, *shared;

    while (1)
    {
        barrier();
        p = l->p;

        cpu_relax();

        if (!p)
        {
            /* This is a fast path, we can simply try to set the shared count to 1 */
            if (!cmpxchg(&l->p, 0, SRWLOCK_USERS | SRWLOCK_SHARED)) return;

            continue;
        }

        /* Don't interfere with locking */
        if (p & SRWLOCK_LOCK) continue;

        if (p & SRWLOCK_SHARED)
        {
            if (!(p & SRWLOCK_WAIT))
            {
                /* This is a fast path, just increment the number of current shared locks */
                if (cmpxchg(&l->p, p, p + SRWLOCK_USERS) == p) return;
            }
            else
            {
                /* There's other waiters already, lock the wait blocks and increment the shared count */
                wb = lock_wb(l);
                if (wb) break;
            }

            continue;
        }

        /* Initialize wait block */
        swblock.ex = FALSE;
        swblock.next = NULL;
        swblock.last = &swblock;
        swblock.wake = &sw;

        sw.next = NULL;
        sw.spin = 0;

        if (!(p & SRWLOCK_WAIT))
        {
            /*
             * We need to setup the first wait block.
             * Currently an exclusive lock is held, change the lock to contended mode.
             */
            swblock.s_count = SRWLOCK_USERS;
            swblock.last_shared = &sw;

            if (cmpxchg(&l->p, p, (uintptr_t)&swblock | SRWLOCK_WAIT) == p)
            {
                while (!sw.spin) cpu_relax();
                return;
            }

            continue;
        }

        /* Handle the contended but not shared case */

        /*
         * There's other waiters already, lock the wait blocks and increment the shared count.
         * If the last block in the chain is an exclusive lock, add another block.
         */
        swblock.s_count = 0;

        wb = lock_wb(l);
        if (!wb) continue;

        shared = wb->last;
        if (shared->ex)
        {
            shared->next = &swblock;
            wb->last = &swblock;

            shared = &swblock;
        }
        else
        {
            shared->last_shared->next = &sw;
        }

        shared->s_count += SRWLOCK_USERS;
        shared->last_shared = &sw;

        /* Unlock */
        barrier();
        l->p &= ~SRWLOCK_LOCK;

        /* Wait to be woken */
        while (!sw.spin) cpu_relax();

        return;
    }

    /* The contended and shared case */
    sw.next = NULL;
    sw.spin = 0;

    if (wb->ex)
    {
        /*
         * We need to setup a new wait block.
         * Although we're currently in a shared lock and we're acquiring
         * a shared lock, there are exclusive locks queued in between.
         * We need to wait until those are released.
         */
        shared = wb->last;

        if (shared->ex)
        {
            swblock.ex = FALSE;
            swblock.s_count = SRWLOCK_USERS;
            swblock.next = NULL;
            swblock.last = &swblock;
            swblock.wake = &sw;
            swblock.last_shared = &sw;

            shared->next = &swblock;
            wb->last = &swblock;
        }
        else
        {
            shared->last_shared->next = &sw;
            shared->s_count += SRWLOCK_USERS;
            shared->last_shared = &sw;
        }
    }
    else
    {
        wb->last_shared->next = &sw;
        wb->s_count += SRWLOCK_USERS;
        wb->last_shared = &sw;
    }

    /* Unlock */
    barrier();
    l->p &= ~SRWLOCK_LOCK;

    /* Wait to be woken */
    while (!sw.spin) cpu_relax();
}


static void srwlock_rdunlock(srwlock *l)
{
    uintptr_t p, np;
    srw_wb *wb;
    srw_wb *next;

    while (1)
    {
        barrier();
        p = l->p;

        cpu_relax();

        if (p & SRWLOCK_WAIT)
        {
            /*
             * There's a wait block, we need to wake a pending exclusive acquirer,
             * if this is the last shared release.
             */
            wb = lock_wb(l);
            if (wb) break;

            continue;
        }

        /* Don't interfere with locking */
        if (p & SRWLOCK_LOCK) continue;

        /*
         * This is a fast path, we can simply decrement the shared
         * count and store the pointer
         */
        np = p - SRWLOCK_USERS;

        /* If we are the last reader, then the lock is unused */
        if (np == SRWLOCK_SHARED) np = 0;

        /* Try to release the lock */
        if (cmpxchg(&l->p, p, np) == p) return;
    }

    wb->s_count -= SRWLOCK_USERS;

    if (wb->s_count)
    {
        /* Unlock */
        barrier();
        l->p &= ~SRWLOCK_LOCK;
        return;
    }

    next = wb->next;
    if (next)
    {
        /*
         * There's more blocks chained, we need to update the pointers
         * in the next wait block and update the wait block pointer.
         */
        np = (uintptr_t)next | SRWLOCK_WAIT;

        next->last = wb->last;
    }
    else
    {
        /* Convert the lock to a simple exclusive lock. */
        np = SRWLOCK_USERS;
    }

    barrier();
    /* This also unlocks wb lock bit */
    l->p = np;
    barrier();
    wb->wake = (void *) 1;
    barrier();

    /* We released the lock */
}

static int srwlock_rdtrylock(srwlock *s)
{
    uintptr_t p = s->p;

    barrier();

    /* This is a fast path, we can simply try to set the shared count to 1 */
    if (!p && (cmpxchg(&s->p, 0, SRWLOCK_USERS | SRWLOCK_SHARED) == 0)) return 0;

    if ((p & (SRWLOCK_SHARED | SRWLOCK_WAIT)) == SRWLOCK_SHARED)
    {
        /* This is a fast path, just increment the number of current shared locks */
        if (cmpxchg(&s->p, p, p + SRWLOCK_USERS) == p) return 0;
    }

    return EBUSY;
}


static void srwlock_wrlock(srwlock *l)
{
    srw_wb swblock;
    uintptr_t p, np;

    /* Fastpath - no other readers or writers */
    if (!l->p && (!cmpxchg(&l->p, 0, SRWLOCK_USERS))) return;

    /* Initialize wait block */
    swblock.ex = TRUE;
    swblock.next = NULL;
    swblock.last = &swblock;
    swblock.wake = NULL;

    while (1)
    {
        barrier();
        p = l->p;
        cpu_relax();

        if (p & SRWLOCK_WAIT)
        {
            srw_wb *wb = lock_wb(l);
            if (!wb) continue;

            /* Complete Initialization of block */
            swblock.s_count = 0;

            wb->last->next = &swblock;
            wb->last = &swblock;

            /* Unlock */
            barrier();
            l->p &= ~SRWLOCK_LOCK;

            /* Has our wait block become the first one in the chain? */
            while (!swblock.wake) cpu_relax();

            return;
        }

        /* Fastpath - no other readers or writers */
        if (!p)
        {
            if (!cmpxchg(&l->p, 0, SRWLOCK_USERS)) return;
            continue;
        }

        /* Don't interfere with locking */
        if (p & SRWLOCK_LOCK) continue;

        /* There are no wait blocks so far, we need to add ourselves as the first wait block. */
        if (p & SRWLOCK_SHARED)
        {
            swblock.s_count = p & ~SRWLOCK_MASK;
            np = (uintptr_t)&swblock | SRWLOCK_SHARED | SRWLOCK_WAIT;
        }
        else
        {
            swblock.s_count = 0;
            np = (uintptr_t)&swblock | SRWLOCK_WAIT;
        }

        /* Try to make change */
        if (cmpxchg(&l->p, p, np) == p) break;
    }

    /* Has our wait block become the first one in the chain? */
    while (!swblock.wake) cpu_relax();
}


static void srwlock_wrunlock(srwlock *l)
{
    uintptr_t p, np;
    srw_wb *wb;
    srw_wb *next;
    srw_sw *wake, *wake_next;

    while (1)
    {
        barrier();
        p = l->p;
        cpu_relax();

        if (p == SRWLOCK_USERS)
        {
            /*
             * This is the fast path, we can simply clear the SRWLOCK_USERS bit.
             * All other bits should be 0 now because this is a simple exclusive lock,
             * and no one else is waiting.
             */

            if (cmpxchg(&l->p, SRWLOCK_USERS, 0) == SRWLOCK_USERS) return;

            continue;
        }

        /* There's a wait block, we need to wake the next pending acquirer */
        wb = lock_wb(l);
        if (wb) break;
    }

    next = wb->next;
    if (next)
    {
        /*
         * There's more blocks chained, we need to update the pointers
         * in the next wait block and update the wait block pointer.
         */
        np = (uintptr_t)next | SRWLOCK_WAIT;
        if (!wb->ex)
        {
            /* Save the shared count */
            next->s_count = wb->s_count;

            np |= SRWLOCK_SHARED;
        }

        next->last = wb->last;
    }
    else
    {
        /* Convert the lock to a simple lock. */
        if (wb->ex)
        {
            np = SRWLOCK_USERS;
        }
        else
        {
            np = wb->s_count | SRWLOCK_SHARED;
        }
    }

    barrier();
    /* Also unlocks lock bit */
    l->p = np;
    barrier();

    if (wb->ex)
    {
        barrier();
        /* Notify the next waiter */
        wb->wake = (void *) 1;
        barrier();
        return;
    }

    /* We now need to wake all others required. */
    for (wake = wb->wake; wake; wake = wake_next)
    {
        barrier();
        wake_next = wake->next;
        barrier();
        wake->spin = 1;
        barrier();
    }
}

static int srwlock_wrtrylock(srwlock *s)
{
    /* No other readers or writers? */
    if (!s->p && (cmpxchg(&s->p, 0, SRWLOCK_USERS) == 0)) return 0;

    return EBUSY;
}

This is not the actual implementation that appears in ReactOS; it has been simplified and cleaned up a little, and one bit flag has been removed. So how does it perform? Uncontended, it is roughly on par with the ticket-based read-write lock. With 4 contending threads:

Writers per 256:   1     25    128   250
Time (s):          2.2   3.2   5.7   6.4

This is poor - slower under contention than the dumb algorithm. The performance it does gain is not worth this much implementation complexity.

Another possibility is to describe the writers' state with a few bits combined with a count of the readers. A similar trick is used inside the Linux kernel to implement its read-write locks. Adding a wait bit so that writers are not starved by a stream of readers, we get the following implementation:

#define RW_WAIT_BIT     0
#define RW_WRITE_BIT    1
#define RW_READ_BIT     2

#define RW_WAIT     1
#define RW_WRITE    2
#define RW_READ     4

typedef unsigned rwlock;

static void wrlock(rwlock *l)
{
    while (1)
    {
        unsigned state = *l;

        /* No readers or writers? */
        if (state < RW_WRITE)
        {
            /* Try to set the write-locked state */
            if (cmpxchg(l, state, RW_WRITE) == state) return;

            /* Someone else locked it concurrently */
            state = *l;
        }

        /* Flag that a writer is waiting */
        if (!(state & RW_WAIT)) atomic_set_bit(l, RW_WAIT_BIT);

        /* Wait for the lock to be released */
        while (*l > RW_WAIT) cpu_relax();
    }
}

static void wrunlock(rwlock *l)
{
    /* Release the write lock */
    atomic_add(l, -RW_WRITE);
}

static int wrtrylock(rwlock *l)
{
    unsigned state = *l;

    if ((state < RW_WRITE) && (cmpxchg(l, state, state + RW_WRITE) == state)) return 0;

    return EBUSY;
}

static void rdlock(rwlock *l)
{
    while (1)
    {
        /* Is a writer holding the lock, or waiting for it? */
        while (*l & (RW_WAIT | RW_WRITE)) cpu_relax();

        /* Grab a read lock */
        if (!(atomic_xadd(l, RW_READ) & (RW_WAIT | RW_WRITE))) return;

        /* Failed to get the read lock - undo */
        atomic_add(l, -RW_READ);
    }
}

static void rdunlock(rwlock *l)
{
    atomic_add(l, -RW_READ);
}

static int rdtrylock(rwlock *l)
{
    /* Try to get read lock */
    unsigned state = atomic_xadd(l, RW_READ);

    if (!(state & (RW_WAIT | RW_WRITE))) return 0;

    /* Undo */
    atomic_add(l, -RW_READ);

    return EBUSY;
}

/* Get a read lock, even if a writer is waiting */
static int rdforcelock(rwlock *l)
{
    /* Try to get read lock */
    unsigned state = atomic_xadd(l, RW_READ);

    /* Succeed even if a writer is waiting, as long as none holds the lock */
    if (!(state & RW_WRITE)) return 0;

    /* Undo */
    atomic_add(l, -RW_READ);

    return EBUSY;
}

/* Try to upgrade from a read to a write lock atomically */
static int rdtryupgradelock(rwlock *l)
{
    /* Someone else is trying (and will succeed) to upgrade to a write lock? */
    if (atomic_bitsetandtest(l, RW_WRITE_BIT)) return EBUSY;

    /* Don't count myself any more */
    atomic_add(l, -RW_READ);

    /* Wait until there are no more readers */
    while (*l > (RW_WAIT | RW_WRITE)) cpu_relax();

    return 0;
}
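A sketch of how the upgrade path might be used by a caller; the retry policy after a failed upgrade is an assumption, not something the original code prescribes:

/* Hypothetical caller: read, upgrade in place if possible, otherwise
   drop the read lock and queue for the write lock like anyone else. */
static void update_if_needed(rwlock *l)
{
    rdlock(l);

    /* ... inspect shared state, decide an update is required ... */

    if (rdtryupgradelock(l))
    {
        /* Another upgrader won the race: release and re-acquire.
           Anything read before this point may be stale afterwards. */
        rdunlock(l);
        wrlock(l);
    }

    /* ... modify shared state under the write lock ... */

    wrunlock(l);
}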

This lock performs about the same as the dumb algorithm that uses a ticket lock as its spinlock:

Writers per 256:   1     25    128   250
Time (s):          2.0   3.4   3.9   4.6

The version implemented in the Linux kernel is written in assembly language, and is probably a little faster. It exploits the fact that an atomic add also sets the zero flag, which means the slow add-then-test sequence is unnecessary and can be replaced by two fast instructions.
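As a rough illustration of that idea - an assumption about the technique, not the kernel's actual code - an x86 atomic add can feed its zero flag directly into a byte, avoiding a separate re-read and compare:

/* Hypothetical GCC x86 sketch: one locked add plus one flag read */
static int atomic_add_is_zero(unsigned *x, int v)
{
    unsigned char zero;

    __asm__ __volatile__("lock; addl %2, %0; setz %1"
                         : "+m" (*x), "=q" (zero)
                         : "ir" (v)
                         : "memory", "cc");

    return zero;   /* 1 if the add brought the value to zero */
}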
Using semi-portable C, we can do better, because there is a ticket-lock construction for read-write locks. David Howells of Red Hat contributed an implementation of it to the Linux kernel in 2002, greatly optimizing a version posted by Joseph Seigh of IBM in the early 1990s. A similar algorithm was described by Mellor-Crummey and Michael Scott in their landmark paper "Scalable Reader-Writer Synchronization for Shared-Memory Multiprocessors". Converted into C, it looks like:

typedef union rwticket rwticket;

union rwticket
{
    unsigned u;
    unsigned short us;
    __extension__ struct
    {
        unsigned char write;
        unsigned char read;
        unsigned char users;
    } s;
};

static void rwticket_wrlock(rwticket *l)
{
    unsigned me = atomic_xadd(&l->u, (1<<16));
    unsigned char val = me >> 16;

    while (val != l->s.write) cpu_relax();   /* the lock is ours once write reaches our ticket */
}

static void rwticket_wrunlock(rwticket *l)
{
    rwticket t = *l;

    barrier();

    t.s.write++;
    t.s.read++;

    *(unsigned short *) l = t.us;
}

static int rwticket_wrtrylock(rwticket *l)
{
    unsigned me = l->s.users;
    unsigned char menew = me + 1;
    unsigned read = l->s.read << 8;
    unsigned cmp = (me << 16) + read + me;
    unsigned cmpnew = (menew << 16) + read + me;

    if (cmpxchg(&l->u, cmp, cmpnew) == cmp) return 0;

    return EBUSY;
}

static void rwticket_rdlock(rwticket *l)
{
    unsigned me = atomic_xadd(&l->u, (1<<16));
    unsigned char val = me >> 16;

    while (val != l->s.read) cpu_relax();
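    /* Serialised hand-off: only the one reader whose ticket has just
       come up can reach this point, so the increment need not be atomic. */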
    l->s.read++;
}

static void rwticket_rdunlock(rwticket *l)
{
    atomic_inc(&l->s.write);
}

static int rwticket_rdtrylock(rwticket *l)
{
    unsigned me = l->s.users;
    unsigned write = l->s.write;
    unsigned char menew = me + 1;
    unsigned cmp = (me << 16) + (me << 8) + write;
    unsigned cmpnew = ((unsigned) menew << 16) + (menew << 8) + write;

    if (cmpxchg(&l->u, cmp, cmpnew) == cmp) return 0;

    return EBUSY;
}
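The magic constants in the two trylock routines come from the byte packing: on a little-endian machine, u is (users << 16) | (read << 8) | write. rwticket_wrtrylock succeeds only when write == users - every ticket handed out has been fully served, so the lock is free - and it installs users + 1 while leaving write at the old users value, handing itself the now-active write ticket. rwticket_rdtrylock succeeds only when read == users - nothing is queued ahead of the new reader - and bumps users and read together, taking a ticket that is already marked as served for reading. A hypothetical helper, for illustration only, spelling out the words being compared:

/* Hypothetical (not in the original): the packed word the trylock
   routines build and compare against. */
static unsigned rwticket_pack(unsigned users, unsigned read, unsigned write)
{
    return (users << 16) | (read << 8) | write;
}

/* rwticket_wrtrylock: compare rwticket_pack(me, read, me),
   install rwticket_pack(me + 1, read, me).
   rwticket_rdtrylock: compare rwticket_pack(me, me, write),
   install rwticket_pack(me + 1, me + 1, write). */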

The read-write lock above performs very well. At low write fractions it is as fast as the dumb spinlock-based lock, and at high write fractions it is nearly as fast as the dumb ticketlock-based one. It also suffers no slowdown when uncontended, taking 3.7s in every case.

Writers per 256:   1     25    128   250
Time (s):          1.1   1.8   3.9   4.7

In read-mostly situations this algorithm is five times faster than a simple spinlock. Its one drawback is that it cannot atomically upgrade a read lock into a write lock. (It can be done, but then rwticket_wrunlock would need to use an atomic operation, making it somewhat slower.) That drawback is precisely why the Linux kernel has not adopted this algorithm; the other part of the reason is the kernel's requirement that if you already hold a read lock, taking it again recursively must always succeed. If you do not need those guarantees, however, this algorithm is an excellent choice.

Finally, note that this read-write ticket lock is not optimal. When readers and writers alternate in the wait queue - writer (running), reader 1, writer, reader 2 - the two readers could in principle be batched together so that they run concurrently; the second reader does not really need to wait for the second writer to finish. Fortunately, with few threads this situation is rare: with four threads and as many readers as writers it arises about 1/16 of the time, and less often in other mixes. Unfortunately, as the thread count grows, the slowdown relative to an optimal ordering approaches a factor of two.

The obvious fix is to detect when readers ought to be reordered in the wait queue. However, because the effect is so small with four concurrent threads, it is hard to do that check cheaply enough to pay for itself. As machines with many more cores become common, which algorithm to choose here will be a question worth revisiting.
